Here's a new rebased and debugged patch set.

On Tue, Aug 1, 2017 at 1:11 PM, Andres Freund wrote:
> - Echoing concerns from other threads (Robert: ping): I'm doubtful that
>   it makes sense to size the number of parallel workers solely based on
>   the parallel scan node's size.  I don't think it's this patch's job to
>   change that, but to me it seriously amplifys that - I'd bet there's a
>   lot of cases with nontrivial joins where the benefit from parallelism
>   on the join level is bigger than on the scan level itself.  And the
>   number of rows in the upper nodes might also be bigger than on the
>   scan node level, making it more important to have higher number of
>   nodes.

Agreed that this is bogus.  The number of workers is really determined
by the outer path (the probe side), except that if the inner path (the
build side) is not big enough to warrant parallel workers at all then
parallelism is inhibited on that side.  That prevents small tables
from being loaded by Parallel Hash.  That is something we want, but
it's probably not doing it for the right reasons with the right
threshold -- about which more below.

> - If I understand the code in initial_cost_hashjoin() correctly, we
>   count the synchronization overhead once, independent of the number of
>   workers.  But on the other hand we calculate the throughput by
>   dividing by the number of workers.  Do you think that's right?

It's how long you think the average participant will have to wait for
the last participant to arrive, and I think that's mainly determined
by the parallel grain, not the number of workers.  If you're a worker
that has reached the end of a scan, the best case is that every other
worker has already reached the end too and the worst case is that
another worker read the last granule (currently page) just before you
hit the end, so you'll have to wait for it to process a granule's
worth of work.

To show this I used dtrace to measure the number of microseconds spent
waiting at the barrier before probing while running a 5 million row
self-join 100 times, and got the following histograms:

1 worker:

           value  ------------- Distribution ------------- count
             < 0 |                                         0
               0 |@@@@@@@@@@@@@@@@@@@@@@                   110
              20 |                                         1
              40 |@                                        5
              60 |@@@@@                                    24
              80 |@@@                                      14
             100 |@                                        5
             120 |@@@                                      16
             140 |@@@                                      17
             160 |@@                                       8
             180 |                                         0

2 workers:

           value  ------------- Distribution ------------- count
             < 0 |                                         0
               0 |@@@@@@@@@@@@@@                           107
              20 |                                         1
              40 |@@@                                      21
              60 |@@@                                      25
              80 |@@                                       16
             100 |@@                                       14
             120 |@@@@@                                    38
             140 |@@@@@@@                                  51
             160 |@@@                                      20
             180 |                                         3
             200 |                                         1
             220 |                                         3
             240 |                                         0

3 workers:

           value  ------------- Distribution ------------- count
             < 0 |                                         0
               0 |@@@@@@@@@@@                              113
              20 |@@                                       15
              40 |@@@                                      29
              60 |@@@@                                     35
              80 |@@@@                                     37
             100 |@@@@@@                                   56
             120 |@@@@@                                    51
             140 |@@@                                      31
             160 |@@                                       21
             180 |@                                        6

4 workers:

           value  ------------- Distribution ------------- count
             < 0 |                                         0
               0 |@@@@@@@@@@                               121
              20 |                                         4
              40 |@@@                                      39
              60 |@@                                       29
              80 |@@                                       24
             100 |@@@@@@@                                  88
             120 |@@@@@@@                                  82
             140 |@@@@@                                    58
             160 |@@                                       26
             180 |@                                        15
             200 |@                                        9
             220 |                                         4
             240 |                                         1
             260 |                                         0

I didn't know what to expect above my machine's core count of 4, but
this is for 8:

           value  ------------- Distribution ------------- count
             < 0 |                                         0
               0 |@@@@@                                    116
              20 |                                         2
              40 |@@                                       36
              60 |@@@                                      69
              80 |@@@@                                     95
             100 |@@@@@                                    113
             120 |@@@                                      74
             140 |@@@                                      71
             160 |@@                                       44
             180 |@@                                       36
             200 |@                                        30
             220 |@                                        14
             240 |@                                        18
             260 |                                         8
             280 |                                         3
             300 |                                         4

It's true that the fraction of waits that go into the 0-20us bucket
(because the last to arrive at a barrier doesn't have to wait at all)
decreases as you add more workers, but above 1 worker the main story
is the bell curve (?) we see clustered around 100-120us, and it
doesn't seem to be moving.
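
To put numbers on that first point, here is a standalone C sketch (not
part of the patch set; zero_wait_fraction() and the array names are
made up for illustration) that recomputes the share of samples in the
0-20us bucket from the counts above:

```c
#include <assert.h>
#include <stddef.h>

#define NBUCKETS(a) (sizeof(a) / sizeof((a)[0]))

/*
 * Fraction of barrier waits that landed in the first (0-20us) bucket.
 * counts[0] is that bucket; the remaining entries are the higher buckets.
 */
static double
zero_wait_fraction(const int *counts, size_t nbuckets)
{
    long total = 0;

    assert(counts != NULL && nbuckets > 0);
    for (size_t i = 0; i < nbuckets; i++)
        total += counts[i];
    return total > 0 ? (double) counts[0] / (double) total : 0.0;
}

/* Bucket counts copied from the histograms above, starting at bucket 0. */
static const int waits_1_worker[] = {110, 1, 5, 24, 14, 5, 16, 17, 8};
static const int waits_2_workers[] = {107, 1, 21, 25, 16, 14, 38, 51, 20, 3, 1, 3};
static const int waits_3_workers[] = {113, 15, 29, 35, 37, 56, 51, 31, 21, 6};
static const int waits_4_workers[] = {121, 4, 39, 29, 24, 88, 82, 58, 26, 15, 9, 4, 1};
static const int waits_8_workers[] = {116, 2, 36, 69, 95, 113, 74, 71, 44, 36, 30, 14, 18, 8, 3, 4};
```

Calling zero_wait_fraction(waits_1_worker, NBUCKETS(waits_1_worker))
and so on gives roughly 0.55, 0.36, 0.29, 0.24 and 0.16 for 1, 2, 3, 4
and 8 workers.  Assuming the leader also counts as a participant, 1 /
participants would be 0.50, 0.33, 0.25, 0.20 and 0.11, so the share
shrinks in about the way you'd expect if only the last arrival gets to
skip the wait.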

If we call the fraction of samples outside the 0-20us bucket
"wait_probability" and call their average wait time
"expected_wait_cost", then one way to estimate this is something like:

   wait_probability * expected_wait_cost
 = (1 - 1 / participants) * (tuples_per_grain * cost_per_tuple * 0.5)
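
Written out as code, that estimate might look like the following C
sketch.  This is only an illustration of the formula, not code from
the patch set: estimate_barrier_wait_cost() is a hypothetical name, and
tuples_per_grain and cost_per_tuple are inputs the caller is assumed to
have somehow.

```c
#include <assert.h>

/*
 * Hypothetical helper: expected cost of waiting at a barrier, per
 * participant.
 *
 *   wait_probability * expected_wait_cost
 * = (1 - 1 / participants) * (tuples_per_grain * cost_per_tuple * 0.5)
 */
static double
estimate_barrier_wait_cost(int participants,
                           double tuples_per_grain,
                           double cost_per_tuple)
{
    double wait_probability;
    double expected_wait_cost;

    assert(participants >= 1);

    /* Everyone except the last participant to arrive has to wait. */
    wait_probability = 1.0 - 1.0 / (double) participants;

    /* On average the straggler is half way through its last grain. */
    expected_wait_cost = tuples_per_grain * cost_per_tuple * 0.5;

    return wait_probability * expected_wait_cost;
}
```

With 2 participants, 100 tuples per grain and a per-tuple cost of 0.01
(all made-up inputs), that comes to 0.5 * 0.5 = 0.25 cost units, and
with a single participant it is 0.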

I don't think we can do that today, because we don't have access to
tuples_per_grain from the subplan.  That would in theory come
ultimately from the scan, adjusted as appropriate by selectivity
estimates.  The grain could in future be more than one page at a time
as proposed by David Rowley and others, or "it's complicated" for a
Parallel Append.  But I'm not sure if that's correct, doable or worth
doing, hence my attempt to provide a single knob to model this for now.

I did some experiments to find a value of
parallel_synchronization_cost that avoids Parallel Hash when it won't
pay off, like this:

 * a "big" table with 1 million rows to be the outer relation
 * a "small" table with a range of sizes from 5k to 100k rows to hash
 * both tables have a unique integer key "a" and a 60 byte text column "b"
 * query (a): SELECT COUNT(*) FROM big JOIN small USING (a)
 * query (b): ... WHERE length(small.b) * 2 - length(small.b) = length(small.b)
 * work_mem set high enough that we never have multiple batches
 * one warmup run and then the median of 3 measurements
 * all default except min_parallel_table_scan_size = 0
 * 4 core developer machine
 * -O2, no asserts

Just to be clear:  The following numbers aren't supposed to be
impressive and are way shorter than the queries that the Parallel Hash
feature is really intended to help with.  That's because we're
searching for the threshold below which Parallel Hash *doesn't* help,
and that involves running queries where there isn't much to hash.  The
times are for the complete query (i.e. including probing too, not just the
hash table build), and show "parallel-oblivious-hash-join-time ->
parallel-aware-hash-join-time" for queries "a" and "b" on patched
master.  I also compared with unpatched master to confirm that the
parallel-oblivious times on the left of the arrows match unpatched
master's, modulo a bit of noise.

1 worker:

 5,000 rows hashed:  (a) 157ms -> 166ms, (b) 166ms -> 183ms
 7,500 rows hashed:  (a) 162ms -> 174ms, (b) 176ms -> 182ms
10,000 rows hashed:  (a) 161ms -> 170ms, (b) 181ms -> 210ms
12,500 rows hashed:  (a) 169ms -> 175ms, (b) 194ms -> 188ms
15,000 rows hashed:  (a) 175ms -> 181ms, (b) 199ms -> 195ms
17,500 rows hashed:  (a) 173ms -> 175ms, (b) 201ms -> 202ms
20,000 rows hashed:  (a) 179ms -> 179ms, (b) 210ms -> 195ms <== a & b threshold
30,000 rows hashed:  (a) 196ms -> 192ms, (b) 244ms -> 218ms
40,000 rows hashed:  (a) 201ms -> 197ms, (b) 265ms -> 228ms
50,000 rows hashed:  (a) 217ms -> 251ms, (b) 294ms -> 249ms
60,000 rows hashed:  (a) 228ms -> 222ms, (b) 324ms -> 268ms
70,000 rows hashed:  (a) 230ms -> 214ms, (b) 338ms -> 275ms
80,000 rows hashed:  (a) 243ms -> 229ms, (b) 366ms -> 291ms
90,000 rows hashed:  (a) 256ms -> 239ms, (b) 391ms -> 311ms
100,000 rows hashed: (a) 266ms -> 248ms, (b) 420ms -> 326ms

2 workers:

 5,000 rows hashed:  (a) 110ms -> 115ms, (b) 118ms -> 127ms
 7,500 rows hashed:  (a) 115ms -> 128ms, (b) 131ms -> 128ms
10,000 rows hashed:  (a) 114ms -> 116ms, (b) 135ms -> 148ms
12,500 rows hashed:  (a) 126ms -> 126ms, (b) 145ms -> 131ms
15,000 rows hashed:  (a) 134ms -> 142ms, (b) 151ms -> 134ms
17,500 rows hashed:  (a) 125ms -> 122ms, (b) 153ms -> 147ms <== a & b threshold
20,000 rows hashed:  (a) 126ms -> 124ms, (b) 160ms -> 136ms
30,000 rows hashed:  (a) 144ms -> 132ms, (b) 191ms -> 152ms
40,000 rows hashed:  (a) 165ms -> 151ms, (b) 213ms -> 158ms
50,000 rows hashed:  (a) 161ms -> 143ms, (b) 240ms -> 171ms
60,000 rows hashed:  (a) 171ms -> 150ms, (b) 266ms -> 186ms
70,000 rows hashed:  (a) 176ms -> 151ms, (b) 283ms -> 190ms
80,000 rows hashed:  (a) 181ms -> 156ms, (b) 315ms -> 204ms
90,000 rows hashed:  (a) 189ms -> 164ms, (b) 338ms -> 214ms
100,000 rows hashed: (a) 207ms -> 177ms, (b) 362ms -> 232ms

3 workers:

 5,000 rows hashed:  (a)  90ms -> 103ms, (b) 107ms -> 118ms
 7,500 rows hashed:  (a) 106ms -> 104ms, (b) 115ms -> 118ms
10,000 rows hashed:  (a) 100ms ->  95ms, (b) 121ms -> 110ms <== b threshold
12,500 rows hashed:  (a) 103ms -> 120ms, (b) 134ms -> 113ms
15,000 rows hashed:  (a) 134ms -> 110ms, (b) 142ms -> 116ms <== a threshold
17,500 rows hashed:  (a) 110ms -> 104ms, (b) 146ms -> 123ms
20,000 rows hashed:  (a) 107ms -> 103ms, (b) 151ms -> 120ms
30,000 rows hashed:  (a) 124ms -> 110ms, (b) 183ms -> 135ms
40,000 rows hashed:  (a) 125ms -> 108ms, (b) 209ms -> 137ms
50,000 rows hashed:  (a) 133ms -> 115ms, (b) 238ms -> 150ms
60,000 rows hashed:  (a) 143ms -> 119ms, (b) 266ms -> 159ms
70,000 rows hashed:  (a) 146ms -> 120ms, (b) 288ms -> 165ms
80,000 rows hashed:  (a) 150ms -> 129ms, (b) 316ms -> 176ms
90,000 rows hashed:  (a) 159ms -> 126ms, (b) 343ms -> 187ms
100,000 rows hashed: (a) 176ms -> 136ms, (b) 370ms -> 195ms

4 workers:

 5,000 rows hashed:  (a)  93ms -> 103ms, (b) 109ms -> 117ms
 7,500 rows hashed:  (a) 106ms -> 102ms, (b) 121ms -> 115ms <== b threshold
10,000 rows hashed:  (a)  99ms -> 100ms, (b) 126ms -> 113ms
12,500 rows hashed:  (a) 107ms -> 102ms, (b) 137ms -> 117ms <== a threshold
15,000 rows hashed:  (a) 111ms -> 107ms, (b) 145ms -> 115ms
17,500 rows hashed:  (a) 110ms ->  10ms, (b) 151ms -> 118ms
20,000 rows hashed:  (a) 108ms -> 103ms, (b) 160ms -> 120ms
30,000 rows hashed:  (a) 120ms -> 108ms, (b) 196ms -> 127ms
40,000 rows hashed:  (a) 129ms -> 109ms, (b) 225ms -> 134ms
50,000 rows hashed:  (a) 140ms -> 121ms, (b) 262ms -> 148ms
60,000 rows hashed:  (a) 152ms -> 123ms, (b) 294ms -> 154ms
70,000 rows hashed:  (a) 157ms -> 122ms, (b) 322ms -> 165ms
80,000 rows hashed:  (a) 154ms -> 138ms, (b) 372ms -> 201ms
90,000 rows hashed:  (a) 186ms -> 122ms, (b) 408ms -> 180ms
100,000 rows hashed: (a) 170ms -> 124ms, (b) 421ms -> 186ms

I found that a good value of parallel_synchronization_cost that
enables Parallel Hash somewhere around those thresholds is 250 for
these test queries, so I have set that as the default in the new patch.

All of this might be considered moot, because I still needed to frob
min_parallel_table_scan_size to get a Parallel Hash below 90,000 rows
anyway due to the policies in compute_parallel_worker().  So really
there is no danger of tables like the TPC-H "nation" and "region"
tables being loaded by Parallel Hash even if you set
parallel_synchronization_cost to 0, and probably no reason to worry too
much about its default value for now.  It could probably be argued
that we shouldn't have the GUC at all, but at least it provides a
handy way to enable and disable Parallel Hash!

One hidden factor here is that it takes a while for workers to start
up and the leader can scan thousands of rows before they arrive.  This
effect will presumably be exaggerated on systems with slow
fork/equivalent (Windows, some commercial Unices IIRC), and minimised
by someone writing a patch to reuse parallel workers.  I haven't tried
to investigate that effect because it doesn't seem very interesting or
likely to persist but it may contribute to the experimental thresholds I
observed.

> - I haven't really grokked the deadlock issue you address. Could you
>   expand the comments on that? Possibly somewhere central referenced by
>   the various parts.

The central place is leader_gate.c.  What's wrong with the explanation in there?

Let me restate the problem here, and the three solutions I considered:

Problem: The leader must never be allowed to wait for other
participants that have emitted tuples (it doesn't matter whether that
waiting takes the form of latches, condition variables, barriers,
shm_queues or anything else).  Any participant that has emitted tuples
might currently be blocked waiting for the leader to drain the tuple
queue, so a deadlock could be created.

Concrete example: In this case, once we get past PHJ_PHASE_PROBING we
have to allow only the leader or the workers to continue.  Otherwise
some worker might be trying to finish probing by emitting tuples,
while the leader might be in BarrierWait() waiting for everyone to
finish probing.  This problem affects only outer joins (they have to
wait to start PHJ_PHASE_UNMATCHED after probing) and multibatch joins
(they wait to be able to load the next batch).

Solution 1:  LeaderGate is a simple mechanism for reaching consensus
on whether the leader or a set of workers will be allowed to run after
a certain point, in this case the end of probing.  Concretely this
means that either the leader or any workers will drop out early at
that point, leaving nothing left to do.  This is made slightly more
complicated by the fact that we don't know up front if there are any
workers yet.

Solution 2:  Teach tuple queues to spill to disk instead of blocking
when full.  I think this behaviour should probably only be activated
while the leader is running the plan rather than draining tuple
queues; the current block-when-full behaviour would still be
appropriate if the leader is simply unable to drain queues fast
enough.  Then the deadlock risk would go away.

Solution 3:  An asynchronous executor model where you don't actually
wait synchronously at barriers -- instead you detach and go and do
something else, but come back and reattach when there is progress to
be made.  I have some ideas about that but they are dependent on the
async execution project reaching a fairly advanced state first.

When I wrote it, I figured that leader_gate.c was cheap and would do
for now, but I have to admit that it's quite confusing and it sucks
that later batches lose a core.  I'm now thinking that 2 may be a
better idea.  My first thought is that Gather needs a way to advertise
that it's busy while running the plan, shm_mq needs a slightly
different all-or-nothing nowait mode, and TupleQueue needs to write to
a shared tuplestore or other temp file-backed mechanism when
appropriate.  Thoughts?

> - maybe I'm overly paranoid, but it might not be bad to add some extra
>   checks for ExecReScanHashJoin ensuring that it doesn't get called when
>   workers are still doing something.

Check out ExecReScanGather(): it shuts down and waits for all workers
to complete, which makes the assumptions in ExecReScanHashJoin() true.
If a node below Gather but above Hash Join could initiate a rescan
then the assumptions would not hold.  I am not sure what it would mean
though and we don't generate any such plans today to my knowledge.  It
doesn't seem to make sense for the inner side of Nested Loop to be
partial.  Have I missed something here?

It looks like some details may have changed here due to 41b0dd98 and
nearby commits, and I may need to implement at least ReInitializeDSM.

I also need a regression test to hit the rescan but I'm not sure how
to write one currently.  In an earlier version of this patch set I
could do it by setting shared_tuple_cost (a GUC I no longer have) to a
negative number, which essentially turned our optimiser into a
pessimiser capable of producing a nested loop that rescans a gather
node, forking workers for every row...

> - seems like you're dereffing tuple unnecessarily here:
> +               tuple = (HashJoinTuple)
> +                       dsa_get_address(hashtable->area, detached_chain_shared);
> +               ExecHashTransferSkewTuples(hashtable, detached_chain,

Yes, here lurked a bug, fixed.

> - The names here could probably improved some:
> +               case WAIT_EVENT_HASH_SHRINKING1:
> +                       event_name = "Hash/Shrinking1";
> +                       break;
> +               case WAIT_EVENT_HASH_SHRINKING2:
> +                       event_name = "Hash/Shrinking2";
> +                       break;
> +               case WAIT_EVENT_HASH_SHRINKING3:
> +                       event_name = "Hash/Shrinking3";
> +                       break;
> +               case WAIT_EVENT_HASH_SHRINKING4:
> +                       event_name = "Hash/Shrinking4";


> - why are we restricting rows_total bit to parallel aware?
> +       /*
> +        * If parallel-aware, the executor will also need an estimate of the total
> +        * number of rows expected from all participants so that it can size the
> +        * shared hash table.
> +        */
> +       if (best_path->jpath.path.parallel_aware)
> +       {
> +               hash_plan->plan.parallel_aware = true;
> +               hash_plan->rows_total = best_path->inner_rows_total;
> +       }
> +

I could set it unconditionally and then skip this bit that receives the number:

    rows = node->plan.parallel_aware ? node->rows_total : outerNode->plan_rows;

Do you think it would be better to push plan_rows_total into Plan instead?

> - seems we need a few more test - I don't think the existing tests are
>   properly going to exercise the skew stuff, multiple batches, etc?
>   This is nontrivial code, I'd really like to see a high test coverage
>   of the new code.

I've added some regression tests in a patch to apply before making any
changes.  You would have to change the "explain (costs off)" to
"explain analyze" to verify the claims I put in comments about the
number of batches and peak memory usage in the work_mem management
tests.  I chose to put them into join.sql, and then a later patch adds
parallel-aware versions.  (An alternative would be to put them into
select.sql and select_parallel.sql, but it seemed better to keep the
non-parallel, parallel with parallel-oblivious join and parallel-aware
cases next to each other.)

While testing I found a timing bug that could produce incorrect query
results because of the empty hash table optimisation, which could see
an incorrect value of hashtable->totalTuples == 0.  Fixed (see code in
the "finish:" case in MultiExecHash()).

Last week I finally figured out a way to test different startup
timings, considering the complexity created by the "flash mob" problem
I described when I first proposed dynamic barriers[1].  If you build
with -DBARRIER_DEBUG in the attached patch set you get a new GUC
"barrier_attach_phases" which you can set like this:

  SET barrier_attach_phases = 'HashJoin.barrier:2,7,0';

That list of numbers tells it which phase each participant should
simulate attaching at.  In that example the leader will attach at
phase 2 (PHJ_PHASE_BUILDING), worker 0 will attach at 7
(PHJ_PHASE_RESETTING_BATCH(1)) and worker 1 will attach at 0
(PHJ_PHASE_BEGINNING).  Note that *someone* has to start at 0 or bad
things will happen.
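
To make the format concrete, here is a standalone C sketch of how a
setting string of that form could be split into per-participant
phases.  It is not the parser from the patch set: parse_attach_phases()
and MAX_PARTICIPANTS are hypothetical, and rejecting a list in which
nobody starts at phase 0 is just one assumed way of guarding against
the "bad things" case.

```c
#include <assert.h>
#include <limits.h>
#include <stdbool.h>
#include <stdlib.h>
#include <string.h>

#define MAX_PARTICIPANTS 16     /* arbitrary limit for this sketch */

/*
 * Parse "name:p0,p1,..." into phases[], which must have room for
 * MAX_PARTICIPANTS entries.  phases[0] is the leader's attach phase and
 * phases[i] is worker (i - 1)'s.  Returns the number of participants,
 * or -1 if the name doesn't match, the list is malformed, or nobody
 * attaches at phase 0.
 */
static int
parse_attach_phases(const char *setting, const char *barrier_name,
                    int *phases)
{
    size_t      namelen;
    const char *p;
    int         n = 0;
    bool        saw_zero = false;

    assert(setting != NULL && barrier_name != NULL && phases != NULL);

    namelen = strlen(barrier_name);
    if (strncmp(setting, barrier_name, namelen) != 0 ||
        setting[namelen] != ':')
        return -1;

    p = setting + namelen + 1;
    for (;;)
    {
        char   *end;
        long    phase = strtol(p, &end, 10);

        if (end == p || phase < 0 || phase > INT_MAX ||
            n >= MAX_PARTICIPANTS)
            return -1;
        phases[n++] = (int) phase;
        if (phase == 0)
            saw_zero = true;
        if (*end == '\0')
            break;              /* end of list */
        if (*end != ',')
            return -1;          /* unexpected separator */
        p = end + 1;
    }
    return saw_zero ? n : -1;
}
```

For 'HashJoin.barrier:2,7,0' this returns 3 participants with phases
2, 7 and 0, matching the leader, worker 0 and worker 1 in the example
above.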

Using this technique I can now use simple scripts to test every case
in the switch statements that appear in three places in the patch.
See attached file parallel-hash-attach-phases.sql.

I'm not sure whether, and if so how, to package any such tests for the
regression suite, since they require a special debug build.

Ideally I would also like to find a way to tell Gather not to run the
plan in the leader (a bit like single_copy mode, except allowing
multiple workers to run the plan, and raising an error if no
workers could be launched).

> - might not hurt to reindent before the final submission

Will do.

> - Unsurprisingly, please implement the FIXME ;)

This must refer to a note about cleaning up skew buckets after they're
not needed, which I've now done.

Some other things:

Previously I failed to initialise the atomics in the shared skew hash
table correctly, and also I used memset to overwrite atomics when
loading a new batch.  This worked on modern systems but would of
course fail when using emulated atomics.  Fixed in the attached.

In the process I discovered that initialising and clearing large hash
tables this way is quite a lot slower than memset on my machine under
simple test conditions.  I think it might be worth experimenting with
array-oriented atomic operations that have a specialisation for 0
that just uses memset if it can (something like
pg_atomic_init_u64_array(base, stride, n, 0)).  I also think it may be
interesting to parallelise the initialisation and reset of the hash
table, since I've seen cases where I have 7 backends waiting on a
barrier while one initialises a couple of GB of memory for several
seconds.  Those are just small optimisations though and I'm not
planning to investigate them until after the basic patch is in
committable form.
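
For what it's worth, the array-oriented idea could be sketched like
this.  It uses C11 <stdatomic.h> as a stand-in for pg_atomic_uint64, so
atomic_u64 and atomic_init_u64_array() are illustrative names rather
than an existing PostgreSQL API, and the conditions for taking the
memset path are my assumptions about what "if it can" would have to
mean: zero value, densely packed elements, and native lock-free
atomics with no hidden state.

```c
#include <assert.h>
#include <stdatomic.h>
#include <stddef.h>
#include <stdint.h>
#include <string.h>

typedef _Atomic uint64_t atomic_u64;

/*
 * Initialise n atomic 64-bit values to 'value'.  The first one lives at
 * 'base' and each following one is 'stride' bytes further on, so the
 * atomics may be embedded in larger structs.  The caller must make sure
 * every element is suitably aligned.
 */
static void
atomic_init_u64_array(void *base, size_t stride, size_t n, uint64_t value)
{
    char   *p = base;

    assert(stride >= sizeof(atomic_u64));
    if (n == 0)
        return;
    assert(base != NULL);

    /*
     * Fast path: zeroing a dense array of native atomics is the same as
     * zeroing plain memory.  Emulated atomics (or strided elements with
     * other data in between) must take the slow path below.
     */
    if (value == 0 &&
        stride == sizeof(atomic_u64) &&
        sizeof(atomic_u64) == sizeof(uint64_t) &&
        ATOMIC_LLONG_LOCK_FREE == 2)
    {
        memset(base, 0, n * stride);
        return;
    }

    for (size_t i = 0; i < n; i++)
        atomic_init((atomic_u64 *) (p + i * stride), value);
}
```

A caller with a dense bucket array would pass stride ==
sizeof(atomic_u64) and get the memset path for value 0; anything else
falls back to the element-by-element loop, which is the slow behaviour
described above.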


Thomas Munro

Attachment: parallel-hash-v19.patchset.tgz
Description: GNU Zip compressed data

Attachment: parallel-hash-attach-phases.sql
Description: Binary data
