Hi,

Here are some vectored writeback patches I worked on in the 17 cycle
and posted as part of various patch sets, but didn't get into a good
enough shape to take further.  They "push" vectored writes out, but I
think what they need is to be turned inside out and converted into
users of a new hypothetical write_stream.c, so that we have a model
that will survive contact with asynchronous I/O and would "pull"
writes from a stream that controls I/O concurrency.  That all seemed a
lot less urgent to work on than reads, hence leaving on ice for now.
There is a lot of code that reads, and a small finite amount that
writes.  I think the patches show some aspects of the problem-space
though, and they certainly make checkpointing faster.  They cover 2
out of 5ish ways we write relation data: checkpointing, and strategies
AKA ring buffers.

They make checkpoints look like this, respecting io_combine_limit,
instead of lots of 8kB writes:

pwritev(9,[...],2,0x0) = 131072 (0x20000)
pwrite(9,...,131072,0x20000) = 131072 (0x20000)
pwrite(9,...,131072,0x40000) = 131072 (0x20000)
pwrite(9,...,131072,0x60000) = 131072 (0x20000)
pwrite(9,...,131072,0x80000) = 131072 (0x20000)
...

Two more ways data gets written back are: bgwriter and regular
BAS_NORMAL buffer eviction, but they are not such natural candidates
for write combining.  Well, if you know you're going to write out a
buffer, *maybe* it's worth probing the buffer pool to see if adjacent
block numbers are also present and dirty?  I don't know.  Before and
after?  Or maybe it's better to wait for the tree-based mapping table
of legend first so it becomes cheaper to navigate in block number
order.

The 5th way is raw file copy that doesn't go through the buffer pool,
such as CREATE DATABASE ... STRATEGY=FILE_COPY, which already works
with big writes, and CREATE INDEX via bulk_write.c which is easily
converted to vectored writes, and I plan to push the patches for that
shortly.  I think those should ultimately become stream-based too.

Anyway, I wanted to share these uncommitfest patches, having rebased
them over relevant recent commits, so I could leave them in working
state in case anyone is interested in this file I/O-level stuff...
From c6d227678c586387a49c30c4f9a61f62c9b04b1c Mon Sep 17 00:00:00 2001
From: Thomas Munro <thomas.mu...@gmail.com>
Date: Wed, 13 Mar 2024 17:02:42 +1300
Subject: [PATCH v5 1/3] Provide vectored variant of FlushBuffer().

FlushBuffers() is just like FlushBuffer() except that it tries to write
out multiple consecutive disk blocks at a time with smgrwritev().  The
traditional simple FlushBuffer() call is now implemented in terms of it.
---
 src/backend/storage/buffer/bufmgr.c | 240 ++++++++++++++++++++--------
 src/include/storage/buf_internals.h |  10 ++
 2 files changed, 185 insertions(+), 65 deletions(-)

diff --git a/src/backend/storage/buffer/bufmgr.c b/src/backend/storage/buffer/bufmgr.c
index 06e9ffd2b00..a815e58e307 100644
--- a/src/backend/storage/buffer/bufmgr.c
+++ b/src/backend/storage/buffer/bufmgr.c
@@ -57,6 +57,7 @@
 #include "storage/smgr.h"
 #include "storage/standby.h"
 #include "utils/memdebug.h"
+#include "utils/memutils.h"
 #include "utils/ps_status.h"
 #include "utils/rel.h"
 #include "utils/resowner.h"
