diff --git a/src/EventStore.Client.PersistentSubscriptions/EventStorePersistentSubscriptionsClient.Info.cs b/src/EventStore.Client.PersistentSubscriptions/EventStorePersistentSubscriptionsClient.Info.cs index 5d42de8c3..d37c85fc3 100644 --- a/src/EventStore.Client.PersistentSubscriptions/EventStorePersistentSubscriptionsClient.Info.cs +++ b/src/EventStore.Client.PersistentSubscriptions/EventStorePersistentSubscriptionsClient.Info.cs @@ -2,6 +2,7 @@ using System.Threading; using System.Threading.Tasks; using EventStore.Client.PersistentSubscriptions; +using Grpc.Core; #nullable enable namespace EventStore.Client { @@ -25,7 +26,7 @@ public async Task GetInfoToAllAsync(string groupName } }; - return await GetInfoGrpcAsync(req, userCredentials, channelInfo, cancellationToken).ConfigureAwait(false); + return await GetInfoGrpcAsync(req, userCredentials, channelInfo.CallInvoker, cancellationToken).ConfigureAwait(false); } if (channelInfo.ServerCapabilities.SupportsPersistentSubscriptionsToAll) { @@ -55,19 +56,17 @@ public async Task GetInfoToStreamAsync(string stream } }; - return await GetInfoGrpcAsync(req, userCredentials, channelInfo, cancellationToken).ConfigureAwait(false); + return await GetInfoGrpcAsync(req, userCredentials, channelInfo.CallInvoker, cancellationToken).ConfigureAwait(false); } return await GetInfoHttpAsync(streamName, groupName, userCredentials, cancellationToken).ConfigureAwait(false); } private async Task GetInfoGrpcAsync(GetInfoReq req, - UserCredentials? userCredentials, ChannelInfo channelInfo, CancellationToken cancellationToken) { + UserCredentials? userCredentials, CallInvoker callInvoker, CancellationToken cancellationToken) { - var result = await new PersistentSubscriptions.PersistentSubscriptions - .PersistentSubscriptionsClient(channelInfo.CallInvoker) - .GetInfoAsync(req, EventStoreCallOptions - .Create(Settings, Settings.OperationOptions, userCredentials, cancellationToken)); + var result = await new PersistentSubscriptions.PersistentSubscriptions.PersistentSubscriptionsClient(callInvoker) + .GetInfoAsync(req, EventStoreCallOptions.Create(Settings, Settings.OperationOptions, userCredentials, cancellationToken)); return PersistentSubscriptionInfo.From(result.SubscriptionInfo); } diff --git a/src/EventStore.Client.PersistentSubscriptions/EventStorePersistentSubscriptionsClient.List.cs b/src/EventStore.Client.PersistentSubscriptions/EventStorePersistentSubscriptionsClient.List.cs index 6f41c15a1..f94dc114d 100644 --- a/src/EventStore.Client.PersistentSubscriptions/EventStorePersistentSubscriptionsClient.List.cs +++ b/src/EventStore.Client.PersistentSubscriptions/EventStorePersistentSubscriptionsClient.List.cs @@ -4,6 +4,7 @@ using System.Threading; using System.Threading.Tasks; using EventStore.Client.PersistentSubscriptions; +using Grpc.Core; #nullable enable namespace EventStore.Client { @@ -27,7 +28,7 @@ public async Task> ListToAllAsync(UserCr } }; - return await ListGrpcAsync(req, userCredentials, channelInfo, cancellationToken).ConfigureAwait(false); + return await ListGrpcAsync(req, userCredentials, channelInfo.CallInvoker, cancellationToken).ConfigureAwait(false); } if (channelInfo.ServerCapabilities.SupportsPersistentSubscriptionsToAll) { @@ -57,7 +58,7 @@ public async Task> ListToStreamAsync(str } }; - return await ListGrpcAsync(req, userCredentials, channelInfo, cancellationToken).ConfigureAwait(false); + return await ListGrpcAsync(req, userCredentials, channelInfo.CallInvoker, cancellationToken).ConfigureAwait(false); } return await ListHttpAsync(streamName, userCredentials, cancellationToken).ConfigureAwait(false); @@ -80,7 +81,7 @@ public async Task> ListAllAsync(UserCred } }; - return await ListGrpcAsync(req, userCredentials, channelInfo, cancellationToken).ConfigureAwait(false); + return await ListGrpcAsync(req, userCredentials, channelInfo.CallInvoker, cancellationToken).ConfigureAwait(false); } return await HttpGet>("/subscriptions", userCredentials, @@ -88,9 +89,9 @@ public async Task> ListAllAsync(UserCred } private async Task> ListGrpcAsync(ListReq req, - UserCredentials? userCredentials, ChannelInfo channelInfo, CancellationToken cancellationToken) { + UserCredentials? userCredentials, CallInvoker callInvoker, CancellationToken cancellationToken) { - using var call = new PersistentSubscriptions.PersistentSubscriptions.PersistentSubscriptionsClient(channelInfo.CallInvoker) + using var call = new PersistentSubscriptions.PersistentSubscriptions.PersistentSubscriptionsClient(callInvoker) .ListAsync(req, EventStoreCallOptions.Create(Settings, Settings.OperationOptions, userCredentials, cancellationToken)); ListResp? response = await call.ResponseAsync.ConfigureAwait(false); diff --git a/src/EventStore.Client.PersistentSubscriptions/EventStorePersistentSubscriptionsClient.ReplayParked.cs b/src/EventStore.Client.PersistentSubscriptions/EventStorePersistentSubscriptionsClient.ReplayParked.cs index 0aaff7cee..90a683d65 100644 --- a/src/EventStore.Client.PersistentSubscriptions/EventStorePersistentSubscriptionsClient.ReplayParked.cs +++ b/src/EventStore.Client.PersistentSubscriptions/EventStorePersistentSubscriptionsClient.ReplayParked.cs @@ -2,6 +2,7 @@ using System.Threading; using System.Threading.Tasks; using EventStore.Client.PersistentSubscriptions; +using Grpc.Core; #nullable enable namespace EventStore.Client { @@ -26,8 +27,9 @@ public async Task ReplayParkedToAllAsync(string groupName, long? numberOfEvents }, }; - await ReplayParkedGrpcAsync(req, numberOfEvents, userCredentials, channelInfo, cancellationToken) + await ReplayParkedGrpcAsync(req, numberOfEvents, userCredentials, channelInfo.CallInvoker, cancellationToken) .ConfigureAwait(false); + return; } @@ -61,17 +63,17 @@ public async Task ReplayParkedToStreamAsync(string streamName, string groupName, }, }; - await ReplayParkedGrpcAsync(req, numberOfEvents, userCredentials, channelInfo, cancellationToken) + await ReplayParkedGrpcAsync(req, numberOfEvents, userCredentials, channelInfo.CallInvoker, cancellationToken) .ConfigureAwait(false); return; } - await ReplayParkedHttpAsync(streamName, groupName, numberOfEvents, userCredentials, - cancellationToken).ConfigureAwait(false); + await ReplayParkedHttpAsync(streamName, groupName, numberOfEvents, userCredentials, cancellationToken) + .ConfigureAwait(false); } private async Task ReplayParkedGrpcAsync(ReplayParkedReq req, long? numberOfEvents, - UserCredentials? userCredentials, ChannelInfo channelInfo, CancellationToken cancellationToken) { + UserCredentials? userCredentials, CallInvoker callInvoker, CancellationToken cancellationToken) { if (numberOfEvents.HasValue) { req.Options.StopAt = numberOfEvents.Value; @@ -79,7 +81,7 @@ private async Task ReplayParkedGrpcAsync(ReplayParkedReq req, long? numberOfEven req.Options.NoLimit = new Empty(); } - await new PersistentSubscriptions.PersistentSubscriptions.PersistentSubscriptionsClient(channelInfo.CallInvoker) + await new PersistentSubscriptions.PersistentSubscriptions.PersistentSubscriptionsClient(callInvoker) .ReplayParkedAsync(req, EventStoreCallOptions.Create(Settings, Settings.OperationOptions, userCredentials, cancellationToken)); } @@ -89,8 +91,8 @@ private async Task ReplayParkedHttpAsync(string streamName, string groupName, lo var path = $"/subscriptions/{UrlEncode(streamName)}/{UrlEncode(groupName)}/replayParked"; var query = numberOfEvents.HasValue ? $"stopAt={numberOfEvents.Value}":""; await HttpPost(path, query, userCredentials, - () => throw new PersistentSubscriptionNotFoundException(streamName, groupName), - cancellationToken).ConfigureAwait(false); + () => throw new PersistentSubscriptionNotFoundException(streamName, groupName), cancellationToken) + .ConfigureAwait(false); } } } diff --git a/src/EventStore.Client/ChannelInfo.cs b/src/EventStore.Client/ChannelInfo.cs index 87c80b644..e74b7fe46 100644 --- a/src/EventStore.Client/ChannelInfo.cs +++ b/src/EventStore.Client/ChannelInfo.cs @@ -1,9 +1,11 @@ -using Grpc.Core; +using System.Net; +using Grpc.Core; namespace EventStore.Client { #pragma warning disable 1591 public record ChannelInfo( ChannelBase Channel, ServerCapabilities ServerCapabilities, - CallInvoker CallInvoker); + CallInvoker CallInvoker, + DnsEndPoint EndPoint); } diff --git a/src/EventStore.Client/ChannelSelector.cs b/src/EventStore.Client/ChannelSelector.cs index 331830ff9..4622b9722 100644 --- a/src/EventStore.Client/ChannelSelector.cs +++ b/src/EventStore.Client/ChannelSelector.cs @@ -16,7 +16,7 @@ public ChannelSelector( : new GossipChannelSelector(settings, channelCache, new GrpcGossipClient()); } - public Task SelectChannelAsync(CancellationToken cancellationToken) => + public Task<(ChannelBase,DnsEndPoint)> SelectChannelAsync(CancellationToken cancellationToken) => _inner.SelectChannelAsync(cancellationToken); public ChannelBase SelectChannel(DnsEndPoint endPoint) => diff --git a/src/EventStore.Client/EventStoreClientBase.cs b/src/EventStore.Client/EventStoreClientBase.cs index 1fa3f3251..40280ec72 100644 --- a/src/EventStore.Client/EventStoreClientBase.cs +++ b/src/EventStore.Client/EventStoreClientBase.cs @@ -71,9 +71,9 @@ private async Task GetChannelInfoExpensive( IChannelSelector channelSelector, CancellationToken cancellationToken) { - var channel = endPoint is null + var (channel,selectedEndPoint) = endPoint is null ? await channelSelector.SelectChannelAsync(cancellationToken).ConfigureAwait(false) - : channelSelector.SelectChannel(endPoint); + : (channelSelector.SelectChannel(endPoint),endPoint); var invoker = channel.CreateCallInvoker() .Intercept(new TypedExceptionInterceptor(_exceptionMap)) @@ -90,7 +90,7 @@ private async Task GetChannelInfoExpensive( .GetAsync(invoker, cancellationToken) .ConfigureAwait(false); - return new(channel, caps, invoker); + return new(channel, caps, invoker, selectedEndPoint); } #pragma warning disable 1591 @@ -127,7 +127,7 @@ private HttpClient CreateHttpClient() { protected async Task HttpGet(string path, UserCredentials? userCredentials, Action onNotFound, CancellationToken cancellationToken) { - var request = CreateRequest(path, HttpMethod.Get, userCredentials); + var request = await CreateRequest(path, HttpMethod.Get, userCredentials, cancellationToken).ConfigureAwait(false); var httpResult = await HttpSend(request, onNotFound, cancellationToken).ConfigureAwait(false); var json = await httpResult.Content.ReadAsStringAsync().ConfigureAwait(false); var result = JsonConvert.DeserializeObject(json, _jsonSettings); @@ -150,7 +150,7 @@ protected async Task HttpGet(string path, UserCredentials? userCredentials protected async Task HttpPost(string path, string query, UserCredentials? userCredentials, Action onNotFound, CancellationToken cancellationToken) { - var request = CreateRequest(path, query, HttpMethod.Post, userCredentials); + var request = await CreateRequest(path, query, HttpMethod.Post, userCredentials, cancellationToken).ConfigureAwait(false); await HttpSend(request, onNotFound, cancellationToken).ConfigureAwait(false); } @@ -171,15 +171,17 @@ private async Task HttpSend(HttpRequestMessage request, Act throw new Exception($"The HTTP request failed with status code: {httpResult.StatusCode}"); } - private HttpRequestMessage CreateRequest(string path, HttpMethod method, UserCredentials? credentials) => - CreateRequest(path, query: "", method, credentials); + private async Task CreateRequest(string path, HttpMethod method, UserCredentials? credentials, CancellationToken cancellationToken) => + await CreateRequest(path, query: "", method, credentials, cancellationToken).ConfigureAwait(false); - private HttpRequestMessage CreateRequest(string path, string query, HttpMethod method, UserCredentials? credentials) { + private async Task CreateRequest(string path, string query, HttpMethod method, UserCredentials? credentials, CancellationToken cancellationToken) { credentials ??= Settings.DefaultCredentials; - var baseAddress = Settings.ConnectivitySettings.Address; - var uriBuilder = new UriBuilder(baseAddress.Scheme, baseAddress.Host, baseAddress.Port, path); - uriBuilder.Query = query; + var scheme = Settings.ConnectivitySettings.Address.Scheme; + var channelInfo = await GetChannelInfo(cancellationToken).ConfigureAwait(false); + var uriBuilder = new UriBuilder(scheme, channelInfo.EndPoint.Host, channelInfo.EndPoint.Port, path) { + Query = query + }; var httpRequest = new HttpRequestMessage(method, uriBuilder.Uri); httpRequest.Headers.Add("accept", "application/json"); diff --git a/src/EventStore.Client/GossipChannelSelector.cs b/src/EventStore.Client/GossipChannelSelector.cs index 0483b467e..e5e12d0ef 100644 --- a/src/EventStore.Client/GossipChannelSelector.cs +++ b/src/EventStore.Client/GossipChannelSelector.cs @@ -34,12 +34,12 @@ public ChannelBase SelectChannel(DnsEndPoint endPoint) { return _channels.GetChannelInfo(endPoint); } - public async Task SelectChannelAsync(CancellationToken cancellationToken) { + public async Task<(ChannelBase,DnsEndPoint)> SelectChannelAsync(CancellationToken cancellationToken) { var endPoint = await DiscoverAsync(cancellationToken).ConfigureAwait(false); _log.LogInformation("Successfully discovered candidate at {endPoint}.", endPoint); - return _channels.GetChannelInfo(endPoint); + return (_channels.GetChannelInfo(endPoint),endPoint); } private async Task DiscoverAsync(CancellationToken cancellationToken) { diff --git a/src/EventStore.Client/IChannelSelector.cs b/src/EventStore.Client/IChannelSelector.cs index 88d730933..1cf06b7c7 100644 --- a/src/EventStore.Client/IChannelSelector.cs +++ b/src/EventStore.Client/IChannelSelector.cs @@ -1,4 +1,3 @@ -using System; using System.Net; using System.Threading; using System.Threading.Tasks; @@ -8,7 +7,7 @@ namespace EventStore.Client { internal interface IChannelSelector { // Let the channel selector pick an endpoint. - Task SelectChannelAsync(CancellationToken cancellationToken); + Task<(ChannelBase,DnsEndPoint)> SelectChannelAsync(CancellationToken cancellationToken); // Get a channel for the specified endpoint ChannelBase SelectChannel(DnsEndPoint endPoint); diff --git a/src/EventStore.Client/SingleNodeChannelSelector.cs b/src/EventStore.Client/SingleNodeChannelSelector.cs index d10590032..23df62ae3 100644 --- a/src/EventStore.Client/SingleNodeChannelSelector.cs +++ b/src/EventStore.Client/SingleNodeChannelSelector.cs @@ -21,8 +21,8 @@ public SingleNodeChannelSelector( _https = string.Compare(uri.Scheme, Uri.UriSchemeHttps, ignoreCase: true) == 0; } - public Task SelectChannelAsync(CancellationToken cancellationToken) => - Task.FromResult(SelectChannel(_endPoint)); + public Task<(ChannelBase,DnsEndPoint)> SelectChannelAsync(CancellationToken cancellationToken) => + Task.FromResult((SelectChannel(_endPoint), _endPoint)); public ChannelBase SelectChannel(DnsEndPoint endPoint) => _channelCache.GetChannelInfo(endPoint, _https); diff --git a/test/EventStore.Client.Tests/GossipChannelSelectorTests.cs b/test/EventStore.Client.Tests/GossipChannelSelectorTests.cs index 04e8a1c72..905dec1f6 100644 --- a/test/EventStore.Client.Tests/GossipChannelSelectorTests.cs +++ b/test/EventStore.Client.Tests/GossipChannelSelectorTests.cs @@ -30,8 +30,9 @@ public async Task ExplicitlySettingEndPointChangesChannels() { new(secondId, ClusterMessages.VNodeState.Follower, true, secondSelection), }))); - var channel = await sut.SelectChannelAsync(cancellationToken: default); + var (channel,endPoint) = await sut.SelectChannelAsync(cancellationToken: default); Assert.Equal($"{firstSelection.Host}:{firstSelection.Port}", channel.Target); + Assert.Equal(firstSelection, endPoint); channel = sut.SelectChannel(secondSelection); Assert.Equal($"{secondSelection.Host}:{secondSelection.Port}", channel.Target);