*** xlog.org.c	Sat Jan  1 06:59:30 2005
--- xlog.c	Tue Feb  1 18:28:16 2005
***************
*** 43,48 ****
--- 43,82 ----
  #include "utils/guc.h"
  #include "utils/relcache.h"
  
+ /*-------------------------------------------------------------------------*/
+ 
+ #ifdef O_DIRECT
+ #	define OPEN_DIRECT_FLAG		O_DIRECT
+ #endif
+ 
+ #define XLOG_MULTIPAGE_WRITER_DEBUG
+ 
+ #define ISSUE_BOOTSTRAP_MEMORYLEAK		/* XXX: memory leak? */
+ 
+ /*-------------------------------------------------------------------------*/
+ 
+ /* O_DIRECT : BEGIN */
+ 
+ /* TODO: Aligment depends on OS and filesystem. */
+ #define O_DIRECT_BUFFER_ALIGN	4096
+ 
+ /* assume sizeof(ptrdiff_t) == sizeof(void*) */
+ #define POINTERALIGN(ALIGNVAL,PTR)  \
+ 	(((ptrdiff_t) (PTR) + (ALIGNVAL-1)) & ~((ptrdiff_t) (ALIGNVAL-1)))
+ 
+ #ifdef XLOG_MULTIPAGE_WRITER_DEBUG
+ 	static char STATIC_ASSERT_POINTERSIZE[((int)(sizeof(ptrdiff_t) == sizeof(void*))) - 1];
+ #endif
+ 
+ #ifdef OPEN_DIRECT_FLAG
+ #	define XLOG_EXTRA_BUFFERS		O_DIRECT_BUFFER_ALIGN
+ #	define XLOG_BUFFERS_ALIGN(PTR)	POINTERALIGN(XLOG_EXTRA_BUFFERS, (PTR))
+ #else
+ #	define XLOG_EXTRA_BUFFERS		0
+ #	define XLOG_BUFFERS_ALIGN(PTR)	POINTERALIGN(MAXIMUM_ALIGNOF, (PTR))
+ #endif
+ 
+ /* O_DIRECT : END */
  
  /*
   * This chunk of hackery attempts to determine which file sync methods
***************
*** 465,470 ****
--- 499,583 ----
  static bool read_backup_label(XLogRecPtr *checkPointLoc);
  static void remove_backup_label(void);
  
+ /* BEGIN : XLOG_MULTIPAGE_WRITER */
+ 
+ static struct XLogMultipageData
+ {
+ 	char	*pages;		/* Head of first page */
+ 	int		 size;		/* Total bytes of pages == count(pages) * BLCKSZ */
+ 	int		 offset;	/* Offset in xlog segment file  */
+ } XLogMultipage;
+ 
+ static void XLogMultipageFlush(int index)
+ {
+ 	if (!XLogMultipage.pages)
+ 	{	/* No needs to write pages. */
+ 		XLogCtl->Write.curridx = index;
+ 		return;
+ 	}
+ 	
+ #ifdef XLOG_MULTIPAGE_WRITER_DEBUG
+ 	{
+ 		int i = (XLogCtl->Write.curridx + XLogMultipage.size / BLCKSZ + XLOGbuffers - index) % XLOGbuffers;
+ 		if (i != 0 && i != 1)
+ 			elog(PANIC, "XLogMultipageFlush (%d)", __LINE__);
+ 	}
+ #endif
+ 
+ 	/* Need to seek in the file? */
+ 	if (openLogOff != XLogMultipage.offset)
+ 	{
+ 		openLogOff = XLogMultipage.offset;
+ 		if (lseek(openLogFile, (off_t) openLogOff, SEEK_SET) < 0)
+ 			ereport(PANIC,
+ 					(errcode_for_file_access(),
+ 					 errmsg("could not seek in log file %u, segment %u to offset %u: %m",
+ 							openLogId, openLogSeg, openLogOff)));
+ 	}
+ 
+ 	/* OK to write the page */
+ 	errno = 0;
+ 	if (write(openLogFile, XLogMultipage.pages, XLogMultipage.size) != XLogMultipage.size)
+ 	{
+ 		/* if write didn't set errno, assume problem is no disk space */
+ 		if (errno == 0)
+ 			errno = ENOSPC;
+ 		ereport(PANIC,
+ 				(errcode_for_file_access(),
+ 				 errmsg("could not write to log file %u, segment %u at offset %u: %m",
+ 						openLogId, openLogSeg, openLogOff)));
+ 	}
+ 
+ #ifdef XLOG_MULTIPAGE_WRITER_DEBUG
+ 	elog(LOG, "XLogMultipageFlush writes %d pages.", XLogMultipage.size / BLCKSZ);
+ #endif
+ 
+ 	openLogOff += XLogMultipage.size;
+ 	XLogCtl->Write.curridx = index;
+ 	memset(&XLogMultipage, 0, sizeof(XLogMultipage));
+ }
+ 
+ static void XLogMultipageWrite(int index)
+ {
+ 	char *page = XLogCtl->pages + index * BLCKSZ;
+ 	int size = BLCKSZ;
+ 	int offset = (LogwrtResult.Write.xrecoff - BLCKSZ) % XLogSegSize;
+ 
+ 	if (XLogMultipage.pages + XLogMultipage.size == page
+ 		&& XLogMultipage.offset + XLogMultipage.size == offset)
+ 	{	/* Pages are continuous. Append new page. */
+ 		XLogMultipage.size += size;
+ 	}
+ 	else
+ 	{	/* Pages are not continuous. Flush and clear. */
+ 		XLogMultipageFlush(index);
+ 		XLogMultipage.pages = page;
+ 		XLogMultipage.size = size;
+ 		XLogMultipage.offset = offset;
+ 	}
+ }
+ 
+ /* END : XLOG_MULTIPAGE_WRITER */
  
  /*
   * Insert an XLOG record having the specified RMID and info bytes,
***************
*** 1139,1147 ****
  XLogWrite(XLogwrtRqst WriteRqst)
  {
  	XLogCtlWrite *Write = &XLogCtl->Write;
- 	char	   *from;
  	bool		ispartialpage;
  	bool		use_existent;
  
  	/*
  	 * Update local LogwrtResult (caller probably did this already,
--- 1252,1265 ----
  XLogWrite(XLogwrtRqst WriteRqst)
  {
  	XLogCtlWrite *Write = &XLogCtl->Write;
  	bool		ispartialpage;
  	bool		use_existent;
+ 	int			currentIndex = Write->curridx;
+ 
+ #ifdef XLOG_MULTIPAGE_WRITER_DEBUG
+ 	if (XLogMultipage.pages)
+ 		elog(PANIC, "XLogMultipage.pages not null (%d) : size=%d", __LINE__, XLogMultipage.size);
+ #endif
  
  	/*
  	 * Update local LogwrtResult (caller probably did this already,
***************
*** 1157,1170 ****
  		 * end of the last page that's been initialized by
  		 * AdvanceXLInsertBuffer.
  		 */
