attached is source for database pool module,
it is very simple but i do not see the problem with the timers and the loop.
i dont see where i could be messing with the loop heap or something like it.
the function dbpool_create is called only once to initialize the pool
before any other thread is
created so it is safe to call it with no locks.
On 13 June 2011 14:01, Marc Lehmann <[email protected]> wrote:
> On Mon, Jun 13, 2011 at 08:38:42AM -0600, Juan Pablo L
> <[email protected]> wrote:
>> (not from any callback), i m calling from another thread.
>> i m calling ev_break from another thread.
>
> As was already pointed out,t hat is your problem. When using threads, you
> have to take care of threads executing in paralle or switching at any time,
> so when you access a shared resource (such as the event loop) you have to use
> a mutex.
>
> On Mon, Jun 13, 2011 at 12:09:52PM -0600, Juan Pablo L
> <[email protected]> wrote:
>> yes, i m using mutexes for the timers, so when i modify and reset a
>
> The fact that you are in ev_loop in one thread and calling ev_timer_again
> in another thread instantly tells me that you are not using mutexes to
> lock the loop.
>
> With threads, you have to lock *every* shared resource, that includes the
> loop.
>
>> Maybe what i m trying to do is estrange but here it goes, i m doing a
>> database pool and the timers are one per database connection, so as to
>
> I can't verify whether this approach is corretc or not, but you basically
> have these choices:
>
> - do not access the event loop, except maybe via ev_async watchers.
> this might be easier than it sounds: maybe you can use a threadpool.
> - do proper locking (and use ev_async watchers to wake up the event loop
> if it is currently blocked).
>
> Both approaches are documented in the documentation - search for "thread"
> and "locking" and you will find an example for the latter, search for
> ev_async and you find plenty of documentation for async watchers.
>
>> message in the timeout callback so i m expecting to see the message
>> (callback being called) every second for as long as the connection is
>> held by the database thread.
>
> As always with threads, you need to use locking so that not two threads
> access/modify the same data structure at the same time. In addition to
> that, you need a way to wake up the event loop blocked in another thread,
> and you are there.
>
> --
> The choice of a Deliantra, the free code+content MORPG
> -----==- _GNU_ http://www.deliantra.net
> ----==-- _ generation
> ---==---(_)__ __ ____ __ Marc Lehmann
> --==---/ / _ \/ // /\ \/ / [email protected]
> -=====/_/_//_/\_,_/ /_/\_\
>
#include <stdio.h>
#include <errno.h>
#include <freeradius-devel/radiusd.h>
#include "dbpool.h"
// function to be run in the thread to control the timers
void *thread_run(dbpool_t *dbpool);
// disconnect the database connection on timeout
static void dbpool_disconnect_resources(struct ev_loop *loop, ev_timer *evt, int revents);
bool dbpool_create(dbpool_t *dbpool,ppsrad_config_t *config)
{
pthread_attr_t thread_attr;
dbpool_timer_data_t *timer_data;
int rc;
dbpool->size = config->dbpoolsize;
dbpool->timeout = config->dbtimeout;
dbpool->max_retries = config->dbretries;
dbpool->connection_string = NULL;
dbpool->pool = NULL;
dbpool->evt_loop = NULL;
DEBUG("Creating database pool with %d connections with %d secs timeout",
dbpool->size,dbpool->timeout);
if(asprintf(&dbpool->connection_string,"host=%s port=%d user=%s password=%s dbname=%s",
config->dbhost,config->dbport,config->dbuser,config->dbpasswd,config->dbname) < 0)
{
radlog(L_ERR,"Could not allocate memory for the connection string");
dbpool->connection_string = NULL;// make sure this is NULL
return false;
}
// create the event loop
dbpool->evt_loop = ev_loop_new(EVFLAG_AUTO);
pthread_mutex_init(&dbpool->loop_lock,NULL);
// allocate memory for the pool and start the connections
dbpool->pool = malloc(sizeof(dbconnection_t) * dbpool->size);
dbpool->next_dbconn = 0;
for(int i = 0;i < dbpool->size;i++)
{
dbpool->pool[i].pg_conn = PQconnectdb(dbpool->connection_string);
if(PQstatus(dbpool->pool[i].pg_conn) != CONNECTION_OK)
{
radlog(L_ERR,"Connection of database handler %d failed: %s",i,
PQerrorMessage(dbpool->pool[i].pg_conn));
}
else
{
radlog(L_INFO,"Database connection %d successfully connected",i);
}
pthread_mutex_init(&dbpool->pool[i].lock,NULL);
ev_init (&dbpool->pool[i].idle_timer,(void*)dbpool_disconnect_resources);
dbpool->pool[i].idle_timer.repeat = dbpool->timeout;
timer_data = malloc(sizeof(dbpool_timer_data_t));
timer_data->my_dbpool = dbpool;
timer_data->my_index = i;
dbpool->pool[i].idle_timer.data = timer_data;
ev_timer_again(dbpool->evt_loop,&dbpool->pool[i].idle_timer);
}
// initialize the thread
pthread_attr_init(&thread_attr);
pthread_attr_setdetachstate(&thread_attr,PTHREAD_CREATE_JOINABLE);
rc = pthread_create(&dbpool->thread,&thread_attr,(void*)thread_run,dbpool);
if(rc != 0)
{
radlog(L_ERR,"Could not create thread in database pool: %s",strerror(rc));
pthread_attr_destroy(&thread_attr);
dbpool_destroy(dbpool);
return false;
}
pthread_attr_destroy(&thread_attr);
DEBUG("Successfully created database pool");
return true;
}
void dbpool_destroy(dbpool_t *dbpool)
{
dbpool_timer_data_t *timer_data;
DEBUG("Destroying database pool");
free(dbpool->connection_string);
// destroy all database connection objects
for(int i = 0;i < dbpool->size;i++)
{
PQfinish(dbpool->pool[i].pg_conn);
pthread_mutex_destroy(&dbpool->pool[i].lock);
timer_data = (dbpool_timer_data_t*)dbpool->pool[i].idle_timer.data;
free(timer_data);
pthread_mutex_lock(&dbpool->loop_lock);
ev_timer_stop(dbpool->evt_loop,&dbpool->pool[i].idle_timer);
pthread_mutex_unlock(&dbpool->loop_lock);
radlog(L_DBG,"Closed database connection %d",i);
}
// destroy the thread
pthread_join(dbpool->thread,NULL);
// destroy the event loop
ev_loop_destroy(dbpool->evt_loop);
pthread_mutex_destroy(&dbpool->loop_lock);
free(dbpool->pool);
DEBUG("Successfully destroyed database pool");
}
dbconnection_t *dbpool_get_connection(dbpool_t *dbpool)
{
int conn_index = dbpool->next_dbconn;
int curr_index;
int last_index;
int retry;
// start where we left off last time
curr_index = (conn_index >= dbpool->size)?0:conn_index;
last_index = curr_index;
do
{
radlog(L_DBG,"Trying to get database connection %d",curr_index);
if(pthread_mutex_trylock(&dbpool->pool[curr_index].lock) == EBUSY)
{// connection taken
// get the next connection in the pool
goto next_connection;
}
// we found a connection
radlog(L_DBG,"Found free database connection %d",curr_index);
// check the connection status and retry if connection is faulty
if(PQstatus(dbpool->pool[curr_index].pg_conn) != CONNECTION_OK)
{
radlog(L_INFO,"Database connection %d is broken, will reconnect",curr_index);
retry = 0;
try_reconnect:
radlog(L_DBG,"Retry %d for connection %d",retry,curr_index);
PQreset(dbpool->pool[curr_index].pg_conn);
if(PQstatus(dbpool->pool[curr_index].pg_conn) != CONNECTION_OK)
{// the connection still faulty, retry
radlog(L_DBG,"Retry for connection %d failed, continue trying",curr_index);
if(++retry >= dbpool->max_retries)
{// reached maximum retries, look for a next connection
radlog(L_DBG,"Max number of retries reached for connection %d, fetching a next connection",curr_index);
// free the lock
pthread_mutex_unlock(&dbpool->pool[curr_index].lock);
goto next_connection;
}
// still more times to retry
goto try_reconnect;
}
radlog(L_DBG,"Successfully reconnected database connection %d",curr_index);
}
// remember where we left off
dbpool->next_dbconn = curr_index + 1;
// update the reset time for this connection
dbpool->pool[curr_index].idle_timer.repeat = 1;//(24 * 60 * 60);
pthread_mutex_lock(&dbpool->loop_lock);
ev_timer_again(dbpool->evt_loop,&dbpool->pool[curr_index].idle_timer);
pthread_mutex_unlock(&dbpool->loop_lock);
return &dbpool->pool[curr_index];
next_connection:
if(++curr_index >= dbpool->size) curr_index = 0;
}while(curr_index != last_index);
// we did not find a free connection
radlog(L_DBG,"Did not find a free database connection in the pool");
return NULL;
}
void dbpool_release_connection(dbpool_t *dbpool,dbconnection_t *dbconn)
{
// update the repeat time out
dbconn->idle_timer.repeat = dbpool->timeout;
pthread_mutex_lock(&dbpool->loop_lock);
ev_timer_again(dbpool->evt_loop,&dbconn->idle_timer);
pthread_mutex_unlock(&dbpool->loop_lock);
// free the lock
pthread_mutex_unlock(&dbconn->lock);
}
void *thread_run(dbpool_t *dbpool)
{
ev_run(dbpool->evt_loop,0);
pthread_exit(NULL);
}
static void dbpool_disconnect_resources(struct ev_loop *loop, ev_timer *evt, int revents)
{
dbpool_timer_data_t *timer_data = ((dbpool_timer_data_t*)evt->data);
// try to acquire the lock
if(pthread_mutex_trylock(&timer_data->my_dbpool->pool[timer_data->my_index].lock) == EBUSY)
{// connection is still taken
DEBUG("Timeout detected on connection %d but connection still taken, skipping",timer_data->my_index);
evt->repeat = 1;//24 * 60 * 60;
pthread_mutex_lock(&timer_data->my_dbpool->loop_lock);
ev_timer_again(timer_data->my_dbpool->evt_loop,evt);
pthread_mutex_unlock(&timer_data->my_dbpool->loop_lock);
return;
}
evt->repeat = timer_data->my_dbpool->timeout;
pthread_mutex_lock(&timer_data->my_dbpool->loop_lock);
ev_timer_again(timer_data->my_dbpool->evt_loop,evt);
pthread_mutex_unlock(&timer_data->my_dbpool->loop_lock);
pthread_mutex_unlock(&timer_data->my_dbpool->pool[timer_data->my_index].lock);
radlog(L_INFO,"Successfully disconnect database connection %d on timeout",timer_data->my_index);
}
_______________________________________________
libev mailing list
[email protected]
http://lists.schmorp.de/cgi-bin/mailman/listinfo/libev