Updated client to report job status
Using open3 and select, readin raw bytes from the sockets rather than buffered lines to access the handbrake progress information
This commit is contained in:
@@ -121,16 +121,21 @@ my $client = Gearman::Client->new;
|
||||
$client->job_servers($options->{job_servers});
|
||||
|
||||
# Add new ripping task for each job to run
|
||||
my @running_tasks;
|
||||
my @progress;
|
||||
my $taskset = $client->new_task_set;
|
||||
foreach my $job (@jobs) {
|
||||
foreach my $i (0..$#jobs) {
|
||||
my $job = $jobs[$i];
|
||||
$progress[$i] = 0;
|
||||
|
||||
$taskset->add_task('handbrake_rip', freeze($job),
|
||||
{
|
||||
on_status => sub {
|
||||
my $numerator = shift;
|
||||
my $denominator = shift or die;
|
||||
|
||||
$log->notice("Ripping task at ", ($numerator/$denominator), "% complete");
|
||||
|
||||
$progress[$i] = $numerator / $denominator;
|
||||
|
||||
display_progress(@progress);
|
||||
},
|
||||
on_complete => \&on_complete_handler,
|
||||
on_retry => sub {
|
||||
@@ -177,6 +182,15 @@ sub on_complete_handler {
|
||||
$log->notice("Completed rip to $response->{real_output_filename}");
|
||||
}
|
||||
|
||||
sub display_progress {
|
||||
my @progress = @_;
|
||||
|
||||
local $|;
|
||||
$| = 1;
|
||||
|
||||
print "Completion: " . join(' ', map { "$_($progress[$_]%)" } 0..$#progress) . "\r";
|
||||
}
|
||||
|
||||
# Reads configuration options from a config file, expands the internal references, and returns the expanded form.
|
||||
sub parse_config {
|
||||
my $config_file = shift;
|
||||
|
||||
@@ -6,12 +6,15 @@ use strict;
|
||||
use Data::Dumper;
|
||||
use Gearman::Worker;
|
||||
use Getopt::Long;
|
||||
use IO::Select;
|
||||
use IO::Socket;
|
||||
use IPC::Open3;
|
||||
use Log::Handler;
|
||||
use Pod::Usage;
|
||||
use String::Random qw/random_string/;
|
||||
use Storable qw/freeze thaw/;
|
||||
use Switch;
|
||||
use Symbol qw/gensym/;
|
||||
|
||||
# Globals
|
||||
our %options = (
|
||||
@@ -112,7 +115,7 @@ sub handbrake_rip {
|
||||
|
||||
# Execute the ripping process
|
||||
$log->debug("Beginning ripping process with command:\n" . join(' ', @handbrake_cmd));
|
||||
my ($child_in, $child_out, $child_err);
|
||||
my ($child_in, $child_out, $child_err) = map gensym, 1..3;
|
||||
my $child_pid = open3($child_in, $child_out, $child_err, @handbrake_cmd);
|
||||
# No need to write to the child process
|
||||
close($child_in);
|
||||
@@ -120,21 +123,57 @@ sub handbrake_rip {
|
||||
# Log the output from handbrake, and return it back to the client
|
||||
$response->{log} = ();
|
||||
my $line;
|
||||
while ($line = <$child_out>) {
|
||||
# If the line is a progress report, record the current status
|
||||
# otherwise, log the line
|
||||
|
||||
# Encoding: task 1 of 1, 0.87 % (34.71 fps, avg 62.95 fps, ETA 00h07m56s)
|
||||
if ($line =~ m/Encoding: task \d+ of \d+, (\d+\.\d+) %/) {
|
||||
my $numerator = $1 * 100;
|
||||
$job->set_status($numerator, 100);
|
||||
$log->notice("Task is $numerator% complete");
|
||||
} else {
|
||||
push @{ $response->{log} }, $line;
|
||||
$log->notice($line);
|
||||
my $select = IO::Select->new();
|
||||
$select->add($child_out);
|
||||
$select->add($child_err);
|
||||
|
||||
my $child_out_buffer;
|
||||
my $child_err_buffer;
|
||||
|
||||
while (my @ready = $select->can_read()) {
|
||||
foreach my $handle (@ready) {
|
||||
my $bytes_read = sysread($handle, my $buf = '', 1024);
|
||||
if ($bytes_read == -1) {
|
||||
$log->error("Error reading from HandBrake STDOUT: $!");
|
||||
$select->remove($child_out);
|
||||
next;
|
||||
} elsif ($bytes_read == 0) {
|
||||
$log->debug("HandBrake STDOUT closed");
|
||||
$select->remove($handle);
|
||||
next;
|
||||
}
|
||||
|
||||
if ($handle == $child_out) {
|
||||
$child_out_buffer .= $buf;
|
||||
|
||||
# Check for complete lines in the output
|
||||
while ((my @lines = split(/[\r\n]+/, $child_out_buffer, 2)) > 1) {
|
||||
$line = $lines[0];
|
||||
$child_out_buffer = $lines[1];
|
||||
|
||||
if ($line =~ m/Encoding: task \d+ of \d+, (\d+\.\d+) %/) {
|
||||
my $numerator = $1;
|
||||
$job->set_status($numerator, 1);
|
||||
}
|
||||
}
|
||||
} else {
|
||||
$child_err_buffer .= $buf;
|
||||
|
||||
# Check for complete lines in the output
|
||||
while ((my @lines = split(/[\r\n]+/, $child_err_buffer, 2)) > 1) {
|
||||
$line = $lines[0];
|
||||
$child_err_buffer = $lines[1];
|
||||
|
||||
push @{ $response->{log} }, $line;
|
||||
$log->notice($line);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
close($child_out);
|
||||
close($child_err);
|
||||
|
||||
$job->set_status(100, 100);
|
||||
|
||||
|
||||
Reference in New Issue
Block a user