Heres a quick diff -u of my copy of Run.pm that uses get_one instead
of get(), adds pause and resume functions for the output filehandles
and adds set_filter methods as well.
I thought I'd send it on incase anyone else had use for it.
The bits and peices are just copied from ReadWrite for the most part.
@@ -508,6 +508,44 @@
my $stdout_event = \$self->[EVENT_STDOUT];
my $is_active = \$self->[IS_ACTIVE];
+ if ( $stdout_filter->can('get_one') and
+ $stdout_filter->can('get_one_start') ) {
+
+ $poe_kernel->state
+ ( $self->[STATE_STDOUT] = ref($self) . "($unique_id) -> select stdout",
+ sub {
+ # prevents SEGV
+ 0 && CRIMSON_SCOPE_HACK('<');
+
+ # subroutine starts here
+ my ($k, $me, $handle) = @_[KERNEL, SESSION, ARG0];
+ if (defined(my $raw_input = $driver->get($handle))) {
+ $stdout_filter->get_one_start($raw_input);
+ while (1) {
+ my $next_rec = $stdout_filter->get_one();
+ last unless @$next_rec;
+ foreach my $cooked_input (@$next_rec) {
+ $k->call($me, $$stdout_event, $cooked_input, $unique_id);
+ }
+ }
+ }
+ else {
+ $$error_event and
+ $k->call( $me, $$error_event,
+ 'read', ($!+0), $!, $unique_id, 'STDOUT'
+ );
+ unless (--$$is_active) {
+ $k->call( $me, $$close_event, $unique_id )
+ if defined $$close_event;
+ }
+ $k->select_read($handle);
+ }
+ }
+ );
+
+ }
+ else {
+
$poe_kernel->state
( $self->[STATE_STDOUT] = ref($self) . "($unique_id) -> select stdout",
sub {
@@ -534,6 +572,7 @@
}
}
);
+ } # End if else can't get_one
# register the state's select
$poe_kernel->select_read($self->[HANDLE_STDOUT], $self->[STATE_STDOUT]);
@@ -565,6 +604,44 @@
my $stderr_event = \$self->[EVENT_STDERR];
my $is_active = \$self->[IS_ACTIVE];
+ if ( $stderr_filter->can('get_one') and
+ $stderr_filter->can('get_one_start') ) {
+
+ $poe_kernel->state
+ ( $self->[STATE_STDERR] = ref($self) . "($unique_id) -> select stderr",
+ sub {
+ # prevents SEGV
+ 0 && CRIMSON_SCOPE_HACK('<');
+
+ # subroutine starts here
+ my ($k, $me, $handle) = @_[KERNEL, SESSION, ARG0];
+ if (defined(my $raw_input = $driver->get($handle))) {
+ $stderr_filter->get_one_start($raw_input);
+ while (1) {
+ my $next_rec = $stderr_filter->get_one();
+ last unless @$next_rec;
+ foreach my $cooked_input (@$next_rec) {
+ $k->call($me, $$stderr_event, $cooked_input, $unique_id);
+ }
+ }
+ }
+ else {
+ $$error_event and
+ $k->call( $me, $$error_event,
+ 'read', ($!+0), $!, $unique_id, 'STDERR'
+ );
+ unless (--$$is_active) {
+ $k->call( $me, $$close_event, $unique_id )
+ if defined $$close_event;
+ }
+ $k->select_read($handle);
+ }
+ }
+ );
+
+ }
+ else {
+
$poe_kernel->state
( $self->[STATE_STDERR] = ref($self) . "($unique_id) -> select stderr",
sub {
@@ -592,6 +669,8 @@
}
);
+ } # End else can't get_one
+
# register the state's select
$poe_kernel->select_read($self->[HANDLE_STDERR], $self->[STATE_STDERR]);
}
@@ -700,26 +779,176 @@
return 0;
}
+sub pause_stdout {
+ my $self = shift();
+ if (defined $self->[HANDLE_STDOUT]) {
+ $poe_kernel->select_pause_read( $self->[HANDLE_STDOUT] );
+ }
+}
+
+sub pause_stderr {
+ my $self = shift();
+ if (defined $self->[HANDLE_STDERR]) {
+ $poe_kernel->select_pause_read( $self->[HANDLE_STDERR] );
+ }
+}
+
+sub resume_stdout {
+ my $self = shift();
+ if (defined $self->[HANDLE_STDOUT]) {
+ $poe_kernel->select_resume_read( $self->[HANDLE_STDOUT] ); }
+}
+
+sub resume_stderr {
+ my $self = shift();
+ if (defined $self->[HANDLE_STDERR]) {
+ $poe_kernel->select_resume_read( $self->[HANDLE_STDERR] ); }
+}
+
+
#------------------------------------------------------------------------------
# Redefine filters, one at a time or at once. This is based on PG's
# code in Wheel::ReadWrite.
-sub set_filter {
- croak "set_filter not implemented";
+
+sub _transfer_stdout_buffer {
+ my ($self, $buf) = @_;
+
+ my $old_output_filter = $self->[FILTER_STDOUT];
+
+ # Assign old buffer contents and send out pending packets.
+
+ # If the new filter implements "get_one", use that.
+ if ( $old_output_filter->can('get_one') and
+ $old_output_filter->can('get_one_start')
+ ) {
+ if (defined $buf) {
+ $self->[FILTER_STDOUT]->get_one_start($buf);
+
+ while ($self->[FILTER_STDOUT] == $old_output_filter) {
+ # the conditional checks if an event handler has switched the filter
+out from
+ # under our feet (again).
+
+ my $next_rec = $self->[FILTER_STDOUT]->get_one();
+ last unless @$next_rec;
+ foreach my $cooked_input (@$next_rec) {
+ $poe_kernel->call( $poe_kernel->get_active_session(),
+ $self->[EVENT_STDOUT],
+ $cooked_input, $self->[UNIQUE_ID]
+ );
+ }
+ }
+ }
+ }
+
+ # Otherwise use the old behavior.
+ else {
+ if (defined $buf) {
+ foreach my $cooked_input (@{$self->[FILTER_STDOUT]->get($buf)}) {
+ $poe_kernel->call( $poe_kernel->get_active_session(),
+ $self->[EVENT_STDOUT],
+ $cooked_input, $self->[UNIQUE_ID]
+ );
+ }
+ }
+ }
}
-sub set_stdin_filter {
- croak "set_stdin_filter not implemented";
+
+sub _transfer_stderr_buffer { # copy of _transfer_stdout_buffer
+ my ($self, $buf) = @_;
+
+ my $old_error_filter = $self->[FILTER_STDERR];
+
+ # Assign old buffer contents and send out pending packets.
+
+ # If the new filter implements "get_one", use that.
+ if ( $old_error_filter->can('get_one') and
+ $old_error_filter->can('get_one_start')
+ ) {
+ if (defined $buf) {
+ $self->[FILTER_STDERR]->get_one_start($buf);
+
+ while ($self->[FILTER_STDERR] == $old_error_filter) {
+ # the conditional checks if an event handler has switched the filter
+out from
+ # under our feet (again).
+
+ my $next_rec = $self->[FILTER_STDERR]->get_one();
+ last unless @$next_rec;
+ foreach my $cooked_input (@$next_rec) {
+ $poe_kernel->call( $poe_kernel->get_active_session(),
+ $self->[EVENT_STDERR],
+ $cooked_input, $self->[UNIQUE_ID]
+ );
+ }
+ }
+ }
+ }
+
+ # Otherwise use the old behavior.
+ else {
+ if (defined $buf) {
+ foreach my $cooked_input (@{$self->[FILTER_STDERR]->get($buf)}) {
+ $poe_kernel->call( $poe_kernel->get_active_session(),
+ $self->[EVENT_STDERR],
+ $cooked_input, $self->[UNIQUE_ID]
+ );
+ }
+ }
+ }
}
+
+sub set_stdio_filter {
+ my ($self, $new_filter) = @_;
+ $self->set_stdout_filter($new_filter);
+ $self->set_stdin_filter($new_filter);
+}
+
+sub set_stdin_filter {
+ my ($self, $new_filter) = @_;
+ $self->[FILTER_STDIN] = $new_filter;
+}
+
sub set_stdout_filter {
- croak "set_stdout_filter not implemented";
-}
+ my ($self, $new_filter) = @_;
+
+ my $buf = $self->[FILTER_STDOUT]->get_pending();
+ $self->[FILTER_STDOUT] = $new_filter;
+ $self->_define_stdout_reader();
+ $self->_transfer_stdout_buffer($buf);
+}
+
sub set_stderr_filter {
- croak "set_stderr_filter not implemented";
+ my ($self, $new_filter) = @_;
+
+ my $buf = $self->[FILTER_STDERR]->get_pending();
+ $self->[FILTER_STDERR] = $new_filter;
+
+ $self->_define_stderr_reader();
+ $self->_transfer_stderr_buffer($buf);
+}
+
+sub get_stdin_filter {
+ my $self = shift;
+ return $self->[FILTER_STDIN];
}
+sub get_stdout_filter {
+ my $self = shift;
+ return $self->[FILTER_STDOUT];
+}
+
+sub get_stderr_filter {
+ my $self = shift;
+ return $self->[FILTER_STDERR];
+}
+
+
+
+
+
#------------------------------------------------------------------------------
# Data accessors.
@@ -960,22 +1189,37 @@
will be flushed asynchronously once the current state returns. Each
item in the LIST is processed according to the C<StdinFilter>.
-=item set_filter FILTER_REFERENCE
+=item get_stdin_filter
-Set C<StdinFilter>, C<StdoutFilter>, and C<StderrFilter> all at once.
-Not yet implemented.
+=item get_stdout_filter
-=item set_stdin_filter FILTER_REFERENCE
+=item get_stderr_filter
-Set C<StdinFilter> to something else. Not yet implemented.
+Get C<StdinFilter>, C<StdoutFilter>, or C<StderrFilter> respectively.
-=item set_stdout_filter FILTER_REFERENCE
+=item set_stdio_filter FILTER_REFERENCE
+
+Set C<StdinFilter>, C<StdoutFilter> at once.
+
+=item set_stdin_filter FILTER_REFERENCE
-Set C<StdoutFilter> to something else. Not yet implemented.
+=item set_stdout_filter FILTER_REFERENCE
=item set_stderr_filter FILTER_REFERENCE
-Set C<StderrFilter> to something else. Not yet implemented.
+Set C<StdinFilter>, C<StdoutFilter>, or C<StderrFilter> respectively.
+
+=item pause_stdout
+
+=item pause_stderr
+
+=item resume_stdout
+
+=item resume_stderr
+
+Pause/Resume C<StdoutEvent>, or C<StderrEvent> events. By using these
+methods a session can control the flow of Stdout and Stderr events
+coming in from this child process.
=item ID