From e2166fbd36682bc1f6bfbde5be00a9d82b7f227f Mon Sep 17 00:00:00 2001 From: Niko Virtala Date: Fri, 13 Sep 2024 13:24:08 +0300 Subject: [PATCH] chore(lambda-event-sources): starting position timestamp for kafka --- .../aws-lambda-event-sources/lib/kafka.ts | 27 ++++++ .../test/kafka.test.ts | 93 +++++++++++++++++++ 2 files changed, 120 insertions(+) diff --git a/packages/aws-cdk-lib/aws-lambda-event-sources/lib/kafka.ts b/packages/aws-cdk-lib/aws-lambda-event-sources/lib/kafka.ts index 3c75a45a51447..95fcace8848c8 100644 --- a/packages/aws-cdk-lib/aws-lambda-event-sources/lib/kafka.ts +++ b/packages/aws-cdk-lib/aws-lambda-event-sources/lib/kafka.ts @@ -55,6 +55,13 @@ export interface KafkaEventSourceProps extends BaseStreamEventSourceProps { * @default - discarded records are ignored */ readonly onFailure?: lambda.IEventSourceDlq; + + /** + * The time from which to start reading, in Unix time seconds. + * + * @default - no timestamp + */ + readonly startingPositionTimestamp?: number; } /** @@ -148,6 +155,15 @@ export class ManagedKafkaEventSource extends StreamEventSource { constructor(props: ManagedKafkaEventSourceProps) { super(props); + + if (props.startingPosition === lambda.StartingPosition.AT_TIMESTAMP && !props.startingPositionTimestamp) { + throw new Error('startingPositionTimestamp must be provided when startingPosition is AT_TIMESTAMP'); + } + + if (props.startingPosition !== lambda.StartingPosition.AT_TIMESTAMP && props.startingPositionTimestamp) { + throw new Error('startingPositionTimestamp can only be used when startingPosition is AT_TIMESTAMP'); + } + this.innerProps = props; } @@ -159,6 +175,7 @@ export class ManagedKafkaEventSource extends StreamEventSource { filters: this.innerProps.filters, filterEncryption: this.innerProps.filterEncryption, startingPosition: this.innerProps.startingPosition, + startingPositionTimestamp: this.innerProps.startingPositionTimestamp, sourceAccessConfigurations: this.sourceAccessConfigurations(), kafkaTopic: this.innerProps.topic, kafkaConsumerGroupId: this.innerProps.consumerGroupId, @@ -239,6 +256,15 @@ export class SelfManagedKafkaEventSource extends StreamEventSource { } else if (!props.secret) { throw new Error('secret must be set if Kafka brokers accessed over Internet'); } + + if (props.startingPosition === lambda.StartingPosition.AT_TIMESTAMP && !props.startingPositionTimestamp) { + throw new Error('startingPositionTimestamp must be provided when startingPosition is AT_TIMESTAMP'); + } + + if (props.startingPosition !== lambda.StartingPosition.AT_TIMESTAMP && props.startingPositionTimestamp) { + throw new Error('startingPositionTimestamp can only be used when startingPosition is AT_TIMESTAMP'); + } + this.innerProps = props; } @@ -253,6 +279,7 @@ export class SelfManagedKafkaEventSource extends StreamEventSource { kafkaTopic: this.innerProps.topic, kafkaConsumerGroupId: this.innerProps.consumerGroupId, startingPosition: this.innerProps.startingPosition, + startingPositionTimestamp: this.innerProps.startingPositionTimestamp, sourceAccessConfigurations: this.sourceAccessConfigurations(), onFailure: this.innerProps.onFailure, supportS3OnFailureDestination: true, diff --git a/packages/aws-cdk-lib/aws-lambda-event-sources/test/kafka.test.ts b/packages/aws-cdk-lib/aws-lambda-event-sources/test/kafka.test.ts index d5b1df2c9a657..074e34aeb938a 100644 --- a/packages/aws-cdk-lib/aws-lambda-event-sources/test/kafka.test.ts +++ b/packages/aws-cdk-lib/aws-lambda-event-sources/test/kafka.test.ts @@ -308,6 +308,48 @@ describe('KafkaEventSource', () => { }); }); + test('AT_TIMESTAMP starting position', () => { + const stack = new cdk.Stack(); + const fn = new TestFunction(stack, 'Fn'); + const clusterArn = 'some-arn'; + const kafkaTopic = 'some-topic'; + + fn.addEventSource(new sources.ManagedKafkaEventSource({ + clusterArn, + topic: kafkaTopic, + startingPosition: lambda.StartingPosition.AT_TIMESTAMP, + startingPositionTimestamp: 1640995200, + }), + ); + + Template.fromStack(stack).hasResourceProperties('AWS::Lambda::EventSourceMapping', { + StartingPosition: 'AT_TIMESTAMP', + StartingPositionTimestamp: 1640995200, + }); + }); + + test('startingPositionTimestamp missing throws error', () => { + const clusterArn = 'some-arn'; + const kafkaTopic = 'some-topic'; + + expect(() => new sources.ManagedKafkaEventSource({ + clusterArn, + topic: kafkaTopic, + startingPosition: lambda.StartingPosition.AT_TIMESTAMP, + })).toThrow(/startingPositionTimestamp must be provided when startingPosition is AT_TIMESTAMP/); + }); + + test('startingPositionTimestamp without AT_TIMESTAMP throws error', () => { + const clusterArn = 'some-arn'; + const kafkaTopic = 'some-topic'; + + expect(() => new sources.ManagedKafkaEventSource({ + clusterArn, + topic: kafkaTopic, + startingPosition: lambda.StartingPosition.LATEST, + startingPositionTimestamp: 1640995200, + })).toThrow(/startingPositionTimestamp can only be used when startingPosition is AT_TIMESTAMP/); + }); }); describe('self-managed kafka', () => { @@ -998,5 +1040,56 @@ describe('KafkaEventSource', () => { expect(mskEventMapping.eventSourceMappingId).toBeDefined(); expect(mskEventMapping.eventSourceMappingArn).toBeDefined(); }); + + test('AT_TIMESTAMP starting position', () => { + const stack = new cdk.Stack(); + const fn = new TestFunction(stack, 'Fn'); + const bootstrapServers = ['kafka-broker:9092']; + const secret = new Secret(stack, 'Secret', { secretName: 'AmazonMSK_KafkaSecret' }); + const kafkaTopic = 'some-topic'; + + fn.addEventSource(new sources.SelfManagedKafkaEventSource({ + bootstrapServers, + secret: secret, + topic: kafkaTopic, + startingPosition: lambda.StartingPosition.AT_TIMESTAMP, + startingPositionTimestamp: 1640995200, + }), + ); + + Template.fromStack(stack).hasResourceProperties('AWS::Lambda::EventSourceMapping', { + StartingPosition: 'AT_TIMESTAMP', + StartingPositionTimestamp: 1640995200, + }); + }); + + test('startingPositionTimestamp missing throws error', () => { + const stack = new cdk.Stack(); + const bootstrapServers = ['kafka-broker:9092']; + const secret = new Secret(stack, 'Secret', { secretName: 'AmazonMSK_KafkaSecret' }); + const kafkaTopic = 'some-topic'; + + expect(() => new sources.SelfManagedKafkaEventSource({ + bootstrapServers, + secret: secret, + topic: kafkaTopic, + startingPosition: lambda.StartingPosition.AT_TIMESTAMP, + })).toThrow(/startingPositionTimestamp must be provided when startingPosition is AT_TIMESTAMP/); + }); + + test('startingPositionTimestamp without AT_TIMESTAMP throws error', () => { + const stack = new cdk.Stack(); + const bootstrapServers = ['kafka-broker:9092']; + const secret = new Secret(stack, 'Secret', { secretName: 'AmazonMSK_KafkaSecret' }); + const kafkaTopic = 'some-topic'; + + expect(() => new sources.SelfManagedKafkaEventSource({ + bootstrapServers, + secret: secret, + topic: kafkaTopic, + startingPosition: lambda.StartingPosition.LATEST, + startingPositionTimestamp: 1640995200, + })).toThrow(/startingPositionTimestamp can only be used when startingPosition is AT_TIMESTAMP/); + }); }); });