Hi,
Currently, I am trying to implement an ithread-based DBI connection
pool for an application I am developing for a client. I have verified
that the DBD::Oracle driver I am using is ithread safe by properly sharing
the ora_dbh_share parameter(see
http://search.cpan.org/~timb/DBD-Oracle-1.15/Oracle.pm#Connect_Attributes)
within all threads that will be accessing the database.
The issues I am running into consist of two parts, but appear to be
related to a single parent problem: shared variables and blessed object
references. First, I am attempting to use Thread::Synchronized to insure
that my database pool control methods (add to, subtract from, and monitor
pool) can only be called by one thread at a time (so I don't have to worry
about sharing semaphores to do this). Secondly, in tune to the current
inherent limitations of threads::shared, I am attempting to share only
specific hashrefs and arrayrefs of my class fields that are cricital to be
accessable/modifiable by all threads. With a non-object (e.g. blessed)
hashref, I have been able to prove the concept that, given
my $hash = {1=>0}
I can set
share($hash{1});
$hash{1} = 0;
Then, within any thread, do
{
lock($hash{1});
$hash{1}++;
}
and the lock will be properly respected and the variable shared
appropriatly incremented globally.
The issue I am running into is: 'lock can only be used on shared values
at ...' at any place where I am attempting to either declare a
synchronized method (i.e. 'sub foo : synchronized method { ... }') or
lock a variable I shared earlier using 'shared($self->{foo})'. As you
will see in the code, I have followed the design assuming that share()
will clobber any pre-exisiting data in the specified data structure.
In threads::shared POD, I saw the following in the BUGS section:
"share() allows you to share $hashref->{key} without giving any error
message. But the $hashref->{key} is not shared, causing the error "locking
can only be used on shared values" to occur when you attempt to lock
$hasref->{key}." Thus, I assume I'm currently running into some variant
of this issue when trying to access fields I shared within a blessed
object.
Any insight or suggestions would be greatly appreciated. I'm pretty
sure I can fall back to flattening out the contents of this class into the
main calling program (.pl) to subvert these issue, but it would be a very
messy implementation IMO.
The code is just a work in progress, but is enough to demonstrate the
issues at hand.
##############################################################
# DBIPool.pm
##############################################################
require 5.008;
package DBIPool;
use strict;
use warnings;
use Carp;
use vars qw($debug);
use threads;
use threads::shared;
use Thread::Queue;
use Thread::Synchronized;
use DBI;
$debug = 0;
### CLASS FIELDS
my (%const, %var, %private);
%const = (
'param_default' => {
'dbi_params' => undef
,'dbi_db' => 'usadbd02'
,'dbi_user' => 'rerix_engine'
,'dbi_pass' => 'rerix_engine'
,'max_connect' => 12
,'min_connect' => 4
,'keep_alive' => 120 #seconds between keepalive signals for
min connections
,'timeout_extra' => 120 #seconds before extra connections
(beyond min)
disconnect
,'timeout' => 300 #seconds before warning given
for unresponsive
connection (not returned to pool)
}
,'param' => {
'dbi_params' => undef
,'dbi_db' => undef
,'dbi_user' => undef
,'dbi_pass' => undef
,'max_connect' => undef
,'min_connect' => undef
,'keep_alive' => 120
,'timeout_extra' => 120
,'timeout' => 300
}
,'max_connect_total' => 32 #max allowed at any time
);
%var = ();
%private = (
'dbi_str_prefix' => 'DBI:Oracle:'
,'rs_pool' => [('') x $const{max_connect_total}]
#elements:$ora_dbh_share_handle (must be initialized to empty string)
,'rs_avail' => Thread::Queue->new #elements:pool idx
,'rs_avail_num' => 0
,'rs_inuse' => [] #elements:in use start ts (epoch sec)
(idx corresponds to
pool idx)
,'rs_inuse_num' => 0
,'rs_monitor_tid' => undef
,'rs_monitor_alive' => 0
,'rs_monitor_sleep' => 2 #seconds between monitor wakeup requests
);
########################################################################
sub new (\%)
{
my $proto = shift;
my %param = @_;
### input params ###
$const{param}->{max_connect} = ($param{max_connect} =~ m/\D/ ||
$param{max_connect} == 0)?
$const{param_default}->{max_connect} : $param{max_connect} >
$const{max_connect_total} ?
$const{max_connect_total} : $param{max_connect};
$const{param}->{min_connect} = ($param{min_connect} =~ m/\D/ ||
$param{min_connect} == 0)?
$const{param_default}->{min_connect} : $param{min_connect} >
$const{param}->{max_connect} ?
$const{param}->{max_connect} : $param{min_connect};
$const{param}->{dbi_params} = (defined $param{dbi_params} && ref
$param{dbi_params} eq 'HASH') ?
$param{dbi_params} : $const{param_default}->{dbi_params};
undef $const{param}->{dbi_params}->{ora_dbh_share}; #explicitly undefine
ora_dbh_share so DBD::Oracle isn't confused
$const{param}->{dbi_db} = defined $param{dbi_db} ? $param{dbi_db} :
$const{param_default}->{dbi_db};
$const{param}->{dbi_user} = defined $param{dbi_user} ? $param{dbi_user} :
$const{param_default}->{dbi_user};
$const{param}->{dbi_pass} = defined $param{dbi_pass} ? $param{dbi_pass} :
$const{param_default}->{dbi_pass};
### create object ###
my $class = ref($proto) || $proto;
my $self = { #fields
%const
,%var
,%private
,'_final' => \%const #constant class fields
,'_permitted' => \%var #public class variables
,'_private' => \%private #private class variables
};
bless ($self, $class);
### declare shared variables ###
share($self->{rs_pool}); $self->{rs_pool} =
$private{rs_pool}; #clumsy but
necessary due to threads::shared limitations
share($self->{rs_avail}); $self->{rs_avail} =
$private{rs_avail};
share($self->{rs_avail_num}); $self->{rs_avail_num} =
$private{rs_avail_num};
share($self->{rs_inuse}); $self->{rs_inuse} =
$private{rs_inuse};
share($self->{rs_inuse_num}); $self->{rs_inuse_num} =
$private{rs_inuse_num};
share($self->{rs_monitor_tid}); $self->{rs_monitor_tid} =
$private{rs_monitor_tid};
share($self->{rs_monitor_alive}); $self->{rs_monitor_alive} =
$private{rs_monitor_alive};
### create initial pool ###
my $connect_success = $self->_inc_pool() x
$self->{param}->{min_connect}; #Database.pm reports errors
if ($connect_success =~ m/0/)
{
my $monitor_thr = threads->new($self->_pool_monitor());
$self->{rs_monitor_tid} = $monitor_thr->tid;
}
return $connect_success =~ m/0/o ? $self : undef;
}
sub DESTROY
{
my $self = shift;
$self->{rs_monitor_alive} = 0; #terminate resource monitor (terminates once
it wakes from sleep)
my $rs_monitor = threads->object($self->{rs_monitor_tid});
$rs_monitor->detach;
while (@{$self->{rs_pool}})
{
$self->_dec_pool();
}
}
########################################################################
sub _get_dbh ($)
{
my $self = shift;
my $ora_dbh_share = $_[0];
return undef;
}
sub _pool_monitor () : synchronized method {
my $self = shift;
warn "***In pool Monitor\n" if $debug;
unless ($self->{rs_monitor_alive})
{
$self->{rs_monitor_alive} = 1;
while ($self->{rs_monitor_alive})
{
warn "***Pool monitor awake...\n" if $debug;
### send keepalive ping to each connection ###
for (my $i = 0; $i < $self->{rs_avail_num}; $i++)
{
if (defined $self->{rs_pool}->[$i])
{
# my $dbh = _get_dbh($self->{rs_pool}->[$i]);
}
}
### check for extra sleeping connections that should be closed
###
### warn on connections that have been in use too long (caller
thread
may have died?) --debugging ##
warn "***Pool monitor going to sleep...\n" if $debug;
sleep $self->{rs_monitor_sleep};
}
}
return $self->{pool_monitor};
}
sub _inc_pool : synchronized method {
my $self = shift;
lock($self->{db_pool});
lock($self->{db_pool_avail});
lock($self->{db_pool_inuse});
if ($self->{db_pool_avail}->pending >
$self->{param}->{max_connect}) #debugging
{
die "thread issue: total connections
".$self->{db_pool_avail}->pending."
> max allowed ".$self->{param}->{max_connect};
}
unless ($self->{db_pool_avail}->pending == $self->{param}->{max_connect})
{
my %dbi_params = (ora_dbh_share => \$self->{rs_pool});
%dbi_params = defined $self->{dbi_params} ? (%dbi_params,
%{$self->{dbi_params}}) : ();
# my $db = DBI->new(dbi_params=>{%{$self->{dbi_params}}});
}
return 1;
}
sub _dec_pool () : synchronized method {}
sub select {
}
sub insert {
}
sub update {
}
sub query {
}
sub procedure {
}
1;
Thanks,
Eric