From 3c51ef6459228a83f458ef9e48fb4b98e707746d Mon Sep 17 00:00:00 2001
From: Takashi Menjo <takashi.menjou.vg@hco.ntt.co.jp>
Date: Tue, 23 Mar 2021 11:45:44 +0900
Subject: [PATCH v4 4/9] Map WAL segment files on PMEM as WAL buffers

Fixes introduced in patchset v2:
- Keep openLogSegNo even if wal_pmem_map=true
- Fix sync issue of PmemXLogCreate
- Fix unmapping issue of PmemXLogUnmap
- Remove unused XLogPageOffset
---
 src/backend/access/transam/Makefile   |   1 +
 src/backend/access/transam/xlog.c     | 143 ++++++++++---
 src/backend/access/transam/xlogpmem.c | 297 ++++++++++++++++++++++++++
 src/include/access/xlogpmem.h         |  59 +++++
 4 files changed, 466 insertions(+), 34 deletions(-)
 create mode 100644 src/backend/access/transam/xlogpmem.c
 create mode 100644 src/include/access/xlogpmem.h

diff --git a/src/backend/access/transam/Makefile b/src/backend/access/transam/Makefile
index 595e02de72..3a29583bc0 100644
--- a/src/backend/access/transam/Makefile
+++ b/src/backend/access/transam/Makefile
@@ -31,6 +31,7 @@ OBJS = \
 	xlogarchive.o \
 	xlogfuncs.o \
 	xloginsert.o \
+	xlogpmem.o \
 	xlogreader.o \
 	xlogutils.o
 
diff --git a/src/backend/access/transam/xlog.c b/src/backend/access/transam/xlog.c
index bc07131e99..0986dbc0a1 100644
--- a/src/backend/access/transam/xlog.c
+++ b/src/backend/access/transam/xlog.c
@@ -35,6 +35,7 @@
 #include "access/xlog_internal.h"
 #include "access/xlogarchive.h"
 #include "access/xloginsert.h"
+#include "access/xlogpmem.h"
 #include "access/xlogreader.h"
 #include "access/xlogutils.h"
 #include "catalog/catversion.h"
@@ -1992,7 +1993,13 @@ GetXLogBuffer(XLogRecPtr ptr)
 	 * offset within the page.
 	 */
 	cachedPage = ptr / XLOG_BLCKSZ;
-	cachedPos = XLogCtl->pages + idx * (Size) XLOG_BLCKSZ;
+	if (wal_pmem_map)
+	{
+		openLogSegNo = PmemXLogEnsurePrevMapped(endptr);
+		cachedPos = PmemXLogGetBufferPages() + idx * (Size) XLOG_BLCKSZ;
+	}
+	else
+		cachedPos = XLogCtl->pages + idx * (Size) XLOG_BLCKSZ;
 
 	Assert(((XLogPageHeader) cachedPos)->xlp_magic == XLOG_PAGE_MAGIC);
 	Assert(((XLogPageHeader) cachedPos)->xlp_pageaddr == ptr - (ptr % XLOG_BLCKSZ));
@@ -2226,7 +2233,13 @@ AdvanceXLInsertBuffer(XLogRecPtr upto, bool opportunistic)
 
 		Assert(XLogRecPtrToBufIdx(NewPageBeginPtr) == nextidx);
 
-		NewPage = (XLogPageHeader) (XLogCtl->pages + nextidx * (Size) XLOG_BLCKSZ);
+		if (wal_pmem_map)
+		{
+			openLogSegNo = PmemXLogEnsurePrevMapped(NewPageEndPtr);
+			NewPage = (XLogPageHeader) (PmemXLogGetBufferPages() + nextidx * (Size) XLOG_BLCKSZ);
+		}
+		else
+			NewPage = (XLogPageHeader) (XLogCtl->pages + nextidx * (Size) XLOG_BLCKSZ);
 
 		/*
 		 * Be sure to re-zero the buffer so that bytes beyond what we've
@@ -2445,6 +2458,8 @@ XLogWrite(XLogwrtRqst WriteRqst, bool flexible)
 	int			npages;
 	int			startidx;
 	uint32		startoffset;
+	bool		isfirstpage;
+	XLogRecPtr	startpageptr;
 
 	/* We should always be inside a critical section here */
 	Assert(CritSectionCount > 0);
