Skip to content

Commit

Permalink
chore(merge): Merge missed commits from master into publishing branch
Browse files Browse the repository at this point in the history
  • Loading branch information
benlesh committed Feb 25, 2021
2 parents f2ffada + 6f7d88b commit c8ea5a1
Show file tree
Hide file tree
Showing 4 changed files with 50 additions and 7 deletions.
2 changes: 1 addition & 1 deletion .github/workflows/ci_ts_latest.yml
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,6 @@ jobs:
run: |
npm install -g npm@latest
npm ci
npm install --no-save typescript@latest
npm install --no-save typescript@latest tslib@latest @types/node@latest
npm run build_all
22 changes: 22 additions & 0 deletions spec/operators/shareReplay-spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -262,4 +262,26 @@ describe('shareReplay operator', () => {
expectObservable(result).toBe(expected);
});

const FinalizationRegistry = (global as any).FinalizationRegistry;
if (FinalizationRegistry) {

it('should not leak the subscriber for sync sources', (done) => {
const registry = new FinalizationRegistry((value: any) => {
expect(value).to.equal('callback');
done();
});
let callback: (() => void) | undefined = () => { /* noop */ };
registry.register(callback, 'callback');

const shared = of(42).pipe(shareReplay(1));
shared.subscribe(callback);

callback = undefined;
global.gc();
});

} else {
console.warn(`No support for FinalizationRegistry in Node ${process.version}`);
}

});
1 change: 1 addition & 0 deletions spec/support/default.opts
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@

--reporter dot

--expose-gc
--check-leaks
--globals WebSocket,FormData,XDomainRequest,ActiveXObject,fetch,AbortController

Expand Down
32 changes: 26 additions & 6 deletions src/internal/operators/shareReplay.ts
Original file line number Diff line number Diff line change
Expand Up @@ -56,8 +56,14 @@ export interface ShareReplayConfig {
* @method shareReplay
* @owner Observable
*/
export function shareReplay<T>(config: ShareReplayConfig): MonoTypeOperatorFunction<T>;
export function shareReplay<T>(bufferSize?: number, windowTime?: number, scheduler?: SchedulerLike): MonoTypeOperatorFunction<T>;
export function shareReplay<T>(
config: ShareReplayConfig
): MonoTypeOperatorFunction<T>;
export function shareReplay<T>(
bufferSize?: number,
windowTime?: number,
scheduler?: SchedulerLike
): MonoTypeOperatorFunction<T>;
export function shareReplay<T>(
configOrBufferSize?: ShareReplayConfig | number,
windowTime?: number,
Expand All @@ -71,7 +77,7 @@ export function shareReplay<T>(
bufferSize: configOrBufferSize as number | undefined,
windowTime,
refCount: false,
scheduler
scheduler,
};
}
return (source: Observable<T>) => source.lift(shareReplayOperator(config));
Expand All @@ -81,23 +87,28 @@ function shareReplayOperator<T>({
bufferSize = Number.POSITIVE_INFINITY,
windowTime = Number.POSITIVE_INFINITY,
refCount: useRefCount,
scheduler
scheduler,
}: ShareReplayConfig) {
let subject: ReplaySubject<T> | undefined;
let refCount = 0;
let subscription: Subscription | undefined;
let hasError = false;
let isComplete = false;

return function shareReplayOperation(this: Subscriber<T>, source: Observable<T>) {
return function shareReplayOperation(
this: Subscriber<T>,
source: Observable<T>
) {
refCount++;
let innerSub: Subscription;
if (!subject || hasError) {
hasError = false;
subject = new ReplaySubject<T>(bufferSize, windowTime, scheduler);
innerSub = subject.subscribe(this);
subscription = source.subscribe({
next(value) { subject.next(value); },
next(value) {
subject.next(value);
},
error(err) {
hasError = true;
subject.error(err);
Expand All @@ -108,13 +119,22 @@ function shareReplayOperator<T>({
subject.complete();
},
});

// Here we need to check to see if the source synchronously completed. Although
// we're setting `subscription = undefined` in the completion handler, if the source
// is synchronous, that will happen *before* subscription is set by the return of
// the `subscribe` call.
if (isComplete) {
subscription = undefined;
}
} else {
innerSub = subject.subscribe(this);
}

this.add(() => {
refCount--;
innerSub.unsubscribe();
innerSub = undefined;
if (subscription && !isComplete && useRefCount && refCount === 0) {
subscription.unsubscribe();
subscription = undefined;
Expand Down

0 comments on commit c8ea5a1

Please sign in to comment.