I've been thinking about how parallelism interacts with sorting over
the last few days and I wanted to share a few preliminary thoughts.  I
definitely don't have all the answers worked out here yet, so thoughts
are welcome.  But here are a few observations:

1. Parallel sort is useful but within parallel queries and for utility
commands like CREATE INDEX.  Index builds can take a long time, and
that time is often dominated by the time needed to sort the data, so
being able to do that faster would be good.

2. Within parallel query, there are two reasons to care about data
that is in sorted order.  First, we might need to deliver the results
to the user in a particular order, because they've specified ORDER BY
whatever.  Second, the optimal join strategy might be a merge join,
which requires that both relations be sorted according to the join

3. The current Gather node reads tuples from the workers in
round-robin fashion, skipping over workers that don't have a tuple
ready yet when it needs one.  It seems potentially useful to have a
Gather Merge node which would assume that the results from each worker
are ordered with respect to each other, and do a final merge pass over
those.  Then we could get the toplevel query ordering we want using a
plan like this:

Gather Merge
-> Sort
  -> Parallel Seq Scan on foo
      Filter: something

4. Gather Merge would be an executor node, and thus not available to
any code that uses tuplesort.c directly.  Also, it seems fairly
mediocre for merge joins.  The best we could do is something like

Merge Join
-> Gather Merge
  -> Sort
    -> Parallel Seq Scan
-> Gather Merge
  -> Sort
    -> Parallel Seq Scan

The problem with this plan is that the join itself is not done in
parallel, only the sorting.  That's not great, especially if there are
more joins that need to be done afterwards, necessarily not in
parallel.[2]  It's possible that one side of the join could be an
Index Scan rather than Gather Merge -> Sort -> Parallel Seq Scan, but
that doesn't change the overall picture here much.

5. Really nailing the merge join case seems to require partitioning
both relations in a fashion compatible with the join attribute, and
then joining the partitions separately.  Consider an operator
Repartition which reads rows from its child plan and returns those
where hash(joincol) % NumberOfWorkers == MyWorkerNumber.  The rest are
sent to the worker whose worker number is hash(joincol) %
NumberOfWorkers and are returned by its copy of the corrresponding
Repartition operator.  Then we could express a merge join reasonably
well as:

Gather (Merge)
-> Merge Join
  -> Sort
    -> Repartition
      -> Parallel Seq Scan
  -> Sort
    -> Repartition
      -> Parallel Seq Scan

The Gather could be a Gather Merge if the merge join ordering matches
the final output ordering, or a simple Gather if it doesn't.
Additional join steps could be inserted between the Gather (Merge)
operator and the merge join.  So this is a big improvement over the
plan shown under point #4.  However, it's probably still not optimal,
because we probably want to have substantially more partitions than
there are workers.  Otherwise, if some workers finish before others,
it's hard to spread the load.  Getting this right probably requires
some sort of cooperation between Gather and Repartition where they
agree on a number of partitions and then the workers repeatedly pick a
partition, run the plan for that partition, and then loop around to
get the next unfinished partition until all are completed.

6. Even without repartitioning, if one side of the join has a usable
index, we could instead do this:

Gather (Merge)
-> Merge Join
  -> Sort
      -> Parallel Seq Scan
  -> Index Scan

However, this might not be a good idea: we'll scan the index once per
worker.  If we had a Parallel Index Scan which worked like a Parallel
Seq Scan, in that it returned only a subset of the results in each
worker but in the same order that the non-parallel version would have
returned them, we could instead do this, which might or might not be

Gather (Merge)
-> Merge Join
  -> Sort
    -> Repartition
      -> Parallel Seq Scan
  -> Repartition
    -> Parallel Index Scan

Here we scan the index just once (spread across all the workers) but
we've got to repartition the rows we read from it, so I'm not sure how
that's going to work out.  Parallel index scan is of course useful
apart from merge joins, because you can do something like this to
preserve the ordering it creates:

Gather Merge
-> Nested Loop
  -> Parallel Index Scan on a
  -> Index Scan on b
    Index Qual: b.x = a.x

7. Another option, instead or in addition to introducing a Repartition
operator, is to make the sort itself parallel-aware.  Each worker
reads rows until it fills work_mem, quicksorts them, and dumps them
out as a run.  Suppose there are few enough runs that we don't need
multiple merge passes, and that we have some way of making every
worker available of every run performed by any worker.  Then any one
or more of the workers can get the sorted results out by performing a
final merge pass over the runs we produced.  We could support various
models for reading the results of the sort: return every tuple to
every worker, return every tuple to some worker but don't return any
given tuple to more than one worker; return all tuples in the leader.
So if we just want to sort a big pile of tuples, the plan can now look
like this:

-> Parallel Sort
    Output Mode: Leader Only
  -> Parallel Seq Scan

I'm not sure if that's better or worse or exactly equivalent to the
Gather Merge > Sort > Parallel Seq Scan approach.  If we want to do a
parallel merge join, we now have options like this:

Gather (Merge)
-> Merge Join
  -> Parallel Sort
    Output Mode: Each Tuple Once
    -> Parallel Seq Scan
  -> Index Scan


Gather (Merge)
-> Merge Join
  -> Repartition
    -> Parallel Sort
      Output Mode: Each Tuple Once
      -> Parallel Seq Scan
  -> Repartition
    -> Parallel Sort
      Output Mode: Each Tuple To Every Worker
      -> Parallel Seq Scan

OK, that's all I've got.  So in the space of one email, I've proposed
executor nodes for Gather Merge, Repartition, Partial Index Scan, and
Parallel Sort (with three different output modes).  And I don't know
which ones are actually most interesting, or whether we need them all.
Whee!  Nor do I know whether any of this can work for code that
currently uses tuplesort.c directly.  Double whee!

Thoughts welcome.


Robert Haas
EnterpriseDB: http://www.enterprisedb.com
The Enterprise PostgreSQL Company

[1] Nested loops preserve the input ordering, but there is no special
reason for the input to have an ordering in the first place unless
it's useful for a merge join higher up in the plan tree or unless it
matches the final query ordering.  Hash joins do not benefit from any
particular input ordering, and in fact they destroy the input ordering
if they go to multiple batches; so we always treat the output of a
hash join as unordered.

[2] Currently, Gather nodes cannot appear in a plan tree directly or
indirectly beneath other Gather nodes, partly because it's not exactly
clear what the semantics of such a thing would be. Therefore, the plan
shown here precludes a parallel join between the output of the merge
join and anything else.

Sent via pgsql-hackers mailing list (pgsql-hackers@postgresql.org)
To make changes to your subscription:

Reply via email to