Attached is the source for the database pool module.
It is very simple, but I do not see the problem with the timers and the loop.
I don't see where I could be messing with the loop heap or something like it.
The function dbpool_create is called only once to initialize the pool,
before any other thread is
created, so it is safe to call it with no locks.

On 13 June 2011 14:01, Marc Lehmann <[email protected]> wrote:
> On Mon, Jun 13, 2011 at 08:38:42AM -0600, Juan Pablo L 
> <[email protected]> wrote:
>> (not from any callback), i m calling from another thread.
>> i m calling ev_break from another thread.
>
> As was already pointed out, that is your problem. When using threads, you
> have to take care of threads executing in parallel or switching at any time,
> so when you access a shared resource (such as the event loop) you have to use
> a mutex.
>
> On Mon, Jun 13, 2011 at 12:09:52PM -0600, Juan Pablo L 
> <[email protected]> wrote:
>> yes, i m using mutexes for the timers, so when i modify and reset a
>
> The fact that you are in ev_loop in one thread and calling ev_timer_again
> in another thread instantly tells me that you are not using mutexes to
> lock the loop.
>
> With threads, you have to lock *every* shared resource, that includes the
> loop.
>
>> Maybe what I'm trying to do is strange, but here it goes: I'm doing a
>> database pool and the timers are one per database connection, so as to
>
> I can't verify whether this approach is correct or not, but you basically
> have these choices:
>
> - do not access the event loop, except maybe via ev_async watchers.
>  this might be easier than it sounds: maybe you can use a threadpool.
> - do proper locking (and use ev_async watchers to wake up the event loop
>  if it is currently blocked).
>
> Both approaches are documented in the documentation - search for "thread"
> and "locking" and you will find an example for the latter, search for
> ev_async and you find plenty of documentation for async watchers.
>
>> message in the timeout callback so i m expecting to see the message
>> (callback being called) every second for as long as the connection is
>> held by the database thread.
>
> As always with threads, you need to use locking so that not two threads
> access/modify the same data structure at the same time. In addition to
> that, you need a way to wake up the event loop blocked in another thread,
> and you are there.
>
> --
>                The choice of a       Deliantra, the free code+content MORPG
>      -----==-     _GNU_              http://www.deliantra.net
>      ----==-- _       generation
>      ---==---(_)__  __ ____  __      Marc Lehmann
>      --==---/ / _ \/ // /\ \/ /      [email protected]
>      -=====/_/_//_/\_,_/ /_/\_\
>
#include <stdio.h>
#include <errno.h>
#include <freeradius-devel/radiusd.h>
#include "dbpool.h"

// Entry point of the pool's timer thread: dbpool_create hands it to
// pthread_create, and it runs the pool's event loop with ev_run.
void *thread_run(dbpool_t *dbpool);

// Callback installed on every connection's idle_timer by dbpool_create;
// invoked by the event loop when that timer expires.
// NOTE(review): despite the name, the implementation in this file only
// re-arms the timer and logs - it does not close the connection. Confirm
// whether the actual disconnect is still to be written.
static void dbpool_disconnect_resources(struct ev_loop *loop, ev_timer *evt, int revents);


// Undo a partially completed dbpool_create().
//
// Releases the first `initialized` connection slots (timer, timer data,
// libpq handle, per-connection lock), then the pool array, the event loop
// with its lock, and the connection string. It must only be used while the
// timer thread does not exist, so the loop can be touched without locking.
// The loop lock is destroyed only when evt_loop is non-NULL, so callers must
// keep "evt_loop != NULL" equivalent to "loop_lock is initialized".
static void dbpool_create_rollback(dbpool_t *dbpool,int initialized)
{
	for(int i = 0;i < initialized;i++)
	{
		ev_timer_stop(dbpool->evt_loop,&dbpool->pool[i].idle_timer);
		free(dbpool->pool[i].idle_timer.data);
		dbpool->pool[i].idle_timer.data = NULL;
		PQfinish(dbpool->pool[i].pg_conn);
		pthread_mutex_destroy(&dbpool->pool[i].lock);
	}

	free(dbpool->pool);
	dbpool->pool = NULL;
	dbpool->size = 0;// nothing left to iterate over

	if(dbpool->evt_loop != NULL)
	{
		ev_loop_destroy(dbpool->evt_loop);
		dbpool->evt_loop = NULL;
		pthread_mutex_destroy(&dbpool->loop_lock);
	}

	free(dbpool->connection_string);
	dbpool->connection_string = NULL;
}

