Skip to content

Commit

Permalink
feat(gatsby-core-utils): create proper mutex (#34761)
Browse files Browse the repository at this point in the history
  • Loading branch information
wardpeet committed Feb 16, 2022
1 parent 21ef185 commit f2d4830
Show file tree
Hide file tree
Showing 11 changed files with 333 additions and 26 deletions.
1 change: 1 addition & 0 deletions docs/docs/how-to/testing/unit-testing.md
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@ module.exports = {
".+\\.(css|styl|less|sass|scss)$": `identity-obj-proxy`,
".+\\.(jpg|jpeg|png|gif|eot|otf|webp|svg|ttf|woff|woff2|mp4|webm|wav|mp3|m4a|aac|oga)$": `<rootDir>/__mocks__/file-mock.js`,
"^gatsby-page-utils/(.*)$": `gatsby-page-utils/dist/$1`, // Workaround for https://github.com/facebook/jest/issues/9771
"^gatsby-core-utils/(.*)$": `gatsby-core-utils/dist/$1`, // Workaround for https://github.com/facebook/jest/issues/9771
},
testPathIgnorePatterns: [`node_modules`, `\\.cache`, `<rootDir>.*/public`],
transformIgnorePatterns: [`node_modules/(?!(gatsby)/)`],
Expand Down
1 change: 1 addition & 0 deletions examples/using-jest/jest.config.js
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ module.exports = {
".+\\.(css|styl|less|sass|scss)$": `identity-obj-proxy`,
".+\\.(jpg|jpeg|png|gif|eot|otf|webp|svg|ttf|woff|woff2|mp4|webm|wav|mp3|m4a|aac|oga)$": `<rootDir>/__mocks__/file-mock.js`,
"^gatsby-page-utils/(.*)$": `gatsby-page-utils/dist/$1`, // Workaround for https://github.com/facebook/jest/issues/9771
"^gatsby-core-utils/(.*)$": `gatsby-core-utils/dist/$1`, // Workaround for https://github.com/facebook/jest/issues/9771
},
testPathIgnorePatterns: [`node_modules`, `.cache`],
transformIgnorePatterns: [`node_modules/(?!(gatsby)/)`],
Expand Down
1 change: 1 addition & 0 deletions jest.config.js
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@ module.exports = {
"^ordered-binary$": `<rootDir>/node_modules/ordered-binary/dist/index.cjs`,
"^msgpackr$": `<rootDir>/node_modules/msgpackr/dist/node.cjs`,
"^gatsby-page-utils/(.*)$": `gatsby-page-utils/dist/$1`, // Workaround for https://github.com/facebook/jest/issues/9771
"^gatsby-core-utils/(.*)$": `gatsby-core-utils/dist/$1`, // Workaround for https://github.com/facebook/jest/issues/9771
},
snapshotSerializers: [`jest-serializer-path`],
collectCoverageFrom: coverageDirs,
Expand Down
17 changes: 17 additions & 0 deletions packages/gatsby-core-utils/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -104,3 +104,20 @@ const requireUtil = createRequireFromPath("../src/utils/")
requireUtil("./some-tool")
// ...
```

### Mutex

When working inside workers or async operations you want some kind of concurrency control that a specific work load can only concurrent one at a time. This is what a [Mutex](https://en.wikipedia.org/wiki/Mutual_exclusion) does.

By implementing the following code, the code is only executed one at a time and the other threads/async workloads are awaited until the current one is done. This is handy when writing to the same file to disk.

```js
const { createMutex } = require("gatsby-core-utils/mutex")

const mutex = createMutex("my-custom-mutex-key")
await mutex.acquire()

await fs.writeFile("pathToFile", "my custom content")

await mutex.release()
```
15 changes: 15 additions & 0 deletions packages/gatsby-core-utils/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,18 @@
"gatsby",
"gatsby-core-utils"
],
"exports": {
".": "./dist/index.js",
"./*": "./dist/*.js"
},
"typesVersions": {
"*": {
"*": [
"dist/*.d.ts",
"dist/index.d.ts"
]
}
},
"author": "Ward Peeters <ward@coding-tech.com>",
"homepage": "https://github.com/gatsbyjs/gatsby/tree/master/packages/gatsby-core-utils#readme",
"license": "MIT",
Expand Down Expand Up @@ -36,9 +48,12 @@
"file-type": "^16.5.3",
"fs-extra": "^10.0.0",
"got": "^11.8.3",
"import-from": "^4.0.0",
"lock": "^1.1.0",
"lmdb": "^2.1.7",
"node-object-hash": "^2.3.10",
"proper-lockfile": "^4.1.2",
"resolve-from": "^5.0.0",
"tmp": "^0.2.1",
"xdg-basedir": "^4.0.0"
},
Expand Down
99 changes: 99 additions & 0 deletions packages/gatsby-core-utils/src/__tests__/mutex.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,99 @@
import path from "path"
import { remove, mkdirp } from "fs-extra"
import { createMutex } from "../mutex"
import * as storage from "../utils/get-storage"

jest.spyOn(storage, `getDatabaseDir`)

function sleep(timeout = 100): Promise<void> {
return new Promise(resolve => setTimeout(resolve, timeout))
}

async function doAsync(
mutex: ReturnType<typeof createMutex>,
result: Array<string> = [],
waitTime: number,
id: string
): Promise<Array<string>> {
await mutex.acquire()
result.push(`start ${id}`)
await sleep(waitTime)
result.push(`stop ${id}`)
await mutex.release()

return result
}

describe(`mutex`, () => {
const cachePath = path.join(__dirname, `.cache`)
beforeAll(async () => {
await mkdirp(cachePath)
storage.getDatabaseDir.mockReturnValue(cachePath)
})

afterAll(async () => {
await storage.closeDatabase()
await remove(cachePath)
})

it(`should only allow one action go through at the same time`, async () => {
const mutex = createMutex(`test-key`, 300)

const result: Array<string> = []

doAsync(mutex, result, 50, `1`)
await sleep(0)
await doAsync(mutex, result, 10, `2`)

expect(result).toMatchInlineSnapshot(`
Array [
"start 1",
"stop 1",
"start 2",
"stop 2",
]
`)
})

it(`should generate the same mutex if key are identical`, async () => {
const mutex1 = createMutex(`test-key`, 300)
const mutex2 = createMutex(`test-key`, 300)

const result: Array<string> = []

const mutexPromise = doAsync(mutex1, result, 50, `1`)
await sleep(0)
await doAsync(mutex2, result, 10, `2`)
await mutexPromise

expect(result).toMatchInlineSnapshot(`
Array [
"start 1",
"stop 1",
"start 2",
"stop 2",
]
`)
})

it(`shouldn't wait if keys are different`, async () => {
const mutex1 = createMutex(`test-key`, 300)
const mutex2 = createMutex(`other-key`, 300)

const result: Array<string> = []

const mutexPromise = doAsync(mutex1, result, 50, `1`)
await sleep(0)
await doAsync(mutex2, result, 10, `2`)
await mutexPromise

expect(result).toMatchInlineSnapshot(`
Array [
"start 1",
"start 2",
"stop 2",
"stop 1",
]
`)
})
})
57 changes: 57 additions & 0 deletions packages/gatsby-core-utils/src/mutex.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,57 @@
import { getStorage, LockStatus, getDatabaseDir } from "./utils/get-storage"

