Hello.

Currently, worker threads are started as needed, but in practice they can virtually never be stopped. The algorithm is as follows:

1. The first thread is started when there is at least 1 message in the queue. An additional thread N is started when the queue holds at least (N-1)*WorkerThreadMinimumMessages messages.
2. A worker thread can stop after it has been sleeping for a while; this is controlled by the QueueWorkerTimeoutThreadShutdown parameter.
3. When there is work in the queue, sleeping threads are awakened one at a time. There is no way to select the most appropriate thread to awaken. Furthermore, threads are awakened even when there are already enough workers. As a result, all threads are awakened in round-robin fashion, and none of them ever reaches the timeout while there is any traffic in the queue.

Having too many threads hurts performance, because every extra thread adds overhead. Here is my proposed patch to make the thread pool shrinkable. It already works for me on a loaded server. Basically, this patch always selects the same threads to wake up and limits the number of running threads to the advised maximum. That way, unneeded threads can keep sleeping until they reach the shutdown timeout. In general, it makes rsyslog behave closer to what its documentation describes.

This modification has an important consequence: if one thread cannot cope with the traffic, the queue almost always holds more than (N-1)*WorkerThreadMinimumMessages messages. This reduces overhead, because threads are able to fetch more messages in each batch. On the other hand, it increases latency. If this is an issue, WorkerThreadMinimumMessages can be set to a lower value. Alternatively, the formula for iMaxWorkers could be changed.


--
Pavel Levshin

diff --git a/runtime/wti.c b/runtime/wti.c
index f91fb5a..77197a9 100644
--- a/runtime/wti.c
+++ b/runtime/wti.c
@@ -171,6 +171,7 @@ BEGINobjDestruct(wti) /* be sure to specify the object type 
also in END and CODE
 CODESTARTobjDestruct(wti)
        /* actual destruction */
        batchFree(&pThis->batch);
+       pthread_cond_destroy(&pThis->pcondBusy);
        DESTROY_ATOMIC_HELPER_MUT(pThis->mutIsRunning);
 
        free(pThis->pszDbgHdr);
@@ -181,6 +182,7 @@ ENDobjDestruct(wti)
  */
 BEGINobjConstruct(wti) /* be sure to specify the object type also in END 
macro! */
        INIT_ATOMIC_HELPER_MUT(pThis->mutIsRunning);
+       pthread_cond_init(&pThis->pcondBusy, NULL);
 ENDobjConstruct(wti)
 
 
