diff --git a/spec/observables/using-spec.ts b/spec/observables/using-spec.ts new file mode 100644 index 0000000000..078c36159d --- /dev/null +++ b/spec/observables/using-spec.ts @@ -0,0 +1,24 @@ +import * as Rx from '../../dist/cjs/Rx.KitchenSink'; +import {it} from '../helpers/test-helper'; + +const Observable = Rx.Observable; +const Subscription = Rx.Subscription; + +describe('Observable.using', () => { + it('should dispose of the resource when the subscription is disposed', (done) => { + let disposed = false; + const source = Observable.using( + () => new Subscription(() => disposed = true), + (resource) => Observable.range(0, 3) + ) + .take(2); + + source.subscribe(); + + if (disposed) { + done(); + } else { + done.fail('disposed should be true but was false'); + } + }); +}); diff --git a/src/Observable.ts b/src/Observable.ts index d1b0662e9a..98f25b3335 100644 --- a/src/Observable.ts +++ b/src/Observable.ts @@ -29,6 +29,7 @@ import {raceStatic} from './operator/race'; import {RangeObservable} from './observable/RangeObservable'; import {NeverObservable} from './observable/NeverObservable'; import {ErrorObservable} from './observable/ErrorObservable'; +import {UsingObservable} from './observable/UsingObservable'; import {AjaxCreationMethod} from './observable/dom/AjaxObservable'; import {WebSocketSubject} from './observable/dom/WebSocketSubject'; @@ -268,6 +269,7 @@ export class Observable implements CoreOperators { static range: typeof RangeObservable.create; static throw: typeof ErrorObservable.create; static timer: typeof TimerObservable.create; + static using: typeof UsingObservable.create; static webSocket: typeof WebSocketSubject.create; static zip: typeof zipStatic; diff --git a/src/Rx.KitchenSink.ts b/src/Rx.KitchenSink.ts index 5ed2620521..ac418b397f 100644 --- a/src/Rx.KitchenSink.ts +++ b/src/Rx.KitchenSink.ts @@ -56,6 +56,7 @@ import './add/observable/never'; import './add/observable/range'; import './add/observable/throw'; import './add/observable/timer'; +import './add/observable/using'; import './add/observable/zip'; // Operators diff --git a/src/add/observable/using.ts b/src/add/observable/using.ts new file mode 100644 index 0000000000..61e00e4dfd --- /dev/null +++ b/src/add/observable/using.ts @@ -0,0 +1,6 @@ +import {Observable} from '../../Observable'; +import {UsingObservable} from '../../observable/UsingObservable'; + +Observable.using = UsingObservable.create; + +export var _void: void; diff --git a/src/observable/UsingObservable.ts b/src/observable/UsingObservable.ts new file mode 100644 index 0000000000..a7b2ca941d --- /dev/null +++ b/src/observable/UsingObservable.ts @@ -0,0 +1,50 @@ +import {Observable} from '../Observable'; +import {Subscriber} from '../Subscriber'; +import {Subscription} from '../Subscription'; + +export class UsingObservable extends Observable { + + static create(resourceFactory: () => Subscription, + observableFactory: (resource: Subscription) => Observable): Observable { + return new UsingObservable(resourceFactory, observableFactory); + } + + constructor(private resourceFactory: () => Subscription, + private observableFactory: (resource: Subscription) => Observable) { + super(); + } + + protected _subscribe(subscriber: Subscriber): Subscription | Function | void { + + const { resourceFactory, observableFactory } = this; + + let resource: Subscription, + source: Observable, + error: any, errorHappened = false; + + try { + resource = resourceFactory(); + } catch (e) { + error = e; + errorHappened = true; + } + + if (errorHappened) { + subscriber.error(error); + } else { + subscriber.add(resource); + try { + source = observableFactory(resource); + } catch (e) { + error = e; + errorHappened = true; + } + + if (errorHappened) { + subscriber.error(error); + } else { + return source.subscribe(subscriber); + } + } + } +}