Skip to content

Commit

Permalink
feat(mergeAll): now supports promises, iterables and lowercase-o obse…
Browse files Browse the repository at this point in the history
…rvables
  • Loading branch information
benlesh committed Sep 23, 2015
1 parent c5239e9 commit 4c16aa6
Showing 1 changed file with 5 additions and 12 deletions.
17 changes: 5 additions & 12 deletions src/operators/mergeAll-support.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,8 @@ import Operator from '../Operator';
import Subscriber from '../Subscriber';
import Observer from '../Observer';
import Subscription from '../Subscription';
import OuterSubscriber from '../OuterSubscriber';
import subscribeToResult from '../util/subscribeToResult';

export class MergeAllOperator<T, R> implements Operator<T, R> {
constructor(private concurrent: number) {
Expand All @@ -14,10 +16,11 @@ export class MergeAllOperator<T, R> implements Operator<T, R> {
}
}

export class MergeAllSubscriber<T> extends Subscriber<T> {
export class MergeAllSubscriber<T, R> extends OuterSubscriber<T, R> {
private hasCompleted: boolean = false;
private buffer: Observable<any>[] = [];
private active: number = 0;

constructor(destination: Observer<T>, private concurrent:number) {
super(destination);
}
Expand All @@ -28,7 +31,7 @@ export class MergeAllSubscriber<T> extends Subscriber<T> {
this.destination.next(observable.value);
} else {
this.active++;
this.add(observable.subscribe(new MergeAllInnerSubscriber(this.destination, this)))
this.add(subscribeToResult<T, R>(this, observable));
}
} else {
this.buffer.push(observable);
Expand All @@ -52,14 +55,4 @@ export class MergeAllSubscriber<T> extends Subscriber<T> {
this.destination.complete();
}
}
}

export class MergeAllInnerSubscriber<T> extends Subscriber<T> {
constructor(destination: Observer<T>, private parent: MergeAllSubscriber<T>) {
super(destination);
}

_complete() {
this.parent.notifyComplete(this);
}
}

0 comments on commit 4c16aa6

Please sign in to comment.