@@ -526,6 +527,8 @@ static inline BufferDesc *BufferAlloc(SMgrRelation smgr,
 static Buffer GetVictimBuffer(BufferAccessStrategy strategy, IOContext io_context);
 static void FlushBuffer(BufferDesc *buf, SMgrRelation reln,
 						IOObject io_object, IOContext io_context);
+static int	FlushBuffers(BufferDesc **bufs, int nbuffers, SMgrRelation reln,
+						 IOObject io_object, IOContext io_context);
 static void FindAndDropRelationBuffers(RelFileLocator rlocator,
 									   ForkNumber forkNum,
 									   BlockNumber nForkBlock,
@@ -3705,9 +3708,19 @@ BufferGetTag(Buffer buffer, RelFileLocator *rlocator, ForkNumber *forknum,
 	*blknum = bufHdr->tag.blockNum;
 }
 
+struct shared_buffer_write_error_info
+{
+	BufferDesc *buf;
+	int			nblocks;
+};
+
 /*
- * FlushBuffer
- *		Physically write out a shared buffer.
+ * FlushBuffers
+ *		Physically write out shared buffers.
+ *
+ * The buffers do not have to be consecutive in memory but must refer to
+ * consecutive blocks of the same relation fork in increasing order of block
+ * number.
  *
  * NOTE: this actually just passes the buffer contents to the kernel; the
  * real write to disk won't happen until the kernel feels like it.  This
@@ -3715,66 +3728,149 @@ BufferGetTag(Buffer buffer, RelFileLocator *rlocator, ForkNumber *forknum,
  * However, we will need to force the changes to disk via fsync before
  * we can checkpoint WAL.
  *
- * The caller must hold a pin on the buffer and have share-locked the
+ * The caller must hold a pin on the buffers and have share-locked the
  * buffer contents.  (Note: a share-lock does not prevent updates of
  * hint bits in the buffer, so the page could change while the write
  * is in progress, but we assume that that will not invalidate the data
  * written.)
  *
  * If the caller has an smgr reference for the buffer's relation, pass it
- * as the second parameter.  If not, pass NULL.
+ * as the third parameter.  If not, pass NULL.
  */
-static void
-FlushBuffer(BufferDesc *buf, SMgrRelation reln, IOObject io_object,
-			IOContext io_context)
+static int
+FlushBuffers(BufferDesc **bufs, int nbuffers, SMgrRelation reln,
+			 IOObject io_object, IOContext io_context)
 {
-	XLogRecPtr	recptr;
+	XLogRecPtr	max_recptr;
+	struct shared_buffer_write_error_info errinfo;
 	ErrorContextCallback errcallback;
 	instr_time	io_start;
-	Block		bufBlock;
-	char	   *bufToWrite;
-	uint32		buf_state;
+	int			first_start_io_index;
+	void	   *bufBlocks[MAX_IO_COMBINE_LIMIT];
+	bool		need_checksums = DataChecksumsEnabled();
+	static PGIOAlignedBlock *copies;
+
+	Assert(nbuffers > 0);
+	nbuffers = Min(nbuffers, io_combine_limit);
 
 	/*
-	 * Try to start an I/O operation.  If StartBufferIO returns false, then
-	 * someone else flushed the buffer before we could, so we need not do
-	 * anything.
+	 * Update page checksums if desired.  Since we have only shared lock on
+	 * the buffer, other processes might be updating hint bits in it, so we
+	 * must copy the page to private storage if we do checksumming.
+	 *
+	 * XXX:TODO kill static local memory, or better yet, kill unlocked hint
+	 * bit modifications so we don't need this
 	 */
-	if (!StartBufferIO(buf, false, false))
-		return;
+	if (need_checksums)
+	{
+		/* Size for the GUC's maximum: io_combine_limit may be raised later. */
+		if (!copies)
+			copies = MemoryContextAllocAligned(TopMemoryContext,
+											   BLCKSZ * MAX_IO_COMBINE_LIMIT,
+											   PG_IO_ALIGN_SIZE, 0);
+		for (int i = 0; i < nbuffers; ++i)
+		{
+			memcpy(&copies[i], BufHdrGetBlock(bufs[i]), BLCKSZ);
+			PageSetChecksumInplace((Page) &copies[i],
+								   bufs[i]->tag.blockNum);
+		}
+	}
+
+	/*
+	 * Try to start an I/O operation on as many buffers as we can.  If
+	 * StartBufferIO returns false, then someone else flushed the buffer
+	 * before we could, so we need not do anything and we give up on any
+	 * remaining buffers, as they are not consecutive with the blocks we've
+	 * collected.
+	 */
+	first_start_io_index = -1;
+	for (int i = 0; i < nbuffers; ++i)
+	{
+		/* Must be consecutive blocks. */
+		if (i > 0)
+			Assert(BufferTagsConsecutive(&bufs[i - 1]->tag, &bufs[i]->tag));
+
+		/* Only wait for the first one, so we can guarantee progress. */
+		if (!StartBufferIO(bufs[i], false, i > 0))
+		{
+			if (first_start_io_index >= 0)
+			{
+				/*
+				 * We can't go any further because later blocks after this gap
+				 * would not be consecutive.  Cap nbuffers here.
+				 */
+				nbuffers = i;
+				break;
+			}
+			else
+			{
+				/*
+				 * Still searching for the first block we can win the right to
+				 * start I/O on, so it doesn't matter that we couldn't get
+				 * this one.
+				 */
+			}
+		}
+		else
+		{
+			/* Keep track of the first block we started I/O on. */
+			if (first_start_io_index < 0)
+				first_start_io_index = i;
+		}
+		/* Collect the source buffers (copies or shared buffers). */
+		bufBlocks[i] = need_checksums ? &copies[i] : BufHdrGetBlock(bufs[i]);
+	}
+
+	/* If we can't write even one buffer, then we're done. */
+	if (first_start_io_index < 0)
+		return nbuffers;
 
 	/* Setup error traceback support for ereport() */
+	errinfo.buf = bufs[first_start_io_index];
+	errinfo.nblocks = nbuffers - first_start_io_index;	/* count from buf */
 	errcallback.callback = shared_buffer_write_error_callback;
-	errcallback.arg = (void *) buf;
+	errcallback.arg = &errinfo;
 	errcallback.previous = error_context_stack;
 	error_context_stack = &errcallback;
 
 	/* Find smgr relation for buffer */
 	if (reln == NULL)
-		reln = smgropen(BufTagGetRelFileLocator(&buf->tag), INVALID_PROC_NUMBER);
+		reln = smgropen(BufTagGetRelFileLocator(&bufs[first_start_io_index]->tag),
+						INVALID_PROC_NUMBER);
 
-	TRACE_POSTGRESQL_BUFFER_FLUSH_START(BufTagGetForkNum(&buf->tag),
-										buf->tag.blockNum,
+	TRACE_POSTGRESQL_BUFFER_FLUSH_START(BufTagGetForkNum(&bufs[first_start_io_index]->tag),
+										bufs[first_start_io_index]->tag.blockNum,
 										reln->smgr_rlocator.locator.spcOid,
 										reln->smgr_rlocator.locator.dbOid,
 										reln->smgr_rlocator.locator.relNumber);
 
-	buf_state = LockBufHdr(buf);
+	/* Find the highest LSN across all the I/O-started buffers. */
+	max_recptr = 0;
+	for (int i = first_start_io_index; i < nbuffers; ++i)
+	{
+		XLogRecPtr	page_recptr;
+		uint32		buf_state;
 
-	/*
-	 * Run PageGetLSN while holding header lock, since we don't have the
-	 * buffer locked exclusively in all cases.
-	 */
-	recptr = BufferGetLSN(buf);
+		buf_state = LockBufHdr(bufs[i]);
 
-	/* To check if block content changes while flushing. - vadim 01/17/97 */
-	buf_state &= ~BM_JUST_DIRTIED;
-	UnlockBufHdr(buf, buf_state);
+		/*
+		 * Run PageGetLSN while holding header lock, since we don't have the
+		 * buffer locked exclusively in all cases.
+		 */
+		page_recptr = BufferGetLSN(bufs[i]);
+
+		/* To check if block content changes while flushing. - vadim 01/17/97 */
+		buf_state &= ~BM_JUST_DIRTIED;
+		UnlockBufHdr(bufs[i], buf_state);
+
+		if (buf_state & BM_PERMANENT)
+			max_recptr = Max(page_recptr, max_recptr);
+	}
 
 	/*
-	 * Force XLOG flush up to buffer's LSN.  This implements the basic WAL
-	 * rule that log updates must hit disk before any of the data-file changes
-	 * they describe do.
+	 * Force XLOG flush up to buffers' greatest LSN.  This implements the
+	 * basic WAL rule that log updates must hit disk before any of the
+	 * data-file changes they describe do.
 	 *
 	 * However, this rule does not apply to unlogged relations, which will be
 	 * lost after a crash anyway.  Most unlogged relation pages do not bear
@@ -3788,33 +3884,23 @@ FlushBuffer(BufferDesc *buf, SMgrRelation reln, IOObject io_object,
 	 * disastrous system-wide consequences.  To make sure that can't happen,
 	 * skip the flush if the buffer isn't permanent.
 	 */
-	if (buf_state & BM_PERMANENT)
-		XLogFlush(recptr);
+	if (max_recptr > 0)
+		XLogFlush(max_recptr);
 
 	/*
-	 * Now it's safe to write buffer to disk. Note that no one else should
+	 * Now it's safe to write buffers to disk. Note that no one else should
 	 * have been able to write it while we were busy with log flushing because
-	 * only one process at a time can set the BM_IO_IN_PROGRESS bit.
+	 * only one process at a time can set the BM_IO_IN_PROGRESS bits.
 	 */
-	bufBlock = BufHdrGetBlock(buf);
-
-	/*
-	 * Update page checksum if desired.  Since we have only shared lock on the
-	 * buffer, other processes might be updating hint bits in it, so we must
-	 * copy the page to private storage if we do checksumming.
-	 */
-	bufToWrite = PageSetChecksumCopy((Page) bufBlock, buf->tag.blockNum);
 
 	io_start = pgstat_prepare_io_time(track_io_timing);
 
-	/*
-	 * bufToWrite is either the shared buffer or a copy, as appropriate.
-	 */
-	smgrwrite(reln,
-			  BufTagGetForkNum(&buf->tag),
-			  buf->tag.blockNum,
-			  bufToWrite,
-			  false);
+	smgrwritev(reln,
+			   BufTagGetForkNum(&bufs[first_start_io_index]->tag),
+			   bufs[first_start_io_index]->tag.blockNum,
+			   (const void **) &bufBlocks[first_start_io_index],
+			   nbuffers - first_start_io_index,
+			   false);
 
 	/*
 	 * When a strategy is in use, only flushes of dirty buffers already in the
@@ -3835,24 +3921,41 @@ FlushBuffer(BufferDesc *buf, SMgrRelation reln, IOObject io_object,
 	 * of a dirty shared buffer (IOCONTEXT_NORMAL IOOP_WRITE).
 	 */
 	pgstat_count_io_op_time(IOOBJECT_RELATION, io_context,
-							IOOP_WRITE, io_start, 1);
+							IOOP_WRITE, io_start,
+							nbuffers - first_start_io_index);
 
-	pgBufferUsage.shared_blks_written++;
+	pgBufferUsage.shared_blks_written += nbuffers - first_start_io_index;
 
 	/*
-	 * Mark the buffer as clean (unless BM_JUST_DIRTIED has become set) and
+	 * Mark the buffers as clean (unless BM_JUST_DIRTIED has become set) and
 	 * end the BM_IO_IN_PROGRESS state.
 	 */
-	TerminateBufferIO(buf, true, 0, true);
+	for (int i = first_start_io_index; i < nbuffers; ++i)
+		TerminateBufferIO(bufs[i], true, 0, true);
 
-	TRACE_POSTGRESQL_BUFFER_FLUSH_DONE(BufTagGetForkNum(&buf->tag),
-									   buf->tag.blockNum,
+	TRACE_POSTGRESQL_BUFFER_FLUSH_DONE(BufTagGetForkNum(&bufs[first_start_io_index]->tag),
+									   bufs[first_start_io_index]->tag.blockNum,
 									   reln->smgr_rlocator.locator.spcOid,
 									   reln->smgr_rlocator.locator.dbOid,
 									   reln->smgr_rlocator.locator.relNumber);
 
 	/* Pop the error context stack */
 	error_context_stack = errcallback.previous;
+
+	return nbuffers;
+}
+
+/*
+ * FlushBuffer
+ *		Physically write out just one shared buffer.
+ *
+ * Single-block variant of FlushBuffers().
+ */
+static void
+FlushBuffer(BufferDesc *buf, SMgrRelation reln, IOObject io_object,
+			IOContext io_context)
+{
+	FlushBuffers(&buf, 1, reln, io_object, io_context);
 }
 
 /*
@@ -5621,16 +5724,23 @@ AbortBufferIO(Buffer buffer)
 static void
 shared_buffer_write_error_callback(void *arg)
 {
-	BufferDesc *bufHdr = (BufferDesc *) arg;
+	struct shared_buffer_write_error_info *info = arg;
 
 	/* Buffer is pinned, so we can read the tag without locking the spinlock */
-	if (bufHdr != NULL)
+	if (info != NULL)
 	{
-		char	   *path = relpathperm(BufTagGetRelFileLocator(&bufHdr->tag),
-									   BufTagGetForkNum(&bufHdr->tag));
-
-		errcontext("writing block %u of relation %s",
-				   bufHdr->tag.blockNum, path);
+		char	   *path = relpathperm(BufTagGetRelFileLocator(&info->buf->tag),
+									   BufTagGetForkNum(&info->buf->tag));
+
+		if (info->nblocks > 1)
+			errcontext("writing blocks %u..%u of relation %s",
+					   info->buf->tag.blockNum,
+					   info->buf->tag.blockNum + info->nblocks - 1,
+					   path);
+		else
+			errcontext("writing block %u of relation %s",
+					   info->buf->tag.blockNum,
+					   path);
 		pfree(path);
 	}
 }
diff --git a/src/include/storage/buf_internals.h b/src/include/storage/buf_internals.h
index f190e6e5e46..9e3a3fbd887 100644
--- a/src/include/storage/buf_internals.h
+++ b/src/include/storage/buf_internals.h
@@ -160,6 +160,16 @@ BufferTagsEqual(const BufferTag *tag1, const BufferTag *tag2)
 		(tag1->forkNum == tag2->forkNum);
 }
 
+static inline bool
+BufferTagsConsecutive(const BufferTag *tag1, const BufferTag *tag2)
+{
+	return (tag1->spcOid == tag2->spcOid) &&
+		(tag1->dbOid == tag2->dbOid) &&
+		(tag1->relNumber == tag2->relNumber) &&
+		(tag1->forkNum == tag2->forkNum) &&
+		(tag1->blockNum + 1 == tag2->blockNum);
+}
+
 static inline bool
 BufTagMatchesRelFileLocator(const BufferTag *tag,
 							const RelFileLocator *rlocator)
-- 
2.44.0

From 3446e14fae9539184b7eb93036cad86739254269 Mon Sep 17 00:00:00 2001
From: Thomas Munro <thomas.mu...@gmail.com>
Date: Wed, 13 Mar 2024 17:41:16 +1300
Subject: [PATCH v5 2/3] Use vectored writes in checkpointer.

Use FlushBuffers() instead of FlushBuffer().  This often produces wide
vectored writes, since the checkpointer already has all the dirty
buffers sorted by block number.

XXX The spreading isn't right.  We nap 16 times and then do a 16-block
write.  Should we nap once per write, or use longer naps after
multi-block writes?

XXX This is "push" based.  To adopt streaming I/O thinking and drive
asynchronous I/O and direct I/O well, what we need is a "pull" based
approach where I/O completions cause us to start more writes, so we
remain in control of the I/O depth.
---
 src/backend/storage/buffer/bufmgr.c | 163 ++++++++++++++++++++++++----
 src/include/storage/buf_internals.h |   4 +-
 2 files changed, 147 insertions(+), 20 deletions(-)

diff --git a/src/backend/storage/buffer/bufmgr.c b/src/backend/storage/buffer/bufmgr.c
index a815e58e307..073088ab938 100644
--- a/src/backend/storage/buffer/bufmgr.c
+++ b/src/backend/storage/buffer/bufmgr.c
@@ -528,7 +528,8 @@ static Buffer GetVictimBuffer(BufferAccessStrategy strategy, IOContext io_contex
 static void FlushBuffer(BufferDesc *buf, SMgrRelation reln,
 						IOObject io_object, IOContext io_context);
 static int	FlushBuffers(BufferDesc **bufs, int nbuffers, SMgrRelation reln,
-						 IOObject io_object, IOContext io_context);
+						 IOObject io_object, IOContext io_context,
+						 int *first_written_index, int *written);
 static void FindAndDropRelationBuffers(RelFileLocator rlocator,
 									   ForkNumber forkNum,
 									   BlockNumber nForkBlock,
@@ -1995,7 +1996,7 @@ again:
 		LWLockRelease(content_lock);
 
 		ScheduleBufferTagForWriteback(&BackendWritebackContext, io_context,
-									  &buf_hdr->tag);
+									  &buf_hdr->tag, 1);
 	}
 
 
