Re: Parallel Full Hash Join

2023-04-12 Thread Thomas Munro
On Mon, Apr 10, 2023 at 11:33 AM Michael Paquier  wrote:
> On Sat, Apr 08, 2023 at 02:19:54PM -0400, Melanie Plageman wrote:
> > Another worker attached to the batch barrier, saw that it was in
> > PHJ_BATCH_SCAN, marked it done and detached. This is fine.
> > BarrierArriveAndDetachExceptLast() is meant to ensure no one waits
> > (deadlock hazard) and that at least one worker stays to do the unmatched
> > scan. It doesn't hurt anything for another worker to join and find out
> > there is no work to do.
> >
> > We should simply delete this assertion.

Agreed, and pushed.  Thanks!

> I have added an open item about that.  This had better be tracked.

Thanks, will update.




Re: Parallel Full Hash Join

2023-04-09 Thread Michael Paquier
On Sat, Apr 08, 2023 at 02:19:54PM -0400, Melanie Plageman wrote:
> Another worker attached to the batch barrier, saw that it was in
> PHJ_BATCH_SCAN, marked it done and detached. This is fine.
> BarrierArriveAndDetachExceptLast() is meant to ensure no one waits
> (deadlock hazard) and that at least one worker stays to do the unmatched
> scan. It doesn't hurt anything for another worker to join and find out
> there is no work to do.
> 
> We should simply delete this assertion.

I have added an open item about that.  This had better be tracked.
--
Michael




Re: Parallel Full Hash Join

2023-04-08 Thread Melanie Plageman
On Sat, Apr 8, 2023 at 1:30 PM Melanie Plageman
 wrote:
>
> On Sat, Apr 8, 2023 at 12:33 PM Tom Lane  wrote:
> >
> > Thomas Munro  writes:
> > > I committed the main patch.
> >
> > BTW, it was easy to miss in all the buildfarm noise from
> > last-possible-minute patches, but chimaera just showed something
> > that looks like a bug in this code [1]:
> >
> > 2023-04-08 12:25:28.709 UTC [18027:321] pg_regress/join_hash LOG:  statement: savepoint settings;
> > 2023-04-08 12:25:28.709 UTC [18027:322] pg_regress/join_hash LOG:  statement: set local max_parallel_workers_per_gather = 2;
> > 2023-04-08 12:25:28.710 UTC [18027:323] pg_regress/join_hash LOG:  statement: explain (costs off)
> >  select  count(*) from simple r full outer join simple s on (r.id = 0 - s.id);
> > 2023-04-08 12:25:28.710 UTC [18027:324] pg_regress/join_hash LOG:  statement: select  count(*) from simple r full outer join simple s on (r.id = 0 - s.id);
> > TRAP: failed Assert("BarrierParticipants(&batch->batch_barrier) == 1"), File: "nodeHash.c", Line: 2118, PID: 19147

So, after staring at this for awhile, I suspect this assertion is just
plain wrong. BarrierArriveAndDetachExceptLast() contains this code:

    if (barrier->participants > 1)
    {
        --barrier->participants;
        SpinLockRelease(&barrier->mutex);

        return false;
    }
    Assert(barrier->participants == 1);

So in between this assertion and the one we tripped,

    if (!BarrierArriveAndDetachExceptLast(&batch->batch_barrier))
    {
        ...
        return false;
    }

    /* Now we are alone with this batch. */
    Assert(BarrierPhase(&batch->batch_barrier) == PHJ_BATCH_SCAN);
    Assert(BarrierParticipants(&batch->batch_barrier) == 1);

Another worker attached to the batch barrier, saw that it was in
PHJ_BATCH_SCAN, marked it done and detached. This is fine.
BarrierArriveAndDetachExceptLast() is meant to ensure no one waits
(deadlock hazard) and that at least one worker stays to do the unmatched
scan. It doesn't hurt anything for another worker to join and find out
there is no work to do.

We should simply delete this assertion.

- Melanie




Re: Parallel Full Hash Join

2023-04-08 Thread Tom Lane
Melanie Plageman  writes:
> Having not done much debugging on buildfarm animals before, I don't
> suppose there is any way to get access to the core itself? I'd like to
> see how many participants the batch barrier had at the time of the
> assertion failure. I assume it was 2, but I just wanted to make sure I
> understand the race.

I don't know about chimaera in particular, but buildfarm animals are
not typically configured to save any build products.  They'd run out
of disk space after awhile :-(.

If you think the number of participants would be useful data, I'd
suggest replacing that Assert with an elog() that prints what you
want to know.

regards, tom lane




Re: Parallel Full Hash Join

2023-04-08 Thread Melanie Plageman
On Sat, Apr 8, 2023 at 12:33 PM Tom Lane  wrote:
>
> Thomas Munro  writes:
> > I committed the main patch.
>
> BTW, it was easy to miss in all the buildfarm noise from
> last-possible-minute patches, but chimaera just showed something
> that looks like a bug in this code [1]:
>
> 2023-04-08 12:25:28.709 UTC [18027:321] pg_regress/join_hash LOG:  statement: savepoint settings;
> 2023-04-08 12:25:28.709 UTC [18027:322] pg_regress/join_hash LOG:  statement: set local max_parallel_workers_per_gather = 2;
> 2023-04-08 12:25:28.710 UTC [18027:323] pg_regress/join_hash LOG:  statement: explain (costs off)
>  select  count(*) from simple r full outer join simple s on (r.id = 0 - s.id);
> 2023-04-08 12:25:28.710 UTC [18027:324] pg_regress/join_hash LOG:  statement: select  count(*) from simple r full outer join simple s on (r.id = 0 - s.id);
> TRAP: failed Assert("BarrierParticipants(&batch->batch_barrier) == 1"), File: "nodeHash.c", Line: 2118, PID: 19147
> postgres: parallel worker for PID 18027 (ExceptionalCondition+0x84)[0x10ae2bfa4]
> postgres: parallel worker for PID 18027 (ExecParallelPrepHashTableForUnmatched+0x224)[0x10aa67544]
> postgres: parallel worker for PID 18027 (+0x3db868)[0x10aa6b868]
> postgres: parallel worker for PID 18027 (+0x3c4204)[0x10aa54204]
> postgres: parallel worker for PID 18027 (+0x3c81b8)[0x10aa581b8]
> postgres: parallel worker for PID 18027 (+0x3b3d28)[0x10aa43d28]
> postgres: parallel worker for PID 18027 (standard_ExecutorRun+0x208)[0x10aa39768]
> postgres: parallel worker for PID 18027 (ParallelQueryMain+0x2bc)[0x10aa4092c]
> postgres: parallel worker for PID 18027 (ParallelWorkerMain+0x660)[0x10a874870]
> postgres: parallel worker for PID 18027 (StartBackgroundWorker+0x2a8)[0x10ab8abf8]
> postgres: parallel worker for PID 18027 (+0x50290c)[0x10ab9290c]
> postgres: parallel worker for PID 18027 (+0x5035e4)[0x10ab935e4]
> postgres: parallel worker for PID 18027 (PostmasterMain+0x1304)[0x10ab96334]
> postgres: parallel worker for PID 18027 (main+0x86c)[0x10a79daec]

Having not done much debugging on buildfarm animals before, I don't
suppose there is any way to get access to the core itself? I'd like to
see how many participants the batch barrier had at the time of the
assertion failure. I assume it was 2, but I just wanted to make sure I
understand the race.

- Melanie




Re: Parallel Full Hash Join

2023-04-08 Thread Tom Lane
Thomas Munro  writes:
> I committed the main patch.

BTW, it was easy to miss in all the buildfarm noise from
last-possible-minute patches, but chimaera just showed something
that looks like a bug in this code [1]:

2023-04-08 12:25:28.709 UTC [18027:321] pg_regress/join_hash LOG:  statement: savepoint settings;
2023-04-08 12:25:28.709 UTC [18027:322] pg_regress/join_hash LOG:  statement: set local max_parallel_workers_per_gather = 2;
2023-04-08 12:25:28.710 UTC [18027:323] pg_regress/join_hash LOG:  statement: explain (costs off)
 select  count(*) from simple r full outer join simple s on (r.id = 0 - s.id);
2023-04-08 12:25:28.710 UTC [18027:324] pg_regress/join_hash LOG:  statement: select  count(*) from simple r full outer join simple s on (r.id = 0 - s.id);
TRAP: failed Assert("BarrierParticipants(&batch->batch_barrier) == 1"), File: "nodeHash.c", Line: 2118, PID: 19147
postgres: parallel worker for PID 18027 (ExceptionalCondition+0x84)[0x10ae2bfa4]
postgres: parallel worker for PID 18027 (ExecParallelPrepHashTableForUnmatched+0x224)[0x10aa67544]
postgres: parallel worker for PID 18027 (+0x3db868)[0x10aa6b868]
postgres: parallel worker for PID 18027 (+0x3c4204)[0x10aa54204]
postgres: parallel worker for PID 18027 (+0x3c81b8)[0x10aa581b8]
postgres: parallel worker for PID 18027 (+0x3b3d28)[0x10aa43d28]
postgres: parallel worker for PID 18027 (standard_ExecutorRun+0x208)[0x10aa39768]
postgres: parallel worker for PID 18027 (ParallelQueryMain+0x2bc)[0x10aa4092c]
postgres: parallel worker for PID 18027 (ParallelWorkerMain+0x660)[0x10a874870]
postgres: parallel worker for PID 18027 (StartBackgroundWorker+0x2a8)[0x10ab8abf8]
postgres: parallel worker for PID 18027 (+0x50290c)[0x10ab9290c]
postgres: parallel worker for PID 18027 (+0x5035e4)[0x10ab935e4]
postgres: parallel worker for PID 18027 (PostmasterMain+0x1304)[0x10ab96334]
postgres: parallel worker for PID 18027 (main+0x86c)[0x10a79daec]

regards, tom lane

[1] https://buildfarm.postgresql.org/cgi-bin/show_log.pl?nm=chimaera&dt=2023-04-08%2012%3A07%3A08




Re: Parallel Full Hash Join

2023-04-04 Thread Thomas Munro
On Wed, Apr 5, 2023 at 7:37 AM Tom Lane  wrote:
> The comment is no longer in sync with the code: this if-test used to
> reject JOIN_FULL and JOIN_RIGHT, and no longer does so, but the comment
> still claims it should.  Shouldn't we drop the sentence beginning
> "Similarly"?  (I see that there's now one sub-section that still rejects
> such cases, but it no longer seems correct to claim that they're rejected
> overall.)

Yeah, thanks.  Done.




Re: Parallel Full Hash Join

2023-04-04 Thread Tom Lane
Thomas Munro  writes:
> I committed the main patch.

This left the following code in hash_inner_and_outer (joinpath.c):

/*
 * If the joinrel is parallel-safe, we may be able to consider a
 * partial hash join.  However, we can't handle JOIN_UNIQUE_OUTER,
 * because the outer path will be partial, and therefore we won't be
 * able to properly guarantee uniqueness.  Similarly, we can't handle
 * JOIN_FULL and JOIN_RIGHT, because they can produce false null
 * extended rows.  Also, the resulting path must not be parameterized.
 */
if (joinrel->consider_parallel &&
save_jointype != JOIN_UNIQUE_OUTER &&
outerrel->partial_pathlist != NIL &&
bms_is_empty(joinrel->lateral_relids))
{

The comment is no longer in sync with the code: this if-test used to
reject JOIN_FULL and JOIN_RIGHT, and no longer does so, but the comment
still claims it should.  Shouldn't we drop the sentence beginning
"Similarly"?  (I see that there's now one sub-section that still rejects
such cases, but it no longer seems correct to claim that they're rejected
overall.)

regards, tom lane




Re: Parallel Full Hash Join

2023-03-30 Thread Melanie Plageman
On Mon, Mar 27, 2023 at 7:04 PM Thomas Munro  wrote:
> I found another problem.  I realised that ... FULL JOIN ... LIMIT n
> might be able to give wrong answers with unlucky scheduling.
> Unfortunately I have been unable to reproduce the phenomenon I am
> imagining yet but I can't think of any mechanism that prevents the
> following sequence of events:
>
> P0 probes, pulls n tuples from the outer relation and then the
> executor starts to shut down (see commit 3452dc52 which pushed down
> LIMIT), but just then P1 attaches, right before P0 does.  P1
> continues, and finds < n outer tuples while probing and then runs out
> so it enters the unmatched scan phase, and starts emitting bogusly
> unmatched tuples.  Some outer tuples we needed to get the complete set
> of match bits and thus the right answer were buffered inside P0's
> subplan and abandoned.
>
> I've attached a simple fixup for this problem.  Short version: if
> you're abandoning your PHJ_BATCH_PROBE phase without reaching the end,
> you must be shutting down, so the executor must think it's OK to
> abandon tuples this process has buffered, so it must also be OK to
> throw all unmatched tuples out the window too, as if this process was
> about to emit them.  Right?

I understand the scenario you are thinking of, however, I question how
those incorrectly formed tuples would ever be returned by the query. The
hashjoin would only start to shutdown once enough tuples had been
emitted to satisfy the limit, at which point, those tuples buffered in
p0 may be emitted by this worker but wouldn't be included in the query
result, no?

I suppose even if what I said is true, we do not want the hashjoin node
to ever produce incorrect tuples. In which case, your fix seems correct to me.

> With all the long and abstract discussion of hard to explain problems
> in this thread and related threads, I thought I should take a step
> back and figure out a way to demonstrate what this thing really does
> visually.  I wanted to show that this is a very useful feature that
> unlocks previously unobtainable parallelism, and to show the
> compromise we've had to make so far in an intuitive way.  With some
> extra instrumentation hacked up locally, I produced the attached
> "progress" graphs for a very simple query: SELECT COUNT(*) FROM r FULL
> JOIN s USING (i).  Imagine a time axis along the bottom, but I didn't
> bother to add numbers because I'm just trying to convey the 'shape' of
> execution with relative times and synchronisation points.
>
> Figures 1-3 show that phases 'h' (hash) and 'p' (probe) are
> parallelised and finish sooner as we add more processes to help out,
> but 's' (= the unmatched inner tuple scan) is not.  Note that if all
> inner tuples are matched, 's' becomes extremely small and the
> parallelism is almost as fair as a plain old inner join, but here I've
> maximised it: all inner tuples were unmatched, because the two
> relations have no matches at all.  Even if we achieve perfect linear
> scalability for the other phases, the speedup will be governed by
> https://en.wikipedia.org/wiki/Amdahl%27s_law and the only thing that
> can mitigate that is if there is more useful work those early-quitting
> processes could do somewhere else in your query plan.
>
> Figure 4 shows that it gets a lot fairer in a multi-batch join,
> because there is usually useful work to do on other batches of the
> same join.  Notice how processes initially work on loading, probing
> and scanning different batches to reduce contention, but they are
> capable of ganging up to load and/or probe the same batch if there is
> nothing else left to do (for example P2 and P3 both work on p5 near
> the end).  For now, they can't do that for the s phases.  (BTW, the
> little gaps before loading are the allocation phase, which I didn't
> bother to plot because a label wouldn't fit; this
> visualisation technique is a WIP.)
>
> With the "opportunistic" change we are discussing for later work,
> figure 4 would become completely fair (P0 and P2 would be able to join
> in and help out with s6 and s7), but single-batch figures 1-3 would
> not (that would require a different executor design AFAICT, or a
> eureka insight we haven't had yet; see long-winded discussion).

Cool diagrams!

> The last things I'm thinking about now:  Are the planner changes
> right?

I think the current changes are correct. I wonder if we have to change
anything in initial/final_cost_hashjoin to account for the fact that
for a single batch full/right parallel hash join, part of the
execution is serial. And, if so, do we need to consider the estimated
number of unmatched tuples to be emitted?

> Are the tests enough?

So, the tests currently in the patch set cover the unmatched tuple scan
phase fo

Re: Parallel Full Hash Join

2023-03-27 Thread Thomas Munro
On Sun, Mar 26, 2023 at 9:52 AM Melanie Plageman
 wrote:
> I have some very minor pieces of feedback, mainly about extraneous
> commas that made me uncomfortable ;)

Offensive punctuation removed.

> > discussion).  Therefore FULL JOIN inhibited page-based parallelism,
> > as the other join strategies can't do it either.
>
> I actually don't quite understand what this means? It's been awhile for
> me, so perhaps I'm being dense, but what is page-based parallelism?

Reworded.  I just meant our usual kind of "partial path" parallelism
(the kind when you don't know anything at all about the values of the
tuples that each process sees, and typically it's chopped up by
storage pages at the scan level).

> > That unfairness is considered acceptable for now, because it's better
> > than no parallelism at all.  The build and probe phases are run in
> > parallel, and the new scan-for-unmatched phase, while serial, is usually
> > applied to the smaller of the two relations and is either limited by
> > some multiple of work_mem, or it's too big and is partitioned into
> > batches and then the situation is improved by batch-level parallelism.
> > In future work on deadlock avoidance strategies, we may find a way to
> > parallelize the new phase safely.
>
> Is it worth mentioning something about parallel-oblivious parallel hash
> join not being able to do this still? Or is that obvious?

That's kind of what I meant above.

> > @@ -3116,18 +3256,31 @@ ExecHashTableDetachBatch(HashJoinTable hashtable)

> full/right joins should never fall into this code path, right?

Yeah, this is the normal way we detach from a batch.  This is reached
when shutting down the executor early, or when moving to the next
batch, etc.

***

I found another problem.  I realised that ... FULL JOIN ... LIMIT n
might be able to give wrong answers with unlucky scheduling.
Unfortunately I have been unable to reproduce the phenomenon I am
imagining yet but I can't think of any mechanism that prevents the
following sequence of events:

P0 probes, pulls n tuples from the outer relation and then the
executor starts to shut down (see commit 3452dc52 which pushed down
LIMIT), but just then P1 attaches, right before P0 does.  P1
continues, and finds < n outer tuples while probing and then runs out
so it enters the unmatched scan phase, and starts emitting bogusly
unmatched tuples.  Some outer tuples we needed to get the complete set
of match bits and thus the right answer were buffered inside P0's
subplan and abandoned.

I've attached a simple fixup for this problem.  Short version: if
you're abandoning your PHJ_BATCH_PROBE phase without reaching the end,
you must be shutting down, so the executor must think it's OK to
abandon tuples this process has buffered, so it must also be OK to
throw all unmatched tuples out the window too, as if this process was
about to emit them.  Right?

***

With all the long and abstract discussion of hard to explain problems
in this thread and related threads, I thought I should take a step
back and figure out a way to demonstrate what this thing really does
visually.  I wanted to show that this is a very useful feature that
unlocks previously unobtainable parallelism, and to show the
compromise we've had to make so far in an intuitive way.  With some
extra instrumentation hacked up locally, I produced the attached
"progress" graphs for a very simple query: SELECT COUNT(*) FROM r FULL
JOIN s USING (i).  Imagine a time axis along the bottom, but I didn't
bother to add numbers because I'm just trying to convey the 'shape' of
execution with relative times and synchronisation points.

Figures 1-3 show that phases 'h' (hash) and 'p' (probe) are
parallelised and finish sooner as we add more processes to help out,
but 's' (= the unmatched inner tuple scan) is not.  Note that if all
inner tuples are matched, 's' becomes extremely small and the
parallelism is almost as fair as a plain old inner join, but here I've
maximised it: all inner tuples were unmatched, because the two
relations have no matches at all.  Even if we achieve perfect linear
scalability for the other phases, the speedup will be governed by
https://en.wikipedia.org/wiki/Amdahl%27s_law and the only thing that
can mitigate that is if there is more useful work those early-quitting
processes could do somewhere else in your query plan.

Figure 4 shows that it gets a lot fairer in a multi-batch join,
because there is usually useful work to do on other batches of the
same join.  Notice how processes initially work on loading, probing
and scanning different batches to reduce contention, but they are
capable of ganging up to load and/or probe the same batch if there is
nothing else left to do (for example P2 and P3 both work on p5 near
the end).  For now, they can't do that for the s phases.  (BTW, the
little gaps before loading are the allocation phase, which I didn't
bother to plot because a label wouldn't fit; this
visualisation technique is a WIP.)

With the 

Re: Parallel Full Hash Join

2023-03-25 Thread Melanie Plageman
On Sat, Mar 25, 2023 at 09:21:34AM +1300, Thomas Munro wrote:
>  * reuse the same unmatched_scan_{chunk,idx} variables as above
>  * rename the list of chunks to scan to work_queue
>  * fix race/memory leak if we see PHJ_BATCH_SCAN when we attach (it
> wasn't OK to just fall through)

ah, good catch.

> I don't love the way that both ExecHashTableDetachBatch() and
> ExecParallelPrepHashTableForUnmatched() duplicate logic relating to
> the _SCAN/_FREE protocol, but I'm struggling to find a better idea.
> Perhaps I just need more coffee.

I'm not sure if I have strong feelings either way.
To confirm I understand, though: in ExecHashTableDetachBatch(), the call
to BarrierArriveAndDetachExceptLast() serves only to advance the barrier
phase through _SCAN, right? It doesn't really matter if this worker is
the last worker since BarrierArriveAndDetach() handles that for us.
There isn't another barrier function to do this (and I mostly think it
is fine), but I did have to think on it for a bit.

Oh, and, unrelated, but it is maybe worth updating the BarrierAttach()
function comment to mention BarrierArriveAndDetachExceptLast().

> I think your idea of opportunistically joining the scan if it's
> already running makes sense to explore for a later step, ie to make
> multi-batch PHFJ fully fair, and I think that should be a fairly easy
> code change, and I put in some comments where changes would be needed.

makes sense.

I have some very minor pieces of feedback, mainly about extraneous
commas that made me uncomfortable ;)

> From 8b526377eb4a4685628624e75743aedf37dd5bfe Mon Sep 17 00:00:00 2001
> From: Thomas Munro 
> Date: Fri, 24 Mar 2023 14:19:07 +1300
> Subject: [PATCH v12 1/2] Scan for unmatched hash join tuples in memory order.
> 
> In a full/right outer join, we need to scan every tuple in the hash
> table to find the ones that were not matched while probing, so that we

Given how you are using the word "so" here, I think that comma before it
is not needed.

> @@ -2083,58 +2079,45 @@ bool
>  ExecScanHashTableForUnmatched(HashJoinState *hjstate, ExprContext *econtext)
>  {
>   HashJoinTable hashtable = hjstate->hj_HashTable;
> - HashJoinTuple hashTuple = hjstate->hj_CurTuple;
> + HashMemoryChunk chunk;
>  
> - for (;;)
> + while ((chunk = hashtable->unmatched_scan_chunk))
>   {
> - /*
> -  * hj_CurTuple is the address of the tuple last returned from 
> the
> -  * current bucket, or NULL if it's time to start scanning a new
> -  * bucket.
> -  */
> - if (hashTuple != NULL)
> - hashTuple = hashTuple->next.unshared;
> - else if (hjstate->hj_CurBucketNo < hashtable->nbuckets)
> - {
> - hashTuple = 
> hashtable->buckets.unshared[hjstate->hj_CurBucketNo];
> - hjstate->hj_CurBucketNo++;
> - }
> - else if (hjstate->hj_CurSkewBucketNo < hashtable->nSkewBuckets)
> + while (hashtable->unmatched_scan_idx < chunk->used)
>   {
> - int j = 
> hashtable->skewBucketNums[hjstate->hj_CurSkewBucketNo];
> + HashJoinTuple hashTuple = (HashJoinTuple)
> + (HASH_CHUNK_DATA(hashtable->unmatched_scan_chunk) +
> +  hashtable->unmatched_scan_idx);
>  
> - hashTuple = hashtable->skewBucket[j]->tuples;
> - hjstate->hj_CurSkewBucketNo++;
> - }
> - else
> - break;  /* finished all buckets 
> */
> + MinimalTuple tuple = HJTUPLE_MINTUPLE(hashTuple);
> + int hashTupleSize = 
> (HJTUPLE_OVERHEAD + tuple->t_len);
>  
> - while (hashTuple != NULL)
> - {
> - if 
> (!HeapTupleHeaderHasMatch(HJTUPLE_MINTUPLE(hashTuple)))
> - {
> - TupleTableSlot *inntuple;
> + /* next tuple in this chunk */
> + hashtable->unmatched_scan_idx += 
> MAXALIGN(hashTupleSize);
>  
> - /* insert hashtable's tuple into exec slot */
> - inntuple = 
> ExecStoreMinimalTuple(HJTUPLE_MINTUPLE(hashTuple),
> - 
>  hjstate->hj_HashTupleSlot,
> - 
>  false);/* do not pfree */
> - econtext->ecxt_innertuple = inntuple;
> + if 
> (HeapTupleHeaderHasMatch(HJTUPLE_MINTUPLE(hashTuple)))
> + continue;
>  
> - /*
> -  * Reset temp memory each time; although this 
> function doesn't
> -

Re: Parallel Full Hash Join

2023-03-24 Thread Thomas Munro
Here is a rebased and lightly hacked-upon version that I'm testing.

0001-Scan-for-unmatched-hash-join-tuples-in-memory-order.patch

 * this change can stand on its own, separately from any PHJ changes
 * renamed hashtable->current_chunk[_idx] to unmatched_scan_{chunk,idx}
 * introduced a local variable to avoid some x->y->z stuff
 * removed some references to no-longer-relevant hj_XXX variables in
the Prep function

I haven't attempted to prove anything about the performance of this
one yet, but it seems fairly obvious that it can't be worse than what
we're doing today.  I have suppressed the urge to look into improving
locality and software prefetching.

0002-Parallel-Hash-Full-Join.patch

 * reuse the same unmatched_scan_{chunk,idx} variables as above
 * rename the list of chunks to scan to work_queue
 * fix race/memory leak if we see PHJ_BATCH_SCAN when we attach (it
wasn't OK to just fall through)

That "work queue" name/concept already exists in other places that
need to process every chunk, namely rebucketing and repartitioning.
In later work, I'd like to harmonise these work queues, but I'm not
trying to increase the size of this patch set at this time, I just
want to use consistent naming.

I don't love the way that both ExecHashTableDetachBatch() and
ExecParallelPrepHashTableForUnmatched() duplicate logic relating to
the _SCAN/_FREE protocol, but I'm struggling to find a better idea.
Perhaps I just need more coffee.

I think your idea of opportunistically joining the scan if it's
already running makes sense to explore for a later step, ie to make
multi-batch PHFJ fully fair, and I think that should be a fairly easy
code change, and I put in some comments where changes would be needed.

Continuing to test, more soon.
From 8b526377eb4a4685628624e75743aedf37dd5bfe Mon Sep 17 00:00:00 2001
From: Thomas Munro 
Date: Fri, 24 Mar 2023 14:19:07 +1300
Subject: [PATCH v12 1/2] Scan for unmatched hash join tuples in memory order.

In a full/right outer join, we need to scan every tuple in the hash
table to find the ones that were not matched while probing, so that we
can emit null-extended inner tuples.  Previously we did that in hash
table bucket order, which means that we dereferenced pointers to tuples
that were randomly scattered in memory (ie in an order derived from the
hash of the join key).

Change to a memory-order scan, using the linked list of memory chunks
that hold the tuples.  This isn't really being done for performance
reasons (a subject for later work), but it certainly can't be worse than
the previous random order.  The goal here is to harmonize the scan logic
with later work that will parallelize full joins, and works in chunks.

Author: Melanie Plageman 
Reviewed-by: Thomas Munro 
Discussion: https://postgr.es/m/CA%2BhUKG%2BA6ftXPz4oe92%2Bx8Er%2BxpGZqto70-Q_ERwRaSyA%3DafNg%40mail.gmail.com
---
 src/backend/executor/nodeHash.c | 85 +
 src/include/executor/hashjoin.h |  4 ++
 2 files changed, 38 insertions(+), 51 deletions(-)

diff --git a/src/backend/executor/nodeHash.c b/src/backend/executor/nodeHash.c
index 748c9b0024..91fd806c97 100644
--- a/src/backend/executor/nodeHash.c
+++ b/src/backend/executor/nodeHash.c
@@ -517,6 +517,8 @@ ExecHashTableCreate(HashState *state, List *hashOperators, List *hashCollations,
 		hashtable->spaceAllowed * SKEW_HASH_MEM_PERCENT / 100;
 	hashtable->chunks = NULL;
 	hashtable->current_chunk = NULL;
+	hashtable->unmatched_scan_chunk = NULL;
+	hashtable->unmatched_scan_idx = 0;
 	hashtable->parallel_state = state->parallel_state;
 	hashtable->area = state->ps.state->es_query_dsa;
 	hashtable->batches = NULL;
@@ -2058,16 +2060,10 @@ ExecParallelScanHashBucket(HashJoinState *hjstate,
 void
 ExecPrepHashTableForUnmatched(HashJoinState *hjstate)
 {
-	/*--
-	 * During this scan we use the HashJoinState fields as follows:
-	 *
-	 * hj_CurBucketNo: next regular bucket to scan
-	 * hj_CurSkewBucketNo: next skew bucket (an index into skewBucketNums)
-	 * hj_CurTuple: last tuple returned, or NULL to start next bucket
-	 *--
-	 */
-	hjstate->hj_CurBucketNo = 0;
-	hjstate->hj_CurSkewBucketNo = 0;
+	HashJoinTable hashtable = hjstate->hj_HashTable;
+
+	hashtable->unmatched_scan_chunk = hashtable->chunks;
+	hashtable->unmatched_scan_idx = 0;
 	hjstate->hj_CurTuple = NULL;
 }
 
@@ -2083,58 +2079,45 @@ bool
 ExecScanHashTableForUnmatched(HashJoinState *hjstate, ExprContext *econtext)
 {
 	HashJoinTable hashtable = hjstate->hj_HashTable;
-	HashJoinTuple hashTuple = hjstate->hj_CurTuple;
+	HashMemoryChunk chunk;
 
-	for (;;)
+	while ((chunk = hashtable->unmatched_scan_chunk))
 	{
-		/*
-		 * hj_CurTuple is the address of the tuple last returned from the
-		 * current bucket, or NULL if it's time to start scanning a new
-		 * bucket.
-		 */
-		if (hashTuple != NULL)
-			hashTuple = hashTuple->next.unshared;
-		else if (hjstate->hj_CurBucketNo < hashtable->nbuckets)
-		{
-			hashTuple = 

Re: Parallel Full Hash Join

2022-11-16 Thread Thomas Munro
On Thu, Nov 17, 2022 at 5:22 PM Ian Lawrence Barwick  wrote:
> This patch is marked as "Waiting for Committer" in the current commitfest [1]
> with yourself as committer; do you have any plans to move ahead with this?

Yeah, sorry for lack of progress.  Aiming to get this in shortly.




Re: Parallel Full Hash Join

2022-11-16 Thread Ian Lawrence Barwick
Fri, 8 Apr 2022 at 20:30 Thomas Munro :
>
> On Wed, Jan 12, 2022 at 10:30 AM Melanie Plageman
>  wrote:
> > On Fri, Nov 26, 2021 at 3:11 PM Thomas Munro  wrote:
> > > #3 0x009cf57e in ExceptionalCondition (conditionName=0x29cae8
> > > "BarrierParticipants(>shared->batch_barrier) == 1",
> > > errorType=, fileName=0x2ae561 "nodeHash.c",
> > > lineNumber=lineNumber@entry=2224) at assert.c:69
> > > No locals.
> > > #4 0x0071575e in ExecParallelScanHashTableForUnmatched
> > > (hjstate=hjstate@entry=0x80a60a3c8,
> > > econtext=econtext@entry=0x80a60ae98) at nodeHash.c:2224
> >
> > I believe this assert can be safely removed.
>
> Agreed.
>
> I was looking at this with a view to committing it, but I need more
> time.  This will be at the front of my queue when the tree reopens.
> I'm trying to find the tooling I had somewhere that could let you test
> attaching and detaching at every phase.
>
> The attached version is just pgindent'd.

Hi Thomas

This patch is marked as "Waiting for Committer" in the current commitfest [1]
with yourself as committer; do you have any plans to move ahead with this?

[1] https://commitfest.postgresql.org/40/2903/

Regards

Ian Barwick




Re: Parallel Full Hash Join

2022-04-08 Thread Thomas Munro
On Wed, Jan 12, 2022 at 10:30 AM Melanie Plageman
 wrote:
> On Fri, Nov 26, 2021 at 3:11 PM Thomas Munro  wrote:
> > #3 0x009cf57e in ExceptionalCondition (conditionName=0x29cae8
> > "BarrierParticipants(&batch->shared->batch_barrier) == 1",
> > errorType=<optimized out>, fileName=0x2ae561 "nodeHash.c",
> > lineNumber=lineNumber@entry=2224) at assert.c:69
> > No locals.
> > #4 0x0071575e in ExecParallelScanHashTableForUnmatched
> > (hjstate=hjstate@entry=0x80a60a3c8,
> > econtext=econtext@entry=0x80a60ae98) at nodeHash.c:2224
>
> I believe this assert can be safely removed.

Agreed.

I was looking at this with a view to committing it, but I need more
time.  This will be at the front of my queue when the tree reopens.
I'm trying to find the tooling I had somewhere that could let you test
attaching and detaching at every phase.

The attached version is just pgindent'd.
From e7453cae9b2a686d57f967fd41533546d463dd0c Mon Sep 17 00:00:00 2001
From: Thomas Munro 
Date: Fri, 2 Oct 2020 15:53:44 +1300
Subject: [PATCH v11 1/3] Fix race condition in parallel hash join batch
 cleanup.

With unlucky timing and parallel_leader_participation off, PHJ could
attempt to access per-batch state just as it was being freed.  There was
code intended to prevent that by checking for a cleared pointer, but it
was racy.  Fix, by introducing an extra barrier phase.  The new phase
PHJ_BUILD_RUNNING means that it's safe to access the per-batch state to
find a batch to help with, and PHJ_BUILD_DONE means that it is too late.
The last to detach will free the array of per-batch state as before, but
now it will also atomically advance the phase at the same time, so that
late attachers can avoid the hazard.  This mirrors the way per-batch
hash tables are freed (see phases PHJ_BATCH_PROBING and PHJ_BATCH_DONE).

The build barrier must make it to PHJ_BUILD_DONE before shared resources
can be safely destroyed. This works fine in most cases with the addition
of another synchronization point. However, when the inner side is empty,
the build barrier will only make it to PHJ_BUILD_HASHING_INNER before
workers attempt to detach from the hashtable. In the case of the empty
inner optimization, advance the build barrier to PHJ_BUILD_RUNNING
before proceeding to cleanup. See batch 0 batch barrier fast forward in
ExecParallelHashJoinSetUpBatches() for precedent.

Revealed by a build farm failure, where BarrierAttach() failed a sanity
check assertion, because the memory had been clobbered by dsa_free().

This should eventually be back-patched to all supported releases, but
the failure is rare and the previous attempt at this was reverted, so
let's do this in master only for now, ahead of some other changes that
will move things around a bit.

Author: Thomas Munro 
Author: Melanie Plageman 
Reported-by: Michael Paquier 
Discussion: https://postgr.es/m/20200929061142.GA29096%40paquier.xyz
---
 src/backend/executor/nodeHash.c | 49 +-
 src/backend/executor/nodeHashjoin.c | 54 -
 src/include/executor/hashjoin.h |  3 +-
 3 files changed, 73 insertions(+), 33 deletions(-)

diff --git a/src/backend/executor/nodeHash.c b/src/backend/executor/nodeHash.c
index 3510a4247c..d7d1d77ed1 100644
--- a/src/backend/executor/nodeHash.c
+++ b/src/backend/executor/nodeHash.c
@@ -333,14 +333,21 @@ MultiExecParallelHash(HashState *node)
 	hashtable->nbuckets = pstate->nbuckets;
 	hashtable->log2_nbuckets = my_log2(hashtable->nbuckets);
 	hashtable->totalTuples = pstate->total_tuples;
-	ExecParallelHashEnsureBatchAccessors(hashtable);
+
+	/*
+	 * Unless we're completely done and the batch state has been freed, make
+	 * sure we have accessors.
+	 */
+	if (BarrierPhase(build_barrier) < PHJ_BUILD_DONE)
+		ExecParallelHashEnsureBatchAccessors(hashtable);
 
 	/*
 	 * The next synchronization point is in ExecHashJoin's HJ_BUILD_HASHTABLE
-	 * case, which will bring the build phase to PHJ_BUILD_DONE (if it isn't
-	 * there already).
+	 * case, which will bring the build phase to PHJ_BUILD_RUNNING (if it
+	 * isn't there already).
 	 */
 	Assert(BarrierPhase(build_barrier) == PHJ_BUILD_HASHING_OUTER ||
+		   BarrierPhase(build_barrier) == PHJ_BUILD_RUNNING ||
 		   BarrierPhase(build_barrier) == PHJ_BUILD_DONE);
 }
 
@@ -624,7 +631,7 @@ ExecHashTableCreate(HashState *state, List *hashOperators, List *hashCollations,
 		/*
 		 * The next Parallel Hash synchronization point is in
 		 * MultiExecParallelHash(), which will progress it all the way to
-		 * PHJ_BUILD_DONE.  The caller must not return control from this
+		 * PHJ_BUILD_RUNNING.  The caller must not return control from this
 		 * executor node between now and then.
 		 */
 	}
@@ -3065,14 +3072,11 @@ ExecParallelHashEnsureBatchAccessors(HashJoinTable hashtable)
 	}
 
 	/*
-	 * It's possible for a backend to start up very late so that the whole
-	 * join is finished and the shm state for tracking batches has already
-	 * been freed by ExecHashTableDetach().  In that case 

Re: Parallel Full Hash Join

2022-01-11 Thread Melanie Plageman
On Fri, Nov 26, 2021 at 3:11 PM Thomas Munro  wrote:
>
> On Sun, Nov 21, 2021 at 4:48 PM Justin Pryzby  wrote:
> > On Wed, Nov 17, 2021 at 01:45:06PM -0500, Melanie Plageman wrote:
> > > Yes, this looks like that issue.
> > >
> > > I've attached a v8 set with the fix I suggested in [1] included.
> > > (I added it to 0001).
> >
> > This is still crashing :(
> > https://cirrus-ci.com/task/6738329224871936
> > https://cirrus-ci.com/task/4895130286030848
>
> I added a core file backtrace to cfbot's CI recipe a few days ago, so
> now we have:
>
> https://cirrus-ci.com/task/5676480098205696
>
> #3 0x009cf57e in ExceptionalCondition (conditionName=0x29cae8
> "BarrierParticipants(&batch->shared->batch_barrier) == 1",
> errorType=<optimized out>, fileName=0x2ae561 "nodeHash.c",
> lineNumber=lineNumber@entry=2224) at assert.c:69
> No locals.
> #4 0x0071575e in ExecParallelScanHashTableForUnmatched
> (hjstate=hjstate@entry=0x80a60a3c8,
> econtext=econtext@entry=0x80a60ae98) at nodeHash.c:2224

I believe this assert can be safely removed.

It is possible for a worker to attach to the batch barrier after the
"last" worker was elected to scan and emit unmatched inner tuples. This
is safe because the batch barrier is already in phase PHJ_BATCH_SCAN
and this newly attached worker will simply detach from the batch
barrier and look for a new batch to work on.

The order of events would be as follows:

W1: advances batch to PHJ_BATCH_SCAN
W2: attaches to batch barrier in ExecParallelHashJoinNewBatch()
W1: calls ExecParallelScanHashTableForUnmatched() (2 workers attached to
barrier at this point)
W2: detaches from the batch barrier

The attached v10 patch removes this assert and updates the comment in
ExecParallelScanHashTableForUnmatched().

I'm not sure if I should add more detail about this scenario in
ExecParallelHashJoinNewBatch() under PHJ_BATCH_SCAN or if the detail in
ExecParallelScanHashTableForUnmatched() is sufficient.

- Melanie
From 998a1cc5bbbe26ce4c0bf23be22867fd1635e2c6 Mon Sep 17 00:00:00 2001
From: Thomas Munro 
Date: Sat, 6 Mar 2021 12:06:16 +1300
Subject: [PATCH v10 2/3] Improve the naming of Parallel Hash Join phases.

Commit 3048898e dropped -ING from some wait event names.  Update the
corresponding barrier phase names to match.

While we're here making cosmetic changes, also rename "DONE" to "FREE".
That pairs better with "ALLOCATE", and describes the activity that
actually happens in that phase (as we do for the other phases) rather
than describing a state.  As for the growth barriers, rename their
"ALLOCATE" phase to "REALLOCATE", which is probably a better description
of what happens then.
---
 src/backend/executor/nodeHash.c | 68 +-
 src/backend/executor/nodeHashjoin.c | 91 +
 src/backend/utils/activity/wait_event.c |  8 +--
 src/include/executor/hashjoin.h | 38 +--
 src/include/utils/wait_event.h  |  4 +-
 5 files changed, 106 insertions(+), 103 deletions(-)

diff --git a/src/backend/executor/nodeHash.c b/src/backend/executor/nodeHash.c
index 049d718425..4e233f07c4 100644
--- a/src/backend/executor/nodeHash.c
+++ b/src/backend/executor/nodeHash.c
@@ -246,10 +246,10 @@ MultiExecParallelHash(HashState *node)
 	 */
 	pstate = hashtable->parallel_state;
	build_barrier = &pstate->build_barrier;
-	Assert(BarrierPhase(build_barrier) >= PHJ_BUILD_ALLOCATING);
+	Assert(BarrierPhase(build_barrier) >= PHJ_BUILD_ALLOCATE);
 	switch (BarrierPhase(build_barrier))
 	{
-		case PHJ_BUILD_ALLOCATING:
+		case PHJ_BUILD_ALLOCATE:
 
 			/*
 			 * Either I just allocated the initial hash table in
@@ -259,7 +259,7 @@ MultiExecParallelHash(HashState *node)
 			BarrierArriveAndWait(build_barrier, WAIT_EVENT_HASH_BUILD_ALLOCATE);
 			/* Fall through. */
 
-		case PHJ_BUILD_HASHING_INNER:
+		case PHJ_BUILD_HASH_INNER:
 
 			/*
 			 * It's time to begin hashing, or if we just arrived here then
@@ -271,10 +271,10 @@ MultiExecParallelHash(HashState *node)
 			 * below.
 			 */
 			if (PHJ_GROW_BATCHES_PHASE(BarrierAttach(&pstate->grow_batches_barrier)) !=
-				PHJ_GROW_BATCHES_ELECTING)
+				PHJ_GROW_BATCHES_ELECT)
 				ExecParallelHashIncreaseNumBatches(hashtable);
 			if (PHJ_GROW_BUCKETS_PHASE(BarrierAttach(&pstate->grow_buckets_barrier)) !=
-				PHJ_GROW_BUCKETS_ELECTING)
+				PHJ_GROW_BUCKETS_ELECT)
 				ExecParallelHashIncreaseNumBuckets(hashtable);
 			ExecParallelHashEnsureBatchAccessors(hashtable);
 			ExecParallelHashTableSetCurrentBatch(hashtable, 0);
@@ -338,17 +338,17 @@ MultiExecParallelHash(HashState *node)
 	 * Unless we're completely done and the batch state has been freed, make
 	 * sure we have accessors.
 	 */
-	if (BarrierPhase(build_barrier) < PHJ_BUILD_DONE)
+	if (BarrierPhase(build_barrier) < PHJ_BUILD_FREE)
 		ExecParallelHashEnsureBatchAccessors(hashtable);
 
 	/*
 	 * The next synchronization point is in ExecHashJoin's HJ_BUILD_HASHTABLE
-	 * case, which will bring the build phase to PHJ_BUILD_RUNNING (if it isn't
+	 * case, which will bring the build 

Re: Parallel Full Hash Join

2021-11-26 Thread Thomas Munro
On Sun, Nov 21, 2021 at 4:48 PM Justin Pryzby  wrote:
> On Wed, Nov 17, 2021 at 01:45:06PM -0500, Melanie Plageman wrote:
> > Yes, this looks like that issue.
> >
> > I've attached a v8 set with the fix I suggested in [1] included.
> > (I added it to 0001).
>
> This is still crashing :(
> https://cirrus-ci.com/task/6738329224871936
> https://cirrus-ci.com/task/4895130286030848

I added a core file backtrace to cfbot's CI recipe a few days ago, so
now we have:

https://cirrus-ci.com/task/5676480098205696

#3 0x009cf57e in ExceptionalCondition (conditionName=0x29cae8
"BarrierParticipants(&batch->shared->batch_barrier) == 1",
errorType=<optimized out>, fileName=0x2ae561 "nodeHash.c",
lineNumber=lineNumber@entry=2224) at assert.c:69
No locals.
#4 0x0071575e in ExecParallelScanHashTableForUnmatched
(hjstate=hjstate@entry=0x80a60a3c8,
econtext=econtext@entry=0x80a60ae98) at nodeHash.c:2224




Re: Parallel Full Hash Join

2021-11-20 Thread Justin Pryzby
On Wed, Nov 17, 2021 at 01:45:06PM -0500, Melanie Plageman wrote:
> On Sat, Nov 6, 2021 at 11:04 PM Justin Pryzby  wrote:
> >
> > > Rebased patches attached. I will change status back to "Ready for 
> > > Committer"
> >
> > The CI showed a crash on freebsd, which I reproduced.
> > https://cirrus-ci.com/task/5203060415791104
> >
> > The crash is evidenced in 0001 - but only ~15% of the time.
> >
> > I think it's the same thing which was committed and then reverted here, so
> > maybe I'm not saying anything new.
> >
> > https://commitfest.postgresql.org/33/3031/
> > https://www.postgresql.org/message-id/flat/20200929061142.ga29...@paquier.xyz
> 
> Yes, this looks like that issue.
> 
> I've attached a v8 set with the fix I suggested in [1] included.
> (I added it to 0001).

This is still crashing :(
https://cirrus-ci.com/task/6738329224871936
https://cirrus-ci.com/task/4895130286030848

-- 
Justin




Re: Parallel Full Hash Join

2021-11-17 Thread Melanie Plageman
small mistake in v8.
v9 attached.

- Melanie


v9-0002-Improve-the-naming-of-Parallel-Hash-Join-phases.patch
Description: Binary data


v9-0003-Parallel-Hash-Full-Right-Outer-Join.patch
Description: Binary data


v9-0001-Fix-race-condition-in-parallel-hash-join-batch-cl.patch
Description: Binary data


Re: Parallel Full Hash Join

2021-11-17 Thread Melanie Plageman
On Sat, Nov 6, 2021 at 11:04 PM Justin Pryzby  wrote:
>
> > Rebased patches attached. I will change status back to "Ready for Committer"
>
> The CI showed a crash on freebsd, which I reproduced.
> https://cirrus-ci.com/task/5203060415791104
>
> The crash is evidenced in 0001 - but only ~15% of the time.
>
> I think it's the same thing which was committed and then reverted here, so
> maybe I'm not saying anything new.
>
> https://commitfest.postgresql.org/33/3031/
> https://www.postgresql.org/message-id/flat/20200929061142.ga29...@paquier.xyz
>
> (gdb) p pstate->build_barrier->phase
> Cannot access memory at address 0x7f82e0fa42f4
>
> #1  0x7f13de34f801 in __GI_abort () at abort.c:79
> #2  0x5638e6a16d28 in ExceptionalCondition 
> (conditionName=conditionName@entry=0x5638e6b62850 "!pstate || 
> BarrierPhase(&pstate->build_barrier) >= PHJ_BUILD_RUN",
> errorType=errorType@entry=0x5638e6a6f00b "FailedAssertion", 
> fileName=fileName@entry=0x5638e6b625be "nodeHash.c", 
> lineNumber=lineNumber@entry=3305) at assert.c:69
> #3  0x5638e678085b in ExecHashTableDetach (hashtable=0x5638e8e6ca88) at 
> nodeHash.c:3305
> #4  0x5638e6784656 in ExecShutdownHashJoin 
> (node=node@entry=0x5638e8e57cb8) at nodeHashjoin.c:1400
> #5  0x5638e67666d8 in ExecShutdownNode (node=0x5638e8e57cb8) at 
> execProcnode.c:812
> #6  ExecShutdownNode (node=0x5638e8e57cb8) at execProcnode.c:772
> #7  0x5638e67cd5b1 in planstate_tree_walker 
> (planstate=planstate@entry=0x5638e8e58580, walker=walker@entry=0x5638e6766680 
> <ExecShutdownNode>, context=context@entry=0x0) at nodeFuncs.c:4009
> #8  0x5638e67666b2 in ExecShutdownNode (node=0x5638e8e58580) at 
> execProcnode.c:792
> #9  ExecShutdownNode (node=0x5638e8e58580) at execProcnode.c:772
> #10 0x5638e67cd5b1 in planstate_tree_walker 
> (planstate=planstate@entry=0x5638e8e58418, walker=walker@entry=0x5638e6766680 
> <ExecShutdownNode>, context=context@entry=0x0) at nodeFuncs.c:4009
> #11 0x5638e67666b2 in ExecShutdownNode (node=0x5638e8e58418) at 
> execProcnode.c:792
> #12 ExecShutdownNode (node=node@entry=0x5638e8e58418) at execProcnode.c:772
> #13 0x5638e675f518 in ExecutePlan (execute_once=<optimized out>, 
> dest=0x5638e8df0058, direction=<optimized out>, numberTuples=0, 
> sendTuples=<optimized out>, operation=CMD_SELECT,
> use_parallel_mode=<optimized out>, planstate=0x5638e8e58418, 
> estate=0x5638e8e57a10) at execMain.c:1658
> #14 standard_ExecutorRun () at execMain.c:410
> #15 0x5638e6763e0a in ParallelQueryMain (seg=0x5638e8d823d8, 
> toc=0x7f13df4e9000) at execParallel.c:1493
> #16 0x5638e663f6c7 in ParallelWorkerMain () at parallel.c:1495
> #17 0x5638e68542e4 in StartBackgroundWorker () at bgworker.c:858
> #18 0x5638e6860f53 in do_start_bgworker (rw=<optimized out>) at 
> postmaster.c:5883
> #19 maybe_start_bgworkers () at postmaster.c:6108
> #20 0x5638e68619e5 in sigusr1_handler (postgres_signal_arg=<optimized 
> out>) at postmaster.c:5272
> #21 <signal handler called>
> #22 0x7f13de425ff7 in __GI___select (nfds=nfds@entry=7, 
> readfds=readfds@entry=0x7ffef03b8400, writefds=writefds@entry=0x0, 
> exceptfds=exceptfds@entry=0x0, timeout=timeout@entry=0x7ffef03b8360)
> at ../sysdeps/unix/sysv/linux/select.c:41
> #23 0x5638e68620ce in ServerLoop () at postmaster.c:1765
> #24 0x5638e6863bcc in PostmasterMain () at postmaster.c:1473
> #25 0x5638e658fd00 in main (argc=8, argv=0x5638e8d54730) at main.c:198

Yes, this looks like that issue.

I've attached a v8 set with the fix I suggested in [1] included.
(I added it to 0001).

- Melanie

[1] 
https://www.postgresql.org/message-id/flat/20200929061142.GA29096%40paquier.xyz


v8-0003-Parallel-Hash-Full-Right-Outer-Join.patch
Description: Binary data


v8-0002-Improve-the-naming-of-Parallel-Hash-Join-phases.patch
Description: Binary data


v8-0001-Fix-race-condition-in-parallel-hash-join-batch-cl.patch
Description: Binary data


Re: Parallel Full Hash Join

2021-11-06 Thread Justin Pryzby
> Rebased patches attached. I will change status back to "Ready for Committer"

The CI showed a crash on freebsd, which I reproduced.
https://cirrus-ci.com/task/5203060415791104

The crash is evidenced in 0001 - but only ~15% of the time.

I think it's the same thing which was committed and then reverted here, so
maybe I'm not saying anything new.

https://commitfest.postgresql.org/33/3031/
https://www.postgresql.org/message-id/flat/20200929061142.ga29...@paquier.xyz

(gdb) p pstate->build_barrier->phase 
Cannot access memory at address 0x7f82e0fa42f4

#1  0x7f13de34f801 in __GI_abort () at abort.c:79
#2  0x5638e6a16d28 in ExceptionalCondition 
(conditionName=conditionName@entry=0x5638e6b62850 "!pstate || 
BarrierPhase(&pstate->build_barrier) >= PHJ_BUILD_RUN",
errorType=errorType@entry=0x5638e6a6f00b "FailedAssertion", 
fileName=fileName@entry=0x5638e6b625be "nodeHash.c", 
lineNumber=lineNumber@entry=3305) at assert.c:69
#3  0x5638e678085b in ExecHashTableDetach (hashtable=0x5638e8e6ca88) at 
nodeHash.c:3305
#4  0x5638e6784656 in ExecShutdownHashJoin (node=node@entry=0x5638e8e57cb8) 
at nodeHashjoin.c:1400
#5  0x5638e67666d8 in ExecShutdownNode (node=0x5638e8e57cb8) at 
execProcnode.c:812
#6  ExecShutdownNode (node=0x5638e8e57cb8) at execProcnode.c:772
#7  0x5638e67cd5b1 in planstate_tree_walker 
(planstate=planstate@entry=0x5638e8e58580, walker=walker@entry=0x5638e6766680 
<ExecShutdownNode>, context=context@entry=0x0) at nodeFuncs.c:4009
#8  0x5638e67666b2 in ExecShutdownNode (node=0x5638e8e58580) at 
execProcnode.c:792
#9  ExecShutdownNode (node=0x5638e8e58580) at execProcnode.c:772
#10 0x5638e67cd5b1 in planstate_tree_walker 
(planstate=planstate@entry=0x5638e8e58418, walker=walker@entry=0x5638e6766680 
<ExecShutdownNode>, context=context@entry=0x0) at nodeFuncs.c:4009
#11 0x5638e67666b2 in ExecShutdownNode (node=0x5638e8e58418) at 
execProcnode.c:792
#12 ExecShutdownNode (node=node@entry=0x5638e8e58418) at execProcnode.c:772
#13 0x5638e675f518 in ExecutePlan (execute_once=<optimized out>, 
dest=0x5638e8df0058, direction=<optimized out>, numberTuples=0, 
sendTuples=<optimized out>, operation=CMD_SELECT,
use_parallel_mode=<optimized out>, planstate=0x5638e8e58418, 
estate=0x5638e8e57a10) at execMain.c:1658
#14 standard_ExecutorRun () at execMain.c:410
#15 0x5638e6763e0a in ParallelQueryMain (seg=0x5638e8d823d8, 
toc=0x7f13df4e9000) at execParallel.c:1493
#16 0x5638e663f6c7 in ParallelWorkerMain () at parallel.c:1495
#17 0x5638e68542e4 in StartBackgroundWorker () at bgworker.c:858
#18 0x5638e6860f53 in do_start_bgworker (rw=<optimized out>) at 
postmaster.c:5883
#19 maybe_start_bgworkers () at postmaster.c:6108
#20 0x5638e68619e5 in sigusr1_handler (postgres_signal_arg=<optimized out>) 
at postmaster.c:5272
#21 <signal handler called>
#22 0x7f13de425ff7 in __GI___select (nfds=nfds@entry=7, 
readfds=readfds@entry=0x7ffef03b8400, writefds=writefds@entry=0x0, 
exceptfds=exceptfds@entry=0x0, timeout=timeout@entry=0x7ffef03b8360)
at ../sysdeps/unix/sysv/linux/select.c:41
#23 0x5638e68620ce in ServerLoop () at postmaster.c:1765
#24 0x5638e6863bcc in PostmasterMain () at postmaster.c:1473
#25 0x5638e658fd00 in main (argc=8, argv=0x5638e8d54730) at main.c:198




Re: Parallel Full Hash Join

2021-09-30 Thread Thomas Munro
On Tue, Sep 21, 2021 at 9:29 AM Jaime Casanova
 wrote:
> Do you intend to commit 0001 soon? Especially if this applies to 14, it
> should be committed in the next few days.

Thanks for the reminder.  Yes, I'm looking at this now, and looking
into the crash of this patch set on CI:

https://cirrus-ci.com/task/5282889613967360

Unfortunately, cfbot is using very simple and old CI rules which don't
have a core dump analysis step on that OS.  :-(  (I have a big upgrade
to all this CI stuff in the pipeline to fix that, get full access to
all logs, go faster, and many other improvements, after learning a lot
of tricks about running these types of systems over the past year --
more soon.)

> 0003: i'm testing this now, not at a big scale but just to try to find
> problems

Thanks!




Re: Parallel Full Hash Join

2021-09-20 Thread Jaime Casanova
On Fri, Jul 30, 2021 at 04:34:34PM -0400, Melanie Plageman wrote:
> On Sat, Jul 10, 2021 at 9:13 AM vignesh C  wrote:
> >
> > On Mon, May 31, 2021 at 10:47 AM Greg Nancarrow  wrote:
> > >
> > > On Sat, Mar 6, 2021 at 12:31 PM Thomas Munro  
> > > wrote:
> > > >
> > > > On Tue, Mar 2, 2021 at 11:27 PM Thomas Munro  
> > > > wrote:
> > > > > On Fri, Feb 12, 2021 at 11:02 AM Melanie Plageman
> > > > >  wrote:
> > > > > > I just attached the diff.
> > > > >
> > > > > Squashed into one patch for the cfbot to chew on, with a few minor
> > > > > adjustments to a few comments.
> > > >
> > > > I did some more minor tidying of comments and naming.  It's been on my
> > > > to-do-list to update some phase names after commit 3048898e, and while
> > > > doing that I couldn't resist the opportunity to change DONE to FREE,
> > > > which somehow hurts my brain less, and makes much more obvious sense
> > > > after the bugfix in CF #3031 that splits DONE into two separate
> > > > phases.  It also pairs obviously with ALLOCATE.  I include a copy of
> > > > that bugfix here too as 0001, because I'll likely commit that first, so


Hi Thomas,

Do you intend to commit 0001 soon? Especially if this applies to 14, it
should be committed in the next few days.

> > > > I rebased the stack of patches that way.  0002 includes the renaming I
> > > > propose (master only).  Then 0003 is Melanie's patch, using the name
> > > > SCAN for the new match bit scan phase.  I've attached an updated
> > > > version of my "phase diagram" finger painting, to show how it looks
> > > > with these three patches.  "scan*" is new.
> > >

0002: my only concern is that this will cause unnecessary pain in
backpatching future code... but since I won't be doing that myself, I'll
leave it to the experts

0003: I'm testing this now, not at a big scale but just to try to find
problems

-- 
Jaime Casanova
Director de Servicios Profesionales
SystemGuards - Consultores de PostgreSQL




Re: Parallel Full Hash Join

2021-07-30 Thread Melanie Plageman
On Sat, Jul 10, 2021 at 9:13 AM vignesh C  wrote:
>
> On Mon, May 31, 2021 at 10:47 AM Greg Nancarrow  wrote:
> >
> > On Sat, Mar 6, 2021 at 12:31 PM Thomas Munro  wrote:
> > >
> > > On Tue, Mar 2, 2021 at 11:27 PM Thomas Munro  
> > > wrote:
> > > > On Fri, Feb 12, 2021 at 11:02 AM Melanie Plageman
> > > >  wrote:
> > > > > I just attached the diff.
> > > >
> > > > Squashed into one patch for the cfbot to chew on, with a few minor
> > > > adjustments to a few comments.
> > >
> > > I did some more minor tidying of comments and naming.  It's been on my
> > > to-do-list to update some phase names after commit 3048898e, and while
> > > doing that I couldn't resist the opportunity to change DONE to FREE,
> > > which somehow hurts my brain less, and makes much more obvious sense
> > > after the bugfix in CF #3031 that splits DONE into two separate
> > > phases.  It also pairs obviously with ALLOCATE.  I include a copy of
> > > that bugfix here too as 0001, because I'll likely commit that first, so
> > > I rebased the stack of patches that way.  0002 includes the renaming I
> > > propose (master only).  Then 0003 is Melanie's patch, using the name
> > > SCAN for the new match bit scan phase.  I've attached an updated
> > > version of my "phase diagram" finger painting, to show how it looks
> > > with these three patches.  "scan*" is new.
> >
> > Patches 0002, 0003 no longer apply to the master branch, seemingly
> > because of subsequent changes to pgstat, so need rebasing.
>
> I am changing the status to "Waiting on Author" as the patch does not
> apply on Head.
>
> Regards,
> Vignesh
>
>

Rebased patches attached. I will change status back to "Ready for Committer"
From 954bc84ca79e217c216c0ed8160853c49c19b609 Mon Sep 17 00:00:00 2001
From: Melanie Plageman 
Date: Wed, 4 Nov 2020 14:25:33 -0800
Subject: [PATCH v7 3/3] Parallel Hash {Full,Right} Outer Join.

Previously, parallel full and right outer joins were not supported due
to a potential deadlock hazard (see discussion).

For now, sidestep the problem by terminating parallelism for the
unmatched inner tuple scan. The last process to arrive at the barrier
prepares for the unmatched inner tuple scan in HJ_NEED_NEW_OUTER and
transitions to HJ_FILL_INNER, scanning the hash table and emitting
unmatched inner tuples.  Other processes are free to go and work on
other batches, if there are any.

To make parallel and serial hash join more consistent, change the serial
version to scan match bits in tuple chunk order, instead of doing it in
hash table bucket order.

Author: Melanie Plageman 
Reviewed-by: Thomas Munro 
Discussion: https://postgr.es/m/CA%2BhUKG%2BA6ftXPz4oe92%2Bx8Er%2BxpGZqto70-Q_ERwRaSyA%3DafNg%40mail.gmail.com
---
 src/backend/executor/nodeHash.c | 205 ++--
 src/backend/executor/nodeHashjoin.c |  59 +++
 src/backend/optimizer/path/joinpath.c   |  14 +-
 src/include/executor/hashjoin.h |  15 +-
 src/include/executor/nodeHash.h |   3 +
 src/test/regress/expected/join_hash.out |  56 ++-
 src/test/regress/sql/join_hash.sql  |  23 ++-
 7 files changed, 283 insertions(+), 92 deletions(-)

diff --git a/src/backend/executor/nodeHash.c b/src/backend/executor/nodeHash.c
index 3ba688e8e0..e7d420ee12 100644
--- a/src/backend/executor/nodeHash.c
+++ b/src/backend/executor/nodeHash.c
@@ -517,6 +517,7 @@ ExecHashTableCreate(HashState *state, List *hashOperators, List *hashCollations,
 		hashtable->spaceAllowed * SKEW_HASH_MEM_PERCENT / 100;
 	hashtable->chunks = NULL;
 	hashtable->current_chunk = NULL;
+	hashtable->current_chunk_idx = 0;
 	hashtable->parallel_state = state->parallel_state;
 	hashtable->area = state->ps.state->es_query_dsa;
 	hashtable->batches = NULL;
@@ -2070,16 +2071,72 @@ void
 ExecPrepHashTableForUnmatched(HashJoinState *hjstate)
 {
 	/*--
-	 * During this scan we use the HashJoinState fields as follows:
+	 * During this scan we use the HashJoinTable fields as follows:
 	 *
-	 * hj_CurBucketNo: next regular bucket to scan
-	 * hj_CurSkewBucketNo: next skew bucket (an index into skewBucketNums)
-	 * hj_CurTuple: last tuple returned, or NULL to start next bucket
+	 * current_chunk: current HashMemoryChunk to scan
+	 * current_chunk_idx: index in current HashMemoryChunk
 	 *--
 	 */
+	HashJoinTable hashtable = hjstate->hj_HashTable;
+
 	hjstate->hj_CurBucketNo = 0;
 	hjstate->hj_CurSkewBucketNo = 0;
 	hjstate->hj_CurTuple = NULL;
+	hashtable->current_chunk = hashtable->chunks;
+	hashtable->current_chunk_idx = 0;
+}
+
+/*
+ * ExecParallelPrepHashTableForUnmatched
+ *		set up for a series of ExecParallelScanHashTableForUnmatched calls
+ *		return true if this worker is elected to do the unmatched inner scan
+ */
+bool
+ExecParallelPrepHashTableForUnmatched(HashJoinState *hjstate)
+{
+	/*--
+	 * During this scan we use the ParallelHashJoinBatchAccessor fields for the
+	 * current batch as follows:
+	 *
+	 * current_chunk: current HashMemoryChunk to scan
+	 * 

Re: Parallel Full Hash Join

2021-07-10 Thread vignesh C
On Mon, May 31, 2021 at 10:47 AM Greg Nancarrow  wrote:
>
> On Sat, Mar 6, 2021 at 12:31 PM Thomas Munro  wrote:
> >
> > On Tue, Mar 2, 2021 at 11:27 PM Thomas Munro  wrote:
> > > On Fri, Feb 12, 2021 at 11:02 AM Melanie Plageman
> > >  wrote:
> > > > I just attached the diff.
> > >
> > > Squashed into one patch for the cfbot to chew on, with a few minor
> > > adjustments to a few comments.
> >
> > I did some more minor tidying of comments and naming.  It's been on my
> > to-do-list to update some phase names after commit 3048898e, and while
> > doing that I couldn't resist the opportunity to change DONE to FREE,
> > which somehow hurts my brain less, and makes much more obvious sense
> > after the bugfix in CF #3031 that splits DONE into two separate
> > phases.  It also pairs obviously with ALLOCATE.  I include a copy of
> > that bugfix here too as 0001, because I'll likely commit that first, so
> > I rebased the stack of patches that way.  0002 includes the renaming I
> > propose (master only).  Then 0003 is Melanie's patch, using the name
> > SCAN for the new match bit scan phase.  I've attached an updated
> > version of my "phase diagram" finger painting, to show how it looks
> > with these three patches.  "scan*" is new.
>
> Patches 0002, 0003 no longer apply to the master branch, seemingly
> because of subsequent changes to pgstat, so need rebasing.

I am changing the status to "Waiting on Author" as the patch does not
apply on Head.

Regards,
Vignesh




Re: Parallel Full Hash Join

2021-05-30 Thread Greg Nancarrow
On Sat, Mar 6, 2021 at 12:31 PM Thomas Munro  wrote:
>
> On Tue, Mar 2, 2021 at 11:27 PM Thomas Munro  wrote:
> > On Fri, Feb 12, 2021 at 11:02 AM Melanie Plageman
> >  wrote:
> > > I just attached the diff.
> >
> > Squashed into one patch for the cfbot to chew on, with a few minor
> > adjustments to a few comments.
>
> I did some more minor tidying of comments and naming.  It's been on my
> to-do-list to update some phase names after commit 3048898e, and while
> doing that I couldn't resist the opportunity to change DONE to FREE,
> which somehow hurts my brain less, and makes much more obvious sense
> after the bugfix in CF #3031 that splits DONE into two separate
> phases.  It also pairs obviously with ALLOCATE.  I include a copy of
> that bugfix here too as 0001, because I'll likely commit that first, so
> I rebased the stack of patches that way.  0002 includes the renaming I
> propose (master only).  Then 0003 is Melanie's patch, using the name
> SCAN for the new match bit scan phase.  I've attached an updated
> version of my "phase diagram" finger painting, to show how it looks
> with these three patches.  "scan*" is new.

Patches 0002, 0003 no longer apply to the master branch, seemingly
because of subsequent changes to pgstat, so need rebasing.

Regards,
Greg Nancarrow
Fujitsu Australia




Re: Parallel Full Hash Join

2021-04-06 Thread Zhihong Yu
On Tue, Apr 6, 2021 at 11:59 AM Melanie Plageman 
wrote:

> On Fri, Apr 2, 2021 at 3:06 PM Zhihong Yu  wrote:
> >
> > Hi,
> > For v6-0003-Parallel-Hash-Full-Right-Outer-Join.patch
> >
> > +* current_chunk_idx: index in current HashMemoryChunk
> >
> > The above comment seems to be better fit for
> ExecScanHashTableForUnmatched(), instead of
> ExecParallelPrepHashTableForUnmatched.
> > I wonder where current_chunk_idx should belong (considering the above
> comment and what the code does).
> >
> > +   while (hashtable->current_chunk_idx <
> hashtable->current_chunk->used)
> > ...
> > +   next = hashtable->current_chunk->next.unshared;
> > +   hashtable->current_chunk = next;
> > +   hashtable->current_chunk_idx = 0;
> >
> > Each time we advance to the next chunk, current_chunk_idx is reset. It
> seems current_chunk_idx can be placed inside chunk.
> > Maybe the consideration is that, with the current formation we save
> space by putting current_chunk_idx field at a higher level.
> > If that is the case, a comment should be added.
> >
>
> Thank you for the review. I think that moving the current_chunk_idx into
> the HashMemoryChunk would probably take up too much space.
>
> Other places that we loop through the tuples in the chunk, we are able
> to just keep a local idx, like here in
> ExecParallelHashIncreaseNumBuckets():
>
> case PHJ_GROW_BUCKETS_REINSERTING:
> ...
> while ((chunk = ExecParallelHashPopChunkQueue(hashtable,
> &chunk_s)))
> {
> size_t idx = 0;
>
> while (idx < chunk->used)
>
> but, since we cannot do that while also emitting tuples, I thought,
> let's just stash the index in the hashtable for use in serial hash join
> and the batch accessor for parallel hash join. A comment to this effect
> sounds good to me.
>

From the way HashJoinTable is used, I don't have a better idea w.r.t. the
location of current_chunk_idx.
It is not worth introducing another level of mapping between HashJoinTable
and the chunk index.

So the current formation is fine with an additional comment in
ParallelHashJoinBatchAccessor (the current comment doesn't explicitly
mention current_chunk_idx).

Cheers


Re: Parallel Full Hash Join

2021-04-06 Thread Melanie Plageman
On Fri, Apr 2, 2021 at 3:06 PM Zhihong Yu  wrote:
>
> Hi,
> For v6-0003-Parallel-Hash-Full-Right-Outer-Join.patch
>
> +* current_chunk_idx: index in current HashMemoryChunk
>
> The above comment seems to be better fit for ExecScanHashTableForUnmatched(), 
> instead of ExecParallelPrepHashTableForUnmatched.
> I wonder where current_chunk_idx should belong (considering the above comment 
> and what the code does).
>
> +   while (hashtable->current_chunk_idx < hashtable->current_chunk->used)
> ...
> +   next = hashtable->current_chunk->next.unshared;
> +   hashtable->current_chunk = next;
> +   hashtable->current_chunk_idx = 0;
>
> Each time we advance to the next chunk, current_chunk_idx is reset. It seems 
> current_chunk_idx can be placed inside chunk.
> Maybe the consideration is that, with the current formation we save space by 
> putting current_chunk_idx field at a higher level.
> If that is the case, a comment should be added.
>

Thank you for the review. I think that moving the current_chunk_idx into
the HashMemoryChunk would probably take up too much space.

Other places that we loop through the tuples in the chunk, we are able
to just keep a local idx, like here in
ExecParallelHashIncreaseNumBuckets():

case PHJ_GROW_BUCKETS_REINSERTING:
...
while ((chunk = ExecParallelHashPopChunkQueue(hashtable, &chunk_s)))
{
size_t		idx = 0;

while (idx < chunk->used)

but, since we cannot do that while also emitting tuples, I thought,
let's just stash the index in the hashtable for use in serial hash join
and the batch accessor for parallel hash join. A comment to this effect
sounds good to me.
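To make the resumable-scan shape concrete outside of PostgreSQL, here is a minimal, self-contained C sketch. All struct and function names below are invented for illustration; only the idea of stashing current_chunk/current_chunk_idx in the table-level struct comes from the patch. Because the scan position lives in the table struct rather than in a local variable, the function can return one unmatched value at a time (as the executor does when emitting tuples) and pick up where it left off on the next call.

```c
#include <assert.h>
#include <stdbool.h>
#include <stddef.h>

#define CHUNK_CAP 4

/* Toy stand-ins for HashMemoryChunk and HashJoinTable. */
typedef struct Chunk
{
	struct Chunk *next;
	int			used;				/* number of slots filled */
	int			values[CHUNK_CAP];
	bool		matched[CHUNK_CAP];
} Chunk;

typedef struct Table
{
	Chunk	   *chunks;				/* head of chunk list */
	Chunk	   *current_chunk;		/* scan state: chunk being scanned */
	int			current_chunk_idx;	/* scan state: next slot within it */
} Table;

static void
prep_scan(Table *t)
{
	t->current_chunk = t->chunks;
	t->current_chunk_idx = 0;
}

/*
 * Return the next unmatched value, resuming where the previous call left
 * off; returns false once the scan is exhausted.
 */
static bool
next_unmatched(Table *t, int *out)
{
	while (t->current_chunk != NULL)
	{
		Chunk	   *c = t->current_chunk;

		while (t->current_chunk_idx < c->used)
		{
			int			idx = t->current_chunk_idx++;

			if (!c->matched[idx])
			{
				*out = c->values[idx];
				return true;	/* pause here; state survives in Table */
			}
		}
		/* advance to the next chunk and reset the intra-chunk index */
		t->current_chunk = c->next;
		t->current_chunk_idx = 0;
	}
	return false;
}
```

A local index (as in the REINSERTING loop quoted above) would be lost between calls, which is exactly why the patch hoists it into the hashtable / batch accessor.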




Re: Parallel Full Hash Join

2021-04-02 Thread Zhihong Yu
Hi,
For v6-0003-Parallel-Hash-Full-Right-Outer-Join.patch

+* current_chunk_idx: index in current HashMemoryChunk

The above comment seems to be a better fit
for ExecScanHashTableForUnmatched() than
for ExecParallelPrepHashTableForUnmatched().
I wonder where current_chunk_idx should belong (considering the above
comment and what the code does).

+   while (hashtable->current_chunk_idx <
hashtable->current_chunk->used)
...
+   next = hashtable->current_chunk->next.unshared;
+   hashtable->current_chunk = next;
+   hashtable->current_chunk_idx = 0;

Each time we advance to the next chunk, current_chunk_idx is reset. It
seems current_chunk_idx could instead be placed inside the chunk.
Maybe the consideration is that, with the current formation, we save space
by putting the current_chunk_idx field at a higher level.
If that is the case, a comment should be added.

Cheers

On Fri, Mar 5, 2021 at 5:31 PM Thomas Munro  wrote:

> On Tue, Mar 2, 2021 at 11:27 PM Thomas Munro 
> wrote:
> > On Fri, Feb 12, 2021 at 11:02 AM Melanie Plageman
> >  wrote:
> > > I just attached the diff.
> >
> > Squashed into one patch for the cfbot to chew on, with a few minor
> > adjustments to a few comments.
>
> I did some more minor tidying of comments and naming.  It's been on my
> to-do-list to update some phase names after commit 3048898e, and while
> doing that I couldn't resist the opportunity to change DONE to FREE,
> which somehow hurts my brain less, and makes much more obvious sense
> after the bugfix in CF #3031 that splits DONE into two separate
> phases.  It also pairs obviously with ALLOCATE.  I include a copy of
> that bugfix here too as 0001, because I'll likely commit that first, so
> I rebased the stack of patches that way.  0002 includes the renaming I
> propose (master only).  Then 0003 is Melanie's patch, using the name
> SCAN for the new match bit scan phase.  I've attached an updated
> version of my "phase diagram" finger painting, to show how it looks
> with these three patches.  "scan*" is new.
>


Re: Parallel Full Hash Join

2021-04-02 Thread Melanie Plageman
On Fri, Mar 5, 2021 at 8:31 PM Thomas Munro  wrote:
>
> On Tue, Mar 2, 2021 at 11:27 PM Thomas Munro  wrote:
> > On Fri, Feb 12, 2021 at 11:02 AM Melanie Plageman
> >  wrote:
> > > I just attached the diff.
> >
> > Squashed into one patch for the cfbot to chew on, with a few minor
> > adjustments to a few comments.
>
> I did some more minor tidying of comments and naming.  It's been on my
> to-do-list to update some phase names after commit 3048898e, and while
> doing that I couldn't resist the opportunity to change DONE to FREE,
> which somehow hurts my brain less, and makes much more obvious sense
> after the bugfix in CF #3031 that splits DONE into two separate
> phases.  It also pairs obviously with ALLOCATE.  I include a copy of
> that bugfix here too as 0001, because I'll likely commit that first, so
> I rebased the stack of patches that way.  0002 includes the renaming I
> propose (master only).  Then 0003 is Melanie's patch, using the name
> SCAN for the new match bit scan phase.  I've attached an updated
> version of my "phase diagram" finger painting, to show how it looks
> with these three patches.  "scan*" is new.

Feedback on
v6-0002-Improve-the-naming-of-Parallel-Hash-Join-phases.patch

I like renaming DONE to FREE and ALLOCATE to REALLOCATE in the grow
barriers. FREE only makes sense for the Build barrier if you keep the
added PHJ_BUILD_RUN phase, though, I assume you would change this patch
if you decided not to add the new build barrier phase.

I like the addition of the asterisks to indicate a phase is executed by
a single arbitrary process. I was thinking, shall we add one of these to
HJ_FILL_INNER since it is only done by one process in parallel right and
full hash join? Maybe that's confusing because serial hash join uses
that state machine too, though. Maybe **? Maybe we should invent a
complicated symbolic language :)

One tiny, random, unimportant thing: The function prototype for
ExecParallelHashJoinPartitionOuter() calls its parameter "node" and, in
the definition, it is called "hjstate". This feels like a good patch to
throw in that tiny random change to make the name the same.

static void ExecParallelHashJoinPartitionOuter(HashJoinState *node);

static void
ExecParallelHashJoinPartitionOuter(HashJoinState *hjstate)




Re: Parallel Full Hash Join

2021-03-02 Thread Thomas Munro
On Fri, Feb 12, 2021 at 11:02 AM Melanie Plageman
 wrote:
> I just attached the diff.

Squashed into one patch for the cfbot to chew on, with a few minor
adjustments to a few comments.
From 87c74af25940b0fc85186b0defe6e21ea2324c28 Mon Sep 17 00:00:00 2001
From: Melanie Plageman 
Date: Wed, 4 Nov 2020 14:25:33 -0800
Subject: [PATCH v5] Parallel Hash {Full,Right} Outer Join.

Previously, parallel full and right outer joins were not supported due
to a potential deadlock hazard posed by allowing workers to wait on a
barrier after barrier participants have started emitting tuples. More
details on the deadlock hazard can be found in the referenced
discussion.

For now, sidestep the problem by terminating parallelism for the
unmatched inner tuple scan. The last process to arrive at the barrier
prepares for the unmatched inner tuple scan in HJ_NEED_NEW_OUTER and
transitions to HJ_FILL_INNER, scanning the hash table and emitting
unmatched inner tuples.

To align parallel and serial hash join, change
ExecScanHashTableForUnmatched() to also scan HashMemoryChunks for the
unmatched tuple scan instead of accessing tuples through the hash table
buckets.

Author: Melanie Plageman 
Author: Thomas Munro 
Discussion: https://postgr.es/m/CA%2BhUKG%2BA6ftXPz4oe92%2Bx8Er%2BxpGZqto70-Q_ERwRaSyA%3DafNg%40mail.gmail.com
---
 src/backend/executor/nodeHash.c | 205 ++--
 src/backend/executor/nodeHashjoin.c |  58 +++
 src/backend/optimizer/path/joinpath.c   |  14 +-
 src/include/executor/hashjoin.h |  15 +-
 src/include/executor/nodeHash.h |   3 +
 src/test/regress/expected/join_hash.out |  56 ++-
 src/test/regress/sql/join_hash.sql  |  23 ++-
 7 files changed, 283 insertions(+), 91 deletions(-)

diff --git a/src/backend/executor/nodeHash.c b/src/backend/executor/nodeHash.c
index c5f2d1d22b..6305688efd 100644
--- a/src/backend/executor/nodeHash.c
+++ b/src/backend/executor/nodeHash.c
@@ -510,6 +510,7 @@ ExecHashTableCreate(HashState *state, List *hashOperators, List *hashCollations,
 		hashtable->spaceAllowed * SKEW_HASH_MEM_PERCENT / 100;
 	hashtable->chunks = NULL;
 	hashtable->current_chunk = NULL;
+	hashtable->current_chunk_idx = 0;
 	hashtable->parallel_state = state->parallel_state;
 	hashtable->area = state->ps.state->es_query_dsa;
 	hashtable->batches = NULL;
@@ -2046,16 +2047,72 @@ void
 ExecPrepHashTableForUnmatched(HashJoinState *hjstate)
 {
 	/*--
-	 * During this scan we use the HashJoinState fields as follows:
+	 * During this scan we use the HashJoinTable fields as follows:
 	 *
-	 * hj_CurBucketNo: next regular bucket to scan
-	 * hj_CurSkewBucketNo: next skew bucket (an index into skewBucketNums)
-	 * hj_CurTuple: last tuple returned, or NULL to start next bucket
+	 * current_chunk: current HashMemoryChunk to scan
+	 * current_chunk_idx: index in current HashMemoryChunk
 	 *--
 	 */
+	HashJoinTable hashtable = hjstate->hj_HashTable;
+
 	hjstate->hj_CurBucketNo = 0;
 	hjstate->hj_CurSkewBucketNo = 0;
 	hjstate->hj_CurTuple = NULL;
+	hashtable->current_chunk = hashtable->chunks;
+	hashtable->current_chunk_idx = 0;
+}
+
+/*
+ * ExecParallelPrepHashTableForUnmatched
+ *		set up for a series of ExecParallelScanHashTableForUnmatched calls
+ *		return true if this worker is elected to do the unmatched inner scan
+ */
+bool
+ExecParallelPrepHashTableForUnmatched(HashJoinState *hjstate)
+{
+	/*--
+	 * During this scan we use the ParallelHashJoinBatchAccessor fields for the
+	 * current batch as follows:
+	 *
+	 * current_chunk: current HashMemoryChunk to scan
+	 * current_chunk_idx: index in current HashMemoryChunk
+	 *--
+	 */
+	HashJoinTable hashtable = hjstate->hj_HashTable;
+	int			curbatch = hashtable->curbatch;
+	ParallelHashJoinBatchAccessor *batch_accessor = &hashtable->batches[curbatch];
+	ParallelHashJoinBatch *batch = hashtable->batches[curbatch].shared;
+	bool		last = false;
+
+	hjstate->hj_CurBucketNo = 0;
+	hjstate->hj_CurSkewBucketNo = 0;
+	hjstate->hj_CurTuple = NULL;
+	if (curbatch < 0)
+		return false;
+	last = BarrierArriveAndDetachExceptLast(&batch->batch_barrier);
+	if (!last)
+	{
+		hashtable->batches[hashtable->curbatch].done = true;
+		/* Make sure any temporary files are closed. */
+		sts_end_parallel_scan(hashtable->batches[curbatch].inner_tuples);
+		sts_end_parallel_scan(hashtable->batches[curbatch].outer_tuples);
+
+		/*
+		 * Track the largest batch we've been attached to.  Though each
+		 * backend might see a different subset of batches, explain.c will
+		 * scan the results from all backends to find the largest value.
+		 */
+		hashtable->spacePeak =
+			Max(hashtable->spacePeak, batch->size + sizeof(dsa_pointer_atomic) * hashtable->nbuckets);
+		hashtable->curbatch = -1;
+	}
+	else
+	{
+		batch_accessor->shared_chunk = batch->chunks;
+		batch_accessor->current_chunk = dsa_get_address(hashtable->area, batch_accessor->shared_chunk);
+		batch_accessor->current_chunk_idx = 0;
+	}
+	return last;
 }
 
 /*
@@ -2069,60 

Re: Parallel Full Hash Join

2021-02-11 Thread Melanie Plageman
On Tue, Dec 29, 2020 at 03:28:12PM +1300, Thomas Munro wrote:
> I had some feedback I meant to
> post in November but didn't get around to:
> 
>   *  PHJ_BATCH_PROBING   -- all probe
> - *  PHJ_BATCH_DONE   -- end
> +
> + *  PHJ_BATCH_DONE   -- queries not requiring inner fill done
> + *  PHJ_BATCH_FILL_INNER_DONE -- inner fill completed, all queries done
> 
> Would it be better/tidier to keep _DONE as the final phase?  That is,
> to switch around these two final phases.  Or does that make it too
> hard to coordinate the detach-and-cleanup logic?

I updated this to use your suggestion. My rationale for having
PHJ_BATCH_DONE and then PHJ_BATCH_FILL_INNER_DONE was that, for a worker
attaching to the batch for the first time, it might be confusing that it
is in the PHJ_BATCH_FILL_INNER state (not the DONE state) and yet that
worker still just detaches and moves on. It didn't seem intuitive.
Anyway, I think that is all sort of confusing and unnecessary. I changed
it to PHJ_BATCH_FILLING_INNER -- then when a worker who hasn't ever been
attached to this batch before attaches, it will be in the
PHJ_BATCH_FILLING_INNER phase, which it cannot help with and it will
detach and move on.

> 
> +/*
> + * ExecPrepHashTableForUnmatched
> + * set up for a series of ExecScanHashTableForUnmatched calls
> + * return true if this worker is elected to do the
> unmatched inner scan
> + */
> +bool
> +ExecParallelPrepHashTableForUnmatched(HashJoinState *hjstate)
> 
> Comment name doesn't match function name.

Updated -- and a few other comment updates too.

I just attached the diff.
From 213c36f9e125f52eb6731005d5dcdadca73a Mon Sep 17 00:00:00 2001
From: Melanie Plageman 
Date: Thu, 11 Feb 2021 16:31:37 -0500
Subject: [PATCH v4 2/2] Update comments and phase naming

---
 src/backend/executor/nodeHash.c | 19 +--
 src/backend/executor/nodeHashjoin.c |  4 ++--
 src/include/executor/hashjoin.h |  4 ++--
 3 files changed, 17 insertions(+), 10 deletions(-)

diff --git a/src/backend/executor/nodeHash.c b/src/backend/executor/nodeHash.c
index dd8d12203a..6305688efd 100644
--- a/src/backend/executor/nodeHash.c
+++ b/src/backend/executor/nodeHash.c
@@ -2047,11 +2047,10 @@ void
 ExecPrepHashTableForUnmatched(HashJoinState *hjstate)
 {
 	/*--
-	 * During this scan we use the HashJoinState fields as follows:
+	 * During this scan we use the HashJoinTable fields as follows:
 	 *
-	 * hj_CurBucketNo: next regular bucket to scan
-	 * hj_CurSkewBucketNo: next skew bucket (an index into skewBucketNums)
-	 * hj_CurTuple: last tuple returned, or NULL to start next bucket
+	 * current_chunk: current HashMemoryChunk to scan
+	 * current_chunk_idx: index in current HashMemoryChunk
 	 *--
 	 */
 	HashJoinTable hashtable = hjstate->hj_HashTable;
@@ -2064,13 +2063,21 @@ ExecPrepHashTableForUnmatched(HashJoinState *hjstate)
 }
 
 /*
- * ExecPrepHashTableForUnmatched
- *		set up for a series of ExecScanHashTableForUnmatched calls
+ * ExecParallelPrepHashTableForUnmatched
+ *		set up for a series of ExecParallelScanHashTableForUnmatched calls
  *		return true if this worker is elected to do the unmatched inner scan
  */
 bool
 ExecParallelPrepHashTableForUnmatched(HashJoinState *hjstate)
 {
+	/*--
+	 * During this scan we use the ParallelHashJoinBatchAccessor fields for the
+	 * current batch as follows:
+	 *
+	 * current_chunk: current HashMemoryChunk to scan
+	 * current_chunk_idx: index in current HashMemoryChunk
+	 *--
+	 */
 	HashJoinTable hashtable = hjstate->hj_HashTable;
 	int			curbatch = hashtable->curbatch;
	ParallelHashJoinBatchAccessor *batch_accessor = &hashtable->batches[curbatch];
diff --git a/src/backend/executor/nodeHashjoin.c b/src/backend/executor/nodeHashjoin.c
index 37b49369aa..40c483cd0c 100644
--- a/src/backend/executor/nodeHashjoin.c
+++ b/src/backend/executor/nodeHashjoin.c
@@ -1183,10 +1183,10 @@ ExecParallelHashJoinNewBatch(HashJoinState *hjstate)
 	ExecParallelHashTableSetCurrentBatch(hashtable, batchno);
 	sts_begin_parallel_scan(hashtable->batches[batchno].outer_tuples);
 	return true;
-case PHJ_BATCH_DONE:
+case PHJ_BATCH_FILLING_INNER:
 	/* Fall through. */
 
-case PHJ_BATCH_FILL_INNER_DONE:
+case PHJ_BATCH_DONE:
 
 	/*
 	 * Already done.  Detach and go around again (if any
diff --git a/src/include/executor/hashjoin.h b/src/include/executor/hashjoin.h
index 634a212142..66fea4ac58 100644
--- a/src/include/executor/hashjoin.h
+++ b/src/include/executor/hashjoin.h
@@ -274,8 +274,8 @@ typedef struct ParallelHashJoinState
 #define PHJ_BATCH_ALLOCATING			1
 #define PHJ_BATCH_LOADING				2
 #define PHJ_BATCH_PROBING				3
-#define PHJ_BATCH_DONE			4
-#define PHJ_BATCH_FILL_INNER_DONE		5
+#define PHJ_BATCH_FILLING_INNER			4
+#define PHJ_BATCH_DONE		5
 
 /* The phases of batch growth while hashing, for grow_batches_barrier. */
 #define PHJ_GROW_BATCHES_ELECTING		0
-- 
2.25.1



Re: Parallel Full Hash Join

2020-12-28 Thread Thomas Munro
On Mon, Dec 28, 2020 at 9:49 PM Masahiko Sawada  wrote:
> On Thu, Nov 5, 2020 at 7:34 AM Melanie Plageman
>  wrote:
> > I've attached a patch with the corrections I mentioned upthread.
> > I've gone ahead and run pgindent, though, I can't say that I'm very
> > happy with the result.
> >
> > I'm still not quite happy with the name
> > BarrierArriveAndDetachExceptLast(). It's so literal. As you said, there
> > probably isn't a nice name for this concept, since it is a function with
> > the purpose of terminating parallelism.
>
> You sent in your patch, v3-0001-Support-Parallel-FOJ-and-ROJ.patch to
> pgsql-hackers on Nov 5, but you did not post it to the next
> CommitFest[1].  If this was intentional, then you need to take no
> action.  However, if you want your patch to be reviewed as part of the
> upcoming CommitFest, then you need to add it yourself before
> 2021-01-01 AOE[2]. Also, rebasing to the current HEAD may be required
> as almost two months have passed since this patch was submitted. Thanks
> for your contributions.

Thanks for this reminder Sawada-san.  I had some feedback I meant to
post in November but didn't get around to:

+bool
+BarrierArriveAndDetachExceptLast(Barrier *barrier)

I committed this part (7888b099).  I've attached a rebase of the rest
of Melanie's v3 patch.

+WAIT_EVENT_HASH_BATCH_PROBE,

That new wait event isn't needed (we can't and don't wait).

  *  PHJ_BATCH_PROBING   -- all probe
- *  PHJ_BATCH_DONE   -- end
+
+ *  PHJ_BATCH_DONE   -- queries not requiring inner fill done
+ *  PHJ_BATCH_FILL_INNER_DONE -- inner fill completed, all queries done

Would it be better/tidier to keep _DONE as the final phase?  That is,
to switch around these two final phases.  Or does that make it too
hard to coordinate the detach-and-cleanup logic?

+/*
+ * ExecPrepHashTableForUnmatched
+ * set up for a series of ExecScanHashTableForUnmatched calls
+ * return true if this worker is elected to do the
unmatched inner scan
+ */
+bool
+ExecParallelPrepHashTableForUnmatched(HashJoinState *hjstate)

Comment name doesn't match function name.
From 9199bfcfa84acbcfeb9a8d3c21962096c7ff645c Mon Sep 17 00:00:00 2001
From: Thomas Munro 
Date: Thu, 5 Nov 2020 16:20:24 +1300
Subject: [PATCH v4 1/2] Parallel Hash Full/Right Join.

Previously we allowed PHJ only for inner and left outer joins.
Share the hash table match bits between backends, so we can also
handle full and right joins.  In order to do that without introducing
any deadlock risks, for now we drop down to a single process for the
unmatched scan at the end of each batch.  Other processes detach and
look for another batch to help with.  If there aren't any more batches,
they'll finish the hash join early, making work distribution suboptimal.
Improving that might require bigger executor changes.

Also switch the unmatched tuple scan to work in memory-chunk order,
rather than bucket order.  This prepares for potential later
improvements that would use chunks as parallel grain, and seems to be a
little cache-friendlier than the bucket-scan scheme in the
meantime.

Author: Melanie Plageman 
Reviewed-by: Thomas Munro 
Discussion: https://postgr.es/m/CA%2BhUKG%2BA6ftXPz4oe92%2Bx8Er%2BxpGZqto70-Q_ERwRaSyA%3DafNg%40mail.gmail.com
---
 src/backend/executor/nodeHash.c | 190 ++--
 src/backend/executor/nodeHashjoin.c |  61 
 src/backend/optimizer/path/joinpath.c   |  14 +-
 src/include/executor/hashjoin.h |  15 +-
 src/include/executor/nodeHash.h |   3 +
 src/test/regress/expected/join_hash.out |  56 ++-
 src/test/regress/sql/join_hash.sql  |  23 ++-
 7 files changed, 274 insertions(+), 88 deletions(-)

diff --git a/src/backend/executor/nodeHash.c b/src/backend/executor/nodeHash.c
index ea69eeb2a1..36cc752163 100644
--- a/src/backend/executor/nodeHash.c
+++ b/src/backend/executor/nodeHash.c
@@ -510,6 +510,7 @@ ExecHashTableCreate(HashState *state, List *hashOperators, List *hashCollations,
 		hashtable->spaceAllowed * SKEW_HASH_MEM_PERCENT / 100;
 	hashtable->chunks = NULL;
 	hashtable->current_chunk = NULL;
+	hashtable->current_chunk_idx = 0;
 	hashtable->parallel_state = state->parallel_state;
 	hashtable->area = state->ps.state->es_query_dsa;
 	hashtable->batches = NULL;
@@ -2053,9 +2054,58 @@ ExecPrepHashTableForUnmatched(HashJoinState *hjstate)
 	 * hj_CurTuple: last tuple returned, or NULL to start next bucket
 	 *--
 	 */
+	HashJoinTable hashtable = hjstate->hj_HashTable;
+
 	hjstate->hj_CurBucketNo = 0;
 	hjstate->hj_CurSkewBucketNo = 0;
 	hjstate->hj_CurTuple = NULL;
+	hashtable->current_chunk = hashtable->chunks;
+	hashtable->current_chunk_idx = 0;
+}
+
+/*
+ * ExecPrepHashTableForUnmatched
+ *		set up for a series of ExecScanHashTableForUnmatched calls
+ *		return true if this worker is elected to do the unmatched inner scan
+ */
+bool
+ExecParallelPrepHashTableForUnmatched(HashJoinState *hjstate)
+{
+	HashJoinTable 

Re: Parallel Full Hash Join

2020-12-28 Thread Masahiko Sawada
Hi Melanie,

On Thu, Nov 5, 2020 at 7:34 AM Melanie Plageman
 wrote:
>
> I've attached a patch with the corrections I mentioned upthread.
> I've gone ahead and run pgindent, though, I can't say that I'm very
> happy with the result.
>
> I'm still not quite happy with the name
> BarrierArriveAndDetachExceptLast(). It's so literal. As you said, there
> probably isn't a nice name for this concept, since it is a function with
> the purpose of terminating parallelism.

You sent in your patch, v3-0001-Support-Parallel-FOJ-and-ROJ.patch to
pgsql-hackers on Nov 5, but you did not post it to the next
CommitFest[1].  If this was intentional, then you need to take no
action.  However, if you want your patch to be reviewed as part of the
upcoming CommitFest, then you need to add it yourself before
2021-01-01 AOE[2]. Also, rebasing to the current HEAD may be required
as almost two months have passed since this patch was submitted. Thanks
for your contributions.

Regards,

[1] https://commitfest.postgresql.org/31/
[2] https://en.wikipedia.org/wiki/Anywhere_on_Earth

Regards,

-- 
Masahiko Sawada
EnterpriseDB:  https://www.enterprisedb.com/




Re: Parallel Full Hash Join

2020-11-04 Thread Melanie Plageman
I've attached a patch with the corrections I mentioned upthread.
I've gone ahead and run pgindent, though, I can't say that I'm very
happy with the result.

I'm still not quite happy with the name
BarrierArriveAndDetachExceptLast(). It's so literal. As you said, there
probably isn't a nice name for this concept, since it is a function with
the purpose of terminating parallelism.

Regards,
Melanie (Microsoft)
From 0b13b62a8aac071393ac65b19dc1ec86daa967f2 Mon Sep 17 00:00:00 2001
From: Melanie Plageman 
Date: Wed, 4 Nov 2020 14:25:33 -0800
Subject: [PATCH v3] Support Parallel FOJ and ROJ

Previously, parallel full and right outer joins were not supported due to
a potential deadlock hazard posed by allowing workers to wait on a
barrier after barrier participants have started emitting tuples. More
details on the deadlock hazard can be found in the thread [1].

For now, sidestep the problem by terminating parallelism for the
unmatched inner tuple scan. The last worker to arrive at the barrier
preps for the unmatched inner tuple scan in HJ_NEED_NEW_OUTER and
transitions to HJ_FILL_INNER, scanning the hash table and emitting
unmatched inner tuples.

To align parallel and serial hash join, change
ExecScanHashTableForUnmatched() to also scan HashMemoryChunks for the
unmatched tuple scan instead of accessing tuples through the hash table
buckets.

[1] https://www.postgresql.org/message-id/flat/CA%2BhUKG%2BA6ftXPz4oe92%2Bx8Er%2BxpGZqto70-Q_ERwRaSyA%3DafNg%40mail.gmail.com
---
 src/backend/executor/nodeHash.c | 190 ++--
 src/backend/executor/nodeHashjoin.c |  61 
 src/backend/optimizer/path/joinpath.c   |  14 +-
 src/backend/postmaster/pgstat.c |   3 +
 src/backend/storage/ipc/barrier.c   |  21 +++
 src/include/executor/hashjoin.h |  15 +-
 src/include/executor/nodeHash.h |   3 +
 src/include/pgstat.h|   1 +
 src/include/storage/barrier.h   |   1 +
 src/test/regress/expected/join_hash.out |  56 ++-
 src/test/regress/sql/join_hash.sql  |  23 ++-
 11 files changed, 300 insertions(+), 88 deletions(-)

diff --git a/src/backend/executor/nodeHash.c b/src/backend/executor/nodeHash.c
index ea69eeb2a1..36cc752163 100644
--- a/src/backend/executor/nodeHash.c
+++ b/src/backend/executor/nodeHash.c
@@ -510,6 +510,7 @@ ExecHashTableCreate(HashState *state, List *hashOperators, List *hashCollations,
 		hashtable->spaceAllowed * SKEW_HASH_MEM_PERCENT / 100;
 	hashtable->chunks = NULL;
 	hashtable->current_chunk = NULL;
+	hashtable->current_chunk_idx = 0;
 	hashtable->parallel_state = state->parallel_state;
 	hashtable->area = state->ps.state->es_query_dsa;
 	hashtable->batches = NULL;
@@ -2053,9 +2054,58 @@ ExecPrepHashTableForUnmatched(HashJoinState *hjstate)
 	 * hj_CurTuple: last tuple returned, or NULL to start next bucket
 	 *--
 	 */
+	HashJoinTable hashtable = hjstate->hj_HashTable;
+
 	hjstate->hj_CurBucketNo = 0;
 	hjstate->hj_CurSkewBucketNo = 0;
 	hjstate->hj_CurTuple = NULL;
+	hashtable->current_chunk = hashtable->chunks;
+	hashtable->current_chunk_idx = 0;
+}
+
+/*
+ * ExecPrepHashTableForUnmatched
+ *		set up for a series of ExecScanHashTableForUnmatched calls
+ *		return true if this worker is elected to do the unmatched inner scan
+ */
+bool
+ExecParallelPrepHashTableForUnmatched(HashJoinState *hjstate)
+{
+	HashJoinTable hashtable = hjstate->hj_HashTable;
+	int			curbatch = hashtable->curbatch;
+	ParallelHashJoinBatchAccessor *batch_accessor = &hashtable->batches[curbatch];
+	ParallelHashJoinBatch *batch = hashtable->batches[curbatch].shared;
+	bool		last = false;
+
+	hjstate->hj_CurBucketNo = 0;
+	hjstate->hj_CurSkewBucketNo = 0;
+	hjstate->hj_CurTuple = NULL;
+	if (curbatch < 0)
+		return false;
+	last = BarrierArriveAndDetachExceptLast(&batch->batch_barrier);
+	if (!last)
+	{
+		hashtable->batches[hashtable->curbatch].done = true;
+		/* Make sure any temporary files are closed. */
+		sts_end_parallel_scan(hashtable->batches[curbatch].inner_tuples);
+		sts_end_parallel_scan(hashtable->batches[curbatch].outer_tuples);
+
+		/*
+		 * Track the largest batch we've been attached to.  Though each
+		 * backend might see a different subset of batches, explain.c will
+		 * scan the results from all backends to find the largest value.
+		 */
+		hashtable->spacePeak =
+			Max(hashtable->spacePeak, batch->size + sizeof(dsa_pointer_atomic) * hashtable->nbuckets);
+		hashtable->curbatch = -1;
+	}
+	else
+	{
+		batch_accessor->shared_chunk = batch->chunks;
+		batch_accessor->current_chunk = dsa_get_address(hashtable->area, batch_accessor->shared_chunk);
+		batch_accessor->current_chunk_idx = 0;
+	}
+	return last;
 }
 
 /*
@@ -2069,60 +2119,110 @@ ExecPrepHashTableForUnmatched(HashJoinState *hjstate)
 bool
 ExecScanHashTableForUnmatched(HashJoinState *hjstate, ExprContext *econtext)
 {
+	HashMemoryChunk next;
 	HashJoinTable hashtable = hjstate->hj_HashTable;
-	HashJoinTuple hashTuple = hjstate->hj_CurTuple;
 
-	for (;;)
+	while 

Re: Parallel Full Hash Join

2020-09-29 Thread Melanie Plageman
On Mon, Sep 21, 2020 at 8:34 PM Thomas Munro  wrote:

> On Tue, Sep 22, 2020 at 8:49 AM Melanie Plageman
>  wrote:
> > On Wed, Sep 11, 2019 at 11:23 PM Thomas Munro 
> wrote:
>
> I took it for a very quick spin and saw simple cases working nicely,
> but TPC-DS queries 51 and 97 (which contain full joins) couldn't be
> convinced to use it.  Hmm.
>

Thanks for taking a look, Thomas!

Both query 51 and query 97 have full outer joins of two CTEs, each of
which is an aggregate query.

During planning when constructing the joinrel and choosing paths, in
hash_inner_and_outer(), we don't consider parallel hash join paths
because the outerrel and innerrel do not have
partial_pathlists.

This code

  if (joinrel->consider_parallel &&
save_jointype != JOIN_UNIQUE_OUTER &&
outerrel->partial_pathlist != NIL &&
bms_is_empty(joinrel->lateral_relids))

gates the code to generate partial paths for hash join.

My understanding of this is that if the inner and outer rels don't have
partial paths, then they can't be executed in parallel, so the join
cannot be executed in parallel either.

For the two TPC-DS queries, even if they use parallel aggs, the finalize
agg will have to be done by a single worker, so I don't think they could
be joined with a parallel hash join.
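As a reading aid, the gating condition quoted above can be restated as a small standalone predicate. The function name and boolean parameters here are simplifications invented for this sketch; the real code tests outerrel->partial_pathlist and joinrel->lateral_relids directly.

```c
#include <assert.h>
#include <stdbool.h>

/* Minimal stand-in for the relevant join types. */
typedef enum
{
	JOIN_INNER,
	JOIN_FULL,
	JOIN_UNIQUE_OUTER
} JoinType;

/*
 * Simplified restatement of the gate in hash_inner_and_outer():
 * partial (parallel) hash join paths are only generated when the join
 * rel may use parallelism, the join isn't JOIN_UNIQUE_OUTER, the outer
 * rel has at least one partial path, and there are no lateral
 * dependencies.
 */
static bool
may_consider_parallel_hashjoin(bool joinrel_consider_parallel,
							   JoinType save_jointype,
							   bool outer_has_partial_paths,
							   bool lateral_relids_empty)
{
	return joinrel_consider_parallel &&
		save_jointype != JOIN_UNIQUE_OUTER &&
		outer_has_partial_paths &&
		lateral_relids_empty;
}
```

This makes the TPC-DS observation mechanical: the CTE subqueries have no partial paths, so outer_has_partial_paths is false and no parallel hash join path is ever built, regardless of the join type.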

I added some logging inside the "if" statement and ran join_hash.sql in
regress to see what nodes were typically in the pathlist and partial
pathlist. All of them had basically just sequential scans as the outer
and inner rel paths. regress examples are definitely meant to be
minimal, so this probably wasn't the best place to look for examples of
more complex rels that can be joined with a parallel hash join.


>
> >> Some other notes on the patch:
>
> From a quick peek:
>
> +/*
> + * Upon arriving at the barrier, if this worker is not the last
> worker attached,
> + * detach from the barrier and return false. If this worker is the last
> worker,
> + * remain attached and advance the phase of the barrier, return true
> to indicate
> + * you are the last or "elected" worker who is still attached to the
> barrier.
> + * Another name I considered was BarrierUniqueify or BarrierSoloAssign
> + */
> +bool
> +BarrierDetachOrElect(Barrier *barrier)
>
> I tried to find some existing naming in writing about
> barriers/phasers, but nothing is jumping out at me.  I think a lot of
> this stuff comes from super computing where I guess "make all of the
> threads give up except one" isn't a primitive they'd be too excited
> about :-)
>
> BarrierArriveAndElectOrDetach()... gah, no.
>

You're right that Arrive should be in there.
So, I went with BarrierArriveAndDetachExceptLast()
It's specific, if not clever.
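The agreed semantics can be sketched with ordinary pthreads. This toy is not the PostgreSQL Barrier implementation (all names here are invented, and it deliberately omits the phase bookkeeping and waiting that the real barrier does), but it captures the contract being named: every caller except the last detaches and returns false, and exactly one caller, the last one still attached, returns true and proceeds alone to the unmatched scan.

```c
#include <assert.h>
#include <pthread.h>
#include <stdbool.h>

/* Toy stand-in for Barrier: just a participant count under a mutex. */
typedef struct ToyBarrier
{
	pthread_mutex_t mutex;
	int			participants;	/* number still attached */
} ToyBarrier;

/*
 * If other participants remain attached, detach and return false
 * without waiting.  The last attached caller stays attached and
 * returns true, so exactly one caller "survives".
 */
static bool
toy_arrive_and_detach_except_last(ToyBarrier *b)
{
	bool		last;

	pthread_mutex_lock(&b->mutex);
	last = (b->participants == 1);
	if (!last)
		b->participants--;		/* detach */
	pthread_mutex_unlock(&b->mutex);
	return last;
}
```

Because no caller ever blocks here, the deadlock hazard of waiting on a barrier after participants have begun emitting tuples does not arise; the trade-off, as discussed upthread, is that the surviving process does the unmatched scan alone.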


>
> +last = BarrierDetachOrElect(&batch->batch_barrier);
>
> I'd be nice to add some assertions after that, in the 'last' path,
> that there's only one participant and that the phase is as expected,
> just to make it even clearer to the reader, and a comment in the other
> path that we are no longer attached.
>

Assert and comment added to the single worker path.
The other path is just back to HJ_NEED_NEW_BATCH and workers will detach
there as before, so I'm not sure where we could add the comment about
the other workers detaching.


>
> +hjstate->hj_AllocatedBucketRange = 0;
> ...
> +pg_atomic_uint32 bucket;/* bucket allocator for unmatched inner
> scan */
> ...
> +//volatile int mybp = 0; while (mybp == 0)
>
> Some leftover fragments of the bucket-scan version and debugging stuff.
>

cleaned up (and rebased).

I also changed ExecScanHashTableForUnmatched() to scan HashMemoryChunks
in the hashtable instead of using the buckets to align parallel and
serial hash join code.

Originally, I had that code freeing the chunks of the hashtable after
finishing scanning them; however, I noticed this query from the regress
tests failing:

select * from
(values (1, array[10,20]), (2, array[20,30])) as v1(v1x,v1ys)
left join (values (1, 10), (2, 20)) as v2(v2x,v2y) on v2x = v1x
left join unnest(v1ys) as u1(u1y) on u1y = v2y;

This is because the hash join gets rescanned and, because there is only one
batch, ExecReScanHashJoin reuses the same hashtable.

 QUERY PLAN
-----------------------------------------------------
 Nested Loop Left Join
   ->  Values Scan on "*VALUES*"
   ->  Hash Right Join
 Hash Cond: (u1.u1y = "*VALUES*_1".column2)
 Filter: ("*VALUES*_1".column1 = "*VALUES*".column1)
 ->  Function Scan on unnest u1
 ->  Hash
   ->  Values Scan on "*VALUES*_1"

I was freeing the hashtable as I scanned each chunk, which clearly
doesn't work for a single batch hash join which gets rescanned.

I don't see anything specific to parallel hash join in ExecReScanHashJoin(),
so it seems like the same rules apply to parallel hash join. I will therefore
have to remove the logic that frees the hash table after scanning each
chunk from the parallel function as well.

In addition, I still 

Re: Parallel Full Hash Join

2020-09-21 Thread Thomas Munro
On Tue, Sep 22, 2020 at 8:49 AM Melanie Plageman
 wrote:
> On Wed, Sep 11, 2019 at 11:23 PM Thomas Munro  wrote:
>> 1.  You could probably make it so that the PHJ_BATCH_SCAN_INNER phase
>> in this patch (the scan for unmatched tuples) is executed by only one
>> process, using the "detach-and-see-if-you-were-last" trick.  Melanie
>> proposed that for an equivalent problem in the looping hash join.  I
>> think it probably works, but it gives up a lot of parallelism and thus
>> won't scale as nicely as the attached patch.
>
> I have attached a patch which implements this
> (v1-0001-Parallel-FOJ-ROJ-single-worker-scan-buckets.patch).

Hi Melanie,

Thanks for working on this!  I have a feeling this is going to be much
easier to land than the mighty hash loop patch.  And it's good to get
one of our blocking design questions nailed down for both patches.

I took it for a very quick spin and saw simple cases working nicely,
but TPC-DS queries 51 and 97 (which contain full joins) couldn't be
convinced to use it.  Hmm.

> For starters, in order to support parallel FOJ and ROJ, I re-enabled
> setting the match bit for the tuples in the hashtable which
> 3e4818e9dd5be294d97c disabled. I did so using the code suggested in [1],
> reading the match bit to see if it is already set before setting it.

Cool.  I'm quite keen to add a "fill_inner" parameter for
ExecHashJoinImpl() and have an N-dimensional lookup table of
ExecHashJoin variants, so that this and much other related branching
can be constant-folded out of existence by the compiler in common
cases, which is why I think this is all fine, but that's for another
day...

> Then, workers except for the last worker detach after exhausting the
> outer side of a batch, leaving one worker to proceed to HJ_FILL_INNER
> and do the scan of the hash table and emit unmatched inner tuples.

+1

Doing better is pretty complicated within our current execution model,
and I think this is a good compromise for now.

Costing for uneven distribution is tricky; depending on your plan
shape, specifically whether there is something else to do afterwards
to pick up the slack, it might or might not affect the total run time
of the query.  It seems like there's not much we can do about that.

> I have also attached a variant on this patch which I am proposing to
> replace it (v1-0001-Parallel-FOJ-ROJ-single-worker-scan-chunks.patch)
> which has a new ExecParallelScanHashTableForUnmatched() in which the
> single worker doing the unmatched scan scans one HashMemoryChunk at a
> time and then frees them as it goes. I thought this might perform better
> than the version which uses the buckets because 1) it should do a bit
> less pointer chasing and 2) it frees each chunk of the hash table as it
> scans it which (maybe) would save a bit of time during
> ExecHashTableDetachBatch() when it goes through and frees the hash
> table, but, my preliminary tests showed a negligible difference between
> this and the version using buckets. I will do a bit more testing,
> though.

+1

I agree that it's the better of those two options.

>> [stuff about deadlocks]
>
> The leader exclusion tactics and the spooling idea don't solve the
> execution order deadlock possibility, so, this "all except last detach
> and last does unmatched inner scan" seems like the best way to solve
> both types of deadlock.

Agreed (at least as long as our threads of query execution are made
out of C call stacks and OS processes that block).

>> Some other notes on the patch:
>>
>> Aside from the deadlock problem, there are some minor details to tidy
>> up (handling of late starters probably not quite right, rescans not
>> yet considered).
>
> These would not be an issue with only one worker doing the scan but
> would have to be handled in a potential new parallel-enabled solution
> like I suggested above.

Makes sense.  Not sure why I thought anything special was needed for rescans.

>> There is a fun hard-coded parameter that controls
>> the parallel step size in terms of cache lines for the unmatched scan;
>> I found that 8 was a lot faster than 4, but no slower than 128 on my
>> laptop, so I set it to 8.
>
> I didn't add this cache line optimization to my chunk scanning method. I
> could do so. Do you think it is more relevant, less relevant, or the
> same if only one worker is doing the unmatched inner scan?

Yeah it's irrelevant for a single process, and even more irrelevant if
we go with your chunk-based version.

>> More thoughts along those micro-optimistic
>> lines: instead of match bit in the header, you could tag the pointer
>> and sometimes avoid having to follow it, and you could prefetch next
>> non-matching tuple's cacheline by looking a head a bit.
>
> I would be happy to try doing this once we get the rest of the patch
> ironed out so that seeing how much of a performance difference it makes
> is more straightforward.

Ignore that, I have no idea if the maintenance overhead for such an

Re: Parallel Full Hash Join

2020-09-21 Thread Melanie Plageman
ple from P1, while
> P2 is trying to write to a full queue, and P1 waits for P2.
>
> 3.  You could introduce some kind of overflow for tuple queues, so
> that tuple queues can never block because they're full (until you run
> out of extra memory buffers or disk and error out).  I haven't
> seriously looked into this but I'm starting to suspect it's the
> industrial strength general solution to the problem and variants of it
> that show up in other parallelism projects (Parallel Repartition).  As
> Robert mentioned last time I talked about this[2], you'd probably only
> want to allow spooling (rather than waiting) when the leader is
> actually waiting for other processes; I'm not sure how exactly to
> control that.
>
> 4.  Goetz Graefe's writing about parallel sorting
> comes close to this topic, which he calls flow control deadlocks.  He
> mentions the possibility of infinite spooling like (3) as a solution.
> He's describing a world where producers and consumers are running
> concurrently, and the consumer doesn't just decide to start running
> the subplan (what we call "leader participation"), so he doesn't
> actually have a problem like Gather deadlock.  He describes
> planner-enforced rules that allow deadlock free execution even with
> fixed-size tuple queue flow control by carefully controlling where
> order-forcing operators are allowed to appear, so he doesn't have a
> problem like Gather Merge deadlock.  I'm not proposing we should
> create a whole bunch of producer and consumer processes to run
> different plan fragments, but I think you can virtualise the general
> idea in an async executor with "streams", and that also solves other
> problems when you start working with partitions in a world where it's
> not even sure how many workers will show up.  I see this as a long
> term architectural goal requiring vast amounts of energy to achieve,
> hence my new interest in (3) for now.
>
> Hypothetical inter-node deadlock hazard:
>
> Right now I think it is the case that whenever any node begins pulling
> tuples from a subplan, it continues to do so until either the query
> ends early or the subplan runs out of tuples.  For example, Append
> processes its subplans one at a time until they're done -- it doesn't
> jump back and forth.  Parallel Append doesn't necessarily run them in
> the order that they appear in the plan, but it still runs each one to
> completion before picking another one.  If we ever had a node that
> didn't adhere to that rule, then two Parallel Full Hash Join nodes
> could deadlock, if some of the workers were stuck waiting in one
> while some were stuck waiting in the other.
>
> If we were happy to decree that that is a rule of the current
> PostgreSQL executor, then this hypothetical problem would go away.
> For example, consider the old patch I recently rebased[3] to allow
> Append over a bunch of FDWs representing remote shards to return
> tuples as soon as they're ready, not necessarily sequentially (and I
> think several others have worked on similar patches).  To be
> committable under such a rule that applies globally to the whole
> executor, that patch would only be allowed to *start* them in any
> order, but once it's started pulling tuples from a given subplan it'd
> have to pull them all to completion before considering another node.
>
> (Again, that problem goes away in an async model like (4), which will
> also be able to do much more interesting things with FDWs, and it's
> the FDW thing that I think generates more interest in async execution
> than my rambling about abstract parallel query problems.)
>
>
The leader exclusion tactics and the spooling idea don't solve the
execution order deadlock possibility, so, this "all except last detach
and last does unmatched inner scan" seems like the best way to solve
both types of deadlock.

There is another option that could maintain some parallelism for the
unmatched inner scan.

This method is exactly like the "all except last detach and last does
unmatched inner scan" method from the perspective of the main hash join
state machine. The difference is in ExecParallelHashJoinNewBatch(). In
the batch_barrier phase machine, workers loop around looking for batches
that are not done.

In this "detach for now" method, all workers except the last one detach
from a batch after exhausting the outer side. They will mark the batch
they were just working on as "provisionally done" (as opposed to
"done"). The last worker advances the batch_barrier from
PHJ_BATCH_PROBING to PHJ_BATCH_SCAN_INNER.

All detached workers then proceed to HJ_NEED_NEW_BATCH and try to find
another batch to work on. If there are no batches that are neither
"done" or "provisionally done", then th

Parallel Full Hash Join

2019-09-12 Thread Thomas Munro
Hello,

While thinking about looping hash joins (an alternative strategy for
limiting hash join memory usage currently being investigated by
Melanie Plageman in a nearby thread[1]), the topic of parallel query
deadlock hazards came back to haunt me.  I wanted to illustrate the
problems I'm aware of with the concrete code where I ran into this
stuff, so here is a new-but-still-broken implementation of $SUBJECT.
This was removed from the original PHJ submission when I got stuck and
ran out of time in the release cycle for 11.  Since the original
discussion is buried in long threads and some of it was also a bit
confused, here's a fresh description of the problems as I see them.
Hopefully these thoughts might help Melanie's project move forward,
because it's closely related, but I didn't want to dump another patch
into that other thread.  Hence this new thread.

I haven't succeeded in actually observing a deadlock with the attached
patch (though I did last year, very rarely), but I also haven't tried
very hard.  The patch seems to produce the right answers and is pretty
scalable, so it's really frustrating not to be able to get it over the
line.

Tuple queue deadlock hazard:

If the leader process is executing the subplan itself and waiting for
all processes to arrive in ExecParallelHashEndProbe() (in this patch)
while another process has filled up its tuple queue and is waiting for
the leader to read some tuples and unblock it, they will deadlock
forever.  That can't happen in the committed version of PHJ,
because it never waits for barriers after it has begun emitting
tuples.

Some possible ways to fix this:

1.  You could probably make it so that the PHJ_BATCH_SCAN_INNER phase
in this patch (the scan for unmatched tuples) is executed by only one
process, using the "detach-and-see-if-you-were-last" trick.  Melanie
proposed that for an equivalent problem in the looping hash join.  I
think it probably works, but it gives up a lot of parallelism and thus
won't scale as nicely as the attached patch.

2.  You could probably make it so that only the leader process drops
out of executing the inner unmatched scan, and then I think you
wouldn't have this very specific problem at the cost of losing some
(but not all) parallelism (ie the leader), but there might be other
variants of the problem.  For example, a GatherMerge leader process
might be blocked waiting for the next tuple from P1, while
P2 is trying to write to a full queue, and P1 waits for P2.

3.  You could introduce some kind of overflow for tuple queues, so
that tuple queues can never block because they're full (until you run
out of extra memory buffers or disk and error out).  I haven't
seriously looked into this but I'm starting to suspect it's the
industrial strength general solution to the problem and variants of it
that show up in other parallelism projects (Parallel Repartition).  As
Robert mentioned last time I talked about this[2], you'd probably only
want to allow spooling (rather than waiting) when the leader is
actually waiting for other processes; I'm not sure how exactly to
control that.

4.  Goetz Graefe's writing about parallel sorting
comes close to this topic, which he calls flow control deadlocks.  He
mentions the possibility of infinite spooling like (3) as a solution.
He's describing a world where producers and consumers are running
concurrently, and the consumer doesn't just decide to start running
the subplan (what we call "leader participation"), so he doesn't
actually have a problem like Gather deadlock.  He describes
planner-enforced rules that allow deadlock free execution even with
fixed-size tuple queue flow control by carefully controlling where
order-forcing operators are allowed to appear, so he doesn't have a
problem like Gather Merge deadlock.  I'm not proposing we should
create a whole bunch of producer and consumer processes to run
different plan fragments, but I think you can virtualise the general
idea in an async executor with "streams", and that also solves other
problems when you start working with partitions in a world where it's
not even sure how many workers will show up.  I see this as a long
term architectural goal requiring vast amounts of energy to achieve,
hence my new interest in (3) for now.

Hypothetical inter-node deadlock hazard:

Right now I think it is the case that whenever any node begins pulling
tuples from a subplan, it continues to do so until either the query
ends early or the subplan runs out of tuples.  For example, Append
processes its subplans one at a time until they're done -- it doesn't
jump back and forth.  Parallel Append doesn't necessarily run them in
the order that they appear in the plan, but it still runs each one to
completion before picking another one.  If we ever had a node that
didn't adhere to that rule, then two Parallel Full Hash Join nodes
could deadlock, if some of the workers were stuck waiting in one
while some were stuck wai