interface IMutex {
acquire(): Promise<void>
release(): Promise<void>
}

// Random number to re-check if mutex got released
const DEFAULT_MUTEX_INTERVAL = 3000

async function waitUntilUnlocked(
storage: ReturnType<typeof getStorage>,
key: string,
timeout: number
): Promise<void> {
const isUnlocked = await storage.mutex.ifNoExists(key, () => {
storage.mutex.put(key, LockStatus.Locked)
})

if (isUnlocked) {
return
}

await new Promise<void>(resolve => {
setTimeout(() => {
resolve(waitUntilUnlocked(storage, key, timeout))
}, timeout)
})
}

/**
* Creates a mutex, make sure to call `release` when you're done with it.
*
* @param {string} key A unique key
*/
export function createMutex(
key: string,
timeout = DEFAULT_MUTEX_INTERVAL
): IMutex {
const storage = getStorage(getDatabaseDir())
const BUILD_ID = global.__GATSBY?.buildId ?? ``
const prefixedKey = `${BUILD_ID}-${key}`

return {
acquire: (): Promise<void> =>
waitUntilUnlocked(storage, prefixedKey, timeout),
release: async (): Promise<void> => {
await storage.mutex.remove(prefixedKey)
},
}
}

export async function releaseAllMutexes(): Promise<void> {
const storage = getStorage(getDatabaseDir())

await storage.mutex.clearAsync()
}
16 changes: 16 additions & 0 deletions packages/gatsby-core-utils/src/utils/get-lmdb.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
import path from "path"
import importFrom from "import-from"
import resolveFrom from "resolve-from"

export function getLmdb(): typeof import("lmdb") {
const gatsbyPkgRoot = path.dirname(
resolveFrom(process.cwd(), `gatsby/package.json`)
)

// Try to use lmdb from gatsby if not we use our own version
try {
return importFrom(gatsbyPkgRoot, `lmdb`) as typeof import("lmdb")
} catch (err) {
return require(`lmdb`)
}
}
68 changes: 68 additions & 0 deletions packages/gatsby-core-utils/src/utils/get-storage.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,68 @@
import path from "path"
import { getLmdb } from "./get-lmdb"
import type { RootDatabase, Database } from "lmdb"

export enum LockStatus {
Locked = 0,
Unlocked = 1,
}

