From 8fd65d2c6dfcdb98963364662e5345114a608ff4 Mon Sep 17 00:00:00 2001 From: Alireza Baloochi Date: Tue, 17 Sep 2024 17:08:08 +0330 Subject: [PATCH] WaitFor Milvus (#5707) * Add waitfor milvus * Update src/Aspire.Hosting.Milvus/MilvusBuilderExtensions.cs Co-authored-by: David Fowler * Fix healthchecks and address PR feedback --------- Co-authored-by: David Fowler --- .../Aspire.Hosting.Milvus.csproj | 12 +- .../MilvusBuilderExtensions.cs | 115 +++++++++++++++++- .../Aspire.Hosting.Milvus.Tests.csproj | 4 - .../MilvusFunctionalTests.cs | 94 ++++++++++++++ 4 files changed, 217 insertions(+), 8 deletions(-) diff --git a/src/Aspire.Hosting.Milvus/Aspire.Hosting.Milvus.csproj b/src/Aspire.Hosting.Milvus/Aspire.Hosting.Milvus.csproj index 874d012244..4e9e2700c2 100644 --- a/src/Aspire.Hosting.Milvus/Aspire.Hosting.Milvus.csproj +++ b/src/Aspire.Hosting.Milvus/Aspire.Hosting.Milvus.csproj @@ -6,6 +6,8 @@ aspire integration hosting milvus database vector search Milvus vector database support for .NET Aspire. $(SharedDir)Milvus_256x.png + $(NoWarn);CS8002 + false @@ -17,10 +19,18 @@ + - + + + + + + + + diff --git a/src/Aspire.Hosting.Milvus/MilvusBuilderExtensions.cs b/src/Aspire.Hosting.Milvus/MilvusBuilderExtensions.cs index a7de4edec5..098bc61969 100644 --- a/src/Aspire.Hosting.Milvus/MilvusBuilderExtensions.cs +++ b/src/Aspire.Hosting.Milvus/MilvusBuilderExtensions.cs @@ -1,9 +1,15 @@ // Licensed to the .NET Foundation under one or more agreements. // The .NET Foundation licenses this file to you under the MIT license. +using System.Data.Common; using Aspire.Hosting.ApplicationModel; using Aspire.Hosting.Milvus; using Aspire.Hosting.Utils; +using Aspire.Milvus.Client; +using Microsoft.Extensions.DependencyInjection; +using Microsoft.Extensions.Diagnostics.HealthChecks; +using Microsoft.Extensions.Logging; +using Milvus.Client; namespace Aspire.Hosting; @@ -51,6 +57,43 @@ public static IResourceBuilder AddMilvus(this IDistributed var milvus = new MilvusServerResource(name, apiKeyParameter); + string? connectionString = null; + MilvusClient? milvusClient = null; + + builder.Eventing.Subscribe(milvus, async (@event, ct) => + { + connectionString = await milvus.ConnectionStringExpression.GetValueAsync(ct).ConfigureAwait(false); + if (connectionString is null) + { + throw new DistributedApplicationException($"ConnectionStringAvailableEvent was published for the '{milvus.Name}' resource but the connection string was null."); + } + milvusClient = CreateMilvusClient(@event.Services, connectionString); + var lookup = builder.Resources.OfType().ToDictionary(d => d.Name); + foreach (var databaseName in milvus.Databases) + { + if (!lookup.TryGetValue(databaseName.Key, out var databaseResource)) + { + throw new DistributedApplicationException($"Database resource '{databaseName}' under Milvus Server resource '{milvus.Name}' was not found in the model."); + } + var connectionStringAvailableEvent = new ConnectionStringAvailableEvent(databaseResource, @event.Services); + await builder.Eventing.PublishAsync(connectionStringAvailableEvent, ct).ConfigureAwait(false); + + var beforeResourceStartedEvent = new BeforeResourceStartedEvent(databaseResource, @event.Services); + await builder.Eventing.PublishAsync(beforeResourceStartedEvent, ct).ConfigureAwait(false); + } + }); + + var healthCheckKey = $"{name}_check"; + // TODO: Use health check from AspNetCore.Diagnostics.HealthChecks once it's implemented via this issue: + // https://github.com/Xabaril/AspNetCore.Diagnostics.HealthChecks/issues/2214 + builder.Services.AddHealthChecks() + .Add(new HealthCheckRegistration( + healthCheckKey, + sp => new MilvusHealthCheck(milvusClient!), + failureStatus: default, + tags: default, + timeout: default)); + return builder.AddResource(milvus) .WithImage(MilvusContainerImageTags.Image, MilvusContainerImageTags.Tag) .WithImageRegistry(MilvusContainerImageTags.Registry) @@ -67,7 +110,8 @@ public static IResourceBuilder AddMilvus(this IDistributed { ctx.EnvironmentVariables["COMMON_SECURITY_DEFAULTROOTPASSWORD"] = milvus.ApiKeyParameter; }) - .WithArgs("milvus", "run", "standalone"); + .WithArgs("milvus", "run", "standalone") + .WithHealthCheck(healthCheckKey); } /// @@ -101,8 +145,33 @@ public static IResourceBuilder AddDatabase(this IResourc databaseName ??= name; builder.Resource.AddDatabase(name, databaseName); - var milvusResource = new MilvusDatabaseResource(name, databaseName, builder.Resource); - return builder.ApplicationBuilder.AddResource(milvusResource); + var milvusDatabaseResource = new MilvusDatabaseResource(name, databaseName, builder.Resource); + + string? connectionString = null; + MilvusClient? milvusClient = null; + builder.ApplicationBuilder.Eventing.Subscribe(milvusDatabaseResource, async (@event, ct) => + { + connectionString = await milvusDatabaseResource.ConnectionStringExpression.GetValueAsync(ct).ConfigureAwait(false); + if (connectionString == null) + { + throw new DistributedApplicationException($"ConnectionStringAvailableEvent was published for the '{milvusDatabaseResource.Name}' resource but the connection string was null."); + } + milvusClient = CreateMilvusClient(@event.Services, connectionString); + }); + + var healthCheckKey = $"{name}_check"; + // TODO: Use health check from AspNetCore.Diagnostics.HealthChecks once it's implemented via this issue: + // https://github.com/Xabaril/AspNetCore.Diagnostics.HealthChecks/issues/2214 + builder.ApplicationBuilder.Services.AddHealthChecks() + .Add(new HealthCheckRegistration( + healthCheckKey, + sp => new MilvusHealthCheck(milvusClient!), + failureStatus: default, + tags: default, + timeout: default)); + + return builder.ApplicationBuilder.AddResource(milvusDatabaseResource) + .WithHealthCheck(healthCheckKey); } /// @@ -190,4 +259,44 @@ private static void ConfigureAttuContainer(EnvironmentCallbackContext context, M // This will need to be refactored once updated service discovery APIs are available context.EnvironmentVariables.Add("MILVUS_URL", $"{resource.PrimaryEndpoint.Scheme}://{resource.Name}:{resource.PrimaryEndpoint.TargetPort}"); } + internal static MilvusClient CreateMilvusClient(IServiceProvider sp, string? connectionString) + { + if (connectionString is null) + { + throw new InvalidOperationException("Connection string is unavailable"); + } + + Uri? endpoint = null; + string? key = null; + string? database = null; + + if (Uri.TryCreate(connectionString, UriKind.Absolute, out var uri)) + { + endpoint = uri; + } + else + { + var connectionBuilder = new DbConnectionStringBuilder + { + ConnectionString = connectionString + }; + + if (connectionBuilder.ContainsKey("Endpoint") && Uri.TryCreate(connectionBuilder["Endpoint"].ToString(), UriKind.Absolute, out var serviceUri)) + { + endpoint = serviceUri; + } + + if (connectionBuilder.ContainsKey("Key")) + { + key = connectionBuilder["Key"].ToString(); + } + + if (connectionBuilder.ContainsKey("Database")) + { + database = connectionBuilder["Database"].ToString(); + } + } + + return new MilvusClient(endpoint!, apiKey: key!, database: database, loggerFactory: sp.GetRequiredService()); + } } diff --git a/tests/Aspire.Hosting.Milvus.Tests/Aspire.Hosting.Milvus.Tests.csproj b/tests/Aspire.Hosting.Milvus.Tests/Aspire.Hosting.Milvus.Tests.csproj index bd11f4ab0f..bff36d23b5 100644 --- a/tests/Aspire.Hosting.Milvus.Tests/Aspire.Hosting.Milvus.Tests.csproj +++ b/tests/Aspire.Hosting.Milvus.Tests/Aspire.Hosting.Milvus.Tests.csproj @@ -14,8 +14,4 @@ - - - - diff --git a/tests/Aspire.Hosting.Milvus.Tests/MilvusFunctionalTests.cs b/tests/Aspire.Hosting.Milvus.Tests/MilvusFunctionalTests.cs index 54b13550e1..ad33dae5e7 100644 --- a/tests/Aspire.Hosting.Milvus.Tests/MilvusFunctionalTests.cs +++ b/tests/Aspire.Hosting.Milvus.Tests/MilvusFunctionalTests.cs @@ -2,10 +2,12 @@ // The .NET Foundation licenses this file to you under the MIT license. using Aspire.Components.Common.Tests; +using Aspire.Hosting.ApplicationModel; using Aspire.Hosting.Utils; using Grpc.Core; using Microsoft.Extensions.Configuration; using Microsoft.Extensions.DependencyInjection; +using Microsoft.Extensions.Diagnostics.HealthChecks; using Microsoft.Extensions.Hosting; using Milvus.Client; using Polly; @@ -229,4 +231,96 @@ await pipeline.ExecuteAsync( } } } + + [Fact] + [RequiresDocker] + public async Task VerifyWaitForOnMilvusBlocksDependentResources() + { + var cts = new CancellationTokenSource(TimeSpan.FromMinutes(3)); + using var builder = TestDistributedApplicationBuilder.CreateWithTestContainerRegistry(testOutputHelper); + + var healthCheckTcs = new TaskCompletionSource(); + builder.Services.AddHealthChecks().AddAsyncCheck("blocking_check", () => + { + return healthCheckTcs.Task; + }); + + var resource = builder.AddMilvus("resource") + .WithHealthCheck("blocking_check"); + + var dependentResource = builder.AddMilvus("dependentresource") + .WaitFor(resource); + + using var app = builder.Build(); + + var pendingStart = app.StartAsync(cts.Token); + + var rns = app.Services.GetRequiredService(); + + await rns.WaitForResourceAsync(resource.Resource.Name, KnownResourceStates.Running, cts.Token); + + await rns.WaitForResourceAsync(dependentResource.Resource.Name, KnownResourceStates.Waiting, cts.Token); + + healthCheckTcs.SetResult(HealthCheckResult.Healthy()); + + await rns.WaitForResourceAsync(resource.Resource.Name, (re => re.Snapshot.HealthStatus == HealthStatus.Healthy), cts.Token); + + await rns.WaitForResourceAsync(dependentResource.Resource.Name, KnownResourceStates.Running, cts.Token); + + await pendingStart; + + await app.StopAsync(); + } + + [Fact] + [RequiresDocker] + public async Task VerifyWaitForOnMilvusDatabaseBlocksDependentResources() + { + var cts = new CancellationTokenSource(TimeSpan.FromMinutes(5)); + using var builder = TestDistributedApplicationBuilder.CreateWithTestContainerRegistry(testOutputHelper); + + var healthCheckTcs = new TaskCompletionSource(); + builder.Services.AddHealthChecks().AddAsyncCheck("blocking_check", () => + { + return healthCheckTcs.Task; + }); + + var resource = builder.AddMilvus("resource") + .WithHealthCheck("blocking_check"); + + var db = resource.AddDatabase("db"); + + var dependentResource = builder.AddMilvus("dependentresource") + .WaitFor(db); + + using var app = builder.Build(); + + var pendingStart = app.StartAsync(cts.Token); + + var rns = app.Services.GetRequiredService(); + + await rns.WaitForResourceAsync(resource.Resource.Name, KnownResourceStates.Running, cts.Token); + + await rns.WaitForResourceAsync(db.Resource.Name, KnownResourceStates.Running, cts.Token); + + await rns.WaitForResourceAsync(dependentResource.Resource.Name, KnownResourceStates.Waiting, cts.Token); + + healthCheckTcs.SetResult(HealthCheckResult.Healthy()); + + await rns.WaitForResourceAsync(resource.Resource.Name, (re => re.Snapshot.HealthStatus == HealthStatus.Healthy), cts.Token); + + // Create the database. + var connectionString = await resource.Resource.ConnectionStringExpression.GetValueAsync(cts.Token); + var milvusClient = MilvusBuilderExtensions.CreateMilvusClient(app.Services, connectionString); + await milvusClient.CreateDatabaseAsync(db.Resource.Name); + + await rns.WaitForResourceAsync(db.Resource.Name, re => re.Snapshot.HealthStatus == HealthStatus.Healthy, cts.Token); + + await rns.WaitForResourceAsync(dependentResource.Resource.Name, KnownResourceStates.Running, cts.Token); + + await pendingStart; + + await app.StopAsync(); + } + }