Skip to content

Commit

Permalink
feat(Observable): add static create method
Browse files Browse the repository at this point in the history
- adds more tests around Observable
- adds some code to assure isUnsubscribed is always boolean in Subscriber

closes #255
  • Loading branch information
benlesh committed Sep 3, 2015
1 parent 2028a61 commit e0d27ba
Show file tree
Hide file tree
Showing 3 changed files with 89 additions and 6 deletions.
78 changes: 77 additions & 1 deletion spec/observable-spec.js
Original file line number Diff line number Diff line change
Expand Up @@ -6,10 +6,86 @@ var Observable = Rx.Observable;
describe('Observable', function () {
it('should be constructed with a subscriber function', function (done) {
var source = new Observable(function (observer) {
expectFullObserver(observer);
observer.next(1);
observer.complete();
});

source.subscribe(function (x) { expect(x).toBe(1); }, null, done);
});
});

describe('subscribe', function () {
it('should be synchronous', function () {
var subscribed = false;
var nexted, completed;
var source = new Observable(function (observer) {
subscribed = true;
observer.next('wee');
expect(nexted).toBe('wee');
observer.complete();
expect(completed).toBe(true);
});
expect(subscribed).toBe(false);

var mutatedByNext = false;
var mutatedByComplete = false;

source.subscribe(function (x) {
nexted = x;
mutatedByNext = true;
}, null, function () {
completed = true;
mutatedByComplete = true;
});

expect(mutatedByNext).toBe(true);
expect(mutatedByComplete).toBe(true);
});

it('should return a Subscription that calls the unsubscribe function returned by the subscriber', function () {
var unsubscribeCalled = false;

var source = new Observable(function () {
return function () {
unsubscribeCalled = true;
};
});

var sub = source.subscribe(function () { });
expect(sub instanceof Rx.Subscription).toBe(true);
expect(unsubscribeCalled).toBe(false);
expect(typeof sub.unsubscribe).toBe('function');

sub.unsubscribe();
expect(unsubscribeCalled).toBe(true);
});
});
});

describe('Observable.create', function () {
it('should create an Observable', function () {
var result = Observable.create(function () { });
expect(result instanceof Observable).toBe(true);
});

it('should provide an observer to the function', function () {
var called = false;
var result = Observable.create(function (observer) {
called = true;
expectFullObserver(observer);
observer.complete();
});

expect(called).toBe(false);
result.subscribe(function () { });
expect(called).toBe(true);
});
});

function expectFullObserver(val) {
expect(typeof val).toBe('object');
expect(typeof val.next).toBe('function');
expect(typeof val.error).toBe('function');
expect(typeof val.complete).toBe('function');
expect(typeof val.isUnsubscribed).toBe('boolean');
}
13 changes: 10 additions & 3 deletions src/Observable.ts
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,8 @@ import GroupSubject from './subjects/GroupSubject';

import $$observer from './util/Symbol_observer';

export default class Observable<T> {

export default class Observable<T> {

source: Observable<any>;
operator: Operator<any, T>;
Expand All @@ -20,7 +21,13 @@ export default class Observable<T> {
this._subscribe = subscribe;
}
}


// HACK: Since TypeScript inherits static properties too, we have to
// fight against TypeScript here so Subject can have a different static create signature.
static create: Function = <T>(subscribe?: <R>(subscriber: Subscriber<R>) => Subscription<T> | Function | void) => {
return new Observable<T>(subscribe);
};

lift<T, R>(operator: Operator<T, R>): Observable<T> {
const observable = new Observable();
observable.source = this;
Expand Down Expand Up @@ -174,4 +181,4 @@ export default class Observable<T> {
finally: (ensure: () => void, thisArg?: any) => Observable<T>;
timeout: <T>(due: number|Date, errorToSend?: any, scheduler?: Scheduler) => Observable<T>;
timeoutWith: <T>(due: number|Date, withObservable: Observable<any>, scheduler?: Scheduler) => Observable<T>;
}
}
4 changes: 2 additions & 2 deletions src/Subscriber.ts
Original file line number Diff line number Diff line change
Expand Up @@ -64,9 +64,9 @@ export default class Subscriber<T> extends Subscription<T> implements Observer<T
const subscription = this._subscription;
if (subscription) {
// route to the shared Subscription if it exists
subscription.isUnsubscribed = value;
subscription.isUnsubscribed = Boolean(value);
} else {
this._isUnsubscribed = value;
this._isUnsubscribed = Boolean(value);
}
}

Expand Down

0 comments on commit e0d27ba

Please sign in to comment.