Skip to content

Commit 552b6d9

Browse files
committed
add queue consumer based worker.
1 parent 6179026 commit 552b6d9

File tree

3 files changed

+180
-7
lines changed

3 files changed

+180
-7
lines changed

src/EnqueueServiceProvider.php

+7
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@
77
use Enqueue\LaravelQueue\Command\RoutesCommand;
88
use Enqueue\LaravelQueue\Command\SetupBrokerCommand;
99
use Enqueue\SimpleClient\SimpleClient;
10+
use Illuminate\Contracts\Debug\ExceptionHandler;
1011
use Illuminate\Queue\QueueManager;
1112
use Illuminate\Support\ServiceProvider;
1213

@@ -61,5 +62,11 @@ private function bootInteropQueueDriver()
6162
$manager->addConnector('amqp_interop', function () {
6263
return new AmqpConnector();
6364
});
65+
66+
$this->app->extend('queue.worker', function ($worker, $app) {
67+
return new Worker(
68+
$app['queue'], $app['events'], $app[ExceptionHandler::class]
69+
);
70+
});
6471
}
6572
}

src/Queue.php

+14-7
Original file line numberDiff line numberDiff line change
@@ -4,8 +4,10 @@
44

55
use Illuminate\Contracts\Queue\Queue as QueueContract;
66
use Illuminate\Queue\Queue as BaseQueue;
7+
use Interop\Queue\Consumer;
78
use Interop\Queue\Context;
89
use Interop\Amqp\Impl\AmqpMessage;
10+
use Interop\Queue\Message;
911

1012
class Queue extends BaseQueue implements QueueContract
1113
{
@@ -91,15 +93,20 @@ public function pop($queue = null)
9193

9294
$consumer = $this->context->createConsumer($queue);
9395
if ($message = $consumer->receive(1000)) { // 1 sec
94-
return new Job(
95-
$this->container,
96-
$this->context,
97-
$consumer,
98-
$message,
99-
$this->connectionName
100-
);
96+
return $this->convertMessageToJob($message, $consumer);
10197
}
10298
}
99+
100+
public function convertMessageToJob(Message $message, Consumer $consumer): Job
101+
{
102+
return new Job(
103+
$this->container,
104+
$this->context,
105+
$consumer,
106+
$message,
107+
$this->connectionName
108+
);
109+
}
103110

