Skip to content

Commit

Permalink
feat: switches to REST for calling nwaku messages/subscription endpoints
Browse files Browse the repository at this point in the history
This commit modifies functions in ServiceNode to use the REST API
instead of JSON RPC when reading messages for a pubsub topic or
content topic, and when ensuring a nwaku node is subscribed to a
pubsub topic. Also modifies default Docker params to enable the
rest API and provide a port.
  • Loading branch information
adklempner committed Feb 15, 2024
1 parent a9fb796 commit 3547160
Show file tree
Hide file tree
Showing 4 changed files with 99 additions and 31 deletions.
8 changes: 5 additions & 3 deletions packages/tests/src/lib/dockerode.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ import fs from "fs";
import { Logger } from "@waku/utils";
import Docker from "dockerode";

import { Args } from "../types.js";
import { Args, Ports } from "../types.js";

const log = new Logger("test:docker");

Expand Down Expand Up @@ -87,12 +87,12 @@ export default class Dockerode {
}

async startContainer(
ports: number[],
ports: Ports,
args: Args,
logPath: string,
wakuServiceNodeParams?: string
): Promise<Docker.Container> {
const [rpcPort, tcpPort, websocketPort, discv5UdpPort] = ports;
const { rpcPort, restPort, tcpPort, websocketPort, discv5UdpPort } = ports;

await this.confirmImageExistsOrPull();

Expand All @@ -109,6 +109,7 @@ export default class Dockerode {
HostConfig: {
AutoRemove: true,
PortBindings: {
[`${restPort}/tcp`]: [{ HostPort: restPort.toString() }],
[`${rpcPort}/tcp`]: [{ HostPort: rpcPort.toString() }],
[`${tcpPort}/tcp`]: [{ HostPort: tcpPort.toString() }],
[`${websocketPort}/tcp`]: [{ HostPort: websocketPort.toString() }],
Expand All @@ -118,6 +119,7 @@ export default class Dockerode {
}
},
ExposedPorts: {
[`${restPort}/tcp`]: {},
[`${rpcPort}/tcp`]: {},
[`${tcpPort}/tcp`]: {},
[`${websocketPort}/tcp`]: {},
Expand Down
111 changes: 83 additions & 28 deletions packages/tests/src/lib/service_node.ts
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,8 @@ import {
KeyPair,
LogLevel,
MessageRpcQuery,
MessageRpcResponse
MessageRpcResponse,
Ports
} from "../types.js";
import { existsAsync, mkdirAsync, openAsync } from "../utils/async_fs.js";
import { delay } from "../utils/delay.js";
Expand Down Expand Up @@ -49,6 +50,7 @@ export class ServiceNode {
private websocketPort?: number;
private readonly logPath: string;
private rpcPort?: number;
private restPort?: number;

/**
* Convert a [[WakuMessage]] to a [[WakuRelayMessage]]. The latter is used
Expand Down Expand Up @@ -116,18 +118,26 @@ export class ServiceNode {
// we also randomize the first port that portfinder will try
const startPort = Math.floor(Math.random() * (65535 - 1025) + 1025);

const ports: number[] = await new Promise((resolve, reject) => {
portfinder.getPorts(4, { port: startPort }, (err, ports) => {
const ports: Ports = await new Promise((resolve, reject) => {
portfinder.getPorts(5, { port: startPort }, (err, ports) => {
if (err) reject(err);
resolve(ports);
resolve({
rpcPort: ports[0],
tcpPort: ports[1],
websocketPort: ports[2],
restPort: ports[3],
discv5UdpPort: ports[4]
});
});
});

if (isGoWaku && !args.logLevel) {
args.logLevel = LogLevel.Debug;
}

const [rpcPort, tcpPort, websocketPort, discv5UdpPort] = ports;
const { rpcPort, tcpPort, websocketPort, restPort, discv5UdpPort } =
ports;
this.restPort = restPort;
this.rpcPort = rpcPort;
this.websocketPort = websocketPort;

Expand All @@ -138,13 +148,15 @@ export class ServiceNode {
Object.assign(
mergedArgs,
{
rest: true,
restPort,
rpcPort,
tcpPort,
websocketPort,
...(args?.peerExchange && { discv5UdpPort }),
...(isGoWaku && { minRelayPeersToPublish: 0, legacyFilter })
},
{ rpcAddress: "0.0.0.0" },
{ rpcAddress: "0.0.0.0", restAddress: "0.0.0.0" },
_args
);

Expand Down Expand Up @@ -210,11 +222,27 @@ export class ServiceNode {
async ensureSubscriptions(
pubsubTopics: string[] = [DefaultPubsubTopic]
): Promise<boolean> {
this.checkProcess();
return this.restCall<boolean>(
"/relay/v1/subscriptions",
"POST",
pubsubTopics,
async (response) => response.status === 200
);
}

return this.rpcCall<boolean>("post_waku_v2_relay_v1_subscriptions", [
pubsubTopics
]);
async messages(
pubsubTopic: string = DefaultPubsubTopic
): Promise<MessageRpcResponse[]> {
pubsubTopic = encodeURIComponent(pubsubTopic);
return this.restCall<MessageRpcResponse[]>(
`/relay/v1/messages/${pubsubTopic}`,
"GET",
null,
async (response) => {
const data = await response.json();
return data?.length ? data : [];
}
);
}

async ensureSubscriptionsAutosharding(
Expand Down Expand Up @@ -255,30 +283,21 @@ export class ServiceNode {
]);
}

async messages(
pubsubTopic: string = DefaultPubsubTopic
): Promise<MessageRpcResponse[]> {
this.checkProcess();

const msgs = await this.rpcCall<MessageRpcResponse[]>(
"get_waku_v2_relay_v1_messages",
[pubsubTopic]
);

return msgs.filter(isDefined);
}

async messagesAutosharding(
contentTopic: string
): Promise<MessageRpcResponse[]> {
this.checkProcess();

const msgs = await this.rpcCall<MessageRpcResponse[]>(
"get_waku_v2_relay_v1_auto_messages",
[contentTopic]
contentTopic = encodeURIComponent(contentTopic);
return this.restCall<MessageRpcResponse[]>(
`/relay/v1/auto/messages/${contentTopic}`,
"GET",
null,
async (response) => {
const data = await response.json();
return data?.length ? data.filter(isDefined) : [];
}
);

return msgs.filter(isDefined);
}

async getAsymmetricKeyPair(): Promise<KeyPair> {
Expand Down Expand Up @@ -411,6 +430,10 @@ export class ServiceNode {
return `http://127.0.0.1:${this.rpcPort}/`;
}

get httpUrl(): string {
return `http://127.0.0.1:${this.restPort}`;
}

async rpcCall<T>(
method: string,
params: Array<string | number | unknown>
Expand Down Expand Up @@ -442,6 +465,37 @@ export class ServiceNode {
);
}

async restCall<T>(
endpoint: string,
method: "GET" | "POST",
body: any = null,

Check warning on line 471 in packages/tests/src/lib/service_node.ts

View workflow job for this annotation

GitHub Actions / check

Unexpected any. Specify a different type

Check warning on line 471 in packages/tests/src/lib/service_node.ts

View workflow job for this annotation

GitHub Actions / proto

Unexpected any. Specify a different type
processResponse: (response: Response) => Promise<T>
): Promise<T> {
this.checkProcess();

return await pRetry(
async () => {
try {
log.info("Making a REST Call: ", endpoint, body);
const options: RequestInit = {
method,
headers: new Headers({ "Content-Type": "application/json" })
};
if (body) options.body = JSON.stringify(body);

const response = await fetch(`${this.httpUrl}${endpoint}`, options);
log.info(`Received REST Response: `, response.status);
return await processResponse(response);
} catch (error) {
log.error(`${this.httpUrl} failed with error:`, error);
await delay(10);
throw error;
}
},
{ retries: 5 }
);
}

private checkProcess(): void {
if (!this.docker?.container) {
throw `${this.type} container hasn't started`;
Expand All @@ -454,6 +508,7 @@ export function defaultArgs(): Args {
listenAddress: "0.0.0.0",
rpc: true,
relay: false,
rest: true,
rpcAdmin: true,
websocketSupport: true,
logLevel: LogLevel.Trace
Expand Down
10 changes: 10 additions & 0 deletions packages/tests/src/types.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ export interface Args {
nat?: "none";
listenAddress?: string;
relay?: boolean;
rest?: boolean;
rpc?: boolean;
rpcAdmin?: boolean;
nodekey?: string;
Expand All @@ -18,6 +19,7 @@ export interface Args {
rpcPrivate?: boolean;
websocketSupport?: boolean;
tcpPort?: number;
restPort?: number;
rpcPort?: number;
websocketPort?: number;
discv5BootstrapNode?: string;
Expand All @@ -27,6 +29,14 @@ export interface Args {
clusterId?: number;
}

export interface Ports {
rpcPort: number;
tcpPort: number;
websocketPort: number;
restPort: number;
discv5UdpPort: number;
}

export enum LogLevel {
Error = "ERROR",
Info = "INFO",
Expand Down
1 change: 1 addition & 0 deletions packages/tests/tests/nwaku.node.spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ describe("nwaku", () => {
"--listen-address=0.0.0.0",
"--rpc=true",
"--relay=false",
"--rest=true",
"--rpc-admin=true",
"--websocket-support=true",
"--log-level=TRACE",
Expand Down

0 comments on commit 3547160

Please sign in to comment.