On Mon, Apr 27, 2009 at 1:32 PM, David Davis <david.da...@gmail.com> wrote:
> I'd like to take a look at PoCo-EasyDBI-Multiplex.
> Btw, I'm the EasyDBI author, so you can direct any questions about it to the
> list.
>
> David Davis

Cool. Here it is.


$ cat POE-Component-EasyDBI-Multiplex/lib/POE/Component/EasyDBI/Multiplex.pm

package POE::Component::EasyDBI::Multiplex;

use strict;

use constant DEBUG => $ENV{DEBUG_EASYDBI_POOL};
use constant SQL_QUERY_TIMES_LOGFILE => $ENV{SQL_QUERY_TIMES_LOGFILE};
use constant SQL_QUEUE_SIZE_LOGFILE  => $ENV{SQL_QUEUE_SIZE_LOGFILE};

use constant DEFAULT_POOL_SIZE => 1;
use constant GET_QUEUE_SIZE_TIMEOUT => 60;

use POE::Kernel;
use POE::Session;
use POE::Component::EasyDBI;
use Time::HiRes ();
use FileHandle;

my $query_time_log_fh;
my $sql_queue_size_log_fh;

my $previous_sql_req_queue_size = 0;
my $previous_dbi_writers_available = 0;

END {
    if ( SQL_QUERY_TIMES_LOGFILE ) {
        close $query_time_log_fh
            if $query_time_log_fh;
    }

    if ( SQL_QUEUE_SIZE_LOGFILE ) {
        close $sql_queue_size_log_fh
            if $sql_queue_size_log_fh;
    }
}

sub spawn {

    my ($package, %dbi_params) = @_;

    # open file here, so the file handle isn't closed when spawning occurs
    _open_query_time_log_fh()
        if SQL_QUERY_TIMES_LOGFILE;

    _open_sql_queue_size_log_fh()
        if SQL_QUEUE_SIZE_LOGFILE;

    POE::Session->create(
        inline_states => {
            _start                => \&_start,
            queue_sql_req         => \&_queue_sql_req,
            do                    => \&_queue_sql_req,
            arrayhash             => \&_queue_sql_req,
            single                => \&_queue_sql_req,
            hash                  => \&_queue_sql_req,
            process_sql_req_queue => \&_process_sql_req_queue,
            db_response           => \&_db_response,
            sql_req_queue_size    => sub { scalar @{
$_[HEAP]->{sql_req_queue} } },
            dbi_writers_available => sub { scalar @{
$_[HEAP]->{dbi_writers} } },
            timeout               => \&_timeout,
            shutdown              => \&_shutdown,
        },
        heap => {
            dbi_params     => \%dbi_params,
            poolsize       => delete $dbi_params{poolsize} || DEFAULT_POOL_SIZE,

            dbi_writers    => [],
            sql_req_queue  => [],
        },
        options => {
            trace => DEBUG,
        },
    );
}

sub _open_query_time_log_fh {

    if ( SQL_QUERY_TIMES_LOGFILE ) {
        $query_time_log_fh = new FileHandle;
        open $query_time_log_fh, ">>".SQL_QUERY_TIMES_LOGFILE
            or die "Cannot write to sql query time log file :
".SQL_QUERY_TIMES_LOGFILE;
    }
}

sub _open_sql_queue_size_log_fh {

    if ( SQL_QUEUE_SIZE_LOGFILE ) {
        $sql_queue_size_log_fh = new FileHandle;
        open $sql_queue_size_log_fh, ">>".SQL_QUEUE_SIZE_LOGFILE
            or die "Cannot write to sql queue size log file :
".SQL_QUEUE_SIZE_LOGFILE;

        $sql_queue_size_log_fh->autoflush();
    }
}

