Skip to content

Commit

Permalink
implement RFC 574
Browse files Browse the repository at this point in the history
- set grpc.max_receive_message_length to 17MB
- force rediscovery only when lost connection or unknown error
  • Loading branch information
thefringeninja committed Feb 16, 2022
1 parent 0377efe commit b08718f
Show file tree
Hide file tree
Showing 13 changed files with 190 additions and 37 deletions.
16 changes: 8 additions & 8 deletions samples/projection-management/Program.cs
Original file line number Diff line number Diff line change
Expand Up @@ -100,7 +100,7 @@ private static async Task DisableNotFound(EventStoreProjectionManagementClient m
#region DisableNotFound
try {
await managementClient.DisableAsync("projection that does not exists");
} catch (InvalidOperationException e) when (e.Message.Contains("NotFound")) {
} catch (Exception e) when (e.Message.Contains("NotFound")) {
Console.WriteLine(e.Message);
}
#endregion DisableNotFound
Expand All @@ -116,7 +116,7 @@ private static async Task EnableNotFound(EventStoreProjectionManagementClient ma
#region EnableNotFound
try {
await managementClient.EnableAsync("projection that does not exists");
} catch (InvalidOperationException e) when (e.Message.Contains("NotFound")) {
} catch (Exception e) when (e.Message.Contains("NotFound")) {
Console.WriteLine(e.Message);
}
#endregion EnableNotFound
Expand All @@ -135,7 +135,7 @@ private static async Task Abort(EventStoreProjectionManagementClient managementC
var js =
"fromAll() .when({$init:function(){return {count:0};},$any:function(s, e){s.count += 1;}}).outputState();";
await managementClient.CreateContinuousAsync("countEvents_Abort", js);
} catch (InvalidOperationException e) when (e.Message.Contains("Conflict")) {
} catch (Exception e) when (e.Message.Contains("Conflict")) {
// ignore was already created in a previous run
}

Expand All @@ -149,7 +149,7 @@ private static async Task Abort_NotFound(EventStoreProjectionManagementClient ma
#region Abort_NotFound
try {
await managementClient.AbortAsync("projection that does not exists");
} catch (InvalidOperationException e) when (e.Message.Contains("NotFound")) {
} catch (Exception e) when (e.Message.Contains("NotFound")) {
Console.WriteLine(e.Message);
}
#endregion Abort_NotFound
Expand All @@ -160,7 +160,7 @@ private static async Task Reset(EventStoreProjectionManagementClient managementC
var js =
"fromAll() .when({$init:function(){return {count:0};},$any:function(s, e){s.count += 1;}}).outputState();";
await managementClient.CreateContinuousAsync("countEvents_Reset", js);
} catch (InvalidOperationException e) when (e.Message.Contains("Conflict")) {
} catch (Exception e) when (e.Message.Contains("Conflict")) {
// ignore was already created in a previous run
}

Expand All @@ -175,7 +175,7 @@ private static async Task Reset_NotFound(EventStoreProjectionManagementClient ma
#region Reset_NotFound
try {
await managementClient.ResetAsync("projection that does not exists");
} catch (InvalidOperationException e) when (e.Message.Contains("NotFound")) {
} catch (Exception e) when (e.Message.Contains("NotFound")) {
Console.WriteLine(e.Message);
}
#endregion Reset_NotFound
Expand Down Expand Up @@ -227,7 +227,7 @@ private static async Task CreateContinuous_Conflict(EventStoreProjectionManageme
try {

await managementClient.CreateContinuousAsync(name, js);
} catch (InvalidOperationException e) when (e.Message.Contains("Conflict")) {
} catch (Exception e) when (e.Message.Contains("Conflict")) {
var format = $"{name} already exists";
Console.WriteLine(format);
}
Expand Down Expand Up @@ -259,7 +259,7 @@ private static async Task Update_NotFound(EventStoreProjectionManagementClient m
#region Update_NotFound
try {
await managementClient.UpdateAsync("Update Not existing projection", "fromAll().when()");
} catch (InvalidOperationException e) when (e.Message.Contains("NotFound")) {
} catch (Exception e) when (e.Message.Contains("NotFound")) {
Console.WriteLine("'Update Not existing projection' does not exists and can not be updated");
}
#endregion Update_NotFound
Expand Down
7 changes: 6 additions & 1 deletion src/EventStore.Client/ChannelFactory.cs
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,8 @@
#nullable enable
namespace EventStore.Client {
internal static class ChannelFactory {
private const int MaxReceiveMessageLength = 17 * 1024 * 1024;

public static TChannel CreateChannel(EventStoreClientSettings settings, EndPoint endPoint, bool https) =>
CreateChannel(settings, endPoint.ToUri(https));

Expand All @@ -32,7 +34,8 @@ public static TChannel CreateChannel(EventStoreClientSettings settings, Uri? add
},
LoggerFactory = settings.LoggerFactory,
Credentials = settings.ChannelCredentials,
DisposeHttpClient = true
DisposeHttpClient = true,
MaxReceiveMessageSize = MaxReceiveMessageLength
});

HttpMessageHandler CreateHandler() {
Expand All @@ -56,6 +59,8 @@ IEnumerable<ChannelOption> GetChannelOptions() {

yield return new ChannelOption("grpc.keepalive_timeout_ms",
GetValue((int)settings.ConnectivitySettings.KeepAliveTimeout.TotalMilliseconds));

yield return new ChannelOption("grpc.max_receive_message_length", MaxReceiveMessageLength);
}

static int GetValue(int value) => value switch {
Expand Down
7 changes: 5 additions & 2 deletions src/EventStore.Client/Interceptors/ReportLeaderInterceptor.cs
Original file line number Diff line number Diff line change
Expand Up @@ -64,8 +64,11 @@ public override AsyncServerStreamingCall<TResponse> AsyncServerStreamingCall<TRe
private void ReportNewLeader<TResponse>(Task<TResponse> task) {
if (task.Exception?.InnerException is NotLeaderException ex) {
_onError(ex.LeaderEndpoint);
} else if (task.Exception?.InnerException?.InnerException is RpcException rpcException &&
rpcException.StatusCode == StatusCode.Unavailable) {
} else if (task.Exception?.InnerException is RpcException {
StatusCode: StatusCode.Unavailable or
// StatusCode.Unknown or TODO: use RPC exceptions on server
StatusCode.Aborted
}) {
_onError(null);
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -91,7 +91,7 @@ private static Exception ConvertRpcException(RpcException ex,
StatusCode.DeadlineExceeded, ex.Status.Detail, ex.Status.DebugException)),
(StatusCode.DeadlineExceeded, _) => ex,
(StatusCode.Unauthenticated, _) => new NotAuthenticatedException(ex.Message, ex),
_ => new InvalidOperationException(ex.Message, ex)
_ => ex
}
};
}
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
using System;
using System.Threading.Tasks;
using Grpc.Core;
using Xunit;

namespace EventStore.Client.SubscriptionToAll {
Expand All @@ -21,10 +22,12 @@ protected override Task When() =>
}

[Fact]
public Task the_completion_fails_with_invalid_operation_exception() =>
Assert.ThrowsAsync<InvalidOperationException>(
public async Task the_completion_fails() {
var ex = await Assert.ThrowsAsync<RpcException>(
() => _fixture.Client.CreateToAllAsync("group32",
new PersistentSubscriptionSettings(),
TestCredentials.Root));
Assert.Equal(StatusCode.AlreadyExists, ex.StatusCode);
}
}
}
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
using System;
using System.IO;
using System.Linq;
using System.Threading.Tasks;
using Grpc.Core;
using Xunit;

namespace EventStore.Client.SubscriptionToAll {
Expand All @@ -24,11 +26,13 @@ protected override async Task Given() {
}

[Fact]
public Task fails_with_invalid_operation_exception() =>
Assert.ThrowsAsync<InvalidOperationException>(() =>
public async Task fails() {
var ex = await Assert.ThrowsAsync<RpcException>(() =>
_fixture.Client.CreateToAllAsync("group57",
new PersistentSubscriptionSettings(
startFrom: new Position(_fixture.LastCommitPosition+1, _fixture.LastCommitPosition)),
TestCredentials.Root));
startFrom: new Position(_fixture.LastCommitPosition + 1, _fixture.LastCommitPosition)),
TestCredentials.Root));
Assert.Equal(StatusCode.Internal, ex.StatusCode);
}
}
}
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
using System;
using System.Linq;
using System.Threading.Tasks;
using Grpc.Core;
using Xunit;

namespace EventStore.Client.SubscriptionToAll {
Expand Down Expand Up @@ -28,13 +29,15 @@ await Client.CreateToAllAsync(Group,
}
protected override Task Given() => Task.CompletedTask;
}

[Fact]
public Task fails_with_invalid_operation_exception() =>
Assert.ThrowsAsync<InvalidOperationException>(() =>
public async Task fails() {
var ex = await Assert.ThrowsAsync<RpcException>(() =>
_fixture.Client.UpdateToAllAsync(Group,
new PersistentSubscriptionSettings(
startFrom: new Position(_fixture.LastCommitPosition+1, _fixture.LastCommitPosition)),
TestCredentials.Root));
startFrom: new Position(_fixture.LastCommitPosition + 1, _fixture.LastCommitPosition)),
TestCredentials.Root));
Assert.Equal(StatusCode.Internal, ex.StatusCode);
}
}
}
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
using System;
using System.Threading.Tasks;
using Grpc.Core;
using Xunit;

