On Sat, Aug 13, 2016 at 7:18 PM, Thomas Munro
<thomas.mu...@enterprisedb.com> wrote:
> I would like to propose "barriers" for Postgres processes.  A barrier
> is a very simple mechanism for coordinating parallel computation, as
> found in many threading libraries.
> First, you initialise a Barrier object somewhere in shared memory,
> most likely in the DSM segment used by parallel query, by calling
> BarrierInit(&barrier, nworkers).  Then workers can call
> BarrierWait(&barrier) when they want to block until all workers arrive
> at the barrier.  When the final worker arrives, BarrierWait returns in
> all workers, releasing them to continue their work.  One arbitrary
> worker receives a different return value as a way of "electing" it to
> perform serial phases of computation.  For parallel phases of
> computation, the return value can be ignored.  For example, there may
> be preparation, merging, or post-processing phases which must be done
> by just one worker, interspersed with phases where all workers do
> something.
> My use case for this is coordinating the phases of parallel hash
> joins, but I strongly suspect there are other cases.  Parallel sort
> springs to mind, which is why I wanted to post this separately and
> earlier than my larger patch series, to get feedback from people
> working on other parallel features.
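For what it's worth, the proposed interface is easy to sketch.  Here's a
rough approximation of the described semantics using pthreads — the names
follow your description, but the internals are entirely my guess; the real
thing would presumably be built on Postgres spinlocks and condition-variable
style sleeping rather than pthread primitives:

```c
#include <pthread.h>
#include <stdbool.h>

/* Sketch only: approximates the proposed API with pthreads. */
typedef struct Barrier
{
    pthread_mutex_t mutex;
    pthread_cond_t  cond;
    int             nworkers;   /* static party size, as in the proposal */
    int             arrived;    /* arrivals in the current phase */
    int             phase;      /* generation counter, detects wraparound */
} Barrier;

static void
BarrierInit(Barrier *b, int nworkers)
{
    pthread_mutex_init(&b->mutex, NULL);
    pthread_cond_init(&b->cond, NULL);
    b->nworkers = nworkers;
    b->arrived = 0;
    b->phase = 0;
}

/* Returns true in exactly one worker per phase: the "elected" one. */
static bool
BarrierWait(Barrier *b)
{
    bool elected = false;

    pthread_mutex_lock(&b->mutex);
    if (++b->arrived == b->nworkers)
    {
        /* Last to arrive: elected to perform any serial phase. */
        elected = true;
        b->arrived = 0;
        b->phase++;
        pthread_cond_broadcast(&b->cond);
    }
    else
    {
        int phase = b->phase;

        while (b->phase == phase)
            pthread_cond_wait(&b->cond, &b->mutex);
    }
    pthread_mutex_unlock(&b->mutex);
    return elected;
}
```

A worker would then do something like `if (BarrierWait(&barrier)) { /* serial
phase */ }` between parallel phases, ignoring the return value where every
phase is parallel.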

I was thinking about this over the weekend and I started to wonder
whether this is really going to work well for hash joins.  For
example, suppose that 6GB of work_mem is available and the projected
size of the hash table is 8GB.  Clearly, we're going to need 2
batches, but, if our estimates are accurate and the tuples split
evenly between batches, each batch will be only 4GB!  That means that
we can build up to 2GB of the hash table for the next batch before
we've finished with the hash table for the previous batch.  It seems
like a really good idea to try to take advantage of that as much as
possible.

The simple version of this is that when a worker gets done with its
own probe phase for batch X, it can immediately start building the
hash table for batch X+1, stopping if it fills up the unused portion
of work_mem before the old hash table goes away.  Of course, there are
some tricky issues with reading tapes that were originally created by
other backends, but if I understand correctly, Peter Geoghegan has
already done some work on that problem, and it seems like something we
can eventually solve, even if not in the first version.

The more complicated version of this is that we might want to delegate
one or more workers to start building as much of the next-batch hash
table as will fit instead of assisting with the current probe phase.
Once work_mem is full, they join the probe phase and continue until
it's done.  Again, tape management is an issue.  But you can see that
if you can make this work, in this example, you can reduce the
enforced pause between batches by about 50%; half the work is already
done by the time the old hash table goes away.  I bet that has a
chance of being fairly significant, especially for hash joins that
have tons of batches.  I once saw a 64-batch hash join outperform a
nested loop with inner index scan!
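Just to make the arithmetic explicit, here's the back-of-envelope
calculation behind that ~50% figure (purely illustrative; the function name
and the even-batch assumption are mine):

```c
/*
 * How much of the next batch's hash table fits in the work_mem left over
 * while the current batch is being probed?  Assumes the tuples split
 * perfectly evenly between batches.
 */
static double
prebuilt_fraction(int work_mem_gb, int projected_gb, int *nbatch_out)
{
    /* round up: number of batches needed to fit under work_mem */
    int     nbatch = (projected_gb + work_mem_gb - 1) / work_mem_gb;
    double  per_batch = (double) projected_gb / nbatch;
    double  headroom = work_mem_gb - per_batch;

    *nbatch_out = nbatch;
    return headroom / per_batch;
}
```

With work_mem = 6GB and a projected 8GB hash table, that's 2 batches of 4GB
each and 2GB of headroom, so half of the next batch's hash table can be
built before the old one goes away.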

Anyway, my point here is that I'm not sure whether the barrier
mechanism is going to work well for computations with overlapping
phases, and I suspect that overlapping phases is going to be an
important technique, so we should make sure not to settle into a
synchronization model that makes it hard.

> A problem that I'm still grappling with is how to deal with workers
> that fail to launch.  What I'm proposing so far is based on static
> worker sets, where you can only give the number of workers at
> initialisation time, just like pthread_barrier_init.  Some other
> libraries allow for adjustable worker sets, and I'm wondering if a
> parallel leader might need to be able to adjust the barrier when it
> hears of a worker not starting.  More on that soon.

I think tying this into the number of workers for the parallel context
in general is going to get you into trouble.  For example, suppose
that we have an Append plan and beneath that we have two children,
each of which is a Hash Join.  Right now, Append is dumb, so it will
blindly throw all of the workers at the first Hash Join and then, as
they emerge, it will throw them at the second one.  However, we've had
previous discussions which I'm too lazy to look up right now about
making a Parallel Append that would split the workers between the two
hash joins; if one of them finished, then those workers could go join
the other hash join in medias res.

Now, in this world, it's clearly very bad if each hash join waits for
"all of the workers" to finish a given phase before beginning the next
phase.  In fact, it's probably going to result in both hash joins
hanging, with everybody waiting for each other forever.  The
actual condition that must be verified is that there are no workers
which are going to keep trying to probe the old hash table after we
blow it up.  In other words, we need to verify that every outer tuple
for the current batch has been joined.  I'm not sure how tight we want
to make the accounting, but consider the following plan:

Hash Join
-> Parallel Seq Scan on a
-> Hash
  -> Seq Scan on b

Whenever a worker first pulls a tuple from a, it claims an entire page
of tuples from the scan.  A hazard then exists until that backend has
pulled every tuple from that page and joined them all.  There is then
no hazard - for that backend - until it pulls the first tuple from the
next page.

A sort of dumb way of handling all this is to assume that once a
worker joins the hash join, it won't go off and do anything else until
the hash join is done.  Under that assumption, you just need some sort
of BarrierAttach() operation; workers that have never attached the
barrier aren't participating in the hash join at all and so they are
irrelevant - and now you know how many workers you need to await,
because you can keep a count of how many have attached.  Perhaps you
simply turn away any workers that arrive after batch 0 is complete.
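The bookkeeping for that dumb version might look something like the
following.  This is only a sketch of the counting logic — all the names are
hypothetical, and I've omitted the locking and sleeping that a real version
would obviously need (a spinlock around each function, and blocking on a
condition variable where the comment says so):

```c
#include <stdbool.h>

/* Hypothetical dynamic-party barrier bookkeeping; synchronization omitted. */
typedef struct DynBarrier
{
    int attached;   /* workers currently participating */
    int arrived;    /* of those, how many have reached the barrier */
    int phase;      /* number of completed phases */
} DynBarrier;

/*
 * Join the computation.  Returns the current phase, so a late-arriving
 * worker can see that (say) batch 0 is already done and turn away.
 */
static int
DynBarrierAttach(DynBarrier *b)
{
    b->attached++;
    return b->phase;
}

/*
 * Leave without waiting.  If we were the straggler everyone else was
 * waiting on, our departure completes the phase.
 */
static bool
DynBarrierDetach(DynBarrier *b)
{
    b->attached--;
    if (b->attached > 0 && b->arrived == b->attached)
    {
        b->arrived = 0;
        b->phase++;
        return true;    /* remaining workers advance */
    }
    return false;
}

/* Returns true in the worker whose arrival completes the phase. */
static bool
DynBarrierArrive(DynBarrier *b)
{
    if (++b->arrived == b->attached)
    {
        b->arrived = 0;
        b->phase++;
        return true;
    }
    return false;       /* real version: block here until phase advances */
}
```

The point of returning the phase from attach is exactly the "turn away
workers that arrive after batch 0" behavior; the point of detach releasing
the phase is that a departing worker mustn't leave the others waiting for an
arrival that will never come.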

That approach is not entirely satisfying, though.  As discussed on the
thread about asynchronous and vectorized execution, it's desirable
that, when a particular branch of a parallel query can't use any more
workers - e.g. because they are all waiting for the next phase to
begin - those workers can leave and go do something else.
Contrariwise, if some workers are busy with another branch of the
query tree and they finish all the work over there, it's desirable for
those workers to be able to come join our branch of the query tree to
help out.  So it seems like it would be nicer here to be precise about
the bookkeeping: (1) track the number of workers that actually have a
hazard for this portion of the query tree right now and (2) be
prepared for a future day in which we will wish to allow a worker
which has no such active hazard to depart the query tree for a time or
indefinitely.  The fly in the ointment, of course, is that the
existence of the hazard depends on state we don't readily have access
to at the point where we want to make the decision...  and I don't
have a good idea how to solve that problem.  We probably want to do
something as simple as possible for now, but at the same time try not
to make future improvements along these lines any more difficult than
they need to be.

> I thought about using a different name to avoid colliding with
> barrier.h and overloading the term: there are of course also compiler
> barriers and memory barriers.  But then I realised that that header
> was basically vacant real estate, and 'barrier' is the super-well
> established standard term for this parallel computing primitive.

If we're going to remove barrier.h, I think that should be a separate
commit from creating a new barrier.h.

Robert Haas
EnterpriseDB: http://www.enterprisedb.com
The Enterprise PostgreSQL Company