@@ -2844,6 +2845,72 @@ UnpinBufferNoOwner(BufferDesc *buf)
 #define ST_DEFINE
 #include <lib/sort_template.h>
 
+/*
+ * Flush a range of already pinned buffers that hold consecutive blocks of a
+ * relation fork, queueing each written run on wb_context.  All buffers are
+ * unpinned on return.  Returns the number of buffers written out (if this is
+ * less than nbuffers, it is because another backend already wrote some out).
+ */
+static int
+SyncBuffers(BufferDesc **bufs, int nbuffers,
+			WritebackContext *wb_context)
+{
+	int			total_written = 0;
+
+	while (nbuffers > 0)
+	{
+		int			nlocked;
+
+		/* Lock first buffer. */
+		LWLockAcquire(BufferDescriptorGetContentLock(bufs[0]), LW_SHARED);
+		nlocked = 1;
+
+		/* Lock as many more as we can without waiting, to avoid deadlocks. */
+		while (nlocked < nbuffers &&
+			   LWLockConditionalAcquire(BufferDescriptorGetContentLock(bufs[nlocked]),
+										LW_SHARED))
+			nlocked++;
+
+		while (nlocked > 0)
+		{
+			int			flushed;
+			int			written;
+			int			first_written_index;
+
+			/*
+			 * Flush as many as we can with a single write, which may be fewer
+			 * than requested if buffers in this range turn out to have been
+			 * flushed already, creating gaps between flushable block ranges.
+			 */
+			flushed = FlushBuffers(bufs, nlocked, NULL, IOOBJECT_RELATION,
+								   IOCONTEXT_NORMAL,
+								   &first_written_index, &written);
+			total_written += written;
+
+			/* Unlock in reverse order (currently more efficient). */
+			for (int i = flushed - 1; i >= 0; --i)
+				LWLockRelease(BufferDescriptorGetContentLock(bufs[i]));
+
+			/* Queue writeback control. */
+			if (written > 0)
+				ScheduleBufferTagForWriteback(wb_context,
+											  IOCONTEXT_NORMAL,
+											  &bufs[first_written_index]->tag,
+											  written);
+
+			/* Unpin. */
+			for (int i = 0; i < flushed; ++i)
+				UnpinBuffer(bufs[i]);
+
+			bufs += flushed;
+			nlocked -= flushed;
+			nbuffers -= flushed;
+		}
+	}
+
+	return total_written;
+}
+
 /*
  * BufferSync -- Write out all dirty buffers in the pool.
  *
@@ -2869,6 +2936,8 @@ BufferSync(int flags)
 	int			i;
 	int			mask = BM_DIRTY;
 	WritebackContext wb_context;
+	BufferDesc *range_buffers[MAX_IO_COMBINE_LIMIT];
+	int			range_nblocks = 0;
 
 	/*
 	 * Unless this is a shutdown checkpoint or we have been explicitly told,
@@ -3066,11 +3135,40 @@ BufferSync(int flags)
 		 */
 		if (pg_atomic_read_u32(&bufHdr->state) & BM_CHECKPOINT_NEEDED)
 		{
-			if (SyncOneBuffer(buf_id, false, &wb_context) & BUF_WRITTEN)
+			ResourceOwnerEnlarge(CurrentResourceOwner);
+			ReservePrivateRefCountEntry();
+
+			buf_state = LockBufHdr(bufHdr);
+			if (!(buf_state & BM_VALID) || !(buf_state & BM_DIRTY))
 			{
-				TRACE_POSTGRESQL_BUFFER_SYNC_WRITTEN(buf_id);
-				PendingCheckpointerStats.buffers_written++;
-				num_written++;
+				/* It's clean, so nothing to do */
+				UnlockBufHdr(bufHdr, buf_state);
+			}
+			else
+			{
+				PinBuffer_Locked(bufHdr);
+
+				/*
+				 * Sync the range collected so far if it is full, or if the
+				 * newly pinned buffer is not consecutive with its last block.
+				 * Test with >= since io_combine_limit is a GUC that may shrink
+				 * while a range is pending; range_buffers must not be overrun.
+				 */
+				if (range_nblocks >= io_combine_limit ||
+					(range_nblocks > 0 &&
+					 !BufferTagsConsecutive(&range_buffers[range_nblocks - 1]->tag,
+											&bufHdr->tag)))
+				{
+					int			written;
+
+					written = SyncBuffers(range_buffers, range_nblocks, &wb_context);
+					range_nblocks = 0;
+					num_written += written;
+					PendingCheckpointerStats.buffers_written += written;
+				}
+
+				/* Collect this pinned buffer in our range. */
+				range_buffers[range_nblocks++] = bufHdr;
 			}
 		}
 
