Skip to content

Commit 821ff2a

Browse files
authored
Merge pull request #9 from php-enqueue/0.9
0.9 version
2 parents 5906753 + 9b01008 commit 821ff2a

17 files changed

+240
-285
lines changed

composer.json

+10-9
Original file line numberDiff line numberDiff line change
@@ -5,17 +5,18 @@
55
"keywords": ["messaging", "queue", "laravel"],
66
"license": "MIT",
77
"require": {
8-
"php": ">=5.6",
9-
"illuminate/queue": "^5.4",
10-
"queue-interop/queue-interop": "^0.6",
11-
"queue-interop/amqp-interop": "^0.7",
12-
"enqueue/amqp-tools": "^0.8"
8+
"php": ">=7.1",
9+
"illuminate/queue": "^5.6",
10+
"queue-interop/amqp-interop": "0.8.x-dev",
11+
"queue-interop/queue-interop": "0.7.x-dev",
12+
"enqueue/enqueue": "0.9.x-dev",
13+
"enqueue/dsn": "0.9.x-dev"
1314
},
1415
"require-dev": {
1516
"phpunit/phpunit": "~5.5",
16-
"enqueue/enqueue": "^0.8@dev",
17-
"enqueue/null": "^0.8@dev",
18-
"enqueue/test": "^0.8@dev"
17+
"enqueue/enqueue": "0.9.x-dev",
18+
"enqueue/null": "0.9.x-dev",
19+
"enqueue/test": "0.9.x-dev"
1920
},
2021
"autoload": {
2122
"psr-4": { "Enqueue\\LaravelQueue\\": "src/" },
@@ -33,7 +34,7 @@
3334
]
3435
},
3536
"branch-alias": {
36-
"dev-master": "0.8.x-dev"
37+
"dev-master": "0.9.x-dev"
3738
}
3839
}
3940
}

src/AmqpConnector.php

-39
This file was deleted.

src/AmqpQueue.php

+7-7
Original file line numberDiff line numberDiff line change
@@ -5,18 +5,18 @@
55
use Interop\Amqp\AmqpContext;
66

77
/**
8-
* @method AmqpContext getPsrContext()
8+
* @method AmqpContext getQueueInteropContext()
99
*/
1010
class AmqpQueue extends Queue
1111
{
1212
/**
1313
* {@inheritdoc}
1414
*
15-
* @param AmqpContext $psrContext
15+
* @param AmqpContext $amqpContext
1616
*/
17-
public function __construct(AmqpContext $psrContext, $queueName, $timeToRun)
17+
public function __construct(AmqpContext $amqpContext, $queueName, $timeToRun)
1818
{
19-
parent::__construct($psrContext, $queueName, $timeToRun);
19+
parent::__construct($amqpContext, $queueName, $timeToRun);
2020
}
2121

2222
/**
@@ -54,9 +54,9 @@ public function pop($queue = null)
5454
*/
5555
protected function declareQueue($queue = null)
5656
{
57-
$psrQueue = $this->getQueue($queue);
58-
$psrQueue->addFlag(\Interop\Amqp\AmqpQueue::FLAG_DURABLE);
57+
$interopQueue = $this->getQueue($queue);
58+
$interopQueue->addFlag(\Interop\Amqp\AmqpQueue::FLAG_DURABLE);
5959

60-
$this->getPsrContext()->declareQueue($psrQueue);
60+
$this->getQueueInteropContext()->declareQueue($queue);
6161
}
6262
}

src/Command/ConsumeCommand.php

+19
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,19 @@
1+
<?php
2+
namespace Enqueue\LaravelQueue\Command;
3+
4+
use Enqueue\Container\Container;
5+
use Enqueue\SimpleClient\SimpleClient;
6+
7+
class ConsumeCommand extends \Enqueue\Symfony\Client\ConsumeCommand
8+
{
9+
public function __construct(SimpleClient $client)
10+
{
11+
$container = new Container([
12+
'queue_consumer' => $client->getQueueConsumer(),
13+
'driver' => $client->getDriver(),
14+
'processor' => $client->getDelegateProcessor()
15+
]);
16+
17+
parent::__construct($container, 'queue_consumer', 'driver', 'processor');
18+
}
19+
}

