Skip to content

Commit

Permalink
fix(node): Subscriber no longer trampled if from another copy of rxjs
Browse files Browse the repository at this point in the history
Only fixes v6 and above, fix coming for v5 (stable) shortly

related #3475
  • Loading branch information
benlesh committed Mar 26, 2018
1 parent f5eda97 commit 371b658
Show file tree
Hide file tree
Showing 3 changed files with 14 additions and 7 deletions.
2 changes: 1 addition & 1 deletion node-tests/test.js
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ var id = setTimeout(function () {
}, 200);

of1(0).pipe(
mergeMap1(function () { return of(x); }),
mergeMap1(function (x) { return of(x); }),
mergeMap(function () { return from1(Promise.resolve(1)); })
).subscribe({
next: function (value) { actual.push(value); },
Expand Down
2 changes: 1 addition & 1 deletion package-lock.json

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

17 changes: 12 additions & 5 deletions src/internal/Subscriber.ts
Original file line number Diff line number Diff line change
Expand Up @@ -70,10 +70,13 @@ export class Subscriber<T> extends Subscription implements Observer<T> {
break;
}
if (typeof destinationOrNext === 'object') {
if (destinationOrNext instanceof Subscriber) {
this.syncErrorThrowable = destinationOrNext.syncErrorThrowable;
this.destination = (<Subscriber<any>> destinationOrNext);
(<any> this.destination).add(this);
// HACK(benlesh): For situations where Node has multiple copies of rxjs in
// node_modules, we cannot rely on `instanceof` checks
if (isTrustedSubscriber(destinationOrNext)) {
const trustedSubscriber = destinationOrNext[rxSubscriberSymbol]() as Subscriber<any>;
this.syncErrorThrowable = trustedSubscriber.syncErrorThrowable;
this.destination = trustedSubscriber;
trustedSubscriber.add(this);
} else {
this.syncErrorThrowable = true;
this.destination = new SafeSubscriber<T>(this, <PartialObserver<any>> destinationOrNext);
Expand Down Expand Up @@ -299,4 +302,8 @@ class SafeSubscriber<T> extends Subscriber<T> {
this._parentSubscriber = null;
_parentSubscriber.unsubscribe();
}
}
}

function isTrustedSubscriber(obj: any) {
return obj instanceof Subscriber || ('syncErrorThrowable' in obj && obj[rxSubscriberSymbol]);
}

0 comments on commit 371b658

Please sign in to comment.