>From 76cda0eb7b660654b0aa379883ebe4952658cdf7 Mon Sep 17 00:00:00 2001
From: Takashi Menjo <takashi.menjou.vg@hco.ntt.co.jp>
Date: Mon, 10 Feb 2020 17:53:12 +0900
Subject: [msync 2/5] Use WAL segments as WAL buffers

Note that we ignore wal_sync_method from here.
---
 src/backend/access/transam/xlog.c | 833 +++++++++---------------------
 1 file changed, 243 insertions(+), 590 deletions(-)

diff --git a/src/backend/access/transam/xlog.c b/src/backend/access/transam/xlog.c
index e2cd34057f..43f9a8affc 100644
--- a/src/backend/access/transam/xlog.c
+++ b/src/backend/access/transam/xlog.c
@@ -15,6 +15,7 @@
 #include "postgres.h"
 
 #include <ctype.h>
+#include <libpmem.h>
 #include <math.h>
 #include <time.h>
 #include <fcntl.h>
@@ -613,24 +614,8 @@ typedef struct XLogCtlData
 	XLogwrtResult LogwrtResult;
 
 	/*
-	 * Latest initialized page in the cache (last byte position + 1).
-	 *
-	 * To change the identity of a buffer (and InitializedUpTo), you need to
-	 * hold WALBufMappingLock.  To change the identity of a buffer that's
-	 * still dirty, the old page needs to be written out first, and for that
-	 * you need WALWriteLock, and you need to ensure that there are no
-	 * in-progress insertions to the page by calling
-	 * WaitXLogInsertionsToFinish().
+	 * This value does not change after startup.
 	 */
-	XLogRecPtr	InitializedUpTo;
-
-	/*
-	 * These values do not change after startup, although the pointed-to pages
-	 * and xlblocks values certainly do.  xlblock values are protected by
-	 * WALBufMappingLock.
-	 */
-	char	   *pages;			/* buffers for unwritten XLOG pages */
-	XLogRecPtr *xlblocks;		/* 1st byte ptr-s + XLOG_BLCKSZ */
 	int			XLogCacheBlck;	/* highest allocated xlog buffer index */
 
 	/*
@@ -775,9 +760,16 @@ static const char *xlogSourceNames[] = {"any", "archive", "pg_wal", "stream"};
  * openLogFile is -1 or a kernel FD for an open log file segment.
  * openLogSegNo identifies the segment.  These variables are only used to
  * write the XLOG, and so will normally refer to the active segment.
+ *
+ * mappedPages is mmap(2)-ed address for an open log file segment.
+ * It is used as WAL buffer instead of XLogCtl->pages.
+ *
+ * pmemMapped is true if mappedPages is on PMEM.
  */
 static int	openLogFile = -1;
 static XLogSegNo openLogSegNo = 0;
+static char *mappedPages = NULL;
+static bool pmemMapped = 0;
 
 /*
  * These variables are used similarly to the ones above, but for reading
@@ -875,12 +867,12 @@ static void CheckPointGuts(XLogRecPtr checkPointRedo, int flags);
 static void KeepLogSeg(XLogRecPtr recptr, XLogSegNo *logSegNo);
 static XLogRecPtr XLogGetReplicationSlotMinimumLSN(void);
 
-static void AdvanceXLInsertBuffer(XLogRecPtr upto, bool opportunistic);
 static bool XLogCheckpointNeeded(XLogSegNo new_segno);
 static void XLogWrite(XLogwrtRqst WriteRqst, bool flexible);
 static bool InstallXLogFileSegment(XLogSegNo *segno, char *tmppath,
 								   bool find_free, XLogSegNo max_segno,
 								   bool use_lock);
+static char *XLogFileMap(XLogSegNo segno, bool *is_pmem);
 static int	XLogFileRead(XLogSegNo segno, int emode, TimeLineID tli,
 						 int source, bool notfoundOk);
 static int	XLogFileReadAnyTLI(XLogSegNo segno, int emode, int source);
@@ -891,6 +883,7 @@ static bool WaitForWALToBecomeAvailable(XLogRecPtr RecPtr, bool randAccess,
 										bool fetching_ckpt, XLogRecPtr tliRecPtr);
 static int	emode_for_corrupt_record(int emode, XLogRecPtr RecPtr);
 static void XLogFileClose(void);
+static void XLogFileUnmap(char *pages, XLogSegNo segno);
 static void PreallocXlogFiles(XLogRecPtr RedoRecPtr, XLogRecPtr endptr);
 static void RemoveTempXlogFiles(void);
 static void RemoveOldXlogFiles(XLogSegNo segno, XLogRecPtr RedoRecPtr, XLogRecPtr endptr);
@@ -940,7 +933,6 @@ static void checkXLogConsistency(XLogReaderState *record);
 static void WALInsertLockAcquire(void);
 static void WALInsertLockAcquireExclusive(void);
 static void WALInsertLockRelease(void);
-static void WALInsertLockUpdateInsertingAt(XLogRecPtr insertingAt);
 
 /*
  * Insert an XLOG record represented by an already-constructed chain of data
@@ -1574,27 +1566,7 @@ CopyXLogRecordToWAL(int write_len, bool isLogSwitch, XLogRecData *rdata,
 		 */
 		while (CurrPos < EndPos)
 		{
-			/*
-			 * The minimal action to flush the page would be to call
-			 * WALInsertLockUpdateInsertingAt(CurrPos) followed by
-			 * AdvanceXLInsertBuffer(...).  The page would be left initialized
-			 * mostly to zeros, except for the page header (always the short
-			 * variant, as this is never a segment's first page).
-			 *
-			 * The large vistas of zeros are good for compressibility, but the
-			 * headers interrupting them every XLOG_BLCKSZ (with values that
-			 * differ from page to page) are not.  The effect varies with
-			 * compression tool, but bzip2 for instance compresses about an
-			 * order of magnitude worse if those headers are left in place.
-			 *
-			 * Rather than complicating AdvanceXLInsertBuffer itself (which is
-			 * called in heavily-loaded circumstances as well as this lightly-
-			 * loaded one) with variant behavior, we just use GetXLogBuffer
-			 * (which itself calls the two methods we need) to get the pointer
-			 * and zero most of the page.  Then we just zero the page header.
-			 */
-			currpos = GetXLogBuffer(CurrPos);
-			MemSet(currpos, 0, SizeOfXLogShortPHD);
+			/* XXX We assume that XLogFileInit does what we did here */
 
 			CurrPos += XLOG_BLCKSZ;
 		}
