diff --git a/PROTOCOL.md b/PROTOCOL.md index c21c8c45..b3c8f83c 100644 --- a/PROTOCOL.md +++ b/PROTOCOL.md @@ -195,11 +195,19 @@ _The client and the server has already gone through [successful connection initi 1. _Client_ generates a unique ID for the following operation 1. _Client_ dispatches the `Subscribe` message with the generated ID through the `id` field and the requested operation passed through the `payload` field
_All future communication is linked through this unique ID_ -1. _Server_ triggers the `onSubscribe` callback, if specified, and uses the returned `ExecutionArgs` for the operation -1. _Server_ validates the request and executes the single result GraphQL operation -1. _Server_ dispatches the `Next` message with the execution result +1. _Server_ triggers the `onSubscribe` callback + + - If `ExecutionArgs` are **not** returned, the arguments will be formed and validated using the payload + - If `ExecutionArgs` are returned, they will be used directly + +1. _Server_ executes the single result GraphQL operation using the arguments provided above +1. _Server_ triggers the `onNext` callback + + - If `ExecutionResult` is **not** returned, the direct result from the operation will be dispatched with the `Next` message + - If `ExecutionResult` is returned, it will be dispatched with the `Next` message + +1. _Server_ triggers the `onComplete` callback 1. _Server_ dispatches the `Complete` message indicating that the execution has completed -1. _Server_ triggers the `onComplete` callback, if specified ### Streaming operation @@ -208,14 +216,24 @@ _The client and the server has already gone through [successful connection initi _The client and the server has already gone through [successful connection initialisation](#successful-connection-initialisation)._ 1. _Client_ generates a unique ID for the following operation -1. _Client_ dispatches the `Subscribe` message with the generated ID through the `id` field and the requested streaming operation passed through the `payload` field +1. _Client_ dispatches the `Subscribe` message with the generated ID through the `id` field and the requested operation passed through the `payload` field
_All future communication is linked through this unique ID_ -1. _Server_ triggers the `onSubscribe` callback, if specified, and uses the returned `ExecutionArgs` for the operation -1. _Server_ validates the request and executes the streaming GraphQL operation +1. _Server_ triggers the `onSubscribe` callback + + - If `ExecutionArgs` are **not** returned, the arguments will be formed and validated using the payload + - If `ExecutionArgs` are returned, they will be used directly + +1. _Server_ executes the streaming GraphQL operation using the arguments provided above 1. _Server_ checks if the generated ID is unique across active streaming subscriptions + - If **not** unique, the _server_ will close the socket with the event `4409: Subscriber for already exists` - If unique, continue... -1. _Server_ dispatches `Next` messages for every event in the source stream + +1. _Server_ triggers the `onNext` callback + + - If `ExecutionResult` is **not** returned, the direct events from the source stream will be dispatched with the `Next` message + - If `ExecutionResult` is returned, it will be dispatched with the `Next` message instead of every event from the source stram + 1. - _Client_ stops the subscription by dispatching a `Complete` message - _Server_ completes the source stream
_or_ diff --git a/README.md b/README.md index 398e2aef..d538a128 100644 --- a/README.md +++ b/README.md @@ -400,29 +400,20 @@ const server = https.createServer(function weServeSocketsOnly(_, res) { createServer( { schema, - execute: async (args) => { - console.log('Execute', args); - const result = await execute(args); - console.debug('Execute result', result); - return result; - }, - subscribe: async (args) => { - console.log('Subscribe', args); - const subscription = await subscribe(args); - // NOTE: `subscribe` can sometimes return a single result, I dont consider it here for sake of simplicity - return (async function* () { - for await (const result of subscription) { - console.debug('Subscribe yielded result', { args, result }); - yield result; - } - })(); - }, onConnect: (ctx) => { console.log('Connect', ctx); - return true; // default behaviour - permit all connection attempts + }, + onSubscribe: (ctx, msg) => { + console.log('Subscribe', { ctx, msg }); + }, + onNext: (ctx, msg, args, result) => { + console.debug('Next', { ctx, msg, args, result }); + }, + onError: (ctx, msg, errors) => { + console.error('Error', { ctx, msg, errors }); }, onComplete: (ctx, msg) => { - console.debug('Complete', { ctx, msg }); + console.log('Complete', { ctx, msg }); }, }, { @@ -498,25 +489,58 @@ server.listen(443);
-Server usage with a custom GraphQL context +Server usage with custom static GraphQL arguments ```typescript -import { execute, subscribe } from 'graphql'; +import { validate, execute, subscribe } from 'graphql'; import { createServer } from 'graphql-transport-ws'; -import { schema } from 'my-graphql-schema'; +import { schema, roots, getStaticContext } from 'my-graphql'; createServer( { + context: getStaticContext(), schema, + roots, execute, subscribe, - onSubscribe: (ctx, msg, args) => { - return [ - { - ...args, - contextValue: getCustomContext(ctx, msg, args), - }, - ]; + }, + { + server, + path: '/graphql', + }, +); +``` + +
+ +
+Server usage with custom dynamic GraphQL arguments and validation + +```typescript +import { parse, validate, execute, subscribe } from 'graphql'; +import { createServer } from 'graphql-transport-ws'; +import { schema, getDynamicContext, myValidationRules } from 'my-graphql'; + +createServer( + { + execute, + subscribe, + onSubscribe: (ctx, msg) => { + const args = { + schema, + contextValue: getDynamicContext(ctx, msg), + operationName: msg.payload.operationName, + document: parse(msg.payload.operationName), + variableValues: msg.payload.variables, + }; + + // dont forget to validate when returning custom execution args! + const errors = validate(args.schema, args.document, myValidationRules); + if (errors.length > 0) { + return errors; // return `GraphQLError[]` to send `ErrorMessage` and stop subscription + } + + return args; }, }, { @@ -528,6 +552,88 @@ createServer(
+
+Server and client usage with persisted queries + +```typescript +// 🛸 server + +import { parse, execute, subscribe } from 'graphql'; +import { createServer } from 'graphql-transport-ws'; +import { schema } from 'my-graphql-schema'; + +type QueryID = string; + +const queriesStore: Record = { + iWantTheGreetings: { + schema, // you may even provide different schemas in the queries store + document: parse('subscription Greetings { greetings }'), + }, +}; + +createServer( + { + execute, + subscribe, + onSubscribe: (_ctx, msg) => { + // search using `SubscriptionPayload.query` as QueryID + // check the client example below for better understanding + const hit = queriesStore[msg.payload.query]; + if (hit) { + return { + ...hit, + variableValues: msg.payload.variables, // use the variables from the client + }; + } + // if no hit, execute as usual + return { + schema, + operationName: msg.payload.operationName, + document: parse(msg.payload.operationName), + variableValues: msg.payload.variables, + }; + }, + }, + { + server, + path: '/graphql', + }, +); +``` + +```typescript +// 📺 client + +import { createClient } from 'graphql-transport-ws'; + +const client = createClient({ + url: 'wss://persisted.graphql/queries', +}); + +(async () => { + const onNext = () => { + /**/ + }; + + await new Promise((resolve, reject) => { + client.subscribe( + { + query: 'iWantTheGreetings', + }, + { + next: onNext, + error: reject, + complete: resolve, + }, + ); + }); + + expect(onNext).toBeCalledTimes(5); // greetings in 5 languages +})(); +``` + +
+ ## [Documentation](docs/) Check the [docs folder](docs/) out for [TypeDoc](https://typedoc.org) generated documentation. diff --git a/docs/interfaces/_server_.serveroptions.md b/docs/interfaces/_server_.serveroptions.md index 73d3449d..77e3b075 100644 --- a/docs/interfaces/_server_.serveroptions.md +++ b/docs/interfaces/_server_.serveroptions.md @@ -15,15 +15,16 @@ * [connectionInitWaitTimeout](_server_.serveroptions.md#connectioninitwaittimeout) * [context](_server_.serveroptions.md#context) * [execute](_server_.serveroptions.md#execute) -* [formatExecutionResult](_server_.serveroptions.md#formatexecutionresult) * [keepAlive](_server_.serveroptions.md#keepalive) * [onComplete](_server_.serveroptions.md#oncomplete) * [onConnect](_server_.serveroptions.md#onconnect) +* [onError](_server_.serveroptions.md#onerror) +* [onNext](_server_.serveroptions.md#onnext) +* [onOperation](_server_.serveroptions.md#onoperation) * [onSubscribe](_server_.serveroptions.md#onsubscribe) * [roots](_server_.serveroptions.md#roots) * [schema](_server_.serveroptions.md#schema) * [subscribe](_server_.serveroptions.md#subscribe) -* [validationRules](_server_.serveroptions.md#validationrules) ## Properties @@ -31,8 +32,6 @@ • `Optional` **connectionInitWaitTimeout**: undefined \| number -**`default`** 3 * 1000 (3 seconds) - The amount of time for which the server will wait for `ConnectionInit` message. @@ -43,37 +42,30 @@ has not sent the `ConnectionInit` message, the server will terminate the socket by dispatching a close event `4408: Connection initialisation timeout` +**`default`** 3 * 1000 (3 seconds) + ___ ### context -• `Optional` **context**: SubscriptionArgs[\"contextValue\"] +• `Optional` **context**: unknown A value which is provided to every resolver and holds important contextual information like the currently logged in user, or access to a database. -Related operation context value will be injected to the -`ExecutionArgs` BEFORE the `onSubscribe` callback. + +If you return from the `onSubscribe` callback, this +context value will NOT be injected. You should add it +in the returned `ExecutionArgs` from the callback. ___ ### execute -• **execute**: (args: ExecutionArgs) => Promise\ \| ExecutionResult> \| AsyncIterableIterator\ \| ExecutionResult +• **execute**: (args: ExecutionArgs) => [OperationResult](../modules/_server_.md#operationresult) Is the `execute` function from GraphQL which is -used to execute the query/mutation operation. - -___ - -### formatExecutionResult - -• `Optional` **formatExecutionResult**: [ExecutionResultFormatter](../modules/_server_.md#executionresultformatter) - -Format the operation execution results -if the implementation requires an adjusted -result. This formatter is run BEFORE the -`onConnect` scoped formatter. +used to execute the query and mutation operations. ___ @@ -92,17 +84,17 @@ ___ ### onComplete -• `Optional` **onComplete**: undefined \| (ctx: [Context](_server_.context.md), message: [CompleteMessage](_message_.completemessage.md)) => void +• `Optional` **onComplete**: undefined \| (ctx: [Context](_server_.context.md), message: [CompleteMessage](_message_.completemessage.md)) => Promise\ \| void The complete callback is executed after the -operation has completed or the subscription -has been closed. +operation has completed right before sending +the complete message to the client. ___ ### onConnect -• `Optional` **onConnect**: undefined \| (ctx: [Context](_server_.context.md)) => Promise\ \| boolean +• `Optional` **onConnect**: undefined \| (ctx: [Context](_server_.context.md)) => Promise\ \| boolean \| void Is the connection callback called when the client requests the connection initialisation @@ -110,7 +102,7 @@ through the message `ConnectionInit`. The message payload (`connectionParams` on the client) is present in the `Context.connectionParams`. -- Returning `true` from the callback will +- Returning `true` or nothing from the callback will allow the client to connect. - Returning `false` from the callback will @@ -125,17 +117,79 @@ thrown `Error`. ___ +### onError + +• `Optional` **onError**: undefined \| (ctx: [Context](_server_.context.md), message: [ErrorMessage](_message_.errormessage.md), errors: readonly GraphQLError[]) => Promise\ \| readonly GraphQLError[] \| void + +Executed after an error occured right before it +has been dispatched to the client. + +Use this callback to format the outgoing GraphQL +errors before they reach the client. + +Returned result will be injected in the error message payload. + +___ + +### onNext + +• `Optional` **onNext**: undefined \| (ctx: [Context](_server_.context.md), message: [NextMessage](_message_.nextmessage.md), args: ExecutionArgs, result: ExecutionResult) => Promise\ \| ExecutionResult \| void + +Executed after an operation has emitted a result right before +that result has been sent to the client. Results from both +single value and streaming operations will appear in this callback. + +Use this callback if you want to format the execution result +before it reaches the client. + +Returned result will be injected in the next message payload. + +___ + +### onOperation + +• `Optional` **onOperation**: undefined \| (ctx: [Context](_server_.context.md), message: [SubscribeMessage](_message_.subscribemessage.md), args: ExecutionArgs, result: [OperationResult](../modules/_server_.md#operationresult)) => Promise\<[OperationResult](../modules/_server_.md#operationresult) \| void> \| [OperationResult](../modules/_server_.md#operationresult) \| void + +Executed after the operation call resolves. For streaming +operations, triggering this callback does not necessarely +mean that there is already a result available - it means +that the subscription process for the stream has resolved +and that the client is now subscribed. + +The `OperationResult` argument is the result of operation +execution. It can be an iterator or already value. + +If you want the single result and the events from a streaming +operation, use the `onNext` callback. + +Use this callback to listen for subscribe operation and +execution result manipulation. + +___ + ### onSubscribe -• `Optional` **onSubscribe**: undefined \| (ctx: [Context](_server_.context.md), message: [SubscribeMessage](_message_.subscribemessage.md), args: Optional\) => Promise\<[ExecutionArgs, undefined \| [ExecutionResultFormatter](../modules/_server_.md#executionresultformatter)]> \| [ExecutionArgs, undefined \| [ExecutionResultFormatter](../modules/_server_.md#executionresultformatter)] +• `Optional` **onSubscribe**: undefined \| (ctx: [Context](_server_.context.md), message: [SubscribeMessage](_message_.subscribemessage.md)) => Promise\ \| ExecutionArgs \| readonly GraphQLError[] \| void -The subscribe callback executed before -the actual operation execution. Useful -for manipulating the execution arguments -before the doing the operation. As a second -item in the array, you can pass in a scoped -execution result formatter. This formatter -is run AFTER the root `formatExecutionResult`. +The subscribe callback executed right after +acknowledging the request before any payload +processing has been performed. + +If you return `ExecutionArgs` from the callback, +it will be used instead of trying to build one +internally. In this case, you are responsible +for providing a ready set of arguments which will +be directly plugged in the operation execution. + +To report GraphQL errors simply return an array +of them from the callback, they will be reported +to the client through the error message. + +Useful for preparing the execution arguments +following a custom logic. A typical use case are +persisted queries, you can identify the query from +the subscribe message and create the GraphQL operation +execution args which are then returned by the function. ___ @@ -146,8 +200,10 @@ ___ The GraphQL root fields or resolvers to go alongside the schema. Learn more about them here: https://graphql.org/learn/execution/#root-fields-resolvers. -Related operation root value will be injected to the -`ExecutionArgs` BEFORE the `onSubscribe` callback. + +If you return from the `onSubscribe` callback, the +root field value will NOT be injected. You should add it +in the returned `ExecutionArgs` from the callback. ___ @@ -156,25 +212,17 @@ ___ • `Optional` **schema**: GraphQLSchema The GraphQL schema on which the operations -will be executed and validated against. If -the schema is left undefined, one must be -provided by in the resulting `ExecutionArgs` -from the `onSubscribe` callback. +will be executed and validated against. + +If the schema is left undefined, you're trusted to +provide one in the returned `ExecutionArgs` from the +`onSubscribe` callback. ___ ### subscribe -• **subscribe**: (args: ExecutionArgs) => Promise\ \| ExecutionResult> +• **subscribe**: (args: ExecutionArgs) => [OperationResult](../modules/_server_.md#operationresult) Is the `subscribe` function from GraphQL which is used to execute the subscription operation. - -___ - -### validationRules - -• `Optional` **validationRules**: readonly ValidationRule[] - -Custom validation rules overriding all -validation rules defined by the GraphQL spec. diff --git a/docs/modules/_message_.md b/docs/modules/_message_.md index c6178acf..728ca393 100644 --- a/docs/modules/_message_.md +++ b/docs/modules/_message_.md @@ -24,6 +24,12 @@ * [Message](_message_.md#message) +### Functions + +* [isMessage](_message_.md#ismessage) +* [parseMessage](_message_.md#parsemessage) +* [stringifyMessage](_message_.md#stringifymessage) + ## Type aliases ### Message @@ -35,3 +41,57 @@ Name | Type | Default | ------ | ------ | ------ | `T` | [MessageType](../enums/_message_.messagetype.md) | MessageType | + +## Functions + +### isMessage + +▸ **isMessage**(`val`: unknown): val is Message + +Checks if the provided value is a message. + +#### Parameters: + +Name | Type | +------ | ------ | +`val` | unknown | + +**Returns:** val is Message + +___ + +### parseMessage + +▸ **parseMessage**(`data`: unknown): [Message](_message_.md#message) + +Parses the raw websocket message data to a valid message. + +#### Parameters: + +Name | Type | +------ | ------ | +`data` | unknown | + +**Returns:** [Message](_message_.md#message) + +___ + +### stringifyMessage + +▸ **stringifyMessage**\(`msg`: [Message](_message_.md#message)\): string + +Stringifies a valid message ready to be sent through the socket. + +#### Type parameters: + +Name | Type | +------ | ------ | +`T` | [MessageType](../enums/_message_.messagetype.md) | + +#### Parameters: + +Name | Type | +------ | ------ | +`msg` | [Message](_message_.md#message)\ | + +**Returns:** string diff --git a/docs/modules/_server_.md b/docs/modules/_server_.md index f0525037..96f722e5 100644 --- a/docs/modules/_server_.md +++ b/docs/modules/_server_.md @@ -14,7 +14,7 @@ ### Type aliases -* [ExecutionResultFormatter](_server_.md#executionresultformatter) +* [OperationResult](_server_.md#operationresult) ### Functions @@ -22,9 +22,9 @@ ## Type aliases -### ExecutionResultFormatter +### OperationResult -Ƭ **ExecutionResultFormatter**: (ctx: [Context](../interfaces/_server_.context.md), result: ExecutionResult) => Promise\ \| ExecutionResult +Ƭ **OperationResult**: Promise\ \| ExecutionResult> \| AsyncIterableIterator\ \| ExecutionResult ## Functions diff --git a/src/message.ts b/src/message.ts index 8249a5e7..b8b45c57 100644 --- a/src/message.ts +++ b/src/message.ts @@ -7,10 +7,10 @@ import { GraphQLError, ExecutionResult, DocumentNode } from 'graphql'; import { isObject, + areGraphQLErrors, hasOwnProperty, hasOwnObjectProperty, hasOwnStringProperty, - hasOwnArrayProperty, } from './utils'; /** Types of messages allowed to be sent by the client/server over the WS protocol. */ @@ -78,11 +78,11 @@ export type Message< ? CompleteMessage : never; -/** @ignore */ +/** Checks if the provided value is a message. */ export function isMessage(val: unknown): val is Message { if (isObject(val)) { // all messages must have the `type` prop - if (!hasOwnProperty(val, 'type')) { + if (!hasOwnStringProperty(val, 'type')) { return false; } // validate other properties depending on the `type` @@ -102,7 +102,7 @@ export function isMessage(val: unknown): val is Message { hasOwnObjectProperty(val, 'payload') && (!hasOwnProperty(val.payload, 'operationName') || hasOwnStringProperty(val.payload, 'operationName')) && - (hasOwnStringProperty(val.payload, 'query') || // string query + (hasOwnStringProperty(val.payload, 'query') || // string query or persisted query id hasOwnObjectProperty(val.payload, 'query')) && // document node query (!hasOwnProperty(val.payload, 'variables') || hasOwnObjectProperty(val.payload, 'variables')) @@ -116,12 +116,7 @@ export function isMessage(val: unknown): val is Message { hasOwnObjectProperty(val.payload, 'errors')) ); case MessageType.Error: - return ( - hasOwnStringProperty(val, 'id') && - // GraphQLError - hasOwnArrayProperty(val, 'payload') && - val.payload.length > 0 // must be at least one error - ); + return hasOwnStringProperty(val, 'id') && areGraphQLErrors(val.payload); case MessageType.Complete: return hasOwnStringProperty(val, 'id'); default: @@ -131,7 +126,7 @@ export function isMessage(val: unknown): val is Message { return false; } -/** @ignore */ +/** Parses the raw websocket message data to a valid message. */ export function parseMessage(data: unknown): Message { if (isMessage(data)) { return data; @@ -146,10 +141,7 @@ export function parseMessage(data: unknown): Message { return message; } -/** - * @ignore - * Helps stringifying a valid message ready to be sent through the socket. - */ +/** Stringifies a valid message ready to be sent through the socket. */ export function stringifyMessage( msg: Message, ): string { diff --git a/src/server.ts b/src/server.ts index 05bec429..4e69ceb7 100644 --- a/src/server.ts +++ b/src/server.ts @@ -9,62 +9,68 @@ import * as WebSocket from 'ws'; import { OperationTypeNode, GraphQLSchema, - ValidationRule, - ExecutionResult, ExecutionArgs, parse, validate, getOperationAST, GraphQLError, SubscriptionArgs, + ExecutionResult, } from 'graphql'; import { Disposable } from './types'; import { GRAPHQL_TRANSPORT_WS_PROTOCOL } from './protocol'; import { Message, MessageType, + stringifyMessage, parseMessage, SubscribeMessage, + NextMessage, + ErrorMessage, CompleteMessage, - stringifyMessage, } from './message'; import { - Optional, isObject, isAsyncIterable, hasOwnObjectProperty, hasOwnStringProperty, + areGraphQLErrors, } from './utils'; import { ID } from './types'; -export type ExecutionResultFormatter = ( - ctx: Context, - result: ExecutionResult, -) => Promise | ExecutionResult; +export type OperationResult = + | Promise | ExecutionResult> + | AsyncIterableIterator + | ExecutionResult; export interface ServerOptions { /** * The GraphQL schema on which the operations - * will be executed and validated against. If - * the schema is left undefined, one must be - * provided by in the resulting `ExecutionArgs` - * from the `onSubscribe` callback. + * will be executed and validated against. + * + * If the schema is left undefined, you're trusted to + * provide one in the returned `ExecutionArgs` from the + * `onSubscribe` callback. */ schema?: GraphQLSchema; /** * A value which is provided to every resolver and holds * important contextual information like the currently * logged in user, or access to a database. - * Related operation context value will be injected to the - * `ExecutionArgs` BEFORE the `onSubscribe` callback. + * + * If you return from the `onSubscribe` callback, this + * context value will NOT be injected. You should add it + * in the returned `ExecutionArgs` from the callback. */ - context?: SubscriptionArgs['contextValue']; + context?: unknown; /** * The GraphQL root fields or resolvers to go * alongside the schema. Learn more about them * here: https://graphql.org/learn/execution/#root-fields-resolvers. - * Related operation root value will be injected to the - * `ExecutionArgs` BEFORE the `onSubscribe` callback. + * + * If you return from the `onSubscribe` callback, the + * root field value will NOT be injected. You should add it + * in the returned `ExecutionArgs` from the callback. */ roots?: { [operation in OperationTypeNode]?: Record< @@ -74,21 +80,37 @@ export interface ServerOptions { }; /** * Is the `execute` function from GraphQL which is - * used to execute the query/mutation operation. + * used to execute the query and mutation operations. */ - execute: ( - args: ExecutionArgs, - ) => - | Promise | ExecutionResult> - | AsyncIterableIterator - | ExecutionResult; + execute: (args: ExecutionArgs) => OperationResult; /** * Is the `subscribe` function from GraphQL which is * used to execute the subscription operation. */ - subscribe: ( - args: ExecutionArgs, - ) => Promise | ExecutionResult>; + subscribe: (args: ExecutionArgs) => OperationResult; + /** + * The amount of time for which the + * server will wait for `ConnectionInit` message. + * + * Set the value to `Infinity`, '', 0, null or undefined to skip waiting. + * + * If the wait timeout has passed and the client + * has not sent the `ConnectionInit` message, + * the server will terminate the socket by + * dispatching a close event `4408: Connection initialisation timeout` + * + * @default 3 * 1000 (3 seconds) + */ + connectionInitWaitTimeout?: number; + /** + * The timout between dispatched keep-alive messages. Internally the lib + * uses the [WebSocket Ping and Pongs]((https://developer.mozilla.org/en-US/docs/Web/API/WebSockets_API/Writing_WebSocket_servers#Pings_and_Pongs_The_Heartbeat_of_WebSockets)) to check that the link between + * the clients and the server is operating and to prevent the link from being broken due to idling. + * Set to nullish value to disable. + * + * @default 12 * 1000 (12 seconds) + */ + keepAlive?: number; /** * Is the connection callback called when the * client requests the connection initialisation @@ -96,7 +118,7 @@ export interface ServerOptions { * payload (`connectionParams` on the client) is * present in the `Context.connectionParams`. * - * - Returning `true` from the callback will + * - Returning `true` or nothing from the callback will * allow the client to connect. * * - Returning `false` from the callback will @@ -109,64 +131,94 @@ export interface ServerOptions { * the `` is the message of the * thrown `Error`. */ - onConnect?: (ctx: Context) => Promise | boolean; + onConnect?: (ctx: Context) => Promise | boolean | void; /** - * @default 3 * 1000 (3 seconds) + * The subscribe callback executed right after + * acknowledging the request before any payload + * processing has been performed. * - * The amount of time for which the - * server will wait for `ConnectionInit` message. + * If you return `ExecutionArgs` from the callback, + * it will be used instead of trying to build one + * internally. In this case, you are responsible + * for providing a ready set of arguments which will + * be directly plugged in the operation execution. * - * Set the value to `Infinity`, '', 0, null or undefined to skip waiting. + * To report GraphQL errors simply return an array + * of them from the callback, they will be reported + * to the client through the error message. * - * If the wait timeout has passed and the client - * has not sent the `ConnectionInit` message, - * the server will terminate the socket by - * dispatching a close event `4408: Connection initialisation timeout` + * Useful for preparing the execution arguments + * following a custom logic. A typical use case are + * persisted queries, you can identify the query from + * the subscribe message and create the GraphQL operation + * execution args which are then returned by the function. */ - connectionInitWaitTimeout?: number; + onSubscribe?: ( + ctx: Context, + message: SubscribeMessage, + ) => + | Promise + | ExecutionArgs + | readonly GraphQLError[] + | void; /** - * Custom validation rules overriding all - * validation rules defined by the GraphQL spec. + * Executed after the operation call resolves. For streaming + * operations, triggering this callback does not necessarely + * mean that there is already a result available - it means + * that the subscription process for the stream has resolved + * and that the client is now subscribed. + * + * The `OperationResult` argument is the result of operation + * execution. It can be an iterator or already value. + * + * If you want the single result and the events from a streaming + * operation, use the `onNext` callback. + * + * Use this callback to listen for subscribe operation and + * execution result manipulation. */ - validationRules?: readonly ValidationRule[]; + onOperation?: ( + ctx: Context, + message: SubscribeMessage, + args: ExecutionArgs, + result: OperationResult, + ) => Promise | OperationResult | void; /** - * Format the operation execution results - * if the implementation requires an adjusted - * result. This formatter is run BEFORE the - * `onConnect` scoped formatter. + * Executed after an error occured right before it + * has been dispatched to the client. + * + * Use this callback to format the outgoing GraphQL + * errors before they reach the client. + * + * Returned result will be injected in the error message payload. */ - formatExecutionResult?: ExecutionResultFormatter; + onError?: ( + ctx: Context, + message: ErrorMessage, + errors: readonly GraphQLError[], + ) => Promise | readonly GraphQLError[] | void; /** - * The subscribe callback executed before - * the actual operation execution. Useful - * for manipulating the execution arguments - * before the doing the operation. As a second - * item in the array, you can pass in a scoped - * execution result formatter. This formatter - * is run AFTER the root `formatExecutionResult`. + * Executed after an operation has emitted a result right before + * that result has been sent to the client. Results from both + * single value and streaming operations will appear in this callback. + * + * Use this callback if you want to format the execution result + * before it reaches the client. + * + * Returned result will be injected in the next message payload. */ - onSubscribe?: ( + onNext?: ( ctx: Context, - message: SubscribeMessage, - args: Optional, - ) => - | Promise<[ExecutionArgs, ExecutionResultFormatter?]> - | [ExecutionArgs, ExecutionResultFormatter?]; + message: NextMessage, + args: ExecutionArgs, + result: ExecutionResult, + ) => Promise | ExecutionResult | void; /** * The complete callback is executed after the - * operation has completed or the subscription - * has been closed. - */ - onComplete?: (ctx: Context, message: CompleteMessage) => void; - /** - * The timout between dispatched keep-alive messages. Internally the lib - * uses the [WebSocket Ping and Pongs]((https://developer.mozilla.org/en-US/docs/Web/API/WebSockets_API/Writing_WebSocket_servers#Pings_and_Pongs_The_Heartbeat_of_WebSockets)) to check that the link between - * the clients and the server is operating and to prevent the link from being broken due to idling. - * Set to nullish value to disable. - * - * @default 12 * 1000 (12 seconds) + * operation has completed right before sending + * the complete message to the client. */ - keepAlive?: number; + onComplete?: (ctx: Context, message: CompleteMessage) => Promise | void; } export interface Context { @@ -227,13 +279,14 @@ export function createServer( roots, execute, subscribe, - onConnect, connectionInitWaitTimeout = 3 * 1000, // 3 seconds - validationRules, - formatExecutionResult, + keepAlive = 12 * 1000, // 12 seconds + onConnect, onSubscribe, + onOperation, + onNext, + onError, onComplete, - keepAlive = 12 * 1000, // 12 seconds } = options; const webSocketServer = isWebSocketServer(websocketOptionsOrServer) ? websocketOptionsOrServer @@ -246,8 +299,7 @@ export function createServer( (Array.isArray(socket.protocol) && socket.protocol.indexOf(GRAPHQL_TRANSPORT_WS_PROTOCOL) === -1) ) { - socket.close(1002, 'Protocol Error'); - return; + return socket.close(1002, 'Protocol Error'); } const ctxRef: { current: Context } = { @@ -296,7 +348,6 @@ export function createServer( } }); - // issue a ping to the client socket.ping(); } }, keepAlive); @@ -341,33 +392,21 @@ export function createServer( }); // Sends through a message only if the socket is open. - function sendMessage( + async function sendMessage( ctx: Context, message: Message, - callback?: (err?: Error) => void, ) { - return new Promise((resolve, reject) => { - if (ctx.socket.readyState === WebSocket.OPEN) { - try { - ctx.socket.send(stringifyMessage(message), (err) => { - if (callback) callback(err); - if (err) { - return reject(err); - } - return resolve(); - }); - } catch (err) { - reject(err); - } - } else { - if (callback) callback(); - resolve(); - } - }); + if (ctx.socket.readyState === WebSocket.OPEN) { + return new Promise((resolve, reject) => { + ctx.socket.send(stringifyMessage(message), (err) => + err ? reject(err) : resolve(), + ); + }); + } } function makeOnMessage(ctx: Context) { - return async function (event: WebSocket.MessageEvent) { + return async function onMessage(event: WebSocket.MessageEvent) { try { const message = parseMessage(event.data); switch (message.type) { @@ -383,7 +422,7 @@ export function createServer( if (onConnect) { const permitted = await onConnect(ctx); - if (!permitted) { + if (permitted === false) { return ctx.socket.close(4403, 'Forbidden'); } } @@ -400,91 +439,131 @@ export function createServer( return ctx.socket.close(4401, 'Unauthorized'); } - const operation = message.payload; - const document = - typeof operation.query === 'string' - ? parse(operation.query) - : operation.query; - const operationAST = getOperationAST( - document, - operation.operationName, - ); - if (!operationAST) { - throw new Error('Unable to get operation AST'); - } - - let execArgsMaybeSchema: Optional = { - contextValue: context, - schema, - operationName: operation.operationName, - document, - variableValues: operation.variables, + const emit = { + next: async (result: ExecutionResult, args: ExecutionArgs) => { + let nextMessage: NextMessage = { + id: message.id, + type: MessageType.Next, + payload: result, + }; + if (onNext) { + const maybeResult = await onNext( + ctx, + nextMessage, + args, + result, + ); + if (maybeResult) { + nextMessage = { + ...nextMessage, + payload: maybeResult, + }; + } + } + await sendMessage(ctx, nextMessage); + }, + error: async (errors: readonly GraphQLError[]) => { + let errorMessage: ErrorMessage = { + id: message.id, + type: MessageType.Error, + payload: errors, + }; + if (onError) { + const maybeResult = await onError(ctx, errorMessage, errors); + if (maybeResult) { + errorMessage = { + ...errorMessage, + payload: maybeResult, + }; + } + } + await sendMessage(ctx, errorMessage); + }, + complete: async () => { + const completeMessage: CompleteMessage = { + id: message.id, + type: MessageType.Complete, + }; + await onComplete?.(ctx, completeMessage); + await sendMessage(ctx, completeMessage); + }, }; - // if roots are provided, inject the coresponding operation root - if (roots) { - execArgsMaybeSchema.rootValue = roots[operationAST.operation]; - } + let execArgs: ExecutionArgs; + const maybeExecArgsOrErrors = await onSubscribe?.(ctx, message); + if (maybeExecArgsOrErrors) { + if (areGraphQLErrors(maybeExecArgsOrErrors)) { + return await emit.error(maybeExecArgsOrErrors); + } + execArgs = maybeExecArgsOrErrors as ExecutionArgs; // because not graphql errors + } else { + if (!schema) { + // you either provide a schema dynamically through + // `onSubscribe` or you set one up during the server setup + return webSocketServer.emit( + 'error', + new Error('The GraphQL schema is not provided'), + ); + } - let onSubscribeFormatter: ExecutionResultFormatter | undefined; - if (onSubscribe) { - [execArgsMaybeSchema, onSubscribeFormatter] = await onSubscribe( - ctx, - message, - execArgsMaybeSchema, - ); - } - if (!execArgsMaybeSchema.schema) { - // not providing a schema is a fatal server error - return webSocketServer.emit( - 'error', - new Error('The GraphQL schema is not provided'), + const { operationName, query, variables } = message.payload; + const document = typeof query === 'string' ? parse(query) : query; + execArgs = { + contextValue: context, + schema, + operationName, + document, + variableValues: variables, + }; + + const validationErrors = validate( + execArgs.schema, + execArgs.document, ); + if (validationErrors.length > 0) { + return await emit.error(validationErrors); + } } - // the execution arguments should be complete now - const execArgs = execArgsMaybeSchema as ExecutionArgs; - - // validate - const validationErrors = validate( - execArgs.schema, + const operationAST = getOperationAST( execArgs.document, - validationRules, + execArgs.operationName, ); - if (validationErrors.length > 0) { - return await sendMessage(ctx, { - id: message.id, - type: MessageType.Error, - payload: validationErrors, - }); + if (!operationAST) { + return await emit.error([ + new GraphQLError('Unable to identify operation'), + ]); } - // formats the incoming result and emits it to the subscriber - const emitResult = async (result: ExecutionResult) => { - // use the root formater first - if (formatExecutionResult) { - result = await formatExecutionResult(ctx, result); - } - // then use the subscription specific formatter - if (onSubscribeFormatter) { - result = await onSubscribeFormatter(ctx, result); - } - await sendMessage(ctx, { - id: message.id, - type: MessageType.Next, - payload: result, - }); - }; + // if you've provided your own root through + // `onSubscribe`, prefer that over the root's root + if (!execArgs.rootValue) { + execArgs.rootValue = roots?.[operationAST.operation]; + } - // perform - let iterableOrResult; + // the execution arguments have been prepared + // perform the operation and act accordingly + let operationResult; if (operationAST.operation === 'subscription') { - iterableOrResult = await subscribe(execArgs); + operationResult = await subscribe(execArgs); } else { - // operationAST.operation === 'query' || 'mutation' - iterableOrResult = await execute(execArgs); + // operation === 'query' || 'mutation' + operationResult = await execute(execArgs); + } + + if (onOperation) { + const maybeResult = await onOperation( + ctx, + message, + execArgs, + operationResult, + ); + if (maybeResult) { + operationResult = maybeResult; + } } - if (isAsyncIterable(iterableOrResult)) { + + if (isAsyncIterable(operationResult)) { /** multiple emitted results */ // iterable subscriptions are distinct on ID @@ -494,57 +573,24 @@ export function createServer( `Subscriber for ${message.id} already exists`, ); } - ctx.subscriptions[message.id] = iterableOrResult; + ctx.subscriptions[message.id] = operationResult; - try { - for await (const result of iterableOrResult) { - await emitResult(result); - } - // source stream completed - const completeMessage: CompleteMessage = { - id: message.id, - type: MessageType.Complete, - }; - await sendMessage(ctx, completeMessage); - if (onComplete) { - onComplete(ctx, completeMessage); - } - } catch (err) { - await sendMessage(ctx, { - id: message.id, - type: MessageType.Error, - payload: [ - new GraphQLError( - err instanceof Error - ? err.message - : new Error(err).message, - ), - ], - }); - } finally { - delete ctx.subscriptions[message.id]; + // only case where this might fail is if the socket is broken + for await (const result of operationResult) { + await emit.next(result, execArgs); } + await emit.complete(); + delete ctx.subscriptions[message.id]; } else { /** single emitted result */ - await emitResult(iterableOrResult); - - // resolved - const completeMessage: CompleteMessage = { - id: message.id, - type: MessageType.Complete, - }; - await sendMessage(ctx, completeMessage); - if (onComplete) { - onComplete(ctx, completeMessage); - } + await emit.next(operationResult, execArgs); + await emit.complete(); } break; } case MessageType.Complete: { - if (ctx.subscriptions[message.id]) { - await ctx.subscriptions[message.id].return?.(); - } + await ctx.subscriptions[message.id]?.return?.(); break; } default: diff --git a/src/tests/client.ts b/src/tests/client.ts index 1e04716e..51ca9fb9 100644 --- a/src/tests/client.ts +++ b/src/tests/client.ts @@ -8,7 +8,7 @@ import { Server } from '../server'; import { createClient, EventListener } from '../client'; // Just does nothing -export function noop(): void { +function noop(): void { /**/ } diff --git a/src/tests/server.ts b/src/tests/server.ts index 7f4fbb33..19314842 100644 --- a/src/tests/server.ts +++ b/src/tests/server.ts @@ -1,7 +1,19 @@ import WebSocket from 'ws'; -import { parse, buildSchema, execute, subscribe } from 'graphql'; +import { + parse, + buildSchema, + execute, + subscribe, + GraphQLError, + ExecutionArgs, +} from 'graphql'; import { GRAPHQL_TRANSPORT_WS_PROTOCOL } from '../protocol'; -import { MessageType, parseMessage, stringifyMessage } from '../message'; +import { + MessageType, + parseMessage, + stringifyMessage, + SubscribePayload, +} from '../message'; import { startServer, url, schema, pong } from './fixtures/simple'; let forgottenDispose: (() => Promise) | undefined; @@ -367,7 +379,7 @@ describe('Connect', () => { }); }); - it('should acknowledge connection if not implemented or returning `true`', async () => { + it('should acknowledge connection if not implemented, returning `true` or nothing', async () => { async function test() { const client = await createTClient(); client.ws.send( @@ -381,18 +393,26 @@ describe('Connect', () => { } // no implementation - const [, dispose] = await makeServer(); + let [, dispose] = await makeServer(); await test(); - await dispose(); // returns true - await makeServer({ + [, dispose] = await makeServer({ onConnect: () => { return true; }, }); await test(); + await dispose(); + + // returns nothing + await makeServer({ + onConnect: () => { + /**/ + }, + }); + await test(); }); it('should pass in the `connectionParams` through the context and have other flags correctly set', async (done) => { @@ -575,16 +595,135 @@ describe('Subscribe', () => { }); }); - it('should pick up the schema from `onSubscribe`', async () => { + it('should close the socket with errors thrown from any callback', async () => { + const error = new Error('Stop'); + + // onConnect + let [, dispose] = await makeServer({ + onConnect: () => { + throw error; + }, + }); + const client = await createTClient(); + client.ws.send( + stringifyMessage({ + type: MessageType.ConnectionInit, + }), + ); + await client.waitForClose((event) => { + expect(event.code).toBe(4400); + expect(event.reason).toBe(error.message); + expect(event.wasClean).toBeTruthy(); + }); + await dispose(); + + async function test( + payload: SubscribePayload = { + query: `query { getValue }`, + }, + ) { + const client = await createTClient(); + client.ws.send( + stringifyMessage({ + type: MessageType.ConnectionInit, + }), + ); + + await client.waitForMessage(({ data }) => { + expect(parseMessage(data).type).toBe(MessageType.ConnectionAck); + client.ws.send( + stringifyMessage({ + id: '1', + type: MessageType.Subscribe, + payload, + }), + ); + }); + + await client.waitForClose((event) => { + expect(event.code).toBe(4400); + expect(event.reason).toBe(error.message); + expect(event.wasClean).toBeTruthy(); + }); + } + + // onSubscribe + [, dispose] = await makeServer({ + onSubscribe: () => { + throw error; + }, + }); + await test(); + await dispose(); + + [, dispose] = await makeServer({ + onOperation: () => { + throw error; + }, + }); + await test(); + await dispose(); + + // execute + [, dispose] = await makeServer({ + execute: () => { + throw error; + }, + }); + await test(); + await dispose(); + + // subscribe + [, dispose] = await makeServer({ + subscribe: () => { + throw error; + }, + }); + await test({ query: 'subscription { greetings }' }); + await dispose(); + + // onNext + [, dispose] = await makeServer({ + onNext: () => { + throw error; + }, + }); + await test(); + await dispose(); + + // onError + [, dispose] = await makeServer({ + onError: () => { + throw error; + }, + }); + await test({ query: 'query { noExisto }' }); + await dispose(); + + // onComplete + [, dispose] = await makeServer({ + onComplete: () => { + throw error; + }, + }); + await test(); + await dispose(); + }); + + it('should directly use the execution arguments returned from `onSubscribe`', async () => { + const nopeArgs = { + schema, + operationName: 'Nope', + document: parse(`query Nope { getValue }`), + }; await makeServer({ schema: undefined, - onSubscribe: (_ctx, _message, args) => { - return [ - { - ...args, - schema, - }, - ]; + execute: (args) => { + expect(args).toBe(nopeArgs); + return execute(args); + }, + onSubscribe: (_ctx, _message) => { + return nopeArgs; }, }); @@ -603,16 +742,18 @@ describe('Subscribe', () => { id: '1', type: MessageType.Subscribe, payload: { - operationName: 'TestString', - query: `query TestString { - getValue - }`, + operationName: 'Ping', + query: `subscribe Ping { + ping + }`, variables: {}, }, }), ); }); + // because onsubscribe changed the request + await client.waitForMessage(({ data }) => { expect(parseMessage(data)).toEqual({ id: '1', @@ -626,6 +767,207 @@ describe('Subscribe', () => { }, 30); }); + it('should report the graphql errors returned from `onSubscribe`', async () => { + await makeServer({ + onSubscribe: (_ctx, _message) => { + return [new GraphQLError('Report'), new GraphQLError('Me')]; + }, + }); + + const client = await createTClient(); + + client.ws.send( + stringifyMessage({ + type: MessageType.ConnectionInit, + }), + ); + + await client.waitForMessage(({ data }) => { + expect(parseMessage(data).type).toBe(MessageType.ConnectionAck); + client.ws.send( + stringifyMessage({ + id: '1', + type: MessageType.Subscribe, + payload: { + operationName: 'Ping', + query: `subscribe Ping { + ping + }`, + variables: {}, + }, + }), + ); + }); + + // because onsubscribe changed the request + + await client.waitForMessage(({ data }) => { + expect(parseMessage(data)).toEqual({ + id: '1', + type: MessageType.Error, + payload: [{ message: 'Report' }, { message: 'Me' }], + }); + }); + + await client.waitForClose(() => { + fail('Shouldt have closed'); + }, 30); + }); + + it('should use the execution result returned from `onNext`', async () => { + await makeServer({ + onNext: (_ctx, _message) => { + return { + data: { hey: 'there' }, + }; + }, + }); + + const client = await createTClient(); + + client.ws.send( + stringifyMessage({ + type: MessageType.ConnectionInit, + }), + ); + + await client.waitForMessage(({ data }) => { + expect(parseMessage(data).type).toBe(MessageType.ConnectionAck); + client.ws.send( + stringifyMessage({ + id: '1', + type: MessageType.Subscribe, + payload: { + query: `subscription { + greetings + }`, + variables: {}, + }, + }), + ); + }); + + // because onnext changed the result + + for (let i = 0; i < 5; i++) { + await client.waitForMessage(({ data }) => { + expect(parseMessage(data)).toEqual({ + id: '1', + type: MessageType.Next, + payload: { data: { hey: 'there' } }, + }); + }); + } + await client.waitForMessage(({ data }) => { + expect(parseMessage(data)).toEqual({ + id: '1', + type: MessageType.Complete, + }); + }); + + await client.waitForClose(() => { + fail('Shouldt have closed'); + }, 30); + }); + + it('should use the graphql errors returned from `onError`', async () => { + await makeServer({ + onError: (_ctx, _message) => { + return [new GraphQLError('Itsa me!'), new GraphQLError('Anda me!')]; + }, + }); + + const client = await createTClient(); + + client.ws.send( + stringifyMessage({ + type: MessageType.ConnectionInit, + }), + ); + + await client.waitForMessage(({ data }) => { + expect(parseMessage(data).type).toBe(MessageType.ConnectionAck); + client.ws.send( + stringifyMessage({ + id: '1', + type: MessageType.Subscribe, + payload: { + query: `query { + nogql + }`, + variables: {}, + }, + }), + ); + }); + + // because onnext changed the result + + await client.waitForMessage(({ data }) => { + expect(parseMessage(data)).toEqual({ + id: '1', + type: MessageType.Error, + payload: [{ message: 'Itsa me!' }, { message: 'Anda me!' }], + }); + }); + + await client.waitForClose(() => { + fail('Shouldt have closed'); + }, 30); + }); + + it('should use the operation result returned from `onOperation`', async () => { + await makeServer({ + onOperation: (_ctx, _message) => { + return (async function* () { + for (let i = 0; i < 3; i++) { + yield { data: { replaced: 'with me' } }; + } + })(); + }, + }); + + const client = await createTClient(); + + client.ws.send( + stringifyMessage({ + type: MessageType.ConnectionInit, + }), + ); + + await client.waitForMessage(({ data }) => { + expect(parseMessage(data).type).toBe(MessageType.ConnectionAck); + client.ws.send( + stringifyMessage({ + id: '1', + type: MessageType.Subscribe, + payload: { + query: `query { + getValue + }`, + variables: {}, + }, + }), + ); + }); + + // because onoperation changed the execution result + + for (let i = 0; i < 3; i++) { + await client.waitForMessage(({ data }) => { + expect(parseMessage(data)).toEqual({ + id: '1', + type: MessageType.Next, + payload: { data: { replaced: 'with me' } }, + }); + }); + } + + await client.waitForClose(() => { + fail('Shouldt have closed'); + }, 30); + }); + it('should execute the query of `string` type, "next" the result and then "complete"', async () => { await makeServer({ schema, @@ -996,6 +1338,55 @@ describe('Subscribe', () => { expect(event.wasClean).toBeTruthy(); }); }); + + it('should support persisted queries', async () => { + const queriesStore: Record = { + iWantTheValue: { + schema, + document: parse('query GetValue { getValue }'), + }, + }; + + await makeServer({ + onSubscribe: (_ctx, msg) => { + // search using `SubscriptionPayload.query` as QueryID + // check the client example below for better understanding + const hit = queriesStore[msg.payload.query as string]; + return { + ...hit, + variableValues: msg.payload.variables, // use the variables from the client + }; + }, + }); + + const client = await createTClient(); + client.ws.send( + stringifyMessage({ + type: MessageType.ConnectionInit, + }), + ); + + await client.waitForMessage(({ data }) => { + expect(parseMessage(data).type).toBe(MessageType.ConnectionAck); + client.ws.send( + stringifyMessage({ + id: '1', + type: MessageType.Subscribe, + payload: { + query: 'iWantTheValue', + }, + }), + ); + }); + + await client.waitForMessage(({ data }) => { + expect(parseMessage(data)).toEqual({ + id: '1', + type: MessageType.Next, + payload: { data: { getValue: 'value' } }, + }); + }); + }); }); describe('Keep-Alive', () => { diff --git a/src/utils.ts b/src/utils.ts index f593b935..4f95081c 100644 --- a/src/utils.ts +++ b/src/utils.ts @@ -3,24 +3,28 @@ * utils * */ - -export type Optional = Pick> & - Partial>; +import { GraphQLError } from 'graphql'; export function isObject(val: unknown): val is Record { return typeof val === 'object' && val !== null; } -export function isArray(val: unknown): val is unknown[] { - return typeof val === 'object' && val !== null && Array.isArray(val); -} - export function isAsyncIterable( val: unknown, ): val is AsyncIterableIterator { return typeof Object(val)[Symbol.asyncIterator] === 'function'; } +export function areGraphQLErrors(obj: unknown): obj is GraphQLError[] { + return ( + Array.isArray(obj) && + // must be at least one error + obj.length > 0 && + // error has at least a message + obj.every((ob) => 'message' in ob) + ); +} + export function hasOwnProperty< O extends Record, P extends PropertyKey @@ -39,7 +43,9 @@ export function hasOwnArrayProperty< O extends Record, P extends PropertyKey >(obj: O, prop: P): obj is O & Record { - return Object.prototype.hasOwnProperty.call(obj, prop) && isArray(obj[prop]); + return ( + Object.prototype.hasOwnProperty.call(obj, prop) && Array.isArray(obj[prop]) + ); } export function hasOwnStringProperty<