On Mon, Apr 27, 2009 at 1:32 PM, David Davis <david.da...@gmail.com> wrote: > I'd like to take a look at PoCo-EasyDBI-Multiplex. > Btw, I'm the EasyDBI author, so you can direct any questions about it to the > list. > > David Davis
Cool. Here it is. $ cat POE-Component-EasyDBI-Multiplex/lib/POE/Component/EasyDBI/Multiplex.pm package POE::Component::EasyDBI::Multiplex; use strict; use constant DEBUG => $ENV{DEBUG_EASYDBI_POOL}; use constant SQL_QUERY_TIMES_LOGFILE => $ENV{SQL_QUERY_TIMES_LOGFILE}; use constant SQL_QUEUE_SIZE_LOGFILE => $ENV{SQL_QUEUE_SIZE_LOGFILE}; use constant DEFAULT_POOL_SIZE => 1; use constant GET_QUEUE_SIZE_TIMEOUT => 60; use POE::Kernel; use POE::Session; use POE::Component::EasyDBI; use Time::HiRes (); use FileHandle; my $query_time_log_fh; my $sql_queue_size_log_fh; my $previous_sql_req_queue_size = 0; my $previous_dbi_writers_available = 0; END { if ( SQL_QUERY_TIMES_LOGFILE ) { close $query_time_log_fh if $query_time_log_fh; } if ( SQL_QUEUE_SIZE_LOGFILE ) { close $sql_queue_size_log_fh if $sql_queue_size_log_fh; } } sub spawn { my ($package, %dbi_params) = @_; # open file here, so the file handle isn't closed when spawning occurs _open_query_time_log_fh() if SQL_QUERY_TIMES_LOGFILE; _open_sql_queue_size_log_fh() if SQL_QUEUE_SIZE_LOGFILE; POE::Session->create( inline_states => { _start => \&_start, queue_sql_req => \&_queue_sql_req, do => \&_queue_sql_req, arrayhash => \&_queue_sql_req, single => \&_queue_sql_req, hash => \&_queue_sql_req, process_sql_req_queue => \&_process_sql_req_queue, db_response => \&_db_response, sql_req_queue_size => sub { scalar @{ $_[HEAP]->{sql_req_queue} } }, dbi_writers_available => sub { scalar @{ $_[HEAP]->{dbi_writers} } }, timeout => \&_timeout, shutdown => \&_shutdown, }, heap => { dbi_params => \%dbi_params, poolsize => delete $dbi_params{poolsize} || DEFAULT_POOL_SIZE, dbi_writers => [], sql_req_queue => [], }, options => { trace => DEBUG, }, ); } sub _open_query_time_log_fh { if ( SQL_QUERY_TIMES_LOGFILE ) { $query_time_log_fh = new FileHandle; open $query_time_log_fh, ">>".SQL_QUERY_TIMES_LOGFILE or die "Cannot write to sql query time log file : ".SQL_QUERY_TIMES_LOGFILE; } } sub _open_sql_queue_size_log_fh { if ( SQL_QUEUE_SIZE_LOGFILE ) { $sql_queue_size_log_fh = new FileHandle; open $sql_queue_size_log_fh, ">>".SQL_QUEUE_SIZE_LOGFILE or die "Cannot write to sql queue size log file : ".SQL_QUEUE_SIZE_LOGFILE; $sql_queue_size_log_fh->autoflush(); } } sub _start { my $heap = $_[HEAP]; # Set an alias for the current session $_[KERNEL]->alias_set( delete $heap->{dbi_params}->{alias} ) if $heap->{dbi_params}->{alias}; # create a Pool consisting of a configurable number of DBIWriter's for ( 1 .. $heap->{poolsize} ) { push @{ $heap->{dbi_writers} }, POE::Component::EasyDBI->spawn( %{ $heap->{dbi_params} }, alias => 'not_used', )->ID; } if (SQL_QUEUE_SIZE_LOGFILE && $sql_queue_size_log_fh) { POE::Kernel->delay('timeout', GET_QUEUE_SIZE_TIMEOUT); } } sub _shutdown { my $heap = $_[HEAP]; while ( my $dbi_writer = pop @{ $heap->{dbi_writers} } ) { POE::Kernel->call( $dbi_writer, "shutdown" ); } } sub _queue_sql_req { my ($heap, $sql_req) = @_[HEAP,ARG0]; $sql_req->{_return_session_id} = $_[SENDER]->ID; $sql_req->{_return_event} = delete $sql_req->{event}, $sql_req->{_state} = $_[STATE]; $sql_req->{_sql_start_time} = Time::HiRes::time(); push @{ $_[HEAP]->{sql_req_queue} }, $sql_req; $_[KERNEL]->yield( "process_sql_req_queue" ); } sub _process_sql_req_queue { my $heap = $_[HEAP]; # try to get a dbi writer and insert a logline into the db if ( @{ $heap->{sql_req_queue} } && ( my $dbi_session_id = pop @{ $heap->{dbi_writers} } ) ) { # using LIFO stack rather than FIFO queue #my $sql_req = shift @{ $heap->{sql_req_queue} }; my $sql_req = pop @{ $heap->{sql_req_queue} }; POE::Kernel->call( $dbi_session_id, $sql_req->{_state} => { %$sql_req, event => 'db_response', }, ) } } sub _db_response { my ($kernel,$heap,$result) = @_[KERNEL,HEAP,ARG0]; # mark the writer as free now the result has been returned unshift @{ $heap->{dbi_writers} }, $_[SENDER]->ID; $result->{sql_query_duration} = Time::HiRes::time() - delete ( $result->{_sql_start_time} ); if ( SQL_QUERY_TIMES_LOGFILE && $query_time_log_fh ) { print $query_time_log_fh sprintf ("[%s] (%2.4fs) %s (%s)\n", "".localtime(time), $result->{sql_query_duration} , $result->{sql}, join(", ", map { defined($_) ? $_ : 'NULL' } $result->{placeholders} ? @{ $result->{placeholders} } : () ) ); } $_[KERNEL]->post( delete $result->{_return_session_id}, delete $result->{_return_event}, $result, ); # If the queue is not empty write to the database $kernel->yield( "process_sql_req_queue" ) if @{ $heap->{sql_req_queue} }; return; } sub _timeout { my ($kernel,$session) = @_[KERNEL,SESSION]; if ( SQL_QUEUE_SIZE_LOGFILE && $sql_queue_size_log_fh ) { my $sql_req_queue_size = $kernel->call($session,'sql_req_queue_size'); my $dbi_writers_available = $kernel->call($session,'dbi_writers_available'); $sql_req_queue_size = 0 unless defined $sql_req_queue_size; $dbi_writers_available= 0 unless defined $dbi_writers_available; print $sql_queue_size_log_fh sprintf ("[%s] SQL Queue Size: %s (Change: %s) DBI Writers Available: %s (Change: %s)\n", "".localtime(time), $sql_req_queue_size, $sql_req_queue_size - $previous_sql_req_queue_size, $dbi_writers_available, $dbi_writers_available - $previous_dbi_writers_available ); $previous_sql_req_queue_size = $sql_req_queue_size; $previous_dbi_writers_available = $dbi_writers_available; } POE::Kernel->delay('timeout', GET_QUEUE_SIZE_TIMEOUT); } 1;