@@ -1708,29 +1680,6 @@ WALInsertLockRelease(void)
 	}
 }
 
-/*
- * Update our insertingAt value, to let others know that we've finished
- * inserting up to that point.
- */
-static void
-WALInsertLockUpdateInsertingAt(XLogRecPtr insertingAt)
-{
-	if (holdingAllLocks)
-	{
-		/*
-		 * We use the last lock to mark our actual position, see comments in
-		 * WALInsertLockAcquireExclusive.
-		 */
-		LWLockUpdateVar(&WALInsertLocks[NUM_XLOGINSERT_LOCKS - 1].l.lock,
-						&WALInsertLocks[NUM_XLOGINSERT_LOCKS - 1].l.insertingAt,
-						insertingAt);
-	}
-	else
-		LWLockUpdateVar(&WALInsertLocks[MyLockNo].l.lock,
-						&WALInsertLocks[MyLockNo].l.insertingAt,
-						insertingAt);
-}
-
 /*
  * Wait for any WAL insertions < upto to finish.
  *
@@ -1831,123 +1780,37 @@ WaitXLogInsertionsToFinish(XLogRecPtr upto)
 /*
  * Get a pointer to the right location in the WAL buffer containing the
  * given XLogRecPtr.
- *
- * If the page is not initialized yet, it is initialized. That might require
- * evicting an old dirty buffer from the buffer cache, which means I/O.
- *
- * The caller must ensure that the page containing the requested location
- * isn't evicted yet, and won't be evicted. The way to ensure that is to
- * hold onto a WAL insertion lock with the insertingAt position set to
- * something <= ptr. GetXLogBuffer() will update insertingAt if it needs
- * to evict an old page from the buffer. (This means that once you call
- * GetXLogBuffer() with a given 'ptr', you must not access anything before
- * that point anymore, and must not call GetXLogBuffer() with an older 'ptr'
- * later, because older buffers might be recycled already)
  */
 static char *
 GetXLogBuffer(XLogRecPtr ptr)
 {
-	int			idx;
-	XLogRecPtr	endptr;
-	static uint64 cachedPage = 0;
-	static char *cachedPos = NULL;
-	XLogRecPtr	expectedEndPtr;
+	int				idx;
+	XLogPageHeader	page;
+	XLogSegNo		segno;
 
-	/*
-	 * Fast path for the common case that we need to access again the same
-	 * page as last time.
-	 */
-	if (ptr / XLOG_BLCKSZ == cachedPage)
+	/* shut-up compiler if not --enable-cassert */
+	(void) page;
+
+	XLByteToSeg(ptr, segno, wal_segment_size);
+	if (segno != openLogSegNo)
 	{
-		Assert(((XLogPageHeader) cachedPos)->xlp_magic == XLOG_PAGE_MAGIC);
-		Assert(((XLogPageHeader) cachedPos)->xlp_pageaddr == ptr - (ptr % XLOG_BLCKSZ));
-		return cachedPos + ptr % XLOG_BLCKSZ;
+		/* Unmap the current segment if mapped */
+		if (mappedPages != NULL)
+			XLogFileUnmap(mappedPages, openLogSegNo);
+
+		/* Map the segment we need */
+		mappedPages = XLogFileMap(segno, &pmemMapped);
+		Assert(mappedPages != NULL);
+		openLogSegNo = segno;
 	}
 
-	/*
-	 * The XLog buffer cache is organized so that a page is always loaded to a
-	 * particular buffer.  That way we can easily calculate the buffer a given
-	 * page must be loaded into, from the XLogRecPtr alone.
-	 */
 	idx = XLogRecPtrToBufIdx(ptr);
+	page = (XLogPageHeader) (mappedPages + idx * (Size) XLOG_BLCKSZ);
 
-	/*
-	 * See what page is loaded in the buffer at the moment. It could be the
-	 * page we're looking for, or something older. It can't be anything newer
-	 * - that would imply the page we're looking for has already been written
-	 * out to disk and evicted, and the caller is responsible for making sure
-	 * that doesn't happen.
-	 *
-	 * However, we don't hold a lock while we read the value. If someone has
-	 * just initialized the page, it's possible that we get a "torn read" of
-	 * the XLogRecPtr if 64-bit fetches are not atomic on this platform. In
-	 * that case we will see a bogus value. That's ok, we'll grab the mapping
-	 * lock (in AdvanceXLInsertBuffer) and retry if we see anything else than
-	 * the page we're looking for. But it means that when we do this unlocked
-	 * read, we might see a value that appears to be ahead of the page we're
-	 * looking for. Don't PANIC on that, until we've verified the value while
-	 * holding the lock.
-	 */
-	expectedEndPtr = ptr;
-	expectedEndPtr += XLOG_BLCKSZ - ptr % XLOG_BLCKSZ;
+	Assert(page->xlp_magic == XLOG_PAGE_MAGIC);
+	Assert(page->xlp_pageaddr == ptr - (ptr % XLOG_BLCKSZ));
 
-	endptr = XLogCtl->xlblocks[idx];
-	if (expectedEndPtr != endptr)
-	{
-		XLogRecPtr	initializedUpto;
-
-		/*
-		 * Before calling AdvanceXLInsertBuffer(), which can block, let others
-		 * know how far we're finished with inserting the record.
-		 *
-		 * NB: If 'ptr' points to just after the page header, advertise a
-		 * position at the beginning of the page rather than 'ptr' itself. If
-		 * there are no other insertions running, someone might try to flush
-		 * up to our advertised location. If we advertised a position after
-		 * the page header, someone might try to flush the page header, even
-		 * though page might actually not be initialized yet. As the first
-		 * inserter on the page, we are effectively responsible for making
-		 * sure that it's initialized, before we let insertingAt to move past
-		 * the page header.
-		 */
-		if (ptr % XLOG_BLCKSZ == SizeOfXLogShortPHD &&
-			XLogSegmentOffset(ptr, wal_segment_size) > XLOG_BLCKSZ)
-			initializedUpto = ptr - SizeOfXLogShortPHD;
-		else if (ptr % XLOG_BLCKSZ == SizeOfXLogLongPHD &&
-				 XLogSegmentOffset(ptr, wal_segment_size) < XLOG_BLCKSZ)
-			initializedUpto = ptr - SizeOfXLogLongPHD;
-		else
-			initializedUpto = ptr;
-
-		WALInsertLockUpdateInsertingAt(initializedUpto);
-
-		AdvanceXLInsertBuffer(ptr, false);
-		endptr = XLogCtl->xlblocks[idx];
-
-		if (expectedEndPtr != endptr)
-			elog(PANIC, "could not find WAL buffer for %X/%X",
-				 (uint32) (ptr >> 32), (uint32) ptr);
-	}
-	else
-	{
-		/*
-		 * Make sure the initialization of the page is visible to us, and
-		 * won't arrive later to overwrite the WAL data we write on the page.
-		 */
-		pg_memory_barrier();
-	}
-
-	/*
-	 * Found the buffer holding this page. Return a pointer to the right
-	 * offset within the page.
-	 */
-	cachedPage = ptr / XLOG_BLCKSZ;
-	cachedPos = XLogCtl->pages + idx * (Size) XLOG_BLCKSZ;
-
-	Assert(((XLogPageHeader) cachedPos)->xlp_magic == XLOG_PAGE_MAGIC);
-	Assert(((XLogPageHeader) cachedPos)->xlp_pageaddr == ptr - (ptr % XLOG_BLCKSZ));
-
-	return cachedPos + ptr % XLOG_BLCKSZ;
+	return mappedPages + ptr % wal_segment_size;
 }
 
 /*
@@ -2075,178 +1938,6 @@ XLogRecPtrToBytePos(XLogRecPtr ptr)
 	return result;
 }
 
-/*
- * Initialize XLOG buffers, writing out old buffers if they still contain
- * unwritten data, upto the page containing 'upto'. Or if 'opportunistic' is
- * true, initialize as many pages as we can without having to write out
- * unwritten data. Any new pages are initialized to zeros, with pages headers
- * initialized properly.
- */
-static void
-AdvanceXLInsertBuffer(XLogRecPtr upto, bool opportunistic)
-{
-	XLogCtlInsert *Insert = &XLogCtl->Insert;
-	int			nextidx;
-	XLogRecPtr	OldPageRqstPtr;
-	XLogwrtRqst WriteRqst;
-	XLogRecPtr	NewPageEndPtr = InvalidXLogRecPtr;
-	XLogRecPtr	NewPageBeginPtr;
-	XLogPageHeader NewPage;
-	int			npages = 0;
-
-	LWLockAcquire(WALBufMappingLock, LW_EXCLUSIVE);
-
-	/*
-	 * Now that we have the lock, check if someone initialized the page
-	 * already.
-	 */
-	while (upto >= XLogCtl->InitializedUpTo || opportunistic)
-	{
-		nextidx = XLogRecPtrToBufIdx(XLogCtl->InitializedUpTo);
-
-		/*
-		 * Get ending-offset of the buffer page we need to replace (this may
-		 * be zero if the buffer hasn't been used yet).  Fall through if it's
-		 * already written out.
-		 */
-		OldPageRqstPtr = XLogCtl->xlblocks[nextidx];
-		if (LogwrtResult.Write < OldPageRqstPtr)
-		{
-			/*
-			 * Nope, got work to do. If we just want to pre-initialize as much
-			 * as we can without flushing, give up now.
-			 */
-			if (opportunistic)
-				break;
-
-			/* Before waiting, get info_lck and update LogwrtResult */
-			SpinLockAcquire(&XLogCtl->info_lck);
-			if (XLogCtl->LogwrtRqst.Write < OldPageRqstPtr)
-				XLogCtl->LogwrtRqst.Write = OldPageRqstPtr;
-			LogwrtResult = XLogCtl->LogwrtResult;
-			SpinLockRelease(&XLogCtl->info_lck);
-
-			/*
-			 * Now that we have an up-to-date LogwrtResult value, see if we
-			 * still need to write it or if someone else already did.
-			 */
-			if (LogwrtResult.Write < OldPageRqstPtr)
-			{
-				/*
-				 * Must acquire write lock. Release WALBufMappingLock first,
-				 * to make sure that all insertions that we need to wait for
-				 * can finish (up to this same position). Otherwise we risk
-				 * deadlock.
-				 */
-				LWLockRelease(WALBufMappingLock);
-
-				WaitXLogInsertionsToFinish(OldPageRqstPtr);
-
-				LWLockAcquire(WALWriteLock, LW_EXCLUSIVE);
-
-				LogwrtResult = XLogCtl->LogwrtResult;
-				if (LogwrtResult.Write >= OldPageRqstPtr)
-				{
-					/* OK, someone wrote it already */
-					LWLockRelease(WALWriteLock);
-				}
-				else
-				{
-					/* Have to write it ourselves */
-					TRACE_POSTGRESQL_WAL_BUFFER_WRITE_DIRTY_START();
-					WriteRqst.Write = OldPageRqstPtr;
-					WriteRqst.Flush = 0;
-					XLogWrite(WriteRqst, false);
-					LWLockRelease(WALWriteLock);
-					TRACE_POSTGRESQL_WAL_BUFFER_WRITE_DIRTY_DONE();
-				}
-				/* Re-acquire WALBufMappingLock and retry */
-				LWLockAcquire(WALBufMappingLock, LW_EXCLUSIVE);
-				continue;
-			}
-		}
-
-		/*
-		 * Now the next buffer slot is free and we can set it up to be the
-		 * next output page.
-		 */
-		NewPageBeginPtr = XLogCtl->InitializedUpTo;
-		NewPageEndPtr = NewPageBeginPtr + XLOG_BLCKSZ;
-
-		Assert(XLogRecPtrToBufIdx(NewPageBeginPtr) == nextidx);
-
-		NewPage = (XLogPageHeader) (XLogCtl->pages + nextidx * (Size) XLOG_BLCKSZ);
-
-		/*
-		 * Be sure to re-zero the buffer so that bytes beyond what we've
-		 * written will look like zeroes and not valid XLOG records...
-		 */
-		MemSet((char *) NewPage, 0, XLOG_BLCKSZ);
-
-		/*
-		 * Fill the new page's header
-		 */
-		NewPage->xlp_magic = XLOG_PAGE_MAGIC;
-
-		/* NewPage->xlp_info = 0; */	/* done by memset */
-		NewPage->xlp_tli = ThisTimeLineID;
-		NewPage->xlp_pageaddr = NewPageBeginPtr;
-
-		/* NewPage->xlp_rem_len = 0; */	/* done by memset */
-
-		/*
-		 * If online backup is not in progress, mark the header to indicate
-		 * that WAL records beginning in this page have removable backup
-		 * blocks.  This allows the WAL archiver to know whether it is safe to
-		 * compress archived WAL data by transforming full-block records into
-		 * the non-full-block format.  It is sufficient to record this at the
-		 * page level because we force a page switch (in fact a segment
-		 * switch) when starting a backup, so the flag will be off before any
-		 * records can be written during the backup.  At the end of a backup,
-		 * the last page will be marked as all unsafe when perhaps only part
-		 * is unsafe, but at worst the archiver would miss the opportunity to
-		 * compress a few records.
-		 */
-		if (!Insert->forcePageWrites)
-			NewPage->xlp_info |= XLP_BKP_REMOVABLE;
-
-		/*
-		 * If first page of an XLOG segment file, make it a long header.
-		 */
-		if ((XLogSegmentOffset(NewPage->xlp_pageaddr, wal_segment_size)) == 0)
-		{
-			XLogLongPageHeader NewLongPage = (XLogLongPageHeader) NewPage;
-
-			NewLongPage->xlp_sysid = ControlFile->system_identifier;
-			NewLongPage->xlp_seg_size = wal_segment_size;
-			NewLongPage->xlp_xlog_blcksz = XLOG_BLCKSZ;
-			NewPage->xlp_info |= XLP_LONG_HEADER;
-		}
-
-		/*
-		 * Make sure the initialization of the page becomes visible to others
-		 * before the xlblocks update. GetXLogBuffer() reads xlblocks without
-		 * holding a lock.
-		 */
-		pg_write_barrier();
-
-		*((volatile XLogRecPtr *) &XLogCtl->xlblocks[nextidx]) = NewPageEndPtr;
-
-		XLogCtl->InitializedUpTo = NewPageEndPtr;
-
-		npages++;
-	}
-	LWLockRelease(WALBufMappingLock);
-
-#ifdef WAL_DEBUG
-	if (XLOG_DEBUG && npages > 0)
-	{
-		elog(DEBUG1, "initialized %d pages, up to %X/%X",
-			 npages, (uint32) (NewPageEndPtr >> 32), (uint32) NewPageEndPtr);
-	}
-#endif
-}
-
 /*
  * Calculate CheckPointSegments based on max_wal_size_mb and
  * checkpoint_completion_target.
@@ -2375,14 +2066,9 @@ XLogCheckpointNeeded(XLogSegNo new_segno)
 static void
 XLogWrite(XLogwrtRqst WriteRqst, bool flexible)
 {
-	bool		ispartialpage;
-	bool		last_iteration;
 	bool		finishing_seg;
-	bool		use_existent;
-	int			curridx;
-	int			npages;
-	int			startidx;
-	uint32		startoffset;
+	XLogSegNo	rqstLogSegNo;
+	XLogSegNo	segno;
 
 	/* We should always be inside a critical section here */
 	Assert(CritSectionCount > 0);
