This reworks complib's thread_pool implementation (used by the opensm
dispatcher). It prevents signalled events from being merged, fixes
termination races, eliminates use of the broken cl_atomic primitives,
and reduces memory allocations and code complexity.

Signed-off-by: Sasha Khapyorsky <[EMAIL PROTECTED]>
---
 osm/complib/cl_async_proc.c         |    1 -
 osm/complib/cl_dispatcher.c         |    2 +-
 osm/complib/cl_thread.c             |   13 --
 osm/complib/cl_threadpool.c         |  208 +++++++++++-----------------------
 osm/complib/libosmcomp.map          |    1 -
 osm/include/complib/cl_thread.h     |   16 ---
 osm/include/complib/cl_threadpool.h |   84 ++++----------
 osm/osmtest/osmt_multicast.c        |    1 +
 8 files changed, 92 insertions(+), 234 deletions(-)

diff --git a/osm/complib/cl_async_proc.c b/osm/complib/cl_async_proc.c
index 51561af..7ac96bb 100644
--- a/osm/complib/cl_async_proc.c
+++ b/osm/complib/cl_async_proc.c
@@ -55,7 +55,6 @@ cl_async_proc_construct(
 
        cl_qlist_init( &p_async_proc->item_queue );
        cl_spinlock_construct( &p_async_proc->lock );
-       cl_thread_pool_construct( &p_async_proc->thread_pool );
 }
 
 cl_status_t
diff --git a/osm/complib/cl_dispatcher.c b/osm/complib/cl_dispatcher.c
index a7c0ac7..4a1960c 100644
--- a/osm/complib/cl_dispatcher.c
+++ b/osm/complib/cl_dispatcher.c
@@ -49,6 +49,7 @@
 
 #include <stdlib.h>
 #include <complib/cl_dispatcher.h>
+#include <complib/cl_thread.h>
 #include <complib/cl_timer.h>
 
 /* give some guidance when we build our cl_pool of messages */
@@ -132,7 +133,6 @@ cl_disp_construct(
 
   cl_qlist_init( &p_disp->reg_list );
   cl_ptr_vector_construct( &p_disp->reg_vec );
-  cl_thread_pool_construct( &p_disp->worker_threads );
   cl_qlist_init( &p_disp->msg_fifo );
   cl_spinlock_construct( &p_disp->lock );
   cl_qpool_construct( &p_disp->msg_pool );
diff --git a/osm/complib/cl_thread.c b/osm/complib/cl_thread.c
index f131480..eecc7d6 100644
--- a/osm/complib/cl_thread.c
+++ b/osm/complib/cl_thread.c
@@ -39,7 +39,6 @@
 
 #include <stdio.h>
 #include <unistd.h>
-#include <sys/sysinfo.h>
 #include <complib/cl_thread.h>
 
 /*
@@ -129,18 +128,6 @@ cl_thread_stall(
        usleep( pause_us );
 }
 
-uint32_t
-cl_proc_count( void )
-{
-       uint32_t ret;
-
-       ret = get_nprocs();
-       if( !ret)
-               return 1;/* Workaround for PPC where get_nprocs() returns 0 */
-
-       return ret;
-}
-
 boolean_t
 cl_is_current_thread(
        IN      const cl_thread_t* const        p_thread )
