06.11.2013 22:00, Rainer Gerhards:
> OK, I have begun to merge the patch and I really like it.
> Unfortunately, there seems to be one case where it does not work. I
> could not yet test this, maybe you can have a look into it. The
> problem, I think, occurs with DA queues. If we look into queue.c, the
> *same* condition variable is used for regular and DA workers, which
> reside in different pools. This is so that when we need to go to disk,
> the disk queue is also properly processed. I think (again, could not
> yet verify) this can lead to stalls. Note that this will NOT occur with
> pure disk queues, just with pure DA queues. It would be great if you
> could look into this; if not, I'll do so as soon as I find a bit of time.

I will try to test it tomorrow, but the test can be unreliable.
However, it should probably work, because the condition is used only in
wtpAdviseMaxWorkers() and wtpShutdownAll().

wtpShutdownAll() is called separately for each thread pool, and it does
much more than simple signaling.

wtpAdviseMaxWorkers() is called from several places, but most of them
are related to DA shutdown. The one place where it is used for all
workers is qqueueAdviseMaxWorkers(), and there it is called separately
for the regular and the DA pool. It should be OK, but I feel there is a
bug which may, under rare circumstances, lead to a stall with my patch.
It is probably a bug even without the patch.

Look at the function below: when it is time to activate the DA worker,
we advise the DA pool explicitly, but in that case we do not advise the
regular workers. They are likely already running at this point, but that
is not guaranteed. What if, for example, the system is set to start
additional workers only when the queue goes over the high watermark?
What if HighWatermark is set to 1? Then the regular workers will not be
started, and the DA worker may fail. Thus, it is reasonable to advise
the regular workers even if we are going DA.

If this argument is right, the patch is attached.

static inline rsRetVal
qqueueAdviseMaxWorkers(qqueue_t *pThis)
{
	DEFiRet;
	int iMaxWorkers;

	ISOBJ_TYPE_assert(pThis, qqueue);

	if(!pThis->bEnqOnly) {
		if(pThis->bIsDA && getLogicalQueueSize(pThis) >= pThis->iHighWtrMrk) {
			DBGOPRINT((obj_t*) pThis, "(re)activating DA worker\n");
			wtpAdviseMaxWorkers(pThis->pWtpDA, 1); /* disk queues have always one worker */
		} else {
			if(getLogicalQueueSize(pThis) == 0) {
				iMaxWorkers = 0;
			} else if(pThis->qType == QUEUETYPE_DISK || pThis->iMinMsgsPerWrkr == 0) {
				iMaxWorkers = 1;
			} else {
				iMaxWorkers = getLogicalQueueSize(pThis) / pThis->iMinMsgsPerWrkr + 1;
			}
			wtpAdviseMaxWorkers(pThis->pWtpReg, iMaxWorkers);
		}
	}

	RETiRet;
}
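
To make the HighWatermark = 1 case concrete, here is a small standalone
model of the decision, not rsyslog code. The struct and function names
(queue_model, advice, advise_current, advise_patched) are hypothetical
and exist only for this illustration; bEnqOnly is ignored, and -1 stands
for "wtpAdviseMaxWorkers() is not called for this pool". It is only a
sketch of the branch logic under those assumptions.

```c
#include <assert.h>
#include <stdbool.h>

/* Hypothetical stand-in for the qqueue_t fields the function reads. */
struct queue_model {
	bool is_da;             /* models pThis->bIsDA */
	bool is_disk;           /* models pThis->qType == QUEUETYPE_DISK */
	int size;               /* models getLogicalQueueSize(pThis) */
	int high_wtr_mrk;       /* models pThis->iHighWtrMrk */
	int min_msgs_per_wrkr;  /* models pThis->iMinMsgsPerWrkr */
};

/* Worker counts each pool would be advised with; -1 = pool not advised. */
struct advice {
	int da_workers;
	int reg_workers;
};

/* The iMaxWorkers computation shared by both variants. */
static int regular_worker_count(const struct queue_model *q)
{
	if (q->size == 0)
		return 0;
	if (q->is_disk || q->min_msgs_per_wrkr == 0)
		return 1;
	return q->size / q->min_msgs_per_wrkr + 1;
}

/* Current logic: advise the DA pool OR the regular pool, never both. */
struct advice advise_current(const struct queue_model *q)
{
	struct advice a = { -1, -1 };

	assert(q->size >= 0 && q->min_msgs_per_wrkr >= 0);
	if (q->is_da && q->size >= q->high_wtr_mrk)
		a.da_workers = 1;
	else
		a.reg_workers = regular_worker_count(q);
	return a;
}

/* Patched logic: the regular pool is advised in every case. */
struct advice advise_patched(const struct queue_model *q)
{
	struct advice a = { -1, -1 };

	assert(q->size >= 0 && q->min_msgs_per_wrkr >= 0);
	if (q->is_da && q->size >= q->high_wtr_mrk)
		a.da_workers = 1;
	a.reg_workers = regular_worker_count(q);
	return a;
}
```

With a DA queue holding one message and a high watermark of 1, the
current logic yields { 1, -1 }, so the regular pool is never advised,
while the patched logic yields { 1, 1 }.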
--
Pavel Levshin
diff --git a/runtime/queue.c b/runtime/queue.c
index 66cb721..4283ef5 100644
--- a/runtime/queue.c
+++ b/runtime/queue.c
@@ -351,16 +351,15 @@ qqueueAdviseMaxWorkers(qqueue_t *pThis)
 		if(pThis->bIsDA && getLogicalQueueSize(pThis) >= pThis->iHighWtrMrk) {
 			DBGOPRINT((obj_t*) pThis, "(re)activating DA worker\n");
 			wtpAdviseMaxWorkers(pThis->pWtpDA, 1); /* disk queues have always one worker */
+		}
+		if(getLogicalQueueSize(pThis) == 0) {
+			iMaxWorkers = 0;
+		} else if(pThis->qType == QUEUETYPE_DISK || pThis->iMinMsgsPerWrkr == 0) {
+			iMaxWorkers = 1;
 		} else {
-			if(getLogicalQueueSize(pThis) == 0) {
-				iMaxWorkers = 0;
-			} else if(pThis->qType == QUEUETYPE_DISK || pThis->iMinMsgsPerWrkr == 0) {
-				iMaxWorkers = 1;
-			} else {
-				iMaxWorkers = getLogicalQueueSize(pThis) / pThis->iMinMsgsPerWrkr + 1;
-			}
-			wtpAdviseMaxWorkers(pThis->pWtpReg, iMaxWorkers);
+			iMaxWorkers = getLogicalQueueSize(pThis) / pThis->iMinMsgsPerWrkr + 1;
 		}
+		wtpAdviseMaxWorkers(pThis->pWtpReg, iMaxWorkers);
 	}
 	RETiRet;