Skip to content

Add job queue support for mongodb. #623

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Closed
wants to merge 9 commits into from
7 changes: 7 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,11 @@ And add the service provider in `config/app.php`:
Jenssegers\Mongodb\MongodbServiceProvider::class,
```

For job queue support add the optional:
```php
Jenssegers\Mongodb\MongodbQueueServiceProvider::class,
```

For usage with [Lumen](http://lumen.laravel.com), add the service provider in `bootstrap/app.php`. In this file, you will also need to enable Eloquent. You must however ensure that your call to `$app->withEloquent();` is **below** where you have registered the `MongodbServiceProvider`:

```php
Expand Down Expand Up @@ -100,6 +105,8 @@ You can connect to multiple servers or replica sets with the following configura
),
```

You also need to set your QUEUE_DRIVER to 'database' in your environment or .env

Eloquent
--------

Expand Down
29 changes: 29 additions & 0 deletions src/Jenssegers/Mongodb/MongodbQueueServiceProvider.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
<?php namespace Jenssegers\Mongodb;

use Illuminate\Support\ServiceProvider;
use Jenssegers\Mongodb\Queue\MongodbConnector;

class MongodbQueueServiceProvider extends ServiceProvider {

/**
* Bootstrap the application events.
*/
public function boot()
{

}

/**
* Register the service provider.
*/
public function register()
{
$this->app->resolving('queue', function ($queue)
{
$queue->extend('database', function () {
return new MongodbConnector($this->app['db']);
});
});
}

}
1 change: 1 addition & 0 deletions src/Jenssegers/Mongodb/MongodbServiceProvider.php
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ public function register()
return new Connection($config);
});
});

}

}
44 changes: 44 additions & 0 deletions src/Jenssegers/Mongodb/Queue/MongodbConnector.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
<?php

namespace Jenssegers\Mongodb\Queue;

use Illuminate\Support\Arr;
use Illuminate\Database\ConnectionResolverInterface;
use Illuminate\Queue\Connectors\ConnectorInterface;

class MongodbConnector implements ConnectorInterface
{
/**
* Database connections.
*
* @var \Illuminate\Database\ConnectionResolverInterface
*/
protected $connections;

/**
* Create a new connector instance.
*
* @param \Illuminate\Database\ConnectionResolverInterface $connections
* @return void
*/
public function __construct(ConnectionResolverInterface $connections)
{
$this->connections = $connections;
}

/**
* Establish a queue connection.
*
* @param array $config
* @return \Illuminate\Contracts\Queue\Queue
*/
public function connect(array $config)
{
return new MongodbQueue(
$this->connections->connection(Arr::get($config, 'connection')),
$config['table'],
$config['queue'],
Arr::get($config, 'expire', 60)
);
}
}
139 changes: 139 additions & 0 deletions src/Jenssegers/Mongodb/Queue/MongodbJob.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,139 @@
<?php

namespace Jenssegers\Mongodb\Queue;

use Illuminate\Container\Container;
use Illuminate\Queue\Jobs\Job;
use Illuminate\Contracts\Queue\Job as JobContract;

class MongodbJob extends Job implements JobContract
{
/**
* The database queue instance.
*
* @var \Illuminate\Queue\DatabaseQueue
*/
protected $database;

/**
* The database job payload.
*
* @var \StdClass
*/
protected $job;

/**
* Create a new job instance.
*
* @param \Illuminate\Container\Container $container
* @param \Illuminate\Queue\DatabaseQueue $database
* @param \StdClass $job
* @param string $queue
* @return void
*/
public function __construct(Container $container, MongodbQueue $database, $job, $queue)
{
$this->job = $job;
$this->queue = $queue;
$this->database = $database;
$this->container = $container;
$this->job->attempts = $this->job->attempts + 1;
}

/**
* Fire the job.
*
* @return void
*/
public function fire()
{
$this->resolveAndFire(json_decode($this->job->payload, true));
}

/**
* Delete the job from the queue.
*
* @return void
*/
public function delete()
{
parent::delete();

$this->database->deleteReserved($this->queue, $this->job->_id);
}

/**
* Release the job back into the queue.
*
* @param int $delay
* @return void
*/
public function release($delay = 0)
{
parent::release($delay);

$this->delete();

$this->database->release($this->queue, $this->job, $delay);
}

/**
* Get the number of times the job has been attempted.
*
* @return int
*/
public function attempts()
{
return (int) $this->job->attempts;
}

/**
* Get the job identifier.
*
* @return string
*/
public function getJobId()
{
return $this->job->id;
}

/**
* Get the raw body string for the job.
*
* @return string
*/
public function getRawBody()
{
return $this->job->payload;
}

/**
* Get the IoC container instance.
*
* @return \Illuminate\Container\Container
*/
public function getContainer()
{
return $this->container;
}

/**
* Get the underlying queue driver instance.
*
* @return \Illuminate\Queue\DatabaseQueue
*/
public function getDatabaseQueue()
{
return $this->database;
}

/**
* Get the underlying database job.
*
* @return \StdClass
*/
public function getDatabaseJob()
{
return $this->job;
}
}
Loading