// Initialize the database pool described by `config`.
//
// Builds the libpq connection string, creates the event loop and its lock,
// opens `config->dbpoolsize` connections (a connection that fails to open is
// logged and kept in the pool so that it can be reset later), arms one idle
// timer per connection and finally starts the thread that runs the loop.
//
// No locking is done here: the function is meant to be called once, before
// any other thread uses the pool.
//
// Returns true on success. On any failure everything acquired so far is
// released again and false is returned; dbpool_destroy must not be called on
// a pool whose creation failed, because no thread exists to be joined.
bool dbpool_create(dbpool_t *dbpool,ppsrad_config_t *config)
{
	pthread_attr_t thread_attr;
	int rc;

	dbpool->size = config->dbpoolsize;
	dbpool->timeout = config->dbtimeout;
	dbpool->max_retries = config->dbretries;
	dbpool->connection_string = NULL;
	dbpool->pool = NULL;
	dbpool->evt_loop = NULL;
	dbpool->next_dbconn = 0;

	DEBUG("Creating database pool with %d connections with %d secs timeout",
			dbpool->size,dbpool->timeout);

	// an empty pool can never hand out a connection
	if(dbpool->size <= 0)
	{
		radlog(L_ERR,"Invalid database pool size %d",dbpool->size);
		return false;
	}

	if(asprintf(&dbpool->connection_string,"host=%s port=%d user=%s password=%s dbname=%s",
			config->dbhost,config->dbport,config->dbuser,config->dbpasswd,config->dbname) < 0)
	{
		radlog(L_ERR,"Could not allocate memory for the connection string");
		dbpool->connection_string = NULL;// make sure this is NULL
		return false;
	}

	// create the event loop
	dbpool->evt_loop = ev_loop_new(EVFLAG_AUTO);
	if(dbpool->evt_loop == NULL)
	{
		radlog(L_ERR,"Could not create the event loop of the database pool");
		dbpool_create_rollback(dbpool,0);
		return false;
	}

	rc = pthread_mutex_init(&dbpool->loop_lock,NULL);
	if(rc != 0)
	{
		radlog(L_ERR,"Could not initialize the event loop lock: %s",strerror(rc));
		// the lock does not exist, so drop the loop by hand to keep the
		// rollback from destroying an uninitialized mutex
		ev_loop_destroy(dbpool->evt_loop);
		dbpool->evt_loop = NULL;
		dbpool_create_rollback(dbpool,0);
		return false;
	}

	// allocate memory for the pool and start the connections
	dbpool->pool = calloc(dbpool->size,sizeof *dbpool->pool);
	if(dbpool->pool == NULL)
	{
		radlog(L_ERR,"Could not allocate memory for the database pool");
		dbpool_create_rollback(dbpool,0);
		return false;
	}

	for(int i = 0;i < dbpool->size;i++)
	{
		dbconnection_t *conn = &dbpool->pool[i];
		dbpool_timer_data_t *timer_data;

		// acquire everything that can fail without a usable slot first, so
		// that a failure here only has to roll back the slots before `i`
		timer_data = malloc(sizeof *timer_data);
		if(timer_data == NULL)
		{
			radlog(L_ERR,"Could not allocate memory for the timer of database connection %d",i);
			dbpool_create_rollback(dbpool,i);
			return false;
		}

		rc = pthread_mutex_init(&conn->lock,NULL);
		if(rc != 0)
		{
			radlog(L_ERR,"Could not initialize the lock of database connection %d: %s",i,strerror(rc));
			free(timer_data);
			dbpool_create_rollback(dbpool,i);
			return false;
		}

		conn->pg_conn = PQconnectdb(dbpool->connection_string);
		if(PQstatus(conn->pg_conn) != CONNECTION_OK)
		{
			radlog(L_ERR,"Connection of database handler %d failed: %s",i,
					PQerrorMessage(conn->pg_conn));
		}
		else
		{
			radlog(L_INFO,"Database connection %d successfully connected",i);
		}

		// the timer data tells the callback which pool slot it belongs to
		timer_data->my_dbpool = dbpool;
		timer_data->my_index = i;

		ev_init(&conn->idle_timer,dbpool_disconnect_resources);
		conn->idle_timer.repeat = dbpool->timeout;
		conn->idle_timer.data = timer_data;
		ev_timer_again(dbpool->evt_loop,&conn->idle_timer);
	}

	// initialize the thread
	pthread_attr_init(&thread_attr);
	pthread_attr_setdetachstate(&thread_attr,PTHREAD_CREATE_JOINABLE);
	rc = pthread_create(&dbpool->thread,&thread_attr,(void *(*)(void *))thread_run,dbpool);
	pthread_attr_destroy(&thread_attr);
	if(rc != 0)
	{
		radlog(L_ERR,"Could not create thread in database pool: %s",strerror(rc));
		// no thread exists, so clean up here instead of going through
		// dbpool_destroy, which would join a thread that was never created
		dbpool_create_rollback(dbpool,dbpool->size);
		return false;
	}

	DEBUG("Successfully created database pool");
	return true;
}