@@ -2392,223 +2078,140 @@ XLogWrite(XLogwrtRqst WriteRqst, bool flexible)
 	 */
 	LogwrtResult = XLogCtl->LogwrtResult;
 
-	/*
-	 * Since successive pages in the xlog cache are consecutively allocated,
-	 * we can usually gather multiple pages together and issue just one
-	 * write() call.  npages is the number of pages we have determined can be
-	 * written together; startidx is the cache block index of the first one,
-	 * and startoffset is the file offset at which it should go. The latter
-	 * two variables are only valid when npages > 0, but we must initialize
-	 * all of them to keep the compiler quiet.
-	 */
-	npages = 0;
-	startidx = 0;
-	startoffset = 0;
+	/* Fast return if not requested to flush */
+	if (WriteRqst.Flush == 0)
+		return;
+	Assert(WriteRqst.Flush == WriteRqst.Write);
 
 	/*
-	 * Within the loop, curridx is the cache block index of the page to
-	 * consider writing.  Begin at the buffer containing the next unwritten
-	 * page, or last partially written page.
+	 * Call pmem_persist() or pmem_msync() for each segment file that contains
+	 * records to be flushed.
 	 */
-	curridx = XLogRecPtrToBufIdx(LogwrtResult.Write);
-
-	while (LogwrtResult.Write < WriteRqst.Write)
+	XLByteToPrevSeg(WriteRqst.Flush, rqstLogSegNo, wal_segment_size);
+	XLByteToSeg(LogwrtResult.Flush, segno, wal_segment_size);
+	while (segno <= rqstLogSegNo)
 	{
-		/*
-		 * Make sure we're not ahead of the insert process.  This could happen
-		 * if we're passed a bogus WriteRqst.Write that is past the end of the
-		 * last page that's been initialized by AdvanceXLInsertBuffer.
-		 */
-		XLogRecPtr	EndPtr = XLogCtl->xlblocks[curridx];
+		bool		is_pmem;
+		char	   *addr;
+		char	   *p;
+		Size		len;
+		XLogRecPtr	BeginPtr;
+		XLogRecPtr	EndPtr;
 
-		if (LogwrtResult.Write >= EndPtr)
-			elog(PANIC, "xlog write request %X/%X is past end of log %X/%X",
-				 (uint32) (LogwrtResult.Write >> 32),
-				 (uint32) LogwrtResult.Write,
-				 (uint32) (EndPtr >> 32), (uint32) EndPtr);
-
-		/* Advance LogwrtResult.Write to end of current buffer page */
-		LogwrtResult.Write = EndPtr;
-		ispartialpage = WriteRqst.Write < LogwrtResult.Write;
-
-		if (!XLByteInPrevSeg(LogwrtResult.Write, openLogSegNo,
-							 wal_segment_size))
+		/* Check if the segment is not mapped yet */
+		if (segno != openLogSegNo)
 		{
+			/* Map newly */
+			is_pmem = 0;
+			addr = XLogFileMap(segno, &is_pmem);
+
 			/*
-			 * Switch to new logfile segment.  We cannot have any pending
-			 * pages here (since we dump what we have at segment end).
+			 * Use the mapped above as WAL buffer of this process for the
+			 * future.  Note that it might be unmapped within this loop.
 			 */
-			Assert(npages == 0);
-			if (openLogFile >= 0)
-				XLogFileClose();
-			XLByteToPrevSeg(LogwrtResult.Write, openLogSegNo,
-							wal_segment_size);
-
-			/* create/use new log file */
-			use_existent = true;
-			openLogFile = XLogFileInit(openLogSegNo, &use_existent, true);
+			if (openLogSegNo == 0)
+			{
+				pmemMapped = is_pmem;
+				mappedPages = addr;
+				openLogSegNo = segno;
+			}
 		}
-
-		/* Make sure we have the current logfile open */
-		if (openLogFile < 0)
+		else
 		{
-			XLByteToPrevSeg(LogwrtResult.Write, openLogSegNo,
-							wal_segment_size);
-			openLogFile = XLogFileOpen(openLogSegNo);
+			/* Or use existent mapping */
+			is_pmem = pmemMapped;
+			addr = mappedPages;
 		}
+		Assert(addr != NULL);
+		Assert(mappedPages != NULL);
+		Assert(openLogSegNo > 0);
 
-		/* Add current page to the set of pending pages-to-dump */
-		if (npages == 0)
-		{
-			/* first of group */
-			startidx = curridx;
-			startoffset = XLogSegmentOffset(LogwrtResult.Write - XLOG_BLCKSZ,
-											wal_segment_size);
-		}
-		npages++;
+		/* Find beginning position to be flushed */
+		BeginPtr = segno * wal_segment_size;
+		if (BeginPtr < LogwrtResult.Flush)
+			BeginPtr = LogwrtResult.Flush;
+
+		/* Find ending position to be flushed */
+		EndPtr = (segno + 1) * wal_segment_size;
+		if (EndPtr > WriteRqst.Flush)
+			EndPtr = WriteRqst.Flush;
+
+		/* Convert LSN to memory address */
+		Assert(BeginPtr <= EndPtr);
+		p = addr + BeginPtr % wal_segment_size;
+		len = (Size) (EndPtr - BeginPtr);
 
 		/*
-		 * Dump the set if this will be the last loop iteration, or if we are
-		 * at the last page of the cache area (since the next page won't be
-		 * contiguous in memory), or if we are at the end of the logfile
-		 * segment.
+		 * Do cache-flush or msync.
+		 *
+		 * Note that pmem_msync() does backoff to the page boundary.
 		 */
-		last_iteration = WriteRqst.Write <= LogwrtResult.Write;
-
-		finishing_seg = !ispartialpage &&
-			(startoffset + npages * XLOG_BLCKSZ) >= wal_segment_size;
-
-		if (last_iteration ||
-			curridx == XLogCtl->XLogCacheBlck ||
-			finishing_seg)
+		if (is_pmem)
 		{
-			char	   *from;
-			Size		nbytes;
-			Size		nleft;
-			int			written;
-
-			/* OK to write the page(s) */
-			from = XLogCtl->pages + startidx * (Size) XLOG_BLCKSZ;
-			nbytes = npages * (Size) XLOG_BLCKSZ;
-			nleft = nbytes;
-			do
+			pgstat_report_wait_start(WAIT_EVENT_WAL_SYNC);
+			pmem_persist(p, len);
+			pgstat_report_wait_end();
+		}
+		else
+		{
+			pgstat_report_wait_start(WAIT_EVENT_WAL_SYNC);
+			if (pmem_msync(p, len))
 			{
-				errno = 0;
-				pgstat_report_wait_start(WAIT_EVENT_WAL_WRITE);
-				written = pg_pwrite(openLogFile, from, nleft, startoffset);
 				pgstat_report_wait_end();
-				if (written <= 0)
-				{
-					if (errno == EINTR)
-						continue;
-					ereport(PANIC,
-							(errcode_for_file_access(),
-							 errmsg("could not write to log file %s "
-									"at offset %u, length %zu: %m",
-									XLogFileNameP(ThisTimeLineID, openLogSegNo),
-									startoffset, nleft)));
-				}
-				nleft -= written;
-				from += written;
-				startoffset += written;
-			} while (nleft > 0);
+				ereport(PANIC,
+						(errcode_for_file_access(),
+						 errmsg("could not msync file \"%s\": %m",
+								XLogFileNameP(ThisTimeLineID, segno))));
+			}
+			pgstat_report_wait_end();
+		}
+		LogwrtResult.Flush = LogwrtResult.Write = EndPtr;
 
