Hi,

   Currently, I am trying to implement an ithread-based DBI connection
pool for an application I am developing for a client.  I have verified
that the DBD::Oracle driver I am using is ithread safe by properly sharing
the ora_dbh_share parameter(see
http://search.cpan.org/~timb/DBD-Oracle-1.15/Oracle.pm#Connect_Attributes)
within all threads that will be accessing the database.

   The issues I am running into consist of two parts, but appear to be
related to a single parent problem: shared variables and blessed object
references.  First, I am attempting to use Thread::Synchronized to insure
that my database pool control methods (add to, subtract from, and monitor
pool) can only be called by one thread at a time (so I don't have to worry
about sharing semaphores to do this).  Secondly, in tune to the current
inherent limitations of threads::shared, I am attempting to share only
specific hashrefs and arrayrefs of my class fields that are cricital to be
accessable/modifiable by all threads.  With a non-object (e.g. blessed)
hashref, I have been able to prove the concept that, given 
     my $hash = {1=>0}
I can set
     share($hash{1});
     $hash{1} = 0;
Then, within any thread, do
     {
     lock($hash{1});
     $hash{1}++;
     }
and the lock will be properly respected and the variable shared
appropriatly incremented globally.

   The issue I am running into is: 'lock can only be used on shared values
at ...' at any place where I am attempting to either declare a
synchronized method (i.e. 'sub foo : synchronized method  { ... }') or
lock a variable I shared earlier using 'shared($self->{foo})'.  As you
will see in the code, I have followed the design assuming that share()
will clobber any pre-exisiting data in the specified data structure.

   In threads::shared POD, I saw the following in the BUGS section:
"share() allows you to share $hashref->{key} without giving any error
message. But the $hashref->{key} is not shared, causing the error "locking
can only be used on shared values" to occur when you attempt to lock
$hasref->{key}."  Thus, I assume I'm currently running into some variant
of this issue when trying to access fields I shared within a blessed
object.

   Any insight or suggestions would be greatly appreciated.  I'm pretty
sure I can fall back to flattening out the contents of this class into the
main calling program (.pl) to subvert these issue, but it would be a very
messy implementation IMO.

   The code is just a work in progress, but is enough to demonstrate the
issues at hand.


##############################################################
# DBIPool.pm
##############################################################

require 5.008;
package DBIPool;
use strict;
use warnings;
use Carp;
use vars qw($debug);
use threads;
use threads::shared;
use Thread::Queue;
use Thread::Synchronized;
use DBI;
$debug = 0;

### CLASS FIELDS
my (%const, %var, %private);
%const = (
        'param_default' => {
                'dbi_params'            => undef
                ,'dbi_db'                       => 'usadbd02'
                ,'dbi_user'                     => 'rerix_engine'
                ,'dbi_pass'                     => 'rerix_engine'
                ,'max_connect'          => 12
                ,'min_connect'          => 4
                ,'keep_alive'           => 120  #seconds between keepalive signals for 
min connections
                ,'timeout_extra'        => 120  #seconds before extra connections 
(beyond min)
disconnect
                ,'timeout'                      => 300  #seconds before warning given 
for unresponsive
connection (not returned to pool)
        }
        ,'param' => {
                'dbi_params'            => undef
                ,'dbi_db'                       => undef
                ,'dbi_user'                     => undef
                ,'dbi_pass'                     => undef
                ,'max_connect'          => undef
                ,'min_connect'          => undef
                ,'keep_alive'           => 120
                ,'timeout_extra'        => 120
                ,'timeout'                      => 300
        }
        ,'max_connect_total'    => 32   #max allowed at any time
);
%var = ();
%private = (
        'dbi_str_prefix'        => 'DBI:Oracle:'
        ,'rs_pool'                      => [('') x $const{max_connect_total}]
        #elements:$ora_dbh_share_handle (must be initialized to empty string)
        ,'rs_avail'                     => Thread::Queue->new   #elements:pool idx
        ,'rs_avail_num'         => 0
        ,'rs_inuse'                     => []   #elements:in use start ts (epoch sec) 
(idx corresponds to
pool idx)
        ,'rs_inuse_num'         => 0
        ,'rs_monitor_tid'       => undef
        ,'rs_monitor_alive'     => 0
        ,'rs_monitor_sleep'     => 2    #seconds between monitor wakeup requests
);

########################################################################
sub new (\%)
{
        my $proto = shift;
        my %param = @_;
        
        ### input params ###
        $const{param}->{max_connect} = ($param{max_connect} =~ m/\D/ ||
$param{max_connect} == 0)? 
                $const{param_default}->{max_connect} : $param{max_connect} >
$const{max_connect_total} ? 
                        $const{max_connect_total} : $param{max_connect};
        $const{param}->{min_connect} = ($param{min_connect} =~ m/\D/ ||
$param{min_connect} == 0)? 
                $const{param_default}->{min_connect} : $param{min_connect} >
$const{param}->{max_connect} ? 
                        $const{param}->{max_connect} : $param{min_connect};
        $const{param}->{dbi_params} = (defined $param{dbi_params} && ref
$param{dbi_params} eq 'HASH') ?
                $param{dbi_params} : $const{param_default}->{dbi_params};
        undef $const{param}->{dbi_params}->{ora_dbh_share};     #explicitly undefine
ora_dbh_share so DBD::Oracle isn't confused
        $const{param}->{dbi_db} = defined $param{dbi_db} ? $param{dbi_db} :
$const{param_default}->{dbi_db};
        $const{param}->{dbi_user} = defined $param{dbi_user} ? $param{dbi_user} :
$const{param_default}->{dbi_user};
        $const{param}->{dbi_pass} = defined $param{dbi_pass} ? $param{dbi_pass} :
$const{param_default}->{dbi_pass};
                
        ### create object ###
        my $class = ref($proto) || $proto;
        my $self = {    #fields
                %const
                ,%var
                ,%private
                ,'_final'               => \%const              #constant class fields
                ,'_permitted'   => \%var                #public class variables
                ,'_private'             => \%private    #private class variables
        };
        
        bless ($self, $class);

        ### declare shared variables ###
        share($self->{rs_pool});                        $self->{rs_pool} = 
$private{rs_pool};   #clumsy but
necessary due to threads::shared limitations
        share($self->{rs_avail});                       $self->{rs_avail} = 
$private{rs_avail};
        share($self->{rs_avail_num});           $self->{rs_avail_num} = 
$private{rs_avail_num};
        share($self->{rs_inuse});                       $self->{rs_inuse} = 
$private{rs_inuse};
        share($self->{rs_inuse_num});           $self->{rs_inuse_num} = 
$private{rs_inuse_num};
        share($self->{rs_monitor_tid});         $self->{rs_monitor_tid} =
$private{rs_monitor_tid};
        share($self->{rs_monitor_alive});       $self->{rs_monitor_alive} =
$private{rs_monitor_alive};

        ### create initial pool ###
        my $connect_success = $self->_inc_pool() x
$self->{param}->{min_connect};  #Database.pm reports errors
        if ($connect_success =~ m/0/)
        {
                my $monitor_thr = threads->new($self->_pool_monitor());
                $self->{rs_monitor_tid} = $monitor_thr->tid;
        }

        return $connect_success =~ m/0/o ? $self : undef;
}

sub DESTROY
{
        my $self = shift;
        $self->{rs_monitor_alive} = 0;  #terminate resource monitor (terminates once
it wakes from sleep)
        my $rs_monitor = threads->object($self->{rs_monitor_tid});
        $rs_monitor->detach;
        while (@{$self->{rs_pool}})
        {
                $self->_dec_pool();
        }
}
########################################################################
sub _get_dbh ($)
{
        my $self = shift;
        my $ora_dbh_share = $_[0];

        return undef;
}

