From 91623ee6658a04493d2d6cbc637291bb0c12a8a4 Mon Sep 17 00:00:00 2001 From: Luke Young <91491244+lyoung-confluent@users.noreply.github.com> Date: Tue, 30 Aug 2022 09:00:11 -0700 Subject: [PATCH 01/13] Add KafkaEvent and KafkaEventRecord data class --- .../utilities/data_classes/kafka_event.py | 85 +++++++++++++++++++ 1 file changed, 85 insertions(+) create mode 100644 aws_lambda_powertools/utilities/data_classes/kafka_event.py diff --git a/aws_lambda_powertools/utilities/data_classes/kafka_event.py b/aws_lambda_powertools/utilities/data_classes/kafka_event.py new file mode 100644 index 00000000000..ce510976d27 --- /dev/null +++ b/aws_lambda_powertools/utilities/data_classes/kafka_event.py @@ -0,0 +1,85 @@ +from base64 import b64decode +from typing import Dict, Iterator + +from aws_lambda_powertools.utilities.data_classes.common import DictWrapper + + +class KafkaEventRecord(DictWrapper): + @property + def topic(self) -> str: + """The Kafka topic.""" + return self["topic"] + + @property + def partition(self) -> str: + """The Kafka record parition.""" + return self["partition"] + + @property + def offset(self) -> str: + """The Kafka record offset.""" + return self["offset"] + + @property + def timestamp(self) -> int: + """The Kafka record timestamp.""" + return self["timestamp"] + + @property + def timestamp_type(self) -> str: + """The Kafka record timestamp type.""" + return self["timestamp_type"] + + @property + def key(self) -> bytes: + """The base64 decoded Kafka record key.""" + return b64decode(self.key) + + @property + def value(self) -> bytes: + """The base64 decoded Kafka record value.""" + return b64decode(self.value) + + @property + def headers(self) -> Dict[str, bytes]: + """The decoded Kafka record headers.""" + headers = {} + for chunk in self["headers"]: + for key, val in chunk.items(): + headers[key] = bytes(val) + return headers + + +class KafkaEvent(DictWrapper): + """Self-managed Apache Kafka event trigger + Documentation: + -------------- + - https://docs.aws.amazon.com/lambda/latest/dg/with-kafka.html + """ + + @property + def event_source(self) -> str: + """The AWS service from which the Kafka event record originated.""" + return self["eventSource"] + + @property + def event_source_arn(self) -> str: + """The AWS service ARN from which the Kafka event record originated.""" + return self["eventSourceArn"] + + @property + def bootstrap_servers(self) -> str: + """The Kafka bootstrap URL.""" + return self["bootstrapServers"] + + @property + def records(self) -> Iterator[KafkaEventRecord]: + """The Kafka records.""" + for chunk in self["records"].values(): + for record in chunk: + yield KafkaEventRecord(record) + + @property + def record(self) -> KafkaEventRecord: + """The next Kafka record.""" + return next(self.records) From 2f83089038564267eec1dc3992de391f4aea19e5 Mon Sep 17 00:00:00 2001 From: Luke Young <91491244+lyoung-confluent@users.noreply.github.com> Date: Tue, 30 Aug 2022 09:05:18 -0700 Subject: [PATCH 02/13] Add KafkaEvent to __init__.py --- aws_lambda_powertools/utilities/data_classes/__init__.py | 2 ++ 1 file changed, 2 insertions(+) diff --git a/aws_lambda_powertools/utilities/data_classes/__init__.py b/aws_lambda_powertools/utilities/data_classes/__init__.py index 20a41310523..83c8cbba47e 100644 --- a/aws_lambda_powertools/utilities/data_classes/__init__.py +++ b/aws_lambda_powertools/utilities/data_classes/__init__.py @@ -13,6 +13,7 @@ from .event_bridge_event import EventBridgeEvent from .event_source import event_source from .kinesis_stream_event import KinesisStreamEvent +from .kafka import KafkaEvent from .lambda_function_url_event import LambdaFunctionUrlEvent from .s3_event import S3Event from .ses_event import SESEvent @@ -30,6 +31,7 @@ "ConnectContactFlowEvent", "DynamoDBStreamEvent", "EventBridgeEvent", + "KafkaEvent", "KinesisStreamEvent", "LambdaFunctionUrlEvent", "S3Event", From c3d17240b779e8e706e748f24a4ab0c8e6f6da6c Mon Sep 17 00:00:00 2001 From: Luke Young Date: Tue, 30 Aug 2022 09:41:49 -0700 Subject: [PATCH 03/13] Mirror the pattern from kinesis/rabbitmq --- .../utilities/data_classes/kafka_event.py | 68 ++++++++++++++----- 1 file changed, 52 insertions(+), 16 deletions(-) diff --git a/aws_lambda_powertools/utilities/data_classes/kafka_event.py b/aws_lambda_powertools/utilities/data_classes/kafka_event.py index ce510976d27..872baac5f7c 100644 --- a/aws_lambda_powertools/utilities/data_classes/kafka_event.py +++ b/aws_lambda_powertools/utilities/data_classes/kafka_event.py @@ -1,7 +1,8 @@ -from base64 import b64decode -from typing import Dict, Iterator +import json +import base64 +from typing import Any, Dict, Iterator -from aws_lambda_powertools.utilities.data_classes.common import DictWrapper +from aws_lambda_powertools.utilities.data_classes.common import DictWrapper, get_header_value class KafkaEventRecord(DictWrapper): @@ -31,23 +32,58 @@ def timestamp_type(self) -> str: return self["timestamp_type"] @property - def key(self) -> bytes: - """The base64 decoded Kafka record key.""" - return b64decode(self.key) + def key(self) -> str: + """The raw (base64 encoded) Kafka record key.""" + return self["key"] @property - def value(self) -> bytes: - """The base64 decoded Kafka record value.""" - return b64decode(self.value) + def decoded_key(self) -> bytes: + """Decode the base64 encoded key as bytes.""" + return base64.b64decode(self.key) @property - def headers(self) -> Dict[str, bytes]: - """The decoded Kafka record headers.""" - headers = {} - for chunk in self["headers"]: - for key, val in chunk.items(): - headers[key] = bytes(val) - return headers + def value(self) -> str: + """The raw (base64 encoded) Kafka record value.""" + return self["value"] + + @property + def decoded_value(self) -> bytes: + """Decodes the base64 encoded value as bytes.""" + return base64.b64decode(self.value) + + @property + def json_value(self) -> Any: + """Decodes the text encoded data as JSON.""" + if self._json_data is None: + self._json_data = json.loads(self.decoded_value.decode("utf-8")) + return self._json_data + + @property + def headers(self) -> List[Dict[str, List[int]]]: + """The raw Kafka record headers.""" + return self["headers"] + + @property + def decoded_headers(self) -> Dict[str, bytes]: + """Decodes the headers as a single dictionary.""" + if self._headers is None: + self._headers = {k: bytes(v) for k, v in chunk.items() for chunk in self.headers} + return self._headers + + def get_header_value( + self, name: str, default_value: Optional[Any] = None, case_sensitive: bool = True + ) -> Optional[bytes]: + """Get a decoded header value by name.""" + if case_sensitive: + return self.decoded_headers.get(name, default_value) + name_lower = name.lower() + + return next( + # Iterate over the dict and do a case-insensitive key comparison + (value for key, value in self.decoded_headers.items() if key.lower() == name_lower), + # Default value is returned if no matches was found + default_value, + ) class KafkaEvent(DictWrapper): From 624d6522a4ea019cd32a4a444b3cfa7bf43a1606 Mon Sep 17 00:00:00 2001 From: Luke Young <91491244+lyoung-confluent@users.noreply.github.com> Date: Tue, 30 Aug 2022 09:42:41 -0700 Subject: [PATCH 04/13] Add docs --- docs/utilities/data_classes.md | 17 +++++++++++++++++ 1 file changed, 17 insertions(+) diff --git a/docs/utilities/data_classes.md b/docs/utilities/data_classes.md index 86cbebd3c97..67d821fe04f 100644 --- a/docs/utilities/data_classes.md +++ b/docs/utilities/data_classes.md @@ -75,6 +75,7 @@ Event Source | Data_class [Connect Contact Flow](#connect-contact-flow) | `ConnectContactFlowEvent` [DynamoDB streams](#dynamodb-streams) | `DynamoDBStreamEvent`, `DynamoDBRecordEventName` [EventBridge](#eventbridge) | `EventBridgeEvent` +[Kafka](#kafka) | `KafkaEvent` [Kinesis Data Stream](#kinesis-streams) | `KinesisStreamEvent` [Lambda Function URL](#lambda-function-url) | `LambdaFunctionUrlEvent` [Rabbit MQ](#rabbit-mq) | `RabbitMQEvent` @@ -852,6 +853,22 @@ attributes values (`AttributeValue`), as well as enums for stream view type (`St ``` +### Kafka + +This example is based on the AWS docs for [Amazon MSK](https://docs.aws.amazon.com/lambda/latest/dg/with-msk.html){target="_blank"} and [self-managed Apache Kafka](https://docs.aws.amazon.com/lambda/latest/dg/with-kafka.html){target="_blank"}. + +=== "app.py" + + ```python + from aws_lambda_powertools.utilities.data_classes import event_source, KafkaEvent + + @event_source(data_class=KafkaEvent) + def lambda_handler(event: KafkaEvent, context): + for record in event.records: + do_something_with(record.decoded_key, record.json_value) + + ``` + ### Kinesis streams Kinesis events by default contain base64 encoded data. You can use the helper function to access the data either as json From c5cf3d7465a847a6ce6350a54c9cbef755ea2c6c Mon Sep 17 00:00:00 2001 From: Luke Young Date: Tue, 30 Aug 2022 10:12:39 -0700 Subject: [PATCH 05/13] Add tests, fix implementation bugs --- .../utilities/data_classes/__init__.py | 2 +- .../utilities/data_classes/kafka_event.py | 18 +++++----- tests/events/kafkaEvent.json | 34 +++++++++++++++++++ tests/functional/test_data_classes.py | 25 ++++++++++++++ 4 files changed, 69 insertions(+), 10 deletions(-) create mode 100644 tests/events/kafkaEvent.json diff --git a/aws_lambda_powertools/utilities/data_classes/__init__.py b/aws_lambda_powertools/utilities/data_classes/__init__.py index 83c8cbba47e..8ed77f9f3a3 100644 --- a/aws_lambda_powertools/utilities/data_classes/__init__.py +++ b/aws_lambda_powertools/utilities/data_classes/__init__.py @@ -12,8 +12,8 @@ from .dynamo_db_stream_event import DynamoDBStreamEvent from .event_bridge_event import EventBridgeEvent from .event_source import event_source +from .kafka_event import KafkaEvent from .kinesis_stream_event import KinesisStreamEvent -from .kafka import KafkaEvent from .lambda_function_url_event import LambdaFunctionUrlEvent from .s3_event import S3Event from .ses_event import SESEvent diff --git a/aws_lambda_powertools/utilities/data_classes/kafka_event.py b/aws_lambda_powertools/utilities/data_classes/kafka_event.py index 872baac5f7c..2d2b6303fb6 100644 --- a/aws_lambda_powertools/utilities/data_classes/kafka_event.py +++ b/aws_lambda_powertools/utilities/data_classes/kafka_event.py @@ -1,8 +1,8 @@ -import json import base64 -from typing import Any, Dict, Iterator +import json +from typing import Any, Dict, Iterator, List, Optional -from aws_lambda_powertools.utilities.data_classes.common import DictWrapper, get_header_value +from aws_lambda_powertools.utilities.data_classes.common import DictWrapper class KafkaEventRecord(DictWrapper): @@ -29,7 +29,7 @@ def timestamp(self) -> int: @property def timestamp_type(self) -> str: """The Kafka record timestamp type.""" - return self["timestamp_type"] + return self["timestampType"] @property def key(self) -> str: @@ -66,9 +66,7 @@ def headers(self) -> List[Dict[str, List[int]]]: @property def decoded_headers(self) -> Dict[str, bytes]: """Decodes the headers as a single dictionary.""" - if self._headers is None: - self._headers = {k: bytes(v) for k, v in chunk.items() for chunk in self.headers} - return self._headers + return {k: bytes(v) for chunk in self.headers for k, v in chunk.items()} def get_header_value( self, name: str, default_value: Optional[Any] = None, case_sensitive: bool = True @@ -104,9 +102,11 @@ def event_source_arn(self) -> str: return self["eventSourceArn"] @property - def bootstrap_servers(self) -> str: + def bootstrap_servers(self) -> List[str]: """The Kafka bootstrap URL.""" - return self["bootstrapServers"] + if "bootstrapServers" not in self: + return None + return self["bootstrapServers"].split(",") @property def records(self) -> Iterator[KafkaEventRecord]: diff --git a/tests/events/kafkaEvent.json b/tests/events/kafkaEvent.json new file mode 100644 index 00000000000..563163c9997 --- /dev/null +++ b/tests/events/kafkaEvent.json @@ -0,0 +1,34 @@ +{ + "eventSource":"aws:SelfManagedKafka", + "bootstrapServers":"b-2.demo-cluster-1.a1bcde.c1.kafka.us-east-1.amazonaws.com:9092,b-1.demo-cluster-1.a1bcde.c1.kafka.us-east-1.amazonaws.com:9092", + "records":{ + "mytopic-0":[ + { + "topic":"mytopic", + "partition":0, + "offset":15, + "timestamp":1545084650987, + "timestampType":"CREATE_TIME", + "key":"cmVjb3JkS2V5", + "value":"eyJrZXkiOiJ2YWx1ZSJ9", + "headers":[ + { + "headerKey":[ + 104, + 101, + 97, + 100, + 101, + 114, + 86, + 97, + 108, + 117, + 101 + ] + } + ] + } + ] + } +} \ No newline at end of file diff --git a/tests/functional/test_data_classes.py b/tests/functional/test_data_classes.py index 5c7423add64..107fd3a6319 100644 --- a/tests/functional/test_data_classes.py +++ b/tests/functional/test_data_classes.py @@ -17,6 +17,7 @@ CloudWatchLogsEvent, CodePipelineJobEvent, EventBridgeEvent, + KafkaEvent, KinesisStreamEvent, S3Event, SESEvent, @@ -1138,6 +1139,30 @@ def test_base_proxy_event_json_body_with_base64_encoded_data(): assert event.json_body == data +def test_kafka_event(): + event = KafkaEvent(load_event("kafkaEvent.json")) + assert event.event_source == "aws:SelfManagedKafka" + bootstrap_servers = [ + "b-2.demo-cluster-1.a1bcde.c1.kafka.us-east-1.amazonaws.com:9092", + "b-1.demo-cluster-1.a1bcde.c1.kafka.us-east-1.amazonaws.com:9092", + ] + assert event.bootstrap_servers == bootstrap_servers + + records = list(event.records) + assert len(records) == 1 + record = records[0] + assert record.topic == "mytopic" + assert record.partition == 0 + assert record.offset == 15 + assert record.timestamp == 1545084650987 + assert record.timestamp_type == "CREATE_TIME" + assert record.decoded_key == b"recordKey" + assert record.value == "eyJrZXkiOiJ2YWx1ZSJ9" + assert record.json_value == {"key": "value"} + assert record.decoded_headers == {"headerKey": b"headerValue"} + assert record.get_header_value("HeaderKey", case_sensitive=False) == b"headerValue" + + def test_kinesis_stream_event(): event = KinesisStreamEvent(load_event("kinesisStreamEvent.json")) From ed34c988af9fb660726dbc223415ae97d92d42a7 Mon Sep 17 00:00:00 2001 From: Luke Young <91491244+lyoung-confluent@users.noreply.github.com> Date: Wed, 31 Aug 2022 09:41:26 -0700 Subject: [PATCH 06/13] Fix typing on bootstrap_servers result --- aws_lambda_powertools/utilities/data_classes/kafka_event.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/aws_lambda_powertools/utilities/data_classes/kafka_event.py b/aws_lambda_powertools/utilities/data_classes/kafka_event.py index 2d2b6303fb6..01b7a4cb551 100644 --- a/aws_lambda_powertools/utilities/data_classes/kafka_event.py +++ b/aws_lambda_powertools/utilities/data_classes/kafka_event.py @@ -102,7 +102,7 @@ def event_source_arn(self) -> str: return self["eventSourceArn"] @property - def bootstrap_servers(self) -> List[str]: + def bootstrap_servers(self) -> Optional[List[str]]: """The Kafka bootstrap URL.""" if "bootstrapServers" not in self: return None From 4c8b95797ca56ee254abf46944d41aeb5547db9b Mon Sep 17 00:00:00 2001 From: Luke Young Date: Wed, 31 Aug 2022 21:53:45 -0700 Subject: [PATCH 07/13] Fix typing, mypy runs locally now --- aws_lambda_powertools/utilities/data_classes/kafka_event.py | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/aws_lambda_powertools/utilities/data_classes/kafka_event.py b/aws_lambda_powertools/utilities/data_classes/kafka_event.py index 01b7a4cb551..a079976a8d2 100644 --- a/aws_lambda_powertools/utilities/data_classes/kafka_event.py +++ b/aws_lambda_powertools/utilities/data_classes/kafka_event.py @@ -104,9 +104,10 @@ def event_source_arn(self) -> str: @property def bootstrap_servers(self) -> Optional[List[str]]: """The Kafka bootstrap URL.""" - if "bootstrapServers" not in self: + servers = self.get("bootstrapServers") + if not servers: return None - return self["bootstrapServers"].split(",") + return servers.split(",") @property def records(self) -> Iterator[KafkaEventRecord]: From 3d9dd70343b60199ccc6de567817cf8d2ecc2849 Mon Sep 17 00:00:00 2001 From: Luke Young <91491244+lyoung-confluent@users.noreply.github.com> Date: Mon, 5 Sep 2022 15:21:08 +0200 Subject: [PATCH 08/13] address PR feedback --- .../utilities/data_classes/kafka_event.py | 11 ++++------- 1 file changed, 4 insertions(+), 7 deletions(-) diff --git a/aws_lambda_powertools/utilities/data_classes/kafka_event.py b/aws_lambda_powertools/utilities/data_classes/kafka_event.py index a079976a8d2..17c9eddfe37 100644 --- a/aws_lambda_powertools/utilities/data_classes/kafka_event.py +++ b/aws_lambda_powertools/utilities/data_classes/kafka_event.py @@ -97,17 +97,14 @@ def event_source(self) -> str: return self["eventSource"] @property - def event_source_arn(self) -> str: + def event_source_arn(self) -> Optional[str]: """The AWS service ARN from which the Kafka event record originated.""" - return self["eventSourceArn"] + return self.get("eventSourceArn") @property - def bootstrap_servers(self) -> Optional[List[str]]: + def bootstrap_servers(self) -> List[str]: """The Kafka bootstrap URL.""" - servers = self.get("bootstrapServers") - if not servers: - return None - return servers.split(",") + return self["bootstrapServers"].split(",") @property def records(self) -> Iterator[KafkaEventRecord]: From 001973758a9de9ff55160d7323b109778fb918b0 Mon Sep 17 00:00:00 2001 From: Luke Young <91491244+lyoung-confluent@users.noreply.github.com> Date: Mon, 5 Sep 2022 15:22:18 +0200 Subject: [PATCH 09/13] change kafkaEvent from self-managed to MSK --- tests/events/kafkaEvent.json | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/tests/events/kafkaEvent.json b/tests/events/kafkaEvent.json index 563163c9997..5a35b89680a 100644 --- a/tests/events/kafkaEvent.json +++ b/tests/events/kafkaEvent.json @@ -1,5 +1,6 @@ { - "eventSource":"aws:SelfManagedKafka", + "eventSource":"aws:kafka", + "eventSourceArn":"arn:aws:kafka:us-east-1:0123456789019:cluster/SalesCluster/abcd1234-abcd-cafe-abab-9876543210ab-4", "bootstrapServers":"b-2.demo-cluster-1.a1bcde.c1.kafka.us-east-1.amazonaws.com:9092,b-1.demo-cluster-1.a1bcde.c1.kafka.us-east-1.amazonaws.com:9092", "records":{ "mytopic-0":[ @@ -31,4 +32,4 @@ } ] } -} \ No newline at end of file +} From a864e255b6bf7d3146840d7df383f2fece3935d3 Mon Sep 17 00:00:00 2001 From: Luke Young <91491244+lyoung-confluent@users.noreply.github.com> Date: Mon, 5 Sep 2022 15:26:10 +0200 Subject: [PATCH 10/13] adjust tests to use MSK event --- tests/functional/test_data_classes.py | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/tests/functional/test_data_classes.py b/tests/functional/test_data_classes.py index 107fd3a6319..9eb6f204444 100644 --- a/tests/functional/test_data_classes.py +++ b/tests/functional/test_data_classes.py @@ -1141,7 +1141,8 @@ def test_base_proxy_event_json_body_with_base64_encoded_data(): def test_kafka_event(): event = KafkaEvent(load_event("kafkaEvent.json")) - assert event.event_source == "aws:SelfManagedKafka" + assert event.event_source == "aws:kafka" + assert event.event_source_arn == "arn:aws:kafka:us-east-1:0123456789019:cluster/SalesCluster/abcd1234-abcd-cafe-abab-9876543210ab-4" bootstrap_servers = [ "b-2.demo-cluster-1.a1bcde.c1.kafka.us-east-1.amazonaws.com:9092", "b-1.demo-cluster-1.a1bcde.c1.kafka.us-east-1.amazonaws.com:9092", From 041b45466169571f4089c68499dc71594d34a482 Mon Sep 17 00:00:00 2001 From: Luke Young <91491244+lyoung-confluent@users.noreply.github.com> Date: Mon, 5 Sep 2022 15:39:04 +0200 Subject: [PATCH 11/13] split bootstrap_servers into decoded_bootstrap_servers --- .../utilities/data_classes/kafka_event.py | 9 +++++++-- 1 file changed, 7 insertions(+), 2 deletions(-) diff --git a/aws_lambda_powertools/utilities/data_classes/kafka_event.py b/aws_lambda_powertools/utilities/data_classes/kafka_event.py index 17c9eddfe37..5e9201dae9f 100644 --- a/aws_lambda_powertools/utilities/data_classes/kafka_event.py +++ b/aws_lambda_powertools/utilities/data_classes/kafka_event.py @@ -102,9 +102,14 @@ def event_source_arn(self) -> Optional[str]: return self.get("eventSourceArn") @property - def bootstrap_servers(self) -> List[str]: + def bootstrap_servers(self) -> str: """The Kafka bootstrap URL.""" - return self["bootstrapServers"].split(",") + return self["bootstrapServers"] + + @property + def decoded_bootstrap_servers(self) -> List[str]: + """The decoded Kafka bootstrap URL.""" + return self.bootstrap_servers.split(",") @property def records(self) -> Iterator[KafkaEventRecord]: From f00d2c6f4c7fee9d39519189342f9deac9a88a9b Mon Sep 17 00:00:00 2001 From: Luke Young <91491244+lyoung-confluent@users.noreply.github.com> Date: Mon, 5 Sep 2022 15:39:35 +0200 Subject: [PATCH 12/13] Update tests to use decode_bootstrap_servers --- tests/functional/test_data_classes.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tests/functional/test_data_classes.py b/tests/functional/test_data_classes.py index 9eb6f204444..52c7998fb99 100644 --- a/tests/functional/test_data_classes.py +++ b/tests/functional/test_data_classes.py @@ -1147,7 +1147,7 @@ def test_kafka_event(): "b-2.demo-cluster-1.a1bcde.c1.kafka.us-east-1.amazonaws.com:9092", "b-1.demo-cluster-1.a1bcde.c1.kafka.us-east-1.amazonaws.com:9092", ] - assert event.bootstrap_servers == bootstrap_servers + assert event.decoded_bootstrap_servers == bootstrap_servers records = list(event.records) assert len(records) == 1 From bd9c8a65226bf9bd68809b0bddace53dccd60441 Mon Sep 17 00:00:00 2001 From: Leandro Damascena Date: Mon, 5 Sep 2022 15:23:13 +0100 Subject: [PATCH 13/13] feat(data-classes): adding more tests to kafka events --- .../{kafkaEvent.json => kafkaEventMsk.json} | 0 tests/events/kafkaEventSelfManaged.json | 34 ++++++++++++++ tests/functional/test_data_classes.py | 47 +++++++++++++++++-- 3 files changed, 76 insertions(+), 5 deletions(-) rename tests/events/{kafkaEvent.json => kafkaEventMsk.json} (100%) create mode 100644 tests/events/kafkaEventSelfManaged.json diff --git a/tests/events/kafkaEvent.json b/tests/events/kafkaEventMsk.json similarity index 100% rename from tests/events/kafkaEvent.json rename to tests/events/kafkaEventMsk.json diff --git a/tests/events/kafkaEventSelfManaged.json b/tests/events/kafkaEventSelfManaged.json new file mode 100644 index 00000000000..17372b7c243 --- /dev/null +++ b/tests/events/kafkaEventSelfManaged.json @@ -0,0 +1,34 @@ +{ + "eventSource":"aws:aws:SelfManagedKafka", + "bootstrapServers":"b-2.demo-cluster-1.a1bcde.c1.kafka.us-east-1.amazonaws.com:9092,b-1.demo-cluster-1.a1bcde.c1.kafka.us-east-1.amazonaws.com:9092", + "records":{ + "mytopic-0":[ + { + "topic":"mytopic", + "partition":0, + "offset":15, + "timestamp":1545084650987, + "timestampType":"CREATE_TIME", + "key":"cmVjb3JkS2V5", + "value":"eyJrZXkiOiJ2YWx1ZSJ9", + "headers":[ + { + "headerKey":[ + 104, + 101, + 97, + 100, + 101, + 114, + 86, + 97, + 108, + 117, + 101 + ] + } + ] + } + ] + } +} diff --git a/tests/functional/test_data_classes.py b/tests/functional/test_data_classes.py index 52c7998fb99..00dd5100f67 100644 --- a/tests/functional/test_data_classes.py +++ b/tests/functional/test_data_classes.py @@ -1139,15 +1139,52 @@ def test_base_proxy_event_json_body_with_base64_encoded_data(): assert event.json_body == data -def test_kafka_event(): - event = KafkaEvent(load_event("kafkaEvent.json")) +def test_kafka_msk_event(): + event = KafkaEvent(load_event("kafkaEventMsk.json")) assert event.event_source == "aws:kafka" - assert event.event_source_arn == "arn:aws:kafka:us-east-1:0123456789019:cluster/SalesCluster/abcd1234-abcd-cafe-abab-9876543210ab-4" - bootstrap_servers = [ + assert ( + event.event_source_arn + == "arn:aws:kafka:us-east-1:0123456789019:cluster/SalesCluster/abcd1234-abcd-cafe-abab-9876543210ab-4" + ) + + bootstrap_servers_raw = "b-2.demo-cluster-1.a1bcde.c1.kafka.us-east-1.amazonaws.com:9092,b-1.demo-cluster-1.a1bcde.c1.kafka.us-east-1.amazonaws.com:9092" # noqa E501 + + bootstrap_servers_list = [ "b-2.demo-cluster-1.a1bcde.c1.kafka.us-east-1.amazonaws.com:9092", "b-1.demo-cluster-1.a1bcde.c1.kafka.us-east-1.amazonaws.com:9092", ] - assert event.decoded_bootstrap_servers == bootstrap_servers + + assert event.bootstrap_servers == bootstrap_servers_raw + assert event.decoded_bootstrap_servers == bootstrap_servers_list + + records = list(event.records) + assert len(records) == 1 + record = records[0] + assert record.topic == "mytopic" + assert record.partition == 0 + assert record.offset == 15 + assert record.timestamp == 1545084650987 + assert record.timestamp_type == "CREATE_TIME" + assert record.decoded_key == b"recordKey" + assert record.value == "eyJrZXkiOiJ2YWx1ZSJ9" + assert record.json_value == {"key": "value"} + assert record.decoded_headers == {"headerKey": b"headerValue"} + assert record.get_header_value("HeaderKey", case_sensitive=False) == b"headerValue" + + +def test_kafka_self_managed_event(): + event = KafkaEvent(load_event("kafkaEventSelfManaged.json")) + assert event.event_source == "aws:aws:SelfManagedKafka" + + bootstrap_servers_raw = "b-2.demo-cluster-1.a1bcde.c1.kafka.us-east-1.amazonaws.com:9092,b-1.demo-cluster-1.a1bcde.c1.kafka.us-east-1.amazonaws.com:9092" # noqa E501 + + bootstrap_servers_list = [ + "b-2.demo-cluster-1.a1bcde.c1.kafka.us-east-1.amazonaws.com:9092", + "b-1.demo-cluster-1.a1bcde.c1.kafka.us-east-1.amazonaws.com:9092", + ] + + assert event.bootstrap_servers == bootstrap_servers_raw + assert event.decoded_bootstrap_servers == bootstrap_servers_list records = list(event.records) assert len(records) == 1