Skip to content

Commit 449b183

Browse files
committed
* 'develop' of https://github.com/awslabs/aws-lambda-powertools-python: feat(idempotency): handle lambda timeout scenarios for INPROGRESS records (#1387) chore(deps): bump jsii from 1.57.0 to 1.63.1 (#1390) chore(deps): bump constructs from 10.1.1 to 10.1.59 (#1396) chore(deps-dev): bump flake8-isort from 4.1.1 to 4.1.2.post0 (#1384) docs(examples): enforce and fix all mypy errors (#1393) chore(ci): drop 3.6 from workflows (#1395)
2 parents 77fc1bc + 160feae commit 449b183

39 files changed

+814
-284
lines changed

Makefile

+1-1
Original file line numberDiff line numberDiff line change
@@ -99,4 +99,4 @@ changelog:
9999
docker run -v "${PWD}":/workdir quay.io/git-chglog/git-chglog > CHANGELOG.md
100100

101101
mypy:
102-
poetry run mypy --pretty aws_lambda_powertools
102+
poetry run mypy --pretty aws_lambda_powertools examples

aws_lambda_powertools/event_handler/__init__.py

+8-1
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,14 @@
22
Event handler decorators for common Lambda events
33
"""
44

5-
from .api_gateway import ALBResolver, APIGatewayHttpResolver, ApiGatewayResolver, APIGatewayRestResolver, CORSConfig, Response
5+
from .api_gateway import (
6+
ALBResolver,
7+
APIGatewayHttpResolver,
8+
ApiGatewayResolver,
9+
APIGatewayRestResolver,
10+
CORSConfig,
11+
Response,
12+
)
613
from .appsync import AppSyncResolver
714

815
__all__ = [

aws_lambda_powertools/event_handler/appsync.py

+1
Original file line numberDiff line numberDiff line change
@@ -142,6 +142,7 @@ def lambda_handler(event, context):
142142
ValueError
143143
If we could not find a field resolver
144144
"""
145+
# Maintenance: revisit generics/overload to fix [attr-defined] in mypy usage
145146
BaseRouter.current_event = data_model(event)
146147
BaseRouter.lambda_context = context
147148
resolver = self._get_resolver(BaseRouter.current_event.type_name, BaseRouter.current_event.field_name)

aws_lambda_powertools/tracing/base.py

+59-59
Original file line numberDiff line numberDiff line change
@@ -2,39 +2,34 @@
22
import numbers
33
import traceback
44
from contextlib import contextmanager
5-
from typing import Any, AsyncContextManager, ContextManager, List, NoReturn, Optional, Set, Union
5+
from typing import Any, Generator, List, NoReturn, Optional, Sequence, Union
66

77

8-
class BaseProvider(abc.ABC):
9-
@abc.abstractmethod # type: ignore
10-
@contextmanager
11-
def in_subsegment(self, name=None, **kwargs) -> ContextManager:
12-
"""Return a subsegment context manger.
8+
class BaseSegment(abc.ABC):
9+
"""Holds common properties and methods on segment and subsegment."""
10+
11+
@abc.abstractmethod
12+
def close(self, end_time: Optional[int] = None):
13+
"""Close the trace entity by setting `end_time`
14+
and flip the in progress flag to False.
1315
1416
Parameters
1517
----------
16-
name: str
17-
Subsegment name
18-
kwargs: Optional[dict]
19-
Optional parameters to be propagated to segment
18+
end_time: int
19+
Time in epoch seconds, by default current time will be used.
2020
"""
2121

22-
@abc.abstractmethod # type: ignore
23-
@contextmanager
24-
def in_subsegment_async(self, name=None, **kwargs) -> AsyncContextManager:
25-
"""Return a subsegment async context manger.
22+
@abc.abstractmethod
23+
def add_subsegment(self, subsegment: Any):
24+
"""Add input subsegment as a child subsegment."""
2625

