From 7e3739cec08140154818e6f75b1ca74e0f2e9234 Mon Sep 17 00:00:00 2001
From: Thomas Munro <thomas.munro@gmail.com>
Date: Wed, 23 Oct 2019 16:17:57 +1300
Subject: [PATCH 1/2] Show parallel leader stats in EXPLAIN output.

Previously, we only showed VERBOSE information from parallel
workers.  Show the leader's contribution too.

Instead of aggregating all workers' instrumentation into the
leader's, create a separate instrumentation object into which
all processes' contributions are aggregated.  Then the leader's
is still available for separate display.

Author: Thomas Munro
Reviewed-by: Melanie Plageman, Rafia Sabih
Discussion: https://postgr.es/m/CA+hUKG+Z22=vkVkXtKRznzRdtj=MtygfZMYUJs8j7ObjzkG1Lw@mail.gmail.com
---
 src/backend/commands/explain.c      | 221 ++++++++++++++++------------
 src/backend/executor/execParallel.c |  18 ++-
 src/include/nodes/execnodes.h       |   1 +
 3 files changed, 137 insertions(+), 103 deletions(-)

diff --git a/src/backend/commands/explain.c b/src/backend/commands/explain.c
index 62fb3434a3..89c39de72a 100644
--- a/src/backend/commands/explain.c
+++ b/src/backend/commands/explain.c
@@ -105,7 +105,7 @@ static void show_hash_info(HashState *hashstate, ExplainState *es);
 static void show_tidbitmap_info(BitmapHeapScanState *planstate,
 								ExplainState *es);
 static void show_instrumentation_count(const char *qlabel, int which,
-									   PlanState *planstate, ExplainState *es);
+									   Instrumentation *instrument, ExplainState *es);
 static void show_foreignscan_info(ForeignScanState *fsstate, ExplainState *es);
 static void show_eval_params(Bitmapset *bms_params, ExplainState *es);
 static const char *explain_get_index_name(Oid indexId);
@@ -1041,6 +1041,75 @@ ExplainPreScanNode(PlanState *planstate, Bitmapset **rels_used)
 	return planstate_tree_walker(planstate, ExplainPreScanNode, rels_used);
 }
 
+/*
+ * ExplainNodePerProcess -
+ *	  Show the per-process instrumentation for a node run in parallel.
+ *
+ * Subroutine for ExplainNode() when showing a parallel query in ANALYZE
+ * VERBOSE mode.  Use worker number -1 for the leader process.
+ */
+static void
+ExplainNodePerProcess(ExplainState *es, bool *opened_group,
+					  int worker_number, Instrumentation *instrument)
+{
+	double		nloops = instrument->nloops;
+	double		startup_ms;
+	double		total_ms;
+	double		rows;
+
+	if (nloops <= 0)
+		return;
+	startup_ms = 1000.0 * instrument->startup / nloops;
+	total_ms = 1000.0 * instrument->total / nloops;
+	rows = instrument->ntuples / nloops;
+
+	if (es->format == EXPLAIN_FORMAT_TEXT)
+	{
+		appendStringInfoSpaces(es->str, es->indent * 2);
+		if (worker_number < 0)
+			appendStringInfo(es->str, "Leader: ");
+		else
+			appendStringInfo(es->str, "Worker %d: ", worker_number);
+		if (es->timing)
+			appendStringInfo(es->str,
+							 "actual time=%.3f..%.3f rows=%.0f loops=%.0f\n",
+							 startup_ms, total_ms, rows, nloops);
+		else
+			appendStringInfo(es->str,
+							 "actual rows=%.0f loops=%.0f\n",
+							 rows, nloops);
+		es->indent++;
+		if (es->buffers)
+			show_buffer_usage(es, &instrument->bufusage);
+		es->indent--;
+	}
+	else
+	{
+		if (!*opened_group)
+		{
+			ExplainOpenGroup("Workers", "Workers", false, es);
+			*opened_group = true;
+		}
+		ExplainOpenGroup("Worker", NULL, true, es);
+		ExplainPropertyInteger("Worker Number", NULL, worker_number, es);
+
+		if (es->timing)
+		{
+			ExplainPropertyFloat("Actual Startup Time", "ms",
+								 startup_ms, 3, es);
+			ExplainPropertyFloat("Actual Total Time", "ms",
+								 total_ms, 3, es);
+		}
+		ExplainPropertyFloat("Actual Rows", NULL, rows, 0, es);
+		ExplainPropertyFloat("Actual Loops", NULL, nloops, 0, es);
+
+		if (es->buffers)
+			show_buffer_usage(es, &instrument->bufusage);
+
+		ExplainCloseGroup("Worker", NULL, true, es);
+	}
+}
+
 /*
  * ExplainNode -
  *	  Appends a description of a plan tree to es->str
@@ -1067,6 +1136,7 @@ ExplainNode(PlanState *planstate, List *ancestors,
 			ExplainState *es)
 {
 	Plan	   *plan = planstate->plan;
+	Instrumentation *show_instrument;
 	const char *pname;			/* node type name for text output */
 	const char *sname;			/* node type name for non-text output */
 	const char *strategy = NULL;
