diff --git a/README.md b/README.md index 3f528d4..fad2648 100644 --- a/README.md +++ b/README.md @@ -5,7 +5,41 @@ You can use all transports built on top of [queue-interop](https://github.com/queue-interop/queue-interop) including [all supported](https://github.com/php-enqueue/enqueue-dev/tree/master/docs/transport) by Enqueue. It also supports extended AMQP features such as queue declaration and message delaying. -The package allows you to use queue interop transport the [laravel way](https://github.com/php-enqueue/enqueue-dev/blob/master/docs/laravel/queues.md) as well as integrates the [enqueue simple client](https://github.com/php-enqueue/enqueue-dev/blob/master/docs/laravel/quick_tour.md#enqueue-simple-client). +The package allows you to use queue interop transport the [laravel way](https://github.com/php-enqueue/enqueue-dev/blob/master/docs/laravel/queues.md) as well as integrates the [enqueue simple client](https://github.com/php-enqueue/enqueue-dev/blob/master/docs/laravel/quick_tour.md#enqueue-simple-client). + + +## Advantages + +* Supports message delaying, priorities and expiration +* Use DSN to configure transport. 12 factors friendly. +* It brings support of a lot of MQ transport with few lines of integration code: + + * [AMQP(s)](https://github.com/php-enqueue/enqueue-dev/blob/master/docs/transport/amqp.md) based on [PHP AMQP extension](https://github.com/pdezwart/php-amqp). + * [AMQP](https://github.com/php-enqueue/enqueue-dev/blob/master/docs/transport/amqp_bunny.md) based on [bunny](https://github.com/jakubkulhan/bunny). + * [AMQP(s)](https://github.com/php-enqueue/enqueue-dev/blob/master/docs/transport/amqp_lib.md) based on [php-amqplib](https://github.com/php-amqplib/php-amqplib). + * [Beanstalk](https://github.com/php-enqueue/enqueue-dev/blob/master/docs/transport/pheanstalk.md). + * [STOMP](https://github.com/php-enqueue/enqueue-dev/blob/master/docs/transport/stomp.md) + * [Amazon SQS](https://github.com/php-enqueue/enqueue-dev/blob/master/docs/transport/sqs.md) + * [Google PubSub](https://github.com/php-enqueue/enqueue-dev/blob/master/docs/transport/gps.md) + * [Kafka](https://github.com/php-enqueue/enqueue-dev/blob/master/docs/transport/kafka.md) + * [Redis](https://github.com/php-enqueue/enqueue-dev/blob/master/docs/transport/redis.md) + * [Gearman](https://github.com/php-enqueue/enqueue-dev/blob/master/docs/transport/gearman.md) + * [Doctrine DBAL](https://github.com/php-enqueue/enqueue-dev/blob/master/docs/transport/dbal.md) + * [Filesystem](https://github.com/php-enqueue/enqueue-dev/blob/master/docs/transport/filesystem.md) + * [MongoDB](https://github.com/php-enqueue/enqueue-dev/blob/master/docs/transport/mongodb.md) + * [WAMP](https://github.com/php-enqueue/enqueue-dev/blob/master/docs/transport/wamp.md) + * [PHP-FPM](https://github.com/makasim/php-fpm-queue) + * [rabbitmq-cli-consumer-client](https://github.com/makasim/rabbitmq-cli-consumer-client) + +* Consume messages as they arrive from multiple queues. +* You can run fewer work processes and reduce memory usages. +* It uses long pulling whenever possible. It results in zero CPU usages while waiting for messages. +* You can [monitor](https://github.com/php-enqueue/enqueue-dev/blob/master/docs/monitoring.md) any transport, not only redis +* Adds extension points +* AMQP friendly. +* Popular soliution, big and active community around the project +* Supported by a company - Forma-Pro + ## Resources diff --git a/src/EnqueueServiceProvider.php b/src/EnqueueServiceProvider.php index 44f5372..999c8c1 100644 --- a/src/EnqueueServiceProvider.php +++ b/src/EnqueueServiceProvider.php @@ -7,6 +7,7 @@ use Enqueue\LaravelQueue\Command\RoutesCommand; use Enqueue\LaravelQueue\Command\SetupBrokerCommand; use Enqueue\SimpleClient\SimpleClient; +use Illuminate\Contracts\Debug\ExceptionHandler; use Illuminate\Queue\QueueManager; use Illuminate\Support\ServiceProvider; @@ -61,5 +62,11 @@ private function bootInteropQueueDriver() $manager->addConnector('amqp_interop', function () { return new AmqpConnector(); }); + + $this->app->extend('queue.worker', function ($worker, $app) { + return new Worker( + $app['queue'], $app['events'], $app[ExceptionHandler::class] + ); + }); } } diff --git a/src/Queue.php b/src/Queue.php index 637955b..4c17b5b 100644 --- a/src/Queue.php +++ b/src/Queue.php @@ -4,8 +4,10 @@ use Illuminate\Contracts\Queue\Queue as QueueContract; use Illuminate\Queue\Queue as BaseQueue; +use Interop\Queue\Consumer; use Interop\Queue\Context; use Interop\Amqp\Impl\AmqpMessage; +use Interop\Queue\Message; class Queue extends BaseQueue implements QueueContract { @@ -91,15 +93,20 @@ public function pop($queue = null) $consumer = $this->context->createConsumer($queue); if ($message = $consumer->receive(1000)) { // 1 sec - return new Job( - $this->container, - $this->context, - $consumer, - $message, - $this->connectionName - ); + return $this->convertMessageToJob($message, $consumer); } } + + public function convertMessageToJob(Message $message, Consumer $consumer): Job + { + return new Job( + $this->container, + $this->context, + $consumer, + $message, + $this->connectionName + ); + } /** * Get the queue or return the default. diff --git a/src/Worker.php b/src/Worker.php new file mode 100644 index 0000000..e376988 --- /dev/null +++ b/src/Worker.php @@ -0,0 +1,156 @@ +connectionName = $connectionName; + $this->queueNames = $queueNames; + $this->options = $options; + + /** @var Queue $queue */ + $this->queue = $this->getManager()->connection($connectionName); + $this->interop = $this->queue instanceof Queue; + + if (false == $this->interop) { + parent::daemon($connectionName, $this->queue, $options); + } + + $context = $this->queue->getQueueInteropContext(); + $queueConsumer = new QueueConsumer($context, new ChainExtension([$this])); + foreach (explode(',', $queueNames) as $queueName) { + $queueConsumer->bindCallback($queueName, function() { + $this->runJob($this->job, $this->connectionName, $this->options); + + return Result::ALREADY_ACKNOWLEDGED; + }); + } + + $queueConsumer->consume(); + } + + public function runNextJob($connectionName, $queueNames, WorkerOptions $options) + { + $this->connectionName = $connectionName; + $this->queueNames = $queueNames; + $this->options = $options; + + /** @var Queue $queue */ + $this->queue = $this->getManager()->connection($connectionName); + $this->interop = $this->queue instanceof Queue; + + if (false == $this->interop) { + parent::daemon($connectionName, $this->queue, $options); + } + + $context = $this->queue->getQueueInteropContext(); + + $queueConsumer = new QueueConsumer($context, new ChainExtension([ + $this, + new LimitConsumedMessagesExtension(1), + ])); + + foreach (explode(',', $queueNames) as $queueName) { + $queueConsumer->bindCallback($queueName, function() { + $this->runJob($this->job, $this->connectionName, $this->options); + + return Result::ALREADY_ACKNOWLEDGED; + }); + } + + $queueConsumer->consume(); + } + + public function onStart(Start $context): void + { + if ($this->supportsAsyncSignals()) { + $this->listenForSignals(); + } + + $this->lastRestart = $this->getTimestampOfLastQueueRestart(); + + if ($this->stopped) { + $context->interruptExecution(); + } + } + + public function onPreConsume(PreConsume $context): void + { + if (! $this->daemonShouldRun($this->options, $this->connectionName, $this->queueNames)) { + $this->pauseWorker($this->options, $this->lastRestart); + } + + if ($this->stopped) { + $context->interruptExecution(); + } + } + + public function onMessageReceived(MessageReceived $context): void + { + $this->job = $this->queue->convertMessageToJob( + $context->getMessage(), + $context->getConsumer() + ); + + if ($this->supportsAsyncSignals()) { + $this->registerTimeoutHandler($this->job, $this->options); + } + } + + public function onPostMessageReceived(PostMessageReceived $context): void + { + $this->stopIfNecessary($this->options, $this->lastRestart, $this->job); + + if ($this->stopped) { + $context->interruptExecution(); + } + } + + public function stop($status = 0) + { + if ($this->interop) { + $this->stopped = true; + + return; + } + + parent::stop($status); + } +} +