Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(scheduler-targets-alpha): SageMakerStartPipelineExecution Target #28927

Merged
merged 15 commits into from
Feb 13, 2024
Merged
24 changes: 24 additions & 0 deletions packages/@aws-cdk/aws-scheduler-targets-alpha/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ The following targets are supported:
8. `targets.KinesisStreamPutRecord`: [Put a record to an Amazon Kinesis Data Streams](#put-a-record-to-an-amazon-kinesis-data-streams)
9. `targets.KinesisDataFirehosePutRecord`: [Put a record to a Kinesis Data Firehose](#put-a-record-to-a-kinesis-data-firehose)
10. `targets.CodePipelineStartPipelineExecution`: [Start a CodePipeline execution](#start-a-codepipeline-execution)
11. `targets.SageMakerStartPipelineExecution`: [Start a Sagemaker pipeline execution](#start-a-sagemaker-pipeline-execution)

## Invoke a Lambda function

Expand Down Expand Up @@ -289,3 +290,26 @@ new Schedule(this, 'Schedule', {
target: new targets.CodePipelineStartPipelineExecution(pipeline),
});
```

## Start a Sagemaker pipeline execution

Use the `SageMakerStartPipelineExecution` target to start a new execution for a Sagemaker pipeline.

The code snippet below creates an event rule with a Sagemaker pipeline as target which is
called every hour by Event Bridge Scheduler.

```ts
import * as sagemaker from 'aws-cdk-lib/aws-sagemaker';

declare const pipeline: sagemaker.CfnPipeline;

new Schedule(this, 'Schedule', {
schedule: ScheduleExpression.rate(Duration.minutes(60)),
target: new targets.CodePipelineStartPipelineExecution(pipeline, {
pipelineParameterList: [{
name: 'parameter-name',
value: 'parameter-value',
}],
}),
});
```
1 change: 1 addition & 0 deletions packages/@aws-cdk/aws-scheduler-targets-alpha/lib/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ export * from './inspector-start-assessment-run';
export * from './kinesis-data-firehose-put-record';
export * from './kinesis-stream-put-record';
export * from './lambda-invoke';
export * from './sage-maker-start-pipeline-execution';
export * from './sns-publish';
export * from './sqs-send-message';
export * from './stepfunctions-start-execution';
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,95 @@
import { ISchedule, IScheduleTarget, ScheduleTargetConfig } from '@aws-cdk/aws-scheduler-alpha';
import { ArnFormat, Names, Stack } from 'aws-cdk-lib';
import { IRole, PolicyStatement } from 'aws-cdk-lib/aws-iam';
import { CfnPipeline } from 'aws-cdk-lib/aws-sagemaker';
import { ScheduleTargetBase, ScheduleTargetBaseProps } from './target';
import { sameEnvDimension } from './util';

/**
* Properties for a pipeline parameter
*/
export interface SageMakerPipelineParameter {
/**
* Name of parameter to start execution of a SageMaker Model Building Pipeline.
*/
readonly name: string;

/**
* Value of parameter to start execution of a SageMaker Model Building Pipeline.
*/
readonly value: string;
}

/**
* Properties for a SageMaker Target
*/
export interface SageMakerStartPipelineExecutionProps extends ScheduleTargetBaseProps {
/**
* List of parameter names and values to use when executing the SageMaker Model Building Pipeline.
*
* The length must be between 0 and 200.
*
* @see https://docs.aws.amazon.com/AWSCloudFormation/latest/UserGuide/aws-properties-scheduler-schedule-sagemakerpipelineparameters.html#cfn-scheduler-schedule-sagemakerpipelineparameters-pipelineparameterlist
*
* @default - no pipeline parameter list
*/
readonly pipelineParameterList?: SageMakerPipelineParameter[];
}

/**
* Use a SageMaker pipeline as a target for AWS EventBridge Scheduler.
*/
export class SageMakerStartPipelineExecution extends ScheduleTargetBase implements IScheduleTarget {
private readonly pipelineArn: string;
constructor(
private readonly pipeline: CfnPipeline,
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Everything is fine except for this... it should be an IPipeline but that doesn't exist yet. The problem is that this locks in people to the L1 pipeline, and ideally we'd want this to be interoperable with a community L2 if that existed.

So how about adding a very barebones IPipeline construct to aws-sagemaker. Nothing controversial, just kind of a placeholder. In the future, if we develop a Pipeline L2, it can build off of the IPipeline as can any community L2. As long as we don't make any crazy decisions, this can be stable from day 1. We did something similar in IEndpoint.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Good idea.
The IEndpoint is also creating an interface in the aws-sagemaker-alpha module, but that is to ensure that the existing implementation is not affected, so is it correct to create it only in the aws-sagemaker module for this PR?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I changed.

private readonly props: SageMakerStartPipelineExecutionProps = {},
) {
const targetArn = Stack.of(pipeline).formatArn({
service: 'sagemaker',
resource: 'pipeline',
resourceName: pipeline.pipelineName,
arnFormat: ArnFormat.SLASH_RESOURCE_NAME,
});
super(props, targetArn);
this.pipelineArn = targetArn;

if (props.pipelineParameterList !== undefined && props.pipelineParameterList.length > 200) {
throw new Error(`pipelineParameterList length must be between 0 and 200, got ${props.pipelineParameterList.length}`);
}
}

protected addTargetActionToRole(schedule: ISchedule, role: IRole): void {
if (!sameEnvDimension(this.pipeline.stack.region, schedule.env.region)) {
throw new Error(`Cannot assign pipeline in region ${this.pipeline.stack.region} to the schedule ${Names.nodeUniqueId(schedule.node)} in region ${schedule.env.region}. Both the schedule and the pipeline must be in the same region.`);
}

if (!sameEnvDimension(this.pipeline.stack.account, schedule.env.account)) {
throw new Error(`Cannot assign pipeline in account ${this.pipeline.stack.account} to the schedule ${Names.nodeUniqueId(schedule.node)} in account ${schedule.env.region}. Both the schedule and the pipeline must be in the same account.`);
}

if (this.props.role && !sameEnvDimension(this.props.role.env.account, this.pipeline.stack.account)) {
throw new Error(`Cannot grant permission to execution role in account ${this.props.role.env.account} to invoke target ${Names.nodeUniqueId(this.pipeline.node)} in account ${this.pipeline.stack.account}. Both the target and the execution role must be in the same account.`);
}

role.addToPrincipalPolicy(new PolicyStatement({
actions: ['sagemaker:StartPipelineExecution'],
resources: [this.pipelineArn],
}));
}

protected bindBaseTargetConfig(_schedule: ISchedule): ScheduleTargetConfig {
const sageMakerPipelineParameters = this.props.pipelineParameterList ? {
pipelineParameterList: this.props.pipelineParameterList.map(param => {
return {
name: param.name,
value: param.value,
};
}),
} : undefined;
return {
...super.bindBaseTargetConfig(_schedule),
sageMakerPipelineParameters,
};
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,91 @@
import * as scheduler from '@aws-cdk/aws-scheduler-alpha';
import { ExpectedResult, IntegTest } from '@aws-cdk/integ-tests-alpha';
import * as cdk from 'aws-cdk-lib';
import { ManagedPolicy, Role, ServicePrincipal } from 'aws-cdk-lib/aws-iam';
import { CfnPipeline } from 'aws-cdk-lib/aws-sagemaker';
import { SageMakerPipelineParameter, SageMakerStartPipelineExecution } from '../lib';
import { Bucket } from 'aws-cdk-lib/aws-s3';

const app = new cdk.App();
const stack = new cdk.Stack(app, 'aws-cdk-scheduler-targets-sagemaker-start-pipeline-execution');

const s3Bucket = new Bucket(stack, 'SageMakerBucket', {
removalPolicy: cdk.RemovalPolicy.DESTROY,
autoDeleteObjects: true,
});

const pipelineDefinition = {
PipelineDefinitionBody: JSON.stringify({
Version: '2020-12-01',
Metadata: {},
Parameters: [
{
Name: 'ParameterName',
Type: 'String',
DefaultValue: 'Value',
},
],
Steps: [
{
Name: 'TrainingStep',
Type: 'Training',
Arguments: {
AlgorithmSpecification: {
TrainingImage: '382416733822.dkr.ecr.us-east-1.amazonaws.com/linear-learner:1',
TrainingInputMode: 'File',
},
OutputDataConfig: {
S3OutputPath: s3Bucket.s3UrlForObject(),
},
ResourceConfig: {
InstanceCount: 1,
InstanceType: 'ml.m5.large',
VolumeSizeInGB: 10,
},
StoppingCondition: {
MaxRuntimeInSeconds: 3600,
},
},
},
],
}),
};

const pipelineRole = new Role(stack, 'SageMakerPipelineRole', {
assumedBy: new ServicePrincipal('sagemaker.amazonaws.com'),
});
pipelineRole.addManagedPolicy(ManagedPolicy.fromAwsManagedPolicyName('AmazonSageMakerFullAccess'));
const pipelineParameterList: SageMakerPipelineParameter[] = [{
name: 'ParameterName',
value: 'ParameterValue',
}];
const pipeline = new CfnPipeline(stack, 'Pipeline', {
pipelineName: 'sagemaker-pipeline',
pipelineDefinition: pipelineDefinition,
roleArn: pipelineRole.roleArn,
});

new scheduler.Schedule(stack, 'Schedule', {
schedule: scheduler.ScheduleExpression.rate(cdk.Duration.minutes(10)),
target: new SageMakerStartPipelineExecution(pipeline, {
pipelineParameterList,
}),
});

const integrationTest = new IntegTest(app, 'integrationtest-sagemaker-start-pipeline-execution', {
testCases: [stack],
stackUpdateWorkflow: false, // this would cause the schedule to trigger with the old code
});

// Verifies that the pipeline run by the scheduler
integrationTest.assertions.awsApiCall('Sagemaker', 'listPipelineExecutions', {
PipelineName: 'sagemaker-pipeline',
}).assertAtPath(
'PipelineExecutionSummaries.0.PipelineExecutionStatus',
ExpectedResult.stringLikeRegexp('Succeeded'),
).waitForAssertions({
interval: cdk.Duration.seconds(30),
totalTimeout: cdk.Duration.minutes(10),
});

app.synth();
Loading
Loading