Skip to content

Commit

Permalink
implement subscription position
Browse files Browse the repository at this point in the history
  • Loading branch information
thefringeninja committed Feb 3, 2022
1 parent ffa33b4 commit 93ff2dd
Show file tree
Hide file tree
Showing 24 changed files with 570 additions and 309 deletions.
25 changes: 17 additions & 8 deletions samples/subscribing-to-streams/Program.cs
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
using System;
using System.Net.Http;
using System.Reflection.Metadata;
using System.Threading;
using System.Threading.Tasks;
using EventStore.Client;
Expand All @@ -20,6 +21,7 @@ static async Task Main(string[] args) {
private static async Task SubscribeToStream(EventStoreClient client) {
#region subscribe-to-stream
await client.SubscribeToStreamAsync("some-stream",
SubscriptionStreamPosition.Start,
async (subscription, evnt, cancellationToken) => {
Console.WriteLine($"Received event {evnt.OriginalEventNumber}@{evnt.OriginalStreamId}");
await HandleEvent(evnt);
Expand All @@ -29,33 +31,34 @@ await client.SubscribeToStreamAsync("some-stream",
#region subscribe-to-stream-from-position
await client.SubscribeToStreamAsync(
"some-stream",
StreamPosition.FromInt64(20),
SubscriptionStreamPosition.After(StreamPosition.FromInt64(20)),
EventAppeared);
#endregion subscribe-to-stream-from-position

#region subscribe-to-stream-live
await client.SubscribeToStreamAsync(
"some-stream",
StreamPosition.End,
SubscriptionStreamPosition.End,
EventAppeared);
#endregion subscribe-to-stream-live

#region subscribe-to-stream-resolving-linktos
await client.SubscribeToStreamAsync(
"$et-myEventType",
StreamPosition.Start,
SubscriptionStreamPosition.Start,
EventAppeared,
resolveLinkTos: true);
#endregion subscribe-to-stream-resolving-linktos

#region subscribe-to-stream-subscription-dropped
var checkpoint = StreamPosition.Start;

var checkpoint = await ReadStreamCheckpointAsync();
await client.SubscribeToStreamAsync(
"some-stream",
checkpoint,
eventAppeared: async (subscription, evnt, cancellationToken) => {
await HandleEvent(evnt);
checkpoint = evnt.OriginalEventNumber;
checkpoint = SubscriptionStreamPosition.After(evnt.OriginalEventNumber);
},
subscriptionDropped: ((subscription, reason, exception) => {
Console.WriteLine($"Subscription was dropped due to {reason}. {exception}");
Expand Down Expand Up @@ -90,12 +93,12 @@ await client.SubscribeToAllAsync(

#region subscribe-to-all-live
await client.SubscribeToAllAsync(
SubscriptionPosition.Live,
SubscriptionPosition.End,
EventAppeared);
#endregion subscribe-to-all-live

#region subscribe-to-all-subscription-dropped
var checkpoint = SubscriptionPosition.Start;
var checkpoint = await ReadCheckpointAsync();
await client.SubscribeToAllAsync(
checkpoint,
eventAppeared: async (subscription, evnt, cancellationToken) => {
Expand Down Expand Up @@ -147,7 +150,13 @@ private static Task HandleEvent(ResolvedEvent evnt) {
return Task.CompletedTask;
}

private static void Resubscribe(StreamPosition checkpoint) { }
private static void Resubscribe(SubscriptionStreamPosition checkpoint) { }
private static void Resubscribe(SubscriptionPosition checkpoint) { }

private static Task<SubscriptionStreamPosition> ReadStreamCheckpointAsync() =>
Task.FromResult(SubscriptionStreamPosition.Start);

private static Task<SubscriptionPosition> ReadCheckpointAsync() =>
Task.FromResult(SubscriptionPosition.Start);
}
}
151 changes: 10 additions & 141 deletions src/EventStore.Client.Streams/EventStoreClient.Subscriptions.cs
Original file line number Diff line number Diff line change
Expand Up @@ -6,33 +6,10 @@
#nullable enable
namespace EventStore.Client {
public partial class EventStoreClient {
private Task<StreamSubscription> SubscribeToAllAsync(
Func<StreamSubscription, ResolvedEvent, CancellationToken, Task> eventAppeared,
EventStoreClientOperationOptions operationOptions,
bool resolveLinkTos = false,
Action<StreamSubscription, SubscriptionDroppedReason, Exception?>? subscriptionDropped = default,
SubscriptionFilterOptions? filterOptions = null,
UserCredentials? userCredentials = null,
CancellationToken cancellationToken = default) {
operationOptions.TimeoutAfter = DeadLine.None;

return StreamSubscription.Confirm(ReadInternal(new ReadReq {
Options = new ReadReq.Types.Options {
ReadDirection = ReadReq.Types.Options.Types.ReadDirection.Forwards,
ResolveLinks = resolveLinkTos,
All = new ReadReq.Types.Options.Types.AllOptions {
Start = new Empty()
},
Subscription = new ReadReq.Types.Options.Types.SubscriptionOptions(),
Filter = GetFilterOptions(filterOptions)!
}
}, operationOptions, userCredentials, cancellationToken), eventAppeared, subscriptionDropped, _log,
filterOptions?.CheckpointReached, cancellationToken);
}

/// <summary>
/// Subscribes to all events. Use this when you have no <see cref="Position">checkpoint</see>.
/// Subscribes to all events.
/// </summary>
/// <param name="start">A <see cref="SubscriptionPosition"/> (exclusive of) to start the subscription from.</param>
/// <param name="eventAppeared">A Task invoked and awaited when a new event is received over the subscription.</param>
/// <param name="configureOperationOptions">An <see cref="Action{EventStoreClientOperationOptions}"/> to configure the operation's options.</param>
/// <param name="resolveLinkTos">Whether to resolve LinkTo events automatically.</param>
Expand All @@ -42,6 +19,7 @@ private Task<StreamSubscription> SubscribeToAllAsync(
/// <param name="cancellationToken">The optional <see cref="System.Threading.CancellationToken"/>.</param>
/// <returns></returns>
public Task<StreamSubscription> SubscribeToAllAsync(
SubscriptionPosition start,
Func<StreamSubscription, ResolvedEvent, CancellationToken, Task> eventAppeared,
bool resolveLinkTos = false,
Action<StreamSubscription, SubscriptionDroppedReason, Exception?>? subscriptionDropped = default,
Expand All @@ -52,25 +30,13 @@ public Task<StreamSubscription> SubscribeToAllAsync(
var operationOptions = Settings.OperationOptions.Clone();
configureOperationOptions?.Invoke(operationOptions);

return SubscribeToAllAsync(eventAppeared, operationOptions, resolveLinkTos, subscriptionDropped,
filterOptions, userCredentials, cancellationToken);
}

private Task<StreamSubscription> SubscribeToAllAsync(Position start,
Func<StreamSubscription, ResolvedEvent, CancellationToken, Task> eventAppeared,
EventStoreClientOperationOptions operationOptions,
bool resolveLinkTos = false,
Action<StreamSubscription, SubscriptionDroppedReason, Exception?>? subscriptionDropped = default,
SubscriptionFilterOptions? filterOptions = null,
UserCredentials? userCredentials = null,
CancellationToken cancellationToken = default) {
operationOptions.TimeoutAfter = DeadLine.None;

return StreamSubscription.Confirm(ReadInternal(new ReadReq {
Options = new ReadReq.Types.Options {
ReadDirection = ReadReq.Types.Options.Types.ReadDirection.Forwards,
ResolveLinks = resolveLinkTos,
All = ReadReq.Types.Options.Types.AllOptions.FromPosition(start),
All = ReadReq.Types.Options.Types.AllOptions.FromSubscriptionPosition(start),
Subscription = new ReadReq.Types.Options.Types.SubscriptionOptions(),
Filter = GetFilterOptions(filterOptions)!
}
Expand All @@ -79,136 +45,39 @@ private Task<StreamSubscription> SubscribeToAllAsync(Position start,
}

/// <summary>
/// Subscribes to all events from a <see cref="Position">checkpoint</see>. This is exclusive of.
/// Subscribes to a stream from a <see cref="StreamPosition">checkpoint</see>.
/// </summary>
/// <param name="start">A <see cref="Position"/> (exclusive of) to start the subscription from.</param>
/// <param name="start">A <see cref="SubscriptionStreamPosition"/> (exclusive of) to start the subscription from.</param>
/// <param name="streamName">The name of the stream to read events from.</param>
/// <param name="eventAppeared">A Task invoked and awaited when a new event is received over the subscription.</param>
/// <param name="configureOperationOptions">An <see cref="Action{EventStoreClientOperationOptions}"/> to configure the operation's options.</param>
/// <param name="resolveLinkTos">Whether to resolve LinkTo events automatically.</param>
/// <param name="subscriptionDropped">An action invoked if the subscription is dropped.</param>
/// <param name="filterOptions">The optional <see cref="SubscriptionFilterOptions"/> to apply.</param>
/// <param name="userCredentials">The optional user credentials to perform operation with.</param>
/// <param name="cancellationToken">The optional <see cref="System.Threading.CancellationToken"/>.</param>
/// <returns></returns>
public Task<StreamSubscription> SubscribeToAllAsync(Position start,
public Task<StreamSubscription> SubscribeToStreamAsync(string streamName,
SubscriptionStreamPosition start,
Func<StreamSubscription, ResolvedEvent, CancellationToken, Task> eventAppeared,
bool resolveLinkTos = false,
Action<StreamSubscription, SubscriptionDroppedReason, Exception?>? subscriptionDropped = default,
SubscriptionFilterOptions? filterOptions = null,
Action<EventStoreClientOperationOptions>? configureOperationOptions = null,
UserCredentials? userCredentials = null,
CancellationToken cancellationToken = default) {
var operationOptions = Settings.OperationOptions.Clone();
configureOperationOptions?.Invoke(operationOptions);

return SubscribeToAllAsync(start, eventAppeared, operationOptions, resolveLinkTos, subscriptionDropped,
filterOptions, userCredentials, cancellationToken);
}

private Task<StreamSubscription> SubscribeToStreamAsync(string streamName,
Func<StreamSubscription, ResolvedEvent, CancellationToken, Task> eventAppeared,
EventStoreClientOperationOptions operationOptions,
bool resolveLinkTos = false,
Action<StreamSubscription, SubscriptionDroppedReason, Exception?>? subscriptionDropped = default,
UserCredentials? userCredentials = null,
CancellationToken cancellationToken = default) {
operationOptions.TimeoutAfter = DeadLine.None;

return StreamSubscription.Confirm(ReadInternal(new ReadReq {
Options = new ReadReq.Types.Options {
ReadDirection = ReadReq.Types.Options.Types.ReadDirection.Forwards,
ResolveLinks = resolveLinkTos,
Stream = new ReadReq.Types.Options.Types.StreamOptions {
Start = new Empty(),
StreamIdentifier = streamName
},
Stream = ReadReq.Types.Options.Types.StreamOptions.FromSubscriptionPosition(streamName, start),
Subscription = new ReadReq.Types.Options.Types.SubscriptionOptions()
}
}, operationOptions, userCredentials, cancellationToken), eventAppeared, subscriptionDropped, _log,
cancellationToken: cancellationToken);
}

/// <summary>
/// Subscribes to all events in a stream. Use this when you have no <see cref="StreamPosition">checkpoint</see>.
/// </summary>
/// <param name="streamName">The name of the stream to read events from.</param>
/// <param name="eventAppeared">A Task invoked and awaited when a new event is received over the subscription.</param>
/// <param name="configureOperationOptions">An <see cref="Action{EventStoreClientOperationOptions}"/> to configure the operation's options.</param>
/// <param name="resolveLinkTos">Whether to resolve LinkTo events automatically.</param>
/// <param name="subscriptionDropped">An action invoked if the subscription is dropped.</param>
/// <param name="userCredentials">The optional user credentials to perform operation with.</param>
/// <param name="cancellationToken">The optional <see cref="System.Threading.CancellationToken"/>.</param>
/// <returns></returns>
public Task<StreamSubscription> SubscribeToStreamAsync(string streamName,
Func<StreamSubscription, ResolvedEvent, CancellationToken, Task> eventAppeared,
bool resolveLinkTos = false,
Action<StreamSubscription, SubscriptionDroppedReason, Exception?>? subscriptionDropped = default,
Action<EventStoreClientOperationOptions>? configureOperationOptions = null,
UserCredentials? userCredentials = null,
CancellationToken cancellationToken = default) {
var operationOptions = Settings.OperationOptions.Clone();
configureOperationOptions?.Invoke(operationOptions);

return SubscribeToStreamAsync(streamName, eventAppeared, operationOptions, resolveLinkTos,
subscriptionDropped,
userCredentials, cancellationToken);
}

private Task<StreamSubscription> SubscribeToStreamAsync(string streamName,
StreamPosition start,
Func<StreamSubscription, ResolvedEvent, CancellationToken, Task> eventAppeared,
EventStoreClientOperationOptions operationOptions,
bool resolveLinkTos = false,
Action<StreamSubscription, SubscriptionDroppedReason, Exception?>? subscriptionDropped = default,
UserCredentials? userCredentials = null,
CancellationToken cancellationToken = default) {
operationOptions.TimeoutAfter = DeadLine.None;

return StreamSubscription.Confirm(ReadInternal(new ReadReq {
Options = new ReadReq.Types.Options {
ReadDirection = ReadReq.Types.Options.Types.ReadDirection.Forwards,
ResolveLinks = resolveLinkTos,
Stream = start == StreamPosition.End
? new ReadReq.Types.Options.Types.StreamOptions {
End = new Empty(),
StreamIdentifier = streamName
}
: new ReadReq.Types.Options.Types.StreamOptions {
Revision = start,
StreamIdentifier = streamName
},
Subscription = new ReadReq.Types.Options.Types.SubscriptionOptions()
}
},
operationOptions, userCredentials, cancellationToken), eventAppeared, subscriptionDropped, _log,
cancellationToken: cancellationToken);
}

/// <summary>
/// Subscribes to a stream from a <see cref="StreamPosition">checkpoint</see>. This is exclusive of.
/// </summary>
/// <param name="start">A <see cref="Position"/> (exclusive of) to start the subscription from.</param>
/// <param name="streamName">The name of the stream to read events from.</param>
/// <param name="eventAppeared">A Task invoked and awaited when a new event is received over the subscription.</param>
/// <param name="configureOperationOptions">An <see cref="Action{EventStoreClientOperationOptions}"/> to configure the operation's options.</param>
/// <param name="resolveLinkTos">Whether to resolve LinkTo events automatically.</param>
/// <param name="subscriptionDropped">An action invoked if the subscription is dropped.</param>
/// <param name="userCredentials">The optional user credentials to perform operation with.</param>
/// <param name="cancellationToken">The optional <see cref="System.Threading.CancellationToken"/>.</param>
/// <returns></returns>
public Task<StreamSubscription> SubscribeToStreamAsync(string streamName,
StreamPosition start,
Func<StreamSubscription, ResolvedEvent, CancellationToken, Task> eventAppeared,
bool resolveLinkTos = false,
Action<StreamSubscription, SubscriptionDroppedReason, Exception?>? subscriptionDropped = default,
Action<EventStoreClientOperationOptions>? configureOperationOptions = null,
UserCredentials? userCredentials = null,
CancellationToken cancellationToken = default) {
var operationOptions = Settings.OperationOptions.Clone();
configureOperationOptions?.Invoke(operationOptions);

return SubscribeToStreamAsync(streamName, start, eventAppeared, operationOptions, resolveLinkTos,
subscriptionDropped, userCredentials, cancellationToken);
}
}
}
52 changes: 32 additions & 20 deletions src/EventStore.Client.Streams/Streams/ReadReq.cs
Original file line number Diff line number Diff line change
Expand Up @@ -6,9 +6,25 @@ partial class Types {
partial class Options {
partial class Types {
partial class StreamOptions {
public static StreamOptions FromSubscriptionPosition(string streamName,
SubscriptionStreamPosition subscriptionPosition) =>
subscriptionPosition.TryGetUInt64(out var revision) switch {
false => new() {
StreamIdentifier = streamName,
Start = new Empty()
},
true when subscriptionPosition == SubscriptionStreamPosition.End => new() {
StreamIdentifier = streamName,
End = new Empty()
},
_ => new() {
StreamIdentifier = streamName,
Revision = revision
}
};

public static StreamOptions FromStreamNameAndRevision(
string streamName,
StreamPosition streamRevision) {
string streamName, StreamPosition streamRevision) {
if (streamName == null) {
throw new ArgumentNullException(nameof(streamName));
}
Expand All @@ -35,25 +51,21 @@ public static StreamOptions FromStreamNameAndRevision(
}

partial class AllOptions {
public static AllOptions FromPosition(Client.Position position) {
if (position == Client.Position.End) {
return new AllOptions {
End = new Empty()
};
}

if (position == Client.Position.Start) {
return new AllOptions {
Start = new Empty()
};
}

return new AllOptions {
Position = new Position {
CommitPosition = position.CommitPosition,
PreparePosition = position.PreparePosition
public static AllOptions FromSubscriptionPosition(SubscriptionPosition position) {
return position.TryGetUInt64(out var p) switch {
false => new() {Start = new()},
true when position == SubscriptionPosition.End => new() {End = new()},
_ => new() {
Position = new()
{CommitPosition = p.commitPosition, PreparePosition = p.preparePosition}
}
};
}; false => new() {Start = new()},
true when position == SubscriptionPosition.End => new() {End = new()},
_ => new() {
Position = new()
{CommitPosition = p.commitPosition, PreparePosition = p.preparePosition}
}

}
}
}
Expand Down
Loading

0 comments on commit 93ff2dd

Please sign in to comment.