Skip to content

Commit

Permalink
WaitFor Milvus (#5707)
Browse files Browse the repository at this point in the history
* Add waitfor milvus

* Update src/Aspire.Hosting.Milvus/MilvusBuilderExtensions.cs

Co-authored-by: David Fowler <davidfowl@gmail.com>

* Fix healthchecks and address PR feedback

---------

Co-authored-by: David Fowler <davidfowl@gmail.com>
  • Loading branch information
Alirexaa and davidfowl committed Sep 17, 2024
1 parent f177eb5 commit 8fd65d2
Show file tree
Hide file tree
Showing 4 changed files with 217 additions and 8 deletions.
12 changes: 11 additions & 1 deletion src/Aspire.Hosting.Milvus/Aspire.Hosting.Milvus.csproj
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,8 @@
<PackageTags>aspire integration hosting milvus database vector search</PackageTags>
<Description>Milvus vector database support for .NET Aspire.</Description>
<PackageIconFullPath>$(SharedDir)Milvus_256x.png</PackageIconFullPath>
<NoWarn>$(NoWarn);CS8002</NoWarn><!-- Milvus.Client packages are not signed -->

<!-- Disable package validation as this package hasn't shipped yet. -->
<EnablePackageValidation>false</EnablePackageValidation>
</PropertyGroup>
Expand All @@ -17,10 +19,18 @@
<ItemGroup>
<Compile Include="$(SharedDir)StringComparers.cs" Link="Utils\StringComparers.cs" />
<Compile Include="$(SharedDir)VolumeNameGenerator.cs" Link="Utils\VolumeNameGenerator.cs" />
<Compile Include="$(ComponentsDir)Aspire.Milvus.Client\MilvusHealthCheck.cs" Link="MilvusHealthCheck.cs"></Compile>
</ItemGroup>

<ItemGroup>
<ProjectReference Include="..\Aspire.Hosting\Aspire.Hosting.csproj" />
</ItemGroup>

<ItemGroup>
<InternalsVisibleTo Include="Aspire.Hosting.Milvus.Tests"></InternalsVisibleTo>
</ItemGroup>

<ItemGroup>
<PackageReference Include="Milvus.Client" />
</ItemGroup>
</Project>
115 changes: 112 additions & 3 deletions src/Aspire.Hosting.Milvus/MilvusBuilderExtensions.cs
Original file line number Diff line number Diff line change
@@ -1,9 +1,15 @@
// Licensed to the .NET Foundation under one or more agreements.
// The .NET Foundation licenses this file to you under the MIT license.

using System.Data.Common;
using Aspire.Hosting.ApplicationModel;
using Aspire.Hosting.Milvus;
using Aspire.Hosting.Utils;
using Aspire.Milvus.Client;
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Diagnostics.HealthChecks;
using Microsoft.Extensions.Logging;
using Milvus.Client;

namespace Aspire.Hosting;

Expand Down Expand Up @@ -51,6 +57,43 @@ public static IResourceBuilder<MilvusServerResource> AddMilvus(this IDistributed

var milvus = new MilvusServerResource(name, apiKeyParameter);

string? connectionString = null;
MilvusClient? milvusClient = null;

builder.Eventing.Subscribe<ConnectionStringAvailableEvent>(milvus, async (@event, ct) =>
{
connectionString = await milvus.ConnectionStringExpression.GetValueAsync(ct).ConfigureAwait(false);
if (connectionString is null)
{
throw new DistributedApplicationException($"ConnectionStringAvailableEvent was published for the '{milvus.Name}' resource but the connection string was null.");
}
milvusClient = CreateMilvusClient(@event.Services, connectionString);
var lookup = builder.Resources.OfType<MilvusDatabaseResource>().ToDictionary(d => d.Name);
foreach (var databaseName in milvus.Databases)
{
if (!lookup.TryGetValue(databaseName.Key, out var databaseResource))
{
throw new DistributedApplicationException($"Database resource '{databaseName}' under Milvus Server resource '{milvus.Name}' was not found in the model.");
}
var connectionStringAvailableEvent = new ConnectionStringAvailableEvent(databaseResource, @event.Services);
await builder.Eventing.PublishAsync<ConnectionStringAvailableEvent>(connectionStringAvailableEvent, ct).ConfigureAwait(false);
var beforeResourceStartedEvent = new BeforeResourceStartedEvent(databaseResource, @event.Services);
await builder.Eventing.PublishAsync(beforeResourceStartedEvent, ct).ConfigureAwait(false);
}
});

var healthCheckKey = $"{name}_check";
// TODO: Use health check from AspNetCore.Diagnostics.HealthChecks once it's implemented via this issue:
// https://github.com/Xabaril/AspNetCore.Diagnostics.HealthChecks/issues/2214
builder.Services.AddHealthChecks()
.Add(new HealthCheckRegistration(
healthCheckKey,
sp => new MilvusHealthCheck(milvusClient!),
failureStatus: default,
tags: default,
timeout: default));

return builder.AddResource(milvus)
.WithImage(MilvusContainerImageTags.Image, MilvusContainerImageTags.Tag)
.WithImageRegistry(MilvusContainerImageTags.Registry)
Expand All @@ -67,7 +110,8 @@ public static IResourceBuilder<MilvusServerResource> AddMilvus(this IDistributed
{
ctx.EnvironmentVariables["COMMON_SECURITY_DEFAULTROOTPASSWORD"] = milvus.ApiKeyParameter;
})
.WithArgs("milvus", "run", "standalone");
.WithArgs("milvus", "run", "standalone")
.WithHealthCheck(healthCheckKey);
}

/// <summary>
Expand Down Expand Up @@ -101,8 +145,33 @@ public static IResourceBuilder<MilvusDatabaseResource> AddDatabase(this IResourc
databaseName ??= name;

builder.Resource.AddDatabase(name, databaseName);
var milvusResource = new MilvusDatabaseResource(name, databaseName, builder.Resource);
return builder.ApplicationBuilder.AddResource(milvusResource);
var milvusDatabaseResource = new MilvusDatabaseResource(name, databaseName, builder.Resource);

string? connectionString = null;
MilvusClient? milvusClient = null;
builder.ApplicationBuilder.Eventing.Subscribe<ConnectionStringAvailableEvent>(milvusDatabaseResource, async (@event, ct) =>
{
connectionString = await milvusDatabaseResource.ConnectionStringExpression.GetValueAsync(ct).ConfigureAwait(false);
if (connectionString == null)
{
throw new DistributedApplicationException($"ConnectionStringAvailableEvent was published for the '{milvusDatabaseResource.Name}' resource but the connection string was null.");
}
milvusClient = CreateMilvusClient(@event.Services, connectionString);
});

var healthCheckKey = $"{name}_check";
// TODO: Use health check from AspNetCore.Diagnostics.HealthChecks once it's implemented via this issue:
// https://github.com/Xabaril/AspNetCore.Diagnostics.HealthChecks/issues/2214
builder.ApplicationBuilder.Services.AddHealthChecks()
.Add(new HealthCheckRegistration(
healthCheckKey,
sp => new MilvusHealthCheck(milvusClient!),
failureStatus: default,
tags: default,
timeout: default));

return builder.ApplicationBuilder.AddResource(milvusDatabaseResource)
.WithHealthCheck(healthCheckKey);
}

/// <summary>
Expand Down Expand Up @@ -190,4 +259,44 @@ private static void ConfigureAttuContainer(EnvironmentCallbackContext context, M
// This will need to be refactored once updated service discovery APIs are available
context.EnvironmentVariables.Add("MILVUS_URL", $"{resource.PrimaryEndpoint.Scheme}://{resource.Name}:{resource.PrimaryEndpoint.TargetPort}");
}
internal static MilvusClient CreateMilvusClient(IServiceProvider sp, string? connectionString)
{
if (connectionString is null)
{
throw new InvalidOperationException("Connection string is unavailable");
}

Uri? endpoint = null;
string? key = null;
string? database = null;

if (Uri.TryCreate(connectionString, UriKind.Absolute, out var uri))
{
endpoint = uri;
}
else
{
var connectionBuilder = new DbConnectionStringBuilder
{
ConnectionString = connectionString
};

if (connectionBuilder.ContainsKey("Endpoint") && Uri.TryCreate(connectionBuilder["Endpoint"].ToString(), UriKind.Absolute, out var serviceUri))
{
endpoint = serviceUri;
}

if (connectionBuilder.ContainsKey("Key"))
{
key = connectionBuilder["Key"].ToString();
}

if (connectionBuilder.ContainsKey("Database"))
{
database = connectionBuilder["Database"].ToString();
}
}

return new MilvusClient(endpoint!, apiKey: key!, database: database, loggerFactory: sp.GetRequiredService<ILoggerFactory>());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -14,8 +14,4 @@
<PackageReference Include="Microsoft.Extensions.Http.Resilience" />
</ItemGroup>

<ItemGroup>
<Compile Include="$(RepoRoot)src\Aspire.Hosting.Milvus\MilvusContainerImageTags.cs" />
<Compile Include="$(SharedDir)VolumeNameGenerator.cs" Link="Utils\VolumeNameGenerator.cs" />
</ItemGroup>
</Project>
94 changes: 94 additions & 0 deletions tests/Aspire.Hosting.Milvus.Tests/MilvusFunctionalTests.cs
Original file line number Diff line number Diff line change
Expand Up @@ -2,10 +2,12 @@
// The .NET Foundation licenses this file to you under the MIT license.

using Aspire.Components.Common.Tests;
using Aspire.Hosting.ApplicationModel;
using Aspire.Hosting.Utils;
using Grpc.Core;
using Microsoft.Extensions.Configuration;
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Diagnostics.HealthChecks;
using Microsoft.Extensions.Hosting;
using Milvus.Client;
using Polly;
Expand Down Expand Up @@ -229,4 +231,96 @@ await pipeline.ExecuteAsync(
}
}
}

