From a67dd5995d021b18f0200763027636545d48c640 Mon Sep 17 00:00:00 2001 From: Michael Brewer Date: Sun, 1 May 2022 10:28:17 -0700 Subject: [PATCH 1/4] feat(idempotency): Clean up on lambda timeout Changes: - Initial draft on an option to clean up on function timeout close #1038 --- .../utilities/idempotency/base.py | 10 +++++- .../utilities/idempotency/config.py | 4 +++ .../utilities/idempotency/persistence/base.py | 28 +++++++++++++-- .../idempotency/persistence/dynamodb.py | 11 ++++-- tests/functional/idempotency/conftest.py | 36 ++++++++++++------- tests/functional/idempotency/utils.py | 11 ++++-- 6 files changed, 79 insertions(+), 21 deletions(-) diff --git a/aws_lambda_powertools/utilities/idempotency/base.py b/aws_lambda_powertools/utilities/idempotency/base.py index 41fbd232ad3..889450ac210 100644 --- a/aws_lambda_powertools/utilities/idempotency/base.py +++ b/aws_lambda_powertools/utilities/idempotency/base.py @@ -101,7 +101,10 @@ def _process_idempotency(self): try: # We call save_inprogress first as an optimization for the most common case where no idempotent record # already exists. If it succeeds, there's no need to call get_record. - self.persistence_store.save_inprogress(data=self.data) + self.persistence_store.save_inprogress( + data=self.data, + function_timeout=self._get_remaining_time_in_seconds(), + ) except IdempotencyKeyError: raise except IdempotencyItemAlreadyExistsError: @@ -113,6 +116,11 @@ def _process_idempotency(self): return self._get_function_response() + def _get_remaining_time_in_seconds(self) -> Optional[int]: + if self.fn_args and len(self.fn_args) == 2 and getattr(self.fn_args[1], "get_remaining_time_in_millis", None): + return self.fn_args[1].get_remaining_time_in_millis() / 1000 + return None + def _get_idempotency_record(self) -> DataRecord: """ Retrieve the idempotency record from the persistence layer. diff --git a/aws_lambda_powertools/utilities/idempotency/config.py b/aws_lambda_powertools/utilities/idempotency/config.py index 06468cc74a7..8b752641824 100644 --- a/aws_lambda_powertools/utilities/idempotency/config.py +++ b/aws_lambda_powertools/utilities/idempotency/config.py @@ -9,6 +9,7 @@ def __init__( jmespath_options: Optional[Dict] = None, raise_on_no_idempotency_key: bool = False, expires_after_seconds: int = 60 * 60, # 1 hour default + function_timeout_clean_up: bool = False, use_local_cache: bool = False, local_cache_max_items: int = 256, hash_function: str = "md5", @@ -26,6 +27,8 @@ def __init__( Raise exception if no idempotency key was found in the request, by default False expires_after_seconds: int The number of seconds to wait before a record is expired + function_timeout_clean_up: bool + Whether to clean up in progress record after a function timeouts use_local_cache: bool, optional Whether to locally cache idempotency results, by default False local_cache_max_items: int, optional @@ -38,6 +41,7 @@ def __init__( self.jmespath_options = jmespath_options self.raise_on_no_idempotency_key = raise_on_no_idempotency_key self.expires_after_seconds = expires_after_seconds + self.function_timeout_clean_up = function_timeout_clean_up self.use_local_cache = use_local_cache self.local_cache_max_items = local_cache_max_items self.hash_function = hash_function diff --git a/aws_lambda_powertools/utilities/idempotency/persistence/base.py b/aws_lambda_powertools/utilities/idempotency/persistence/base.py index e6ffea10de8..0493d5e066c 100644 --- a/aws_lambda_powertools/utilities/idempotency/persistence/base.py +++ b/aws_lambda_powertools/utilities/idempotency/persistence/base.py @@ -40,6 +40,7 @@ def __init__( idempotency_key, status: str = "", expiry_timestamp: Optional[int] = None, + function_timeout: Optional[int] = None, response_data: Optional[str] = "", payload_hash: Optional[str] = None, ) -> None: @@ -61,6 +62,7 @@ def __init__( self.idempotency_key = idempotency_key self.payload_hash = payload_hash self.expiry_timestamp = expiry_timestamp + self.function_timeout = function_timeout self._status = status self.response_data = response_data @@ -120,6 +122,7 @@ def __init__(self): self.validation_key_jmespath = None self.raise_on_no_idempotency_key = False self.expires_after_seconds: int = 60 * 60 # 1 hour default + self.function_timeout_clean_up = False self.use_local_cache = False self.hash_function = None @@ -152,6 +155,7 @@ def configure(self, config: IdempotencyConfig, function_name: Optional[str] = No self.payload_validation_enabled = True self.raise_on_no_idempotency_key = config.raise_on_no_idempotency_key self.expires_after_seconds = config.expires_after_seconds + self.function_timeout_clean_up = config.function_timeout_clean_up self.use_local_cache = config.use_local_cache if self.use_local_cache: self._cache = LRUDict(max_items=config.local_cache_max_items) @@ -257,9 +261,21 @@ def _get_expiry_timestamp(self) -> int: int unix timestamp of expiry date for idempotency record + """ + return self._get_timestamp_after_seconds(self.expires_after_seconds) + + @staticmethod + def _get_timestamp_after_seconds(seconds: int) -> int: + """ + + Returns + ------- + int + unix timestamp after the specified seconds + """ now = datetime.datetime.now() - period = datetime.timedelta(seconds=self.expires_after_seconds) + period = datetime.timedelta(seconds=seconds) return int((now + period).timestamp()) def _save_to_cache(self, data_record: DataRecord): @@ -317,6 +333,7 @@ def save_success(self, data: Dict[str, Any], result: dict) -> None: idempotency_key=self._get_hashed_idempotency_key(data=data), status=STATUS_CONSTANTS["COMPLETED"], expiry_timestamp=self._get_expiry_timestamp(), + function_timeout=None, response_data=response_data, payload_hash=self._get_hashed_payload(data=data), ) @@ -328,7 +345,7 @@ def save_success(self, data: Dict[str, Any], result: dict) -> None: self._save_to_cache(data_record=data_record) - def save_inprogress(self, data: Dict[str, Any]) -> None: + def save_inprogress(self, data: Dict[str, Any], function_timeout: Optional[int] = None) -> None: """ Save record of function's execution being in progress @@ -336,11 +353,18 @@ def save_inprogress(self, data: Dict[str, Any]) -> None: ---------- data: Dict[str, Any] Payload + function_timeout: int, optional """ + function_timeout = ( + self._get_timestamp_after_seconds(function_timeout) + if function_timeout and self.function_timeout_clean_up + else None + ) data_record = DataRecord( idempotency_key=self._get_hashed_idempotency_key(data=data), status=STATUS_CONSTANTS["INPROGRESS"], expiry_timestamp=self._get_expiry_timestamp(), + function_timeout=function_timeout, payload_hash=self._get_hashed_payload(data=data), ) diff --git a/aws_lambda_powertools/utilities/idempotency/persistence/dynamodb.py b/aws_lambda_powertools/utilities/idempotency/persistence/dynamodb.py index 88955738ecc..ee0ab7b26b3 100644 --- a/aws_lambda_powertools/utilities/idempotency/persistence/dynamodb.py +++ b/aws_lambda_powertools/utilities/idempotency/persistence/dynamodb.py @@ -25,6 +25,7 @@ def __init__( static_pk_value: Optional[str] = None, sort_key_attr: Optional[str] = None, expiry_attr: str = "expiration", + function_timeout_attr: str = "function_timeout", status_attr: str = "status", data_attr: str = "data", validation_key_attr: str = "validation", @@ -85,6 +86,7 @@ def __init__( self.static_pk_value = static_pk_value self.sort_key_attr = sort_key_attr self.expiry_attr = expiry_attr + self.function_timeout_attr = function_timeout_attr self.status_attr = status_attr self.data_attr = data_attr self.validation_key_attr = validation_key_attr @@ -150,6 +152,7 @@ def _put_record(self, data_record: DataRecord) -> None: item = { **self._get_key(data_record.idempotency_key), self.expiry_attr: data_record.expiry_timestamp, + self.function_timeout_attr: data_record.function_timeout, self.status_attr: data_record.status, } @@ -161,8 +164,12 @@ def _put_record(self, data_record: DataRecord) -> None: logger.debug(f"Putting record for idempotency key: {data_record.idempotency_key}") self.table.put_item( Item=item, - ConditionExpression="attribute_not_exists(#id) OR #now < :now", - ExpressionAttributeNames={"#id": self.key_attr, "#now": self.expiry_attr}, + ConditionExpression="attribute_not_exists(#id) OR #now < :now OR #function_timeout < :now", + ExpressionAttributeNames={ + "#id": self.key_attr, + "#now": self.expiry_attr, + "#function_timeout": self.function_timeout_attr, + }, ExpressionAttributeValues={":now": int(now.timestamp())}, ) except self.table.meta.client.exceptions.ConditionalCheckFailedException: diff --git a/tests/functional/idempotency/conftest.py b/tests/functional/idempotency/conftest.py index 74deecef123..957777e0782 100644 --- a/tests/functional/idempotency/conftest.py +++ b/tests/functional/idempotency/conftest.py @@ -1,6 +1,5 @@ import datetime import json -from collections import namedtuple from decimal import Decimal from unittest import mock @@ -32,14 +31,19 @@ def lambda_apigw_event(): @pytest.fixture def lambda_context(): - lambda_context = { - "function_name": "test-func", - "memory_limit_in_mb": 128, - "invoked_function_arn": "arn:aws:lambda:eu-west-1:809313241234:function:test-func", - "aws_request_id": "52fdfc07-2182-154f-163f-5f0f9a621d72", - } + class LambdaContext: + def __init__(self): + self.function_name = "test-func" + self.memory_limit_in_mb = 128 + self.invoked_function_arn = "arn:aws:lambda:eu-west-1:809313241234:function:test-func" + self.aws_request_id = "52fdfc07-2182-154f-163f-5f0f9a621d72" + + @staticmethod + def get_remaining_time_in_millis() -> int: + """Returns the number of milliseconds left before the execution times out.""" + return 0 - return namedtuple("LambdaContext", lambda_context.keys())(*lambda_context.values()) + return LambdaContext() @pytest.fixture @@ -117,10 +121,15 @@ def expected_params_update_item_with_validation( @pytest.fixture def expected_params_put_item(hashed_idempotency_key): return { - "ConditionExpression": "attribute_not_exists(#id) OR #now < :now", - "ExpressionAttributeNames": {"#id": "id", "#now": "expiration"}, + "ConditionExpression": "attribute_not_exists(#id) OR #now < :now OR #function_timeout < :now", + "ExpressionAttributeNames": {"#id": "id", "#now": "expiration", "#function_timeout": "function_timeout"}, "ExpressionAttributeValues": {":now": stub.ANY}, - "Item": {"expiration": stub.ANY, "id": hashed_idempotency_key, "status": "INPROGRESS"}, + "Item": { + "expiration": stub.ANY, + "id": hashed_idempotency_key, + "status": "INPROGRESS", + "function_timeout": None, + }, "TableName": "TEST_TABLE", } @@ -128,14 +137,15 @@ def expected_params_put_item(hashed_idempotency_key): @pytest.fixture def expected_params_put_item_with_validation(hashed_idempotency_key, hashed_validation_key): return { - "ConditionExpression": "attribute_not_exists(#id) OR #now < :now", - "ExpressionAttributeNames": {"#id": "id", "#now": "expiration"}, + "ConditionExpression": "attribute_not_exists(#id) OR #now < :now OR #function_timeout < :now", + "ExpressionAttributeNames": {"#id": "id", "#now": "expiration", "#function_timeout": "function_timeout"}, "ExpressionAttributeValues": {":now": stub.ANY}, "Item": { "expiration": stub.ANY, "id": hashed_idempotency_key, "status": "INPROGRESS", "validation": hashed_validation_key, + "function_timeout": None, }, "TableName": "TEST_TABLE", } diff --git a/tests/functional/idempotency/utils.py b/tests/functional/idempotency/utils.py index ca3862a2d8c..1bf55047894 100644 --- a/tests/functional/idempotency/utils.py +++ b/tests/functional/idempotency/utils.py @@ -16,10 +16,15 @@ def build_idempotency_put_item_stub( ) -> Dict: idempotency_key_hash = f"{function_name}.{handler_name}#{hash_idempotency_key(data)}" return { - "ConditionExpression": "attribute_not_exists(#id) OR #now < :now", - "ExpressionAttributeNames": {"#id": "id", "#now": "expiration"}, + "ConditionExpression": "attribute_not_exists(#id) OR #now < :now OR #function_timeout < :now", + "ExpressionAttributeNames": {"#id": "id", "#now": "expiration", "#function_timeout": "function_timeout"}, "ExpressionAttributeValues": {":now": stub.ANY}, - "Item": {"expiration": stub.ANY, "id": idempotency_key_hash, "status": "INPROGRESS"}, + "Item": { + "expiration": stub.ANY, + "id": idempotency_key_hash, + "status": "INPROGRESS", + "function_timeout": None, + }, "TableName": "TEST_TABLE", } From 1efc5e4c4e7827272f05f95d13a3008b7e365439 Mon Sep 17 00:00:00 2001 From: Michael Brewer Date: Mon, 2 May 2022 23:43:54 -0700 Subject: [PATCH 2/4] test: Initial tests --- .../utilities/idempotency/config.py | 2 +- tests/functional/idempotency/conftest.py | 2 +- .../idempotency/test_idempotency.py | 44 +++++++++++++++++++ 3 files changed, 46 insertions(+), 2 deletions(-) diff --git a/aws_lambda_powertools/utilities/idempotency/config.py b/aws_lambda_powertools/utilities/idempotency/config.py index 8b752641824..2e11c64706c 100644 --- a/aws_lambda_powertools/utilities/idempotency/config.py +++ b/aws_lambda_powertools/utilities/idempotency/config.py @@ -28,7 +28,7 @@ def __init__( expires_after_seconds: int The number of seconds to wait before a record is expired function_timeout_clean_up: bool - Whether to clean up in progress record after a function timeouts + Whether to clean up "INPROGRESS" record after a function has timed out use_local_cache: bool, optional Whether to locally cache idempotency results, by default False local_cache_max_items: int, optional diff --git a/tests/functional/idempotency/conftest.py b/tests/functional/idempotency/conftest.py index 957777e0782..17934f4a1ca 100644 --- a/tests/functional/idempotency/conftest.py +++ b/tests/functional/idempotency/conftest.py @@ -41,7 +41,7 @@ def __init__(self): @staticmethod def get_remaining_time_in_millis() -> int: """Returns the number of milliseconds left before the execution times out.""" - return 0 + return 3000 return LambdaContext() diff --git a/tests/functional/idempotency/test_idempotency.py b/tests/functional/idempotency/test_idempotency.py index 40cee10e4f7..073fa4efaaa 100644 --- a/tests/functional/idempotency/test_idempotency.py +++ b/tests/functional/idempotency/test_idempotency.py @@ -1256,3 +1256,47 @@ def lambda_handler(event, context): stubber.assert_no_pending_responses() stubber.deactivate() + + +def test_idempotent_lambda_cleanup( + persistence_store: DynamoDBPersistenceLayer, + hashed_idempotency_key, + lambda_apigw_event, + expected_params_update_item, + lambda_response, + lambda_context, +): + # GIVEN + idempotency_config = IdempotencyConfig( + function_timeout_clean_up=True, + event_key_jmespath="[body, queryStringParameters]", + ) + + stubber = stub.Stubber(persistence_store.table.meta.client) + expected_params_put_item = { + "ConditionExpression": "attribute_not_exists(#id) OR #now < :now OR #function_timeout < :now", + "ExpressionAttributeNames": {"#id": "id", "#now": "expiration", "#function_timeout": "function_timeout"}, + "ExpressionAttributeValues": {":now": stub.ANY}, + "Item": { + "expiration": stub.ANY, + "id": hashed_idempotency_key, + "status": "INPROGRESS", + "function_timeout": stub.ANY, + }, + "TableName": "TEST_TABLE", + } + ddb_response = {} + stubber.add_response("put_item", ddb_response, expected_params_put_item) + stubber.add_response("update_item", ddb_response, expected_params_update_item) + stubber.activate() + + @idempotent(config=idempotency_config, persistence_store=persistence_store) + def lambda_handler(event, context): + return lambda_response + + # WHEN + lambda_handler(lambda_apigw_event, lambda_context) + + # THEN + stubber.assert_no_pending_responses() + stubber.deactivate() From bfc67e812edc2fb1089ba8dd622ed5f3d7ab5c71 Mon Sep 17 00:00:00 2001 From: Michael Brewer Date: Wed, 4 May 2022 12:59:49 -0700 Subject: [PATCH 3/4] fix: Reset function_timeout on complete --- .../utilities/idempotency/persistence/dynamodb.py | 8 +++++++- tests/functional/idempotency/conftest.py | 15 ++++++++++++--- tests/functional/idempotency/utils.py | 11 +++++++++-- 3 files changed, 28 insertions(+), 6 deletions(-) diff --git a/aws_lambda_powertools/utilities/idempotency/persistence/dynamodb.py b/aws_lambda_powertools/utilities/idempotency/persistence/dynamodb.py index ee0ab7b26b3..30f20291a2e 100644 --- a/aws_lambda_powertools/utilities/idempotency/persistence/dynamodb.py +++ b/aws_lambda_powertools/utilities/idempotency/persistence/dynamodb.py @@ -135,6 +135,7 @@ def _item_to_data_record(self, item: Dict[str, Any]) -> DataRecord: idempotency_key=item[self.key_attr], status=item[self.status_attr], expiry_timestamp=item[self.expiry_attr], + function_timeout=item.get(self.function_timeout_attr), response_data=item.get(self.data_attr), payload_hash=item.get(self.validation_key_attr), ) @@ -178,15 +179,20 @@ def _put_record(self, data_record: DataRecord) -> None: def _update_record(self, data_record: DataRecord): logger.debug(f"Updating record for idempotency key: {data_record.idempotency_key}") - update_expression = "SET #response_data = :response_data, #expiry = :expiry, #status = :status" + update_expression = ( + "SET #response_data = :response_data, #expiry = :expiry, " + "#function_timeout = :function_timeout, #status = :status" + ) expression_attr_values = { ":expiry": data_record.expiry_timestamp, + ":function_timeout": data_record.function_timeout, ":response_data": data_record.response_data, ":status": data_record.status, } expression_attr_names = { "#response_data": self.data_attr, "#expiry": self.expiry_attr, + "#function_timeout": self.function_timeout_attr, "#status": self.status_attr, } diff --git a/tests/functional/idempotency/conftest.py b/tests/functional/idempotency/conftest.py index 17934f4a1ca..90b90e922f0 100644 --- a/tests/functional/idempotency/conftest.py +++ b/tests/functional/idempotency/conftest.py @@ -81,15 +81,22 @@ def default_jmespath(): @pytest.fixture def expected_params_update_item(serialized_lambda_response, hashed_idempotency_key): return { - "ExpressionAttributeNames": {"#expiry": "expiration", "#response_data": "data", "#status": "status"}, + "ExpressionAttributeNames": { + "#expiry": "expiration", + "#function_timeout": "function_timeout", + "#response_data": "data", + "#status": "status", + }, "ExpressionAttributeValues": { ":expiry": stub.ANY, + ":function_timeout": None, ":response_data": serialized_lambda_response, ":status": "COMPLETED", }, "Key": {"id": hashed_idempotency_key}, "TableName": "TEST_TABLE", - "UpdateExpression": "SET #response_data = :response_data, " "#expiry = :expiry, #status = :status", + "UpdateExpression": "SET #response_data = :response_data, " + "#expiry = :expiry, #function_timeout = :function_timeout, #status = :status", } @@ -100,12 +107,14 @@ def expected_params_update_item_with_validation( return { "ExpressionAttributeNames": { "#expiry": "expiration", + "#function_timeout": "function_timeout", "#response_data": "data", "#status": "status", "#validation_key": "validation", }, "ExpressionAttributeValues": { ":expiry": stub.ANY, + ":function_timeout": None, ":response_data": serialized_lambda_response, ":status": "COMPLETED", ":validation_key": hashed_validation_key, @@ -113,7 +122,7 @@ def expected_params_update_item_with_validation( "Key": {"id": hashed_idempotency_key}, "TableName": "TEST_TABLE", "UpdateExpression": "SET #response_data = :response_data, " - "#expiry = :expiry, #status = :status, " + "#expiry = :expiry, #function_timeout = :function_timeout, #status = :status, " "#validation_key = :validation_key", } diff --git a/tests/functional/idempotency/utils.py b/tests/functional/idempotency/utils.py index 1bf55047894..c39c0fcaf41 100644 --- a/tests/functional/idempotency/utils.py +++ b/tests/functional/idempotency/utils.py @@ -38,13 +38,20 @@ def build_idempotency_update_item_stub( idempotency_key_hash = f"{function_name}.{handler_name}#{hash_idempotency_key(data)}" serialized_lambda_response = json_serialize(handler_response) return { - "ExpressionAttributeNames": {"#expiry": "expiration", "#response_data": "data", "#status": "status"}, + "ExpressionAttributeNames": { + "#expiry": "expiration", + "#function_timeout": "function_timeout", + "#response_data": "data", + "#status": "status", + }, "ExpressionAttributeValues": { ":expiry": stub.ANY, + ":function_timeout": None, ":response_data": serialized_lambda_response, ":status": "COMPLETED", }, "Key": {"id": idempotency_key_hash}, "TableName": "TEST_TABLE", - "UpdateExpression": "SET #response_data = :response_data, " "#expiry = :expiry, #status = :status", + "UpdateExpression": "SET #response_data = :response_data, " + "#expiry = :expiry, #function_timeout = :function_timeout, #status = :status", } From 6f972a7e5d4fae8ab942e9c51f60db3af009d020 Mon Sep 17 00:00:00 2001 From: Michael Brewer Date: Thu, 5 May 2022 16:10:19 -0700 Subject: [PATCH 4/4] chore: some clean up --- aws_lambda_powertools/utilities/idempotency/base.py | 5 ++++- .../utilities/idempotency/persistence/base.py | 11 +++++++---- 2 files changed, 11 insertions(+), 5 deletions(-) diff --git a/aws_lambda_powertools/utilities/idempotency/base.py b/aws_lambda_powertools/utilities/idempotency/base.py index 889450ac210..086a866e31b 100644 --- a/aws_lambda_powertools/utilities/idempotency/base.py +++ b/aws_lambda_powertools/utilities/idempotency/base.py @@ -103,7 +103,7 @@ def _process_idempotency(self): # already exists. If it succeeds, there's no need to call get_record. self.persistence_store.save_inprogress( data=self.data, - function_timeout=self._get_remaining_time_in_seconds(), + remaining_time_in_seconds=self._get_remaining_time_in_seconds(), ) except IdempotencyKeyError: raise @@ -117,6 +117,9 @@ def _process_idempotency(self): return self._get_function_response() def _get_remaining_time_in_seconds(self) -> Optional[int]: + """ + Try to get the time remaining in seconds from the lambda context + """ if self.fn_args and len(self.fn_args) == 2 and getattr(self.fn_args[1], "get_remaining_time_in_millis", None): return self.fn_args[1].get_remaining_time_in_millis() / 1000 return None diff --git a/aws_lambda_powertools/utilities/idempotency/persistence/base.py b/aws_lambda_powertools/utilities/idempotency/persistence/base.py index 0493d5e066c..100a83eda3e 100644 --- a/aws_lambda_powertools/utilities/idempotency/persistence/base.py +++ b/aws_lambda_powertools/utilities/idempotency/persistence/base.py @@ -54,6 +54,8 @@ def __init__( status of the idempotent record expiry_timestamp: int, optional time before the record should expire, in seconds + function_timeout: int, optional + time before the function should time out, in seconds payload_hash: str, optional hashed representation of payload response_data: str, optional @@ -345,7 +347,7 @@ def save_success(self, data: Dict[str, Any], result: dict) -> None: self._save_to_cache(data_record=data_record) - def save_inprogress(self, data: Dict[str, Any], function_timeout: Optional[int] = None) -> None: + def save_inprogress(self, data: Dict[str, Any], remaining_time_in_seconds: Optional[int] = None) -> None: """ Save record of function's execution being in progress @@ -353,11 +355,12 @@ def save_inprogress(self, data: Dict[str, Any], function_timeout: Optional[int] ---------- data: Dict[str, Any] Payload - function_timeout: int, optional + remaining_time_in_seconds: int, optional + Function remaining time in seconds """ function_timeout = ( - self._get_timestamp_after_seconds(function_timeout) - if function_timeout and self.function_timeout_clean_up + self._get_timestamp_after_seconds(remaining_time_in_seconds) + if remaining_time_in_seconds and self.function_timeout_clean_up else None ) data_record = DataRecord(