Skip to content

Commit

Permalink
fix(Subscription): fix leaks caused by unsubscribe functions that throw
Browse files Browse the repository at this point in the history
- adds a test for the issue

related #1256
  • Loading branch information
benlesh committed Jan 27, 2016
1 parent 1df8928 commit 9e88c2e
Show file tree
Hide file tree
Showing 5 changed files with 80 additions and 9 deletions.
39 changes: 39 additions & 0 deletions spec/Subscription-spec.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
/* globals describe, it, expect */
var Rx = require('../dist/cjs/Rx');
var Subscription = Rx.Subscription;
var Observable = Rx.Observable;

describe('Subscription', function () {
it('should not leak', function (done) {
var tearDowns = [];

var source1 = Observable.create(function (observer) {
return function () {
tearDowns.push(1);
};
});

var source2 = Observable.create(function (observer) {
return function () {
tearDowns.push(2);
throw new Error('oops, I am a bad unsubscribe!');
};
});

var source3 = Observable.create(function (observer) {
return function () {
tearDowns.push(3);
};
});

var subscription = Observable.merge(source1, source2, source3).subscribe();

setTimeout(function () {
expect(function () {
subscription.unsubscribe();
}).toThrow();
expect(tearDowns).toEqual([1, 2, 3]);
done();
});
});
});
5 changes: 3 additions & 2 deletions src/Rx.DOM.ts
Original file line number Diff line number Diff line change
Expand Up @@ -110,7 +110,7 @@ import './add/operator/zip';
import './add/operator/zipAll';

/* tslint:disable:no-unused-variable */
import {Subscription} from './Subscription';
import {Subscription, UnsubscriptionError} from './Subscription';
import {Subscriber} from './Subscriber';
import {AsyncSubject} from './subject/AsyncSubject';
import {ReplaySubject} from './subject/ReplaySubject';
Expand Down Expand Up @@ -159,5 +159,6 @@ export {
Notification,
EmptyError,
ArgumentOutOfRangeError,
ObjectUnsubscribedError
ObjectUnsubscribedError,
UnsubscriptionError
};
3 changes: 2 additions & 1 deletion src/Rx.KitchenSink.ts
Original file line number Diff line number Diff line change
Expand Up @@ -139,7 +139,7 @@ import './add/operator/zipAll';

/* tslint:disable:no-unused-variable */
import {Observer} from './Observer';
import {Subscription} from './Subscription';
import {Subscription, UnsubscriptionError} from './Subscription';
import {Subscriber} from './Subscriber';
import {AsyncSubject} from './subject/AsyncSubject';
import {ReplaySubject} from './subject/ReplaySubject';
Expand Down Expand Up @@ -185,6 +185,7 @@ export {
EmptyError,
ArgumentOutOfRangeError,
ObjectUnsubscribedError,
UnsubscriptionError,
TestScheduler,
VirtualTimeScheduler,
TimeInterval,
Expand Down
5 changes: 3 additions & 2 deletions src/Rx.ts
Original file line number Diff line number Diff line change
Expand Up @@ -114,7 +114,7 @@ import './add/operator/zipAll';
/* tslint:disable:no-unused-variable */
import {Operator} from './Operator';
import {Observer} from './Observer';
import {Subscription} from './Subscription';
import {Subscription, UnsubscriptionError} from './Subscription';
import {Subscriber} from './Subscriber';
import {AsyncSubject} from './subject/AsyncSubject';
import {ReplaySubject} from './subject/ReplaySubject';
Expand Down Expand Up @@ -158,5 +158,6 @@ export {
Notification,
EmptyError,
ArgumentOutOfRangeError,
ObjectUnsubscribedError
ObjectUnsubscribedError,
UnsubscriptionError
};
37 changes: 33 additions & 4 deletions src/Subscription.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
import {isArray} from './util/isArray';
import {isObject} from './util/isObject';
import {isFunction} from './util/isFunction';
import {tryCatch} from './util/tryCatch';
import {errorObject} from './util/errorObject';

export class Subscription {
public static EMPTY: Subscription = (function(empty: any){
Expand All @@ -17,6 +19,8 @@ export class Subscription {
}

unsubscribe(): void {
let hasErrors = false;
let errors: any[];

if (this.isUnsubscribed) {
return;
Expand All @@ -29,7 +33,11 @@ export class Subscription {
(<any> this)._subscriptions = null;

if (isFunction(_unsubscribe)) {
_unsubscribe.call(this);
let trial = tryCatch(_unsubscribe).call(this);
if (trial === errorObject) {
hasErrors = true;
(errors = errors || []).push(errorObject.e);
}
}

if (isArray(_subscriptions)) {
Expand All @@ -38,12 +46,26 @@ export class Subscription {
const len = _subscriptions.length;

while (++index < len) {
const subscription = _subscriptions[index];
if (isObject(subscription)) {
subscription.unsubscribe();
const sub = _subscriptions[index];
if (isObject(sub)) {
let trial = tryCatch(sub.unsubscribe).call(sub);
if (trial === errorObject) {
hasErrors = true;
errors = errors || [];
let err = errorObject.e;
if (err instanceof UnsubscriptionError) {
errors = errors.concat(err.errors);
} else {
errors.push(err);
}
}
}
}
}

if (hasErrors) {
throw new UnsubscriptionError(errors);
}
}

add(subscription: Subscription | Function | void): void {
Expand Down Expand Up @@ -98,3 +120,10 @@ export class Subscription {
}
}
}

export class UnsubscriptionError extends Error {
constructor(public errors: any[]) {
super('unsubscriptoin error(s)');
this.name = 'UnsubscriptionError';
}
}

0 comments on commit 9e88c2e

Please sign in to comment.