Rainer Gerhards wrote:
> I am on vacation right now. But I think what happens is that the worker
> threads inherit the priority setting from the UDP listener thread. You
> probably need to change thread creation in ./runtime/wtp.c.

I hoped there would be a better method. Anyway, I've added thread
attributes in every pthread_create call, since changing just the one in
wtp.c wasn't enough.

There is one pthread_create() in plugins/imsolaris/sun_cddl.c which I
didn't touch because it seems buggy. It's using create_door_thr as
pthread_attr_t, but create_door_thr is never initialized, as far as I can
see.

The updated patch against rsyslog 5.6.2 is attached. Only the UDP listener
thread runs in real-time mode now.

I'm not sure if the code which gets the default thread properties should
go in rsyslog.c or somewhere else. It can be safely moved anywhere in the
initialization sequence, before the first pthread_create is called.

About the configure check: the proper way to check for the functionality
would be to check for the _XOPEN_REALTIME_THREADS preprocessor macro. That's
what's supposed to be defined if real-time thread functionality is available.
However, that symbol does not appear anywhere in /usr/include on Solaris 10
(update 6 is what I checked).

The equivalent run-time check, sysconf(_SC_XOPEN_REALTIME_THREADS), returns
1, though, so the missing _XOPEN_REALTIME_THREADS macro is a Solaris bug,
as far as I can tell.

Therefore I'm checking for the availability of pthread_setschedparam() and
then wrapping all real-time thread code in #ifdef HAVE_PTHREAD_SETSCHEDPARAM
blocks. It's not ideal, but I hope it works.

-- 
 .-.   .-.    Yes, I am an agent of Satan, but my duties are largely
(_  \ /  _)   ceremonial.
     |
     |        [email protected]
--- configure.ac.orig	2010-12-17 14:58:45.000000000 +0100
+++ configure.ac	2010-12-21 15:24:42.000000000 +0100
@@ -269,6 +269,36 @@
   )
 fi
 
