Hi, i have been modifying the code but i have some problems, there is
something i dont get right it is not identical as in the example code as i
did not include things i think i dont need in my case but i m still doing
something wrong.

when i uncomment lines 181 and 184 it hangs for ever.
when i comment line 212,213 and 215 i get a seg fault.
the loop does not receive the ev_async in any case.
i did not make any difference by including the lock in the loop,
i m, for sure, missing something, i did not include the things with the
semaphore and calling another thread. i m not sure when i m supposed
to lock the loop
and when i m not supposed to lock the loop, if i start the ev_run with
the lock already locked,
i dont know how to handle another thread modifying a timer and doing
ev_timer_again, when i try to lock there
it hangs.


On 13 June 2011 23:41, Marc Lehmann <[email protected]> wrote:
> On Mon, Jun 13, 2011 at 02:48:00PM -0600, Juan Pablo L 
> <[email protected]> wrote:
>> thanks, yes i have a lock for the loop (i focused too much on the
>> timer that i forgot to mention it in the explanation),
>> i lock that lock just before calling ev_timer_again and realising it
>> just after, right now i only one the thread running the ev_run
>> and one database thread, i m going to check the code because i might
>> be forgetting to acquire the loop lock somewhere then.
>> i ll let you know in a little while.
>
> Hmm, from glancing over that code:
>
> - where is the mutex lock around ev_run?
> - where is the async watcher that wakes up the loop?
> - where is the call to ev_set_loop_release_cb?
>
> have you read the documentation that shows a working example?
>
> --
>                The choice of a       Deliantra, the free code+content MORPG
>      -----==-     _GNU_              http://www.deliantra.net
>      ----==-- _       generation
>      ---==---(_)__  __ ____  __      Marc Lehmann
>      --==---/ / _ \/ // /\ \/ /      [email protected]
>      -=====/_/_//_/\_,_/ /_/\_\
>
> _______________________________________________
> libev mailing list
> [email protected]
> http://lists.schmorp.de/cgi-bin/mailman/listinfo/libev
>
#include <stdio.h>
#include <errno.h>
#include <freeradius-devel/radiusd.h>
#include "dbpool.h"

// function to be run in the thread to control the timers
static void *dbpool_thread_run(dbpool_t *dbpool);

// disconnect the database connection on timeout
static void dbpool_disconnect_resources(struct ev_loop *loop, ev_timer *evt, int revents);

// acquire/release dbpool loop lock
static void dbpool_lock(struct ev_loop *loop);
static void dbpool_unlock(struct ev_loop *loop);

// wakeup the loop
static void dbpool_wakeup_loop(struct ev_loop *loop, ev_async *evt, int revents);



