Skip to content

Commit

Permalink
Refactor: re-implement csv import using papa parse
Browse files Browse the repository at this point in the history
  • Loading branch information
lvancraen committed Apr 17, 2019
1 parent fdf5255 commit bff1f39
Showing 1 changed file with 83 additions and 93 deletions.
176 changes: 83 additions & 93 deletions helpers/import.js
Original file line number Diff line number Diff line change
Expand Up @@ -4,27 +4,21 @@

const infoLog = require('debug')('gtfsNodeLib:i');
const fs = require('fs-extra');
const { StringDecoder } = require('string_decoder');

const eachWithLog = require('./logging_iterator_wrapper');
const { fromCsvStringToArray } = require('./csv');
const Papa = require('papaparse');

/**
* Import a table in the GTFS.
*
* @param {Gtfs} gtfs The GTFS in which to import the table.
* @param {string} tableName The table of the name to import.
*/

exports.importTable = (gtfs, tableName) => {
function importTable(gtfs, tableName) {
const indexKeys = gtfs._schema.indexKeysByTableName[tableName];
const fullPath = `${gtfs.getPath() + tableName}.txt`;

if (fs.existsSync(fullPath)) {
const fileContent = fs.readFileSync(fullPath);
const rows = getRows(fileContent, gtfs._regexPatternObjectsByTableName.get(tableName), tableName);

gtfs._tables.set(tableName, processRows(gtfs, tableName, indexKeys, rows, gtfs._shouldThrow));
const fileContent = fs.readFileSync(fullPath, 'utf8');
gtfs._tables.set(tableName, processGtfsTable(gtfs, fileContent, tableName, indexKeys));
} else {
infoLog(`Empty table will be set for table ${tableName} (no input file at path ${gtfs._path}).`);

Expand All @@ -38,93 +32,22 @@ exports.importTable = (gtfs, tableName) => {
gtfs.forEachItemInTable(tableName, gtfs._postImportItemFunction);
}
}
};
}

/**
* Private functions
*/

function getRows(buffer, regexPatternObjects, tableName) {
const rows = [];
let rowsSlice;
let position = 0;
const batchLength = 50000;
let merge;
/*
Use string decoder to properly decode utf8 characters. Characters not in the basic ASCII take more
than one byte.
If the end of the batch cuts one of those characters, then we will yield weird characters.
decoder will accumulate any "lost" utf8 character at the end of the batch and accumulate it for the next
iteration.
*/
const decoder = new StringDecoder('utf8');

while (position < buffer.length) {
rowsSlice = decoder.write(buffer.slice(position, Math.min(buffer.length, position + batchLength)));

if (regexPatternObjects) {
regexPatternObjects.forEach(({ regex, pattern }) => {
const modifiedRowsSlice = rowsSlice.replace(regex, pattern || '');

if (modifiedRowsSlice !== rowsSlice) {
process.notices.addInfo(__filename, `Applying regex replace to table: "${tableName}". regex: "${regex}".`);
rowsSlice = modifiedRowsSlice;
}
});
}

rowsSlice.split('\n').forEach((row, i) => {
if (i === 0 && merge) {
rows[rows.length - 1] += row;
} else {
rows.push(row);
}
});

merge = rowsSlice[rowsSlice.length] !== '\n';
position += batchLength;
}

return rows;
}

