Dražen Kačar wrote:

> Then I negated the condition in preparePipe() to:
> 
>         if(!pData->bBlocking)
>                 mode |= O_NONBLOCK;
> 
> and now strace doesn't show O_NONBLOCK and there is no sleep in select().
> I'll run more comprehensive tests just to be sure, but this looks ok.

I've tried using the blocking ompipe in real world and it's not
functioning too well.

The open function uses O_RDWR on a FIFO. That happens to yield undefined
behavior under POSIX and also some real implementations (Solaris, for
example). But I'm on Linux, so it's documented to work always.

The problem with that is that rsyslog has that FIFO opened for reading as
well, so it can write to it regardless of whether the intended reader has
opened the FIFO or not. And so we can lose messages if rsyslog reopens the
FIFO before the reader manages to open it.

So I changed the preparePipe() code with this:

    rdfd = open((char*) pData->f_fname, O_RDONLY|O_NONBLOCK|O_CLOEXEC);
    pData->fd = open((char*) pData->f_fname, O_WRONLY|O_NONBLOCK|O_CLOEXEC);
    if(pData->bBlocking) {
            long flags;

            flags = fcntl(pData->fd, F_GETFL);
            fcntl(pData->fd, F_SETFL, flags & ~O_NONBLOCK);
    }
    close(rdfd);

That will always open FIFO and it's portable.

But then, when there is no reader, ompipe is constantly trying to write
and all those writes fail with EPIPE. The writing might get suspended
after a certain number of attempts, but then there'd be a long pause
before the next attempt, which is the kind of behavior I wanted to avoid.

So I added a blocking open()[1] to the writePipe() function in case when the
write() fails with EPIPE on a blocking FIFO. The thread will block, but
the blocking FIFO is supposed to be used on its own action queue, so I
hope everything will work fine in that case.

The patch for rsyslog 5.6.2 is attached. I didn't test it on non-blocking
FIFOs, though.

Any comments?

[1] poll() would return immediately with POLLERR, so can't be used for
    this purpose.

-- 
 .-.   .-.    Yes, I am an agent of Satan, but my duties are largely
(_  \ /  _)   ceremonial.
     |
     |        [email protected]
--- tools/ompipe.c.orig	2010-12-10 14:55:34.000000000 +0100
+++ tools/ompipe.c	2010-12-10 17:36:16.000000000 +0100
@@ -65,12 +65,14 @@
 
 
 /* globals for default values */
+static int bBlocking = 0;
 /* end globals for default values */
 
 
 typedef struct _instanceData {
 	uchar	f_fname[MAXFNAME];/* pipe or template name (display only) */
 	short	fd;		  /* pipe descriptor for (current) pipe */
+	sbool	bBlocking;
 } instanceData;
 
 
@@ -98,8 +100,20 @@
 static inline rsRetVal
 preparePipe(instanceData *pData)
 {
+
+    	int rdfd;
+
 	DEFiRet;
-	pData->fd = open((char*) pData->f_fname, O_RDWR|O_NONBLOCK|O_CLOEXEC);
+
+	rdfd = open((char*) pData->f_fname, O_RDONLY|O_NONBLOCK|O_CLOEXEC);
+	pData->fd = open((char*) pData->f_fname, O_WRONLY|O_NONBLOCK|O_CLOEXEC);
+	if(pData->bBlocking) {
+	    	long flags;
+
+		flags = fcntl(pData->fd, F_GETFL);
+		fcntl(pData->fd, F_SETFL, flags & ~O_NONBLOCK);
+	}
+	close(rdfd);
 	RETiRet;
 }
 
@@ -123,10 +137,28 @@
 	}
 
 	/* create the message based on format specified */
+write_again:
 	iLenWritten = write(pData->fd, ppString[0], strlen((char*)ppString[0]));
 	if(iLenWritten < 0) {
 		int e = errno;
 		char errStr[1024];
+
+		if (pData->bBlocking && e == EPIPE) {
+		    	/* No reader. Block in open() until a reader appears. */
+			int tfd;
+
+			do {
+				tfd = open((char*) pData->f_fname, O_WRONLY|O_CLOEXEC);
+			} while (tfd == -1 && errno == EINTR);
+			if (tfd < 0) {
+			    	close(pData->fd);
+				pData->fd = -1;
+				ABORT_FINALIZE(RS_RET_SUSPENDED);
+			}
+			close(tfd);
+			goto write_again;
+		}
+
 		rs_strerror_r(errno, errStr, sizeof(errStr));
 		DBGPRINTF("pipe (%d) write error %d: %s\n", pData->fd, e, errStr);
 
@@ -197,6 +229,8 @@
 	 */
 	CHKiRet(cflineParseFileName(p, (uchar*) pData->f_fname, *ppOMSR, 0, OMSR_NO_RQD_TPL_OPTS,
 				       (pszFileDfltTplName == NULL) ? (uchar*)"RSYSLOG_FileFormat" : pszFileDfltTplName));
+	pData->bBlocking = bBlocking;
+	bBlocking = 0;
 
 	/* at this stage, we ignore the return value of preparePipe, this is taken
 	 * care of in later steps. -- rgerhards, 2009-03-19
@@ -237,6 +271,7 @@
 CODESTARTmodInit
 	*ipIFVersProvided = CURR_MOD_IF_VERSION; /* we only support the current interface specification */
 CODEmodInit_QueryRegCFSLineHdlr
+	CHKiRet(omsdRegCFSLineHdlr((uchar *)"ompipeblockingmode", 0, eCmdHdlrBinary, NULL, &bBlocking, STD_LOADABLE_MODULE_ID));
 	CHKiRet(objUse(errmsg, CORE_COMPONENT));
 ENDmodInit
 /* vi:set ai:
_______________________________________________
rsyslog mailing list
http://lists.adiscon.net/mailman/listinfo/rsyslog
http://www.rsyslog.com

Reply via email to