Heres a quick diff -u of my copy of Run.pm that uses get_one instead
of get(), adds pause and resume functions for the output filehandles
and adds set_filter methods as well.

I thought I'd send it on incase anyone else had use for it.

The bits and peices are just copied from ReadWrite for the most part.


@@ -508,6 +508,44 @@
     my $stdout_event  = \$self->[EVENT_STDOUT];
     my $is_active     = \$self->[IS_ACTIVE];
 
+       if (    $stdout_filter->can('get_one') and 
+                       $stdout_filter->can('get_one_start') ) {
+
+    $poe_kernel->state
+      ( $self->[STATE_STDOUT] = ref($self) . "($unique_id) -> select stdout",
+        sub {
+          # prevents SEGV
+          0 && CRIMSON_SCOPE_HACK('<');
+
+          # subroutine starts here
+          my ($k, $me, $handle) = @_[KERNEL, SESSION, ARG0];
+          if (defined(my $raw_input = $driver->get($handle))) {
+                       $stdout_filter->get_one_start($raw_input);
+            while (1) {
+                my $next_rec = $stdout_filter->get_one();
+                last unless @$next_rec;
+                foreach my $cooked_input (@$next_rec) {
+                  $k->call($me, $$stdout_event, $cooked_input, $unique_id);
+                }
+             }
+          }
+          else {
+            $$error_event and
+              $k->call( $me, $$error_event,
+                        'read', ($!+0), $!, $unique_id, 'STDOUT'
+                      );
+            unless (--$$is_active) {
+              $k->call( $me, $$close_event, $unique_id )
+                if defined $$close_event;
+            }
+            $k->select_read($handle);
+          }
+        }
+      );
+
+       }
+       else {
+
     $poe_kernel->state
       ( $self->[STATE_STDOUT] = ref($self) . "($unique_id) -> select stdout",
         sub {
@@ -534,6 +572,7 @@
           }
         }
       );
+       } # End if else can't get_one
 
     # register the state's select
     $poe_kernel->select_read($self->[HANDLE_STDOUT], $self->[STATE_STDOUT]);
@@ -565,6 +604,44 @@
     my $stderr_event  = \$self->[EVENT_STDERR];
     my $is_active     = \$self->[IS_ACTIVE];
 
+       if (    $stderr_filter->can('get_one') and 
+                       $stderr_filter->can('get_one_start') ) {
+
+    $poe_kernel->state
+      ( $self->[STATE_STDERR] = ref($self) . "($unique_id) -> select stderr",
+        sub {
+          # prevents SEGV
+          0 && CRIMSON_SCOPE_HACK('<');
+
+          # subroutine starts here
+          my ($k, $me, $handle) = @_[KERNEL, SESSION, ARG0];
+          if (defined(my $raw_input = $driver->get($handle))) {
+            $stderr_filter->get_one_start($raw_input);
+            while (1) {
+                my $next_rec = $stderr_filter->get_one();
+                last unless @$next_rec;
+                foreach my $cooked_input (@$next_rec) {
+                  $k->call($me, $$stderr_event, $cooked_input, $unique_id);
+                }
+            }
+          }
+          else {
+            $$error_event and
+              $k->call( $me, $$error_event,
+                        'read', ($!+0), $!, $unique_id, 'STDERR'
+                      );
+            unless (--$$is_active) {
+              $k->call( $me, $$close_event, $unique_id )
+                if defined $$close_event;
+            }
+            $k->select_read($handle);
+          }
+        }
+      );
+
+       }
+       else {
+
     $poe_kernel->state
       ( $self->[STATE_STDERR] = ref($self) . "($unique_id) -> select stderr",
         sub {
@@ -592,6 +669,8 @@
         }
       );
 
+       } # End else can't get_one
+
     # register the state's select
     $poe_kernel->select_read($self->[HANDLE_STDERR], $self->[STATE_STDERR]);
   }
@@ -700,26 +779,176 @@
   return 0;
 }
 
+sub pause_stdout {
+        my $self = shift();
+        if (defined $self->[HANDLE_STDOUT]) {
+            $poe_kernel->select_pause_read( $self->[HANDLE_STDOUT] );
+        }
+}
+
+sub pause_stderr {
+        my $self = shift();
+        if (defined $self->[HANDLE_STDERR]) {
+            $poe_kernel->select_pause_read( $self->[HANDLE_STDERR] );
+        }
+}
+
+sub resume_stdout {
+        my $self = shift();
+        if (defined $self->[HANDLE_STDOUT]) {
+            $poe_kernel->select_resume_read( $self->[HANDLE_STDOUT] );        }
+}
+
+sub resume_stderr {
+        my $self = shift();
+        if (defined $self->[HANDLE_STDERR]) {
+            $poe_kernel->select_resume_read( $self->[HANDLE_STDERR] );        }
+}
+
+
 #------------------------------------------------------------------------------
 # Redefine filters, one at a time or at once.  This is based on PG's
 # code in Wheel::ReadWrite.
 
