#!/usr/bin/perl

# Gearman worker which performs HandBrake rips on behalf of a job server.
# See the POD at the end of this file for usage information.

package HandbrakeCluster::Worker;

use warnings;
use strict;

use Data::Dumper;
use Gearman::Worker;
use Getopt::Long;
use IO::Select;
use IO::Socket;
use IPC::Open3;
use Log::Log4perl;
use PHP::Serialization;
use Pod::Usage;
use String::Random qw/random_string/;
use Storable qw/freeze thaw/;
use Switch;
use Symbol qw/gensym/;
use YAML::Any;

# Globals

# Fallback values used for any option that is set neither on the command line
# nor in the "options" section of the configuration file.
our $default_options = {
    log_config     => 'logging.conf',
    help           => 0,
    pretend        => 0,
    job_servers    => ['build0.sihnon.net', 'build1.sihnon.net', 'build2.sihnon.net'],
    gearman_prefix => '',
    handbrake      => '/usr/bin/HandBrakeCLI',
    config_file    => '',
    php_client     => 0,
};

# Every known option starts out undefined, so that process_config() can tell
# which ones were actually supplied on the command line.
my $options = { map { $_ => undef } keys %$default_options };

Getopt::Long::Configure( qw(bundling no_getopt_compat) );
GetOptions(
    'help|h'           => \$options->{help},
    'log-config|l=s'   => \$options->{log_config},
    # TODO: --pretend is parsed but nothing in this file acts upon it yet.
    'pretend|n'        => \$options->{pretend},
    'job-servers|j=s@' => \$options->{job_servers},
    'gearman-prefix=s' => \$options->{gearman_prefix},
    # Takes the path to the executable, so it must be a string option
    'handbrake=s'      => \$options->{handbrake},
    'config|c=s'       => \$options->{config_file},
) or pod2usage(-verbose => 0);
pod2usage(-verbose => 1) if ($options->{help});

# Parse the configuration file (if any), and merge/validate the options
my $config = parse_config($options->{config_file});
($config, $options) = process_config($config, $options, $default_options);

# The job servers may be given as repeated options, as comma separated lists,
# or (from the configuration file) as a single string; flatten all of these
# into one list of servers.
{
    my $servers = $options->{job_servers};
    my @given   = ref($servers) eq 'ARRAY' ? @$servers : ($servers);
    $options->{job_servers} = [
        grep { defined && length }
        map  { defined $_ ? split(/\s*,\s*/, $_) : () } @given
    ];
}

# Setup logging
Log::Log4perl->init($options->{log_config});
my $worker_log = Log::Log4perl->get_logger('HandbrakeCluster::Worker');
$worker_log->debug("Logging started");

# Setup the worker, and listen for jobs
my $worker = Gearman::Worker->new(job_servers => $options->{job_servers});
$worker->prefix($options->{gearman_prefix}) if $options->{gearman_prefix};
$worker->register_function(handbrake_rip => \&handbrake_rip);
$worker->work while 1;

