and here's the patch


Andrew Dunstan wrote:

This patch implements the protocol Tom suggested for writing to the syslogger pipe. It seems to pass my tests (basically "make installcheck" against a server with stderr redirection turned on and log_statement set to 'all').

The effect of this should be to prevent two problems:
. partial messages being written to the log file, which messes with rotation, and
. messages from various backends being interleaved, causing garbled logs.

Please review ASAP. I want to get this applied soon so that a) it gets wider testing and b) I can use it as the basis for the adapted CSV log patch. If this is acceptable I intend to backpatch this all the way to wherever we started using the syslogger pipe (was that 8.0?).

cheers

andrew


Index: src/backend/postmaster/syslogger.c
===================================================================
RCS file: /cvsroot/pgsql/src/backend/postmaster/syslogger.c,v
retrieving revision 1.31
diff -c -r1.31 syslogger.c
*** src/backend/postmaster/syslogger.c	4 Jun 2007 22:21:42 -0000	1.31
--- src/backend/postmaster/syslogger.c	13 Jun 2007 15:38:07 -0000
***************
*** 42,47 ****
--- 42,48 ----
  #include "utils/guc.h"
  #include "utils/ps_status.h"
  #include "utils/timestamp.h"
+ #include "lib/stringinfo.h"
  
  /*
   * We really want line-buffered mode for logfile output, but Windows does
***************
*** 54,59 ****
--- 55,77 ----
  #define LBF_MODE	_IOLBF
  #endif
  
+ /* try not to break chunked messages into multiple reads */
+ #if PIPE_BUF > 1024
+ #define READ_SIZE PIPE_BUF
+ #else
+ #define READ_SIZE 1024
+ #endif
+ 
+ /*
+  * we use a buffer twice as big as a read so that a fragment of up to
+  * READ_SIZE bytes left over from the previous read can be copied in ahead
+  * of the next READ_SIZE bytes. Parenthesized so it expands safely anywhere.
+  */
+ #define READ_BUF_SIZE (2 * READ_SIZE)
+ 
+ /* bytes of a partial chunk kept between calls to read()/ReadFile() */
+ static char read_fragment[READ_SIZE];
+ static int read_fragment_len = 0;
  
  /*
   * GUC parameters.	Redirect_stderr cannot be changed after postmaster
***************
*** 75,89 ****
   * Private state
   */
  static pg_time_t next_rotation_time;
- 
  static bool redirection_done = false;
- 
  static bool pipe_eof_seen = false;
- 
  static FILE *syslogFile = NULL;
- 
  static char *last_file_name = NULL;
  
  /* These must be exported for EXEC_BACKEND case ... annoying */
  #ifndef WIN32
  int			syslogPipe[2] = {-1, -1};
--- 93,117 ----
   * Private state
   */
  static pg_time_t next_rotation_time;
  static bool redirection_done = false;
  static bool pipe_eof_seen = false;
  static FILE *syslogFile = NULL;
  static char *last_file_name = NULL;
  
+ /*
+  * Holding area for messages whose chunks have only partly arrived, one slot
+  * per sending backend. Few messages should be in flight at any one time, so
+  * 20 slots leaves plenty of leeway. A full array does not lose messages; it
+  * only loses the protocol's protection against partial or interleaved writes.
+  */
+ #define CHUNK_SLOTS 20
+ typedef struct
+ {
+ 	pid_t		pid;			/* sender of the chunks; 0 marks a free slot */
+ 	StringInfoData data;		/* data of the chunks collected so far */
+ } save_buffer;
+ static save_buffer saved_chunks[CHUNK_SLOTS];
+ 
  /* These must be exported for EXEC_BACKEND case ... annoying */
  #ifndef WIN32
  int			syslogPipe[2] = {-1, -1};
***************
*** 117,123 ****
  static void set_next_rotation_time(void);
  static void sigHupHandler(SIGNAL_ARGS);
  static void sigUsr1Handler(SIGNAL_ARGS);
