Skip to content

Commit

Permalink
fix(merge/concat): passed scalar observables will now complete properly
Browse files Browse the repository at this point in the history
- Removed a superfluous check for `_isScalar` that was doing the wrong thing and not completing the scalar observable

fixes #1150
  • Loading branch information
benlesh committed Feb 8, 2016
1 parent 0976d7a commit c01b92f
Show file tree
Hide file tree
Showing 2 changed files with 27 additions and 6 deletions.
25 changes: 25 additions & 0 deletions spec/operators/concat-spec.js
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,31 @@ describe('Observable.prototype.concat()', function () {
expectObservable(e1.concat(e2, rxTestScheduler)).toBe(expected);
});

it('should work properly with scalar observables', function (done) {
var results = [];

var s1 = Observable
.create(function (observer) {
setTimeout(function () {
observer.next(1);
observer.complete();
});
})
.concat(Observable.of(2));

s1.subscribe(
function (x) {
results.push('Next: ' + x);
},
done.fail,
function (x) {
results.push('Completed');
expect(results).toEqual(['Next: 1', 'Next: 2', 'Completed']);
done();
}
);
});

it('should complete without emit if both sources are empty', function () {
var e1 = cold('--|');
var e1subs = '^ !';
Expand Down
8 changes: 2 additions & 6 deletions src/operator/mergeAll.ts
Original file line number Diff line number Diff line change
Expand Up @@ -29,12 +29,8 @@ export class MergeAllSubscriber<T> extends OuterSubscriber<Observable<T>, T> {

protected _next(observable: Observable<T>) {
if (this.active < this.concurrent) {
if (observable._isScalar) {
this.destination.next((<any>observable).value);
} else {
this.active++;
this.add(subscribeToResult<Observable<T>, T>(this, observable));
}
this.active++;
this.add(subscribeToResult<Observable<T>, T>(this, observable));
} else {
this.buffer.push(observable);
}
Expand Down

0 comments on commit c01b92f

Please sign in to comment.