Hello. This is a thread for an alternative solution to wal_level=none
[*1] for bulk data loading.

*1: 
https://www.postgresql.org/message-id/TYAPR01MB29901EBE5A3ACCE55BA99186FE320%40TYAPR01MB2990.jpnprd01.prod.outlook.com

At Tue, 10 Nov 2020 09:33:12 -0500, Stephen Frost <sfr...@snowman.net> wrote in 
> Greetings,
> 
> * Kyotaro Horiguchi (horikyota....@gmail.com) wrote:
> > For fuel(?) of the discussion, I tried a very-quick PoC for in-place
> > ALTER TABLE SET LOGGED/UNLOGGED and resulted as attached. After some
> > trials of several ways, I drifted to the following way after poking
> > several ways.
> > 
> > 1. Flip BM_PERMANENT of active buffers
> > 2. adding/removing init fork
> > 3. sync files,
> > 4. Flip pg_class.relpersistence.
> > 
> > It always skips table copy in the SET UNLOGGED case, and only when
> > wal_level=minimal in the SET LOGGED case.  Crash recovery seems
> > working by some brief testing by hand.
> 
> Somehow missed that this patch more-or-less does what I was referring to
> down-thread, but I did want to mention that it looks like it's missing a
> necessary FlushRelationBuffers() call before the sync, otherwise there
> could be dirty buffers for the relation that's being set to LOGGED (with
> wal_level=minimal), which wouldn't be good.  See the comments above
> smgrimmedsync().

Right. Thanks.  However, SetRelationBuffersPersistence(), which is
called just above, already scans shared buffers, so I don't want to
call FlushRelationBuffers() separately.  Instead, I added the buffer
flush to SetRelationBuffersPersistence().

FWIW this is a revised version of the PoC, which has some known
problems.

- Flipping of buffer persistence is not WAL-logged, nor can it even be
  safely rolled back. (It might be better to drop the buffers.)

- This version handles indexes but does not yet handle toast relations.

- Table AMs are supposed to support this feature (but I'm not sure
  it's worth allowing them not to do so).

> > Of course, I haven't performed intensive test on it.
> 
> Reading through the thread, it didn't seem very clear, but we should
> definitely make sure that it does the right thing on replicas when going
> between unlogged and logged (and between logged and unlogged too), of
> course.

regards.

-- 
Kyotaro Horiguchi
NTT Open Source Software Center
diff --git a/src/backend/access/heap/heapam_handler.c b/src/backend/access/heap/heapam_handler.c
index dcaea7135f..0c6ce70484 100644
--- a/src/backend/access/heap/heapam_handler.c
+++ b/src/backend/access/heap/heapam_handler.c
@@ -613,6 +613,27 @@ heapam_relation_set_new_filenode(Relation rel,
 	smgrclose(srel);
 }
 
+static void
+heapam_relation_set_persistence(Relation rel, char persistence)
+{
+	Assert(rel->rd_rel->relpersistence == RELPERSISTENCE_PERMANENT ||
+		   rel->rd_rel->relpersistence == RELPERSISTENCE_UNLOGGED);
+
+	Assert (rel->rd_rel->relpersistence != persistence);
+	/* Going UNLOGGED creates the init fork; otherwise it is dropped. */
+	if (persistence == RELPERSISTENCE_UNLOGGED)
+	{
+		Assert(rel->rd_rel->relkind == RELKIND_RELATION ||
+			   rel->rd_rel->relkind == RELKIND_MATVIEW ||
+			   rel->rd_rel->relkind == RELKIND_TOASTVALUE);
+		/* isRedo = false, so the creation is WAL-logged */
+		RelationCreateInitFork(rel->rd_node, false);
+	}
+	else
+		RelationDropInitFork(rel->rd_node);
+}
+
+
 static void
 heapam_relation_nontransactional_truncate(Relation rel)
 {
@@ -2540,6 +2561,7 @@ static const TableAmRoutine heapam_methods = {
 	.compute_xid_horizon_for_tuples = heap_compute_xid_horizon_for_tuples,
 
 	.relation_set_new_filenode = heapam_relation_set_new_filenode,
+	.relation_set_persistence = heapam_relation_set_persistence,
 	.relation_nontransactional_truncate = heapam_relation_nontransactional_truncate,
 	.relation_copy_data = heapam_relation_copy_data,
 	.relation_copy_for_cluster = heapam_relation_copy_for_cluster,
diff --git a/src/backend/access/rmgrdesc/smgrdesc.c b/src/backend/access/rmgrdesc/smgrdesc.c
index a7c0cb1bc3..8397002613 100644
--- a/src/backend/access/rmgrdesc/smgrdesc.c
+++ b/src/backend/access/rmgrdesc/smgrdesc.c
@@ -40,6 +40,14 @@ smgr_desc(StringInfo buf, XLogReaderState *record)
 						 xlrec->blkno, xlrec->flags);
 		pfree(path);
 	}
+	else if (info == XLOG_SMGR_UNLINK)
+	{
+		xl_smgr_unlink *xlrec = (xl_smgr_unlink *) rec;
+		char	   *path = relpathperm(xlrec->rnode, xlrec->forkNum);
+
+		appendStringInfoString(buf, path);
+		pfree(path);
+	}
 }
 
 const char *
