Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Restructured stream name for future planned changes #33

Merged
merged 2 commits into from
Jun 8, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ message ReadReq {
}

message Options {
string stream_name = 1;
event_store.client.StreamIdentifier stream_identifier = 1;
string group_name = 2;
int32 buffer_size = 3;
UUIDOption uuid_option = 4;
Expand Down Expand Up @@ -71,7 +71,7 @@ message ReadResp {
}
message RecordedEvent {
event_store.client.UUID id = 1;
string stream_name = 2;
event_store.client.StreamIdentifier stream_identifier = 2;
uint64 stream_revision = 3;
uint64 prepare_position = 4;
uint64 commit_position = 5;
Expand All @@ -89,7 +89,7 @@ message CreateReq {
Options options = 1;

message Options {
string stream_name = 1;
event_store.client.StreamIdentifier stream_identifier = 1;
string group_name = 2;
Settings settings = 3;
}
Expand Down Expand Up @@ -124,7 +124,7 @@ message UpdateReq {
Options options = 1;

message Options {
string stream_name = 1;
event_store.client.StreamIdentifier stream_identifier = 1;
string group_name = 2;
Settings settings = 3;
}
Expand Down Expand Up @@ -159,7 +159,7 @@ message DeleteReq {
Options options = 1;

message Options {
string stream_name = 1;
event_store.client.StreamIdentifier stream_identifier = 1;
string group_name = 2;
}
}
Expand Down
5 changes: 5 additions & 0 deletions src/EventStore.Client.Common/protos/shared.proto
Original file line number Diff line number Diff line change
Expand Up @@ -15,3 +15,8 @@ message UUID {
}
message Empty {
}

message StreamIdentifier {
reserved 1 to 2;
bytes streamName = 3;
}
10 changes: 5 additions & 5 deletions src/EventStore.Client.Common/protos/streams.proto
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ message ReadReq {
Backwards = 1;
}
message StreamOptions {
string stream_name = 1;
event_store.client.StreamIdentifier stream_identifier = 1;
oneof revision_option {
uint64 revision = 2;
event_store.client.Empty start = 3;
Expand Down Expand Up @@ -99,7 +99,7 @@ message ReadResp {

message RecordedEvent {
event_store.client.UUID id = 1;
string stream_name = 2;
event_store.client.StreamIdentifier stream_identifier = 2;
uint64 stream_revision = 3;
uint64 prepare_position = 4;
uint64 commit_position = 5;
Expand Down Expand Up @@ -127,7 +127,7 @@ message AppendReq {
}

message Options {
string stream_name = 1;
event_store.client.StreamIdentifier stream_identifier = 1;
oneof expected_stream_revision {
uint64 revision = 2;
event_store.client.Empty no_stream = 3;
Expand Down Expand Up @@ -182,7 +182,7 @@ message DeleteReq {
Options options = 1;

message Options {
string stream_name = 1;
event_store.client.StreamIdentifier stream_identifier = 1;
oneof expected_stream_revision {
uint64 revision = 2;
event_store.client.Empty no_stream = 3;
Expand All @@ -208,7 +208,7 @@ message TombstoneReq {
Options options = 1;

message Options {
string stream_name = 1;
event_store.client.StreamIdentifier stream_identifier = 1;
oneof expected_stream_revision {
uint64 revision = 2;
event_store.client.Empty no_stream = 3;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ public async Task CreateAsync(string streamName, string groupName,

await _client.CreateAsync(new CreateReq {
Options = new CreateReq.Types.Options {
StreamName = streamName,
StreamIdentifier = streamName,
GroupName = groupName,
Settings = new CreateReq.Types.Settings {
Revision = settings.StartFrom,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ public async Task DeleteAsync(string streamName, string groupName, UserCredentia
CancellationToken cancellationToken = default) {
await _client.DeleteAsync(new DeleteReq {
Options = new DeleteReq.Types.Options {
StreamName = streamName,
StreamIdentifier = streamName,
GroupName = groupName
}
}, RequestMetadata.Create(userCredentials ?? Settings.DefaultCredentials), cancellationToken: cancellationToken);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ public Task<PersistentSubscription> SubscribeAsync(string streamName, string gro
return PersistentSubscription.Confirm(call, new ReadReq.Types.Options {
BufferSize = bufferSize,
GroupName = groupName,
StreamName = streamName,
StreamIdentifier = streamName,
UuidOption = new ReadReq.Types.Options.Types.UUIDOption {Structured = new Empty()}
}, autoAck, eventAppeared,
subscriptionDropped ?? delegate { }, cancellationToken);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ public async Task UpdateAsync(string streamName, string groupName, PersistentSub

await _client.UpdateAsync(new UpdateReq {
Options = new UpdateReq.Types.Options {
StreamName = streamName,
StreamIdentifier = streamName,
GroupName = groupName,
Settings = new UpdateReq.Types.Settings {
Revision = settings.StartFrom,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -144,7 +144,7 @@ ResolvedEvent ConvertToResolvedEvent(ReadResp response) =>
e == null
? null
: new EventRecord(
e.StreamName,
e.StreamIdentifier,
Uuid.FromDto(e.Id),
new StreamPosition(e.StreamRevision),
new Position(e.CommitPosition, e.PreparePosition),
Expand Down
14 changes: 7 additions & 7 deletions src/EventStore.Client.Streams/EventStoreClient.Append.cs
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ private Task<IWriteResult> AppendToStreamAsync(

return AppendToStreamInternal(new AppendReq {
Options = new AppendReq.Types.Options {
StreamName = streamName,
StreamIdentifier = streamName,
Revision = expectedRevision
}
}, eventData, operationOptions, userCredentials, cancellationToken);
Expand Down Expand Up @@ -62,7 +62,7 @@ private Task<IWriteResult> AppendToStreamAsync(

return AppendToStreamInternal(new AppendReq {
Options = new AppendReq.Types.Options {
StreamName = streamName
StreamIdentifier = streamName
}
}.WithAnyStreamRevision(expectedState), eventData, operationOptions, userCredentials, cancellationToken);
}
Expand Down Expand Up @@ -106,7 +106,7 @@ private async Task<IWriteResult> AppendToStreamInternal(

foreach (var e in eventData) {
_log.LogTrace("Appending event to stream - {streamName}@{eventId} {eventType}.",
header.Options.StreamName, e.EventId, e.Type);
header.Options.StreamIdentifier, e.EventId, e.Type);
await call.RequestStream.WriteAsync(new AppendReq {
ProposedMessage = new AppendReq.Types.ProposedMessage {
Id = e.EventId.ToDto(),
Expand Down Expand Up @@ -135,7 +135,7 @@ await call.RequestStream.WriteAsync(new AppendReq {
response.Success.Position.PreparePosition)
: default);
_log.LogDebug("Append to stream succeeded - {streamName}@{logPosition}/{nextExpectedVersion}.",
header.Options.StreamName, writeResult.LogPosition, writeResult.NextExpectedVersion);
header.Options.StreamIdentifier, writeResult.LogPosition, writeResult.NextExpectedVersion);
} else {
if (response.WrongExpectedVersion != null) {
var expectedRevision = response.WrongExpectedVersion.ExpectedRevisionOptionCase switch {
Expand All @@ -154,15 +154,15 @@ await call.RequestStream.WriteAsync(new AppendReq {

_log.LogDebug(
"Append to stream failed with Wrong Expected Version - {streamName}/{expectedRevision}/{currentRevision}",
header.Options.StreamName, expectedRevision, currentRevision);
header.Options.StreamIdentifier, expectedRevision, currentRevision);

if (operationOptions.ThrowOnAppendFailure) {
throw new WrongExpectedVersionException(header.Options.StreamName, expectedRevision,
throw new WrongExpectedVersionException(header.Options.StreamIdentifier, expectedRevision,
currentRevision);
}

writeResult = new WrongExpectedVersionResult(
header.Options.StreamName, expectedRevision, currentRevision);
header.Options.StreamIdentifier, expectedRevision, currentRevision);
} else {
throw new InvalidOperationException("The operation completed with an unexpected result.");
}
Expand Down
6 changes: 3 additions & 3 deletions src/EventStore.Client.Streams/EventStoreClient.Delete.cs
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,7 @@ private Task<DeleteResult> SoftDeleteAsync(
CancellationToken cancellationToken = default) =>
DeleteInternal(new DeleteReq {
Options = new DeleteReq.Types.Options {
StreamName = streamName
StreamIdentifier = streamName
}
}.WithAnyStreamRevision(expectedState), operationOptions, userCredentials, cancellationToken);

Expand All @@ -71,15 +71,15 @@ private Task<DeleteResult> SoftDeleteAsync(
CancellationToken cancellationToken = default) =>
DeleteInternal(new DeleteReq {
Options = new DeleteReq.Types.Options {
StreamName = streamName,
StreamIdentifier = streamName,
Revision = expectedRevision
}
}, operationOptions, userCredentials, cancellationToken);

private async Task<DeleteResult> DeleteInternal(DeleteReq request, EventStoreClientOperationOptions operationOptions,
UserCredentials? userCredentials,
CancellationToken cancellationToken) {
_log.LogDebug("Deleting stream {streamName}.", request.Options.StreamName);
_log.LogDebug("Deleting stream {streamName}.", request.Options.StreamIdentifier);
var result = await _client.DeleteAsync(request, RequestMetadata.Create(userCredentials ?? Settings.DefaultCredentials),
deadline: DeadLine.After(operationOptions.TimeoutAfter), cancellationToken);

Expand Down
4 changes: 2 additions & 2 deletions src/EventStore.Client.Streams/EventStoreClient.Metadata.cs
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@ private Task<IWriteResult> SetStreamMetadataAsync(string streamName, StreamState
UserCredentials? userCredentials = null, CancellationToken cancellationToken = default)
=> SetStreamMetadataInternal(metadata, new AppendReq {
Options = new AppendReq.Types.Options {
StreamName = SystemStreams.MetastreamOf(streamName)
StreamIdentifier = SystemStreams.MetastreamOf(streamName)
}
}.WithAnyStreamRevision(expectedState), operationOptions, userCredentials, cancellationToken);

Expand Down Expand Up @@ -82,7 +82,7 @@ private Task<IWriteResult> SetStreamMetadataAsync(string streamName, StreamRevis
UserCredentials? userCredentials = null, CancellationToken cancellationToken = default)
=> SetStreamMetadataInternal(metadata, new AppendReq {
Options = new AppendReq.Types.Options {
StreamName = SystemStreams.MetastreamOf(streamName),
StreamIdentifier = SystemStreams.MetastreamOf(streamName),
Revision = expectedRevision
}
}, operationOptions, userCredentials, cancellationToken);
Expand Down
4 changes: 2 additions & 2 deletions src/EventStore.Client.Streams/EventStoreClient.Read.cs
Original file line number Diff line number Diff line change
Expand Up @@ -137,7 +137,7 @@ public ReadStreamResult(
throw new ArgumentOutOfRangeException("count");
}

_streamName = request.Options.Stream.StreamName;
_streamName = request.Options.Stream.StreamIdentifier;

if (request.Options.Filter == null) {
request.Options.NoFilter = new Empty();
Expand Down Expand Up @@ -271,7 +271,7 @@ private static ResolvedEvent ConvertToResolvedEvent(ReadResp.Types.ReadEvent rea
e == null
? null
: new EventRecord(
e.StreamName,
e.StreamIdentifier,
Uuid.FromDto(e.Id),
new StreamPosition(e.StreamRevision),
new Position(e.CommitPosition, e.PreparePosition),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -120,7 +120,7 @@ private Task<StreamSubscription> SubscribeToStreamAsync(string streamName,
ResolveLinks = resolveLinkTos,
Stream = new ReadReq.Types.Options.Types.StreamOptions {
Start = new Empty(),
StreamName = streamName
StreamIdentifier = streamName
},
Subscription = new ReadReq.Types.Options.Types.SubscriptionOptions()
}
Expand Down Expand Up @@ -171,11 +171,11 @@ private Task<StreamSubscription> SubscribeToStreamAsync(string streamName,
Stream = start == StreamPosition.End
? new ReadReq.Types.Options.Types.StreamOptions {
End = new Empty(),
StreamName = streamName
StreamIdentifier = streamName
}
: new ReadReq.Types.Options.Types.StreamOptions {
Revision = start,
StreamName = streamName
StreamIdentifier = streamName
},
Subscription = new ReadReq.Types.Options.Types.SubscriptionOptions()
}
Expand Down
6 changes: 3 additions & 3 deletions src/EventStore.Client.Streams/EventStoreClient.Tombstone.cs
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ private Task<DeleteResult> TombstoneAsync(
CancellationToken cancellationToken = default) =>
TombstoneInternal(new TombstoneReq {
Options = new TombstoneReq.Types.Options {
StreamName = streamName,
StreamIdentifier = streamName,
Revision = expectedRevision
}
}, operationOptions, userCredentials, cancellationToken);
Expand Down Expand Up @@ -50,7 +50,7 @@ private Task<DeleteResult> TombstoneAsync(
CancellationToken cancellationToken = default) =>
TombstoneInternal(new TombstoneReq {
Options = new TombstoneReq.Types.Options {
StreamName = streamName
StreamIdentifier = streamName
}
}.WithAnyStreamRevision(expectedState), operationOptions, userCredentials, cancellationToken);

Expand Down Expand Up @@ -79,7 +79,7 @@ public Task<DeleteResult> TombstoneAsync(
private async Task<DeleteResult> TombstoneInternal(TombstoneReq request,
EventStoreClientOperationOptions operationOptions, UserCredentials? userCredentials,
CancellationToken cancellationToken) {
_log.LogDebug("Tombstoning stream {streamName}.", request.Options.StreamName);
_log.LogDebug("Tombstoning stream {streamName}.", request.Options.StreamIdentifier);

var result = await _client.TombstoneAsync(request, RequestMetadata.Create(userCredentials ?? Settings.DefaultCredentials),
deadline: DeadLine.After(operationOptions.TimeoutAfter), cancellationToken);
Expand Down
6 changes: 3 additions & 3 deletions src/EventStore.Client.Streams/Streams/ReadReq.cs
Original file line number Diff line number Diff line change
Expand Up @@ -15,20 +15,20 @@ public static StreamOptions FromStreamNameAndRevision(

if (streamRevision == StreamPosition.End) {
return new StreamOptions {
StreamName = streamName,
StreamIdentifier = streamName,
End = new Empty()
};
}

if (streamRevision == StreamPosition.Start) {
return new StreamOptions {
StreamName = streamName,
StreamIdentifier = streamName,
Start = new Empty()
};
}

return new StreamOptions {
StreamName = streamName,
StreamIdentifier = streamName,
Revision = streamRevision
};
}
Expand Down
20 changes: 20 additions & 0 deletions src/EventStore.Client/StreamIdentifier.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
using System.Text;
using Google.Protobuf;

namespace EventStore.Client {
public partial class StreamIdentifier{private string _cached;
public static implicit operator string(StreamIdentifier source) {
if (source._cached != null || source.StreamName.IsEmpty) return source._cached;
var tmp = Encoding.UTF8.GetString(source.StreamName.Span);
//this doesn't have to be thread safe, its just a cache in case the identifier is turned into a string several times
source._cached = tmp;
return source._cached;
}

public static implicit operator StreamIdentifier(string source) {
var result = new StreamIdentifier();
result.StreamName = ByteString.CopyFromUtf8(source);
return result;
}
}
}