namespace EventStore.Client.SubscriptionToStream {
Expand All @@ -21,10 +22,12 @@ protected override Task When() =>
}

[Fact]
public Task the_completion_fails_with_invalid_operation_exception() =>
Assert.ThrowsAsync<InvalidOperationException>(
public async Task the_completion_fails() {
var ex = await Assert.ThrowsAsync<RpcException>(
() => _fixture.Client.CreateAsync(Stream, "group32",
new PersistentSubscriptionSettings(),
TestCredentials.Root));
Assert.Equal(StatusCode.AlreadyExists, ex.StatusCode);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ namespace EventStore.Client {
public abstract class EventStoreClientFixture : EventStoreClientFixtureBase {
public EventStoreClient Client { get; }
protected EventStoreClientFixture(EventStoreClientSettings? settings = null,
IDictionary<string, string>? env = null) : base(settings, env) {
Dictionary<string, string>? env = null) : base(settings, env) {
Client = new EventStoreClient(Settings);
}

Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
using System;
using System.Collections.Generic;
using System.Threading.Tasks;
using Grpc.Core;
using Polly;
using Xunit;

Expand All @@ -25,7 +26,11 @@ public async Task can_retry() {

// writeTask cannot complete because ES is stopped
var ex = await Assert.ThrowsAnyAsync<Exception>(() => WriteAnEventAsync(new StreamRevision(0)));
Assert.True(ex is InvalidOperationException or DiscoveryException);
Assert.True(ex is RpcException {
Status: {
StatusCode: StatusCode.Unavailable
}
} or DiscoveryException);

await _fixture.TestServer.StartAsync().WithTimeout();

Expand Down
86 changes: 86 additions & 0 deletions test/EventStore.Client.Streams.Tests/reconnection.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,86 @@
using System;
using System.Collections.Generic;
using System.Threading;
using System.Threading.Tasks;
using Grpc.Core;
using Xunit;

namespace EventStore.Client {
public class reconnection : IClassFixture<reconnection.Fixture> {
private readonly Fixture _fixture;

public reconnection(Fixture fixture) {
_fixture = fixture;
}

[Fact]
public async Task when_the_connection_is_lost() {
var streamName = _fixture.GetStreamName();
var eventCount = 512;
var tcs = new TaskCompletionSource<object>();
var signal = new TaskCompletionSource<object>();
var events = new List<ResolvedEvent>();
var resubscribe = new TaskCompletionSource<StreamSubscription>();

using var _ = await _fixture.Client.SubscribeToStreamAsync(streamName, FromStream.Start,
EventAppeared, subscriptionDropped: SubscriptionDropped)
.WithTimeout();

await _fixture.Client
.AppendToStreamAsync(streamName, StreamState.NoStream, _fixture.CreateTestEvents(eventCount))
.WithTimeout(); // ensure we get backpressure

_fixture.TestServer.Stop();
await Task.Delay(TimeSpan.FromSeconds(2));

await _fixture.TestServer.StartAsync().WithTimeout();
signal.SetResult(null);

await resubscribe.Task.WithTimeout(TimeSpan.FromSeconds(10));

await tcs.Task.WithTimeout(TimeSpan.FromSeconds(10));

async Task EventAppeared(StreamSubscription s, ResolvedEvent e, CancellationToken ct) {
await signal.Task;
events.Add(e);
if (events.Count == eventCount) {
tcs.TrySetResult(null);
}
}

void SubscriptionDropped(StreamSubscription s, SubscriptionDroppedReason reason, Exception ex) {
if (reason == SubscriptionDroppedReason.Disposed) {
return;
}

if (ex is not RpcException {
Status: {
StatusCode: StatusCode.Unavailable
}
}) {
tcs.TrySetException(ex);
} else {
var task = _fixture.Client.SubscribeToStreamAsync(streamName,
FromStream.After(events[^1].OriginalEventNumber),
EventAppeared, subscriptionDropped: SubscriptionDropped);
task.ContinueWith(_ => resubscribe.SetResult(_.Result), TaskContinuationOptions.NotOnFaulted);
task.ContinueWith(_ => resubscribe.SetException(_.Exception!.GetBaseException()),
TaskContinuationOptions.OnlyOnFaulted);
}
}
}

public class Fixture : EventStoreClientFixture {
protected override Task Given() => Task.CompletedTask;

protected override Task When() => Task.CompletedTask;

public Fixture() : base(env: new() {
["EVENTSTORE_MEM_DB"] = "false"
}) {
Settings.ConnectivitySettings.DiscoveryInterval = TimeSpan.FromMilliseconds(100);
Settings.ConnectivitySettings.GossipTimeout = TimeSpan.FromMilliseconds(100);
}
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,11 +18,10 @@ public sending_and_receiving_large_messages(Fixture fixture, ITestOutputHelper o
[Fact]
public async Task over_the_hard_limit() {
var streamName = _fixture.GetStreamName();
var ex = await Assert.ThrowsAsync<InvalidOperationException>(() => _fixture.Client.AppendToStreamAsync(
var ex = await Assert.ThrowsAsync<RpcException>(() => _fixture.Client.AppendToStreamAsync(
streamName, StreamState.NoStream,
_fixture.LargeEvent));
var rpcEx = Assert.IsType<RpcException>(ex.InnerException);
Assert.Equal(StatusCode.ResourceExhausted, rpcEx.StatusCode);
Assert.Equal(StatusCode.ResourceExhausted, ex.StatusCode);
}

public class Fixture : EventStoreClientFixture {
Expand Down
Loading

0 comments on commit b08718f

Please sign in to comment.