From 8e6032e626f4e2b35327dcb5a41142acd3e50868 Mon Sep 17 00:00:00 2001
From: Greg Nancarrow <gregn4422@gmail.com>
Date: Fri, 30 Oct 2020 00:56:02 +1100
Subject: [PATCH v6 2/2] Enable parallel INSERT and/or SELECT for "INSERT INTO
 ... SELECT ...", where it is safe to do so.

Parallel INSERT can't be utilized in the following cases:
- INSERT statement uses ON CONFLICT ... DO UPDATE ...
- Target table is a foreign or temporary table
- Target table has a:
  - Parallel-unsafe trigger
  - Foreign key trigger (RI_TRIGGER_FK)
  - Parallel-unsafe index expression
  - Parallel-unsafe column default expression
  - Parallel-unsafe check constraint
- Partitioned table or partition with any of the above parallel-unsafe features
- Partitioned table with parallel-unsafe partition key expressions or support
  functions

Where the above-mentioned target table features are parallel-restricted, rather
than parallel-unsafe, at least parallel SELECT may be utilized.

Discussion: https://postgr.es/m/CAJcOf-cXnB5cnMKqWEp2E2z7Mvcd04iLVmV=qpFJrR3AcrTS3g@mail.gmail.com
---
 src/backend/access/heap/heapam.c        |   4 -
 src/backend/access/transam/xact.c       |  33 ++++++--
 src/backend/executor/execMain.c         |   7 +-
 src/backend/executor/execParallel.c     |  59 ++++++++++++++-
 src/backend/executor/nodeGather.c       |  64 +++++++++++++---
 src/backend/executor/nodeModifyTable.c  |  44 ++++++++++-
 src/backend/optimizer/path/costsize.c   |  46 ++++++++++++
 src/backend/optimizer/plan/createplan.c |   2 +-
 src/backend/optimizer/plan/planner.c    | 129 +++++++++++++++++++++++++++++++-
 src/backend/optimizer/plan/setrefs.c    |  12 ++-
 src/backend/optimizer/util/pathnode.c   |  55 ++++----------
 src/include/access/xact.h               |   3 +-
 src/include/executor/execParallel.h     |   1 +
 src/include/executor/nodeModifyTable.h  |   3 +-
 src/include/nodes/execnodes.h           |   4 +-
 src/include/optimizer/cost.h            |   1 +
 src/include/optimizer/pathnode.h        |   3 +-
 17 files changed, 395 insertions(+), 75 deletions(-)

diff --git a/src/backend/access/heap/heapam.c b/src/backend/access/heap/heapam.c
index 1585861..a52d322 100644
--- a/src/backend/access/heap/heapam.c
+++ b/src/backend/access/heap/heapam.c
@@ -2049,10 +2049,6 @@ heap_prepare_insert(Relation relation, HeapTuple tup, TransactionId xid,
 	 * inserts in general except for the cases where inserts generate a new
 	 * CommandId (eg. inserts into a table having a foreign key column).
 	 */
-	if (IsParallelWorker())
-		ereport(ERROR,
-				(errcode(ERRCODE_INVALID_TRANSACTION_STATE),
-				 errmsg("cannot insert tuples in a parallel worker")));
 
 	tup->t_data->t_infomask &= ~(HEAP_XACT_MASK);
 	tup->t_data->t_infomask2 &= ~(HEAP2_XACT_MASK);
diff --git a/src/backend/access/transam/xact.c b/src/backend/access/transam/xact.c
index 6130df1..9146878 100644
--- a/src/backend/access/transam/xact.c
+++ b/src/backend/access/transam/xact.c
@@ -517,6 +517,20 @@ GetCurrentFullTransactionIdIfAny(void)
 }
 
 /*
+ *	SetCurrentCommandIdUsedForWorker
+ *
+ * For a parallel worker, record that the currentCommandId has been used.
+ * This must only be called at the start of a parallel operation.
+ */
+void
+SetCurrentCommandIdUsedForWorker(void)
+{
+	Assert(IsParallelWorker() && !currentCommandIdUsed && currentCommandId != InvalidCommandId);
+
+	currentCommandIdUsed = true;
+}
+
+/*
  *	MarkCurrentTransactionIdLoggedIfAny
  *
  * Remember that the current xid - if it is assigned - now has been wal logged.
@@ -764,12 +778,16 @@ GetCurrentCommandId(bool used)
 	if (used)
 	{
 		/*
-		 * Forbid setting currentCommandIdUsed in a parallel worker, because
-		 * we have no provision for communicating this back to the leader.  We
-		 * could relax this restriction when currentCommandIdUsed was already
-		 * true at the start of the parallel operation.
+		 * If in a parallel worker, only allow setting currentCommandIdUsed
+		 * if currentCommandIdUsed was already true at the start of the
+		 * parallel operation (by way of SetCurrentCommandIdUsed()), otherwise
+		 * forbid setting currentCommandIdUsed because we have no provision
+		 * for communicating this back to the leader. Once currentCommandIdUsed
+		 * is set, the commandId used by leader and workers can't be changed,
+		 * because CommandCounterIncrement() then prevents any attempted
+		 * increment of the current commandId.
 		 */
-		Assert(!IsParallelWorker());
+		Assert(!(IsParallelWorker() && !currentCommandIdUsed));
 		currentCommandIdUsed = true;
 	}
 	return currentCommandId;
