Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Remove two types of nacks - "Nonexistent client" & "Readonly client" #7753

Merged
merged 9 commits into from
Oct 9, 2021
12 changes: 11 additions & 1 deletion packages/loader/container-loader/src/container.ts
Original file line number Diff line number Diff line change
Expand Up @@ -1615,7 +1615,17 @@ export class Container extends EventEmitterWithErrorHandling<IContainerEvents> i
* If it's not true, runtime is not in position to send ops.
*/
private activeConnection() {
return this.connectionState === ConnectionState.Connected && this._deltaManager.connectionMode === "write";
const active = this.connectionState === ConnectionState.Connected &&
this._deltaManager.connectionMode === "write";

// Check for presence of current client in quorum for "write" connections - inactive clients
// would get leave op after some long timeout (5 min) and that should automatically transition
// state to "read" mode.
// eslint-disable-next-line @typescript-eslint/no-non-null-assertion
assert(!active || this.getQuorum().getMember(this.clientId!) !== undefined,
"active connection not present in quorum");

return active;
}

private createDeltaManager() {
Expand Down
87 changes: 77 additions & 10 deletions packages/loader/container-loader/src/deltaManager.ts
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@ import {
ITrace,
MessageType,
ScopeType,
ISequencedDocumentSystemMessage,
} from "@fluidframework/protocol-definitions";
import {
canRetryOnError,
Expand Down Expand Up @@ -241,6 +242,12 @@ export class DeltaManager

private readonly closeAbortController = new AbortController();

// True if there is pending (async) reconnection from "read" to "write"
private pendingReconnect = false;

// downgrade "write" connection to "read"
private downgradedConnection = false;

/**
* Tells if current connection has checkpoint information.
* I.e. we know how far behind the client was at the time of establishing connection
Expand Down Expand Up @@ -316,7 +323,9 @@ export class DeltaManager
* The current connection mode, initially read.
*/
public get connectionMode(): ConnectionMode {
if (this.connection === undefined) {
assert(!this.downgradedConnection || this.connection?.mode === "write",
"Did we forget to reset downgradedConnection on new connection?");
if (this.connection === undefined || this.downgradedConnection) {
return "read";
}
return this.connection.mode;
Expand Down Expand Up @@ -703,11 +712,8 @@ export class DeltaManager

if (docService.policies?.storageOnly === true) {
const connection = new NoDeltaStream();
this.connectionP = new Promise((resolve) => {
this.setupNewSuccessfulConnection(connection, "read");
resolve(connection);
});
return this.connectionP;
this.setupNewSuccessfulConnection(connection, "read");
return connection;
}

// The promise returned from connectCore will settle with a resolved connection or reject with error
Expand Down Expand Up @@ -793,7 +799,7 @@ export class DeltaManager

// Attempt the connection
connectCore().then((connection) => {
this.connectionP = undefined;
assert(this.connectionP === undefined, "this.connectionP has been reset on successful connection");
this.removeListener("closed", cleanupAndReject);
resolve(connection);
}).catch(cleanupAndReject);
Expand Down Expand Up @@ -849,6 +855,31 @@ export class DeltaManager
this.clientSequenceNumberObserved = 0;
}

// If connection is "read" or implicit "read" (got leave op for "write" connection),
// then op can't make it through - we will get a nack if op is sent.
// We can short-circuit this process.
// Note that we also want nacks to be rare and be treated as catastrophic failures.
// Be careful with reentrancy though - disconnected event should not be be raised in the
// middle of the current workflow, but rather on clean stack!
if (this.connectionMode === "read") {
if (!this.pendingReconnect) {
this.pendingReconnect = true;
Promise.resolve().then(async () => {
if (this.pendingReconnect) { // still valid?
return this.reconnectOnErrorCore(
"write", // connectionMode
"Switch to write", // message
);
}
})
.catch(() => {});
}

// Can return -1 here, but no other path does it (other than error path in Container),
// so it's better not to introduce new states.
return ++this.clientSequenceNumber;
}

const service = this.clientDetails.type === undefined || this.clientDetails.type === ""
? "unknown"
: this.clientDetails.type;
Expand Down Expand Up @@ -1125,6 +1156,7 @@ export class DeltaManager
}

this.connection = connection;
this.connectionP = undefined;

// Does information in scopes & mode matches?
// If we asked for "write" and got "read", then file is read-only
Expand Down Expand Up @@ -1238,10 +1270,15 @@ export class DeltaManager
* @param reason - Text description of disconnect reason to emit with disconnect event
*/
private disconnectFromDeltaStream(reason: string) {
this.pendingReconnect = false;
this.downgradedConnection = false;

if (this.connection === undefined) {
return false;
}

assert(this.connectionP === undefined, "reentrnacy may result in incorrect behavior");

const connection = this.connection;
// Avoid any re-entrancy - clear object reference
this.connection = undefined;
Expand Down Expand Up @@ -1290,16 +1327,35 @@ export class DeltaManager
private async reconnectOnError(
requestedMode: ConnectionMode,
error: DriverError,
) {
return this.reconnectOnErrorCore(
requestedMode,
error.message,
error);
}

/**
* Disconnect the current connection and reconnect.
* @param connection - The connection that wants to reconnect - no-op if it's different from this.connection
* @param requestedMode - Read or write
* @param error - Error reconnect information including whether or not to reconnect
* @returns A promise that resolves when the connection is reestablished or we stop trying
*/
private async reconnectOnErrorCore(
requestedMode: ConnectionMode,
disconnectMessage: string,
error?: DriverError,
) {
// We quite often get protocol errors before / after observing nack/disconnect
// we do not want to run through same sequence twice.
// If we're already disconnected/disconnecting it's not appropriate to call this again.
assert(this.connection !== undefined, 0x0eb /* "Missing connection for reconnect" */);

this.disconnectFromDeltaStream(error.message);
this.disconnectFromDeltaStream(disconnectMessage);

const canRetry = error !== undefined ? canRetryOnError(error) : true;

// If reconnection is not an option, close the DeltaManager
const canRetry = canRetryOnError(error);
vladsud marked this conversation as resolved.
Show resolved Hide resolved
if (this.reconnectMode === ReconnectMode.Never || !canRetry) {
// Do not raise container error if we are closing just because we lost connection.
// Those errors (like IdleDisconnect) would show up in telemetry dashboards and
Expand All @@ -1313,7 +1369,7 @@ export class DeltaManager
}

if (this.reconnectMode === ReconnectMode.Enabled) {
const delayMs = getRetryDelayFromError(error);
const delayMs = error !== undefined ? getRetryDelayFromError(error) : undefined;
if (delayMs !== undefined) {
this.emitDelayInfo(this.deltaStreamDelayId, delayMs, error);
await waitForConnectedState(delayMs);
Expand Down Expand Up @@ -1504,6 +1560,17 @@ export class DeltaManager
message.contents = JSON.parse(message.contents);
}

if (message.type === MessageType.ClientLeave) {
const systemLeaveMessage = message as ISequencedDocumentSystemMessage;
const clientId = JSON.parse(systemLeaveMessage.data) as string;
if (clientId === this.connection?.clientId) {
// We have been kicked out from quorum
this.logger.sendPerformanceEvent({ eventName: "ReadConnectionTransition" });
this.downgradedConnection = true;
assert(this.connectionMode === "read", "effective connectionMode should be 'read' after downgrade");
}
}

// Add final ack trace.
if (message.traces !== undefined && message.traces.length > 0) {
const service = this.clientDetails.type === undefined || this.clientDetails.type === ""
Expand Down
3 changes: 1 addition & 2 deletions packages/test/test-end-to-end-tests/src/test/blobs.spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -189,10 +189,9 @@ describeFullCompat("blobs", (getTestObjectProvider) => {
const container = await provider.makeTestContainer(testContainerConfig);
const dataStore = await requestFluidObject<ITestDataObject>(container, "default");

(container.deltaManager as any)._inbound.pause();

const blobOpP = new Promise<void>((res) => container.deltaManager.on("submitOp", (op) => {
if (op.contents.includes("blobAttach")) {
(container.deltaManager as any)._inbound.pause();
res();
}
}));
Expand Down