Hi,

Here are some vectored writeback patches I worked on in the 17 cycle
and posted as part of various patch sets, but didn't get into good
enough shape to take further. They "push" vectored writes out, but I
think what they need is to be turned inside out and converted into
users of a new, hypothetical write_stream.c, so that we have a model
that will survive contact with asynchronous I/O and would "pull" writes
from a stream that controls I/O concurrency. That all seemed a lot less
urgent to work on than reads, hence leaving it on ice for now. There is
a lot of code that reads, and a small, finite amount that writes. I
think the patches show some aspects of the problem space though, and
they certainly make checkpointing faster.

They cover 2 out of 5ish ways we write relation data: checkpointing,
and strategies AKA ring buffers.
They make checkpoints look like this, respecting io_combine_limit,
instead of lots of 8kB writes:

pwritev(9,[...],2,0x0) = 131072 (0x20000)
pwrite(9,...,131072,0x20000) = 131072 (0x20000)
pwrite(9,...,131072,0x40000) = 131072 (0x20000)
pwrite(9,...,131072,0x60000) = 131072 (0x20000)
pwrite(9,...,131072,0x80000) = 131072 (0x20000)
...

Two more ways data gets written back are bgwriter and regular
BAS_NORMAL buffer eviction, but they are not such natural candidates
for write combining. Well, if you know you're going to write out a
buffer, *maybe* it's worth probing the buffer pool to see if adjacent
block numbers are also present and dirty? Before and after? I don't
know. Or maybe it's better to wait for the tree-based mapping table of
legend first, so it becomes cheaper to navigate in block number order.

The 5th way is raw file copy that doesn't go through the buffer pool,
such as CREATE DATABASE ... STRATEGY=FILE_COPY, which already works
with big writes, and CREATE INDEX via bulk_write.c, which is easily
converted to vectored writes, and I plan to push the patches for that
shortly. I think those should ultimately become stream-based too.

Anyway, I wanted to share these uncommitfest patches, having rebased
them over relevant recent commits, so I could leave them in working
state in case anyone is interested in this file I/O-level stuff...
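For anyone who wants to see the shape of those system calls without reading the patches, here is a minimal standalone sketch (not PostgreSQL code) of sending several block-sized buffers that are scattered in memory to consecutive file positions with one pwritev() call. SKETCH_BLCKSZ, SKETCH_MAX_COMBINE and sketch_write_blocks() are made-up names for illustration only; in the patches the write goes through smgrwritev().

```c
#define _DEFAULT_SOURCE			/* exposes pwritev() on glibc in strict ISO C modes */
#include <assert.h>
#include <stdint.h>
#include <stdlib.h>
#include <string.h>
#include <sys/types.h>
#include <sys/uio.h>
#include <unistd.h>

#define SKETCH_BLCKSZ 8192		/* hypothetical stand-in for BLCKSZ */
#define SKETCH_MAX_COMBINE 32	/* hypothetical cap on blocks per write */

/*
 * Write nblocks block-sized buffers, which need not be adjacent in memory,
 * to consecutive block positions in fd starting at first_block, using a
 * single pwritev() call.  Returns whatever pwritev() returns.
 */
static ssize_t
sketch_write_blocks(int fd, void *const *blocks, int nblocks,
					uint32_t first_block)
{
	struct iovec iov[SKETCH_MAX_COMBINE];

	assert(nblocks > 0 && nblocks <= SKETCH_MAX_COMBINE);

	/* One iovec entry per block; the kernel gathers them into one write. */
	for (int i = 0; i < nblocks; ++i)
	{
		iov[i].iov_base = blocks[i];
		iov[i].iov_len = SKETCH_BLCKSZ;
	}

	return pwritev(fd, iov, nblocks, (off_t) first_block * SKETCH_BLCKSZ);
}
```

A caller would pass pointers to the pages (or to private copies of them) in block number order; a short write would still need to be retried by the caller, which this sketch leaves out.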
From c6d227678c586387a49c30c4f9a61f62c9b04b1c Mon Sep 17 00:00:00 2001
From: Thomas Munro <thomas.mu...@gmail.com>
Date: Wed, 13 Mar 2024 17:02:42 +1300
Subject: [PATCH v5 1/3] Provide vectored variant of FlushBuffer().

FlushBuffers() is just like FlushBuffer() except that it tries to write
out multiple consecutive disk blocks at a time with smgrwritev(). The
traditional simple FlushBuffer() call is now implemented in terms of it.
---
 src/backend/storage/buffer/bufmgr.c | 240 ++++++++++++++++++++--------
 src/include/storage/buf_internals.h |  10 ++
 2 files changed, 185 insertions(+), 65 deletions(-)

diff --git a/src/backend/storage/buffer/bufmgr.c b/src/backend/storage/buffer/bufmgr.c
index 06e9ffd2b00..a815e58e307 100644
--- a/src/backend/storage/buffer/bufmgr.c
+++ b/src/backend/storage/buffer/bufmgr.c
@@ -57,6 +57,7 @@
 #include "storage/smgr.h"
 #include "storage/standby.h"
 #include "utils/memdebug.h"
+#include "utils/memutils.h"
 #include "utils/ps_status.h"
 #include "utils/rel.h"
 #include "utils/resowner.h"
@@ -526,6 +527,8 @@ static inline BufferDesc *BufferAlloc(SMgrRelation smgr,
 static Buffer GetVictimBuffer(BufferAccessStrategy strategy, IOContext io_context);
 static void FlushBuffer(BufferDesc *buf, SMgrRelation reln,
 						IOObject io_object, IOContext io_context);
+static int	FlushBuffers(BufferDesc **bufs, int nbuffers, SMgrRelation reln,
+						 IOObject io_object, IOContext io_context);
 static void FindAndDropRelationBuffers(RelFileLocator rlocator,
 									   ForkNumber forkNum,
 									   BlockNumber nForkBlock,
@@ -3705,9 +3708,19 @@ BufferGetTag(Buffer buffer, RelFileLocator *rlocator, ForkNumber *forknum,
 	*blknum = bufHdr->tag.blockNum;
 }

+struct shared_buffer_write_error_info
+{
+	BufferDesc *buf;
+	int			nblocks;
+};
+
 /*
- * FlushBuffer
- *		Physically write out a shared buffer.
+ * FlushBuffers
+ *		Physically write out shared buffers.
+ *
+ * The buffers do not have to be consecutive in memory but must refer to
+ * consecutive blocks of the same relation fork in increasing order of block
+ * number.
  *
  * NOTE: this actually just passes the buffer contents to the kernel; the
  * real write to disk won't happen until the kernel feels like it. This
@@ -3715,66 +3728,149 @@ BufferGetTag(Buffer buffer, RelFileLocator *rlocator, ForkNumber *forknum,
  * However, we will need to force the changes to disk via fsync before
  * we can checkpoint WAL.
  *
- * The caller must hold a pin on the buffer and have share-locked the
+ * The caller must hold a pin on the buffers and have share-locked the
  * buffer contents. (Note: a share-lock does not prevent updates of
  * hint bits in the buffer, so the page could change while the write
  * is in progress, but we assume that that will not invalidate the data
  * written.)
  *
  * If the caller has an smgr reference for the buffer's relation, pass it
- * as the second parameter. If not, pass NULL.
+ * as the third parameter. If not, pass NULL.
  */
-static void
-FlushBuffer(BufferDesc *buf, SMgrRelation reln, IOObject io_object,
-			IOContext io_context)
+static int
+FlushBuffers(BufferDesc **bufs, int nbuffers, SMgrRelation reln,
+			 IOObject io_object, IOContext io_context)
 {
-	XLogRecPtr	recptr;
+	XLogRecPtr	max_recptr;
+	struct shared_buffer_write_error_info errinfo;
 	ErrorContextCallback errcallback;
 	instr_time	io_start;
-	Block		bufBlock;
-	char	   *bufToWrite;
-	uint32		buf_state;
+	int			first_start_io_index;
+	void	   *bufBlocks[MAX_IO_COMBINE_LIMIT];
+	bool		need_checksums = DataChecksumsEnabled();
+	static PGIOAlignedBlock *copies;
+
+	Assert(nbuffers > 0);
+	nbuffers = Min(nbuffers, io_combine_limit);

 	/*
-	 * Try to start an I/O operation. If StartBufferIO returns false, then
-	 * someone else flushed the buffer before we could, so we need not do
-	 * anything.
+	 * Update page checksums if desired. Since we have only shared lock on
+	 * the buffer, other processes might be updating hint bits in it, so we
+	 * must copy the page to private storage if we do checksumming.
+	 *
+	 * XXX:TODO kill static local memory, or better yet, kill unlocked hint
+	 * bit modifications so we don't need this
 	 */
-	if (!StartBufferIO(buf, false, false))
-		return;
+	if (need_checksums)
+	{
+		if (!copies)
+			copies = MemoryContextAllocAligned(TopMemoryContext,
+											   BLCKSZ * MAX_IO_COMBINE_LIMIT,
+											   PG_IO_ALIGN_SIZE,
+											   0);
+		for (int i = 0; i < nbuffers; ++i)
+		{
+			memcpy(&copies[i], BufHdrGetBlock(bufs[i]), BLCKSZ);
+			PageSetChecksumInplace((Page) &copies[i],
+								   bufs[0]->tag.blockNum + i);
+		}
+	}
+
+	/*
+	 * Try to start an I/O operation on as many buffers as we can. If
+	 * StartBufferIO returns false, then someone else flushed the buffer
+	 * before we could, so we need not do anything and we give up on any
+	 * remaining buffers, as they are not consecutive with the blocks we've
+	 * collected.
+	 */
+	first_start_io_index = -1;
+	for (int i = 0; i < nbuffers; ++i)
+	{
+		/* Must be consecutive blocks. */
+		if (i > 0)
+			Assert(BufferTagsConsecutive(&bufs[i - 1]->tag, &bufs[i]->tag));
+
+		/* Only wait for the first one, so we can guarantee progress. */
+		if (!StartBufferIO(bufs[i], false, i > 0))
+		{
+			if (first_start_io_index >= 0)
+			{
+				/*
+				 * We can't go any further because later blocks after this gap
+				 * would not be consecutive. Cap nbuffers here.
+				 */
+				nbuffers = i;
+				break;
+			}
+			else
+			{
+				/*
+				 * Still searching for the first block we can win the right to
+				 * start I/O on, so it doesn't matter that we couldn't get
+				 * this one.
+				 */
+			}
+		}
+		else
+		{
+			/* Keep track of the first block we started I/O on. */
+			if (first_start_io_index < 0)
+				first_start_io_index = i;
+		}
+		/* Collect the source buffers (copies or shared buffers). */
+		bufBlocks[i] = need_checksums ? &copies[i] : BufHdrGetBlock(bufs[i]);
+	}
+
+	/* If we can't write even one buffer, then we're done. */
+	if (first_start_io_index < 0)
+		return nbuffers;

 	/* Setup error traceback support for ereport() */
+	errinfo.buf = bufs[first_start_io_index];
+	errinfo.nblocks = nbuffers - first_start_io_index;
 	errcallback.callback = shared_buffer_write_error_callback;
-	errcallback.arg = (void *) buf;
+	errcallback.arg = &errinfo;
 	errcallback.previous = error_context_stack;
 	error_context_stack = &errcallback;

 	/* Find smgr relation for buffer */
 	if (reln == NULL)
-		reln = smgropen(BufTagGetRelFileLocator(&buf->tag), INVALID_PROC_NUMBER);
+		reln = smgropen(BufTagGetRelFileLocator(&bufs[first_start_io_index]->tag),
+						INVALID_PROC_NUMBER);

-	TRACE_POSTGRESQL_BUFFER_FLUSH_START(BufTagGetForkNum(&buf->tag),
-										buf->tag.blockNum,
+	TRACE_POSTGRESQL_BUFFER_FLUSH_START(BufTagGetForkNum(&bufs[first_start_io_index]->tag),
+										bufs[first_start_io_index]->tag.blockNum,
 										reln->smgr_rlocator.locator.spcOid,
 										reln->smgr_rlocator.locator.dbOid,
 										reln->smgr_rlocator.locator.relNumber);

-	buf_state = LockBufHdr(buf);
+	/* Find the highest LSN across all the I/O-started buffers. */
+	max_recptr = 0;
+	for (int i = first_start_io_index; i < nbuffers; ++i)
+	{
+		XLogRecPtr	page_recptr;
+		uint32		buf_state;

-	/*
-	 * Run PageGetLSN while holding header lock, since we don't have the
-	 * buffer locked exclusively in all cases.
-	 */
-	recptr = BufferGetLSN(buf);
+		buf_state = LockBufHdr(bufs[i]);

-	/* To check if block content changes while flushing. - vadim 01/17/97 */
-	buf_state &= ~BM_JUST_DIRTIED;
-	UnlockBufHdr(buf, buf_state);
+		/*
+		 * Run PageGetLSN while holding header lock, since we don't have the
+		 * buffer locked exclusively in all cases.
+		 */
+		page_recptr = BufferGetLSN(bufs[i]);
+
+		/* To check if block content changes while flushing. - vadim 01/17/97 */
+		buf_state &= ~BM_JUST_DIRTIED;
+		UnlockBufHdr(bufs[i], buf_state);
+
+		if (buf_state & BM_PERMANENT)
+			max_recptr = Max(page_recptr, max_recptr);
+	}

 	/*
-	 * Force XLOG flush up to buffer's LSN. This implements the basic WAL
-	 * rule that log updates must hit disk before any of the data-file changes
-	 * they describe do.
+	 * Force XLOG flush up to buffers' greatest LSN. This implements the
+	 * basic WAL rule that log updates must hit disk before any of the
+	 * data-file changes they describe do.
 	 *
 	 * However, this rule does not apply to unlogged relations, which will be
 	 * lost after a crash anyway. Most unlogged relation pages do not bear
@@ -3788,33 +3884,23 @@ FlushBuffer(BufferDesc *buf, SMgrRelation reln, IOObject io_object,
 	 * disastrous system-wide consequences. To make sure that can't happen,
 	 * skip the flush if the buffer isn't permanent.
 	 */
-	if (buf_state & BM_PERMANENT)
-		XLogFlush(recptr);
+	if (max_recptr > 0)
+		XLogFlush(max_recptr);

 	/*
-	 * Now it's safe to write buffer to disk. Note that no one else should
+	 * Now it's safe to write buffers to disk. Note that no one else should
 	 * have been able to write it while we were busy with log flushing because
-	 * only one process at a time can set the BM_IO_IN_PROGRESS bit.
+	 * only one process at a time can set the BM_IO_IN_PROGRESS bits.
 	 */
-	bufBlock = BufHdrGetBlock(buf);
-
-	/*
-	 * Update page checksum if desired. Since we have only shared lock on the
-	 * buffer, other processes might be updating hint bits in it, so we must
-	 * copy the page to private storage if we do checksumming.
-	 */
-	bufToWrite = PageSetChecksumCopy((Page) bufBlock, buf->tag.blockNum);

 	io_start = pgstat_prepare_io_time(track_io_timing);

-	/*
-	 * bufToWrite is either the shared buffer or a copy, as appropriate.
-	 */
-	smgrwrite(reln,
-			  BufTagGetForkNum(&buf->tag),
-			  buf->tag.blockNum,
-			  bufToWrite,
-			  false);
+	smgrwritev(reln,
+			   BufTagGetForkNum(&bufs[first_start_io_index]->tag),
+			   bufs[first_start_io_index]->tag.blockNum,
+			   (const void **) &bufBlocks[first_start_io_index],
+			   nbuffers - first_start_io_index,
+			   false);

 	/*
 	 * When a strategy is in use, only flushes of dirty buffers already in the
@@ -3835,24 +3921,41 @@ FlushBuffer(BufferDesc *buf, SMgrRelation reln, IOObject io_object,
 	 * of a dirty shared buffer (IOCONTEXT_NORMAL IOOP_WRITE).
 	 */
 	pgstat_count_io_op_time(IOOBJECT_RELATION, io_context,
-							IOOP_WRITE, io_start, 1);
+							IOOP_WRITE, io_start,
+							nbuffers - first_start_io_index);

-	pgBufferUsage.shared_blks_written++;
+	pgBufferUsage.shared_blks_written += nbuffers - first_start_io_index;

 	/*
-	 * Mark the buffer as clean (unless BM_JUST_DIRTIED has become set) and
+	 * Mark the buffers as clean (unless BM_JUST_DIRTIED has become set) and
 	 * end the BM_IO_IN_PROGRESS state.
 	 */
-	TerminateBufferIO(buf, true, 0, true);
+	for (int i = first_start_io_index; i < nbuffers; ++i)
+		TerminateBufferIO(bufs[i], true, 0, true);

-	TRACE_POSTGRESQL_BUFFER_FLUSH_DONE(BufTagGetForkNum(&buf->tag),
-									   buf->tag.blockNum,
+	TRACE_POSTGRESQL_BUFFER_FLUSH_DONE(BufTagGetForkNum(&bufs[first_start_io_index]->tag),
+									   bufs[first_start_io_index]->tag.blockNum,
 									   reln->smgr_rlocator.locator.spcOid,
 									   reln->smgr_rlocator.locator.dbOid,
 									   reln->smgr_rlocator.locator.relNumber);

 	/* Pop the error context stack */
 	error_context_stack = errcallback.previous;
+
+	return nbuffers;
+}
+
+/*
+ * FlushBuffer
+ *		Physically write out just one shared buffer.
+ *
+ * Single-block variant of FlushBuffers().
+ */
+static void
+FlushBuffer(BufferDesc *buf, SMgrRelation reln, IOObject io_object,
+			IOContext io_context)
+{
+	FlushBuffers(&buf, 1, reln, io_object, io_context);
 }

 /*
@@ -5621,16 +5724,23 @@ AbortBufferIO(Buffer buffer)
 static void
 shared_buffer_write_error_callback(void *arg)
 {
-	BufferDesc *bufHdr = (BufferDesc *) arg;
+	struct shared_buffer_write_error_info *info = arg;

 	/* Buffer is pinned, so we can read the tag without locking the spinlock */
-	if (bufHdr != NULL)
+	if (info != NULL)
 	{
-		char	   *path = relpathperm(BufTagGetRelFileLocator(&bufHdr->tag),
-									   BufTagGetForkNum(&bufHdr->tag));
-
-		errcontext("writing block %u of relation %s",
-				   bufHdr->tag.blockNum, path);
+		char	   *path = relpathperm(BufTagGetRelFileLocator(&info->buf->tag),
+									   BufTagGetForkNum(&info->buf->tag));
+
+		if (info->nblocks > 1)
+			errcontext("writing blocks %u..%u of relation %s",
+					   info->buf->tag.blockNum,
+					   info->buf->tag.blockNum + info->nblocks - 1,
+					   path);
+		else
+			errcontext("writing block %u of relation %s",
+					   info->buf->tag.blockNum,
+					   path);
 		pfree(path);
 	}
 }
diff --git a/src/include/storage/buf_internals.h b/src/include/storage/buf_internals.h
index f190e6e5e46..9e3a3fbd887 100644
--- a/src/include/storage/buf_internals.h
+++ b/src/include/storage/buf_internals.h
@@ -160,6 +160,16 @@ BufferTagsEqual(const BufferTag *tag1, const BufferTag *tag2)
 		(tag1->forkNum == tag2->forkNum);
 }

+static inline bool
+BufferTagsConsecutive(const BufferTag *tag1, const BufferTag *tag2)
+{
+	return (tag1->spcOid == tag2->spcOid) &&
+		(tag1->dbOid == tag2->dbOid) &&
+		(tag1->relNumber == tag2->relNumber) &&
+		(tag1->forkNum == tag2->forkNum) &&
+		(tag1->blockNum + 1 == tag2->blockNum);
+}
+
 static inline bool
 BufTagMatchesRelFileLocator(const BufferTag *tag,
 							const RelFileLocator *rlocator)
-- 
2.44.0
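The part of FlushBuffers() that decides which sub-range actually gets written is easy to lose in the diff, so here is a small standalone sketch of just that rule. It is only a model: the can_start[] array stands in for the result of calling StartBufferIO() on each buffer, and sketch_choose_io_range() is a made-up name, not something in the patch.

```c
#include <assert.h>
#include <stdbool.h>

/*
 * Model of the range selection in FlushBuffers().  can_start[i] says whether
 * we would win the right to start I/O on buffer i.  Leading buffers we can't
 * start I/O on are skipped; after the first success, the first failure ends
 * the range, because blocks beyond the gap would not be consecutive.
 *
 * Sets *first to the index of the first buffer to write, or -1 if none, and
 * returns the (possibly reduced) end of the range, exclusive.
 */
static int
sketch_choose_io_range(const bool *can_start, int nbuffers, int *first)
{
	assert(nbuffers > 0);

	*first = -1;
	for (int i = 0; i < nbuffers; ++i)
	{
		if (!can_start[i])
		{
			if (*first >= 0)
			{
				/* Gap after the range we've started: stop here. */
				nbuffers = i;
				break;
			}
			/* Otherwise still looking for the first startable buffer. */
		}
		else if (*first < 0)
			*first = i;
	}

	return nbuffers;
}
```

With can_start = {false, false, true, true, false, true} this selects buffers 2 and 3 only: *first is 2 and the return value is 4, so the caller writes (return value - *first) blocks and then comes back for the rest.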
From 3446e14fae9539184b7eb93036cad86739254269 Mon Sep 17 00:00:00 2001
From: Thomas Munro <thomas.mu...@gmail.com>
Date: Wed, 13 Mar 2024 17:41:16 +1300
Subject: [PATCH v5 2/3] Use vectored writes in checkpointer.

Use FlushBuffers() instead of FlushBuffer(). This often produces wide
vectored writes, since the checkpointer already has all the dirty
buffers sorted by block number.

XXX The spreading isn't right. We nap 16 times and then do a 16-block
write. Should we nap once per write, or use longer naps after
multi-block writes?

XXX This is "push" based. To adopt streaming I/O thinking and drive
asynchronous I/O and direct I/O well, what we need is a "pull" based
approach where I/O completions cause us to start more writes, so we
remain in control of the I/O depth.
---
 src/backend/storage/buffer/bufmgr.c | 163 ++++++++++++++++++++++++----
 src/include/storage/buf_internals.h |   4 +-
 2 files changed, 147 insertions(+), 20 deletions(-)

diff --git a/src/backend/storage/buffer/bufmgr.c b/src/backend/storage/buffer/bufmgr.c
index a815e58e307..073088ab938 100644
--- a/src/backend/storage/buffer/bufmgr.c
+++ b/src/backend/storage/buffer/bufmgr.c
@@ -528,7 +528,8 @@ static Buffer GetVictimBuffer(BufferAccessStrategy strategy, IOContext io_contex
 static void FlushBuffer(BufferDesc *buf, SMgrRelation reln,
 						IOObject io_object, IOContext io_context);
 static int	FlushBuffers(BufferDesc **bufs, int nbuffers, SMgrRelation reln,
-						 IOObject io_object, IOContext io_context);
+						 IOObject io_object, IOContext io_context,
+						 int *first_written_index, int *written);
 static void FindAndDropRelationBuffers(RelFileLocator rlocator,
 									   ForkNumber forkNum,
 									   BlockNumber nForkBlock,
@@ -1995,7 +1996,7 @@ again:
 		LWLockRelease(content_lock);

 		ScheduleBufferTagForWriteback(&BackendWritebackContext, io_context,
-									  &buf_hdr->tag);
+									  &buf_hdr->tag, 1);
 	}


@@ -2844,6 +2845,72 @@ UnpinBufferNoOwner(BufferDesc *buf)
 #define ST_DEFINE
 #include <lib/sort_template.h>

+/*
+ * Flush a range of already pinned buffers that hold consecutive blocks of a
+ * relation fork. They are not pinned on return. Returns the number that
+ * were written out (if this is less than nbuffers, it is because another
+ * backend already wrote some out).
+ */
+static int
+SyncBuffers(BufferDesc **bufs, int nbuffers,
+			WritebackContext *wb_context)
+{
+	int			total_written = 0;
+
+	while (nbuffers > 0)
+	{
+		int			nlocked;
+
+		/* Lock first buffer. */
+		LWLockAcquire(BufferDescriptorGetContentLock(bufs[0]), LW_SHARED);
+		nlocked = 1;
+
+		/* Lock as many more as we can without waiting, to avoid deadlocks. */
+		while (nlocked < nbuffers &&
+			   LWLockConditionalAcquire(BufferDescriptorGetContentLock(bufs[nlocked]),
+										LW_SHARED))
+			nlocked++;
+
+		while (nlocked > 0)
+		{
+			int			flushed;
+			int			written;
+			int			first_written_index;
+
+			/*
+			 * Flush as many as we can with a single write, which may be fewer
+			 * than requested if buffers in this range turn out to have been
+			 * flushed already, creating gaps between flushable block ranges.
+			 */
+			flushed = FlushBuffers(bufs, nlocked, NULL, IOOBJECT_RELATION,
+								   IOCONTEXT_NORMAL,
+								   &first_written_index, &written);
+			total_written += written;
+
+			/* Unlock in reverse order (currently more efficient). */
+			for (int i = flushed - 1; i >= 0; --i)
+				LWLockRelease(BufferDescriptorGetContentLock(bufs[i]));
+
+			/* Queue writeback control. */
+			if (written > 0)
+				ScheduleBufferTagForWriteback(wb_context,
+											  IOCONTEXT_NORMAL,
+											  &bufs[first_written_index]->tag,
+											  written);
+
+			/* Unpin. */
+			for (int i = 0; i < flushed; ++i)
+				UnpinBuffer(bufs[i]);
+
+			bufs += flushed;
+			nlocked -= flushed;
+			nbuffers -= flushed;
+		}
+	}
+
+	return total_written;
+}
+
 /*
  * BufferSync -- Write out all dirty buffers in the pool.
  *
@@ -2869,6 +2936,8 @@ BufferSync(int flags)
 	int			i;
 	int			mask = BM_DIRTY;
 	WritebackContext wb_context;
+	BufferDesc *range_buffers[MAX_IO_COMBINE_LIMIT];
+	int			range_nblocks = 0;

 	/*
 	 * Unless this is a shutdown checkpoint or we have been explicitly told,
@@ -3066,11 +3135,40 @@ BufferSync(int flags)
 		 */
 		if (pg_atomic_read_u32(&bufHdr->state) & BM_CHECKPOINT_NEEDED)
 		{
-			if (SyncOneBuffer(buf_id, false, &wb_context) & BUF_WRITTEN)
+			ResourceOwnerEnlarge(CurrentResourceOwner);
+			ReservePrivateRefCountEntry();
+
+			buf_state = LockBufHdr(bufHdr);
+			if (!(buf_state & BM_VALID) || !(buf_state & BM_DIRTY))
 			{
-				TRACE_POSTGRESQL_BUFFER_SYNC_WRITTEN(buf_id);
-				PendingCheckpointerStats.buffers_written++;
-				num_written++;
+				/* It's clean, so nothing to do */
+				UnlockBufHdr(bufHdr, buf_state);
+			}
+			else
+			{
+				PinBuffer_Locked(bufHdr);
+
+				/*
+				 * Check if we need to sync the range of buffers we've
+				 * collected so far, because we've collected enough already or
+				 * because our newly pinned buffer is not consecutive with the
+				 * last one.
+				 */
+				if (range_nblocks == io_combine_limit ||
+					(range_nblocks > 0 &&
+					 !BufferTagsConsecutive(&range_buffers[range_nblocks - 1]->tag,
+											&bufHdr->tag)))
+				{
+					int			written;
+
+					written = SyncBuffers(range_buffers, range_nblocks, &wb_context);
+					range_nblocks = 0;
+					num_written += written;
+					PendingCheckpointerStats.buffers_written += written;
+				}
+
+				/* Collect this pinned buffer in our range. */
+				range_buffers[range_nblocks++] = bufHdr;
 			}
 		}

@@ -3101,6 +3199,16 @@ BufferSync(int flags)
 		CheckpointWriteDelay(flags, (double) num_processed / num_to_scan);
 	}

+	/* Sync our final range. */
+	if (range_nblocks > 0)
+	{
+		int			written;
+
+		written = SyncBuffers(range_buffers, range_nblocks, &wb_context);
+		num_written += written;
+		PendingCheckpointerStats.buffers_written += written;
+	}
+
 	/*
 	 * Issue all pending flushes. Only checkpointer calls BufferSync(), so
 	 * IOContext will always be IOCONTEXT_NORMAL.
@@ -3490,7 +3598,7 @@ SyncOneBuffer(int buf_id, bool skip_recently_used, WritebackContext *wb_context)
 	 * SyncOneBuffer() is only called by checkpointer and bgwriter, so
 	 * IOContext will always be IOCONTEXT_NORMAL.
 	 */
-	ScheduleBufferTagForWriteback(wb_context, IOCONTEXT_NORMAL, &tag);
+	ScheduleBufferTagForWriteback(wb_context, IOCONTEXT_NORMAL, &tag, 1);

 	return result | BUF_WRITTEN;
 }
@@ -3739,7 +3847,8 @@ struct shared_buffer_write_error_info
  */
 static int
 FlushBuffers(BufferDesc **bufs, int nbuffers, SMgrRelation reln,
-			 IOObject io_object, IOContext io_context)
+			 IOObject io_object, IOContext io_context,
+			 int *first_written_index, int *written)
 {
 	XLogRecPtr	max_recptr;
 	struct shared_buffer_write_error_info errinfo;
@@ -3823,7 +3932,13 @@ FlushBuffers(BufferDesc **bufs, int nbuffers, SMgrRelation reln,

 	/* If we can't write even one buffer, then we're done. */
 	if (first_start_io_index < 0)
+	{
+		if (first_written_index)
+			*first_written_index = 0;
+		if (written)
+			*written = 0;
 		return nbuffers;
+	}

 	/* Setup error traceback support for ereport() */
 	errinfo.buf = bufs[first_start_io_index];
@@ -3942,6 +4057,12 @@ FlushBuffers(BufferDesc **bufs, int nbuffers, SMgrRelation reln,
 	/* Pop the error context stack */
 	error_context_stack = errcallback.previous;

+	/* Report the range of buffers that we actually wrote, if any. */
+	if (first_written_index)
+		*first_written_index = first_start_io_index;
+	if (written)
+		*written = nbuffers - first_start_io_index;
+
 	return nbuffers;
 }

@@ -3955,7 +4076,7 @@ static void
 FlushBuffer(BufferDesc *buf, SMgrRelation reln, IOObject io_object,
 			IOContext io_context)
 {
-	FlushBuffers(&buf, 1, reln, io_object, io_context);
+	FlushBuffers(&buf, 1, reln, io_object, io_context, NULL, NULL);
 }

 /*
@@ -5947,11 +6068,11 @@ WritebackContextInit(WritebackContext *context, int *max_pending)
 }

 /*
- * Add buffer to list of pending writeback requests.
+ * Add buffer tag range to list of pending writeback requests.
  */
 void
 ScheduleBufferTagForWriteback(WritebackContext *wb_context, IOContext io_context,
-							  BufferTag *tag)
+							  BufferTag *tag, int nblocks)
 {
 	PendingWriteback *pending;

@@ -5969,6 +6090,7 @@ ScheduleBufferTagForWriteback(WritebackContext *wb_context, IOContext io_context
 		pending = &wb_context->pending_writebacks[wb_context->nr_pending++];

 		pending->tag = *tag;
+		pending->nblocks = nblocks;
 	}

 	/*
@@ -6025,10 +6147,11 @@ IssuePendingWritebacks(WritebackContext *wb_context, IOContext io_context)
 		int			ahead;
 		BufferTag	tag;
 		RelFileLocator currlocator;
-		Size		nblocks = 1;
+		Size		nblocks;

 		cur = &wb_context->pending_writebacks[i];
 		tag = cur->tag;
+		nblocks = cur->nblocks;
 		currlocator = BufTagGetRelFileLocator(&tag);

 		/*
@@ -6037,6 +6160,8 @@ IssuePendingWritebacks(WritebackContext *wb_context, IOContext io_context)
 		 */
 		for (ahead = 0; i + ahead + 1 < wb_context->nr_pending; ahead++)
 		{
+			BlockNumber this_end;
+			BlockNumber next_end;

 			next = &wb_context->pending_writebacks[i + ahead + 1];

@@ -6046,15 +6171,15 @@ IssuePendingWritebacks(WritebackContext *wb_context, IOContext io_context)
 				BufTagGetForkNum(&cur->tag) != BufTagGetForkNum(&next->tag))
 				break;

-			/* ok, block queued twice, skip */
-			if (cur->tag.blockNum == next->tag.blockNum)
-				continue;
-
-			/* only merge consecutive writes */
-			if (cur->tag.blockNum + 1 != next->tag.blockNum)
+			/* only merge consecutive or overlapping writes */
+			if (next->tag.blockNum > tag.blockNum + nblocks)
 				break;

-			nblocks++;
+			/* find the nblocks value that covers the end of both */
+			this_end = tag.blockNum + nblocks;
+			next_end = next->tag.blockNum + next->nblocks;
+			nblocks = Max(this_end, next_end) - tag.blockNum;
+
 			cur = next;
 		}

diff --git a/src/include/storage/buf_internals.h b/src/include/storage/buf_internals.h
index 9e3a3fbd887..93822dd8407 100644
--- a/src/include/storage/buf_internals.h
+++ b/src/include/storage/buf_internals.h
@@ -301,6 +301,7 @@ typedef struct PendingWriteback
 {
 	/* could store different types of pending flushes here */
 	BufferTag	tag;
+	int			nblocks;
 } PendingWriteback;

 /* struct forward declared in bufmgr.h */
@@ -427,7 +428,8 @@ ResourceOwnerForgetBufferIO(ResourceOwner owner, Buffer buffer)
 extern void WritebackContextInit(WritebackContext *context, int *max_pending);
 extern void IssuePendingWritebacks(WritebackContext *wb_context, IOContext io_context);
 extern void ScheduleBufferTagForWriteback(WritebackContext *wb_context,
-										  IOContext io_context, BufferTag *tag);
+										  IOContext io_context, BufferTag *tag,
+										  int nblocks);

 /* freelist.c */
 extern IOContext IOContextForStrategy(BufferAccessStrategy strategy);
-- 
2.44.0
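Since pending writebacks now carry a block count, the merging loop in IssuePendingWritebacks() becomes an interval merge rather than a "next block?" test. A standalone sketch of that arithmetic follows, under the assumption that the input is already sorted by starting block and all belongs to one relation fork (the real code also compares the file and fork of each entry). SketchRange and sketch_merge_ranges() are illustrative names, not part of the patch.

```c
#include <assert.h>
#include <stdint.h>

typedef struct SketchRange
{
	uint32_t	start;			/* first block number */
	uint32_t	nblocks;		/* number of blocks */
} SketchRange;

/*
 * Merge adjacent or overlapping block ranges.  'in' must be sorted by start
 * and 'out' must have room for n entries.  Returns the number of merged
 * ranges written to 'out'.
 */
static int
sketch_merge_ranges(const SketchRange *in, int n, SketchRange *out)
{
	int			nout = 0;

	for (int i = 0; i < n;)
	{
		uint32_t	start = in[i].start;
		uint32_t	nblocks = in[i].nblocks;
		int			ahead;

		for (ahead = 1; i + ahead < n; ahead++)
		{
			const SketchRange *next = &in[i + ahead];
			uint32_t	this_end = start + nblocks;
			uint32_t	next_end = next->start + next->nblocks;

			/* only merge consecutive or overlapping ranges */
			if (next->start > this_end)
				break;

			/* extend to cover the end of both */
			nblocks = (this_end > next_end ? this_end : next_end) - start;
		}

		out[nout].start = start;
		out[nout].nblocks = nblocks;
		nout++;
		i += ahead;
	}

	return nout;
}
```

For example, the ranges (10, 4), (12, 1), (14, 2), (20, 1) come out as (10, 6) and (20, 1): the second is swallowed by the first, the third extends it, and the fourth starts a new range.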
From 8a7cfbebf65f714cd2e3f24b490ed219e042e15a Mon Sep 17 00:00:00 2001
From: Thomas Munro <thomas.mu...@gmail.com>
Date: Wed, 13 Mar 2024 22:37:07 +1300
Subject: [PATCH v5 3/3] Vectored ring buffer writes.

When using eg BAS_VACUUM, we usually finish up writing recently dirtied
pages out to disk. If the blocks are sequential, we can try to write
out multiple dirty buffers in ring order, to clean them in bulk.

XXX Should we try to detect cases when it's not working and suppress the
behaviour (ie if the dirty pages are in random order)?

XXX The reported statistics need work
---
 src/backend/storage/buffer/bufmgr.c   | 90 +++++++++++++++++++++++++--
 src/backend/storage/buffer/freelist.c | 23 +++++++
 src/include/storage/buf_internals.h   |  1 +
 3 files changed, 108 insertions(+), 6 deletions(-)

diff --git a/src/backend/storage/buffer/bufmgr.c b/src/backend/storage/buffer/bufmgr.c
index 073088ab938..3a975a05737 100644
--- a/src/backend/storage/buffer/bufmgr.c
+++ b/src/backend/storage/buffer/bufmgr.c
@@ -1976,6 +1976,11 @@ again:
 		if (strategy != NULL)
 		{
 			XLogRecPtr	lsn;
+			Buffer		buffers[MAX_IO_COMBINE_LIMIT];
+			int			nbuffers;
+			BufferDesc *hdrs[MAX_IO_COMBINE_LIMIT];
+			int			nlocked;
+			int			nflushed;

 			/* Read the LSN while holding buffer header lock */
 			buf_state = LockBufHdr(buf_hdr);
@@ -1989,14 +1994,87 @@ again:
 				UnpinBuffer(buf_hdr);
 				goto again;
 			}
-		}

-		/* OK, do the I/O */
-		FlushBuffer(buf_hdr, NULL, IOOBJECT_RELATION, io_context);
-		LWLockRelease(content_lock);
+			/*
+			 * See if any future buffers could be written out at the same
+			 * time. They must be valid, dirty, unpinned, low usage count and
+			 * consecutive, and we must be able to acquire the content lock
+			 * without waiting to avoid deadlocks.
+			 */
+			nbuffers = StrategyPeek(strategy, buffers, io_combine_limit);
+			Assert(nbuffers >= 1);
+			Assert(buffers[0] == buf);
+			hdrs[0] = buf_hdr;
+			nlocked = 1;
+			while (nlocked < nbuffers)
+			{
+				BufferDesc *hdr;
+				uint32		state;
+
+				ReservePrivateRefCountEntry();
+				ResourceOwnerEnlarge(CurrentResourceOwner);
+				hdr = GetBufferDescriptor(buffers[nlocked] - 1);
+				state = LockBufHdr(hdr);
+				if ((state & (BM_DIRTY | BM_VALID)) == (BM_DIRTY | BM_VALID) &&
+					BUF_STATE_GET_REFCOUNT(state) == 0 &&
+					BUF_STATE_GET_USAGECOUNT(state) <= 1)
+				{
+					PinBuffer_Locked(hdr);
+					if (BufferTagsConsecutive(&hdrs[nlocked - 1]->tag,
+											  &hdr->tag) &&
+						LWLockConditionalAcquire(BufferDescriptorGetContentLock(hdr),
+												 LW_SHARED))
+					{
+						/* Yes, we can extend the range by one. */
+						hdrs[nlocked++] = hdr;
+					}
+					else
+					{
+						UnpinBuffer(hdr);
+						break;
+					}
+				}
+				else
+				{
+					UnlockBufHdr(hdr, state);
+					break;
+				}
+			}

-		ScheduleBufferTagForWriteback(&BackendWritebackContext, io_context,
-									  &buf_hdr->tag, 1);
+			/*
+			 * Flush as many of these as we can. This may stop partway
+			 * through, if it can't get an I/O lock, but it always flushes at
+			 * least one. Since we're only trying to be helpful by doing more
+			 * than one, no need to loop for the rest.
+			 */
+			nflushed = FlushBuffers(hdrs, nlocked, NULL,
+									IOOBJECT_RELATION, io_context,
+									NULL, NULL);
+			Assert(nflushed >= 1);
+
+			/* Unlock in reverse order because it goes faster. */
+			for (int i = nlocked - 1; i >= 0; --i)
+				LWLockRelease(BufferDescriptorGetContentLock(hdrs[i]));
+
+			ScheduleBufferTagForWriteback(&BackendWritebackContext, io_context,
+										  &buf_hdr->tag, nflushed);
+
+			/* Unpin all but the first. */
+			for (int i = 1; i < nlocked; ++i)
+				UnpinBuffer(hdrs[i]);
+		}
+		else
+		{
+			/*
+			 * Do single block I/O. There is no reason to expect buffers that
+			 * don't come from a ring to be consecutive.
+			 */
+			FlushBuffer(buf_hdr, NULL, IOOBJECT_RELATION, io_context);
+			LWLockRelease(content_lock);
+
+			ScheduleBufferTagForWriteback(&BackendWritebackContext, io_context,
+										  &buf_hdr->tag, 1);
+		}
 	}


diff --git a/src/backend/storage/buffer/freelist.c b/src/backend/storage/buffer/freelist.c
index 3611357fa30..6a3e548e7ca 100644
--- a/src/backend/storage/buffer/freelist.c
+++ b/src/backend/storage/buffer/freelist.c
@@ -772,3 +772,26 @@ StrategyRejectBuffer(BufferAccessStrategy strategy, BufferDesc *buf, bool from_r

 	return true;
 }
+
+/*
+ * StrategyPeek -- which buffers will be returned by future calls to
+ * GetBufferFromRing()? The 'current' buffer, ie most recently returned by
+ * GetBufferFromRing() will be in position 0.
+ */
+int
+StrategyPeek(BufferAccessStrategy strategy, Buffer *buffers, int nbuffers)
+{
+	int			cursor = strategy->current;
+	int			n = 0;
+
+	while (strategy->buffers[cursor] != InvalidBuffer && n < nbuffers)
+	{
+		buffers[n++] = strategy->buffers[cursor++];
+		if (cursor == strategy->nbuffers)
+			cursor = 0;
+		if (cursor == strategy->current)
+			break;
+	}
+
+	return n;
+}
diff --git a/src/include/storage/buf_internals.h b/src/include/storage/buf_internals.h
index 93822dd8407..588864e156c 100644
--- a/src/include/storage/buf_internals.h
+++ b/src/include/storage/buf_internals.h
@@ -438,6 +438,7 @@ extern BufferDesc *StrategyGetBuffer(BufferAccessStrategy strategy,
 extern void StrategyFreeBuffer(BufferDesc *buf);
 extern bool StrategyRejectBuffer(BufferAccessStrategy strategy,
 								 BufferDesc *buf, bool from_ring);
+extern int	StrategyPeek(BufferAccessStrategy strategy, Buffer *buffers, int nbuffers);

 extern int	StrategySyncStart(uint32 *complete_passes, uint32 *num_buf_alloc);
 extern void StrategyNotifyBgWriter(int bgwprocno);
-- 
2.44.0
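The ring walk in StrategyPeek() can be tried out on its own with the standalone model below. It uses plain ints for buffer numbers, with 0 playing the role of InvalidBuffer; SketchRing and sketch_ring_peek() are invented for this illustration and only mirror the loop in the patch.

```c
#include <assert.h>

#define SKETCH_INVALID_BUFFER 0	/* stand-in for InvalidBuffer */

typedef struct SketchRing
{
	int			nbuffers;		/* number of slots in the ring */
	int			current;		/* slot most recently handed out */
	const int  *buffers;		/* slot contents, 0 if never filled */
} SketchRing;

/*
 * Copy up to 'max' buffer numbers into 'out', starting at the current slot
 * and moving in ring order.  Stops early at an empty slot or after one full
 * lap.  Returns the number of entries copied.
 */
static int
sketch_ring_peek(const SketchRing *ring, int *out, int max)
{
	int			cursor = ring->current;
	int			n = 0;

	while (ring->buffers[cursor] != SKETCH_INVALID_BUFFER && n < max)
	{
		out[n++] = ring->buffers[cursor++];
		if (cursor == ring->nbuffers)
			cursor = 0;			/* wrap around */
		if (cursor == ring->current)
			break;				/* came all the way round */
	}

	return n;
}
```

With slots {5, 6, empty, 8} and current = 3, a peek yields 8, 5, 6 and then stops at the empty slot. The caller in the patch still has to check each peeked buffer for dirtiness, pins, usage count and consecutive block numbers before it can join the write.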