I'm pretty new to POE and I found it very useful to solve a simple problem. Given an array of tasks I want to distribute them to an array of hosts, but each host should deal with a single task. The list of tasks is larger than the available hosts and as soon as the host finishes its job a new task is assigned to it.
I tried to implement it using a JobQueue, but in my program instead of distributing the tasks and running them in parallel, the task for each host blocks the entire execution. The question is how can I effectively distribute the tasks between hosts? How should I proceed if two hosts finish exactly at the same time? Could this program be implemented without JobQueue?
Thanks in advance,
Georgios Pappas
Here is the code:
use POE qw(Component::JobQueue);
my @HOSTS = qw/host1 host2 host3 host4/;
my $job_control = { count=>0 };
use vars qw (@HOSTS $job_control);# They hold the time spent in the task my @tasks = qw( 15 4 3 2 10 5 4 8 2 1 );
$job_control->{finished_host} = undef;
#
# Spawning the tasks
#
POE::Component::JobQueue->spawn
( Alias => 'active',
WorkerLimit => scalar(@HOSTS),
Worker => \&worker_next, Active =>
{ AckAlias => 'respondee',
AckState => 'response',
},
);
sub worker_next { my $task; my $host; foreach my $host (@HOSTS) { if ($job_control->{$_}->{busy}!=1) { $task = shift @tasks; if (defined $task) { &spawn_worker( $host, $task ); } } } }
sub worker_start {
my ($kernel, $heap, $host, $task ) =
@_[KERNEL, HEAP, ARG0..ARG1];
$job_control->{$host}->{busy} = 1; $heap->{task} = $task;
$heap->{host} = $host;
# Runnig the task -> it may take a while !!!
warn "Task $task sent to host [$host].\n";
$kernel->yield("LongTask");}
sub worker_stop {
my $heap = $_[HEAP]; print "finished on [$heap->{host}]\n";
$job_control->{finished_host} = $heap->{host};
$job_control->{count}++;
$job_control->{$heap->{host}}->{busy} = 0;
}sub spawn_worker {
my ($outer_host, $outer_task) = @_; POE::Session->create
( inline_states =>
{ _start => \&worker_start,
_stop => \&worker_stop,
LongTask => \&LongTask, }, args => [ $outer_host, $outer_task ] ); }
sub LongTask {
my ($heap) = @_[HEAP]; #
# WARNING: My task is actually a SOAP call to the server but
# I'm simulating this with sleep (and not $kernel->delay)
sleep($heap->{task});
}