! 
  
  /*
   * Main entry point for syslogger process
--- 145,151 ----
  static void set_next_rotation_time(void);
  static void sigHupHandler(SIGNAL_ARGS);
  static void sigUsr1Handler(SIGNAL_ARGS);
! static void write_chunk(const char * buffer, int count);
  
  /*
   * Main entry point for syslogger process
***************
*** 244,250 ****
  		bool		time_based_rotation = false;
  
  #ifndef WIN32
! 		char		logbuffer[1024];
  		int			bytesRead;
  		int			rc;
  		fd_set		rfds;
--- 272,278 ----
  		bool		time_based_rotation = false;
  
  #ifndef WIN32
! 		char		logbuffer[READ_BUF_SIZE];
  		int			bytesRead;
  		int			rc;
  		fd_set		rfds;
***************
*** 325,332 ****
  		}
  		else if (rc > 0 && FD_ISSET(syslogPipe[0], &rfds))
  		{
  			bytesRead = piperead(syslogPipe[0],
! 								 logbuffer, sizeof(logbuffer));
  
  			if (bytesRead < 0)
  			{
--- 353,363 ----
  		}
  		else if (rc > 0 && FD_ISSET(syslogPipe[0], &rfds))
  		{
+ 			Assert (read_fragment_len <= READ_SIZE);
+ 
+ 			memcpy(logbuffer, read_fragment, read_fragment_len);
  			bytesRead = piperead(syslogPipe[0],
! 								 logbuffer + read_fragment_len, READ_SIZE);
  
  			if (bytesRead < 0)
  			{
***************
*** 337,343 ****
  			}
  			else if (bytesRead > 0)
  			{
! 				write_syslogger_file(logbuffer, bytesRead);
  				continue;
  			}
  			else
--- 368,374 ----
  			}
  			else if (bytesRead > 0)
  			{
! 				write_syslogger_file(logbuffer, bytesRead + read_fragment_len);
  				continue;
  			}
  			else
***************
*** 349,354 ****
--- 380,389 ----
  				 * and all backends are shut down, and we are done.
  				 */
  				pipe_eof_seen = true;
+ 
+ 				/* if there's a fragment left then force it out now */
+ 				if (read_fragment_len)
+ 					write_chunk(read_fragment, read_fragment_len);
  			}
  		}
  #else							/* WIN32 */
***************
*** 622,631 ****
--- 657,821 ----
   * This is exported so that elog.c can call it when am_syslogger is true.
   * This allows the syslogger process to record elog messages of its own,
   * even though its stderr does not point at the syslog pipe.
