From 623edf7f20f014d4976854c1f198e4ce41ac3b9f Mon Sep 17 00:00:00 2001 From: Ben Lesh Date: Thu, 27 Aug 2015 15:35:49 -0700 Subject: [PATCH] refactor(Subscriber): make Observer an interface, inherit from Subscription Changed Observer to an interface throughout the app since we were not using it directly as a class anywhere Fixed macro perf tests that were calling rather than for some reason closes #224 --- perf/macro/flatMap-scalar/perf.js | 2 +- perf/macro/flatMap/perf.js | 2 +- src/Observer.ts | 44 ++++----------------- src/Subject.ts | 25 +++++------- src/Subscriber.ts | 48 +++++++++++++++++------ src/Subscription.ts | 50 ++++++++++++------------ src/observables/ConnectableObservable.ts | 2 +- src/operators/combineLatest.ts | 6 +-- src/operators/expand.ts | 6 +-- src/operators/flatMap.ts | 9 +++-- src/operators/merge.ts | 6 +-- src/operators/skip.ts | 2 +- src/operators/skipUntil.ts | 2 +- src/operators/take.ts | 2 +- src/operators/withLatestFrom.ts | 2 +- src/operators/zip.ts | 6 +-- 16 files changed, 100 insertions(+), 114 deletions(-) diff --git a/perf/macro/flatMap-scalar/perf.js b/perf/macro/flatMap-scalar/perf.js index 0a7dd644fc..cec675d1fc 100644 --- a/perf/macro/flatMap-scalar/perf.js +++ b/perf/macro/flatMap-scalar/perf.js @@ -18,7 +18,7 @@ var RxNextTestObservable = new RxNext.Observable(function(observer) { while(++index < numIterations) { observer.next(index); } - observer.completed(); + observer.complete(); }); var Rx2TestObservable = Rx.Observable.create(function(observer) { diff --git a/perf/macro/flatMap/perf.js b/perf/macro/flatMap/perf.js index ce79767bb6..1c3194d5e6 100644 --- a/perf/macro/flatMap/perf.js +++ b/perf/macro/flatMap/perf.js @@ -15,7 +15,7 @@ var RxNextTestObservable = new RxNext.Observable(function(observer) { while(++index < numIterations) { observer.next(index); } - observer.completed(); + observer.complete(); }); var Rx2TestObservable = Rx.Observable.create(function(observer) { diff --git a/src/Observer.ts b/src/Observer.ts index c4159d26b2..9514ed98cb 100644 --- a/src/Observer.ts +++ b/src/Observer.ts @@ -2,41 +2,11 @@ import noop from './util/noop'; import throwError from './util/throwError'; import tryOrOnError from './util/tryOrOnError'; -export default class Observer { - - destination: Observer; - - constructor(destination?: Observer) { - if (!destination) { - return; - } - (typeof destination.next === "function") || (destination.next = noop); - (typeof destination.error === "function") || (destination.error = throwError); - (typeof destination.complete === "function") || (destination.complete = noop); - this.destination = destination; - } - - next(value?): void { - this._next(value); - } - - error(error?): void { - this._error(error); - } - - complete(): void { - this._complete(); - } - - _next(value?) { - this.destination.next(value); - } - - _error(error?) { - this.destination.error(error); - } - - _complete() { - this.destination.complete(); - } +interface Observer { + next(value: T): void; + error(err?: any): void; + complete(): void; + isUnsubscribed: boolean; } + +export default Observer; \ No newline at end of file diff --git a/src/Subject.ts b/src/Subject.ts index 92327b2c7a..559a2cdd81 100644 --- a/src/Subject.ts +++ b/src/Subject.ts @@ -18,9 +18,11 @@ const _subscriberComplete = Subscriber.prototype._complete; const _observableSubscribe = Observable.prototype._subscribe; export default class Subject extends Observable implements Observer, Subscription { - + _subscriptions: Subscription[]; + _unsubscribe: () => void; + static create(source: Observable, destination: Observer): Subject { - return new BidiSubject(source, destination); + return new BidirectionalSubject(source, destination); } destination: Observer; @@ -34,12 +36,12 @@ export default class Subject extends Observable implements Observer, Su completeSignal: boolean = false; lift(operator: Operator): Observable { - const subject = new BidiSubject(this, this.destination || this); + const subject = new BidirectionalSubject(this, this.destination || this); subject.operator = operator; return subject; } - _subscribe(subscriber) { + _subscribe(subscriber: Observer) : Subscription { if (subscriber.isUnsubscribed) { return; @@ -165,23 +167,14 @@ export default class Subject extends Observable implements Observer, Su } } -export class SubjectSubscription implements Subscription { - +export class SubjectSubscription extends Subscription { isUnsubscribed: boolean = false; constructor(public subject: Subject, public observer: Observer) { - } - - add(x?) { - subscriptionAdd.call(this, x); - } - - remove(x?) { - subscriptionRemove.call(this, x); + super(); } unsubscribe() { - if (this.isUnsubscribed) { return; } @@ -205,7 +198,7 @@ export class SubjectSubscription implements Subscription { } } -class BidiSubject extends Subject { +class BidirectionalSubject extends Subject { constructor(source: Observable, destination: Observer) { super(); diff --git a/src/Subscriber.ts b/src/Subscriber.ts index 0fc415dfb8..62bd43ef23 100644 --- a/src/Subscriber.ts +++ b/src/Subscriber.ts @@ -5,12 +5,14 @@ import tryOrOnError from './util/tryOrOnError'; import Observer from './Observer'; import Subscription from './Subscription'; -const subscriptionAdd = Subscription.prototype.add; -const subscriptionRemove = Subscription.prototype.remove; -const subscriptionUnsubscribe = Subscription.prototype.unsubscribe; -export default class Subscriber extends Observer implements Subscription { +export default class Subscriber extends Subscription implements Observer { + protected destination: Observer; + private _subscription: Subscription; + + private _isUnsubscribed: boolean = false; + static create(next ?: (x?) => void, error ?: (e?) => void, complete?: () => void): Subscriber { @@ -20,12 +22,23 @@ export default class Subscriber extends Observer implements Subscription; - _isUnsubscribed: boolean = false; + + _next(value: T) { + this.destination.next(value); + } + + _error(err: any) { + this.destination.error(err); + } + + _complete() { + this.destination.complete(); + } constructor(destination?: Observer) { - super(destination); + super(); + this.destination = destination; + if (!destination) { return; } @@ -57,14 +70,23 @@ export default class Subscriber extends Observer implements Subscription|Function|void) { // route add to the shared Subscription if it exists - subscriptionAdd.call(this._subscription || this, sub); + const _subscription = this._subscription; + if (_subscription) { + _subscription.add(sub); + } else { + super.add(sub); + } } - remove(sub?) { + remove(sub: Subscription) { // route remove to the shared Subscription if it exists - subscriptionRemove.call(this._subscription || this, sub); + if (this._subscription) { + this._subscription.remove(sub); + } else { + super.remove(sub); + } } unsubscribe() { @@ -73,7 +95,7 @@ export default class Subscriber extends Observer implements Subscription { - public static empty: Subscription = ((empty) => { +export default class Subscription { + public static EMPTY: Subscription = (function(empty){ empty.isUnsubscribed = true; return empty; - })(new Subscription()); + }(new Subscription())); isUnsubscribed: boolean = false; + _subscriptions: Subscription[]; + + _unsubscribe(): void { + } + constructor(_unsubscribe?: () => void) { - // hide `_unsubscribe` from TypeScript so we can implement Subscription - if(_unsubscribe) { - ( this)._unsubscribe = _unsubscribe; + if (_unsubscribe) { + this._unsubscribe = _unsubscribe; } } @@ -22,13 +26,12 @@ export default class Subscription { this.isUnsubscribed = true; - const self = ( this); - const unsubscribe = self._unsubscribe; - const subscriptions = self._subscriptions; - - self._subscriptions = void 0; + const unsubscribe = this._unsubscribe; + const subscriptions = this._subscriptions; - if (unsubscribe != null) { + this._subscriptions = void 0; + + if (unsubscribe) { unsubscribe.call(this); } @@ -42,22 +45,20 @@ export default class Subscription { } } - add(subscription?: Subscription | Function | void): void { - + add(subscription: Subscription|Function|void): void { // return early if: // 1. the subscription is null - // 2. we're attempting to add ourself + // 2. we're attempting to add our this // 3. we're attempting to add the static `empty` Subscription - if (subscription == null || ( + if (!subscription || ( subscription === this) || ( - subscription === Subscription.empty)) { + subscription === Subscription.EMPTY)) { return; } - const self = ( this); let sub = (> subscription); - switch(typeof sub) { + switch(typeof subscription) { case "function": sub = new Subscription(<(() => void) > subscription); case "object": @@ -66,7 +67,7 @@ export default class Subscription { } else if (this.isUnsubscribed) { sub.unsubscribe(); } else { - const subscriptions = self._subscriptions || (self._subscriptions = []); + const subscriptions = this._subscriptions || (this._subscriptions = []); subscriptions.push(sub); } break; @@ -75,20 +76,19 @@ export default class Subscription { } } - remove(subscription?: Subscription): void { + remove(subscription: Subscription): void { // return early if: // 1. the subscription is null - // 2. we're attempting to remove ourself + // 2. we're attempting to remove ourthis // 3. we're attempting to remove the static `empty` Subscription if (subscription == null || ( subscription === this) || ( - subscription === Subscription.empty)) { + subscription === Subscription.EMPTY)) { return; } - const self = ( this); - const subscriptions = self._subscriptions; + const subscriptions = this._subscriptions; if (subscriptions) { const subscriptionIndex = subscriptions.indexOf(subscription); diff --git a/src/observables/ConnectableObservable.ts b/src/observables/ConnectableObservable.ts index 2270c8e33d..5272d5981a 100644 --- a/src/observables/ConnectableObservable.ts +++ b/src/observables/ConnectableObservable.ts @@ -74,7 +74,7 @@ export class RefCountObservable extends Observable { export class RefCountSubscription extends Subscription { - constructor(protected refCountObservable: RefCountObservable) { + constructor(private refCountObservable: RefCountObservable) { super(); } diff --git a/src/operators/combineLatest.ts b/src/operators/combineLatest.ts index 3030d25dbb..35f1882053 100644 --- a/src/operators/combineLatest.ts +++ b/src/operators/combineLatest.ts @@ -49,7 +49,7 @@ export class CombineLatestSubscriber extends ZipSubscriber { } _subscribeInner(observable, values, index, total) { - return observable.subscribe(new CombineLatestInnerSubscriber(this, values, index, total)); + return observable.subscribe(new CombineLatestInnerSubscriber(this.destination, this, values, index, total)); } _innerComplete(innerSubscriber) { @@ -61,8 +61,8 @@ export class CombineLatestSubscriber extends ZipSubscriber { export class CombineLatestInnerSubscriber extends ZipInnerSubscriber { - constructor(parent: ZipSubscriber, values: any, index : number, total : number) { - super(parent, values, index, total); + constructor(destination: Observer, parent: ZipSubscriber, values: any, index : number, total : number) { + super(destination, parent, values, index, total); } _next(x) { diff --git a/src/operators/expand.ts b/src/operators/expand.ts index 0fc4074725..fbc2c8ee3b 100644 --- a/src/operators/expand.ts +++ b/src/operators/expand.ts @@ -54,14 +54,14 @@ export class ExpandSubscriber extends MergeSubscriber { } else if(observable instanceof EmptyObservable) { this._innerComplete(); } else { - return observable.subscribe(new ExpandInnerSubscriber(this)); + return observable.subscribe(new ExpandInnerSubscriber(this.destination, this)); } } } export class ExpandInnerSubscriber extends MergeInnerSubscriber { - constructor(parent: ExpandSubscriber) { - super(parent); + constructor(destination: Observer, parent: ExpandSubscriber) { + super(destination, parent); } _next(value) { this.destination.next(value); diff --git a/src/operators/flatMap.ts b/src/operators/flatMap.ts index 81e204eb2f..c66b455258 100644 --- a/src/operators/flatMap.ts +++ b/src/operators/flatMap.ts @@ -60,12 +60,12 @@ export class FlatMapSubscriber extends MergeSubscriber { _subscribeInner(observable, value, index) { const projectResult = this.projectResult; if(projectResult) { - return observable.subscribe(new FlatMapInnerSubscriber(this, value, index, projectResult)); + return observable.subscribe(new FlatMapInnerSubscriber(this.destination, this, value, index, projectResult)); } else if(observable instanceof ScalarObservable) { this.destination.next((> observable).value); this._innerComplete(); } else { - return observable.subscribe(new MergeInnerSubscriber(this)); + return observable.subscribe(new MergeInnerSubscriber(this.destination, this)); } } } @@ -77,11 +77,12 @@ export class FlatMapInnerSubscriber extends MergeInnerSubscriber { project: (x: T, y: any, ix: number, iy: number) => R; count: number = 0; - constructor(parent: FlatMapSubscriber, + constructor(destination: Observer, + parent: FlatMapSubscriber, value: any, index: number, project?: (x: T, y: any, ix: number, iy: number) => R) { - super(parent); + super(destination, parent); this.value = value; this.index = index; this.project = project; diff --git a/src/operators/merge.ts b/src/operators/merge.ts index a49c2bb301..dee2023427 100644 --- a/src/operators/merge.ts +++ b/src/operators/merge.ts @@ -102,7 +102,7 @@ export class MergeSubscriber extends Subscriber { this.destination.next((> observable).value); this._innerComplete(); } else { - return observable.subscribe(new MergeInnerSubscriber(this)); + return observable.subscribe(new MergeInnerSubscriber(this.destination, this)); } } @@ -125,8 +125,8 @@ export class MergeInnerSubscriber extends Subscriber { parent: MergeSubscriber; - constructor(parent: MergeSubscriber) { - super(parent.destination); + constructor(destination: Observer, parent: MergeSubscriber) { + super(destination); this.parent = parent; } diff --git a/src/operators/skip.ts b/src/operators/skip.ts index a7ee41b6de..3193c7504f 100644 --- a/src/operators/skip.ts +++ b/src/operators/skip.ts @@ -15,7 +15,7 @@ export class SkipOperator implements Operator { } call(observer: Observer): Observer { - return new SkipSubscriber(observer, this.total); + return new SkipSubscriber(observer, this.total); } } diff --git a/src/operators/skipUntil.ts b/src/operators/skipUntil.ts index 48a5bd05d1..d110e82fac 100644 --- a/src/operators/skipUntil.ts +++ b/src/operators/skipUntil.ts @@ -12,7 +12,7 @@ export class SkipUntilOperator implements Operator { } call(observer: Observer): Observer { - return new SkipUntilSubscriber(observer, this.notifier); + return new SkipUntilSubscriber(observer, this.notifier); } } diff --git a/src/operators/take.ts b/src/operators/take.ts index 38bf60eb56..e60cc01535 100644 --- a/src/operators/take.ts +++ b/src/operators/take.ts @@ -15,7 +15,7 @@ export class TakeOperator implements Operator { } call(observer: Observer): Observer { - return new TakeSubscriber(observer, this.total); + return new TakeSubscriber(observer, this.total); } } diff --git a/src/operators/withLatestFrom.ts b/src/operators/withLatestFrom.ts index a6d150e481..d28f7194e6 100644 --- a/src/operators/withLatestFrom.ts +++ b/src/operators/withLatestFrom.ts @@ -16,7 +16,7 @@ export class WithLatestFromOperator implements Operator { constructor(private observables: Observable[], private project: (...values: any[]) => Observable) { } - call(observer: Observer): Observer { + call(observer: Observer): Observer { return new WithLatestFromSubscriber(observer, this.observables, this.project); } } diff --git a/src/operators/zip.ts b/src/operators/zip.ts index f76796b375..f72f1ded31 100644 --- a/src/operators/zip.ts +++ b/src/operators/zip.ts @@ -70,7 +70,7 @@ export class ZipSubscriber extends Subscriber { } _subscribeInner(observable, values, index, total) { - return observable.subscribe(new ZipInnerSubscriber(this, values, index, total)); + return observable.subscribe(new ZipInnerSubscriber(this.destination, this, values, index, total)); } _innerComplete(innerSubscriber) { @@ -98,8 +98,8 @@ export class ZipInnerSubscriber extends Subscriber { total: number; events: number = 0; - constructor(parent: ZipSubscriber, values: any, index : number, total : number) { - super(parent.destination); + constructor(destination: Observer, parent: ZipSubscriber, values: any, index : number, total : number) { + super(destination); this.parent = parent; this.values = values; this.index = index;