Skip to content

Commit

Permalink
feat(operator): add single operator
Browse files Browse the repository at this point in the history
  • Loading branch information
kwonoj authored and benlesh committed Sep 18, 2015
1 parent 1836d9e commit 49484a2
Show file tree
Hide file tree
Showing 6 changed files with 228 additions and 1 deletion.
26 changes: 26 additions & 0 deletions perf/micro/immediate-scheduler/operators/single-predicate-this.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
var RxOld = require("rx");
var RxNew = require("../../../../index");

module.exports = function (suite) {

var predicate = function(value, i) {
return value === 20;
};

var testThis = {};

var oldSinglePredicateThisArg = RxOld.Observable.range(0, 50, RxOld.Scheduler.immediate).single(predicate, testThis);
var newSinglePredicateThisArg = RxNew.Observable.range(0, 50).single(predicate, testThis);

return suite
.add('old single(predicate, thisArg) with immediate scheduler', function () {
oldSinglePredicateThisArg.subscribe(_next, _error, _complete);
})
.add('new single(predicate, thisArg) with immediate scheduler', function () {
newSinglePredicateThisArg.subscribe(_next, _error, _complete);
});

function _next(x) { }
function _error(e){ }
function _complete(){ }
};
24 changes: 24 additions & 0 deletions perf/micro/immediate-scheduler/operators/single-predicate.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
var RxOld = require("rx");
var RxNew = require("../../../../index");

module.exports = function (suite) {

var predicate = function(value, i) {
return value === 20;
};

var oldSinglePredicate = RxOld.Observable.range(0, 50, RxOld.Scheduler.immediate).single(predicate);
var newSinglePredicate = RxNew.Observable.range(0, 50).single(predicate);

return suite
.add('old single() with immediate scheduler', function () {
oldSinglePredicate.subscribe(_next, _error, _complete);
})
.add('new single() with immediate scheduler', function () {
newSinglePredicate.subscribe(_next, _error, _complete);
});

function _next(x) { }
function _error(e){ }
function _complete(){ }
};
101 changes: 101 additions & 0 deletions spec/operators/single-spec.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,101 @@
/* globals describe, it, expect, expectObservable, hot, cold */
var Rx = require('../../dist/cjs/Rx');