bool dbpool_create(dbpool_t *dbpool,ppsrad_config_t *config)
{
	pthread_attr_t thread_attr;
	dbpool_timer_data_t *timer_data;
	int rc;

	dbpool->size = config->dbpoolsize;
	dbpool->timeout = config->dbtimeout;
	dbpool->max_retries = config->dbretries;
	dbpool->connection_string = NULL;
	dbpool->pool = NULL;
	dbpool->evt_loop = NULL;

	DEBUG("Creating database pool with %d connections with %d secs timeout",
			dbpool->size,dbpool->timeout);

	if(asprintf(&dbpool->connection_string,"host=%s port=%d user=%s password=%s dbname=%s",
			config->dbhost,config->dbport,config->dbuser,config->dbpasswd,config->dbname) < 0)
	{
		radlog(L_ERR,"Could not allocate memory for the connection string");
		dbpool->connection_string = NULL;// make sure this is NULL
		return false;
	}

	// create the event loop
	dbpool->evt_loop = ev_loop_new(EVFLAG_AUTO);
	pthread_mutex_init(&dbpool->loop_lock,NULL);
	ev_set_loop_release_cb(dbpool->evt_loop,(void*)dbpool_unlock,(void*)dbpool_lock);
	ev_set_userdata(dbpool->evt_loop,&dbpool);

	ev_async_init(&dbpool->async_wl,(void*)dbpool_wakeup_loop);
	ev_async_start(dbpool->evt_loop,&dbpool->async_wl);

	// allocate memory for the pool and start the connections
	dbpool->pool = malloc(sizeof(dbconnection_t) * dbpool->size);
	dbpool->next_dbconn = 0;
	for(int i = 0;i < dbpool->size;i++)
	{
		dbpool->pool[i].pg_conn = PQconnectdb(dbpool->connection_string);
		if(PQstatus(dbpool->pool[i].pg_conn) != CONNECTION_OK)
		{
			radlog(L_ERR,"Connection of database handler %d failed: %s",i,
					PQerrorMessage(dbpool->pool[i].pg_conn));
		}
		else
		{
			radlog(L_INFO,"Database connection %d successfully connected",i);
		}
		pthread_mutex_init(&dbpool->pool[i].lock,NULL);
		timer_data = malloc(sizeof(dbpool_timer_data_t));
		timer_data->my_dbpool = dbpool;
		timer_data->my_index = i;
		dbpool->pool[i].idle_timer.data = timer_data;
		ev_init (&dbpool->pool[i].idle_timer,(void*)dbpool_disconnect_resources);
		dbpool->pool[i].idle_timer.repeat = dbpool->timeout;
		ev_timer_again(dbpool->evt_loop,&dbpool->pool[i].idle_timer);
	}

	// initialize the thread
	pthread_attr_init(&thread_attr);
	pthread_attr_setdetachstate(&thread_attr,PTHREAD_CREATE_JOINABLE);
	rc = pthread_create(&dbpool->thread,&thread_attr,(void*)dbpool_thread_run,dbpool);
	if(rc != 0)
	{
		radlog(L_ERR,"Could not create thread in database pool: %s",strerror(rc));
		pthread_attr_destroy(&thread_attr);
		dbpool_destroy(dbpool);
		return false;
	}
	pthread_attr_destroy(&thread_attr);

	DEBUG("Successfully created database pool");
	return true;
}

void dbpool_destroy(dbpool_t *dbpool)
{
	dbpool_timer_data_t *timer_data;

	DEBUG("Destroying database pool");

	free(dbpool->connection_string);

	// destroy all database connection objects
	for(int i = 0;i < dbpool->size;i++)
	{
		PQfinish(dbpool->pool[i].pg_conn);
		pthread_mutex_destroy(&dbpool->pool[i].lock);
		timer_data = (dbpool_timer_data_t*)dbpool->pool[i].idle_timer.data;
		free(timer_data);
		ev_timer_stop(dbpool->evt_loop,&dbpool->pool[i].idle_timer);
		radlog(L_DBG,"Closed database connection %d",i);
	}

	// destroy the thread
	pthread_join(dbpool->thread,NULL);

	// destroy the async evt
	ev_async_stop(dbpool->evt_loop,&dbpool->async_wl);

	// destroy the event loop
	ev_loop_destroy(dbpool->evt_loop);
	pthread_mutex_destroy(&dbpool->loop_lock);

	free(dbpool->pool);

	DEBUG("Successfully destroyed database pool");
}

dbconnection_t *dbpool_get_connection(dbpool_t *dbpool)
{
	int conn_index = dbpool->next_dbconn;
	int curr_index;
	int last_index;
	int retry;

	// start where we left off last time
	curr_index = (conn_index >= dbpool->size)?0:conn_index;
	last_index = curr_index;

	do
	{
		radlog(L_DBG,"Trying to get database connection %d",curr_index);
		if(pthread_mutex_trylock(&dbpool->pool[curr_index].lock) == EBUSY)
		{// connection taken
			// get the next connection in the pool
			goto next_connection;
		}
		// we found a connection
		radlog(L_DBG,"Found free database connection %d",curr_index);

		// check the connection status and retry if connection is faulty
		if(PQstatus(dbpool->pool[curr_index].pg_conn) != CONNECTION_OK)
		{
			radlog(L_INFO,"Database connection %d is broken, will reconnect",curr_index);
			retry = 0;
try_reconnect:
			radlog(L_DBG,"Retry %d for connection %d",retry,curr_index);
			PQreset(dbpool->pool[curr_index].pg_conn);
			if(PQstatus(dbpool->pool[curr_index].pg_conn) != CONNECTION_OK)
			{// the connection still faulty, retry
				radlog(L_DBG,"Retry for connection %d failed, continue trying",curr_index);
				if(++retry >= dbpool->max_retries)
				{// reached maximum retries, look for a next connection
					radlog(L_DBG,"Max number of retries reached for connection %d, fetching a next connection",curr_index);
					// free the lock
					pthread_mutex_unlock(&dbpool->pool[curr_index].lock);
					goto next_connection;
				}
				// still more times to retry
				goto try_reconnect;
			}
			radlog(L_DBG,"Successfully reconnected database connection %d",curr_index);
		}
		// remember where we left off
		dbpool->next_dbconn = curr_index + 1;

		// update the reset time for this connection
		dbpool->pool[curr_index].idle_timer.repeat = 1;//(24 * 60 * 60);
		radlog(L_DBG,"before the lock");
		//dbpool_lock(dbpool->evt_loop);
		ev_timer_again(dbpool->evt_loop,&dbpool->pool[curr_index].idle_timer);
		ev_async_send(dbpool->evt_loop,&dbpool->async_wl);
		//dbpool_unlock(dbpool->evt_loop);
		radlog(L_DBG,"after the lock");
		return &dbpool->pool[curr_index];

next_connection:
		if(++curr_index >= dbpool->size) curr_index = 0;
	}while(curr_index != last_index);

	// we did not find a free connection
	radlog(L_DBG,"Did not find a free database connection in the pool");
	return NULL;
}

