Files
handbrake-cluster/handbrake-cluster-worker.pl

295 lines
9.4 KiB
Perl
Executable File

#!/usr/bin/perl
package HandbrakeCluster::Worker;
use warnings;
use strict;
use Data::Dumper;
use Gearman::Worker;
use Getopt::Long;
use IO::Select;
use IO::Socket;
use IPC::Open3;
use Log::Log4perl;
use PHP::Serialization;
use Pod::Usage;
use String::Random qw/random_string/;
use Storable qw/freeze thaw/;
use Switch;
use Symbol qw/gensym/;
use YAML::Any;
# Globals
our $default_options = {
log_config => 'logging.conf',
help => 0,
pretend => 0,
job_servers => ['build0.sihnon.net', 'build1.sihnon.net', 'build2.sihnon.net'],
gearman_prefix => '',
handbrake => '/usr/bin/HandBrakeCLI',
config_file => '',
php_client => 0,
};
my $options = { map { $_ => undef } keys %$default_options };
Getopt::Long::Configure( qw(bundling no_getopt_compat) );
GetOptions(
'help|h' => \$options->{help},
'log-config|l=s' => \$options->{log_config},
'pretend|n' => \$options->{pretend},
'job-servers|j=s@' => \$options->{job_servers},
'gearman-prefix=s' => \$options->{gearman_prefix},
'handbrake' => \$options->{handbrake},
'config|c=s' => \$options->{config_file},
) or pod2usage(-verbose => 0);
pod2usage(-verbose => 1) if ($options->{help});
# Parse the configuration file (if any), and merge/validate the options
my $config = parse_config($options->{config_file});
($config, $options) = process_config($config, $options, $default_options);
# Setup logging
Log::Log4perl->init($options->{log_config});
my $worker_log = Log::Log4perl->get_logger('HandbrakeCluster::Worker');
$worker_log->debug("Logging started");
# Setup the worker, and listen for jobs
my $worker = Gearman::Worker->new(job_servers => $options->{job_servers});
$worker->prefix($options->{gearman_prefix}) if $options->{gearman_prefix};
$worker->register_function(handbrake_rip => \&handbrake_rip);
$worker->work while 1;
sub handbrake_rip {
my $job = shift;
my $rip_options;
if ($options->{php_client}) {
$rip_options = PHP::Serialization::unserialize($job->arg);
} else {
$rip_options = thaw($job->arg);
}
my $response = {};
# Setup logging for this job
my $job_log = Log::Log4perl->get_logger('HandbrakeCluster::Worker::Job');
Log::Log4perl::MDC->put('job_id', $rip_options->{db_job_id});
$job_log->info("Beginning new rip to $rip_options->{output_filename}");
# Generate a unique filename based on the output filename to prevent clashes from previous runs
my $uuid = random_string('cccccc');
$rip_options->{real_output_filename} = $rip_options->{output_dir} . $rip_options->{output_filename};
$rip_options->{real_output_filename} =~ s/\.([^\.]+)$/\.$uuid\.$1/;
$response->{real_output_filename} = $rip_options->{real_output_filename};
# Generate the command line for handbrake
my @handbrake_cmd = (
# Reduce the priority of the ripping process, since it is very processor intensive
'nice', '-n', $rip_options->{nice},
# Construct the handbrake command line
$options->{handbrake},
get_options($rip_options, 'input_filename', '-i'),
get_options($rip_options, 'real_output_filename', '-o'),
get_options($rip_options, 'title'),
get_options($rip_options, 'format', '-f'),
get_options($rip_options, 'video_codec', '-e'),
get_options($rip_options, 'quantizer', '-q'),
get_options($rip_options, 'video_width', '-w'),
get_options($rip_options, 'video_height', '-l'),
get_options($rip_options, 'deinterlace'),
get_options($rip_options, 'audio_tracks', '-a'),
get_options($rip_options, 'audio_codec', '-E'),
get_options($rip_options, 'audio_names', '-A'),
get_options($rip_options, 'subtitle_tracks', '-s'),
);
# Return a copy of the command used to rip the title
$response->{handbrake_cmd} = @handbrake_cmd;
# flag the start of the job
$job->set_status(0, 100);
# Execute the ripping process
$job_log->debug("Beginning ripping process with command:\n" . join(' ', @handbrake_cmd));
my ($child_in, $child_out, $child_err) = map gensym, 1..3;
my $child_pid = open3($child_in, $child_out, $child_err, @handbrake_cmd);
# No need to write to the child process
close($child_in);
# Log the output from handbrake, and return it back to the client
$response->{log} = ();
my $line;
my $select = IO::Select->new();
$select->add($child_out);
$select->add($child_err);
my $child_out_buffer;
my $child_err_buffer;
while (my @ready = $select->can_read()) {
foreach my $handle (@ready) {
my $bytes_read = sysread($handle, my $buf = '', 1024);
if ($bytes_read == -1) {
$job_log->error("Error reading from HandBrake socket: $!");
$select->remove($child_out);
next;
} elsif ($bytes_read == 0) {
$job_log->debug("HandBrake socket closed");
$select->remove($handle);
next;
}
if ($handle == $child_out) {
$child_out_buffer .= $buf;
# Check for complete lines in the output
while ((my @lines = split(/[\r\n]+/, $child_out_buffer, 2)) > 1) {
$line = $lines[0];
$child_out_buffer = $lines[1];
if ($line =~ m/Encoding: task \d+ of \d+, (\d+\.\d+) %/) {
my $numerator = 100 * $1;
$job->set_status($numerator, 100);
}
}
} else {
$child_err_buffer .= $buf;
# Check for complete lines in the output
while ((my @lines = split(/[\r\n]+/, $child_err_buffer, 2)) > 1) {
$line = $lines[0];
$child_err_buffer = $lines[1];
push @{ $response->{log} }, $line;
$job_log->info($line);
}
}
}
}
close($child_out);
close($child_err);
$job->set_status(100, 1);
# If the rip process failed, report an error status here
waitpid($child_pid, 0);
$response->{status} = $? >> 8;
$response->{success} = $response->{status} == 0;
if ($response->{success}) {
$job_log->info("Finished rip to $response->{real_output_filename}");
} else {
$job_log->warning("Ripping process returned error status: $response->{status}");
}
return freeze($response);
}
sub get_options {
my $rip_options = shift or die;
my $option_name = shift or die;
my $option = shift;
switch ($option_name) {
case 'input_filename' {
return ('-i', $rip_options->{input_dir} . $rip_options->{input_filename});
}
case 'title' {
return ('-L') if ! defined($rip_options->{$option_name}) || $rip_options->{$option_name} < 0;
return ('-t', $rip_options->{$option_name});
}
case 'deinterlace' {
switch ($rip_options->{$option_name}) {
case 1 { return ('-d') }
case 2 { return ('-5') }
return ();
}
}
return (defined $rip_options->{$option_name} ? ($option, $rip_options->{$option_name}) : ());
}
}
# Reads configuration options from a config file, expands the internal references, and returns the expanded form.
sub parse_config {
my $config_file = shift;
return {} unless defined $config_file;
my $config = YAML::Any::LoadFile($options->{config_file}) or die 'Unable to load configuration file: ' . $!;
return $config;
}
sub process_config {
my $config = shift or die;
my $options = shift or die;
my $default_options = shift or die;
# Update any unset options with values from config file
foreach my $option (keys %$options) {
# Update the value of any option which has not been set by a command line argument
if (!$options->{$option}) {
# Try the config file first, otherwise use the default value
if (defined $config->{options}->{$option}) {
$options->{$option} = $config->{options}->{$option};
} else {
$options->{$option} = $default_options->{$option};
}
}
}
return ($config, $options);
}
__END__;
=head1 NAME
handbrake-cluster-worker - Processes DVD rips farmed out by gearman
=head1 SYNOPSIS
handbrake-cluster-worker.pl -h|--help
handbrake-cluster-worker.pl [-l|--log-config <log4perl config file>]
[-n|--pretend]
[-j|--job-servers <server list>]
[--handbrake <handbrake executable>]
=head1 DESCRIPTION
Processes ripping tasks as requested by a gearman job server. Logging and the
job servers to use are configurable by command line arguments.
=head1 OPTIONS
=over 4
=item B<-h|--help>
Displays this help information and quits.
=item B<-l|--log-config E<lt>log4perl config fileE<gt>>
Specifies the name of a Log4perl configuration file to control logging.
This file is expected to define a logger named HandbrakeCluster::Worker.
=item B<-n|--pretend>
Only shows what action would be taken, no rips are actually performed.
=item B<-j|--job-servers E<lt>server listE<gt>>
Specifies a comma separated list of alternate servers to use for job
distribution.
=item B<--handbrake E<lt>handbrake executableE<gt>>
Specifies an alternate HandBrake executable to use. Default is
C</usr/bin/HandBrakeCLI>.
=cut