From 4fa8ad852064403a6553cc71c8de21d59d972f01 Mon Sep 17 00:00:00 2001
From: Thomas Munro <thomas.munro@enterprisedb.com>
Date: Sun, 31 Dec 2017 02:03:07 +1300
Subject: [PATCH] Fix EXPLAIN ANALYZE output for Parallel Hash.

In a race case, EXPLAIN ANALYZE could fail to display correct nbatch and size
information.  Refactor so that participants report only on batches they worked
on rather than trying to report on all of them, and teach explain.c to
consider the HashInstrumentation object from all participants instead of
picking the first one it can find.  This should fix an occasional build farm
failure in the "join" regression test.

Author: Thomas Munro
Reviewed-By: Andres Freund
Discussion: https://postgr.es/m/30219.1514428346%40sss.pgh.pa.us
---
 src/backend/commands/explain.c      | 79 ++++++++++++++++++++++++-------------
 src/backend/executor/nodeHash.c     | 27 +++++--------
 src/backend/executor/nodeHashjoin.c |  6 ---
 src/include/executor/nodeHash.h     |  1 -
 4 files changed, 62 insertions(+), 51 deletions(-)

diff --git a/src/backend/commands/explain.c b/src/backend/commands/explain.c
index 7e4fbafc535..2156385ac88 100644
--- a/src/backend/commands/explain.c
+++ b/src/backend/commands/explain.c
@@ -2379,62 +2379,87 @@ show_sort_info(SortState *sortstate, ExplainState *es)
 static void
 show_hash_info(HashState *hashstate, ExplainState *es)
 {
-	HashInstrumentation *hinstrument = NULL;
+	HashInstrumentation hinstrument = {0};
 
 	/*
 	 * In a parallel query, the leader process may or may not have run the
 	 * hash join, and even if it did it may not have built a hash table due to
 	 * timing (if it started late it might have seen no tuples in the outer
 	 * relation and skipped building the hash table).  Therefore we have to be
-	 * prepared to get instrumentation data from a worker if there is no hash
-	 * table.
+	 * prepared to get instrumentation data from all participants.
 	 */
 	if (hashstate->hashtable)
-	{
-		hinstrument = (HashInstrumentation *)
-			palloc(sizeof(HashInstrumentation));
-		ExecHashGetInstrumentation(hinstrument, hashstate->hashtable);
-	}
-	else if (hashstate->shared_info)
+		ExecHashGetInstrumentation(&hinstrument, hashstate->hashtable);
+
+	/*
+	 * Merge results from workers.  In the parallel-oblivious case, the
+	 * results from all participants should be identical, except where
+	 * participants didn't run the join at all so have no data.  In the
+	 * parallel-aware case, we need to consider all the results.  Each worker
+	 * may have seen a different subset of batches and we want to find the
+	 * highest memory usage for any one batch across all batches.
+	 */
+	if (hashstate->shared_info)
 	{
 		SharedHashInfo *shared_info = hashstate->shared_info;
-		int		i;
+		int			i;
 
-		/* Find the first worker that built a hash table. */
 		for (i = 0; i < shared_info->num_workers; ++i)
 		{
-			if (shared_info->hinstrument[i].nbatch > 0)
+			HashInstrumentation *worker_hi = &shared_info->hinstrument[i];
+
+			if (worker_hi->nbatch > 0)
 			{
-				hinstrument = &shared_info->hinstrument[i];
-				break;
+				/*
+				 * Every participant should agree on the buckets, so to be
+				 * sure we have a value we'll just overwrite each time.
+				 */
+				hinstrument.nbuckets = worker_hi->nbuckets;
+				hinstrument.nbuckets_original = worker_hi->nbuckets_original;
+
+				/*
+				 * Normally every participant should agree on the number of
+				 * batches too, but it's possible for a backend that started
+				 * late and missed the whole join not to have the final nbatch
+				 * number.  So we'll take the largest number.
+				 */
+				hinstrument.nbatch = Max(hinstrument.nbatch, worker_hi->nbatch);
+				hinstrument.nbatch_original = worker_hi->nbatch_original;
+
+				/*
+				 * In a parallel-aware hash join, for now we report the
+				 * maximum peak memory reported by any worker.
+				 */
+				hinstrument.space_peak =
+					Max(hinstrument.space_peak, worker_hi->space_peak);
 			}
 		}
 	}
 
-	if (hinstrument)
+	if (hinstrument.nbatch > 0)
 	{
-		long		spacePeakKb = (hinstrument->space_peak + 1023) / 1024;
+		long		spacePeakKb = (hinstrument.space_peak + 1023) / 1024;
 
 		if (es->format != EXPLAIN_FORMAT_TEXT)
 		{
-			ExplainPropertyLong("Hash Buckets", hinstrument->nbuckets, es);
+			ExplainPropertyLong("Hash Buckets", hinstrument.nbuckets, es);
 			ExplainPropertyLong("Original Hash Buckets",
-								hinstrument->nbuckets_original, es);
-			ExplainPropertyLong("Hash Batches", hinstrument->nbatch, es);
+								hinstrument.nbuckets_original, es);
+			ExplainPropertyLong("Hash Batches", hinstrument.nbatch, es);
 			ExplainPropertyLong("Original Hash Batches",
-								hinstrument->nbatch_original, es);
+								hinstrument.nbatch_original, es);
 			ExplainPropertyLong("Peak Memory Usage", spacePeakKb, es);
 		}
