From b8990055f96a052bc0d6609df59d072e02ec0db0 Mon Sep 17 00:00:00 2001
From: Greg Nancarrow <gregn4422@gmail.com>
Date: Thu, 15 Oct 2020 13:59:25 +1100
Subject: [PATCH v2] Enable parallel SELECT for "INSERT INTO ... SELECT ...".

Enable "INSERT INTO ... SELECT ..." to utilize a parallel SELECT, where it is safe to do so.

Parallel SELECT can't be utilized in the following cases:
- INSERT statement uses ON CONFLICT ... DO UPDATE ...
- Target table is a foreign or temporary table
- Target table has a:
  - Row-level trigger or transition-table trigger
  - Parallel-unsafe trigger
  - Foreign key trigger (RI_TRIGGER_FK)
  - Parallel-unsafe index expression
  - Parallel-unsafe column default expression
  - Parallel-unsafe check constraint
- Partitioned table or partition with any of the above parallel-unsafe features.

Discussion: https://postgr.es/m/CAJcOf-cXnB5cnMKqWEp2E2z7Mvcd04iLVmV=qpFJrR3AcrTS3g@mail.gmail.com
---
 src/backend/access/transam/varsup.c  |   5 +-
 src/backend/access/transam/xact.c    |   4 +-
 src/backend/optimizer/plan/planner.c | 295 ++++++++++++++++++++++++++++++++++-
 3 files changed, 294 insertions(+), 10 deletions(-)

diff --git a/src/backend/access/transam/varsup.c b/src/backend/access/transam/varsup.c
index a4944fa..925c875 100644
--- a/src/backend/access/transam/varsup.c
+++ b/src/backend/access/transam/varsup.c
@@ -15,6 +15,7 @@
 
 #include "access/clog.h"
 #include "access/commit_ts.h"
+#include "access/parallel.h"
 #include "access/subtrans.h"
 #include "access/transam.h"
 #include "access/xact.h"
@@ -56,8 +57,8 @@ GetNewTransactionId(bool isSubXact)
 	 * Workers synchronize transaction state at the beginning of each parallel
 	 * operation, so we can't account for new XIDs after that point.
 	 */
-	if (IsInParallelMode())
-		elog(ERROR, "cannot assign TransactionIds during a parallel operation");
+	if (IsParallelWorker())
+		elog(ERROR, "cannot assign TransactionIds in a parallel worker");
 
 	/*
 	 * During bootstrap initialization, we return the special bootstrap
diff --git a/src/backend/access/transam/xact.c b/src/backend/access/transam/xact.c
index af6afce..ef423fb 100644
--- a/src/backend/access/transam/xact.c
+++ b/src/backend/access/transam/xact.c
@@ -580,8 +580,8 @@ AssignTransactionId(TransactionState s)
 	 * Workers synchronize transaction state at the beginning of each parallel
 	 * operation, so we can't account for new XIDs at this point.
 	 */