@@ -1076,6 +1146,12 @@ ExplainNode(PlanState *planstate, List *ancestors,
 	int			save_indent = es->indent;
 	bool		haschildren;
 
+	/* If this is a parallel query, show the combined stats. */
+	if (planstate->combined_instrument)
+		show_instrument = planstate->combined_instrument;
+	else
+		show_instrument = planstate->instrument;
+
 	switch (nodeTag(plan))
 	{
 		case T_Result:
@@ -1505,13 +1581,19 @@ ExplainNode(PlanState *planstate, List *ancestors,
 	if (planstate->instrument)
 		InstrEndLoop(planstate->instrument);
 
-	if (es->analyze &&
-		planstate->instrument && planstate->instrument->nloops > 0)
+	/*
+	 * If we have combined instrumentation from parallel workers, add the
+	 * leader's contribution.
+	 */
+	if (planstate->combined_instrument && planstate->instrument)
+		InstrAggNode(planstate->combined_instrument, planstate->instrument);
+
+	if (es->analyze && show_instrument && show_instrument->nloops > 0)
 	{
-		double		nloops = planstate->instrument->nloops;
-		double		startup_ms = 1000.0 * planstate->instrument->startup / nloops;
-		double		total_ms = 1000.0 * planstate->instrument->total / nloops;
-		double		rows = planstate->instrument->ntuples / nloops;
+		double		nloops = show_instrument->nloops;
+		double		startup_ms = 1000.0 * show_instrument->startup / nloops;
+		double		total_ms = 1000.0 * show_instrument->total / nloops;
+		double		rows = show_instrument->ntuples / nloops;
 
 		if (es->format == EXPLAIN_FORMAT_TEXT)
 		{
@@ -1586,26 +1668,26 @@ ExplainNode(PlanState *planstate, List *ancestors,
 						   "Index Cond", planstate, ancestors, es);
 			if (((IndexScan *) plan)->indexqualorig)
 				show_instrumentation_count("Rows Removed by Index Recheck", 2,
-										   planstate, es);
+										   show_instrument, es);
 			show_scan_qual(((IndexScan *) plan)->indexorderbyorig,
 						   "Order By", planstate, ancestors, es);
 			show_scan_qual(plan->qual, "Filter", planstate, ancestors, es);
 			if (plan->qual)
 				show_instrumentation_count("Rows Removed by Filter", 1,
-										   planstate, es);
+										   show_instrument, es);
 			break;
 		case T_IndexOnlyScan:
 			show_scan_qual(((IndexOnlyScan *) plan)->indexqual,
 						   "Index Cond", planstate, ancestors, es);
 			if (((IndexOnlyScan *) plan)->indexqual)
 				show_instrumentation_count("Rows Removed by Index Recheck", 2,
-										   planstate, es);
+										   show_instrument, es);
 			show_scan_qual(((IndexOnlyScan *) plan)->indexorderby,
 						   "Order By", planstate, ancestors, es);
 			show_scan_qual(plan->qual, "Filter", planstate, ancestors, es);
 			if (plan->qual)
 				show_instrumentation_count("Rows Removed by Filter", 1,
-										   planstate, es);
+										   show_instrument, es);
 			if (es->analyze)
 				ExplainPropertyFloat("Heap Fetches", NULL,
 									 planstate->instrument->ntuples2, 0, es);
@@ -1619,11 +1701,11 @@ ExplainNode(PlanState *planstate, List *ancestors,
 						   "Recheck Cond", planstate, ancestors, es);
 			if (((BitmapHeapScan *) plan)->bitmapqualorig)
 				show_instrumentation_count("Rows Removed by Index Recheck", 2,
-										   planstate, es);
+										   show_instrument, es);
 			show_scan_qual(plan->qual, "Filter", planstate, ancestors, es);
 			if (plan->qual)
 				show_instrumentation_count("Rows Removed by Filter", 1,
-										   planstate, es);
+										   show_instrument, es);
 			if (es->analyze)
 				show_tidbitmap_info((BitmapHeapScanState *) planstate, es);
 			break;