-sub set_filter {
-  croak "set_filter not implemented";
+
+sub _transfer_stdout_buffer {
+  my ($self, $buf) = @_;
+
+  my $old_output_filter = $self->[FILTER_STDOUT];
+  
+  # Assign old buffer contents and send out pending packets.
+
+  # If the new filter implements "get_one", use that.
+  if ( $old_output_filter->can('get_one') and
+       $old_output_filter->can('get_one_start')
+     ) {
+    if (defined $buf) {
+      $self->[FILTER_STDOUT]->get_one_start($buf);
+
+      while ($self->[FILTER_STDOUT] == $old_output_filter) {
+               # the conditional checks if an event handler has switched the filter 
+out from 
+               # under our feet (again).
+
+        my $next_rec = $self->[FILTER_STDOUT]->get_one();
+        last unless @$next_rec;
+        foreach my $cooked_input (@$next_rec) {
+          $poe_kernel->call( $poe_kernel->get_active_session(),
+                             $self->[EVENT_STDOUT],
+                             $cooked_input, $self->[UNIQUE_ID]
+                           );
+        }
+      }
+    }
+  }
+
+  # Otherwise use the old behavior.
+  else {
+    if (defined $buf) {
+      foreach my $cooked_input (@{$self->[FILTER_STDOUT]->get($buf)}) {
+        $poe_kernel->call( $poe_kernel->get_active_session(),
+                           $self->[EVENT_STDOUT],
+                           $cooked_input, $self->[UNIQUE_ID]
+                         );
+      }
+    }
+  }
 }
 
-sub set_stdin_filter {
-  croak "set_stdin_filter not implemented";
+
+sub _transfer_stderr_buffer { # copy of _transfer_stdout_buffer
+  my ($self, $buf) = @_;
+
+  my $old_error_filter = $self->[FILTER_STDERR];
+  
+  # Assign old buffer contents and send out pending packets.
+
+  # If the new filter implements "get_one", use that.
+  if ( $old_error_filter->can('get_one') and
+       $old_error_filter->can('get_one_start')
+     ) {
+    if (defined $buf) {
+      $self->[FILTER_STDERR]->get_one_start($buf);
+
+      while ($self->[FILTER_STDERR] == $old_error_filter) {
+               # the conditional checks if an event handler has switched the filter 
+out from 
+               # under our feet (again).
+
+        my $next_rec = $self->[FILTER_STDERR]->get_one();
+        last unless @$next_rec;
+        foreach my $cooked_input (@$next_rec) {
+          $poe_kernel->call( $poe_kernel->get_active_session(),
+                             $self->[EVENT_STDERR],
+                             $cooked_input, $self->[UNIQUE_ID]
+                           );
+        }
+      }
+    }
+  }
+
+  # Otherwise use the old behavior.
+  else {
+    if (defined $buf) {
+      foreach my $cooked_input (@{$self->[FILTER_STDERR]->get($buf)}) {
+        $poe_kernel->call( $poe_kernel->get_active_session(),
+                           $self->[EVENT_STDERR],
+                           $cooked_input, $self->[UNIQUE_ID]
+                         );
+      }
+    }
+  }
 }
 
+
+sub set_stdio_filter {
+       my ($self, $new_filter) = @_;
+       $self->set_stdout_filter($new_filter);
+       $self->set_stdin_filter($new_filter);
+}   
+  
+sub set_stdin_filter {
+       my ($self, $new_filter) = @_;
+       $self->[FILTER_STDIN] = $new_filter;
+}   
+  
 sub set_stdout_filter {
-  croak "set_stdout_filter not implemented";
-}
+       my ($self, $new_filter) = @_;
+
+       my $buf = $self->[FILTER_STDOUT]->get_pending();
+       $self->[FILTER_STDOUT] = $new_filter;
 
+       $self->_define_stdout_reader();
+       $self->_transfer_stdout_buffer($buf);
+} 
+  
 sub set_stderr_filter {
-  croak "set_stderr_filter not implemented";
+       my ($self, $new_filter) = @_;
+
+       my $buf = $self->[FILTER_STDERR]->get_pending();
+       $self->[FILTER_STDERR] = $new_filter;
+
+       $self->_define_stderr_reader();
+       $self->_transfer_stderr_buffer($buf);
+} 
+
+sub get_stdin_filter {
+  my $self = shift;
+  return $self->[FILTER_STDIN];
 }
 
+sub get_stdout_filter {
+  my $self = shift;
+  return $self->[FILTER_STDOUT];
+}
+
+sub get_stderr_filter {
+  my $self = shift;
+  return $self->[FILTER_STDERR];
+}
+
+
+
+
+
 #------------------------------------------------------------------------------
 # Data accessors.
 
@@ -960,22 +1189,37 @@
 will be flushed asynchronously once the current state returns.  Each
 item in the LIST is processed according to the C<StdinFilter>.
 
-=item set_filter FILTER_REFERENCE
+=item get_stdin_filter
 
-Set C<StdinFilter>, C<StdoutFilter>, and C<StderrFilter> all at once.
-Not yet implemented.
+=item get_stdout_filter
 
-=item set_stdin_filter FILTER_REFERENCE
+=item get_stderr_filter
 
-Set C<StdinFilter> to something else.  Not yet implemented.
+Get C<StdinFilter>, C<StdoutFilter>, or C<StderrFilter> respectively.
 
-=item set_stdout_filter FILTER_REFERENCE
+=item set_stdio_filter FILTER_REFERENCE
+
+Set C<StdinFilter>, C<StdoutFilter> at once.
+
+=item set_stdin_filter FILTER_REFERENCE
 
-Set C<StdoutFilter> to something else.  Not yet implemented.
+=item set_stdout_filter FILTER_REFERENCE
 
 =item set_stderr_filter FILTER_REFERENCE
 
-Set C<StderrFilter> to something else.  Not yet implemented.
+Set C<StdinFilter>, C<StdoutFilter>, or C<StderrFilter> respectively.
+
+=item pause_stdout
+
+=item pause_stderr
+
+=item resume_stdout
+
+=item resume_stderr
+
+Pause/Resume C<StdoutEvent>, or C<StderrEvent> events. By using these 
+methods a session can control the flow of Stdout and Stderr events
+coming in from this child process.
 
 =item ID
 

Reply via email to