+  *
+  * This routine processes the log pipe protocol which sends log messages as
+  * chunks - such chunks are detected and reassembled here. 
+  * The protocol has a header that starts with two nul bytes, then has a 16 bit
+  * length, the pid of the sending process, and a flag to indicate if it is 
+  * the last chunk in a message. Incomplete chunks are saved until we read some
+  * more, and non-final chunks are accumulated until we get the final chunk.
+  *
+  * All of this is to avoid 2 problems:
+  * . partial messages being written to logfiles, (messes rotation) and
+  * . messages from different backends being interleaved (messages garbled).
+  *
+  * Any non-protocol messages are written out directly. These should only come
+  * from non-PostgreSQL sources, however (e.g. third party libraries writing to
+  * stderr). This won't matter for CSV output, which will be a separate
+  * reporting channel.
   */
  void
  write_syslogger_file(const char *buffer, int count)
  {
+ 	char *cursor = (char *) buffer;
+ 	int  chunklen;
+ 	PipeProto p;
+ 
+ 	/* the buffer has any fragment we had saved, so reset the length */
+ 	read_fragment_len = 0;
+ 
+ 
+ 	while (count > 0)
+ 	{
+ 		/* not enough data even for a header? save it until we get more */
+ 		if (count < sizeof(PipeProto))
+ 		{
+ 			memcpy(read_fragment, cursor, count);
+ 			read_fragment_len = count;
+ 			return;
+ 		}
+ 		/* process protocol chunks */
+ 		if ( cursor[0] == '\0' && cursor[1] == '\0' )
+ 		{
+ 			memcpy(&p,cursor,sizeof(PipeProto));
+ 			/* save a partial chunk in the fragment buffer */
+ 			if (p.len + PIPE_DATA_OFFSET > count)
+ 			{
+ 				Assert(count <= READ_SIZE);
+ 				memcpy(read_fragment, cursor, count);
+ 				read_fragment_len = count;
+ 				return;
+ 			}
+ 			/* 
+ 			 * save a complete non-final chunk in the per-pid buffer 
+ 			 * if possible - if not just write it out.
+ 			 */
+ 			else if ( p.is_last != 't')
+ 			{
+ 				int free_slot = -1, existing_slot = -1;
+ 				int i;
+ 				StringInfo str;
+ 
+ 				Assert (p.is_last == 'f');
+ 
+ 				for (i = 0; i < CHUNK_SLOTS; i++)
+ 				{
+ 					if (saved_chunks[i].pid == 0 && free_slot < 0)
+ 						free_slot = i;
+ 					if (saved_chunks[i].pid == p.pid)
+ 					{
+ 						existing_slot = i;
+ 						break;
+ 					}
+ 				}
+ 				if (existing_slot > -1)
+ 				{
+ 					str = &(saved_chunks[existing_slot].data);
+ 					appendBinaryStringInfo(str, cursor + PIPE_DATA_OFFSET, 
+ 										   p.len);
+ 				}
+ 				else if (free_slot > -1)
+ 				{
+ 					saved_chunks[free_slot].pid = p.pid;
+ 					str = &(saved_chunks[free_slot].data);
+ 					initStringInfo(str);
+ 					appendBinaryStringInfo(str, cursor + PIPE_DATA_OFFSET, 
+ 										   p.len);
+ 				}
+ 				else
+ 				{
+ 					/* 
+ 					 * if there is no exisiting or free slot we'll just have to
+ 					 * take our chances and write out a part message and hope
+ 					 * that it's not followed by something from another pid.
+ 					 */
+ 					write_chunk(cursor + PIPE_DATA_OFFSET, p.len);
+ 				}
+ 			}
+ 			/* 
+ 			 * add a final chunk to anything saved for that pid, and either way
+ 			 * write the whole thing out.
+ 			 */			   
+ 			else
+ 			{
+ 				int existing_slot = -1;
+ 				int i;
+ 				for (i = 0; i < CHUNK_SLOTS; i++)
+ 				{
+ 					if (saved_chunks[i].pid == p.pid)
+ 					{
+ 						existing_slot = i;
+ 						break;
+ 					}
+ 				}
+ 				if (existing_slot > -1)
+ 				{
+ 					appendBinaryStringInfo(&(saved_chunks[existing_slot].data),
+ 										   cursor + PIPE_DATA_OFFSET, p.len);
+ 					write_chunk(saved_chunks[existing_slot].data.data,
+ 								saved_chunks[existing_slot].data.len);
+ 					saved_chunks[existing_slot].pid = 0;
+ 					pfree(saved_chunks[existing_slot].data.data);
+ 				}
+ 				else
+ 				{
+ 					/* the whole message was one chunk, probably. */
+ 					write_chunk(cursor + PIPE_DATA_OFFSET, p.len);
+ 				}
+ 			}
+ 			
+ 			count -= PIPE_DATA_OFFSET + p.len;
+ 			cursor += PIPE_DATA_OFFSET + p.len;
+ 		}
+ 		/* process non-protocol chunks */
+ 		else 
+ 		{
+ 			/* look for the start of a protocol header */
+ 			for(chunklen = 1; chunklen + 1 < count; chunklen++)
+ 			{
+ 				if (cursor[chunklen] == '\0' && cursor[chunklen + 1] == '\0')
+ 				{
+ 					write_chunk(cursor, chunklen);
+ 					cursor += chunklen;
+ 					count -= chunklen;
+ 					break;
+ 				}
+ 			}
+ 			/* if no protocol header, write out the whole remaining buffer */
+ 			if (chunklen + 1 >= count)
+ 			{
+ 				write_chunk(cursor, count);
+ 				read_fragment_len = 0;
+ 				return;
+ 			}
+ 		}
+ 	}
+ 	
+ }
+ 
+ void
+ write_chunk(const char *buffer, int count)
+ {
  	int			rc;
  
  #ifndef WIN32
***************
*** 654,664 ****
  pipeThread(void *arg)
  {
  	DWORD		bytesRead;
! 	char		logbuffer[1024];
  
  	for (;;)
  	{
! 		if (!ReadFile(syslogPipe[0], logbuffer, sizeof(logbuffer),
  					  &bytesRead, 0))
  		{
  			DWORD		error = GetLastError();
--- 844,856 ----
  pipeThread(void *arg)
  {
  	DWORD		bytesRead;
! 	char		logbuffer[READ_BUF_SIZE];
  
  	for (;;)
  	{
! 		Assert (read_fragment_len <= READ_SIZE);	/* leaves room for a full read */
! 		memcpy(logbuffer, read_fragment, read_fragment_len);
! 		if (!ReadFile(syslogPipe[0], logbuffer + read_fragment_len, READ_SIZE,
  					  &bytesRead, 0))
  		{
  			DWORD		error = GetLastError();
***************
*** 672,682 ****
  					 errmsg("could not read from logger pipe: %m")));
  		}
  		else if (bytesRead > 0)
! 			write_syslogger_file(logbuffer, bytesRead);
  	}
  
  	/* We exit the above loop only upon detecting pipe EOF */
  	pipe_eof_seen = true;
  	_endthread();
  	return 0;
  }
--- 864,879 ----
  					 errmsg("could not read from logger pipe: %m")));
  		}
  		else if (bytesRead > 0)