src/Command/ConsumeMessagesCommand.php

-17
This file was deleted.

src/Command/ProduceCommand.php

+17
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,17 @@
1+
<?php
2+
namespace Enqueue\LaravelQueue\Command;
3+
4+
use Enqueue\Container\Container;
5+
use Enqueue\SimpleClient\SimpleClient;
6+
7+
class ProduceCommand extends \Enqueue\Symfony\Client\ProduceCommand
8+
{
9+
public function __construct(SimpleClient $client)
10+
{
11+
$container = new Container([
12+
'producer' => $client->getProducer(),
13+
]);
14+
15+
parent::__construct($container, 'producer');
16+
}
17+
}

src/Command/ProduceMessageCommand.php

-12
This file was deleted.

src/Command/QueuesCommand.php

-12
This file was deleted.

src/Command/RoutesCommand.php

+17
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,17 @@
1+
<?php
2+
namespace Enqueue\LaravelQueue\Command;
3+
4+
use Enqueue\Container\Container;
5+
use Enqueue\SimpleClient\SimpleClient;
6+
7+
class RoutesCommand extends \Enqueue\Symfony\Client\RoutesCommand
8+
{
9+
public function __construct(SimpleClient $client)
10+
{
11+
$container = new Container([
12+
'driver' => $client->getDriver(),
13+
]);
14+
15+
parent::__construct($container, 'driver');
16+
}
17+
}

src/Command/SetupBrokerCommand.php

+6-1
Original file line numberDiff line numberDiff line change
@@ -2,12 +2,17 @@
22
namespace Enqueue\LaravelQueue\Command;
33

44

5+
use Enqueue\Container\Container;
56
use Enqueue\SimpleClient\SimpleClient;
67

78
class SetupBrokerCommand extends \Enqueue\Symfony\Client\SetupBrokerCommand
89
{
910
public function __construct(SimpleClient $client)
1011
{
11-
parent::__construct($client->getDriver());
12+
$container = new Container([
13+
'driver' => $client->getDriver(),
14+
]);
15+
16+
parent::__construct($container, 'driver');
1217
}
1318
}

src/Command/TopicsCommand.php

-12
This file was deleted.

src/Connector.php

+35-20
Original file line numberDiff line numberDiff line change
@@ -2,39 +2,54 @@
22

33
namespace Enqueue\LaravelQueue;
44

5+
use Enqueue\AmqpTools\DelayStrategy;
6+
use Enqueue\AmqpTools\DelayStrategyAware;
7+
use Enqueue\AmqpTools\RabbitMqDelayPluginDelayStrategy;
8+
use Enqueue\AmqpTools\RabbitMqDlxDelayStrategy;
9+
use Enqueue\ConnectionFactoryFactory;
10+
use Enqueue\ConnectionFactoryFactoryInterface;
11+
use Illuminate\Contracts\Queue\Queue as QueueContract;
512
use Illuminate\Queue\Connectors\ConnectorInterface;
6-
use Interop\Queue\PsrConnectionFactory;
13+
use Interop\Amqp\AmqpContext;
714

