On Fri, 19 Jun 2020 at 11:34, David Rowley <dgrowle...@gmail.com> wrote:
>
> On Fri, 19 Jun 2020 at 03:26, Robert Haas <robertmh...@gmail.com> wrote:
> >
> > On Thu, Jun 18, 2020 at 6:15 AM David Rowley <dgrowle...@gmail.com> wrote:
> > > With a 32TB relation, the code will make the chunk size 16GB.  Perhaps
> > > I should change the code to cap that at 1GB.
> >
> > It seems pretty hard to believe there's any significant advantage to a
> > chunk size >1GB, so I would be in favor of that change.
>
> I could certainly make that change.  With the standard page size, 1GB
> is 131072 pages and a power of 2. That would change for non-standard
> page sizes, so we'd need to decide if we want to keep the chunk size a
> power of 2, or just cap it exactly at whatever number of pages 1GB is.
>
> I'm not sure how much of a difference it'll make, but I also just want
> to note that synchronous scans can mean we'll start the scan anywhere
> within the table, so capping to 1GB does not mean we read an entire
> extent. It's more likely to span 2 extents.

Here's a patch which caps the maximum chunk size at 131072 blocks.  If
someone doubles the page size then that'll be 2GB instead of 1GB.  I'm
not personally worried about that.
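
To make the sizing concrete, here's a small standalone program that
mirrors the chunk size calculation the patch does in
table_block_parallelscan_startblock_init().  It's only a sketch:
nextpower2_32() is a stand-in for pg_nextpower2_32(), and an 8KB block
size is assumed.

/*
 * Sketch of the parallel seqscan chunk sizing.  The constants mirror
 * PARALLEL_SEQSCAN_NCHUNKS and PARALLEL_SEQSCAN_MAX_CHUNK_SIZE from
 * the patch.
 */
#include <stdint.h>
#include <stdio.h>

#define NCHUNKS			2048
#define MAX_CHUNK_SIZE	131072	/* 1GB with 8KB blocks */

/* smallest power of 2 >= num; stand-in for pg_nextpower2_32() */
static uint32_t
nextpower2_32(uint32_t num)
{
	uint32_t	p = 1;

	while (p < num)
		p <<= 1;
	return p;
}

int
main(void)
{
	/* relation sizes in 8KB blocks: 1GB, 100GB and ~32TB */
	uint32_t	sizes[] = {131072, 13107200, 4294967294u};

	for (int i = 0; i < 3; i++)
	{
		uint32_t	nblocks = sizes[i];
		uint32_t	nchunks = nblocks / NCHUNKS;
		uint32_t	chunk_size = nextpower2_32(nchunks > 0 ? nchunks : 1);

		if (chunk_size > MAX_CHUNK_SIZE)
			chunk_size = MAX_CHUNK_SIZE;
		printf("nblocks = %u -> chunk_size = %u blocks\n",
			   nblocks, chunk_size);
	}
	return 0;
}

For the ~32TB relation that works out to 2097152 blocks (16GB) before
the cap, which the Min() brings back down to 131072 blocks (1GB).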

I tested the performance on a Windows 10 laptop using the test case from [1].

Master:

workers=0: Time: 141175.935 ms (02:21.176)
workers=1: Time: 316854.538 ms (05:16.855)
workers=2: Time: 323471.791 ms (05:23.472)
workers=3: Time: 321637.945 ms (05:21.638)
workers=4: Time: 308689.599 ms (05:08.690)
workers=5: Time: 289014.709 ms (04:49.015)
workers=6: Time: 267785.270 ms (04:27.785)
workers=7: Time: 248735.817 ms (04:08.736)

Patched:

workers=0: Time: 155985.204 ms (02:35.985)
workers=1: Time: 112238.741 ms (01:52.239)
workers=2: Time: 105861.813 ms (01:45.862)
workers=3: Time: 91874.311 ms (01:31.874)
workers=4: Time: 92538.646 ms (01:32.539)
workers=5: Time: 93012.902 ms (01:33.013)
workers=6: Time: 94269.076 ms (01:34.269)
workers=7: Time: 90858.458 ms (01:30.858)

David

[1] 
https://www.postgresql.org/message-id/CAApHDvrfJfYH51_WY-iQqPw8yGR4fDoTxAQKqn%2BSa7NTKEVWtg%40mail.gmail.com
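
To visualise the allocation pattern the patch produces, here's a
single-process simulation of the chunk allocation and ramp-down logic
from table_block_parallelscan_nextpage().  Again just a sketch under
assumed numbers: a plain uint64_t stands in for the shared
phs_nallocated atomic counter, and the relation size and initial chunk
size are illustrative.

/*
 * One "worker" repeatedly grabbing chunks of blocks until the scan is
 * complete.  RAMPDOWN_CHUNKS mirrors PARALLEL_SEQSCAN_RAMPDOWN_CHUNKS.
 */
#include <stdint.h>
#include <stdio.h>

#define RAMPDOWN_CHUNKS	64

int
main(void)
{
	uint64_t	shared_nallocated = 0;	/* simulates phs_nallocated */
	uint64_t	nblocks = 1000000;		/* pretend relation size in blocks */
	uint32_t	chunk_size = 512;		/* ~ pg_nextpower2_32(nblocks / 2048) */

	for (;;)
	{
		uint64_t	nallocated;

		/* halve the chunk size once we're near the end of the scan */
		if (chunk_size > 1 &&
			shared_nallocated > nblocks -
			((uint64_t) chunk_size * RAMPDOWN_CHUNKS))
			chunk_size >>= 1;

		/* pg_atomic_fetch_add_u64() in the real code */
		nallocated = shared_nallocated;
		shared_nallocated += chunk_size;

		if (nallocated >= nblocks)
			break;				/* all blocks allocated */
		printf("chunk start = %llu, size = %u\n",
			   (unsigned long long) nallocated, chunk_size);
	}
	return 0;
}

The output shows full-sized chunks for most of the scan, with the
chunk size halving repeatedly over the final stretch until the last
blocks are handed out one at a time.
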
diff --git a/src/backend/access/heap/heapam.c b/src/backend/access/heap/heapam.c
index 537913d1bb..5792040bc1 100644
--- a/src/backend/access/heap/heapam.c
+++ b/src/backend/access/heap/heapam.c
@@ -520,12 +520,14 @@ heapgettup(HeapScanDesc scan,
 			{
 				ParallelBlockTableScanDesc pbscan =
 				(ParallelBlockTableScanDesc) scan->rs_base.rs_parallel;
+				ParallelBlockTableScanWork pbscanwork =
+				(ParallelBlockTableScanWork) scan->rs_base.rs_parallel_work;
 
 				table_block_parallelscan_startblock_init(scan->rs_base.rs_rd,
-														 pbscan);
+														 pbscanwork, pbscan);
 
 				page = table_block_parallelscan_nextpage(scan->rs_base.rs_rd,
-														 pbscan);
+														 pbscanwork, pbscan);
 
 				/* Other processes might have already finished the scan. */
 				if (page == InvalidBlockNumber)
@@ -720,9 +722,11 @@ heapgettup(HeapScanDesc scan,
 		{
 			ParallelBlockTableScanDesc pbscan =
 			(ParallelBlockTableScanDesc) scan->rs_base.rs_parallel;
+			ParallelBlockTableScanWork pbscanwork =
+			(ParallelBlockTableScanWork) scan->rs_base.rs_parallel_work;
 
 			page = table_block_parallelscan_nextpage(scan->rs_base.rs_rd,
-													 pbscan);
+													 pbscanwork, pbscan);
 			finished = (page == InvalidBlockNumber);
 		}
 		else
@@ -834,12 +838,14 @@ heapgettup_pagemode(HeapScanDesc scan,
 			{
 				ParallelBlockTableScanDesc pbscan =
 				(ParallelBlockTableScanDesc) scan->rs_base.rs_parallel;
+				ParallelBlockTableScanWork pbscanwork =
+				(ParallelBlockTableScanWork) scan->rs_base.rs_parallel_work;
 
 				table_block_parallelscan_startblock_init(scan->rs_base.rs_rd,
-														 pbscan);
+														 pbscanwork, pbscan);
 
 				page = table_block_parallelscan_nextpage(scan->rs_base.rs_rd,
-														 pbscan);
+														 pbscanwork, pbscan);
 
 				/* Other processes might have already finished the scan. */
 				if (page == InvalidBlockNumber)
@@ -1019,9 +1025,11 @@ heapgettup_pagemode(HeapScanDesc scan,
 		{
 			ParallelBlockTableScanDesc pbscan =
 			(ParallelBlockTableScanDesc) scan->rs_base.rs_parallel;
+			ParallelBlockTableScanWork pbscanwork =
+			(ParallelBlockTableScanWork) scan->rs_base.rs_parallel_work;
 
 			page = table_block_parallelscan_nextpage(scan->rs_base.rs_rd,
-													 pbscan);
+													 pbscanwork, pbscan);
 			finished = (page == InvalidBlockNumber);
 		}
 		else
@@ -1155,6 +1163,8 @@ heap_beginscan(Relation relation, Snapshot snapshot,
 	scan->rs_base.rs_nkeys = nkeys;
 	scan->rs_base.rs_flags = flags;
 	scan->rs_base.rs_parallel = parallel_scan;
+	scan->rs_base.rs_parallel_work =
+		palloc0(sizeof(ParallelBlockTableScanWorkData));
 	scan->rs_strategy = NULL;	/* set in initscan */
 
 	/*
diff --git a/src/backend/access/table/tableam.c b/src/backend/access/table/tableam.c
index c814733b22..c2ee42e795 100644
--- a/src/backend/access/table/tableam.c
+++ b/src/backend/access/table/tableam.c
@@ -25,10 +25,17 @@
 #include "access/tableam.h"
 #include "access/xact.h"
 #include "optimizer/plancat.h"
+#include "port/pg_bitutils.h"
 #include "storage/bufmgr.h"
 #include "storage/shmem.h"
 #include "storage/smgr.h"
 
+/* The number of I/O chunks we try to break a parallel seqscan down into */
+#define PARALLEL_SEQSCAN_NCHUNKS			2048
+/* Ramp down size of allocations when we've only this number of chunks left */
+#define PARALLEL_SEQSCAN_RAMPDOWN_CHUNKS	64
+/* Cap the size of parallel I/O chunks to this number of blocks */
+#define PARALLEL_SEQSCAN_MAX_CHUNK_SIZE		131072
 
 /* GUC variables */
 char	   *default_table_access_method = DEFAULT_TABLE_ACCESS_METHOD;
@@ -404,10 +411,25 @@ table_block_parallelscan_reinitialize(Relation rel, ParallelTableScanDesc pscan)
  * to set the startblock once.
  */
 void
-table_block_parallelscan_startblock_init(Relation rel, ParallelBlockTableScanDesc pbscan)
+table_block_parallelscan_startblock_init(Relation rel,
+										 ParallelBlockTableScanWork workspace,
+										 ParallelBlockTableScanDesc pbscan)
 {
 	BlockNumber sync_startpage = InvalidBlockNumber;
 
+	/* Reset the state we use for controlling allocation size. */
+	memset(workspace, 0, sizeof(*workspace));
+
+	StaticAssertStmt(MaxBlockNumber <= 0xFFFFFFFE,
+					 "pg_nextpower2_32 may be too small for non-standard BlockNumber width");
+
+	workspace->phsw_chunk_size = pg_nextpower2_32(Max(pbscan->phs_nblocks /
+												  PARALLEL_SEQSCAN_NCHUNKS, 1));
+
+	/* Ensure we don't go over the maximum size with larger rels */
+	workspace->phsw_chunk_size = Min(workspace->phsw_chunk_size,
+									 PARALLEL_SEQSCAN_MAX_CHUNK_SIZE);
+
 retry:
 	/* Grab the spinlock. */
 	SpinLockAcquire(&pbscan->phs_mutex);
@@ -447,12 +469,35 @@ retry:
  * backend gets an InvalidBlockNumber return.
  */
 BlockNumber
-table_block_parallelscan_nextpage(Relation rel, ParallelBlockTableScanDesc pbscan)
+table_block_parallelscan_nextpage(Relation rel,
+								  ParallelBlockTableScanWork workspace,
+								  ParallelBlockTableScanDesc pbscan)
 {
 	BlockNumber page;
 	uint64		nallocated;
 
 	/*
+	 * The logic below allocates block numbers out to parallel workers in a
+	 * way that each worker will receive a set of consecutive block numbers to
+	 * scan.  Earlier versions of this would allocate the next highest block
+	 * number to the next worker to call this function.  This would generally
+	 * result in the scan being divided up into stripes one block in size.
+	 * Some operating systems would not detect the sequential I/O pattern due
+	 * to each backend being a different process, which could result in poor
+	 * performance due to inefficient or no readahead.  To work around this
+	 * issue, we now allocate a range of block numbers for each worker and
+	 * when they come back for another block, we give them the next one in
+	 * that range until the range is complete; we then allocate them another
+	 * range of blocks to scan and return the first block number from that
+	 * range.
+	 *
+	 * Here we name these ranges of blocks "chunks".  The initial size of
+	 * these chunks is determined in table_block_parallelscan_startblock_init
+	 * based on the size of the relation.  Towards the end of the scan, we
+	 * start making reductions in the size of the chunks in order to attempt
+	 * to divide the remaining work over all workers as evenly as
+	 * possible.
+	 *
 	 * phs_nallocated tracks how many pages have been allocated to workers
 	 * already.  When phs_nallocated >= rs_nblocks, all blocks have been
 	 * allocated.
@@ -467,7 +512,48 @@ table_block_parallelscan_nextpage(Relation rel, ParallelBlockTableScanDesc pbsca
 	 * The actual page to return is calculated by adding the counter to the
 	 * starting block number, modulo nblocks.
 	 */
-	nallocated = pg_atomic_fetch_add_u64(&pbscan->phs_nallocated, 1);
+
+	/*
+	 * First check if we have any remaining blocks in a previous chunk for
+	 * this worker.  We must consume all of the blocks from that before we
+	 * allocate another chunk for the worker.
+	 */
+	if (workspace->phsw_alloc_remaining > 0)
+	{
+		/*
+		 * Give them the next page in the range and update the remaining pages
+		 */
+		nallocated = ++workspace->phsw_nallocated;
+		workspace->phsw_alloc_remaining--;
+	}
+	else
+	{
+		/*
+		 * When we've only got PARALLEL_SEQSCAN_RAMPDOWN_CHUNKS chunks
+		 * remaining in the scan, we halve the chunk size.  Since we reduce
+		 * the chunk size here, we'll hit this again after doing
+		 * PARALLEL_SEQSCAN_RAMPDOWN_CHUNKS chunks at the new size.  After a few
+		 * iterations of this, we'll end up doing the last few blocks with the
+		 * chunk size set to 1.
+		 */
+		if (workspace->phsw_chunk_size > 1 &&
+			workspace->phsw_nallocated > pbscan->phs_nblocks -
+			(workspace->phsw_chunk_size * PARALLEL_SEQSCAN_RAMPDOWN_CHUNKS))
+			workspace->phsw_chunk_size >>= 1;
+
+		nallocated = workspace->phsw_nallocated =
+			pg_atomic_fetch_add_u64(&pbscan->phs_nallocated,
+									workspace->phsw_chunk_size);
+
+#ifdef TBPN_DEBUG
+		elog(NOTICE, "chunksize = %u, allocated = " UINT64_FORMAT, workspace->phsw_chunk_size, nallocated);
+#endif
+		/*
+		 * Set the remaining number of pages in this chunk so that subsequent
+		 * calls from this worker continue on with this chunk until it's done.
+		 */
+		workspace->phsw_alloc_remaining = workspace->phsw_chunk_size - 1;
+	}
 	if (nallocated >= pbscan->phs_nblocks)
 		page = InvalidBlockNumber;	/* all blocks have been allocated */
 	else
diff --git a/src/include/access/relscan.h b/src/include/access/relscan.h
index 6f0258831f..78d563aab1 100644
--- a/src/include/access/relscan.h
+++ b/src/include/access/relscan.h
@@ -42,9 +42,9 @@ typedef struct TableScanDescData
 	 */
 	uint32		rs_flags;
 
+	void	   *rs_parallel_work;	/* per-backend parallel scan state */
 	struct ParallelTableScanDescData *rs_parallel;	/* parallel scan
 													 * information */
-
 } TableScanDescData;
 typedef struct TableScanDescData *TableScanDesc;
 
@@ -81,6 +81,18 @@ typedef struct ParallelBlockTableScanDescData
 }			ParallelBlockTableScanDescData;
 typedef struct ParallelBlockTableScanDescData *ParallelBlockTableScanDesc;
 
+/*
+ * Per-backend state for a parallel table scan, for block-oriented storage.
+ */
+typedef struct ParallelBlockTableScanWorkData
+{
+	uint64		phsw_nallocated;		/* Current # of pages into the scan */
+	uint32		phsw_alloc_remaining;	/* Pages left for this allocation */
+	uint32		phsw_chunk_size;		/* The number of pages to take in each
+										 * I/O chunk for the scan */
+} ParallelBlockTableScanWorkData;
+typedef struct ParallelBlockTableScanWorkData *ParallelBlockTableScanWork;
+
 /*
  * Base class for fetches from a table via an index. This is the base-class
  * for such scans, which needs to be embedded in the respective struct for
diff --git a/src/include/access/tableam.h b/src/include/access/tableam.h
index eb18739c36..f0cefd7e26 100644
--- a/src/include/access/tableam.h
+++ b/src/include/access/tableam.h
@@ -1790,8 +1790,10 @@ extern Size table_block_parallelscan_initialize(Relation rel,
 extern void table_block_parallelscan_reinitialize(Relation rel,
 												  ParallelTableScanDesc pscan);
 extern BlockNumber table_block_parallelscan_nextpage(Relation rel,
+													 ParallelBlockTableScanWork pbscanwork,
 													 ParallelBlockTableScanDesc pbscan);
 extern void table_block_parallelscan_startblock_init(Relation rel,
+													 ParallelBlockTableScanWork pbscanwork,
 													 ParallelBlockTableScanDesc pbscan);
 
 
