Skip to content

Commit

Permalink
Renovated snapshots example to the latest ESDB client syntax
Browse files Browse the repository at this point in the history
  • Loading branch information
oskardudycz committed Jan 4, 2024
1 parent ffef134 commit d7662bb
Show file tree
Hide file tree
Showing 21 changed files with 167 additions and 128 deletions.
4 changes: 2 additions & 2 deletions samples/decider/docker-compose.yml
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,8 @@ services:
#######################################################
# EventStoreDB
#######################################################
eventstoredb:
image: eventstore/eventstore:22.10.0-buster-slim
eventstore.db:
image: eventstore/eventstore:23.10.0-bookworm-slim
# use this image if you're running ARM-based proc like Apple M1
# image: eventstore/eventstore:23.10.0-alpha-arm64v8
environment:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ const EVENTSTOREDB_PORT = 2113;
const EVENTSTOREDB_TCP_PORT = 1113;
const EVENTSTOREDB_TCP_PORTS = [EVENTSTOREDB_TCP_PORT, EVENTSTOREDB_PORT];
const EVENTSTOREDB_IMAGE_NAME = 'eventstore/eventstore';
const EVENTSTOREDB_IMAGE_TAG = '22.10.1-buster-slim';
const EVENTSTOREDB_IMAGE_TAG = '23.10.0-bookworm-slim';

