Skip to content

Commit 8438e41

Browse files
committed
[consumption] Add ability to consume from multiple transports.
1 parent 0cc122c commit 8438e41

23 files changed

+1417
-424
lines changed

Client/ArrayProcessorRegistry.php renamed to ArrayProcessorRegistry.php

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
<?php
22

3-
namespace Enqueue\Client;
3+
namespace Enqueue;
44

55
use Interop\Queue\Processor;
66

@@ -16,7 +16,10 @@ class ArrayProcessorRegistry implements ProcessorRegistryInterface
1616
*/
1717
public function __construct(array $processors = [])
1818
{
19-
$this->processors = $processors;
19+
$this->processors = [];
20+
array_walk($processors, function (Processor $processor, string $key) {
21+
$this->processors[$key] = $processor;
22+
});
2023
}
2124

2225
public function add(string $name, Processor $processor): void

Client/DelegateProcessor.php

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@
22

33
namespace Enqueue\Client;
44

5+
use Enqueue\ProcessorRegistryInterface;
56
use Interop\Queue\Context;
67
use Interop\Queue\Message as InteropMessage;
78
use Interop\Queue\Processor;
Lines changed: 36 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,36 @@
1+
<?php
2+
3+
namespace Enqueue\Consumption;
4+
5+
class ArrayQueueConsumerRegistry implements QueueConsumerRegistryInterface
6+
{
7+
/**
8+
* @var QueueConsumerInterface[]
9+
*/
10+
private $consumers;
11+
12+
/**
13+
* @param QueueConsumerInterface[] $queueConsumers
14+
*/
15+
public function __construct(array $queueConsumers = [])
16+
{
17+
$this->consumers = [];
18+
array_walk($queueConsumers, function (QueueConsumerInterface $consumer, string $key) {
19+
$this->consumers[$key] = $consumer;
20+
});
21+
}
22+
23+
public function add(string $name, QueueConsumerInterface $consumer): void
24+
{
25+
$this->consumers[$name] = $consumer;
26+
}
27+
28+
public function get(string $name): QueueConsumerInterface
29+
{
30+
if (false == isset($this->consumers[$name])) {
31+
throw new \LogicException(sprintf('QueueConsumer was not found, name: "%s".', $name));
32+
}
33+
34+
return $this->consumers[$name];
35+
}
36+
}
Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,10 @@
1+
<?php
2+
3+
declare(strict_types=1);
4+
5+
namespace Enqueue\Consumption;
6+
7+
interface QueueConsumerRegistryInterface
8+
{
9+
public function get(string $name): QueueConsumerInterface;
10+
}

Client/ProcessorRegistryInterface.php renamed to ProcessorRegistryInterface.php

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,8 @@
11
<?php
22

3-
namespace Enqueue\Client;
3+
declare(strict_types=1);
4+
5+
namespace Enqueue;
46

57
use Interop\Queue\Processor;
68

Lines changed: 90 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,90 @@
1+
<?php
2+
3+
namespace Enqueue\Symfony\Consumption;
4+
5+
use Enqueue\Consumption\ChainExtension;
6+
use Enqueue\Consumption\Extension\LoggerExtension;
7+
use Enqueue\Consumption\QueueConsumerRegistryInterface;
8+
use Enqueue\Consumption\QueueSubscriberInterface;
9+
use Enqueue\ProcessorRegistryInterface;
10+
use Symfony\Component\Console\Command\Command;
11+
use Symfony\Component\Console\Input\InputArgument;
12+
use Symfony\Component\Console\Input\InputInterface;
13+
use Symfony\Component\Console\Input\InputOption;
14+
use Symfony\Component\Console\Logger\ConsoleLogger;
15+
use Symfony\Component\Console\Output\OutputInterface;
16+
17+
class ConfigurableConsumeCommand extends Command
18+
{
19+
use LimitsExtensionsCommandTrait;
20+
use QueueConsumerOptionsCommandTrait;
21+
22+
protected static $defaultName = 'enqueue:transport:consume';
23+
24+
/**
25+
* @var QueueConsumerRegistryInterface
26+
*/
27+
protected $consumerRegistry;
28+
29+
/**
30+
* @var ProcessorRegistryInterface
31+
*/
32+
private $processorRegistry;
33+
34+
public function __construct(QueueConsumerRegistryInterface $consumerRegistry, ProcessorRegistryInterface $processorRegistry)
35+
{
36+
parent::__construct(static::$defaultName);
37+
38+
$this->consumerRegistry = $consumerRegistry;
39+
$this->processorRegistry = $processorRegistry;
40+
}
41+
42+
protected function configure(): void
43+
{
44+
$this->configureLimitsExtensions();
45+
$this->configureQueueConsumerOptions();
46+
47+
$this
48+
->setDescription('A worker that consumes message from a broker. '.
49+
'To use this broker you have to explicitly set a queue to consume from '.
50+
'and a message processor service')
51+
->addArgument('processor', InputArgument::REQUIRED, 'A message processor.')
52+
->addArgument('queues', InputArgument::OPTIONAL | InputArgument::IS_ARRAY, 'A queue to consume from', [])
53+
->addOption('transport', 't', InputOption::VALUE_OPTIONAL, 'The transport to consume messages from.', 'default')
54+
;
55+
}
56+
57+
protected function execute(InputInterface $input, OutputInterface $output): ?int
58+
{
59+
$consumer = $this->consumerRegistry->get($input->getOption('transport'));
60+
61+
$this->setQueueConsumerOptions($consumer, $input);
62+
63+
$processor = $this->processorRegistry->get($input->getArgument('processor'));
64+
65+
$queues = $input->getArgument('queues');
66+
if (empty($queues) && $processor instanceof QueueSubscriberInterface) {
67+
$queues = $processor::getSubscribedQueues();
68+
}
69+
70+
if (empty($queues)) {
71+
throw new \LogicException(sprintf(
72+
'The queue is not provided. The processor must implement "%s" interface and it must return not empty array of queues or a queue set using as a second argument.',
73+
QueueSubscriberInterface::class
74+
));
75+
}
76+
77+
$extensions = $this->getLimitsExtensions($input, $output);
78+
array_unshift($extensions, new LoggerExtension(new ConsoleLogger($output)));
79+
80+
$runtimeExtensions = new ChainExtension($extensions);
81+
82+
foreach ($queues as $queue) {
83+
$consumer->bind($queue, $processor);
84+
}
85+
86+
$consumer->consume($runtimeExtensions);
87+
88+
return null;
89+
}
90+
}

