Update job status directly from the worker
Rather than proxying progress reports back to the client, the worker will now directly update the database.
This commit is contained in:
@@ -11,16 +11,19 @@ class RippingCluster_Worker_Plugin_HandBrake implements RippingCluster_Worker_IP
|
|||||||
|
|
||||||
private $job;
|
private $job;
|
||||||
|
|
||||||
private $client_job_id;
|
|
||||||
private $rip_options;
|
private $rip_options;
|
||||||
|
|
||||||
private function __construct(GearmanJob $job) {
|
private function __construct(GearmanJob $gearman_job) {
|
||||||
$this->output = '';
|
$this->output = '';
|
||||||
|
|
||||||
$this->job = $job;
|
$this->gearman_job = $gearman_job;
|
||||||
|
|
||||||
$this->client_job_id = $job->unique();
|
$this->rip_options = unserialize($this->gearman_job->workload());
|
||||||
$this->rip_options = unserialize($job->workload());
|
|
||||||
|
if ( ! $this->rip_options['id']) {
|
||||||
|
throw new RippingCluster_Exception_LogicException("Job ID must not be zero/null");
|
||||||
|
}
|
||||||
|
$this->job = RippingCluster_Job::fromId($this->rip_options['id']);
|
||||||
}
|
}
|
||||||
|
|
||||||
public static function init() {
|
public static function init() {
|
||||||
@@ -62,7 +65,7 @@ class RippingCluster_Worker_Plugin_HandBrake implements RippingCluster_Worker_IP
|
|||||||
$this->evaluateOption('audio_tracks', '-a'),
|
$this->evaluateOption('audio_tracks', '-a'),
|
||||||
$this->evaluateOption('audio_codec', '-E'),
|
$this->evaluateOption('audio_codec', '-E'),
|
||||||
$this->evaluateOption('audio_names', '-A'),
|
$this->evaluateOption('audio_names', '-A'),
|
||||||
$this->evaluateOption('subtitle_tracks', '-s'),
|
$this->evaluateOption('subtitle_tracks', '-s'),
|
||||||
);
|
);
|
||||||
|
|
||||||
$handbrake_cmd = array($config->get('rips.nice_binary'));
|
$handbrake_cmd = array($config->get('rips.nice_binary'));
|
||||||
@@ -70,11 +73,15 @@ class RippingCluster_Worker_Plugin_HandBrake implements RippingCluster_Worker_IP
|
|||||||
$handbrake_cmd[] = escapeshellarg($value);
|
$handbrake_cmd[] = escapeshellarg($value);
|
||||||
}
|
}
|
||||||
$handbrake_cmd = join(' ', $handbrake_cmd);
|
$handbrake_cmd = join(' ', $handbrake_cmd);
|
||||||
$log->debug($handbrake_cmd);
|
$log->debug($handbrake_cmd, $this->job->id());
|
||||||
|
|
||||||
|
// Change the status of this job to running
|
||||||
|
$log->debug("Setting status to Running", $this->job->id());
|
||||||
|
$this->job->updateStatus(RippingCluster_JobStatus::RUNNING, 0);
|
||||||
|
|
||||||
$return_val = RippingCluster_ForegroundTask::execute($handbrake_cmd, null, null, null, array($this, 'callbackOutput'), array($this, 'callbackOutput'), $this);
|
list($return_val, $stdout, $stderr) = RippingCluster_ForegroundTask::execute($handbrake_cmd, null, null, null, array($this, 'callbackOutput'), array($this, 'callbackOutput'), $this);
|
||||||
if ($return_val) {
|
if ($return_val) {
|
||||||
$this->job->setFail($return_val);
|
$this->gearman_job->sendFail($return_val);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -113,11 +120,11 @@ class RippingCluster_Worker_Plugin_HandBrake implements RippingCluster_Worker_IP
|
|||||||
|
|
||||||
$matches = array();
|
$matches = array();
|
||||||
if (preg_match('/Encoding: task \d+ of \d+, (\d+\.\d+) %/', $line, $matches)) {
|
if (preg_match('/Encoding: task \d+ of \d+, (\d+\.\d+) %/', $line, $matches)) {
|
||||||
$numerator = 100 * $matches[1];
|
$status = $rip->job->currentStatus();
|
||||||
$this->job->sendStatus($numerator, 100);
|
$status->updateRipProgress($matches[1]);
|
||||||
} else {
|
} else {
|
||||||
$log = RippingCluster_Main::instance()->log();
|
$log = RippingCluster_Main::instance()->log();
|
||||||
$log->debug($line);
|
$log->debug($line, $rip->job->id());
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -45,7 +45,7 @@ function gearman_created_callback($gearman_task) {
|
|||||||
$log = $main->log();
|
$log = $main->log();
|
||||||
|
|
||||||
$job = RippingCluster_Job::fromId($gearman_task->unique());
|
$job = RippingCluster_Job::fromId($gearman_task->unique());
|
||||||
$job->updateStatus(RippingCluster_JobStatus::RUNNING);
|
$job->updateStatus(RippingCluster_JobStatus::QUEUED);
|
||||||
|
|
||||||
$log->info("Job successfully queued with Gearman", $gearman_task->unique());
|
$log->info("Job successfully queued with Gearman", $gearman_task->unique());
|
||||||
}
|
}
|
||||||
@@ -54,7 +54,7 @@ function gearman_data_callback($gearman_task) {
|
|||||||
$main = RippingCluster_Main::instance();
|
$main = RippingCluster_Main::instance();
|
||||||
$log = $main->log();
|
$log = $main->log();
|
||||||
|
|
||||||
$log->debug("Got some data from gearman", $gearman_task->unique());
|
$log->debug("Received data callback from Gearman Task");
|
||||||
}
|
}
|
||||||
|
|
||||||
function gearman_status_callback($gearman_task) {
|
function gearman_status_callback($gearman_task) {
|
||||||
|
|||||||
Reference in New Issue
Block a user