Hi all,

With reference to the latest patches (v11) in the thread [1] and Melanie's
most recent feedback, I understand that PageSetBatchChecksumInplace() is
still work in progress and depends on upcoming changes to hint-bit locking.
Proposing new functional changes to checksum batching at this point would go
against that flow, so for now I will focus on preparatory and documentation
improvements until those dependencies are settled.

Regarding the attached patch: it introduces write-combining during
checkpoints by batching contiguous dirty buffers and writing them out with a
single vectorized I/O call. The patch covers write-combining for checkpoint
buffer flushes, contiguous buffer batching, preserved WAL ordering (WAL is
flushed up to the batch's highest page LSN before the write), locking, and
the buffer state invariants. The change is currently limited to the
checkpointer path (BufferSync()).

So far I have tested the implementation as follows: all regression tests
(233) and isolation tests (121) pass, a manual pgbench validation completed
successfully, and I verified the pg_stat_bgwriter counters before and after
checkpoints. The implementation has been stable on my system. I am now
working on extending the checkpointer write-combining to the bgwriter path
and will post an update once I have validated it.
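
For reviewers skimming the patch, the batching criterion in BufferSync()
boils down to the following check (a simplified sketch only; BatchAccepts()
is a hypothetical helper that restates the patch's inline tests and does not
exist in the patch itself):

    /* Sketch: can this buffer be appended to the current batch? */
    static bool
    BatchAccepts(const BufWriteBatch *batch, const BufferTag *tag)
    {
        if (batch->n == 0)
            return true;        /* an empty batch accepts any buffer */
        if (batch->n >= MAX_IO_COMBINE_LIMIT)
            return false;       /* batch is full; flush it first */

        /* same relation, same fork, and physically contiguous blocks */
        return RelFileLocatorEquals(batch->rlocator,
                                    BufTagGetRelFileLocator(tag)) &&
            batch->forkno == BufTagGetForkNum(tag) &&
            tag->blockNum == batch->start + batch->n;
    }

When the check fails, the accumulated batch is flushed with
FlushBufferBatch() (which issues XLogFlush() up to the batch's highest page
LSN and then a single smgrwritev()), and a new batch is started from the
current buffer.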

I hope these findings and this approach are helpful for the ongoing work,
and I would appreciate any feedback on the approach.

Regards,
Soumya

Reference:
[1]
https://www.postgresql.org/message-id/flat/CAAKRu_ZiEpE_EHww3S3-E3iznybdnX8mXSO7Wsuru7%3DP9Y%3DczQ%40mail.gmail.com#52e4f67645f26609427d5b1fc31fdf08
From 9aba47eea0f5a856d023fbc32ab61ec62cacf22a Mon Sep 17 00:00:00 2001
From: Soumya S Murali <[email protected]>
Date: Mon, 15 Dec 2025 14:08:42 +0530
Subject: [PATCH] Add write-combining to checkpoint buffer writes.

Signed-off-by: Soumya S Murali <[email protected]>
---
 src/backend/storage/buffer/bufmgr.c | 750 ++++++++++++++++------------
 src/backend/storage/page/bufpage.c  |  19 +
 src/include/storage/buf_internals.h |  22 +
 src/include/storage/bufmgr.h        |   4 +-
 4 files changed, 485 insertions(+), 310 deletions(-)

diff --git a/src/backend/storage/buffer/bufmgr.c b/src/backend/storage/buffer/bufmgr.c
index f373cead95f..2ea5311aed2 100644
--- a/src/backend/storage/buffer/bufmgr.c
+++ b/src/backend/storage/buffer/bufmgr.c
@@ -537,6 +537,12 @@ static void FlushUnlockedBuffer(BufferDesc *buf, SMgrRelation reln,
 								IOObject io_object, IOContext io_context);
 static void FlushBuffer(BufferDesc *buf, SMgrRelation reln,
 						IOObject io_object, IOContext io_context);
+static bool PrepareBufferForCheckpoint(BufferDesc *bufdesc,
+									   XLogRecPtr *lsn);
+static bool BufferNeedsWALFlush(BufferDesc *bufdesc, XLogRecPtr *lsn);
+static void CleanVictimBuffer(BufferDesc *bufdesc, bool from_ring,
+							  IOContext io_context);
+static int FlushBufferBatch(BufWriteBatch *batch, IOContext io_context);
 static void FindAndDropRelationBuffers(RelFileLocator rlocator,
 									   ForkNumber forkNum,
 									   BlockNumber nForkBlock,
@@ -2331,34 +2337,35 @@ GetVictimBuffer(BufferAccessStrategy strategy, IOContext io_context)
 	ReservePrivateRefCountEntry();
 	ResourceOwnerEnlarge(CurrentResourceOwner);
 
-	/* we return here if a prospective victim buffer gets used concurrently */
-again:
+	/* retry using loop instead of goto */
+	for (;;)
+	{
 
-	/*
-	 * Select a victim buffer.  The buffer is returned pinned and owned by
-	 * this backend.
-	 */
-	buf_hdr = StrategyGetBuffer(strategy, &buf_state, &from_ring);
-	buf = BufferDescriptorGetBuffer(buf_hdr);
+		/*
+		 * Select a victim buffer.  The buffer is returned pinned and owned by
+		 * this backend.
+		 */
+		buf_hdr = StrategyGetBuffer(strategy, &buf_state, &from_ring);
+		buf = BufferDescriptorGetBuffer(buf_hdr);
 
-	/*
-	 * We shouldn't have any other pins for this buffer.
-	 */
-	CheckBufferIsPinnedOnce(buf);
+		/*
+		 * We shouldn't have any other pins for this buffer.
+		 */
+		CheckBufferIsPinnedOnce(buf);
 
-	/*
-	 * If the buffer was dirty, try to write it out.  There is a race
-	 * condition here, in that someone might dirty it after we released the
-	 * buffer header lock above, or even while we are writing it out (since
-	 * our share-lock won't prevent hint-bit updates).  We will recheck the
-	 * dirty bit after re-locking the buffer header.
-	 */
-	if (buf_state & BM_DIRTY)
-	{
-		LWLock	   *content_lock;
+		/*
+		 * If the buffer was dirty, try to write it out.  There is a race
+		 * condition here, in that someone might dirty it after we released the
+		 * buffer header lock above, or even while we are writing it out (since
+		 * our share-lock won't prevent hint-bit updates).  We will recheck the
+		 * dirty bit after re-locking the buffer header.
+		 */
+		if (buf_state & BM_DIRTY)
+		{
+			LWLock	   *content_lock;
 
-		Assert(buf_state & BM_TAG_VALID);
-		Assert(buf_state & BM_VALID);
+			Assert(buf_state & BM_TAG_VALID);
+			Assert(buf_state & BM_VALID);
 
 		/*
 		 * We need a share-lock on the buffer contents to write it out (else
@@ -2374,16 +2381,16 @@ again:
 		 * one just happens to be trying to split the page the first one got
 		 * from StrategyGetBuffer.)
 		 */
-		content_lock = BufferDescriptorGetContentLock(buf_hdr);
-		if (!LWLockConditionalAcquire(content_lock, LW_SHARED))
-		{
+			content_lock = BufferDescriptorGetContentLock(buf_hdr);
+			if (!LWLockConditionalAcquire(content_lock, LW_SHARED))
+			{
 			/*
 			 * Someone else has locked the buffer, so give it up and loop back
 			 * to get another one.
 			 */
-			UnpinBuffer(buf_hdr);
-			goto again;
-		}
+				UnpinBuffer(buf_hdr);
+				continue;
+			}
 
 		/*
 		 * If using a nondefault strategy, and writing the buffer would
@@ -2392,35 +2399,34 @@ again:
 		 * lock to inspect the page LSN, so this can't be done inside
 		 * StrategyGetBuffer.
 		 */
-		if (strategy != NULL)
-		{
-			XLogRecPtr	lsn;
+			if (strategy != NULL)
+			{
+				XLogRecPtr	lsn;
 
-			/* Read the LSN while holding buffer header lock */
-			buf_state = LockBufHdr(buf_hdr);
-			lsn = BufferGetLSN(buf_hdr);
-			UnlockBufHdr(buf_hdr);
+				/* Read the LSN while holding buffer header lock */
+				buf_state = LockBufHdr(buf_hdr);
+				lsn = BufferGetLSN(buf_hdr);
+				UnlockBufHdr(buf_hdr);
 
-			if (XLogNeedsFlush(lsn)
-				&& StrategyRejectBuffer(strategy, buf_hdr, from_ring))
-			{
-				LWLockRelease(content_lock);
-				UnpinBuffer(buf_hdr);
-				goto again;
+				if (XLogNeedsFlush(lsn)
+					&& StrategyRejectBuffer(strategy, buf_hdr, from_ring))
+				{
+					LWLockRelease(content_lock);
+					UnpinBuffer(buf_hdr);
+					continue;
+				}
 			}
-		}
 
-		/* OK, do the I/O */
-		FlushBuffer(buf_hdr, NULL, IOOBJECT_RELATION, io_context);
-		LWLockRelease(content_lock);
+			/* OK, do the I/O */
+			FlushBuffer(buf_hdr, NULL, IOOBJECT_RELATION, io_context);
+			LWLockRelease(content_lock);
 
-		ScheduleBufferTagForWriteback(&BackendWritebackContext, io_context,
+			ScheduleBufferTagForWriteback(&BackendWritebackContext, io_context,
 									  &buf_hdr->tag);
-	}
-
+		}
 
-	if (buf_state & BM_VALID)
-	{
+		if (buf_state & BM_VALID)
+		{
 		/*
 		 * When a BufferAccessStrategy is in use, blocks evicted from shared
 		 * buffers are counted as IOOP_EVICT in the corresponding context
@@ -2437,32 +2443,33 @@ again:
 		 * we may have been forced to release the buffer due to concurrent
 		 * pinners or erroring out.
 		 */
-		pgstat_count_io_op(IOOBJECT_RELATION, io_context,
-						   from_ring ? IOOP_REUSE : IOOP_EVICT, 1, 0);
-	}
+			pgstat_count_io_op(IOOBJECT_RELATION, io_context,
+						   	from_ring ? IOOP_REUSE : IOOP_EVICT, 1, 0);
+		}
 
-	/*
-	 * If the buffer has an entry in the buffer mapping table, delete it. This
-	 * can fail because another backend could have pinned or dirtied the
-	 * buffer.
-	 */
-	if ((buf_state & BM_TAG_VALID) && !InvalidateVictimBuffer(buf_hdr))
-	{
-		UnpinBuffer(buf_hdr);
-		goto again;
-	}
+		/*
+		 * If the buffer has an entry in the buffer mapping table, delete it. This
+		 * can fail because another backend could have pinned or dirtied the
+		 * buffer.
+		 */
+		if ((buf_state & BM_TAG_VALID) && !InvalidateVictimBuffer(buf_hdr))
+		{
+			UnpinBuffer(buf_hdr);
+			continue;
+		}
 
-	/* a final set of sanity checks */
+		/* a final set of sanity checks */
 #ifdef USE_ASSERT_CHECKING
-	buf_state = pg_atomic_read_u32(&buf_hdr->state);
+		buf_state = pg_atomic_read_u32(&buf_hdr->state);
 
-	Assert(BUF_STATE_GET_REFCOUNT(buf_state) == 1);
-	Assert(!(buf_state & (BM_TAG_VALID | BM_VALID | BM_DIRTY)));
+		Assert(BUF_STATE_GET_REFCOUNT(buf_state) == 1);
+		Assert(!(buf_state & (BM_TAG_VALID | BM_VALID | BM_DIRTY)));
 
-	CheckBufferIsPinnedOnce(buf);
+		CheckBufferIsPinnedOnce(buf);
 #endif
 
-	return buf;
+		return buf;
+	} /* end of for(;;) */
 }
 
 /*
@@ -3329,6 +3336,92 @@ TrackNewBufferPin(Buffer buf)
 #define ST_DEFINE
 #include "lib/sort_template.h"
 
+/*
+ * ResetBufWriteBatch
+ *
+ * Reset a write-combining batch to the empty state and reinitialize its
+ * writeback context so it can start accumulating a new run of contiguous
+ * buffers.
+ */
+static void
+ResetBufWriteBatch(BufWriteBatch *batch)
+{
+    batch->n = 0;
+    batch->max_lsn = InvalidXLogRecPtr;
+    batch->reln = NULL;
+    WritebackContextInit(&batch->wb_context, &checkpoint_flush_after);
+}
+
+/*
+ * SyncOneBuffer -- process a single buffer during syncing.
+ *
+ * If skip_recently_used is true, we don't write currently-pinned buffers, nor
+ * buffers marked recently used, as these are not replacement candidates.
+ *
+ * Returns a bitmask containing the following flag bits:
+ *	BUF_WRITTEN: we wrote the buffer.
+ *	BUF_REUSABLE: buffer is available for replacement, ie, it has
+ *		pin count 0 and usage count 0.
+ *
+ * (BUF_WRITTEN could be set in error if FlushBuffer finds the buffer clean
+ * after locking it, but we don't care all that much.)
+ */
+static int
+SyncOneBuffer(int buf_id, bool skip_recently_used, WritebackContext *wb_context)
+{
+	BufferDesc *bufHdr = GetBufferDescriptor(buf_id);
+	int			result = 0;
+	uint32		buf_state;
+	BufferTag	tag;
+
+	/* Make sure we can handle the pin */
+	ReservePrivateRefCountEntry();
+	ResourceOwnerEnlarge(CurrentResourceOwner);
+
+	/*
+	 * Check whether buffer needs writing.
+	 *
+	 * We can make this check without taking the buffer content lock so long
+	 * as we mark pages dirty in access methods *before* logging changes with
+	 * XLogInsert(): if someone marks the buffer dirty just after our check we
+	 * don't worry because our checkpoint.redo points before log record for
+	 * upcoming changes and so we are not required to write such dirty buffer.
+	 */
+	buf_state = LockBufHdr(bufHdr);
+
+	if (BUF_STATE_GET_REFCOUNT(buf_state) == 0 &&
+		BUF_STATE_GET_USAGECOUNT(buf_state) == 0)
+	{
+		result |= BUF_REUSABLE;
+	}
+	else if (skip_recently_used)
+	{
+		/* Caller told us not to write recently-used buffers */
+		UnlockBufHdr(bufHdr);
+		return result;
+	}
+
+	if (!(buf_state & BM_VALID) || !(buf_state & BM_DIRTY))
+	{
+		/* It's clean, so nothing to do */
+		UnlockBufHdr(bufHdr);
+		return result;
+	}
+
+	/*
+	 * Pin it, share-lock it, write it.  (FlushBuffer will do nothing if the
+	 * buffer is clean by the time we've locked it.)
+	 */
+	PinBuffer_Locked(bufHdr);
+
+	FlushUnlockedBuffer(bufHdr, NULL, IOOBJECT_RELATION, IOCONTEXT_NORMAL);
+
+	tag = bufHdr->tag;
+
+	UnpinBuffer(bufHdr);
+
+	/*
+	 * SyncOneBuffer() is only called by checkpointer and bgwriter, so
+	 * IOContext will always be IOCONTEXT_NORMAL.
+	 */
+	ScheduleBufferTagForWriteback(wb_context, IOCONTEXT_NORMAL, &tag);
+
+	return result | BUF_WRITTEN;
+}
+
 /*
  * BufferSync -- Write out all dirty buffers in the pool.
  *
@@ -3344,144 +3437,81 @@ BufferSync(int flags)
 {
 	uint32		buf_state;
 	int			buf_id;
-	int			num_to_scan;
-	int			num_spaces;
-	int			num_processed;
-	int			num_written;
+	int			num_to_scan = 0;
+	int			num_spaces = 0;
+	int			num_processed = 0;
+	int			num_written = 0;
 	CkptTsStatus *per_ts_stat = NULL;
 	Oid			last_tsid;
 	binaryheap *ts_heap;
-	int			i;
-	uint32		mask = BM_DIRTY;
 	WritebackContext wb_context;
+	uint32		mask = BM_DIRTY;
+	int			i;
 
-	/*
-	 * Unless this is a shutdown checkpoint or we have been explicitly told,
-	 * we write only permanent, dirty buffers.  But at shutdown or end of
-	 * recovery, we write all dirty buffers.
-	 */
-	if (!((flags & (CHECKPOINT_IS_SHUTDOWN | CHECKPOINT_END_OF_RECOVERY |
+	/* Determine which buffers must be written */
+	if (!((flags & (CHECKPOINT_IS_SHUTDOWN |
+					CHECKPOINT_END_OF_RECOVERY |
 					CHECKPOINT_FLUSH_UNLOGGED))))
 		mask |= BM_PERMANENT;
 
-	/*
-	 * Loop over all buffers, and mark the ones that need to be written with
-	 * BM_CHECKPOINT_NEEDED.  Count them as we go (num_to_scan), so that we
-	 * can estimate how much work needs to be done.
-	 *
-	 * This allows us to write only those pages that were dirty when the
-	 * checkpoint began, and not those that get dirtied while it proceeds.
-	 * Whenever a page with BM_CHECKPOINT_NEEDED is written out, either by us
-	 * later in this function, or by normal backends or the bgwriter cleaning
-	 * scan, the flag is cleared.  Any buffer dirtied after this point won't
-	 * have the flag set.
-	 *
-	 * Note that if we fail to write some buffer, we may leave buffers with
-	 * BM_CHECKPOINT_NEEDED still set.  This is OK since any such buffer would
-	 * certainly need to be written for the next checkpoint attempt, too.
-	 */
-	num_to_scan = 0;
+	/* identify dirty buffers at checkpoint start */
 	for (buf_id = 0; buf_id < NBuffers; buf_id++)
 	{
 		BufferDesc *bufHdr = GetBufferDescriptor(buf_id);
 		uint32		set_bits = 0;
 
-		/*
-		 * Header spinlock is enough to examine BM_DIRTY, see comment in
-		 * SyncOneBuffer.
-		 */
 		buf_state = LockBufHdr(bufHdr);
 
 		if ((buf_state & mask) == mask)
 		{
-			CkptSortItem *item;
+			CkptSortItem *item = &CkptBufferIds[num_to_scan++];
 
 			set_bits = BM_CHECKPOINT_NEEDED;
 
-			item = &CkptBufferIds[num_to_scan++];
-			item->buf_id = buf_id;
-			item->tsId = bufHdr->tag.spcOid;
-			item->relNumber = BufTagGetRelNumber(&bufHdr->tag);
-			item->forkNum = BufTagGetForkNum(&bufHdr->tag);
-			item->blockNum = bufHdr->tag.blockNum;
+			item->buf_id = buf_id;
+			item->tsId = bufHdr->tag.spcOid;
+			item->relNumber = BufTagGetRelNumber(&bufHdr->tag);
+			item->forkNum = BufTagGetForkNum(&bufHdr->tag);
+			item->blockNum = bufHdr->tag.blockNum;
 		}
 
 		UnlockBufHdrExt(bufHdr, buf_state,
-						set_bits, 0,
-						0);
+						set_bits, 0, 0);
 
-		/* Check for barrier events in case NBuffers is large. */
 		if (ProcSignalBarrierPending)
 			ProcessProcSignalBarrier();
 	}
 
 	if (num_to_scan == 0)
-		return;					/* nothing to do */
+		return;
 
 	WritebackContextInit(&wb_context, &checkpoint_flush_after);
 
 	TRACE_POSTGRESQL_BUFFER_SYNC_START(NBuffers, num_to_scan);
 
-	/*
-	 * Sort buffers that need to be written to reduce the likelihood of random
-	 * IO. The sorting is also important for the implementation of balancing
-	 * writes between tablespaces. Without balancing writes we'd potentially
-	 * end up writing to the tablespaces one-by-one; possibly overloading the
-	 * underlying system.
-	 */
+	/* Sort by tablespace */
 	sort_checkpoint_bufferids(CkptBufferIds, num_to_scan);
 
-	num_spaces = 0;
-
-	/*
-	 * Allocate progress status for each tablespace with buffers that need to
-	 * be flushed. This requires the to-be-flushed array to be sorted.
-	 */
+	/* Build per-tablespace progress tracking */
 	last_tsid = InvalidOid;
+
 	for (i = 0; i < num_to_scan; i++)
 	{
+		Oid cur_tsid = CkptBufferIds[i].tsId;
 		CkptTsStatus *s;
-		Oid			cur_tsid;
-
-		cur_tsid = CkptBufferIds[i].tsId;
 
-		/*
-		 * Grow array of per-tablespace status structs, every time a new
-		 * tablespace is found.
-		 */
-		if (last_tsid == InvalidOid || last_tsid != cur_tsid)
+		if (last_tsid == InvalidOid || cur_tsid != last_tsid)
 		{
-			Size		sz;
-
 			num_spaces++;
-
-			/*
-			 * Not worth adding grow-by-power-of-2 logic here - even with a
-			 * few hundred tablespaces this should be fine.
-			 */
-			sz = sizeof(CkptTsStatus) * num_spaces;
-
-			if (per_ts_stat == NULL)
-				per_ts_stat = (CkptTsStatus *) palloc(sz);
-			else
-				per_ts_stat = (CkptTsStatus *) repalloc(per_ts_stat, sz);
+			per_ts_stat = (num_spaces == 1)
+					? palloc(sizeof(CkptTsStatus))
+					: repalloc(per_ts_stat, sizeof(CkptTsStatus) * num_spaces);
 
 			s = &per_ts_stat[num_spaces - 1];
 			memset(s, 0, sizeof(*s));
-			s->tsId = cur_tsid;
-
-			/*
-			 * The first buffer in this tablespace. As CkptBufferIds is sorted
-			 * by tablespace all (s->num_to_scan) buffers in this tablespace
-			 * will follow afterwards.
-			 */
+			s->tsId  = cur_tsid;
 			s->index = i;
 
-			/*
-			 * progress_slice will be determined once we know how many buffers
-			 * are in each tablespace, i.e. after this loop.
-			 */
-
 			last_tsid = cur_tsid;
 		}
 		else
@@ -3491,117 +3521,132 @@ BufferSync(int flags)
 
 		s->num_to_scan++;
 
-		/* Check for barrier events. */
 		if (ProcSignalBarrierPending)
 			ProcessProcSignalBarrier();
 	}
 
-	Assert(num_spaces > 0);
-
-	/*
-	 * Build a min-heap over the write-progress in the individual tablespaces,
-	 * and compute how large a portion of the total progress a single
-	 * processed buffer is.
-	 */
 	ts_heap = binaryheap_allocate(num_spaces,
 								  ts_ckpt_progress_comparator,
 								  NULL);
 
 	for (i = 0; i < num_spaces; i++)
 	{
-		CkptTsStatus *ts_stat = &per_ts_stat[i];
-
-		ts_stat->progress_slice = (float8) num_to_scan / ts_stat->num_to_scan;
+		CkptTsStatus *st = &per_ts_stat[i];
 
-		binaryheap_add_unordered(ts_heap, PointerGetDatum(ts_stat));
+		st->progress_slice = (float8) num_to_scan / st->num_to_scan;
+		binaryheap_add_unordered(ts_heap, PointerGetDatum(st));
 	}
 
 	binaryheap_build(ts_heap);
 
-	/*
-	 * Iterate through to-be-checkpointed buffers and write the ones (still)
-	 * marked with BM_CHECKPOINT_NEEDED. The writes are balanced between
-	 * tablespaces; otherwise the sorting would lead to only one tablespace
-	 * receiving writes at a time, making inefficient use of the hardware.
-	 */
-	num_processed = 0;
-	num_written = 0;
+	/* Write-combining state */
+	BufWriteBatch batch;
+
+	ResetBufWriteBatch(&batch);
+
 	while (!binaryheap_empty(ts_heap))
 	{
-		BufferDesc *bufHdr = NULL;
-		CkptTsStatus *ts_stat = (CkptTsStatus *)
-			DatumGetPointer(binaryheap_first(ts_heap));
+		CkptTsStatus *ts_stat =
+			(CkptTsStatus *) DatumGetPointer(binaryheap_first(ts_heap));
 
 		buf_id = CkptBufferIds[ts_stat->index].buf_id;
-		Assert(buf_id != -1);
-
-		bufHdr = GetBufferDescriptor(buf_id);
+		Assert(buf_id >= 0);
 
+		BufferDesc *bufHdr = GetBufferDescriptor(buf_id);
 		num_processed++;
 
 		/*
-		 * We don't need to acquire the lock here, because we're only looking
-		 * at a single bit. It's possible that someone else writes the buffer
-		 * and clears the flag right after we check, but that doesn't matter
-		 * since SyncOneBuffer will then do nothing.  However, there is a
-		 * further race condition: it's conceivable that between the time we
-		 * examine the bit here and the time SyncOneBuffer acquires the lock,
-		 * someone else not only wrote the buffer but replaced it with another
-		 * page and dirtied it.  In that improbable case, SyncOneBuffer will
-		 * write the buffer though we didn't need to.  It doesn't seem worth
-		 * guarding against this, though.
+		 * Write combining: accumulate runs of contiguous buffers that still
+		 * have BM_CHECKPOINT_NEEDED set and flush each run as a single IO.
 		 */
 		if (pg_atomic_read_u32(&bufHdr->state) & BM_CHECKPOINT_NEEDED)
 		{
-			if (SyncOneBuffer(buf_id, false, &wb_context) & BUF_WRITTEN)
+			bool		start_new = false;
+			BufferDesc *buf = bufHdr;
+			BufferTag	tag = buf->tag;
+
+			/* Extract the relation locator once for the batch checks */
+			RelFileLocator rlocator = BufTagGetRelFileLocator(&tag);
+
+			if (batch.n == 0)
+			{
+				batch.rlocator = rlocator;
+				batch.forkno = BufTagGetForkNum(&tag);
+				batch.start = tag.blockNum;
+			}
+			else
 			{
-				TRACE_POSTGRESQL_BUFFER_SYNC_WRITTEN(buf_id);
-				PendingCheckpointerStats.buffers_written++;
-				num_written++;
+				bool		same_rel = RelFileLocatorEquals(batch.rlocator, rlocator);
+				bool		same_fork = (batch.forkno == BufTagGetForkNum(&tag));
+				bool		contiguous = (tag.blockNum == batch.start + batch.n);
+
+				if (!(same_rel && same_fork && contiguous))
+					start_new = true;
+			}
+
+			if (start_new)
+			{
+				/* Flush the accumulated batch */
+				int w = FlushBufferBatch(&batch, IOCONTEXT_NORMAL);
+				num_written += w;
+				PendingCheckpointerStats.buffers_written += w;
+				ResetBufWriteBatch(&batch);
+
+				/* Initialize the new batch from this buffer's tag */
+				batch.rlocator = rlocator;
+				batch.forkno = BufTagGetForkNum(&tag);
+				batch.start = tag.blockNum;
+			}
+
+			/* Add buffer to batch */
+			batch.bufdescs[batch.n++] = buf;
+
+			/* Track the highest LSN needing a WAL flush before the write */
+			{
+				XLogRecPtr	lsn;
+
+				if (BufferNeedsWALFlush(buf, &lsn) && lsn > batch.max_lsn)
+					batch.max_lsn = lsn;
+			}
+
+			/* If full, flush immediately */
+			if (batch.n == MAX_IO_COMBINE_LIMIT)
+			{
+				int w = FlushBufferBatch(&batch, IOCONTEXT_NORMAL);
+				num_written += w;
+				PendingCheckpointerStats.buffers_written += w;
+				ResetBufWriteBatch(&batch);
 			}
 		}
 
-		/*
-		 * Measure progress independent of actually having to flush the buffer
-		 * - otherwise writing become unbalanced.
-		 */
+		/* Progress accounting */
 		ts_stat->progress += ts_stat->progress_slice;
 		ts_stat->num_scanned++;
 		ts_stat->index++;
 
-		/* Have all the buffers from the tablespace been processed? */
 		if (ts_stat->num_scanned == ts_stat->num_to_scan)
-		{
 			binaryheap_remove_first(ts_heap);
-		}
 		else
-		{
-			/* update heap with the new progress */
 			binaryheap_replace_first(ts_heap, PointerGetDatum(ts_stat));
-		}
 
-		/*
-		 * Sleep to throttle our I/O rate.
-		 *
-		 * (This will check for barrier events even if it doesn't sleep.)
-		 */
 		CheckpointWriteDelay(flags, (double) num_processed / num_to_scan);
 	}
 
-	/*
-	 * Issue all pending flushes. Only checkpointer calls BufferSync(), so
-	 * IOContext will always be IOCONTEXT_NORMAL.
-	 */
+	/* Flush trailing batch */
+	if (batch.n > 0)
+	{
+		int w = FlushBufferBatch(&batch, IOCONTEXT_NORMAL);
+		num_written += w;
+		PendingCheckpointerStats.buffers_written += w;
+		ResetBufWriteBatch(&batch);
+	}
+
+	/* Complete writeback */
 	IssuePendingWritebacks(&wb_context, IOCONTEXT_NORMAL);
 
 	pfree(per_ts_stat);
-	per_ts_stat = NULL;
 	binaryheap_free(ts_heap);
 
-	/*
-	 * Update checkpoint statistics. As noted above, this doesn't include
-	 * buffers written by other backends or bgwriter scan.
-	 */
 	CheckpointStats.ckpt_bufs_written += num_written;
 
 	TRACE_POSTGRESQL_BUFFER_SYNC_DONE(NBuffers, num_written, num_to_scan);
@@ -3786,6 +3831,7 @@ BgBufferSync(WritebackContext *wb_context)
 	 * a true average we want a fast-attack, slow-decline behavior: we
 	 * immediately follow any increase.
 	 */
+
 	if (smoothed_alloc <= (float) recent_alloc)
 		smoothed_alloc = recent_alloc;
 	else
@@ -3841,8 +3887,7 @@ BgBufferSync(WritebackContext *wb_context)
 	/* Execute the LRU scan */
 	while (num_to_scan > 0 && reusable_buffers < upcoming_alloc_est)
 	{
-		int			sync_state = SyncOneBuffer(next_to_clean, true,
-											   wb_context);
+		int	sync_state = SyncOneBuffer(next_to_clean, true, wb_context);
 
 		if (++next_to_clean >= NBuffers)
 		{
@@ -3902,83 +3947,6 @@ BgBufferSync(WritebackContext *wb_context)
 	return (bufs_to_lap == 0 && recent_alloc == 0);
 }
 
-/*
- * SyncOneBuffer -- process a single buffer during syncing.
- *
- * If skip_recently_used is true, we don't write currently-pinned buffers, nor
- * buffers marked recently used, as these are not replacement candidates.
- *
- * Returns a bitmask containing the following flag bits:
- *	BUF_WRITTEN: we wrote the buffer.
- *	BUF_REUSABLE: buffer is available for replacement, ie, it has
- *		pin count 0 and usage count 0.
- *
- * (BUF_WRITTEN could be set in error if FlushBuffer finds the buffer clean
- * after locking it, but we don't care all that much.)
- */
-static int
-SyncOneBuffer(int buf_id, bool skip_recently_used, WritebackContext *wb_context)
-{
-	BufferDesc *bufHdr = GetBufferDescriptor(buf_id);
-	int			result = 0;
-	uint32		buf_state;
-	BufferTag	tag;
-
-	/* Make sure we can handle the pin */
-	ReservePrivateRefCountEntry();
-	ResourceOwnerEnlarge(CurrentResourceOwner);
-
-	/*
-	 * Check whether buffer needs writing.
-	 *
-	 * We can make this check without taking the buffer content lock so long
-	 * as we mark pages dirty in access methods *before* logging changes with
-	 * XLogInsert(): if someone marks the buffer dirty just after our check we
-	 * don't worry because our checkpoint.redo points before log record for
-	 * upcoming changes and so we are not required to write such dirty buffer.
-	 */
-	buf_state = LockBufHdr(bufHdr);
-
-	if (BUF_STATE_GET_REFCOUNT(buf_state) == 0 &&
-		BUF_STATE_GET_USAGECOUNT(buf_state) == 0)
-	{
-		result |= BUF_REUSABLE;
-	}
-	else if (skip_recently_used)
-	{
-		/* Caller told us not to write recently-used buffers */
-		UnlockBufHdr(bufHdr);
-		return result;
-	}
-
-	if (!(buf_state & BM_VALID) || !(buf_state & BM_DIRTY))
-	{
-		/* It's clean, so nothing to do */
-		UnlockBufHdr(bufHdr);
-		return result;
-	}
-
-	/*
-	 * Pin it, share-lock it, write it.  (FlushBuffer will do nothing if the
-	 * buffer is clean by the time we've locked it.)
-	 */
-	PinBuffer_Locked(bufHdr);
-
-	FlushUnlockedBuffer(bufHdr, NULL, IOOBJECT_RELATION, IOCONTEXT_NORMAL);
-
-	tag = bufHdr->tag;
-
-	UnpinBuffer(bufHdr);
-
-	/*
-	 * SyncOneBuffer() is only called by checkpointer and bgwriter, so
-	 * IOContext will always be IOCONTEXT_NORMAL.
-	 */
-	ScheduleBufferTagForWriteback(wb_context, IOCONTEXT_NORMAL, &tag);
-
-	return result | BUF_WRITTEN;
-}
-
 /*
  *		AtEOXact_Buffers - clean up at end of transaction.
  *
@@ -4412,6 +4380,169 @@ FlushBuffer(BufferDesc *buf, SMgrRelation reln, IOObject io_object,
 	error_context_stack = errcallback.previous;
 }
 
+/*
+ * BufferNeedsWALFlush
+ *
+ * Returns true if this buffer belongs to a permanent relation and its page
+ * LSN indicates that WAL must be flushed before the page can be written.
+ *
+ * On return, *lsn holds the page LSN for permanent buffers, or
+ * InvalidXLogRecPtr for unlogged/temporary buffers.
+ */
+static bool
+BufferNeedsWALFlush(BufferDesc *bufdesc, XLogRecPtr *lsn)
+{
+    uint32 buf_state;
+
+    /* Lock header to read BM_PERMANENT and LSN safely */
+    buf_state = LockBufHdr(bufdesc);
+
+    /* Extract LSN only if it is a permanent buffer */
+    if (buf_state & BM_PERMANENT)
+        *lsn = BufferGetLSN(bufdesc);
+    else
+        *lsn = InvalidXLogRecPtr;
+
+    UnlockBufHdr(bufdesc);
+
+    /* Skip the WAL flush logic entirely if the relation is not logged */
+    if (XLogRecPtrIsInvalid(*lsn))
+        return false;
+
+    /* Must flush WAL up to this LSN before writing the page */
+    return XLogNeedsFlush(*lsn);
+}
+
+/*
+ * CleanVictimBuffer
+ *
+ * Called by checkpointer/bgwriter when selecting a buffer to flush in a
+ * write-combining batch. The buffer must already be pinned.
+ *
+ */
+static void
+CleanVictimBuffer(BufferDesc *bufdesc, bool from_ring, IOContext io_context)
+{
+    XLogRecPtr  max_lsn = InvalidXLogRecPtr;
+    LWLock     *content_lock;
+
+    /*
+     * Acquire share-lock on buffer contents.  We must hold this until either:
+     *  - we decide the buffer does not need flushing, OR
+     *  - we finish preparing it for a write.
+     */
+    content_lock = BufferDescriptorGetContentLock(bufdesc);
+    LWLockAcquire(content_lock, LW_SHARED);
+
+    /*
+     * Re-take the content lock in exclusive mode for checksum and IO
+     * preparation.
+     */
+    LWLockRelease(content_lock);
+    LWLockAcquire(content_lock, LW_EXCLUSIVE);
+
+    /*
+     * Mark the buffer ready for checksum and write.
+     */
+    PrepareBufferForCheckpoint(bufdesc, &max_lsn);
+
+    /* Release exclusive lock; the batch will write the page later */
+    LWLockRelease(content_lock);
+
+    /*
+     * max_lsn now holds the LSN that WAL must be flushed up to before this
+     * page can be written; the caller's batch tracks the highest such LSN
+     * and issues XLogFlush() for the whole batch.
+     */
+}
+
+/*
+ * FlushBufferBatch
+ *
+ * Write a batch of contiguous dirty buffers belonging to the same
+ * relation + fork, using a single smgrwritev() call.
+ *
+ * This is the minimal correct version:
+ *  - prepare buffers (caller has already selected them)
+ *  - flush WAL up to max_lsn
+ *  - compute batch checksums
+ *  - write using smgrwritev()
+ *  - schedule writeback
+ */
+static int
+FlushBufferBatch(BufWriteBatch *batch, IOContext io_context)
+{
+    uint32      n = batch->n;
+    BlockNumber blknums[MAX_IO_COMBINE_LIMIT];
+    Page        pages[MAX_IO_COMBINE_LIMIT];
+    instr_time  io_start;
+    int         written = 0;
+
+    /* Nothing to do */
+    if (n == 0)
+        return 0;
+
+    /* Error context for cleaner messages on I/O failure */
+    ErrorContextCallback errcallback;
+
+    errcallback.callback = shared_buffer_write_error_callback;
+    errcallback.arg = batch->bufdescs[0];   /* report the first block on error */
+    errcallback.previous = error_context_stack;
+    error_context_stack = &errcallback;
+
+    /* Flush WAL up to the highest page LSN in the batch, if required */
+    if (!XLogRecPtrIsInvalid(batch->max_lsn))
+        XLogFlush(batch->max_lsn);
+
+    /* Open smgr relation if needed */
+    if (batch->reln == NULL)
+        batch->reln = smgropen(batch->rlocator, INVALID_PROC_NUMBER);
+
+    /*
+     * Copy buffer pages into local array and prepare block numbers.
+     * (Later patches may eliminate this copying.)
+     */
+    for (uint32 i = 0; i < n; i++)
+    {
+        BufferDesc *buf = batch->bufdescs[i];
+        blknums[i] = batch->start + i;
+		pages[i] = (Page) BufHdrGetBlock(buf);
+    }
+
+    /* Compute checksums over the batch */
+    PageSetBatchChecksumInplace(pages, blknums, n);
+
+    /* IO timing */
+    io_start = pgstat_prepare_io_time(track_io_timing);
+
+    /* Write all pages in one system call */
+    smgrwritev(batch->reln,
+               batch->forkno,
+               batch->start,
+               (const void **) pages,
+               n,
+               false);
+
+    pgstat_count_io_op_time(IOOBJECT_RELATION, io_context, IOOP_WRITE,
+                            io_start, n, (uint64) n * BLCKSZ);
+
+    /* Schedule writeback for each buffer */
+    for (uint32 i = 0; i < n; i++)
+    {
+        BufferDesc *buf = batch->bufdescs[i];
+        ScheduleBufferTagForWriteback(&batch->wb_context,
+                                      io_context,
+                                      &buf->tag);
+        written++;
+    }
+
+    /* restore error callback */
+    error_context_stack = errcallback.previous;
+
+    return written;
+}
+
 /*
  * Convenience wrapper around FlushBuffer() that locks/unlocks the buffer
  * before/after calling FlushBuffer().
@@ -5083,12 +5214,13 @@ FlushRelationsAllBuffers(SMgrRelation *smgrs, int nrels)
 		}
 		else
 		{
-			RelFileLocator rlocator;
+			RelFileLocator rlocator = BufTagGetRelFileLocator(&bufHdr->tag);
 
-			rlocator = BufTagGetRelFileLocator(&bufHdr->tag);
-			srelent = bsearch(&rlocator,
-							  srels, nrels, sizeof(SMgrSortArray),
-							  rlocator_comparator);
+			srelent = bsearch(&rlocator,
+							  srels, nrels, sizeof(SMgrSortArray),
+							  rlocator_comparator);
 		}
 
 		/* buffer doesn't belong to any of the given relfilelocators; skip it */
diff --git a/src/backend/storage/page/bufpage.c b/src/backend/storage/page/bufpage.c
index aac6e695954..408a4abc444 100644
--- a/src/backend/storage/page/bufpage.c
+++ b/src/backend/storage/page/bufpage.c
@@ -1546,3 +1546,22 @@ PageSetChecksumInplace(Page page, BlockNumber blkno)
 
 	((PageHeader) page)->pd_checksum = pg_checksum_page(page, blkno);
 }
+
+/*
+ * PageSetBatchChecksumInplace
+ *
+ * Like PageSetChecksumInplace(), but computes and stores checksums for a
+ * batch of pages that are about to be written out together.
+ */
+void
+PageSetBatchChecksumInplace(Page *pages, const BlockNumber *blknos, uint32 length)
+{
+	/* If we don't need checksums, just return */
+	if (!DataChecksumsEnabled())
+		return;
+
+	for (uint32 i = 0; i < length; i++)
+	{
+		Page		page = pages[i];
+
+		if (PageIsNew(page))
+			continue;
+
+		((PageHeader) page)->pd_checksum = pg_checksum_page(page, blknos[i]);
+	}
+}
diff --git a/src/include/storage/buf_internals.h b/src/include/storage/buf_internals.h
index 5400c56a965..6a75233c1e6 100644
--- a/src/include/storage/buf_internals.h
+++ b/src/include/storage/buf_internals.h
@@ -482,6 +482,28 @@ ResourceOwnerForgetBufferIO(ResourceOwner owner, Buffer buffer)
 {
 	ResourceOwnerForget(owner, Int32GetDatum(buffer), &buffer_io_resowner_desc);
 }
+
+/*
+ * BufWriteBatch
+ *
+ * Used to write out multiple blocks at a time in a combined IO.  bufdescs
+ * contains buffer descriptors for buffers containing adjacent blocks of the
+ * same fork of the same relation.
+ */
+typedef struct BufWriteBatch
+{
+	RelFileLocator rlocator;	/* relation the batch belongs to */
+	ForkNumber	forkno;			/* fork the batch belongs to */
+	SMgrRelation reln;			/* opened lazily in FlushBufferBatch() */
+
+	BlockNumber start;			/* first block number of the batch */
+
+	XLogRecPtr	max_lsn;		/* highest page LSN; WAL is flushed up to it */
+
+	WritebackContext wb_context;	/* writeback state for the batch */
+
+	uint32		n;				/* number of buffers accumulated so far */
+	BufferDesc *bufdescs[MAX_IO_COMBINE_LIMIT];
+} BufWriteBatch;
+
 
 /*
  * Internal buffer management routines
diff --git a/src/include/storage/bufmgr.h b/src/include/storage/bufmgr.h
index 9f6785910e0..f9e23838e48 100644
--- a/src/include/storage/bufmgr.h
+++ b/src/include/storage/bufmgr.h
@@ -223,7 +223,9 @@ extern Buffer ReadBufferWithoutRelcache(RelFileLocator rlocator,
 										ForkNumber forkNum, BlockNumber blockNum,
 										ReadBufferMode mode, BufferAccessStrategy strategy,
 										bool permanent);
-
+extern void PageSetBatchChecksumInplace(Page *pages,
+										const BlockNumber *blknos,
+										uint32 length);
+
 extern bool StartReadBuffer(ReadBuffersOperation *operation,
 							Buffer *buffer,
 							BlockNumber blocknum,
-- 
2.34.1