@@ -1641,7 +1723,7 @@ ExplainNode(PlanState *planstate, List *ancestors,
 			show_scan_qual(plan->qual, "Filter", planstate, ancestors, es);
 			if (plan->qual)
 				show_instrumentation_count("Rows Removed by Filter", 1,
-										   planstate, es);
+										   show_instrument, es);
 			break;
 		case T_Gather:
 			{
@@ -1650,7 +1732,7 @@ ExplainNode(PlanState *planstate, List *ancestors,
 				show_scan_qual(plan->qual, "Filter", planstate, ancestors, es);
 				if (plan->qual)
 					show_instrumentation_count("Rows Removed by Filter", 1,
-											   planstate, es);
+											   show_instrument, es);
 				ExplainPropertyInteger("Workers Planned", NULL,
 									   gather->num_workers, es);
 
@@ -1696,7 +1778,7 @@ ExplainNode(PlanState *planstate, List *ancestors,
 				show_scan_qual(plan->qual, "Filter", planstate, ancestors, es);
 				if (plan->qual)
 					show_instrumentation_count("Rows Removed by Filter", 1,
-											   planstate, es);
+											   show_instrument, es);
 				ExplainPropertyInteger("Workers Planned", NULL,
 									   gm->num_workers, es);
 
@@ -1734,7 +1816,7 @@ ExplainNode(PlanState *planstate, List *ancestors,
 			show_scan_qual(plan->qual, "Filter", planstate, ancestors, es);
 			if (plan->qual)
 				show_instrumentation_count("Rows Removed by Filter", 1,
-										   planstate, es);
+										   show_instrument, es);
 			break;
 		case T_TableFuncScan:
 			if (es->verbose)
@@ -1748,7 +1830,7 @@ ExplainNode(PlanState *planstate, List *ancestors,
 			show_scan_qual(plan->qual, "Filter", planstate, ancestors, es);
 			if (plan->qual)
 				show_instrumentation_count("Rows Removed by Filter", 1,
-										   planstate, es);
+										   show_instrument, es);
 			break;
 		case T_TidScan:
 			{
@@ -1764,14 +1846,14 @@ ExplainNode(PlanState *planstate, List *ancestors,
 				show_scan_qual(plan->qual, "Filter", planstate, ancestors, es);
 				if (plan->qual)
 					show_instrumentation_count("Rows Removed by Filter", 1,
-											   planstate, es);
+											   show_instrument, es);
 			}
 			break;
 		case T_ForeignScan:
 			show_scan_qual(plan->qual, "Filter", planstate, ancestors, es);
 			if (plan->qual)
 				show_instrumentation_count("Rows Removed by Filter", 1,
-										   planstate, es);
+										   show_instrument, es);
 			show_foreignscan_info((ForeignScanState *) planstate, es);
 			break;
 		case T_CustomScan:
@@ -1781,7 +1863,7 @@ ExplainNode(PlanState *planstate, List *ancestors,
 				show_scan_qual(plan->qual, "Filter", planstate, ancestors, es);
 				if (plan->qual)
 					show_instrumentation_count("Rows Removed by Filter", 1,
-											   planstate, es);
+											   show_instrument, es);
 				if (css->methods->ExplainCustomScan)
 					css->methods->ExplainCustomScan(css, ancestors, es);
 			}
