From 4a726359d9a8e870bb181bf6181fbf507f9616f2 Mon Sep 17 00:00:00 2001
From: Dilip Kumar <dilipkumar@localhost.localdomain>
Date: Tue, 5 Oct 2021 11:45:02 +0530
Subject: [PATCH v6 6/6] WAL logged CREATE DATABASE

Currently, CREATE DATABASE forces a checkpoint, then copies all the files,
then forces another checkpoint. The comments in the createdb() function
explain the reasons for this. This patch makes CREATE DATABASE completely
WAL-logged, so that both checkpoints can be avoided.

This can also be useful for supporting TDE. For example, if the source and the
target database need different encryption, we cannot re-encrypt the page data
when we copy the whole directory.  But with this patch we copy page by page,
so we have an opportunity to re-encrypt each page before copying it to the
target database.
---
 src/backend/access/rmgrdesc/dbasedesc.c |   3 +-
 src/backend/commands/dbcommands.c       | 679 ++++++++++++++++--------
 src/bin/pg_rewind/parsexlog.c           |   1 +
 src/include/commands/dbcommands_xlog.h  |   3 -
 4 files changed, 462 insertions(+), 224 deletions(-)

diff --git a/src/backend/access/rmgrdesc/dbasedesc.c b/src/backend/access/rmgrdesc/dbasedesc.c
index 26609845aa..5010f72b2c 100644
--- a/src/backend/access/rmgrdesc/dbasedesc.c
+++ b/src/backend/access/rmgrdesc/dbasedesc.c
@@ -28,8 +28,7 @@ dbase_desc(StringInfo buf, XLogReaderState *record)
 	{
 		xl_dbase_create_rec *xlrec = (xl_dbase_create_rec *) rec;
 
-		appendStringInfo(buf, "copy dir %u/%u to %u/%u",
-						 xlrec->src_tablespace_id, xlrec->src_db_id,
+		appendStringInfo(buf, "create dir %u/%u",
 						 xlrec->tablespace_id, xlrec->db_id);
 	}
 	else if (info == XLOG_DBASE_DROP)
diff --git a/src/backend/commands/dbcommands.c b/src/backend/commands/dbcommands.c
index 1d963d8428..12f5abcd4d 100644
--- a/src/backend/commands/dbcommands.c
+++ b/src/backend/commands/dbcommands.c
@@ -45,13 +45,13 @@
 #include "commands/dbcommands_xlog.h"
 #include "commands/defrem.h"
 #include "commands/seclabel.h"
+#include "commands/tablecmds.h"
 #include "commands/tablespace.h"
 #include "mb/pg_wchar.h"
 #include "miscadmin.h"
 #include "pgstat.h"
 #include "postmaster/bgwriter.h"
 #include "replication/slot.h"
-#include "storage/copydir.h"
 #include "storage/fd.h"
 #include "storage/ipc.h"
 #include "storage/lmgr.h"
@@ -62,6 +62,7 @@
 #include "utils/builtins.h"
 #include "utils/fmgroids.h"
 #include "utils/pg_locale.h"
+#include "utils/relmapper.h"
 #include "utils/snapmgr.h"
 #include "utils/syscache.h"
 
@@ -77,6 +78,19 @@ typedef struct
 	Oid			dest_tsoid;		/* tablespace we are trying to move to */
 } movedb_failure_params;
 
+/*
+ * One entry per relation of the source database that has to be copied.
+ * GetDatabaseRelationList() builds these from the visible pg_class tuples of
+ * the source database; CopyDatabase() consumes them.
+ */
+typedef struct CreateDBRelInfo
+{
+	RelFileNode		rnode;				/* source tablespace/database/relfilenode */
+	Oid				reloid;				/* pg_class OID, used to lock the relation */
+	char			relpersistence;		/* pg_class.relpersistence of the relation */
+} CreateDBRelInfo;
+
+
 /* non-export function prototypes */
 static void createdb_failure_callback(int code, Datum arg);
 static void movedb(const char *dbname, const char *tblspcname);
@@ -91,6 +105,425 @@ static bool have_createdb_privilege(void);
 static void remove_dbtablespaces(Oid db_id);
 static bool check_db_file_conflict(Oid db_id);
 static int	errdetail_busy_db(int notherbackends, int npreparedxacts);
