From 2093a78191cacaeecae6b1bd095433dbdbf1eb3d Mon Sep 17 00:00:00 2001
From: Cary Huang <cary.huang@highgo.ca>
Date: Mon, 29 Apr 2024 14:48:44 -0700
Subject: [PATCH] Add parallel TID range scan support

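TID range scans (e.g. WHERE ctid < '(400,1)') can currently only be
executed serially.  This patch allows such scans to be performed by
parallel workers:

- heap_set_tidrange() now skips setting the scan limits for parallel
  scans; block allocation is handled by the shared parallel scan state
  instead.
- A new table AM helper, table_beginscan_parallel_tidrange(), lets the
  leader and workers attach to a shared parallel scan descriptor for a
  TID range scan.
- nodeTidrangescan.c gains the usual parallel scan callbacks (Estimate,
  InitializeDSM, ReInitializeDSM and InitializeWorker).
- The planner generates a partial TID range scan path when the rel's
  restriction quals contain suitable TID range quals, and
  cost_tidrangescan() divides the CPU run cost among the workers as is
  done for a parallel seq scan.

For example, from the new regression tests:

    EXPLAIN (COSTS OFF)
    SELECT ctid FROM tenk1 WHERE ctid < '(400,1)' LIMIT 1;

     Limit
       ->  Gather
             Workers Planned: 4
             ->  Parallel Tid Range Scan on tenk1
                   TID Cond: (ctid < '(400,1)'::tid)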
---
 src/backend/access/heap/heapam.c              | 19 +++--
 src/backend/access/table/tableam.c            | 29 +++++++
 src/backend/executor/execParallel.c           | 20 +++++
 src/backend/executor/nodeTidrangescan.c       | 81 +++++++++++++++++++
 src/backend/optimizer/path/allpaths.c         |  6 ++
 src/backend/optimizer/path/costsize.c         | 22 +++++
 src/backend/optimizer/path/tidpath.c          | 38 +++++++-
 src/backend/optimizer/util/pathnode.c         |  7 +-
 src/include/access/tableam.h                  | 12 +++
 src/include/executor/nodeTidrangescan.h       |  7 ++
 src/include/nodes/execnodes.h                 |  1 +
 src/include/optimizer/pathnode.h              |  3 +-
 src/include/optimizer/paths.h                 |  2 +
 src/test/regress/expected/select_parallel.out | 51 ++++++++++++
 src/test/regress/sql/select_parallel.sql      | 15 ++++
 15 files changed, 301 insertions(+), 12 deletions(-)

diff --git a/src/backend/access/heap/heapam.c b/src/backend/access/heap/heapam.c
index 4be0dee4de..2cbb058e96 100644
--- a/src/backend/access/heap/heapam.c
+++ b/src/backend/access/heap/heapam.c
@@ -1367,7 +1367,8 @@ heap_set_tidrange(TableScanDesc sscan, ItemPointer mintid,
 	 * Check for an empty range and protect from would be negative results
 	 * from the numBlks calculation below.
 	 */
-	if (ItemPointerCompare(&highestItem, &lowestItem) < 0)
+	if (ItemPointerCompare(&highestItem, &lowestItem) < 0 &&
+		sscan->rs_parallel == NULL)
 	{
 		/* Set an empty range of blocks to scan */
 		heap_setscanlimits(sscan, 0, 0);
@@ -1381,15 +1382,19 @@ heap_set_tidrange(TableScanDesc sscan, ItemPointer mintid,
 	 * lowestItem has an offset above MaxOffsetNumber.  In this case, we could
 	 * advance startBlk by one.  Likewise, if highestItem has an offset of 0
 	 * we could scan one fewer blocks.  However, such an optimization does not
-	 * seem worth troubling over, currently.
+	 * seem worth troubling over, currently.  Note that the scan limits are
+	 * set here only in the non-parallel case.
 	 */
-	startBlk = ItemPointerGetBlockNumberNoCheck(&lowestItem);
+	if (sscan->rs_parallel == NULL)
+	{
+		startBlk = ItemPointerGetBlockNumberNoCheck(&lowestItem);
 
-	numBlks = ItemPointerGetBlockNumberNoCheck(&highestItem) -
-		ItemPointerGetBlockNumberNoCheck(&lowestItem) + 1;
+		numBlks = ItemPointerGetBlockNumberNoCheck(&highestItem) -
+			ItemPointerGetBlockNumberNoCheck(&lowestItem) + 1;
 
-	/* Set the start block and number of blocks to scan */
-	heap_setscanlimits(sscan, startBlk, numBlks);
+		/* Set the start block and number of blocks to scan */
+		heap_setscanlimits(sscan, startBlk, numBlks);
+	}
 
 	/* Finally, set the TID range in sscan */
 	ItemPointerCopy(&lowestItem, &sscan->rs_mintid);
diff --git a/src/backend/access/table/tableam.c b/src/backend/access/table/tableam.c
index e57a0b7ea3..38253643e8 100644
--- a/src/backend/access/table/tableam.c
+++ b/src/backend/access/table/tableam.c
@@ -187,6 +187,35 @@ table_beginscan_parallel(Relation relation, ParallelTableScanDesc pscan)
 											pscan, flags);
 }
 
+TableScanDesc
+table_beginscan_parallel_tidrange(Relation relation, ParallelTableScanDesc pscan,
+								  ItemPointer mintid, ItemPointer maxtid)
+{
+	Snapshot	snapshot;
+	uint32		flags = SO_TYPE_TIDRANGESCAN | SO_ALLOW_PAGEMODE;
+	TableScanDesc sscan;
+
+	Assert(RelationGetRelid(relation) == pscan->phs_relid);
+
+	if (!pscan->phs_snapshot_any)
+	{
+		/* Snapshot was serialized -- restore it */
+		snapshot = RestoreSnapshot((char *) pscan + pscan->phs_snapshot_off);
+		RegisterSnapshot(snapshot);
+		flags |= SO_TEMP_SNAPSHOT;
+	}
+	else
+	{
+		/* SnapshotAny passed by caller (not serialized) */
+		snapshot = SnapshotAny;
+	}
+
+	sscan = relation->rd_tableam->scan_begin(relation, snapshot, 0, NULL,
+											 pscan, flags);
+
+	return sscan;
+}
+
 
 /* ----------------------------------------------------------------------------
  * Index scan related functions.
diff --git a/src/backend/executor/execParallel.c b/src/backend/executor/execParallel.c
index 8c53d1834e..e4733ca5a3 100644
--- a/src/backend/executor/execParallel.c
+++ b/src/backend/executor/execParallel.c
@@ -40,6 +40,7 @@
 #include "executor/nodeSort.h"
 #include "executor/nodeSubplan.h"
 #include "executor/tqueue.h"
+#include "executor/nodeTidrangescan.h"
 #include "jit/jit.h"
 #include "nodes/nodeFuncs.h"
 #include "pgstat.h"
@@ -296,6 +297,11 @@ ExecParallelEstimate(PlanState *planstate, ExecParallelEstimateContext *e)
 			/* even when not parallel-aware, for EXPLAIN ANALYZE */
 			ExecMemoizeEstimate((MemoizeState *) planstate, e->pcxt);
 			break;
+		case T_TidRangeScanState:
+			if (planstate->plan->parallel_aware)
+				ExecTidRangeScanEstimate((TidRangeScanState *) planstate,
+										 e->pcxt);
+			break;
 		default:
 			break;
 	}
@@ -520,6 +526,11 @@ ExecParallelInitializeDSM(PlanState *planstate,
 			/* even when not parallel-aware, for EXPLAIN ANALYZE */
 			ExecMemoizeInitializeDSM((MemoizeState *) planstate, d->pcxt);
 			break;
+		case T_TidRangeScanState:
+			if (planstate->plan->parallel_aware)
+				ExecTidRangeScanInitializeDSM((TidRangeScanState *) planstate,
+											  d->pcxt);
+			break;
 		default:
 			break;
 	}
@@ -1006,6 +1017,11 @@ ExecParallelReInitializeDSM(PlanState *planstate,
 		case T_MemoizeState:
 			/* these nodes have DSM state, but no reinitialization is required */
 			break;
+		case T_TidRangeScanState:
+			if (planstate->plan->parallel_aware)
+				ExecTidRangeScanReInitializeDSM((TidRangeScanState *) planstate,
+												pcxt);
+			break;
 
 		default:
 			break;
@@ -1372,6 +1388,10 @@ ExecParallelInitializeWorker(PlanState *planstate, ParallelWorkerContext *pwcxt)
 			/* even when not parallel-aware, for EXPLAIN ANALYZE */
 			ExecMemoizeInitializeWorker((MemoizeState *) planstate, pwcxt);
 			break;
+		case T_TidRangeScanState:
+			if (planstate->plan->parallel_aware)
+				ExecTidRangeScanInitializeWorker((TidRangeScanState *) planstate, pwcxt);
+			break;
 		default:
 			break;
 	}
diff --git a/src/backend/executor/nodeTidrangescan.c b/src/backend/executor/nodeTidrangescan.c
index 9aa7683d7e..a353a17731 100644
--- a/src/backend/executor/nodeTidrangescan.c
+++ b/src/backend/executor/nodeTidrangescan.c
@@ -403,3 +403,84 @@ ExecInitTidRangeScan(TidRangeScan *node, EState *estate, int eflags)
 	 */
 	return tidrangestate;
 }
+
+/* ----------------------------------------------------------------
+ *						Parallel Scan Support
+ * ----------------------------------------------------------------
+ */
+
+/* ----------------------------------------------------------------
+ *		ExecTidRangeScanEstimate
+ *
+ *		Compute the amount of space we'll need in the parallel
+ *		query DSM, and inform pcxt->estimator about our needs.
+ * ----------------------------------------------------------------
+ */
+void
+ExecTidRangeScanEstimate(TidRangeScanState *node,
+						 ParallelContext *pcxt)
+{
+	EState	   *estate = node->ss.ps.state;
+
+	node->pscan_len = table_parallelscan_estimate(node->ss.ss_currentRelation,
+												  estate->es_snapshot);
+	shm_toc_estimate_chunk(&pcxt->estimator, node->pscan_len);
+	shm_toc_estimate_keys(&pcxt->estimator, 1);
+}
+
+/* ----------------------------------------------------------------
+ *		ExecTidRangeScanInitializeDSM
+ *
+ *		Set up a parallel table scan descriptor.
+ * ----------------------------------------------------------------
+ */
+void
+ExecTidRangeScanInitializeDSM(TidRangeScanState *node,
+							  ParallelContext *pcxt)
+{
+	EState	   *estate = node->ss.ps.state;
+	ParallelTableScanDesc pscan;
+
+	pscan = shm_toc_allocate(pcxt->toc, node->pscan_len);
+	table_parallelscan_initialize(node->ss.ss_currentRelation,
+								  pscan,
+								  estate->es_snapshot);
+	shm_toc_insert(pcxt->toc, node->ss.ps.plan->plan_node_id, pscan);
+	node->ss.ss_currentScanDesc =
+		table_beginscan_parallel_tidrange(node->ss.ss_currentRelation, pscan,
+										  &node->trss_mintid, &node->trss_maxtid);
+}
+
+/* ----------------------------------------------------------------
+ *		ExecTidRangeScanReInitializeDSM
+ *
+ *		Reset shared state before beginning a fresh scan.
+ * ----------------------------------------------------------------
+ */
+void
+ExecTidRangeScanReInitializeDSM(TidRangeScanState *node,
+								ParallelContext *pcxt)
+{
+	ParallelTableScanDesc pscan;
+
+	pscan = node->ss.ss_currentScanDesc->rs_parallel;
+	table_parallelscan_reinitialize(node->ss.ss_currentRelation, pscan);
+}
+
+/* ----------------------------------------------------------------
+ *		ExecTidRangeScanInitializeWorker
+ *
+ *		Copy relevant information from TOC into planstate.
+ * ----------------------------------------------------------------
+ */
+void
+ExecTidRangeScanInitializeWorker(TidRangeScanState *node,
+								 ParallelWorkerContext *pwcxt)
+{
+	ParallelTableScanDesc pscan;
+
+	pscan = shm_toc_lookup(pwcxt->toc, node->ss.ps.plan->plan_node_id, false);
+	node->ss.ss_currentScanDesc =
+		table_beginscan_parallel_tidrange(node->ss.ss_currentRelation, pscan,
+										  &node->trss_mintid, &node->trss_maxtid);
+}
diff --git a/src/backend/optimizer/path/allpaths.c b/src/backend/optimizer/path/allpaths.c
index cc51ae1757..ab0d1d5fb7 100644
--- a/src/backend/optimizer/path/allpaths.c
+++ b/src/backend/optimizer/path/allpaths.c
@@ -794,6 +794,7 @@ static void
 create_plain_partial_paths(PlannerInfo *root, RelOptInfo *rel)
 {
 	int			parallel_workers;
+	Path		*path = NULL;
 
 	parallel_workers = compute_parallel_worker(rel, rel->pages, -1,
 											   max_parallel_workers_per_gather);
@@ -804,6 +805,11 @@ create_plain_partial_paths(PlannerInfo *root, RelOptInfo *rel)
 
 	/* Add an unordered partial path based on a parallel sequential scan. */
 	add_partial_path(rel, create_seqscan_path(root, rel, NULL, parallel_workers));
+
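+	/* Add an unordered partial path based on a parallel tid range scan. */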
+	path = create_tidrangescan_subpaths(root, rel, parallel_workers);
+	if (path)
+		add_partial_path(rel, path);
 }
 
 /*
diff --git a/src/backend/optimizer/path/costsize.c b/src/backend/optimizer/path/costsize.c
index ee23ed7835..262b1ef02d 100644
--- a/src/backend/optimizer/path/costsize.c
+++ b/src/backend/optimizer/path/costsize.c
@@ -1435,6 +1435,28 @@ cost_tidrangescan(Path *path, PlannerInfo *root,
 	startup_cost += path->pathtarget->cost.startup;
 	run_cost += path->pathtarget->cost.per_tuple * path->rows;
 
+	/* Adjust costing for parallelism, if used. */
+	if (path->parallel_workers > 0)
+	{
+		double		parallel_divisor = get_parallel_divisor(path);
+
+		/* The CPU cost is divided among all the workers. */
+		run_cost /= parallel_divisor;
+
+		/*
+		 * It may be possible to amortize some of the I/O cost, but probably
+		 * not very much, because most operating systems already do aggressive
+		 * prefetching.  For now, we assume that the disk run cost can't be
+		 * amortized at all.
+		 */
+
+		/*
+		 * In the case of a parallel plan, the row count needs to represent
+		 * the number of tuples processed per worker.
+		 */
+		path->rows = clamp_row_est(path->rows / parallel_divisor);
+	}
+
 	path->startup_cost = startup_cost;
 	path->total_cost = startup_cost + run_cost;
 }
diff --git a/src/backend/optimizer/path/tidpath.c b/src/backend/optimizer/path/tidpath.c
index 2ae5ddfe43..c5413be9e6 100644
--- a/src/backend/optimizer/path/tidpath.c
+++ b/src/backend/optimizer/path/tidpath.c
@@ -496,7 +496,8 @@ create_tidscan_paths(PlannerInfo *root, RelOptInfo *rel)
 
 		add_path(rel, (Path *) create_tidrangescan_path(root, rel,
 														tidrangequals,
-														required_outer));
+														required_outer,
+														0));
 	}
 
 	/*
@@ -526,3 +527,38 @@
 	 */
 	BuildParameterizedTidPaths(root, rel, rel->joininfo);
 }
+
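+/*
+ * create_tidrangescan_subpaths
+ *		Generate a partial path for a parallel TID range scan if the rel's
+ *		restriction quals include suitable TID range quals.
+ */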
+Path *
+create_tidrangescan_subpaths(PlannerInfo *root, RelOptInfo *rel,
+							 int parallel_workers)
+{
+	List	   *tidrangequals;
+	Path	   *path;
+
+	/*
+	 * If there are range quals in the baserestrict list, generate a
+	 * TidRangePath.
+	 */
+	tidrangequals = TidRangeQualFromRestrictInfoList(rel->baserestrictinfo,
+													 rel);
+
+	if (tidrangequals != NIL)
+	{
+		/*
+		 * This path uses no join clauses, but it could still have required
+		 * parameterization due to LATERAL refs in its tlist.
+		 */
+		Relids		required_outer = rel->lateral_relids;
+		path = (Path *) create_tidrangescan_path(root, rel,
+												 tidrangequals,
+												 required_outer,
+												 parallel_workers);
+		return path;
+	}
+	return NULL;
+}
diff --git a/src/backend/optimizer/util/pathnode.c b/src/backend/optimizer/util/pathnode.c
index 3cf1dac087..7ceeaf8688 100644
--- a/src/backend/optimizer/util/pathnode.c
+++ b/src/backend/optimizer/util/pathnode.c
@@ -1206,7 +1206,8 @@ create_tidscan_path(PlannerInfo *root, RelOptInfo *rel, List *tidquals,
  */
 TidRangePath *
 create_tidrangescan_path(PlannerInfo *root, RelOptInfo *rel,
-						 List *tidrangequals, Relids required_outer)
+						 List *tidrangequals, Relids required_outer,
+						 int parallel_workers)
 {
 	TidRangePath *pathnode = makeNode(TidRangePath);
 
@@ -1215,9 +1216,9 @@ create_tidrangescan_path(PlannerInfo *root, RelOptInfo *rel,
 	pathnode->path.pathtarget = rel->reltarget;
 	pathnode->path.param_info = get_baserel_parampathinfo(root, rel,
 														  required_outer);
-	pathnode->path.parallel_aware = false;
+	pathnode->path.parallel_aware = (parallel_workers > 0);
 	pathnode->path.parallel_safe = rel->consider_parallel;
-	pathnode->path.parallel_workers = 0;
+	pathnode->path.parallel_workers = parallel_workers;
 	pathnode->path.pathkeys = NIL;	/* always unordered */
 
 	pathnode->tidrangequals = tidrangequals;
diff --git a/src/include/access/tableam.h b/src/include/access/tableam.h
index 8e583b45cd..14c4a694a1 100644
--- a/src/include/access/tableam.h
+++ b/src/include/access/tableam.h
@@ -1175,6 +1175,18 @@ extern void table_parallelscan_initialize(Relation rel,
 extern TableScanDesc table_beginscan_parallel(Relation relation,
 											  ParallelTableScanDesc pscan);
 
+/*
+ * Begin a parallel tidrange scan. `pscan` needs to have been initialized with
+ * table_parallelscan_initialize(), for the same relation. The initialization
+ * does not need to have happened in this backend.
+ *
+ * Caller must hold a suitable lock on the relation.
+ */
+extern TableScanDesc table_beginscan_parallel_tidrange(Relation relation,
+													   ParallelTableScanDesc pscan,
+													   ItemPointer mintid,
+													   ItemPointer maxtid);
+
 /*
  * Restart a parallel scan.  Call this in the leader process.  Caller is
  * responsible for making sure that all workers have finished the scan
diff --git a/src/include/executor/nodeTidrangescan.h b/src/include/executor/nodeTidrangescan.h
index 1cfc7a07be..977cb8eb6e 100644
--- a/src/include/executor/nodeTidrangescan.h
+++ b/src/include/executor/nodeTidrangescan.h
@@ -14,6 +14,7 @@
 #ifndef NODETIDRANGESCAN_H
 #define NODETIDRANGESCAN_H
 
+#include "access/parallel.h"
 #include "nodes/execnodes.h"
 
 extern TidRangeScanState *ExecInitTidRangeScan(TidRangeScan *node,
@@ -21,4 +22,10 @@ extern TidRangeScanState *ExecInitTidRangeScan(TidRangeScan *node,
 extern void ExecEndTidRangeScan(TidRangeScanState *node);
 extern void ExecReScanTidRangeScan(TidRangeScanState *node);
 
+/* parallel scan support */
+extern void ExecTidRangeScanEstimate(TidRangeScanState *node, ParallelContext *pcxt);
+extern void ExecTidRangeScanInitializeDSM(TidRangeScanState *node, ParallelContext *pcxt);
+extern void ExecTidRangeScanReInitializeDSM(TidRangeScanState *node, ParallelContext *pcxt);
+extern void ExecTidRangeScanInitializeWorker(TidRangeScanState *node, ParallelWorkerContext *pwcxt);
+
 #endif							/* NODETIDRANGESCAN_H */
diff --git a/src/include/nodes/execnodes.h b/src/include/nodes/execnodes.h
index d927ac44a8..81eec34730 100644
--- a/src/include/nodes/execnodes.h
+++ b/src/include/nodes/execnodes.h
@@ -1862,6 +1862,7 @@ typedef struct TidRangeScanState
 	ItemPointerData trss_mintid;
 	ItemPointerData trss_maxtid;
 	bool		trss_inScan;
+	Size		pscan_len;		/* size of parallel tid range scan descriptor */
 } TidRangeScanState;
 
 /* ----------------
diff --git a/src/include/optimizer/pathnode.h b/src/include/optimizer/pathnode.h
index c5c4756b0f..d7683ec1c3 100644
--- a/src/include/optimizer/pathnode.h
+++ b/src/include/optimizer/pathnode.h
@@ -66,7 +66,8 @@ extern TidPath *create_tidscan_path(PlannerInfo *root, RelOptInfo *rel,
 extern TidRangePath *create_tidrangescan_path(PlannerInfo *root,
 											  RelOptInfo *rel,
 											  List *tidrangequals,
-											  Relids required_outer);
+											  Relids required_outer,
+											  int parallel_workers);
 extern AppendPath *create_append_path(PlannerInfo *root, RelOptInfo *rel,
 									  List *subpaths, List *partial_subpaths,
 									  List *pathkeys, Relids required_outer,
diff --git a/src/include/optimizer/paths.h b/src/include/optimizer/paths.h
index 39ba461548..c571354890 100644
--- a/src/include/optimizer/paths.h
+++ b/src/include/optimizer/paths.h
@@ -87,6 +87,8 @@ extern void check_index_predicates(PlannerInfo *root, RelOptInfo *rel);
  *	  routines to generate tid paths
  */
 extern void create_tidscan_paths(PlannerInfo *root, RelOptInfo *rel);
+extern Path *create_tidrangescan_subpaths(PlannerInfo *root, RelOptInfo *rel,
+										  int parallel_workers);
 
 /*
  * joinpath.c
diff --git a/src/test/regress/expected/select_parallel.out b/src/test/regress/expected/select_parallel.out
index 87273fa635..61e6700194 100644
--- a/src/test/regress/expected/select_parallel.out
+++ b/src/test/regress/expected/select_parallel.out
@@ -1293,4 +1293,55 @@ SELECT 1 FROM tenk1_vw_sec
                  Filter: (f1 < tenk1_vw_sec.unique1)
 (9 rows)
 
+-- test parallel tid range scan
+EXPLAIN (COSTS OFF)
+SELECT ctid FROM tenk1 WHERE ctid > '(0,1)' LIMIT 1;
+                  QUERY PLAN                   
+-----------------------------------------------
+ Limit
+   ->  Gather
+         Workers Planned: 4
+         ->  Parallel Tid Range Scan on tenk1
+               TID Cond: (ctid > '(0,1)'::tid)
+(5 rows)
+
+EXPLAIN (COSTS OFF)
+SELECT ctid FROM tenk1 WHERE ctid < '(400,1)' LIMIT 1;
+                   QUERY PLAN                    
+-------------------------------------------------
+ Limit
+   ->  Gather
+         Workers Planned: 4
+         ->  Parallel Tid Range Scan on tenk1
+               TID Cond: (ctid < '(400,1)'::tid)
+(5 rows)
+
+EXPLAIN (COSTS OFF)
+SELECT ctid FROM tenk1 WHERE ctid >= '(0,1)' AND ctid <= '(400,1)' LIMIT 1;
+                                  QUERY PLAN                                   
+-------------------------------------------------------------------------------
+ Limit
+   ->  Gather
+         Workers Planned: 4
+         ->  Parallel Tid Range Scan on tenk1
+               TID Cond: ((ctid >= '(0,1)'::tid) AND (ctid <= '(400,1)'::tid))
+(5 rows)
+
+EXPLAIN (COSTS OFF)
+SELECT t.ctid,t2.c FROM tenk1 t,
+LATERAL (SELECT count(*) c FROM tenk1 t2 WHERE t2.ctid <= t.ctid) t2
+WHERE t.ctid < '(1,0)' LIMIT 1;
+                      QUERY PLAN                      
+------------------------------------------------------
+ Limit
+   ->  Nested Loop
+         ->  Gather
+               Workers Planned: 4
+               ->  Parallel Tid Range Scan on tenk1 t
+                     TID Cond: (ctid < '(1,0)'::tid)
+         ->  Aggregate
+               ->  Tid Range Scan on tenk1 t2
+                     TID Cond: (ctid <= t.ctid)
+(9 rows)
+
 rollback;
diff --git a/src/test/regress/sql/select_parallel.sql b/src/test/regress/sql/select_parallel.sql
index 20376c03fa..1d4ef68790 100644
--- a/src/test/regress/sql/select_parallel.sql
+++ b/src/test/regress/sql/select_parallel.sql
@@ -495,4 +495,19 @@ EXPLAIN (COSTS OFF)
 SELECT 1 FROM tenk1_vw_sec
   WHERE (SELECT sum(f1) FROM int4_tbl WHERE f1 < unique1) < 100;
 
+-- test parallel tid range scan
+EXPLAIN (COSTS OFF)
+SELECT ctid FROM tenk1 WHERE ctid > '(0,1)' LIMIT 1;
+
+EXPLAIN (COSTS OFF)
+SELECT ctid FROM tenk1 WHERE ctid < '(400,1)' LIMIT 1;
+
+EXPLAIN (COSTS OFF)
+SELECT ctid FROM tenk1 WHERE ctid >= '(0,1)' AND ctid <= '(400,1)' LIMIT 1;
+
+EXPLAIN (COSTS OFF)
+SELECT t.ctid,t2.c FROM tenk1 t,
+LATERAL (SELECT count(*) c FROM tenk1 t2 WHERE t2.ctid <= t.ctid) t2
+WHERE t.ctid < '(1,0)' LIMIT 1;
+
 rollback;
-- 
2.17.1