describe('Observable.prototype.single()', function() {
it('Should raise error from empty predicate if observable does not emit', function() {
var e1 = hot('--a--^--|');
var expected = '---#';

expectObservable(e1.single()).toBe(expected, null, new Rx.EmptyError);
});

it('Should return only element from empty predicate if observable emits only once', function() {
var e1 = hot('--a--|');
var expected = '-----(a|)';

expectObservable(e1.single()).toBe(expected);
});

it('Should raise error from empty predicate if observable emits multiple time', function() {
var e1 = hot('--a--b--c--|');
var expected = '-----#';

expectObservable(e1.single()).toBe(expected, null, 'Sequence contains more than one element');
});

it('Should raise error from empty predicate if observable emits error', function() {
var e1 = hot('--a--b^--#');
var expected = '---#';

expectObservable(e1.single()).toBe(expected);
});

it('Should raise error from predicate if observable emits error', function() {
var e1 = hot('--a--b^--#');
var expected = '---#';

var predicate = function (value) {
return value === 'c';
}

expectObservable(e1.single(predicate)).toBe(expected);
});

it('Should raise error if predicate throws error', function() {
var e1 = hot('--a--b--c--d--|');
var expected = '-----------#';

var predicate = function (value) {
if (value !== 'd') {
return false;
}
throw 'error';
}

expectObservable(e1.single(predicate)).toBe(expected);
});

it('Should return element from predicate if observable have single matching element', function() {
var e1 = hot('--a--b--c--|');
var expected = '-----------(b|)';

var predicate = function (value) {
return value === 'b';
}

expectObservable(e1.single(predicate)).toBe(expected);
});

it('Should raise error from predicate if observable have multiple matching element', function() {
var e1 = hot('--a--b--a--b--b--|');
var expected = '-----------#';

var predicate = function (value) {
return value === 'b';
}

expectObservable(e1.single(predicate)).toBe(expected, null, 'Sequence contains more than one element');
});

it('Should raise error from predicate if observable does not emit', function() {
var e1 = hot('--a--^--|');
var expected = '---#';

var predicate = function (value) {
return value === 'a';
}

expectObservable(e1.single(predicate)).toBe(expected, null, new Rx.EmptyError);
});

it('Should return undefined from predicate if observable does not contain matching element', function() {
var e1 = hot('--a--b--c--|');
var expected = '-----------(z|)';

var predicate = function (value) {
return value === 'x';
}

expectObservable(e1.single(predicate)).toBe(expected, {z: undefined});
});
});
1 change: 1 addition & 0 deletions src/Observable.ts
Original file line number Diff line number Diff line change
Expand Up @@ -204,6 +204,7 @@ export default class Observable<T> {

elementAt: (index: number, defaultValue?: any) => Observable<T>;
last: (predicate?: (value: T, index:number) => boolean, thisArg?: any, defaultValue?: any) => Observable<T>;
single: (predicate?: (value: T, index:number) => boolean, thisArg?: any) => Observable<T>;

filter: (predicate: (x: T) => boolean, ix?: number, thisArg?: any) => Observable<T>;
distinctUntilChanged: (compare?: (x: T, y: T) => boolean, thisArg?: any) => Observable<T>;
Expand Down
3 changes: 2 additions & 1 deletion src/Rx.ts
Original file line number Diff line number Diff line change
Expand Up @@ -222,8 +222,9 @@ observableProto.sample = sample;
observableProto.sampleTime = sampleTime;

import last from './operators/last';

import single from './operators/single';
observableProto.last = last;
observableProto.single = single

var Scheduler = {
nextTick,
Expand Down
74 changes: 74 additions & 0 deletions src/operators/single.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,74 @@
import Observable from '../Observable';
import Operator from '../Operator';
import Subscriber from '../Subscriber';
import Observer from '../Observer';

import tryCatch from '../util/tryCatch';
import {errorObject} from '../util/errorObject';
import bindCallback from '../util/bindCallback';
import EmptyError from '../util/EmptyError';

export default function single<T>(predicate?: (value: T, index: number, source: Observable<T>) => boolean, thisArg?: any) : Observable<T> {
return this.lift(new SingleOperator(predicate, thisArg, this));
}

class SingleOperator<T, R> implements Operator<T, R> {
constructor(private predicate?: (value: T, index: number, source: Observable<T>) => boolean, private thisArg?: any, private source?: Observable<T>) {

}

call(subscriber: Subscriber<R>): Subscriber<T> {
return new SingleSubscriber(subscriber, this.predicate, this.thisArg, this.source);
}
}

class SingleSubscriber<T> extends Subscriber<T> {
private predicate: Function;
private seenValue: boolean = false;
private singleValue: T;
private index: number = 0;

constructor(destination : Observer<T>, predicate?: (value: T, index: number, source: Observable<T>) => boolean, private thisArg?: any, private source?: Observable<T>) {
super(destination);

if (typeof predicate === 'function') {
this.predicate = bindCallback(predicate, thisArg, 3);
}
}

private applySingleValue(value): void {
if (this.seenValue) {
this.destination.error('Sequence contains more than one element');
} else {
this.seenValue = true;
this.singleValue = value;
}
}

_next(value: T) {
const predicate = this.predicate;
const currentIndex = this.index++;

if (predicate) {
let result = tryCatch(predicate)(value, currentIndex, this.source);
if (result === errorObject) {
this.destination.error(result.e);
} else if (result) {
this.applySingleValue(value);
}
} else {
this.applySingleValue(value);
}
}

_complete() {
const destination = this.destination;

if (this.index > 0) {
destination.next(this.seenValue ? this.singleValue : undefined);
destination.complete();
} else {
destination.error(new EmptyError);
}
}
}

0 comments on commit 49484a2

Please sign in to comment.