104111
/**
105112
* Get the queue or return the default.

src/Worker.php

+159
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,159 @@
1+
<?php
2+
namespace Enqueue\LaravelQueue;
3+
4+
use Enqueue\Consumption\ChainExtension;
5+
use Enqueue\Consumption\Context\MessageReceived;
6+
use Enqueue\Consumption\Context\PostMessageReceived;
7+
use Enqueue\Consumption\Context\PreConsume;
8+
use Enqueue\Consumption\Context\Start;
9+
use Enqueue\Consumption\Extension\LimitConsumedMessagesExtension;
10+
use Enqueue\Consumption\MessageReceivedExtensionInterface;
11+
use Enqueue\Consumption\PostMessageReceivedExtensionInterface;
12+
use Enqueue\Consumption\PreConsumeExtensionInterface;
13+
use Enqueue\Consumption\QueueConsumer;
14+
use Enqueue\Consumption\Result;
15+
use Enqueue\Consumption\StartExtensionInterface;
16+
use Enqueue\LaravelQueue\Queue;
17+
use Illuminate\Queue\WorkerOptions;
18+
19+
class Worker extends \Illuminate\Queue\Worker implements
20+
StartExtensionInterface,
21+
PreConsumeExtensionInterface,
22+
MessageReceivedExtensionInterface,
23+
PostMessageReceivedExtensionInterface
24+
{
25+
protected $connectionName;
26+
27+
protected $queueNames;
28+
29+
protected $queue;
30+
31+
protected $options;
32+
33+
protected $lastRestart;
34+
35+
protected $interop = false;
36+
37+
protected $stopped = false;
38+
39+
protected $job;
40+
41+
public function daemon($connectionName, $queueNames, WorkerOptions $options)
42+
{
43+
var_dump(123123);
44+
// TODO: sleep when no job
45+
46+
$this->connectionName = $connectionName;
47+
$this->queueNames = $queueNames;
48+
$this->options = $options;
49+
50+
/** @var Queue $queue */
51+
$this->queue = $this->getManager()->connection($connectionName);
52+
$this->interop = $this->queue instanceof Queue;
53+
54+
if (false == $this->interop) {
55+
parent::daemon($connectionName, $this->queue, $options);
56+
}
57+
58+
$context = $this->queue->getQueueInteropContext();
59+
$queueConsumer = new QueueConsumer($context, new ChainExtension([$this]));
60+
foreach (explode(',', $queueNames) as $queueName) {
61+
$queueConsumer->bindCallback($queueName, function() {
62+
$this->runJob($this->job, $this->connectionName, $this->options);
63+
64+
return Result::ALREADY_ACKNOWLEDGED;
65+
});
66+
}
67+
68+
$queueConsumer->consume();
69+
}
70+
71+
public function runNextJob($connectionName, $queueNames, WorkerOptions $options)
72+
{
73+
$this->connectionName = $connectionName;
74+
$this->queueNames = $queueNames;
75+
$this->options = $options;
76+
77+
/** @var Queue $queue */
78+
$this->queue = $this->getManager()->connection($connectionName);
79+
$this->interop = $this->queue instanceof Queue;
80+
81+
if (false == $this->interop) {
82+
parent::daemon($connectionName, $this->queue, $options);
83+
}
84+
85+
$context = $this->queue->getQueueInteropContext();
86+
87+
$queueConsumer = new QueueConsumer($context, new ChainExtension([
88+
$this,
89+
new LimitConsumedMessagesExtension(1),
90+
]));
91+
92+
foreach (explode(',', $queueNames) as $queueName) {
93+
$queueConsumer->bindCallback($queueName, function() {
94+
$this->runJob($this->job, $this->connectionName, $this->options);
95+
96+
return Result::ALREADY_ACKNOWLEDGED;
97+
});
98+
}
99+
100+
$queueConsumer->consume();
101+
}
102+
103+
public function onStart(Start $context): void
104+
{
105+
if ($this->supportsAsyncSignals()) {
106+
$this->listenForSignals();
107+
}
108+
109+
$this->lastRestart = $this->getTimestampOfLastQueueRestart();
110+
111+
if ($this->stopped) {
112+
$context->interruptExecution();
113+
}
114+
}
115+
116+
public function onPreConsume(PreConsume $context): void
117+
{
118+
if (! $this->daemonShouldRun($this->options, $this->connectionName, $this->queueNames)) {
119+
$this->pauseWorker($this->options, $this->lastRestart);
120+
}
121+
122+
if ($this->stopped) {
123+
$context->interruptExecution();
124+
}
125+
}
126+
127+
public function onMessageReceived(MessageReceived $context): void
128+
{
129+
$this->job = $this->queue->convertMessageToJob(
130+
$context->getMessage(),
131+
$context->getConsumer()
132+
);
133+
134+
if ($this->supportsAsyncSignals()) {
135+
$this->registerTimeoutHandler($this->job, $this->options);
136+
}
137+
}
138+
139+
public function onPostMessageReceived(PostMessageReceived $context): void
140+
{
141+
$this->stopIfNecessary($this->options, $this->lastRestart, $this->job);
142+
143+
if ($this->stopped) {
144+
$context->interruptExecution();
145+
}
146+
}
147+
148+
public function stop($status = 0)
149+
{
150+
if ($this->interop) {
151+
$this->stopped = true;
152+
153+
return;
154+
}
155+
156+
parent::stop($status);
157+
}
158+
}
159+

0 commit comments

Comments
 (0)