Skip to content

Commit

Permalink
feat(operator): Add distinctUntilChanged and distinctUntilKeyChanged
Browse files Browse the repository at this point in the history
  • Loading branch information
trxcllnt committed Aug 14, 2015
1 parent b9e5888 commit f9ba4da
Show file tree
Hide file tree
Showing 6 changed files with 111 additions and 1 deletion.
15 changes: 15 additions & 0 deletions spec/operators/distinctUntilChanged-spec.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
/* globals describe, it, expect */
var Rx = require('../../dist/cjs/Rx');
var Observable = Rx.Observable;

describe('Observable.prototype.distinctUntilChanged()', function () {
it('should distinguish between values', function (done) {
var expected = [1, 2, 1];
Observable
.of(1, 1, 1, 2, 2, 1)
.distinctUntilChanged()
.subscribe(function(x) {
expect(x).toBe(expected.shift());
}, null, done);
});
});
15 changes: 15 additions & 0 deletions spec/operators/distinctUntilKeyChanged-spec.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
/* globals describe, it, expect */
var Rx = require('../../dist/cjs/Rx');
var Observable = Rx.Observable;

describe('Observable.prototype.distinctUntilKeyChanged()', function () {
it('should distinguish between values', function (done) {
var expected = [{val: 1}, {val: 2}, {val: 1}];
Observable
.of({val: 1}, {val: 1}, {val: 1}, {val: 2}, {val: 2}, {val: 1})
.distinctUntilKeyChanged("val")
.subscribe(function(x) {
expect(x).toDeepEqual(expected.shift());
}, null, done);
});
});
2 changes: 2 additions & 0 deletions src/Observable.ts
Original file line number Diff line number Diff line change
Expand Up @@ -129,6 +129,8 @@ export default class Observable<T> {
startWith: <T>(x: T) => Observable<T>;

filter: (predicate: (x: T) => boolean, ix?: number, thisArg?: any) => Observable<T>;
distinctUntilChanged: (compare?: (x: T, y: T) => boolean, thisArg?: any) => Observable<T>;
distinctUntilKeyChanged: (key: string, compare?: (x: any, y: any) => boolean, thisArg?: any) => Observable<T>;
skip: (count: number) => Observable<T>;
take: (count: number) => Observable<T>;
takeUntil: (observable: Observable<any>) => Observable<T>;
Expand Down
6 changes: 5 additions & 1 deletion src/Rx.ts
Original file line number Diff line number Diff line change
Expand Up @@ -87,15 +87,19 @@ observableProto.scan = scan;
observableProto.reduce = reduce;
observableProto.startWith = startWith;

import filter from './operators/filter';
import take from './operators/take';
import skip from './operators/skip';
import takeUntil from './operators/takeUntil';
import filter from './operators/filter';
import distinctUntilChanged from './operators/distinctUntilChanged';
import distinctUntilKeyChanged from './operators/distinctUntilKeyChanged';

observableProto.take = take;
observableProto.skip = skip;
observableProto.takeUntil = takeUntil;
observableProto.filter = filter;
observableProto.distinctUntilChanged = distinctUntilChanged;
observableProto.distinctUntilKeyChanged = distinctUntilKeyChanged;

import combineLatest from './operators/combineLatest';
import combineAll from './operators/combineAll';
Expand Down
64 changes: 64 additions & 0 deletions src/operators/distinctUntilChanged.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,64 @@
import Operator from '../Operator';
import Observer from '../Observer';
import Subscriber from '../Subscriber';

import tryCatch from '../util/tryCatch';
import {errorObject} from '../util/errorObject';
import bindCallback from '../util/bindCallback';

export default function distinctUntilChanged<T>(compare?: (x: T, y: T) => boolean, thisArg?: any) {
return this.lift(new DistinctUntilChangedOperator(thisArg ?
<(x: T, y: T) => boolean> bindCallback(compare, thisArg, 2) :
compare));
}

export class DistinctUntilChangedOperator<T, R> extends Operator<T, R> {

compare: (x: T, y: T) => boolean;

constructor(compare?: (x: T, y: T) => boolean) {
super();
this.compare = compare;
}

call(observer: Observer<T>): Observer<T> {
return new DistinctUntilChangedSubscriber(observer, this.compare);
}
}

export class DistinctUntilChangedSubscriber<T> extends Subscriber<T> {

value: T;
hasValue: boolean = false;

constructor(destination: Observer<T>, compare?: (x: T, y: T) => boolean) {
super(destination);
if (typeof compare === "function") {
this.compare = compare;
}
}

compare(x: T, y: T) {
return x === y;
}

_next(x) {

let result: any = false;

if(this.hasValue) {
result = tryCatch(this.compare)(this.value, x);
if (result === errorObject) {
this.destination.error(errorObject.e);
return;
}
} else {
this.hasValue = true;
}

if (Boolean(result) === false) {
this.value = x;
this.destination.next(x);
}
}
}
10 changes: 10 additions & 0 deletions src/operators/distinctUntilKeyChanged.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
import distinctUntilChanged from './distinctUntilChanged';

export default function distinctUntilKeyChanged<T>(key: string, compare?: (x: any, y: any) => boolean, thisArg?: any) {
return distinctUntilChanged.call(this, function(x, y) {
if (compare) {
return compare.call(thisArg, x[key], y[key]);
}
return x[key] === y[key];
});
}

0 comments on commit f9ba4da

Please sign in to comment.