On Thu, Sep 11, 2025 at 11:33 PM Chao Li <[email protected]> wrote:
>
> I don’t understand why the two versions are different:
>
> if (XLogNeedsFlush(lsn))
> {
> /*
> * Remove the dirty buffer from the ring; necessary to prevent an
> * infinite loop if all ring members are dirty.
> */
> strategy->buffers[strategy->current] = InvalidBuffer;
> return true;
> }
>
> return false;
>
> VS
>
> if (XLogNeedsFlush(lsn))
> return false;
I think you mean

    if (!XLogNeedsFlush(lsn))
    {
        return false;
    }
    // remove buffer
    return true;

is the same as

    if (XLogNeedsFlush(lsn))
    {
        // remove dirty buffer
        return true;
    }
    return false;

which is true. I've changed it to be like that.
Attached version 7 is rebased and has some bug fixes.
I also added a bonus patch at the end (0007) that refactors
SyncOneBuffer() to use the CAS loop pattern for pinning the buffer
that Andres introduced in 5e89985928795f243. bgwriter is now the only
user of SyncOneBuffer() and it rejects writing out buffers that are
used, so it seemed like a decent use case for this.
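In case it helps review of 0007, here is a minimal standalone sketch of
that CAS-loop pin pattern, written with C11 atomics and simplified,
made-up state bits and refcount layout (so not the real bufmgr.c
definitions); it just shows the shape of the loop: reject unusable
buffers, wait out the header spinlock, otherwise CAS in the incremented
refcount:

    #include <stdatomic.h>
    #include <stdbool.h>
    #include <stdint.h>
    #include <stdio.h>

    /* Simplified, made-up state layout (not the real bufmgr.c bits). */
    #define BM_LOCKED       (1u << 31)      /* buffer header spinlock held */
    #define BM_DIRTY        (1u << 30)
    #define BM_VALID        (1u << 29)
    #define REFCOUNT_ONE    1u
    #define REFCOUNT_MASK   0x0003ffffu

    typedef struct
    {
        _Atomic uint32_t state;
    } FakeBufferDesc;

    /*
     * Pin the buffer only if it is dirty, valid, and not pinned by anyone
     * else, without taking the header spinlock. Returns true if we got
     * the pin.
     */
    static bool
    try_pin_if_idle(FakeBufferDesc *buf)
    {
        uint32_t old_state = atomic_load(&buf->state);

        for (;;)
        {
            uint32_t new_state = old_state;

            /* Not a write-out candidate at all: bail out. */
            if (!(new_state & BM_DIRTY) || !(new_state & BM_VALID))
                return false;

            /* Someone else holds a pin: don't write out a buffer in use. */
            if ((new_state & REFCOUNT_MASK) > 0)
                return false;

            /*
             * Header spinlock is held; re-read and retry. The real code
             * calls WaitBufHdrUnlocked() here instead of spinning naively.
             */
            if (new_state & BM_LOCKED)
            {
                old_state = atomic_load(&buf->state);
                continue;
            }

            new_state += REFCOUNT_ONE;  /* take the pin */

            /* On failure old_state is refreshed and we retry. */
            if (atomic_compare_exchange_weak(&buf->state, &old_state, new_state))
                return true;
        }
    }

    int
    main(void)
    {
        FakeBufferDesc buf;

        atomic_init(&buf.state, BM_DIRTY | BM_VALID);
        printf("pinned: %d\n", try_pin_if_idle(&buf));        /* 1 */
        printf("pinned again: %d\n", try_pin_if_idle(&buf));  /* 0, refcount > 0 */
        return 0;
    }

The real code additionally checks the usage count and goes through
TrackNewBufferPin() after a successful CAS.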
- Melanie
From ade120635e4d20b36829200f9e6806063ff4eb7a Mon Sep 17 00:00:00 2001
From: Melanie Plageman <[email protected]>
Date: Wed, 15 Oct 2025 10:53:48 -0400
Subject: [PATCH v7 1/7] Refactor goto into for loop in GetVictimBuffer()
GetVictimBuffer() used a goto to implement the loop that optimistically
locks a clean victim buffer. Future commits will add batch flushing
functionality to GetVictimBuffer(), and the new logic works better with
standard for loop flow control.
This commit is only a refactor and does not introduce any new
functionality.
Author: Melanie Plageman <[email protected]>
Reviewed-by: Nazir Bilal Yavuz <[email protected]>
Reviewed-by: Chao Li <[email protected]>
Discussion: https://postgr.es/m/2FA0BAC7-5413-4ABD-94CA-4398FE77750D%40gmail.com
Discussion: https://postgr.es/m/flat/CAAKRu_Yjn4mvN9NBxtmsCQSGwup45CoA4e05nhR7ADP-v0WCig%40mail.gmail.com
---
src/backend/storage/buffer/bufmgr.c | 189 ++++++++++++--------------
src/backend/storage/buffer/freelist.c | 20 ++-
src/include/storage/buf_internals.h | 6 +
3 files changed, 112 insertions(+), 103 deletions(-)
diff --git a/src/backend/storage/buffer/bufmgr.c b/src/backend/storage/buffer/bufmgr.c
index edf17ce3ea1..453fa16de84 100644
--- a/src/backend/storage/buffer/bufmgr.c
+++ b/src/backend/storage/buffer/bufmgr.c
@@ -68,10 +68,6 @@
#include "utils/timestamp.h"
-/* Note: these two macros only work on shared buffers, not local ones! */
-#define BufHdrGetBlock(bufHdr) ((Block) (BufferBlocks + ((Size) (bufHdr)->buf_id) * BLCKSZ))
-#define BufferGetLSN(bufHdr) (PageGetLSN(BufHdrGetBlock(bufHdr)))
-
/* Note: this macro only works on local buffers, not shared ones! */
#define LocalBufHdrGetBlock(bufHdr) \
LocalBufferBlockPointers[-((bufHdr)->buf_id + 2)]
@@ -2331,125 +2327,116 @@ GetVictimBuffer(BufferAccessStrategy strategy, IOContext io_context)
ReservePrivateRefCountEntry();
ResourceOwnerEnlarge(CurrentResourceOwner);
- /* we return here if a prospective victim buffer gets used concurrently */
-again:
-
- /*
- * Select a victim buffer. The buffer is returned pinned and owned by
- * this backend.
- */
- buf_hdr = StrategyGetBuffer(strategy, &buf_state, &from_ring);
- buf = BufferDescriptorGetBuffer(buf_hdr);
-
- /*
- * We shouldn't have any other pins for this buffer.
- */
- CheckBufferIsPinnedOnce(buf);
-
- /*
- * If the buffer was dirty, try to write it out. There is a race
- * condition here, in that someone might dirty it after we released the
- * buffer header lock above, or even while we are writing it out (since
- * our share-lock won't prevent hint-bit updates). We will recheck the
- * dirty bit after re-locking the buffer header.
- */
- if (buf_state & BM_DIRTY)
+ /* Select a victim buffer using an optimistic locking scheme. */
+ for (;;)
{
- LWLock *content_lock;
- Assert(buf_state & BM_TAG_VALID);
- Assert(buf_state & BM_VALID);
+ /* Attempt to claim a victim buffer. Buffer is returned pinned. */
+ buf_hdr = StrategyGetBuffer(strategy, &buf_state, &from_ring);
+ buf = BufferDescriptorGetBuffer(buf_hdr);
/*
- * We need a share-lock on the buffer contents to write it out (else
- * we might write invalid data, eg because someone else is compacting
- * the page contents while we write). We must use a conditional lock
- * acquisition here to avoid deadlock. Even though the buffer was not
- * pinned (and therefore surely not locked) when StrategyGetBuffer
- * returned it, someone else could have pinned and exclusive-locked it
- * by the time we get here. If we try to get the lock unconditionally,
- * we'd block waiting for them; if they later block waiting for us,
- * deadlock ensues. (This has been observed to happen when two
- * backends are both trying to split btree index pages, and the second
- * one just happens to be trying to split the page the first one got
- * from StrategyGetBuffer.)
+ * We shouldn't have any other pins for this buffer.
*/
- content_lock = BufferDescriptorGetContentLock(buf_hdr);
- if (!LWLockConditionalAcquire(content_lock, LW_SHARED))
- {
- /*
- * Someone else has locked the buffer, so give it up and loop back
- * to get another one.
- */
- UnpinBuffer(buf_hdr);
- goto again;
- }
+ CheckBufferIsPinnedOnce(buf);
/*
- * If using a nondefault strategy, and writing the buffer would
- * require a WAL flush, let the strategy decide whether to go ahead
- * and write/reuse the buffer or to choose another victim. We need a
- * lock to inspect the page LSN, so this can't be done inside
- * StrategyGetBuffer.
+ * If the buffer was dirty, try to write it out. There is a race
+ * condition here, in that someone might dirty it after we released
+ * the buffer header lock above, or even while we are writing it out
+ * (since our share-lock won't prevent hint-bit updates). We will
+ * recheck the dirty bit after re-locking the buffer header.
*/
- if (strategy != NULL)
+ if (buf_state & BM_DIRTY)
{
- XLogRecPtr lsn;
+ LWLock *content_lock;
- /* Read the LSN while holding buffer header lock */
- buf_state = LockBufHdr(buf_hdr);
- lsn = BufferGetLSN(buf_hdr);
- UnlockBufHdr(buf_hdr, buf_state);
+ Assert(buf_state & BM_TAG_VALID);
+ Assert(buf_state & BM_VALID);
- if (XLogNeedsFlush(lsn)
- && StrategyRejectBuffer(strategy, buf_hdr, from_ring))
+ /*
+ * We need a share-lock on the buffer contents to write it out
+ * (else we might write invalid data, eg because someone else is
+ * compacting the page contents while we write). We must use a
+ * conditional lock acquisition here to avoid deadlock. Even
+ * though the buffer was not pinned (and therefore surely not
+ * locked) when StrategyGetBuffer returned it, someone else could
+ * have pinned and exclusive-locked it by the time we get here. If
+ * we try to get the lock unconditionally, we'd block waiting for
+ * them; if they later block waiting for us, deadlock ensues.
+ * (This has been observed to happen when two backends are both
+ * trying to split btree index pages, and the second one just
+ * happens to be trying to split the page the first one got from
+ * StrategyGetBuffer.)
+ */
+ content_lock = BufferDescriptorGetContentLock(buf_hdr);
+ if (!LWLockConditionalAcquire(content_lock, LW_SHARED))
+ {
+ /*
+ * Someone else has locked the buffer, so give it up and loop
+ * back to get another one.
+ */
+ UnpinBuffer(buf_hdr);
+ continue;
+ }
+
+ /*
+ * If using a nondefault strategy, and writing the buffer would
+ * require a WAL flush, let the strategy decide whether to go
+ * ahead and write/reuse the buffer or to choose another victim.
+ * We need the content lock to inspect the page LSN, so this can't
+ * be done inside StrategyGetBuffer.
+ */
+ if (StrategyRejectBuffer(strategy, buf_hdr, from_ring))
{
LWLockRelease(content_lock);
UnpinBuffer(buf_hdr);
- goto again;
+ continue;
}
- }
- /* OK, do the I/O */
- FlushBuffer(buf_hdr, NULL, IOOBJECT_RELATION, io_context);
- LWLockRelease(content_lock);
+ /* OK, do the I/O */
+ FlushBuffer(buf_hdr, NULL, IOOBJECT_RELATION, io_context);
+ LWLockRelease(content_lock);
- ScheduleBufferTagForWriteback(&BackendWritebackContext, io_context,
- &buf_hdr->tag);
- }
+ ScheduleBufferTagForWriteback(&BackendWritebackContext, io_context,
+ &buf_hdr->tag);
+ }
- if (buf_state & BM_VALID)
- {
+ if (buf_state & BM_VALID)
+ {
+ /*
+ * When a BufferAccessStrategy is in use, blocks evicted from
+ * shared buffers are counted as IOOP_EVICT in the corresponding
+ * context (e.g. IOCONTEXT_BULKWRITE). Shared buffers are evicted
+ * by a strategy in two cases: 1) while initially claiming buffers
+ * for the strategy ring 2) to replace an existing strategy ring
+ * buffer because it is pinned or in use and cannot be reused.
+ *
+ * Blocks evicted from buffers already in the strategy ring are
+ * counted as IOOP_REUSE in the corresponding strategy context.
+ *
+ * At this point, we can accurately count evictions and reuses,
+ * because we have successfully claimed the valid buffer.
+ * Previously, we may have been forced to release the buffer due
+ * to concurrent pinners or erroring out.
+ */
+ pgstat_count_io_op(IOOBJECT_RELATION, io_context,
+ from_ring ? IOOP_REUSE : IOOP_EVICT, 1, 0);
+ }
+
/*
- * When a BufferAccessStrategy is in use, blocks evicted from shared
- * buffers are counted as IOOP_EVICT in the corresponding context
- * (e.g. IOCONTEXT_BULKWRITE). Shared buffers are evicted by a
- * strategy in two cases: 1) while initially claiming buffers for the
- * strategy ring 2) to replace an existing strategy ring buffer
- * because it is pinned or in use and cannot be reused.
- *
- * Blocks evicted from buffers already in the strategy ring are
- * counted as IOOP_REUSE in the corresponding strategy context.
- *
- * At this point, we can accurately count evictions and reuses,
- * because we have successfully claimed the valid buffer. Previously,
- * we may have been forced to release the buffer due to concurrent
- * pinners or erroring out.
+ * If the buffer has an entry in the buffer mapping table, delete it.
+ * This can fail because another backend could have pinned or dirtied
+ * the buffer.
*/
- pgstat_count_io_op(IOOBJECT_RELATION, io_context,
- from_ring ? IOOP_REUSE : IOOP_EVICT, 1, 0);
- }
+ if ((buf_state & BM_TAG_VALID) && !InvalidateVictimBuffer(buf_hdr))
+ {
+ UnpinBuffer(buf_hdr);
+ continue;
+ }
- /*
- * If the buffer has an entry in the buffer mapping table, delete it. This
- * can fail because another backend could have pinned or dirtied the
- * buffer.
- */
- if ((buf_state & BM_TAG_VALID) && !InvalidateVictimBuffer(buf_hdr))
- {
- UnpinBuffer(buf_hdr);
- goto again;
+ break;
}
/* a final set of sanity checks */
diff --git a/src/backend/storage/buffer/freelist.c b/src/backend/storage/buffer/freelist.c
index 7fe34d3ef4c..b76be264eb5 100644
--- a/src/backend/storage/buffer/freelist.c
+++ b/src/backend/storage/buffer/freelist.c
@@ -15,6 +15,7 @@
*/
#include "postgres.h"
+#include "access/xlog.h"
#include "pgstat.h"
#include "port/atomics.h"
#include "storage/buf_internals.h"
@@ -779,12 +780,21 @@ IOContextForStrategy(BufferAccessStrategy strategy)
* be written out and doing so would require flushing WAL too. This gives us
* a chance to choose a different victim.
*
+ * The buffer must be pinned and content locked and the buffer header spinlock
+ * must not be held. We must hold the content lock to examine the LSN.
+ *
* Returns true if buffer manager should ask for a new victim, and false
* if this buffer should be written and re-used.
*/
bool
StrategyRejectBuffer(BufferAccessStrategy strategy, BufferDesc *buf, bool from_ring)
{
+ uint32 buf_state;
+ XLogRecPtr lsn;
+
+ if (!strategy)
+ return false;
+
/* We only do this in bulkread mode */
if (strategy->btype != BAS_BULKREAD)
return false;
@@ -794,11 +804,17 @@ StrategyRejectBuffer(BufferAccessStrategy strategy, BufferDesc *buf, bool from_r
strategy->buffers[strategy->current] != BufferDescriptorGetBuffer(buf))
return false;
+ buf_state = LockBufHdr(buf);
+ lsn = BufferGetLSN(buf);
+ UnlockBufHdr(buf, buf_state);
+
+ if (!XLogNeedsFlush(lsn))
+ return false;
+
/*
- * Remove the dirty buffer from the ring; necessary to prevent infinite
+ * Remove the dirty buffer from the ring; necessary to prevent an infinite
* loop if all ring members are dirty.
*/
strategy->buffers[strategy->current] = InvalidBuffer;
-
return true;
}
diff --git a/src/include/storage/buf_internals.h b/src/include/storage/buf_internals.h
index c1206a46aba..7e258383048 100644
--- a/src/include/storage/buf_internals.h
+++ b/src/include/storage/buf_internals.h
@@ -421,6 +421,12 @@ ResourceOwnerForgetBufferIO(ResourceOwner owner, Buffer buffer)
/*
* Internal buffer management routines
*/
+
+
+/* Note: these two macros only work on shared buffers, not local ones! */
+#define BufHdrGetBlock(bufHdr) ((Block) (BufferBlocks + ((Size) (bufHdr)->buf_id) * BLCKSZ))
+#define BufferGetLSN(bufHdr) (PageGetLSN(BufHdrGetBlock(bufHdr)))
+
/* bufmgr.c */
extern void WritebackContextInit(WritebackContext *context, int *max_pending);
extern void IssuePendingWritebacks(WritebackContext *wb_context, IOContext io_context);
--
2.43.0
From 6dfe3036229f25b9708109c460ce9c1425111650 Mon Sep 17 00:00:00 2001
From: Melanie Plageman <[email protected]>
Date: Wed, 15 Oct 2025 10:54:19 -0400
Subject: [PATCH v7 2/7] Split FlushBuffer() into two parts
Before adding write combining to write a batch of blocks when flushing
dirty buffers, refactor FlushBuffer() into a preparatory step and the
actual buffer flushing step. This separation provides symmetry with
future code for batch flushing, which necessarily separates these steps,
as it must prepare multiple buffers before flushing them together.
In GetVictimBuffer(), these steps are invoked through a new helper
function, CleanVictimBuffer(), which will contain both the batch
flushing and single flush code in future commits.
Author: Melanie Plageman <[email protected]>
Reviewed-by: Chao Li <[email protected]>
Reviewed-by: Nazir Bilal Yavuz <[email protected]>
Discussion: https://postgr.es/m/2FA0BAC7-5413-4ABD-94CA-4398FE77750D%40gmail.com
Discussion: https://postgr.es/m/flat/CAAKRu_Yjn4mvN9NBxtmsCQSGwup45CoA4e05nhR7ADP-v0WCig%40mail.gmail.com
---
src/backend/storage/buffer/bufmgr.c | 143 +++++++++++++++++++---------
1 file changed, 100 insertions(+), 43 deletions(-)
diff --git a/src/backend/storage/buffer/bufmgr.c b/src/backend/storage/buffer/bufmgr.c
index 453fa16de84..769138a5373 100644
--- a/src/backend/storage/buffer/bufmgr.c
+++ b/src/backend/storage/buffer/bufmgr.c
@@ -533,6 +533,12 @@ static void FlushUnlockedBuffer(BufferDesc *buf, SMgrRelation reln,
IOObject io_object, IOContext io_context);
static void FlushBuffer(BufferDesc *buf, SMgrRelation reln,
IOObject io_object, IOContext io_context);
+static bool PrepareFlushBuffer(BufferDesc *bufdesc, XLogRecPtr *lsn);
+static void DoFlushBuffer(BufferDesc *buf, SMgrRelation reln,
+ IOObject io_object, IOContext io_context,
+ XLogRecPtr buffer_lsn);
+static void CleanVictimBuffer(BufferDesc *bufdesc, bool from_ring,
+ IOContext io_context);
static void FindAndDropRelationBuffers(RelFileLocator rlocator,
ForkNumber forkNum,
BlockNumber nForkBlock,
@@ -2394,12 +2400,8 @@ GetVictimBuffer(BufferAccessStrategy strategy, IOContext io_context)
continue;
}
- /* OK, do the I/O */
- FlushBuffer(buf_hdr, NULL, IOOBJECT_RELATION, io_context);
- LWLockRelease(content_lock);
-
- ScheduleBufferTagForWriteback(&BackendWritebackContext, io_context,
- &buf_hdr->tag);
+ /* Content lock is released inside CleanVictimBuffer */
+ CleanVictimBuffer(buf_hdr, from_ring, io_context);
}
@@ -4276,53 +4278,67 @@ static void
FlushBuffer(BufferDesc *buf, SMgrRelation reln, IOObject io_object,
IOContext io_context)
{
- XLogRecPtr recptr;
- ErrorContextCallback errcallback;
- instr_time io_start;
- Block bufBlock;
- char *bufToWrite;
- uint32 buf_state;
+ XLogRecPtr lsn;
- /*
- * Try to start an I/O operation. If StartBufferIO returns false, then
- * someone else flushed the buffer before we could, so we need not do
- * anything.
- */
- if (!StartBufferIO(buf, false, false))
- return;
+ if (PrepareFlushBuffer(buf, &lsn))
+ DoFlushBuffer(buf, reln, io_object, io_context, lsn);
+}
- /* Setup error traceback support for ereport() */
- errcallback.callback = shared_buffer_write_error_callback;
- errcallback.arg = buf;
- errcallback.previous = error_context_stack;
- error_context_stack = &errcallback;
+/*
+ * Prepare and write out a dirty victim buffer.
+ *
+ * The buffer must be pinned, its content lock must be held (a share lock
+ * suffices), and the buffer header spinlock must not be held. The content lock
+ * is released and the buffer is returned pinned but not locked.
+ *
+ * bufdesc may be modified.
+ */
+static void
+CleanVictimBuffer(BufferDesc *bufdesc,
+ bool from_ring, IOContext io_context)
+{
- /* Find smgr relation for buffer */
- if (reln == NULL)
- reln = smgropen(BufTagGetRelFileLocator(&buf->tag), INVALID_PROC_NUMBER);
+ XLogRecPtr max_lsn = InvalidXLogRecPtr;
+ LWLock *content_lock;
- TRACE_POSTGRESQL_BUFFER_FLUSH_START(BufTagGetForkNum(&buf->tag),
- buf->tag.blockNum,
- reln->smgr_rlocator.locator.spcOid,
- reln->smgr_rlocator.locator.dbOid,
- reln->smgr_rlocator.locator.relNumber);
+ Assert(pg_atomic_read_u32(&bufdesc->state) & BM_DIRTY);
- buf_state = LockBufHdr(buf);
+ /* Set up this victim buffer to be flushed */
+ if (!PrepareFlushBuffer(bufdesc, &max_lsn))
+ return;
+
+ DoFlushBuffer(bufdesc, NULL, IOOBJECT_RELATION, io_context, max_lsn);
+ content_lock = BufferDescriptorGetContentLock(bufdesc);
+ LWLockRelease(content_lock);
+ ScheduleBufferTagForWriteback(&BackendWritebackContext, io_context,
+ &bufdesc->tag);
+}
+
+/*
+ * Prepare the buffer with bufdesc for writing. Returns true if the buffer
+ * actually needs writing and false otherwise. lsn returns the buffer's LSN if
+ * the table is logged.
+ */
+static bool
+PrepareFlushBuffer(BufferDesc *bufdesc, XLogRecPtr *lsn)
+{
+ uint32 buf_state;
/*
- * Run PageGetLSN while holding header lock, since we don't have the
- * buffer locked exclusively in all cases.
+ * Try to start an I/O operation. If StartBufferIO returns false, then
+ * someone else flushed the buffer before we could, so we need not do
+ * anything.
*/
- recptr = BufferGetLSN(buf);
+ if (!StartBufferIO(bufdesc, false, false))
+ return false;
- /* To check if block content changes while flushing. - vadim 01/17/97 */
- buf_state &= ~BM_JUST_DIRTIED;
- UnlockBufHdr(buf, buf_state);
+ *lsn = InvalidXLogRecPtr;
+ buf_state = LockBufHdr(bufdesc);
/*
- * Force XLOG flush up to buffer's LSN. This implements the basic WAL
- * rule that log updates must hit disk before any of the data-file changes
- * they describe do.
+ * Record the buffer's LSN. We will force XLOG flush up to buffer's LSN.
+ * This implements the basic WAL rule that log updates must hit disk
+ * before any of the data-file changes they describe do.
*
* However, this rule does not apply to unlogged relations, which will be
* lost after a crash anyway. Most unlogged relation pages do not bear
@@ -4335,9 +4351,50 @@ FlushBuffer(BufferDesc *buf, SMgrRelation reln, IOObject io_object,
* happen, attempting to flush WAL through that location would fail, with
* disastrous system-wide consequences. To make sure that can't happen,
* skip the flush if the buffer isn't permanent.
+ *
+ * We must hold the buffer header lock when examining the page LSN since
+ * we don't have the buffer exclusively locked in all cases.
*/
if (buf_state & BM_PERMANENT)
- XLogFlush(recptr);
+ *lsn = BufferGetLSN(bufdesc);
+
+ /* To check if block content changes while flushing. - vadim 01/17/97 */
+ buf_state &= ~BM_JUST_DIRTIED;
+ UnlockBufHdr(bufdesc, buf_state);
+ return true;
+}
+
+/*
+ * Actually do the write I/O to clean a buffer. buf and reln may be modified.
+ */
+static void
+DoFlushBuffer(BufferDesc *buf, SMgrRelation reln, IOObject io_object,
+ IOContext io_context, XLogRecPtr buffer_lsn)
+{
+ ErrorContextCallback errcallback;
+ instr_time io_start;
+ Block bufBlock;
+ char *bufToWrite;
+
+ /* Setup error traceback support for ereport() */
+ errcallback.callback = shared_buffer_write_error_callback;
+ errcallback.arg = buf;
+ errcallback.previous = error_context_stack;
+ error_context_stack = &errcallback;
+
+ /* Find smgr relation for buffer */
+ if (reln == NULL)
+ reln = smgropen(BufTagGetRelFileLocator(&buf->tag), INVALID_PROC_NUMBER);
+
+ TRACE_POSTGRESQL_BUFFER_FLUSH_START(BufTagGetForkNum(&buf->tag),
+ buf->tag.blockNum,
+ reln->smgr_rlocator.locator.spcOid,
+ reln->smgr_rlocator.locator.dbOid,
+ reln->smgr_rlocator.locator.relNumber);
+
+ /* Force XLOG flush up to buffer's LSN */
+ if (!XLogRecPtrIsInvalid(buffer_lsn))
+ XLogFlush(buffer_lsn);
/*
* Now it's safe to write the buffer to disk. Note that no one else should
--
2.43.0
From 75209f0288a5d539168aad7b177e6629f4790569 Mon Sep 17 00:00:00 2001
From: Melanie Plageman <[email protected]>
Date: Wed, 15 Oct 2025 13:15:43 -0400
Subject: [PATCH v7 3/7] Eagerly flush bulkwrite strategy ring
Operations using BAS_BULKWRITE (COPY FROM and createdb) will inevitably
need to flush buffers in the strategy ring in order to reuse them. By
eagerly flushing the buffers in a larger run, we encourage larger writes
at the kernel level and less interleaving of WAL flushes and data file
writes. The effect is mainly noticeable with multiple parallel COPY
FROMs. In this case, client backends achieve higher write throughput and
end up spending less time waiting on acquiring the lock to flush WAL.
Larger flush operations also mean less time waiting for flush operations
at the kernel level.
The heuristic for eager eviction is to only flush buffers in the
strategy ring which do not require a WAL flush.
This patch is also a step toward AIO writes.
Reviewed-by: Chao Li <[email protected]>
Reviewed-by: Nazir Bilal Yavuz <[email protected]>
Earlier version Reviewed-by: Kirill Reshke <[email protected]>
Discussion: https://postgr.es/m/2FA0BAC7-5413-4ABD-94CA-4398FE77750D%40gmail.com
Discussion: https://postgr.es/m/flat/CAAKRu_Yjn4mvN9NBxtmsCQSGwup45CoA4e05nhR7ADP-v0WCig%40mail.gmail.com
---
src/backend/storage/buffer/bufmgr.c | 238 +++++++++++++++++++++++++-
src/backend/storage/buffer/freelist.c | 48 ++++++
src/include/storage/buf_internals.h | 4 +
3 files changed, 282 insertions(+), 8 deletions(-)
diff --git a/src/backend/storage/buffer/bufmgr.c b/src/backend/storage/buffer/bufmgr.c
index 769138a5373..7a553b8cdd2 100644
--- a/src/backend/storage/buffer/bufmgr.c
+++ b/src/backend/storage/buffer/bufmgr.c
@@ -531,14 +531,25 @@ static void CheckReadBuffersOperation(ReadBuffersOperation *operation, bool is_c
static Buffer GetVictimBuffer(BufferAccessStrategy strategy, IOContext io_context);
static void FlushUnlockedBuffer(BufferDesc *buf, SMgrRelation reln,
IOObject io_object, IOContext io_context);
+
static void FlushBuffer(BufferDesc *buf, SMgrRelation reln,
IOObject io_object, IOContext io_context);
-static bool PrepareFlushBuffer(BufferDesc *bufdesc, XLogRecPtr *lsn);
+static BufferDesc *NextStratBufToFlush(BufferAccessStrategy strategy,
+ Buffer sweep_end,
+ XLogRecPtr *lsn, int *sweep_cursor);
+
+static bool BufferNeedsWALFlush(BufferDesc *bufdesc, XLogRecPtr *lsn);
+static BufferDesc *PrepareOrRejectEagerFlushBuffer(Buffer bufnum, BlockNumber require,
+ RelFileLocator *rlocator, bool skip_pinned,
+ XLogRecPtr *max_lsn);
+static bool PrepareFlushBuffer(BufferDesc *bufdesc,
+ XLogRecPtr *lsn);
static void DoFlushBuffer(BufferDesc *buf, SMgrRelation reln,
IOObject io_object, IOContext io_context,
XLogRecPtr buffer_lsn);
-static void CleanVictimBuffer(BufferDesc *bufdesc, bool from_ring,
- IOContext io_context);
+static void CleanVictimBuffer(BufferAccessStrategy strategy,
+ BufferDesc *bufdesc,
+ bool from_ring, IOContext io_context);
static void FindAndDropRelationBuffers(RelFileLocator rlocator,
ForkNumber forkNum,
BlockNumber nForkBlock,
@@ -2401,7 +2412,7 @@ GetVictimBuffer(BufferAccessStrategy strategy, IOContext io_context)
}
/* Content lock is released inside CleanVictimBuffer */
- CleanVictimBuffer(buf_hdr, from_ring, io_context);
+ CleanVictimBuffer(strategy, buf_hdr, from_ring, io_context);
}
@@ -4284,6 +4295,61 @@ FlushBuffer(BufferDesc *buf, SMgrRelation reln, IOObject io_object,
DoFlushBuffer(buf, reln, io_object, io_context, lsn);
}
+/*
+ * Returns true if the buffer needs WAL flushed before it can be written out.
+ * Caller must not already hold the buffer header spinlock.
+ */
+static bool
+BufferNeedsWALFlush(BufferDesc *bufdesc, XLogRecPtr *lsn)
+{
+ uint32 buf_state = LockBufHdr(bufdesc);
+
+ *lsn = BufferGetLSN(bufdesc);
+
+ UnlockBufHdr(bufdesc, buf_state);
+
+ /*
+ * See buffer flushing code for more details on why we condition this on
+ * the relation being logged.
+ */
+ return buf_state & BM_PERMANENT && XLogNeedsFlush(*lsn);
+}
+
+
+/*
+ * Returns the buffer descriptor of the buffer containing the next block we
+ * should eagerly flush or NULL when there are no further buffers to consider
+ * writing out.
+ */
+static BufferDesc *
+NextStratBufToFlush(BufferAccessStrategy strategy,
+ Buffer sweep_end,
+ XLogRecPtr *lsn, int *sweep_cursor)
+{
+ Buffer bufnum;
+ BufferDesc *bufdesc;
+
+ while ((bufnum =
+ StrategySweepNextBuffer(strategy, sweep_cursor)) != sweep_end)
+ {
+ /*
+ * For BAS_BULKWRITE, once you hit an InvalidBuffer, the remaining
+ * buffers in the ring will be invalid.
+ */
+ if (!BufferIsValid(bufnum))
+ break;
+
+ if ((bufdesc = PrepareOrRejectEagerFlushBuffer(bufnum,
+ InvalidBlockNumber,
+ NULL,
+ true,
+ lsn)) != NULL)
+ return bufdesc;
+ }
+
+ return NULL;
+}
+
/*
* Prepare and write out a dirty victim buffer.
*
@@ -4294,12 +4360,14 @@ FlushBuffer(BufferDesc *buf, SMgrRelation reln, IOObject io_object,
* bufdesc may be modified.
*/
static void
-CleanVictimBuffer(BufferDesc *bufdesc,
+CleanVictimBuffer(BufferAccessStrategy strategy,
+ BufferDesc *bufdesc,
bool from_ring, IOContext io_context)
{
XLogRecPtr max_lsn = InvalidXLogRecPtr;
LWLock *content_lock;
+ bool first_buffer = true;
Assert(pg_atomic_read_u32(&bufdesc->state) & BM_DIRTY);
@@ -4307,11 +4375,165 @@ CleanVictimBuffer(BufferDesc *bufdesc,
if (!PrepareFlushBuffer(bufdesc, &max_lsn))
return;
- DoFlushBuffer(bufdesc, NULL, IOOBJECT_RELATION, io_context, max_lsn);
+ if (from_ring && StrategySupportsEagerFlush(strategy))
+ {
+ Buffer sweep_end = BufferDescriptorGetBuffer(bufdesc);
+ int cursor = StrategySweepStart(strategy);
+
+ /* Clean victim buffer and find more to flush opportunistically */
+ do
+ {
+ DoFlushBuffer(bufdesc, NULL, IOOBJECT_RELATION, io_context, max_lsn);
+ content_lock = BufferDescriptorGetContentLock(bufdesc);
+ LWLockRelease(content_lock);
+ ScheduleBufferTagForWriteback(&BackendWritebackContext, io_context,
+ &bufdesc->tag);
+ /* We leave the first buffer pinned for the caller */
+ if (!first_buffer)
+ UnpinBuffer(bufdesc);
+ first_buffer = false;
+ } while ((bufdesc = NextStratBufToFlush(strategy, sweep_end,
+ &max_lsn, &cursor)) != NULL);
+ }
+ else
+ {
+ DoFlushBuffer(bufdesc, NULL, IOOBJECT_RELATION, io_context, max_lsn);
+ content_lock = BufferDescriptorGetContentLock(bufdesc);
+ LWLockRelease(content_lock);
+ ScheduleBufferTagForWriteback(&BackendWritebackContext, io_context,
+ &bufdesc->tag);
+ }
+}
+
+/*
+ * Prepare bufdesc for eager flushing.
+ *
+ * Given bufnum, return the buffer descriptor of the buffer to eagerly flush,
+ * pinned and locked, or NULL if this buffer does not contain a block that
+ * should be flushed.
+ *
+ * require is the BlockNumber required by the caller. Some callers may require
+ * a specific BlockNumber to be in bufnum because they are assembling a
+ * contiguous run of blocks.
+ *
+ * If the caller needs the block to be from a specific relation, rlocator will
+ * be provided.
+ */
+static BufferDesc *
+PrepareOrRejectEagerFlushBuffer(Buffer bufnum, BlockNumber require,
+ RelFileLocator *rlocator, bool skip_pinned,
+ XLogRecPtr *max_lsn)
+{
+ BufferDesc *bufdesc;
+ uint32 old_buf_state;
+ uint32 buf_state;
+ XLogRecPtr lsn;
+ BlockNumber blknum;
+ LWLock *content_lock;
+
+ if (!BufferIsValid(bufnum))
+ return NULL;
+
+ Assert(!BufferIsLocal(bufnum));
+
+ bufdesc = GetBufferDescriptor(bufnum - 1);
+
+ /* Block may need to be in a specific relation */
+ if (rlocator &&
+ !RelFileLocatorEquals(BufTagGetRelFileLocator(&bufdesc->tag),
+ *rlocator))
+ return NULL;
+
+ /*
+ * Ensure that there's a free refcount entry and resource owner slot for
+ * the pin before pinning the buffer. While this may leak a refcount entry
+ * and slot if we return without a buffer, we should use that slot the next
+ * time we try to reserve a spot.
+ */
+ ResourceOwnerEnlarge(CurrentResourceOwner);
+ ReservePrivateRefCountEntry();
+
+ /*
+ * Check whether the buffer can be used and pin it if so. Do this using a
+ * CAS loop, to avoid having to lock the buffer header. We have to lock
+ * the buffer header later if we succeed in pinning the buffer here, but
+ * avoiding locking the buffer header if the buffer is in use is worth it.
+ */
+ old_buf_state = pg_atomic_read_u32(&bufdesc->state);
+
+ for (;;)
+ {
+ buf_state = old_buf_state;
+
+ if (!(buf_state & BM_DIRTY) || !(buf_state & BM_VALID))
+ return NULL;
+
+ /* We don't eagerly flush buffers used by others */
+ if (skip_pinned &&
+ (BUF_STATE_GET_REFCOUNT(buf_state) > 0 ||
+ BUF_STATE_GET_USAGECOUNT(buf_state) > 1))
+ return NULL;
+
+ if (unlikely(buf_state & BM_LOCKED))
+ {
+ old_buf_state = WaitBufHdrUnlocked(bufdesc);
+ continue;
+ }
+
+ /* pin the buffer if the CAS succeeds */
+ buf_state += BUF_REFCOUNT_ONE;
+
+ if (pg_atomic_compare_exchange_u32(&bufdesc->state, &old_buf_state,
+ buf_state))
+ {
+ TrackNewBufferPin(BufferDescriptorGetBuffer(bufdesc));
+ break;
+ }
+ }
+
+ CheckBufferIsPinnedOnce(bufnum);
+
+ blknum = BufferGetBlockNumber(bufnum);
+ Assert(BlockNumberIsValid(blknum));
+
+ /* We only include contiguous blocks in the run */
+ if (BlockNumberIsValid(require) && blknum != require)
+ goto except_unpin_buffer;
+
+ /* Don't eagerly flush buffers requiring WAL flush */
+ if (BufferNeedsWALFlush(bufdesc, &lsn))
+ goto except_unpin_buffer;
+
content_lock = BufferDescriptorGetContentLock(bufdesc);
+ if (!LWLockConditionalAcquire(content_lock, LW_SHARED))
+ goto except_unpin_buffer;
+
+ /*
+ * Now that we have the content lock, we need to recheck if we need to
+ * flush WAL.
+ */
+ if (BufferNeedsWALFlush(bufdesc, &lsn))
+ goto except_unpin_buffer;
+
+ /* Try to start an I/O operation */
+ if (!StartBufferIO(bufdesc, false, true))
+ goto except_unlock_content;
+
+ if (lsn > *max_lsn)
+ *max_lsn = lsn;
+
+ buf_state = LockBufHdr(bufdesc);
+ buf_state &= ~BM_JUST_DIRTIED;
+ UnlockBufHdr(bufdesc, buf_state);
+
+ return bufdesc;
+
+except_unlock_content:
LWLockRelease(content_lock);
- ScheduleBufferTagForWriteback(&BackendWritebackContext, io_context,
- &bufdesc->tag);
+
+except_unpin_buffer:
+ UnpinBuffer(bufdesc);
+ return NULL;
}
/*
diff --git a/src/backend/storage/buffer/freelist.c b/src/backend/storage/buffer/freelist.c
index b76be264eb5..4baa0550bb1 100644
--- a/src/backend/storage/buffer/freelist.c
+++ b/src/backend/storage/buffer/freelist.c
@@ -156,6 +156,31 @@ ClockSweepTick(void)
return victim;
}
+/*
+ * Some BufferAccessStrategies support eager flushing -- which is flushing
+ * buffers in the ring before they are needed. This can lead to better I/O
+ * patterns than lazily flushing buffers immediately before reusing them.
+ */
+bool
+StrategySupportsEagerFlush(BufferAccessStrategy strategy)
+{
+ Assert(strategy);
+
+ switch (strategy->btype)
+ {
+ case BAS_BULKWRITE:
+ return true;
+ case BAS_VACUUM:
+ case BAS_NORMAL:
+ case BAS_BULKREAD:
+ return false;
+ default:
+ elog(ERROR, "unrecognized buffer access strategy: %d",
+ (int) strategy->btype);
+ return false;
+ }
+}
+
/*
* StrategyGetBuffer
*
@@ -307,6 +332,29 @@ StrategyGetBuffer(BufferAccessStrategy strategy, uint32 *buf_state, bool *from_r
}
}
+/*
+ * Return the next buffer in the ring or InvalidBuffer if the current sweep is
+ * over.
+ */
+Buffer
+StrategySweepNextBuffer(BufferAccessStrategy strategy, int *sweep_cursor)
+{
+ if (++(*sweep_cursor) >= strategy->nbuffers)
+ *sweep_cursor = 0;
+
+ return strategy->buffers[*sweep_cursor];
+}
+
+/*
+ * Return the starting buffer of a sweep of the strategy ring
+ */
+int
+StrategySweepStart(BufferAccessStrategy strategy)
+{
+ return strategy->current;
+}
+
+
/*
* StrategySyncStart -- tell BgBufferSync where to start syncing
*
diff --git a/src/include/storage/buf_internals.h b/src/include/storage/buf_internals.h
index 7e258383048..b48dece3e63 100644
--- a/src/include/storage/buf_internals.h
+++ b/src/include/storage/buf_internals.h
@@ -442,6 +442,10 @@ extern void TerminateBufferIO(BufferDesc *buf, bool clear_dirty, uint32 set_flag
/* freelist.c */
+extern bool StrategySupportsEagerFlush(BufferAccessStrategy strategy);
+extern Buffer StrategySweepNextBuffer(BufferAccessStrategy strategy,
+ int *sweep_cursor);
+extern int StrategySweepStart(BufferAccessStrategy strategy);
extern IOContext IOContextForStrategy(BufferAccessStrategy strategy);
extern BufferDesc *StrategyGetBuffer(BufferAccessStrategy strategy,
uint32 *buf_state, bool *from_ring);
--
2.43.0
From f3b1be877ddbd5911f3813a8a0b3cd3877e0d5b9 Mon Sep 17 00:00:00 2001
From: Melanie Plageman <[email protected]>
Date: Wed, 15 Oct 2025 13:42:47 -0400
Subject: [PATCH v7 4/7] Write combining for BAS_BULKWRITE
Implement write combining for users of the bulkwrite buffer access
strategy (e.g. COPY FROM). When the buffer access strategy needs to
clean a buffer for reuse, it already opportunistically flushes some
other buffers. Now, combine any contiguous blocks from the same relation
into larger writes and issue them with smgrwritev().
The performance benefit for COPY FROM is mostly noticeable for multiple
concurrent COPY FROMs because a single COPY FROM is either CPU bound or
bound by WAL writes.
The infrastructure for flushing larger batches of IOs will be reused by
checkpointer and other processes doing writes of dirty data.
XXX: Because this sets in-place checksums for batches, it is not
committable until additional infrastructure is in place.
Author: Melanie Plageman <[email protected]>
Reviewed-by: Chao Li <[email protected]>
Discussion: https://postgr.es/m/flat/CAAKRu_bcWRvRwZUop_d9vzF9nHAiT%2B-uPzkJ%3DS3ShZ1GqeAYOw%40mail.gmail.com
---
src/backend/storage/buffer/bufmgr.c | 217 ++++++++++++++++++++++++--
src/backend/storage/buffer/freelist.c | 26 +++
src/backend/storage/page/bufpage.c | 20 +++
src/backend/utils/probes.d | 2 +
src/include/storage/buf_internals.h | 32 ++++
src/include/storage/bufpage.h | 2 +
src/tools/pgindent/typedefs.list | 1 +
7 files changed, 288 insertions(+), 12 deletions(-)
diff --git a/src/backend/storage/buffer/bufmgr.c b/src/backend/storage/buffer/bufmgr.c
index 7a553b8cdd2..3c49c8c2ef2 100644
--- a/src/backend/storage/buffer/bufmgr.c
+++ b/src/backend/storage/buffer/bufmgr.c
@@ -537,7 +537,11 @@ static void FlushBuffer(BufferDesc *buf, SMgrRelation reln,
static BufferDesc *NextStratBufToFlush(BufferAccessStrategy strategy,
Buffer sweep_end,
XLogRecPtr *lsn, int *sweep_cursor);
-
+static void FindFlushAdjacents(BufferAccessStrategy strategy, Buffer sweep_end,
+ BufferDesc *batch_start,
+ uint32 max_batch_size,
+ BufWriteBatch *batch,
+ int *sweep_cursor);
static bool BufferNeedsWALFlush(BufferDesc *bufdesc, XLogRecPtr *lsn);
static BufferDesc *PrepareOrRejectEagerFlushBuffer(Buffer bufnum, BlockNumber require,
RelFileLocator *rlocator, bool skip_pinned,
@@ -4316,10 +4320,91 @@ BufferNeedsWALFlush(BufferDesc *bufdesc, XLogRecPtr *lsn)
}
+
+/*
+ * Given a buffer descriptor, start, from a strategy ring, strategy, that
+ * supports eager flushing, find additional buffers from the ring that can be
+ * combined into a single write batch with this buffer.
+ *
+ * max_batch_size is the maximum number of blocks that can be combined into a
+ * single write in general. This function, based on the block number of start,
+ * will determine the maximum IO size for this particular write given how much
+ * of the file remains. max_batch_size is provided by the caller so it doesn't
+ * have to be recalculated for each write.
+ *
+ * batch is an output parameter that this function will fill with the needed
+ * information to write this IO.
+ *
+ * This function will pin and content lock all of the buffers that it
+ * assembles for the IO batch. The caller is responsible for issuing the IO.
+ */
+static void
+FindFlushAdjacents(BufferAccessStrategy strategy, Buffer sweep_end,
+ BufferDesc *batch_start,
+ uint32 max_batch_size,
+ BufWriteBatch *batch,
+ int *sweep_cursor)
+{
+ BlockNumber limit;
+ uint32 buf_state;
+
+ Assert(batch_start);
+ batch->bufdescs[0] = batch_start;
+
+ buf_state = LockBufHdr(batch_start);
+ batch->max_lsn = BufferGetLSN(batch_start);
+ UnlockBufHdr(batch_start, buf_state);
+
+ batch->start = batch->bufdescs[0]->tag.blockNum;
+ Assert(BlockNumberIsValid(batch->start));
+ batch->n = 1;
+ batch->forkno = BufTagGetForkNum(&batch->bufdescs[0]->tag);
+ batch->rlocator = BufTagGetRelFileLocator(&batch->bufdescs[0]->tag);
+ batch->reln = smgropen(batch->rlocator, INVALID_PROC_NUMBER);
+
+ limit = smgrmaxcombine(batch->reln, batch->forkno, batch->start);
+ limit = Min(max_batch_size, limit);
+ limit = Min(GetAdditionalPinLimit(), limit);
+
+ /*
+ * It's possible we're not allowed any more pins or there aren't more
+ * blocks in the target relation. In this case, just return. Our batch
+ * will have only one buffer.
+ */
+ if (limit <= 0)
+ return;
+
+ /* Now assemble a run of blocks to write out. */
+ for (; batch->n < limit; batch->n++)
+ {
+ Buffer bufnum;
+
+ if ((bufnum =
+ StrategySweepNextBuffer(strategy, sweep_cursor)) == sweep_end)
+ break;
+
+ /*
+ * For BAS_BULKWRITE, once you hit an InvalidBuffer, the remaining
+ * buffers in the ring will be invalid.
+ */
+ if (!BufferIsValid(bufnum))
+ break;
+
+ /* Stop when we encounter a buffer that will break the run */
+ if ((batch->bufdescs[batch->n] =
+ PrepareOrRejectEagerFlushBuffer(bufnum,
+ batch->start + batch->n,
+ &batch->rlocator,
+ true,
+ &batch->max_lsn)) == NULL)
+ break;
+ }
+}
+
/*
* Returns the buffer descriptor of the buffer containing the next block we
* should eagerly flush or NULL when there are no further buffers to consider
- * writing out.
+ * writing out. This will be the start of a new batch of buffers to write out.
*/
static BufferDesc *
NextStratBufToFlush(BufferAccessStrategy strategy,
@@ -4367,7 +4452,6 @@ CleanVictimBuffer(BufferAccessStrategy strategy,
XLogRecPtr max_lsn = InvalidXLogRecPtr;
LWLock *content_lock;
- bool first_buffer = true;
Assert(pg_atomic_read_u32(&bufdesc->state) & BM_DIRTY);
@@ -4379,19 +4463,22 @@ CleanVictimBuffer(BufferAccessStrategy strategy,
{
Buffer sweep_end = BufferDescriptorGetBuffer(bufdesc);
int cursor = StrategySweepStart(strategy);
+ uint32 max_batch_size = StrategyMaxWriteBatchSize(strategy);
+
+ /* Pin our victim again so it stays ours even after batch released */
+ ReservePrivateRefCountEntry();
+ ResourceOwnerEnlarge(CurrentResourceOwner);
+ IncrBufferRefCount(BufferDescriptorGetBuffer(bufdesc));
/* Clean victim buffer and find more to flush opportunistically */
do
{
- DoFlushBuffer(bufdesc, NULL, IOOBJECT_RELATION, io_context, max_lsn);
- content_lock = BufferDescriptorGetContentLock(bufdesc);
- LWLockRelease(content_lock);
- ScheduleBufferTagForWriteback(&BackendWritebackContext, io_context,
- &bufdesc->tag);
- /* We leave the first buffer pinned for the caller */
- if (!first_buffer)
- UnpinBuffer(bufdesc);
- first_buffer = false;
+ BufWriteBatch batch;
+
+ FindFlushAdjacents(strategy, sweep_end, bufdesc, max_batch_size,
+ &batch, &cursor);
+ FlushBufferBatch(&batch, io_context);
+ CompleteWriteBatchIO(&batch, io_context, &BackendWritebackContext);
} while ((bufdesc = NextStratBufToFlush(strategy, sweep_end,
&max_lsn, &cursor)) != NULL);
}
@@ -4536,6 +4623,70 @@ except_unpin_buffer:
return NULL;
}
+/*
+ * Given a prepared batch of buffers write them out as a vector.
+ */
+void
+FlushBufferBatch(BufWriteBatch *batch,
+ IOContext io_context)
+{
+ BlockNumber blknums[MAX_IO_COMBINE_LIMIT];
+ Block blocks[MAX_IO_COMBINE_LIMIT];
+ instr_time io_start;
+ ErrorContextCallback errcallback =
+ {
+ .callback = shared_buffer_write_error_callback,
+ .previous = error_context_stack,
+ };
+
+ error_context_stack = &errcallback;
+
+ if (!XLogRecPtrIsInvalid(batch->max_lsn))
+ XLogFlush(batch->max_lsn);
+
+ if (batch->reln == NULL)
+ batch->reln = smgropen(batch->rlocator, INVALID_PROC_NUMBER);
+
+#ifdef USE_ASSERT_CHECKING
+ for (uint32 i = 0; i < batch->n; i++)
+ {
+ XLogRecPtr lsn;
+
+ Assert(!BufferNeedsWALFlush(batch->bufdescs[i], &lsn));
+ }
+#endif
+
+ TRACE_POSTGRESQL_BUFFER_BATCH_FLUSH_START(batch->forkno,
+ batch->reln->smgr_rlocator.locator.spcOid,
+ batch->reln->smgr_rlocator.locator.dbOid,
+ batch->reln->smgr_rlocator.locator.relNumber,
+ batch->reln->smgr_rlocator.backend,
+ batch->n);
+
+ /*
+ * XXX: All blocks should be copied and then checksummed but doing so
+ * takes a lot of extra memory and a future patch will eliminate this
+ * requirement.
+ */
+ for (BlockNumber i = 0; i < batch->n; i++)
+ {
+ blknums[i] = batch->start + i;
+ blocks[i] = BufHdrGetBlock(batch->bufdescs[i]);
+ }
+
+ PageSetBatchChecksumInplace((Page *) blocks, blknums, batch->n);
+
+ io_start = pgstat_prepare_io_time(track_io_timing);
+
+ smgrwritev(batch->reln, batch->forkno,
+ batch->start, (const void **) blocks, batch->n, false);
+
+ pgstat_count_io_op_time(IOOBJECT_RELATION, io_context, IOOP_WRITE,
+ io_start, batch->n, BLCKSZ);
+
+ error_context_stack = errcallback.previous;
+}
+
/*
* Prepare the buffer with bufdesc for writing. Returns true if the buffer
 * actually needs writing and false otherwise. lsn returns the buffer's LSN if
@@ -4696,6 +4847,48 @@ FlushUnlockedBuffer(BufferDesc *buf, SMgrRelation reln,
LWLockRelease(BufferDescriptorGetContentLock(buf));
}
+/*
+ * Given a previously initialized batch with buffers that have already been
+ * flushed, terminate the IO on each buffer and then unlock and unpin them.
+ * This assumes all the buffers were locked and pinned. wb_context will be
+ * modified.
+ */
+void
+CompleteWriteBatchIO(BufWriteBatch *batch, IOContext io_context,
+ WritebackContext *wb_context)
+{
+ ErrorContextCallback errcallback =
+ {
+ .callback = shared_buffer_write_error_callback,
+ .previous = error_context_stack,
+ };
+
+ error_context_stack = &errcallback;
+ pgBufferUsage.shared_blks_written += batch->n;
+
+ for (uint32 i = 0; i < batch->n; i++)
+ {
+ Buffer buffer = BufferDescriptorGetBuffer(batch->bufdescs[i]);
+
+ errcallback.arg = batch->bufdescs[i];
+
+ /* Mark the buffer as clean and end the BM_IO_IN_PROGRESS state. */
+ TerminateBufferIO(batch->bufdescs[i], true, 0, true, false);
+ LWLockRelease(BufferDescriptorGetContentLock(batch->bufdescs[i]));
+ ReleaseBuffer(buffer);
+ ScheduleBufferTagForWriteback(wb_context, io_context,
+ &batch->bufdescs[i]->tag);
+ }
+
+ TRACE_POSTGRESQL_BUFFER_BATCH_FLUSH_DONE(batch->forkno,
+ batch->reln->smgr_rlocator.locator.spcOid,
+ batch->reln->smgr_rlocator.locator.dbOid,
+ batch->reln->smgr_rlocator.locator.relNumber,
+ batch->reln->smgr_rlocator.backend,
+ batch->n, batch->start);
+ error_context_stack = errcallback.previous;
+}
+
/*
* RelationGetNumberOfBlocksInFork
* Determines the current number of pages in the specified relation fork.
diff --git a/src/backend/storage/buffer/freelist.c b/src/backend/storage/buffer/freelist.c
index 4baa0550bb1..f73a52c7e56 100644
--- a/src/backend/storage/buffer/freelist.c
+++ b/src/backend/storage/buffer/freelist.c
@@ -775,6 +775,32 @@ GetBufferFromRing(BufferAccessStrategy strategy, uint32 *buf_state)
return NULL;
}
+
+/*
+ * Determine the largest IO we can assemble from the given strategy ring, based
+ * on strategy-specific as well as global constraints on the number of pinned
+ * buffers and the maximum IO size.
+ */
+uint32
+StrategyMaxWriteBatchSize(BufferAccessStrategy strategy)
+{
+ uint32 max_possible_buffer_limit;
+ uint32 max_write_batch_size;
+ int strategy_pin_limit;
+
+ max_write_batch_size = io_combine_limit;
+
+ strategy_pin_limit = GetAccessStrategyPinLimit(strategy);
+ max_possible_buffer_limit = GetPinLimit();
+
+ max_write_batch_size = Min(strategy_pin_limit, max_write_batch_size);
+ max_write_batch_size = Min(max_possible_buffer_limit, max_write_batch_size);
+ max_write_batch_size = Max(1, max_write_batch_size);
+ max_write_batch_size = Min(max_write_batch_size, io_combine_limit);
+ Assert(max_write_batch_size <= MAX_IO_COMBINE_LIMIT);
+ return max_write_batch_size;
+}
+
/*
* AddBufferToRing -- add a buffer to the buffer ring
*
diff --git a/src/backend/storage/page/bufpage.c b/src/backend/storage/page/bufpage.c
index dbb49ed9197..12503934502 100644
--- a/src/backend/storage/page/bufpage.c
+++ b/src/backend/storage/page/bufpage.c
@@ -1546,3 +1546,23 @@ PageSetChecksumInplace(Page page, BlockNumber blkno)
((PageHeader) page)->pd_checksum = pg_checksum_page(page, blkno);
}
+
+/*
+ * A helper to set multiple blocks' checksums in place
+ */
+void
+PageSetBatchChecksumInplace(Page *pages, const BlockNumber *blknos, uint32 length)
+{
+ /* If we don't need a checksum, just return */
+ if (!DataChecksumsEnabled())
+ return;
+
+ for (uint32 i = 0; i < length; i++)
+ {
+ Page page = pages[i];
+
+ if (PageIsNew(page))
+ continue;
+ ((PageHeader) page)->pd_checksum = pg_checksum_page(page, blknos[i]);
+ }
+}
diff --git a/src/backend/utils/probes.d b/src/backend/utils/probes.d
index e9e413477ba..36dd4f8375b 100644
--- a/src/backend/utils/probes.d
+++ b/src/backend/utils/probes.d
@@ -61,6 +61,8 @@ provider postgresql {
probe buffer__flush__done(ForkNumber, BlockNumber, Oid, Oid, Oid);
probe buffer__extend__start(ForkNumber, Oid, Oid, Oid, int, unsigned int);
probe buffer__extend__done(ForkNumber, Oid, Oid, Oid, int, unsigned int, BlockNumber);
+ probe buffer__batch__flush__start(ForkNumber, Oid, Oid, Oid, int, unsigned int);
+ probe buffer__batch__flush__done(ForkNumber, Oid, Oid, Oid, int, unsigned int, BlockNumber);
probe buffer__checkpoint__start(int);
probe buffer__checkpoint__sync__start();
diff --git a/src/include/storage/buf_internals.h b/src/include/storage/buf_internals.h
index b48dece3e63..337f1427bbc 100644
--- a/src/include/storage/buf_internals.h
+++ b/src/include/storage/buf_internals.h
@@ -418,6 +418,34 @@ ResourceOwnerForgetBufferIO(ResourceOwner owner, Buffer buffer)
ResourceOwnerForget(owner, Int32GetDatum(buffer), &buffer_io_resowner_desc);
}
+/*
+ * Used to write out multiple blocks at a time in a combined IO. bufdescs
+ * contains buffer descriptors for buffers containing adjacent blocks of the
+ * same fork of the same relation.
+ */
+typedef struct BufWriteBatch
+{
+ RelFileLocator rlocator;
+ ForkNumber forkno;
+ SMgrRelation reln;
+
+ /*
+ * The BlockNumber of the first block in the run of contiguous blocks to
+ * be written out as a single IO.
+ */
+ BlockNumber start;
+
+ /*
+ * While assembling the buffers, we keep track of the maximum LSN so that
+ * we can flush WAL through this LSN before flushing the buffers.
+ */
+ XLogRecPtr max_lsn;
+
+ /* The number of valid buffers in bufdescs */
+ uint32 n;
+ BufferDesc *bufdescs[MAX_IO_COMBINE_LIMIT];
+} BufWriteBatch;
+
/*
* Internal buffer management routines
*/
@@ -432,6 +460,7 @@ extern void WritebackContextInit(WritebackContext *context, int *max_pending);
extern void IssuePendingWritebacks(WritebackContext *wb_context, IOContext io_context);
extern void ScheduleBufferTagForWriteback(WritebackContext *wb_context,
IOContext io_context, BufferTag *tag);
+extern void FlushBufferBatch(BufWriteBatch *batch, IOContext io_context);
extern void TrackNewBufferPin(Buffer buf);
@@ -443,9 +472,12 @@ extern void TerminateBufferIO(BufferDesc *buf, bool clear_dirty, uint32 set_flag
/* freelist.c */
extern bool StrategySupportsEagerFlush(BufferAccessStrategy strategy);
+extern uint32 StrategyMaxWriteBatchSize(BufferAccessStrategy strategy);
extern Buffer StrategySweepNextBuffer(BufferAccessStrategy strategy,
int *sweep_cursor);
extern int StrategySweepStart(BufferAccessStrategy strategy);
+extern void CompleteWriteBatchIO(BufWriteBatch *batch, IOContext io_context,
+ WritebackContext *wb_context);
extern IOContext IOContextForStrategy(BufferAccessStrategy strategy);
extern BufferDesc *StrategyGetBuffer(BufferAccessStrategy strategy,
uint32 *buf_state, bool *from_ring);
diff --git a/src/include/storage/bufpage.h b/src/include/storage/bufpage.h
index aeb67c498c5..bb4e6af461a 100644
--- a/src/include/storage/bufpage.h
+++ b/src/include/storage/bufpage.h
@@ -507,5 +507,7 @@ extern bool PageIndexTupleOverwrite(Page page, OffsetNumber offnum,
Item newtup, Size newsize);
extern char *PageSetChecksumCopy(Page page, BlockNumber blkno);
extern void PageSetChecksumInplace(Page page, BlockNumber blkno);
+extern void PageSetBatchChecksumInplace(Page *pages, const BlockNumber *blknos,
+ uint32 length);
#endif /* BUFPAGE_H */
diff --git a/src/tools/pgindent/typedefs.list b/src/tools/pgindent/typedefs.list
index 5290b91e83e..c80e1ff4107 100644
--- a/src/tools/pgindent/typedefs.list
+++ b/src/tools/pgindent/typedefs.list
@@ -350,6 +350,7 @@ BufferManagerRelation
BufferStrategyControl
BufferTag
BufferUsage
+BufWriteBatch
BuildAccumulator
BuiltinScript
BulkInsertState
--
2.43.0
From 7a48e402ebabc76bcead5ffd8db1f62720cc1704 Mon Sep 17 00:00:00 2001
From: Melanie Plageman <[email protected]>
Date: Tue, 2 Sep 2025 15:22:11 -0400
Subject: [PATCH v7 5/7] Add database Oid to CkptSortItem
This is useful for checkpointer write combining -- which will be added
in a future commit.
Author: Melanie Plageman <[email protected]>
Reviewed-by: Chao Li <[email protected]>
Discussion: https://postgr.es/m/2FA0BAC7-5413-4ABD-94CA-4398FE77750D%40gmail.com
---
src/backend/storage/buffer/bufmgr.c | 8 ++++++++
src/include/storage/buf_internals.h | 1 +
2 files changed, 9 insertions(+)
diff --git a/src/backend/storage/buffer/bufmgr.c b/src/backend/storage/buffer/bufmgr.c
index 3c49c8c2ef2..0ee7feba7ba 100644
--- a/src/backend/storage/buffer/bufmgr.c
+++ b/src/backend/storage/buffer/bufmgr.c
@@ -3413,6 +3413,7 @@ BufferSync(int flags)
item = &CkptBufferIds[num_to_scan++];
item->buf_id = buf_id;
item->tsId = bufHdr->tag.spcOid;
+ item->dbId = bufHdr->tag.dbOid;
item->relNumber = BufTagGetRelNumber(&bufHdr->tag);
item->forkNum = BufTagGetForkNum(&bufHdr->tag);
item->blockNum = bufHdr->tag.blockNum;
@@ -6812,6 +6813,13 @@ ckpt_buforder_comparator(const CkptSortItem *a, const CkptSortItem *b)
return -1;
else if (a->tsId > b->tsId)
return 1;
+
+ /* compare database */
+ if (a->dbId < b->dbId)
+ return -1;
+ else if (a->dbId > b->dbId)
+ return 1;
+
/* compare relation */
if (a->relNumber < b->relNumber)
return -1;
diff --git a/src/include/storage/buf_internals.h b/src/include/storage/buf_internals.h
index 337f1427bbc..a0051780a13 100644
--- a/src/include/storage/buf_internals.h
+++ b/src/include/storage/buf_internals.h
@@ -384,6 +384,7 @@ extern uint32 WaitBufHdrUnlocked(BufferDesc *buf);
typedef struct CkptSortItem
{
Oid tsId;
+ Oid dbId;
RelFileNumber relNumber;
ForkNumber forkNum;
BlockNumber blockNum;
--
2.43.0
From 69f182fd94768efc2dac8fca3c823e8c2a8cd483 Mon Sep 17 00:00:00 2001
From: Melanie Plageman <[email protected]>
Date: Wed, 15 Oct 2025 15:23:16 -0400
Subject: [PATCH v7 6/7] Implement checkpointer data write combining
When the checkpointer writes out dirty buffers, writing multiple
contiguous blocks as a single IO is a substantial performance
improvement. The checkpointer is usually bottlenecked on IO, so issuing
larger IOs leads to increased write throughput and faster checkpoints.
Author: Melanie Plageman <[email protected]>
Reviewed-by: Chao Li <[email protected]>
Discussion: https://postgr.es/m/2FA0BAC7-5413-4ABD-94CA-4398FE77750D%40gmail.com
---
src/backend/storage/buffer/bufmgr.c | 222 ++++++++++++++++++++++++----
src/backend/utils/probes.d | 2 +-
2 files changed, 196 insertions(+), 28 deletions(-)
diff --git a/src/backend/storage/buffer/bufmgr.c b/src/backend/storage/buffer/bufmgr.c
index 0ee7feba7ba..7a0284973e0 100644
--- a/src/backend/storage/buffer/bufmgr.c
+++ b/src/backend/storage/buffer/bufmgr.c
@@ -513,6 +513,7 @@ static bool PinBuffer(BufferDesc *buf, BufferAccessStrategy strategy,
static void PinBuffer_Locked(BufferDesc *buf);
static void UnpinBuffer(BufferDesc *buf);
static void UnpinBufferNoOwner(BufferDesc *buf);
+static uint32 CheckpointerMaxBatchSize(void);
static void BufferSync(int flags);
static int SyncOneBuffer(int buf_id, bool skip_recently_used,
WritebackContext *wb_context);
@@ -3355,7 +3356,6 @@ TrackNewBufferPin(Buffer buf)
static void
BufferSync(int flags)
{
- uint32 buf_state;
int buf_id;
int num_to_scan;
int num_spaces;
@@ -3367,6 +3367,8 @@ BufferSync(int flags)
int i;
uint32 mask = BM_DIRTY;
WritebackContext wb_context;
+ uint32 max_batch_size;
+ BufWriteBatch batch;
/*
* Unless this is a shutdown checkpoint or we have been explicitly told,
@@ -3397,6 +3399,7 @@ BufferSync(int flags)
for (buf_id = 0; buf_id < NBuffers; buf_id++)
{
BufferDesc *bufHdr = GetBufferDescriptor(buf_id);
+ uint32 buf_state;
/*
* Header spinlock is enough to examine BM_DIRTY, see comment in
@@ -3537,48 +3540,196 @@ BufferSync(int flags)
*/
num_processed = 0;
num_written = 0;
+ max_batch_size = CheckpointerMaxBatchSize();
while (!binaryheap_empty(ts_heap))
{
+ BlockNumber limit = max_batch_size;
BufferDesc *bufHdr = NULL;
CkptTsStatus *ts_stat = (CkptTsStatus *)
DatumGetPointer(binaryheap_first(ts_heap));
+ int ts_end = ts_stat->index - ts_stat->num_scanned + ts_stat->num_to_scan;
+ int processed = 0;
- buf_id = CkptBufferIds[ts_stat->index].buf_id;
- Assert(buf_id != -1);
+ batch.start = InvalidBlockNumber;
+ batch.max_lsn = InvalidXLogRecPtr;
+ batch.n = 0;
- bufHdr = GetBufferDescriptor(buf_id);
+ while (batch.n < limit)
+ {
+ uint32 buf_state;
+ XLogRecPtr lsn = InvalidXLogRecPtr;
+ LWLock *content_lock;
+ CkptSortItem item;
- num_processed++;
+ if (ProcSignalBarrierPending)
+ ProcessProcSignalBarrier();
- /*
- * We don't need to acquire the lock here, because we're only looking
- * at a single bit. It's possible that someone else writes the buffer
- * and clears the flag right after we check, but that doesn't matter
- * since SyncOneBuffer will then do nothing. However, there is a
- * further race condition: it's conceivable that between the time we
- * examine the bit here and the time SyncOneBuffer acquires the lock,
- * someone else not only wrote the buffer but replaced it with another
- * page and dirtied it. In that improbable case, SyncOneBuffer will
- * write the buffer though we didn't need to. It doesn't seem worth
- * guarding against this, though.
- */
- if (pg_atomic_read_u32(&bufHdr->state) & BM_CHECKPOINT_NEEDED)
- {
- if (SyncOneBuffer(buf_id, false, &wb_context) & BUF_WRITTEN)
+ /* Check if we are done with this tablespace */
+ if (ts_stat->index + processed >= ts_end)
+ break;
+
+ item = CkptBufferIds[ts_stat->index + processed];
+
+ buf_id = item.buf_id;
+ Assert(buf_id != -1);
+
+ bufHdr = GetBufferDescriptor(buf_id);
+
+ /*
+ * If this is the first block of the batch, then check if we need
+ * to open a new relation. Open the relation now because we have
+ * to determine the maximum IO size based on how many blocks
+ * remain in the file.
+ */
+ if (!BlockNumberIsValid(batch.start))
+ {
+ Assert(batch.max_lsn == InvalidXLogRecPtr && batch.n == 0);
+ batch.rlocator.spcOid = item.tsId;
+ batch.rlocator.dbOid = item.dbId;
+ batch.rlocator.relNumber = item.relNumber;
+ batch.forkno = item.forkNum;
+ batch.start = item.blockNum;
+ batch.reln = smgropen(batch.rlocator, INVALID_PROC_NUMBER);
+ limit = smgrmaxcombine(batch.reln, batch.forkno, batch.start);
+ limit = Min(max_batch_size, limit);
+ limit = Min(GetAdditionalPinLimit(), limit);
+ /* Guarantee progress */
+ limit = Max(limit, 1);
+ }
+
+ /*
+ * Once we hit blocks from the next relation or fork of the
+ * relation, break out of the loop and issue the IO we've built up
+ * so far. It is important that we don't increment processed
+ * because we want to start the next IO with this item.
+ */
+ if (item.dbId != batch.rlocator.dbOid)
+ break;
+
+ if (item.relNumber != batch.rlocator.relNumber)
+ break;
+
+ if (item.forkNum != batch.forkno)
+ break;
+
+ Assert(item.tsId == batch.rlocator.spcOid);
+
+ /*
+ * If the next block is not contiguous, we can't include it in the
+ * IO we will issue. Break out of the loop and issue what we have
+ * so far. Do not count this item as processed -- otherwise we
+ * will end up skipping it.
+ */
+ if (item.blockNum != batch.start + batch.n)
+ break;
+
+ /*
+ * We don't need to acquire the lock here, because we're only
+ * looking at a few bits. It's possible that someone else writes
+ * the buffer and clears the flag right after we check, but that
+ * doesn't matter since StartBufferIO will then return false.
+ *
+ * If the buffer doesn't need checkpointing, or doesn't need
+ * flushing at all, we're done with this item: count it as
+ * processed and break out of the loop to issue the IO we've
+ * built up so far.
+ */
+ buf_state = pg_atomic_read_u32(&bufHdr->state);
+ if ((buf_state & (BM_CHECKPOINT_NEEDED | BM_VALID | BM_DIRTY)) !=
+ (BM_CHECKPOINT_NEEDED | BM_VALID | BM_DIRTY))
+ {
+ processed++;
+ break;
+ }
+
+ ReservePrivateRefCountEntry();
+ ResourceOwnerEnlarge(CurrentResourceOwner);
+ PinBuffer(bufHdr, NULL, false);
+
+ /*
+ * There is a race condition here: it's conceivable that between
+ * the time we examine the buffer header for BM_CHECKPOINT_NEEDED
+ * above and the time we acquire the content lock here, someone else
+ * not only wrote the buffer but replaced it with another page and
+ * dirtied it. In that improbable case, we will write the buffer
+ * though we didn't need to. It doesn't seem worth guarding
+ * against this, though.
+ */
+ content_lock = BufferDescriptorGetContentLock(bufHdr);
+
+ /*
+ * We are willing to wait for the content lock on the first buffer
+ * in the batch. However, for subsequent buffers, waiting could lead to
+ * deadlock. We have to eventually flush all eligible buffers,
+ * though. So, if we fail to acquire the lock on a subsequent
+ * buffer, we break out and issue the IO we've built up so far.
+ * Then we come back and start a new IO with that buffer as the
+ * starting buffer. As such, we must not count the item as
+ * processed if we end up failing to acquire the content lock.
+ */
+ if (batch.n == 0)
+ LWLockAcquire(content_lock, LW_SHARED);
+ else if (!LWLockConditionalAcquire(content_lock, LW_SHARED))
{
- TRACE_POSTGRESQL_BUFFER_SYNC_WRITTEN(buf_id);
- PendingCheckpointerStats.buffers_written++;
- num_written++;
+ UnpinBuffer(bufHdr);
+ break;
}
+
+ /*
+ * If the buffer doesn't need IO, count the item as processed,
+ * release the buffer, and break out of the loop to issue the IO
+ * we have built up so far.
+ */
+ if (!StartBufferIO(bufHdr, false, true))
+ {
+ processed++;
+ LWLockRelease(content_lock);
+ UnpinBuffer(bufHdr);
+ break;
+ }
+
+ buf_state = LockBufHdr(bufHdr);
+ lsn = BufferGetLSN(bufHdr);
+ buf_state &= ~BM_JUST_DIRTIED;
+ UnlockBufHdr(bufHdr, buf_state);
+
+ /*
+ * Keep track of the max LSN so that we can be sure to flush
+ * enough WAL before flushing data from the buffers. See comment
+ * in DoFlushBuffer() for more on why we don't consider the LSNs
+ * of unlogged relations.
+ */
+ if (buf_state & BM_PERMANENT && lsn > batch.max_lsn)
+ batch.max_lsn = lsn;
+
+ batch.bufdescs[batch.n++] = bufHdr;
+ processed++;
}
/*
* Measure progress independent of actually having to flush the buffer
- * - otherwise writing become unbalanced.
+ * - otherwise writing becomes unbalanced.
*/
- ts_stat->progress += ts_stat->progress_slice;
- ts_stat->num_scanned++;
- ts_stat->index++;
+ num_processed += processed;
+ ts_stat->progress += ts_stat->progress_slice * processed;
+ ts_stat->num_scanned += processed;
+ ts_stat->index += processed;
+
+ /*
+ * If we built up an IO, issue it. There's a chance that none of the
+ * items we examined referenced buffers that needed flushing this
+ * time, but we still fall through so that the heap maintenance below
+ * accounts for the items we processed.
+ */
+ if (batch.n > 0)
+ {
+ FlushBufferBatch(&batch, IOCONTEXT_NORMAL);
+ CompleteWriteBatchIO(&batch, IOCONTEXT_NORMAL, &wb_context);
+
+ TRACE_POSTGRESQL_BUFFER_BATCH_SYNC_WRITTEN(batch.n);
+ PendingCheckpointerStats.buffers_written += batch.n;
+ num_written += batch.n;
+ }
/* Have all the buffers from the tablespace been processed? */
if (ts_stat->num_scanned == ts_stat->num_to_scan)
@@ -6414,6 +6565,23 @@ IsBufferCleanupOK(Buffer buffer)
return false;
}
+/*
+ * The maximum number of blocks that can be written out in a single batch by
+ * the checkpointer.
+ */
+static uint32
+CheckpointerMaxBatchSize(void)
+{
+ uint32 result;
+ uint32 pin_limit = GetPinLimit();
+
+ result = Min(pin_limit, io_combine_limit);
+ result = Max(result, 1);
+ Assert(result <= MAX_IO_COMBINE_LIMIT);
+ return result;
+}
+
/*
* Functions for buffer I/O handling
diff --git a/src/backend/utils/probes.d b/src/backend/utils/probes.d
index 36dd4f8375b..d6970731ba9 100644
--- a/src/backend/utils/probes.d
+++ b/src/backend/utils/probes.d
@@ -68,7 +68,7 @@ provider postgresql {
probe buffer__checkpoint__sync__start();
probe buffer__checkpoint__done();
probe buffer__sync__start(int, int);
- probe buffer__sync__written(int);
+ probe buffer__batch__sync__written(BlockNumber);
probe buffer__sync__done(int, int, int);
probe deadlock__found();
--
2.43.0
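
To make the batch-building control flow in the BufferSync() hunk above easier to follow, here is a standalone sketch of the accumulate-then-issue loop. It is illustrative only, not part of the patch: CkptItem, IssueBatch(), MAX_BATCH, and the field names are made-up stand-ins for the real structures in bufmgr.c, and all of the pinning, locking, and LSN tracking is omitted.

#include <stdio.h>

#define MAX_BATCH 4
#define INVALID_BLOCK ((unsigned) -1)

typedef struct CkptItem
{
    unsigned relNumber;   /* which relation */
    unsigned forkNum;     /* which fork of that relation */
    unsigned blockNum;    /* block within the fork */
} CkptItem;

static void
IssueBatch(unsigned relNumber, unsigned forkNum, unsigned start, unsigned n)
{
    printf("flush rel %u fork %u blocks %u..%u (%u blocks)\n",
           relNumber, forkNum, start, start + n - 1, n);
}

int
main(void)
{
    CkptItem items[] = {
        {1, 0, 10}, {1, 0, 11}, {1, 0, 12},   /* contiguous run */
        {1, 0, 20},                           /* gap: starts a new batch */
        {2, 0, 5}, {2, 0, 6},                 /* new relation: new batch */
    };
    int nitems = sizeof(items) / sizeof(items[0]);
    int index = 0;

    while (index < nitems)
    {
        unsigned start = INVALID_BLOCK;
        unsigned relNumber = 0;
        unsigned forkNum = 0;
        unsigned n = 0;
        int processed = 0;

        while (n < MAX_BATCH && index + processed < nitems)
        {
            CkptItem item = items[index + processed];

            if (start == INVALID_BLOCK)
            {
                /* first item establishes the batch's relation, fork, start */
                relNumber = item.relNumber;
                forkNum = item.forkNum;
                start = item.blockNum;
            }
            else if (item.relNumber != relNumber ||
                     item.forkNum != forkNum ||
                     item.blockNum != start + n)
            {
                /*
                 * Different relation/fork or a gap: issue what we have and
                 * start the next batch with this item, so don't count it
                 * as processed.
                 */
                break;
            }

            n++;
            processed++;
        }

        if (n > 0)
            IssueBatch(relNumber, forkNum, start, n);

        index += processed;
    }
    return 0;
}

The point the sketch shows is that an item which ends a batch (different relation, different fork, or a non-contiguous block) is deliberately left uncounted so that the next batch starts with it.
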
From ab750885bc1816b0e6f01173b62f3296b2ea21a1 Mon Sep 17 00:00:00 2001
From: Melanie Plageman <[email protected]>
Date: Wed, 15 Oct 2025 16:16:58 -0400
Subject: [PATCH v7 7/7] WIP: Refactor SyncOneBuffer for bgwriter only
Only bgwriter uses SyncOneBuffer() now, so we can remove the
skip_recently_used parameter and make skipping recently-used buffers the
default behavior.
5e89985928795f243 introduced the pattern of using a CAS loop instead of
locking the buffer header and then calling PinBuffer_Locked(). Do that
in SyncOneBuffer() so we can avoid taking the buffer header spinlock in
the common case that the buffer is recently used.
---
src/backend/storage/buffer/bufmgr.c | 96 +++++++++++++++++------------
1 file changed, 56 insertions(+), 40 deletions(-)
diff --git a/src/backend/storage/buffer/bufmgr.c b/src/backend/storage/buffer/bufmgr.c
index 7a0284973e0..cf515a4d07a 100644
--- a/src/backend/storage/buffer/bufmgr.c
+++ b/src/backend/storage/buffer/bufmgr.c
@@ -515,8 +515,7 @@ static void UnpinBuffer(BufferDesc *buf);
static void UnpinBufferNoOwner(BufferDesc *buf);
static uint32 CheckpointerMaxBatchSize(void);
static void BufferSync(int flags);
-static int SyncOneBuffer(int buf_id, bool skip_recently_used,
- WritebackContext *wb_context);
+static int SyncOneBuffer(int buf_id, WritebackContext *wb_context);
static void WaitIO(BufferDesc *buf);
static void AbortBufferIO(Buffer buffer);
static void shared_buffer_write_error_callback(void *arg);
@@ -4003,8 +4002,7 @@ BgBufferSync(WritebackContext *wb_context)
/* Execute the LRU scan */
while (num_to_scan > 0 && reusable_buffers < upcoming_alloc_est)
{
- int sync_state = SyncOneBuffer(next_to_clean, true,
- wb_context);
+ int sync_state = SyncOneBuffer(next_to_clean, wb_context);
if (++next_to_clean >= NBuffers)
{
@@ -4067,8 +4065,8 @@ BgBufferSync(WritebackContext *wb_context)
/*
* SyncOneBuffer -- process a single buffer during syncing.
*
- * If skip_recently_used is true, we don't write currently-pinned buffers, nor
- * buffers marked recently used, as these are not replacement candidates.
+ * We don't write currently-pinned buffers, nor buffers marked recently used,
+ * as these are not replacement candidates.
*
* Returns a bitmask containing the following flag bits:
* BUF_WRITTEN: we wrote the buffer.
@@ -4079,53 +4077,71 @@ BgBufferSync(WritebackContext *wb_context)
* after locking it, but we don't care all that much.)
*/
static int
-SyncOneBuffer(int buf_id, bool skip_recently_used, WritebackContext *wb_context)
+SyncOneBuffer(int buf_id, WritebackContext *wb_context)
{
BufferDesc *bufHdr = GetBufferDescriptor(buf_id);
int result = 0;
+ uint32 old_buf_state;
uint32 buf_state;
BufferTag tag;
- /* Make sure we can handle the pin */
- ReservePrivateRefCountEntry();
- ResourceOwnerEnlarge(CurrentResourceOwner);
-
/*
- * Check whether buffer needs writing.
- *
- * We can make this check without taking the buffer content lock so long
- * as we mark pages dirty in access methods *before* logging changes with
- * XLogInsert(): if someone marks the buffer dirty just after our check we
- * don't worry because our checkpoint.redo points before log record for
- * upcoming changes and so we are not required to write such dirty buffer.
+ * Check whether the buffer can be used and pin it if so. Do this using a
+ * CAS loop, to avoid having to lock the buffer header.
*/
- buf_state = LockBufHdr(bufHdr);
-
- if (BUF_STATE_GET_REFCOUNT(buf_state) == 0 &&
- BUF_STATE_GET_USAGECOUNT(buf_state) == 0)
+ old_buf_state = pg_atomic_read_u32(&bufHdr->state);
+ for (;;)
{
+ buf_state = old_buf_state;
+
+ /*
+ * We can make these checks without taking the buffer content lock so
+ * long as we mark pages dirty in access methods *before* logging
+ * changes with XLogInsert(): if someone marks the buffer dirty just
+ * after our check we don't worry because our checkpoint.redo points
+ * before log record for upcoming changes and so we are not required
+ * to write such dirty buffer.
+ */
+ if (BUF_STATE_GET_REFCOUNT(buf_state) != 0 ||
+ BUF_STATE_GET_USAGECOUNT(buf_state) != 0)
+ {
+ /* Don't write recently-used buffers */
+ return result;
+ }
+
result |= BUF_REUSABLE;
- }
- else if (skip_recently_used)
- {
- /* Caller told us not to write recently-used buffers */
- UnlockBufHdr(bufHdr, buf_state);
- return result;
- }
- if (!(buf_state & BM_VALID) || !(buf_state & BM_DIRTY))
- {
- /* It's clean, so nothing to do */
- UnlockBufHdr(bufHdr, buf_state);
- return result;
+ if (!(buf_state & BM_VALID) || !(buf_state & BM_DIRTY))
+ {
+ /* It's clean, so nothing to do */
+ return result;
+ }
+
+ if (unlikely(buf_state & BM_LOCKED))
+ {
+ old_buf_state = WaitBufHdrUnlocked(bufHdr);
+ continue;
+ }
+
+ /* Make sure we can handle the pin */
+ ReservePrivateRefCountEntry();
+ ResourceOwnerEnlarge(CurrentResourceOwner);
+
+ /* pin the buffer if the CAS succeeds */
+ buf_state += BUF_REFCOUNT_ONE;
+
+ if (pg_atomic_compare_exchange_u32(&bufHdr->state, &old_buf_state,
+ buf_state))
+ {
+ TrackNewBufferPin(BufferDescriptorGetBuffer(bufHdr));
+ break;
+ }
}
/*
- * Pin it, share-lock it, write it. (FlushBuffer will do nothing if the
- * buffer is clean by the time we've locked it.)
+ * Share-lock the buffer and write it out. (FlushBuffer will do nothing if
+ * the buffer is clean by the time we've locked it.)
*/
- PinBuffer_Locked(bufHdr);
-
FlushUnlockedBuffer(bufHdr, NULL, IOOBJECT_RELATION, IOCONTEXT_NORMAL);
tag = bufHdr->tag;
@@ -4133,8 +4149,8 @@ SyncOneBuffer(int buf_id, bool skip_recently_used, WritebackContext *wb_context)
UnpinBuffer(bufHdr);
/*
- * SyncOneBuffer() is only called by checkpointer and bgwriter, so
- * IOContext will always be IOCONTEXT_NORMAL.
+ * SyncOneBuffer() is only called by bgwriter, so IOContext will always be
+ * IOCONTEXT_NORMAL.
*/
ScheduleBufferTagForWriteback(wb_context, IOCONTEXT_NORMAL, &tag);
--
2.43.0
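
Since the last patch leans on the CAS-loop pinning pattern, here is a standalone sketch of the idea for anyone who hasn't read 5e89985928795f243. It is not PostgreSQL code: the packed state word, the flag bits, and try_pin_for_writeback() are invented for illustration, and the real SyncOneBuffer() additionally has to handle the header lock bit, resource owners, and the private refcount entry.

#include <stdatomic.h>
#include <stdbool.h>
#include <stdint.h>
#include <stdio.h>

/* Hypothetical packed state word: low 16 bits refcount, next 8 usage count */
#define REFCOUNT_ONE   1u
#define REFCOUNT_MASK  0x0000FFFFu
#define USAGE_MASK     0x00FF0000u
#define FLAG_VALID     0x01000000u
#define FLAG_DIRTY     0x02000000u

typedef struct Buf
{
    _Atomic uint32_t state;
} Buf;

/* Try to pin the buffer iff it is an unpinned, unused, valid, dirty buffer. */
static bool
try_pin_for_writeback(Buf *buf)
{
    uint32_t old_state = atomic_load(&buf->state);

    for (;;)
    {
        uint32_t new_state;

        if ((old_state & REFCOUNT_MASK) != 0 ||
            (old_state & USAGE_MASK) != 0)
            return false;       /* pinned or recently used: skip it */

        if (!(old_state & FLAG_VALID) || !(old_state & FLAG_DIRTY))
            return false;       /* clean or invalid: nothing to write */

        new_state = old_state + REFCOUNT_ONE;

        /*
         * On success we hold a pin; on failure old_state has been reloaded
         * with the current value and we retry the checks.
         */
        if (atomic_compare_exchange_weak(&buf->state, &old_state, new_state))
            return true;
    }
}

int
main(void)
{
    Buf buf = {FLAG_VALID | FLAG_DIRTY};

    printf("pinned: %d\n", try_pin_for_writeback(&buf));        /* 1 */
    printf("pinned again: %d\n", try_pin_for_writeback(&buf));  /* 0: already pinned */
    return 0;
}

The checks for unpinned/unused/valid/dirty and the refcount bump happen in one atomic step, so the common "recently used, skip it" case never has to take the buffer header spinlock at all.
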