@@ -3101,6 +3199,16 @@ BufferSync(int flags)
 		CheckpointWriteDelay(flags, (double) num_processed / num_to_scan);
 	}
 
+	/* Sync our final range. */
+	if (range_nblocks > 0)
+	{
+		int			written;
+
+		written = SyncBuffers(range_buffers, range_nblocks, &wb_context);
+		num_written += written;
+		PendingCheckpointerStats.buffers_written += written;
+	}
+
 	/*
 	 * Issue all pending flushes. Only checkpointer calls BufferSync(), so
 	 * IOContext will always be IOCONTEXT_NORMAL.
@@ -3490,7 +3598,7 @@ SyncOneBuffer(int buf_id, bool skip_recently_used, WritebackContext *wb_context)
 	 * SyncOneBuffer() is only called by checkpointer and bgwriter, so
 	 * IOContext will always be IOCONTEXT_NORMAL.
 	 */
-	ScheduleBufferTagForWriteback(wb_context, IOCONTEXT_NORMAL, &tag);
+	ScheduleBufferTagForWriteback(wb_context, IOCONTEXT_NORMAL, &tag, 1);
 
 	return result | BUF_WRITTEN;
 }
@@ -3739,7 +3847,8 @@ struct shared_buffer_write_error_info
  */
 static int
 FlushBuffers(BufferDesc **bufs, int nbuffers, SMgrRelation reln,
-			 IOObject io_object, IOContext io_context)
+			 IOObject io_object, IOContext io_context,
+			 int *first_written_index, int *written)
 {
 	XLogRecPtr	max_recptr;
 	struct shared_buffer_write_error_info errinfo;
@@ -3823,7 +3932,13 @@ FlushBuffers(BufferDesc **bufs, int nbuffers, SMgrRelation reln,
 
 	/* If we can't write even one buffer, then we're done. */
 	if (first_start_io_index < 0)
+	{
+		if (first_written_index)
+			*first_written_index = 0;
+		if (written)
+			*written = 0;
 		return nbuffers;
+	}
 
 	/* Setup error traceback support for ereport() */
 	errinfo.buf = bufs[first_start_io_index];
@@ -3942,6 +4057,12 @@ FlushBuffers(BufferDesc **bufs, int nbuffers, SMgrRelation reln,
 	/* Pop the error context stack */
 	error_context_stack = errcallback.previous;
 
+	/* Report the range of buffers that we actually wrote, if any. */
+	if (first_written_index)
+		*first_written_index = first_start_io_index;
+	if (written)
+		*written = nbuffers - first_start_io_index;
+
 	return nbuffers;
 }
 
@@ -3955,7 +4076,7 @@ static void
 FlushBuffer(BufferDesc *buf, SMgrRelation reln, IOObject io_object,
 			IOContext io_context)
 {
-	FlushBuffers(&buf, 1, reln, io_object, io_context);
+	FlushBuffers(&buf, 1, reln, io_object, io_context, NULL, NULL);
 }
 
 /*
@@ -5947,11 +6068,11 @@ WritebackContextInit(WritebackContext *context, int *max_pending)
 }
 
 /*
- * Add buffer to list of pending writeback requests.
+ * Add buffer tag range to list of pending writeback requests.
  */
 void
 ScheduleBufferTagForWriteback(WritebackContext *wb_context, IOContext io_context,
-							  BufferTag *tag)
+							  BufferTag *tag, int nblocks)
 {
 	PendingWriteback *pending;
 
@@ -5969,6 +6090,7 @@ ScheduleBufferTagForWriteback(WritebackContext *wb_context, IOContext io_context
 		pending = &wb_context->pending_writebacks[wb_context->nr_pending++];
 
 		pending->tag = *tag;
+		pending->nblocks = nblocks;
 	}
 
 	/*
@@ -6025,10 +6147,11 @@ IssuePendingWritebacks(WritebackContext *wb_context, IOContext io_context)
 		int			ahead;
 		BufferTag	tag;
 		RelFileLocator currlocator;
-		Size		nblocks = 1;
+		Size		nblocks;
 
 		cur = &wb_context->pending_writebacks[i];
 		tag = cur->tag;
+		nblocks = cur->nblocks;
 		currlocator = BufTagGetRelFileLocator(&tag);
 
 		/*
@@ -6037,6 +6160,8 @@ IssuePendingWritebacks(WritebackContext *wb_context, IOContext io_context)
 		 */
 		for (ahead = 0; i + ahead + 1 < wb_context->nr_pending; ahead++)
 		{
+			BlockNumber this_end;
+			BlockNumber next_end;
 
 			next = &wb_context->pending_writebacks[i + ahead + 1];
 
@@ -6046,15 +6171,15 @@ IssuePendingWritebacks(WritebackContext *wb_context, IOContext io_context)
 				BufTagGetForkNum(&cur->tag) != BufTagGetForkNum(&next->tag))
 				break;
 
-			/* ok, block queued twice, skip */
-			if (cur->tag.blockNum == next->tag.blockNum)
-				continue;
-
-			/* only merge consecutive writes */
-			if (cur->tag.blockNum + 1 != next->tag.blockNum)
+			/* only merge consecutive or overlapping writes */
+			if (next->tag.blockNum > tag.blockNum + nblocks)
 				break;
 
-			nblocks++;
+			/* find the nblocks value that covers the end of both */
+			this_end = tag.blockNum + nblocks;
+			next_end = next->tag.blockNum + next->nblocks;
+			nblocks = Max(this_end, next_end) - tag.blockNum;
+
 			cur = next;
 		}
 
diff --git a/src/include/storage/buf_internals.h b/src/include/storage/buf_internals.h
index 9e3a3fbd887..93822dd8407 100644
--- a/src/include/storage/buf_internals.h
+++ b/src/include/storage/buf_internals.h
@@ -301,6 +301,7 @@ typedef struct PendingWriteback
 {
 	/* could store different types of pending flushes here */
 	BufferTag	tag;
+	int			nblocks;
 } PendingWriteback;
 
 /* struct forward declared in bufmgr.h */
@@ -427,7 +428,8 @@ ResourceOwnerForgetBufferIO(ResourceOwner owner, Buffer buffer)
 extern void WritebackContextInit(WritebackContext *context, int *max_pending);
 extern void IssuePendingWritebacks(WritebackContext *wb_context, IOContext io_context);
 extern void ScheduleBufferTagForWriteback(WritebackContext *wb_context,
-										  IOContext io_context, BufferTag *tag);
+										  IOContext io_context, BufferTag *tag,
+										  int nblocks);
 
 /* freelist.c */
 extern IOContext IOContextForStrategy(BufferAccessStrategy strategy);