+static void CreateDirAndVersionFile(char *dbpath, Oid dbid, Oid tsid,
+									bool isRedo);
+static List *GetDatabaseRelationList(Oid srctbid, Oid srcdbid, char *srcpath);
+static void RelationCopyStorageUsingBuffer(SMgrRelation src, SMgrRelation dst,
+									ForkNumber forkNum, char relpersistence);
+static void CopyDatabase(Oid src_dboid, Oid dboid, Oid src_tsid, Oid dst_tsid);
+
+/*
+ * CreateDirAndVersionFile - Create database directory and write out the
+ *							 PG_VERSION file in the database path.
+ *
+ * If isRedo is true, it's okay for the directory and the file to exist
+ * already.  Otherwise an XLOG_DBASE_CREATE record is flushed first.  The file
+ * and the directory are fsync'd since no checkpoint follows the copy.
+ *
+ * We can directly write PG_MAJORVERSION in the version file instead of copying
+ * from the source database file because these two must be the same.
+ */
+static void
+CreateDirAndVersionFile(char *dbpath, Oid dbid, Oid tsid, bool isRedo)
+{
+	int		fd;
+	int		nbytes;
+	char	versionfile[MAXPGPATH];
+	char	buf[16];
+
+	/* Prepare version data before starting a critical section. */
+	snprintf(buf, sizeof(buf), "%s\n", PG_MAJORVERSION);
+	nbytes = strlen(buf);
+
+	/* If we are not in WAL replay then write the WAL. */
+	if (!isRedo)
+	{
+		xl_dbase_create_rec xlrec;
+		XLogRecPtr	lsn;
+
+		/* Now errors are fatal ... */
+		START_CRIT_SECTION();
+
+		xlrec.db_id = dbid;
+		xlrec.tablespace_id = tsid;
+		XLogBeginInsert();
+		XLogRegisterData((char *) (&xlrec), sizeof(xl_dbase_create_rec));
+		lsn = XLogInsert(RM_DBASE_ID, XLOG_DBASE_CREATE);
+		/* As always, WAL must hit the disk before the data update does. */
+		XLogFlush(lsn);
+	}
+
+	/* Create database directory; during WAL replay it may exist already. */
+	if (MakePGDirectory(dbpath) < 0 && (errno != EEXIST || !isRedo))
+		ereport(ERROR,
+				(errcode_for_file_access(),
+				 errmsg("could not create directory \"%s\": %m", dbpath)));
+
+	/*
+	 * Create PG_VERSION file in the database path.  If the file already exists
+	 * and we are in WAL replay then reopen it, truncating any stale contents.
+	 */
+	snprintf(versionfile, sizeof(versionfile), "%s/%s", dbpath, "PG_VERSION");
+	fd = OpenTransientFile(versionfile, O_WRONLY | O_CREAT | O_EXCL | PG_BINARY);
+	if (fd < 0 && errno == EEXIST && isRedo)
+		fd = OpenTransientFile(versionfile, O_WRONLY | O_TRUNC | PG_BINARY);
+	if (fd < 0)
+		ereport(ERROR,
+				(errcode_for_file_access(),
+				 errmsg("could not create file \"%s\": %m", versionfile)));
+
+	/* Write PG_MAJORVERSION in the PG_VERSION file. */
+	pgstat_report_wait_start(WAIT_EVENT_COPY_FILE_WRITE);
+	errno = 0;
+	if ((int) write(fd, buf, nbytes) != nbytes)
+	{
+		/* If write didn't set errno, assume problem is no disk space. */
+		if (errno == 0)
+			errno = ENOSPC;
+		ereport(ERROR,
+				(errcode_for_file_access(),
+				 errmsg("could not write to file \"%s\": %m", versionfile)));
+	}
+	pgstat_report_wait_end();
+
+	/* Nothing else will flush these for us, so sync file and directory. */
+	if (pg_fsync(fd) != 0)
+		ereport(data_sync_elevel(ERROR),
+				(errcode_for_file_access(),
+				 errmsg("could not fsync file \"%s\": %m", versionfile)));
+	fsync_fname(dbpath, true);
+
+	CloseTransientFile(fd);
+
+	/* Critical section done. */
+	if (!isRedo)
+		END_CRIT_SECTION();
+}
+
+/*
+ * GetDatabaseRelationList - Get relfilenode list to be copied.
+ *
+ * Scan pg_class of database 'dbid' (default tablespace 'tbid', directory
+ * 'srcpath') block by block.  Return a list with one palloc'd CreateDBRelInfo
+ * per visible tuple of a non-shared, non-temporary relation with storage.
+ */
+static List *
+GetDatabaseRelationList(Oid tbid, Oid dbid, char *srcpath)
+{
+	SMgrRelation	rd_smgr;
+	RelFileNode		rnode;
+	BlockNumber		nblocks;
+	BlockNumber		blkno;
+	OffsetNumber	offnum;
+	OffsetNumber	maxoff;
+	Buffer			buf;
+	Page			page;
+	List		   *rnodelist = NIL;
+	HeapTupleData	tuple;
+	Form_pg_class	classForm;
+	LockRelId		relid;
+	Snapshot		snapshot;
+	BufferAccessStrategy bstrategy;
+
+	/* Prepare a relnode for pg_class; the relmapper knows its relfilenode. */
+	rnode.spcNode = tbid;
+	rnode.dbNode = dbid;
+	rnode.relNode = RelationMapOidToFilenodeForDatabase(srcpath,
+														 RelationRelationId);
+
+	/*
+	 * We are going to read the buffers associated with the pg_class relation.
+	 * Thus, acquire the relation level lock before start scanning.  As we are
+	 * not connected to the database, we cannot use relation_open directly, so
+	 * we have to lock using relation id.
+	 */
+	relid.dbId = dbid;
+	relid.relId = RelationRelationId;
+	LockRelationId(&relid, AccessShareLock);
+
+	/*
+	 * We are not connected to the source database so open the pg_class
+	 * relation at the smgr level and get the block count.
+	 */
+	rd_smgr = smgropen(rnode, InvalidBackendId);
+	nblocks = smgrnblocks(rd_smgr, MAIN_FORKNUM);
+
+	/*
+	 * We're going to read the whole pg_class so better to use bulk-read buffer
+	 * access strategy.  The active snapshot is fetched once for the whole scan.
+	 */
+	bstrategy = GetAccessStrategy(BAS_BULKREAD);
+	snapshot = GetActiveSnapshot();
+
+	/* Iterate over each block on the pg_class relation. */
+	for (blkno = 0; blkno < nblocks; blkno++)
+	{
+		/*
+		 * We are not connected to the source database so directly use the lower
+		 * level bufmgr interface which operates on the rnode.
+		 */
+		buf = ReadBufferWithoutRelcache(rnode, MAIN_FORKNUM, blkno,
+										RBM_NORMAL, bstrategy,
+										RELPERSISTENCE_PERMANENT);
+
+		LockBuffer(buf, BUFFER_LOCK_SHARE);
+		page = BufferGetPage(buf);
+		if (PageIsNew(page) || PageIsEmpty(page))
+		{
+			UnlockReleaseBuffer(buf);
+			continue;
+		}
+
+		maxoff = PageGetMaxOffsetNumber(page);
+
+		/* Iterate over each tuple on the page. */
+		for (offnum = FirstOffsetNumber;
+			 offnum <= maxoff;
+			 offnum = OffsetNumberNext(offnum))
+		{
+			ItemId		itemid;
+
+			itemid = PageGetItemId(page, offnum);
+
+			/* Nothing to do if slot is empty or already dead. */
+			if (!ItemIdIsUsed(itemid) || ItemIdIsDead(itemid) ||
+				ItemIdIsRedirected(itemid))
+				continue;
+
+			Assert(ItemIdIsNormal(itemid));
+			ItemPointerSet(&(tuple.t_self), blkno, offnum);
+
+			tuple.t_data = (HeapTupleHeader) PageGetItem(page, itemid);
+			tuple.t_len = ItemIdGetLength(itemid);
+			tuple.t_tableOid = RelationRelationId;
+
+			/*
+			 * If the tuple is visible then add its relfilenode info to the
+			 * list.
+			 */
+			if (HeapTupleSatisfiesVisibility(&tuple, snapshot, buf))
+			{
+				Oid				relfilenode;
+				CreateDBRelInfo   *relinfo;
+
+				classForm = (Form_pg_class) GETSTRUCT(&tuple);
+
+				/*
+				 * Skip shared objects, objects without storage, and temporary
+				 * relations; the latter are private to their creating backend,
+				 * while CopyDatabase() opens everything with InvalidBackendId.
+				 */
+				if (classForm->reltablespace == GLOBALTABLESPACE_OID ||
+					!RELKIND_HAS_STORAGE(classForm->relkind) ||
+					classForm->relpersistence == RELPERSISTENCE_TEMP)
+					continue;
+
+				/*
+				 * If relfilenode is valid then directly use it.  Otherwise,
+				 * consult the relmapper for the mapped relation.
+				 */
+				if (OidIsValid(classForm->relfilenode))
+					relfilenode = classForm->relfilenode;
+				else
+					relfilenode = RelationMapOidToFilenodeForDatabase(srcpath,
+													classForm->oid);
+
+				/* We must have a valid relfilenode oid, even without asserts. */
+				if (!OidIsValid(relfilenode))
+					elog(ERROR, "relation with OID %u does not have a valid relfilenode",
+						 classForm->oid);
+
+				/* Prepare a rel info element and add it to the list. */
+				relinfo = (CreateDBRelInfo *) palloc(sizeof(CreateDBRelInfo));
+				if (OidIsValid(classForm->reltablespace))
+					relinfo->rnode.spcNode = classForm->reltablespace;
+				else
+					relinfo->rnode.spcNode = tbid;
+
+				relinfo->rnode.dbNode = dbid;
+				relinfo->rnode.relNode = relfilenode;
+				relinfo->reloid = classForm->oid;
+				relinfo->relpersistence = classForm->relpersistence;
+
+				rnodelist = lappend(rnodelist, relinfo);
+			}
+		}
+
+		/* Release the buffer lock. */
+		UnlockReleaseBuffer(buf);
+	}
+
+	/* Release the pg_class lock and the buffer access strategy. */
+	UnlockRelationId(&relid, AccessShareLock);
+	FreeAccessStrategy(bstrategy);
+
+	return rnodelist;
+}
+
+/*
+ * RelationCopyStorageUsingBuffer - Copy fork's data using bufmgr.
+ *
+ * Same as RelationCopyStorage but instead of using smgrread and smgrextend
+ * this will copy using bufmgr APIs.  Every source block, including new and
+ * empty pages, is appended to the destination so that block numbers (and
+ * hence TIDs) stay the same in the copy.
+ */
+static void
+RelationCopyStorageUsingBuffer(SMgrRelation src, SMgrRelation dst,
+							   ForkNumber forkNum, char relpersistence)
+{
+	Buffer		srcBuf;
+	Buffer		dstBuf;
+	Page		srcPage;
+	Page		dstPage;
+	bool		use_wal;
+	bool		copying_initfork;
+	BlockNumber nblocks;
+	BlockNumber blkno;
+	BufferAccessStrategy bstrategy_src;
+	BufferAccessStrategy bstrategy_dst;
+
+	/* Refer comments in RelationCopyStorage. */
+	copying_initfork = relpersistence == RELPERSISTENCE_UNLOGGED &&
+		forkNum == INIT_FORKNUM;
+	use_wal = XLogIsNeeded() &&
+		(relpersistence == RELPERSISTENCE_PERMANENT || copying_initfork);
+
+	/* Get number of blocks in the source relation. */
+	nblocks = smgrnblocks(src, forkNum);
+
+	/*
+	 * We are going to copy whole relation from the source to the destination
+	 * so use BAS_BULKREAD strategy for the source relation and BAS_BULKWRITE
+	 * strategy for the destination relation.
+	 */
+	bstrategy_src = GetAccessStrategy(BAS_BULKREAD);
+	bstrategy_dst = GetAccessStrategy(BAS_BULKWRITE);
+
+	/* Iterate over each block of the source relation file. */
+	for (blkno = 0; blkno < nblocks; blkno++)
+	{
+		/* If we got a cancel signal during the copy of the data, quit */
+		CHECK_FOR_INTERRUPTS();
+
+		/* Read block from source relation; hold a share lock while copying. */
+		srcBuf = ReadBufferWithoutRelcache(src->smgr_rnode.node, forkNum,
+										   blkno, RBM_NORMAL, bstrategy_src,
+										   relpersistence);
+		LockBuffer(srcBuf, BUFFER_LOCK_SHARE);
+		srcPage = BufferGetPage(srcBuf);
+
+		/* Use P_NEW to extend the relation. */
+		dstBuf = ReadBufferWithoutRelcache(dst->smgr_rnode.node, forkNum,
+										   P_NEW, RBM_NORMAL, bstrategy_dst,
+										   relpersistence);
+		LockBuffer(dstBuf, BUFFER_LOCK_EXCLUSIVE);
+		dstPage = BufferGetPage(dstBuf);
+
+		START_CRIT_SECTION();
+
+		/* Overwrite the whole destination page with the source page. */
+		memcpy(dstPage, srcPage, BLCKSZ);
+		MarkBufferDirty(dstBuf);
+
+		/* WAL-log the copied page. */
+		if (use_wal)
+			log_newpage_buffer(dstBuf, true);
+
+		END_CRIT_SECTION();
+
+		UnlockReleaseBuffer(dstBuf);
+		UnlockReleaseBuffer(srcBuf);
+	}
+
+	FreeAccessStrategy(bstrategy_src);
+	FreeAccessStrategy(bstrategy_dst);
+}
+
+/*
+ * CopyDatabase - Copy source database to the target database.
+ *
+ * Create target database directory and copy data files from the source database
+ * to the target database, block by block and WAL log all the operations.
+ * src_tsid and dst_tsid are the default tablespaces of the source and the
+ * target; relations stored in any other tablespace keep their tablespace.
+ */
+static void
+CopyDatabase(Oid src_dboid, Oid dst_dboid, Oid src_tsid, Oid dst_tsid)
+{
+	char	   *srcpath;
+	char	   *dstpath;
+	List	   *rnodelist = NIL;
+	ListCell   *cell;
+	LockRelId	relid;
+	RelFileNode	srcrnode;
+	RelFileNode	dstrnode;
+	CreateDBRelInfo	*relinfo;
+
+	/* Get the source and the destination database paths. */
+	srcpath = GetDatabasePath(src_dboid, src_tsid);
+	dstpath = GetDatabasePath(dst_dboid, dst_tsid);
+
+	/* Create database directory and write PG_VERSION file. */
+	CreateDirAndVersionFile(dstpath, dst_dboid, dst_tsid, false);
+
+	/* Copy relmap file from source database to the destination database. */
+	CopyRelationMap(dst_dboid, dst_tsid, srcpath, dstpath);
+
+	/* Get list of all valid relnode from the source database. */
+	rnodelist = GetDatabaseRelationList(src_tsid, src_dboid, srcpath);
+	Assert(rnodelist != NIL);
+
+	/* Database id is common to all the relations, so set it only once. */
+	relid.dbId = src_dboid;
+
+	/* Copy each relation block by block from the source to the target. */
+	foreach(cell, rnodelist)
+	{
+		SMgrRelation	src_smgr;
+		SMgrRelation	dst_smgr;
+
+		relinfo = lfirst(cell);
+		srcrnode = relinfo->rnode;
+
+		/*
+		 * Relations in the source's default tablespace go to the target's
+		 * default tablespace; all others keep the tablespace they are in.
+		 */
+		if (srcrnode.spcNode != src_tsid)
+			dstrnode.spcNode = srcrnode.spcNode;
+		else
+			dstrnode.spcNode = dst_tsid;
+
+		dstrnode.dbNode = dst_dboid;
+		dstrnode.relNode = srcrnode.relNode;
+
+		/*
+		 * Nothing to copy if source and target storage are identical, as for
+		 * movedb() relations outside the default tablespace; leave them alone.
+		 */
+		if (RelFileNodeEquals(srcrnode, dstrnode))
+			continue;
+
+		/* Acquire the lock on relation before start copying. */
+		relid.relId = relinfo->reloid;
+		LockRelationId(&relid, AccessShareLock);
+
+		/* Open the source and the destination relation at smgr level. */
+		src_smgr = smgropen(srcrnode, InvalidBackendId);
+		dst_smgr = smgropen(dstrnode, InvalidBackendId);
+
+		/* Copy relation storage from source to the destination. */
+		RelationCopyAllFork(src_smgr, dst_smgr, relinfo->relpersistence,
+							RelationCopyStorageUsingBuffer);
+		/* Release the lock. */
+		UnlockRelationId(&relid, AccessShareLock);
+	}
+
+	list_free_deep(rnodelist);
+	pfree(srcpath);
+	pfree(dstpath);
+}
 
 
 /*
@@ -99,8 +532,6 @@ static int	errdetail_busy_db(int notherbackends, int npreparedxacts);
 Oid
 createdb(ParseState *pstate, const CreatedbStmt *stmt)
 {
-	TableScanDesc scan;
-	Relation	rel;
 	Oid			src_dboid;
 	Oid			src_owner;
 	int			src_encoding = -1;
@@ -562,19 +993,6 @@ createdb(ParseState *pstate, const CreatedbStmt *stmt)
 	/* Post creation hook for new database */
 	InvokeObjectPostCreateHook(DatabaseRelationId, dboid, 0);
 