diff --git a/osm/complib/cl_threadpool.c b/osm/complib/cl_threadpool.c
index ff8bf90..ca4e261 100644
--- a/osm/complib/cl_threadpool.c
+++ b/osm/complib/cl_threadpool.c
@@ -1,5 +1,5 @@
 /*
- * Copyright (c) 2004-2006 Voltaire, Inc. All rights reserved.
+ * Copyright (c) 2004-2007 Voltaire, Inc. All rights reserved.
  * Copyright (c) 2002-2005 Mellanox Technologies LTD. All rights reserved.
  * Copyright (c) 1996-2003 Intel Corporation. All rights reserved.
  *
@@ -49,134 +49,85 @@
 
 #include <stdlib.h>
 #include <string.h>
+#include <pthread.h>
+#include <sys/sysinfo.h>
 #include <complib/cl_threadpool.h>
-#include <complib/cl_atomic.h>
 
-void
-__cl_thread_pool_routine(
-       IN      void* const     context )
+static int proc_count( void )
 {
-       cl_status_t                     status = CL_SUCCESS;
-       cl_thread_pool_t        *p_thread_pool = (cl_thread_pool_t*)context;
-
-       /* Continue looping until signalled to end. */
-       while( !p_thread_pool->exit )
-       {
-               /* Wait for the specified event to occur. */
-               status = cl_event_wait_on( &p_thread_pool->wakeup_event, 
-                                                       EVENT_NO_TIMEOUT, TRUE 
);
-
-               /* See if we've been signalled to end execution. */
-               if( (p_thread_pool->exit) || (status == CL_NOT_DONE) )
-                       break;
-
-               /* The event has been signalled.  Invoke the callback. */
-               (*p_thread_pool->pfn_callback)( (void*)p_thread_pool->context );
-       }
+       int ret = get_nprocs();
+       if (!ret)
+               return 1;/* Workaround for PPC where get_nprocs() returns 0 */
+       return ret;
+}
 
-       /*
-        * Decrement the running count to notify the destroying thread
-        * that the event was received and processed.
-        */
-       cl_atomic_dec( &p_thread_pool->running_count );
-       cl_event_signal( &p_thread_pool->destroy_event );
+static void cleanup_mutex(void *arg)
+{
+       pthread_mutex_unlock(&((cl_thread_pool_t *)arg)->mutex);
 }
 