@@ -1020,10 +1038,13 @@ IsInParallelMode(void)
  * Prepare for entering parallel mode by assigning a FullTransactionId, to be
  * included in the transaction state that is serialized in the parallel DSM.
  */
-void PrepareParallelModeForModify(CmdType commandType)
+void PrepareParallelModeForModify(CmdType commandType, bool isParallelModifyLeader)
 {
 	Assert(!IsInParallelMode());
 
+	if (isParallelModifyLeader)
+		(void)GetCurrentCommandId(true);
+
 	(void)GetCurrentTransactionId();
 }
 
diff --git a/src/backend/executor/execMain.c b/src/backend/executor/execMain.c
index 74a7960..0969a66 100644
--- a/src/backend/executor/execMain.c
+++ b/src/backend/executor/execMain.c
@@ -790,7 +790,8 @@ ExecCheckXactReadOnly(PlannedStmt *plannedstmt)
 		PreventCommandIfReadOnly(CreateCommandName((Node *) plannedstmt));
 	}
 
-	if (plannedstmt->commandType != CMD_SELECT || plannedstmt->hasModifyingCTE)
+	if ((plannedstmt->commandType != CMD_SELECT &&
+			!IsModifySupportedInParallelMode(plannedstmt->commandType)) || plannedstmt->hasModifyingCTE)
 		PreventCommandIfParallelMode(CreateCommandName((Node *) plannedstmt));
 }
 
@@ -1527,13 +1528,15 @@ ExecutePlan(EState *estate,
 	estate->es_use_parallel_mode = use_parallel_mode;
 	if (use_parallel_mode)
 	{
+		bool isParallelModifyLeader = IsA(planstate, GatherState) && IsA(outerPlanState(planstate), ModifyTableState);
+
 		/*
 		 * Supported table-modification commands may require additional steps
 		 * prior to entering parallel mode, such as assigning a FullTransactionId.
 		 */
 		if (IsModifySupportedInParallelMode(estate->es_plannedstmt->commandType))
 		{
-			PrepareParallelModeForModify(estate->es_plannedstmt->commandType);
+			PrepareParallelModeForModify(estate->es_plannedstmt->commandType, isParallelModifyLeader);
 		}
 		EnterParallelMode();
 	}
diff --git a/src/backend/executor/execParallel.c b/src/backend/executor/execParallel.c
index befde52..39f60af 100644
--- a/src/backend/executor/execParallel.c
+++ b/src/backend/executor/execParallel.c
@@ -23,6 +23,7 @@
 
 #include "postgres.h"
 
+#include "access/xact.h"
 #include "executor/execParallel.h"
 #include "executor/executor.h"
 #include "executor/nodeAgg.h"
@@ -65,6 +66,7 @@
 #define PARALLEL_KEY_QUERY_TEXT		UINT64CONST(0xE000000000000008)
 #define PARALLEL_KEY_JIT_INSTRUMENTATION UINT64CONST(0xE000000000000009)
 #define PARALLEL_KEY_WAL_USAGE			UINT64CONST(0xE00000000000000A)
+#define PARALLEL_KEY_PROCESSED_COUNT	UINT64CONST(0xE00000000000000B)
 
 #define PARALLEL_TUPLE_QUEUE_SIZE		65536
 
@@ -173,9 +175,11 @@ ExecSerializePlan(Plan *plan, EState *estate)
 	 * PlannedStmt to start the executor.
 	 */
 	pstmt = makeNode(PlannedStmt);
-	pstmt->commandType = CMD_SELECT;
+	Assert(estate->es_plannedstmt->commandType == CMD_SELECT ||
+			IsModifySupportedInParallelMode(estate->es_plannedstmt->commandType));
+	pstmt->commandType = IsA(plan, ModifyTable) ? castNode(ModifyTable, plan)->operation : CMD_SELECT;
 	pstmt->queryId = UINT64CONST(0);
-	pstmt->hasReturning = false;
+	pstmt->hasReturning = estate->es_plannedstmt->hasReturning;
 	pstmt->hasModifyingCTE = false;
 	pstmt->canSetTag = true;
 	pstmt->transientPlan = false;
@@ -183,7 +187,7 @@ ExecSerializePlan(Plan *plan, EState *estate)
 	pstmt->parallelModeNeeded = false;
 	pstmt->planTree = plan;
 	pstmt->rtable = estate->es_range_table;
-	pstmt->resultRelations = NIL;
+	pstmt->resultRelations = estate->es_plannedstmt->resultRelations;
 	pstmt->appendRelations = NIL;
 
 	/*
@@ -590,6 +594,7 @@ ExecInitParallelPlan(PlanState *planstate, EState *estate,
 	char	   *paramlistinfo_space;
 	BufferUsage *bufusage_space;
 	WalUsage   *walusage_space;
+	uint64	   *processed_count_space;
 	SharedExecutorInstrumentation *instrumentation = NULL;
 	SharedJitInstrumentation *jit_instrumentation = NULL;
 	int			pstmt_len;
@@ -675,6 +680,14 @@ ExecInitParallelPlan(PlanState *planstate, EState *estate,
 						   mul_size(PARALLEL_TUPLE_QUEUE_SIZE, pcxt->nworkers));
 	shm_toc_estimate_keys(&pcxt->estimator, 1);
 
+	if (IsA(planstate->plan, ModifyTable))
+	{
+		/* Estimate space for returned "# of tuples processed" count. */
+		shm_toc_estimate_chunk(&pcxt->estimator,
+							   mul_size(sizeof(uint64), pcxt->nworkers));
+		shm_toc_estimate_keys(&pcxt->estimator, 1);
+	}
+
 	/*
 	 * Give parallel-aware nodes a chance to add to the estimates, and get a
 	 * count of how many PlanState nodes there are.
@@ -764,6 +777,19 @@ ExecInitParallelPlan(PlanState *planstate, EState *estate,
 	/* We don't need the TupleQueueReaders yet, though. */
 	pei->reader = NULL;
 
