feat(parser): add support to decompress Kinesis CloudWatch logs in Kinesis envelope #6656

Merged (9 commits) on May 15, 2025

Contributor

@Artur-T-Malas Artur-T-Malas commented May 13, 2025

Issue number: #6625

Summary

This feature allows parsing of CloudWatch Logs data using a KinesisDataStreamEnvelope.

Changes

In utilities/parser/envelopes/kinesis.py: extracted the data.decode("utf-8") call from the line models.append(self._parse(data=data.decode("utf-8"), model=model)) and wrapped it in a try/except block that catches UnicodeDecodeError. This error indicates that the data might be compressed, so the except block contains a nested try/except that decompresses the data before decoding it.

# We allow either AWS expected contract (bytes) or a custom Model, see #943
data = cast(bytes, record.kinesis.data)
try:
    decoded_data = data.decode("utf-8")
# If the Data Stream contains compressed data, e.g. CloudWatch Logs,
# the `decode` method will throw a UnicodeDecodeError,
# which signals that decompression might be required
except UnicodeDecodeError as ude:
    try:
        logger.debug(f"{type(ude).__name__}: {str(ude)} encountered. "
                     "Data will be decompressed with zlib.decompress().")
        decompressed_data = zlib.decompress(data, zlib.MAX_WBITS | 32)
        decoded_data = decompressed_data.decode("utf-8")
    except Exception:
        raise ValueError("Unable to decode and/or decompress data.")
models.append(self._parse(data=decoded_data, model=model))
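
For readers who want to try the fallback outside Powertools, here is a minimal standalone sketch of the same idea using only the standard library. The function name decode_kinesis_payload and the sample payload are hypothetical and are not part of this PR. It relies on two facts: a gzip stream starts with the bytes 0x1f 0x8b, which is never valid UTF-8, and passing zlib.MAX_WBITS | 32 as wbits makes zlib.decompress auto-detect a gzip or zlib header.

```python
import gzip
import zlib


def decode_kinesis_payload(data: bytes) -> str:
    """Decode bytes as UTF-8, falling back to gzip/zlib decompression.

    Hypothetical helper for illustration; it mirrors the idea in the PR
    but is not the Powertools implementation.
    """
    try:
        return data.decode("utf-8")
    except UnicodeDecodeError:
        try:
            # MAX_WBITS | 32 enables automatic gzip/zlib header detection.
            return zlib.decompress(data, zlib.MAX_WBITS | 32).decode("utf-8")
        except (zlib.error, UnicodeDecodeError) as exc:
            raise ValueError("Unable to decode and/or decompress data.") from exc


plain = b'{"message": "hello"}'
compressed = gzip.compress(plain)

# Both inputs yield the same decoded string.
print(decode_kinesis_payload(plain))
print(decode_kinesis_payload(compressed))
```

Unlike the snippet above, this sketch catches only zlib.error and UnicodeDecodeError and chains the original exception with "from exc"; that is a choice made for the example, not something the PR does.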

Added a test in the tests/unit/parser/_pydantic/test_kinesis.py file:

def test_kinesis_stream_event_with_cloudwatch_logs_data_using_envelope():
    # GIVEN Kinesis Data Stream event with compressed CloudWatch Logs
    raw_event = load_event("kinesisStreamCloudWatchLogsEvent.json")

    # WHEN parsing using KinesisDataStreamEnvelope
    logs = envelopes.KinesisDataStreamEnvelope().parse(
        raw_event, CloudWatchLogsDecode
    )

    # THEN logs should be extracted as CloudWatchLogsDecode objects
    assert isinstance(logs[0], CloudWatchLogsDecode)

User experience

Previously, as reported in issue #6648, parsing an event with KinesisDataStreamEnvelope to extract compressed CloudWatch Logs data raised a UnicodeDecodeError.

This change enables that workflow.
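
To make the workflow concrete, the sketch below shows how a CloudWatch Logs subscription payload typically ends up inside a Kinesis record: the JSON document is gzip-compressed and the record's data field carries it base64-encoded. It uses only the standard library. The helper names, account ID, log group, and other values are made up for illustration, and the record contains only the data field rather than a full Kinesis event.

```python
import base64
import gzip
import json


def build_kinesis_record(log_messages):
    """Build a minimal Kinesis-style record carrying gzipped CloudWatch Logs.

    All identifiers and values here are hypothetical placeholders.
    """
    payload = {
        "messageType": "DATA_MESSAGE",
        "owner": "123456789012",
        "logGroup": "/hypothetical/log-group",
        "logStream": "hypothetical-stream",
        "subscriptionFilters": ["hypothetical-filter"],
        "logEvents": [
            {"id": str(i), "timestamp": 1_700_000_000_000 + i, "message": msg}
            for i, msg in enumerate(log_messages)
        ],
    }
    compressed = gzip.compress(json.dumps(payload).encode("utf-8"))
    return {"kinesis": {"data": base64.b64encode(compressed).decode("ascii")}}


def read_log_messages(record):
    """Reverse the steps: base64-decode, gunzip, parse JSON."""
    compressed = base64.b64decode(record["kinesis"]["data"])
    payload = json.loads(gzip.decompress(compressed))
    return [event["message"] for event in payload["logEvents"]]


record = build_kinesis_record(["first line", "second line"])
print(read_log_messages(record))
```

Decoding the base64 data directly as UTF-8, without the gunzip step, is what fails with UnicodeDecodeError.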

Checklist

If your change doesn't seem to apply, please leave them unchecked.

Is this a breaking change?

RFC issue number:

Checklist:

  • Migration process documented
  • Implement warnings (if it can live side by side)

Acknowledgment

By submitting this pull request, I confirm that you can use, modify, copy, and redistribute this contribution, under the terms of your choice.

Disclaimer: We value your time and bandwidth. As such, any pull requests created on non-triaged issues might not be successful.

@Artur-T-Malas Artur-T-Malas requested a review from a team as a code owner May 13, 2025 09:31
@Artur-T-Malas Artur-T-Malas requested a review from anafalcao May 13, 2025 09:31
@boring-cyborg boring-cyborg bot added the tests label May 13, 2025
@pull-request-size pull-request-size bot added the size/M Denotes a PR that changes 30-99 lines, ignoring generated files. label May 13, 2025

boring-cyborg bot commented May 13, 2025

Thanks a lot for your first contribution! Please check out our contributing guidelines and don't hesitate to ask whatever you need.
In the meantime, check out the #python channel on our Powertools for AWS Lambda Discord: Invite link

@leandrodamascena leandrodamascena changed the title feat(parser): decompress data from KinesisDataStreamEnvelope if it contains compressed data eg. CloudWatch Logs feat(parser): add support to decompress Kinesis CloudWatch compressed logs May 14, 2025
Contributor

@leandrodamascena leandrodamascena left a comment

Hi @Artur-T-Malas, thanks for working on this PR! Overall it looks good to me, but our CI is failing with a linting error. Can you run make format and commit again? I tried to do that myself, but you haven't given me access to modify your branch.

@github-actions github-actions bot added the feature New feature or functionality label May 14, 2025
@leandrodamascena leandrodamascena changed the title feat(parser): add support to decompress Kinesis CloudWatch compressed logs feat(parser): add support to decompress Kinesis CloudWatch logs May 14, 2025
@leandrodamascena leandrodamascena changed the title feat(parser): add support to decompress Kinesis CloudWatch logs feat(parser): add support to decompress Kinesis CloudWatch logs in Kinesis envelope May 14, 2025
@Artur-T-Malas
Contributor Author

Hi @leandrodamascena, thank you. I ran make format and committed the changes.

codecov bot commented May 15, 2025

Codecov Report

All modified and coverable lines are covered by tests ✅

Project coverage is 96.11%. Comparing base (98284c8) to head (2b25e46).
Report is 4 commits behind head on develop.

Additional details and impacted files
@@           Coverage Diff            @@
##           develop    #6656   +/-   ##
========================================
  Coverage    96.11%   96.11%           
========================================
  Files          253      253           
  Lines        12160    12170   +10     
  Branches       904      904           
========================================
+ Hits         11687    11697   +10     
  Misses         371      371           
  Partials       102      102           

Contributor

@leandrodamascena leandrodamascena left a comment

Hi @Artur-T-Malas, we are almost there! A test is missing for the case where the customer uses data that is not binary and not Gzip. You can check the missing lines here: https://app.codecov.io/gh/aws-powertools/powertools-lambda-python/pull/6656?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=aws-powertools

I could push a commit to fix this and merge this PR, but you haven't given access to your branch, so please fix this before we merge.

@leandrodamascena
Contributor

Hi @Artur-T-Malas, I saw your invite to collaborate on your repository, but we don't actually need that. Just give me permission on your branch: https://docs.github.com/en/pull-requests/collaborating-with-pull-requests/working-with-forks/allowing-changes-to-a-pull-request-branch-created-from-a-fork

Thanks

@Artur-T-Malas
Contributor Author

Hi @leandrodamascena! I've read the support article which you've linked, and the "Allow edits by maintainers" option seems to be already enabled. Are there any additional steps to it which I'm missing? My apologies, I'm very new to collaborating on GitHub.

About the missing test coverage on lines 62 and 63

except Exception:
    raise ValueError("Unable to decode and/or decompress data.")

I believe that I have covered them in this test, which modifies the data of the record to a plain string of 123 instead of bytes or compressed bytes:

def test_kinesis_stream_event_with_cloud_watch_logs_data_fails_using_envelope():
    # GIVEN Kinesis Data Stream event with corrupted compressed data
    # such as CloudWatch Logs
    raw_event = load_event("kinesisStreamCloudWatchLogsEvent.json")

    # WHEN parsing using KinesisDataStreamEnvelope to CloudWatchLogsDecode
    # and the data is corrupted
    raw_event["Records"][0]["kinesis"]["data"] = "123"

    # THEN a ValueError should be thrown
    with pytest.raises(ValueError):
        envelopes.KinesisDataStreamEnvelope().parse(raw_event, CloudWatchLogsDecode)

Please correct me if I'm wrong or I have misunderstood your comment. Thank you!

@leandrodamascena
Contributor

Hi @leandrodamascena! I've read the support article which you've linked, and the "Allow edits by maintainers" option seems to be already enabled. Are there any additional steps to it which I'm missing? My apologies, I'm very new to collaborating on GitHub.

Hi, it's working now and no need to apologize, we love it when people come together to contribute to Powertools. Feel free to pick any issue to solve, I'll be happy to help.

About the missing test coverage on lines 62 and 63

except Exception:
    raise ValueError("Unable to decode and/or decompress data.")

I believe that I have covered them in this test, which modifies the data of the record to a plain string of 123 instead of bytes or compressed bytes:

def test_kinesis_stream_event_with_cloud_watch_logs_data_fails_using_envelope():
    # GIVEN Kinesis Data Stream event with corrupted compressed data
    # such as CloudWatch Logs
    raw_event = load_event("kinesisStreamCloudWatchLogsEvent.json")

    # WHEN parsing using KinesisDataStreamEnvelope to CloudWatchLogsDecode
    # and the data is corrupted
    raw_event["Records"][0]["kinesis"]["data"] = "123"

    # THEN a ValueError should be thrown
    with pytest.raises(ValueError):
        envelopes.KinesisDataStreamEnvelope().parse(raw_event, CloudWatchLogsDecode)

Please correct me if I'm wrong or I have misunderstood your comment. Thank you!

Actually, you are right: this test is meant to cover this scenario. The problem was that you were sending plain-text data in the record's data field, while the parser expects it to be base64-encoded, so parsing failed before it reached that part of the code. I pushed a commit to fix it.

Thanks a lot for working on this.
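
The difference between the two failure points can be reproduced with the standard library alone. The sketch below is only an illustration of the order of the stages; the function first_failing_stage is hypothetical, and how Powertools itself reports a base64 failure is not shown here. One detail worth knowing: in plain Python, binascii.Error is a subclass of ValueError, so a check for ValueError alone cannot tell which stage failed.

```python
import base64
import binascii
import zlib


def first_failing_stage(data_field: str) -> str:
    """Report which stage rejects the input: 'base64', 'decompress', or 'none'.

    Hypothetical helper for illustration only.
    """
    try:
        raw = base64.b64decode(data_field, validate=True)
    except binascii.Error:
        return "base64"

    try:
        raw.decode("utf-8")
        return "none"
    except UnicodeDecodeError:
        pass  # not plain text, so try decompression next

    try:
        zlib.decompress(raw, zlib.MAX_WBITS | 32).decode("utf-8")
        return "none"
    except (zlib.error, UnicodeDecodeError):
        return "decompress"


# "123" is not valid base64 (bad padding), so it never reaches decompression.
print(first_failing_stage("123"))

# Base64-encoded bytes that are neither UTF-8 nor gzip reach the fallback.
corrupted = base64.b64encode(b"\xff\xfe not gzip").decode("ascii")
print(first_failing_stage(corrupted))
```

A test for the decompression error path therefore needs corrupted bytes that are themselves base64-encoded, as in the second call.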

@Artur-T-Malas
Contributor Author

I'm glad that I could contribute to this amazing tool!

Thank you for explaining the test issue to me and fixing it. Indeed the base64 encoding must've escaped me.

@leandrodamascena leandrodamascena self-requested a review May 15, 2025 10:24
Contributor

@leandrodamascena leandrodamascena left a comment

Approved!!!

@leandrodamascena leandrodamascena merged commit d0b18ed into aws-powertools:develop May 15, 2025
12 checks passed

boring-cyborg bot commented May 15, 2025

Awesome work, congrats on your first merged pull request and thank you for helping improve everyone's experience!

Labels
feature New feature or functionality size/M Denotes a PR that changes 30-99 lines, ignoring generated files. tests
Development

Successfully merging this pull request may close these issues.

Feature request: KinesisDataStreamEnvelope().parse() method seems to be missing data decompression step
3 participants