Hello community, here is the log from the commit of package perl-Minion for openSUSE:Factory checked in at 2016-09-25 14:44:46 ++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++ Comparing /work/SRC/openSUSE:Factory/perl-Minion (Old) and /work/SRC/openSUSE:Factory/.perl-Minion.new (New) ++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++
Package is "perl-Minion" Changes: -------- --- /work/SRC/openSUSE:Factory/perl-Minion/perl-Minion.changes 2016-09-07 11:45:07.000000000 +0200 +++ /work/SRC/openSUSE:Factory/.perl-Minion.new/perl-Minion.changes 2016-09-25 14:44:49.000000000 +0200 @@ -1,0 +2,18 @@ +Tue Sep 20 05:51:48 UTC 2016 - co...@suse.com + +- updated to 6.0 + see /usr/share/doc/packages/perl-Minion/Changes + + 6.0 2016-09-17 + - Removed TTIN, TTOU and USR1 signals from worker command. + - Changed return value of start method in Minion::Job. + - Added support for worker remote control commands. + - Added commands attribute to Minion::Worker. + - Added add_command and process_commands methods to Minion::Worker. + - Added pid and stop methods to Minion::Job. + - Added broadcast and receive methods to Minion::Backend::Pg. + - Added -b option to job command. + - Improved worker command with support for jobs and stop remote control + commands. + +------------------------------------------------------------------- Old: ---- Minion-5.09.tar.gz New: ---- Minion-6.0.tar.gz ++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++ Other differences: ------------------ ++++++ perl-Minion.spec ++++++ --- /var/tmp/diff_new_pack.l8suIh/_old 2016-09-25 14:44:50.000000000 +0200 +++ /var/tmp/diff_new_pack.l8suIh/_new 2016-09-25 14:44:50.000000000 +0200 @@ -17,7 +17,7 @@ Name: perl-Minion -Version: 5.09 +Version: 6.0 Release: 0 %define cpan_name Minion Summary: Job queue @@ -38,8 +38,9 @@ Minion is a job queue for the at http://mojolicious.org real-time web framework, with support for multiple named queues, priorities, delayed jobs, job dependencies, job results, retries with backoff, statistics, -distributed workers, parallel processing, autoscaling, resource leak -protection and multiple backends (such as at http://www.postgresql.org). +distributed workers, parallel processing, autoscaling, remote control, +resource leak protection and multiple backends (such as at +http://www.postgresql.org). Job queues allow you to process time and/or computationally intensive tasks in background processes, outside of the request/response lifecycle. Among ++++++ Minion-5.09.tar.gz -> Minion-6.0.tar.gz ++++++ diff -urN '--exclude=CVS' '--exclude=.cvsignore' '--exclude=.svn' '--exclude=.svnignore' old/Minion-5.09/Changes new/Minion-6.0/Changes --- old/Minion-5.09/Changes 2016-08-31 19:43:50.000000000 +0200 +++ new/Minion-6.0/Changes 2016-09-17 12:16:51.000000000 +0200 @@ -1,4 +1,16 @@ +6.0 2016-09-17 + - Removed TTIN, TTOU and USR1 signals from worker command. + - Changed return value of start method in Minion::Job. + - Added support for worker remote control commands. + - Added commands attribute to Minion::Worker. + - Added add_command and process_commands methods to Minion::Worker. + - Added pid and stop methods to Minion::Job. + - Added broadcast and receive methods to Minion::Backend::Pg. + - Added -b option to job command. + - Improved worker command with support for jobs and stop remote control + commands. + 5.09 2016-08-31 - Added EXPERIMENTAL enqueued_jobs field to stats methods in Minion and Minion::Backend::Pg. diff -urN '--exclude=CVS' '--exclude=.cvsignore' '--exclude=.svn' '--exclude=.svnignore' old/Minion-5.09/META.json new/Minion-6.0/META.json --- old/Minion-5.09/META.json 2016-09-01 06:21:57.000000000 +0200 +++ new/Minion-6.0/META.json 2016-09-19 16:18:08.000000000 +0200 @@ -54,6 +54,6 @@ }, "x_IRC" : "irc://irc.perl.org/#mojo" }, - "version" : "5.09", + "version" : "6.0", "x_serialization_backend" : "JSON::PP version 2.27400" } diff -urN '--exclude=CVS' '--exclude=.cvsignore' '--exclude=.svn' '--exclude=.svnignore' old/Minion-5.09/META.yml new/Minion-6.0/META.yml --- old/Minion-5.09/META.yml 2016-09-01 06:21:57.000000000 +0200 +++ new/Minion-6.0/META.yml 2016-09-19 16:18:07.000000000 +0200 @@ -27,5 +27,5 @@ homepage: http://mojolicious.org license: http://www.opensource.org/licenses/artistic-license-2.0 repository: https://github.com/kraih/minion.git -version: '5.09' +version: '6.0' x_serialization_backend: 'CPAN::Meta::YAML version 0.018' diff -urN '--exclude=CVS' '--exclude=.cvsignore' '--exclude=.svn' '--exclude=.svnignore' old/Minion-5.09/lib/Minion/Backend/Pg.pm new/Minion-6.0/lib/Minion/Backend/Pg.pm --- old/Minion-5.09/lib/Minion/Backend/Pg.pm 2016-08-31 19:55:44.000000000 +0200 +++ new/Minion-6.0/lib/Minion/Backend/Pg.pm 2016-09-17 02:30:26.000000000 +0200 @@ -8,6 +8,14 @@ has 'pg'; +sub broadcast { + my ($self, $command, $args, $ids) = (shift, shift, shift || [], shift || []); + return !!$self->pg->db->query( + q{update minion_workers set inbox = inbox || $1::jsonb + where (id = any($2) or $2 = '{}')}, {json => [[$command, @$args]]}, $ids + )->rows; +} + sub dequeue { my ($self, $id, $wait, $options) = @_; @@ -85,6 +93,16 @@ return $self; } +sub receive { + my $array = shift->pg->db->query( + "update minion_workers as new set inbox = '[]' + from (select id, inbox from minion_workers where id = ? for update) as old + where new.id = old.id and old.inbox != '[]' + returning old.inbox", shift + )->expand->array; + return $array ? $array->[0] : []; +} + sub register_worker { my ($self, $id) = @_; @@ -211,7 +229,7 @@ where delayed <= now() and (parents = '{}' or cardinality(parents) = ( select count(*) from minion_jobs where id = any(j.parents) and state = 'finished' - )) and queue = any (?) and state = 'inactive' and task = any (?) + )) and queue = any(?) and state = 'inactive' and task = any(?) order by priority desc, id limit 1 for update skip locked @@ -276,6 +294,14 @@ L<Minion::Backend::Pg> inherits all methods from L<Minion::Backend> and implements the following new ones. +=head2 broadcast + + my $bool = $backend->broadcast('some_command'); + my $bool = $backend->broadcast('some_command', [@args]); + my $bool = $backend->broadcast('some_command', [@args], [$id1, $id2, $id3]); + +Broadcast remote control command to one or more workers. + =head2 dequeue my $job_info = $backend->dequeue($worker_id, 0.5); @@ -549,6 +575,12 @@ Construct a new L<Minion::Backend::Pg> object. +=head2 receive + + my $commands = $backend->receive($worker_id); + +Receive remote control commands for worker. + =head2 register_worker my $worker_id = $backend->register_worker; @@ -808,7 +840,11 @@ drop function if exists minion_jobs_notify_workers(); -- 10 up -alter table minion_jobs add column parents bigint[] default '{}'::bigint[]; +alter table minion_jobs add column parents bigint[] default '{}'; -- 11 up create index on minion_jobs (state, priority desc, id); + +-- 12 up +alter table minion_workers add column inbox jsonb + check(jsonb_typeof(inbox) = 'array') default '[]'; diff -urN '--exclude=CVS' '--exclude=.cvsignore' '--exclude=.svn' '--exclude=.svnignore' old/Minion-5.09/lib/Minion/Backend.pm new/Minion-6.0/lib/Minion/Backend.pm --- old/Minion-5.09/lib/Minion/Backend.pm 2016-06-24 08:49:50.000000000 +0200 +++ new/Minion-6.0/lib/Minion/Backend.pm 2016-09-16 21:48:35.000000000 +0200 @@ -5,6 +5,7 @@ has 'minion'; +sub broadcast { croak 'Method "broadcast" not implemented by subclass' } sub dequeue { croak 'Method "dequeue" not implemented by subclass' } sub enqueue { croak 'Method "enqueue" not implemented by subclass' } sub fail_job { croak 'Method "fail_job" not implemented by subclass' } @@ -12,6 +13,7 @@ sub job_info { croak 'Method "job_info" not implemented by subclass' } sub list_jobs { croak 'Method "list_jobs" not implemented by subclass' } sub list_workers { croak 'Method "list_workers" not implemented by subclass' } +sub receive { croak 'Method "receive" not implemented by subclass' } sub register_worker { croak 'Method "register_worker" not implemented by subclass'; @@ -42,6 +44,7 @@ package Minion::Backend::MyBackend; use Mojo::Base 'Minion::Backend'; + sub broadcast {...} sub dequeue {...} sub enqueue {...} sub fail_job {...} @@ -49,6 +52,7 @@ sub job_info {...} sub list_jobs {...} sub list_workers {...} + sub receive {...} sub register_worker {...} sub remove_job {...} sub repair {...} @@ -79,6 +83,14 @@ L<Minion::Backend> inherits all methods from L<Mojo::Base> and implements the following new ones. +=head2 broadcast + + my $bool = $backend->broadcast('some_command'); + my $bool = $backend->broadcast('some_command', [@args]); + my $bool = $backend->broadcast('some_command', [@args], [$id1, $id2, $id3]); + +Broadcast remote control command to one or more workers. + =head2 dequeue my $job_info = $backend->dequeue($worker_id, 0.5); @@ -351,6 +363,12 @@ Returns the same information as L</"worker_info"> but in batches. Meant to be overloaded in a subclass. +=head2 receive + + my $commands = $backend->receive($worker_id); + +Receive remote control commands for worker. + =head2 register_worker my $worker_id = $backend->register_worker; diff -urN '--exclude=CVS' '--exclude=.cvsignore' '--exclude=.svn' '--exclude=.svnignore' old/Minion-5.09/lib/Minion/Command/minion/job.pm new/Minion-6.0/lib/Minion/Command/minion/job.pm --- old/Minion-5.09/lib/Minion/Command/minion/job.pm 2016-06-24 08:49:50.000000000 +0200 +++ new/Minion-6.0/lib/Minion/Command/minion/job.pm 2016-09-17 12:14:56.000000000 +0200 @@ -13,10 +13,11 @@ my ($args, $options) = ([], {}); GetOptionsFromArray \@args, - 'A|attempts=i' => \$options->{attempts}, - 'a|args=s' => sub { $args = decode_json($_[1]) }, - 'd|delay=i' => \$options->{delay}, - 'e|enqueue=s' => \my $enqueue, + 'A|attempts=i' => \$options->{attempts}, + 'a|args=s' => sub { $args = decode_json($_[1]) }, + 'b|broadcast=s' => (\my $command), + 'd|delay=i' => \$options->{delay}, + 'e|enqueue=s' => \my $enqueue, 'l|limit=i' => \(my $limit = 100), 'o|offset=i' => \(my $offset = 0), 'P|parent=s' => ($options->{parents} = []), @@ -28,13 +29,19 @@ 's|stats' => \my $stats, 't|task=s' => \$options->{task}, 'w|workers' => \my $workers; - my $id = @args ? shift @args : undef; + + # Worker remote control command + return $self->app->minion->backend->broadcast($command, $args, \@args) + if $command; # Enqueue return say $self->app->minion->enqueue($enqueue, $args, $options) if $enqueue; - # Show stats or list jobs/workers + # Show stats return $self->_stats if $stats; + + # List jobs/workers + my $id = @args ? shift @args : undef; return $id ? $self->_worker($id) : $self->_list_workers($offset, $limit) if $workers; return $self->_list_jobs($offset, $limit, $options) unless defined $id; @@ -91,38 +98,44 @@ ./myapp.pl minion job -e foo -P 10023 -P 10024 -p 5 -q important ./myapp.pl minion job -R -d 10 10023 ./myapp.pl minion job -r 10023 + ./myapp.pl minion job -b jobs -a '[12]' + ./myapp.pl minion job -b jobs -a '[12]' 23 24 25 Options: - -A, --attempts <number> Number of times performing this new job will be - attempted, defaults to 1 - -a, --args <JSON array> Arguments for new job in JSON format - -d, --delay <seconds> Delay new job for this many seconds - -e, --enqueue <name> New job to be enqueued - -h, --help Show this summary of available options - --home <path> Path to home directory of your application, - defaults to the value of MOJO_HOME or - auto-detection - -l, --limit <number> Number of jobs/workers to show when listing them, - defaults to 100 - -m, --mode <name> Operating mode for your application, defaults to - the value of MOJO_MODE/PLACK_ENV or "development" - -o, --offset <number> Number of jobs/workers to skip when listing them, - defaults to 0 - -P, --parent <id> One or more jobs the new job depends on - -p, --priority <number> Priority of new job, defaults to 0 - -q, --queue <name> Queue to put new job in, defaults to "default", or - list only jobs in this queue - -R, --retry Retry job - -r, --remove Remove job - -S, --state <state> List only jobs in this state - -s, --stats Show queue statistics - -t, --task <name> List only jobs for this task - -w, --workers List workers instead of jobs, or show information - for a specific worker + -A, --attempts <number> Number of times performing this new job will be + attempted, defaults to 1 + -a, --args <JSON array> Arguments for new job or worker remote control + command in JSON format + -b, --broadcast <command> Broadcast remote control command to one or more + workers + -d, --delay <seconds> Delay new job for this many seconds + -e, --enqueue <task> New job to be enqueued + -h, --help Show this summary of available options + --home <path> Path to home directory of your application, + defaults to the value of MOJO_HOME or + auto-detection + -l, --limit <number> Number of jobs/workers to show when listing + them, defaults to 100 + -m, --mode <name> Operating mode for your application, defaults to + the value of MOJO_MODE/PLACK_ENV or + "development" + -o, --offset <number> Number of jobs/workers to skip when listing + them, defaults to 0 + -P, --parent <id> One or more jobs the new job depends on + -p, --priority <number> Priority of new job, defaults to 0 + -q, --queue <name> Queue to put new job in, defaults to "default", + or list only jobs in this queue + -R, --retry Retry job + -r, --remove Remove job + -S, --state <name> List only jobs in this state + -s, --stats Show queue statistics + -t, --task <name> List only jobs for this task + -w, --workers List workers instead of jobs, or show + information for a specific worker =head1 DESCRIPTION -L<Minion::Command::minion::job> manages L<Minion> jobs. +L<Minion::Command::minion::job> manages the L<Minion> job queue. =head1 ATTRIBUTES diff -urN '--exclude=CVS' '--exclude=.cvsignore' '--exclude=.svn' '--exclude=.svnignore' old/Minion-5.09/lib/Minion/Command/minion/worker.pm new/Minion-6.0/lib/Minion/Command/minion/worker.pm --- old/Minion-5.09/lib/Minion/Command/minion/worker.pm 2016-06-24 08:49:50.000000000 +0200 +++ new/Minion-6.0/lib/Minion/Command/minion/worker.pm 2016-09-18 19:21:55.000000000 +0200 @@ -11,24 +11,28 @@ my ($self, @args) = @_; GetOptionsFromArray \@args, - 'I|heartbeat-interval=i' => \($self->{hearthbeat} = 60), - 'j|jobs=i' => \($self->{max} = 4), + 'C|command-interval=i' => \($self->{commands} = 10), + 'I|heartbeat-interval=i' => \($self->{heartbeat} = 60), + 'j|jobs=i' => \($self->{max} = 4), 'q|queue=s' => \my @queues, - 'R|repair-interval=i' => \($self->{repair} = 21600); + 'R|repair-interval=i' => \($self->{repair} = 21600); $self->{queues} = @queues ? \@queues : ['default']; local $SIG{CHLD} = 'DEFAULT'; local $SIG{INT} = local $SIG{TERM} = sub { $self->{finished}++ }; local $SIG{QUIT} = sub { ++$self->{finished} and kill 'KILL', keys %{$self->{jobs}} }; - local $SIG{TTIN} = sub { $self->{max}++ }; - local $SIG{TTOU} = sub { $self->{max}-- if $self->{max} > 0 }; - local $SIG{USR1} = sub { $self->{max} = 0 }; - # Log fatal errors + # Remote control commands need to validate arguments carefully my $app = $self->app; - $app->log->debug("Worker $$ started"); my $worker = $self->{worker} = $app->minion->worker; + $worker->add_command( + jobs => sub { $self->{max} = $_[1] if ($_[1] // '') =~ /^\d+$/ }); + $worker->add_command( + stop => sub { $self->{jobs}{$_[1]}->stop if $self->{jobs}{$_[1] // ''} }); + + # Log fatal errors + $app->log->debug("Worker $$ started"); eval { $self->_work until $self->{finished} && !keys %{$self->{jobs}}; 1 } or $app->log->fatal("Worker error: $@"); $worker->unregister; @@ -39,8 +43,12 @@ # Send heartbeats in regular intervals my $worker = $self->{worker}; - $worker->register and $self->{register} = steady_time + $self->{hearthbeat} - if ($self->{register} || 0) < steady_time; + $worker->register and $self->{lr} = steady_time + $self->{heartbeat} + if ($self->{lr} || 0) < steady_time; + + # Process worker remote control commands in regular intervals + $worker->process_commands and $self->{lp} = steady_time + $self->{commands} + if ($self->{lp} || 0) < steady_time; # Repair in regular intervals (randomize to avoid congestion) if (($self->{check} || 0) < steady_time) { @@ -53,15 +61,15 @@ # Check if jobs are finished my $jobs = $self->{jobs} ||= {}; - $jobs->{$_}->is_finished($_) and delete $jobs->{$_} for keys %$jobs; + $jobs->{$_}->is_finished and delete $jobs->{$_} for keys %$jobs; # Wait if job limit has been reached or worker is stopping if (($self->{max} <= keys %$jobs) || $self->{finished}) { sleep 1 } # Try to get more jobs elsif (my $job = $worker->dequeue(5 => {queues => $self->{queues}})) { - $jobs->{my $pid = $job->start} = $job; - my ($id, $task) = ($job->id, $job->task); + $jobs->{my $id = $job->id} = $job->start; + my ($pid, $task) = ($job->pid, $job->task); $self->app->log->debug( qq{Performing job "$id" with task "$task" in process $pid}); } @@ -80,10 +88,12 @@ Usage: APPLICATION minion worker [OPTIONS] ./myapp.pl minion worker - ./myapp.pl minion worker -m production -I 15 -R 3600 -j 10 + ./myapp.pl minion worker -m production -I 15 -C 5 -R 3600 -j 10 ./myapp.pl minion worker -q important -q default Options: + -C, --command-interval <seconds> Worker remote control command interval, + defaults to 10 -h, --help Show this summary of available options --home <path> Path to home directory of your application, defaults to the value of @@ -117,19 +127,26 @@ Stop immediately without finishing the current jobs. -=head2 TTIN +=head1 REMOTE CONTROL COMMANDS + +The L<Minion::Command::minion::worker> process can be controlled at runtime +with the following remote control commands. + +=head2 jobs -Increase the number of jobs to perform concurrently by one. + $ ./myapp.pl minion job -b jobs -a '[10]' + $ ./myapp.pl minion job -b jobs -a '[10]' 23 -=head2 TTOU +Change the number of jobs to perform concurrently. Setting this value to C<0> +will effectively pause the worker. That means all current jobs will be finished, +but no new ones accepted, until the number is increased again. -Decrease the number of jobs to perform concurrently by one. +=head2 stop -=head2 USR1 + $ ./myapp.pl minion job -b stop -a '[10025]' + $ ./myapp.pl minion job -b stop -a '[10025]' 23 -Pause the worker by setting the number of jobs to perform concurrently to zero. -That means it will finish all current jobs, but not accept new ones, until the -number is increased again with L</"TTIN">. +Stop a job that is currently being performed immediately. =head1 ATTRIBUTES diff -urN '--exclude=CVS' '--exclude=.cvsignore' '--exclude=.svn' '--exclude=.svnignore' old/Minion-5.09/lib/Minion/Job.pm new/Minion-6.0/lib/Minion/Job.pm --- old/Minion-5.09/lib/Minion/Job.pm 2016-06-24 08:49:50.000000000 +0200 +++ new/Minion-6.0/lib/Minion/Job.pm 2016-09-17 11:20:24.000000000 +0200 @@ -24,18 +24,22 @@ sub info { $_[0]->minion->backend->job_info($_[0]->id) } sub is_finished { - my ($self, $pid) = @_; - return undef unless waitpid($pid, WNOHANG) == $pid; + my $self = shift; + return undef unless waitpid($self->{pid}, WNOHANG) == $self->{pid}; $? ? $self->fail("Non-zero exit status (@{[$? >> 8]})") : $self->finish; return 1; } +sub stop { kill 'KILL', shift->{pid} } + sub perform { my $self = shift; - waitpid $self->start, 0; + waitpid $self->start->pid, 0; $? ? $self->fail("Non-zero exit status (@{[$? >> 8]})") : $self->finish; } +sub pid { shift->{pid} } + sub remove { $_[0]->minion->backend->remove_job($_[0]->id) } sub retry { @@ -48,7 +52,7 @@ # Parent die "Can't fork: $!" unless defined(my $pid = fork); - $self->emit(spawn => $pid) and return $pid if $pid; + return $self->emit(spawn => $pid) if $self->{pid} = $pid; # Child eval { @@ -59,7 +63,7 @@ 1; } or $self->fail($@); - exit 0; + POSIX::_exit(0); } 1; @@ -333,7 +337,7 @@ =head2 is_finished - my $bool = $job->is_finished($pid); + my $bool = $job->is_finished; Check if job performed with L</"start"> is finished. @@ -343,6 +347,12 @@ Perform job in new process and wait for it to finish. +=head2 pid + + my $pid = $job->pid; + +Process id of the process spawned by L</"start"> if available. + =head2 remove my $bool = $job->remove; @@ -383,17 +393,22 @@ =head2 start - my $pid = $job->start; + $job = $job->start; Perform job in new process, but do not wait for it to finish. # Perform two jobs concurrently - my $pid1 = $job1->start; - my $pid2 = $job2->start; + $job1->start; + $job2->start; my ($first, $second); sleep 1 - until $first ||= $job1->is_finished($pid1) - and $second ||= $job2->is_finished($pid2); + until $first ||= $job1->is_finished and $second ||= $job2->is_finished; + +=head2 stop + + $job->stop; + +Stop job performed with L</"start"> immediately. =head1 SEE ALSO diff -urN '--exclude=CVS' '--exclude=.cvsignore' '--exclude=.svn' '--exclude=.svnignore' old/Minion-5.09/lib/Minion/Worker.pm new/Minion-6.0/lib/Minion/Worker.pm --- old/Minion-5.09/lib/Minion/Worker.pm 2016-06-24 08:49:50.000000000 +0200 +++ new/Minion-6.0/lib/Minion/Worker.pm 2016-09-17 10:55:06.000000000 +0200 @@ -1,8 +1,11 @@ package Minion::Worker; use Mojo::Base 'Mojo::EventEmitter'; +has commands => sub { {} }; has [qw(id minion)]; +sub add_command { $_[0]->commands->{$_[1]} = $_[2] and return $_[0] } + sub dequeue { my ($self, $wait, $options) = @_; @@ -24,6 +27,17 @@ sub info { $_[0]->minion->backend->worker_info($_[0]->id) } +sub process_commands { + my $self = shift; + + for my $command (@{$self->minion->backend->receive($self->id)}) { + next unless my $cb = $self->commands->{shift @$command}; + $self->$cb(@$command); + } + + return $self; +} + sub register { $_[0]->id($_[0]->minion->backend->register_worker($_[0]->id)) } sub unregister { @@ -74,6 +88,13 @@ L<Minion::Worker> implements the following attributes. +=head2 commands + + my $commands = $worker->commands; + $worker = $worker->commands({jobs => sub {...}}); + +Registered worker remote control commands. + =head2 id my $id = $worker->id; @@ -93,6 +114,17 @@ L<Minion::Worker> inherits all methods from L<Mojo::EventEmitter> and implements the following new ones. +=head2 add_command + + $worker = $worker->add_command(jobs => sub {...}); + +Register a worker remote control command. + + $worker->add_command(foo => sub { + my ($worker, @args) = @_; + ... + }); + =head2 dequeue my $job = $worker->dequeue(0.5); @@ -159,6 +191,12 @@ =back +=head2 process_commands + + $worker = $worker->process_commands; + +Process worker remote control commands. + =head2 register $worker = $worker->register; diff -urN '--exclude=CVS' '--exclude=.cvsignore' '--exclude=.svn' '--exclude=.svnignore' old/Minion-5.09/lib/Minion.pm new/Minion-6.0/lib/Minion.pm --- old/Minion-5.09/lib/Minion.pm 2016-08-31 19:34:19.000000000 +0200 +++ new/Minion-6.0/lib/Minion.pm 2016-09-17 11:06:08.000000000 +0200 @@ -2,6 +2,7 @@ use Mojo::Base 'Mojo::EventEmitter'; use Carp 'croak'; +use Config; use Minion::Job; use Minion::Worker; use Mojo::Loader 'load_class'; @@ -15,7 +16,7 @@ has remove_after => 172800; has tasks => sub { {} }; -our $VERSION = '5.09'; +our $VERSION = '6.0'; sub add_task { ($_[0]->tasks->{$_[1]} = $_[2]) and return $_[0] } @@ -66,6 +67,10 @@ sub worker { my $self = shift; + + # No fork emulation support + croak 'Minion workers do not support fork emulation' if $Config{d_pseudofork}; + my $worker = Minion::Worker->new(minion => $self); $self->emit(worker => $worker); return $worker; @@ -121,8 +126,9 @@ L<Minion> is a job queue for the L<Mojolicious|http://mojolicious.org> real-time web framework, with support for multiple named queues, priorities, delayed jobs, job dependencies, job results, retries with backoff, statistics, distributed -workers, parallel processing, autoscaling, resource leak protection and multiple -backends (such as L<PostgreSQL|http://www.postgresql.org>). +workers, parallel processing, autoscaling, remote control, resource leak +protection and multiple backends (such as +L<PostgreSQL|http://www.postgresql.org>). Job queues allow you to process time and/or computationally intensive tasks in background processes, outside of the request/response lifecycle. Among those diff -urN '--exclude=CVS' '--exclude=.cvsignore' '--exclude=.svn' '--exclude=.svnignore' old/Minion-5.09/t/pg.t new/Minion-6.0/t/pg.t --- old/Minion-5.09/t/pg.t 2016-08-31 19:55:44.000000000 +0200 +++ new/Minion-6.0/t/pg.t 2016-09-17 11:17:59.000000000 +0200 @@ -24,11 +24,11 @@ isa_ok $worker->minion->app, 'Mojolicious', 'has default application'; # Migrate up and down -is $minion->backend->pg->migrations->active, 11, 'active version is 11'; +is $minion->backend->pg->migrations->active, 12, 'active version is 12'; is $minion->backend->pg->migrations->migrate(0)->active, 0, 'active version is 0'; -is $minion->backend->pg->migrations->migrate->active, 11, - 'active version is 11'; +is $minion->backend->pg->migrations->migrate->active, 12, + 'active version is 12'; # Register and unregister $worker->register; @@ -578,16 +578,16 @@ $job2 = $worker->dequeue(0); my $job3 = $worker->dequeue(0); my $job4 = $worker->dequeue(0); -$pid = $job->start; -my $pid2 = $job2->start; -my $pid3 = $job3->start; -my $pid4 = $job4->start; +$job->start; +$job2->start; +$job3->start; +$job4->start; my ($first, $second, $third, $fourth); usleep 50000 - until $first ||= $job->is_finished($pid) - and $second ||= $job2->is_finished($pid2) - and $third ||= $job3->is_finished($pid3) - and $fourth ||= $job4->is_finished($pid4); + until $first ||= $job->is_finished + and $second ||= $job2->is_finished + and $third ||= $job3->is_finished + and $fourth ||= $job4->is_finished; is $minion->job($id)->info->{state}, 'finished', 'right state'; is_deeply $minion->job($id)->info->{result}, {added => 21}, 'right result'; is $minion->job($id2)->info->{state}, 'finished', 'right state'; @@ -599,6 +599,19 @@ 'right result'; $worker->unregister; +# Stopping jobs +$minion->add_task(long_running => sub { sleep 1000 }); +$worker = $minion->worker->register; +$minion->enqueue('long_running'); +$job = $worker->dequeue(0); +ok $job->start->pid, 'has a process id'; +ok !$job->is_finished, 'job is not finished'; +$job->stop; +usleep 5000 until $job->is_finished; +is $job->info->{state}, 'failed', 'right state'; +like $job->info->{result}, qr/Non-zero exit status/, 'right result'; +$worker->unregister; + # Job dependencies $worker = $minion->remove_after(0)->worker->register; is $minion->repair->stats->{finished_jobs}, 0, 'no finished jobs'; @@ -641,6 +654,35 @@ is $minion->job($id)->info->{result}, 'Parent went away', 'right result'; $worker->unregister; +# Worker remote control commands +$worker = $minion->worker->register->process_commands; +$worker2 = $minion->worker->register; +my @commands; +$_->add_command(test_id => sub { push @commands, shift->id }) + for $worker, $worker2; +$worker->add_command(test_args => sub { shift and push @commands, [@_] }) + ->register; +ok $minion->backend->broadcast('test_id', [], [$worker->id]), 'sent command'; +ok $minion->backend->broadcast('test_id', [], [$worker->id, $worker2->id]), + 'sent command'; +$worker->process_commands->register; +$worker2->process_commands; +is_deeply \@commands, [$worker->id, $worker->id, $worker2->id], + 'right structure'; +@commands = (); +ok $minion->backend->broadcast('test_id'), 'sent command'; +ok $minion->backend->broadcast('test_whatever'), 'sent command'; +ok $minion->backend->broadcast('test_args', [23], []), 'sent command'; +ok $minion->backend->broadcast('test_args', [1, [2], {3 => 'three'}], + [$worker->id]), + 'sent command'; +$_->process_commands for $worker, $worker2; +is_deeply \@commands, + [$worker->id, [23], [1, [2], {3 => 'three'}], $worker2->id], + 'right structure'; +$_->unregister for $worker, $worker2; +ok !$minion->backend->broadcast('test_id', []), 'command not sent'; + # Clean up once we are done $pg->db->query('drop schema minion_test cascade');