@@ -1791,11 +1873,11 @@ ExplainNode(PlanState *planstate, List *ancestors,
 							"Join Filter", planstate, ancestors, es);
 			if (((NestLoop *) plan)->join.joinqual)
 				show_instrumentation_count("Rows Removed by Join Filter", 1,
-										   planstate, es);
+										   show_instrument, es);
 			show_upper_qual(plan->qual, "Filter", planstate, ancestors, es);
 			if (plan->qual)
 				show_instrumentation_count("Rows Removed by Filter", 2,
-										   planstate, es);
+										   show_instrument, es);
 			break;
 		case T_MergeJoin:
 			show_upper_qual(((MergeJoin *) plan)->mergeclauses,
@@ -1804,11 +1886,11 @@ ExplainNode(PlanState *planstate, List *ancestors,
 							"Join Filter", planstate, ancestors, es);
 			if (((MergeJoin *) plan)->join.joinqual)
 				show_instrumentation_count("Rows Removed by Join Filter", 1,
-										   planstate, es);
+										   show_instrument, es);
 			show_upper_qual(plan->qual, "Filter", planstate, ancestors, es);
 			if (plan->qual)
 				show_instrumentation_count("Rows Removed by Filter", 2,
-										   planstate, es);
+										   show_instrument, es);
 			break;
 		case T_HashJoin:
 			show_upper_qual(((HashJoin *) plan)->hashclauses,
@@ -1817,25 +1899,25 @@ ExplainNode(PlanState *planstate, List *ancestors,
 							"Join Filter", planstate, ancestors, es);
 			if (((HashJoin *) plan)->join.joinqual)
 				show_instrumentation_count("Rows Removed by Join Filter", 1,
-										   planstate, es);
+										   show_instrument, es);
 			show_upper_qual(plan->qual, "Filter", planstate, ancestors, es);
 			if (plan->qual)
 				show_instrumentation_count("Rows Removed by Filter", 2,
-										   planstate, es);
+										   show_instrument, es);
 			break;
 		case T_Agg:
 			show_agg_keys(castNode(AggState, planstate), ancestors, es);
 			show_upper_qual(plan->qual, "Filter", planstate, ancestors, es);
 			if (plan->qual)
 				show_instrumentation_count("Rows Removed by Filter", 1,
-										   planstate, es);
+										   show_instrument, es);
 			break;
 		case T_Group:
 			show_group_keys(castNode(GroupState, planstate), ancestors, es);
 			show_upper_qual(plan->qual, "Filter", planstate, ancestors, es);
 			if (plan->qual)
 				show_instrumentation_count("Rows Removed by Filter", 1,
-										   planstate, es);
+										   show_instrument, es);
 			break;
 		case T_Sort:
 			show_sort_keys(castNode(SortState, planstate), ancestors, es);
