package POE::QueryAgent;

=head1 NAME

POE::QueryAgent - POE Component for running asynchronous DBI calls.

=head1 SYNOPSIS

 sub _start {
    my ($self, $kernel, $heap) = @_[OBJECT, KERNEL, HEAP];

    $heap->{helper} = POE::QueryAgent->new( DSN => [$dsn,
					       $username,
					       $password
					      ],
				       Queries => $self->make_queries,
				       Debug => 1,
				     );

	# Queries takes a hashref of the form:
	# { query_name => 'select blah from table where x = ?',
	#   other_query => 'select blah_blah from big_view',
	#   etc.
	# }

    $heap->{helper}->query(query_name => session => 'get_row_from_qa');

 }

 sub get_row_from_qa {
    my ($kernel, $self, $heap, $row) = @_[KERNEL, OBJECT, HEAP, ARG0];
    if ($row ne 'EOF') {

 # {{{ PROCESS A ROW

	#row is a listref of columns

 # }}} PROCESS A ROW

    } else {

 # {{{ NO MORE ROWS

	#cleanup code here

 # }}} NO MORE ROWS

    }

 }


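=head1 DESCRIPTION

POE::QueryAgent runs prepared DBI queries in a separate helper process that is
started through POE::Wheel::Run. The helper connects using the C<DSN> list and
prepares every statement found in C<Queries>.

Call C<query($query_name, $session, $state, @bind_values)> to execute one of
the prepared queries. Each fetched row is delivered to C<$state> of
C<$session> as an array reference in C<ARG0>; once the result set is
exhausted the string C<'EOF'> is delivered instead.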

=cut

# {{{ POE::QueryAgent

#use DBI;
#use Daemon; # qw//;
use Socket qw/:crlf/;
#use Data::Dumper;
use Storable qw/freeze thaw/;
use Carp;

use strict;
use POE qw/Session Filter::Line Wheel::Run/;

# Accessor: returns the object's debug flag (the value stored by new()).
sub debug {
    my ($self) = @_;
    return $self->{debug};
}

#sub carp { warn @_ }
#sub croak { die @_ }

# Constructor.  Validates the named parameters, stores them on the object and
# creates the POE session whose states are methods of this object.
#
# Parameters (name => value pairs):
#   DSN     - array reference; the helper passes its elements to
#             DBI->connect (required)
#   Queries - hash reference of query name => SQL text (required)
#   Debug   - true value enables diagnostics through carp (default 0)
#
# Croaks on an odd number of arguments and on a missing or wrongly typed
# DSN / Queries value.  Unknown parameters only produce a warning.
# Returns the new object.
sub new {
    my $type = shift;

    croak "$type needs an even number of parameters" if @_ & 1;
    my %params = @_;

    my $dsn = delete $params{DSN};
    croak "$type needs a DSN parameter" unless defined $dsn;
    croak "DSN needs to be an array reference" unless ref $dsn eq 'ARRAY';

    my $queries = delete $params{Queries};
    croak "$type needs a Queries parameter" unless defined $queries;
    croak "Queries needs to be a hash reference" unless ref $queries eq 'HASH';

    my $debug = delete($params{Debug}) || 0;

    # Make sure the user didn't pass in parameters we're not aware of.
    if (scalar keys %params) {
        carp( "unknown parameters in $type constructor call: ",
              join(', ', sort keys %params)
            );
    }

    my $self = bless {}, $type;

    $self->{dsn}     = $dsn;
    $self->{queries} = $queries;
    $self->{debug}   = $debug;

    # create() with object_states is the supported replacement for the
    # deprecated POE::Session->new( $object, [ states ] ) form.
    POE::Session->create(
        object_states => [
            $self => [ qw( _start _stop db_reply remote_stderr error ) ],
        ],
    );

    return $self;
}

# Sends a query request to the helper process.
#
#   $query   - name of a prepared query
#   $package - session the rows are dispatched to
#   $state   - state of that session receiving each row
#   @rest    - remaining values, forwarded as extra fields of the request
#
# The request is a single '|'-joined line handed to the helper wheel's put();
# the return value is whatever put() returns.
sub query {
    my $self = shift;
    my ($query, $package, $state, @rest) = @_;

    carp "QA: Running query ($query) for package ($package) to state ($state)\n"
        if $self->debug;

    my $request = join '|', $query, $package, $state, @rest;
    return $self->{helper}->put($request);
}

# {{{ STATES

# POE state: session start-up.  Spawns the database helper through
# POE::Wheel::Run and keeps the wheel in $self->{helper}.
#
# The child runs POE::QueryAgent::Helper->new with the DSN and queries that
# were stored on the object by new().  Its stdout is read as CRLF-terminated
# lines and delivered to 'db_reply', its stderr goes to 'remote_stderr' and
# wheel errors go to 'error'.
sub _start {
    my $self = $_[OBJECT];

    $self->debug && carp __PACKAGE__ . " received _start.\n";

    ## Input and output from the children will be line oriented
    my $helper = POE::Wheel::Run->new(
        Program      => sub {
            POE::QueryAgent::Helper->new($self->{dsn}, $self->{queries});
        },
        StdoutEvent  => 'db_reply',
        StderrEvent  => 'remote_stderr',
        ErrorEvent   => 'error',
        Filter       => POE::Filter::Line->new(),
        StdoutFilter => POE::Filter::Line->new( Literal => CRLF),
    );

    # Without a wheel there is nothing to report on or to store; bail out
    # instead of calling methods on an undefined value below.
    unless ($helper) {
        carp "Can't create new Wheel::Run: $!\n";
        return;
    }

    $self->debug && carp __PACKAGE__, " Started db helper pid ", $helper->PID, " wheel ", $helper->ID, "\n";

    $self->{helper} = $helper;

    return;
}

# POE state: session shutdown.  Kills the helper process, if one was started.
sub _stop {
    my $self = $_[OBJECT];

    # Oracle clients don't like to TERMinate sometimes.
    # The wheel may be missing when _start could not create it.
    my $helper = $self->{helper};
    $helper->kill(9) if defined $helper;

    $self->debug && carp __PACKAGE__ . " has stopped.\n";

    return;
}

# POE state: handles one line written by the helper on its stdout.
#
# A line has the form "session|state|payload".  The payload is either the
# literal string 'EOF' or a row serialized with Storable's freeze().  The
# decoded value ('EOF' or the thawed row) is delivered synchronously to the
# named session and state with $kernel->call().  Lines without a payload and
# payloads that cannot be thawed are reported and dropped.
#
# NOTE(review): a frozen row that happens to contain a CR LF byte pair is
# still cut in two by the CRLF line filter before it gets here; fixing that
# needs an encoding change on both sides of the pipe.
sub db_reply {
    my ($kernel, $self, $input) = @_[KERNEL, OBJECT, ARG0];

    # Split into at most three fields: the frozen payload is binary and may
    # itself contain '|' bytes, which must stay part of the payload.
    my ($package, $state, $rest) = split /\|/, $input, 3;

    unless (defined $rest) {
        $self->debug && carp "QA: Empty input value.\n";
        return;
    }

    my $obj;

    if ($rest eq 'EOF') {
        $self->debug && carp "QA: Got EOF\n";
        $obj = $rest;
    }
    else {
        eval { $obj = thaw($rest) };
        my $err = $@;    # capture before anything else can reset $@

        unless (defined $obj) {
            if ($err) {
                warn "QA: Data error: $err\n";

                # Append the offending line to a dump file for inspection.
                if (open my $dump, '>>', '/tmp/TEST.txt') {
                    print {$dump} "input: $input\n",
                                  "package: $package\n",
                                  "state: $state\n",
                                  "rest: $rest\n",
                                  "=====\n";
                    close $dump;
                }
            }
            else {
                $self->debug && carp "QA: Undefined error... corrupt input line?\n";
            }
            return;
        }
    }

    $kernel->call($package => $state => $obj);

    return;
}

# POE state: receives one line the helper wrote to its stderr.
#
# This state is registered as the wheel's StderrEvent, which delivers the
# line in ARG0 and the wheel id in ARG1 (there is no error number or error
# string as there is for ErrorEvent).  The line is only reported in debug
# mode.
sub remote_stderr {
    my ($self, $line, $wheel_id) = @_[OBJECT, ARG0, ARG1];

    $self->debug && carp "QA: helper stderr (wheel $wheel_id): $line\n";

    return;
}

# POE state: registered as the helper wheel's ErrorEvent.  Reports the failed
# operation, error number, error string and wheel id, but only in debug mode.
sub error {
    my $self = $_[OBJECT];
    my ($operation, $errnum, $errstr, $wheel_id) = @_[ARG0 .. ARG3];

    return $self->debug
        && carp "error: Wheel $wheel_id generated $operation error $errnum: $errstr\n";
}

# }}} STATES

# }}} POE::QueryAgent

package POE::QueryAgent::Helper;

# {{{ POE::QueryAgent::Helper

use DBI;
#use Daemon; # qw//;
use Socket qw/:crlf/;
use Data::Dumper;
use Storable qw/freeze thaw/;
use URI::Escape;
use MIME::QuotedPrint;

use strict;

my $debug = 0;

# Main routine of the helper process (it is run as the Program of the
# parent's Wheel::Run).
#
# Connects to the database and prepares the queries via init_dbi(), then
# reads CRLF-terminated request lines of the form
#     query_name|session|state|bind_value|...
# from STDIN until end of input or a line consisting of "EXIT".  For every
# request the named statement is executed with the bind values and each row
# is written to STDOUT as "session|state|<frozen row>" followed by CRLF; the
# result set is terminated by "session|state|EOF".  Lines whose first field
# is "CREATE" are ignored.
#
# Requests that name no prepared statement and statements that fail to
# execute are reported on STDERR and produce no reply line at all.
# NOTE(review): the requesting state therefore never sees an EOF for a
# failed request -- confirm that callers can cope with that.
sub new {

    $debug && warn "  QA: start\n";

    my ($type, $dsn, $queries) = @_;

    my $self = bless {}, $type;
    $self->init_dbi($dsn, $queries);

    $| = 1;

    # Input record separator uses Network Newlines (so we know what to chomp);
    $/ = CRLF;

    $self->{dbh}->{RaiseError} = 0;
    $self->{dbh}->{PrintError} = 0;

    $debug && warn "  QA: initialized\n";

    while ( <STDIN> ) {
        chomp;

        $debug && warn "  QA: Got line: $_\n";

        last if /^EXIT$/;	# allow parent to tell us to exit

        my( $id, $package, $state, @rest ) = split( /\|/, $_ );
        $id = '' unless defined $id;    # empty request line

        next if $id eq 'CREATE';

        $debug && warn "  QA: Read line: $id for $state (params @rest)\n";

        # The object hash also holds the database handle under 'dbh', so an
        # existing key is not enough: the slot must hold something that can
        # be executed.  Report the problem even when $debug is off, otherwise
        # a mistyped query name disappears without a trace.
        my $sth = $self->{$id};
        unless (ref $sth && eval { $sth->can('execute') }) {
            warn "  QA: No such query: $id\n";
            next;
        }
        $debug && warn "  QA: query $id exists\n";

        unless ( $sth->execute( @rest ) ) {
            warn "  QA: error executing query: ", $sth->errstr, "\n";
            next;
        }
        $debug && warn "  QA: query running\n";

        my $rowcount = 0;

        # Statements that return no rows are not Active after execute().
        if ($sth->{Active}) {
            while (defined (my $row = $sth->fetchrow_arrayref)) {

                $rowcount++;

                # Serialize via Storable: We use CRLF because Storable
                # uses \n in its streams, so we need to use \r\n for
                # end-of-line.
                print "$package|$state|", freeze($row), CRLF;
            }
        }
        print "$package|$state|EOF", CRLF;
        $debug && warn "  QA: ROWS|$rowcount\n";
    }

    $self->{dbh}->disconnect;

}


# {{{ init_dbi

# Connects to the database and prepares the named queries.
#
#   $heap    - hash-based object that receives the handles: the database
#              handle is stored under 'dbh' and every prepared statement
#              under its query name
#   $dsn     - array reference whose elements are passed to DBI->connect
#   $queries - optional hash reference of query name => SQL text
#
# Dies when the connection or any prepare fails.
#
# NOTE: statements share the hash with the 'dbh' slot, so a query that is
# itself named 'dbh' replaces the stored database handle.
sub init_dbi {
    my ($heap, $dsn, $queries) = @_;

    # $DBI::errstr is the documented place to find a connect() error.
    my $dbh = DBI->connect(@$dsn) or die $DBI::errstr;
    $heap->{dbh} = $dbh;

    $dbh->{AutoCommit} = 1;
    $dbh->{RaiseError} = 0;

    return unless defined $queries;

    foreach my $name (keys %$queries) {
        $heap->{$name} = $dbh->prepare($queries->{$name})
            or die "Can't prepare query '$name': ", $dbh->errstr;
    }

    return;
}

# }}} init_dbi

# }}} POE::QueryAgent::Helper

1;



__END__