-			npages = 0;
+		/* Check if whole my WAL buffers are synchronized to the segment */
+		finishing_seg = (LogwrtResult.Flush % wal_segment_size == 0) &&
+						XLByteInPrevSeg(LogwrtResult.Flush, openLogSegNo,
+										wal_segment_size);
 
-			/*
-			 * If we just wrote the whole last page of a logfile segment,
-			 * fsync the segment immediately.  This avoids having to go back
-			 * and re-open prior segments when an fsync request comes along
-			 * later. Doing it here ensures that one and only one backend will
-			 * perform this fsync.
-			 *
-			 * This is also the right place to notify the Archiver that the
-			 * segment is ready to copy to archival storage, and to update the
-			 * timer for archive_timeout, and to signal for a checkpoint if
-			 * too many logfile segments have been used since the last
-			 * checkpoint.
-			 */
+		if (segno != openLogSegNo || finishing_seg)
+		{
+			XLogFileUnmap(addr, segno);
 			if (finishing_seg)
 			{
-				issue_xlog_fsync(openLogFile, openLogSegNo);
-
-				/* signal that we need to wakeup walsenders later */
-				WalSndWakeupRequest();
-
-				LogwrtResult.Flush = LogwrtResult.Write;	/* end of page */
-
-				if (XLogArchivingActive())
-					XLogArchiveNotifySeg(openLogSegNo);
-
-				XLogCtl->lastSegSwitchTime = (pg_time_t) time(NULL);
-				XLogCtl->lastSegSwitchLSN = LogwrtResult.Flush;
-
-				/*
-				 * Request a checkpoint if we've consumed too much xlog since
-				 * the last one.  For speed, we first check using the local
-				 * copy of RedoRecPtr, which might be out of date; if it looks
-				 * like a checkpoint is needed, forcibly update RedoRecPtr and
-				 * recheck.
-				 */
-				if (IsUnderPostmaster && XLogCheckpointNeeded(openLogSegNo))
-				{
-					(void) GetRedoRecPtr();
-					if (XLogCheckpointNeeded(openLogSegNo))
-						RequestCheckpoint(CHECKPOINT_CAUSE_XLOG);
-				}
+				Assert(segno == openLogSegNo);
+				mappedPages = NULL;
+				openLogSegNo = 0;
 			}
-		}
 
