rebased for cfbot

-- 
Nathan Bossart
Amazon Web Services: https://aws.amazon.com
>From abbd26a3bcfcc828e196187e9f6abf6af64f3393 Mon Sep 17 00:00:00 2001
From: Nathan Bossart <bossa...@amazon.com>
Date: Wed, 5 Jan 2022 19:24:22 +0000
Subject: [PATCH v19 1/4] Introduce custodian.

The custodian process is a new auxiliary process that is intended
to help offload tasks that could otherwise delay startup and
checkpointing.  This commit simply adds the new process; it does
not yet do anything useful.
---
 doc/src/sgml/glossary.sgml              |  11 +
 src/backend/postmaster/Makefile         |   1 +
 src/backend/postmaster/auxprocess.c     |   8 +
 src/backend/postmaster/custodian.c      | 377 ++++++++++++++++++++++++
 src/backend/postmaster/meson.build      |   1 +
 src/backend/postmaster/postmaster.c     |  38 ++-
 src/backend/storage/ipc/ipci.c          |   3 +
 src/backend/storage/lmgr/proc.c         |   1 +
 src/backend/utils/activity/wait_event.c |   3 +
 src/backend/utils/init/miscinit.c       |   3 +
 src/include/miscadmin.h                 |   3 +
 src/include/postmaster/custodian.h      |  32 ++
 src/include/storage/proc.h              |  11 +-
 src/include/utils/wait_event.h          |   1 +
 14 files changed, 488 insertions(+), 5 deletions(-)
 create mode 100644 src/backend/postmaster/custodian.c
 create mode 100644 src/include/postmaster/custodian.h

diff --git a/doc/src/sgml/glossary.sgml b/doc/src/sgml/glossary.sgml
index 7c01a541fe..ad3f53e2a3 100644
--- a/doc/src/sgml/glossary.sgml
+++ b/doc/src/sgml/glossary.sgml
@@ -144,6 +144,7 @@
      (but not the autovacuum workers),
      the <glossterm linkend="glossary-background-writer">background writer</glossterm>,
      the <glossterm linkend="glossary-checkpointer">checkpointer</glossterm>,
+     the <glossterm linkend="glossary-custodian">custodian</glossterm>,
      the <glossterm linkend="glossary-logger">logger</glossterm>,
      the <glossterm linkend="glossary-startup-process">startup process</glossterm>,
      the <glossterm linkend="glossary-wal-archiver">WAL archiver</glossterm>,
@@ -484,6 +485,16 @@
    </glossdef>
   </glossentry>
 
+  <glossentry id="glossary-custodian">
+   <glossterm>Custodian (process)</glossterm>
+   <glossdef>
+    <para>
+     An <glossterm linkend="glossary-auxiliary-proc">auxiliary process</glossterm>
+     that is responsible for executing assorted cleanup tasks.
+    </para>
+   </glossdef>
+  </glossentry>
+
   <glossentry>
    <glossterm>Data area</glossterm>
    <glosssee otherterm="glossary-data-directory" />
diff --git a/src/backend/postmaster/Makefile b/src/backend/postmaster/Makefile
index 3a794e54d6..e1e1d1123f 100644
--- a/src/backend/postmaster/Makefile
+++ b/src/backend/postmaster/Makefile
@@ -18,6 +18,7 @@ OBJS = \
 	bgworker.o \
 	bgwriter.o \
 	checkpointer.o \
+	custodian.o \
 	fork_process.o \
 	interrupt.o \
 	pgarch.o \
diff --git a/src/backend/postmaster/auxprocess.c b/src/backend/postmaster/auxprocess.c
index cae6feb356..a1f042f13a 100644
--- a/src/backend/postmaster/auxprocess.c
+++ b/src/backend/postmaster/auxprocess.c
@@ -20,6 +20,7 @@
 #include "pgstat.h"
 #include "postmaster/auxprocess.h"
 #include "postmaster/bgwriter.h"
+#include "postmaster/custodian.h"
 #include "postmaster/startup.h"
 #include "postmaster/walwriter.h"
 #include "replication/walreceiver.h"
@@ -74,6 +75,9 @@ AuxiliaryProcessMain(AuxProcType auxtype)
 		case CheckpointerProcess:
 			MyBackendType = B_CHECKPOINTER;
 			break;
+		case CustodianProcess:
+			MyBackendType = B_CUSTODIAN;
+			break;
 		case WalWriterProcess:
 			MyBackendType = B_WAL_WRITER;
 			break;
@@ -153,6 +157,10 @@ AuxiliaryProcessMain(AuxProcType auxtype)
 			CheckpointerMain();
 			proc_exit(1);
 
+		case CustodianProcess:
+			CustodianMain();
+			proc_exit(1);
+
 		case WalWriterProcess:
 			WalWriterMain();
 			proc_exit(1);
