On Fri, 19 Jun 2020 at 11:34, David Rowley <dgrowle...@gmail.com> wrote: > > On Fri, 19 Jun 2020 at 03:26, Robert Haas <robertmh...@gmail.com> wrote: > > > > On Thu, Jun 18, 2020 at 6:15 AM David Rowley <dgrowle...@gmail.com> wrote: > > > With a 32TB relation, the code will make the chunk size 16GB. Perhaps > > > I should change the code to cap that at 1GB. > > > > It seems pretty hard to believe there's any significant advantage to a > > chunk size >1GB, so I would be in favor of that change. > > I could certainly make that change. With the standard page size, 1GB > is 131072 pages and a power of 2. That would change for non-standard > page sizes, so we'd need to decide if we want to keep the chunk size a > power of 2, or just cap it exactly at whatever number of pages 1GB is. > > I'm not sure how much of a difference it'll make, but I also just want > to note that synchronous scans can mean we'll start the scan anywhere > within the table, so capping to 1GB does not mean we read an entire > extent. It's more likely to span 2 extents.
Here's a patch which caps the maximum chunk size to 131072. If someone doubles the page size then that'll be 2GB instead of 1GB. I'm not personally worried about that. I tested the performance on a Windows 10 laptop using the test case from [1] Master: workers=0: Time: 141175.935 ms (02:21.176) workers=1: Time: 316854.538 ms (05:16.855) workers=2: Time: 323471.791 ms (05:23.472) workers=3: Time: 321637.945 ms (05:21.638) workers=4: Time: 308689.599 ms (05:08.690) workers=5: Time: 289014.709 ms (04:49.015) workers=6: Time: 267785.270 ms (04:27.785) workers=7: Time: 248735.817 ms (04:08.736) Patched: workers=0: Time: 155985.204 ms (02:35.985) workers=1: Time: 112238.741 ms (01:52.239) workers=2: Time: 105861.813 ms (01:45.862) workers=3: Time: 91874.311 ms (01:31.874) workers=4: Time: 92538.646 ms (01:32.539) workers=5: Time: 93012.902 ms (01:33.013) workers=6: Time: 94269.076 ms (01:34.269) workers=7: Time: 90858.458 ms (01:30.858) David [1] https://www.postgresql.org/message-id/CAApHDvrfJfYH51_WY-iQqPw8yGR4fDoTxAQKqn%2BSa7NTKEVWtg%40mail.gmail.com
diff --git a/src/backend/access/heap/heapam.c b/src/backend/access/heap/heapam.c index 537913d1bb..5792040bc1 100644 --- a/src/backend/access/heap/heapam.c +++ b/src/backend/access/heap/heapam.c @@ -520,12 +520,14 @@ heapgettup(HeapScanDesc scan, { ParallelBlockTableScanDesc pbscan = (ParallelBlockTableScanDesc) scan->rs_base.rs_parallel; + ParallelBlockTableScanWork pbscanwork = + (ParallelBlockTableScanWork) scan->rs_base.rs_parallel_work; table_block_parallelscan_startblock_init(scan->rs_base.rs_rd, - pbscan); + pbscanwork, pbscan); page = table_block_parallelscan_nextpage(scan->rs_base.rs_rd, - pbscan); + pbscanwork, pbscan); /* Other processes might have already finished the scan. */ if (page == InvalidBlockNumber) @@ -720,9 +722,11 @@ heapgettup(HeapScanDesc scan, { ParallelBlockTableScanDesc pbscan = (ParallelBlockTableScanDesc) scan->rs_base.rs_parallel; + ParallelBlockTableScanWork pbscanwork = + (ParallelBlockTableScanWork) scan->rs_base.rs_parallel_work; page = table_block_parallelscan_nextpage(scan->rs_base.rs_rd, - pbscan); + pbscanwork, pbscan); finished = (page == InvalidBlockNumber); } else @@ -834,12 +838,14 @@ heapgettup_pagemode(HeapScanDesc scan, { ParallelBlockTableScanDesc pbscan = (ParallelBlockTableScanDesc) scan->rs_base.rs_parallel; + ParallelBlockTableScanWork pbscanwork = + (ParallelBlockTableScanWork) scan->rs_base.rs_parallel_work; table_block_parallelscan_startblock_init(scan->rs_base.rs_rd, - pbscan); + pbscanwork, pbscan); page = table_block_parallelscan_nextpage(scan->rs_base.rs_rd, - pbscan); + pbscanwork, pbscan); /* Other processes might have already finished the scan. 
*/ if (page == InvalidBlockNumber) @@ -1019,9 +1025,11 @@ heapgettup_pagemode(HeapScanDesc scan, { ParallelBlockTableScanDesc pbscan = (ParallelBlockTableScanDesc) scan->rs_base.rs_parallel; + ParallelBlockTableScanWork pbscanwork = + (ParallelBlockTableScanWork) scan->rs_base.rs_parallel_work; page = table_block_parallelscan_nextpage(scan->rs_base.rs_rd, - pbscan); + pbscanwork, pbscan); finished = (page == InvalidBlockNumber); } else @@ -1155,6 +1163,8 @@ heap_beginscan(Relation relation, Snapshot snapshot, scan->rs_base.rs_nkeys = nkeys; scan->rs_base.rs_flags = flags; scan->rs_base.rs_parallel = parallel_scan; + scan->rs_base.rs_parallel_work = + palloc0(sizeof(ParallelBlockTableScanWorkData)); scan->rs_strategy = NULL; /* set in initscan */ /* diff --git a/src/backend/access/table/tableam.c b/src/backend/access/table/tableam.c index c814733b22..c2ee42e795 100644 --- a/src/backend/access/table/tableam.c +++ b/src/backend/access/table/tableam.c @@ -25,10 +25,17 @@ #include "access/tableam.h" #include "access/xact.h" #include "optimizer/plancat.h" +#include "port/pg_bitutils.h" #include "storage/bufmgr.h" #include "storage/shmem.h" #include "storage/smgr.h" +/* The number of I/O chunks we try to break a parallel seqscan down into */ +#define PARALLEL_SEQSCAN_NCHUNKS 2048 +/* Ramp down size of allocations when we've only this number of chunks left */ +#define PARALLEL_SEQSCAN_RAMPDOWN_CHUNKS 64 +/* Cap the size of parallel I/O chunks to this number of blocks */ +#define PARALLEL_SEQSCAN_MAX_CHUNK_SIZE 131072 /* GUC variables */ char *default_table_access_method = DEFAULT_TABLE_ACCESS_METHOD; @@ -404,10 +411,25 @@ table_block_parallelscan_reinitialize(Relation rel, ParallelTableScanDesc pscan) * to set the startblock once. 
*/ void -table_block_parallelscan_startblock_init(Relation rel, ParallelBlockTableScanDesc pbscan) +table_block_parallelscan_startblock_init(Relation rel, + ParallelBlockTableScanWork workspace, + ParallelBlockTableScanDesc pbscan) { BlockNumber sync_startpage = InvalidBlockNumber; + /* Reset the state we use for controlling allocation size. */ + memset(workspace, 0, sizeof(*workspace)); + + StaticAssertStmt(MaxBlockNumber <= 0xFFFFFFFE, + "pg_nextpower2_32 may be too small for non-standard BlockNumber width"); + + workspace->phsw_chunk_size = pg_nextpower2_32(Max(pbscan->phs_nblocks / + PARALLEL_SEQSCAN_NCHUNKS, 1)); + + /* Ensure we don't go over the maximum size with larger rels */ + workspace->phsw_chunk_size = Min(workspace->phsw_chunk_size, + PARALLEL_SEQSCAN_MAX_CHUNK_SIZE); + retry: /* Grab the spinlock. */ SpinLockAcquire(&pbscan->phs_mutex); @@ -447,12 +469,35 @@ retry: * backend gets an InvalidBlockNumber return. */ BlockNumber -table_block_parallelscan_nextpage(Relation rel, ParallelBlockTableScanDesc pbscan) +table_block_parallelscan_nextpage(Relation rel, + ParallelBlockTableScanWork workspace, + ParallelBlockTableScanDesc pbscan) { BlockNumber page; uint64 nallocated; /* + * The logic below allocates block numbers out to parallel workers in a + * way that each worker will receive a set of consecutive block numbers to + * scan. Earlier versions of this would allocate the next highest block + * number to the next worker to call this function. This would generally + * result in the scan being divided up in stripes of 1 block in size. + * Some operating systems would not detect the sequential I/O pattern due + * to each backend being a different process which could result in poor + * performance due to inefficient or no readahead. 
To work around this + issue, we now allocate a range of block numbers for each worker and + when they come back for another block, we give them the next one in + that range until the range is complete, we then allocate them another + range of blocks to scan and return the first block number from that + range. + * + * Here we name these ranges of blocks "chunks". The initial size of + these chunks is determined in table_block_parallelscan_startblock_init + based on the size of the relation. Towards the end of the scan, we + start making reductions in the size of the chunks in order to attempt + to divide the remaining work over all workers as evenly as + possible. + * * phs_nallocated tracks how many pages have been allocated to workers * already. When phs_nallocated >= rs_nblocks, all blocks have been * allocated. @@ -467,7 +512,48 @@ table_block_parallelscan_nextpage(Relation rel, ParallelBlockTableScanDesc pbsca * The actual page to return is calculated by adding the counter to the * starting block number, modulo nblocks. */ - nallocated = pg_atomic_fetch_add_u64(&pbscan->phs_nallocated, 1); + + /* + * First check if we have any remaining blocks in a previous chunk for + * this worker. We must consume all of the blocks from that before we + * allocate another chunk for the worker. + */ + if (workspace->phsw_alloc_remaining > 0) + { + /* + * Give them the next page in the range and update the remaining pages + */ + nallocated = ++workspace->phsw_nallocated; + workspace->phsw_alloc_remaining--; + } + else + { + /* + * When we've only got PARALLEL_SEQSCAN_RAMPDOWN_CHUNKS chunks + * remaining in the scan, we halve the chunk size. Since we reduce + * the chunk size here, we'll hit this again after doing + * PARALLEL_SEQSCAN_RAMPDOWN_CHUNKS at the new size. After a few + * iterations of this, we'll end up doing the last few blocks with the + * chunk size set to 1. 
+ */ + if (workspace->phsw_chunk_size > 1 && + workspace->phsw_nallocated > pbscan->phs_nblocks - + (workspace->phsw_chunk_size * PARALLEL_SEQSCAN_RAMPDOWN_CHUNKS)) + workspace->phsw_chunk_size >>= 1; + + nallocated = workspace->phsw_nallocated = + pg_atomic_fetch_add_u64(&pbscan->phs_nallocated, + workspace->phsw_chunk_size); + +#ifdef TBPN_DEBUG + elog(NOTICE, "chunksize = %u, allocated = %u", workspace->phsw_chunk_size, nallocated); +#endif + /* + * Set the remaining number of pages in this chunk so that subsequent + * calls from this worker continue on with this chunk until it's done. + */ + workspace->phsw_alloc_remaining = workspace->phsw_chunk_size - 1; + } if (nallocated >= pbscan->phs_nblocks) page = InvalidBlockNumber; /* all blocks have been allocated */ else diff --git a/src/include/access/relscan.h b/src/include/access/relscan.h index 6f0258831f..78d563aab1 100644 --- a/src/include/access/relscan.h +++ b/src/include/access/relscan.h @@ -42,9 +42,9 @@ typedef struct TableScanDescData */ uint32 rs_flags; + void *rs_parallel_work; struct ParallelTableScanDescData *rs_parallel; /* parallel scan * information */ - } TableScanDescData; typedef struct TableScanDescData *TableScanDesc; @@ -81,6 +81,18 @@ typedef struct ParallelBlockTableScanDescData } ParallelBlockTableScanDescData; typedef struct ParallelBlockTableScanDescData *ParallelBlockTableScanDesc; +/* + * Per backend state for parallel table scan, for block oriented storage. + */ +typedef struct ParallelBlockTableScanWorkData +{ + uint64 phsw_nallocated; /* Current # of pages into the scan */ + uint32 phsw_alloc_remaining; /* Pages left for this allocation */ + uint32 phsw_chunk_size; /* The number of pages to take in each + * I/O chunk for the scan */ +} ParallelBlockTableScanWorkData; +typedef struct ParallelBlockTableScanWorkData *ParallelBlockTableScanWork; + /* * Base class for fetches from a table via an index. 
This is the base-class * for such scans, which needs to be embedded in the respective struct for diff --git a/src/include/access/tableam.h b/src/include/access/tableam.h index eb18739c36..f0cefd7e26 100644 --- a/src/include/access/tableam.h +++ b/src/include/access/tableam.h @@ -1790,8 +1790,10 @@ extern Size table_block_parallelscan_initialize(Relation rel, extern void table_block_parallelscan_reinitialize(Relation rel, ParallelTableScanDesc pscan); extern BlockNumber table_block_parallelscan_nextpage(Relation rel, + ParallelBlockTableScanWork pbscanwork, ParallelBlockTableScanDesc pbscan); extern void table_block_parallelscan_startblock_init(Relation rel, + ParallelBlockTableScanWork pbscanwork, ParallelBlockTableScanDesc pbscan);