sub _start {

    my $heap = $_[HEAP];

    # Set an alias for the current session
    $_[KERNEL]->alias_set( delete $heap->{dbi_params}->{alias} )
        if $heap->{dbi_params}->{alias};

    # create a Pool consisting of a configurable number of DBIWriter's
    for ( 1 .. $heap->{poolsize} ) {

        push @{ $heap->{dbi_writers} },
            POE::Component::EasyDBI->spawn(
                %{ $heap->{dbi_params} },
                alias => 'not_used',
            )->ID;
    }

    if (SQL_QUEUE_SIZE_LOGFILE && $sql_queue_size_log_fh) {
        POE::Kernel->delay('timeout', GET_QUEUE_SIZE_TIMEOUT);
    }
}

sub _shutdown {
    my $heap = $_[HEAP];

    while ( my $dbi_writer = pop @{ $heap->{dbi_writers} } ) {

        POE::Kernel->call( $dbi_writer, "shutdown" );
    }
}

sub _queue_sql_req {

    my ($heap, $sql_req) = @_[HEAP,ARG0];

    $sql_req->{_return_session_id} = $_[SENDER]->ID;
    $sql_req->{_return_event}      = delete $sql_req->{event},
    $sql_req->{_state}             = $_[STATE];
    $sql_req->{_sql_start_time}    = Time::HiRes::time();

    push @{ $_[HEAP]->{sql_req_queue} }, $sql_req;

    $_[KERNEL]->yield( "process_sql_req_queue" );
}

sub _process_sql_req_queue {

    my $heap = $_[HEAP];

    # try to get a dbi writer and insert a logline into the db
    if ( @{ $heap->{sql_req_queue} }
        && ( my $dbi_session_id = pop @{ $heap->{dbi_writers} } ) ) {

        # using LIFO stack rather than FIFO queue
        #my $sql_req = shift @{ $heap->{sql_req_queue} };
        my $sql_req = pop @{ $heap->{sql_req_queue} };

        POE::Kernel->call(
            $dbi_session_id,
            $sql_req->{_state} => {
                %$sql_req,
                event        => 'db_response',
            },
        )
    }
}

sub _db_response {

    my ($kernel,$heap,$result) = @_[KERNEL,HEAP,ARG0];

    # mark the writer as free now the result has been returned
    unshift @{ $heap->{dbi_writers} }, $_[SENDER]->ID;

    $result->{sql_query_duration} = Time::HiRes::time() - delete (
$result->{_sql_start_time} );

    if ( SQL_QUERY_TIMES_LOGFILE && $query_time_log_fh ) {

        print $query_time_log_fh
            sprintf ("[%s] (%2.4fs) %s (%s)\n",
                "".localtime(time),
                $result->{sql_query_duration} ,
                $result->{sql},
                join(", ", map { defined($_) ? $_ : 'NULL' }
$result->{placeholders} ? @{ $result->{placeholders} } : () )
            );
    }

    $_[KERNEL]->post(
        delete $result->{_return_session_id},
        delete $result->{_return_event},
        $result,
    );

    # If the queue is not empty write to the database
    $kernel->yield( "process_sql_req_queue" )
        if @{ $heap->{sql_req_queue} };

    return;
}

sub _timeout {

    my ($kernel,$session) = @_[KERNEL,SESSION];

    if ( SQL_QUEUE_SIZE_LOGFILE && $sql_queue_size_log_fh ) {

        my $sql_req_queue_size = $kernel->call($session,'sql_req_queue_size');
        my $dbi_writers_available =
$kernel->call($session,'dbi_writers_available');

        $sql_req_queue_size = 0 unless defined $sql_req_queue_size;
        $dbi_writers_available= 0 unless defined $dbi_writers_available;

        print $sql_queue_size_log_fh
            sprintf ("[%s] SQL Queue Size: %s (Change: %s)   DBI
Writers Available: %s (Change: %s)\n",
                "".localtime(time),
                $sql_req_queue_size,
                $sql_req_queue_size - $previous_sql_req_queue_size,
                $dbi_writers_available,
                $dbi_writers_available - $previous_dbi_writers_available
             );

         $previous_sql_req_queue_size = $sql_req_queue_size;
         $previous_dbi_writers_available = $dbi_writers_available;
     }

    POE::Kernel->delay('timeout', GET_QUEUE_SIZE_TIMEOUT);
}

1;

Reply via email to