Skip to content

Commit

Permalink
gossip
Browse files Browse the repository at this point in the history
  • Loading branch information
pvanbuijtene committed Feb 9, 2022
1 parent 68c0ae7 commit f6b5084
Show file tree
Hide file tree
Showing 10 changed files with 47 additions and 41 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
using System.Threading;
using System.Threading.Tasks;
using EventStore.Client.PersistentSubscriptions;
using Grpc.Core;

#nullable enable
namespace EventStore.Client {
Expand All @@ -25,7 +26,7 @@ public async Task<PersistentSubscriptionInfo> GetInfoToAllAsync(string groupName
}
};

return await GetInfoGrpcAsync(req, userCredentials, channelInfo, cancellationToken).ConfigureAwait(false);
return await GetInfoGrpcAsync(req, userCredentials, channelInfo.CallInvoker, cancellationToken).ConfigureAwait(false);
}

if (channelInfo.ServerCapabilities.SupportsPersistentSubscriptionsToAll) {
Expand Down Expand Up @@ -55,19 +56,17 @@ public async Task<PersistentSubscriptionInfo> GetInfoToStreamAsync(string stream
}
};

return await GetInfoGrpcAsync(req, userCredentials, channelInfo, cancellationToken).ConfigureAwait(false);
return await GetInfoGrpcAsync(req, userCredentials, channelInfo.CallInvoker, cancellationToken).ConfigureAwait(false);
}

return await GetInfoHttpAsync(streamName, groupName, userCredentials, cancellationToken).ConfigureAwait(false);
}

private async Task<PersistentSubscriptionInfo> GetInfoGrpcAsync(GetInfoReq req,
UserCredentials? userCredentials, ChannelInfo channelInfo, CancellationToken cancellationToken) {
UserCredentials? userCredentials, CallInvoker callInvoker, CancellationToken cancellationToken) {

var result = await new PersistentSubscriptions.PersistentSubscriptions
.PersistentSubscriptionsClient(channelInfo.CallInvoker)
.GetInfoAsync(req, EventStoreCallOptions
.Create(Settings, Settings.OperationOptions, userCredentials, cancellationToken));
var result = await new PersistentSubscriptions.PersistentSubscriptions.PersistentSubscriptionsClient(callInvoker)
.GetInfoAsync(req, EventStoreCallOptions.Create(Settings, Settings.OperationOptions, userCredentials, cancellationToken));

return PersistentSubscriptionInfo.From(result.SubscriptionInfo);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
using System.Threading;
using System.Threading.Tasks;
using EventStore.Client.PersistentSubscriptions;
using Grpc.Core;

#nullable enable
namespace EventStore.Client {
Expand All @@ -27,7 +28,7 @@ public async Task<IEnumerable<PersistentSubscriptionInfo>> ListToAllAsync(UserCr
}
};

return await ListGrpcAsync(req, userCredentials, channelInfo, cancellationToken).ConfigureAwait(false);
return await ListGrpcAsync(req, userCredentials, channelInfo.CallInvoker, cancellationToken).ConfigureAwait(false);
}

if (channelInfo.ServerCapabilities.SupportsPersistentSubscriptionsToAll) {
Expand Down Expand Up @@ -57,7 +58,7 @@ public async Task<IEnumerable<PersistentSubscriptionInfo>> ListToStreamAsync(str
}
};

return await ListGrpcAsync(req, userCredentials, channelInfo, cancellationToken).ConfigureAwait(false);
return await ListGrpcAsync(req, userCredentials, channelInfo.CallInvoker, cancellationToken).ConfigureAwait(false);
}

return await ListHttpAsync(streamName, userCredentials, cancellationToken).ConfigureAwait(false);
Expand All @@ -80,17 +81,17 @@ public async Task<IEnumerable<PersistentSubscriptionInfo>> ListAllAsync(UserCred
}
};

return await ListGrpcAsync(req, userCredentials, channelInfo, cancellationToken).ConfigureAwait(false);
return await ListGrpcAsync(req, userCredentials, channelInfo.CallInvoker, cancellationToken).ConfigureAwait(false);
}