# Gearman job handler: runs HandBrake for a single rip request.
#
# The job argument is a serialised hash of rip options (PHP serialisation when
# the php_client option is set, Storable otherwise).
#
# Returns a Storable-frozen hash containing:
#   real_output_filename - the uniquified output path that was written to
#   handbrake_cmd        - array ref holding a copy of the command that was run
#   log                  - array ref of the lines HandBrake wrote to stderr
#   status               - exit code of the ripping process
#   success              - true only if the process exited cleanly with code 0
sub handbrake_rip {
    my $job = shift;

    my $rip_options = $options->{php_client}
        ? PHP::Serialization::unserialize($job->arg)
        : thaw($job->arg);

    my $response = {};

    # Setup logging for this job
    my $job_log = Log::Log4perl->get_logger('HandbrakeCluster::Worker::Job');
    Log::Log4perl::MDC->put('job_id', $rip_options->{db_job_id});

    $job_log->info("Beginning new rip to $rip_options->{output_filename}");

    # Generate a unique filename based on the output filename to prevent
    # clashes from previous runs. The random token is inserted in front of the
    # file extension; a name without an extension is left unchanged.
    my $uuid = random_string('cccccc');
    $rip_options->{real_output_filename} = $rip_options->{output_dir} . $rip_options->{output_filename};
    $rip_options->{real_output_filename} =~ s/\.([^.]+)$/.$uuid.$1/;
    $response->{real_output_filename} = $rip_options->{real_output_filename};

    # Generate the command line for handbrake
    my @handbrake_cmd = (
        # Reduce the priority of the ripping process, since it is very processor intensive
        'nice', '-n', $rip_options->{nice},

        # Construct the handbrake command line
        $options->{handbrake},
        get_options($rip_options, 'input_filename', '-i'),
        get_options($rip_options, 'real_output_filename', '-o'),
        get_options($rip_options, 'title'),
        get_options($rip_options, 'format', '-f'),
        get_options($rip_options, 'video_codec', '-e'),
        get_options($rip_options, 'quantizer', '-q'),
        get_options($rip_options, 'video_width', '-w'),
        get_options($rip_options, 'video_height', '-l'),
        get_options($rip_options, 'deinterlace'),
        get_options($rip_options, 'audio_tracks', '-a'),
        get_options($rip_options, 'audio_codec', '-E'),
        get_options($rip_options, 'audio_names', '-A'),
        get_options($rip_options, 'subtitle_tracks', '-s'),
    );

    # Return a copy of the command used to rip the title. This has to be an
    # array reference: assigning the array itself would only store its length.
    $response->{handbrake_cmd} = [ @handbrake_cmd ];

    # Flag the start of the job.
    # NOTE(review): the status pairs used in this sub (0/100, 100*pct/100 and
    # 100/1) only agree with each other if the client reads
    # numerator/denominator as a percentage - confirm against the client.
    $job->set_status(0, 100);

    # Execute the ripping process
    $job_log->debug("Beginning ripping process with command:\n" . join(' ', @handbrake_cmd));
    my ($child_in, $child_out, $child_err) = map gensym, 1..3;
    my $child_pid = open3($child_in, $child_out, $child_err, @handbrake_cmd);

    # No need to write to the child process
    close($child_in);

    # Log the output from handbrake, and return it back to the client
    $response->{log} = [];

    my $select = IO::Select->new();
    $select->add($child_out);
    $select->add($child_err);

    my $child_out_buffer = '';
    my $child_err_buffer = '';

    # The loop ends once both handles have been removed from the select set,
    # at which point can_read() returns an empty list.
    while (my @ready = $select->can_read()) {
        foreach my $handle (@ready) {
            my $buf = '';
            my $bytes_read = sysread($handle, $buf, 1024);

            # sysread reports an error by returning undef (never -1)
            if (!defined $bytes_read) {
                $job_log->error("Error reading from HandBrake socket: $!");
                $select->remove($handle);
                next;
            }
            elsif ($bytes_read == 0) {
                $job_log->debug("HandBrake socket closed");
                $select->remove($handle);
                next;
            }

            if ($handle == $child_out) {
                # Standard output carries the progress reports
                $child_out_buffer .= $buf;

                foreach my $line (_take_complete_lines(\$child_out_buffer)) {
                    if ($line =~ m/Encoding: task \d+ of \d+, (\d+\.\d+) %/) {
                        my $numerator = 100 * $1;
                        $job->set_status($numerator, 100);
                    }
                }
            }
            else {
                # Standard error carries the HandBrake log
                $child_err_buffer .= $buf;

                foreach my $line (_take_complete_lines(\$child_err_buffer)) {
                    push @{ $response->{log} }, $line;
                    $job_log->info($line);
                }
            }
        }
    }

    # Do not lose a final log line that was not terminated by a newline
    if (length $child_err_buffer) {
        push @{ $response->{log} }, $child_err_buffer;
        $job_log->info($child_err_buffer);
    }

    close($child_out);
    close($child_err);

    $job->set_status(100, 1);

    # If the rip process failed, report an error status here
    waitpid($child_pid, 0);
    my $wait_status = $?;
    $response->{status} = $wait_status >> 8;

    # Test the whole wait status, not just the exit code, so that a process
    # killed by a signal is not mistaken for a successful rip.
    $response->{success} = $wait_status == 0;

    if ($response->{success}) {
        $job_log->info("Finished rip to $response->{real_output_filename}");
    }
    else {
        $job_log->warn("Ripping process returned error status: $response->{status}");
    }

    return freeze($response);
}

# Removes every complete line from the buffer referenced by the argument and
# returns those lines as a list. Lines are separated by runs of CR/LF
# characters; a trailing partial line is left in the buffer for the next read.
sub _take_complete_lines {
    my $buffer_ref = shift;

    my @lines;
    while ((my @parts = split(/[\r\n]+/, $$buffer_ref, 2)) > 1) {
        push @lines, $parts[0];
        $$buffer_ref = $parts[1];
    }

    return @lines;
}

