To answer my own question, and to provide some more information...

1. SOLUTION: My implementation of the shared variables in the object
fields does work.  I was just calling lock() on an incorrect (undefined)
hash reference...DOH!

2. PROBLEM: Thread::Synchronized is still reporting lock issues when I
try to call a method that has 'synchronized method' specified in its
definition.  Any thoughts on this?

3. COMMENT: I discovered how to specify object methods in threads->new()
calls. Given a blessed object $obj with method 'foo' defined in the
object's package 'My::Pkg', you can start a thread with:

     my $thr = threads->new(\&My::Pkg::foo, $obj);

Using this form, it is possible to specify 'foo : synchronized { ... }' in
the definition of foo, but 'synchronized method' still fails with the
usual 'thread failed to start: lock can only be used on shared values at
..' error.  Is there another way I can create new threads on an object
that both start a proper thread AND use the synchronized method?  For now,
it looks like I'll have to use Thread::Semaphore to emulate this behavior.
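
For reference, here is a minimal sketch of that fallback, not a tested
replacement for Thread::Synchronized. It starts threads on an object method
through a code reference and uses a Thread::Semaphore where 'synchronized
method' would have gone. My::Pkg and foo are the names from the example
above; the 'count' field, the class-wide $sem and run_demo() are made up for
illustration, and shared_clone() assumes a reasonably recent threads::shared.

```perl
use strict;
use warnings;
use threads;
use threads::shared;
use Thread::Semaphore;

package My::Pkg;
use threads::shared;    # imports shared_clone() and the shared-aware bless()

# One semaphore for the whole class, created before any thread starts.
# Thread::Semaphore keeps its counter in a shared scalar, so every thread
# that inherits this variable operates on the same semaphore.
my $sem = Thread::Semaphore->new(1);

sub new {
    my ($class) = @_;
    # shared_clone() returns a shared copy of the structure.
    my $self = shared_clone({ count => 0 });
    return bless($self, $class);
}

sub foo {
    my ($self) = @_;
    $sem->down;                 # enter the critical section
    for (1 .. 1000) {
        $self->{count}++;       # read-modify-write, so it needs the guard
    }
    $sem->up;                   # leave the critical section
    return;
}

package main;

# Hypothetical driver: four threads all call foo() on the same object.
sub run_demo {
    my $obj = My::Pkg->new;
    # Pass a code reference plus the object; the object arrives as $_[0].
    my @thr = map { threads->create(\&My::Pkg::foo, $obj) } 1 .. 4;
    $_->join for @thr;
    return $obj->{count};
}

print run_demo(), "\n";
```

The semaphore here is per class rather than per object, which is coarser
than what 'synchronized method' promises but keeps the sketch simple.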

-Eric



Perldiscuss - Perl Newsgroups And Mailing Lists wrote:

> Hi,

>    Currently, I am trying to implement an ithread-based DBI connection
> pool for an application I am developing for a client.  I have verified
> that the DBD::Oracle driver I am using is ithread safe by properly sharing
> the ora_dbh_share parameter (see
> http://search.cpan.org/~timb/DBD-Oracle-1.15/Oracle.pm#Connect_Attributes)
> within all threads that will be accessing the database.

>    The issues I am running into consist of two parts, but appear to be
> related to a single parent problem: shared variables and blessed object
> references.  First, I am attempting to use Thread::Synchronized to ensure
> that my database pool control methods (add to, subtract from, and monitor
> pool) can only be called by one thread at a time (so I don't have to worry
> about sharing semaphores to do this).  Secondly, in keeping with the current
> inherent limitations of threads::shared, I am attempting to share only
> the specific hashrefs and arrayrefs of my class fields that must be
> accessible/modifiable by all threads.  With a non-object (i.e. unblessed)
> hashref, I have been able to prove the concept that, given
>      my $hash = {1=>0};
> I can set
>      share($hash->{1});
>      $hash->{1} = 0;
> Then, within any thread, do
>      {
>      lock($hash->{1});
>      $hash->{1}++;
>      }
> and the lock will be properly respected and the shared variable
> appropriately incremented globally.
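
A self-contained sketch of that lock-and-increment pattern, for anyone who
wants to try it in isolation. It uses a standalone shared scalar instead of a
hash element, because the current threads::shared documentation says
individual elements of a shared container cannot be locked (you lock the
container, or a separate shared scalar as here). The helper name
count_with_lock() and its arguments are invented for the example.

```perl
use strict;
use warnings;
use threads;
use threads::shared;

# Hypothetical helper: bump one shared counter from several threads.
sub count_with_lock {
    my ($n_threads, $n_incr) = @_;
    my $counter :shared = 0;

    my @workers = map {
        threads->create(sub {
            for (1 .. $n_incr) {
                lock($counter);   # released at the end of each loop body
                $counter++;
            }
        });
    } 1 .. $n_threads;

    $_->join for @workers;
    return $counter;
}

print count_with_lock(4, 500), "\n";
```

Without the lock() line the increments can interleave and the final count
may come up short.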

>    The issue I am running into is: 'lock can only be used on shared values
> at ...' at any place where I am attempting to either declare a
> synchronized method (i.e. 'sub foo : synchronized method  { ... }') or
> lock a variable I shared earlier using 'share($self->{foo})'.  As you
> will see in the code, I have followed the design assuming that share()
> will clobber any pre-existing data in the specified data structure.
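
On the clobbering assumption: the threads::shared documentation states that
calling share() on an array or hash (or a reference to one) discards whatever
it already contained, while plain scalars are not affected. A small sketch of
the "share first, populate afterwards" order follows; share_then_fill() is a
name made up for this example.

```perl
use strict;
use warnings;
use threads;
use threads::shared;

# Hypothetical helper: report the array size right after share()
# and again after refilling the (now shared) array.
sub share_then_fill {
    my @pool = ('a', 'b', 'c');
    share(@pool);                    # existing elements are discarded here
    my $after_share = scalar @pool;

    @pool = ('a', 'b', 'c');         # so populate only after sharing
    my $after_fill = scalar @pool;

    return ($after_share, $after_fill);
}

my ($after_share, $after_fill) = share_then_fill();
print "after share: $after_share, after refill: $after_fill\n";
```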

>    In threads::shared POD, I saw the following in the BUGS section:
> "share() allows you to share $hashref->{key} without giving any error
> message. But the $hashref->{key} is not shared, causing the error "locking
> can only be used on shared values" to occur when you attempt to lock
> $hashref->{key}."  Thus, I assume I'm currently running into some variant
> of this issue when trying to access fields I shared within a blessed
> object.
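
One way around that limitation, sketched under assumptions: make the whole
container shared and lock the container, instead of sharing single elements
of an unshared hash. The example below uses shared_clone() and is_shared()
from a recent threads::shared. The package name Demo::Pool and both helper
subs are hypothetical, and only the field names are borrowed from the module
further down.

```perl
use strict;
use warnings;
use threads;
use threads::shared;

# Hypothetical stand-in for the pool object.
sub make_pool {
    my $self = shared_clone({
        rs_avail_num => 0,
        rs_inuse     => [],
    });
    return bless($self, 'Demo::Pool');
}

sub demo_shared_fields {
    my $pool  = make_pool();
    my $plain = { rs_avail_num => 0 };     # ordinary, unshared hash

    my $thr = threads->create(sub {
        lock(%$pool);                      # lock the shared hash itself
        $pool->{rs_avail_num}++;
        push @{ $pool->{rs_inuse} }, time;
    });
    $thr->join;

    return (
        is_shared(%$pool)  ? 1 : 0,        # container built by shared_clone()
        is_shared(%$plain) ? 1 : 0,        # plain hash is not shared
        $pool->{rs_avail_num},             # change made in the thread is visible
        scalar @{ $pool->{rs_inuse} },
    );
}

print join(',', demo_shared_fields()), "\n";
```

Locking the whole hash is coarser than per-field locks, but it avoids the
element-sharing pitfall quoted from the BUGS section.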

>    Any insight or suggestions would be greatly appreciated.  I'm pretty
> sure I can fall back to flattening out the contents of this class into the
> main calling program (.pl) to work around these issues, but it would be a
> very messy implementation IMO.

>    The code is just a work in progress, but is enough to demonstrate the
> issues at hand.


> ##############################################################
> # DBIPool.pm
> ##############################################################

> require 5.008;
> package DBIPool;
> use strict;
> use warnings;
> use Carp;
> use vars qw($debug);
> use threads;
> use threads::shared;
> use Thread::Queue;
> use Thread::Synchronized;
> use DBI;
> $debug = 0;

> ### CLASS FIELDS
> my (%const, %var, %private);
> %const = (
>       'param_default' => {
>               'dbi_params'            => undef
>               ,'dbi_db'                       => 'usadbd02'
>               ,'dbi_user'                     => 'rerix_engine'
>               ,'dbi_pass'                     => 'rerix_engine'
>               ,'max_connect'          => 12
>               ,'min_connect'          => 4
>               ,'keep_alive'           => 120  #seconds between keepalive signals for min connections
>               ,'timeout_extra'        => 120  #seconds before extra connections (beyond min) disconnect
>               ,'timeout'                      => 300  #seconds before warning given for unresponsive connection (not returned to pool)
>       }
>       ,'param' => {
>               'dbi_params'            => undef
>               ,'dbi_db'                       => undef
>               ,'dbi_user'                     => undef
>               ,'dbi_pass'                     => undef
>               ,'max_connect'          => undef
>               ,'min_connect'          => undef
>               ,'keep_alive'           => 120
>               ,'timeout_extra'        => 120
>               ,'timeout'                      => 300
>       }
>       ,'max_connect_total'    => 32   #max allowed at any time
> );
> %var = ();
> %private = (
>       'dbi_str_prefix'        => 'DBI:Oracle:'
>       ,'rs_pool'                      => [('') x $const{max_connect_total}]
>       #elements:$ora_dbh_share_handle (must be initialized to empty string)
>       ,'rs_avail'                     => Thread::Queue->new   #elements:pool idx
>       ,'rs_avail_num'         => 0
>       ,'rs_inuse'                     => []   #elements:in use start ts (epoch sec) (idx corresponds to pool idx)
>       ,'rs_inuse_num'         => 0
>       ,'rs_monitor_tid'       => undef
>       ,'rs_monitor_alive'     => 0
>       ,'rs_monitor_sleep'     => 2    #seconds between monitor wakeup requests
> );

> ########################################################################
> sub new (%)
> {
>       my $proto = shift;
>       my %param = @_;

>       ### input params ###
>       $const{param}->{max_connect} = ($param{max_connect} =~ m/\D/ || $param{max_connect} == 0) ?
>               $const{param_default}->{max_connect} :
>               $param{max_connect} > $const{max_connect_total} ?
>                       $const{max_connect_total} : $param{max_connect};
>       $const{param}->{min_connect} = ($param{min_connect} =~ m/\D/ || $param{min_connect} == 0) ?
>               $const{param_default}->{min_connect} :
>               $param{min_connect} > $const{param}->{max_connect} ?
>                       $const{param}->{max_connect} : $param{min_connect};
>       $const{param}->{dbi_params} = (defined $param{dbi_params} && ref $param{dbi_params} eq 'HASH') ?
>               $param{dbi_params} : $const{param_default}->{dbi_params};
>       undef $const{param}->{dbi_params}->{ora_dbh_share};     #explicitly undefine ora_dbh_share so DBD::Oracle isn't confused
>       $const{param}->{dbi_db} = defined $param{dbi_db} ? $param{dbi_db} : $const{param_default}->{dbi_db};
>       $const{param}->{dbi_user} = defined $param{dbi_user} ? $param{dbi_user} : $const{param_default}->{dbi_user};
>       $const{param}->{dbi_pass} = defined $param{dbi_pass} ? $param{dbi_pass} : $const{param_default}->{dbi_pass};

>       ### create object ###
>       my $class = ref($proto) || $proto;
>       my $self = {    #fields
>               %const
>               ,%var
>               ,%private
>               ,'_final'               => \%const              #constant class fields (stored by reference so the hash is not flattened)
>               ,'_permitted'   => \%var                #public class variables
>               ,'_private'             => \%private    #private class variables
>       };

>       bless ($self, $class);

>       ### declare shared variables ###
>       share($self->{rs_pool});                $self->{rs_pool} = $private{rs_pool};   #clumsy but necessary due to threads::shared limitations
>       share($self->{rs_avail});               $self->{rs_avail} = $private{rs_avail};
>       share($self->{rs_avail_num});           $self->{rs_avail_num} = $private{rs_avail_num};
>       share($self->{rs_inuse});               $self->{rs_inuse} = $private{rs_inuse};
>       share($self->{rs_inuse_num});           $self->{rs_inuse_num} = $private{rs_inuse_num};
>       share($self->{rs_monitor_tid});         $self->{rs_monitor_tid} = $private{rs_monitor_tid};
>       share($self->{rs_monitor_alive});       $self->{rs_monitor_alive} = $private{rs_monitor_alive};

>       ### create initial pool ###
>       my $connect_success = $self->_inc_pool() x $self->{param}->{min_connect};       #Database.pm reports errors
>       if ($connect_success =~ m/0/)
>       {
>               my $monitor_thr = threads->new($self->_pool_monitor());
>               $self->{rs_monitor_tid} = $monitor_thr->tid;
>       }

>       return $connect_success =~ m/0/o ? $self : undef;
> }

> sub DESTROY
> {
>       my $self = shift;
>       $self->{rs_monitor_alive} = 0;  #terminate resource monitor (terminates once it wakes from sleep)
>       my $rs_monitor = threads->object($self->{rs_monitor_tid});
>       $rs_monitor->detach;
>       while (@{$self->{rs_pool}})
>       {
>               $self->_dec_pool();
>       }
> }
> ########################################################################
> sub _get_dbh ($)
> {
>       my $self = shift;
>       my $ora_dbh_share = $_[0];

>       return undef;
> }

> sub _pool_monitor () : synchronized method {
>       my $self = shift;
>       warn "***In pool Monitor\n" if $debug;
>       unless ($self->{rs_monitor_alive})
>       {
>               $self->{rs_monitor_alive} = 1;
>               while ($self->{rs_monitor_alive})
>               {
>                       warn "***Pool monitor awake...\n" if $debug;
>                       ### send keepalive ping to each connection ###
>                       for (my $i = 0; $i < $self->{rs_avail_num}; $i++)
>                       {
>                               if (defined $self->{rs_pool}->[$i])
>                               {
> #                                     my $dbh = _get_dbh($self->{rs_pool}->[$i]);
>                               }
>                       }

>                       ### check for extra sleeping connections that should be closed ###

>                       ### warn on connections that have been in use too long (caller thread may have died?) --debugging ###

>                       warn "***Pool monitor going to sleep...\n" if $debug;
>                       sleep $self->{rs_monitor_sleep};
>               }
>       }

>       return $self->{pool_monitor};
> }

> sub _inc_pool : synchronized method {
>       my $self = shift;
>       lock($self->{db_pool});
>       lock($self->{db_pool_avail});
>       lock($self->{db_pool_inuse});
>       if ($self->{db_pool_avail}->pending > $self->{param}->{max_connect})    #debugging
>       {
>               die "thread issue: total connections ".$self->{db_pool_avail}->pending." > max allowed ".$self->{param}->{max_connect};
>       }
>       unless ($self->{db_pool_avail}->pending == $self->{param}->{max_connect})
>       {
>               my %dbi_params = (ora_dbh_share => $self->{rs_pool});
>               %dbi_params = defined $self->{dbi_params} ? (%dbi_params, %{$self->{dbi_params}}) : ();
> #             my $db = DBI->new(dbi_params=>{%{$self->{dbi_params}}});

>       }
>       return 1;
> }

> sub _dec_pool () : synchronized method {}

> sub select {

> }

> sub insert {

> }

> sub update {

> }

> sub query {

> }

> sub procedure {

> }

> 1;


> Thanks,
> Eric

