>From f3afb1afb7bb5967d37311b2210071b668caa52f Mon Sep 17 00:00:00 2001
From: Cary Huang <cary.huang@highgo.ca>
Date: Fri, 3 May 2024 10:34:40 -0700
Subject: [PATCH] v2 parallel tid range scan: 1) fixed incorrect query output
 of parallel tid range scan by disabling syncscan in such case 2) reused
 tidrangequals computed from regular tid range scan instead of creating
 another 3) removed unused min and max tid when initializing parallel scan
 context

---
 src/backend/access/table/tableam.c            | 28 +++++++
 src/backend/executor/execParallel.c           | 20 +++++
 src/backend/executor/nodeTidrangescan.c       | 81 +++++++++++++++++++
 src/backend/optimizer/path/costsize.c         | 15 ++++
 src/backend/optimizer/path/tidpath.c          | 18 ++++-
 src/backend/optimizer/util/pathnode.c         |  7 +-
 src/include/access/tableam.h                  | 10 +++
 src/include/executor/nodeTidrangescan.h       |  7 ++
 src/include/nodes/execnodes.h                 |  1 +
 src/include/optimizer/pathnode.h              |  3 +-
 src/test/regress/expected/select_parallel.out | 51 ++++++++++++
 src/test/regress/sql/select_parallel.sql      | 15 ++++
 12 files changed, 251 insertions(+), 5 deletions(-)

diff --git a/src/backend/access/table/tableam.c b/src/backend/access/table/tableam.c
index e57a0b7ea3..8fd0f10c08 100644
--- a/src/backend/access/table/tableam.c
+++ b/src/backend/access/table/tableam.c
@@ -187,6 +187,34 @@ table_beginscan_parallel(Relation relation, ParallelTableScanDesc pscan)
 											pscan, flags);
 }
 
+TableScanDesc
+table_beginscan_parallel_tidrange(Relation relation, ParallelTableScanDesc pscan)
+{
+	Snapshot	snapshot;
+	uint32		flags = SO_TYPE_TIDRANGESCAN | SO_ALLOW_PAGEMODE;
+	TableScanDesc sscan;
+
+	Assert(RelationGetRelid(relation) == pscan->phs_relid);
+
+	if (!pscan->phs_snapshot_any)
+	{
+		/* Snapshot was serialized -- restore it */
+		snapshot = RestoreSnapshot((char *) pscan + pscan->phs_snapshot_off);
+		RegisterSnapshot(snapshot);
+		flags |= SO_TEMP_SNAPSHOT;
+	}
+	else
+	{
+		/* SnapshotAny passed by caller (not serialized) */
+		snapshot = SnapshotAny;
+	}
+
+	sscan = relation->rd_tableam->scan_begin(relation, snapshot, 0, NULL,
+											 pscan, flags);
+
+	return sscan;
+}
+
 
 /* ----------------------------------------------------------------------------
  * Index scan related functions.
diff --git a/src/backend/executor/execParallel.c b/src/backend/executor/execParallel.c
index 8c53d1834e..e4733ca5a3 100644
--- a/src/backend/executor/execParallel.c
+++ b/src/backend/executor/execParallel.c
@@ -40,6 +40,7 @@
 #include "executor/nodeSort.h"
 #include "executor/nodeSubplan.h"
 #include "executor/tqueue.h"
+#include "executor/nodeTidrangescan.h"
 #include "jit/jit.h"
 #include "nodes/nodeFuncs.h"
 #include "pgstat.h"
@@ -296,6 +297,11 @@ ExecParallelEstimate(PlanState *planstate, ExecParallelEstimateContext *e)
 			/* even when not parallel-aware, for EXPLAIN ANALYZE */
 			ExecMemoizeEstimate((MemoizeState *) planstate, e->pcxt);
 			break;
+		case T_TidRangeScanState:
+			if (planstate->plan->parallel_aware)
+				ExecTidRangeScanEstimate((TidRangeScanState *) planstate,
+									e->pcxt);
+			break;
 		default:
 			break;
 	}