-- 
2.44.0

From 8a7cfbebf65f714cd2e3f24b490ed219e042e15a Mon Sep 17 00:00:00 2001
From: Thomas Munro <thomas.mu...@gmail.com>
Date: Wed, 13 Mar 2024 22:37:07 +1300
Subject: [PATCH v5 3/3] Vectored ring buffer writes.

When using eg BAS_VACUUM, we usually finish up writing recently dirtied
pages out to disk.  If the blocks are sequential, we can try to write
out multiple dirty buffers in ring order, to clean them in bulk.

XXX Should we try to detect cases when it's not working and suppress the
behaviour (ie if the dirty pages are in random order)?

XXX The reported statistics need work
---
 src/backend/storage/buffer/bufmgr.c   | 90 +++++++++++++++++++++++++--
 src/backend/storage/buffer/freelist.c | 23 +++++++
 src/include/storage/buf_internals.h   |  1 +
 3 files changed, 108 insertions(+), 6 deletions(-)

diff --git a/src/backend/storage/buffer/bufmgr.c b/src/backend/storage/buffer/bufmgr.c
index 073088ab938..3a975a05737 100644
--- a/src/backend/storage/buffer/bufmgr.c
+++ b/src/backend/storage/buffer/bufmgr.c
@@ -1976,6 +1976,11 @@ again:
 		if (strategy != NULL)
 		{
 			XLogRecPtr	lsn;
+			Buffer		buffers[MAX_IO_COMBINE_LIMIT];
+			int			nbuffers;
+			BufferDesc *hdrs[MAX_IO_COMBINE_LIMIT];
+			int			nlocked;
+			int			nflushed;
 
 			/* Read the LSN while holding buffer header lock */
 			buf_state = LockBufHdr(buf_hdr);
@@ -1989,14 +1994,87 @@ again:
 				UnpinBuffer(buf_hdr);
 				goto again;
 			}
