Skip to content

Commit 3274656

Browse files
authored
Merge pull request #577 from php-enqueue/client-passes-migration
[client] Make symfony compiler passes multi client
2 parents ada1cd4 + 09bbfdb commit 3274656

22 files changed

+735
-483
lines changed

pkg/enqueue-bundle/DependencyInjection/EnqueueExtension.php

+2
Original file line numberDiff line numberDiff line change
@@ -36,6 +36,8 @@ public function load(array $configs, ContainerBuilder $container): void
3636
$container->setParameter('enqueue.transports', array_keys($config['transport']));
3737

3838
if (isset($config['client'])) {
39+
$container->setParameter('enqueue.clients', ['default']);
40+
3941
$this->setupAutowiringForProcessors($container);
4042

4143
$loader->load('client.yml');

pkg/enqueue-bundle/EnqueueBundle.php

+7-7
Original file line numberDiff line numberDiff line change
@@ -27,13 +27,13 @@ public function build(ContainerBuilder $container): void
2727
$container->addCompilerPass(new BuildProcessorRegistryPass());
2828

2929
//client passes
30-
$container->addCompilerPass(new BuildClientConsumptionExtensionsPass('default'));
31-
$container->addCompilerPass(new BuildClientExtensionsPass('default'));
32-
$container->addCompilerPass(new BuildClientTopicSubscriberRoutesPass('default'), PassConfig::TYPE_BEFORE_OPTIMIZATION, 100);
33-
$container->addCompilerPass(new BuildClientCommandSubscriberRoutesPass('default'), PassConfig::TYPE_BEFORE_OPTIMIZATION, 100);
34-
$container->addCompilerPass(new BuildClientProcessorRoutesPass('default'), PassConfig::TYPE_BEFORE_OPTIMIZATION, 100);
35-
$container->addCompilerPass(new AnalyzeRouteCollectionPass('default'), PassConfig::TYPE_BEFORE_OPTIMIZATION, 30);
36-
$container->addCompilerPass(new BuildClientProcessorRegistryPass('default'));
30+
$container->addCompilerPass(new BuildClientConsumptionExtensionsPass());
31+
$container->addCompilerPass(new BuildClientExtensionsPass());
32+
$container->addCompilerPass(new BuildClientTopicSubscriberRoutesPass(), PassConfig::TYPE_BEFORE_OPTIMIZATION, 100);
33+
$container->addCompilerPass(new BuildClientCommandSubscriberRoutesPass(), PassConfig::TYPE_BEFORE_OPTIMIZATION, 100);
34+
$container->addCompilerPass(new BuildClientProcessorRoutesPass(), PassConfig::TYPE_BEFORE_OPTIMIZATION, 100);
35+
$container->addCompilerPass(new AnalyzeRouteCollectionPass(), PassConfig::TYPE_BEFORE_OPTIMIZATION, 30);
36+
$container->addCompilerPass(new BuildClientProcessorRegistryPass());
3737

3838
if (class_exists(AsyncEventDispatcherExtension::class)) {
3939
$container->addCompilerPass(new AsyncEventsPass(), PassConfig::TYPE_BEFORE_OPTIMIZATION, 100);

pkg/enqueue-bundle/Tests/Unit/DependencyInjection/EnqueueExtensionTest.php

+18
Original file line numberDiff line numberDiff line change
@@ -461,6 +461,24 @@ public function testShouldSetPropertyWithAllConfiguredTransports()
461461
$this->assertEquals(['default', 'foo', 'bar'], $container->getParameter('enqueue.transports'));
462462
}
463463

464+
public function testShouldSetPropertyWithAllConfiguredClients()
465+
{
466+
$container = $this->getContainerBuilder(true);
467+
468+
$extension = new EnqueueExtension();
469+
$extension->load([[
470+
'client' => [],
471+
'transport' => [
472+
'default' => ['dsn' => 'default:'],
473+
'foo' => ['dsn' => 'foo:'],
474+
'bar' => ['dsn' => 'foo:'],
475+
],
476+
]], $container);
477+
478+
$this->assertTrue($container->hasParameter('enqueue.clients'));
479+
$this->assertEquals(['default'], $container->getParameter('enqueue.clients'));
480+
}
481+
464482
public function testShouldLoadProcessAutoconfigureChildDefinition()
465483
{
466484
$container = $this->getContainerBuilder(true);

pkg/enqueue/Symfony/Client/DependencyInjection/AnalyzeRouteCollectionPass.php

+22-18
Original file line numberDiff line numberDiff line change
@@ -8,31 +8,35 @@
88

99
final class AnalyzeRouteCollectionPass implements CompilerPassInterface
1010
{
11-
/**
12-
* @var string
13-
*/
14-
private $name;
11+
use FormatClientNameTrait;
1512

16-
public function __construct(string $clientName)
17-
{
18-
if (empty($clientName)) {
19-
throw new \InvalidArgumentException('The name could not be empty.');
20-
}
21-
22-
$this->name = $clientName;
23-
}
13+
protected $name;
2414

2515
public function process(ContainerBuilder $container): void
2616
{
27-
$routeCollectionId = sprintf('enqueue.client.%s.route_collection', $this->name);
28-
if (false == $container->hasDefinition($routeCollectionId)) {
29-
return;
17+
if (false == $container->hasParameter('enqueue.clients')) {
18+
throw new \LogicException('The "enqueue.clients" parameter must be set.');
3019
}
3120

32-
$collection = RouteCollection::fromArray($container->getDefinition($routeCollectionId)->getArgument(0));
21+
$names = $container->getParameter('enqueue.clients');
22+
23+
foreach ($names as $name) {
24+
$this->name = $name;
25+
$routeCollectionId = $this->format('route_collection');
26+
if (false == $container->hasDefinition($routeCollectionId)) {
27+
throw new \LogicException(sprintf('Service "%s" not found', $routeCollectionId));
28+
}
3329

34-
$this->exclusiveCommandsCouldNotBeRunOnDefaultQueue($collection);
35-
$this->exclusiveCommandProcessorMustBeSingleOnGivenQueue($collection);
30+
$collection = RouteCollection::fromArray($container->getDefinition($routeCollectionId)->getArgument(0));
31+
32+
$this->exclusiveCommandsCouldNotBeRunOnDefaultQueue($collection);
33+
$this->exclusiveCommandProcessorMustBeSingleOnGivenQueue($collection);
34+
}
35+
}
36+
37+
protected function getName(): string
38+
{
39+
return $this->name;
3640
}
3741

3842
private function exclusiveCommandsCouldNotBeRunOnDefaultQueue(RouteCollection $collection)

pkg/enqueue/Symfony/Client/DependencyInjection/BuildClientExtensionsPass.php

+42-38
Original file line numberDiff line numberDiff line change
@@ -8,58 +8,62 @@
88

99
final class BuildClientExtensionsPass implements CompilerPassInterface
1010
{
11-
/**
12-
* @var string
13-
*/
14-
private $name;
11+
use FormatClientNameTrait;
1512

16-
public function __construct(string $clientName)
17-
{
18-
if (empty($clientName)) {
19-
throw new \InvalidArgumentException('The name could not be empty.');
20-
}
21-
22-
$this->name = $clientName;
23-
}
13+
protected $name;
2414

2515
public function process(ContainerBuilder $container): void
2616
{
27-
$extensionsId = sprintf('enqueue.client.%s.client_extensions', $this->name);
28-
if (false == $container->hasDefinition($extensionsId)) {
29-
return;
17+
if (false == $container->hasParameter('enqueue.clients')) {
18+
throw new \LogicException('The "enqueue.clients" parameter must be set.');
3019
}
3120

32-
$tags = array_merge(
33-
$container->findTaggedServiceIds('enqueue.client_extension'),
34-
$container->findTaggedServiceIds('enqueue.client.extension') // TODO BC
35-
);
21+
$names = $container->getParameter('enqueue.clients');
3622

37-
$groupByPriority = [];
38-
foreach ($tags as $serviceId => $tagAttributes) {
39-
foreach ($tagAttributes as $tagAttribute) {
40-
$client = $tagAttribute['client'] ?? 'default';
23+
foreach ($names as $name) {
24+
$this->name = $name;
25+
$extensionsId = $this->format('client_extensions');
26+
if (false == $container->hasDefinition($extensionsId)) {
27+
throw new \LogicException(sprintf('Service "%s" not found', $extensionsId));
28+
}
4129

42-
if ($client !== $this->name && 'all' !== $client) {
43-
continue;
44-
}
30+
$tags = array_merge(
31+
$container->findTaggedServiceIds('enqueue.client_extension'),
32+
$container->findTaggedServiceIds('enqueue.client.extension') // TODO BC
33+
);
34+
35+
$groupByPriority = [];
36+
foreach ($tags as $serviceId => $tagAttributes) {
37+
foreach ($tagAttributes as $tagAttribute) {
38+
$client = $tagAttribute['client'] ?? 'default';
4539

46-
$priority = (int) ($tagAttribute['priority'] ?? 0);
40+
if ($client !== $this->name && 'all' !== $client) {
41+
continue;
42+
}
4743

48-
$groupByPriority[$priority][] = new Reference($serviceId);
44+
$priority = (int) ($tagAttribute['priority'] ?? 0);
45+
46+
$groupByPriority[$priority][] = new Reference($serviceId);
47+
}
4948
}
50-
}
5149

52-
krsort($groupByPriority, SORT_NUMERIC);
50+
krsort($groupByPriority, SORT_NUMERIC);
5351

54-
$flatExtensions = [];
55-
foreach ($groupByPriority as $extension) {
56-
$flatExtensions = array_merge($flatExtensions, $extension);
52+
$flatExtensions = [];
53+
foreach ($groupByPriority as $extension) {
54+
$flatExtensions = array_merge($flatExtensions, $extension);
55+
}
56+
57+
$extensionsService = $container->getDefinition($extensionsId);
58+
$extensionsService->replaceArgument(0, array_merge(
59+
$extensionsService->getArgument(0),
60+
$flatExtensions
61+
));
5762
}
63+
}
5864

59-
$extensionsService = $container->getDefinition($extensionsId);
60-
$extensionsService->replaceArgument(0, array_merge(
61-
$extensionsService->getArgument(0),
62-
$flatExtensions
63-
));
65+
protected function getName(): string
66+
{
67+
return $this->name;
6468
}
6569
}

pkg/enqueue/Symfony/Client/DependencyInjection/BuildCommandSubscriberRoutesPass.php

+74-70
Original file line numberDiff line numberDiff line change
@@ -10,98 +10,102 @@
1010

1111
final class BuildCommandSubscriberRoutesPass implements CompilerPassInterface
1212
{
13-
/**
14-
* @var string
15-
*/
16-
private $name;
13+
use FormatClientNameTrait;
1714

18-
public function __construct(string $clientName)
19-
{
20-
if (empty($clientName)) {
21-
throw new \InvalidArgumentException('The name could not be empty.');
22-
}
23-
24-
$this->name = $clientName;
25-
}
15+
protected $name;
2616

2717
public function process(ContainerBuilder $container): void
2818
{
29-
$routeCollectionId = sprintf('enqueue.client.%s.route_collection', $this->name);
30-
if (false == $container->hasDefinition($routeCollectionId)) {
31-
return;
19+
if (false == $container->hasParameter('enqueue.clients')) {
20+
throw new \LogicException('The "enqueue.clients" parameter must be set.');
3221
}
3322

34-
$tag = 'enqueue.command_subscriber';
35-
$routeCollection = new RouteCollection([]);
36-
foreach ($container->findTaggedServiceIds($tag) as $serviceId => $tagAttributes) {
37-
$processorDefinition = $container->getDefinition($serviceId);
38-
if ($processorDefinition->getFactory()) {
39-
throw new \LogicException('The command subscriber tag could not be applied to a service created by factory.');
40-
}
23+
$names = $container->getParameter('enqueue.clients');
4124

42-
$processorClass = $processorDefinition->getClass();
43-
if (false == class_exists($processorClass)) {
44-
throw new \LogicException(sprintf('The processor class "%s" could not be found.', $processorClass));
25+
foreach ($names as $name) {
26+
$this->name = $name;
27+
$routeCollectionId = sprintf('enqueue.client.%s.route_collection', $this->name);
28+
if (false == $container->hasDefinition($routeCollectionId)) {
29+
throw new \LogicException(sprintf('Service "%s" not found', $routeCollectionId));
4530
}
4631

47-
if (false == is_subclass_of($processorClass, CommandSubscriberInterface::class)) {
48-
throw new \LogicException(sprintf('The processor must implement "%s" interface to be used with the tag "%s"', CommandSubscriberInterface::class, $tag));
49-
}
32+
$tag = 'enqueue.command_subscriber';
33+
$routeCollection = new RouteCollection([]);
34+
foreach ($container->findTaggedServiceIds($tag) as $serviceId => $tagAttributes) {
35+
$processorDefinition = $container->getDefinition($serviceId);
36+
if ($processorDefinition->getFactory()) {
37+
throw new \LogicException('The command subscriber tag could not be applied to a service created by factory.');
38+
}
5039

51-
foreach ($tagAttributes as $tagAttribute) {
52-
$client = $tagAttribute['client'] ?? 'default';
40+
$processorClass = $processorDefinition->getClass();
41+
if (false == class_exists($processorClass)) {
42+
throw new \LogicException(sprintf('The processor class "%s" could not be found.', $processorClass));
43+
}
5344

54-
if ($client !== $this->name && 'all' !== $client) {
55-
continue;
45+
if (false == is_subclass_of($processorClass, CommandSubscriberInterface::class)) {
46+
throw new \LogicException(sprintf('The processor must implement "%s" interface to be used with the tag "%s"', CommandSubscriberInterface::class, $tag));
5647
}
5748

58-
/** @var CommandSubscriberInterface $processorClass */
59-
$commands = $processorClass::getSubscribedCommand();
49+
foreach ($tagAttributes as $tagAttribute) {
50+
$client = $tagAttribute['client'] ?? 'default';
6051

61-
if (empty($commands)) {
62-
throw new \LogicException('Command subscriber must return something.');
63-
}
52+
if ($client !== $this->name && 'all' !== $client) {
53+
continue;
54+
}
6455

65-
if (is_string($commands)) {
66-
$commands = [$commands];
67-
}
56+
/** @var CommandSubscriberInterface $processorClass */
57+
$commands = $processorClass::getSubscribedCommand();
6858

69-
if (!is_array($commands)) {
70-
throw new \LogicException('Command subscriber configuration is invalid. Should be an array or string.');
71-
}
59+
if (empty($commands)) {
60+
throw new \LogicException('Command subscriber must return something.');
61+
}
7262

73-
if (isset($commands['command'])) {
74-
$commands = [$commands];
75-
}
63+
if (is_string($commands)) {
64+
$commands = [$commands];
65+
}
66+
67+
if (!is_array($commands)) {
68+
throw new \LogicException('Command subscriber configuration is invalid. Should be an array or string.');
69+
}
70+
71+
if (isset($commands['command'])) {
72+
$commands = [$commands];
73+
}
7674

77-
foreach ($commands as $key => $params) {
78-
if (is_string($params)) {
79-
$routeCollection->add(new Route($params, Route::COMMAND, $serviceId, ['processor_service_id' => $serviceId]));
80-
} elseif (is_array($params)) {
81-
$source = $params['command'] ?? null;
82-
$processor = $params['processor'] ?? $serviceId;
83-
unset($params['command'], $params['source'], $params['source_type'], $params['processor'], $params['options']);
84-
$options = $params;
85-
$options['processor_service_id'] = $serviceId;
86-
87-
$routeCollection->add(new Route($source, Route::COMMAND, $processor, $options));
88-
} else {
89-
throw new \LogicException(sprintf(
90-
'Command subscriber configuration is invalid for "%s::getSubscribedCommand()". "%s"',
91-
$processorClass,
92-
json_encode($processorClass::getSubscribedCommand())
93-
));
75+
foreach ($commands as $key => $params) {
76+
if (is_string($params)) {
77+
$routeCollection->add(new Route($params, Route::COMMAND, $serviceId, ['processor_service_id' => $serviceId]));
78+
} elseif (is_array($params)) {
79+
$source = $params['command'] ?? null;
80+
$processor = $params['processor'] ?? $serviceId;
81+
unset($params['command'], $params['source'], $params['source_type'], $params['processor'], $params['options']);
82+
$options = $params;
83+
$options['processor_service_id'] = $serviceId;
84+
85+
$routeCollection->add(new Route($source, Route::COMMAND, $processor, $options));
86+
} else {
87+
throw new \LogicException(sprintf(
88+
'Command subscriber configuration is invalid for "%s::getSubscribedCommand()". "%s"',
89+
$processorClass,
90+
json_encode($processorClass::getSubscribedCommand())
91+
));
92+
}
9493
}
9594
}
9695
}
97-
}
9896

99-
$rawRoutes = $routeCollection->toArray();
97+
$rawRoutes = $routeCollection->toArray();
98+
99+
$routeCollectionService = $container->getDefinition($routeCollectionId);
100+
$routeCollectionService->replaceArgument(0, array_merge(
101+
$routeCollectionService->getArgument(0),
102+
$rawRoutes
103+
));
104+
}
105+
}
100106

101-
$routeCollectionService = $container->getDefinition($routeCollectionId);
102-
$routeCollectionService->replaceArgument(0, array_merge(
103-
$routeCollectionService->getArgument(0),
104-
$rawRoutes
105-
));
107+
protected function getName(): string
108+
{
109+
return $this->name;
106110
}
107111
}

0 commit comments

Comments
 (0)