On Fri, 19 Jun 2020 at 11:34, David Rowley <[email protected]> wrote:
>
> On Fri, 19 Jun 2020 at 03:26, Robert Haas <[email protected]> wrote:
> >
> > On Thu, Jun 18, 2020 at 6:15 AM David Rowley <[email protected]> wrote:
> > > With a 32TB relation, the code will make the chunk size 16GB. Perhaps
> > > I should change the code to cap that at 1GB.
> >
> > It seems pretty hard to believe there's any significant advantage to a
> > chunk size >1GB, so I would be in favor of that change.
>
> I could certainly make that change. With the standard page size, 1GB
> is 131072 pages and a power of 2. That would change for non-standard
> page sizes, so we'd need to decide if we want to keep the chunk size a
> power of 2, or just cap it exactly at whatever number of pages 1GB is.
>
> I'm not sure how much of a difference it'll make, but I also just want
> to note that synchronous scans can mean we'll start the scan anywhere
> within the table, so capping to 1GB does not mean we read an entire
> extent. It's more likely to span 2 extents.
Here's a patch which caps the maximum chunk size at 131072 blocks. If
someone doubles the page size then that'll be 2GB instead of 1GB. I'm
not personally worried about that.
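In case it's useful, here's a rough standalone sketch of how the initial
chunk size falls out of that cap. It's not part of the patch;
next_power_of_2() is just a simplified stand-in for pg_nextpower2_32(),
and the 32TB relation is only there to reproduce the example from
upthread:

#include <stdint.h>
#include <stdio.h>

#define PARALLEL_SEQSCAN_NCHUNKS		2048
#define PARALLEL_SEQSCAN_MAX_CHUNK_SIZE	131072	/* 1GB of 8kB blocks */

/* simplified stand-in for pg_nextpower2_32() */
static uint32_t
next_power_of_2(uint32_t v)
{
    uint32_t    p = 1;

    while (p < v)
        p <<= 1;
    return p;
}

int
main(void)
{
    /* roughly a 32TB relation's worth of 8kB blocks */
    uint64_t    nblocks = UINT64_C(32) * 1024 * 1024 * 1024 * 1024 / 8192;
    uint64_t    want = nblocks / PARALLEL_SEQSCAN_NCHUNKS;
    uint32_t    chunk_size;

    /* aim for about PARALLEL_SEQSCAN_NCHUNKS chunks, rounded up to a
     * power of 2; the patch uses Max(..., 1) to avoid a zero-sized chunk */
    chunk_size = next_power_of_2(want > 0 ? (uint32_t) want : 1);

    /* cap it so very large relations don't get huge chunks */
    if (chunk_size > PARALLEL_SEQSCAN_MAX_CHUNK_SIZE)
        chunk_size = PARALLEL_SEQSCAN_MAX_CHUNK_SIZE;

    /* prints 131072, i.e. 1GB worth of 8kB blocks rather than 16GB */
    printf("chunk size = %u blocks\n", (unsigned) chunk_size);
    return 0;
}

With 8kB blocks that's the 1GB cap; without the cap the 32TB case would
have ended up with 2097152-block (16GB) chunks, as mentioned upthread.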
I tested the performance on a Windows 10 laptop using the test case from [1].
Master:
workers=0: Time: 141175.935 ms (02:21.176)
workers=1: Time: 316854.538 ms (05:16.855)
workers=2: Time: 323471.791 ms (05:23.472)
workers=3: Time: 321637.945 ms (05:21.638)
workers=4: Time: 308689.599 ms (05:08.690)
workers=5: Time: 289014.709 ms (04:49.015)
workers=6: Time: 267785.270 ms (04:27.785)
workers=7: Time: 248735.817 ms (04:08.736)
Patched:
workers=0: Time: 155985.204 ms (02:35.985)
workers=1: Time: 112238.741 ms (01:52.239)
workers=2: Time: 105861.813 ms (01:45.862)
workers=3: Time: 91874.311 ms (01:31.874)
workers=4: Time: 92538.646 ms (01:32.539)
workers=5: Time: 93012.902 ms (01:33.013)
workers=6: Time: 94269.076 ms (01:34.269)
workers=7: Time: 90858.458 ms (01:30.858)
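To show what the ramp-down logic in the patch does towards the end of the
scan, here's a rough single-process simulation of the allocation loop.
It's not the patch's code: the relation size and starting chunk size are
made up, there's no atomic counter or second worker, and it only prints
when the chunk size drops:

#include <stdint.h>
#include <stdio.h>

#define PARALLEL_SEQSCAN_RAMPDOWN_CHUNKS	64

int
main(void)
{
    uint64_t    nblocks = 1000000;  /* pretend relation size, in blocks */
    uint64_t    nallocated = 0;
    uint32_t    chunk_size = 512;   /* pretend initial chunk size */
    uint32_t    reported = 0;

    while (nallocated < nblocks)
    {
        /*
         * Once we're within PARALLEL_SEQSCAN_RAMPDOWN_CHUNKS chunks of the
         * end of the scan, halve the chunk size before handing out a new
         * chunk, the same way the patch does.
         */
        if (chunk_size > 1 &&
            nallocated > nblocks -
            ((uint64_t) chunk_size * PARALLEL_SEQSCAN_RAMPDOWN_CHUNKS))
            chunk_size >>= 1;

        if (chunk_size != reported)
        {
            printf("block %llu: chunk size now %u\n",
                   (unsigned long long) nallocated, (unsigned) chunk_size);
            reported = chunk_size;
        }

        /*
         * In the real code this is an atomic fetch-add on phs_nallocated,
         * and block numbers past the end come back as InvalidBlockNumber.
         */
        nallocated += chunk_size;
    }

    return 0;
}

With those made-up numbers the chunk size steps down from 512 to 1 over
roughly the last 32,000 blocks, so the tail of the scan is handed out in
progressively smaller chunks and finishes with single blocks.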
David
[1]
https://www.postgresql.org/message-id/CAApHDvrfJfYH51_WY-iQqPw8yGR4fDoTxAQKqn%2BSa7NTKEVWtg%40mail.gmail.com
diff --git a/src/backend/access/heap/heapam.c b/src/backend/access/heap/heapam.c
index 537913d1bb..5792040bc1 100644
--- a/src/backend/access/heap/heapam.c
+++ b/src/backend/access/heap/heapam.c
@@ -520,12 +520,14 @@ heapgettup(HeapScanDesc scan,
{
ParallelBlockTableScanDesc pbscan =
(ParallelBlockTableScanDesc) scan->rs_base.rs_parallel;
+ ParallelBlockTableScanWork pbscanwork =
+ (ParallelBlockTableScanWork) scan->rs_base.rs_parallel_work;
table_block_parallelscan_startblock_init(scan->rs_base.rs_rd,
- pbscan);
+ pbscanwork, pbscan);
page = table_block_parallelscan_nextpage(scan->rs_base.rs_rd,
- pbscan);
+ pbscanwork, pbscan);
/* Other processes might have already finished the scan. */
if (page == InvalidBlockNumber)
@@ -720,9 +722,11 @@ heapgettup(HeapScanDesc scan,
{
ParallelBlockTableScanDesc pbscan =
(ParallelBlockTableScanDesc) scan->rs_base.rs_parallel;
+ ParallelBlockTableScanWork pbscanwork =
+ (ParallelBlockTableScanWork) scan->rs_base.rs_parallel_work;
page = table_block_parallelscan_nextpage(scan->rs_base.rs_rd,
- pbscan);
+ pbscanwork, pbscan);
finished = (page == InvalidBlockNumber);
}
else
@@ -834,12 +838,14 @@ heapgettup_pagemode(HeapScanDesc scan,
{
ParallelBlockTableScanDesc pbscan =
(ParallelBlockTableScanDesc) scan->rs_base.rs_parallel;
+ ParallelBlockTableScanWork pbscanwork =
+ (ParallelBlockTableScanWork) scan->rs_base.rs_parallel_work;
table_block_parallelscan_startblock_init(scan->rs_base.rs_rd,
- pbscan);
+ pbscanwork, pbscan);
page = table_block_parallelscan_nextpage(scan->rs_base.rs_rd,
- pbscan);
+ pbscanwork, pbscan);
/* Other processes might have already finished the scan. */
if (page == InvalidBlockNumber)
@@ -1019,9 +1025,11 @@ heapgettup_pagemode(HeapScanDesc scan,
{
ParallelBlockTableScanDesc pbscan =
(ParallelBlockTableScanDesc) scan->rs_base.rs_parallel;
+ ParallelBlockTableScanWork pbscanwork =
+ (ParallelBlockTableScanWork) scan->rs_base.rs_parallel_work;
page = table_block_parallelscan_nextpage(scan->rs_base.rs_rd,
- pbscan);
+ pbscanwork, pbscan);
finished = (page == InvalidBlockNumber);
}
else
@@ -1155,6 +1163,8 @@ heap_beginscan(Relation relation, Snapshot snapshot,
scan->rs_base.rs_nkeys = nkeys;
scan->rs_base.rs_flags = flags;
scan->rs_base.rs_parallel = parallel_scan;
+ scan->rs_base.rs_parallel_work =
+ palloc0(sizeof(ParallelBlockTableScanWorkData));
scan->rs_strategy = NULL; /* set in initscan */
/*
diff --git a/src/backend/access/table/tableam.c b/src/backend/access/table/tableam.c
index c814733b22..c2ee42e795 100644
--- a/src/backend/access/table/tableam.c
+++ b/src/backend/access/table/tableam.c
@@ -25,10 +25,17 @@
#include "access/tableam.h"
#include "access/xact.h"
#include "optimizer/plancat.h"
+#include "port/pg_bitutils.h"
#include "storage/bufmgr.h"
#include "storage/shmem.h"
#include "storage/smgr.h"
+/* The number of I/O chunks we try to break a parallel seqscan down into */
+#define PARALLEL_SEQSCAN_NCHUNKS 2048
+/* Ramp down size of allocations when we've only this number of chunks left */
+#define PARALLEL_SEQSCAN_RAMPDOWN_CHUNKS 64
+/* Cap the size of parallel I/O chunks to this number of blocks */
+#define PARALLEL_SEQSCAN_MAX_CHUNK_SIZE 131072
/* GUC variables */
char *default_table_access_method = DEFAULT_TABLE_ACCESS_METHOD;
@@ -404,10 +411,25 @@ table_block_parallelscan_reinitialize(Relation rel, ParallelTableScanDesc pscan)
* to set the startblock once.
*/
void
-table_block_parallelscan_startblock_init(Relation rel, ParallelBlockTableScanDesc pbscan)
+table_block_parallelscan_startblock_init(Relation rel,
+ ParallelBlockTableScanWork workspace,
+ ParallelBlockTableScanDesc pbscan)
{
BlockNumber sync_startpage = InvalidBlockNumber;
+ /* Reset the state we use for controlling allocation size. */
+ memset(workspace, 0, sizeof(*workspace));
+
+ StaticAssertStmt(MaxBlockNumber <= 0xFFFFFFFE,
+ "pg_nextpower2_32 may be too small for non-standard BlockNumber width");
+
+ workspace->phsw_chunk_size = pg_nextpower2_32(Max(pbscan->phs_nblocks /
+ PARALLEL_SEQSCAN_NCHUNKS, 1));
+
+ /* Ensure we don't go over the maximum size with larger rels */
+ workspace->phsw_chunk_size = Min(workspace->phsw_chunk_size,
+ PARALLEL_SEQSCAN_MAX_CHUNK_SIZE);
+
retry:
/* Grab the spinlock. */
SpinLockAcquire(&pbscan->phs_mutex);
@@ -447,12 +469,35 @@ retry:
* backend gets an InvalidBlockNumber return.
*/
BlockNumber
-table_block_parallelscan_nextpage(Relation rel, ParallelBlockTableScanDesc pbscan)
+table_block_parallelscan_nextpage(Relation rel,
+ ParallelBlockTableScanWork workspace,
+ ParallelBlockTableScanDesc pbscan)
{
BlockNumber page;
uint64 nallocated;
/*
+ * The logic below allocates block numbers out to parallel workers in a
+ * way that each worker will receive a set of consecutive block numbers to
+ * scan. Earlier versions of this would allocate the next highest block
+ * number to the next worker to call this function. This would generally
+ * result in the scan being divided up into stripes of 1 block in size.
+ * Some operating systems would not detect the sequential I/O pattern due
+ * to each backend being a different process, which could result in poor
+ * performance due to inefficient or no readahead. To work around this
+ * issue, we now allocate a range of block numbers for each worker and
+ * when they come back for another block, we give them the next one in
+ * that range until the range is complete. We then allocate them another
+ * range of blocks to scan and return the first block number from that
+ * range.
+ *
+ * Here we name these ranges of blocks "chunks". The initial size of
+ * these chunks is determined in table_block_parallelscan_startblock_init
+ * based on the size of the relation. Towards the end of the scan, we
+ * start making reductions in the size of the chunks in order to attempt
+ * to divide the remaining work over all workers as evenly as
+ * possible.
+ *
* phs_nallocated tracks how many pages have been allocated to workers
* already. When phs_nallocated >= rs_nblocks, all blocks have been
* allocated.
@@ -467,7 +512,48 @@ table_block_parallelscan_nextpage(Relation rel, ParallelBlockTableScanDesc pbsca
* The actual page to return is calculated by adding the counter to the
* starting block number, modulo nblocks.
*/
- nallocated = pg_atomic_fetch_add_u64(&pbscan->phs_nallocated, 1);
+
+ /*
+ * First check if we have any remaining blocks in a previous chunk for
+ * this worker. We must consume all of the blocks from that before we
+ * allocate another chunk for the worker.
+ */
+ if (workspace->phsw_alloc_remaining > 0)
+ {
+ /*
+ * Give them the next page in the range and update the remaining pages
+ */
+ nallocated = ++workspace->phsw_nallocated;
+ workspace->phsw_alloc_remaining--;
+ }
+ else
+ {
+ /*
+ * When we've only got PARALLEL_SEQSCAN_RAMPDOWN_CHUNKS chunks
+ * remaining in the scan, we halve the chunk size. Since we reduce
+ * the chunk size here, we'll hit this again after doing
+ * PARALLEL_SEQSCAN_RAMPDOWN_CHUNKS chunks at the new size. After a few
+ * iterations of this, we'll end up doing the last few blocks with the
+ * chunk size set to 1.
+ */
+ if (workspace->phsw_chunk_size > 1 &&
+ workspace->phsw_nallocated > pbscan->phs_nblocks -
+ (workspace->phsw_chunk_size * PARALLEL_SEQSCAN_RAMPDOWN_CHUNKS))
+ workspace->phsw_chunk_size >>= 1;
+
+ nallocated = workspace->phsw_nallocated =
+ pg_atomic_fetch_add_u64(&pbscan->phs_nallocated,
+ workspace->phsw_chunk_size);
+
+#ifdef TBPN_DEBUG
+ elog(NOTICE, "chunksize = %u, allocated = " UINT64_FORMAT, workspace->phsw_chunk_size, nallocated);
+#endif
+ /*
+ * Set the remaining number of pages in this chunk so that subsequent
+ * calls from this worker continue on with this chunk until it's done.
+ */
+ workspace->phsw_alloc_remaining = workspace->phsw_chunk_size - 1;
+ }
if (nallocated >= pbscan->phs_nblocks)
page = InvalidBlockNumber; /* all blocks have been allocated */
else
diff --git a/src/include/access/relscan.h b/src/include/access/relscan.h
index 6f0258831f..78d563aab1 100644
--- a/src/include/access/relscan.h
+++ b/src/include/access/relscan.h
@@ -42,9 +42,9 @@ typedef struct TableScanDescData
*/
uint32 rs_flags;
+ void *rs_parallel_work; /* per-backend parallel scan work state */
struct ParallelTableScanDescData *rs_parallel; /* parallel scan
* information */
-
} TableScanDescData;
typedef struct TableScanDescData *TableScanDesc;
@@ -81,6 +81,18 @@ typedef struct ParallelBlockTableScanDescData
} ParallelBlockTableScanDescData;
typedef struct ParallelBlockTableScanDescData *ParallelBlockTableScanDesc;
+/*
+ * Per backend state for parallel table scan, for block oriented storage.
+ */
+typedef struct ParallelBlockTableScanWorkData
+{
+ uint64 phsw_nallocated; /* Current # of pages into the scan */
+ uint32 phsw_alloc_remaining; /* Pages left for this allocation */
+ uint32 phsw_chunk_size; /* The number of pages to take in each
+ * I/O chunk for the scan */
+} ParallelBlockTableScanWorkData;
+typedef struct ParallelBlockTableScanWorkData *ParallelBlockTableScanWork;
+
/*
* Base class for fetches from a table via an index. This is the base-class
* for such scans, which needs to be embedded in the respective struct for
diff --git a/src/include/access/tableam.h b/src/include/access/tableam.h
index eb18739c36..f0cefd7e26 100644
--- a/src/include/access/tableam.h
+++ b/src/include/access/tableam.h
@@ -1790,8 +1790,10 @@ extern Size table_block_parallelscan_initialize(Relation rel,
extern void table_block_parallelscan_reinitialize(Relation rel,
ParallelTableScanDesc pscan);
extern BlockNumber table_block_parallelscan_nextpage(Relation rel,
+ ParallelBlockTableScanWork pbscanwork,
ParallelBlockTableScanDesc pbscan);
extern void table_block_parallelscan_startblock_init(Relation rel,
+ ParallelBlockTableScanWork pbscanwork,
ParallelBlockTableScanDesc pbscan);