Navigate Pipeline via Pipes instead of Blocks #491

Merged on Jan 15, 2024 (14 commits; showing changes from 12 commits)
29 changes: 12 additions & 17 deletions example/gtfs-static.jv
@@ -14,94 +14,89 @@ pipeline GtfsPipeline {
   // 2. The origin for multiple pipe sequences is a zip
   // file. Each csv file in this zip is further processed
   // by its own sequence of blocks and pipes.
-  GTFSSampleFeedExtractor -> ZipArchiveInterpreter;

-  ZipArchiveInterpreter
+  GTFSSampleFeedExtractor
     -> AgencyFilePicker
     -> AgencyTextFileInterpreter
     -> AgencyCSVInterpreter
     -> AgencyTableInterpreter
     -> AgencyLoader;

-  ZipArchiveInterpreter
+  GTFSSampleFeedExtractor
     -> CalendarDatesFilePicker
     -> CalendarDatesTextFileInterpreter
     -> CalendarDatesCSVInterpreter
     -> CalendarDatesTableInterpreter
     -> CalendarDatesLoader;

-  ZipArchiveInterpreter
+  GTFSSampleFeedExtractor
     -> CalendarFilePicker
     -> CalendarTextFileInterpreter
     -> CalendarCSVInterpreter
     -> CalendarTableInterpreter
     -> CalendarLoader;

-  ZipArchiveInterpreter
+  GTFSSampleFeedExtractor
     -> FareAttributesFilePicker
     -> FareAttributesTextFileInterpreter
     -> FareAttributesCSVInterpreter
     -> FareAttributesTableInterpreter
     -> FareAttributesLoader;

-  ZipArchiveInterpreter
+  GTFSSampleFeedExtractor
     -> FareRulesFilePicker
     -> FareRulesTextFileInterpreter
     -> FareRulesCSVInterpreter
     -> FareRulesTableInterpreter
     -> FareRulesLoader;

-  ZipArchiveInterpreter
+  GTFSSampleFeedExtractor
     -> FrequenciesFilePicker
     -> FrequenciesTextFileInterpreter
     -> FrequenciesCSVInterpreter
     -> FrequenciesTableInterpreter
     -> FrequenciesLoader;

-  ZipArchiveInterpreter
+  GTFSSampleFeedExtractor
     -> RoutesFilePicker
     -> RoutesTextFileInterpreter
     -> RoutesCSVInterpreter
     -> RoutesTableInterpreter
     -> RoutesLoader;

-  ZipArchiveInterpreter
+  GTFSSampleFeedExtractor
     -> ShapesFilePicker
     -> ShapesTextFileInterpreter
     -> ShapesCSVInterpreter
     -> ShapesTableInterpreter
     -> ShapesLoader;

-  ZipArchiveInterpreter
+  GTFSSampleFeedExtractor
     -> StopTimesFilePicker
     -> StopTimesTextFileInterpreter
     -> StopTimesCSVInterpreter
     -> StopTimesTableInterpreter
     -> StopTimesLoader;

-  ZipArchiveInterpreter
+  GTFSSampleFeedExtractor
     -> StopsFilePicker
     -> StopsTextFileInterpreter
     -> StopsCSVInterpreter
     -> StopsTableInterpreter
     -> StopsLoader;

-  ZipArchiveInterpreter
+  GTFSSampleFeedExtractor
     -> TripsFilePicker
     -> TripsTextFileInterpreter
     -> TripsCSVInterpreter
     -> TripsTableInterpreter
     -> TripsLoader;

   // 3. As a first step, we download the zip file and interpret it.
-  block GTFSSampleFeedExtractor oftype HttpExtractor {
+  block GTFSSampleFeedExtractor oftype GTFSExtractor {
Contributor

I know this is not part of the PR, but this seems very tedious. We could enhance the GTFSExtractor in the std lib so that it also picks a file and parses it as CSV, maybe as a composite block inside a composite block.

The new block could be called GTFSFileExtractor and take a url and a path as properties. It would use the existing GTFSExtractor with the url and a FilePicker with the path, then parse that file as CSV. This would save 3 blocks and 3 pipeline lines for every file (so 33 fewer blocks and 33 fewer lines in the pipeline itself!).

Member Author

If you don't mind, I'd leave it as it is for now.

But your example looks interesting, also with regard to optimizations. In the best case, we would also reuse the results of blocks that were already executed instead of running them multiple times. I'm not sure whether that would be the case in the current implementation, so I'd leave it open for a follow-up issue.

Contributor

Yeah, I agree on all counts, it's not in the scope of this PR anyway.
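
The reuse question raised in this thread can be illustrated outside of Jayvee. The following is a minimal TypeScript sketch, not the interpreter's actual code: `BlockSpec`, `executeWithReuse`, and the block names are hypothetical stand-ins. It caches each block's output so that an extractor shared by several file-specific branches runs only once.

```typescript
// Hypothetical stand-in types; this is NOT the Jayvee execution API.
type BlockName = string;

interface BlockSpec {
  // Names of the upstream blocks whose output this block consumes.
  parents: BlockName[];
  // Produces this block's output from its parents' outputs.
  run: (inputs: string[]) => string;
}

// Executes `target` and everything upstream of it. Each block's output is
// cached, so a block shared by several branches runs at most once.
function executeWithReuse(
  blocks: Map<BlockName, BlockSpec>,
  target: BlockName,
  cache: Map<BlockName, string>,
  runCounts: Map<BlockName, number>,
): string {
  const cached = cache.get(target);
  if (cached !== undefined) {
    return cached;
  }
  const spec = blocks.get(target);
  if (spec === undefined) {
    throw new Error(`Unknown block: ${target}`);
  }
  const inputs = spec.parents.map((parent) =>
    executeWithReuse(blocks, parent, cache, runCounts),
  );
  const output = spec.run(inputs);
  cache.set(target, output);
  runCounts.set(target, (runCounts.get(target) ?? 0) + 1);
  return output;
}

// One shared extractor feeding three file-specific branches.
const blocks = new Map<BlockName, BlockSpec>([
  ['Extractor', { parents: [], run: () => 'zip' }],
  ['AgencyPicker', { parents: ['Extractor'], run: ([zip]) => `${zip}/agency.txt` }],
  ['RoutesPicker', { parents: ['Extractor'], run: ([zip]) => `${zip}/routes.txt` }],
  ['StopsPicker', { parents: ['Extractor'], run: ([zip]) => `${zip}/stops.txt` }],
]);

const cache = new Map<BlockName, string>();
const runCounts = new Map<BlockName, number>();
const outputs = ['AgencyPicker', 'RoutesPicker', 'StopsPicker'].map((name) =>
  executeWithReuse(blocks, name, cache, runCounts),
);
```

In this sketch `runCounts.get('Extractor')` ends up at 1 although three branches consume the extractor. Whether the current Jayvee implementation behaves this way for nested composite blocks is the open question left for the follow-up issue.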

     url: "https://developers.google.com/static/transit/gtfs/examples/sample-feed.zip";
   }

-  block ZipArchiveInterpreter oftype ArchiveInterpreter {
-    archiveType: "zip";
-  }

   // 4. Next, we pick several csv files (with the file extension ".txt")
   // for further processing .
   block AgencyFilePicker oftype FilePicker {
25 changes: 18 additions & 7 deletions libs/execution/src/lib/blocks/block-execution-util.ts
@@ -4,7 +4,9 @@

 import {
   BlockDefinition,
-  collectParents,
+  CompositeBlocktypeDefinition,
+  PipelineDefinition,
+  PipelineWrapper,
 } from '@jvalue/jayvee-language-server';

 import { ExecutionContext } from '../execution-context';
@@ -31,18 +33,27 @@ export interface ExecutionOrderItem {
  */
 export async function executeBlocks(
   executionContext: ExecutionContext,
-  executionOrder: ExecutionOrderItem[],
+  pipesContainer: CompositeBlocktypeDefinition | PipelineDefinition,
   initialInputValue: IOTypeImplementation | undefined = undefined,
 ): Promise<R.Result<ExecutionOrderItem[]>> {
+  const pipelineWrapper = new PipelineWrapper(pipesContainer);
+  const executionOrder: {
+    block: BlockDefinition;
+    value: IOTypeImplementation | null;
+  }[] = pipelineWrapper.getBlocksInTopologicalSorting().map((block) => {
+    return { block: block, value: NONE };
+  });
+
   let isFirstBlock = true;

   for (const blockData of executionOrder) {
     const block = blockData.block;
-    const parentData = collectParents(block).map((parent) =>
-      executionOrder.find((blockData) => parent === blockData.block),
-    );
-    let inputValue =
-      parentData[0]?.value === undefined ? NONE : parentData[0]?.value;
+    const parentData = pipelineWrapper
+      .getParentBlocks(block)
+      .map((parent) =>
+        executionOrder.find((blockData) => parent === blockData.block),
+      );
+    let inputValue = parentData[0]?.value ?? NONE;

     const useExternalInputValueForFirstBlock =
       isFirstBlock && inputValue === NONE && initialInputValue !== undefined;
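
One detail of this hunk is worth spelling out: replacing the `=== undefined` check with `??` is not a pure refactoring, because `??` also falls back when the left-hand side is `null`. The sketch below shows the difference with hypothetical stand-ins (`NONE`, `Value`, and `ParentEntry` here are simplified assumptions, not the types from the execution library). Whether a `null` value can actually reach this point is not visible from the diff.

```typescript
// Hypothetical, simplified stand-ins for the sentinel and value types.
const NONE = { kind: 'none' } as const;
type Value = typeof NONE | { kind: 'table'; rows: number };

interface ParentEntry {
  value: Value | null;
}

// Old behavior: only a missing parent (undefined) falls back to NONE,
// so a null value is passed through unchanged.
function pickInputOld(parent: ParentEntry | undefined): Value | null {
  const value = parent?.value;
  return value === undefined ? NONE : value;
}

// New behavior: `??` falls back to NONE for both undefined and null.
function pickInputNew(parent: ParentEntry | undefined): Value {
  return parent?.value ?? NONE;
}

const missingOld = pickInputOld(undefined); // NONE
const missingNew = pickInputNew(undefined); // NONE
const nullOld = pickInputOld({ value: null }); // null
const nullNew = pickInputNew({ value: null }); // NONE
```

With the new form, the input value can no longer be `null`, which also narrows the inferred type of `inputValue`.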
11 changes: 2 additions & 9 deletions libs/execution/src/lib/blocks/composite-block-executor.ts
@@ -16,13 +16,12 @@ import {
   createValuetype,
   evaluateExpression,
   evaluatePropertyValue,
-  getBlocksInTopologicalSorting,
   getIOType,
   isCompositeBlocktypeDefinition,
 } from '@jvalue/jayvee-language-server';

 import { ExecutionContext } from '../execution-context';
-import { IOTypeImplementation, NONE } from '../types';
+import { IOTypeImplementation } from '../types';

 // eslint-disable-next-line import/no-cycle
 import { executeBlocks } from './block-execution-util';
@@ -69,15 +68,9 @@ export function createCompositeBlockExecutor(

       this.addVariablesToContext(block, blockTypeReference.properties, context);

-      const executionOrder = getBlocksInTopologicalSorting(
-        blockTypeReference,
-      ).map((block) => {
-        return { block: block, value: NONE };
-      });
-
       const executionResult = await executeBlocks(
         context,
-        executionOrder,
+        blockTypeReference,
         input,
       );
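
For readers unfamiliar with the idea in the PR title, navigating via pipes means deriving parents, children, starting blocks, and the execution order from the list of pipes rather than from the blocks themselves. The sketch below is a simplified assumption of how such a wrapper could work: `PipeGraph` and `Pipe` are hypothetical, blocks are plain strings, and only the method names mirror those visible in this diff. The real `PipelineWrapper` may be implemented differently.

```typescript
// Hypothetical, simplified model: a pipe connects one block name to another.
interface Pipe {
  from: string;
  to: string;
}

class PipeGraph {
  private readonly blocks: string[];
  private readonly pipes: Pipe[];

  constructor(blocks: string[], pipes: Pipe[]) {
    this.blocks = blocks;
    this.pipes = pipes;
  }

  // Blocks that feed directly into `block`.
  getParentBlocks(block: string): string[] {
    return this.pipes.filter((p) => p.to === block).map((p) => p.from);
  }

  // Blocks that `block` feeds directly into.
  getChildBlocks(block: string): string[] {
    return this.pipes.filter((p) => p.from === block).map((p) => p.to);
  }

  // Blocks without any incoming pipe.
  getStartingBlocks(): string[] {
    return this.blocks.filter((b) => this.getParentBlocks(b).length === 0);
  }

  // Kahn's algorithm: emit a block once all of its incoming pipes have
  // been accounted for, so every parent precedes its children.
  getBlocksInTopologicalSorting(): string[] {
    const remainingParents = new Map<string, number>();
    for (const block of this.blocks) {
      remainingParents.set(block, this.getParentBlocks(block).length);
    }
    const queue = this.getStartingBlocks();
    const sorted: string[] = [];
    let current = queue.shift();
    while (current !== undefined) {
      sorted.push(current);
      for (const child of this.getChildBlocks(current)) {
        const left = (remainingParents.get(child) ?? 0) - 1;
        remainingParents.set(child, left);
        if (left === 0) {
          queue.push(child);
        }
      }
      current = queue.shift();
    }
    if (sorted.length !== this.blocks.length) {
      throw new Error('Pipes contain a cycle');
    }
    return sorted;
  }
}

// A shared extractor that fans out into two branches.
const pipes: Pipe[] = [
  { from: 'Extractor', to: 'AgencyPicker' },
  { from: 'AgencyPicker', to: 'AgencyLoader' },
  { from: 'Extractor', to: 'RoutesPicker' },
  { from: 'RoutesPicker', to: 'RoutesLoader' },
];
const graph = new PipeGraph(
  ['Extractor', 'AgencyPicker', 'AgencyLoader', 'RoutesPicker', 'RoutesLoader'],
  pipes,
);
const order = graph.getBlocksInTopologicalSorting();
```

Passing the container (pipeline or composite block type) to `executeBlocks` and computing the order there, as this PR does, keeps that logic in one place instead of repeating it at every call site.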

19 changes: 7 additions & 12 deletions libs/interpreter-lib/src/interpreter.ts
@@ -9,7 +9,6 @@ import {
   DebugGranularity,
   ExecutionContext,
   Logger,
-  NONE,
   executeBlocks,
   isDebugGranularity,
   logExecutionDuration,
@@ -24,11 +23,9 @@ import {
   JayveeModel,
   JayveeServices,
   PipelineDefinition,
+  PipelineWrapper,
   RuntimeParameterProvider,
-  collectChildren,
-  collectStartingBlocks,
   createJayveeServices,
-  getBlocksInTopologicalSorting,
   initializeWorkspace,
 } from '@jvalue/jayvee-language-server';
 import * as chalk from 'chalk';
@@ -228,12 +225,7 @@ async function runPipeline(

   const startTime = new Date();

-  const executionOrder = getBlocksInTopologicalSorting(pipeline).map(
-    (block) => {
-      return { block: block, value: NONE };
-    },
-  );
-  const executionResult = await executeBlocks(executionContext, executionOrder);
+  const executionResult = await executeBlocks(executionContext, pipeline);

   if (R.isErr(executionResult)) {
     const diagnosticError = executionResult.left;
@@ -254,13 +246,16 @@ export function logPipelineOverview(
   runtimeParameterProvider: RuntimeParameterProvider,
   logger: Logger,
 ) {
+  const pipelineWrapper = new PipelineWrapper(pipeline);
+
   const toString = (block: BlockDefinition, depth = 0): string => {
     const blockTypeName = block.type.ref?.name;
     assert(blockTypeName !== undefined);
     const blockString = `${'\t'.repeat(depth)} -> ${
       block.name
     } (${blockTypeName})`;
-    const childString = collectChildren(block)
+    const childString = pipelineWrapper
+      .getChildBlocks(block)
       .map((child) => toString(child, depth + 1))
       .join('\n');
     return blockString + '\n' + childString;
@@ -280,7 +275,7 @@ export function logPipelineOverview(
   linesBuffer.push(
     `\tBlocks (${pipeline.blocks.length} blocks with ${pipeline.pipes.length} pipes):`,
   );
-  for (const block of collectStartingBlocks(pipeline)) {
+  for (const block of pipelineWrapper.getStartingBlocks()) {
     linesBuffer.push(toString(block, 1));
   }
   logger.logInfo(linesBuffer.join('\n'));
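
To see what the recursive `toString` helper in this hunk produces for a fan-out, here is a self-contained sketch that mirrors its string building. `BlockInfo`, `renderBlockTree`, and the `childrenOf` callback are hypothetical simplifications; in the PR the children come from `pipelineWrapper.getChildBlocks(block)` and the blocks are `BlockDefinition` nodes.

```typescript
// Hypothetical, simplified block description.
interface BlockInfo {
  name: string;
  typeName: string;
}

// Renders a block and, recursively, its children, one tab per depth level.
// Mirrors the string concatenation of the helper in the diff.
function renderBlockTree(
  block: BlockInfo,
  childrenOf: (block: BlockInfo) => BlockInfo[],
  depth = 0,
): string {
  const blockString = `${'\t'.repeat(depth)} -> ${block.name} (${block.typeName})`;
  const childString = childrenOf(block)
    .map((child) => renderBlockTree(child, childrenOf, depth + 1))
    .join('\n');
  return blockString + '\n' + childString;
}

const extractor: BlockInfo = { name: 'Extractor', typeName: 'GTFSExtractor' };
const agencyPicker: BlockInfo = { name: 'AgencyPicker', typeName: 'FilePicker' };
const routesPicker: BlockInfo = { name: 'RoutesPicker', typeName: 'FilePicker' };

// The extractor fans out into two pickers; the pickers are leaves here.
const childrenOf = (block: BlockInfo): BlockInfo[] =>
  block === extractor ? [agencyPicker, routesPicker] : [];

const overview = renderBlockTree(extractor, childrenOf, 1);
```

Because every call appends `'\n'` even for a leaf and siblings are joined with another `'\n'`, the rendered overview contains an empty line between sibling branches. In the GTFS example that gives one visually separated group per file.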