From 98644c8028fef919f70006b765cb98c532f472f0 Mon Sep 17 00:00:00 2001 From: Timothy Coleman Date: Fri, 9 Sep 2022 13:40:09 +0100 Subject: [PATCH] Trigger rediscovery when failing to send a message on a streaming call - Duplex streaming and client streaming. Before this, persistent subscriptions could keep retrying subscribing to a down node without rediscovering. This could happen if the original subscription failed while sending an ack (we were not triggering a rediscovery on failed send) and then on retry we send the ReadReq (which would fail, not triggering rediscovery) before trying to read from the server. --- .../Interceptors/ReportLeaderInterceptor.cs | 40 ++++++++++++++++--- 1 file changed, 35 insertions(+), 5 deletions(-) diff --git a/src/EventStore.Client/Interceptors/ReportLeaderInterceptor.cs b/src/EventStore.Client/Interceptors/ReportLeaderInterceptor.cs index a3f111861..0685d838e 100644 --- a/src/EventStore.Client/Interceptors/ReportLeaderInterceptor.cs +++ b/src/EventStore.Client/Interceptors/ReportLeaderInterceptor.cs @@ -35,7 +35,9 @@ public override AsyncClientStreamingCall AsyncClientStreami response.ResponseAsync.ContinueWith(OnReconnectionRequired, ContinuationOptions); - return new AsyncClientStreamingCall(response.RequestStream, response.ResponseAsync, + return new AsyncClientStreamingCall( + new StreamWriter(response.RequestStream, OnReconnectionRequired), + response.ResponseAsync, response.ResponseHeadersAsync, response.GetStatus, response.GetTrailers, response.Dispose); } @@ -44,7 +46,8 @@ public override AsyncDuplexStreamingCall AsyncDuplexStreami AsyncDuplexStreamingCallContinuation continuation) { var response = continuation(context); - return new AsyncDuplexStreamingCall(response.RequestStream, + return new AsyncDuplexStreamingCall( + new StreamWriter(response.RequestStream, OnReconnectionRequired), new StreamReader(response.ResponseStream, OnReconnectionRequired), response.ResponseHeadersAsync, response.GetStatus, response.GetTrailers, response.Dispose); @@ -61,7 +64,7 @@ public override AsyncServerStreamingCall AsyncServerStreamingCall(Task task) { + private void OnReconnectionRequired(Task task) { ReconnectionRequired reconnectionRequired = task.Exception?.InnerException switch { NotLeaderException ex => new ReconnectionRequired.NewLeader(ex.LeaderEndpoint), RpcException { @@ -75,11 +78,38 @@ private void OnReconnectionRequired(Task task) { _onReconnectionRequired(reconnectionRequired); } + private class StreamWriter : IClientStreamWriter { + private readonly IClientStreamWriter _inner; + private readonly Action _reportNewLeader; + + public StreamWriter(IClientStreamWriter inner, Action reportNewLeader) { + _inner = inner; + _reportNewLeader = reportNewLeader; + } + + public WriteOptions WriteOptions { + get => _inner.WriteOptions; + set => _inner.WriteOptions = value; + } + + public Task CompleteAsync() { + var task = _inner.CompleteAsync(); + task.ContinueWith(_reportNewLeader, ContinuationOptions); + return task; + } + + public Task WriteAsync(T message) { + var task = _inner.WriteAsync(message); + task.ContinueWith(_reportNewLeader, ContinuationOptions); + return task; + } + } + private class StreamReader : IAsyncStreamReader { private readonly IAsyncStreamReader _inner; - private readonly Action> _reportNewLeader; + private readonly Action _reportNewLeader; - public StreamReader(IAsyncStreamReader inner, Action> reportNewLeader) { + public StreamReader(IAsyncStreamReader inner, Action reportNewLeader) { _inner = inner; _reportNewLeader = reportNewLeader; }