Fix: import for large files
lvancraen committed Jun 21, 2019
1 parent 0aa36d1 commit c0d5bf4
Showing 1 changed file with 107 additions and 59 deletions.
166 changes: 107 additions & 59 deletions helpers/import.js
@@ -5,6 +5,9 @@
 const infoLog = require('debug')('gtfsNodeLib:i');
 const fs = require('fs-extra');
 const Papa = require('papaparse');
+const { StringDecoder } = require('string_decoder');
+
+const eachWithLog = require('./logging_iterator_wrapper');
 
 /**
  * Import a table in the GTFS.
@@ -17,8 +20,13 @@ function importTable(gtfs, tableName) {
   const fullPath = `${gtfs.getPath() + tableName}.txt`;
 
   if (fs.existsSync(fullPath)) {
-    const fileContent = fs.readFileSync(fullPath, 'utf8');
-    gtfs._tables.set(tableName, processGtfsTable(gtfs, fileContent, tableName, indexKeys));
+    const fileContent = fs.readFileSync(fullPath);
+    const { keys, rowsSlices } = getKeysAndRowsSlices(
+      fileContent,
+      gtfs._regexPatternObjectsByTableName.get(tableName),
+      tableName
+    );
+    gtfs._tables.set(tableName, processGtfsTable(gtfs, keys, rowsSlices, tableName, indexKeys));
   } else {
     infoLog(`Empty table will be set for table ${tableName} (no input file at path ${gtfs._path}).`);
 
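The switch above from fs.readFileSync(fullPath, 'utf8') to fs.readFileSync(fullPath) is what makes batching possible: without an encoding argument Node returns a raw Buffer rather than one large decoded string. A minimal sketch of the difference, not part of this commit ('stops.txt' is a placeholder path for any GTFS table file):

// Sketch only: with an encoding, the whole file is decoded into one string up front.
const fs = require('fs-extra');

const asString = fs.readFileSync('stops.txt', 'utf8'); // placeholder path
console.log(typeof asString); // 'string'

// Without an encoding, we get raw bytes and can decode them later in bounded slices.
const asBuffer = fs.readFileSync('stops.txt');
console.log(Buffer.isBuffer(asBuffer)); // true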
@@ -38,87 +46,127 @@ function importTable(gtfs, tableName) {
  * Private functions
  */
 
-function processGtfsTable(gtfs, fileContent, tableName, indexKeys) {
-  const parsedFileContent = Papa.parse(fileContent, {
-    delimiter: ',',
-    skipEmptyLines: true,
-  });
-
-  if (parsedFileContent.errors.length) {
-    let errorMessage = `Invalid rows in table ${tableName}:\n`;
-
-    parsedFileContent.errors.forEach((error) => {
-      errorMessage += `Line: ${error.row}
-Issue: ${error.message}
-Row: ${parsedFileContent.data[error.row].join(',')}`;
-    });
-
-    if (gtfs._shouldThrow === true) {
-      throw new Error(errorMessage);
-    }
-  }
-
-  const [keys, ...rows] = parsedFileContent.data;
-
-  const trimmedKeys = keys.map(key => key.trim());
-  checkThatKeysIncludeIndexKeys(trimmedKeys, indexKeys, tableName);
-
-  const GtfsRow = createGtfsClassForKeys(trimmedKeys);
-
-  return processGtfsTableRows(gtfs, tableName, trimmedKeys, rows, indexKeys, GtfsRow);
-}
+function getKeysAndRowsSlices(buffer, regexPatternObjects, tableName) {
+  let keys;
+  const rowsSlices = [];
+  let rowsSlice;
+  let position = 0;
+  const batchLength = 5000000; // 5 MB
+  let merge;
+  /*
+   Use StringDecoder to properly decode UTF-8 characters: characters outside basic ASCII take more
+   than one byte. If the end of a batch cuts one of those characters in two, naive decoding would
+   yield garbled characters. The decoder holds back any incomplete UTF-8 sequence at the end of a
+   batch and prepends it to the next iteration.
+   */
+  const decoder = new StringDecoder('utf8');
+  const rowsSliceRegex = /(.*[\r\n]+)((.*[\r\n]*)*)/;
+
+  while (position < buffer.length) {
+    rowsSlice = decoder.write(buffer.slice(position, Math.min(buffer.length, position + batchLength)));
+
+    if (regexPatternObjects) {
+      regexPatternObjects.forEach(({ regex, pattern }) => {
+        const modifiedRowsSlice = rowsSlice.replace(regex, pattern || '');
+
+        if (modifiedRowsSlice !== rowsSlice) {
+          process.notices.addInfo(__filename, `Applying regex replace to table: "${tableName}". regex: "${regex}".`);
+          rowsSlice = modifiedRowsSlice;
+        }
+      });
+    }
+
+    const rowsSliceIndex = position / batchLength;
+
+    if (!keys) {
+      const [, firstRowSlice, remainingRowsSlice] = rowsSlice.match(rowsSliceRegex);
+      keys = firstRowSlice;
+      rowsSlice = remainingRowsSlice;
+    }
+
+    if (merge) {
+      const [, firstRowSlice, remainingRowsSlice] = rowsSlice.match(rowsSliceRegex);
+      rowsSlices[rowsSliceIndex - 1] += firstRowSlice;
+      rowsSlice = remainingRowsSlice;
+    }
+
+    rowsSlices[rowsSliceIndex] = rowsSlice;
+
+    merge = rowsSlice[rowsSlice.length - 1] !== '\n';
+    position += batchLength;
+  }
+
+  return {
+    keys,
+    rowsSlices,
+  };
+}
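The block comment inside getKeysAndRowsSlices explains the StringDecoder choice, and rowsSliceRegex is what peels the first line off a decoded slice. A standalone sketch of both behaviours, using only Node built-ins (the sample strings are invented, not from the commit):

// Why StringDecoder: a fixed-size batch can cut a multi-byte UTF-8 character in two.
const { StringDecoder } = require('string_decoder');

const bytes = Buffer.from('é'); // two bytes in UTF-8: c3 a9
console.log(bytes.slice(0, 1).toString('utf8')); // '�' (broken character)

const decoder = new StringDecoder('utf8');
console.log(decoder.write(bytes.slice(0, 1))); // '' (dangling byte held back)
console.log(decoder.write(bytes.slice(1, 2))); // 'é' (completed by the next batch)

// What rowsSliceRegex captures: group 1 is the first line with its terminator,
// group 2 is everything after it, possibly ending in a partial row.
const rowsSliceRegex = /(.*[\r\n]+)((.*[\r\n]*)*)/;
const slice = 'stop_id,stop_name\nS1,Central\nS2,Harb';
const [, firstRowSlice, remainingRowsSlice] = slice.match(rowsSliceRegex);
console.log(JSON.stringify(firstRowSlice)); // "stop_id,stop_name\n" -> becomes keys on the first iteration
console.log(JSON.stringify(remainingRowsSlice)); // "S1,Central\nS2,Harb" -> the cut last row is completed by the next slice's first line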

-function processGtfsTableRows(gtfs, tableName, keys, rows, indexKeys, GtfsRow) {
+function processGtfsTable(gtfs, keys, rowsSlices, tableName, indexKeys) {
   let table = (indexKeys.setOfItems) ? new Set() : new Map();
 
-  const regexPatternObjects = gtfs._regexPatternObjectsByTableName.get(tableName);
-
-  rows.forEach((row) => {
-    if (regexPatternObjects) {
-      row = applyRegexPatternObjectsByTableName(regexPatternObjects, keys, row, tableName);
-    }
-
-    const trimmedRow = row.map(value => value.trim());
-    const gtfsRow = new GtfsRow(trimmedRow);
-
-    if (indexKeys.indexKey) {
-      table.set(gtfsRow[indexKeys.indexKey], gtfsRow);
-    } else if (indexKeys.firstIndexKey && indexKeys.secondIndexKey) {
-      if (table.has(gtfsRow[indexKeys.firstIndexKey]) === false) {
-        table.set(gtfsRow[indexKeys.firstIndexKey], new Map());
-      }
-
-      table.get(gtfsRow[indexKeys.firstIndexKey]).set(gtfsRow[indexKeys.secondIndexKey], gtfsRow);
-    } else if (indexKeys.singleton) {
-      table = gtfsRow;
-    } else if (indexKeys.setOfItems) {
-      table.add(gtfsRow);
-    }
-  });
-
-  return table;
-}
-
-function applyRegexPatternObjectsByTableName(regexPatternObjects, keys, row, tableName) {
-  const rowStringified = String(row);
-  let modifiedRowStringified = rowStringified;
-
-  regexPatternObjects.forEach(({ regex, pattern }) => {
-    modifiedRowStringified = rowStringified.replace(regex, pattern || '');
-
-    if (modifiedRowStringified !== rowStringified) {
-      process.notices.addInfo(
-        'Applying Changes on Raw GTFS',
-        `Applying regex replace to table: "${tableName}". regex: "${regex}".`
-      );
-    }
-  });
-
-  const parsedModifiedRow = Papa.parse(`${keys}\n${modifiedRowStringified}`, {
-    delimiter: ',',
-    skipEmptyLines: true,
-  });
-
-  return parsedModifiedRow.data[1];
-}
+  if (rowsSlices === undefined || rowsSlices === null || rowsSlices.length === 0) {
+    return table;
+  }
+
+  const parsedKeys = Papa.parse(keys, { delimiter: ',', skipEmptyLines: true });
+  const trimmedKeys = parsedKeys.data[0].map(key => key.trim());
+  checkThatKeysIncludeIndexKeys(trimmedKeys, indexKeys, tableName);
+
+  const GtfsRow = createGtfsClassForKeys(trimmedKeys);
+  let errorMessage;
+
+  eachWithLog(`Importation:${tableName}`, rowsSlices, (rowsSlice) => {
+    if (!rowsSlice || !rowsSlice.trim) {
+      return;
+    }
+
+    rowsSlice = rowsSlice.trim();
+
+    const parsedRow = Papa.parse(`${keys}${rowsSlice}`, { delimiter: ',', skipEmptyLines: true });
+
+    if (parsedRow.errors.length) {
+      if (!errorMessage) {
+        errorMessage = `Invalid rows in table ${tableName}:\n`;
+      }
+
+      parsedRow.errors.forEach((error) => {
+        errorMessage += `Line: ${error.row}
+Issue: ${error.message}
+Row: ${parsedRow.data[error.row].join(',')}`;
+      });
+    }
+
+    const [, ...rows] = parsedRow.data; // drop the header row; it is already stored in trimmedKeys
+
+    rows.forEach((row) => {
+      const trimmedRow = row.map(value => value.trim());
+      if (trimmedRow !== null) {
+        const item = new GtfsRow(trimmedRow);
+
+        if (indexKeys.indexKey) {
+          table.set(item[indexKeys.indexKey], item);
+        } else if (indexKeys.firstIndexKey && indexKeys.secondIndexKey) {
+          if (table.has(item[indexKeys.firstIndexKey]) === false) {
+            table.set(item[indexKeys.firstIndexKey], new Map());
+          }
+
+          table.get(item[indexKeys.firstIndexKey]).set(item[indexKeys.secondIndexKey], item);
+        } else if (indexKeys.singleton) {
+          table = item;
+        } else if (indexKeys.setOfItems) {
+          table.add(item);
+        }
+      }
+    });
+  });
+
+  if (errorMessage && gtfs._shouldThrow) {
+    throw new Error(errorMessage);
+  }
+
+  return table;
+}

 function checkThatKeysIncludeIndexKeys(sortedKeys, indexKeys, tableName) {

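For reference, a hypothetical sketch of what the new processGtfsTable does with each slice: the header is glued back on so Papa.parse sees consistent field counts, the parsed header row is dropped, and rows are indexed by key. This mirrors only the indexKeys.indexKey case; route_id and the sample values are invented, not from the commit:

// Sketch: re-parse one slice with its header, then index rows by the first field.
const Papa = require('papaparse');

const keys = 'route_id,route_short_name\n'; // as returned by getKeysAndRowsSlices
const rowsSlice = 'R1,10\nR2,11';

const parsed = Papa.parse(`${keys}${rowsSlice}`, { delimiter: ',', skipEmptyLines: true });
const [, ...rows] = parsed.data; // drop the header row

const table = new Map();
rows.forEach(([routeId, shortName]) => table.set(routeId, { routeId, shortName }));

console.log(table.get('R2')); // { routeId: 'R2', shortName: '11' }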