
Commit a0f115c

feat(batch): add support to SQS FIFO queues (SqsFifoPartialProcessor) (#1934)

Co-authored-by: Heitor Lessa <[email protected]>
1 parent: 59de1ec

8 files changed (+231 -20 lines)


aws_lambda_powertools/utilities/batch/__init__.py

Lines changed: 6 additions & 0 deletions
```diff
@@ -16,16 +16,22 @@
     batch_processor,
 )
 from aws_lambda_powertools.utilities.batch.exceptions import ExceptionInfo
+from aws_lambda_powertools.utilities.batch.sqs_fifo_partial_processor import (
+    SqsFifoPartialProcessor,
+)
+from aws_lambda_powertools.utilities.batch.types import BatchTypeModels

 __all__ = (
     "BatchProcessor",
     "AsyncBatchProcessor",
     "BasePartialProcessor",
     "BasePartialBatchProcessor",
+    "BatchTypeModels",
     "ExceptionInfo",
     "EventType",
     "FailureResponse",
     "SuccessResponse",
+    "SqsFifoPartialProcessor",
     "batch_processor",
     "async_batch_processor",
 )
```
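The re-exports above define the new public surface of this change; a minimal import check (illustrative, not part of the commit):

```python
# Both new names now resolve from the batch package root.
from aws_lambda_powertools.utilities.batch import (
    BatchTypeModels,
    SqsFifoPartialProcessor,
)

processor = SqsFifoPartialProcessor()  # defaults to EventType.SQS, no model
```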

aws_lambda_powertools/utilities/batch/base.py

Lines changed: 1 addition & 19 deletions
```diff
@@ -19,7 +19,6 @@
     List,
     Optional,
     Tuple,
-    Type,
     Union,
     overload,
 )
@@ -30,6 +29,7 @@
     BatchProcessingError,
     ExceptionInfo,
 )
+from aws_lambda_powertools.utilities.batch.types import BatchTypeModels
 from aws_lambda_powertools.utilities.data_classes.dynamo_db_stream_event import (
     DynamoDBRecord,
 )
@@ -48,24 +48,6 @@ class EventType(Enum):
     DynamoDBStreams = "DynamoDBStreams"


-#
-# type specifics
-#
-has_pydantic = "pydantic" in sys.modules
-
-# For IntelliSense and Mypy to work, we need to account for possible SQS, Kinesis and DynamoDB subclasses
-# We need them as subclasses as we must access their message ID or sequence number metadata via dot notation
-if has_pydantic:
-    from aws_lambda_powertools.utilities.parser.models import DynamoDBStreamRecordModel
-    from aws_lambda_powertools.utilities.parser.models import (
-        KinesisDataStreamRecord as KinesisDataStreamRecordModel,
-    )
-    from aws_lambda_powertools.utilities.parser.models import SqsRecordModel
-
-BatchTypeModels = Optional[
-    Union[Type[SqsRecordModel], Type[DynamoDBStreamRecordModel], Type[KinesisDataStreamRecordModel]]
-]
-
 # When using processor with default arguments, records will carry EventSourceDataClassTypes
 # and depending on what EventType it's passed it'll correctly map to the right record
 # When using Pydantic Models, it'll accept any subclass from SQS, DynamoDB and Kinesis
```
aws_lambda_powertools/utilities/batch/sqs_fifo_partial_processor.py (new file)

Lines changed: 92 additions & 0 deletions

````python
from typing import List, Optional, Tuple

from aws_lambda_powertools.utilities.batch import BatchProcessor, EventType
from aws_lambda_powertools.utilities.batch.types import BatchSqsTypeModel


class SQSFifoCircuitBreakerError(Exception):
    """
    Signals a record not processed due to the SQS FIFO processing being interrupted
    """

    pass


class SqsFifoPartialProcessor(BatchProcessor):
    """Process native partial responses from SQS FIFO queues.

    Stops processing records when the first record fails. The remaining records are reported as failed items.

    Example
    _______

    ## Process batch triggered by a FIFO SQS

    ```python
    import json

    from aws_lambda_powertools import Logger, Tracer
    from aws_lambda_powertools.utilities.batch import SqsFifoPartialProcessor, EventType, batch_processor
    from aws_lambda_powertools.utilities.data_classes.sqs_event import SQSRecord
    from aws_lambda_powertools.utilities.typing import LambdaContext


    processor = SqsFifoPartialProcessor()
    tracer = Tracer()
    logger = Logger()


    @tracer.capture_method
    def record_handler(record: SQSRecord):
        payload: str = record.body
        if payload:
            item: dict = json.loads(payload)
        ...

    @logger.inject_lambda_context
    @tracer.capture_lambda_handler
    @batch_processor(record_handler=record_handler, processor=processor)
    def lambda_handler(event, context: LambdaContext):
        return processor.response()
    ```
    """

    circuit_breaker_exc = (
        SQSFifoCircuitBreakerError,
        SQSFifoCircuitBreakerError("A previous record failed processing"),
        None,
    )

    def __init__(self, model: Optional["BatchSqsTypeModel"] = None):
        super().__init__(EventType.SQS, model)

    def process(self) -> List[Tuple]:
        """
        Call instance's handler for each record. When the first failed message is detected,
        the process is short-circuited, and the remaining messages are reported as failed items.
        """
        result: List[Tuple] = []

        for i, record in enumerate(self.records):
            # If we have failed messages, it means that the last message failed.
            # We then short circuit the process, failing the remaining messages
            if self.fail_messages:
                return self._short_circuit_processing(i, result)

            # Otherwise, process the message normally
            result.append(self._process_record(record))

        return result

    def _short_circuit_processing(self, first_failure_index: int, result: List[Tuple]) -> List[Tuple]:
        """
        Starting from the first failure index, fail all the remaining messages, and append them to the result list.
        """
        remaining_records = self.records[first_failure_index:]
        for remaining_record in remaining_records:
            data = self._to_batch_type(record=remaining_record, event_type=self.event_type, model=self.model)
            result.append(self.failure_handler(record=data, exception=self.circuit_breaker_exc))
        return result

    async def _async_process_record(self, record: dict):
        raise NotImplementedError()
````
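Note the shape of `circuit_breaker_exc`: it appears to mirror the `(type, value, traceback)` triple of `sys.exc_info()` that `failure_handler` accepts as its `ExceptionInfo` argument, pre-built once since no exception is actually raised for short-circuited records. A minimal sketch of that equivalence (illustrative, not part of the commit):

```python
import sys

# Raising and catching produces the canonical exc_info triple...
try:
    raise ValueError("boom")
except ValueError:
    raised_exc_info = sys.exc_info()  # (ValueError, ValueError("boom"), <traceback>)

# ...while a pre-built triple has the same shape, with the traceback
# slot left as None because nothing was ever raised.
prebuilt_exc_info = (ValueError, ValueError("boom"), None)

assert raised_exc_info[0] is prebuilt_exc_info[0]
assert str(raised_exc_info[1]) == str(prebuilt_exc_info[1])
```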
aws_lambda_powertools/utilities/batch/types.py (new file)

Lines changed: 24 additions & 0 deletions

```python
#
# type specifics
#
import sys
from typing import Optional, Type, Union

has_pydantic = "pydantic" in sys.modules

# For IntelliSense and Mypy to work, we need to account for possible SQS subclasses
# We need them as subclasses as we must access their message ID or sequence number metadata via dot notation
if has_pydantic:
    from aws_lambda_powertools.utilities.parser.models import DynamoDBStreamRecordModel
    from aws_lambda_powertools.utilities.parser.models import (
        KinesisDataStreamRecord as KinesisDataStreamRecordModel,
    )
    from aws_lambda_powertools.utilities.parser.models import SqsRecordModel

    BatchTypeModels = Optional[
        Union[Type[SqsRecordModel], Type[DynamoDBStreamRecordModel], Type[KinesisDataStreamRecordModel]]
    ]
    BatchSqsTypeModel = Optional[Type[SqsRecordModel]]
else:
    BatchTypeModels = "BatchTypeModels"  # type: ignore
    BatchSqsTypeModel = "BatchSqsTypeModel"  # type: ignore
```

docs/utilities/batch.md

Lines changed: 17 additions & 0 deletions
````diff
@@ -347,6 +347,23 @@ Processing batches from SQS works in four stages:
     }
     ```

+#### FIFO queues
+
+When using [SQS FIFO queues](https://docs.aws.amazon.com/AWSSimpleQueueService/latest/SQSDeveloperGuide/FIFO-queues.html){target="_blank"}, we will stop processing messages after the first failure, and return all failed and unprocessed messages in `batchItemFailures`.
+This helps preserve the ordering of messages in your queue.
+
+=== "As a decorator"
+
+    ```python hl_lines="5 11"
+    --8<-- "examples/batch_processing/src/sqs_fifo_batch_processor.py"
+    ```
+
+=== "As a context manager"
+
+    ```python hl_lines="4 8"
+    --8<-- "examples/batch_processing/src/sqs_fifo_batch_processor_context_manager.py"
+    ```
+
 ### Processing messages from Kinesis

 Processing batches from Kinesis works in four stages:
````
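For reference, the partial response returned by `processor.response()` follows Lambda's report-batch-item-failures contract, as exercised by the tests below; with a batch of [success, fail, success] the FIFO processor reports the failed record and every record after it. An illustrative shape (identifier values are placeholders):

```python
# Expected response for a FIFO batch of [success, fail, success]:
# the failure plus every subsequent record is reported back to SQS.
response = {
    "batchItemFailures": [
        {"itemIdentifier": "<messageId of the failed record>"},
        {"itemIdentifier": "<messageId of the short-circuited record>"},
    ]
}
```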
examples/batch_processing/src/sqs_fifo_batch_processor.py (new file)

Lines changed: 23 additions & 0 deletions

```python
from aws_lambda_powertools import Logger, Tracer
from aws_lambda_powertools.utilities.batch import (
    SqsFifoPartialProcessor,
    batch_processor,
)
from aws_lambda_powertools.utilities.data_classes.sqs_event import SQSRecord
from aws_lambda_powertools.utilities.typing import LambdaContext

processor = SqsFifoPartialProcessor()
tracer = Tracer()
logger = Logger()


@tracer.capture_method
def record_handler(record: SQSRecord):
    ...


@logger.inject_lambda_context
@tracer.capture_lambda_handler
@batch_processor(record_handler=record_handler, processor=processor)
def lambda_handler(event, context: LambdaContext):
    return processor.response()
```
examples/batch_processing/src/sqs_fifo_batch_processor_context_manager.py (new file)

Lines changed: 23 additions & 0 deletions

```python
from aws_lambda_powertools import Logger, Tracer
from aws_lambda_powertools.utilities.batch import SqsFifoPartialProcessor
from aws_lambda_powertools.utilities.data_classes.sqs_event import SQSRecord
from aws_lambda_powertools.utilities.typing import LambdaContext

processor = SqsFifoPartialProcessor()
tracer = Tracer()
logger = Logger()


@tracer.capture_method
def record_handler(record: SQSRecord):
    ...


@logger.inject_lambda_context
@tracer.capture_lambda_handler
def lambda_handler(event, context: LambdaContext):
    batch = event["Records"]
    with processor(records=batch, handler=record_handler):
        processor.process()  # kick off processing, return List[Tuple]

    return processor.response()
```

tests/functional/test_utilities_batch.py

Lines changed: 45 additions & 1 deletion
```diff
@@ -1,4 +1,5 @@
 import json
+import uuid
 from random import randint
 from typing import Any, Awaitable, Callable, Dict, Optional

@@ -9,6 +10,7 @@
     AsyncBatchProcessor,
     BatchProcessor,
     EventType,
+    SqsFifoPartialProcessor,
     async_batch_processor,
     batch_processor,
 )
@@ -40,7 +42,7 @@
 def sqs_event_factory() -> Callable:
     def factory(body: str):
         return {
-            "messageId": "059f36b4-87a3-44ab-83d2-661975830a7d",
+            "messageId": f"{uuid.uuid4()}",
             "receiptHandle": "AQEBwJnKyrHigUMZj6rYigCgxlaS3SLy0a",
             "body": body,
             "attributes": {
@@ -654,6 +656,48 @@ def lambda_handler(event, context):
     assert "All records failed processing. " in str(e.value)


+def test_sqs_fifo_batch_processor_middleware_success_only(sqs_event_factory, record_handler):
+    # GIVEN
+    first_record = SQSRecord(sqs_event_factory("success"))
+    second_record = SQSRecord(sqs_event_factory("success"))
+    event = {"Records": [first_record.raw_event, second_record.raw_event]}
+
+    processor = SqsFifoPartialProcessor()
+
+    @batch_processor(record_handler=record_handler, processor=processor)
+    def lambda_handler(event, context):
+        return processor.response()
+
+    # WHEN
+    result = lambda_handler(event, {})
+
+    # THEN
+    assert result["batchItemFailures"] == []
+
+
+def test_sqs_fifo_batch_processor_middleware_with_failure(sqs_event_factory, record_handler):
+    # GIVEN
+    first_record = SQSRecord(sqs_event_factory("success"))
+    second_record = SQSRecord(sqs_event_factory("fail"))
+    # this would normally succeed, but since it's a FIFO queue, it will be marked as failure
+    third_record = SQSRecord(sqs_event_factory("success"))
+    event = {"Records": [first_record.raw_event, second_record.raw_event, third_record.raw_event]}
+
+    processor = SqsFifoPartialProcessor()
+
+    @batch_processor(record_handler=record_handler, processor=processor)
+    def lambda_handler(event, context):
+        return processor.response()
+
+    # WHEN
+    result = lambda_handler(event, {})
+
+    # THEN
+    assert len(result["batchItemFailures"]) == 2
+    assert result["batchItemFailures"][0]["itemIdentifier"] == second_record.message_id
+    assert result["batchItemFailures"][1]["itemIdentifier"] == third_record.message_id
+
+
 def test_async_batch_processor_middleware_success_only(sqs_event_factory, async_record_handler):
     # GIVEN
     first_record = SQSRecord(sqs_event_factory("success"))
```