-		if (ispartialpage)
-		{
-			/* Only asked to write a partial page */
-			LogwrtResult.Write = WriteRqst.Write;
-			break;
-		}
-		curridx = NextBufIdx(curridx);
+			/* signal that we need to wakeup walsenders later */
+			WalSndWakeupRequest();
 
-		/* If flexible, break out of loop as soon as we wrote something */
-		if (flexible && npages == 0)
-			break;
-	}
+			if (XLogArchivingActive())
+				XLogArchiveNotifySeg(segno);
 
-	Assert(npages == 0);
+			XLogCtl->lastSegSwitchTime = (pg_time_t) time(NULL);
+			XLogCtl->lastSegSwitchLSN = LogwrtResult.Flush;
 
-	/*
-	 * If asked to flush, do so
-	 */
-	if (LogwrtResult.Flush < WriteRqst.Flush &&
-		LogwrtResult.Flush < LogwrtResult.Write)
-
-	{
-		/*
-		 * Could get here without iterating above loop, in which case we might
-		 * have no open file or the wrong one.  However, we do not need to
-		 * fsync more than one file.
-		 */
-		if (sync_method != SYNC_METHOD_OPEN &&
-			sync_method != SYNC_METHOD_OPEN_DSYNC)
-		{
-			if (openLogFile >= 0 &&
-				!XLByteInPrevSeg(LogwrtResult.Write, openLogSegNo,
-								 wal_segment_size))
-				XLogFileClose();
-			if (openLogFile < 0)
+			/*
+			 * Request a checkpoint if we've consumed too much xlog since
+			 * the last one.  For speed, we first check using the local
+			 * copy of RedoRecPtr, which might be out of date; if it looks
+			 * like a checkpoint is needed, forcibly update RedoRecPtr and
+			 * recheck.
+			 */
+			if (IsUnderPostmaster && XLogCheckpointNeeded(segno))
 			{
-				XLByteToPrevSeg(LogwrtResult.Write, openLogSegNo,
-								wal_segment_size);
-				openLogFile = XLogFileOpen(openLogSegNo);
+				(void) GetRedoRecPtr();
+				if (XLogCheckpointNeeded(segno))
+					RequestCheckpoint(CHECKPOINT_CAUSE_XLOG);
 			}
-
-			issue_xlog_fsync(openLogFile, openLogSegNo);
 		}
 
-		/* signal that we need to wakeup walsenders later */
-		WalSndWakeupRequest();
-
-		LogwrtResult.Flush = LogwrtResult.Write;
+		++segno;
 	}
 