-		else if (hinstrument->nbatch_original != hinstrument->nbatch ||
-				 hinstrument->nbuckets_original != hinstrument->nbuckets)
+		else if (hinstrument.nbatch_original != hinstrument.nbatch ||
+				 hinstrument.nbuckets_original != hinstrument.nbuckets)
 		{
 			appendStringInfoSpaces(es->str, es->indent * 2);
 			appendStringInfo(es->str,
 							 "Buckets: %d (originally %d)  Batches: %d (originally %d)  Memory Usage: %ldkB\n",
-							 hinstrument->nbuckets,
-							 hinstrument->nbuckets_original,
-							 hinstrument->nbatch,
-							 hinstrument->nbatch_original,
+							 hinstrument.nbuckets,
+							 hinstrument.nbuckets_original,
+							 hinstrument.nbatch,
+							 hinstrument.nbatch_original,
 							 spacePeakKb);
 		}
 		else
@@ -2442,7 +2467,7 @@ show_hash_info(HashState *hashstate, ExplainState *es)
 			appendStringInfoSpaces(es->str, es->indent * 2);
 			appendStringInfo(es->str,
 							 "Buckets: %d  Batches: %d  Memory Usage: %ldkB\n",
-							 hinstrument->nbuckets, hinstrument->nbatch,
+							 hinstrument.nbuckets, hinstrument.nbatch,
 							 spacePeakKb);
 		}
 	}
diff --git a/src/backend/executor/nodeHash.c b/src/backend/executor/nodeHash.c
index 04eb3650aa3..4e1a2806b55 100644
--- a/src/backend/executor/nodeHash.c
+++ b/src/backend/executor/nodeHash.c
@@ -3090,7 +3090,16 @@ ExecHashTableDetachBatch(HashJoinTable hashtable)
 				batch->buckets = InvalidDsaPointer;
 			}
 		}
-		ExecParallelHashUpdateSpacePeak(hashtable, curbatch);
+
+		/*
+		 * Track the largest batch we've been attached to.  Though each
+		 * backend might see a different subset of batches, explain.c will
+		 * scan the results from all backends to find the largest value.
+		 */
+		hashtable->spacePeak =
+			Max(hashtable->spacePeak,
+				batch->size + sizeof(dsa_pointer_atomic) * hashtable->nbuckets);
+
 		/* Remember that we are not attached to a batch. */
 		hashtable->curbatch = -1;
 	}
@@ -3295,19 +3304,3 @@ ExecParallelHashTuplePrealloc(HashJoinTable hashtable, int batchno, size_t size)
 
 	return true;
 }
-
-/*
- * Update this backend's copy of hashtable->spacePeak to account for a given
- * batch.  This is called at the end of hashing for batch 0, and then for each
- * batch when it is done or discovered to be already done.  The result is used
- * for EXPLAIN output.
- */
-void
-ExecParallelHashUpdateSpacePeak(HashJoinTable hashtable, int batchno)
-{
-	size_t		size;
-
-	size = hashtable->batches[batchno].shared->size;
-	size += sizeof(dsa_pointer_atomic) * hashtable->nbuckets;
-	hashtable->spacePeak = Max(hashtable->spacePeak, size);
-}
diff --git a/src/backend/executor/nodeHashjoin.c b/src/backend/executor/nodeHashjoin.c
index 5d1dc1f401e..817bcf04713 100644
--- a/src/backend/executor/nodeHashjoin.c
+++ b/src/backend/executor/nodeHashjoin.c
@@ -1186,12 +1186,6 @@ ExecParallelHashJoinNewBatch(HashJoinState *hjstate)
 					 * remain).
 					 */
 					BarrierDetach(batch_barrier);
-
-					/*
-					 * We didn't work on this batch, but we need to observe
-					 * its size for EXPLAIN.
-					 */
-					ExecParallelHashUpdateSpacePeak(hashtable, batchno);
 					hashtable->batches[batchno].done = true;
 					hashtable->curbatch = -1;
 					break;
diff --git a/src/include/executor/nodeHash.h b/src/include/executor/nodeHash.h
index 84c166b3951..367dfff018c 100644
--- a/src/include/executor/nodeHash.h
+++ b/src/include/executor/nodeHash.h
@@ -33,7 +33,6 @@ extern void ExecHashTableDetach(HashJoinTable hashtable);
 extern void ExecHashTableDetachBatch(HashJoinTable hashtable);
 extern void ExecParallelHashTableSetCurrentBatch(HashJoinTable hashtable,
 									 int batchno);
-void		ExecParallelHashUpdateSpacePeak(HashJoinTable hashtable, int batchno);
 
 extern void ExecHashTableInsert(HashJoinTable hashtable,
 					TupleTableSlot *slot,
-- 
2.15.0