-		}
 
-		/* OK, do the I/O */
-		FlushBuffer(buf_hdr, NULL, IOOBJECT_RELATION, io_context);
-		LWLockRelease(content_lock);
+			/*
+			 * See if any future buffers could be written out at the same
+			 * time.  They must be valid, dirty, unpinned, low usage count and
+			 * consecutive, and we must be able to acquire the content lock
+			 * without waiting to avoid deadlocks.
+			 */
+			nbuffers = StrategyPeek(strategy, buffers, io_combine_limit);
+			Assert(nbuffers >= 1);
+			Assert(buffers[0] == buf);
+			hdrs[0] = buf_hdr;
+			nlocked = 1;
+			while (nlocked < nbuffers)
+			{
+				BufferDesc *hdr;
+				uint32		state;
+
+				ReservePrivateRefCountEntry();
+				ResourceOwnerEnlarge(CurrentResourceOwner);
+				hdr = GetBufferDescriptor(buffers[nlocked] - 1);
+				state = LockBufHdr(hdr);
+				if ((state & (BM_DIRTY | BM_VALID)) == (BM_DIRTY | BM_VALID) &&
+					BUF_STATE_GET_REFCOUNT(state) == 0 &&
+					BUF_STATE_GET_USAGECOUNT(state) <= 1)
+				{
+					PinBuffer_Locked(hdr);
+					if (BufferTagsConsecutive(&hdrs[nlocked - 1]->tag,
+											  &hdr->tag) &&
+						LWLockConditionalAcquire(BufferDescriptorGetContentLock(hdr),
+												 LW_SHARED))
+					{
+						/* Yes, we can extend the range by one. */
+						hdrs[nlocked++] = hdr;
+					}
+					else
+					{
+						UnpinBuffer(hdr);
+						break;
+					}
+				}
+				else
+				{
+					UnlockBufHdr(hdr, state);
+					break;
+				}
+			}
 
