Georgios Pappas wrote:
Hello,

I'm pretty new to POE and I've found it very useful for solving a simple problem. Given an array of tasks, I want to distribute them over an array of hosts, with each host handling a single task at a time. There are more tasks than available hosts, so as soon as a host finishes its job, a new task should be assigned to it.
I tried to implement it using PoCo::JobQueue, but instead of distributing the tasks and running them in parallel, my program blocks on each host's task until it finishes. How can I effectively distribute the tasks between hosts? How should I proceed if two hosts finish at exactly the same time? Could this program be implemented without JobQueue?



You should consider running your "long task" inside a POE::Wheel::Run. That way it is forked off as a separate, non-blocking process, so control returns to your queue while the processing of that particular element continues in the child. Then, when the child produces output, writes an error, or exits, you can handle those events in the controller (the queue) and start a new task. Because the controller is timesliced (POE dispatches one event at a time within a single process), you don't have to worry about the usual IPC locking issues; two hosts finishing at the same moment just produce two events that are handled one after the other.
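A minimal sketch of that suggestion applied to the original problem, without PoCo::JobQueue. It assumes POE and POE::Wheel::Run are installed and that a Perl coderef that just sleeps is an acceptable stand-in for the SOAP call. The host names, task durations, event names (dispatch, task_output, task_closed, task_reaped) and the @finished list are hypothetical, made up for illustration.

```perl
#!/usr/bin/perl
# Sketch: one task per host at a time, each task in a forked child via
# POE::Wheel::Run, so the POE kernel itself never blocks.
# Host names, durations and event names are hypothetical.
use strict;
use warnings;
use POE qw(Wheel::Run);

my @HOSTS = qw(host1 host2 host3 host4);
my @tasks = (2, 1, 1, 1, 2, 1);    # seconds each simulated task takes
my @finished;                       # hosts, in order of task completion

POE::Session->create(
  inline_states => {
    _start => sub {
      my ($kernel, $heap) = @_[KERNEL, HEAP];
      $heap->{free} = [@HOSTS];     # hosts with no task running
      $kernel->yield('dispatch');
    },

    # Hand out tasks while there are both idle hosts and waiting tasks.
    dispatch => sub {
      my ($kernel, $heap) = @_[KERNEL, HEAP];
      while (@{ $heap->{free} } && @tasks) {
        my $host  = shift @{ $heap->{free} };
        my $task  = shift @tasks;
        my $wheel = POE::Wheel::Run->new(
          # Runs in the child process; stand-in for the real SOAP call.
          Program     => sub { $| = 1; sleep $task; print "slept $task s\n" },
          StdoutEvent => 'task_output',
          CloseEvent  => 'task_closed',
        );
        $heap->{wheel}{ $wheel->ID } = $wheel;    # keep the wheel alive
        $heap->{host}{ $wheel->ID }  = $host;
        $kernel->sig_child($wheel->PID, 'task_reaped');
      }
    },

    # ARG0 is one line of the child's STDOUT, ARG1 the wheel ID.
    task_output => sub {
      my ($heap, $line, $wid) = @_[HEAP, ARG0, ARG1];
      print "[$heap->{host}{$wid}] $line\n";
    },

    # The child closed its output: free the host and dispatch again.
    task_closed => sub {
      my ($kernel, $heap, $wid) = @_[KERNEL, HEAP, ARG0];
      delete $heap->{wheel}{$wid};
      my $host = delete $heap->{host}{$wid};
      push @finished, $host;
      push @{ $heap->{free} }, $host;
      $kernel->yield('dispatch');
    },

    # POE reaps the child once sig_child() is registered; ARG2 holds
    # the exit status if you need it.
    task_reaped => sub { },
  },
);

POE::Kernel->run();
```

Here the host is freed when the child closes its output. The sig_child() registration makes sure the child is reaped and keeps the session alive until it is. If two children finish at the same moment, their task_closed events still run one after the other, and each one frees its own host.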


I did something similar. I never succeeded at getting PoCo::JobQueue to do what I wanted, but it was trivial to create my own queue for this specific task. Because my queue(s) are part of a larger app, I created a queue object with methods for adding elements to the queue, popping off the next task to run, running a task, getting the number of running tasks, getting the number of waiting elements, etc. Then I implemented the events (basically check queue, process element, element output, element error) as object states, so each session had access to the object where the elements were stored (the queue), avoiding the need to mess with the HEAP.

To go one step further, I also made each element of the queue an object, so the 'process element' event could handle any object that had the same 'process' method (these are known as Processors).

So essentially there is a threshold on how many elements may run at once. Some other input source adds an element to the queue, which triggers a 'check queue' event. That event checks whether there are new elements in the queue and, if the threshold has not been hit, starts a new one. The element is then processed inside a wheel and 'check queue' is called again. When the wheel is done (the process has completed), the exit event calls 'check queue' once more. This way there is no need to poll: as soon as an element is added to the queue, it can be scheduled for processing.
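The queue object described above might look roughly like this. It is a plain-Perl sketch, not the author's unreleased code. The class names TaskQueue and DemoProcessor and all method names are hypothetical. "Running" here only means "started and not yet reported finished". In a POE program, process() would start a wheel, and finished() would be called from the wheel's close or exit handler.

```perl
#!/usr/bin/perl
# Hypothetical queue object with a threshold and "Processor" elements.
# Plain Perl, no POE: names and methods are illustrative assumptions.
use strict;
use warnings;

package TaskQueue;

sub new {
  my ($class, %args) = @_;
  return bless({
    threshold => $args{threshold} || 1,   # max elements running at once
    waiting   => [],                      # elements not started yet
    running   => 0,                       # elements currently running
  }, $class);
}

# Adding elements immediately triggers a "check queue" (no polling).
sub add     { my ($self, @elems) = @_; push @{ $self->{waiting} }, @elems; return $self->check }
sub waiting { scalar @{ $_[0]{waiting} } }
sub running { $_[0]{running} }

# "Check queue": start waiting elements until the threshold is reached.
# Returns the elements started by this call.
sub check {
  my ($self) = @_;
  my @started;
  while ($self->{running} < $self->{threshold} && @{ $self->{waiting} }) {
    my $elem = shift @{ $self->{waiting} };
    $self->{running}++;
    $elem->process;          # any object with a process() method works
    push @started, $elem;
  }
  return @started;
}

# Called when an element completes: free a slot and check again.
sub finished {
  my ($self) = @_;
  $self->{running}-- if $self->{running} > 0;
  return $self->check;
}

package DemoProcessor;       # a hypothetical "Processor" element

sub new     { my ($class, $name) = @_; return bless({ name => $name, started => 0 }, $class) }
sub process { $_[0]{started} = 1 }   # a real Processor would start a wheel here

package main;

my $q = TaskQueue->new(threshold => 2);
my @started = $q->add(map { DemoProcessor->new("job$_") } 1 .. 5);
printf "%d started, %d waiting\n", scalar(@started), $q->waiting;   # 2 started, 3 waiting
$q->finished;
printf "%d running, %d waiting\n", $q->running, $q->waiting;        # 2 running, 2 waiting
```

Keeping the counters inside one object means every event handler sees the same state, which matches the reason given above for not storing the queue in the HEAP.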

Sorry I can't post code (yet); I am working on getting the company to let me release it. Someone may have a better method, but I have found this to work well for our application.

http://danconia.org



Thanks in advance,


Georgios Pappas

Here is the code:


use strict;
use warnings;
use POE qw(Component::JobQueue);

my @HOSTS = qw/host1 host2 host3 host4/;
my $job_control = { count => 0 };

# Each element is the time (in seconds) the task takes
my @tasks = qw( 15 4 3 2 10 5 4 8 2 1 );

$job_control->{finished_host} = undef;

#
# Spawning the tasks
#
POE::Component::JobQueue->spawn
  ( Alias        => 'active',
    WorkerLimit  => scalar(@HOSTS),
    Worker       => \&worker_next,

    Active =>
    { AckAlias => 'respondee',
      AckState => 'response',
    },
  );

# Nothing happens until the event loop is started.
POE::Kernel->run();
exit 0;


sub worker_next {
  foreach my $host (@HOSTS) {
    # Only hand a task to a host that is not already busy.
    next if $job_control->{$host}{busy};
    my $task = shift @tasks;
    last unless defined $task;    # no tasks left
    spawn_worker($host, $task);
  }
}


sub worker_start {
  my ($kernel, $heap, $host, $task) = @_[KERNEL, HEAP, ARG0 .. ARG1];
  $job_control->{$host}{busy} = 1;

  $heap->{task} = $task;
  $heap->{host} = $host;
  # Running the task -> it may take a while!
  warn "Task $task sent to host [$host].\n";
  $kernel->yield("LongTask");
}

sub worker_stop {
  my $heap = $_[HEAP];

  print "finished on [$heap->{host}]\n";
  $job_control->{finished_host} = $heap->{host};
  $job_control->{count}++;
  $job_control->{ $heap->{host} }{busy} = 0;
}


sub spawn_worker {
  my ($outer_host, $outer_task) = @_;

  POE::Session->create(
    inline_states => {
      _start   => \&worker_start,
      _stop    => \&worker_stop,
      LongTask => \&LongTask,
    },
    args => [ $outer_host, $outer_task ],
  );
}


sub LongTask {
  my $heap = $_[HEAP];

  # WARNING: my task is actually a SOAP call to the server, but
  # I'm simulating it with sleep (and not $kernel->delay).
  sleep($heap->{task});
}




