On Wed, Jan 12, 2022 at 10:30 AM Melanie Plageman
<melanieplage...@gmail.com> wrote:
> On Fri, Nov 26, 2021 at 3:11 PM Thomas Munro <thomas.mu...@gmail.com> wrote:
> > #3 0x00000000009cf57e in ExceptionalCondition (conditionName=0x29cae8
> > "BarrierParticipants(&accessor->shared->batch_barrier) == 1",
> > errorType=<optimized out>, fileName=0x2ae561 "nodeHash.c",
> > lineNumber=lineNumber@entry=2224) at assert.c:69
> > No locals.
> > #4 0x000000000071575e in ExecParallelScanHashTableForUnmatched
> > (hjstate=hjstate@entry=0x80a60a3c8,
> > econtext=econtext@entry=0x80a60ae98) at nodeHash.c:2224
>
> I believe this assert can be safely removed.

Agreed.

I was looking at this with a view to committing it, but I need more
time.  This will be at the front of my queue when the tree reopens.
I'm trying to find the tooling I had somewhere that could let you test
attaching and detaching at every phase.

The attached version is just pgindent'd.
From e7453cae9b2a686d57f967fd41533546d463dd0c Mon Sep 17 00:00:00 2001
From: Thomas Munro <thomas.mu...@gmail.com>
Date: Fri, 2 Oct 2020 15:53:44 +1300
Subject: [PATCH v11 1/3] Fix race condition in parallel hash join batch
 cleanup.

With unlucky timing and parallel_leader_participation off, PHJ could
attempt to access per-batch state just as it was being freed.  There was
code intended to prevent that by checking for a cleared pointer, but it
was racy.  Fix by introducing an extra barrier phase.  The new phase
PHJ_BUILD_RUNNING means that it's safe to access the per-batch state to
find a batch to help with, and PHJ_BUILD_DONE means that it is too late.
The last to detach will free the array of per-batch state as before, but
now it will also atomically advance the phase at the same time, so that
late attachers can avoid the hazard.  This mirrors the way per-batch
hash tables are freed (see phases PHJ_BATCH_PROBING and PHJ_BATCH_DONE).

The build barrier must make it to PHJ_BUILD_DONE before shared resources
can be safely destroyed. This works fine in most cases with the addition
of another synchronization point. However, when the inner side is empty,
the build barrier will only make it to PHJ_BUILD_HASHING_INNER before
workers attempt to detach from the hashtable. In the case of the empty
inner optimization, advance the build barrier to PHJ_BUILD_RUNNING
before proceeding to cleanup. See the fast-forward of batch 0's batch
barrier in ExecParallelHashJoinSetUpBatches() for precedent.

Revealed by a build farm failure, where BarrierAttach() failed a sanity
check assertion, because the memory had been clobbered by dsa_free().

This should eventually be back-patched to all supported releases, but
the failure is rare and the previous attempt at this was reverted, so
let's do this in master only for now, ahead of some other changes that
will move things around a bit.

Author: Thomas Munro <thomas.mu...@gmail.com>
Author: Melanie Plageman <melanieplage...@gmail.com>
Reported-by: Michael Paquier <mich...@paquier.xyz>
Discussion: https://postgr.es/m/20200929061142.GA29096%40paquier.xyz
---
 src/backend/executor/nodeHash.c     | 49 +++++++++++++++++---------
 src/backend/executor/nodeHashjoin.c | 54 ++++++++++++++++++++---------
 src/include/executor/hashjoin.h     |  3 +-
 3 files changed, 73 insertions(+), 33 deletions(-)
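
For readers who want the protocol in isolation, here is a minimal
single-threaded sketch of the "last to detach advances the phase and
frees" rule described in the commit message.  It is a toy model, not
PostgreSQL's barrier.c: ToyBarrier, ToyShared and every toy_* function
are hypothetical names invented for illustration, there is no locking
or waiting, and the model starts directly in the RUNNING phase.

```c
#include <assert.h>
#include <stdbool.h>
#include <stddef.h>
#include <stdlib.h>

/* Toy phase numbers mirroring the build barrier phases in this patch. */
enum toy_phase
{
	TOY_ELECTING,
	TOY_ALLOCATING,
	TOY_HASHING_INNER,
	TOY_HASHING_OUTER,
	TOY_RUNNING,				/* safe to look at per-batch state */
	TOY_DONE					/* too late, per-batch state is gone */
};

/* Hypothetical stand-in for Barrier: no lock, no condition variable. */
typedef struct ToyBarrier
{
	int			phase;
	int			participants;
} ToyBarrier;

/* Hypothetical stand-in for the shared join state. */
typedef struct ToyShared
{
	ToyBarrier	build_barrier;
	int		   *batches;		/* models the per-batch tracking array */
} ToyShared;

/* Start the model in TOY_RUNNING with an allocated batch array. */
static void
toy_shared_init(ToyShared *s, int nbatch)
{
	s->build_barrier.phase = TOY_RUNNING;
	s->build_barrier.participants = 0;
	s->batches = malloc(sizeof(int) * (size_t) nbatch);
}

/* Attach and report the phase observed at attach time. */
static int
toy_attach(ToyBarrier *b)
{
	b->participants++;
	return b->phase;
}

/*
 * Detach without waiting.  In this toy nobody ever waits, so the phase
 * only advances when the last participant leaves; that caller gets true.
 */
static bool
toy_arrive_and_detach(ToyBarrier *b)
{
	assert(b->participants > 0);
	if (--b->participants == 0)
	{
		b->phase++;
		return true;
	}
	return false;
}

/* Returns false if we attached too late to touch the batch array. */
static bool
toy_worker_join(ToyShared *s)
{
	if (toy_attach(&s->build_barrier) == TOY_DONE)
	{
		/* Toy simplification: undo the attach and give up immediately. */
		s->build_barrier.participants--;
		return false;
	}
	assert(s->batches != NULL);
	return true;
}

/* The last worker to leave sees TOY_DONE and frees the batch array. */
static void
toy_worker_leave(ToyShared *s)
{
	if (toy_arrive_and_detach(&s->build_barrier))
	{
		assert(s->build_barrier.phase == TOY_DONE);
		free(s->batches);
		s->batches = NULL;
	}
}
```

Calling toy_shared_init(), then joining two workers and letting both
leave, ends in TOY_DONE with the array freed; a third worker joining
afterwards is turned away by the phase check alone, without reading
the freed pointer.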

diff --git a/src/backend/executor/nodeHash.c b/src/backend/executor/nodeHash.c
index 3510a4247c..d7d1d77ed1 100644
--- a/src/backend/executor/nodeHash.c
+++ b/src/backend/executor/nodeHash.c
@@ -333,14 +333,21 @@ MultiExecParallelHash(HashState *node)
 	hashtable->nbuckets = pstate->nbuckets;
 	hashtable->log2_nbuckets = my_log2(hashtable->nbuckets);
 	hashtable->totalTuples = pstate->total_tuples;
-	ExecParallelHashEnsureBatchAccessors(hashtable);
+
+	/*
+	 * Unless we're completely done and the batch state has been freed, make
+	 * sure we have accessors.
+	 */
+	if (BarrierPhase(build_barrier) < PHJ_BUILD_DONE)
+		ExecParallelHashEnsureBatchAccessors(hashtable);
 
 	/*
 	 * The next synchronization point is in ExecHashJoin's HJ_BUILD_HASHTABLE
-	 * case, which will bring the build phase to PHJ_BUILD_DONE (if it isn't
-	 * there already).
+	 * case, which will bring the build phase to PHJ_BUILD_RUNNING (if it
+	 * isn't there already).
 	 */
 	Assert(BarrierPhase(build_barrier) == PHJ_BUILD_HASHING_OUTER ||
+		   BarrierPhase(build_barrier) == PHJ_BUILD_RUNNING ||
 		   BarrierPhase(build_barrier) == PHJ_BUILD_DONE);
 }
 
@@ -624,7 +631,7 @@ ExecHashTableCreate(HashState *state, List *hashOperators, List *hashCollations,
 		/*
 		 * The next Parallel Hash synchronization point is in
 		 * MultiExecParallelHash(), which will progress it all the way to
-		 * PHJ_BUILD_DONE.  The caller must not return control from this
+		 * PHJ_BUILD_RUNNING.  The caller must not return control from this
 		 * executor node between now and then.
 		 */
 	}
@@ -3065,14 +3072,11 @@ ExecParallelHashEnsureBatchAccessors(HashJoinTable hashtable)
 	}
 
 	/*
-	 * It's possible for a backend to start up very late so that the whole
-	 * join is finished and the shm state for tracking batches has already
-	 * been freed by ExecHashTableDetach().  In that case we'll just leave
-	 * hashtable->batches as NULL so that ExecParallelHashJoinNewBatch() gives
-	 * up early.
+	 * We should never see a state where the batch-tracking array is freed,
+	 * because we should have given up sooner if we join when the build
+	 * barrier has reached the PHJ_BUILD_DONE phase.
 	 */
-	if (!DsaPointerIsValid(pstate->batches))
-		return;
+	Assert(DsaPointerIsValid(pstate->batches));
 
 	/* Use hash join memory context. */
 	oldcxt = MemoryContextSwitchTo(hashtable->hashCxt);
@@ -3192,9 +3196,17 @@ ExecHashTableDetachBatch(HashJoinTable hashtable)
 void
 ExecHashTableDetach(HashJoinTable hashtable)
 {
-	if (hashtable->parallel_state)
+	ParallelHashJoinState *pstate = hashtable->parallel_state;
+
+	/*
+	 * If we're involved in a parallel query, we must either have got all the
+	 * way to PHJ_BUILD_RUNNING, or joined too late and be in PHJ_BUILD_DONE.
+	 */
+	Assert(!pstate ||
+		   BarrierPhase(&pstate->build_barrier) >= PHJ_BUILD_RUNNING);
+
+	if (pstate && BarrierPhase(&pstate->build_barrier) == PHJ_BUILD_RUNNING)
 	{
-		ParallelHashJoinState *pstate = hashtable->parallel_state;
 		int			i;
 
 		/* Make sure any temporary files are closed. */
@@ -3210,17 +3222,22 @@ ExecHashTableDetach(HashJoinTable hashtable)
 		}
 
 		/* If we're last to detach, clean up shared memory. */
-		if (BarrierDetach(&pstate->build_barrier))
+		if (BarrierArriveAndDetach(&pstate->build_barrier))
 		{
+			/*
+			 * Late joining processes will see this state and give up
+			 * immediately.
+			 */
+			Assert(BarrierPhase(&pstate->build_barrier) == PHJ_BUILD_DONE);
+
 			if (DsaPointerIsValid(pstate->batches))
 			{
 				dsa_free(hashtable->area, pstate->batches);
 				pstate->batches = InvalidDsaPointer;
 			}
 		}
-
-		hashtable->parallel_state = NULL;
 	}
+	hashtable->parallel_state = NULL;
 }
 
 /*
diff --git a/src/backend/executor/nodeHashjoin.c b/src/backend/executor/nodeHashjoin.c
index 88b870655e..ba4895e44d 100644
--- a/src/backend/executor/nodeHashjoin.c
+++ b/src/backend/executor/nodeHashjoin.c
@@ -45,7 +45,8 @@
  *   PHJ_BUILD_ALLOCATING            -- one sets up the batches and table 0
  *   PHJ_BUILD_HASHING_INNER         -- all hash the inner rel
  *   PHJ_BUILD_HASHING_OUTER         -- (multi-batch only) all hash the outer
- *   PHJ_BUILD_DONE                  -- building done, probing can begin
+ *   PHJ_BUILD_RUNNING               -- building done, probing can begin
+ *   PHJ_BUILD_DONE                  -- all work complete, one frees batches
  *
  * While in the phase PHJ_BUILD_HASHING_INNER a separate pair of barriers may
  * be used repeatedly as required to coordinate expansions in the number of
@@ -73,7 +74,7 @@
  * batches whenever it encounters them while scanning and probing, which it
  * can do because it processes batches in serial order.
  *
- * Once PHJ_BUILD_DONE is reached, backends then split up and process
+ * Once PHJ_BUILD_RUNNING is reached, backends then split up and process
  * different batches, or gang up and work together on probing batches if there
  * aren't enough to go around.  For each batch there is a separate barrier
  * with the following phases:
@@ -95,11 +96,16 @@
  *
  * To avoid deadlocks, we never wait for any barrier unless it is known that
  * all other backends attached to it are actively executing the node or have
- * already arrived.  Practically, that means that we never return a tuple
- * while attached to a barrier, unless the barrier has reached its final
- * state.  In the slightly special case of the per-batch barrier, we return
- * tuples while in PHJ_BATCH_PROBING phase, but that's OK because we use
- * BarrierArriveAndDetach() to advance it to PHJ_BATCH_DONE without waiting.
+ * finished.  Practically, that means that we never emit a tuple while attached
+ * to a barrier, unless the barrier has reached a phase that means that no
+ * process will wait on it again.  We emit tuples while attached to the build
+ * barrier in phase PHJ_BUILD_RUNNING, and to a per-batch barrier in phase
+ * PHJ_BATCH_PROBING.  These are advanced to PHJ_BUILD_DONE and PHJ_BATCH_DONE
+ * respectively without waiting, using BarrierArriveAndDetach().  The last to
+ * detach receives a different return value so that it knows that it's safe to
+ * clean up.  Any straggler process that attaches after that phase is reached
+ * will see that it's too late to participate or access the relevant shared
+ * memory objects.
  *
  *-------------------------------------------------------------------------
  */
@@ -296,7 +302,21 @@ ExecHashJoinImpl(PlanState *pstate, bool parallel)
 				 * outer relation.
 				 */
 				if (hashtable->totalTuples == 0 && !HJ_FILL_OUTER(node))
+				{
+					if (parallel)
+					{
+						/*
+						 * Advance the build barrier to PHJ_BUILD_RUNNING
+						 * before proceeding to cleanup to comply with build
+						 * barrier safety requirements.
+						 */
+						Barrier    *build_barrier = &parallel_state->build_barrier;
+
+						while (BarrierPhase(build_barrier) < PHJ_BUILD_RUNNING)
+							BarrierArriveAndWait(build_barrier, 0);
+					}
 					return NULL;
+				}
 
 				/*
 				 * need to remember whether nbatch has increased since we
@@ -317,6 +337,7 @@ ExecHashJoinImpl(PlanState *pstate, bool parallel)
 
 					build_barrier = &parallel_state->build_barrier;
 					Assert(BarrierPhase(build_barrier) == PHJ_BUILD_HASHING_OUTER ||
+						   BarrierPhase(build_barrier) == PHJ_BUILD_RUNNING ||
 						   BarrierPhase(build_barrier) == PHJ_BUILD_DONE);
 					if (BarrierPhase(build_barrier) == PHJ_BUILD_HASHING_OUTER)
 					{
@@ -329,9 +350,18 @@ ExecHashJoinImpl(PlanState *pstate, bool parallel)
 						BarrierArriveAndWait(build_barrier,
 											 WAIT_EVENT_HASH_BUILD_HASH_OUTER);
 					}
-					Assert(BarrierPhase(build_barrier) == PHJ_BUILD_DONE);
+					else if (BarrierPhase(build_barrier) == PHJ_BUILD_DONE)
+					{
+						/*
+						 * If we attached so late that the job is finished and
+						 * the batch state has been freed, we can return
+						 * immediately.
+						 */
+						return NULL;
+					}
 
 					/* Each backend should now select a batch to work on. */
+					Assert(BarrierPhase(build_barrier) == PHJ_BUILD_RUNNING);
 					hashtable->curbatch = -1;
 					node->hj_JoinState = HJ_NEED_NEW_BATCH;
 
@@ -1090,14 +1120,6 @@ ExecParallelHashJoinNewBatch(HashJoinState *hjstate)
 	int			start_batchno;
 	int			batchno;
 
-	/*
-	 * If we started up so late that the batch tracking array has been freed
-	 * already by ExecHashTableDetach(), then we are finished.  See also
-	 * ExecParallelHashEnsureBatchAccessors().
-	 */
-	if (hashtable->batches == NULL)
-		return false;
-
 	/*
 	 * If we were already attached to a batch, remember not to bother checking
 	 * it again, and detach from it (possibly freeing the hash table if we are
diff --git a/src/include/executor/hashjoin.h b/src/include/executor/hashjoin.h
index 5d72243022..d7e90bc0e2 100644
--- a/src/include/executor/hashjoin.h
+++ b/src/include/executor/hashjoin.h
@@ -258,7 +258,8 @@ typedef struct ParallelHashJoinState
 #define PHJ_BUILD_ALLOCATING			1
 #define PHJ_BUILD_HASHING_INNER			2
 #define PHJ_BUILD_HASHING_OUTER			3
-#define PHJ_BUILD_DONE					4
+#define PHJ_BUILD_RUNNING				4
+#define PHJ_BUILD_DONE					5
 
 /* The phases for probing each batch, used by for batch_barrier. */
 #define PHJ_BATCH_ELECTING				0
-- 
2.30.2

From 008dfa76f9bea487b025af3790c398ddce9d06af Mon Sep 17 00:00:00 2001
From: Thomas Munro <thomas.mu...@gmail.com>
Date: Sat, 6 Mar 2021 12:06:16 +1300
Subject: [PATCH v11 2/3] Improve the naming of Parallel Hash Join phases.

Commit 3048898e dropped -ING from some wait event names.  Update the
corresponding barrier phase names to match.

While we're here making cosmetic changes, also rename "DONE" to "FREE".
That pairs better with "ALLOCATE", and describes the activity that
actually happens in that phase (as we do for the other phases) rather
than describing a state.  As for the growth barriers, rename their
"ALLOCATE" phase to "REALLOCATE", which is probably a better description
of what happens then.

Reviewed-by: Melanie Plageman <melanieplage...@gmail.com>
Discussion: https://postgr.es/m/CA%2BhUKG%2BA6ftXPz4oe92%2Bx8Er%2BxpGZqto70-Q_ERwRaSyA%3DafNg%40mail.gmail.com
---
 src/backend/executor/nodeHash.c         | 72 +++++++++----------
 src/backend/executor/nodeHashjoin.c     | 91 +++++++++++++------------
 src/backend/utils/activity/wait_event.c |  8 +--
 src/include/executor/hashjoin.h         | 38 +++++------
 src/include/utils/wait_event.h          |  4 +-
 5 files changed, 108 insertions(+), 105 deletions(-)

diff --git a/src/backend/executor/nodeHash.c b/src/backend/executor/nodeHash.c
index d7d1d77ed1..6a57ac8c98 100644
--- a/src/backend/executor/nodeHash.c
+++ b/src/backend/executor/nodeHash.c
@@ -246,10 +246,10 @@ MultiExecParallelHash(HashState *node)
 	 */
 	pstate = hashtable->parallel_state;
 	build_barrier = &pstate->build_barrier;
-	Assert(BarrierPhase(build_barrier) >= PHJ_BUILD_ALLOCATING);
+	Assert(BarrierPhase(build_barrier) >= PHJ_BUILD_ALLOCATE);
 	switch (BarrierPhase(build_barrier))
 	{
-		case PHJ_BUILD_ALLOCATING:
+		case PHJ_BUILD_ALLOCATE:
 
 			/*
 			 * Either I just allocated the initial hash table in
@@ -259,7 +259,7 @@ MultiExecParallelHash(HashState *node)
 			BarrierArriveAndWait(build_barrier, WAIT_EVENT_HASH_BUILD_ALLOCATE);
 			/* Fall through. */
 
-		case PHJ_BUILD_HASHING_INNER:
+		case PHJ_BUILD_HASH_INNER:
 
 			/*
 			 * It's time to begin hashing, or if we just arrived here then
@@ -271,10 +271,10 @@ MultiExecParallelHash(HashState *node)
 			 * below.
 			 */
 			if (PHJ_GROW_BATCHES_PHASE(BarrierAttach(&pstate->grow_batches_barrier)) !=
-				PHJ_GROW_BATCHES_ELECTING)
+				PHJ_GROW_BATCHES_ELECT)
 				ExecParallelHashIncreaseNumBatches(hashtable);
 			if (PHJ_GROW_BUCKETS_PHASE(BarrierAttach(&pstate->grow_buckets_barrier)) !=
-				PHJ_GROW_BUCKETS_ELECTING)
+				PHJ_GROW_BUCKETS_ELECT)
 				ExecParallelHashIncreaseNumBuckets(hashtable);
 			ExecParallelHashEnsureBatchAccessors(hashtable);
 			ExecParallelHashTableSetCurrentBatch(hashtable, 0);
@@ -338,17 +338,17 @@ MultiExecParallelHash(HashState *node)
 	 * Unless we're completely done and the batch state has been freed, make
 	 * sure we have accessors.
 	 */
-	if (BarrierPhase(build_barrier) < PHJ_BUILD_DONE)
+	if (BarrierPhase(build_barrier) < PHJ_BUILD_FREE)
 		ExecParallelHashEnsureBatchAccessors(hashtable);
 
 	/*
 	 * The next synchronization point is in ExecHashJoin's HJ_BUILD_HASHTABLE
-	 * case, which will bring the build phase to PHJ_BUILD_RUNNING (if it
-	 * isn't there already).
+	 * case, which will bring the build phase to PHJ_BUILD_RUN (if it isn't
+	 * there already).
 	 */
-	Assert(BarrierPhase(build_barrier) == PHJ_BUILD_HASHING_OUTER ||
-		   BarrierPhase(build_barrier) == PHJ_BUILD_RUNNING ||
-		   BarrierPhase(build_barrier) == PHJ_BUILD_DONE);
+	Assert(BarrierPhase(build_barrier) == PHJ_BUILD_HASH_OUTER ||
+		   BarrierPhase(build_barrier) == PHJ_BUILD_RUN ||
+		   BarrierPhase(build_barrier) == PHJ_BUILD_FREE);
 }
 
 /* ----------------------------------------------------------------
@@ -596,7 +596,7 @@ ExecHashTableCreate(HashState *state, List *hashOperators, List *hashCollations,
 		 * Attach to the build barrier.  The corresponding detach operation is
 		 * in ExecHashTableDetach.  Note that we won't attach to the
 		 * batch_barrier for batch 0 yet.  We'll attach later and start it out
-		 * in PHJ_BATCH_PROBING phase, because batch 0 is allocated up front
+		 * in PHJ_BATCH_PROBE phase, because batch 0 is allocated up front
 		 * and then loaded while hashing (the standard hybrid hash join
 		 * algorithm), and we'll coordinate that using build_barrier.
 		 */
@@ -610,7 +610,7 @@ ExecHashTableCreate(HashState *state, List *hashOperators, List *hashCollations,
 		 * SharedHashJoinBatch objects and the hash table for batch 0.  One
 		 * backend will be elected to do that now if necessary.
 		 */
-		if (BarrierPhase(build_barrier) == PHJ_BUILD_ELECTING &&
+		if (BarrierPhase(build_barrier) == PHJ_BUILD_ELECT &&
 			BarrierArriveAndWait(build_barrier, WAIT_EVENT_HASH_BUILD_ELECT))
 		{
 			pstate->nbatch = nbatch;
@@ -631,7 +631,7 @@ ExecHashTableCreate(HashState *state, List *hashOperators, List *hashCollations,
 		/*
 		 * The next Parallel Hash synchronization point is in
 		 * MultiExecParallelHash(), which will progress it all the way to
-		 * PHJ_BUILD_RUNNING.  The caller must not return control from this
+		 * PHJ_BUILD_RUN.  The caller must not return control from this
 		 * executor node between now and then.
 		 */
 	}
@@ -1086,7 +1086,7 @@ ExecParallelHashIncreaseNumBatches(HashJoinTable hashtable)
 	ParallelHashJoinState *pstate = hashtable->parallel_state;
 	int			i;
 
-	Assert(BarrierPhase(&pstate->build_barrier) == PHJ_BUILD_HASHING_INNER);
+	Assert(BarrierPhase(&pstate->build_barrier) == PHJ_BUILD_HASH_INNER);
 
 	/*
 	 * It's unlikely, but we need to be prepared for new participants to show
@@ -1095,7 +1095,7 @@ ExecParallelHashIncreaseNumBatches(HashJoinTable hashtable)
 	 */
 	switch (PHJ_GROW_BATCHES_PHASE(BarrierPhase(&pstate->grow_batches_barrier)))
 	{
-		case PHJ_GROW_BATCHES_ELECTING:
+		case PHJ_GROW_BATCHES_ELECT:
 
 			/*
 			 * Elect one participant to prepare to grow the number of batches.
@@ -1211,13 +1211,13 @@ ExecParallelHashIncreaseNumBatches(HashJoinTable hashtable)
 			}
 			/* Fall through. */
 
-		case PHJ_GROW_BATCHES_ALLOCATING:
+		case PHJ_GROW_BATCHES_REALLOCATE:
 			/* Wait for the above to be finished. */
 			BarrierArriveAndWait(&pstate->grow_batches_barrier,
-								 WAIT_EVENT_HASH_GROW_BATCHES_ALLOCATE);
+								 WAIT_EVENT_HASH_GROW_BATCHES_REALLOCATE);
 			/* Fall through. */
 
-		case PHJ_GROW_BATCHES_REPARTITIONING:
+		case PHJ_GROW_BATCHES_REPARTITION:
 			/* Make sure that we have the current dimensions and buckets. */
 			ExecParallelHashEnsureBatchAccessors(hashtable);
 			ExecParallelHashTableSetCurrentBatch(hashtable, 0);
@@ -1230,7 +1230,7 @@ ExecParallelHashIncreaseNumBatches(HashJoinTable hashtable)
 								 WAIT_EVENT_HASH_GROW_BATCHES_REPARTITION);
 			/* Fall through. */
 
-		case PHJ_GROW_BATCHES_DECIDING:
+		case PHJ_GROW_BATCHES_DECIDE:
 
 			/*
 			 * Elect one participant to clean up and decide whether further
@@ -1285,7 +1285,7 @@ ExecParallelHashIncreaseNumBatches(HashJoinTable hashtable)
 			}
 			/* Fall through. */
 
-		case PHJ_GROW_BATCHES_FINISHING:
+		case PHJ_GROW_BATCHES_FINISH:
 			/* Wait for the above to complete. */
 			BarrierArriveAndWait(&pstate->grow_batches_barrier,
 								 WAIT_EVENT_HASH_GROW_BATCHES_FINISH);
@@ -1525,7 +1525,7 @@ ExecParallelHashIncreaseNumBuckets(HashJoinTable hashtable)
 	HashMemoryChunk chunk;
 	dsa_pointer chunk_s;
 
-	Assert(BarrierPhase(&pstate->build_barrier) == PHJ_BUILD_HASHING_INNER);
+	Assert(BarrierPhase(&pstate->build_barrier) == PHJ_BUILD_HASH_INNER);
 
 	/*
 	 * It's unlikely, but we need to be prepared for new participants to show
@@ -1534,7 +1534,7 @@ ExecParallelHashIncreaseNumBuckets(HashJoinTable hashtable)
 	 */
 	switch (PHJ_GROW_BUCKETS_PHASE(BarrierPhase(&pstate->grow_buckets_barrier)))
 	{
-		case PHJ_GROW_BUCKETS_ELECTING:
+		case PHJ_GROW_BUCKETS_ELECT:
 			/* Elect one participant to prepare to increase nbuckets. */
 			if (BarrierArriveAndWait(&pstate->grow_buckets_barrier,
 									 WAIT_EVENT_HASH_GROW_BUCKETS_ELECT))
@@ -1563,13 +1563,13 @@ ExecParallelHashIncreaseNumBuckets(HashJoinTable hashtable)
 			}
 			/* Fall through. */
 
-		case PHJ_GROW_BUCKETS_ALLOCATING:
+		case PHJ_GROW_BUCKETS_REALLOCATE:
 			/* Wait for the above to complete. */
 			BarrierArriveAndWait(&pstate->grow_buckets_barrier,
-								 WAIT_EVENT_HASH_GROW_BUCKETS_ALLOCATE);
+								 WAIT_EVENT_HASH_GROW_BUCKETS_REALLOCATE);
 			/* Fall through. */
 
-		case PHJ_GROW_BUCKETS_REINSERTING:
+		case PHJ_GROW_BUCKETS_REINSERT:
 			/* Reinsert all tuples into the hash table. */
 			ExecParallelHashEnsureBatchAccessors(hashtable);
 			ExecParallelHashTableSetCurrentBatch(hashtable, 0);
@@ -1725,7 +1725,7 @@ retry:
 
 		/* Try to load it into memory. */
 		Assert(BarrierPhase(&hashtable->parallel_state->build_barrier) ==
-			   PHJ_BUILD_HASHING_INNER);
+			   PHJ_BUILD_HASH_INNER);
 		hashTuple = ExecParallelHashTupleAlloc(hashtable,
 											   HJTUPLE_OVERHEAD + tuple->t_len,
 											   &shared);
@@ -2879,7 +2879,7 @@ ExecParallelHashTupleAlloc(HashJoinTable hashtable, size_t size,
 	if (pstate->growth != PHJ_GROWTH_DISABLED)
 	{
 		Assert(curbatch == 0);
-		Assert(BarrierPhase(&pstate->build_barrier) == PHJ_BUILD_HASHING_INNER);
+		Assert(BarrierPhase(&pstate->build_barrier) == PHJ_BUILD_HASH_INNER);
 
 		/*
 		 * Check if our space limit would be exceeded.  To avoid choking on
@@ -2999,7 +2999,7 @@ ExecParallelHashJoinSetUpBatches(HashJoinTable hashtable, int nbatch)
 		{
 			/* Batch 0 doesn't need to be loaded. */
 			BarrierAttach(&shared->batch_barrier);
-			while (BarrierPhase(&shared->batch_barrier) < PHJ_BATCH_PROBING)
+			while (BarrierPhase(&shared->batch_barrier) < PHJ_BATCH_PROBE)
 				BarrierArriveAndWait(&shared->batch_barrier, 0);
 			BarrierDetach(&shared->batch_barrier);
 		}
@@ -3073,8 +3073,8 @@ ExecParallelHashEnsureBatchAccessors(HashJoinTable hashtable)
 
 	/*
 	 * We should never see a state where the batch-tracking array is freed,
-	 * because we should have given up sooner if we join when the build
-	 * barrier has reached the PHJ_BUILD_DONE phase.
+	 * because we should have given up sooner if we join when the build barrier
+	 * has reached the PHJ_BUILD_FREE phase.
 	 */
 	Assert(DsaPointerIsValid(pstate->batches));
 
@@ -3157,7 +3157,7 @@ ExecHashTableDetachBatch(HashJoinTable hashtable)
 			 * longer attached, but since there is no way it's moving after
 			 * this point it seems safe to make the following assertion.
 			 */
-			Assert(BarrierPhase(&batch->batch_barrier) == PHJ_BATCH_DONE);
+			Assert(BarrierPhase(&batch->batch_barrier) == PHJ_BATCH_FREE);
 
 			/* Free shared chunks and buckets. */
 			while (DsaPointerIsValid(batch->chunks))
@@ -3200,12 +3200,12 @@ ExecHashTableDetach(HashJoinTable hashtable)
 
 	/*
 	 * If we're involved in a parallel query, we must either have got all the
-	 * way to PHJ_BUILD_RUNNING, or joined too late and be in PHJ_BUILD_DONE.
+	 * way to PHJ_BUILD_RUN, or joined too late and be in PHJ_BUILD_FREE.
 	 */
 	Assert(!pstate ||
-		   BarrierPhase(&pstate->build_barrier) >= PHJ_BUILD_RUNNING);
+		   BarrierPhase(&pstate->build_barrier) >= PHJ_BUILD_RUN);
 
-	if (pstate && BarrierPhase(&pstate->build_barrier) == PHJ_BUILD_RUNNING)
+	if (pstate && BarrierPhase(&pstate->build_barrier) == PHJ_BUILD_RUN)
 	{
 		int			i;
 
@@ -3228,7 +3228,7 @@ ExecHashTableDetach(HashJoinTable hashtable)
 			 * Late joining processes will see this state and give up
 			 * immediately.
 			 */
-			Assert(BarrierPhase(&pstate->build_barrier) == PHJ_BUILD_DONE);
+			Assert(BarrierPhase(&pstate->build_barrier) == PHJ_BUILD_FREE);
 
 			if (DsaPointerIsValid(pstate->batches))
 			{
diff --git a/src/backend/executor/nodeHashjoin.c b/src/backend/executor/nodeHashjoin.c
index ba4895e44d..a45c657550 100644
--- a/src/backend/executor/nodeHashjoin.c
+++ b/src/backend/executor/nodeHashjoin.c
@@ -39,27 +39,30 @@
  *
  * One barrier called build_barrier is used to coordinate the hashing phases.
  * The phase is represented by an integer which begins at zero and increments
- * one by one, but in the code it is referred to by symbolic names as follows:
+ * one by one, but in the code it is referred to by symbolic names as follows.
+ * An asterisk indicates a phase that is performed by a single arbitrarily
+ * chosen process.
  *
- *   PHJ_BUILD_ELECTING              -- initial state
- *   PHJ_BUILD_ALLOCATING            -- one sets up the batches and table 0
- *   PHJ_BUILD_HASHING_INNER         -- all hash the inner rel
- *   PHJ_BUILD_HASHING_OUTER         -- (multi-batch only) all hash the outer
- *   PHJ_BUILD_RUNNING               -- building done, probing can begin
- *   PHJ_BUILD_DONE                  -- all work complete, one frees batches
+ *   PHJ_BUILD_ELECT                 -- initial state
+ *   PHJ_BUILD_ALLOCATE*             -- one sets up the batches and table 0
+ *   PHJ_BUILD_HASH_INNER            -- all hash the inner rel
+ *   PHJ_BUILD_HASH_OUTER            -- (multi-batch only) all hash the outer
+ *   PHJ_BUILD_RUN                   -- building done, probing can begin
+ *   PHJ_BUILD_FREE*                 -- all work complete, one frees batches
  *
- * While in the phase PHJ_BUILD_HASHING_INNER a separate pair of barriers may
+ * While in the phase PHJ_BUILD_HASH_INNER a separate pair of barriers may
  * be used repeatedly as required to coordinate expansions in the number of
  * batches or buckets.  Their phases are as follows:
  *
- *   PHJ_GROW_BATCHES_ELECTING       -- initial state
- *   PHJ_GROW_BATCHES_ALLOCATING     -- one allocates new batches
- *   PHJ_GROW_BATCHES_REPARTITIONING -- all repartition
- *   PHJ_GROW_BATCHES_FINISHING      -- one cleans up, detects skew
+ *   PHJ_GROW_BATCHES_ELECT          -- initial state
+ *   PHJ_GROW_BATCHES_REALLOCATE*    -- one allocates new batches
+ *   PHJ_GROW_BATCHES_REPARTITION    -- all repartition
+ *   PHJ_GROW_BATCHES_DECIDE*        -- one detects skew and cleans up
+ *   PHJ_GROW_BATCHES_FINISH         -- finished one growth cycle
  *
- *   PHJ_GROW_BUCKETS_ELECTING       -- initial state
- *   PHJ_GROW_BUCKETS_ALLOCATING     -- one allocates new buckets
- *   PHJ_GROW_BUCKETS_REINSERTING    -- all insert tuples
+ *   PHJ_GROW_BUCKETS_ELECT          -- initial state
+ *   PHJ_GROW_BUCKETS_REALLOCATE*    -- one allocates new buckets
+ *   PHJ_GROW_BUCKETS_REINSERT       -- all insert tuples
  *
  * If the planner got the number of batches and buckets right, those won't be
  * necessary, but on the other hand we might finish up needing to expand the
@@ -67,27 +70,27 @@
  * within our memory budget and load factor target.  For that reason it's a
  * separate pair of barriers using circular phases.
  *
- * The PHJ_BUILD_HASHING_OUTER phase is required only for multi-batch joins,
+ * The PHJ_BUILD_HASH_OUTER phase is required only for multi-batch joins,
  * because we need to divide the outer relation into batches up front in order
  * to be able to process batches entirely independently.  In contrast, the
  * parallel-oblivious algorithm simply throws tuples 'forward' to 'later'
  * batches whenever it encounters them while scanning and probing, which it
  * can do because it processes batches in serial order.
  *
- * Once PHJ_BUILD_RUNNING is reached, backends then split up and process
+ * Once PHJ_BUILD_RUN is reached, backends then split up and process
  * different batches, or gang up and work together on probing batches if there
  * aren't enough to go around.  For each batch there is a separate barrier
  * with the following phases:
  *
- *  PHJ_BATCH_ELECTING       -- initial state
- *  PHJ_BATCH_ALLOCATING     -- one allocates buckets
- *  PHJ_BATCH_LOADING        -- all load the hash table from disk
- *  PHJ_BATCH_PROBING        -- all probe
- *  PHJ_BATCH_DONE           -- end
+ *  PHJ_BATCH_ELECT          -- initial state
+ *  PHJ_BATCH_ALLOCATE*      -- one allocates buckets
+ *  PHJ_BATCH_LOAD           -- all load the hash table from disk
+ *  PHJ_BATCH_PROBE          -- all probe
+ *  PHJ_BATCH_FREE*          -- one frees memory
  *
  * Batch 0 is a special case, because it starts out in phase
- * PHJ_BATCH_PROBING; populating batch 0's hash table is done during
- * PHJ_BUILD_HASHING_INNER so we can skip loading.
+ * PHJ_BATCH_PROBE; populating batch 0's hash table is done during
+ * PHJ_BUILD_HASH_INNER so we can skip loading.
  *
  * Initially we try to plan for a single-batch hash join using the combined
  * hash_mem of all participants to create a large shared hash table.  If that
@@ -99,8 +102,8 @@
  * finished.  Practically, that means that we never emit a tuple while attached
  * to a barrier, unless the barrier has reached a phase that means that no
  * process will wait on it again.  We emit tuples while attached to the build
- * barrier in phase PHJ_BUILD_RUNNING, and to a per-batch barrier in phase
- * PHJ_BATCH_PROBING.  These are advanced to PHJ_BUILD_DONE and PHJ_BATCH_DONE
+ * barrier in phase PHJ_BUILD_RUN, and to a per-batch barrier in phase
+ * PHJ_BATCH_PROBE.  These are advanced to PHJ_BUILD_FREE and PHJ_BATCH_FREE
  * respectively without waiting, using BarrierArriveAndDetach().  The last to
  * detach receives a different return value so that it knows that it's safe to
  * clean up.  Any straggler process that attaches after that phase is reached
@@ -306,13 +309,13 @@ ExecHashJoinImpl(PlanState *pstate, bool parallel)
 					if (parallel)
 					{
 						/*
-						 * Advance the build barrier to PHJ_BUILD_RUNNING
-						 * before proceeding to cleanup to comply with build
-						 * barrier safety requirements.
+						 * Advance the build barrier to PHJ_BUILD_RUN before
+						 * proceeding to cleanup to comply with build barrier
+						 * safety requirements.
 						 */
 						Barrier    *build_barrier = &parallel_state->build_barrier;
 
-						while (BarrierPhase(build_barrier) < PHJ_BUILD_RUNNING)
+						while (BarrierPhase(build_barrier) < PHJ_BUILD_RUN)
 							BarrierArriveAndWait(build_barrier, 0);
 					}
 					return NULL;
@@ -336,10 +339,10 @@ ExecHashJoinImpl(PlanState *pstate, bool parallel)
 					Barrier    *build_barrier;
 
 					build_barrier = &parallel_state->build_barrier;
-					Assert(BarrierPhase(build_barrier) == PHJ_BUILD_HASHING_OUTER ||
-						   BarrierPhase(build_barrier) == PHJ_BUILD_RUNNING ||
-						   BarrierPhase(build_barrier) == PHJ_BUILD_DONE);
-					if (BarrierPhase(build_barrier) == PHJ_BUILD_HASHING_OUTER)
+					Assert(BarrierPhase(build_barrier) == PHJ_BUILD_HASH_OUTER ||
+						   BarrierPhase(build_barrier) == PHJ_BUILD_RUN ||
+						   BarrierPhase(build_barrier) == PHJ_BUILD_FREE);
+					if (BarrierPhase(build_barrier) == PHJ_BUILD_HASH_OUTER)
 					{
 						/*
 						 * If multi-batch, we need to hash the outer relation
@@ -350,7 +353,7 @@ ExecHashJoinImpl(PlanState *pstate, bool parallel)
 						BarrierArriveAndWait(build_barrier,
 											 WAIT_EVENT_HASH_BUILD_HASH_OUTER);
 					}
-					else if (BarrierPhase(build_barrier) == PHJ_BUILD_DONE)
+					else if (BarrierPhase(build_barrier) == PHJ_BUILD_FREE)
 					{
 						/*
 						 * If we attached so late that the job is finished and
@@ -361,7 +364,7 @@ ExecHashJoinImpl(PlanState *pstate, bool parallel)
 					}
 
 					/* Each backend should now select a batch to work on. */
-					Assert(BarrierPhase(build_barrier) == PHJ_BUILD_RUNNING);
+					Assert(BarrierPhase(build_barrier) == PHJ_BUILD_RUN);
 					hashtable->curbatch = -1;
 					node->hj_JoinState = HJ_NEED_NEW_BATCH;
 
@@ -1153,7 +1156,7 @@ ExecParallelHashJoinNewBatch(HashJoinState *hjstate)
 
 			switch (BarrierAttach(batch_barrier))
 			{
-				case PHJ_BATCH_ELECTING:
+				case PHJ_BATCH_ELECT:
 
 					/* One backend allocates the hash table. */
 					if (BarrierArriveAndWait(batch_barrier,
@@ -1161,13 +1164,13 @@ ExecParallelHashJoinNewBatch(HashJoinState *hjstate)
 						ExecParallelHashTableAlloc(hashtable, batchno);
 					/* Fall through. */
 
-				case PHJ_BATCH_ALLOCATING:
+				case PHJ_BATCH_ALLOCATE:
 					/* Wait for allocation to complete. */
 					BarrierArriveAndWait(batch_barrier,
 										 WAIT_EVENT_HASH_BATCH_ALLOCATE);
 					/* Fall through. */
 
-				case PHJ_BATCH_LOADING:
+				case PHJ_BATCH_LOAD:
 					/* Start (or join in) loading tuples. */
 					ExecParallelHashTableSetCurrentBatch(hashtable, batchno);
 					inner_tuples = hashtable->batches[batchno].inner_tuples;
@@ -1187,7 +1190,7 @@ ExecParallelHashJoinNewBatch(HashJoinState *hjstate)
 										 WAIT_EVENT_HASH_BATCH_LOAD);
 					/* Fall through. */
 
-				case PHJ_BATCH_PROBING:
+				case PHJ_BATCH_PROBE:
 
 					/*
 					 * This batch is ready to probe.  Return control to
@@ -1197,13 +1200,13 @@ ExecParallelHashJoinNewBatch(HashJoinState *hjstate)
 					 * this barrier again (or else a deadlock could occur).
 					 * All attached participants must eventually call
 					 * BarrierArriveAndDetach() so that the final phase
-					 * PHJ_BATCH_DONE can be reached.
+					 * PHJ_BATCH_FREE can be reached.
 					 */
 					ExecParallelHashTableSetCurrentBatch(hashtable, batchno);
 					sts_begin_parallel_scan(hashtable->batches[batchno].outer_tuples);
 					return true;
 
-				case PHJ_BATCH_DONE:
+				case PHJ_BATCH_FREE:
 
 					/*
 					 * Already done.  Detach and go around again (if any
@@ -1530,7 +1533,7 @@ ExecHashJoinReInitializeDSM(HashJoinState *state, ParallelContext *cxt)
 	/*
 	 * It would be possible to reuse the shared hash table in single-batch
 	 * cases by resetting and then fast-forwarding build_barrier to
-	 * PHJ_BUILD_DONE and batch 0's batch_barrier to PHJ_BATCH_PROBING, but
+	 * PHJ_BUILD_FREE and batch 0's batch_barrier to PHJ_BATCH_PROBE, but
 	 * currently shared hash tables are already freed by now (by the last
 	 * participant to detach from the batch).  We could consider keeping it
 	 * around for single-batch joins.  We'd also need to adjust
@@ -1549,7 +1552,7 @@ ExecHashJoinReInitializeDSM(HashJoinState *state, ParallelContext *cxt)
 	/* Clear any shared batch files. */
 	SharedFileSetDeleteAll(&pstate->fileset);
 
-	/* Reset build_barrier to PHJ_BUILD_ELECTING so we can go around again. */
+	/* Reset build_barrier to PHJ_BUILD_ELECT so we can go around again. */
 	BarrierInit(&pstate->build_barrier, 0);
 }
 
diff --git a/src/backend/utils/activity/wait_event.c b/src/backend/utils/activity/wait_event.c
index 87c15b9c6f..28fd376f9e 100644
--- a/src/backend/utils/activity/wait_event.c
+++ b/src/backend/utils/activity/wait_event.c
@@ -364,8 +364,8 @@ pgstat_get_wait_ipc(WaitEventIPC w)
 		case WAIT_EVENT_HASH_BUILD_HASH_OUTER:
 			event_name = "HashBuildHashOuter";
 			break;
-		case WAIT_EVENT_HASH_GROW_BATCHES_ALLOCATE:
-			event_name = "HashGrowBatchesAllocate";
+		case WAIT_EVENT_HASH_GROW_BATCHES_REALLOCATE:
+			event_name = "HashGrowBatchesReallocate";
 			break;
 		case WAIT_EVENT_HASH_GROW_BATCHES_DECIDE:
 			event_name = "HashGrowBatchesDecide";
@@ -379,8 +379,8 @@ pgstat_get_wait_ipc(WaitEventIPC w)
 		case WAIT_EVENT_HASH_GROW_BATCHES_REPARTITION:
 			event_name = "HashGrowBatchesRepartition";
 			break;
-		case WAIT_EVENT_HASH_GROW_BUCKETS_ALLOCATE:
-			event_name = "HashGrowBucketsAllocate";
+		case WAIT_EVENT_HASH_GROW_BUCKETS_REALLOCATE:
+			event_name = "HashGrowBucketsReallocate";
 			break;
 		case WAIT_EVENT_HASH_GROW_BUCKETS_ELECT:
 			event_name = "HashGrowBucketsElect";
diff --git a/src/include/executor/hashjoin.h b/src/include/executor/hashjoin.h
index d7e90bc0e2..534f818bd7 100644
--- a/src/include/executor/hashjoin.h
+++ b/src/include/executor/hashjoin.h
@@ -254,32 +254,32 @@ typedef struct ParallelHashJoinState
 } ParallelHashJoinState;
 
 /* The phases for building batches, used by build_barrier. */
-#define PHJ_BUILD_ELECTING				0
-#define PHJ_BUILD_ALLOCATING			1
-#define PHJ_BUILD_HASHING_INNER			2
-#define PHJ_BUILD_HASHING_OUTER			3
-#define PHJ_BUILD_RUNNING				4
-#define PHJ_BUILD_DONE					5
+#define PHJ_BUILD_ELECT					0
+#define PHJ_BUILD_ALLOCATE				1
+#define PHJ_BUILD_HASH_INNER			2
+#define PHJ_BUILD_HASH_OUTER			3
+#define PHJ_BUILD_RUN					4
+#define PHJ_BUILD_FREE					5
 
 /* The phases for probing each batch, used by for batch_barrier. */
-#define PHJ_BATCH_ELECTING				0
-#define PHJ_BATCH_ALLOCATING			1
-#define PHJ_BATCH_LOADING				2
-#define PHJ_BATCH_PROBING				3
-#define PHJ_BATCH_DONE					4
+#define PHJ_BATCH_ELECT					0
+#define PHJ_BATCH_ALLOCATE				1
+#define PHJ_BATCH_LOAD					2
+#define PHJ_BATCH_PROBE					3
+#define PHJ_BATCH_FREE					4
 
 /* The phases of batch growth while hashing, for grow_batches_barrier. */
-#define PHJ_GROW_BATCHES_ELECTING		0
-#define PHJ_GROW_BATCHES_ALLOCATING		1
-#define PHJ_GROW_BATCHES_REPARTITIONING 2
-#define PHJ_GROW_BATCHES_DECIDING		3
-#define PHJ_GROW_BATCHES_FINISHING		4
+#define PHJ_GROW_BATCHES_ELECT			0
+#define PHJ_GROW_BATCHES_REALLOCATE		1
+#define PHJ_GROW_BATCHES_REPARTITION	2
+#define PHJ_GROW_BATCHES_DECIDE			3
+#define PHJ_GROW_BATCHES_FINISH			4
 #define PHJ_GROW_BATCHES_PHASE(n)		((n) % 5)	/* circular phases */
 
 /* The phases of bucket growth while hashing, for grow_buckets_barrier. */
-#define PHJ_GROW_BUCKETS_ELECTING		0
-#define PHJ_GROW_BUCKETS_ALLOCATING		1
-#define PHJ_GROW_BUCKETS_REINSERTING	2
+#define PHJ_GROW_BUCKETS_ELECT			0
+#define PHJ_GROW_BUCKETS_REALLOCATE		1
+#define PHJ_GROW_BUCKETS_REINSERT		2
 #define PHJ_GROW_BUCKETS_PHASE(n)		((n) % 3)	/* circular phases */
 
 typedef struct HashJoinTableData
diff --git a/src/include/utils/wait_event.h b/src/include/utils/wait_event.h
index b578e2ec75..73e9b690a8 100644
--- a/src/include/utils/wait_event.h
+++ b/src/include/utils/wait_event.h
@@ -97,12 +97,12 @@ typedef enum
 	WAIT_EVENT_HASH_BUILD_ELECT,
 	WAIT_EVENT_HASH_BUILD_HASH_INNER,
 	WAIT_EVENT_HASH_BUILD_HASH_OUTER,
-	WAIT_EVENT_HASH_GROW_BATCHES_ALLOCATE,
+	WAIT_EVENT_HASH_GROW_BATCHES_REALLOCATE,
 	WAIT_EVENT_HASH_GROW_BATCHES_DECIDE,
 	WAIT_EVENT_HASH_GROW_BATCHES_ELECT,
 	WAIT_EVENT_HASH_GROW_BATCHES_FINISH,
 	WAIT_EVENT_HASH_GROW_BATCHES_REPARTITION,
-	WAIT_EVENT_HASH_GROW_BUCKETS_ALLOCATE,
+	WAIT_EVENT_HASH_GROW_BUCKETS_REALLOCATE,
 	WAIT_EVENT_HASH_GROW_BUCKETS_ELECT,
 	WAIT_EVENT_HASH_GROW_BUCKETS_REINSERT,
 	WAIT_EVENT_LOGICAL_SYNC_DATA,
-- 
2.30.2
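
For readers following the phase renames above, here is a minimal sketch of how the circular-phase macros are meant to be read. It assumes that a barrier's raw phase number only ever increases, so the grow barriers are mapped back onto their named phases with the modulo macros. The macro names and values are copied from the patched hashjoin.h; grow_batches_phase_name() is a hypothetical helper for illustration only and is not part of PostgreSQL.

```c
#include <assert.h>
#include <string.h>

/* Phase numbers as they appear in the patched hashjoin.h. */
#define PHJ_GROW_BATCHES_ELECT          0
#define PHJ_GROW_BATCHES_REALLOCATE     1
#define PHJ_GROW_BATCHES_REPARTITION    2
#define PHJ_GROW_BATCHES_DECIDE         3
#define PHJ_GROW_BATCHES_FINISH         4
#define PHJ_GROW_BATCHES_PHASE(n)       ((n) % 5)   /* circular phases */

#define PHJ_GROW_BUCKETS_ELECT          0
#define PHJ_GROW_BUCKETS_REALLOCATE     1
#define PHJ_GROW_BUCKETS_REINSERT       2
#define PHJ_GROW_BUCKETS_PHASE(n)       ((n) % 3)   /* circular phases */

/*
 * Hypothetical helper (not in PostgreSQL): map a raw, ever-increasing
 * barrier phase number onto the name of the batch-growth phase it is in.
 */
static const char *
grow_batches_phase_name(int raw_phase)
{
    switch (PHJ_GROW_BATCHES_PHASE(raw_phase))
    {
        case PHJ_GROW_BATCHES_ELECT:
            return "ELECT";
        case PHJ_GROW_BATCHES_REALLOCATE:
            return "REALLOCATE";
        case PHJ_GROW_BATCHES_REPARTITION:
            return "REPARTITION";
        case PHJ_GROW_BATCHES_DECIDE:
            return "DECIDE";
        case PHJ_GROW_BATCHES_FINISH:
            return "FINISH";
    }
    return "?";                 /* unreachable for non-negative input */
}
```

With this reading, raw phase 6 is the REALLOCATE step of the second round of batch growth, and raw phase 10 starts a third round at ELECT.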

From 687bf603dd82d052552c160e0661c01858001591 Mon Sep 17 00:00:00 2001
From: Melanie Plageman <melanieplage...@gmail.com>
Date: Wed, 4 Nov 2020 14:25:33 -0800
Subject: [PATCH v11 3/3] Parallel Hash {Full,Right} Outer Join.

Previously, parallel full and right outer joins were not supported due
to a potential deadlock hazard (see discussion).

For now, sidestep the problem by terminating parallelism for the
unmatched inner tuple scan. The last process to arrive at the barrier
prepares for the unmatched inner tuple scan in HJ_NEED_NEW_OUTER and
transitions to HJ_FILL_INNER_TUPLES, scanning the hash table and
emitting unmatched inner tuples.  Other processes are free to go and
work on other batches, if there are any.

To make parallel and serial hash join more consistent, change the serial
version to scan match bits in tuple chunk order, instead of doing it in
hash table bucket order.

Author: Melanie Plageman <melanieplage...@gmail.com>
Reviewed-by: Thomas Munro <thomas.mu...@gmail.com>
Discussion: https://postgr.es/m/CA%2BhUKG%2BA6ftXPz4oe92%2Bx8Er%2BxpGZqto70-Q_ERwRaSyA%3DafNg%40mail.gmail.com
---
 src/backend/executor/nodeHash.c         | 222 ++++++++++++++++++------
 src/backend/executor/nodeHashjoin.c     |  59 ++++---
 src/backend/optimizer/path/joinpath.c   |  14 +-
 src/include/executor/hashjoin.h         |  15 +-
 src/include/executor/nodeHash.h         |   3 +
 src/test/regress/expected/join_hash.out |  58 ++++++-
 src/test/regress/sql/join_hash.sql      |  25 ++-
 7 files changed, 298 insertions(+), 98 deletions(-)

diff --git a/src/backend/executor/nodeHash.c b/src/backend/executor/nodeHash.c
index 6a57ac8c98..4da05259bb 100644
--- a/src/backend/executor/nodeHash.c
+++ b/src/backend/executor/nodeHash.c
@@ -517,6 +517,7 @@ ExecHashTableCreate(HashState *state, List *hashOperators, List *hashCollations,
 		hashtable->spaceAllowed * SKEW_HASH_MEM_PERCENT / 100;
 	hashtable->chunks = NULL;
 	hashtable->current_chunk = NULL;
+	hashtable->current_chunk_idx = 0;
 	hashtable->parallel_state = state->parallel_state;
 	hashtable->area = state->ps.state->es_query_dsa;
 	hashtable->batches = NULL;
@@ -596,8 +597,8 @@ ExecHashTableCreate(HashState *state, List *hashOperators, List *hashCollations,
 		 * Attach to the build barrier.  The corresponding detach operation is
 		 * in ExecHashTableDetach.  Note that we won't attach to the
 		 * batch_barrier for batch 0 yet.  We'll attach later and start it out
-		 * in PHJ_BATCH_PROBE phase, because batch 0 is allocated up front
-		 * and then loaded while hashing (the standard hybrid hash join
+		 * in PHJ_BATCH_PROBE phase, because batch 0 is allocated up front and
+		 * then loaded while hashing (the standard hybrid hash join
 		 * algorithm), and we'll coordinate that using build_barrier.
 		 */
 		build_barrier = &pstate->build_barrier;
@@ -2070,16 +2071,72 @@ void
 ExecPrepHashTableForUnmatched(HashJoinState *hjstate)
 {
 	/*----------
-	 * During this scan we use the HashJoinState fields as follows:
+	 * During this scan we use the HashJoinTable fields as follows:
 	 *
-	 * hj_CurBucketNo: next regular bucket to scan
-	 * hj_CurSkewBucketNo: next skew bucket (an index into skewBucketNums)
-	 * hj_CurTuple: last tuple returned, or NULL to start next bucket
+	 * current_chunk: current HashMemoryChunk to scan
+	 * current_chunk_idx: index in current HashMemoryChunk
 	 *----------
 	 */
+	HashJoinTable hashtable = hjstate->hj_HashTable;
+
 	hjstate->hj_CurBucketNo = 0;
 	hjstate->hj_CurSkewBucketNo = 0;
 	hjstate->hj_CurTuple = NULL;
+	hashtable->current_chunk = hashtable->chunks;
+	hashtable->current_chunk_idx = 0;
+}
+
+/*
+ * ExecParallelPrepHashTableForUnmatched
+ *		set up for a series of ExecParallelScanHashTableForUnmatched calls
+ *		return true if this worker is elected to do the unmatched inner scan
+ */
+bool
+ExecParallelPrepHashTableForUnmatched(HashJoinState *hjstate)
+{
+	/*----------
+	 * During this scan we use the ParallelHashJoinBatchAccessor fields for the
+	 * current batch as follows:
+	 *
+	 * current_chunk: current HashMemoryChunk to scan
+	 * current_chunk_idx: index in current HashMemoryChunk
+	 *----------
+	 */
+	HashJoinTable hashtable = hjstate->hj_HashTable;
+	int			curbatch = hashtable->curbatch;
+	ParallelHashJoinBatchAccessor *batch_accessor = &hashtable->batches[curbatch];
+	ParallelHashJoinBatch *batch = hashtable->batches[curbatch].shared;
+	bool		last = false;
+
+	hjstate->hj_CurBucketNo = 0;
+	hjstate->hj_CurSkewBucketNo = 0;
+	hjstate->hj_CurTuple = NULL;
+	if (curbatch < 0)
+		return false;
+	last = BarrierArriveAndDetachExceptLast(&batch->batch_barrier);
+	if (!last)
+	{
+		hashtable->batches[hashtable->curbatch].done = true;
+		/* Make sure any temporary files are closed. */
+		sts_end_parallel_scan(hashtable->batches[curbatch].inner_tuples);
+		sts_end_parallel_scan(hashtable->batches[curbatch].outer_tuples);
+
+		/*
+		 * Track the largest batch we've been attached to.  Though each
+		 * backend might see a different subset of batches, explain.c will
+		 * scan the results from all backends to find the largest value.
+		 */
+		hashtable->spacePeak =
+			Max(hashtable->spacePeak, batch->size + sizeof(dsa_pointer_atomic) * hashtable->nbuckets);
+		hashtable->curbatch = -1;
+	}
+	else
+	{
+		batch_accessor->shared_chunk = batch->chunks;
+		batch_accessor->current_chunk = dsa_get_address(hashtable->area, batch_accessor->shared_chunk);
+		batch_accessor->current_chunk_idx = 0;
+	}
+	return last;
 }
 
 /*
@@ -2093,60 +2150,119 @@ ExecPrepHashTableForUnmatched(HashJoinState *hjstate)
 bool
 ExecScanHashTableForUnmatched(HashJoinState *hjstate, ExprContext *econtext)
 {
+	HashMemoryChunk next;
 	HashJoinTable hashtable = hjstate->hj_HashTable;
-	HashJoinTuple hashTuple = hjstate->hj_CurTuple;
 
-	for (;;)
+	while (hashtable->current_chunk)
 	{
-		/*
-		 * hj_CurTuple is the address of the tuple last returned from the
-		 * current bucket, or NULL if it's time to start scanning a new
-		 * bucket.
-		 */
-		if (hashTuple != NULL)
-			hashTuple = hashTuple->next.unshared;
-		else if (hjstate->hj_CurBucketNo < hashtable->nbuckets)
+		while (hashtable->current_chunk_idx < hashtable->current_chunk->used)
 		{
-			hashTuple = hashtable->buckets.unshared[hjstate->hj_CurBucketNo];
-			hjstate->hj_CurBucketNo++;
-		}
-		else if (hjstate->hj_CurSkewBucketNo < hashtable->nSkewBuckets)
-		{
-			int			j = hashtable->skewBucketNums[hjstate->hj_CurSkewBucketNo];
+			HashJoinTuple hashTuple = (HashJoinTuple)
+			(HASH_CHUNK_DATA(hashtable->current_chunk) +
+			 hashtable->current_chunk_idx);
+
+			MinimalTuple tuple = HJTUPLE_MINTUPLE(hashTuple);
+			int			hashTupleSize = (HJTUPLE_OVERHEAD + tuple->t_len);
+
+			/* next tuple in this chunk */
+			hashtable->current_chunk_idx += MAXALIGN(hashTupleSize);
+
+			if (HeapTupleHeaderHasMatch(HJTUPLE_MINTUPLE(hashTuple)))
+				continue;
+
+			/* insert hashtable's tuple into exec slot */
+			econtext->ecxt_innertuple =
+				ExecStoreMinimalTuple(HJTUPLE_MINTUPLE(hashTuple),
+									  hjstate->hj_HashTupleSlot,
+									  false);
+
+			/*
+			 * Reset temp memory each time; although this function doesn't do
+			 * any qual eval, the caller will, so let's keep it parallel to
+			 * ExecScanHashBucket.
+			 */
+			ResetExprContext(econtext);
 
-			hashTuple = hashtable->skewBucket[j]->tuples;
-			hjstate->hj_CurSkewBucketNo++;
+			hjstate->hj_CurTuple = hashTuple;
+			return true;
 		}
-		else
-			break;				/* finished all buckets */
 
-		while (hashTuple != NULL)
+		next = hashtable->current_chunk->next.unshared;
+		hashtable->current_chunk = next;
+		hashtable->current_chunk_idx = 0;
+
+		/* allow this loop to be cancellable */
+		CHECK_FOR_INTERRUPTS();
+	}
+
+	/*
+	 * no more unmatched tuples
+	 */
+	return false;
+}
+
+/*
+ * ExecParallelScanHashTableForUnmatched
+ *		scan the hash table for unmatched inner tuples, in parallel
+ *
+ * On success, the inner tuple is stored into hjstate->hj_CurTuple and
+ * econtext->ecxt_innertuple, using hjstate->hj_HashTupleSlot as the slot
+ * for the latter.
+ */
+bool
+ExecParallelScanHashTableForUnmatched(HashJoinState *hjstate,
+									  ExprContext *econtext)
+{
+	HashJoinTable hashtable = hjstate->hj_HashTable;
+	int			curbatch = hashtable->curbatch;
+	ParallelHashJoinBatchAccessor *accessor = &hashtable->batches[curbatch];
+
+	/*
+	 * Only one worker should execute this function. Since tuples have already
+	 * been emitted, it is hazardous for workers to wait at the batch_barrier
+	 * again.
+	 *
+	 * In order to ensure this, when probing has been completed for this
+	 * batch, all workers except one will detach from the batch barrier. The
+	 * last worker advances the batch barrier to phase PHJ_BATCH_SCAN before
+	 * conducting this unmatched inner tuple scan. Workers attaching to the
+	 * batch barrier once it is in phase PHJ_BATCH_SCAN will simply detach.
+	 */
+	while (accessor->current_chunk)
+	{
+		while (accessor->current_chunk_idx < accessor->current_chunk->used)
 		{
-			if (!HeapTupleHeaderHasMatch(HJTUPLE_MINTUPLE(hashTuple)))
-			{
-				TupleTableSlot *inntuple;
+			HashJoinTuple hashTuple = (HashJoinTuple)
+			(HASH_CHUNK_DATA(accessor->current_chunk) +
+			 accessor->current_chunk_idx);
 
-				/* insert hashtable's tuple into exec slot */
-				inntuple = ExecStoreMinimalTuple(HJTUPLE_MINTUPLE(hashTuple),
-												 hjstate->hj_HashTupleSlot,
-												 false);	/* do not pfree */
-				econtext->ecxt_innertuple = inntuple;
+			accessor->current_chunk_idx += MAXALIGN(HJTUPLE_OVERHEAD +
+													HJTUPLE_MINTUPLE(hashTuple)->t_len);
 
-				/*
-				 * Reset temp memory each time; although this function doesn't
-				 * do any qual eval, the caller will, so let's keep it
-				 * parallel to ExecScanHashBucket.
-				 */
-				ResetExprContext(econtext);
+			if (HeapTupleHeaderHasMatch(HJTUPLE_MINTUPLE(hashTuple)))
+				continue;
 
-				hjstate->hj_CurTuple = hashTuple;
-				return true;
-			}
+			/* insert hashtable's tuple into exec slot */
+			econtext->ecxt_innertuple =
+				ExecStoreMinimalTuple(HJTUPLE_MINTUPLE(hashTuple),
+									  hjstate->hj_HashTupleSlot, false);
+
+			/*
+			 * Reset temp memory each time; although this function doesn't do
+			 * any qual eval, the caller will, so let's keep it parallel to
+			 * ExecScanHashBucket.
+			 */
+			ResetExprContext(econtext);
 
-			hashTuple = hashTuple->next.unshared;
+			hjstate->hj_CurTuple = hashTuple;
+			return true;
 		}
 
-		/* allow this loop to be cancellable */
+		accessor->shared_chunk = accessor->current_chunk->next.shared;
+		accessor->current_chunk = dsa_get_address(hashtable->area,
+												  accessor->shared_chunk);
+		accessor->current_chunk_idx = 0;
+
 		CHECK_FOR_INTERRUPTS();
 	}
 
@@ -3073,8 +3189,8 @@ ExecParallelHashEnsureBatchAccessors(HashJoinTable hashtable)
 
 	/*
 	 * We should never see a state where the batch-tracking array is freed,
-	 * because we should have given up sooner if we join when the build barrier
-	 * has reached the PHJ_BUILD_FREE phase.
+	 * because we should have given up sooner if we join when the build
+	 * barrier has reached the PHJ_BUILD_FREE phase.
 	 */
 	Assert(DsaPointerIsValid(pstate->batches));
 
@@ -3152,13 +3268,6 @@ ExecHashTableDetachBatch(HashJoinTable hashtable)
 		/* Detach from the batch we were last working on. */
 		if (BarrierArriveAndDetach(&batch->batch_barrier))
 		{
-			/*
-			 * Technically we shouldn't access the barrier because we're no
-			 * longer attached, but since there is no way it's moving after
-			 * this point it seems safe to make the following assertion.
-			 */
-			Assert(BarrierPhase(&batch->batch_barrier) == PHJ_BATCH_FREE);
-
 			/* Free shared chunks and buckets. */
 			while (DsaPointerIsValid(batch->chunks))
 			{
@@ -3305,6 +3414,9 @@ ExecParallelHashTableSetCurrentBatch(HashJoinTable hashtable, int batchno)
 	hashtable->current_chunk = NULL;
 	hashtable->current_chunk_shared = InvalidDsaPointer;
 	hashtable->batches[batchno].at_least_one_chunk = false;
+	hashtable->batches[batchno].shared_chunk = InvalidDsaPointer;
+	hashtable->batches[batchno].current_chunk = NULL;
+	hashtable->batches[batchno].current_chunk_idx = 0;
 }
 
 /*
diff --git a/src/backend/executor/nodeHashjoin.c b/src/backend/executor/nodeHashjoin.c
index a45c657550..35206d38df 100644
--- a/src/backend/executor/nodeHashjoin.c
+++ b/src/backend/executor/nodeHashjoin.c
@@ -86,6 +86,7 @@
  *  PHJ_BATCH_ALLOCATE*      -- one allocates buckets
  *  PHJ_BATCH_LOAD           -- all load the hash table from disk
  *  PHJ_BATCH_PROBE          -- all probe
+ *  PHJ_BATCH_SCAN*          -- full/right outer scan
  *  PHJ_BATCH_FREE*          -- one frees memory
  *
  * Batch 0 is a special case, because it starts out in phase
@@ -103,9 +104,10 @@
  * to a barrier, unless the barrier has reached a phase that means that no
  * process will wait on it again.  We emit tuples while attached to the build
  * barrier in phase PHJ_BUILD_RUN, and to a per-batch barrier in phase
- * PHJ_BATCH_PROBE.  These are advanced to PHJ_BUILD_FREE and PHJ_BATCH_FREE
- * respectively without waiting, using BarrierArriveAndDetach().  The last to
- * detach receives a different return value so that it knows that it's safe to
+ * PHJ_BATCH_PROBE.  These are advanced to PHJ_BUILD_FREE and PHJ_BATCH_SCAN
+ * respectively without waiting, using BarrierArriveAndDetach() for the
+ * former and BarrierArriveAndDetachExceptLast() for the latter.  The last to
+ * detach receives a different return value so that it knows that it's safe to
  * clean up.  Any straggler process that attaches after that phase is reached
  * will see that it's too late to participate or access the relevant shared
  * memory objects.
@@ -393,9 +395,19 @@ ExecHashJoinImpl(PlanState *pstate, bool parallel)
 					/* end of batch, or maybe whole join */
 					if (HJ_FILL_INNER(node))
 					{
-						/* set up to scan for unmatched inner tuples */
-						ExecPrepHashTableForUnmatched(node);
-						node->hj_JoinState = HJ_FILL_INNER_TUPLES;
+						if (parallel)
+						{
+							if (ExecParallelPrepHashTableForUnmatched(node))
+								node->hj_JoinState = HJ_FILL_INNER_TUPLES;
+							else
+								node->hj_JoinState = HJ_NEED_NEW_BATCH;
+						}
+						else
+						{
+							/* set up to scan for unmatched inner tuples */
+							ExecPrepHashTableForUnmatched(node);
+							node->hj_JoinState = HJ_FILL_INNER_TUPLES;
+						}
 					}
 					else
 						node->hj_JoinState = HJ_NEED_NEW_BATCH;
@@ -488,25 +500,13 @@ ExecHashJoinImpl(PlanState *pstate, bool parallel)
 				{
 					node->hj_MatchedOuter = true;
 
-					if (parallel)
-					{
-						/*
-						 * Full/right outer joins are currently not supported
-						 * for parallel joins, so we don't need to set the
-						 * match bit.  Experiments show that it's worth
-						 * avoiding the shared memory traffic on large
-						 * systems.
-						 */
-						Assert(!HJ_FILL_INNER(node));
-					}
-					else
-					{
-						/*
-						 * This is really only needed if HJ_FILL_INNER(node),
-						 * but we'll avoid the branch and just set it always.
-						 */
+
+					/*
+					 * This is really only needed if HJ_FILL_INNER(node).
+					 * Check the bit first to avoid rewriting one that is
+					 * already set.
+					 */
+					if (!HeapTupleHeaderHasMatch(HJTUPLE_MINTUPLE(node->hj_CurTuple)))
 						HeapTupleHeaderSetMatch(HJTUPLE_MINTUPLE(node->hj_CurTuple));
-					}
 
 					/* In an antijoin, we never return a matched tuple */
 					if (node->js.jointype == JOIN_ANTI)
@@ -564,7 +564,8 @@ ExecHashJoinImpl(PlanState *pstate, bool parallel)
 				 * so any unmatched inner tuples in the hashtable have to be
 				 * emitted before we continue to the next batch.
 				 */
-				if (!ExecScanHashTableForUnmatched(node, econtext))
+				if (!(parallel ? ExecParallelScanHashTableForUnmatched(node, econtext)
+					  : ExecScanHashTableForUnmatched(node, econtext)))
 				{
 					/* no more unmatched tuples */
 					node->hj_JoinState = HJ_NEED_NEW_BATCH;
@@ -1198,13 +1199,15 @@ ExecParallelHashJoinNewBatch(HashJoinState *hjstate)
 					 * hash table stays alive until everyone's finished
 					 * probing it, but no participant is allowed to wait at
 					 * this barrier again (or else a deadlock could occur).
-					 * All attached participants must eventually call
-					 * BarrierArriveAndDetach() so that the final phase
-					 * PHJ_BATCH_FREE can be reached.
+					 * All attached participants must eventually detach from
+					 * the barrier and one worker must advance the phase so
+					 * that the final phase is reached.
 					 */
 					ExecParallelHashTableSetCurrentBatch(hashtable, batchno);
 					sts_begin_parallel_scan(hashtable->batches[batchno].outer_tuples);
 					return true;
+				case PHJ_BATCH_SCAN:
+					/* Fall through. */
 
 				case PHJ_BATCH_FREE:
 
diff --git a/src/backend/optimizer/path/joinpath.c b/src/backend/optimizer/path/joinpath.c
index 9a8c5165b0..d8630ef934 100644
--- a/src/backend/optimizer/path/joinpath.c
+++ b/src/backend/optimizer/path/joinpath.c
@@ -2136,15 +2136,9 @@ hash_inner_and_outer(PlannerInfo *root,
 		 * able to properly guarantee uniqueness.  Similarly, we can't handle
 		 * JOIN_FULL and JOIN_RIGHT, because they can produce false null
 		 * extended rows.  Also, the resulting path must not be parameterized.
-		 * We would be able to support JOIN_FULL and JOIN_RIGHT for Parallel
-		 * Hash, since in that case we're back to a single hash table with a
-		 * single set of match bits for each batch, but that will require
-		 * figuring out a deadlock-free way to wait for the probe to finish.
 		 */
 		if (joinrel->consider_parallel &&
 			save_jointype != JOIN_UNIQUE_OUTER &&
-			save_jointype != JOIN_FULL &&
-			save_jointype != JOIN_RIGHT &&
 			outerrel->partial_pathlist != NIL &&
 			bms_is_empty(joinrel->lateral_relids))
 		{
@@ -2178,9 +2172,13 @@ hash_inner_and_outer(PlannerInfo *root,
 			 * total inner path will also be parallel-safe, but if not, we'll
 			 * have to search for the cheapest safe, unparameterized inner
 			 * path.  If doing JOIN_UNIQUE_INNER, we can't use any alternative
-			 * inner path.
+			 * inner path.  If full or right join, we can't use parallelism
+			 * (building the hash table in each backend) because no one
+			 * process has all the match bits.
 			 */
-			if (cheapest_total_inner->parallel_safe)
+			if (save_jointype == JOIN_FULL || save_jointype == JOIN_RIGHT)
+				cheapest_safe_inner = NULL;
+			else if (cheapest_total_inner->parallel_safe)
 				cheapest_safe_inner = cheapest_total_inner;
 			else if (save_jointype != JOIN_UNIQUE_INNER)
 				cheapest_safe_inner =
diff --git a/src/include/executor/hashjoin.h b/src/include/executor/hashjoin.h
index 534f818bd7..627ba1b1ff 100644
--- a/src/include/executor/hashjoin.h
+++ b/src/include/executor/hashjoin.h
@@ -205,6 +205,15 @@ typedef struct ParallelHashJoinBatchAccessor
 	bool		at_least_one_chunk; /* has this backend allocated a chunk? */
 
 	bool		done;			/* flag to remember that a batch is done */
+
+	/*
+	 * While doing the unmatched inner scan, the assigned worker may emit
+	 * tuples. Thus, we must keep track of where it was in the hashtable so it
+	 * can return to the correct offset within the correct chunk.
+	 */
+	dsa_pointer shared_chunk;
+	HashMemoryChunk current_chunk;
+	size_t		current_chunk_idx;
 	SharedTuplestoreAccessor *inner_tuples;
 	SharedTuplestoreAccessor *outer_tuples;
 } ParallelHashJoinBatchAccessor;
@@ -266,7 +275,8 @@ typedef struct ParallelHashJoinState
 #define PHJ_BATCH_ALLOCATE				1
 #define PHJ_BATCH_LOAD					2
 #define PHJ_BATCH_PROBE					3
-#define PHJ_BATCH_FREE					4
+#define PHJ_BATCH_SCAN					4
+#define PHJ_BATCH_FREE					5
 
 /* The phases of batch growth while hashing, for grow_batches_barrier. */
 #define PHJ_GROW_BATCHES_ELECT			0
@@ -352,6 +362,9 @@ typedef struct HashJoinTableData
 	/* used for dense allocation of tuples (into linked chunks) */
 	HashMemoryChunk chunks;		/* one list for the whole batch */
 
+	/* index of tuple within current chunk for serial unmatched inner scan */
+	size_t		current_chunk_idx;
+
 	/* Shared and private state for Parallel Hash. */
 	HashMemoryChunk current_chunk;	/* this backend's current chunk */
 	dsa_area   *area;			/* DSA area to allocate memory from */
diff --git a/src/include/executor/nodeHash.h b/src/include/executor/nodeHash.h
index e1e0dec24b..03f2f8ee81 100644
--- a/src/include/executor/nodeHash.h
+++ b/src/include/executor/nodeHash.h
@@ -56,8 +56,11 @@ extern void ExecHashGetBucketAndBatch(HashJoinTable hashtable,
 extern bool ExecScanHashBucket(HashJoinState *hjstate, ExprContext *econtext);
 extern bool ExecParallelScanHashBucket(HashJoinState *hjstate, ExprContext *econtext);
 extern void ExecPrepHashTableForUnmatched(HashJoinState *hjstate);
+extern bool ExecParallelPrepHashTableForUnmatched(HashJoinState *hjstate);
 extern bool ExecScanHashTableForUnmatched(HashJoinState *hjstate,
 										  ExprContext *econtext);
+extern bool ExecParallelScanHashTableForUnmatched(HashJoinState *hjstate,
+												  ExprContext *econtext);
 extern void ExecHashTableReset(HashJoinTable hashtable);
 extern void ExecHashTableResetMatchFlags(HashJoinTable hashtable);
 extern void ExecChooseHashTableSize(double ntuples, int tupwidth, bool useskew,
diff --git a/src/test/regress/expected/join_hash.out b/src/test/regress/expected/join_hash.out
index 3ec07bc1af..027f3888b0 100644
--- a/src/test/regress/expected/join_hash.out
+++ b/src/test/regress/expected/join_hash.out
@@ -784,8 +784,9 @@ select  count(*) from simple r full outer join simple s using (id);
 (1 row)
 
 rollback to settings;
--- parallelism not possible with parallel-oblivious outer hash join
+-- parallelism not possible with parallel-oblivious full hash join
 savepoint settings;
+set enable_parallel_hash = off;
 set local max_parallel_workers_per_gather = 2;
 explain (costs off)
      select  count(*) from simple r full outer join simple s using (id);
@@ -806,7 +807,32 @@ select  count(*) from simple r full outer join simple s using (id);
 (1 row)
 
 rollback to settings;
--- An full outer join where every record is not matched.
+-- parallelism is possible with parallel-aware full hash join
+savepoint settings;
+set local max_parallel_workers_per_gather = 2;
+explain (costs off)
+     select  count(*) from simple r full outer join simple s using (id);
+                         QUERY PLAN                          
+-------------------------------------------------------------
+ Finalize Aggregate
+   ->  Gather
+         Workers Planned: 2
+         ->  Partial Aggregate
+               ->  Parallel Hash Full Join
+                     Hash Cond: (r.id = s.id)
+                     ->  Parallel Seq Scan on simple r
+                     ->  Parallel Hash
+                           ->  Parallel Seq Scan on simple s
+(9 rows)
+
+select  count(*) from simple r full outer join simple s using (id);
+ count 
+-------
+ 20000
+(1 row)
+
+rollback to settings;
+-- A full outer join where every record is not matched.
 -- non-parallel
 savepoint settings;
 set local max_parallel_workers_per_gather = 0;
@@ -829,8 +855,9 @@ select  count(*) from simple r full outer join simple s on (r.id = 0 - s.id);
 (1 row)
 
 rollback to settings;
--- parallelism not possible with parallel-oblivious outer hash join
+-- parallelism not possible with parallel-oblivious full hash join
 savepoint settings;
+set enable_parallel_hash = off;
 set local max_parallel_workers_per_gather = 2;
 explain (costs off)
      select  count(*) from simple r full outer join simple s on (r.id = 0 - s.id);
@@ -850,6 +877,31 @@ select  count(*) from simple r full outer join simple s on (r.id = 0 - s.id);
  40000
 (1 row)
 
+rollback to settings;
+-- parallelism is possible with parallel-aware full hash join
+savepoint settings;
+set local max_parallel_workers_per_gather = 2;
+explain (costs off)
+     select  count(*) from simple r full outer join simple s on (r.id = 0 - s.id);
+                         QUERY PLAN                          
+-------------------------------------------------------------
+ Finalize Aggregate
+   ->  Gather
+         Workers Planned: 2
+         ->  Partial Aggregate
+               ->  Parallel Hash Full Join
+                     Hash Cond: ((0 - s.id) = r.id)
+                     ->  Parallel Seq Scan on simple s
+                     ->  Parallel Hash
+                           ->  Parallel Seq Scan on simple r
+(9 rows)
+
+select  count(*) from simple r full outer join simple s on (r.id = 0 - s.id);
+ count 
+-------
+ 40000
+(1 row)
+
 rollback to settings;
 -- exercise special code paths for huge tuples (note use of non-strict
 -- expression and left join required to get the detoasted tuple into
diff --git a/src/test/regress/sql/join_hash.sql b/src/test/regress/sql/join_hash.sql
index 77dbc182d5..ba1b3e6e1b 100644
--- a/src/test/regress/sql/join_hash.sql
+++ b/src/test/regress/sql/join_hash.sql
@@ -435,15 +435,24 @@ explain (costs off)
 select  count(*) from simple r full outer join simple s using (id);
 rollback to settings;
 
--- parallelism not possible with parallel-oblivious outer hash join
+-- parallelism not possible with parallel-oblivious full hash join
 savepoint settings;
+set enable_parallel_hash = off;
 set local max_parallel_workers_per_gather = 2;
 explain (costs off)
      select  count(*) from simple r full outer join simple s using (id);
 select  count(*) from simple r full outer join simple s using (id);
 rollback to settings;
 
--- An full outer join where every record is not matched.
+-- parallelism is possible with parallel-aware full hash join
+savepoint settings;
+set local max_parallel_workers_per_gather = 2;
+explain (costs off)
+     select  count(*) from simple r full outer join simple s using (id);
+select  count(*) from simple r full outer join simple s using (id);
+rollback to settings;
+
+-- A full outer join where every record is not matched.
 
 -- non-parallel
 savepoint settings;
@@ -453,14 +462,24 @@ explain (costs off)
 select  count(*) from simple r full outer join simple s on (r.id = 0 - s.id);
 rollback to settings;
 
--- parallelism not possible with parallel-oblivious outer hash join
+-- parallelism not possible with parallel-oblivious full hash join
 savepoint settings;
+set enable_parallel_hash = off;
 set local max_parallel_workers_per_gather = 2;
 explain (costs off)
      select  count(*) from simple r full outer join simple s on (r.id = 0 - s.id);
 select  count(*) from simple r full outer join simple s on (r.id = 0 - s.id);
 rollback to settings;
 
+-- parallelism is possible with parallel-aware full hash join
+savepoint settings;
+set local max_parallel_workers_per_gather = 2;
+explain (costs off)
+     select  count(*) from simple r full outer join simple s on (r.id = 0 - s.id);
+select  count(*) from simple r full outer join simple s on (r.id = 0 - s.id);
+rollback to settings;
+
+
 -- exercise special code paths for huge tuples (note use of non-strict
 -- expression and left join required to get the detoasted tuple into
 -- the hash table)
-- 
2.30.2
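
To make the approach in 3/3 easier to follow, here is a small single-process sketch of the two ideas it combines: electing the last participant at the batch barrier, and then having that one participant walk the tuple chunks in storage order while skipping matched tuples. It is a toy model under stated assumptions, not the patch's code. All Toy* types and toy_* functions are hypothetical names for illustration, TOY_ALIGN stands in for MAXALIGN, there is no locking or shared memory, and the chunk list uses plain pointers rather than dsa_pointer.

```c
#include <assert.h>
#include <stdbool.h>
#include <stddef.h>
#include <stdint.h>
#include <string.h>

/* Toy stand-in for MAXALIGN: round up to a multiple of 8 bytes. */
#define TOY_ALIGN(n) (((size_t) (n) + 7) & ~(size_t) 7)

/* Toy barrier: a phase counter and a count of attached participants. */
typedef struct ToyBarrier
{
    int         phase;
    int         participants;
} ToyBarrier;

/*
 * Everyone but the last attached participant detaches and gets false.  The
 * last one stays attached, advances the phase, and gets true.
 */
static bool
toy_arrive_and_detach_except_last(ToyBarrier *barrier)
{
    if (barrier->participants > 1)
    {
        barrier->participants--;
        return false;
    }
    barrier->phase++;
    return true;
}

/* Toy tuple header; len payload bytes follow it inside the chunk. */
typedef struct ToyTupleHeader
{
    uint32_t    id;
    uint32_t    len;
    bool        matched;
} ToyTupleHeader;

typedef struct ToyChunk
{
    struct ToyChunk *next;
    size_t      used;           /* bytes of data[] in use */
    unsigned char data[256];
} ToyChunk;

/* Scan position, kept between calls so the scan can resume after emitting. */
typedef struct ToyScan
{
    ToyChunk   *current_chunk;
    size_t      current_chunk_idx;
} ToyScan;

/* Append a header plus len payload bytes; the caller ensures it fits. */
static void
toy_append(ToyChunk *chunk, uint32_t id, uint32_t len, bool matched)
{
    ToyTupleHeader hdr = {id, len, matched};

    memcpy(chunk->data + chunk->used, &hdr, sizeof(hdr));
    chunk->used += TOY_ALIGN(sizeof(hdr) + len);
}

/* Store the id of the next unmatched tuple in *id, or return false at end. */
static bool
toy_scan_unmatched(ToyScan *scan, uint32_t *id)
{
    while (scan->current_chunk != NULL)
    {
        while (scan->current_chunk_idx < scan->current_chunk->used)
        {
            ToyTupleHeader hdr;

            memcpy(&hdr, scan->current_chunk->data + scan->current_chunk_idx,
                   sizeof(hdr));
            /* Step past this tuple first so the next call resumes after it. */
            scan->current_chunk_idx += TOY_ALIGN(sizeof(hdr) + hdr.len);
            if (hdr.matched)
                continue;
            *id = hdr.id;
            return true;
        }
        scan->current_chunk = scan->current_chunk->next;
        scan->current_chunk_idx = 0;
    }
    return false;
}
```

In this model, with three participants attached in the probe phase, the first two callers of toy_arrive_and_detach_except_last() get false and would move on to other batches, while the third gets true and runs toy_scan_unmatched() until it returns false. The scan position lives in ToyScan for the same reason the patch stores current_chunk and current_chunk_idx: each call returns one tuple and the next call must pick up where it left off.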
