Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add CosmosStoreSource #103

Merged
merged 3 commits into from
Mar 12, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 4 additions & 1 deletion CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,9 @@ The `Unreleased` section name is replaced by the expected version of next releas
## [Unreleased]

### Added

- `Propulsion.CosmosStore.CosmosStoreSource` - equivalents of features in `Propulsion.Cosmos` [#103](https://github.com/jet/propulsion/pull/103)

### Changed
### Removed
### Fixed
Expand All @@ -18,7 +21,7 @@ The `Unreleased` section name is replaced by the expected version of next releas

### Added

- `Propulsion.CosmosStore.CosmosStoreSink`+`CosmosStorePruner` - equivalents of equivalent features in `Propulsion.Cosmos` [#89](https://github.com/jet/propulsion/pull/89)
- `Propulsion.CosmosStore.CosmosStoreSink`+`CosmosStorePruner` - equivalents of features in `Propulsion.Cosmos` [#89](https://github.com/jet/propulsion/pull/89)
- `StreamSpan.Version`, `SpanResult.NoneProcessed` [#102](https://github.com/jet/propulsion/pull/102)

### Changed
Expand Down
12 changes: 6 additions & 6 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -11,15 +11,15 @@ The components within this repository are delivered as a multi-targeted Nuget pa
- `Propulsion` [![NuGet](https://img.shields.io/nuget/v/Propulsion.svg)](https://www.nuget.org/packages/Propulsion/) Implements core functionality in a channel-independent fashion including `ParallelProjector`, `StreamsProjector`. [Depends](https://www.fuget.org/packages/Propulsion) on `MathNet.Numerics`, `Serilog`
- `Propulsion.Cosmos` [![NuGet](https://img.shields.io/nuget/v/Propulsion.Cosmos.svg)](https://www.nuget.org/packages/Propulsion.Cosmos/) Provides bindings to Azure CosmosDB. [Depends](https://www.fuget.org/packages/Propulsion.Cosmos) on `Equinox.Cosmos`, `Microsoft.Azure.DocumentDB.ChangeFeedProcessor`, `Serilog`

1. writing to `Equinox.Cosmos` :- `CosmosSink`
2. reading from CosmosDb's ChangeFeed by wrapping the [`dotnet-changefeedprocessor` library](https://github.com/Azure/azure-documentdb-changefeedprocessor-dotnet) :- `CosmosSource`.
3. pruning `Equinox.Cosmos` :- `CosmosPruner`
1. `CosmosSource`: reading from CosmosDb's ChangeFeed by wrapping the [`dotnet-changefeedprocessor` library](https://github.com/Azure/azure-documentdb-changefeedprocessor-dotnet).
2. `CosmosSink`: writing to `Equinox.Cosmos`.
3. `CosmosPruner`: pruning `Equinox.Cosmos`.

- `Propulsion.CosmosStore` [![NuGet](https://img.shields.io/nuget/v/Propulsion.CosmosStore.svg)](https://www.nuget.org/packages/Propulsion.CosmosStore/) Provides bindings to Azure CosmosDB. [Depends](https://www.fuget.org/packages/Propulsion.CosmosStore) on `Equinox.CosmosStore`

1. writing to `Equinox.CosmosStore` :- `CosmosStoreSink`
2. pruning from `Equinox.CosmosStore` :- `CosmosStorePruner`.
3. **TODO `CosmosSource`: Not yet implemented as [the Azure Cosmos SDK Team have yet to re-expose CheckpointAsync and other such required APIs](https://github.com/jet/propulsion/issues/15)**
1. `CosmosStoreSource`: reading from CosmosDb's ChangeFeed by wrapping the [`dotnet-changefeedprocessor` library](https://github.com/Azure/azure-documentdb-changefeedprocessor-dotnet). **NOTE not yet implemented using the V3 SDK as yet as [the Azure Cosmos SDK Team have yet to re-expose CheckpointAsync and other such required APIs](https://github.com/jet/propulsion/issues/15)**
2. `CosmosStoreSink`: writing to `Equinox.CosmosStore`.
3. `CosmosStorePruner`: pruning from `Equinox.CosmosStore`.

- `Propulsion.EventStore` [![NuGet](https://img.shields.io/nuget/v/Propulsion.EventStore.svg)](https://www.nuget.org/packages/Propulsion.EventStore/). Provides bindings to [EventStore](https://www.eventstore.org), writing via `Propulsion.EventStore.EventStoreSink` [Depends](https://www.fuget.org/packages/Propulsion.EventStore) on `Equinox.EventStore`, `Serilog`
- `Propulsion.Kafka` [![NuGet](https://img.shields.io/nuget/v/Propulsion.Kafka.svg)](https://www.nuget.org/packages/Propulsion.Kafka/) Provides bindings for producing and consuming both streamwise and in parallel. Includes a standard codec for use with streamwise projection and consumption, `Propulsion.Kafka.Codec.NewtonsoftJson.RenderedSpan`. [Depends](https://www.fuget.org/packages/Propulsion.Kafka) on `FsKafka` v `1.5.0`, `Serilog`
Expand Down
4 changes: 4 additions & 0 deletions src/Propulsion.Cosmos/ChangeFeedProcessor.fs
Original file line number Diff line number Diff line change
@@ -1,4 +1,8 @@
#if COSMOSSTORE
namespace Propulsion.CosmosStore
#else
namespace Propulsion.Cosmos
#endif

open Microsoft.Azure.Documents
open Microsoft.Azure.Documents.Client
Expand Down
11 changes: 10 additions & 1 deletion src/Propulsion.Cosmos/CosmosSource.fs
Original file line number Diff line number Diff line change
@@ -1,4 +1,8 @@
namespace Propulsion.Cosmos
#if COSMOSSTORE
namespace Propulsion.CosmosStore
#else
namespace Propulsion.Cosmos
#endif

open Equinox.Core // Stopwatch.Time
open Microsoft.Azure.Documents
Expand Down Expand Up @@ -37,7 +41,12 @@ module Log =
| true, SerilogScalar (:? Metric as e) -> Some e
| _ -> None

#if COSMOSSTORE
type CosmosStoreSource =
#else
type CosmosSource =
#endif

static member CreateObserver<'Items,'Batch>
( log : ILogger, context : ChangeFeedObserverContext,
createIngester : ILogger * int -> Propulsion.Ingestion.Ingester<'Items,'Batch>,
Expand Down
12 changes: 10 additions & 2 deletions src/Propulsion.Cosmos/EquinoxCosmosParser.fs
Original file line number Diff line number Diff line change
@@ -1,5 +1,13 @@
#if COSMOSSTORE
namespace Propulsion.CosmosStore

open Equinox.CosmosStore.Core
#else
namespace Propulsion.Cosmos

open Equinox.Cosmos.Store
#endif

open Microsoft.Azure.Documents
open Propulsion.Streams

Expand All @@ -19,11 +27,11 @@ module EquinoxCosmosParser =
&& d.GetPropertyValue "n" <> null && d.GetPropertyValue "e" <> null

/// Enumerates the events represented within a batch
let enumEquinoxCosmosEvents (batch : Equinox.Cosmos.Store.Batch) : StreamEvent<byte[]> seq =
let enumEquinoxCosmosEvents (batch : Batch) : StreamEvent<byte[]> seq =
let streamName = FsCodec.StreamName.parse batch.p // we expect all Equinox data to adhere to "{category}-{aggregateId}" form (or we'll throw)
batch.e |> Seq.mapi (fun offset x -> { stream = streamName; event = FsCodec.Core.TimelineEvent.Create(batch.i+int64 offset, x.c, x.d, x.m, timestamp=x.t) })

/// Collects all events with a Document [typically obtained via the CosmosDb ChangeFeed] that potentially represents an Equinox.Cosmos event-batch
let enumStreamEvents (d : Document) : StreamEvent<byte[]> seq =
if isEquinoxBatch d then d.Cast<Equinox.Cosmos.Store.Batch>() |> enumEquinoxCosmosEvents
if isEquinoxBatch d then d.Cast<Batch>() |> enumEquinoxCosmosEvents
else Seq.empty
10 changes: 7 additions & 3 deletions src/Propulsion.Cosmos/Infrastructure.fs
Original file line number Diff line number Diff line change
@@ -1,4 +1,8 @@
namespace Propulsion.Cosmos
#if COSMOSSTORE
namespace Propulsion.CosmosStore
#else
namespace Propulsion.Cosmos
#endif

open System
open System.Threading
Expand All @@ -11,7 +15,7 @@ open System.Threading.Tasks
module private AsyncHelpers =
type Async with
/// Asynchronously awaits the next keyboard interrupt event
static member AwaitKeyboardInterrupt () : Async<unit> =
static member AwaitKeyboardInterrupt () : Async<unit> =
Async.FromContinuations(fun (sc,_,_) ->
let isDisposed = ref 0
let rec callback _ = Task.Run(fun () -> if Interlocked.Increment isDisposed = 1 then d.Dispose() ; sc ()) |> ignore
Expand Down Expand Up @@ -41,4 +45,4 @@ module private AsyncHelpers =
elif t.IsCanceled then ek (TaskCanceledException("Task wrapped with Async has been cancelled."))
elif t.IsCompleted then k ()
else ek(Exception "invalid Task state!"))
|> ignore
|> ignore
8 changes: 8 additions & 0 deletions src/Propulsion.Cosmos/PropulsionCosmosPrometheus.fs
Original file line number Diff line number Diff line change
@@ -1,4 +1,8 @@
#if COSMOSSTORE
namespace Propulsion.CosmosStore.Prometheus
#else
namespace Propulsion.Cosmos.Prometheus
#endif

module private Impl =

Expand Down Expand Up @@ -53,7 +57,11 @@ module private Histogram =
let latency stat desc = sHistogram (Impl.baseName stat + "_seconds") (Impl.baseDesc desc + " latency")
let charge stat desc = ruHistogram (Impl.baseName stat + "_ru") (Impl.baseDesc desc + " charge")

#if COSMOSSTORE
open Propulsion.CosmosStore.Log
#else
open Propulsion.Cosmos.Log
#endif

module private Stats =

Expand Down
17 changes: 17 additions & 0 deletions src/Propulsion.CosmosStore/Propulsion.CosmosStore.fsproj
Original file line number Diff line number Diff line change
Expand Up @@ -3,15 +3,31 @@
<PropertyGroup>
<TargetFramework>netstandard2.1</TargetFramework>
<WarningLevel>5</WarningLevel>
<DefineConstants>COSMOSSTORE</DefineConstants>
<IsTestProject>false</IsTestProject>
<DisableImplicitFSharpCoreReference>true</DisableImplicitFSharpCoreReference>
<DisableImplicitSystemValueTupleReference>true</DisableImplicitSystemValueTupleReference>
<GenerateDocumentationFile>true</GenerateDocumentationFile>
</PropertyGroup>

<ItemGroup>
<Compile Include="..\Propulsion.Cosmos\EquinoxCosmosParser.fs">
<Link>EquinoxCosmosParser.fs</Link>
</Compile>
<Compile Include="..\Propulsion.Cosmos\Infrastructure.fs">
<Link>Infrastructure.fs</Link>
</Compile>
<Compile Include="..\Propulsion.Cosmos\ChangeFeedProcessor.fs">
<Link>ChangeFeedProcessor.fs</Link>
</Compile>
<Compile Include="..\Propulsion.Cosmos\CosmosSource.fs">
<Link>CosmosSource.fs</Link>
</Compile>
<Compile Include="CosmosStoreSink.fs" />
<Compile Include="CosmosStorePruner.fs" />
<Compile Include="..\Propulsion.Cosmos\PropulsionCosmosPrometheus.fs">
<Link>PropulsionCosmosPrometheus.fs</Link>
</Compile>
</ItemGroup>

<ItemGroup>
Expand All @@ -21,6 +37,7 @@
<PackageReference Include="FSharp.Core" Version="4.3.4" />

<PackageReference Include="Equinox.CosmosStore" Version="3.0.0-beta.2" />
<PackageReference Include="Microsoft.Azure.DocumentDB.ChangeFeedProcessor" Version="2.3.2" />
</ItemGroup>

<ItemGroup>
Expand Down