! 		if (!XLByteLT(LogwrtResult.Write, XLogCtl->xlblocks[Write->curridx]))
  			elog(PANIC, "xlog write request %X/%X is past end of log %X/%X",
  				 LogwrtResult.Write.xlogid, LogwrtResult.Write.xrecoff,
! 				 XLogCtl->xlblocks[Write->curridx].xlogid,
! 				 XLogCtl->xlblocks[Write->curridx].xrecoff);
  
  		/* Advance LogwrtResult.Write to end of current buffer page */
! 		LogwrtResult.Write = XLogCtl->xlblocks[Write->curridx];
  		ispartialpage = XLByteLT(WriteRqst.Write, LogwrtResult.Write);
  
  		if (!XLByteInPrevSeg(LogwrtResult.Write, openLogId, openLogSeg))
--- 1275,1288 ----
  		 * end of the last page that's been initialized by
  		 * AdvanceXLInsertBuffer.
  		 */
! 		if (!XLByteLT(LogwrtResult.Write, XLogCtl->xlblocks[currentIndex]))
  			elog(PANIC, "xlog write request %X/%X is past end of log %X/%X",
  				 LogwrtResult.Write.xlogid, LogwrtResult.Write.xrecoff,
! 				 XLogCtl->xlblocks[currentIndex].xlogid,
! 				 XLogCtl->xlblocks[currentIndex].xrecoff);
  
  		/* Advance LogwrtResult.Write to end of current buffer page */
! 		LogwrtResult.Write = XLogCtl->xlblocks[currentIndex];
  		ispartialpage = XLByteLT(WriteRqst.Write, LogwrtResult.Write);
  
  		if (!XLByteInPrevSeg(LogwrtResult.Write, openLogId, openLogSeg))
***************
*** 1172,1177 ****
--- 1290,1296 ----
  			/*
  			 * Switch to new logfile segment.
  			 */