@@ -55,6 +63,9 @@ smgr_identify(uint8 info)
 		case XLOG_SMGR_TRUNCATE:
 			id = "TRUNCATE";
 			break;
+		case XLOG_SMGR_UNLINK:
+			id = "UNLINK";
+			break;
 	}
 
 	return id;
diff --git a/src/backend/catalog/storage.c b/src/backend/catalog/storage.c
index d538f25726..ac5aea3d38 100644
--- a/src/backend/catalog/storage.c
+++ b/src/backend/catalog/storage.c
@@ -60,6 +60,8 @@ int			wal_skip_threshold = 2048;	/* in kilobytes */
 typedef struct PendingRelDelete
 {
 	RelFileNode relnode;		/* relation that may need to be deleted */
+	bool		deleteinitfork;	/* delete only init fork if true */
+	bool		createinitfork;	/* create init fork if true */
 	BackendId	backend;		/* InvalidBackendId if not a temp rel */
 	bool		atCommit;		/* T=delete at commit; F=delete at abort */
 	int			nestLevel;		/* xact nesting level of request */
@@ -153,6 +155,8 @@ RelationCreateStorage(RelFileNode rnode, char relpersistence)
 	pending = (PendingRelDelete *)
 		MemoryContextAlloc(TopMemoryContext, sizeof(PendingRelDelete));
 	pending->relnode = rnode;
+	pending->deleteinitfork = false;
+	pending->createinitfork = false;
 	pending->backend = backend;
 	pending->atCommit = false;	/* delete if abort */
 	pending->nestLevel = GetCurrentTransactionNestLevel();
@@ -168,6 +172,95 @@ RelationCreateStorage(RelFileNode rnode, char relpersistence)
 	return srel;
 }
 
