On Mon, Apr 27, 2009 at 1:32 PM, David Davis <david.da...@gmail.com> wrote:
I'd like to take a look at PoCo-EasyDBI-Multiplex.
Btw, I'm the EasyDBI author, so you can direct any questions about it to the
list.
David Davis
Cool. Here it is.
$ cat POE-Component-EasyDBI-Multiplex/lib/POE/Component/EasyDBI/Multiplex.pm
package POE::Component::EasyDBI::Multiplex;
use strict;
# Compile-time switches, read from the environment when the module is loaded.
# DEBUG is handed to the POE session as its "trace" option in spawn().
use constant DEBUG => $ENV{DEBUG_EASYDBI_POOL};
# Optional log file paths; the matching log is only opened and written
# when the value is true.
use constant SQL_QUERY_TIMES_LOGFILE => $ENV{SQL_QUERY_TIMES_LOGFILE};
use constant SQL_QUEUE_SIZE_LOGFILE => $ENV{SQL_QUEUE_SIZE_LOGFILE};
# Pool size used by spawn() when no true "poolsize" parameter is supplied.
use constant DEFAULT_POOL_SIZE => 1;
# Delay passed to POE's delay() for the recurring "timeout" event that
# logs the queue size.
use constant GET_QUEUE_SIZE_TIMEOUT => 60;
use POE::Kernel;
use POE::Session;
use POE::Component::EasyDBI;
use Time::HiRes ();
use FileHandle;
# File-scoped log handles: set by the _open_*_log_fh helpers and closed in
# the END block.
my $query_time_log_fh;
my $sql_queue_size_log_fh;
# Values written by the previous _timeout run, kept so the next run can
# report the change since then.
my $previous_sql_req_queue_size = 0;
my $previous_dbi_writers_available = 0;
# At interpreter exit, close whichever log handles were enabled and opened.
END {
    close $query_time_log_fh
        if SQL_QUERY_TIMES_LOGFILE && $query_time_log_fh;

    close $sql_queue_size_log_fh
        if SQL_QUEUE_SIZE_LOGFILE && $sql_queue_size_log_fh;
}
# spawn( %dbi_params )
#
# Class method. Creates the multiplexer POE session and returns whatever
# POE::Session->create returns.
#
# %dbi_params is stored on the session heap and later passed to each
# POE::Component::EasyDBI->spawn call made in _start. The "poolsize" key is
# removed from it and used as the number of pooled writers; a missing or
# false value falls back to DEFAULT_POOL_SIZE.
sub spawn {
    my ( $package, %dbi_params ) = @_;

    # The log files are opened here, before the session is created
    # (the original author notes this keeps the handles from being closed
    # when spawning occurs).
    if (SQL_QUERY_TIMES_LOGFILE) {
        _open_query_time_log_fh();
    }
    if (SQL_QUEUE_SIZE_LOGFILE) {
        _open_sql_queue_size_log_fh();
    }

    my $pool_size = delete $dbi_params{poolsize} || DEFAULT_POOL_SIZE;

    # Every query-style event funnels into the same queueing handler; the
    # event name itself is remembered there and replayed to the writer.
    my %handlers = (
        _start                => \&_start,
        queue_sql_req         => \&_queue_sql_req,
        do                    => \&_queue_sql_req,
        arrayhash             => \&_queue_sql_req,
        single                => \&_queue_sql_req,
        hash                  => \&_queue_sql_req,
        process_sql_req_queue => \&_process_sql_req_queue,
        db_response           => \&_db_response,
        sql_req_queue_size    => sub {
            scalar @{ $_[HEAP]->{sql_req_queue} };
        },
        dbi_writers_available => sub {
            scalar @{ $_[HEAP]->{dbi_writers} };
        },
        timeout  => \&_timeout,
        shutdown => \&_shutdown,
    );

    my %initial_heap = (
        dbi_params    => \%dbi_params,
        poolsize      => $pool_size,
        dbi_writers   => [],
        sql_req_queue => [],
    );

    return POE::Session->create(
        inline_states => \%handlers,
        heap          => \%initial_heap,
        options       => { trace => DEBUG },
    );
}
# _open_query_time_log_fh()
#
# Opens the SQL query-time log for appending and stores the handle in the
# file-scoped $query_time_log_fh. Does nothing when SQL_QUERY_TIMES_LOGFILE
# is false. Dies, including the OS error, if the file cannot be opened.
sub _open_query_time_log_fh {
    return unless SQL_QUERY_TIMES_LOGFILE;

    my $fh = FileHandle->new;

    # Three-argument open: the path is never parsed for mode characters,
    # unlike the two-argument ">>" . $path form.
    open( $fh, '>>', SQL_QUERY_TIMES_LOGFILE )
        or die "Cannot write to sql query time log file : "
             . SQL_QUERY_TIMES_LOGFILE . ": $!";

    $query_time_log_fh = $fh;
    return;
}
# _open_sql_queue_size_log_fh()
#
# Opens the SQL queue-size log for appending, turns on autoflush, and stores
# the handle in the file-scoped $sql_queue_size_log_fh. Does nothing when
# SQL_QUEUE_SIZE_LOGFILE is false. Dies, including the OS error, if the file
# cannot be opened.
sub _open_sql_queue_size_log_fh {
    return unless SQL_QUEUE_SIZE_LOGFILE;

    my $fh = FileHandle->new;

    # Three-argument open: the path is never parsed for mode characters,
    # unlike the two-argument ">>" . $path form.
    open( $fh, '>>', SQL_QUEUE_SIZE_LOGFILE )
        or die "Cannot write to sql queue size log file : "
             . SQL_QUEUE_SIZE_LOGFILE . ": $!";

    $fh->autoflush();
    $sql_queue_size_log_fh = $fh;
    return;
}
# _start handler: registers the session alias (if one was given), spawns
# the pool of EasyDBI writers, and arms the queue-size logging timer when
# that log is enabled and open.
sub _start {
    my ( $kernel, $heap ) = @_[ KERNEL, HEAP ];
    my $params = $heap->{dbi_params};

    # The alias names this multiplexer session, so it is taken out of the
    # parameters before they are handed to the writers.
    if ( $params->{alias} ) {
        $kernel->alias_set( delete $params->{alias} );
    }

    # Build the pool: one EasyDBI component per slot, remembered by
    # session ID.
    for my $slot ( 1 .. $heap->{poolsize} ) {
        my $writer = POE::Component::EasyDBI->spawn(
            %{$params},
            alias => 'not_used',
        );
        push @{ $heap->{dbi_writers} }, $writer->ID;
    }

    if ( SQL_QUEUE_SIZE_LOGFILE && $sql_queue_size_log_fh ) {
        $kernel->delay( 'timeout', GET_QUEUE_SIZE_TIMEOUT );
    }
}
# shutdown handler: stops the periodic queue-size timer and sends
# "shutdown" to every writer currently sitting idle in the pool.
#
# NOTE(review): writers that are busy with a query are not in
# $heap->{dbi_writers} at this point and are therefore not shut down here.
sub _shutdown {
    my ( $kernel, $heap ) = @_[ KERNEL, HEAP ];

    # Calling delay() with no duration clears the pending "timeout" timer.
    # Without this the timer armed in _start / re-armed in _timeout stays
    # scheduled forever and keeps this session alive after shutdown.
    $kernel->delay('timeout');

    while ( my $dbi_writer = pop @{ $heap->{dbi_writers} } ) {
        $kernel->call( $dbi_writer, 'shutdown' );
    }

    return;
}
# Handler shared by the queue_sql_req / do / arrayhash / single / hash
# events: tags the request with routing information and queues it.
#
# ARG0 is the request hash reference. It is modified in place:
#   _return_session_id  ID of the sending session
#   _return_event       the caller's "event" value (removed from "event")
#   _state              name of the event that invoked this handler
#   _sql_start_time     high-resolution time at which it was queued
# A process_sql_req_queue event is then yielded to dispatch the request.
sub _queue_sql_req {
    my ( $kernel, $heap, $sql_req ) = @_[ KERNEL, HEAP, ARG0 ];

    $sql_req->{_return_session_id} = $_[SENDER]->ID;
    $sql_req->{_return_event}      = delete $sql_req->{event};
    $sql_req->{_state}             = $_[STATE];
    $sql_req->{_sql_start_time}    = Time::HiRes::time();

    push @{ $heap->{sql_req_queue} }, $sql_req;

    $kernel->yield('process_sql_req_queue');
}
# process_sql_req_queue handler: if a request is waiting and a writer is
# idle, hands the most recently queued request to that writer.
sub _process_sql_req_queue {
    my $heap    = $_[HEAP];
    my $waiting = $heap->{sql_req_queue};

    # A writer is only taken out of the pool when there is work for it.
    if ( @{$waiting} && ( my $writer_id = pop @{ $heap->{dbi_writers} } ) ) {

        # Requests are taken from the end of the list (LIFO), not the
        # front (FIFO).
        my $request = pop @{$waiting};

        # Replay the request under its original event name, but have the
        # writer answer to this session's db_response handler.
        my %forwarded = ( %{$request}, event => 'db_response' );

        POE::Kernel->call( $writer_id, $request->{_state}, \%forwarded );
    }
}
# db_response handler: receives a writer's result (ARG0), returns the
# writer to the pool, records the query duration, optionally logs it,
# forwards the result to the session/event saved when the request was
# queued, and triggers another dispatch if requests are still waiting.
sub _db_response {
    my ( $kernel, $heap, $result ) = @_[ KERNEL, HEAP, ARG0 ];

    # The sending writer is free again now that its result has arrived.
    unshift @{ $heap->{dbi_writers} }, $_[SENDER]->ID;

    my $finished_at = Time::HiRes::time();
    my $started_at  = delete $result->{_sql_start_time};
    $result->{sql_query_duration} = $finished_at - $started_at;

    if ( SQL_QUERY_TIMES_LOGFILE && $query_time_log_fh ) {
        my @bound = $result->{placeholders}
            ? @{ $result->{placeholders} }
            : ();

        # Undefined placeholder values are shown as NULL in the log.
        my $bound_text = join ", ",
            map { defined($_) ? $_ : 'NULL' } @bound;

        my $log_line = sprintf(
            "[%s] (%2.4fs) %s (%s)\n",
            scalar localtime(time),
            $result->{sql_query_duration},
            $result->{sql},
            $bound_text,
        );
        print {$query_time_log_fh} $log_line;
    }

    # Strip the routing keys before handing the result to the requester.
    my $reply_session = delete $result->{_return_session_id};
    my $reply_event   = delete $result->{_return_event};
    $kernel->post( $reply_session, $reply_event, $result );

    # Keep draining the queue while requests remain.
    if ( @{ $heap->{sql_req_queue} } ) {
        $kernel->yield("process_sql_req_queue");
    }

    return;
}
# timeout handler: when the queue-size log is enabled and open, writes one
# line with the current request-queue length and number of idle writers,
# each with its change since the previous run, then re-arms itself to fire
# again after GET_QUEUE_SIZE_TIMEOUT.
sub _timeout {
    my ( $kernel, $session ) = @_[ KERNEL, SESSION ];

    if ( SQL_QUEUE_SIZE_LOGFILE && $sql_queue_size_log_fh ) {
        my $sql_req_queue_size =
            $kernel->call( $session, 'sql_req_queue_size' );
        my $dbi_writers_available =
            $kernel->call( $session, 'dbi_writers_available' );

        # call() may yield undef; log that as zero.
        $sql_req_queue_size    = 0 unless defined $sql_req_queue_size;
        $dbi_writers_available = 0 unless defined $dbi_writers_available;

        # The format is kept on a single line so each record is one log
        # line; a line break inside the literal would split every record.
        print {$sql_queue_size_log_fh} sprintf(
            "[%s] SQL Queue Size: %s (Change: %s) DBI Writers Available: %s (Change: %s)\n",
            scalar localtime(time),
            $sql_req_queue_size,
            $sql_req_queue_size - $previous_sql_req_queue_size,
            $dbi_writers_available,
            $dbi_writers_available - $previous_dbi_writers_available,
        );

        $previous_sql_req_queue_size    = $sql_req_queue_size;
        $previous_dbi_writers_available = $dbi_writers_available;
    }

    $kernel->delay( 'timeout', GET_QUEUE_SIZE_TIMEOUT );
}
1;