diff --git a/src/backend/postmaster/custodian.c b/src/backend/postmaster/custodian.c
new file mode 100644
index 0000000000..98bb9efcfd
--- /dev/null
+++ b/src/backend/postmaster/custodian.c
@@ -0,0 +1,377 @@
+/*-------------------------------------------------------------------------
+ *
+ * custodian.c
+ *
+ * The custodian process handles a variety of non-critical tasks that might
+ * otherwise delay startup, checkpointing, etc.  Offloaded tasks should not
+ * be synchronous (e.g., checkpointing shouldn't wait for the custodian to
+ * complete a task before proceeding).  However, tasks can be synchronously
+ * executed when necessary (e.g., single-user mode).  The custodian is not
+ * an essential process and can shut down quickly when requested.  The
+ * custodian only wakes up to perform its tasks when its latch is set.
+ *
+ *
+ * Copyright (c) 2022, PostgreSQL Global Development Group
+ *
+ *
+ * IDENTIFICATION
+ *   src/backend/postmaster/custodian.c
+ *
+ *-------------------------------------------------------------------------
+ */
+#include "postgres.h"
+
+#include "libpq/pqsignal.h"
+#include "pgstat.h"
+#include "postmaster/custodian.h"
+#include "postmaster/interrupt.h"
+#include "storage/bufmgr.h"
+#include "storage/condition_variable.h"
+#include "storage/fd.h"
+#include "storage/proc.h"
+#include "storage/procsignal.h"
+#include "storage/smgr.h"
+#include "utils/memutils.h"
+
+static void DoCustodianTasks(void);
+static CustodianTask CustodianGetNextTask(void);
+static void CustodianEnqueueTask(CustodianTask task);
+static const struct cust_task_funcs_entry *LookupCustodianFunctions(CustodianTask task);
+
+typedef struct
+{
+	slock_t		cust_lck;
+
+	CustodianTask task_queue_elems[NUM_CUSTODIAN_TASKS];
+	int			task_queue_head;
+} CustodianShmemStruct;
+
+static CustodianShmemStruct *CustodianShmem;
+
+typedef void (*CustodianTaskFunction) (void);
+typedef void (*CustodianTaskHandleArg) (Datum arg);
+
+struct cust_task_funcs_entry
+{
+	CustodianTask task;
+	CustodianTaskFunction task_func;		/* performs task */
+	CustodianTaskHandleArg handle_arg_func;	/* handles additional info in request */
+};
+
+/*
+ * Add new tasks here.
+ *
+ * task_func is the logic that will be executed via DoCustodianTasks() when the
+ * matching task is requested via RequestCustodian().  handle_arg_func is an
+ * optional function for providing extra information for the next invocation of
+ * the task.  Typically, the extra information should be stored in shared
+ * memory for access from the custodian process.  handle_arg_func is invoked
+ * before enqueueing the task, and it will still be invoked regardless of
+ * whether the task is already enqueued.
+ */
+static const struct cust_task_funcs_entry cust_task_functions[] = {
+	{INVALID_CUSTODIAN_TASK, NULL, NULL}	/* must be last */
+};
+
+/*
+ * Main entry point for custodian process
+ *
+ * This is invoked from AuxiliaryProcessMain, which has already created the
+ * basic execution environment, but not enabled signals yet.
+ */
+void
+CustodianMain(void)
+{
+	sigjmp_buf	local_sigjmp_buf;
+	MemoryContext custodian_context;
+
+	/*
+	 * Properly accept or ignore signals that might be sent to us.
+	 */
+	pqsignal(SIGHUP, SignalHandlerForConfigReload);
+	pqsignal(SIGINT, SignalHandlerForShutdownRequest);
+	pqsignal(SIGTERM, SignalHandlerForShutdownRequest);
+	/* SIGQUIT handler was already set up by InitPostmasterChild */
+	pqsignal(SIGALRM, SIG_IGN);
+	pqsignal(SIGPIPE, SIG_IGN);
+	pqsignal(SIGUSR1, procsignal_sigusr1_handler);
+	pqsignal(SIGUSR2, SIG_IGN);
+
+	/*
+	 * Reset some signals that are accepted by postmaster but not here
+	 */
+	pqsignal(SIGCHLD, SIG_DFL);
+
+	/*
+	 * Create a memory context that we will do all our work in.  We do this so
+	 * that we can reset the context during error recovery and thereby avoid
+	 * possible memory leaks.
+	 */
+	custodian_context = AllocSetContextCreate(TopMemoryContext,
+											  "Custodian",
+											  ALLOCSET_DEFAULT_SIZES);
+	MemoryContextSwitchTo(custodian_context);
+
+	/*
+	 * If an exception is encountered, processing resumes here.  As with other
+	 * auxiliary processes, we cannot use PG_TRY because this is the bottom of
+	 * the exception stack.
+	 */
+	if (sigsetjmp(local_sigjmp_buf, 1) != 0)
+	{
+		/* Since not using PG_TRY, must reset error stack by hand */
+		error_context_stack = NULL;
+
+		/* Prevent interrupts while cleaning up */
+		HOLD_INTERRUPTS();
+
+		/* Report the error to the server log */
+		EmitErrorReport();
+
+		/*
+		 * These operations are really just a minimal subset of
+		 * AbortTransaction().  We don't have very many resources to worry
+		 * about.
+		 */
+		LWLockReleaseAll();
+		ConditionVariableCancelSleep();
+		ReleaseAuxProcessResources(false);
+		AtEOXact_Files(false);
+
+		/*
+		 * Now return to normal top-level context and clear ErrorContext for
+		 * next time.
+		 */
+		MemoryContextSwitchTo(custodian_context);
+		FlushErrorState();
+
+		/* Flush any leaked data in the top-level context */
+		MemoryContextResetAndDeleteChildren(custodian_context);
+
+		/* Now we can allow interrupts again */
+		RESUME_INTERRUPTS();
+
+		/*
+		 * Sleep at least 1 second after any error.  A write error is likely
+		 * to be repeated, and we don't want to be filling the error logs as
+		 * fast as we can.
+		 */
+		pg_usleep(1000000L);
+
+		/*
+		 * Close all open files after any error.  This is helpful on Windows,
+		 * where holding deleted files open causes various strange errors.
+		 * It's not clear we need it elsewhere, but shouldn't hurt.
+		 */
+		smgrcloseall();
+
+		/* Report wait end here, when there is no further possibility of wait */
+		pgstat_report_wait_end();
+	}
+
+	/* We can now handle ereport(ERROR) */
+	PG_exception_stack = &local_sigjmp_buf;
+
+	/*
+	 * Unblock signals (they were blocked when the postmaster forked us)
+	 */
+	sigprocmask(SIG_SETMASK, &UnBlockSig, NULL);
+
+	/*
+	 * Advertise our latch that backends can use to wake us up while we're
+	 * sleeping.
+	 */
+	ProcGlobal->custodianLatch = &MyProc->procLatch;
+
+	/*
+	 * Loop forever
+	 */
+	for (;;)
+	{
+		/* Clear any already-pending wakeups */
+		ResetLatch(MyLatch);
+
+		HandleMainLoopInterrupts();
+
+		DoCustodianTasks();
+
+		(void) WaitLatch(MyLatch, WL_LATCH_SET | WL_EXIT_ON_PM_DEATH, 0,
+						 WAIT_EVENT_CUSTODIAN_MAIN);
+	}
+
+	pg_unreachable();
+}
+
+/*
+ * DoCustodianTasks
+ *		Perform requested custodian tasks
+ *
+ * If we are not in a standalone backend, the custodian will re-enqueue the
+ * currently running task if an exception is encountered.
+ */
+static void
+DoCustodianTasks(void)
+{
+	CustodianTask	task;
+
+	while ((task = CustodianGetNextTask()) != INVALID_CUSTODIAN_TASK)
+	{
+		CustodianTaskFunction func = (LookupCustodianFunctions(task))->task_func;
+
+		PG_TRY();
+		{
+			(*func) ();
+		}
+		PG_CATCH();
+		{
+			if (IsPostmasterEnvironment)
+				CustodianEnqueueTask(task);
+
+			PG_RE_THROW();
+		}
+		PG_END_TRY();
+	}
+}
+
+Size
+CustodianShmemSize(void)
+{
+	return sizeof(CustodianShmemStruct);
+}
+
+void
+CustodianShmemInit(void)
+{
+	Size		size = CustodianShmemSize();
+	bool		found;
+
+	CustodianShmem = (CustodianShmemStruct *)
+		ShmemInitStruct("Custodian Data", size, &found);
+
+	if (!found)
+	{
+		memset(CustodianShmem, 0, size);
+		SpinLockInit(&CustodianShmem->cust_lck);
+		for (int i = 0; i < NUM_CUSTODIAN_TASKS; i++)
+			CustodianShmem->task_queue_elems[i] = INVALID_CUSTODIAN_TASK;
+	}
+}
+
+/*
+ * RequestCustodian
+ *		Called to request a custodian task.
+ *
+ * In standalone backends, the task is performed immediately in the current
+ * process, and this function will not return until it completes.  Otherwise,
+ * the task is added to the custodian's queue if it is not already enqueued,
+ * and this function returns without waiting for the task to complete.
+ *
+ * arg can be used to provide additional information to the custodian that is
+ * necessary for the task.  Typically, the handling function should store this
+ * information in shared memory for later use by the custodian.  Note that the
+ * task's handling function for arg is invoked before enqueueing the task, and
+ * it will still be invoked regardless of whether the task is already enqueued.
+ */
+void
+RequestCustodian(CustodianTask requested, Datum arg)
+{
+	CustodianTaskHandleArg arg_func = (LookupCustodianFunctions(requested))->handle_arg_func;
+
+	/* First process any extra information provided in the request. */
+	if (arg_func)
+		(*arg_func) (arg);
+
+	CustodianEnqueueTask(requested);
+
+	if (!IsPostmasterEnvironment)
+		DoCustodianTasks();
+	else if (ProcGlobal->custodianLatch)
+		SetLatch(ProcGlobal->custodianLatch);
+}
+
+/*
+ * CustodianEnqueueTask
+ *		Add a task to the custodian's queue
+ *
+ * If the task is already in the queue, this function has no effect.
+ */
+static void
+CustodianEnqueueTask(CustodianTask task)
+{
+	Assert(task >= 0 && task < NUM_CUSTODIAN_TASKS);
+
+	SpinLockAcquire(&CustodianShmem->cust_lck);
+
+	for (int i = 0; i < NUM_CUSTODIAN_TASKS; i++)
+	{
+		int idx = (CustodianShmem->task_queue_head + i) % NUM_CUSTODIAN_TASKS;
+		CustodianTask *elem = &CustodianShmem->task_queue_elems[idx];
+
+		/*
+		 * If the task is already queued in this slot or the slot is empty,
+		 * enqueue the task here and return.
+		 */
+		if (*elem == INVALID_CUSTODIAN_TASK || *elem == task)
+		{
+			*elem = task;
+			SpinLockRelease(&CustodianShmem->cust_lck);
+			return;
+		}
+	}
+
+	/* Queue is full, which should never happen; unlock before erroring out. */
+	SpinLockRelease(&CustodianShmem->cust_lck);
+	elog(ERROR, "could not enqueue custodian task %d", task);
+}
+
+/*
+ * CustodianGetNextTask
+ *		Retrieve the next task that the custodian should execute
+ *
+ * The returned task is dequeued from the custodian's queue.  If no tasks are
+ * queued, INVALID_CUSTODIAN_TASK is returned.
+ */
+static CustodianTask
+CustodianGetNextTask(void)
+{
+	CustodianTask next_task;
+	CustodianTask *elem;
+
+	SpinLockAcquire(&CustodianShmem->cust_lck);
+
+	elem = &CustodianShmem->task_queue_elems[CustodianShmem->task_queue_head];
+
+	next_task = *elem;
+	*elem = INVALID_CUSTODIAN_TASK;
+
+	CustodianShmem->task_queue_head++;
+	CustodianShmem->task_queue_head %= NUM_CUSTODIAN_TASKS;
+
+	SpinLockRelease(&CustodianShmem->cust_lck);
+
+	return next_task;
+}
+
+/*
+ * LookupCustodianFunctions
+ *		Given a custodian task, look up its function pointers.
+ */
+static const struct cust_task_funcs_entry *
+LookupCustodianFunctions(CustodianTask task)
+{
+	const struct cust_task_funcs_entry *entry;
+
+	Assert(task >= 0 && task < NUM_CUSTODIAN_TASKS);
+
+	for (entry = cust_task_functions;
+		 entry && entry->task != INVALID_CUSTODIAN_TASK;
+		 entry++)
+	{
+		if (entry->task == task)
+			return entry;
+	}
+
+	/* All tasks must have an entry. */
+	elog(ERROR, "could not lookup functions for custodian task %d", task);
+	pg_unreachable();
+}
diff --git a/src/backend/postmaster/meson.build b/src/backend/postmaster/meson.build
index 9079922de7..63f2abe3a1 100644
--- a/src/backend/postmaster/meson.build
+++ b/src/backend/postmaster/meson.build
@@ -6,6 +6,7 @@ backend_sources += files(
   'bgworker.c',
   'bgwriter.c',
   'checkpointer.c',
+  'custodian.c',
   'fork_process.c',
   'interrupt.c',
   'pgarch.c',
diff --git a/src/backend/postmaster/postmaster.c b/src/backend/postmaster/postmaster.c
index 2552327d90..e3aef4081e 100644
--- a/src/backend/postmaster/postmaster.c
+++ b/src/backend/postmaster/postmaster.c
@@ -249,6 +249,7 @@ bool		send_abort_for_kill = false;
 static pid_t StartupPID = 0,
 			BgWriterPID = 0,
 			CheckpointerPID = 0,
+			CustodianPID = 0,
 			WalWriterPID = 0,
 			WalReceiverPID = 0,
 			AutoVacPID = 0,
@@ -560,6 +561,7 @@ static void ShmemBackendArrayRemove(Backend *bn);
 #define StartArchiver()			StartChildProcess(ArchiverProcess)
 #define StartBackgroundWriter() StartChildProcess(BgWriterProcess)
 #define StartCheckpointer()		StartChildProcess(CheckpointerProcess)
+#define StartCustodian()		StartChildProcess(CustodianProcess)
 #define StartWalWriter()		StartChildProcess(WalWriterProcess)
 #define StartWalReceiver()		StartChildProcess(WalReceiverProcess)
 
@@ -1795,13 +1797,16 @@ ServerLoop(void)
 		/*
 		 * If no background writer process is running, and we are not in a
 		 * state that prevents it, start one.  It doesn't matter if this
-		 * fails, we'll just try again later.  Likewise for the checkpointer.
+		 * fails, we'll just try again later.  Likewise for the checkpointer
+		 * and custodian.
 		 */
 		if (pmState == PM_RUN || pmState == PM_RECOVERY ||
 			pmState == PM_HOT_STANDBY || pmState == PM_STARTUP)
 		{
 			if (CheckpointerPID == 0)
 				CheckpointerPID = StartCheckpointer();
+			if (CustodianPID == 0)
+				CustodianPID = StartCustodian();
 			if (BgWriterPID == 0)
 				BgWriterPID = StartBackgroundWriter();
 		}
@@ -2732,6 +2737,8 @@ process_pm_reload_request(void)
 			signal_child(BgWriterPID, SIGHUP);
 		if (CheckpointerPID != 0)
 			signal_child(CheckpointerPID, SIGHUP);
+		if (CustodianPID != 0)
+			signal_child(CustodianPID, SIGHUP);
 		if (WalWriterPID != 0)
 			signal_child(WalWriterPID, SIGHUP);
 		if (WalReceiverPID != 0)
@@ -3085,6 +3092,8 @@ process_pm_child_exit(void)
 			 */
 			if (CheckpointerPID == 0)
 				CheckpointerPID = StartCheckpointer();
+			if (CustodianPID == 0)
+				CustodianPID = StartCustodian();
 			if (BgWriterPID == 0)
 				BgWriterPID = StartBackgroundWriter();
 			if (WalWriterPID == 0)
@@ -3178,6 +3187,20 @@ process_pm_child_exit(void)
 			continue;
 		}
 
