Hello,

I'm pretty new to POE, and I have found it very useful for solving a simple problem. Given an array of tasks, I want to distribute them across an array of hosts, with each host handling only one task at a time. The list of tasks is longer than the list of available hosts, so as soon as a host finishes its job, a new task should be assigned to it.
I tried to implement this using POE::Component::JobQueue, but instead of distributing the tasks and running them in parallel, my program runs them one after another: each host's task blocks the entire execution. My questions are: How can I effectively distribute the tasks among the hosts? How should I proceed if two hosts finish at exactly the same time? Could this program be implemented without JobQueue?



Thanks in advance,


Georgios Pappas

Here is the code:


use strict;
use warnings;

use POE qw(Component::JobQueue);

# Hosts available for work; the worker code runs at most one task per host
# at a time (tracked through the per-host "busy" flag in $job_control).
my @HOSTS = qw/host1 host2 host3 host4/;

# Shared bookkeeping for all worker sessions:
#   count          - number of worker sessions that have stopped so far
#   finished_host  - host of the most recently stopped worker (undef until
#                    the first one stops)
#   <host name>    - per-host hash holding that host's "busy" flag
my $job_control = {
    count         => 0,
    finished_host => undef,
};

# Pending tasks.  Each value is the number of seconds the simulated task
# takes (it is passed to sleep() by the LongTask handler).
my @tasks = qw( 15 4 3 2 10 5 4 8 2 1 );

#
# Create the job queue that drives the worker sessions
#

# Settings handed to the queue under its "Active" key.
# NOTE(review): no session with the alias 'respondee' is created in the code
# shown here -- confirm that one exists elsewhere.
my %active_options = (
    AckAlias => 'respondee',
    AckState => 'response',
);

POE::Component::JobQueue->spawn(
    Alias       => 'active',
    WorkerLimit => scalar @HOSTS,       # one worker slot per host
    Worker      => \&worker_next,       # called by the queue to start work
    Active      => \%active_options,
);


# Worker callback registered with the job queue.
#
# Walks the host list and, for every host that is not currently marked busy,
# takes the next pending task off @tasks and spawns a worker session for it.
# Any arguments passed in by the queue are ignored.
#
# The busy flag is looked up by the loop variable $host.  (Indexing by $_,
# which is never set inside a "foreach my $host" loop, would make every host
# appear idle on every call.)
sub worker_next {
    foreach my $host (@HOSTS) {

        # This host already has a worker session running.
        next if $job_control->{$host}{busy};

        my $task = shift @tasks;

        # No tasks left: there is nothing to hand to the remaining hosts.
        last unless defined $task;

        # worker_start runs as part of session creation and marks the host
        # busy, so the flag is already set when we get back here.
        spawn_worker( $host, $task );
    }

    return;
}

# _start handler of a worker session.
#
# ARG0 is the host the session works for, ARG1 the task assigned to it.
# Marks the host as busy, stores both values in the session heap, logs the
# assignment and then schedules the LongTask event that does the real work.
sub worker_start {
    my $kernel = $_[KERNEL];
    my $heap   = $_[HEAP];
    my ( $host, $task ) = @_[ ARG0, ARG1 ];

    # From now on worker_next must not hand this host another task.
    $job_control->{$host}{busy} = 1;

    # Remember the assignment for the other handlers of this session.
    @{$heap}{qw(task host)} = ( $task, $host );

    # Running the task -> it may take a while !!!
    warn "Task $task sent to host [$host].\n";
    $kernel->yield('LongTask');
}

# _stop handler of a worker session.
#
# Reports which host finished, records it as the most recently finished
# host, bumps the global count of completed workers and clears the host's
# busy flag so that it can receive another task.
sub worker_stop {
    my $host = $_[HEAP]{host};

    print "finished on [$host]\n";

    $job_control->{count}++;
    $job_control->{finished_host} = $host;

    # Release the host.
    $job_control->{$host}{busy} = 0;
}

# Create one worker session for the given host/task pair.
#
# Arguments: the host name, then the task.  Both are forwarded to the
# session's _start handler (worker_start) as ARG0 and ARG1.
sub spawn_worker {
    my ( $host, $task ) = @_;

    # Event name => handler for the new session.
    my %handlers = (
        _start   => \&worker_start,
        _stop    => \&worker_stop,
        LongTask => \&LongTask,
    );

    POE::Session->create(
        inline_states => \%handlers,
        args          => [ $host, $task ],
    );
}

# Handler for the LongTask event: performs the (simulated) work.
#
# The task value stored in the session heap is used as the number of seconds
# to sleep.  The real task is a SOAP call to the server; it is simulated here
# with sleep() on purpose (and not with $kernel->delay).
#
# NOTE: sleep() is a blocking call, so this process does nothing else --
# including running other sessions' event handlers -- until it returns.
sub LongTask {
    my $heap    = $_[HEAP];
    my $seconds = $heap->{task};

    sleep $seconds;
}



Reply via email to