From 93ff2ddff48cabe7e3a32bb494a25bf4c5bd40d1 Mon Sep 17 00:00:00 2001 From: thefringeninja <495495+thefringeninja@users.noreply.github.com> Date: Thu, 27 Jan 2022 20:24:34 +0100 Subject: [PATCH] implement subscription position --- samples/subscribing-to-streams/Program.cs | 25 ++- .../EventStoreClient.Subscriptions.cs | 151 ++---------------- .../Streams/ReadReq.cs | 52 +++--- src/EventStore.Client/SubscriptionPosition.cs | 116 ++++++++++++++ .../SubscriptionStreamPosition.cs | 107 +++++++++++++ .../update_existing_with_check_point.cs | 19 ++- ...date_existing_with_check_point_filtered.cs | 12 +- .../when_writing_and_filtering_out_events.cs | 21 +-- .../update_existing_with_check_point.cs | 19 +-- .../Bugs/Issue_104.cs | 21 +-- .../Bugs/Issue_2544.cs | 14 +- .../Security/SecurityFixture.cs | 50 +++--- .../subscribe_resolve_link_to.cs | 22 +-- .../subscribe_to_all.cs | 16 +- .../subscribe_to_all_filtered.cs | 12 +- .../subscribe_to_all_filtered_live.cs | 7 +- ...subscribe_to_all_filtered_with_position.cs | 7 +- .../subscribe_to_all_live.cs | 8 +- .../subscribe_to_all_with_position.cs | 13 +- .../subscribe_to_stream.cs | 24 ++- .../subscribe_to_stream_live.cs | 16 +- .../subscribe_to_stream_with_revision.cs | 22 ++- .../SubscriptionPositionTests.cs | 61 +++++++ .../SubscriptionStreamPositionTests.cs | 64 ++++++++ 24 files changed, 570 insertions(+), 309 deletions(-) create mode 100644 src/EventStore.Client/SubscriptionPosition.cs create mode 100644 src/EventStore.Client/SubscriptionStreamPosition.cs create mode 100644 test/EventStore.Client.Tests/SubscriptionPositionTests.cs create mode 100644 test/EventStore.Client.Tests/SubscriptionStreamPositionTests.cs diff --git a/samples/subscribing-to-streams/Program.cs b/samples/subscribing-to-streams/Program.cs index 6286305c1..8a61db311 100644 --- a/samples/subscribing-to-streams/Program.cs +++ b/samples/subscribing-to-streams/Program.cs @@ -1,5 +1,6 @@ using System; using System.Net.Http; +using System.Reflection.Metadata; using System.Threading; using System.Threading.Tasks; using EventStore.Client; @@ -20,6 +21,7 @@ static async Task Main(string[] args) { private static async Task SubscribeToStream(EventStoreClient client) { #region subscribe-to-stream await client.SubscribeToStreamAsync("some-stream", + SubscriptionStreamPosition.Start, async (subscription, evnt, cancellationToken) => { Console.WriteLine($"Received event {evnt.OriginalEventNumber}@{evnt.OriginalStreamId}"); await HandleEvent(evnt); @@ -29,33 +31,34 @@ await client.SubscribeToStreamAsync("some-stream", #region subscribe-to-stream-from-position await client.SubscribeToStreamAsync( "some-stream", - StreamPosition.FromInt64(20), + SubscriptionStreamPosition.After(StreamPosition.FromInt64(20)), EventAppeared); #endregion subscribe-to-stream-from-position #region subscribe-to-stream-live await client.SubscribeToStreamAsync( "some-stream", - StreamPosition.End, + SubscriptionStreamPosition.End, EventAppeared); #endregion subscribe-to-stream-live #region subscribe-to-stream-resolving-linktos await client.SubscribeToStreamAsync( "$et-myEventType", - StreamPosition.Start, + SubscriptionStreamPosition.Start, EventAppeared, resolveLinkTos: true); #endregion subscribe-to-stream-resolving-linktos #region subscribe-to-stream-subscription-dropped - var checkpoint = StreamPosition.Start; + + var checkpoint = await ReadStreamCheckpointAsync(); await client.SubscribeToStreamAsync( "some-stream", checkpoint, eventAppeared: async (subscription, evnt, cancellationToken) => { await HandleEvent(evnt); - checkpoint = evnt.OriginalEventNumber; + checkpoint = SubscriptionStreamPosition.After(evnt.OriginalEventNumber); }, subscriptionDropped: ((subscription, reason, exception) => { Console.WriteLine($"Subscription was dropped due to {reason}. {exception}"); @@ -90,12 +93,12 @@ await client.SubscribeToAllAsync( #region subscribe-to-all-live await client.SubscribeToAllAsync( - SubscriptionPosition.Live, + SubscriptionPosition.End, EventAppeared); #endregion subscribe-to-all-live #region subscribe-to-all-subscription-dropped - var checkpoint = SubscriptionPosition.Start; + var checkpoint = await ReadCheckpointAsync(); await client.SubscribeToAllAsync( checkpoint, eventAppeared: async (subscription, evnt, cancellationToken) => { @@ -147,7 +150,13 @@ private static Task HandleEvent(ResolvedEvent evnt) { return Task.CompletedTask; } - private static void Resubscribe(StreamPosition checkpoint) { } + private static void Resubscribe(SubscriptionStreamPosition checkpoint) { } private static void Resubscribe(SubscriptionPosition checkpoint) { } + + private static Task ReadStreamCheckpointAsync() => + Task.FromResult(SubscriptionStreamPosition.Start); + + private static Task ReadCheckpointAsync() => + Task.FromResult(SubscriptionPosition.Start); } } diff --git a/src/EventStore.Client.Streams/EventStoreClient.Subscriptions.cs b/src/EventStore.Client.Streams/EventStoreClient.Subscriptions.cs index 9d4b6070f..00ab62084 100644 --- a/src/EventStore.Client.Streams/EventStoreClient.Subscriptions.cs +++ b/src/EventStore.Client.Streams/EventStoreClient.Subscriptions.cs @@ -6,33 +6,10 @@ #nullable enable namespace EventStore.Client { public partial class EventStoreClient { - private Task SubscribeToAllAsync( - Func eventAppeared, - EventStoreClientOperationOptions operationOptions, - bool resolveLinkTos = false, - Action? subscriptionDropped = default, - SubscriptionFilterOptions? filterOptions = null, - UserCredentials? userCredentials = null, - CancellationToken cancellationToken = default) { - operationOptions.TimeoutAfter = DeadLine.None; - - return StreamSubscription.Confirm(ReadInternal(new ReadReq { - Options = new ReadReq.Types.Options { - ReadDirection = ReadReq.Types.Options.Types.ReadDirection.Forwards, - ResolveLinks = resolveLinkTos, - All = new ReadReq.Types.Options.Types.AllOptions { - Start = new Empty() - }, - Subscription = new ReadReq.Types.Options.Types.SubscriptionOptions(), - Filter = GetFilterOptions(filterOptions)! - } - }, operationOptions, userCredentials, cancellationToken), eventAppeared, subscriptionDropped, _log, - filterOptions?.CheckpointReached, cancellationToken); - } - /// - /// Subscribes to all events. Use this when you have no checkpoint. + /// Subscribes to all events. /// + /// A (exclusive of) to start the subscription from. /// A Task invoked and awaited when a new event is received over the subscription. /// An to configure the operation's options. /// Whether to resolve LinkTo events automatically. @@ -42,6 +19,7 @@ private Task SubscribeToAllAsync( /// The optional . /// public Task SubscribeToAllAsync( + SubscriptionPosition start, Func eventAppeared, bool resolveLinkTos = false, Action? subscriptionDropped = default, @@ -52,25 +30,13 @@ public Task SubscribeToAllAsync( var operationOptions = Settings.OperationOptions.Clone(); configureOperationOptions?.Invoke(operationOptions); - return SubscribeToAllAsync(eventAppeared, operationOptions, resolveLinkTos, subscriptionDropped, - filterOptions, userCredentials, cancellationToken); - } - - private Task SubscribeToAllAsync(Position start, - Func eventAppeared, - EventStoreClientOperationOptions operationOptions, - bool resolveLinkTos = false, - Action? subscriptionDropped = default, - SubscriptionFilterOptions? filterOptions = null, - UserCredentials? userCredentials = null, - CancellationToken cancellationToken = default) { operationOptions.TimeoutAfter = DeadLine.None; return StreamSubscription.Confirm(ReadInternal(new ReadReq { Options = new ReadReq.Types.Options { ReadDirection = ReadReq.Types.Options.Types.ReadDirection.Forwards, ResolveLinks = resolveLinkTos, - All = ReadReq.Types.Options.Types.AllOptions.FromPosition(start), + All = ReadReq.Types.Options.Types.AllOptions.FromSubscriptionPosition(start), Subscription = new ReadReq.Types.Options.Types.SubscriptionOptions(), Filter = GetFilterOptions(filterOptions)! } @@ -79,136 +45,39 @@ private Task SubscribeToAllAsync(Position start, } /// - /// Subscribes to all events from a checkpoint. This is exclusive of. + /// Subscribes to a stream from a checkpoint. /// - /// A (exclusive of) to start the subscription from. + /// A (exclusive of) to start the subscription from. + /// The name of the stream to read events from. /// A Task invoked and awaited when a new event is received over the subscription. /// An to configure the operation's options. /// Whether to resolve LinkTo events automatically. /// An action invoked if the subscription is dropped. - /// The optional to apply. /// The optional user credentials to perform operation with. /// The optional . /// - public Task SubscribeToAllAsync(Position start, + public Task SubscribeToStreamAsync(string streamName, + SubscriptionStreamPosition start, Func eventAppeared, bool resolveLinkTos = false, Action? subscriptionDropped = default, - SubscriptionFilterOptions? filterOptions = null, Action? configureOperationOptions = null, UserCredentials? userCredentials = null, CancellationToken cancellationToken = default) { var operationOptions = Settings.OperationOptions.Clone(); configureOperationOptions?.Invoke(operationOptions); - return SubscribeToAllAsync(start, eventAppeared, operationOptions, resolveLinkTos, subscriptionDropped, - filterOptions, userCredentials, cancellationToken); - } - - private Task SubscribeToStreamAsync(string streamName, - Func eventAppeared, - EventStoreClientOperationOptions operationOptions, - bool resolveLinkTos = false, - Action? subscriptionDropped = default, - UserCredentials? userCredentials = null, - CancellationToken cancellationToken = default) { operationOptions.TimeoutAfter = DeadLine.None; return StreamSubscription.Confirm(ReadInternal(new ReadReq { Options = new ReadReq.Types.Options { ReadDirection = ReadReq.Types.Options.Types.ReadDirection.Forwards, ResolveLinks = resolveLinkTos, - Stream = new ReadReq.Types.Options.Types.StreamOptions { - Start = new Empty(), - StreamIdentifier = streamName - }, + Stream = ReadReq.Types.Options.Types.StreamOptions.FromSubscriptionPosition(streamName, start), Subscription = new ReadReq.Types.Options.Types.SubscriptionOptions() } }, operationOptions, userCredentials, cancellationToken), eventAppeared, subscriptionDropped, _log, cancellationToken: cancellationToken); } - - /// - /// Subscribes to all events in a stream. Use this when you have no checkpoint. - /// - /// The name of the stream to read events from. - /// A Task invoked and awaited when a new event is received over the subscription. - /// An to configure the operation's options. - /// Whether to resolve LinkTo events automatically. - /// An action invoked if the subscription is dropped. - /// The optional user credentials to perform operation with. - /// The optional . - /// - public Task SubscribeToStreamAsync(string streamName, - Func eventAppeared, - bool resolveLinkTos = false, - Action? subscriptionDropped = default, - Action? configureOperationOptions = null, - UserCredentials? userCredentials = null, - CancellationToken cancellationToken = default) { - var operationOptions = Settings.OperationOptions.Clone(); - configureOperationOptions?.Invoke(operationOptions); - - return SubscribeToStreamAsync(streamName, eventAppeared, operationOptions, resolveLinkTos, - subscriptionDropped, - userCredentials, cancellationToken); - } - - private Task SubscribeToStreamAsync(string streamName, - StreamPosition start, - Func eventAppeared, - EventStoreClientOperationOptions operationOptions, - bool resolveLinkTos = false, - Action? subscriptionDropped = default, - UserCredentials? userCredentials = null, - CancellationToken cancellationToken = default) { - operationOptions.TimeoutAfter = DeadLine.None; - - return StreamSubscription.Confirm(ReadInternal(new ReadReq { - Options = new ReadReq.Types.Options { - ReadDirection = ReadReq.Types.Options.Types.ReadDirection.Forwards, - ResolveLinks = resolveLinkTos, - Stream = start == StreamPosition.End - ? new ReadReq.Types.Options.Types.StreamOptions { - End = new Empty(), - StreamIdentifier = streamName - } - : new ReadReq.Types.Options.Types.StreamOptions { - Revision = start, - StreamIdentifier = streamName - }, - Subscription = new ReadReq.Types.Options.Types.SubscriptionOptions() - } - }, - operationOptions, userCredentials, cancellationToken), eventAppeared, subscriptionDropped, _log, - cancellationToken: cancellationToken); - } - - /// - /// Subscribes to a stream from a checkpoint. This is exclusive of. - /// - /// A (exclusive of) to start the subscription from. - /// The name of the stream to read events from. - /// A Task invoked and awaited when a new event is received over the subscription. - /// An to configure the operation's options. - /// Whether to resolve LinkTo events automatically. - /// An action invoked if the subscription is dropped. - /// The optional user credentials to perform operation with. - /// The optional . - /// - public Task SubscribeToStreamAsync(string streamName, - StreamPosition start, - Func eventAppeared, - bool resolveLinkTos = false, - Action? subscriptionDropped = default, - Action? configureOperationOptions = null, - UserCredentials? userCredentials = null, - CancellationToken cancellationToken = default) { - var operationOptions = Settings.OperationOptions.Clone(); - configureOperationOptions?.Invoke(operationOptions); - - return SubscribeToStreamAsync(streamName, start, eventAppeared, operationOptions, resolveLinkTos, - subscriptionDropped, userCredentials, cancellationToken); - } } } diff --git a/src/EventStore.Client.Streams/Streams/ReadReq.cs b/src/EventStore.Client.Streams/Streams/ReadReq.cs index c60020c8c..16f42394c 100644 --- a/src/EventStore.Client.Streams/Streams/ReadReq.cs +++ b/src/EventStore.Client.Streams/Streams/ReadReq.cs @@ -6,9 +6,25 @@ partial class Types { partial class Options { partial class Types { partial class StreamOptions { + public static StreamOptions FromSubscriptionPosition(string streamName, + SubscriptionStreamPosition subscriptionPosition) => + subscriptionPosition.TryGetUInt64(out var revision) switch { + false => new() { + StreamIdentifier = streamName, + Start = new Empty() + }, + true when subscriptionPosition == SubscriptionStreamPosition.End => new() { + StreamIdentifier = streamName, + End = new Empty() + }, + _ => new() { + StreamIdentifier = streamName, + Revision = revision + } + }; + public static StreamOptions FromStreamNameAndRevision( - string streamName, - StreamPosition streamRevision) { + string streamName, StreamPosition streamRevision) { if (streamName == null) { throw new ArgumentNullException(nameof(streamName)); } @@ -35,25 +51,21 @@ public static StreamOptions FromStreamNameAndRevision( } partial class AllOptions { - public static AllOptions FromPosition(Client.Position position) { - if (position == Client.Position.End) { - return new AllOptions { - End = new Empty() - }; - } - - if (position == Client.Position.Start) { - return new AllOptions { - Start = new Empty() - }; - } - - return new AllOptions { - Position = new Position { - CommitPosition = position.CommitPosition, - PreparePosition = position.PreparePosition + public static AllOptions FromSubscriptionPosition(SubscriptionPosition position) { + return position.TryGetUInt64(out var p) switch { + false => new() {Start = new()}, + true when position == SubscriptionPosition.End => new() {End = new()}, + _ => new() { + Position = new() + {CommitPosition = p.commitPosition, PreparePosition = p.preparePosition} } - }; + }; false => new() {Start = new()}, + true when position == SubscriptionPosition.End => new() {End = new()}, + _ => new() { + Position = new() + {CommitPosition = p.commitPosition, PreparePosition = p.preparePosition} + } + } } } diff --git a/src/EventStore.Client/SubscriptionPosition.cs b/src/EventStore.Client/SubscriptionPosition.cs new file mode 100644 index 000000000..5c11d2cb1 --- /dev/null +++ b/src/EventStore.Client/SubscriptionPosition.cs @@ -0,0 +1,116 @@ +using System; + +#nullable enable +namespace EventStore.Client { + /// + /// A structure representing the logical position of a subscription to all. /> + /// + public readonly struct SubscriptionPosition : IEquatable, IComparable, + IComparable { + /// + /// Represents a when no events have been seen (i.e., the beginning). + /// + public static readonly SubscriptionPosition Start = new(null); + + /// + /// Represents a to receive events written after the subscription is confirmed. + /// + public static readonly SubscriptionPosition End = new(Position.End); + + /// + /// Returns a for the given . + /// + /// The . + /// + /// + public static SubscriptionPosition After(Position position) => position == Position.End + ? throw new ArgumentException("Use 'SubscriptionPosition.End.'", nameof(position)) + : new(position); + + private readonly Position? _value; + + private SubscriptionPosition(Position? value) => _value = value; + + /// + /// Converts the to a . + /// + /// + /// + public bool TryGetUInt64(out (ulong commitPosition, ulong preparePosition) value) { + value = default; + if (this == Start) { + return false; + } + + value = (_value!.Value.CommitPosition, _value.Value.PreparePosition); + return true; + } + + /// + /// Converts the to a . + /// + /// + /// + public (ulong commitPosition, ulong preparePosition) ToUInt64() => this == Start + ? throw new InvalidOperationException( + $"{nameof(SubscriptionPosition)}.{nameof(Start)} may not be converted.") + : (_value!.Value.CommitPosition, _value!.Value.PreparePosition); + + /// + public bool Equals(SubscriptionPosition other) => Nullable.Equals(_value, other._value); + + /// + public override bool Equals(object? obj) => obj is SubscriptionPosition other && Equals(other); + + /// + public override int GetHashCode() => _value.GetHashCode(); + +#pragma warning disable CS1591 + public static bool operator ==(SubscriptionPosition left, SubscriptionPosition right) => + Nullable.Equals(left, right); + + public static bool operator !=(SubscriptionPosition left, SubscriptionPosition right) => + !Nullable.Equals(left, right); + + public static bool operator >(SubscriptionPosition left, SubscriptionPosition right) => + left.CompareTo(right) > 0; + + public static bool operator <(SubscriptionPosition left, SubscriptionPosition right) => + left.CompareTo(right) < 0; + + public static bool operator >=(SubscriptionPosition left, SubscriptionPosition right) => + left.CompareTo(right) >= 0; + + public static bool operator <=(SubscriptionPosition left, SubscriptionPosition right) => + left.CompareTo(right) <= 0; +#pragma warning restore CS1591 + + /// + public int CompareTo(SubscriptionPosition other) => (_value, other._value) switch { + (null, null) => 0, + (null, _) => -1, + (_, null) => 1, + _ => _value.Value.CompareTo(other._value.Value) + }; + + /// + public int CompareTo(object? obj) => obj switch { + null => 1, + SubscriptionPosition other => CompareTo(other), + _ => throw new ArgumentException("Object is not a SubscriptionPosition"), + }; + + /// + public override string ToString() { + if (_value is null) { + return "Start"; + } + + if (_value == Position.End) { + return "Live"; + } + + return _value.Value.ToString(); + } + } +} diff --git a/src/EventStore.Client/SubscriptionStreamPosition.cs b/src/EventStore.Client/SubscriptionStreamPosition.cs new file mode 100644 index 000000000..dee4d5f20 --- /dev/null +++ b/src/EventStore.Client/SubscriptionStreamPosition.cs @@ -0,0 +1,107 @@ +using System; + +#nullable enable +namespace EventStore.Client { + /// + /// A structure representing the logical position of a subscription. /> + /// + public readonly struct SubscriptionStreamPosition : IEquatable, + IComparable, IComparable { + /// + /// Represents a when no events have been seen (i.e., the beginning). + /// + public static readonly SubscriptionStreamPosition Start = new(null); + + /// + /// Represents a to receive events written after the subscription is confirmed. + /// + public static readonly SubscriptionStreamPosition End = new(StreamPosition.End); + + private readonly StreamPosition? _value; + + /// + /// Returns a for the given . + /// + /// The . + /// + /// + public static SubscriptionStreamPosition After(StreamPosition streamPosition) => + streamPosition == StreamPosition.End + ? throw new ArgumentException("Use 'SubscriptionStreamPosition.End.'", nameof(streamPosition)) + : new(streamPosition); + + private SubscriptionStreamPosition(StreamPosition? value) => _value = value; + + /// + /// Converts the to a . + /// + /// + /// + public bool TryGetUInt64(out ulong value) { + value = default; + if (this == Start) { + return false; + } + + value = _value!.Value; + return true; + } + + /// + public bool Equals(SubscriptionStreamPosition other) => Nullable.Equals(_value, other._value); + + /// + public override bool Equals(object? obj) => obj is SubscriptionStreamPosition other && Equals(other); + + /// + public override int GetHashCode() => _value.GetHashCode(); + +#pragma warning disable CS1591 + public static bool operator ==(SubscriptionStreamPosition left, SubscriptionStreamPosition right) => + left.Equals(right); + + public static bool operator !=(SubscriptionStreamPosition left, SubscriptionStreamPosition right) => + !left.Equals(right); + + public static bool operator <(SubscriptionStreamPosition left, SubscriptionStreamPosition right) => + left.CompareTo(right) < 0; + + public static bool operator >(SubscriptionStreamPosition left, SubscriptionStreamPosition right) => + left.CompareTo(right) > 0; + + public static bool operator <=(SubscriptionStreamPosition left, SubscriptionStreamPosition right) => + left.CompareTo(right) <= 0; + + public static bool operator >=(SubscriptionStreamPosition left, SubscriptionStreamPosition right) => + left.CompareTo(right) >= 0; +#pragma warning restore CS1591 + + /// + public int CompareTo(SubscriptionStreamPosition other) => (_value, other._value) switch { + (null, null) => 0, + (null, _) => -1, + (_, null) => 1, + _ => _value.Value.CompareTo(other._value.Value) + }; + + /// + public int CompareTo(object? obj) => obj switch { + null => 1, + SubscriptionStreamPosition other => CompareTo(other), + _ => throw new ArgumentException("Object is not a SubscriptionStreamPosition"), + }; + + /// + public override string ToString() { + if (_value is null) { + return "Start"; + } + + if (_value == StreamPosition.End) { + return "Live"; + } + + return _value.Value.ToString(); + } + } +} diff --git a/test/EventStore.Client.PersistentSubscriptions.Tests/SubscriptionToAll/update_existing_with_check_point.cs b/test/EventStore.Client.PersistentSubscriptions.Tests/SubscriptionToAll/update_existing_with_check_point.cs index 0663c1b37..5e23ab40c 100644 --- a/test/EventStore.Client.PersistentSubscriptions.Tests/SubscriptionToAll/update_existing_with_check_point.cs +++ b/test/EventStore.Client.PersistentSubscriptions.Tests/SubscriptionToAll/update_existing_with_check_point.cs @@ -7,7 +7,6 @@ namespace EventStore.Client.SubscriptionToAll { public class update_existing_with_check_point : IClassFixture { - private const string Group = "existing-with-check-point"; private readonly Fixture _fixture; @@ -24,7 +23,7 @@ public async Task resumes_from_check_point() { public class Fixture : EventStoreClientFixture { public Task Resumed => _resumedSource.Task; public Position CheckPoint { get; private set; } - + private readonly TaskCompletionSource<(SubscriptionDroppedReason, Exception)> _droppedSource; private readonly TaskCompletionSource _resumedSource; private readonly TaskCompletionSource _checkPointSource; @@ -43,12 +42,12 @@ public Fixture() { _appearedEvents = new List(); _events = CreateTestEvents(5).ToArray(); } - + protected override async Task Given() { foreach (var e in _events) { await StreamsClient.AppendToStreamAsync("test-" + Guid.NewGuid(), StreamState.Any, new[] {e}); } - + await Client.CreateToAllAsync(Group, new PersistentSubscriptionSettings( minCheckPointCount: 5, @@ -58,8 +57,9 @@ await Client.CreateToAllAsync(Group, var checkPointStream = $"$persistentsubscription-$all::{Group}-checkpoint"; _checkPointSubscription = await StreamsClient.SubscribeToStreamAsync(checkPointStream, - (s, e, ct) => { - _checkPointSource.TrySetResult(e); + SubscriptionStreamPosition.Start, + (_, e, _) => { + _checkPointSource.TrySetResult(e); return Task.CompletedTask; }, subscriptionDropped: (_, reason, ex) => { if (ex is not null) { @@ -69,7 +69,7 @@ await Client.CreateToAllAsync(Group, } }, userCredentials: TestCredentials.Root); - + _firstSubscription = await Client.SubscribeToAllAsync(Group, eventAppeared: async (s, e, r, ct) => { _appearedEvents.Add(e); @@ -91,14 +91,13 @@ protected override async Task When() { await Client.UpdateToAllAsync(Group, new PersistentSubscriptionSettings(), TestCredentials.Root); await _droppedSource.Task.WithTimeout(); - + _secondSubscription = await Client.SubscribeToAllAsync(Group, async (s, e, r, ct) => { _resumedSource.TrySetResult(e); await s.Ack(e); }, (_, reason, ex) => { - if (ex is not null) { _resumedSource.TrySetException(ex); } else { @@ -106,7 +105,7 @@ protected override async Task When() { } }, TestCredentials.Root); - + foreach (var e in _events) { await StreamsClient.AppendToStreamAsync("test-" + Guid.NewGuid(), StreamState.Any, new[] {e}); } diff --git a/test/EventStore.Client.PersistentSubscriptions.Tests/SubscriptionToAll/update_existing_with_check_point_filtered.cs b/test/EventStore.Client.PersistentSubscriptions.Tests/SubscriptionToAll/update_existing_with_check_point_filtered.cs index fb752100d..bb1e05b10 100644 --- a/test/EventStore.Client.PersistentSubscriptions.Tests/SubscriptionToAll/update_existing_with_check_point_filtered.cs +++ b/test/EventStore.Client.PersistentSubscriptions.Tests/SubscriptionToAll/update_existing_with_check_point_filtered.cs @@ -7,7 +7,6 @@ namespace EventStore.Client.SubscriptionToAll { public class update_existing_with_check_point_filtered : IClassFixture { - private const string Group = "existing-with-check-point-filtered"; private readonly Fixture _fixture; @@ -24,7 +23,7 @@ public async Task resumes_from_check_point() { public class Fixture : EventStoreClientFixture { public Task Resumed => _resumedSource.Task; public Position CheckPoint { get; private set; } - + private readonly TaskCompletionSource<(SubscriptionDroppedReason, Exception)> _droppedSource; private readonly TaskCompletionSource _resumedSource; private readonly TaskCompletionSource _checkPointSource; @@ -43,7 +42,7 @@ public Fixture() { _appearedEvents = new List(); _events = CreateTestEvents(5).ToArray(); } - + protected override async Task Given() { foreach (var e in _events) { await StreamsClient.AppendToStreamAsync("test-" + Guid.NewGuid(), StreamState.NoStream, new[] {e}); @@ -59,6 +58,7 @@ await Client.CreateToAllAsync(Group, var checkPointStream = $"$persistentsubscription-$all::{Group}-checkpoint"; _checkPointSubscription = await StreamsClient.SubscribeToStreamAsync(checkPointStream, + SubscriptionStreamPosition.Start, (_, e, ct) => { _checkPointSource.TrySetResult(e); return Task.CompletedTask; @@ -71,7 +71,7 @@ await Client.CreateToAllAsync(Group, } }, userCredentials: TestCredentials.Root); - + _firstSubscription = await Client.SubscribeToAllAsync(Group, eventAppeared: async (s, e, r, ct) => { _appearedEvents.Add(e); @@ -93,7 +93,7 @@ protected override async Task When() { await Client.UpdateToAllAsync(Group, new PersistentSubscriptionSettings(), TestCredentials.Root); await _droppedSource.Task.WithTimeout(); - + _secondSubscription = await Client.SubscribeToAllAsync(Group, eventAppeared: async (s, e, r, ct) => { _resumedSource.TrySetResult(e); @@ -108,7 +108,7 @@ protected override async Task When() { } }, userCredentials: TestCredentials.Root); - + foreach (var e in _events) { await StreamsClient.AppendToStreamAsync("test-" + Guid.NewGuid(), StreamState.NoStream, new[] {e}); } diff --git a/test/EventStore.Client.PersistentSubscriptions.Tests/SubscriptionToAll/when_writing_and_filtering_out_events.cs b/test/EventStore.Client.PersistentSubscriptions.Tests/SubscriptionToAll/when_writing_and_filtering_out_events.cs index 29fe9dafa..e4ff684f5 100644 --- a/test/EventStore.Client.PersistentSubscriptions.Tests/SubscriptionToAll/when_writing_and_filtering_out_events.cs +++ b/test/EventStore.Client.PersistentSubscriptions.Tests/SubscriptionToAll/when_writing_and_filtering_out_events.cs @@ -7,7 +7,6 @@ namespace EventStore.Client.SubscriptionToAll { public class when_writing_and_filtering_out_events : IClassFixture { - private const string Group = "filtering-out-events"; private readonly Fixture _fixture; @@ -29,7 +28,7 @@ public class Fixture : EventStoreClientFixture { public Position FirstCheckPoint { get; private set; } public EventData[] Events => _events.ToArray(); public ResolvedEvent[] AppearedEvents => _appearedEvents.ToArray(); - + private readonly TaskCompletionSource _firstCheckPointSource, _secondCheckPointSource; private PersistentSubscription _subscription; private StreamSubscription _checkPointSubscription; @@ -46,12 +45,12 @@ public Fixture() { _checkPoints = new List(); _events = CreateTestEvents(5).ToArray(); } - + protected override async Task Given() { foreach (var e in _events) { await StreamsClient.AppendToStreamAsync("test-" + Guid.NewGuid(), StreamState.Any, new[] {e}); } - + await Client.CreateToAllAsync(Group, StreamFilter.Prefix("test"), new PersistentSubscriptionSettings( @@ -59,19 +58,21 @@ await Client.CreateToAllAsync(Group, checkPointAfter: TimeSpan.FromSeconds(1), startFrom: Position.Start), TestCredentials.Root); - + _checkPointSubscription = await StreamsClient.SubscribeToStreamAsync(_checkPointStream, - (s, e, ct) => { + SubscriptionStreamPosition.Start, + (_, e, _) => { if (_checkPoints.Count == 0) { - _firstCheckPointSource.TrySetResult(e); + _firstCheckPointSource.TrySetResult(e); } else { _secondCheckPointSource.TrySetResult(e); } + _checkPoints.Add(e); return Task.CompletedTask; }, userCredentials: TestCredentials.Root); - + _subscription = await Client.SubscribeToAllAsync(Group, eventAppeared: async (s, e, r, ct) => { _appearedEvents.Add(e); @@ -83,13 +84,13 @@ await Client.CreateToAllAsync(Group, userCredentials: TestCredentials.Root); await Task.WhenAll(_appeared.Task, _firstCheckPointSource.Task).WithTimeout(); - + FirstCheckPoint = _firstCheckPointSource.Task.Result.Event.Data.ParsePosition(); } protected override async Task When() { foreach (var e in _events) { - await StreamsClient.AppendToStreamAsync("filtered-out-stream-" + Guid.NewGuid(), + await StreamsClient.AppendToStreamAsync("filtered-out-stream-" + Guid.NewGuid(), StreamState.Any, new[] {e}); } } diff --git a/test/EventStore.Client.PersistentSubscriptions.Tests/SubscriptionToStream/update_existing_with_check_point.cs b/test/EventStore.Client.PersistentSubscriptions.Tests/SubscriptionToStream/update_existing_with_check_point.cs index bbf89654e..d029fbe29 100644 --- a/test/EventStore.Client.PersistentSubscriptions.Tests/SubscriptionToStream/update_existing_with_check_point.cs +++ b/test/EventStore.Client.PersistentSubscriptions.Tests/SubscriptionToStream/update_existing_with_check_point.cs @@ -23,11 +23,11 @@ public async Task resumes_from_check_point() { var resumedEvent = await _fixture.Resumed.WithTimeout(TimeSpan.FromSeconds(10)); Assert.Equal(_fixture.CheckPoint.Next(), resumedEvent.Event.EventNumber); } - + public class Fixture : EventStoreClientFixture { public Task Resumed => _resumedSource.Task; public StreamPosition CheckPoint { get; private set; } - + private readonly TaskCompletionSource<(SubscriptionDroppedReason, Exception)> _droppedSource; private readonly TaskCompletionSource _resumedSource; private readonly TaskCompletionSource _checkPointSource; @@ -48,7 +48,7 @@ public Fixture() { protected override async Task Given() { await StreamsClient.AppendToStreamAsync(Stream, StreamState.NoStream, _events); - + await Client.CreateAsync(Stream, Group, new PersistentSubscriptionSettings( minCheckPointCount: 5, @@ -58,7 +58,8 @@ await Client.CreateAsync(Stream, Group, var checkPointStream = $"$persistentsubscription-{Stream}::{Group}-checkpoint"; await StreamsClient.SubscribeToStreamAsync(checkPointStream, - (_, e, ct) => { + SubscriptionStreamPosition.Start, + (_, e, _) => { _checkPointSource.TrySetResult(e); return Task.CompletedTask; }, @@ -70,7 +71,7 @@ await StreamsClient.SubscribeToStreamAsync(checkPointStream, } }, userCredentials: TestCredentials.Root); - + _firstSubscription = await Client.SubscribeToStreamAsync(Stream, Group, eventAppeared: async (s, e, r, ct) => { _appearedEvents.Add(e); @@ -86,13 +87,13 @@ await StreamsClient.SubscribeToStreamAsync(checkPointStream, CheckPoint = _checkPointSource.Task.Result.Event.Data.ParseStreamPosition(); } - + protected override async Task When() { // Force restart of the subscription await Client.UpdateAsync(Stream, Group, new PersistentSubscriptionSettings(), TestCredentials.Root); - + await _droppedSource.Task.WithTimeout(); - + _secondSubscription = await Client.SubscribeToStreamAsync(Stream, Group, eventAppeared: async (s, e, r, ct) => { _resumedSource.TrySetResult(e); @@ -106,7 +107,7 @@ protected override async Task When() { } }, userCredentials: TestCredentials.Root); - + await StreamsClient.AppendToStreamAsync(Stream, StreamState.Any, CreateTestEvents(1)); } diff --git a/test/EventStore.Client.Streams.Tests/Bugs/Issue_104.cs b/test/EventStore.Client.Streams.Tests/Bugs/Issue_104.cs index 66c31a6f5..1188d6253 100644 --- a/test/EventStore.Client.Streams.Tests/Bugs/Issue_104.cs +++ b/test/EventStore.Client.Streams.Tests/Bugs/Issue_104.cs @@ -19,19 +19,20 @@ public async Task subscription_does_not_send_checkpoint_reached_after_disposal() await _fixture.Client.AppendToStreamAsync(streamName, StreamRevision.None, _fixture.CreateTestEvents()); - var subscription = await _fixture.Client.SubscribeToAllAsync((_, __, ___) => { + var subscription = await _fixture.Client.SubscribeToAllAsync( + SubscriptionPosition.Start, + (_, _, _) => { eventAppeared.TrySetResult(true); return Task.CompletedTask; - }, false, - (_, __, ____) => subscriptionDisposed.TrySetResult(true), - new SubscriptionFilterOptions(StreamFilter.Prefix(streamName), 1, (_, __, ____) => { - if (!subscriptionDisposed.Task.IsCompleted) { - return Task.CompletedTask; - } + }, false, (_, _, _) => subscriptionDisposed.TrySetResult(true), new SubscriptionFilterOptions( + StreamFilter.Prefix(streamName), 1, (_, _, _) => { + if (!subscriptionDisposed.Task.IsCompleted) { + return Task.CompletedTask; + } - checkpointReachAfterDisposed.TrySetResult(true); - return Task.CompletedTask; - }), userCredentials: new UserCredentials("admin", "changeit")); + checkpointReachAfterDisposed.TrySetResult(true); + return Task.CompletedTask; + }), userCredentials: new UserCredentials("admin", "changeit")); await eventAppeared.Task; diff --git a/test/EventStore.Client.Streams.Tests/Bugs/Issue_2544.cs b/test/EventStore.Client.Streams.Tests/Bugs/Issue_2544.cs index 968d81344..0d871ba45 100644 --- a/test/EventStore.Client.Streams.Tests/Bugs/Issue_2544.cs +++ b/test/EventStore.Client.Streams.Tests/Bugs/Issue_2544.cs @@ -1,7 +1,6 @@ using System; using System.Collections.Generic; using System.Linq; -using System.Text; using System.Threading.Tasks; using Xunit; using Xunit.Abstractions; @@ -33,7 +32,8 @@ public async Task subscribe_to_stream(int iteration) { var streamName = $"{_fixture.GetStreamName()}_{iteration}"; using var _ = await _fixture.Client.SubscribeToStreamAsync(streamName, - (_, e, ct) => EventAppeared(e, streamName), subscriptionDropped: SubscriptionDropped); + SubscriptionStreamPosition.Start, + (_, e, _) => EventAppeared(e, streamName), subscriptionDropped: SubscriptionDropped); await AppendEvents(streamName); @@ -44,8 +44,8 @@ public async Task subscribe_to_stream(int iteration) { public async Task subscribe_to_all(int iteration) { var streamName = $"{_fixture.GetStreamName()}_{iteration}"; - using var _ = await _fixture.Client.SubscribeToAllAsync((_, e, ct) => EventAppeared(e, streamName), - subscriptionDropped: SubscriptionDropped); + using var _ = await _fixture.Client.SubscribeToAllAsync(SubscriptionPosition.Start, + (_, e, _) => EventAppeared(e, streamName), false, SubscriptionDropped); await AppendEvents(streamName); @@ -56,9 +56,9 @@ public async Task subscribe_to_all(int iteration) { public async Task subscribe_to_all_filtered(int iteration) { var streamName = $"{_fixture.GetStreamName()}_{iteration}"; - using var _ = await _fixture.Client.SubscribeToAllAsync((_, e, ct) => EventAppeared(e, streamName), - subscriptionDropped: SubscriptionDropped, - filterOptions: new SubscriptionFilterOptions(EventTypeFilter.ExcludeSystemEvents())); + using var _ = await _fixture.Client.SubscribeToAllAsync(SubscriptionPosition.Start, + (_, e, _) => EventAppeared(e, streamName), false, SubscriptionDropped, + new SubscriptionFilterOptions(EventTypeFilter.ExcludeSystemEvents())); await AppendEvents(streamName); diff --git a/test/EventStore.Client.Streams.Tests/Security/SecurityFixture.cs b/test/EventStore.Client.Streams.Tests/Security/SecurityFixture.cs index a0e4e08bc..e9c6f7682 100644 --- a/test/EventStore.Client.Streams.Tests/Security/SecurityFixture.cs +++ b/test/EventStore.Client.Streams.Tests/Security/SecurityFixture.cs @@ -1,5 +1,4 @@ using System; -using System.Collections.Generic; using System.Linq; using System.Runtime.CompilerServices; using System.Threading.Tasks; @@ -136,7 +135,7 @@ public Task ReadStreamBackward(string streamId, UserCredentials userCredentials public Task AppendStream(string streamId, UserCredentials userCredentials = default) => Client.AppendToStreamAsync(streamId, StreamState.Any, CreateTestEvents(3), - userCredentials: userCredentials) + userCredentials: userCredentials) .WithTimeout(TimeSpan.FromMilliseconds(TimeoutMs)); public Task ReadAllForward(UserCredentials userCredentials = default) => @@ -160,38 +159,39 @@ public Task ReadMeta(string streamId, UserCredentials user public Task WriteMeta(string streamId, UserCredentials userCredentials = default, string role = default) => Client.SetStreamMetadataAsync(streamId, StreamState.Any, - new StreamMetadata(acl: new StreamAcl( - writeRole: role, - readRole: role, - metaWriteRole: role, - metaReadRole: role)), - userCredentials: userCredentials) + new StreamMetadata(acl: new StreamAcl( + writeRole: role, + readRole: role, + metaWriteRole: role, + metaReadRole: role)), + userCredentials: userCredentials) .WithTimeout(TimeSpan.FromMilliseconds(TimeoutMs)); public async Task SubscribeToStream(string streamId, UserCredentials userCredentials = default) { var source = new TaskCompletionSource(); - using (await Client.SubscribeToStreamAsync(streamId, (_, _, _) => { - source.TrySetResult(true); - return Task.CompletedTask; - }, - subscriptionDropped: (_, _, ex) => { - if (ex == null) source.TrySetResult(true); - else source.TrySetException(ex); - }, userCredentials: userCredentials).WithTimeout(TimeSpan.FromMilliseconds(TimeoutMs))) { + using (await Client.SubscribeToStreamAsync(streamId, SubscriptionStreamPosition.Start, (_, _, _) => { + source.TrySetResult(true); + return Task.CompletedTask; + }, + subscriptionDropped: (_, _, ex) => { + if (ex == null) source.TrySetResult(true); + else source.TrySetException(ex); + }, userCredentials: userCredentials).WithTimeout(TimeSpan.FromMilliseconds(TimeoutMs))) { await source.Task.WithTimeout(TimeSpan.FromMilliseconds(TimeoutMs)); } } public async Task SubscribeToAll(UserCredentials userCredentials = default) { var source = new TaskCompletionSource(); - using (await Client.SubscribeToAllAsync((_, _, _) => { - source.TrySetResult(true); - return Task.CompletedTask; - }, - subscriptionDropped: (_, _, ex) => { - if (ex == null) source.TrySetResult(true); - else source.TrySetException(ex); - }, userCredentials: userCredentials).WithTimeout(TimeSpan.FromMilliseconds(TimeoutMs))) { + using (await Client.SubscribeToAllAsync(SubscriptionPosition.Start, + (_, _, _) => { + source.TrySetResult(true); + return Task.CompletedTask; + }, false, (_, _, ex) => { + if (ex == null) source.TrySetResult(true); + else source.TrySetException(ex); + }, null, null, + userCredentials, default).WithTimeout(TimeSpan.FromMilliseconds(TimeoutMs))) { await source.Task.WithTimeout(TimeSpan.FromMilliseconds(TimeoutMs)); } } @@ -199,7 +199,7 @@ public async Task SubscribeToAll(UserCredentials userCredentials = default) { public async Task CreateStreamWithMeta(StreamMetadata metadata, [CallerMemberName] string streamId = "") { await Client.SetStreamMetadataAsync(streamId, StreamState.NoStream, - metadata, userCredentials: TestCredentials.TestAdmin) + metadata, userCredentials: TestCredentials.TestAdmin) .WithTimeout(TimeSpan.FromMilliseconds(TimeoutMs)); return streamId; } diff --git a/test/EventStore.Client.Streams.Tests/subscribe_resolve_link_to.cs b/test/EventStore.Client.Streams.Tests/subscribe_resolve_link_to.cs index 3eefb0083..cb125a2d2 100644 --- a/test/EventStore.Client.Streams.Tests/subscribe_resolve_link_to.cs +++ b/test/EventStore.Client.Streams.Tests/subscribe_resolve_link_to.cs @@ -35,8 +35,9 @@ await _fixture.Client.AppendToStreamAsync(stream, StreamState.NoStream, beforeEv .WithTimeout(); using var subscription = await _fixture.Client - .SubscribeToStreamAsync($"$et-{EventStoreClientFixtureBase.TestEventType}", EventAppeared, true, - SubscriptionDropped, userCredentials: TestCredentials.Root) + .SubscribeToStreamAsync($"$et-{EventStoreClientFixtureBase.TestEventType}", + SubscriptionStreamPosition.Start, EventAppeared, true, SubscriptionDropped, + userCredentials: TestCredentials.Root) .WithTimeout(); await _fixture.Client.AppendToStreamAsync(stream, StreamState.Any, afterEvents) @@ -88,12 +89,11 @@ public async Task all_subscription() { await _fixture.Client.AppendToStreamAsync(stream, StreamState.NoStream, beforeEvents) .WithTimeout(); - using var subscription = await _fixture.Client - .SubscribeToAllAsync(EventAppeared, true, SubscriptionDropped, userCredentials: TestCredentials.Root) + using var subscription = await _fixture.Client.SubscribeToAllAsync(SubscriptionPosition.Start, + EventAppeared, true, SubscriptionDropped, userCredentials: TestCredentials.Root) .WithTimeout(); - await _fixture.Client.AppendToStreamAsync(stream, StreamState.Any, afterEvents) - .WithTimeout(); + await _fixture.Client.AppendToStreamAsync(stream, StreamState.Any, afterEvents).WithTimeout(); await appeared.Task.WithTimeout(); @@ -108,6 +108,7 @@ Task EventAppeared(StreamSubscription s, ResolvedEvent e, CancellationToken ct) if (e.OriginalEvent.EventStreamId != $"$et-{EventStoreClientFixtureBase.TestEventType}") { return Task.CompletedTask; } + try { Assert.Equal(enumerator.Current.EventId, e.Event.EventId); if (!enumerator.MoveNext()) { @@ -144,8 +145,10 @@ public async Task all_filtered_subscription() { var result = await _fixture.Client.AppendToStreamAsync(stream, StreamState.NoStream, beforeEvents) .WithTimeout(); - using var subscription = await _fixture.Client.SubscribeToAllAsync(EventAppeared, true, SubscriptionDropped, - new SubscriptionFilterOptions(StreamFilter.Prefix($"$et-{EventStoreClientFixtureBase.TestEventType}")), + using var subscription = await _fixture.Client.SubscribeToAllAsync(SubscriptionPosition.Start, + EventAppeared, true, SubscriptionDropped, + new SubscriptionFilterOptions( + StreamFilter.Prefix($"$et-{EventStoreClientFixtureBase.TestEventType}")), userCredentials: TestCredentials.Root) .WithTimeout(); @@ -165,6 +168,7 @@ Task EventAppeared(StreamSubscription s, ResolvedEvent e, CancellationToken ct) if (e.OriginalEvent.EventStreamId != $"$et-{EventStoreClientFixtureBase.TestEventType}") { return Task.CompletedTask; } + try { Assert.Equal(enumerator.Current.EventId, e.Event.EventId); if (!enumerator.MoveNext()) { @@ -187,8 +191,8 @@ public class Fixture : EventStoreClientFixture { ["EVENTSTORE_RUN_PROJECTIONS"] = "All", ["EVENTSTORE_START_STANDARD_PROJECTIONS"] = "True" }) { - } + protected override Task Given() => Task.CompletedTask; protected override Task When() => Task.CompletedTask; } diff --git a/test/EventStore.Client.Streams.Tests/subscribe_to_all.cs b/test/EventStore.Client.Streams.Tests/subscribe_to_all.cs index acc98a746..389b1fa63 100644 --- a/test/EventStore.Client.Streams.Tests/subscribe_to_all.cs +++ b/test/EventStore.Client.Streams.Tests/subscribe_to_all.cs @@ -23,8 +23,8 @@ public subscribe_to_all(ITestOutputHelper outputHelper) { public async Task calls_subscription_dropped_when_disposed() { var dropped = new TaskCompletionSource<(SubscriptionDroppedReason, Exception)>(); - using var subscription = await _fixture.Client - .SubscribeToAllAsync(EventAppeared, false, SubscriptionDropped) + using var subscription = await _fixture.Client.SubscribeToAllAsync(SubscriptionPosition.Start, + EventAppeared, false, SubscriptionDropped) .WithTimeout(); if (dropped.Task.IsCompleted) { @@ -50,8 +50,8 @@ public async Task calls_subscription_dropped_when_error_processing_event() { var dropped = new TaskCompletionSource<(SubscriptionDroppedReason, Exception)>(); var expectedException = new Exception("Error"); - using var subscription = await _fixture.Client - .SubscribeToAllAsync(EventAppeared, false, SubscriptionDropped) + using var subscription = await _fixture.Client.SubscribeToAllAsync(SubscriptionPosition.Start, + EventAppeared, false, SubscriptionDropped) .WithTimeout(); await _fixture.Client.AppendToStreamAsync(stream, StreamState.NoStream, _fixture.CreateTestEvents()); @@ -73,8 +73,8 @@ public async Task subscribe_to_empty_database() { var appeared = new TaskCompletionSource(); var dropped = new TaskCompletionSource<(SubscriptionDroppedReason, Exception)>(); - using var subscription = await _fixture.Client - .SubscribeToAllAsync(EventAppeared, false, SubscriptionDropped) + using var subscription = await _fixture.Client.SubscribeToAllAsync(SubscriptionPosition.Start, + EventAppeared, false, SubscriptionDropped) .WithTimeout(); Assert.False(appeared.Task.IsCompleted); @@ -115,8 +115,8 @@ await _fixture.Client.AppendToStreamAsync($"stream-{@event.EventId:n}", StreamSt new[] {@event}); } - using var subscription = await _fixture.Client - .SubscribeToAllAsync(EventAppeared, false, SubscriptionDropped) + using var subscription = await _fixture.Client.SubscribeToAllAsync(SubscriptionPosition.Start, + EventAppeared, false, SubscriptionDropped) .WithTimeout(); foreach (var @event in afterEvents) { diff --git a/test/EventStore.Client.Streams.Tests/subscribe_to_all_filtered.cs b/test/EventStore.Client.Streams.Tests/subscribe_to_all_filtered.cs index 6b977ea7b..f22dd2686 100644 --- a/test/EventStore.Client.Streams.Tests/subscribe_to_all_filtered.cs +++ b/test/EventStore.Client.Streams.Tests/subscribe_to_all_filtered.cs @@ -40,9 +40,9 @@ await _fixture.Client.AppendToStreamAsync($"{streamPrefix}_{Guid.NewGuid():n}", StreamState.NoStream, new[] {e}); } - using var subscription = await _fixture.Client.SubscribeToAllAsync(EventAppeared, false, - filterOptions: new SubscriptionFilterOptions(filter, 5, CheckpointReached), - subscriptionDropped: SubscriptionDropped) + using var subscription = await _fixture.Client.SubscribeToAllAsync(SubscriptionPosition.Start, + EventAppeared, false, SubscriptionDropped, + new SubscriptionFilterOptions(filter, 5, CheckpointReached)) .WithTimeout(); await Task.WhenAll(appeared.Task, checkpointSeen.Task).WithTimeout(); @@ -111,9 +111,9 @@ await _fixture.Client.AppendToStreamAsync($"{streamPrefix}_{Guid.NewGuid():n}", StreamState.NoStream, new[] {e}); } - using var subscription = await _fixture.Client.SubscribeToAllAsync(EventAppeared, false, - filterOptions: new SubscriptionFilterOptions(filter, 5, CheckpointReached), - subscriptionDropped: SubscriptionDropped) + using var subscription = await _fixture.Client.SubscribeToAllAsync(SubscriptionPosition.Start, + EventAppeared, false, SubscriptionDropped, + new SubscriptionFilterOptions(filter, 5, CheckpointReached)) .WithTimeout(); foreach (var e in afterEvents) { diff --git a/test/EventStore.Client.Streams.Tests/subscribe_to_all_filtered_live.cs b/test/EventStore.Client.Streams.Tests/subscribe_to_all_filtered_live.cs index 7a3722887..981d4bb10 100644 --- a/test/EventStore.Client.Streams.Tests/subscribe_to_all_filtered_live.cs +++ b/test/EventStore.Client.Streams.Tests/subscribe_to_all_filtered_live.cs @@ -3,8 +3,6 @@ using System.Linq; using System.Threading; using System.Threading.Tasks; -using EventStore.Client.Streams; -using Serilog; using Xunit; using Xunit.Abstractions; @@ -40,9 +38,8 @@ await _fixture.Client.AppendToStreamAsync($"{streamPrefix}_{Guid.NewGuid():n}", StreamState.NoStream, new[] {e}); } - using var subscription = await _fixture.Client.SubscribeToAllAsync(Position.End, EventAppeared, false, - filterOptions: new SubscriptionFilterOptions(filter), - subscriptionDropped: SubscriptionDropped) + using var subscription = await _fixture.Client.SubscribeToAllAsync(SubscriptionPosition.End, EventAppeared, + false, SubscriptionDropped, new SubscriptionFilterOptions(filter)) .WithTimeout(); foreach (var e in afterEvents) { diff --git a/test/EventStore.Client.Streams.Tests/subscribe_to_all_filtered_with_position.cs b/test/EventStore.Client.Streams.Tests/subscribe_to_all_filtered_with_position.cs index 7187744b6..5c4eb3e4d 100644 --- a/test/EventStore.Client.Streams.Tests/subscribe_to_all_filtered_with_position.cs +++ b/test/EventStore.Client.Streams.Tests/subscribe_to_all_filtered_with_position.cs @@ -3,7 +3,6 @@ using System.Linq; using System.Threading; using System.Threading.Tasks; -using EventStore.Client.Streams; using Xunit; using Xunit.Abstractions; @@ -46,9 +45,9 @@ await _fixture.Client.AppendToStreamAsync($"{streamPrefix}_{Guid.NewGuid():n}", await _fixture.Client.AppendToStreamAsync(Guid.NewGuid().ToString(), StreamState.NoStream, _fixture.CreateTestEvents(256)); - using var subscription = await _fixture.Client.SubscribeToAllAsync(writeResult.LogPosition, EventAppeared, - false, filterOptions: new SubscriptionFilterOptions(filter, 4, CheckpointReached), - subscriptionDropped: SubscriptionDropped) + using var subscription = await _fixture.Client.SubscribeToAllAsync( + SubscriptionPosition.After(writeResult.LogPosition), + EventAppeared, false, SubscriptionDropped, new SubscriptionFilterOptions(filter, 4, CheckpointReached)) .WithTimeout(); foreach (var e in afterEvents) { diff --git a/test/EventStore.Client.Streams.Tests/subscribe_to_all_live.cs b/test/EventStore.Client.Streams.Tests/subscribe_to_all_live.cs index 51d5305e3..33324c5e9 100644 --- a/test/EventStore.Client.Streams.Tests/subscribe_to_all_live.cs +++ b/test/EventStore.Client.Streams.Tests/subscribe_to_all_live.cs @@ -24,7 +24,7 @@ public async Task calls_subscription_dropped_when_disposed() { var dropped = new TaskCompletionSource<(SubscriptionDroppedReason, Exception)>(); using var subscription = await _fixture.Client - .SubscribeToAllAsync(Position.End, EventAppeared, false, SubscriptionDropped) + .SubscribeToAllAsync(SubscriptionPosition.End, EventAppeared, false, SubscriptionDropped) .WithTimeout(); if (dropped.Task.IsCompleted) { @@ -51,7 +51,7 @@ public async Task calls_subscription_dropped_when_error_processing_event() { var expectedException = new Exception("Error"); using var subscription = await _fixture.Client - .SubscribeToAllAsync(Position.End, EventAppeared, false, SubscriptionDropped) + .SubscribeToAllAsync(SubscriptionPosition.End, EventAppeared, false, SubscriptionDropped) .WithTimeout(); await _fixture.Client.AppendToStreamAsync(stream, StreamState.NoStream, _fixture.CreateTestEvents()); @@ -74,7 +74,7 @@ public async Task subscribe_to_empty_database() { var dropped = new TaskCompletionSource<(SubscriptionDroppedReason, Exception)>(); using var subscription = await _fixture.Client - .SubscribeToAllAsync(Position.End, EventAppeared, false, SubscriptionDropped) + .SubscribeToAllAsync(SubscriptionPosition.End, EventAppeared, false, SubscriptionDropped) .WithTimeout(); Assert.False(appeared.Task.IsCompleted); @@ -110,7 +110,7 @@ public async Task does_not_read_existing_events_but_keep_listening_to_new_ones() var afterEvents = _fixture.CreateTestEvents(10).ToArray(); using var subscription = await _fixture.Client - .SubscribeToAllAsync(Position.End, EventAppeared, false, SubscriptionDropped) + .SubscribeToAllAsync(SubscriptionPosition.End, EventAppeared, false, SubscriptionDropped) .WithTimeout(); foreach (var @event in afterEvents) { diff --git a/test/EventStore.Client.Streams.Tests/subscribe_to_all_with_position.cs b/test/EventStore.Client.Streams.Tests/subscribe_to_all_with_position.cs index 9aa3b163c..ac703a09a 100644 --- a/test/EventStore.Client.Streams.Tests/subscribe_to_all_with_position.cs +++ b/test/EventStore.Client.Streams.Tests/subscribe_to_all_with_position.cs @@ -26,7 +26,8 @@ public async Task calls_subscription_dropped_when_disposed() { .FirstOrDefaultAsync(); using var subscription = await _fixture.Client - .SubscribeToAllAsync(firstEvent.OriginalEvent.Position, EventAppeared, false, SubscriptionDropped) + .SubscribeToAllAsync(SubscriptionPosition.After(firstEvent.OriginalEvent.Position), EventAppeared, + false, SubscriptionDropped) .WithTimeout(); if (dropped.Task.IsCompleted) { @@ -56,7 +57,8 @@ public async Task calls_subscription_dropped_when_error_processing_event() { .FirstOrDefaultAsync(); using var subscription = await _fixture.Client - .SubscribeToAllAsync(firstEvent.OriginalEvent.Position, EventAppeared, false, SubscriptionDropped) + .SubscribeToAllAsync(SubscriptionPosition.After(firstEvent.OriginalEvent.Position), EventAppeared, + false, SubscriptionDropped) .WithTimeout(); await _fixture.Client.AppendToStreamAsync(stream, StreamState.NoStream, _fixture.CreateTestEvents(2)); @@ -82,7 +84,8 @@ public async Task subscribe_to_empty_database() { .FirstOrDefaultAsync(); using var subscription = await _fixture.Client - .SubscribeToAllAsync(firstEvent.OriginalEvent.Position, EventAppeared, false, SubscriptionDropped) + .SubscribeToAllAsync(SubscriptionPosition.After(firstEvent.OriginalEvent.Position), EventAppeared, + false, SubscriptionDropped) .WithTimeout(); Assert.False(appeared.Task.IsCompleted); @@ -138,8 +141,8 @@ await _fixture.Client.AppendToStreamAsync($"stream-{@event.EventId:n}", StreamSt new[] {@event}); } - using var subscription = await _fixture.Client - .SubscribeToAllAsync(position, EventAppeared, false, SubscriptionDropped) + using var subscription = await _fixture.Client.SubscribeToAllAsync(SubscriptionPosition.After(position), + EventAppeared, false, SubscriptionDropped) .WithTimeout(); foreach (var @event in afterEvents) { diff --git a/test/EventStore.Client.Streams.Tests/subscribe_to_stream.cs b/test/EventStore.Client.Streams.Tests/subscribe_to_stream.cs index 939796d64..abc12a412 100644 --- a/test/EventStore.Client.Streams.Tests/subscribe_to_stream.cs +++ b/test/EventStore.Client.Streams.Tests/subscribe_to_stream.cs @@ -22,7 +22,8 @@ public async Task subscribe_to_non_existing_stream() { var dropped = new TaskCompletionSource<(SubscriptionDroppedReason, Exception)>(); using var subscription = await _fixture.Client - .SubscribeToStreamAsync(stream, EventAppeared, false, SubscriptionDropped) + .SubscribeToStreamAsync(stream, SubscriptionStreamPosition.Start, EventAppeared, false, + SubscriptionDropped) .WithTimeout(); Assert.False(appeared.Task.IsCompleted); @@ -53,7 +54,8 @@ public async Task subscribe_to_non_existing_stream_then_get_event() { var dropped = new TaskCompletionSource<(SubscriptionDroppedReason, Exception)>(); using var subscription = await _fixture.Client - .SubscribeToStreamAsync(stream, EventAppeared, false, SubscriptionDropped) + .SubscribeToStreamAsync(stream, SubscriptionStreamPosition.Start, EventAppeared, false, + SubscriptionDropped) .WithTimeout(); await _fixture.Client.AppendToStreamAsync(stream, StreamState.NoStream, _fixture.CreateTestEvents()); @@ -88,8 +90,10 @@ public async Task allow_multiple_subscriptions_to_same_stream() { int appearedCount = 0; await _fixture.Client.AppendToStreamAsync(stream, StreamState.NoStream, _fixture.CreateTestEvents()); - using var s1 = await _fixture.Client.SubscribeToStreamAsync(stream, EventAppeared).WithTimeout(); - using var s2 = await _fixture.Client.SubscribeToStreamAsync(stream, EventAppeared).WithTimeout(); + using var s1 = await _fixture.Client + .SubscribeToStreamAsync(stream, SubscriptionStreamPosition.Start, EventAppeared).WithTimeout(); + using var s2 = await _fixture.Client + .SubscribeToStreamAsync(stream, SubscriptionStreamPosition.Start, EventAppeared).WithTimeout(); Assert.True(await appeared.Task.WithTimeout()); @@ -108,7 +112,8 @@ public async Task calls_subscription_dropped_when_disposed() { var dropped = new TaskCompletionSource<(SubscriptionDroppedReason, Exception)>(); using var subscription = await _fixture.Client - .SubscribeToStreamAsync(stream, EventAppeared, false, SubscriptionDropped) + .SubscribeToStreamAsync(stream, SubscriptionStreamPosition.Start, EventAppeared, false, + SubscriptionDropped) .WithTimeout(); if (dropped.Task.IsCompleted) { @@ -135,7 +140,8 @@ public async Task calls_subscription_dropped_when_error_processing_event() { var expectedException = new Exception("Error"); using var subscription = await _fixture.Client - .SubscribeToStreamAsync(stream, EventAppeared, false, SubscriptionDropped) + .SubscribeToStreamAsync(stream, SubscriptionStreamPosition.Start, EventAppeared, false, + SubscriptionDropped) .WithTimeout(); if (dropped.Task.IsCompleted) { @@ -176,7 +182,8 @@ await _fixture.Client.AppendToStreamAsync(stream, StreamState.NoStream, beforeEv .WithTimeout(); using var subscription = await _fixture.Client - .SubscribeToStreamAsync(stream, EventAppeared, false, SubscriptionDropped) + .SubscribeToStreamAsync(stream, SubscriptionStreamPosition.Start, EventAppeared, false, + SubscriptionDropped) .WithTimeout(); await _fixture.Client.AppendToStreamAsync(stream, StreamState.Any, afterEvents) @@ -216,7 +223,8 @@ public async Task catches_deletions() { var dropped = new TaskCompletionSource<(SubscriptionDroppedReason, Exception)>(); using var _ = await _fixture.Client - .SubscribeToStreamAsync(stream, EventAppeared, false, SubscriptionDropped) + .SubscribeToStreamAsync(stream, SubscriptionStreamPosition.Start, EventAppeared, false, + SubscriptionDropped) .WithTimeout(); await _fixture.Client.TombstoneAsync(stream, StreamState.NoStream); diff --git a/test/EventStore.Client.Streams.Tests/subscribe_to_stream_live.cs b/test/EventStore.Client.Streams.Tests/subscribe_to_stream_live.cs index 6deeedd27..4f1e8a9af 100644 --- a/test/EventStore.Client.Streams.Tests/subscribe_to_stream_live.cs +++ b/test/EventStore.Client.Streams.Tests/subscribe_to_stream_live.cs @@ -24,7 +24,7 @@ await _fixture.Client.AppendToStreamAsync(stream, StreamState.NoStream, _fixture.CreateTestEvents()); using var _ = await _fixture.Client - .SubscribeToStreamAsync(stream, StreamPosition.End, (s, e, ct) => { + .SubscribeToStreamAsync(stream, SubscriptionStreamPosition.End, (_, e, _) => { appeared.TrySetResult(e.OriginalEventNumber); return Task.CompletedTask; }, false, (s, reason, ex) => dropped.TrySetResult(true)) @@ -43,7 +43,7 @@ public async Task subscribe_to_non_existing_stream_and_then_catch_new_event() { var dropped = new TaskCompletionSource(); using var _ = await _fixture.Client - .SubscribeToStreamAsync(stream, StreamPosition.End, (s, e, ct) => { + .SubscribeToStreamAsync(stream, SubscriptionStreamPosition.End, (_, _, _) => { appeared.TrySetResult(true); return Task.CompletedTask; }, false, (s, reason, ex) => dropped.TrySetResult(true)) @@ -62,9 +62,11 @@ public async Task allow_multiple_subscriptions_to_same_stream() { int appearedCount = 0; - using var s1 = await _fixture.Client.SubscribeToStreamAsync(stream, StreamPosition.End, EventAppeared) + using var s1 = await _fixture.Client + .SubscribeToStreamAsync(stream, SubscriptionStreamPosition.End, EventAppeared) .WithTimeout(); - using var s2 = await _fixture.Client.SubscribeToStreamAsync(stream, StreamPosition.End, EventAppeared) + using var s2 = await _fixture.Client + .SubscribeToStreamAsync(stream, SubscriptionStreamPosition.End, EventAppeared) .WithTimeout(); await _fixture.Client.AppendToStreamAsync(stream, StreamState.NoStream, _fixture.CreateTestEvents()); @@ -86,7 +88,8 @@ public async Task calls_subscription_dropped_when_disposed() { var dropped = new TaskCompletionSource<(SubscriptionDroppedReason, Exception)>(); using var _ = await _fixture.Client - .SubscribeToStreamAsync(stream, StreamPosition.End, EventAppeared, false, SubscriptionDropped) + .SubscribeToStreamAsync(stream, SubscriptionStreamPosition.End, EventAppeared, false, + SubscriptionDropped) .WithTimeout(); await _fixture.Client.AppendToStreamAsync(stream, StreamState.NoStream, _fixture.CreateTestEvents()); @@ -111,7 +114,8 @@ public async Task catches_deletions() { var dropped = new TaskCompletionSource<(SubscriptionDroppedReason, Exception)>(); using var _ = await _fixture.Client - .SubscribeToStreamAsync(stream, StreamPosition.End, EventAppeared, false, SubscriptionDropped) + .SubscribeToStreamAsync(stream, SubscriptionStreamPosition.End, EventAppeared, false, + SubscriptionDropped) .WithTimeout(); await _fixture.Client.TombstoneAsync(stream, StreamState.NoStream); diff --git a/test/EventStore.Client.Streams.Tests/subscribe_to_stream_with_revision.cs b/test/EventStore.Client.Streams.Tests/subscribe_to_stream_with_revision.cs index 10beeb02c..d5308cac8 100644 --- a/test/EventStore.Client.Streams.Tests/subscribe_to_stream_with_revision.cs +++ b/test/EventStore.Client.Streams.Tests/subscribe_to_stream_with_revision.cs @@ -9,6 +9,7 @@ namespace EventStore.Client { [Trait("Category", "LongRunning")] public class subscribe_to_stream_with_revision : IAsyncLifetime { private readonly Fixture _fixture; + public subscribe_to_stream_with_revision(ITestOutputHelper outputHelper) { _fixture = new Fixture(); _fixture.CaptureLogs(outputHelper); @@ -21,7 +22,8 @@ public async Task subscribe_to_non_existing_stream() { var dropped = new TaskCompletionSource<(SubscriptionDroppedReason, Exception)>(); using var subscription = await _fixture.Client - .SubscribeToStreamAsync(stream, StreamPosition.Start, EventAppeared, false, SubscriptionDropped) + .SubscribeToStreamAsync(stream, SubscriptionStreamPosition.Start, EventAppeared, false, + SubscriptionDropped) .WithTimeout(); Assert.False(appeared.Task.IsCompleted); @@ -52,7 +54,8 @@ public async Task subscribe_to_non_existing_stream_then_get_event() { var dropped = new TaskCompletionSource<(SubscriptionDroppedReason, Exception)>(); using var subscription = await _fixture.Client - .SubscribeToStreamAsync(stream, StreamPosition.Start, EventAppeared, false, SubscriptionDropped) + .SubscribeToStreamAsync(stream, SubscriptionStreamPosition.After(StreamPosition.Start), EventAppeared, + false, SubscriptionDropped) .WithTimeout(); await _fixture.Client.AppendToStreamAsync(stream, StreamState.NoStream, @@ -93,10 +96,10 @@ public async Task allow_multiple_subscriptions_to_same_stream() { int appearedCount = 0; using var s1 = await _fixture.Client - .SubscribeToStreamAsync(stream, StreamPosition.Start, EventAppeared) + .SubscribeToStreamAsync(stream, SubscriptionStreamPosition.After(StreamPosition.Start), EventAppeared) .WithTimeout(); using var s2 = await _fixture.Client - .SubscribeToStreamAsync(stream, StreamPosition.Start, EventAppeared) + .SubscribeToStreamAsync(stream, SubscriptionStreamPosition.After(StreamPosition.Start), EventAppeared) .WithTimeout(); await _fixture.Client.AppendToStreamAsync(stream, StreamState.NoStream, _fixture.CreateTestEvents(2)); @@ -123,7 +126,8 @@ public async Task calls_subscription_dropped_when_disposed() { var dropped = new TaskCompletionSource<(SubscriptionDroppedReason, Exception)>(); using var subscription = await _fixture.Client - .SubscribeToStreamAsync(stream, StreamPosition.Start, EventAppeared, false, SubscriptionDropped) + .SubscribeToStreamAsync(stream, SubscriptionStreamPosition.Start, EventAppeared, false, + SubscriptionDropped) .WithTimeout(); if (dropped.Task.IsCompleted) { @@ -151,8 +155,9 @@ public async Task calls_subscription_dropped_when_error_processing_event() { await _fixture.Client.AppendToStreamAsync(stream, StreamState.NoStream, _fixture.CreateTestEvents(2)); - using var subscription = await _fixture.Client.SubscribeToStreamAsync(stream, StreamPosition.Start, - EventAppeared, false, SubscriptionDropped) + using var subscription = await _fixture.Client.SubscribeToStreamAsync(stream, + SubscriptionStreamPosition.Start, + EventAppeared, false, SubscriptionDropped) .WithTimeout(); var (reason, ex) = await dropped.Task.WithTimeout(); @@ -188,7 +193,8 @@ public async Task reads_all_existing_events_and_keep_listening_to_new_ones() { await _fixture.Client.AppendToStreamAsync(stream, StreamState.Any, beforeEvents); using var subscription = await _fixture.Client - .SubscribeToStreamAsync(stream, StreamPosition.Start, EventAppeared, false, SubscriptionDropped) + .SubscribeToStreamAsync(stream, SubscriptionStreamPosition.After(StreamPosition.Start), EventAppeared, + false, SubscriptionDropped) .WithTimeout(); var writeResult = await _fixture.Client.AppendToStreamAsync(stream, StreamState.Any, afterEvents); diff --git a/test/EventStore.Client.Tests/SubscriptionPositionTests.cs b/test/EventStore.Client.Tests/SubscriptionPositionTests.cs new file mode 100644 index 000000000..71980a897 --- /dev/null +++ b/test/EventStore.Client.Tests/SubscriptionPositionTests.cs @@ -0,0 +1,61 @@ +using System; +using System.Collections.Generic; +using AutoFixture; +using Xunit; + +namespace EventStore.Client { + public class SubscriptionPositionTests : ValueObjectTests { + public SubscriptionPositionTests() : base(new ScenarioFixture()) { + } + + [Fact] + public void IsComparable() => + Assert.IsAssignableFrom>(_fixture.Create()); + + [Theory, AutoScenarioData(typeof(ScenarioFixture))] + public void StartIsLessThanAll(SubscriptionPosition other) => Assert.True(SubscriptionPosition.Start < other); + + [Theory, AutoScenarioData(typeof(ScenarioFixture))] + public void LiveIsGreaterThanAll(SubscriptionPosition other) => Assert.True(SubscriptionPosition.End > other); + + public static IEnumerable ToStringCases() { + var fixture = new ScenarioFixture(); + var position = fixture.Create(); + yield return new object[] {SubscriptionPosition.After(position), position.ToString()}; + yield return new object[] {SubscriptionPosition.Start, "Start"}; + yield return new object[] {SubscriptionPosition.End, "Live"}; + } + + [Theory, MemberData(nameof(ToStringCases))] + public void ToStringReturnsExpectedResult(SubscriptionPosition sut, string expected) => + Assert.Equal(expected, sut.ToString()); + + [Fact] + public void AfterLiveThrows() => + Assert.Throws(() => SubscriptionPosition.After(Position.End)); + + public static IEnumerable TryGetUInt64Cases() { + var fixture = new ScenarioFixture(); + + var streamPosition = fixture.Create(); + yield return new object[] { + SubscriptionPosition.After(streamPosition), true, + (streamPosition.CommitPosition, streamPosition.PreparePosition) + }; + yield return new object[] + {SubscriptionPosition.Start, false, default((ulong, ulong))}; + } + [Theory, MemberData(nameof(TryGetUInt64Cases))] + public void TryGetUInt64ReturnsExpectedResult(SubscriptionPosition sut, bool success, (ulong,ulong) expected) { + Assert.Equal(sut.TryGetUInt64(out var actual), success); + Assert.Equal(expected, actual); + } + private class ScenarioFixture : Fixture { + public ScenarioFixture() { + Customize(composer => composer.FromFactory(value => new Position(value, value))); + Customize(composter => + composter.FromFactory(SubscriptionPosition.After)); + } + } + } +} diff --git a/test/EventStore.Client.Tests/SubscriptionStreamPositionTests.cs b/test/EventStore.Client.Tests/SubscriptionStreamPositionTests.cs new file mode 100644 index 000000000..094046cd5 --- /dev/null +++ b/test/EventStore.Client.Tests/SubscriptionStreamPositionTests.cs @@ -0,0 +1,64 @@ +using System; +using System.Collections; +using System.Collections.Generic; +using AutoFixture; +using Xunit; + +namespace EventStore.Client { + public class SubscriptionStreamPositionTests : ValueObjectTests { + public SubscriptionStreamPositionTests() : base(new ScenarioFixture()) { + } + + [Fact] + public void IsComparable() => + Assert.IsAssignableFrom>( + _fixture.Create()); + + [Theory, AutoScenarioData(typeof(ScenarioFixture))] + public void StartIsLessThanAll(SubscriptionStreamPosition other) => + Assert.True(SubscriptionStreamPosition.Start < other); + + [Theory, AutoScenarioData(typeof(ScenarioFixture))] + public void LiveIsGreaterThanAll(SubscriptionStreamPosition other) => + Assert.True(SubscriptionStreamPosition.End > other); + + public static IEnumerable ToStringCases() { + var fixture = new ScenarioFixture(); + var position = fixture.Create(); + yield return new object[] {SubscriptionStreamPosition.After(position), position.ToString()}; + yield return new object[] {SubscriptionStreamPosition.Start, "Start"}; + yield return new object[] {SubscriptionStreamPosition.End, "Live"}; + } + + [Theory, MemberData(nameof(ToStringCases))] + public void ToStringReturnsExpectedResult(SubscriptionStreamPosition sut, string expected) => + Assert.Equal(expected, sut.ToString()); + + [Fact] + public void AfterLiveThrows() => + Assert.Throws(() => SubscriptionStreamPosition.After(StreamPosition.End)); + + public static IEnumerable TryGetUInt64Cases() { + var fixture = new ScenarioFixture(); + + var streamPosition = fixture.Create(); + yield return new object[] + {SubscriptionStreamPosition.After(streamPosition), true, streamPosition.ToUInt64()}; + yield return new object[] + {SubscriptionStreamPosition.Start, false, default(ulong)}; + } + [Theory, MemberData(nameof(TryGetUInt64Cases))] + public void TryGetUInt64ReturnsExpectedResult(SubscriptionStreamPosition sut, bool success, ulong expected) { + Assert.Equal(sut.TryGetUInt64(out var actual), success); + Assert.Equal(expected, actual); + } + + private class ScenarioFixture : Fixture { + public ScenarioFixture() { + Customize(composer => composer.FromFactory(value => new StreamPosition(value))); + Customize(composter => + composter.FromFactory(SubscriptionStreamPosition.After)); + } + } + } +}