diff --git a/README.md b/README.md index 3f528d4..3d4e11c 100644 --- a/README.md +++ b/README.md @@ -7,6 +7,7 @@ It also supports extended AMQP features such as queue declaration and message de The package allows you to use queue interop transport the [laravel way](https://github.com/php-enqueue/enqueue-dev/blob/master/docs/laravel/queues.md) as well as integrates the [enqueue simple client](https://github.com/php-enqueue/enqueue-dev/blob/master/docs/laravel/quick_tour.md#enqueue-simple-client). +To make message [persistent](https://www.rabbitmq.com/persistence-conf.html) add to Laravel Job class field `public $persistent = true;` ## Resources * [Documentation](https://github.com/php-enqueue/enqueue-dev/blob/master/docs/index.md) diff --git a/src/Queue.php b/src/Queue.php index fae0e3f..dde7388 100644 --- a/src/Queue.php +++ b/src/Queue.php @@ -5,6 +5,7 @@ use Illuminate\Contracts\Queue\Queue as QueueContract; use Illuminate\Queue\Queue as BaseQueue; use Interop\Queue\PsrContext; +use Interop\Amqp\Impl\AmqpMessage; class Queue extends BaseQueue implements QueueContract { @@ -56,9 +57,15 @@ public function push($job, $data = '', $queue = null) */ public function pushRaw($payload, $queue = null, array $options = []) { + $message = $this->psrContext->createMessage($payload); + + if ($message instanceof AmqpMessage) { + $message->setDeliveryMode(\Interop\Amqp\AmqpMessage::DELIVERY_MODE_PERSISTENT); + } + return $this->psrContext->createProducer()->send( $this->getQueue($queue), - $this->psrContext->createMessage($payload) + $message ); } @@ -69,11 +76,13 @@ public function later($delay, $job, $data = '', $queue = null) { $message = $this->psrContext->createMessage($this->createPayload($job, $data)); + if ($message instanceof AmqpMessage) { + $message->setDeliveryMode(\Interop\Amqp\AmqpMessage::DELIVERY_MODE_PERSISTENT); + } + return $this->psrContext->createProducer() ->setDeliveryDelay($this->secondsUntil($delay) * 1000) - - ->send($this->getQueue($queue), $message) - ; + ->send($this->getQueue($queue), $message); } /**