From 79e818f8041aeb57ae154e916b1844cb9fe427b9 Mon Sep 17 00:00:00 2001
From: Greg Nancarrow <gregn4422@gmail.com>
Date: Thu, 22 Oct 2020 14:58:56 +1100
Subject: [PATCH v3] Enable parallel SELECT for "INSERT INTO ... SELECT ...",
 where it is safe to do so.

Parallel SELECT can't be utilized in the following cases:
- INSERT statement uses ON CONFLICT ... DO UPDATE ...
- Target table is a foreign or temporary table
- Target table has a:
  - Parallel-unsafe trigger
  - Foreign key trigger (RI_TRIGGER_FK)
  - Parallel-unsafe index expression
  - Parallel-unsafe column default expression
  - Parallel-unsafe check constraint
- Partitioned table or partition with any of the above parallel-unsafe features
- Partitioned table with parallel-unsafe partition key expressions or support functions

Discussion: https://postgr.es/m/CAJcOf-cXnB5cnMKqWEp2E2z7Mvcd04iLVmV=qpFJrR3AcrTS3g@mail.gmail.com
---
 src/backend/access/transam/xact.c    |  13 ++
 src/backend/executor/execMain.c      |  10 +
 src/backend/optimizer/plan/planner.c |  23 +-
 src/backend/optimizer/util/clauses.c | 416 +++++++++++++++++++++++++++++++++++
 src/include/access/xact.h            |  15 ++
 src/include/optimizer/clauses.h      |   1 +
 6 files changed, 473 insertions(+), 5 deletions(-)

diff --git a/src/backend/access/transam/xact.c b/src/backend/access/transam/xact.c
index af6afce..7c37be8 100644
--- a/src/backend/access/transam/xact.c
+++ b/src/backend/access/transam/xact.c
@@ -1015,6 +1015,19 @@ IsInParallelMode(void)
 }
 
 /*
+ *	PrepareParallelModeForModify
+ *
+ * Prepare for entering parallel mode by assigning a FullTransactionId, to be
+ * included in the transaction state that is serialized in the parallel DSM.
+ */
+void PrepareParallelModeForModify(CmdType commandType)
+{
+	Assert(!IsInParallelMode());
+
+	(void)GetCurrentFullTransactionId();
+}
+
+/*
  *	CommandCounterIncrement
  */
 void