+	/* signal that we need to wakeup walsenders later */
+	WalSndWakeupRequest();
+
 	/*
 	 * Update shared-memory status
 	 *
@@ -3029,6 +2632,16 @@ XLogBackgroundFlush(void)
 				XLogFileClose();
 			}
 		}
+		else if (mappedPages != NULL)
+		{
+			if (!XLByteInPrevSeg(LogwrtResult.Write, openLogSegNo,
+								 wal_segment_size))
+			{
+				XLogFileUnmap(mappedPages, openLogSegNo);
+				mappedPages = NULL;
+				openLogSegNo = 0;
+			}
+		}
 		return false;
 	}
 
@@ -3095,12 +2708,6 @@ XLogBackgroundFlush(void)
 	/* wake up walsenders now that we've released heavily contended locks */
 	WalSndWakeupProcessRequests();
 
-	/*
-	 * Great, done. To take some work off the critical path, try to initialize
-	 * as many of the no-longer-needed WAL buffers for future use as we can.
-	 */
-	AdvanceXLInsertBuffer(InvalidXLogRecPtr, true);
-
 	/*
 	 * If we determined that we need to write data, but somebody else
 	 * wrote/flushed already, it should be considered as being active, to
@@ -3257,6 +2864,10 @@ XLogFileInit(XLogSegNo logsegno, bool *use_existent, bool use_lock)
 	save_errno = 0;
 	if (wal_init_zero)
 	{
+		XLogCtlInsert  *Insert = &XLogCtl->Insert;
+		XLogPageHeader	NewPage = (XLogPageHeader) zbuffer.data;
+		XLogRecPtr		NewPageBeginPtr = logsegno * wal_segment_size;
+
 		/*
 		 * Zero-fill the file.  With this setting, we do this the hard way to
 		 * ensure that all the file space has really been allocated.  On
@@ -3268,6 +2879,48 @@ XLogFileInit(XLogSegNo logsegno, bool *use_existent, bool use_lock)
 		 */
 		for (nbytes = 0; nbytes < wal_segment_size; nbytes += XLOG_BLCKSZ)
 		{
+			memset(NewPage, 0, SizeOfXLogLongPHD);
+
+			/*
+			 * Fill the new page's header
+			 */
+			NewPage->xlp_magic = XLOG_PAGE_MAGIC;
+
+			/* NewPage->xlp_info = 0; */	/* done by memset */
+			NewPage->xlp_tli = ThisTimeLineID;
+			NewPage->xlp_pageaddr = NewPageBeginPtr;
+
+			/* NewPage->xlp_rem_len = 0; */	/* done by memset */
+
+			/*
+			 * If online backup is not in progress, mark the header to indicate
+			 * that WAL records beginning in this page have removable backup
+			 * blocks.  This allows the WAL archiver to know whether it is safe to
+			 * compress archived WAL data by transforming full-block records into
+			 * the non-full-block format.  It is sufficient to record this at the
+			 * page level because we force a page switch (in fact a segment
+			 * switch) when starting a backup, so the flag will be off before any
+			 * records can be written during the backup.  At the end of a backup,
+			 * the last page will be marked as all unsafe when perhaps only part
+			 * is unsafe, but at worst the archiver would miss the opportunity to
+			 * compress a few records.
+			 */
+			if (!Insert->forcePageWrites)
+				NewPage->xlp_info |= XLP_BKP_REMOVABLE;
+
+			/*
+			 * If first page of an XLOG segment file, make it a long header.
+			 */
+			if ((XLogSegmentOffset(NewPage->xlp_pageaddr, wal_segment_size)) == 0)
+			{
+				XLogLongPageHeader NewLongPage = (XLogLongPageHeader) NewPage;
+
+				NewLongPage->xlp_sysid = ControlFile->system_identifier;
+				NewLongPage->xlp_seg_size = wal_segment_size;
+				NewLongPage->xlp_xlog_blcksz = XLOG_BLCKSZ;
+				NewPage->xlp_info |= XLP_LONG_HEADER;
+			}
+
 			errno = 0;
 			if (write(fd, zbuffer.data, XLOG_BLCKSZ) != XLOG_BLCKSZ)
 			{
@@ -3275,6 +2928,8 @@ XLogFileInit(XLogSegNo logsegno, bool *use_existent, bool use_lock)
 				save_errno = errno ? errno : ENOSPC;
 				break;
 			}
+
+			NewPageBeginPtr += XLOG_BLCKSZ;
 		}
 	}
 	else
