diff --git a/src/Aspire.Hosting.Milvus/Aspire.Hosting.Milvus.csproj b/src/Aspire.Hosting.Milvus/Aspire.Hosting.Milvus.csproj
index 874d012244..4e9e2700c2 100644
--- a/src/Aspire.Hosting.Milvus/Aspire.Hosting.Milvus.csproj
+++ b/src/Aspire.Hosting.Milvus/Aspire.Hosting.Milvus.csproj
@@ -6,6 +6,8 @@
aspire integration hosting milvus database vector search
Milvus vector database support for .NET Aspire.
$(SharedDir)Milvus_256x.png
+ $(NoWarn);CS8002
+
false
@@ -17,10 +19,18 @@
+
-
+
+
+
+
+
+
+
+
diff --git a/src/Aspire.Hosting.Milvus/MilvusBuilderExtensions.cs b/src/Aspire.Hosting.Milvus/MilvusBuilderExtensions.cs
index a7de4edec5..098bc61969 100644
--- a/src/Aspire.Hosting.Milvus/MilvusBuilderExtensions.cs
+++ b/src/Aspire.Hosting.Milvus/MilvusBuilderExtensions.cs
@@ -1,9 +1,15 @@
// Licensed to the .NET Foundation under one or more agreements.
// The .NET Foundation licenses this file to you under the MIT license.
+using System.Data.Common;
using Aspire.Hosting.ApplicationModel;
using Aspire.Hosting.Milvus;
using Aspire.Hosting.Utils;
+using Aspire.Milvus.Client;
+using Microsoft.Extensions.DependencyInjection;
+using Microsoft.Extensions.Diagnostics.HealthChecks;
+using Microsoft.Extensions.Logging;
+using Milvus.Client;
namespace Aspire.Hosting;
@@ -51,6 +57,43 @@ public static IResourceBuilder AddMilvus(this IDistributed
var milvus = new MilvusServerResource(name, apiKeyParameter);
+ string? connectionString = null;
+ MilvusClient? milvusClient = null;
+
+ builder.Eventing.Subscribe(milvus, async (@event, ct) =>
+ {
+ connectionString = await milvus.ConnectionStringExpression.GetValueAsync(ct).ConfigureAwait(false);
+ if (connectionString is null)
+ {
+ throw new DistributedApplicationException($"ConnectionStringAvailableEvent was published for the '{milvus.Name}' resource but the connection string was null.");
+ }
+ milvusClient = CreateMilvusClient(@event.Services, connectionString);
+ var lookup = builder.Resources.OfType().ToDictionary(d => d.Name);
+ foreach (var databaseName in milvus.Databases)
+ {
+ if (!lookup.TryGetValue(databaseName.Key, out var databaseResource))
+ {
+ throw new DistributedApplicationException($"Database resource '{databaseName}' under Milvus Server resource '{milvus.Name}' was not found in the model.");
+ }
+ var connectionStringAvailableEvent = new ConnectionStringAvailableEvent(databaseResource, @event.Services);
+ await builder.Eventing.PublishAsync(connectionStringAvailableEvent, ct).ConfigureAwait(false);
+
+ var beforeResourceStartedEvent = new BeforeResourceStartedEvent(databaseResource, @event.Services);
+ await builder.Eventing.PublishAsync(beforeResourceStartedEvent, ct).ConfigureAwait(false);
+ }
+ });
+
+ var healthCheckKey = $"{name}_check";
+ // TODO: Use health check from AspNetCore.Diagnostics.HealthChecks once it's implemented via this issue:
+ // https://github.com/Xabaril/AspNetCore.Diagnostics.HealthChecks/issues/2214
+ builder.Services.AddHealthChecks()
+ .Add(new HealthCheckRegistration(
+ healthCheckKey,
+ sp => new MilvusHealthCheck(milvusClient!),
+ failureStatus: default,
+ tags: default,
+ timeout: default));
+
return builder.AddResource(milvus)
.WithImage(MilvusContainerImageTags.Image, MilvusContainerImageTags.Tag)
.WithImageRegistry(MilvusContainerImageTags.Registry)
@@ -67,7 +110,8 @@ public static IResourceBuilder AddMilvus(this IDistributed
{
ctx.EnvironmentVariables["COMMON_SECURITY_DEFAULTROOTPASSWORD"] = milvus.ApiKeyParameter;
})
- .WithArgs("milvus", "run", "standalone");
+ .WithArgs("milvus", "run", "standalone")
+ .WithHealthCheck(healthCheckKey);
}
///
@@ -101,8 +145,33 @@ public static IResourceBuilder AddDatabase(this IResourc
databaseName ??= name;
builder.Resource.AddDatabase(name, databaseName);
- var milvusResource = new MilvusDatabaseResource(name, databaseName, builder.Resource);
- return builder.ApplicationBuilder.AddResource(milvusResource);
+ var milvusDatabaseResource = new MilvusDatabaseResource(name, databaseName, builder.Resource);
+
+ string? connectionString = null;
+ MilvusClient? milvusClient = null;
+ builder.ApplicationBuilder.Eventing.Subscribe(milvusDatabaseResource, async (@event, ct) =>
+ {
+ connectionString = await milvusDatabaseResource.ConnectionStringExpression.GetValueAsync(ct).ConfigureAwait(false);
+ if (connectionString == null)
+ {
+ throw new DistributedApplicationException($"ConnectionStringAvailableEvent was published for the '{milvusDatabaseResource.Name}' resource but the connection string was null.");
+ }
+ milvusClient = CreateMilvusClient(@event.Services, connectionString);
+ });
+
+ var healthCheckKey = $"{name}_check";
+ // TODO: Use health check from AspNetCore.Diagnostics.HealthChecks once it's implemented via this issue:
+ // https://github.com/Xabaril/AspNetCore.Diagnostics.HealthChecks/issues/2214
+ builder.ApplicationBuilder.Services.AddHealthChecks()
+ .Add(new HealthCheckRegistration(
+ healthCheckKey,
+ sp => new MilvusHealthCheck(milvusClient!),
+ failureStatus: default,
+ tags: default,
+ timeout: default));
+
+ return builder.ApplicationBuilder.AddResource(milvusDatabaseResource)
+ .WithHealthCheck(healthCheckKey);
}
///
@@ -190,4 +259,44 @@ private static void ConfigureAttuContainer(EnvironmentCallbackContext context, M
// This will need to be refactored once updated service discovery APIs are available
context.EnvironmentVariables.Add("MILVUS_URL", $"{resource.PrimaryEndpoint.Scheme}://{resource.Name}:{resource.PrimaryEndpoint.TargetPort}");
}
+ internal static MilvusClient CreateMilvusClient(IServiceProvider sp, string? connectionString)
+ {
+ if (connectionString is null)
+ {
+ throw new InvalidOperationException("Connection string is unavailable");
+ }
+
+ Uri? endpoint = null;
+ string? key = null;
+ string? database = null;
+
+ if (Uri.TryCreate(connectionString, UriKind.Absolute, out var uri))
+ {
+ endpoint = uri;
+ }
+ else
+ {
+ var connectionBuilder = new DbConnectionStringBuilder
+ {
+ ConnectionString = connectionString
+ };
+
+ if (connectionBuilder.ContainsKey("Endpoint") && Uri.TryCreate(connectionBuilder["Endpoint"].ToString(), UriKind.Absolute, out var serviceUri))
+ {
+ endpoint = serviceUri;
+ }
+
+ if (connectionBuilder.ContainsKey("Key"))
+ {
+ key = connectionBuilder["Key"].ToString();
+ }
+
+ if (connectionBuilder.ContainsKey("Database"))
+ {
+ database = connectionBuilder["Database"].ToString();
+ }
+ }
+
+ return new MilvusClient(endpoint!, apiKey: key!, database: database, loggerFactory: sp.GetRequiredService());
+ }
}
diff --git a/tests/Aspire.Hosting.Milvus.Tests/Aspire.Hosting.Milvus.Tests.csproj b/tests/Aspire.Hosting.Milvus.Tests/Aspire.Hosting.Milvus.Tests.csproj
index bd11f4ab0f..bff36d23b5 100644
--- a/tests/Aspire.Hosting.Milvus.Tests/Aspire.Hosting.Milvus.Tests.csproj
+++ b/tests/Aspire.Hosting.Milvus.Tests/Aspire.Hosting.Milvus.Tests.csproj
@@ -14,8 +14,4 @@
-
-
-
-
diff --git a/tests/Aspire.Hosting.Milvus.Tests/MilvusFunctionalTests.cs b/tests/Aspire.Hosting.Milvus.Tests/MilvusFunctionalTests.cs
index 54b13550e1..ad33dae5e7 100644
--- a/tests/Aspire.Hosting.Milvus.Tests/MilvusFunctionalTests.cs
+++ b/tests/Aspire.Hosting.Milvus.Tests/MilvusFunctionalTests.cs
@@ -2,10 +2,12 @@
// The .NET Foundation licenses this file to you under the MIT license.
using Aspire.Components.Common.Tests;
+using Aspire.Hosting.ApplicationModel;
using Aspire.Hosting.Utils;
using Grpc.Core;
using Microsoft.Extensions.Configuration;
using Microsoft.Extensions.DependencyInjection;
+using Microsoft.Extensions.Diagnostics.HealthChecks;
using Microsoft.Extensions.Hosting;
using Milvus.Client;
using Polly;
@@ -229,4 +231,96 @@ await pipeline.ExecuteAsync(
}
}
}
+
+ [Fact]
+ [RequiresDocker]
+ public async Task VerifyWaitForOnMilvusBlocksDependentResources()
+ {
+ var cts = new CancellationTokenSource(TimeSpan.FromMinutes(3));
+ using var builder = TestDistributedApplicationBuilder.CreateWithTestContainerRegistry(testOutputHelper);
+
+ var healthCheckTcs = new TaskCompletionSource();
+ builder.Services.AddHealthChecks().AddAsyncCheck("blocking_check", () =>
+ {
+ return healthCheckTcs.Task;
+ });
+
+ var resource = builder.AddMilvus("resource")
+ .WithHealthCheck("blocking_check");
+
+ var dependentResource = builder.AddMilvus("dependentresource")
+ .WaitFor(resource);
+
+ using var app = builder.Build();
+
+ var pendingStart = app.StartAsync(cts.Token);
+
+ var rns = app.Services.GetRequiredService();
+
+ await rns.WaitForResourceAsync(resource.Resource.Name, KnownResourceStates.Running, cts.Token);
+
+ await rns.WaitForResourceAsync(dependentResource.Resource.Name, KnownResourceStates.Waiting, cts.Token);
+
+ healthCheckTcs.SetResult(HealthCheckResult.Healthy());
+
+ await rns.WaitForResourceAsync(resource.Resource.Name, (re => re.Snapshot.HealthStatus == HealthStatus.Healthy), cts.Token);
+
+ await rns.WaitForResourceAsync(dependentResource.Resource.Name, KnownResourceStates.Running, cts.Token);
+
+ await pendingStart;
+
+ await app.StopAsync();
+ }
+
+ [Fact]
+ [RequiresDocker]
+ public async Task VerifyWaitForOnMilvusDatabaseBlocksDependentResources()
+ {
+ var cts = new CancellationTokenSource(TimeSpan.FromMinutes(5));
+ using var builder = TestDistributedApplicationBuilder.CreateWithTestContainerRegistry(testOutputHelper);
+
+ var healthCheckTcs = new TaskCompletionSource();
+ builder.Services.AddHealthChecks().AddAsyncCheck("blocking_check", () =>
+ {
+ return healthCheckTcs.Task;
+ });
+
+ var resource = builder.AddMilvus("resource")
+ .WithHealthCheck("blocking_check");
+
+ var db = resource.AddDatabase("db");
+
+ var dependentResource = builder.AddMilvus("dependentresource")
+ .WaitFor(db);
+
+ using var app = builder.Build();
+
+ var pendingStart = app.StartAsync(cts.Token);
+
+ var rns = app.Services.GetRequiredService();
+
+ await rns.WaitForResourceAsync(resource.Resource.Name, KnownResourceStates.Running, cts.Token);
+
+ await rns.WaitForResourceAsync(db.Resource.Name, KnownResourceStates.Running, cts.Token);
+
+ await rns.WaitForResourceAsync(dependentResource.Resource.Name, KnownResourceStates.Waiting, cts.Token);
+
+ healthCheckTcs.SetResult(HealthCheckResult.Healthy());
+
+ await rns.WaitForResourceAsync(resource.Resource.Name, (re => re.Snapshot.HealthStatus == HealthStatus.Healthy), cts.Token);
+
+ // Create the database.
+ var connectionString = await resource.Resource.ConnectionStringExpression.GetValueAsync(cts.Token);
+ var milvusClient = MilvusBuilderExtensions.CreateMilvusClient(app.Services, connectionString);
+ await milvusClient.CreateDatabaseAsync(db.Resource.Name);
+
+ await rns.WaitForResourceAsync(db.Resource.Name, re => re.Snapshot.HealthStatus == HealthStatus.Healthy, cts.Token);
+
+ await rns.WaitForResourceAsync(dependentResource.Resource.Name, KnownResourceStates.Running, cts.Token);
+
+ await pendingStart;
+
+ await app.StopAsync();
+ }
+
}