// Tear down a pool that was successfully created with dbpool_create.
//
// The order matters: all idle timers are stopped first (under the loop lock),
// then the timer thread is joined, and only after that are the per-connection
// resources released. Freeing the timer data or destroying a connection lock
// while the loop thread can still run the timer callback would let that
// callback touch freed memory or a destroyed mutex.
//
// NOTE(review): ev_run only returns once the loop notices that no watcher is
// active any more, so the join may wait until the loop's current timeout
// elapses. Waking the loop earlier would need an ev_async watcher, which the
// pool structure shown here does not have.
void dbpool_destroy(dbpool_t *dbpool)
{
	// a pool without a connection array has no slots to walk
	int count = (dbpool->pool != NULL) ? dbpool->size : 0;

	DEBUG("Destroying database pool");

	// stop every timer so that the loop runs out of active watchers
	pthread_mutex_lock(&dbpool->loop_lock);
	for(int i = 0;i < count;i++)
	{
		ev_timer_stop(dbpool->evt_loop,&dbpool->pool[i].idle_timer);
	}
	pthread_mutex_unlock(&dbpool->loop_lock);

	// destroy the thread
	pthread_join(dbpool->thread,NULL);

	// destroy all database connection objects; the loop thread is gone, so
	// nothing can reference them any more
	for(int i = 0;i < count;i++)
	{
		PQfinish(dbpool->pool[i].pg_conn);
		dbpool->pool[i].pg_conn = NULL;
		pthread_mutex_destroy(&dbpool->pool[i].lock);
		free(dbpool->pool[i].idle_timer.data);
		dbpool->pool[i].idle_timer.data = NULL;
		radlog(L_DBG,"Closed database connection %d",i);
	}

	// destroy the event loop
	if(dbpool->evt_loop != NULL)
	{
		ev_loop_destroy(dbpool->evt_loop);
		dbpool->evt_loop = NULL;
	}
	pthread_mutex_destroy(&dbpool->loop_lock);

	free(dbpool->pool);
	dbpool->pool = NULL;

	free(dbpool->connection_string);
	dbpool->connection_string = NULL;

	DEBUG("Successfully destroyed database pool");
}

// Try to revive the broken connection in slot `index` with PQreset.
//
// The caller must hold the slot's lock. At least one reset is attempted; the
// attempts stop as soon as the connection reports CONNECTION_OK or the number
// of failed attempts reaches dbpool->max_retries.
// Returns true when the connection is usable again, false otherwise.
static bool dbpool_try_reconnect(dbpool_t *dbpool,int index)
{
	int retry = 0;

	for(;;)
	{
		radlog(L_DBG,"Retry %d for connection %d",retry,index);
		PQreset(dbpool->pool[index].pg_conn);
		if(PQstatus(dbpool->pool[index].pg_conn) == CONNECTION_OK)
		{
			radlog(L_DBG,"Successfully reconnected database connection %d",index);
			return true;
		}

		// the connection still faulty, retry
		radlog(L_DBG,"Retry for connection %d failed, continue trying",index);
		if(++retry >= dbpool->max_retries)
		{// reached maximum retries, look for a next connection
			radlog(L_DBG,"Max number of retries reached for connection %d, fetching a next connection",index);
			return false;
		}
	}
}

