From 3807138a122141ed8d885379813ecd33e4d526ed Mon Sep 17 00:00:00 2001 From: Jeremy Brenner Date: Wed, 14 Oct 2015 16:51:32 -0500 Subject: [PATCH 1/6] Added job queue support --- .../Mongodb/MongodbServiceProvider.php | 9 + .../Mongodb/Queue/MongodbConnector.php | 45 +++ src/Jenssegers/Mongodb/Queue/MongodbJob.php | 139 ++++++++ src/Jenssegers/Mongodb/Queue/MongodbQueue.php | 333 ++++++++++++++++++ 4 files changed, 526 insertions(+) create mode 100644 src/Jenssegers/Mongodb/Queue/MongodbConnector.php create mode 100644 src/Jenssegers/Mongodb/Queue/MongodbJob.php create mode 100644 src/Jenssegers/Mongodb/Queue/MongodbQueue.php diff --git a/src/Jenssegers/Mongodb/MongodbServiceProvider.php b/src/Jenssegers/Mongodb/MongodbServiceProvider.php index 764fb540f..c5fee36e2 100644 --- a/src/Jenssegers/Mongodb/MongodbServiceProvider.php +++ b/src/Jenssegers/Mongodb/MongodbServiceProvider.php @@ -1,6 +1,7 @@ app->resolving('queue', function($queue) + { + $queue->addConnector('mongodb', function() { + return new MongodbConnector($this->app['db']); + }); + }); + } } diff --git a/src/Jenssegers/Mongodb/Queue/MongodbConnector.php b/src/Jenssegers/Mongodb/Queue/MongodbConnector.php new file mode 100644 index 000000000..a6342bfd7 --- /dev/null +++ b/src/Jenssegers/Mongodb/Queue/MongodbConnector.php @@ -0,0 +1,45 @@ +connections = $connections; + } + + /** + * Establish a queue connection. + * + * @param array $config + * @return \Illuminate\Contracts\Queue\Queue + */ + public function connect(array $config) + { + return new MongodbQueue( + $this->connections->connection(Arr::get($config, 'connection')), + $config['table'], + $config['queue'], + Arr::get($config, 'expire', 60) + ); + } +} diff --git a/src/Jenssegers/Mongodb/Queue/MongodbJob.php b/src/Jenssegers/Mongodb/Queue/MongodbJob.php new file mode 100644 index 000000000..dafa23e17 --- /dev/null +++ b/src/Jenssegers/Mongodb/Queue/MongodbJob.php @@ -0,0 +1,139 @@ +job = $job; + $this->queue = $queue; + $this->database = $database; + $this->container = $container; + $this->job->attempts = $this->job->attempts + 1; + } + + /** + * Fire the job. + * + * @return void + */ + public function fire() + { + $this->resolveAndFire(json_decode($this->job->payload, true)); + } + + /** + * Delete the job from the queue. + * + * @return void + */ + public function delete() + { + parent::delete(); + + $this->database->deleteReserved($this->queue, $this->job->_id); + } + + /** + * Release the job back into the queue. + * + * @param int $delay + * @return void + */ + public function release($delay = 0) + { + parent::release($delay); + + $this->delete(); + + $this->database->release($this->queue, $this->job, $delay); + } + + /** + * Get the number of times the job has been attempted. + * + * @return int + */ + public function attempts() + { + return (int) $this->job->attempts; + } + + /** + * Get the job identifier. + * + * @return string + */ + public function getJobId() + { + return $this->job->id; + } + + /** + * Get the raw body string for the job. + * + * @return string + */ + public function getRawBody() + { + return $this->job->payload; + } + + /** + * Get the IoC container instance. + * + * @return \Illuminate\Container\Container + */ + public function getContainer() + { + return $this->container; + } + + /** + * Get the underlying queue driver instance. + * + * @return \Illuminate\Queue\DatabaseQueue + */ + public function getDatabaseQueue() + { + return $this->database; + } + + /** + * Get the underlying database job. + * + * @return \StdClass + */ + public function getDatabaseJob() + { + return $this->job; + } +} diff --git a/src/Jenssegers/Mongodb/Queue/MongodbQueue.php b/src/Jenssegers/Mongodb/Queue/MongodbQueue.php new file mode 100644 index 000000000..167663786 --- /dev/null +++ b/src/Jenssegers/Mongodb/Queue/MongodbQueue.php @@ -0,0 +1,333 @@ +table = $table; + $this->expire = $expire; + $this->default = $default; + $this->database = $database; + } + + /** + * Push a new job onto the queue. + * + * @param string $job + * @param mixed $data + * @param string $queue + * @return void + */ + public function push($job, $data = '', $queue = null) + { + return $this->pushToDatabase(0, $queue, $this->createPayload($job, $data)); + } + + /** + * Push a raw payload onto the queue. + * + * @param string $payload + * @param string $queue + * @param array $options + * @return mixed + */ + public function pushRaw($payload, $queue = null, array $options = []) + { + return $this->pushToDatabase(0, $queue, $payload); + } + + /** + * Push a new job onto the queue after a delay. + * + * @param \DateTime|int $delay + * @param string $job + * @param mixed $data + * @param string $queue + * @return void + */ + public function later($delay, $job, $data = '', $queue = null) + { + return $this->pushToDatabase($delay, $queue, $this->createPayload($job, $data)); + } + + /** + * Push an array of jobs onto the queue. + * + * @param array $jobs + * @param mixed $data + * @param string $queue + * @return mixed + */ + public function bulk($jobs, $data = '', $queue = null) + { + $queue = $this->getQueue($queue); + + $availableAt = $this->getAvailableAt(0); + + $records = array_map(function ($job) use ($queue, $data, $availableAt) { + return $this->buildDatabaseRecord( + $queue, $this->createPayload($job, $data), $availableAt + ); + }, (array) $jobs); + + return $this->database->table($this->table)->insert($records); + } + + /** + * Release a reserved job back onto the queue. + * + * @param string $queue + * @param \StdClass $job + * @param int $delay + * @return void + */ + public function release($queue, $job, $delay) + { + return $this->pushToDatabase($delay, $queue, $job->payload, $job->attempts); + } + + /** + * Push a raw payload to the database with a given delay. + * + * @param \DateTime|int $delay + * @param string|null $queue + * @param string $payload + * @param int $attempts + * @return mixed + */ + protected function pushToDatabase($delay, $queue, $payload, $attempts = 0) + { + $attributes = $this->buildDatabaseRecord( + $this->getQueue($queue), $payload, $this->getAvailableAt($delay), $attempts + ); + + return $this->database->table($this->table)->insertGetId($attributes); + } + + /** + * Pop the next job off of the queue. + * + * @param string $queue + * @return \Illuminate\Contracts\Queue\Job|null + */ + public function pop($queue = null) + { + $queue = $this->getQueue($queue); + + if (! is_null($this->expire)) { + $this->releaseJobsThatHaveBeenReservedTooLong($queue); + } + + if ($job = $this->getNextAvailableJob($queue)) { + $this->markJobAsReserved($job->_id); + + return new MongodbJob( + $this->container, $this, $job, $queue + ); + } + + $this->database->commit(); + } + + /** + * Release the jobs that have been reserved for too long. + * + * @param string $queue + * @return void + */ + protected function releaseJobsThatHaveBeenReservedTooLong($queue) + { + $expired = Carbon::now()->subSeconds($this->expire)->getTimestamp(); + $reserved = $this->database->collection($this->table) + ->where('queue', $this->getQueue($queue)) + ->where('reserved', 1) + ->where('reserved_at', '<=', $expired)->get(); + + foreach ($reserved as $job) { + $attempts = $job['attempts'] + 1; + $this->releaseJob($job['_id'], $attempts); + } + } + + /** + * Get the next available job for the queue. + * + * @param string|null $queue + * @return \StdClass|null + */ + protected function getNextAvailableJob($queue) + { + $job = $this->database->table($this->table) + ->lockForUpdate() + ->where('queue', $this->getQueue($queue)) + ->where('reserved', 0) + ->where('available_at', '<=', $this->getTime()) + ->orderBy('_id', 'asc') + ->first(); + + return $job ? (object) $job : null; + } + + /** + * Mark the given job ID as reserved. + * + * @param string $id + * @return void + */ + protected function markJobAsReserved($id) + { + $this->database->table($this->table)->where('_id', $id)->update([ + 'reserved' => 1, 'reserved_at' => $this->getTime(), + ]); + } + + /** + * Release the given job ID from reservation. + * + * @param string $id + * @return void + */ + protected function releaseJob($id, $attempts) + { + $this->database->table($this->table)->where('_id', $id)->update([ + 'reserved' => 0, + 'reserved_at' => null, + 'attempts' => $attempts, + ]); + } + + /** + * Delete a reserved job from the queue. + * + * @param string $queue + * @param string $id + * @return void + */ + public function deleteReserved($queue, $id) + { + $this->database->table($this->table)->where('_id', $id)->delete(); + } + + /** + * Get the "available at" UNIX timestamp. + * + * @param \DateTime|int $delay + * @return int + */ + protected function getAvailableAt($delay) + { + $availableAt = $delay instanceof DateTime ? $delay : Carbon::now()->addSeconds($delay); + + return $availableAt->getTimestamp(); + } + + /** + * Create an array to insert for the given job. + * + * @param string|null $queue + * @param string $payload + * @param int $availableAt + * @param int $attempts + * @return array + */ + protected function buildDatabaseRecord($queue, $payload, $availableAt, $attempts = 0) + { + return [ + 'queue' => $queue, + 'payload' => $payload, + 'attempts' => $attempts, + 'reserved' => 0, + 'reserved_at' => null, + 'available_at' => $availableAt, + 'created_at' => $this->getTime(), + ]; + } + + /** + * Get the queue or return the default. + * + * @param string|null $queue + * @return string + */ + protected function getQueue($queue) + { + return $queue ?: $this->default; + } + + /** + * Get the underlying database instance. + * + * @return \Illuminate\Database\Connection + */ + public function getDatabase() + { + return $this->database; + } + + /** + * Get the expiration time in seconds. + * + * @return int|null + */ + public function getExpire() + { + return $this->expire; + } + + /** + * Set the expiration time in seconds. + * + * @param int|null $seconds + * @return void + */ + public function setExpire($seconds) + { + $this->expire = $seconds; + } +} From 25cc131b2117ccc4e6c0598cdf7920e3bea16f46 Mon Sep 17 00:00:00 2001 From: Jeremy Brenner Date: Wed, 14 Oct 2015 17:15:07 -0500 Subject: [PATCH 2/6] namespacing --- src/Jenssegers/Mongodb/Queue/MongodbJob.php | 1 + 1 file changed, 1 insertion(+) diff --git a/src/Jenssegers/Mongodb/Queue/MongodbJob.php b/src/Jenssegers/Mongodb/Queue/MongodbJob.php index dafa23e17..a6af4514a 100644 --- a/src/Jenssegers/Mongodb/Queue/MongodbJob.php +++ b/src/Jenssegers/Mongodb/Queue/MongodbJob.php @@ -4,6 +4,7 @@ use Jenssegers\Mongodb\Queue\MongodbQueue; use Illuminate\Container\Container; +use Illuminate\Queue\Jobs\Job; use Illuminate\Contracts\Queue\Job as JobContract; class MongodbJob extends Job implements JobContract From 709af66cf0dd72c0b805d70cc8cad4b8ac8e7b01 Mon Sep 17 00:00:00 2001 From: Jeremy Brenner Date: Thu, 15 Oct 2015 10:52:59 -0500 Subject: [PATCH 3/6] Moved Queue init to it's own service provider --- .../Mongodb/MongodbQueueServiceProvider.php | 29 +++++++++++++++++++ .../Mongodb/MongodbServiceProvider.php | 8 ----- 2 files changed, 29 insertions(+), 8 deletions(-) create mode 100644 src/Jenssegers/Mongodb/MongodbQueueServiceProvider.php diff --git a/src/Jenssegers/Mongodb/MongodbQueueServiceProvider.php b/src/Jenssegers/Mongodb/MongodbQueueServiceProvider.php new file mode 100644 index 000000000..e5420b39a --- /dev/null +++ b/src/Jenssegers/Mongodb/MongodbQueueServiceProvider.php @@ -0,0 +1,29 @@ +app->resolving('queue', function($queue) + { + $queue->extend('mongodb', function() { + return new MongodbConnector($this->app['db']); + }); + }); + } + +} diff --git a/src/Jenssegers/Mongodb/MongodbServiceProvider.php b/src/Jenssegers/Mongodb/MongodbServiceProvider.php index c5fee36e2..5db378e81 100644 --- a/src/Jenssegers/Mongodb/MongodbServiceProvider.php +++ b/src/Jenssegers/Mongodb/MongodbServiceProvider.php @@ -1,7 +1,6 @@ app->resolving('queue', function($queue) - { - $queue->addConnector('mongodb', function() { - return new MongodbConnector($this->app['db']); - }); - }); - } } From 62469c884f60a6e47d2d200bafb41788d2a3847f Mon Sep 17 00:00:00 2001 From: Jeremy Brenner Date: Thu, 15 Oct 2015 11:08:12 -0500 Subject: [PATCH 4/6] Update README.md --- README.md | 18 ++++++++++++++++++ 1 file changed, 18 insertions(+) diff --git a/README.md b/README.md index d90fdb00a..8302bd0aa 100644 --- a/README.md +++ b/README.md @@ -42,6 +42,11 @@ And add the service provider in `config/app.php`: Jenssegers\Mongodb\MongodbServiceProvider::class, ``` +For job queue support add the optional: +```php +Jenssegers\Mongodb\MongodbQueueServiceProvider::class, +``` + For usage with [Lumen](http://lumen.laravel.com), add the service provider in `bootstrap/app.php`. In this file, you will also need to enable Eloquent. You must however ensure that your call to `$app->withEloquent();` is **below** where you have registered the `MongodbServiceProvider`: ```php @@ -100,6 +105,18 @@ You can connect to multiple servers or replica sets with the following configura ), ``` +For job queue support add a mongodb connection in config/queue.php: +``` + 'mongodb' => [ + 'driver' => 'mongodb', + 'table' => 'jobs', + 'queue' => 'default', + 'expire' => 60, + ], +``` + +You also need to set your QUEUE_DRIVER to 'mongodb' in your environment or .env + Eloquent -------- @@ -917,3 +934,4 @@ DB::connection()->disableQueryLog(); ``` *From: http://laravel.com/docs/database#query-logging* + From 5a2a7cbf05eaed56deb86f6c57c0f88a714a7f80 Mon Sep 17 00:00:00 2001 From: Jeremy Brenner Date: Thu, 15 Oct 2015 11:12:20 -0500 Subject: [PATCH 5/6] style compatibility --- src/Jenssegers/Mongodb/MongodbQueueServiceProvider.php | 4 ++-- src/Jenssegers/Mongodb/Queue/MongodbConnector.php | 1 - src/Jenssegers/Mongodb/Queue/MongodbJob.php | 1 - src/Jenssegers/Mongodb/Queue/MongodbQueue.php | 1 - 4 files changed, 2 insertions(+), 5 deletions(-) diff --git a/src/Jenssegers/Mongodb/MongodbQueueServiceProvider.php b/src/Jenssegers/Mongodb/MongodbQueueServiceProvider.php index e5420b39a..003122d3a 100644 --- a/src/Jenssegers/Mongodb/MongodbQueueServiceProvider.php +++ b/src/Jenssegers/Mongodb/MongodbQueueServiceProvider.php @@ -18,9 +18,9 @@ public function boot() */ public function register() { - $this->app->resolving('queue', function($queue) + $this->app->resolving('queue', function ($queue) { - $queue->extend('mongodb', function() { + $queue->extend('mongodb', function () { return new MongodbConnector($this->app['db']); }); }); diff --git a/src/Jenssegers/Mongodb/Queue/MongodbConnector.php b/src/Jenssegers/Mongodb/Queue/MongodbConnector.php index a6342bfd7..fd4b719ff 100644 --- a/src/Jenssegers/Mongodb/Queue/MongodbConnector.php +++ b/src/Jenssegers/Mongodb/Queue/MongodbConnector.php @@ -3,7 +3,6 @@ namespace Jenssegers\Mongodb\Queue; use Illuminate\Support\Arr; -use Jenssegers\Mongodb\Queue\MongodbQueue; use Illuminate\Database\ConnectionResolverInterface; use Illuminate\Queue\Connectors\ConnectorInterface; diff --git a/src/Jenssegers/Mongodb/Queue/MongodbJob.php b/src/Jenssegers/Mongodb/Queue/MongodbJob.php index a6af4514a..a4da53ada 100644 --- a/src/Jenssegers/Mongodb/Queue/MongodbJob.php +++ b/src/Jenssegers/Mongodb/Queue/MongodbJob.php @@ -2,7 +2,6 @@ namespace Jenssegers\Mongodb\Queue; -use Jenssegers\Mongodb\Queue\MongodbQueue; use Illuminate\Container\Container; use Illuminate\Queue\Jobs\Job; use Illuminate\Contracts\Queue\Job as JobContract; diff --git a/src/Jenssegers/Mongodb/Queue/MongodbQueue.php b/src/Jenssegers/Mongodb/Queue/MongodbQueue.php index 167663786..a5683fc50 100644 --- a/src/Jenssegers/Mongodb/Queue/MongodbQueue.php +++ b/src/Jenssegers/Mongodb/Queue/MongodbQueue.php @@ -5,7 +5,6 @@ use DateTime; use Carbon\Carbon; use Illuminate\Database\Connection; -use Jenssegers\Mongodb\Queue\MongodbJob; use Illuminate\Queue\Queue; use Illuminate\Contracts\Queue\Queue as QueueContract; From 733d283175f9507c2ece71b6ed4ea4d30c6b53db Mon Sep 17 00:00:00 2001 From: Jeremy Brenner Date: Thu, 15 Oct 2015 23:12:50 -0500 Subject: [PATCH 6/6] Changed to overwrite database driver since mongo driver has done the same, no need for extra configuration --- README.md | 13 +------------ .../Mongodb/MongodbQueueServiceProvider.php | 2 +- 2 files changed, 2 insertions(+), 13 deletions(-) diff --git a/README.md b/README.md index 8302bd0aa..50f2d639a 100644 --- a/README.md +++ b/README.md @@ -105,17 +105,7 @@ You can connect to multiple servers or replica sets with the following configura ), ``` -For job queue support add a mongodb connection in config/queue.php: -``` - 'mongodb' => [ - 'driver' => 'mongodb', - 'table' => 'jobs', - 'queue' => 'default', - 'expire' => 60, - ], -``` - -You also need to set your QUEUE_DRIVER to 'mongodb' in your environment or .env +You also need to set your QUEUE_DRIVER to 'database' in your environment or .env Eloquent -------- @@ -934,4 +924,3 @@ DB::connection()->disableQueryLog(); ``` *From: http://laravel.com/docs/database#query-logging* - diff --git a/src/Jenssegers/Mongodb/MongodbQueueServiceProvider.php b/src/Jenssegers/Mongodb/MongodbQueueServiceProvider.php index 003122d3a..112d6f426 100644 --- a/src/Jenssegers/Mongodb/MongodbQueueServiceProvider.php +++ b/src/Jenssegers/Mongodb/MongodbQueueServiceProvider.php @@ -20,7 +20,7 @@ public function register() { $this->app->resolving('queue', function ($queue) { - $queue->extend('mongodb', function () { + $queue->extend('database', function () { return new MongodbConnector($this->app['db']); }); });