interface ICoreUtilsDatabase {
mutex: Database<LockStatus, string>
}

let databases: ICoreUtilsDatabase | undefined
let rootDb: RootDatabase

export function getDatabaseDir(): string {
const rootDir = global.__GATSBY?.root ?? process.cwd()
return path.join(rootDir, `.cache`, `data`, `gatsby-core-utils`)
}

export function getStorage(fullDbPath: string): ICoreUtilsDatabase {
if (!databases) {
if (!fullDbPath) {
throw new Error(`LMDB path is not set!`)
}

// __GATSBY_OPEN_LMDBS tracks if we already opened given db in this process
// In `gatsby serve` case we might try to open it twice - once for engines
// and second to get access to `SitePage` nodes (to power trailing slashes
// redirect middleware). This ensure there is single instance within a process.
// Using more instances seems to cause weird random errors.
if (!globalThis.__GATSBY_OPEN_LMDBS) {
globalThis.__GATSBY_OPEN_LMDBS = new Map()
}

databases = globalThis.__GATSBY_OPEN_LMDBS.get(fullDbPath)

if (databases) {
return databases
}

const open = getLmdb().open

rootDb = open({
name: `root`,
path: fullDbPath,
compression: true,
sharedStructuresKey: Symbol.for(`structures`),
})

databases = {
mutex: rootDb.openDB({
name: `mutex`,
}),
}

globalThis.__GATSBY_OPEN_LMDBS.set(fullDbPath, databases)
}

return databases as ICoreUtilsDatabase
}

export async function closeDatabase(): Promise<void> {
if (rootDb) {
await rootDb.close()
}
}
51 changes: 25 additions & 26 deletions packages/gatsby/src/services/initialize.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
import _ from "lodash"
import { slash, isCI } from "gatsby-core-utils"
import { releaseAllMutexes } from "gatsby-core-utils/mutex"
import fs from "fs-extra"
import md5File from "md5-file"
import crypto from "crypto"
Expand Down Expand Up @@ -412,34 +413,29 @@ export async function initialize({
// }
// }

if (
process.env.GATSBY_EXPERIMENTAL_PRESERVE_FILE_DOWNLOAD_CACHE ||
process.env.GATSBY_EXPERIMENTAL_PRESERVE_WEBPACK_CACHE
) {
const deleteGlobs = [
// By default delete all files & subdirectories
`${cacheDirectory}/**`,
`${cacheDirectory}/*/`,
]

if (process.env.GATSBY_EXPERIMENTAL_PRESERVE_FILE_DOWNLOAD_CACHE) {
// Stop the caches directory from being deleted, add all sub directories,
// but remove gatsby-source-filesystem
deleteGlobs.push(`!${cacheDirectory}/caches`)
deleteGlobs.push(`${cacheDirectory}/caches/*`)
deleteGlobs.push(`!${cacheDirectory}/caches/gatsby-source-filesystem`)
}
const deleteGlobs = [
// By default delete all files & subdirectories
`${cacheDirectory}/**`,
`!${cacheDirectory}/data`,
`${cacheDirectory}/data/**`,
`!${cacheDirectory}/data/gatsby-core-utils/`,
`!${cacheDirectory}/data/gatsby-core-utils/**`,
]

if (process.env.GATSBY_EXPERIMENTAL_PRESERVE_FILE_DOWNLOAD_CACHE) {
// Stop the caches directory from being deleted, add all sub directories,
// but remove gatsby-source-filesystem
deleteGlobs.push(`!${cacheDirectory}/caches`)
deleteGlobs.push(`${cacheDirectory}/caches/*`)
deleteGlobs.push(`!${cacheDirectory}/caches/gatsby-source-filesystem`)
}

if (process.env.GATSBY_EXPERIMENTAL_PRESERVE_WEBPACK_CACHE) {
// Add webpack
deleteGlobs.push(`!${cacheDirectory}/webpack`)
}
await del(deleteGlobs)
} else {
// Attempt to empty dir if remove fails,
// like when directory is mount point
await fs.remove(cacheDirectory).catch(() => fs.emptyDir(cacheDirectory))
if (process.env.GATSBY_EXPERIMENTAL_PRESERVE_WEBPACK_CACHE) {
// Add webpack
deleteGlobs.push(`!${cacheDirectory}/webpack`)
}

await del(deleteGlobs)
} catch (e) {
reporter.error(`Failed to remove .cache files.`, e)
}
Expand All @@ -450,6 +446,9 @@ export async function initialize({
cacheIsCorrupt,
})

// make sure all previous mutexes are released
await releaseAllMutexes()

// in future this should show which plugin's caches are purged
// possibly should also have which plugins had caches
telemetry.decorateEvent(`BUILD_END`, {
Expand Down
Loading

0 comments on commit f2d4830

Please sign in to comment.