diff --git a/packages/@aws-cdk/aws-iot-actions/README.md b/packages/@aws-cdk/aws-iot-actions/README.md index b232878cec486..be475fe028210 100644 --- a/packages/@aws-cdk/aws-iot-actions/README.md +++ b/packages/@aws-cdk/aws-iot-actions/README.md @@ -26,6 +26,7 @@ Currently supported are: - Put logs to CloudWatch Logs - Capture CloudWatch metrics - Change state for a CloudWatch alarm +- Put records to Kinesis Data stream - Put records to Kinesis Data Firehose stream - Send messages to SQS queues @@ -172,6 +173,26 @@ const topicRule = new iot.TopicRule(this, 'TopicRule', { }); ``` +## Put records to Kinesis Data stream + +The code snippet below creates an AWS IoT Rule that put records to Kinesis Data +stream when it is triggered. + +```ts +import * as kinesis from '@aws-cdk/aws-kinesis'; + +const stream = new kinesis.Stream(this, 'MyStream'); + +const topicRule = new iot.TopicRule(this, 'TopicRule', { + sql: iot.IotSql.fromStringAsVer20160323("SELECT * FROM 'device/+/data'"), + actions: [ + new actions.KinesisPutRecordAction(stream, { + partitionKey: '${newuuid()}', + }), + ], +}); +``` + ## Put records to Kinesis Data Firehose stream The code snippet below creates an AWS IoT Rule that put records to Put records diff --git a/packages/@aws-cdk/aws-iot-actions/lib/index.ts b/packages/@aws-cdk/aws-iot-actions/lib/index.ts index 77fb9fc4e0efe..a0ee864a5bce2 100644 --- a/packages/@aws-cdk/aws-iot-actions/lib/index.ts +++ b/packages/@aws-cdk/aws-iot-actions/lib/index.ts @@ -3,6 +3,7 @@ export * from './cloudwatch-put-metric-action'; export * from './cloudwatch-set-alarm-state-action'; export * from './common-action-props'; export * from './firehose-put-record-action'; +export * from './kinesis-put-record-action'; export * from './lambda-function-action'; export * from './s3-put-object-action'; export * from './sqs-queue-action'; diff --git a/packages/@aws-cdk/aws-iot-actions/lib/kinesis-put-record-action.ts b/packages/@aws-cdk/aws-iot-actions/lib/kinesis-put-record-action.ts new file mode 100644 index 0000000000000..6baa5976bccf4 --- /dev/null +++ b/packages/@aws-cdk/aws-iot-actions/lib/kinesis-put-record-action.ts @@ -0,0 +1,58 @@ +import * as iam from '@aws-cdk/aws-iam'; +import * as iot from '@aws-cdk/aws-iot'; +import * as kinesis from '@aws-cdk/aws-kinesis'; +import { CommonActionProps } from './common-action-props'; +import { singletonActionRole } from './private/role'; + +/** + * Configuration properties of an action for the Kinesis Data stream. + */ +export interface KinesisPutRecordActionProps extends CommonActionProps { + /** + * The partition key used to determine to which shard the data is written. + * The partition key is usually composed of an expression (for example, ${topic()} or ${timestamp()}). + * + * @see https://docs.aws.amazon.com/iot/latest/developerguide/iot-substitution-templates.html + * + * You can use the expression '${newuuid()}' if your payload does not have a high cardinarity property. + * If you use empty string, this action use no partition key and all records will put same one shard. + * + * @see https://docs.aws.amazon.com/kinesis/latest/APIReference/API_PutRecord.html#API_PutRecord_RequestParameters + */ + readonly partitionKey: string; +} + +/** + * The action to put the record from an MQTT message to the Kinesis Data stream. + */ +export class KinesisPutRecordAction implements iot.IAction { + private readonly partitionKey?: string; + private readonly role?: iam.IRole; + + /** + * @param stream The Kinesis Data stream to which to put records. + * @param props Optional properties to not use default + */ + constructor(private readonly stream: kinesis.IStream, props: KinesisPutRecordActionProps) { + this.partitionKey = props.partitionKey; + this.role = props.role; + } + + bind(rule: iot.ITopicRule): iot.ActionConfig { + const role = this.role ?? singletonActionRole(rule); + role.addToPrincipalPolicy(new iam.PolicyStatement({ + actions: ['kinesis:PutRecord'], + resources: [this.stream.streamArn], + })); + + return { + configuration: { + kinesis: { + streamName: this.stream.streamName, + partitionKey: this.partitionKey || undefined, + roleArn: role.roleArn, + }, + }, + }; + } +} diff --git a/packages/@aws-cdk/aws-iot-actions/package.json b/packages/@aws-cdk/aws-iot-actions/package.json index bc112214866be..f03c1b14f1bb2 100644 --- a/packages/@aws-cdk/aws-iot-actions/package.json +++ b/packages/@aws-cdk/aws-iot-actions/package.json @@ -90,6 +90,7 @@ "@aws-cdk/aws-cloudwatch": "0.0.0", "@aws-cdk/aws-iam": "0.0.0", "@aws-cdk/aws-iot": "0.0.0", + "@aws-cdk/aws-kinesis": "0.0.0", "@aws-cdk/aws-kinesisfirehose": "0.0.0", "@aws-cdk/aws-lambda": "0.0.0", "@aws-cdk/aws-logs": "0.0.0", @@ -104,6 +105,7 @@ "@aws-cdk/aws-cloudwatch": "0.0.0", "@aws-cdk/aws-iam": "0.0.0", "@aws-cdk/aws-iot": "0.0.0", + "@aws-cdk/aws-kinesis": "0.0.0", "@aws-cdk/aws-kinesisfirehose": "0.0.0", "@aws-cdk/aws-lambda": "0.0.0", "@aws-cdk/aws-logs": "0.0.0", diff --git a/packages/@aws-cdk/aws-iot-actions/test/kinesis-stream/integ.kinesis-put-record-action.expected.json b/packages/@aws-cdk/aws-iot-actions/test/kinesis-stream/integ.kinesis-put-record-action.expected.json new file mode 100644 index 0000000000000..89229bf4731e3 --- /dev/null +++ b/packages/@aws-cdk/aws-iot-actions/test/kinesis-stream/integ.kinesis-put-record-action.expected.json @@ -0,0 +1,116 @@ +{ + "Resources": { + "TopicRule40A4EA44": { + "Type": "AWS::IoT::TopicRule", + "Properties": { + "TopicRulePayload": { + "Actions": [ + { + "Kinesis": { + "PartitionKey": "${timestamp()}", + "RoleArn": { + "Fn::GetAtt": [ + "TopicRuleTopicRuleActionRole246C4F77", + "Arn" + ] + }, + "StreamName": { + "Ref": "MyStream5C050E93" + } + } + } + ], + "AwsIotSqlVersion": "2016-03-23", + "Sql": "SELECT * FROM 'device/+/data'" + } + } + }, + "TopicRuleTopicRuleActionRole246C4F77": { + "Type": "AWS::IAM::Role", + "Properties": { + "AssumeRolePolicyDocument": { + "Statement": [ + { + "Action": "sts:AssumeRole", + "Effect": "Allow", + "Principal": { + "Service": "iot.amazonaws.com" + } + } + ], + "Version": "2012-10-17" + } + } + }, + "TopicRuleTopicRuleActionRoleDefaultPolicy99ADD687": { + "Type": "AWS::IAM::Policy", + "Properties": { + "PolicyDocument": { + "Statement": [ + { + "Action": "kinesis:PutRecord", + "Effect": "Allow", + "Resource": { + "Fn::GetAtt": [ + "MyStream5C050E93", + "Arn" + ] + } + } + ], + "Version": "2012-10-17" + }, + "PolicyName": "TopicRuleTopicRuleActionRoleDefaultPolicy99ADD687", + "Roles": [ + { + "Ref": "TopicRuleTopicRuleActionRole246C4F77" + } + ] + } + }, + "MyStream5C050E93": { + "Type": "AWS::Kinesis::Stream", + "Properties": { + "RetentionPeriodHours": 24, + "ShardCount": 3, + "StreamEncryption": { + "Fn::If": [ + "AwsCdkKinesisEncryptedStreamsUnsupportedRegions", + { + "Ref": "AWS::NoValue" + }, + { + "EncryptionType": "KMS", + "KeyId": "alias/aws/kinesis" + } + ] + }, + "StreamModeDetails": { + "StreamMode": "PROVISIONED" + } + } + } + }, + "Conditions": { + "AwsCdkKinesisEncryptedStreamsUnsupportedRegions": { + "Fn::Or": [ + { + "Fn::Equals": [ + { + "Ref": "AWS::Region" + }, + "cn-north-1" + ] + }, + { + "Fn::Equals": [ + { + "Ref": "AWS::Region" + }, + "cn-northwest-1" + ] + } + ] + } + } +} \ No newline at end of file diff --git a/packages/@aws-cdk/aws-iot-actions/test/kinesis-stream/integ.kinesis-put-record-action.ts b/packages/@aws-cdk/aws-iot-actions/test/kinesis-stream/integ.kinesis-put-record-action.ts new file mode 100644 index 0000000000000..953f370957ec3 --- /dev/null +++ b/packages/@aws-cdk/aws-iot-actions/test/kinesis-stream/integ.kinesis-put-record-action.ts @@ -0,0 +1,27 @@ +import * as iot from '@aws-cdk/aws-iot'; +import * as kinesis from '@aws-cdk/aws-kinesis'; +import * as cdk from '@aws-cdk/core'; +import * as actions from '../../lib'; + +class TestStack extends cdk.Stack { + constructor(scope: cdk.App, id: string, props?: cdk.StackProps) { + super(scope, id, props); + + const topicRule = new iot.TopicRule(this, 'TopicRule', { + sql: iot.IotSql.fromStringAsVer20160323( + "SELECT * FROM 'device/+/data'", + ), + }); + + const stream = new kinesis.Stream(this, 'MyStream', { + shardCount: 3, + }); + topicRule.addAction(new actions.KinesisPutRecordAction(stream, { + partitionKey: '${timestamp()}', + })); + } +} + +const app = new cdk.App(); +new TestStack(app, 'test-kinesis-stream-action-stack'); +app.synth(); diff --git a/packages/@aws-cdk/aws-iot-actions/test/kinesis-stream/kinesis-put-record-action.test.ts b/packages/@aws-cdk/aws-iot-actions/test/kinesis-stream/kinesis-put-record-action.test.ts new file mode 100644 index 0000000000000..05a0d603b4046 --- /dev/null +++ b/packages/@aws-cdk/aws-iot-actions/test/kinesis-stream/kinesis-put-record-action.test.ts @@ -0,0 +1,122 @@ +import { Template, Match } from '@aws-cdk/assertions'; +import * as iam from '@aws-cdk/aws-iam'; +import * as iot from '@aws-cdk/aws-iot'; +import * as kinesis from '@aws-cdk/aws-kinesis'; +import * as cdk from '@aws-cdk/core'; +import * as actions from '../../lib'; + +test('Default kinesis stream action', () => { + // GIVEN + const stack = new cdk.Stack(); + const topicRule = new iot.TopicRule(stack, 'MyTopicRule', { + sql: iot.IotSql.fromStringAsVer20160323("SELECT topic(2) as device_id FROM 'device/+/data'"), + }); + const stream = kinesis.Stream.fromStreamArn(stack, 'MyStream', 'arn:aws:kinesis:xx-west-1:111122223333:stream/my-stream'); + + // WHEN + topicRule.addAction(new actions.KinesisPutRecordAction(stream, { + partitionKey: '${newuuid()}', + })); + + // THEN + Template.fromStack(stack).hasResourceProperties('AWS::IoT::TopicRule', { + TopicRulePayload: { + Actions: [ + { + Kinesis: { + StreamName: 'my-stream', + PartitionKey: '${newuuid()}', + RoleArn: { + 'Fn::GetAtt': ['MyTopicRuleTopicRuleActionRoleCE2D05DA', 'Arn'], + }, + }, + }, + ], + }, + }); + + Template.fromStack(stack).hasResourceProperties('AWS::IAM::Role', { + AssumeRolePolicyDocument: { + Statement: [ + { + Action: 'sts:AssumeRole', + Effect: 'Allow', + Principal: { + Service: 'iot.amazonaws.com', + }, + }, + ], + Version: '2012-10-17', + }, + }); + + Template.fromStack(stack).hasResourceProperties('AWS::IAM::Policy', { + PolicyDocument: { + Statement: [ + { + Action: 'kinesis:PutRecord', + Effect: 'Allow', + Resource: 'arn:aws:kinesis:xx-west-1:111122223333:stream/my-stream', + }, + ], + Version: '2012-10-17', + }, + PolicyName: 'MyTopicRuleTopicRuleActionRoleDefaultPolicy54A701F7', + Roles: [ + { Ref: 'MyTopicRuleTopicRuleActionRoleCE2D05DA' }, + ], + }); +}); + +test('passes undefined to partitionKey if empty string is given', () => { + // GIVEN + const stack = new cdk.Stack(); + const topicRule = new iot.TopicRule(stack, 'MyTopicRule', { + sql: iot.IotSql.fromStringAsVer20160323("SELECT topic(2) as device_id FROM 'device/+/data'"), + }); + const stream = kinesis.Stream.fromStreamArn(stack, 'MyStream', 'arn:aws:kinesis:xx-west-1:111122223333:stream/my-stream'); + + // WHEN + topicRule.addAction(new actions.KinesisPutRecordAction(stream, { + partitionKey: '', + })); + + // THEN + Template.fromStack(stack).hasResourceProperties('AWS::IoT::TopicRule', { + TopicRulePayload: { + Actions: [ + Match.objectLike({ Kinesis: { PartitionKey: Match.absent() } }), + ], + }, + }); +}); + +test('can set role', () => { + // GIVEN + const stack = new cdk.Stack(); + const topicRule = new iot.TopicRule(stack, 'MyTopicRule', { + sql: iot.IotSql.fromStringAsVer20160323("SELECT topic(2) as device_id FROM 'device/+/data'"), + }); + const stream = kinesis.Stream.fromStreamArn(stack, 'MyStream', 'arn:aws:kinesis:xx-west-1:111122223333:stream/my-stream'); + const role = iam.Role.fromRoleArn(stack, 'MyRole', 'arn:aws:iam::123456789012:role/ForTest'); + + // WHEN + topicRule.addAction(new actions.KinesisPutRecordAction(stream, { + partitionKey: '${newuuid()}', + role, + })); + + // THEN + Template.fromStack(stack).hasResourceProperties('AWS::IoT::TopicRule', { + TopicRulePayload: { + Actions: [ + Match.objectLike({ Kinesis: { RoleArn: 'arn:aws:iam::123456789012:role/ForTest' } }), + ], + }, + }); + + Template.fromStack(stack).hasResourceProperties('AWS::IAM::Policy', { + PolicyName: 'MyRolePolicy64AB00A5', + Roles: ['ForTest'], + }); +});