@@ -520,6 +526,11 @@ ExecParallelInitializeDSM(PlanState *planstate,
 			/* even when not parallel-aware, for EXPLAIN ANALYZE */
 			ExecMemoizeInitializeDSM((MemoizeState *) planstate, d->pcxt);
 			break;
+		case T_TidRangeScanState:
+			if (planstate->plan->parallel_aware)
+				ExecTidRangeScanInitializeDSM((TidRangeScanState *) planstate,
+										 d->pcxt);
+			break;
 		default:
 			break;
 	}
@@ -1006,6 +1017,11 @@ ExecParallelReInitializeDSM(PlanState *planstate,
 		case T_MemoizeState:
 			/* these nodes have DSM state, but no reinitialization is required */
 			break;
+		case T_TidRangeScanState:
+			if (planstate->plan->parallel_aware)
+				ExecTidRangeScanReInitializeDSM((TidRangeScanState *) planstate,
+										   pcxt);
+			break;
 
 		default:
 			break;
@@ -1372,6 +1388,10 @@ ExecParallelInitializeWorker(PlanState *planstate, ParallelWorkerContext *pwcxt)
 			/* even when not parallel-aware, for EXPLAIN ANALYZE */
 			ExecMemoizeInitializeWorker((MemoizeState *) planstate, pwcxt);
 			break;
+		case T_TidRangeScanState:
+			if (planstate->plan->parallel_aware)
+				ExecTidRangeScanInitializeWorker((TidRangeScanState *) planstate, pwcxt);
+			break;
 		default:
 			break;
 	}
diff --git a/src/backend/executor/nodeTidrangescan.c b/src/backend/executor/nodeTidrangescan.c
index 9aa7683d7e..b1553b990d 100644
--- a/src/backend/executor/nodeTidrangescan.c
+++ b/src/backend/executor/nodeTidrangescan.c
@@ -403,3 +403,84 @@ ExecInitTidRangeScan(TidRangeScan *node, EState *estate, int eflags)
 	 */
 	return tidrangestate;
 }
+/* ----------------------------------------------------------------
+ *						Parallel Scan Support
+ * ----------------------------------------------------------------
+ */
+
+/* ----------------------------------------------------------------
+ *		ExecTidRangeScanEstimate
+ *
+ *		Compute the amount of space we'll need in the parallel
+ *		query DSM, and inform pcxt->estimator about our needs.
+ * ----------------------------------------------------------------
+ */
+void
+ExecTidRangeScanEstimate(TidRangeScanState *node,
+					ParallelContext *pcxt)
+{
+	EState	   *estate = node->ss.ps.state;
+
+	node->pscan_len = table_parallelscan_estimate(node->ss.ss_currentRelation,
+												  estate->es_snapshot);
+	shm_toc_estimate_chunk(&pcxt->estimator, node->pscan_len);
+	shm_toc_estimate_keys(&pcxt->estimator, 1);
+}
+
+/* ----------------------------------------------------------------
+ *		ExecTidRangeScanInitializeDSM
+ *
+ *		Set up a parallel heap scan descriptor.
+ * ----------------------------------------------------------------
+ */
+void
+ExecTidRangeScanInitializeDSM(TidRangeScanState *node,
+						 ParallelContext *pcxt)
+{
+	EState	   *estate = node->ss.ps.state;
+	ParallelTableScanDesc pscan;
+
+	pscan = shm_toc_allocate(pcxt->toc, node->pscan_len);
+	table_parallelscan_initialize(node->ss.ss_currentRelation,
+								  pscan,
+								  estate->es_snapshot);
+	/* disable syncscan in parallel tid range scan */
+	pscan->phs_syncscan = false;
+	shm_toc_insert(pcxt->toc, node->ss.ps.plan->plan_node_id, pscan);
+	node->ss.ss_currentScanDesc =
+		table_beginscan_parallel_tidrange(node->ss.ss_currentRelation, pscan);
+}
+
+/* ----------------------------------------------------------------
+ *		ExecTidRangeScanReInitializeDSM
+ *
+ *		Reset shared state before beginning a fresh scan.
+ * ----------------------------------------------------------------
+ */
+void
+ExecTidRangeScanReInitializeDSM(TidRangeScanState *node,
+						   ParallelContext *pcxt)
+{
+	ParallelTableScanDesc pscan;
+
+	pscan = node->ss.ss_currentScanDesc->rs_parallel;
+	table_parallelscan_reinitialize(node->ss.ss_currentRelation, pscan);
+
+}
+
+/* ----------------------------------------------------------------
+ *		ExecSeqScanInitializeWorker
+ *
+ *		Copy relevant information from TOC into planstate.
+ * ----------------------------------------------------------------
+ */
+void
+ExecTidRangeScanInitializeWorker(TidRangeScanState *node,
+							ParallelWorkerContext *pwcxt)
+{
+	ParallelTableScanDesc pscan;
+
+	pscan = shm_toc_lookup(pwcxt->toc, node->ss.ps.plan->plan_node_id, false);
+	node->ss.ss_currentScanDesc =
+		table_beginscan_parallel_tidrange(node->ss.ss_currentRelation, pscan);
+}
diff --git a/src/backend/optimizer/path/costsize.c b/src/backend/optimizer/path/costsize.c
index ee23ed7835..fee603d048 100644
--- a/src/backend/optimizer/path/costsize.c
+++ b/src/backend/optimizer/path/costsize.c
@@ -1435,6 +1435,21 @@ cost_tidrangescan(Path *path, PlannerInfo *root,
 	startup_cost += path->pathtarget->cost.startup;
 	run_cost += path->pathtarget->cost.per_tuple * path->rows;
 
+	/* Adjust costing for parallelism, if used. */
+	if (path->parallel_workers > 0)
+	{
+		double		parallel_divisor = get_parallel_divisor(path);
+
+		/* The CPU cost is divided among all the workers. */
+		run_cost /= parallel_divisor;
+
+		/*
+		 * In the case of a parallel plan, the row count needs to represent
+		 * the number of tuples processed per worker.
+		 */
+		path->rows = clamp_row_est(path->rows / parallel_divisor);
+	}
+
 	path->startup_cost = startup_cost;
 	path->total_cost = startup_cost + run_cost;
 }
