Skip to content

Commit 77fcce5

Browse files
committed
Init commit.
0 parents  commit 77fcce5

36 files changed

+4755
-0
lines changed

.gitignore

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,5 @@
1+
*~
2+
/composer.lock
3+
/phpunit.xml
4+
/vendor/
5+
/.idea/

.travis.yml

Lines changed: 21 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,21 @@
1+
sudo: false
2+
3+
git:
4+
depth: 1
5+
6+
language: php
7+
8+
php:
9+
- '5.6'
10+
- '7.0'
11+
12+
cache:
13+
directories:
14+
- $HOME/.composer/cache
15+
16+
install:
17+
- composer self-update
18+
- composer install --prefer-source
19+
20+
script:
21+
- vendor/bin/phpunit --exclude-group=functional

CalculateRootJobStatusProcessor.php

Lines changed: 91 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,91 @@
1+
<?php
2+
namespace Enqueue\JobQueue;
3+
4+
use Enqueue\Psr\Context;
5+
use Enqueue\Psr\Message;
6+
use Enqueue\Client\MessageProducerInterface;
7+
use Enqueue\Client\TopicSubscriberInterface;
8+
use Enqueue\Consumption\MessageProcessorInterface;
9+
use Enqueue\Consumption\Result;
10+
use Enqueue\Util\JSON;
11+
use Psr\Log\LoggerInterface;
12+
13+
class CalculateRootJobStatusProcessor implements MessageProcessorInterface, TopicSubscriberInterface
14+
{
15+
/**
16+
* @var JobStorage
17+
*/
18+
private $jobStorage;
19+
20+
/**
21+
* @var CalculateRootJobStatusService
22+
*/
23+
private $calculateRootJobStatusService;
24+
25+
/**
26+
* @var MessageProducerInterface
27+
*/
28+
private $producer;
29+
30+
/**
31+
* @var LoggerInterface
32+
*/
33+
private $logger;
34+
35+
/**
36+
* @param JobStorage $jobStorage
37+
* @param CalculateRootJobStatusService $calculateRootJobStatusCase
38+
* @param MessageProducerInterface $producer
39+
* @param LoggerInterface $logger
40+
*/
41+
public function __construct(
42+
JobStorage $jobStorage,
43+
CalculateRootJobStatusService $calculateRootJobStatusCase,
44+
MessageProducerInterface $producer,
45+
LoggerInterface $logger
46+
) {
47+
$this->jobStorage = $jobStorage;
48+
$this->calculateRootJobStatusService = $calculateRootJobStatusCase;
49+
$this->producer = $producer;
50+
$this->logger = $logger;
51+
}
52+
53+
/**
54+
* {@inheritdoc}
55+
*/
56+
public function process(Message $message, Context $context)
57+
{
58+
$data = JSON::decode($message->getBody());
59+
60+
if (!isset($data['jobId'])) {
61+
$this->logger->critical(sprintf('Got invalid message. body: "%s"', $message->getBody()));
62+
63+
return Result::REJECT;
64+
}
65+
66+
$job = $this->jobStorage->findJobById($data['jobId']);
67+
if (!$job) {
68+
$this->logger->critical(sprintf('Job was not found. id: "%s"', $data['jobId']));
69+
70+
return Result::REJECT;
71+
}
72+
73+
$isRootJobStopped = $this->calculateRootJobStatusService->calculate($job);
74+
75+
if ($isRootJobStopped) {
76+
$this->producer->send(Topics::ROOT_JOB_STOPPED, [
77+
'jobId' => $job->getRootJob()->getId(),
78+
]);
79+
}
80+
81+
return Result::ACK;
82+
}
83+
84+
/**
85+
* {@inheritdoc}
86+
*/
87+
public static function getSubscribedTopics()
88+
{
89+
return [Topics::CALCULATE_ROOT_JOB_STATUS];
90+
}
91+
}

CalculateRootJobStatusService.php