815
class Connector implements ConnectorInterface
916
{
10-
/**
11-
* {@inheritdoc}
12-
*/
13-
public function connect(array $config)
17+
public function connect(array $config): QueueContract
1418
{
1519
$config = array_replace([
16-
'connection_factory_class' => null,
20+
'dsn' => null,
21+
'factory_class' => null,
1722
'queue' => 'default',
1823
'time_to_run' => 0,
1924
], $config);
2025

21-
if (empty($config['connection_factory_class'])) {
22-
throw new \LogicException('The "connection_factory_class" option is required');
23-
}
26+
$queue = $config['queue'];
27+
$timeToRum = $config['time_to_run'];
28+
$connectionFactoryFactoryClass = $config['factory_class'] ?? ConnectionFactoryFactory::class;
2429

25-
$factoryClass = $config['connection_factory_class'];
26-
if (false == class_exists($factoryClass)) {
27-
throw new \LogicException(sprintf('The "connection_factory_class" option "%s" is not a class', $factoryClass));
28-
}
30+
unset($config['factory_class']);
2931

30-
$rc = new \ReflectionClass($factoryClass);
31-
if (false == $rc->implementsInterface(PsrConnectionFactory::class)) {
32-
throw new \LogicException(sprintf('The "connection_factory_class" option must contain a class that implements "%s" but it is not', PsrConnectionFactory::class));
33-
}
32+
/** @var ConnectionFactoryFactoryInterface $factory */
33+
$factory = new $connectionFactoryFactoryClass();
34+
$connection = $factory->create($config);
35+
$context = $connection->createContext();
3436

35-
/** @var PsrConnectionFactory $factory */
36-
$factory = new $factoryClass($config);
37+
if ($context instanceof AmqpContext) {
38+
$config = array_replace(['delay_strategy' => 'rabbitmq_dlx'], $config);
39+
40+
if ($context instanceof DelayStrategyAware && 'rabbitmq_dlx' == $config['delay_strategy']) {
41+
$context->setDelayStrategy(new RabbitMqDlxDelayStrategy());
42+
}
43+
if ($context instanceof DelayStrategyAware && 'rabbitmq_delay_plugin' == $config['delay_strategy']) {
44+
$context->setDelayStrategy(new RabbitMqDelayPluginDelayStrategy());
45+
}
46+
if ($context instanceof DelayStrategyAware && $config['delay_strategy'] instanceof DelayStrategy) {
47+
$context->setDelayStrategy($config['delay_strategy']);
48+
}
49+
50+
return new AmqpQueue($context, $queue, $timeToRum);
51+
}
3752

38-
return new Queue($factory->createContext(), $config['queue'], $config['time_to_run']);
53+
return new Queue($context, $queue, $timeToRum);
3954
}
4055
}

src/EnqueueServiceProvider.php

+6-14
Original file line numberDiff line numberDiff line change
@@ -2,28 +2,21 @@
22

33
namespace Enqueue\LaravelQueue;
44

5-
use Enqueue\LaravelQueue\Command\ConsumeMessagesCommand;
6-
use Enqueue\LaravelQueue\Command\ProduceMessageCommand;
7-
use Enqueue\LaravelQueue\Command\QueuesCommand;
5+
use Enqueue\LaravelQueue\Command\ConsumeCommand;
6+
use Enqueue\LaravelQueue\Command\ProduceCommand;
7+
use Enqueue\LaravelQueue\Command\RoutesCommand;
88
use Enqueue\LaravelQueue\Command\SetupBrokerCommand;
9-
use Enqueue\LaravelQueue\Command\TopicsCommand;
109
use Enqueue\SimpleClient\SimpleClient;
1110
use Illuminate\Queue\QueueManager;
1211
use Illuminate\Support\ServiceProvider;
1312

1413
class EnqueueServiceProvider extends ServiceProvider
1514
{
16-
/**
17-
* {@inheritdoc}
18-
*/
1915
public function boot()
2016
{
2117
$this->bootInteropQueueDriver();
2218
}
2319

24-
/**
25-
* {@inheritdoc}
26-
*/
2720
public function register()
2821
{
2922
$this->registerClient();
@@ -49,10 +42,9 @@ private function registerClient()
4942
if ($this->app->runningInConsole()) {
5043
$this->commands([
5144
SetupBrokerCommand::class,
52-
ProduceMessageCommand::class,
53-
QueuesCommand::class,
54-
TopicsCommand::class,
55-
ConsumeMessagesCommand::class,
45+
ProduceCommand::class,
46+
RoutesCommand::class,
47+
ConsumeCommand::class,
5648
]);
5749
}
5850
}

0 commit comments

Comments
 (0)