From 34b0e8e48749bc3f4b403fcd6135dc2fe2db4f46 Mon Sep 17 00:00:00 2001 From: Dariusz Date: Tue, 20 Mar 2018 11:46:28 +0100 Subject: [PATCH 1/4] Add persisten functionality --- README.md | 1 + src/Queue.php | 23 +++++++++++++++++++---- 2 files changed, 20 insertions(+), 4 deletions(-) diff --git a/README.md b/README.md index 3f528d4..bcf85a0 100644 --- a/README.md +++ b/README.md @@ -7,6 +7,7 @@ It also supports extended AMQP features such as queue declaration and message de The package allows you to use queue interop transport the [laravel way](https://github.com/php-enqueue/enqueue-dev/blob/master/docs/laravel/queues.md) as well as integrates the [enqueue simple client](https://github.com/php-enqueue/enqueue-dev/blob/master/docs/laravel/quick_tour.md#enqueue-simple-client). +To make message [persistent](https://www.rabbitmq.com/persistence-conf.html) add to Laravel Job class field `protected $persistent = true;` ## Resources * [Documentation](https://github.com/php-enqueue/enqueue-dev/blob/master/docs/index.md) diff --git a/src/Queue.php b/src/Queue.php index fae0e3f..3864355 100644 --- a/src/Queue.php +++ b/src/Queue.php @@ -23,6 +23,11 @@ class Queue extends BaseQueue implements QueueContract */ protected $psrContext; + /** + * @var boolean + */ + protected $persistent = false; + /** * @param PsrContext $psrContext * @param string $queueName @@ -48,6 +53,8 @@ public function size($queue = null) */ public function push($job, $data = '', $queue = null) { + $this->persistent = $job->persistent ?? false; + return $this->pushRaw($this->createPayload($job, $data), $queue); } @@ -56,9 +63,15 @@ public function push($job, $data = '', $queue = null) */ public function pushRaw($payload, $queue = null, array $options = []) { + $message = $this->psrContext->createMessage($payload); + + if ($this->persistent) { + $message->setDeliveryMode(\Interop\Amqp\AmqpMessage::DELIVERY_MODE_PERSISTENT); + } + return $this->psrContext->createProducer()->send( $this->getQueue($queue), - $this->psrContext->createMessage($payload) + $message ); } @@ -69,11 +82,13 @@ public function later($delay, $job, $data = '', $queue = null) { $message = $this->psrContext->createMessage($this->createPayload($job, $data)); + if (isset($job->persistent) && $job->persistent) { + $message->setDeliveryMode(\Interop\Amqp\AmqpMessage::DELIVERY_MODE_PERSISTENT); + } + return $this->psrContext->createProducer() ->setDeliveryDelay($this->secondsUntil($delay) * 1000) - - ->send($this->getQueue($queue), $message) - ; + ->send($this->getQueue($queue), $message); } /** From 68f2da17926863db08ec5cfd54a9fcaf5a282636 Mon Sep 17 00:00:00 2001 From: Dariusz Date: Wed, 21 Mar 2018 09:07:20 +0100 Subject: [PATCH 2/4] update readme --- README.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/README.md b/README.md index bcf85a0..3d4e11c 100644 --- a/README.md +++ b/README.md @@ -7,7 +7,7 @@ It also supports extended AMQP features such as queue declaration and message de The package allows you to use queue interop transport the [laravel way](https://github.com/php-enqueue/enqueue-dev/blob/master/docs/laravel/queues.md) as well as integrates the [enqueue simple client](https://github.com/php-enqueue/enqueue-dev/blob/master/docs/laravel/quick_tour.md#enqueue-simple-client). -To make message [persistent](https://www.rabbitmq.com/persistence-conf.html) add to Laravel Job class field `protected $persistent = true;` +To make message [persistent](https://www.rabbitmq.com/persistence-conf.html) add to Laravel Job class field `public $persistent = true;` ## Resources * [Documentation](https://github.com/php-enqueue/enqueue-dev/blob/master/docs/index.md) From c42e72062ff889c140cfb2440d147b92e7c07008 Mon Sep 17 00:00:00 2001 From: Dariusz Date: Thu, 29 Mar 2018 10:29:01 +0200 Subject: [PATCH 3/4] Always use persistent queues for AMQP/RabbitMQ transport --- src/Queue.php | 11 ++--------- 1 file changed, 2 insertions(+), 9 deletions(-) diff --git a/src/Queue.php b/src/Queue.php index 3864355..d8ebe08 100644 --- a/src/Queue.php +++ b/src/Queue.php @@ -23,11 +23,6 @@ class Queue extends BaseQueue implements QueueContract */ protected $psrContext; - /** - * @var boolean - */ - protected $persistent = false; - /** * @param PsrContext $psrContext * @param string $queueName @@ -53,8 +48,6 @@ public function size($queue = null) */ public function push($job, $data = '', $queue = null) { - $this->persistent = $job->persistent ?? false; - return $this->pushRaw($this->createPayload($job, $data), $queue); } @@ -65,7 +58,7 @@ public function pushRaw($payload, $queue = null, array $options = []) { $message = $this->psrContext->createMessage($payload); - if ($this->persistent) { + if ($message instanceof \Interop\Amqp\Impl\AmqpMessage) { $message->setDeliveryMode(\Interop\Amqp\AmqpMessage::DELIVERY_MODE_PERSISTENT); } @@ -82,7 +75,7 @@ public function later($delay, $job, $data = '', $queue = null) { $message = $this->psrContext->createMessage($this->createPayload($job, $data)); - if (isset($job->persistent) && $job->persistent) { + if ($message instanceof \Interop\Amqp\Impl\AmqpMessage) { $message->setDeliveryMode(\Interop\Amqp\AmqpMessage::DELIVERY_MODE_PERSISTENT); } From 8b8cce9c390a0d4126794026fb7a6c3cb75b6d1c Mon Sep 17 00:00:00 2001 From: Dariusz Date: Thu, 29 Mar 2018 10:43:43 +0200 Subject: [PATCH 4/4] Use namespace --- src/Queue.php | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/src/Queue.php b/src/Queue.php index d8ebe08..dde7388 100644 --- a/src/Queue.php +++ b/src/Queue.php @@ -5,6 +5,7 @@ use Illuminate\Contracts\Queue\Queue as QueueContract; use Illuminate\Queue\Queue as BaseQueue; use Interop\Queue\PsrContext; +use Interop\Amqp\Impl\AmqpMessage; class Queue extends BaseQueue implements QueueContract { @@ -58,7 +59,7 @@ public function pushRaw($payload, $queue = null, array $options = []) { $message = $this->psrContext->createMessage($payload); - if ($message instanceof \Interop\Amqp\Impl\AmqpMessage) { + if ($message instanceof AmqpMessage) { $message->setDeliveryMode(\Interop\Amqp\AmqpMessage::DELIVERY_MODE_PERSISTENT); } @@ -75,7 +76,7 @@ public function later($delay, $job, $data = '', $queue = null) { $message = $this->psrContext->createMessage($this->createPayload($job, $data)); - if ($message instanceof \Interop\Amqp\Impl\AmqpMessage) { + if ($message instanceof AmqpMessage) { $message->setDeliveryMode(\Interop\Amqp\AmqpMessage::DELIVERY_MODE_PERSISTENT); }