+/*
+ * RelationCreateInitFork
+ *		Create the init fork of a relation.
+ *
+ * If the pending-deletes list already holds an entry that would remove the
+ * init fork of the same relfilenode at commit, that entry is cancelled and
+ * nothing else is done.  Otherwise the init fork is created, WAL-logged
+ * unless isRedo is true, and synced immediately.
+ *
+ * The creation is registered so that the init fork is removed on abort.
+ */
+void
+RelationCreateInitFork(RelFileNode rnode, bool isRedo)
+{
+	PendingRelDelete *pending;
+	SMgrRelation srel;
+	PendingRelDelete *prev;
+	PendingRelDelete *next;
+
+	prev = NULL;
+	for (pending = pendingDeletes; pending != NULL; pending = next)
+	{
+		next = pending->next;
+		if (RelFileNodeEquals(rnode, pending->relnode) &&
+			pending->deleteinitfork && pending->atCommit)
+		{
+			/* a drop at commit is pending: cancel it and keep the fork */
+			if (prev)
+				prev->next = next;
+			else
+				pendingDeletes = next;
+			pfree(pending);
+			return;
+		}
+		else
+		{
+			/* unrelated entry, don't touch it */
+			prev = pending;
+		}
+	}	
+	srel = smgropen(rnode, InvalidBackendId);
+	smgrcreate(srel, INIT_FORKNUM, isRedo);
+	if (!isRedo)
+		log_smgrcreate(&rnode, INIT_FORKNUM);
+	smgrimmedsync(srel, INIT_FORKNUM);
+
+	/* Register the new init fork for removal if the transaction aborts */
+	pending = (PendingRelDelete *)
+		MemoryContextAlloc(TopMemoryContext, sizeof(PendingRelDelete));
+	pending->relnode = rnode;
+	pending->deleteinitfork = true;
+	pending->createinitfork = false;
+	pending->backend = InvalidBackendId;
+	pending->atCommit = false;	/* delete if abort */
+	pending->nestLevel = GetCurrentTransactionNestLevel();
+	pending->next = pendingDeletes;
+	pendingDeletes = pending;
+}
+
+void
+RelationDropInitFork(RelFileNode rnode)
+{
+	PendingRelDelete *pending;
+	PendingRelDelete *next;
+
+	for (pending = pendingDeletes; pending != NULL; pending = next)
+	{
+		next = pending->next;
+		if (RelFileNodeEquals(rnode, pending->relnode) &&
+			pending->deleteinitfork && pending->atCommit)
+		{
+			/* Removal at commit is already scheduled; nothing more to do. */
+			return;
+		}
+	}	
+
+	/* Schedule the init fork of this relation for removal at commit */
+	pending = (PendingRelDelete *)
+		MemoryContextAlloc(TopMemoryContext, sizeof(PendingRelDelete));
+	pending->relnode = rnode;
+	pending->deleteinitfork = true;
+	pending->createinitfork = false;
+	pending->backend = InvalidBackendId;
+	pending->atCommit = true;	/* delete if commit */
+	pending->nestLevel = GetCurrentTransactionNestLevel();
+	pending->next = pendingDeletes;
+	pendingDeletes = pending;
+}
+
 /*
  * Perform XLogInsert of an XLOG_SMGR_CREATE record to WAL.
  */
@@ -187,6 +280,25 @@ log_smgrcreate(const RelFileNode *rnode, ForkNumber forkNum)
 	XLogInsert(RM_SMGR_ID, XLOG_SMGR_CREATE | XLR_SPECIAL_REL_UPDATE);
 }
 
+/*
+ * Perform XLogInsert of an XLOG_SMGR_UNLINK record for one fork of rnode.
+ */
+void
+log_smgrunlink(const RelFileNode *rnode, ForkNumber forkNum)
+{
+	xl_smgr_unlink xlrec;
+
+	/*
+	 * Make an XLOG entry naming the relfilenode and fork being unlinked.
+	 */
+	xlrec.rnode = *rnode;
+	xlrec.forkNum = forkNum;
+
+	XLogBeginInsert();
+	XLogRegisterData((char *) &xlrec, sizeof(xlrec));
+	XLogInsert(RM_SMGR_ID, XLOG_SMGR_UNLINK | XLR_SPECIAL_REL_UPDATE);
+}
+
 /*
  * RelationDropStorage
  *		Schedule unlinking of physical storage at transaction commit.
@@ -200,6 +312,8 @@ RelationDropStorage(Relation rel)
 	pending = (PendingRelDelete *)
 		MemoryContextAlloc(TopMemoryContext, sizeof(PendingRelDelete));
 	pending->relnode = rel->rd_node;
+	pending->createinitfork = false;
+	pending->deleteinitfork = false;
 	pending->backend = rel->rd_backend;
 	pending->atCommit = true;	/* delete if commit */
 	pending->nestLevel = GetCurrentTransactionNestLevel();
@@ -626,19 +740,27 @@ smgrDoPendingDeletes(bool isCommit)
 
 				srel = smgropen(pending->relnode, pending->backend);
 
-				/* allocate the initial array, or extend it, if needed */
-				if (maxrels == 0)
+				if (pending->deleteinitfork)
 				{
-					maxrels = 8;
-					srels = palloc(sizeof(SMgrRelation) * maxrels);
+					log_smgrunlink(&pending->relnode, INIT_FORKNUM);
+					smgrunlink(srel, INIT_FORKNUM, false);
 				}