return await HttpGet<IList<PersistentSubscriptionInfo>>("/subscriptions", userCredentials,
() => throw new Exception(), cancellationToken).ConfigureAwait(false);
}

private async Task<IEnumerable<PersistentSubscriptionInfo>> ListGrpcAsync(ListReq req,
UserCredentials? userCredentials, ChannelInfo channelInfo, CancellationToken cancellationToken) {
UserCredentials? userCredentials, CallInvoker callInvoker, CancellationToken cancellationToken) {

using var call = new PersistentSubscriptions.PersistentSubscriptions.PersistentSubscriptionsClient(channelInfo.CallInvoker)
using var call = new PersistentSubscriptions.PersistentSubscriptions.PersistentSubscriptionsClient(callInvoker)
.ListAsync(req, EventStoreCallOptions.Create(Settings, Settings.OperationOptions, userCredentials, cancellationToken));

ListResp? response = await call.ResponseAsync.ConfigureAwait(false);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
using System.Threading;
using System.Threading.Tasks;
using EventStore.Client.PersistentSubscriptions;
using Grpc.Core;

#nullable enable
namespace EventStore.Client {
Expand All @@ -26,8 +27,9 @@ public async Task ReplayParkedToAllAsync(string groupName, long? numberOfEvents
},
};

await ReplayParkedGrpcAsync(req, numberOfEvents, userCredentials, channelInfo, cancellationToken)
await ReplayParkedGrpcAsync(req, numberOfEvents, userCredentials, channelInfo.CallInvoker, cancellationToken)
.ConfigureAwait(false);

return;
}

Expand Down Expand Up @@ -61,25 +63,25 @@ public async Task ReplayParkedToStreamAsync(string streamName, string groupName,
},
};

await ReplayParkedGrpcAsync(req, numberOfEvents, userCredentials, channelInfo, cancellationToken)
await ReplayParkedGrpcAsync(req, numberOfEvents, userCredentials, channelInfo.CallInvoker, cancellationToken)
.ConfigureAwait(false);
return;
}

await ReplayParkedHttpAsync(streamName, groupName, numberOfEvents, userCredentials,
cancellationToken).ConfigureAwait(false);
await ReplayParkedHttpAsync(streamName, groupName, numberOfEvents, userCredentials, cancellationToken)
.ConfigureAwait(false);
}

private async Task ReplayParkedGrpcAsync(ReplayParkedReq req, long? numberOfEvents,
UserCredentials? userCredentials, ChannelInfo channelInfo, CancellationToken cancellationToken) {
UserCredentials? userCredentials, CallInvoker callInvoker, CancellationToken cancellationToken) {

if (numberOfEvents.HasValue) {
req.Options.StopAt = numberOfEvents.Value;
} else {
req.Options.NoLimit = new Empty();
}

await new PersistentSubscriptions.PersistentSubscriptions.PersistentSubscriptionsClient(channelInfo.CallInvoker)
await new PersistentSubscriptions.PersistentSubscriptions.PersistentSubscriptionsClient(callInvoker)
.ReplayParkedAsync(req, EventStoreCallOptions.Create(Settings, Settings.OperationOptions, userCredentials, cancellationToken));
}

Expand All @@ -89,8 +91,8 @@ private async Task ReplayParkedHttpAsync(string streamName, string groupName, lo
var path = $"/subscriptions/{UrlEncode(streamName)}/{UrlEncode(groupName)}/replayParked";
var query = numberOfEvents.HasValue ? $"stopAt={numberOfEvents.Value}":"";
await HttpPost(path, query, userCredentials,
() => throw new PersistentSubscriptionNotFoundException(streamName, groupName),
cancellationToken).ConfigureAwait(false);
() => throw new PersistentSubscriptionNotFoundException(streamName, groupName), cancellationToken)
.ConfigureAwait(false);
}
}
}
6 changes: 4 additions & 2 deletions src/EventStore.Client/ChannelInfo.cs
Original file line number Diff line number Diff line change
@@ -1,9 +1,11 @@
using Grpc.Core;
using System.Net;
using Grpc.Core;

