From 9a2c2a64a309f6eb582657caae6cc47481844de3 Mon Sep 17 00:00:00 2001 From: Maksim Kotlyar Date: Mon, 7 Aug 2017 15:47:06 +0300 Subject: [PATCH 1/2] Amqp queue --- composer.json | 6 +- src/AmqpConnector.php | 21 +++++++ src/AmqpQueue.php | 62 +++++++++++++++++++ .../Command}/ConsumeMessagesCommand.php | 0 .../Command}/ProduceMessageCommand.php | 0 {Command => src/Command}/QueuesCommand.php | 0 .../Command}/SetupBrokerCommand.php | 0 {Command => src/Command}/TopicsCommand.php | 0 Connector.php => src/Connector.php | 0 .../EnqueueServiceProvider.php | 4 ++ Job.php => src/Job.php | 0 Queue.php => src/Queue.php | 3 +- {Tests => src/Tests}/ConnectorTest.php | 0 .../Tests}/EnqueueServiceProviderTest.php | 0 {Tests => src/Tests}/JobTest.php | 0 {Tests => src/Tests}/QueueTest.php | 0 16 files changed, 93 insertions(+), 3 deletions(-) create mode 100644 src/AmqpConnector.php create mode 100644 src/AmqpQueue.php rename {Command => src/Command}/ConsumeMessagesCommand.php (100%) rename {Command => src/Command}/ProduceMessageCommand.php (100%) rename {Command => src/Command}/QueuesCommand.php (100%) rename {Command => src/Command}/SetupBrokerCommand.php (100%) rename {Command => src/Command}/TopicsCommand.php (100%) rename Connector.php => src/Connector.php (100%) rename EnqueueServiceProvider.php => src/EnqueueServiceProvider.php (94%) rename Job.php => src/Job.php (100%) rename Queue.php => src/Queue.php (98%) rename {Tests => src/Tests}/ConnectorTest.php (100%) rename {Tests => src/Tests}/EnqueueServiceProviderTest.php (100%) rename {Tests => src/Tests}/JobTest.php (100%) rename {Tests => src/Tests}/QueueTest.php (100%) diff --git a/composer.json b/composer.json index eb2e6e0..647dda6 100644 --- a/composer.json +++ b/composer.json @@ -13,7 +13,9 @@ "require": { "php": ">=5.6", "illuminate/queue": "^5.4", - "queue-interop/queue-interop": "^0.6@dev" + "queue-interop/queue-interop": "^0.6@dev", + "queue-interop/amqp-interop": "^0.6@dev", + "enqueue/amqp-tools": "^0.7@dev" }, "require-dev": { "phpunit/phpunit": "~5.5", @@ -22,7 +24,7 @@ "enqueue/test": "^0.7@dev" }, "autoload": { - "psr-4": { "Enqueue\\LaravelQueue\\": "" }, + "psr-4": { "Enqueue\\LaravelQueue\\": "src/" }, "exclude-from-classmap": [ "/Tests/" ] diff --git a/src/AmqpConnector.php b/src/AmqpConnector.php new file mode 100644 index 0000000..d371b20 --- /dev/null +++ b/src/AmqpConnector.php @@ -0,0 +1,21 @@ +getPsrContext() instanceof AmqpContext) { + throw new \LogicException(sprintf('The context must be instance of "%s" but got "%s"', AmqpContext::class, get_class($queue->getPsrContext())); + } + + // TODO set delay strategy. + + return $queue; + } +} diff --git a/src/AmqpQueue.php b/src/AmqpQueue.php new file mode 100644 index 0000000..e06bfd1 --- /dev/null +++ b/src/AmqpQueue.php @@ -0,0 +1,62 @@ +declareQueue($queue); + + parent::pushRaw($payload, $queue, $options); + } + + /** + * {@inheritdoc} + */ + public function later($delay, $job, $data = '', $queue = null) + { + $this->declareQueue($queue); + + return parent::later($delay, $job, $data, $queue); + } + + /** + * {@inheritdoc} + */ + public function pop($queue = null) + { + $this->declareQueue($queue); + + return parent::pop($queue); + } + + /** + * @param string|null $queue + */ + protected function declareQueue($queue = null) + { + $psrQueue = $this->getPsrContext()->createQueue($this->getQueue($queue)); + $psrQueue->addFlag(\Interop\Amqp\AmqpQueue::FLAG_DURABLE); + + $this->getPsrContext()->declareQueue($psrQueue); + } +} diff --git a/Command/ConsumeMessagesCommand.php b/src/Command/ConsumeMessagesCommand.php similarity index 100% rename from Command/ConsumeMessagesCommand.php rename to src/Command/ConsumeMessagesCommand.php diff --git a/Command/ProduceMessageCommand.php b/src/Command/ProduceMessageCommand.php similarity index 100% rename from Command/ProduceMessageCommand.php rename to src/Command/ProduceMessageCommand.php diff --git a/Command/QueuesCommand.php b/src/Command/QueuesCommand.php similarity index 100% rename from Command/QueuesCommand.php rename to src/Command/QueuesCommand.php diff --git a/Command/SetupBrokerCommand.php b/src/Command/SetupBrokerCommand.php similarity index 100% rename from Command/SetupBrokerCommand.php rename to src/Command/SetupBrokerCommand.php diff --git a/Command/TopicsCommand.php b/src/Command/TopicsCommand.php similarity index 100% rename from Command/TopicsCommand.php rename to src/Command/TopicsCommand.php diff --git a/Connector.php b/src/Connector.php similarity index 100% rename from Connector.php rename to src/Connector.php diff --git a/EnqueueServiceProvider.php b/src/EnqueueServiceProvider.php similarity index 94% rename from EnqueueServiceProvider.php rename to src/EnqueueServiceProvider.php index 95dbc50..a7f73e5 100644 --- a/EnqueueServiceProvider.php +++ b/src/EnqueueServiceProvider.php @@ -65,5 +65,9 @@ private function bootInteropQueueDriver() $manager->addConnector('interop', function () { return new Connector(); }); + + $manager->addConnector('amqp_interop', function () { + return new AmqpConnector(); + }); } } diff --git a/Job.php b/src/Job.php similarity index 100% rename from Job.php rename to src/Job.php diff --git a/Queue.php b/src/Queue.php similarity index 98% rename from Queue.php rename to src/Queue.php index 590a23d..fae0e3f 100644 --- a/Queue.php +++ b/src/Queue.php @@ -17,10 +17,11 @@ class Queue extends BaseQueue implements QueueContract * @var int */ protected $timeToRun; + /** * @var PsrContext */ - private $psrContext; + protected $psrContext; /** * @param PsrContext $psrContext diff --git a/Tests/ConnectorTest.php b/src/Tests/ConnectorTest.php similarity index 100% rename from Tests/ConnectorTest.php rename to src/Tests/ConnectorTest.php diff --git a/Tests/EnqueueServiceProviderTest.php b/src/Tests/EnqueueServiceProviderTest.php similarity index 100% rename from Tests/EnqueueServiceProviderTest.php rename to src/Tests/EnqueueServiceProviderTest.php diff --git a/Tests/JobTest.php b/src/Tests/JobTest.php similarity index 100% rename from Tests/JobTest.php rename to src/Tests/JobTest.php diff --git a/Tests/QueueTest.php b/src/Tests/QueueTest.php similarity index 100% rename from Tests/QueueTest.php rename to src/Tests/QueueTest.php From 4f9c4a5bf7fba7ebc64cb326dc2cbe8c71fa100b Mon Sep 17 00:00:00 2001 From: Maksim Kotlyar Date: Mon, 7 Aug 2017 16:43:52 +0300 Subject: [PATCH 2/2] amqp delay message with dlx strategy. --- src/AmqpConnector.php | 12 +++++++++--- 1 file changed, 9 insertions(+), 3 deletions(-) diff --git a/src/AmqpConnector.php b/src/AmqpConnector.php index d371b20..1861043 100644 --- a/src/AmqpConnector.php +++ b/src/AmqpConnector.php @@ -2,6 +2,8 @@ namespace Enqueue\LaravelQueue; +use Enqueue\AmqpTools\DelayStrategyAware; +use Enqueue\AmqpTools\RabbitMqDlxDelayStrategy; use Interop\Amqp\AmqpContext; class AmqpConnector extends Connector @@ -10,11 +12,15 @@ public function connect(array $config) { $queue = parent::connect($config); - if (false == $queue->getPsrContext() instanceof AmqpContext) { - throw new \LogicException(sprintf('The context must be instance of "%s" but got "%s"', AmqpContext::class, get_class($queue->getPsrContext())); + /** @var AmqpContext $amqpContext */ + $amqpContext = $queue->getPsrContext(); + if (false == $amqpContext instanceof AmqpContext) { + throw new \LogicException(sprintf('The context must be instance of "%s" but got "%s"', AmqpContext::class, get_class($queue->getPsrContext()))); } - // TODO set delay strategy. + if ($amqpContext instanceof DelayStrategyAware) { + $amqpContext->setDelayStrategy(new RabbitMqDlxDelayStrategy()); + } return $queue; }