-				else if (maxrels <= nrels)
+				else
 				{
-					maxrels *= 2;
-					srels = repalloc(srels, sizeof(SMgrRelation) * maxrels);
-				}
+					/* allocate the initial array, or extend it, if needed */
+					if (maxrels == 0)
+					{
+						maxrels = 8;
+						srels = palloc(sizeof(SMgrRelation) * maxrels);
+					}
+					else if (maxrels <= nrels)
+					{
+						maxrels *= 2;
+						srels = repalloc(srels, sizeof(SMgrRelation) * maxrels);
+					}
 
-				srels[nrels++] = srel;
+					srels[nrels++] = srel;
+				}
 			}
 			/* must explicitly free the list entry */
 			pfree(pending);
@@ -917,6 +1039,14 @@ smgr_redo(XLogReaderState *record)
 		reln = smgropen(xlrec->rnode, InvalidBackendId);
 		smgrcreate(reln, xlrec->forkNum, true);
 	}
+	else if (info == XLOG_SMGR_UNLINK)
+	{
+		xl_smgr_unlink *xlrec = (xl_smgr_unlink *) XLogRecGetData(record);
+		SMgrRelation reln;
+
+		reln = smgropen(xlrec->rnode, InvalidBackendId);
+		smgrunlink(reln, xlrec->forkNum, true);
+	}
 	else if (info == XLOG_SMGR_TRUNCATE)
 	{
 		xl_smgr_truncate *xlrec = (xl_smgr_truncate *) XLogRecGetData(record);
diff --git a/src/backend/commands/tablecmds.c b/src/backend/commands/tablecmds.c
index e3cfaf8b07..e358174b01 100644
--- a/src/backend/commands/tablecmds.c
+++ b/src/backend/commands/tablecmds.c
@@ -4918,6 +4918,137 @@ ATParseTransformCmd(List **wqueue, AlteredTableInfo *tab, Relation rel,
 	return newcmd;
 }
 
+/*
+ * Switch a table and its indexes between LOGGED and UNLOGGED in place.
+ * Returns false, changing nothing, if the caller must rewrite the table.
+ */
+static bool
+try_inplace_persistence_change(AlteredTableInfo *tab, char persistence,
+							   LOCKMODE lockmode)
+{
+	Relation 	rel;
+	Relation	classRel;
+	HeapTuple	tuple,
+				newtuple;
+	Datum		new_val[Natts_pg_class];
+	bool		new_null[Natts_pg_class],
+				new_repl[Natts_pg_class];
+	int			i;
+	List	   *relids;
+	ListCell   *lc_oid;
+
+	Assert(tab->rewrite == AT_REWRITE_ALTER_PERSISTENCE);
+	Assert(lockmode == AccessExclusiveLock);
+
+	/*
+	 * Any of the following would require calling ATRewriteTable, which the
+	 * in-place path never does; none of them may be set in this case.
+	 */
+	Assert(tab->constraints == NULL && tab->partition_constraint == NULL &&
+		   tab->newvals == NULL && !tab->verify_new_notnull);
+
+	/*
+	 * With wal_level >= replica the relation's initial state must be
+	 * recoverable from WAL, so switching to PERMANENT would have to emit WAL
+	 * (e.g. XLOG_FPI for every page); it is not obvious that this beats a
+	 * normal rewrite, so fall back.  Otherwise it is enough to establish the
+	 * initial state on storage, and no WAL is needed to reconstruct it.
+	 */
+	if (tab->newrelpersistence == RELPERSISTENCE_PERMANENT && XLogIsNeeded())
+		return false;
+
+	rel = table_open(tab->relid, lockmode);
+
+	Assert(rel->rd_rel->relpersistence != persistence);
+
+	elog(DEBUG1, "perform im-place persistnce change");
+
+	RelationOpenSmgr(rel);
+
+	/* Build the list of OIDs to process: the heap first, then its indexes */
+	relids = RelationGetIndexList(rel);
+	relids = lcons_oid(rel->rd_id, relids);
+
+	table_close(rel, lockmode);
+
+	classRel = table_open(RelationRelationId, RowExclusiveLock);
+
+	/* Pass 1: create or drop each init fork and update pg_class */
+	foreach (lc_oid, relids)
+	{
+		Oid reloid = lfirst_oid(lc_oid);
+		Relation r = relation_open(reloid, lockmode);
+
+		RelationOpenSmgr(r);
+
+		if (persistence == RELPERSISTENCE_UNLOGGED)
+		{
+			RelationCreateInitFork(r->rd_node, false);
+
+			if (r->rd_rel->relkind == RELKIND_INDEX ||
+				r->rd_rel->relkind == RELKIND_PARTITIONED_INDEX)
+				r->rd_indam->ambuildempty(r);
+			else
+			{
+				Assert(r->rd_rel->relkind == RELKIND_RELATION ||
+					   r->rd_rel->relkind == RELKIND_MATVIEW ||
+					   r->rd_rel->relkind == RELKIND_TOASTVALUE);
+			}
+		}
+		else
+			RelationDropInitFork(r->rd_node);
+
+		table_close(r, NoLock);
+
+		tuple = SearchSysCacheCopy1(RELOID, ObjectIdGetDatum(reloid));
+		if (!HeapTupleIsValid(tuple))
+			elog(ERROR, "cache lookup failed for relation %u", reloid);
+
+		memset(new_val, 0, sizeof(new_val));
+		memset(new_null, false, sizeof(new_null));
+		memset(new_repl, false, sizeof(new_repl));
+
+		new_val[Anum_pg_class_relpersistence - 1] = CharGetDatum(persistence);
+		new_null[Anum_pg_class_relpersistence - 1] = false;
+		new_repl[Anum_pg_class_relpersistence - 1] = true;
+
+		newtuple = heap_modify_tuple(tuple, RelationGetDescr(classRel),
+									 new_val, new_null, new_repl);
+
+		CatalogTupleUpdate(classRel, &newtuple->t_self, newtuple);
+		heap_freetuple(newtuple);
+		heap_freetuple(tuple);
+	}
+
+	/* Pass 2: flip buffer persistence (flushing dirty pages), then sync */
+	foreach (lc_oid, relids)
+	{
+		Oid reloid = lfirst_oid(lc_oid);
+		Relation r = relation_open(reloid, lockmode);
+
+		RelationOpenSmgr(r);
+		SetRelationBuffersPersistence(r, persistence == RELPERSISTENCE_PERMANENT);
+
+		/*
+		 * Now WAL-logged: sync files to establish the initial on-disk state.
+		 * Must follow the flush above, while r (and r->rd_smgr) is still open.
+		 */
+		if (persistence == RELPERSISTENCE_PERMANENT)
+		{
+			for (i = 0 ; i < MAX_FORKNUM ; i++)
+			{
+				if (smgrexists(r->rd_smgr, i))
+					smgrimmedsync(r->rd_smgr, i);
+			}
+		}
+
+		table_close(r, NoLock);
+	}
+	table_close(classRel, RowExclusiveLock);
+
+	return true;
+}
+
 /*
  * ATRewriteTables: ALTER TABLE phase 3
  */
@@ -5038,45 +5169,51 @@ ATRewriteTables(AlterTableStmt *parsetree, List **wqueue, LOCKMODE lockmode,
 										 tab->relid,
 										 tab->rewrite);
 
-			/*
-			 * Create transient table that will receive the modified data.
-			 *
-			 * Ensure it is marked correctly as logged or unlogged.  We have
-			 * to do this here so that buffers for the new relfilenode will
-			 * have the right persistence set, and at the same time ensure
-			 * that the original filenode's buffers will get read in with the
-			 * correct setting (i.e. the original one).  Otherwise a rollback
-			 * after the rewrite would possibly result with buffers for the
-			 * original filenode having the wrong persistence setting.
-			 *
-			 * NB: This relies on swap_relation_files() also swapping the
-			 * persistence. That wouldn't work for pg_class, but that can't be
-			 * unlogged anyway.
-			 */
-			OIDNewHeap = make_new_heap(tab->relid, NewTableSpace, persistence,
-									   lockmode);
+			if (tab->rewrite != AT_REWRITE_ALTER_PERSISTENCE ||
+				!try_inplace_persistence_change(tab, persistence, lockmode))
+			{
+				/*
+				 * Create transient table that will receive the modified data.
+				 *
+				 * Ensure it is marked correctly as logged or unlogged.  We
+				 * have to do this here so that buffers for the new relfilenode
+				 * will have the right persistence set, and at the same time
+				 * ensure that the original filenode's buffers will get read in
+				 * with the correct setting (i.e. the original one).  Otherwise
+				 * a rollback after the rewrite would possibly result with
+				 * buffers for the original filenode having the wrong
+				 * persistence setting.
+				 *
+				 * NB: This relies on swap_relation_files() also swapping the
+				 * persistence. That wouldn't work for pg_class, but that can't
+				 * be unlogged anyway.
+				 */
+				OIDNewHeap = make_new_heap(tab->relid, NewTableSpace, persistence,
+										   lockmode);
 
-			/*
-			 * Copy the heap data into the new table with the desired
-			 * modifications, and test the current data within the table
-			 * against new constraints generated by ALTER TABLE commands.
-			 */
-			ATRewriteTable(tab, OIDNewHeap, lockmode);
+				/*
+				 * Copy the heap data into the new table with the desired
+				 * modifications, and test the current data within the table
+				 * against new constraints generated by ALTER TABLE commands.
+				 */
+				ATRewriteTable(tab, OIDNewHeap, lockmode);
 
-			/*
-			 * Swap the physical files of the old and new heaps, then rebuild
-			 * indexes and discard the old heap.  We can use RecentXmin for
-			 * the table's new relfrozenxid because we rewrote all the tuples
-			 * in ATRewriteTable, so no older Xid remains in the table.  Also,
-			 * we never try to swap toast tables by content, since we have no
-			 * interest in letting this code work on system catalogs.
-			 */
-			finish_heap_swap(tab->relid, OIDNewHeap,
-							 false, false, true,
-							 !OidIsValid(tab->newTableSpace),
-							 RecentXmin,
-							 ReadNextMultiXactId(),
-							 persistence);
+				/*
+				 * Swap the physical files of the old and new heaps, then
+				 * rebuild indexes and discard the old heap.  We can use
+				 * RecentXmin for the table's new relfrozenxid because we
+				 * rewrote all the tuples in ATRewriteTable, so no older Xid
+				 * remains in the table.  Also, we never try to swap toast
+				 * tables by content, since we have no interest in letting this
+				 * code work on system catalogs.
+				 */
+				finish_heap_swap(tab->relid, OIDNewHeap,
+								 false, false, true,
+								 !OidIsValid(tab->newTableSpace),
+								 RecentXmin,
+								 ReadNextMultiXactId(),
+								 persistence);
+			}
 		}
 		else
 		{
diff --git a/src/backend/storage/buffer/bufmgr.c b/src/backend/storage/buffer/bufmgr.c
index ad0d1a9abc..c71e1a5f92 100644
--- a/src/backend/storage/buffer/bufmgr.c
+++ b/src/backend/storage/buffer/bufmgr.c
@@ -3033,6 +3033,80 @@ DropRelFileNodeBuffers(RelFileNodeBackend rnode, ForkNumber *forkNum,
 	}
 }
 
