Skip to content

Commit

Permalink
Merge pull request #551 from jan-ivar/atomicwrite
Browse files Browse the repository at this point in the history
Add writer.atomicWrite() method.
  • Loading branch information
jan-ivar committed Jan 31, 2024
2 parents 607bfa8 + fbc834b commit 989aaa5
Showing 1 changed file with 132 additions and 7 deletions.
139 changes: 132 additions & 7 deletions index.bs
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,7 @@ spec:fetch; type:dfn; for:/; text:fetch
spec:url; type:dfn; text:scheme
spec:url; type:dfn; text:fragment
spec:infra; type:dfn; for:/; text:ASCII case-insensitive
spec:infra; type:dfn; text:list
</pre>
<pre class="anchors">
url: https://html.spec.whatwg.org/multipage/origin.html#concept-origin; type: dfn; text: origin; for:/
Expand Down Expand Up @@ -1474,6 +1475,7 @@ interface WebTransportSendStream : WritableStream {
attribute WebTransportSendGroup? sendGroup;
attribute long long sendOrder;
Promise&lt;WebTransportSendStreamStats&gt; getStats();
WebTransportWriter getWriter();
};
</pre>

Expand Down Expand Up @@ -1519,6 +1521,12 @@ The {{WebTransportSendStream}}'s [=transfer steps=] and
1. [=Resolve=] |p| with |stats|.
1. Return |p|.

: <dfn for="WebTransportSendStream" method>getWriter()</dfn>
:: This method must be implemented in the same manner as {{WritableStream/getWriter}}
inherited from {{WritableStream}}, except in place of creating a
{{WritableStreamDefaultWriter}}, it must instead
[=WebTransportWriter/create=] a {{WebTransportWriter}} with [=this=].

## Internal Slots ## {#send-stream-internal-slots}

A {{WebTransportSendStream}} has the following internal slots.
Expand Down Expand Up @@ -1552,6 +1560,11 @@ A {{WebTransportSendStream}} has the following internal slots.
<td><dfn>`[[SendOrder]]`</dfn>
<td class="non-normative">An optional send order number, defaulting to 0.
</tr>
<tr>
<td><dfn>`[[AtomicWriteRequests]]`</dfn>
<td class="non-normative">An [=ordered set=] of promises, keeping track of the subset of
write requests that are atomic among those queued to be processed by the underlying sink.
</tr>
<tbody>
</table>

