Skip to content

Commit

Permalink
Remove flushMode tracking from PendingStateManager (#11419)
Browse files Browse the repository at this point in the history
[AB#1107](https://dev.azure.com/fluidframework/internal/_workitems/edit/1107)

For more information about how to contribute to this repo, visit [this page](https://github.com/microsoft/FluidFramework/blob/main/CONTRIBUTING.md).

## Description

The `PendingStateManager` should just track batches and ops and not care about the `flushMode` of the container runtime.
See #10930 (comment).
  • Loading branch information
kian-thompson committed Aug 9, 2022
1 parent 0e81146 commit 283da93
Show file tree
Hide file tree
Showing 3 changed files with 16 additions and 27 deletions.
15 changes: 9 additions & 6 deletions packages/runtime/container-runtime/src/containerRuntime.ts
Original file line number Diff line number Diff line change
Expand Up @@ -1387,7 +1387,6 @@ export class ContainerRuntime extends TypedEventEmitter<IContainerRuntimeEvents>
rollback: this.rollback.bind(this),
orderSequentially: this.orderSequentially.bind(this),
},
this.flushMode,
pendingRuntimeState?.pending);

this.context.quorum.on("removeMember", (clientId: string) => {
Expand Down Expand Up @@ -2065,11 +2064,15 @@ export class ContainerRuntime extends TypedEventEmitter<IContainerRuntimeEvents>
return;
}

// Let the PendingStateManager know that there was an attempt to flush messages.
// Note that this should happen before the `this.needsFlush` check below because in the scenario where we are
// not connected, `this.needsFlush` will be false but the PendingStateManager might have pending messages and
// hence needs to track this.
this.pendingStateManager.onFlush(isImmediateBatch);
// ! TODO: This condition should be removed once "flush" becomes private
// See https://dev.azure.com/fluidframework/internal/_workitems/edit/1076
if (this.flushMode !== FlushMode.Immediate || isImmediateBatch) {
// Let the PendingStateManager know that there was an attempt to flush messages.
// Note that this should happen before the `this.needsFlush` check below because in the scenario where we
// are not connected, `this.needsFlush` will be false but the PendingStateManager might have pending
// messages and hence needs to track this.
this.pendingStateManager.onFlush();
}

// If flush has already been called then exit early
if (!this.needsFlush) {
Expand Down
25 changes: 6 additions & 19 deletions packages/runtime/container-runtime/src/pendingStateManager.ts
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,6 @@ import { DataProcessingError } from "@fluidframework/container-utils";
import {
ISequencedDocumentMessage,
} from "@fluidframework/protocol-definitions";
import { FlushMode } from "@fluidframework/runtime-definitions";
import { wrapError } from "@fluidframework/telemetry-utils";
import Deque from "double-ended-queue";
import { ContainerMessageType } from "./containerRuntime";
Expand Down Expand Up @@ -118,7 +117,6 @@ export class PendingStateManager implements IDisposable {

constructor(
private readonly stateHandler: IRuntimeStateHandler,
private readonly flushMode: FlushMode,
initialLocalState: IPendingLocalState | undefined,
) {
this.initialStates = new Deque<IPendingState>(initialLocalState?.pendingStates ?? []);
Expand Down Expand Up @@ -160,17 +158,8 @@ export class PendingStateManager implements IDisposable {

/**
* Called when flush() is called on the ContainerRuntime to manually flush messages.
* @param isImmediateBatch - add the "flush" message to the pending states even when flushMode is Immediate
*/
public onFlush(isImmediateBatch: boolean = false) {
/**
* If the FlushMode is Immediate, we don't need to track an explicit flush call because every message is
* automatically flushed. So, flush is a no-op.
*/
if (!isImmediateBatch && this.flushMode === FlushMode.Immediate) {
return;
}

public onFlush() {
// If the previous state is not a message, flush is a no-op.
const previousState = this.pendingStates.peekBack();
if (previousState?.type !== "message") {
Expand Down Expand Up @@ -404,13 +393,11 @@ export class PendingStateManager implements IDisposable {
assert(pendingState.opMetadata?.batch !== false || messageBatchQueue.length > 0,
"We cannot process batches in chunks");
/**
* We want to ensure grouped messages get processed in a batch when flush mode is Immediate.
* Note, it is not possible for the PendingStateManager to receive a partially acked batch. It will
* We want to ensure grouped messages get processed in a batch.
* Note: It is not possible for the PendingStateManager to receive a partially acked batch. It will
* either receive the whole batch ack or nothing at all.
*/
if (this.flushMode === FlushMode.Immediate
&& (messageBatchQueue.length > 0 || pendingState.opMetadata?.batch)
) {
if (messageBatchQueue.length > 0 || pendingState.opMetadata?.batch) {
messageBatchQueue.enqueue(pendingState);
} else {
this.stateHandler.reSubmit(
Expand All @@ -422,11 +409,11 @@ export class PendingStateManager implements IDisposable {
break;
case "flush":
/**
* When flushMode is Immediate, a "flush" call can indicate the end of a batch.
* A "flush" call can indicate the end of a batch.
* We can't rely on the "batch" property in the message metadata as it gets
* updated elsewhere and it is not the same object instance that gets updated.
*/
if (this.flushMode === FlushMode.Immediate && messageBatchQueue.length > 0) {
if (messageBatchQueue.length > 0) {
this.stateHandler.orderSequentially(() => {
while (messageBatchQueue.length > 0) {
// eslint-disable-next-line @typescript-eslint/no-non-null-assertion
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@
*/

import assert from "assert";
import { FlushMode } from "@fluidframework/runtime-definitions";
import { PendingStateManager } from "../pendingStateManager";
import { ContainerMessageType } from "..";

Expand Down Expand Up @@ -35,7 +34,7 @@ describe("Pending State Manager Rollback", () => {
}
},
orderSequentially: () => {},
}, FlushMode.Immediate, undefined);
}, undefined);
});

it("should do nothing when rolling back empty pending stack", () => {
Expand Down

0 comments on commit 283da93

Please sign in to comment.