Hi all,

While reviewing the latest (v11) patches [1] and Melanie's most recent feedback, I understood that PageSetBatchChecksumInplace() is still work in progress and depends on upcoming changes to hint-bit locking. Proposing new functional changes to checksum batching now would work against that effort, so until those dependencies settle I will limit myself to preparatory and documentation improvements.

The attached patch introduces write-combining for checkpoint buffer flushes: contiguous dirty buffers belonging to the same relation and fork are batched and written with a single vectorized I/O call. The patch covers the batching of contiguous buffers, preservation of WAL ordering (the batch's highest page LSN is flushed before the write), and the required locking and buffer-state invariants. The change is currently limited to the checkpointer path (BufferSync()).

Testing so far: all regression tests (233) and isolation tests (121) pass, manual pgbench validation completed successfully, and I verified pg_stat_bgwriter counters before and after checkpoints. The implementation has been stable on my system.

I am now working on extending write-combining to the bgwriter path and will post an update once I have validated it. I hope these findings and this approach are useful for the further work here, and I would appreciate feedback.
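For reviewers who want the gist without reading the whole diff first, the core of the change is the batching decision inside the BufferSync() write loop. Roughly, simplified from the attached patch (LSN tracking, statistics, and the tablespace-balancing heap are omitted here):

    BufferTag       tag = bufHdr->tag;
    RelFileLocator  rlocator = BufTagGetRelFileLocator(&tag);

    /* If this block does not extend the current run, write the run out. */
    if (batch.n > 0 &&
        (!RelFileLocatorEquals(batch.rlocator, rlocator) ||
         batch.forkno != tag.forkNum ||
         tag.blockNum != batch.start + batch.n))
    {
        num_written += FlushBufferBatch(&batch, IOCONTEXT_NORMAL);
        ResetBufWriteBatch(&batch);
    }

    /* Start a new run if the batch is empty. */
    if (batch.n == 0)
    {
        batch.rlocator = rlocator;
        batch.forkno = tag.forkNum;
        batch.start = tag.blockNum;
    }

    batch.bufdescs[batch.n++] = bufHdr;

    /* Write out a full batch immediately. */
    if (batch.n == MAX_IO_COMBINE_LIMIT)
    {
        num_written += FlushBufferBatch(&batch, IOCONTEXT_NORMAL);
        ResetBufWriteBatch(&batch);
    }

A batch is flushed as soon as contiguity breaks or MAX_IO_COMBINE_LIMIT buffers have accumulated, so at most one partially filled batch is carried across CheckpointWriteDelay() calls; that trailing batch is written after the main loop. FlushBufferBatch() then flushes WAL up to the batch's max_lsn, computes the checksums, and issues one smgrwritev() for the whole run.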
Regards,
Soumya

Reference:
[1] https://www.postgresql.org/message-id/flat/CAAKRu_ZiEpE_EHww3S3-E3iznybdnX8mXSO7Wsuru7%3DP9Y%3DczQ%40mail.gmail.com#52e4f67645f26609427d5b1fc31fdf08
From 9aba47eea0f5a856d023fbc32ab61ec62cacf22a Mon Sep 17 00:00:00 2001 From: Soumya S Murali <[email protected]> Date: Mon, 15 Dec 2025 14:08:42 +0530 Subject: [PATCH] Add write-combining to checkpoint buffer writes. Signed-off-by: Soumya S Murali <[email protected]> --- src/backend/storage/buffer/bufmgr.c | 750 ++++++++++++++++------------ src/backend/storage/page/bufpage.c | 19 + src/include/storage/buf_internals.h | 22 + src/include/storage/bufmgr.h | 4 +- 4 files changed, 485 insertions(+), 310 deletions(-) diff --git a/src/backend/storage/buffer/bufmgr.c b/src/backend/storage/buffer/bufmgr.c index f373cead95f..2ea5311aed2 100644 --- a/src/backend/storage/buffer/bufmgr.c +++ b/src/backend/storage/buffer/bufmgr.c @@ -537,6 +537,12 @@ static void FlushUnlockedBuffer(BufferDesc *buf, SMgrRelation reln, IOObject io_object, IOContext io_context); static void FlushBuffer(BufferDesc *buf, SMgrRelation reln, IOObject io_object, IOContext io_context); +static bool PrepareBufferForCheckpoint(BufferDesc *bufdesc, + XLogRecPtr *lsn); +static bool BufferNeedsWALFlush(BufferDesc *bufdesc, XLogRecPtr *lsn); +static void CleanVictimBuffer(BufferDesc *bufdesc, bool from_ring, + IOContext io_context); +static int FlushBufferBatch(BufWriteBatch *batch, IOContext io_context); static void FindAndDropRelationBuffers(RelFileLocator rlocator, ForkNumber forkNum, BlockNumber nForkBlock, @@ -2331,34 +2337,35 @@ GetVictimBuffer(BufferAccessStrategy strategy, IOContext io_context) ReservePrivateRefCountEntry(); ResourceOwnerEnlarge(CurrentResourceOwner); - /* we return here if a prospective victim buffer gets used concurrently */ -again: + /* retry using loop instead of goto */ + for (;;) + { - /* - * Select a victim buffer. The buffer is returned pinned and owned by - * this backend. - */ - buf_hdr = StrategyGetBuffer(strategy, &buf_state, &from_ring); - buf = BufferDescriptorGetBuffer(buf_hdr); + /* + * Select a victim buffer. The buffer is returned pinned and owned by + * this backend. + */ + buf_hdr = StrategyGetBuffer(strategy, &buf_state, &from_ring); + buf = BufferDescriptorGetBuffer(buf_hdr); - /* - * We shouldn't have any other pins for this buffer. - */ - CheckBufferIsPinnedOnce(buf); + /* + * We shouldn't have any other pins for this buffer. + */ + CheckBufferIsPinnedOnce(buf); - /* - * If the buffer was dirty, try to write it out. There is a race - * condition here, in that someone might dirty it after we released the - * buffer header lock above, or even while we are writing it out (since - * our share-lock won't prevent hint-bit updates). We will recheck the - * dirty bit after re-locking the buffer header. - */ - if (buf_state & BM_DIRTY) - { - LWLock *content_lock; + /* + * If the buffer was dirty, try to write it out. There is a race + * condition here, in that someone might dirty it after we released the + * buffer header lock above, or even while we are writing it out (since + * our share-lock won't prevent hint-bit updates). We will recheck the + * dirty bit after re-locking the buffer header. + */ + if (buf_state & BM_DIRTY) + { + LWLock *content_lock; - Assert(buf_state & BM_TAG_VALID); - Assert(buf_state & BM_VALID); + Assert(buf_state & BM_TAG_VALID); + Assert(buf_state & BM_VALID); /* * We need a share-lock on the buffer contents to write it out (else @@ -2374,16 +2381,16 @@ again: * one just happens to be trying to split the page the first one got * from StrategyGetBuffer.) 
*/ - content_lock = BufferDescriptorGetContentLock(buf_hdr); - if (!LWLockConditionalAcquire(content_lock, LW_SHARED)) - { + content_lock = BufferDescriptorGetContentLock(buf_hdr); + if (!LWLockConditionalAcquire(content_lock, LW_SHARED)) + { /* * Someone else has locked the buffer, so give it up and loop back * to get another one. */ - UnpinBuffer(buf_hdr); - goto again; - } + UnpinBuffer(buf_hdr); + continue; + } /* * If using a nondefault strategy, and writing the buffer would @@ -2392,35 +2399,34 @@ again: * lock to inspect the page LSN, so this can't be done inside * StrategyGetBuffer. */ - if (strategy != NULL) - { - XLogRecPtr lsn; + if (strategy != NULL) + { + XLogRecPtr lsn; - /* Read the LSN while holding buffer header lock */ - buf_state = LockBufHdr(buf_hdr); - lsn = BufferGetLSN(buf_hdr); - UnlockBufHdr(buf_hdr); + /* Read the LSN while holding buffer header lock */ + buf_state = LockBufHdr(buf_hdr); + lsn = BufferGetLSN(buf_hdr); + UnlockBufHdr(buf_hdr); - if (XLogNeedsFlush(lsn) - && StrategyRejectBuffer(strategy, buf_hdr, from_ring)) - { - LWLockRelease(content_lock); - UnpinBuffer(buf_hdr); - goto again; + if (XLogNeedsFlush(lsn) + && StrategyRejectBuffer(strategy, buf_hdr, from_ring)) + { + LWLockRelease(content_lock); + UnpinBuffer(buf_hdr); + continue; + } } - } - /* OK, do the I/O */ - FlushBuffer(buf_hdr, NULL, IOOBJECT_RELATION, io_context); - LWLockRelease(content_lock); + /* OK, do the I/O */ + FlushBuffer(buf_hdr, NULL, IOOBJECT_RELATION, io_context); + LWLockRelease(content_lock); - ScheduleBufferTagForWriteback(&BackendWritebackContext, io_context, + ScheduleBufferTagForWriteback(&BackendWritebackContext, io_context, &buf_hdr->tag); - } - + } - if (buf_state & BM_VALID) - { + if (buf_state & BM_VALID) + { /* * When a BufferAccessStrategy is in use, blocks evicted from shared * buffers are counted as IOOP_EVICT in the corresponding context @@ -2437,32 +2443,33 @@ again: * we may have been forced to release the buffer due to concurrent * pinners or erroring out. */ - pgstat_count_io_op(IOOBJECT_RELATION, io_context, - from_ring ? IOOP_REUSE : IOOP_EVICT, 1, 0); - } + pgstat_count_io_op(IOOBJECT_RELATION, io_context, + from_ring ? IOOP_REUSE : IOOP_EVICT, 1, 0); + } - /* - * If the buffer has an entry in the buffer mapping table, delete it. This - * can fail because another backend could have pinned or dirtied the - * buffer. - */ - if ((buf_state & BM_TAG_VALID) && !InvalidateVictimBuffer(buf_hdr)) - { - UnpinBuffer(buf_hdr); - goto again; - } + /* + * If the buffer has an entry in the buffer mapping table, delete it. This + * can fail because another backend could have pinned or dirtied the + * buffer. 
+ */ + if ((buf_state & BM_TAG_VALID) && !InvalidateVictimBuffer(buf_hdr)) + { + UnpinBuffer(buf_hdr); + continue; + } - /* a final set of sanity checks */ + /* a final set of sanity checks */ #ifdef USE_ASSERT_CHECKING - buf_state = pg_atomic_read_u32(&buf_hdr->state); + buf_state = pg_atomic_read_u32(&buf_hdr->state); - Assert(BUF_STATE_GET_REFCOUNT(buf_state) == 1); - Assert(!(buf_state & (BM_TAG_VALID | BM_VALID | BM_DIRTY))); + Assert(BUF_STATE_GET_REFCOUNT(buf_state) == 1); + Assert(!(buf_state & (BM_TAG_VALID | BM_VALID | BM_DIRTY))); - CheckBufferIsPinnedOnce(buf); + CheckBufferIsPinnedOnce(buf); #endif - return buf; + return buf; + } /* end of for(;;) */ } /* @@ -3329,6 +3336,92 @@ TrackNewBufferPin(Buffer buf) #define ST_DEFINE #include "lib/sort_template.h" +static void +ResetBufWriteBatch(BufWriteBatch *batch) +{ + batch->n = 0; + batch->max_lsn = InvalidXLogRecPtr; + batch->reln = NULL; + WritebackContextInit(&batch->wb_context, &checkpoint_flush_after); +} + +/* + * SyncOneBuffer -- process a single buffer during syncing. + * + * If skip_recently_used is true, we don't write currently-pinned buffers, nor + * buffers marked recently used, as these are not replacement candidates. + * + * Returns a bitmask containing the following flag bits: + * BUF_WRITTEN: we wrote the buffer. + * BUF_REUSABLE: buffer is available for replacement, ie, it has + * pin count 0 and usage count 0. + * + * (BUF_WRITTEN could be set in error if FlushBuffer finds the buffer clean + * after locking it, but we don't care all that much.) + */ +static int +SyncOneBuffer(int buf_id, bool skip_recently_used, WritebackContext *wb_context) +{ + BufferDesc *bufHdr = GetBufferDescriptor(buf_id); + int result = 0; + uint32 buf_state; + BufferTag tag; + + /* Make sure we can handle the pin */ + ReservePrivateRefCountEntry(); + ResourceOwnerEnlarge(CurrentResourceOwner); + + /* + * Check whether buffer needs writing. + * + * We can make this check without taking the buffer content lock so long + * as we mark pages dirty in access methods *before* logging changes with + * XLogInsert(): if someone marks the buffer dirty just after our check we + * don't worry because our checkpoint.redo points before log record for + * upcoming changes and so we are not required to write such dirty buffer. + */ + buf_state = LockBufHdr(bufHdr); + + if (BUF_STATE_GET_REFCOUNT(buf_state) == 0 && + BUF_STATE_GET_USAGECOUNT(buf_state) == 0) + { + result |= BUF_REUSABLE; + } + else if (skip_recently_used) + { + /* Caller told us not to write recently-used buffers */ + UnlockBufHdr(bufHdr); + return result; + } + + if (!(buf_state & BM_VALID) || !(buf_state & BM_DIRTY)) + { + /* It's clean, so nothing to do */ + UnlockBufHdr(bufHdr); + return result; + } + + /* + * Pin it, share-lock it, write it. (FlushBuffer will do nothing if the + * buffer is clean by the time we've locked it.) + */ + PinBuffer_Locked(bufHdr); + + FlushUnlockedBuffer(bufHdr, NULL, IOOBJECT_RELATION, IOCONTEXT_NORMAL); + + tag = bufHdr->tag; + + UnpinBuffer(bufHdr); + + /* + * SyncOneBuffer() is only called by checkpointer and bgwriter, so + * IOContext will always be IOCONTEXT_NORMAL. + */ + ScheduleBufferTagForWriteback(wb_context, IOCONTEXT_NORMAL, &tag); + + return result | BUF_WRITTEN; +} + /* * BufferSync -- Write out all dirty buffers in the pool. 
* @@ -3344,144 +3437,81 @@ BufferSync(int flags) { uint32 buf_state; int buf_id; - int num_to_scan; - int num_spaces; - int num_processed; - int num_written; + int num_to_scan = 0; + int num_spaces = 0; + int num_processed = 0; + int num_written = 0; CkptTsStatus *per_ts_stat = NULL; Oid last_tsid; binaryheap *ts_heap; - int i; - uint32 mask = BM_DIRTY; WritebackContext wb_context; + uint32 mask = BM_DIRTY; + int i; - /* - * Unless this is a shutdown checkpoint or we have been explicitly told, - * we write only permanent, dirty buffers. But at shutdown or end of - * recovery, we write all dirty buffers. - */ - if (!((flags & (CHECKPOINT_IS_SHUTDOWN | CHECKPOINT_END_OF_RECOVERY | + /* Determine which buffers must be written */ + if (!((flags & (CHECKPOINT_IS_SHUTDOWN | + CHECKPOINT_END_OF_RECOVERY | CHECKPOINT_FLUSH_UNLOGGED)))) mask |= BM_PERMANENT; - /* - * Loop over all buffers, and mark the ones that need to be written with - * BM_CHECKPOINT_NEEDED. Count them as we go (num_to_scan), so that we - * can estimate how much work needs to be done. - * - * This allows us to write only those pages that were dirty when the - * checkpoint began, and not those that get dirtied while it proceeds. - * Whenever a page with BM_CHECKPOINT_NEEDED is written out, either by us - * later in this function, or by normal backends or the bgwriter cleaning - * scan, the flag is cleared. Any buffer dirtied after this point won't - * have the flag set. - * - * Note that if we fail to write some buffer, we may leave buffers with - * BM_CHECKPOINT_NEEDED still set. This is OK since any such buffer would - * certainly need to be written for the next checkpoint attempt, too. - */ - num_to_scan = 0; + /* identify dirty buffers at checkpoint start */ for (buf_id = 0; buf_id < NBuffers; buf_id++) { BufferDesc *bufHdr = GetBufferDescriptor(buf_id); uint32 set_bits = 0; - /* - * Header spinlock is enough to examine BM_DIRTY, see comment in - * SyncOneBuffer. - */ buf_state = LockBufHdr(bufHdr); if ((buf_state & mask) == mask) { - CkptSortItem *item; + CkptSortItem *item = &CkptBufferIds[num_to_scan++]; set_bits = BM_CHECKPOINT_NEEDED; - item = &CkptBufferIds[num_to_scan++]; - item->buf_id = buf_id; - item->tsId = bufHdr->tag.spcOid; - item->relNumber = BufTagGetRelNumber(&bufHdr->tag); - item->forkNum = BufTagGetForkNum(&bufHdr->tag); - item->blockNum = bufHdr->tag.blockNum; + item->buf_id = buf_id; + item->tsId = bufHdr->tag.spcOid; + item->relNumber = bufHdr->tag.relNumber; + item->forkNum = bufHdr->tag.forkNum; + item->blockNum = bufHdr->tag.blockNum; } UnlockBufHdrExt(bufHdr, buf_state, - set_bits, 0, - 0); + set_bits, 0, 0); - /* Check for barrier events in case NBuffers is large. */ if (ProcSignalBarrierPending) ProcessProcSignalBarrier(); } if (num_to_scan == 0) - return; /* nothing to do */ + return; WritebackContextInit(&wb_context, &checkpoint_flush_after); TRACE_POSTGRESQL_BUFFER_SYNC_START(NBuffers, num_to_scan); - /* - * Sort buffers that need to be written to reduce the likelihood of random - * IO. The sorting is also important for the implementation of balancing - * writes between tablespaces. Without balancing writes we'd potentially - * end up writing to the tablespaces one-by-one; possibly overloading the - * underlying system. - */ + /* Sort by tablespace */ sort_checkpoint_bufferids(CkptBufferIds, num_to_scan); - num_spaces = 0; - - /* - * Allocate progress status for each tablespace with buffers that need to - * be flushed. This requires the to-be-flushed array to be sorted. 
- */ + /* Build per-tablespace progress tracking */ last_tsid = InvalidOid; + for (i = 0; i < num_to_scan; i++) { + Oid cur_tsid = CkptBufferIds[i].tsId; CkptTsStatus *s; - Oid cur_tsid; - - cur_tsid = CkptBufferIds[i].tsId; - /* - * Grow array of per-tablespace status structs, every time a new - * tablespace is found. - */ - if (last_tsid == InvalidOid || last_tsid != cur_tsid) + if (last_tsid == InvalidOid || cur_tsid != last_tsid) { - Size sz; - num_spaces++; - - /* - * Not worth adding grow-by-power-of-2 logic here - even with a - * few hundred tablespaces this should be fine. - */ - sz = sizeof(CkptTsStatus) * num_spaces; - - if (per_ts_stat == NULL) - per_ts_stat = (CkptTsStatus *) palloc(sz); - else - per_ts_stat = (CkptTsStatus *) repalloc(per_ts_stat, sz); + per_ts_stat = (num_spaces == 1) + ? palloc(sizeof(CkptTsStatus)) + : repalloc(per_ts_stat, sizeof(CkptTsStatus) * num_spaces); s = &per_ts_stat[num_spaces - 1]; memset(s, 0, sizeof(*s)); - s->tsId = cur_tsid; - - /* - * The first buffer in this tablespace. As CkptBufferIds is sorted - * by tablespace all (s->num_to_scan) buffers in this tablespace - * will follow afterwards. - */ + s->tsId = cur_tsid; s->index = i; - /* - * progress_slice will be determined once we know how many buffers - * are in each tablespace, i.e. after this loop. - */ - last_tsid = cur_tsid; } else @@ -3491,117 +3521,132 @@ BufferSync(int flags) s->num_to_scan++; - /* Check for barrier events. */ if (ProcSignalBarrierPending) ProcessProcSignalBarrier(); } - Assert(num_spaces > 0); - - /* - * Build a min-heap over the write-progress in the individual tablespaces, - * and compute how large a portion of the total progress a single - * processed buffer is. - */ ts_heap = binaryheap_allocate(num_spaces, ts_ckpt_progress_comparator, NULL); for (i = 0; i < num_spaces; i++) { - CkptTsStatus *ts_stat = &per_ts_stat[i]; - - ts_stat->progress_slice = (float8) num_to_scan / ts_stat->num_to_scan; + CkptTsStatus *st = &per_ts_stat[i]; - binaryheap_add_unordered(ts_heap, PointerGetDatum(ts_stat)); + st->progress_slice = (float8) num_to_scan / st->num_to_scan; + binaryheap_add_unordered(ts_heap, PointerGetDatum(st)); } binaryheap_build(ts_heap); - /* - * Iterate through to-be-checkpointed buffers and write the ones (still) - * marked with BM_CHECKPOINT_NEEDED. The writes are balanced between - * tablespaces; otherwise the sorting would lead to only one tablespace - * receiving writes at a time, making inefficient use of the hardware. - */ - num_processed = 0; - num_written = 0; + /* write combining state */ + BufWriteBatch batch; + ResetBufWriteBatch(&batch); + while (!binaryheap_empty(ts_heap)) { - BufferDesc *bufHdr = NULL; - CkptTsStatus *ts_stat = (CkptTsStatus *) - DatumGetPointer(binaryheap_first(ts_heap)); + CkptTsStatus *ts_stat = + (CkptTsStatus *) DatumGetPointer(binaryheap_first(ts_heap)); buf_id = CkptBufferIds[ts_stat->index].buf_id; - Assert(buf_id != -1); - - bufHdr = GetBufferDescriptor(buf_id); + Assert(buf_id >= 0); + BufferDesc *bufHdr = GetBufferDescriptor(buf_id); num_processed++; /* - * We don't need to acquire the lock here, because we're only looking - * at a single bit. It's possible that someone else writes the buffer - * and clears the flag right after we check, but that doesn't matter - * since SyncOneBuffer will then do nothing. 
However, there is a - * further race condition: it's conceivable that between the time we - * examine the bit here and the time SyncOneBuffer acquires the lock, - * someone else not only wrote the buffer but replaced it with another - * page and dirtied it. In that improbable case, SyncOneBuffer will - * write the buffer though we didn't need to. It doesn't seem worth - * guarding against this, though. + * Write Combining: + * We accumulate buffers that still have BM_CHECKPOINT_NEEDED. */ if (pg_atomic_read_u32(&bufHdr->state) & BM_CHECKPOINT_NEEDED) { - if (SyncOneBuffer(buf_id, false, &wb_context) & BUF_WRITTEN) + + bool start_new; + BufferDesc *buf = bufHdr; + BufferTag tag = buf->tag; + + /* Extract locator once */ + RelFileLocator rlocator = BufTagGetRelFileLocator(&tag); + + start_new = false; + + if (batch.n == 0) + { + batch.rlocator = rlocator; + batch.forkno = tag.forkNum; + batch.start = tag.blockNum; + } + else { - TRACE_POSTGRESQL_BUFFER_SYNC_WRITTEN(buf_id); - PendingCheckpointerStats.buffers_written++; - num_written++; + bool same_rel = RelFileLocatorEquals(batch.rlocator, rlocator); + bool same_fork = (batch.forkno == tag.forkNum); + bool contiguous = (tag.blockNum == batch.start + batch.n); + + if (!(same_rel && same_fork && contiguous)) + start_new = true; + } + + if (start_new) + { + /* Flush accumulated boatch */ + int w = FlushBufferBatch(&batch, IOCONTEXT_NORMAL); + num_written += w; + PendingCheckpointerStats.buffers_written += w; + ResetBufWriteBatch(&batch); + + /* initialize new batch */ + batch.rlocator = BufTagGetRelFileLocator(&tag); + batch.forkno = tag.forkNum; + batch.start = tag.blockNum; + } + + /* Add buffer to batch */ + batch.bufdescs[batch.n++] = buf; + + /* Track max LSN */ + XLogRecPtr lsn; + BufferNeedsWALFlush(buf, &lsn); + if (lsn > batch.max_lsn) + batch.max_lsn = lsn; + + /* If full, flush immediately */ + if (batch.n == MAX_IO_COMBINE_LIMIT) + { + int w = FlushBufferBatch(&batch, IOCONTEXT_NORMAL); + num_written += w; + PendingCheckpointerStats.buffers_written += w; + ResetBufWriteBatch(&batch); } } - /* - * Measure progress independent of actually having to flush the buffer - * - otherwise writing become unbalanced. - */ + /* Progress accounting */ ts_stat->progress += ts_stat->progress_slice; ts_stat->num_scanned++; ts_stat->index++; - /* Have all the buffers from the tablespace been processed? */ if (ts_stat->num_scanned == ts_stat->num_to_scan) - { binaryheap_remove_first(ts_heap); - } else - { - /* update heap with the new progress */ binaryheap_replace_first(ts_heap, PointerGetDatum(ts_stat)); - } - /* - * Sleep to throttle our I/O rate. - * - * (This will check for barrier events even if it doesn't sleep.) - */ CheckpointWriteDelay(flags, (double) num_processed / num_to_scan); } - /* - * Issue all pending flushes. Only checkpointer calls BufferSync(), so - * IOContext will always be IOCONTEXT_NORMAL. - */ + /* Flush trailing batch */ + if (batch.n > 0) + { + int w = FlushBufferBatch(&batch, IOCONTEXT_NORMAL); + num_written += w; + PendingCheckpointerStats.buffers_written += w; + ResetBufWriteBatch(&batch); + } + + /* Complete writeback */ IssuePendingWritebacks(&wb_context, IOCONTEXT_NORMAL); pfree(per_ts_stat); - per_ts_stat = NULL; binaryheap_free(ts_heap); - /* - * Update checkpoint statistics. As noted above, this doesn't include - * buffers written by other backends or bgwriter scan. 
- */ CheckpointStats.ckpt_bufs_written += num_written; TRACE_POSTGRESQL_BUFFER_SYNC_DONE(NBuffers, num_written, num_to_scan); @@ -3786,6 +3831,7 @@ BgBufferSync(WritebackContext *wb_context) * a true average we want a fast-attack, slow-decline behavior: we * immediately follow any increase. */ + if (smoothed_alloc <= (float) recent_alloc) smoothed_alloc = recent_alloc; else @@ -3841,8 +3887,7 @@ BgBufferSync(WritebackContext *wb_context) /* Execute the LRU scan */ while (num_to_scan > 0 && reusable_buffers < upcoming_alloc_est) { - int sync_state = SyncOneBuffer(next_to_clean, true, - wb_context); + int sync_state = SyncOneBuffer(next_to_clean, true, wb_context); if (++next_to_clean >= NBuffers) { @@ -3902,83 +3947,6 @@ BgBufferSync(WritebackContext *wb_context) return (bufs_to_lap == 0 && recent_alloc == 0); } -/* - * SyncOneBuffer -- process a single buffer during syncing. - * - * If skip_recently_used is true, we don't write currently-pinned buffers, nor - * buffers marked recently used, as these are not replacement candidates. - * - * Returns a bitmask containing the following flag bits: - * BUF_WRITTEN: we wrote the buffer. - * BUF_REUSABLE: buffer is available for replacement, ie, it has - * pin count 0 and usage count 0. - * - * (BUF_WRITTEN could be set in error if FlushBuffer finds the buffer clean - * after locking it, but we don't care all that much.) - */ -static int -SyncOneBuffer(int buf_id, bool skip_recently_used, WritebackContext *wb_context) -{ - BufferDesc *bufHdr = GetBufferDescriptor(buf_id); - int result = 0; - uint32 buf_state; - BufferTag tag; - - /* Make sure we can handle the pin */ - ReservePrivateRefCountEntry(); - ResourceOwnerEnlarge(CurrentResourceOwner); - - /* - * Check whether buffer needs writing. - * - * We can make this check without taking the buffer content lock so long - * as we mark pages dirty in access methods *before* logging changes with - * XLogInsert(): if someone marks the buffer dirty just after our check we - * don't worry because our checkpoint.redo points before log record for - * upcoming changes and so we are not required to write such dirty buffer. - */ - buf_state = LockBufHdr(bufHdr); - - if (BUF_STATE_GET_REFCOUNT(buf_state) == 0 && - BUF_STATE_GET_USAGECOUNT(buf_state) == 0) - { - result |= BUF_REUSABLE; - } - else if (skip_recently_used) - { - /* Caller told us not to write recently-used buffers */ - UnlockBufHdr(bufHdr); - return result; - } - - if (!(buf_state & BM_VALID) || !(buf_state & BM_DIRTY)) - { - /* It's clean, so nothing to do */ - UnlockBufHdr(bufHdr); - return result; - } - - /* - * Pin it, share-lock it, write it. (FlushBuffer will do nothing if the - * buffer is clean by the time we've locked it.) - */ - PinBuffer_Locked(bufHdr); - - FlushUnlockedBuffer(bufHdr, NULL, IOOBJECT_RELATION, IOCONTEXT_NORMAL); - - tag = bufHdr->tag; - - UnpinBuffer(bufHdr); - - /* - * SyncOneBuffer() is only called by checkpointer and bgwriter, so - * IOContext will always be IOCONTEXT_NORMAL. - */ - ScheduleBufferTagForWriteback(wb_context, IOCONTEXT_NORMAL, &tag); - - return result | BUF_WRITTEN; -} - /* * AtEOXact_Buffers - clean up at end of transaction. * @@ -4412,6 +4380,169 @@ FlushBuffer(BufferDesc *buf, SMgrRelation reln, IOObject io_object, error_context_stack = errcallback.previous; } +/* + * BufferNeedsWALFlush + * + * Returns true if this buffer belongs to a permanent relation AND + * page LSN indicates - WAL must be flushed before writing it. + * + * If no flush is needed, *lsn is set to InvalidXLogRecPtr. 
+ */ +static bool +BufferNeedsWALFlush(BufferDesc *bufdesc, XLogRecPtr *lsn) +{ + uint32 buf_state; + + /* Lock header to read BM_PERMANENT and LSN safely */ + buf_state = LockBufHdr(bufdesc); + + /* Extract LSN only if it is a permanent buffer */ + if (buf_state & BM_PERMANENT) + *lsn = BufferGetLSN(bufdesc); + else + *lsn = InvalidXLogRecPtr; + + UnlockBufHdr(bufdesc); + + /* Skip all WAL flush logic if relation is not logged */ + if (!(*lsn != InvalidXLogRecPtr)) + return false; + + /* Must flush WAL up to this LSN before writing the page */ + return XLogNeedsFlush(*lsn); +} + +/* + * CleanVictimBuffer + * + * Called by checkpointer/bgwriter when selecting a buffer to flush in a + * write-combining batch. The buffer must already be pinned. + * + */ +static void +CleanVictimBuffer(BufferDesc *bufdesc, bool from_ring, IOContext io_context) +{ + XLogRecPtr max_lsn = InvalidXLogRecPtr; + LWLock *content_lock; + + /* + * Acquire share-lock on buffer contents. We must hold this until either: + * - we decide the buffer does not need flushing, OR + * - we finish preparing it for a write. + */ + content_lock = BufHdrGetContentLock(bufdesc); + LWLockAcquire(content_lock, LW_SHARED); + + /* + * Now the buffer is a valid flush target. + * Switch to exclusive lock for checksum + IO preparation. + */ + LWLockRelease(content_lock); + LWLockAcquire(content_lock, LW_EXCLUSIVE); + + /* + * Mark the buffer ready for checksum and write. + */ + PrepareBufferForCheckpoint(bufdesc, &max_lsn); + + /* Release exclusive lock; the batch will write the page later */ + LWLockRelease(content_lock); + + /* + * Add LSN to caller's batch tracking. + * Caller handles XLogFlush() using highest LSN. + */ + PrepareBufferForCheckpoint(bufdesc, max_lsn); +} + +/* + * FlushBufferBatch + * + * Write a batch of contiguous dirty buffers belonging to the same + * relation + fork, using a single smgrwritev() call. + * + * This is the minimal correct version: + * - prepare buffers (caller has already selected them) + * - flush WAL up to max_lsn + * - compute batch checksums + * - write using smgrwritev() + * - schedule writeback + */ +int +FlushBufferBatch(BufWriteBatch *batch, IOContext io_context) +{ + uint32 n = batch->n; + BlockNumber blknums[MAX_IO_COMBINE_LIMIT]; + Page pages[MAX_IO_COMBINE_LIMIT]; + instr_time io_start; + int written = 0; + + /* Nothing to do */ + if (n == 0) + return 0; + + /* Error context for cleaner messages on I/O failure */ + ErrorContextCallback errcallback; + errcallback.callback = shared_buffer_write_error_callback; + errcallback.previous = error_context_stack; + error_context_stack = &errcallback; + + /* Flush WAL if required */ + if (XLogRecPtrIsValid(batch->max_lsn)) + XLogFlush(batch->max_lsn); + + /* Open smgr relation if needed */ + if (batch->reln == NULL) + batch->reln = smgropen(batch->rlocator, INVALID_PROC_NUMBER); + + /* + * Copy buffer pages into local array and prepare block numbers. + * (Later patches may eliminate this copying.) 
+ */ + for (uint32 i = 0; i < n; i++) + { + BufferDesc *buf = batch->bufdescs[i]; + blknums[i] = batch->start + i; + pages[i] = (Page) BufHdrGetBlock(buf); + } + + /* Compute checksums over the batch */ + PageSetBatchChecksumInplace(pages, blknums, n); + + /* IO timing */ + io_start = pgstat_prepare_io_time(track_io_timing); + + /* Write all pages in one system call */ + smgrwritev(batch->reln, + batch->forkno, + batch->start, + (const void **) pages, + n, + false); + + pgstat_count_io_op_time(IOOBJECT_RELATION, + io_context, + IOOP_WRITE, + io_start, + n, + BLCKSZ); + + /* Schedule writeback for each buffer */ + for (uint32 i = 0; i < n; i++) + { + BufferDesc *buf = batch->bufdescs[i]; + ScheduleBufferTagForWriteback(&batch->wb_context, + io_context, + &buf->tag); + written++; + } + + /* restore error callback */ + error_context_stack = errcallback.previous; + + return written; +} + /* * Convenience wrapper around FlushBuffer() that locks/unlocks the buffer * before/after calling FlushBuffer(). @@ -5083,12 +5214,13 @@ FlushRelationsAllBuffers(SMgrRelation *smgrs, int nrels) } else { - RelFileLocator rlocator; + RelFileLocator rlocator = BufTagGetRelFileLocator(&bufHdr->tag); - rlocator = BufTagGetRelFileLocator(&bufHdr->tag); - srelent = bsearch(&rlocator, - srels, nrels, sizeof(SMgrSortArray), - rlocator_comparator); + srelent = bsearch(&rlocator, + srels, + nrels, + sizeof(SMgrSortArray), + rlocator_comparator); } /* buffer doesn't belong to any of the given relfilelocators; skip it */ diff --git a/src/backend/storage/page/bufpage.c b/src/backend/storage/page/bufpage.c index aac6e695954..408a4abc444 100644 --- a/src/backend/storage/page/bufpage.c +++ b/src/backend/storage/page/bufpage.c @@ -1546,3 +1546,22 @@ PageSetChecksumInplace(Page page, BlockNumber blkno) ((PageHeader) page)->pd_checksum = pg_checksum_page(page, blkno); } + +void +PageSetBatchChecksumInplace(Page *pages, const BlockNumber *blknos, uint32 length) +{ + /* If we don't need a checksum, just return */ + if (!DataChecksumsEnabled()) + return; + + for (uint32 i = 0; i < length; i++) + { + Page page = pages[i]; + + if (PageIsNew(page)) + continue; + + ((PageHeader) page)->pd_checksum = pg_checksum_page(page, blknos[i]); + } +} + diff --git a/src/include/storage/buf_internals.h b/src/include/storage/buf_internals.h index 5400c56a965..6a75233c1e6 100644 --- a/src/include/storage/buf_internals.h +++ b/src/include/storage/buf_internals.h @@ -482,6 +482,28 @@ ResourceOwnerForgetBufferIO(ResourceOwner owner, Buffer buffer) { ResourceOwnerForget(owner, Int32GetDatum(buffer), &buffer_io_resowner_desc); } +/* +* Used to write out multiple blocks at a time in a combined IO. bufdescs +* contains buffer descriptors for buffers containing adjacent blocks of the +* same fork of the same relation. 
+*/ +typedef struct BufWriteBatch +{ + RelFileLocator rlocator; + ForkNumber forkno; + SMgrRelation reln; + + BlockNumber start; + + XLogRecPtr max_lsn; + + WritebackContext wb_context; + + uint32 n; + BufferDesc *bufdescs[MAX_IO_COMBINE_LIMIT]; +} BufWriteBatch; + + /* * Internal buffer management routines diff --git a/src/include/storage/bufmgr.h b/src/include/storage/bufmgr.h index 9f6785910e0..f9e23838e48 100644 --- a/src/include/storage/bufmgr.h +++ b/src/include/storage/bufmgr.h @@ -223,7 +223,9 @@ extern Buffer ReadBufferWithoutRelcache(RelFileLocator rlocator, ForkNumber forkNum, BlockNumber blockNum, ReadBufferMode mode, BufferAccessStrategy strategy, bool permanent); - +extern void PageSetBatchChecksumInplace(Page *pages, + const BlockNumber *blknos, + uint32 length); extern bool StartReadBuffer(ReadBuffersOperation *operation, Buffer *buffer, BlockNumber blocknum, -- 2.34.1