+AC_CHECK_FUNCS(
+    [pthread_setschedparam],
+    [
+      rsyslog_have_pthread_setschedparam=yes
+    ],
+    [
+      rsyslog_have_pthread_setschedparam=no
+    ]
+)
+AC_CHECK_HEADERS(
+    [sched.h],
+    [
+      rsyslog_have_sched_h=yes
+    ],
+    [
+      rsyslog_have_sched_h=no
+    ]
+)
+if test "$rsyslog_have_pthread_setschedparam" = "yes" -a "$rsyslog_have_sched_h" = "yes"; then
+	# Search with LIBS emptied so that IMUDP_LIBS receives only what this probe adds.
+	save_LIBS=$LIBS
+	LIBS=
+	AC_SEARCH_LIBS(sched_get_priority_max, rt)
+	if test "x$ac_cv_search_sched_get_priority_max" != "xno"; then
+		AC_CHECK_FUNCS(sched_get_priority_max)
+	fi
+	AC_SUBST([IMUDP_LIBS], [$LIBS])
+	LIBS=$save_LIBS
+fi
+
 
 # klog
 AC_ARG_ENABLE(klog,
--- plugins/imudp/Makefile.am.orig	2010-12-17 15:02:14.000000000 +0100
+++ plugins/imudp/Makefile.am	2010-12-17 15:02:33.000000000 +0100
@@ -3,4 +3,4 @@
 imudp_la_SOURCES = imudp.c
 imudp_la_CPPFLAGS = -I$(top_srcdir) $(PTHREADS_CFLAGS) $(RSRT_CFLAGS)
 imudp_la_LDFLAGS = -module -avoid-version
-imudp_la_LIBADD = 
+imudp_la_LIBADD = $(IMUDP_LIBS)
--- plugins/imudp/imudp.c.orig	2010-12-15 16:06:56.000000000 +0100
+++ plugins/imudp/imudp.c	2010-12-27 17:48:17.000000000 +0100
@@ -35,6 +35,9 @@
 #if HAVE_SYS_EPOLL_H
 #	include <sys/epoll.h>
 #endif
+#ifdef HAVE_SCHED_H
+#	include <sched.h>
+#endif
 #include "rsyslog.h"
 #include "dirty.h"
 #include "net.h"
@@ -78,12 +81,103 @@
 					 * termination if we can not get it. -- rgerhards, 2007-12-27
 					 */
 static prop_t *pInputName = NULL;	/* our inputName currently is always "imudp", and this will hold it */
+static uchar *pszSchedPolicy = NULL;	/* scheduling policy string */
+static int iSchedPolicy;		/* scheduling policy as SCHED_xxx */
+static int iSchedPrio;			/* scheduling priority */
+static int seen_iSchedPrio = 0;		/* have we seen scheduling priority in the config file? */
 static ruleset_t *pBindRuleset = NULL;	/* ruleset to bind listener to (use system default if unspecified) */
 #define TIME_REQUERY_DFLT 2
 static int iTimeRequery = TIME_REQUERY_DFLT;/* how often is time to be queried inside tight recv loop? 0=always */
 
 /* config settings */
 
+/* Check iSchedPrio against the range reported for iSchedPolicy; the failure is logged only
+ * when report_error is nonzero. errno is not logged: no failing call precedes the message. */
+static rsRetVal check_scheduling_priority(int report_error)
+{
+	DEFiRet;
+#ifdef HAVE_SCHED_GET_PRIORITY_MAX
+	if (iSchedPrio < sched_get_priority_min(iSchedPolicy) ||
+	    iSchedPrio > sched_get_priority_max(iSchedPolicy)) {
+		if (report_error)
+			errmsg.LogError(0, NO_ERRCODE,
+				"imudp: scheduling priority %d out of range (%d - %d)"
+				" for scheduling policy '%s' - ignoring settings",
+				iSchedPrio,
+				sched_get_priority_min(iSchedPolicy),
+				sched_get_priority_max(iSchedPolicy),
+				pszSchedPolicy);
+		ABORT_FINALIZE(RS_RET_VALIDATION_RUN);
+	}
+#endif
+finalize_it:
+	RETiRet;
+}
+
+/* Config handler for the scheduling priority directive: store value through
+ * pVal (registered as &iSchedPrio), mark it seen, and range-check it at once
+ * if a policy is already known. A repeated directive is rejected. */
+static rsRetVal set_scheduling_priority(void *pVal, int value)
+{
+	DEFiRet;
+
+	if (seen_iSchedPrio) {
+		errmsg.LogError(0, NO_ERRCODE, "imudp: scheduling priority directive already seen");
+		ABORT_FINALIZE(RS_RET_VALIDATION_RUN);
+	}
+	*(int *)pVal = value;
+	seen_iSchedPrio = 1;
+	if (pszSchedPolicy != NULL)
+		CHKiRet(check_scheduling_priority(1));
+
+finalize_it:
+	RETiRet;
+}
+
+/* Config handler for the scheduling policy directive: store the word pNewVal
+ * through pVal (registered as &pszSchedPolicy) and translate it to the matching
+ * SCHED_xxx constant in iSchedPolicy. An unknown word or a repeated directive
+ * is rejected, and the rejected string is released. */
+static rsRetVal set_scheduling_policy(void *pVal, uchar *pNewVal)
+{
+	DEFiRet;
+
+	if (pszSchedPolicy != NULL) {
+		errmsg.LogError(0, NO_ERRCODE, "imudp: scheduling policy directive already seen");
+		/* pNewVal is treated as owned here (it is freed below once stored): do not leak it */
+		free(pNewVal);
+		ABORT_FINALIZE(RS_RET_VALIDATION_RUN);
+	}
+	*((uchar**)pVal) = pNewVal;	/* pVal is pszSchedPolicy */
+	if (0) { /* trick to use conditional compilation */
+#ifdef SCHED_FIFO
+	} else if (!strcasecmp((char*)pszSchedPolicy, "fifo")) {
+		iSchedPolicy = SCHED_FIFO;
+#endif
+#ifdef SCHED_RR
+	} else if (!strcasecmp((char*)pszSchedPolicy, "rr")) {
+		iSchedPolicy = SCHED_RR;
+#endif
+#ifdef SCHED_OTHER
+	} else if (!strcasecmp((char*)pszSchedPolicy, "other")) {
+		iSchedPolicy = SCHED_OTHER;
+#endif
+	} else {
+		/* pass 0, not errno: no call has failed, so errno would be stale */
+		errmsg.LogError(0, NO_ERRCODE,
+			    "imudp: invalid scheduling policy '%s' "
+			    "- ignoring setting", pszSchedPolicy);
+		free(pszSchedPolicy);
+		pszSchedPolicy = NULL;
+		ABORT_FINALIZE(RS_RET_VALIDATION_RUN);
+	}
+	if (seen_iSchedPrio)
+		CHKiRet(check_scheduling_priority(1));
+
+finalize_it:
+	RETiRet;
+}
+
 
 /* This function is called when a new listener shall be added. It takes
  * the configured parameters, tries to bind the socket and, if that
@@ -294,6 +388,41 @@
 	RETiRet;
 }
 
+/* Apply the configured scheduling policy/priority to the calling thread, then
+ * release the policy string. Incomplete or out-of-range settings are ignored. */
+static void set_thread_schedparam(void)
+{
+	if (pszSchedPolicy != NULL && seen_iSchedPrio == 0) {
+		errmsg.LogError(0, NO_ERRCODE,
+			"imudp: scheduling policy set, but without priority - ignoring settings");
+	} else if (pszSchedPolicy == NULL && seen_iSchedPrio != 0) {
+		errmsg.LogError(0, NO_ERRCODE,
+			"imudp: scheduling priority set, but without policy - ignoring settings");
+	} else if (pszSchedPolicy != NULL && seen_iSchedPrio != 0 &&
+		   check_scheduling_priority(0) == 0) {
+#ifndef HAVE_PTHREAD_SETSCHEDPARAM
+		errmsg.LogError(0, NO_ERRCODE,
+			"imudp: cannot set thread scheduling policy, "
+			"pthread_setschedparam() not available");
+#else
+		/* declared here so builds without pthread_setschedparam() never need the type */
+		struct sched_param sparam;
+		int err;
+
+		memset(&sparam, 0, sizeof sparam);
+		sparam.sched_priority = iSchedPrio;
+		dbgprintf("imudp trying to set sched policy to '%s', prio %d\n",
+			  pszSchedPolicy, iSchedPrio);
+		err = pthread_setschedparam(pthread_self(), iSchedPolicy, &sparam);
+		if (err != 0) {
+			errmsg.LogError(err, NO_ERRCODE, "imudp: pthread_setschedparam() failed");
+		}
+#endif
+	}
+
+	free(pszSchedPolicy);	/* free(NULL) is a no-op, no guard needed */
+	pszSchedPolicy = NULL;
+}
 
 /* This function implements the main reception loop. Depending on the environment,
  * we either use the traditional (but slower) select() or the Linux-specific epoll()
@@ -317,6 +446,7 @@
 	/* start "name caching" algo by making sure the previous system indicator
 	 * is invalidated.
 	 */
+	set_thread_schedparam();
 	bIsPermitted = 0;
 	memset(&frominetPrev, 0, sizeof(frominetPrev));
 
@@ -384,6 +514,7 @@
 	/* start "name caching" algo by making sure the previous system indicator
 	 * is invalidated.
 	 */
+	set_thread_schedparam();
 	bIsPermitted = 0;
 	memset(&frominetPrev, 0, sizeof(frominetPrev));
 	DBGPRINTF("imudp uses select()\n");
@@ -539,6 +670,10 @@
 		addListner, NULL, STD_LOADABLE_MODULE_ID));
 	CHKiRet(omsdRegCFSLineHdlr((uchar *)"udpserveraddress", 0, eCmdHdlrGetWord,
 		NULL, &pszBindAddr, STD_LOADABLE_MODULE_ID));