+		/*
+		 * Was it the custodian?  Normal exit can be ignored; we'll start a
+		 * new one at the next iteration of the postmaster's main loop, if
+		 * necessary.  Any other exit condition is treated as a crash.
+		 */
+		if (pid == CustodianPID)
+		{
+			CustodianPID = 0;
+			if (!EXIT_STATUS_0(exitstatus))
+				HandleChildCrash(pid, exitstatus,
+								 _("custodian process"));
+			continue;
+		}
+
 		/*
 		 * Was it the wal writer?  Normal exit can be ignored; we'll start a
 		 * new one at the next iteration of the postmaster's main loop, if
@@ -3590,6 +3613,12 @@ HandleChildCrash(int pid, int exitstatus, const char *procname)
 	else if (CheckpointerPID != 0 && take_action)
 		sigquit_child(CheckpointerPID);
 
+	/* Take care of the custodian too */
+	if (pid == CustodianPID)
+		CustodianPID = 0;
+	else if (CustodianPID != 0 && take_action)
+		sigquit_child(CustodianPID);
+
 	/* Take care of the walwriter too */
 	if (pid == WalWriterPID)
 		WalWriterPID = 0;
@@ -3744,6 +3773,9 @@ PostmasterStateMachine(void)
 		/* and the bgwriter too */
 		if (BgWriterPID != 0)
 			signal_child(BgWriterPID, SIGTERM);
