diff --git a/aws_lambda_powertools/utilities/parser/envelopes/kinesis.py b/aws_lambda_powertools/utilities/parser/envelopes/kinesis.py
index a4d484931df..41527e03930 100644
--- a/aws_lambda_powertools/utilities/parser/envelopes/kinesis.py
+++ b/aws_lambda_powertools/utilities/parser/envelopes/kinesis.py
@@ -1,6 +1,7 @@
 from __future__ import annotations
 
 import logging
+import zlib
 from typing import TYPE_CHECKING, Any, cast
 
 from aws_lambda_powertools.utilities.parser.envelopes.base import BaseEnvelope
@@ -45,5 +46,17 @@ def parse(self, data: dict[str, Any] | Any | None, model: type[Model]) -> list[M
         for record in parsed_envelope.Records:
             # We allow either AWS expected contract (bytes) or a custom Model, see #943
             data = cast(bytes, record.kinesis.data)
-            models.append(self._parse(data=data.decode("utf-8"), model=model))
+            try:
+                decoded_data = data.decode("utf-8")
+            except UnicodeDecodeError as ude:
+                try:
+                    logger.debug(
+                        f"{type(ude).__name__}: {str(ude)} encountered. "
+                        "Data will be decompressed with zlib.decompress().",
+                    )
+                    decompressed_data = zlib.decompress(data, zlib.MAX_WBITS | 32)
+                    decoded_data = decompressed_data.decode("utf-8")
+                except Exception as e:
+                    raise ValueError("Unable to decode and/or decompress data.") from e
+            models.append(self._parse(data=decoded_data, model=model))
         return models
diff --git a/tests/unit/parser/_pydantic/test_kinesis.py b/tests/unit/parser/_pydantic/test_kinesis.py
index 9da19ed3e0b..9c3e365c5c5 100644
--- a/tests/unit/parser/_pydantic/test_kinesis.py
+++ b/tests/unit/parser/_pydantic/test_kinesis.py
@@ -105,3 +105,28 @@ class DummyModel(BaseModel): ...
     for record in stream_data.Records:
         record.kinesis.data = DummyModel()
         record.decompress_zlib_record_data_as_json()
+
+
+def test_kinesis_stream_event_with_cloud_watch_logs_data_using_envelope():
+    # GIVEN a Kinesis Data Stream event with compressed data,
+    # such as CloudWatch Logs
+    raw_event = load_event("kinesisStreamCloudWatchLogsEvent.json")
+
+    # WHEN parsing using KinesisDataStreamEnvelope to CloudWatchLogsDecode
+    logs = envelopes.KinesisDataStreamEnvelope().parse(raw_event, CloudWatchLogsDecode)
+
+    # THEN logs should be extracted as CloudWatchLogsDecode objects
+    assert isinstance(logs[0], CloudWatchLogsDecode)
+
+
+def test_kinesis_stream_event_with_cloud_watch_logs_data_fails_using_envelope():
+    # GIVEN a Kinesis Data Stream event with corrupted compressed data,
+    # such as CloudWatch Logs
+    raw_event = load_event("kinesisStreamCloudWatchLogsEvent.json")
+
+    # WHEN parsing using KinesisDataStreamEnvelope to CloudWatchLogsDecode
+    # and the data is corrupted
+    raw_event["Records"][0]["kinesis"]["data"] = "eyJ4eXoiOiAiYWJjIn0KH4sIAK25JWgAA6tWqqisUrJSUEpMSlaq5QIAbdJPfw8AAAA="
+    # THEN a ValueError should be raised
+    with pytest.raises(ValueError):
+        envelopes.KinesisDataStreamEnvelope().parse(raw_event, CloudWatchLogsDecode)
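
For reviewers, a minimal standalone sketch of the decode-or-decompress fallback this change introduces. The helper name decode_kinesis_data and the sample payload are illustrative only, not part of the library; in the actual envelope, the Pydantic model has already base64-decoded record.kinesis.data to bytes before this logic runs.

    import base64
    import gzip
    import json
    import zlib


    def decode_kinesis_data(data: bytes) -> str:
        """Decode a Kinesis record payload, falling back to zlib/gzip decompression."""
        try:
            return data.decode("utf-8")
        except UnicodeDecodeError:
            # wbits=MAX_WBITS | 32 lets zlib auto-detect a gzip or zlib header
            return zlib.decompress(data, zlib.MAX_WBITS | 32).decode("utf-8")


    # CloudWatch Logs subscription data reaches Kinesis gzip-compressed and base64-encoded
    compressed = gzip.compress(json.dumps({"messageType": "DATA_MESSAGE"}).encode("utf-8"))
    record_data = base64.b64encode(compressed)  # shape of the "data" field in the raw event

    decoded = decode_kinesis_data(base64.b64decode(record_data))
    assert json.loads(decoded)["messageType"] == "DATA_MESSAGE"

Passing zlib.MAX_WBITS | 32 keeps the fallback format-agnostic: the same call handles both gzip (as produced by CloudWatch Logs subscriptions) and raw zlib streams, so the envelope does not need to know which producer wrote the record.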