diff --git a/pkg/enqueue-bundle/DependencyInjection/EnqueueExtension.php b/pkg/enqueue-bundle/DependencyInjection/EnqueueExtension.php index 8e0fdc414..bcb2f7018 100644 --- a/pkg/enqueue-bundle/DependencyInjection/EnqueueExtension.php +++ b/pkg/enqueue-bundle/DependencyInjection/EnqueueExtension.php @@ -36,6 +36,8 @@ public function load(array $configs, ContainerBuilder $container): void $container->setParameter('enqueue.transports', array_keys($config['transport'])); if (isset($config['client'])) { + $container->setParameter('enqueue.clients', ['default']); + $this->setupAutowiringForProcessors($container); $loader->load('client.yml'); diff --git a/pkg/enqueue-bundle/EnqueueBundle.php b/pkg/enqueue-bundle/EnqueueBundle.php index a01ae5a1b..6db17e15d 100644 --- a/pkg/enqueue-bundle/EnqueueBundle.php +++ b/pkg/enqueue-bundle/EnqueueBundle.php @@ -27,13 +27,13 @@ public function build(ContainerBuilder $container): void $container->addCompilerPass(new BuildProcessorRegistryPass()); //client passes - $container->addCompilerPass(new BuildClientConsumptionExtensionsPass('default')); - $container->addCompilerPass(new BuildClientExtensionsPass('default')); - $container->addCompilerPass(new BuildClientTopicSubscriberRoutesPass('default'), PassConfig::TYPE_BEFORE_OPTIMIZATION, 100); - $container->addCompilerPass(new BuildClientCommandSubscriberRoutesPass('default'), PassConfig::TYPE_BEFORE_OPTIMIZATION, 100); - $container->addCompilerPass(new BuildClientProcessorRoutesPass('default'), PassConfig::TYPE_BEFORE_OPTIMIZATION, 100); - $container->addCompilerPass(new AnalyzeRouteCollectionPass('default'), PassConfig::TYPE_BEFORE_OPTIMIZATION, 30); - $container->addCompilerPass(new BuildClientProcessorRegistryPass('default')); + $container->addCompilerPass(new BuildClientConsumptionExtensionsPass()); + $container->addCompilerPass(new BuildClientExtensionsPass()); + $container->addCompilerPass(new BuildClientTopicSubscriberRoutesPass(), PassConfig::TYPE_BEFORE_OPTIMIZATION, 100); + $container->addCompilerPass(new BuildClientCommandSubscriberRoutesPass(), PassConfig::TYPE_BEFORE_OPTIMIZATION, 100); + $container->addCompilerPass(new BuildClientProcessorRoutesPass(), PassConfig::TYPE_BEFORE_OPTIMIZATION, 100); + $container->addCompilerPass(new AnalyzeRouteCollectionPass(), PassConfig::TYPE_BEFORE_OPTIMIZATION, 30); + $container->addCompilerPass(new BuildClientProcessorRegistryPass()); if (class_exists(AsyncEventDispatcherExtension::class)) { $container->addCompilerPass(new AsyncEventsPass(), PassConfig::TYPE_BEFORE_OPTIMIZATION, 100); diff --git a/pkg/enqueue-bundle/Tests/Unit/DependencyInjection/EnqueueExtensionTest.php b/pkg/enqueue-bundle/Tests/Unit/DependencyInjection/EnqueueExtensionTest.php index d1a483067..fda7924b4 100644 --- a/pkg/enqueue-bundle/Tests/Unit/DependencyInjection/EnqueueExtensionTest.php +++ b/pkg/enqueue-bundle/Tests/Unit/DependencyInjection/EnqueueExtensionTest.php @@ -461,6 +461,24 @@ public function testShouldSetPropertyWithAllConfiguredTransports() $this->assertEquals(['default', 'foo', 'bar'], $container->getParameter('enqueue.transports')); } + public function testShouldSetPropertyWithAllConfiguredClients() + { + $container = $this->getContainerBuilder(true); + + $extension = new EnqueueExtension(); + $extension->load([[ + 'client' => [], + 'transport' => [ + 'default' => ['dsn' => 'default:'], + 'foo' => ['dsn' => 'foo:'], + 'bar' => ['dsn' => 'foo:'], + ], + ]], $container); + + $this->assertTrue($container->hasParameter('enqueue.clients')); + $this->assertEquals(['default'], $container->getParameter('enqueue.clients')); + } + public function testShouldLoadProcessAutoconfigureChildDefinition() { $container = $this->getContainerBuilder(true); diff --git a/pkg/enqueue/Symfony/Client/DependencyInjection/AnalyzeRouteCollectionPass.php b/pkg/enqueue/Symfony/Client/DependencyInjection/AnalyzeRouteCollectionPass.php index c8b140cfd..234c7b48b 100644 --- a/pkg/enqueue/Symfony/Client/DependencyInjection/AnalyzeRouteCollectionPass.php +++ b/pkg/enqueue/Symfony/Client/DependencyInjection/AnalyzeRouteCollectionPass.php @@ -8,31 +8,35 @@ final class AnalyzeRouteCollectionPass implements CompilerPassInterface { - /** - * @var string - */ - private $name; + use FormatClientNameTrait; - public function __construct(string $clientName) - { - if (empty($clientName)) { - throw new \InvalidArgumentException('The name could not be empty.'); - } - - $this->name = $clientName; - } + protected $name; public function process(ContainerBuilder $container): void { - $routeCollectionId = sprintf('enqueue.client.%s.route_collection', $this->name); - if (false == $container->hasDefinition($routeCollectionId)) { - return; + if (false == $container->hasParameter('enqueue.clients')) { + throw new \LogicException('The "enqueue.clients" parameter must be set.'); } - $collection = RouteCollection::fromArray($container->getDefinition($routeCollectionId)->getArgument(0)); + $names = $container->getParameter('enqueue.clients'); + + foreach ($names as $name) { + $this->name = $name; + $routeCollectionId = $this->format('route_collection'); + if (false == $container->hasDefinition($routeCollectionId)) { + throw new \LogicException(sprintf('Service "%s" not found', $routeCollectionId)); + } - $this->exclusiveCommandsCouldNotBeRunOnDefaultQueue($collection); - $this->exclusiveCommandProcessorMustBeSingleOnGivenQueue($collection); + $collection = RouteCollection::fromArray($container->getDefinition($routeCollectionId)->getArgument(0)); + + $this->exclusiveCommandsCouldNotBeRunOnDefaultQueue($collection); + $this->exclusiveCommandProcessorMustBeSingleOnGivenQueue($collection); + } + } + + protected function getName(): string + { + return $this->name; } private function exclusiveCommandsCouldNotBeRunOnDefaultQueue(RouteCollection $collection) diff --git a/pkg/enqueue/Symfony/Client/DependencyInjection/BuildClientExtensionsPass.php b/pkg/enqueue/Symfony/Client/DependencyInjection/BuildClientExtensionsPass.php index 5cb516db1..3f97093d8 100644 --- a/pkg/enqueue/Symfony/Client/DependencyInjection/BuildClientExtensionsPass.php +++ b/pkg/enqueue/Symfony/Client/DependencyInjection/BuildClientExtensionsPass.php @@ -8,58 +8,62 @@ final class BuildClientExtensionsPass implements CompilerPassInterface { - /** - * @var string - */ - private $name; + use FormatClientNameTrait; - public function __construct(string $clientName) - { - if (empty($clientName)) { - throw new \InvalidArgumentException('The name could not be empty.'); - } - - $this->name = $clientName; - } + protected $name; public function process(ContainerBuilder $container): void { - $extensionsId = sprintf('enqueue.client.%s.client_extensions', $this->name); - if (false == $container->hasDefinition($extensionsId)) { - return; + if (false == $container->hasParameter('enqueue.clients')) { + throw new \LogicException('The "enqueue.clients" parameter must be set.'); } - $tags = array_merge( - $container->findTaggedServiceIds('enqueue.client_extension'), - $container->findTaggedServiceIds('enqueue.client.extension') // TODO BC - ); + $names = $container->getParameter('enqueue.clients'); - $groupByPriority = []; - foreach ($tags as $serviceId => $tagAttributes) { - foreach ($tagAttributes as $tagAttribute) { - $client = $tagAttribute['client'] ?? 'default'; + foreach ($names as $name) { + $this->name = $name; + $extensionsId = $this->format('client_extensions'); + if (false == $container->hasDefinition($extensionsId)) { + throw new \LogicException(sprintf('Service "%s" not found', $extensionsId)); + } - if ($client !== $this->name && 'all' !== $client) { - continue; - } + $tags = array_merge( + $container->findTaggedServiceIds('enqueue.client_extension'), + $container->findTaggedServiceIds('enqueue.client.extension') // TODO BC + ); + + $groupByPriority = []; + foreach ($tags as $serviceId => $tagAttributes) { + foreach ($tagAttributes as $tagAttribute) { + $client = $tagAttribute['client'] ?? 'default'; - $priority = (int) ($tagAttribute['priority'] ?? 0); + if ($client !== $this->name && 'all' !== $client) { + continue; + } - $groupByPriority[$priority][] = new Reference($serviceId); + $priority = (int) ($tagAttribute['priority'] ?? 0); + + $groupByPriority[$priority][] = new Reference($serviceId); + } } - } - krsort($groupByPriority, SORT_NUMERIC); + krsort($groupByPriority, SORT_NUMERIC); - $flatExtensions = []; - foreach ($groupByPriority as $extension) { - $flatExtensions = array_merge($flatExtensions, $extension); + $flatExtensions = []; + foreach ($groupByPriority as $extension) { + $flatExtensions = array_merge($flatExtensions, $extension); + } + + $extensionsService = $container->getDefinition($extensionsId); + $extensionsService->replaceArgument(0, array_merge( + $extensionsService->getArgument(0), + $flatExtensions + )); } + } - $extensionsService = $container->getDefinition($extensionsId); - $extensionsService->replaceArgument(0, array_merge( - $extensionsService->getArgument(0), - $flatExtensions - )); + protected function getName(): string + { + return $this->name; } } diff --git a/pkg/enqueue/Symfony/Client/DependencyInjection/BuildCommandSubscriberRoutesPass.php b/pkg/enqueue/Symfony/Client/DependencyInjection/BuildCommandSubscriberRoutesPass.php index 3e8791a47..6cc9934b0 100644 --- a/pkg/enqueue/Symfony/Client/DependencyInjection/BuildCommandSubscriberRoutesPass.php +++ b/pkg/enqueue/Symfony/Client/DependencyInjection/BuildCommandSubscriberRoutesPass.php @@ -10,98 +10,102 @@ final class BuildCommandSubscriberRoutesPass implements CompilerPassInterface { - /** - * @var string - */ - private $name; + use FormatClientNameTrait; - public function __construct(string $clientName) - { - if (empty($clientName)) { - throw new \InvalidArgumentException('The name could not be empty.'); - } - - $this->name = $clientName; - } + protected $name; public function process(ContainerBuilder $container): void { - $routeCollectionId = sprintf('enqueue.client.%s.route_collection', $this->name); - if (false == $container->hasDefinition($routeCollectionId)) { - return; + if (false == $container->hasParameter('enqueue.clients')) { + throw new \LogicException('The "enqueue.clients" parameter must be set.'); } - $tag = 'enqueue.command_subscriber'; - $routeCollection = new RouteCollection([]); - foreach ($container->findTaggedServiceIds($tag) as $serviceId => $tagAttributes) { - $processorDefinition = $container->getDefinition($serviceId); - if ($processorDefinition->getFactory()) { - throw new \LogicException('The command subscriber tag could not be applied to a service created by factory.'); - } + $names = $container->getParameter('enqueue.clients'); - $processorClass = $processorDefinition->getClass(); - if (false == class_exists($processorClass)) { - throw new \LogicException(sprintf('The processor class "%s" could not be found.', $processorClass)); + foreach ($names as $name) { + $this->name = $name; + $routeCollectionId = sprintf('enqueue.client.%s.route_collection', $this->name); + if (false == $container->hasDefinition($routeCollectionId)) { + throw new \LogicException(sprintf('Service "%s" not found', $routeCollectionId)); } - if (false == is_subclass_of($processorClass, CommandSubscriberInterface::class)) { - throw new \LogicException(sprintf('The processor must implement "%s" interface to be used with the tag "%s"', CommandSubscriberInterface::class, $tag)); - } + $tag = 'enqueue.command_subscriber'; + $routeCollection = new RouteCollection([]); + foreach ($container->findTaggedServiceIds($tag) as $serviceId => $tagAttributes) { + $processorDefinition = $container->getDefinition($serviceId); + if ($processorDefinition->getFactory()) { + throw new \LogicException('The command subscriber tag could not be applied to a service created by factory.'); + } - foreach ($tagAttributes as $tagAttribute) { - $client = $tagAttribute['client'] ?? 'default'; + $processorClass = $processorDefinition->getClass(); + if (false == class_exists($processorClass)) { + throw new \LogicException(sprintf('The processor class "%s" could not be found.', $processorClass)); + } - if ($client !== $this->name && 'all' !== $client) { - continue; + if (false == is_subclass_of($processorClass, CommandSubscriberInterface::class)) { + throw new \LogicException(sprintf('The processor must implement "%s" interface to be used with the tag "%s"', CommandSubscriberInterface::class, $tag)); } - /** @var CommandSubscriberInterface $processorClass */ - $commands = $processorClass::getSubscribedCommand(); + foreach ($tagAttributes as $tagAttribute) { + $client = $tagAttribute['client'] ?? 'default'; - if (empty($commands)) { - throw new \LogicException('Command subscriber must return something.'); - } + if ($client !== $this->name && 'all' !== $client) { + continue; + } - if (is_string($commands)) { - $commands = [$commands]; - } + /** @var CommandSubscriberInterface $processorClass */ + $commands = $processorClass::getSubscribedCommand(); - if (!is_array($commands)) { - throw new \LogicException('Command subscriber configuration is invalid. Should be an array or string.'); - } + if (empty($commands)) { + throw new \LogicException('Command subscriber must return something.'); + } - if (isset($commands['command'])) { - $commands = [$commands]; - } + if (is_string($commands)) { + $commands = [$commands]; + } + + if (!is_array($commands)) { + throw new \LogicException('Command subscriber configuration is invalid. Should be an array or string.'); + } + + if (isset($commands['command'])) { + $commands = [$commands]; + } - foreach ($commands as $key => $params) { - if (is_string($params)) { - $routeCollection->add(new Route($params, Route::COMMAND, $serviceId, ['processor_service_id' => $serviceId])); - } elseif (is_array($params)) { - $source = $params['command'] ?? null; - $processor = $params['processor'] ?? $serviceId; - unset($params['command'], $params['source'], $params['source_type'], $params['processor'], $params['options']); - $options = $params; - $options['processor_service_id'] = $serviceId; - - $routeCollection->add(new Route($source, Route::COMMAND, $processor, $options)); - } else { - throw new \LogicException(sprintf( - 'Command subscriber configuration is invalid for "%s::getSubscribedCommand()". "%s"', - $processorClass, - json_encode($processorClass::getSubscribedCommand()) - )); + foreach ($commands as $key => $params) { + if (is_string($params)) { + $routeCollection->add(new Route($params, Route::COMMAND, $serviceId, ['processor_service_id' => $serviceId])); + } elseif (is_array($params)) { + $source = $params['command'] ?? null; + $processor = $params['processor'] ?? $serviceId; + unset($params['command'], $params['source'], $params['source_type'], $params['processor'], $params['options']); + $options = $params; + $options['processor_service_id'] = $serviceId; + + $routeCollection->add(new Route($source, Route::COMMAND, $processor, $options)); + } else { + throw new \LogicException(sprintf( + 'Command subscriber configuration is invalid for "%s::getSubscribedCommand()". "%s"', + $processorClass, + json_encode($processorClass::getSubscribedCommand()) + )); + } } } } - } - $rawRoutes = $routeCollection->toArray(); + $rawRoutes = $routeCollection->toArray(); + + $routeCollectionService = $container->getDefinition($routeCollectionId); + $routeCollectionService->replaceArgument(0, array_merge( + $routeCollectionService->getArgument(0), + $rawRoutes + )); + } + } - $routeCollectionService = $container->getDefinition($routeCollectionId); - $routeCollectionService->replaceArgument(0, array_merge( - $routeCollectionService->getArgument(0), - $rawRoutes - )); + protected function getName(): string + { + return $this->name; } } diff --git a/pkg/enqueue/Symfony/Client/DependencyInjection/BuildConsumptionExtensionsPass.php b/pkg/enqueue/Symfony/Client/DependencyInjection/BuildConsumptionExtensionsPass.php index 8482225f2..d05f6f09e 100644 --- a/pkg/enqueue/Symfony/Client/DependencyInjection/BuildConsumptionExtensionsPass.php +++ b/pkg/enqueue/Symfony/Client/DependencyInjection/BuildConsumptionExtensionsPass.php @@ -8,58 +8,63 @@ final class BuildConsumptionExtensionsPass implements CompilerPassInterface { - /** - * @var string - */ - private $name; + use FormatClientNameTrait; - public function __construct(string $clientName) - { - if (empty($clientName)) { - throw new \InvalidArgumentException('The name could not be empty.'); - } - - $this->name = $clientName; - } + protected $name; public function process(ContainerBuilder $container): void { - $extensionsId = sprintf('enqueue.client.%s.consumption_extensions', $this->name); - if (false == $container->hasDefinition($extensionsId)) { - return; + if (false == $container->hasParameter('enqueue.clients')) { + throw new \LogicException('The "enqueue.clients" parameter must be set.'); } - $tags = array_merge( - $container->findTaggedServiceIds('enqueue.consumption_extension'), - $container->findTaggedServiceIds('enqueue.consumption.extension') // TODO BC - ); + $names = $container->getParameter('enqueue.clients'); - $groupByPriority = []; - foreach ($tags as $serviceId => $tagAttributes) { - foreach ($tagAttributes as $tagAttribute) { - $client = $tagAttribute['client'] ?? 'default'; + foreach ($names as $name) { + $this->name = $name; - if ($client !== $this->name && 'all' !== $client) { - continue; - } + $extensionsId = $this->format('consumption_extensions'); + if (false == $container->hasDefinition($extensionsId)) { + throw new \LogicException(sprintf('Service "%s" not found', $extensionsId)); + } + + $tags = array_merge( + $container->findTaggedServiceIds('enqueue.consumption_extension'), + $container->findTaggedServiceIds('enqueue.consumption.extension') // TODO BC + ); + + $groupByPriority = []; + foreach ($tags as $serviceId => $tagAttributes) { + foreach ($tagAttributes as $tagAttribute) { + $client = $tagAttribute['client'] ?? 'default'; - $priority = (int) ($tagAttribute['priority'] ?? 0); + if ($client !== $this->name && 'all' !== $client) { + continue; + } - $groupByPriority[$priority][] = new Reference($serviceId); + $priority = (int) ($tagAttribute['priority'] ?? 0); + + $groupByPriority[$priority][] = new Reference($serviceId); + } } - } - krsort($groupByPriority, SORT_NUMERIC); + krsort($groupByPriority, SORT_NUMERIC); - $flatExtensions = []; - foreach ($groupByPriority as $extension) { - $flatExtensions = array_merge($flatExtensions, $extension); + $flatExtensions = []; + foreach ($groupByPriority as $extension) { + $flatExtensions = array_merge($flatExtensions, $extension); + } + + $extensionsService = $container->getDefinition($extensionsId); + $extensionsService->replaceArgument(0, array_merge( + $extensionsService->getArgument(0), + $flatExtensions + )); } + } - $extensionsService = $container->getDefinition($extensionsId); - $extensionsService->replaceArgument(0, array_merge( - $extensionsService->getArgument(0), - $flatExtensions - )); + protected function getName(): string + { + return $this->name; } } diff --git a/pkg/enqueue/Symfony/Client/DependencyInjection/BuildProcessorRegistryPass.php b/pkg/enqueue/Symfony/Client/DependencyInjection/BuildProcessorRegistryPass.php index d69935d64..cdf40a832 100644 --- a/pkg/enqueue/Symfony/Client/DependencyInjection/BuildProcessorRegistryPass.php +++ b/pkg/enqueue/Symfony/Client/DependencyInjection/BuildProcessorRegistryPass.php @@ -10,51 +10,56 @@ final class BuildProcessorRegistryPass implements CompilerPassInterface { - /** - * @var string - */ - private $name; + use FormatClientNameTrait; - public function __construct(string $clientName) - { - if (empty($clientName)) { - throw new \InvalidArgumentException('The name could not be empty.'); - } - - $this->name = $clientName; - } + protected $name; public function process(ContainerBuilder $container): void { - $processorRegistryId = sprintf('enqueue.client.%s.processor_registry', $this->name); - if (false == $container->hasDefinition($processorRegistryId)) { - return; + if (false == $container->hasParameter('enqueue.clients')) { + throw new \LogicException('The "enqueue.clients" parameter must be set.'); } - $routeCollectionId = sprintf('enqueue.client.%s.route_collection', $this->name); - if (false == $container->hasDefinition($routeCollectionId)) { - return; - } + $names = $container->getParameter('enqueue.clients'); - $routerProcessorId = sprintf('enqueue.client.%s.router_processor', $this->name); - if (false == $container->hasDefinition($routerProcessorId)) { - return; - } + foreach ($names as $name) { + $this->name = $name; - $routeCollection = RouteCollection::fromArray($container->getDefinition($routeCollectionId)->getArgument(0)); + $processorRegistryId = $this->format('processor_registry'); + if (false == $container->hasDefinition($processorRegistryId)) { + throw new \LogicException(sprintf('Service "%s" not found', $processorRegistryId)); + } - $map = []; - foreach ($routeCollection->all() as $route) { - if (false == $processorServiceId = $route->getOption('processor_service_id')) { - throw new \LogicException('The route option "processor_service_id" is required'); + $routeCollectionId = $this->format('route_collection'); + if (false == $container->hasDefinition($routeCollectionId)) { + throw new \LogicException(sprintf('Service "%s" not found', $routeCollectionId)); } - $map[$route->getProcessor()] = new Reference($processorServiceId); - } + $routerProcessorId = $this->format('router_processor'); + if (false == $container->hasDefinition($routerProcessorId)) { + throw new \LogicException(sprintf('Service "%s" not found', $routerProcessorId)); + } - $map["%enqueue.client.{$this->name}.router_processor%"] = new Reference($routerProcessorId); + $routeCollection = RouteCollection::fromArray($container->getDefinition($routeCollectionId)->getArgument(0)); - $registry = $container->getDefinition($processorRegistryId); - $registry->setArgument(0, ServiceLocatorTagPass::register($container, $map, $processorRegistryId)); + $map = []; + foreach ($routeCollection->all() as $route) { + if (false == $processorServiceId = $route->getOption('processor_service_id')) { + throw new \LogicException('The route option "processor_service_id" is required'); + } + + $map[$route->getProcessor()] = new Reference($processorServiceId); + } + + $map[$this->parameter('router_processor')] = new Reference($routerProcessorId); + + $registry = $container->getDefinition($processorRegistryId); + $registry->setArgument(0, ServiceLocatorTagPass::register($container, $map, $processorRegistryId)); + } + } + + private function getName(): string + { + return $this->name; } } diff --git a/pkg/enqueue/Symfony/Client/DependencyInjection/BuildProcessorRoutesPass.php b/pkg/enqueue/Symfony/Client/DependencyInjection/BuildProcessorRoutesPass.php index 415b87a92..c89f37427 100644 --- a/pkg/enqueue/Symfony/Client/DependencyInjection/BuildProcessorRoutesPass.php +++ b/pkg/enqueue/Symfony/Client/DependencyInjection/BuildProcessorRoutesPass.php @@ -9,72 +9,76 @@ final class BuildProcessorRoutesPass implements CompilerPassInterface { - /** - * @var string - */ - private $name; + use FormatClientNameTrait; - public function __construct(string $clientName) - { - if (empty($clientName)) { - throw new \InvalidArgumentException('The name could not be empty.'); - } - - $this->name = $clientName; - } + protected $name; public function process(ContainerBuilder $container): void { - $routeCollectionId = sprintf('enqueue.client.%s.route_collection', $this->name); - if (false == $container->hasDefinition($routeCollectionId)) { - return; + if (false == $container->hasParameter('enqueue.clients')) { + throw new \LogicException('The "enqueue.clients" parameter must be set.'); } - $tag = 'enqueue.processor'; - $routeCollection = new RouteCollection([]); - foreach ($container->findTaggedServiceIds($tag) as $serviceId => $tagAttributes) { - foreach ($tagAttributes as $tagAttribute) { - $client = $tagAttribute['client'] ?? 'default'; + $names = $container->getParameter('enqueue.clients'); - if ($client !== $this->name && 'all' !== $client) { - continue; - } + foreach ($names as $name) { + $this->name = $name; + $routeCollectionId = $this->format('route_collection'); + if (false == $container->hasDefinition($routeCollectionId)) { + throw new \LogicException(sprintf('Service "%s" not found', $routeCollectionId)); + } - $topic = $tagAttribute['topic'] ?? null; - $command = $tagAttribute['command'] ?? null; + $tag = 'enqueue.processor'; + $routeCollection = new RouteCollection([]); + foreach ($container->findTaggedServiceIds($tag) as $serviceId => $tagAttributes) { + foreach ($tagAttributes as $tagAttribute) { + $client = $tagAttribute['client'] ?? 'default'; - if (false == $topic && false == $command) { - throw new \LogicException(sprintf('Either "topic" or "command" tag attribute must be set on service "%s". None is set.', $serviceId)); - } - if ($topic && $command) { - throw new \LogicException(sprintf('Either "topic" or "command" tag attribute must be set on service "%s". Both are set.', $serviceId)); - } + if ($client !== $this->name && 'all' !== $client) { + continue; + } + + $topic = $tagAttribute['topic'] ?? null; + $command = $tagAttribute['command'] ?? null; - $source = $command ?: $topic; - $sourceType = $command ? Route::COMMAND : Route::TOPIC; - $processor = $tagAttribute['processor'] ?? $serviceId; - - unset( - $tagAttribute['topic'], - $tagAttribute['command'], - $tagAttribute['source'], - $tagAttribute['source_type'], - $tagAttribute['processor'], - $tagAttribute['options'] - ); - $options = $tagAttribute; - $options['processor_service_id'] = $serviceId; - - $routeCollection->add(new Route($source, $sourceType, $processor, $options)); + if (false == $topic && false == $command) { + throw new \LogicException(sprintf('Either "topic" or "command" tag attribute must be set on service "%s". None is set.', $serviceId)); + } + if ($topic && $command) { + throw new \LogicException(sprintf('Either "topic" or "command" tag attribute must be set on service "%s". Both are set.', $serviceId)); + } + + $source = $command ?: $topic; + $sourceType = $command ? Route::COMMAND : Route::TOPIC; + $processor = $tagAttribute['processor'] ?? $serviceId; + + unset( + $tagAttribute['topic'], + $tagAttribute['command'], + $tagAttribute['source'], + $tagAttribute['source_type'], + $tagAttribute['processor'], + $tagAttribute['options'] + ); + $options = $tagAttribute; + $options['processor_service_id'] = $serviceId; + + $routeCollection->add(new Route($source, $sourceType, $processor, $options)); + } } - } - $rawRoutes = $routeCollection->toArray(); + $rawRoutes = $routeCollection->toArray(); - $routeCollectionService = $container->getDefinition($routeCollectionId); - $routeCollectionService->replaceArgument(0, array_merge( - $routeCollectionService->getArgument(0), - $rawRoutes - )); + $routeCollectionService = $container->getDefinition($routeCollectionId); + $routeCollectionService->replaceArgument(0, array_merge( + $routeCollectionService->getArgument(0), + $rawRoutes + )); + } + } + + protected function getName(): string + { + return $this->name; } } diff --git a/pkg/enqueue/Symfony/Client/DependencyInjection/BuildTopicSubscriberRoutesPass.php b/pkg/enqueue/Symfony/Client/DependencyInjection/BuildTopicSubscriberRoutesPass.php index 73f3a019c..f1b9ecc1c 100644 --- a/pkg/enqueue/Symfony/Client/DependencyInjection/BuildTopicSubscriberRoutesPass.php +++ b/pkg/enqueue/Symfony/Client/DependencyInjection/BuildTopicSubscriberRoutesPass.php @@ -10,94 +10,98 @@ final class BuildTopicSubscriberRoutesPass implements CompilerPassInterface { - /** - * @var string - */ - private $name; + use FormatClientNameTrait; - public function __construct(string $clientName) - { - if (empty($clientName)) { - throw new \InvalidArgumentException('The name could not be empty.'); - } - - $this->name = $clientName; - } + protected $name; public function process(ContainerBuilder $container): void { - $routeCollectionId = sprintf('enqueue.client.%s.route_collection', $this->name); - if (false == $container->hasDefinition($routeCollectionId)) { - return; + if (false == $container->hasParameter('enqueue.clients')) { + throw new \LogicException('The "enqueue.clients" parameter must be set.'); } - $tag = 'enqueue.topic_subscriber'; - $routeCollection = new RouteCollection([]); - foreach ($container->findTaggedServiceIds($tag) as $serviceId => $tagAttributes) { - $processorDefinition = $container->getDefinition($serviceId); - if ($processorDefinition->getFactory()) { - throw new \LogicException('The topic subscriber tag could not be applied to a service created by factory.'); - } + $names = $container->getParameter('enqueue.clients'); - $processorClass = $processorDefinition->getClass(); - if (false == class_exists($processorClass)) { - throw new \LogicException(sprintf('The processor class "%s" could not be found.', $processorClass)); + foreach ($names as $name) { + $this->name = $name; + $routeCollectionId = $this->format('route_collection'); + if (false == $container->hasDefinition($routeCollectionId)) { + throw new \LogicException(sprintf('Service "%s" not found', $routeCollectionId)); } - if (false == is_subclass_of($processorClass, TopicSubscriberInterface::class)) { - throw new \LogicException(sprintf('The processor must implement "%s" interface to be used with the tag "%s"', TopicSubscriberInterface::class, $tag)); - } + $tag = 'enqueue.topic_subscriber'; + $routeCollection = new RouteCollection([]); + foreach ($container->findTaggedServiceIds($tag) as $serviceId => $tagAttributes) { + $processorDefinition = $container->getDefinition($serviceId); + if ($processorDefinition->getFactory()) { + throw new \LogicException('The topic subscriber tag could not be applied to a service created by factory.'); + } - foreach ($tagAttributes as $tagAttribute) { - $client = $tagAttribute['client'] ?? 'default'; + $processorClass = $processorDefinition->getClass(); + if (false == class_exists($processorClass)) { + throw new \LogicException(sprintf('The processor class "%s" could not be found.', $processorClass)); + } - if ($client !== $this->name && 'all' !== $client) { - continue; + if (false == is_subclass_of($processorClass, TopicSubscriberInterface::class)) { + throw new \LogicException(sprintf('The processor must implement "%s" interface to be used with the tag "%s"', TopicSubscriberInterface::class, $tag)); } - /** @var TopicSubscriberInterface $processorClass */ - $topics = $processorClass::getSubscribedTopics(); + foreach ($tagAttributes as $tagAttribute) { + $client = $tagAttribute['client'] ?? 'default'; - if (empty($topics)) { - throw new \LogicException('Topic subscriber must return something.'); - } + if ($client !== $this->name && 'all' !== $client) { + continue; + } - if (is_string($topics)) { - $topics = [$topics]; - } + /** @var TopicSubscriberInterface $processorClass */ + $topics = $processorClass::getSubscribedTopics(); - if (!is_array($topics)) { - throw new \LogicException('Topic subscriber configuration is invalid. Should be an array or string.'); - } + if (empty($topics)) { + throw new \LogicException('Topic subscriber must return something.'); + } + + if (is_string($topics)) { + $topics = [$topics]; + } + + if (!is_array($topics)) { + throw new \LogicException('Topic subscriber configuration is invalid. Should be an array or string.'); + } - foreach ($topics as $key => $params) { - if (is_string($params)) { - $routeCollection->add(new Route($params, Route::TOPIC, $serviceId, ['processor_service_id' => $serviceId])); - } elseif (is_array($params)) { - $source = $params['topic'] ?? null; - $processor = $params['processor'] ?? $serviceId; - unset($params['topic'], $params['source'], $params['source_type'], $params['processor'], $params['options']); - $options = $params; - $options['processor_service_id'] = $serviceId; - - $routeCollection->add(new Route($source, Route::TOPIC, $processor, $options)); - } else { - throw new \LogicException(sprintf( - 'Topic subscriber configuration is invalid for "%s::getSubscribedTopics()". Got "%s"', - $processorClass, - json_encode($processorClass::getSubscribedTopics()) - )); + foreach ($topics as $key => $params) { + if (is_string($params)) { + $routeCollection->add(new Route($params, Route::TOPIC, $serviceId, ['processor_service_id' => $serviceId])); + } elseif (is_array($params)) { + $source = $params['topic'] ?? null; + $processor = $params['processor'] ?? $serviceId; + unset($params['topic'], $params['source'], $params['source_type'], $params['processor'], $params['options']); + $options = $params; + $options['processor_service_id'] = $serviceId; + + $routeCollection->add(new Route($source, Route::TOPIC, $processor, $options)); + } else { + throw new \LogicException(sprintf( + 'Topic subscriber configuration is invalid for "%s::getSubscribedTopics()". Got "%s"', + $processorClass, + json_encode($processorClass::getSubscribedTopics()) + )); + } } } } - } - $rawRoutes = $routeCollection->toArray(); + $rawRoutes = $routeCollection->toArray(); - $routeCollectionService = $container->getDefinition($routeCollectionId); - $routeCollectionService->replaceArgument(0, array_merge( - $routeCollectionService->getArgument(0), - $rawRoutes - )); + $routeCollectionService = $container->getDefinition($routeCollectionId); + $routeCollectionService->replaceArgument(0, array_merge( + $routeCollectionService->getArgument(0), + $rawRoutes + )); + } + } + + protected function getName(): string + { + return $this->name; } } diff --git a/pkg/enqueue/Symfony/Client/DependencyInjection/FormatClientNameTrait.php b/pkg/enqueue/Symfony/Client/DependencyInjection/FormatClientNameTrait.php index 6f76d2b5b..41dace8bb 100644 --- a/pkg/enqueue/Symfony/Client/DependencyInjection/FormatClientNameTrait.php +++ b/pkg/enqueue/Symfony/Client/DependencyInjection/FormatClientNameTrait.php @@ -16,17 +16,15 @@ private function reference(string $serviceName, $invalidBehavior = ContainerInte private function parameter(string $serviceName): string { - $fullName = $this->format($serviceName, false); + $fullName = $this->format($serviceName); return "%$fullName%"; } - private function format(string $serviceName, $parameter = false): string + private function format(string $serviceName): string { $pattern = 'enqueue.client.%s.'.$serviceName; - $fullName = sprintf($pattern, $this->getName()); - - return $parameter ? "%$fullName%" : $fullName; + return sprintf($pattern, $this->getName()); } } diff --git a/pkg/enqueue/Symfony/DependencyInjection/FormatTransportNameTrait.php b/pkg/enqueue/Symfony/DependencyInjection/FormatTransportNameTrait.php index e5531b579..dd0fdd7bc 100644 --- a/pkg/enqueue/Symfony/DependencyInjection/FormatTransportNameTrait.php +++ b/pkg/enqueue/Symfony/DependencyInjection/FormatTransportNameTrait.php @@ -16,17 +16,15 @@ private function reference(string $serviceName, $invalidBehavior = ContainerInte private function parameter(string $serviceName): string { - $fullName = $this->format($serviceName, false); + $fullName = $this->format($serviceName); return "%$fullName%"; } - private function format(string $serviceName, $parameter = false): string + private function format(string $serviceName): string { $pattern = 'enqueue.transport.%s.'.$serviceName; - $fullName = sprintf($pattern, $this->getName()); - - return $parameter ? "%$fullName%" : $fullName; + return sprintf($pattern, $this->getName()); } } diff --git a/pkg/enqueue/Symfony/DependencyInjection/TransportFactory.php b/pkg/enqueue/Symfony/DependencyInjection/TransportFactory.php index f70b1cb5d..19712ff8b 100644 --- a/pkg/enqueue/Symfony/DependencyInjection/TransportFactory.php +++ b/pkg/enqueue/Symfony/DependencyInjection/TransportFactory.php @@ -182,7 +182,7 @@ public function buildQueueConsumer(ContainerBuilder $container, array $config): ->addArgument(new Reference($this->format('consumption_extensions'))) ->addArgument([]) ->addArgument(new Reference('logger', ContainerInterface::NULL_ON_INVALID_REFERENCE)) - ->addArgument($this->format('receive_timeout', true)) + ->addArgument($this->parameter('receive_timeout')) ; $container->register($this->format('processor_registry'), ContainerProcessorRegistry::class); diff --git a/pkg/enqueue/Tests/Symfony/Client/DependencyInjection/AnalyzeRouteCollectionPassTest.php b/pkg/enqueue/Tests/Symfony/Client/DependencyInjection/AnalyzeRouteCollectionPassTest.php index c2f7386af..9e4dfde81 100644 --- a/pkg/enqueue/Tests/Symfony/Client/DependencyInjection/AnalyzeRouteCollectionPassTest.php +++ b/pkg/enqueue/Tests/Symfony/Client/DependencyInjection/AnalyzeRouteCollectionPassTest.php @@ -23,29 +23,36 @@ public function testShouldBeFinal() $this->assertClassFinal(AnalyzeRouteCollectionPass::class); } - public function testCouldBeConstructedWithName() + public function testCouldBeConstructedWithoutArguments() { - $pass = new AnalyzeRouteCollectionPass('aName'); - - $this->assertAttributeSame('aName', 'name', $pass); + new AnalyzeRouteCollectionPass(); } - public function testThrowIfNameEmptyOnConstruct() + public function testThrowIfEnqueueClientsParameterNotSet() { - $this->expectException(\InvalidArgumentException::class); - $this->expectExceptionMessage('The name could not be empty.'); - new AnalyzeRouteCollectionPass(''); + $pass = new AnalyzeRouteCollectionPass(); + + $this->expectException(\LogicException::class); + $this->expectExceptionMessage('The "enqueue.clients" parameter must be set.'); + $pass->process(new ContainerBuilder()); } - public function testShouldDoNothingIfRouteCollectionServiceIsNotRegistered() + public function testThrowsIfNoRouteCollectionServiceFoundForConfiguredTransport() { - $pass = new AnalyzeRouteCollectionPass('aName'); - $pass->process(new ContainerBuilder()); + $container = new ContainerBuilder(); + $container->setParameter('enqueue.clients', ['foo', 'bar']); + + $pass = new AnalyzeRouteCollectionPass(); + + $this->expectException(\LogicException::class); + $this->expectExceptionMessage('Service "enqueue.client.foo.route_collection" not found'); + $pass->process($container); } public function testThrowIfExclusiveCommandProcessorOnDefaultQueue() { $container = new ContainerBuilder(); + $container->setParameter('enqueue.clients', ['aName']); $container->register('enqueue.client.aName.route_collection')->addArgument([ (new Route( 'aCommand', @@ -57,7 +64,7 @@ public function testThrowIfExclusiveCommandProcessorOnDefaultQueue() $this->expectException(\LogicException::class); $this->expectExceptionMessage('The command "aCommand" processor "aBarProcessor" is exclusive but queue is not specified. Exclusive processors could not be run on a default queue.'); - $pass = new AnalyzeRouteCollectionPass('aName'); + $pass = new AnalyzeRouteCollectionPass(); $pass->process($container); } @@ -65,6 +72,7 @@ public function testThrowIfExclusiveCommandProcessorOnDefaultQueue() public function testThrowIfTwoExclusiveCommandProcessorsWorkOnSamePrefixedQueue() { $container = new ContainerBuilder(); + $container->setParameter('enqueue.clients', ['aName']); $container->register('enqueue.client.aName.route_collection')->addArgument([ (new Route( 'aFooCommand', @@ -83,7 +91,7 @@ public function testThrowIfTwoExclusiveCommandProcessorsWorkOnSamePrefixedQueue( $this->expectException(\LogicException::class); $this->expectExceptionMessage('The command "aBarCommand" processor "aBarProcessor" is exclusive. The queue "aQueue" already has another exclusive command processor "aFooProcessor" bound to it.'); - $pass = new AnalyzeRouteCollectionPass('aName'); + $pass = new AnalyzeRouteCollectionPass(); $pass->process($container); } @@ -91,6 +99,7 @@ public function testThrowIfTwoExclusiveCommandProcessorsWorkOnSamePrefixedQueue( public function testThrowIfTwoExclusiveCommandProcessorsWorkOnSameQueue() { $container = new ContainerBuilder(); + $container->setParameter('enqueue.clients', ['aName']); $container->register('enqueue.client.aName.route_collection')->addArgument([ (new Route( 'aFooCommand', @@ -109,7 +118,7 @@ public function testThrowIfTwoExclusiveCommandProcessorsWorkOnSameQueue() $this->expectException(\LogicException::class); $this->expectExceptionMessage('The command "aBarCommand" processor "aBarProcessor" is exclusive. The queue "aQueue" already has another exclusive command processor "aFooProcessor" bound to it.'); - $pass = new AnalyzeRouteCollectionPass('aName'); + $pass = new AnalyzeRouteCollectionPass(); $pass->process($container); } @@ -117,6 +126,7 @@ public function testThrowIfTwoExclusiveCommandProcessorsWorkOnSameQueue() public function testShouldNotThrowIfTwoExclusiveCommandProcessorsWorkOnQueueWithSameNameButOnePrefixed() { $container = new ContainerBuilder(); + $container->setParameter('enqueue.clients', ['aName']); $container->register('enqueue.client.aName.route_collection')->addArgument([ (new Route( 'aFooCommand', @@ -133,7 +143,7 @@ public function testShouldNotThrowIfTwoExclusiveCommandProcessorsWorkOnQueueWith ))->toArray(), ]); - $pass = new AnalyzeRouteCollectionPass('aName'); + $pass = new AnalyzeRouteCollectionPass(); $pass->process($container); } diff --git a/pkg/enqueue/Tests/Symfony/Client/DependencyInjection/BuildClientExtensionsPassTest.php b/pkg/enqueue/Tests/Symfony/Client/DependencyInjection/BuildClientExtensionsPassTest.php index 46f549167..e1c8bdcf4 100644 --- a/pkg/enqueue/Tests/Symfony/Client/DependencyInjection/BuildClientExtensionsPassTest.php +++ b/pkg/enqueue/Tests/Symfony/Client/DependencyInjection/BuildClientExtensionsPassTest.php @@ -25,28 +25,29 @@ public function testShouldBeFinal() $this->assertClassFinal(BuildClientExtensionsPass::class); } - public function testCouldBeConstructedWithName() + public function testCouldBeConstructedWithoutArguments() { - $pass = new BuildClientExtensionsPass('aName'); - - $this->assertAttributeSame('aName', 'name', $pass); + new BuildClientExtensionsPass(); } - public function testThrowIfNameEmptyOnConstruct() + public function testThrowIfEnqueueClientsParameterNotSet() { - $this->expectException(\InvalidArgumentException::class); - $this->expectExceptionMessage('The name could not be empty.'); - new BuildClientExtensionsPass(''); + $pass = new BuildClientExtensionsPass(); + + $this->expectException(\LogicException::class); + $this->expectExceptionMessage('The "enqueue.clients" parameter must be set.'); + $pass->process(new ContainerBuilder()); } - public function testShouldDoNothingIfExtensionsServiceIsNotRegistered() + public function testThrowsIfNoClientExtensionsServiceFoundForConfiguredTransport() { $container = new ContainerBuilder(); + $container->setParameter('enqueue.clients', ['foo', 'bar']); - //guard - $this->assertFalse($container->hasDefinition('enqueue.client.aName.client_extensions')); + $pass = new BuildClientExtensionsPass(); - $pass = new BuildClientExtensionsPass('aName'); + $this->expectException(\LogicException::class); + $this->expectExceptionMessage('Service "enqueue.client.foo.client_extensions" not found'); $pass->process($container); } @@ -56,6 +57,7 @@ public function testShouldRegisterClientExtension() $extensions->addArgument([]); $container = new ContainerBuilder(); + $container->setParameter('enqueue.clients', ['aName']); $container->setDefinition('enqueue.client.aName.client_extensions', $extensions); $container->register('aFooExtension', ExtensionInterface::class) @@ -65,7 +67,7 @@ public function testShouldRegisterClientExtension() ->addTag('enqueue.client_extension', ['client' => 'aName']) ; - $pass = new BuildClientExtensionsPass('aName'); + $pass = new BuildClientExtensionsPass(); $pass->process($container); $this->assertInternalType('array', $extensions->getArgument(0)); @@ -81,6 +83,7 @@ public function testShouldIgnoreOtherClientExtensions() $extensions->addArgument([]); $container = new ContainerBuilder(); + $container->setParameter('enqueue.clients', ['aName']); $container->setDefinition('enqueue.client.aName.client_extensions', $extensions); $container->register('aFooExtension', ExtensionInterface::class) @@ -90,7 +93,7 @@ public function testShouldIgnoreOtherClientExtensions() ->addTag('enqueue.client_extension', ['client' => 'anotherName']) ; - $pass = new BuildClientExtensionsPass('aName'); + $pass = new BuildClientExtensionsPass(); $pass->process($container); $this->assertInternalType('array', $extensions->getArgument(0)); @@ -105,6 +108,7 @@ public function testShouldAddExtensionIfClientAll() $extensions->addArgument([]); $container = new ContainerBuilder(); + $container->setParameter('enqueue.clients', ['aName']); $container->setDefinition('enqueue.client.aName.client_extensions', $extensions); $container->register('aFooExtension', ExtensionInterface::class) @@ -114,7 +118,7 @@ public function testShouldAddExtensionIfClientAll() ->addTag('enqueue.client_extension', ['client' => 'anotherName']) ; - $pass = new BuildClientExtensionsPass('aName'); + $pass = new BuildClientExtensionsPass(); $pass->process($container); $this->assertInternalType('array', $extensions->getArgument(0)); @@ -129,6 +133,7 @@ public function testShouldTreatTagsWithoutClientAsDefaultClient() $extensions->addArgument([]); $container = new ContainerBuilder(); + $container->setParameter('enqueue.clients', ['default']); $container->setDefinition('enqueue.client.default.client_extensions', $extensions); $container->register('aFooExtension', ExtensionInterface::class) @@ -138,7 +143,7 @@ public function testShouldTreatTagsWithoutClientAsDefaultClient() ->addTag('enqueue.client_extension') ; - $pass = new BuildClientExtensionsPass('default'); + $pass = new BuildClientExtensionsPass(); $pass->process($container); $this->assertInternalType('array', $extensions->getArgument(0)); @@ -151,6 +156,7 @@ public function testShouldTreatTagsWithoutClientAsDefaultClient() public function testShouldOrderExtensionsByPriority() { $container = new ContainerBuilder(); + $container->setParameter('enqueue.clients', ['default']); $extensions = new Definition(); $extensions->addArgument([]); @@ -168,7 +174,7 @@ public function testShouldOrderExtensionsByPriority() $extension->addTag('enqueue.client_extension', ['priority' => 2]); $container->setDefinition('baz_extension', $extension); - $pass = new BuildClientExtensionsPass('default'); + $pass = new BuildClientExtensionsPass(); $pass->process($container); $orderedExtensions = $extensions->getArgument(0); @@ -182,6 +188,7 @@ public function testShouldOrderExtensionsByPriority() public function testShouldAssumePriorityZeroIfPriorityIsNotSet() { $container = new ContainerBuilder(); + $container->setParameter('enqueue.clients', ['default']); $extensions = new Definition(); $extensions->addArgument([]); @@ -199,7 +206,7 @@ public function testShouldAssumePriorityZeroIfPriorityIsNotSet() $extension->addTag('enqueue.client_extension', ['priority' => -1]); $container->setDefinition('baz_extension', $extension); - $pass = new BuildClientExtensionsPass('default'); + $pass = new BuildClientExtensionsPass(); $pass->process($container); $orderedExtensions = $extensions->getArgument(0); @@ -219,6 +226,7 @@ public function testShouldMergeWithAddedPreviously() ]); $container = new ContainerBuilder(); + $container->setParameter('enqueue.clients', ['aName']); $container->setDefinition('enqueue.client.aName.client_extensions', $extensions); $container->register('aFooExtension', ExtensionInterface::class) @@ -228,7 +236,7 @@ public function testShouldMergeWithAddedPreviously() ->addTag('enqueue.client_extension') ; - $pass = new BuildClientExtensionsPass('aName'); + $pass = new BuildClientExtensionsPass(); $pass->process($container); $this->assertInternalType('array', $extensions->getArgument(0)); @@ -237,4 +245,38 @@ public function testShouldMergeWithAddedPreviously() 'aOloloExtension' => 'aOloloServiceIdAddedPreviously', ], $extensions->getArgument(0)); } + + public function testShouldRegisterProcessorWithMatchedNameToCorrespondingExtensions() + { + $fooExtensions = new Definition(); + $fooExtensions->addArgument([]); + + $barExtensions = new Definition(); + $barExtensions->addArgument([]); + + $container = new ContainerBuilder(); + $container->setParameter('enqueue.clients', ['foo', 'bar']); + $container->setDefinition('enqueue.client.foo.client_extensions', $fooExtensions); + $container->setDefinition('enqueue.client.bar.client_extensions', $barExtensions); + + $container->register('aFooExtension', ExtensionInterface::class) + ->addTag('enqueue.client_extension', ['client' => 'foo']) + ; + $container->register('aBarExtension', ExtensionInterface::class) + ->addTag('enqueue.client_extension', ['client' => 'bar']) + ; + + $pass = new BuildClientExtensionsPass(); + $pass->process($container); + + $this->assertInternalType('array', $fooExtensions->getArgument(0)); + $this->assertEquals([ + new Reference('aFooExtension'), + ], $fooExtensions->getArgument(0)); + + $this->assertInternalType('array', $barExtensions->getArgument(0)); + $this->assertEquals([ + new Reference('aBarExtension'), + ], $barExtensions->getArgument(0)); + } } diff --git a/pkg/enqueue/Tests/Symfony/Client/DependencyInjection/BuildCommandSubscriberRoutesPassTest.php b/pkg/enqueue/Tests/Symfony/Client/DependencyInjection/BuildCommandSubscriberRoutesPassTest.php index 9d375227c..e077a768c 100644 --- a/pkg/enqueue/Tests/Symfony/Client/DependencyInjection/BuildCommandSubscriberRoutesPassTest.php +++ b/pkg/enqueue/Tests/Symfony/Client/DependencyInjection/BuildCommandSubscriberRoutesPassTest.php @@ -29,29 +29,36 @@ public function testShouldBeFinal() $this->assertClassFinal(BuildCommandSubscriberRoutesPass::class); } - public function testCouldBeConstructedWithName() + public function testCouldBeConstructedWithoutArguments() { - $pass = new BuildCommandSubscriberRoutesPass('aName'); - - $this->assertAttributeSame('aName', 'name', $pass); + new BuildCommandSubscriberRoutesPass(); } - public function testThrowIfNameEmptyOnConstruct() + public function testThrowIfEnqueueClientsParameterNotSet() { - $this->expectException(\InvalidArgumentException::class); - $this->expectExceptionMessage('The name could not be empty.'); - new BuildCommandSubscriberRoutesPass(''); + $pass = new BuildCommandSubscriberRoutesPass(); + + $this->expectException(\LogicException::class); + $this->expectExceptionMessage('The "enqueue.clients" parameter must be set.'); + $pass->process(new ContainerBuilder()); } - public function testShouldDoNothingIfRouteCollectionServiceIsNotRegistered() + public function testThrowsIfNoRouteCollectionServiceFoundForConfiguredTransport() { - $pass = new BuildCommandSubscriberRoutesPass('aName'); - $pass->process(new ContainerBuilder()); + $container = new ContainerBuilder(); + $container->setParameter('enqueue.clients', ['foo', 'bar']); + + $pass = new BuildCommandSubscriberRoutesPass(); + + $this->expectException(\LogicException::class); + $this->expectExceptionMessage('Service "enqueue.client.foo.route_collection" not found'); + $pass->process($container); } public function testThrowIfTaggedProcessorIsBuiltByFactory() { $container = new ContainerBuilder(); + $container->setParameter('enqueue.clients', ['aName']); $container->register('enqueue.client.aName.route_collection', RouteCollection::class) ->addArgument([]) ; @@ -60,7 +67,7 @@ public function testThrowIfTaggedProcessorIsBuiltByFactory() ->addTag('enqueue.command_subscriber') ; - $pass = new BuildCommandSubscriberRoutesPass('aName'); + $pass = new BuildCommandSubscriberRoutesPass(); $this->expectException(\LogicException::class); $this->expectExceptionMessage('The command subscriber tag could not be applied to a service created by factory.'); @@ -73,6 +80,7 @@ public function testShouldRegisterProcessorWithMatchedName() $routeCollection->addArgument([]); $container = new ContainerBuilder(); + $container->setParameter('enqueue.clients', ['foo']); $container->setDefinition('enqueue.client.foo.route_collection', $routeCollection); $container->register('aFooProcessor', get_class($this->createCommandSubscriberProcessor())) ->addTag('enqueue.command_subscriber', ['client' => 'foo']) @@ -81,7 +89,7 @@ public function testShouldRegisterProcessorWithMatchedName() ->addTag('enqueue.command_subscriber', ['client' => 'bar']) ; - $pass = new BuildCommandSubscriberRoutesPass('foo'); + $pass = new BuildCommandSubscriberRoutesPass(); $pass->process($container); @@ -95,6 +103,7 @@ public function testShouldRegisterProcessorWithoutNameToDefaultClient() $routeCollection->addArgument([]); $container = new ContainerBuilder(); + $container->setParameter('enqueue.clients', ['default']); $container->setDefinition('enqueue.client.default.route_collection', $routeCollection); $container->register('aFooProcessor', get_class($this->createCommandSubscriberProcessor())) ->addTag('enqueue.command_subscriber') @@ -103,7 +112,7 @@ public function testShouldRegisterProcessorWithoutNameToDefaultClient() ->addTag('enqueue.command_subscriber', ['client' => 'bar']) ; - $pass = new BuildCommandSubscriberRoutesPass('default'); + $pass = new BuildCommandSubscriberRoutesPass(); $pass->process($container); @@ -117,6 +126,7 @@ public function testShouldRegisterProcessorIfClientNameEqualsAll() $routeCollection->addArgument([]); $container = new ContainerBuilder(); + $container->setParameter('enqueue.clients', ['default']); $container->setDefinition('enqueue.client.default.route_collection', $routeCollection); $container->register('aFooProcessor', get_class($this->createCommandSubscriberProcessor())) ->addTag('enqueue.command_subscriber', ['client' => 'all']) @@ -125,7 +135,7 @@ public function testShouldRegisterProcessorIfClientNameEqualsAll() ->addTag('enqueue.command_subscriber', ['client' => 'bar']) ; - $pass = new BuildCommandSubscriberRoutesPass('default'); + $pass = new BuildCommandSubscriberRoutesPass(); $pass->process($container); @@ -141,12 +151,13 @@ public function testShouldRegisterProcessorIfCommandsIsString() $processor = $this->createCommandSubscriberProcessor('fooCommand'); $container = new ContainerBuilder(); + $container->setParameter('enqueue.clients', ['default']); $container->setDefinition('enqueue.client.default.route_collection', $routeCollection); $container->register('aFooProcessor', get_class($processor)) ->addTag('enqueue.command_subscriber') ; - $pass = new BuildCommandSubscriberRoutesPass('default'); + $pass = new BuildCommandSubscriberRoutesPass(); $pass->process($container); $this->assertInternalType('array', $routeCollection->getArgument(0)); @@ -173,12 +184,13 @@ public function testThrowIfCommandSubscriberReturnsNothing() $processor = $this->createCommandSubscriberProcessor(null); $container = new ContainerBuilder(); + $container->setParameter('enqueue.clients', ['default']); $container->setDefinition('enqueue.client.default.route_collection', $routeCollection); $container->register('aFooProcessor', get_class($processor)) ->addTag('enqueue.command_subscriber') ; - $pass = new BuildCommandSubscriberRoutesPass('default'); + $pass = new BuildCommandSubscriberRoutesPass(); $this->expectException(\LogicException::class); $this->expectExceptionMessage('Command subscriber must return something.'); @@ -193,12 +205,13 @@ public function testShouldRegisterProcessorIfCommandsAreStrings() $processor = $this->createCommandSubscriberProcessor(['fooCommand', 'barCommand']); $container = new ContainerBuilder(); + $container->setParameter('enqueue.clients', ['default']); $container->setDefinition('enqueue.client.default.route_collection', $routeCollection); $container->register('aFooProcessor', get_class($processor)) ->addTag('enqueue.command_subscriber') ; - $pass = new BuildCommandSubscriberRoutesPass('default'); + $pass = new BuildCommandSubscriberRoutesPass(); $pass->process($container); $this->assertInternalType('array', $routeCollection->getArgument(0)); @@ -235,12 +248,13 @@ public function testShouldRegisterProcessorIfParamSingleCommandArray() ]); $container = new ContainerBuilder(); + $container->setParameter('enqueue.clients', ['default']); $container->setDefinition('enqueue.client.default.route_collection', $routeCollection); $container->register('aFooProcessor', get_class($processor)) ->addTag('enqueue.command_subscriber') ; - $pass = new BuildCommandSubscriberRoutesPass('default'); + $pass = new BuildCommandSubscriberRoutesPass(); $pass->process($container); $this->assertInternalType('array', $routeCollection->getArgument(0)); @@ -272,12 +286,13 @@ public function testShouldRegisterProcessorIfCommandsAreParamArrays() ]); $container = new ContainerBuilder(); + $container->setParameter('enqueue.clients', ['default']); $container->setDefinition('enqueue.client.default.route_collection', $routeCollection); $container->register('aFooProcessor', get_class($processor)) ->addTag('enqueue.command_subscriber') ; - $pass = new BuildCommandSubscriberRoutesPass('default'); + $pass = new BuildCommandSubscriberRoutesPass(); $pass->process($container); $this->assertInternalType('array', $routeCollection->getArgument(0)); @@ -312,12 +327,13 @@ public function testThrowIfCommandSubscriberParamsInvalid() $processor = $this->createCommandSubscriberProcessor(['fooBar', true]); $container = new ContainerBuilder(); + $container->setParameter('enqueue.clients', ['default']); $container->setDefinition('enqueue.client.default.route_collection', $routeCollection); $container->register('aFooProcessor', get_class($processor)) ->addTag('enqueue.command_subscriber') ; - $pass = new BuildCommandSubscriberRoutesPass('default'); + $pass = new BuildCommandSubscriberRoutesPass(); $this->expectException(\LogicException::class); $this->expectExceptionMessage('Command subscriber configuration is invalid'); @@ -335,12 +351,13 @@ public function testShouldMergeExtractedRoutesWithAlreadySetInCollection() $processor = $this->createCommandSubscriberProcessor(['fooCommand']); $container = new ContainerBuilder(); + $container->setParameter('enqueue.clients', ['default']); $container->setDefinition('enqueue.client.default.route_collection', $routeCollection); $container->register('aFooProcessor', get_class($processor)) ->addTag('enqueue.command_subscriber') ; - $pass = new BuildCommandSubscriberRoutesPass('default'); + $pass = new BuildCommandSubscriberRoutesPass(); $pass->process($container); $this->assertInternalType('array', $routeCollection->getArgument(0)); diff --git a/pkg/enqueue/Tests/Symfony/Client/DependencyInjection/BuildConsumptionExtensionsPassTest.php b/pkg/enqueue/Tests/Symfony/Client/DependencyInjection/BuildConsumptionExtensionsPassTest.php index 68ad2ab3e..d3936fedd 100644 --- a/pkg/enqueue/Tests/Symfony/Client/DependencyInjection/BuildConsumptionExtensionsPassTest.php +++ b/pkg/enqueue/Tests/Symfony/Client/DependencyInjection/BuildConsumptionExtensionsPassTest.php @@ -25,28 +25,29 @@ public function testShouldBeFinal() $this->assertClassFinal(BuildConsumptionExtensionsPass::class); } - public function testCouldBeConstructedWithName() + public function testCouldBeConstructedWithoutArguments() { - $pass = new BuildConsumptionExtensionsPass('aName'); - - $this->assertAttributeSame('aName', 'name', $pass); + new BuildConsumptionExtensionsPass(); } - public function testThrowIfNameEmptyOnConstruct() + public function testThrowIfEnqueueClientsParameterNotSet() { - $this->expectException(\InvalidArgumentException::class); - $this->expectExceptionMessage('The name could not be empty.'); - new BuildConsumptionExtensionsPass(''); + $pass = new BuildConsumptionExtensionsPass(); + + $this->expectException(\LogicException::class); + $this->expectExceptionMessage('The "enqueue.clients" parameter must be set.'); + $pass->process(new ContainerBuilder()); } - public function testShouldDoNothingIfExtensionsServiceIsNotRegistered() + public function testThrowsIfNoConsumptionExtensionsServiceFoundForConfiguredTransport() { $container = new ContainerBuilder(); + $container->setParameter('enqueue.clients', ['foo', 'bar']); - //guard - $this->assertFalse($container->hasDefinition('enqueue.client.aName.consumption_extensions')); + $pass = new BuildConsumptionExtensionsPass(); - $pass = new BuildConsumptionExtensionsPass('aName'); + $this->expectException(\LogicException::class); + $this->expectExceptionMessage('Service "enqueue.client.foo.consumption_extensions" not found'); $pass->process($container); } @@ -56,6 +57,7 @@ public function testShouldRegisterClientExtension() $extensions->addArgument([]); $container = new ContainerBuilder(); + $container->setParameter('enqueue.clients', ['aName']); $container->setDefinition('enqueue.client.aName.consumption_extensions', $extensions); $container->register('aFooExtension', ExtensionInterface::class) @@ -65,7 +67,7 @@ public function testShouldRegisterClientExtension() ->addTag('enqueue.consumption_extension', ['client' => 'aName']) ; - $pass = new BuildConsumptionExtensionsPass('aName'); + $pass = new BuildConsumptionExtensionsPass(); $pass->process($container); $this->assertInternalType('array', $extensions->getArgument(0)); @@ -81,6 +83,7 @@ public function testShouldIgnoreOtherClientExtensions() $extensions->addArgument([]); $container = new ContainerBuilder(); + $container->setParameter('enqueue.clients', ['aName']); $container->setDefinition('enqueue.client.aName.consumption_extensions', $extensions); $container->register('aFooExtension', ExtensionInterface::class) @@ -90,7 +93,7 @@ public function testShouldIgnoreOtherClientExtensions() ->addTag('enqueue.consumption_extension', ['client' => 'anotherName']) ; - $pass = new BuildConsumptionExtensionsPass('aName'); + $pass = new BuildConsumptionExtensionsPass(); $pass->process($container); $this->assertInternalType('array', $extensions->getArgument(0)); @@ -105,6 +108,7 @@ public function testShouldAddExtensionIfClientAll() $extensions->addArgument([]); $container = new ContainerBuilder(); + $container->setParameter('enqueue.clients', ['aName']); $container->setDefinition('enqueue.client.aName.consumption_extensions', $extensions); $container->register('aFooExtension', ExtensionInterface::class) @@ -114,7 +118,7 @@ public function testShouldAddExtensionIfClientAll() ->addTag('enqueue.consumption_extension', ['client' => 'anotherName']) ; - $pass = new BuildConsumptionExtensionsPass('aName'); + $pass = new BuildConsumptionExtensionsPass(); $pass->process($container); $this->assertInternalType('array', $extensions->getArgument(0)); @@ -129,6 +133,7 @@ public function testShouldTreatTagsWithoutClientAsDefaultClient() $extensions->addArgument([]); $container = new ContainerBuilder(); + $container->setParameter('enqueue.clients', ['default']); $container->setDefinition('enqueue.client.default.consumption_extensions', $extensions); $container->register('aFooExtension', ExtensionInterface::class) @@ -138,7 +143,7 @@ public function testShouldTreatTagsWithoutClientAsDefaultClient() ->addTag('enqueue.consumption_extension') ; - $pass = new BuildConsumptionExtensionsPass('default'); + $pass = new BuildConsumptionExtensionsPass(); $pass->process($container); $this->assertInternalType('array', $extensions->getArgument(0)); @@ -151,6 +156,7 @@ public function testShouldTreatTagsWithoutClientAsDefaultClient() public function testShouldOrderExtensionsByPriority() { $container = new ContainerBuilder(); + $container->setParameter('enqueue.clients', ['default']); $extensions = new Definition(); $extensions->addArgument([]); @@ -168,7 +174,7 @@ public function testShouldOrderExtensionsByPriority() $extension->addTag('enqueue.consumption_extension', ['priority' => 2]); $container->setDefinition('baz_extension', $extension); - $pass = new BuildConsumptionExtensionsPass('default'); + $pass = new BuildConsumptionExtensionsPass(); $pass->process($container); $orderedExtensions = $extensions->getArgument(0); @@ -182,6 +188,7 @@ public function testShouldOrderExtensionsByPriority() public function testShouldAssumePriorityZeroIfPriorityIsNotSet() { $container = new ContainerBuilder(); + $container->setParameter('enqueue.clients', ['default']); $extensions = new Definition(); $extensions->addArgument([]); @@ -199,7 +206,7 @@ public function testShouldAssumePriorityZeroIfPriorityIsNotSet() $extension->addTag('enqueue.consumption_extension', ['priority' => -1]); $container->setDefinition('baz_extension', $extension); - $pass = new BuildConsumptionExtensionsPass('default'); + $pass = new BuildConsumptionExtensionsPass(); $pass->process($container); $orderedExtensions = $extensions->getArgument(0); @@ -219,6 +226,7 @@ public function testShouldMergeWithAddedPreviously() ]); $container = new ContainerBuilder(); + $container->setParameter('enqueue.clients', ['aName']); $container->setDefinition('enqueue.client.aName.consumption_extensions', $extensions); $container->register('aFooExtension', ExtensionInterface::class) @@ -228,7 +236,7 @@ public function testShouldMergeWithAddedPreviously() ->addTag('enqueue.consumption_extension') ; - $pass = new BuildConsumptionExtensionsPass('aName'); + $pass = new BuildConsumptionExtensionsPass(); $pass->process($container); $this->assertInternalType('array', $extensions->getArgument(0)); @@ -237,4 +245,38 @@ public function testShouldMergeWithAddedPreviously() 'aOloloExtension' => 'aOloloServiceIdAddedPreviously', ], $extensions->getArgument(0)); } + + public function testShouldRegisterProcessorWithMatchedNameToCorrespondingExtensions() + { + $fooExtensions = new Definition(); + $fooExtensions->addArgument([]); + + $barExtensions = new Definition(); + $barExtensions->addArgument([]); + + $container = new ContainerBuilder(); + $container->setParameter('enqueue.clients', ['foo', 'bar']); + $container->setDefinition('enqueue.client.foo.consumption_extensions', $fooExtensions); + $container->setDefinition('enqueue.client.bar.consumption_extensions', $barExtensions); + + $container->register('aFooExtension', ExtensionInterface::class) + ->addTag('enqueue.consumption_extension', ['client' => 'foo']) + ; + $container->register('aBarExtension', ExtensionInterface::class) + ->addTag('enqueue.consumption_extension', ['client' => 'bar']) + ; + + $pass = new BuildConsumptionExtensionsPass(); + $pass->process($container); + + $this->assertInternalType('array', $fooExtensions->getArgument(0)); + $this->assertEquals([ + new Reference('aFooExtension'), + ], $fooExtensions->getArgument(0)); + + $this->assertInternalType('array', $barExtensions->getArgument(0)); + $this->assertEquals([ + new Reference('aBarExtension'), + ], $barExtensions->getArgument(0)); + } } diff --git a/pkg/enqueue/Tests/Symfony/Client/DependencyInjection/BuildProcessorRegistryPassTest.php b/pkg/enqueue/Tests/Symfony/Client/DependencyInjection/BuildProcessorRegistryPassTest.php index 59796521f..c0142f0e3 100644 --- a/pkg/enqueue/Tests/Symfony/Client/DependencyInjection/BuildProcessorRegistryPassTest.php +++ b/pkg/enqueue/Tests/Symfony/Client/DependencyInjection/BuildProcessorRegistryPassTest.php @@ -6,6 +6,7 @@ use Enqueue\Symfony\Client\DependencyInjection\BuildProcessorRegistryPass; use Enqueue\Test\ClassExtensionTrait; use PHPUnit\Framework\TestCase; +use Symfony\Component\DependencyInjection\Argument\ServiceClosureArgument; use Symfony\Component\DependencyInjection\Compiler\CompilerPassInterface; use Symfony\Component\DependencyInjection\ContainerBuilder; use Symfony\Component\DependencyInjection\Definition; @@ -25,70 +26,70 @@ public function testShouldBeFinal() $this->assertClassFinal(BuildProcessorRegistryPass::class); } - public function testCouldBeConstructedWithName() + public function testCouldBeConstructedWithoutArguments() { - $pass = new BuildProcessorRegistryPass('aName'); - - $this->assertAttributeSame('aName', 'name', $pass); + new BuildProcessorRegistryPass(); } - public function testThrowIfNameEmptyOnConstruct() + public function testThrowIfEnqueueClientsParameterNotSet() { - $this->expectException(\InvalidArgumentException::class); - $this->expectExceptionMessage('The name could not be empty.'); - new BuildProcessorRegistryPass(''); + $pass = new BuildProcessorRegistryPass(); + + $this->expectException(\LogicException::class); + $this->expectExceptionMessage('The "enqueue.clients" parameter must be set.'); + $pass->process(new ContainerBuilder()); } - public function testShouldDoNothingIfRouteCollectionServiceIsNotRegistered() + public function testThrowsIfNoProcessorRegistryServiceFoundForConfiguredTransport() { $container = new ContainerBuilder(); + $container->setParameter('enqueue.clients', ['foo', 'bar']); - //guard - $this->assertFalse($container->hasDefinition('enqueue.client.aName.route_collection')); + $pass = new BuildProcessorRegistryPass(); - $pass = new BuildProcessorRegistryPass('aName'); + $this->expectException(\LogicException::class); + $this->expectExceptionMessage('Service "enqueue.client.foo.processor_registry" not found'); $pass->process($container); } - public function testShouldDoNothingIfProcessorRegistryCollectionServiceIsNotRegistered() + public function testThrowsIfNoRouteCollectionServiceFoundForConfiguredTransport() { $container = new ContainerBuilder(); - $container->register('enqueue.client.aName.route_collection'); + $container->setParameter('enqueue.clients', ['foo', 'bar']); + $container->register('enqueue.client.foo.processor_registry'); - //guard - $this->assertFalse($container->hasDefinition('enqueue.client.aName.processor_registry')); + $pass = new BuildProcessorRegistryPass(); - $pass = new BuildProcessorRegistryPass('aName'); + $this->expectException(\LogicException::class); + $this->expectExceptionMessage('Service "enqueue.client.foo.route_collection" not found'); $pass->process($container); } - public function testShouldDoNothingIfRouterProcessorServiceIsNotRegistered() + public function testThrowsIfNoRouteProcessorServiceFoundForConfiguredTransport() { $container = new ContainerBuilder(); - $container->register('enqueue.client.aName.route_collection'); - $container->register('enqueue.client.aName.processor_registry') - ->addArgument([]) - ; + $container->setParameter('enqueue.clients', ['foo', 'bar']); + $container->register('enqueue.client.foo.processor_registry'); + $container->register('enqueue.client.foo.route_collection'); - //guard - $this->assertFalse($container->hasDefinition('enqueue.client.aName.router_processor')); + $pass = new BuildProcessorRegistryPass(); - $pass = new BuildProcessorRegistryPass('aName'); + $this->expectException(\LogicException::class); + $this->expectExceptionMessage('Service "enqueue.client.foo.router_processor" not found'); $pass->process($container); - - $this->assertSame([], $container->getDefinition('enqueue.client.aName.processor_registry')->getArgument(0)); } public function testThrowIfProcessorServiceIdOptionNotSet() { $container = new ContainerBuilder(); + $container->setParameter('enqueue.clients', ['aName']); $container->register('enqueue.client.aName.route_collection')->addArgument([ (new Route('aCommand', Route::COMMAND, 'aProcessor'))->toArray(), ]); $container->register('enqueue.client.aName.processor_registry')->addArgument([]); $container->register('enqueue.client.aName.router_processor'); - $pass = new BuildProcessorRegistryPass('aName'); + $pass = new BuildProcessorRegistryPass(); $this->expectException(\LogicException::class); $this->expectExceptionMessage('The route option "processor_service_id" is required'); @@ -101,6 +102,7 @@ public function testShouldPassLocatorAsFirstArgument() $registry->addArgument([]); $container = new ContainerBuilder(); + $container->setParameter('enqueue.clients', ['aName']); $container->register('enqueue.client.aName.route_collection')->addArgument([ (new Route( 'aCommand', @@ -118,10 +120,37 @@ public function testShouldPassLocatorAsFirstArgument() $container->setDefinition('enqueue.client.aName.processor_registry', $registry); $container->register('enqueue.client.aName.router_processor'); - $pass = new BuildProcessorRegistryPass('aName'); + $pass = new BuildProcessorRegistryPass(); $pass->process($container); - $this->assertInstanceOf(Reference::class, $registry->getArgument(0)); - $this->assertRegExp('/service_locator\..*?\.enqueue\.client\.aName\.processor_registry/', (string) $registry->getArgument(0)); + $this->assertLocatorServices($container, $registry->getArgument(0), [ + '%enqueue.client.aName.router_processor%' => 'enqueue.client.aName.router_processor', + 'aBarProcessor' => 'aBarServiceId', + 'aFooProcessor' => 'aFooServiceId', + ]); + } + + private function assertLocatorServices(ContainerBuilder $container, $locatorId, array $locatorServices) + { + $this->assertInstanceOf(Reference::class, $locatorId); + $locatorId = (string) $locatorId; + + $this->assertTrue($container->hasDefinition($locatorId)); + $this->assertRegExp('/service_locator\..*?\.enqueue\./', $locatorId); + + $match = []; + if (false == preg_match('/(service_locator\..*?)\.enqueue\./', $locatorId, $match)) { + $this->fail('preg_match should not failed'); + } + + $this->assertTrue($container->hasDefinition($match[1])); + $locator = $container->getDefinition($match[1]); + + $this->assertContainsOnly(ServiceClosureArgument::class, $locator->getArgument(0)); + $actualServices = array_map(function (ServiceClosureArgument $value) { + return (string) $value->getValues()[0]; + }, $locator->getArgument(0)); + + $this->assertEquals($locatorServices, $actualServices); } } diff --git a/pkg/enqueue/Tests/Symfony/Client/DependencyInjection/BuildProcessorRoutesPassTest.php b/pkg/enqueue/Tests/Symfony/Client/DependencyInjection/BuildProcessorRoutesPassTest.php index b0d7af5eb..e671f7a8b 100644 --- a/pkg/enqueue/Tests/Symfony/Client/DependencyInjection/BuildProcessorRoutesPassTest.php +++ b/pkg/enqueue/Tests/Symfony/Client/DependencyInjection/BuildProcessorRoutesPassTest.php @@ -25,24 +25,30 @@ public function testShouldBeFinal() $this->assertClassFinal(BuildProcessorRoutesPass::class); } - public function testCouldBeConstructedWithName() + public function testCouldBeConstructedWithoutArguments() { - $pass = new BuildProcessorRoutesPass('aName'); - - $this->assertAttributeSame('aName', 'name', $pass); + new BuildProcessorRoutesPass(); } - public function testThrowIfNameEmptyOnConstruct() + public function testThrowIfEnqueueClientsParameterNotSet() { - $this->expectException(\InvalidArgumentException::class); - $this->expectExceptionMessage('The name could not be empty.'); - new BuildProcessorRoutesPass(''); + $pass = new BuildProcessorRoutesPass(); + + $this->expectException(\LogicException::class); + $this->expectExceptionMessage('The "enqueue.clients" parameter must be set.'); + $pass->process(new ContainerBuilder()); } - public function testShouldDoNothingIfRouteCollectionServiceIsNotRegistered() + public function testThrowsIfNoRouteCollectionServiceFoundForConfiguredTransport() { - $pass = new BuildProcessorRoutesPass('aName'); - $pass->process(new ContainerBuilder()); + $container = new ContainerBuilder(); + $container->setParameter('enqueue.clients', ['foo', 'bar']); + + $pass = new BuildProcessorRoutesPass(); + + $this->expectException(\LogicException::class); + $this->expectExceptionMessage('Service "enqueue.client.foo.route_collection" not found'); + $pass->process($container); } public function testThrowIfBothTopicAndCommandAttributesAreSet() @@ -51,12 +57,13 @@ public function testThrowIfBothTopicAndCommandAttributesAreSet() $routeCollection->addArgument([]); $container = new ContainerBuilder(); + $container->setParameter('enqueue.clients', ['default']); $container->setDefinition('enqueue.client.default.route_collection', $routeCollection); $container->register('aFooProcessor', 'aProcessorClass') ->addTag('enqueue.processor', ['topic' => 'foo', 'command' => 'bar']) ; - $pass = new BuildProcessorRoutesPass('default'); + $pass = new BuildProcessorRoutesPass(); $this->expectException(\LogicException::class); $this->expectExceptionMessage('Either "topic" or "command" tag attribute must be set on service "aFooProcessor". Both are set.'); @@ -69,12 +76,13 @@ public function testThrowIfNeitherTopicNorCommandAttributesAreSet() $routeCollection->addArgument([]); $container = new ContainerBuilder(); + $container->setParameter('enqueue.clients', ['default']); $container->setDefinition('enqueue.client.default.route_collection', $routeCollection); $container->register('aFooProcessor', 'aProcessorClass') ->addTag('enqueue.processor', []) ; - $pass = new BuildProcessorRoutesPass('default'); + $pass = new BuildProcessorRoutesPass(); $this->expectException(\LogicException::class); $this->expectExceptionMessage('Either "topic" or "command" tag attribute must be set on service "aFooProcessor". None is set.'); @@ -87,6 +95,7 @@ public function testShouldRegisterProcessorWithMatchedName() $routeCollection->addArgument([]); $container = new ContainerBuilder(); + $container->setParameter('enqueue.clients', ['foo']); $container->setDefinition('enqueue.client.foo.route_collection', $routeCollection); $container->register('aFooProcessor', 'aProcessorClass') ->addTag('enqueue.processor', ['client' => 'foo', 'topic' => 'foo']) @@ -95,7 +104,7 @@ public function testShouldRegisterProcessorWithMatchedName() ->addTag('enqueue.processor', ['client' => 'bar', 'command' => 'foo']) ; - $pass = new BuildProcessorRoutesPass('foo'); + $pass = new BuildProcessorRoutesPass(); $pass->process($container); @@ -109,6 +118,7 @@ public function testShouldRegisterProcessorWithoutNameToDefaultClient() $routeCollection->addArgument([]); $container = new ContainerBuilder(); + $container->setParameter('enqueue.clients', ['default']); $container->setDefinition('enqueue.client.default.route_collection', $routeCollection); $container->register('aFooProcessor', 'aProcessorClass') ->addTag('enqueue.processor', ['topic' => 'foo']) @@ -117,7 +127,7 @@ public function testShouldRegisterProcessorWithoutNameToDefaultClient() ->addTag('enqueue.processor', ['client' => 'bar', 'command' => 'foo']) ; - $pass = new BuildProcessorRoutesPass('default'); + $pass = new BuildProcessorRoutesPass(); $pass->process($container); @@ -131,6 +141,7 @@ public function testShouldRegisterProcessorIfClientNameEqualsAll() $routeCollection->addArgument([]); $container = new ContainerBuilder(); + $container->setParameter('enqueue.clients', ['default']); $container->setDefinition('enqueue.client.default.route_collection', $routeCollection); $container->register('aFooProcessor', 'aProcessorClass') ->addTag('enqueue.processor', ['client' => 'all', 'topic' => 'foo']) @@ -139,7 +150,7 @@ public function testShouldRegisterProcessorIfClientNameEqualsAll() ->addTag('enqueue.processor', ['client' => 'bar', 'command' => 'foo']) ; - $pass = new BuildProcessorRoutesPass('default'); + $pass = new BuildProcessorRoutesPass(); $pass->process($container); @@ -153,12 +164,13 @@ public function testShouldRegisterAsTopicProcessor() $routeCollection->addArgument([]); $container = new ContainerBuilder(); + $container->setParameter('enqueue.clients', ['default']); $container->setDefinition('enqueue.client.default.route_collection', $routeCollection); $container->register('aFooProcessor', 'aProcessorClass') ->addTag('enqueue.processor', ['topic' => 'aTopic']) ; - $pass = new BuildProcessorRoutesPass('default'); + $pass = new BuildProcessorRoutesPass(); $pass->process($container); $this->assertInternalType('array', $routeCollection->getArgument(0)); @@ -183,12 +195,13 @@ public function testShouldRegisterAsCommandProcessor() $routeCollection->addArgument([]); $container = new ContainerBuilder(); + $container->setParameter('enqueue.clients', ['default']); $container->setDefinition('enqueue.client.default.route_collection', $routeCollection); $container->register('aFooProcessor', 'aProcessorClass') ->addTag('enqueue.processor', ['command' => 'aCommand']) ; - $pass = new BuildProcessorRoutesPass('default'); + $pass = new BuildProcessorRoutesPass(); $pass->process($container); $this->assertInternalType('array', $routeCollection->getArgument(0)); @@ -213,12 +226,13 @@ public function testShouldRegisterWithCustomProcessorName() $routeCollection->addArgument([]); $container = new ContainerBuilder(); + $container->setParameter('enqueue.clients', ['default']); $container->setDefinition('enqueue.client.default.route_collection', $routeCollection); $container->register('aFooProcessor', 'aProcessorClass') ->addTag('enqueue.processor', ['command' => 'aCommand', 'processor' => 'customProcessorName']) ; - $pass = new BuildProcessorRoutesPass('default'); + $pass = new BuildProcessorRoutesPass(); $pass->process($container); $this->assertInternalType('array', $routeCollection->getArgument(0)); @@ -246,12 +260,13 @@ public function testShouldMergeExtractedRoutesWithAlreadySetInCollection() ]); $container = new ContainerBuilder(); + $container->setParameter('enqueue.clients', ['default']); $container->setDefinition('enqueue.client.default.route_collection', $routeCollection); $container->register('aFooProcessor', 'aProcessorClass') ->addTag('enqueue.processor', ['command' => 'fooCommand']) ; - $pass = new BuildProcessorRoutesPass('default'); + $pass = new BuildProcessorRoutesPass(); $pass->process($container); $this->assertInternalType('array', $routeCollection->getArgument(0)); diff --git a/pkg/enqueue/Tests/Symfony/Client/DependencyInjection/BuildTopicSubscriberRoutesPassTest.php b/pkg/enqueue/Tests/Symfony/Client/DependencyInjection/BuildTopicSubscriberRoutesPassTest.php index 895cfb11d..25033f0c8 100644 --- a/pkg/enqueue/Tests/Symfony/Client/DependencyInjection/BuildTopicSubscriberRoutesPassTest.php +++ b/pkg/enqueue/Tests/Symfony/Client/DependencyInjection/BuildTopicSubscriberRoutesPassTest.php @@ -29,29 +29,36 @@ public function testShouldBeFinal() $this->assertClassFinal(BuildTopicSubscriberRoutesPass::class); } - public function testCouldBeConstructedWithName() + public function testCouldBeConstructedWithoutArguments() { - $pass = new BuildTopicSubscriberRoutesPass('aName'); - - $this->assertAttributeSame('aName', 'name', $pass); + new BuildTopicSubscriberRoutesPass(); } - public function testThrowIfNameEmptyOnConstruct() + public function testThrowIfEnqueueClientsParameterNotSet() { - $this->expectException(\InvalidArgumentException::class); - $this->expectExceptionMessage('The name could not be empty.'); - new BuildTopicSubscriberRoutesPass(''); + $pass = new BuildTopicSubscriberRoutesPass(); + + $this->expectException(\LogicException::class); + $this->expectExceptionMessage('The "enqueue.clients" parameter must be set.'); + $pass->process(new ContainerBuilder()); } - public function testShouldDoNothingIfRouteCollectionServiceIsNotRegistered() + public function testThrowsIfNoRouteCollectionServiceFoundForConfiguredTransport() { - $pass = new BuildTopicSubscriberRoutesPass('aName'); - $pass->process(new ContainerBuilder()); + $container = new ContainerBuilder(); + $container->setParameter('enqueue.clients', ['foo', 'bar']); + + $pass = new BuildTopicSubscriberRoutesPass(); + + $this->expectException(\LogicException::class); + $this->expectExceptionMessage('Service "enqueue.client.foo.route_collection" not found'); + $pass->process($container); } public function testThrowIfTaggedProcessorIsBuiltByFactory() { $container = new ContainerBuilder(); + $container->setParameter('enqueue.clients', ['aName']); $container->register('enqueue.client.aName.route_collection', RouteCollection::class) ->addArgument([]) ; @@ -60,7 +67,7 @@ public function testThrowIfTaggedProcessorIsBuiltByFactory() ->addTag('enqueue.topic_subscriber') ; - $pass = new BuildTopicSubscriberRoutesPass('aName'); + $pass = new BuildTopicSubscriberRoutesPass(); $this->expectException(\LogicException::class); $this->expectExceptionMessage('The topic subscriber tag could not be applied to a service created by factory.'); @@ -73,6 +80,7 @@ public function testShouldRegisterProcessorWithMatchedName() $routeCollection->addArgument([]); $container = new ContainerBuilder(); + $container->setParameter('enqueue.clients', ['foo']); $container->setDefinition('enqueue.client.foo.route_collection', $routeCollection); $container->register('aFooProcessor', get_class($this->createTopicSubscriberProcessor())) ->addTag('enqueue.topic_subscriber', ['client' => 'foo']) @@ -81,7 +89,7 @@ public function testShouldRegisterProcessorWithMatchedName() ->addTag('enqueue.topic_subscriber', ['client' => 'bar']) ; - $pass = new BuildTopicSubscriberRoutesPass('foo'); + $pass = new BuildTopicSubscriberRoutesPass(); $pass->process($container); @@ -95,6 +103,7 @@ public function testShouldRegisterProcessorWithoutNameToDefaultClient() $routeCollection->addArgument([]); $container = new ContainerBuilder(); + $container->setParameter('enqueue.clients', ['default']); $container->setDefinition('enqueue.client.default.route_collection', $routeCollection); $container->register('aFooProcessor', get_class($this->createTopicSubscriberProcessor())) ->addTag('enqueue.topic_subscriber') @@ -103,7 +112,7 @@ public function testShouldRegisterProcessorWithoutNameToDefaultClient() ->addTag('enqueue.topic_subscriber', ['client' => 'bar']) ; - $pass = new BuildTopicSubscriberRoutesPass('default'); + $pass = new BuildTopicSubscriberRoutesPass(); $pass->process($container); @@ -117,6 +126,7 @@ public function testShouldRegisterProcessorIfClientNameEqualsAll() $routeCollection->addArgument([]); $container = new ContainerBuilder(); + $container->setParameter('enqueue.clients', ['default']); $container->setDefinition('enqueue.client.default.route_collection', $routeCollection); $container->register('aFooProcessor', get_class($this->createTopicSubscriberProcessor())) ->addTag('enqueue.topic_subscriber', ['client' => 'all']) @@ -125,7 +135,7 @@ public function testShouldRegisterProcessorIfClientNameEqualsAll() ->addTag('enqueue.topic_subscriber', ['client' => 'bar']) ; - $pass = new BuildTopicSubscriberRoutesPass('default'); + $pass = new BuildTopicSubscriberRoutesPass(); $pass->process($container); @@ -141,12 +151,13 @@ public function testShouldRegisterProcessorIfTopicsIsString() $processor = $this->createTopicSubscriberProcessor('fooTopic'); $container = new ContainerBuilder(); + $container->setParameter('enqueue.clients', ['default']); $container->setDefinition('enqueue.client.default.route_collection', $routeCollection); $container->register('aFooProcessor', get_class($processor)) ->addTag('enqueue.topic_subscriber') ; - $pass = new BuildTopicSubscriberRoutesPass('default'); + $pass = new BuildTopicSubscriberRoutesPass(); $pass->process($container); $this->assertInternalType('array', $routeCollection->getArgument(0)); @@ -173,12 +184,13 @@ public function testThrowIfTopicSubscriberReturnsNothing() $processor = $this->createTopicSubscriberProcessor(null); $container = new ContainerBuilder(); + $container->setParameter('enqueue.clients', ['default']); $container->setDefinition('enqueue.client.default.route_collection', $routeCollection); $container->register('aFooProcessor', get_class($processor)) ->addTag('enqueue.topic_subscriber') ; - $pass = new BuildTopicSubscriberRoutesPass('default'); + $pass = new BuildTopicSubscriberRoutesPass(); $this->expectException(\LogicException::class); $this->expectExceptionMessage('Topic subscriber must return something.'); @@ -193,12 +205,13 @@ public function testShouldRegisterProcessorIfTopicsAreStrings() $processor = $this->createTopicSubscriberProcessor(['fooTopic', 'barTopic']); $container = new ContainerBuilder(); + $container->setParameter('enqueue.clients', ['default']); $container->setDefinition('enqueue.client.default.route_collection', $routeCollection); $container->register('aFooProcessor', get_class($processor)) ->addTag('enqueue.topic_subscriber') ; - $pass = new BuildTopicSubscriberRoutesPass('default'); + $pass = new BuildTopicSubscriberRoutesPass(); $pass->process($container); $this->assertInternalType('array', $routeCollection->getArgument(0)); @@ -234,12 +247,13 @@ public function testShouldRegisterProcessorIfTopicsAreParamArrays() ]); $container = new ContainerBuilder(); + $container->setParameter('enqueue.clients', ['default']); $container->setDefinition('enqueue.client.default.route_collection', $routeCollection); $container->register('aFooProcessor', get_class($processor)) ->addTag('enqueue.topic_subscriber') ; - $pass = new BuildTopicSubscriberRoutesPass('default'); + $pass = new BuildTopicSubscriberRoutesPass(); $pass->process($container); $this->assertInternalType('array', $routeCollection->getArgument(0)); @@ -274,12 +288,13 @@ public function testThrowIfTopicSubscriberParamsInvalid() $processor = $this->createTopicSubscriberProcessor(['fooBar', true]); $container = new ContainerBuilder(); + $container->setParameter('enqueue.clients', ['default']); $container->setDefinition('enqueue.client.default.route_collection', $routeCollection); $container->register('aFooProcessor', get_class($processor)) ->addTag('enqueue.topic_subscriber') ; - $pass = new BuildTopicSubscriberRoutesPass('default'); + $pass = new BuildTopicSubscriberRoutesPass(); $this->expectException(\LogicException::class); $this->expectExceptionMessage('Topic subscriber configuration is invalid'); @@ -297,12 +312,13 @@ public function testShouldMergeExtractedRoutesWithAlreadySetInCollection() $processor = $this->createTopicSubscriberProcessor(['fooTopic']); $container = new ContainerBuilder(); + $container->setParameter('enqueue.clients', ['default']); $container->setDefinition('enqueue.client.default.route_collection', $routeCollection); $container->register('aFooProcessor', get_class($processor)) ->addTag('enqueue.topic_subscriber') ; - $pass = new BuildTopicSubscriberRoutesPass('default'); + $pass = new BuildTopicSubscriberRoutesPass(); $pass->process($container); $this->assertInternalType('array', $routeCollection->getArgument(0)); diff --git a/pkg/enqueue/Tests/Symfony/DependencyInjection/BuildConsumptionExtensionsPassTest.php b/pkg/enqueue/Tests/Symfony/DependencyInjection/BuildConsumptionExtensionsPassTest.php index 5401b7e18..e44272dff 100644 --- a/pkg/enqueue/Tests/Symfony/DependencyInjection/BuildConsumptionExtensionsPassTest.php +++ b/pkg/enqueue/Tests/Symfony/DependencyInjection/BuildConsumptionExtensionsPassTest.php @@ -245,4 +245,38 @@ public function testShouldMergeWithAddedPreviously() 'aOloloExtension' => 'aOloloServiceIdAddedPreviously', ], $extensions->getArgument(0)); } + + public function testShouldRegisterProcessorWithMatchedNameToCorrespondingRegistries() + { + $fooExtensions = new Definition(); + $fooExtensions->addArgument([]); + + $barExtensions = new Definition(); + $barExtensions->addArgument([]); + + $container = new ContainerBuilder(); + $container->setParameter('enqueue.transports', ['foo', 'bar']); + $container->setDefinition('enqueue.transport.foo.consumption_extensions', $fooExtensions); + $container->setDefinition('enqueue.transport.bar.consumption_extensions', $barExtensions); + + $container->register('aFooExtension', ExtensionInterface::class) + ->addTag('enqueue.transport.consumption_extension', ['transport' => 'foo']) + ; + $container->register('aBarExtension', ExtensionInterface::class) + ->addTag('enqueue.transport.consumption_extension', ['transport' => 'bar']) + ; + + $pass = new BuildConsumptionExtensionsPass(); + $pass->process($container); + + $this->assertInternalType('array', $fooExtensions->getArgument(0)); + $this->assertEquals([ + new Reference('aFooExtension'), + ], $fooExtensions->getArgument(0)); + + $this->assertInternalType('array', $barExtensions->getArgument(0)); + $this->assertEquals([ + new Reference('aBarExtension'), + ], $barExtensions->getArgument(0)); + } } diff --git a/pkg/enqueue/composer.json b/pkg/enqueue/composer.json index e42f5e9f6..fc42d772b 100644 --- a/pkg/enqueue/composer.json +++ b/pkg/enqueue/composer.json @@ -10,6 +10,7 @@ "queue-interop/amqp-interop": "0.8.x-dev", "queue-interop/queue-interop": "0.7.x-dev", "enqueue/null": "0.9.x-dev", + "enqueue/dsn": "0.9.x-dev", "ramsey/uuid": "^2|^3.5", "psr/log": "^1", "psr/container": "^1"