+		/* and the custodian too */
+		if (CustodianPID != 0)
+			signal_child(CustodianPID, SIGTERM);
 		/* and the walwriter too */
 		if (WalWriterPID != 0)
 			signal_child(WalWriterPID, SIGTERM);
@@ -3781,6 +3813,7 @@ PostmasterStateMachine(void)
 			BgWriterPID == 0 &&
 			(CheckpointerPID == 0 ||
 			 (!FatalError && Shutdown < ImmediateShutdown)) &&
+			CustodianPID == 0 &&
 			WalWriterPID == 0 &&
 			AutoVacPID == 0)
 		{
@@ -3877,6 +3910,7 @@ PostmasterStateMachine(void)
 			Assert(WalReceiverPID == 0);
 			Assert(BgWriterPID == 0);
 			Assert(CheckpointerPID == 0);
+			Assert(CustodianPID == 0);
 			Assert(WalWriterPID == 0);
 			Assert(AutoVacPID == 0);
 			/* syslogger is not considered here */
@@ -4092,6 +4126,8 @@ TerminateChildren(int signal)
 		signal_child(BgWriterPID, signal);
 	if (CheckpointerPID != 0)
 		signal_child(CheckpointerPID, signal);
+	if (CustodianPID != 0)
+		signal_child(CustodianPID, signal);
 	if (WalWriterPID != 0)
 		signal_child(WalWriterPID, signal);
 	if (WalReceiverPID != 0)
diff --git a/src/backend/storage/ipc/ipci.c b/src/backend/storage/ipc/ipci.c
index 8f1ded7338..4268a941ad 100644
--- a/src/backend/storage/ipc/ipci.c
+++ b/src/backend/storage/ipc/ipci.c
@@ -30,6 +30,7 @@
 #include "postmaster/autovacuum.h"
 #include "postmaster/bgworker_internals.h"
 #include "postmaster/bgwriter.h"
+#include "postmaster/custodian.h"
 #include "postmaster/postmaster.h"
 #include "replication/logicallauncher.h"
 #include "replication/origin.h"
@@ -130,6 +131,7 @@ CalculateShmemSize(int *num_semaphores)
 	size = add_size(size, PMSignalShmemSize());
 	size = add_size(size, ProcSignalShmemSize());
 	size = add_size(size, CheckpointerShmemSize());
+	size = add_size(size, CustodianShmemSize());
 	size = add_size(size, AutoVacuumShmemSize());
 	size = add_size(size, ReplicationSlotsShmemSize());
 	size = add_size(size, ReplicationOriginShmemSize());
@@ -278,6 +280,7 @@ CreateSharedMemoryAndSemaphores(void)
 	PMSignalShmemInit();
 	ProcSignalShmemInit();
 	CheckpointerShmemInit();
+	CustodianShmemInit();
 	AutoVacuumShmemInit();
 	ReplicationSlotsShmemInit();
 	ReplicationOriginShmemInit();
diff --git a/src/backend/storage/lmgr/proc.c b/src/backend/storage/lmgr/proc.c
index 22b4278610..40a83636fe 100644
--- a/src/backend/storage/lmgr/proc.c
+++ b/src/backend/storage/lmgr/proc.c
@@ -178,6 +178,7 @@ InitProcGlobal(void)
 	ProcGlobal->startupBufferPinWaitBufId = -1;
 	ProcGlobal->walwriterLatch = NULL;
 	ProcGlobal->checkpointerLatch = NULL;
+	ProcGlobal->custodianLatch = NULL;
 	pg_atomic_init_u32(&ProcGlobal->procArrayGroupFirst, INVALID_PGPROCNO);
 	pg_atomic_init_u32(&ProcGlobal->clogGroupFirst, INVALID_PGPROCNO);
 
diff --git a/src/backend/utils/activity/wait_event.c b/src/backend/utils/activity/wait_event.c
index 6e4599278c..9348b441ba 100644
--- a/src/backend/utils/activity/wait_event.c
+++ b/src/backend/utils/activity/wait_event.c
@@ -224,6 +224,9 @@ pgstat_get_wait_activity(WaitEventActivity w)
 		case WAIT_EVENT_CHECKPOINTER_MAIN:
 			event_name = "CheckpointerMain";
 			break;
+		case WAIT_EVENT_CUSTODIAN_MAIN:
+			event_name = "CustodianMain";
+			break;
 		case WAIT_EVENT_LOGICAL_APPLY_MAIN:
 			event_name = "LogicalApplyMain";
 			break;
diff --git a/src/backend/utils/init/miscinit.c b/src/backend/utils/init/miscinit.c
index 59532bbd80..25c2ba97b8 100644
--- a/src/backend/utils/init/miscinit.c
+++ b/src/backend/utils/init/miscinit.c
@@ -283,6 +283,9 @@ GetBackendTypeDesc(BackendType backendType)
 		case B_CHECKPOINTER:
 			backendDesc = "checkpointer";
 			break;
+		case B_CUSTODIAN:
+			backendDesc = "custodian";
+			break;
 		case B_LOGGER:
 			backendDesc = "logger";
 			break;
diff --git a/src/include/miscadmin.h b/src/include/miscadmin.h
index 96b3a1e1a0..738e7f8fb0 100644
--- a/src/include/miscadmin.h
+++ b/src/include/miscadmin.h
@@ -324,6 +324,7 @@ typedef enum BackendType
 	B_BG_WORKER,
 	B_BG_WRITER,
 	B_CHECKPOINTER,
+	B_CUSTODIAN,
 	B_LOGGER,
 	B_STANDALONE_BACKEND,
 	B_STARTUP,
@@ -430,6 +431,7 @@ typedef enum
 	BgWriterProcess,
 	ArchiverProcess,
 	CheckpointerProcess,
+	CustodianProcess,
 	WalWriterProcess,
 	WalReceiverProcess,
 
@@ -442,6 +444,7 @@ extern PGDLLIMPORT AuxProcType MyAuxProcType;
 #define AmBackgroundWriterProcess() (MyAuxProcType == BgWriterProcess)
 #define AmArchiverProcess()			(MyAuxProcType == ArchiverProcess)
 #define AmCheckpointerProcess()		(MyAuxProcType == CheckpointerProcess)
+#define AmCustodianProcess()		(MyAuxProcType == CustodianProcess)
 #define AmWalWriterProcess()		(MyAuxProcType == WalWriterProcess)
 #define AmWalReceiverProcess()		(MyAuxProcType == WalReceiverProcess)
 
diff --git a/src/include/postmaster/custodian.h b/src/include/postmaster/custodian.h
new file mode 100644
index 0000000000..73d0bc5f02
--- /dev/null
+++ b/src/include/postmaster/custodian.h
@@ -0,0 +1,32 @@
+/*-------------------------------------------------------------------------
+ *
+ * custodian.h
+ *   Exports from postmaster/custodian.c.
+ *
+ * Copyright (c) 2022, PostgreSQL Global Development Group
+ *
+ * src/include/postmaster/custodian.h
+ *
+ *-------------------------------------------------------------------------
+ */
+#ifndef _CUSTODIAN_H
+#define _CUSTODIAN_H
+
+/*
+ * If you add a new task here, be sure to add its corresponding function
+ * pointers to cust_task_functions in custodian.c.
+ */
+typedef enum CustodianTask
+{
+	FAKE_TASK,						/* placeholder until we have a real task */
+
+	NUM_CUSTODIAN_TASKS,			/* new tasks go above */
+	INVALID_CUSTODIAN_TASK
+} CustodianTask;
+
+extern void CustodianMain(void) pg_attribute_noreturn();
+extern Size CustodianShmemSize(void);
+extern void CustodianShmemInit(void);
+extern void RequestCustodian(CustodianTask task, Datum arg);
+
+#endif						/* _CUSTODIAN_H */
diff --git a/src/include/storage/proc.h b/src/include/storage/proc.h
index 4258cd92c9..25e00a14ff 100644
--- a/src/include/storage/proc.h
+++ b/src/include/storage/proc.h
@@ -400,6 +400,8 @@ typedef struct PROC_HDR
 	Latch	   *walwriterLatch;
 	/* Checkpointer process's latch */
 	Latch	   *checkpointerLatch;
+	/* Custodian process's latch */
+	Latch	   *custodianLatch;
 	/* Current shared estimate of appropriate spins_per_delay value */
 	int			spins_per_delay;
 	/* Buffer id of the buffer that Startup process waits for pin on, or -1 */
@@ -417,11 +419,12 @@ extern PGDLLIMPORT PGPROC *PreparedXactProcs;
  * We set aside some extra PGPROC structures for auxiliary processes,
  * ie things that aren't full-fledged backends but need shmem access.
  *
- * Background writer, checkpointer, WAL writer and archiver run during normal
- * operation.  Startup process and WAL receiver also consume 2 slots, but WAL
- * writer is launched only after startup has exited, so we only need 5 slots.
+ * Background writer, checkpointer, custodian, WAL writer and archiver run
+ * during normal operation.  Startup process and WAL receiver also consume 2
+ * slots, but WAL writer is launched only after startup has exited, so we only
+ * need 6 slots.
  */
-#define NUM_AUXILIARY_PROCS		5
+#define NUM_AUXILIARY_PROCS		6
 
 /* configurable options */
 extern PGDLLIMPORT int DeadlockTimeout;
diff --git a/src/include/utils/wait_event.h b/src/include/utils/wait_event.h
index 6cacd6edaf..ea8ba623e9 100644
--- a/src/include/utils/wait_event.h
+++ b/src/include/utils/wait_event.h
@@ -40,6 +40,7 @@ typedef enum
 	WAIT_EVENT_BGWRITER_HIBERNATE,
 	WAIT_EVENT_BGWRITER_MAIN,
 	WAIT_EVENT_CHECKPOINTER_MAIN,
+	WAIT_EVENT_CUSTODIAN_MAIN,
 	WAIT_EVENT_LOGICAL_APPLY_MAIN,
 	WAIT_EVENT_LOGICAL_LAUNCHER_MAIN,
 	WAIT_EVENT_LOGICAL_PARALLEL_APPLY_MAIN,
-- 
2.25.1

>From c9e0527d39f697408e01bc0ed1730e5f7eeeffd5 Mon Sep 17 00:00:00 2001
From: Nathan Bossart <bossa...@amazon.com>
Date: Sun, 5 Dec 2021 22:02:40 -0800
Subject: [PATCH v19 2/4] Move removal of old serialized snapshots to
 custodian.

This was only done during checkpoints because it was a convenient
place to put it.  However, if there are many snapshots to remove,
it can significantly extend checkpoint time.  To avoid this, move
this work to the newly-introduced custodian process.
---
 contrib/test_decoding/expected/rewrite.out  | 21 +++++++++++++++++++++
 contrib/test_decoding/sql/rewrite.sql       | 17 +++++++++++++++++
 src/backend/access/transam/xlog.c           |  6 ++++--
 src/backend/postmaster/custodian.c          |  2 ++
 src/backend/replication/logical/snapbuild.c |  9 ++++-----
 src/include/postmaster/custodian.h          |  2 +-
 src/include/replication/snapbuild.h         |  2 +-
 7 files changed, 50 insertions(+), 9 deletions(-)

diff --git a/contrib/test_decoding/expected/rewrite.out b/contrib/test_decoding/expected/rewrite.out
index b30999c436..8b97f15f6f 100644
--- a/contrib/test_decoding/expected/rewrite.out
+++ b/contrib/test_decoding/expected/rewrite.out
@@ -162,3 +162,24 @@ DROP TABLE IF EXISTS replication_example;
 DROP FUNCTION iamalongfunction();
 DROP FUNCTION exec(text);
 DROP ROLE regress_justforcomments;
+-- make sure custodian cleans up files
+CHECKPOINT;
+DO $$
+DECLARE
+    snaps_removed bool;
+    loops int := 0;
+BEGIN
+    LOOP
+        snaps_removed := count(*) = 0 FROM pg_ls_logicalsnapdir();
+        IF snaps_removed OR loops > 120 * 100 THEN EXIT; END IF;
+        PERFORM pg_sleep(0.01);
+        loops := loops + 1;
+    END LOOP;
+END
+$$;
+SELECT count(*) = 0 FROM pg_ls_logicalsnapdir();
+ ?column? 
+----------
+ t
+(1 row)
+
diff --git a/contrib/test_decoding/sql/rewrite.sql b/contrib/test_decoding/sql/rewrite.sql
index 62dead3a9b..d268fa559a 100644
--- a/contrib/test_decoding/sql/rewrite.sql
+++ b/contrib/test_decoding/sql/rewrite.sql
@@ -105,3 +105,20 @@ DROP TABLE IF EXISTS replication_example;
 DROP FUNCTION iamalongfunction();
 DROP FUNCTION exec(text);
 DROP ROLE regress_justforcomments;
+
+-- make sure custodian cleans up files
+CHECKPOINT;
+DO $$
+DECLARE
+    snaps_removed bool;
+    loops int := 0;
+BEGIN
+    LOOP
+        snaps_removed := count(*) = 0 FROM pg_ls_logicalsnapdir();
+        IF snaps_removed OR loops > 120 * 100 THEN EXIT; END IF;
+        PERFORM pg_sleep(0.01);
+        loops := loops + 1;
+    END LOOP;
+END
+$$;
+SELECT count(*) = 0 FROM pg_ls_logicalsnapdir();
diff --git a/src/backend/access/transam/xlog.c b/src/backend/access/transam/xlog.c
index fb4c860bde..382e59f723 100644
--- a/src/backend/access/transam/xlog.c
+++ b/src/backend/access/transam/xlog.c
@@ -76,12 +76,12 @@
 #include "port/atomics.h"
 #include "port/pg_iovec.h"
 #include "postmaster/bgwriter.h"
+#include "postmaster/custodian.h"
 #include "postmaster/startup.h"
 #include "postmaster/walwriter.h"
 #include "replication/logical.h"
 #include "replication/origin.h"
 #include "replication/slot.h"
-#include "replication/snapbuild.h"
 #include "replication/walreceiver.h"
 #include "replication/walsender.h"
 #include "storage/bufmgr.h"
@@ -6997,10 +6997,12 @@ CheckPointGuts(XLogRecPtr checkPointRedo, int flags)
 {
 	CheckPointRelationMap();
 	CheckPointReplicationSlots();
-	CheckPointSnapBuild();
 	CheckPointLogicalRewriteHeap();
 	CheckPointReplicationOrigin();
 
+	/* tasks offloaded to custodian */
+	RequestCustodian(CUSTODIAN_REMOVE_SERIALIZED_SNAPSHOTS, (Datum) 0);
+
 	/* Write out all dirty data in SLRUs and the main buffer pool */
 	TRACE_POSTGRESQL_BUFFER_CHECKPOINT_START(flags);
 	CheckpointStats.ckpt_write_t = GetCurrentTimestamp();
diff --git a/src/backend/postmaster/custodian.c b/src/backend/postmaster/custodian.c
index 98bb9efcfd..4e0ce1f7b3 100644
--- a/src/backend/postmaster/custodian.c
+++ b/src/backend/postmaster/custodian.c
@@ -25,6 +25,7 @@
 #include "pgstat.h"
 #include "postmaster/custodian.h"
 #include "postmaster/interrupt.h"
+#include "replication/snapbuild.h"
 #include "storage/bufmgr.h"
 #include "storage/condition_variable.h"
 #include "storage/fd.h"
@@ -70,6 +71,7 @@ struct cust_task_funcs_entry
  * whether the task is already enqueued.
  */
 static const struct cust_task_funcs_entry cust_task_functions[] = {
+	{CUSTODIAN_REMOVE_SERIALIZED_SNAPSHOTS, RemoveOldSerializedSnapshots, NULL},
 	{INVALID_CUSTODIAN_TASK, NULL, NULL}	/* must be last */
 };
 
diff --git a/src/backend/replication/logical/snapbuild.c b/src/backend/replication/logical/snapbuild.c
index 829c568112..6b403a2bb4 100644
--- a/src/backend/replication/logical/snapbuild.c
+++ b/src/backend/replication/logical/snapbuild.c
@@ -2036,14 +2036,13 @@ SnapBuildRestoreContents(int fd, char *dest, Size size, const char *path)
 
 /*
  * Remove all serialized snapshots that are not required anymore because no
- * slot can need them. This doesn't actually have to run during a checkpoint,
- * but it's a convenient point to schedule this.
+ * slot can need them.
  *
- * NB: We run this during checkpoints even if logical decoding is disabled so
- * we cleanup old slots at some point after it got disabled.
+ * NB: We run this even if logical decoding is disabled so we clean up old
+ * slots at some point after it got disabled.
  */
 void
-CheckPointSnapBuild(void)
+RemoveOldSerializedSnapshots(void)
 {
 	XLogRecPtr	cutoff;
 	XLogRecPtr	redo;
diff --git a/src/include/postmaster/custodian.h b/src/include/postmaster/custodian.h
index 73d0bc5f02..ab6d4283b9 100644
--- a/src/include/postmaster/custodian.h
+++ b/src/include/postmaster/custodian.h
@@ -18,7 +18,7 @@
  */
 typedef enum CustodianTask
 {
-	FAKE_TASK,						/* placeholder until we have a real task */
+	CUSTODIAN_REMOVE_SERIALIZED_SNAPSHOTS,
 
 	NUM_CUSTODIAN_TASKS,			/* new tasks go above */
 	INVALID_CUSTODIAN_TASK
diff --git a/src/include/replication/snapbuild.h b/src/include/replication/snapbuild.h
index f49b941b53..5f1ba3842c 100644
--- a/src/include/replication/snapbuild.h
+++ b/src/include/replication/snapbuild.h
@@ -57,7 +57,7 @@ struct ReorderBuffer;
 struct xl_heap_new_cid;
 struct xl_running_xacts;
 
-extern void CheckPointSnapBuild(void);
+extern void RemoveOldSerializedSnapshots(void);
 
 extern SnapBuild *AllocateSnapshotBuilder(struct ReorderBuffer *reorder,
 										  TransactionId xmin_horizon, XLogRecPtr start_lsn,
-- 
2.25.1

>From 493d858dc4acfc80d201df6d9f7bb45d83881e10 Mon Sep 17 00:00:00 2001
From: Nathan Bossart <bossa...@amazon.com>
Date: Sun, 12 Dec 2021 22:07:11 -0800
Subject: [PATCH v19 3/4] Move removal of old logical rewrite mapping files to
 custodian.

If there are many such files to remove, checkpoints can take much
longer.  To avoid this, move this work to the newly-introduced
custodian process.

Since the mapping files include 32-bit transaction IDs, there is a
risk of wraparound if the files are not cleaned up fast enough.
Removing these files in checkpoints offered decent wraparound
protection simply due to the relatively high frequency of
checkpointing.  With this change, servers should still clean up
mapping files with decently high frequency, but in theory the
wraparound risk might worsen for some servers (e.g., if the
custodian is spending a lot of time on a different task).  Given
this is an existing problem, this change makes no effort to handle
the wraparound risk, and it is left as a future exercise.
---
 contrib/test_decoding/expected/rewrite.out | 19 ++++++
 contrib/test_decoding/sql/rewrite.sql      | 14 ++++
 src/backend/access/heap/rewriteheap.c      | 78 +++++++++++++++++++---
 src/backend/postmaster/custodian.c         | 43 ++++++++++++
 src/include/access/rewriteheap.h           |  1 +
 src/include/postmaster/custodian.h         |  4 ++
 6 files changed, 149 insertions(+), 10 deletions(-)

diff --git a/contrib/test_decoding/expected/rewrite.out b/contrib/test_decoding/expected/rewrite.out
index 8b97f15f6f..214a514a0a 100644
--- a/contrib/test_decoding/expected/rewrite.out
+++ b/contrib/test_decoding/expected/rewrite.out
@@ -183,3 +183,22 @@ SELECT count(*) = 0 FROM pg_ls_logicalsnapdir();
  t
 (1 row)
 
+DO $$
+DECLARE
+	mappings_removed bool;
+	loops int := 0;
+BEGIN
+	LOOP
+		mappings_removed := count(*) = 0 FROM pg_ls_logicalmapdir();
+		IF mappings_removed OR loops > 120 * 100 THEN EXIT; END IF;
+		PERFORM pg_sleep(0.01);
+		loops := loops + 1;
+	END LOOP;
+END
+$$;
+SELECT count(*) = 0 FROM pg_ls_logicalmapdir();
+ ?column? 
+----------
+ t
+(1 row)
+
diff --git a/contrib/test_decoding/sql/rewrite.sql b/contrib/test_decoding/sql/rewrite.sql
index d268fa559a..d66f70f837 100644
--- a/contrib/test_decoding/sql/rewrite.sql
+++ b/contrib/test_decoding/sql/rewrite.sql
@@ -122,3 +122,17 @@ BEGIN
 END
 $$;
 SELECT count(*) = 0 FROM pg_ls_logicalsnapdir();
+DO $$
+DECLARE
+	mappings_removed bool;
+	loops int := 0;
+BEGIN
+	LOOP
+		mappings_removed := count(*) = 0 FROM pg_ls_logicalmapdir();
+		IF mappings_removed OR loops > 120 * 100 THEN EXIT; END IF;
+		PERFORM pg_sleep(0.01);
+		loops := loops + 1;
+	END LOOP;
+END
+$$;
+SELECT count(*) = 0 FROM pg_ls_logicalmapdir();
diff --git a/src/backend/access/heap/rewriteheap.c b/src/backend/access/heap/rewriteheap.c
index 8993c1ed5a..9ea0f81ac3 100644
--- a/src/backend/access/heap/rewriteheap.c
+++ b/src/backend/access/heap/rewriteheap.c
@@ -116,6 +116,7 @@
 #include "lib/ilist.h"
 #include "miscadmin.h"
 #include "pgstat.h"
+#include "postmaster/custodian.h"
 #include "replication/logical.h"
 #include "replication/slot.h"
 #include "storage/bufmgr.h"
@@ -123,6 +124,7 @@
 #include "storage/procarray.h"
 #include "storage/smgr.h"
 #include "utils/memutils.h"
+#include "utils/pg_lsn.h"
 #include "utils/rel.h"
 
 /*
@@ -1179,7 +1181,8 @@ heap_xlog_logical_rewrite(XLogReaderState *r)
  * Perform a checkpoint for logical rewrite mappings
  *
  * This serves two tasks:
- * 1) Remove all mappings not needed anymore based on the logical restart LSN
+ * 1) Alert the custodian to remove all mappings not needed anymore based on the
+ *    logical restart LSN
  * 2) Flush all remaining mappings to disk, so that replay after a checkpoint
  *	  only has to deal with the parts of a mapping that have been written out
  *	  after the checkpoint started.
@@ -1207,6 +1210,9 @@ CheckPointLogicalRewriteHeap(void)
 	if (cutoff != InvalidXLogRecPtr && redo < cutoff)
 		cutoff = redo;
 
+	/* let the custodian know what it can remove */
+	RequestCustodian(CUSTODIAN_REMOVE_REWRITE_MAPPINGS, LSNGetDatum(cutoff));
+
 	mappings_dir = AllocateDir("pg_logical/mappings");
 	while ((mapping_de = ReadDir(mappings_dir, "pg_logical/mappings")) != NULL)
 	{
@@ -1239,15 +1245,7 @@ CheckPointLogicalRewriteHeap(void)
 
 		lsn = ((uint64) hi) << 32 | lo;
 
-		if (lsn < cutoff || cutoff == InvalidXLogRecPtr)
-		{
-			elog(DEBUG1, "removing logical rewrite file \"%s\"", path);
-			if (unlink(path) < 0)
-				ereport(ERROR,
-						(errcode_for_file_access(),
-						 errmsg("could not remove file \"%s\": %m", path)));
-		}
-		else
+		if (lsn >= cutoff && cutoff != InvalidXLogRecPtr)
 		{
 			/* on some operating systems fsyncing a file requires O_RDWR */
 			int			fd = OpenTransientFile(path, O_RDWR | PG_BINARY);
@@ -1285,3 +1283,63 @@ CheckPointLogicalRewriteHeap(void)
 	/* persist directory entries to disk */
 	fsync_fname("pg_logical/mappings", true);
 }
+
+/*
+ * Remove mapping files older than the cutoff LSN saved by the checkpointer, or
+ * all of them if that cutoff is invalid.  We use this saved value instead of
+ * calling ReplicationSlotsComputeLogicalRestartLSN() so that we don't try to
+ * remove files that a concurrent call to CheckPointLogicalRewriteHeap() is
+ * trying to flush to disk.
+ */
+void
+RemoveOldLogicalRewriteMappings(void)
+{
+	XLogRecPtr	cutoff;
+	DIR		   *mappings_dir;
+	struct dirent *mapping_de;
+	char		path[MAXPGPATH + 20];
+
+	cutoff = CustodianGetLogicalRewriteCutoff();
+
+	mappings_dir = AllocateDir("pg_logical/mappings");
+	while ((mapping_de = ReadDir(mappings_dir, "pg_logical/mappings")) != NULL)
+	{
+		Oid			dboid;
+		Oid			relid;
+		XLogRecPtr	lsn;
+		TransactionId rewrite_xid;
+		TransactionId create_xid;
+		uint32		hi,
+					lo;
+		PGFileType	de_type;
+
+		if (strcmp(mapping_de->d_name, ".") == 0 ||
+			strcmp(mapping_de->d_name, "..") == 0)
+			continue;
+
+		snprintf(path, sizeof(path), "pg_logical/mappings/%s", mapping_de->d_name);
+		de_type = get_dirent_type(path, mapping_de, false, DEBUG1);
+
+		if (de_type != PGFILETYPE_ERROR && de_type != PGFILETYPE_REG)
+			continue;
+
+		/* Skip over files that cannot be ours (no "map-" prefix). */
+		if (strncmp(mapping_de->d_name, "map-", 4) != 0)
+			continue;
+
+		if (sscanf(mapping_de->d_name, LOGICAL_REWRITE_FORMAT,
+				   &dboid, &relid, &hi, &lo, &rewrite_xid, &create_xid) != 6)
+			elog(ERROR, "could not parse filename \"%s\"", mapping_de->d_name);
+
+		lsn = ((uint64) hi) << 32 | lo;
+		if (lsn >= cutoff && cutoff != InvalidXLogRecPtr)
+			continue;
+
+		elog(DEBUG1, "removing logical rewrite file \"%s\"", path);
+		if (unlink(path) < 0)
+			ereport(ERROR,
+					(errcode_for_file_access(),
+					 errmsg("could not remove file \"%s\": %m", path)));
+	}
+	FreeDir(mappings_dir);
+}
diff --git a/src/backend/postmaster/custodian.c b/src/backend/postmaster/custodian.c
index 4e0ce1f7b3..4cbd89fae9 100644
--- a/src/backend/postmaster/custodian.c
+++ b/src/backend/postmaster/custodian.c
@@ -21,6 +21,7 @@
  */
 #include "postgres.h"
 
+#include "access/rewriteheap.h"
 #include "libpq/pqsignal.h"
 #include "pgstat.h"
 #include "postmaster/custodian.h"
@@ -33,11 +34,13 @@
 #include "storage/procsignal.h"
 #include "storage/smgr.h"
 #include "utils/memutils.h"
+#include "utils/pg_lsn.h"
 
 static void DoCustodianTasks(void);
 static CustodianTask CustodianGetNextTask(void);
 static void CustodianEnqueueTask(CustodianTask task);
 static const struct cust_task_funcs_entry *LookupCustodianFunctions(CustodianTask task);
+static void CustodianSetLogicalRewriteCutoff(Datum arg);
 
 typedef struct
 {
@@ -45,6 +48,8 @@ typedef struct
 
 	CustodianTask task_queue_elems[NUM_CUSTODIAN_TASKS];
 	int			task_queue_head;
+
+	XLogRecPtr  logical_rewrite_mappings_cutoff;    /* can remove older mappings */
 } CustodianShmemStruct;
 
 static CustodianShmemStruct *CustodianShmem;
@@ -72,6 +77,7 @@ struct cust_task_funcs_entry
  */
 static const struct cust_task_funcs_entry cust_task_functions[] = {
 	{CUSTODIAN_REMOVE_SERIALIZED_SNAPSHOTS, RemoveOldSerializedSnapshots, NULL},
+	{CUSTODIAN_REMOVE_REWRITE_MAPPINGS, RemoveOldLogicalRewriteMappings, CustodianSetLogicalRewriteCutoff},
 	{INVALID_CUSTODIAN_TASK, NULL, NULL}	/* must be last */
 };
 
@@ -377,3 +383,40 @@ LookupCustodianFunctions(CustodianTask task)
 	elog(ERROR, "could not lookup functions for custodian task %d", task);
 	pg_unreachable();
 }
+
+/*
+ * Stores the provided cutoff LSN in the custodian's shared memory (under
+ * cust_lck) and frees the Datum's memory if it was passed by reference.
+ *
+ * It's okay if the cutoff LSN is updated before a previously set cutoff has
+ * been used for cleaning up files: the next invocation of
+ * RemoveOldLogicalRewriteMappings() will simply use a more accurate cutoff.
+ */
+static void
+CustodianSetLogicalRewriteCutoff(Datum arg)
+{
+	SpinLockAcquire(&CustodianShmem->cust_lck);
+	CustodianShmem->logical_rewrite_mappings_cutoff = DatumGetLSN(arg);
+	SpinLockRelease(&CustodianShmem->cust_lck);
+
+	/* without USE_FLOAT8_BYVAL the Datum is a pointer; free what it points to */
+#ifndef USE_FLOAT8_BYVAL
+	pfree(DatumGetPointer(arg));
+#endif
+}
+
+/*
+ * Returns the logical rewrite cutoff LSN last stored in the custodian's shared
+ * memory, read under cust_lck.  Mapping files older than it can be removed.
+ */
+XLogRecPtr
+CustodianGetLogicalRewriteCutoff(void)
+{
+	XLogRecPtr  cutoff;
+
+	SpinLockAcquire(&CustodianShmem->cust_lck);
+	cutoff = CustodianShmem->logical_rewrite_mappings_cutoff;
+	SpinLockRelease(&CustodianShmem->cust_lck);
+
+	return cutoff;
+}
diff --git a/src/include/access/rewriteheap.h b/src/include/access/rewriteheap.h
index 1125457053..dc3eb3e308 100644
--- a/src/include/access/rewriteheap.h
+++ b/src/include/access/rewriteheap.h
@@ -53,5 +53,6 @@ typedef struct LogicalRewriteMappingData
  */
 #define LOGICAL_REWRITE_FORMAT "map-%x-%x-%X_%X-%x-%x"
 extern void CheckPointLogicalRewriteHeap(void);
+extern void RemoveOldLogicalRewriteMappings(void);
 
 #endif							/* REWRITE_HEAP_H */
diff --git a/src/include/postmaster/custodian.h b/src/include/postmaster/custodian.h
index ab6d4283b9..00280c203b 100644
--- a/src/include/postmaster/custodian.h
+++ b/src/include/postmaster/custodian.h
@@ -12,6 +12,8 @@
 #ifndef _CUSTODIAN_H
 #define _CUSTODIAN_H
 
+#include "access/xlogdefs.h"
+
 /*
  * If you add a new task here, be sure to add its corresponding function
  * pointers to cust_task_functions in custodian.c.
@@ -19,6 +21,7 @@
 typedef enum CustodianTask
 {
 	CUSTODIAN_REMOVE_SERIALIZED_SNAPSHOTS,
+	CUSTODIAN_REMOVE_REWRITE_MAPPINGS,
 
 	NUM_CUSTODIAN_TASKS,			/* new tasks go above */
 	INVALID_CUSTODIAN_TASK
@@ -28,5 +31,6 @@ extern void CustodianMain(void) pg_attribute_noreturn();
 extern Size CustodianShmemSize(void);
 extern void CustodianShmemInit(void);
 extern void RequestCustodian(CustodianTask task, Datum arg);
+extern XLogRecPtr CustodianGetLogicalRewriteCutoff(void);
 
 #endif						/* _CUSTODIAN_H */
-- 
2.25.1

>From 45e69ea5ab047b5d296de0f8869d359a17fdcf99 Mon Sep 17 00:00:00 2001
From: Nathan Bossart <nathandboss...@gmail.com>
Date: Mon, 28 Nov 2022 15:15:37 -0800
Subject: [PATCH v19 4/4] Do not delay shutdown due to long-running custodian
 tasks.

These tasks are not essential enough to delay shutdown and can be
retried the next time the server is running.
---
 src/backend/access/heap/rewriteheap.c       | 9 +++++++++
 src/backend/postmaster/custodian.c          | 8 ++++++++
 src/backend/replication/logical/snapbuild.c | 9 +++++++++
 3 files changed, 26 insertions(+)

diff --git a/src/backend/access/heap/rewriteheap.c b/src/backend/access/heap/rewriteheap.c
index 9ea0f81ac3..3ee635fe77 100644
--- a/src/backend/access/heap/rewriteheap.c
+++ b/src/backend/access/heap/rewriteheap.c
@@ -117,6 +117,7 @@
 #include "miscadmin.h"
 #include "pgstat.h"
 #include "postmaster/custodian.h"
+#include "postmaster/interrupt.h"
 #include "replication/logical.h"
 #include "replication/slot.h"
 #include "storage/bufmgr.h"
@@ -1313,6 +1314,14 @@ RemoveOldLogicalRewriteMappings(void)
 					lo;
 		PGFileType	de_type;
 
+		/*
+		 * This task is not essential enough to delay shutdown, so bail out if
+		 * there's a pending shutdown request.  We'll try again the next time
+		 * the server is running.
+		 */
+		if (ShutdownRequestPending)
+			break;
+
 		if (strcmp(mapping_de->d_name, ".") == 0 ||
 			strcmp(mapping_de->d_name, "..") == 0)
 			continue;
diff --git a/src/backend/postmaster/custodian.c b/src/backend/postmaster/custodian.c
index 4cbd89fae9..274b2d4a79 100644
--- a/src/backend/postmaster/custodian.c
+++ b/src/backend/postmaster/custodian.c
@@ -226,6 +226,14 @@ DoCustodianTasks(void)
 	{
 		CustodianTaskFunction func = (LookupCustodianFunctions(task))->task_func;
 
+		/*
+		 * Custodian tasks are not essential enough to delay shutdown, so bail
+		 * out if there's a pending shutdown request.  Tasks should be
+		 * requested again and retried the next time the server is running.
+		 */
+		if (ShutdownRequestPending)
+			break;
+
 		PG_TRY();
 		{
 			(*func) ();
diff --git a/src/backend/replication/logical/snapbuild.c b/src/backend/replication/logical/snapbuild.c
index 6b403a2bb4..0890825fb9 100644
--- a/src/backend/replication/logical/snapbuild.c
+++ b/src/backend/replication/logical/snapbuild.c
@@ -126,6 +126,7 @@
 #include "common/file_utils.h"
 #include "miscadmin.h"
 #include "pgstat.h"
+#include "postmaster/interrupt.h"
 #include "replication/logical.h"
 #include "replication/reorderbuffer.h"
 #include "replication/snapbuild.h"
@@ -2072,6 +2073,14 @@ RemoveOldSerializedSnapshots(void)
 		XLogRecPtr	lsn;
 		PGFileType	de_type;
 
+		/*
+		 * This task is not essential enough to delay shutdown, so bail out if
+		 * there's a pending shutdown request.  We'll try again the next time
+		 * the server is running.
+		 */
+		if (ShutdownRequestPending)
+			break;
+
 		if (strcmp(snap_de->d_name, ".") == 0 ||
 			strcmp(snap_de->d_name, "..") == 0)
 			continue;
-- 
2.25.1

Reply via email to