Skip to content

Commit

Permalink
DOM: Don't use dependent AbortSignals for Observable Subscribers
Browse files Browse the repository at this point in the history
This CL introduces subtle timing differences in Subscriber abortion and
teardown execution, across Subscribers in a chain of Observables. These
timing differences are the result of no longer using the DOM Standard's
dependent AbortSignal concept for Observables that are chained together.

For a full description of this change, see
WICG/observable#154.

Bug: 40282760
Change-Id: I4feb6f9ad67e2dd7d7a4d5ec51fdecebc4e6ae18
Reviewed-on: https://chromium-review.googlesource.com/c/chromium/src/+/5676226
Reviewed-by: Joey Arhar <jarhar@chromium.org>
Commit-Queue: Dominic Farolino <dom@chromium.org>
Cr-Commit-Position: refs/heads/main@{#1325506}
  • Loading branch information
domfarolino authored and sadym-chromium committed Jul 18, 2024
1 parent 4c0c325 commit d4464f8
Show file tree
Hide file tree
Showing 8 changed files with 160 additions and 22 deletions.
143 changes: 140 additions & 3 deletions dom/observable/tentative/observable-constructor.any.js
Original file line number Diff line number Diff line change
Expand Up @@ -688,17 +688,154 @@ test(() => {
assert_array_equals(results, ['subscribe() callback']);
ac.abort();
results.push('abort() returned');
// The reason the "inner" abort event handler is invoked first is because the
// "inner" AbortSignal is not a dependent signal (that would ordinarily get
// aborted after the parent, aka "outer" signal, is completely finished being
// aborted). Instead, the order of operations looks like this:
// 1. "Outer" signal begins to be aborted
// 2. Its abort algorithms [1] run [2]; the internal abort algorithm here is
// the "inner" Subscriber's "Close a subscription" [0].
// a. This signals abort on the "inner" Subscriber's signal, firing the
// abort event
// b. Then, the "inner" Subscriber's teardowns run.
// 3. Once the "outer" signal's abort algorithms are finished, the abort
// event is fired [3], triggering the outer abort handler.
//
// [0]: https://wicg.github.io/observable/#close-a-subscription
// [1]: https://dom.spec.whatwg.org/#abortsignal-abort-algorithms
// [2]: https://dom.spec.whatwg.org/#ref-for-abortsignal-abort-algorithms%E2%91%A2:~:text=For%20each%20algorithm%20of%20signal%E2%80%99s%20abort%20algorithms%3A%20run%20algorithm
// [3]: https://dom.spec.whatwg.org/#abortsignal-signal-abort:~:text=Fire%20an%20event%20named%20abort%20at%20signal
assert_array_equals(results, [
'subscribe() callback',
'outer abort handler', 'teardown 2', 'teardown 1',
'inner abort handler', 'abort() returned',
'subscribe() callback', 'inner abort handler', 'teardown 2', 'teardown 1',
'outer abort handler', 'abort() returned',
]);
assert_false(activeDuringTeardown1, 'should not be active during teardown callback 1');
assert_false(activeDuringTeardown2, 'should not be active during teardown callback 2');
assert_true(abortedDuringTeardown1, 'should be aborted during teardown callback 1');
assert_true(abortedDuringTeardown2, 'should be aborted during teardown callback 2');
}, "Unsubscription lifecycle");

// In the usual consumer-initiated unsubscription case, when the AbortController
// is aborted after subscription, teardowns run from upstream->downstream. This
// is because for a given Subscriber, when a downstream signal is aborted
// (`ac.signal` in this case), the "Close" algorithm prompts the Subscriber to
// first abort *its* own signal (the one accessible via `Subscriber#signal`) and
// then run its teardowns.
//
// This means upstream Subscribers get the first opportunity their teardowns
// before the control flow is returned to downstream Subscribers to run *their*
// teardowns (after they abort their internal signal).
test(() => {
const results = [];
const upstream = new Observable(subscriber => {
subscriber.signal.addEventListener('abort',
e => results.push('upstream abort handler'), {once: true});
subscriber.addTeardown(
() => results.push(`upstream teardown. reason: ${subscriber.signal.reason}`));
});
const middle = new Observable(subscriber => {
subscriber.signal.addEventListener('abort',
e => results.push('middle abort handler'), {once: true});
subscriber.addTeardown(
() => results.push(`middle teardown. reason: ${subscriber.signal.reason}`));
upstream.subscribe({}, {signal: subscriber.signal});
});
const downstream = new Observable(subscriber => {
subscriber.signal.addEventListener('abort',
e => results.push('downstream abort handler'), {once: true});
subscriber.addTeardown(
() => results.push(`downstream teardown. reason: ${subscriber.signal.reason}`));
middle.subscribe({}, {signal: subscriber.signal});
});

const ac = new AbortController();
downstream.subscribe({}, {signal: ac.signal});
ac.abort('Abort!');
assert_array_equals(results, [
'upstream abort handler',
'upstream teardown. reason: Abort!',
'middle abort handler',
'middle teardown. reason: Abort!',
'downstream abort handler',
'downstream teardown. reason: Abort!',
]);
}, "Teardowns are called in upstream->downstream order on " +
"consumer-initiated unsubscription");

// This test is like the above, but asserts the exact opposite order of
// teardowns. This is because, since the Subscriber's signal is aborted
// immediately upon construction, `addTeardown()` runs teardowns synchronously
// in subscriber-order, which goes from downstream->upstream.
test(() => {
const results = [];
const upstream = new Observable(subscriber => {
subscriber.addTeardown(
() => results.push(`upstream teardown. reason: ${subscriber.signal.reason}`));
});
const middle = new Observable(subscriber => {
subscriber.addTeardown(
() => results.push(`middle teardown. reason: ${subscriber.signal.reason}`));
upstream.subscribe({}, {signal: subscriber.signal});
});
const downstream = new Observable(subscriber => {
subscriber.addTeardown(
() => results.push(`downstream teardown. reason: ${subscriber.signal.reason}`));
middle.subscribe({}, {signal: subscriber.signal});
});

downstream.subscribe({}, {signal: AbortSignal.abort('Initial abort')});
assert_array_equals(results, [
"downstream teardown. reason: Initial abort",
"middle teardown. reason: Initial abort",
"upstream teardown. reason: Initial abort",
]);
}, "Teardowns are called in downstream->upstream order on " +
"consumer-initiated unsubscription with pre-aborted Signal");

// Producer-initiated unsubscription test, capturing the ordering of abort events and teardowns.
test(() => {
const results = [];

const source = new Observable(subscriber => {
subscriber.addTeardown(() => results.push('source teardown'));
subscriber.signal.addEventListener('abort',
e => results.push('source abort event'));
});

const middle = new Observable(subscriber => {
subscriber.addTeardown(() => results.push('middle teardown'));
subscriber.signal.addEventListener('abort',
e => results.push('middle abort event'));

source.subscribe(() => {}, {signal: subscriber.signal});
});

let innerSubscriber = null;
const downstream = new Observable(subscriber => {
innerSubscriber = subscriber;
subscriber.addTeardown(() => results.push('downstream teardown'));
subscriber.signal.addEventListener('abort',
e => results.push('downstream abort event'));

middle.subscribe(() => {}, {signal: subscriber.signal});
});

downstream.subscribe();

// Trigger a producer-initiated unsubscription from the most-downstream Observable.
innerSubscriber.complete();

assert_array_equals(results, [
'source abort event',
'source teardown',
'middle abort event',
'middle teardown',
'downstream abort event',
'downstream teardown',
]);
}, "Producer-initiated unsubscription in a downstream Observable fires abort " +
"events before each teardown, in downstream->upstream order");

test(t => {
let innerSubscriber = null;
const source = new Observable(subscriber => {
Expand Down
5 changes: 3 additions & 2 deletions dom/observable/tentative/observable-filter.any.js
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ test(() => {
subscriber.next(1);
assert_true(teardownCalled, "Teardown called once map unsubscribes due to error");
assert_false(subscriber.active, "Unsubscription makes Subscriber inactive");
results.push(subscriber.signal.reason);
subscriber.next(2);
subscriber.complete();
});
Expand All @@ -44,7 +45,7 @@ test(() => {
complete: () => results.push("complete"),
});

assert_array_equals(results, [error]);
assert_array_equals(results, [error, error]);
}, "filter(): Errors thrown in filter predicate are emitted to Observer error() handler");

test(() => {
Expand Down Expand Up @@ -100,7 +101,7 @@ test(() => {
});

assert_array_equals(results,
['source teardown', 'source abort event', 'filter observable complete']);
['source abort event', 'source teardown', 'filter observable complete']);
}, "filter(): Upon source completion, source Observable teardown sequence " +
"happens after downstream filter complete() is called");

Expand Down
4 changes: 2 additions & 2 deletions dom/observable/tentative/observable-first.any.js
Original file line number Diff line number Diff line change
Expand Up @@ -94,8 +94,8 @@ promise_test(async () => {
"calling first",
"source subscribe",
"before source next 1",
"source teardown",
"source abort",
"source teardown",
"after source next 1"
], "Array values after first() is called");

Expand All @@ -106,8 +106,8 @@ promise_test(async () => {
"calling first",
"source subscribe",
"before source next 1",
"source teardown",
"source abort",
"source teardown",
"after source next 1",
"first resolved with: 1",
], "Array values after Promise is awaited");
Expand Down
4 changes: 2 additions & 2 deletions dom/observable/tentative/observable-last.any.js
Original file line number Diff line number Diff line change
Expand Up @@ -91,8 +91,8 @@ promise_test(async () => {
"before source next 1",
"after source next 1",
"before source complete",
"source teardown",
"source abort",
"source teardown",
"after source complete",
], "Array values after last() is called");

Expand All @@ -105,8 +105,8 @@ promise_test(async () => {
"before source next 1",
"after source next 1",
"before source complete",
"source teardown",
"source abort",
"source teardown",
"after source complete",
"last resolved with: 1",
], "Array values after Promise is awaited");
Expand Down
2 changes: 1 addition & 1 deletion dom/observable/tentative/observable-map.any.js
Original file line number Diff line number Diff line change
Expand Up @@ -132,7 +132,7 @@ test(() => {
});

assert_array_equals(results,
['source teardown', 'source abort event', 'map observable complete']);
['source abort event', 'source teardown', 'map observable complete']);
}, "map(): Upon source completion, source Observable teardown sequence " +
"happens before downstream mapper complete() is called");

