diff --git a/Makefile b/Makefile index 0ee0ee76fbd..f030e4289ee 100644 --- a/Makefile +++ b/Makefile @@ -90,3 +90,11 @@ changelog: mypy: poetry run mypy --pretty aws_lambda_powertools + +format-examples: + poetry run isort docs/examples + poetry run black docs/examples/*/*/*.py + +lint-examples: + poetry run python3 -m py_compile docs/examples/*/*/*.py + cfn-lint docs/examples/*/*/*.yml diff --git a/docs/examples/utilities/idempotency/batch_sample.py b/docs/examples/utilities/idempotency/batch_sample.py new file mode 100644 index 00000000000..f12d9d3040b --- /dev/null +++ b/docs/examples/utilities/idempotency/batch_sample.py @@ -0,0 +1,27 @@ +from aws_lambda_powertools.utilities.batch import BatchProcessor, EventType, batch_processor +from aws_lambda_powertools.utilities.data_classes.sqs_event import SQSRecord +from aws_lambda_powertools.utilities.idempotency import DynamoDBPersistenceLayer, IdempotencyConfig, idempotent_function + +processor = BatchProcessor(event_type=EventType.SQS) +dynamodb = DynamoDBPersistenceLayer(table_name="idem") +config = IdempotencyConfig( + event_key_jmespath="messageId", # see Choosing a payload subset section + use_local_cache=True, +) + + +@idempotent_function(data_keyword_argument="record", config=config, persistence_store=dynamodb) +def record_handler(record: SQSRecord): + return {"message": record["body"]} + + +@idempotent_function(data_keyword_argument="data", config=config, persistence_store=dynamodb) +def dummy(arg_one, arg_two, data: dict, **kwargs): + return {"data": data} + + +@batch_processor(record_handler=record_handler, processor=processor) +def lambda_handler(event, context): + # `data` parameter must be called as a keyword argument to work + dummy("hello", "universe", data="test") + return processor.response() diff --git a/docs/examples/utilities/idempotency/bring_your_own_persistent_store.py b/docs/examples/utilities/idempotency/bring_your_own_persistent_store.py new file mode 100644 index 00000000000..0aa2c6c1f45 --- /dev/null +++ b/docs/examples/utilities/idempotency/bring_your_own_persistent_store.py @@ -0,0 +1,126 @@ +import datetime +import logging +from typing import Any, Dict, Optional + +import boto3 +from botocore.config import Config + +from aws_lambda_powertools.utilities.idempotency import BasePersistenceLayer +from aws_lambda_powertools.utilities.idempotency.exceptions import ( + IdempotencyItemAlreadyExistsError, + IdempotencyItemNotFoundError, +) +from aws_lambda_powertools.utilities.idempotency.persistence.base import DataRecord + +logger = logging.getLogger(__name__) + + +class DynamoDBPersistenceLayer(BasePersistenceLayer): + def __init__( + self, + table_name: str, + key_attr: str = "id", + expiry_attr: str = "expiration", + status_attr: str = "status", + data_attr: str = "data", + validation_key_attr: str = "validation", + boto_config: Optional[Config] = None, + boto3_session: Optional[boto3.session.Session] = None, + ): + boto_config = boto_config or Config() + session = boto3_session or boto3.session.Session() + self._ddb_resource = session.resource("dynamodb", config=boto_config) + self.table_name = table_name + self.table = self._ddb_resource.Table(self.table_name) + self.key_attr = key_attr + self.expiry_attr = expiry_attr + self.status_attr = status_attr + self.data_attr = data_attr + self.validation_key_attr = validation_key_attr + super(DynamoDBPersistenceLayer, self).__init__() + + def _item_to_data_record(self, item: Dict[str, Any]) -> DataRecord: + """ + Translate raw item records from DynamoDB to DataRecord + + Parameters + ---------- + item: Dict[str, Union[str, int]] + Item format from dynamodb response + + Returns + ------- + DataRecord + representation of item + + """ + return DataRecord( + idempotency_key=item[self.key_attr], + status=item[self.status_attr], + expiry_timestamp=item[self.expiry_attr], + response_data=item.get(self.data_attr), + payload_hash=item.get(self.validation_key_attr), + ) + + def _get_record(self, idempotency_key) -> DataRecord: + response = self.table.get_item(Key={self.key_attr: idempotency_key}, ConsistentRead=True) + + try: + item = response["Item"] + except KeyError: + raise IdempotencyItemNotFoundError + return self._item_to_data_record(item) + + def _put_record(self, data_record: DataRecord) -> None: + item = { + self.key_attr: data_record.idempotency_key, + self.expiry_attr: data_record.expiry_timestamp, + self.status_attr: data_record.status, + } + + if self.payload_validation_enabled: + item[self.validation_key_attr] = data_record.payload_hash + + now = datetime.datetime.now() + try: + logger.debug(f"Putting record for idempotency key: {data_record.idempotency_key}") + self.table.put_item( + Item=item, + ConditionExpression=f"attribute_not_exists({self.key_attr}) OR {self.expiry_attr} < :now", + ExpressionAttributeValues={":now": int(now.timestamp())}, + ) + except self._ddb_resource.meta.client.exceptions.ConditionalCheckFailedException: + logger.debug(f"Failed to put record for already existing idempotency key: {data_record.idempotency_key}") + raise IdempotencyItemAlreadyExistsError + + def _update_record(self, data_record: DataRecord): + logger.debug(f"Updating record for idempotency key: {data_record.idempotency_key}") + update_expression = "SET #response_data = :response_data, #expiry = :expiry, #status = :status" + expression_attr_values = { + ":expiry": data_record.expiry_timestamp, + ":response_data": data_record.response_data, + ":status": data_record.status, + } + expression_attr_names = { + "#response_data": self.data_attr, + "#expiry": self.expiry_attr, + "#status": self.status_attr, + } + + if self.payload_validation_enabled: + update_expression += ", #validation_key = :validation_key" + expression_attr_values[":validation_key"] = data_record.payload_hash + expression_attr_names["#validation_key"] = self.validation_key_attr + + kwargs = { + "Key": {self.key_attr: data_record.idempotency_key}, + "UpdateExpression": update_expression, + "ExpressionAttributeValues": expression_attr_values, + "ExpressionAttributeNames": expression_attr_names, + } + + self.table.update_item(**kwargs) + + def _delete_record(self, data_record: DataRecord) -> None: + logger.debug(f"Deleting record for idempotency key: {data_record.idempotency_key}") + self.table.delete_item(Key={self.key_attr: data_record.idempotency_key}) diff --git a/docs/examples/utilities/idempotency/dataclass_sample.py b/docs/examples/utilities/idempotency/dataclass_sample.py new file mode 100644 index 00000000000..aa8f7f8dc9f --- /dev/null +++ b/docs/examples/utilities/idempotency/dataclass_sample.py @@ -0,0 +1,33 @@ +from dataclasses import dataclass + +from aws_lambda_powertools.utilities.idempotency import DynamoDBPersistenceLayer, IdempotencyConfig, idempotent_function + +dynamodb = DynamoDBPersistenceLayer(table_name="idem") +config = IdempotencyConfig( + event_key_jmespath="order_id", # see Choosing a payload subset section + use_local_cache=True, +) + + +@dataclass +class OrderItem: + sku: str + description: str + + +@dataclass +class Order: + item: OrderItem + order_id: int + + +@idempotent_function(data_keyword_argument="order", config=config, persistence_store=dynamodb) +def process_order(order: Order): + return f"processed order {order.order_id}" + + +order_item = OrderItem(sku="fake", description="sample") +order = Order(item=order_item, order_id="fake-id") + +# `order` parameter must be called as a keyword argument to work +process_order(order=order) diff --git a/docs/examples/utilities/idempotency/dynamodb_persistence_layer_customization.py b/docs/examples/utilities/idempotency/dynamodb_persistence_layer_customization.py new file mode 100644 index 00000000000..d0d97c41eef --- /dev/null +++ b/docs/examples/utilities/idempotency/dynamodb_persistence_layer_customization.py @@ -0,0 +1,10 @@ +from aws_lambda_powertools.utilities.idempotency import DynamoDBPersistenceLayer + +persistence_layer = DynamoDBPersistenceLayer( + table_name="IdempotencyTable", + key_attr="idempotency_key", + expiry_attr="expires_at", + status_attr="current_status", + data_attr="result_data", + validation_key_attr="validation_key", +) diff --git a/docs/examples/utilities/idempotency/idempotency_cache_ttl.py b/docs/examples/utilities/idempotency/idempotency_cache_ttl.py new file mode 100644 index 00000000000..fee48e48d86 --- /dev/null +++ b/docs/examples/utilities/idempotency/idempotency_cache_ttl.py @@ -0,0 +1,12 @@ +from aws_lambda_powertools.utilities.idempotency import DynamoDBPersistenceLayer, IdempotencyConfig, idempotent + +persistence_layer = DynamoDBPersistenceLayer(table_name="IdempotencyTable") +config = IdempotencyConfig( + event_key_jmespath="body", + expires_after_seconds=5 * 60, # 5 minutes +) + + +@idempotent(config=config, persistence_store=persistence_layer) +def handler(event, context): + ... diff --git a/docs/examples/utilities/idempotency/idempotency_composite_primary_key.py b/docs/examples/utilities/idempotency/idempotency_composite_primary_key.py new file mode 100644 index 00000000000..9652c1681c3 --- /dev/null +++ b/docs/examples/utilities/idempotency/idempotency_composite_primary_key.py @@ -0,0 +1,11 @@ +from aws_lambda_powertools.utilities.idempotency import DynamoDBPersistenceLayer, idempotent + +persistence_layer = DynamoDBPersistenceLayer( + table_name="IdempotencyTable", + sort_key_attr="sort_key", +) + + +@idempotent(persistence_store=persistence_layer) +def handler(event, context): + return {"message": "success", "id": event["body"]["id"]} diff --git a/docs/examples/utilities/idempotency/idempotency_custom_config.py b/docs/examples/utilities/idempotency/idempotency_custom_config.py new file mode 100644 index 00000000000..2230bc13b29 --- /dev/null +++ b/docs/examples/utilities/idempotency/idempotency_custom_config.py @@ -0,0 +1,15 @@ +from botocore.config import Config + +from aws_lambda_powertools.utilities.idempotency import DynamoDBPersistenceLayer, IdempotencyConfig, idempotent + +config = IdempotencyConfig(event_key_jmespath="body") +boto_config = Config() +persistence_layer = DynamoDBPersistenceLayer( + table_name="IdempotencyTable", + boto_config=boto_config, +) + + +@idempotent(config=config, persistence_store=persistence_layer) +def handler(event, context): + ... diff --git a/docs/examples/utilities/idempotency/idempotency_custom_session.py b/docs/examples/utilities/idempotency/idempotency_custom_session.py new file mode 100644 index 00000000000..05e072160c2 --- /dev/null +++ b/docs/examples/utilities/idempotency/idempotency_custom_session.py @@ -0,0 +1,16 @@ +import boto3 + +from aws_lambda_powertools.utilities.idempotency import DynamoDBPersistenceLayer, IdempotencyConfig, idempotent + +boto3_session = boto3.session.Session() +persistence_layer = DynamoDBPersistenceLayer( + table_name="IdempotencyTable", + boto3_session=boto3_session, +) + +config = IdempotencyConfig(event_key_jmespath="body") + + +@idempotent(config=config, persistence_store=persistence_layer) +def handler(event, context): + ... diff --git a/docs/examples/utilities/idempotency/idempotency_exception_sample.py b/docs/examples/utilities/idempotency/idempotency_exception_sample.py new file mode 100644 index 00000000000..7df004a2aef --- /dev/null +++ b/docs/examples/utilities/idempotency/idempotency_exception_sample.py @@ -0,0 +1,24 @@ +import requests + +from aws_lambda_powertools.utilities.idempotency import DynamoDBPersistenceLayer, IdempotencyConfig, idempotent_function + +dynamodb = DynamoDBPersistenceLayer(table_name="idem") +config = IdempotencyConfig(event_key_jmespath="order_id") + + +def lambda_handler(event, context): + # If an exception is raised here, no idempotent record will ever get created as the + # idempotent function does not get called + do_some_stuff() + + result = call_external_service(data={"user": "user1", "id": 5}) + + # This exception will not cause the idempotent record to be deleted, since it + # happens after the decorated function has been successfully called + raise Exception + + +@idempotent_function(data_keyword_argument="data", config=config, persistence_store=dynamodb) +def call_external_service(data: dict, **kwargs): + result = requests.post("http://example.com", json={"user": data["user"], "transaction_id": data["id"]}) + return result.json() diff --git a/docs/examples/utilities/idempotency/idempotency_in_memory_cache.py b/docs/examples/utilities/idempotency/idempotency_in_memory_cache.py new file mode 100644 index 00000000000..ad10117b651 --- /dev/null +++ b/docs/examples/utilities/idempotency/idempotency_in_memory_cache.py @@ -0,0 +1,12 @@ +from aws_lambda_powertools.utilities.idempotency import DynamoDBPersistenceLayer, IdempotencyConfig, idempotent + +persistence_layer = DynamoDBPersistenceLayer(table_name="IdempotencyTable") +config = IdempotencyConfig( + event_key_jmespath="body", + use_local_cache=True, +) + + +@idempotent(config=config, persistence_store=persistence_layer) +def handler(event, context): + ... diff --git a/docs/examples/utilities/idempotency/idempotency_key_required.py b/docs/examples/utilities/idempotency/idempotency_key_required.py new file mode 100644 index 00000000000..47212dd48d2 --- /dev/null +++ b/docs/examples/utilities/idempotency/idempotency_key_required.py @@ -0,0 +1,14 @@ +from aws_lambda_powertools.utilities.idempotency import DynamoDBPersistenceLayer, IdempotencyConfig, idempotent + +persistence_layer = DynamoDBPersistenceLayer(table_name="IdempotencyTable") + +# Requires "user"."uid" and "order_id" to be present +config = IdempotencyConfig( + event_key_jmespath="[user.uid, order_id]", + raise_on_no_idempotency_key=True, +) + + +@idempotent(config=config, persistence_store=persistence_layer) +def handler(event, context): + ... diff --git a/docs/examples/utilities/idempotency/idempotency_payload_validation.py b/docs/examples/utilities/idempotency/idempotency_payload_validation.py new file mode 100644 index 00000000000..50396c93ddd --- /dev/null +++ b/docs/examples/utilities/idempotency/idempotency_payload_validation.py @@ -0,0 +1,25 @@ +from aws_lambda_powertools.utilities.idempotency import DynamoDBPersistenceLayer, IdempotencyConfig, idempotent + +config = IdempotencyConfig( + event_key_jmespath="[userDetail, productId]", + payload_validation_jmespath="amount", +) +persistence_layer = DynamoDBPersistenceLayer(table_name="IdempotencyTable") + + +@idempotent(config=config, persistence_store=persistence_layer) +def handler(event, context): + # Creating a subscription payment is a side + # effect of calling this function! + payment = create_subscription_payment( + user=event["userDetail"]["username"], + product=event["product_id"], + amount=event["amount"], + ) + ... + return { + "message": "success", + "statusCode": 200, + "payment_id": payment.id, + "amount": payment.amount, + } diff --git a/docs/examples/utilities/idempotency/idempotency_with_validator.py b/docs/examples/utilities/idempotency/idempotency_with_validator.py new file mode 100644 index 00000000000..d98efc18732 --- /dev/null +++ b/docs/examples/utilities/idempotency/idempotency_with_validator.py @@ -0,0 +1,12 @@ +from aws_lambda_powertools.utilities.idempotency import DynamoDBPersistenceLayer, IdempotencyConfig, idempotent +from aws_lambda_powertools.utilities.validation import envelopes, validator + +config = IdempotencyConfig(event_key_jmespath="[message, username]") +persistence_layer = DynamoDBPersistenceLayer(table_name="IdempotencyTable") + + +@validator(envelope=envelopes.API_GATEWAY_HTTP) +@idempotent(config=config, persistence_store=persistence_layer) +def lambda_handler(event, context): + cause_some_side_effects(event["username"]) + return {"message": event["message"], "statusCode": 200} diff --git a/docs/examples/utilities/idempotency/idempotent_decorator.py b/docs/examples/utilities/idempotency/idempotent_decorator.py new file mode 100644 index 00000000000..d3e78d2d94c --- /dev/null +++ b/docs/examples/utilities/idempotency/idempotent_decorator.py @@ -0,0 +1,14 @@ +from aws_lambda_powertools.utilities.idempotency import DynamoDBPersistenceLayer, idempotent + +persistence_layer = DynamoDBPersistenceLayer(table_name="IdempotencyTable") + + +@idempotent(persistence_store=persistence_layer) +def handler(event, context): + payment = create_subscription_payment(user=event["user"], product=event["product_id"]) + ... + return { + "payment_id": payment.id, + "message": "success", + "statusCode": 200, + } diff --git a/docs/examples/utilities/idempotency/parser_pydantic_sample.py b/docs/examples/utilities/idempotency/parser_pydantic_sample.py new file mode 100644 index 00000000000..6df8d3dcf35 --- /dev/null +++ b/docs/examples/utilities/idempotency/parser_pydantic_sample.py @@ -0,0 +1,30 @@ +from aws_lambda_powertools.utilities.idempotency import DynamoDBPersistenceLayer, IdempotencyConfig, idempotent_function +from aws_lambda_powertools.utilities.parser import BaseModel + +dynamodb = DynamoDBPersistenceLayer(table_name="idem") +config = IdempotencyConfig( + event_key_jmespath="order_id", # see Choosing a payload subset section + use_local_cache=True, +) + + +class OrderItem(BaseModel): + sku: str + description: str + + +class Order(BaseModel): + item: OrderItem + order_id: int + + +@idempotent_function(data_keyword_argument="order", config=config, persistence_store=dynamodb) +def process_order(order: Order): + return f"processed order {order.order_id}" + + +order_item = OrderItem(sku="fake", description="sample") +order = Order(item=order_item, order_id="fake-id") + +# `order` parameter must be called as a keyword argument to work +process_order(order=order) diff --git a/docs/examples/utilities/idempotency/payment.py b/docs/examples/utilities/idempotency/payment.py new file mode 100644 index 00000000000..c2711b5e2f5 --- /dev/null +++ b/docs/examples/utilities/idempotency/payment.py @@ -0,0 +1,20 @@ +import json + +from aws_lambda_powertools.utilities.idempotency import DynamoDBPersistenceLayer, IdempotencyConfig, idempotent + +persistence_layer = DynamoDBPersistenceLayer(table_name="IdempotencyTable") + +# Treat everything under the "body" key +# in the event json object as our payload +config = IdempotencyConfig(event_key_jmespath="powertools_json(body)") + + +@idempotent(config=config, persistence_store=persistence_layer) +def handler(event, context): + body = json.loads(event["body"]) + payment = create_subscription_payment( + user=body["user"], + product=body["product_id"], + ) + ... + return {"payment_id": payment.id, "message": "success", "statusCode": 200} diff --git a/docs/examples/utilities/idempotency/template.yml b/docs/examples/utilities/idempotency/template.yml new file mode 100644 index 00000000000..549f2db9cc6 --- /dev/null +++ b/docs/examples/utilities/idempotency/template.yml @@ -0,0 +1,26 @@ +AWSTemplateFormatVersion: "2010-09-09" +Transform: AWS::Serverless-2016-10-31 +Resources: + IdempotencyTable: + Type: AWS::DynamoDB::Table + Properties: + AttributeDefinitions: + - AttributeName: id + AttributeType: S + KeySchema: + - AttributeName: id + KeyType: HASH + TimeToLiveSpecification: + AttributeName: expiration + Enabled: true + BillingMode: PAY_PER_REQUEST + + HelloWorldFunction: + Type: AWS::Serverless::Function + Properties: + CodeUri: src/ + Handler: app.lambda_handler + Runtime: python3.9 + Policies: + - DynamoDBCrudPolicy: + TableName: !Ref IdempotencyTable diff --git a/docs/examples/utilities/idempotency/testing_idempotency_disabled_app.py b/docs/examples/utilities/idempotency/testing_idempotency_disabled_app.py new file mode 100644 index 00000000000..d6fa87ea124 --- /dev/null +++ b/docs/examples/utilities/idempotency/testing_idempotency_disabled_app.py @@ -0,0 +1,13 @@ +from aws_lambda_powertools.utilities.idempotency import DynamoDBPersistenceLayer, idempotent + +persistence_layer = DynamoDBPersistenceLayer(table_name="idempotency") + + +@idempotent(persistence_store=persistence_layer) +def handler(event, context): + print("expensive operation") + return { + "payment_id": 12345, + "message": "success", + "statusCode": 200, + } diff --git a/docs/examples/utilities/idempotency/testing_idempotency_disabled_test.py b/docs/examples/utilities/idempotency/testing_idempotency_disabled_test.py new file mode 100644 index 00000000000..2bbd960f18d --- /dev/null +++ b/docs/examples/utilities/idempotency/testing_idempotency_disabled_test.py @@ -0,0 +1,9 @@ +from app import handler + + +def test_idempotent_lambda_handler(monkeypatch): + # Set POWERTOOLS_IDEMPOTENCY_DISABLED before calling decorated functions + monkeypatch.setenv("POWERTOOLS_IDEMPOTENCY_DISABLED", 1) + + result = handler() + ... diff --git a/docs/examples/utilities/idempotency/testing_with_dynamodb_local_app.py b/docs/examples/utilities/idempotency/testing_with_dynamodb_local_app.py new file mode 100644 index 00000000000..d6fa87ea124 --- /dev/null +++ b/docs/examples/utilities/idempotency/testing_with_dynamodb_local_app.py @@ -0,0 +1,13 @@ +from aws_lambda_powertools.utilities.idempotency import DynamoDBPersistenceLayer, idempotent + +persistence_layer = DynamoDBPersistenceLayer(table_name="idempotency") + + +@idempotent(persistence_store=persistence_layer) +def handler(event, context): + print("expensive operation") + return { + "payment_id": 12345, + "message": "success", + "statusCode": 200, + } diff --git a/docs/examples/utilities/idempotency/testing_with_dynamodb_local_test.py b/docs/examples/utilities/idempotency/testing_with_dynamodb_local_test.py new file mode 100644 index 00000000000..b79041cdb63 --- /dev/null +++ b/docs/examples/utilities/idempotency/testing_with_dynamodb_local_test.py @@ -0,0 +1,12 @@ +import app +import boto3 + + +def test_idempotent_lambda(): + # Create our own Table resource using the endpoint for our DynamoDB Local instance + resource = boto3.resource("dynamodb", endpoint_url="http://localhost:8000") + table = resource.Table(app.persistence_layer.table_name) + app.persistence_layer.table = table + + result = app.handler({"testkey": "testvalue"}, {}) + assert result["payment_id"] == 12345 diff --git a/docs/examples/utilities/idempotency/testing_with_mocked_dynamodb_app.py b/docs/examples/utilities/idempotency/testing_with_mocked_dynamodb_app.py new file mode 100644 index 00000000000..d6fa87ea124 --- /dev/null +++ b/docs/examples/utilities/idempotency/testing_with_mocked_dynamodb_app.py @@ -0,0 +1,13 @@ +from aws_lambda_powertools.utilities.idempotency import DynamoDBPersistenceLayer, idempotent + +persistence_layer = DynamoDBPersistenceLayer(table_name="idempotency") + + +@idempotent(persistence_store=persistence_layer) +def handler(event, context): + print("expensive operation") + return { + "payment_id": 12345, + "message": "success", + "statusCode": 200, + } diff --git a/docs/examples/utilities/idempotency/testing_with_mocked_dynamodb_test.py b/docs/examples/utilities/idempotency/testing_with_mocked_dynamodb_test.py new file mode 100644 index 00000000000..cbb20367bd2 --- /dev/null +++ b/docs/examples/utilities/idempotency/testing_with_mocked_dynamodb_test.py @@ -0,0 +1,11 @@ +from unittest.mock import MagicMock + +import app + + +def test_idempotent_lambda(): + table = MagicMock() + app.persistence_layer.table = table + result = app.handler({"testkey": "testvalue"}, {}) + table.put_item.assert_called() + ... diff --git a/docs/utilities/idempotency.md b/docs/utilities/idempotency.md index 4b03b66abd4..aa57c51edff 100644 --- a/docs/utilities/idempotency.md +++ b/docs/utilities/idempotency.md @@ -43,30 +43,8 @@ TTL attribute name | `expiration` | This can only be configured after your table ???+ tip "Tip: You can share a single state table for all functions" You can reuse the same DynamoDB table to store idempotency state. We add your `function_name` in addition to the idempotency key as a hash key. -```yaml hl_lines="5-13 21-23" title="AWS Serverless Application Model (SAM) example" -Resources: - IdempotencyTable: - Type: AWS::DynamoDB::Table - Properties: - AttributeDefinitions: - - AttributeName: id - AttributeType: S - KeySchema: - - AttributeName: id - KeyType: HASH - TimeToLiveSpecification: - AttributeName: expiration - Enabled: true - BillingMode: PAY_PER_REQUEST - - HelloWorldFunction: - Type: AWS::Serverless::Function - Properties: - Runtime: python3.8 - ... - Policies: - - DynamoDBCrudPolicy: - TableName: !Ref IdempotencyTable +```yaml hl_lines="7-15 24-26" title="AWS Serverless Application Model (SAM) example" +--8<-- "docs/examples/utilities/idempotency/template.yml" ``` ???+ warning "Warning: Large responses with DynamoDB persistence layer" @@ -86,25 +64,8 @@ You can quickly start by initializing the `DynamoDBPersistenceLayer` class and u === "app.py" - ```python hl_lines="1-3 5 7 14" - from aws_lambda_powertools.utilities.idempotency import ( - DynamoDBPersistenceLayer, idempotent - ) - - persistence_layer = DynamoDBPersistenceLayer(table_name="IdempotencyTable") - - @idempotent(persistence_store=persistence_layer) - def handler(event, context): - payment = create_subscription_payment( - user=event['user'], - product=event['product_id'] - ) - ... - return { - "payment_id": payment.id, - "message": "success", - "statusCode": 200, - } + ```python hl_lines="1 3 6 10" + --8<-- "docs/examples/utilities/idempotency/idempotent_decorator.py" ``` === "Example event" @@ -116,7 +77,7 @@ You can quickly start by initializing the `DynamoDBPersistenceLayer` class and u } ``` -### Idempotent_function decorator +### Idempotent function decorator Similar to [idempotent decorator](#idempotent-decorator), you can use `idempotent_function` decorator for any synchronous Python function. @@ -131,37 +92,8 @@ When using `idempotent_function`, you must tell us which keyword parameter in yo This example also demonstrates how you can integrate with [Batch utility](batch.md), so you can process each record in an idempotent manner. - ```python hl_lines="4-5 16 21 29" - from aws_lambda_powertools.utilities.batch import (BatchProcessor, EventType, - batch_processor) - from aws_lambda_powertools.utilities.data_classes.sqs_event import SQSRecord - from aws_lambda_powertools.utilities.idempotency import ( - DynamoDBPersistenceLayer, IdempotencyConfig, idempotent_function) - - - processor = BatchProcessor(event_type=EventType.SQS) - dynamodb = DynamoDBPersistenceLayer(table_name="idem") - config = IdempotencyConfig( - event_key_jmespath="messageId", # see Choosing a payload subset section - use_local_cache=True, - ) - - - @idempotent_function(data_keyword_argument="record", config=config, persistence_store=dynamodb) - def record_handler(record: SQSRecord): - return {"message": record["body"]} - - - @idempotent_function(data_keyword_argument="data", config=config, persistence_store=dynamodb) - def dummy(arg_one, arg_two, data: dict, **kwargs): - return {"data": data} - - - @batch_processor(record_handler=record_handler, processor=processor) - def lambda_handler(event, context): - # `data` parameter must be called as a keyword argument to work - dummy("hello", "universe", data="test") - return processor.response() + ```python hl_lines="3 13 18 26" + --8<-- "docs/examples/utilities/idempotency/batch_sample.py" ``` === "Batch event" @@ -197,75 +129,14 @@ When using `idempotent_function`, you must tell us which keyword parameter in yo === "dataclass_sample.py" - ```python hl_lines="3-4 23 32" - from dataclasses import dataclass - - from aws_lambda_powertools.utilities.idempotency import ( - DynamoDBPersistenceLayer, IdempotencyConfig, idempotent_function) - - dynamodb = DynamoDBPersistenceLayer(table_name="idem") - config = IdempotencyConfig( - event_key_jmespath="order_id", # see Choosing a payload subset section - use_local_cache=True, - ) - - @dataclass - class OrderItem: - sku: str - description: str - - @dataclass - class Order: - item: OrderItem - order_id: int - - - @idempotent_function(data_keyword_argument="order", config=config, persistence_store=dynamodb) - def process_order(order: Order): - return f"processed order {order.order_id}" - - - order_item = OrderItem(sku="fake", description="sample") - order = Order(item=order_item, order_id="fake-id") - - # `order` parameter must be called as a keyword argument to work - process_order(order=order) + ```python hl_lines="3 24 33" + --8<-- "docs/examples/utilities/idempotency/dataclass_sample.py" ``` === "parser_pydantic_sample.py" - ```python hl_lines="1-2 22 31" - from aws_lambda_powertools.utilities.idempotency import ( - DynamoDBPersistenceLayer, IdempotencyConfig, idempotent_function) - from aws_lambda_powertools.utilities.parser import BaseModel - - dynamodb = DynamoDBPersistenceLayer(table_name="idem") - config = IdempotencyConfig( - event_key_jmespath="order_id", # see Choosing a payload subset section - use_local_cache=True, - ) - - - class OrderItem(BaseModel): - sku: str - description: str - - - class Order(BaseModel): - item: OrderItem - order_id: int - - - @idempotent_function(data_keyword_argument="order", config=config, persistence_store=dynamodb) - def process_order(order: Order): - return f"processed order {order.order_id}" - - - order_item = OrderItem(sku="fake", description="sample") - order = Order(item=order_item, order_id="fake-id") - - # `order` parameter must be called as a keyword argument to work - process_order(order=order) + ```python hl_lines="1 21 30" + --8<-- "docs/examples/utilities/idempotency/parser_pydantic_sample.py" ``` ### Choosing a payload subset for idempotency @@ -288,31 +159,8 @@ Imagine the function executes successfully, but the client never receives the re === "payment.py" - ```python hl_lines="2-4 10 12 15 20" - import json - from aws_lambda_powertools.utilities.idempotency import ( - IdempotencyConfig, DynamoDBPersistenceLayer, idempotent - ) - - persistence_layer = DynamoDBPersistenceLayer(table_name="IdempotencyTable") - - # Treat everything under the "body" key - # in the event json object as our payload - config = IdempotencyConfig(event_key_jmespath="powertools_json(body)") - - @idempotent(config=config, persistence_store=persistence_layer) - def handler(event, context): - body = json.loads(event['body']) - payment = create_subscription_payment( - user=body['user'], - product=body['product_id'] - ) - ... - return { - "payment_id": payment.id, - "message": "success", - "statusCode": 200 - } + ```python hl_lines="3 9 12 15 20" + --8<-- "docs/examples/utilities/idempotency/payment.py" ``` === "Example event" @@ -350,7 +198,6 @@ Imagine the function executes successfully, but the client never receives the re } ``` - ### Idempotency request flow This sequence diagram shows an example flow of what happens in the payment scenario: @@ -367,30 +214,14 @@ The client was successful in receiving the result after the retry. Since the Lam If you are using the `idempotent` decorator on your Lambda handler, any unhandled exceptions that are raised during the code execution will cause **the record in the persistence layer to be deleted**. This means that new invocations will execute your code again despite having the same payload. If you don't want the record to be deleted, you need to catch exceptions within the idempotent function and return a successful response. - ![Idempotent sequence exception](../media/idempotent_sequence_exception.png) If you are using `idempotent_function`, any unhandled exceptions that are raised _inside_ the decorated function will cause the record in the persistence layer to be deleted, and allow the function to be executed again if retried. If an Exception is raised _outside_ the scope of the decorated function and after your function has been called, the persistent record will not be affected. In this case, idempotency will be maintained for your decorated function. Example: -```python hl_lines="2-4 8-10" title="Exception not affecting idempotency record sample" -def lambda_handler(event, context): - # If an exception is raised here, no idempotent record will ever get created as the - # idempotent function does not get called - do_some_stuff() - - result = call_external_service(data={"user": "user1", "id": 5}) - - # This exception will not cause the idempotent record to be deleted, since it - # happens after the decorated function has been successfully called - raise Exception - - -@idempotent_function(data_keyword_argument="data", config=config, persistence_store=dynamodb) -def call_external_service(data: dict, **kwargs): - result = requests.post('http://example.com', json={"user": data['user'], "transaction_id": data['id']} - return result.json() +```python hl_lines="10-12 16-18" title="Exception not affecting idempotency record sample" +--8<-- "docs/examples/utilities/idempotency/idempotency_exception_sample.py" ``` ???+ warning @@ -405,16 +236,7 @@ def call_external_service(data: dict, **kwargs): This persistence layer is built-in, and you can either use an existing DynamoDB table or create a new one dedicated for idempotency state (recommended). ```python hl_lines="5-9" title="Customizing DynamoDBPersistenceLayer to suit your table structure" -from aws_lambda_powertools.utilities.idempotency import DynamoDBPersistenceLayer - -persistence_layer = DynamoDBPersistenceLayer( - table_name="IdempotencyTable", - key_attr="idempotency_key", - expiry_attr="expires_at", - status_attr="current_status", - data_attr="result_data", - validation_key_attr="validation_key", -) +--8<-- "docs/examples/utilities/idempotency/dynamodb_persistence_layer_customization.py" ``` When using DynamoDB as a persistence layer, you can alter the attribute names by passing these parameters when initializing the persistence layer: @@ -464,20 +286,8 @@ This is a locking mechanism for correctness. Since we don't know the result from You can enable in-memory caching with the **`use_local_cache`** parameter: -```python hl_lines="8 11" title="Caching idempotent transactions in-memory to prevent multiple calls to storage" -from aws_lambda_powertools.utilities.idempotency import ( - IdempotencyConfig, DynamoDBPersistenceLayer, idempotent -) - -persistence_layer = DynamoDBPersistenceLayer(table_name="IdempotencyTable") -config = IdempotencyConfig( - event_key_jmespath="body", - use_local_cache=True, -) - -@idempotent(config=config, persistence_store=persistence_layer) -def handler(event, context): - ... +```python hl_lines="6 10" title="Caching idempotent transactions in-memory to prevent multiple calls to storage" +--8<-- "docs/examples/utilities/idempotency/idempotency_in_memory_cache.py" ``` When enabled, the default is to cache a maximum of 256 records in each Lambda execution environment - You can change it with the **`local_cache_max_items`** parameter. @@ -491,20 +301,8 @@ In most cases, it is not desirable to store the idempotency records forever. Rat You can change this window with the **`expires_after_seconds`** parameter: -```python hl_lines="8 11" title="Adjusting cache TTL" -from aws_lambda_powertools.utilities.idempotency import ( - IdempotencyConfig, DynamoDBPersistenceLayer, idempotent -) - -persistence_layer = DynamoDBPersistenceLayer(table_name="IdempotencyTable") -config = IdempotencyConfig( - event_key_jmespath="body", - expires_after_seconds=5*60, # 5 minutes -) - -@idempotent(config=config, persistence_store=persistence_layer) -def handler(event, context): - ... +```python hl_lines="6 10" title="Adjusting cache TTL" +--8<-- "docs/examples/utilities/idempotency/idempotency_cache_ttl.py" ``` This will mark any records older than 5 minutes as expired, and the lambda handler will be executed as normal if it is invoked with a matching payload. @@ -523,33 +321,8 @@ With **`payload_validation_jmespath`**, you can provide an additional JMESPath e === "app.py" - ```python hl_lines="7 11 18 25" - from aws_lambda_powertools.utilities.idempotency import ( - IdempotencyConfig, DynamoDBPersistenceLayer, idempotent - ) - - config = IdempotencyConfig( - event_key_jmespath="[userDetail, productId]", - payload_validation_jmespath="amount" - ) - persistence_layer = DynamoDBPersistenceLayer(table_name="IdempotencyTable") - - @idempotent(config=config, persistence_store=persistence_layer) - def handler(event, context): - # Creating a subscription payment is a side - # effect of calling this function! - payment = create_subscription_payment( - user=event['userDetail']['username'], - product=event['product_id'], - amount=event['amount'] - ) - ... - return { - "message": "success", - "statusCode": 200, - "payment_id": payment.id, - "amount": payment.amount - } + ```python hl_lines="5 10 17 24" + --8<-- "docs/examples/utilities/idempotency/idempotency_payload_validation.py" ``` === "Example Event 1" @@ -597,22 +370,8 @@ This means that we will raise **`IdempotencyKeyError`** if the evaluation of **` === "app.py" - ```python hl_lines="9-10 13" - from aws_lambda_powertools.utilities.idempotency import ( - IdempotencyConfig, DynamoDBPersistenceLayer, idempotent - ) - - persistence_layer = DynamoDBPersistenceLayer(table_name="IdempotencyTable") - - # Requires "user"."uid" and "order_id" to be present - config = IdempotencyConfig( - event_key_jmespath="[user.uid, order_id]", - raise_on_no_idempotency_key=True, - ) - - @idempotent(config=config, persistence_store=persistence_layer) - def handler(event, context): - pass + ```python hl_lines="7-8 12" + --8<-- "docs/examples/utilities/idempotency/idempotency_key_required.py" ``` === "Success Event" @@ -647,42 +406,14 @@ The **`boto_config`** and **`boto3_session`** parameters enable you to pass in a === "Custom session" - ```python hl_lines="1 6 9 14" - import boto3 - from aws_lambda_powertools.utilities.idempotency import ( - IdempotencyConfig, DynamoDBPersistenceLayer, idempotent - ) - - boto3_session = boto3.session.Session() - persistence_layer = DynamoDBPersistenceLayer( - table_name="IdempotencyTable", - boto3_session=boto3_session - ) - - config = IdempotencyConfig(event_key_jmespath="body") - - @idempotent(config=config, persistence_store=persistence_layer) - def handler(event, context): - ... + ```python hl_lines="1 5 8 14" + --8<-- "docs/examples/utilities/idempotency/idempotency_custom_session.py" ``` + === "Custom config" - ```python hl_lines="1 7 10" - from botocore.config import Config - from aws_lambda_powertools.utilities.idempotency import ( - IdempotencyConfig, DynamoDBPersistenceLayer, idempotent - ) - - config = IdempotencyConfig(event_key_jmespath="body") - boto_config = Config() - persistence_layer = DynamoDBPersistenceLayer( - table_name="IdempotencyTable", - boto_config=boto_config - ) - - @idempotent(config=config, persistence_store=persistence_layer) - def handler(event, context): - ... + ```python hl_lines="1 6 9 13" + --8<-- "docs/examples/utilities/idempotency/idempotency_custom_config.py" ``` ### Using a DynamoDB table with a composite primary key @@ -694,16 +425,7 @@ With this setting, we will save the idempotency key in the sort key instead of t You can optionally set a static value for the partition key using the `static_pk_value` parameter. ```python hl_lines="5" title="Reusing a DynamoDB table that uses a composite primary key" -from aws_lambda_powertools.utilities.idempotency import DynamoDBPersistenceLayer, idempotent - -persistence_layer = DynamoDBPersistenceLayer( - table_name="IdempotencyTable", - sort_key_attr='sort_key') - - -@idempotent(persistence_store=persistence_layer) -def handler(event, context): - return {"message": "success": "id": event['body']['id]} +--8<-- "docs/examples/utilities/idempotency/idempotency_composite_primary_key.py" ``` The example function above would cause data to be stored in DynamoDB like this: @@ -722,132 +444,7 @@ You can inherit from the `BasePersistenceLayer` class and implement the abstract `_update_record` and `_delete_record`. ```python hl_lines="8-13 57 65 74 96 124" title="Excerpt DynamoDB Persisntence Layer implementation for reference" -import datetime -import logging -from typing import Any, Dict, Optional - -import boto3 -from botocore.config import Config - -from aws_lambda_powertools.utilities.idempotency import BasePersistenceLayer -from aws_lambda_powertools.utilities.idempotency.exceptions import ( - IdempotencyItemAlreadyExistsError, - IdempotencyItemNotFoundError, -) -from aws_lambda_powertools.utilities.idempotency.persistence.base import DataRecord - -logger = logging.getLogger(__name__) - - -class DynamoDBPersistenceLayer(BasePersistenceLayer): - def __init__( - self, - table_name: str, - key_attr: str = "id", - expiry_attr: str = "expiration", - status_attr: str = "status", - data_attr: str = "data", - validation_key_attr: str = "validation", - boto_config: Optional[Config] = None, - boto3_session: Optional[boto3.session.Session] = None, - ): - boto_config = boto_config or Config() - session = boto3_session or boto3.session.Session() - self._ddb_resource = session.resource("dynamodb", config=boto_config) - self.table_name = table_name - self.table = self._ddb_resource.Table(self.table_name) - self.key_attr = key_attr - self.expiry_attr = expiry_attr - self.status_attr = status_attr - self.data_attr = data_attr - self.validation_key_attr = validation_key_attr - super(DynamoDBPersistenceLayer, self).__init__() - - def _item_to_data_record(self, item: Dict[str, Any]) -> DataRecord: - """ - Translate raw item records from DynamoDB to DataRecord - - Parameters - ---------- - item: Dict[str, Union[str, int]] - Item format from dynamodb response - - Returns - ------- - DataRecord - representation of item - - """ - return DataRecord( - idempotency_key=item[self.key_attr], - status=item[self.status_attr], - expiry_timestamp=item[self.expiry_attr], - response_data=item.get(self.data_attr), - payload_hash=item.get(self.validation_key_attr), - ) - - def _get_record(self, idempotency_key) -> DataRecord: - response = self.table.get_item(Key={self.key_attr: idempotency_key}, ConsistentRead=True) - - try: - item = response["Item"] - except KeyError: - raise IdempotencyItemNotFoundError - return self._item_to_data_record(item) - - def _put_record(self, data_record: DataRecord) -> None: - item = { - self.key_attr: data_record.idempotency_key, - self.expiry_attr: data_record.expiry_timestamp, - self.status_attr: data_record.status, - } - - if self.payload_validation_enabled: - item[self.validation_key_attr] = data_record.payload_hash - - now = datetime.datetime.now() - try: - logger.debug(f"Putting record for idempotency key: {data_record.idempotency_key}") - self.table.put_item( - Item=item, - ConditionExpression=f"attribute_not_exists({self.key_attr}) OR {self.expiry_attr} < :now", - ExpressionAttributeValues={":now": int(now.timestamp())}, - ) - except self._ddb_resource.meta.client.exceptions.ConditionalCheckFailedException: - logger.debug(f"Failed to put record for already existing idempotency key: {data_record.idempotency_key}") - raise IdempotencyItemAlreadyExistsError - - def _update_record(self, data_record: DataRecord): - logger.debug(f"Updating record for idempotency key: {data_record.idempotency_key}") - update_expression = "SET #response_data = :response_data, #expiry = :expiry, #status = :status" - expression_attr_values = { - ":expiry": data_record.expiry_timestamp, - ":response_data": data_record.response_data, - ":status": data_record.status, - } - expression_attr_names = { - "#response_data": self.data_attr, - "#expiry": self.expiry_attr, - "#status": self.status_attr, - } - - if self.payload_validation_enabled: - update_expression += ", #validation_key = :validation_key" - expression_attr_values[":validation_key"] = data_record.payload_hash - expression_attr_names["#validation_key"] = self.validation_key_attr - - kwargs = { - "Key": {self.key_attr: data_record.idempotency_key}, - "UpdateExpression": update_expression, - "ExpressionAttributeValues": expression_attr_values, - "ExpressionAttributeNames": expression_attr_names, - } - - self.table.update_item(**kwargs) - - def _delete_record(self, data_record: DataRecord) -> None: - logger.debug(f"Deleting record for idempotency key: {data_record.idempotency_key}") - self.table.delete_item(Key={self.key_attr: data_record.idempotency_key},) +--8<-- "docs/examples/utilities/idempotency/bring_your_own_persistent_store.py" ``` ???+ danger @@ -867,61 +464,31 @@ The idempotency utility can be used with the `validator` decorator. Ensure that Make sure to account for this behaviour, if you set the `event_key_jmespath`. -```python hl_lines="9 10" title="Using Idempotency with JSONSchema Validation utility" -from aws_lambda_powertools.utilities.validation import validator, envelopes -from aws_lambda_powertools.utilities.idempotency import ( - IdempotencyConfig, DynamoDBPersistenceLayer, idempotent -) - -config = IdempotencyConfig(event_key_jmespath="[message, username]") -persistence_layer = DynamoDBPersistenceLayer(table_name="IdempotencyTable") - -@validator(envelope=envelopes.API_GATEWAY_HTTP) -@idempotent(config=config, persistence_store=persistence_layer) -def lambda_handler(event, context): - cause_some_side_effects(event['username') - return {"message": event['message'], "statusCode": 200} +```python hl_lines="8-9" title="Using Idempotency with JSONSchema Validation utility" +--8<-- "docs/examples/utilities/idempotency/idempotency_with_validator.py" ``` ???+ tip "Tip: JMESPath Powertools functions are also available" Built-in functions known in the validation utility like `powertools_json`, `powertools_base64`, `powertools_base64_gzip` are also available to use in this utility. - ## Testing your code The idempotency utility provides several routes to test your code. ### Disabling the idempotency utility + When testing your code, you may wish to disable the idempotency logic altogether and focus on testing your business logic. To do this, you can set the environment variable `POWERTOOLS_IDEMPOTENCY_DISABLED` with a truthy value. If you prefer setting this for specific tests, and are using Pytest, you can use [monkeypatch](https://docs.pytest.org/en/latest/monkeypatch.html) fixture: === "tests.py" - ```python hl_lines="2 3" - def test_idempotent_lambda_handler(monkeypatch): - # Set POWERTOOLS_IDEMPOTENCY_DISABLED before calling decorated functions - monkeypatch.setenv("POWERTOOLS_IDEMPOTENCY_DISABLED", 1) - - result = handler() - ... + ```python hl_lines="5-6" + --8<-- "docs/examples/utilities/idempotency/testing_idempotency_disabled_test.py" ``` === "app.py" ```python - from aws_lambda_powertools.utilities.idempotency import ( - DynamoDBPersistenceLayer, idempotent - ) - - persistence_layer = DynamoDBPersistenceLayer(table_name="idempotency") - - @idempotent(persistence_store=persistence_layer) - def handler(event, context): - print('expensive operation') - return { - "payment_id": 12345, - "message": "success", - "statusCode": 200, - } + --8<-- "docs/examples/utilities/idempotency/testing_idempotency_disabled_app.py" ``` ### Testing with DynamoDB Local @@ -931,37 +498,13 @@ To test with [DynamoDB Local](https://docs.aws.amazon.com/amazondynamodb/latest/ === "tests.py" ```python hl_lines="6 7 8" - import boto3 - - import app - - def test_idempotent_lambda(): - # Create our own Table resource using the endpoint for our DynamoDB Local instance - resource = boto3.resource("dynamodb", endpoint_url='http://localhost:8000') - table = resource.Table(app.persistence_layer.table_name) - app.persistence_layer.table = table - - result = app.handler({'testkey': 'testvalue'}, {}) - assert result['payment_id'] == 12345 + --8<-- "docs/examples/utilities/idempotency/testing_with_dynamodb_local_test.py" ``` === "app.py" ```python - from aws_lambda_powertools.utilities.idempotency import ( - DynamoDBPersistenceLayer, idempotent - ) - - persistence_layer = DynamoDBPersistenceLayer(table_name="idempotency") - - @idempotent(persistence_store=persistence_layer) - def handler(event, context): - print('expensive operation') - return { - "payment_id": 12345, - "message": "success", - "statusCode": 200, - } + --8<-- "docs/examples/utilities/idempotency/testing_with_dynamodb_local_app.py" ``` ### How do I mock all DynamoDB I/O operations @@ -971,39 +514,17 @@ This means it is possible to pass a mocked Table resource, or stub various metho === "tests.py" - ```python hl_lines="6 7 8 9" - from unittest.mock import MagicMock - - import app - - def test_idempotent_lambda(): - table = MagicMock() - app.persistence_layer.table = table - result = app.handler({'testkey': 'testvalue'}, {}) - table.put_item.assert_called() - ... + ```python hl_lines="7-10" + --8<-- "docs/examples/utilities/idempotency/testing_with_mocked_dynamodb_test.py" ``` === "app.py" ```python - from aws_lambda_powertools.utilities.idempotency import ( - DynamoDBPersistenceLayer, idempotent - ) - - persistence_layer = DynamoDBPersistenceLayer(table_name="idempotency") - - @idempotent(persistence_store=persistence_layer) - def handler(event, context): - print('expensive operation') - return { - "payment_id": 12345, - "message": "success", - "statusCode": 200, - } + --8<-- "docs/examples/utilities/idempotency/testing_with_mocked_dynamodb_app.py" ``` ## Extra resources If you're interested in a deep dive on how Amazon uses idempotency when building our APIs, check out -[this article](https://aws.amazon.com/builders-library/making-retries-safe-with-idempotent-APIs/). +[this article](https://aws.amazon.com/builders-library/making-retries-safe-with-idempotent-APIs/){target="_blank"}.