diff --git a/samples/projection-management/Program.cs b/samples/projection-management/Program.cs index a8db65bbc..1d5151be8 100644 --- a/samples/projection-management/Program.cs +++ b/samples/projection-management/Program.cs @@ -100,7 +100,7 @@ private static async Task DisableNotFound(EventStoreProjectionManagementClient m #region DisableNotFound try { await managementClient.DisableAsync("projection that does not exists"); - } catch (InvalidOperationException e) when (e.Message.Contains("NotFound")) { + } catch (Exception e) when (e.Message.Contains("NotFound")) { Console.WriteLine(e.Message); } #endregion DisableNotFound @@ -116,7 +116,7 @@ private static async Task EnableNotFound(EventStoreProjectionManagementClient ma #region EnableNotFound try { await managementClient.EnableAsync("projection that does not exists"); - } catch (InvalidOperationException e) when (e.Message.Contains("NotFound")) { + } catch (Exception e) when (e.Message.Contains("NotFound")) { Console.WriteLine(e.Message); } #endregion EnableNotFound @@ -135,7 +135,7 @@ private static async Task Abort(EventStoreProjectionManagementClient managementC var js = "fromAll() .when({$init:function(){return {count:0};},$any:function(s, e){s.count += 1;}}).outputState();"; await managementClient.CreateContinuousAsync("countEvents_Abort", js); - } catch (InvalidOperationException e) when (e.Message.Contains("Conflict")) { + } catch (Exception e) when (e.Message.Contains("Conflict")) { // ignore was already created in a previous run } @@ -149,7 +149,7 @@ private static async Task Abort_NotFound(EventStoreProjectionManagementClient ma #region Abort_NotFound try { await managementClient.AbortAsync("projection that does not exists"); - } catch (InvalidOperationException e) when (e.Message.Contains("NotFound")) { + } catch (Exception e) when (e.Message.Contains("NotFound")) { Console.WriteLine(e.Message); } #endregion Abort_NotFound @@ -160,7 +160,7 @@ private static async Task Reset(EventStoreProjectionManagementClient managementC var js = "fromAll() .when({$init:function(){return {count:0};},$any:function(s, e){s.count += 1;}}).outputState();"; await managementClient.CreateContinuousAsync("countEvents_Reset", js); - } catch (InvalidOperationException e) when (e.Message.Contains("Conflict")) { + } catch (Exception e) when (e.Message.Contains("Conflict")) { // ignore was already created in a previous run } @@ -175,7 +175,7 @@ private static async Task Reset_NotFound(EventStoreProjectionManagementClient ma #region Reset_NotFound try { await managementClient.ResetAsync("projection that does not exists"); - } catch (InvalidOperationException e) when (e.Message.Contains("NotFound")) { + } catch (Exception e) when (e.Message.Contains("NotFound")) { Console.WriteLine(e.Message); } #endregion Reset_NotFound @@ -227,7 +227,7 @@ private static async Task CreateContinuous_Conflict(EventStoreProjectionManageme try { await managementClient.CreateContinuousAsync(name, js); - } catch (InvalidOperationException e) when (e.Message.Contains("Conflict")) { + } catch (Exception e) when (e.Message.Contains("Conflict")) { var format = $"{name} already exists"; Console.WriteLine(format); } @@ -259,7 +259,7 @@ private static async Task Update_NotFound(EventStoreProjectionManagementClient m #region Update_NotFound try { await managementClient.UpdateAsync("Update Not existing projection", "fromAll().when()"); - } catch (InvalidOperationException e) when (e.Message.Contains("NotFound")) { + } catch (Exception e) when (e.Message.Contains("NotFound")) { Console.WriteLine("'Update Not existing projection' does not exists and can not be updated"); } #endregion Update_NotFound diff --git a/src/EventStore.Client/ChannelFactory.cs b/src/EventStore.Client/ChannelFactory.cs index 6030f0d51..10b9cb36c 100644 --- a/src/EventStore.Client/ChannelFactory.cs +++ b/src/EventStore.Client/ChannelFactory.cs @@ -13,6 +13,8 @@ #nullable enable namespace EventStore.Client { internal static class ChannelFactory { + private const int MaxReceiveMessageLength = 17 * 1024 * 1024; + public static TChannel CreateChannel(EventStoreClientSettings settings, EndPoint endPoint, bool https) => CreateChannel(settings, endPoint.ToUri(https)); @@ -32,7 +34,8 @@ public static TChannel CreateChannel(EventStoreClientSettings settings, Uri? add }, LoggerFactory = settings.LoggerFactory, Credentials = settings.ChannelCredentials, - DisposeHttpClient = true + DisposeHttpClient = true, + MaxReceiveMessageSize = MaxReceiveMessageLength }); HttpMessageHandler CreateHandler() { @@ -56,6 +59,8 @@ IEnumerable GetChannelOptions() { yield return new ChannelOption("grpc.keepalive_timeout_ms", GetValue((int)settings.ConnectivitySettings.KeepAliveTimeout.TotalMilliseconds)); + + yield return new ChannelOption("grpc.max_receive_message_length", MaxReceiveMessageLength); } static int GetValue(int value) => value switch { diff --git a/src/EventStore.Client/Interceptors/ReportLeaderInterceptor.cs b/src/EventStore.Client/Interceptors/ReportLeaderInterceptor.cs index e30c697f1..58bdfbe66 100644 --- a/src/EventStore.Client/Interceptors/ReportLeaderInterceptor.cs +++ b/src/EventStore.Client/Interceptors/ReportLeaderInterceptor.cs @@ -64,8 +64,11 @@ public override AsyncServerStreamingCall AsyncServerStreamingCall(Task task) { if (task.Exception?.InnerException is NotLeaderException ex) { _onError(ex.LeaderEndpoint); - } else if (task.Exception?.InnerException?.InnerException is RpcException rpcException && - rpcException.StatusCode == StatusCode.Unavailable) { + } else if (task.Exception?.InnerException is RpcException { + StatusCode: StatusCode.Unavailable or + // StatusCode.Unknown or TODO: use RPC exceptions on server + StatusCode.Aborted + }) { _onError(null); } } diff --git a/src/EventStore.Client/Interceptors/TypedExceptionInterceptor.cs b/src/EventStore.Client/Interceptors/TypedExceptionInterceptor.cs index e9327813a..ece3eaf8b 100644 --- a/src/EventStore.Client/Interceptors/TypedExceptionInterceptor.cs +++ b/src/EventStore.Client/Interceptors/TypedExceptionInterceptor.cs @@ -91,7 +91,7 @@ private static Exception ConvertRpcException(RpcException ex, StatusCode.DeadlineExceeded, ex.Status.Detail, ex.Status.DebugException)), (StatusCode.DeadlineExceeded, _) => ex, (StatusCode.Unauthenticated, _) => new NotAuthenticatedException(ex.Message, ex), - _ => new InvalidOperationException(ex.Message, ex) + _ => ex } }; } diff --git a/test/EventStore.Client.PersistentSubscriptions.Tests/SubscriptionToAll/create_duplicate.cs b/test/EventStore.Client.PersistentSubscriptions.Tests/SubscriptionToAll/create_duplicate.cs index 573188416..bde91ae87 100644 --- a/test/EventStore.Client.PersistentSubscriptions.Tests/SubscriptionToAll/create_duplicate.cs +++ b/test/EventStore.Client.PersistentSubscriptions.Tests/SubscriptionToAll/create_duplicate.cs @@ -1,5 +1,6 @@ using System; using System.Threading.Tasks; +using Grpc.Core; using Xunit; namespace EventStore.Client.SubscriptionToAll { @@ -21,10 +22,12 @@ protected override Task When() => } [Fact] - public Task the_completion_fails_with_invalid_operation_exception() => - Assert.ThrowsAsync( + public async Task the_completion_fails() { + var ex = await Assert.ThrowsAsync( () => _fixture.Client.CreateToAllAsync("group32", new PersistentSubscriptionSettings(), TestCredentials.Root)); + Assert.Equal(StatusCode.AlreadyExists, ex.StatusCode); + } } } diff --git a/test/EventStore.Client.PersistentSubscriptions.Tests/SubscriptionToAll/create_with_commit_position_larger_than_last_indexed_position.cs b/test/EventStore.Client.PersistentSubscriptions.Tests/SubscriptionToAll/create_with_commit_position_larger_than_last_indexed_position.cs index 95f5012cc..4e4558878 100644 --- a/test/EventStore.Client.PersistentSubscriptions.Tests/SubscriptionToAll/create_with_commit_position_larger_than_last_indexed_position.cs +++ b/test/EventStore.Client.PersistentSubscriptions.Tests/SubscriptionToAll/create_with_commit_position_larger_than_last_indexed_position.cs @@ -1,6 +1,8 @@ using System; +using System.IO; using System.Linq; using System.Threading.Tasks; +using Grpc.Core; using Xunit; namespace EventStore.Client.SubscriptionToAll { @@ -24,11 +26,13 @@ protected override async Task Given() { } [Fact] - public Task fails_with_invalid_operation_exception() => - Assert.ThrowsAsync(() => + public async Task fails() { + var ex = await Assert.ThrowsAsync(() => _fixture.Client.CreateToAllAsync("group57", new PersistentSubscriptionSettings( - startFrom: new Position(_fixture.LastCommitPosition+1, _fixture.LastCommitPosition)), - TestCredentials.Root)); + startFrom: new Position(_fixture.LastCommitPosition + 1, _fixture.LastCommitPosition)), + TestCredentials.Root)); + Assert.Equal(StatusCode.Internal, ex.StatusCode); + } } } diff --git a/test/EventStore.Client.PersistentSubscriptions.Tests/SubscriptionToAll/update_existing_with_commit_position_larger_than_last_indexed_position.cs b/test/EventStore.Client.PersistentSubscriptions.Tests/SubscriptionToAll/update_existing_with_commit_position_larger_than_last_indexed_position.cs index e17f95498..c3a395a78 100644 --- a/test/EventStore.Client.PersistentSubscriptions.Tests/SubscriptionToAll/update_existing_with_commit_position_larger_than_last_indexed_position.cs +++ b/test/EventStore.Client.PersistentSubscriptions.Tests/SubscriptionToAll/update_existing_with_commit_position_larger_than_last_indexed_position.cs @@ -1,6 +1,7 @@ using System; using System.Linq; using System.Threading.Tasks; +using Grpc.Core; using Xunit; namespace EventStore.Client.SubscriptionToAll { @@ -28,13 +29,15 @@ await Client.CreateToAllAsync(Group, } protected override Task Given() => Task.CompletedTask; } - + [Fact] - public Task fails_with_invalid_operation_exception() => - Assert.ThrowsAsync(() => + public async Task fails() { + var ex = await Assert.ThrowsAsync(() => _fixture.Client.UpdateToAllAsync(Group, new PersistentSubscriptionSettings( - startFrom: new Position(_fixture.LastCommitPosition+1, _fixture.LastCommitPosition)), - TestCredentials.Root)); + startFrom: new Position(_fixture.LastCommitPosition + 1, _fixture.LastCommitPosition)), + TestCredentials.Root)); + Assert.Equal(StatusCode.Internal, ex.StatusCode); + } } } diff --git a/test/EventStore.Client.PersistentSubscriptions.Tests/SubscriptionToStream/create_duplicate.cs b/test/EventStore.Client.PersistentSubscriptions.Tests/SubscriptionToStream/create_duplicate.cs index 5367f6762..00ad95d1b 100644 --- a/test/EventStore.Client.PersistentSubscriptions.Tests/SubscriptionToStream/create_duplicate.cs +++ b/test/EventStore.Client.PersistentSubscriptions.Tests/SubscriptionToStream/create_duplicate.cs @@ -1,5 +1,6 @@ using System; using System.Threading.Tasks; +using Grpc.Core; using Xunit; namespace EventStore.Client.SubscriptionToStream { @@ -21,10 +22,12 @@ protected override Task When() => } [Fact] - public Task the_completion_fails_with_invalid_operation_exception() => - Assert.ThrowsAsync( + public async Task the_completion_fails() { + var ex = await Assert.ThrowsAsync( () => _fixture.Client.CreateAsync(Stream, "group32", new PersistentSubscriptionSettings(), TestCredentials.Root)); + Assert.Equal(StatusCode.AlreadyExists, ex.StatusCode); + } } } diff --git a/test/EventStore.Client.Streams.Tests/EventStoreClientFixture.cs b/test/EventStore.Client.Streams.Tests/EventStoreClientFixture.cs index 14209bf70..04b11f4d3 100644 --- a/test/EventStore.Client.Streams.Tests/EventStoreClientFixture.cs +++ b/test/EventStore.Client.Streams.Tests/EventStoreClientFixture.cs @@ -6,7 +6,7 @@ namespace EventStore.Client { public abstract class EventStoreClientFixture : EventStoreClientFixtureBase { public EventStoreClient Client { get; } protected EventStoreClientFixture(EventStoreClientSettings? settings = null, - IDictionary? env = null) : base(settings, env) { + Dictionary? env = null) : base(settings, env) { Client = new EventStoreClient(Settings); } diff --git a/test/EventStore.Client.Streams.Tests/append_to_stream_retry.cs b/test/EventStore.Client.Streams.Tests/append_to_stream_retry.cs index f0e3d8011..3273dde58 100644 --- a/test/EventStore.Client.Streams.Tests/append_to_stream_retry.cs +++ b/test/EventStore.Client.Streams.Tests/append_to_stream_retry.cs @@ -1,6 +1,7 @@ using System; using System.Collections.Generic; using System.Threading.Tasks; +using Grpc.Core; using Polly; using Xunit; @@ -25,7 +26,11 @@ public async Task can_retry() { // writeTask cannot complete because ES is stopped var ex = await Assert.ThrowsAnyAsync(() => WriteAnEventAsync(new StreamRevision(0))); - Assert.True(ex is InvalidOperationException or DiscoveryException); + Assert.True(ex is RpcException { + Status: { + StatusCode: StatusCode.Unavailable + } + } or DiscoveryException); await _fixture.TestServer.StartAsync().WithTimeout(); diff --git a/test/EventStore.Client.Streams.Tests/reconnection.cs b/test/EventStore.Client.Streams.Tests/reconnection.cs new file mode 100644 index 000000000..1d4b6a2bb --- /dev/null +++ b/test/EventStore.Client.Streams.Tests/reconnection.cs @@ -0,0 +1,86 @@ +using System; +using System.Collections.Generic; +using System.Threading; +using System.Threading.Tasks; +using Grpc.Core; +using Xunit; + +namespace EventStore.Client { + public class reconnection : IClassFixture { + private readonly Fixture _fixture; + + public reconnection(Fixture fixture) { + _fixture = fixture; + } + + [Fact] + public async Task when_the_connection_is_lost() { + var streamName = _fixture.GetStreamName(); + var eventCount = 512; + var tcs = new TaskCompletionSource(); + var signal = new TaskCompletionSource(); + var events = new List(); + var resubscribe = new TaskCompletionSource(); + + using var _ = await _fixture.Client.SubscribeToStreamAsync(streamName, FromStream.Start, + EventAppeared, subscriptionDropped: SubscriptionDropped) + .WithTimeout(); + + await _fixture.Client + .AppendToStreamAsync(streamName, StreamState.NoStream, _fixture.CreateTestEvents(eventCount)) + .WithTimeout(); // ensure we get backpressure + + _fixture.TestServer.Stop(); + await Task.Delay(TimeSpan.FromSeconds(2)); + + await _fixture.TestServer.StartAsync().WithTimeout(); + signal.SetResult(null); + + await resubscribe.Task.WithTimeout(TimeSpan.FromSeconds(10)); + + await tcs.Task.WithTimeout(TimeSpan.FromSeconds(10)); + + async Task EventAppeared(StreamSubscription s, ResolvedEvent e, CancellationToken ct) { + await signal.Task; + events.Add(e); + if (events.Count == eventCount) { + tcs.TrySetResult(null); + } + } + + void SubscriptionDropped(StreamSubscription s, SubscriptionDroppedReason reason, Exception ex) { + if (reason == SubscriptionDroppedReason.Disposed) { + return; + } + + if (ex is not RpcException { + Status: { + StatusCode: StatusCode.Unavailable + } + }) { + tcs.TrySetException(ex); + } else { + var task = _fixture.Client.SubscribeToStreamAsync(streamName, + FromStream.After(events[^1].OriginalEventNumber), + EventAppeared, subscriptionDropped: SubscriptionDropped); + task.ContinueWith(_ => resubscribe.SetResult(_.Result), TaskContinuationOptions.NotOnFaulted); + task.ContinueWith(_ => resubscribe.SetException(_.Exception!.GetBaseException()), + TaskContinuationOptions.OnlyOnFaulted); + } + } + } + + public class Fixture : EventStoreClientFixture { + protected override Task Given() => Task.CompletedTask; + + protected override Task When() => Task.CompletedTask; + + public Fixture() : base(env: new() { + ["EVENTSTORE_MEM_DB"] = "false" + }) { + Settings.ConnectivitySettings.DiscoveryInterval = TimeSpan.FromMilliseconds(100); + Settings.ConnectivitySettings.GossipTimeout = TimeSpan.FromMilliseconds(100); + } + } + } +} diff --git a/test/EventStore.Client.Streams.Tests/sending_and_receiving_large_messages.cs b/test/EventStore.Client.Streams.Tests/sending_and_receiving_large_messages.cs index 8aa8b2d8a..49b7e6961 100644 --- a/test/EventStore.Client.Streams.Tests/sending_and_receiving_large_messages.cs +++ b/test/EventStore.Client.Streams.Tests/sending_and_receiving_large_messages.cs @@ -18,11 +18,10 @@ public sending_and_receiving_large_messages(Fixture fixture, ITestOutputHelper o [Fact] public async Task over_the_hard_limit() { var streamName = _fixture.GetStreamName(); - var ex = await Assert.ThrowsAsync(() => _fixture.Client.AppendToStreamAsync( + var ex = await Assert.ThrowsAsync(() => _fixture.Client.AppendToStreamAsync( streamName, StreamState.NoStream, _fixture.LargeEvent)); - var rpcEx = Assert.IsType(ex.InnerException); - Assert.Equal(StatusCode.ResourceExhausted, rpcEx.StatusCode); + Assert.Equal(StatusCode.ResourceExhausted, ex.StatusCode); } public class Fixture : EventStoreClientFixture { diff --git a/test/EventStore.Client.Tests/Interceptors/ReportLeaderInterceptorTests.cs b/test/EventStore.Client.Tests/Interceptors/ReportLeaderInterceptorTests.cs index ab27174ae..f36599e2b 100644 --- a/test/EventStore.Client.Tests/Interceptors/ReportLeaderInterceptorTests.cs +++ b/test/EventStore.Client.Tests/Interceptors/ReportLeaderInterceptorTests.cs @@ -10,11 +10,17 @@ namespace EventStore.Client.Interceptors { public class ReportLeaderInterceptorTests { - private static readonly Marshaller _marshaller = - new Marshaller(_ => Array.Empty(), _ => new object()); - public delegate Task GrpcCall(Interceptor interceptor, Task response = null); + private static readonly Marshaller _marshaller = new(_ => Array.Empty(), _ => new object()); + + private static readonly StatusCode[] ForcesRediscoveryStatusCodes = { + StatusCode.Aborted, + //StatusCode.Unknown, TODO: use RPC exceptions on server + StatusCode.Unavailable + }; + + private static IEnumerable GrpcCalls() { yield return MakeUnaryCall; yield return MakeClientStreamingCall; @@ -22,9 +28,9 @@ private static IEnumerable GrpcCalls() { yield return MakeServerStreamingCall; } - public static IEnumerable TestCases() => GrpcCalls().Select(call => new object[] {call}); + public static IEnumerable ReportsNewLeaderCases() => GrpcCalls().Select(call => new object[] {call}); - [Theory, MemberData(nameof(TestCases))] + [Theory, MemberData(nameof(ReportsNewLeaderCases))] public async Task ReportsNewLeader(GrpcCall call) { EndPoint actual = default; var sut = new ReportLeaderInterceptor(ep => actual = ep); @@ -34,6 +40,43 @@ public async Task ReportsNewLeader(GrpcCall call) { Assert.Equal(result.LeaderEndpoint, actual); } + public static IEnumerable ForcesRediscoveryCases() => from call in GrpcCalls() + from statusCode in ForcesRediscoveryStatusCodes + select new object[] {call, statusCode}; + + [Theory, MemberData(nameof(ForcesRediscoveryCases))] + public async Task ForcesRediscovery(GrpcCall call, StatusCode statusCode) { + EndPoint actual = default; + bool invoked = false; + + var sut = new ReportLeaderInterceptor(ep => { + invoked = true; + actual = ep; + }); + + var result = await Assert.ThrowsAsync(() => call(sut, + Task.FromException(new RpcException(new Status(statusCode, "oops"))))); + Assert.Null(actual); + Assert.True(invoked); + } + + public static IEnumerable DoesNotForceRediscoveryCases() => from call in GrpcCalls() + from statusCode in Enum.GetValues(typeof(StatusCode)) + .OfType() + .Except(ForcesRediscoveryStatusCodes) + select new object[] {call, statusCode}; + + [Theory, MemberData(nameof(DoesNotForceRediscoveryCases))] + public async Task DoesNotForceRediscovery(GrpcCall call, StatusCode statusCode) { + bool invoked = false; + var sut = new ReportLeaderInterceptor(ep => invoked = true); + + var result = await Assert.ThrowsAsync(() => call(sut, + Task.FromException(new RpcException(new Status(statusCode, "oops"))))); + Assert.False(invoked); + } + + private static async Task MakeUnaryCall(Interceptor interceptor, Task response = null) { using var call = interceptor.AsyncUnaryCall(new object(), CreateClientInterceptorContext(MethodType.Unary), @@ -73,8 +116,7 @@ private static async Task MakeDuplexStreamingCall(Interceptor interceptor, Task< private static void OnDispose() { } private static ClientInterceptorContext CreateClientInterceptorContext(MethodType methodType) => - new ClientInterceptorContext( - new Method(methodType, string.Empty, string.Empty, _marshaller, _marshaller), + new(new Method(methodType, string.Empty, string.Empty, _marshaller, _marshaller), null, new CallOptions(new Metadata())); private class TestAsyncStreamReader : IAsyncStreamReader {