In my current application I use a ReadWrite wheel to move a file from one 
location to another (it could be rather large).  A small percentage of the 
time, the newly created copy is missing the last block of data from the 
original file (<= 512 bytes).  I'm not sure why.

My 'fix' has ErrorEvent set an EOF flag on the HEAP, so that FlushedEvent 
becomes responsible for deleting my ReadWrite wheel.  With this setup, I 
never end up with a copied file smaller than its original, but most of my 
wheels never go away.  (I assume because FlushedEvent is rarely -- never? -- 
called after ErrorEvent fires.)

I surmise the original data loss happens because ErrorEvent is almost always 
triggered after the last FlushedEvent, but once in a while ErrorEvent fires 
first, my handler deletes the wheel before its last FlushedEvent, and I lose 
data.  At least, that seems to match the behavior I'm experiencing.
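
The ordering described above can be sketched without POE.  This is only a 
rough illustration, not a tested fix: MockWheel, on_eof, on_flushed, %wheels 
and %saw_eof are hypothetical names made up for the example.  The one real 
name is get_driver_out_octets(), which POE::Wheel::ReadWrite provides to 
report how many octets are still queued for output.  The idea is that the 
EOF handler deletes the wheel only when nothing is queued, and otherwise 
leaves the deletion to the flush handler.

```perl
use strict;
use warnings;

# Hypothetical stand-in for POE::Wheel::ReadWrite so the sketch runs without POE.
package MockWheel;
sub new { my ($class, $pending) = @_; return bless { pending => $pending }, $class }
sub get_driver_out_octets { return $_[0]{pending} }  # same name as the real wheel method
sub drain { $_[0]{pending} = 0 }                      # simulates the driver finishing its writes

package main;

my (%wheels, %saw_eof);

# ErrorEvent logic: on EOF, drop the wheel only if nothing is waiting to be written.
sub on_eof {
    my ($name) = @_;
    my $wheel = $wheels{$name} or return 'unknown';
    if ($wheel->get_driver_out_octets() == 0) {
        delete $wheels{$name};
        return 'deleted';
    }
    $saw_eof{$name} = 1;    # output still queued: let the flush handler clean up
    return 'deferred';
}

# FlushedEvent logic: drop the wheel only if EOF was already seen.
sub on_flushed {
    my ($name) = @_;
    return 'kept' unless delete $saw_eof{$name};
    delete $wheels{$name};
    return 'deleted';
}

# Case 1: EOF arrives after the last flush (the common ordering).
$wheels{a} = MockWheel->new(0);
my $case1 = on_eof('a');

# Case 2: EOF arrives while the last block is still queued (the rare ordering).
$wheels{b} = MockWheel->new(512);
my $case2_eof = on_eof('b');
$wheels{b}->drain();
my $case2_flush = on_flushed('b');

print "case 1: $case1\n";
print "case 2: $case2_eof, then $case2_flush\n";
```

In a real session the two subs would be the bodies of the ErrorEvent and 
FlushedEvent handlers, keyed by wheel ID rather than by a name.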

I wrote a test that copies some reasonable number of files from /usr/bin to 
/dev/null and compares the original size to the copied number of bytes to see 
if there's ever any data loss.  There isn't.  So my problem isn't a POE bug, 
it seems.  I tested with the code below (1).

In any event, seeing as it doesn't seem to be a POE problem, I'm really out 
of ideas as to what to do to track this down.  If anyone can think of a 
reason why the last block of data might not be getting flushed before the 
wheel's destruction in my ErrorEvent, please say so.

In the meantime I might switch to File::Copy and take the blocking hit while 
potentially arbitrarily large files are moved around my file system.
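
For reference, a minimal sketch of that blocking fallback, using only core 
modules.  move_checked and the file names are hypothetical, invented for the 
example.  It moves a file with File::Copy's move() and then compares the 
byte counts, which is the same check the test script below performs.

```perl
use strict;
use warnings;
use File::Copy qw(move);
use File::Spec;
use File::Temp qw(tempdir);

# Hypothetical helper: move a file (blocking) and confirm no bytes were lost.
sub move_checked {
    my ($src, $dst) = @_;
    my $expected = (stat $src)[7];
    defined $expected or die "cannot stat $src: $!";
    move($src, $dst) or die "move $src -> $dst failed: $!";
    my $actual = (stat $dst)[7];
    defined $actual or die "cannot stat $dst: $!";
    $actual == $expected
        or die "size mismatch on $dst: $expected vs $actual";
    return $actual;
}

# Build a throwaway source file in the size range the test script looks at.
my $dir = tempdir(CLEANUP => 1);
my $src = File::Spec->catfile($dir, 'original.bin');
my $dst = File::Spec->catfile($dir, 'moved.bin');

open my $fh, '>', $src or die "open $src: $!";
binmode $fh;
print {$fh} 'x' x 95_000;
close $fh or die "close $src: $!";

my $moved_bytes = move_checked($src, $dst);
print "moved $moved_bytes bytes\n";
```

The whole process blocks while move() runs, which is the cost mentioned 
above.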

Thanks.

(1)
use strict;
use warnings;

use lib '/home/jasonb/src/poe/poe/blib/lib';

#sub POE::Kernel::TRACE_GARBAGE() { 1 } # 100s of MBs of data in a few min

use POE qw( Wheel::ReadWrite Filter::Stream Driver::SysRW );
use POE::Session;

package test;

use POE::Session;
use Fcntl;
use IO::File;

sub new {
  my( $package ) = @_;
  my $self = bless { }, $package;
  POE::Session->create(
    object_states =>
      [ $self => [ qw( _start fh_read fh_error socket_flush _stop) ] ]
  );
  undef;
}

sub _start {
  my ($self ) = $_[ OBJECT ];
  my $x = 0;
  # Check opendir's return value; $! may hold a stale error after a success.
  opendir(D, '/usr/bin')
    or do { warn "Failed to opendir: $!\n"; return; };

  foreach my $package ( readdir(D) ) {

    next if $package =~ m/^\.+/;
#    next unless $package =~ m/^(a)/;
    next unless (stat("/usr/bin/$package"))[7] > 90000;
    next unless (stat("/usr/bin/$package"))[7] < 100000;

    my $write = new IO::File('/dev/null',
    O_CREAT | O_WRONLY | O_NONBLOCK) or die $!;
    my $read = new IO::File("/usr/bin/$package",
    O_RDONLY | O_NONBLOCK) or die "$package $!";

    warn "testing $package\n";
    $self->copy_file( $write, $read, $package );

  }
  closedir D;
}

sub copy_file {
  my( $self, $write_fh, $read_fh, $package ) = @_[0..3];

  if(defined $write_fh && defined $read_fh ) {
    $self->{sessions}->{$package} = POE::Wheel::ReadWrite->new(
      InputHandle => $read_fh,
      OutputHandle => $write_fh,
      Driver => POE::Driver::SysRW->new(),
      Filter => POE::Filter::Stream->new(),
      InputEvent => 'fh_read',
      FlushedEvent => 'socket_flush',
      ErrorEvent => 'fh_error'
    );

        my $lame =  (stat($read_fh))[7];
        $self->{sizes}->{$package}->[0] = $lame;
  }
  else {
    warn "permissions error";
  }
}

sub fh_read {
  my( $self, $record, $wheel_id ) = @_[ OBJECT, ARG0, ARG1 ];
    foreach my $package (keys %{$self->{sessions}} ) {
      if($self->{sessions}->{$package}->ID == $wheel_id) {
        $self->{sessions}->{$package}->put( $record );
#        if( defined $self->{sizes}->{$package}->[1]) {
#          warn "For $package read ".length($record)." of "
#            .$self->{sizes}->{$package}->[1] . "\n";
#        }
        $self->{sizes}->{$package}->[1] += length ($record);
        last;
      }
    }
}

sub fh_error {
  my( $self, $op, $errnum, $errstr, $wheel_id ) =
    @_[ OBJECT, ARG0, ARG1, ARG2, ARG3 ];
  if( $errnum == 0 ) {
    foreach my $package (keys %{$self->{sessions}} ) {
      if($self->{sessions}->{$package}->ID == $wheel_id) {
        delete $self->{sessions}->{$package};
        warn "$package : $self->{sizes}->{$package}->[0] vs "
           . "$self->{sizes}->{$package}->[1]\n";
        if($self->{sizes}->{$package}->[0] !=
           $self->{sizes}->{$package}->[1]) {
          warn "copy error on $package\n";
        }
        last;
      }
    }
  }

  return;
}

sub socket_flush {
  # do nothing
}

sub _stop {
}

package main;

use POE::Session;

test->new();

$poe_kernel->run();

exit;
