On Thu, Mar 10, 2022 at 7:22 PM Ashutosh Sharma <ashu.coe...@gmail.com> wrote:
>
> Here are some review comments on the latest patch
> (v11-0004-WAL-logged-CREATE-DATABASE.patch). I actually did the review
> of the v10 patch but that applies for this latest version as well.
>
> +               /* Now errors are fatal ... */
> +               START_CRIT_SECTION();
>
> Did you mean PANIC instead of FATAL?

I think "fatal" here doesn't really mean the error level FATAL; it just
means critical, and I have seen it used that way in other places too.  But
I don't think we really need this comment, so I have removed it to avoid
any confusion.

> ==
>
> +
> (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
> +                                        errmsg("invalid create
> strategy %s", strategy),
> +                                        errhint("Valid strategies are
> \"wal_log\", and \"file_copy\".")));
> +       }
>
>
> Should this be - "invalid createdb strategy" instead of "invalid
> create strategy"?

Changed
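
For illustration only, here is a minimal standalone sketch of the option
validation that this message belongs to.  SketchStrategy and
sketch_parse_strategy are hypothetical names, not the patch's code (the
patch uses defGetString() and ereport() for this).

```c
#include <assert.h>
#include <stddef.h>
#include <string.h>

/* Hypothetical stand-in for the patch's CreateDBStrategy enum. */
typedef enum SketchStrategy
{
	SKETCH_WAL_LOG = 0,
	SKETCH_FILE_COPY = 1
} SketchStrategy;

/*
 * Map an option string to a strategy.  A NULL name means the option was
 * not given, so the default (WAL_LOG) is kept.  Returns 1 on success and
 * 0 for an unknown name, where the real code would raise an error.
 */
static int
sketch_parse_strategy(const char *name, SketchStrategy *result)
{
	assert(result != NULL);

	*result = SKETCH_WAL_LOG;	/* default strategy */
	if (name == NULL || strcmp(name, "wal_log") == 0)
		return 1;
	if (strcmp(name, "file_copy") == 0)
	{
		*result = SKETCH_FILE_COPY;
		return 1;
	}
	return 0;					/* caller reports the invalid strategy */
}
```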

> ==
>
> +               /*
> +                * In case of ALTER DATABASE SET TABLESPACE we don't need to 
> do
> +                * anything for the object which are not in the source
> db's default
> +                * tablespace.  The source and destination dboid will be same 
> in
> +                * case of ALTER DATABASE SET TABLESPACE.
> +                */
> +               else if (src_dboid == dst_dboid)
> +                       continue;
> +               else
> +                       dstrnode.spcNode = srcrnode.spcNode;
>
>
> Is this change still required? Do we support the WAL_COPY strategy for
> ALTER DATABASE?

Yeah, that code is now unreachable, so I have removed it.

> ==
>
> +               /* Open the source and the destination relation at
> smgr level. */
> +               src_smgr = smgropen(srcrnode, InvalidBackendId);
> +               dst_smgr = smgropen(dstrnode, InvalidBackendId);
> +
> +               /* Copy relation storage from source to the destination. */
> +               CreateAndCopyRelationData(src_smgr, dst_smgr,
> relinfo->relpersistence);
>
> Do we need to do smgropen for destination relfilenode here? Aren't we
> already doing that inside RelationCreateStorage?

Yeah, I have reworked the logic: the smgropen calls for both source and
destination are removed from here and moved inside
CreateAndCopyRelationData.  Please check the updated code.

> ==
>
> +       use_wal = XLogIsNeeded() &&
> +               (relpersistence == RELPERSISTENCE_PERMANENT ||
> copying_initfork);
> +
> +       /* Get number of blocks in the source relation. */
> +       nblocks = smgrnblocks(src, forkNum);
>
> What if number of blocks in a source relation is ZERO? Should we check
> for that and return immediately. We have already done smgrcreate.

Yeah, it makes sense to optimize that; with this we also don't have to get
the buffer strategy, so done.
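
Roughly, the idea is the following.  This is only a standalone sketch
with in-memory arrays; sketch_copy_blocks and SKETCH_BLCKSZ are made-up
names, and the malloc'ed scratch buffer just stands in for the setup
work (such as getting the buffer strategy) that an empty relation lets
us skip.

```c
#include <assert.h>
#include <stdlib.h>
#include <string.h>

#define SKETCH_BLCKSZ 8192		/* block size assumed for the sketch */

/*
 * Copy nblocks blocks from src to dst.  Returns the number of blocks
 * copied, or -1 if the scratch buffer cannot be allocated.  *allocs is
 * incremented each time the scratch buffer is set up.
 */
static int
sketch_copy_blocks(const char *src, char *dst, int nblocks, int *allocs)
{
	char	   *scratch;
	int			blkno;

	assert(allocs != NULL && nblocks >= 0);

	/* Nothing to copy, so return before doing any setup work. */
	if (nblocks == 0)
		return 0;

	scratch = malloc(SKETCH_BLCKSZ);
	if (scratch == NULL)
		return -1;
	(*allocs)++;

	for (blkno = 0; blkno < nblocks; blkno++)
	{
		size_t		off = (size_t) blkno * SKETCH_BLCKSZ;

		memcpy(scratch, src + off, SKETCH_BLCKSZ);
		memcpy(dst + off, scratch, SKETCH_BLCKSZ);
	}

	free(scratch);
	return nblocks;
}
```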

> ==
>
> +       /* We don't need to copy the shared objects to the target. */
> +       if (classForm->reltablespace == GLOBALTABLESPACE_OID)
> +               return NULL;
> +
> +       /*
> +        * If the object doesn't have the storage then nothing to be
> +        * done for that object so just ignore it.
> +        */
> +       if (!RELKIND_HAS_STORAGE(classForm->relkind))
> +               return NULL;
>
> We can probably club together above two if-checks.

Done
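
For reference, the combined check boils down to a single predicate like
the one below.  This is a standalone sketch: the sketch_ names are
hypothetical, and the tablespace OID value and the relkind letters are
written out here as assumptions instead of using the real
GLOBALTABLESPACE_OID and RELKIND_HAS_STORAGE macros.

```c
#include <assert.h>
#include <stdbool.h>

/* Assumed value of the global tablespace OID, for illustration only. */
#define SKETCH_GLOBALTABLESPACE_OID 1664

/* Assumed set of relkinds with storage: table, index, sequence, toast, matview. */
static bool
sketch_relkind_has_storage(char relkind)
{
	return relkind == 'r' || relkind == 'i' || relkind == 'S' ||
		relkind == 't' || relkind == 'm';
}

/* True if the relation is shared or has no storage, so it is not copied. */
static bool
sketch_skip_relation(unsigned int reltablespace, char relkind)
{
	return reltablespace == SKETCH_GLOBALTABLESPACE_OID ||
		!sketch_relkind_has_storage(relkind);
}
```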

> ==
>
> +      <varlistentry>
> +       <term><replaceable class="parameter">strategy</replaceable></term>
> +       <listitem>
> +        <para>
> +         This is used for copying the database directory.  Currently, we have
> +         two strategies the <literal>WAL_LOG</literal> and the
> +         <literal>FILE_COPY</literal>.  If <literal>WAL_LOG</literal> 
> strategy
> +         is used then the database will be copied block by block and it will
> +         also WAL log each copied block.  Otherwise, if <literal>FILE_COPY
>
> I think we need to mention the default strategy in the documentation page.

Done

-- 
Regards,
Dilip Kumar
EnterpriseDB: http://www.enterprisedb.com
From 60bcdf3dc8458481b17fd4f4053b48e69ac9f050 Mon Sep 17 00:00:00 2001
From: Dilip Kumar <dilipkumar@localhost.localdomain>
Date: Mon, 14 Feb 2022 17:48:03 +0530
Subject: [PATCH v12 4/4] WAL logged CREATE DATABASE

Currently, CREATE DATABASE forces a checkpoint, then copies all the files,
then forces another checkpoint. The comments in the createdb() function
explain the reasons for this. This patch avoids those checkpoints by making
CREATE DATABASE completely WAL-logged.

The old way of creating the database is still supported, and an option is
provided to choose the strategy for creating the database.  For the new
method the user needs to specify STRATEGY=WAL_LOG, and for the old method
STRATEGY=FILE_COPY.  The default strategy is WAL_LOG.
---
 doc/src/sgml/ref/create_database.sgml  |  23 +
 src/backend/commands/dbcommands.c      | 756 +++++++++++++++++++++++++++------
 src/backend/storage/buffer/bufmgr.c    | 146 +++++++
 src/include/commands/dbcommands_xlog.h |   8 +
 src/include/storage/bufmgr.h           |   3 +
 src/tools/pgindent/typedefs.list       |   1 +
 6 files changed, 805 insertions(+), 132 deletions(-)

diff --git a/doc/src/sgml/ref/create_database.sgml b/doc/src/sgml/ref/create_database.sgml
index f70d0c7..2f6b069 100644
--- a/doc/src/sgml/ref/create_database.sgml
+++ b/doc/src/sgml/ref/create_database.sgml
@@ -34,6 +34,7 @@ CREATE DATABASE <replaceable class="parameter">name</replaceable>
            [ CONNECTION LIMIT [=] <replaceable class="parameter">connlimit</replaceable> ]
            [ IS_TEMPLATE [=] <replaceable class="parameter">istemplate</replaceable> ]
            [ OID [=] <replaceable class="parameter">oid</replaceable> ] ]
+           [ STRATEGY [=] <replaceable class="parameter">strategy</replaceable> ] ]
 </synopsis>
  </refsynopsisdiv>
 
@@ -240,6 +241,28 @@ CREATE DATABASE <replaceable class="parameter">name</replaceable>
        </listitem>
       </varlistentry>
 
+      <varlistentry>
+       <term><replaceable class="parameter">strategy</replaceable></term>
+       <listitem>
+        <para>
+         The strategy used for copying the database directory.  There are
+         two strategies: <literal>WAL_LOG</literal> and
+         <literal>FILE_COPY</literal>.  With the <literal>WAL_LOG</literal>
+         strategy, the database is copied block by block and each copied
+         block is WAL-logged.  With the <literal>FILE_COPY</literal>
+         strategy, the copy is done at the file system level, so individual
+         blocks are not WAL-logged.  The default strategy is
+         <literal>WAL_LOG</literal>.  The <literal>FILE_COPY</literal>
+         strategy has to issue a checkpoint before and after performing the
+         copy; if shared buffers are large and contain many dirty buffers,
+         these checkpoints can be costly and may impact the performance of
+         the whole system.  On the other hand, because the
+         <literal>WAL_LOG</literal> strategy WAL-logs each block, creating
+         the database may take more time if the source database is large.
+        </para>
+       </listitem>
+      </varlistentry>
+
     </variablelist>
 
   <para>
diff --git a/src/backend/commands/dbcommands.c b/src/backend/commands/dbcommands.c
index c37e3c9..8dd19f0 100644
--- a/src/backend/commands/dbcommands.c
+++ b/src/backend/commands/dbcommands.c
@@ -46,6 +46,7 @@
 #include "commands/dbcommands_xlog.h"
 #include "commands/defrem.h"
 #include "commands/seclabel.h"
+#include "commands/tablecmds.h"
 #include "commands/tablespace.h"
 #include "mb/pg_wchar.h"
 #include "miscadmin.h"
@@ -63,13 +64,27 @@
 #include "utils/builtins.h"
 #include "utils/fmgroids.h"
 #include "utils/pg_locale.h"
+#include "utils/relmapper.h"
 #include "utils/snapmgr.h"
 #include "utils/syscache.h"
 
+/*
+ * Create database strategy.  CREATEDB_WAL_LOG copies the database at the
+ * block level and WAL-logs each copied block, whereas CREATEDB_FILE_COPY
+ * copies the database directly at the file level, and no individual
+ * operations are WAL-logged.
+ */
+typedef enum CreateDBStrategy
+{
+	CREATEDB_WAL_LOG = 0,
+	CREATEDB_FILE_COPY = 1
+} CreateDBStrategy;
+
 typedef struct
 {
 	Oid			src_dboid;		/* source (template) DB */
 	Oid			dest_dboid;		/* DB we are trying to create */
+	CreateDBStrategy	strategy;	/* create db strategy */
 } createdb_failure_params;
 
 typedef struct
@@ -78,6 +93,19 @@ typedef struct
 	Oid			dest_tsoid;		/* tablespace we are trying to move to */
 } movedb_failure_params;
 
+/*
+ * When creating a database, we scan the pg_class of the source database to
+ * identify all the relations to be copied.  The structure is used for storing
+ * information about each relation of the source database.
+ */
+typedef struct CreateDBRelInfo
+{
+	RelFileNode		rnode;				/* physical relation identifier */
+	Oid				reloid;				/* relation oid */
+	char			relpersistence;		/* relation's persistence level */
+} CreateDBRelInfo;
+
+
 /* non-export function prototypes */
 static void createdb_failure_callback(int code, Datum arg);
 static void movedb(const char *dbname, const char *tblspcname);
@@ -92,7 +120,505 @@ static bool have_createdb_privilege(void);
 static void remove_dbtablespaces(Oid db_id);
 static bool check_db_file_conflict(Oid db_id);
 static int	errdetail_busy_db(int notherbackends, int npreparedxacts);
+static void CreateDirAndVersionFile(char *dbpath, Oid dbid, Oid tsid,
+									bool isRedo);
+static CreateDBRelInfo *GetRelInfoFromTuple(HeapTupleData *tuple,
+											Oid tbid, Oid dbid, char *srcpath);
+static List *GetRelListFromPage(Page page, Buffer buf, Oid tbid, Oid dbid,
+								char *srcpath, List *rnodelist, Snapshot
+								snapshot);
+static List *GetDatabaseRelationList(Oid srctbid, Oid srcdbid, char *srcpath);
+static void CopyDatabaseWithWal(Oid src_dboid, Oid dboid, Oid src_tsid,
+								Oid dst_tsid);
+static void CopyDatabase(Oid src_dboid, Oid dboid, Oid src_tsid, Oid dst_tsid);
+
+/*
+ * CreateDirAndVersionFile - Create database directory and write out the
+ *							 PG_VERSION file in the database path.
+ *
+ * If isRedo is true, it's okay for the database directory to exist already.
+ *
+ * We can directly write PG_MAJORVERSION in the version file instead of copying
+ * from the source database file because these two must be the same.
+ */
+static void
+CreateDirAndVersionFile(char *dbpath, Oid dbid, Oid tsid, bool isRedo)
+{
+	int		fd;
+	int		nbytes;
+	char	versionfile[MAXPGPATH];
+	char	buf[16];
+
+	/* Prepare version data before starting a critical section. */
+	sprintf(buf, "%s\n", PG_MAJORVERSION);
+	nbytes = strlen(PG_MAJORVERSION) + 1;
+
+	/* If we are not in WAL replay then write the WAL. */
+	if (!isRedo)
+	{
+		xl_dbase_create_rec xlrec;
+		XLogRecPtr	lsn;
+
+		START_CRIT_SECTION();
+
+		xlrec.db_id = dbid;
+		xlrec.tablespace_id = tsid;
+		xlrec.src_db_id = InvalidOid;
+		xlrec.src_tablespace_id = InvalidOid;
+
+		XLogBeginInsert();
+		XLogRegisterData((char *) (&xlrec), sizeof(xl_dbase_create_rec));
+
+		lsn = XLogInsert(RM_DBASE_ID, XLOG_DBASE_CREATE);
+
+		/* As always, WAL must hit the disk before the data update does. */
+		XLogFlush(lsn);
+	}
+
+	/* Create database directory. */
+	if (MakePGDirectory(dbpath) < 0)
+	{
+		/* Failure other than already exists or not in WAL replay? */
+		if (errno != EEXIST || !isRedo)
+			ereport(ERROR,
+					(errcode_for_file_access(),
+					 errmsg("could not create directory \"%s\": %m", dbpath)));
+	}
+
+	/*
+	 * Create PG_VERSION file in the database path.  If the file already exists
+	 * and we are in WAL replay then try again to open it in write mode.
+	 */
+	snprintf(versionfile, sizeof(versionfile), "%s/%s", dbpath, "PG_VERSION");
+
+	fd = OpenTransientFile(versionfile, O_WRONLY | O_CREAT | O_EXCL | PG_BINARY);
+	if (fd < 0 && errno == EEXIST && isRedo)
+		fd = OpenTransientFile(versionfile, O_WRONLY | O_TRUNC | PG_BINARY);
+
+	if (fd < 0)
+		ereport(ERROR,
+				(errcode_for_file_access(),
+				 errmsg("could not create file \"%s\": %m", versionfile)));
+
+	/* Write PG_MAJORVERSION in the PG_VERSION file. */
+	pgstat_report_wait_start(WAIT_EVENT_COPY_FILE_WRITE);
+	errno = 0;
+	if ((int) write(fd, buf, nbytes) != nbytes)
+	{
+		/* If write didn't set errno, assume problem is no disk space. */
+		if (errno == 0)
+			errno = ENOSPC;
+		ereport(ERROR,
+				(errcode_for_file_access(),
+				 errmsg("could not write to file \"%s\": %m", versionfile)));
+	}
+	pgstat_report_wait_end();
+
+	/* Close the version file. */
+	CloseTransientFile(fd);
+
+	/* Critical section done. */
+	if (!isRedo)
+		END_CRIT_SECTION();
+}
+
+/*
+ * GetRelInfoFromTuple - Prepare a CreateDBRelInfo element from the tuple
+ *
+ * Helper function for GetRelListFromPage to prepare a single element from the
+ * pg_class tuple.
+ */
+static CreateDBRelInfo *
+GetRelInfoFromTuple(HeapTupleData *tuple, Oid tbid, Oid dbid, char *srcpath)
+{
+	CreateDBRelInfo	   *relinfo;
+	Form_pg_class		classForm;
+	Oid					relfilenode = InvalidOid;
+
+	classForm = (Form_pg_class) GETSTRUCT(tuple);
+
+	/*
+	 * If this is a shared object or the object doesn't have the storage then
+	 * nothing to be done, so just return.
+	 */
+	if (classForm->reltablespace == GLOBALTABLESPACE_OID ||
+		!RELKIND_HAS_STORAGE(classForm->relkind))
+		return NULL;
+
+	/*
+	 * If relfilenode is valid then directly use it.  Otherwise,
+	 * consult the relmapper for the mapped relation.
+	 */
+	if (OidIsValid(classForm->relfilenode))
+		relfilenode = classForm->relfilenode;
+	else
+		relfilenode = RelationMapOidToFilenodeForDatabase(srcpath,
+										classForm->oid);
+
+	/* We must have a valid relfilenode oid. */
+	Assert(OidIsValid(relfilenode));
+
+	/* Prepare a rel info element; the caller adds it to the list. */
+	relinfo = (CreateDBRelInfo *) palloc(sizeof(CreateDBRelInfo));
+	if (OidIsValid(classForm->reltablespace))
+		relinfo->rnode.spcNode = classForm->reltablespace;
+	else
+		relinfo->rnode.spcNode = tbid;
+
+	relinfo->rnode.dbNode = dbid;
+	relinfo->rnode.relNode = relfilenode;
+	relinfo->reloid = classForm->oid;
+	relinfo->relpersistence = classForm->relpersistence;
+
+	return relinfo;
+}
+
+/*
+ * GetRelListFromPage - Helper function for GetDatabaseRelationList.
+ *
+ * Iterate over each tuple of input pg_class and get a list of all the valid
+ * relfilenodes of the given block and append them to input rnodelist.
+ */
+static List *
+GetRelListFromPage(Page page, Buffer buf, Oid tbid, Oid dbid, char *srcpath,
+				  List *rnodelist, Snapshot snapshot)
+{
+	BlockNumber		blkno = BufferGetBlockNumber(buf);
+	OffsetNumber	offnum;
+	OffsetNumber	maxoff;
+	HeapTupleData	tuple;
+
+	maxoff = PageGetMaxOffsetNumber(page);
+
+	/* Iterate over each tuple on the page. */
+	for (offnum = FirstOffsetNumber;
+		 offnum <= maxoff;
+		 offnum = OffsetNumberNext(offnum))
+	{
+		ItemId		itemid;
+
+		itemid = PageGetItemId(page, offnum);
+
+		/* Nothing to do if slot is empty or already dead. */
+		if (!ItemIdIsUsed(itemid) || ItemIdIsDead(itemid) ||
+			ItemIdIsRedirected(itemid))
+			continue;
+
+		Assert(ItemIdIsNormal(itemid));
+		ItemPointerSet(&(tuple.t_self), blkno, offnum);
+
+		tuple.t_data = (HeapTupleHeader) PageGetItem(page, itemid);
+		tuple.t_len = ItemIdGetLength(itemid);
+		tuple.t_tableOid = RelationRelationId;
 
+		/*
+		 * If the tuple is visible then add its relfilenode info to the
+		 * list.
+		 */
+		if (HeapTupleSatisfiesVisibility(&tuple, snapshot, buf))
+		{
+			CreateDBRelInfo	   *relinfo;
+
+			relinfo = GetRelInfoFromTuple(&tuple, tbid, dbid, srcpath);
+
+			/* Add it to the list. */
+			if (relinfo != NULL)
+				rnodelist = lappend(rnodelist, relinfo);
+		}
+	}
+
+	return rnodelist;
+}
+
+/*
+ * GetDatabaseRelationList - Get relfilenode list to be copied.
+ *
+ * Iterate over each block of the pg_class relation.  From there, we will check
+ * all the visible tuples in order to get a list of all the valid relfilenodes
+ * in the source database that should be copied to the target database.
+ */
+static List *
+GetDatabaseRelationList(Oid tbid, Oid dbid, char *srcpath)
+{
+	SMgrRelation	rd_smgr;
+	RelFileNode		rnode;
+	BlockNumber		nblocks;
+	BlockNumber		blkno;
+	Buffer			buf;
+	Oid				relfilenode;
+	Page			page;
+	List		   *rnodelist = NIL;
+	LockRelId		relid;
+	Snapshot		snapshot;
+	BufferAccessStrategy bstrategy;
+
+	/* Get pg_class relfilenode. */
+	relfilenode = RelationMapOidToFilenodeForDatabase(srcpath,
+													  RelationRelationId);
+	/*
+	 * We are going to read the buffers associated with the pg_class relation.
+	 * Thus, acquire the relation-level lock before starting the scan.  As we
+	 * are not connected to the database, we cannot use relation_open directly,
+	 * so we have to lock using the relation id.
+	 */
+	relid.dbId = dbid;
+	relid.relId = RelationRelationId;
+	LockRelationId(&relid, AccessShareLock);
+
+	/* Prepare a relnode for pg_class relation. */
+	rnode.spcNode = tbid;
+	rnode.dbNode = dbid;
+	rnode.relNode = relfilenode;
+
+	/*
+	 * We are not connected to the source database so open the pg_class
+	 * relation at the smgr level and get the block count.
+	 */
+	rd_smgr = smgropen(rnode, InvalidBackendId);
+	nblocks = smgrnblocks(rd_smgr, MAIN_FORKNUM);
+
+	/*
+	 * We're going to read the whole pg_class so better to use bulk-read buffer
+	 * access strategy.
+	 */
+	bstrategy = GetAccessStrategy(BAS_BULKREAD);
+
+	/* Get latest snapshot for scanning the pg_class. */
+	snapshot = GetLatestSnapshot();
+
+	/* Iterate over each block on the pg_class relation. */
+	for (blkno = 0; blkno < nblocks; blkno++)
+	{
+		/*
+		 * We are not connected to the source database so directly use the lower
+		 * level bufmgr interface which operates on the rnode.
+		 */
+		buf = ReadBufferWithoutRelcache(rnode, MAIN_FORKNUM, blkno,
+										RBM_NORMAL, bstrategy,
+										RELPERSISTENCE_PERMANENT);
+
+		LockBuffer(buf, BUFFER_LOCK_SHARE);
+		page = BufferGetPage(buf);
+		if (PageIsNew(page) || PageIsEmpty(page))
+		{
+			UnlockReleaseBuffer(buf);
+			continue;
+		}
+
+		/*
+		 * Process pg_class tuple for the current page and add all the valid
+		 * relfilenode entries to the rnodelist.
+		 */
+		rnodelist = GetRelListFromPage(page, buf, tbid, dbid, srcpath,
+									   rnodelist, snapshot);
+
+		/* Release the buffer lock. */
+		UnlockReleaseBuffer(buf);
+	}
+
+	/* Release the lock. */
+	UnlockRelationId(&relid, AccessShareLock);
+
+	return rnodelist;
+}
+
+/*
+ * CopyDatabaseWithWal - Copy source database to the target database with WAL
+ *
+ * Create target database directory and copy data files from the source
+ * database to the target database, block by block and WAL log all the
+ * operations.
+ */
+static void
+CopyDatabaseWithWal(Oid src_dboid, Oid dst_dboid, Oid src_tsid, Oid dst_tsid)
+{
+	char	   *srcpath;
+	char	   *dstpath;
+	List	   *rnodelist = NIL;
+	ListCell   *cell;
+	LockRelId	relid;
+	RelFileNode	srcrnode;
+	RelFileNode	dstrnode;
+	CreateDBRelInfo	*relinfo;
+
+	/* Get the source database path. */
+	srcpath = GetDatabasePath(src_dboid, src_tsid);
+
+	/* Get the destination database path. */
+	dstpath = GetDatabasePath(dst_dboid, dst_tsid);
+
+	/* Create database directory and write PG_VERSION file. */
+	CreateDirAndVersionFile(dstpath, dst_dboid, dst_tsid, false);
+
+	/* Copy relmap file from source database to the destination database. */
+	CopyRelationMap(dst_dboid, dst_tsid, srcpath, dstpath);
+
+	/* Get list of all valid relnode from the source database. */
+	rnodelist = GetDatabaseRelationList(src_tsid, src_dboid, srcpath);
+	Assert(rnodelist != NIL);
+
+	/*
+	 * The database id is common to all the relations, so set it before
+	 * entering the loop.
+	 */
+	relid.dbId = src_dboid;
+
+	/*
+	 * Iterate over each relfilenode and copy the relation data block by block
+	 * from source database to the destination database.
+	 */
+	foreach(cell, rnodelist)
+	{
+		relinfo = lfirst(cell);
+		srcrnode = relinfo->rnode;
+
+		/*
+		 * If the relation is in the source db's default tablespace then we
+		 * need to create it in the destination db's default tablespace.
+		 * Otherwise, we need to create it in the same tablespace as in the
+		 * source database.
+		 */
+		if (srcrnode.spcNode == src_tsid)
+			dstrnode.spcNode = dst_tsid;
+		else
+			dstrnode.spcNode = srcrnode.spcNode;
+
+		dstrnode.dbNode = dst_dboid;
+		dstrnode.relNode = srcrnode.relNode;
+
+		/* Acquire the lock on the relation before starting the copy. */
+		relid.relId = relinfo->reloid;
+		LockRelationId(&relid, AccessShareLock);
+
+		/* Copy relation storage from source to the destination. */
+		CreateAndCopyRelationData(srcrnode, dstrnode, relinfo->relpersistence);
+
+		/* Release the lock. */
+		UnlockRelationId(&relid, AccessShareLock);
+	}
+
+	list_free_deep(rnodelist);
+}
+
+/*
+ * CopyDatabase - Copy source database to the target database
+ *
+ * Copy source database directory to the destination directory using copydir
+ * operation.
+ */
+static void
+CopyDatabase(Oid src_dboid, Oid dst_dboid, Oid src_tsid, Oid dst_tsid)
+{
+	TableScanDesc	scan;
+	Relation		rel;
+	HeapTuple		tuple;
+
+	/*
+	 * Force a checkpoint before starting the copy. This will force all
+	 * dirty buffers, including those of unlogged tables, out to disk, to
+	 * ensure source database is up-to-date on disk for the copy.
+	 * FlushDatabaseBuffers() would suffice for that, but we also want to
+	 * process any pending unlink requests. Otherwise, if a checkpoint
+	 * happened while we're copying files, a file might be deleted just
+	 * when we're about to copy it, causing the lstat() call in copydir()
+	 * to fail with ENOENT.
+	 */
+	RequestCheckpoint(CHECKPOINT_IMMEDIATE | CHECKPOINT_FORCE |
+					  CHECKPOINT_WAIT | CHECKPOINT_FLUSH_ALL);
+
+	/*
+	 * Iterate through all tablespaces of the template database, and copy
+	 * each one to the new database.
+	 */
+	rel = table_open(TableSpaceRelationId, AccessShareLock);
+	scan = table_beginscan_catalog(rel, 0, NULL);
+	while ((tuple = heap_getnext(scan, ForwardScanDirection)) != NULL)
+	{
+		Form_pg_tablespace spaceform = (Form_pg_tablespace) GETSTRUCT(tuple);
+		Oid			srctablespace = spaceform->oid;
+		Oid			dsttablespace;
+		char	   *srcpath;
+		char	   *dstpath;
+		struct stat st;
+
+		/* No need to copy global tablespace */
+		if (srctablespace == GLOBALTABLESPACE_OID)
+			continue;
+
+		srcpath = GetDatabasePath(src_dboid, srctablespace);
+
+		if (stat(srcpath, &st) < 0 || !S_ISDIR(st.st_mode) ||
+			directory_is_empty(srcpath))
+		{
+			/* Assume we can ignore it */
+			pfree(srcpath);
+			continue;
+		}
+
+		if (srctablespace == src_tsid)
+			dsttablespace = dst_tsid;
+		else
+			dsttablespace = srctablespace;
+
+		dstpath = GetDatabasePath(dst_dboid, dsttablespace);
+
+		/*
+		 * Copy this subdirectory to the new location
+		 *
+		 * We don't need to copy subdirectories
+		 */
+		copydir(srcpath, dstpath, false);
+
+		/* Record the filesystem change in XLOG */
+		{
+			xl_dbase_create_rec xlrec;
+
+			xlrec.db_id = dst_dboid;
+			xlrec.tablespace_id = dsttablespace;
+			xlrec.src_db_id = src_dboid;
+			xlrec.src_tablespace_id = srctablespace;
+
+			XLogBeginInsert();
+			XLogRegisterData((char *) &xlrec, sizeof(xl_dbase_create_rec));
+
+			(void) XLogInsert(RM_DBASE_ID,
+							  XLOG_DBASE_CREATE | XLR_SPECIAL_REL_UPDATE);
+		}
+	}
+	table_endscan(scan);
+	table_close(rel, AccessShareLock);
+
+	/*
+	 * We force a checkpoint before committing.  This effectively means
+	 * that committed XLOG_DBASE_CREATE operations will never need to be
+	 * replayed (at least not in ordinary crash recovery; we still have to
+	 * make the XLOG entry for the benefit of PITR operations). This
+	 * avoids two nasty scenarios:
+	 *
+	 * #1: When PITR is off, we don't XLOG the contents of newly created
+	 * indexes; therefore the drop-and-recreate-whole-directory behavior
+	 * of DBASE_CREATE replay would lose such indexes.
+	 *
+	 * #2: Since we have to recopy the source database during DBASE_CREATE
+	 * replay, we run the risk of copying changes in it that were
+	 * committed after the original CREATE DATABASE command but before the
+	 * system crash that led to the replay.  This is at least unexpected
+	 * and at worst could lead to inconsistencies, eg duplicate table
+	 * names.
+	 *
+	 * (Both of these were real bugs in releases 8.0 through 8.0.3.)
+	 *
+	 * In PITR replay, the first of these isn't an issue, and the second
+	 * is only a risk if the CREATE DATABASE and subsequent template
+	 * database change both occur while a base backup is being taken.
+	 * There doesn't seem to be much we can do about that except document
+	 * it as a limitation.
+	 *
+	 * Perhaps if we ever implement CREATE DATABASE in a less cheesy way,
+	 * we can avoid this.
+	 */
+	RequestCheckpoint(CHECKPOINT_IMMEDIATE | CHECKPOINT_FORCE | CHECKPOINT_WAIT);
+}
 
 /*
  * CREATE DATABASE
@@ -100,8 +626,6 @@ static int	errdetail_busy_db(int notherbackends, int npreparedxacts);
 Oid
 createdb(ParseState *pstate, const CreatedbStmt *stmt)
 {
-	TableScanDesc scan;
-	Relation	rel;
 	Oid			src_dboid;
 	Oid			src_owner;
 	int			src_encoding = -1;
@@ -132,6 +656,7 @@ createdb(ParseState *pstate, const CreatedbStmt *stmt)
 	DefElem    *dallowconnections = NULL;
 	DefElem    *dconnlimit = NULL;
 	DefElem    *dcollversion = NULL;
+	DefElem    *dstrategy = NULL;
 	char	   *dbname = stmt->dbname;
 	char	   *dbowner = NULL;
 	const char *dbtemplate = NULL;
@@ -145,6 +670,7 @@ createdb(ParseState *pstate, const CreatedbStmt *stmt)
 	char	   *dbcollversion = NULL;
 	int			notherbackends;
 	int			npreparedxacts;
+	CreateDBStrategy	dbstrategy = CREATEDB_WAL_LOG;
 	createdb_failure_params fparms;
 
 	/* Extract options from the statement node tree */
@@ -250,6 +776,12 @@ createdb(ParseState *pstate, const CreatedbStmt *stmt)
 						(errcode(ERRCODE_INVALID_PARAMETER_VALUE)),
 						errmsg("OIDs less than %u are reserved for system objects", FirstNormalObjectId));
 		}
+		else if (strcmp(defel->defname, "strategy") == 0)
+		{
+			if (dstrategy)
+				errorConflictingDefElem(defel, pstate);
+			dstrategy = defel;
+		}
 		else
 			ereport(ERROR,
 					(errcode(ERRCODE_SYNTAX_ERROR),
@@ -374,6 +906,23 @@ createdb(ParseState *pstate, const CreatedbStmt *stmt)
 							dbtemplate)));
 	}
 
+	/* Validate the database creation strategy. */
+	if (dstrategy && dstrategy->arg)
+	{
+		char	*strategy;
+
+		strategy = defGetString(dstrategy);
+		if (strcmp(strategy, "wal_log") == 0)
+			dbstrategy = CREATEDB_WAL_LOG;
+		else if (strcmp(strategy, "file_copy") == 0)
+			dbstrategy = CREATEDB_FILE_COPY;
+		else
+			ereport(ERROR,
+					(errcode(ERRCODE_INVALID_PARAMETER_VALUE),
+					 errmsg("invalid create database strategy %s", strategy),
+					 errhint("Valid strategies are \"wal_log\" and \"file_copy\".")));
+	}
+
 	/* If encoding or locales are defaulted, use source's setting */
 	if (encoding < 0)
 		encoding = src_encoding;
@@ -668,19 +1217,6 @@ createdb(ParseState *pstate, const CreatedbStmt *stmt)
 	InvokeObjectPostCreateHook(DatabaseRelationId, dboid, 0);
 
 	/*
-	 * Force a checkpoint before starting the copy. This will force all dirty
-	 * buffers, including those of unlogged tables, out to disk, to ensure
-	 * source database is up-to-date on disk for the copy.
-	 * FlushDatabaseBuffers() would suffice for that, but we also want to
-	 * process any pending unlink requests. Otherwise, if a checkpoint
-	 * happened while we're copying files, a file might be deleted just when
-	 * we're about to copy it, causing the lstat() call in copydir() to fail
-	 * with ENOENT.
-	 */
-	RequestCheckpoint(CHECKPOINT_IMMEDIATE | CHECKPOINT_FORCE | CHECKPOINT_WAIT
-					  | CHECKPOINT_FLUSH_ALL);
-
-	/*
 	 * Once we start copying subdirectories, we need to be able to clean 'em
 	 * up if we fail.  Use an ENSURE block to make sure this happens.  (This
 	 * is not a 100% solution, because of the possibility of failure during
@@ -689,114 +1225,47 @@ createdb(ParseState *pstate, const CreatedbStmt *stmt)
 	 */
 	fparms.src_dboid = src_dboid;
 	fparms.dest_dboid = dboid;
+	fparms.strategy = dbstrategy;
+
 	PG_ENSURE_ERROR_CLEANUP(createdb_failure_callback,
 							PointerGetDatum(&fparms));
 	{
 		/*
-		 * Iterate through all tablespaces of the template database, and copy
-		 * each one to the new database.
+		 * If the user has asked to create a database with WAL_LOG strategy
+		 * then call CopyDatabaseWithWal, which will copy the database at the
+		 * block level and it will WAL log each copied block.  Otherwise,
+		 * call CopyDatabase that will copy the database file by file.
 		 */
-		rel = table_open(TableSpaceRelationId, AccessShareLock);
-		scan = table_beginscan_catalog(rel, 0, NULL);
-		while ((tuple = heap_getnext(scan, ForwardScanDirection)) != NULL)
+		if (dbstrategy == CREATEDB_WAL_LOG)
 		{
-			Form_pg_tablespace spaceform = (Form_pg_tablespace) GETSTRUCT(tuple);
-			Oid			srctablespace = spaceform->oid;
-			Oid			dsttablespace;
-			char	   *srcpath;
-			char	   *dstpath;
-			struct stat st;
-
-			/* No need to copy global tablespace */
-			if (srctablespace == GLOBALTABLESPACE_OID)
-				continue;
-
-			srcpath = GetDatabasePath(src_dboid, srctablespace);
-
-			if (stat(srcpath, &st) < 0 || !S_ISDIR(st.st_mode) ||
-				directory_is_empty(srcpath))
-			{
-				/* Assume we can ignore it */
-				pfree(srcpath);
-				continue;
-			}
-
-			if (srctablespace == src_deftablespace)
-				dsttablespace = dst_deftablespace;
-			else
-				dsttablespace = srctablespace;
-
-			dstpath = GetDatabasePath(dboid, dsttablespace);
+			CopyDatabaseWithWal(src_dboid, dboid, src_deftablespace,
+								dst_deftablespace);
 
 			/*
-			 * Copy this subdirectory to the new location
-			 *
-			 * We don't need to copy subdirectories
+			 * Close pg_database, but keep lock till commit.
 			 */
-			copydir(srcpath, dstpath, false);
-
-			/* Record the filesystem change in XLOG */
-			{
-				xl_dbase_create_rec xlrec;
-
-				xlrec.db_id = dboid;
-				xlrec.tablespace_id = dsttablespace;
-				xlrec.src_db_id = src_dboid;
-				xlrec.src_tablespace_id = srctablespace;
-
-				XLogBeginInsert();
-				XLogRegisterData((char *) &xlrec, sizeof(xl_dbase_create_rec));
-
-				(void) XLogInsert(RM_DBASE_ID,
-								  XLOG_DBASE_CREATE | XLR_SPECIAL_REL_UPDATE);
-			}
+			table_close(pg_database_rel, NoLock);
 		}
-		table_endscan(scan);
-		table_close(rel, AccessShareLock);
+		else
+		{
+			Assert(dbstrategy == CREATEDB_FILE_COPY);
 
-		/*
-		 * We force a checkpoint before committing.  This effectively means
-		 * that committed XLOG_DBASE_CREATE operations will never need to be
-		 * replayed (at least not in ordinary crash recovery; we still have to
-		 * make the XLOG entry for the benefit of PITR operations). This
-		 * avoids two nasty scenarios:
-		 *
-		 * #1: When PITR is off, we don't XLOG the contents of newly created
-		 * indexes; therefore the drop-and-recreate-whole-directory behavior
-		 * of DBASE_CREATE replay would lose such indexes.
-		 *
-		 * #2: Since we have to recopy the source database during DBASE_CREATE
-		 * replay, we run the risk of copying changes in it that were
-		 * committed after the original CREATE DATABASE command but before the
-		 * system crash that led to the replay.  This is at least unexpected
-		 * and at worst could lead to inconsistencies, eg duplicate table
-		 * names.
-		 *
-		 * (Both of these were real bugs in releases 8.0 through 8.0.3.)
-		 *
-		 * In PITR replay, the first of these isn't an issue, and the second
-		 * is only a risk if the CREATE DATABASE and subsequent template
-		 * database change both occur while a base backup is being taken.
-		 * There doesn't seem to be much we can do about that except document
-		 * it as a limitation.
-		 *
-		 * Perhaps if we ever implement CREATE DATABASE in a less cheesy way,
-		 * we can avoid this.
-		 */
-		RequestCheckpoint(CHECKPOINT_IMMEDIATE | CHECKPOINT_FORCE | CHECKPOINT_WAIT);
+			CopyDatabase(src_dboid, dboid, src_deftablespace,
+						 dst_deftablespace);
 
-		/*
-		 * Close pg_database, but keep lock till commit.
-		 */
-		table_close(pg_database_rel, NoLock);
+			/*
+			 * Close pg_database, but keep lock till commit.
+			 */
+			table_close(pg_database_rel, NoLock);
 
-		/*
-		 * Force synchronous commit, thus minimizing the window between
-		 * creation of the database files and committal of the transaction. If
-		 * we crash before committing, we'll have a DB that's taking up disk
-		 * space but is not in pg_database, which is not good.
-		 */
-		ForceSyncCommit();
+			/*
+			 * Force synchronous commit, thus minimizing the window between
+			 * creation of the database files and committal of the transaction.
+			 * If we crash before committing, we'll have a DB that's taking up
+			 * disk space but is not in pg_database, which is not good.
+			 */
+			ForceSyncCommit();
+		}
 	}
 	PG_END_ENSURE_ERROR_CLEANUP(createdb_failure_callback,
 								PointerGetDatum(&fparms));
@@ -870,6 +1339,21 @@ createdb_failure_callback(int code, Datum arg)
 	createdb_failure_params *fparms = (createdb_failure_params *) DatumGetPointer(arg);
 
 	/*
+	 * If we were copying the database at the block level, drop any pages of
+	 * the destination database that are in the shared buffer cache, and tell
+	 * the checkpointer to forget any pending fsync and unlink requests for
+	 * files in that database.  The reasoning is the same as explained in
+	 * dropdb().  Unlike dropdb(), we don't need to call
+	 * pgstat_drop_database(), because the database has not been created yet
+	 * and so there should not be any stats for it.
+	 */
+	if (fparms->strategy == CREATEDB_WAL_LOG)
+	{
+		DropDatabaseBuffers(fparms->dest_dboid);
+		ForgetDatabaseSyncRequests(fparms->dest_dboid);
+	}
+
+	/*
 	 * Release lock on source database before doing recursive remove. This is
 	 * not essential but it seems desirable to release the lock as soon as
 	 * possible.
@@ -2387,32 +2871,40 @@ dbase_redo(XLogReaderState *record)
 		src_path = GetDatabasePath(xlrec->src_db_id, xlrec->src_tablespace_id);
 		dst_path = GetDatabasePath(xlrec->db_id, xlrec->tablespace_id);
 
-		/*
-		 * Our theory for replaying a CREATE is to forcibly drop the target
-		 * subdirectory if present, then re-copy the source data. This may be
-		 * more work than needed, but it is simple to implement.
-		 */
-		if (stat(dst_path, &st) == 0 && S_ISDIR(st.st_mode))
+		if (!OidIsValid(xlrec->src_db_id))
 		{
-			if (!rmtree(dst_path, true))
-				/* If this failed, copydir() below is going to error. */
-				ereport(WARNING,
-						(errmsg("some useless files may be left behind in old database directory \"%s\"",
-								dst_path)));
+			CreateDirAndVersionFile(dst_path, xlrec->db_id, xlrec->tablespace_id,
+									true);
 		}
+		else
+		{
+			/*
+			 * Our theory for replaying a CREATE is to forcibly drop the target
+			 * subdirectory if present, then re-copy the source data. This may be
+			 * more work than needed, but it is simple to implement.
+			 */
+			if (stat(dst_path, &st) == 0 && S_ISDIR(st.st_mode))
+			{
+				if (!rmtree(dst_path, true))
+					/* If this failed, copydir() below is going to error. */
+					ereport(WARNING,
+							(errmsg("some useless files may be left behind in old database directory \"%s\"",
+									dst_path)));
+			}
 
-		/*
-		 * Force dirty buffers out to disk, to ensure source database is
-		 * up-to-date for the copy.
-		 */
-		FlushDatabaseBuffers(xlrec->src_db_id);
+			/*
+			 * Force dirty buffers out to disk, to ensure source database is
+			 * up-to-date for the copy.
+			 */
+			FlushDatabaseBuffers(xlrec->src_db_id);
 
-		/*
-		 * Copy this subdirectory to the new location
-		 *
-		 * We don't need to copy subdirectories
-		 */
-		copydir(src_path, dst_path, false);
+			/*
+			 * Copy this subdirectory to the new location
+			 *
+			 * We don't need to copy subdirectories
+			 */
+			copydir(src_path, dst_path, false);
+		}
 	}
 	else if (info == XLOG_DBASE_DROP)
 	{
diff --git a/src/backend/storage/buffer/bufmgr.c b/src/backend/storage/buffer/bufmgr.c
index 0ed2d31..156806d 100644
--- a/src/backend/storage/buffer/bufmgr.c
+++ b/src/backend/storage/buffer/bufmgr.c
@@ -38,6 +38,7 @@
 #include "access/xlogutils.h"
 #include "catalog/catalog.h"
 #include "catalog/storage.h"
+#include "catalog/storage_xlog.h"
 #include "executor/instrument.h"
 #include "lib/binaryheap.h"
 #include "miscadmin.h"
@@ -486,6 +487,9 @@ static void FindAndDropRelFileNodeBuffers(RelFileNode rnode,
 										  ForkNumber forkNum,
 										  BlockNumber nForkBlock,
 										  BlockNumber firstDelBlock);
+static void RelationCopyStorageUsingBuffer(SMgrRelation src, SMgrRelation dst,
+										   ForkNumber forkNum,
+										   char relpersistence);
 static void AtProcExit_Buffers(int code, Datum arg);
 static void CheckForBufferLeaks(void);
 static int	rnode_comparator(const void *p1, const void *p2);
@@ -3670,6 +3674,148 @@ FlushRelationsAllBuffers(SMgrRelation *smgrs, int nrels)
 }
 
 /* ---------------------------------------------------------------------
+ *		RelationCopyStorageUsingBuffer
+ *
+ *		Copy fork's data using bufmgr.  Same as RelationCopyStorage but instead
+ *		of using smgrread and smgrextend this will copy using bufmgr APIs.
+ * --------------------------------------------------------------------
+ */
+static void
+RelationCopyStorageUsingBuffer(SMgrRelation src, SMgrRelation dst,
+							   ForkNumber forkNum, char relpersistence)
+{
+	Buffer		srcBuf;
+	Buffer		dstBuf;
+	Page		srcPage;
+	Page		dstPage;
+	bool		use_wal;
+	bool		copying_initfork;
+	BlockNumber nblocks;
+	BlockNumber blkno;
+	BufferAccessStrategy bstrategy_src;
+	BufferAccessStrategy bstrategy_dst;
+
+	/* See the comments in RelationCopyStorage. */
+	copying_initfork = relpersistence == RELPERSISTENCE_UNLOGGED &&
+		forkNum == INIT_FORKNUM;
+	use_wal = XLogIsNeeded() &&
+		(relpersistence == RELPERSISTENCE_PERMANENT || copying_initfork);
+
+	/* Get number of blocks in the source relation. */
+	nblocks = smgrnblocks(src, forkNum);
+
+	/* Nothing to copy, so exit early. */
+	if (nblocks == 0)
+		return;
+
+	/*
+	 * We are going to copy the whole relation from the source to the
+	 * destination, so use the BAS_BULKREAD strategy for the source relation
+	 * and the BAS_BULKWRITE strategy for the destination relation.
+	 */
+	bstrategy_src = GetAccessStrategy(BAS_BULKREAD);
+	bstrategy_dst = GetAccessStrategy(BAS_BULKWRITE);
+
+	/* Iterate over each block of the source relation file. */
+	for (blkno = 0; blkno < nblocks; blkno++)
+	{
+		/* If we got a cancel signal during the copy of the data, quit */
+		CHECK_FOR_INTERRUPTS();
+
+		/* Read block from source relation. */
+		srcBuf = ReadBufferWithoutRelcache(src->smgr_rnode.node, forkNum,
+										   blkno, RBM_NORMAL, bstrategy_src,
+										   relpersistence);
+		srcPage = BufferGetPage(srcBuf);
+		if (PageIsNew(srcPage) || PageIsEmpty(srcPage))
+		{
+			ReleaseBuffer(srcBuf);
+			continue;
+		}
+
+		/* Use P_NEW to extend the relation. */
+		dstBuf = ReadBufferWithoutRelcache(dst->smgr_rnode.node, forkNum,
+										   P_NEW, RBM_NORMAL, bstrategy_dst,
+										   relpersistence);
+		LockBuffer(dstBuf, BUFFER_LOCK_EXCLUSIVE);
+
+		START_CRIT_SECTION();
+
+		/* Initialize the page and write the data. */
+		dstPage = BufferGetPage(dstBuf);
+		PageInit(dstPage, BufferGetPageSize(dstBuf), 0);
+		memcpy(dstPage, srcPage, BLCKSZ);
+		MarkBufferDirty(dstBuf);
+
+		/* WAL-log the copied page. */
+		if (use_wal)
+			log_newpage_buffer(dstBuf, true);
+
+		END_CRIT_SECTION();
+
+		UnlockReleaseBuffer(dstBuf);
+		ReleaseBuffer(srcBuf);
+	}
+}
+
+/* ---------------------------------------------------------------------
+ *		CreateAndCopyRelationData
+ *
+ *		Create the destination relation's storage and copy the data of all
+ *		forks of the source relation to the destination.
+ * --------------------------------------------------------------------
+ */
+void
+CreateAndCopyRelationData(RelFileNode src_rnode, RelFileNode dst_rnode,
+						  char relpersistence)
+{
+	SMgrRelation	src_smgr;
+	SMgrRelation	dst_smgr;
+
+	/* Open the source relation at smgr level. */
+	src_smgr = smgropen(src_rnode, InvalidBackendId);
+
+	/*
+	 * Create and copy all forks of the relation.
+	 *
+	 * NOTE: any conflict in relfilenode value will be caught in
+	 * RelationCreateStorage().
+	 */
+	dst_smgr = RelationCreateStorage(dst_rnode, relpersistence);
+
+	/* copy main fork */
+	RelationCopyStorageUsingBuffer(src_smgr, dst_smgr, MAIN_FORKNUM,
+								   relpersistence);
+
+	/* copy those extra forks that exist */
+	for (ForkNumber forkNum = MAIN_FORKNUM + 1;
+		 forkNum <= MAX_FORKNUM; forkNum++)
+	{
+		if (smgrexists(src_smgr, forkNum))
+		{
+			smgrcreate(dst_smgr, forkNum, false);
+
+			/*
+			 * WAL log creation if the relation is persistent, or this is the
+			 * init fork of an unlogged relation.
+			 */
+			if (relpersistence == RELPERSISTENCE_PERMANENT ||
+				(relpersistence == RELPERSISTENCE_UNLOGGED &&
+				 forkNum == INIT_FORKNUM))
+				log_smgrcreate(&dst_rnode, forkNum);
+
+			/* Copy a fork's data, block by block. */
+			RelationCopyStorageUsingBuffer(src_smgr, dst_smgr, forkNum,
+										   relpersistence);
+		}
+	}
+
+	/* Close the smgr rel */
+	smgrclose(src_smgr);
+	smgrclose(dst_smgr);
+}
+
+/* ---------------------------------------------------------------------
  *		FlushDatabaseBuffers
  *
  *		This function writes all dirty pages of a database out to disk
diff --git a/src/include/commands/dbcommands_xlog.h b/src/include/commands/dbcommands_xlog.h
index 593a857..8f59870 100644
--- a/src/include/commands/dbcommands_xlog.h
+++ b/src/include/commands/dbcommands_xlog.h
@@ -20,6 +20,7 @@
 /* record types */
 #define XLOG_DBASE_CREATE		0x00
 #define XLOG_DBASE_DROP			0x10
+#define XLOG_DBASE_CREATEDIR	0x20
 
 typedef struct xl_dbase_create_rec
 {
@@ -30,6 +31,13 @@ typedef struct xl_dbase_create_rec
 	Oid			src_tablespace_id;
 } xl_dbase_create_rec;
 
+typedef struct xl_dbase_createdir_rec
+{
+	/* Records creating database directory */
+	Oid			db_id;
+	Oid			tablespace_id;
+} xl_dbase_createdir_rec;
+
 typedef struct xl_dbase_drop_rec
 {
 	Oid			db_id;
diff --git a/src/include/storage/bufmgr.h b/src/include/storage/bufmgr.h
index 7b80f58..a5659c0 100644
--- a/src/include/storage/bufmgr.h
+++ b/src/include/storage/bufmgr.h
@@ -204,6 +204,9 @@ extern BlockNumber RelationGetNumberOfBlocksInFork(Relation relation,
 extern void FlushOneBuffer(Buffer buffer);
 extern void FlushRelationBuffers(Relation rel);
 extern void FlushRelationsAllBuffers(struct SMgrRelationData **smgrs, int nrels);
+extern void CreateAndCopyRelationData(RelFileNode src_rnode,
+									  RelFileNode dst_rnode,
+									  char relpersistence);
 extern void FlushDatabaseBuffers(Oid dbid);
 extern void DropRelFileNodeBuffers(struct SMgrRelationData *smgr_reln, ForkNumber *forkNum,
 								   int nforks, BlockNumber *firstDelBlock);
diff --git a/src/tools/pgindent/typedefs.list b/src/tools/pgindent/typedefs.list
index d9b83f7..dcda8ca 100644
--- a/src/tools/pgindent/typedefs.list
+++ b/src/tools/pgindent/typedefs.list
@@ -460,6 +460,7 @@ CoverPos
 CreateAmStmt
 CreateCastStmt
 CreateConversionStmt
+CreateDBRelInfo
 CreateDomainStmt
 CreateEnumStmt
 CreateEventTrigStmt
-- 
1.8.3.1

From a479f7057649e2c6ef332ff313e9291089e193e0 Mon Sep 17 00:00:00 2001
From: Dilip Kumar <dilipkumar@localhost.localdomain>
Date: Thu, 10 Mar 2022 10:18:18 +0530
Subject: [PATCH v12 1/4] Extend relmap interfaces

Add two new interfaces to the relmapper: 1) one that copies the
relmap file from one database path to another, and 2) one that gets
the filenode for a relation OID.  We already have
RelationMapOidToFilenode for the latter purpose, but it assumes that
we are connected to the database whose mapping we want.  The new
interface does the same thing, but reads the mapping of the database
given as input.

These interfaces are required by the next patch, which adds support
for WAL-logged CREATE DATABASE.
---
 src/backend/utils/cache/relmapper.c | 159 ++++++++++++++++++++++++++++--------
 src/include/utils/relmapper.h       |   7 +-
 2 files changed, 132 insertions(+), 34 deletions(-)

diff --git a/src/backend/utils/cache/relmapper.c b/src/backend/utils/cache/relmapper.c
index 4f6811f..6501110 100644
--- a/src/backend/utils/cache/relmapper.c
+++ b/src/backend/utils/cache/relmapper.c
@@ -136,10 +136,12 @@ static void apply_map_update(RelMapFile *map, Oid relationId, Oid fileNode,
 							 bool add_okay);
 static void merge_map_updates(RelMapFile *map, const RelMapFile *updates,
 							  bool add_okay);
-static void load_relmap_file(bool shared, bool lock_held);
+static void load_relmap_file(bool shared, bool lock_held, RelMapFile *dstmap,
+							 const char *dbpath);
 static void write_relmap_file(bool shared, RelMapFile *newmap,
 							  bool write_wal, bool send_sinval, bool preserve_files,
-							  Oid dbid, Oid tsid, const char *dbpath);
+							  Oid dbid, Oid tsid, const char *dbpath,
+							  bool create);
 static void perform_relmap_update(bool shared, const RelMapFile *updates);
 
 
@@ -250,6 +252,32 @@ RelationMapFilenodeToOid(Oid filenode, bool shared)
 }
 
 /*
+ * RelationMapOidToFilenodeForDatabase
+ *
+ * Same as RelationMapOidToFilenode, but instead of reading the mapping of the
+ * database we are connected to, read the mapping of the database whose path
+ * is given in dbpath.
+ */
+Oid
+RelationMapOidToFilenodeForDatabase(const char *dbpath, Oid relationId)
+{
+	RelMapFile	map;
+	int			i;
+
+	/* Read the relmap file from the source database. */
+	load_relmap_file(false, false, &map, dbpath);
+
+	/* Iterate over the relmap entries to find the input relation oid. */
+	for (i = 0; i < map.num_mappings; i++)
+	{
+		if (relationId == map.mappings[i].mapoid)
+			return map.mappings[i].mapfilenode;
+	}
+
+	return InvalidOid;
+}
+
+/*
  * RelationMapUpdateMap
  *
  * Install a new relfilenode mapping for the specified relation.
@@ -405,12 +433,12 @@ RelationMapInvalidate(bool shared)
 	if (shared)
 	{
 		if (shared_map.magic == RELMAPPER_FILEMAGIC)
-			load_relmap_file(true, false);
+			load_relmap_file(true, false, NULL, NULL);
 	}
 	else
 	{
 		if (local_map.magic == RELMAPPER_FILEMAGIC)
-			load_relmap_file(false, false);
+			load_relmap_file(false, false, NULL, NULL);
 	}
 }
 
@@ -425,9 +453,9 @@ void
 RelationMapInvalidateAll(void)
 {
 	if (shared_map.magic == RELMAPPER_FILEMAGIC)
-		load_relmap_file(true, false);
+		load_relmap_file(true, false, NULL, NULL);
 	if (local_map.magic == RELMAPPER_FILEMAGIC)
-		load_relmap_file(false, false);
+		load_relmap_file(false, false, NULL, NULL);
 }
 
 /*
@@ -569,9 +597,9 @@ RelationMapFinishBootstrap(void)
 
 	/* Write the files; no WAL or sinval needed */
 	write_relmap_file(true, &shared_map, false, false, false,
-					  InvalidOid, GLOBALTABLESPACE_OID, NULL);
+					  InvalidOid, GLOBALTABLESPACE_OID, NULL, false);
 	write_relmap_file(false, &local_map, false, false, false,
-					  MyDatabaseId, MyDatabaseTableSpace, DatabasePath);
+					  MyDatabaseId, MyDatabaseTableSpace, DatabasePath, false);
 }
 
 /*
@@ -612,7 +640,7 @@ RelationMapInitializePhase2(void)
 	/*
 	 * Load the shared map file, die on error.
 	 */
-	load_relmap_file(true, false);
+	load_relmap_file(true, false, NULL, NULL);
 }
 
 /*
@@ -633,7 +661,7 @@ RelationMapInitializePhase3(void)
 	/*
 	 * Load the local map file, die on error.
 	 */
-	load_relmap_file(false, false);
+	load_relmap_file(false, false, NULL, NULL);
 }
 
 /*
@@ -687,15 +715,46 @@ RestoreRelationMap(char *startAddress)
 }
 
 /*
+ * CopyRelationMap
+ *
+ * Copy the relmap file from the source db path to the destination db path and
+ * WAL-log the operation.  This function is only called during CREATE
+ * DATABASE, so the destination database is not yet visible to anyone else;
+ * thus we don't need to take the relmap lock while updating its relmap.
+ */
+void
+CopyRelationMap(Oid dbid, Oid tsid, const char *srcdbpath,
+				const char *dstdbpath)
+{
+	RelMapFile map;
+
+	/* Read the relmap file from the source database. */
+	load_relmap_file(false, false, &map, srcdbpath);
+
+	/*
+	 * Write map contents into the destination database's relmap file; no
+	 * sinval needed because there could be no one else connected to the
+	 * database we are creating now.
+	 */
+	write_relmap_file(false, &map, true, false, true, dbid, tsid, dstdbpath,
+					  true);
+}
+
+/*
  * load_relmap_file -- load data from the shared or local map file
  *
  * Because the map file is essential for access to core system catalogs,
  * failure to read it is a fatal error.
  *
- * Note that the local case requires DatabasePath to be set up.
+ * Note that the local case requires DatabasePath to be set up.  During
+ * createdb, however, we are not connected to the source database, so the
+ * caller must pass the dbpath of the database whose relmap file is to be
+ * read, together with a valid 'dstmap' buffer into which the relmap is
+ * read.
  */
 static void
-load_relmap_file(bool shared, bool lock_held)
+load_relmap_file(bool shared, bool lock_held, RelMapFile *dstmap,
+				 const char *dbpath)
 {
 	RelMapFile *map;
 	char		mapfilename[MAXPGPATH];
@@ -703,7 +762,20 @@ load_relmap_file(bool shared, bool lock_held)
 	int			fd;
 	int			r;
 
-	if (shared)
+	/*
+	 * Prepare relmap file path.  If a valid dbpath is given then read the file
+	 * from that path.
+	 */
+	if (dbpath != NULL)
+	{
+		/* We must pass a valid dstmap for reading the mapfile contents. */
+		Assert(dstmap != NULL);
+
+		snprintf(mapfilename, sizeof(mapfilename), "%s/%s",
+				 dbpath, RELMAPPER_FILENAME);
+		map = dstmap;
+	}
+	else if (shared)
 	{
 		snprintf(mapfilename, sizeof(mapfilename), "global/%s",
 				 RELMAPPER_FILENAME);
@@ -796,12 +868,15 @@ load_relmap_file(bool shared, bool lock_held)
  * Because this may be called during WAL replay when MyDatabaseId,
  * DatabasePath, etc aren't valid, we require the caller to pass in suitable
  * values.  The caller is also responsible for being sure no concurrent
- * map update could be happening.
+ * map update could be happening.  This is also called during CREATE
+ * DATABASE, when we are not connected to the database whose relmap we are
+ * writing.  In that case the caller must pass the dbpath of that database
+ * and pass create as true.
  */
 static void
 write_relmap_file(bool shared, RelMapFile *newmap,
 				  bool write_wal, bool send_sinval, bool preserve_files,
-				  Oid dbid, Oid tsid, const char *dbpath)
+				  Oid dbid, Oid tsid, const char *dbpath, bool create)
 {
 	int			fd;
 	RelMapFile *realmap;
@@ -819,10 +894,18 @@ write_relmap_file(bool shared, RelMapFile *newmap,
 	FIN_CRC32C(newmap->crc);
 
 	/*
-	 * Open the target file.  We prefer to do this before entering the
-	 * critical section, so that an open() failure need not force PANIC.
+	 * Prepare the target map file name, and decide which in-memory relmap to
+	 * update.  If create is true, there is no in-memory relmap to update,
+	 * because we are not connected to the database whose relmap file we are
+	 * writing.
 	 */
-	if (shared)
+	if (create)
+	{
+		snprintf(mapfilename, sizeof(mapfilename), "%s/%s",
+				 dbpath, RELMAPPER_FILENAME);
+		realmap = NULL;
+	}
+	else if (shared)
 	{
 		snprintf(mapfilename, sizeof(mapfilename), "global/%s",
 				 RELMAPPER_FILENAME);
@@ -853,6 +936,7 @@ write_relmap_file(bool shared, RelMapFile *newmap,
 		xlrec.dbid = dbid;
 		xlrec.tsid = tsid;
 		xlrec.nbytes = sizeof(RelMapFile);
+		xlrec.create = create;
 
 		XLogBeginInsert();
 		XLogRegisterData((char *) (&xlrec), MinSizeOfRelmapUpdate);
@@ -935,14 +1019,17 @@ write_relmap_file(bool shared, RelMapFile *newmap,
 	}
 
 	/*
-	 * Success, update permanent copy.  During bootstrap, we might be working
-	 * on the permanent copy itself, in which case skip the memcpy() to avoid
-	 * invoking nominally-undefined behavior.
+	 * Success, update permanent copy.  Skip the memcpy() during bootstrap
+	 * and during CREATE DATABASE.  During bootstrap, we might be working on
+	 * the permanent copy itself.  During CREATE DATABASE, we are not
+	 * connected to the database whose relmap file we are writing, so it
+	 * would be wrong to update the in-memory map of the database we are
+	 * connected to.
 	 */
-	if (realmap != newmap)
+	if (realmap != NULL && realmap != newmap)
 		memcpy(realmap, newmap, sizeof(RelMapFile));
 	else
-		Assert(!send_sinval);	/* must be bootstrapping */
+		Assert(!send_sinval);	/* must be bootstrapping or createdb */
 
 	/* Critical section done */
 	if (write_wal)
@@ -975,7 +1062,7 @@ perform_relmap_update(bool shared, const RelMapFile *updates)
 	LWLockAcquire(RelationMappingLock, LW_EXCLUSIVE);
 
 	/* Be certain we see any other updates just made */
-	load_relmap_file(shared, true);
+	load_relmap_file(shared, true, NULL, NULL);
 
 	/* Prepare updated data in a local variable */
 	if (shared)
@@ -993,7 +1080,7 @@ perform_relmap_update(bool shared, const RelMapFile *updates)
 	write_relmap_file(shared, &newmap, true, true, true,
 					  (shared ? InvalidOid : MyDatabaseId),
 					  (shared ? GLOBALTABLESPACE_OID : MyDatabaseTableSpace),
-					  DatabasePath);
+					  DatabasePath, false);
 
 	/* Now we can release the lock */
 	LWLockRelease(RelationMappingLock);
@@ -1025,18 +1112,24 @@ relmap_redo(XLogReaderState *record)
 		dbpath = GetDatabasePath(xlrec->dbid, xlrec->tsid);
 
 		/*
-		 * Write out the new map and send sinval, but of course don't write a
-		 * new WAL entry.  There's no surrounding transaction to tell to
-		 * preserve files, either.
+		 * Write out the new map, and send sinval unless create is set, since
+		 * in that case no one else can be accessing the relmap.  But of
+		 * course don't write a new WAL entry.  There's no surrounding
+		 * transaction to tell to preserve files, either.
 		 *
 		 * There shouldn't be anyone else updating relmaps during WAL replay,
-		 * but grab the lock to interlock against load_relmap_file().
+		 * but grab the lock to interlock against load_relmap_file().  If
+		 * create is set, we don't need the lock, because we are creating a
+		 * new database and so no one else can be looking at its relmap
+		 * file.
 		 */
-		LWLockAcquire(RelationMappingLock, LW_EXCLUSIVE);
+		if (!xlrec->create)
+			LWLockAcquire(RelationMappingLock, LW_EXCLUSIVE);
 		write_relmap_file((xlrec->dbid == InvalidOid), &newmap,
-						  false, true, false,
-						  xlrec->dbid, xlrec->tsid, dbpath);
-		LWLockRelease(RelationMappingLock);
+						  false, !xlrec->create, false,
+						  xlrec->dbid, xlrec->tsid, dbpath, xlrec->create);
+		if (!xlrec->create)
+			LWLockRelease(RelationMappingLock);
 
 		pfree(dbpath);
 	}
diff --git a/src/include/utils/relmapper.h b/src/include/utils/relmapper.h
index 9fbb5a7..b8c7ef0 100644
--- a/src/include/utils/relmapper.h
+++ b/src/include/utils/relmapper.h
@@ -29,6 +29,7 @@ typedef struct xl_relmap_update
 	Oid			dbid;			/* database ID, or 0 for shared map */
 	Oid			tsid;			/* database's tablespace, or pg_global */
 	int32		nbytes;			/* size of relmap data */
+	bool		create;			/* true if creating new relmap */
 	char		data[FLEXIBLE_ARRAY_MEMBER];
 } xl_relmap_update;
 
@@ -39,6 +40,9 @@ extern Oid	RelationMapOidToFilenode(Oid relationId, bool shared);
 
 extern Oid	RelationMapFilenodeToOid(Oid relationId, bool shared);
 
+extern Oid RelationMapOidToFilenodeForDatabase(const char *dbpath,
+											   Oid relationId);
+
 extern void RelationMapUpdateMap(Oid relationId, Oid fileNode, bool shared,
 								 bool immediate);
 
@@ -62,7 +66,8 @@ extern void RelationMapInitializePhase3(void);
 extern Size EstimateRelationMapSpace(void);
 extern void SerializeRelationMap(Size maxSize, char *startAddress);
 extern void RestoreRelationMap(char *startAddress);
-
+extern void CopyRelationMap(Oid dbid, Oid tsid, const char *srcdbpath,
+							const char *dstdbpath);
 extern void relmap_redo(XLogReaderState *record);
 extern void relmap_desc(StringInfo buf, XLogReaderState *record);
 extern const char *relmap_identify(uint8 info);
-- 
1.8.3.1
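
For readers following the relmapper patch above: the lookup done by
RelationMapOidToFilenodeForDatabase is a linear scan over the entries of
the map loaded from the given database path.  The following is only a
minimal standalone sketch of that scan.  It assumes simplified stand-in
types: DemoMapping, DemoRelMapFile, DEMO_MAX_MAPPINGS and
demo_oid_to_filenode are hypothetical names used for illustration and are
not PostgreSQL APIs.  The real function first fills the map with
load_relmap_file(), which is omitted here.

```c
#include <stddef.h>

typedef unsigned int Oid;

#define InvalidOid			((Oid) 0)
#define DEMO_MAX_MAPPINGS	8	/* arbitrary capacity for this sketch */

/* Hypothetical stand-in for one relmap entry: relation OID -> filenode. */
typedef struct DemoMapping
{
	Oid			mapoid;
	Oid			mapfilenode;
} DemoMapping;

/* Hypothetical stand-in for the in-memory image of a relmap file. */
typedef struct DemoRelMapFile
{
	int			num_mappings;
	DemoMapping mappings[DEMO_MAX_MAPPINGS];
} DemoRelMapFile;

/*
 * Return the filenode mapped to relationId, or InvalidOid if the map has
 * no entry for it.  The caller is assumed to have loaded the map already.
 */
static Oid
demo_oid_to_filenode(const DemoRelMapFile *map, Oid relationId)
{
	for (int i = 0; i < map->num_mappings; i++)
	{
		if (map->mappings[i].mapoid == relationId)
			return map->mappings[i].mapfilenode;
	}

	return InvalidOid;
}
```

Because the map here is a plain local structure rather than the backend's
shared or local map, the sketch does not depend on which database the
caller is connected to, which is the property the new interface needs.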

From 2b54a88af311c67f7c36418f4e530c81d36e5a78 Mon Sep 17 00:00:00 2001
From: Dilip Kumar <dilipkumar@localhost.localdomain>
Date: Fri, 24 Sep 2021 18:29:17 +0530
Subject: [PATCH v12 3/4] New interface to lock relation id

Currently, LockRelationOid provides a mechanism to lock a relation
by OID, but it requires us to be connected to the database to which
the relation belongs.  This patch adds a new interface that can lock
a relation even if we are not connected to the database that
contains it.
---
 src/backend/storage/lmgr/lmgr.c | 28 ++++++++++++++++++++++++++++
 src/include/storage/lmgr.h      |  1 +
 2 files changed, 29 insertions(+)

diff --git a/src/backend/storage/lmgr/lmgr.c b/src/backend/storage/lmgr/lmgr.c
index 5ae52dd..1543da6 100644
--- a/src/backend/storage/lmgr/lmgr.c
+++ b/src/backend/storage/lmgr/lmgr.c
@@ -176,6 +176,34 @@ ConditionalLockRelationOid(Oid relid, LOCKMODE lockmode)
 }
 
 /*
+ *		LockRelationId
+ *
+ * Lock, given a LockRelId.  Same as LockRelationOid, but takes a LockRelId
+ * as input.
+ */
+void
+LockRelationId(LockRelId *relid, LOCKMODE lockmode)
+{
+	LOCKTAG		tag;
+	LOCALLOCK  *locallock;
+	LockAcquireResult res;
+
+	SET_LOCKTAG_RELATION(tag, relid->dbId, relid->relId);
+
+	res = LockAcquireExtended(&tag, lockmode, false, false, true, &locallock);
+
+	/*
+	 * Now that we have the lock, check for invalidation messages; see notes
+	 * in LockRelationOid.
+	 */
+	if (res != LOCKACQUIRE_ALREADY_CLEAR)
+	{
+		AcceptInvalidationMessages();
+		MarkLockClear(locallock);
+	}
+}
+
+/*
  *		UnlockRelationId
  *
  * Unlock, given a LockRelId.  This is preferred over UnlockRelationOid
diff --git a/src/include/storage/lmgr.h b/src/include/storage/lmgr.h
index 49edbcc..be1d2c9 100644
--- a/src/include/storage/lmgr.h
+++ b/src/include/storage/lmgr.h
@@ -38,6 +38,7 @@ extern void RelationInitLockInfo(Relation relation);
 
 /* Lock a relation */
 extern void LockRelationOid(Oid relid, LOCKMODE lockmode);
+extern void LockRelationId(LockRelId *relid, LOCKMODE lockmode);
 extern bool ConditionalLockRelationOid(Oid relid, LOCKMODE lockmode);
 extern void UnlockRelationId(LockRelId *relid, LOCKMODE lockmode);
 extern void UnlockRelationOid(Oid relid, LOCKMODE lockmode);
-- 
1.8.3.1

From f8dda8ea34673ab12872c7d3bcc7e95610d86f1a Mon Sep 17 00:00:00 2001
From: Dilip Kumar <dilipkumar@localhost.localdomain>
Date: Thu, 10 Feb 2022 15:55:33 +0530
Subject: [PATCH v12 2/4] Extend bufmgr interfaces

Extend the ReadBufferWithoutRelcache interface to take relpersistence
as input.  Until now, this function could only be used on permanent
relations, because it was only used during XLOG replay.  As part of
the bigger patch set, we will use it to read buffers of a database
to which we are not connected, so it may now see temporary and
unlogged relations as well.
---
 src/backend/access/transam/xlogutils.c |  9 ++++++---
 src/backend/storage/buffer/bufmgr.c    | 11 ++---------
 src/include/storage/bufmgr.h           |  3 ++-
 3 files changed, 10 insertions(+), 13 deletions(-)

diff --git a/src/backend/access/transam/xlogutils.c b/src/backend/access/transam/xlogutils.c
index 54d5f20..c292794 100644
--- a/src/backend/access/transam/xlogutils.c
+++ b/src/backend/access/transam/xlogutils.c
@@ -484,7 +484,8 @@ XLogReadBufferExtended(RelFileNode rnode, ForkNumber forknum,
 	{
 		/* page exists in file */
 		buffer = ReadBufferWithoutRelcache(rnode, forknum, blkno,
-										   mode, NULL);
+										   mode, NULL,
+										   RELPERSISTENCE_PERMANENT);
 	}
 	else
 	{
@@ -509,7 +510,8 @@ XLogReadBufferExtended(RelFileNode rnode, ForkNumber forknum,
 				ReleaseBuffer(buffer);
 			}
 			buffer = ReadBufferWithoutRelcache(rnode, forknum,
-											   P_NEW, mode, NULL);
+											   P_NEW, mode, NULL,
+											   RELPERSISTENCE_PERMANENT);
 		}
 		while (BufferGetBlockNumber(buffer) < blkno);
 		/* Handle the corner case that P_NEW returns non-consecutive pages */
@@ -519,7 +521,8 @@ XLogReadBufferExtended(RelFileNode rnode, ForkNumber forknum,
 				LockBuffer(buffer, BUFFER_LOCK_UNLOCK);
 			ReleaseBuffer(buffer);
 			buffer = ReadBufferWithoutRelcache(rnode, forknum, blkno,
-											   mode, NULL);
+											   mode, NULL,
+											   RELPERSISTENCE_PERMANENT);
 		}
 	}
 
diff --git a/src/backend/storage/buffer/bufmgr.c b/src/backend/storage/buffer/bufmgr.c
index f5459c6..0ed2d31 100644
--- a/src/backend/storage/buffer/bufmgr.c
+++ b/src/backend/storage/buffer/bufmgr.c
@@ -771,24 +771,17 @@ ReadBufferExtended(Relation reln, ForkNumber forkNum, BlockNumber blockNum,
 /*
  * ReadBufferWithoutRelcache -- like ReadBufferExtended, but doesn't require
  *		a relcache entry for the relation.
- *
- * NB: At present, this function may only be used on permanent relations, which
- * is OK, because we only use it during XLOG replay.  If in the future we
- * want to use it on temporary or unlogged relations, we could pass additional
- * parameters.
  */
 Buffer
 ReadBufferWithoutRelcache(RelFileNode rnode, ForkNumber forkNum,
 						  BlockNumber blockNum, ReadBufferMode mode,
-						  BufferAccessStrategy strategy)
+						  BufferAccessStrategy strategy, char relpersistence)
 {
 	bool		hit;
 
 	SMgrRelation smgr = smgropen(rnode, InvalidBackendId);
 
-	Assert(InRecovery);
-
-	return ReadBuffer_common(smgr, RELPERSISTENCE_PERMANENT, forkNum, blockNum,
+	return ReadBuffer_common(smgr, relpersistence, forkNum, blockNum,
 							 mode, strategy, &hit);
 }
 
diff --git a/src/include/storage/bufmgr.h b/src/include/storage/bufmgr.h
index dd01841..7b80f58 100644
--- a/src/include/storage/bufmgr.h
+++ b/src/include/storage/bufmgr.h
@@ -184,7 +184,8 @@ extern Buffer ReadBufferExtended(Relation reln, ForkNumber forkNum,
 								 BufferAccessStrategy strategy);
 extern Buffer ReadBufferWithoutRelcache(RelFileNode rnode,
 										ForkNumber forkNum, BlockNumber blockNum,
-										ReadBufferMode mode, BufferAccessStrategy strategy);
+										ReadBufferMode mode, BufferAccessStrategy strategy,
+										char relpersistence);
 extern void ReleaseBuffer(Buffer buffer);
 extern void UnlockReleaseBuffer(Buffer buffer);
 extern void MarkBufferDirty(Buffer buffer);
-- 
1.8.3.1
