Skip to content

Commit

Permalink
Merge pull request #172 from thefringeninja/b0rked-append
Browse files Browse the repository at this point in the history
Detect Server Capabilities
  • Loading branch information
YoEight committed Jan 26, 2022
2 parents 4723a87 + 953e9ab commit 0c2e9e2
Show file tree
Hide file tree
Showing 89 changed files with 2,180 additions and 1,095 deletions.
5 changes: 2 additions & 3 deletions .github/workflows/ci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ jobs:
strategy:
fail-fast: false
matrix:
framework: [netcoreapp3.1, net5.0, net48]
framework: [netcoreapp3.1, net5.0]
os: [ubuntu-18.04, windows-latest]
runs-on: ${{ matrix.os }}
name: scan-vulnerabilities/${{ matrix.os }}/${{ matrix.framework }}
Expand All @@ -41,7 +41,7 @@ jobs:
strategy:
fail-fast: false
matrix:
framework: [netcoreapp3.1, net5.0, net48]
framework: [netcoreapp3.1, net5.0]
os: [ubuntu-18.04]
test: ["", .Streams, .PersistentSubscriptions, .Operations, .UserManagement, .ProjectionManagement]
configuration: [release]
Expand All @@ -60,7 +60,6 @@ jobs:
docker pull ghcr.io/eventstore/eventstore:${{ matrix.docker-tag }}
- name: Install netcoreapp3.1
uses: actions/setup-dotnet@v1
if: matrix.framework == 'netcoreapp3.1'
with:
dotnet-version: 3.1.x
- name: Install net5.0
Expand Down
12 changes: 2 additions & 10 deletions Directory.Build.props
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
<Project>
<PropertyGroup>
<TargetFrameworks>net48;netcoreapp3.1;net5.0</TargetFrameworks>
<TargetFrameworks>netcoreapp3.1;net5.0</TargetFrameworks>
<Platform>x64</Platform>
<GenerateAssemblyInfo>true</GenerateAssemblyInfo>
<Nullable>disable</Nullable>
Expand All @@ -14,15 +14,7 @@
<RootNamespace>EventStore.Client</RootNamespace>
<UseLinkBase>true</UseLinkBase>
</PropertyGroup>
<PropertyGroup Condition="'$(TargetFramework)' == 'net48' Or '$(TargetFramework)' == 'netcoreapp3.1'">
<PropertyGroup Condition="'$(TargetFramework)' == 'netcoreapp3.1'">
<DefineConstants>$(DefineConstants);GRPC_CORE</DefineConstants>
</PropertyGroup>
<ItemGroup>
<PackageReference Include="NETStandard.Library" Version="2.0.3"/>
<PackageReference Include="System.Net.Http" Version="4.3.4"/>
</ItemGroup>
<ItemGroup Condition="'$(TargetFramework)' == 'net48'">
<PackageReference Include="IndexRange" Version="1.0.0" />
<PackageReference Include="Microsoft.NETFramework.ReferenceAssemblies" Version="1.0.0" PrivateAssets="All" />
</ItemGroup>
</Project>
5 changes: 3 additions & 2 deletions src/Directory.Build.props
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,8 @@
<Copyright>Copyright 2012-2020 Event Store Ltd</Copyright>
<MinVerTagPrefix>v</MinVerTagPrefix>
<GenerateDocumentationFile>true</GenerateDocumentationFile>
<GrpcPackageVersion>2.38.0</GrpcPackageVersion>
<GrpcPackageVersion>2.40.0</GrpcPackageVersion>
<GrpcCorePackageVersion>2.42.0</GrpcCorePackageVersion>
</PropertyGroup>
<ItemGroup>
<None Include="..\..\LICENSE.md" Pack="true" PackagePath="\"/>
Expand All @@ -36,7 +37,7 @@
<PrivateAssets>all</PrivateAssets>
<IncludeAssets>runtime; build; native; contentfiles; analyzers; buildtransitive</IncludeAssets>
</PackageReference>
<PackageReference Include="Grpc.Tools" Version="$(GrpcPackageVersion)">
<PackageReference Include="Grpc.Tools" Version="$(GrpcCorePackageVersion)">
<PrivateAssets>all</PrivateAssets>
<IncludeAssets>runtime; build; native; contentfiles; analyzers</IncludeAssets>
</PackageReference>
Expand Down
13 changes: 0 additions & 13 deletions src/EventStore.Client.Common/DeconstructionExtensions.cs

