From 1d6deeb014370d98b79171cd94a8f50aabc63127 Mon Sep 17 00:00:00 2001 From: Ben Roberts Date: Sat, 20 Feb 2010 03:24:54 +0000 Subject: [PATCH] Updated client to report job status Using open3 and select, readin raw bytes from the sockets rather than buffered lines to access the handbrake progress information --- handbrake-cluster-client.pl | 22 ++++++++++--- handbrake-cluster-worker.pl | 63 ++++++++++++++++++++++++++++++------- 2 files changed, 69 insertions(+), 16 deletions(-) diff --git a/handbrake-cluster-client.pl b/handbrake-cluster-client.pl index 88c47e7..7e56635 100755 --- a/handbrake-cluster-client.pl +++ b/handbrake-cluster-client.pl @@ -121,16 +121,21 @@ my $client = Gearman::Client->new; $client->job_servers($options->{job_servers}); # Add new ripping task for each job to run -my @running_tasks; +my @progress; my $taskset = $client->new_task_set; -foreach my $job (@jobs) { +foreach my $i (0..$#jobs) { + my $job = $jobs[$i]; + $progress[$i] = 0; + $taskset->add_task('handbrake_rip', freeze($job), { on_status => sub { my $numerator = shift; my $denominator = shift or die; - - $log->notice("Ripping task at ", ($numerator/$denominator), "% complete"); + + $progress[$i] = $numerator / $denominator; + + display_progress(@progress); }, on_complete => \&on_complete_handler, on_retry => sub { @@ -177,6 +182,15 @@ sub on_complete_handler { $log->notice("Completed rip to $response->{real_output_filename}"); } +sub display_progress { + my @progress = @_; + + local $|; + $| = 1; + + print "Completion: " . join(' ', map { "$_($progress[$_]%)" } 0..$#progress) . "\r"; +} + # Reads configuration options from a config file, expands the internal references, and returns the expanded form. sub parse_config { my $config_file = shift; diff --git a/handbrake-cluster-worker.pl b/handbrake-cluster-worker.pl index cbd67f9..b3c9206 100755 --- a/handbrake-cluster-worker.pl +++ b/handbrake-cluster-worker.pl @@ -6,12 +6,15 @@ use strict; use Data::Dumper; use Gearman::Worker; use Getopt::Long; +use IO::Select; +use IO::Socket; use IPC::Open3; use Log::Handler; use Pod::Usage; use String::Random qw/random_string/; use Storable qw/freeze thaw/; use Switch; +use Symbol qw/gensym/; # Globals our %options = ( @@ -112,7 +115,7 @@ sub handbrake_rip { # Execute the ripping process $log->debug("Beginning ripping process with command:\n" . join(' ', @handbrake_cmd)); - my ($child_in, $child_out, $child_err); + my ($child_in, $child_out, $child_err) = map gensym, 1..3; my $child_pid = open3($child_in, $child_out, $child_err, @handbrake_cmd); # No need to write to the child process close($child_in); @@ -120,21 +123,57 @@ sub handbrake_rip { # Log the output from handbrake, and return it back to the client $response->{log} = (); my $line; - while ($line = <$child_out>) { - # If the line is a progress report, record the current status - # otherwise, log the line - # Encoding: task 1 of 1, 0.87 % (34.71 fps, avg 62.95 fps, ETA 00h07m56s) - if ($line =~ m/Encoding: task \d+ of \d+, (\d+\.\d+) %/) { - my $numerator = $1 * 100; - $job->set_status($numerator, 100); - $log->notice("Task is $numerator% complete"); - } else { - push @{ $response->{log} }, $line; - $log->notice($line); + my $select = IO::Select->new(); + $select->add($child_out); + $select->add($child_err); + + my $child_out_buffer; + my $child_err_buffer; + + while (my @ready = $select->can_read()) { + foreach my $handle (@ready) { + my $bytes_read = sysread($handle, my $buf = '', 1024); + if ($bytes_read == -1) { + $log->error("Error reading from HandBrake STDOUT: $!"); + $select->remove($child_out); + next; + } elsif ($bytes_read == 0) { + $log->debug("HandBrake STDOUT closed"); + $select->remove($handle); + next; + } + + if ($handle == $child_out) { + $child_out_buffer .= $buf; + + # Check for complete lines in the output + while ((my @lines = split(/[\r\n]+/, $child_out_buffer, 2)) > 1) { + $line = $lines[0]; + $child_out_buffer = $lines[1]; + + if ($line =~ m/Encoding: task \d+ of \d+, (\d+\.\d+) %/) { + my $numerator = $1; + $job->set_status($numerator, 1); + } + } + } else { + $child_err_buffer .= $buf; + + # Check for complete lines in the output + while ((my @lines = split(/[\r\n]+/, $child_err_buffer, 2)) > 1) { + $line = $lines[0]; + $child_err_buffer = $lines[1]; + + push @{ $response->{log} }, $line; + $log->notice($line); + } + } } } + close($child_out); + close($child_err); $job->set_status(100, 100);