// Hand out a free, working connection from the pool.
//
// The search starts at the slot after the one handed out last and visits
// every slot at most once. A slot is taken by acquiring its lock with
// pthread_mutex_trylock; slots that are busy, or whose lock cannot be taken
// for any other reason, are skipped. A slot whose connection is broken is
// reset first and skipped (lock released) if it cannot be revived.
//
// On success the connection's idle timer is re-armed and the connection is
// returned with its lock held; give it back with dbpool_release_connection.
// Returns NULL when the pool is empty or no usable connection was found.
dbconnection_t *dbpool_get_connection(dbpool_t *dbpool)
{
	int first_index;
	int curr_index;

	// nothing to search in an empty or uninitialized pool
	if(dbpool->pool == NULL || dbpool->size <= 0)
	{
		radlog(L_DBG,"Did not find a free database connection in the pool");
		return NULL;
	}

	// start where we left off last time
	first_index = dbpool->next_dbconn;
	if(first_index < 0 || first_index >= dbpool->size)
	{
		first_index = 0;
	}
	curr_index = first_index;

	do
	{
		dbconnection_t *conn = &dbpool->pool[curr_index];

		radlog(L_DBG,"Trying to get database connection %d",curr_index);

		// only a return value of 0 means that we own the lock; EBUSY and
		// every other error leave the slot untouched
		if(pthread_mutex_trylock(&conn->lock) == 0)
		{
			bool usable = true;

			// we found a connection
			radlog(L_DBG,"Found free database connection %d",curr_index);

			// check the connection status and retry if connection is faulty
			if(PQstatus(conn->pg_conn) != CONNECTION_OK)
			{
				radlog(L_INFO,"Database connection %d is broken, will reconnect",curr_index);
				usable = dbpool_try_reconnect(dbpool,curr_index);
			}

			if(usable)
			{
				// remember where we left off
				dbpool->next_dbconn = curr_index + 1;

				// update the reset time for this connection; the period is
				// hard-coded to one second while the connection is in use
				conn->idle_timer.repeat = 1;//(24 * 60 * 60);
				pthread_mutex_lock(&dbpool->loop_lock);
				ev_timer_again(dbpool->evt_loop,&conn->idle_timer);
				pthread_mutex_unlock(&dbpool->loop_lock);
				return conn;
			}

			// could not revive it: free the lock and try the next slot
			pthread_mutex_unlock(&conn->lock);
		}

		if(++curr_index >= dbpool->size)
		{
			curr_index = 0;
		}
	}while(curr_index != first_index);

	// we did not find a free connection
	radlog(L_DBG,"Did not find a free database connection in the pool");
	return NULL;
}

// Give a connection obtained from dbpool_get_connection back to the pool.
//
// The connection's idle timer is switched back to the pool-wide timeout and
// re-armed while the loop lock is held; the connection's own lock is dropped
// last, so the slot only becomes available once its timer has been reset.
void dbpool_release_connection(dbpool_t *dbpool,dbconnection_t *dbconn)
{
	pthread_mutex_t *loop_guard = &dbpool->loop_lock;

	// back to the regular idle period
	dbconn->idle_timer.repeat = dbpool->timeout;

	pthread_mutex_lock(loop_guard);
	ev_timer_again(dbpool->evt_loop,&dbconn->idle_timer);
	pthread_mutex_unlock(loop_guard);

	// the slot can be handed out again from here on
	pthread_mutex_unlock(&dbconn->lock);
}

// Body of the pool's timer thread: runs the pool's event loop until ev_run
// returns, then terminates the calling thread with a NULL exit value.
//
// NOTE(review): the loop is run without holding dbpool->loop_lock, while the
// other functions of this module modify timers of the same loop from other
// threads under that lock. libev's documentation describes a locking scheme
// based on release/acquire callbacks and an ev_async watcher for this case -
// confirm whether it is needed here.
void *thread_run(dbpool_t *dbpool)
{
	struct ev_loop *timer_loop = dbpool->evt_loop;

	ev_run(timer_loop,0);

	pthread_exit(NULL);
}

// Idle-timer callback of one pool connection.
//
// evt->data carries the dbpool_timer_data_t that names the pool and the slot
// the timer belongs to. The callback probes the slot's lock with
// pthread_mutex_trylock:
//  - EBUSY: the connection is still in use, so the timer is re-armed with a
//    one second period and nothing else happens;
//  - otherwise: the timer is re-armed with the pool-wide timeout, the slot's
//    lock is released again and the event is logged.
// The timer is always re-armed while holding the pool's loop lock.
// `loop` and `revents` are not used; the loop is taken from the pool.
static void dbpool_disconnect_resources(struct ev_loop *loop, ev_timer *evt, int revents)
{
	dbpool_timer_data_t *ctx = evt->data;
	pthread_mutex_t *conn_lock = &ctx->my_dbpool->pool[ctx->my_index].lock;
	pthread_mutex_t *loop_guard = &ctx->my_dbpool->loop_lock;
	int still_taken;

	(void)loop;
	(void)revents;

	// try to acquire the lock
	still_taken = (pthread_mutex_trylock(conn_lock) == EBUSY);

	if(still_taken)
	{// connection is still taken
		DEBUG("Timeout detected on connection %d but connection still taken, skipping",ctx->my_index);
		evt->repeat = 1;//24 * 60 * 60;
	}
	else
	{
		evt->repeat = ctx->my_dbpool->timeout;
	}

	// both outcomes re-arm the timer with the period chosen above
	pthread_mutex_lock(loop_guard);
	ev_timer_again(ctx->my_dbpool->evt_loop,evt);
	pthread_mutex_unlock(loop_guard);

	if(still_taken)
	{
		return;
	}

	pthread_mutex_unlock(conn_lock);
	radlog(L_INFO,"Successfully disconnect database connection %d on timeout",ctx->my_index);
}
_______________________________________________
libev mailing list
[email protected]
http://lists.schmorp.de/cgi-bin/mailman/listinfo/libev

Reply via email to