-	/*
-	 * Force a checkpoint before starting the copy. This will force all dirty
-	 * buffers, including those of unlogged tables, out to disk, to ensure
-	 * source database is up-to-date on disk for the copy.
-	 * FlushDatabaseBuffers() would suffice for that, but we also want to
-	 * process any pending unlink requests. Otherwise, if a checkpoint
-	 * happened while we're copying files, a file might be deleted just when
-	 * we're about to copy it, causing the lstat() call in copydir() to fail
-	 * with ENOENT.
-	 */
-	RequestCheckpoint(CHECKPOINT_IMMEDIATE | CHECKPOINT_FORCE | CHECKPOINT_WAIT
-					  | CHECKPOINT_FLUSH_ALL);
-
 	/*
 	 * Once we start copying subdirectories, we need to be able to clean 'em
 	 * up if we fail.  Use an ENSURE block to make sure this happens.  (This
@@ -587,115 +1005,16 @@ createdb(ParseState *pstate, const CreatedbStmt *stmt)
 	PG_ENSURE_ERROR_CLEANUP(createdb_failure_callback,
 							PointerGetDatum(&fparms));
 	{
-		/*
-		 * Iterate through all tablespaces of the template database, and copy
-		 * each one to the new database.
-		 */
-		rel = table_open(TableSpaceRelationId, AccessShareLock);
-		scan = table_beginscan_catalog(rel, 0, NULL);
-		while ((tuple = heap_getnext(scan, ForwardScanDirection)) != NULL)
-		{
-			Form_pg_tablespace spaceform = (Form_pg_tablespace) GETSTRUCT(tuple);
-			Oid			srctablespace = spaceform->oid;
-			Oid			dsttablespace;
-			char	   *srcpath;
-			char	   *dstpath;
-			struct stat st;
-
-			/* No need to copy global tablespace */
-			if (srctablespace == GLOBALTABLESPACE_OID)
-				continue;
-
-			srcpath = GetDatabasePath(src_dboid, srctablespace);
-
-			if (stat(srcpath, &st) < 0 || !S_ISDIR(st.st_mode) ||
-				directory_is_empty(srcpath))
-			{
-				/* Assume we can ignore it */
-				pfree(srcpath);
-				continue;
-			}
-
-			if (srctablespace == src_deftablespace)
-				dsttablespace = dst_deftablespace;
-			else
-				dsttablespace = srctablespace;
-
-			dstpath = GetDatabasePath(dboid, dsttablespace);
-
-			/*
-			 * Copy this subdirectory to the new location
-			 *
-			 * We don't need to copy subdirectories
-			 */
-			copydir(srcpath, dstpath, false);
-
-			/* Record the filesystem change in XLOG */
-			{
-				xl_dbase_create_rec xlrec;
-
-				xlrec.db_id = dboid;
-				xlrec.tablespace_id = dsttablespace;
-				xlrec.src_db_id = src_dboid;
-				xlrec.src_tablespace_id = srctablespace;
-
-				XLogBeginInsert();
-				XLogRegisterData((char *) &xlrec, sizeof(xl_dbase_create_rec));
-
-				(void) XLogInsert(RM_DBASE_ID,
-								  XLOG_DBASE_CREATE | XLR_SPECIAL_REL_UPDATE);
-			}
-		}
-		table_endscan(scan);
-		table_close(rel, AccessShareLock);
-
-		/*
-		 * We force a checkpoint before committing.  This effectively means
-		 * that committed XLOG_DBASE_CREATE operations will never need to be
-		 * replayed (at least not in ordinary crash recovery; we still have to
-		 * make the XLOG entry for the benefit of PITR operations). This
-		 * avoids two nasty scenarios:
-		 *
-		 * #1: When PITR is off, we don't XLOG the contents of newly created
-		 * indexes; therefore the drop-and-recreate-whole-directory behavior
-		 * of DBASE_CREATE replay would lose such indexes.
-		 *
-		 * #2: Since we have to recopy the source database during DBASE_CREATE
-		 * replay, we run the risk of copying changes in it that were
-		 * committed after the original CREATE DATABASE command but before the
-		 * system crash that led to the replay.  This is at least unexpected
-		 * and at worst could lead to inconsistencies, eg duplicate table
-		 * names.
-		 *
-		 * (Both of these were real bugs in releases 8.0 through 8.0.3.)
-		 *
-		 * In PITR replay, the first of these isn't an issue, and the second
-		 * is only a risk if the CREATE DATABASE and subsequent template
-		 * database change both occur while a base backup is being taken.
-		 * There doesn't seem to be much we can do about that except document
-		 * it as a limitation.
-		 *
-		 * Perhaps if we ever implement CREATE DATABASE in a less cheesy way,
-		 * we can avoid this.
-		 */
-		RequestCheckpoint(CHECKPOINT_IMMEDIATE | CHECKPOINT_FORCE | CHECKPOINT_WAIT);
-
-		/*
-		 * Close pg_database, but keep lock till commit.
-		 */
-		table_close(pg_database_rel, NoLock);
-
-		/*
-		 * Force synchronous commit, thus minimizing the window between
-		 * creation of the database files and committal of the transaction. If
-		 * we crash before committing, we'll have a DB that's taking up disk
-		 * space but is not in pg_database, which is not good.
-		 */
-		ForceSyncCommit();
+		CopyDatabase(src_dboid, dboid, src_deftablespace, dst_deftablespace);
 	}
 	PG_END_ENSURE_ERROR_CLEANUP(createdb_failure_callback,
 								PointerGetDatum(&fparms));
 