Symfony/Consumption/ConsumeMessagesCommand.php renamed to Symfony/Consumption/ConsumeCommand.php

Lines changed: 20 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -5,14 +5,16 @@
55
use Enqueue\Consumption\ChainExtension;
66
use Enqueue\Consumption\Extension\LoggerExtension;
77
use Enqueue\Consumption\QueueConsumerInterface;
8+
use Enqueue\Consumption\QueueConsumerRegistryInterface;
89
use Symfony\Component\Console\Command\Command;
910
use Symfony\Component\Console\Input\InputInterface;
11+
use Symfony\Component\Console\Input\InputOption;
1012
use Symfony\Component\Console\Logger\ConsoleLogger;
1113
use Symfony\Component\Console\Output\OutputInterface;
1214
use Symfony\Component\DependencyInjection\ContainerAwareInterface;
1315
use Symfony\Component\DependencyInjection\ContainerAwareTrait;
1416

15-
class ConsumeMessagesCommand extends Command implements ContainerAwareInterface
17+
class ConsumeCommand extends Command implements ContainerAwareInterface
1618
{
1719
use ContainerAwareTrait;
1820
use LimitsExtensionsCommandTrait;
@@ -21,46 +23,48 @@ class ConsumeMessagesCommand extends Command implements ContainerAwareInterface
2123
protected static $defaultName = 'enqueue:transport:consume';
2224

2325
/**
24-
* @var QueueConsumerInterface
26+
* @var QueueConsumerRegistryInterface
2527
*/
26-
protected $consumer;
28+
protected $consumerRegistry;
2729

2830
/**
29-
* @param QueueConsumerInterface $consumer
31+
* [name => QueueConsumerInterface].
32+
*
33+
* @param QueueConsumerInterface[]
3034
*/
31-
public function __construct(QueueConsumerInterface $consumer)
35+
public function __construct(QueueConsumerRegistryInterface $consumerRegistry)
3236
{
3337
parent::__construct(static::$defaultName);
3438

35-
$this->consumer = $consumer;
39+
$this->consumerRegistry = $consumerRegistry;
3640
}
3741

38-
/**
39-
* {@inheritdoc}
40-
*/
41-
protected function configure()
42+
protected function configure(): void
4243
{
4344
$this->configureLimitsExtensions();
4445
$this->configureQueueConsumerOptions();
4546

4647
$this
48+
->addOption('transport', 't', InputOption::VALUE_OPTIONAL, 'The transport to consume messages from.', 'default')
4749
->setDescription('A worker that consumes message from a broker. '.
4850
'To use this broker you have to configure queue consumer before adding to the command')
4951
;
5052
}
5153

52-
/**
53-
* {@inheritdoc}
54-
*/
55-
protected function execute(InputInterface $input, OutputInterface $output)
54+
protected function execute(InputInterface $input, OutputInterface $output): ?int
5655
{
57-
$this->setQueueConsumerOptions($this->consumer, $input);
56+
// QueueConsumer must be pre configured outside of the command!
57+
$consumer = $this->consumerRegistry->get($input->getOption('transport'));
58+
59+
$this->setQueueConsumerOptions($consumer, $input);
5860

5961
$extensions = $this->getLimitsExtensions($input, $output);
6062
array_unshift($extensions, new LoggerExtension(new ConsoleLogger($output)));
6163

6264
$runtimeExtensions = new ChainExtension($extensions);
6365

64-
$this->consumer->consume($runtimeExtensions);
66+
$consumer->consume($runtimeExtensions);
67+
68+
return null;
6569
}
6670
}

Symfony/Consumption/ContainerAwareConsumeMessagesCommand.php

Lines changed: 0 additions & 101 deletions
This file was deleted.
Lines changed: 31 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,31 @@
1+
<?php
2+
3+
declare(strict_types=1);
4+
5+
namespace Enqueue\Symfony\Consumption;
6+
7+
use Enqueue\Consumption\QueueConsumerInterface;
8+
use Enqueue\Consumption\QueueConsumerRegistryInterface;
9+
use Psr\Container\ContainerInterface;
10+
11+
final class ContainerQueueConsumerRegistry implements QueueConsumerRegistryInterface
12+
{
13+
/**
14+
* @var ContainerInterface
15+
*/
16+
private $locator;
17+
18+
public function __construct(ContainerInterface $locator)
19+
{
20+
$this->locator = $locator;
21+
}
22+
23+
public function get(string $name): QueueConsumerInterface
24+
{
25+
if (false == $this->locator->has($name)) {
26+
throw new \LogicException(sprintf('Service locator does not have a queue consumer with name "%s".', $name));
27+
}
28+
29+
return $this->locator->get($name);
30+
}
31+
}

0 commit comments

Comments
 (0)