@@ -249,10 +251,10 @@ doIdleProcessing(wti_t *pThis, wtp_t *pWtp, int 
*pbInactivityTOOccured)
 
        if(pThis->bAlwaysRunning) {
                /* never shut down any started worker */
-               d_pthread_cond_wait(pWtp->pcondBusy, pWtp->pmutUsr);
+               d_pthread_cond_wait(&pThis->pcondBusy, pWtp->pmutUsr);
        } else {
                timeoutComp(&t, pWtp->toWrkShutdown);/* get absolute timeout */
-               if(d_pthread_cond_timedwait(pWtp->pcondBusy, pWtp->pmutUsr, &t) 
!= 0) {
+               if(d_pthread_cond_timedwait(&pThis->pcondBusy, pWtp->pmutUsr, 
&t) != 0) {
                        DBGPRINTF("%s: inactivity timeout, worker 
terminating...\n", wtiGetDbgHdr(pThis));
                        *pbInactivityTOOccured = 1; /* indicate we had a 
timeout */
                }
diff --git a/runtime/wti.h b/runtime/wti.h
index 014251f..b0dc6c9 100644
--- a/runtime/wti.h
+++ b/runtime/wti.h
@@ -37,6 +37,7 @@ struct wti_s {
        wtp_t *pWtp; /* my worker thread pool (important if only the work 
thread instance is passed! */
        batch_t batch; /* pointer to an object array meaningful for current 
user pointer (e.g. queue pUsr data elemt) */
        uchar *pszDbgHdr;       /* header string for debug messages */
+       pthread_cond_t pcondBusy; /* condition to wake up the worker, protected 
by pmutUsr in wtp */
        DEF_ATOMIC_HELPER_MUT(mutIsRunning);
 };
 
diff --git a/runtime/wtp.c b/runtime/wtp.c
index 19151e7..0326d5d 100644
--- a/runtime/wtp.c
+++ b/runtime/wtp.c
@@ -233,9 +233,9 @@ wtpShutdownAll(wtp_t *pThis, wtpState_t tShutdownCmd, 
struct timespec *ptTimeout
        /* lock mutex to prevent races (may otherwise happen during idle 
processing and such...) */
        d_pthread_mutex_lock(pThis->pmutUsr);
        wtpSetState(pThis, tShutdownCmd);
-       pthread_cond_broadcast(pThis->pcondBusy); /* wake up all workers */
        /* awake workers in retry loop */
        for(i = 0 ; i < pThis->iNumWorkerThreads ; ++i) {
+               pthread_cond_signal(&pThis->pWrkr[i]->pcondBusy);
                wtiWakeupThrd(pThis->pWrkr[i]);
        }
        d_pthread_mutex_unlock(pThis->pmutUsr);
@@ -455,7 +455,7 @@ wtpAdviseMaxWorkers(wtp_t *pThis, int nMaxWrkr)
 {
        DEFiRet;
        int nMissing; /* number workers missing to run */
-       int i;
+       int i, nRunning;
 
        ISOBJ_TYPE_assert(pThis, wtp);
 
@@ -475,7 +475,13 @@ wtpAdviseMaxWorkers(wtp_t *pThis, int nMaxWrkr)
                        CHKiRet(wtpStartWrkr(pThis));
                }
        } else {
-               pthread_cond_signal(pThis->pcondBusy);
+               /* we have needed number of workers, but they may be sleeping */
+               for(i = 0, nRunning = 0; i < pThis->iNumWorkerThreads && 
nRunning < nMaxWrkr; ++i) {
+                       if (wtiGetState(pThis->pWrkr[i]) != WRKTHRD_STOPPED) {
+                               
pthread_cond_signal(&pThis->pWrkr[i]->pcondBusy);
+                               nRunning++;
+                       }
+               }
        }
 
        
diff --git a/runtime/wtp.h b/runtime/wtp.h
index 25992f7..697722a 100644
--- a/runtime/wtp.h
+++ b/runtime/wtp.h
@@ -56,7 +56,7 @@ struct wtp_s {
        void *pUsr;             /* pointer to user object (in this case, the 
queue the wtp belongs to) */
        pthread_attr_t attrThrd;/* attribute for new threads (created just once 
and cached here) */
        pthread_mutex_t *pmutUsr;
-       pthread_cond_t *pcondBusy; /* condition the user will signal "busy 
again, keep runing" on (awakes worker) */
+       pthread_cond_t *pcondBusy; /* unused condition variable, was used to 
signal threads to wake up */
        rsRetVal (*pfChkStopWrkr)(void *pUsr, int);
        rsRetVal (*pfGetDeqBatchSize)(void *pUsr, int*); /* obtains max dequeue 
count from queue config */
        rsRetVal (*pfObjProcessed)(void *pUsr, wti_t *pWti); /* indicate user 
object is processed */
_______________________________________________
rsyslog mailing list
http://lists.adiscon.net/mailman/listinfo/rsyslog
http://www.rsyslog.com/professional-services/
What's up with rsyslog? Follow https://twitter.com/rgerhards
NOTE WELL: This is a PUBLIC mailing list, posts are ARCHIVED by a myriad of 
sites beyond our control. PLEASE UNSUBSCRIBE and DO NOT POST if you DON'T LIKE 
THAT.

Reply via email to