+	if (IsA(planstate->plan, ModifyTable))
+	{
+		/* Allocate space for each worker's returned "# of tuples processed" count. */
+		processed_count_space = shm_toc_allocate(pcxt->toc,
+											mul_size(sizeof(uint64), pcxt->nworkers));
+		shm_toc_insert(pcxt->toc, PARALLEL_KEY_PROCESSED_COUNT, processed_count_space);
+		pei->processed_count = processed_count_space;
+	}
+	else
+	{
+		pei->processed_count = NULL;
+	}
+
 	/*
 	 * If instrumentation options were supplied, allocate space for the data.
 	 * It only gets partially initialized here; the rest happens during
@@ -1152,6 +1178,15 @@ ExecParallelFinish(ParallelExecutorInfo *pei)
 	for (i = 0; i < nworkers; i++)
 		InstrAccumParallelQuery(&pei->buffer_usage[i], &pei->wal_usage[i]);
 
+	/*
+	 * Update total # of tuples processed, using counts from each worker.
+	 */
+	if (pei->processed_count != NULL)
+	{
+		for (i = 0; i < nworkers; i++)
+			pei->planstate->state->es_processed += pei->processed_count[i];
+	}
+
 	pei->finished = true;
 }
 
@@ -1379,6 +1414,7 @@ ParallelQueryMain(dsm_segment *seg, shm_toc *toc)
 	FixedParallelExecutorState *fpes;
 	BufferUsage *buffer_usage;
 	WalUsage   *wal_usage;
+	uint64   *processed_count;
 	DestReceiver *receiver;
 	QueryDesc  *queryDesc;
 	SharedExecutorInstrumentation *instrumentation;
@@ -1400,6 +1436,16 @@ ParallelQueryMain(dsm_segment *seg, shm_toc *toc)
 										 true);
 	queryDesc = ExecParallelGetQueryDesc(toc, receiver, instrument_options);
 
+	Assert(queryDesc->operation == CMD_SELECT || IsModifySupportedInParallelMode(queryDesc->operation));
+	if (IsModifySupportedInParallelMode(queryDesc->operation))
+	{
+		/*
+		 * Record that the CurrentCommandId is used, at the start of
+		 * the parallel operation.
+		 */
+		SetCurrentCommandIdUsedForWorker();
+	}
+
 	/* Setting debug_query_string for individual workers */
 	debug_query_string = queryDesc->sourceText;
 
@@ -1458,6 +1504,13 @@ ParallelQueryMain(dsm_segment *seg, shm_toc *toc)
 	InstrEndParallelQuery(&buffer_usage[ParallelWorkerNumber],
 						  &wal_usage[ParallelWorkerNumber]);
 