+	/*
+	 * Close pg_database, but keep lock till commit.
+	 */
+	table_close(pg_database_rel, NoLock);
+
 	return dboid;
 }
 
@@ -1195,34 +1514,6 @@ movedb(const char *dbname, const char *tblspcname)
 	src_dbpath = GetDatabasePath(db_id, src_tblspcoid);
 	dst_dbpath = GetDatabasePath(db_id, dst_tblspcoid);
 
-	/*
-	 * Force a checkpoint before proceeding. This will force all dirty
-	 * buffers, including those of unlogged tables, out to disk, to ensure
-	 * source database is up-to-date on disk for the copy.
-	 * FlushDatabaseBuffers() would suffice for that, but we also want to
-	 * process any pending unlink requests. Otherwise, the check for existing
-	 * files in the target directory might fail unnecessarily, not to mention
-	 * that the copy might fail due to source files getting deleted under it.
-	 * On Windows, this also ensures that background procs don't hold any open
-	 * files, which would cause rmdir() to fail.
-	 */
-	RequestCheckpoint(CHECKPOINT_IMMEDIATE | CHECKPOINT_FORCE | CHECKPOINT_WAIT
-					  | CHECKPOINT_FLUSH_ALL);
-
-	/*
-	 * Now drop all buffers holding data of the target database; they should
-	 * no longer be dirty so DropDatabaseBuffers is safe.
-	 *
-	 * It might seem that we could just let these buffers age out of shared
-	 * buffers naturally, since they should not get referenced anymore.  The
-	 * problem with that is that if the user later moves the database back to
-	 * its original tablespace, any still-surviving buffers would appear to
-	 * contain valid data again --- but they'd be missing any changes made in
-	 * the database while it was in the new tablespace.  In any case, freeing
-	 * buffers that should never be used again seems worth the cycles.
-	 */
-	DropDatabaseBuffers(db_id, src_tblspcoid);
-
 	/*
 	 * Check for existence of files in the target directory, i.e., objects of
 	 * this database that are already in the target tablespace.  We can't
@@ -1268,28 +1559,7 @@ movedb(const char *dbname, const char *tblspcname)
 	PG_ENSURE_ERROR_CLEANUP(movedb_failure_callback,
 							PointerGetDatum(&fparms));
 	{
-		/*
-		 * Copy files from the old tablespace to the new one
-		 */
-		copydir(src_dbpath, dst_dbpath, false);
-
-		/*
-		 * Record the filesystem change in XLOG
-		 */
-		{
-			xl_dbase_create_rec xlrec;
-
-			xlrec.db_id = db_id;
-			xlrec.tablespace_id = dst_tblspcoid;
-			xlrec.src_db_id = db_id;
-			xlrec.src_tablespace_id = src_tblspcoid;
-
-			XLogBeginInsert();
-			XLogRegisterData((char *) &xlrec, sizeof(xl_dbase_create_rec));
-
-			(void) XLogInsert(RM_DBASE_ID,
-							  XLOG_DBASE_CREATE | XLR_SPECIAL_REL_UPDATE);
-		}
+		CopyDatabase(db_id, db_id, src_tblspcoid, dst_tblspcoid);
 
 		/*
 		 * Update the database's pg_database tuple
@@ -1322,22 +1592,6 @@ movedb(const char *dbname, const char *tblspcname)
 
 		systable_endscan(sysscan);
 
-		/*
-		 * Force another checkpoint here.  As in CREATE DATABASE, this is to
-		 * ensure that we don't have to replay a committed XLOG_DBASE_CREATE
-		 * operation, which would cause us to lose any unlogged operations
-		 * done in the new DB tablespace before the next checkpoint.
-		 */
-		RequestCheckpoint(CHECKPOINT_IMMEDIATE | CHECKPOINT_FORCE | CHECKPOINT_WAIT);
-
-		/*
-		 * Force synchronous commit, thus minimizing the window between
-		 * copying the database files and committal of the transaction. If we
-		 * crash before committing, we'll leave an orphaned set of files on
-		 * disk, which is not fatal but not good either.
-		 */
-		ForceSyncCommit();
-
 		/*
 		 * Close pg_database, but keep lock till commit.
 		 */
