Fix/import export with large files (#16)
lvancraen committed Jun 21, 2019
2 parents d350453 + 9dc268d commit 7054a62
Showing 5 changed files with 239 additions and 122 deletions.
167 changes: 107 additions & 60 deletions helpers/export.js
@@ -64,16 +64,116 @@ function copyUntouchedTable(inputPath, outputPath, tableName, callback) {
}

function exportTable(tableName, gtfs, outputPath, callback) {
const csv = processGtfsTable(tableName, gtfs);

const actualKeys = gtfs.getActualKeysForTable(tableName);
const firstRow = `${actualKeys.join(',')}`;
const outputFullPath = `${outputPath + tableName}.txt`;
fs.writeFile(outputFullPath, csv, (error) => {
if (error) {
throw error;

fs.writeFile(outputFullPath, firstRow, (writeError) => {
if (writeError) {
throw writeError;
}

const indexKeys = gtfs._schema.indexKeysByTableName[tableName];

if (indexKeys.singleton) {
let item = gtfs.getIndexedTable(tableName);
if (!item) {
callback();
return;
}

if (gtfs._preExportItemFunction) {
item = gtfs._preExportItemFunction(item, tableName);
}

const formattedGtfsRowValues = getObjectValuesUsingKeyOrdering(item, actualKeys);
const row = Papa.unparse({
fields: actualKeys,
data: formattedGtfsRowValues,
},
{
header: false,
});

fs.appendFile(outputFullPath, `\r\n${row}`, callback);
return;
}

infoLog(`[${getHHmmss()}] Table has been exported: ${tableName}`);
callback();
const deepness = gtfs._schema.deepnessByTableName[tableName];

let rows = [];

/*
Why the iteratee passed to async.eachSeries is declared `async`:
When the iteratee completes synchronously (it only calls its callback, which happens when many
objects produce no rows), async.eachSeries can blow the call stack
(RangeError: Maximum call stack size exceeded). This is a known limitation of the async library,
left unfixed for performance reasons (see "Common Pitfalls" - https://caolan.github.io/async/v3/).
Declaring the iteratee as an ES2017 async function works around it: async functions always resolve
on a later tick, which prevents the stack overflow and keeps "Zalgo" (mixed synchronous/asynchronous
behaviour) contained. More on Zalgo: https://blog.izs.me/2013/08/designing-apis-for-asynchrony
(A minimal standalone sketch of this pattern follows the function below.)
*/
async.eachSeries(gtfs.getIndexedTable(tableName), async ([key, object]) => {
if (deepness === 0 || deepness === 1) {
if (gtfs._preExportItemFunction) {
object = gtfs._preExportItemFunction(object, tableName, key);
}

const formattedGtfsRowValues = getObjectValuesUsingKeyOrdering(object, actualKeys);
const row = Papa.unparse({
fields: actualKeys,
data: formattedGtfsRowValues,
},
{
header: false,
});

rows.push(`\r\n${row}`);
} else if (deepness === 2) {
object.forEach((subObject, subKey) => {
if (gtfs._preExportItemFunction) {
subObject = gtfs._preExportItemFunction(subObject, tableName, key, subKey);
}

const formattedGtfsRowValues = getObjectValuesUsingKeyOrdering(subObject, actualKeys);
const row = Papa.unparse({
fields: actualKeys,
data: formattedGtfsRowValues,
},
{
header: false,
});

rows.push(`\r\n${row}`);
});
}

if (rows.length < 100) {
return;
}

await fs.appendFile(outputFullPath, rows.join(''));
rows = [];
}, (asyncEachSeriesError) => {
if (asyncEachSeriesError) {
throw asyncEachSeriesError;
}

if (rows.length === 0) {
infoLog(`[${getHHmmss()}] Table has been exported: ${tableName}`);
callback();
return;
}

fs.appendFile(outputFullPath, rows.join(''), (appendingError) => {
if (appendingError) { throw appendingError; }

infoLog(`[${getHHmmss()}] Table has been exported: ${tableName}`);
callback();
});
});
});
}
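
For readers outside the diff, a minimal standalone sketch of the pattern the new exportTable uses: buffer serialized rows in memory and flush them to disk in batches, with the iteratee declared async as explained in the comment above. The helper name writeRowsInBatches and the batch size are illustrative, not part of the commit; it assumes the async library and fs-extra's promise-returning appendFile.

const async = require('async');
const fs = require('fs-extra');

// Illustrative sketch only (not part of the commit): append `rows` to `filePath`
// in batches, flushing whenever the in-memory buffer reaches `batchSize` rows.
function writeRowsInBatches(filePath, rows, batchSize, callback) {
  let buffer = [];

  // The iteratee is declared `async`, so it always resolves on a later tick and
  // cannot blow the call stack even when most iterations do no I/O.
  async.eachSeries(rows, async (row) => {
    buffer.push(`\r\n${row}`);

    if (buffer.length < batchSize) {
      return;
    }

    await fs.appendFile(filePath, buffer.join(''));
    buffer = [];
  }, (error) => {
    if (error) {
      callback(error);
      return;
    }

    if (buffer.length === 0) {
      callback();
      return;
    }

    // Flush whatever is left in the buffer before signalling completion.
    fs.appendFile(filePath, buffer.join(''), callback);
  });
}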

@@ -96,59 +96,6 @@ function getObjectValuesUsingKeyOrdering(object, keys) {
});
}

function processGtfsTable(tableName, gtfs) {
let itemMap = gtfs.getIndexedTable(tableName);
if (!itemMap) {
return undefined;
}

const actualKeys = gtfs.getActualKeysForTable(tableName);
const indexKeys = gtfs._schema.indexKeysByTableName[tableName];
const deepness = gtfs._schema.deepnessByTableName[tableName];
const itemValues = [];

if (indexKeys.singleton) {
if (gtfs._preExportItemFunction) {
itemMap = gtfs._preExportItemFunction(itemMap, tableName);
}

const formattedGtfsRowValues = getObjectValuesUsingKeyOrdering(itemMap, actualKeys);
itemValues.push(formattedGtfsRowValues);

return Papa.unparse({
fields: actualKeys,
data: itemValues,
});
}

itemMap.forEach((gtfsRowObjectOrMap, key) => {
if (deepness === 0 || deepness === 1) {
if (gtfs._preExportItemFunction) {
gtfsRowObjectOrMap = gtfs._preExportItemFunction(gtfsRowObjectOrMap, tableName, key);
}

const formattedGtfsRowValues = getObjectValuesUsingKeyOrdering(gtfsRowObjectOrMap, actualKeys);
itemValues.push(formattedGtfsRowValues);
}

if (deepness === 2) {
gtfsRowObjectOrMap.forEach((gtfsRowObject, subKey) => {
if (gtfs._preExportItemFunction) {
gtfsRowObject = gtfs._preExportItemFunction(gtfsRowObject, tableName, key, subKey);
}

const formattedGtfsRowValues = getObjectValuesUsingKeyOrdering(gtfsRowObject, actualKeys);
itemValues.push(formattedGtfsRowValues);
});
}
});

return Papa.unparse({
fields: actualKeys,
data: itemValues,
});
}
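
To make the change in serialization concrete: the removed processGtfsTable above built the whole file with a single Papa.unparse call over every row, while the new exportTable writes the header line itself and serializes each row with header: false so it can be appended incrementally. A small sketch of the new per-row form, with made-up keys and item:

const Papa = require('papaparse');

// Illustrative sketch only: serialize one item in the column order of `actualKeys`.
const actualKeys = ['stop_id', 'stop_name'];
const headerLine = actualKeys.join(',');            // written once with fs.writeFile

const item = { stop_name: 'Gare du Nord', stop_id: 'S1' };
const orderedValues = actualKeys.map(key => (item[key] === undefined ? '' : item[key]));

const row = Papa.unparse(
  { fields: actualKeys, data: [orderedValues] },
  { header: false }                                 // keys are already in the file
);

console.log(headerLine); // stop_id,stop_name
console.log(row);        // S1,Gare du Nord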

/**
* Public function
*/
165 changes: 106 additions & 59 deletions helpers/import.js
@@ -5,6 +5,9 @@
const infoLog = require('debug')('gtfsNodeLib:i');
const fs = require('fs-extra');
const Papa = require('papaparse');
const { StringDecoder } = require('string_decoder');

const eachWithLog = require('./logging_iterator_wrapper');

/**
* Import a table in the GTFS.
Expand All @@ -17,8 +20,13 @@ function importTable(gtfs, tableName) {
const fullPath = `${gtfs.getPath() + tableName}.txt`;

if (fs.existsSync(fullPath)) {
const fileContent = fs.readFileSync(fullPath, 'utf8');
gtfs._tables.set(tableName, processGtfsTable(gtfs, fileContent, tableName, indexKeys));
const fileContent = fs.readFileSync(fullPath);
const { keys, rowsSlices } = getKeysAndRowsSlices(
fileContent,
gtfs._regexPatternObjectsByTableName.get(tableName),
tableName
);
gtfs._tables.set(tableName, processGtfsTable(gtfs, keys, rowsSlices, tableName, indexKeys));
} else {
infoLog(`Empty table will be set for table ${tableName} (no input file at path ${gtfs._path}).`);

@@ -38,87 +38,126 @@
* Private functions
*/

function processGtfsTable(gtfs, fileContent, tableName, indexKeys) {
const parsedFileContent = Papa.parse(fileContent, {
delimiter: ',',
skipEmptyLines: true,
});
function getKeysAndRowsSlices(buffer, regexPatternObjects, tableName) {
let keys;
const rowsSlices = [];
let rowsSlice;
let position = 0;
const batchLength = 5000000; // 5 MB per batch
let merge;
/*
Use a StringDecoder to decode UTF-8 correctly: characters outside basic ASCII take more than
one byte, so a batch boundary can cut such a character in half and the raw bytes would decode
to garbage. The decoder buffers any incomplete UTF-8 sequence left at the end of a batch and
emits it with the next one. (See the sketch after this function.)
*/
const decoder = new StringDecoder('utf8');
const rowsSliceRegex = /(.*[\r\n]+)((.*[\r\n]*)*)/;

while (position < buffer.length) {
rowsSlice = decoder.write(buffer.slice(position, Math.min(buffer.length, position + batchLength)));

if (parsedFileContent.errors.length) {
let errorMessage = `Invalid rows in table ${tableName}:\n`;
if (regexPatternObjects) {
regexPatternObjects.forEach(({ regex, pattern }) => {
const modifiedRowsSlice = rowsSlice.replace(regex, pattern || '');

if (modifiedRowsSlice !== rowsSlice) {
process.notices.addInfo(__filename, `Applying regex replace to table: "${tableName}". regex: "${regex}".`);
rowsSlice = modifiedRowsSlice;
}
});
}

parsedFileContent.errors.forEach((error) => {
errorMessage += `Line: ${error.row}
Issue: ${error.message}
Row: ${parsedFileContent.data[error.row].join(',')}`;
});
const rowsSliceIndex = position / batchLength;

if (gtfs._shouldThrow === true) {
throw new Error(errorMessage);
if (!keys) {
const [, firstRowSlice, remainingRowsSlice] = rowsSlice.match(rowsSliceRegex);
keys = firstRowSlice;
rowsSlice = remainingRowsSlice;
}
}

const [keys, ...rows] = parsedFileContent.data;
if (merge) {
const [, firstRowSlice, remainingRowsSlice] = rowsSlice.match(rowsSliceRegex);
rowsSlices[rowsSliceIndex - 1] += firstRowSlice;
rowsSlice = remainingRowsSlice;
}

const trimmedKeys = keys.map(key => key.trim());
checkThatKeysIncludeIndexKeys(trimmedKeys, indexKeys, tableName);
rowsSlices[rowsSliceIndex] = rowsSlice;

const GtfsRow = createGtfsClassForKeys(trimmedKeys);
merge = rowsSlices[rowsSlice.length] !== '\n';
position += batchLength;
}

return processGtfsTableRows(gtfs, tableName, trimmedKeys, rows, indexKeys, GtfsRow);
return {
keys,
rowsSlices,
};
}
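
Outside the diff, a short sketch of the two building blocks of getKeysAndRowsSlices: the StringDecoder that keeps multi-byte UTF-8 characters intact across slice boundaries, and the first-line regex used to peel the header off the first slice and to re-attach a row that a boundary cut in half. All sample strings are invented.

const { StringDecoder } = require('string_decoder');

// 1) "é" is two bytes in UTF-8 (0xC3 0xA9). If a slice boundary falls between
//    them, decoder.write() holds the first byte back and emits the complete
//    character once the second byte arrives with the next slice.
const decoder = new StringDecoder('utf8');
const bytes = Buffer.from('S1,Café', 'utf8');
const cut = bytes.length - 1;                      // cut inside the two-byte "é"
const part1 = decoder.write(bytes.slice(0, cut));  // 'S1,Caf'
const part2 = decoder.write(bytes.slice(cut));     // 'é'
console.log(part1 + part2);                        // 'S1,Café'

// 2) The first-line regex splits a slice into its first line (with its newline)
//    and the rest, so a row split across two slices can be glued back together.
const rowsSliceRegex = /(.*[\r\n]+)((.*[\r\n]*)*)/;
const previousSlice = 'S1,Gare du Nord\nS2,Gare de l';  // ends mid-row
const currentSlice = 'a Chapelle\nS3,Gare de Lyon\n';

const [, firstLine, remainingLines] = currentSlice.match(rowsSliceRegex);
console.log(previousSlice + firstLine); // 'S1,Gare du Nord\nS2,Gare de la Chapelle\n'
console.log(remainingLines);            // 'S3,Gare de Lyon\n'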

function processGtfsTableRows(gtfs, tableName, keys, rows, indexKeys, GtfsRow) {
function processGtfsTable(gtfs, keys, rowsSlices, tableName, indexKeys) {
let table = (indexKeys.setOfItems) ? new Set() : new Map();

const regexPatternObjects = gtfs._regexPatternObjectsByTableName.get(tableName);

rows.forEach((row) => {
if (regexPatternObjects) {
row = applyRegexPatternObjectsByTableName(regexPatternObjects, keys, row, tableName);
}
if (!rowsSlices || rowsSlices.length === 0) {
return table;
}

const trimmedRow = row.map(value => value.trim());
const gtfsRow = new GtfsRow(trimmedRow);
const parsedKeys = Papa.parse(keys, { delimiter: ',', skipEmptyLines: true });
const trimmedKeys = parsedKeys.data[0].map(key => key.trim());
checkThatKeysIncludeIndexKeys(trimmedKeys, indexKeys, tableName);

if (indexKeys.indexKey) {
table.set(gtfsRow[indexKeys.indexKey], gtfsRow);
} else if (indexKeys.firstIndexKey && indexKeys.secondIndexKey) {
if (table.has(gtfsRow[indexKeys.firstIndexKey]) === false) {
table.set(gtfsRow[indexKeys.firstIndexKey], new Map());
}
const GtfsRow = createGtfsClassForKeys(trimmedKeys);
let errorMessage;

table.get(gtfsRow[indexKeys.firstIndexKey]).set(gtfsRow[indexKeys.secondIndexKey], gtfsRow);
} else if (indexKeys.singleton) {
table = gtfsRow;
} else if (indexKeys.setOfItems) {
table.add(gtfsRow);
eachWithLog(`Importation:${tableName}`, rowsSlices, (rowsSlice) => {
if (!rowsSlice || !rowsSlice.trim) {
return;
}
});

return table;
}
rowsSlice = rowsSlice.trim();

function applyRegexPatternObjectsByTableName(regexPatternObjects, keys, row, tableName) {
const rowStringified = String(row);
let modifiedRowStringified = rowStringified;
const parsedRow = Papa.parse(`${keys}${rowsSlice}`, { delimiter: ',', skipEmptyLines: true });

regexPatternObjects.forEach(({ regex, pattern }) => {
modifiedRowStringified = rowStringified.replace(regex, pattern || '');
if (parsedRow.errors.length) {
if (!errorMessage) {
errorMessage = `Invalid rows in table ${tableName}:\n`;
}

if (modifiedRowStringified !== rowStringified) {
process.notices.addInfo(
'Applying Changes on Raw GTFS',
`Applying regex replace to table: "${tableName}". regex: "${regex}".`
);
parsedRow.errors.forEach((error) => {
errorMessage += `Line: ${error.row}
Issue: ${error.message}
Row: ${parsedRow.data[error.row].join(',')}`;
});
}
});

const parsedModifiedRow = Papa.parse(`${keys}\n${modifiedRowStringified}`, {
delimiter: ',',
for (let i = 1; i < parsedRow.data.length; i += 1) { // index 0 holds the header keys; only the data rows go into the table
const row = parsedRow.data[i];
const trimmedRow = row.map(value => value.trim());
if (trimmedRow !== null) {
const item = new GtfsRow(trimmedRow);

if (indexKeys.indexKey) {
table.set(item[indexKeys.indexKey], item);
} else if (indexKeys.firstIndexKey && indexKeys.secondIndexKey) {
if (table.has(item[indexKeys.firstIndexKey]) === false) {
table.set(item[indexKeys.firstIndexKey], new Map());
}

table.get(item[indexKeys.firstIndexKey]).set(item[indexKeys.secondIndexKey], item);
} else if (indexKeys.singleton) {
table = item;
} else if (indexKeys.setOfItems) {
table.add(item);
}
}
}
});

return parsedModifiedRow.data[1];
if (errorMessage && gtfs._shouldThrow) {
throw new Error(errorMessage);
}

return table;
}
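
A compact sketch of how processGtfsTable turns one rows slice into table entries: the header keys are prepended so Papa.parse always sees a complete CSV document, row 0 is skipped, and each remaining row is indexed into a Map. The sample data and the single index key (stop_id) are assumptions for illustration.

const Papa = require('papaparse');

// Illustrative sketch only: parse one slice with the header prepended and index rows by stop_id.
const keys = 'stop_id,stop_name\n';                   // first line of the file, kept with its newline
const rowsSlice = 'S1,Gare du Nord\nS2,Gare de Lyon'; // one decoded, trimmed slice

const parsed = Papa.parse(`${keys}${rowsSlice}`, { delimiter: ',', skipEmptyLines: true });

const table = new Map();
for (let i = 1; i < parsed.data.length; i += 1) {     // index 0 holds the keys
  const [stopId, stopName] = parsed.data[i].map(value => value.trim());
  table.set(stopId, { stop_id: stopId, stop_name: stopName });
}

console.log(table.get('S2').stop_name); // Gare de Lyon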

function checkThatKeysIncludeIndexKeys(sortedKeys, indexKeys, tableName) {
