Hello community, here is the log from the commit of package perl-Minion for openSUSE:Factory checked in at 2015-10-03 20:30:24 ++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++ Comparing /work/SRC/openSUSE:Factory/perl-Minion (Old) and /work/SRC/openSUSE:Factory/.perl-Minion.new (New) ++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++
Package is "perl-Minion" Changes: -------- --- /work/SRC/openSUSE:Factory/perl-Minion/perl-Minion.changes 2015-09-08 17:42:37.000000000 +0200 +++ /work/SRC/openSUSE:Factory/.perl-Minion.new/perl-Minion.changes 2015-10-03 20:30:44.000000000 +0200 @@ -1,0 +2,30 @@ +Fri Oct 2 09:16:58 UTC 2015 - [email protected] + +- updated to 2.0 + see /usr/share/doc/packages/perl-Minion/Changes + + 2.0 2015-10-01 + - Removed -t option from worker command. + - Added support for multiple named queues. + - Added retries attribute to Minion::Job. + - Added retries argument to fail_job, finish_job and retry_job methods in + Minion::Backend::File and Minion::Backend::Pg. + - Added queue option to enqueue method in Minion. + - Added queue option to enqueue and retry_job methods in Minion::Backend::File + and Minion::Backend::Pg. + - Added queues option to dequeue method in Minion::Worker. + - Added queues option to dequeue method in Minion::Backend::File and + Minion::Backend::Pg. + - Added -q option to job and worker commands. + - Improved worker command to be more resilient to time jumps. + - Fixed a race condition in Minion::Backend::File and Minion::Backend::Pg + where a retried job did not have to be dequeued again before it could be + finished. + + 1.19 2015-09-28 + - Added support for retrying jobs with a new priority. + - Added priority option to retry method in Minion::Job. + - Added priority option to retry_job method in Minion::Backend::File and + Minion::Backend::Pg. + +------------------------------------------------------------------- Old: ---- Minion-1.18.tar.gz New: ---- Minion-2.0.tar.gz ++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++ Other differences: ------------------ ++++++ perl-Minion.spec ++++++ --- /var/tmp/diff_new_pack.u8dvOn/_old 2015-10-03 20:30:44.000000000 +0200 +++ /var/tmp/diff_new_pack.u8dvOn/_new 2015-10-03 20:30:44.000000000 +0200 @@ -17,7 +17,7 @@ Name: perl-Minion -Version: 1.18 +Version: 2.0 Release: 0 %define cpan_name Minion Summary: Job queue ++++++ Minion-1.18.tar.gz -> Minion-2.0.tar.gz ++++++ diff -urN '--exclude=CVS' '--exclude=.cvsignore' '--exclude=.svn' '--exclude=.svnignore' old/Minion-1.18/Changes new/Minion-2.0/Changes --- old/Minion-1.18/Changes 2015-08-29 13:39:17.000000000 +0200 +++ new/Minion-2.0/Changes 2015-10-01 14:13:16.000000000 +0200 @@ -1,4 +1,28 @@ +2.0 2015-10-01 + - Removed -t option from worker command. + - Added support for multiple named queues. + - Added retries attribute to Minion::Job. + - Added retries argument to fail_job, finish_job and retry_job methods in + Minion::Backend::File and Minion::Backend::Pg. + - Added queue option to enqueue method in Minion. + - Added queue option to enqueue and retry_job methods in Minion::Backend::File + and Minion::Backend::Pg. + - Added queues option to dequeue method in Minion::Worker. + - Added queues option to dequeue method in Minion::Backend::File and + Minion::Backend::Pg. + - Added -q option to job and worker commands. + - Improved worker command to be more resilient to time jumps. + - Fixed a race condition in Minion::Backend::File and Minion::Backend::Pg + where a retried job did not have to be dequeued again before it could be + finished. + +1.19 2015-09-28 + - Added support for retrying jobs with a new priority. + - Added priority option to retry method in Minion::Job. + - Added priority option to retry_job method in Minion::Backend::File and + Minion::Backend::Pg. + 1.18 2015-08-30 - Fixed Makefile.PL to be compliant with version 2 of the CPAN distribution metadata specification. @@ -13,8 +37,8 @@ 1.15 2015-05-15 - Added support for retrying jobs with a delay. (kwa) - Added delay option to retry method in Minion::Job. (kwa) - - Added delay option to retry_job method in Minion::Backend::File. (kwa) - - Added delay option to retry_job method in Minion::Backend::Pg. (kwa) + - Added delay option to retry_job method in Minion::Backend::File and + Minion::Backend::Pg. (kwa) 1.14 2015-04-21 - Improved performance of Minion::Backend::Pg with a new index. (avkhozov) diff -urN '--exclude=CVS' '--exclude=.cvsignore' '--exclude=.svn' '--exclude=.svnignore' old/Minion-1.18/META.json new/Minion-2.0/META.json --- old/Minion-1.18/META.json 2015-09-02 19:29:02.000000000 +0200 +++ new/Minion-2.0/META.json 2015-10-01 20:35:41.000000000 +0200 @@ -55,6 +55,6 @@ }, "x_IRC" : "irc://irc.perl.org/#mojo" }, - "version" : "1.18", + "version" : "2.0", "x_serialization_backend" : "JSON::PP version 2.27300" } diff -urN '--exclude=CVS' '--exclude=.cvsignore' '--exclude=.svn' '--exclude=.svnignore' old/Minion-1.18/META.yml new/Minion-2.0/META.yml --- old/Minion-1.18/META.yml 2015-09-02 19:29:01.000000000 +0200 +++ new/Minion-2.0/META.yml 2015-10-01 20:35:41.000000000 +0200 @@ -28,5 +28,5 @@ homepage: http://mojolicio.us license: http://www.opensource.org/licenses/artistic-license-2.0 repository: https://github.com/kraih/minion.git -version: '1.18' +version: '2.0' x_serialization_backend: 'CPAN::Meta::YAML version 0.016' diff -urN '--exclude=CVS' '--exclude=.cvsignore' '--exclude=.svn' '--exclude=.svnignore' old/Minion-1.18/lib/Minion/Backend/File.pm new/Minion-2.0/lib/Minion/Backend/File.pm --- old/Minion-1.18/lib/Minion/Backend/File.pm 2015-08-29 01:40:49.000000000 +0200 +++ new/Minion-2.0/lib/Minion/Backend/File.pm 2015-10-01 20:15:34.000000000 +0200 @@ -14,9 +14,9 @@ } sub dequeue { - my ($self, $id, $timeout) = @_; - usleep($timeout * 1000000) unless my $job = $self->_try($id); - return $job || $self->_try($id); + my ($self, $id, $wait, $options) = @_; + usleep($wait * 1000000) unless my $job = $self->_try($id, $options); + return $job || $self->_try($id, $options); } sub enqueue { @@ -31,6 +31,7 @@ delayed => $options->{delay} ? (time + $options->{delay}) : 1, id => $self->_id, priority => $options->{priority} // 0, + queue => $options->{queue} // 'default', retries => 0, state => 'inactive', task => $task @@ -120,13 +121,15 @@ sub reset { $_[0]->db and delete($_[0]->{db})->clear } sub retry_job { - my ($self, $id) = (shift, shift); + my ($self, $id, $retries) = (shift, shift, shift); my $options = shift // {}; my $guard = $self->_exclusive; return undef unless my $job = $self->_job($id, 'failed', 'finished'); + return undef unless $job->{retries} == $retries; $job->{retries} += 1; $job->{delayed} = time + $options->{delay} if $options->{delay}; + defined $options->{$_} and $job->{$_} = $options->{$_} for qw(priority queue); @$job{qw(retried state)} = (time, 'inactive'); delete @$job{qw(finished result started worker)}; @@ -180,25 +183,28 @@ sub _jobs { shift->db->{jobs} //= {} } sub _try { - my ($self, $id) = @_; + my ($self, $id, $options) = @_; - my $guard = $self->_exclusive; - my @ready = grep { $_->{state} eq 'inactive' } values %{$self->_jobs}; - my $now = time; - @ready = grep { $_->{delayed} < $now } @ready; + my $guard = $self->_exclusive; + my @ready = grep { $_->{state} eq 'inactive' } values %{$self->_jobs}; + my %queues = map { $_ => 1 } @{$options->{queues} || ['default']}; + my $tasks = $self->minion->tasks; + @ready = grep { $queues{$_->{queue}} && $tasks->{$_->{task}} } @ready; @ready = sort { $a->{created} <=> $b->{created} } @ready; @ready = sort { $b->{priority} <=> $a->{priority} } @ready; - my $job = first { $self->minion->tasks->{$_->{task}} } @ready; - @$job{qw(started state worker)} = (time, 'active', $id) if $job; - return $job ? $job->export : undef; + my $now = time; + return undef unless my $job = first { $_->{delayed} < $now } @ready; + @$job{qw(started state worker)} = (time, 'active', $id); + return $job->export; } sub _update { - my ($self, $fail, $id, $result) = @_; + my ($self, $fail, $id, $retries, $result) = @_; my $guard = $self->_exclusive; return undef unless my $job = $self->_job($id, 'active'); + return undef unless $job->{retries} == $retries; @$job{qw(finished result)} = (time, $result); $job->{state} = $fail ? 'failed' : 'finished'; return 1; @@ -258,9 +264,44 @@ =head2 dequeue my $job_info = $backend->dequeue($worker_id, 0.5); + my $job_info = $backend->dequeue($worker_id, 0.5, {queues => ['important']}); Wait for job, dequeue it and transition from C<inactive> to C<active> state or -return C<undef> if queue was empty. +return C<undef> if queues were empty. + +These options are currently available: + +=over 2 + +=item queues + + queues => ['important'] + +One or more queues to dequeue jobs from, defaults to C<default>. + +=back + +These fields are currently available: + +=over 2 + +=item args + +Job arguments. + +=item id + +Job ID. + +=item retries + +Number of times job has been retried. + +=item task + +Task name. + +=back =head2 enqueue @@ -286,21 +327,28 @@ Job priority, defaults to C<0>. +=item queue + + queue => 'important' + +Queue to put job in, defaults to C<default>. + =back =head2 fail_job - my $bool = $backend->fail_job($job_id); - my $bool = $backend->fail_job($job_id, 'Something went wrong!'); - my $bool = $backend->fail_job($job_id, {msg => 'Something went wrong!'}); + my $bool = $backend->fail_job($job_id, $retries); + my $bool = $backend->fail_job($job_id, $retries, 'Something went wrong!'); + my $bool = $backend->fail_job( + $job_id, $retries, {msg => 'Something went wrong!'}); Transition from C<active> to C<failed> state. =head2 finish_job - my $bool = $backend->finish_job($job_id); - my $bool = $backend->finish_job($job_id, 'All went well!'); - my $bool = $backend->finish_job($job_id, {msg => 'All went well!'}); + my $bool = $backend->finish_job($job_id, $retries); + my $bool = $backend->finish_job($job_id, $retries, 'All went well!'); + my $bool = $backend->finish_job($job_id, $retries, {msg => 'All went well!'}); Transition from C<active> to C<finished> state. @@ -340,6 +388,10 @@ Job priority. +=item queue + +Queue name. + =item result Job result. @@ -358,7 +410,7 @@ =item state -Current job state. +Current job state, usually C<active>, C<failed>, C<finished> or C<inactive>. =item task @@ -434,8 +486,8 @@ =head2 retry_job - my $bool = $backend->retry_job($job_id); - my $bool = $backend->retry_job($job_id, {delay => 10}); + my $bool = $backend->retry_job($job_id, $retries); + my $bool = $backend->retry_job($job_id, $retries, {delay => 10}); Transition from C<failed> or C<finished> state back to C<inactive>. @@ -449,6 +501,18 @@ Delay job for this many seconds (from now). +=item priority + + priority => 5 + +Job priority. + +=item queue + + queue => 'important' + +Queue to put job in. + =back =head2 stats diff -urN '--exclude=CVS' '--exclude=.cvsignore' '--exclude=.svn' '--exclude=.svnignore' old/Minion-1.18/lib/Minion/Backend/Pg.pm new/Minion-2.0/lib/Minion/Backend/Pg.pm --- old/Minion-1.18/lib/Minion/Backend/Pg.pm 2015-08-29 01:40:50.000000000 +0200 +++ new/Minion-2.0/lib/Minion/Backend/Pg.pm 2015-10-01 20:34:59.000000000 +0200 @@ -8,18 +8,19 @@ has 'pg'; sub dequeue { - my ($self, $id, $timeout) = @_; + my ($self, $id, $wait, $options) = @_; - if ((my $job = $self->_try($id)) || Mojo::IOLoop->is_running) { return $job } + if ((my $job = $self->_try($id, $options))) { return $job } + return undef if Mojo::IOLoop->is_running; my $db = $self->pg->db; $db->listen('minion.job')->on(notification => sub { Mojo::IOLoop->stop }); - my $timer = Mojo::IOLoop->timer($timeout => sub { Mojo::IOLoop->stop }); + my $timer = Mojo::IOLoop->timer($wait => sub { Mojo::IOLoop->stop }); Mojo::IOLoop->start; $db->unlisten('*') and Mojo::IOLoop->remove($timer); undef $db; - return $self->_try($id); + return $self->_try($id, $options); } sub enqueue { @@ -29,10 +30,10 @@ my $db = $self->pg->db; return $db->query( - "insert into minion_jobs (args, delayed, priority, task) - values (?, (now() + (interval '1 second' * ?)), ?, ?) + "insert into minion_jobs (args, delayed, priority, queue, task) + values (?, (now() + (interval '1 second' * ?)), ?, ?, ?) returning id", {json => $args}, $options->{delay} // 0, - $options->{priority} // 0, $task + $options->{priority} // 0, $options->{queue} // 'default', $task )->hash->{id}; } @@ -43,7 +44,7 @@ shift->pg->db->query( 'select id, args, extract(epoch from created) as created, extract(epoch from delayed) as delayed, - extract(epoch from finished) as finished, priority, result, + extract(epoch from finished) as finished, priority, queue, result, extract(epoch from retried) as retried, retries, extract(epoch from started) as started, state, task, worker from minion_jobs where id = ?', shift @@ -128,16 +129,18 @@ sub reset { shift->pg->db->query('truncate minion_jobs, minion_workers') } sub retry_job { - my ($self, $id) = (shift, shift); + my ($self, $id, $retries) = (shift, shift, shift); my $options = shift // {}; return !!$self->pg->db->query( "update minion_jobs - set finished = null, result = null, retried = now(), - retries = retries + 1, started = null, state = 'inactive', - worker = null, delayed = (now() + (interval '1 second' * ?)) - where id = ? and state in ('failed', 'finished') - returning 1", $options->{delay} // 0, $id + set finished = null, priority = coalesce(?, priority), + queue = coalesce(?, queue), result = null, retried = now(), + retries = retries + 1, started = null, state = 'inactive', worker = null, + delayed = (now() + (interval '1 second' * ?)) + where id = ? and retries = ? and state in ('failed', 'finished') + returning 1", @$options{qw(priority queue)}, $options->{delay} // 0, $id, + $retries )->rows; } @@ -180,30 +183,33 @@ } sub _try { - my ($self, $id) = @_; + my ($self, $id, $options) = @_; return $self->pg->db->query( "update minion_jobs set started = now(), state = 'active', worker = ? where id = ( select id from minion_jobs - where state = 'inactive' and delayed < now() and task = any (?) + where delayed < now() and queue = any (?) and state = 'inactive' + and task = any (?) order by priority desc, created limit 1 for update ) - returning id, args, task", $id, [keys %{$self->minion->tasks}] + returning id, args, retries, task", $id, + $options->{queues} || ['default'], [keys %{$self->minion->tasks}] )->expand->hash; } sub _update { - my ($self, $fail, $id, $result) = @_; + my ($self, $fail, $id, $retries, $result) = @_; return !!$self->pg->db->query( "update minion_jobs set finished = now(), result = ?, state = ? - where id = ? and state = 'active' - returning 1", {json => $result}, $fail ? 'failed' : 'finished', $id + where id = ? and retries = ? and state = 'active' + returning 1", {json => $result}, $fail ? 'failed' : 'finished', $id, + $retries )->rows; } @@ -248,9 +254,44 @@ =head2 dequeue my $job_info = $backend->dequeue($worker_id, 0.5); + my $job_info = $backend->dequeue($worker_id, 0.5, {queues => ['important']}); Wait for job, dequeue it and transition from C<inactive> to C<active> state or -return C<undef> if queue was empty. +return C<undef> if queues were empty. + +These options are currently available: + +=over 2 + +=item queues + + queues => ['important'] + +One or more queues to dequeue jobs from, defaults to C<default>. + +=back + +These fields are currently available: + +=over 2 + +=item args + +Job arguments. + +=item id + +Job ID. + +=item retries + +Number of times job has been retried. + +=item task + +Task name. + +=back =head2 enqueue @@ -276,21 +317,28 @@ Job priority, defaults to C<0>. +=item queue + + queue => 'important' + +Queue to put job in, defaults to C<default>. + =back =head2 fail_job - my $bool = $backend->fail_job($job_id); - my $bool = $backend->fail_job($job_id, 'Something went wrong!'); - my $bool = $backend->fail_job($job_id, {msg => 'Something went wrong!'}); + my $bool = $backend->fail_job($job_id, $retries); + my $bool = $backend->fail_job($job_id, $retries, 'Something went wrong!'); + my $bool = $backend->fail_job( + $job_id, $retries, {msg => 'Something went wrong!'}); Transition from C<active> to C<failed> state. =head2 finish_job - my $bool = $backend->finish_job($job_id); - my $bool = $backend->finish_job($job_id, 'All went well!'); - my $bool = $backend->finish_job($job_id, {msg => 'All went well!'}); + my $bool = $backend->finish_job($job_id, $retries); + my $bool = $backend->finish_job($job_id, $retries, 'All went well!'); + my $bool = $backend->finish_job($job_id, $retries, {msg => 'All went well!'}); Transition from C<active> to C<finished> state. @@ -330,6 +378,10 @@ Job priority. +=item queue + +Queue name. + =item result Job result. @@ -348,7 +400,7 @@ =item state -Current job state. +Current job state, usually C<active>, C<failed>, C<finished> or C<inactive>. =item task @@ -424,8 +476,8 @@ =head2 retry_job - my $bool = $backend->retry_job($job_id); - my $bool = $backend->retry_job($job_id, {delay => 10}); + my $bool = $backend->retry_job($job_id, $retries); + my $bool = $backend->retry_job($job_id, $retries, {delay => 10}); Transition from C<failed> or C<finished> state back to C<inactive>. @@ -439,6 +491,18 @@ Delay job for this many seconds (from now). +=item priority + + priority => 5 + +Job priority. + +=item queue + + queue => 'important' + +Queue to put job in. + =back =head2 stats @@ -547,3 +611,6 @@ -- 3 up create index on minion_jobs (state); + +-- 4 up +alter table minion_jobs add column queue text not null default 'default'; diff -urN '--exclude=CVS' '--exclude=.cvsignore' '--exclude=.svn' '--exclude=.svnignore' old/Minion-1.18/lib/Minion/Backend.pm new/Minion-2.0/lib/Minion/Backend.pm --- old/Minion-1.18/lib/Minion/Backend.pm 2015-08-29 01:40:50.000000000 +0200 +++ new/Minion-2.0/lib/Minion/Backend.pm 2015-10-01 17:11:03.000000000 +0200 @@ -81,9 +81,44 @@ =head2 dequeue my $job_info = $backend->dequeue($worker_id, 0.5); + my $job_info = $backend->dequeue($worker_id, 0.5, {queues => ['important']}); Wait for job, dequeue it and transition from C<inactive> to C<active> state or -return C<undef> if queue was empty. Meant to be overloaded in a subclass. +return C<undef> if queues were empty. Meant to be overloaded in a subclass. + +These options are currently available: + +=over 2 + +=item queues + + queues => ['important'] + +One or more queues to dequeue jobs from, defaults to C<default>. + +=back + +These fields are currently available: + +=over 2 + +=item args + +Job arguments. + +=item id + +Job ID. + +=item retries + +Number of times job has been retried. + +=item task + +Task name. + +=back =head2 enqueue @@ -109,22 +144,29 @@ Job priority, defaults to C<0>. +=item queue + + queue => 'important' + +Queue to put job in, defaults to C<default>. + =back =head2 fail_job - my $bool = $backend->fail_job($job_id); - my $bool = $backend->fail_job($job_id, 'Something went wrong!'); - my $bool = $backend->fail_job($job_id, {msg => 'Something went wrong!'}); + my $bool = $backend->fail_job($job_id, $retries); + my $bool = $backend->fail_job($job_id, $retries, 'Something went wrong!'); + my $bool = $backend->fail_job( + $job_id, $retries, {msg => 'Something went wrong!'}); Transition from C<active> to C<failed> state. Meant to be overloaded in a subclass. =head2 finish_job - my $bool = $backend->finish_job($job_id); - my $bool = $backend->finish_job($job_id, 'All went well!'); - my $bool = $backend->finish_job($job_id, {msg => 'All went well!'}); + my $bool = $backend->finish_job($job_id, $retries); + my $bool = $backend->finish_job($job_id, $retries, 'All went well!'); + my $bool = $backend->finish_job($job_id, $retries, {msg => 'All went well!'}); Transition from C<active> to C<finished> state. Meant to be overloaded in a subclass. @@ -166,6 +208,10 @@ Job priority. +=item queue + +Queue name. + =item result Job result. @@ -184,7 +230,7 @@ =item state -Current job state. +Current job state, usually C<active>, C<failed>, C<finished> or C<inactive>. =item task @@ -259,8 +305,8 @@ =head2 retry_job - my $bool = $backend->retry_job($job_id); - my $bool = $backend->retry_job($job_id, {delay => 10}); + my $bool = $backend->retry_job($job_id, $retries); + my $bool = $backend->retry_job($job_id, $retries, {delay => 10}); Transition from C<failed> or C<finished> state back to C<inactive>. Meant to be overloaded in a subclass. @@ -275,6 +321,18 @@ Delay job for this many seconds (from now). +=item priority + + priority => 5 + +Job priority. + +=item queue + + queue => 'important' + +Queue to put job in. + =back =head2 stats diff -urN '--exclude=CVS' '--exclude=.cvsignore' '--exclude=.svn' '--exclude=.svnignore' old/Minion-1.18/lib/Minion/Command/minion/job.pm new/Minion-2.0/lib/Minion/Command/minion/job.pm --- old/Minion-1.18/lib/Minion/Command/minion/job.pm 2015-08-29 01:40:51.000000000 +0200 +++ new/Minion-2.0/lib/Minion/Command/minion/job.pm 2015-10-01 20:12:00.000000000 +0200 @@ -20,6 +20,7 @@ 'l|limit=i' => \(my $limit = 100), 'o|offset=i' => \(my $offset = 0), 'p|priority=i' => \$options->{priority}, + 'q|queue=s' => \$options->{queue}, 'R|retry' => \my $retry, 'r|remove' => \my $remove, 'S|state=s' => \$options->{state}, @@ -54,8 +55,9 @@ # Details my $info = $job->info; - my ($state, $priority, $retries) = @$info{qw(state priority retries)}; - say $info->{task}, " ($state, p$priority, r$retries)"; + my ($queue, $state, $priority, $retries) + = @$info{qw(queue state priority retries)}; + say $info->{task}, " ($queue, $state, p$priority, r$retries)"; print dumper $info->{args}; if (my $result = $info->{result}) { print dumper $result } @@ -109,7 +111,7 @@ ./myapp.pl minion job ./myapp.pl minion job -t foo -S inactive ./myapp.pl minion job -e foo -a '[23, "bar"]' - ./myapp.pl minion job -e foo -p 5 + ./myapp.pl minion job -e foo -p 5 -q important ./myapp.pl minion job -s ./myapp.pl minion job -w -l 5 ./myapp.pl minion job 10023 @@ -125,6 +127,7 @@ -o, --offset <number> Number of jobs/workers to skip when listing them, defaults to 0 -p, --priority <number> Priority of new job, defaults to 0 + -q, --queue <name> Queue to put job in, defaults to "default" -R, --retry Retry job -r, --remove Remove job -S, --state <state> List only jobs in this state diff -urN '--exclude=CVS' '--exclude=.cvsignore' '--exclude=.svn' '--exclude=.svnignore' old/Minion-1.18/lib/Minion/Command/minion/worker.pm new/Minion-2.0/lib/Minion/Command/minion/worker.pm --- old/Minion-1.18/lib/Minion/Command/minion/worker.pm 2015-08-29 01:40:51.000000000 +0200 +++ new/Minion-2.0/lib/Minion/Command/minion/worker.pm 2015-10-01 20:10:29.000000000 +0200 @@ -2,6 +2,7 @@ use Mojo::Base 'Mojolicious::Command'; use Getopt::Long qw(GetOptionsFromArray :config no_auto_abbrev no_ignore_case); +use Mojo::Util 'steady_time'; has description => 'Start Minion worker'; has usage => sub { shift->extract_usage }; @@ -12,19 +13,14 @@ GetOptionsFromArray \@args, 'I|heartbeat-interval=i' => \($self->{interval} = 60), 'j|jobs=i' => \($self->{max} = 4), - 't|task=s' => \my @tasks; - - # Limit tasks - my $app = $self->app; - my $minion = $app->minion; - my $tasks = $minion->tasks; - %$tasks = map { $tasks->{$_} ? ($_ => $tasks->{$_}) : () } @tasks if @tasks; + 'q|queue=s' => ($self->{queues} = ['default']); local $SIG{CHLD} = 'DEFAULT'; local $SIG{INT} = local $SIG{TERM} = sub { $self->{finished}++ }; # Log fatal errors - my $worker = $self->{worker} = $minion->worker; + my $app = $self->app; + my $worker = $self->{worker} = $app->minion->worker; @$self{qw(register repair)} = (0, 0); eval { $self->_work until $self->{finished} && !keys %{$self->{jobs}}; 1 } or $app->log->fatal("Worker error: $@"); @@ -35,16 +31,17 @@ my $self = shift; # Send heartbeats in regular intervals - $self->{worker}->register and $self->{register} = time + $self->{interval} - if $self->{register} < time; + my $worker = $self->{worker}; + $worker->register and $self->{register} = steady_time + $self->{interval} + if $self->{register} < steady_time; # Repair in regular intervals - if ($self->{repair} < time) { + if ($self->{repair} < steady_time) { my $app = $self->app; $app->log->debug('Checking worker registry and job queue'); my $minion = $app->minion; $minion->repair; - $self->{repair} = time + $minion->missing_after; + $self->{repair} = steady_time + $minion->missing_after; } # Check if jobs are finished @@ -55,7 +52,9 @@ if (($self->{max} <= keys %$jobs) || $self->{finished}) { sleep 1 } # Try to get more jobs - elsif (my $job = $self->{worker}->dequeue(5)) { $jobs->{$job->start} = $job } + elsif (my $job = $worker->dequeue(5 => {queues => $self->{queues}})) { + $jobs->{$job->start} = $job; + } } 1; @@ -72,14 +71,14 @@ ./myapp.pl minion worker ./myapp.pl minion worker -m production -I 15 -j 10 - ./myapp.pl minion worker -t foo -t bar + ./myapp.pl minion worker -q important -q default Options: -I, --heartbeat-interval <seconds> Heartbeat interval, defaults to 60 -j, --jobs <number> Number of jobs to perform concurrently, defaults to 4 - -t, --task <name> One or more tasks to handle, defaults - to handling all tasks + -q, --queue <name> One or more queues to get jobs from, + defaults to "default" =head1 DESCRIPTION diff -urN '--exclude=CVS' '--exclude=.cvsignore' '--exclude=.svn' '--exclude=.svnignore' old/Minion-1.18/lib/Minion/Job.pm new/Minion-2.0/lib/Minion/Job.pm --- old/Minion-1.18/lib/Minion/Job.pm 2015-08-29 01:40:51.000000000 +0200 +++ new/Minion-2.0/lib/Minion/Job.pm 2015-10-01 17:10:37.000000000 +0200 @@ -4,21 +4,21 @@ use Mojo::IOLoop; use POSIX 'WNOHANG'; -has args => sub { [] }; -has [qw(id minion task)]; +has [qw(args id minion retries task)]; sub app { shift->minion->app } sub fail { my $self = shift; my $err = shift // 'Unknown error'; - my $ok = $self->minion->backend->fail_job($self->id, $err); + my $ok = $self->minion->backend->fail_job($self->id, $self->retries, $err); return $ok ? !!$self->emit(failed => $err) : undef; } sub finish { my ($self, $result) = @_; - my $ok = $self->minion->backend->finish_job($self->id, $result); + my $ok + = $self->minion->backend->finish_job($self->id, $self->retries, $result); return $ok ? !!$self->emit(finished => $result) : undef; } @@ -39,7 +39,10 @@ sub remove { $_[0]->minion->backend->remove_job($_[0]->id) } -sub retry { $_[0]->minion->backend->retry_job($_[0]->id, $_[1]) } +sub retry { + my $self = shift; + return $self->minion->backend->retry_job($self->id, $self->retries, @_); +} sub start { my $self = shift; @@ -155,6 +158,13 @@ L<Minion> object this job belongs to. +=head2 retries + + my $retries = $job->retries; + $job = $job->retries(5); + +Number of times job has been retried. + =head2 task my $task = $job->task; @@ -228,6 +238,10 @@ Job priority. +=item queue + +Queue name. + =item result Job result. @@ -246,7 +260,7 @@ =item state -Current job state. +Current job state, usually C<active>, C<failed>, C<finished> or C<inactive>. =item task @@ -293,6 +307,18 @@ Delay job for this many seconds (from now). +=item priority + + priority => 5 + +Job priority. + +=item queue + + queue => 'important' + +Queue to put job in. + =back =head2 start diff -urN '--exclude=CVS' '--exclude=.cvsignore' '--exclude=.svn' '--exclude=.svnignore' old/Minion-1.18/lib/Minion/Worker.pm new/Minion-2.0/lib/Minion/Worker.pm --- old/Minion-1.18/lib/Minion/Worker.pm 2015-08-29 01:40:52.000000000 +0200 +++ new/Minion-2.0/lib/Minion/Worker.pm 2015-10-01 20:11:00.000000000 +0200 @@ -4,18 +4,19 @@ has [qw(id minion)]; sub dequeue { - my ($self, $timeout) = @_; + my ($self, $wait, $options) = @_; # Worker not registered return undef unless my $id = $self->id; my $minion = $self->minion; - return undef unless my $job = $minion->backend->dequeue($id, $timeout); + return undef unless my $job = $minion->backend->dequeue($id, $wait, $options); $job = Minion::Job->new( - args => $job->{args}, - id => $job->{id}, - minion => $minion, - task => $job->{task} + args => $job->{args}, + id => $job->{id}, + minion => $minion, + retries => $job->{retries}, + task => $job->{task} ); $self->emit(dequeue => $job); return $job; @@ -95,9 +96,22 @@ =head2 dequeue my $job = $worker->dequeue(0.5); + my $job = $worker->dequeue(0.5 => {queues => ['important']}); Wait for job, dequeue L<Minion::Job> object and transition from C<inactive> to -C<active> state or return C<undef> if queue was empty. +C<active> state or return C<undef> if queues were empty. + +These options are currently available: + +=over 2 + +=item queues + + queues => ['important'] + +One or more queues to dequeue jobs from, defaults to C<default>. + +=back =head2 info diff -urN '--exclude=CVS' '--exclude=.cvsignore' '--exclude=.svn' '--exclude=.svnignore' old/Minion-1.18/lib/Minion.pm new/Minion-2.0/lib/Minion.pm --- old/Minion-1.18/lib/Minion.pm 2015-08-29 01:40:52.000000000 +0200 +++ new/Minion-2.0/lib/Minion.pm 2015-10-01 17:09:40.000000000 +0200 @@ -15,7 +15,7 @@ has remove_after => 864000; has tasks => sub { {} }; -our $VERSION = '1.18'; +our $VERSION = '2.0'; sub add_task { ($_[0]->tasks->{$_[1]} = $_[2]) and return $_[0] } @@ -26,10 +26,11 @@ return undef unless my $job = $self->backend->job_info($id); return Minion::Job->new( - args => $job->{args}, - id => $job->{id}, - minion => $self, - task => $job->{task} + args => $job->{args}, + id => $job->{id}, + minion => $self, + retries => $job->{retries}, + task => $job->{task} ); } @@ -265,8 +266,9 @@ my $id = $minion->enqueue(foo => [@args]); my $id = $minion->enqueue(foo => [@args] => {priority => 1}); -Enqueue a new job with C<inactive> state. Arguments get serialized, so you -shouldn't send objects, nested data structures with hash and array references +Enqueue a new job with C<inactive> state. Arguments get serialized by the +L</"backend"> (often with L<Mojo::JSON>), so you shouldn't send objects and be +careful with binary data, nested data structures with hash and array references are fine though. These options are currently available: @@ -285,6 +287,12 @@ Job priority, defaults to C<0>. +=item queue + + queue => 'important' + +Queue to put job in, defaults to C<default>. + =back =head2 job diff -urN '--exclude=CVS' '--exclude=.cvsignore' '--exclude=.svn' '--exclude=.svnignore' old/Minion-1.18/t/file.t new/Minion-2.0/t/file.t --- old/Minion-1.18/t/file.t 2015-08-29 01:39:14.000000000 +0200 +++ new/Minion-2.0/t/file.t 2015-10-01 14:15:55.000000000 +0200 @@ -221,7 +221,8 @@ like $job->info->{started}, qr/^[\d.]+$/, 'has started timestamp'; is_deeply $job->args, [2, 2], 'right arguments'; is $job->info->{state}, 'active', 'right state'; -is $job->task, 'add', 'right task'; +is $job->task, 'add', 'right task'; +is $job->retries, 0, 'job has not been retried'; $id = $job->info->{worker}; is $minion->backend->worker_info($id)->{pid}, $$, 'right worker'; ok !$job->info->{finished}, 'no finished timestamp'; @@ -233,6 +234,7 @@ $worker->unregister; $job = $minion->job($job->id); is_deeply $job->args, [2, 2], 'right arguments'; +is $job->retries, 0, 'job has not been retried'; is $job->info->{state}, 'finished', 'right state'; is $job->task, 'add', 'right task'; @@ -250,6 +252,7 @@ is $job->info->{state}, 'inactive', 'right state'; is $job->info->{retries}, 1, 'job has been retried once'; $job = $worker->dequeue(0); +is $job->retries, 1, 'job has been retried once'; ok !$job->retry, 'job not retried'; is $job->id, $id, 'right id'; ok !$job->remove, 'job has not been removed'; @@ -284,6 +287,23 @@ is $job->info->{priority}, 1, 'right priority'; ok $job->finish, 'job finished'; isnt $worker->dequeue(0)->id, $id, 'different id'; +$id = $minion->enqueue(add => [2, 5]); +$job = $worker->register->dequeue(0); +is $job->id, $id, 'right id'; +is $job->info->{priority}, 0, 'right priority'; +ok $job->finish, 'job finished'; +ok $job->retry({priority => 100}), 'job retried with higher priority'; +$job = $worker->dequeue(0); +is $job->id, $id, 'right id'; +is $job->info->{retries}, 1, 'job has been retried once'; +is $job->info->{priority}, 100, 'high priority'; +ok $job->finish, 'job finished'; +ok $job->retry({priority => 0}), 'job retried with lower priority'; +$job = $worker->dequeue(0); +is $job->id, $id, 'right id'; +is $job->info->{retries}, 2, 'job has been retried twice'; +is $job->info->{priority}, 0, 'low priority'; +ok $job->finish, 'job finished'; $worker->unregister; # Delayed jobs @@ -350,6 +370,25 @@ is $finished, 1, 'finished event has been emitted once'; $worker->unregister; +# Queues +$id = $minion->enqueue(add => [100, 1]); +$job = $worker->register->dequeue(0); +is $job->id, $id, 'right id'; +is $job->info->{queue}, 'default', 'right queue'; +ok $job->finish, 'job finished'; +$id = $minion->enqueue(add => [100, 3] => {queue => 'test1'}); +is $worker->dequeue(0), undef, 'wrong queue'; +$job = $worker->dequeue(0 => {queues => ['test1']}); +is $job->id, $id, 'right id'; +is $job->info->{queue}, 'test1', 'right queue'; +ok $job->finish, 'job finished'; +ok $job->retry({queue => 'test2'}), 'job retried'; +$job = $worker->dequeue(0 => {queues => ['default', 'test2']}); +is $job->id, $id, 'right id'; +is $job->info->{queue}, 'test2', 'right queue'; +ok $job->finish, 'job finished'; +$worker->unregister; + # Failed jobs $id = $minion->enqueue(add => [5, 6]); $job = $worker->register->dequeue(0); @@ -403,6 +442,25 @@ is $job->info->{result}, 'Non-zero exit status (1)', 'right result'; $worker->unregister; +# A job needs to be dequeued again after a retry +$minion->add_task(restart => sub { }); +$id = $minion->enqueue('restart'); +$job = $worker->register->dequeue(0); +is $job->id, $id, 'right id'; +ok $job->finish, 'job finished'; +is $job->info->{state}, 'finished', 'right state'; +ok $job->retry, 'job retried'; +is $job->info->{state}, 'inactive', 'right state'; +$job2 = $worker->dequeue(0); +is $job->info->{state}, 'active', 'right state'; +ok !$job->finish, 'job not finished'; +is $job->info->{state}, 'active', 'right state'; +is $job2->id, $id, 'right id'; +ok $job2->finish, 'job finished'; +ok !$job->retry, 'job not retried'; +is $job->info->{state}, 'finished', 'right state'; +$worker->unregister; + # Perform jobs concurrently $id = $minion->enqueue(add => [10, 11]); $id2 = $minion->enqueue(add => [12, 13]); diff -urN '--exclude=CVS' '--exclude=.cvsignore' '--exclude=.svn' '--exclude=.svnignore' old/Minion-1.18/t/pg.t new/Minion-2.0/t/pg.t --- old/Minion-1.18/t/pg.t 2015-08-29 01:39:15.000000000 +0200 +++ new/Minion-2.0/t/pg.t 2015-10-01 14:15:40.000000000 +0200 @@ -243,7 +243,8 @@ like $job->info->{started}, qr/^[\d.]+$/, 'has started timestamp'; is_deeply $job->args, [2, 2], 'right arguments'; is $job->info->{state}, 'active', 'right state'; -is $job->task, 'add', 'right task'; +is $job->task, 'add', 'right task'; +is $job->retries, 0, 'job has not been retried'; $id = $job->info->{worker}; is $minion->backend->worker_info($id)->{pid}, $$, 'right worker'; ok !$job->info->{finished}, 'no finished timestamp'; @@ -255,6 +256,7 @@ $worker->unregister; $job = $minion->job($job->id); is_deeply $job->args, [2, 2], 'right arguments'; +is $job->retries, 0, 'job has not been retried'; is $job->info->{state}, 'finished', 'right state'; is $job->task, 'add', 'right task'; @@ -272,6 +274,7 @@ is $job->info->{state}, 'inactive', 'right state'; is $job->info->{retries}, 1, 'job has been retried once'; $job = $worker->dequeue(0); +is $job->retries, 1, 'job has been retried once'; ok !$job->retry, 'job not retried'; is $job->id, $id, 'right id'; ok !$job->remove, 'job has not been removed'; @@ -306,6 +309,23 @@ is $job->info->{priority}, 1, 'right priority'; ok $job->finish, 'job finished'; isnt $worker->dequeue(0)->id, $id, 'different id'; +$id = $minion->enqueue(add => [2, 5]); +$job = $worker->register->dequeue(0); +is $job->id, $id, 'right id'; +is $job->info->{priority}, 0, 'right priority'; +ok $job->finish, 'job finished'; +ok $job->retry({priority => 100}), 'job retried with higher priority'; +$job = $worker->dequeue(0); +is $job->id, $id, 'right id'; +is $job->info->{retries}, 1, 'job has been retried once'; +is $job->info->{priority}, 100, 'high priority'; +ok $job->finish, 'job finished'; +ok $job->retry({priority => 0}), 'job retried with lower priority'; +$job = $worker->dequeue(0); +is $job->id, $id, 'right id'; +is $job->info->{retries}, 2, 'job has been retried twice'; +is $job->info->{priority}, 0, 'low priority'; +ok $job->finish, 'job finished'; $worker->unregister; # Delayed jobs @@ -333,6 +353,25 @@ ok $minion->job($id)->remove, 'job has been removed'; $worker->unregister; +# Queues +$id = $minion->enqueue(add => [100, 1]); +$job = $worker->register->dequeue(0); +is $job->id, $id, 'right id'; +is $job->info->{queue}, 'default', 'right queue'; +ok $job->finish, 'job finished'; +$id = $minion->enqueue(add => [100, 3] => {queue => 'test1'}); +is $worker->dequeue(0), undef, 'wrong queue'; +$job = $worker->dequeue(0 => {queues => ['test1']}); +is $job->id, $id, 'right id'; +is $job->info->{queue}, 'test1', 'right queue'; +ok $job->finish, 'job finished'; +ok $job->retry({queue => 'test2'}), 'job retried'; +$job = $worker->dequeue(0 => {queues => ['default', 'test2']}); +is $job->id, $id, 'right id'; +is $job->info->{queue}, 'test2', 'right queue'; +ok $job->finish, 'job finished'; +$worker->unregister; + # Failed jobs $id = $minion->enqueue(add => [5, 6]); $job = $worker->register->dequeue(0); @@ -386,6 +425,25 @@ is $job->info->{result}, 'Non-zero exit status (1)', 'right result'; $worker->unregister; +# A job needs to be dequeued again after a retry +$minion->add_task(restart => sub { }); +$id = $minion->enqueue('restart'); +$job = $worker->register->dequeue(0); +is $job->id, $id, 'right id'; +ok $job->finish, 'job finished'; +is $job->info->{state}, 'finished', 'right state'; +ok $job->retry, 'job retried'; +is $job->info->{state}, 'inactive', 'right state'; +$job2 = $worker->dequeue(0); +is $job->info->{state}, 'active', 'right state'; +ok !$job->finish, 'job not finished'; +is $job->info->{state}, 'active', 'right state'; +is $job2->id, $id, 'right id'; +ok $job2->finish, 'job finished'; +ok !$job->retry, 'job not retried'; +is $job->info->{state}, 'finished', 'right state'; +$worker->unregister; + # Perform jobs concurrently $id = $minion->enqueue(add => [10, 11]); $id2 = $minion->enqueue(add => [12, 13]);