+/* ---------------------------------------------------------------------
+ *		SetRelationBuffersPersistence
+ *
+ *		This function changes the persistence of all buffer pages of a relation
+ *		and, when switching to PERMANENT, also writes all dirty pages of the
+ *		relation out to disk (or more accurately, out to kernel disk buffers),
+ *		ensuring that the kernel has an up-to-date view of the relation.
+ *
+ *		Generally, the caller should be holding AccessExclusiveLock on the
+ *		target relation to ensure that no other backend is busy dirtying
+ *		more blocks of the relation; the effects can't be expected to last
+ *		after the lock is released.
+ *
+ *		XXX currently it sequentially searches the buffer pool, should be
+ *		changed to more clever ways of searching.  This routine is not
+ *		used in any performance-critical code paths, so it's not worth
+ *		adding additional overhead to normal paths to make it go faster;
+ *		but see also DropRelFileNodeBuffers.
+ * --------------------------------------------------------------------
+ */
+void
+SetRelationBuffersPersistence(Relation rel, bool permanent)
+{
+	int			i;
+	RelFileNodeBackend rnode = rel->rd_smgr->smgr_rnode;
+
+	Assert (!RelFileNodeBackendIsTemp(rnode));
+
+	for (i = 0; i < NBuffers; i++)
+	{
+		BufferDesc *bufHdr = GetBufferDescriptor(i);
+		uint32		buf_state;
+
+		/* Unlocked pre-check; re-verified below under the header lock */
+		if (!RelFileNodeEquals(bufHdr->tag.rnode, rnode.node))
+			continue;
+
+		ReservePrivateRefCountEntry();
+		buf_state = LockBufHdr(bufHdr);
+
+		if (RelFileNodeEquals(bufHdr->tag.rnode, rnode.node))
+		{
+			if (permanent)
+			{
+				Assert ((buf_state &  BM_PERMANENT) == 0);
+				buf_state |= BM_PERMANENT;
+				pg_atomic_write_u32(&bufHdr->state, buf_state);
+
+				/* we flush this buffer when switching to PERMANENT */
+				if ((buf_state & (BM_VALID | BM_DIRTY)) ==
+					(BM_VALID | BM_DIRTY))
+				{
+					PinBuffer_Locked(bufHdr);
+					LWLockAcquire(BufferDescriptorGetContentLock(bufHdr),
+								  LW_SHARED);
+					FlushBuffer(bufHdr, rel->rd_smgr);
+					LWLockRelease(BufferDescriptorGetContentLock(bufHdr));
+					UnpinBuffer(bufHdr, true);
+				}
+				else
+					UnlockBufHdr(bufHdr, buf_state);
+			}
+			else
+			{
+				Assert ((buf_state &  BM_PERMANENT) != 0);
+				buf_state &= ~BM_PERMANENT;
+				UnlockBufHdr(bufHdr, buf_state);
+			}
+		}
+		else
+			UnlockBufHdr(bufHdr, buf_state);	/* tag changed; leave it alone */
+	}
+}
+
 /* ---------------------------------------------------------------------
  *		DropRelFileNodesAllBuffers
  *
diff --git a/src/backend/storage/smgr/smgr.c b/src/backend/storage/smgr/smgr.c
index dcc09df0c7..5eb9e97b3d 100644
--- a/src/backend/storage/smgr/smgr.c
+++ b/src/backend/storage/smgr/smgr.c
@@ -645,6 +645,12 @@ smgrimmedsync(SMgrRelation reln, ForkNumber forknum)
 	smgrsw[reln->smgr_which].smgr_immedsync(reln, forknum);
 }
 
+void
+smgrunlink(SMgrRelation reln, ForkNumber forknum, bool isRedo)
+{
+	smgrsw[reln->smgr_which].smgr_unlink(reln->smgr_rnode, forknum, isRedo);
+}
+
 /*
  * AtEOXact_SMgr
  *
diff --git a/src/include/access/tableam.h b/src/include/access/tableam.h
index 387eb34a61..1d19278a18 100644
--- a/src/include/access/tableam.h
+++ b/src/include/access/tableam.h
@@ -451,6 +451,15 @@ typedef struct TableAmRoutine
 											  TransactionId *freezeXid,
 											  MultiXactId *minmulti);
 
+	/*
+	 * This callback needs to switch persistence of the relation between
+	 * RELPERSISTENCE_PERMANENT and RELPERSISTENCE_UNLOGGED. Actual change on
+	 * storage is performed elsewhere.
+	 *
+	 * See also table_relation_set_persistence().
+	 */
+	void		(*relation_set_persistence) (Relation rel, char persistence);
+
 	/*
 	 * This callback needs to remove all contents from `rel`'s current
 	 * relfilenode. No provisions for transactional behaviour need to be made.
@@ -1404,6 +1413,18 @@ table_relation_set_new_filenode(Relation rel,
 											   freezeXid, minmulti);
 }
 
+/*
+ * Switch storage persistence between RELPERSISTENCE_PERMANENT and
+ * RELPERSISTENCE_UNLOGGED.
+ *
+ * Intended for in-place persistence switching; dispatches to the table AM.
+ */
+static inline void
+table_relation_set_persistence(Relation rel, char persistence)
+{
+	rel->rd_tableam->relation_set_persistence(rel, persistence);
+}
+
 /*
  * Remove all table contents from `rel`, in a non-transactional manner.
  * Non-transactional meaning that there's no need to support rollbacks. This
diff --git a/src/include/catalog/storage.h b/src/include/catalog/storage.h
index 30c38e0ca6..43d2eb0fb4 100644
--- a/src/include/catalog/storage.h
+++ b/src/include/catalog/storage.h
@@ -23,6 +23,8 @@
 extern int	wal_skip_threshold;
 
 extern SMgrRelation RelationCreateStorage(RelFileNode rnode, char relpersistence);
+extern void RelationCreateInitFork(RelFileNode rel, bool isRedo);
+extern void RelationDropInitFork(RelFileNode rel);
 extern void RelationDropStorage(Relation rel);
 extern void RelationPreserveStorage(RelFileNode rnode, bool atCommit);
 extern void RelationPreTruncate(Relation rel);
diff --git a/src/include/catalog/storage_xlog.h b/src/include/catalog/storage_xlog.h
index 7b21cab2e0..73ad2ae89e 100644
--- a/src/include/catalog/storage_xlog.h
+++ b/src/include/catalog/storage_xlog.h
@@ -29,6 +29,7 @@
 /* XLOG gives us high 4 bits */
 #define XLOG_SMGR_CREATE	0x10
 #define XLOG_SMGR_TRUNCATE	0x20