@@ -3610,6 +3265,36 @@ XLogFileOpen(XLogSegNo segno)
 	return fd;
 }
 
+/*
+ * Memory-map a pre-existing logfile segment for WAL buffers.
+ *
+ * If success, it returns non-NULL and is_pmem is set whether the file is on
+ * PMEM or not.  Otherwise, it PANICs.
+ */
+static char *
+XLogFileMap(XLogSegNo segno, bool *is_pmem)
+{
+	char		path[MAXPGPATH];
+	char	   *addr;
+	Size		mlen;
+	int			pmem;
+
+	XLogFilePath(path, ThisTimeLineID, segno, wal_segment_size);
+
+	mlen = 0;
+	pmem = 0;
+	addr = pmem_map_file(path, 0, 0, 0, &mlen, &pmem);
+	if (addr == NULL)
+		ereport(PANIC,
+				(errcode_for_file_access(),
+				 errmsg("could not open or mmap file \"%s\": %m", path)));
+
+	Assert(mlen == wal_segment_size);
+
+	*is_pmem = (bool) pmem;
+	return addr;
+}
+
 /*
  * Open a logfile segment for reading (during recovery).
  *
@@ -3799,6 +3484,21 @@ XLogFileClose(void)
 	openLogFile = -1;
 }
 
+/*
+ * Unmap the current logfile segment for WAL buffer.
+ */
+static void
+XLogFileUnmap(char *pages, XLogSegNo segno)
+{
+	Assert(pages != NULL);
+
+	if (pmem_unmap(pages, wal_segment_size))
+		ereport(PANIC,
+				(errcode_for_file_access(),
+				 errmsg("could not unmap file \"%s\": %m",
+						XLogFileNameP(ThisTimeLineID, segno))));
+}
+
 /*
  * Preallocate log files beyond the specified log endpoint.
  */