27-
Parameters
28-
----------
29-
name: str
30-
Subsegment name
31-
kwargs: Optional[dict]
32-
Optional parameters to be propagated to segment
33-
"""
26+
@abc.abstractmethod
27+
def remove_subsegment(self, subsegment: Any):
28+
"""Remove input subsegment from child subsegments."""
3429

3530
@abc.abstractmethod
3631
def put_annotation(self, key: str, value: Union[str, numbers.Number, bool]) -> NoReturn:
37-
"""Annotate current active trace entity with a key-value pair.
32+
"""Annotate segment or subsegment with a key-value pair.
3833
3934
Note: Annotations will be indexed for later search query.
4035
@@ -48,9 +43,8 @@ def put_annotation(self, key: str, value: Union[str, numbers.Number, bool]) -> N
4843

4944
@abc.abstractmethod
5045
def put_metadata(self, key: str, value: Any, namespace: str = "default") -> NoReturn:
51-
"""Add metadata to the current active trace entity.
52-
53-
Note: Metadata is not indexed but can be later retrieved by BatchGetTraces API.
46+
"""Add metadata to segment or subsegment. Metadata is not indexed
47+
but can be later retrieved by BatchGetTraces API.
5448
5549
Parameters
5650
----------
@@ -63,45 +57,52 @@ def put_metadata(self, key: str, value: Any, namespace: str = "default") -> NoRe
6357
"""
6458

6559
@abc.abstractmethod
66-
def patch(self, modules: Set[str]) -> NoReturn:
67-
"""Instrument a set of supported libraries
60+
def add_exception(self, exception: BaseException, stack: List[traceback.StackSummary], remote: bool = False):
61+
"""Add an exception to trace entities.
6862
6963
Parameters
7064
----------
71-
modules: Set[str]
72-
Set of modules to be patched
73-
"""
74-
75-
@abc.abstractmethod
76-
def patch_all(self) -> NoReturn:
77-
"""Instrument all supported libraries"""
65+
exception: Exception
66+
Caught exception
67+
stack: List[traceback.StackSummary]
68+
List of traceback summaries
7869
70+
Output from `traceback.extract_stack()`.
71+
remote: bool
72+
Whether it's a client error (False) or downstream service error (True), by default False
73+
"""
7974

80-
class BaseSegment(abc.ABC):
81-
"""Holds common properties and methods on segment and subsegment."""
8275

76+
class BaseProvider(abc.ABC):
8377
@abc.abstractmethod
84-
def close(self, end_time: Optional[int] = None):
85-
"""Close the trace entity by setting `end_time`
86-
and flip the in progress flag to False.
78+
@contextmanager
79+
def in_subsegment(self, name=None, **kwargs) -> Generator[BaseSegment, None, None]:
80+
"""Return a subsegment context manger.
8781
8882
Parameters
8983
----------
90-
end_time: int
91-
Time in epoch seconds, by default current time will be used.
84+
name: str
85+
Subsegment name
86+
kwargs: Optional[dict]
87+
Optional parameters to be propagated to segment
9288
"""
9389

9490
@abc.abstractmethod
95-
def add_subsegment(self, subsegment: Any):
96-
"""Add input subsegment as a child subsegment."""
91+
@contextmanager
92+
def in_subsegment_async(self, name=None, **kwargs) -> Generator[BaseSegment, None, None]:
93+
"""Return a subsegment async context manger.
9794
98-
@abc.abstractmethod
99-
def remove_subsegment(self, subsegment: Any):
100-
"""Remove input subsegment from child subsegments."""
95+
Parameters
96+
----------
97+
name: str
98+
Subsegment name
99+
kwargs: Optional[dict]
100+
Optional parameters to be propagated to segment
101+
"""
101102

102103
@abc.abstractmethod
103104
def put_annotation(self, key: str, value: Union[str, numbers.Number, bool]) -> NoReturn:
104-
"""Annotate segment or subsegment with a key-value pair.
105+
"""Annotate current active trace entity with a key-value pair.
105106
106107
Note: Annotations will be indexed for later search query.
107108
@@ -115,8 +116,9 @@ def put_annotation(self, key: str, value: Union[str, numbers.Number, bool]) -> N
115116

116117
@abc.abstractmethod
117118
def put_metadata(self, key: str, value: Any, namespace: str = "default") -> NoReturn:
118-
"""Add metadata to segment or subsegment. Metadata is not indexed
119-
but can be later retrieved by BatchGetTraces API.
119+
"""Add metadata to the current active trace entity.
120+
121+
Note: Metadata is not indexed but can be later retrieved by BatchGetTraces API.
120122
121123
Parameters
122124
----------
@@ -129,17 +131,15 @@ def put_metadata(self, key: str, value: Any, namespace: str = "default") -> NoRe
129131
"""
130132

