Here is a new module for use with POE
package POE::Filter::SyncStream;

use strict;
use warnings 'all';

use constant OBJ_STATE   => 0;
use constant OBJ_LENGTH  => 1;
use constant OBJ_PENDING => 2;

use constant STATE_FIND_LENGTH   => 0x00;
use constant STATE_READ_DATA     => 0x01;
use constant STATE_RESYNCHRONIZE => 0x02;

#------------------------------------------------------------------------------

sub new {
  my $type = shift;
  my $self = bless [STATE_FIND_LENGTH, 0, ''], $type;
  $self;
}

#------------------------------------------------------------------------------

sub get {
  my ($self, $stream) = @_;
  my $buffer = $self->[OBJ_PENDING] . join('', @$stream);
  my $ret = [];
  LOOP: for (;;) {
        if     ( $self->[OBJ_STATE] == STATE_FIND_LENGTH   ) {
                my $ready_for_length = $buffer =~ /[^0-9A-F]/s; # it contains non 
digits
                last LOOP if ( not $ready_for_length ); # nothing to do yet
                my ($length) = $buffer =~ /^([0-9A-F]+):/s;
                if ( not $length ) { # oops! bad format in data
                        $self->[OBJ_STATE] = STATE_RESYNCHRONIZE;
                        next LOOP;
                }
                $buffer =~ s/^[0-9A-F]+:(.*)$/$1/s; # strip length
                $self->[OBJ_LENGTH] = hex($length);
                $self->[OBJ_STATE] = STATE_READ_DATA;
                next LOOP;
        } elsif( $self->[OBJ_STATE] == STATE_READ_DATA     ) {
                my $length = $self->[OBJ_LENGTH];
                last LOOP if length($buffer) <= $length; # nothing to do yet
                if ( substr($buffer, $length, 1) ne "\x00") { # oops! bad format in 
data
                        $self->[OBJ_STATE] = STATE_RESYNCHRONIZE;
                        next LOOP;
                }
                my $data = substr($buffer, 0, ($length + 1), ''); # slurp and remove
                chop $data; # off comes the "\x00"
                $data =~ s/\\0/\x00/sg;
                $data =~ s/\\\\/\\/sg;
                push @$ret, $data;
                $self->[OBJ_STATE] = STATE_FIND_LENGTH;
                next LOOP;
        } elsif( $self->[OBJ_STATE] == STATE_RESYNCHRONIZE ) {
                last LOOP if ( not $buffer =~ /\x00/s ); # nothing to do yet
                $buffer =~ s/^.*\x00(.*)$/$1/s; # resynchronize at the zero byte
                $self->[OBJ_STATE] = STATE_FIND_LENGTH; # and try again
                next LOOP;
        } else { die "shouldn't happen!"; }
  }
  $self->[OBJ_PENDING] = $buffer;
  $ret;
}

#------------------------------------------------------------------------------

sub put {
  my ($self, $chunks) = @_;
  my $ret = [];
  foreach my $chunk (@$chunks) {
        $chunk =~ s/\\/\\\\/sg;
        $chunk =~ s/\x00/\\0/sg; 
        push @$ret, sprintf "\%X:\%s\x00", length($chunk), $chunk;
  }
}

#------------------------------------------------------------------------------

sub get_pending {
  my ($self) = @_;
  my $ret = $self->[OBJ_PENDING];
  $self->[OBJ_PENDING] = '';
  $ret;
}

###############################################################################
1;

__END__

=head1 NAME

POE::Filter::SyncStream - convert to and from SyncBlocks

=head1 SYNOPSIS

  $filter = POE::Filter::SyncStream->new();
  $arrayref_of_logical_chunks =
    $filter->get($arrayref_of_raw_chunks_from_driver);
  $arrayref_of_streamable_chunks_for_driver =
     $filter->put($arrayref_of_logical_chunks);

=head1 DESCRIPTION

A SyncStream is a series of SyncBlocks, each of which consists of a
length in uppercase hexadecimal, a colon, a block of data as long as
specified in the length, and a zero byte.

Within that block of data, all backslashes have been doubled, then
all zero bytes have been escaped as backslash zero. Reverse this
process to recover the original block.

The useful feature of this is that should a block be damaged or lost,
the parser can skip ahead to the next zero byte and pick up the
stream at the next valid block. Damaged data is discarded.

CRC or HMAC of the blocks' contents is left as an exercise for the
user.

=head1 SEE ALSO

POE::Filter.

The SEE ALSO section in L<POE> contains a table of contents covering
the entire POE distribution.

=head1 BUGS

Oh, probably some.

=head1 AUTHORS & COPYRIGHTS

This code was written by Julian Morrison <[EMAIL PROTECTED]>

Derived from POE::Filter::Stream and distributed under the same terms.

Please see L<POE> for more information about authors and contributors.

=cut

Reply via email to