diff --git a/example/gtfs-static.jv b/example/gtfs-static.jv index e409abec1..7703ca1fe 100644 --- a/example/gtfs-static.jv +++ b/example/gtfs-static.jv @@ -14,79 +14,78 @@ pipeline GtfsPipeline { // 2. The origin for multiple pipe sequences is a zip // file. Each csv file in this zip is further processed // by its own sequence of blocks and pipes. - GTFSSampleFeedExtractor -> ZipArchiveInterpreter; - ZipArchiveInterpreter + GTFSSampleFeedExtractor -> AgencyFilePicker -> AgencyTextFileInterpreter -> AgencyCSVInterpreter -> AgencyTableInterpreter -> AgencyLoader; - ZipArchiveInterpreter + GTFSSampleFeedExtractor -> CalendarDatesFilePicker -> CalendarDatesTextFileInterpreter -> CalendarDatesCSVInterpreter -> CalendarDatesTableInterpreter -> CalendarDatesLoader; - ZipArchiveInterpreter + GTFSSampleFeedExtractor -> CalendarFilePicker -> CalendarTextFileInterpreter -> CalendarCSVInterpreter -> CalendarTableInterpreter -> CalendarLoader; - ZipArchiveInterpreter + GTFSSampleFeedExtractor -> FareAttributesFilePicker -> FareAttributesTextFileInterpreter -> FareAttributesCSVInterpreter -> FareAttributesTableInterpreter -> FareAttributesLoader; - ZipArchiveInterpreter + GTFSSampleFeedExtractor -> FareRulesFilePicker -> FareRulesTextFileInterpreter -> FareRulesCSVInterpreter -> FareRulesTableInterpreter -> FareRulesLoader; - ZipArchiveInterpreter + GTFSSampleFeedExtractor -> FrequenciesFilePicker -> FrequenciesTextFileInterpreter -> FrequenciesCSVInterpreter -> FrequenciesTableInterpreter -> FrequenciesLoader; - ZipArchiveInterpreter + GTFSSampleFeedExtractor -> RoutesFilePicker -> RoutesTextFileInterpreter -> RoutesCSVInterpreter -> RoutesTableInterpreter -> RoutesLoader; - ZipArchiveInterpreter + GTFSSampleFeedExtractor -> ShapesFilePicker -> ShapesTextFileInterpreter -> ShapesCSVInterpreter -> ShapesTableInterpreter -> ShapesLoader; - ZipArchiveInterpreter + GTFSSampleFeedExtractor -> StopTimesFilePicker -> StopTimesTextFileInterpreter -> StopTimesCSVInterpreter -> StopTimesTableInterpreter -> StopTimesLoader; - ZipArchiveInterpreter + GTFSSampleFeedExtractor -> StopsFilePicker -> StopsTextFileInterpreter -> StopsCSVInterpreter -> StopsTableInterpreter -> StopsLoader; - ZipArchiveInterpreter + GTFSSampleFeedExtractor -> TripsFilePicker -> TripsTextFileInterpreter -> TripsCSVInterpreter @@ -94,14 +93,10 @@ pipeline GtfsPipeline { -> TripsLoader; // 3. As a first step, we download the zip file and interpret it. - block GTFSSampleFeedExtractor oftype HttpExtractor { + block GTFSSampleFeedExtractor oftype GTFSExtractor { url: "https://developers.google.com/static/transit/gtfs/examples/sample-feed.zip"; } - block ZipArchiveInterpreter oftype ArchiveInterpreter { - archiveType: "zip"; - } - // 4. Next, we pick several csv files (with the file extension ".txt") // for further processing . block AgencyFilePicker oftype FilePicker { diff --git a/libs/execution/src/lib/blocks/block-execution-util.ts b/libs/execution/src/lib/blocks/block-execution-util.ts index ed17c157b..57e27972e 100644 --- a/libs/execution/src/lib/blocks/block-execution-util.ts +++ b/libs/execution/src/lib/blocks/block-execution-util.ts @@ -4,7 +4,9 @@ import { BlockDefinition, - collectParents, + CompositeBlocktypeDefinition, + PipelineDefinition, + PipelineWrapper, } from '@jvalue/jayvee-language-server'; import { ExecutionContext } from '../execution-context'; @@ -31,18 +33,27 @@ export interface ExecutionOrderItem { */ export async function executeBlocks( executionContext: ExecutionContext, - executionOrder: ExecutionOrderItem[], + pipesContainer: CompositeBlocktypeDefinition | PipelineDefinition, initialInputValue: IOTypeImplementation | undefined = undefined, ): Promise> { + const pipelineWrapper = new PipelineWrapper(pipesContainer); + const executionOrder: { + block: BlockDefinition; + value: IOTypeImplementation | null; + }[] = pipelineWrapper.getBlocksInTopologicalSorting().map((block) => { + return { block: block, value: NONE }; + }); + let isFirstBlock = true; for (const blockData of executionOrder) { const block = blockData.block; - const parentData = collectParents(block).map((parent) => - executionOrder.find((blockData) => parent === blockData.block), - ); - let inputValue = - parentData[0]?.value === undefined ? NONE : parentData[0]?.value; + const parentData = pipelineWrapper + .getParentBlocks(block) + .map((parent) => + executionOrder.find((blockData) => parent === blockData.block), + ); + let inputValue = parentData[0]?.value ?? NONE; const useExternalInputValueForFirstBlock = isFirstBlock && inputValue === NONE && initialInputValue !== undefined; diff --git a/libs/execution/src/lib/blocks/composite-block-executor.ts b/libs/execution/src/lib/blocks/composite-block-executor.ts index 8209eb015..f2fcf637c 100644 --- a/libs/execution/src/lib/blocks/composite-block-executor.ts +++ b/libs/execution/src/lib/blocks/composite-block-executor.ts @@ -16,13 +16,12 @@ import { createValuetype, evaluateExpression, evaluatePropertyValue, - getBlocksInTopologicalSorting, getIOType, isCompositeBlocktypeDefinition, } from '@jvalue/jayvee-language-server'; import { ExecutionContext } from '../execution-context'; -import { IOTypeImplementation, NONE } from '../types'; +import { IOTypeImplementation } from '../types'; // eslint-disable-next-line import/no-cycle import { executeBlocks } from './block-execution-util'; @@ -69,15 +68,9 @@ export function createCompositeBlockExecutor( this.addVariablesToContext(block, blockTypeReference.properties, context); - const executionOrder = getBlocksInTopologicalSorting( - blockTypeReference, - ).map((block) => { - return { block: block, value: NONE }; - }); - const executionResult = await executeBlocks( context, - executionOrder, + blockTypeReference, input, ); diff --git a/libs/interpreter-lib/src/interpreter.ts b/libs/interpreter-lib/src/interpreter.ts index 7fe7ac508..f0d114673 100644 --- a/libs/interpreter-lib/src/interpreter.ts +++ b/libs/interpreter-lib/src/interpreter.ts @@ -9,7 +9,6 @@ import { DebugGranularity, ExecutionContext, Logger, - NONE, executeBlocks, isDebugGranularity, logExecutionDuration, @@ -24,11 +23,9 @@ import { JayveeModel, JayveeServices, PipelineDefinition, + PipelineWrapper, RuntimeParameterProvider, - collectChildren, - collectStartingBlocks, createJayveeServices, - getBlocksInTopologicalSorting, initializeWorkspace, } from '@jvalue/jayvee-language-server'; import * as chalk from 'chalk'; @@ -228,12 +225,7 @@ async function runPipeline( const startTime = new Date(); - const executionOrder = getBlocksInTopologicalSorting(pipeline).map( - (block) => { - return { block: block, value: NONE }; - }, - ); - const executionResult = await executeBlocks(executionContext, executionOrder); + const executionResult = await executeBlocks(executionContext, pipeline); if (R.isErr(executionResult)) { const diagnosticError = executionResult.left; @@ -254,13 +246,16 @@ export function logPipelineOverview( runtimeParameterProvider: RuntimeParameterProvider, logger: Logger, ) { + const pipelineWrapper = new PipelineWrapper(pipeline); + const toString = (block: BlockDefinition, depth = 0): string => { const blockTypeName = block.type.ref?.name; assert(blockTypeName !== undefined); const blockString = `${'\t'.repeat(depth)} -> ${ block.name } (${blockTypeName})`; - const childString = collectChildren(block) + const childString = pipelineWrapper + .getChildBlocks(block) .map((child) => toString(child, depth + 1)) .join('\n'); return blockString + '\n' + childString; @@ -280,7 +275,7 @@ export function logPipelineOverview( linesBuffer.push( `\tBlocks (${pipeline.blocks.length} blocks with ${pipeline.pipes.length} pipes):`, ); - for (const block of collectStartingBlocks(pipeline)) { + for (const block of pipelineWrapper.getStartingBlocks()) { linesBuffer.push(toString(block, 1)); } logger.logInfo(linesBuffer.join('\n')); diff --git a/libs/language-server/src/lib/ast/model-util.ts b/libs/language-server/src/lib/ast/model-util.ts index 06bfcd544..b5c804a2a 100644 --- a/libs/language-server/src/lib/ast/model-util.ts +++ b/libs/language-server/src/lib/ast/model-util.ts @@ -2,175 +2,18 @@ // // SPDX-License-Identifier: AGPL-3.0-only -import { strict as assert } from 'assert'; - -import { - AstNode, - LangiumDocuments, - Reference, - assertUnreachable, -} from 'langium'; +import { AstNode, LangiumDocuments } from 'langium'; import { BinaryExpression, - BlockDefinition, BuiltinBlocktypeDefinition, BuiltinConstrainttypeDefinition, - CompositeBlocktypeDefinition, - PipelineDefinition, UnaryExpression, isBuiltinBlocktypeDefinition, - isCompositeBlocktypeDefinition, isJayveeModel, } from './generated/ast'; // eslint-disable-next-line import/no-cycle import { BlockTypeWrapper, ConstraintTypeWrapper } from './wrappers'; -import { - PipeWrapper, - createWrappersFromPipeChain, -} from './wrappers/pipe-wrapper'; - -export function collectStartingBlocks( - container: PipelineDefinition | CompositeBlocktypeDefinition, -): BlockDefinition[] { - // For composite blocks the first blocks of all pipelines are starting blocks as they have inputs - if (isCompositeBlocktypeDefinition(container)) { - const startingBlocks = container.pipes - .map((pipe) => pipe.blocks[0]) - .map((blockRef: Reference | undefined) => { - if ( - blockRef?.ref !== undefined && - BlockTypeWrapper.canBeWrapped(blockRef.ref.type) - ) { - return blockRef.ref; - } - return undefined; - }) - .filter((x): x is BlockDefinition => x !== undefined); - - return startingBlocks; - } - - const result: BlockDefinition[] = []; - - // eslint-disable-next-line @typescript-eslint/no-unnecessary-condition - const blocks = container?.blocks ?? []; - for (const block of blocks) { - if (!BlockTypeWrapper.canBeWrapped(block.type)) { - continue; - } - const blockType = new BlockTypeWrapper(block.type); - - if (!blockType.hasInput()) { - result.push(block); - } - } - return result; -} - -export function collectChildren(block: BlockDefinition): BlockDefinition[] { - const outgoingPipes = collectOutgoingPipes(block); - return outgoingPipes.map((pipe) => pipe.to); -} - -export function collectParents(block: BlockDefinition): BlockDefinition[] { - const ingoingPipes = collectIngoingPipes(block); - return ingoingPipes.map((pipe) => pipe.from); -} - -export function collectOutgoingPipes(block: BlockDefinition) { - return collectPipes(block, 'outgoing'); -} - -export function collectIngoingPipes(block: BlockDefinition) { - return collectPipes(block, 'ingoing'); -} - -function collectPipes( - block: BlockDefinition, - kind: 'outgoing' | 'ingoing', -): PipeWrapper[] { - const pipeline = block.$container; - const allPipes = collectAllPipes(pipeline); - - return allPipes.filter((pipeWrapper) => { - switch (kind) { - case 'outgoing': - return pipeWrapper.from === block; - case 'ingoing': - return pipeWrapper.to === block; - case undefined: - return false; - } - return assertUnreachable(kind); - }); -} - -export function collectAllPipes( - container: PipelineDefinition | CompositeBlocktypeDefinition, -): PipeWrapper[] { - const result: PipeWrapper[] = []; - for (const pipe of container.pipes) { - result.push(...createWrappersFromPipeChain(pipe)); - } - return result; -} - -/** - * Returns blocks in a pipeline in topological order, based on - * Kahn's algorithm. - * - * Considers a pipeline as a directed, acyclical graph where - * blocks are nodes and pipes are edges. A list in topological - * order has the property that parent nodes are always listed - * before their children. - * - * "[...] a list in topological order is such that no element - * appears in it until after all elements appearing on all paths - * leading to the particular element have been listed." - * - * Kahn, A. B. (1962). Topological sorting of large networks. Communications of the ACM, 5(11), 558–562. - */ -export function getBlocksInTopologicalSorting( - pipeline: PipelineDefinition | CompositeBlocktypeDefinition, -): BlockDefinition[] { - const sortedNodes = []; - const currentNodes = [...collectStartingBlocks(pipeline)]; - let unvisitedEdges = [...collectAllPipes(pipeline)]; - - while (currentNodes.length > 0) { - const node = currentNodes.pop(); - assert(node !== undefined); - - sortedNodes.push(node); - - for (const childNode of collectChildren(node)) { - // Mark edges between parent and child as visited - collectIngoingPipes(childNode) - .filter((e) => e.from === node) - .forEach((e) => { - unvisitedEdges = unvisitedEdges.filter((edge) => !edge.equals(e)); - }); - - // If all edges to the child have been visited - const notRemovedEdges = collectIngoingPipes(childNode).filter((e) => - unvisitedEdges.some((edge) => edge.equals(e)), - ); - if (notRemovedEdges.length === 0) { - // Insert it into currentBlocks - currentNodes.push(childNode); - } - } - } - - // If the graph still contains unvisited edges it is not a DAG - assert( - unvisitedEdges.length === 0, - `The pipeline ${pipeline.name} is expected to have no cycles`, - ); - - return sortedNodes; -} export type UnaryExpressionOperator = UnaryExpression['operator']; export type BinaryExpressionOperator = BinaryExpression['operator']; diff --git a/libs/language-server/src/lib/ast/wrappers/index.ts b/libs/language-server/src/lib/ast/wrappers/index.ts index d757e8cda..81abbe4f1 100644 --- a/libs/language-server/src/lib/ast/wrappers/index.ts +++ b/libs/language-server/src/lib/ast/wrappers/index.ts @@ -10,3 +10,6 @@ export * from './ast-node-wrapper'; export * from './cell-range-wrapper'; export * from './typed-object'; + +export * from './pipe-wrapper'; +export * from './pipeline-wrapper'; diff --git a/libs/language-server/src/lib/ast/wrappers/pipeline-wrapper.ts b/libs/language-server/src/lib/ast/wrappers/pipeline-wrapper.ts new file mode 100644 index 000000000..f5d40c123 --- /dev/null +++ b/libs/language-server/src/lib/ast/wrappers/pipeline-wrapper.ts @@ -0,0 +1,140 @@ +// SPDX-FileCopyrightText: 2023 Friedrich-Alexander-Universitat Erlangen-Nurnberg +// +// SPDX-License-Identifier: AGPL-3.0-only + +import { strict as assert } from 'assert'; + +import { + BlockDefinition, + CompositeBlocktypeDefinition, + PipelineDefinition, +} from '../generated/ast'; + +import { AstNodeWrapper } from './ast-node-wrapper'; +import { PipeWrapper, createWrappersFromPipeChain } from './pipe-wrapper'; + +export class PipelineWrapper< + T extends PipelineDefinition | CompositeBlocktypeDefinition, +> implements AstNodeWrapper +{ + public readonly astNode: T; + + allPipes: PipeWrapper[] = []; + + constructor(pipesContainer: T) { + this.astNode = pipesContainer; + + this.allPipes = pipesContainer.pipes.flatMap((pipe) => + createWrappersFromPipeChain(pipe), + ); + } + + static canBeWrapped( + pipesContainer: PipelineDefinition | CompositeBlocktypeDefinition, + ): boolean { + for (const pipeDefinition of pipesContainer.pipes) { + for ( + let chainIndex = 0; + chainIndex < pipeDefinition.blocks.length - 1; + ++chainIndex + ) { + if (!PipeWrapper.canBeWrapped(pipeDefinition, chainIndex)) { + return false; + } + } + } + return true; + } + + getStartingBlockPipes(): PipeWrapper[] { + return this.allPipes.filter((pipe) => { + const parentBlock = pipe.from; + const isToOfOtherPipe = + this.allPipes.filter((p) => p.to === parentBlock).length > 0; + return !isToOfOtherPipe; + }); + } + + getStartingBlocks(): BlockDefinition[] { + const startingBlocks = this.getStartingBlockPipes().map((p) => p.from); + + // Special case: the extractor is reused for multiple paths + // Thus, we remove duplicates + const withoutDuplicates = [...new Set(startingBlocks)]; + return withoutDuplicates; + } + + getOutgoingPipes(blockDefinition: BlockDefinition): PipeWrapper[] { + return this.allPipes.filter((pipe) => { + return pipe.from === blockDefinition; + }); + } + + getChildBlocks(blockDefinition: BlockDefinition): BlockDefinition[] { + return this.getOutgoingPipes(blockDefinition).map((p) => p.to); + } + + getIngoingPipes(blockDefinition: BlockDefinition): PipeWrapper[] { + return this.allPipes.filter((pipe) => { + return pipe.to === blockDefinition; + }); + } + + getParentBlocks(blockDefinition: BlockDefinition): BlockDefinition[] { + return this.getIngoingPipes(blockDefinition).map((p) => p.from); + } + + /** + * Returns blocks in a pipeline in topological order, based on + * Kahn's algorithm. + * + * Considers a pipeline as a directed, acyclical graph where + * blocks are nodes and pipes are edges. A list in topological + * order has the property that parent nodes are always listed + * before their children. + * + * "[...] a list in topological order is such that no element + * appears in it until after all elements appearing on all paths + * leading to the particular element have been listed." + * + * Kahn, A. B. (1962). Topological sorting of large networks. Communications of the ACM, 5(11), 558–562. + */ + getBlocksInTopologicalSorting(): BlockDefinition[] { + const sortedNodes = []; + const currentNodes = [...this.getStartingBlocks()]; + let unvisitedEdges = [...this.allPipes]; + + while (currentNodes.length > 0) { + const node = currentNodes.pop(); + assert(node !== undefined); + + sortedNodes.push(node); + + for (const childNode of this.getChildBlocks(node)) { + // Mark edges between parent and child as visited + this.getIngoingPipes(childNode) + .filter((e) => e.from === node) + .forEach((e) => { + unvisitedEdges = unvisitedEdges.filter((edge) => !edge.equals(e)); + }); + + // If all edges to the child have been visited + const notRemovedEdges = this.getIngoingPipes(childNode).filter((e) => + unvisitedEdges.some((edge) => edge.equals(e)), + ); + if (notRemovedEdges.length === 0) { + // Insert it into currentBlocks + currentNodes.push(childNode); + } + } + } + + // If the graph still contains unvisited edges it is not a DAG + assert( + unvisitedEdges.length === 0, + `The pipeline ${this.astNode.name} is expected to have no cycles`, + ); + + return sortedNodes; + } +} diff --git a/libs/language-server/src/lib/validation/checks/block-definition.ts b/libs/language-server/src/lib/validation/checks/block-definition.ts index 97d011ba9..87da1ae64 100644 --- a/libs/language-server/src/lib/validation/checks/block-definition.ts +++ b/libs/language-server/src/lib/validation/checks/block-definition.ts @@ -9,14 +9,11 @@ import { assertUnreachable } from 'langium'; +import { PipelineWrapper } from '../../ast'; import { BlockDefinition, isCompositeBlocktypeDefinition, } from '../../ast/generated/ast'; -import { - collectIngoingPipes, - collectOutgoingPipes, -} from '../../ast/model-util'; import { PipeWrapper } from '../../ast/wrappers/pipe-wrapper'; import { BlockTypeWrapper } from '../../ast/wrappers/typed-object/blocktype-wrapper'; import { ValidationContext } from '../validation-context'; @@ -34,7 +31,10 @@ function checkPipesOfBlock( whatToCheck: 'input' | 'output', context: ValidationContext, ): void { - if (!BlockTypeWrapper.canBeWrapped(block?.type)) { + if ( + !BlockTypeWrapper.canBeWrapped(block?.type) || + !PipelineWrapper.canBeWrapped(block.$container) + ) { return; } const blockType = new BlockTypeWrapper(block?.type); @@ -100,14 +100,16 @@ function collectPipes( block: BlockDefinition, whatToCheck: 'input' | 'output', ): PipeWrapper[] { + const pipelineWrapper = new PipelineWrapper(block.$container); + let pipes: PipeWrapper[]; switch (whatToCheck) { case 'input': { - pipes = collectIngoingPipes(block); + pipes = pipelineWrapper.getIngoingPipes(block); break; } case 'output': { - pipes = collectOutgoingPipes(block); + pipes = pipelineWrapper.getOutgoingPipes(block); break; } default: { diff --git a/libs/language-server/src/lib/validation/checks/pipeline-definition.spec.ts b/libs/language-server/src/lib/validation/checks/pipeline-definition.spec.ts index 9b1dee65e..d4853ac08 100644 --- a/libs/language-server/src/lib/validation/checks/pipeline-definition.spec.ts +++ b/libs/language-server/src/lib/validation/checks/pipeline-definition.spec.ts @@ -63,7 +63,7 @@ describe('Validation of PipelineDefinition', () => { validationAcceptorMock.mockReset(); }); - it('should diagnose error on missing extractor block', async () => { + it('should diagnose error on missing starting block (no blocks)', async () => { const text = readJvTestAsset( 'pipeline-definition/invalid-empty-pipeline.jv', ); @@ -78,6 +78,21 @@ describe('Validation of PipelineDefinition', () => { ); }); + it('should diagnose error on missing starting block (no pipes)', async () => { + const text = readJvTestAsset( + 'pipeline-definition/invalid-pipeline-only-blocks.jv', + ); + + await parseAndValidatePipeline(text); + + expect(validationAcceptorMock).toHaveBeenCalledTimes(1); + expect(validationAcceptorMock).toHaveBeenCalledWith( + 'error', + `An extractor block is required for this pipeline`, + expect.any(Object), + ); + }); + it('should have no error on valid pipeline', async () => { const text = readJvTestAsset('pipeline-definition/valid-pipeline.jv'); diff --git a/libs/language-server/src/lib/validation/checks/pipeline-definition.ts b/libs/language-server/src/lib/validation/checks/pipeline-definition.ts index c3260e748..ca7992a7b 100644 --- a/libs/language-server/src/lib/validation/checks/pipeline-definition.ts +++ b/libs/language-server/src/lib/validation/checks/pipeline-definition.ts @@ -2,8 +2,8 @@ // // SPDX-License-Identifier: AGPL-3.0-only +import { PipelineWrapper } from '../../ast'; import { PipelineDefinition } from '../../ast/generated/ast'; -import { collectStartingBlocks } from '../../ast/model-util'; import { ValidationContext } from '../validation-context'; import { checkUniqueNames } from '../validation-util'; @@ -22,7 +22,12 @@ function checkStartingBlocks( pipeline: PipelineDefinition, context: ValidationContext, ): void { - const startingBlocks = collectStartingBlocks(pipeline); + if (!PipelineWrapper.canBeWrapped(pipeline)) { + return; + } + const pipelineWrapper = new PipelineWrapper(pipeline); + + const startingBlocks = pipelineWrapper.getStartingBlocks(); if (startingBlocks.length === 0) { context.accept( 'error', diff --git a/libs/language-server/src/test/assets/pipeline-definition/invalid-pipeline-only-blocks.jv b/libs/language-server/src/test/assets/pipeline-definition/invalid-pipeline-only-blocks.jv new file mode 100644 index 000000000..3c6115bfd --- /dev/null +++ b/libs/language-server/src/test/assets/pipeline-definition/invalid-pipeline-only-blocks.jv @@ -0,0 +1,12 @@ +// SPDX-FileCopyrightText: 2023 Friedrich-Alexander-Universitat Erlangen-Nurnberg +// +// SPDX-License-Identifier: AGPL-3.0-only + +pipeline Pipeline { + block TestExtractor oftype TestFileExtractor {} +} + +builtin blocktype TestFileExtractor { + input inPort oftype None; + output outPort oftype File; +} \ No newline at end of file diff --git a/libs/language-server/src/test/assets/pipeline-definition/valid-pipeline.jv b/libs/language-server/src/test/assets/pipeline-definition/valid-pipeline.jv index 26af22d9e..c63c7e306 100644 --- a/libs/language-server/src/test/assets/pipeline-definition/valid-pipeline.jv +++ b/libs/language-server/src/test/assets/pipeline-definition/valid-pipeline.jv @@ -3,11 +3,18 @@ // SPDX-License-Identifier: AGPL-3.0-only pipeline Pipeline { - block TestExtractor oftype TestFileExtractor { - } + TestExtractor -> TestLoader; + + block TestExtractor oftype TestFileExtractor {} + block TestLoader oftype TestFileLoader {} } builtin blocktype TestFileExtractor { input inPort oftype None; output outPort oftype File; +} + +builtin blocktype TestFileLoader { + input inPort oftype File; + output outPort oftype None; } \ No newline at end of file