Skip to content

Commit

Permalink
Merge pull request #491 from jvalue/navigate-pipes-instead-blocks
Browse files Browse the repository at this point in the history
Navigate Pipeline via Pipes instead of Blocks
  • Loading branch information
georg-schwarz committed Jan 15, 2024
2 parents 5de305f + 815a580 commit 00867cd
Show file tree
Hide file tree
Showing 12 changed files with 236 additions and 215 deletions.
29 changes: 12 additions & 17 deletions example/gtfs-static.jv
Original file line number Diff line number Diff line change
Expand Up @@ -14,94 +14,89 @@ pipeline GtfsPipeline {
// 2. The origin for multiple pipe sequences is a zip
// file. Each csv file in this zip is further processed
// by its own sequence of blocks and pipes.
GTFSSampleFeedExtractor -> ZipArchiveInterpreter;

ZipArchiveInterpreter
GTFSSampleFeedExtractor
-> AgencyFilePicker
-> AgencyTextFileInterpreter
-> AgencyCSVInterpreter
-> AgencyTableInterpreter
-> AgencyLoader;

ZipArchiveInterpreter
GTFSSampleFeedExtractor
-> CalendarDatesFilePicker
-> CalendarDatesTextFileInterpreter
-> CalendarDatesCSVInterpreter
-> CalendarDatesTableInterpreter
-> CalendarDatesLoader;

ZipArchiveInterpreter
GTFSSampleFeedExtractor
-> CalendarFilePicker
-> CalendarTextFileInterpreter
-> CalendarCSVInterpreter
-> CalendarTableInterpreter
-> CalendarLoader;

ZipArchiveInterpreter
GTFSSampleFeedExtractor
-> FareAttributesFilePicker
-> FareAttributesTextFileInterpreter
-> FareAttributesCSVInterpreter
-> FareAttributesTableInterpreter
-> FareAttributesLoader;

ZipArchiveInterpreter
GTFSSampleFeedExtractor
-> FareRulesFilePicker
-> FareRulesTextFileInterpreter
-> FareRulesCSVInterpreter
-> FareRulesTableInterpreter
-> FareRulesLoader;

ZipArchiveInterpreter
GTFSSampleFeedExtractor
-> FrequenciesFilePicker
-> FrequenciesTextFileInterpreter
-> FrequenciesCSVInterpreter
-> FrequenciesTableInterpreter
-> FrequenciesLoader;

ZipArchiveInterpreter
GTFSSampleFeedExtractor
-> RoutesFilePicker
-> RoutesTextFileInterpreter
-> RoutesCSVInterpreter
-> RoutesTableInterpreter
-> RoutesLoader;

ZipArchiveInterpreter
GTFSSampleFeedExtractor
-> ShapesFilePicker
-> ShapesTextFileInterpreter
-> ShapesCSVInterpreter
-> ShapesTableInterpreter
-> ShapesLoader;

ZipArchiveInterpreter
GTFSSampleFeedExtractor
-> StopTimesFilePicker
-> StopTimesTextFileInterpreter
-> StopTimesCSVInterpreter
-> StopTimesTableInterpreter
-> StopTimesLoader;

ZipArchiveInterpreter
GTFSSampleFeedExtractor
-> StopsFilePicker
-> StopsTextFileInterpreter
-> StopsCSVInterpreter
-> StopsTableInterpreter
-> StopsLoader;

ZipArchiveInterpreter
GTFSSampleFeedExtractor
-> TripsFilePicker
-> TripsTextFileInterpreter
-> TripsCSVInterpreter
-> TripsTableInterpreter
-> TripsLoader;

// 3. As a first step, we download the zip file and interpret it.
block GTFSSampleFeedExtractor oftype HttpExtractor {
block GTFSSampleFeedExtractor oftype GTFSExtractor {
url: "https://developers.google.com/static/transit/gtfs/examples/sample-feed.zip";
}

block ZipArchiveInterpreter oftype ArchiveInterpreter {
archiveType: "zip";
}

// 4. Next, we pick several csv files (with the file extension ".txt")
// for further processing .
block AgencyFilePicker oftype FilePicker {
Expand Down
25 changes: 18 additions & 7 deletions libs/execution/src/lib/blocks/block-execution-util.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,9 @@

import {
BlockDefinition,
collectParents,
CompositeBlocktypeDefinition,
PipelineDefinition,
PipelineWrapper,
} from '@jvalue/jayvee-language-server';

import { ExecutionContext } from '../execution-context';
Expand All @@ -31,18 +33,27 @@ export interface ExecutionOrderItem {
*/
export async function executeBlocks(
executionContext: ExecutionContext,
executionOrder: ExecutionOrderItem[],
pipesContainer: CompositeBlocktypeDefinition | PipelineDefinition,
initialInputValue: IOTypeImplementation | undefined = undefined,
): Promise<R.Result<ExecutionOrderItem[]>> {
const pipelineWrapper = new PipelineWrapper(pipesContainer);
const executionOrder: {
block: BlockDefinition;
value: IOTypeImplementation | null;
}[] = pipelineWrapper.getBlocksInTopologicalSorting().map((block) => {
return { block: block, value: NONE };
});

let isFirstBlock = true;

for (const blockData of executionOrder) {
const block = blockData.block;
const parentData = collectParents(block).map((parent) =>
executionOrder.find((blockData) => parent === blockData.block),
);
let inputValue =
parentData[0]?.value === undefined ? NONE : parentData[0]?.value;
const parentData = pipelineWrapper
.getParentBlocks(block)
.map((parent) =>
executionOrder.find((blockData) => parent === blockData.block),
);
let inputValue = parentData[0]?.value ?? NONE;

const useExternalInputValueForFirstBlock =
isFirstBlock && inputValue === NONE && initialInputValue !== undefined;
Expand Down
11 changes: 2 additions & 9 deletions libs/execution/src/lib/blocks/composite-block-executor.ts
Original file line number Diff line number Diff line change
Expand Up @@ -16,13 +16,12 @@ import {
createValuetype,
evaluateExpression,
evaluatePropertyValue,
getBlocksInTopologicalSorting,
getIOType,
isCompositeBlocktypeDefinition,
} from '@jvalue/jayvee-language-server';

import { ExecutionContext } from '../execution-context';
import { IOTypeImplementation, NONE } from '../types';
import { IOTypeImplementation } from '../types';

// eslint-disable-next-line import/no-cycle
import { executeBlocks } from './block-execution-util';
Expand Down Expand Up @@ -69,15 +68,9 @@ export function createCompositeBlockExecutor(

this.addVariablesToContext(block, blockTypeReference.properties, context);

const executionOrder = getBlocksInTopologicalSorting(
blockTypeReference,
).map((block) => {
return { block: block, value: NONE };
});

const executionResult = await executeBlocks(
context,
executionOrder,
blockTypeReference,
input,
);

Expand Down
19 changes: 7 additions & 12 deletions libs/interpreter-lib/src/interpreter.ts
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,6 @@ import {
DebugGranularity,
ExecutionContext,
Logger,
NONE,
executeBlocks,
isDebugGranularity,
logExecutionDuration,
Expand All @@ -24,11 +23,9 @@ import {
JayveeModel,
JayveeServices,
PipelineDefinition,
PipelineWrapper,
RuntimeParameterProvider,
collectChildren,
collectStartingBlocks,
createJayveeServices,
getBlocksInTopologicalSorting,
initializeWorkspace,
} from '@jvalue/jayvee-language-server';
import * as chalk from 'chalk';
Expand Down Expand Up @@ -228,12 +225,7 @@ async function runPipeline(

const startTime = new Date();

const executionOrder = getBlocksInTopologicalSorting(pipeline).map(
(block) => {
return { block: block, value: NONE };
},
);
const executionResult = await executeBlocks(executionContext, executionOrder);
const executionResult = await executeBlocks(executionContext, pipeline);

if (R.isErr(executionResult)) {
const diagnosticError = executionResult.left;
Expand All @@ -254,13 +246,16 @@ export function logPipelineOverview(
runtimeParameterProvider: RuntimeParameterProvider,
logger: Logger,
) {
const pipelineWrapper = new PipelineWrapper(pipeline);

const toString = (block: BlockDefinition, depth = 0): string => {
const blockTypeName = block.type.ref?.name;
assert(blockTypeName !== undefined);
const blockString = `${'\t'.repeat(depth)} -> ${
block.name
} (${blockTypeName})`;
const childString = collectChildren(block)
const childString = pipelineWrapper
.getChildBlocks(block)
.map((child) => toString(child, depth + 1))
.join('\n');
return blockString + '\n' + childString;
Expand All @@ -280,7 +275,7 @@ export function logPipelineOverview(
linesBuffer.push(
`\tBlocks (${pipeline.blocks.length} blocks with ${pipeline.pipes.length} pipes):`,
);
for (const block of collectStartingBlocks(pipeline)) {
for (const block of pipelineWrapper.getStartingBlocks()) {
linesBuffer.push(toString(block, 1));
}
logger.logInfo(linesBuffer.join('\n'));
Expand Down
Loading

0 comments on commit 00867cd

Please sign in to comment.