+	if (IsModifySupportedInParallelMode(queryDesc->operation))
+	{
+		/* Report the # of tuples processed during parallel INSERT execution. */
+		processed_count = shm_toc_lookup(toc, PARALLEL_KEY_PROCESSED_COUNT, false);
+		processed_count[ParallelWorkerNumber] = queryDesc->estate->es_processed;
+	}
+
 	/* Report instrumentation data if any instrumentation options are set. */
 	if (instrumentation != NULL)
 		ExecParallelReportInstrumentation(queryDesc->planstate,
diff --git a/src/backend/executor/nodeGather.c b/src/backend/executor/nodeGather.c
index a01b46a..f8d85bc 100644
--- a/src/backend/executor/nodeGather.c
+++ b/src/backend/executor/nodeGather.c
@@ -35,6 +35,7 @@
 #include "executor/execdebug.h"
 #include "executor/execParallel.h"
 #include "executor/nodeGather.h"
+#include "executor/nodeModifyTable.h"
 #include "executor/nodeSubplan.h"
 #include "executor/tqueue.h"
 #include "miscadmin.h"
@@ -60,6 +61,7 @@ ExecInitGather(Gather *node, EState *estate, int eflags)
 	GatherState *gatherstate;
 	Plan	   *outerNode;
 	TupleDesc	tupDesc;
+	Index		varno;
 
 	/* Gather node doesn't have innerPlan node. */
 	Assert(innerPlan(node) == NULL);
@@ -104,7 +106,9 @@ ExecInitGather(Gather *node, EState *estate, int eflags)
 	 * Initialize result type and projection.
 	 */
 	ExecInitResultTypeTL(&gatherstate->ps);
-	ExecConditionalAssignProjectionInfo(&gatherstate->ps, tupDesc, OUTER_VAR);
+	varno = (IsA(outerNode, ModifyTable) && castNode(ModifyTable, outerNode)->returningLists != NULL) ?
+					castNode(ModifyTableState, outerPlanState(gatherstate))->resultRelInfo->ri_RangeTableIndex : OUTER_VAR;
+	ExecConditionalAssignProjectionInfo(&gatherstate->ps, tupDesc, varno);
 
 	/*
 	 * Without projections result slot type is not trivially known, see
@@ -144,9 +148,19 @@ ExecGather(PlanState *pstate)
 	GatherState *node = castNode(GatherState, pstate);
 	TupleTableSlot *slot;
 	ExprContext *econtext;
+	ModifyTableState *nodeModifyTableState = NULL;
+	bool isParallelModifyLeader = false;
+	bool isParallelModifyWithReturning = false;
 
 	CHECK_FOR_INTERRUPTS();
 
+	if (IsA(outerPlanState(pstate), ModifyTableState))
+	{
+		nodeModifyTableState = castNode(ModifyTableState, outerPlanState(pstate));
+		isParallelModifyLeader = IsModifySupportedInParallelMode(nodeModifyTableState->operation);
+		isParallelModifyWithReturning = isParallelModifyLeader && nodeModifyTableState->ps.plan->targetlist != NIL;
+	}
+
 	/*
 	 * Initialize the parallel context and workers on first execution. We do
 	 * this on first execution rather than during node initialization, as it
@@ -178,6 +192,16 @@ ExecGather(PlanState *pstate)
 										 node->pei,
 										 gather->initParam);
 
+			if (isParallelModifyLeader)
+			{
+				/*
+				 * For a supported parallel table-modification command, if there
+				 * are BEFORE STATEMENT triggers, these must be fired by the leader,
+				 * not by the parallel workers.
+				 */
+				fireBSTriggersInLeader(nodeModifyTableState);
+			}
+
 			/*
 			 * Register backend workers. We might not get as many as we
 			 * requested, or indeed any at all.
@@ -188,7 +212,7 @@ ExecGather(PlanState *pstate)
 			node->nworkers_launched = pcxt->nworkers_launched;
 
 			/* Set up tuple queue readers to read the results. */
-			if (pcxt->nworkers_launched > 0)
+			if (pcxt->nworkers_launched > 0 && !(isParallelModifyLeader && !isParallelModifyWithReturning))
 			{
 				ExecParallelCreateReaders(node->pei);
 				/* Make a working array showing the active readers */
@@ -200,7 +224,11 @@ ExecGather(PlanState *pstate)
 			}
 			else
 			{
-				/* No workers?	Then never mind. */
+				/*
+				 * No workers were launched, or this is a supported parallel
+				 * table-modification command without a RETURNING clause -
+				 * no readers are required.
+				 */
 				node->nreaders = 0;
 				node->reader = NULL;
 			}
@@ -208,7 +236,7 @@ ExecGather(PlanState *pstate)
 		}
 
 		/* Run plan locally if no workers or enabled and not single-copy. */
-		node->need_to_scan_locally = (node->nreaders == 0)
+		node->need_to_scan_locally = (node->nworkers_launched <= 0)
 			|| (!gather->single_copy && parallel_leader_participation);
 		node->initialized = true;
 	}
@@ -418,14 +446,32 @@ ExecShutdownGatherWorkers(GatherState *node)
 void
 ExecShutdownGather(GatherState *node)
 {
-	ExecShutdownGatherWorkers(node);
+	/*
+	 * If the parallel context has already been destroyed, this
+	 * function must have been previously called, so just
+	 * return.
+	 */
+	if (node->pei == NULL)
+		return;
 
-	/* Now destroy the parallel context. */
-	if (node->pei != NULL)
+	bool isParallelModifyLeader = IsA(outerPlanState(node), ModifyTableState) &&
+									IsModifySupportedInParallelMode(castNode(ModifyTableState, outerPlanState(node))->operation);
+	if (isParallelModifyLeader)
 	{
-		ExecParallelCleanup(node->pei);
-		node->pei = NULL;
+		/*
+		 * For a supported parallel table-modification command, if there are
+		 * AFTER STATEMENT triggers, these must be fired by the leader, not
+		 * by the parallel workers.
+		 */
+		ModifyTableState *nodeModifyTableState = castNode(ModifyTableState, outerPlanState(node));
+		fireASTriggersInLeader(nodeModifyTableState);
 	}
+
+	ExecShutdownGatherWorkers(node);
+
+	/* Now destroy the parallel context. */
+	ExecParallelCleanup(node->pei);
+	node->pei = NULL;
 }
 
 /* ----------------------------------------------------------------
diff --git a/src/backend/executor/nodeModifyTable.c b/src/backend/executor/nodeModifyTable.c
index 29e07b7..220d408 100644
--- a/src/backend/executor/nodeModifyTable.c
+++ b/src/backend/executor/nodeModifyTable.c
@@ -39,6 +39,7 @@
 
 #include "access/heapam.h"
 #include "access/htup_details.h"
+#include "access/parallel.h"
 #include "access/tableam.h"
 #include "access/xact.h"
 #include "catalog/catalog.h"
@@ -1833,6 +1834,39 @@ fireASTriggers(ModifyTableState *node)
 }
 
 /*
+ * Process BEFORE EACH STATEMENT triggers, in the leader
+ */
+void
+fireBSTriggersInLeader(ModifyTableState *node)
+{
+	Assert(IsInParallelMode() && !IsParallelWorker());
+
+	if (node->fireBSTriggers)
+	{
+		fireBSTriggers(node);
+		node->fireBSTriggers = false;
+
+		/*
+		 * Disable firing of AFTER STATEMENT triggers by local
+		 * plan execution (ModifyTable processing). These will be
+		 * fired at the end of Gather processing.
+		 */
+		node->fireASTriggers = false;
+	}
+}
+
+/*
+ * Process AFTER EACH STATEMENT triggers, in the leader
+ */
+void
+fireASTriggersInLeader(ModifyTableState *node)
+{
+	Assert(IsInParallelMode() && !IsParallelWorker());
+
+	fireASTriggers(node);
+}
+
+/*
  * Set up the state needed for collecting transition tuples for AFTER
  * triggers.
  */
