Skip to content

feat(idempotency): handle lambda timeout scenarios for INPROGRESS records #1387

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
Merged
Changes from all commits
Commits
Show all changes
27 commits
Select commit Hold shift + click to select a range
c53d504
feat(idempotency): add option to expire inprogress invocations
rubenfonseca Jul 25, 2022
90d21fb
chore(idempotency): make existing tests pass with expire_in_progress
rubenfonseca Jul 26, 2022
9676e13
chore(idempotency): add tests for expires_in_progress
rubenfonseca Jul 26, 2022
550e466
chore(docs): added docs about `expires_in_progress`
rubenfonseca Jul 26, 2022
bf18cc2
chore(idempotency): refactored expire in-progress logic
rubenfonseca Jul 27, 2022
c88482e
chore(idempotency): add tests
rubenfonseca Jul 27, 2022
f47ab1f
chore(idempotency): remove unused fixtures in tests
rubenfonseca Jul 27, 2022
a4f8ce7
chore(idempotency): make mypy happy
rubenfonseca Jul 27, 2022
975933d
chores(documentation): update sample code for `expires_in_progress`
rubenfonseca Jul 27, 2022
cd90fc1
chore(documentation): replace idempotency diagrams with mermaid.js
rubenfonseca Jul 27, 2022
631370d
chore(idempotency): remove param `expires_in_progress`
rubenfonseca Jul 27, 2022
61f94b3
chore(idempotency): remove more of the old code
rubenfonseca Jul 27, 2022
fa80aed
chore(docs): remove bad comment
rubenfonseca Jul 27, 2022
c706b0c
feat(idempotench): add mechanism to register lambda context
rubenfonseca Jul 27, 2022
1a72214
fix(idempotency): typo
rubenfonseca Jul 27, 2022
0ce0bb2
fix(idempotency): capture the lambda context automatically
rubenfonseca Jul 28, 2022
a2b6a34
chore(idempotency): addressed review comments
rubenfonseca Jul 29, 2022
228a76d
docs(idempotency): include register_lambda_context in doc snippets
heitorlessa Jul 29, 2022
3cb7411
chore(idempotency): added tests for handle_for_status
rubenfonseca Jul 29, 2022
5c09a5a
chore(docs): add documentation to method
rubenfonseca Jul 29, 2022
2e1afd3
chore(idempotency): address comments
rubenfonseca Jul 29, 2022
0e9dfd4
chore(idempotency): simplified strings
rubenfonseca Jul 29, 2022
81ed53d
chore(documentation): addressed comments
rubenfonseca Jul 29, 2022
66b62a6
chore(idempotency): no need to update expire on update
rubenfonseca Jul 29, 2022
84ced8e
docs(idempotency): reorder wording, banners to emphasize the need, an…
heitorlessa Jul 29, 2022
1ee5c25
Merge remote-tracking branch 'rubenfonseca/feat/expire-inprogress' in…
heitorlessa Jul 29, 2022
84280af
docs(idempotency): shorten wording to fit new mermaid SVG
heitorlessa Jul 29, 2022
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
32 changes: 31 additions & 1 deletion aws_lambda_powertools/utilities/idempotency/base.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
import datetime
import logging
from copy import deepcopy
from typing import Any, Callable, Dict, Optional, Tuple
@@ -73,6 +74,7 @@ def __init__(
self.data = deepcopy(_prepare_data(function_payload))
self.fn_args = function_args
self.fn_kwargs = function_kwargs
self.config = config

persistence_store.configure(config, self.function.__name__)
self.persistence_store = persistence_store
@@ -101,7 +103,9 @@ def _process_idempotency(self):
try:
# We call save_inprogress first as an optimization for the most common case where no idempotent record
# already exists. If it succeeds, there's no need to call get_record.
self.persistence_store.save_inprogress(data=self.data)
self.persistence_store.save_inprogress(
data=self.data, remaining_time_in_millis=self._get_remaining_time_in_millis()
)
except IdempotencyKeyError:
raise
except IdempotencyItemAlreadyExistsError:
@@ -113,6 +117,25 @@ def _process_idempotency(self):

return self._get_function_response()

def _get_remaining_time_in_millis(self) -> Optional[int]:
"""
Tries to determine the remaining time available for the current lambda invocation.

This only works if the idempotent handler decorator is used, since we need to access the lambda context.
However, this could be improved if we start storing the lambda context globally during the invocation. One
way to do this is to register the lambda context when configuring the IdempotencyConfig object.

Returns
-------
Optional[int]
Remaining time in millis, or None if the remaining time cannot be determined.
"""

if self.config.lambda_context is not None:
return self.config.lambda_context.get_remaining_time_in_millis()

return None

def _get_idempotency_record(self) -> DataRecord:
"""
Retrieve the idempotency record from the persistence layer.
@@ -167,6 +190,13 @@ def _handle_for_status(self, data_record: DataRecord) -> Optional[Dict[Any, Any]
raise IdempotencyInconsistentStateError("save_inprogress and get_record return inconsistent results.")

if data_record.status == STATUS_CONSTANTS["INPROGRESS"]:
if data_record.in_progress_expiry_timestamp is not None and data_record.in_progress_expiry_timestamp < int(
datetime.datetime.now().timestamp() * 1000
):
raise IdempotencyInconsistentStateError(
"item should have been expired in-progress because it already time-outed."
)

raise IdempotencyAlreadyInProgressError(
f"Execution already in progress with idempotency key: "
f"{self.persistence_store.event_key_jmespath}={data_record.idempotency_key}"
10 changes: 10 additions & 0 deletions aws_lambda_powertools/utilities/idempotency/config.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
from typing import Dict, Optional

from aws_lambda_powertools.utilities.typing import LambdaContext


class IdempotencyConfig:
def __init__(
@@ -12,6 +14,7 @@ def __init__(
use_local_cache: bool = False,
local_cache_max_items: int = 256,
hash_function: str = "md5",
lambda_context: Optional[LambdaContext] = None,
):
"""
Initialize the base persistence layer
@@ -32,6 +35,8 @@ def __init__(
Max number of items to store in local cache, by default 1024
hash_function: str, optional
Function to use for calculating hashes, by default md5.
lambda_context: LambdaContext, optional
Lambda Context containing information about the invocation, function and execution environment.
"""
self.event_key_jmespath = event_key_jmespath
self.payload_validation_jmespath = payload_validation_jmespath
@@ -41,3 +46,8 @@ def __init__(
self.use_local_cache = use_local_cache
self.local_cache_max_items = local_cache_max_items
self.hash_function = hash_function
self.lambda_context: Optional[LambdaContext] = lambda_context

def register_lambda_context(self, lambda_context: LambdaContext):
"""Captures the Lambda context, to calculate the remaining time before the invocation times out"""
self.lambda_context = lambda_context
2 changes: 2 additions & 0 deletions aws_lambda_powertools/utilities/idempotency/idempotency.py
Original file line number Diff line number Diff line change
@@ -62,6 +62,8 @@ def idempotent(
return handler(event, context)

config = config or IdempotencyConfig()
config.register_lambda_context(context)

args = event, context
idempotency_handler = IdempotencyHandler(
function=handler,
Original file line number Diff line number Diff line change
@@ -40,6 +40,7 @@ def __init__(
idempotency_key,
status: str = "",
expiry_timestamp: Optional[int] = None,
in_progress_expiry_timestamp: Optional[int] = None,
response_data: Optional[str] = "",
payload_hash: Optional[str] = None,
) -> None:
@@ -53,6 +54,8 @@ def __init__(
status of the idempotent record
expiry_timestamp: int, optional
time before the record should expire, in seconds
in_progress_expiry_timestamp: int, optional
time before the record should expire while in the INPROGRESS state, in seconds
payload_hash: str, optional
hashed representation of payload
response_data: str, optional
@@ -61,6 +64,7 @@ def __init__(
self.idempotency_key = idempotency_key
self.payload_hash = payload_hash
self.expiry_timestamp = expiry_timestamp
self.in_progress_expiry_timestamp = in_progress_expiry_timestamp
self._status = status
self.response_data = response_data

@@ -328,14 +332,16 @@ def save_success(self, data: Dict[str, Any], result: dict) -> None:

self._save_to_cache(data_record=data_record)

def save_inprogress(self, data: Dict[str, Any]) -> None:
def save_inprogress(self, data: Dict[str, Any], remaining_time_in_millis: Optional[int] = None) -> None:
"""
Save record of function's execution being in progress

Parameters
----------
data: Dict[str, Any]
Payload
remaining_time_in_millis: Optional[int]
If expiry of in-progress invocations is enabled, this will contain the remaining time available in millis
"""
data_record = DataRecord(
idempotency_key=self._get_hashed_idempotency_key(data=data),
@@ -344,6 +350,18 @@ def save_inprogress(self, data: Dict[str, Any]) -> None:
payload_hash=self._get_hashed_payload(data=data),
)

if remaining_time_in_millis:
now = datetime.datetime.now()
period = datetime.timedelta(milliseconds=remaining_time_in_millis)
timestamp = (now + period).timestamp()

data_record.in_progress_expiry_timestamp = int(timestamp * 1000)
else:
warnings.warn(
"Couldn't determine the remaining time left. "
"Did you call register_lambda_context on IdempotencyConfig?"
)

logger.debug(f"Saving in progress record for idempotency key: {data_record.idempotency_key}")

if self._retrieve_from_cache(idempotency_key=data_record.idempotency_key):
Original file line number Diff line number Diff line change
@@ -12,7 +12,7 @@
IdempotencyItemAlreadyExistsError,
IdempotencyItemNotFoundError,
)
from aws_lambda_powertools.utilities.idempotency.persistence.base import DataRecord
from aws_lambda_powertools.utilities.idempotency.persistence.base import STATUS_CONSTANTS, DataRecord

logger = logging.getLogger(__name__)

@@ -25,6 +25,7 @@ def __init__(
static_pk_value: Optional[str] = None,
sort_key_attr: Optional[str] = None,
expiry_attr: str = "expiration",
in_progress_expiry_attr: str = "in_progress_expiration",
status_attr: str = "status",
data_attr: str = "data",
validation_key_attr: str = "validation",
@@ -47,6 +48,8 @@ def __init__(
DynamoDB attribute name for the sort key
expiry_attr: str, optional
DynamoDB attribute name for expiry timestamp, by default "expiration"
in_progress_expiry_attr: str, optional
DynamoDB attribute name for in-progress expiry timestamp, by default "in_progress_expiration"
status_attr: str, optional
DynamoDB attribute name for status, by default "status"
data_attr: str, optional
@@ -85,6 +88,7 @@ def __init__(
self.static_pk_value = static_pk_value
self.sort_key_attr = sort_key_attr
self.expiry_attr = expiry_attr
self.in_progress_expiry_attr = in_progress_expiry_attr
self.status_attr = status_attr
self.data_attr = data_attr
self.validation_key_attr = validation_key_attr
@@ -133,6 +137,7 @@ def _item_to_data_record(self, item: Dict[str, Any]) -> DataRecord:
idempotency_key=item[self.key_attr],
status=item[self.status_attr],
expiry_timestamp=item[self.expiry_attr],
in_progress_expiry_timestamp=item.get(self.in_progress_expiry_attr),
response_data=item.get(self.data_attr),
payload_hash=item.get(self.validation_key_attr),
)
@@ -153,33 +158,75 @@ def _put_record(self, data_record: DataRecord) -> None:
self.status_attr: data_record.status,
}

if data_record.in_progress_expiry_timestamp is not None:
item[self.in_progress_expiry_attr] = data_record.in_progress_expiry_timestamp

if self.payload_validation_enabled:
item[self.validation_key_attr] = data_record.payload_hash

now = datetime.datetime.now()
try:
logger.debug(f"Putting record for idempotency key: {data_record.idempotency_key}")

# | LOCKED | RETRY if status = "INPROGRESS" | RETRY
# |----------------|-------------------------------------------------------|-------------> .... (time)
# | Lambda Idempotency Record
# | Timeout Timeout
# | (in_progress_expiry) (expiry)

# Conditions to successfully save a record:

# The idempotency key does not exist:
# - first time that this invocation key is used
# - previous invocation with the same key was deleted due to TTL
idempotency_key_not_exist = "attribute_not_exists(#id)"

# The idempotency record exists but it's expired:
idempotency_expiry_expired = "#expiry < :now"

# The status of the record is "INPROGRESS", there is an in-progress expiry timestamp, but it's expired
inprogress_expiry_expired = " AND ".join(
[
"#status = :inprogress",
"attribute_exists(#in_progress_expiry)",
"#in_progress_expiry < :now_in_millis",
]
)

condition_expression = (
f"{idempotency_key_not_exist} OR {idempotency_expiry_expired} OR ({inprogress_expiry_expired})"
)

self.table.put_item(
Item=item,
ConditionExpression="attribute_not_exists(#id) OR #now < :now",
ExpressionAttributeNames={"#id": self.key_attr, "#now": self.expiry_attr},
ExpressionAttributeValues={":now": int(now.timestamp())},
ConditionExpression=condition_expression,
ExpressionAttributeNames={
"#id": self.key_attr,
"#expiry": self.expiry_attr,
"#in_progress_expiry": self.in_progress_expiry_attr,
"#status": self.status_attr,
},
ExpressionAttributeValues={
":now": int(now.timestamp()),
":now_in_millis": int(now.timestamp() * 1000),
":inprogress": STATUS_CONSTANTS["INPROGRESS"],
},
)
except self.table.meta.client.exceptions.ConditionalCheckFailedException:
logger.debug(f"Failed to put record for already existing idempotency key: {data_record.idempotency_key}")
raise IdempotencyItemAlreadyExistsError

def _update_record(self, data_record: DataRecord):
logger.debug(f"Updating record for idempotency key: {data_record.idempotency_key}")
update_expression = "SET #response_data = :response_data, #expiry = :expiry, #status = :status"
update_expression = "SET #response_data = :response_data, #expiry = :expiry, " "#status = :status"
expression_attr_values = {
":expiry": data_record.expiry_timestamp,
":response_data": data_record.response_data,
":status": data_record.status,
}
expression_attr_names = {
"#response_data": self.data_attr,
"#expiry": self.expiry_attr,
"#response_data": self.data_attr,
"#status": self.status_attr,
}

Binary file removed docs/media/idempotent_sequence.png
Binary file not shown.
Binary file removed docs/media/idempotent_sequence_exception.png
Binary file not shown.
209 changes: 166 additions & 43 deletions docs/utilities/idempotency.md

Large diffs are not rendered by default.

2 changes: 1 addition & 1 deletion tests/functional/event_handler/test_api_gateway.py
Original file line number Diff line number Diff line change
@@ -283,7 +283,7 @@ def test_base64_encode():

@app.get("/my/path", compress=True)
def read_image() -> Response:
return Response(200, "image/png", read_media("idempotent_sequence_exception.png"))
return Response(200, "image/png", read_media("tracer_utility_showcase.png"))

# WHEN calling the event handler
result = app(mock_event, None)
78 changes: 50 additions & 28 deletions tests/functional/idempotency/conftest.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
import datetime
import json
from collections import namedtuple
from decimal import Decimal
from unittest import mock

@@ -32,14 +31,17 @@ def lambda_apigw_event():

@pytest.fixture
def lambda_context():
lambda_context = {
"function_name": "test-func",
"memory_limit_in_mb": 128,
"invoked_function_arn": "arn:aws:lambda:eu-west-1:809313241234:function:test-func",
"aws_request_id": "52fdfc07-2182-154f-163f-5f0f9a621d72",
}
class LambdaContext:
def __init__(self):
self.function_name = "test-func"
self.memory_limit_in_mb = 128
self.invoked_function_arn = "arn:aws:lambda:eu-west-1:809313241234:function:test-func"
self.aws_request_id = "52fdfc07-2182-154f-163f-5f0f9a621d72"

def get_remaining_time_in_millis(self) -> int:
return 1000

return namedtuple("LambdaContext", lambda_context.keys())(*lambda_context.values())
return LambdaContext()


@pytest.fixture
@@ -77,7 +79,11 @@ def default_jmespath():
@pytest.fixture
def expected_params_update_item(serialized_lambda_response, hashed_idempotency_key):
return {
"ExpressionAttributeNames": {"#expiry": "expiration", "#response_data": "data", "#status": "status"},
"ExpressionAttributeNames": {
"#expiry": "expiration",
"#response_data": "data",
"#status": "status",
},
"ExpressionAttributeValues": {
":expiry": stub.ANY,
":response_data": serialized_lambda_response,
@@ -108,31 +114,55 @@ def expected_params_update_item_with_validation(
},
"Key": {"id": hashed_idempotency_key},
"TableName": "TEST_TABLE",
"UpdateExpression": "SET #response_data = :response_data, "
"#expiry = :expiry, #status = :status, "
"#validation_key = :validation_key",
"UpdateExpression": (
"SET #response_data = :response_data, "
"#expiry = :expiry, #status = :status, "
"#validation_key = :validation_key"
),
}


@pytest.fixture
def expected_params_put_item(hashed_idempotency_key):
return {
"ConditionExpression": "attribute_not_exists(#id) OR #now < :now",
"ExpressionAttributeNames": {"#id": "id", "#now": "expiration"},
"ExpressionAttributeValues": {":now": stub.ANY},
"Item": {"expiration": stub.ANY, "id": hashed_idempotency_key, "status": "INPROGRESS"},
"ConditionExpression": (
"attribute_not_exists(#id) OR #expiry < :now OR "
"(#status = :inprogress AND attribute_exists(#in_progress_expiry) AND #in_progress_expiry < :now_in_millis)"
),
"ExpressionAttributeNames": {
"#id": "id",
"#expiry": "expiration",
"#status": "status",
"#in_progress_expiry": "in_progress_expiration",
},
"ExpressionAttributeValues": {":now": stub.ANY, ":now_in_millis": stub.ANY, ":inprogress": "INPROGRESS"},
"Item": {
"expiration": stub.ANY,
"id": hashed_idempotency_key,
"status": "INPROGRESS",
"in_progress_expiration": stub.ANY,
},
"TableName": "TEST_TABLE",
}


@pytest.fixture
def expected_params_put_item_with_validation(hashed_idempotency_key, hashed_validation_key):
return {
"ConditionExpression": "attribute_not_exists(#id) OR #now < :now",
"ExpressionAttributeNames": {"#id": "id", "#now": "expiration"},
"ExpressionAttributeValues": {":now": stub.ANY},
"ConditionExpression": (
"attribute_not_exists(#id) OR #expiry < :now OR "
"(#status = :inprogress AND attribute_exists(#in_progress_expiry) AND #in_progress_expiry < :now_in_millis)"
),
"ExpressionAttributeNames": {
"#id": "id",
"#expiry": "expiration",
"#status": "status",
"#in_progress_expiry": "in_progress_expiration",
},
"ExpressionAttributeValues": {":now": stub.ANY, ":now_in_millis": stub.ANY, ":inprogress": "INPROGRESS"},
"Item": {
"expiration": stub.ANY,
"in_progress_expiration": stub.ANY,
"id": hashed_idempotency_key,
"status": "INPROGRESS",
"validation": hashed_validation_key,
@@ -176,6 +206,7 @@ def idempotency_config(config, request, default_jmespath):
return IdempotencyConfig(
event_key_jmespath=request.param.get("event_key_jmespath") or default_jmespath,
use_local_cache=request.param["use_local_cache"],
payload_validation_jmespath=request.param.get("payload_validation_jmespath") or "",
)


@@ -184,15 +215,6 @@ def config_without_jmespath(config, request):
return IdempotencyConfig(use_local_cache=request.param["use_local_cache"])


@pytest.fixture
def config_with_validation(config, request, default_jmespath):
return IdempotencyConfig(
event_key_jmespath=default_jmespath,
use_local_cache=request.param,
payload_validation_jmespath="requestContext",
)


@pytest.fixture
def config_with_jmespath_options(config, request):
class CustomFunctions(functions.Functions):
239 changes: 204 additions & 35 deletions tests/functional/idempotency/test_idempotency.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
import copy
import datetime
import sys
import warnings
from hashlib import md5
from unittest.mock import MagicMock

@@ -10,7 +12,7 @@

from aws_lambda_powertools.utilities.data_classes import APIGatewayProxyEventV2, event_source
from aws_lambda_powertools.utilities.idempotency import DynamoDBPersistenceLayer, IdempotencyConfig
from aws_lambda_powertools.utilities.idempotency.base import _prepare_data
from aws_lambda_powertools.utilities.idempotency.base import MAX_RETRIES, IdempotencyHandler, _prepare_data
from aws_lambda_powertools.utilities.idempotency.exceptions import (
IdempotencyAlreadyInProgressError,
IdempotencyInconsistentStateError,
@@ -208,9 +210,6 @@ def test_idempotent_lambda_first_execution(
expected_params_update_item,
expected_params_put_item,
lambda_response,
serialized_lambda_response,
deserialized_lambda_response,
hashed_idempotency_key,
lambda_context,
):
"""
@@ -295,7 +294,11 @@ def test_idempotent_lambda_first_execution_event_mutation(
event = copy.deepcopy(lambda_apigw_event)
stubber = stub.Stubber(persistence_store.table.meta.client)
ddb_response = {}
stubber.add_response("put_item", ddb_response, build_idempotency_put_item_stub(data=event["body"]))
stubber.add_response(
"put_item",
ddb_response,
build_idempotency_put_item_stub(data=event["body"]),
)
stubber.add_response(
"update_item",
ddb_response,
@@ -319,15 +322,13 @@ def test_idempotent_lambda_expired(
idempotency_config: IdempotencyConfig,
persistence_store: DynamoDBPersistenceLayer,
lambda_apigw_event,
timestamp_expired,
lambda_response,
expected_params_update_item,
expected_params_put_item,
hashed_idempotency_key,
lambda_context,
):
"""
Test idempotent decorator when lambda is called with an event it succesfully handled already, but outside of the
Test idempotent decorator when lambda is called with an event it successfully handled already, but outside of the
expiry window
"""

@@ -354,8 +355,6 @@ def test_idempotent_lambda_exception(
idempotency_config: IdempotencyConfig,
persistence_store: DynamoDBPersistenceLayer,
lambda_apigw_event,
timestamp_future,
lambda_response,
hashed_idempotency_key,
expected_params_put_item,
lambda_context,
@@ -389,10 +388,15 @@ def lambda_handler(event, context):


@pytest.mark.parametrize(
"config_with_validation", [{"use_local_cache": False}, {"use_local_cache": True}], indirect=True
"idempotency_config",
[
{"use_local_cache": False, "payload_validation_jmespath": "requestContext"},
{"use_local_cache": True, "payload_validation_jmespath": "requestContext"},
],
indirect=True,
)
def test_idempotent_lambda_already_completed_with_validation_bad_payload(
config_with_validation: IdempotencyConfig,
idempotency_config: IdempotencyConfig,
persistence_store: DynamoDBPersistenceLayer,
lambda_apigw_event,
timestamp_future,
@@ -422,7 +426,7 @@ def test_idempotent_lambda_already_completed_with_validation_bad_payload(
stubber.add_response("get_item", ddb_response, expected_params)
stubber.activate()

@idempotent(config=config_with_validation, persistence_store=persistence_store)
@idempotent(config=idempotency_config, persistence_store=persistence_store)
def lambda_handler(event, context):
return lambda_response

@@ -445,7 +449,7 @@ def test_idempotent_lambda_expired_during_request(
lambda_context,
):
"""
Test idempotent decorator when lambda is called with an event it succesfully handled already. Persistence store
Test idempotent decorator when lambda is called with an event it successfully handled already. Persistence store
returns inconsistent/rapidly changing result between put_item and get_item calls.
"""

@@ -495,9 +499,6 @@ def test_idempotent_persistence_exception_deleting(
idempotency_config: IdempotencyConfig,
persistence_store: DynamoDBPersistenceLayer,
lambda_apigw_event,
timestamp_future,
lambda_response,
hashed_idempotency_key,
expected_params_put_item,
lambda_context,
):
@@ -530,9 +531,6 @@ def test_idempotent_persistence_exception_updating(
idempotency_config: IdempotencyConfig,
persistence_store: DynamoDBPersistenceLayer,
lambda_apigw_event,
timestamp_future,
lambda_response,
hashed_idempotency_key,
expected_params_put_item,
lambda_context,
):
@@ -565,10 +563,6 @@ def test_idempotent_persistence_exception_getting(
idempotency_config: IdempotencyConfig,
persistence_store: DynamoDBPersistenceLayer,
lambda_apigw_event,
timestamp_future,
lambda_response,
hashed_idempotency_key,
expected_params_put_item,
lambda_context,
):
"""
@@ -594,17 +588,20 @@ def lambda_handler(event, context):


@pytest.mark.parametrize(
"config_with_validation", [{"use_local_cache": False}, {"use_local_cache": True}], indirect=True
"idempotency_config",
[
{"use_local_cache": False, "payload_validation_jmespath": "requestContext"},
{"use_local_cache": True, "payload_validation_jmespath": "requestContext"},
],
indirect=True,
)
def test_idempotent_lambda_first_execution_with_validation(
config_with_validation: IdempotencyConfig,
idempotency_config: IdempotencyConfig,
persistence_store: DynamoDBPersistenceLayer,
lambda_apigw_event,
expected_params_update_item_with_validation,
expected_params_put_item_with_validation,
lambda_response,
hashed_idempotency_key,
hashed_validation_key,
lambda_context,
):
"""
@@ -617,7 +614,7 @@ def test_idempotent_lambda_first_execution_with_validation(
stubber.add_response("update_item", ddb_response, expected_params_update_item_with_validation)
stubber.activate()

@idempotent(config=config_with_validation, persistence_store=persistence_store)
@idempotent(config=idempotency_config, persistence_store=persistence_store)
def lambda_handler(event, context):
return lambda_response

@@ -679,6 +676,118 @@ def lambda_handler(event, context):
stubber.deactivate()


@pytest.mark.parametrize("idempotency_config", [{"use_local_cache": False}, {"use_local_cache": True}], indirect=True)
def test_idempotent_lambda_expires_in_progress_before_expire(
idempotency_config: IdempotencyConfig,
persistence_store: DynamoDBPersistenceLayer,
lambda_apigw_event,
timestamp_future,
lambda_response,
hashed_idempotency_key,
lambda_context,
):
stubber = stub.Stubber(persistence_store.table.meta.client)

stubber.add_client_error("put_item", "ConditionalCheckFailedException")

now = datetime.datetime.now()
period = datetime.timedelta(seconds=5)
timestamp_expires_in_progress = int((now + period).timestamp() * 1000)

expected_params_get_item = {
"TableName": TABLE_NAME,
"Key": {"id": hashed_idempotency_key},
"ConsistentRead": True,
}
ddb_response_get_item = {
"Item": {
"id": {"S": hashed_idempotency_key},
"expiration": {"N": timestamp_future},
"in_progress_expiration": {"N": str(timestamp_expires_in_progress)},
"data": {"S": '{"message": "test", "statusCode": 200'},
"status": {"S": "INPROGRESS"},
}
}
stubber.add_response("get_item", ddb_response_get_item, expected_params_get_item)

stubber.activate()

@idempotent(config=idempotency_config, persistence_store=persistence_store)
def lambda_handler(event, context):
return lambda_response

with pytest.raises(IdempotencyAlreadyInProgressError):
lambda_handler(lambda_apigw_event, lambda_context)

stubber.assert_no_pending_responses()
stubber.deactivate()


@pytest.mark.parametrize("idempotency_config", [{"use_local_cache": False}, {"use_local_cache": True}], indirect=True)
def test_idempotent_lambda_expires_in_progress_after_expire(
idempotency_config: IdempotencyConfig,
persistence_store: DynamoDBPersistenceLayer,
lambda_apigw_event,
timestamp_future,
lambda_response,
hashed_idempotency_key,
lambda_context,
):
stubber = stub.Stubber(persistence_store.table.meta.client)

for _ in range(MAX_RETRIES + 1):
stubber.add_client_error("put_item", "ConditionalCheckFailedException")

one_second_ago = datetime.datetime.now() - datetime.timedelta(seconds=1)
expected_params_get_item = {
"TableName": TABLE_NAME,
"Key": {"id": hashed_idempotency_key},
"ConsistentRead": True,
}
ddb_response_get_item = {
"Item": {
"id": {"S": hashed_idempotency_key},
"expiration": {"N": timestamp_future},
"in_progress_expiration": {"N": str(int(one_second_ago.timestamp() * 1000))},
"data": {"S": '{"message": "test", "statusCode": 200'},
"status": {"S": "INPROGRESS"},
}
}
stubber.add_response("get_item", ddb_response_get_item, expected_params_get_item)

stubber.activate()

@idempotent(config=idempotency_config, persistence_store=persistence_store)
def lambda_handler(event, context):
return lambda_response

with pytest.raises(IdempotencyInconsistentStateError):
lambda_handler(lambda_apigw_event, lambda_context)

stubber.assert_no_pending_responses()
stubber.deactivate()


def test_idempotent_lambda_expires_in_progress_unavailable_remaining_time():
mock_event = {"data": "value"}
idempotency_key = "test-func.function#" + hash_idempotency_key(mock_event)
persistence_layer = MockPersistenceLayer(expected_idempotency_key=idempotency_key)
expected_result = {"message": "Foo"}

@idempotent_function(persistence_store=persistence_layer, data_keyword_argument="record")
def function(record):
return expected_result

with warnings.catch_warnings(record=True) as w:
warnings.simplefilter("default")
function(record=mock_event)
assert len(w) == 1
assert (
str(w[-1].message)
== "Couldn't determine the remaining time left. Did you call register_lambda_context on IdempotencyConfig?"
)


def test_data_record_invalid_status_value():
data_record = DataRecord("key", status="UNSUPPORTED_STATUS")
with pytest.raises(IdempotencyInvalidStatusError) as e:
@@ -710,6 +819,62 @@ def test_data_record_json_to_dict_mapping_when_response_data_none():
assert response_data is None


@pytest.mark.parametrize("idempotency_config", [{"use_local_cache": True}], indirect=True)
def test_handler_for_status_expired_data_record(
idempotency_config: IdempotencyConfig, persistence_store: DynamoDBPersistenceLayer
):
idempotency_handler = IdempotencyHandler(
function=lambda a: a,
function_payload={},
config=idempotency_config,
persistence_store=persistence_store,
)
data_record = DataRecord("key", status="EXPIRED", response_data=None)

with pytest.raises(IdempotencyInconsistentStateError):
idempotency_handler._handle_for_status(data_record)


@pytest.mark.parametrize("idempotency_config", [{"use_local_cache": True}], indirect=True)
def test_handler_for_status_inprogress_data_record_inconsistent(
idempotency_config: IdempotencyConfig, persistence_store: DynamoDBPersistenceLayer
):
idempotency_handler = IdempotencyHandler(
function=lambda a: a,
function_payload={},
config=idempotency_config,
persistence_store=persistence_store,
)

now = datetime.datetime.now()
period = datetime.timedelta(milliseconds=100)
timestamp = int((now - period).timestamp() * 1000)
data_record = DataRecord("key", in_progress_expiry_timestamp=timestamp, status="INPROGRESS", response_data=None)

with pytest.raises(IdempotencyInconsistentStateError):
idempotency_handler._handle_for_status(data_record)


@pytest.mark.parametrize("idempotency_config", [{"use_local_cache": True}], indirect=True)
def test_handler_for_status_inprogress_data_record_consistent(
idempotency_config: IdempotencyConfig, persistence_store: DynamoDBPersistenceLayer
):
idempotency_handler = IdempotencyHandler(
function=lambda a: a,
function_payload={},
config=idempotency_config,
persistence_store=persistence_store,
)

now = datetime.datetime.now()
period = datetime.timedelta(milliseconds=100)
timestamp = int((now + period).timestamp() * 1000)
data_record = DataRecord("key", in_progress_expiry_timestamp=timestamp, status="INPROGRESS", response_data=None)

with pytest.raises(IdempotencyAlreadyInProgressError):
idempotency_handler._handle_for_status(data_record)


@pytest.mark.parametrize("idempotency_config", [{"use_local_cache": True}], indirect=True)
def test_in_progress_never_saved_to_cache(
idempotency_config: IdempotencyConfig, persistence_store: DynamoDBPersistenceLayer
@@ -800,7 +965,7 @@ def test_is_missing_idempotency_key():
"idempotency_config", [{"use_local_cache": False, "event_key_jmespath": "body"}], indirect=True
)
def test_default_no_raise_on_missing_idempotency_key(
idempotency_config: IdempotencyConfig, persistence_store: DynamoDBPersistenceLayer, lambda_context
idempotency_config: IdempotencyConfig, persistence_store: DynamoDBPersistenceLayer
):
# GIVEN a persistence_store with use_local_cache = False and event_key_jmespath = "body"
function_name = "foo"
@@ -817,10 +982,14 @@ def test_default_no_raise_on_missing_idempotency_key(


@pytest.mark.parametrize(
"idempotency_config", [{"use_local_cache": False, "event_key_jmespath": "[body, x]"}], indirect=True
"idempotency_config",
[
{"use_local_cache": False, "event_key_jmespath": "[body, x]"},
],
indirect=True,
)
def test_raise_on_no_idempotency_key(
idempotency_config: IdempotencyConfig, persistence_store: DynamoDBPersistenceLayer, lambda_context
idempotency_config: IdempotencyConfig, persistence_store: DynamoDBPersistenceLayer
):
# GIVEN a persistence_store with raise_on_no_idempotency_key and no idempotency key in the request
persistence_store.configure(idempotency_config)
@@ -842,12 +1011,12 @@ def test_raise_on_no_idempotency_key(
{
"use_local_cache": False,
"event_key_jmespath": "[requestContext.authorizer.claims.sub, powertools_json(body).id]",
}
},
],
indirect=True,
)
def test_jmespath_with_powertools_json(
idempotency_config: IdempotencyConfig, persistence_store: DynamoDBPersistenceLayer, lambda_context
idempotency_config: IdempotencyConfig, persistence_store: DynamoDBPersistenceLayer
):
# GIVEN an event_key_jmespath with powertools_json custom function
persistence_store.configure(idempotency_config, "handler")
@@ -868,7 +1037,7 @@ def test_jmespath_with_powertools_json(

@pytest.mark.parametrize("config_with_jmespath_options", ["powertools_json(data).payload"], indirect=True)
def test_custom_jmespath_function_overrides_builtin_functions(
config_with_jmespath_options: IdempotencyConfig, persistence_store: DynamoDBPersistenceLayer, lambda_context
config_with_jmespath_options: IdempotencyConfig, persistence_store: DynamoDBPersistenceLayer
):
# GIVEN a persistence store with a custom jmespath_options
# AND use a builtin powertools custom function
31 changes: 25 additions & 6 deletions tests/functional/idempotency/utils.py
Original file line number Diff line number Diff line change
@@ -12,14 +12,29 @@ def hash_idempotency_key(data: Any):


def build_idempotency_put_item_stub(
data: Dict, function_name: str = "test-func", handler_name: str = "lambda_handler"
data: Dict,
function_name: str = "test-func",
handler_name: str = "lambda_handler",
) -> Dict:
idempotency_key_hash = f"{function_name}.{handler_name}#{hash_idempotency_key(data)}"
return {
"ConditionExpression": "attribute_not_exists(#id) OR #now < :now",
"ExpressionAttributeNames": {"#id": "id", "#now": "expiration"},
"ExpressionAttributeValues": {":now": stub.ANY},
"Item": {"expiration": stub.ANY, "id": idempotency_key_hash, "status": "INPROGRESS"},
"ConditionExpression": (
"attribute_not_exists(#id) OR #expiry < :now OR "
"(#status = :inprogress AND attribute_exists(#in_progress_expiry) AND #in_progress_expiry < :now_in_millis)"
),
"ExpressionAttributeNames": {
"#id": "id",
"#expiry": "expiration",
"#status": "status",
"#in_progress_expiry": "in_progress_expiration",
},
"ExpressionAttributeValues": {":now": stub.ANY, ":now_in_millis": stub.ANY, ":inprogress": "INPROGRESS"},
"Item": {
"expiration": stub.ANY,
"id": idempotency_key_hash,
"status": "INPROGRESS",
"in_progress_expiration": stub.ANY,
},
"TableName": "TEST_TABLE",
}

@@ -33,7 +48,11 @@ def build_idempotency_update_item_stub(
idempotency_key_hash = f"{function_name}.{handler_name}#{hash_idempotency_key(data)}"
serialized_lambda_response = json_serialize(handler_response)
return {
"ExpressionAttributeNames": {"#expiry": "expiration", "#response_data": "data", "#status": "status"},
"ExpressionAttributeNames": {
"#expiry": "expiration",
"#response_data": "data",
"#status": "status",
},
"ExpressionAttributeValues": {
":expiry": stub.ANY,
":response_data": serialized_lambda_response,