@@ -2467,6 +2482,10 @@ XLogWrite(XLogwrtRqst WriteRqst, bool flexible)
 	startidx = 0;
 	startoffset = 0;
 
+	/* Those are used actually only if wal_pmem_map=true */
+	isfirstpage = true;
+	startpageptr = 0;
+
 	/*
 	 * Within the loop, curridx is the cache block index of the page to
 	 * consider writing.  Begin at the buffer containing the next unwritten
@@ -2492,31 +2511,34 @@ XLogWrite(XLogwrtRqst WriteRqst, bool flexible)
 		LogwrtResult.Write = EndPtr;
 		ispartialpage = WriteRqst.Write < LogwrtResult.Write;
 
-		if (!XLByteInPrevSeg(LogwrtResult.Write, openLogSegNo,
-							 wal_segment_size))
+		if (!wal_pmem_map)
 		{
-			/*
-			 * Switch to new logfile segment.  We cannot have any pending
-			 * pages here (since we dump what we have at segment end).
-			 */
-			Assert(npages == 0);
-			if (openLogFile >= 0)
-				XLogFileClose();
-			XLByteToPrevSeg(LogwrtResult.Write, openLogSegNo,
-							wal_segment_size);
+			if (!XLByteInPrevSeg(LogwrtResult.Write, openLogSegNo,
+								 wal_segment_size))
+			{
+				/*
+				 * Switch to new logfile segment.  We cannot have any pending
+				 * pages here (since we dump what we have at segment end).
+				 */
+				Assert(npages == 0);
+				if (openLogFile >= 0)
+					XLogFileClose();
+				XLByteToPrevSeg(LogwrtResult.Write, openLogSegNo,
+								wal_segment_size);
 
-			/* create/use new log file */
-			openLogFile = XLogFileInit(openLogSegNo);
-			ReserveExternalFD();
-		}
+				/* create/use new log file */
+				openLogFile = XLogFileInit(openLogSegNo);
+				ReserveExternalFD();
+			}
 
-		/* Make sure we have the current logfile open */
-		if (openLogFile < 0)
-		{
-			XLByteToPrevSeg(LogwrtResult.Write, openLogSegNo,
-							wal_segment_size);
-			openLogFile = XLogFileOpen(openLogSegNo);
-			ReserveExternalFD();
+			/* Make sure we have the current logfile open */
+			if (openLogFile < 0)
+			{
+				XLByteToPrevSeg(LogwrtResult.Write, openLogSegNo,
+								wal_segment_size);
+				openLogFile = XLogFileOpen(openLogSegNo);
+				ReserveExternalFD();
+			}
 		}
 
 		/* Add current page to the set of pending pages-to-dump */
@@ -2524,8 +2546,8 @@ XLogWrite(XLogwrtRqst WriteRqst, bool flexible)
 		{
 			/* first of group */
 			startidx = curridx;
-			startoffset = XLogSegmentOffset(LogwrtResult.Write - XLOG_BLCKSZ,
-											wal_segment_size);
+			startpageptr = LogwrtResult.Write - XLOG_BLCKSZ;
+			startoffset = XLogSegmentOffset(startpageptr, wal_segment_size);
 		}
 		npages++;
 
@@ -2563,7 +2585,37 @@ XLogWrite(XLogwrtRqst WriteRqst, bool flexible)
 					INSTR_TIME_SET_CURRENT(start);
 
 				pgstat_report_wait_start(WAIT_EVENT_WAL_WRITE);
