Skip to content

feat(parser): add support to decompress Kinesis CloudWatch logs in Kinesis envelope #6656

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 9 commits into from
May 15, 2025
15 changes: 14 additions & 1 deletion aws_lambda_powertools/utilities/parser/envelopes/kinesis.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
from __future__ import annotations

import logging
import zlib
from typing import TYPE_CHECKING, Any, cast

from aws_lambda_powertools.utilities.parser.envelopes.base import BaseEnvelope
Expand Down Expand Up @@ -45,5 +46,17 @@ def parse(self, data: dict[str, Any] | Any | None, model: type[Model]) -> list[M
for record in parsed_envelope.Records:
# We allow either AWS expected contract (bytes) or a custom Model, see #943
data = cast(bytes, record.kinesis.data)
models.append(self._parse(data=data.decode("utf-8"), model=model))
try:
decoded_data = data.decode("utf-8")
except UnicodeDecodeError as ude:
try:
logger.debug(
f"{type(ude).__name__}: {str(ude)} encountered. "
"Data will be decompressed with zlib.decompress().",
)
decompressed_data = zlib.decompress(data, zlib.MAX_WBITS | 32)
decoded_data = decompressed_data.decode("utf-8")
except Exception as e:
raise ValueError("Unable to decode and/or decompress data.") from e
models.append(self._parse(data=decoded_data, model=model))
return models
25 changes: 25 additions & 0 deletions tests/unit/parser/_pydantic/test_kinesis.py
Original file line number Diff line number Diff line change
Expand Up @@ -105,3 +105,28 @@ class DummyModel(BaseModel): ...
for record in stream_data.Records:
record.kinesis.data = DummyModel()
record.decompress_zlib_record_data_as_json()


def test_kinesis_stream_event_with_cloud_watch_logs_data_using_envelope():
    """Parse a Kinesis event carrying compressed CloudWatch Logs data via the envelope."""
    # GIVEN a Kinesis Data Stream event whose record data is compressed,
    # as is the case for CloudWatch Logs
    event = load_event("kinesisStreamCloudWatchLogsEvent.json")
    envelope = envelopes.KinesisDataStreamEnvelope()

    # WHEN the event is parsed with KinesisDataStreamEnvelope into CloudWatchLogsDecode
    parsed_logs = envelope.parse(event, CloudWatchLogsDecode)

    # THEN the first extracted entry is a CloudWatchLogsDecode object
    first_log = parsed_logs[0]
    assert isinstance(first_log, CloudWatchLogsDecode)


def test_kinesis_stream_event_with_cloud_watch_logs_data_fails_using_envelope():
    """Parsing a Kinesis event with corrupted compressed data raises ValueError."""
    # GIVEN a Kinesis Data Stream event (CloudWatch Logs style) ...
    event = load_event("kinesisStreamCloudWatchLogsEvent.json")

    # ... whose first record's data has been swapped for a corrupted payload
    corrupted_payload = "eyJ4eXoiOiAiYWJjIn0KH4sIAK25JWgAA6tWqqisUrJSUEpMSlaq5QIAbdJPfw8AAAA="
    first_record = event["Records"][0]
    first_record["kinesis"]["data"] = corrupted_payload

    # WHEN the event is parsed with KinesisDataStreamEnvelope into CloudWatchLogsDecode
    # THEN a ValueError is raised
    with pytest.raises(ValueError):
        envelopes.KinesisDataStreamEnvelope().parse(event, CloudWatchLogsDecode)
Loading