On Tue, Mar 15, 2022 at 11:09 PM Robert Haas <robertmh...@gmail.com> wrote:
>
> On Tue, Mar 15, 2022 at 1:26 PM Ashutosh Sharma <ashu.coe...@gmail.com> wrote:
> > > On Tue, Mar 15, 2022 at 12:30 PM Ashutosh Sharma <ashu.coe...@gmail.com> 
> > > wrote:
> > > > Few comments on the latest patch:
> > > >
> > > > -               /* We need to construct the pathname for this database 
> > > > */
> > > > -               dbpath = GetDatabasePath(xlrec->dbid, xlrec->tsid);
> > > > +               if (xlrec->dbid != InvalidOid)
> > > > +                       dbpath = GetDatabasePath(xlrec->dbid, 
> > > > xlrec->tsid);
> > > > +               else
> > > > +                       dbpath = pstrdup("global");
> > > >
> > > > Do we really need this change? Is GetDatabasePath() alone not capable
> > > > of handling it?
> > >
> > > Well, I mean, that function has a special case for
> > > GLOBALTABLESPACE_OID, but GLOBALTABLESPACE_OID is 1664, and InvalidOid
> > > is 0.
> > >
> >
> > Wouldn't this be true only in case of a shared map file (when dbOid is
> > Invalid and tblspcOid is globaltablespace_oid) or am I missing
> > something?
>
> *facepalm*
>
> Good catch, sorry that I'm slow on the uptake today.
>
> v3 attached.

Thanks Ashutosh and Robert.  The other patches applied cleanly on top
of this one, but I have still generated a new version so that all the
patches can be found together.  There are no other changes.

-- 
Regards,
Dilip Kumar
EnterpriseDB: http://www.enterprisedb.com
From cd0fe403cd54e5dbeb7e17b321bdf0434b509162 Mon Sep 17 00:00:00 2001
From: Dilip Kumar <dilipkumar@localhost.localdomain>
Date: Tue, 15 Mar 2022 09:18:52 +0530
Subject: [PATCH v16 2/6] Extend relmap interfaces

Support two new interfaces in the relmapper: 1) an interface for
copying the relmap file from one database path to another, and
2) an interface for getting a relation's filenode from its OID.
We already have RelationMapOidToFilenode for the latter purpose,
but it assumes we are connected to the database whose mapping we
want; the new interface instead reads the mapping for the given
database path.

These interfaces are required by a later patch, which adds support
for WAL-logged CREATE DATABASE.
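
As a rough illustration only (a hypothetical caller; the paths and OIDs
below are placeholders, not part of this patch), the intended call
pattern for the new interfaces looks like this:

    /* Resolve a mapped relation's relfilenode in a database we are not
     * connected to, given that database's path. */
    char   *srcpath = GetDatabasePath(src_dboid, src_tsid);
    char   *dstpath = GetDatabasePath(dst_dboid, dst_tsid);
    Oid     relfilenode;

    relfilenode = RelationMapOidToFilenodeForDatabase(srcpath,
                                                      RelationRelationId);
    Assert(OidIsValid(relfilenode));

    /* Copy the source database's relmap file into the destination
     * database's directory, WAL-logging the write. */
    RelationMapCopy(dst_dboid, dst_tsid, srcpath, dstpath);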
---
 src/backend/utils/cache/relmapper.c | 60 +++++++++++++++++++++++++++++++++++++
 src/include/utils/relmapper.h       |  4 ++-
 2 files changed, 63 insertions(+), 1 deletion(-)

diff --git a/src/backend/utils/cache/relmapper.c b/src/backend/utils/cache/relmapper.c
index 4d0718f..5b22dbb 100644
--- a/src/backend/utils/cache/relmapper.c
+++ b/src/backend/utils/cache/relmapper.c
@@ -252,6 +252,60 @@ RelationMapFilenodeToOid(Oid filenode, bool shared)
 }
 
 /*
+ * RelationMapOidToFilenodeForDatabase
+ *
+ * Same as RelationMapOidToFilenode, but instead of reading the mapping of
+ * the database we are connected to, it reads the mapping of the database
+ * at the given path.
+ */
+Oid
+RelationMapOidToFilenodeForDatabase(char *dbpath, Oid relationId)
+{
+	RelMapFile	map;
+	int			i;
+
+	/* Read the relmap file from the source database. */
+	read_relmap_file(&map, dbpath, false, ERROR);
+
+	/* Iterate over the relmap entries to find the input relation oid. */
+	for (i = 0; i < map.num_mappings; i++)
+	{
+		if (relationId == map.mappings[i].mapoid)
+			return map.mappings[i].mapfilenode;
+	}
+
+	return InvalidOid;
+}
+
+/*
+ * RelationMapCopy
+ *
+ * Copy relmapfile from source db path to the destination db path and WAL log
+ * the operation.
+ */
+void
+RelationMapCopy(Oid dbid, Oid tsid, char *srcdbpath, char *dstdbpath)
+{
+	RelMapFile map;
+
+	/*
+	 * Read the relmap file from the source database.  This function is only
+	 * called during CREATE DATABASE, so elevel can be ERROR.
+	 */
+	read_relmap_file(&map, srcdbpath, false, ERROR);
+
+	/*
+	 * Write the map contents into the destination database's relmap file.
+	 * No sinval is needed because we are creating a new file while creating
+	 * a new database, so no one else can be accessing it; for the same
+	 * reason we need not acquire the RelationMappingLock either.  We also
+	 * don't need to preserve files, because if there is an error the new
+	 * database's relation files will be deleted anyway.
+	 */
+	write_relmap_file(&map, true, false, false, dbid, tsid, dstdbpath);
+}
+
+/*
  * RelationMapUpdateMap
  *
  * Install a new relfilenode mapping for the specified relation.
@@ -1031,6 +1085,12 @@ relmap_redo(XLogReaderState *record)
 		 *
 		 * There shouldn't be anyone else updating relmaps during WAL replay,
 		 * but grab the lock to interlock against load_relmap_file().
+		 *
+		 * Note: this WAL record is also written when copying the relmap file
+		 * while creating a database, in which case acquiring the relmap lock
+		 * and sending sinval are not strictly necessary.  Avoiding that would
+		 * require an extra flag in the WAL record, so just grab the lock and
+		 * send the sinval anyway; there is no harm in it.
 		 */
 		LWLockAcquire(RelationMappingLock, LW_EXCLUSIVE);
 		write_relmap_file(&newmap, false, true, false,
diff --git a/src/include/utils/relmapper.h b/src/include/utils/relmapper.h
index 9fbb5a7..f10353e 100644
--- a/src/include/utils/relmapper.h
+++ b/src/include/utils/relmapper.h
@@ -38,7 +38,9 @@ typedef struct xl_relmap_update
 extern Oid	RelationMapOidToFilenode(Oid relationId, bool shared);
 
 extern Oid	RelationMapFilenodeToOid(Oid relationId, bool shared);
-
+extern Oid	RelationMapOidToFilenodeForDatabase(char *dbpath, Oid relationId);
+extern void RelationMapCopy(Oid dbid, Oid tsid, char *srcdbpath,
+							char *dstdbpath);
 extern void RelationMapUpdateMap(Oid relationId, Oid fileNode, bool shared,
 								 bool immediate);
 
-- 
1.8.3.1

From cfe9b1cece03e3704902b375d2a18efc288bbe38 Mon Sep 17 00:00:00 2001
From: Dilip Kumar <dilipkumar@localhost.localdomain>
Date: Wed, 16 Mar 2022 09:53:26 +0530
Subject: [PATCH v16 1/6] Refactor relmap load and relmap write functions

Currently, the relmap reading and writing interfaces are tightly
coupled to the shared_map and local_map of the database we are
connected to.  For the higher-level patches in this set we need
interfaces that can read the relmap into any caller-supplied memory,
that take the map to be written as a parameter, and that can read
the relmap file from a given database path instead of assuming we
are connected to that database.

So as part of this patch, we refactor the existing code to expose
read and write interfaces that are independent of shared_map and
local_map, without changing any logic.
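
For illustration only (these helpers remain static within relmapper.c;
dbpath, dbid, and tsid here are placeholders), the refactored routines
can be pointed at any map buffer and database path, for example:

    RelMapFile  map;

    /* Load the relmap of an arbitrary database path into local memory. */
    read_relmap_file(&map, dbpath, false, ERROR);

    /* Write that map back out, without touching shared_map/local_map. */
    write_relmap_file(&map, true, false, false, dbid, tsid, dbpath);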

Author: Robert Haas
---
 src/backend/utils/cache/relmapper.c | 121 ++++++++++++++++++------------------
 1 file changed, 60 insertions(+), 61 deletions(-)

diff --git a/src/backend/utils/cache/relmapper.c b/src/backend/utils/cache/relmapper.c
index 4f6811f..4d0718f 100644
--- a/src/backend/utils/cache/relmapper.c
+++ b/src/backend/utils/cache/relmapper.c
@@ -137,8 +137,10 @@ static void apply_map_update(RelMapFile *map, Oid relationId, Oid fileNode,
 static void merge_map_updates(RelMapFile *map, const RelMapFile *updates,
 							  bool add_okay);
 static void load_relmap_file(bool shared, bool lock_held);
-static void write_relmap_file(bool shared, RelMapFile *newmap,
-							  bool write_wal, bool send_sinval, bool preserve_files,
+static void read_relmap_file(RelMapFile *map, char *dbpath, bool lock_held,
+							 int elevel);
+static void write_relmap_file(RelMapFile *newmap, bool write_wal,
+							  bool send_sinval, bool preserve_files,
 							  Oid dbid, Oid tsid, const char *dbpath);
 static void perform_relmap_update(bool shared, const RelMapFile *updates);
 
@@ -568,9 +570,9 @@ RelationMapFinishBootstrap(void)
 	Assert(pending_local_updates.num_mappings == 0);
 
 	/* Write the files; no WAL or sinval needed */
-	write_relmap_file(true, &shared_map, false, false, false,
-					  InvalidOid, GLOBALTABLESPACE_OID, NULL);
-	write_relmap_file(false, &local_map, false, false, false,
+	write_relmap_file(&shared_map, false, false, false,
+					  InvalidOid, GLOBALTABLESPACE_OID, "global");
+	write_relmap_file(&local_map, false, false, false,
 					  MyDatabaseId, MyDatabaseTableSpace, DatabasePath);
 }
 
@@ -687,39 +689,48 @@ RestoreRelationMap(char *startAddress)
 }
 
 /*
- * load_relmap_file -- load data from the shared or local map file
+ * load_relmap_file -- load the shared or local map file
  *
- * Because the map file is essential for access to core system catalogs,
- * failure to read it is a fatal error.
+ * Because these files are essential for access to core system catalogs,
+ * failure to load either of them is a fatal error.
  *
  * Note that the local case requires DatabasePath to be set up.
  */
 static void
 load_relmap_file(bool shared, bool lock_held)
 {
-	RelMapFile *map;
+	if (shared)
+		read_relmap_file(&shared_map, "global", lock_held, FATAL);
+	else
+		read_relmap_file(&local_map, DatabasePath, lock_held, FATAL);
+}
+
+/*
+ * read_relmap_file -- load data from any relation mapper file
+ *
+ * dbpath must be the relevant database path, or "global" for shared relations.
+ *
+ * RelationMappingLock will be acquired and released unless lock_held = true.
+ *
+ * Errors will be reported at the indicated elevel, which should be at least
+ * ERROR.
+ */
+static void
+read_relmap_file(RelMapFile *map, char *dbpath, bool lock_held, int elevel)
+{
 	char		mapfilename[MAXPGPATH];
 	pg_crc32c	crc;
 	int			fd;
 	int			r;
 
-	if (shared)
-	{
-		snprintf(mapfilename, sizeof(mapfilename), "global/%s",
-				 RELMAPPER_FILENAME);
-		map = &shared_map;
-	}
-	else
-	{
-		snprintf(mapfilename, sizeof(mapfilename), "%s/%s",
-				 DatabasePath, RELMAPPER_FILENAME);
-		map = &local_map;
-	}
+	Assert(elevel >= ERROR);
 
-	/* Read data ... */
+	/* Open the target file. */
+	snprintf(mapfilename, sizeof(mapfilename), "%s/%s", dbpath,
+			 RELMAPPER_FILENAME);
 	fd = OpenTransientFile(mapfilename, O_RDONLY | PG_BINARY);
 	if (fd < 0)
-		ereport(FATAL,
+		ereport(elevel,
 				(errcode_for_file_access(),
 				 errmsg("could not open file \"%s\": %m",
 						mapfilename)));
@@ -734,16 +745,17 @@ load_relmap_file(bool shared, bool lock_held)
 	if (!lock_held)
 		LWLockAcquire(RelationMappingLock, LW_SHARED);
 
+	/* Now read the data. */
 	pgstat_report_wait_start(WAIT_EVENT_RELATION_MAP_READ);
 	r = read(fd, map, sizeof(RelMapFile));
 	if (r != sizeof(RelMapFile))
 	{
 		if (r < 0)
-			ereport(FATAL,
+			ereport(elevel,
 					(errcode_for_file_access(),
 					 errmsg("could not read file \"%s\": %m", mapfilename)));
 		else
-			ereport(FATAL,
+			ereport(elevel,
 					(errcode(ERRCODE_DATA_CORRUPTED),
 					 errmsg("could not read file \"%s\": read %d of %zu",
 							mapfilename, r, sizeof(RelMapFile))));
@@ -754,7 +766,7 @@ load_relmap_file(bool shared, bool lock_held)
 		LWLockRelease(RelationMappingLock);
 
 	if (CloseTransientFile(fd) != 0)
-		ereport(FATAL,
+		ereport(elevel,
 				(errcode_for_file_access(),
 				 errmsg("could not close file \"%s\": %m",
 						mapfilename)));
@@ -763,7 +775,7 @@ load_relmap_file(bool shared, bool lock_held)
 	if (map->magic != RELMAPPER_FILEMAGIC ||
 		map->num_mappings < 0 ||
 		map->num_mappings > MAX_MAPPINGS)
-		ereport(FATAL,
+		ereport(elevel,
 				(errmsg("relation mapping file \"%s\" contains invalid data",
 						mapfilename)));
 
@@ -773,7 +785,7 @@ load_relmap_file(bool shared, bool lock_held)
 	FIN_CRC32C(crc);
 
 	if (!EQ_CRC32C(crc, map->crc))
-		ereport(FATAL,
+		ereport(elevel,
 				(errmsg("relation mapping file \"%s\" contains incorrect checksum",
 						mapfilename)));
 }
@@ -795,16 +807,16 @@ load_relmap_file(bool shared, bool lock_held)
  *
  * Because this may be called during WAL replay when MyDatabaseId,
  * DatabasePath, etc aren't valid, we require the caller to pass in suitable
- * values.  The caller is also responsible for being sure no concurrent
- * map update could be happening.
+ * values. Pass dbpath as "global" for the shared map.
+ *
+ * The caller is also responsible for being sure no concurrent map update
+ * could be happening.
  */
 static void
-write_relmap_file(bool shared, RelMapFile *newmap,
-				  bool write_wal, bool send_sinval, bool preserve_files,
-				  Oid dbid, Oid tsid, const char *dbpath)
+write_relmap_file(RelMapFile *newmap, bool write_wal, bool send_sinval,
+				  bool preserve_files, Oid dbid, Oid tsid, const char *dbpath)
 {
 	int			fd;
-	RelMapFile *realmap;
 	char		mapfilename[MAXPGPATH];
 
 	/*
@@ -822,19 +834,8 @@ write_relmap_file(bool shared, RelMapFile *newmap,
 	 * Open the target file.  We prefer to do this before entering the
 	 * critical section, so that an open() failure need not force PANIC.
 	 */
-	if (shared)
-	{
-		snprintf(mapfilename, sizeof(mapfilename), "global/%s",
-				 RELMAPPER_FILENAME);
-		realmap = &shared_map;
-	}
-	else
-	{
-		snprintf(mapfilename, sizeof(mapfilename), "%s/%s",
-				 dbpath, RELMAPPER_FILENAME);
-		realmap = &local_map;
-	}
-
+	snprintf(mapfilename, sizeof(mapfilename), "%s/%s",
+			 dbpath, RELMAPPER_FILENAME);
 	fd = OpenTransientFile(mapfilename, O_WRONLY | O_CREAT | PG_BINARY);
 	if (fd < 0)
 		ereport(ERROR,
@@ -934,16 +935,6 @@ write_relmap_file(bool shared, RelMapFile *newmap,
 		}
 	}
 
-	/*
-	 * Success, update permanent copy.  During bootstrap, we might be working
-	 * on the permanent copy itself, in which case skip the memcpy() to avoid
-	 * invoking nominally-undefined behavior.
-	 */
-	if (realmap != newmap)
-		memcpy(realmap, newmap, sizeof(RelMapFile));
-	else
-		Assert(!send_sinval);	/* must be bootstrapping */
-
 	/* Critical section done */
 	if (write_wal)
 		END_CRIT_SECTION();
@@ -990,10 +981,19 @@ perform_relmap_update(bool shared, const RelMapFile *updates)
 	merge_map_updates(&newmap, updates, allowSystemTableMods);
 
 	/* Write out the updated map and do other necessary tasks */
-	write_relmap_file(shared, &newmap, true, true, true,
+	write_relmap_file(&newmap, true, true, true,
 					  (shared ? InvalidOid : MyDatabaseId),
 					  (shared ? GLOBALTABLESPACE_OID : MyDatabaseTableSpace),
-					  DatabasePath);
+					  (shared ? "global" : DatabasePath));
+
+	/*
+	 * We successfully wrote the updated file, so it's now safe to rely on the
+	 * new values in this process, too.
+	 */
+	if (shared)
+		memcpy(&shared_map, &newmap, sizeof(RelMapFile));
+	else
+		memcpy(&local_map, &newmap, sizeof(RelMapFile));
 
 	/* Now we can release the lock */
 	LWLockRelease(RelationMappingLock);
@@ -1033,8 +1033,7 @@ relmap_redo(XLogReaderState *record)
 		 * but grab the lock to interlock against load_relmap_file().
 		 */
 		LWLockAcquire(RelationMappingLock, LW_EXCLUSIVE);
-		write_relmap_file((xlrec->dbid == InvalidOid), &newmap,
-						  false, true, false,
+		write_relmap_file(&newmap, false, true, false,
 						  xlrec->dbid, xlrec->tsid, dbpath);
 		LWLockRelease(RelationMappingLock);
 
-- 
1.8.3.1

From 328c353539b4acf5ec9b8c802801a2321dfc2e03 Mon Sep 17 00:00:00 2001
From: Dilip Kumar <dilipkumar@localhost.localdomain>
Date: Thu, 10 Feb 2022 15:55:33 +0530
Subject: [PATCH v16 3/6] Allow ReadBufferWithoutRelcache to support unlogged
 relpersistence

At present, this function may only be used on permanent relations,
because we only use it during XLOG replay.  A later patch in this set
will use it to read buffers from a database we are not connected to,
so we now need it to work for unlogged relations as well.
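
A minimal sketch of the extended call (rnode, blkno, and the NULL
strategy are placeholders for whatever the caller has at hand):

    Buffer      buf;

    /* permanent = false makes bufmgr treat the relation as unlogged. */
    buf = ReadBufferWithoutRelcache(rnode, MAIN_FORKNUM, blkno,
                                    RBM_NORMAL, NULL, false);
    /* ... use the page ... */
    ReleaseBuffer(buf);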
---
 src/backend/access/transam/xlogutils.c |  6 +++---
 src/backend/storage/buffer/bufmgr.c    | 18 ++++++++++--------
 src/include/storage/bufmgr.h           |  3 ++-
 3 files changed, 15 insertions(+), 12 deletions(-)

diff --git a/src/backend/access/transam/xlogutils.c b/src/backend/access/transam/xlogutils.c
index 54d5f20..6b10656 100644
--- a/src/backend/access/transam/xlogutils.c
+++ b/src/backend/access/transam/xlogutils.c
@@ -484,7 +484,7 @@ XLogReadBufferExtended(RelFileNode rnode, ForkNumber forknum,
 	{
 		/* page exists in file */
 		buffer = ReadBufferWithoutRelcache(rnode, forknum, blkno,
-										   mode, NULL);
+										   mode, NULL, true);
 	}
 	else
 	{
@@ -509,7 +509,7 @@ XLogReadBufferExtended(RelFileNode rnode, ForkNumber forknum,
 				ReleaseBuffer(buffer);
 			}
 			buffer = ReadBufferWithoutRelcache(rnode, forknum,
-											   P_NEW, mode, NULL);
+											   P_NEW, mode, NULL, true);
 		}
 		while (BufferGetBlockNumber(buffer) < blkno);
 		/* Handle the corner case that P_NEW returns non-consecutive pages */
@@ -519,7 +519,7 @@ XLogReadBufferExtended(RelFileNode rnode, ForkNumber forknum,
 				LockBuffer(buffer, BUFFER_LOCK_UNLOCK);
 			ReleaseBuffer(buffer);
 			buffer = ReadBufferWithoutRelcache(rnode, forknum, blkno,
-											   mode, NULL);
+											   mode, NULL, true);
 		}
 	}
 
diff --git a/src/backend/storage/buffer/bufmgr.c b/src/backend/storage/buffer/bufmgr.c
index f5459c6..3cadcd2 100644
--- a/src/backend/storage/buffer/bufmgr.c
+++ b/src/backend/storage/buffer/bufmgr.c
@@ -772,23 +772,25 @@ ReadBufferExtended(Relation reln, ForkNumber forkNum, BlockNumber blockNum,
  * ReadBufferWithoutRelcache -- like ReadBufferExtended, but doesn't require
  *		a relcache entry for the relation.
  *
- * NB: At present, this function may only be used on permanent relations, which
- * is OK, because we only use it during XLOG replay.  If in the future we
- * want to use it on temporary or unlogged relations, we could pass additional
- * parameters.
+ * The caller should pass 'permanent' as true for a permanent relation and
+ * false for an unlogged relation.
+ *
+ * NB: At present, this function may only be used on permanent and unlogged
+ * relations, which is OK, because we only use it during XLOG replay and while
+ * copying the database.  If in the future we want to use it on temporary
+ * relations, we could pass additional parameters.
  */
 Buffer
 ReadBufferWithoutRelcache(RelFileNode rnode, ForkNumber forkNum,
 						  BlockNumber blockNum, ReadBufferMode mode,
-						  BufferAccessStrategy strategy)
+						  BufferAccessStrategy strategy, bool permanent)
 {
 	bool		hit;
 
 	SMgrRelation smgr = smgropen(rnode, InvalidBackendId);
 
-	Assert(InRecovery);
-
-	return ReadBuffer_common(smgr, RELPERSISTENCE_PERMANENT, forkNum, blockNum,
+	return ReadBuffer_common(smgr, permanent ? RELPERSISTENCE_PERMANENT :
+							 RELPERSISTENCE_UNLOGGED, forkNum, blockNum,
 							 mode, strategy, &hit);
 }
 
diff --git a/src/include/storage/bufmgr.h b/src/include/storage/bufmgr.h
index dd01841..fd0452f 100644
--- a/src/include/storage/bufmgr.h
+++ b/src/include/storage/bufmgr.h
@@ -184,7 +184,8 @@ extern Buffer ReadBufferExtended(Relation reln, ForkNumber forkNum,
 								 BufferAccessStrategy strategy);
 extern Buffer ReadBufferWithoutRelcache(RelFileNode rnode,
 										ForkNumber forkNum, BlockNumber blockNum,
-										ReadBufferMode mode, BufferAccessStrategy strategy);
+										ReadBufferMode mode, BufferAccessStrategy strategy,
+										bool permanent);
 extern void ReleaseBuffer(Buffer buffer);
 extern void UnlockReleaseBuffer(Buffer buffer);
 extern void MarkBufferDirty(Buffer buffer);
-- 
1.8.3.1

From 96a4aa7be6d8a789ebd7deb5fe8f1e107ddaa19b Mon Sep 17 00:00:00 2001
From: Dilip Kumar <dilipkumar@localhost.localdomain>
Date: Fri, 24 Sep 2021 18:29:17 +0530
Subject: [PATCH v16 4/6] New interface to lock relation id

Currently, LockRelationOid provides a mechanism to lock a relation
by OID, but it requires us to be connected to the database to which
the relation belongs.  As part of this patch we provide a new
interface which can lock a relation even if we are not connected to
the containing database.
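
A minimal usage sketch (the database OID is a placeholder):

    LockRelId   relid;

    /* Lock pg_class of another database without being connected to it. */
    relid.dbId = src_dboid;
    relid.relId = RelationRelationId;
    LockRelationId(&relid, AccessShareLock);

    /* ... read the relation at the smgr/bufmgr level ... */

    UnlockRelationId(&relid, AccessShareLock);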
---
 src/backend/storage/lmgr/lmgr.c | 28 ++++++++++++++++++++++++++++
 src/include/storage/lmgr.h      |  1 +
 2 files changed, 29 insertions(+)

diff --git a/src/backend/storage/lmgr/lmgr.c b/src/backend/storage/lmgr/lmgr.c
index 5ae52dd..1543da6 100644
--- a/src/backend/storage/lmgr/lmgr.c
+++ b/src/backend/storage/lmgr/lmgr.c
@@ -176,6 +176,34 @@ ConditionalLockRelationOid(Oid relid, LOCKMODE lockmode)
 }
 
 /*
+ *		LockRelationId
+ *
+ * Lock, given a LockRelId.  Same as LockRelationOid, but takes a LockRelId
+ * as input.
+ */
+void
+LockRelationId(LockRelId *relid, LOCKMODE lockmode)
+{
+	LOCKTAG		tag;
+	LOCALLOCK  *locallock;
+	LockAcquireResult res;
+
+	SET_LOCKTAG_RELATION(tag, relid->dbId, relid->relId);
+
+	res = LockAcquireExtended(&tag, lockmode, false, false, true, &locallock);
+
+	/*
+	 * Now that we have the lock, check for invalidation messages; see notes
+	 * in LockRelationOid.
+	 */
+	if (res != LOCKACQUIRE_ALREADY_CLEAR)
+	{
+		AcceptInvalidationMessages();
+		MarkLockClear(locallock);
+	}
+}
+
+/*
  *		UnlockRelationId
  *
  * Unlock, given a LockRelId.  This is preferred over UnlockRelationOid
diff --git a/src/include/storage/lmgr.h b/src/include/storage/lmgr.h
index 49edbcc..be1d2c9 100644
--- a/src/include/storage/lmgr.h
+++ b/src/include/storage/lmgr.h
@@ -38,6 +38,7 @@ extern void RelationInitLockInfo(Relation relation);
 
 /* Lock a relation */
 extern void LockRelationOid(Oid relid, LOCKMODE lockmode);
+extern void LockRelationId(LockRelId *relid, LOCKMODE lockmode);
 extern bool ConditionalLockRelationOid(Oid relid, LOCKMODE lockmode);
 extern void UnlockRelationId(LockRelId *relid, LOCKMODE lockmode);
 extern void UnlockRelationOid(Oid relid, LOCKMODE lockmode);
-- 
1.8.3.1

From 05d089efccb2f8e60812bfd826ef36d1c9d70d93 Mon Sep 17 00:00:00 2001
From: Dilip Kumar <dilipkumar@localhost.localdomain>
Date: Tue, 15 Mar 2022 09:41:20 +0530
Subject: [PATCH v16 5/6] WAL logged CREATE DATABASE

Currently, CREATE DATABASE forces a checkpoint, then copies all the files,
then forces another checkpoint.  The comments in the createdb() function
explain the reasons for this.  This patch avoids those checkpoints by making
CREATE DATABASE fully WAL-logged.

The old way of creating the database is retained as well, and an option is
provided to choose the strategy for creating the database.  For the new
method the user specifies STRATEGY=WAL_LOG, and for the old method
STRATEGY=FILE_COPY.  The default strategy is WAL_LOG.
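
For example, with this patch the strategy can be chosen explicitly
(illustrative statements; the database names are placeholders):

    CREATE DATABASE newdb TEMPLATE template1 STRATEGY = WAL_LOG;
    CREATE DATABASE newdb2 TEMPLATE template1 STRATEGY = FILE_COPY;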
---
 contrib/bloom/blinsert.c                 |   2 +-
 doc/src/sgml/ref/create_database.sgml    |  23 +
 src/backend/access/heap/heapam_handler.c |   2 +-
 src/backend/access/nbtree/nbtree.c       |   2 +-
 src/backend/access/rmgrdesc/dbasedesc.c  |  20 +-
 src/backend/commands/dbcommands.c        | 716 ++++++++++++++++++++++++++-----
 src/backend/storage/buffer/bufmgr.c      | 156 +++++++
 src/bin/pg_rewind/parsexlog.c            |   9 +-
 src/bin/psql/tab-complete.c              |   4 +-
 src/include/commands/dbcommands_xlog.h   |  24 +-
 src/include/storage/bufmgr.h             |   3 +
 src/tools/pgindent/typedefs.list         |   5 +-
 12 files changed, 838 insertions(+), 128 deletions(-)

diff --git a/contrib/bloom/blinsert.c b/contrib/bloom/blinsert.c
index c94cf34..82378db 100644
--- a/contrib/bloom/blinsert.c
+++ b/contrib/bloom/blinsert.c
@@ -173,7 +173,7 @@ blbuildempty(Relation index)
 	 * Write the page and log it.  It might seem that an immediate sync would
 	 * be sufficient to guarantee that the file exists on disk, but recovery
 	 * itself might remove it while replaying, for example, an
-	 * XLOG_DBASE_CREATE or XLOG_TBLSPC_CREATE record.  Therefore, we need
+	 * XLOG_DBASE_CREATE* or XLOG_TBLSPC_CREATE record.  Therefore, we need
 	 * this even when wal_level=minimal.
 	 */
 	PageSetChecksumInplace(metapage, BLOOM_METAPAGE_BLKNO);
diff --git a/doc/src/sgml/ref/create_database.sgml b/doc/src/sgml/ref/create_database.sgml
index f70d0c7..b0c94e40 100644
--- a/doc/src/sgml/ref/create_database.sgml
+++ b/doc/src/sgml/ref/create_database.sgml
@@ -34,6 +34,7 @@ CREATE DATABASE <replaceable class="parameter">name</replaceable>
            [ CONNECTION LIMIT [=] <replaceable class="parameter">connlimit</replaceable> ]
            [ IS_TEMPLATE [=] <replaceable class="parameter">istemplate</replaceable> ]
            [ OID [=] <replaceable class="parameter">oid</replaceable> ] ]
+           [ STRATEGY [=] <replaceable class="parameter">strategy</replaceable> ] ]
 </synopsis>
  </refsynopsisdiv>
 
@@ -240,6 +241,28 @@ CREATE DATABASE <replaceable class="parameter">name</replaceable>
        </listitem>
       </varlistentry>
 
+      <varlistentry>
+       <term><replaceable class="parameter">strategy</replaceable></term>
+       <listitem>
+        <para>
+         This is used for copying the database directory.  Currently, we have
+         two strategies the <literal>WAL_LOG</literal> and the
+         <literal>FILE_COPY</literal>.  If <literal>WAL_LOG</literal> strategy
+         is used then the database will be copied block by block and it will
+         also WAL log each copied block.  Otherwise, if <literal>FILE_COPY
+         </literal> strategy is used then it will do the file system level copy
+         but the individual operations will not be WAL logged.  The default
+         strategy is <literal>WAL_LOG</literal>.  If we choose the file system
+         level copy then it has to issue a checkpoint before and after
+         performing the copy and if there are a lot of dirty buffers then
+         performing the checkpoint could be costly and it may impact the
+         performance of the whole system.  On the other hand, if we wal log
+         each block then it may take more time in database creation if the
+         source database is large.
+        </para>
+       </listitem>
+      </varlistentry>
+
     </variablelist>
 
   <para>
diff --git a/src/backend/access/heap/heapam_handler.c b/src/backend/access/heap/heapam_handler.c
index 39ef8a0..2b70ca0 100644
--- a/src/backend/access/heap/heapam_handler.c
+++ b/src/backend/access/heap/heapam_handler.c
@@ -601,7 +601,7 @@ heapam_relation_set_new_filenode(Relation rel,
 	 * even if the page has been logged, because the write did not go through
 	 * shared_buffers and therefore a concurrent checkpoint may have moved the
 	 * redo pointer past our xlog record.  Recovery may as well remove it
-	 * while replaying, for example, XLOG_DBASE_CREATE or XLOG_TBLSPC_CREATE
+	 * while replaying, for example, XLOG_DBASE_CREATE* or XLOG_TBLSPC_CREATE
 	 * record. Therefore, logging is necessary even if wal_level=minimal.
 	 */
 	if (persistence == RELPERSISTENCE_UNLOGGED)
diff --git a/src/backend/access/nbtree/nbtree.c b/src/backend/access/nbtree/nbtree.c
index c9b4964..dacf3f7 100644
--- a/src/backend/access/nbtree/nbtree.c
+++ b/src/backend/access/nbtree/nbtree.c
@@ -161,7 +161,7 @@ btbuildempty(Relation index)
 	 * Write the page and log it.  It might seem that an immediate sync would
 	 * be sufficient to guarantee that the file exists on disk, but recovery
 	 * itself might remove it while replaying, for example, an
-	 * XLOG_DBASE_CREATE or XLOG_TBLSPC_CREATE record.  Therefore, we need
+	 * XLOG_DBASE_CREATE* or XLOG_TBLSPC_CREATE record.  Therefore, we need
 	 * this even when wal_level=minimal.
 	 */
 	PageSetChecksumInplace(metapage, BTREE_METAPAGE);
diff --git a/src/backend/access/rmgrdesc/dbasedesc.c b/src/backend/access/rmgrdesc/dbasedesc.c
index 03af3fd..523d0b3 100644
--- a/src/backend/access/rmgrdesc/dbasedesc.c
+++ b/src/backend/access/rmgrdesc/dbasedesc.c
@@ -24,14 +24,23 @@ dbase_desc(StringInfo buf, XLogReaderState *record)
 	char	   *rec = XLogRecGetData(record);
 	uint8		info = XLogRecGetInfo(record) & ~XLR_INFO_MASK;
 
-	if (info == XLOG_DBASE_CREATE)
+	if (info == XLOG_DBASE_CREATE_FILE_COPY)
 	{
-		xl_dbase_create_rec *xlrec = (xl_dbase_create_rec *) rec;
+		xl_dbase_create_file_copy_rec *xlrec =
+		(xl_dbase_create_file_copy_rec *) rec;
 
 		appendStringInfo(buf, "copy dir %u/%u to %u/%u",
 						 xlrec->src_tablespace_id, xlrec->src_db_id,
 						 xlrec->tablespace_id, xlrec->db_id);
 	}
+	else if (info == XLOG_DBASE_CREATE_WAL_LOG)
+	{
+		xl_dbase_create_wal_log_rec *xlrec =
+		(xl_dbase_create_wal_log_rec *) rec;
+
+		appendStringInfo(buf, "create dir %u/%u",
+						 xlrec->tablespace_id, xlrec->db_id);
+	}
 	else if (info == XLOG_DBASE_DROP)
 	{
 		xl_dbase_drop_rec *xlrec = (xl_dbase_drop_rec *) rec;
@@ -51,8 +60,11 @@ dbase_identify(uint8 info)
 
 	switch (info & ~XLR_INFO_MASK)
 	{
-		case XLOG_DBASE_CREATE:
-			id = "CREATE";
+		case XLOG_DBASE_CREATE_FILE_COPY:
+			id = "CREATE_FILE_COPY";
+			break;
+		case XLOG_DBASE_CREATE_WAL_LOG:
+			id = "CREATE_WAL_LOG";
 			break;
 		case XLOG_DBASE_DROP:
 			id = "DROP";
diff --git a/src/backend/commands/dbcommands.c b/src/backend/commands/dbcommands.c
index c37e3c9..9636688 100644
--- a/src/backend/commands/dbcommands.c
+++ b/src/backend/commands/dbcommands.c
@@ -63,13 +63,27 @@
 #include "utils/builtins.h"
 #include "utils/fmgroids.h"
 #include "utils/pg_locale.h"
+#include "utils/relmapper.h"
 #include "utils/snapmgr.h"
 #include "utils/syscache.h"
 
+/*
+ * CREATE DATABASE strategy.  With CREATEDB_WAL_LOG, the database is copied
+ * at the block level and each copied block is WAL-logged.  With
+ * CREATEDB_FILE_COPY, the database is copied at the file system level and
+ * the individual operations are not WAL-logged.
+ */
+typedef enum CreateDBStrategy
+{
+	CREATEDB_WAL_LOG,
+	CREATEDB_FILE_COPY
+} CreateDBStrategy;
+
 typedef struct
 {
 	Oid			src_dboid;		/* source (template) DB */
 	Oid			dest_dboid;		/* DB we are trying to create */
+	CreateDBStrategy strategy;	/* create db strategy */
 } createdb_failure_params;
 
 typedef struct
@@ -78,6 +92,19 @@ typedef struct
 	Oid			dest_tsoid;		/* tablespace we are trying to move to */
 } movedb_failure_params;
 
+/*
+ * When creating a database, we scan pg_class of the source database to
+ * identify all the relations to be copied.  This structure stores the
+ * information about each such relation of the source database.
+ */
+typedef struct CreateDBRelInfo
+{
+	RelFileNode rnode;			/* physical relation identifier */
+	Oid			reloid;			/* relation oid */
+	bool		permanent;		/* relation is permanent or unlogged */
+} CreateDBRelInfo;
+
+
 /* non-export function prototypes */
 static void createdb_failure_callback(int code, Datum arg);
 static void movedb(const char *dbname, const char *tblspcname);
@@ -92,7 +119,507 @@ static bool have_createdb_privilege(void);
 static void remove_dbtablespaces(Oid db_id);
 static bool check_db_file_conflict(Oid db_id);
 static int	errdetail_busy_db(int notherbackends, int npreparedxacts);
+static void CreateDirAndVersionFile(char *dbpath, Oid dbid, Oid tsid,
+									bool isRedo);
+static CreateDBRelInfo *ScanSourceDatabasePgClassTuple(HeapTupleData *tuple,
+													   Oid tbid, Oid dbid,
+													   char *srcpath);
+static List *ScanSourceDatabasePgClassPage(Page page, Buffer buf, Oid tbid,
+										   Oid dbid, char *srcpath,
+										   List *rnodelist, Snapshot snapshot);
+static List *ScanSourceDatabasePgClass(Oid srctbid, Oid srcdbid, char *srcpath);
+static void CreateDatabaseUsingWalLog(Oid src_dboid, Oid dboid, Oid src_tsid,
+									  Oid dst_tsid);
+static void CreateDatabaseUsingFileCopy(Oid src_dboid, Oid dboid, Oid src_tsid,
+										Oid dst_tsid);
+
+/*
+ * Create database directory and write out the PG_VERSION file in the database
+ * path.  If isRedo is true, it's okay for the database directory to exist
+ * already.  We can directly write PG_MAJORVERSION in the version file instead
+ * of copying from the source database file because these two must be the same.
+ */
+static void
+CreateDirAndVersionFile(char *dbpath, Oid dbid, Oid tsid, bool isRedo)
+{
+	int			fd;
+	int			nbytes;
+	char		versionfile[MAXPGPATH];
+	char		buf[16];
+
+	/* Prepare version data before starting a critical section. */
+	sprintf(buf, "%s\n", PG_MAJORVERSION);
+	nbytes = strlen(PG_MAJORVERSION) + 1;
+
+	/* If we are not in WAL replay then write the WAL. */
+	if (!isRedo)
+	{
+		xl_dbase_create_wal_log_rec xlrec;
+		XLogRecPtr	lsn;
+
+		START_CRIT_SECTION();
+
+		xlrec.db_id = dbid;
+		xlrec.tablespace_id = tsid;
+
+		XLogBeginInsert();
+		XLogRegisterData((char *) (&xlrec),
+						 sizeof(xl_dbase_create_wal_log_rec));
+
+		lsn = XLogInsert(RM_DBASE_ID, XLOG_DBASE_CREATE_WAL_LOG);
+
+		/* As always, WAL must hit the disk before the data update does. */
+		XLogFlush(lsn);
+	}
+
+	/* Create database directory. */
+	if (MakePGDirectory(dbpath) < 0)
+	{
+		/* Failure other than already exists or not in WAL replay? */
+		if (errno != EEXIST || !isRedo)
+			ereport(ERROR,
+					(errcode_for_file_access(),
+					 errmsg("could not create directory \"%s\": %m", dbpath)));
+	}
+
+	/*
+	 * Create PG_VERSION file in the database path.  If the file already
+	 * exists and we are in WAL replay then try again to open it in write
+	 * mode.
+	 */
+	snprintf(versionfile, sizeof(versionfile), "%s/%s", dbpath, "PG_VERSION");
+
+	fd = OpenTransientFile(versionfile, O_WRONLY | O_CREAT | O_EXCL | PG_BINARY);
+	if (fd < 0 && errno == EEXIST && isRedo)
+		fd = OpenTransientFile(versionfile, O_WRONLY | O_TRUNC | PG_BINARY);
+
+	if (fd < 0)
+		ereport(ERROR,
+				(errcode_for_file_access(),
+				 errmsg("could not create file \"%s\": %m", versionfile)));
+
+	/* Write PG_MAJORVERSION in the PG_VERSION file. */
+	pgstat_report_wait_start(WAIT_EVENT_COPY_FILE_WRITE);
+	errno = 0;
+	if ((int) write(fd, buf, nbytes) != nbytes)
+	{
+		/* If write didn't set errno, assume problem is no disk space. */
+		if (errno == 0)
+			errno = ENOSPC;
+		ereport(ERROR,
+				(errcode_for_file_access(),
+				 errmsg("could not write to file \"%s\": %m", versionfile)));
+	}
+	pgstat_report_wait_end();
+
+	/* Close the version file. */
+	CloseTransientFile(fd);
+
+	/* Critical section done. */
+	if (!isRedo)
+		END_CRIT_SECTION();
+}
+
+/*
+ * Helper function for ScanSourceDatabasePgClassPage to prepare a single
+ * CreateDBRelInfo element from the input pg_class tuple.
+ */
+static CreateDBRelInfo *
+ScanSourceDatabasePgClassTuple(HeapTupleData *tuple, Oid tbid, Oid dbid,
+							   char *srcpath)
+{
+	CreateDBRelInfo	   *relinfo;
+	Form_pg_class		classForm;
+	Oid					relfilenode = InvalidOid;
+
+	classForm = (Form_pg_class) GETSTRUCT(tuple);
+
+	/*
+	 * Nothing to do if this is a shared object, the object has no storage,
+	 * or it is a temporary relation; just return.
+	 */
+	if (classForm->reltablespace == GLOBALTABLESPACE_OID ||
+		!RELKIND_HAS_STORAGE(classForm->relkind) ||
+		classForm->relpersistence == RELPERSISTENCE_TEMP)
+		return NULL;
+
+	/*
+	 * If relfilenode is valid then directly use it.  Otherwise, consult the
+	 * relmapper for the mapped relation.
+	 */
+	if (OidIsValid(classForm->relfilenode))
+		relfilenode = classForm->relfilenode;
+	else
+		relfilenode = RelationMapOidToFilenodeForDatabase(srcpath,
+														  classForm->oid);
+
+	/* We must have a valid relfilenode oid. */
+	Assert(OidIsValid(relfilenode));
+
+	/* Prepare a rel info element for this relation. */
+	relinfo = (CreateDBRelInfo *) palloc(sizeof(CreateDBRelInfo));
+	if (OidIsValid(classForm->reltablespace))
+		relinfo->rnode.spcNode = classForm->reltablespace;
+	else
+		relinfo->rnode.spcNode = tbid;
+
+	relinfo->rnode.dbNode = dbid;
+	relinfo->rnode.relNode = relfilenode;
+	relinfo->reloid = classForm->oid;
+
+	/* We should never reach here for the temp relations. */
+	Assert(classForm->relpersistence != RELPERSISTENCE_TEMP);
+	relinfo->permanent =
+		(classForm->relpersistence == RELPERSISTENCE_PERMANENT);
+
+	return relinfo;
+}
+
+/*
+ * Helper function for ScanSourceDatabasePgClass to identify all the valid
+ * relfilenodes for the given page.
+ */
+static List *
+ScanSourceDatabasePgClassPage(Page page, Buffer buf, Oid tbid, Oid dbid,
+							  char *srcpath, List *rnodelist,
+							  Snapshot snapshot)
+{
+	BlockNumber		blkno = BufferGetBlockNumber(buf);
+	OffsetNumber	offnum;
+	OffsetNumber	maxoff;
+	HeapTupleData	tuple;
+
+	maxoff = PageGetMaxOffsetNumber(page);
+
+	/* Iterate over each tuple of the page. */
+	for (offnum = FirstOffsetNumber;
+		 offnum <= maxoff;
+		 offnum = OffsetNumberNext(offnum))
+	{
+		ItemId		itemid;
+
+		itemid = PageGetItemId(page, offnum);
+
+		/* Nothing to do if slot is empty or already dead. */
+		if (!ItemIdIsUsed(itemid) || ItemIdIsDead(itemid) ||
+			ItemIdIsRedirected(itemid))
+			continue;
+
+		Assert(ItemIdIsNormal(itemid));
+		ItemPointerSet(&(tuple.t_self), blkno, offnum);
+
+		/* Initialize a HeapTupleData structure. */
+		tuple.t_data = (HeapTupleHeader) PageGetItem(page, itemid);
+		tuple.t_len = ItemIdGetLength(itemid);
+		tuple.t_tableOid = RelationRelationId;
+
+		/*
+		 * If the pg_class tuple is visible then prepare a CreateDBRelInfo and
+		 * append it to the list.
+		 */
+		if (HeapTupleSatisfiesVisibility(&tuple, snapshot, buf))
+		{
+			CreateDBRelInfo *relinfo;
+
+			relinfo = ScanSourceDatabasePgClassTuple(&tuple, tbid, dbid,
+													 srcpath);
+
+			/* Add it to the list. */
+			if (relinfo != NULL)
+				rnodelist = lappend(rnodelist, relinfo);
+		}
+	}
+
+	return rnodelist;
+}
+
+/*
+ * Identify all the valid relfilenodes from the source database so that we can
+ * copy them to the destination database.  To do that, this function iterates
+ * over each block of the source database's pg_class relation and checks all
+ * the visible tuples in order to build a list of the valid relfilenodes.
+ */
+static List *
+ScanSourceDatabasePgClass(Oid tbid, Oid dbid, char *srcpath)
+{
+	RelFileNode rnode;
+	BlockNumber nblocks;
+	BlockNumber blkno;
+	Buffer		buf;
+	Oid			relfilenode;
+	Page		page;
+	List	   *rnodelist = NIL;
+	LockRelId	relid;
+	Snapshot	snapshot;
+	SMgrRelation rd_smgr;
+	BufferAccessStrategy bstrategy;
+
+	/* Get pg_class relfilenode. */
+	relfilenode = RelationMapOidToFilenodeForDatabase(srcpath,
+													  RelationRelationId);
+
+	/*
+	 * We are going to read the buffers associated with the pg_class relation.
+	 * Thus, acquire the relation level lock before start scanning.  As we are
+	 * not connected to the database, we cannot use relation_open directly, so
+	 * we have to lock using relation id.
+	 */
+	relid.dbId = dbid;
+	relid.relId = RelationRelationId;
+	LockRelationId(&relid, AccessShareLock);
+
+	/* Prepare a relnode for pg_class relation. */
+	rnode.spcNode = tbid;
+	rnode.dbNode = dbid;
+	rnode.relNode = relfilenode;
+
+	/*
+	 * We are not connected to the source database so open the pg_class
+	 * relation at the smgr level and get the block count.
+	 */
+	rd_smgr = smgropen(rnode, InvalidBackendId);
+	nblocks = smgrnblocks(rd_smgr, MAIN_FORKNUM);
+
+	/*
+	 * We're going to read the whole pg_class so better to use bulk-read
+	 * buffer access strategy.
+	 */
+	bstrategy = GetAccessStrategy(BAS_BULKREAD);
+
+	/* Get latest snapshot for scanning the pg_class. */
+	snapshot = GetLatestSnapshot();
+
+	/* Iterate over each block of the pg_class relation. */
+	for (blkno = 0; blkno < nblocks; blkno++)
+	{
+		/*
+		 * We are not connected to the source database so directly use the
+		 * lower level bufmgr interface which operates on the rnode.
+		 */
+		buf = ReadBufferWithoutRelcache(rnode, MAIN_FORKNUM, blkno,
+										RBM_NORMAL, bstrategy, false);
+
+		LockBuffer(buf, BUFFER_LOCK_SHARE);
+		page = BufferGetPage(buf);
+		if (PageIsNew(page) || PageIsEmpty(page))
+		{
+			UnlockReleaseBuffer(buf);
+			continue;
+		}
+
+		/*
+		 * Process pg_class tuples for the current page and add all the valid
+		 * relfilenode entries to the rnodelist.
+		 */
+		rnodelist = ScanSourceDatabasePgClassPage(page, buf, tbid, dbid,
+												  srcpath, rnodelist,
+												  snapshot);
+
+		/* Release the buffer lock. */
+		UnlockReleaseBuffer(buf);
+	}
+
+	/* Release the lock. */
+	UnlockRelationId(&relid, AccessShareLock);
+
+	return rnodelist;
+}
+
+/*
+ * Copy source database to the target using WAL.  Create target database
+ * directory and copy data files from the source database to the target
+ * database, block by block and WAL log all the operations.
+ */
+static void
+CreateDatabaseUsingWalLog(Oid src_dboid, Oid dst_dboid, Oid src_tsid, Oid dst_tsid)
+{
+	char	   *srcpath;
+	char	   *dstpath;
+	List	   *rnodelist = NULL;
+	ListCell   *cell;
+	LockRelId	relid;
+	RelFileNode srcrnode;
+	RelFileNode dstrnode;
+	CreateDBRelInfo *relinfo;
+
+	/* Get the source database path. */
+	srcpath = GetDatabasePath(src_dboid, src_tsid);
+
+	/* Get the destination database path. */
+	dstpath = GetDatabasePath(dst_dboid, dst_tsid);
+
+	/* Create database directory and write PG_VERSION file. */
+	CreateDirAndVersionFile(dstpath, dst_dboid, dst_tsid, false);
+
+	/* Copy relmap file from source database to the destination database. */
+	RelationMapCopy(dst_dboid, dst_tsid, srcpath, dstpath);
+
+	/* Get a list of all the valid relfilenodes in the source database. */
+	rnodelist = ScanSourceDatabasePgClass(src_tsid, src_dboid, srcpath);
+	Assert(rnodelist != NIL);
+
+	/*
+	 * The database id is common to all the relations, so set it once before
+	 * entering the loop.
+	 */
+	relid.dbId = src_dboid;
+
+	/*
+	 * Iterate over each relfilenode and copy the relation data block by block
+	 * from source database to the destination database.
+	 */
+	foreach(cell, rnodelist)
+	{
+		relinfo = lfirst(cell);
+		srcrnode = relinfo->rnode;
+
+		/*
+		 * If the relation is in the source database's default tablespace then
+		 * we need to create it in the destination database's default
+		 * tablespace.  Otherwise, create it in the same tablespace it uses in
+		 * the source database.
+		 */
+		if (srcrnode.spcNode == src_tsid)
+			dstrnode.spcNode = dst_tsid;
+		else
+			dstrnode.spcNode = srcrnode.spcNode;
+
+		dstrnode.dbNode = dst_dboid;
+		dstrnode.relNode = srcrnode.relNode;
+
+		/* Acquire the lock on relation before start copying. */
+		relid.relId = relinfo->reloid;
+		LockRelationId(&relid, AccessShareLock);
+
+		/* Copy relation storage from source to the destination. */
+		CreateAndCopyRelationData(srcrnode, dstrnode, relinfo->permanent);
 
+		/* Release the lock. */
+		UnlockRelationId(&relid, AccessShareLock);
+	}
+
+	list_free_deep(rnodelist);
+}
+
+/*
+ * Copy source database directory to the destination directory using file
+ * system level copy operation.
+ */
+static void
+CreateDatabaseUsingFileCopy(Oid src_dboid, Oid dst_dboid, Oid src_tsid,
+							Oid dst_tsid)
+{
+	TableScanDesc scan;
+	Relation	rel;
+	HeapTuple	tuple;
+
+	/*
+	 * Force a checkpoint before starting the copy. This will force all dirty
+	 * buffers, including those of unlogged tables, out to disk, to ensure
+	 * source database is up-to-date on disk for the copy.
+	 * FlushDatabaseBuffers() would suffice for that, but we also want to
+	 * process any pending unlink requests. Otherwise, if a checkpoint
+	 * happened while we're copying files, a file might be deleted just when
+	 * we're about to copy it, causing the lstat() call in copydir() to fail
+	 * with ENOENT.
+	 */
+	RequestCheckpoint(CHECKPOINT_IMMEDIATE | CHECKPOINT_FORCE |
+					  CHECKPOINT_WAIT | CHECKPOINT_FLUSH_ALL);
+
+	/*
+	 * Iterate through all tablespaces of the template database, and copy each
+	 * one to the new database.
+	 */
+	rel = table_open(TableSpaceRelationId, AccessShareLock);
+	scan = table_beginscan_catalog(rel, 0, NULL);
+	while ((tuple = heap_getnext(scan, ForwardScanDirection)) != NULL)
+	{
+		Form_pg_tablespace spaceform = (Form_pg_tablespace) GETSTRUCT(tuple);
+		Oid			srctablespace = spaceform->oid;
+		Oid			dsttablespace;
+		char	   *srcpath;
+		char	   *dstpath;
+		struct stat st;
+
+		/* No need to copy global tablespace */
+		if (srctablespace == GLOBALTABLESPACE_OID)
+			continue;
+
+		srcpath = GetDatabasePath(src_dboid, srctablespace);
+
+		if (stat(srcpath, &st) < 0 || !S_ISDIR(st.st_mode) ||
+			directory_is_empty(srcpath))
+		{
+			/* Assume we can ignore it */
+			pfree(srcpath);
+			continue;
+		}
+
+		if (srctablespace == src_tsid)
+			dsttablespace = dst_tsid;
+		else
+			dsttablespace = srctablespace;
+
+		dstpath = GetDatabasePath(dst_dboid, dsttablespace);
+
+		/*
+		 * Copy this subdirectory to the new location
+		 *
+		 * We don't need to copy subdirectories
+		 */
+		copydir(srcpath, dstpath, false);
+
+		/* Record the filesystem change in XLOG */
+		{
+			xl_dbase_create_file_copy_rec xlrec;
+
+			xlrec.db_id = dst_dboid;
+			xlrec.tablespace_id = dsttablespace;
+			xlrec.src_db_id = src_dboid;
+			xlrec.src_tablespace_id = srctablespace;
+
+			XLogBeginInsert();
+			XLogRegisterData((char *) &xlrec,
+							 sizeof(xl_dbase_create_file_copy_rec));
+
+			(void) XLogInsert(RM_DBASE_ID,
+							  XLOG_DBASE_CREATE_FILE_COPY | XLR_SPECIAL_REL_UPDATE);
+		}
+	}
+	table_endscan(scan);
+	table_close(rel, AccessShareLock);
+
+	/*
+	 * We force a checkpoint before committing.  This effectively means that
+	 * committed XLOG_DBASE_CREATE_FILE_COPY operations will never need to be
+	 * replayed (at least not in ordinary crash recovery; we still have to
+	 * make the XLOG entry for the benefit of PITR operations). This avoids
+	 * two nasty scenarios:
+	 *
+	 * #1: When PITR is off, we don't XLOG the contents of newly created
+	 * indexes; therefore the drop-and-recreate-whole-directory behavior of
+	 * DBASE_CREATE replay would lose such indexes.
+	 *
+	 * #2: Since we have to recopy the source database during DBASE_CREATE
+	 * replay, we run the risk of copying changes in it that were committed
+	 * after the original CREATE DATABASE command but before the system crash
+	 * that led to the replay.  This is at least unexpected and at worst could
+	 * lead to inconsistencies, eg duplicate table names.
+	 *
+	 * (Both of these were real bugs in releases 8.0 through 8.0.3.)
+	 *
+	 * In PITR replay, the first of these isn't an issue, and the second is
+	 * only a risk if the CREATE DATABASE and subsequent template database
+	 * change both occur while a base backup is being taken. There doesn't
+	 * seem to be much we can do about that except document it as a
+	 * limitation.
+	 *
+	 * Perhaps if we ever implement CREATE DATABASE in a less cheesy way, we
+	 * can avoid this.
+	 */
+	RequestCheckpoint(CHECKPOINT_IMMEDIATE | CHECKPOINT_FORCE | CHECKPOINT_WAIT);
+}
 
 /*
  * CREATE DATABASE
@@ -100,8 +627,6 @@ static int	errdetail_busy_db(int notherbackends, int npreparedxacts);
 Oid
 createdb(ParseState *pstate, const CreatedbStmt *stmt)
 {
-	TableScanDesc scan;
-	Relation	rel;
 	Oid			src_dboid;
 	Oid			src_owner;
 	int			src_encoding = -1;
@@ -132,6 +657,7 @@ createdb(ParseState *pstate, const CreatedbStmt *stmt)
 	DefElem    *dallowconnections = NULL;
 	DefElem    *dconnlimit = NULL;
 	DefElem    *dcollversion = NULL;
+	DefElem    *dstrategy = NULL;
 	char	   *dbname = stmt->dbname;
 	char	   *dbowner = NULL;
 	const char *dbtemplate = NULL;
@@ -145,6 +671,7 @@ createdb(ParseState *pstate, const CreatedbStmt *stmt)
 	char	   *dbcollversion = NULL;
 	int			notherbackends;
 	int			npreparedxacts;
+	CreateDBStrategy dbstrategy = CREATEDB_WAL_LOG;
 	createdb_failure_params fparms;
 
 	/* Extract options from the statement node tree */
@@ -250,6 +777,12 @@ createdb(ParseState *pstate, const CreatedbStmt *stmt)
 						(errcode(ERRCODE_INVALID_PARAMETER_VALUE)),
 						errmsg("OIDs less than %u are reserved for system objects", FirstNormalObjectId));
 		}
+		else if (strcmp(defel->defname, "strategy") == 0)
+		{
+			if (dstrategy)
+				errorConflictingDefElem(defel, pstate);
+			dstrategy = defel;
+		}
 		else
 			ereport(ERROR,
 					(errcode(ERRCODE_SYNTAX_ERROR),
@@ -374,6 +907,23 @@ createdb(ParseState *pstate, const CreatedbStmt *stmt)
 							dbtemplate)));
 	}
 
+	/* Validate the database creation strategy. */
+	if (dstrategy && dstrategy->arg)
+	{
+		char	   *strategy;
+
+		strategy = defGetString(dstrategy);
+		if (strcmp(strategy, "wal_log") == 0)
+			dbstrategy = CREATEDB_WAL_LOG;
+		else if (strcmp(strategy, "file_copy") == 0)
+			dbstrategy = CREATEDB_FILE_COPY;
+		else
+			ereport(ERROR,
+					(errcode(ERRCODE_INVALID_PARAMETER_VALUE),
+					 errmsg("invalid create database strategy \"%s\"", strategy),
+					 errhint("Valid strategies are \"wal_log\" and \"file_copy\".")));
+	}
+
 	/* If encoding or locales are defaulted, use source's setting */
 	if (encoding < 0)
 		encoding = src_encoding;
@@ -668,19 +1218,6 @@ createdb(ParseState *pstate, const CreatedbStmt *stmt)
 	InvokeObjectPostCreateHook(DatabaseRelationId, dboid, 0);
 
 	/*
-	 * Force a checkpoint before starting the copy. This will force all dirty
-	 * buffers, including those of unlogged tables, out to disk, to ensure
-	 * source database is up-to-date on disk for the copy.
-	 * FlushDatabaseBuffers() would suffice for that, but we also want to
-	 * process any pending unlink requests. Otherwise, if a checkpoint
-	 * happened while we're copying files, a file might be deleted just when
-	 * we're about to copy it, causing the lstat() call in copydir() to fail
-	 * with ENOENT.
-	 */
-	RequestCheckpoint(CHECKPOINT_IMMEDIATE | CHECKPOINT_FORCE | CHECKPOINT_WAIT
-					  | CHECKPOINT_FLUSH_ALL);
-
-	/*
 	 * Once we start copying subdirectories, we need to be able to clean 'em
 	 * up if we fail.  Use an ENSURE block to make sure this happens.  (This
 	 * is not a 100% solution, because of the possibility of failure during
@@ -689,101 +1226,24 @@ createdb(ParseState *pstate, const CreatedbStmt *stmt)
 	 */
 	fparms.src_dboid = src_dboid;
 	fparms.dest_dboid = dboid;
+	fparms.strategy = dbstrategy;
+
 	PG_ENSURE_ERROR_CLEANUP(createdb_failure_callback,
 							PointerGetDatum(&fparms));
 	{
 		/*
-		 * Iterate through all tablespaces of the template database, and copy
-		 * each one to the new database.
-		 */
-		rel = table_open(TableSpaceRelationId, AccessShareLock);
-		scan = table_beginscan_catalog(rel, 0, NULL);
-		while ((tuple = heap_getnext(scan, ForwardScanDirection)) != NULL)
-		{
-			Form_pg_tablespace spaceform = (Form_pg_tablespace) GETSTRUCT(tuple);
-			Oid			srctablespace = spaceform->oid;
-			Oid			dsttablespace;
-			char	   *srcpath;
-			char	   *dstpath;
-			struct stat st;
-
-			/* No need to copy global tablespace */
-			if (srctablespace == GLOBALTABLESPACE_OID)
-				continue;
-
-			srcpath = GetDatabasePath(src_dboid, srctablespace);
-
-			if (stat(srcpath, &st) < 0 || !S_ISDIR(st.st_mode) ||
-				directory_is_empty(srcpath))
-			{
-				/* Assume we can ignore it */
-				pfree(srcpath);
-				continue;
-			}
-
-			if (srctablespace == src_deftablespace)
-				dsttablespace = dst_deftablespace;
-			else
-				dsttablespace = srctablespace;
-
-			dstpath = GetDatabasePath(dboid, dsttablespace);
-
-			/*
-			 * Copy this subdirectory to the new location
-			 *
-			 * We don't need to copy subdirectories
-			 */
-			copydir(srcpath, dstpath, false);
-
-			/* Record the filesystem change in XLOG */
-			{
-				xl_dbase_create_rec xlrec;
-
-				xlrec.db_id = dboid;
-				xlrec.tablespace_id = dsttablespace;
-				xlrec.src_db_id = src_dboid;
-				xlrec.src_tablespace_id = srctablespace;
-
-				XLogBeginInsert();
-				XLogRegisterData((char *) &xlrec, sizeof(xl_dbase_create_rec));
-
-				(void) XLogInsert(RM_DBASE_ID,
-								  XLOG_DBASE_CREATE | XLR_SPECIAL_REL_UPDATE);
-			}
-		}
-		table_endscan(scan);
-		table_close(rel, AccessShareLock);
-
-		/*
-		 * We force a checkpoint before committing.  This effectively means
-		 * that committed XLOG_DBASE_CREATE operations will never need to be
-		 * replayed (at least not in ordinary crash recovery; we still have to
-		 * make the XLOG entry for the benefit of PITR operations). This
-		 * avoids two nasty scenarios:
-		 *
-		 * #1: When PITR is off, we don't XLOG the contents of newly created
-		 * indexes; therefore the drop-and-recreate-whole-directory behavior
-		 * of DBASE_CREATE replay would lose such indexes.
-		 *
-		 * #2: Since we have to recopy the source database during DBASE_CREATE
-		 * replay, we run the risk of copying changes in it that were
-		 * committed after the original CREATE DATABASE command but before the
-		 * system crash that led to the replay.  This is at least unexpected
-		 * and at worst could lead to inconsistencies, eg duplicate table
-		 * names.
-		 *
-		 * (Both of these were real bugs in releases 8.0 through 8.0.3.)
-		 *
-		 * In PITR replay, the first of these isn't an issue, and the second
-		 * is only a risk if the CREATE DATABASE and subsequent template
-		 * database change both occur while a base backup is being taken.
-		 * There doesn't seem to be much we can do about that except document
-		 * it as a limitation.
-		 *
-		 * Perhaps if we ever implement CREATE DATABASE in a less cheesy way,
-		 * we can avoid this.
+		 * If the user has asked to create the database with the WAL_LOG
+		 * strategy then call CreateDatabaseUsingWalLog, which copies the
+		 * database at the block level and WAL-logs each copied block.
+		 * Otherwise, call CreateDatabaseUsingFileCopy, which copies the
+		 * database file by file.
 		 */
-		RequestCheckpoint(CHECKPOINT_IMMEDIATE | CHECKPOINT_FORCE | CHECKPOINT_WAIT);
+		if (dbstrategy == CREATEDB_WAL_LOG)
+			CreateDatabaseUsingWalLog(src_dboid, dboid, src_deftablespace,
+									  dst_deftablespace);
+		else
+			CreateDatabaseUsingFileCopy(src_dboid, dboid, src_deftablespace,
+										dst_deftablespace);
 
 		/*
 		 * Close pg_database, but keep lock till commit.
@@ -870,6 +1330,21 @@ createdb_failure_callback(int code, Datum arg)
 	createdb_failure_params *fparms = (createdb_failure_params *) DatumGetPointer(arg);
 
 	/*
+	 * If we were copying the database at the block level then drop any pages
+	 * for the destination database that are in the shared buffer cache, and
+	 * tell the checkpointer to forget any pending fsync and unlink requests
+	 * for files in that database.  The reasoning is the same as explained in
+	 * the dropdb function.  Unlike dropdb, we don't need to call
+	 * pgstat_drop_database, because the database has not been created yet,
+	 * so there should not be any stats for it.
+	 */
+	if (fparms->strategy == CREATEDB_WAL_LOG)
+	{
+		DropDatabaseBuffers(fparms->dest_dboid);
+		ForgetDatabaseSyncRequests(fparms->dest_dboid);
+	}
+
+	/*
 	 * Release lock on source database before doing recursive remove. This is
 	 * not essential but it seems desirable to release the lock as soon as
 	 * possible.
@@ -1393,7 +1868,7 @@ movedb(const char *dbname, const char *tblspcname)
 		 * Record the filesystem change in XLOG
 		 */
 		{
-			xl_dbase_create_rec xlrec;
+			xl_dbase_create_file_copy_rec xlrec;
 
 			xlrec.db_id = db_id;
 			xlrec.tablespace_id = dst_tblspcoid;
@@ -1401,10 +1876,11 @@ movedb(const char *dbname, const char *tblspcname)
 			xlrec.src_tablespace_id = src_tblspcoid;
 
 			XLogBeginInsert();
-			XLogRegisterData((char *) &xlrec, sizeof(xl_dbase_create_rec));
+			XLogRegisterData((char *) &xlrec,
+							 sizeof(xl_dbase_create_file_copy_rec));
 
 			(void) XLogInsert(RM_DBASE_ID,
-							  XLOG_DBASE_CREATE | XLR_SPECIAL_REL_UPDATE);
+							  XLOG_DBASE_CREATE_FILE_COPY | XLR_SPECIAL_REL_UPDATE);
 		}
 
 		/*
@@ -1440,9 +1916,10 @@ movedb(const char *dbname, const char *tblspcname)
 
 		/*
 		 * Force another checkpoint here.  As in CREATE DATABASE, this is to
-		 * ensure that we don't have to replay a committed XLOG_DBASE_CREATE
-		 * operation, which would cause us to lose any unlogged operations
-		 * done in the new DB tablespace before the next checkpoint.
+		 * ensure that we don't have to replay a committed
+		 * XLOG_DBASE_CREATE_FILE_COPY operation, which would cause us to lose
+		 * any unlogged operations done in the new DB tablespace before the
+		 * next checkpoint.
 		 */
 		RequestCheckpoint(CHECKPOINT_IMMEDIATE | CHECKPOINT_FORCE | CHECKPOINT_WAIT);
 
@@ -2377,9 +2854,10 @@ dbase_redo(XLogReaderState *record)
 	/* Backup blocks are not used in dbase records */
 	Assert(!XLogRecHasAnyBlockRefs(record));
 
-	if (info == XLOG_DBASE_CREATE)
+	if (info == XLOG_DBASE_CREATE_FILE_COPY)
 	{
-		xl_dbase_create_rec *xlrec = (xl_dbase_create_rec *) XLogRecGetData(record);
+		xl_dbase_create_file_copy_rec *xlrec =
+		(xl_dbase_create_file_copy_rec *) XLogRecGetData(record);
 		char	   *src_path;
 		char	   *dst_path;
 		struct stat st;
@@ -2414,6 +2892,18 @@ dbase_redo(XLogReaderState *record)
 		 */
 		copydir(src_path, dst_path, false);
 	}
+	else if (info == XLOG_DBASE_CREATE_WAL_LOG)
+	{
+		xl_dbase_create_wal_log_rec *xlrec =
+		(xl_dbase_create_wal_log_rec *) XLogRecGetData(record);
+		char	   *dbpath;
+
+		dbpath = GetDatabasePath(xlrec->db_id, xlrec->tablespace_id);
+
+		/* Create the database directory with the version file. */
+		CreateDirAndVersionFile(dbpath, xlrec->db_id, xlrec->tablespace_id,
+								true);
+	}
 	else if (info == XLOG_DBASE_DROP)
 	{
 		xl_dbase_drop_rec *xlrec = (xl_dbase_drop_rec *) XLogRecGetData(record);
diff --git a/src/backend/storage/buffer/bufmgr.c b/src/backend/storage/buffer/bufmgr.c
index 3cadcd2..b1cebc4 100644
--- a/src/backend/storage/buffer/bufmgr.c
+++ b/src/backend/storage/buffer/bufmgr.c
@@ -38,6 +38,7 @@
 #include "access/xlogutils.h"
 #include "catalog/catalog.h"
 #include "catalog/storage.h"
+#include "catalog/storage_xlog.h"
 #include "executor/instrument.h"
 #include "lib/binaryheap.h"
 #include "miscadmin.h"
@@ -486,6 +487,9 @@ static void FindAndDropRelFileNodeBuffers(RelFileNode rnode,
 										  ForkNumber forkNum,
 										  BlockNumber nForkBlock,
 										  BlockNumber firstDelBlock);
+static void RelationCopyStorageUsingBuffer(SMgrRelation src, SMgrRelation dst,
+										   ForkNumber forkNum,
+										   bool isunlogged);
 static void AtProcExit_Buffers(int code, Datum arg);
 static void CheckForBufferLeaks(void);
 static int	rnode_comparator(const void *p1, const void *p2);
@@ -3679,6 +3683,153 @@ FlushRelationsAllBuffers(SMgrRelation *smgrs, int nrels)
 }
 
 /* ---------------------------------------------------------------------
+ *		RelationCopyStorageUsingBuffer
+ *
+ *		Copy a fork's data using the buffer manager.  Same as RelationCopyStorage,
+ *		but it copies via bufmgr APIs instead of smgrread and smgrextend.
+ *
+ *		See the comments atop CreateAndCopyRelationData() for details about
+ *		the 'permanent' parameter.
+ * --------------------------------------------------------------------
+ */
+static void
+RelationCopyStorageUsingBuffer(SMgrRelation src, SMgrRelation dst,
+							   ForkNumber forkNum, bool permanent)
+{
+	Buffer		srcBuf;
+	Buffer		dstBuf;
+	Page		srcPage;
+	Page		dstPage;
+	bool		use_wal;
+	BlockNumber nblocks;
+	BlockNumber blkno;
+	BufferAccessStrategy bstrategy_src;
+	BufferAccessStrategy bstrategy_dst;
+
+	/*
+	 * We need to log the copied data in WAL iff WAL archiving/streaming is
+	 * enabled and the relation is persistent, or this is the init fork of an
+	 * unlogged relation.
+	 */
+	use_wal = XLogIsNeeded() && (permanent || forkNum == INIT_FORKNUM);
+
+	/* Get number of blocks in the source relation. */
+	nblocks = smgrnblocks(src, forkNum);
+
+	/* Nothing to copy; just return. */
+	if (nblocks == 0)
+		return;
+
+	/*
+	 * We are going to copy the whole relation from the source to the
+	 * destination, so use the BAS_BULKREAD strategy for the source relation
+	 * and the BAS_BULKWRITE strategy for the destination.
+	 */
+	bstrategy_src = GetAccessStrategy(BAS_BULKREAD);
+	bstrategy_dst = GetAccessStrategy(BAS_BULKWRITE);
+
+	/* Iterate over each block of the source relation file. */
+	for (blkno = 0; blkno < nblocks; blkno++)
+	{
+		/* If we got a cancel signal during the copy of the data, quit */
+		CHECK_FOR_INTERRUPTS();
+
+		/* Read block from source relation. */
+		srcBuf = ReadBufferWithoutRelcache(src->smgr_rnode.node, forkNum,
+										   blkno, RBM_NORMAL, bstrategy_src,
+										   permanent);
+		LockBuffer(srcBuf, BUFFER_LOCK_SHARE);
+		srcPage = BufferGetPage(srcBuf);
+
+		/* Use P_NEW to extend the relation. */
+		dstBuf = ReadBufferWithoutRelcache(dst->smgr_rnode.node, forkNum,
+										   P_NEW, RBM_NORMAL, bstrategy_dst,
+										   permanent);
+		LockBuffer(dstBuf, BUFFER_LOCK_EXCLUSIVE);
+
+		START_CRIT_SECTION();
+
+		/* Copy page data from the source to the destination. */
+		dstPage = BufferGetPage(dstBuf);
+		memcpy(dstPage, srcPage, BLCKSZ);
+		MarkBufferDirty(dstBuf);
+
+		/* WAL-log the copied page. */
+		if (use_wal)
+			log_newpage_buffer(dstBuf, true);
+
+		END_CRIT_SECTION();
+
+		UnlockReleaseBuffer(dstBuf);
+		UnlockReleaseBuffer(srcBuf);
+	}
+}
+
+/* ---------------------------------------------------------------------
+ *		CreateAndCopyRelationData
+ *
+ *		Create destination relation storage and copy all forks' data from the
+ *		source relation to the destination.
+ *
+ *		Currently this API is not supported for temporary relations.  Pass
+ *		permanent as true for a regular relation and false for an unlogged
+ *		relation.
+ * --------------------------------------------------------------------
+ */
+void
+CreateAndCopyRelationData(RelFileNode src_rnode, RelFileNode dst_rnode,
+						  bool permanent)
+{
+	SMgrRelation	src_smgr;
+	SMgrRelation	dst_smgr;
+	char			relpersistence;
+
+	/* Open the source relation at smgr level. */
+	src_smgr = smgropen(src_rnode, InvalidBackendId);
+
+	/* Set the relpersistence. */
+	relpersistence = permanent ?
+		RELPERSISTENCE_PERMANENT : RELPERSISTENCE_UNLOGGED;
+
+	/*
+	 * Create and copy all forks of the relation.
+	 *
+	 * NOTE: any conflict in relfilenode value will be caught in
+	 * RelationCreateStorage().
+	 */
+	dst_smgr = RelationCreateStorage(dst_rnode, relpersistence);
+
+	/* copy main fork */
+	RelationCopyStorageUsingBuffer(src_smgr, dst_smgr, MAIN_FORKNUM,
+								   permanent);
+
+	/* copy those extra forks that exist */
+	for (ForkNumber forkNum = MAIN_FORKNUM + 1;
+		 forkNum <= MAX_FORKNUM; forkNum++)
+	{
+		if (smgrexists(src_smgr, forkNum))
+		{
+			smgrcreate(dst_smgr, forkNum, false);
+
+			/*
+			 * WAL log creation if the relation is persistent, or this is the
+			 * init fork of an unlogged relation.
+			 */
+			if (permanent || forkNum == INIT_FORKNUM)
+				log_smgrcreate(&dst_rnode, forkNum);
+
+			/* Copy a fork's data, block by block. */
+			RelationCopyStorageUsingBuffer(src_smgr, dst_smgr, forkNum,
+										   permanent);
+		}
+	}
+
+	/* Close the smgr rel */
+	smgrclose(src_smgr);
+	smgrclose(dst_smgr);
+}
+
+/* ---------------------------------------------------------------------
  *		FlushDatabaseBuffers
  *
  *		This function writes all dirty pages of a database out to disk
diff --git a/src/bin/pg_rewind/parsexlog.c b/src/bin/pg_rewind/parsexlog.c
index 56df08c..d5cf9ed 100644
--- a/src/bin/pg_rewind/parsexlog.c
+++ b/src/bin/pg_rewind/parsexlog.c
@@ -370,7 +370,7 @@ extractPageInfo(XLogReaderState *record)
 
 	/* Is this a special record type that I recognize? */
 
-	if (rmid == RM_DBASE_ID && rminfo == XLOG_DBASE_CREATE)
+	if (rmid == RM_DBASE_ID && rminfo == XLOG_DBASE_CREATE_FILE_COPY)
 	{
 		/*
 		 * New databases can be safely ignored. It won't be present in the
@@ -382,6 +382,13 @@ extractPageInfo(XLogReaderState *record)
 		 * overwriting the database created in the target system.
 		 */
 	}
+	else if (rmid == RM_DBASE_ID && rminfo == XLOG_DBASE_CREATE_WAL_LOG)
+	{
+		/*
+		 * New databases can be safely ignored. It won't be present in the
+		 * source system, so it will be deleted.
+		 */
+	}
 	else if (rmid == RM_DBASE_ID && rminfo == XLOG_DBASE_DROP)
 	{
 		/*
diff --git a/src/bin/psql/tab-complete.c b/src/bin/psql/tab-complete.c
index 1717282..d0e3755 100644
--- a/src/bin/psql/tab-complete.c
+++ b/src/bin/psql/tab-complete.c
@@ -2738,10 +2738,12 @@ psql_completion(const char *text, int start, int end)
 		COMPLETE_WITH("OWNER", "TEMPLATE", "ENCODING", "TABLESPACE",
 					  "IS_TEMPLATE",
 					  "ALLOW_CONNECTIONS", "CONNECTION LIMIT",
-					  "LC_COLLATE", "LC_CTYPE", "LOCALE", "OID");
+					  "LC_COLLATE", "LC_CTYPE", "LOCALE", "OID", "STRATEGY");
 
 	else if (Matches("CREATE", "DATABASE", MatchAny, "TEMPLATE"))
 		COMPLETE_WITH_QUERY(Query_for_list_of_template_databases);
+	else if (Matches("CREATE", "DATABASE", MatchAny, "STRATEGY"))
+		COMPLETE_WITH("WAL_LOG", "FILE_COPY");
 
 	/* CREATE DOMAIN */
 	else if (Matches("CREATE", "DOMAIN", MatchAny))
diff --git a/src/include/commands/dbcommands_xlog.h b/src/include/commands/dbcommands_xlog.h
index 593a857..077a000 100644
--- a/src/include/commands/dbcommands_xlog.h
+++ b/src/include/commands/dbcommands_xlog.h
@@ -18,17 +18,31 @@
 #include "lib/stringinfo.h"
 
 /* record types */
-#define XLOG_DBASE_CREATE		0x00
-#define XLOG_DBASE_DROP			0x10
+#define XLOG_DBASE_CREATE_FILE_COPY		0x00
+#define XLOG_DBASE_CREATE_WAL_LOG		0x10
+#define XLOG_DBASE_DROP					0x20
 
-typedef struct xl_dbase_create_rec
+/*
+ * Records copying of a single subdirectory incl. contents, while creating a
+ * database using the FILE_COPY strategy.
+ */
+typedef struct xl_dbase_create_file_copy_rec
 {
-	/* Records copying of a single subdirectory incl. contents */
 	Oid			db_id;
 	Oid			tablespace_id;
 	Oid			src_db_id;
 	Oid			src_tablespace_id;
-} xl_dbase_create_rec;
+} xl_dbase_create_file_copy_rec;
+
+/*
+ * Records creating a database directory with the version file, while creating
+ * a database using the WAL_LOG strategy.
+ */
+typedef struct xl_dbase_create_wal_log_rec
+{
+	Oid			db_id;
+	Oid			tablespace_id;
+} xl_dbase_create_wal_log_rec;
 
 typedef struct xl_dbase_drop_rec
 {
diff --git a/src/include/storage/bufmgr.h b/src/include/storage/bufmgr.h
index fd0452f..a6b657f 100644
--- a/src/include/storage/bufmgr.h
+++ b/src/include/storage/bufmgr.h
@@ -204,6 +204,9 @@ extern BlockNumber RelationGetNumberOfBlocksInFork(Relation relation,
 extern void FlushOneBuffer(Buffer buffer);
 extern void FlushRelationBuffers(Relation rel);
 extern void FlushRelationsAllBuffers(struct SMgrRelationData **smgrs, int nrels);
+extern void CreateAndCopyRelationData(RelFileNode src_rnode,
+									  RelFileNode dst_rnode,
+									  bool permanent);
 extern void FlushDatabaseBuffers(Oid dbid);
 extern void DropRelFileNodeBuffers(struct SMgrRelationData *smgr_reln, ForkNumber *forkNum,
 								   int nforks, BlockNumber *firstDelBlock);
diff --git a/src/tools/pgindent/typedefs.list b/src/tools/pgindent/typedefs.list
index eaf3e7a..0f01356 100644
--- a/src/tools/pgindent/typedefs.list
+++ b/src/tools/pgindent/typedefs.list
@@ -460,6 +460,8 @@ CoverPos
 CreateAmStmt
 CreateCastStmt
 CreateConversionStmt
+CreateDBRelInfo
+CreateDBStrategy
 CreateDomainStmt
 CreateEnumStmt
 CreateEventTrigStmt
@@ -3694,7 +3696,8 @@ xl_btree_update
 xl_btree_vacuum
 xl_clog_truncate
 xl_commit_ts_truncate
-xl_dbase_create_rec
+xl_dbase_create_file_copy_rec
+xl_dbase_create_wal_log_rec
 xl_dbase_drop_rec
 xl_end_of_recovery
 xl_hash_add_ovfl_page
-- 
1.8.3.1
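
For anyone trying out the patch above, here is a minimal example of the new
syntax it adds (the database names are purely illustrative and not part of
the patch):

    -- copy the template block by block, WAL-logging each page (the default)
    CREATE DATABASE wallog_db TEMPLATE template1 STRATEGY wal_log;

    -- previous behaviour: file-system level copy plus forced checkpoints
    CREATE DATABASE filecopy_db TEMPLATE template1 STRATEGY file_copy;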

From a3da9f080552f072b14dcc05d4c2d5fe5e0d02ba Mon Sep 17 00:00:00 2001
From: Dilip Kumar <dilipkumar@localhost.localdomain>
Date: Fri, 11 Mar 2022 11:48:55 +0530
Subject: [PATCH v16 6/6] Support create database strategy in createdb tool

---
 doc/src/sgml/ref/createdb.sgml    | 16 ++++++++++++++++
 src/bin/scripts/createdb.c        | 10 +++++++++-
 src/bin/scripts/t/020_createdb.pl | 20 ++++++++++++++++++++
 3 files changed, 45 insertions(+), 1 deletion(-)

diff --git a/doc/src/sgml/ref/createdb.sgml b/doc/src/sgml/ref/createdb.sgml
index 8647345..2a7beca 100644
--- a/doc/src/sgml/ref/createdb.sgml
+++ b/doc/src/sgml/ref/createdb.sgml
@@ -159,6 +159,22 @@ PostgreSQL documentation
      </varlistentry>
 
      <varlistentry>
+      <term><option>-S <replaceable class="parameter">strategy</replaceable></option></term>
+      <term><option>--strategy=<replaceable class="parameter">strategy</replaceable></option></term>
+      <listitem>
+       <para>
+        Specifies the database creation strategy.  Two strategies are
+        currently supported: <literal>WAL_LOG</literal> and
+        <literal>FILE_COPY</literal>.  With <literal>WAL_LOG</literal>, the
+        new database is copied block by block and each block is written to
+        the write-ahead log.  With <literal>FILE_COPY</literal>, the database
+        is copied at the file system level and the individual blocks are not
+        WAL-logged.  The default strategy is <literal>WAL_LOG</literal>.
+       </para>
+      </listitem>
+     </varlistentry>
+
+     <varlistentry>
       <term><option>-T <replaceable class="parameter">template</replaceable></option></term>
       <term><option>--template=<replaceable class="parameter">template</replaceable></option></term>
       <listitem>
diff --git a/src/bin/scripts/createdb.c b/src/bin/scripts/createdb.c
index b0c6805..9d3c4ef 100644
--- a/src/bin/scripts/createdb.c
+++ b/src/bin/scripts/createdb.c
@@ -37,6 +37,7 @@ main(int argc, char *argv[])
 		{"lc-collate", required_argument, NULL, 1},
 		{"lc-ctype", required_argument, NULL, 2},
 		{"locale", required_argument, NULL, 'l'},
+		{"strategy", required_argument, NULL, 'S'},
 		{"maintenance-db", required_argument, NULL, 3},
 		{NULL, 0, NULL, 0}
 	};
@@ -61,6 +62,7 @@ main(int argc, char *argv[])
 	char	   *lc_collate = NULL;
 	char	   *lc_ctype = NULL;
 	char	   *locale = NULL;
+	char	   *strategy = NULL;
 
 	PQExpBufferData sql;
 
@@ -73,7 +75,7 @@ main(int argc, char *argv[])
 
 	handle_help_version_opts(argc, argv, "createdb", help);
 
-	while ((c = getopt_long(argc, argv, "h:p:U:wWeO:D:T:E:l:", long_options, &optindex)) != -1)
+	while ((c = getopt_long(argc, argv, "h:p:U:wWeO:D:T:E:l:S:", long_options, &optindex)) != -1)
 	{
 		switch (c)
 		{
@@ -119,6 +121,9 @@ main(int argc, char *argv[])
 			case 3:
 				maintenance_db = pg_strdup(optarg);
 				break;
+			case 'S':
+				strategy = pg_strdup(optarg);
+				break;
 			default:
 				fprintf(stderr, _("Try \"%s --help\" for more information.\n"), progname);
 				exit(1);
@@ -217,6 +222,8 @@ main(int argc, char *argv[])
 		appendPQExpBufferStr(&sql, " LC_CTYPE ");
 		appendStringLiteralConn(&sql, lc_ctype, conn);
 	}
+	if (strategy)
+		appendPQExpBuffer(&sql, " STRATEGY %s ", fmtId(strategy));
 
 	appendPQExpBufferChar(&sql, ';');
 
@@ -274,6 +281,7 @@ help(const char *progname)
 	printf(_("      --lc-collate=LOCALE      LC_COLLATE setting for the database\n"));
 	printf(_("      --lc-ctype=LOCALE        LC_CTYPE setting for the database\n"));
 	printf(_("  -O, --owner=OWNER            database user to own the new database\n"));
+	printf(_("  -S, --strategy=STRATEGY      database creation strategy (wal_log or file_copy)\n"));
 	printf(_("  -T, --template=TEMPLATE      template database to copy\n"));
 	printf(_("  -V, --version                output version information, then exit\n"));
 	printf(_("  -?, --help                   show this help, then exit\n"));
diff --git a/src/bin/scripts/t/020_createdb.pl b/src/bin/scripts/t/020_createdb.pl
index 6392454..ccfbe17 100644
--- a/src/bin/scripts/t/020_createdb.pl
+++ b/src/bin/scripts/t/020_createdb.pl
@@ -76,4 +76,24 @@ $node->command_checks_all(
 	],
 	'createdb with incorrect --lc-ctype');
 
+$node->command_checks_all(
+	[ 'createdb', '--strategy', "foo", 'foobar2' ],
+	1,
+	[qr/^$/],
+	[
+		qr/^createdb: error: database creation failed: ERROR:  invalid create database strategy/s
+	],
+	'createdb with incorrect --strategy');
+
+# Check database creation strategy
+$node->issues_sql_like(
+	[ 'createdb', '-T', 'foobar2', 'foobar4', '-S', 'wal_log'],
+	qr/statement: CREATE DATABASE foobar4 TEMPLATE foobar2 STRATEGY wal_log/,
+	'create database with WAL_LOG strategy');
+
+$node->issues_sql_like(
+	[ 'createdb', '-T', 'foobar2', 'foobar5', '-S', 'file_copy'],
+	qr/statement: CREATE DATABASE foobar5 TEMPLATE foobar2 STRATEGY file_copy/,
+	'create database with FILE_COPY strategy');
+
 done_testing();
-- 
1.8.3.1
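
And a quick sketch of how the new createdb option is meant to be used (the
database names again are only examples):

    createdb --strategy=wal_log newdb1
    createdb --template=template0 --strategy=file_copy newdb2

These should emit CREATE DATABASE ... STRATEGY wal_log / file_copy, as
exercised by the TAP tests above.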
