Skip to content

Commit

Permalink
refactor(Subscriber): make Observer an interface, inherit from Subscr…
Browse files Browse the repository at this point in the history
…iption

Changed Observer to an interface throughout the app since we were not using it directly as a class anywhere
Fixed macro perf tests that were calling  rather than  for some reason

closes #224
  • Loading branch information
benlesh committed Aug 28, 2015
1 parent 249ab8d commit 623edf7
Show file tree
Hide file tree
Showing 16 changed files with 100 additions and 114 deletions.
2 changes: 1 addition & 1 deletion perf/macro/flatMap-scalar/perf.js
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ var RxNextTestObservable = new RxNext.Observable(function(observer) {
while(++index < numIterations) {
observer.next(index);
}
observer.completed();
observer.complete();
});

var Rx2TestObservable = Rx.Observable.create(function(observer) {
Expand Down
2 changes: 1 addition & 1 deletion perf/macro/flatMap/perf.js
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ var RxNextTestObservable = new RxNext.Observable(function(observer) {
while(++index < numIterations) {
observer.next(index);
}
observer.completed();
observer.complete();
});

var Rx2TestObservable = Rx.Observable.create(function(observer) {
Expand Down
44 changes: 7 additions & 37 deletions src/Observer.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,41 +2,11 @@ import noop from './util/noop';
import throwError from './util/throwError';
import tryOrOnError from './util/tryOrOnError';

export default class Observer<T> {

destination: Observer<any>;

constructor(destination?: Observer<any>) {
if (!destination) {
return;
}
(typeof destination.next === "function") || (destination.next = noop);
(typeof destination.error === "function") || (destination.error = throwError);
(typeof destination.complete === "function") || (destination.complete = noop);
this.destination = destination;
}

next(value?): void {
this._next(value);
}

error(error?): void {
this._error(error);
}

complete(): void {
this._complete();
}

_next(value?) {
this.destination.next(value);
}

_error(error?) {
this.destination.error(error);
}

_complete() {
this.destination.complete();
}
interface Observer<T> {
next(value: T): void;
error(err?: any): void;
complete(): void;
isUnsubscribed: boolean;
}

export default Observer;
25 changes: 9 additions & 16 deletions src/Subject.ts
Original file line number Diff line number Diff line change
Expand Up @@ -18,9 +18,11 @@ const _subscriberComplete = Subscriber.prototype._complete;
const _observableSubscribe = Observable.prototype._subscribe;

export default class Subject<T> extends Observable<T> implements Observer<T>, Subscription<T> {

_subscriptions: Subscription<T>[];
_unsubscribe: () => void;

static create<T>(source: Observable<T>, destination: Observer<T>): Subject<T> {
return new BidiSubject(source, destination);
return new BidirectionalSubject(source, destination);
}

destination: Observer<T>;
Expand All @@ -34,12 +36,12 @@ export default class Subject<T> extends Observable<T> implements Observer<T>, Su
completeSignal: boolean = false;

lift<T, R>(operator: Operator<T, R>): Observable<T> {
const subject = new BidiSubject(this, this.destination || this);
const subject = new BidirectionalSubject(this, this.destination || this);
subject.operator = operator;
return subject;
}

_subscribe(subscriber) {
_subscribe(subscriber: Observer<any>) : Subscription<T> {

if (subscriber.isUnsubscribed) {
return;
Expand Down Expand Up @@ -165,23 +167,14 @@ export default class Subject<T> extends Observable<T> implements Observer<T>, Su
}
}

export class SubjectSubscription<T> implements Subscription<T> {

export class SubjectSubscription<T> extends Subscription<T> {
isUnsubscribed: boolean = false;

constructor(public subject: Subject<T>, public observer: Observer<any>) {
}

add(x?) {
subscriptionAdd.call(this, x);
}

remove(x?) {
subscriptionRemove.call(this, x);
super();
}

unsubscribe() {

if (this.isUnsubscribed) {
return;
}
Expand All @@ -205,7 +198,7 @@ export class SubjectSubscription<T> implements Subscription<T> {
}
}

class BidiSubject<T> extends Subject<T> {
class BidirectionalSubject<T> extends Subject<T> {

constructor(source: Observable<any>, destination: Observer<any>) {
super();
Expand Down
48 changes: 35 additions & 13 deletions src/Subscriber.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,12 +5,14 @@ import tryOrOnError from './util/tryOrOnError';
import Observer from './Observer';
import Subscription from './Subscription';

const subscriptionAdd = Subscription.prototype.add;
const subscriptionRemove = Subscription.prototype.remove;
const subscriptionUnsubscribe = Subscription.prototype.unsubscribe;

export default class Subscriber<T> extends Observer<T> implements Subscription<T> {
export default class Subscriber<T> extends Subscription<T> implements Observer<T> {
protected destination: Observer<any>;

private _subscription: Subscription<T>;

private _isUnsubscribed: boolean = false;

static create<T>(next ?: (x?) => void,
error ?: (e?) => void,
complete?: () => void): Subscriber<T> {
Expand All @@ -20,12 +22,23 @@ export default class Subscriber<T> extends Observer<T> implements Subscription<T
subscriber._complete = (typeof complete === "function") && complete || noop;
return subscriber;
}

_subscription: Subscription<T>;
_isUnsubscribed: boolean = false;

_next(value: T) {
this.destination.next(value);
}

_error(err: any) {
this.destination.error(err);
}

_complete() {
this.destination.complete();
}

constructor(destination?: Observer<any>) {
super(destination);
super();
this.destination = destination;

if (!destination) {
return;
}
Expand Down Expand Up @@ -57,14 +70,23 @@ export default class Subscriber<T> extends Observer<T> implements Subscription<T
}
}

add(sub?) {
add(sub: Subscription<T>|Function|void) {
// route add to the shared Subscription if it exists
subscriptionAdd.call(this._subscription || this, sub);
const _subscription = this._subscription;
if (_subscription) {
_subscription.add(sub);
} else {
super.add(sub);
}
}

remove(sub?) {
remove(sub: Subscription<T>) {
// route remove to the shared Subscription if it exists
subscriptionRemove.call(this._subscription || this, sub);
if (this._subscription) {
this._subscription.remove(sub);
} else {
super.remove(sub);
}
}

unsubscribe() {
Expand All @@ -73,7 +95,7 @@ export default class Subscriber<T> extends Observer<T> implements Subscription<T
} else if(this._subscription) {
this._isUnsubscribed = true;
} else {
subscriptionUnsubscribe.call(this);
super.unsubscribe();
}
}

Expand Down
50 changes: 25 additions & 25 deletions src/Subscription.ts
Original file line number Diff line number Diff line change
@@ -1,16 +1,20 @@
export default class Subscription<T> {

public static empty: Subscription<void> = ((empty) => {
export default class Subscription<T> {
public static EMPTY: Subscription<void> = (function(empty){
empty.isUnsubscribed = true;
return empty;
})(new Subscription<void>());
}(new Subscription<void>()));

isUnsubscribed: boolean = false;

_subscriptions: Subscription<any>[];

_unsubscribe(): void {
}

constructor(_unsubscribe?: () => void) {
// hide `_unsubscribe` from TypeScript so we can implement Subscription
if(_unsubscribe) {
(<any> this)._unsubscribe = _unsubscribe;
if (_unsubscribe) {
this._unsubscribe = _unsubscribe;
}
}

Expand All @@ -22,13 +26,12 @@ export default class Subscription<T> {

this.isUnsubscribed = true;

const self = (<any> this);
const unsubscribe = self._unsubscribe;
const subscriptions = self._subscriptions;

self._subscriptions = void 0;
const unsubscribe = this._unsubscribe;
const subscriptions = this._subscriptions;

if (unsubscribe != null) {
this._subscriptions = void 0;

if (unsubscribe) {
unsubscribe.call(this);
}

Expand All @@ -42,22 +45,20 @@ export default class Subscription<T> {
}
}

add(subscription?: Subscription<T> | Function | void): void {

add(subscription: Subscription<T>|Function|void): void {
// return early if:
// 1. the subscription is null
// 2. we're attempting to add ourself
// 2. we're attempting to add our this
// 3. we're attempting to add the static `empty` Subscription
if (subscription == null || (
if (!subscription || (
subscription === this) || (
subscription === Subscription.empty)) {
subscription === Subscription.EMPTY)) {
return;
}

const self = (<any> this);
let sub = (<Subscription<T>> subscription);

switch(typeof sub) {
switch(typeof subscription) {
case "function":
sub = new Subscription<void>(<(() => void) > subscription);
case "object":
Expand All @@ -66,7 +67,7 @@ export default class Subscription<T> {
} else if (this.isUnsubscribed) {
sub.unsubscribe();
} else {
const subscriptions = self._subscriptions || (self._subscriptions = []);
const subscriptions = this._subscriptions || (this._subscriptions = []);
subscriptions.push(sub);
}
break;
Expand All @@ -75,20 +76,19 @@ export default class Subscription<T> {
}
}

remove(subscription?: Subscription<T>): void {
remove(subscription: Subscription<T>): void {

// return early if:
// 1. the subscription is null
// 2. we're attempting to remove ourself
// 2. we're attempting to remove ourthis
// 3. we're attempting to remove the static `empty` Subscription
if (subscription == null || (
subscription === this) || (
subscription === Subscription.empty)) {
subscription === Subscription.EMPTY)) {
return;
}

const self = (<any> this);
const subscriptions = self._subscriptions;
const subscriptions = this._subscriptions;

if (subscriptions) {
const subscriptionIndex = subscriptions.indexOf(subscription);
Expand Down
2 changes: 1 addition & 1 deletion src/observables/ConnectableObservable.ts
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,7 @@ export class RefCountObservable<T> extends Observable<T> {

export class RefCountSubscription<T> extends Subscription<T> {

constructor(protected refCountObservable: RefCountObservable<T>) {
constructor(private refCountObservable: RefCountObservable<T>) {
super();
}

Expand Down
6 changes: 3 additions & 3 deletions src/operators/combineLatest.ts
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ export class CombineLatestSubscriber<T, R> extends ZipSubscriber<T, R> {
}

_subscribeInner(observable, values, index, total) {
return observable.subscribe(new CombineLatestInnerSubscriber(this, values, index, total));
return observable.subscribe(new CombineLatestInnerSubscriber(this.destination, this, values, index, total));
}

_innerComplete(innerSubscriber) {
Expand All @@ -61,8 +61,8 @@ export class CombineLatestSubscriber<T, R> extends ZipSubscriber<T, R> {

export class CombineLatestInnerSubscriber<T, R> extends ZipInnerSubscriber<T, R> {

constructor(parent: ZipSubscriber<T, R>, values: any, index : number, total : number) {
super(parent, values, index, total);
constructor(destination: Observer<T>, parent: ZipSubscriber<T, R>, values: any, index : number, total : number) {
super(destination, parent, values, index, total);
}

_next(x) {
Expand Down
6 changes: 3 additions & 3 deletions src/operators/expand.ts
Original file line number Diff line number Diff line change
Expand Up @@ -54,14 +54,14 @@ export class ExpandSubscriber<T, R> extends MergeSubscriber<T, R> {
} else if(observable instanceof EmptyObservable) {
this._innerComplete();
} else {
return observable.subscribe(new ExpandInnerSubscriber(this));
return observable.subscribe(new ExpandInnerSubscriber(this.destination, this));
}
}
}

export class ExpandInnerSubscriber<T, R> extends MergeInnerSubscriber<T, R> {
constructor(parent: ExpandSubscriber<T, R>) {
super(parent);
constructor(destination: Observer<T>, parent: ExpandSubscriber<T, R>) {
super(destination, parent);
}
_next(value) {
this.destination.next(value);
Expand Down
Loading

0 comments on commit 623edf7

Please sign in to comment.