diff --git a/aws_lambda_powertools/event_handler/appsync.py b/aws_lambda_powertools/event_handler/appsync.py index fba5681ef6a..fb53b71c77d 100644 --- a/aws_lambda_powertools/event_handler/appsync.py +++ b/aws_lambda_powertools/event_handler/appsync.py @@ -1,96 +1,79 @@ +import asyncio import logging -from typing import Any, Callable, Optional, Type, TypeVar +import warnings +from typing import Any, Callable, Dict, List, Optional, Type, Union +from aws_lambda_powertools.event_handler.graphql_appsync.exceptions import InvalidBatchResponse, ResolverNotFoundError +from aws_lambda_powertools.event_handler.graphql_appsync.router import Router from aws_lambda_powertools.utilities.data_classes import AppSyncResolverEvent from aws_lambda_powertools.utilities.typing import LambdaContext +from aws_lambda_powertools.warnings import PowertoolsUserWarning logger = logging.getLogger(__name__) -AppSyncResolverEventT = TypeVar("AppSyncResolverEventT", bound=AppSyncResolverEvent) - -class BaseRouter: - current_event: AppSyncResolverEventT # type: ignore[valid-type] - lambda_context: LambdaContext - context: dict - - def __init__(self): - self._resolvers: dict = {} - - def resolver(self, type_name: str = "*", field_name: Optional[str] = None): - """Registers the resolver for field_name - - Parameters - ---------- - type_name : str - Type name - field_name : str - Field name - """ - - def register_resolver(func): - logger.debug(f"Adding resolver `{func.__name__}` for field `{type_name}.{field_name}`") - self._resolvers[f"{type_name}.{field_name}"] = {"func": func} - return func - - return register_resolver - - def append_context(self, **additional_context): - """Append key=value data as routing context""" - self.context.update(**additional_context) - - def clear_context(self): - """Resets routing context""" - self.context.clear() - - -class AppSyncResolver(BaseRouter): +class AppSyncResolver(Router): """ - AppSync resolver decorator + AppSync GraphQL API Resolver Example ------- - - **Sample usage** - - from aws_lambda_powertools.event_handler import AppSyncResolver - - app = AppSyncResolver() - - @app.resolver(type_name="Query", field_name="listLocations") - def list_locations(page: int = 0, size: int = 10) -> list: - # Your logic to fetch locations with arguments passed in - return [{"id": 100, "name": "Smooth Grooves"}] - - @app.resolver(type_name="Merchant", field_name="extraInfo") - def get_extra_info() -> dict: - # Can use "app.current_event.source" to filter within the parent context - account_type = app.current_event.source["accountType"] - method = "BTC" if account_type == "NEW" else "USD" - return {"preferredPaymentMethod": method} - - @app.resolver(field_name="commonField") - def common_field() -> str: - # Would match all fieldNames matching 'commonField' - return str(uuid.uuid4()) + ```python + from aws_lambda_powertools.event_handler import AppSyncResolver + + app = AppSyncResolver() + + @app.resolver(type_name="Query", field_name="listLocations") + def list_locations(page: int = 0, size: int = 10) -> list: + # Your logic to fetch locations with arguments passed in + return [{"id": 100, "name": "Smooth Grooves"}] + + @app.resolver(type_name="Merchant", field_name="extraInfo") + def get_extra_info() -> dict: + # Can use "app.current_event.source" to filter within the parent context + account_type = app.current_event.source["accountType"] + method = "BTC" if account_type == "NEW" else "USD" + return {"preferredPaymentMethod": method} + + @app.resolver(field_name="commonField") + def common_field() -> str: + # Would match all fieldNames matching 'commonField' + return str(uuid.uuid4()) + ``` """ def __init__(self): + """ + Initialize a new instance of the AppSyncResolver. + """ super().__init__() self.context = {} # early init as customers might add context before event resolution - def resolve( + self.current_batch_event: List[AppSyncResolverEvent] = [] + self.current_event: Optional[AppSyncResolverEvent] = None + self.lambda_context: Optional[LambdaContext] = None + + def __call__( self, event: dict, context: LambdaContext, data_model: Type[AppSyncResolverEvent] = AppSyncResolverEvent, ) -> Any: - """Resolve field_name + """Implicit lambda handler which internally calls `resolve`""" + return self.resolve(event, context, data_model) + + def resolve( + self, + event: Union[dict, List[Dict]], + context: LambdaContext, + data_model: Type[AppSyncResolverEvent] = AppSyncResolverEvent, + ) -> Any: + """Resolves the response based on the provide event and decorator routes Parameters ---------- - event : dict - Lambda event + event : dict | List[Dict] + Lambda event either coming from batch processing endpoint or from standard processing endpoint context : LambdaContext Lambda context data_model: @@ -154,45 +137,211 @@ def lambda_handler(event, context): ValueError If we could not find a field resolver """ - # Maintenance: revisit generics/overload to fix [attr-defined] in mypy usage - BaseRouter.current_event = data_model(event) - BaseRouter.lambda_context = context - resolver = self._get_resolver(BaseRouter.current_event.type_name, BaseRouter.current_event.field_name) - response = resolver(**BaseRouter.current_event.arguments) + self.lambda_context = context + + if isinstance(event, list): + response = self._call_batch_resolver(event=event, data_model=data_model) + else: + response = self._call_single_resolver(event=event, data_model=data_model) + self.clear_context() return response - def _get_resolver(self, type_name: str, field_name: str) -> Callable: - """Get resolver for field_name + def _call_single_resolver(self, event: dict, data_model: Type[AppSyncResolverEvent]) -> Any: + """Call single event resolver Parameters ---------- - type_name : str - Type name - field_name : str - Field name + event : dict + Event + data_model : Type[AppSyncResolverEvent] + Data_model to decode AppSync event, by default it is of AppSyncResolverEvent type or subclass of it + """ + + logger.debug("Processing direct resolver event") + + self.current_event = data_model(event) + resolver = self._resolver_registry.find_resolver(self.current_event.type_name, self.current_event.field_name) + if not resolver: + raise ValueError(f"No resolver found for '{self.current_event.type_name}.{self.current_event.field_name}'") + return resolver["func"](**self.current_event.arguments) + + def _call_sync_batch_resolver( + self, + resolver: Callable, + raise_on_error: bool = False, + aggregate: bool = True, + ) -> List[Any]: + """ + Calls a synchronous batch resolver function for each event in the current batch. + + Parameters + ---------- + resolver: Callable + The callable function to resolve events. + raise_on_error: bool + A flag indicating whether to raise an error when processing batches + with failed items. Defaults to False, which means errors are handled without raising exceptions. + aggregate: bool + A flag indicating whether the batch items should be processed at once or individually. + If True (default), the batch resolver will process all items in the batch as a single event. + If False, the batch resolver will process each item in the batch individually. Returns ------- - Callable - callable function and configuration + List[Any] + A list of results corresponding to the resolved events. """ - full_name = f"{type_name}.{field_name}" - resolver = self._resolvers.get(full_name, self._resolvers.get(f"*.{field_name}")) - if not resolver: - raise ValueError(f"No resolver found for '{full_name}'") - return resolver["func"] - def __call__( + logger.debug(f"Graceful error handling flag {raise_on_error=}") + + # Checks whether the entire batch should be processed at once + if aggregate: + # Process the entire batch + response = resolver(event=self.current_batch_event) + + if not isinstance(response, List): + raise InvalidBatchResponse("The response must be a List when using batch resolvers") + + return response + + # Non aggregated events, so we call this event list x times + # Stop on first exception we encounter + if raise_on_error: + return [ + resolver(event=appconfig_event, **appconfig_event.arguments) + for appconfig_event in self.current_batch_event + ] + + # By default, we gracefully append `None` for any records that failed processing + results = [] + for idx, event in enumerate(self.current_batch_event): + try: + results.append(resolver(event=event, **event.arguments)) + except Exception: + logger.debug(f"Failed to process event number {idx} from field '{event.info.field_name}'") + results.append(None) + + return results + + async def _call_async_batch_resolver( self, - event: dict, - context: LambdaContext, - data_model: Type[AppSyncResolverEvent] = AppSyncResolverEvent, - ) -> Any: - """Implicit lambda handler which internally calls `resolve`""" - return self.resolve(event, context, data_model) + resolver: Callable, + raise_on_error: bool = False, + aggregate: bool = True, + ) -> List[Any]: + """ + Asynchronously call a batch resolver for each event in the current batch. + + Parameters + ---------- + resolver: Callable + The asynchronous resolver function. + raise_on_error: bool + A flag indicating whether to raise an error when processing batches + with failed items. Defaults to False, which means errors are handled without raising exceptions. + aggregate: bool + A flag indicating whether the batch items should be processed at once or individually. + If True (default), the batch resolver will process all items in the batch as a single event. + If False, the batch resolver will process each item in the batch individually. + + Returns + ------- + List[Any] + A list of results corresponding to the resolved events. + """ + + logger.debug(f"Graceful error handling flag {raise_on_error=}") + + # Checks whether the entire batch should be processed at once + if aggregate: + # Process the entire batch + ret = await resolver(event=self.current_batch_event) + if not isinstance(ret, List): + raise InvalidBatchResponse("The response must be a List when using batch resolvers") + + return ret + + response: List = [] + + # Prime coroutines + tasks = [resolver(event=e, **e.arguments) for e in self.current_batch_event] + + # Aggregate results or raise at first error + if raise_on_error: + response.extend(await asyncio.gather(*tasks)) + return response + + # Aggregate results and exceptions, then filter them out + # Use `None` upon exception for graceful error handling at GraphQL engine level + # + # NOTE: asyncio.gather(return_exceptions=True) catches and includes exceptions in the results + # this will become useful when we support exception handling in AppSync resolver + results = await asyncio.gather(*tasks, return_exceptions=True) + response.extend(None if isinstance(ret, Exception) else ret for ret in results) + + return response + + def _call_batch_resolver(self, event: List[dict], data_model: Type[AppSyncResolverEvent]) -> List[Any]: + """Call batch event resolver for sync and async methods + + Parameters + ---------- + event : List[dict] + Batch event + data_model : Type[AppSyncResolverEvent] + Data_model to decode AppSync event, by default AppSyncResolverEvent or a subclass + + Returns + ------- + List[Any] + Results of the resolver execution. + + Raises + ------ + InconsistentPayloadError: + When all events in the batch do not have the same fieldName. + + ResolverNotFoundError: + When no resolver is found for the specified type and field. + """ + logger.debug("Processing batch resolver event") + + self.current_batch_event = [data_model(e) for e in event] + type_name, field_name = self.current_batch_event[0].type_name, self.current_batch_event[0].field_name + + resolver = self._batch_resolver_registry.find_resolver(type_name, field_name) + async_resolver = self._async_batch_resolver_registry.find_resolver(type_name, field_name) + + if resolver and async_resolver: + warnings.warn( + f"Both synchronous and asynchronous resolvers found for the same event and field." + f"The synchronous resolver takes precedence. Executing: {resolver['func'].__name__}", + stacklevel=2, + category=PowertoolsUserWarning, + ) + + if resolver: + logger.debug(f"Found sync resolver. {resolver=}, {field_name=}") + return self._call_sync_batch_resolver( + resolver=resolver["func"], + raise_on_error=resolver["raise_on_error"], + aggregate=resolver["aggregate"], + ) + + if async_resolver: + logger.debug(f"Found async resolver. {resolver=}, {field_name=}") + return asyncio.run( + self._call_async_batch_resolver( + resolver=async_resolver["func"], + raise_on_error=async_resolver["raise_on_error"], + aggregate=async_resolver["aggregate"], + ), + ) + + raise ResolverNotFoundError(f"No resolver found for '{type_name}.{field_name}'") def include_router(self, router: "Router") -> None: """Adds all resolvers defined in a router @@ -202,15 +351,112 @@ def include_router(self, router: "Router") -> None: router : Router A router containing a dict of field resolvers """ + # Merge app and router context + logger.debug("Merging router and app context") self.context.update(**router.context) + # use pointer to allow context clearance after event is processed e.g., resolve(evt, ctx) router.context = self.context - self._resolvers.update(router._resolvers) + logger.debug("Merging router resolver registries") + self._resolver_registry.merge(router._resolver_registry) + self._batch_resolver_registry.merge(router._batch_resolver_registry) + self._async_batch_resolver_registry.merge(router._async_batch_resolver_registry) + def resolver(self, type_name: str = "*", field_name: Optional[str] = None) -> Callable: + """Registers direct resolver function for GraphQL type and field name. -class Router(BaseRouter): - def __init__(self): - super().__init__() - self.context = {} # early init as customers might add context before event resolution + Parameters + ---------- + type_name : str, optional + GraphQL type e.g., Query, Mutation, by default "*" meaning any + field_name : Optional[str], optional + GraphQL field e.g., getTodo, createTodo, by default None + + Returns + ------- + Callable + Registered resolver + + Example + ------- + + ```python + from aws_lambda_powertools.event_handler import AppSyncResolver + + from typing import TypedDict + + app = AppSyncResolver() + + class Todo(TypedDict, total=False): + id: str + userId: str + title: str + completed: bool + + # resolve any GraphQL `getTodo` queries + # arguments are injected as function arguments as-is + @app.resolver(type_name="Query", field_name="getTodo") + def get_todo(id: str = "", status: str = "open") -> Todo: + todos: Response = requests.get(f"https://jsonplaceholder.typicode.com/todos/{id}") + todos.raise_for_status() + + return todos.json() + + def lambda_handler(event, context): + return app.resolve(event, context) + ``` + """ + return self._resolver_registry.register(field_name=field_name, type_name=type_name) + + def batch_resolver( + self, + type_name: str = "*", + field_name: Optional[str] = None, + raise_on_error: bool = False, + aggregate: bool = True, + ) -> Callable: + """Registers batch resolver function for GraphQL type and field name. + + By default, we handle errors gracefully by returning `None`. If you want + to short-circuit and fail the entire batch use `raise_on_error=True`. + + Parameters + ---------- + type_name : str, optional + GraphQL type e.g., Query, Mutation, by default "*" meaning any + field_name : Optional[str], optional + GraphQL field e.g., getTodo, createTodo, by default None + raise_on_error : bool, optional + Whether to fail entire batch upon error, or handle errors gracefully (None), by default False + aggregate: bool + A flag indicating whether the batch items should be processed at once or individually. + If True (default), the batch resolver will process all items in the batch as a single event. + If False, the batch resolver will process each item in the batch individually. + + Returns + ------- + Callable + Registered resolver + """ + return self._batch_resolver_registry.register( + field_name=field_name, + type_name=type_name, + raise_on_error=raise_on_error, + aggregate=aggregate, + ) + + def async_batch_resolver( + self, + type_name: str = "*", + field_name: Optional[str] = None, + raise_on_error: bool = False, + aggregate: bool = True, + ) -> Callable: + return self._async_batch_resolver_registry.register( + field_name=field_name, + type_name=type_name, + raise_on_error=raise_on_error, + aggregate=aggregate, + ) diff --git a/aws_lambda_powertools/event_handler/graphql_appsync/__init__.py b/aws_lambda_powertools/event_handler/graphql_appsync/__init__.py new file mode 100644 index 00000000000..e69de29bb2d diff --git a/aws_lambda_powertools/event_handler/graphql_appsync/_registry.py b/aws_lambda_powertools/event_handler/graphql_appsync/_registry.py new file mode 100644 index 00000000000..fe86f502d65 --- /dev/null +++ b/aws_lambda_powertools/event_handler/graphql_appsync/_registry.py @@ -0,0 +1,76 @@ +import logging +from typing import Any, Callable, Dict, Optional + +logger = logging.getLogger(__name__) + + +class ResolverRegistry: + def __init__(self): + self.resolvers: Dict[str, Dict[str, Any]] = {} + + def register( + self, + type_name: str = "*", + field_name: Optional[str] = None, + raise_on_error: bool = False, + aggregate: bool = True, + ) -> Callable: + """Registers the resolver for field_name + + Parameters + ---------- + type_name : str + Type name + field_name : str + Field name + raise_on_error: bool + A flag indicating whether to raise an error when processing batches + with failed items. Defaults to False, which means errors are handled without raising exceptions. + aggregate: bool + A flag indicating whether the batch items should be processed at once or individually. + If True (default), the batch resolver will process all items in the batch as a single event. + If False, the batch resolver will process each item in the batch individually. + + Return + ---------- + Dict + A dictionary with the resolver and if raise exception on error + """ + + def _register(func) -> Callable: + logger.debug(f"Adding resolver `{func.__name__}` for field `{type_name}.{field_name}`") + self.resolvers[f"{type_name}.{field_name}"] = { + "func": func, + "raise_on_error": raise_on_error, + "aggregate": aggregate, + } + return func + + return _register + + def find_resolver(self, type_name: str, field_name: str) -> Optional[Dict]: + """Find resolver based on type_name and field_name + + Parameters + ---------- + type_name : str + Type name + field_name : str + Field name + Return + ---------- + Optional[Dict] + A dictionary with the resolver and if raise exception on error + """ + logger.debug(f"Looking for resolver for type={type_name}, field={field_name}.") + return self.resolvers.get(f"{type_name}.{field_name}", self.resolvers.get(f"*.{field_name}")) + + def merge(self, other_registry: "ResolverRegistry"): + """Update current registry with incoming registry + + Parameters + ---------- + other_registry : ResolverRegistry + Registry to merge from + """ + self.resolvers.update(**other_registry.resolvers) diff --git a/aws_lambda_powertools/event_handler/graphql_appsync/base.py b/aws_lambda_powertools/event_handler/graphql_appsync/base.py new file mode 100644 index 00000000000..ad32d0d17f8 --- /dev/null +++ b/aws_lambda_powertools/event_handler/graphql_appsync/base.py @@ -0,0 +1,158 @@ +from abc import ABC, abstractmethod +from typing import Callable, Optional + + +class BaseRouter(ABC): + """Abstract base class for Router (resolvers)""" + + @abstractmethod + def resolver(self, type_name: str = "*", field_name: Optional[str] = None) -> Callable: + """ + Retrieve a resolver function for a specific type and field. + + Parameters + ----------- + type_name: str + The name of the type. + field_name: Optional[str] + The name of the field (default is None). + + Examples + -------- + ```python + from typing import Optional + + from aws_lambda_powertools.event_handler import AppSyncResolver + from aws_lambda_powertools.utilities.data_classes import AppSyncResolverEvent + from aws_lambda_powertools.utilities.typing import LambdaContext + + app = AppSyncResolver() + + @app.resolver(type_name="Query", field_name="getPost") + def related_posts(event: AppSyncResolverEvent) -> Optional[list]: + return {"success": "ok"} + + def lambda_handler(event, context: LambdaContext) -> dict: + return app.resolve(event, context) + ``` + + Returns + ------- + Callable + The resolver function. + """ + raise NotImplementedError + + @abstractmethod + def batch_resolver( + self, + type_name: str = "*", + field_name: Optional[str] = None, + raise_on_error: bool = False, + aggregate: bool = True, + ) -> Callable: + """ + Retrieve a batch resolver function for a specific type and field. + + Parameters + ----------- + type_name: str + The name of the type. + field_name: Optional[str] + The name of the field (default is None). + raise_on_error: bool + A flag indicating whether to raise an error when processing batches + with failed items. Defaults to False, which means errors are handled without raising exceptions. + aggregate: bool + A flag indicating whether the batch items should be processed at once or individually. + If True (default), the batch resolver will process all items in the batch as a single event. + If False, the batch resolver will process each item in the batch individually. + + Examples + -------- + ```python + from typing import Optional + + from aws_lambda_powertools.event_handler import AppSyncResolver + from aws_lambda_powertools.utilities.data_classes import AppSyncResolverEvent + from aws_lambda_powertools.utilities.typing import LambdaContext + + app = AppSyncResolver() + + @app.batch_resolver(type_name="Query", field_name="getPost") + def related_posts(event: AppSyncResolverEvent, id) -> Optional[list]: + return {"post_id": id} + + def lambda_handler(event, context: LambdaContext) -> dict: + return app.resolve(event, context) + ``` + + Returns + ------- + Callable + The batch resolver function. + """ + raise NotImplementedError + + @abstractmethod + def async_batch_resolver( + self, + type_name: str = "*", + field_name: Optional[str] = None, + raise_on_error: bool = False, + aggregate: bool = True, + ) -> Callable: + """ + Retrieve a batch resolver function for a specific type and field and runs async. + + Parameters + ----------- + type_name: str + The name of the type. + field_name: Optional[str] + The name of the field (default is None). + raise_on_error: bool + A flag indicating whether to raise an error when processing batches + with failed items. Defaults to False, which means errors are handled without raising exceptions. + aggregate: bool + A flag indicating whether the batch items should be processed at once or individually. + If True (default), the batch resolver will process all items in the batch as a single event. + If False, the batch resolver will process each item in the batch individually. + + Examples + -------- + ```python + from typing import Optional + + from aws_lambda_powertools.event_handler import AppSyncResolver + from aws_lambda_powertools.utilities.data_classes import AppSyncResolverEvent + from aws_lambda_powertools.utilities.typing import LambdaContext + + app = AppSyncResolver() + + @app.async_batch_resolver(type_name="Query", field_name="getPost") + async def related_posts(event: AppSyncResolverEvent, id) -> Optional[list]: + return {"post_id": id} + + def lambda_handler(event, context: LambdaContext) -> dict: + return app.resolve(event, context) + ``` + + Returns + ------- + Callable + The batch resolver function. + """ + raise NotImplementedError + + @abstractmethod + def append_context(self, **additional_context) -> None: + """ + Appends context information available under any route. + + Parameters + ----------- + **additional_context: dict + Additional context key-value pairs to append. + """ + raise NotImplementedError diff --git a/aws_lambda_powertools/event_handler/graphql_appsync/exceptions.py b/aws_lambda_powertools/event_handler/graphql_appsync/exceptions.py new file mode 100644 index 00000000000..f98a75b6f17 --- /dev/null +++ b/aws_lambda_powertools/event_handler/graphql_appsync/exceptions.py @@ -0,0 +1,10 @@ +class ResolverNotFoundError(Exception): + """ + When a resolver is not found during a lookup. + """ + + +class InvalidBatchResponse(Exception): + """ + When a batch response something different from a List + """ diff --git a/aws_lambda_powertools/event_handler/graphql_appsync/router.py b/aws_lambda_powertools/event_handler/graphql_appsync/router.py new file mode 100644 index 00000000000..2046f09e03c --- /dev/null +++ b/aws_lambda_powertools/event_handler/graphql_appsync/router.py @@ -0,0 +1,53 @@ +from typing import Callable, Optional + +from aws_lambda_powertools.event_handler.graphql_appsync._registry import ResolverRegistry +from aws_lambda_powertools.event_handler.graphql_appsync.base import BaseRouter + + +class Router(BaseRouter): + context: dict + + def __init__(self): + self.context = {} # early init as customers might add context before event resolution + self._resolver_registry = ResolverRegistry() + self._batch_resolver_registry = ResolverRegistry() + self._async_batch_resolver_registry = ResolverRegistry() + + def resolver(self, type_name: str = "*", field_name: Optional[str] = None) -> Callable: + return self._resolver_registry.register(field_name=field_name, type_name=type_name) + + def batch_resolver( + self, + type_name: str = "*", + field_name: Optional[str] = None, + raise_on_error: bool = False, + aggregate: bool = True, + ) -> Callable: + return self._batch_resolver_registry.register( + field_name=field_name, + type_name=type_name, + raise_on_error=raise_on_error, + aggregate=aggregate, + ) + + def async_batch_resolver( + self, + type_name: str = "*", + field_name: Optional[str] = None, + raise_on_error: bool = False, + aggregate: bool = True, + ) -> Callable: + return self._async_batch_resolver_registry.register( + field_name=field_name, + type_name=type_name, + raise_on_error=raise_on_error, + aggregate=aggregate, + ) + + def append_context(self, **additional_context): + """Append key=value data as routing context""" + self.context.update(**additional_context) + + def clear_context(self): + """Resets routing context""" + self.context.clear() diff --git a/aws_lambda_powertools/utilities/data_classes/appsync_resolver_event.py b/aws_lambda_powertools/utilities/data_classes/appsync_resolver_event.py index f58308377ff..d3166db1e3b 100644 --- a/aws_lambda_powertools/utilities/data_classes/appsync_resolver_event.py +++ b/aws_lambda_powertools/utilities/data_classes/appsync_resolver_event.py @@ -184,9 +184,9 @@ def identity(self) -> Union[None, AppSyncIdentityIAM, AppSyncIdentityCognito]: return get_identity_object(self.get("identity")) @property - def source(self) -> Optional[Dict[str, Any]]: + def source(self) -> Dict[str, Any]: """A map that contains the resolution of the parent field.""" - return self.get("source") + return self.get("source") or {} @property def request_headers(self) -> Dict[str, str]: diff --git a/aws_lambda_powertools/utilities/parameters/secrets.py b/aws_lambda_powertools/utilities/parameters/secrets.py index 0494c64985a..89df05f26e1 100644 --- a/aws_lambda_powertools/utilities/parameters/secrets.py +++ b/aws_lambda_powertools/utilities/parameters/secrets.py @@ -423,7 +423,7 @@ def set_secret( >>> parameters.set_secret( name="my-secret", value='{"password": "supers3cr3tllam@passw0rd"}', - client_request_token="61f2af5f-5f75-44b1-a29f-0cc37af55b11" + client_request_token="YOUR_TOKEN_HERE" ) URLs: diff --git a/docs/core/event_handler/appsync.md b/docs/core/event_handler/appsync.md index fcadc2a1f27..440940cd107 100644 --- a/docs/core/event_handler/appsync.md +++ b/docs/core/event_handler/appsync.md @@ -3,40 +3,69 @@ title: GraphQL API description: Core utility --- -Event handler for AWS AppSync Direct Lambda Resolver and Amplify GraphQL Transformer. +Event Handler for AWS AppSync and Amplify GraphQL Transformer. + +```mermaid +stateDiagram-v2 + direction LR + EventSource: AWS Lambda Event Sources + EventHandlerResolvers: AWS AppSync Direct invocation<br/><br/> AWS AppSync Batch invocation + LambdaInit: Lambda invocation + EventHandler: Event Handler + EventHandlerResolver: Route event based on GraphQL type/field keys + YourLogic: Run your registered resolver function + EventHandlerResolverBuilder: Adapts response to Event Source contract + LambdaResponse: Lambda response + + state EventSource { + EventHandlerResolvers + } + + EventHandlerResolvers --> LambdaInit + + LambdaInit --> EventHandler + EventHandler --> EventHandlerResolver + + state EventHandler { + [*] --> EventHandlerResolver: app.resolve(event, context) + EventHandlerResolver --> YourLogic + YourLogic --> EventHandlerResolverBuilder + } + + EventHandler --> LambdaResponse +``` ## Key Features -* Automatically parse API arguments to function arguments * Choose between strictly match a GraphQL field name or all of them to a function -* Integrates with [Data classes utilities](../../utilities/data_classes.md){target="_blank"} to access resolver and identity information -* Works with both Direct Lambda Resolver and Amplify GraphQL Transformer `@function` directive -* Support async Python 3.8+ functions, and generators +* Automatically parse API arguments to function arguments +* Integrates with [Event Source Data classes utilities](../../utilities/data_classes.md){target="_blank"} to access resolver and identity information +* Support async Python 3.8+ functions and generators ## Terminology **[Direct Lambda Resolver](https://docs.aws.amazon.com/appsync/latest/devguide/direct-lambda-reference.html){target="_blank"}**. A custom AppSync Resolver to bypass the use of Apache Velocity Template (VTL) and automatically map your function's response to a GraphQL field. -**[Amplify GraphQL Transformer](https://docs.amplify.aws/cli/graphql-transformer/function){target="_blank"}**. Custom GraphQL directives to define your application's data model using Schema Definition Language (SDL). Amplify CLI uses these directives to convert GraphQL SDL into full descriptive AWS CloudFormation templates. +**[Amplify GraphQL Transformer](https://docs.amplify.aws/cli/graphql-transformer/function){target="_blank"}**. Custom GraphQL directives to define your application's data model using Schema Definition Language _(SDL)_, _e.g., `@function`_. Amplify CLI uses these directives to convert GraphQL SDL into full descriptive AWS CloudFormation templates. ## Getting started +???+ tip "Tip: Designing GraphQL Schemas for the first time?" + Visit [AWS AppSync schema documentation](https://docs.aws.amazon.com/appsync/latest/devguide/designing-your-schema.html){target="_blank"} to understand how to define types, nesting, and pagination. + ### Required resources -You must have an existing AppSync GraphQL API and IAM permissions to invoke your Lambda function. That said, there is no additional permissions to use this utility. +You must have an existing AppSync GraphQL API and IAM permissions to invoke your Lambda function. That said, there is no additional permissions to use Event Handler as routing requires no dependency (_standard library_). This is the sample infrastructure we are using for the initial examples with a AppSync Direct Lambda Resolver. -???+ tip "Tip: Designing GraphQL Schemas for the first time?" - Visit [AWS AppSync schema documentation](https://docs.aws.amazon.com/appsync/latest/devguide/designing-your-schema.html){target="_blank"} for understanding how to define types, nesting, and pagination. - === "getting_started_schema.graphql" ```typescript --8<-- "examples/event_handler_graphql/src/getting_started_schema.graphql" ``` -=== "template.yml" +=== "template.yaml" ```yaml hl_lines="59-60 71-72 94-95 104-105 112-113" --8<-- "examples/event_handler_graphql/sam/template.yaml" @@ -259,6 +288,275 @@ You can use `append_context` when you want to share data between your App and Ro --8<-- "examples/event_handler_graphql/src/split_operation_append_context_module.py" ``` +### Batch processing + +```mermaid +stateDiagram-v2 + direction LR + LambdaInit: Lambda invocation + EventHandler: Event Handler + EventHandlerResolver: Route event based on GraphQL type/field keys + Client: Client query (listPosts) + YourLogic: Run your registered resolver function + EventHandlerResolverBuilder: Verifies response is a list + AppSyncBatchPostsResolution: query listPosts + AppSyncBatchPostsItems: get all posts data <em>(id, title, relatedPosts)</em> + AppSyncBatchRelatedPosts: get related posts <em>(id, title, relatedPosts)</em> + AppSyncBatchAggregate: aggregate batch resolver event + AppSyncBatchLimit: reached batch size limit + LambdaResponse: Lambda response + + Client --> AppSyncBatchResolverMode + state AppSyncBatchResolverMode { + [*] --> AppSyncBatchPostsResolution + AppSyncBatchPostsResolution --> AppSyncBatchPostsItems + AppSyncBatchPostsItems --> AppSyncBatchRelatedPosts: <strong>N additional queries</strong> + AppSyncBatchRelatedPosts --> AppSyncBatchRelatedPosts + AppSyncBatchRelatedPosts --> AppSyncBatchAggregate + AppSyncBatchRelatedPosts --> AppSyncBatchAggregate + AppSyncBatchRelatedPosts --> AppSyncBatchAggregate + AppSyncBatchAggregate --> AppSyncBatchLimit + } + + AppSyncBatchResolverMode --> LambdaInit: 1x Invoke with N events + LambdaInit --> EventHandler + + state EventHandler { + [*] --> EventHandlerResolver: app.resolve(event, context) + EventHandlerResolver --> YourLogic + YourLogic --> EventHandlerResolverBuilder + EventHandlerResolverBuilder --> LambdaResponse + } +``` + +<em><center>Batch resolvers mechanics: visualizing N+1 in `relatedPosts` field.</center></em> + +#### Understanding N+1 problem + +When AWS AppSync has [batching enabled for Lambda Resolvers](https://docs.aws.amazon.com/appsync/latest/devguide/tutorial-lambda-resolvers.html#advanced-use-case-batching){target="_blank"}, it will group as many requests as possible before invoking your Lambda invocation. Effectively solving the [N+1 problem in GraphQL](https://aws.amazon.com/blogs/mobile/introducing-configurable-batching-size-for-aws-appsync-lambda-resolvers/){target="_blank"}. + +For example, say you have a query named `listPosts`. For each post, you also want `relatedPosts`. **Without batching**, AppSync will: + +1. Invoke your Lambda function to get the first post +2. Invoke your Lambda function for each related post +3. Repeat 1 until done + +```mermaid +sequenceDiagram + participant Client + participant AppSync + participant Lambda + participant Database + + Client->>AppSync: GraphQL Query + Note over Client,AppSync: query listPosts { <br/>id <br/>title <br/>relatedPosts { id title } <br/> } + + AppSync->>Lambda: Fetch N posts (listPosts) + Lambda->>Database: Query + Database->>Lambda: Posts + Lambda-->>AppSync: Return posts (id, title) + loop Fetch N related posts (relatedPosts) + AppSync->>Lambda: Invoke function (N times) + Lambda->>Database: Query + Database-->>Lambda: Return related posts + Lambda-->>AppSync: Return related posts + end + AppSync-->>Client: Return posts and their related posts +``` + +#### Batch resolvers + +You can use `@batch_resolver` or `@async_batch_resolver` decorators to receive the entire batch of requests. + +In this mode, you must return results in the same order of your batch items, so AppSync can associate the results back to the client. + +=== "advanced_batch_resolver.py" + ```python hl_lines="5 9 23" + --8<-- "examples/event_handler_graphql/src/advanced_batch_resolver.py" + ``` + + 1. The entire batch is sent to the resolver. You need to iterate through it to process all records. + 2. We use `post_id` as our unique identifier of the GraphQL request. + +=== "advanced_batch_resolver_payload.json" + ```json hl_lines="6 16 25 35 44 54" + --8<-- "examples/event_handler_graphql/src/advanced_batch_resolver_payload.json" + ``` + +=== "advanced_batch_query.graphql" + ```typescript hl_lines="3 6" + --8<-- "examples/event_handler_graphql/src/advanced_batch_query.graphql" + ``` + +##### Processing items individually + +```mermaid +stateDiagram-v2 + direction LR + LambdaInit: Lambda invocation + EventHandler: Event Handler + EventHandlerResolver: Route event based on GraphQL type/field keys + Client: Client query (listPosts) + YourLogic: Call your registered resolver function <strong>N times</strong> + EventHandlerResolverErrorHandling: Gracefully <strong>handle errors</strong> with null response + EventHandlerResolverBuilder: Aggregate responses to match batch size + AppSyncBatchPostsResolution: query listPosts + AppSyncBatchPostsItems: get all posts data <em>(id, title, relatedPosts)</em> + AppSyncBatchRelatedPosts: get related posts <em>(id, title, relatedPosts)</em> + AppSyncBatchAggregate: aggregate batch resolver event + AppSyncBatchLimit: reached batch size limit + LambdaResponse: Lambda response + + Client --> AppSyncBatchResolverMode + state AppSyncBatchResolverMode { + [*] --> AppSyncBatchPostsResolution + AppSyncBatchPostsResolution --> AppSyncBatchPostsItems + AppSyncBatchPostsItems --> AppSyncBatchRelatedPosts: <strong>N additional queries</strong> + AppSyncBatchRelatedPosts --> AppSyncBatchRelatedPosts + AppSyncBatchRelatedPosts --> AppSyncBatchAggregate + AppSyncBatchRelatedPosts --> AppSyncBatchAggregate + AppSyncBatchRelatedPosts --> AppSyncBatchAggregate + AppSyncBatchAggregate --> AppSyncBatchLimit + } + + AppSyncBatchResolverMode --> LambdaInit: 1x Invoke with N events + LambdaInit --> EventHandler + + state EventHandler { + [*] --> EventHandlerResolver: app.resolve(event, context) + EventHandlerResolver --> YourLogic + YourLogic --> EventHandlerResolverErrorHandling + EventHandlerResolverErrorHandling --> EventHandlerResolverBuilder + EventHandlerResolverBuilder --> LambdaResponse + } +``` + +<em><center>Batch resolvers: reducing Lambda invokes but fetching data N times (similar to single resolver).</center></em> + +In rare scenarios, you might want to process each item individually, trading ease of use for increased latency as you handle one batch item at a time. + +You can toggle `aggregate` parameter in `@batch_resolver` decorator for your resolver function to be called N times. + +!!! note "This does not resolve the N+1 problem, but shifts it to the Lambda runtime." + +In this mode, we will: + +1. Aggregate each response we receive from your function in the exact order it receives +2. Gracefully handle errors by adding `None` in the final response for each batch item that failed processing + * You can customize `nul` or error responses back to the client in the [AppSync resolver mapping templates](https://docs.aws.amazon.com/appsync/latest/devguide/tutorial-lambda-resolvers.html#returning-individual-errors){target="_blank"} + +=== "advanced_batch_resolver_individual.py" + ```python hl_lines="5 9 19" + --8<-- "examples/event_handler_graphql/src/advanced_batch_resolver_individual.py" + ``` + + 1. You need to disable the aggregated event by using `aggregate` flag. + The resolver receives and processes each record one at a time. + +=== "advanced_batch_resolver_payload.json" + ```json hl_lines="6 16 25 35 44 54" + --8<-- "examples/event_handler_graphql/src/advanced_batch_resolver_payload.json" + ``` + +=== "advanced_batch_query.graphql" + ```typescript hl_lines="3 6" + --8<-- "examples/event_handler_graphql/src/advanced_batch_query.graphql" + ``` + +##### Raise on error + +```mermaid +stateDiagram-v2 + direction LR + LambdaInit: Lambda invocation + EventHandler: Event Handler + EventHandlerResolver: Route event based on GraphQL type/field keys + Client: Client query (listPosts) + YourLogic: Call your registered resolver function <strong>N times</strong> + EventHandlerResolverErrorHandling: <strong>Error?</strong> + EventHandlerResolverHappyPath: <strong>No error?</strong> + EventHandlerResolverUnhappyPath: Propagate any exception + EventHandlerResolverBuilder: Aggregate responses to match batch size + AppSyncBatchPostsResolution: query listPosts + AppSyncBatchPostsItems: get all posts data <em>(id, title, relatedPosts)</em> + AppSyncBatchRelatedPosts: get related posts <em>(id, title, relatedPosts)</em> + AppSyncBatchAggregate: aggregate batch resolver event + AppSyncBatchLimit: reached batch size limit + LambdaResponse: <strong>Lambda response</strong> + LambdaErrorResponse: <strong>Lambda error</strong> + + Client --> AppSyncBatchResolverMode + state AppSyncBatchResolverMode { + [*] --> AppSyncBatchPostsResolution + AppSyncBatchPostsResolution --> AppSyncBatchPostsItems + AppSyncBatchPostsItems --> AppSyncBatchRelatedPosts: <strong>N additional queries</strong> + AppSyncBatchRelatedPosts --> AppSyncBatchRelatedPosts + AppSyncBatchRelatedPosts --> AppSyncBatchAggregate + AppSyncBatchRelatedPosts --> AppSyncBatchAggregate + AppSyncBatchRelatedPosts --> AppSyncBatchAggregate + AppSyncBatchAggregate --> AppSyncBatchLimit + } + + AppSyncBatchResolverMode --> LambdaInit: 1x Invoke with N events + LambdaInit --> EventHandler + + state EventHandler { + [*] --> EventHandlerResolver: app.resolve(event, context) + EventHandlerResolver --> YourLogic + YourLogic --> EventHandlerResolverHappyPath + YourLogic --> EventHandlerResolverErrorHandling + EventHandlerResolverHappyPath --> EventHandlerResolverBuilder + EventHandlerResolverErrorHandling --> EventHandlerResolverUnhappyPath + EventHandlerResolverUnhappyPath --> LambdaErrorResponse + + EventHandlerResolverBuilder --> LambdaResponse + } +``` + +<em><center>Batch resolvers: reducing Lambda invokes but fetching data N times (similar to single resolver).</center></em> + +You can toggle `raise_on_error` parameter in `@batch_resolver` to propagate any exception instead of gracefully returning `None` for a given batch item. + +This is useful when you want to stop processing immediately in the event of an unhandled or unrecoverable exception. + +=== "advanced_batch_resolver_handling_error.py" + ```python hl_lines="5 9 19" + --8<-- "examples/event_handler_graphql/src/advanced_batch_resolver_handling_error.py" + ``` + + 1. You can enable enable the error handling by using `raise_on_error` flag. + +=== "advanced_batch_resolver_payload.json" + ```json hl_lines="6 16 25 35 44 54" + --8<-- "examples/event_handler_graphql/src/advanced_batch_resolver_payload.json" + ``` + +=== "advanced_batch_query.graphql" + ```typescript hl_lines="3 6" + --8<-- "examples/event_handler_graphql/src/advanced_batch_query.graphql" + ``` + +#### Async batch resolver + +Similar to `@batch_resolver` explained in [batch resolvers](#batch-resolvers), you can use `async_batch_resolver` to handle async functions. + +=== "advanced_batch_async_resolver.py" + ```python hl_lines="5 9 23" + --8<-- "examples/event_handler_graphql/src/advanced_batch_async_resolver.py" + ``` + + 1. `async_batch_resolver` takes care of running and waiting for coroutine completion. + +=== "advanced_batch_resolver_payload.json" + ```json hl_lines="6 16 25 35 44 54" + --8<-- "examples/event_handler_graphql/src/advanced_batch_resolver_payload.json" + ``` + +=== "advanced_batch_query.graphql" + ```typescript hl_lines="3 6" + --8<-- "examples/event_handler_graphql/src/advanced_batch_query.graphql" + ``` + ## Testing your code You can test your resolvers by passing a mocked or actual AppSync Lambda event that you're expecting. diff --git a/examples/event_handler_graphql/sam/template.yaml b/examples/event_handler_graphql/sam/template.yaml index bc4faa34319..1c75d18ae55 100644 --- a/examples/event_handler_graphql/sam/template.yaml +++ b/examples/event_handler_graphql/sam/template.yaml @@ -5,7 +5,7 @@ Description: Hello world Direct Lambda Resolver Globals: Function: Timeout: 5 - Runtime: python3.9 + Runtime: python3.12 Tracing: Active Environment: Variables: diff --git a/examples/event_handler_graphql/src/advanced_batch_async_resolver.py b/examples/event_handler_graphql/src/advanced_batch_async_resolver.py new file mode 100644 index 00000000000..e56802cf0c6 --- /dev/null +++ b/examples/event_handler_graphql/src/advanced_batch_async_resolver.py @@ -0,0 +1,39 @@ +from __future__ import annotations + +from typing import Any + +from aws_lambda_powertools.event_handler import AppSyncResolver +from aws_lambda_powertools.utilities.data_classes import AppSyncResolverEvent +from aws_lambda_powertools.utilities.typing import LambdaContext + +app = AppSyncResolver() + +# mimic DB data for simplicity +posts_related = { + "1": {"title": "post1"}, + "2": {"title": "post2"}, + "3": {"title": "post3"}, +} + + +async def search_batch_posts(posts: list) -> dict[str, Any]: + return {post_id: posts_related.get(post_id) for post_id in posts} + + +@app.async_batch_resolver(type_name="Query", field_name="relatedPosts") +async def related_posts(event: list[AppSyncResolverEvent]) -> list[Any]: + # Extract all post_ids in order + post_ids: list = [record.source.get("post_id") for record in event] + + # Get unique post_ids while preserving order + unique_post_ids = list(dict.fromkeys(post_ids)) + + # Fetch posts in a single batch operation + fetched_posts = await search_batch_posts(unique_post_ids) + + # Return results in original order + return [fetched_posts.get(post_id) for post_id in post_ids] + + +def lambda_handler(event, context: LambdaContext) -> dict: + return app.resolve(event, context) # (1)! diff --git a/examples/event_handler_graphql/src/advanced_batch_query.graphql b/examples/event_handler_graphql/src/advanced_batch_query.graphql new file mode 100644 index 00000000000..d89358dcde5 --- /dev/null +++ b/examples/event_handler_graphql/src/advanced_batch_query.graphql @@ -0,0 +1,12 @@ +query MyQuery { + getPost(post_id: "2") { + relatedPosts { + post_id + author + relatedPosts { + post_id + author + } + } + } +} diff --git a/examples/event_handler_graphql/src/advanced_batch_resolver.py b/examples/event_handler_graphql/src/advanced_batch_resolver.py new file mode 100644 index 00000000000..653ce59775e --- /dev/null +++ b/examples/event_handler_graphql/src/advanced_batch_resolver.py @@ -0,0 +1,39 @@ +from __future__ import annotations + +from typing import Any + +from aws_lambda_powertools.event_handler import AppSyncResolver +from aws_lambda_powertools.utilities.data_classes import AppSyncResolverEvent +from aws_lambda_powertools.utilities.typing import LambdaContext + +app = AppSyncResolver() + +# mimic DB data for simplicity +posts_related = { + "1": {"title": "post1"}, + "2": {"title": "post2"}, + "3": {"title": "post3"}, +} + + +def search_batch_posts(posts: list) -> dict[str, Any]: + return {post_id: posts_related.get(post_id) for post_id in posts} + + +@app.batch_resolver(type_name="Query", field_name="relatedPosts") +def related_posts(event: list[AppSyncResolverEvent]) -> list[Any]: # (1)! + # Extract all post_ids in order + post_ids: list = [record.source.get("post_id") for record in event] # (2)! + + # Get unique post_ids while preserving order + unique_post_ids = list(dict.fromkeys(post_ids)) + + # Fetch posts in a single batch operation + fetched_posts = search_batch_posts(unique_post_ids) + + # Return results in original order + return [fetched_posts.get(post_id) for post_id in post_ids] + + +def lambda_handler(event, context: LambdaContext) -> dict: + return app.resolve(event, context) diff --git a/examples/event_handler_graphql/src/advanced_batch_resolver_handling_error.py b/examples/event_handler_graphql/src/advanced_batch_resolver_handling_error.py new file mode 100644 index 00000000000..a4862c6e55b --- /dev/null +++ b/examples/event_handler_graphql/src/advanced_batch_resolver_handling_error.py @@ -0,0 +1,25 @@ +from typing import Any, Dict + +from aws_lambda_powertools import Logger +from aws_lambda_powertools.event_handler import AppSyncResolver +from aws_lambda_powertools.utilities.data_classes import AppSyncResolverEvent +from aws_lambda_powertools.utilities.typing import LambdaContext + +logger = Logger() +app = AppSyncResolver() + + +posts_related = { + "1": {"title": "post1"}, + "2": {"title": "post2"}, + "3": {"title": "post3"}, +} + + +@app.batch_resolver(type_name="Query", field_name="relatedPosts", aggregate=False, raise_on_error=True) # (1)! +def related_posts(event: AppSyncResolverEvent, post_id: str = "") -> Dict[str, Any]: + return posts_related[post_id] + + +def lambda_handler(event, context: LambdaContext) -> dict: + return app.resolve(event, context) diff --git a/examples/event_handler_graphql/src/advanced_batch_resolver_individual.py b/examples/event_handler_graphql/src/advanced_batch_resolver_individual.py new file mode 100644 index 00000000000..731ec11813f --- /dev/null +++ b/examples/event_handler_graphql/src/advanced_batch_resolver_individual.py @@ -0,0 +1,25 @@ +from typing import Any, Dict + +from aws_lambda_powertools import Logger +from aws_lambda_powertools.event_handler import AppSyncResolver +from aws_lambda_powertools.utilities.data_classes import AppSyncResolverEvent +from aws_lambda_powertools.utilities.typing import LambdaContext + +logger = Logger() +app = AppSyncResolver() + + +posts_related = { + "1": {"title": "post1"}, + "2": {"title": "post2"}, + "3": {"title": "post3"}, +} + + +@app.batch_resolver(type_name="Query", field_name="relatedPosts", aggregate=False) # (1)! +def related_posts(event: AppSyncResolverEvent, post_id: str = "") -> Dict[str, Any]: + return posts_related[post_id] + + +def lambda_handler(event, context: LambdaContext) -> dict: + return app.resolve(event, context) diff --git a/examples/event_handler_graphql/src/advanced_batch_resolver_payload.json b/examples/event_handler_graphql/src/advanced_batch_resolver_payload.json new file mode 100644 index 00000000000..6f4aaa2ff20 --- /dev/null +++ b/examples/event_handler_graphql/src/advanced_batch_resolver_payload.json @@ -0,0 +1,59 @@ +[ + { + "arguments":{}, + "identity":"None", + "source":{ + "post_id":"1", + "author":"Author1" + }, + "prev":"None", + "info":{ + "selectionSetList":[ + "post_id", + "author" + ], + "selectionSetGraphQL":"{\n post_id\n author\n}", + "fieldName":"relatedPosts", + "parentTypeName":"Post", + "variables":{} + } + }, + { + "arguments":{}, + "identity":"None", + "source":{ + "post_id":"2", + "author":"Author2" + }, + "prev":"None", + "info":{ + "selectionSetList":[ + "post_id", + "author" + ], + "selectionSetGraphQL":"{\n post_id\n author\n}", + "fieldName":"relatedPosts", + "parentTypeName":"Post", + "variables":{} + } + }, + { + "arguments":{}, + "identity":"None", + "source":{ + "post_id":"1", + "author":"Author1" + }, + "prev":"None", + "info":{ + "selectionSetList":[ + "post_id", + "author" + ], + "selectionSetGraphQL":"{\n post_id\n author\n}", + "fieldName":"relatedPosts", + "parentTypeName":"Post", + "variables":{} + } + } +] diff --git a/examples/event_handler_graphql/src/custom_models.py b/examples/event_handler_graphql/src/custom_models.py index 61e03318d14..84e920af280 100644 --- a/examples/event_handler_graphql/src/custom_models.py +++ b/examples/event_handler_graphql/src/custom_models.py @@ -36,7 +36,8 @@ def api_key(self) -> str: @app.resolver(type_name="Query", field_name="listLocations") def list_locations(page: int = 0, size: int = 10) -> List[Location]: # additional properties/methods will now be available under current_event - logger.debug(f"Request country origin: {app.current_event.country_viewer}") # type: ignore[attr-defined] + if app.current_event: + logger.debug(f"Request country origin: {app.current_event.country_viewer}") # type: ignore[attr-defined] return [{"id": scalar_types_utils.make_id(), "name": "Perry, James and Carroll"}] diff --git a/examples/event_handler_graphql/src/enable_exceptions_batch_resolver.py b/examples/event_handler_graphql/src/enable_exceptions_batch_resolver.py new file mode 100644 index 00000000000..f77374527ea --- /dev/null +++ b/examples/event_handler_graphql/src/enable_exceptions_batch_resolver.py @@ -0,0 +1,32 @@ +from typing import Dict, Optional + +from aws_lambda_powertools.event_handler import AppSyncResolver +from aws_lambda_powertools.utilities.data_classes import AppSyncResolverEvent +from aws_lambda_powertools.utilities.typing import LambdaContext + +app = AppSyncResolver() + + +posts_related = { + "1": {"title": "post1"}, + "2": {"title": "post2"}, + "3": {"title": "post3"}, +} + + +class PostRelatedNotFound(Exception): + ... + + +@app.batch_resolver(type_name="Query", field_name="relatedPosts", raise_on_error=True) # (1)! +def related_posts(event: AppSyncResolverEvent, post_id: str) -> Optional[Dict]: + post_found = posts_related.get(post_id, None) + + if not post_found: + raise PostRelatedNotFound(f"Unable to find a related post with ID {post_id}.") + + return post_found + + +def lambda_handler(event, context: LambdaContext) -> dict: + return app.resolve(event, context) diff --git a/examples/event_handler_graphql/src/enable_exceptions_batch_resolver_payload.json b/examples/event_handler_graphql/src/enable_exceptions_batch_resolver_payload.json new file mode 100644 index 00000000000..c0de86728ea --- /dev/null +++ b/examples/event_handler_graphql/src/enable_exceptions_batch_resolver_payload.json @@ -0,0 +1,52 @@ +[ + { + "arguments":{ + "post_id":"12" + }, + "identity":"None", + "source":"None", + "request":{ + "headers":{ + "x-forwarded-for":"18.68.31.36", + "sec-ch-ua-mobile":"?0" + } + }, + "prev":"None", + "info":{ + "fieldName":"relatedPosts", + "selectionSetList":[ + "relatedPosts" + ], + "selectionSetGraphQL":"{\n relatedPosts {\n downs\n content\n relatedPosts {\n ups\n downs\n relatedPosts {\n content\n downs\n }\n }\n }\n}", + "parentTypeName":"Query", + "variables":{ + + } + } + }, + { + "arguments":{ + "post_id":"1" + }, + "identity":"None", + "source":"None", + "request":{ + "headers":{ + "x-forwarded-for":"18.68.31.36", + "sec-ch-ua-mobile":"?0" + } + }, + "prev":"None", + "info":{ + "fieldName":"relatedPosts", + "selectionSetList":[ + "relatedPosts" + ], + "selectionSetGraphQL":"{\n relatedPosts {\n downs\n content\n relatedPosts {\n ups\n downs\n relatedPosts {\n content\n downs\n }\n }\n }\n}", + "parentTypeName":"Query", + "variables":{ + + } + } + } + ] diff --git a/examples/event_handler_graphql/src/split_operation_append_context_module.py b/examples/event_handler_graphql/src/split_operation_append_context_module.py index 15ed7af1b9e..c12c22fb626 100644 --- a/examples/event_handler_graphql/src/split_operation_append_context_module.py +++ b/examples/event_handler_graphql/src/split_operation_append_context_module.py @@ -1,7 +1,7 @@ from typing import List from aws_lambda_powertools import Logger, Tracer -from aws_lambda_powertools.event_handler.appsync import Router +from aws_lambda_powertools.event_handler.graphql_appsync.router import Router from aws_lambda_powertools.shared.types import TypedDict tracer = Tracer() diff --git a/examples/event_handler_graphql/src/split_operation_module.py b/examples/event_handler_graphql/src/split_operation_module.py index e4c7f978b73..5d7f9425c97 100644 --- a/examples/event_handler_graphql/src/split_operation_module.py +++ b/examples/event_handler_graphql/src/split_operation_module.py @@ -1,7 +1,7 @@ from typing import List from aws_lambda_powertools import Logger, Tracer -from aws_lambda_powertools.event_handler.appsync import Router +from aws_lambda_powertools.event_handler.graphql_appsync.router import Router from aws_lambda_powertools.shared.types import TypedDict tracer = Tracer() diff --git a/mkdocs.yml b/mkdocs.yml index efd18efd5cc..d2bab86cd22 100644 --- a/mkdocs.yml +++ b/mkdocs.yml @@ -116,7 +116,7 @@ markdown_extensions: - meta - toc: permalink: true - toc_depth: 4 + toc_depth: 5 - attr_list - md_in_html - pymdownx.emoji: diff --git a/poetry.lock b/poetry.lock index 521fdeb308d..74dc0fe18ec 100644 --- a/poetry.lock +++ b/poetry.lock @@ -170,6 +170,24 @@ jsii = ">=1.92.0,<2.0.0" publication = ">=0.0.3" typeguard = ">=2.13.3,<2.14.0" +[[package]] +name = "aws-cdk-aws-appsync-alpha" +version = "2.59.0a0" +description = "The CDK Construct Library for AWS::AppSync" +optional = false +python-versions = "~=3.7" +files = [ + {file = "aws-cdk.aws-appsync-alpha-2.59.0a0.tar.gz", hash = "sha256:f5c7773b70b759efd576561dc3d71af5762a6f7cbc9ee9eef5e538c7ab3dccc7"}, + {file = "aws_cdk.aws_appsync_alpha-2.59.0a0-py3-none-any.whl", hash = "sha256:ecc235f1f70d404c8d03cf250be0227becd14c468f8c43b6d9df334a1d60c8e2"}, +] + +[package.dependencies] +aws-cdk-lib = ">=2.59.0,<3.0.0" +constructs = ">=10.0.0,<11.0.0" +jsii = ">=1.72.0,<2.0.0" +publication = ">=0.0.3" +typeguard = ">=2.13.3,<2.14.0" + [[package]] name = "aws-cdk-aws-lambda-python-alpha" version = "2.147.1a0" @@ -1502,13 +1520,13 @@ files = [ [[package]] name = "jsii" -version = "1.100.0" +version = "1.101.0" description = "Python client for jsii runtime" optional = false python-versions = "~=3.8" files = [ - {file = "jsii-1.100.0-py3-none-any.whl", hash = "sha256:3564188028b5b2ddf7fbd50bb28555125e684a6c4540f591c9be088bc073fdb8"}, - {file = "jsii-1.100.0.tar.gz", hash = "sha256:b267f651086cbdbb2ba336099361ebfc109dbe86f3c8671636338090c4b4d2f4"}, + {file = "jsii-1.101.0-py3-none-any.whl", hash = "sha256:b78b87f8316560040ad0b9dca1682d73b6532a33acf4ecf56185d1ae5edb54fa"}, + {file = "jsii-1.101.0.tar.gz", hash = "sha256:043c4d3d0d09af3c7265747f4da9c95770232477f75c846640df4c63d01b19cb"}, ] [package.dependencies] @@ -2741,13 +2759,13 @@ toml = ["tomli (>=2.0.1)"] [[package]] name = "redis" -version = "5.0.6" +version = "5.0.7" description = "Python client for Redis database and key-value store" optional = false python-versions = ">=3.7" files = [ - {file = "redis-5.0.6-py3-none-any.whl", hash = "sha256:c0d6d990850c627bbf7be01c5c4cbaadf67b48593e913bb71c9819c30df37eee"}, - {file = "redis-5.0.6.tar.gz", hash = "sha256:38473cd7c6389ad3e44a91f4c3eaf6bcb8a9f746007f29bf4fb20824ff0b2197"}, + {file = "redis-5.0.7-py3-none-any.whl", hash = "sha256:0e479e24da960c690be5d9b96d21f7b918a98c0cf49af3b6fafaa0753f93a0db"}, + {file = "redis-5.0.7.tar.gz", hash = "sha256:8f611490b93c8109b50adc317b31bfd84fff31def3475b92e7e80bf39f48175b"}, ] [package.dependencies] @@ -3066,13 +3084,13 @@ crt = ["botocore[crt] (>=1.33.2,<2.0a.0)"] [[package]] name = "sentry-sdk" -version = "2.6.0" +version = "2.7.0" description = "Python client for Sentry (https://sentry.io)" optional = false python-versions = ">=3.6" files = [ - {file = "sentry_sdk-2.6.0-py2.py3-none-any.whl", hash = "sha256:422b91cb49378b97e7e8d0e8d5a1069df23689d45262b86f54988a7db264e874"}, - {file = "sentry_sdk-2.6.0.tar.gz", hash = "sha256:65cc07e9c6995c5e316109f138570b32da3bd7ff8d0d0ee4aaf2628c3dd8127d"}, + {file = "sentry_sdk-2.7.0-py2.py3-none-any.whl", hash = "sha256:db9594c27a4d21c1ebad09908b1f0dc808ef65c2b89c1c8e7e455143262e37c1"}, + {file = "sentry_sdk-2.7.0.tar.gz", hash = "sha256:d846a211d4a0378b289ced3c434480945f110d0ede00450ba631fc2852e7a0d4"}, ] [package.dependencies] @@ -3102,7 +3120,7 @@ langchain = ["langchain (>=0.0.210)"] loguru = ["loguru (>=0.5)"] openai = ["openai (>=1.0.0)", "tiktoken (>=0.3.0)"] opentelemetry = ["opentelemetry-distro (>=0.35b0)"] -opentelemetry-experimental = ["opentelemetry-distro (>=0.40b0,<1.0)", "opentelemetry-instrumentation-aiohttp-client (>=0.40b0,<1.0)", "opentelemetry-instrumentation-django (>=0.40b0,<1.0)", "opentelemetry-instrumentation-fastapi (>=0.40b0,<1.0)", "opentelemetry-instrumentation-flask (>=0.40b0,<1.0)", "opentelemetry-instrumentation-requests (>=0.40b0,<1.0)", "opentelemetry-instrumentation-sqlite3 (>=0.40b0,<1.0)", "opentelemetry-instrumentation-urllib (>=0.40b0,<1.0)"] +opentelemetry-experimental = ["opentelemetry-instrumentation-aio-pika (==0.46b0)", "opentelemetry-instrumentation-aiohttp-client (==0.46b0)", "opentelemetry-instrumentation-aiopg (==0.46b0)", "opentelemetry-instrumentation-asgi (==0.46b0)", "opentelemetry-instrumentation-asyncio (==0.46b0)", "opentelemetry-instrumentation-asyncpg (==0.46b0)", "opentelemetry-instrumentation-aws-lambda (==0.46b0)", "opentelemetry-instrumentation-boto (==0.46b0)", "opentelemetry-instrumentation-boto3sqs (==0.46b0)", "opentelemetry-instrumentation-botocore (==0.46b0)", "opentelemetry-instrumentation-cassandra (==0.46b0)", "opentelemetry-instrumentation-celery (==0.46b0)", "opentelemetry-instrumentation-confluent-kafka (==0.46b0)", "opentelemetry-instrumentation-dbapi (==0.46b0)", "opentelemetry-instrumentation-django (==0.46b0)", "opentelemetry-instrumentation-elasticsearch (==0.46b0)", "opentelemetry-instrumentation-falcon (==0.46b0)", "opentelemetry-instrumentation-fastapi (==0.46b0)", "opentelemetry-instrumentation-flask (==0.46b0)", "opentelemetry-instrumentation-grpc (==0.46b0)", "opentelemetry-instrumentation-httpx (==0.46b0)", "opentelemetry-instrumentation-jinja2 (==0.46b0)", "opentelemetry-instrumentation-kafka-python (==0.46b0)", "opentelemetry-instrumentation-logging (==0.46b0)", "opentelemetry-instrumentation-mysql (==0.46b0)", "opentelemetry-instrumentation-mysqlclient (==0.46b0)", "opentelemetry-instrumentation-pika (==0.46b0)", "opentelemetry-instrumentation-psycopg (==0.46b0)", "opentelemetry-instrumentation-psycopg2 (==0.46b0)", "opentelemetry-instrumentation-pymemcache (==0.46b0)", "opentelemetry-instrumentation-pymongo (==0.46b0)", "opentelemetry-instrumentation-pymysql (==0.46b0)", "opentelemetry-instrumentation-pyramid (==0.46b0)", "opentelemetry-instrumentation-redis (==0.46b0)", "opentelemetry-instrumentation-remoulade (==0.46b0)", "opentelemetry-instrumentation-requests (==0.46b0)", "opentelemetry-instrumentation-sklearn (==0.46b0)", "opentelemetry-instrumentation-sqlalchemy (==0.46b0)", "opentelemetry-instrumentation-sqlite3 (==0.46b0)", "opentelemetry-instrumentation-starlette (==0.46b0)", "opentelemetry-instrumentation-system-metrics (==0.46b0)", "opentelemetry-instrumentation-threading (==0.46b0)", "opentelemetry-instrumentation-tornado (==0.46b0)", "opentelemetry-instrumentation-tortoiseorm (==0.46b0)", "opentelemetry-instrumentation-urllib (==0.46b0)", "opentelemetry-instrumentation-urllib3 (==0.46b0)", "opentelemetry-instrumentation-wsgi (==0.46b0)"] pure-eval = ["asttokens", "executing", "pure-eval"] pymongo = ["pymongo (>=3.1)"] pyspark = ["pyspark (>=2.4.4)"] @@ -3698,4 +3716,4 @@ validation = ["fastjsonschema"] [metadata] lock-version = "2.0" python-versions = ">=3.8,<4.0.0" -content-hash = "b51afbf69445f2f45a3a3e0134cdb3c7f5f654468d633d10d4fc356612fd94a4" +content-hash = "25868c659cf950a66656c81b83c7ff3dfd1b21fb221e2f1cbd57b058323ae114" diff --git a/pyproject.toml b/pyproject.toml index 5aee9f7077d..d8142524122 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -118,6 +118,7 @@ datamasking = ["aws-encryption-sdk", "jsonpath-ng"] cfn-lint = "1.3.5" mypy = "^1.1.1" types-python-dateutil = "^2.8.19.6" +aws-cdk-aws-appsync-alpha = "^2.59.0a0" httpx = ">=0.23.3,<0.28.0" sentry-sdk = ">=1.22.2,<3.0.0" ruff = ">=0.0.272,<0.4.11" diff --git a/tests/e2e/event_handler_appsync/__init__.py b/tests/e2e/event_handler_appsync/__init__.py new file mode 100644 index 00000000000..e69de29bb2d diff --git a/tests/e2e/event_handler_appsync/conftest.py b/tests/e2e/event_handler_appsync/conftest.py new file mode 100644 index 00000000000..1f6d8c406de --- /dev/null +++ b/tests/e2e/event_handler_appsync/conftest.py @@ -0,0 +1,19 @@ +import pytest + +from tests.e2e.event_handler_appsync.infrastructure import EventHandlerAppSyncStack + + +@pytest.fixture(autouse=True, scope="package") +def infrastructure(): + """Setup and teardown logic for E2E test infrastructure + + Yields + ------ + Dict[str, str] + CloudFormation Outputs from deployed infrastructure + """ + stack = EventHandlerAppSyncStack() + try: + yield stack.deploy() + finally: + stack.delete() diff --git a/tests/e2e/event_handler_appsync/files/schema.graphql b/tests/e2e/event_handler_appsync/files/schema.graphql new file mode 100644 index 00000000000..9733ba2f666 --- /dev/null +++ b/tests/e2e/event_handler_appsync/files/schema.graphql @@ -0,0 +1,22 @@ +schema { + query: Query +} + +type Query { + getPost(post_id:ID!): Post + allPosts: [Post] +} + +type Post { + post_id: ID! + author: String! + title: String + content: String + url: String + ups: Int + downs: Int + relatedPosts: [Post] + relatedPostsAsync: [Post] + relatedPostsAggregate: [Post] + relatedPostsAsyncAggregate: [Post] +} diff --git a/tests/e2e/event_handler_appsync/handlers/appsync_resolver_handler.py b/tests/e2e/event_handler_appsync/handlers/appsync_resolver_handler.py new file mode 100644 index 00000000000..594290f478d --- /dev/null +++ b/tests/e2e/event_handler_appsync/handlers/appsync_resolver_handler.py @@ -0,0 +1,114 @@ +from typing import List, Optional + +from pydantic import BaseModel + +from aws_lambda_powertools.event_handler import AppSyncResolver +from aws_lambda_powertools.utilities.data_classes import AppSyncResolverEvent +from aws_lambda_powertools.utilities.typing import LambdaContext + +app = AppSyncResolver() + + +posts = { + "1": { + "post_id": "1", + "title": "First book", + "author": "Author1", + "url": "https://amazon.com/", + "content": "SAMPLE TEXT AUTHOR 1", + "ups": "100", + "downs": "10", + }, + "2": { + "post_id": "2", + "title": "Second book", + "author": "Author2", + "url": "https://amazon.com", + "content": "SAMPLE TEXT AUTHOR 2", + "ups": "100", + "downs": "10", + }, + "3": { + "post_id": "3", + "title": "Third book", + "author": "Author3", + "url": None, + "content": None, + "ups": None, + "downs": None, + }, + "4": { + "post_id": "4", + "title": "Fourth book", + "author": "Author4", + "url": "https://www.amazon.com/", + "content": "SAMPLE TEXT AUTHOR 4", + "ups": "1000", + "downs": "0", + }, + "5": { + "post_id": "5", + "title": "Fifth book", + "author": "Author5", + "url": "https://www.amazon.com/", + "content": "SAMPLE TEXT AUTHOR 5", + "ups": "50", + "downs": "0", + }, +} + +posts_related = { + "1": [posts["4"]], + "2": [posts["3"], posts["5"]], + "3": [posts["2"], posts["1"]], + "4": [posts["2"], posts["1"]], + "5": [], +} + + +class Post(BaseModel): + post_id: str + author: str + title: str + url: str + content: str + ups: str + downs: str + + +# PROCESSING SINGLE RESOLVERS +@app.resolver(type_name="Query", field_name="getPost") +def get_post(post_id: str = "") -> dict: + post = Post(**posts[post_id]).dict() + return post + + +@app.resolver(type_name="Query", field_name="allPosts") +def all_posts() -> List[dict]: + return list(posts.values()) + + +# PROCESSING BATCH WITHOUT AGGREGATION +@app.batch_resolver(type_name="Post", field_name="relatedPosts", aggregate=False) +def related_posts(event: AppSyncResolverEvent) -> Optional[list]: + return posts_related[event.source["post_id"]] if event.source else None + + +@app.async_batch_resolver(type_name="Post", field_name="relatedPostsAsync", aggregate=False) +async def related_posts_async(event: AppSyncResolverEvent) -> Optional[list]: + return posts_related[event.source["post_id"]] if event.source else None + + +# PROCESSING BATCH WITH AGGREGATION +@app.batch_resolver(type_name="Post", field_name="relatedPostsAggregate") +def related_posts_aggregate(event: List[AppSyncResolverEvent]) -> Optional[list]: + return [posts_related[record.source.get("post_id")] for record in event] + + +@app.async_batch_resolver(type_name="Post", field_name="relatedPostsAsyncAggregate") +async def related_posts_async_aggregate(event: List[AppSyncResolverEvent]) -> Optional[list]: + return [posts_related[record.source.get("post_id")] for record in event] + + +def lambda_handler(event, context: LambdaContext) -> dict: + return app.resolve(event, context) diff --git a/tests/e2e/event_handler_appsync/infrastructure.py b/tests/e2e/event_handler_appsync/infrastructure.py new file mode 100644 index 00000000000..1a07270572a --- /dev/null +++ b/tests/e2e/event_handler_appsync/infrastructure.py @@ -0,0 +1,76 @@ +from pathlib import Path + +from aws_cdk import CfnOutput, Duration, Expiration +from aws_cdk import aws_appsync_alpha as appsync +from aws_cdk.aws_lambda import Function + +from tests.e2e.utils.data_builder import build_random_value +from tests.e2e.utils.infrastructure import BaseInfrastructure + + +class EventHandlerAppSyncStack(BaseInfrastructure): + def create_resources(self): + functions = self.create_lambda_functions() + + self._create_appsync_endpoint(function=functions["AppsyncResolverHandler"]) + + def _create_appsync_endpoint(self, function: Function): + api = appsync.GraphqlApi( + self.stack, + "Api", + name=f"e2e-tests{build_random_value()}", + schema=appsync.SchemaFile.from_asset(str(Path(self.feature_path, "files/schema.graphql"))), + authorization_config=appsync.AuthorizationConfig( + default_authorization=appsync.AuthorizationMode( + authorization_type=appsync.AuthorizationType.API_KEY, + api_key_config=appsync.ApiKeyConfig( + description="public key for getting data", + expires=Expiration.after(Duration.hours(25)), + name="API Token", + ), + ), + ), + xray_enabled=False, + ) + lambda_datasource = api.add_lambda_data_source("DataSource", lambda_function=function) + + lambda_datasource.create_resolver( + "QueryGetAllPostsResolver", + type_name="Query", + field_name="allPosts", + ) + lambda_datasource.create_resolver( + "QueryGetPostResolver", + type_name="Query", + field_name="getPost", + ) + lambda_datasource.create_resolver( + "QueryGetPostRelatedResolver", + type_name="Post", + field_name="relatedPosts", + max_batch_size=10, + ) + + lambda_datasource.create_resolver( + "QueryGetPostRelatedAsyncResolver", + type_name="Post", + field_name="relatedPostsAsync", + max_batch_size=10, + ) + + lambda_datasource.create_resolver( + "QueryGetPostRelatedResolverAggregate", + type_name="Post", + field_name="relatedPostsAggregate", + max_batch_size=10, + ) + + lambda_datasource.create_resolver( + "QueryGetPostRelatedAsyncResolverAggregate", + type_name="Post", + field_name="relatedPostsAsyncAggregate", + max_batch_size=10, + ) + + CfnOutput(self.stack, "GraphQLHTTPUrl", value=api.graphql_url) + CfnOutput(self.stack, "GraphQLAPIKey", value=api.api_key) diff --git a/tests/e2e/event_handler_appsync/test_appsync_resolvers.py b/tests/e2e/event_handler_appsync/test_appsync_resolvers.py new file mode 100644 index 00000000000..35549a1fdef --- /dev/null +++ b/tests/e2e/event_handler_appsync/test_appsync_resolvers.py @@ -0,0 +1,176 @@ +import json + +import pytest +from requests import Request + +from tests.e2e.utils import data_fetcher + + +@pytest.fixture +def appsync_endpoint(infrastructure: dict) -> str: + return infrastructure["GraphQLHTTPUrl"] + + +@pytest.fixture +def appsync_access_key(infrastructure: dict) -> str: + return infrastructure["GraphQLAPIKey"] + + +@pytest.mark.xdist_group(name="event_handler") +def test_appsync_get_all_posts(appsync_endpoint, appsync_access_key): + # GIVEN + body = { + "query": "query MyQuery { allPosts { post_id }}", + "variables": None, + "operationName": "MyQuery", + } + + # WHEN + response = data_fetcher.get_http_response( + Request( + method="POST", + url=appsync_endpoint, + json=body, + headers={"x-api-key": appsync_access_key, "Content-Type": "application/json"}, + ), + ) + + # THEN expect a HTTP 200 response and content return list of Posts + assert response.status_code == 200 + assert response.content is not None + + data = json.loads(response.content.decode("ascii"))["data"] + + assert data["allPosts"] is not None + assert len(data["allPosts"]) > 0 + + +@pytest.mark.xdist_group(name="event_handler") +def test_appsync_get_post(appsync_endpoint, appsync_access_key): + # GIVEN + post_id = "1" + body = { + "query": f'query MyQuery {{ getPost(post_id: "{post_id}") {{ post_id }} }}', + "variables": None, + "operationName": "MyQuery", + } + + # WHEN + response = data_fetcher.get_http_response( + Request( + method="POST", + url=appsync_endpoint, + json=body, + headers={"x-api-key": appsync_access_key, "Content-Type": "application/json"}, + ), + ) + + # THEN expect a HTTP 200 response and content return Post id + assert response.status_code == 200 + assert response.content is not None + + data = json.loads(response.content.decode("ascii"))["data"] + + assert data["getPost"]["post_id"] == post_id + + +@pytest.mark.xdist_group(name="event_handler") +def test_appsync_get_related_posts_batch_without_aggregate(appsync_endpoint, appsync_access_key): + # GIVEN a batch event + post_id = "2" + related_posts_ids = ["3", "5"] + + body = { + "query": f""" + query MyQuery {{ + getPost(post_id: "{post_id}") {{ + post_id + relatedPosts {{ + post_id + }} + relatedPostsAsync {{ + post_id + }} + }} + }} + """, + "variables": None, + "operationName": "MyQuery", + } + + # WHEN we invoke the AppSync API with a batch event + response = data_fetcher.get_http_response( + Request( + method="POST", + url=appsync_endpoint, + json=body, + headers={"x-api-key": appsync_access_key, "Content-Type": "application/json"}, + ), + ) + + # THEN expect a HTTP 200 response and content return Post id with dependent Posts id's + assert response.status_code == 200 + assert response.content is not None + + data = json.loads(response.content.decode("ascii"))["data"] + + assert data["getPost"]["post_id"] == post_id + + assert len(data["getPost"]["relatedPosts"]) == len(related_posts_ids) + for post in data["getPost"]["relatedPosts"]: + assert post["post_id"] in related_posts_ids + + assert len(data["getPost"]["relatedPostsAsync"]) == len(related_posts_ids) + for post in data["getPost"]["relatedPostsAsync"]: + assert post["post_id"] in related_posts_ids + + +@pytest.mark.xdist_group(name="event_handler") +def test_appsync_get_related_posts_batch_with_aggregate(appsync_endpoint, appsync_access_key): + # GIVEN a batch event + post_id = "2" + related_posts_ids = ["3", "5"] + + body = { + "query": f""" + query MyQuery {{ + getPost(post_id: "{post_id}") {{ + post_id + relatedPostsAggregate {{ + post_id + }} + relatedPostsAsyncAggregate {{ + post_id + }} + }} + }} + """, + "variables": None, + "operationName": "MyQuery", + } + + # WHEN we invoke the AppSync API with a batch event + response = data_fetcher.get_http_response( + Request( + method="POST", + url=appsync_endpoint, + json=body, + headers={"x-api-key": appsync_access_key, "Content-Type": "application/json"}, + ), + ) + + # THEN expect a HTTP 200 response and content return Post id with dependent Posts id's + assert response.status_code == 200 + assert response.content is not None + + data = json.loads(response.content.decode("ascii"))["data"] + + assert data["getPost"]["post_id"] == post_id + + assert len(data["getPost"]["relatedPostsAggregate"]) == len(related_posts_ids) + for post in data["getPost"]["relatedPostsAggregate"]: + assert post["post_id"] in related_posts_ids + + assert len(data["getPost"]["relatedPostsAsyncAggregate"]) == len(related_posts_ids) + for post in data["getPost"]["relatedPostsAsyncAggregate"]: + assert post["post_id"] in related_posts_ids diff --git a/tests/functional/event_handler/required_dependencies/appsync/__init__.py b/tests/functional/event_handler/required_dependencies/appsync/__init__.py new file mode 100644 index 00000000000..e69de29bb2d diff --git a/tests/functional/event_handler/required_dependencies/appsync/test_appsync_batch_resolvers.py b/tests/functional/event_handler/required_dependencies/appsync/test_appsync_batch_resolvers.py new file mode 100644 index 00000000000..cc78cbf0f9c --- /dev/null +++ b/tests/functional/event_handler/required_dependencies/appsync/test_appsync_batch_resolvers.py @@ -0,0 +1,909 @@ +from typing import List, Optional + +import pytest + +from aws_lambda_powertools.event_handler import AppSyncResolver +from aws_lambda_powertools.event_handler.graphql_appsync.exceptions import InvalidBatchResponse, ResolverNotFoundError +from aws_lambda_powertools.event_handler.graphql_appsync.router import Router +from aws_lambda_powertools.utilities.data_classes import AppSyncResolverEvent +from aws_lambda_powertools.utilities.typing import LambdaContext +from aws_lambda_powertools.warnings import PowertoolsUserWarning + + +# TESTS RECEIVING THE EVENT PARTIALLY AND PROCESS EACH RECORD PER TIME. +def test_resolve_batch_processing_with_related_events_one_at_time(): + # GIVEN An event with multiple requests to fetch related posts for different post IDs. + event = [ + { + "arguments": {}, + "identity": "None", + "source": { + "post_id": "3", + "title": "Third book", + }, + "info": { + "selectionSetList": [ + "title", + ], + "selectionSetGraphQL": "{\n title\n}", + "fieldName": "relatedPosts", + "parentTypeName": "Post", + }, + }, + { + "arguments": {}, + "identity": "None", + "source": { + "post_id": "4", + "title": "Fifth book", + }, + "info": { + "selectionSetList": [ + "title", + ], + "selectionSetGraphQL": "{\n title\n}", + "fieldName": "relatedPosts", + "parentTypeName": "Post", + }, + }, + { + "arguments": {}, + "identity": "None", + "source": { + "post_id": "1", + "title": "First book", + }, + "info": { + "selectionSetList": [ + "title", + ], + "selectionSetGraphQL": "{\n title\n}", + "fieldName": "relatedPosts", + "parentTypeName": "Post", + }, + }, + ] + + # GIVEN A dictionary of posts and a dictionary of related posts. + posts = { + "1": { + "post_id": "1", + "title": "First book", + }, + "2": { + "post_id": "2", + "title": "Second book", + }, + "3": { + "post_id": "3", + "title": "Third book", + }, + "4": { + "post_id": "4", + "title": "Fourth book", + }, + } + + posts_related = { + "1": [posts["2"]], + "2": [posts["3"], posts["4"], posts["1"]], + "3": [posts["2"], posts["1"]], + "4": [posts["3"], posts["1"]], + } + + app = AppSyncResolver() + + @app.batch_resolver(type_name="Post", field_name="relatedPosts", aggregate=False) + def related_posts(event: AppSyncResolverEvent) -> Optional[list]: + return posts_related[event.source["post_id"]] + + # WHEN related_posts function, which is the batch resolver, is called with the event. + result = app.resolve(event, LambdaContext()) + + # THEN the result must be a list of related posts + assert result == [ + posts_related["3"], + posts_related["4"], + posts_related["1"], + ] + + +# Batch resolver tests +def test_resolve_batch_processing_with_simple_queries_one_at_time(): + # GIVEN a list of events representing GraphQL queries for listing locations + event = [ + { + "typeName": "Query", + "info": { + "fieldName": "listLocations", + "parentTypeName": "Post", + }, + "fieldName": "listLocations", + "arguments": {}, + "source": { + "id": "1", + }, + }, + { + "typeName": "Query", + "info": { + "fieldName": "listLocations", + "parentTypeName": "Post", + }, + "fieldName": "listLocations", + "arguments": {}, + "source": { + "id": "2", + }, + }, + { + "typeName": "Query", + "info": { + "fieldName": "listLocations", + "parentTypeName": "Post", + }, + "fieldName": "listLocations", + "arguments": {}, + "source": { + "id": [3, 4], + }, + }, + ] + + app = AppSyncResolver() + + # WHEN the batch resolver for the listLocations field is defined + @app.batch_resolver(field_name="listLocations", aggregate=False) + def create_something(event: AppSyncResolverEvent) -> Optional[list]: # noqa AA03 VNE003 + return event.source["id"] if event.source else None + + # THEN the resolver should correctly process the batch of queries + result = app.resolve(event, LambdaContext()) + assert result == [appsync_event["source"]["id"] for appsync_event in event] + + assert app.current_batch_event and len(app.current_batch_event) == len(event) + assert not app.current_event + + +def test_resolve_batch_processing_with_raise_on_exception_one_at_time(): + # GIVEN a list of events representing GraphQL queries for listing locations + event = [ + { + "typeName": "Query", + "info": { + "fieldName": "listLocations", + "parentTypeName": "Post", + }, + "fieldName": "listLocations", + "arguments": {}, + "source": { + "id": "1", + }, + }, + { + "typeName": "Query", + "info": { + "fieldName": "listLocations", + "parentTypeName": "Post", + }, + "fieldName": "listLocations", + "arguments": {}, + "source": { + "id": "2", + }, + }, + { + "typeName": "Query", + "info": { + "fieldName": "listLocations", + "parentTypeName": "Post", + }, + "fieldName": "listLocations", + "arguments": {}, + "source": { + "id": [3, 4], + }, + }, + ] + + app = AppSyncResolver() + + # WHEN the sync batch resolver for the 'listLocations' field is defined with raise_on_error=True + @app.batch_resolver(field_name="listLocations", raise_on_error=True, aggregate=False) + def create_something(event: AppSyncResolverEvent) -> Optional[list]: # noqa AA03 VNE003 + raise RuntimeError + + # THEN the resolver should raise a RuntimeError when processing the batch of queries + with pytest.raises(RuntimeError): + app.resolve(event, LambdaContext()) + + +def test_async_resolve_batch_processing_with_raise_on_exception_one_at_time(): + # GIVEN a list of events representing GraphQL queries for listing locations + event = [ + { + "typeName": "Query", + "info": { + "fieldName": "listLocations", + "parentTypeName": "Post", + }, + "fieldName": "listLocations", + "arguments": {}, + "source": { + "id": "1", + }, + }, + { + "typeName": "Query", + "info": { + "fieldName": "listLocations", + "parentTypeName": "Post", + }, + "fieldName": "listLocations", + "arguments": {}, + "source": { + "id": "2", + }, + }, + { + "typeName": "Query", + "info": { + "fieldName": "listLocations", + "parentTypeName": "Post", + }, + "fieldName": "listLocations", + "arguments": {}, + "source": { + "id": [3, 4], + }, + }, + ] + + app = AppSyncResolver() + + # WHEN the async batch resolver for the 'listLocations' field is defined with raise_on_error=True + @app.async_batch_resolver(field_name="listLocations", raise_on_error=True, aggregate=False) + async def create_something(event: AppSyncResolverEvent) -> Optional[list]: # noqa AA03 VNE003 + raise RuntimeError + + # THEN the resolver should raise a RuntimeError when processing the batch of queries + with pytest.raises(RuntimeError): + app.resolve(event, LambdaContext()) + + +def test_resolve_batch_processing_without_exception_one_at_time(): + event = [ + { + "typeName": "Query", + "info": { + "fieldName": "listLocations", + "parentTypeName": "Post", + }, + "fieldName": "listLocations", + "arguments": {}, + "source": { + "id": "1", + }, + }, + { + "typeName": "Query", + "info": { + "fieldName": "listLocations", + "parentTypeName": "Post", + }, + "fieldName": "listLocations", + "arguments": {}, + "source": { + "id": "2", + }, + }, + { + "typeName": "Query", + "info": { + "fieldName": "listLocations", + "parentTypeName": "Post", + }, + "fieldName": "listLocations", + "arguments": {}, + "source": { + "id": [3, 4], + }, + }, + ] + + app = AppSyncResolver() + + @app.batch_resolver(field_name="listLocations", raise_on_error=False, aggregate=False) + def create_something(event: AppSyncResolverEvent) -> Optional[list]: # noqa AA03 VNE003 + raise RuntimeError + + # Call the implicit handler + result = app.resolve(event, LambdaContext()) + assert result == [None, None, None] + + assert app.current_batch_event and len(app.current_batch_event) == len(event) + assert not app.current_event + + +def test_resolve_async_batch_processing_without_exception_one_at_time(): + # GIVEN a list of events representing GraphQL queries for listing locations + event = [ + { + "typeName": "Query", + "info": { + "fieldName": "listLocations", + "parentTypeName": "Post", + }, + "fieldName": "listLocations", + "arguments": {}, + "source": { + "id": "1", + }, + }, + { + "typeName": "Query", + "info": { + "fieldName": "listLocations", + "parentTypeName": "Post", + }, + "fieldName": "listLocations", + "arguments": {}, + "source": { + "id": "2", + }, + }, + { + "typeName": "Query", + "info": { + "fieldName": "listLocations", + "parentTypeName": "Post", + }, + "fieldName": "listLocations", + "arguments": {}, + "source": { + "id": [3, 4], + }, + }, + ] + + app = AppSyncResolver() + + # WHEN the batch resolver for the 'listLocations' field is defined with raise_on_error=False + @app.async_batch_resolver(field_name="listLocations", raise_on_error=False, aggregate=False) + async def create_something(event: AppSyncResolverEvent) -> Optional[list]: # noqa AA03 VNE003 + raise RuntimeError + + result = app.resolve(event, LambdaContext()) + + # THEN the resolver should return None for each event in the batch + assert len(app.current_batch_event) == len(event) + assert result == [None, None, None] + + +def test_resolver_batch_with_resolver_not_found_one_at_time(): + # GIVEN a AppSyncResolver + app = AppSyncResolver() + router = Router() + + # WHEN we have an event + # WHEN the event field_name doesn't match with the resolver field_name + mock_event1 = [ + { + "typeName": "Query", + "info": { + "fieldName": "listCars", + "parentTypeName": "Query", + }, + "fieldName": "listCars", + "arguments": {"name": "value"}, + "source": { + "id": "1", + }, + }, + ] + + @router.batch_resolver(type_name="Query", field_name="listLocations", aggregate=False) + def get_locations(event: AppSyncResolverEvent, name: str) -> str: + return f"get_locations#{name}#" + event.source["id"] + + app.include_router(router) + + # THEN must fail with ResolverNotFoundError + with pytest.raises(ResolverNotFoundError, match="No resolver found for.*"): + app.resolve(mock_event1, LambdaContext()) + + +def test_resolver_batch_with_sync_and_async_resolver_at_same_time(): + # GIVEN a AppSyncResolver + app = AppSyncResolver() + router = Router() + + # WHEN we have an event + # WHEN the event field_name doesn't match with the resolver field_name + mock_event1 = [ + { + "typeName": "Query", + "info": { + "fieldName": "listCars", + "parentTypeName": "Query", + }, + "fieldName": "listCars", + "arguments": {"name": "value"}, + "source": { + "id": "1", + }, + }, + ] + + @router.batch_resolver(type_name="Query", field_name="listCars", aggregate=False) + def get_locations(event: AppSyncResolverEvent, name: str) -> str: + return f"get_locations#{name}#" + event.source["id"] + + @router.async_batch_resolver(type_name="Query", field_name="listCars", aggregate=False) + async def get_locations_async(event: AppSyncResolverEvent, name: str) -> str: + return f"get_locations#{name}#" + event.source["id"] + + app.include_router(router) + + # THEN must raise a PowertoolsUserWarning + with pytest.warns(PowertoolsUserWarning, match="Both synchronous and asynchronous resolvers*"): + app.resolve(mock_event1, LambdaContext()) + + +def test_batch_resolver_with_router(): + # GIVEN an AppSyncResolver and a Router instance + app = AppSyncResolver() + router = Router() + + @router.batch_resolver(type_name="Query", field_name="listLocations", aggregate=False) + def get_locations(event: AppSyncResolverEvent, name: str) -> str: + return f"get_locations#{name}#" + event.source["id"] + + @router.batch_resolver(field_name="listLocations2", aggregate=False) + def get_locations2(event: AppSyncResolverEvent, name: str) -> str: + return f"get_locations2#{name}#" + event.source["id"] + + # WHEN we include the routes + app.include_router(router) + + mock_event1 = [ + { + "typeName": "Query", + "info": { + "fieldName": "listLocations", + "parentTypeName": "Query", + }, + "fieldName": "listLocations", + "arguments": {"name": "value"}, + "source": { + "id": "1", + }, + }, + ] + mock_event2 = [ + { + "typeName": "Query", + "info": { + "fieldName": "listLocations2", + "parentTypeName": "Post", + }, + "fieldName": "listLocations2", + "arguments": {"name": "value"}, + "source": { + "id": "2", + }, + }, + ] + result1 = app.resolve(mock_event1, LambdaContext()) + result2 = app.resolve(mock_event2, LambdaContext()) + + # THEN the resolvers should return the expected results + assert result1 == ["get_locations#value#1"] + assert result2 == ["get_locations2#value#2"] + + +def test_resolve_async_batch_processing(): + # GIVEN a list of events representing GraphQL queries for listing locations + event = [ + { + "typeName": "Query", + "info": { + "fieldName": "listLocations", + "parentTypeName": "Post", + }, + "fieldName": "listLocations", + "arguments": {}, + "source": { + "id": "1", + }, + }, + { + "typeName": "Query", + "info": { + "fieldName": "listLocations", + "parentTypeName": "Post", + }, + "fieldName": "listLocations", + "arguments": {}, + "source": { + "id": "2", + }, + }, + { + "typeName": "Query", + "info": { + "fieldName": "listLocations", + "parentTypeName": "Post", + }, + "fieldName": "listLocations", + "arguments": {}, + "source": { + "id": [3, 4], + }, + }, + ] + + app = AppSyncResolver() + + # WHEN the async batch resolver for the 'listLocations' field is defined + @app.async_batch_resolver(field_name="listLocations", aggregate=False) + async def create_something(event: AppSyncResolverEvent) -> Optional[list]: + return event.source["id"] if event.source else None + + # THEN the resolver should correctly process the batch of queries asynchronously + result = app.resolve(event, LambdaContext()) + assert result == [appsync_event["source"]["id"] for appsync_event in event] + + assert app.current_batch_event and len(app.current_batch_event) == len(event) + + +def test_resolve_async_batch_and_sync_singular_processing(): + # GIVEN a router with an async batch resolver for 'listLocations' and a sync singular resolver for 'listLocation' + app = AppSyncResolver() + router = Router() + + @router.async_batch_resolver(type_name="Query", field_name="listLocations", aggregate=False) + async def get_locations(event: AppSyncResolverEvent, name: str) -> str: + return f"get_locations#{name}#" + event.source["id"] + + @app.resolver(type_name="Query", field_name="listLocation") + def get_location(name: str) -> str: + return f"get_location#{name}" + + app.include_router(router) + + # WHEN resolving a batch of events for async 'listLocations' and a singular event for 'listLocation' + mock_event1 = [ + { + "typeName": "Query", + "info": { + "fieldName": "listLocations", + "parentTypeName": "Query", + }, + "fieldName": "listLocations", + "arguments": {"name": "value"}, + "source": { + "id": "1", + }, + }, + ] + mock_event2 = {"typeName": "Query", "fieldName": "listLocation", "arguments": {"name": "value"}} + + result1 = app.resolve(mock_event1, LambdaContext()) + result2 = app.resolve(mock_event2, LambdaContext()) + + # THEN the resolvers should return the expected results + assert result1 == ["get_locations#value#1"] + assert result2 == "get_location#value" + + +def test_async_resolver_include_batch_resolver(): + # GIVEN an AppSyncResolver instance and a Router + app = AppSyncResolver() + router = Router() + + @router.async_batch_resolver(type_name="Query", field_name="listLocations", aggregate=False) + async def get_locations(event: AppSyncResolverEvent, name: str) -> str: + return f"get_locations#{name}#" + event.source["id"] + + @app.async_batch_resolver(field_name="listLocations2", aggregate=False) + async def get_locations2(event: AppSyncResolverEvent, name: str) -> str: + return f"get_locations2#{name}#" + event.source["id"] + + app.include_router(router) + + # WHEN two different events needs to be resolved + mock_event1 = [ + { + "typeName": "Query", + "info": { + "fieldName": "listLocations", + "parentTypeName": "Query", + }, + "fieldName": "listLocations", + "arguments": {"name": "value"}, + "source": { + "id": "1", + }, + }, + ] + mock_event2 = [ + { + "typeName": "Query", + "info": { + "fieldName": "listLocations2", + "parentTypeName": "Post", + }, + "fieldName": "listLocations2", + "arguments": {"name": "value"}, + "source": { + "id": "2", + }, + }, + ] + + # WHEN Resolve the events using the AppSyncResolver + result1 = app.resolve(mock_event1, LambdaContext()) + result2 = app.resolve(mock_event2, LambdaContext()) + + # THEN Verify that the results match the expected values + assert result1 == ["get_locations#value#1"] + assert result2 == ["get_locations2#value#2"] + + +def test_resolve_batch_processing_with_simple_queries_with_aggregate(): + # GIVEN a list of events representing GraphQL queries for listing locations + event = [ + { + "typeName": "Query", + "info": { + "fieldName": "listLocations", + "parentTypeName": "Post", + }, + "fieldName": "listLocations", + "arguments": {}, + "source": { + "id": "1", + }, + }, + { + "typeName": "Query", + "info": { + "fieldName": "listLocations", + "parentTypeName": "Post", + }, + "fieldName": "listLocations", + "arguments": {}, + "source": { + "id": "2", + }, + }, + { + "typeName": "Query", + "info": { + "fieldName": "listLocations", + "parentTypeName": "Post", + }, + "fieldName": "listLocations", + "arguments": {}, + "source": { + "id": [3, 4], + }, + }, + ] + + app = AppSyncResolver() + + # WHEN the sync batch resolver for the listLocations field is defined + # WHEN using an aggregated event + # WHEN function returns a List + @app.batch_resolver(field_name="listLocations") + def create_something(event: List[AppSyncResolverEvent]) -> List: # noqa AA03 VNE003 + results = [] + for record in event: + results.append(record.source.get("id") if record.source else None) + + return results + + # THEN the resolver should correctly process the batch of queries + result = app.resolve(event, LambdaContext()) + assert result == [appsync_event["source"]["id"] for appsync_event in event] + + assert app.current_batch_event and len(app.current_batch_event) == len(event) + assert not app.current_event + + +def test_resolve_async_batch_processing_with_simple_queries_with_aggregate(): + # GIVEN a list of events representing GraphQL queries for listing locations + event = [ + { + "typeName": "Query", + "info": { + "fieldName": "listLocations", + "parentTypeName": "Post", + }, + "fieldName": "listLocations", + "arguments": {}, + "source": { + "id": "1", + }, + }, + { + "typeName": "Query", + "info": { + "fieldName": "listLocations", + "parentTypeName": "Post", + }, + "fieldName": "listLocations", + "arguments": {}, + "source": { + "id": "2", + }, + }, + { + "typeName": "Query", + "info": { + "fieldName": "listLocations", + "parentTypeName": "Post", + }, + "fieldName": "listLocations", + "arguments": {}, + "source": { + "id": [3, 4], + }, + }, + ] + + app = AppSyncResolver() + + # WHEN the async batch resolver for the listLocations field is defined + # WHEN using an aggregated event + # WHEN function returns a List + @app.async_batch_resolver(field_name="listLocations") + async def create_something(event: List[AppSyncResolverEvent]) -> List: # noqa AA03 VNE003 + results = [] + for record in event: + results.append(record.source.get("id") if record.source else None) + + return results + + # THEN the resolver should correctly process the batch of queries + result = app.resolve(event, LambdaContext()) + assert result == [appsync_event["source"]["id"] for appsync_event in event] + + assert app.current_batch_event and len(app.current_batch_event) == len(event) + assert not app.current_event + + +def test_resolve_batch_processing_with_aggregate_and_returning_a_non_list(): + # GIVEN a list of events representing GraphQL queries for listing locations + event = [ + { + "typeName": "Query", + "info": { + "fieldName": "listLocations", + "parentTypeName": "Post", + }, + "fieldName": "listLocations", + "arguments": {}, + "source": { + "id": "1", + }, + }, + ] + + app = AppSyncResolver() + + # WHEN the sync batch resolver for the listLocations field is defined + # WHEN using an aggregated event + # WHEN function return something different than a List + @app.batch_resolver(field_name="listLocations") + def create_something(event: List[AppSyncResolverEvent]) -> Optional[List]: # noqa AA03 VNE003 + return event[0].source.get("id") if event[0].source else None + + # THEN the resolver should raise a InvalidBatchResponse when processing the batch of queries + with pytest.raises(InvalidBatchResponse): + app.resolve(event, LambdaContext()) + + +def test_resolve_async_batch_processing_with_aggregate_and_returning_a_non_list(): + # GIVEN a list of events representing GraphQL queries for listing locations + event = [ + { + "typeName": "Query", + "info": { + "fieldName": "listLocations", + "parentTypeName": "Post", + }, + "fieldName": "listLocations", + "arguments": {}, + "source": { + "id": "1", + }, + }, + ] + + app = AppSyncResolver() + + # WHEN the async batch resolver for the listLocations field is defined + # WHEN using an aggregated event + # WHEN function return something different than a List + @app.async_batch_resolver(field_name="listLocations") + async def create_something(event: List[AppSyncResolverEvent]) -> Optional[List]: # noqa AA03 VNE003 + return event[0].source.get("id") if event[0].source else None + + # THEN the resolver should raise a InvalidBatchResponse when processing the batch of queries + with pytest.raises(InvalidBatchResponse): + app.resolve(event, LambdaContext()) + + +def test_resolve_sync_batch_processing_with_aggregate_and_without_return(): + # GIVEN a list of events representing GraphQL queries for listing locations + event = [ + { + "typeName": "Query", + "info": { + "fieldName": "listLocations", + "parentTypeName": "Post", + }, + "fieldName": "listLocations", + "arguments": {}, + "source": { + "id": "1", + }, + }, + ] + + app = AppSyncResolver() + + # WHEN the sync batch resolver for the listLocations field is defined + # WHEN using an aggregated event + # WHEN function there is no return statement + @app.batch_resolver(field_name="listLocations") + def create_something(event: List[AppSyncResolverEvent]) -> Optional[List]: # noqa AA03 VNE003 + def do_something_with_post_id(post_id): ... + + post_id = event[0].source.get("id") if event[0].source else None + do_something_with_post_id(post_id) + + # No Return statement + + # THEN the resolver should raise a InvalidBatchResponse when processing the batch of queries + with pytest.raises(InvalidBatchResponse): + app.resolve(event, LambdaContext()) + + +def test_resolve_async_batch_processing_with_aggregate_and_without_return(): + # GIVEN a list of events representing GraphQL queries for listing locations + event = [ + { + "typeName": "Query", + "info": { + "fieldName": "listLocations", + "parentTypeName": "Post", + }, + "fieldName": "listLocations", + "arguments": {}, + "source": { + "id": "1", + }, + }, + ] + + app = AppSyncResolver() + + # WHEN the async batch resolver for the listLocations field is defined + # WHEN using an aggregated event + # WHEN function there is no return statement + @app.async_batch_resolver(field_name="listLocations") + async def create_something(event: List[AppSyncResolverEvent]) -> Optional[List]: # noqa AA03 VNE003 + def do_something_with_post_id(post_id): ... + + post_id = event[0].source.get("id") if event[0].source else None + do_something_with_post_id(post_id) + + # No Return statement + + # THEN the resolver should raise a InvalidBatchResponse when processing the batch of queries + with pytest.raises(InvalidBatchResponse): + app.resolve(event, LambdaContext()) diff --git a/tests/functional/event_handler/required_dependencies/test_appsync.py b/tests/functional/event_handler/required_dependencies/appsync/test_appsync_single_resolvers.py similarity index 97% rename from tests/functional/event_handler/required_dependencies/test_appsync.py rename to tests/functional/event_handler/required_dependencies/appsync/test_appsync_single_resolvers.py index 5699e560065..1ace266e731 100644 --- a/tests/functional/event_handler/required_dependencies/test_appsync.py +++ b/tests/functional/event_handler/required_dependencies/appsync/test_appsync_single_resolvers.py @@ -3,7 +3,7 @@ import pytest from aws_lambda_powertools.event_handler import AppSyncResolver -from aws_lambda_powertools.event_handler.appsync import Router +from aws_lambda_powertools.event_handler.graphql_appsync.router import Router from aws_lambda_powertools.utilities.data_classes import AppSyncResolverEvent from aws_lambda_powertools.utilities.typing import LambdaContext from tests.functional.utils import load_event @@ -169,11 +169,11 @@ def test_resolver_include_resolver(): @router.resolver(type_name="Query", field_name="listLocations") def get_locations(name: str): - return "get_locations#" + name + return f"get_locations#{name}" @app.resolver(field_name="listLocations2") def get_locations2(name: str): - return "get_locations2#" + name + return f"get_locations2#{name}" app.include_router(router) @@ -225,7 +225,7 @@ def test_router_has_access_to_app_context(): @router.resolver(type_name="Query", field_name="listLocations") def get_locations(name: str): - if router.context["is_admin"]: + if router.context.get("is_admin"): return f"get_locations#{name}" app.include_router(router) diff --git a/tests/unit/data_classes/required_dependencies/test_appsync_resolver_event.py b/tests/unit/data_classes/required_dependencies/test_appsync_resolver_event.py index a1a010c251a..ace6032e131 100644 --- a/tests/unit/data_classes/required_dependencies/test_appsync_resolver_event.py +++ b/tests/unit/data_classes/required_dependencies/test_appsync_resolver_event.py @@ -80,7 +80,7 @@ def test_appsync_resolver_direct(): raw_event = load_event("appSyncDirectResolver.json") parsed_event = AppSyncResolverEvent(raw_event) - assert parsed_event.source is None + assert parsed_event.source == {} assert parsed_event.arguments.get("id") == raw_event["arguments"]["id"] assert parsed_event.stash == {} assert parsed_event.prev_result is None @@ -112,7 +112,7 @@ def test_appsync_resolver_event_info(): event = AppSyncResolverEvent(event) - assert event.source is None + assert event.source == {} assert event.identity is None assert event.info is not None assert isinstance(event.info, AppSyncResolverEventInfo)