Skip to content

Commit

Permalink
feat: add option to disable thread pool for keystore decryption (#6949)
Browse files Browse the repository at this point in the history
  • Loading branch information
nflaig authored and philknows committed Jul 19, 2024
1 parent f20484b commit 810f4a5
Show file tree
Hide file tree
Showing 4 changed files with 77 additions and 25 deletions.
Original file line number Diff line number Diff line change
@@ -1,17 +1,21 @@
import fs from "node:fs";
import path from "node:path";
import bls from "@chainsafe/bls";
import {Keystore} from "@chainsafe/bls-keystore";
import {SignerLocal, SignerType} from "@lodestar/validator";
import {LogLevel, Logger} from "@lodestar/utils";
import {lockFilepath, unlockFilepath} from "../../../util/lockfile.js";
import {LocalKeystoreDefinition} from "./interface.js";
import {clearKeystoreCache, loadKeystoreCache, writeKeystoreCache} from "./keystoreCache.js";
import {DecryptKeystoresThreadPool} from "./decryptKeystores/index.js";

type KeystoreDecryptOptions = {
export type KeystoreDecryptOptions = {
ignoreLockFile?: boolean;
onDecrypt?: (index: number) => void;
// Try to use the cache file if it exists
cacheFilePath?: string;
/** Use main thread to decrypt keystores */
disableThreadPool?: boolean;
logger: Pick<Logger, LogLevel.info | LogLevel.warn | LogLevel.debug>;
signal: AbortSignal;
};
Expand Down Expand Up @@ -57,14 +61,50 @@ export async function decryptKeystoreDefinitions(
const signers = new Array<SignerLocal>(keystoreCount);
const passwords = new Array<string>(keystoreCount);
const errors: KeystoreDecryptError[] = [];
const decryptKeystores = new DecryptKeystoresThreadPool(keystoreCount, opts.signal);

for (const [index, definition] of keystoreDefinitions.entries()) {
lockKeystore(definition.keystorePath, opts);
if (!opts.disableThreadPool) {
const decryptKeystores = new DecryptKeystoresThreadPool(keystoreCount, opts.signal);

for (const [index, definition] of keystoreDefinitions.entries()) {
lockKeystore(definition.keystorePath, opts);

decryptKeystores.queue(
definition,
(secretKeyBytes: Uint8Array) => {
const signer: SignerLocal = {
type: SignerType.Local,
secretKey: bls.SecretKey.fromBytes(secretKeyBytes),
};

signers[index] = signer;
passwords[index] = definition.password;

if (opts?.onDecrypt) {
opts?.onDecrypt(index);
}
},
(error: Error) => {
// In-progress tasks can't be canceled, so there's a chance that multiple errors may be caught
// add to the list of errors
errors.push({keystoreFile: path.basename(definition.keystorePath), error});
// cancel all pending tasks, no need to continue decrypting after we hit one error
decryptKeystores.cancel();
}
);
}

await decryptKeystores.completed();
} else {
// Decrypt keystores in main thread
for (const [index, definition] of keystoreDefinitions.entries()) {
lockKeystore(definition.keystorePath, opts);

try {
const keystore = Keystore.parse(fs.readFileSync(definition.keystorePath, "utf8"));

// Memory-hogging function
const secretKeyBytes = await keystore.decrypt(definition.password);

decryptKeystores.queue(
definition,
(secretKeyBytes: Uint8Array) => {
const signer: SignerLocal = {
type: SignerType.Local,
secretKey: bls.SecretKey.fromBytes(secretKeyBytes),
Expand All @@ -76,19 +116,14 @@ export async function decryptKeystoreDefinitions(
if (opts?.onDecrypt) {
opts?.onDecrypt(index);
}
},
(error: Error) => {
// In-progress tasks can't be canceled, so there's a chance that multiple errors may be caught
// add to the list of errors
errors.push({keystoreFile: path.basename(definition.keystorePath), error});
// cancel all pending tasks, no need to continue decrypting after we hit one error
decryptKeystores.cancel();
} catch (e) {
errors.push({keystoreFile: path.basename(definition.keystorePath), error: e as Error});
// stop processing, no need to continue decrypting after we hit one error
break;
}
);
}
}

await decryptKeystores.completed();

if (errors.length > 0) {
// If an error occurs, the program isn't going to be running,
// so we should unlock all lockfiles we created
Expand Down
8 changes: 8 additions & 0 deletions packages/cli/src/cmds/validator/options.ts
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,7 @@ export type IValidatorCliArgs = AccountValidatorArgs &

importKeystores?: string[];
importKeystoresPassword?: string;
disableKeystoresThreadPool?: boolean;

"http.requestWireFormat"?: string;
"http.responseWireFormat"?: string;
Expand Down Expand Up @@ -301,6 +302,13 @@ export const validatorOptions: CliCommandOptions<IValidatorCliArgs> = {
type: "string",
},

disableKeystoresThreadPool: {
hidden: true,
description:
"Disable thread pool and instead use main thread to decrypt keystores. This can speed up decryption in testing environments like Kurtosis",
type: "boolean",
},

doppelgangerProtection: {
alias: ["doppelgangerProtectionEnabled"],
description: "Enables Doppelganger protection",
Expand Down
2 changes: 2 additions & 0 deletions packages/cli/src/cmds/validator/signers/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -99,6 +99,7 @@ export async function getSignersFromArgs(
ignoreLockFile: args.force,
onDecrypt: needle,
cacheFilePath: path.join(accountPaths.cacheDir, "imported_keystores.cache"),
disableThreadPool: args["disableKeystoresThreadPool"],
logger,
signal,
});
Expand Down Expand Up @@ -133,6 +134,7 @@ export async function getSignersFromArgs(
ignoreLockFile: args.force,
onDecrypt: needle,
cacheFilePath: path.join(accountPaths.cacheDir, "local_keystores.cache"),
disableThreadPool: args["disableKeystoresThreadPool"],
logger,
signal,
});
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,10 @@ import {rimraf} from "rimraf";
import {getKeystoresStr} from "@lodestar/test-utils";
import {cachedSeckeysHex} from "../../utils/cachedKeys.js";
import {testFilesDir} from "../../utils.js";
import {decryptKeystoreDefinitions} from "../../../src/cmds/validator/keymanager/decryptKeystoreDefinitions.js";
import {
decryptKeystoreDefinitions,
KeystoreDecryptOptions,
} from "../../../src/cmds/validator/keymanager/decryptKeystoreDefinitions.js";
import {LocalKeystoreDefinition} from "../../../src/cmds/validator/keymanager/interface.js";
import {LockfileError, unlockFilepath} from "../../../src/util/lockfile.js";

Expand Down Expand Up @@ -56,16 +59,20 @@ describe("decryptKeystoreDefinitions", () => {
}
});

testDecryptKeystoreDefinitions(cacheFilePath);
testDecryptKeystoreDefinitions({cacheFilePath});
});

describe("without keystore cache", () => {
testDecryptKeystoreDefinitions();
});

function testDecryptKeystoreDefinitions(cacheFilePath?: string): void {
describe("disabled thread pool", () => {
testDecryptKeystoreDefinitions({disableThreadPool: true});
});

function testDecryptKeystoreDefinitions(opts?: Partial<KeystoreDecryptOptions>): void {
it("decrypt keystores", async () => {
const signers = await decryptKeystoreDefinitions(definitions, {logger: console, signal, cacheFilePath});
const signers = await decryptKeystoreDefinitions(definitions, {logger: console, signal, ...opts});
expect(signers.length).toBe(secretKeys.length);
for (const signer of signers) {
const hexSecret = signer.secretKey.toHex();
Expand All @@ -75,22 +82,22 @@ describe("decryptKeystoreDefinitions", () => {
});

it("fail to decrypt keystores if lockfiles already exist", async () => {
await decryptKeystoreDefinitions(definitions, {logger: console, signal, cacheFilePath});
await decryptKeystoreDefinitions(definitions, {logger: console, signal, ...opts});
// lockfiles should exist after the first run

try {
await decryptKeystoreDefinitions(definitions, {logger: console, signal, cacheFilePath});
await decryptKeystoreDefinitions(definitions, {logger: console, signal, ...opts});
expect.fail("Second decrypt should fail due to failure to get lockfile");
} catch (e) {
expect((e as LockfileError).code).toBe<LockfileError["code"]>("ELOCKED");
}
});

it("decrypt keystores if lockfiles already exist if ignoreLockFile=true", async () => {
await decryptKeystoreDefinitions(definitions, {logger: console, signal, cacheFilePath});
await decryptKeystoreDefinitions(definitions, {logger: console, signal, ...opts});
// lockfiles should exist after the first run

await decryptKeystoreDefinitions(definitions, {logger: console, signal, cacheFilePath, ignoreLockFile: true});
await decryptKeystoreDefinitions(definitions, {logger: console, signal, ...opts, ignoreLockFile: true});
});
}
});

0 comments on commit 810f4a5

Please sign in to comment.