@@ -2158,7 +2192,11 @@ ExecModifyTable(PlanState *pstate)
 	/*
 	 * We're done, but fire AFTER STATEMENT triggers before exiting.
 	 */
-	fireASTriggers(node);
+	if (node->fireASTriggers)
+	{
+		fireASTriggers(node);
+		node->fireASTriggers = false;
+	}
 
 	node->mt_done = true;
 
@@ -2235,7 +2273,9 @@ ExecInitModifyTable(ModifyTable *node, EState *estate, int eflags)
 
 	/* set up epqstate with dummy subplan data for the moment */
 	EvalPlanQualInit(&mtstate->mt_epqstate, estate, NULL, NIL, node->epqParam);
-	mtstate->fireBSTriggers = true;
+	/* Statement-level triggers must not be fired by parallel workers */
+	mtstate->fireBSTriggers = !IsParallelWorker();
+	mtstate->fireASTriggers = !IsParallelWorker();
 
 	/*
 	 * Build state for collecting transition tuples.  This requires having a
diff --git a/src/backend/optimizer/path/costsize.c b/src/backend/optimizer/path/costsize.c
index f1dfdc1..23676c1 100644
--- a/src/backend/optimizer/path/costsize.c
+++ b/src/backend/optimizer/path/costsize.c
@@ -213,6 +213,52 @@ clamp_row_est(double nrows)
 
 
 /*
+ * cost_modifytable
+ *    Determines and returns the cost of a ModifyTable node.
+ */
+void
+cost_modifytable(ModifyTablePath *path)
+{
+	double      total_size;
+	ListCell   *lc;
+
+	/*
+	 * Compute cost & rowcount as sum of subpath costs & rowcounts.
+	 *
+	 * Currently, we don't charge anything extra for the actual table
+	 * modification work, nor for the WITH CHECK OPTIONS or RETURNING
+	 * expressions if any.
+	 */
+	path->path.startup_cost = 0;
+	path->path.total_cost = 0;
+	path->path.rows = 0;
+	total_size = 0;
+	foreach(lc, path->subpaths)
+	{
+		Path       *subpath = (Path *) lfirst(lc);
+
+		if (lc == list_head(path->subpaths))  /* first node? */
+			path->path.startup_cost = subpath->startup_cost;
+		path->path.total_cost += subpath->total_cost;
+		if (path->returningLists != NIL)
+		{
+			path->path.rows += subpath->rows;
+			total_size += subpath->pathtarget->width * subpath->rows;
+		}
+	}
+
+	/*
+	 * Set width to the average width of the subpath outputs.  XXX this is
+	 * totally wrong: we should return an average of the RETURNING tlist
+	 * widths.  But it's what happened historically, and improving it is a task
+	 * for another day.
+	 */
+	if (path->path.rows > 0)
+		total_size /= path->path.rows;
+	path->path.pathtarget->width = rint(total_size);
+}
+
+/*
  * cost_seqscan
  *	  Determines and returns the cost of scanning a relation sequentially.
  *
diff --git a/src/backend/optimizer/plan/createplan.c b/src/backend/optimizer/plan/createplan.c
index 94280a7..5893051 100644
--- a/src/backend/optimizer/plan/createplan.c
+++ b/src/backend/optimizer/plan/createplan.c
@@ -339,7 +339,7 @@ create_plan(PlannerInfo *root, Path *best_path)
 	 * top-level tlist seen at execution time.  However, ModifyTable plan
 	 * nodes don't have a tlist matching the querytree targetlist.
 	 */
-	if (!IsA(plan, ModifyTable))
+	if (!IsA(plan, ModifyTable) && !(IsA(plan, Gather) && IsA(outerPlan(plan), ModifyTable)))
 		apply_tlist_labeling(plan->targetlist, root->processed_tlist);
 
 	/*
diff --git a/src/backend/optimizer/plan/planner.c b/src/backend/optimizer/plan/planner.c
index 93c5b6f..5925363 100644
--- a/src/backend/optimizer/plan/planner.c
+++ b/src/backend/optimizer/plan/planner.c
@@ -1807,7 +1807,8 @@ inheritance_planner(PlannerInfo *root)
 									 returningLists,
 									 rowMarks,
 									 NULL,
-									 assign_special_exec_param(root)));
+									 assign_special_exec_param(root),
+									 0));
 }
 
 /*--------------------
@@ -1855,6 +1856,7 @@ grouping_planner(PlannerInfo *root, bool inheritance_update,
 	RelOptInfo *final_rel;
 	FinalPathExtraData extra;
 	ListCell   *lc;
+	int parallel_modify_partial_path_count = 0;
 
 	/* Tweak caller-supplied tuple_fraction if have LIMIT/OFFSET */
 	if (parse->limitCount || parse->limitOffset)