! 			write_syslogger_file(logbuffer, bytesRead + read_fragment_len);
  	}
  
  	/* We exit the above loop only upon detecting pipe EOF */
  	pipe_eof_seen = true;
+ 
+ 	/* if there's a fragment left then force it out now */
+ 	if (read_fragment_len)
+ 		write_chunk(read_fragment, read_fragment_len);
+ 
  	_endthread();
  	return 0;
  }
Index: src/backend/utils/error/elog.c
===================================================================
RCS file: /cvsroot/pgsql/src/backend/utils/error/elog.c,v
retrieving revision 1.186
diff -c -r1.186 elog.c
*** src/backend/utils/error/elog.c	7 Jun 2007 21:45:59 -0000	1.186
--- src/backend/utils/error/elog.c	13 Jun 2007 15:38:08 -0000
***************
*** 56,61 ****
--- 56,62 ----
  #ifdef HAVE_SYSLOG
  #include <syslog.h>
  #endif
+ #include <limits.h>
  
  #include "access/transam.h"
  #include "access/xact.h"
***************
*** 71,76 ****
--- 72,78 ----
  #include "utils/ps_status.h"
  
  
+ 
  /* Global variables */
  ErrorContextCallback *error_context_stack = NULL;
  
***************
*** 124,129 ****
--- 126,135 ----
  static const char *error_severity(int elevel);
  static void append_with_tabs(StringInfo buf, const char *str);
  static bool is_log_level_output(int elevel, int log_min_level);
+ static void write_pipe_chunks(int fd, char * data, int len);
+ 
+ /* allow space for preamble plus a little head room */
+ #define MAX_CHUNK (sizeof(PipeChunk) - sizeof(PipeProto))
  
  
  /*
***************
*** 1783,1789 ****
  			write_eventlog(edata->elevel, buf.data);
  		else
  #endif
! 			fprintf(stderr, "%s", buf.data);
  	}
  
  	/* If in the syslogger process, try to write messages direct to file */
