From 9583e36ad93255120f082f67f08a56ffd27bb13a Mon Sep 17 00:00:00 2001
From: Alexander Korotkov <akorotkov@postgresql.org>
Date: Wed, 23 Jul 2025 01:33:19 +0300
Subject: [PATCH v8] Process sync requests incrementally in AbsorbSyncRequests

If the number of sync requests is big enough, the palloc() call in
AbsorbSyncRequests() will attempt to allocate more than 1 GB of memory,
resulting in failure.  This can lead to an infinite loop in the checkpointer
process, as it repeatedly fails to absorb the pending requests.

This commit introduces the following changes to cope with this problem:
 1. Turn pending checkpointer requests array in shared memory into a bounded
    ring buffer.
 2. Limit maximum ring buffer size to 10M items.
 3. Make AbsorbSyncRequests() process requests incrementally in 10K batches.

Even #2 makes the whole queue size fit the maximum palloc() size of 1 GB.
of continuous lock holding.

This commit is for master only.  Simpler fix, which just limits a request
queue size to 10M, will be backpatched.

Reported-by: Ekaterina Sokolova <e.sokolova@postgrespro.ru>
Discussion: https://postgr.es/m/db4534f83a22a29ab5ee2566ad86ca92%40postgrespro.ru
Author: Maxim Orlov <orlovmg@gmail.com>
Co-authored-by:  Xuneng Zhou <xunengzhou@gmail.com>
Reviewed-by: Andres Freund <andres@anarazel.de>
Reviewed-by: Heikki Linnakangas <hlinnaka@iki.fi>
Reviewed-by: Alexander Korotkov <aekorotkov@gmail.com>
---
 src/backend/postmaster/checkpointer.c | 155 +++++++++++++++++++-------
 1 file changed, 114 insertions(+), 41 deletions(-)

diff --git a/src/backend/postmaster/checkpointer.c b/src/backend/postmaster/checkpointer.c
index 2809e298a44..7d235e382ec 100644
--- a/src/backend/postmaster/checkpointer.c
+++ b/src/backend/postmaster/checkpointer.c
@@ -130,6 +130,13 @@ typedef struct
 
 	int			num_requests;	/* current # of requests */
 	int			max_requests;	/* allocated array size */
+
+	int			head;			/* Index of the first request in the ring
+								 * buffer */
+	int			tail;			/* Index of the last request in the ring
+								 * buffer */
+
+	/* The ring buffer of pending checkpointer requests */
 	CheckpointerRequest requests[FLEXIBLE_ARRAY_MEMBER];
 } CheckpointerShmemStruct;
 
@@ -138,6 +145,12 @@ static CheckpointerShmemStruct *CheckpointerShmem;
 /* interval for calling AbsorbSyncRequests in CheckpointWriteDelay */
 #define WRITES_PER_ABSORB		1000
 
+/* Maximum number of checkpointer requests to process in one batch */
+#define CKPT_REQ_BATCH_SIZE 10000
+
+/* Max number of requests the checkpointer request queue can hold */
+#define MAX_CHECKPOINT_REQUESTS 10000000
+
 /*
  * GUC parameters
  */
@@ -973,7 +986,8 @@ CheckpointerShmemInit(void)
 		 */
 		MemSet(CheckpointerShmem, 0, size);
 		SpinLockInit(&CheckpointerShmem->ckpt_lck);
-		CheckpointerShmem->max_requests = NBuffers;
+		CheckpointerShmem->max_requests = Min(NBuffers, MAX_CHECKPOINT_REQUESTS);
+		CheckpointerShmem->head = CheckpointerShmem->tail = 0;
 		ConditionVariableInit(&CheckpointerShmem->start_cv);
 		ConditionVariableInit(&CheckpointerShmem->done_cv);
 	}
@@ -1201,6 +1215,7 @@ ForwardSyncRequest(const FileTag *ftag, SyncRequestType type)
 {
 	CheckpointerRequest *request;
 	bool		too_full;
+	int			insert_pos;
 
 	if (!IsUnderPostmaster)
 		return false;			/* probably shouldn't even get here */
@@ -1224,10 +1239,14 @@ ForwardSyncRequest(const FileTag *ftag, SyncRequestType type)
 	}
 
 	/* OK, insert request */