export class EventStoreDBContainer extends GenericContainer {
private readonly tcpPorts = EVENTSTOREDB_TCP_PORTS;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ const EVENTSTOREDB_PORT = 2113;
const EVENTSTOREDB_TCP_PORT = 1113;
const EVENTSTOREDB_TCP_PORTS = [EVENTSTOREDB_TCP_PORT, EVENTSTOREDB_PORT];
const EVENTSTOREDB_IMAGE_NAME = 'eventstore/eventstore';
const EVENTSTOREDB_IMAGE_TAG = '22.10.1-buster-slim';
const EVENTSTOREDB_IMAGE_TAG = '23.10.0-bookworm-slim';

export class EventStoreDBContainer extends GenericContainer {
private readonly tcpPorts = EVENTSTOREDB_TCP_PORTS;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ describe('EndShift command', () => {
const streamName = getCashRegisterStreamName(cashRegisterId);

beforeAll(async () => {
esdbContainer = await new EventStoreDBContainer().startContainer();
esdbContainer = await new EventStoreDBContainer().start();
config.eventStoreDB.connectionString = esdbContainer.getConnectionString();

eventStore = esdbContainer.getClient();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ describe('DELETE /cash-registers/:id/shifts', () => {
let esdbContainer: StartedEventStoreDBContainer;

beforeAll(async () => {
esdbContainer = await new EventStoreDBContainer().startContainer();
esdbContainer = await new EventStoreDBContainer().start();
config.eventStoreDB.connectionString = esdbContainer.getConnectionString();
console.log(config.eventStoreDB.connectionString);
});
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ describe('PlaceAtWorkStation command', () => {
let eventStore: EventStoreDBClient;

beforeAll(async () => {
esdbContainer = await new EventStoreDBContainer().startContainer();
esdbContainer = await new EventStoreDBContainer().start();
config.eventStoreDB.connectionString = esdbContainer.getConnectionString();

eventStore = esdbContainer.getClient();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ describe('POST /cash-register/', () => {
let esdbContainer: StartedEventStoreDBContainer;

beforeAll(async () => {
esdbContainer = await new EventStoreDBContainer().startContainer();
esdbContainer = await new EventStoreDBContainer().start();
config.eventStoreDB.connectionString = esdbContainer.getConnectionString();
});

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ describe('POST /cash-registers/:id/transactions', () => {
let esdbContainer: StartedEventStoreDBContainer;

beforeAll(async () => {
esdbContainer = await new EventStoreDBContainer().startContainer();
esdbContainer = await new EventStoreDBContainer().start();
config.eventStoreDB.connectionString = esdbContainer.getConnectionString();
console.log(config.eventStoreDB.connectionString);
});
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ describe('EndShift command', () => {
const streamName = getCashRegisterStreamName(cashRegisterId);

beforeAll(async () => {
esdbContainer = await new EventStoreDBContainer().startContainer();
esdbContainer = await new EventStoreDBContainer().start();
config.eventStoreDB.connectionString = esdbContainer.getConnectionString();

eventStore = esdbContainer.getClient();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ describe('POST /cash-registers/:id/shifts', () => {
let esdbContainer: StartedEventStoreDBContainer;

beforeAll(async () => {
esdbContainer = await new EventStoreDBContainer().startContainer();
esdbContainer = await new EventStoreDBContainer().start();
config.eventStoreDB.connectionString = esdbContainer.getConnectionString();
console.log(config.eventStoreDB.connectionString);
});
Expand Down
72 changes: 38 additions & 34 deletions samples/snapshots/src/cashRegisters/testFlow.ts
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import {
Position,
ResolvedEvent,
START,
StreamNotFoundError,
} from '@eventstore/db-client';
import {
AppendToStreamOptions,
Expand All @@ -22,21 +23,21 @@ import {

export type Event<
EventType extends string = string,
EventData extends object = object,
EventMetadata extends object = object,
> = {
type: EventType;
data: EventData;
metadata?: EventMetadata;
};

type SnapshotMetadata = {
snapshottedStreamRevision: string;
};
EventData extends Record<string, unknown> = Record<string, unknown>,
EventMetadata extends Record<string, unknown> = Record<string, unknown>,
> = Readonly<{
type: Readonly<EventType>;
data: Readonly<EventData>;
metadata?: Readonly<EventMetadata>;
}>;

type SnapshotMetadata = Readonly<{
snapshottedStreamVersion: string;
}>;

type SnapshotEvent<
EventType extends string = string,
EventData extends object = object,
EventData extends Record<string, unknown> = Record<string, unknown>,
EventMetadata extends SnapshotMetadata = SnapshotMetadata,
> = Event<EventType, EventData, EventMetadata> & {
metadata: Readonly<EventMetadata>;
Expand All @@ -51,8 +52,8 @@ export type Command<
CommandData extends Record<string, unknown> = Record<string, unknown>,
CommandMetadata extends Record<string, unknown> = Record<string, unknown>,
> = {
readonly type: CommandType;
readonly data: CommandData;
type: Readonly<CommandType>;
data: Readonly<CommandData>;
metadata?: Readonly<CommandMetadata>;
};

Expand All @@ -67,22 +68,26 @@ async function readFromStream<StreamEvent extends Event>(
streamName: string,
options?: ReadStreamOptions,
): Promise<StreamEvent[] | STREAM_NOT_FOUND> {
const events = [];
try {
const events = await eventStore.readStream(streamName, options);

return events
.filter((resolvedEvent) => !!resolvedEvent.event)
.map((resolvedEvent) => {
return <StreamEvent>{
type: resolvedEvent.event!.type,
data: resolvedEvent.event!.data,
metadata: resolvedEvent.event!.metadata,
};
for await (const { event } of eventStore.readStream<StreamEvent>(
streamName,
options,
)) {
if (!event) continue;

events.push(<StreamEvent>{
type: event.type,
data: event.data,
metadata: event.metadata,
});
}
return events;
} catch (error) {
if (error.type == ErrorType.STREAM_NOT_FOUND) {
if (error instanceof StreamNotFoundError) {
return 'STREAM_NOT_FOUND';
}

throw error;
}
}
Expand Down Expand Up @@ -126,7 +131,7 @@ async function readEventsFromExternalSnapshot<
const snapshot = await getLastSnapshot(streamName);

const lastSnapshotRevision = snapshot
? BigInt(snapshot.metadata.snapshottedStreamRevision)
? BigInt(snapshot.metadata.snapshottedStreamVersion)
: undefined;

const streamEvents = await readFromStream<StreamEvent>(
Expand Down Expand Up @@ -194,7 +199,7 @@ async function getLastSnapshotRevisionFromStreamMetadata(
);

return streamMetadata
? BigInt(streamMetadata.snapshottedStreamRevision)
? BigInt(streamMetadata.snapshottedStreamVersion)
: undefined;
}

Expand Down Expand Up @@ -274,7 +279,7 @@ async function appendEventAndExternalSnapshot<
return appendResult;
}

function appendSnapshotToSeparateStream<
async function appendSnapshotToSeparateStream<
SnapshotStreamEvent extends SnapshotEvent,
>(
eventStore: EventStoreDBClient,
Expand All @@ -288,7 +293,7 @@ function appendSnapshotToSeparateStream<
// set snapshot stream metadata $maxCount to 1.
// This will make sure that there is only one snapshot event.
if (lastSnapshotRevision === undefined) {
eventStore.setStreamMetadata(snapshotStreamName, { maxCount: 1 });
await eventStore.setStreamMetadata(snapshotStreamName, { maxCount: 1 });
}

return appendToStream(eventStore, snapshotStreamName, [snapshot]);
Expand Down Expand Up @@ -317,11 +322,10 @@ async function appendEventAndSnapshotToTheSameStream<
eventsToAppend,
);

const snapshottedStreamRevision =
appendResult.nextExpectedRevision.toString();
const snapshottedStreamVersion = appendResult.nextExpectedRevision.toString();

await eventStore.setStreamMetadata<SnapshotMetadata>(streamName, {
snapshottedStreamRevision,
snapshottedStreamVersion,
});

return appendResult;
Expand Down Expand Up @@ -497,7 +501,7 @@ function buildCashRegisterSnapshot(
return {
type: 'cash-register-snapshoted',
data: currentState,
metadata: { snapshottedStreamRevision: newStreamRevision.toString() },
metadata: { snapshottedStreamVersion: newStreamRevision.toString() },
};
}

Expand All @@ -521,7 +525,7 @@ function tryBuildCashRegisterSnapshotNoMetadata(
return {
type: 'cash-register-snapshoted',
data: currentState,
metadata: { snapshottedStreamRevision: undefined! },
metadata: { snapshottedStreamVersion: undefined! },
};
}

Expand Down
51 changes: 29 additions & 22 deletions samples/snapshots/src/core/eventStore/reading/readFromStream.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,8 @@
import { ErrorType, EventStoreDBClient } from '@eventstore/db-client';
import {
ErrorType,
EventStoreDBClient,
StreamNotFoundError,
} from '@eventstore/db-client';
import { ReadStreamOptions } from '@eventstore/db-client/dist/streams';
import { STREAM_NOT_FOUND } from '.';

Expand All @@ -14,29 +18,32 @@ export async function readFromStream<StreamEvent extends Event>(
streamName: string,
options?: ReadFromStreamOptions,
): Promise<Result<StreamEvent[], STREAM_NOT_FOUND>> {
const events = [];

const toPosition = options?.toPosition;

try {
const events = await eventStore.readStream(streamName, options);

const toPosition = options?.toPosition;

return success(
events
.filter(
(resolvedEvent) =>
!!resolvedEvent.event &&
(toPosition === undefined ||
(resolvedEvent.commitPosition ?? 0) < toPosition),
)
.map((resolvedEvent) => {
return <StreamEvent>{
type: resolvedEvent.event!.type,
data: resolvedEvent.event!.data,
metadata: resolvedEvent.event?.metadata,
};
}),
);
for await (const { event } of eventStore.readStream<StreamEvent>(
streamName,
options,
)) {
if (!event) continue;

if (
toPosition != undefined &&
(event.position?.commit ?? 0) >= toPosition
)
break;

events.push(<StreamEvent>{
type: event.type,
data: event.data,
metadata: event.metadata,
});
}
return success(events);
} catch (error) {
if (error.type == ErrorType.STREAM_NOT_FOUND) {
if (error instanceof StreamNotFoundError) {
return failure('STREAM_NOT_FOUND');
}

Expand Down
Original file line number Diff line number Diff line change
@@ -1,4 +1,8 @@
import { ErrorType, EventStoreDBClient } from '@eventstore/db-client';
import {
ErrorType,
EventStoreDBClient,
StreamNotFoundError,
} from '@eventstore/db-client';
import { GetStreamMetadataOptions } from '@eventstore/db-client/dist/streams';
import { STREAM_NOT_FOUND, METADATA_NOT_FOUND } from './';
import { failure, Result, success } from '../../primitives';
Expand All @@ -20,7 +24,7 @@ export async function readStreamMetadata<

return success(result.metadata);
} catch (error) {
if (error.type == ErrorType.STREAM_NOT_FOUND) {
if (error instanceof StreamNotFoundError) {
return failure('STREAM_NOT_FOUND');
}

Expand Down
Original file line number Diff line number Diff line change
@@ -1,14 +1,14 @@
import { Event } from '../../events';
export type SnapshotMetadata = {

export type SnapshotMetadata = Readonly<{
snapshottedStreamVersion: string;
};
}>;

export type SnapshotEvent<
EventType extends string = string,
EventData extends Record<string, unknown> = Record<string, unknown>,
EventMetadata extends SnapshotMetadata &
Record<string, unknown> = SnapshotMetadata & Record<string, unknown>,
EventMetadata extends SnapshotMetadata = SnapshotMetadata,
> = Event<EventType, EventData, EventMetadata> &
Readonly<{
metadata: Readonly<EventMetadata>;
metadata: EventMetadata;
}>;
12 changes: 10 additions & 2 deletions samples/snapshots/src/core/events/event.ts
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,14 @@ export function isEvent<
EventType extends string = string,
EventData extends Record<string, unknown> = Record<string, unknown>,
EventMetadata extends Record<string, unknown> = Record<string, unknown>,
>(event: any): event is Event<EventType, EventData, EventMetadata> {
return typeof event.type !== 'undefined' && typeof event.data !== 'undefined';
>(event: unknown): event is Event<EventType, EventData, EventMetadata> {
return (
typeof event === 'object' &&
event !== null &&
'type' in event &&
typeof event.type === 'string' &&
'data' in event &&
typeof event.data === 'object' &&
event.data !== null
);
}
Loading

0 comments on commit d7662bb

Please sign in to comment.