Lines changed: 129 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,129 @@
1+
<?php
2+
namespace Enqueue\JobQueue;
3+
4+
use Doctrine\Common\Collections\Collection;
5+
6+
class CalculateRootJobStatusService
7+
{
8+
/**
9+
* @var JobStorage
10+
*/
11+
private $jobStorage;
12+
13+
/**
14+
* @param JobStorage $jobStorage
15+
*/
16+
public function __construct(JobStorage $jobStorage)
17+
{
18+
$this->jobStorage = $jobStorage;
19+
}
20+
21+
/**
22+
* @param Job $job
23+
*
24+
* @return bool true if root job was stopped
25+
*/
26+
public function calculate(Job $job)
27+
{
28+
$rootJob = $job->isRoot() ? $job : $job->getRootJob();
29+
$stopStatuses = [Job::STATUS_SUCCESS, Job::STATUS_FAILED, Job::STATUS_CANCELLED];
30+
31+
if (in_array($rootJob->getStatus(), $stopStatuses)) {
32+
return;
33+
}
34+
35+
$rootStopped = false;
36+
$this->jobStorage->saveJob($rootJob, function (Job $rootJob) use ($stopStatuses, &$rootStopped) {
37+
if (in_array($rootJob->getStatus(), $stopStatuses)) {
38+
return;
39+
}
40+
41+
$childJobs = $rootJob->getChildJobs();
42+
if ($childJobs instanceof Collection) {
43+
$childJobs = $childJobs->toArray();
44+
}
45+
46+
$status = $this->calculateRootJobStatus($childJobs);
47+
48+
$rootJob->setStatus($status);
49+
50+
if (in_array($status, $stopStatuses)) {
51+
$rootStopped = true;
52+
if (!$rootJob->getStoppedAt()) {
53+
$rootJob->setStoppedAt(new \DateTime());
54+
}
55+
}
56+
});
57+
58+
return $rootStopped;
59+
}
60+
61+
/**
62+
* @param Job[] $jobs
63+
*
64+
* @return string
65+
*/
66+
protected function calculateRootJobStatus(array $jobs)
67+
{
68+
$new = 0;
69+
$running = 0;
70+
$cancelled = 0;
71+
$failed = 0;
72+
$success = 0;
73+
74+
foreach ($jobs as $job) {
75+
switch ($job->getStatus()) {
76+
case Job::STATUS_NEW:
77+
$new++;
78+
break;
79+
case Job::STATUS_RUNNING:
80+
$running++;
81+
break;
82+
case Job::STATUS_CANCELLED:
83+
$cancelled++;
84+
break;
85+
case Job::STATUS_FAILED:
86+
$failed++;
87+
break;
88+
case Job::STATUS_SUCCESS:
89+
$success++;
90+
break;
91+
default:
92+
throw new \LogicException(sprintf(
93+
'Got unsupported job status: id: "%s" status: "%s"',
94+
$job->getId(),
95+
$job->getStatus()
96+
));
97+
}
98+
}
99+
100+
return $this->getRootJobStatus($new, $running, $cancelled, $failed, $success);
101+
}
102+
103+
/**
104+
* @param int $new
105+
* @param int $running
106+
* @param int $cancelled
107+
* @param int $failed
108+
* @param int $success
109+
*
110+
* @return string
111+
*/
112+
protected function getRootJobStatus($new, $running, $cancelled, $failed, $success)
113+
{
114+
$status = Job::STATUS_NEW;
115+
if (!$new && !$running) {
116+
if ($cancelled) {
117+
$status = Job::STATUS_CANCELLED;
118+
} elseif ($failed) {
119+
$status = Job::STATUS_FAILED;
120+
} else {
121+
$status = Job::STATUS_SUCCESS;
122+
}
123+
} elseif ($running || $cancelled || $failed || $success) {
124+
$status = Job::STATUS_RUNNING;
125+
}
126+
127+
return $status;
128+
}
129+
}

DependentJobContext.php

Lines changed: 54 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,54 @@
1+
<?php
2+
namespace Enqueue\JobQueue;
3+
4+
class DependentJobContext
5+
{
6+
/**
7+
* @var Job
8+
*/
9+
private $job;
10+
11+
/**
12+
* @var array
13+
*/
14+
private $dependentJobs;
15+
16+
/**
17+
* @param Job $job
18+
*/
19+
public function __construct(Job $job)
20+
{
21+
$this->job = $job;
22+
$this->dependentJobs = [];
23+
}
24+
25+
/**
26+
* @return Job
27+
*/
28+
public function getJob()
29+
{
30+
return $this->job;
31+
}
32+
33+
/**
34+
* @param string $topic
35+
* @param string|array $message
36+
* @param int $priority
37+
*/
38+
public function addDependentJob($topic, $message, $priority = null)
39+
{
40+
$this->dependentJobs[] = [
41+
'topic' => $topic,
42+
'message' => $message,
43+
'priority' => $priority,
44+
];
45+
}
46+
47+
/**
48+
* @return array
49+
*/
50+
public function getDependentJobs()
51+
{
52+
return $this->dependentJobs;
53+
}
54+
}

0 commit comments

Comments
 (0)