Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

add stream-utils sub-import #294

Merged
merged 1 commit into from
Apr 25, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
34 changes: 11 additions & 23 deletions integration-test/vite-streaming/server.js
Original file line number Diff line number Diff line change
@@ -1,6 +1,10 @@
import express from "express";
import { renderToReadableStream } from "react-dom/server.edge";
import { readFile } from "node:fs/promises";
import {
createInjectionTransformStream,
pipeReaderToResponse,
} from "@apollo/client-react-streaming/stream-utils";

// Constants
const isProduction = process.env.NODE_ENV === "production";
Expand Down Expand Up @@ -51,14 +55,14 @@ app.use("*", async (req, res) => {
console.error("Fatal", error);
});

const { createTransport, render } =
/** @type {import('./src/entry-server.jsx.js')}*/ (
await (isProduction
? import("./dist/server/entry-server.js")
: vite.ssrLoadModule("/src/entry-server.jsx"))
);
const { render } = /** @type {import('./src/entry-server.jsx.js')}*/ (
await (isProduction
? import("./dist/server/entry-server.js")
: vite.ssrLoadModule("/src/entry-server.jsx"))
);

const { injectIntoStream, transformStream } = createTransport();
const { injectIntoStream, transformStream } =
createInjectionTransformStream();

const App = render({
isProduction,
Expand All @@ -79,22 +83,6 @@ app.use("*", async (req, res) => {
);
});

async function pipeReaderToResponse(reader, res) {
try {
while (true) {
const { done, value } = await reader.read();
if (done) {
res.end();
return;
} else {
res.write(value);
}
}
} catch (e) {
res.destroy(e);
}
}

// Start http server
app.listen(port, () => {
console.log(`Server started at http://localhost:${port}`);
Expand Down
79 changes: 4 additions & 75 deletions integration-test/vite-streaming/src/Transport.tsx
Original file line number Diff line number Diff line change
@@ -1,19 +1,14 @@
/**
* The logic for the transform stream was strongly inspired by `createHeadInsertionTransformStream`
* from https://github.com/vercel/next.js/blob/6481c92038cce43056005c07f80f2938faf29c29/packages/next/src/server/node-web-streams-helper.ts
*
* released under a MIT license (https://github.com/vercel/next.js/blob/6481c92038cce43056005c07f80f2938faf29c29/packages/next/license.md)
* by Vercel, Inc., marked Copyright (c) 2023 Vercel, Inc.
*/

import { WrapApolloProvider } from "@apollo/client-react-streaming";
import { buildManualDataTransport } from "@apollo/client-react-streaming/manual-transport";
import { renderToString } from "react-dom/server";
import * as React from "react";
import { setVerbosity } from "ts-invariant";

setVerbosity("debug");

const InjectionContext = React.createContext<
(callback: () => React.ReactNode) => void
>(() => {});

export const InjectionContextProvider = InjectionContext.Provider;

export const WrappedApolloProvider = WrapApolloProvider(
Expand All @@ -23,69 +18,3 @@ export const WrappedApolloProvider = WrapApolloProvider(
},
})
);

export function createTransport(): {
transformStream: TransformStream;
injectIntoStream: (callback: () => React.ReactNode) => void;
} {
let queuedInjections: Array<() => React.ReactNode> = [];

async function renderInjectedHtml() {
const injections = [...queuedInjections];
queuedInjections = [];
return renderToString(
<>
{injections.map((callback, i) => (
<React.Fragment key={i}>{callback()}</React.Fragment>
))}
</>
);
}

let headInserted = false;
let currentlyStreaming = false;
const textDecoder = new TextDecoder();

const transformStream = new TransformStream({
async transform(chunk, controller) {
// While react is flushing chunks, we don't apply insertions
if (currentlyStreaming) {
controller.enqueue(chunk);
return;
}

if (!headInserted) {
const content = textDecoder.decode(chunk, { stream: true });
const index = content.indexOf("</head>");
if (index !== -1) {
const insertedHeadContent =
content.slice(0, index) +
(await renderInjectedHtml()) +
content.slice(index);
controller.enqueue(new TextEncoder().encode(insertedHeadContent));
currentlyStreaming = true;
setImmediate(() => {
currentlyStreaming = false;
});
headInserted = true;
} else {
controller.enqueue(chunk);
}
} else {
controller.enqueue(
new TextEncoder().encode(await renderInjectedHtml())
);
controller.enqueue(chunk);
currentlyStreaming = true;
setImmediate(() => {
currentlyStreaming = false;
});
}
},
});

return {
transformStream,
injectIntoStream: (callback) => queuedInjections.push(callback),
};
}
14 changes: 14 additions & 0 deletions packages/client-react-streaming/package-shape.json
Original file line number Diff line number Diff line change
Expand Up @@ -60,5 +60,19 @@
"resetManualSSRApolloSingletons",
"built_for_ssr"
]
},
"@apollo/client-react-streaming/stream-utils": {
"react-server": ["built_for_other"],
"browser": ["built_for_other"],
"node": [
"built_for_ssr",
"createInjectionTransformStream",
"pipeReaderToResponse"
],
"edge-light,worker,browser": [
"built_for_ssr",
"createInjectionTransformStream",
"pipeReaderToResponse"
]
}
}
19 changes: 19 additions & 0 deletions packages/client-react-streaming/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -76,12 +76,31 @@
"node": "./dist/manual-transport.ssr.js"
}
},
"./stream-utils": {
"require": {
"types": "./dist/stream-utils.node.d.cts",
"edge-light": "./dist/stream-utils.node.cjs",
"react-server": "./dist/empty.cjs",
"browser": "./dist/empty.cjs",
"node": "./dist/stream-utils.node.cjs"
},
"import": {
"types": "./dist/stream-utils.node.d.ts",
"edge-light": "./dist/stream-utils.node.js",
"react-server": "./dist/empty.js",
"browser": "./dist/empty.js",
"node": "./dist/stream-utils.node.js"
}
},
"./package.json": "./package.json"
},
"typesVersions": {
"*": {
"manual-transport": [
"./dist/manual-transport.ssr.d.ts"
],
"stream-utils": [
"./dist/stream-utils.node.d.ts"
]
}
},
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,119 @@
/**
* The logic for `createInjectionTransformStream` was strongly inspired by `createHeadInsertionTransformStream`
* from https://github.com/vercel/next.js/blob/6481c92038cce43056005c07f80f2938faf29c29/packages/next/src/server/node-web-streams-helper.ts
*
* released under a MIT license (https://github.com/vercel/next.js/blob/6481c92038cce43056005c07f80f2938faf29c29/packages/next/license.md)
* by Vercel, Inc., marked Copyright (c) 2023 Vercel, Inc.
*/

