Update worker code to the point that it can run.
Added plugin architecture for Worker functions, to support multiple ripping engines. Update worker script to use the new code. Update Logger to output messages to console when running under the worker script.
This commit is contained in:
@@ -28,6 +28,10 @@ class HandBrakeCluster_Log {
|
||||
array('name' => 'message', 'value' => $message, 'type' => PDO::PARAM_STR)
|
||||
)
|
||||
);
|
||||
|
||||
if (HBC_File == 'worker') {
|
||||
echo date("r") . ' ' . $message . "\n";
|
||||
}
|
||||
}
|
||||
|
||||
public function debug($message, $job_id = 0) {
|
||||
|
||||
50
lib/HandBrakeCluster/PluginFactory.class.php
Normal file
50
lib/HandBrakeCluster/PluginFactory.class.php
Normal file
@@ -0,0 +1,50 @@
|
||||
<?php
|
||||
|
||||
abstract class HandBrakeCluster_PluginFactory {
|
||||
|
||||
static protected $validPlugins;
|
||||
|
||||
abstract public static function init();
|
||||
|
||||
public static function getValidPlugins() {
|
||||
return array_keys(self::$validPlugins);
|
||||
}
|
||||
|
||||
protected static function findPlugins($directory) {
|
||||
$plugins = array();
|
||||
|
||||
$iterator = new HandBrakeCluster_Utility_ClassFilesIterator(new HandBrakeCluster_Utility_VisibleFilesIterator(new DirectoryIterator(HandBrakeCluster_Lib . $directory)));
|
||||
|
||||
foreach ($iterator as /** @var SplFileInfo */ $file) {
|
||||
$plugin = preg_replace('/.class.php$/', '', $file->getFilename());
|
||||
$plugins[] = $plugin;
|
||||
}
|
||||
|
||||
return $plugins;
|
||||
}
|
||||
|
||||
protected static function loadPlugins($plugins, $prefix) {
|
||||
self::$validPlugins = array();
|
||||
|
||||
foreach ($plugins as $plugin) {
|
||||
$fullClassname = $prefix . $plugin;
|
||||
if ( ! class_exists($fullClassname, true)) {
|
||||
echo "Cannot load $fullClassname\n";
|
||||
continue;
|
||||
}
|
||||
|
||||
if ( ! in_array('HandBrakeCluster_Worker_IPlugin', class_implements($fullClassname))) {
|
||||
echo "$plugin does not implement the necessary interfaces\n";
|
||||
continue;
|
||||
}
|
||||
|
||||
// Initialise the plugin
|
||||
call_user_func(array($fullClassname, 'init'));
|
||||
|
||||
self::$validPlugins[$plugin] = $fullClassname;
|
||||
}
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
?>
|
||||
@@ -2,18 +2,40 @@
|
||||
|
||||
class HandBrakeCluster_Worker {
|
||||
|
||||
const DEINTERLACE_ALWAYS = 1;
|
||||
const DEINTERLACE_SELECTIVELY = 2;
|
||||
protected $gearman;
|
||||
|
||||
public function __construct() {
|
||||
$gearman = new GearmanWorker();
|
||||
$gearman->addServers($config->get('rips.job_servers'));
|
||||
$gearman->addFunction('handbrake_rip', 'hbc_gearman_handbrake_rip');
|
||||
$this->init();
|
||||
|
||||
}
|
||||
|
||||
public function start() {
|
||||
while($gearman->work()) {
|
||||
if ($gearman->returnCode() != GEARMAN_SUCCESS) {
|
||||
private function init() {
|
||||
if ($this->gearman) {
|
||||
return;
|
||||
}
|
||||
|
||||
$config = HandBrakeCluster_Main::instance()->config();
|
||||
|
||||
$this->gearman = new GearmanWorker();
|
||||
$this->gearman->addServers($config->get('rips.job_servers'));
|
||||
|
||||
// Load all the plugin classes
|
||||
echo "Loading Plugins\n";
|
||||
HandBrakeCluster_Worker_PluginFactory::scan();
|
||||
foreach (HandBrakeCluster_Worker_PluginFactory::getValidPlugins() as $plugin) {
|
||||
echo "Grabbing worker functions provided by {$plugin}\n";
|
||||
$workerFunctions = HandBrakeCluster_Worker_PluginFactory::getPluginWorkerFunctions($plugin);
|
||||
|
||||
foreach ($workerFunctions as $function => $callback) {
|
||||
echo "Adding {$plugin}::{$callback[1]} as {$function}\n";
|
||||
$this->gearman->addFunction($function, $callback);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
public function start() {
|
||||
while($this->gearman->work()) {
|
||||
if ($this->gearman->returnCode() != GEARMAN_SUCCESS) {
|
||||
break;
|
||||
}
|
||||
}
|
||||
@@ -21,65 +43,6 @@ class HandBrakeCluster_Worker {
|
||||
return true;
|
||||
}
|
||||
|
||||
public function handbrakeRip($client_job_id, $rip_options) {
|
||||
$handbrake_cmd_raw = array(
|
||||
'-n', $config->get('rips.nice'),
|
||||
$config->get('rips.handbrake_binary'),
|
||||
self::evaluateOption($rip_options, 'input_filename', '-i'),
|
||||
self::evaluateOption($rip_options, 'real_output_filename', '-o'),
|
||||
self::evaluateOption($rip_options, 'title'),
|
||||
self::evaluateOption($rip_options, 'format', '-f'),
|
||||
self::evaluateOption($rip_options, 'video_codec', '-e'),
|
||||
self::evaluateOption($rip_options, 'quantizer', '-q'),
|
||||
self::evaluateOption($rip_options, 'video_width', '-w'),
|
||||
self::evaluateOption($rip_options, 'video_height', '-l'),
|
||||
self::evaluateOption($rip_options, 'deinterlace'),
|
||||
self::evaluateOption($rip_options, 'audio_tracks', '-a'),
|
||||
self::evaluateOption($rip_options, 'audio_codec', '-E'),
|
||||
self::evaluateOption($rip_options, 'audio_names', '-A'),
|
||||
self::evaluateOption($rip_options, 'subtitle_tracks', '-s'),
|
||||
);
|
||||
|
||||
$handbrake_cmd = array($config->get('rips.nice_binary'));
|
||||
foreach(new RecursiveIteratorIterator(new RecursiveArrayIterator($handbrake_cmd_raw)) as $value) {
|
||||
$handbrake_cmd[] = shell_escape_arg($value);
|
||||
}
|
||||
$handbrake_cmd = array_join(' ', $handbrake_cmd);
|
||||
|
||||
return HandBrakeCluster_ForegroundTask::execute($handbrake_cmd, null, null, null, array($this, 'callback_stdout'), array($this, 'callback_stderr'), $this);
|
||||
|
||||
}
|
||||
|
||||
protected static function evaluateOption(array &$rip_options, $name, $option = null) {
|
||||
switch($name) {
|
||||
case 'title': {
|
||||
if (!$rip_options[$name] || int($rip_options[$name]) < 0) {
|
||||
return array('-L');
|
||||
} else {
|
||||
return array('-t', $rip_options[$name]);
|
||||
}
|
||||
} break;
|
||||
|
||||
case 'deinterlace': {
|
||||
switch ($rip_options[$name]) {
|
||||
case self::DEINTERLACE_ALWAYS:
|
||||
return array('-d');
|
||||
case self::DEINTERLACE_SELECTIVELY:
|
||||
return array('-5');
|
||||
default:
|
||||
return array();
|
||||
}
|
||||
}
|
||||
|
||||
default:
|
||||
return array(isset($option) ? $option : $name, $rip_options[$name]);
|
||||
}
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
function hbc_gearman_handbrake_rip(GearmanJob $job) {
|
||||
return $worker->handbrakeRip($job->unique(), unserialize($job->workload));
|
||||
}
|
||||
|
||||
?>
|
||||
16
lib/HandBrakeCluster/Worker/IPlugin.class.php
Normal file
16
lib/HandBrakeCluster/Worker/IPlugin.class.php
Normal file
@@ -0,0 +1,16 @@
|
||||
<?php
|
||||
|
||||
interface HandBrakeCluster_Worker_IPlugin {
|
||||
|
||||
public static function workerFunctions();
|
||||
|
||||
public static function rip(GearmanJob $job);
|
||||
|
||||
public function evaluateOption($name, $option = null);
|
||||
|
||||
public function callbackStdout($id, $data);
|
||||
|
||||
public function callbackStderr($id, $data);
|
||||
}
|
||||
|
||||
?>
|
||||
130
lib/HandBrakeCluster/Worker/Plugin/HandBrake.class.php
Normal file
130
lib/HandBrakeCluster/Worker/Plugin/HandBrake.class.php
Normal file
@@ -0,0 +1,130 @@
|
||||
<?php
|
||||
|
||||
class HandBrakeCluster_Worker_Plugin_HandBrake implements HandBrakeCluster_Worker_IPlugin {
|
||||
|
||||
const DEINTERLACE_ALWAYS = 1;
|
||||
const DEINTERLACE_SELECTIVELY = 2;
|
||||
|
||||
private $stdout;
|
||||
private $stderr;
|
||||
|
||||
private $job;
|
||||
|
||||
private $client_job_id;
|
||||
private $rip_options;
|
||||
|
||||
private function __construct(GearmanJob $job) {
|
||||
$this->stdout = '';
|
||||
$this->stderr = '';
|
||||
|
||||
$this->job = $job;
|
||||
|
||||
$this->client_job_id = $job->unique();
|
||||
$this->rip_options = unserialize($job->workload());
|
||||
}
|
||||
|
||||
public static function init() {
|
||||
|
||||
}
|
||||
|
||||
public static function workerFunctions() {
|
||||
return array(
|
||||
'handbrake_rip' => array(__CLASS__, 'rip'),
|
||||
);
|
||||
}
|
||||
|
||||
public static function rip(GearmanJob $job) {
|
||||
$rip = new self($job);
|
||||
$rip->execute();
|
||||
}
|
||||
|
||||
public function execute() {
|
||||
$config = HandBrakeCluster_Main::instance()->config();
|
||||
|
||||
$handbrake_cmd_raw = array(
|
||||
'-n', $config->get('rips.nice'),
|
||||
$config->get('rips.handbrake_binary'),
|
||||
$this->evaluateOption('input_filename', '-i'),
|
||||
$this->evaluateOption('output_filename', '-o'),
|
||||
$this->evaluateOption('title'),
|
||||
$this->evaluateOption('format', '-f'),
|
||||
$this->evaluateOption('video_codec', '-e'),
|
||||
$this->evaluateOption('quantizer', '-q'),
|
||||
$this->evaluateOption('video_width', '-w'),
|
||||
$this->evaluateOption('video_height', '-l'),
|
||||
$this->evaluateOption('deinterlace'),
|
||||
$this->evaluateOption('audio_tracks', '-a'),
|
||||
$this->evaluateOption('audio_codec', '-E'),
|
||||
$this->evaluateOption('audio_names', '-A'),
|
||||
$this->evaluateOption('subtitle_tracks', '-s'),
|
||||
);
|
||||
|
||||
$handbrake_cmd = array($config->get('rips.nice_binary'));
|
||||
foreach(new RecursiveIteratorIterator(new RecursiveArrayIterator($handbrake_cmd_raw)) as $value) {
|
||||
$handbrake_cmd[] = escapeshellarg($value);
|
||||
}
|
||||
$handbrake_cmd = join(' ', $handbrake_cmd);
|
||||
|
||||
return HandBrakeCluster_ForegroundTask::execute($handbrake_cmd, null, null, null, array($this, 'callbackStdout'), array($this, 'callbackStderr'), $this);
|
||||
|
||||
}
|
||||
|
||||
public function evaluateOption($name, $option = null) {
|
||||
switch($name) {
|
||||
case 'title': {
|
||||
if (!$this->rip_options[$name] || (int)$this->rip_options[$name] < 0) {
|
||||
return array('-L');
|
||||
} else {
|
||||
return array('-t', $this->rip_options[$name]);
|
||||
}
|
||||
} break;
|
||||
|
||||
case 'deinterlace': {
|
||||
switch ($this->rip_options[$name]) {
|
||||
case self::DEINTERLACE_ALWAYS:
|
||||
return array('-d');
|
||||
case self::DEINTERLACE_SELECTIVELY:
|
||||
return array('-5');
|
||||
default:
|
||||
return array();
|
||||
}
|
||||
}
|
||||
|
||||
default:
|
||||
return array(isset($option) ? $option : $name, $this->rip_options[$name]);
|
||||
}
|
||||
}
|
||||
|
||||
public function callbackStdout($rip, $data) {
|
||||
$this->stdout .= $data;
|
||||
|
||||
while (count($lines = preg_split('/[\r\n]+/', $this->stdout, 2)) > 1) {
|
||||
$line = $lines[0];
|
||||
$this->stdout = $lines[1];
|
||||
|
||||
$log = HandBrakeCluster_Main::instance()->log();
|
||||
$log->info($line);
|
||||
}
|
||||
}
|
||||
|
||||
public function callbackStderr($rip, $data) {
|
||||
$this->stderr .= $data;
|
||||
|
||||
while (count($lines = preg_split('/[\r\n]+/', $this->stderr, 2)) > 1) {
|
||||
$line = $lines[0];
|
||||
$rip->stderr = $lines[1];
|
||||
|
||||
$matches = array();
|
||||
if (preg_match('/Encoding: task \d+ of \d+, (\d+\.\d+) %/', $line, $matches)) {
|
||||
$numerator = 100 * $matches[1];
|
||||
$this->job->sendStatus($numerator, 100);
|
||||
} else {
|
||||
$log = HandBrakeCluster_Main::instance()->log();
|
||||
$log->debug($line);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
?>
|
||||
27
lib/HandBrakeCluster/Worker/PluginFactory.class.php
Normal file
27
lib/HandBrakeCluster/Worker/PluginFactory.class.php
Normal file
@@ -0,0 +1,27 @@
|
||||
<?php
|
||||
|
||||
class HandBrakeCluster_Worker_PluginFactory extends HandBrakeCluster_PluginFactory {
|
||||
|
||||
const PLUGIN_DIR = 'HandBrakeCluster/Worker/Plugin/';
|
||||
const PREFIX = 'HandBrakeCluster_Worker_Plugin_';
|
||||
|
||||
public static function init() {
|
||||
|
||||
}
|
||||
|
||||
public static function scan() {
|
||||
$candidatePlugins = parent::findPlugins(self::PLUGIN_DIR);
|
||||
|
||||
parent::loadPlugins($candidatePlugins, self::PREFIX);
|
||||
}
|
||||
|
||||
public static function getPluginWorkerFunctions($plugin) {
|
||||
if ( ! isset(parent::$validPlugins[$plugin])) {
|
||||
return null;
|
||||
}
|
||||
|
||||
return call_user_func(array(parent::$validPlugins[$plugin], 'workerFunctions'));
|
||||
}
|
||||
}
|
||||
|
||||
?>
|
||||
@@ -6,6 +6,9 @@ require_once '../config.php';
|
||||
require_once HandBrakeCluster_Lib . 'HandBrakeCluster/Main.class.php';
|
||||
|
||||
try {
|
||||
|
||||
set_time_limit(0);
|
||||
|
||||
$main = HandBrakeCluster_Main::instance();
|
||||
$smarty = $main->smarty();
|
||||
|
||||
|
||||
Reference in New Issue
Block a user