Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feature(gatsby-source-drupal): Use list of UUIDs generated by Drupal to fetch content individually #32131

Closed
wants to merge 3 commits into from
Closed
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
211 changes: 87 additions & 124 deletions packages/gatsby-source-drupal/src/gatsby-node.js
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,8 @@ const requestQueue = require(`fastq`).promise(worker, 20)
const asyncPool = require(`tiny-async-pool`)
const bodyParser = require(`body-parser`)

const REPORTING_RATE_MS = 10000

function gracefullyRethrow(activity, error) {
// activity.panicOnBuild was implemented at some point in gatsby@2
// but plugin can still be used with older version of gatsby core
Expand Down Expand Up @@ -77,7 +79,7 @@ exports.sourceNodes = async (
apiBase = `jsonapi`,
basicAuth = {},
filters,
headers,
headers = {},
params = {},
concurrentFileRequests = 20,
concurrentAPIRequests = 20,
Expand All @@ -95,12 +97,17 @@ exports.sourceNodes = async (
enabledLanguages: [`und`],
translatableEntities: [],
},
useAuthOn = [],
} = pluginOptions
const { createNode, setPluginStatus, touchNode } = actions

// Update the concurrency limit from the plugin options
requestQueue.concurrency = concurrentAPIRequests

if (typeof basicAuth.username === 'string' && typeof basicAuth.password === 'string') {
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is to work around: sindresorhus/got#1169

headers['Authorization'] = `Basic ${Buffer.from(`${basicAuth.username}:${basicAuth.password}`).toString('base64')}`
}

if (webhookBody && Object.keys(webhookBody).length) {
const changesActivity = reporter.activityTimer(
`loading Drupal content changes`,
Expand Down Expand Up @@ -173,8 +180,6 @@ exports.sourceNodes = async (
const res = await requestQueue.push([
urlJoin(baseUrl, `gatsby-fastbuilds/sync/`, lastFetched.toString()),
{
username: basicAuth.username,
password: basicAuth.password,
headers,
searchParams: params,
responseType: `json`,
Expand Down Expand Up @@ -268,143 +273,101 @@ exports.sourceNodes = async (

drupalFetchActivity.start()

let allData
try {
const res = await requestQueue.push([
urlJoin(baseUrl, apiBase),
{
username: basicAuth.username,
password: basicAuth.password,
headers,
searchParams: params,
responseType: `json`,
},
])
allData = await Promise.all(
_.map(res.body.links, async (url, type) => {
const dataArray = []
if (disallowedLinkTypes.includes(type)) return
if (!url) return
if (!type) return

// Lookup this type in our list of language alterable entities.
const isTranslatable = languageConfig.translatableEntities.some(
entityType => entityType === type
)
const listResponse = await worker([
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

you should be using the requestQueue here and other places

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah I figured out how to do that yesterday. Feels weird pushing a return value from a queue.push to a queue.push but here we go :)

urlJoin(baseUrl, 'gatsby/content-list'),
{
headers,
responseType: `json`,
}
])

const getNext = async url => {
if (typeof url === `object`) {
// url can be string or object containing href field
url = url.href

// Apply any filters configured in gatsby-config.js. Filters
// can be any valid JSON API filter query string.
// See https://www.drupal.org/docs/8/modules/jsonapi/filtering
if (typeof filters === `object`) {
if (filters.hasOwnProperty(type)) {
url = new URL(url)
const filterParams = new URLSearchParams(filters[type])
const filterKeys = Array.from(filterParams.keys())
filterKeys.forEach(filterKey => {
// Only add filter params to url if it has not already been
// added.
if (!url.searchParams.has(filterKey)) {
url.searchParams.set(filterKey, filterParams.get(filterKey))
}
})
url = url.toString()
}
}
}
const requestUrls = []
for (let entityTypeAndBundle in listResponse.body) {
if (disallowedLinkTypes.indexOf(entityTypeAndBundle) !== -1) continue

let d
try {
d = await requestQueue.push([
url,
{
username: basicAuth.username,
password: basicAuth.password,
headers,
responseType: `json`,
},
])
} catch (error) {
if (error.response && error.response.statusCode == 405) {
// The endpoint doesn't support the GET method, so just skip it.
return
} else {
console.error(`Failed to fetch ${url}`, error.message)
console.log(error)
throw error
}
}
dataArray.push(...d.body.data)
// Add support for includes. Includes allow entity data to be expanded
// based on relationships. The expanded data is exposed as `included`
// in the JSON API response.
// See https://www.drupal.org/docs/8/modules/jsonapi/includes
if (d.body.included) {
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We're losing "included"

dataArray.push(...d.body.included)
}
if (d.body.links && d.body.links.next) {
await getNext(d.body.links.next)
}
}
const [entityType, entityBundle] = entityTypeAndBundle.split('--')

if (isTranslatable === false) {
await getNext(url)
} else {
for (let i = 0; i < languageConfig.enabledLanguages.length; i++) {
let currentLanguage = languageConfig.enabledLanguages[i]
const urlPath = url.href.split(`${apiBase}/`).pop()
const baseUrlWithoutTrailingSlash = baseUrl.replace(/\/$/, ``)
// The default language's JSON API is at the root.
if (
currentLanguage === getOptions().languageConfig.defaultLanguage ||
baseUrlWithoutTrailingSlash.slice(-currentLanguage.length) ==
currentLanguage
) {
currentLanguage = ``
}
for (let entityUuid of listResponse.body[entityTypeAndBundle]) {
const isTranslatable = languageConfig.translatableEntities.some(
translatableEntityType => translatableEntityType === entityTypeAndBundle
)

const joinedUrl = urlJoin(
baseUrlWithoutTrailingSlash,
currentLanguage,
apiBase,
urlPath
)
const dataForLanguage = await getNext(joinedUrl)
requestUrls.push({
language: '',
entityType,
entityBundle,
entityUuid
})

dataArray.push(...dataForLanguage)
if (isTranslatable) {
for (let language of languageConfig.enabledLanguages) {
if (language !== languageConfig.defaultLanguage) {
requestUrls.push({
language,
entityType,
entityBundle,
entityUuid
})
}
}
}
}
}

const result = {
type,
data: dataArray,
let allData = []
let nRequests = 0
let nCachedRequests = 0
let lastReportRequests = 0
let lastReportTime = Date.now()
let lastFetchedContentType = ''
await asyncPool(concurrentAPIRequests, requestUrls, async ({ language, entityType, entityBundle, entityUuid }) => {
nRequests++

if (lastFetchedContentType !== `${entityType}--${entityBundle}`) {
lastFetchedContentType = `${entityType}--${entityBundle}`
const percentageCompleted = ((nRequests / requestUrls.length) * 100).toFixed(2)
const cacheHitRate = ((nCachedRequests / nRequests) * 100).toFixed(2)
reporter.info(`Starting ${lastFetchedContentType} (${percentageCompleted}%) (CHR: ${cacheHitRate}%)`)
}

if ((Date.now() - lastReportTime) >= REPORTING_RATE_MS) {
const nRequestsDoneThisPeriod = nRequests - lastReportRequests
const requestsPerSecond = Math.floor(nRequestsDoneThisPeriod / (REPORTING_RATE_MS / 1000))
const percentageCompleted = ((nRequests / requestUrls.length) * 100).toFixed(2)
const cacheHitRate = ((nCachedRequests / nRequests) * 100).toFixed(2)
reporter.info(`${requestsPerSecond}rps (${nRequests}/${requestUrls.length}) (${percentageCompleted}%) (CHR: ${cacheHitRate}%)`)

lastReportTime = Date.now()
lastReportRequests = nRequests
}

const shouldUseAuth = useAuthOn.indexOf(`${entityType}--${entityBundle}`) !== -1
try {
const entityResponse = await worker([
urlJoin(baseUrl, language, apiBase, `/${entityType}/${entityBundle}/${entityUuid}`),
{
headers: shouldUseAuth ? headers : undefined,
responseType: `json`,
}
])

// eslint-disable-next-line consistent-return
return result
})
)
} catch (e) {
gracefullyRethrow(drupalFetchActivity, e)
return
}
if (parseInt(entityResponse.headers.age) > 0) {
nCachedRequests++
}

allData.push(entityResponse.body.data)
} catch (err) {}
})

drupalFetchActivity.end()

const nodes = new Map()

// first pass - create basic nodes
_.each(allData, contentType => {
if (!contentType) return
_.each(contentType.data, datum => {
if (!datum) return
const node = nodeFromData(datum, createNodeId, entityReferenceRevisions)
nodes.set(node.id, node)
})
_.each(allData, datum => {
if (!datum) return
const node = nodeFromData(datum, createNodeId, entityReferenceRevisions)
nodes.set(node.id, node)
})

// second pass - handle relationships and back references
Expand Down