[Fact]
[RequiresDocker]
public async Task VerifyWaitForOnMilvusBlocksDependentResources()
{
var cts = new CancellationTokenSource(TimeSpan.FromMinutes(3));
using var builder = TestDistributedApplicationBuilder.CreateWithTestContainerRegistry(testOutputHelper);

var healthCheckTcs = new TaskCompletionSource<HealthCheckResult>();
builder.Services.AddHealthChecks().AddAsyncCheck("blocking_check", () =>
{
return healthCheckTcs.Task;
});

var resource = builder.AddMilvus("resource")
.WithHealthCheck("blocking_check");

var dependentResource = builder.AddMilvus("dependentresource")
.WaitFor(resource);

using var app = builder.Build();

var pendingStart = app.StartAsync(cts.Token);

var rns = app.Services.GetRequiredService<ResourceNotificationService>();

await rns.WaitForResourceAsync(resource.Resource.Name, KnownResourceStates.Running, cts.Token);

await rns.WaitForResourceAsync(dependentResource.Resource.Name, KnownResourceStates.Waiting, cts.Token);

healthCheckTcs.SetResult(HealthCheckResult.Healthy());

await rns.WaitForResourceAsync(resource.Resource.Name, (re => re.Snapshot.HealthStatus == HealthStatus.Healthy), cts.Token);

await rns.WaitForResourceAsync(dependentResource.Resource.Name, KnownResourceStates.Running, cts.Token);

await pendingStart;

await app.StopAsync();
}