@@ -1346,6 +1600,21 @@ movedb(const char *dbname, const char *tblspcname)
 	PG_END_ENSURE_ERROR_CLEANUP(movedb_failure_callback,
 								PointerGetDatum(&fparms));
 
+	/*
+	 * Now drop all buffers holding data of the target database for the old
+	 * tablespace OID.  We have already copied all the data to the new
+	 * tablespace, so we no longer require the old buffers.
+	 *
+	 * It might seem that we could just let these buffers age out of shared
+	 * buffers naturally, since they should not get referenced anymore.  The
+	 * problem with that is that if the user later moves the database back to
+	 * its original tablespace, any still-surviving buffers would appear to
+	 * contain valid data again --- but they'd be missing any changes made in
+	 * the database while it was in the new tablespace.  In any case, freeing
+	 * buffers that should never be used again seems worth the cycles.
+	 */
+	DropDatabaseBuffers(db_id, src_tblspcoid);
+
 	/*
 	 * Commit the transaction so that the pg_database update is committed. If
 	 * we crash while removing files, the database won't be corrupt, we'll
@@ -2138,39 +2407,11 @@ dbase_redo(XLogReaderState *record)
 	if (info == XLOG_DBASE_CREATE)
 	{
 		xl_dbase_create_rec *xlrec = (xl_dbase_create_rec *) XLogRecGetData(record);
-		char	   *src_path;
-		char	   *dst_path;
-		struct stat st;
-
-		src_path = GetDatabasePath(xlrec->src_db_id, xlrec->src_tablespace_id);
-		dst_path = GetDatabasePath(xlrec->db_id, xlrec->tablespace_id);
+		char	   *dbpath;
 
-		/*
-		 * Our theory for replaying a CREATE is to forcibly drop the target
-		 * subdirectory if present, then re-copy the source data. This may be
-		 * more work than needed, but it is simple to implement.
-		 */
-		if (stat(dst_path, &st) == 0 && S_ISDIR(st.st_mode))
-		{
-			if (!rmtree(dst_path, true))
-				/* If this failed, copydir() below is going to error. */
-				ereport(WARNING,
-						(errmsg("some useless files may be left behind in old database directory \"%s\"",
-								dst_path)));
-		}
-
-		/*
-		 * Force dirty buffers out to disk, to ensure source database is
-		 * up-to-date for the copy.
-		 */
-		FlushDatabaseBuffers(xlrec->src_db_id);
-
-		/*
-		 * Copy this subdirectory to the new location
-		 *
-		 * We don't need to copy subdirectories
-		 */
-		copydir(src_path, dst_path, false);
+		dbpath = GetDatabasePath(xlrec->db_id, xlrec->tablespace_id);
+		CreateDirAndVersionFile(dbpath, xlrec->db_id, xlrec->tablespace_id,
+								true);
 	}
 	else if (info == XLOG_DBASE_DROP)
 	{
diff --git a/src/bin/pg_rewind/parsexlog.c b/src/bin/pg_rewind/parsexlog.c
index 436df54120..a68f6c732b 100644
--- a/src/bin/pg_rewind/parsexlog.c
+++ b/src/bin/pg_rewind/parsexlog.c
@@ -23,6 +23,7 @@
 #include "fe_utils/archive.h"
 #include "filemap.h"
 #include "pg_rewind.h"
+#include "utils/relmapper.h"
 
 /*
  * RmgrNames is an array of resource manager names, to make error messages
diff --git a/src/include/commands/dbcommands_xlog.h b/src/include/commands/dbcommands_xlog.h
index f5ed762677..21dc58ea5d 100644
--- a/src/include/commands/dbcommands_xlog.h
+++ b/src/include/commands/dbcommands_xlog.h
@@ -23,11 +23,8 @@
 
 typedef struct xl_dbase_create_rec
 {
-	/* Records copying of a single subdirectory incl. contents */
 	Oid			db_id;
 	Oid			tablespace_id;
-	Oid			src_db_id;
-	Oid			src_tablespace_id;
 } xl_dbase_create_rec;
 
 typedef struct xl_dbase_drop_rec
-- 
2.23.0