diff --git a/src/backend/optimizer/path/tidpath.c b/src/backend/optimizer/path/tidpath.c
index 2ae5ddfe43..3c52ef911e 100644
--- a/src/backend/optimizer/path/tidpath.c
+++ b/src/backend/optimizer/path/tidpath.c
@@ -46,6 +46,7 @@
 #include "optimizer/pathnode.h"
 #include "optimizer/paths.h"
 #include "optimizer/restrictinfo.h"
+#include "optimizer/cost.h"
 
 
 /*
@@ -496,7 +497,22 @@ create_tidscan_paths(PlannerInfo *root, RelOptInfo *rel)
 
 		add_path(rel, (Path *) create_tidrangescan_path(root, rel,
 														tidrangequals,
-														required_outer));
+														required_outer,
+														0));
+
+		/* If appropriate, consider parallel tid range scan */
+		if (rel->consider_parallel && required_outer == NULL)
+		{
+			int			parallel_workers;
+
+			parallel_workers = compute_parallel_worker(rel, rel->pages, -1,
+													   max_parallel_workers_per_gather);
+			if (parallel_workers > 0)
+			{
+				add_partial_path(rel, (Path *) create_tidrangescan_path(root, rel, tidrangequals,
+								required_outer, parallel_workers));
+			}
+		}
 	}
 
 	/*
diff --git a/src/backend/optimizer/util/pathnode.c b/src/backend/optimizer/util/pathnode.c
index 3cf1dac087..7ceeaf8688 100644
--- a/src/backend/optimizer/util/pathnode.c
+++ b/src/backend/optimizer/util/pathnode.c
@@ -1206,7 +1206,8 @@ create_tidscan_path(PlannerInfo *root, RelOptInfo *rel, List *tidquals,
  */
 TidRangePath *
 create_tidrangescan_path(PlannerInfo *root, RelOptInfo *rel,
-						 List *tidrangequals, Relids required_outer)
+						 List *tidrangequals, Relids required_outer,
+						 int parallel_workers)
 {
 	TidRangePath *pathnode = makeNode(TidRangePath);
 
@@ -1215,9 +1216,9 @@ create_tidrangescan_path(PlannerInfo *root, RelOptInfo *rel,
 	pathnode->path.pathtarget = rel->reltarget;
 	pathnode->path.param_info = get_baserel_parampathinfo(root, rel,
 														  required_outer);
-	pathnode->path.parallel_aware = false;
+	pathnode->path.parallel_aware = (parallel_workers > 0);
 	pathnode->path.parallel_safe = rel->consider_parallel;
-	pathnode->path.parallel_workers = 0;
+	pathnode->path.parallel_workers = parallel_workers;
 	pathnode->path.pathkeys = NIL;	/* always unordered */
 
 	pathnode->tidrangequals = tidrangequals;
diff --git a/src/include/access/tableam.h b/src/include/access/tableam.h
index 8e583b45cd..2cffd813a5 100644
--- a/src/include/access/tableam.h
+++ b/src/include/access/tableam.h
@@ -1175,6 +1175,16 @@ extern void table_parallelscan_initialize(Relation rel,
 extern TableScanDesc table_beginscan_parallel(Relation relation,
 											  ParallelTableScanDesc pscan);
 
+/*
+ * Begin a parallel tidrange scan. `pscan` needs to have been initialized with
+ * table_parallelscan_initialize(), for the same relation. The initialization
+ * does not need to have happened in this backend.
+ *
+ * Caller must hold a suitable lock on the relation.
+ */
+extern TableScanDesc table_beginscan_parallel_tidrange(Relation relation,
+													   ParallelTableScanDesc pscan);
+
 /*
  * Restart a parallel scan.  Call this in the leader process.  Caller is
  * responsible for making sure that all workers have finished the scan
diff --git a/src/include/executor/nodeTidrangescan.h b/src/include/executor/nodeTidrangescan.h
index 1cfc7a07be..977cb8eb6e 100644
--- a/src/include/executor/nodeTidrangescan.h
+++ b/src/include/executor/nodeTidrangescan.h
@@ -14,6 +14,7 @@
 #ifndef NODETIDRANGESCAN_H
 #define NODETIDRANGESCAN_H
 
+#include "access/parallel.h"
 #include "nodes/execnodes.h"
 
 extern TidRangeScanState *ExecInitTidRangeScan(TidRangeScan *node,
@@ -21,4 +22,10 @@ extern TidRangeScanState *ExecInitTidRangeScan(TidRangeScan *node,
 extern void ExecEndTidRangeScan(TidRangeScanState *node);
 extern void ExecReScanTidRangeScan(TidRangeScanState *node);
 
+/* parallel scan support */
+extern void ExecTidRangeScanEstimate(TidRangeScanState *node, ParallelContext *pcxt);
+extern void ExecTidRangeScanInitializeDSM(TidRangeScanState *node, ParallelContext *pcxt);
+extern void ExecTidRangeScanReInitializeDSM(TidRangeScanState *node, ParallelContext *pcxt);
+extern void ExecTidRangeScanInitializeWorker(TidRangeScanState *node, ParallelWorkerContext *pwcxt);
+
 #endif							/* NODETIDRANGESCAN_H */
diff --git a/src/include/nodes/execnodes.h b/src/include/nodes/execnodes.h
index d927ac44a8..81eec34730 100644
--- a/src/include/nodes/execnodes.h
+++ b/src/include/nodes/execnodes.h
@@ -1862,6 +1862,7 @@ typedef struct TidRangeScanState
 	ItemPointerData trss_mintid;
 	ItemPointerData trss_maxtid;
 	bool		trss_inScan;
+	Size		pscan_len;		/* size of parallel tid range scan descriptor */
 } TidRangeScanState;
 
 /* ----------------
diff --git a/src/include/optimizer/pathnode.h b/src/include/optimizer/pathnode.h
index c5c4756b0f..d7683ec1c3 100644
--- a/src/include/optimizer/pathnode.h
+++ b/src/include/optimizer/pathnode.h
@@ -66,7 +66,8 @@ extern TidPath *create_tidscan_path(PlannerInfo *root, RelOptInfo *rel,
 extern TidRangePath *create_tidrangescan_path(PlannerInfo *root,
 											  RelOptInfo *rel,
 											  List *tidrangequals,
-											  Relids required_outer);
+											  Relids required_outer,
+											  int parallel_workers);
 extern AppendPath *create_append_path(PlannerInfo *root, RelOptInfo *rel,
 									  List *subpaths, List *partial_subpaths,
 									  List *pathkeys, Relids required_outer,
diff --git a/src/test/regress/expected/select_parallel.out b/src/test/regress/expected/select_parallel.out
index 87273fa635..61e6700194 100644
--- a/src/test/regress/expected/select_parallel.out
+++ b/src/test/regress/expected/select_parallel.out
@@ -1293,4 +1293,55 @@ SELECT 1 FROM tenk1_vw_sec
                  Filter: (f1 < tenk1_vw_sec.unique1)
 (9 rows)
 
+-- test parallel tid range scan
+EXPLAIN (COSTS OFF)
+SELECT ctid FROM tenk1 WHERE ctid > '(0,1)' LIMIT 1;
+                  QUERY PLAN                   
+-----------------------------------------------
+ Limit
+   ->  Gather
+         Workers Planned: 4
+         ->  Parallel Tid Range Scan on tenk1
+               TID Cond: (ctid > '(0,1)'::tid)
+(5 rows)
+
+EXPLAIN (COSTS OFF)
+SELECT ctid FROM tenk1 WHERE ctid < '(400,1)' LIMIT 1;
+                   QUERY PLAN                    
+-------------------------------------------------
+ Limit
+   ->  Gather
+         Workers Planned: 4
+         ->  Parallel Tid Range Scan on tenk1
+               TID Cond: (ctid < '(400,1)'::tid)
+(5 rows)
+
+EXPLAIN (COSTS OFF)
+SELECT ctid FROM tenk1 WHERE ctid >= '(0,1)' AND ctid <= '(400,1)' LIMIT 1;
+                                  QUERY PLAN                                   
+-------------------------------------------------------------------------------
+ Limit
+   ->  Gather
+         Workers Planned: 4
+         ->  Parallel Tid Range Scan on tenk1
+               TID Cond: ((ctid >= '(0,1)'::tid) AND (ctid <= '(400,1)'::tid))
+(5 rows)
+
+EXPLAIN (COSTS OFF)
+SELECT t.ctid,t2.c FROM tenk1 t,
+LATERAL (SELECT count(*) c FROM tenk1 t2 WHERE t2.ctid <= t.ctid) t2
+WHERE t.ctid < '(1,0)' LIMIT 1;
+                      QUERY PLAN                      
+------------------------------------------------------
+ Limit
+   ->  Nested Loop
+         ->  Gather
+               Workers Planned: 4
+               ->  Parallel Tid Range Scan on tenk1 t
+                     TID Cond: (ctid < '(1,0)'::tid)
+         ->  Aggregate
+               ->  Tid Range Scan on tenk1 t2
+                     TID Cond: (ctid <= t.ctid)
+(9 rows)
+
 rollback;
diff --git a/src/test/regress/sql/select_parallel.sql b/src/test/regress/sql/select_parallel.sql
index 20376c03fa..1d4ef68790 100644
--- a/src/test/regress/sql/select_parallel.sql
+++ b/src/test/regress/sql/select_parallel.sql
@@ -495,4 +495,19 @@ EXPLAIN (COSTS OFF)
 SELECT 1 FROM tenk1_vw_sec
   WHERE (SELECT sum(f1) FROM int4_tbl WHERE f1 < unique1) < 100;
 
+-- test parallel tid range scan
+EXPLAIN (COSTS OFF)
+SELECT ctid FROM tenk1 WHERE ctid > '(0,1)' LIMIT 1;
+
+EXPLAIN (COSTS OFF)
+SELECT ctid FROM tenk1 WHERE ctid < '(400,1)' LIMIT 1;
+
+EXPLAIN (COSTS OFF)
+SELECT ctid FROM tenk1 WHERE ctid >= '(0,1)' AND ctid <= '(400,1)' LIMIT 1;
+
+EXPLAIN (COSTS OFF)
+SELECT t.ctid,t2.c FROM tenk1 t,
+LATERAL (SELECT count(*) c FROM tenk1 t2 WHERE t2.ctid <= t.ctid) t2
+WHERE t.ctid < '(1,0)' LIMIT 1;
+
 rollback;
-- 
2.17.1