+#define XLOG_SMGR_UNLINK	0x30
 
 typedef struct xl_smgr_create
 {
@@ -36,6 +37,12 @@ typedef struct xl_smgr_create
 	ForkNumber	forkNum;
 } xl_smgr_create;
 
+typedef struct xl_smgr_unlink
+{
+	RelFileNode rnode;
+	ForkNumber	forkNum;
+} xl_smgr_unlink;
+
 /* flags for xl_smgr_truncate */
 #define SMGR_TRUNCATE_HEAP		0x0001
 #define SMGR_TRUNCATE_VM		0x0002
@@ -51,6 +58,7 @@ typedef struct xl_smgr_truncate
 } xl_smgr_truncate;
 
 extern void log_smgrcreate(const RelFileNode *rnode, ForkNumber forkNum);
+extern void log_smgrunlink(const RelFileNode *rnode, ForkNumber forkNum);
 
 extern void smgr_redo(XLogReaderState *record);
 extern void smgr_desc(StringInfo buf, XLogReaderState *record);
diff --git a/src/include/storage/bufmgr.h b/src/include/storage/bufmgr.h
index ee91b8fa26..f65a273999 100644
--- a/src/include/storage/bufmgr.h
+++ b/src/include/storage/bufmgr.h
@@ -205,6 +205,7 @@ extern void FlushRelationsAllBuffers(struct SMgrRelationData **smgrs, int nrels)
 extern void FlushDatabaseBuffers(Oid dbid);
 extern void DropRelFileNodeBuffers(RelFileNodeBackend rnode, ForkNumber *forkNum,
 								   int nforks, BlockNumber *firstDelBlock);
+extern void SetRelationBuffersPersistence(Relation rnode, bool permanent);
 extern void DropRelFileNodesAllBuffers(RelFileNodeBackend *rnodes, int nnodes);
 extern void DropDatabaseBuffers(Oid dbid);
 
diff --git a/src/include/storage/smgr.h b/src/include/storage/smgr.h
index f28a842401..5d74631006 100644
--- a/src/include/storage/smgr.h
+++ b/src/include/storage/smgr.h
@@ -86,6 +86,7 @@ extern void smgrclose(SMgrRelation reln);
 extern void smgrcloseall(void);
 extern void smgrclosenode(RelFileNodeBackend rnode);
 extern void smgrcreate(SMgrRelation reln, ForkNumber forknum, bool isRedo);
+extern void smgrunlink(SMgrRelation reln, ForkNumber forknum, bool isRedo);
 extern void smgrdosyncall(SMgrRelation *rels, int nrels);
 extern void smgrdounlinkall(SMgrRelation *rels, int nrels, bool isRedo);
 extern void smgrextend(SMgrRelation reln, ForkNumber forknum,

Reply via email to