-	request = &CheckpointerShmem->requests[CheckpointerShmem->num_requests++];
+	insert_pos = CheckpointerShmem->tail;
+	request = &CheckpointerShmem->requests[insert_pos];
 	request->ftag = *ftag;
 	request->type = type;
 
+	CheckpointerShmem->tail = (CheckpointerShmem->tail + 1) % CheckpointerShmem->max_requests;
+	CheckpointerShmem->num_requests++;
+
 	/* If queue is more than half full, nudge the checkpointer to empty it */
 	too_full = (CheckpointerShmem->num_requests >=
 				CheckpointerShmem->max_requests / 2);
@@ -1269,12 +1288,16 @@ CompactCheckpointerRequestQueue(void)
 	struct CheckpointerSlotMapping
 	{
 		CheckpointerRequest request;
-		int			slot;
+		int			ring_idx;
 	};
 
-	int			n,
-				preserve_count;
+	int			n;
 	int			num_skipped = 0;
+	int			head;
+	int			max_requests;
+	int			num_requests;
+	int			read_idx,
+				write_idx;
 	HASHCTL		ctl;
 	HTAB	   *htab;
 	bool	   *skip_slot;
@@ -1286,8 +1309,13 @@ CompactCheckpointerRequestQueue(void)
 	if (CritSectionCount > 0)
 		return false;
 
+	max_requests = CheckpointerShmem->max_requests;
+	num_requests = CheckpointerShmem->num_requests;
+
 	/* Initialize skip_slot array */
-	skip_slot = palloc0(sizeof(bool) * CheckpointerShmem->num_requests);
+	skip_slot = palloc0(sizeof(bool) * max_requests);
+
+	head = CheckpointerShmem->head;
 
 	/* Initialize temporary hash table */
 	ctl.keysize = sizeof(CheckpointerRequest);
@@ -1311,7 +1339,8 @@ CompactCheckpointerRequestQueue(void)
 	 * away preceding entries that would end up being canceled anyhow), but
 	 * it's not clear that the extra complexity would buy us anything.
 	 */
-	for (n = 0; n < CheckpointerShmem->num_requests; n++)
+	read_idx = head;
+	for (n = 0; n < num_requests; n++)
 	{
 		CheckpointerRequest *request;
 		struct CheckpointerSlotMapping *slotmap;
@@ -1324,16 +1353,19 @@ CompactCheckpointerRequestQueue(void)
 		 * CheckpointerShmemInit.  Note also that RelFileLocator had better
 		 * contain no pad bytes.
 		 */
-		request = &CheckpointerShmem->requests[n];
+		request = &CheckpointerShmem->requests[read_idx];
 		slotmap = hash_search(htab, request, HASH_ENTER, &found);
 		if (found)
 		{
 			/* Duplicate, so mark the previous occurrence as skippable */
-			skip_slot[slotmap->slot] = true;
+			skip_slot[slotmap->ring_idx] = true;
 			num_skipped++;
 		}
 		/* Remember slot containing latest occurrence of this request value */
-		slotmap->slot = n;
+		slotmap->ring_idx = read_idx;
+
+		/* Move to the next request in the ring buffer */
+		read_idx = (read_idx + 1) % max_requests;
 	}
 
 	/* Done with the hash table. */
@@ -1347,17 +1379,34 @@ CompactCheckpointerRequestQueue(void)
 	}
 
 	/* We found some duplicates; remove them. */
-	preserve_count = 0;
-	for (n = 0; n < CheckpointerShmem->num_requests; n++)
+	read_idx = write_idx = head;
+	for (n = 0; n < num_requests; n++)
 	{
-		if (skip_slot[n])
-			continue;
-		CheckpointerShmem->requests[preserve_count++] = CheckpointerShmem->requests[n];
+		/* If this slot is NOT skipped, keep it */
+		if (!skip_slot[read_idx])
+		{
+			/* If the read and write positions are different, copy the request */
+			if (write_idx != read_idx)
+				CheckpointerShmem->requests[write_idx] =
+					CheckpointerShmem->requests[read_idx];
+
+			/* Advance the write position */
+			write_idx = (write_idx + 1) % max_requests;
+		}
+
+		read_idx = (read_idx + 1) % max_requests;
 	}