@@ -2391,13 +2393,102 @@ grouping_planner(PlannerInfo *root, bool inheritance_update,
 										returningLists,
 										rowMarks,
 										parse->onConflict,
-										assign_special_exec_param(root));
+										assign_special_exec_param(root),
+										0);
 		}
 
 		/* And shove it into final_rel */
 		add_path(final_rel, path);
 	}
 
+	/* Consider a supported parallel table-modification command */
+	if (IsModifySupportedInParallelMode(parse->commandType) &&
+		 !inheritance_update &&
+		 final_rel->consider_parallel &&
+		 parse->rowMarks == NIL)
+	{
+		Index		rootRelation;
+		List	   *withCheckOptionLists;
+		List	   *returningLists;
+		int			parallelModifyWorkers;
+
+		/*
+		 * Generate partial paths for the final_rel. Insert all surviving paths, with
+		 * Limit, and/or ModifyTable steps added if needed.
+		 */
+		foreach(lc, current_rel->partial_pathlist)
+		{
+			Path	   *path = (Path *) lfirst(lc);
+
+			/*
+			 * If there is a LIMIT/OFFSET clause, add the LIMIT node.
+			 */
+			if (limit_needed(parse))
+			{
+				path = (Path *) create_limit_path(root, final_rel, path,
+												  parse->limitOffset,
+												  parse->limitCount,
+												  parse->limitOption,
+												  offset_est, count_est);
+			}
+
+			/*
+			 * Add the ModifyTable node.
+			 */
+
+			/*
+			 * If target is a partition root table, we need to mark the
+			 * ModifyTable node appropriately for that.
+			 */
+			if (rt_fetch(parse->resultRelation, parse->rtable)->relkind ==
+				RELKIND_PARTITIONED_TABLE)
+				rootRelation = parse->resultRelation;
+			else
+				rootRelation = 0;
+
+			/*
+			 * Set up the WITH CHECK OPTION and RETURNING lists-of-lists, if
+			 * needed.
+			 */
+			if (parse->withCheckOptions)
+				withCheckOptionLists = list_make1(parse->withCheckOptions);
+			else
+				withCheckOptionLists = NIL;
+
+			if (parse->returningList)
+				returningLists = list_make1(parse->returningList);
+			else
+				returningLists = NIL;
+
+			/*
+			 * For the number of workers to use for a parallel INSERT/UPDATE/DELETE,
+			 * it seems resonable to use the same number of workers as estimated
+			 * for the underlying query.
+			 */
+			parallelModifyWorkers = path->parallel_workers;
+
+			path = (Path *)
+				create_modifytable_path(root, final_rel,
+										parse->commandType,
+										parse->canSetTag,
+										parse->resultRelation,
+										rootRelation,
+										false,
+										list_make1_int(parse->resultRelation),
+										list_make1(path),
+										list_make1(root),
+										withCheckOptionLists,
+										returningLists,
+										root->rowMarks,
+										parse->onConflict,
+										assign_special_exec_param(root),
+										parallelModifyWorkers);
+
+			add_partial_path(final_rel, path);
+			parallel_modify_partial_path_count++;
+		}
+	}
+
 	/*
 	 * Generate partial paths for final_rel, too, if outer query levels might
 	 * be able to make use of them.
@@ -2414,6 +2505,12 @@ grouping_planner(PlannerInfo *root, bool inheritance_update,
 		}
 	}
 
+	if (parallel_modify_partial_path_count > 0)
+	{
+		final_rel->rows = current_rel->rows;		/* ??? why hasn't this been set above somewhere ???? */
+		generate_useful_gather_paths(root, final_rel, false);
+	}
+
 	extra.limit_needed = limit_needed(parse);
 	extra.limit_tuples = limit_tuples;
 	extra.count_est = count_est;