# Translates one rip option into the matching HandBrake command line arguments.
#
# Arguments:
#   $rip_options - hash ref of rip options received from the client
#   $option_name - name of the option to translate
#   $option      - command line switch to emit for ordinary options; not used
#                  for 'input_filename', 'title' or 'deinterlace'
#
# Returns a (possibly empty) list of arguments to add to the command line.
sub get_options {
    my $rip_options = shift or die;
    my $option_name = shift or die;
    my $option      = shift;

    # The input file lives in the input directory
    if ($option_name eq 'input_filename') {
        return ('-i', $rip_options->{input_dir} . $rip_options->{input_filename});
    }

    # A missing or negative title passes -L instead of naming a title with -t
    if ($option_name eq 'title') {
        my $title = $rip_options->{$option_name};
        return ('-L') if !defined($title) || $title < 0;
        return ('-t', $title);
    }

    # Deinterlacing is requested by number: 1 gives -d, 2 gives -5, and any
    # other value adds nothing to the command line
    if ($option_name eq 'deinterlace') {
        my $mode = $rip_options->{$option_name};
        return ()     if !defined($mode) || $mode !~ /^\d+$/;
        return ('-d') if $mode == 1;
        return ('-5') if $mode == 2;
        return ();
    }

    # Ordinary options map directly onto a switch followed by the value, and
    # are omitted altogether when no value was supplied
    return (defined $rip_options->{$option_name} ? ($option, $rip_options->{$option_name}) : ());
}

# Reads configuration options from the named YAML config file and returns the
# resulting structure. Returns an empty hash ref when no file was named.
sub parse_config {
    my $config_file = shift;

    return {} unless defined $config_file && length $config_file;

    my $config = YAML::Any::LoadFile($config_file) or die 'Unable to load configuration file: ' . $!;

    return $config;
}

# Fills in every option which was not given on the command line, taking the
# value from the "options" section of the config file when present and from
# the defaults otherwise.
#
# Arguments:
#   $config          - hash ref as returned by parse_config()
#   $options         - hash ref of command line options (undef when not given)
#   $default_options - hash ref of fallback values
#
# Returns the list ($config, $options); $options is updated in place.
sub process_config {
    my $config          = shift or die;
    my $options         = shift or die;
    my $default_options = shift or die;

    # Update any unset options with values from config file
    foreach my $option (keys %$options) {
        # Only an undefined value means "not given"; an explicit empty string
        # or zero from the command line must be respected.
        next if defined $options->{$option};

        # Try the config file first, otherwise use the default value
        if (defined $config->{options}->{$option}) {
            $options->{$option} = $config->{options}->{$option};
        }
        else {
            $options->{$option} = $default_options->{$option};
        }
    }

    return ($config, $options);
}

__END__

=head1 NAME

handbrake-cluster-worker - Processes DVD rips farmed out by gearman

=head1 SYNOPSIS

  handbrake-cluster-worker.pl -h|--help

  handbrake-cluster-worker.pl [-l|--log-config <log4perl config file>]
                              [-n|--pretend]
                              [-j|--job-servers <server list>]
                              [--gearman-prefix <prefix>]
                              [--handbrake <handbrake executable>]
                              [-c|--config <config file>]

=head1 DESCRIPTION

Processes ripping tasks as requested by a gearman job server. Logging and the
job servers to use are configurable by command line arguments.

=head1 OPTIONS

=over 4

=item B<-h|--help>

Displays this help information and quits.

=item B<-l|--log-config E<lt>log4perl config fileE<gt>>

Specifies the name of a Log4perl configuration file to control logging. This
file is expected to define a logger named HandbrakeCluster::Worker. Default is
C<logging.conf>.

=item B<-n|--pretend>

Only shows what action would be taken, no rips are actually performed.
Note: this flag is currently parsed but not yet acted upon by the worker.

=item B<-j|--job-servers E<lt>server listE<gt>>

Specifies a comma separated list of alternate servers to use for job
distribution. The option may also be repeated to name several servers.

=item B<--gearman-prefix E<lt>prefixE<gt>>

Specifies a prefix to set on the gearman worker. By default no prefix is set.

=item B<--handbrake E<lt>handbrake executableE<gt>>

Specifies an alternate HandBrake executable to use. Default is
C</usr/bin/HandBrakeCLI>.

=item B<-c|--config E<lt>config fileE<gt>>

Specifies a YAML configuration file. Values found in its C<options> section
are used for any option not given on the command line.

=back

=cut