-		ScheduleBufferTagForWriteback(&BackendWritebackContext, io_context,
-									  &buf_hdr->tag, 1);
+			/*
+			 * Flush as many of these as we can.  This may stop partway
+			 * through, if it can't get an I/O lock, but it always flushes at
+			 * least one.  Since we're only trying to be helpful by doing more
+			 * than one, no need to loop for the rest.
+			 */
+			nflushed = FlushBuffers(hdrs, nlocked, NULL,
+									IOOBJECT_RELATION, io_context,
+									NULL, NULL);
+			Assert(nflushed >= 1);
+
+			/* Unlock in reverse order because it goes faster. */
+			for (int i = nlocked - 1; i >= 0; --i)
+				LWLockRelease(BufferDescriptorGetContentLock(hdrs[i]));
+
+			ScheduleBufferTagForWriteback(&BackendWritebackContext, io_context,
+										  &buf_hdr->tag, nflushed);
+
+			/* Unpin all but the first. */
+			for (int i = 1; i < nlocked; ++i)
+				UnpinBuffer(hdrs[i]);
+		}
+		else
+		{
+			/*
+			 * Do single block I/O.  There is no reason to expect buffers that
+			 * don't come from a ring to be consecutive.
+			 */
+			FlushBuffer(buf_hdr, NULL, IOOBJECT_RELATION, io_context);
+			LWLockRelease(content_lock);
+
+			ScheduleBufferTagForWriteback(&BackendWritebackContext, io_context,
+										  &buf_hdr->tag, 1);
+		}
 	}
 
 
