From 40fa00c034a969c502a0d56f790e86aeaaa40554 Mon Sep 17 00:00:00 2001 From: Ben Roberts Date: Sun, 22 Aug 2010 23:05:33 +0100 Subject: [PATCH] Update worker code to the point that it can run. Added plugin architecture for Worker functions, to support multiple ripping engines. Update worker script to use the new code. Update Logger to output messages to console when running under the worker script. --- lib/HandBrakeCluster/Log.class.php | 4 + lib/HandBrakeCluster/PluginFactory.class.php | 50 +++++++ lib/HandBrakeCluster/Worker.class.php | 97 ++++--------- lib/HandBrakeCluster/Worker/IPlugin.class.php | 16 +++ .../Worker/Plugin/HandBrake.class.php | 130 ++++++++++++++++++ .../Worker/PluginFactory.class.php | 27 ++++ worker/handbrake-cluster-worker.php | 3 + 7 files changed, 260 insertions(+), 67 deletions(-) create mode 100644 lib/HandBrakeCluster/PluginFactory.class.php create mode 100644 lib/HandBrakeCluster/Worker/IPlugin.class.php create mode 100644 lib/HandBrakeCluster/Worker/Plugin/HandBrake.class.php create mode 100644 lib/HandBrakeCluster/Worker/PluginFactory.class.php diff --git a/lib/HandBrakeCluster/Log.class.php b/lib/HandBrakeCluster/Log.class.php index 378fe32..6e3658c 100644 --- a/lib/HandBrakeCluster/Log.class.php +++ b/lib/HandBrakeCluster/Log.class.php @@ -28,6 +28,10 @@ class HandBrakeCluster_Log { array('name' => 'message', 'value' => $message, 'type' => PDO::PARAM_STR) ) ); + + if (HBC_File == 'worker') { + echo date("r") . ' ' . $message . "\n"; + } } public function debug($message, $job_id = 0) { diff --git a/lib/HandBrakeCluster/PluginFactory.class.php b/lib/HandBrakeCluster/PluginFactory.class.php new file mode 100644 index 0000000..b9759a6 --- /dev/null +++ b/lib/HandBrakeCluster/PluginFactory.class.php @@ -0,0 +1,50 @@ +getFilename()); + $plugins[] = $plugin; + } + + return $plugins; + } + + protected static function loadPlugins($plugins, $prefix) { + self::$validPlugins = array(); + + foreach ($plugins as $plugin) { + $fullClassname = $prefix . $plugin; + if ( ! class_exists($fullClassname, true)) { + echo "Cannot load $fullClassname\n"; + continue; + } + + if ( ! in_array('HandBrakeCluster_Worker_IPlugin', class_implements($fullClassname))) { + echo "$plugin does not implement the necessary interfaces\n"; + continue; + } + + // Initialise the plugin + call_user_func(array($fullClassname, 'init')); + + self::$validPlugins[$plugin] = $fullClassname; + } + } + +} + +?> \ No newline at end of file diff --git a/lib/HandBrakeCluster/Worker.class.php b/lib/HandBrakeCluster/Worker.class.php index 3c966ed..72e0974 100644 --- a/lib/HandBrakeCluster/Worker.class.php +++ b/lib/HandBrakeCluster/Worker.class.php @@ -2,84 +2,47 @@ class HandBrakeCluster_Worker { - const DEINTERLACE_ALWAYS = 1; - const DEINTERLACE_SELECTIVELY = 2; + protected $gearman; public function __construct() { - $gearman = new GearmanWorker(); - $gearman->addServers($config->get('rips.job_servers')); - $gearman->addFunction('handbrake_rip', 'hbc_gearman_handbrake_rip'); + $this->init(); + } - public function start() { - while($gearman->work()) { - if ($gearman->returnCode() != GEARMAN_SUCCESS) { + private function init() { + if ($this->gearman) { + return; + } + + $config = HandBrakeCluster_Main::instance()->config(); + + $this->gearman = new GearmanWorker(); + $this->gearman->addServers($config->get('rips.job_servers')); + + // Load all the plugin classes + echo "Loading Plugins\n"; + HandBrakeCluster_Worker_PluginFactory::scan(); + foreach (HandBrakeCluster_Worker_PluginFactory::getValidPlugins() as $plugin) { + echo "Grabbing worker functions provided by {$plugin}\n"; + $workerFunctions = HandBrakeCluster_Worker_PluginFactory::getPluginWorkerFunctions($plugin); + + foreach ($workerFunctions as $function => $callback) { + echo "Adding {$plugin}::{$callback[1]} as {$function}\n"; + $this->gearman->addFunction($function, $callback); + } + } + } + + public function start() { + while($this->gearman->work()) { + if ($this->gearman->returnCode() != GEARMAN_SUCCESS) { break; } } return true; } - - public function handbrakeRip($client_job_id, $rip_options) { - $handbrake_cmd_raw = array( - '-n', $config->get('rips.nice'), - $config->get('rips.handbrake_binary'), - self::evaluateOption($rip_options, 'input_filename', '-i'), - self::evaluateOption($rip_options, 'real_output_filename', '-o'), - self::evaluateOption($rip_options, 'title'), - self::evaluateOption($rip_options, 'format', '-f'), - self::evaluateOption($rip_options, 'video_codec', '-e'), - self::evaluateOption($rip_options, 'quantizer', '-q'), - self::evaluateOption($rip_options, 'video_width', '-w'), - self::evaluateOption($rip_options, 'video_height', '-l'), - self::evaluateOption($rip_options, 'deinterlace'), - self::evaluateOption($rip_options, 'audio_tracks', '-a'), - self::evaluateOption($rip_options, 'audio_codec', '-E'), - self::evaluateOption($rip_options, 'audio_names', '-A'), - self::evaluateOption($rip_options, 'subtitle_tracks', '-s'), - ); - $handbrake_cmd = array($config->get('rips.nice_binary')); - foreach(new RecursiveIteratorIterator(new RecursiveArrayIterator($handbrake_cmd_raw)) as $value) { - $handbrake_cmd[] = shell_escape_arg($value); - } - $handbrake_cmd = array_join(' ', $handbrake_cmd); - - return HandBrakeCluster_ForegroundTask::execute($handbrake_cmd, null, null, null, array($this, 'callback_stdout'), array($this, 'callback_stderr'), $this); - - } - - protected static function evaluateOption(array &$rip_options, $name, $option = null) { - switch($name) { - case 'title': { - if (!$rip_options[$name] || int($rip_options[$name]) < 0) { - return array('-L'); - } else { - return array('-t', $rip_options[$name]); - } - } break; - - case 'deinterlace': { - switch ($rip_options[$name]) { - case self::DEINTERLACE_ALWAYS: - return array('-d'); - case self::DEINTERLACE_SELECTIVELY: - return array('-5'); - default: - return array(); - } - } - - default: - return array(isset($option) ? $option : $name, $rip_options[$name]); - } - } - -} - -function hbc_gearman_handbrake_rip(GearmanJob $job) { - return $worker->handbrakeRip($job->unique(), unserialize($job->workload)); } ?> \ No newline at end of file diff --git a/lib/HandBrakeCluster/Worker/IPlugin.class.php b/lib/HandBrakeCluster/Worker/IPlugin.class.php new file mode 100644 index 0000000..8b37b80 --- /dev/null +++ b/lib/HandBrakeCluster/Worker/IPlugin.class.php @@ -0,0 +1,16 @@ + \ No newline at end of file diff --git a/lib/HandBrakeCluster/Worker/Plugin/HandBrake.class.php b/lib/HandBrakeCluster/Worker/Plugin/HandBrake.class.php new file mode 100644 index 0000000..f1b2fad --- /dev/null +++ b/lib/HandBrakeCluster/Worker/Plugin/HandBrake.class.php @@ -0,0 +1,130 @@ +stdout = ''; + $this->stderr = ''; + + $this->job = $job; + + $this->client_job_id = $job->unique(); + $this->rip_options = unserialize($job->workload()); + } + + public static function init() { + + } + + public static function workerFunctions() { + return array( + 'handbrake_rip' => array(__CLASS__, 'rip'), + ); + } + + public static function rip(GearmanJob $job) { + $rip = new self($job); + $rip->execute(); + } + + public function execute() { + $config = HandBrakeCluster_Main::instance()->config(); + + $handbrake_cmd_raw = array( + '-n', $config->get('rips.nice'), + $config->get('rips.handbrake_binary'), + $this->evaluateOption('input_filename', '-i'), + $this->evaluateOption('output_filename', '-o'), + $this->evaluateOption('title'), + $this->evaluateOption('format', '-f'), + $this->evaluateOption('video_codec', '-e'), + $this->evaluateOption('quantizer', '-q'), + $this->evaluateOption('video_width', '-w'), + $this->evaluateOption('video_height', '-l'), + $this->evaluateOption('deinterlace'), + $this->evaluateOption('audio_tracks', '-a'), + $this->evaluateOption('audio_codec', '-E'), + $this->evaluateOption('audio_names', '-A'), + $this->evaluateOption('subtitle_tracks', '-s'), + ); + + $handbrake_cmd = array($config->get('rips.nice_binary')); + foreach(new RecursiveIteratorIterator(new RecursiveArrayIterator($handbrake_cmd_raw)) as $value) { + $handbrake_cmd[] = escapeshellarg($value); + } + $handbrake_cmd = join(' ', $handbrake_cmd); + + return HandBrakeCluster_ForegroundTask::execute($handbrake_cmd, null, null, null, array($this, 'callbackStdout'), array($this, 'callbackStderr'), $this); + + } + + public function evaluateOption($name, $option = null) { + switch($name) { + case 'title': { + if (!$this->rip_options[$name] || (int)$this->rip_options[$name] < 0) { + return array('-L'); + } else { + return array('-t', $this->rip_options[$name]); + } + } break; + + case 'deinterlace': { + switch ($this->rip_options[$name]) { + case self::DEINTERLACE_ALWAYS: + return array('-d'); + case self::DEINTERLACE_SELECTIVELY: + return array('-5'); + default: + return array(); + } + } + + default: + return array(isset($option) ? $option : $name, $this->rip_options[$name]); + } + } + + public function callbackStdout($rip, $data) { + $this->stdout .= $data; + + while (count($lines = preg_split('/[\r\n]+/', $this->stdout, 2)) > 1) { + $line = $lines[0]; + $this->stdout = $lines[1]; + + $log = HandBrakeCluster_Main::instance()->log(); + $log->info($line); + } + } + + public function callbackStderr($rip, $data) { + $this->stderr .= $data; + + while (count($lines = preg_split('/[\r\n]+/', $this->stderr, 2)) > 1) { + $line = $lines[0]; + $rip->stderr = $lines[1]; + + $matches = array(); + if (preg_match('/Encoding: task \d+ of \d+, (\d+\.\d+) %/', $line, $matches)) { + $numerator = 100 * $matches[1]; + $this->job->sendStatus($numerator, 100); + } else { + $log = HandBrakeCluster_Main::instance()->log(); + $log->debug($line); + } + } + } + +} + +?> \ No newline at end of file diff --git a/lib/HandBrakeCluster/Worker/PluginFactory.class.php b/lib/HandBrakeCluster/Worker/PluginFactory.class.php new file mode 100644 index 0000000..639e892 --- /dev/null +++ b/lib/HandBrakeCluster/Worker/PluginFactory.class.php @@ -0,0 +1,27 @@ + \ No newline at end of file diff --git a/worker/handbrake-cluster-worker.php b/worker/handbrake-cluster-worker.php index 79ca968..6984685 100644 --- a/worker/handbrake-cluster-worker.php +++ b/worker/handbrake-cluster-worker.php @@ -6,6 +6,9 @@ require_once '../config.php'; require_once HandBrakeCluster_Lib . 'HandBrakeCluster/Main.class.php'; try { + + set_time_limit(0); + $main = HandBrakeCluster_Main::instance(); $smarty = $main->smarty();