--- 1789,1798 ----
  			write_eventlog(edata->elevel, buf.data);
  		else
  #endif
! 		if (Redirect_stderr)
! 			write_pipe_chunks(fileno(stderr),buf.data, buf.len);
! 		else
! 			write(fileno(stderr), buf.data, buf.len);
  	}
  
  	/* If in the syslogger process, try to write messages direct to file */
***************
*** 1794,1799 ****
--- 1803,1836 ----
  }
  
  
+ static void
+ write_pipe_chunks(int fd, char * data, int len)
+ {
+ 	PipeChunk chunk;
+ 
+ 	Assert(len > 0);
+ 
+ 	/* header fields shared by every chunk of this message */
+ 	chunk.proto.nuls[0] = '\0';
+ 	chunk.proto.nuls[1] = '\0';
+ 	chunk.proto.pid = MyProcPid;
+ 
+ 	/* full-sized, non-final chunks for as long as more than one is needed */
+ 	chunk.proto.is_last = 'f';
+ 	chunk.proto.len = MAX_CHUNK;
+ 	for (; len > MAX_CHUNK; data += MAX_CHUNK, len -= MAX_CHUNK)
+ 	{
+ 		memcpy(chunk.proto.data, data, MAX_CHUNK);
+ 		write(fd, &chunk, PIPE_DATA_OFFSET + MAX_CHUNK );
+ 	}
+ 
+ 	/* whatever is left goes out as the final chunk */
+ 	chunk.proto.is_last = 't';
+ 	chunk.proto.len = len;
+ 	memcpy(chunk.proto.data, data, len);
+ 	write(fd, &chunk, PIPE_DATA_OFFSET + len);
+ }
+ 
  /*
   * Write error report to client
   */
Index: src/include/postmaster/syslogger.h
===================================================================
RCS file: /cvsroot/pgsql/src/include/postmaster/syslogger.h,v
retrieving revision 1.8
diff -c -r1.8 syslogger.h
*** src/include/postmaster/syslogger.h	5 Jan 2007 22:19:57 -0000	1.8
--- src/include/postmaster/syslogger.h	13 Jun 2007 15:38:09 -0000
***************
*** 37,40 ****
--- 37,64 ----
  extern void SysLoggerMain(int argc, char *argv[]);
  #endif
  
+ /* 
+  * primitive protocol structure for writing to syslogger pipe(s): a PipeProto
+  * header starts every chunk and the chunk's data follows it directly.
+  * we use 't' or 'f' instead of a bool to make the protocol a tiny bit 
+  * more robust against finding a false double nul byte prologue.
+  * But we still might find it in the len and/or pid bytes unless we're careful.
+  */
+ typedef struct 
+ {
+ 	char      nuls[2];    /* always \0\0 */
+ 	uint16    len;        /* bytes of data in this chunk, header not counted */
+ 	pid_t     pid;        /* pid of the process that wrote the chunk */
+ 	char      is_last;    /* is this the last chunk? 't' or 'f' */
+ 	char      data[1];    /* first byte of the data; the rest follows it */
+ } PipeProto;
+ /* one chunk as written to the pipe: the header laid over a PIPE_BUF buffer */
+ typedef union 
+ {
+ 	PipeProto    proto;
+ 	char         data[PIPE_BUF];
+ }  PipeChunk;
+ /* number of header bytes in front of the data of a chunk */
+ #define PIPE_DATA_OFFSET offsetof(PipeProto, data) /* 9 usually */
+ 
  #endif   /* _SYSLOGGER_H */
---------------------------(end of broadcast)---------------------------
TIP 7: You can help support the PostgreSQL project by donating at

                http://www.postgresql.org/about/donate

Reply via email to