@@ -4947,12 +4647,6 @@ XLOGShmemSize(void)
 
 	/* WAL insertion locks, plus alignment */
 	size = add_size(size, mul_size(sizeof(WALInsertLockPadded), NUM_XLOGINSERT_LOCKS + 1));
-	/* xlblocks array */
-	size = add_size(size, mul_size(sizeof(XLogRecPtr), XLOGbuffers));
-	/* extra alignment padding for XLOG I/O buffers */
-	size = add_size(size, XLOG_BLCKSZ);
-	/* and the buffers themselves */
-	size = add_size(size, mul_size(XLOG_BLCKSZ, XLOGbuffers));
 
 	/*
 	 * Note: we don't count ControlFileData, it comes out of the "slop factor"
@@ -5028,10 +4722,6 @@ XLOGShmemInit(void)
 	 * needed here.
 	 */
 	allocptr = ((char *) XLogCtl) + sizeof(XLogCtlData);
-	XLogCtl->xlblocks = (XLogRecPtr *) allocptr;
-	memset(XLogCtl->xlblocks, 0, sizeof(XLogRecPtr) * XLOGbuffers);
-	allocptr += sizeof(XLogRecPtr) * XLOGbuffers;
-
 
 	/* WAL insertion locks. Ensure they're aligned to the full padded size */
 	allocptr += sizeof(WALInsertLockPadded) -
@@ -5048,15 +4738,6 @@ XLOGShmemInit(void)
 		WALInsertLocks[i].l.lastImportantAt = InvalidXLogRecPtr;
 	}
 
-	/*
-	 * Align the start of the page buffers to a full xlog block size boundary.
-	 * This simplifies some calculations in XLOG insertion. It is also
-	 * required for O_DIRECT.
-	 */
-	allocptr = (char *) TYPEALIGN(XLOG_BLCKSZ, allocptr);
-	XLogCtl->pages = allocptr;
-	memset(XLogCtl->pages, 0, (Size) XLOG_BLCKSZ * XLOGbuffers);
-
 	/*
 	 * Do basic initialization of XLogCtl shared data. (StartupXLOG will fill
 	 * in additional info.)
@@ -7494,40 +7175,12 @@ StartupXLOG(void)
 	Insert->CurrBytePos = XLogRecPtrToBytePos(EndOfLog);
 
 	/*
-	 * Tricky point here: readBuf contains the *last* block that the LastRec
-	 * record spans, not the one it starts in.  The last block is indeed the
-	 * one we want to use.
+	 * We DO NOT need the if-else block once existed here because we use WAL
+	 * segment files as WAL buffers so the last block is "already on the
+	 * buffers."
+	 *
+	 * XXX We assume there is no torn record.
 	 */
-	if (EndOfLog % XLOG_BLCKSZ != 0)
-	{
-		char	   *page;
-		int			len;
-		int			firstIdx;
-		XLogRecPtr	pageBeginPtr;
-
-		pageBeginPtr = EndOfLog - (EndOfLog % XLOG_BLCKSZ);
-		Assert(readOff == XLogSegmentOffset(pageBeginPtr, wal_segment_size));
-
-		firstIdx = XLogRecPtrToBufIdx(EndOfLog);
-
-		/* Copy the valid part of the last block, and zero the rest */
-		page = &XLogCtl->pages[firstIdx * XLOG_BLCKSZ];
-		len = EndOfLog % XLOG_BLCKSZ;
-		memcpy(page, xlogreader->readBuf, len);
-		memset(page + len, 0, XLOG_BLCKSZ - len);
-
-		XLogCtl->xlblocks[firstIdx] = pageBeginPtr + XLOG_BLCKSZ;
-		XLogCtl->InitializedUpTo = pageBeginPtr + XLOG_BLCKSZ;
-	}
-	else
-	{
-		/*
-		 * There is no partial block to copy. Just set InitializedUpTo, and
-		 * let the first attempt to insert a log record to initialize the next
-		 * buffer.
-		 */
-		XLogCtl->InitializedUpTo = EndOfLog;
-	}
 
 	LogwrtResult.Write = LogwrtResult.Flush = EndOfLog;
 
-- 
2.20.1