-				written = pg_pwrite(openLogFile, from, nleft, startoffset);
+
+				/*
+				 * If we use a WAL segment file as WAL buffers, we cache-flush
+				 * records on the buffers byte by byte, not page by page. To do
+				 * so, here we fix the range being cache-flushed.
+				 */
+				if (wal_pmem_map)
+				{
+					XLogRecPtr	startbyteptr;
+					XLogRecPtr	endbyteptr;
+
+					startbyteptr = (isfirstpage)
+								 ? XLogCtl->LogwrtResult.Write
+								 : startpageptr;
+
+					endbyteptr = (ispartialpage)
+							   ? WriteRqst.Write
+							   : LogwrtResult.Write;
+
+					/* Now we cache-flush records */
+					openLogSegNo = PmemXLogEnsurePrevMapped(endbyteptr);
+					PmemXLogFlush(startbyteptr, endbyteptr);
+
+					/* Mark the first page is consumed */
+					isfirstpage = false;
+
+					/* Tell all the "pages" have been written successfully */
+					written = nleft;
+				}
+				else
+					written = pg_pwrite(openLogFile, from, nleft, startoffset);
 				pgstat_report_wait_end();
 
 				/*
@@ -2621,7 +2673,10 @@ XLogWrite(XLogwrtRqst WriteRqst, bool flexible)
 			 */
 			if (finishing_seg)
 			{
-				issue_xlog_fsync(openLogFile, openLogSegNo);
+				if (wal_pmem_map)
+					PmemXLogSync();
+				else
+					issue_xlog_fsync(openLogFile, openLogSegNo);
 
 				/* signal that we need to wakeup walsenders later */
 				WalSndWakeupRequest();
@@ -2672,12 +2727,14 @@ XLogWrite(XLogwrtRqst WriteRqst, bool flexible)
 		LogwrtResult.Flush < LogwrtResult.Write)
 
 	{
+		if (wal_pmem_map)
+			PmemXLogSync();
 		/*
 		 * Could get here without iterating above loop, in which case we might
 		 * have no open file or the wrong one.  However, we do not need to
 		 * fsync more than one file.
 		 */
-		if (sync_method != SYNC_METHOD_OPEN &&
+		else if (sync_method != SYNC_METHOD_OPEN &&
 			sync_method != SYNC_METHOD_OPEN_DSYNC)
 		{
 			if (openLogFile >= 0 &&
@@ -8033,11 +8090,29 @@ StartupXLOG(void)
 
 		firstIdx = XLogRecPtrToBufIdx(EndOfLog);
 
-		/* Copy the valid part of the last block, and zero the rest */
-		page = &XLogCtl->pages[firstIdx * XLOG_BLCKSZ];
-		len = EndOfLog % XLOG_BLCKSZ;
-		memcpy(page, xlogreader->readBuf, len);
-		memset(page + len, 0, XLOG_BLCKSZ - len);
+		if (wal_pmem_map)
+		{
+			/*
+			 * Keep the valid part of the last block, and zero the rest.
+			 * Note that "len" indicates the size of the valid part.
+			 */
+			openLogSegNo = PmemXLogEnsurePrevMapped(EndOfLog);
+			page = PmemXLogGetBufferPages() + firstIdx * (Size) XLOG_BLCKSZ;
+			len = EndOfLog % XLOG_BLCKSZ;
+			memset(page + len, 0, XLOG_BLCKSZ - len);
+
+			/* Cache-flush and sync now */
+			PmemXLogFlush(EndOfLog, pageBeginPtr + XLOG_BLCKSZ);
+			PmemXLogSync();
+		}
+		else
+		{
+			/* Copy the valid part of the last block, and zero the rest */
+			page = &XLogCtl->pages[firstIdx * XLOG_BLCKSZ];
+			len = EndOfLog % XLOG_BLCKSZ;
+			memcpy(page, xlogreader->readBuf, len);
+			memset(page + len, 0, XLOG_BLCKSZ - len);
+		}
 
 		XLogCtl->xlblocks[firstIdx] = pageBeginPtr + XLOG_BLCKSZ;
 		XLogCtl->InitializedUpTo = pageBeginPtr + XLOG_BLCKSZ;
diff --git a/src/backend/access/transam/xlogpmem.c b/src/backend/access/transam/xlogpmem.c
new file mode 100644
index 0000000000..74d49eac92
--- /dev/null
+++ b/src/backend/access/transam/xlogpmem.c
@@ -0,0 +1,297 @@
+#include "postgres.h"
+
+#ifdef USE_LIBPMEM
+
+#include <errno.h>
+#include <limits.h>		/* INT_MAX */
+#include <stddef.h>		/* size_t */
+#include <stdint.h>		/* uintptr_t */
+#include <unistd.h>		/* getpid, unlink */
+
+#include <libpmem.h>
+
+#include "c.h"						/* bool, Size */
+#include "access/xlog.h"
+#include "access/xlog_internal.h"	/* XLogFilePath, XLByteToSeg */
+#include "access/xlogpmem.h"
+#include "common/file_perm.h"		/* pg_file_create_mode */
+#include "miscadmin.h"				/* enableFsync */
+#include "pgstat.h"
+
+static char *mappedPages = NULL;
+static XLogSegNo mappedSegNo = 0;
+
+#define PG_DAX_HUGEPAGE_SIZE (((uintptr_t) 1) << 21)
+#define PG_DAX_HUGEPAGE_MASK (~(PG_DAX_HUGEPAGE_SIZE - 1))
+
+static XLogSegNo PmemXLogMap(XLogSegNo segno);
+static void PmemXLogCreate(XLogSegNo segno);
+static void PmemXLogUnmap(void);
+
+static void *PmemCreateMapFile(const char *path, size_t len);
+static void *PmemOpenMapFile(const char *path, size_t expected_len);
+static void *PmemTryOpenMapFile(const char *path, size_t expected_len);
+static void *PmemMapFile(const char *path, size_t expected_len, int flags,
+						 bool try_open);
+static void PmemUnmapForError(void *addr, size_t len);
+
+/*
+ * Ensures the WAL segment containg {ptr-1} to be mapped.
+ *
+ * Returns mapped XLogSegNo.
+ */
+XLogSegNo
+PmemXLogEnsurePrevMapped(XLogRecPtr ptr)
+{
+	XLogSegNo	segno;
+
+	Assert(wal_pmem_map);
+
+	XLByteToPrevSeg(ptr, segno, wal_segment_size);
+
+	if (mappedPages != NULL)
+	{
+		/* Fast return: The segment we need is already mapped */
+		if (mappedSegNo == segno)
+			return mappedSegNo;
+
+		/* Unmap the current segment we don't need */
+		PmemXLogUnmap();
+	}
+
+	return PmemXLogMap(segno);
+}
+
+/*
+ * Creates a new XLOG file segment, or open a pre-existing one, for WAL buffers.
+ *
+ * Returns mapped XLogSegNo.
+ *
+ * See also XLogFileInit in xlog.c.
+ */
+static XLogSegNo
+PmemXLogMap(XLogSegNo segno)
+{
+	char		path[MAXPGPATH];
+
+	Assert(mappedPages == NULL);
+
+	XLogFilePath(path, ThisTimeLineID, segno, wal_segment_size);
+
+	/* PmemTryOpenMapFile will handle error except ENOENT */
+	mappedPages = PmemTryOpenMapFile(path, wal_segment_size);
+
+	/* Fast return if already exists */
+	if (mappedPages != NULL)
+	{
+		mappedSegNo = segno;
+		return mappedSegNo;
+	}
+
+	elog(DEBUG2, "creating and filling new WAL file");
+	PmemXLogCreate(segno);
+
+	/* PmemCreateMapFile will handle error */
+	mappedPages = PmemOpenMapFile(path, wal_segment_size);
+	mappedSegNo = segno;
+
+	elog(DEBUG2, "done creating and filling new WAL file");
+	return mappedSegNo;
+}
+
+/*
+ * Creates a new XLOG file segment.
+ *
+ * See also XLogFileInit in xlog.c.
+ */
+static void
+PmemXLogCreate(XLogSegNo segno)
+{
+	char	   *addr;
+	char		tmppath[MAXPGPATH];
+	XLogSegNo	inst_segno;
+	XLogSegNo	max_segno;
+
+	snprintf(tmppath, MAXPGPATH, XLOGDIR "/xlogtemp.%d", (int) getpid());
+	unlink(tmppath);
+
+	/* PmemCreateMapFile will handle error */
+	addr = PmemCreateMapFile(tmppath, wal_segment_size);
+
+	/*
+	 * Initialize whole the buffers.
+	 *
+	 * Note that we don't put any single byte if not wal_init_zero. It's okay
+	 * because we already have a new segment file truncated to the proper size.
+	 */
+	pgstat_report_wait_start(WAIT_EVENT_WAL_INIT_WRITE);
+	if (wal_init_zero)
+		pmem_memset_nodrain(addr, 0, wal_segment_size);
+	pgstat_report_wait_end();
+
+	pgstat_report_wait_start(WAIT_EVENT_WAL_INIT_SYNC);
+	if (enableFsync)
+		pmem_drain();
+	pgstat_report_wait_end();
+
+	if (pmem_unmap(addr, wal_segment_size) < 0)
+		elog(ERROR, "could not pmem_unmap temporal WAL buffers: %m");
+
+	inst_segno = segno;
+	max_segno = segno + CheckPointSegments;
+	if (!InstallXLogFileSegment(&inst_segno, tmppath, true, max_segno))
+		unlink(tmppath);
+}
+
+/*
+ * Unmaps the current WAL segment file if mapped.
+ */
+static void
+PmemXLogUnmap(void)
+{
+	/* Fast return if not mapped */
+	if (mappedPages == NULL)
+		return;
+
+	if (pmem_unmap(mappedPages, wal_segment_size) < 0)
+		elog(ERROR, "could not pmem_unmap WAL buffers: %m");
+
+	mappedPages = NULL;
+}
+
+/*
+ * Gets the head address of the WAL buffers.
+ */
+char *
+PmemXLogGetBufferPages(void)
+{
+	Assert(wal_pmem_map);
+	Assert(mappedPages != NULL);
+
+	return mappedPages;
+}
+
+/*
+ * Flushes records in the given range [start, end) within a single segment.
+ */
+void
+PmemXLogFlush(XLogRecPtr start, XLogRecPtr end)
+{
+	Size		off;
+
+	Assert(wal_pmem_map);
+	Assert(start < end);
+	Assert(mappedPages != NULL);
+	Assert(XLByteInSeg(start, mappedSegNo, wal_segment_size));
+	Assert(XLByteInPrevSeg(end, mappedSegNo, wal_segment_size));
+
+	off = XLogSegmentOffset(start, wal_segment_size);
+	pmem_flush(mappedPages + off, end - start);
+}
+
+/*
+ * Wait for cache-flush to finish.
+ */
+void
+PmemXLogSync(void)
+{
+	Assert(wal_pmem_map);
+
+	/* Fast return */
+	if (!enableFsync)
+		return;
+
+	pmem_drain();
+}
+
+/*
+ * Wrappers for pmem_map_file.
+ */
+static void *
+PmemCreateMapFile(const char *path, size_t len)
+{
+	return PmemMapFile(path, len, PMEM_FILE_CREATE | PMEM_FILE_EXCL, false);
+}
+
+static void *
+PmemOpenMapFile(const char *path, size_t expected_len)
+{
+	return PmemMapFile(path, expected_len, 0, false);
+}
+
+static void *
+PmemTryOpenMapFile(const char *path, size_t expected_len)
+{
+	return PmemMapFile(path, expected_len, 0, true);
+}
+
+static void *
+PmemMapFile(const char *path, size_t expected_len, int flags, bool try_open)
+{
+	size_t		param_len;
+	int			mode;
+	size_t		mapped_len;
+	int			is_pmem;
+	void	   *addr;
+
+	Assert(expected_len > 0);
+	Assert(expected_len <= INT_MAX);
+
+	param_len = (flags & PMEM_FILE_CREATE) ? expected_len : 0;
+	mode = (flags & PMEM_FILE_CREATE) ? pg_file_create_mode : 0;
+
+	mapped_len = 0;
+	is_pmem = 0;
+	addr = pmem_map_file(path, param_len, flags, mode, &mapped_len, &is_pmem);
+
+	if (addr == NULL)
+	{
+		if (try_open && errno == ENOENT)
+			return NULL;
+
+		ereport(ERROR,
+				(errcode_for_file_access(),
+				 errmsg("could not pmem_map_file \"%s\": %m", path)));
+	}
+
+	if (mapped_len > INT_MAX)
+	{
+		PmemUnmapForError(addr, mapped_len);
+		elog(ERROR,
+			 "unexpected file size: path \"%s\" actual (greater than %d) expected %d",
+			 path, INT_MAX, (int) expected_len);
+	}
+
+	if (mapped_len != expected_len)
+	{
+		PmemUnmapForError(addr, mapped_len);
+		elog(ERROR,
+			 "unexpected file size: path \"%s\" actual %d expected %d",
+			 path, (int) mapped_len, (int) expected_len);
+	}
+
+	if (!is_pmem)
+	{
+		PmemUnmapForError(addr, mapped_len);
+		elog(ERROR, "file not on PMEM: path \"%s\"", path);
+	}
+
+	if ((uintptr_t) addr & ~PG_DAX_HUGEPAGE_MASK)
+		elog(WARNING,
+			 "file not mapped on DAX hugepage boundary: path \"%s\" addr %p",
+			 path, addr);
+
+	return addr;
+}
+
+static void
+PmemUnmapForError(void *addr, size_t len)
+{
+	int		saved_errno;
+
+	saved_errno = errno;
+	(void) pmem_unmap(addr, len);
+	errno = saved_errno;
+}
+
+#endif /* USE_LIBPMEM */
diff --git a/src/include/access/xlogpmem.h b/src/include/access/xlogpmem.h
new file mode 100644
index 0000000000..62c9e1f5e2
--- /dev/null
+++ b/src/include/access/xlogpmem.h
@@ -0,0 +1,59 @@
+/*
+ * xlogpmem.h
+ *
+ * Definitions for PMEM-mapped WAL buffers.
+ *
+ * Portions Copyright (c) 1996-2021, PostgreSQL Global Development Group
+ * Portions Copyright (c) 1994, Regents of the University of California
+ *
+ * src/include/access/xlogpmem.h
+ */
+#ifndef XLOGPMEM_H
+#define XLOGPMEM_H
+
+#include "postgres.h"
+
+#include "c.h"					/* Size */
+#include "access/xlogdefs.h"	/* XLogRecPtr, XLogSegNo */
+
+#ifdef USE_LIBPMEM
+
+/* Prototypes */
+extern XLogSegNo PmemXLogEnsurePrevMapped(XLogRecPtr ptr);
+extern char *PmemXLogGetBufferPages(void);
+extern void PmemXLogFlush(XLogRecPtr start, XLogRecPtr end);
+extern void PmemXLogSync(void);
+
+#else /* USE_LIBPMEM */
+
+#include <stdlib.h> /* abort */
+
+static inline XLogSegNo
+PmemXLogEnsurePrevMapped(XLogRecPtr ptr)
+{
+	abort();
+	return 0;
+}
+
+static inline char *
+PmemXLogGetBufferPages(void)
+{
+	abort();
+	return NULL;
+}
+
+static inline void
+PmemXLogFlush(XLogRecPtr start, XLogRecPtr end)
+{
+	abort();
+}
+
+static inline void
+PmemXLogSync(void)
+{
+	abort();
+}
+
+#endif /* USE_LIBPMEM */
+
+#endif							/* XLOGPMEM_H */
-- 
2.25.1

