Feature request: KinesisDataStreamEnvelope().parse() method seems to be missing data decompression step #6625

Closed
Artur-T-Malas opened this issue May 8, 2025 · 4 comments · Fixed by #6656
Labels
feature-request feature request

Comments

@Artur-T-Malas
Contributor

Artur-T-Malas commented May 8, 2025

Expected Behaviour

When provided with an event and a Pydantic model, the envelope's parse() method should correctly parse the data, including decompressing it (as far as I am aware, the data is gzip-compressed by CloudWatch Logs before it reaches Kinesis).

Current Behaviour

My Lambda function receives CloudWatch Logs via a Kinesis Data Stream. When using the event parser with KinesisDataStreamEnvelope, either as a decorator on lambda_handler or by explicitly calling the envelope's parse method, I keep getting UnicodeDecodeError: 'utf-8' codec can't decode byte 0x8b in position 1: invalid start byte.

After debugging, it appears that the envelope's parse method is missing a decompression step between casting the record data to bytes and decoding it using 'utf-8'.
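The error message itself points at gzip: every gzip stream begins with the two magic bytes 0x1f 0x8b, and 0x8b is a UTF-8 continuation byte that cannot start a character. The standalone sketch below (standard library only; the payload is a made-up placeholder) reproduces the same error without Kinesis or Powertools, by applying a plain .decode('utf-8') to gzip-compressed bytes:

```python
import gzip

# Placeholder payload; CloudWatch Logs subscription data is gzip-compressed JSON.
payload = b'{"messageType": "DATA_MESSAGE"}'
compressed = gzip.compress(payload)

# Every gzip stream starts with the magic bytes 0x1f 0x8b.
print(compressed[:2].hex())  # 1f8b

error_message = ""
try:
    # Mirrors the data.decode('utf-8') step described in this issue.
    compressed.decode("utf-8")
except UnicodeDecodeError as exc:
    # 0x1f is valid ASCII, but 0x8b is a continuation byte that cannot
    # start a UTF-8 character, so decoding fails at position 1.
    error_message = str(exc)

print(error_message)
# 'utf-8' codec can't decode byte 0x8b in position 1: invalid start byte
```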

Code snippet

from aws_lambda_powertools.utilities.parser import envelopes, event_parser
from aws_lambda_powertools.utilities.typing import LambdaContext
from pydantic import BaseModel, ConfigDict, Field
from pydantic.alias_generators import to_camel


# Pydantic models for CloudWatch Logs representation
class CloudWatchLogEvent(BaseModel):
    model_config = ConfigDict(alias_generator=to_camel)

    id: str
    timestamp: str | int
    message: str


class CloudWatchLog(BaseModel):
    model_config = ConfigDict(alias_generator=to_camel)

    message_type: str
    owner: str = Field(pattern=r"^\d{12}$")  # 12-digit AWS account ID
    log_group: str
    log_stream: str
    subscription_filters: list[str]  # payload key is "subscriptionFilters"
    log_events: list[CloudWatchLogEvent] = Field(min_length=1)


# Version 1 - not working
@event_parser(model=CloudWatchLog, envelope=envelopes.KinesisDataStreamEnvelope)
def lambda_handler(event: list[CloudWatchLog], context: LambdaContext) -> dict:
    ...


# Version 2 - also not working
def lambda_handler(event: dict, context: LambdaContext) -> dict:
    logs: list[CloudWatchLog] = envelopes.KinesisDataStreamEnvelope().parse(
        event, CloudWatchLog
    )
    ...

Possible Solution

When manually parsing Kinesis Data Stream input in a Lambda function and locally (using a saved event), the following code works:

import base64
import gzip

record_data: bytes = base64.b64decode(record['kinesis']['data'])  # base64 -> gzip bytes
decompressed_data: bytes = gzip.decompress(record_data)           # gzip -> UTF-8 bytes
decoded_data: str = decompressed_data.decode('utf-8')             # bytes -> str
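For local testing, the manual steps above can be wrapped into a function that walks every record in an event. This is a sketch using a hypothetical helper name, decode_cloudwatch_records, and a synthetic event built in place of a saved one; it is not part of the Powertools API:

```python
import base64
import gzip
import json


def decode_cloudwatch_records(event: dict) -> list[dict]:
    """Hypothetical helper: decode every Kinesis record carrying CloudWatch Logs data."""
    decoded = []
    for record in event["Records"]:
        record_data: bytes = base64.b64decode(record["kinesis"]["data"])  # base64 -> gzip bytes
        decompressed_data: bytes = gzip.decompress(record_data)           # gzip -> JSON bytes
        decoded.append(json.loads(decompressed_data.decode("utf-8")))     # bytes -> dict
    return decoded


# Synthetic event with one record; only the fields the helper reads are included.
payload = {
    "messageType": "DATA_MESSAGE",
    "logEvents": [{"id": "1", "timestamp": 1, "message": "hello"}],
}
encoded = base64.b64encode(gzip.compress(json.dumps(payload).encode("utf-8"))).decode("ascii")
event = {"Records": [{"kinesis": {"data": encoded}}]}

print(decode_cloudwatch_records(event)[0]["logEvents"][0]["message"])  # hello
```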

Fixing the envelope

To fix the envelope itself, importing gzip and adding the line
data = gzip.decompress(data)
to the utilities/parser/envelopes/kinesis.py file, between casting to bytes and decoding using 'utf-8', fixes the issue.

Since this could be an edge case, it should also be fine to wrap the models.append(self._parse(data=data.decode('utf-8')... line in a try/except block that catches UnicodeDecodeError, decompresses the data, and then tries decoding using 'utf-8' again.
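A minimal sketch of that try/except fallback, written as a standalone function rather than a patch to kinesis.py (the name decode_record_data is hypothetical):

```python
import gzip


def decode_record_data(data: bytes) -> str:
    """Decode Kinesis record bytes as UTF-8, decompressing gzip on failure (sketch)."""
    try:
        # Plain-text records decode directly.
        return data.decode("utf-8")
    except UnicodeDecodeError:
        # Fall back to gzip, e.g. CloudWatch Logs subscription data.
        # If the data is not gzip either, gzip.decompress raises gzip.BadGzipFile.
        return gzip.decompress(data).decode("utf-8")
```

Because byte 1 of every gzip stream is 0x8b, gzip data can never decode as UTF-8, so compressed records always reach the fallback. Data that is neither UTF-8 nor gzip surfaces as gzip.BadGzipFile (a subclass of OSError) rather than the original UnicodeDecodeError.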

Steps to Reproduce

  1. Have a Kinesis Data Stream moving CloudWatch Logs to a Lambda function (or have a saved event and use it locally)
  2. Use the CloudWatchLog Pydantic model provided in the code snippet
  3. Use KinesisDataStreamEnvelope with the event parser, either as a decorator on the Lambda handler or by explicitly calling the envelope's parse method
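For step 1 without a live stream, a synthetic event can stand in for a saved one. The fixture below is a sketch under stated assumptions: all identifiers, ARNs, and timestamps are placeholders, the payload follows the CloudWatch Logs subscription format (messageType, owner, logGroup, logStream, subscriptionFilters, logEvents), and the record data is gzip-compressed and then base64-encoded as it appears in the Lambda event. The function name build_kinesis_cloudwatch_event is hypothetical.

```python
import base64
import gzip
import json


def build_kinesis_cloudwatch_event(log_messages: list[str]) -> dict:
    """Hypothetical fixture: a Kinesis event whose single record carries CloudWatch Logs data."""
    payload = {
        "messageType": "DATA_MESSAGE",
        "owner": "123456789012",              # placeholder account ID
        "logGroup": "/aws/lambda/example",    # placeholder
        "logStream": "example-stream",        # placeholder
        "subscriptionFilters": ["example-filter"],
        "logEvents": [
            {"id": str(i), "timestamp": 1_700_000_000_000 + i, "message": msg}
            for i, msg in enumerate(log_messages)
        ],
    }
    # gzip-compress the JSON, then base64-encode it as the event's "data" field.
    data = base64.b64encode(gzip.compress(json.dumps(payload).encode("utf-8"))).decode("ascii")
    return {
        "Records": [
            {
                "kinesis": {
                    "kinesisSchemaVersion": "1.0",
                    "partitionKey": "1",
                    "sequenceNumber": "1",
                    "data": data,
                    "approximateArrivalTimestamp": 1700000000.0,
                },
                "eventSource": "aws:kinesis",
                "eventVersion": "1.0",
                "eventID": "shardId-000000000000:1",
                "eventName": "aws:kinesis:record",
                "invokeIdentityArn": "arn:aws:iam::123456789012:role/example",
                "awsRegion": "us-east-1",
                "eventSourceARN": "arn:aws:kinesis:us-east-1:123456789012:stream/example",
            }
        ]
    }


event = build_kinesis_cloudwatch_event(["hello from a saved event"])
```

Passing this event to either handler version should hit the same decoding step that fails in this issue.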

Thank you

Powertools for AWS Lambda (Python) version

3.9.0

AWS Lambda function runtime

3.12

Packaging format used

PyPi

Debugging logs

UnicodeDecodeError: 'utf-8' codec can't decode byte 0x8b in position 1: invalid start byte
@Artur-T-Malas Artur-T-Malas added bug Something isn't working triage Pending triage from maintainers labels May 8, 2025

boring-cyborg bot commented May 8, 2025

Thanks for opening your first issue here! We'll come back to you as soon as we can.
In the meantime, check out the #python channel on our Powertools for AWS Lambda Discord: Invite link

@leandrodamascena
Contributor

Hi @Artur-T-Malas, thanks for opening this issue and reporting this! While I recognize that ingesting CloudWatch Logs is one of the possible use cases for Kinesis, it is not the only one. If we assume that we always need to base64 decode + gzip decompress the data before returning it, we would break every use case where customers use Kinesis for something other than CloudWatch Logs, such as plain-text records.

That said, I agree that we should solve this problem and improve this experience, but we need a solution that handles both cases: gzipped and plain text.

If you have a solution, I'm more than happy to accept a PR; if not, I'll work on this next week.
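One way to handle both gzipped and plain-text records without relying on an exception is to check for the gzip magic bytes before decoding. This is a sketch of an alternative to the try/except approach discussed in this issue, not the code merged for it; the names GZIP_MAGIC and decode_maybe_gzipped are hypothetical:

```python
import gzip

GZIP_MAGIC = b"\x1f\x8b"  # first two bytes of every gzip stream (RFC 1952)


def decode_maybe_gzipped(data: bytes) -> str:
    """Decompress gzip-framed record data, otherwise treat it as plain UTF-8 (sketch)."""
    if data[:2] == GZIP_MAGIC:
        data = gzip.decompress(data)
    return data.decode("utf-8")
```

Valid UTF-8 text can never start with 0x1f 0x8b, so plain-text records are never mistaken for gzip. The trade-off against try/except is an explicit, cheap check in place of exception-driven control flow.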

Thanks

@leandrodamascena leandrodamascena changed the title Bug: KinesisDataStreamEnvelope().parse() method seems to be missing data decompression step Feature request: KinesisDataStreamEnvelope().parse() method seems to be missing data decompression step May 12, 2025
@leandrodamascena leandrodamascena added feature-request feature request and removed bug Something isn't working labels May 12, 2025
@leandrodamascena leandrodamascena removed the triage Pending triage from maintainers label May 12, 2025
@Artur-T-Malas
Contributor Author

Artur-T-Malas commented May 13, 2025

Hi @leandrodamascena. I've created PR #6656. To be honest, it's my first PR to a project this big, so I'd greatly appreciate any tips on what could be better.

I hope that my solution, wrapping the decoding in a try/except block and decompressing only after encountering a UnicodeDecodeError, is acceptable.

Contributor

⚠️COMMENT VISIBILITY WARNING⚠️

This issue is now closed. Please be mindful that future comments are hard for our team to see.

If you need more assistance, please either tag a team member or open a new issue that references this one.

If you wish to keep having a conversation with other community members under this issue feel free to do so.

Projects
Status: Coming soon
2 participants