import { renderToString } from "react-dom/server";
import * as React from "react";

/**
* > This export is only available in streaming SSR Server environments
*
* Used to create a `TransformStream` that can be used for piping a React stream rendered by
* `renderToReadableStream` and using the callback to insert chunks of HTML between React Chunks.
*/
export function createInjectionTransformStream(): {
/**
* @example
* ```js
* const { injectIntoStream, transformStream } = createInjectionTransformStream();
* const App = render({ assets, injectIntoStream });
* const reactStream = await renderToReadableStream(App, { bootstrapModules }));
* await pipeReaderToResponse(
* reactStream.pipeThrough(transformStream).getReader(),
* response
* );
* ```
*/
transformStream: TransformStream;
/**
* `injectIntoStream` method that can be injected into your React application, to be made available to
*
* @example
* ```js title="setup"
* // create a Context for injection of `injectIntoStream`
* const InjectionContext = React.createContext<
* (callback: () => React.ReactNode) => void
* >(() => {});
* // to be used in your application
* export const InjectionContextProvider = InjectionContext.Provider;
* // make it accessible to `WrapApolloProvider`
* export const WrappedApolloProvider = WrapApolloProvider(
* buildManualDataTransport({
* useInsertHtml() {
* return React.useContext(InjectionContext);
* },
* })
* );
* ```
* Then in your applications SSR render, pass this function to `InjectionContextProvider`:
* ```js
* <InjectionContextProvider value={injectIntoStream}>
* ```
*/
injectIntoStream: (callback: () => React.ReactNode) => void;
} {
let queuedInjections: Array<() => React.ReactNode> = [];

async function renderInjectedHtml() {
const injections = [...queuedInjections];
queuedInjections = [];
return renderToString(
<>
{injections.map((callback, i) => (
<React.Fragment key={i}>{callback()}</React.Fragment>
))}
</>
);
}

let headInserted = false;
let currentlyStreaming = false;
const textDecoder = new TextDecoder();

const transformStream = new TransformStream({
async transform(chunk, controller) {
// While react is flushing chunks, we don't apply insertions
if (currentlyStreaming) {
controller.enqueue(chunk);
return;
}

if (!headInserted) {
const content = textDecoder.decode(chunk, { stream: true });
const index = content.indexOf("</head>");
if (index !== -1) {
const insertedHeadContent =
content.slice(0, index) +
(await renderInjectedHtml()) +
content.slice(index);
controller.enqueue(new TextEncoder().encode(insertedHeadContent));
currentlyStreaming = true;
setImmediate(() => {
currentlyStreaming = false;
});
headInserted = true;
} else {
controller.enqueue(chunk);
}
} else {
controller.enqueue(
new TextEncoder().encode(await renderInjectedHtml())
);
controller.enqueue(chunk);
currentlyStreaming = true;
setImmediate(() => {
currentlyStreaming = false;
});
}
},
});

return {
transformStream,
injectIntoStream: (callback) => queuedInjections.push(callback),
};
}
2 changes: 2 additions & 0 deletions packages/client-react-streaming/src/stream-utils/index.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
export { createInjectionTransformStream } from "./createInjectionTransformStream.js";
export { pipeReaderToResponse } from "./pipeReaderToResponse.js";
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
import type { ServerResponse } from "node:http";
/**
/**
* > This export is only available in streaming SSR Server environments
*
* Used to pipe a `ReadableStreamDefaultReader` to a `ServerResponse`.
*
* @example
* ```js
* const { injectIntoStream, transformStream } = createInjectionTransformStream();
* const App = render({ assets, injectIntoStream });
* const reactStream = await renderToReadableStream(App, { bootstrapModules }));
* await pipeReaderToResponse(
* reactStream.pipeThrough(transformStream).getReader(),
* response
* );
* ```
*/
export async function pipeReaderToResponse(
reader: ReadableStreamDefaultReader<any>,
res: ServerResponse
) {
try {
// eslint-disable-next-line no-constant-condition
while (true) {
const { done, value } = await reader.read();
if (done) {
res.end();
return;
} else {
res.write(value);
}
}
} catch (e: any) {
res.destroy(e);
}
}
8 changes: 7 additions & 1 deletion packages/client-react-streaming/tsup.config.ts
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,12 @@ export default defineConfig((options) => {
}
: false,
outDir: "dist/",
external: ["@apollo/client-react-streaming", "react", "rehackt"],
external: [
"@apollo/client-react-streaming",
"react",
"rehackt",
"react-dom",
],
noExternal: ["@apollo/client"], // will be handled by `acModuleImports`
esbuildPlugins: [acModuleImports],
};
Expand Down Expand Up @@ -64,6 +69,7 @@ export default defineConfig((options) => {
"src/ManualDataTransport/index.ts",
"manual-transport.browser"
),
entry("ssr", "src/stream-utils/index.ts", "stream-utils.node"),
];
});

Expand Down
Loading