131133
@abc.abstractmethod
132-
def add_exception(self, exception: BaseException, stack: List[traceback.StackSummary], remote: bool = False):
133-
"""Add an exception to trace entities.
134+
def patch(self, modules: Sequence[str]) -> NoReturn:
135+
"""Instrument a set of supported libraries
134136
135137
Parameters
136138
----------
137-
exception: Exception
138-
Caught exception
139-
stack: List[traceback.StackSummary]
140-
List of traceback summaries
141-
142-
Output from `traceback.extract_stack()`.
143-
remote: bool
144-
Whether it's a client error (False) or downstream service error (True), by default False
139+
modules: Set[str]
140+
Set of modules to be patched
145141
"""
142+
143+
@abc.abstractmethod
144+
def patch_all(self) -> NoReturn:
145+
"""Instrument all supported libraries"""

aws_lambda_powertools/tracing/tracer.py

+1-1
Original file line numberDiff line numberDiff line change
@@ -155,7 +155,7 @@ def __init__(
155155
self.__build_config(
156156
service=service, disabled=disabled, auto_patch=auto_patch, patch_modules=patch_modules, provider=provider
157157
)
158-
self.provider = self._config["provider"]
158+
self.provider: BaseProvider = self._config["provider"]
159159
self.disabled = self._config["disabled"]
160160
self.service = self._config["service"]
161161
self.auto_patch = self._config["auto_patch"]

aws_lambda_powertools/utilities/idempotency/base.py

+31-1
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,4 @@
1+
import datetime
12
import logging
23
from copy import deepcopy
34
from typing import Any, Callable, Dict, Optional, Tuple
@@ -73,6 +74,7 @@ def __init__(
7374
self.data = deepcopy(_prepare_data(function_payload))
7475
self.fn_args = function_args
7576
self.fn_kwargs = function_kwargs
77+
self.config = config
7678

7779
persistence_store.configure(config, self.function.__name__)
7880
self.persistence_store = persistence_store
@@ -101,7 +103,9 @@ def _process_idempotency(self):
101103
try:
102104
# We call save_inprogress first as an optimization for the most common case where no idempotent record
103105
# already exists. If it succeeds, there's no need to call get_record.
104-
self.persistence_store.save_inprogress(data=self.data)
106+
self.persistence_store.save_inprogress(
107+
data=self.data, remaining_time_in_millis=self._get_remaining_time_in_millis()
108+
)
105109
except IdempotencyKeyError:
106110
raise
107111
except IdempotencyItemAlreadyExistsError:
@@ -113,6 +117,25 @@ def _process_idempotency(self):
113117

114118
return self._get_function_response()
115119

120+
def _get_remaining_time_in_millis(self) -> Optional[int]:
121+
"""
122+
Tries to determine the remaining time available for the current lambda invocation.
123+
124+
This only works if the idempotent handler decorator is used, since we need to access the lambda context.
125+
However, this could be improved if we start storing the lambda context globally during the invocation. One
126+
way to do this is to register the lambda context when configuring the IdempotencyConfig object.
127+
128+
Returns
129+
-------
130+
Optional[int]
131+
Remaining time in millis, or None if the remaining time cannot be determined.
132+
"""
133+
134+
if self.config.lambda_context is not None:
135+
return self.config.lambda_context.get_remaining_time_in_millis()
136+
137+
return None
138+
116139
def _get_idempotency_record(self) -> DataRecord:
117140
"""
118141
Retrieve the idempotency record from the persistence layer.
@@ -167,6 +190,13 @@ def _handle_for_status(self, data_record: DataRecord) -> Optional[Dict[Any, Any]
167190
raise IdempotencyInconsistentStateError("save_inprogress and get_record return inconsistent results.")
168191

169192
if data_record.status == STATUS_CONSTANTS["INPROGRESS"]:
193+
if data_record.in_progress_expiry_timestamp is not None and data_record.in_progress_expiry_timestamp < int(
194+
datetime.datetime.now().timestamp() * 1000
195+
):
196+
raise IdempotencyInconsistentStateError(
197+
"item should have been expired in-progress because it already time-outed."
198+
)
199+
170200
raise IdempotencyAlreadyInProgressError(
171201
f"Execution already in progress with idempotency key: "
172202
f"{self.persistence_store.event_key_jmespath}={data_record.idempotency_key}"

aws_lambda_powertools/utilities/idempotency/config.py

+10
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,7 @@
11
from typing import Dict, Optional
22

3+
from aws_lambda_powertools.utilities.typing import LambdaContext
4+
35

46
class IdempotencyConfig:
57
def __init__(
@@ -12,6 +14,7 @@ def __init__(
1214
use_local_cache: bool = False,
1315
local_cache_max_items: int = 256,
1416
hash_function: str = "md5",
17+
lambda_context: Optional[LambdaContext] = None,
1518
):
1619
"""
1720
Initialize the base persistence layer
@@ -32,6 +35,8 @@ def __init__(
3235
Max number of items to store in local cache, by default 1024
3336
hash_function: str, optional
3437
Function to use for calculating hashes, by default md5.
38+
lambda_context: LambdaContext, optional
39+
Lambda Context containing information about the invocation, function and execution environment.
3540
"""
3641
self.event_key_jmespath = event_key_jmespath
3742
self.payload_validation_jmespath = payload_validation_jmespath
@@ -41,3 +46,8 @@ def __init__(
4146
self.use_local_cache = use_local_cache
4247
self.local_cache_max_items = local_cache_max_items
4348
self.hash_function = hash_function
49+
self.lambda_context: Optional[LambdaContext] = lambda_context
50+
51+
def register_lambda_context(self, lambda_context: LambdaContext):
52+
"""Captures the Lambda context, to calculate the remaining time before the invocation times out"""
53+
self.lambda_context = lambda_context

aws_lambda_powertools/utilities/idempotency/idempotency.py

+2
Original file line numberDiff line numberDiff line change
@@ -62,6 +62,8 @@ def idempotent(
6262
return handler(event, context)
6363

6464
config = config or IdempotencyConfig()
65+
config.register_lambda_context(context)
66+
6567
args = event, context
6668
idempotency_handler = IdempotencyHandler(
6769
function=handler,

aws_lambda_powertools/utilities/idempotency/persistence/base.py

+19-1
Original file line numberDiff line numberDiff line change
@@ -40,6 +40,7 @@ def __init__(
4040
idempotency_key,
4141
status: str = "",
4242
expiry_timestamp: Optional[int] = None,
43+
in_progress_expiry_timestamp: Optional[int] = None,
4344
response_data: Optional[str] = "",
4445
payload_hash: Optional[str] = None,
4546
) -> None:
@@ -53,6 +54,8 @@ def __init__(
5354
status of the idempotent record
5455
expiry_timestamp: int, optional
5556
time before the record should expire, in seconds
57+
in_progress_expiry_timestamp: int, optional
58+
time before the record should expire while in the INPROGRESS state, in seconds
5659
payload_hash: str, optional
5760
hashed representation of payload
5861
response_data: str, optional
@@ -61,6 +64,7 @@ def __init__(
6164
self.idempotency_key = idempotency_key
6265
self.payload_hash = payload_hash
6366
self.expiry_timestamp = expiry_timestamp
67+
self.in_progress_expiry_timestamp = in_progress_expiry_timestamp
6468
self._status = status
6569
self.response_data = response_data
6670

@@ -328,14 +332,16 @@ def save_success(self, data: Dict[str, Any], result: dict) -> None:
328332

329333
self._save_to_cache(data_record=data_record)
330334

331-
def save_inprogress(self, data: Dict[str, Any]) -> None:
335+
def save_inprogress(self, data: Dict[str, Any], remaining_time_in_millis: Optional[int] = None) -> None:
332336
"""
333337
Save record of function's execution being in progress
334338
335339
Parameters
336340
----------
337341
data: Dict[str, Any]
338342
Payload
343+
remaining_time_in_millis: Optional[int]
344+
If expiry of in-progress invocations is enabled, this will contain the remaining time available in millis
339345
"""
340346
data_record = DataRecord(
341347
idempotency_key=self._get_hashed_idempotency_key(data=data),
@@ -344,6 +350,18 @@ def save_inprogress(self, data: Dict[str, Any]) -> None:
344350
payload_hash=self._get_hashed_payload(data=data),
345351
)
346352

353+
if remaining_time_in_millis:
354+
now = datetime.datetime.now()
355+
period = datetime.timedelta(milliseconds=remaining_time_in_millis)
356+
timestamp = (now + period).timestamp()
357+
358+
data_record.in_progress_expiry_timestamp = int(timestamp * 1000)
359+
else:
360+
warnings.warn(
361+
"Couldn't determine the remaining time left. "
362+
"Did you call register_lambda_context on IdempotencyConfig?"
363+
)
364+
347365
logger.debug(f"Saving in progress record for idempotency key: {data_record.idempotency_key}")
348366

349367
if self._retrieve_from_cache(idempotency_key=data_record.idempotency_key):

0 commit comments

Comments
 (0)