@@ -1851,7 +1933,7 @@ ExplainNode(PlanState *planstate, List *ancestors,
 			show_upper_qual(plan->qual, "Filter", planstate, ancestors, es);
 			if (plan->qual)
 				show_instrumentation_count("Rows Removed by Filter", 1,
-										   planstate, es);
+										   show_instrument, es);
 			break;
 		case T_ModifyTable:
 			show_modifytable_info(castNode(ModifyTableState, planstate), ancestors,
@@ -1865,73 +1947,21 @@ ExplainNode(PlanState *planstate, List *ancestors,
 	}
 
 	/* Show buffer usage */
-	if (es->buffers && planstate->instrument)
-		show_buffer_usage(es, &planstate->instrument->bufusage);
+	if (es->buffers && show_instrument)
+		show_buffer_usage(es, &show_instrument->bufusage);
 
 	/* Show worker detail */
 	if (es->analyze && es->verbose && planstate->worker_instrument)
 	{
 		WorkerInstrumentation *w = planstate->worker_instrument;
 		bool		opened_group = false;
-		int			n;
-
-		for (n = 0; n < w->num_workers; ++n)
-		{
-			Instrumentation *instrument = &w->instrument[n];
-			double		nloops = instrument->nloops;
-			double		startup_ms;
-			double		total_ms;
-			double		rows;
-
-			if (nloops <= 0)
-				continue;
-			startup_ms = 1000.0 * instrument->startup / nloops;
-			total_ms = 1000.0 * instrument->total / nloops;
-			rows = instrument->ntuples / nloops;
-
-			if (es->format == EXPLAIN_FORMAT_TEXT)
-			{
-				appendStringInfoSpaces(es->str, es->indent * 2);
-				appendStringInfo(es->str, "Worker %d: ", n);
-				if (es->timing)
-					appendStringInfo(es->str,
-									 "actual time=%.3f..%.3f rows=%.0f loops=%.0f\n",
-									 startup_ms, total_ms, rows, nloops);
-				else
-					appendStringInfo(es->str,
-									 "actual rows=%.0f loops=%.0f\n",
-									 rows, nloops);
-				es->indent++;
-				if (es->buffers)
-					show_buffer_usage(es, &instrument->bufusage);
-				es->indent--;
-			}
-			else
-			{
-				if (!opened_group)
-				{
-					ExplainOpenGroup("Workers", "Workers", false, es);
-					opened_group = true;
-				}
-				ExplainOpenGroup("Worker", NULL, true, es);
-				ExplainPropertyInteger("Worker Number", NULL, n, es);
-
-				if (es->timing)
-				{
-					ExplainPropertyFloat("Actual Startup Time", "ms",
-										 startup_ms, 3, es);
-					ExplainPropertyFloat("Actual Total Time", "ms",
-										 total_ms, 3, es);
-				}
-				ExplainPropertyFloat("Actual Rows", NULL, rows, 0, es);
-				ExplainPropertyFloat("Actual Loops", NULL, nloops, 0, es);
 
-				if (es->buffers)
-					show_buffer_usage(es, &instrument->bufusage);
+		/* Leader */
+		ExplainNodePerProcess(es, &opened_group, -1, planstate->instrument);
 
-				ExplainCloseGroup("Worker", NULL, true, es);
-			}
-		}
+		/* Workers */
+		for (int n = 0; n < w->num_workers; ++n)
+			ExplainNodePerProcess(es, &opened_group, n, &w->instrument[n]);
 
 		if (opened_group)
 			ExplainCloseGroup("Workers", "Workers", false, es);
@@ -2750,19 +2780,19 @@ show_tidbitmap_info(BitmapHeapScanState *planstate, ExplainState *es)
  */
 static void
 show_instrumentation_count(const char *qlabel, int which,
-						   PlanState *planstate, ExplainState *es)
+						   Instrumentation *instrument, ExplainState *es)
 {
 	double		nfiltered;
 	double		nloops;
 
-	if (!es->analyze || !planstate->instrument)
+	if (!es->analyze || !instrument)
 		return;
 
 	if (which == 2)
-		nfiltered = planstate->instrument->nfiltered2;
+		nfiltered = instrument->nfiltered2;
 	else
-		nfiltered = planstate->instrument->nfiltered1;
-	nloops = planstate->instrument->nloops;
+		nfiltered = instrument->nfiltered1;
+	nloops = instrument->nloops;
 
 	/* In text mode, suppress zero counts; they're not interesting enough */
 	if (nfiltered > 0 || es->format != EXPLAIN_FORMAT_TEXT)
@@ -3280,7 +3310,8 @@ show_modifytable_info(ModifyTableState *mtstate, List *ancestors,
 		{
 			show_upper_qual((List *) node->onConflictWhere, "Conflict Filter",
 							&mtstate->ps, ancestors, es);
-			show_instrumentation_count("Rows Removed by Conflict Filter", 1, &mtstate->ps, es);
+			show_instrumentation_count("Rows Removed by Conflict Filter", 1,
+									   mtstate->ps.instrument, es);
 		}
 
 		/* EXPLAIN ANALYZE display of actual outcome for each tuple proposed */
diff --git a/src/backend/executor/execParallel.c b/src/backend/executor/execParallel.c
index 53cd2fc666..26a04692ab 100644
--- a/src/backend/executor/execParallel.c
+++ b/src/backend/executor/execParallel.c
@@ -987,20 +987,22 @@ ExecParallelRetrieveInstrumentation(PlanState *planstate,
 	if (i >= instrumentation->num_plan_nodes)
 		elog(ERROR, "plan node %d not found", plan_node_id);
 
-	/* Accumulate the statistics from all workers. */
-	instrument = GetInstrumentationArray(instrumentation);
-	instrument += i * instrumentation->num_workers;
-	for (n = 0; n < instrumentation->num_workers; ++n)
-		InstrAggNode(planstate->instrument, &instrument[n]);
-
 	/*
-	 * Also store the per-worker detail.
-	 *
 	 * Worker instrumentation should be allocated in the same context as the
 	 * regular instrumentation information, which is the per-query context.
 	 * Switch into per-query memory context.
 	 */
 	oldcontext = MemoryContextSwitchTo(planstate->state->es_query_cxt);
+
+	/* Accumulate the statistics from all workers. */
+	if (!planstate->combined_instrument)
+		planstate->combined_instrument = palloc0(sizeof(Instrumentation));
+	instrument = GetInstrumentationArray(instrumentation);
+	instrument += i * instrumentation->num_workers;
+	for (n = 0; n < instrumentation->num_workers; ++n)
+		InstrAggNode(planstate->combined_instrument, &instrument[n]);
+
+	/* Also store the per-worker detail. */
 	ibytes = mul_size(instrumentation->num_workers, sizeof(Instrumentation));
 	planstate->worker_instrument =
 		palloc(ibytes + offsetof(WorkerInstrumentation, instrument));
diff --git a/src/include/nodes/execnodes.h b/src/include/nodes/execnodes.h
index 44f76082e9..cfeddbe5fc 100644
--- a/src/include/nodes/execnodes.h
+++ b/src/include/nodes/execnodes.h
@@ -948,6 +948,7 @@ typedef struct PlanState
 										 * wrapper */
 
 	Instrumentation *instrument;	/* Optional runtime stats for this node */
+	Instrumentation *combined_instrument;		/* sum for leader and workers */
 	WorkerInstrumentation *worker_instrument;	/* per-worker instrumentation */
 
 	/* Per-worker JIT instrumentation */
-- 
2.23.0

