To answer my own question, and to provide some more information...
1. SOLUTION: My implementation of the shared variables in the object
fields does work. I was just calling lock() on an incorrect (undefined)
hash reference...DOH!
2. PROBLEM: Thread::Synchronized is still reporting lock issues when I
try to call a method that has 'synchronized method' specified in its
definition. Any thoughts on this?
3. COMMENT: I discovered how to specify object methods in threads->new()
calls. Given a blessed object $obj with method 'foo' defined in the
object's package 'My::Pkg', you can start a thread with:
my $thr = threads->new(\&My::Pkg::foo, $obj);
Using this form, it is possible to specify 'foo : synchronized { ... }' in
the definition of foo, but 'synchronized method' still fails with the
usual 'thread failed to start: lock can only be used on shared values at
...' error. Is there another way to create threads on an object that both
starts a proper thread AND uses the synchronized method? For now,
it looks like I'll have to use Thread::Semaphore to emulate this behavior.
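
A minimal sketch of that Thread::Semaphore fallback, in case it helps anyone else. The package My::Pkg, its method foo, and the fields 'count' and 'mutex' are made up for illustration and are not part of DBIPool.pm. The sketch assumes a perl built with ithreads, and uses a binary semaphore stored in the object to let only one thread at a time into the method body:

```perl
use strict;
use warnings;

package My::Pkg;

use threads;
use threads::shared;
use Thread::Semaphore;

sub new {
    my ($class) = @_;
    my $count :shared = 0;    # shared scalar, reached through a reference
    my $self = {
        count => \$count,
        mutex => Thread::Semaphore->new(1),    # binary semaphore used as a method lock
    };
    return bless $self, $class;
}

# Hypothetical method: the semaphore stands in for 'synchronized method'.
sub foo {
    my ($self) = @_;
    $self->{mutex}->down;     # acquire; other callers block here
    for (1 .. 1000) {
        my $c = ${ $self->{count} };
        ${ $self->{count} } = $c + 1;    # read-modify-write, safe while we hold the semaphore
    }
    $self->{mutex}->up;       # release
    return;
}

package main;

my $obj  = My::Pkg->new;
my @thrs = map { threads->new(\&My::Pkg::foo, $obj) } 1 .. 4;
$_->join for @thrs;
print ${ $obj->{count} }, "\n";    # 4 threads x 1000 increments
```

The object hash itself is copied into each thread, but the semaphore and the counter reference both point at shared data, so every thread sees the same lock and the same counter. If foo can die, the up() call should be guarded so the semaphore is always released.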
-Eric
Perldiscuss - Perl Newsgroups And Mailing Lists wrote:
> Hi,
> Currently, I am trying to implement an ithread-based DBI connection
> pool for an application I am developing for a client. I have verified
> that the DBD::Oracle driver I am using is ithread safe by properly sharing
> the ora_dbh_share parameter (see
> http://search.cpan.org/~timb/DBD-Oracle-1.15/Oracle.pm#Connect_Attributes)
> within all threads that will be accessing the database.
> The issues I am running into consist of two parts, but appear to be
> related to a single parent problem: shared variables and blessed object
> references. First, I am attempting to use Thread::Synchronized to ensure
> that my database pool control methods (add to, subtract from, and monitor
> pool) can only be called by one thread at a time (so I don't have to worry
> about sharing semaphores to do this). Second, in keeping with the current
> inherent limitations of threads::shared, I am attempting to share only
> the specific hashrefs and arrayrefs of my class fields that must be
> accessible and modifiable by all threads. With a non-object (i.e. unblessed)
> hashref, I have been able to prove the concept that, given
> my $hash = {1=>0}
> I can set
> share($hash{1});
> $hash{1} = 0;
> Then, within any thread, do
> {
> lock($hash{1});
> $hash{1}++;
> }
> and the lock will be properly respected and the shared variable
> incremented globally as expected.
> The issue I am running into is: 'lock can only be used on shared values
> at ...' at any place where I am attempting to either declare a
> synchronized method (i.e. 'sub foo : synchronized method { ... }') or
> lock a variable I shared earlier using 'share($self->{foo})'. As you
> will see in the code, I have followed the design assuming that share()
> will clobber any pre-existing data in the specified data structure.
> In threads::shared POD, I saw the following in the BUGS section:
> "share() allows you to share $hashref->{key} without giving any error
> message. But the $hashref->{key} is not shared, causing the error "locking
> can only be used on shared values" to occur when you attempt to lock
> $hashref->{key}." Thus, I assume I'm currently running into some variant
> of this issue when trying to access fields I shared within a blessed
> object.
> Any insight or suggestions would be greatly appreciated. I'm pretty
> sure I can fall back to flattening out the contents of this class into the
> main calling program (.pl) to sidestep these issues, but it would be a very
> messy implementation IMO.
> The code is just a work in progress, but is enough to demonstrate the
> issues at hand.
> ##############################################################
> # DBIPool.pm
> ##############################################################
> require 5.008;
> package DBIPool;
> use strict;
> use warnings;
> use Carp;
> use vars qw($debug);
> use threads;
> use threads::shared;
> use Thread::Queue;
> use Thread::Synchronized;
> use DBI;
> $debug = 0;
> ### CLASS FIELDS
> my (%const, %var, %private);
> %const = (
> 'param_default' => {
> 'dbi_params' => undef
> ,'dbi_db' => 'usadbd02'
> ,'dbi_user' => 'rerix_engine'
> ,'dbi_pass' => 'rerix_engine'
> ,'max_connect' => 12
> ,'min_connect' => 4
> ,'keep_alive' => 120 #seconds between keepalive signals for min connections
> ,'timeout_extra' => 120 #seconds before extra connections (beyond min) disconnect
> ,'timeout' => 300 #seconds before warning given for unresponsive connection (not returned to pool)
> }
> ,'param' => {
> 'dbi_params' => undef
> ,'dbi_db' => undef
> ,'dbi_user' => undef
> ,'dbi_pass' => undef
> ,'max_connect' => undef
> ,'min_connect' => undef
> ,'keep_alive' => 120
> ,'timeout_extra' => 120
> ,'timeout' => 300
> }
> ,'max_connect_total' => 32 #max allowed at any time
> );
> %var = ();
> %private = (
> 'dbi_str_prefix' => 'DBI:Oracle:'
> ,'rs_pool' => [('') x $const{max_connect_total}]
> #elements:$ora_dbh_share_handle (must be initialized to empty string)
> ,'rs_avail' => Thread::Queue->new #elements:pool idx
> ,'rs_avail_num' => 0
> ,'rs_inuse' => [] #elements:in use start ts (epoch sec) (idx corresponds to pool idx)
> ,'rs_inuse_num' => 0
> ,'rs_monitor_tid' => undef
> ,'rs_monitor_alive' => 0
> ,'rs_monitor_sleep' => 2 #seconds between monitor wakeup requests
> );
> ########################################################################
> sub new (%)
> {
> my $proto = shift;
> my %param = @_;
> ### input params ###
> $const{param}->{max_connect} =
>   ($param{max_connect} =~ m/\D/ || $param{max_connect} == 0)
>     ? $const{param_default}->{max_connect}
>     : $param{max_connect} > $const{max_connect_total}
>       ? $const{max_connect_total}
>       : $param{max_connect};
> $const{param}->{min_connect} =
>   ($param{min_connect} =~ m/\D/ || $param{min_connect} == 0)
>     ? $const{param_default}->{min_connect}
>     : $param{min_connect} > $const{param}->{max_connect}
>       ? $const{param}->{max_connect}
>       : $param{min_connect};
> $const{param}->{dbi_params} =
>   (defined $param{dbi_params} && ref $param{dbi_params} eq 'HASH')
>     ? $param{dbi_params}
>     : $const{param_default}->{dbi_params};
> undef $const{param}->{dbi_params}->{ora_dbh_share}; #explicitly undefine ora_dbh_share so DBD::Oracle isn't confused
> $const{param}->{dbi_db} = defined $param{dbi_db} ? $param{dbi_db} : $const{param_default}->{dbi_db};
> $const{param}->{dbi_user} = defined $param{dbi_user} ? $param{dbi_user} : $const{param_default}->{dbi_user};
> $const{param}->{dbi_pass} = defined $param{dbi_pass} ? $param{dbi_pass} : $const{param_default}->{dbi_pass};
> ### create object ###
> my $class = ref($proto) || $proto;
> my $self = { #fields
> %const
> ,%var
> ,%private
> ,'_final' => \%const #constant class fields
> ,'_permitted' => \%var #public class variables
> ,'_private' => \%private #private class variables
> };
> bless ($self, $class);
> ### declare shared variables ###
> share($self->{rs_pool}); $self->{rs_pool} = $private{rs_pool}; #clumsy but necessary due to threads::shared limitations
> share($self->{rs_avail}); $self->{rs_avail} = $private{rs_avail};
> share($self->{rs_avail_num}); $self->{rs_avail_num} = $private{rs_avail_num};
> share($self->{rs_inuse}); $self->{rs_inuse} = $private{rs_inuse};
> share($self->{rs_inuse_num}); $self->{rs_inuse_num} = $private{rs_inuse_num};
> share($self->{rs_monitor_tid}); $self->{rs_monitor_tid} = $private{rs_monitor_tid};
> share($self->{rs_monitor_alive}); $self->{rs_monitor_alive} = $private{rs_monitor_alive};
> ### create initial pool ###
> my $connect_success = $self->_inc_pool() x $self->{param}->{min_connect}; #Database.pm reports errors
> if ($connect_success =~ m/0/)
> {
> my $monitor_thr = threads->new($self->_pool_monitor());
> $self->{rs_monitor_tid} = $monitor_thr->tid;
> }
> return $connect_success =~ m/0/o ? $self : undef;
> }
> sub DESTROY
> {
> my $self = shift;
> $self->{rs_monitor_alive} = 0; #terminate resource monitor (terminates once it wakes from sleep)
> my $rs_monitor = threads->object($self->{rs_monitor_tid});
> $rs_monitor->detach;
> while (@{$self->{rs_pool}})
> {
> $self->_dec_pool();
> }
> }
> ########################################################################
> sub _get_dbh ($)
> {
> my $self = shift;
> my $ora_dbh_share = $_[0];
> return undef;
> }
> sub _pool_monitor () : synchronized method {
> my $self = shift;
> warn "***In pool Monitor\n" if $debug;
> unless ($self->{rs_monitor_alive})
> {
> $self->{rs_monitor_alive} = 1;
> while ($self->{rs_monitor_alive})
> {
> warn "***Pool monitor awake...\n" if $debug;
> ### send keepalive ping to each connection ###
> for (my $i = 0; $i < $self->{rs_avail_num}; $i++)
> {
> if (defined $self->{rs_pool}->[$i])
> {
> # my $dbh = _get_dbh($self->{rs_pool}->[$i]);
> }
> }
> ### check for extra sleeping connections that should be closed ###
> ### warn on connections that have been in use too long (caller thread may have died?) --debugging ###
> warn "***Pool monitor going to sleep...\n" if $debug;
> sleep $self->{rs_monitor_sleep};
> }
> }
> return $self->{pool_monitor};
> }
> sub _inc_pool : synchronized method {
> my $self = shift;
> lock($self->{db_pool});
> lock($self->{db_pool_avail});
> lock($self->{db_pool_inuse});
> if ($self->{db_pool_avail}->pending > $self->{param}->{max_connect}) #debugging
> {
> die "thread issue: total connections ".$self->{db_pool_avail}->pending." > max allowed ".$self->{param}->{max_connect};
> }
> unless ($self->{db_pool_avail}->pending == $self->{param}->{max_connect})
> {
> my %dbi_params = (ora_dbh_share => $self->{rs_pool});
> %dbi_params = defined $self->{dbi_params} ? (%dbi_params, %{$self->{dbi_params}}) : ();
> # my $db = DBI->new(dbi_params=>{%{$self->{dbi_params}}});
> }
> return 1;
> }
> sub _dec_pool () : synchronized method {}
> sub select {
> }
> sub insert {
> }
> sub update {
> }
> sub query {
> }
> sub procedure {
> }
> 1;
> Thanks,
> Eric