Skip to content

Commit d0b18ed

Browse files
feat(parser): add support to decompress Kinesis CloudWatch logs in Kinesis envelope (#6656)
* feat(parser): decompress data from KinesisDataStreamEnvelope if it contains compressed data * docs<parser>: correct comment about data decompression * refactor<parser>: simplify KinesisDataStreamEnvelope decompression and decoding workflow * style(parser): format new code using make format * test(parser): create test for negative test case of parsing CloudWatch Logs from Kinesis envelope * Changing data type --------- Co-authored-by: Leandro Damascena <[email protected]>
1 parent b3d59ab commit d0b18ed

File tree

2 files changed

+39
-1
lines changed

2 files changed

+39
-1
lines changed

aws_lambda_powertools/utilities/parser/envelopes/kinesis.py

+14-1
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
from __future__ import annotations
22

33
import logging
4+
import zlib
45
from typing import TYPE_CHECKING, Any, cast
56

67
from aws_lambda_powertools.utilities.parser.envelopes.base import BaseEnvelope
@@ -45,5 +46,17 @@ def parse(self, data: dict[str, Any] | Any | None, model: type[Model]) -> list[M
4546
for record in parsed_envelope.Records:
4647
# We allow either AWS expected contract (bytes) or a custom Model, see #943
4748
data = cast(bytes, record.kinesis.data)
48-
models.append(self._parse(data=data.decode("utf-8"), model=model))
49+
try:
50+
decoded_data = data.decode("utf-8")
51+
except UnicodeDecodeError as ude:
52+
try:
53+
logger.debug(
54+
f"{type(ude).__name__}: {str(ude)} encountered. "
55+
"Data will be decompressed with zlib.decompress().",
56+
)
57+
decompressed_data = zlib.decompress(data, zlib.MAX_WBITS | 32)
58+
decoded_data = decompressed_data.decode("utf-8")
59+
except Exception as e:
60+
raise ValueError("Unable to decode and/or decompress data.") from e
61+
models.append(self._parse(data=decoded_data, model=model))
4962
return models

tests/unit/parser/_pydantic/test_kinesis.py

+25
Original file line numberDiff line numberDiff line change
@@ -105,3 +105,28 @@ class DummyModel(BaseModel): ...
105105
for record in stream_data.Records:
106106
record.kinesis.data = DummyModel()
107107
record.decompress_zlib_record_data_as_json()
108+
109+
110+
def test_kinesis_stream_event_with_cloud_watch_logs_data_using_envelope():
111+
# GIVEN Kinesis Data Stream event with compressed data
112+
# such as CloudWatch Logs
113+
raw_event = load_event("kinesisStreamCloudWatchLogsEvent.json")
114+
115+
# WHEN parsing using KinesisDataStreamEvelope to CloudWatchLogsDecode
116+
logs = envelopes.KinesisDataStreamEnvelope().parse(raw_event, CloudWatchLogsDecode)
117+
118+
# THEN logs should be extracted as CloudWatchLogsDecode objects
119+
assert isinstance(logs[0], CloudWatchLogsDecode)
120+
121+
122+
def test_kinesis_stream_event_with_cloud_watch_logs_data_fails_using_envelope():
123+
# GIVEN Kinesis Data Stream event with corrupted compressed data
124+
# such as CloudWatch Logs
125+
raw_event = load_event("kinesisStreamCloudWatchLogsEvent.json")
126+
127+
# WHEN parsing using KinesisDataStreamEvelope to CloudWatchLogsDecode
128+
# and the data is corrupted
129+
raw_event["Records"][0]["kinesis"]["data"] = "eyJ4eXoiOiAiYWJjIn0KH4sIAK25JWgAA6tWqqisUrJSUEpMSlaq5QIAbdJPfw8AAAA="
130+
# THEN a ValueError should be thrown
131+
with pytest.raises(ValueError):
132+
envelopes.KinesisDataStreamEnvelope().parse(raw_event, CloudWatchLogsDecode)

0 commit comments

Comments
 (0)