Attached is a test script and a patch that add an session option "thread"
which if is true will start the session in a separate thread!
Patch against 0.15, I haven't had time to check 0.1501 out.
Following limitations apply.
* Only the main thread can watch filehandles
This can be fixed with some work
* Refcounts are broken, because POE relies on the _start being exeucted at
once and start needs to do something to increment refcount. As start is
executed in a different thread this is kind of hard. But can be fixed. Right
now you need to manually destroy the session.
* Aliases can only be set from the main thread. This is annoying and should
be fixed, unclear how! (Possibly a $poe_kernel->post($poe_kernel,
"alias_set"?)
* Only one session per thread except in the main thread. This can and should
be lifted thru hard work :).
* arguments to threaded apps cannot contain references that are not shared
(threads::shared), can eventually be fixed
* works only on 5.7.2 (will maybe work on 5.6.2)
* posts to the main thread are slowed down by 1 second due to how the event
loop is setup, we need to have a filehandle open to get out of select
* right now, communication between two threads must go thru the main thread,
inter thread communication most nu, this can probably be solved, but I am
not sure it is worth it. If we allow multiple sessions per threads then that
would be worth a lot more.
Rocco, please look at the patch, we need to fix this so it doesn't compile
on older and non threaded perls and possibly make somethings macros(?)
before it is includable. That is if the approach is the correct one.
If you want to try it, I advice a recent snapshot or 5.7.2 compiled with
-Dusethreads -DMULTIPLICITY.
The test script!
#sub POE::Kernel::TRACE_DEFAULT () { 1 }
use lib '.';
use POE qw(Kernel Session);
use strict;
my $session2 = POE::Session->create(
inline_states =>
{
_start => sub { },
bar => sub { print "bar\n"; $poe_kernel->post("foo",'foo'); },
},
options => {
thread => 1,
},);
my $session = POE::Session->create(
inline_states => {
_start => sub {
$poe_kernel->alias_set("foo");
$poe_kernel->post($session2,"bar");
},
foo => sub {
print "foo\n";
$poe_kernel->post($_[SENDER],"bar");
}
}
);
$poe_kernel->run();
--- POE-0.15/POE/Kernel.pm Sun Jul 15 20:29:27 2001
+++ POE-THREADS/POE/Kernel.pm Sat Jul 28 11:56:03 2001
@@ -7,6 +7,10 @@
use Carp qw(carp croak confess);
use Sys::Hostname qw(hostname);
+use threads;
+use threads::shared;
+use threads::queue;
+
use vars qw( $poe_kernel $poe_main_window );
#--------------------------------------------------------------------------
----
@@ -168,6 +172,8 @@
sub SS_ID () { 9 }
sub SS_EXTRA_REFS () { 10 }
sub SS_ALCOUNT () { 11 }
+sub SS_THREAD () { 12 }
+sub SS_THREAD_QUEUE () { 13 }
# session handle structure
sub SH_HANDLE () { 0 }
@@ -191,7 +197,9 @@
sub KR_WATCHER_IDLE () { 13 }
sub KR_EXTRA_REFS () { 14 }
sub KR_ALARM_IDS () { 15 }
-sub KR_SIZE () { 16 }
+sub KR_THREAD () { 16 }
+sub KR_THREAD_QUEUE () { 17 }
+sub KR_SIZE () { 18 }
# Handle structure.
sub HND_HANDLE () { 0 }
@@ -655,22 +663,24 @@
unless (defined $poe_kernel) {
my $self = $poe_kernel = bless
- [ \%kr_sessions, # KR_SESSIONS
- \@kr_vectors, # KR_VECTORS
- \%kr_handles, # KR_HANDLES
- \@kr_states, # KR_STATES
- \%kr_signals, # KR_SIGNALS
- \%kr_aliases, # KR_ALIASES
- \$kr_active_session, # KR_ACTIVE_SESSION
- \%kr_processes, # KR_PROCESSES
- \@kr_alarms, # KR_ALARMS
- undef, # KR_ID
- \%kr_session_ids, # KR_SESSION_IDS
- \$kr_id_index, # KR_ID_INDEX
- undef, # KR_WATCHER_TIMER
- undef, # KR_WATCHER_IDLE
- \$kr_extra_refs, # KR_EXTRA_REFS
- \%kr_alarm_ids, # KR_ALARM_IDS
+ [ \%kr_sessions, # KR_SESSIONS
+ \@kr_vectors, # KR_VECTORS
+ \%kr_handles, # KR_HANDLES
+ \@kr_states, # KR_STATES
+ \%kr_signals, # KR_SIGNALS
+ \%kr_aliases, # KR_ALIASES
+ \$kr_active_session, # KR_ACTIVE_SESSION
+ \%kr_processes, # KR_PROCESSES
+ \@kr_alarms, # KR_ALARMS
+ undef, # KR_ID
+ \%kr_session_ids, # KR_SESSION_IDS
+ \$kr_id_index, # KR_ID_INDEX
+ undef, # KR_WATCHER_TIMER
+ undef, # KR_WATCHER_IDLE
+ \$kr_extra_refs, # KR_EXTRA_REFS
+ \%kr_alarm_ids, # KR_ALARM_IDS
+ 0, # KR_THREAD
+ threads::queue->new(), # KR_THREAD_QUEUE
], $type;
# Kernel ID, based on Philip Gwyn's code. I hope he still can
@@ -792,6 +802,7 @@
$kr_id_index, # SS_ID
{ }, # SS_EXTRA_REFS
0, # SS_ALCOUNT
+ 0, # SS_THREAD
];
# For the ID to session reference lookup.
@@ -808,6 +819,22 @@
} # include
+ if($session->option('thread')) {
+ $new_session->[SS_THREAD_QUEUE] = threads::queue->new();
+
+ $new_session->[SS_REFCOUNT]++; #wierd stuf happens otherwise!
+ $kr_extra_refs++;
+
+ $new_session->[SS_THREAD] = threads->create(sub {
+ %kr_sessions = ($session, $new_session);
+ $kr_active_session = $session;
+ while(my $args = $new_session->[SS_THREAD_QUEUE]->dequeue) {
+ $session->_invoke_state(@$args);
+ last if($args->[1] eq '_stop');
+ }
+ });
+ }
+
# Add the new session to its parent's children.
$kr_sessions{$source_session}->[SS_CHILDREN]->{$session} = $session;
{% ses_refcount_inc $source_session %}
@@ -835,6 +862,7 @@
# session's parent). Tell the departing session's parent that
# it has new child sessions.
+
my $parent = $kr_sessions{$session}->[SS_PARENT];
my @children = values %{$kr_sessions{$session}->[SS_CHILDREN]};
foreach my $child (@children) {
@@ -919,8 +947,15 @@
$kr_active_session = $session;
# Dispatch the event, at long last.
- my $return =
- $session->_invoke_state($source_session, $local_state, $etc, $file,
$line);
+ my $return;
+ if($kr_sessions{$session}->[SS_THREAD]) {
+ my $args = share([]);
+ @$args = ($source_session->ID(), $local_state, share([@$etc]), $file,
$line);
+ $kr_sessions{$session}->[SS_THREAD_QUEUE]->enqueue($args);
+ } else {
+ $return =
+ $session->_invoke_state($source_session, $local_state, $etc, $file,
$line);
+ }
# Stringify the state's return value if it belongs in the POE
# namespace. $return's scope exists beyond the post-dispatch
@@ -969,6 +1004,11 @@
# Remove the departing session from its parent.
+ if($kr_sessions{$session}->[SS_THREAD]) {
+ $kr_sessions{$session}->[SS_THREAD]->join();
+ }
+
+
my $parent = $kr_sessions{$session}->[SS_PARENT];
if (defined $parent) {
@@ -1314,6 +1354,8 @@
# Dispatch _start to a session, allocating it in the kernel's data
# structures as a side effect.
+use threads;
+
sub session_alloc {
my ($self, $session, @args) = @_;
@@ -1689,6 +1731,27 @@
# Attempt to resolve the destination session reference against
# various things.
+
+ if($kr_sessions{$kr_active_session}->[SS_THREAD_QUEUE]) {
+ if($destination == $kr_active_session) {
+ my $args = share([]);
+ @$args = ($kr_active_session->ID,
+ $state_name,
+ share([@etc]),
+ (caller)[1,2]);
+ $kr_sessions{$kr_active_session}->[SS_THREAD_QUEUE]->enqueue($args);
+ } else {
+ my $args = share([]);
+ @$args = ($kr_active_session->ID,
+ $state_name,
+ share([@etc]),
+ (caller)[1,2],$destination);
+ $poe_kernel->[KR_THREAD_QUEUE]->enqueue($args);
+ }
+ return;
+ }
+
+
my $session = {% alias_resolve $destination %};
{% test_resolve $destination, $session %}
--- POE-0.15/POE/Kernel/Select.pm Fri Jun 8 21:58:45 2001
+++ POE-THREADS/POE/Kernel/Select.pm Sat Jul 28 11:45:14 2001
@@ -185,6 +185,12 @@
$timeout = 3600;
}
+ if($poe_kernel->[KR_THREAD_QUEUE]) {
+ if($timeout > 1) {
+ $timeout = 1;
+ }
+ }
+
if (TRACE_QUEUE) {
warn( '*** Kernel::run() iterating. ' .
sprintf("now(%.2f) timeout(%.2f) then(%.2f)\n",
@@ -371,6 +377,15 @@
# Dispatch one or more FIFOs, if they are available. There is a
# lot of latency between executions of this code block, so we'll
# dispatch more than one event if we can.
+
+ if($poe_kernel->[KR_THREAD_QUEUE]->pending()) {
+ while(my $event = $poe_kernel->[KR_THREAD_QUEUE]->dequeue_nb()) {
+ my $session = {% alias_resolve $event->[0] %};
+ my $destination = {% alias_resolve $event->[5] %};
+ $poe_kernel->_dispatch_state($destination, $session, $event->[1],
ET_USER, $event->[2], time, $event->[3], $event->[4], 0);
+
+ }
+ }
my $stop_time = time() + FIFO_DISPATCH_TIME;
while (@kr_states) {
--
Arthur