@@ -7583,7 +7680,33 @@ apply_scanjoin_target_to_paths(PlannerInfo *root,
 	 * one of the generated paths may turn out to be the cheapest one.
 	 */
 	if (rel->consider_parallel && !IS_OTHER_REL(rel))
-		generate_useful_gather_paths(root, rel, false);
+	{
+		if (IsModifySupportedInParallelMode(root->parse->commandType))
+		{
+			Assert(root->glob->parallelModeOK);
+			if (root->glob->maxParallelHazard != PROPARALLEL_SAFE)
+			{
+				/*
+				 * Don't allow a supported parallel table-modification command,
+				 * because it's not safe.
+				 */
+				if (root->glob->maxParallelHazard == PROPARALLEL_RESTRICTED)
+				{
+					/*
+					 * However, do allow any underlying query to be run by
+					 * parallel workers.
+					 */
+					generate_useful_gather_paths(root, rel, false);
+				}
+				rel->partial_pathlist = NIL;
+				rel->consider_parallel = false;
+			}
+		}
+		else
+		{
+			generate_useful_gather_paths(root, rel, false);
+		}
+	}
 
 	/*
 	 * Reassess which paths are the cheapest, now that we've potentially added
diff --git a/src/backend/optimizer/plan/setrefs.c b/src/backend/optimizer/plan/setrefs.c
index 127ea3d..3eec4ea 100644
--- a/src/backend/optimizer/plan/setrefs.c
+++ b/src/backend/optimizer/plan/setrefs.c
@@ -252,6 +252,7 @@ set_plan_references(PlannerInfo *root, Plan *plan)
 	PlannerGlobal *glob = root->glob;
 	int			rtoffset = list_length(glob->finalrtable);
 	ListCell   *lc;
+	Plan		*finalPlan;
 
 	/*
 	 * Add all the query's RTEs to the flattened rangetable.  The live ones
@@ -302,7 +303,16 @@ set_plan_references(PlannerInfo *root, Plan *plan)
 	}
 
 	/* Now fix the Plan tree */
-	return set_plan_refs(root, plan, rtoffset);
+	finalPlan = set_plan_refs(root, plan, rtoffset);
+	if (finalPlan != NULL && IsA(finalPlan, Gather))
+	{
+		Plan *subplan = outerPlan(finalPlan);
+		if (IsA(subplan, ModifyTable) && castNode(ModifyTable, subplan)->returningLists != NULL)
+		{
+			finalPlan->targetlist = copyObject(subplan->targetlist);
+		}
+	}
+	return finalPlan;
 }
 
 /*
diff --git a/src/backend/optimizer/util/pathnode.c b/src/backend/optimizer/util/pathnode.c
index 5281a2f..30dc022 100644
--- a/src/backend/optimizer/util/pathnode.c
+++ b/src/backend/optimizer/util/pathnode.c
@@ -3528,6 +3528,7 @@ create_lockrows_path(PlannerInfo *root, RelOptInfo *rel,
  * 'rowMarks' is a list of PlanRowMarks (non-locking only)
  * 'onconflict' is the ON CONFLICT clause, or NULL
  * 'epqParam' is the ID of Param for EvalPlanQual re-eval
+ * 'parallelWorkers' is the no. of parallel workers to use
  */
 ModifyTablePath *
 create_modifytable_path(PlannerInfo *root, RelOptInfo *rel,
@@ -3538,10 +3539,10 @@ create_modifytable_path(PlannerInfo *root, RelOptInfo *rel,
 						List *subroots,
 						List *withCheckOptionLists, List *returningLists,
 						List *rowMarks, OnConflictExpr *onconflict,
-						int epqParam)
+						int epqParam,
+						int parallelWorkers)
 {
 	ModifyTablePath *pathnode = makeNode(ModifyTablePath);
-	double		total_size;
 	ListCell   *lc;
 
 	Assert(list_length(resultRelations) == list_length(subpaths));
@@ -3558,47 +3559,21 @@ create_modifytable_path(PlannerInfo *root, RelOptInfo *rel,
 	/* For now, assume we are above any joins, so no parameterization */
 	pathnode->path.param_info = NULL;
 	pathnode->path.parallel_aware = false;
-	pathnode->path.parallel_safe = false;
-	pathnode->path.parallel_workers = 0;
-	pathnode->path.pathkeys = NIL;
-
-	/*
-	 * Compute cost & rowcount as sum of subpath costs & rowcounts.
-	 *
-	 * Currently, we don't charge anything extra for the actual table
-	 * modification work, nor for the WITH CHECK OPTIONS or RETURNING
-	 * expressions if any.  It would only be window dressing, since
-	 * ModifyTable is always a top-level node and there is no way for the
-	 * costs to change any higher-level planning choices.  But we might want
-	 * to make it look better sometime.
-	 */
-	pathnode->path.startup_cost = 0;
-	pathnode->path.total_cost = 0;
-	pathnode->path.rows = 0;
-	total_size = 0;
-	foreach(lc, subpaths)
+	pathnode->path.parallel_safe = rel->consider_parallel && parallelWorkers > 0;
+	if (pathnode->path.parallel_safe)
 	{
-		Path	   *subpath = (Path *) lfirst(lc);
-
-		if (lc == list_head(subpaths))	/* first node? */
-			pathnode->path.startup_cost = subpath->startup_cost;
-		pathnode->path.total_cost += subpath->total_cost;
-		if (returningLists != NIL)
+		foreach (lc, subpaths)
 		{
-			pathnode->path.rows += subpath->rows;
-			total_size += subpath->pathtarget->width * subpath->rows;
+			Path *sp = (Path *)lfirst(lc);
+			if (!sp->parallel_safe)
+			{
+				pathnode->path.parallel_safe = false;
+				break;
+			}
 		}
 	}
-
-	/*
-	 * Set width to the average width of the subpath outputs.  XXX this is
-	 * totally wrong: we should return an average of the RETURNING tlist
-	 * widths.  But it's what happened historically, and improving it is a task
-	 * for another day.
-	 */
-	if (pathnode->path.rows > 0)
-		total_size /= pathnode->path.rows;
-	pathnode->path.pathtarget->width = rint(total_size);
+	pathnode->path.parallel_workers = parallelWorkers;
+	pathnode->path.pathkeys = NIL;
 
 	pathnode->operation = operation;
 	pathnode->canSetTag = canSetTag;
@@ -3614,6 +3589,8 @@ create_modifytable_path(PlannerInfo *root, RelOptInfo *rel,
 	pathnode->onconflict = onconflict;
 	pathnode->epqParam = epqParam;
 
+	cost_modifytable(pathnode);
+
 	return pathnode;
 }
 
diff --git a/src/include/access/xact.h b/src/include/access/xact.h
index a926fff..f43a844 100644
--- a/src/include/access/xact.h
+++ b/src/include/access/xact.h
@@ -386,6 +386,7 @@ extern FullTransactionId GetTopFullTransactionId(void);
 extern FullTransactionId GetTopFullTransactionIdIfAny(void);
 extern FullTransactionId GetCurrentFullTransactionId(void);
 extern FullTransactionId GetCurrentFullTransactionIdIfAny(void);
+extern void SetCurrentCommandIdUsedForWorker(void);
 extern void MarkCurrentTransactionIdLoggedIfAny(void);
 extern bool SubTransactionIsActive(SubTransactionId subxid);
 extern CommandId GetCurrentCommandId(bool used);
@@ -466,7 +467,7 @@ extern void ParsePrepareRecord(uint8 info, xl_xact_prepare *xlrec, xl_xact_parse
 extern void EnterParallelMode(void);
 extern void ExitParallelMode(void);
 extern bool IsInParallelMode(void);
-extern void PrepareParallelModeForModify(CmdType commandType);
+extern void PrepareParallelModeForModify(CmdType commandType, bool isParallelModifyLeader);
 
 /*
  * IsModifySupportedInParallelMode
diff --git a/src/include/executor/execParallel.h b/src/include/executor/execParallel.h
index 5a39a5b..afb8a57 100644
--- a/src/include/executor/execParallel.h
+++ b/src/include/executor/execParallel.h
@@ -27,6 +27,7 @@ typedef struct ParallelExecutorInfo
 	ParallelContext *pcxt;		/* parallel context we're using */
 	BufferUsage *buffer_usage;	/* points to bufusage area in DSM */
 	WalUsage   *wal_usage;		/* walusage area in DSM */
+	uint64   *processed_count;	/* processed tuple count area in DSM */
 	SharedExecutorInstrumentation *instrumentation; /* optional */
 	struct SharedJitInstrumentation *jit_instrumentation;	/* optional */
 	dsa_area   *area;			/* points to DSA area in DSM */
diff --git a/src/include/executor/nodeModifyTable.h b/src/include/executor/nodeModifyTable.h
index 46a2dc9..e332482 100644
--- a/src/include/executor/nodeModifyTable.h
+++ b/src/include/executor/nodeModifyTable.h
@@ -22,5 +22,6 @@ extern void ExecComputeStoredGenerated(ResultRelInfo *resultRelInfo,
 extern ModifyTableState *ExecInitModifyTable(ModifyTable *node, EState *estate, int eflags);
 extern void ExecEndModifyTable(ModifyTableState *node);
 extern void ExecReScanModifyTable(ModifyTableState *node);
-
+extern void fireBSTriggersInLeader(ModifyTableState *node);
+extern void fireASTriggersInLeader(ModifyTableState *node);
 #endif							/* NODEMODIFYTABLE_H */
diff --git a/src/include/nodes/execnodes.h b/src/include/nodes/execnodes.h
index 6c0a7d6..c558eef 100644
--- a/src/include/nodes/execnodes.h
+++ b/src/include/nodes/execnodes.h
@@ -1175,8 +1175,8 @@ typedef struct ModifyTableState
 
 	List	  **mt_arowmarks;	/* per-subplan ExecAuxRowMark lists */
 	EPQState	mt_epqstate;	/* for evaluating EvalPlanQual rechecks */
-	bool		fireBSTriggers; /* do we need to fire stmt triggers? */
-
+	bool        fireBSTriggers; /* do we need to fire before stmt triggers? */
+	bool        fireASTriggers; /* do we need to fire after stmt triggers? */
 	/*
 	 * Slot for storing tuples in the root partitioned table's rowtype during
 	 * an UPDATE of a partitioned table.
diff --git a/src/include/optimizer/cost.h b/src/include/optimizer/cost.h
index 6141654..fafa087 100644
--- a/src/include/optimizer/cost.h
+++ b/src/include/optimizer/cost.h
@@ -69,6 +69,7 @@ extern PGDLLIMPORT int constraint_exclusion;
 
 extern double index_pages_fetched(double tuples_fetched, BlockNumber pages,
 								  double index_pages, PlannerInfo *root);
+extern void cost_modifytable(ModifyTablePath *path);
 extern void cost_seqscan(Path *path, PlannerInfo *root, RelOptInfo *baserel,
 						 ParamPathInfo *param_info);
 extern void cost_samplescan(Path *path, PlannerInfo *root, RelOptInfo *baserel,
diff --git a/src/include/optimizer/pathnode.h b/src/include/optimizer/pathnode.h
index 715a24a..2d08f0c 100644
--- a/src/include/optimizer/pathnode.h
+++ b/src/include/optimizer/pathnode.h
@@ -264,7 +264,8 @@ extern ModifyTablePath *create_modifytable_path(PlannerInfo *root,
 												List *subroots,
 												List *withCheckOptionLists, List *returningLists,
 												List *rowMarks, OnConflictExpr *onconflict,
-												int epqParam);
+												int epqParam,
+												int parallel_workers);
 extern LimitPath *create_limit_path(PlannerInfo *root, RelOptInfo *rel,
 									Path *subpath,
 									Node *limitOffset, Node *limitCount,
-- 
1.8.3.1

