Thanks! Version 20 is attached.

At Wed, 30 Mar 2022 08:44:02 -0500, Justin Pryzby <pry...@telsasoft.com> wrote in
> On Tue, Mar 01, 2022 at 02:14:13PM +0900, Kyotaro Horiguchi wrote:
> > Rebased on a recent xlog refactoring.
> 
> It'll come as no surprise that this neds to be rebased again.
> At least a few typos I reported in January aren't fixed.
> Set to "waiting".

Oh, I'm sorry for overlooking that. Somehow it didn't show up in my
mailer.

> I started looking at this and reviewed docs and comments again.
> 
> > +typedef struct PendingCleanup
> > +{
> > +   RelFileNode relnode;            /* relation that may need to be deleted */
> > +   int                     op;                             /* operation mask */
> > +   bool            bufpersistence; /* buffer persistence to set */
> > +   int                     unlink_forknum; /* forknum to unlink */
> 
> This can be of data type "ForkNumber"

Right. Fixed.
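
For readers following along, the PendingCleanup entries quoted above are kept in a linked list and processed at transaction end according to their atCommit flag and nesting level. The following is only a simplified stand-alone sketch of that pattern, not the patch's code: ToyCleanup, toy_register, toy_do_pending and the integer "id" are hypothetical names made up for illustration, and the real entries carry a RelFileNode and an operation mask instead.

```c
#include <assert.h>
#include <stdbool.h>
#include <stddef.h>
#include <stdlib.h>

/* Hypothetical, simplified stand-in for the patch's PendingCleanup entry. */
typedef struct ToyCleanup
{
	int			id;				/* identifies the action in this toy */
	bool		at_commit;		/* true: run at commit; false: run at abort */
	int			nest_level;		/* xact nesting level of the request */
	struct ToyCleanup *next;	/* linked-list link */
} ToyCleanup;

static ToyCleanup *toy_pending = NULL;	/* head of linked list */

/* Records which actions ran, so the effect can be inspected. */
int			toy_performed[16];
int			toy_nperformed = 0;

/* Push a new request onto the head of the list. */
void
toy_register(int id, bool at_commit, int nest_level)
{
	ToyCleanup *entry = malloc(sizeof(ToyCleanup));

	assert(entry != NULL);
	entry->id = id;
	entry->at_commit = at_commit;
	entry->nest_level = nest_level;
	entry->next = toy_pending;
	toy_pending = entry;
}

/* Process every entry registered at nest_level or deeper. */
void
toy_do_pending(bool is_commit, int nest_level)
{
	ToyCleanup *pending;
	ToyCleanup *prev = NULL;
	ToyCleanup *next;

	for (pending = toy_pending; pending != NULL; pending = next)
	{
		next = pending->next;
		if (pending->nest_level < nest_level)
		{
			/* outer-level entries are not processed yet */
			prev = pending;
			continue;
		}

		/* unlink the entry first, then act on it if called for */
		if (prev)
			prev->next = next;
		else
			toy_pending = next;

		if (pending->at_commit == is_commit && toy_nperformed < 16)
			toy_performed[toy_nperformed++] = pending->id;

		free(pending);
		/* prev does not change */
	}
}

/* Number of entries still waiting in the list. */
int
toy_pending_count(void)
{
	int			n = 0;
	ToyCleanup *p;

	for (p = toy_pending; p != NULL; p = p->next)
		n++;
	return n;
}
```

In this toy, an entry whose flag does not match the outcome is simply discarded, which is how a "drop the mark file at commit" request goes away on abort.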

> > +    * We are going to create an init fork. If server crashes before the
> > +    * current transaction ends the init fork left alone corrupts data while
> > +    * recovery.  The mark file works as the sentinel to identify that
> > +    * situation.
> 
> s/while/during/

This was in v17, but disappeared in v18.
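
As background on the sentinel idea in that comment, here is a rough stand-alone sketch of how a mark file can identify storage left behind by a crashed transaction. It is not the patch's implementation: the ".toymark" suffix and all toy_* functions are hypothetical, plain stdio calls replace the smgr layer, and fsync and WAL-logging are left out entirely.

```c
#include <assert.h>
#include <stdbool.h>
#include <stdio.h>

#define TOY_MARK_SUFFIX ".toymark"	/* hypothetical name, not the patch's */

/* Build the mark file name that belongs to a data file. */
static void
toy_mark_path(char *buf, size_t len, const char *path)
{
	assert(path != NULL);
	snprintf(buf, len, "%s%s", path, TOY_MARK_SUFFIX);
}

/* Return true if the file can be opened for reading. */
bool
toy_exists(const char *path)
{
	FILE	   *f = fopen(path, "r");

	if (f == NULL)
		return false;
	fclose(f);
	return true;
}

/* Create an empty file; return 0 on success, -1 on failure. */
static int
toy_touch(const char *path)
{
	FILE	   *f = fopen(path, "w");

	if (f == NULL)
		return -1;
	return fclose(f) == 0 ? 0 : -1;
}

/* Create the mark first, so the data file never exists without it. */
int
toy_create_with_mark(const char *path)
{
	char		mark[1024];

	toy_mark_path(mark, sizeof(mark), path);
	if (toy_touch(mark) != 0)
		return -1;
	return toy_touch(path);
}

/* At commit the data file becomes permanent: drop only the mark. */
int
toy_commit(const char *path)
{
	char		mark[1024];

	toy_mark_path(mark, sizeof(mark), path);
	return remove(mark) == 0 ? 0 : -1;
}

/*
 * At startup, a surviving mark means the creating transaction never
 * committed.  Remove the data file before the mark, so that a crash in
 * the middle of this function still leaves the mark for the next attempt.
 * Returns 1 if cleanup was performed, 0 if there was nothing to do.
 */
int
toy_recover(const char *path)
{
	char		mark[1024];

	toy_mark_path(mark, sizeof(mark), path);
	if (!toy_exists(mark))
		return 0;
	remove(path);				/* may already be missing; that is fine */
	remove(mark);
	return 1;
}
```

Calling toy_create_with_mark() and then toy_recover() without an intervening toy_commit() models a crash before commit: the data file is removed. After toy_commit(), toy_recover() leaves the file alone.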

> > +    * index-init fork needs further initialization. ambuildempty shoud do
> 
> should (I reported this before)
> 
> > +   if (inxact_created)
> > +   {
> > +           SMgrRelation srel = smgropen(rnode, InvalidBackendId);
> > +
> > +           /*
> > +            * INIT forks never be loaded to shared buffer so no point in dropping
> 
> "are never loaded"

It was fixed in v18.

> > +   elog(DEBUG1, "perform in-place persistnce change");
> 
> persistence (I reported this before)

Sorry. Fixed.

> > +           /*
> > +            * While wal_level >= replica, switching to LOGGED requires the
> > +            * relation content to be WAL-logged to recover the table.
> > +            * We don't emit this fhile wal_level = minimal.
> 
> while (or "if")

There are "While" and "fhile". I changed the latter to "if".

> > +                    * The relation is persistent and stays remain persistent. Don't
> > +                    * drop the buffers for this relation.
> 
> "stays remain" is redundant (I reported this before)

Thanks. I changed it to "stays persistent".

> > +                   if (unlink(rm_path) < 0)
> > +                           ereport(ERROR,
> > +                                           (errcode_for_file_access(),
> > +                                            errmsg("could not remove file \"%s\": %m",
> > +                                                           rm_path)));
> 
> The parens around errcode are unnecessary since last year.
> I suggest to avoid using them here and elsewhere.

That code was just moved from elsewhere without editing, but of course
I can do that.  I didn't know about that change to ereport and could
not find the corresponding commit, but I found that Tom mentioned it.

https://www.postgresql.org/message-id/flat/5063.1584641224%40sss.pgh.pa.us#63e611c30800133bbddb48de857668e8
> Now that we can rely on having varargs macros, I think we could
> stop requiring the extra level of parentheses, ie instead of
...
>         ereport(ERROR,
>                 errcode(ERRCODE_DIVISION_BY_ZERO),
>                 errmsg("division by zero"));
> 
> (The old syntax had better still work, of course.  I'm not advocating
> running around and changing existing calls.)

I changed all ereport calls added by this patch to this style.
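
To illustrate why a varargs macro makes the extra parentheses optional, here is a small stand-alone toy. It is not PostgreSQL's ereport: TOY_REPORT and the toy_* helpers are hypothetical names, and the sketch only assumes that the auxiliary calls are joined by the comma operator inside the macro.

```c
#include <assert.h>
#include <stdio.h>
#include <string.h>

/* State written by the toy auxiliary functions below. */
static int	toy_code = 0;
static char toy_msg[128] = "";

static int
toy_errcode(int code)
{
	toy_code = code;
	return 0;
}

static int
toy_errmsg(const char *msg)
{
	snprintf(toy_msg, sizeof(toy_msg), "%s", msg);
	return 0;
}

static int
toy_finish(void)
{
	return 0;
}

/*
 * Hypothetical reporting macro.  The arguments end up inside one
 * parenthesized comma expression, so it makes no difference whether the
 * caller wraps them in an extra pair of parentheses or not.
 */
#define TOY_REPORT(...) ((void) (__VA_ARGS__, toy_finish()))

/* New style: auxiliary calls passed as separate macro arguments. */
void
toy_report_new_style(void)
{
	TOY_REPORT(toy_errcode(42), toy_errmsg("division by zero"));
}

/* Old style: one argument that is itself a parenthesized comma list. */
void
toy_report_old_style(void)
{
	TOY_REPORT((toy_errcode(42), toy_errmsg("division by zero")));
}

int
toy_last_code(void)
{
	return toy_code;
}

const char *
toy_last_msg(void)
{
	return toy_msg;
}

void
toy_reset(void)
{
	toy_code = 0;
	memset(toy_msg, 0, sizeof(toy_msg));
}
```

Both call styles leave the same code and message behind in this toy, which matches Tom's remark that the old syntax still has to work.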

> > +                    * And we may have SMGR_MARK_UNCOMMITTED file.  Remove it if the
> > +                    * fork files has been successfully removed. It's ok if the file
> 
> file

Fixed.

> > +     <para>
> > +      All tables in the current database in a tablespace can be changed by using
> 
> given tablespace

I did s/database in a tablespace/database in the given tablespace/. Is
that right?

> > +      the <literal>ALL IN TABLESPACE</literal> form, which will lock all tables
> 
> which will first lock
>
> > +      to be changed first and then change each one.  This form also supports
> 
> remove "first" here

This is almost a verbatim copy of the description of SET TABLESPACE, so
this change makes the wording of two nearly identical descriptions
differ slightly.  Anyway, in this version I did as suggested, but only
for the part this patch adds.

> > +      <literal>OWNED BY</literal>, which will only change tables owned by the
> > +      roles specified.  If the <literal>NOWAIT</literal> option is specified
> 
> specified roles.
> is specified, (comma)

This is the same as above. I did that, but it makes the description
differ from the other, nearly identical description.

> > +      then the command will fail if it is unable to acquire all of the locks
> 
> if it is unable to immediately acquire
>
> > +      required immediately.  The <literal>information_schema</literal>
> 
> remove immediately

Ditto.

> > +      relations are not considered part of the system catalogs and will be
> 
> I think you need to first say that "relations in the pg_catalog schema cannot
> be changed".

Mmm. I don't agree with this.  Aren't such exception-like descriptions
usually placed after the description of how the feature works? This
is also the same structure as SET TABLESPACE.

> in patch 2/2:
> typo: persistene

Hmm. Bad. I checked the spelling across both patches and found some
more typos.

+                                        * The crashed trasaction did SET UNLOGGED. This relation
+                                        * is restored to a LOGGED relation.
s/trasaction/transaction/

+                               errmsg("could not crete mark file \"%s\": %m", path));
s/crete/create/

I then rebased on 9c08aea6a3 and ran pgindent.

Thanks!

-- 
Kyotaro Horiguchi
NTT Open Source Software Center
From d2fb934c20912d4e1fe091805ff4790addd8f77d Mon Sep 17 00:00:00 2001
From: Kyotaro Horiguchi <horikyoga....@gmail.com>
Date: Wed, 11 Nov 2020 21:51:11 +0900
Subject: [PATCH v20 1/2] In-place table persistence change

Even though ALTER TABLE SET LOGGED/UNLOGGED does not require data
rewriting, it currently runs a heap rewrite, which causes a large
amount of file I/O.  This patch makes the command run without a heap
rewrite.  In addition, SET LOGGED while wal_level > minimal emits WAL
using XLOG_FPI instead of a massive number of HEAP_INSERTs, which
should be smaller.

This also allows cleanup of files left behind when the transaction
that created them crashes.
---
 src/backend/access/rmgrdesc/smgrdesc.c    |  49 ++
 src/backend/access/transam/README         |   9 +
 src/backend/access/transam/xact.c         |   7 +
 src/backend/access/transam/xlogrecovery.c |  18 +
 src/backend/catalog/storage.c             | 548 +++++++++++++++++++++-
 src/backend/commands/tablecmds.c          | 266 +++++++++--
 src/backend/replication/basebackup.c      |   3 +-
 src/backend/storage/buffer/bufmgr.c       |  85 ++++
 src/backend/storage/file/fd.c             |   4 +-
 src/backend/storage/file/reinit.c         | 340 ++++++++++----
 src/backend/storage/smgr/md.c             |  94 +++-
 src/backend/storage/smgr/smgr.c           |  32 ++
 src/backend/storage/sync/sync.c           |  21 +-
 src/bin/pg_rewind/parsexlog.c             |  22 +
 src/bin/pg_rewind/pg_rewind.c             |   1 -
 src/common/relpath.c                      |  47 +-
 src/include/catalog/storage.h             |   3 +
 src/include/catalog/storage_xlog.h        |  42 +-
 src/include/common/relpath.h              |   9 +-
 src/include/storage/bufmgr.h              |   2 +
 src/include/storage/fd.h                  |   1 +
 src/include/storage/md.h                  |   8 +-
 src/include/storage/reinit.h              |  10 +-
 src/include/storage/smgr.h                |  17 +
 src/tools/pgindent/typedefs.list          |   7 +
 25 files changed, 1464 insertions(+), 181 deletions(-)

diff --git a/src/backend/access/rmgrdesc/smgrdesc.c b/src/backend/access/rmgrdesc/smgrdesc.c
index 7547813254..225ffbafef 100644
--- a/src/backend/access/rmgrdesc/smgrdesc.c
+++ b/src/backend/access/rmgrdesc/smgrdesc.c
@@ -40,6 +40,46 @@ smgr_desc(StringInfo buf, XLogReaderState *record)
 						 xlrec->blkno, xlrec->flags);
 		pfree(path);
 	}
+	else if (info == XLOG_SMGR_UNLINK)
+	{
+		xl_smgr_unlink *xlrec = (xl_smgr_unlink *) rec;
+		char	   *path = relpathperm(xlrec->rnode, xlrec->forkNum);
+
+		appendStringInfoString(buf, path);
+		pfree(path);
+	}
+	else if (info == XLOG_SMGR_MARK)
+	{
+		xl_smgr_mark *xlrec = (xl_smgr_mark *) rec;
+		char	   *path = GetRelationPath(xlrec->rnode.dbNode,
+										   xlrec->rnode.spcNode,
+										   xlrec->rnode.relNode,
+										   InvalidBackendId,
+										   xlrec->forkNum, xlrec->mark);
+		char	   *action = "<none>";
+
+		switch (xlrec->action)
+		{
+			case XLOG_SMGR_MARK_CREATE:
+				action = "CREATE";
+				break;
+			case XLOG_SMGR_MARK_UNLINK:
+				action = "DELETE";
+				break;
+		}
+
+		appendStringInfo(buf, "%s %s", action, path);
+		pfree(path);
+	}
+	else if (info == XLOG_SMGR_BUFPERSISTENCE)
+	{
+		xl_smgr_bufpersistence *xlrec = (xl_smgr_bufpersistence *) rec;
+		char	   *path = relpathperm(xlrec->rnode, MAIN_FORKNUM);
+
+		appendStringInfoString(buf, path);
+		appendStringInfo(buf, " persistence %d", xlrec->persistence);
+		pfree(path);
+	}
 }
 
 const char *
@@ -55,6 +95,15 @@ smgr_identify(uint8 info)
 		case XLOG_SMGR_TRUNCATE:
 			id = "TRUNCATE";
 			break;
+		case XLOG_SMGR_UNLINK:
+			id = "UNLINK";
+			break;
+		case XLOG_SMGR_MARK:
+			id = "MARK";
+			break;
+		case XLOG_SMGR_BUFPERSISTENCE:
+			id = "BUFPERSISTENCE";
+			break;
 	}
 
 	return id;
diff --git a/src/backend/access/transam/README b/src/backend/access/transam/README
index 1edc8180c1..2ecd8c8c7c 100644
--- a/src/backend/access/transam/README
+++ b/src/backend/access/transam/README
@@ -725,6 +725,15 @@ then restart recovery.  This is part of the reason for not writing a WAL
 entry until we've successfully done the original action.
 
 
+The Smgr MARK files
+--------------------------------
+
+An smgr mark file is created when a new relation file is created, to
+mark that the relfilenode needs to be cleaned up at recovery time.  In
+contrast to the four actions above, failure to remove smgr mark files
+will lead to data loss, in which case the server will shut down.
+
+
 Skipping WAL for New RelFileNode
 --------------------------------
 
diff --git a/src/backend/access/transam/xact.c b/src/backend/access/transam/xact.c
index 3596a7d734..f48d950895 100644
--- a/src/backend/access/transam/xact.c
+++ b/src/backend/access/transam/xact.c
@@ -2199,6 +2199,9 @@ CommitTransaction(void)
 	 */
 	smgrDoPendingSyncs(true, is_parallel_worker);
 
+	/* Likewise delete mark files for files created during this transaction. */
+	smgrDoPendingCleanups(true);
+
 	/* close large objects before lower-level cleanup */
 	AtEOXact_LargeObject(true);
 
@@ -2449,6 +2452,9 @@ PrepareTransaction(void)
 	 */
 	smgrDoPendingSyncs(true, false);
 
+	/* Likewise delete mark files for files created during this transaction. */
+	smgrDoPendingCleanups(true);
+
 	/* close large objects before lower-level cleanup */
 	AtEOXact_LargeObject(true);
 
@@ -2774,6 +2780,7 @@ AbortTransaction(void)
 	AfterTriggerEndXact(false); /* 'false' means it's abort */
 	AtAbort_Portals();
 	smgrDoPendingSyncs(false, is_parallel_worker);
+	smgrDoPendingCleanups(false);
 	AtEOXact_LargeObject(false);
 	AtAbort_Notify();
 	AtEOXact_RelationMap(false, is_parallel_worker);
diff --git a/src/backend/access/transam/xlogrecovery.c b/src/backend/access/transam/xlogrecovery.c
index 8d2395dae2..e7786a3851 100644
--- a/src/backend/access/transam/xlogrecovery.c
+++ b/src/backend/access/transam/xlogrecovery.c
@@ -40,6 +40,7 @@
 #include "access/xlogrecovery.h"
 #include "access/xlogutils.h"
 #include "catalog/pg_control.h"
+#include "catalog/storage.h"
 #include "commands/tablespace.h"
 #include "miscadmin.h"
 #include "pgstat.h"
@@ -53,6 +54,7 @@
 #include "storage/pmsignal.h"
 #include "storage/proc.h"
 #include "storage/procarray.h"
+#include "storage/reinit.h"
 #include "storage/spin.h"
 #include "utils/builtins.h"
 #include "utils/guc.h"
@@ -1746,6 +1748,14 @@ PerformWalRecovery(void)
 			}
 		}
 
+		/* cleanup garbage files left during crash recovery */
+		if (!InArchiveRecovery)
+			ResetUnloggedRelations(UNLOGGED_RELATION_DROP_BUFFER |
+								   UNLOGGED_RELATION_CLEANUP);
+
+		/* run rollback cleanup if any */
+		smgrDoPendingDeletes(false);
+
 		/* Allow resource managers to do any required cleanup. */
 		for (rmid = 0; rmid <= RM_MAX_ID; rmid++)
 		{
@@ -3026,6 +3036,14 @@ ReadRecord(XLogReaderState *xlogreader, int emode,
 			{
 				ereport(DEBUG1,
 						(errmsg_internal("reached end of WAL in pg_wal, entering archive recovery")));
+
+				/* cleanup garbage files left during crash recovery */
+				ResetUnloggedRelations(UNLOGGED_RELATION_DROP_BUFFER |
+									   UNLOGGED_RELATION_CLEANUP);
+
+				/* run rollback cleanup if any */
+				smgrDoPendingDeletes(false);
+
 				InArchiveRecovery = true;
 				if (StandbyModeRequested)
 					StandbyMode = true;
diff --git a/src/backend/catalog/storage.c b/src/backend/catalog/storage.c
index 9898701a43..ab8ec34c3d 100644
--- a/src/backend/catalog/storage.c
+++ b/src/backend/catalog/storage.c
@@ -19,6 +19,7 @@
 
 #include "postgres.h"
 
+#include "access/amapi.h"
 #include "access/parallel.h"
 #include "access/visibilitymap.h"
 #include "access/xact.h"
@@ -66,6 +67,23 @@ typedef struct PendingRelDelete
 	struct PendingRelDelete *next;	/* linked-list link */
 } PendingRelDelete;
 
+#define	PCOP_UNLINK_FORK		(1 << 0)
+#define	PCOP_UNLINK_MARK		(1 << 1)
+#define	PCOP_SET_PERSISTENCE	(1 << 2)
+
+typedef struct PendingCleanup
+{
+	RelFileNode relnode;		/* relation that may need to be deleted */
+	int			op;				/* operation mask */
+	bool		bufpersistence; /* buffer persistence to set */
+	ForkNumber	unlink_forknum; /* forknum to unlink */
+	StorageMarks unlink_mark;	/* mark to unlink */
+	BackendId	backend;		/* InvalidBackendId if not a temp rel */
+	bool		atCommit;		/* T=delete at commit; F=delete at abort */
+	int			nestLevel;		/* xact nesting level of request */
+	struct PendingCleanup *next;	/* linked-list link */
+}			PendingCleanup;
+
 typedef struct PendingRelSync
 {
 	RelFileNode rnode;
@@ -73,6 +91,7 @@ typedef struct PendingRelSync
 } PendingRelSync;
 
 static PendingRelDelete *pendingDeletes = NULL; /* head of linked list */
+static PendingCleanup * pendingCleanups = NULL; /* head of linked list */
 HTAB	   *pendingSyncHash = NULL;
 
 
@@ -145,7 +164,14 @@ RelationCreateStorage(RelFileNode rnode, char relpersistence,
 			return NULL;		/* placate compiler */
 	}
 
+	/*
+	 * We are going to create a new storage file. If the server crashes before
+	 * the current transaction ends, the file needs to be cleaned up. The
+	 * SMGR_MARK_UNCOMMITTED mark file prompts that work at the next startup.
+	 */
 	srel = smgropen(rnode, backend);
+	log_smgrcreatemark(&rnode, MAIN_FORKNUM, SMGR_MARK_UNCOMMITTED);
+	smgrcreatemark(srel, MAIN_FORKNUM, SMGR_MARK_UNCOMMITTED, false);
 	smgrcreate(srel, MAIN_FORKNUM, false);
 
 	if (needs_wal)
@@ -157,16 +183,30 @@ RelationCreateStorage(RelFileNode rnode, char relpersistence,
 	 */
 	if (register_delete)
 	{
-		PendingRelDelete *pending;
+		PendingRelDelete *pendingdel;
+		PendingCleanup *pendingclean;
 
-		pending = (PendingRelDelete *)
+		pendingdel = (PendingRelDelete *)
 			MemoryContextAlloc(TopMemoryContext, sizeof(PendingRelDelete));
-		pending->relnode = rnode;
-		pending->backend = backend;
-		pending->atCommit = false;	/* delete if abort */
-		pending->nestLevel = GetCurrentTransactionNestLevel();
-		pending->next = pendingDeletes;
-		pendingDeletes = pending;
+		pendingdel->relnode = rnode;
+		pendingdel->backend = backend;
+		pendingdel->atCommit = false;	/* delete if abort */
+		pendingdel->nestLevel = GetCurrentTransactionNestLevel();
+		pendingdel->next = pendingDeletes;
+		pendingDeletes = pendingdel;
+
+		/* drop mark files at commit */
+		pendingclean = (PendingCleanup *)
+			MemoryContextAlloc(TopMemoryContext, sizeof(PendingCleanup));
+		pendingclean->relnode = rnode;
+		pendingclean->op = PCOP_UNLINK_MARK;
+		pendingclean->unlink_forknum = MAIN_FORKNUM;
+		pendingclean->unlink_mark = SMGR_MARK_UNCOMMITTED;
+		pendingclean->backend = backend;
+		pendingclean->atCommit = true;
+		pendingclean->nestLevel = GetCurrentTransactionNestLevel();
+		pendingclean->next = pendingCleanups;
+		pendingCleanups = pendingclean;
 	}
 
 	if (relpersistence == RELPERSISTENCE_PERMANENT && !XLogIsNeeded())
@@ -178,6 +218,200 @@ RelationCreateStorage(RelFileNode rnode, char relpersistence,
 	return srel;
 }
 
+/*
+ * RelationCreateInitFork
+ *		Create physical storage for the init fork of a relation.
+ *
+ * Create the init fork for the relation.
+ *
+ * This function is transactional. The creation is WAL-logged, and if the
+ * transaction aborts later on, the init fork will be removed.
+ */
+void
+RelationCreateInitFork(Relation rel)
+{
+	RelFileNode rnode = rel->rd_node;
+	PendingCleanup *pending;
+	PendingCleanup *prev;
+	PendingCleanup *next;
+	SMgrRelation srel;
+	bool		create = true;
+
+	/* switch buffer persistence */
+	SetRelationBuffersPersistence(RelationGetSmgr(rel), false, false);
+
+	/*
+	 * If we have a pending-unlink for the init-fork of this relation, that
+	 * means the init-fork has existed since before the current transaction
+	 * started. This function reverts that change just by removing the entry.
+	 * See RelationDropInitFork.
+	 */
+	prev = NULL;
+	for (pending = pendingCleanups; pending != NULL; pending = next)
+	{
+		next = pending->next;
+
+		if (RelFileNodeEquals(rnode, pending->relnode) &&
+			pending->unlink_forknum == INIT_FORKNUM)
+		{
+			if (prev)
+				prev->next = next;
+			else
+				pendingCleanups = next;
+
+			pfree(pending);
+			/* prev does not change */
+
+			create = false;
+		}
+		else
+			prev = pending;
+	}
+
+	if (!create)
+		return;
+
+	/* create the init fork, along with the commit-sentinel file */
+	srel = smgropen(rnode, InvalidBackendId);
+	log_smgrcreatemark(&rnode, INIT_FORKNUM, SMGR_MARK_UNCOMMITTED);
+	smgrcreatemark(srel, INIT_FORKNUM, SMGR_MARK_UNCOMMITTED, false);
+
+	/* We don't have existing init fork, create it. */
+	smgrcreate(srel, INIT_FORKNUM, false);
+
+	/*
+	 * The init fork of an index needs further initialization. ambuildempty
+	 * is expected to WAL-log and sync the file by itself; otherwise we do that
+	 * ourselves.
+	 */
+	if (rel->rd_rel->relkind == RELKIND_INDEX)
+		rel->rd_indam->ambuildempty(rel);
+	else
+	{
+		log_smgrcreate(&rnode, INIT_FORKNUM);
+		smgrimmedsync(srel, INIT_FORKNUM);
+	}
+
+	/* drop the init fork, mark file and revert persistence at abort */
+	pending = (PendingCleanup *)
+		MemoryContextAlloc(TopMemoryContext, sizeof(PendingCleanup));
+	pending->relnode = rnode;
+	pending->op = PCOP_UNLINK_FORK | PCOP_UNLINK_MARK | PCOP_SET_PERSISTENCE;
+	pending->unlink_forknum = INIT_FORKNUM;
+	pending->unlink_mark = SMGR_MARK_UNCOMMITTED;
+	pending->bufpersistence = true;
+	pending->backend = InvalidBackendId;
+	pending->atCommit = false;
+	pending->nestLevel = GetCurrentTransactionNestLevel();
+	pending->next = pendingCleanups;
+	pendingCleanups = pending;
+
+	/* drop mark file at commit */
+	pending = (PendingCleanup *)
+		MemoryContextAlloc(TopMemoryContext, sizeof(PendingCleanup));
+	pending->relnode = rnode;
+	pending->op = PCOP_UNLINK_MARK;
+	pending->unlink_forknum = INIT_FORKNUM;
+	pending->unlink_mark = SMGR_MARK_UNCOMMITTED;
+	pending->backend = InvalidBackendId;
+	pending->atCommit = true;
+	pending->nestLevel = GetCurrentTransactionNestLevel();
+	pending->next = pendingCleanups;
+	pendingCleanups = pending;
+}
+
+/*
+ * RelationDropInitFork
+ *		Delete physical storage for the init fork of a relation.
+ *
+ * Register pending-delete of the init fork. The real deletion is performed by
+ * smgrDoPendingCleanups at commit.
+ *
+ * This function is transactional. If the transaction aborts later on, the
+ * deletion is canceled.
+ */
+void
+RelationDropInitFork(Relation rel)
+{
+	RelFileNode rnode = rel->rd_node;
+	PendingCleanup *pending;
+	PendingCleanup *prev;
+	PendingCleanup *next;
+	bool		inxact_created = false;
+
+	/* switch buffer persistence */
+	SetRelationBuffersPersistence(RelationGetSmgr(rel), true, false);
+
+	/*
+	 * If we have a pending-unlink for the init-fork of this relation, that
+	 * means the init fork is created in the current transaction.  We remove
+	 * both the init fork and mark file immediately in that case.  Otherwise
+	 * just register a pending-unlink for the existing init fork.  See
+	 * RelationCreateInitFork.
+	 */
+	prev = NULL;
+	for (pending = pendingCleanups; pending != NULL; pending = next)
+	{
+		next = pending->next;
+
+		if (RelFileNodeEquals(rnode, pending->relnode) &&
+			pending->unlink_forknum == INIT_FORKNUM)
+		{
+			/* unlink list entry */
+			if (prev)
+				prev->next = next;
+			else
+				pendingCleanups = next;
+
+			pfree(pending);
+			/* prev does not change */
+
+			inxact_created = true;
+		}
+		else
+			prev = pending;
+	}
+
+	if (inxact_created)
+	{
+		SMgrRelation srel = smgropen(rnode, InvalidBackendId);
+
+		/*
+		 * INIT forks are never loaded to shared buffer so no point in
+		 * dropping buffers for such files.
+		 */
+		log_smgrunlinkmark(&rnode, INIT_FORKNUM, SMGR_MARK_UNCOMMITTED);
+		smgrunlinkmark(srel, INIT_FORKNUM, SMGR_MARK_UNCOMMITTED, false);
+		log_smgrunlink(&rnode, INIT_FORKNUM);
+		smgrunlink(srel, INIT_FORKNUM, false);
+		return;
+	}
+
+	/* register drop of this init fork file at commit */
+	pending = (PendingCleanup *)
+		MemoryContextAlloc(TopMemoryContext, sizeof(PendingCleanup));
+	pending->relnode = rnode;
+	pending->op = PCOP_UNLINK_FORK;
+	pending->unlink_forknum = INIT_FORKNUM;
+	pending->backend = InvalidBackendId;
+	pending->atCommit = true;
+	pending->nestLevel = GetCurrentTransactionNestLevel();
+	pending->next = pendingCleanups;
+	pendingCleanups = pending;
+
+	/* revert buffer-persistence changes at abort */
+	pending = (PendingCleanup *)
+		MemoryContextAlloc(TopMemoryContext, sizeof(PendingCleanup));
+	pending->relnode = rnode;
+	pending->op = PCOP_SET_PERSISTENCE;
+	pending->bufpersistence = false;
+	pending->backend = InvalidBackendId;
+	pending->atCommit = false;
+	pending->nestLevel = GetCurrentTransactionNestLevel();
+	pending->next = pendingCleanups;
+	pendingCleanups = pending;
+}
+
 /*
  * Perform XLogInsert of an XLOG_SMGR_CREATE record to WAL.
  */
@@ -197,6 +431,88 @@ log_smgrcreate(const RelFileNode *rnode, ForkNumber forkNum)
 	XLogInsert(RM_SMGR_ID, XLOG_SMGR_CREATE | XLR_SPECIAL_REL_UPDATE);
 }
 
+/*
+ * Perform XLogInsert of an XLOG_SMGR_UNLINK record to WAL.
+ */
+void
+log_smgrunlink(const RelFileNode *rnode, ForkNumber forkNum)
+{
+	xl_smgr_unlink xlrec;
+
+	/*
+	 * Make an XLOG entry reporting the file unlink.
+	 */
+	xlrec.rnode = *rnode;
+	xlrec.forkNum = forkNum;
+
+	XLogBeginInsert();
+	XLogRegisterData((char *) &xlrec, sizeof(xlrec));
+	XLogInsert(RM_SMGR_ID, XLOG_SMGR_UNLINK | XLR_SPECIAL_REL_UPDATE);
+}
+
+/*
+ * Perform XLogInsert of an XLOG_SMGR_MARK record (mark creation) to WAL.
+ */
+void
+log_smgrcreatemark(const RelFileNode *rnode, ForkNumber forkNum,
+				   StorageMarks mark)
+{
+	xl_smgr_mark xlrec;
+
+	/*
+	 * Make an XLOG entry reporting the file creation.
+	 */
+	xlrec.rnode = *rnode;
+	xlrec.forkNum = forkNum;
+	xlrec.mark = mark;
+	xlrec.action = XLOG_SMGR_MARK_CREATE;
+
+	XLogBeginInsert();
+	XLogRegisterData((char *) &xlrec, sizeof(xlrec));
+	XLogInsert(RM_SMGR_ID, XLOG_SMGR_MARK | XLR_SPECIAL_REL_UPDATE);
+}
+
+/*
+ * Perform XLogInsert of an XLOG_SMGR_MARK record (mark removal) to WAL.
+ */
+void
+log_smgrunlinkmark(const RelFileNode *rnode, ForkNumber forkNum,
+				   StorageMarks mark)
+{
+	xl_smgr_mark xlrec;
+
+	/*
+	 * Make an XLOG entry reporting the mark file removal.
+	 */
+	xlrec.rnode = *rnode;
+	xlrec.forkNum = forkNum;
+	xlrec.mark = mark;
+	xlrec.action = XLOG_SMGR_MARK_UNLINK;
+
+	XLogBeginInsert();
+	XLogRegisterData((char *) &xlrec, sizeof(xlrec));
+	XLogInsert(RM_SMGR_ID, XLOG_SMGR_MARK | XLR_SPECIAL_REL_UPDATE);
+}
+
+/*
+ * Perform XLogInsert of an XLOG_SMGR_BUFPERSISTENCE record to WAL.
+ */
+void
+log_smgrbufpersistence(const RelFileNode *rnode, bool persistence)
+{
+	xl_smgr_bufpersistence xlrec;
+
+	/*
+	 * Make an XLOG entry reporting the change of buffer persistence.
+	 */
+	xlrec.rnode = *rnode;
+	xlrec.persistence = persistence;
+
+	XLogBeginInsert();
+	XLogRegisterData((char *) &xlrec, sizeof(xlrec));
+	XLogInsert(RM_SMGR_ID, XLOG_SMGR_BUFPERSISTENCE | XLR_SPECIAL_REL_UPDATE);
+}
+
 /*
  * RelationDropStorage
  *		Schedule unlinking of physical storage at transaction commit.
@@ -710,6 +1026,95 @@ smgrDoPendingDeletes(bool isCommit)
 	}
 }
 
+/*
+ *	smgrDoPendingCleanups() -- Clean up work that emits WAL records
+ *
+ *  The operations handled in this function emit WAL records, which must be
+ *  part of the current transaction.
+ */
+void
+smgrDoPendingCleanups(bool isCommit)
+{
+	int			nestLevel = GetCurrentTransactionNestLevel();
+	PendingCleanup *pending;
+	PendingCleanup *prev;
+	PendingCleanup *next;
+
+	prev = NULL;
+	for (pending = pendingCleanups; pending != NULL; pending = next)
+	{
+		next = pending->next;
+		if (pending->nestLevel < nestLevel)
+		{
+			/* outer-level entries should not be processed yet */
+			prev = pending;
+		}
+		else
+		{
+			/* unlink list entry first, so we don't retry on failure */
+			if (prev)
+				prev->next = next;
+			else
+				pendingCleanups = next;
+
+			/* do cleanup if called for */
+			if (pending->atCommit == isCommit)
+			{
+				SMgrRelation srel;
+
+				srel = smgropen(pending->relnode, pending->backend);
+
+				Assert((pending->op &
+						~(PCOP_UNLINK_FORK | PCOP_UNLINK_MARK |
+						  PCOP_SET_PERSISTENCE)) == 0);
+
+				if (pending->op & PCOP_SET_PERSISTENCE)
+				{
+					SetRelationBuffersPersistence(srel, pending->bufpersistence,
+												  InRecovery);
+				}
+
+				if (pending->op & PCOP_UNLINK_FORK)
+				{
+					/*
+					 * Unlink the fork file. Currently we use this only for
+					 * init forks and we're sure that the init fork is not
+					 * loaded on shared buffers.  For RelationDropInitFork
+					 * case, that function dropped those buffers. For
+					 * RelationCreateInitFork case, PCOP_SET_PERSISTENCE(true)
+					 * is set and the buffers have been dropped just before.
+					 */
+					Assert(pending->unlink_forknum == INIT_FORKNUM);
+
+					/* Don't emit WAL during recovery. */
+					if (!InRecovery)
+						log_smgrunlink(&pending->relnode,
+									   pending->unlink_forknum);
+					smgrunlink(srel, pending->unlink_forknum, false);
+				}
+
+				if (pending->op & PCOP_UNLINK_MARK)
+				{
+					SMgrRelation srel;
+
+					if (!InRecovery)
+						log_smgrunlinkmark(&pending->relnode,
+										   pending->unlink_forknum,
+										   pending->unlink_mark);
+					srel = smgropen(pending->relnode, pending->backend);
+					smgrunlinkmark(srel, pending->unlink_forknum,
+								   pending->unlink_mark, InRecovery);
+					smgrclose(srel);
+				}
+			}
+
+			/* must explicitly free the list entry */
+			pfree(pending);
+			/* prev does not change */
+		}
+	}
+}
+
 /*
  *	smgrDoPendingSyncs() -- Take care of relation syncs at end of xact.
  */
@@ -970,6 +1375,15 @@ smgr_redo(XLogReaderState *record)
 		reln = smgropen(xlrec->rnode, InvalidBackendId);
 		smgrcreate(reln, xlrec->forkNum, true);
 	}
+	else if (info == XLOG_SMGR_UNLINK)
+	{
+		xl_smgr_unlink *xlrec = (xl_smgr_unlink *) XLogRecGetData(record);
+		SMgrRelation reln;
+
+		reln = smgropen(xlrec->rnode, InvalidBackendId);
+		smgrunlink(reln, xlrec->forkNum, true);
+		smgrclose(reln);
+	}
 	else if (info == XLOG_SMGR_TRUNCATE)
 	{
 		xl_smgr_truncate *xlrec = (xl_smgr_truncate *) XLogRecGetData(record);
@@ -1058,6 +1472,124 @@ smgr_redo(XLogReaderState *record)
 
 		FreeFakeRelcacheEntry(rel);
 	}
+	else if (info == XLOG_SMGR_MARK)
+	{
+		xl_smgr_mark *xlrec = (xl_smgr_mark *) XLogRecGetData(record);
+		SMgrRelation reln;
+		PendingCleanup *pending;
+		bool		created = false;
+
+		reln = smgropen(xlrec->rnode, InvalidBackendId);
+
+		switch (xlrec->action)
+		{
+			case XLOG_SMGR_MARK_CREATE:
+				smgrcreatemark(reln, xlrec->forkNum, xlrec->mark, true);
+				created = true;
+				break;
+			case XLOG_SMGR_MARK_UNLINK:
+				smgrunlinkmark(reln, xlrec->forkNum, xlrec->mark, true);
+				break;
+			default:
+				elog(ERROR, "unknown smgr_mark action \"%c\"", xlrec->mark);
+		}
+
+		if (created)
+		{
+			/* revert mark file operation at abort */
+			pending = (PendingCleanup *)
+				MemoryContextAlloc(TopMemoryContext, sizeof(PendingCleanup));
+			pending->relnode = xlrec->rnode;
+			pending->op = PCOP_UNLINK_MARK;
+			pending->unlink_forknum = xlrec->forkNum;
+			pending->unlink_mark = xlrec->mark;
+			pending->backend = InvalidBackendId;
+			pending->atCommit = false;
+			pending->nestLevel = GetCurrentTransactionNestLevel();
+			pending->next = pendingCleanups;
+			pendingCleanups = pending;
+		}
+		else
+		{
+			/*
+			 * Delete pending action for this mark file if any. We should have
+			 * at most one entry for this action.
+			 */
+			PendingCleanup *prev = NULL;
+
+			for (pending = pendingCleanups; pending != NULL;
+				 pending = pending->next)
+			{
+				if (RelFileNodeEquals(xlrec->rnode, pending->relnode) &&
+					pending->unlink_forknum == xlrec->forkNum &&
+					(pending->op & PCOP_UNLINK_MARK) != 0)
+				{
+					if (prev)
+						prev->next = pending->next;
+					else
+						pendingCleanups = pending->next;
+
+					pfree(pending);
+					break;
+				}
+
+				prev = pending;
+			}
+		}
+	}
+	else if (info == XLOG_SMGR_BUFPERSISTENCE)
+	{
+		xl_smgr_bufpersistence *xlrec =
+		(xl_smgr_bufpersistence *) XLogRecGetData(record);
+		SMgrRelation reln;
+		PendingCleanup *pending;
+		PendingCleanup *prev = NULL;
+
+		reln = smgropen(xlrec->rnode, InvalidBackendId);
+		SetRelationBuffersPersistence(reln, xlrec->persistence, true);
+
+		/*
+		 * Delete pending action for persistence change if any. We should have
+		 * at most one entry for this action.
+		 */
+		for (pending = pendingCleanups; pending != NULL;
+			 pending = pending->next)
+		{
+			if (RelFileNodeEquals(xlrec->rnode, pending->relnode) &&
+				(pending->op & PCOP_SET_PERSISTENCE) != 0)
+			{
+				Assert(pending->bufpersistence == xlrec->persistence);
+
+				if (prev)
+					prev->next = pending->next;
+				else
+					pendingCleanups = pending->next;
+
+				pfree(pending);
+				break;
+			}
+
+			prev = pending;
+		}
+
+		/*
+		 * Revert buffer-persistence changes at abort if the relation is going
+		 * to different persistence from before this transaction.
+		 */
+		if (!pending)
+		{
+			pending = (PendingCleanup *)
+				MemoryContextAlloc(TopMemoryContext, sizeof(PendingCleanup));
+			pending->relnode = xlrec->rnode;
+			pending->op = PCOP_SET_PERSISTENCE;
+			pending->bufpersistence = !xlrec->persistence;
+			pending->backend = InvalidBackendId;
+			pending->atCommit = false;
+			pending->nestLevel = GetCurrentTransactionNestLevel();
+			pending->next = pendingCleanups;
+			pendingCleanups = pending;
+		}
+	}
 	else
 		elog(PANIC, "smgr_redo: unknown op code %u", info);
 }
diff --git a/src/backend/commands/tablecmds.c b/src/backend/commands/tablecmds.c
index 51b4a00d50..71aaf3320a 100644
--- a/src/backend/commands/tablecmds.c
+++ b/src/backend/commands/tablecmds.c
@@ -55,6 +55,7 @@
 #include "commands/defrem.h"
 #include "commands/event_trigger.h"
 #include "commands/policy.h"
+#include "commands/progress.h"
 #include "commands/sequence.h"
 #include "commands/tablecmds.h"
 #include "commands/tablespace.h"
@@ -5374,6 +5375,187 @@ ATParseTransformCmd(List **wqueue, AlteredTableInfo *tab, Relation rel,
 	return newcmd;
 }
 
+/*
+ * RelationChangePersistence: do in-place persistence change of a relation
+ */
+static void
+RelationChangePersistence(AlteredTableInfo *tab, char persistence,
+						  LOCKMODE lockmode)
+{
+	Relation	rel;
+	Relation	classRel;
+	HeapTuple	tuple,
+				newtuple;
+	Datum		new_val[Natts_pg_class];
+	bool		new_null[Natts_pg_class],
+				new_repl[Natts_pg_class];
+	int			i;
+	List	   *relids;
+	ListCell   *lc_oid;
+
+	Assert(tab->rewrite == AT_REWRITE_ALTER_PERSISTENCE);
+	Assert(lockmode == AccessExclusiveLock);
+
+	/*
+	 * If any of the following were set, we would need to call ATRewriteTable.
+	 * That cannot happen in the AT_REWRITE_ALTER_PERSISTENCE case.
+	 */
+	Assert(tab->constraints == NULL && tab->partition_constraint == NULL &&
+		   tab->newvals == NULL && !tab->verify_new_notnull);
+
+	rel = table_open(tab->relid, lockmode);
+
+	Assert(rel->rd_rel->relpersistence != persistence);
+
+	elog(DEBUG1, "perform in-place persistence change");
+
+	/*
+	 * First, collect all relations whose persistence we need to change.
+	 */
+
+	/* Collect OIDs of the relation itself and its indexes */
+	relids = RelationGetIndexList(rel);
+	relids = lcons_oid(rel->rd_id, relids);
+
+	/* Add toast relation if any */
+	if (OidIsValid(rel->rd_rel->reltoastrelid))
+	{
+		List	   *toastidx;
+		Relation	toastrel = table_open(rel->rd_rel->reltoastrelid, lockmode);
+
+		relids = lappend_oid(relids, rel->rd_rel->reltoastrelid);
+		toastidx = RelationGetIndexList(toastrel);
+		relids = list_concat(relids, toastidx);
+		pfree(toastidx);
+		table_close(toastrel, NoLock);
+	}
+
+	table_close(rel, NoLock);
+
+	/* Make changes in storage */
+	classRel = table_open(RelationRelationId, RowExclusiveLock);
+
+	foreach(lc_oid, relids)
+	{
+		Oid			reloid = lfirst_oid(lc_oid);
+		Relation	r = relation_open(reloid, lockmode);
+
+		/*
+		 * XXXX: Some access methods do not support an in-place persistence
+		 * change. Specifically, GiST uses page LSNs to figure out whether a
+		 * block has changed, where UNLOGGED GiST indexes use fake LSNs that
+		 * are incompatible with real LSNs used for LOGGED ones.
+		 *
+		 * Maybe if gistGetFakeLSN behaved the same way for permanent and
+		 * unlogged indexes, we could skip index rebuild in exchange for some
+		 * extra WAL records emitted while it is unlogged.
+		 *
+		 * Check relam against a positive list so that indexes of unknown AMs
+		 * are rebuilt as well.
+		 */
+		if (r->rd_rel->relkind == RELKIND_INDEX &&
+		/* GiST is excluded */
+			r->rd_rel->relam != BTREE_AM_OID &&
+			r->rd_rel->relam != HASH_AM_OID &&
+			r->rd_rel->relam != GIN_AM_OID &&
+			r->rd_rel->relam != SPGIST_AM_OID &&
+			r->rd_rel->relam != BRIN_AM_OID)
+		{
+			int			reindex_flags;
+			ReindexParams params = {0};
+
+			/* reindex doesn't allow concurrent use of the index */
+			relation_close(r, NoLock);
+
+			reindex_flags =
+				REINDEX_REL_SUPPRESS_INDEX_USE |
+				REINDEX_REL_CHECK_CONSTRAINTS;
+
+			/* Set the same persistence as the parent relation. */
+			if (persistence == RELPERSISTENCE_UNLOGGED)
+				reindex_flags |= REINDEX_REL_FORCE_INDEXES_UNLOGGED;
+			else
+				reindex_flags |= REINDEX_REL_FORCE_INDEXES_PERMANENT;
+
+			reindex_index(reloid, reindex_flags, persistence, &params);
+
+			continue;
+		}
+
+		/* Create or drop init fork */
+		if (persistence == RELPERSISTENCE_UNLOGGED)
+			RelationCreateInitFork(r);
+		else
+			RelationDropInitFork(r);
+
+		/*
+		 * When this relation gets WAL-logged, immediately sync all files but
+		 * the init fork to establish the initial state on storage.  Buffers
+		 * have already been flushed out by RelationCreate(Drop)InitFork called
+		 * just above. The init fork should have been synced as needed.
+		 */
+		if (persistence == RELPERSISTENCE_PERMANENT)
+		{
+			for (i = 0; i < INIT_FORKNUM; i++)
+			{
+				if (smgrexists(RelationGetSmgr(r), i))
+					smgrimmedsync(RelationGetSmgr(r), i);
+			}
+		}
+
+		/* Update catalog */
+		tuple = SearchSysCacheCopy1(RELOID, ObjectIdGetDatum(reloid));
+		if (!HeapTupleIsValid(tuple))
+			elog(ERROR, "cache lookup failed for relation %u", reloid);
+
+		memset(new_val, 0, sizeof(new_val));
+		memset(new_null, false, sizeof(new_null));
+		memset(new_repl, false, sizeof(new_repl));
+
+		new_val[Anum_pg_class_relpersistence - 1] = CharGetDatum(persistence);
+		new_null[Anum_pg_class_relpersistence - 1] = false;
+		new_repl[Anum_pg_class_relpersistence - 1] = true;
+
+		newtuple = heap_modify_tuple(tuple, RelationGetDescr(classRel),
+									 new_val, new_null, new_repl);
+
+		CatalogTupleUpdate(classRel, &newtuple->t_self, newtuple);
+		heap_freetuple(newtuple);
+
+		/*
+		 * While wal_level >= replica, switching to LOGGED requires the
+		 * relation content to be WAL-logged to recover the table. We don't
+		 * emit this if wal_level = minimal.
+		 */
+		if (persistence == RELPERSISTENCE_PERMANENT && XLogIsNeeded())
+		{
+			ForkNumber	fork;
+			xl_smgr_truncate xlrec;
+
+			xlrec.blkno = 0;
+			xlrec.rnode = r->rd_node;
+			xlrec.flags = SMGR_TRUNCATE_ALL;
+
+			XLogBeginInsert();
+			XLogRegisterData((char *) &xlrec, sizeof(xlrec));
+
+			XLogInsert(RM_SMGR_ID, XLOG_SMGR_TRUNCATE | XLR_SPECIAL_REL_UPDATE);
+
+			for (fork = 0; fork < INIT_FORKNUM; fork++)
+			{
+				if (smgrexists(RelationGetSmgr(r), fork))
+					log_newpage_range(r, fork, 0,
+									  smgrnblocks(RelationGetSmgr(r), fork),
+									  false);
+			}
+		}
+
+		relation_close(r, NoLock);
+	}
+
+	table_close(classRel, NoLock);
+}
+
 /*
  * ATRewriteTables: ALTER TABLE phase 3
  */
@@ -5504,47 +5686,55 @@ ATRewriteTables(AlterTableStmt *parsetree, List **wqueue, LOCKMODE lockmode,
 										 tab->relid,
 										 tab->rewrite);
 
-			/*
-			 * Create transient table that will receive the modified data.
-			 *
-			 * Ensure it is marked correctly as logged or unlogged.  We have
-			 * to do this here so that buffers for the new relfilenode will
-			 * have the right persistence set, and at the same time ensure
-			 * that the original filenode's buffers will get read in with the
-			 * correct setting (i.e. the original one).  Otherwise a rollback
-			 * after the rewrite would possibly result with buffers for the
-			 * original filenode having the wrong persistence setting.
-			 *
-			 * NB: This relies on swap_relation_files() also swapping the
-			 * persistence. That wouldn't work for pg_class, but that can't be
-			 * unlogged anyway.
-			 */
-			OIDNewHeap = make_new_heap(tab->relid, NewTableSpace, NewAccessMethod,
-									   persistence, lockmode);
+			if (tab->rewrite == AT_REWRITE_ALTER_PERSISTENCE)
+				RelationChangePersistence(tab, persistence, lockmode);
+			else
+			{
+				/*
+				 * Create transient table that will receive the modified data.
+				 *
+				 * Ensure it is marked correctly as logged or unlogged.  We
+				 * have to do this here so that buffers for the new
+				 * relfilenode will have the right persistence set, and at the
+				 * same time ensure that the original filenode's buffers will
+				 * get read in with the correct setting (i.e. the original
+				 * one).  Otherwise a rollback after the rewrite would
+				 * possibly result with buffers for the original filenode
+				 * having the wrong persistence setting.
+				 *
+				 * NB: This relies on swap_relation_files() also swapping the
+				 * persistence. That wouldn't work for pg_class, but that
+				 * can't be unlogged anyway.
+				 */
+				OIDNewHeap = make_new_heap(tab->relid, NewTableSpace,
+										   NewAccessMethod,
+										   persistence, lockmode);
 
-			/*
-			 * Copy the heap data into the new table with the desired
-			 * modifications, and test the current data within the table
-			 * against new constraints generated by ALTER TABLE commands.
-			 */
-			ATRewriteTable(tab, OIDNewHeap, lockmode);
+				/*
+				 * Copy the heap data into the new table with the desired
+				 * modifications, and test the current data within the table
+				 * against new constraints generated by ALTER TABLE commands.
+				 */
+				ATRewriteTable(tab, OIDNewHeap, lockmode);
 
-			/*
-			 * Swap the physical files of the old and new heaps, then rebuild
-			 * indexes and discard the old heap.  We can use RecentXmin for
-			 * the table's new relfrozenxid because we rewrote all the tuples
-			 * in ATRewriteTable, so no older Xid remains in the table.  Also,
-			 * we never try to swap toast tables by content, since we have no
-			 * interest in letting this code work on system catalogs.
-			 */
-			finish_heap_swap(tab->relid, OIDNewHeap,
-							 false, false, true,
-							 !OidIsValid(tab->newTableSpace),
-							 RecentXmin,
-							 ReadNextMultiXactId(),
-							 persistence);
+				/*
+				 * Swap the physical files of the old and new heaps, then
+				 * rebuild indexes and discard the old heap.  We can use
+				 * RecentXmin for the table's new relfrozenxid because we
+				 * rewrote all the tuples in ATRewriteTable, so no older Xid
+				 * remains in the table.  Also, we never try to swap toast
+				 * tables by content, since we have no interest in letting
+				 * this code work on system catalogs.
+				 */
+				finish_heap_swap(tab->relid, OIDNewHeap,
+								 false, false, true,
+								 !OidIsValid(tab->newTableSpace),
+								 RecentXmin,
+								 ReadNextMultiXactId(),
+								 persistence);
 
-			InvokeObjectPostAlterHook(RelationRelationId, tab->relid, 0);
+				InvokeObjectPostAlterHook(RelationRelationId, tab->relid, 0);
+			}
 		}
 		else
 		{
diff --git a/src/backend/replication/basebackup.c b/src/backend/replication/basebackup.c
index 6884cad2c0..c67bae34f5 100644
--- a/src/backend/replication/basebackup.c
+++ b/src/backend/replication/basebackup.c
@@ -1188,6 +1188,7 @@ sendDir(bbsink *sink, const char *path, int basepathlen, bool sizeonly,
 		bool		excludeFound;
 		ForkNumber	relForkNum; /* Type of fork if file is a relation */
 		int			relOidChars;	/* Chars in filename that are the rel oid */
+		StorageMarks mark;
 
 		/* Skip special stuff */
 		if (strcmp(de->d_name, ".") == 0 || strcmp(de->d_name, "..") == 0)
@@ -1238,7 +1239,7 @@ sendDir(bbsink *sink, const char *path, int basepathlen, bool sizeonly,
 		/* Exclude all forks for unlogged tables except the init fork */
 		if (isDbDir &&
 			parse_filename_for_nontemp_relation(de->d_name, &relOidChars,
-												&relForkNum))
+												&relForkNum, &mark))
 		{
 			/* Never exclude init forks */
 			if (relForkNum != INIT_FORKNUM)
diff --git a/src/backend/storage/buffer/bufmgr.c b/src/backend/storage/buffer/bufmgr.c
index d73a40c1bc..01974b71d2 100644
--- a/src/backend/storage/buffer/bufmgr.c
+++ b/src/backend/storage/buffer/bufmgr.c
@@ -3159,6 +3159,91 @@ DropRelFileNodeBuffers(SMgrRelation smgr_reln, ForkNumber *forkNum,
 	}
 }
 
+/* ---------------------------------------------------------------------
+ *		SetRelationBuffersPersistence
+ *
+ *		This function changes the persistence of all buffer pages of a relation
+ *		and, when switching to PERMANENT, writes all dirty pages of the
+ *		relation out to disk (or more accurately, out to kernel disk buffers),
+ *		ensuring that the kernel has an up-to-date view of the relation.
+ *
+ *		Generally, the caller should be holding AccessExclusiveLock on the
+ *		target relation to ensure that no other backend is busy dirtying
+ *		more blocks of the relation; the effects can't be expected to last
+ *		after the lock is released.
+ *
+ *		XXX currently it sequentially searches the buffer pool, should be
+ *		changed to more clever ways of searching.  This routine is not
+ *		used in any performance-critical code paths, so it's not worth
+ *		adding additional overhead to normal paths to make it go faster;
+ *		but see also DropRelFileNodeBuffers.
+ * --------------------------------------------------------------------
+ */
+void
+SetRelationBuffersPersistence(SMgrRelation srel, bool permanent, bool isRedo)
+{
+	int			i;
+	RelFileNodeBackend rnode = srel->smgr_rnode;
+
+	Assert(!RelFileNodeBackendIsTemp(rnode));
+
+	if (!isRedo)
+		log_smgrbufpersistence(&srel->smgr_rnode.node, permanent);
+
+	ResourceOwnerEnlargeBuffers(CurrentResourceOwner);
+
+	for (i = 0; i < NBuffers; i++)
+	{
+		BufferDesc *bufHdr = GetBufferDescriptor(i);
+		uint32		buf_state;
+
+		if (!RelFileNodeEquals(bufHdr->tag.rnode, rnode.node))
+			continue;
+
+		ReservePrivateRefCountEntry();
+
+		buf_state = LockBufHdr(bufHdr);
+
+		if (!RelFileNodeEquals(bufHdr->tag.rnode, rnode.node))
+		{
+			UnlockBufHdr(bufHdr, buf_state);
+			continue;
+		}
+
+		if (permanent)
+		{
+			/* Init fork is being dropped, drop buffers for it. */
+			if (bufHdr->tag.forkNum == INIT_FORKNUM)
+			{
+				InvalidateBuffer(bufHdr);
+				continue;
+			}
+
+			buf_state |= BM_PERMANENT;
+			pg_atomic_write_u32(&bufHdr->state, buf_state);
+
+			/* we flush this buffer when switching to PERMANENT */
+			if ((buf_state & (BM_VALID | BM_DIRTY)) == (BM_VALID | BM_DIRTY))
+			{
+				PinBuffer_Locked(bufHdr);
+				LWLockAcquire(BufferDescriptorGetContentLock(bufHdr),
+							  LW_SHARED);
+				FlushBuffer(bufHdr, srel);
+				LWLockRelease(BufferDescriptorGetContentLock(bufHdr));
+				UnpinBuffer(bufHdr, true);
+			}
+			else
+				UnlockBufHdr(bufHdr, buf_state);
+		}
+		else
+		{
+			/* There shouldn't be an init fork */
+			Assert(bufHdr->tag.forkNum != INIT_FORKNUM);
+			UnlockBufHdr(bufHdr, buf_state);
+		}
+	}
+}
+
 /* ---------------------------------------------------------------------
  *		DropRelFileNodesAllBuffers
  *
diff --git a/src/backend/storage/file/fd.c b/src/backend/storage/file/fd.c
index 14b77f2861..2fc9f17c28 100644
--- a/src/backend/storage/file/fd.c
+++ b/src/backend/storage/file/fd.c
@@ -349,8 +349,6 @@ static void pre_sync_fname(const char *fname, bool isdir, int elevel);
 static void datadir_fsync_fname(const char *fname, bool isdir, int elevel);
 static void unlink_if_exists_fname(const char *fname, bool isdir, int elevel);
 
-static int	fsync_parent_path(const char *fname, int elevel);
-
 
 /*
  * pg_fsync --- do fsync with or without writethrough
@@ -3759,7 +3757,7 @@ fsync_fname_ext(const char *fname, bool isdir, bool ignore_perm, int elevel)
  * This is aimed at making file operations persistent on disk in case of
  * an OS crash or power failure.
  */
-static int
+int
 fsync_parent_path(const char *fname, int elevel)
 {
 	char		parentpath[MAXPGPATH];
diff --git a/src/backend/storage/file/reinit.c b/src/backend/storage/file/reinit.c
index f053fe0495..fa4b1c0e6e 100644
--- a/src/backend/storage/file/reinit.c
+++ b/src/backend/storage/file/reinit.c
@@ -16,29 +16,49 @@
 
 #include <unistd.h>
 
+#include "access/xlogrecovery.h"
+#include "catalog/pg_tablespace_d.h"
 #include "common/relpath.h"
 #include "postmaster/startup.h"
+#include "storage/bufmgr.h"
 #include "storage/copydir.h"
 #include "storage/fd.h"
+#include "storage/md.h"
 #include "storage/reinit.h"
+#include "storage/smgr.h"
 #include "utils/hsearch.h"
 #include "utils/memutils.h"
 
 static void ResetUnloggedRelationsInTablespaceDir(const char *tsdirname,
-												  int op);
+												  Oid tspid, int op);
 static void ResetUnloggedRelationsInDbspaceDir(const char *dbspacedirname,
-											   int op);
+											   Oid tspid, Oid dbid, int op);
 
 typedef struct
 {
 	Oid			reloid;			/* hash key */
-} unlogged_relation_entry;
+	bool		has_init;		/* has INIT fork */
+	bool		dirty_init;		/* needs to remove INIT fork */
+	bool		dirty_all;		/* needs to remove all forks */
+} relfile_entry;
 
 /*
- * Reset unlogged relations from before the last restart.
+ * Clean up and reset relation files from before the last restart.
  *
- * If op includes UNLOGGED_RELATION_CLEANUP, we remove all forks of any
- * relation with an "init" fork, except for the "init" fork itself.
+ * If op includes UNLOGGED_RELATION_CLEANUP, we perform different operations
+ * depending on the existence of the "cleanup" forks.
+ *
+ * If an SMGR_MARK_UNCOMMITTED mark file for the init fork is present, we
+ * remove the init fork along with the mark file.
+ *
+ * If an SMGR_MARK_UNCOMMITTED mark file for the main fork is present, we
+ * remove the whole relation along with the mark file.
+ *
+ * Otherwise, if the "init" fork is found, we remove all forks of any relation
+ * with the "init" fork, except for the "init" fork itself.
+ *
+ * If op includes UNLOGGED_RELATION_DROP_BUFFER, we drop all buffers for all
+ * relations that have the "cleanup" and/or the "init" forks.
  *
  * If op includes UNLOGGED_RELATION_INIT, we copy the "init" fork to the main
  * fork.
@@ -72,7 +92,7 @@ ResetUnloggedRelations(int op)
 	/*
 	 * First process unlogged files in pg_default ($PGDATA/base)
 	 */
-	ResetUnloggedRelationsInTablespaceDir("base", op);
+	ResetUnloggedRelationsInTablespaceDir("base", DEFAULTTABLESPACE_OID, op);
 
 	/*
 	 * Cycle through directories for all non-default tablespaces.
@@ -81,13 +101,19 @@ ResetUnloggedRelations(int op)
 
 	while ((spc_de = ReadDir(spc_dir, "pg_tblspc")) != NULL)
 	{
+		Oid			tspid;
+
 		if (strcmp(spc_de->d_name, ".") == 0 ||
 			strcmp(spc_de->d_name, "..") == 0)
 			continue;
 
 		snprintf(temp_path, sizeof(temp_path), "pg_tblspc/%s/%s",
 				 spc_de->d_name, TABLESPACE_VERSION_DIRECTORY);
-		ResetUnloggedRelationsInTablespaceDir(temp_path, op);
+
+		tspid = atooid(spc_de->d_name);
+
+		Assert(tspid != 0);
+		ResetUnloggedRelationsInTablespaceDir(temp_path, tspid, op);
 	}
 
 	FreeDir(spc_dir);
@@ -103,7 +129,8 @@ ResetUnloggedRelations(int op)
  * Process one tablespace directory for ResetUnloggedRelations
  */
 static void
-ResetUnloggedRelationsInTablespaceDir(const char *tsdirname, int op)
+ResetUnloggedRelationsInTablespaceDir(const char *tsdirname,
+									  Oid tspid, int op)
 {
 	DIR		   *ts_dir;
 	struct dirent *de;
@@ -130,6 +157,8 @@ ResetUnloggedRelationsInTablespaceDir(const char *tsdirname, int op)
 
 	while ((de = ReadDir(ts_dir, tsdirname)) != NULL)
 	{
+		Oid			dbid;
+
 		/*
 		 * We're only interested in the per-database directories, which have
 		 * numeric names.  Note that this code will also (properly) ignore "."
@@ -148,7 +177,10 @@ ResetUnloggedRelationsInTablespaceDir(const char *tsdirname, int op)
 			ereport_startup_progress("resetting unlogged relations (cleanup), elapsed time: %ld.%02d s, current path: %s",
 									 dbspace_path);
 
-		ResetUnloggedRelationsInDbspaceDir(dbspace_path, op);
+		dbid = atooid(de->d_name);
+		Assert(dbid != 0);
+
+		ResetUnloggedRelationsInDbspaceDir(dbspace_path, tspid, dbid, op);
 	}
 
 	FreeDir(ts_dir);
@@ -158,125 +190,227 @@ ResetUnloggedRelationsInTablespaceDir(const char *tsdirname, int op)
  * Process one per-dbspace directory for ResetUnloggedRelations
  */
 static void
-ResetUnloggedRelationsInDbspaceDir(const char *dbspacedirname, int op)
+ResetUnloggedRelationsInDbspaceDir(const char *dbspacedirname,
+								   Oid tspid, Oid dbid, int op)
 {
 	DIR		   *dbspace_dir;
 	struct dirent *de;
 	char		rm_path[MAXPGPATH * 2];
+	HTAB	   *hash;
+	HASHCTL		ctl;
 
 	/* Caller must specify at least one operation. */
-	Assert((op & (UNLOGGED_RELATION_CLEANUP | UNLOGGED_RELATION_INIT)) != 0);
+	Assert((op & (UNLOGGED_RELATION_CLEANUP |
+				  UNLOGGED_RELATION_DROP_BUFFER |
+				  UNLOGGED_RELATION_INIT)) != 0);
 
 	/*
 	 * Cleanup is a two-pass operation.  First, we go through and identify all
 	 * the files with init forks.  Then, we go through again and nuke
 	 * everything with the same OID except the init fork.
 	 */
+
+	/*
+	 * It's possible that someone could create tons of unlogged relations in
+	 * the same database & tablespace, so we'd better use a hash table rather
+	 * than an array or linked list to keep track of which files need to be
+	 * reset.  Otherwise, this cleanup operation would be O(n^2).
+	 */
+	memset(&ctl, 0, sizeof(ctl));
+	ctl.keysize = sizeof(Oid);
+	ctl.entrysize = sizeof(relfile_entry);
+	hash = hash_create("relfilenode cleanup hash",
+					   32, &ctl, HASH_ELEM | HASH_BLOBS);
+
+	/* Collect INIT fork and mark files in the directory. */
+	dbspace_dir = AllocateDir(dbspacedirname);
+	while ((de = ReadDir(dbspace_dir, dbspacedirname)) != NULL)
+	{
+		int			oidchars;
+		ForkNumber	forkNum;
+		StorageMarks mark;
+
+		/* Skip anything that doesn't look like a relation data file. */
+		if (!parse_filename_for_nontemp_relation(de->d_name, &oidchars,
+												 &forkNum, &mark))
+			continue;
+
+		if (forkNum == INIT_FORKNUM || mark == SMGR_MARK_UNCOMMITTED)
+		{
+			Oid			key;
+			relfile_entry *ent;
+			bool		found;
+
+			/*
+			 * Record the relfilenode information. If it has
+			 * SMGR_MARK_UNCOMMITTED mark files, the relfilenode is in a
+			 * dirty state and needs cleanup.
+			 */
+			key = atooid(de->d_name);
+			ent = hash_search(hash, &key, HASH_ENTER, &found);
+
+			if (!found)
+			{
+				ent->has_init = false;
+				ent->dirty_init = false;
+				ent->dirty_all = false;
+			}
+
+			if (forkNum == INIT_FORKNUM && mark == SMGR_MARK_UNCOMMITTED)
+				ent->dirty_init = true;
+			else if (forkNum == MAIN_FORKNUM && mark == SMGR_MARK_UNCOMMITTED)
+				ent->dirty_all = true;
+			else
+			{
+				Assert(forkNum == INIT_FORKNUM);
+				ent->has_init = true;
+			}
+		}
+	}
+
+	/* Done with the first pass. */
+	FreeDir(dbspace_dir);
+
+	/* Nothing to do if we found neither init forks nor mark files. */
+	if (hash_get_num_entries(hash) < 1)
+	{
+		hash_destroy(hash);
+		return;
+	}
+
+	if ((op & UNLOGGED_RELATION_DROP_BUFFER) != 0)
+	{
+		/*
+		 * When we come here after recovery, an smgr object for this file
+		 * might have been created. In that case we need to drop all buffers
+		 * and then the smgr object before initializing the unlogged relation.
+		 * This is safe as long as no other backend has accessed the relation
+		 * before starting archive recovery.
+		 */
+		HASH_SEQ_STATUS status;
+		relfile_entry *ent;
+		SMgrRelation *srels = palloc(sizeof(SMgrRelation) * 8);
+		int			maxrels = 8;
+		int			nrels = 0;
+		int			i;
+
+		Assert(!HotStandbyActive());
+
+		hash_seq_init(&status, hash);
+		while ((ent = (relfile_entry *) hash_seq_search(&status)) != NULL)
+		{
+			RelFileNodeBackend rel;
+
+			/*
+			 * The relation is persistent and stays persistent. Don't drop the
+			 * buffers for this relation.
+			 */
+			if (ent->has_init && ent->dirty_init)
+				continue;
+
+			if (maxrels <= nrels)
+			{
+				maxrels *= 2;
+				srels = repalloc(srels, sizeof(SMgrRelation) * maxrels);
+			}
+
+			rel.backend = InvalidBackendId;
+			rel.node.spcNode = tspid;
+			rel.node.dbNode = dbid;
+			rel.node.relNode = ent->reloid;
+
+			srels[nrels++] = smgropen(rel.node, InvalidBackendId);
+		}
+
+		DropRelFileNodesAllBuffers(srels, nrels);
+
+		for (i = 0; i < nrels; i++)
+			smgrclose(srels[i]);
+	}
+
+	/*
+	 * Now, make a second pass and remove anything that matches.
+	 */
 	if ((op & UNLOGGED_RELATION_CLEANUP) != 0)
 	{
-		HTAB	   *hash;
-		HASHCTL		ctl;
-
-		/*
-		 * It's possible that someone could create a ton of unlogged relations
-		 * in the same database & tablespace, so we'd better use a hash table
-		 * rather than an array or linked list to keep track of which files
-		 * need to be reset.  Otherwise, this cleanup operation would be
-		 * O(n^2).
-		 */
-		ctl.keysize = sizeof(Oid);
-		ctl.entrysize = sizeof(unlogged_relation_entry);
-		ctl.hcxt = CurrentMemoryContext;
-		hash = hash_create("unlogged relation OIDs", 32, &ctl,
-						   HASH_ELEM | HASH_BLOBS | HASH_CONTEXT);
-
-		/* Scan the directory. */
 		dbspace_dir = AllocateDir(dbspacedirname);
 		while ((de = ReadDir(dbspace_dir, dbspacedirname)) != NULL)
 		{
 			ForkNumber	forkNum;
+			StorageMarks mark;
 			int			oidchars;
-			unlogged_relation_entry ent;
+			Oid			key;
+			relfile_entry *ent;
+			RelFileNodeBackend rel;
 
 			/* Skip anything that doesn't look like a relation data file. */
 			if (!parse_filename_for_nontemp_relation(de->d_name, &oidchars,
-													 &forkNum))
-				continue;
-
-			/* Also skip it unless this is the init fork. */
-			if (forkNum != INIT_FORKNUM)
-				continue;
-
-			/*
-			 * Put the OID portion of the name into the hash table, if it
-			 * isn't already.
-			 */
-			ent.reloid = atooid(de->d_name);
-			(void) hash_search(hash, &ent, HASH_ENTER, NULL);
-		}
-
-		/* Done with the first pass. */
-		FreeDir(dbspace_dir);
-
-		/*
-		 * If we didn't find any init forks, there's no point in continuing;
-		 * we can bail out now.
-		 */
-		if (hash_get_num_entries(hash) == 0)
-		{
-			hash_destroy(hash);
-			return;
-		}
-
-		/*
-		 * Now, make a second pass and remove anything that matches.
-		 */
-		dbspace_dir = AllocateDir(dbspacedirname);
-		while ((de = ReadDir(dbspace_dir, dbspacedirname)) != NULL)
-		{
-			ForkNumber	forkNum;
-			int			oidchars;
-			unlogged_relation_entry ent;
-
-			/* Skip anything that doesn't look like a relation data file. */
-			if (!parse_filename_for_nontemp_relation(de->d_name, &oidchars,
-													 &forkNum))
-				continue;
-
-			/* We never remove the init fork. */
-			if (forkNum == INIT_FORKNUM)
+													 &forkNum, &mark))
 				continue;
 
 			/*
 			 * See whether the OID portion of the name shows up in the hash
 			 * table.  If so, nuke it!
 			 */
-			ent.reloid = atooid(de->d_name);
-			if (hash_search(hash, &ent, HASH_FIND, NULL))
+			key = atooid(de->d_name);
+			ent = hash_search(hash, &key, HASH_FIND, NULL);
+
+			if (!ent)
+				continue;
+
+			if (!ent->dirty_all)
 			{
-				snprintf(rm_path, sizeof(rm_path), "%s/%s",
-						 dbspacedirname, de->d_name);
-				if (unlink(rm_path) < 0)
-					ereport(ERROR,
-							(errcode_for_file_access(),
-							 errmsg("could not remove file \"%s\": %m",
-									rm_path)));
+				/* clean permanent relations don't need cleanup */
+				if (!ent->has_init)
+					continue;
+
+				if (ent->dirty_init)
+				{
+					/*
+					 * The crashed transaction did SET UNLOGGED. This relation
+					 * is restored to a LOGGED relation.
+					 */
+					if (forkNum != INIT_FORKNUM)
+						continue;
+				}
 				else
-					elog(DEBUG2, "unlinked file \"%s\"", rm_path);
+				{
+					/*
+					 * we don't remove the INIT fork of a non-dirty
+					 * relfilenode
+					 */
+					if (forkNum == INIT_FORKNUM && mark == SMGR_MARK_NONE)
+						continue;
+				}
 			}
+
+			/* so, nuke it! */
+			snprintf(rm_path, sizeof(rm_path), "%s/%s",
+					 dbspacedirname, de->d_name);
+			if (unlink(rm_path) < 0)
+				ereport(ERROR,
+						errcode_for_file_access(),
+						errmsg("could not remove file \"%s\": %m",
+							   rm_path));
+
+			rel.backend = InvalidBackendId;
+			rel.node.spcNode = tspid;
+			rel.node.dbNode = dbid;
+			rel.node.relNode = atooid(de->d_name);
+
+			ForgetRelationForkSyncRequests(rel, forkNum);
 		}
 
 		/* Cleanup is complete. */
 		FreeDir(dbspace_dir);
-		hash_destroy(hash);
 	}
 
+	hash_destroy(hash);
+	hash = NULL;
+
 	/*
 	 * Initialization happens after cleanup is complete: we copy each init
-	 * fork file to the corresponding main fork file.  Note that if we are
-	 * asked to do both cleanup and init, we may never get here: if the
-	 * cleanup code determines that there are no init forks in this dbspace,
-	 * it will return before we get to this point.
+	 * fork file to the corresponding main fork file.
 	 */
 	if ((op & UNLOGGED_RELATION_INIT) != 0)
 	{
@@ -285,6 +419,7 @@ ResetUnloggedRelationsInDbspaceDir(const char *dbspacedirname, int op)
 		while ((de = ReadDir(dbspace_dir, dbspacedirname)) != NULL)
 		{
 			ForkNumber	forkNum;
+			StorageMarks mark;
 			int			oidchars;
 			char		oidbuf[OIDCHARS + 1];
 			char		srcpath[MAXPGPATH * 2];
@@ -292,9 +427,11 @@ ResetUnloggedRelationsInDbspaceDir(const char *dbspacedirname, int op)
 
 			/* Skip anything that doesn't look like a relation data file. */
 			if (!parse_filename_for_nontemp_relation(de->d_name, &oidchars,
-													 &forkNum))
+													 &forkNum, &mark))
 				continue;
 
+			Assert(mark == SMGR_MARK_NONE);
+
 			/* Also skip it unless this is the init fork. */
 			if (forkNum != INIT_FORKNUM)
 				continue;
@@ -328,15 +465,18 @@ ResetUnloggedRelationsInDbspaceDir(const char *dbspacedirname, int op)
 		while ((de = ReadDir(dbspace_dir, dbspacedirname)) != NULL)
 		{
 			ForkNumber	forkNum;
+			StorageMarks mark;
 			int			oidchars;
 			char		oidbuf[OIDCHARS + 1];
 			char		mainpath[MAXPGPATH];
 
 			/* Skip anything that doesn't look like a relation data file. */
 			if (!parse_filename_for_nontemp_relation(de->d_name, &oidchars,
-													 &forkNum))
+													 &forkNum, &mark))
 				continue;
 
+			Assert(mark == SMGR_MARK_NONE);
+
 			/* Also skip it unless this is the init fork. */
 			if (forkNum != INIT_FORKNUM)
 				continue;
@@ -379,7 +519,7 @@ ResetUnloggedRelationsInDbspaceDir(const char *dbspacedirname, int op)
  */
 bool
 parse_filename_for_nontemp_relation(const char *name, int *oidchars,
-									ForkNumber *fork)
+									ForkNumber *fork, StorageMarks *mark)
 {
 	int			pos;
 
@@ -410,11 +550,19 @@ parse_filename_for_nontemp_relation(const char *name, int *oidchars,
 
 		for (segchar = 1; isdigit((unsigned char) name[pos + segchar]); ++segchar)
 			;
-		if (segchar <= 1)
-			return false;
-		pos += segchar;
+		if (segchar > 1)
+			pos += segchar;
 	}
 
+	/* mark file? */
+	if (name[pos] == '.' && name[pos + 1] != 0)
+	{
+		*mark = name[pos + 1];
+		pos += 2;
+	}
+	else
+		*mark = SMGR_MARK_NONE;
+
 	/* Now we should be at the end. */
 	if (name[pos] != '\0')
 		return false;
diff --git a/src/backend/storage/smgr/md.c b/src/backend/storage/smgr/md.c
index 879f647dbc..692508ea98 100644
--- a/src/backend/storage/smgr/md.c
+++ b/src/backend/storage/smgr/md.c
@@ -139,7 +139,8 @@ static MdfdVec *_mdfd_getseg(SMgrRelation reln, ForkNumber forkno,
 							 BlockNumber blkno, bool skipFsync, int behavior);
 static BlockNumber _mdnblocks(SMgrRelation reln, ForkNumber forknum,
 							  MdfdVec *seg);
-
+static bool mdmarkexists(SMgrRelation reln, ForkNumber forkNum,
+						 StorageMarks mark);
 
 /*
  *	mdinit() -- Initialize private state for magnetic disk storage manager.
@@ -169,6 +170,82 @@ mdexists(SMgrRelation reln, ForkNumber forkNum)
 	return (mdopenfork(reln, forkNum, EXTENSION_RETURN_NULL) != NULL);
 }
 
+/*
+ *  mdcreatemark() -- Create a mark file.
+ *
+ * If isRedo is true, it's okay for the file to exist already.
+ */
+void
+mdcreatemark(SMgrRelation reln, ForkNumber forkNum, StorageMarks mark,
+			 bool isRedo)
+{
+	char	   *path = markpath(reln->smgr_rnode, forkNum, mark);
+	int			fd;
+
+	/* See mdcreate for details. */
+	TablespaceCreateDbspace(reln->smgr_rnode.node.spcNode,
+							reln->smgr_rnode.node.dbNode,
+							isRedo);
+
+	fd = BasicOpenFile(path, O_WRONLY | O_CREAT | O_EXCL);
+	if (fd < 0 && (!isRedo || errno != EEXIST))
+		ereport(ERROR, errcode_for_file_access(),
+				errmsg("could not create mark file \"%s\": %m", path));
+
+	/* During redo the file may already exist, leaving no fd to sync. */
+	if (fd >= 0)
+	{
+		pg_fsync(fd);
+		close(fd);
+	}
+
+	/* fsync the parent directory to make the file creation persistent. */
+	fsync_parent_path(path, ERROR);
+
+	pfree(path);
+}
+
+
+/*
+ *  mdunlinkmark()  -- Delete the mark file
+ *
+ * If isRedo is true, it's okay for the file to be missing.
+ */
+void
+mdunlinkmark(SMgrRelation reln, ForkNumber forkNum, StorageMarks mark,
+			 bool isRedo)
+{
+	char	   *path = markpath(reln->smgr_rnode, forkNum, mark);
+
+	if (!isRedo || mdmarkexists(reln, forkNum, mark))
+		durable_unlink(path, ERROR);
+
+	pfree(path);
+}
+
+/*
+ *  mdmarkexists()  -- Check if the mark file exists.
+ */
+static bool
+mdmarkexists(SMgrRelation reln, ForkNumber forkNum, StorageMarks mark)
+{
+	char	   *path = markpath(reln->smgr_rnode, forkNum, mark);
+	int			fd;
+
+	fd = BasicOpenFile(path, O_RDONLY);
+	if (fd < 0 && errno != ENOENT)
+		ereport(ERROR,
+				errcode_for_file_access(),
+				errmsg("could not access mark file \"%s\": %m", path));
+	pfree(path);
+
+	if (fd < 0)
+		return false;
+
+	close(fd);
+	return true;
+}
+
 /*
  *	mdcreate() -- Create a new relation on magnetic disk.
  *
@@ -1031,6 +1108,15 @@ register_forget_request(RelFileNodeBackend rnode, ForkNumber forknum,
 	RegisterSyncRequest(&tag, SYNC_FORGET_REQUEST, true /* retryOnError */ );
 }
 
+/*
+ * ForgetRelationForkSyncRequests -- forget any fsyncs and unlinks for a fork
+ */
+void
+ForgetRelationForkSyncRequests(RelFileNodeBackend rnode, ForkNumber forknum)
+{
+	register_forget_request(rnode, forknum, 0);
+}
+
 /*
  * ForgetDatabaseSyncRequests -- forget any fsyncs and unlinks for a DB
  */
@@ -1384,12 +1470,14 @@ mdsyncfiletag(const FileTag *ftag, char *path)
  * Return 0 on success, -1 on failure, with errno set.
  */
 int
-mdunlinkfiletag(const FileTag *ftag, char *path)
+mdunlinkfiletag(const FileTag *ftag, char *path, StorageMarks mark)
 {
 	char	   *p;
 
 	/* Compute the path. */
-	p = relpathperm(ftag->rnode, MAIN_FORKNUM);
+	p = GetRelationPath(ftag->rnode.dbNode, ftag->rnode.spcNode,
+						ftag->rnode.relNode, InvalidBackendId, MAIN_FORKNUM,
+						mark);
 	strlcpy(path, p, MAXPGPATH);
 	pfree(p);
 
diff --git a/src/backend/storage/smgr/smgr.c b/src/backend/storage/smgr/smgr.c
index d71a557a35..0710e8b145 100644
--- a/src/backend/storage/smgr/smgr.c
+++ b/src/backend/storage/smgr/smgr.c
@@ -63,6 +63,10 @@ typedef struct f_smgr
 	void		(*smgr_truncate) (SMgrRelation reln, ForkNumber forknum,
 								  BlockNumber nblocks);
 	void		(*smgr_immedsync) (SMgrRelation reln, ForkNumber forknum);
+	void		(*smgr_createmark) (SMgrRelation reln, ForkNumber forknum,
+									StorageMarks mark, bool isRedo);
+	void		(*smgr_unlinkmark) (SMgrRelation reln, ForkNumber forknum,
+									StorageMarks mark, bool isRedo);
 } f_smgr;
 
 static const f_smgr smgrsw[] = {
@@ -84,6 +88,8 @@ static const f_smgr smgrsw[] = {
 		.smgr_nblocks = mdnblocks,
 		.smgr_truncate = mdtruncate,
 		.smgr_immedsync = mdimmedsync,
+		.smgr_createmark = mdcreatemark,
+		.smgr_unlinkmark = mdunlinkmark,
 	}
 };
 
@@ -337,6 +343,26 @@ smgrcreate(SMgrRelation reln, ForkNumber forknum, bool isRedo)
 	smgrsw[reln->smgr_which].smgr_create(reln, forknum, isRedo);
 }
 
+/*
+ *	smgrcreatemark() -- Create a mark file
+ */
+void
+smgrcreatemark(SMgrRelation reln, ForkNumber forknum, StorageMarks mark,
+			   bool isRedo)
+{
+	smgrsw[reln->smgr_which].smgr_createmark(reln, forknum, mark, isRedo);
+}
+
+/*
+ *	smgrunlinkmark() -- Delete a mark file
+ */
+void
+smgrunlinkmark(SMgrRelation reln, ForkNumber forknum, StorageMarks mark,
+			   bool isRedo)
+{
+	smgrsw[reln->smgr_which].smgr_unlinkmark(reln, forknum, mark, isRedo);
+}
+
 /*
  *	smgrdosyncall() -- Immediately sync all forks of all given relations
  *
@@ -664,6 +690,12 @@ smgrimmedsync(SMgrRelation reln, ForkNumber forknum)
 	smgrsw[reln->smgr_which].smgr_immedsync(reln, forknum);
 }
 
+void
+smgrunlink(SMgrRelation reln, ForkNumber forknum, bool isRedo)
+{
+	smgrsw[reln->smgr_which].smgr_unlink(reln->smgr_rnode, forknum, isRedo);
+}
+
 /*
  * AtEOXact_SMgr
  *
diff --git a/src/backend/storage/sync/sync.c b/src/backend/storage/sync/sync.c
index c695d816fc..ab11600724 100644
--- a/src/backend/storage/sync/sync.c
+++ b/src/backend/storage/sync/sync.c
@@ -91,7 +91,8 @@ static CycleCtr checkpoint_cycle_ctr = 0;
 typedef struct SyncOps
 {
 	int			(*sync_syncfiletag) (const FileTag *ftag, char *path);
-	int			(*sync_unlinkfiletag) (const FileTag *ftag, char *path);
+	int			(*sync_unlinkfiletag) (const FileTag *ftag, char *path,
+									   StorageMarks mark);
 	bool		(*sync_filetagmatches) (const FileTag *ftag,
 										const FileTag *candidate);
 } SyncOps;
@@ -236,7 +237,8 @@ SyncPostCheckpoint(void)
 
 		/* Unlink the file */
 		if (syncsw[entry->tag.handler].sync_unlinkfiletag(&entry->tag,
-														  path) < 0)
+														  path,
+														  SMGR_MARK_NONE) < 0)
 		{
 			/*
 			 * There's a race condition, when the database is dropped at the
@@ -245,6 +247,21 @@ SyncPostCheckpoint(void)
 			 * here. rmtree() also has to ignore ENOENT errors, to deal with
 			 * the possibility that we delete the file first.
 			 */
+			if (errno != ENOENT)
+				ereport(WARNING,
+						errcode_for_file_access(),
+						errmsg("could not remove file \"%s\": %m", path));
+		}
+		else if (syncsw[entry->tag.handler].sync_unlinkfiletag(&entry->tag,
+															   path,
+															   SMGR_MARK_UNCOMMITTED)
+				 < 0)
+		{
+			/*
+			 * There may also be an SMGR_MARK_UNCOMMITTED file.  Remove it once
+			 * the fork file has been successfully removed.  It's OK if the
+			 * mark file does not exist.
+			 */
 			if (errno != ENOENT)
 				ereport(WARNING,
 						(errcode_for_file_access(),
diff --git a/src/bin/pg_rewind/parsexlog.c b/src/bin/pg_rewind/parsexlog.c
index 49966e7b7f..c3515e5546 100644
--- a/src/bin/pg_rewind/parsexlog.c
+++ b/src/bin/pg_rewind/parsexlog.c
@@ -416,6 +416,28 @@ extractPageInfo(XLogReaderState *record)
 		 * source system.
 		 */
 	}
+	else if (rmid == RM_SMGR_ID && rminfo == XLOG_SMGR_UNLINK)
+	{
+		/*
+		 * We can safely ignore these.  We'll see that the files don't exist
+		 * in the target data dir, and copy them in from the source system.
+		 * No need to do anything special here.
+		 */
+	}
+	else if (rmid == RM_SMGR_ID && rminfo == XLOG_SMGR_MARK)
+	{
+		/*
+		 * We can safely ignore these.  The file will be removed from the
+		 * target if it doesn't exist in the source system.  The files are
+		 * empty, so we don't need to bother with their content.
+		 */
+	}
+	else if (rmid == RM_SMGR_ID && rminfo == XLOG_SMGR_BUFPERSISTENCE)
+	{
+		/*
+		 * We can safely ignore these. These don't make any on-disk changes.
+		 */
+	}
 	else if (rmid == RM_XACT_ID &&
 			 ((rminfo & XLOG_XACT_OPMASK) == XLOG_XACT_COMMIT ||
 			  (rminfo & XLOG_XACT_OPMASK) == XLOG_XACT_COMMIT_PREPARED ||
diff --git a/src/bin/pg_rewind/pg_rewind.c b/src/bin/pg_rewind/pg_rewind.c
index b39b5c1aac..9f7235b920 100644
--- a/src/bin/pg_rewind/pg_rewind.c
+++ b/src/bin/pg_rewind/pg_rewind.c
@@ -413,7 +413,6 @@ main(int argc, char **argv)
 	if (showprogress)
 		pg_log_info("reading source file list");
 	source->traverse_files(source, &process_source_file);
-
 	if (showprogress)
 		pg_log_info("reading target file list");
 	traverse_datadir(datadir_target, &process_target_file);
diff --git a/src/common/relpath.c b/src/common/relpath.c
index 636c96efd3..1c19e16fea 100644
--- a/src/common/relpath.c
+++ b/src/common/relpath.c
@@ -139,9 +139,15 @@ GetDatabasePath(Oid dbNode, Oid spcNode)
  */
 char *
 GetRelationPath(Oid dbNode, Oid spcNode, Oid relNode,
-				int backendId, ForkNumber forkNumber)
+				int backendId, ForkNumber forkNumber, char mark)
 {
 	char	   *path;
+	char		markstr[4];
+
+	if (mark == 0)
+		markstr[0] = 0;
+	else
+		snprintf(markstr, sizeof(markstr), ".%c", mark);
 
 	if (spcNode == GLOBALTABLESPACE_OID)
 	{
@@ -149,10 +155,10 @@ GetRelationPath(Oid dbNode, Oid spcNode, Oid relNode,
 		Assert(dbNode == 0);
 		Assert(backendId == InvalidBackendId);
 		if (forkNumber != MAIN_FORKNUM)
-			path = psprintf("global/%u_%s",
-							relNode, forkNames[forkNumber]);
+			path = psprintf("global/%u_%s%s",
+							relNode, forkNames[forkNumber], markstr);
 		else
-			path = psprintf("global/%u", relNode);
+			path = psprintf("global/%u%s", relNode, markstr);
 	}
 	else if (spcNode == DEFAULTTABLESPACE_OID)
 	{
@@ -160,22 +166,22 @@ GetRelationPath(Oid dbNode, Oid spcNode, Oid relNode,
 		if (backendId == InvalidBackendId)
 		{
 			if (forkNumber != MAIN_FORKNUM)
-				path = psprintf("base/%u/%u_%s",
+				path = psprintf("base/%u/%u_%s%s",
 								dbNode, relNode,
-								forkNames[forkNumber]);
+								forkNames[forkNumber], markstr);
 			else
-				path = psprintf("base/%u/%u",
-								dbNode, relNode);
+				path = psprintf("base/%u/%u%s",
+								dbNode, relNode, markstr);
 		}
 		else
 		{
 			if (forkNumber != MAIN_FORKNUM)
-				path = psprintf("base/%u/t%d_%u_%s",
+				path = psprintf("base/%u/t%d_%u_%s%s",
 								dbNode, backendId, relNode,
-								forkNames[forkNumber]);
+								forkNames[forkNumber], markstr);
 			else
-				path = psprintf("base/%u/t%d_%u",
-								dbNode, backendId, relNode);
+				path = psprintf("base/%u/t%d_%u%s",
+								dbNode, backendId, relNode, markstr);
 		}
 	}
 	else
@@ -184,27 +190,28 @@ GetRelationPath(Oid dbNode, Oid spcNode, Oid relNode,
 		if (backendId == InvalidBackendId)
 		{
 			if (forkNumber != MAIN_FORKNUM)
-				path = psprintf("pg_tblspc/%u/%s/%u/%u_%s",
+				path = psprintf("pg_tblspc/%u/%s/%u/%u_%s%s",
 								spcNode, TABLESPACE_VERSION_DIRECTORY,
 								dbNode, relNode,
-								forkNames[forkNumber]);
+								forkNames[forkNumber], markstr);
 			else
-				path = psprintf("pg_tblspc/%u/%s/%u/%u",
+				path = psprintf("pg_tblspc/%u/%s/%u/%u%s",
 								spcNode, TABLESPACE_VERSION_DIRECTORY,
-								dbNode, relNode);
+								dbNode, relNode, markstr);
 		}
 		else
 		{
 			if (forkNumber != MAIN_FORKNUM)
-				path = psprintf("pg_tblspc/%u/%s/%u/t%d_%u_%s",
+				path = psprintf("pg_tblspc/%u/%s/%u/t%d_%u_%s%s",
 								spcNode, TABLESPACE_VERSION_DIRECTORY,
 								dbNode, backendId, relNode,
-								forkNames[forkNumber]);
+								forkNames[forkNumber], markstr);
 			else
-				path = psprintf("pg_tblspc/%u/%s/%u/t%d_%u",
+				path = psprintf("pg_tblspc/%u/%s/%u/t%d_%u%s",
 								spcNode, TABLESPACE_VERSION_DIRECTORY,
-								dbNode, backendId, relNode);
+								dbNode, backendId, relNode, markstr);
 		}
 	}
+
 	return path;
 }
diff --git a/src/include/catalog/storage.h b/src/include/catalog/storage.h
index 844a023b2c..a685665fab 100644
--- a/src/include/catalog/storage.h
+++ b/src/include/catalog/storage.h
@@ -25,6 +25,8 @@ extern int	wal_skip_threshold;
 extern SMgrRelation RelationCreateStorage(RelFileNode rnode,
 										  char relpersistence,
 										  bool register_delete);
+extern void RelationCreateInitFork(Relation rel);
+extern void RelationDropInitFork(Relation rel);
 extern void RelationDropStorage(Relation rel);
 extern void RelationPreserveStorage(RelFileNode rnode, bool atCommit);
 extern void RelationPreTruncate(Relation rel);
@@ -43,6 +45,7 @@ extern void RestorePendingSyncs(char *startAddress);
 extern void smgrDoPendingDeletes(bool isCommit);
 extern void smgrDoPendingSyncs(bool isCommit, bool isParallelWorker);
 extern int	smgrGetPendingDeletes(bool forCommit, RelFileNode **ptr);
+extern void smgrDoPendingCleanups(bool isCommit);
 extern void AtSubCommit_smgr(void);
 extern void AtSubAbort_smgr(void);
 extern void PostPrepare_smgr(void);
diff --git a/src/include/catalog/storage_xlog.h b/src/include/catalog/storage_xlog.h
index 622de22b03..d83fc6876e 100644
--- a/src/include/catalog/storage_xlog.h
+++ b/src/include/catalog/storage_xlog.h
@@ -18,17 +18,23 @@
 #include "lib/stringinfo.h"
 #include "storage/block.h"
 #include "storage/relfilenode.h"
+#include "storage/smgr.h"
 
 /*
  * Declarations for smgr-related XLOG records
  *
- * Note: we log file creation and truncation here, but logging of deletion
- * actions is handled by xact.c, because it is part of transaction commit.
+ * Note: we log file creation, truncation and buffer persistence changes here,
+ * but logging of deletion actions is handled mainly by xact.c, because it is
+ * part of transaction commit in most cases.  However, there's a case where
+ * init forks are deleted outside the control of a transaction.
  */
 
 /* XLOG gives us high 4 bits */
 #define XLOG_SMGR_CREATE	0x10
 #define XLOG_SMGR_TRUNCATE	0x20
+#define XLOG_SMGR_UNLINK	0x30
+#define XLOG_SMGR_MARK		0x40
+#define XLOG_SMGR_BUFPERSISTENCE	0x50
 
 typedef struct xl_smgr_create
 {
@@ -36,6 +42,32 @@ typedef struct xl_smgr_create
 	ForkNumber	forkNum;
 } xl_smgr_create;
 
+typedef struct xl_smgr_unlink
+{
+	RelFileNode rnode;
+	ForkNumber	forkNum;
+} xl_smgr_unlink;
+
+typedef enum smgr_mark_action
+{
+	XLOG_SMGR_MARK_CREATE = 'c',
+	XLOG_SMGR_MARK_UNLINK = 'u'
+} smgr_mark_action;
+
+typedef struct xl_smgr_mark
+{
+	RelFileNode rnode;
+	ForkNumber	forkNum;
+	StorageMarks mark;
+	smgr_mark_action action;
+} xl_smgr_mark;
+
+typedef struct xl_smgr_bufpersistence
+{
+	RelFileNode rnode;
+	bool		persistence;
+} xl_smgr_bufpersistence;
+
 /* flags for xl_smgr_truncate */
 #define SMGR_TRUNCATE_HEAP		0x0001
 #define SMGR_TRUNCATE_VM		0x0002
@@ -51,6 +83,12 @@ typedef struct xl_smgr_truncate
 } xl_smgr_truncate;
 
 extern void log_smgrcreate(const RelFileNode *rnode, ForkNumber forkNum);
+extern void log_smgrunlink(const RelFileNode *rnode, ForkNumber forkNum);
+extern void log_smgrcreatemark(const RelFileNode *rnode, ForkNumber forkNum,
+							   StorageMarks mark);
+extern void log_smgrunlinkmark(const RelFileNode *rnode, ForkNumber forkNum,
+							   StorageMarks mark);
+extern void log_smgrbufpersistence(const RelFileNode *rnode, bool persistence);
 
 extern void smgr_redo(XLogReaderState *record);
 extern void smgr_desc(StringInfo buf, XLogReaderState *record);
diff --git a/src/include/common/relpath.h b/src/include/common/relpath.h
index a4b5dc853b..a864c91614 100644
--- a/src/include/common/relpath.h
+++ b/src/include/common/relpath.h
@@ -67,7 +67,7 @@ extern int	forkname_chars(const char *str, ForkNumber *fork);
 extern char *GetDatabasePath(Oid dbNode, Oid spcNode);
 
 extern char *GetRelationPath(Oid dbNode, Oid spcNode, Oid relNode,
-							 int backendId, ForkNumber forkNumber);
+							 int backendId, ForkNumber forkNumber, char mark);
 
 /*
  * Wrapper macros for GetRelationPath.  Beware of multiple
@@ -77,7 +77,7 @@ extern char *GetRelationPath(Oid dbNode, Oid spcNode, Oid relNode,
 /* First argument is a RelFileNode */
 #define relpathbackend(rnode, backend, forknum) \
 	GetRelationPath((rnode).dbNode, (rnode).spcNode, (rnode).relNode, \
-					backend, forknum)
+					backend, forknum, 0)
 
 /* First argument is a RelFileNode */
 #define relpathperm(rnode, forknum) \
@@ -87,4 +87,9 @@ extern char *GetRelationPath(Oid dbNode, Oid spcNode, Oid relNode,
 #define relpath(rnode, forknum) \
 	relpathbackend((rnode).node, (rnode).backend, forknum)
 
+/* First argument is a RelFileNodeBackend */
+#define markpath(rnode, forknum, mark)								\
+	GetRelationPath((rnode).node.dbNode, (rnode).node.spcNode, \
+					(rnode).node.relNode, \
+					(rnode).backend, forknum, mark)
 #endif							/* RELPATH_H */
diff --git a/src/include/storage/bufmgr.h b/src/include/storage/bufmgr.h
index a6b657f0ba..b7db0b2922 100644
--- a/src/include/storage/bufmgr.h
+++ b/src/include/storage/bufmgr.h
@@ -210,6 +210,8 @@ extern void CreateAndCopyRelationData(RelFileNode src_rnode,
 extern void FlushDatabaseBuffers(Oid dbid);
 extern void DropRelFileNodeBuffers(struct SMgrRelationData *smgr_reln, ForkNumber *forkNum,
 								   int nforks, BlockNumber *firstDelBlock);
+extern void SetRelationBuffersPersistence(struct SMgrRelationData *srel,
+										  bool permanent, bool isRedo);
 extern void DropRelFileNodesAllBuffers(struct SMgrRelationData **smgr_reln, int nnodes);
 extern void DropDatabaseBuffers(Oid dbid);
 
diff --git a/src/include/storage/fd.h b/src/include/storage/fd.h
index 29209e2724..8bf746bf45 100644
--- a/src/include/storage/fd.h
+++ b/src/include/storage/fd.h
@@ -185,6 +185,7 @@ extern ssize_t pg_pwritev_with_retry(int fd,
 extern int	pg_truncate(const char *path, off_t length);
 extern void fsync_fname(const char *fname, bool isdir);
 extern int	fsync_fname_ext(const char *fname, bool isdir, bool ignore_perm, int elevel);
+extern int	fsync_parent_path(const char *fname, int elevel);
 extern int	durable_rename(const char *oldfile, const char *newfile, int loglevel);
 extern int	durable_unlink(const char *fname, int loglevel);
 extern int	durable_rename_excl(const char *oldfile, const char *newfile, int loglevel);
diff --git a/src/include/storage/md.h b/src/include/storage/md.h
index 6e46d8d96a..18b27d366b 100644
--- a/src/include/storage/md.h
+++ b/src/include/storage/md.h
@@ -24,6 +24,10 @@ extern void mdinit(void);
 extern void mdopen(SMgrRelation reln);
 extern void mdclose(SMgrRelation reln, ForkNumber forknum);
 extern void mdrelease(void);
+extern void mdcreatemark(SMgrRelation reln, ForkNumber forknum,
+						 StorageMarks mark, bool isRedo);
+extern void mdunlinkmark(SMgrRelation reln, ForkNumber forknum,
+						 StorageMarks mark, bool isRedo);
 extern void mdcreate(SMgrRelation reln, ForkNumber forknum, bool isRedo);
 extern bool mdexists(SMgrRelation reln, ForkNumber forknum);
 extern void mdunlink(RelFileNodeBackend rnode, ForkNumber forknum, bool isRedo);
@@ -42,12 +46,14 @@ extern void mdtruncate(SMgrRelation reln, ForkNumber forknum,
 					   BlockNumber nblocks);
 extern void mdimmedsync(SMgrRelation reln, ForkNumber forknum);
 
+extern void ForgetRelationForkSyncRequests(RelFileNodeBackend rnode,
+										   ForkNumber forknum);
 extern void ForgetDatabaseSyncRequests(Oid dbid);
 extern void DropRelationFiles(RelFileNode *delrels, int ndelrels, bool isRedo);
 
 /* md sync callbacks */
 extern int	mdsyncfiletag(const FileTag *ftag, char *path);
-extern int	mdunlinkfiletag(const FileTag *ftag, char *path);
+extern int	mdunlinkfiletag(const FileTag *ftag, char *path, StorageMarks mark);
 extern bool mdfiletagmatches(const FileTag *ftag, const FileTag *candidate);
 
 #endif							/* MD_H */
diff --git a/src/include/storage/reinit.h b/src/include/storage/reinit.h
index bf2c10d443..e399aec0c7 100644
--- a/src/include/storage/reinit.h
+++ b/src/include/storage/reinit.h
@@ -16,13 +16,15 @@
 #define REINIT_H
 
 #include "common/relpath.h"
-
+#include "storage/smgr.h"
 
 extern void ResetUnloggedRelations(int op);
-extern bool parse_filename_for_nontemp_relation(const char *name,
-												int *oidchars, ForkNumber *fork);
+extern bool parse_filename_for_nontemp_relation(const char *name, int *oidchars,
+												ForkNumber *fork,
+												StorageMarks *mark);
 
 #define UNLOGGED_RELATION_CLEANUP		0x0001
-#define UNLOGGED_RELATION_INIT			0x0002
+#define UNLOGGED_RELATION_DROP_BUFFER	0x0002
+#define UNLOGGED_RELATION_INIT			0x0004
 
 #endif							/* REINIT_H */
diff --git a/src/include/storage/smgr.h b/src/include/storage/smgr.h
index 8e3ef92cda..43b33b6b8d 100644
--- a/src/include/storage/smgr.h
+++ b/src/include/storage/smgr.h
@@ -18,6 +18,18 @@
 #include "storage/block.h"
 #include "storage/relfilenode.h"
 
+/*
+ * A storage mark is a file whose existence indicates something about another
+ * file.  Such a file is named "<filename>.<mark>", where the mark is one of
+ * the values of StorageMarks.  Since ".<digit>" denotes a segment file, don't
+ * use digits for the mark character.
+ */
+typedef enum StorageMarks
+{
+	SMGR_MARK_NONE = 0,
+	SMGR_MARK_UNCOMMITTED = 'u' /* the file is not committed yet */
+} StorageMarks;
+
 /*
  * smgr.c maintains a table of SMgrRelation objects, which are essentially
  * cached file handles.  An SMgrRelation is created (if not already present)
@@ -85,7 +97,12 @@ extern void smgrclearowner(SMgrRelation *owner, SMgrRelation reln);
 extern void smgrclose(SMgrRelation reln);
 extern void smgrcloseall(void);
 extern void smgrclosenode(RelFileNodeBackend rnode);
+extern void smgrcreatemark(SMgrRelation reln, ForkNumber forknum,
+						   StorageMarks mark, bool isRedo);
+extern void smgrunlinkmark(SMgrRelation reln, ForkNumber forknum,
+						   StorageMarks mark, bool isRedo);
 extern void smgrcreate(SMgrRelation reln, ForkNumber forknum, bool isRedo);
+extern void smgrunlink(SMgrRelation reln, ForkNumber forknum, bool isRedo);
 extern void smgrdosyncall(SMgrRelation *rels, int nrels);
 extern void smgrdounlinkall(SMgrRelation *rels, int nrels, bool isRedo);
 extern void smgrextend(SMgrRelation reln, ForkNumber forknum,
diff --git a/src/tools/pgindent/typedefs.list b/src/tools/pgindent/typedefs.list
index 72fafb795b..181709039c 100644
--- a/src/tools/pgindent/typedefs.list
+++ b/src/tools/pgindent/typedefs.list
@@ -1890,6 +1890,7 @@ PatternInfoArray
 Pattern_Prefix_Status
 Pattern_Type
 PendingFsyncEntry
+PendingMarkCleanup
 PendingRelDelete
 PendingRelSync
 PendingUnlinkEntry
@@ -2535,6 +2536,7 @@ StdRdOptIndexCleanup
 StdRdOptions
 Step
 StopList
+StorageMarks
 StrategyNumber
 StreamCtl
 StreamXidHash
@@ -3502,6 +3504,7 @@ registered_buffer
 regmatch_t
 regoff_t
 regproc
+relfile_entry
 relopt_bool
 relopt_enum
 relopt_enum_elt_def
@@ -3555,6 +3558,7 @@ slist_iter
 slist_mutable_iter
 slist_node
 slock_t
+smgr_mark_action
 socket_set
 spgBulkDeleteState
 spgChooseIn
@@ -3755,8 +3759,11 @@ xl_replorigin_set
 xl_restore_point
 xl_running_xacts
 xl_seq_rec
+xl_smgr_bufpersistence
 xl_smgr_create
+xl_smgr_mark
 xl_smgr_truncate
+xl_smgr_unlink
 xl_standby_lock
 xl_standby_locks
 xl_tblspc_create_rec
-- 
2.27.0

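As a rough illustration only (not part of either patch), the mark file
naming that 0001 adds to GetRelationPath() can be sketched standalone as
below.  demo_mark_path() and DemoMark are made-up names for this sketch,
and it covers only the default-tablespace, non-temporary case; the real
function also handles global, per-tablespace and temp-relation paths.

```c
#include <assert.h>
#include <stdio.h>
#include <string.h>

/* Hypothetical stand-in for the patch's StorageMarks enum. */
typedef enum DemoMark
{
	DEMO_MARK_NONE = 0,
	DEMO_MARK_UNCOMMITTED = 'u'	/* the file is not committed yet */
} DemoMark;

/*
 * Build "base/<db>/<rel>[_<fork>][.<mark>]" into buf.  forkname is NULL
 * for the main fork.  Returns what snprintf() returns.
 */
static int
demo_mark_path(char *buf, size_t buflen, unsigned db, unsigned rel,
			   const char *forkname, DemoMark mark)
{
	char		markstr[4];

	assert(buf != NULL);

	/* No mark means no suffix; otherwise append ".<mark character>". */
	if (mark == DEMO_MARK_NONE)
		markstr[0] = '\0';
	else
		snprintf(markstr, sizeof(markstr), ".%c", (char) mark);

	if (forkname != NULL)
		return snprintf(buf, buflen, "base/%u/%u_%s%s",
						db, rel, forkname, markstr);

	return snprintf(buf, buflen, "base/%u/%u%s", db, rel, markstr);
}
```

With these assumptions, the uncommitted mark for the init fork of relation
16384 in database 5 would be named base/5/16384_init.u, next to the fork
file base/5/16384_init itself.
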
From 510c31a90a7874a431b0ed9c669fa7c39f9e68fe Mon Sep 17 00:00:00 2001
From: Kyotaro Horiguchi <horikyoga....@gmail.com>
Date: Wed, 11 Nov 2020 23:21:09 +0900
Subject: [PATCH v20 2/2] New command ALTER TABLE ALL IN TABLESPACE SET
 LOGGED/UNLOGGED

To ease invoking ALTER TABLE SET LOGGED/UNLOGGED, this command changes
the persistence of all tables in the specified tablespace.
---
 doc/src/sgml/ref/alter_table.sgml        |  16 +++
 src/backend/commands/tablecmds.c         | 140 +++++++++++++++++++++++
 src/backend/nodes/copyfuncs.c            |  16 +++
 src/backend/nodes/equalfuncs.c           |  15 +++
 src/backend/parser/gram.y                |  42 +++++++
 src/backend/tcop/utility.c               |  11 ++
 src/include/commands/tablecmds.h         |   2 +
 src/include/nodes/nodes.h                |   1 +
 src/include/nodes/parsenodes.h           |  10 ++
 src/test/regress/expected/tablespace.out |  76 ++++++++++++
 src/test/regress/sql/tablespace.sql      |  41 +++++++
 11 files changed, 370 insertions(+)

diff --git a/doc/src/sgml/ref/alter_table.sgml b/doc/src/sgml/ref/alter_table.sgml
index 5c0735e08a..5ae825b30f 100644
--- a/doc/src/sgml/ref/alter_table.sgml
+++ b/doc/src/sgml/ref/alter_table.sgml
@@ -33,6 +33,8 @@ ALTER TABLE [ IF EXISTS ] <replaceable class="parameter">name</replaceable>
     SET SCHEMA <replaceable class="parameter">new_schema</replaceable>
 ALTER TABLE ALL IN TABLESPACE <replaceable class="parameter">name</replaceable> [ OWNED BY <replaceable class="parameter">role_name</replaceable> [, ... ] ]
     SET TABLESPACE <replaceable class="parameter">new_tablespace</replaceable> [ NOWAIT ]
+ALTER TABLE ALL IN TABLESPACE <replaceable class="parameter">name</replaceable> [ OWNED BY <replaceable class="parameter">role_name</replaceable> [, ... ] ]
+    SET { LOGGED | UNLOGGED } [ NOWAIT ]
 ALTER TABLE [ IF EXISTS ] <replaceable class="parameter">name</replaceable>
     ATTACH PARTITION <replaceable class="parameter">partition_name</replaceable> { FOR VALUES <replaceable class="parameter">partition_bound_spec</replaceable> | DEFAULT }
 ALTER TABLE [ IF EXISTS ] <replaceable class="parameter">name</replaceable>
@@ -753,6 +755,20 @@ WITH ( MODULUS <replaceable class="parameter">numeric_literal</replaceable>, REM
       (see <xref linkend="sql-createtable-unlogged"/>).  It cannot be applied
       to a temporary table.
      </para>
+
+     <para>
+      All tables in the current database in a tablespace can be changed by
+      using the <literal>ALL IN TABLESPACE</literal> form, which will first
+      lock all tables to be changed and then change each one.  This form also
+      supports
+      <literal>OWNED BY</literal>, which will only change tables owned by the
+      specified roles.  If the <literal>NOWAIT</literal> option is specified,
+      then the command will fail if it is unable to immediately acquire all of
+      the locks required.  The <literal>information_schema</literal> relations
+      are not considered part of the system catalogs and will be changed.  See
+      also
+      <link linkend="sql-createtablespace"><command>CREATE TABLESPACE</command></link>.
+     </para>
     </listitem>
    </varlistentry>
 
diff --git a/src/backend/commands/tablecmds.c b/src/backend/commands/tablecmds.c
index 71aaf3320a..5442f790ed 100644
--- a/src/backend/commands/tablecmds.c
+++ b/src/backend/commands/tablecmds.c
@@ -14794,6 +14794,146 @@ AlterTableMoveAll(AlterTableMoveAllStmt *stmt)
 	return new_tablespaceoid;
 }
 
+/*
+ * Alter Table ALL ... SET LOGGED/UNLOGGED
+ *
+ * Allows a user to change the persistence of all objects in a given tablespace
+ * in the current database.  Objects can also be chosen by owner, to allow
+ * users to change the persistence of only their own objects.  The main
+ * permissions handling is done by the lower-level persistence-change
+ * function.
+ *
+ * All to-be-modified objects are locked first. If NOWAIT is specified and the
+ * lock can't be acquired then we ereport(ERROR).
+ */
+void
+AlterTableSetLoggedAll(AlterTableSetLoggedAllStmt *stmt)
+{
+	List	   *relations = NIL;
+	ListCell   *l;
+	ScanKeyData key[1];
+	Relation	rel;
+	TableScanDesc scan;
+	HeapTuple	tuple;
+	Oid			tablespaceoid;
+	List	   *role_oids = roleSpecsToIds(stmt->roles);
+
+	/* Ensure we were not asked to change something we can't */
+	if (stmt->objtype != OBJECT_TABLE)
+		ereport(ERROR,
+				errcode(ERRCODE_INVALID_PARAMETER_VALUE),
+				errmsg("only tables can be specified"));
+
+	/* Get the tablespace OID */
+	tablespaceoid = get_tablespace_oid(stmt->tablespacename, false);
+
+	/*
+	 * Now that the checks are done, check if we should set it to InvalidOid
+	 * because it is our database's default tablespace.
+	 */
+	if (tablespaceoid == MyDatabaseTableSpace)
+		tablespaceoid = InvalidOid;
+
+	/*
+	 * Walk the list of objects in the tablespace to pick them up. This will
+	 * only find objects in our database, of course.
+	 */
+	ScanKeyInit(&key[0],
+				Anum_pg_class_reltablespace,
+				BTEqualStrategyNumber, F_OIDEQ,
+				ObjectIdGetDatum(tablespaceoid));
+
+	rel = table_open(RelationRelationId, AccessShareLock);
+	scan = table_beginscan_catalog(rel, 1, key);
+	while ((tuple = heap_getnext(scan, ForwardScanDirection)) != NULL)
+	{
+		Form_pg_class relForm = (Form_pg_class) GETSTRUCT(tuple);
+		Oid			relOid = relForm->oid;
+
+		/*
+		 * Do not pick up objects in pg_catalog as part of this; if an admin
+		 * really wishes to do so, they can issue the individual ALTER
+		 * commands directly.
+		 *
+		 * Also, explicitly avoid any shared tables, temp tables, or TOAST
+		 * (TOAST will be changed with the main table).
+		 */
+		if (IsCatalogNamespace(relForm->relnamespace) ||
+			relForm->relisshared ||
+			isAnyTempNamespace(relForm->relnamespace) ||
+			IsToastNamespace(relForm->relnamespace))
+			continue;
+
+		/* Only pick up the object type requested */
+		if (relForm->relkind != RELKIND_RELATION)
+			continue;
+
+		/* Check if we are only picking up objects owned by certain roles */
+		if (role_oids != NIL && !list_member_oid(role_oids, relForm->relowner))
+			continue;
+
+		/*
+		 * Handle permissions-checking here since we are locking the tables
+		 * and also to avoid doing a bunch of work only to fail part-way. Note
+		 * that permissions will also be checked by AlterTableInternal().
+		 *
+		 * Caller must be considered an owner of the table whose persistence
+		 * we're going to change.
+		 */
+		if (!pg_class_ownercheck(relOid, GetUserId()))
+			aclcheck_error(ACLCHECK_NOT_OWNER, get_relkind_objtype(get_rel_relkind(relOid)),
+						   NameStr(relForm->relname));
+
+		if (stmt->nowait &&
+			!ConditionalLockRelationOid(relOid, AccessExclusiveLock))
+			ereport(ERROR,
+					errcode(ERRCODE_OBJECT_IN_USE),
+					errmsg("aborting because lock on relation \"%s.%s\" is not available",
+						   get_namespace_name(relForm->relnamespace),
+						   NameStr(relForm->relname)));
+		else
+			LockRelationOid(relOid, AccessExclusiveLock);
+
+		/*
+		 * Add to our list of objects whose persistence we're going to
+		 * change.
+		 */
+		relations = lappend_oid(relations, relOid);
+	}
+
+	table_endscan(scan);
+	table_close(rel, AccessShareLock);
+
+	if (relations == NIL)
+		ereport(NOTICE,
+				errcode(ERRCODE_NO_DATA_FOUND),
+				errmsg("no matching relations in tablespace \"%s\" found",
+					   tablespaceoid == InvalidOid ? "(database default)" :
+					   get_tablespace_name(tablespaceoid)));
+
+	/*
+	 * Everything is locked, loop through and change persistence of all of the
+	 * relations.
+	 */
+	foreach(l, relations)
+	{
+		List	   *cmds = NIL;
+		AlterTableCmd *cmd = makeNode(AlterTableCmd);
+
+		if (stmt->logged)
+			cmd->subtype = AT_SetLogged;
+		else
+			cmd->subtype = AT_SetUnLogged;
+
+		cmds = lappend(cmds, cmd);
+
+		EventTriggerAlterTableStart((Node *) stmt);
+		/* OID is set by AlterTableInternal */
+		AlterTableInternal(lfirst_oid(l), cmds, false);
+		EventTriggerAlterTableEnd();
+	}
+}
+
 static void
 index_copy_data(Relation rel, RelFileNode newrnode)
 {
diff --git a/src/backend/nodes/copyfuncs.c b/src/backend/nodes/copyfuncs.c
index 56505557bf..722464ab6e 100644
--- a/src/backend/nodes/copyfuncs.c
+++ b/src/backend/nodes/copyfuncs.c
@@ -4713,6 +4713,19 @@ _copyAlterTableMoveAllStmt(const AlterTableMoveAllStmt *from)
 	return newnode;
 }
 
+static AlterTableSetLoggedAllStmt *
+_copyAlterTableSetLoggedAllStmt(const AlterTableSetLoggedAllStmt *from)
+{
+	AlterTableSetLoggedAllStmt *newnode = makeNode(AlterTableSetLoggedAllStmt);
+
+	COPY_STRING_FIELD(tablespacename);
+	COPY_SCALAR_FIELD(objtype);
+	COPY_SCALAR_FIELD(logged);
+	COPY_SCALAR_FIELD(nowait);
+
+	return newnode;
+}
+
 static CreateExtensionStmt *
 _copyCreateExtensionStmt(const CreateExtensionStmt *from)
 {
@@ -6156,6 +6169,9 @@ copyObjectImpl(const void *from)
 		case T_AlterTableMoveAllStmt:
 			retval = _copyAlterTableMoveAllStmt(from);
 			break;
+		case T_AlterTableSetLoggedAllStmt:
+			retval = _copyAlterTableSetLoggedAllStmt(from);
+			break;
 		case T_CreateExtensionStmt:
 			retval = _copyCreateExtensionStmt(from);
 			break;
diff --git a/src/backend/nodes/equalfuncs.c b/src/backend/nodes/equalfuncs.c
index 9ea3c5abf2..04e00cd7f4 100644
--- a/src/backend/nodes/equalfuncs.c
+++ b/src/backend/nodes/equalfuncs.c
@@ -2221,6 +2221,18 @@ _equalAlterTableMoveAllStmt(const AlterTableMoveAllStmt *a,
 	return true;
 }
 
+static bool
+_equalAlterTableSetLoggedAllStmt(const AlterTableSetLoggedAllStmt *a,
+								 const AlterTableSetLoggedAllStmt *b)
+{
+	COMPARE_STRING_FIELD(tablespacename);
+	COMPARE_SCALAR_FIELD(objtype);
+	COMPARE_SCALAR_FIELD(logged);
+	COMPARE_SCALAR_FIELD(nowait);
+
+	return true;
+}
+
 static bool
 _equalCreateExtensionStmt(const CreateExtensionStmt *a, const CreateExtensionStmt *b)
 {
@@ -4012,6 +4024,9 @@ equal(const void *a, const void *b)
 		case T_AlterTableMoveAllStmt:
 			retval = _equalAlterTableMoveAllStmt(a, b);
 			break;
+		case T_AlterTableSetLoggedAllStmt:
+			retval = _equalAlterTableSetLoggedAllStmt(a, b);
+			break;
 		case T_CreateExtensionStmt:
 			retval = _equalCreateExtensionStmt(a, b);
 			break;
diff --git a/src/backend/parser/gram.y b/src/backend/parser/gram.y
index eefcf90187..3754e758bd 100644
--- a/src/backend/parser/gram.y
+++ b/src/backend/parser/gram.y
@@ -2077,6 +2077,48 @@ AlterTableStmt:
 					n->nowait = $13;
 					$$ = (Node *)n;
 				}
+		|	ALTER TABLE ALL IN_P TABLESPACE name SET LOGGED opt_nowait
+				{
+					AlterTableSetLoggedAllStmt *n =
+						makeNode(AlterTableSetLoggedAllStmt);
+					n->tablespacename = $6;
+					n->objtype = OBJECT_TABLE;
+					n->logged = true;
+					n->nowait = $9;
+					$$ = (Node *)n;
+				}
+		|	ALTER TABLE ALL IN_P TABLESPACE name OWNED BY role_list SET LOGGED opt_nowait
+				{
+					AlterTableSetLoggedAllStmt *n =
+						makeNode(AlterTableSetLoggedAllStmt);
+					n->tablespacename = $6;
+					n->objtype = OBJECT_TABLE;
+					n->roles = $9;
+					n->logged = true;
+					n->nowait = $12;
+					$$ = (Node *)n;
+				}
+		|	ALTER TABLE ALL IN_P TABLESPACE name SET UNLOGGED opt_nowait
+				{
+					AlterTableSetLoggedAllStmt *n =
+						makeNode(AlterTableSetLoggedAllStmt);
+					n->tablespacename = $6;
+					n->objtype = OBJECT_TABLE;
+					n->logged = false;
+					n->nowait = $9;
+					$$ = (Node *)n;
+				}
+		|	ALTER TABLE ALL IN_P TABLESPACE name OWNED BY role_list SET UNLOGGED opt_nowait
+				{
+					AlterTableSetLoggedAllStmt *n =
+						makeNode(AlterTableSetLoggedAllStmt);
+					n->tablespacename = $6;
+					n->objtype = OBJECT_TABLE;
+					n->roles = $9;
+					n->logged = false;
+					n->nowait = $12;
+					$$ = (Node *)n;
+				}
 		|	ALTER INDEX qualified_name alter_table_cmds
 				{
 					AlterTableStmt *n = makeNode(AlterTableStmt);
diff --git a/src/backend/tcop/utility.c b/src/backend/tcop/utility.c
index f364a9b88a..f3670a56a2 100644
--- a/src/backend/tcop/utility.c
+++ b/src/backend/tcop/utility.c
@@ -164,6 +164,7 @@ ClassifyUtilityCommandAsReadOnly(Node *parsetree)
 		case T_AlterTSConfigurationStmt:
 		case T_AlterTSDictionaryStmt:
 		case T_AlterTableMoveAllStmt:
+		case T_AlterTableSetLoggedAllStmt:
 		case T_AlterTableSpaceOptionsStmt:
 		case T_AlterTableStmt:
 		case T_AlterTypeStmt:
@@ -1754,6 +1755,12 @@ ProcessUtilitySlow(ParseState *pstate,
 				commandCollected = true;
 				break;
 
+			case T_AlterTableSetLoggedAllStmt:
+				AlterTableSetLoggedAll((AlterTableSetLoggedAllStmt *) parsetree);
+				/* commands are stashed in AlterTableSetLoggedAll */
+				commandCollected = true;
+				break;
+
 			case T_DropStmt:
 				ExecDropStmt((DropStmt *) parsetree, isTopLevel);
 				/* no commands stashed for DROP */
@@ -2682,6 +2689,10 @@ CreateCommandTag(Node *parsetree)
 			tag = AlterObjectTypeCommandTag(((AlterTableMoveAllStmt *) parsetree)->objtype);
 			break;
 
+		case T_AlterTableSetLoggedAllStmt:
+			tag = AlterObjectTypeCommandTag(((AlterTableSetLoggedAllStmt *) parsetree)->objtype);
+			break;
+
 		case T_AlterTableStmt:
 			tag = AlterObjectTypeCommandTag(((AlterTableStmt *) parsetree)->objtype);
 			break;
diff --git a/src/include/commands/tablecmds.h b/src/include/commands/tablecmds.h
index 5d4037f26e..09bb75d6a0 100644
--- a/src/include/commands/tablecmds.h
+++ b/src/include/commands/tablecmds.h
@@ -42,6 +42,8 @@ extern void AlterTableInternal(Oid relid, List *cmds, bool recurse);
 
 extern Oid	AlterTableMoveAll(AlterTableMoveAllStmt *stmt);
 
+extern void AlterTableSetLoggedAll(AlterTableSetLoggedAllStmt *stmt);
+
 extern ObjectAddress AlterTableNamespace(AlterObjectSchemaStmt *stmt,
 										 Oid *oldschema);
 
diff --git a/src/include/nodes/nodes.h b/src/include/nodes/nodes.h
index 53f6b05a3f..c078478376 100644
--- a/src/include/nodes/nodes.h
+++ b/src/include/nodes/nodes.h
@@ -444,6 +444,7 @@ typedef enum NodeTag
 	T_AlterCollationStmt,
 	T_CallStmt,
 	T_AlterStatsStmt,
+	T_AlterTableSetLoggedAllStmt,
 
 	/*
 	 * TAGS FOR PARSE TREE NODES (parsenodes.h)
diff --git a/src/include/nodes/parsenodes.h b/src/include/nodes/parsenodes.h
index c24fc26da1..dda1f67a35 100644
--- a/src/include/nodes/parsenodes.h
+++ b/src/include/nodes/parsenodes.h
@@ -2608,6 +2608,16 @@ typedef struct AlterTableMoveAllStmt
 	bool		nowait;
 } AlterTableMoveAllStmt;
 
+typedef struct AlterTableSetLoggedAllStmt
+{
+	NodeTag		type;
+	char	   *tablespacename;
+	ObjectType	objtype;		/* Object type to change persistence of */
+	List	   *roles;			/* List of roles to change objects of */
+	bool		logged;
+	bool		nowait;
+} AlterTableSetLoggedAllStmt;
+
 /* ----------------------
  *		Create/Alter Extension Statements
  * ----------------------
diff --git a/src/test/regress/expected/tablespace.out b/src/test/regress/expected/tablespace.out
index c52cf1cfcf..a679d58553 100644
--- a/src/test/regress/expected/tablespace.out
+++ b/src/test/regress/expected/tablespace.out
@@ -966,5 +966,81 @@ drop cascades to table testschema.part
 drop cascades to table testschema.atable
 drop cascades to materialized view testschema.amv
 drop cascades to table testschema.tablespace_acl
+--
+-- Check persistence change in a tablespace
+CREATE SCHEMA testschema;
+GRANT CREATE ON SCHEMA testschema TO regress_tablespace_user1;
+CREATE TABLESPACE regress_tablespace LOCATION '';
+GRANT CREATE ON TABLESPACE regress_tablespace TO regress_tablespace_user1;
+CREATE TABLE testschema.lsu(a int) TABLESPACE regress_tablespace;
+CREATE UNLOGGED TABLE testschema.usu(a int) TABLESPACE regress_tablespace;
+CREATE TABLE testschema._lsu(a int) TABLESPACE pg_default;
+CREATE UNLOGGED TABLE testschema._usu(a int) TABLESPACE pg_default;
+SET ROLE regress_tablespace_user1;
+CREATE TABLE testschema.lu1(a int) TABLESPACE regress_tablespace;
+CREATE UNLOGGED TABLE testschema.uu1(a int) TABLESPACE regress_tablespace;
+CREATE TABLE testschema._lu1(a int) TABLESPACE pg_default;
+CREATE UNLOGGED TABLE testschema._uu1(a int) TABLESPACE pg_default;
+SELECT relname, t.spcname, relpersistence
+ FROM pg_class c LEFT JOIN pg_tablespace t ON (c.reltablespace = t.oid)
+ WHERE relnamespace = 'testschema'::regnamespace ORDER BY spcname, c.oid;
+ relname |      spcname       | relpersistence 
+---------+--------------------+----------------
+ lsu     | regress_tablespace | p
+ usu     | regress_tablespace | u
+ lu1     | regress_tablespace | p
+ uu1     | regress_tablespace | u
+ _lsu    |                    | p
+ _usu    |                    | u
+ _lu1    |                    | p
+ _uu1    |                    | u
+(8 rows)
+
+ALTER TABLE ALL IN TABLESPACE regress_tablespace
+	  OWNED BY regress_tablespace_user1 SET LOGGED;
+SELECT relname, t.spcname, relpersistence
+ FROM pg_class c LEFT JOIN pg_tablespace t ON (c.reltablespace = t.oid)
+ WHERE relnamespace = 'testschema'::regnamespace ORDER BY spcname, c.oid;
+ relname |      spcname       | relpersistence 
+---------+--------------------+----------------
+ lsu     | regress_tablespace | p
+ usu     | regress_tablespace | u
+ lu1     | regress_tablespace | p
+ uu1     | regress_tablespace | p
+ _lsu    |                    | p
+ _usu    |                    | u
+ _lu1    |                    | p
+ _uu1    |                    | u
+(8 rows)
+
+RESET ROLE;
+ALTER TABLE ALL IN TABLESPACE regress_tablespace SET UNLOGGED;
+SELECT relname, t.spcname, relpersistence
+ FROM pg_class c LEFT JOIN pg_tablespace t ON (c.reltablespace = t.oid)
+ WHERE relnamespace = 'testschema'::regnamespace ORDER BY spcname, c.oid;
+ relname |      spcname       | relpersistence 
+---------+--------------------+----------------
+ lsu     | regress_tablespace | u
+ usu     | regress_tablespace | u
+ lu1     | regress_tablespace | u
+ uu1     | regress_tablespace | u
+ _lsu    |                    | p
+ _usu    |                    | u
+ _lu1    |                    | p
+ _uu1    |                    | u
+(8 rows)
+
+-- Should succeed
+DROP SCHEMA testschema CASCADE;
+NOTICE:  drop cascades to 8 other objects
+DETAIL:  drop cascades to table testschema.lsu
+drop cascades to table testschema.usu
+drop cascades to table testschema._lsu
+drop cascades to table testschema._usu
+drop cascades to table testschema.lu1
+drop cascades to table testschema.uu1
+drop cascades to table testschema._lu1
+drop cascades to table testschema._uu1
+DROP TABLESPACE regress_tablespace;
 DROP ROLE regress_tablespace_user1;
 DROP ROLE regress_tablespace_user2;
diff --git a/src/test/regress/sql/tablespace.sql b/src/test/regress/sql/tablespace.sql
index 21db433f2a..a4b664f4e0 100644
--- a/src/test/regress/sql/tablespace.sql
+++ b/src/test/regress/sql/tablespace.sql
@@ -431,5 +431,46 @@ DROP TABLESPACE regress_tblspace_renamed;
 
 DROP SCHEMA testschema CASCADE;
 
+
+--
+-- Check persistence change in a tablespace
+CREATE SCHEMA testschema;
+GRANT CREATE ON SCHEMA testschema TO regress_tablespace_user1;
+CREATE TABLESPACE regress_tablespace LOCATION '';
+GRANT CREATE ON TABLESPACE regress_tablespace TO regress_tablespace_user1;
+
+CREATE TABLE testschema.lsu(a int) TABLESPACE regress_tablespace;
+CREATE UNLOGGED TABLE testschema.usu(a int) TABLESPACE regress_tablespace;
+CREATE TABLE testschema._lsu(a int) TABLESPACE pg_default;
+CREATE UNLOGGED TABLE testschema._usu(a int) TABLESPACE pg_default;
+SET ROLE regress_tablespace_user1;
+CREATE TABLE testschema.lu1(a int) TABLESPACE regress_tablespace;
+CREATE UNLOGGED TABLE testschema.uu1(a int) TABLESPACE regress_tablespace;
+CREATE TABLE testschema._lu1(a int) TABLESPACE pg_default;
+CREATE UNLOGGED TABLE testschema._uu1(a int) TABLESPACE pg_default;
+
+SELECT relname, t.spcname, relpersistence
+ FROM pg_class c LEFT JOIN pg_tablespace t ON (c.reltablespace = t.oid)
+ WHERE relnamespace = 'testschema'::regnamespace ORDER BY spcname, c.oid;
+
+ALTER TABLE ALL IN TABLESPACE regress_tablespace
+	  OWNED BY regress_tablespace_user1 SET LOGGED;
+
+SELECT relname, t.spcname, relpersistence
+ FROM pg_class c LEFT JOIN pg_tablespace t ON (c.reltablespace = t.oid)
+ WHERE relnamespace = 'testschema'::regnamespace ORDER BY spcname, c.oid;
+
+RESET ROLE;
+
+ALTER TABLE ALL IN TABLESPACE regress_tablespace SET UNLOGGED;
+
+SELECT relname, t.spcname, relpersistence
+ FROM pg_class c LEFT JOIN pg_tablespace t ON (c.reltablespace = t.oid)
+ WHERE relnamespace = 'testschema'::regnamespace ORDER BY spcname, c.oid;
+
+-- Should succeed
+DROP SCHEMA testschema CASCADE;
+DROP TABLESPACE regress_tablespace;
+
 DROP ROLE regress_tablespace_user1;
 DROP ROLE regress_tablespace_user2;
-- 
2.27.0
