diff --git a/playground/kafka/KafkaBasic.AppHost/Program.cs b/playground/kafka/KafkaBasic.AppHost/Program.cs index 481bbde01d..1bb676d4e1 100644 --- a/playground/kafka/KafkaBasic.AppHost/Program.cs +++ b/playground/kafka/KafkaBasic.AppHost/Program.cs @@ -7,11 +7,11 @@ .WithKafkaUI(kafkaUi => kafkaUi.WithHostPort(8080)); builder.AddProject("producer") - .WithReference(kafka) + .WithReference(kafka).WaitFor(kafka) .WithArgs(kafka.Resource.Name); builder.AddProject("consumer") - .WithReference(kafka) + .WithReference(kafka).WaitFor(kafka) .WithArgs(kafka.Resource.Name); builder.AddKafka("kafka2").WithKafkaUI(); diff --git a/playground/waitfor/WaitForSandbox.ApiService/WaitForSandbox.ApiService.csproj b/playground/waitfor/WaitForSandbox.ApiService/WaitForSandbox.ApiService.csproj index d0104da3df..1ae39c8c4b 100644 --- a/playground/waitfor/WaitForSandbox.ApiService/WaitForSandbox.ApiService.csproj +++ b/playground/waitfor/WaitForSandbox.ApiService/WaitForSandbox.ApiService.csproj @@ -12,4 +12,4 @@ - + \ No newline at end of file diff --git a/src/Aspire.Hosting.Kafka/Aspire.Hosting.Kafka.csproj b/src/Aspire.Hosting.Kafka/Aspire.Hosting.Kafka.csproj index 3daca34c0b..fb7a53c1aa 100644 --- a/src/Aspire.Hosting.Kafka/Aspire.Hosting.Kafka.csproj +++ b/src/Aspire.Hosting.Kafka/Aspire.Hosting.Kafka.csproj @@ -1,10 +1,10 @@ - $(DefaultTargetFramework) - true - aspire integration hosting kafka - Kafka support for .NET Aspire. + $(DefaultTargetFramework) + true + aspire integration hosting kafka + Kafka support for .NET Aspire. @@ -14,6 +14,12 @@ + + + + + + diff --git a/src/Aspire.Hosting.Kafka/KafkaBuilderExtensions.cs b/src/Aspire.Hosting.Kafka/KafkaBuilderExtensions.cs index 72abebb608..ff082a9fb9 100644 --- a/src/Aspire.Hosting.Kafka/KafkaBuilderExtensions.cs +++ b/src/Aspire.Hosting.Kafka/KafkaBuilderExtensions.cs @@ -3,6 +3,10 @@ using Aspire.Hosting.ApplicationModel; using Aspire.Hosting.Utils; +using Confluent.Kafka; +using HealthChecks.Kafka; +using Microsoft.Extensions.DependencyInjection; +using Microsoft.Extensions.Diagnostics.HealthChecks; namespace Aspire.Hosting; @@ -29,12 +33,47 @@ public static IResourceBuilder AddKafka(this IDistributedAp ArgumentNullException.ThrowIfNull(name); var kafka = new KafkaServerResource(name); + + string? connectionString = null; + + builder.Eventing.Subscribe(kafka, async (@event, ct) => + { + connectionString = await kafka.ConnectionStringExpression.GetValueAsync(ct).ConfigureAwait(false); + + if (connectionString == null) + { + throw new DistributedApplicationException($"ConnectionStringAvailableEvent was published for the '{kafka.Name}' resource but the connection string was null."); + } + }); + + var healthCheckKey = $"{name}_check"; + + // NOTE: We cannot use AddKafka here because it registers the health check as a singleton + // which means if you have multiple Kafka resources the factory callback will end + // up using the connection string of the last Kafka resource that was added. The + // client packages also have to work around this issue. + // + // SEE: https://github.com/Xabaril/AspNetCore.Diagnostics.HealthChecks/issues/2298 + var healthCheckRegistration = new HealthCheckRegistration( + healthCheckKey, + sp => + { + var options = new KafkaHealthCheckOptions(); + options.Configuration = new ProducerConfig(); + options.Configuration.BootstrapServers = connectionString ?? throw new InvalidOperationException("Connection string is unavailable"); + return new KafkaHealthCheck(options); + }, + failureStatus: default, + tags: default); + builder.Services.AddHealthChecks().Add(healthCheckRegistration); + return builder.AddResource(kafka) .WithEndpoint(targetPort: KafkaBrokerPort, port: port, name: KafkaServerResource.PrimaryEndpointName) .WithEndpoint(targetPort: KafkaInternalBrokerPort, name: KafkaServerResource.InternalEndpointName) .WithImage(KafkaContainerImageTags.Image, KafkaContainerImageTags.Tag) .WithImageRegistry(KafkaContainerImageTags.Registry) - .WithEnvironment(context => ConfigureKafkaContainer(context, kafka)); + .WithEnvironment(context => ConfigureKafkaContainer(context, kafka)) + .WithHealthCheck(healthCheckKey); } /// diff --git a/tests/Aspire.Hosting.Kafka.Tests/KafkaFunctionalTests.cs b/tests/Aspire.Hosting.Kafka.Tests/KafkaFunctionalTests.cs index 401e22c619..8ab34f23b2 100644 --- a/tests/Aspire.Hosting.Kafka.Tests/KafkaFunctionalTests.cs +++ b/tests/Aspire.Hosting.Kafka.Tests/KafkaFunctionalTests.cs @@ -2,10 +2,12 @@ // The .NET Foundation licenses this file to you under the MIT license. using Aspire.Components.Common.Tests; +using Aspire.Hosting.ApplicationModel; using Aspire.Hosting.Tests.Utils; using Aspire.Hosting.Utils; using Confluent.Kafka; using Microsoft.Extensions.DependencyInjection; +using Microsoft.Extensions.Diagnostics.HealthChecks; using Microsoft.Extensions.Hosting; using Polly; using Xunit; @@ -15,6 +17,46 @@ namespace Aspire.Hosting.Kafka.Tests; public class KafkaFunctionalTests(ITestOutputHelper testOutputHelper) { + [Fact] + [RequiresDocker] + public async Task VerifyWaitForOnKafkaBlocksDependentResources() + { + var cts = new CancellationTokenSource(TimeSpan.FromMinutes(3)); + using var builder = TestDistributedApplicationBuilder.Create(testOutputHelper); + + var healthCheckTcs = new TaskCompletionSource(); + builder.Services.AddHealthChecks().AddAsyncCheck("blocking_check", () => + { + return healthCheckTcs.Task; + }); + + var resource = builder.AddKafka("resource") + .WithHealthCheck("blocking_check"); + + var dependentResource = builder.AddKafka("dependentresource") + .WaitFor(resource); + + using var app = builder.Build(); + + var pendingStart = app.StartAsync(cts.Token); + + var rns = app.Services.GetRequiredService(); + + await rns.WaitForResourceAsync(resource.Resource.Name, KnownResourceStates.Running, cts.Token); + + await rns.WaitForResourceAsync(dependentResource.Resource.Name, KnownResourceStates.Waiting, cts.Token); + + healthCheckTcs.SetResult(HealthCheckResult.Healthy()); + + await rns.WaitForResourceAsync(resource.Resource.Name, (re => re.Snapshot.HealthStatus == HealthStatus.Healthy), cts.Token); + + await rns.WaitForResourceAsync(dependentResource.Resource.Name, KnownResourceStates.Running, cts.Token); + + await pendingStart; + + await app.StopAsync(); + } + [Fact] [RequiresDocker] public async Task VerifyKafkaResource()