-	if (IsInParallelMode() || IsParallelWorker())
-		elog(ERROR, "cannot assign XIDs during a parallel operation");
+	if (IsParallelWorker())
+		elog(ERROR, "cannot assign XIDs in a parallel worker");
 
 	/*
 	 * Ensure parent(s) have XIDs, so that a child always has an XID later
diff --git a/src/backend/optimizer/plan/planner.c b/src/backend/optimizer/plan/planner.c
index f331f82..3bd2e22 100644
--- a/src/backend/optimizer/plan/planner.c
+++ b/src/backend/optimizer/plan/planner.c
@@ -24,10 +24,12 @@
 #include "access/sysattr.h"
 #include "access/table.h"
 #include "access/xact.h"
+#include "catalog/index.h"
 #include "catalog/pg_constraint.h"
 #include "catalog/pg_inherits.h"
 #include "catalog/pg_proc.h"
 #include "catalog/pg_type.h"
+#include "commands/trigger.h"
 #include "executor/executor.h"
 #include "executor/nodeAgg.h"
 #include "foreign/fdwapi.h"
@@ -58,6 +60,7 @@
 #include "parser/parse_agg.h"
 #include "parser/parsetree.h"
 #include "partitioning/partdesc.h"
+#include "rewrite/rewriteHandler.h"
 #include "rewrite/rewriteManip.h"
 #include "storage/dsm_impl.h"
 #include "utils/lsyscache.h"
@@ -248,7 +251,11 @@ static bool group_by_has_partkey(RelOptInfo *input_rel,
 								 List *targetList,
 								 List *groupClause);
 static int	common_prefix_cmp(const void *a, const void *b);
-
+static bool IsTriggerDataParallelModeSafe(TriggerDesc *trigdesc);
+static bool IsRelParallelModeSafeForModify(Oid relid);
+static bool AreIndexExprsParallelModeSafe(Relation rel);
+static bool IsParallelModeSafeForModify(Query *parse);
+static bool IsModifySupportedInParallelMode(CmdType commandType);
 
 /*****************************************************************************
  *
@@ -319,11 +326,11 @@ standard_planner(Query *parse, const char *query_string, int cursorOptions,
 	/*
 	 * Assess whether it's feasible to use parallel mode for this query. We
 	 * can't do this in a standalone backend, or if the command will try to
-	 * modify any data, or if this is a cursor operation, or if GUCs are set
-	 * to values that don't permit parallelism, or if parallel-unsafe
-	 * functions are present in the query tree.
+	 * modify any data using a CTE, or if this is a cursor operation, or if
+	 * GUCs are set to values that don't permit parallelism, or if
+	 * parallel-unsafe functions are present in the query tree.
 	 *
-	 * (Note that we do allow CREATE TABLE AS, SELECT INTO, and CREATE
+	 * (Note that we do allow CREATE TABLE AS, INSERT, SELECT INTO, and CREATE
 	 * MATERIALIZED VIEW to use parallel plans, but as of now, only the leader
 	 * backend writes into a completely new table.  In the future, we can
 	 * extend it to allow workers to write into the table.  However, to allow
@@ -337,7 +344,8 @@ standard_planner(Query *parse, const char *query_string, int cursorOptions,
 	 */
 	if ((cursorOptions & CURSOR_OPT_PARALLEL_OK) != 0 &&
 		IsUnderPostmaster &&
-		parse->commandType == CMD_SELECT &&
+		(parse->commandType == CMD_SELECT ||
+			IsModifySupportedInParallelMode(parse->commandType)) &&
 		!parse->hasModifyingCTE &&
 		max_parallel_workers_per_gather > 0 &&
 		!IsParallelWorker())