Expand Down
4 changes: 2 additions & 2 deletions dom/observable/tentative/observable-switchMap.any.js
Original file line number Diff line number Diff line change
Expand Up @@ -212,10 +212,10 @@ test(() => {

ac.abort();
assert_array_equals(results, [
"source teardown",
"source onabort",
"inner teardown",
"source teardown",
"inner onabort",
"inner teardown",
], "Unsubscription order is correct");
}, "switchMap(): should unsubscribe in the correct order when user aborts " +
"the subscription");
Expand Down
8 changes: 4 additions & 4 deletions dom/observable/tentative/observable-takeUntil.any.js
Original file line number Diff line number Diff line change
Expand Up @@ -179,10 +179,10 @@ promise_test(async t => {
assert_array_equals(results, [
"notifier subscribed",
"source subscribed",
"notifier teardown",
"notifier signal abort",
"source teardown",
"notifier teardown",
"source signal abort",
"source teardown",
"complete callback",
]);
}, "takeUntil: notifier next() unsubscribes from notifier & source observable");
Expand Down Expand Up @@ -235,10 +235,10 @@ promise_test(async t => {
assert_array_equals(results, [
"notifier subscribed",
"source subscribed",
"notifier teardown",
"notifier signal abort",
"notifier teardown",
"source signal abort",
"source teardown",
"source signal abort"
]);
}, "takeUntil()'s AbortSignal unsubscribes from notifier & source observable");

Expand Down
12 changes: 6 additions & 6 deletions dom/observable/tentative/observable-toArray.any.js
Original file line number Diff line number Diff line change
Expand Up @@ -152,9 +152,9 @@ promise_test(async () => {

assert_array_equals(results, [
"Subscribed. active: true",
"Outer signal abort event",
"Teardown",
"Inner signal abort event",
"Teardown",
"Outer signal abort event",
], "Events and teardowns are fired in the right ordered");

// Everything microtask above should be queued up by now, so queue one more
Expand All @@ -163,12 +163,12 @@ promise_test(async () => {
await Promise.resolve();
assert_array_equals(results, [
"Subscribed. active: true",
"Outer signal abort event",
"Teardown",
"Inner signal abort event",
"Outer signal Promise",
"Teardown Promise",
"Teardown",
"Outer signal abort event",
"Inner signal Promise",
"Teardown Promise",
"Outer signal Promise",
], "Promises resolve in the right order");
}, "Operator Promise abort ordering");

0 comments on commit d4464f8

Please sign in to comment.