[Fact]
[RequiresDocker]
public async Task VerifyWaitForOnMilvusDatabaseBlocksDependentResources()
{
var cts = new CancellationTokenSource(TimeSpan.FromMinutes(5));
using var builder = TestDistributedApplicationBuilder.CreateWithTestContainerRegistry(testOutputHelper);

var healthCheckTcs = new TaskCompletionSource<HealthCheckResult>();
builder.Services.AddHealthChecks().AddAsyncCheck("blocking_check", () =>
{
return healthCheckTcs.Task;
});

var resource = builder.AddMilvus("resource")
.WithHealthCheck("blocking_check");

var db = resource.AddDatabase("db");

var dependentResource = builder.AddMilvus("dependentresource")
.WaitFor(db);

using var app = builder.Build();

var pendingStart = app.StartAsync(cts.Token);

var rns = app.Services.GetRequiredService<ResourceNotificationService>();

await rns.WaitForResourceAsync(resource.Resource.Name, KnownResourceStates.Running, cts.Token);

await rns.WaitForResourceAsync(db.Resource.Name, KnownResourceStates.Running, cts.Token);

await rns.WaitForResourceAsync(dependentResource.Resource.Name, KnownResourceStates.Waiting, cts.Token);

healthCheckTcs.SetResult(HealthCheckResult.Healthy());

await rns.WaitForResourceAsync(resource.Resource.Name, (re => re.Snapshot.HealthStatus == HealthStatus.Healthy), cts.Token);

// Create the database.
var connectionString = await resource.Resource.ConnectionStringExpression.GetValueAsync(cts.Token);
var milvusClient = MilvusBuilderExtensions.CreateMilvusClient(app.Services, connectionString);
await milvusClient.CreateDatabaseAsync(db.Resource.Name);

await rns.WaitForResourceAsync(db.Resource.Name, re => re.Snapshot.HealthStatus == HealthStatus.Healthy, cts.Token);

await rns.WaitForResourceAsync(dependentResource.Resource.Name, KnownResourceStates.Running, cts.Token);

await pendingStart;

await app.StopAsync();
}

}

0 comments on commit 8fd65d2

Please sign in to comment.