06.11.2013 22:00, Rainer Gerhards:

OK, I have begun to merge the patch and I really like it. Unfortunately, there seems to be one case where it does not work. I could not yet test this; maybe you can have a look at it. The problem, I think, occurs with DA queues. If we look into queue.c, the *same* condition variable is used for regular and DA workers, which reside in different pools. This is so that when we need to go to disk, the disk queue is also properly processed. I think (again, I could not yet verify this) it can lead to stalls. Note that this will NOT occur with pure disk queues, only with DA queues. It would be great if you could look into this; if not, I'll do so as soon as I find a bit of time.
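The kind of stall Rainer suspects can be illustrated with a small single-threaded simulation. It is not rsyslog's wtp code and assumes nothing about what the patch changes; the names `shared_cond`, `wake_one()` and `wake_all()` are hypothetical. It only models the general hazard of one condition variable shared by two pools: a wakeup that reaches a single waiter (like pthread_cond_signal(), where the caller cannot choose which waiter runs) may land on a worker of the wrong pool, which re-checks its predicate and goes back to sleep, leaving the intended worker asleep. Waking every waiter (like pthread_cond_broadcast()) avoids this.

```c
#include <assert.h>
#include <stdbool.h>

/* Hypothetical model, not rsyslog code: two worker pools share one
 * condition variable, so its wait queue holds workers of both pools. */
enum pool { POOL_REG = 0, POOL_DA = 1 };

#define MAX_WAITERS 8

struct shared_cond {
    enum pool waiters[MAX_WAITERS]; /* sleeping workers, oldest first */
    int nwaiters;
};

/* Remove the waiter at index i, keeping the order of the others. */
static void remove_waiter(struct shared_cond *c, int i) {
    for (int j = i + 1; j < c->nwaiters; j++)
        c->waiters[j - 1] = c->waiters[j];
    c->nwaiters--;
}

/* Single-waiter wakeup. This model assumes the oldest waiter is woken.
 * If it belongs to the pool that has work, it takes the work and leaves
 * the wait queue; otherwise it finds nothing to do and sleeps again, so
 * the wakeup is consumed without effect. Returns true if work was taken. */
static bool wake_one(struct shared_cond *c, enum pool has_work) {
    assert(c->nwaiters >= 0 && c->nwaiters <= MAX_WAITERS);
    if (c->nwaiters == 0 || c->waiters[0] != has_work)
        return false;
    remove_waiter(c, 0);
    return true;
}

/* Wake-everyone variant: every waiter re-checks its own predicate, so the
 * oldest waiter of the pool with work takes it; the rest sleep again. */
static bool wake_all(struct shared_cond *c, enum pool has_work) {
    assert(c->nwaiters >= 0 && c->nwaiters <= MAX_WAITERS);
    for (int i = 0; i < c->nwaiters; i++) {
        if (c->waiters[i] == has_work) {
            remove_waiter(c, i);
            return true;
        }
    }
    return false;
}
```

If a DA worker went to sleep before a regular worker and work then arrives for the regular pool, `wake_one()` returns false (the regular worker stays asleep), while `wake_all()` lets the regular worker take the work.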

I will try to test it tomorrow, but the test can be unreliable.

However, it should probably work, because the condition variable is used only in wtpAdviseMaxWorkers() and wtpShutdownAll().

wtpShutdownAll() is called separately for each thread pool, and it does much more than simple signaling.

wtpAdviseMaxWorkers() is called from several places, but most of them are related to DA shutdown. One place where it is used for all workers is qqueueAdviseMaxWorkers(), and there it is called separately for the regular and the DA pool. So it should be OK, but I suspect there is a bug that may, under rare circumstances, lead to a stall with my patch. It is probably a bug even without the patch.

Look at the code below: if it is time to activate the DA worker, we advise it explicitly. But in that case we do not advise the regular workers. They are likely already running at this point, but that is not guaranteed. What if, for example, the system is set to start additional workers when the queue goes over the high watermark? What if HighWatermark is set to 1? Then any non-empty DA queue satisfies getLogicalQueueSize(pThis) >= pThis->iHighWtrMrk, so the else branch that advises pWtpReg is never reached: regular workers will not be started, and the DA worker may fail. Thus, it is reasonable to advise the regular workers even when we are going DA.

If this argument is right, the patch is attached.


static inline rsRetVal
qqueueAdviseMaxWorkers(qqueue_t *pThis)
{
    DEFiRet;
    int iMaxWorkers;

    ISOBJ_TYPE_assert(pThis, qqueue);

    if(!pThis->bEnqOnly) {
        if(pThis->bIsDA && getLogicalQueueSize(pThis) >= pThis->iHighWtrMrk) {
            DBGOPRINT((obj_t*) pThis, "(re)activating DA worker\n");
            wtpAdviseMaxWorkers(pThis->pWtpDA, 1); /* disk queues have always one worker */
        } else {
            if(getLogicalQueueSize(pThis) == 0) {
                iMaxWorkers = 0;
            } else if(pThis->qType == QUEUETYPE_DISK || pThis->iMinMsgsPerWrkr == 0) {
                iMaxWorkers = 1;
            } else {
                iMaxWorkers = getLogicalQueueSize(pThis) / pThis->iMinMsgsPerWrkr + 1;
            }
            wtpAdviseMaxWorkers(pThis->pWtpReg, iMaxWorkers);
        }
    }

    RETiRet;
}

--
Pavel Levshin

diff --git a/runtime/queue.c b/runtime/queue.c
index 66cb721..4283ef5 100644
--- a/runtime/queue.c
+++ b/runtime/queue.c
@@ -351,16 +351,15 @@ qqueueAdviseMaxWorkers(qqueue_t *pThis)
                if(pThis->bIsDA && getLogicalQueueSize(pThis) >= pThis->iHighWtrMrk) {
                        DBGOPRINT((obj_t*) pThis, "(re)activating DA worker\n");
                        wtpAdviseMaxWorkers(pThis->pWtpDA, 1); /* disk queues have always one worker */
+               }
+               if(getLogicalQueueSize(pThis) == 0) {
+                       iMaxWorkers = 0;
+               } else if(pThis->qType == QUEUETYPE_DISK || pThis->iMinMsgsPerWrkr == 0) {
+                       iMaxWorkers = 1;
                } else {
-                       if(getLogicalQueueSize(pThis) == 0) {
-                               iMaxWorkers = 0;
-                       } else if(pThis->qType == QUEUETYPE_DISK || pThis->iMinMsgsPerWrkr == 0) {
-                               iMaxWorkers = 1;
-                       } else {
-                               iMaxWorkers = getLogicalQueueSize(pThis) / pThis->iMinMsgsPerWrkr + 1;
-                       }
-                       wtpAdviseMaxWorkers(pThis->pWtpReg, iMaxWorkers);
+                       iMaxWorkers = getLogicalQueueSize(pThis) / pThis->iMinMsgsPerWrkr + 1;
                }
+               wtpAdviseMaxWorkers(pThis->pWtpReg, iMaxWorkers);
        }
 
        RETiRet;
_______________________________________________
rsyslog mailing list
http://lists.adiscon.net/mailman/listinfo/rsyslog
http://www.rsyslog.com/professional-services/
What's up with rsyslog? Follow https://twitter.com/rgerhards
NOTE WELL: This is a PUBLIC mailing list, posts are ARCHIVED by a myriad of 
sites beyond our control. PLEASE UNSUBSCRIBE and DO NOT POST if you DON'T LIKE 
THAT.