sub _pool_monitor () : synchronized method {
        my $self = shift;
        warn "***In pool Monitor\n" if $debug;
        unless ($self->{rs_monitor_alive})
        {
                $self->{rs_monitor_alive} = 1;
                while ($self->{rs_monitor_alive})
                {
                        warn "***Pool monitor awake...\n" if $debug;
                        ### send keepalive ping to each connection ###
                        for (my $i = 0; $i < $self->{rs_avail_num}; $i++)
                        {
                                if (defined $self->{rs_pool}->[$i])
                                {
#                                       my $dbh = _get_dbh($self->{rs_pool}->[$i]);
                                }
                        }
                        
                        ### check for extra sleeping connections that should be closed 
###
                        
                        ### warn on connections that have been in use too long (caller 
thread
may have died?) --debugging ##
                        
                        warn "***Pool monitor going to sleep...\n" if $debug;
                        sleep $self->{rs_monitor_sleep};
                }
        }
        
        return $self->{pool_monitor};
}

sub _inc_pool : synchronized method {
        my $self = shift;
        lock($self->{db_pool});
        lock($self->{db_pool_avail});
        lock($self->{db_pool_inuse});
        if ($self->{db_pool_avail}->pending >
$self->{param}->{max_connect})  #debugging
        {
                die "thread issue: total connections 
".$self->{db_pool_avail}->pending."
> max allowed ".$self->{param}->{max_connect};
        }
        unless ($self->{db_pool_avail}->pending == $self->{param}->{max_connect})
        {
                my %dbi_params = (ora_dbh_share => \$self->{rs_pool});
                %dbi_params = defined $self->{dbi_params} ? (%dbi_params,
%{$self->{dbi_params}}) : ();
#               my $db = DBI->new(dbi_params=>{%{$self->{dbi_params}}});

        }
        return 1;
}

sub _dec_pool () : synchronized method {}

sub select {

}

sub insert {

}

sub update {

}

sub query {

}

sub procedure {

}

1;


Thanks,
Eric


Reply via email to