In my current application I use a ReadWrite wheel to move a file from one
location to another (it could be rather large). A small percentage of the
time, the newly created file that's copied to is missing the last block of
data from the original file (<= 512 bytes). I'm not sure why.
My 'fix' has ErrorEvent flagging an EOF variable on the HEAP and thus
FlushedEvent becomes responsible for deleting my Read Write Wheel. With this
setup, I never end up with a copied file smaller than its original, but most
of my Wheels never go away. (I assume because FlushedEvent is usually --
never? -- called after ErrorEvent flies.)
I surmize this is because ErrorEvent is almost always triggered after the
last FlushedEvent, but once in a while, ErrorEvent gets hit first, deletes my
wheel before it can do a FlushedEvent, and I lose data. At least, that seems
to match the behavior I'm experiencing.
I wrote a test that copies some reasonable number of files from a /usr/bin to
/dev/null and compares the original size to the copied number of bytes to see
if there's ever any data loss. There isn't. So my problem isn't a POE bug,
it seems. I tested with the code below(1).
In any event, seeing as it doesn't seem to be a POE problem, I'm really out
of ideas as to what to do to track this down. If anyone can think of a
reason why the last block of data might not be getting flushed before the
wheel's destruction in my ErrorEvent, please say so.
In the meantime I might switch to File::Copy and take the
blocking hit while potentially arbritrarily large files are moved around my
file system.
Thanks.
(1)
use strict;
use warnings;
use lib '/home/jasonb/src/poe/poe/blib/lib';
#sub POE::Kernel::TRACE_GARBAGE() { 1 } # 100s of MBs of data in a few min
use POE qw( Wheel::ReadWrite Filter::Stream Driver::SysRW );
use POE::Session;
package test;
use POE::Session;
use Fcntl;
use IO::File;
sub new {
my( $package ) = @_;
my $self = bless { }, $package;
POE::Session->create(
object_states =>
[ $self => [ qw( _start fh_read fh_error socket_flush _stop) ] ]
);
undef;
}
sub _start {
my ($self ) = $_[ OBJECT ];
my $x = 0;
opendir(D, '/usr/bin');
if($!) { warn "Failed to readdir: $!\n"; return; }
foreach my $package ( readdir(D) ) {
next if $package =~ m/^\.+/;
# next unless $package =~ m/^(a)/;
next unless (stat("/usr/bin/$package"))[7] > 90000;
next unless (stat("/usr/bin/$package"))[7] < 100000;
my $write = new IO::File('/dev/null',
O_CREAT | O_WRONLY | O_NONBLOCK) or die $!;
my $read = new IO::File("/usr/bin/$package",
O_RDONLY | O_NONBLOCK) or die "$package $!";
warn "testing $package\n";
$self->copy_file( $write, $read, $package );
}
closedir D;
}
sub copy_file {
my( $self, $write_fh, $read_fh, $package ) = @_[0..3];
if(defined $write_fh && defined $read_fh ) {
$self->{sessions}->{$package} = POE::Wheel::ReadWrite->new(
InputHandle => $read_fh,
OutputHandle => $write_fh,
Driver => POE::Driver::SysRW->new(),
Filter => POE::Filter::Stream->new(),
InputEvent => 'fh_read',
FlushedEvent => 'socket_flush',
ErrorEvent => 'fh_error'
);
my $lame = (stat($read_fh))[7];
$self->{sizes}->{$package}->[0] = $lame;
}
else {
warn "permissions error";
}
}
sub fh_read {
my( $self, $record, $wheel_id ) = @_[ OBJECT, ARG0, ARG1 ];
foreach my $package (keys %{$self->{sessions}} ) {
if($self->{sessions}->{$package}->ID == $wheel_id) {
$self->{sessions}->{$package}->put( $record );
# if( defined $self->{sizes}->{$package}->[1]) {
# warn "For $package read ".length($record)." of "
..$self->{sizes}->{$package}->[1] . "\n";
# }
$self->{sizes}->{$package}->[1] += length ($record);
last;
}
}
}
sub fh_error {
my( $self, $op, $errnum, $errstr, $wheel_id ) = @_[ OBJECT, ARG0, ARG1,
ARG2, ARG3 ];
if( $errnum == 0 ) {
foreach my $package (keys %{$self->{sessions}} ) {
if($self->{sessions}->{$package}->ID == $wheel_id) {
delete $self->{sessions}->{$package};
warn "$package : $self->{sizes}->{$package}->[0] vs
$self->{sizes}->{$package}->[1]\n";
if($self->{sizes}->{$package}->[0] !=
$self->{sizes}->{$package}->[1]) {
warn "copy error on $package\n";
}
last;
}
}
}
return;
}
sub socket_flush {
# do nothing
}
sub _stop {
}
package main;
use POE::Session;
test->new();
$poe_kernel->run();
exit;