-void
-cl_thread_pool_construct(
-       IN      cl_thread_pool_t* const p_thread_pool )
+static void *thread_pool_routine(void* context)
 {
-       CL_ASSERT( p_thread_pool);
+       cl_thread_pool_t *p_thread_pool = (cl_thread_pool_t*)context;
+
+       do {
+               pthread_mutex_lock(&p_thread_pool->mutex);
+               pthread_cleanup_push(cleanup_mutex, p_thread_pool);
+               while(!p_thread_pool->events)
+                       pthread_cond_wait(&p_thread_pool->cond,
+                                         &p_thread_pool->mutex);
+               p_thread_pool->events--;
+               pthread_cleanup_pop(1);
+               /* The event has been signalled.  Invoke the callback. */
+               (*p_thread_pool->pfn_callback)(p_thread_pool->context);
+       } while (1);
 
-       memset( p_thread_pool, 0, sizeof(cl_thread_pool_t) );
-       cl_event_construct( &p_thread_pool->wakeup_event );
-       cl_event_construct( &p_thread_pool->destroy_event );
-       cl_list_construct( &p_thread_pool->thread_list );
-       p_thread_pool->state = CL_UNINITIALIZED;
+       return NULL;
 }
 
 cl_status_t
 cl_thread_pool_init(
-       IN      cl_thread_pool_t* const         p_thread_pool,
-       IN      uint32_t                                        count,
-       IN      cl_pfn_thread_callback_t        pfn_callback,
-       IN      const void* const                       context,
-       IN      const char* const                       name )
+       IN cl_thread_pool_t* const p_thread_pool,
+       IN unsigned count,
+       IN void (*pfn_callback)(void*),
+       IN void *context,
+       IN const char* const name )
 {
-       cl_status_t     status;
-       cl_thread_t     *p_thread;
-       uint32_t        i;
+       int i;
 
        CL_ASSERT( p_thread_pool );
        CL_ASSERT( pfn_callback );
 
-       cl_thread_pool_construct( p_thread_pool );
+       memset(p_thread_pool, 0, sizeof(*p_thread_pool));
 
-       if( !count )
-               count = cl_proc_count();
+       if(!count)
+               count = proc_count();
 
-       status = cl_list_init( &p_thread_pool->thread_list, count );
-       if( status != CL_SUCCESS )
-       {
-               cl_thread_pool_destroy( p_thread_pool );
-               return( status );
-       }
+       pthread_mutex_init(&p_thread_pool->mutex, NULL);
+       pthread_cond_init(&p_thread_pool->cond, NULL);
 
-       /* Initialize the event that the threads wait on. */
-       status = cl_event_init( &p_thread_pool->wakeup_event, FALSE );
-       if( status != CL_SUCCESS )
-       {
-               cl_thread_pool_destroy( p_thread_pool );
-               return( status );
-       }
+       p_thread_pool->events = 0;
 
-       /* Initialize the event used to destroy the threadpool. */
-       status = cl_event_init( &p_thread_pool->destroy_event, FALSE );
-       if( status != CL_SUCCESS )
-       {
+       p_thread_pool->pfn_callback = pfn_callback;
+       p_thread_pool->context = context;
+
+       p_thread_pool->tid = calloc(count, sizeof(*p_thread_pool->tid));
+       if (!p_thread_pool->tid) {
                cl_thread_pool_destroy( p_thread_pool );
-               return( status );
+               return CL_INSUFFICIENT_MEMORY;
        }
 
-       p_thread_pool->pfn_callback = pfn_callback;
-       p_thread_pool->context = context;
+       p_thread_pool->running_count = count;
 
        for( i = 0; i < count; i++ )
        {
-               /* Create a new thread. */
-               p_thread = (cl_thread_t*)malloc( sizeof(cl_thread_t) );
-               if( !p_thread )
-               {
+               if (pthread_create(&p_thread_pool->tid[i], NULL,
+                                  thread_pool_routine, p_thread_pool) < 0) {
                        cl_thread_pool_destroy( p_thread_pool );
-                       return( CL_INSUFFICIENT_MEMORY );
+                       return CL_INSUFFICIENT_RESOURCES;
                }
-
-               cl_thread_construct( p_thread );
-
-               /*
-                * Add it to the list.  This is guaranteed to work since we
-                * initialized the list to hold at least the number of threads 
we want
-                * to store there.
-                */
-               status = cl_list_insert_head( &p_thread_pool->thread_list, 
p_thread );
-               CL_ASSERT( status == CL_SUCCESS );
-
-               /* Start the thread. */
-               status = cl_thread_init( p_thread, __cl_thread_pool_routine,
-                       p_thread_pool, name );
-               if( status != CL_SUCCESS )
-               {
-                       cl_thread_pool_destroy( p_thread_pool );
-                       return( status );
-               }
-
-               /*
-                * Increment the running count to insure that a destroying 
thread
-                * will signal all the threads.
-                */
-               cl_atomic_inc( &p_thread_pool->running_count );
        }
-       p_thread_pool->state = CL_INITIALIZED;
+
        return( CL_SUCCESS );
 }
 
@@ -184,59 +135,34 @@ void
 cl_thread_pool_destroy(
        IN      cl_thread_pool_t* const p_thread_pool )
 {
-       cl_thread_t             *p_thread;
+       int i;
 
        CL_ASSERT( p_thread_pool );
-       CL_ASSERT( cl_is_state_valid( p_thread_pool->state ) );
 
-       /* Indicate to all threads that they need to exit. */
-       p_thread_pool->exit = TRUE;
+       for (i = 0 ; i < p_thread_pool->running_count; i++)
+               if (p_thread_pool->tid[i])
+                       pthread_cancel(p_thread_pool->tid[i]);
 
-       /*
-        * Signal the threads until they have all exited.  Signalling
-        * once for each thread is not guaranteed to work since two events
-        * could release only a single thread, depending on the rate at which
-        * the events are set and how the thread scheduler processes 
notifications.
-        */
+       for (i = 0 ; i < p_thread_pool->running_count; i++)
+               if (p_thread_pool->tid[i])
+                       pthread_join(p_thread_pool->tid[i], NULL);
 
-       while( p_thread_pool->running_count )
-       {
-     cl_event_signal( &p_thread_pool->wakeup_event );
-     /*
-      * Wait for the destroy event to occur, indicating that the thread
-      * has exited.
-      */
-     cl_event_wait_on( &p_thread_pool->destroy_event,
-                       EVENT_NO_TIMEOUT, TRUE );
-   }
-
-       /*
-        * Stop each thread one at a time.  Note that this cannot be done in the
-        * above for loop because signal will wake up an unknown thread.
-        */
-       if( cl_is_list_inited( &p_thread_pool->thread_list ) )
-       {
-               while( !cl_is_list_empty( &p_thread_pool->thread_list ) )
-               {
-                       p_thread =
-                               (cl_thread_t*)cl_list_remove_head( 
&p_thread_pool->thread_list );
-                       cl_thread_destroy( p_thread );
-                       free( p_thread );
-               }
-       }
+       p_thread_pool->running_count = 0;
+       pthread_cond_destroy(&p_thread_pool->cond);
+       pthread_mutex_destroy(&p_thread_pool->mutex);
 
-       cl_event_destroy( &p_thread_pool->destroy_event );
-       cl_event_destroy( &p_thread_pool->wakeup_event );
-       cl_list_destroy( &p_thread_pool->thread_list );
-       p_thread_pool->state = CL_UNINITIALIZED;
+       p_thread_pool->events = 0;
 }
 
 cl_status_t
 cl_thread_pool_signal(
        IN      cl_thread_pool_t* const p_thread_pool )
 {
+       int ret;
        CL_ASSERT( p_thread_pool );
-       CL_ASSERT( p_thread_pool->state == CL_INITIALIZED );
-
-       return( cl_event_signal( &p_thread_pool->wakeup_event ) );
+       pthread_mutex_lock(&p_thread_pool->mutex);
+       p_thread_pool->events++;
+       ret = pthread_cond_signal(&p_thread_pool->cond);
+       pthread_mutex_unlock(&p_thread_pool->mutex);
+       return ret;
 }
diff --git a/osm/complib/libosmcomp.map b/osm/complib/libosmcomp.map
index e2e58b1..3b8c040 100644
--- a/osm/complib/libosmcomp.map
+++ b/osm/complib/libosmcomp.map
@@ -138,7 +138,6 @@ OSMCOMP_1.1 {
                cl_thread_destroy;
                cl_thread_suspend;
                cl_thread_stall;
-               cl_proc_count;
                cl_is_current_thread;
                __cl_thread_pool_routine;
                cl_thread_pool_construct;
diff --git a/osm/include/complib/cl_thread.h b/osm/include/complib/cl_thread.h
index 4752278..9635e22 100644
--- a/osm/include/complib/cl_thread.h
+++ b/osm/include/complib/cl_thread.h
@@ -312,22 +312,6 @@ cl_thread_stall(
 *      Thread, cl_thread_suspend
 *********/
 
-/****f* Component Library: Thread/cl_proc_count
-* NAME
-*      cl_proc_count
-*
-* DESCRIPTION
-*      The cl_proc_count function returns the number of processors in the 
system.
-*
-* SYNOPSIS
-*/
-uint32_t
-cl_proc_count( void );
-/*
-* RETURN VALUE
-*      Returns the number of processors in the system.
-*********/
-
 /****i* Component Library: Thread/cl_is_current_thread
 * NAME
 *      cl_is_current_thread
diff --git a/osm/include/complib/cl_threadpool.h 
b/osm/include/complib/cl_threadpool.h
index aa1e066..30b5f86 100644
--- a/osm/include/complib/cl_threadpool.h
+++ b/osm/include/complib/cl_threadpool.h
@@ -46,9 +46,8 @@
 #ifndef _CL_THREAD_POOL_H_
 #define _CL_THREAD_POOL_H_
 
-#include <complib/cl_list.h>
-#include <complib/cl_thread.h>
-#include <complib/cl_event.h>
+#include <pthread.h>
+#include <complib/cl_types.h>
 
 #ifdef __cplusplus
 #  define BEGIN_C_DECLS extern "C" {
@@ -100,15 +99,13 @@ BEGIN_C_DECLS
 */
 typedef struct _cl_thread_pool
 {
-       cl_pfn_thread_callback_t        pfn_callback;
-       const void                                      *context;
-       cl_list_t                                       thread_list;
-       cl_event_t                                      wakeup_event;
-       cl_event_t                                      destroy_event;
-       boolean_t                                       exit;
-       cl_state_t                                      state;
-       atomic32_t                                      running_count;
-
+       void (*pfn_callback)(void*);
+       void *context;
+       unsigned running_count;
+       unsigned events;
+       pthread_cond_t cond;
+       pthread_mutex_t mutex;
+       pthread_t *tid;
 } cl_thread_pool_t;
 /*
 * FIELDS
@@ -118,58 +115,23 @@ typedef struct _cl_thread_pool
 *      context
 *              Context to pass to the thread callback function.
 *
-*      thread_list
-*              List of threads managed by the thread pool.
-*
-*      event
-*              Event used to signal threads to wake up and do work.
-*
-*      destroy_event
-*              Event used to signal threads to exit.
-*
-*      exit
-*              Flag used to indicates threads to exit.
-*
-*      state
-*              State of the thread pool.
-*
 *      running_count
 *              Number of threads running.
 *
-* SEE ALSO
-*      Thread Pool
-*********/
-
-/****f* Component Library: Thread Pool/cl_thread_pool_construct
-* NAME
-*      cl_thread_pool_construct
+*      events
+*              Count of signalled events not yet consumed by a thread.
 *
-* DESCRIPTION
-*      The cl_thread_pool_construct function initializes the state of a
-*      thread pool.
+*      mutex
+*              Mutex protecting the events counter; used with cond.
 *
-* SYNOPSIS
-*/
-void
-cl_thread_pool_construct(
-       IN      cl_thread_pool_t* const p_thread_pool );
-/*
-* PARAMETERS
-*      p_thread_pool
-*              [in] Pointer to a thread pool structure.
+*      cond
+*              Condition variable used to signal an event to a thread.
 *
-* RETURN VALUE
-*      This function does not return a value.
-*
-* NOTES
-*      Allows calling cl_thread_pool_destroy without first calling
-*      cl_thread_pool_init.
-*
-*      Calling cl_thread_pool_construct is a prerequisite to calling any other
-*      thread pool function except cl_thread_pool_init.
+*      tid
+*              array of allocated thread ids.
 *
 * SEE ALSO
-*      Thread Pool, cl_thread_pool_init, cl_thread_pool_destroy
+*      Thread Pool
 *********/
 
 /****f* Component Library: Thread Pool/cl_thread_pool_init
@@ -184,11 +146,11 @@ cl_thread_pool_construct(
 */
 cl_status_t
 cl_thread_pool_init(
-       IN      cl_thread_pool_t* const         p_thread_pool,
-       IN      uint32_t                                        thread_count,
-       IN      cl_pfn_thread_callback_t        pfn_callback,
-       IN      const void* const                       context,
-       IN      const char* const                       name );
+       IN cl_thread_pool_t* const p_thread_pool,
+       IN unsigned count,
+       IN void (*pfn_callback)(void*),
+       IN void *context,
+       IN const char* const name );
 /*
 * PARAMETERS
 *      p_thread_pool
diff --git a/osm/osmtest/osmt_multicast.c b/osm/osmtest/osmt_multicast.c
index d5519eb..724a0bb 100644
--- a/osm/osmtest/osmt_multicast.c
+++ b/osm/osmtest/osmt_multicast.c
@@ -51,6 +51,7 @@
 #include <string.h>
 #include <complib/cl_debug.h>
 #include <complib/cl_map.h>
+#include <complib/cl_list.h>
 #include "osmtest.h"
 
 /**********************************************************************
-- 
1.5.0.1.40.gb40d


_______________________________________________
openib-general mailing list
[email protected]
http://openib.org/mailman/listinfo/openib-general

To unsubscribe, please visit http://openib.org/mailman/listinfo/openib-general

Reply via email to