Hi,

On Sat, Dec 27, 2025 at 12:41 AM Xuneng Zhou <[email protected]> wrote:
>
> Hi Bilal,
>
> Thanks for your review!
>
> On Fri, Dec 26, 2025 at 6:59 PM Nazir Bilal Yavuz <[email protected]> wrote:
> >
> > Hi,
> >
> > Thank you for working on this!
> >
> > On Thu, 25 Dec 2025 at 09:34, Xuneng Zhou <[email protected]> wrote:
> > >
> > > Hi,
> > >
> > > On Thu, Dec 25, 2025 at 1:51 PM Xuneng Zhou <[email protected]> wrote:
> > > >
> > > > Hi Hackers,
> > > >
> > > > I noticed several additional paths in contrib modules, beyond [1],
> > > > that are potentially suitable for streamification:
> > > >
> > > > 1) pgstattuple — pgstatapprox.c and parts of pgstattuple_approx_internal
> > > > 2) Bloom — scan paths in blgetbitmap() and maintenance paths in
> > > > blbulkdelete()
> > > >
> > > > The following patches streamify those code paths. No benchmarks have
> > > > been run yet.
> > > >
> > > > [1]
> > > > https://www.postgresql.org/message-id/flat/CABPTF7UeN2o-trr9r7K76rZExnO2M4SLfvTfbUY2CwQjCekgnQ%40mail.gmail.com
> > > >
> > > > Feedbacks welcome.
> > > >
> > >
> > > One more in ginvacuumcleanup().
> >
> > 0001, 0002 and 0004 LGTM.
> >
> > 0003:
> >
> > + buf = read_stream_next_buffer(stream, NULL);
> > + if (buf == InvalidBuffer)
> > + break;
> >
> > I think we are loosening the check here. We were sure that there were
> > no InvalidBuffers until the nblocks. Streamified version does not have
> > this check, it exits from the loop the first time it sees an
> > InvalidBuffer, which may be wrong. You might want to add
> > 'Assert(p.current_blocknum == nblocks);' before read_stream_end() to
> > have a similar check.
> >
>
> Agree. The check has been added in v2 per your suggestion.
>
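
To make that concrete: in 0003 the scan loop in statapprox_heap() is now driven
purely by the stream, so v2 re-establishes the old "we read every block up to
nblocks" guarantee with an assertion after the loop. Roughly (condensed from
the patch, page inspection elided):

    for (;;)
    {
        Buffer      buf = read_stream_next_buffer(stream, NULL);

        if (buf == InvalidBuffer)
            break;              /* callback range exhausted */

        LockBuffer(buf, BUFFER_LOCK_SHARE);
        /* ... inspect the page ... */
        UnlockReleaseBuffer(buf);
    }

    /* the callback must have walked every block up to nblocks */
    Assert(p.current_blocknum == nblocks);
    read_stream_end(stream);

The fixed-range loops in 0001, 0002 and 0004 keep their explicit blkno bound,
so there the equivalent check is
Assert(read_stream_next_buffer(stream, NULL) == InvalidBuffer) just before
read_stream_end().
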
Two more to go:

patch 5: Streamify log_newpage_range() WAL logging path
patch 6: Streamify hash index VACUUM primary bucket page reads

Benchmarks will be conducted soon.

--
Best,
Xuneng
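
A note on the two new patches: patch 5 can reuse block_range_read_stream_cb(),
since log_newpage_range() walks a plain block range. hashbulkdelete() in patch
6 walks bucket numbers instead, so it gets a small callback of its own;
condensed from the patch, it is roughly:

    static BlockNumber
    hash_bulkdelete_read_stream_cb(ReadStream *stream,
                                   void *callback_private_data,
                                   void *per_buffer_data)
    {
        HashBulkDeleteStreamPrivate *p = callback_private_data;

        if (p->next_bucket > p->max_bucket)
            return InvalidBlockNumber;

        /* map the bucket to its primary page via the cached metapage */
        return BUCKET_TO_BLKNO(p->metap, p->next_bucket++);
    }

Because BUCKET_TO_BLKNO() depends on the cached metapage (hashm_spares[]), the
stream is ended and re-created whenever cachedmetap is refreshed mid-scan.
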
From aafdfa0851ab61777e8877cfb50e3b34bf54b9b1 Mon Sep 17 00:00:00 2001 From: alterego655 <[email protected]> Date: Sat, 27 Dec 2025 00:28:49 +0800 Subject: [PATCH v2 2/4] Streamify Bloom VACUUM paths. Use streaming reads in blbulkdelete() and blvacuumcleanup() to iterate index pages without repeated ReadBuffer calls, improving VACUUM performance and reducing buffer manager overhead during maintenance operations. --- contrib/bloom/blvacuum.c | 55 +++++++++++++++++++++++++++++++++++++--- 1 file changed, 51 insertions(+), 4 deletions(-) diff --git a/contrib/bloom/blvacuum.c b/contrib/bloom/blvacuum.c index e68a9008f56..7452302f022 100644 --- a/contrib/bloom/blvacuum.c +++ b/contrib/bloom/blvacuum.c @@ -17,6 +17,7 @@ #include "commands/vacuum.h" #include "storage/bufmgr.h" #include "storage/indexfsm.h" +#include "storage/read_stream.h" /* @@ -40,6 +41,8 @@ blbulkdelete(IndexVacuumInfo *info, IndexBulkDeleteResult *stats, Page page; BloomMetaPageData *metaData; GenericXLogState *gxlogState; + BlockRangeReadStreamPrivate p; + ReadStream *stream; if (stats == NULL) stats = palloc0_object(IndexBulkDeleteResult); @@ -51,6 +54,25 @@ blbulkdelete(IndexVacuumInfo *info, IndexBulkDeleteResult *stats, * they can't contain tuples to delete. */ npages = RelationGetNumberOfBlocks(index); + + /* Scan all blocks except the metapage using streaming reads */ + p.current_blocknum = BLOOM_HEAD_BLKNO; + p.last_exclusive = npages; + + /* + * It is safe to use batchmode as block_range_read_stream_cb takes no + * locks. + */ + stream = read_stream_begin_relation(READ_STREAM_MAINTENANCE | + READ_STREAM_FULL | + READ_STREAM_USE_BATCHING, + info->strategy, + index, + MAIN_FORKNUM, + block_range_read_stream_cb, + &p, + 0); + for (blkno = BLOOM_HEAD_BLKNO; blkno < npages; blkno++) { BloomTuple *itup, @@ -59,8 +81,7 @@ blbulkdelete(IndexVacuumInfo *info, IndexBulkDeleteResult *stats, vacuum_delay_point(false); - buffer = ReadBufferExtended(index, MAIN_FORKNUM, blkno, - RBM_NORMAL, info->strategy); + buffer = read_stream_next_buffer(stream, NULL); LockBuffer(buffer, BUFFER_LOCK_EXCLUSIVE); gxlogState = GenericXLogStart(index); @@ -133,6 +154,9 @@ blbulkdelete(IndexVacuumInfo *info, IndexBulkDeleteResult *stats, UnlockReleaseBuffer(buffer); } + Assert(read_stream_next_buffer(stream, NULL) == InvalidBuffer); + read_stream_end(stream); + /* * Update the metapage's notFullPage list with whatever we found. Our * info could already be out of date at this point, but blinsert() will @@ -166,6 +190,8 @@ blvacuumcleanup(IndexVacuumInfo *info, IndexBulkDeleteResult *stats) Relation index = info->index; BlockNumber npages, blkno; + BlockRangeReadStreamPrivate p; + ReadStream *stream; if (info->analyze_only) return stats; @@ -181,6 +207,25 @@ blvacuumcleanup(IndexVacuumInfo *info, IndexBulkDeleteResult *stats) stats->num_pages = npages; stats->pages_free = 0; stats->num_index_tuples = 0; + + /* Scan all blocks except the metapage using streaming reads */ + p.current_blocknum = BLOOM_HEAD_BLKNO; + p.last_exclusive = npages; + + /* + * It is safe to use batchmode as block_range_read_stream_cb takes no + * locks. 
+ */ + stream = read_stream_begin_relation(READ_STREAM_MAINTENANCE | + READ_STREAM_FULL | + READ_STREAM_USE_BATCHING, + info->strategy, + index, + MAIN_FORKNUM, + block_range_read_stream_cb, + &p, + 0); + for (blkno = BLOOM_HEAD_BLKNO; blkno < npages; blkno++) { Buffer buffer; @@ -188,8 +233,7 @@ blvacuumcleanup(IndexVacuumInfo *info, IndexBulkDeleteResult *stats) vacuum_delay_point(false); - buffer = ReadBufferExtended(index, MAIN_FORKNUM, blkno, - RBM_NORMAL, info->strategy); + buffer = read_stream_next_buffer(stream, NULL); LockBuffer(buffer, BUFFER_LOCK_SHARE); page = BufferGetPage(buffer); @@ -206,6 +250,9 @@ blvacuumcleanup(IndexVacuumInfo *info, IndexBulkDeleteResult *stats) UnlockReleaseBuffer(buffer); } + Assert(read_stream_next_buffer(stream, NULL) == InvalidBuffer); + read_stream_end(stream); + IndexFreeSpaceMapVacuum(info->index); return stats; -- 2.51.0
From d628f3a5cab752d57332865c323479795629fb28 Mon Sep 17 00:00:00 2001 From: alterego655 <[email protected]> Date: Sat, 27 Dec 2025 00:29:09 +0800 Subject: [PATCH v2 4/4] Replace synchronous ReadBufferExtended() loop with the streaming read in ginvacuumcleanup() to improve I/O efficiency during GIN index vacuum cleanup operations --- src/backend/access/gin/ginvacuum.c | 28 ++++++++++++++++++++++++++-- 1 file changed, 26 insertions(+), 2 deletions(-) diff --git a/src/backend/access/gin/ginvacuum.c b/src/backend/access/gin/ginvacuum.c index d7baf7c847c..58e05c71256 100644 --- a/src/backend/access/gin/ginvacuum.c +++ b/src/backend/access/gin/ginvacuum.c @@ -22,6 +22,7 @@ #include "storage/indexfsm.h" #include "storage/lmgr.h" #include "storage/predicate.h" +#include "storage/read_stream.h" #include "utils/memutils.h" struct GinVacuumState @@ -693,6 +694,8 @@ ginvacuumcleanup(IndexVacuumInfo *info, IndexBulkDeleteResult *stats) BlockNumber totFreePages; GinState ginstate; GinStatsData idxStat; + BlockRangeReadStreamPrivate p; + ReadStream *stream; /* * In an autovacuum analyze, we want to clean up pending insertions. @@ -743,6 +746,24 @@ ginvacuumcleanup(IndexVacuumInfo *info, IndexBulkDeleteResult *stats) totFreePages = 0; + /* Scan all blocks starting from the root using streaming reads */ + p.current_blocknum = GIN_ROOT_BLKNO; + p.last_exclusive = npages; + + /* + * It is safe to use batchmode as block_range_read_stream_cb takes no + * locks. + */ + stream = read_stream_begin_relation(READ_STREAM_MAINTENANCE | + READ_STREAM_FULL | + READ_STREAM_USE_BATCHING, + info->strategy, + index, + MAIN_FORKNUM, + block_range_read_stream_cb, + &p, + 0); + for (blkno = GIN_ROOT_BLKNO; blkno < npages; blkno++) { Buffer buffer; @@ -750,8 +771,8 @@ ginvacuumcleanup(IndexVacuumInfo *info, IndexBulkDeleteResult *stats) vacuum_delay_point(false); - buffer = ReadBufferExtended(index, MAIN_FORKNUM, blkno, - RBM_NORMAL, info->strategy); + buffer = read_stream_next_buffer(stream, NULL); + LockBuffer(buffer, GIN_SHARE); page = BufferGetPage(buffer); @@ -776,6 +797,9 @@ ginvacuumcleanup(IndexVacuumInfo *info, IndexBulkDeleteResult *stats) UnlockReleaseBuffer(buffer); } + Assert(read_stream_next_buffer(stream, NULL) == InvalidBuffer); + read_stream_end(stream); + /* Update the metapage with accurate page and entry counts */ idxStat.nTotalPages = npages; ginUpdateStats(info->index, &idxStat, false); -- 2.51.0
From 314f9cdf6d8a62cc8523b377b8cf19df646b9913 Mon Sep 17 00:00:00 2001 From: alterego655 <[email protected]> Date: Sat, 27 Dec 2025 00:28:10 +0800 Subject: [PATCH v2 1/4] Switch Bloom scan paths to streaming read. Replace per-page ReadBuffer loops in blgetbitmap() with read_stream_begin_relation() and sequential buffer iteration, reducing buffer churn and improving scan efficiency on large Bloom indexes. --- contrib/bloom/blscan.c | 29 +++++++++++++++++++++++++---- 1 file changed, 25 insertions(+), 4 deletions(-) diff --git a/contrib/bloom/blscan.c b/contrib/bloom/blscan.c index 0d71edbe91c..b1fdabaab74 100644 --- a/contrib/bloom/blscan.c +++ b/contrib/bloom/blscan.c @@ -17,6 +17,7 @@ #include "miscadmin.h" #include "pgstat.h" #include "storage/bufmgr.h" +#include "storage/read_stream.h" /* * Begin scan of bloom index. @@ -75,11 +76,13 @@ int64 blgetbitmap(IndexScanDesc scan, TIDBitmap *tbm) { int64 ntids = 0; - BlockNumber blkno = BLOOM_HEAD_BLKNO, + BlockNumber blkno, npages; int i; BufferAccessStrategy bas; BloomScanOpaque so = (BloomScanOpaque) scan->opaque; + BlockRangeReadStreamPrivate p; + ReadStream *stream; if (so->sign == NULL) { @@ -119,14 +122,29 @@ blgetbitmap(IndexScanDesc scan, TIDBitmap *tbm) if (scan->instrument) scan->instrument->nsearches++; + /* Scan all blocks except the metapage using streaming reads */ + p.current_blocknum = BLOOM_HEAD_BLKNO; + p.last_exclusive = npages; + + /* + * It is safe to use batchmode as block_range_read_stream_cb takes no + * locks. + */ + stream = read_stream_begin_relation(READ_STREAM_FULL | + READ_STREAM_USE_BATCHING, + bas, + scan->indexRelation, + MAIN_FORKNUM, + block_range_read_stream_cb, + &p, + 0); + for (blkno = BLOOM_HEAD_BLKNO; blkno < npages; blkno++) { Buffer buffer; Page page; - buffer = ReadBufferExtended(scan->indexRelation, MAIN_FORKNUM, - blkno, RBM_NORMAL, bas); - + buffer = read_stream_next_buffer(stream, NULL); LockBuffer(buffer, BUFFER_LOCK_SHARE); page = BufferGetPage(buffer); @@ -162,6 +180,9 @@ blgetbitmap(IndexScanDesc scan, TIDBitmap *tbm) UnlockReleaseBuffer(buffer); CHECK_FOR_INTERRUPTS(); } + + Assert(read_stream_next_buffer(stream, NULL) == InvalidBuffer); + read_stream_end(stream); FreeAccessStrategy(bas); return ntids; -- 2.51.0
From 7a3fcec0cf44cd6d9848c930e0d7f620cd193698 Mon Sep 17 00:00:00 2001 From: alterego655 <[email protected]> Date: Sat, 27 Dec 2025 00:29:02 +0800 Subject: [PATCH v2 3/4] Streamify heap bloat estimation scan. Introduce a read-stream callback to skip all-visible pages via VM/FSM lookup and stream-read the rest, reducing page reads and improving pgstattuple_approx execution time on large relations. --- contrib/pgstattuple/pgstatapprox.c | 126 ++++++++++++++++++++++------- 1 file changed, 95 insertions(+), 31 deletions(-) diff --git a/contrib/pgstattuple/pgstatapprox.c b/contrib/pgstattuple/pgstatapprox.c index a59ff4e9d4f..cb05b530ca8 100644 --- a/contrib/pgstattuple/pgstatapprox.c +++ b/contrib/pgstattuple/pgstatapprox.c @@ -23,6 +23,7 @@ #include "storage/bufmgr.h" #include "storage/freespace.h" #include "storage/procarray.h" +#include "storage/read_stream.h" PG_FUNCTION_INFO_V1(pgstattuple_approx); PG_FUNCTION_INFO_V1(pgstattuple_approx_v1_5); @@ -45,6 +46,61 @@ typedef struct output_type #define NUM_OUTPUT_COLUMNS 10 +/* + * Struct for statapprox_heap read stream callback. + */ +typedef struct StatApproxReadStreamPrivate +{ + Relation rel; + output_type *stat; + BlockNumber current_blocknum; + BlockNumber nblocks; + BlockNumber scanned; /* count of pages actually read */ + Buffer vmbuffer; /* for VM lookups */ +} StatApproxReadStreamPrivate; + +/* + * Read stream callback for statapprox_heap. + * + * This callback checks the visibility map for each block. If the block is + * all-visible, we can get the free space from the FSM without reading the + * actual page, and skip to the next block. Only blocks that are not + * all-visible are returned for actual reading. + */ +static BlockNumber +statapprox_heap_read_stream_next(ReadStream *stream, + void *callback_private_data, + void *per_buffer_data) +{ + StatApproxReadStreamPrivate *p = callback_private_data; + + while (p->current_blocknum < p->nblocks) + { + BlockNumber blkno = p->current_blocknum++; + Size freespace; + + CHECK_FOR_INTERRUPTS(); + + /* + * If the page has only visible tuples, then we can find out the free + * space from the FSM and move on without reading the page. + */ + if (VM_ALL_VISIBLE(p->rel, blkno, &p->vmbuffer)) + { + freespace = GetRecordedFreeSpace(p->rel, blkno); + p->stat->tuple_len += BLCKSZ - freespace; + p->stat->free_space += freespace; + continue; + } + + /* This block needs to be read */ + p->scanned++; + return blkno; + } + + return InvalidBlockNumber; +} + /* * This function takes an already open relation and scans its pages, * skipping those that have the corresponding visibility map bit set. @@ -58,53 +114,58 @@ typedef struct output_type static void statapprox_heap(Relation rel, output_type *stat) { - BlockNumber scanned, - nblocks, - blkno; - Buffer vmbuffer = InvalidBuffer; + BlockNumber nblocks; BufferAccessStrategy bstrategy; TransactionId OldestXmin; + StatApproxReadStreamPrivate p; + ReadStream *stream; OldestXmin = GetOldestNonRemovableTransactionId(rel); bstrategy = GetAccessStrategy(BAS_BULKREAD); nblocks = RelationGetNumberOfBlocks(rel); - scanned = 0; - for (blkno = 0; blkno < nblocks; blkno++) + /* Initialize read stream private data */ + p.rel = rel; + p.stat = stat; + p.current_blocknum = 0; + p.nblocks = nblocks; + p.scanned = 0; + p.vmbuffer = InvalidBuffer; + + /* + * Create the read stream. We don't use READ_STREAM_USE_BATCHING because + * the callback accesses the visibility map which may need to read VM + * pages. 
While this shouldn't cause deadlocks, we err on the side of + * caution. + */ + stream = read_stream_begin_relation(READ_STREAM_FULL, + bstrategy, + rel, + MAIN_FORKNUM, + statapprox_heap_read_stream_next, + &p, + 0); + + for (;;) { Buffer buf; Page page; OffsetNumber offnum, maxoff; - Size freespace; - - CHECK_FOR_INTERRUPTS(); - - /* - * If the page has only visible tuples, then we can find out the free - * space from the FSM and move on. - */ - if (VM_ALL_VISIBLE(rel, blkno, &vmbuffer)) - { - freespace = GetRecordedFreeSpace(rel, blkno); - stat->tuple_len += BLCKSZ - freespace; - stat->free_space += freespace; - continue; - } + BlockNumber blkno; - buf = ReadBufferExtended(rel, MAIN_FORKNUM, blkno, - RBM_NORMAL, bstrategy); + buf = read_stream_next_buffer(stream, NULL); + if (buf == InvalidBuffer) + break; LockBuffer(buf, BUFFER_LOCK_SHARE); page = BufferGetPage(buf); + blkno = BufferGetBlockNumber(buf); stat->free_space += PageGetExactFreeSpace(page); - /* We may count the page as scanned even if it's new/empty */ - scanned++; - if (PageIsNew(page) || PageIsEmpty(page)) { UnlockReleaseBuffer(buf); @@ -169,6 +230,9 @@ statapprox_heap(Relation rel, output_type *stat) UnlockReleaseBuffer(buf); } + Assert(p.current_blocknum == nblocks); + read_stream_end(stream); + stat->table_len = (uint64) nblocks * BLCKSZ; /* @@ -179,7 +243,7 @@ statapprox_heap(Relation rel, output_type *stat) * tuples in all-visible pages, so no correction is needed for that, and * we already accounted for the space in those pages, too. */ - stat->tuple_count = vac_estimate_reltuples(rel, nblocks, scanned, + stat->tuple_count = vac_estimate_reltuples(rel, nblocks, p.scanned, stat->tuple_count); /* It's not clear if we could get -1 here, but be safe. */ @@ -190,16 +254,16 @@ statapprox_heap(Relation rel, output_type *stat) */ if (nblocks != 0) { - stat->scanned_percent = 100.0 * scanned / nblocks; + stat->scanned_percent = 100.0 * p.scanned / nblocks; stat->tuple_percent = 100.0 * stat->tuple_len / stat->table_len; stat->dead_tuple_percent = 100.0 * stat->dead_tuple_len / stat->table_len; stat->free_percent = 100.0 * stat->free_space / stat->table_len; } - if (BufferIsValid(vmbuffer)) + if (BufferIsValid(p.vmbuffer)) { - ReleaseBuffer(vmbuffer); - vmbuffer = InvalidBuffer; + ReleaseBuffer(p.vmbuffer); + p.vmbuffer = InvalidBuffer; } } -- 2.51.0
From d719cbc20c28634f07106e89b5de0c89cf098a8e Mon Sep 17 00:00:00 2001 From: alterego655 <[email protected]> Date: Sun, 28 Dec 2025 18:29:07 +0800 Subject: [PATCH v2 5/6] Streamify log_newpage_range() WAL logging path Refactor log_newpage_range() to use the Read Stream. This allows prefetching of upcoming relation blocks during bulk WAL logging operations, overlapping I/O with CPU-intensive XLogInsert and WAL-writing work. --- src/backend/access/transam/xloginsert.c | 28 +++++++++++++++++++++++-- 1 file changed, 26 insertions(+), 2 deletions(-) diff --git a/src/backend/access/transam/xloginsert.c b/src/backend/access/transam/xloginsert.c index a56d5a55282..04c1a46143a 100644 --- a/src/backend/access/transam/xloginsert.c +++ b/src/backend/access/transam/xloginsert.c @@ -39,6 +39,7 @@ #include "replication/origin.h" #include "storage/bufmgr.h" #include "storage/proc.h" +#include "storage/read_stream.h" #include "utils/memutils.h" #include "utils/pgstat_internal.h" @@ -1295,6 +1296,8 @@ log_newpage_range(Relation rel, ForkNumber forknum, { int flags; BlockNumber blkno; + BlockRangeReadStreamPrivate p; + ReadStream *stream; flags = REGBUF_FORCE_IMAGE; if (page_std) @@ -1307,6 +1310,23 @@ log_newpage_range(Relation rel, ForkNumber forknum, */ XLogEnsureRecordSpace(XLR_MAX_BLOCK_ID - 1, 0); + /* Set up a streaming read for the range of blocks */ + p.current_blocknum = startblk; + p.last_exclusive = endblk; + + /* + * It is safe to use batchmode as block_range_read_stream_cb takes no + * locks. + */ + stream = read_stream_begin_relation(READ_STREAM_MAINTENANCE | + READ_STREAM_USE_BATCHING, + NULL, + rel, + forknum, + block_range_read_stream_cb, + &p, + 0); + blkno = startblk; while (blkno < endblk) { @@ -1321,8 +1341,10 @@ log_newpage_range(Relation rel, ForkNumber forknum, nbufs = 0; while (nbufs < XLR_MAX_BLOCK_ID && blkno < endblk) { - Buffer buf = ReadBufferExtended(rel, forknum, blkno, - RBM_NORMAL, NULL); + Buffer buf = read_stream_next_buffer(stream, NULL); + + if (!BufferIsValid(buf)) + break; LockBuffer(buf, BUFFER_LOCK_EXCLUSIVE); @@ -1361,6 +1383,8 @@ log_newpage_range(Relation rel, ForkNumber forknum, } END_CRIT_SECTION(); } + + read_stream_end(stream); } /* -- 2.51.0
From 7b3018a09bfb26f8c6d1a413fd80c631231073f3 Mon Sep 17 00:00:00 2001 From: alterego655 <[email protected]> Date: Sun, 28 Dec 2025 18:29:28 +0800 Subject: [PATCH v2 6/6] Streamify hash index VACUUM primary bucket page reads Refactor hashbulkdelete() to use the Read Stream for primary bucket pages. This enables prefetching of upcoming buckets while the current one is being processed, improving I/O efficiency during hash index vacuum operations. --- src/backend/access/hash/hash.c | 77 +++++++++++++++++++++++++++++++++- 1 file changed, 76 insertions(+), 1 deletion(-) diff --git a/src/backend/access/hash/hash.c b/src/backend/access/hash/hash.c index e388252afdc..4000d5d8e99 100644 --- a/src/backend/access/hash/hash.c +++ b/src/backend/access/hash/hash.c @@ -30,6 +30,7 @@ #include "nodes/execnodes.h" #include "optimizer/plancat.h" #include "pgstat.h" +#include "storage/read_stream.h" #include "utils/fmgrprotos.h" #include "utils/index_selfuncs.h" #include "utils/rel.h" @@ -42,12 +43,23 @@ typedef struct Relation heapRel; /* heap relation descriptor */ } HashBuildState; +/* Working state for streaming reads in hashbulkdelete */ +typedef struct +{ + HashMetaPage metap; /* cached metapage for BUCKET_TO_BLKNO */ + Bucket next_bucket; /* next bucket to prefetch */ + Bucket max_bucket; /* stop when next_bucket > max_bucket */ +} HashBulkDeleteStreamPrivate; + static void hashbuildCallback(Relation index, ItemPointer tid, Datum *values, bool *isnull, bool tupleIsAlive, void *state); +static BlockNumber hash_bulkdelete_read_stream_cb(ReadStream *stream, + void *callback_private_data, + void *per_buffer_data); /* @@ -450,6 +462,25 @@ hashendscan(IndexScanDesc scan) scan->opaque = NULL; } +/* + * Read stream callback for hashbulkdelete. + * + * Returns the block number of the primary page for the next bucket to + * vacuum, using the BUCKET_TO_BLKNO mapping from the cached metapage. + */ +static BlockNumber +hash_bulkdelete_read_stream_cb(ReadStream *stream, + void *callback_private_data, + void *per_buffer_data) +{ + HashBulkDeleteStreamPrivate *p = callback_private_data; + + if (p->next_bucket > p->max_bucket) + return InvalidBlockNumber; + + return BUCKET_TO_BLKNO(p->metap, p->next_bucket++); +} + /* * Bulk deletion of all index entries pointing to a set of heap tuples. * The set of target tuples is specified via a callback routine that tells @@ -474,6 +505,8 @@ hashbulkdelete(IndexVacuumInfo *info, IndexBulkDeleteResult *stats, Buffer metabuf = InvalidBuffer; HashMetaPage metap; HashMetaPage cachedmetap; + HashBulkDeleteStreamPrivate stream_private; + ReadStream *stream = NULL; tuples_removed = 0; num_index_tuples = 0; @@ -495,6 +528,24 @@ hashbulkdelete(IndexVacuumInfo *info, IndexBulkDeleteResult *stats, cur_maxbucket = orig_maxbucket; loop_top: + /* Set up streaming read for primary bucket pages */ + stream_private.metap = cachedmetap; + stream_private.next_bucket = cur_bucket; + stream_private.max_bucket = cur_maxbucket; + + /* + * It is safe to use batchmode as hash_bulkdelete_read_stream_cb takes no + * locks. + */ + stream = read_stream_begin_relation(READ_STREAM_MAINTENANCE | + READ_STREAM_USE_BATCHING, + info->strategy, + rel, + MAIN_FORKNUM, + hash_bulkdelete_read_stream_cb, + &stream_private, + 0); + while (cur_bucket <= cur_maxbucket) { BlockNumber bucket_blkno; @@ -514,7 +565,8 @@ loop_top: * We need to acquire a cleanup lock on the primary bucket page to out * wait concurrent scans before deleting the dead tuples. 
*/ - buf = ReadBufferExtended(rel, MAIN_FORKNUM, blkno, RBM_NORMAL, info->strategy); + buf = read_stream_next_buffer(stream, NULL); + Assert(BufferIsValid(buf)); LockBufferForCleanup(buf); _hash_checkpage(rel, buf, LH_BUCKET_PAGE); @@ -545,6 +597,24 @@ loop_top: { cachedmetap = _hash_getcachedmetap(rel, &metabuf, true); Assert(cachedmetap != NULL); + + /* + * Reset stream with updated metadata for remaining buckets. + * The BUCKET_TO_BLKNO mapping depends on hashm_spares[], + * which may have changed. + */ + read_stream_end(stream); + stream_private.metap = cachedmetap; + stream_private.next_bucket = cur_bucket + 1; + stream_private.max_bucket = cur_maxbucket; + stream = read_stream_begin_relation(READ_STREAM_MAINTENANCE | + READ_STREAM_USE_BATCHING, + info->strategy, + rel, + MAIN_FORKNUM, + hash_bulkdelete_read_stream_cb, + &stream_private, + 0); } } @@ -577,9 +647,14 @@ loop_top: cachedmetap = _hash_getcachedmetap(rel, &metabuf, true); Assert(cachedmetap != NULL); cur_maxbucket = cachedmetap->hashm_maxbucket; + read_stream_end(stream); goto loop_top; } + /* Stream should be exhausted since we processed all buckets */ + Assert(read_stream_next_buffer(stream, NULL) == InvalidBuffer); + read_stream_end(stream); + /* Okay, we're really done. Update tuple count in metapage. */ START_CRIT_SECTION(); -- 2.51.0