@@ -345,6 +353,14 @@ standard_planner(Query *parse, const char *query_string, int cursorOptions,
 		/* all the cheap tests pass, so scan the query tree */
 		glob->maxParallelHazard = max_parallel_hazard(parse);
 		glob->parallelModeOK = (glob->maxParallelHazard != PROPARALLEL_UNSAFE);
+
+		if (glob->parallelModeOK &&
+			IsModifySupportedInParallelMode(parse->commandType))
+		{
+			glob->parallelModeOK = IsParallelModeSafeForModify(parse);
+			if (!glob->parallelModeOK)
+				glob->maxParallelHazard = PROPARALLEL_UNSAFE;
+		}
 	}
 	else
 	{
@@ -7355,6 +7371,273 @@ can_partial_agg(PlannerInfo *root, const AggClauseCosts *agg_costs)
 }
 
 /*
+ * IsModifySupportedInParallelMode
+ *
+ * Indicates whether execution of the specified table-modification command
+ * (INSERT/UPDATE/DELETE) in parallel-mode is supported, subject to certain
+ * conditions.
+ */
+static pg_attribute_always_inline bool
+IsModifySupportedInParallelMode(CmdType commandType)
+{
+	/* Currently only INSERT is supported */
+	return (commandType == CMD_INSERT);
+}
+
+/*
+ * IsTriggerDataParallelModeSafe
+ *
+ * Checks if the specified trigger data is parallel-mode safe.
+ * Returns false if any one of the triggers are not safe for parallel
+ * operation.
+ */
+static bool
+IsTriggerDataParallelModeSafe(TriggerDesc *trigdesc)
+{
+	int	i;
+
+	/*
+	 * Can't support execution of row-level or transition-table triggers
+	 * during parallel-mode, since such triggers may query the table
+	 * into which the data is being inserted, and the content returned
+	 * would vary unpredictably according to the order of retrieval by
+	 * the workers and the rows already inserted.
+	 */
+	if (trigdesc != NULL &&
+		 (trigdesc->trig_insert_instead_row ||
+		  trigdesc->trig_insert_before_row ||
+		  trigdesc->trig_insert_after_row ||
+		  trigdesc->trig_insert_new_table))
+	{
+		return false;
+	}
+
+	for (i = 0; i < trigdesc->numtriggers; i++)
+	{
+		Trigger    *trigger = &trigdesc->triggers[i];
+		int 		trigtype;
+
+		if (func_parallel(trigger->tgfoid) != PROPARALLEL_SAFE)
+			return false;
+
+		/* If the trigger type is RI_TRIGGER_FK, this indicates a FK exists in
+		 * the relation, and this would result in creation of new CommandIds
+		 * on insert/update/delete and this isn't supported during
+		 * parallel-mode.
+		 */
+		trigtype = RI_FKey_trigger_type(trigger->tgfoid);
+		if (trigtype == RI_TRIGGER_FK)
+			return false;
+	}
+
+	return true;
+}
+
+/*
+ * AreIndexExprsParallelModeSafe
+ *
+ * Checks if index expressions for a specified relation are parallel-mode safe.
+ * Returns false if any index expressions exist which are not safe for parallel
+ * operation.
+ */
+static bool
+AreIndexExprsParallelModeSafe(Relation rel)
+{
+	List		*indexOidList;
+	ListCell	*lc;
+	LOCKMODE	lockmode = AccessShareLock;
+
+	indexOidList = RelationGetIndexList(rel);
+	foreach(lc, indexOidList)
+	{
+		Oid			indexOid = lfirst_oid(lc);
+		Relation	indexRel;
+		IndexInfo	*indexInfo;
+
+		indexRel = index_open(indexOid, lockmode);
+
+		indexInfo = BuildIndexInfo(indexRel);
+
+		if (indexInfo->ii_Expressions != NIL)
+		{
+			int 		i;
+			ListCell	*indexExprItem = list_head(indexInfo->ii_Expressions);
+
+			for (i = 0; i < indexInfo->ii_NumIndexAttrs; i++)
+			{
+				int	keycol = indexInfo->ii_IndexAttrNumbers[i];
+				if (keycol == 0)
+				{
+					/* Found an index expression */
+
+					Node *indexExpr;
+
+					if (indexExprItem == NULL)	/* shouldn't happen */
+						elog(ERROR, "too few entries in indexprs list");
+
+					indexExpr = (Node *)lfirst(indexExprItem);
+					indexExpr = (Node *)expression_planner((Expr *)indexExpr);
+
+					if (max_parallel_hazard((Query *)indexExpr) != PROPARALLEL_SAFE)
+					{
+						index_close(indexRel, lockmode);
+						return false;
+					}
+
+					indexExprItem = lnext(indexInfo->ii_Expressions, indexExprItem);
+				}
+			}
+		}
+		index_close(indexRel, lockmode);
+	}
+
+	return true;
+}
+
+/*
+ * IsRelParallelModeSafeForModify
+ *
+ * Determines whether a specified relation is safe for modification in
+ * parallel-mode.
+ */
+static bool
+IsRelParallelModeSafeForModify(Oid relid)
+{
+	Relation        rel;
+	TupleDesc		tupdesc;
+	int				attnum;
+
+	LOCKMODE		lockmode = AccessShareLock;
+
+	rel = table_open(relid, lockmode);
+
+	/*
+	 * We can't support table modification in parallel-mode if it's a
+	 * foreign table/partition (no FDW API for supporting parallel access)
+	 * or a temporary table.
+	 */
+	if (rel->rd_rel->relkind == RELKIND_FOREIGN_TABLE ||
+		RelationUsesLocalBuffers(rel))
+	{
+		table_close(rel, lockmode);
+		return false;
+	}
+
+	/*
+	 * If a partitioned table, check that each partition is safe for
+	 * modification in parallel-mode.
+	 */
+	if (rel->rd_rel->relkind == RELKIND_PARTITIONED_TABLE)
+	{
+		int i;
+		PartitionDesc pd = RelationGetPartitionDesc(rel);
+		for (i = 0; i < pd->nparts; i++)
+		{
+			if (!IsRelParallelModeSafeForModify(pd->oids[i]))
+			{
+				table_close(rel, lockmode);
+				return false;
+			}
+		}
+	}
+
+	/*
+	 * If there are any index expressions, check that they are parallel-mode
+	 * safe.
+	 */
+	if (!AreIndexExprsParallelModeSafe(rel))
+	{
+		table_close(rel, lockmode);
+		return false;
+	}
+
+	/*
+	 * If any triggers exist, check that they are parallel safe.
+	 */
+	if (rel->trigdesc != NULL &&
+		!IsTriggerDataParallelModeSafe(rel->trigdesc))
+	{
+		table_close(rel, lockmode);
+		return false;
+	}
+
+	/*
+	 * Check if there are any column default expressions which are not
+	 * parallel-mode safe.
+	 */
+	tupdesc = RelationGetDescr(rel);
+	for (attnum = 0; attnum < tupdesc->natts; attnum++)
+	{
+		Expr *defexpr;
+
+		Form_pg_attribute att = TupleDescAttr(tupdesc, attnum);
+
+		/* We don't need info for dropped or generated attributes */
+		if (att->attisdropped || att->attgenerated)
+			continue;
+
+		if (att->atthasdef)
+		{
+			defexpr = (Expr *)build_column_default(rel, attnum + 1);
+
+			/* Run the expression through planner */
+			defexpr = expression_planner(defexpr);
+
+			if (max_parallel_hazard((Query *)defexpr) != PROPARALLEL_SAFE)
+			{
+				table_close(rel, lockmode);
+				return false;
+			}
+		}
+	}
+
+	/*
+	 * Check if there are any CHECK constraints which are not parallel-safe.
+	 */
+	if (tupdesc->constr != NULL && tupdesc->constr->num_check > 0)
+	{
+		int i;
+
+		ConstrCheck *check = tupdesc->constr->check;
+
+		for (i = 0; i < tupdesc->constr->num_check; i++)
+		{
+			Expr *checkExpr = stringToNode(check->ccbin);
+			if (max_parallel_hazard((Query *)checkExpr) != PROPARALLEL_SAFE)
+			{
+				table_close(rel, lockmode);
+				return false;
+			}
+		}
+	}
+
+	table_close(rel, lockmode);
+	return true;
+}
+
+/*
+ * IsParallelModeSafeForModify
+ *
+ * Determines whether the specified table-modification statement is
+ * parallel-mode safe, based on the statement attributes and target table.
+ */
+static bool
+IsParallelModeSafeForModify(Query *parse)
+{
+	RangeTblEntry   *rte;
+
+	/*
+	 * UPDATE is not currently supported in parallel-mode, so prohibit
+	 * INSERT...ON CONFLICT...DO UPDATE...
+	 */
+	if (parse->onConflict != NULL && parse->onConflict->action == ONCONFLICT_UPDATE)
+		return false;
+
+	rte = rt_fetch(parse->resultRelation, parse->rtable);
+	return (IsRelParallelModeSafeForModify(rte->relid));
+}
+
+/*
  * apply_scanjoin_target_to_paths
  *
  * Adjust the final scan/join relation, and recursively all of its children,
-- 
1.8.3.1