This file was deleted.

8 changes: 1 addition & 7 deletions src/EventStore.Client.Common/EpochExtensions.cs
Original file line number Diff line number Diff line change
Expand Up @@ -3,13 +3,7 @@
#nullable enable
namespace EventStore.Client {
internal static class EpochExtensions {
private static readonly DateTime UnixEpoch =
#if NETFRAMEWORK
new DateTime(1970, 1, 1, 0, 0, 0, 0, DateTimeKind.Utc)
#else
DateTime.UnixEpoch
#endif
;
private static readonly DateTime UnixEpoch = DateTime.UnixEpoch;

public static DateTime FromTicksSinceEpoch(this long value) =>
new DateTime(UnixEpoch.Ticks + value, DateTimeKind.Utc);
Expand Down
19 changes: 19 additions & 0 deletions src/EventStore.Client.Common/protos/serverfeatures.proto
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
syntax = "proto3";
package event_store.client.server_features;
option java_package = "com.eventstore.dbclient.proto.serverfeatures";
import "shared.proto";

service ServerFeatures {
rpc GetSupportedMethods (event_store.client.Empty) returns (SupportedMethods);
}

message SupportedMethods {
repeated SupportedMethod methods = 1;
string event_store_server_version = 2;
}

message SupportedMethod {
string method_name = 1;
string service_name = 2;
repeated string features = 3;
}
Original file line number Diff line number Diff line change
Expand Up @@ -16,9 +16,11 @@ public partial class EventStoreOperationsClient {
public async Task ShutdownAsync(
UserCredentials? userCredentials = null,
CancellationToken cancellationToken = default) {
await new Operations.Operations.OperationsClient(
await SelectCallInvoker(cancellationToken).ConfigureAwait(false)).ShutdownAsync(EmptyResult,
var channelInfo = await GetChannelInfo(cancellationToken).ConfigureAwait(false);
using var call = new Operations.Operations.OperationsClient(
channelInfo.CallInvoker).ShutdownAsync(EmptyResult,
EventStoreCallOptions.Create(Settings, Settings.OperationOptions, userCredentials, cancellationToken));
await call.ResponseAsync.ConfigureAwait(false);
}

/// <summary>
Expand All @@ -30,9 +32,11 @@ await SelectCallInvoker(cancellationToken).ConfigureAwait(false)).ShutdownAsync(
public async Task MergeIndexesAsync(
UserCredentials? userCredentials = null,
CancellationToken cancellationToken = default) {
await new Operations.Operations.OperationsClient(
await SelectCallInvoker(cancellationToken).ConfigureAwait(false)).MergeIndexesAsync(EmptyResult,
var channelInfo = await GetChannelInfo(cancellationToken).ConfigureAwait(false);
using var call = new Operations.Operations.OperationsClient(
channelInfo.CallInvoker).MergeIndexesAsync(EmptyResult,
EventStoreCallOptions.Create(Settings, Settings.OperationOptions, userCredentials, cancellationToken));
await call.ResponseAsync.ConfigureAwait(false);
}

/// <summary>
Expand All @@ -44,9 +48,11 @@ await SelectCallInvoker(cancellationToken).ConfigureAwait(false)).MergeIndexesAs
public async Task ResignNodeAsync(
UserCredentials? userCredentials = null,
CancellationToken cancellationToken = default) {
await new Operations.Operations.OperationsClient(
await SelectCallInvoker(cancellationToken).ConfigureAwait(false)).ResignNodeAsync(EmptyResult,
var channelInfo = await GetChannelInfo(cancellationToken).ConfigureAwait(false);
using var call = new Operations.Operations.OperationsClient(
channelInfo.CallInvoker).ResignNodeAsync(EmptyResult,
EventStoreCallOptions.Create(Settings, Settings.OperationOptions, userCredentials, cancellationToken));
await call.ResponseAsync.ConfigureAwait(false);
}

/// <summary>
Expand All @@ -59,10 +65,12 @@ await SelectCallInvoker(cancellationToken).ConfigureAwait(false)).ResignNodeAsyn
public async Task SetNodePriorityAsync(int nodePriority,
UserCredentials? userCredentials = null,
CancellationToken cancellationToken = default) {
await new Operations.Operations.OperationsClient(
await SelectCallInvoker(cancellationToken).ConfigureAwait(false)).SetNodePriorityAsync(
var channelInfo = await GetChannelInfo(cancellationToken).ConfigureAwait(false);
using var call = new Operations.Operations.OperationsClient(
channelInfo.CallInvoker).SetNodePriorityAsync(
new SetNodePriorityReq {Priority = nodePriority},
EventStoreCallOptions.Create(Settings, Settings.OperationOptions, userCredentials, cancellationToken));
await call.ResponseAsync.ConfigureAwait(false);
}

/// <summary>
Expand All @@ -73,10 +81,12 @@ await SelectCallInvoker(cancellationToken).ConfigureAwait(false)).SetNodePriorit
/// <returns></returns>
public async Task RestartPersistentSubscriptions(UserCredentials? userCredentials = null,
CancellationToken cancellationToken = default) {
await new Operations.Operations.OperationsClient(
await SelectCallInvoker(cancellationToken).ConfigureAwait(false)).RestartPersistentSubscriptionsAsync(
var channelInfo = await GetChannelInfo(cancellationToken).ConfigureAwait(false);
using var call = new Operations.Operations.OperationsClient(
channelInfo.CallInvoker).RestartPersistentSubscriptionsAsync(
EmptyResult,
EventStoreCallOptions.Create(Settings, Settings.OperationOptions, userCredentials, cancellationToken));
await call.ResponseAsync.ConfigureAwait(false);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -28,8 +28,9 @@ public async Task<DatabaseScavengeResult> StartScavengeAsync(
throw new ArgumentOutOfRangeException(nameof(startFromChunk));
}

var result = await new Operations.Operations.OperationsClient(
await SelectCallInvoker(cancellationToken).ConfigureAwait(false)).StartScavengeAsync(
var channelInfo = await GetChannelInfo(cancellationToken).ConfigureAwait(false);
using var call = new Operations.Operations.OperationsClient(
channelInfo.CallInvoker).StartScavengeAsync(
new StartScavengeReq {
Options = new StartScavengeReq.Types.Options {
ThreadCount = threadCount,
Expand All @@ -38,6 +39,7 @@ await SelectCallInvoker(cancellationToken).ConfigureAwait(false)).StartScavengeA
},
EventStoreCallOptions.Create(Settings, Settings.OperationOptions, userCredentials,
cancellationToken));
var result = await call.ResponseAsync.ConfigureAwait(false);

return result.ScavengeResult switch {
ScavengeResp.Types.ScavengeResult.Started => DatabaseScavengeResult.Started(result.ScavengeId),
Expand All @@ -58,8 +60,9 @@ public async Task<DatabaseScavengeResult> StopScavengeAsync(
string scavengeId,
UserCredentials? userCredentials = null,
CancellationToken cancellationToken = default) {
var channelInfo = await GetChannelInfo(cancellationToken).ConfigureAwait(false);
var result = await new Operations.Operations.OperationsClient(
await SelectCallInvoker(cancellationToken).ConfigureAwait(false)).StopScavengeAsync(new StopScavengeReq {
channelInfo.CallInvoker).StopScavengeAsync(new StopScavengeReq {
Options = new StopScavengeReq.Types.Options {
ScavengeId = scavengeId
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,8 @@ partial class EventStorePersistentSubscriptionsClient {
[SystemConsumerStrategies.Pinned] = CreateReq.Types.ConsumerStrategy.Pinned,
};

private static CreateReq.Types.StreamOptions StreamOptionsForCreateProto(string streamName, StreamPosition position) {
private static CreateReq.Types.StreamOptions StreamOptionsForCreateProto(string streamName,
StreamPosition position) {
if (position == StreamPosition.Start) {
return new CreateReq.Types.StreamOptions {
StreamIdentifier = streamName,
Expand Down Expand Up @@ -42,8 +43,7 @@ private static CreateReq.Types.AllOptions AllOptionsForCreateProto(Position posi
allOptions = new CreateReq.Types.AllOptions {
Start = new Empty(),
};
}
else if (position == Position.End) {
} else if (position == Position.End) {
allOptions = new CreateReq.Types.AllOptions {
End = new Empty()
};
Expand Down Expand Up @@ -153,33 +153,53 @@ private async Task CreateInternalAsync(string streamName, string groupName, IEve
throw new ArgumentNullException(nameof(settings));
}

if (streamName != SystemStreams.AllStream && settings.StartFrom != null && !(settings.StartFrom is StreamPosition)) {
throw new ArgumentException($"{nameof(settings.StartFrom)} must be of type '{nameof(StreamPosition)}' when subscribing to a stream");
if (streamName != SystemStreams.AllStream && settings.StartFrom != null &&
!(settings.StartFrom is StreamPosition)) {
throw new ArgumentException(
$"{nameof(settings.StartFrom)} must be of type '{nameof(StreamPosition)}' when subscribing to a stream");
}

if (streamName == SystemStreams.AllStream && settings.StartFrom != null && !(settings.StartFrom is Position)) {
throw new ArgumentException($"{nameof(settings.StartFrom)} must be of type '{nameof(Position)}' when subscribing to {SystemStreams.AllStream}");
if (streamName == SystemStreams.AllStream && settings.StartFrom != null &&
!(settings.StartFrom is Position)) {
throw new ArgumentException(
$"{nameof(settings.StartFrom)} must be of type '{nameof(Position)}' when subscribing to {SystemStreams.AllStream}");
}

if (eventFilter != null && streamName != SystemStreams.AllStream) {
throw new ArgumentException($"Filters are only supported when subscribing to {SystemStreams.AllStream}");
throw new ArgumentException(
$"Filters are only supported when subscribing to {SystemStreams.AllStream}");
}

await new PersistentSubscriptions.PersistentSubscriptions.PersistentSubscriptionsClient(
await SelectCallInvoker(cancellationToken).ConfigureAwait(false)).CreateAsync(new CreateReq {
var channelInfo = await GetChannelInfo(cancellationToken).ConfigureAwait(false);

if (streamName == SystemStreams.AllStream &&
!channelInfo.ServerCapabilities.SupportsPersistentSubscriptionsToAll) {
throw new NotSupportedException("The server does not support persistent subscriptions to $all.");
}

using var call = new PersistentSubscriptions.PersistentSubscriptions.PersistentSubscriptionsClient(
channelInfo.CallInvoker).CreateAsync(new CreateReq {
Options = new CreateReq.Types.Options {
Stream = streamName != SystemStreams.AllStream ?
StreamOptionsForCreateProto(streamName, (StreamPosition)(settings.StartFrom ?? StreamPosition.End)) : null,
All = streamName == SystemStreams.AllStream ?
AllOptionsForCreateProto((Position)(settings.StartFrom ?? Position.End), eventFilter) : null,
#pragma warning disable 612
StreamIdentifier = streamName != SystemStreams.AllStream ? streamName : string.Empty, /*for backwards compatibility*/
#pragma warning restore 612
Stream = streamName != SystemStreams.AllStream
? StreamOptionsForCreateProto(streamName,
(StreamPosition)(settings.StartFrom ?? StreamPosition.End))
: null,
All = streamName == SystemStreams.AllStream
? AllOptionsForCreateProto((Position)(settings.StartFrom ?? Position.End), eventFilter)
: null,
#pragma warning disable 612
StreamIdentifier =
streamName != SystemStreams.AllStream
? streamName
: string.Empty, /*for backwards compatibility*/
#pragma warning restore 612
GroupName = groupName,
Settings = new CreateReq.Types.Settings {
#pragma warning disable 612
Revision = streamName != SystemStreams.AllStream ? ((StreamPosition)(settings.StartFrom ?? StreamPosition.End)).ToUInt64() : default, /*for backwards compatibility*/
#pragma warning restore 612
#pragma warning disable 612
Revision = streamName != SystemStreams.AllStream
? ((StreamPosition)(settings.StartFrom ?? StreamPosition.End)).ToUInt64()
: default, /*for backwards compatibility*/
#pragma warning restore 612
CheckpointAfterMs = (int)settings.CheckPointAfter.TotalMilliseconds,
ExtraStatistics = settings.ExtraStatistics,
MessageTimeoutMs = (int)settings.MessageTimeout.TotalMilliseconds,
Expand All @@ -195,6 +215,7 @@ await SelectCallInvoker(cancellationToken).ConfigureAwait(false)).CreateAsync(ne
}
}
}, EventStoreCallOptions.Create(Settings, Settings.OperationOptions, userCredentials, cancellationToken));
await call.ResponseAsync.ConfigureAwait(false);
}

/// <summary>
Expand All @@ -210,12 +231,12 @@ public async Task CreateToAllAsync(string groupName, IEventFilter eventFilter,
PersistentSubscriptionSettings settings, UserCredentials? userCredentials = null,
CancellationToken cancellationToken = default) =>
await CreateInternalAsync(
streamName: SystemStreams.AllStream,
groupName: groupName,
eventFilter: eventFilter,
settings: settings,
userCredentials: userCredentials,
cancellationToken: cancellationToken)
streamName: SystemStreams.AllStream,
groupName: groupName,
eventFilter: eventFilter,
settings: settings,
userCredentials: userCredentials,
cancellationToken: cancellationToken)
.ConfigureAwait(false);

/// <summary>
Expand Down
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
using System;
using System.Threading;
using System.Threading.Tasks;
using EventStore.Client.PersistentSubscriptions;
Expand All @@ -15,6 +16,13 @@ partial class EventStorePersistentSubscriptionsClient {
/// <returns></returns>
public async Task DeleteAsync(string streamName, string groupName, UserCredentials? userCredentials = null,
CancellationToken cancellationToken = default) {
var channelInfo = await GetChannelInfo(cancellationToken).ConfigureAwait(false);

if (streamName == SystemStreams.AllStream &&
!channelInfo.ServerCapabilities.SupportsPersistentSubscriptionsToAll) {
throw new NotSupportedException("The server does not support persistent subscriptions to $all.");
}

var deleteOptions = new DeleteReq.Types.Options {
GroupName = groupName
};
Expand All @@ -25,10 +33,12 @@ public async Task DeleteAsync(string streamName, string groupName, UserCredentia
deleteOptions.StreamIdentifier = streamName;
}

await new PersistentSubscriptions.PersistentSubscriptions.PersistentSubscriptionsClient(
await SelectCallInvoker(cancellationToken).ConfigureAwait(false)).DeleteAsync(new DeleteReq {
Options = deleteOptions
}, EventStoreCallOptions.Create(Settings, Settings.OperationOptions, userCredentials, cancellationToken));
using var call =
new PersistentSubscriptions.PersistentSubscriptions.PersistentSubscriptionsClient(channelInfo.CallInvoker)
.DeleteAsync(new DeleteReq {Options = deleteOptions},
EventStoreCallOptions.Create(Settings, Settings.OperationOptions, userCredentials,
cancellationToken));
await call.ResponseAsync.ConfigureAwait(false);
}

/// <summary>
Expand Down
Loading

0 comments on commit 0c2e9e2

Please sign in to comment.