namespace EventStore.Client {
#pragma warning disable 1591
public record ChannelInfo(
ChannelBase Channel,
ServerCapabilities ServerCapabilities,
CallInvoker CallInvoker);
CallInvoker CallInvoker,
DnsEndPoint EndPoint);
}
2 changes: 1 addition & 1 deletion src/EventStore.Client/ChannelSelector.cs
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ public ChannelSelector(
: new GossipChannelSelector(settings, channelCache, new GrpcGossipClient());
}

public Task<ChannelBase> SelectChannelAsync(CancellationToken cancellationToken) =>
public Task<(ChannelBase,DnsEndPoint)> SelectChannelAsync(CancellationToken cancellationToken) =>
_inner.SelectChannelAsync(cancellationToken);

public ChannelBase SelectChannel(DnsEndPoint endPoint) =>
Expand Down
24 changes: 13 additions & 11 deletions src/EventStore.Client/EventStoreClientBase.cs
Original file line number Diff line number Diff line change
Expand Up @@ -71,9 +71,9 @@ private async Task<ChannelInfo> GetChannelInfoExpensive(
IChannelSelector channelSelector,
CancellationToken cancellationToken) {

var channel = endPoint is null
var (channel,selectedEndPoint) = endPoint is null
? await channelSelector.SelectChannelAsync(cancellationToken).ConfigureAwait(false)
: channelSelector.SelectChannel(endPoint);
: (channelSelector.SelectChannel(endPoint),endPoint);

var invoker = channel.CreateCallInvoker()
.Intercept(new TypedExceptionInterceptor(_exceptionMap))
Expand All @@ -90,7 +90,7 @@ private async Task<ChannelInfo> GetChannelInfoExpensive(
.GetAsync(invoker, cancellationToken)
.ConfigureAwait(false);

return new(channel, caps, invoker);
return new(channel, caps, invoker, selectedEndPoint);
}

#pragma warning disable 1591
Expand Down Expand Up @@ -127,7 +127,7 @@ private HttpClient CreateHttpClient() {
protected async Task<T> HttpGet<T>(string path, UserCredentials? userCredentials, Action onNotFound,
CancellationToken cancellationToken) {

var request = CreateRequest(path, HttpMethod.Get, userCredentials);
var request = await CreateRequest(path, HttpMethod.Get, userCredentials, cancellationToken).ConfigureAwait(false);
var httpResult = await HttpSend(request, onNotFound, cancellationToken).ConfigureAwait(false);
var json = await httpResult.Content.ReadAsStringAsync().ConfigureAwait(false);
var result = JsonConvert.DeserializeObject<T>(json, _jsonSettings);
Expand All @@ -150,7 +150,7 @@ protected async Task<T> HttpGet<T>(string path, UserCredentials? userCredentials
protected async Task HttpPost(string path, string query, UserCredentials? userCredentials, Action onNotFound,
CancellationToken cancellationToken) {

var request = CreateRequest(path, query, HttpMethod.Post, userCredentials);
var request = await CreateRequest(path, query, HttpMethod.Post, userCredentials, cancellationToken).ConfigureAwait(false);
await HttpSend(request, onNotFound, cancellationToken).ConfigureAwait(false);
}

Expand All @@ -171,15 +171,17 @@ private async Task<HttpResponseMessage> HttpSend(HttpRequestMessage request, Act
throw new Exception($"The HTTP request failed with status code: {httpResult.StatusCode}");
}

private HttpRequestMessage CreateRequest(string path, HttpMethod method, UserCredentials? credentials) =>
CreateRequest(path, query: "", method, credentials);
private async Task<HttpRequestMessage> CreateRequest(string path, HttpMethod method, UserCredentials? credentials, CancellationToken cancellationToken) =>
await CreateRequest(path, query: "", method, credentials, cancellationToken).ConfigureAwait(false);

private HttpRequestMessage CreateRequest(string path, string query, HttpMethod method, UserCredentials? credentials) {
private async Task<HttpRequestMessage> CreateRequest(string path, string query, HttpMethod method, UserCredentials? credentials, CancellationToken cancellationToken) {
credentials ??= Settings.DefaultCredentials;

var baseAddress = Settings.ConnectivitySettings.Address;
var uriBuilder = new UriBuilder(baseAddress.Scheme, baseAddress.Host, baseAddress.Port, path);
uriBuilder.Query = query;
var scheme = Settings.ConnectivitySettings.Address.Scheme;
var channelInfo = await GetChannelInfo(cancellationToken).ConfigureAwait(false);
var uriBuilder = new UriBuilder(scheme, channelInfo.EndPoint.Host, channelInfo.EndPoint.Port, path) {
Query = query
};

var httpRequest = new HttpRequestMessage(method, uriBuilder.Uri);
httpRequest.Headers.Add("accept", "application/json");
Expand Down
4 changes: 2 additions & 2 deletions src/EventStore.Client/GossipChannelSelector.cs
Original file line number Diff line number Diff line change
Expand Up @@ -34,12 +34,12 @@ public ChannelBase SelectChannel(DnsEndPoint endPoint) {
return _channels.GetChannelInfo(endPoint);
}

public async Task<ChannelBase> SelectChannelAsync(CancellationToken cancellationToken) {
public async Task<(ChannelBase,DnsEndPoint)> SelectChannelAsync(CancellationToken cancellationToken) {
var endPoint = await DiscoverAsync(cancellationToken).ConfigureAwait(false);

_log.LogInformation("Successfully discovered candidate at {endPoint}.", endPoint);

return _channels.GetChannelInfo(endPoint);
return (_channels.GetChannelInfo(endPoint),endPoint);
}

private async Task<DnsEndPoint> DiscoverAsync(CancellationToken cancellationToken) {
Expand Down
3 changes: 1 addition & 2 deletions src/EventStore.Client/IChannelSelector.cs
Original file line number Diff line number Diff line change
@@ -1,4 +1,3 @@
using System;
using System.Net;
using System.Threading;
using System.Threading.Tasks;
Expand All @@ -8,7 +7,7 @@
namespace EventStore.Client {
internal interface IChannelSelector {
// Let the channel selector pick an endpoint.
Task<ChannelBase> SelectChannelAsync(CancellationToken cancellationToken);
Task<(ChannelBase,DnsEndPoint)> SelectChannelAsync(CancellationToken cancellationToken);

// Get a channel for the specified endpoint
ChannelBase SelectChannel(DnsEndPoint endPoint);
Expand Down
4 changes: 2 additions & 2 deletions src/EventStore.Client/SingleNodeChannelSelector.cs
Original file line number Diff line number Diff line change
Expand Up @@ -21,8 +21,8 @@ public SingleNodeChannelSelector(
_https = string.Compare(uri.Scheme, Uri.UriSchemeHttps, ignoreCase: true) == 0;
}

public Task<ChannelBase> SelectChannelAsync(CancellationToken cancellationToken) =>
Task.FromResult(SelectChannel(_endPoint));
public Task<(ChannelBase,DnsEndPoint)> SelectChannelAsync(CancellationToken cancellationToken) =>
Task.FromResult((SelectChannel(_endPoint), _endPoint));

public ChannelBase SelectChannel(DnsEndPoint endPoint) =>
_channelCache.GetChannelInfo(endPoint, _https);
Expand Down
3 changes: 2 additions & 1 deletion test/EventStore.Client.Tests/GossipChannelSelectorTests.cs
Original file line number Diff line number Diff line change
Expand Up @@ -30,8 +30,9 @@ public async Task ExplicitlySettingEndPointChangesChannels() {
new(secondId, ClusterMessages.VNodeState.Follower, true, secondSelection),
})));

var channel = await sut.SelectChannelAsync(cancellationToken: default);
var (channel,endPoint) = await sut.SelectChannelAsync(cancellationToken: default);
Assert.Equal($"{firstSelection.Host}:{firstSelection.Port}", channel.Target);
Assert.Equal(firstSelection, endPoint);

channel = sut.SelectChannel(secondSelection);
Assert.Equal($"{secondSelection.Host}:{secondSelection.Port}", channel.Target);
Expand Down

0 comments on commit f6b5084

Please sign in to comment.