From c26692d6a9d9169445248cab6562bac492313bb7 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Artur=20Ma=C5=82as?= Date: Tue, 13 May 2025 11:15:37 +0200 Subject: [PATCH 1/6] feat(parser): decompress data from KinesisDataStreamEnvelope if it contains compressed data --- .../utilities/parser/envelopes/kinesis.py | 17 ++++++++++++++++- tests/unit/parser/_pydantic/test_kinesis.py | 14 ++++++++++++++ 2 files changed, 30 insertions(+), 1 deletion(-) diff --git a/aws_lambda_powertools/utilities/parser/envelopes/kinesis.py b/aws_lambda_powertools/utilities/parser/envelopes/kinesis.py index a4d484931df..ccb508c88c8 100644 --- a/aws_lambda_powertools/utilities/parser/envelopes/kinesis.py +++ b/aws_lambda_powertools/utilities/parser/envelopes/kinesis.py @@ -1,6 +1,7 @@ from __future__ import annotations import logging +import zlib from typing import TYPE_CHECKING, Any, cast from aws_lambda_powertools.utilities.parser.envelopes.base import BaseEnvelope @@ -45,5 +46,19 @@ def parse(self, data: dict[str, Any] | Any | None, model: type[Model]) -> list[M for record in parsed_envelope.Records: # We allow either AWS expected contract (bytes) or a custom Model, see #943 data = cast(bytes, record.kinesis.data) - models.append(self._parse(data=data.decode("utf-8"), model=model)) + try: + models.append(self._parse(data=data.decode("utf-8"), model=model)) + # If the Data Stream contains compressed data eg. CloudWatch Logs + # `decode` method will throw a UnicodeDecodeError + # which signals that decompression might be required + except UnicodeDecodeError as ude: + try: + logger.debug(f"{type(ude).__name__}: {str(ude)} encountered. " + "Data will be decompressed with gzip.decompress().") + decompressed_data = zlib.decompress(data, zlib.MAX_WBITS | 32) + models.append( + self._parse(data=decompressed_data.decode("utf-8"), model=model) + ) + except Exception: + raise ValueError("Unable to decode and/or decompress data.") return models diff --git a/tests/unit/parser/_pydantic/test_kinesis.py b/tests/unit/parser/_pydantic/test_kinesis.py index 9da19ed3e0b..6041e5462a1 100644 --- a/tests/unit/parser/_pydantic/test_kinesis.py +++ b/tests/unit/parser/_pydantic/test_kinesis.py @@ -105,3 +105,17 @@ class DummyModel(BaseModel): ... for record in stream_data.Records: record.kinesis.data = DummyModel() record.decompress_zlib_record_data_as_json() + + +def test_kinesis_stream_event_with_cloud_watch_logs_data_using_envelope(): + # GIVEN Kinesis Data Stream event with compressed data + # such as CloudWatch Logs + raw_event = load_event("kinesisStreamCloudWatchLogsEvent.json") + + # WHEN parsing using KinesisDataStreamEvelope to CloudWatchLogsDecode + logs = envelopes.KinesisDataStreamEnvelope().parse( + raw_event, CloudWatchLogsDecode + ) + + # THEN logs should be extracted as CloudWatchLogsDecode objects + assert isinstance(logs[0], CloudWatchLogsDecode) From 7bea13421f58573f5a53a4bc31fda50814b763a4 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Artur=20Ma=C5=82as?= Date: Wed, 14 May 2025 10:58:43 +0200 Subject: [PATCH 2/6] docs: correct comment about data decompression --- aws_lambda_powertools/utilities/parser/envelopes/kinesis.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/aws_lambda_powertools/utilities/parser/envelopes/kinesis.py b/aws_lambda_powertools/utilities/parser/envelopes/kinesis.py index ccb508c88c8..66b905babd4 100644 --- a/aws_lambda_powertools/utilities/parser/envelopes/kinesis.py +++ b/aws_lambda_powertools/utilities/parser/envelopes/kinesis.py @@ -54,7 +54,7 @@ def parse(self, data: dict[str, Any] | Any | None, model: type[Model]) -> list[M except UnicodeDecodeError as ude: try: logger.debug(f"{type(ude).__name__}: {str(ude)} encountered. " - "Data will be decompressed with gzip.decompress().") + "Data will be decompressed with zlib.decompress().") decompressed_data = zlib.decompress(data, zlib.MAX_WBITS | 32) models.append( self._parse(data=decompressed_data.decode("utf-8"), model=model) From 6b1a93ba8041872e7525cbd587d5dc95b6dda9a9 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Artur=20Ma=C5=82as?= Date: Wed, 14 May 2025 11:19:04 +0200 Subject: [PATCH 3/6] refactor: simplify KinesisDataStreamEnvelope decompression and decoding workflow --- .../utilities/parser/envelopes/kinesis.py | 9 ++++----- 1 file changed, 4 insertions(+), 5 deletions(-) diff --git a/aws_lambda_powertools/utilities/parser/envelopes/kinesis.py b/aws_lambda_powertools/utilities/parser/envelopes/kinesis.py index 66b905babd4..f1f8f40c111 100644 --- a/aws_lambda_powertools/utilities/parser/envelopes/kinesis.py +++ b/aws_lambda_powertools/utilities/parser/envelopes/kinesis.py @@ -47,18 +47,17 @@ def parse(self, data: dict[str, Any] | Any | None, model: type[Model]) -> list[M # We allow either AWS expected contract (bytes) or a custom Model, see #943 data = cast(bytes, record.kinesis.data) try: - models.append(self._parse(data=data.decode("utf-8"), model=model)) + decoded_data = data.decode("utf-8") # If the Data Stream contains compressed data eg. CloudWatch Logs # `decode` method will throw a UnicodeDecodeError # which signals that decompression might be required except UnicodeDecodeError as ude: try: logger.debug(f"{type(ude).__name__}: {str(ude)} encountered. " - "Data will be decompressed with zlib.decompress().") + "Data will be decompressed with zlib.decompress().") decompressed_data = zlib.decompress(data, zlib.MAX_WBITS | 32) - models.append( - self._parse(data=decompressed_data.decode("utf-8"), model=model) - ) + decoded_data = decompressed_data.decode("utf-8") except Exception: raise ValueError("Unable to decode and/or decompress data.") + models.append(self._parse(data=decoded_data, model=model)) return models From 14724a752f0405c3dc6598bd10fa92369ceffa6c Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Artur=20Ma=C5=82as?= Date: Thu, 15 May 2025 08:14:56 +0200 Subject: [PATCH 4/6] style(parser): format new code using make format --- aws_lambda_powertools/utilities/parser/envelopes/kinesis.py | 6 ++++-- tests/unit/parser/_pydantic/test_kinesis.py | 4 +--- 2 files changed, 5 insertions(+), 5 deletions(-) diff --git a/aws_lambda_powertools/utilities/parser/envelopes/kinesis.py b/aws_lambda_powertools/utilities/parser/envelopes/kinesis.py index f1f8f40c111..e154b3f748a 100644 --- a/aws_lambda_powertools/utilities/parser/envelopes/kinesis.py +++ b/aws_lambda_powertools/utilities/parser/envelopes/kinesis.py @@ -53,8 +53,10 @@ def parse(self, data: dict[str, Any] | Any | None, model: type[Model]) -> list[M # which signals that decompression might be required except UnicodeDecodeError as ude: try: - logger.debug(f"{type(ude).__name__}: {str(ude)} encountered. " - "Data will be decompressed with zlib.decompress().") + logger.debug( + f"{type(ude).__name__}: {str(ude)} encountered. " + "Data will be decompressed with zlib.decompress()." + ) decompressed_data = zlib.decompress(data, zlib.MAX_WBITS | 32) decoded_data = decompressed_data.decode("utf-8") except Exception: diff --git a/tests/unit/parser/_pydantic/test_kinesis.py b/tests/unit/parser/_pydantic/test_kinesis.py index 6041e5462a1..f0cd7208a30 100644 --- a/tests/unit/parser/_pydantic/test_kinesis.py +++ b/tests/unit/parser/_pydantic/test_kinesis.py @@ -113,9 +113,7 @@ def test_kinesis_stream_event_with_cloud_watch_logs_data_using_envelope(): raw_event = load_event("kinesisStreamCloudWatchLogsEvent.json") # WHEN parsing using KinesisDataStreamEvelope to CloudWatchLogsDecode - logs = envelopes.KinesisDataStreamEnvelope().parse( - raw_event, CloudWatchLogsDecode - ) + logs = envelopes.KinesisDataStreamEnvelope().parse(raw_event, CloudWatchLogsDecode) # THEN logs should be extracted as CloudWatchLogsDecode objects assert isinstance(logs[0], CloudWatchLogsDecode) From 5d27857e4b7fd79af0818282cb0289e9b1f63a4f Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Artur=20Ma=C5=82as?= Date: Thu, 15 May 2025 08:46:21 +0200 Subject: [PATCH 5/6] test(parser): create test for negative test case of parsing CloudWatch Logs from Kinesis envelope --- tests/unit/parser/_pydantic/test_kinesis.py | 14 ++++++++++++++ 1 file changed, 14 insertions(+) diff --git a/tests/unit/parser/_pydantic/test_kinesis.py b/tests/unit/parser/_pydantic/test_kinesis.py index f0cd7208a30..d5136d32c84 100644 --- a/tests/unit/parser/_pydantic/test_kinesis.py +++ b/tests/unit/parser/_pydantic/test_kinesis.py @@ -117,3 +117,17 @@ def test_kinesis_stream_event_with_cloud_watch_logs_data_using_envelope(): # THEN logs should be extracted as CloudWatchLogsDecode objects assert isinstance(logs[0], CloudWatchLogsDecode) + + +def test_kinesis_stream_event_with_cloud_watch_logs_data_fails_using_envelope(): + # GIVEN Kinesis Data Stream event with corrupted compressed data + # such as CloudWatch Logs + raw_event = load_event("kinesisStreamCloudWatchLogsEvent.json") + + # WHEN parsing using KinesisDataStreamEvelope to CloudWatchLogsDecode + # and the data is corrupted + raw_event["Records"][0]["kinesis"]["data"] = "123" + + # THEN a ValueError should be thrown + with pytest.raises(ValueError): + envelopes.KinesisDataStreamEnvelope().parse(raw_event, CloudWatchLogsDecode) From 2b25e4629f46c9e53a7330fa8a8a6abc4c11beda Mon Sep 17 00:00:00 2001 From: Leandro Damascena Date: Thu, 15 May 2025 10:59:43 +0100 Subject: [PATCH 6/6] Changing data type --- .../utilities/parser/envelopes/kinesis.py | 9 +++------ tests/unit/parser/_pydantic/test_kinesis.py | 3 +-- 2 files changed, 4 insertions(+), 8 deletions(-) diff --git a/aws_lambda_powertools/utilities/parser/envelopes/kinesis.py b/aws_lambda_powertools/utilities/parser/envelopes/kinesis.py index e154b3f748a..41527e03930 100644 --- a/aws_lambda_powertools/utilities/parser/envelopes/kinesis.py +++ b/aws_lambda_powertools/utilities/parser/envelopes/kinesis.py @@ -48,18 +48,15 @@ def parse(self, data: dict[str, Any] | Any | None, model: type[Model]) -> list[M data = cast(bytes, record.kinesis.data) try: decoded_data = data.decode("utf-8") - # If the Data Stream contains compressed data eg. CloudWatch Logs - # `decode` method will throw a UnicodeDecodeError - # which signals that decompression might be required except UnicodeDecodeError as ude: try: logger.debug( f"{type(ude).__name__}: {str(ude)} encountered. " - "Data will be decompressed with zlib.decompress()." + "Data will be decompressed with zlib.decompress().", ) decompressed_data = zlib.decompress(data, zlib.MAX_WBITS | 32) decoded_data = decompressed_data.decode("utf-8") - except Exception: - raise ValueError("Unable to decode and/or decompress data.") + except Exception as e: + raise ValueError("Unable to decode and/or decompress data.") from e models.append(self._parse(data=decoded_data, model=model)) return models diff --git a/tests/unit/parser/_pydantic/test_kinesis.py b/tests/unit/parser/_pydantic/test_kinesis.py index d5136d32c84..9c3e365c5c5 100644 --- a/tests/unit/parser/_pydantic/test_kinesis.py +++ b/tests/unit/parser/_pydantic/test_kinesis.py @@ -126,8 +126,7 @@ def test_kinesis_stream_event_with_cloud_watch_logs_data_fails_using_envelope(): # WHEN parsing using KinesisDataStreamEvelope to CloudWatchLogsDecode # and the data is corrupted - raw_event["Records"][0]["kinesis"]["data"] = "123" - + raw_event["Records"][0]["kinesis"]["data"] = "eyJ4eXoiOiAiYWJjIn0KH4sIAK25JWgAA6tWqqisUrJSUEpMSlaq5QIAbdJPfw8AAAA=" # THEN a ValueError should be thrown with pytest.raises(ValueError): envelopes.KinesisDataStreamEnvelope().parse(raw_event, CloudWatchLogsDecode)