+	CHKiRet(omsdRegCFSLineHdlr((uchar *)"imudpschedulingpolicy", 0, eCmdHdlrGetWord,
+		&set_scheduling_policy, &pszSchedPolicy, STD_LOADABLE_MODULE_ID));
+	CHKiRet(omsdRegCFSLineHdlr((uchar *)"imudpschedulingpriority", 0, eCmdHdlrInt,
+		&set_scheduling_priority, &iSchedPrio, STD_LOADABLE_MODULE_ID));
 	CHKiRet(omsdRegCFSLineHdlr((uchar *)"udpservertimerequery", 0, eCmdHdlrInt,
 		NULL, &iTimeRequery, STD_LOADABLE_MODULE_ID));
 	CHKiRet(omsdRegCFSLineHdlr((uchar *)"resetconfigvariables", 1, eCmdHdlrCustomHandler,
--- runtime/rsyslog.h.orig	2010-12-29 13:39:49.000000000 +0100
+++ runtime/rsyslog.h	2010-12-29 17:36:40.000000000 +0100
@@ -25,6 +25,7 @@
  */
 #ifndef INCLUDED_RSYSLOG_H
 #define INCLUDED_RSYSLOG_H
+#include <pthread.h>
 #include "typedefs.h"
 
 /* ############################################################# *
@@ -411,6 +412,12 @@
 #define RSFREEOBJ(x) {(x)->OID = OIDrsFreed; free(x);}
 #endif
 
+#ifdef HAVE_PTHREAD_SETSCHEDPARAM
+extern struct sched_param default_sched_param;
+extern pthread_attr_t default_thread_attr;
+extern int default_thr_sched_policy;
+#endif
+
 
 /* for the time being, we do our own portability handling here. It
  * looks like autotools either does not yet support checks for it, or
--- runtime/rsyslog.c.orig	2010-12-29 13:33:31.000000000 +0100
+++ runtime/rsyslog.c	2010-12-29 17:42:04.000000000 +0100
@@ -84,6 +84,12 @@
 #include "strgen.h"
 #include "atomic.h"
 
+#ifdef HAVE_PTHREAD_SETSCHEDPARAM
+struct sched_param default_sched_param;
+pthread_attr_t default_thread_attr;
+int default_thr_sched_policy;
+#endif
+
 /* forward definitions */
 static rsRetVal dfltErrLogger(int, uchar *errMsg);
 
@@ -138,6 +144,18 @@
 
 	if(iRefCount == 0) {
 		/* init runtime only if not yet done */
+#ifdef HAVE_PTHREAD_SETSCHEDPARAM
+	    	CHKiRet(pthread_getschedparam(pthread_self(),
+			    		      &default_thr_sched_policy,
+					      &default_sched_param));
+		CHKiRet(pthread_attr_init(&default_thread_attr));
+		CHKiRet(pthread_attr_setschedpolicy(&default_thread_attr,
+			    			    default_thr_sched_policy));
+		CHKiRet(pthread_attr_setschedparam(&default_thread_attr,
+			    			   &default_sched_param));
+		CHKiRet(pthread_attr_setinheritsched(&default_thread_attr,
+			    			     PTHREAD_EXPLICIT_SCHED));
+#endif
 		if(ppErrObj != NULL) *ppErrObj = "obj";
 		CHKiRet(objClassInit(NULL)); /* *THIS* *MUST* always be the first class initilizer being called! */
 		CHKiRet(objGetObjInterface(pObjIF)); /* this provides the root pointer for all other queries */
--- runtime/wtp.c.orig	2010-12-29 12:46:26.000000000 +0100
+++ runtime/wtp.c	2010-12-29 17:41:17.000000000 +0100
@@ -90,6 +90,12 @@
 	pthread_mutex_init(&pThis->mutWtp, NULL);
 	pthread_cond_init(&pThis->condThrdTrm, NULL);
 	pthread_attr_init(&pThis->attrThrd);
+	/* Set thread scheduling policy to default */
+#ifdef HAVE_PTHREAD_SETSCHEDPARAM
+	pthread_attr_setschedpolicy(&pThis->attrThrd, default_thr_sched_policy);
+	pthread_attr_setschedparam(&pThis->attrThrd, &default_sched_param);
+	pthread_attr_setinheritsched(&pThis->attrThrd, PTHREAD_EXPLICIT_SCHED);
+#endif
 	pthread_attr_setdetachstate(&pThis->attrThrd, PTHREAD_CREATE_DETACHED);
 	/* set all function pointers to "not implemented" dummy so that we can safely call them */
 	pThis->pfChkStopWrkr = NotImplementedDummy;
--- runtime/stream.c.orig	2010-12-29 16:45:59.000000000 +0100
+++ runtime/stream.c	2010-12-29 16:47:14.000000000 +0100
@@ -669,7 +669,13 @@
 		}
 		pThis->pIOBuf = pThis->asyncBuf[0].pBuf;
 		pThis->bStopWriter = 0;
-		if(pthread_create(&pThis->writerThreadID, NULL, asyncWriterThread, pThis) != 0)
+		if(pthread_create(&pThis->writerThreadID,
+#ifdef HAVE_PTHREAD_SETSCHEDPARAM
+			    	  &default_thread_attr,
+#else
+				  NULL,
+#endif
+				  asyncWriterThread, pThis) != 0)
 			DBGPRINTF("ERROR: stream %p cold not create writer thread\n", pThis);
 	} else {
 		/* we work synchronously, so we need to alloc a fixed pIOBuf */
--- threads.c.orig	2010-12-29 16:47:32.000000000 +0100
+++ threads.c	2010-12-29 16:56:13.000000000 +0100
@@ -220,7 +220,13 @@
 	pThis->pUsrThrdMain = thrdMain;
 	pThis->pAfterRun = afterRun;
 	pThis->bNeedsCancel = bNeedsCancel;
-	i = pthread_create(&pThis->thrdID, NULL, thrdStarter, pThis);
+	i = pthread_create(&pThis->thrdID,
+#ifdef HAVE_PTHREAD_SETSCHEDPARAM
+			   &default_thread_attr,
+#else
+			   NULL,
+#endif
+			   thrdStarter, pThis);
 	CHKiRet(llAppend(&llThrds, NULL, pThis));
 
 finalize_it:
_______________________________________________
rsyslog mailing list
http://lists.adiscon.net/mailman/listinfo/rsyslog
http://www.rsyslog.com

Reply via email to