+
+	/*
+	 * Update ring buffer state: head remains the same, tail moves, count
+	 * decreases
+	 */
+	CheckpointerShmem->tail = write_idx;
+	CheckpointerShmem->num_requests -= num_skipped;
+
 	ereport(DEBUG1,
 			(errmsg_internal("compacted fsync request queue from %d entries to %d entries",
-							 CheckpointerShmem->num_requests, preserve_count)));
-	CheckpointerShmem->num_requests = preserve_count;
+							 num_requests, CheckpointerShmem->num_requests)));
 
 	/* Cleanup. */
 	pfree(skip_slot);
@@ -1378,40 +1427,64 @@ AbsorbSyncRequests(void)
 {
 	CheckpointerRequest *requests = NULL;
 	CheckpointerRequest *request;
-	int			n;
+	int			n,
+				i;
+	bool		loop;
 
 	if (!AmCheckpointerProcess())
 		return;
 
-	LWLockAcquire(CheckpointerCommLock, LW_EXCLUSIVE);
-
-	/*
-	 * We try to avoid holding the lock for a long time by copying the request
-	 * array, and processing the requests after releasing the lock.
-	 *
-	 * Once we have cleared the requests from shared memory, we have to PANIC
-	 * if we then fail to absorb them (eg, because our hashtable runs out of
-	 * memory).  This is because the system cannot run safely if we are unable
-	 * to fsync what we have been told to fsync.  Fortunately, the hashtable
-	 * is so small that the problem is quite unlikely to arise in practice.
-	 */
-	n = CheckpointerShmem->num_requests;
-	if (n > 0)
+	do
 	{
-		requests = (CheckpointerRequest *) palloc(n * sizeof(CheckpointerRequest));
-		memcpy(requests, CheckpointerShmem->requests, n * sizeof(CheckpointerRequest));
-	}
+		LWLockAcquire(CheckpointerCommLock, LW_EXCLUSIVE);
+
+		/*---
+		 * We try to avoid holding the lock for a long time by:
+		 * 1. Copying the request array, and processing the requests after
+		 *    releasing the lock;
+		 * 2. Processing not the whole queue, but only batches of
+		 *    CKPT_REQ_BATCH_SIZE at once.
+		 *
+		 * Once we have cleared the requests from shared memory, we have to
+		 * PANIC if we then fail to absorb them (eg, because our hashtable
+		 * runs out of memory).  This is because the system cannot run safely
+		 * if we are unable to fsync what we have been told to fsync.
+		 * Fortunately, the hashtable is so small that the problem is quite
+		 * unlikely to arise in practice.
+		 *
+		 * Note: The maximum possible size of a ring buffer is
+		 * MAX_CHECKPOINT_REQUESTS entries, which fit into a maximum palloc
+		 * allocation size of 1Gb.  Our maximum batch size,
+		 * CKPT_REQ_BATCH_SIZE, is even smaller.
+		 */
+		n = Min(CheckpointerShmem->num_requests, CKPT_REQ_BATCH_SIZE);
+		if (n > 0)
+		{
+			if (!requests)
+				requests = (CheckpointerRequest *) palloc(n * sizeof(CheckpointerRequest));
 
-	START_CRIT_SECTION();
+			for (i = 0; i < n; i++)
+			{
+				requests[i] = CheckpointerShmem->requests[CheckpointerShmem->head];
+				CheckpointerShmem->head = (CheckpointerShmem->head + 1) % CheckpointerShmem->max_requests;
+			}
 
-	CheckpointerShmem->num_requests = 0;
+			CheckpointerShmem->num_requests -= n;
 
-	LWLockRelease(CheckpointerCommLock);
+		}
+
+		START_CRIT_SECTION();
+
+		/* Are there any requests in the queue? If so, keep going. */
+		loop = CheckpointerShmem->num_requests != 0;
+
+		LWLockRelease(CheckpointerCommLock);
 
-	for (request = requests; n > 0; request++, n--)
-		RememberSyncRequest(&request->ftag, request->type);
+		for (request = requests; n > 0; request++, n--)
+			RememberSyncRequest(&request->ftag, request->type);
 
-	END_CRIT_SECTION();
+		END_CRIT_SECTION();
+	} while (loop);
 
 	if (requests)
 		pfree(requests);
-- 
2.39.5 (Apple Git-154)