void dbpool_release_connection(dbpool_t *dbpool,dbconnection_t *dbconn)
{
	// update the repeat time out
	dbconn->idle_timer.repeat = dbpool->timeout;
	//dbpool_lock(dbpool->evt_loop);
	ev_timer_again(dbpool->evt_loop,&dbconn->idle_timer);
	ev_async_send(dbpool->evt_loop,&dbpool->async_wl);
	//dbpool_lock(dbpool->evt_loop);

	// free the lock
	pthread_mutex_unlock(&dbconn->lock);
}

void *dbpool_thread_run(dbpool_t *dbpool)
{
	dbpool_lock(dbpool->evt_loop);
	pthread_setcanceltype (PTHREAD_CANCEL_ASYNCHRONOUS, 0);
	ev_run(dbpool->evt_loop,0);
	dbpool_unlock(dbpool->evt_loop);
	pthread_exit(NULL);
}

void dbpool_disconnect_resources(struct ev_loop *loop, ev_timer *evt, int revents)
{
	dbpool_timer_data_t *timer_data = ((dbpool_timer_data_t*)evt->data);

	// try to acquire the lock
	if(pthread_mutex_trylock(&timer_data->my_dbpool->pool[timer_data->my_index].lock) == EBUSY)
	{// connection is still taken
		DEBUG("Timeout detected on connection %d but connection still taken, skipping",timer_data->my_index);
		evt->repeat = 1;//24 * 60 * 60;
		//dbpool_lock(timer_data->my_dbpool->evt_loop);
		ev_timer_again(timer_data->my_dbpool->evt_loop,evt);
		ev_async_send(timer_data->my_dbpool->evt_loop,&timer_data->my_dbpool->async_wl);
		//dbpool_unlock(timer_data->my_dbpool->evt_loop);
		return;
	}

	// the connection is not in use
	evt->repeat = timer_data->my_dbpool->timeout;
	//dbpool_lock(timer_data->my_dbpool->evt_loop);
	ev_timer_again(timer_data->my_dbpool->evt_loop,evt);
	ev_async_send(timer_data->my_dbpool->evt_loop,&timer_data->my_dbpool->async_wl);
	//dbpool_unlock(timer_data->my_dbpool->evt_loop);
	pthread_mutex_unlock(&timer_data->my_dbpool->pool[timer_data->my_index].lock);
	radlog(L_INFO,"Successfully disconnect database connection %d on timeout",timer_data->my_index);
}

void dbpool_lock(struct ev_loop *loop)
{
	dbpool_t *dbpool = ev_userdata(loop);
	pthread_mutex_lock(&dbpool->loop_lock);
}

void dbpool_unlock(struct ev_loop *loop)
{
	dbpool_t *dbpool = ev_userdata(loop);
	pthread_mutex_unlock(&dbpool->loop_lock);
}

void dbpool_wakeup_loop(struct ev_loop *loop, ev_async *evt, int revents)
{
	DEBUG("dbpool_wakeup_loop");
}
_______________________________________________
libev mailing list
[email protected]
http://lists.schmorp.de/cgi-bin/mailman/listinfo/libev

Reply via email to