Skip to content

Commit

Permalink
resubscribe in loop in case the server isn't ready yet
Browse files Browse the repository at this point in the history
  • Loading branch information
thefringeninja committed Feb 17, 2022
1 parent 94626a1 commit d34de6d
Showing 1 changed file with 21 additions and 6 deletions.
27 changes: 21 additions & 6 deletions test/EventStore.Client.Streams.Tests/reconnection.cs
Original file line number Diff line number Diff line change
Expand Up @@ -60,12 +60,27 @@ void SubscriptionDropped(StreamSubscription s, SubscriptionDroppedReason reason,
}) {
tcs.TrySetException(ex);
} else {
var task = _fixture.Client.SubscribeToStreamAsync(streamName,
FromStream.After(events[^1].OriginalEventNumber),
EventAppeared, subscriptionDropped: SubscriptionDropped);
task.ContinueWith(_ => resubscribe.SetResult(_.Result), TaskContinuationOptions.NotOnFaulted);
task.ContinueWith(_ => resubscribe.SetException(_.Exception!.GetBaseException()),
TaskContinuationOptions.OnlyOnFaulted);
Resubscribe();

void Resubscribe() {
var task = _fixture.Client.SubscribeToStreamAsync(streamName,
FromStream.After(events[^1].OriginalEventNumber),
EventAppeared, subscriptionDropped: SubscriptionDropped);
task.ContinueWith(_ => resubscribe.SetResult(_.Result),
TaskContinuationOptions.OnlyOnRanToCompletion);
task.ContinueWith(_ => {
var ex = _.Exception!.GetBaseException();
if (ex is RpcException {
StatusCode: StatusCode.DeadlineExceeded
}) {
Task.Delay(200).ContinueWith(_ => Resubscribe());
} else {
resubscribe.SetException(_.Exception!.GetBaseException());
}
},
TaskContinuationOptions.OnlyOnFaulted);
}
}
}
}
Expand Down

0 comments on commit d34de6d

Please sign in to comment.