Skip to content

Commit

Permalink
Trigger rediscovery when failing to send a message on a streaming call
Browse files Browse the repository at this point in the history
- Duplex streaming and client streaming.

Before this, persistent subscriptions could keep retrying subscribing to a down node without rediscovering.
This could happen if the original subscription failed while sending an ack (we were not triggering a rediscovery on failed send)
and then on retry we send the ReadReq (which would fail, not triggering rediscovery) before trying to read from the server.
  • Loading branch information
timothycoleman committed Sep 9, 2022
1 parent f8c7704 commit 98644c8
Showing 1 changed file with 35 additions and 5 deletions.
40 changes: 35 additions & 5 deletions src/EventStore.Client/Interceptors/ReportLeaderInterceptor.cs
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,9 @@ public override AsyncClientStreamingCall<TRequest, TResponse> AsyncClientStreami

response.ResponseAsync.ContinueWith(OnReconnectionRequired, ContinuationOptions);

return new AsyncClientStreamingCall<TRequest, TResponse>(response.RequestStream, response.ResponseAsync,
return new AsyncClientStreamingCall<TRequest, TResponse>(
new StreamWriter<TRequest>(response.RequestStream, OnReconnectionRequired),
response.ResponseAsync,
response.ResponseHeadersAsync, response.GetStatus, response.GetTrailers, response.Dispose);
}

Expand All @@ -44,7 +46,8 @@ public override AsyncDuplexStreamingCall<TRequest, TResponse> AsyncDuplexStreami
AsyncDuplexStreamingCallContinuation<TRequest, TResponse> continuation) {
var response = continuation(context);

return new AsyncDuplexStreamingCall<TRequest, TResponse>(response.RequestStream,
return new AsyncDuplexStreamingCall<TRequest, TResponse>(
new StreamWriter<TRequest>(response.RequestStream, OnReconnectionRequired),
new StreamReader<TResponse>(response.ResponseStream, OnReconnectionRequired),
response.ResponseHeadersAsync,
response.GetStatus, response.GetTrailers, response.Dispose);
Expand All @@ -61,7 +64,7 @@ public override AsyncServerStreamingCall<TResponse> AsyncServerStreamingCall<TRe
response.GetStatus, response.GetTrailers, response.Dispose);
}

private void OnReconnectionRequired<TResponse>(Task<TResponse> task) {
private void OnReconnectionRequired(Task task) {
ReconnectionRequired reconnectionRequired = task.Exception?.InnerException switch {
NotLeaderException ex => new ReconnectionRequired.NewLeader(ex.LeaderEndpoint),
RpcException {
Expand All @@ -75,11 +78,38 @@ private void OnReconnectionRequired<TResponse>(Task<TResponse> task) {
_onReconnectionRequired(reconnectionRequired);
}

private class StreamWriter<T> : IClientStreamWriter<T> {
private readonly IClientStreamWriter<T> _inner;
private readonly Action<Task> _reportNewLeader;

public StreamWriter(IClientStreamWriter<T> inner, Action<Task> reportNewLeader) {
_inner = inner;
_reportNewLeader = reportNewLeader;
}

public WriteOptions WriteOptions {
get => _inner.WriteOptions;
set => _inner.WriteOptions = value;
}

public Task CompleteAsync() {
var task = _inner.CompleteAsync();
task.ContinueWith(_reportNewLeader, ContinuationOptions);
return task;
}

public Task WriteAsync(T message) {
var task = _inner.WriteAsync(message);
task.ContinueWith(_reportNewLeader, ContinuationOptions);
return task;
}
}

private class StreamReader<T> : IAsyncStreamReader<T> {
private readonly IAsyncStreamReader<T> _inner;
private readonly Action<Task<bool>> _reportNewLeader;
private readonly Action<Task> _reportNewLeader;

public StreamReader(IAsyncStreamReader<T> inner, Action<Task<bool>> reportNewLeader) {
public StreamReader(IAsyncStreamReader<T> inner, Action<Task> reportNewLeader) {
_inner = inner;
_reportNewLeader = reportNewLeader;
}
Expand Down

0 comments on commit 98644c8

Please sign in to comment.