+ 			XLogMultipageFlush(currentIndex);
  			if (openLogFile >= 0)
  			{
  				if (close(openLogFile))
***************
*** 1242,1275 ****
  		{
  			XLByteToPrevSeg(LogwrtResult.Write, openLogId, openLogSeg);
  			openLogFile = XLogFileOpen(openLogId, openLogSeg);
- 			openLogOff = 0;
  		}
  
! 		/* Need to seek in the file? */
! 		if (openLogOff != (LogwrtResult.Write.xrecoff - BLCKSZ) % XLogSegSize)
! 		{
! 			openLogOff = (LogwrtResult.Write.xrecoff - BLCKSZ) % XLogSegSize;
! 			if (lseek(openLogFile, (off_t) openLogOff, SEEK_SET) < 0)
! 				ereport(PANIC,
! 						(errcode_for_file_access(),
! 						 errmsg("could not seek in log file %u, segment %u to offset %u: %m",
! 								openLogId, openLogSeg, openLogOff)));
! 		}
! 
! 		/* OK to write the page */
! 		from = XLogCtl->pages + Write->curridx * BLCKSZ;
! 		errno = 0;
! 		if (write(openLogFile, from, BLCKSZ) != BLCKSZ)
! 		{
! 			/* if write didn't set errno, assume problem is no disk space */
! 			if (errno == 0)
! 				errno = ENOSPC;
! 			ereport(PANIC,
! 					(errcode_for_file_access(),
! 					 errmsg("could not write to log file %u, segment %u at offset %u: %m",
! 							openLogId, openLogSeg, openLogOff)));
! 		}
! 		openLogOff += BLCKSZ;
  
  		/*
  		 * If we just wrote the whole last page of a logfile segment,
--- 1361,1370 ----
  		{
  			XLByteToPrevSeg(LogwrtResult.Write, openLogId, openLogSeg);
  			openLogFile = XLogFileOpen(openLogId, openLogSeg);
  		}
  
! 		/* Add a page to buffer */
! 		XLogMultipageWrite(currentIndex);
  
  		/*
  		 * If we just wrote the whole last page of a logfile segment,
***************
*** 1281,1288 ****
  		 * This is also the right place to notify the Archiver that the
  		 * segment is ready to copy to archival storage.
  		 */
! 		if (openLogOff >= XLogSegSize && !ispartialpage)
  		{
  			issue_xlog_fsync();
  			LogwrtResult.Flush = LogwrtResult.Write;	/* end of current page */
  
--- 1376,1384 ----
  		 * This is also the right place to notify the Archiver that the
  		 * segment is ready to copy to archival storage.
  		 */
! 		if (openLogOff + XLogMultipage.size >= XLogSegSize && !ispartialpage)
  		{
+ 			XLogMultipageFlush(currentIndex);
  			issue_xlog_fsync();
  			LogwrtResult.Flush = LogwrtResult.Write;	/* end of current page */
  
***************
*** 1296,1303 ****
  			LogwrtResult.Write = WriteRqst.Write;
  			break;
  		}
! 		Write->curridx = NextBufIdx(Write->curridx);
  	}
  
  	/*
  	 * If asked to flush, do so
--- 1392,1400 ----
  			LogwrtResult.Write = WriteRqst.Write;
  			break;
  		}
! 		currentIndex = NextBufIdx(currentIndex);
  	}
+ 	XLogMultipageFlush(currentIndex);
  
  	/*
  	 * If asked to flush, do so
***************
*** 1333,1338 ****
--- 1430,1440 ----
  		LogwrtResult.Flush = LogwrtResult.Write;
  	}
  
+ #ifdef XLOG_MULTIPAGE_WRITER_DEBUG
+ 	if (Write->curridx != currentIndex)
+ 		elog(PANIC, "Write->curridx != currentIndex (%d) : %d != %d", __LINE__, Write->curridx, currentIndex);
+ #endif
+ 
  	/*
  	 * Update shared-memory status
  	 *
***************
*** 1354,1359 ****
--- 1456,1466 ----
  	}
  
  	Write->LogwrtResult = LogwrtResult;
+ 
+ #ifdef XLOG_MULTIPAGE_WRITER_DEBUG
+ 	if (XLogMultipage.pages)
+ 		elog(PANIC, "XLogMultipage.pages not null (%d) : size=%d", __LINE__, XLogMultipage.size);
+ #endif
  }
  
  /*
***************
*** 1476,1481 ****
--- 1583,1591 ----
  			 "xlog flush request %X/%X is not satisfied --- flushed only to %X/%X",
  			 record.xlogid, record.xrecoff,
  			 LogwrtResult.Flush.xlogid, LogwrtResult.Flush.xrecoff);
+ 
+ 	if (XLogMultipage.pages)
+ 		elog(PANIC, "xlog multipage-write usage error at XLogFlush");
  }
  
  /*
***************
*** 3380,3386 ****
  		XLOGbuffers = MinXLOGbuffers;
  
  	return MAXALIGN(sizeof(XLogCtlData) + sizeof(XLogRecPtr) * XLOGbuffers)
! 		+ BLCKSZ * XLOGbuffers +
  		MAXALIGN(sizeof(ControlFileData));
  }
  
--- 3490,3496 ----
  		XLOGbuffers = MinXLOGbuffers;
  
  	return MAXALIGN(sizeof(XLogCtlData) + sizeof(XLogRecPtr) * XLOGbuffers)
! 		+ XLOG_EXTRA_BUFFERS + BLCKSZ * XLOGbuffers +
  		MAXALIGN(sizeof(ControlFileData));
  }
  
***************
*** 3398,3404 ****
  		ShmemInitStruct("XLOG Ctl",
  						MAXALIGN(sizeof(XLogCtlData) +
  								 sizeof(XLogRecPtr) * XLOGbuffers)
! 						+ BLCKSZ * XLOGbuffers,
  						&foundXLog);
  	ControlFile = (ControlFileData *)
  		ShmemInitStruct("Control File", sizeof(ControlFileData), &foundCFile);
--- 3508,3514 ----
  		ShmemInitStruct("XLOG Ctl",
  						MAXALIGN(sizeof(XLogCtlData) +
  								 sizeof(XLogRecPtr) * XLOGbuffers)
! 						+ XLOG_EXTRA_BUFFERS + BLCKSZ * XLOGbuffers,
  						&foundXLog);
  	ControlFile = (ControlFileData *)
  		ShmemInitStruct("Control File", sizeof(ControlFileData), &foundCFile);
***************
*** 3426,3433 ****
  	 * buffers have worst-case alignment.
  	 */
  	XLogCtl->pages =
! 		((char *) XLogCtl) + MAXALIGN(sizeof(XLogCtlData) +
! 									  sizeof(XLogRecPtr) * XLOGbuffers);
  	memset(XLogCtl->pages, 0, BLCKSZ * XLOGbuffers);
  
  	/*
--- 3536,3543 ----
  	 * buffers have worst-case alignment.
  	 */
  	XLogCtl->pages =
! 		(char*)XLOG_BUFFERS_ALIGN(((char *) XLogCtl)
! 		+ sizeof(XLogCtlData) + sizeof(XLogRecPtr) * XLOGbuffers);
  	memset(XLogCtl->pages, 0, BLCKSZ * XLOGbuffers);
  
  	/*
***************
*** 3465,3470 ****
--- 3575,3584 ----
  	struct timeval tv;
  	crc64		crc;
  
+ #ifdef ISSUE_BOOTSTRAP_MEMORYLEAK
+ 	void* buffer0;
+ #endif
+ 
  	/*
  	 * Select a hopefully-unique system identifier code for this
  	 * installation. We use the result of gettimeofday(), including the
***************
*** 3485,3492 ****
--- 3599,3616 ----
  	/* First timeline ID is always 1 */
  	ThisTimeLineID = 1;
  
+ 	/* XXX: Does buffer leak? */
+ #ifdef ISSUE_BOOTSTRAP_MEMORYLEAK
+ 	buffer0 = malloc(BLCKSZ + XLOG_EXTRA_BUFFERS);
+ 	buffer = (char *) XLOG_BUFFERS_ALIGN(buffer0);
+ #elif 1
+ 	buffer = (char *) XLOG_BUFFERS_ALIGN(
+ 		malloc(BLCKSZ + XLOG_EXTRA_BUFFERS) );
+ #else
  	/* Use malloc() to ensure buffer is MAXALIGNED */
  	buffer = (char *) malloc(BLCKSZ);
+ #endif
+ 
  	page = (XLogPageHeader) buffer;
  	memset(buffer, 0, BLCKSZ);
  
***************
*** 3576,3581 ****
--- 3700,3709 ----
  	/* Bootstrap the commit log, too */
  	BootStrapCLOG();
  	BootStrapSUBTRANS();
+ 
+ #ifdef ISSUE_BOOTSTRAP_MEMORYLEAK
+ 	free(buffer0);
+ #endif
  }
  
  static char *
***************
*** 5180,5185 ****
--- 5308,5320 ----
  		new_sync_bit = OPEN_DATASYNC_FLAG;
  	}
  #endif
+ #ifdef OPEN_DIRECT_FLAG
+ 	else if (pg_strcasecmp(method, "open_direct") == 0)
+ 	{
+ 		new_sync_method = SYNC_METHOD_OPEN;
+ 		new_sync_bit = OPEN_DIRECT_FLAG;
+ 	}
+ #endif
  	else
  		return NULL;
  