Expand All @@ -1575,6 +1588,8 @@ To <dfn export for="WebTransportSendStream" lt="create|creating">create</dfn> a
:: |sendGroup|
: {{WebTransportSendStream/[[SendOrder]]}}
:: |sendOrder|
: {{WebTransportSendStream/[[AtomicWriteRequests]]}}
:: An empty [=ordered set=] of promises.
1. Let |writeAlgorithm| be an action that [=writes=] |chunk| to |stream|, given |chunk|.
1. Let |closeAlgorithm| be an action that [=closes=] |stream|.
1. Let |abortAlgorithm| be an action that [=aborts=] |stream| with |reason|, given |reason|.
Expand Down Expand Up @@ -1602,9 +1617,18 @@ To <dfn for="WebTransportSendStream">write</dfn> |chunk| to a {{WebTransportSend
1. Let |promise| be a new promise.
1. Let |bytes| be a copy of the [=byte sequence=] which |chunk| represents.
1. Set |stream|.{{[[PendingOperation]]}} to |promise|.
1. Let |inFlightWriteRequest| be
|stream|.<a href="https://streams.spec.whatwg.org/#writablestream-inflightwriterequest">inFlightWriteRequest</a>.
1. Let |atomic| be true if [=stream=].{{WebTransportSendStream/[[AtomicWriteRequests]]}}
[=list/contains=] |inFlightWriteRequest|, otherwise false.
1. Run the following steps [=in parallel=]:
1. [=stream/Send=] |bytes| on |stream|.{{WebTransportSendStream/[[InternalStream]]}} and wait for the
operation to complete.
1. If |atomic| is true and the current [=flow control=] window is too small for |bytes| to be sent
in its entirety, then abort the remaining steps and [=queue a network task=] with |transport|
to run these sub-steps:
1. Set |stream|.{{[[PendingOperation]]}} to null.
1. [=Abort all atomic write requests=] on |stream|.
1. Otherwise, [=stream/send=] |bytes| on |stream|.{{WebTransportSendStream/[[InternalStream]]}}
and wait for the operation to complete.
This sending MAY be interleaved with sending of previously queued streams and datagrams,
as well as streams and datagrams yet to be queued to be sent over this transport.

Expand All @@ -1629,20 +1653,22 @@ To <dfn for="WebTransportSendStream">write</dfn> |chunk| to a {{WebTransportSend

Note: The definition of fairness here is [=implementation-defined=].

1. If the previous step failed, abort the remaining steps.
1. If the previous step failed due to a network error, abort the remaining steps.

Note: We don't reject |promise| here because we handle network errors elsewhere, and those steps
reject |stream|.{{[[PendingOperation]]}}.

1. [=Queue a network task=] with |transport| to run these steps:
1. Otherwise, [=queue a network task=] with |transport|
to run these steps:
1. Set |stream|.{{[[PendingOperation]]}} to null.
1. If |stream|.{{WebTransportSendStream/[[AtomicWriteRequests]]}} [=list/contains=] |inFlightWriteRequest|, [=list/remove=] |inFlightWriteRequest|.
1. [=Resolve=] |promise| with undefined.
1. Return |promise|.

Note: The [=fulfilled|fulfillment=] of the promise returned from this algorithm (or,
{{WritableStreamDefaultWriter/write|WritableStreamDefaultWriter.write}}) does **NOT** necessarily mean that the chunk is acked by
{{WritableStreamDefaultWriter/write(chunk)}}) does **NOT** necessarily mean that the chunk is acked by
the server [[!QUIC]]. It may just mean that the chunk is appended to the buffer. To make sure that
the chunk arrives at the server, use an application-level protocol.
the chunk arrives at the server, the server needs to send an application-level acknowledgment message.

</div>

Expand Down Expand Up @@ -1687,6 +1713,20 @@ To <dfn for="WebTransportSendStream">abort</dfn> a {{WebTransportSendStream}} |s

</div>

<div algorithm>
To <dfn for="WebTransportSendStream">abort all atomic write requests</dfn> on a {{WebTransportSendStream}} |stream|, run these steps:
1. Let |writeRequests| be
|stream|.<a href="https://streams.spec.whatwg.org/#writablestream-writerequests">writeRequests</a>.
1. Let |requestsToAbort| be [=stream=].{{WebTransportSendStream/[[AtomicWriteRequests]]}}.
1. If |writeRequests| [=list/contains=] a promise not in |requestsToAbort|, then
[=WritableStream/error=] |stream| with {{AbortError}}, and abort these steps.
1. [=list/Empty=] [=stream=].{{WebTransportSendStream/[[AtomicWriteRequests]]}}.
1. [=For each=] |promise| in |requestsToAbort|, [=reject=] |promise| with {{AbortError}}.
1. [=In parallel=], [=for each=] |promise| in |requestsToAbort|, abort the
[=stream/send|sending=] of bytes associated with |promise|.

</div>

## STOP_SENDING signal coming from the server ## {#send-stream-STOP_SENDING}

<div algorithm="STOP_SENDING signal">
Expand Down Expand Up @@ -2116,6 +2156,66 @@ object |transport|, and a |sendOrder|, run these steps.

</div>

# `WebTransportWriter` Interface # {#web-transport-writer-interface}

{{WebTransportWriter}} is a subclass of {{WritableStreamDefaultWriter}} that
adds one method.

A {{WebTransportWriter}} is always created by the
[=WebTransportWriter/create=] procedure.

<pre class="idl">
[Exposed=*, SecureContext]
interface WebTransportWriter : WritableStreamDefaultWriter {
Promise&lt;undefined&gt; atomicWrite(optional any chunk);
};
</pre>

## Methods ## {#web-transport-writer-methods}

: <dfn for="WebTransportWriter" method>atomicWrite(chunk)</dfn>
:: The {{atomicWrite}} method will reject if the |chunk| given to it
could not be sent in its entirety within the [=flow control=] window that
is current at the time of sending. This behavior is designed to satisfy niche
transactional applications sensitive to [=flow control=] deadlocks ([[RFC9308]]
[Section 4.4](https://datatracker.ietf.org/doc/html/rfc9308#section-4.4)).

Note: {{atomicWrite}} can still reject after sending some data. Though it
provides atomicity with respect to flow control, other errors may occur.
{{atomicWrite}} does not prevent data from being split between packets
or being interleaved with other data. Only the sender learns if
{{atomicWrite}} fails due to lack of available flow control credit.

Note: Atomic writes can still block if queued behind non-atomic writes. If
the atomic write is rejected, everything queued behind it at that moment
will be rejected as well. Any non-atomic writes rejected in this way will
[=WritableStream/error=] the stream. Applications are therefore encouraged to
always await atomic writes.

When {{atomicWrite}} is called, the user agent MUST run the following steps:
1. Let |p| be the result of {{WritableStreamDefaultWriter/write(chunk)}}
on {{WritableStreamDefaultWriter}} with |chunk|.
1. [=set/Append=] |p| to |stream|.{{WebTransportSendStream/[[AtomicWriteRequests]]}}.
1. Return the result of [=reacting=] to |p| with the following steps:
1. If |stream|.{{WebTransportSendStream/[[AtomicWriteRequests]]}} [=list/contains=] |p|,
[=list/remove=] |p|.
1. If |p| was rejected with reason |r|, then return [=a promise rejected with=] |r|.
1. Return undefined.

## Procedures ## {#web-transport-writer-procedures}

<div algorithm="create a writer">

To <dfn export for="WebTransportWriter" lt="create|creating">create</dfn> a
{{WebTransportWriter}}, with a {{WebTransportSendStream}} |stream|, run these
steps:
1. Let |writer| be a [=new=] {{WebTransportWriter}}.
1. Run the [new WritableStreamDefaultWriter(stream)](https://streams.spec.whatwg.org/#default-writer-constructor)
constructor steps passing |writer| as this, and |stream| as the constructor argument.
1. Return |writer|.

</div>

# `WebTransportError` Interface # {#web-transport-error-interface}

<dfn interface>WebTransportError</dfn> is a subclass of {{DOMException}} that represents
Expand Down Expand Up @@ -2260,7 +2360,7 @@ converted to an httpErrorCode, and vice versa, as specified in [[!WEB-TRANSPORT-
<td>[=stream/Send|sends=] STREAM with FIN bit set</td>
</tr>
<tr>
<td>{{WebTransportBidirectionalStream/writable}}.getWriter().{{WritableStreamDefaultWriter/write}}()</td>
<td>{{WebTransportBidirectionalStream/writable}}.getWriter().{{WritableStreamDefaultWriter/write(chunk)}}()</td>
<td>[=stream/Send|sends=] STREAM</td>
</tr>
<tr>
Expand Down Expand Up @@ -2784,6 +2884,31 @@ async function receiveText(url, createWritableStreamForTextData) {
}
</pre>

## Sending a transactional chunk on a stream ## {#example-transactional-stream}

*This section is non-normative.*

Sending a transactional piece of data on a unidirectional stream, only if it can be done
entirely without blocking on [=flow control=], can be achieved by using the
{{WebTransportSendStream/getWriter}} function and the resulting writer.

<pre class="example" highlight="js">
async function sendTransactionalData(wt, bytes) {
const writable = await wt.createUnidirectionalStream();
const writer = writable.getWriter();
await writer.ready;
try {
await writer.atomicWrite(bytes);
} catch (e) {
if (e.name != "AbortError") throw e;
// rejected to avoid blocking on flow control
// The writable remains un-errored provided no non-atomic writes are pending
} finally {
writer.releaseLock();
}
}
</pre>

## Complete example ## {#example-complete}

*This section is non-normative.*
Expand Down

0 comments on commit 989aaa5

Please sign in to comment.