diff --git a/src/backend/executor/execMain.c b/src/backend/executor/execMain.c
index aea0479..1a2a675 100644
--- a/src/backend/executor/execMain.c
+++ b/src/backend/executor/execMain.c
@@ -1526,7 +1526,17 @@ ExecutePlan(EState *estate,
 
 	estate->es_use_parallel_mode = use_parallel_mode;
 	if (use_parallel_mode)
+	{
+		/*
+		 * Supported table-modification commands may require additional steps
+		 * prior to entering parallel mode, such as assigning a FullTransactionId.
+		 */
+		if (IsModifySupportedInParallelMode(estate->es_plannedstmt->commandType))
+		{
+			PrepareParallelModeForModify(estate->es_plannedstmt->commandType);
+		}
 		EnterParallelMode();
+	}
 
 	/*
 	 * Loop until we've processed the proper number of tuples from the plan.
diff --git a/src/backend/optimizer/plan/planner.c b/src/backend/optimizer/plan/planner.c
index 986d7a5..7c8c3db 100644
--- a/src/backend/optimizer/plan/planner.c
+++ b/src/backend/optimizer/plan/planner.c
@@ -318,11 +318,11 @@ standard_planner(Query *parse, const char *query_string, int cursorOptions,
 	/*
 	 * Assess whether it's feasible to use parallel mode for this query. We
 	 * can't do this in a standalone backend, or if the command will try to
-	 * modify any data, or if this is a cursor operation, or if GUCs are set
-	 * to values that don't permit parallelism, or if parallel-unsafe
-	 * functions are present in the query tree.
+	 * modify any data using a CTE, or if this is a cursor operation, or if
+	 * GUCs are set to values that don't permit parallelism, or if
+	 * parallel-unsafe functions are present in the query tree.
 	 *
-	 * (Note that we do allow CREATE TABLE AS, SELECT INTO, and CREATE
+	 * (Note that we do allow CREATE TABLE AS, INSERT, SELECT INTO, and CREATE
 	 * MATERIALIZED VIEW to use parallel plans, but as of now, only the leader
 	 * backend writes into a completely new table.  In the future, we can
 	 * extend it to allow workers to write into the table.  However, to allow
@@ -336,7 +336,8 @@ standard_planner(Query *parse, const char *query_string, int cursorOptions,
 	 */
 	if ((cursorOptions & CURSOR_OPT_PARALLEL_OK) != 0 &&
 		IsUnderPostmaster &&
-		parse->commandType == CMD_SELECT &&
+		(parse->commandType == CMD_SELECT ||
+			IsModifySupportedInParallelMode(parse->commandType)) &&
 		!parse->hasModifyingCTE &&
 		max_parallel_workers_per_gather > 0 &&
 		!IsParallelWorker())
@@ -344,6 +345,18 @@ standard_planner(Query *parse, const char *query_string, int cursorOptions,
 		/* all the cheap tests pass, so scan the query tree */
 		glob->maxParallelHazard = max_parallel_hazard(parse);
 		glob->parallelModeOK = (glob->maxParallelHazard != PROPARALLEL_UNSAFE);
+
+		/*
+		 * Additional parallel-mode safety checks are required in order to
+		 * allowing an underlying parallel query to be used for a
+		 * supported table-modification command.
+		 */
+		if (glob->parallelModeOK &&
+			IsModifySupportedInParallelMode(parse->commandType))
+		{
+			glob->maxParallelHazard = MaxParallelHazardForModify(parse, &glob->maxParallelHazard);
+			glob->parallelModeOK = (glob->maxParallelHazard != PROPARALLEL_UNSAFE);
+		}
 	}
 	else
 	{
diff --git a/src/backend/optimizer/util/clauses.c b/src/backend/optimizer/util/clauses.c
index e7d8146..7a2b7dc 100644
--- a/src/backend/optimizer/util/clauses.c
+++ b/src/backend/optimizer/util/clauses.c
@@ -19,13 +19,19 @@
 
 #include "postgres.h"
 
+#include "access/genam.h"
 #include "access/htup_details.h"
+#include "access/table.h"
+#include "catalog/index.h"
+#include "catalog/indexing.h"
 #include "catalog/pg_aggregate.h"
 #include "catalog/pg_class.h"
+#include "catalog/pg_constraint.h"
 #include "catalog/pg_language.h"
 #include "catalog/pg_operator.h"
 #include "catalog/pg_proc.h"
 #include "catalog/pg_type.h"
+#include "commands/trigger.h"
 #include "executor/executor.h"
 #include "executor/functions.h"
 #include "funcapi.h"
@@ -42,7 +48,11 @@
 #include "parser/parse_agg.h"
 #include "parser/parse_coerce.h"
 #include "parser/parse_func.h"
+#include "parser/parsetree.h"
+#include "partitioning/partdesc.h"
+#include "rewrite/rewriteHandler.h"
 #include "rewrite/rewriteManip.h"
+#include "storage/lmgr.h"
 #include "tcop/tcopprot.h"
 #include "utils/acl.h"
 #include "utils/builtins.h"
@@ -50,6 +60,8 @@
 #include "utils/fmgroids.h"
 #include "utils/lsyscache.h"
 #include "utils/memutils.h"
+#include "utils/partcache.h"
+#include "utils/rel.h"
 #include "utils/syscache.h"
 #include "utils/typcache.h"
 
@@ -157,6 +169,13 @@ static Query *substitute_actual_srf_parameters(Query *expr,
 static Node *substitute_actual_srf_parameters_mutator(Node *node,
 													  substitute_actual_srf_parameters_context *context);
 
+static char MaxTriggerDataParallelHazardForModify(TriggerDesc *trigdesc,
+													max_parallel_hazard_context *context);
+static char MaxIndexExprsParallelHazardForModify(Relation rel,
+													max_parallel_hazard_context *context);
+static char MaxDomainParallelHazardForModify(Oid typid, max_parallel_hazard_context *context);
+static char MaxRelParallelHazardForModify(Oid relid, CmdType commandType,
+											max_parallel_hazard_context *context);
 
 /*****************************************************************************
  *		Aggregate-function clause manipulation
@@ -1073,6 +1092,403 @@ max_parallel_hazard_walker(Node *node, max_parallel_hazard_context *context)
 								  context);
 }
 
+/*
+ * IsModifySupportedInParallelMode
+ *
+ * Indicates whether execution of the specified table-modification command
+ * (INSERT/UPDATE/DELETE) in parallel-mode is supported, subject to certain
+ * conditions.
+ */
+static pg_attribute_always_inline bool
+IsModifySupportedInParallelMode(CmdType commandType)
+{
+	/* Currently only INSERT is supported */
+	return (commandType == CMD_INSERT);
+}
+
+/*
+ * MaxTriggerDataParallelHazardForModify
+ *
+ * Finds the maximum parallel-mode hazard level for the specified trigger data.
+ */
+static char
+MaxTriggerDataParallelHazardForModify(TriggerDesc *trigdesc,
+										max_parallel_hazard_context *context)
+{
+	int	i;
+
+	for (i = 0; i < trigdesc->numtriggers; i++)
+	{
+		Trigger    *trigger = &trigdesc->triggers[i];
+		int 		trigtype;
+
+		if (max_parallel_hazard_test(func_parallel(trigger->tgfoid), context))
+			break;
+
+		/*
+		 * If the trigger type is RI_TRIGGER_FK, this indicates a FK exists in
+		 * the relation, and this would result in creation of new CommandIds
+		 * on insert/update/delete and this isn't supported during
+		 * parallel-mode.
+		 */
+		trigtype = RI_FKey_trigger_type(trigger->tgfoid);
+		if (trigtype == RI_TRIGGER_FK)
+		{
+			context->max_hazard = PROPARALLEL_UNSAFE;
+			break;
+		}
+	}
+
+	return context->max_hazard;
+}
+
+/*
+ * MaxIndexExprsParallelHazardForModify
+ *
+ * Finds the maximum parallel-mode hazard level for any existing index
+ * expressions of a specified relation.
+ */
+static char
+MaxIndexExprsParallelHazardForModify(Relation rel,
+										max_parallel_hazard_context *context)
+{
+	List		*indexOidList;
+	ListCell	*lc;
+	LOCKMODE	lockmode = AccessShareLock;
+
+	indexOidList = RelationGetIndexList(rel);
+	foreach(lc, indexOidList)
+	{
+		Oid			indexOid = lfirst_oid(lc);
+		Relation	indexRel;
+		IndexInfo	*indexInfo;
+
+		if (ConditionalLockRelationOid(indexOid, lockmode))
+		{
+			indexRel = index_open(indexOid, NoLock);
+		}
+		else
+		{
+			context->max_hazard = PROPARALLEL_UNSAFE;
+			return context->max_hazard;
+		}
+
+		indexInfo = BuildIndexInfo(indexRel);
+
+		if (indexInfo->ii_Expressions != NIL)
+		{
+			int 		i;
+			ListCell	*indexExprItem = list_head(indexInfo->ii_Expressions);
+
+			for (i = 0; i < indexInfo->ii_NumIndexAttrs; i++)
+			{
+				int	keycol = indexInfo->ii_IndexAttrNumbers[i];
+				if (keycol == 0)
+				{
+					/* Found an index expression */
+
+					Node *indexExpr;
+
+					if (indexExprItem == NULL)	/* shouldn't happen */
+						elog(ERROR, "too few entries in indexprs list");
+
+					indexExpr = (Node *)lfirst(indexExprItem);
+					indexExpr = (Node *)expression_planner((Expr *)indexExpr);
+
+					if (max_parallel_hazard_walker(indexExpr, context) == PROPARALLEL_UNSAFE)
+					{
+						index_close(indexRel, lockmode);
+						return context->max_hazard;
+					}
+
+					indexExprItem = lnext(indexInfo->ii_Expressions, indexExprItem);
+				}
+			}
+		}
+		index_close(indexRel, lockmode);
+	}
+
+	return context->max_hazard;
+}
+
+/*
+ * MaxDomainParallelHazardForModify
+ *
+ * Finds the maximum parallel-mode hazard level for the specified DOMAIN type.
+ * Only any CHECK expressions are examined for parallel safety.
+ * DEFAULT values of DOMAIN-type columns in the target-list are already
+ * being checked for parallel-safety in the max_parallel_hazard() scan of the
+ * query tree in standard_planner().
+ *
+ */
+static char
+MaxDomainParallelHazardForModify(Oid typid, max_parallel_hazard_context *context)
+{
+	Relation conRel;
+	ScanKeyData key[1];
+	SysScanDesc scan;
+	HeapTuple	tup;
+
+	LOCKMODE	lockmode = AccessShareLock;
+
+	conRel = table_open(ConstraintRelationId, lockmode);
+
+	ScanKeyInit(&key[0],
+				Anum_pg_constraint_contypid, BTEqualStrategyNumber,
+				F_OIDEQ, ObjectIdGetDatum(typid));
+	scan = systable_beginscan(conRel, ConstraintTypidIndexId, true,
+							  NULL, 1, key);
+
+	while (HeapTupleIsValid((tup = systable_getnext(scan))))
+	{
+		Form_pg_constraint con = (Form_pg_constraint) GETSTRUCT(tup);
+
+		if (con->contype == CONSTRAINT_CHECK)
+		{
+			char	*conbin;
+			Datum	val;
+			bool	isnull;
+			Expr	*checkExpr;
+
+			val = SysCacheGetAttr(CONSTROID, tup,
+						Anum_pg_constraint_conbin, &isnull);
+			if (isnull)
+				elog(ERROR, "null conbin for constraint %u", con->oid);
+			conbin = TextDatumGetCString(val);
+			checkExpr = stringToNode(conbin);
+			if (max_parallel_hazard_walker((Node *)checkExpr, context))
+			{
+				break;
+			}
+		}
+	}
+
+	systable_endscan(scan);
+	table_close(conRel, lockmode);
+	return context->max_hazard;
+}
+
+/*
+ * MaxRelParallelHazardForModify
+ *
+ * Determines the maximum parallel-mode hazard level for modification
+ * of a specified relation.
+ */
+static char
+MaxRelParallelHazardForModify(Oid relid,
+								CmdType commandType,
+								max_parallel_hazard_context *context)
+{
+	Relation        rel;
+	TupleDesc		tupdesc;
+	int				attnum;
+
+	LOCKMODE		lockmode = AccessShareLock;
+
+	/*
+	 * It's possible that this relation is locked for exclusive access
+	 * in another concurrent transaction (e.g. as a result of a
+	 * ALTER TABLE ... operation) until that transaction completes.
+	 * If a share-lock can't be acquired on it now, we have to assume this
+	 * could be the worst-case, so to avoid blocking here until that
+	 * transaction completes, conditionally try to acquire the lock and
+	 * assume and return UNSAFE on failure.
+	 */
+	if (ConditionalLockRelationOid(relid, lockmode))
+	{
+		rel = table_open(relid, NoLock);
+	}
+	else
+	{
+		context->max_hazard = PROPARALLEL_UNSAFE;
+		return context->max_hazard;
+	}
+
+	/*
+	 * We can't support table modification in parallel-mode if it's a
+	 * foreign table/partition (no FDW API for supporting parallel access)
+	 * or a temporary table.
+	 */
+	if (rel->rd_rel->relkind == RELKIND_FOREIGN_TABLE ||
+		RelationUsesLocalBuffers(rel))
+	{
+		table_close(rel, lockmode);
+		context->max_hazard = PROPARALLEL_UNSAFE;
+		return context->max_hazard;
+	}
+
+	/*
+	 * If a partitioned table, check that each partition is safe for
+	 * modification in parallel-mode.
+	 */
+	if (rel->rd_rel->relkind == RELKIND_PARTITIONED_TABLE)
+	{
+		int i;
+		PartitionDesc pdesc;
+		PartitionKey pkey;
+		ListCell *partexprs_item;
+		int	partnatts;
+		List *partexprs;
+
+		pkey = RelationGetPartitionKey(rel);
+
+		partnatts = get_partition_natts(pkey);
+		partexprs = get_partition_exprs(pkey);
+
+		partexprs_item = list_head(partexprs);
+		for (i = 0; i < partnatts; i++)
+		{
+			/* Check parallel-safety of partition key support functions */
+			if (OidIsValid(pkey->partsupfunc[i].fn_oid))
+			{
+				if (max_parallel_hazard_test(func_parallel(pkey->partsupfunc[i].fn_oid), context))
+				{
+					table_close(rel, lockmode);
+					return context->max_hazard;
+				}
+			}
+
+			/* Check parallel-safety of any expressions in the partition key */
+			if (get_partition_col_attnum(pkey, i) == 0)
+			{
+				Node *checkExpr = (Node *) lfirst(partexprs_item);
+				if (max_parallel_hazard_walker(checkExpr, context))
+				{
+					table_close(rel, lockmode);
+					return context->max_hazard;
+				}
+
+				partexprs_item = lnext(partexprs, partexprs_item);
+			}
+		}
+
+		/* Recursively check each partition ... */
+		pdesc = RelationGetPartitionDesc(rel);
+		for (i = 0; i < pdesc->nparts; i++)
+		{
+			if (MaxRelParallelHazardForModify(pdesc->oids[i], commandType, context) == PROPARALLEL_UNSAFE)
+			{
+				table_close(rel, lockmode);
+				return context->max_hazard;
+			}
+		}
+	}
+
+	/*
+	 * If there are any index expressions, check that they are parallel-mode
+	 * safe.
+	 */
+	if (MaxIndexExprsParallelHazardForModify(rel, context) == PROPARALLEL_UNSAFE)
+	{
+		table_close(rel, lockmode);
+		return context->max_hazard;
+	}
+
+	/*
+	 * If any triggers exist, check that they are parallel safe.
+	 */
+	if (rel->trigdesc != NULL &&
+		MaxTriggerDataParallelHazardForModify(rel->trigdesc, context) == PROPARALLEL_UNSAFE)
+	{
+		table_close(rel, lockmode);
+		return context->max_hazard;
+	}
+
+	if (commandType == CMD_INSERT || commandType == CMD_UPDATE)
+	{
+		/*
+		 * Column default expressions for columns in the target-list are already
+		 * being checked for parallel-safety in the max_parallel_hazard() scan of the
+		 * query tree in standard_planner().
+		 */
+
+		tupdesc = RelationGetDescr(rel);
+		for (attnum = 0; attnum < tupdesc->natts; attnum++)
+		{
+			Form_pg_attribute att = TupleDescAttr(tupdesc, attnum);
+
+			/* We don't need info for dropped or generated attributes */
+			if (att->attisdropped || att->attgenerated)
+				continue;
+
+			/*
+			 * If the column is of a DOMAIN type, determine whether that domain
+			 * has any CHECK expressions that are not parallel-mode safe.
+			*/
+			if (get_typtype(att->atttypid) == TYPTYPE_DOMAIN)
+			{
+				if (MaxDomainParallelHazardForModify(att->atttypid, context) == PROPARALLEL_UNSAFE)
+				{
+					table_close(rel, lockmode);
+					return context->max_hazard;
+				}
+			}
+		}
+	}
+
+	/*
+	 * Check if there are any CHECK constraints which are not parallel-safe.
+	 */
+	if ((commandType == CMD_INSERT || commandType == CMD_UPDATE) &&
+			tupdesc->constr != NULL &&
+			tupdesc->constr->num_check > 0)
+	{
+		int i;
+
+		ConstrCheck *check = tupdesc->constr->check;
+
+		for (i = 0; i < tupdesc->constr->num_check; i++)
+		{
+			Expr *checkExpr = stringToNode(check->ccbin);
+			if (max_parallel_hazard_walker((Node *)checkExpr, context))
+			{
+				table_close(rel, lockmode);
+				return context->max_hazard;
+			}
+		}
+	}
+
+	table_close(rel, lockmode);
+	return context->max_hazard;
+}
+
+/*
+ * MaxParallelHazardForModify
+ *
+ * Determines the worst parallel-mode hazard level for the specified
+ * table-modification statement, based on the statement attributes and
+ * target table. An initial max parallel hazard level may optionally be
+ * supplied. The search returns the earliest in the following list:
+ * PROPARALLEL_UNSAFE, PROPARALLEL_RESTRICTED, PROPARALLEL_SAFE
+ */
+char
+MaxParallelHazardForModify(Query *parse, const char *initialMaxParallelHazard)
+{
+	RangeTblEntry				*rte;
+	max_parallel_hazard_context	context;
+
+
+	/*
+	 * UPDATE is not currently supported in parallel-mode, so prohibit
+	 * INSERT...ON CONFLICT...DO UPDATE...
+	 */
+	if (parse->onConflict != NULL && parse->onConflict->action == ONCONFLICT_UPDATE)
+		return PROPARALLEL_UNSAFE;
+
+	/*
+	 * Setup the context used in finding the max parallel-mode hazard.
+	 */
+	Assert(initialMaxParallelHazard == NULL ||
+			*initialMaxParallelHazard == PROPARALLEL_SAFE ||
+			*initialMaxParallelHazard == PROPARALLEL_RESTRICTED);
+	context.max_hazard = initialMaxParallelHazard == NULL ?
+							PROPARALLEL_SAFE : *initialMaxParallelHazard;
+	context.max_interesting = PROPARALLEL_UNSAFE;
+	context.safe_param_ids = NIL;
+
+	rte = rt_fetch(parse->resultRelation, parse->rtable);
+	return (MaxRelParallelHazardForModify(rte->relid, parse->commandType, &context));
+}
 
 /*****************************************************************************
  *		Check clauses for nonstrict functions
diff --git a/src/include/access/xact.h b/src/include/access/xact.h
index 7320de3..a926fff 100644
--- a/src/include/access/xact.h
+++ b/src/include/access/xact.h
@@ -466,5 +466,20 @@ extern void ParsePrepareRecord(uint8 info, xl_xact_prepare *xlrec, xl_xact_parse
 extern void EnterParallelMode(void);
 extern void ExitParallelMode(void);
 extern bool IsInParallelMode(void);
+extern void PrepareParallelModeForModify(CmdType commandType);
+
+/*
+ * IsModifySupportedInParallelMode
+ *
+ * Indicates whether execution of the specified table-modification command
+ * (INSERT/UPDATE/DELETE) in parallel-mode is supported, subject to certain
+ * parallel-safety conditions.
+ */
+static inline bool
+IsModifySupportedInParallelMode(CmdType commandType)
+{
+	/* Currently only INSERT is supported */
+	return (commandType == CMD_INSERT);
+}
 
 #endif							/* XACT_H */
diff --git a/src/include/optimizer/clauses.h b/src/include/optimizer/clauses.h
index 7ef8cce..12662eb 100644
--- a/src/include/optimizer/clauses.h
+++ b/src/include/optimizer/clauses.h
@@ -55,5 +55,6 @@ extern void CommuteOpExpr(OpExpr *clause);
 
 extern Query *inline_set_returning_function(PlannerInfo *root,
 											RangeTblEntry *rte);
+extern char MaxParallelHazardForModify(Query *parse, const char *initialMaxParallelHazard);
 
 #endif							/* CLAUSES_H */
-- 
1.8.3.1

