
Transport Agnostic RPC implementation #498

Merged · 44 commits · Feb 14, 2023

Conversation

@tegefaulkes commented Jan 3, 2023

Description

This PR adds our own custom transport-agnostic RPC implementation.

The transport-agnostic RPC needs to cover everything from the byte stream to the service handlers. This includes:

  1. Converting a byte stream to RPC messages.
  2. Parsing and validating RPC messages.
  3. Creating the service handler interfaces.

Stage 1 - Converting the byte stream

Each RPC message is a single JSON-RPC message object. There are 4 forms: a request, a notification, a response result, and a response error. These messages are specified in the JSON-RPC 2.0 spec at https://www.jsonrpc.org/specification.

We need to convert a continuous stream of these messages into distinct messages. The stream is made up of JSON-RPC objects that have been JSON.stringify()ed and converted to buffers, so the messages are structured like {JsonRpcMessage}{JsonRpcMessage} in buffer form.

These object strings are delimited by their top-level {} bracket pair. No delimiter character exists between messages, so to extract a single message we need to find the top-level matching {} pair and take the sub-array of that part of the incoming buffer.

Ultimately the resulting transformation stream takes in arbitrarily sized buffers of the incoming messages and outputs buffers of single complete messages, e.g. {jso|nMessa|ge}{JsonMess|age} => {JsonRpcMessage}|{JsonRpcMessage}. This can then be piped through a message validation stream.
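
As a rough illustration, a transformer along these lines could do the splitting (a minimal sketch assuming Uint8Array chunks; the class name is made up and handling of malformed input is elided):

// Illustrative sketch only: splits a concatenated {...}{...} byte stream
// into one chunk per complete top-level JSON object.
class JsonMessageSplitter implements Transformer<Uint8Array, Uint8Array> {
  protected buffer: Uint8Array = new Uint8Array(0);
  protected pos = 0; // next byte to scan
  protected depth = 0; // current {} nesting depth
  protected inString = false; // inside a JSON string?
  protected escaped = false; // previous byte was a backslash?

  transform(
    chunk: Uint8Array,
    controller: TransformStreamDefaultController<Uint8Array>,
  ) {
    // Accumulate the incoming chunk onto any leftover bytes.
    const buffer = new Uint8Array(this.buffer.length + chunk.length);
    buffer.set(this.buffer);
    buffer.set(chunk, this.buffer.length);
    this.buffer = buffer;
    while (this.pos < this.buffer.length) {
      const byte = this.buffer[this.pos++];
      if (this.escaped) {
        this.escaped = false;
      } else if (this.inString) {
        if (byte === 0x5c) this.escaped = true; // '\'
        if (byte === 0x22) this.inString = false; // closing '"'
      } else if (byte === 0x22) {
        this.inString = true; // opening '"'
      } else if (byte === 0x7b) {
        this.depth++; // '{'
      } else if (byte === 0x7d && --this.depth === 0) {
        // '}' closed the top-level object: emit it, keep the remainder.
        controller.enqueue(this.buffer.slice(0, this.pos));
        this.buffer = this.buffer.slice(this.pos);
        this.pos = 0;
      }
    }
  }
}

const messageSplitter = new TransformStream(new JsonMessageSplitter());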

Stage 2 - Parsing and message validation

In this stage we take single JSON-RPC messages, convert them into objects, and validate them. If the message is still in buffer form then the conversion is done with JSON.parse(buffer.toString('utf-8')).

These objects need to be valid JSON-RPC message objects, so we need to create validation functions for each type of message. There are 4 kinds of JSON-RPC messages specified in https://www.jsonrpc.org/specification. Each one will need its own validation function.

Solid types for each type of message need to be created. These message types need to be generic on the data so we can enforce a strict message schema for each service handler. The message data can't be validated at this stage. This should be left for the handlers to validate.

On top of the JSON-RPC spec, we need to add some more parameters to the messages. A type parameter is needed to distinguish message types from each other for simple pattern matching. We also need some kind of sub-type to distinguish control messages from data messages.
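
As a rough sketch of what these message types and their envelope validation could look like (the type field, names, and shapes here are assumptions from this discussion, not a settled API):

type JSONValue =
  | string
  | number
  | boolean
  | null
  | JSONValue[]
  | { [key: string]: JSONValue };

// A request message, generic on the params payload; `type` is our
// extra discriminant on top of the JSON-RPC 2.0 spec.
type JsonRpcRequest<T extends JSONValue = JSONValue> = {
  type: 'JsonRpcRequest';
  jsonrpc: '2.0';
  method: string;
  params?: T;
  id: string | number | null;
};

// Validates the envelope only; the params data is deliberately left
// for the service handler to validate.
function parseJsonRpcRequest<T extends JSONValue>(
  message: unknown,
): JsonRpcRequest<T> {
  if (typeof message !== 'object' || message === null) {
    throw new TypeError('not a JSON-RPC message');
  }
  const msg = message as Record<string, unknown>;
  if (msg.type !== 'JsonRpcRequest') throw new TypeError('wrong type');
  if (msg.jsonrpc !== '2.0') throw new TypeError('wrong jsonrpc version');
  if (typeof msg.method !== 'string') throw new TypeError('invalid method');
  if (!('id' in msg)) throw new TypeError('missing id');
  return message as JsonRpcRequest<T>;
}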

Stage 3 - Service and client handlers

We need a structured way of implementing client calls and service handlers for the RPC. The data types of the requests and responses need to be identical for both and enforced by the TypeScript type system.

The request and response data structure isn't set in stone yet. Right now it's a POJO; there was discussion about it being an array, but the JSON-RPC spec allows any JSONValue. When implementing a handler we need to specify the request and response types, to enforce the types of the input and output data in the handler.

Request and response data validation needs to be handled within the handlers. We will need to create a parser function for our message data.

The handlers themselves need to be generators that implement our handler interface. The interface will look something like this:

type TestGen<I extends JSONValue, O extends JSONValue> = (
  input: AsyncGenerator<I>, // Input generator
  context, // Context data such as connection information
  ctx, // Timeout and cancellation
) => AsyncGenerator<O>;

const testGen: TestGen<DataA, DataA> = async function* (input, context, ctx) {
  yield* input; // Echo
};

This will be the most generic form of the duplex stream. Some prototyping needs to be done, but it should be possible to have higher-order functions that convert the unary, server-stream and client-stream handlers to this duplex form.

function unary<I, O>(f: (input: I, context, ctx) => O): TestGen<I, O>;
function serverStream<I, O>(f: (input: AsyncGenerator<I>, context, ctx) => O): TestGen<I, O>;
function clientStream<I, O>(f: (input: I, context, ctx) => AsyncGenerator<O>): TestGen<I, O>;
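
For example, the unary wrapper might look roughly like this (a sketch against the TestGen type above; context and ctx stay loosely typed):

// Sketch: lift a unary handler into the generic duplex generator form.
function unary<I extends JSONValue, O extends JSONValue>(
  f: (input: I, context: any, ctx: any) => Promise<O>,
): TestGen<I, O> {
  return async function* (input, context, ctx) {
    // Consume exactly one request message from the input stream.
    const result = await input.next();
    if (result.done) throw new Error('expected a request message');
    // Produce exactly one response message.
    yield await f(result.value, context, ctx);
  };
}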

Remaining spec

Todo:

  • Plumbing new QUIC streams into handlers.
  • How client calls work, how streaming will work with them.
  • How metadata will be handled.
  • Details on message streaming and control messages. Do we use request messages at all, or abstract over notifications? If just notifications, do we use the response message types?
  • Registering handlers, dynamic or static configuration? If dynamic how do we enforce message data structure parity between handlers and client callers?

Issues Fixed

Tasks

  1. Try to fix the message-splitting stream: try to use the JSON parser again and catch the error; if that doesn't work, try a different parser.
  2. Post an upstream issue for the wrong offset value in the tokenizer
  3. Client and server streaming are backwards; we need to swap their names.
  4. The RPC class needs an error event emitter. Non-handleable errors, such as connection failures and unhandled errors in raw handlers, should be emitted here.
  5. Double-check that the input streams are properly closed when done. Streams coming out of the QUIC system may never 'end', so for cases where our handler is done we can close the input.
  6. Optimize the stream parser
  7. Change up how the JsonRpcMessage params is structured to include metadata
  8. Using async iterators
  9. Changing Buffer streams to Uint8Array streams
  10. Creating the client side
    1. Need to create split generator for IO
    2. Create generic call handlers
    3. Template and test
    4. Test out BYOB streams for the input and output streams to allow buffering.
    5. Outline how the stream handler is used and set up in the PR.
    6. Remote node ID isn't really available in the connection info; it should be extracted from the certificate info.
    7. Still need to make the raw stream handler.
  11. Change the RPCServer to CreateDestroy
  12. The RPCClient needs to take a callback to create a stream
  13. Create a websocket based client for client communication

Final checklist

  • Domain specific tests
  • Full tests
  • Updated inline-comment documentation
  • Lint fixed
  • Squash and rebased
  • Sanity check the final build

@tegefaulkes self-assigned this Jan 3, 2023

@tegefaulkes (Contributor Author)

The plan right now is to create some test utils using fast-check, and a transform stream to convert the raw stream data into distinct JSON-RPC messages.

Looking over the docs, we can use generators to transform the stream pretty easily, but the example provided doesn't return a new stream: https://exploringjs.com/nodejs-shell-scripting/ch_web-streams.html#transforming-web-streams-via-generators. So we won't be able to compose it with other transform streams. If we don't care about that then it is fine; otherwise we would have to do it the usual way: https://exploringjs.com/nodejs-shell-scripting/ch_web-streams.html#implementing-custom-transformstreams.
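
For reference, the generator style from those docs looks roughly like this (a sketch assuming Node's web streams, where ReadableStream is async-iterable; the real splitting logic is elided):

// Consumes a ReadableStream directly and yields transformed chunks, but
// returns an AsyncGenerator rather than a new ReadableStream, so it does
// not compose with pipeThrough().
async function* chunksToMessages(
  stream: ReadableStream<Uint8Array>,
): AsyncGenerator<Uint8Array> {
  for await (const chunk of stream) {
    // ... buffer until a complete JSON-RPC message is available ...
    yield chunk; // placeholder: real splitting logic goes here
  }
}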

After creating the transformer stream I can look into parsing the JSON-RPC messages. We need a good way of enforcing the structure of these using the type system. Looking over the JSON-RPC spec, there isn't a good way to tell the type of a message. For example, a request message contains the fields jsonrpc, method, params?, and id, while a notification is exactly the same as a request except it doesn't include the id. So the only way to tell them apart is to check whether the id key exists, which is not great. I think we discussed previously using a type field, so I'll likely just go with that.

The parameters can be any valid JSONValue. This means it can be a JSON object, string, number or array. If I recall from previous discussion, we wanted to just use arrays for the parameters? Is that something we still want to enforce?

@tegefaulkes (Contributor Author)

These web streams don't play very nicely with strict typing. While interfaces with proper types are provided, I can't get proper type inference when implementing a transformer class.

For example, when implementing the class as class ChunksToLinesTransformer implements Transformer<Buffer, string>,
I can implement transform as

transform(chunk, controller) {
  chunk; // any
  controller; // any
}

To get proper types on the method I need to implement it as

transform: TransformerTransformCallback<Buffer, string> = async (chunk, controller) => {
  chunk // Buffer
  controller // TransformStreamDefaultController<string>
}

While this works, it is not ideal. We do still get a warning if the type annotation is wrong.

@CMCDragonkai You've worked with this. Do you have a good way of enforcing the types here?
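
For reference, one possible workaround (assuming standard TypeScript contextual-typing behaviour; untested in this codebase) is to annotate an object literal instead of implementing the interface on a class:

// The annotation on the literal contextually types the method parameters,
// which an `implements` clause on a class does not do.
const chunksToLines: Transformer<Buffer, string> = {
  transform(chunk, controller) {
    chunk; // Buffer
    controller; // TransformStreamDefaultController<string>
  },
};
const linesStream = new TransformStream(chunksToLines);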

@tegefaulkes (Contributor Author)

Slicing out the JSON messages from the stream is going to be a little tricky. I first thought that I could just keep a tally of {} brackets. The problem with this is that {} can be within strings in the JSON object. So I'll need to check if the brackets are within strings when I tally them AND account for any escape characters while doing so.

We may be able to delimit the messages by using the provided ASCII separators, namely the codes 0x1C, 0x1D, 0x1E and 0x1F. I don't see these characters listed in the JSON grammar: https://developer.mozilla.org/en-US/docs/Web/JavaScript/Reference/Global_Objects/JSON#full_json_grammar. So we may be able to use these to delimit messages easily.

@CMCDragonkai commented Jan 3, 2023

I think there are off the shelf JSON stream-parsers, can we make use of those?

It should be possible to repurpose them to specifically do a "partial parse": parse a JSON message while expecting that there may be leftover data in the buffer, which can be left to the next iteration of the parsing loop.

@CMCDragonkai (Member)

I remember referencing this one: https://github.com/juanjoDiaz/streamparser-json

@CMCDragonkai (Member)

No thoughts yet on the type inference.

@tegefaulkes (Contributor Author)

I remember referencing this one: https://github.com/juanjoDiaz/streamparser-json

Just took a look at this. It's useful but not ideal here. The parser expects properly formatted JSON, so it can parse a stream of [{},{},{},...] where each object is a JSON-RPC message. The parser will also return every key-value pair within the JSON structure, so we need to filter and return the message objects only. I think I can make a workable solution with this.

I'm going to try 3 methods of slicing the messages from the stream. Using transform streams should make them interchangeable, so we can pick the one we think works best. Right now the 3 approaches are:

  1. A well-defined message header specifying the number of bytes the message contains. The header can contain an enum for the message type so we can mix and match message types; we could share Protobuf, raw binary data and JSON messages within the same stream (see the sketch after this list).
  2. ASCII separators. ASCII has 4 well-defined separators for delimiting data. I checked and they're not valid in the JSON format, so they shouldn't appear within messages. This will not work with binary data but should work fine for the JSON messages.
  3. Using the JSON stream parser. We would need to format the stream as a JSON array, so the stream needs to start with [ and each message separated by ,. This will only work for JSON messages and it's a bit more complex than option 2, so 2 may be a better solution here.

I'll prototype transform streams for each of these methods.
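
As a sketch of option 1, assuming an illustrative header layout of a 1-byte type tag plus a 4-byte big-endian length (not a settled wire format):

enum FrameType {
  JsonRpc = 0,
  Raw = 1,
}

// Prepend a 5-byte header: [type: u8][length: u32 big-endian].
function frameMessage(type: FrameType, payload: Uint8Array): Uint8Array {
  const frame = new Uint8Array(5 + payload.length);
  const view = new DataView(frame.buffer);
  view.setUint8(0, type);
  view.setUint32(1, payload.length, false); // false = big-endian
  frame.set(payload, 5);
  return frame;
}

// The receiving side reads the 5-byte header first, then waits until
// `length` bytes have accumulated before emitting the payload.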

@tegefaulkes (Contributor Author)

I've been thinking about how to handle bad stream data. Do we even bother with this? A single stream would come from a QUIC stream. These should handle any problems with the data over the socket, so I don't expect to see bad or mangled data within the stream.

We should be able to handle bad or mangled data within the stream, but I don't think we should bother with that. Any failure to parse data should just throw an error down the stream and close it out. This would be part of the normal error handling, just at a slightly lower level.

@CMCDragonkai commented Jan 4, 2023

Regarding streaming, based on what we talked about on the whiteboard I want to keep it as vanilla as possible. Try to get the json stream parser to "get you the next token", and then keep the remaining data.

So if the string input is { ... }, then as soon as it reaches } it should give you the complete message. If the next character is {, as in { ... }{, then even if it throws an error we must ignore it and leave it in the remaining buffer (for the next iteration of the "lexer"). This makes it closer to a compiler lexer, which always gets you the next token but does not do more than that.

See if you can configure the streamparser-json to do that. If it can't do that out of the box, consider forking it and making upstream changes to be capable of this.

Finally, regarding bad stream data: that should be considered a lexing failure. Upon a lexing failure, while the initial JSON message has been passed on to the downstream consumer, an error should be thrown on the stream, which means the error gets thrown to the downstream system. The downstream consumer can then decide what to do: it could rethrow that exception, close the stream, etc. There should be no automatic decisions made during the mapping operation with respect to the upstream producer.

Visually speaking that means something like:

BYTESTREAM -> Transformer -> JSONSTREAM

Errors in the transformer are propagated as errors to the JSONSTREAM; they are not errors for the input BYTESTREAM. Errors flow to the end consumer.
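
In code, that direction of error flow might look like the following sketch (lexNextMessage and JsonRpcMessage are hypothetical placeholders):

type JsonRpcMessage = { jsonrpc: '2.0'; [key: string]: unknown };
// Hypothetical lexer that either returns a complete message or throws.
declare function lexNextMessage(chunk: Uint8Array): JsonRpcMessage;

const parserStream = new TransformStream<Uint8Array, JsonRpcMessage>({
  transform(chunk, controller) {
    try {
      controller.enqueue(lexNextMessage(chunk));
    } catch (e) {
      // Errors the downstream JSONSTREAM only; the upstream BYTESTREAM
      // is untouched, and the consumer decides whether to rethrow/close.
      controller.error(e);
    }
  },
});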

@tegefaulkes (Contributor Author)

It is possible to separate the messages using the original bracket-counting method, using the @streamparser/json tokenizer to detect valid brackets (brackets not contained within a string). The tokenizer has a weird quirk however: when returning the offset of a token it doesn't take into account any escaped characters. So while I can work out the offset of the last } of a message, its offset value will be off by the number of escaped characters within the message.

This is an easy fix, I just need to work out the number of escaped characters and add that to the offset.

@CMCDragonkai (Member)

Make sure to fuzz test your technique so that it works in all situations: sending JSON frames, sending invalid data, or sending mixed JSON frames and invalid data.

@CMCDragonkai (Member)

Are you using @streamparser/json-whatwg? Apparently it wraps @streamparser/json to use web streams, which may be better suited to what we are doing.

@CMCDragonkai (Member)

@tegefaulkes can you start speccing out the tasks for this PR?

@tegefaulkes (Contributor Author)

I've fleshed out the PR some more. There is still some stuff to add but I need to settle some structure in my head first.

@tegefaulkes (Contributor Author)

The class can be given a stream and then handle all the control and service handling. The thing is, while an RPC call is open we have state that should be running in the background. Given that we need to shut down these connections gracefully at any time, we need the ability to track these active calls.

We can keep a set of active calls. When they are done they should remove themselves from the set. We can enumerate the set to destroy any active connections when we need to stop the RPC.

These active calls might need some amount of state, such as which service handler is needed when processing messages. This could be a CreateDestroy class.
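
A sketch of the tracking idea (names are illustrative, not the final API):

class ActiveCallRegistry {
  protected activeCalls: Set<Promise<void>> = new Set();

  public register(call: Promise<void>): void {
    // Each call removes itself from the set once it settles.
    const tracked = call.finally(() => {
      this.activeCalls.delete(tracked);
    });
    this.activeCalls.add(tracked);
  }

  public async stop(): Promise<void> {
    // Enumerate and wait out (or destroy) whatever is still in flight.
    await Promise.allSettled([...this.activeCalls]);
  }
}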

@CMCDragonkai (Member)

Can you provide a file structure of all the new modules to be developed here?

@CMCDragonkai (Member)

The #249 issue should be updated with the desired specification of this JSON RPC system. Cross out the old GRPC tasks; the old spec can just be separated with ---.

The new spec should have subsections with ### for each thing we need to work out.

Refer to the issues in #495 for extra detail; I've also updated a comment on each of those issues relating to the migration to JSON RPC.

Make sure to consider:

  • Middleware (from the GRPC interceptor)
    • Authentication
    • Method Dispatch
    • Client Side
    • Server Side
  • Schema for our params.meta and result.meta for both request and response objects
  • Deadline/timeout parameter and the ability to set it to null
  • How errors are being serialised
  • How to bubble up errors from QUICStream to RPCClient and upwards, and whether that should be an EventTarget or Observable.

@CMCDragonkai self-requested a review January 9, 2023 03:30
@CMCDragonkai (Member)

Based on some discussion:

  1. There are situations where using Promise and EventTarget as a replacement for callbacks and event emitters means that errors go in a different direction (to an event handler) rather than to the caller. This is happening right now in js-quic; more details to follow in js-quic#1 (Create QUIC library that can be exposed to JS and uses the Node dgram module).
  2. The RPCServer should have an error event; this can be emitted in the case where there is no appropriate caller, like if a raw handler of a stream were to throw an error. When the more abstract handlers throw an error, it ends up going to the stream.
  3. If you were to write to the stream, and that fails, that can be emitted as an error event.
  4. The PolykeyAgent is probably going to register an error handler for the RPCServer, and in that case it can just log out the error with nothing else to do. This is because one does not really restart the RPCServer, since it's just always going to be handling a stream.
  5. Client streaming is where the client sends lots of messages.
  6. Server streaming is where the server sends lots of messages.

@CMCDragonkai commented Jan 10, 2023

We should confirm how to do explicit stream closing for both readable and writable.

In the case of readable, there is a way to "cancel" the stream using https://developer.mozilla.org/en-US/docs/Web/API/ReadableStreamBYOBReader/cancel as well as readable.cancel.

This allows you to provide a reason for the cancellation. In the case of the readable stream, I believe the cancel reason will translate to a stream close.

So on the readable side, this means https://docs.rs/quiche/latest/quiche/struct.Connection.html#method.stream_shutdown, to which I can pass an error number code corresponding to a "reason".

But on the writable side, there are actually 2 ways to do this: stream_send with a fin, and stream_shutdown. I'm not entirely clear on the difference yet.

This is more on the QUIC side though; you need to ensure that you're explicitly signalling a close on the readable stream when you are finished reading (for unary, server streaming), and when you are finished writing (for unary, client streaming, server streaming, duplex streaming).

However at the end of the handling, no matter what, both readable and writable sides should be explicitly closed.
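
Concretely, the end of handling could close both sides like this (a sketch; the QUIC mappings in the comments follow the description above):

// Explicitly close both sides once the handler is done.
async function closeStreamPair(
  readable: ReadableStream<Uint8Array>,
  writable: WritableStream<Uint8Array>,
): Promise<void> {
  // Done writing: close the writable side (a fin on the QUIC side).
  const writer = writable.getWriter();
  await writer.close();
  // Done reading: cancel with a reason (a stream_shutdown on the QUIC side).
  await readable.cancel('handling completed');
}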

@CMCDragonkai (Member)

Ok so basically now we just need to fix up the above, and also prepare some middleware for authentication.

@CMCDragonkai (Member)

The incremental JSON parser has an inefficient way of figuring out the exact source offset. We need to raise an upstream issue to indicate that when we have escaped strings, the offset is not accurate. The offset is only useful if it is correctly indexed against the source input. It will be quite slow if we have to use JSON.stringify each time to figure out how many escaped characters affect the offset.

This seems like a bug/oversight that upstream should be correcting.

@CMCDragonkai (Member)

Furthermore another inefficiency is that we have to build up the chunks as an accumulating buffer before we do a JSON.parse because we are using the Tokenizer and not TokenParser from the incremental JSON parser.

We should be able to use the TokenParser to build up the tree internally and not have to do our own buffering. However the problem is that the parser currently throws an error when we write a string into it that has a concatenated message without a delimiter.

What we need to do is to raise an issue upstream as well, to make the parser actually truly incremental, in which case it should not throw an error immediately on writing the string, but upon acquiring the next token.

We can then ignore this error, and restart the parsing for the new message.

@CMCDragonkai (Member)

@tegefaulkes it's possible that if you just ignore the error on the write, the onValue handler might end up being called anyway.

@tegefaulkes commented Jan 10, 2023

Some small notes.

  1. Slight change to the JSON-RPC message structure: message metadata needs to be included within the params field. We should have a reserved metadata keyword, but we can just keep the data separate and structure it as {metadata: JSONValue, data: JSONValue} (sketched after this list).
  2. Authentication. The first message of the stream should include authentication details in the metadata field. This could be a bearer token or a password. This needs to be further specced out. Do we want functionality where the server can send some leading metadata and the client replies with a secret generated from that metadata?
    We need a standard way of handling the authentication, likely through an authentication function that we provide to the RPC. Or maybe we don't need it as part of the RPC system; we only need the metadata available within the handler. This will tie into the middleware stuff, and will need to tie into the RPC client; some prototyping needs to be done there still.
  3. Look into using the async iterator type as opposed to generator.
  4. Client and server streams are backwards.
  5. It's a stream of Uint8Array.
  6. Create an issue on the JSON stream parser about the tokenizer offset problem.
  7. Raw handlers should emit on the RPC event emitter
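
A sketch of the payload shape from point 1, reusing the JSONValue type from the Stage 2 sketch (names are not final):

// Every params/result payload carries reserved metadata alongside the data.
type JsonRpcPayload<T extends JSONValue = JSONValue> = {
  metadata: JSONValue; // e.g. authentication details on the first message
  data: T;
};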

@CMCDragonkai (Member)

Just remember that it should be RPCServer and RPCClient classes.

@tegefaulkes (Contributor Author)

I've created an upstream issue for the tokenizer problem at juanjoDiaz/streamparser-json#24.

Related: #500, #501, #502
@tegefaulkes (Contributor Author)

Squashed and rebased on staging.

@tegefaulkes (Contributor Author)

Enabling CI

@tegefaulkes (Contributor Author)

There seem to be some issues with all of the tests and building, but that's coming from the staging branch so I'll have to fix it there. I should be good to merge now.