function processRows(gtfs, tableName, indexKeys, rows, shouldThrow) {
function processGtfsTable(gtfs, fileContent, tableName, indexKeys) {
let table = (indexKeys.setOfItems) ? new Set() : new Map();

if (rows === undefined || rows === null || rows.length === 0) {
return table;
}

const sortedKeys = fromCsvStringToArray(rows[0], tableName).map(key => key.trim());

const GtfsRow = createGtfsClassForKeys(sortedKeys);

checkThatKeysIncludeIndexKeys(sortedKeys, indexKeys, tableName);

eachWithLog(`Importation:${tableName}`, rows, (row, index) => {
if (index === 0 || !row || !row.trim) {
return;
}

row = row.trim();

if (row.length === 0) {
return;
}

const arrayOfValues = fromCsvStringToArray(row, tableName, gtfs).map(key => key.trim());

if (arrayOfValues !== null) {
const item = new GtfsRow(arrayOfValues);

if (sortedKeys.length !== arrayOfValues.length) {
if (shouldThrow === true) {
throw new Error(`Invalid raw in table ${tableName}: ${JSON.stringify(item)}`);
}

process.notices.addWarning(__filename, `Row not valid in table: ${JSON.stringify(item)}`);
Papa.parse(fileContent, {
delimiter: ',',
header: true,
skipEmptyLines: true,
step: (row) => { // streams the CSV by row
const item = processGtfsTableRow(gtfs, tableName, row, indexKeys);
if (!item) {
return;
}

Expand All @@ -141,12 +64,74 @@ function processRows(gtfs, tableName, indexKeys, rows, shouldThrow) {
} else if (indexKeys.setOfItems) {
table.add(item);
}
},
});

return table;
}

function processGtfsTableRow(gtfs, tableName, row, indexKeys) {
let processedRow = JSON.parse(JSON.stringify(row));
const rowAsCsv = Papa.unparse(processedRow);

const errorsInRow = processedRow.errors;
if (errorsInRow.length) {
let errorMessage = `Invalid row in table ${tableName}:
Line: ${errorsInRow[0].row}
${rowAsCsv}\n\n`;
errorsInRow.forEach((error) => {
errorMessage += `Issue: ${error.message}`;
});

const errorTypes = new Set(errorsInRow.map(error => error.type));
if (gtfs._shouldThrow === true && !errorTypes.has('FieldMismatch')) {
throw new Error(errorMessage);
}

rows[index] = undefined;
errorMessage += '\nError in CSV was fixed by parser.';
process.notices.addWarning('Invalid CSV', errorMessage);
processedRow = Papa.parse(rowAsCsv, { // fix FieldMismatch errors (TooFewFields / TooManyFields)
delimiter: ',',
header: true,
});
}

const regexPatternObjects = gtfs._regexPatternObjectsByTableName.get(tableName);
if (regexPatternObjects) {
processedRow = applyRegexPatternObjectsByTableName(regexPatternObjects, rowAsCsv, processedRow, tableName);
}

const rowObject = {};
for (const [field, value] of Object.entries(processedRow.data[0])) {
rowObject[field.trim()] = value.trim();
}

checkThatKeysIncludeIndexKeys(Object.keys(rowObject), indexKeys, tableName);

return createGtfsObjectFromSimpleObject(rowObject);
}

function applyRegexPatternObjectsByTableName(regexPatternObjects, rowAsCsv, row, tableName) {
let modifiedRowAsCsv;
let modifiedRow = JSON.parse(JSON.stringify(row));

regexPatternObjects.forEach(({ regex, pattern }) => {
modifiedRowAsCsv = rowAsCsv.replace(regex, pattern || '');

if (modifiedRowAsCsv !== rowAsCsv) {
process.notices.addInfo(
'Applying Changes on Raw GTFS',
`Applying regex replace to table: "${tableName}". regex: "${regex}".`
);
modifiedRow = Papa.parse(modifiedRowAsCsv, {
delimiter: ',',
header: true,
});
}
});

return table;
return modifiedRow;
}

function checkThatKeysIncludeIndexKeys(sortedKeys, indexKeys, tableName) {
Expand Down Expand Up @@ -235,7 +220,12 @@ function createGtfsClassForKeys(sortedKeys) {
return GtfsRow;
}

exports.createGtfsObjectFromSimpleObject = (obj) => {
function createGtfsObjectFromSimpleObject(obj) {
const GtfsRow = createGtfsClassForKeys(Object.keys(obj));
return new GtfsRow(Object.values(obj));
}

module.exports = {
createGtfsObjectFromSimpleObject,
importTable,
};

0 comments on commit bff1f39

Please sign in to comment.