diff --git a/src/backend/storage/buffer/freelist.c b/src/backend/storage/buffer/freelist.c
index 3611357fa30..6a3e548e7ca 100644
--- a/src/backend/storage/buffer/freelist.c
+++ b/src/backend/storage/buffer/freelist.c
@@ -772,3 +772,26 @@ StrategyRejectBuffer(BufferAccessStrategy strategy, BufferDesc *buf, bool from_r
 
 	return true;
 }
+
+/*
+ * StrategyPeek -- report up to nbuffers buffers that future calls to
+ * GetBufferFromRing() will return, in ring order starting with the 'current'
+ * one (position 0); stops at an empty slot or one full lap.  Returns count.
+ */
+int
+StrategyPeek(BufferAccessStrategy strategy, Buffer *buffers, int nbuffers)
+{
+	int			cursor = strategy->current;
+	int			n = 0;
+
+	while (strategy->buffers[cursor] != InvalidBuffer && n < nbuffers)
+	{
+		buffers[n++] = strategy->buffers[cursor++];
+		if (cursor == strategy->nbuffers)
+			cursor = 0;
+		if (cursor == strategy->current)
+			break;
+	}
+
+	return n;
+}
diff --git a/src/include/storage/buf_internals.h b/src/include/storage/buf_internals.h
index 93822dd8407..588864e156c 100644
--- a/src/include/storage/buf_internals.h
+++ b/src/include/storage/buf_internals.h
@@ -438,6 +438,7 @@ extern BufferDesc *StrategyGetBuffer(BufferAccessStrategy strategy,
 extern void StrategyFreeBuffer(BufferDesc *buf);
 extern bool StrategyRejectBuffer(BufferAccessStrategy strategy,
 								 BufferDesc *buf, bool from_ring);
+extern int	StrategyPeek(BufferAccessStrategy strategy, Buffer *buffers, int nbuffers);
 
 extern int	StrategySyncStart(uint32 *complete_passes, uint32 *num_buf_alloc);
 extern void StrategyNotifyBgWriter(int bgwprocno);
-- 
2.44.0

Reply via email to