From 2145e9070f81301128ad1f3b76ffa46483b51415 Mon Sep 17 00:00:00 2001
From: Thomas Munro <thomas.munro@enterprisedb.com>
Date: Mon, 4 Dec 2017 11:52:11 +1300
Subject: [PATCH] Fix EXPLAIN ANALYZE of hash join when the leader doesn't
 execute.

If a hash join appears in a parallel query, there may be no hash table
available for explain.c to inspect even though a hash table may have been
built in other processes.  This could happen either because
parallel_leader_participation was set to off or because the leader happened to
hit the end of the outer relation immediately (even though the complete
relation is not empty) and decided not to build the hash table.

Commit bf11e7ee introduced a way for workers to exchange instrumentation via
the DSM segment for Sort nodes even though they are not parallel-aware.  This
commit does the same for Hash nodes, so that explain.c has a way to find
instrumentation data from an arbitrary participant that actually built the
hash table.

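For illustration, assuming the "simple" table set up earlier in join.sql (the
one the regression test added below also uses), a minimal sketch of the
problematic case is:

    set max_parallel_workers_per_gather = 2;
    set parallel_leader_participation = off;
    explain (analyze, costs off)
      select count(*) from simple r join simple s using (id);

Before this commit the Hash node's bucket/batch/memory line was simply omitted
in that case, because the leader had no hash table for explain.c to inspect;
with this commit EXPLAIN falls back to the instrumentation recorded in the DSM
segment by the first participant that actually built a hash table.
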
Author: Thomas Munro
Reviewed-By: Andres Freund
Discussion: https://postgr.es/m/CAEepm%3D3DUQC2-z252N55eOcZBer6DPdM%3DFzrxH9dZc5vYLsjaA%40mail.gmail.com
---
 src/backend/commands/explain.c      |  58 ++++++++++++++------
 src/backend/executor/execParallel.c |  43 +++++++++++----
 src/backend/executor/execProcnode.c |   3 ++
 src/backend/executor/nodeHash.c     | 104 ++++++++++++++++++++++++++++++++++++
 src/include/executor/nodeHash.h     |   9 ++++
 src/include/nodes/execnodes.h       |  26 +++++++++
 src/test/regress/expected/join.out  |  15 ++++++
 src/test/regress/sql/join.sql       |  11 ++++
 8 files changed, 244 insertions(+), 25 deletions(-)

diff --git a/src/backend/commands/explain.c b/src/backend/commands/explain.c
index 447f69d044e..1ffe635d66c 100644
--- a/src/backend/commands/explain.c
+++ b/src/backend/commands/explain.c
@@ -2379,34 +2379,62 @@ show_sort_info(SortState *sortstate, ExplainState *es)
 static void
 show_hash_info(HashState *hashstate, ExplainState *es)
 {
-	HashJoinTable hashtable;
+	HashInstrumentation *hinstrument = NULL;
 
-	hashtable = hashstate->hashtable;
+	/*
+	 * In a parallel query, the leader process may or may not have run the
+	 * hash join, and even if it did, it may not have built a hash table due to
+	 * timing (if it started late it might have seen no tuples in the outer
+	 * relation and skipped building the hash table).  Therefore we have to be
+	 * prepared to get instrumentation data from a worker if there is no hash
+	 * table.
+	 */
+	if (hashstate->hashtable)
+	{
+		hinstrument = (HashInstrumentation *)
+			palloc(sizeof(HashInstrumentation));
+		ExecHashGetInstrumentation(hinstrument, hashstate->hashtable);
+	}
+	else if (hashstate->shared_info)
+	{
+		SharedHashInfo *shared_info = hashstate->shared_info;
+		int		i;
+
+		/* Find the first worker that built a hash table. */
+		for (i = 0; i < shared_info->num_workers; ++i)
+		{
+			if (shared_info->hinstrument[i].nbatch > 0)
+			{
+				hinstrument = &shared_info->hinstrument[i];
+				break;
+			}
+		}
+	}
 
-	if (hashtable)
+	if (hinstrument)
 	{
-		long		spacePeakKb = (hashtable->spacePeak + 1023) / 1024;
+		long		spacePeakKb = (hinstrument->space_peak + 1023) / 1024;
 
 		if (es->format != EXPLAIN_FORMAT_TEXT)
 		{
-			ExplainPropertyLong("Hash Buckets", hashtable->nbuckets, es);
+			ExplainPropertyLong("Hash Buckets", hinstrument->nbuckets, es);
 			ExplainPropertyLong("Original Hash Buckets",
-								hashtable->nbuckets_original, es);
-			ExplainPropertyLong("Hash Batches", hashtable->nbatch, es);
+								hinstrument->nbuckets_original, es);
+			ExplainPropertyLong("Hash Batches", hinstrument->nbatch, es);
 			ExplainPropertyLong("Original Hash Batches",
-								hashtable->nbatch_original, es);
+								hinstrument->nbatch_original, es);
 			ExplainPropertyLong("Peak Memory Usage", spacePeakKb, es);
 		}
-		else if (hashtable->nbatch_original != hashtable->nbatch ||
-				 hashtable->nbuckets_original != hashtable->nbuckets)
+		else if (hinstrument->nbatch_original != hinstrument->nbatch ||
+				 hinstrument->nbuckets_original != hinstrument->nbuckets)
 		{
 			appendStringInfoSpaces(es->str, es->indent * 2);
 			appendStringInfo(es->str,
 							 "Buckets: %d (originally %d)  Batches: %d (originally %d)  Memory Usage: %ldkB\n",
-							 hashtable->nbuckets,
-							 hashtable->nbuckets_original,
-							 hashtable->nbatch,
-							 hashtable->nbatch_original,
+							 hinstrument->nbuckets,
+							 hinstrument->nbuckets_original,
+							 hinstrument->nbatch,
+							 hinstrument->nbatch_original,
 							 spacePeakKb);
 		}
 		else
@@ -2414,7 +2442,7 @@ show_hash_info(HashState *hashstate, ExplainState *es)
 			appendStringInfoSpaces(es->str, es->indent * 2);
 			appendStringInfo(es->str,
 							 "Buckets: %d  Batches: %d  Memory Usage: %ldkB\n",
-							 hashtable->nbuckets, hashtable->nbatch,
+							 hinstrument->nbuckets, hinstrument->nbatch,
 							 spacePeakKb);
 		}
 	}
diff --git a/src/backend/executor/execParallel.c b/src/backend/executor/execParallel.c
index 53c5254be13..0aca00b0e68 100644
--- a/src/backend/executor/execParallel.c
+++ b/src/backend/executor/execParallel.c
@@ -29,6 +29,7 @@
 #include "executor/nodeBitmapHeapscan.h"
 #include "executor/nodeCustom.h"
 #include "executor/nodeForeignscan.h"
+#include "executor/nodeHash.h"
 #include "executor/nodeIndexscan.h"
 #include "executor/nodeIndexonlyscan.h"
 #include "executor/nodeSeqscan.h"
@@ -259,8 +260,12 @@ ExecParallelEstimate(PlanState *planstate, ExecParallelEstimateContext *e)
 				ExecBitmapHeapEstimate((BitmapHeapScanState *) planstate,
 									   e->pcxt);
 			break;
+		case T_HashState:
+			/* even when not parallel-aware, for EXPLAIN ANALYZE */
+			ExecHashEstimate((HashState *) planstate, e->pcxt);
+			break;
 		case T_SortState:
-			/* even when not parallel-aware */
+			/* even when not parallel-aware, for EXPLAIN ANALYZE */
 			ExecSortEstimate((SortState *) planstate, e->pcxt);
 			break;
 
@@ -458,8 +463,12 @@ ExecParallelInitializeDSM(PlanState *planstate,
 				ExecBitmapHeapInitializeDSM((BitmapHeapScanState *) planstate,
 											d->pcxt);
 			break;
+		case T_HashState:
+			/* even when not parallel-aware, for EXPLAIN ANALYZE */
+			ExecHashInitializeDSM((HashState *) planstate, d->pcxt);
+			break;
 		case T_SortState:
-			/* even when not parallel-aware */
+			/* even when not parallel-aware, for EXPLAIN ANALYZE */
 			ExecSortInitializeDSM((SortState *) planstate, d->pcxt);
 			break;
 
@@ -872,8 +881,12 @@ ExecParallelReInitializeDSM(PlanState *planstate,
 				ExecBitmapHeapReInitializeDSM((BitmapHeapScanState *) planstate,
 											  pcxt);
 			break;
+		case T_HashState:
+			/* even when not parallel-aware, for EXPLAIN ANALYZE */
+			ExecHashReInitializeDSM((HashState *) planstate, pcxt);
+			break;
 		case T_SortState:
-			/* even when not parallel-aware */
+			/* even when not parallel-aware, for EXPLAIN ANALYZE */
 			ExecSortReInitializeDSM((SortState *) planstate, pcxt);
 			break;
 
@@ -928,12 +941,18 @@ ExecParallelRetrieveInstrumentation(PlanState *planstate,
 	planstate->worker_instrument->num_workers = instrumentation->num_workers;
 	memcpy(&planstate->worker_instrument->instrument, instrument, ibytes);
 
-	/*
-	 * Perform any node-type-specific work that needs to be done.  Currently,
-	 * only Sort nodes need to do anything here.
-	 */
-	if (IsA(planstate, SortState))
-		ExecSortRetrieveInstrumentation((SortState *) planstate);
+	/* Perform any node-type-specific work that needs to be done. */
+	switch (nodeTag(planstate))
+	{
+		case T_SortState:
+			ExecSortRetrieveInstrumentation((SortState *) planstate);
+			break;
+		case T_HashState:
+			ExecHashRetrieveInstrumentation((HashState *) planstate);
+			break;
+		default:
+			break;
+	}
 
 	return planstate_tree_walker(planstate, ExecParallelRetrieveInstrumentation,
 								 instrumentation);
@@ -1160,8 +1179,12 @@ ExecParallelInitializeWorker(PlanState *planstate, ParallelWorkerContext *pwcxt)
 				ExecBitmapHeapInitializeWorker((BitmapHeapScanState *) planstate,
 											   pwcxt);
 			break;
+		case T_HashState:
+			/* even when not parallel-aware, for EXPLAIN ANALYZE */
+			ExecHashInitializeWorker((HashState *) planstate, pwcxt);
+			break;
 		case T_SortState:
-			/* even when not parallel-aware */
+			/* even when not parallel-aware, for EXPLAIN ANALYZE */
 			ExecSortInitializeWorker((SortState *) planstate, pwcxt);
 			break;
 
diff --git a/src/backend/executor/execProcnode.c b/src/backend/executor/execProcnode.c
index c1aa5064c90..9befca90161 100644
--- a/src/backend/executor/execProcnode.c
+++ b/src/backend/executor/execProcnode.c
@@ -751,6 +751,9 @@ ExecShutdownNode(PlanState *node)
 		case T_GatherMergeState:
 			ExecShutdownGatherMerge((GatherMergeState *) node);
 			break;
+		case T_HashState:
+			ExecShutdownHash((HashState *) node);
+			break;
 		default:
 			break;
 	}
diff --git a/src/backend/executor/nodeHash.c b/src/backend/executor/nodeHash.c
index f7cd8fb3472..6fe5d69d558 100644
--- a/src/backend/executor/nodeHash.c
+++ b/src/backend/executor/nodeHash.c
@@ -1637,6 +1637,110 @@ ExecHashRemoveNextSkewBucket(HashJoinTable hashtable)
 	}
 }
 
+/*
+ * Reserve space in the DSM segment for instrumentation data.
+ */
+void
+ExecHashEstimate(HashState *node, ParallelContext *pcxt)
+{
+	size_t		size;
+
+	size = mul_size(pcxt->nworkers, sizeof(HashInstrumentation));
+	size = add_size(size, offsetof(SharedHashInfo, hinstrument));
+	shm_toc_estimate_chunk(&pcxt->estimator, size);
+	shm_toc_estimate_keys(&pcxt->estimator, 1);
+}
+
+/*
+ * Set up space in the DSM segment for all workers to record instrumentation
+ * data about their hash tables.
+ */
+void
+ExecHashInitializeDSM(HashState *node, ParallelContext *pcxt)
+{
+	size_t		size;
+
+	size = offsetof(SharedHashInfo, hinstrument) +
+		pcxt->nworkers * sizeof(HashInstrumentation);
+	node->shared_info = (SharedHashInfo *) shm_toc_allocate(pcxt->toc, size);
+	memset(node->shared_info, 0, size);
+	node->shared_info->num_workers = pcxt->nworkers;
+	shm_toc_insert(pcxt->toc, node->ps.plan->plan_node_id,
+				   node->shared_info);
+}
+
+/*
+ * Reset shared state before beginning a fresh scan.
+ */
+void
+ExecHashReInitializeDSM(HashState *node, ParallelContext *pcxt)
+{
+	if (node->shared_info != NULL)
+	{
+		memset(node->shared_info->hinstrument, 0,
+			   node->shared_info->num_workers * sizeof(HashInstrumentation));
+	}
+}
+
+/*
+ * Locate the DSM space for hash table instrumentation data that we'll write
+ * to at shutdown time.
+ */
+void
+ExecHashInitializeWorker(HashState *node, ParallelWorkerContext *pwcxt)
+{
+	SharedHashInfo *shared_info;
+
+	shared_info = (SharedHashInfo *)
+		shm_toc_lookup(pwcxt->toc, node->ps.plan->plan_node_id, true);
+	node->hinstrument = &shared_info->hinstrument[ParallelWorkerNumber];
+}
+
+/*
+ * Copy instrumentation data from this worker's hash table (if it built one)
+ * to DSM memory so the leader can retrieve it.  This must be done in
+ * ExecShutdownHash() rather than ExecEndHash() because the latter runs after
+ * we've detached from the DSM segment.
+ */
+void
+ExecShutdownHash(HashState *node)
+{
+	if (node->hinstrument && node->hashtable)
+		ExecHashGetInstrumentation(node->hinstrument, node->hashtable);
+}
+
+/*
+ * Retrieve instrumentation data from workers before the DSM segment is
+ * detached, so that EXPLAIN can access it.
+ */
+void
+ExecHashRetrieveInstrumentation(HashState *node)
+{
+	SharedHashInfo *shared_info = node->shared_info;
+	size_t		size;
+
+	/* Replace node->shared_info with a copy in backend-local memory. */
+	size = offsetof(SharedHashInfo, hinstrument) +
+		shared_info->num_workers * sizeof(HashInstrumentation);
+	node->shared_info = palloc(size);
+	memcpy(node->shared_info, shared_info, size);
+}
+
+/*
+ * Copy the instrumentation data from 'hashtable' into a HashInstrumentation
+ * struct.
+ */
+void
+ExecHashGetInstrumentation(HashInstrumentation *instrument,
+						   HashJoinTable hashtable)
+{
+	instrument->nbuckets = hashtable->nbuckets;
+	instrument->nbuckets_original = hashtable->nbuckets_original;
+	instrument->nbatch = hashtable->nbatch;
+	instrument->nbatch_original = hashtable->nbatch_original;
+	instrument->space_peak = hashtable->spacePeak;
+}
+
 /*
  * Allocate 'size' bytes from the currently active HashMemoryChunk
  */
diff --git a/src/include/executor/nodeHash.h b/src/include/executor/nodeHash.h
index 3ae556fb6c5..75d4c70f6f6 100644
--- a/src/include/executor/nodeHash.h
+++ b/src/include/executor/nodeHash.h
@@ -14,6 +14,7 @@
 #ifndef NODEHASH_H
 #define NODEHASH_H
 
+#include "access/parallel.h"
 #include "nodes/execnodes.h"
 
 extern HashState *ExecInitHash(Hash *node, EState *estate, int eflags);
@@ -48,5 +49,13 @@ extern void ExecChooseHashTableSize(double ntuples, int tupwidth, bool useskew,
 						int *numbatches,
 						int *num_skew_mcvs);
 extern int	ExecHashGetSkewBucket(HashJoinTable hashtable, uint32 hashvalue);
+extern void ExecHashEstimate(HashState *node, ParallelContext *pcxt);
+extern void ExecHashInitializeDSM(HashState *node, ParallelContext *pcxt);
+extern void ExecHashInitializeWorker(HashState *node, ParallelWorkerContext *pwcxt);
+extern void ExecHashReInitializeDSM(HashState *node, ParallelContext *pcxt);
+extern void ExecHashRetrieveInstrumentation(HashState *node);
+extern void ExecShutdownHash(HashState *node);
+extern void ExecHashGetInstrumentation(HashInstrumentation *instrument,
+									   HashJoinTable hashtable);
 
 #endif							/* NODEHASH_H */
diff --git a/src/include/nodes/execnodes.h b/src/include/nodes/execnodes.h
index e05bc04f525..084d59ef834 100644
--- a/src/include/nodes/execnodes.h
+++ b/src/include/nodes/execnodes.h
@@ -1980,6 +1980,29 @@ typedef struct GatherMergeState
 	struct binaryheap *gm_heap; /* binary heap of slot indices */
 } GatherMergeState;
 
+/* ----------------
+ *	 Values displayed by EXPLAIN ANALYZE
+ * ----------------
+ */
+typedef struct HashInstrumentation
+{
+	int			nbuckets;		/* number of buckets at end of execution */
+	int			nbuckets_original;	/* planned number of buckets */
+	int			nbatch;			/* number of batches at end of execution */
+	int			nbatch_original;	/* planned number of batches */
+	size_t		space_peak;		/* peak memory usage in bytes */
+} HashInstrumentation;
+
+/* ----------------
+ *	 Shared memory container for per-worker hash information
+ * ----------------
+ */
+typedef struct SharedHashInfo
+{
+	int			num_workers;
+	HashInstrumentation hinstrument[FLEXIBLE_ARRAY_MEMBER];
+} SharedHashInfo;
+
 /* ----------------
  *	 HashState information
  * ----------------
@@ -1990,6 +2013,9 @@ typedef struct HashState
 	HashJoinTable hashtable;	/* hash table for the hashjoin */
 	List	   *hashkeys;		/* list of ExprState nodes */
 	/* hashkeys is same as parent's hj_InnerHashKeys */
+
+	SharedHashInfo *shared_info;	/* one entry per worker */
+	HashInstrumentation *hinstrument;	/* this worker's entry */
 } HashState;
 
 /* ----------------
diff --git a/src/test/regress/expected/join.out b/src/test/regress/expected/join.out
index b7d17900978..001d96dc2d8 100644
--- a/src/test/regress/expected/join.out
+++ b/src/test/regress/expected/join.out
@@ -6173,6 +6173,21 @@ $$);
 
 rollback to settings;
 -- A couple of other hash join tests unrelated to work_mem management.
+-- Check that EXPLAIN ANALYZE has data even if the leader doesn't participate
+savepoint settings;
+set local max_parallel_workers_per_gather = 2;
+set local work_mem = '4MB';
+set local parallel_leader_participation = off;
+select * from hash_join_batches(
+$$
+  select count(*) from simple r join simple s using (id);
+$$);
+ original | final 
+----------+-------
+        1 |     1
+(1 row)
+
+rollback to settings;
 -- A full outer join where every record is matched.
 -- non-parallel
 savepoint settings;
diff --git a/src/test/regress/sql/join.sql b/src/test/regress/sql/join.sql
index c6d4a513e86..882601b3388 100644
--- a/src/test/regress/sql/join.sql
+++ b/src/test/regress/sql/join.sql
@@ -2159,6 +2159,17 @@ rollback to settings;
 
 -- A couple of other hash join tests unrelated to work_mem management.
 
+-- Check that EXPLAIN ANALYZE has data even if the leader doesn't participate
+savepoint settings;
+set local max_parallel_workers_per_gather = 2;
+set local work_mem = '4MB';
+set local parallel_leader_participation = off;
+select * from hash_join_batches(
+$$
+  select count(*) from simple r join simple s using (id);
+$$);
+rollback to settings;
+
 -- A full outer join where every record is matched.
 
 -- non-parallel
-- 
2.15.0

