Hi all,

I am sure many of you would like to delete this message before reading it, but 
hold on. :-)

There is much talk about threading on this list, and the idea is always 
deferred for want of robust thread models across all supported platforms and 
because it is unclear whether the gains justify the effort required.

I think threads are useful in two different situations, namely parallelising 
blocking operations and using multiple CPUs.

Attached is a framework that I ported to C from a C++ server I have written. 
It has a thread pool and a thread implementation based on pthreads.

This code expects only a minimal pthreads implementation and does not assume 
anything about how the threads are implemented (e.g. kernel threads or not).

I request hackers on this list to take a look at it. It should be easy to 
plug into any source code and is released with no strings attached, for any use.

This framework allows the worker function and its argument to be plugged in on 
the fly. The threads created sleep by default and can be woken up as and when 
required.
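
For anyone who wants to see the wake-up idea in isolation, here is a minimal 
standalone sketch. It is not the attached code: the names sleeper, 
sleeper_start, sleeper_run, sleeper_stop and add_one are made up for 
illustration, and unlike the attachment it makes the caller wait until the job 
has finished, just to keep the example short.

```c
#include <pthread.h>
#include <stddef.h>

typedef void *(*job_fn)(void *);

/* Hypothetical worker: sleeps on cond until a job is handed to it. */
typedef struct {
    pthread_mutex_t lock;
    pthread_cond_t  cond;
    pthread_t       tid;
    job_fn          fn;      /* NULL while the worker is idle */
    void           *arg;
    void           *result;
    int             done;    /* set by the worker when fn has returned */
    int             quit;
} sleeper;

static void *sleeper_main(void *p)
{
    sleeper *s = p;

    pthread_mutex_lock(&s->lock);
    for (;;) {
        /* Predicate loop: tolerates spurious wake-ups. */
        while (!s->fn && !s->quit)
            pthread_cond_wait(&s->cond, &s->lock);
        if (s->quit)
            break;
        s->result = s->fn(s->arg);   /* job runs with the lock held */
        s->fn = NULL;
        s->done = 1;
        pthread_cond_broadcast(&s->cond);
    }
    pthread_mutex_unlock(&s->lock);
    return NULL;
}

/* Returns 0 on success, like pthread_create(). */
int sleeper_start(sleeper *s)
{
    pthread_mutex_init(&s->lock, NULL);
    pthread_cond_init(&s->cond, NULL);
    s->fn = NULL; s->arg = NULL; s->result = NULL;
    s->done = 0; s->quit = 0;
    return pthread_create(&s->tid, NULL, sleeper_main, s);
}

/* Plug in a function and argument, wake the worker, wait for the result. */
void *sleeper_run(sleeper *s, job_fn fn, void *arg)
{
    void *r;

    pthread_mutex_lock(&s->lock);
    s->fn = fn; s->arg = arg; s->done = 0;
    pthread_cond_broadcast(&s->cond);
    while (!s->done)
        pthread_cond_wait(&s->cond, &s->lock);
    r = s->result;
    pthread_mutex_unlock(&s->lock);
    return r;
}

void sleeper_stop(sleeper *s)
{
    pthread_mutex_lock(&s->lock);
    s->quit = 1;
    pthread_cond_broadcast(&s->cond);
    pthread_mutex_unlock(&s->lock);
    pthread_join(s->tid, NULL);
    pthread_cond_destroy(&s->cond);
    pthread_mutex_destroy(&s->lock);
}

/* Example job: increments the int it is given and returns the pointer. */
void *add_one(void *p)
{
    int *n = p;
    (*n)++;
    return p;
}
```

The point of the predicate loops is that a signal sent before the other side 
is waiting is not lost, because the state (fn, done, quit) is checked under 
the mutex rather than relying on the signal alone.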

I propose to use it incrementally in PostgreSQL. Let's start with I/O. When a 
block of data is being read, rather than blocking on the read, we can set up a 
producer-consumer link between two threads. That way we can utilize the I/O 
time in an overlapped fashion.
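
To illustrate the overlap, here is a rough standalone sketch, assuming a plain 
file descriptor and an 8 kB block. It has nothing to do with the real 
PostgreSQL buffer manager; block_pipe, reader_main, consume_all and BLOCK_SIZE 
are names invented for the example, and summing the bytes merely stands in for 
real per-block work.

```c
#include <pthread.h>
#include <string.h>
#include <sys/types.h>
#include <unistd.h>

#define BLOCK_SIZE 8192          /* assumed block size for this sketch */

/* One-slot hand-off between the reading thread and the consumer. */
typedef struct {
    pthread_mutex_t lock;
    pthread_cond_t  cond;
    int             fd;
    char            buf[BLOCK_SIZE];
    ssize_t         len;         /* bytes in buf; 0 = EOF, <0 = read error */
    int             full;        /* 1 while buf holds an unconsumed block */
} block_pipe;

static void *reader_main(void *p)
{
    block_pipe *bp = p;
    char tmp[BLOCK_SIZE];
    ssize_t n;

    do {
        /* The blocking read happens without holding the lock. */
        n = read(bp->fd, tmp, sizeof tmp);

        pthread_mutex_lock(&bp->lock);
        while (bp->full)
            pthread_cond_wait(&bp->cond, &bp->lock);
        if (n > 0)
            memcpy(bp->buf, tmp, (size_t) n);
        bp->len = n;
        bp->full = 1;
        pthread_cond_broadcast(&bp->cond);
        pthread_mutex_unlock(&bp->lock);
    } while (n > 0);

    return NULL;
}

/* Processes every block from fd; returns the byte sum, or -1 on error. */
long consume_all(int fd)
{
    block_pipe bp;
    pthread_t tid;
    char local[BLOCK_SIZE];
    ssize_t n, i;
    long sum = 0;

    pthread_mutex_init(&bp.lock, NULL);
    pthread_cond_init(&bp.cond, NULL);
    bp.fd = fd; bp.len = 0; bp.full = 0;

    if (pthread_create(&tid, NULL, reader_main, &bp) != 0) {
        pthread_cond_destroy(&bp.cond);
        pthread_mutex_destroy(&bp.lock);
        return -1;
    }

    do {
        pthread_mutex_lock(&bp.lock);
        while (!bp.full)
            pthread_cond_wait(&bp.cond, &bp.lock);
        n = bp.len;
        if (n > 0)
            memcpy(local, bp.buf, (size_t) n);
        bp.full = 0;                         /* slot is free again */
        pthread_cond_broadcast(&bp.cond);
        pthread_mutex_unlock(&bp.lock);

        /* This work overlaps with the reader's next read(). */
        for (i = 0; i < n; i++)
            sum += (unsigned char) local[i];
    } while (n > 0);

    pthread_join(tid, NULL);
    pthread_cond_destroy(&bp.cond);
    pthread_mutex_destroy(&bp.lock);
    return n < 0 ? -1 : sum;
}
```

The extra memcpy is the price of keeping the sketch simple; two buffers that 
are swapped between the threads would avoid it.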

Further, threads can be useful when the server has more than one CPU. It can 
spread CPU-intensive work such as index creation or sorting across different 
threads. This way we can utilise idle CPUs, which we cannot do as of now.

There are many advantages that I can see.

1) Threads can be optionally turned on/off depending upon the configuration. So 
we can keep the existing functionality entirely and convert it piece by piece 
to threaded operation.

2) For each functionality we can have two code branches: one that does not use 
threads, i.e. the current code base, and one that can use threads. Agreed, the 
binary will be a bit bloated, but that would give enormous flexibility. If we 
find a thread implementation buggy, we simply switch it off, either at 
compilation or in configuration.

3) Not much effort should be required to plug code into this model. The idea 
of using threads is to assign exclusive work to each thread, so that should 
not require much locking.

In the case of using multiple CPUs, separate functions need to be written that 
can handle things in a thread-safe fashion. Also, a merger function would be 
required to merge the results of the worker threads. That would all be 
additional code.
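
As a toy illustration of the worker-plus-merger split, here is a sketch that 
sorts an int array in two halves, one half in a second thread, and then merges 
the results. It is only meant to show the shape of the idea, not how the 
backend sort would actually be changed; slice, sort_slice, merge_sorted and 
parallel_sort are invented names.

```c
#include <pthread.h>
#include <stdlib.h>
#include <string.h>

/* Exclusive piece of work for one thread: no locking needed. */
typedef struct {
    int   *base;
    size_t n;
} slice;

static int cmp_int(const void *a, const void *b)
{
    int x = *(const int *) a, y = *(const int *) b;
    return (x > y) - (x < y);
}

static void *sort_slice(void *p)
{
    slice *s = p;
    qsort(s->base, s->n, sizeof(int), cmp_int);
    return NULL;
}

/* The "merger function": combines two sorted runs into out. */
static void merge_sorted(const int *a, size_t na,
                         const int *b, size_t nb, int *out)
{
    size_t i = 0, j = 0, k = 0;

    while (i < na && j < nb)
        out[k++] = (b[j] < a[i]) ? b[j++] : a[i++];
    while (i < na)
        out[k++] = a[i++];
    while (j < nb)
        out[k++] = b[j++];
}

/* Sorts v[0..n-1] in place; returns 0 on success, -1 if out of memory. */
int parallel_sort(int *v, size_t n)
{
    slice left, right;
    pthread_t tid;
    int threaded;
    int *tmp;

    if (n < 2)
        return 0;

    tmp = malloc(n * sizeof *tmp);
    if (!tmp)
        return -1;

    left.base = v;          left.n = n / 2;
    right.base = v + n / 2; right.n = n - n / 2;

    /* Fall back to the non-threaded path if the thread can not be made. */
    threaded = (pthread_create(&tid, NULL, sort_slice, &left) == 0);
    if (!threaded)
        sort_slice(&left);
    sort_slice(&right);                 /* this thread takes the other half */
    if (threaded)
        pthread_join(tid, NULL);

    merge_sorted(left.base, left.n, right.base, right.n, tmp);
    memcpy(v, tmp, n * sizeof *tmp);
    free(tmp);
    return 0;
}
```

Each thread owns its half outright, so the only synchronisation is the join 
before the merge.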

I would say two threads per CPU per back-end should be a reasonable default, as 
that would cover I/O blocking well. Unless, of course, threading is turned off 
in the build or in configuration.
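
A sketch of how that default could be computed, under some assumptions: 
USE_THREADS is a hypothetical build-time switch, runtime_enabled is a 
hypothetical configuration flag, and _SC_NPROCESSORS_ONLN is a common 
extension to sysconf() rather than something POSIX guarantees, so the sketch 
falls back to one CPU where it is missing.

```c
#include <unistd.h>

#ifndef USE_THREADS          /* hypothetical build-time switch */
#define USE_THREADS 1
#endif

/*
 * Number of worker threads a back-end would start by default:
 * 0 when threading is off in the build or in configuration,
 * otherwise two per online CPU.
 */
long default_worker_threads(int runtime_enabled)
{
#if USE_THREADS
    long cpus = 1;

    if (!runtime_enabled)
        return 0;

#ifdef _SC_NPROCESSORS_ONLN
    cpus = sysconf(_SC_NPROCESSORS_ONLN);
    if (cpus < 1)
        cpus = 1;            /* unknown or error: assume a single CPU */
#endif

    return 2 * cpus;
#else
    (void) runtime_enabled;
    return 0;
#endif
}
```

A return value of 0 would mean "take the existing non-threaded code path".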

Please note that I have tested the code only in its C++ form, and my C is 
rusty. Quite likely there are bugs in the code. I will stress test the code on 
Monday, but I would like to seek opinions on this as soon as possible. (Hey, 
but it compiles clean...)

If required, I can post example usage of this code, but I don't think that 
should be necessary. :-)

Bye
 Shridhar
#define _REENTRANT

#include <stdio.h>
#include <stdlib.h>
#include <pthread.h>
#include <unistd.h>


//typedefs
typedef void* (*function)(void *);
typedef void* argtype;

typedef struct
{
 pthread_mutex_t lock;
 pthread_cond_t cond;

 unsigned short freeCount,n,count;
 void *pool;

} threadPool;

typedef struct
{
 pthread_t t;
 pthread_attr_t tattr;
 pthread_mutex_t lock;
 pthread_cond_t cond;

 argtype arg;
 function f;

 unsigned short quit;
 threadPool *p;

} thread;

/*Thread functions*/
void initThread(thread **t,threadPool *pool);
void deleteThread(thread **t);
void stop(thread *thr);

void wakeForWork(thread *thr,function func,argtype a);

argtype runner(void *ptr);

/*thread pool functions*/
void initPool(threadPool **pool,unsigned short numthreads);
void deletePool(threadPool **p);

void putThread(threadPool *p,thread *t);
thread	*getThread(threadPool *p);





#include "thread.h"

void initThread(thread **t,threadPool *pool)
{
 thread *thr=(thread *)malloc(sizeof(thread));

 if(!thr)
 {
  fprintf(stderr,"\nCan not allocate memory for thread. Quitting...\n");
  exit(1);
 }

 *t=thr;

 pthread_attr_init(&(thr->tattr));
 pthread_mutex_init(&(thr->lock), NULL);
 pthread_cond_init(&(thr->cond), NULL);

 pthread_attr_setdetachstate(&(thr->tattr),PTHREAD_CREATE_DETACHED);

 thr->quit=0;
 thr->p=pool;

 //malloc() does not zero the struct; start with no work assigned
 thr->f=NULL;
 thr->arg=NULL;

 //Create the thread
 int ret=pthread_create(&(thr->t),&(thr->tattr),runner,(void *)thr);

 if(ret!=0)
 {
  fprintf(stderr,"\nCan not create thread. Quitting...\n");
  exit(1);
 }
}

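void deleteThread(thread **t)
{
 thread *thr;

 //Check the pointers before dereferencing them
 if(!t || !*t) return;

 thr=*t;

 stop(thr);

 pthread_attr_destroy(&(thr->tattr));
 pthread_cond_destroy(&(thr->cond));
 pthread_mutex_destroy(&(thr->lock));

 free(thr);

 //Do not leave the caller with a dangling pointer
 *t=NULL;
}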

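void stop(thread *thr)
{
 unsigned short i;

 if(!thr) return;

 thr->quit=1;

 //runner() resets quit to 0 when it exits. Signal again on every pass
 //in case an earlier wake-up was missed
 for(i=0;thr->quit && i<10;i++)
 {
  pthread_cond_signal(&(thr->cond));
  usleep(400);
 }

 //Last resort if the thread never acknowledged. pthread_kill(thr->t,9)
 //would deliver SIGKILL to the whole process, so cancel the thread instead
 if(thr->quit) pthread_cancel(thr->t);
}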

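void wakeForWork(thread *thr,function func,argtype a)
{
 if(!thr || !func) return;

 //runner() holds thr->lock from before putThread() until it sleeps in
 //pthread_cond_wait(), so taking the lock here means the signal can not
 //be sent before the thread is actually waiting
 pthread_mutex_lock(&(thr->lock));

 thr->f=func;
 thr->arg=a;

 pthread_cond_signal(&(thr->cond));

 pthread_mutex_unlock(&(thr->lock));
}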

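argtype runner(void* arg)
{
 thread *ptr=(thread *)arg;

 while(1)
 {
  pthread_mutex_lock(&(ptr->lock));

  //No work pending yet; wakeForWork() sets f
  ptr->f=NULL;

  if(ptr->p)
   putThread(ptr->p,ptr);

  //Loop on the condition to guard against spurious wake-ups
  while(!ptr->quit && !ptr->f)
   pthread_cond_wait(&(ptr->cond),&(ptr->lock));

  if(ptr->quit)
  {
   //Do not leave the mutex locked on the way out
   pthread_mutex_unlock(&(ptr->lock));
   break;
  }

  ptr->f((void *)ptr->arg);

  pthread_mutex_unlock(&(ptr->lock));
 }

 //Tell stop() that the thread has finished
 ptr->quit=0;

 return NULL;
}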


void initPool(threadPool **pool,unsigned short numthreads)
{
 thread **thr;
 threadPool *p;

 //Nowhere to return the pool; check before allocating anything
 if(!pool) return;

 p=(threadPool *)malloc(sizeof(threadPool));

 if(!p)
 {
  fprintf(stderr,"Can not get memory to create threadpool. Quitting\n");
  exit(1);
 }

 *pool=p;

 pthread_mutex_init(&(p->lock), NULL);
 pthread_cond_init(&(p->cond), NULL);

 p->n=numthreads;
 p->freeCount=0;
 //n was assigned twice and count was left uninitialised
 p->count=0;

 //The array starts empty; a thread created with initThread(&t,p)
 //adds itself through putThread() once it is running
 thr=(thread **)malloc(numthreads*sizeof(thread *));

 if(!thr)
 {
  fprintf(stderr,"Can not get memory to create pool of threads. Quitting\n");
  exit(1);
 }

 p->pool=(void *)thr;

}

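void deletePool(threadPool **pool)
{
 threadPool *p;
 thread **thr;
 unsigned short i;

 if(!pool || !*pool) return;

 //pool points to the pool pointer, so dereference it rather than cast it
 p=*pool;
 thr=(thread **)p->pool;

 //Only the first freeCount slots hold idle threads; the rest are not set.
 //This assumes no worker is still busy when the pool is deleted
 for(i=0;i<p->freeCount;i++) stop(thr[i]);

 free(p->pool);

 pthread_cond_destroy(&(p->cond));
 pthread_mutex_destroy(&(p->lock));

 free(p);
 *pool=NULL;

}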

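void putThread(threadPool *p,thread *t)
{
 thread **pool;

 if(!p || !t) return;

 pool=(thread **)p->pool;

 pthread_mutex_lock(&(p->lock));

 //Never write past the n slots allocated in initPool()
 if(p->freeCount<p->n)
 {
  pool[(p->freeCount)++]=t;

  //Signal on every put. Signalling only when the pool was empty can
  //leave a second waiter in getThread() asleep
  pthread_cond_signal(&(p->cond));
 }

 pthread_mutex_unlock(&(p->lock));

}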

thread *getThread(threadPool *p)
{
 thread *t,**t1;

 if(!p) return NULL;

 t1=(thread **)p->pool;

 pthread_mutex_lock(&(p->lock));

 //Re-check after waking; pthread_cond_wait() can return spuriously
 while((p->freeCount)<=0)pthread_cond_wait(&(p->cond),&(p->lock));

 t=t1[--(p->freeCount)];

 pthread_mutex_unlock(&(p->lock));

 return t;

}