Skip to content

Commit

Permalink
Remove two types of nacks - "Nonexistent client" & "Readonly client" (#…
Browse files Browse the repository at this point in the history
…7753)

This is part of looking into issue #7137 (nacks, specifically - tight nack loops).
This change adds logic for DeltaManager to detect when it does not have a "write" connection and to stop sending ops on such connections. This removes "expected" nacks from the workflow. The remaining nack types are (in most cases) logical errors that need to be raised in telemetry and looked into (10K unsummarized ops, 1Mb+ ops, etc.).

Removing "expected" nacks from the workflows allows us to take the next step - applying some kind of workflow that breaks the tight infinite loop of the remaining nack types. For example, we may add delays, or better - close the container if we see a number of nacks in a row (this is not part of this change; I'll continue to use this issue to track this work).

The con of this approach - it adds more logic to the already complicated DeltaManager.

One of the other pros - it makes things more consistent for other layers. I.e. if Container checks the connection mode and it's "write", it now knows for sure that it can send an op. Even more important - a client that was downgraded to "read" knows that it should not send ops (for example, from consensus / leader election DDSs) and thus stops sending them, which allows this client to stay in "read" connection mode once it gets there.
  • Loading branch information
vladsud committed Oct 9, 2021
1 parent 5fc2cbf commit 1ea711f
Show file tree
Hide file tree
Showing 3 changed files with 89 additions and 13 deletions.
12 changes: 11 additions & 1 deletion packages/loader/container-loader/src/container.ts
Original file line number Diff line number Diff line change
Expand Up @@ -1615,7 +1615,17 @@ export class Container extends EventEmitterWithErrorHandling<IContainerEvents> i
* If it's not true, runtime is not in position to send ops.
*/
private activeConnection() {
return this.connectionState === ConnectionState.Connected && this._deltaManager.connectionMode === "write";
const active = this.connectionState === ConnectionState.Connected &&
this._deltaManager.connectionMode === "write";

// Check for presence of current client in quorum for "write" connections - inactive clients
// would get leave op after some long timeout (5 min) and that should automatically transition
// state to "read" mode.
// eslint-disable-next-line @typescript-eslint/no-non-null-assertion
assert(!active || this.getQuorum().getMember(this.clientId!) !== undefined,
"active connection not present in quorum");

return active;
}

private createDeltaManager() {
Expand Down
87 changes: 77 additions & 10 deletions packages/loader/container-loader/src/deltaManager.ts
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@ import {
ITrace,
MessageType,
ScopeType,
ISequencedDocumentSystemMessage,
} from "@fluidframework/protocol-definitions";
import {
canRetryOnError,
Expand Down Expand Up @@ -241,6 +242,12 @@ export class DeltaManager

private readonly closeAbortController = new AbortController();

// True if there is pending (async) reconnection from "read" to "write"
private pendingReconnect = false;

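// True if the current "write" connection has been implicitly downgraded to "read"
// (i.e. we observed the leave op for our own client ID on this connection)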
private downgradedConnection = false;

/**
* Tells if current connection has checkpoint information.
* I.e. we know how far behind the client was at the time of establishing connection
Expand Down Expand Up @@ -316,7 +323,9 @@ export class DeltaManager
* The current connection mode, initially read.
*/
public get connectionMode(): ConnectionMode {
if (this.connection === undefined) {
assert(!this.downgradedConnection || this.connection?.mode === "write",
"Did we forget to reset downgradedConnection on new connection?");
if (this.connection === undefined || this.downgradedConnection) {
return "read";
}
return this.connection.mode;
Expand Down Expand Up @@ -703,11 +712,8 @@ export class DeltaManager

if (docService.policies?.storageOnly === true) {
const connection = new NoDeltaStream();
this.connectionP = new Promise((resolve) => {
this.setupNewSuccessfulConnection(connection, "read");
resolve(connection);
});
return this.connectionP;
this.setupNewSuccessfulConnection(connection, "read");
return connection;
}

// The promise returned from connectCore will settle with a resolved connection or reject with error
Expand Down Expand Up @@ -793,7 +799,7 @@ export class DeltaManager

// Attempt the connection
connectCore().then((connection) => {
this.connectionP = undefined;
assert(this.connectionP === undefined, "this.connectionP has been reset on successful connection");
this.removeListener("closed", cleanupAndReject);
resolve(connection);
}).catch(cleanupAndReject);
Expand Down Expand Up @@ -849,6 +855,31 @@ export class DeltaManager
this.clientSequenceNumberObserved = 0;
}

// If connection is "read" or implicit "read" (got leave op for "write" connection),
// then op can't make it through - we will get a nack if op is sent.
// We can short-circuit this process.
// Note that we also want nacks to be rare and be treated as catastrophic failures.
// Be careful with reentrancy though - disconnected event should not be raised in the
// middle of the current workflow, but rather on clean stack!
if (this.connectionMode === "read") {
if (!this.pendingReconnect) {
this.pendingReconnect = true;
Promise.resolve().then(async () => {
if (this.pendingReconnect) { // still valid?
return this.reconnectOnErrorCore(
"write", // connectionMode
"Switch to write", // message
);
}
})
.catch(() => {});
}

// Can return -1 here, but no other path does it (other than error path in Container),
// so it's better not to introduce new states.
return ++this.clientSequenceNumber;
}

const service = this.clientDetails.type === undefined || this.clientDetails.type === ""
? "unknown"
: this.clientDetails.type;
Expand Down Expand Up @@ -1125,6 +1156,7 @@ export class DeltaManager
}

this.connection = connection;
this.connectionP = undefined;

// Does information in scopes & mode matches?
// If we asked for "write" and got "read", then file is read-only
Expand Down Expand Up @@ -1238,10 +1270,15 @@ export class DeltaManager
* @param reason - Text description of disconnect reason to emit with disconnect event
*/
private disconnectFromDeltaStream(reason: string) {
this.pendingReconnect = false;
this.downgradedConnection = false;

if (this.connection === undefined) {
return false;
}

assert(this.connectionP === undefined, "reentrnacy may result in incorrect behavior");

const connection = this.connection;
// Avoid any re-entrancy - clear object reference
this.connection = undefined;
Expand Down Expand Up @@ -1290,16 +1327,35 @@ export class DeltaManager
private async reconnectOnError(
requestedMode: ConnectionMode,
error: DriverError,
) {
// Delegate to the shared implementation, using the error's own message as the disconnect reason
// and passing the error along as well.
return this.reconnectOnErrorCore(
requestedMode,
error.message,
error);
}

/**
* Disconnect the current connection and reconnect.
* @param requestedMode - Read or write
* @param disconnectMessage - Text description of the disconnect reason, passed to disconnectFromDeltaStream
* @param error - Optional error that triggered the reconnect; when omitted, the reconnect is treated as retryable
* @returns A promise that resolves when the connection is reestablished or we stop trying
*/
private async reconnectOnErrorCore(
requestedMode: ConnectionMode,
disconnectMessage: string,
error?: DriverError,
) {
// We quite often get protocol errors before / after observing nack/disconnect
// we do not want to run through same sequence twice.
// If we're already disconnected/disconnecting it's not appropriate to call this again.
assert(this.connection !== undefined, 0x0eb /* "Missing connection for reconnect" */);

this.disconnectFromDeltaStream(error.message);
this.disconnectFromDeltaStream(disconnectMessage);

const canRetry = error !== undefined ? canRetryOnError(error) : true;

// If reconnection is not an option, close the DeltaManager
const canRetry = canRetryOnError(error);
if (this.reconnectMode === ReconnectMode.Never || !canRetry) {
// Do not raise container error if we are closing just because we lost connection.
// Those errors (like IdleDisconnect) would show up in telemetry dashboards and
Expand All @@ -1313,7 +1369,7 @@ export class DeltaManager
}

if (this.reconnectMode === ReconnectMode.Enabled) {
const delayMs = getRetryDelayFromError(error);
const delayMs = error !== undefined ? getRetryDelayFromError(error) : undefined;
if (delayMs !== undefined) {
this.emitDelayInfo(this.deltaStreamDelayId, delayMs, error);
await waitForConnectedState(delayMs);
Expand Down Expand Up @@ -1504,6 +1560,17 @@ export class DeltaManager
message.contents = JSON.parse(message.contents);
}

if (message.type === MessageType.ClientLeave) {
const systemLeaveMessage = message as ISequencedDocumentSystemMessage;
const clientId = JSON.parse(systemLeaveMessage.data) as string;
if (clientId === this.connection?.clientId) {
// We have been kicked out from quorum
this.logger.sendPerformanceEvent({ eventName: "ReadConnectionTransition" });
this.downgradedConnection = true;
assert(this.connectionMode === "read", "effective connectionMode should be 'read' after downgrade");
}
}

// Add final ack trace.
if (message.traces !== undefined && message.traces.length > 0) {
const service = this.clientDetails.type === undefined || this.clientDetails.type === ""
Expand Down
3 changes: 1 addition & 2 deletions packages/test/test-end-to-end-tests/src/test/blobs.spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -189,10 +189,9 @@ describeFullCompat("blobs", (getTestObjectProvider) => {
const container = await provider.makeTestContainer(testContainerConfig);
const dataStore = await requestFluidObject<ITestDataObject>(container, "default");

(container.deltaManager as any)._inbound.pause();

const blobOpP = new Promise<void>((res) => container.deltaManager.on("submitOp", (op) => {
if (op.contents.includes("blobAttach")) {
(container.deltaManager as any)._inbound.pause();
res();
}
}));
Expand Down

0 comments on commit 1ea711f

Please sign in to comment.