Re: Spilling hashed SetOps and aggregates to disk

2018-06-21 Thread Jeff Davis
On Thu, 2018-06-21 at 13:44 -0700, David Gershuni wrote:
> To handle hash collisions, we can do the following:
> 
> 1) We track the current hash code we’re processing, in ascending
> order.
> 
> 2) Instead of combining one group at a time, we’ll maintain a list
> of
> all groups we’ve seen that match the current hash code.
> 
> 3) When we peek at a spill file, if its next group’s hash code
> matches
> our current hash code, we’ll check to see if that group matches any
> of the
> groups in our list. If so, we’ll combine it with the matching group.
> If not,
> we’ll add this group to our list.
> 
> 4) Once the head of each spill file exceeds the current hash code,
> we’ll
> emit our list as output, empty it, and advance to the next hash code.
> 
> Does that seem reasonable?

It seems algorithmically reasonable but awfully complex. I don't think
that's a good path to take.

I still only see two viable approaches: 

(1) Spill tuples into hash partitions: works and is a reasonably simple
extension of our existing code. This is basically what the last patch I
posted does (posted before grouping sets, so needs to be updated).

(2) Spill tuples into a sort: I like this approach because it can be
done simply (potentially less code than #1), and could be further
improved without adding a ton of complexity. It may even eliminate the
need for the planner to choose between hashagg and sort. The problem
here is data types that have a hash opclass but no btree opclass. I
might try to sidestep this problem by saying that data types with no
btree opclass will not obey work_mem.

Additionally, there are two other ideas that we might want to pursue
regardless of which approach we take:

* support evicting transition states from the hash table, so that we
can handle clustered input better

* refactor grouping sets so that phases are separate executor nodes so
that we can undo some of the complexity in nodeAgg.c

Regards,
Jeff Davis




Re: Spilling hashed SetOps and aggregates to disk

2018-06-21 Thread David Gershuni
On Jun 21, 2018, at 1:04 PM, Jeff Davis  wrote:
> On Thu, 2018-06-21 at 11:04 -0700, David Gershuni wrote:
>> This approach seems functionally correct, but I don't like the idea
>> of
>> transforming O(N) tuples of disk I/O into O(S*N) tuples of disk I/O
>> (in the worst case).
> 
> It's the same amount of I/O as the idea you suggested of putting the
> hash tables into separate phases, but it may consume more disk space at
> once in the worst case.
> 
> We can mitigate that if it becomes a problem later.

That’s true. Both fall-back approaches would suffer from high I/O.
However, the HashSort-based approach would offer much better
performance, assuming the sorting issue is overcome.


>> b) is accomplished by not sorting groups by their actual grouping
>> keys, but
>> instead sorting by their hash values. This works because we don't
>> need a 
>> true sort. We just need to process identical groups in a consistent
>> order
>> during the merge step. As long as we're careful about collisions
>> during the
>> merge, this should work.
> 
> Can you expand on how we should handle collisions? If all we can do is
> hash and compare equality, then it seems complex to draw group
> boundaries.

In the HashSort algorithm, the sort order is only used as a tool in the
merge step. In the merge step, we will process one grouping set
at a time.

By sorting before merging, we ensure that we’ll encounter
all partial aggregations of a group at once by looking at the head of each
spill file. This allows us to maintain a ‘current group’ and combine all
instances of it from each spill file. Then when no more instances of that
group are present, we emit the group as output. So the order can be
anything, as long as it’s consistent.

To handle hash collisions, we can do the following:

1) We track the current hash code we’re processing, in ascending
order.

2) Instead of combining one group at a time, we’ll maintain a list of
all groups we’ve seen that match the current hash code.

3) When we peek at a spill file, if its next group’s hash code matches
our current hash code, we’ll check to see if that group matches any of the
groups in our list. If so, we’ll combine it with the matching group. If not,
we’ll add this group to our list.

4) Once the head of each spill file exceeds the current hash code, we’ll
emit our list as output, empty it, and advance to the next hash code.
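
To make the merge step concrete, here is a rough standalone sketch in C
(made-up types -- SpillEntry/SpillRun -- with the group state reduced to a
count and the spill files modeled as pre-sorted in-memory arrays; it only
illustrates the idea, it is not executor code):

/* Standalone sketch of the hash-ordered merge step (simplified types). */
#include <stdio.h>
#include <stdint.h>

typedef struct SpillEntry { uint32_t hash; int key; long count; } SpillEntry;
typedef struct SpillRun { const SpillEntry *entries; int nentries; int pos; } SpillRun;

/* Combine every group that shares the current hash code, then emit. */
static void merge_runs(SpillRun *runs, int nruns)
{
    SpillEntry current[64];        /* groups colliding on the current hash */
    int ncurrent;

    for (;;)
    {
        uint32_t minhash = UINT32_MAX;
        int i;

        /* Find the smallest hash code at the head of any run. */
        for (i = 0; i < nruns; i++)
            if (runs[i].pos < runs[i].nentries &&
                runs[i].entries[runs[i].pos].hash < minhash)
                minhash = runs[i].entries[runs[i].pos].hash;
        if (minhash == UINT32_MAX)
            break;              /* all runs exhausted */

        /* Pull every entry with this hash code from every run. */
        ncurrent = 0;
        for (i = 0; i < nruns; i++)
        {
            SpillRun *r = &runs[i];

            while (r->pos < r->nentries && r->entries[r->pos].hash == minhash)
            {
                const SpillEntry *e = &r->entries[r->pos++];
                int j;

                for (j = 0; j < ncurrent; j++)
                    if (current[j].key == e->key)   /* true group match */
                        break;
                if (j < ncurrent)
                    current[j].count += e->count;   /* combine */
                else
                    current[ncurrent++] = *e;       /* new colliding group */
            }
        }

        /* Every run has advanced past this hash code: emit and move on. */
        for (i = 0; i < ncurrent; i++)
            printf("key=%d count=%ld\n", current[i].key, current[i].count);
    }
}

int main(void)
{
    /* Two runs, each sorted by hash; keys 7 and 9 collide on hash 42. */
    static const SpillEntry a[] = {{10, 1, 3}, {42, 7, 2}, {42, 9, 1}};
    static const SpillEntry b[] = {{42, 7, 5}, {99, 4, 4}};
    SpillRun runs[2] = {{a, 3, 0}, {b, 2, 0}};

    merge_runs(runs, 2);
    return 0;
}

The list only ever holds the groups that collide on a single hash code, so
it should stay tiny in practice.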

Does that seem reasonable?

Best,
David
Salesforce

Re: Spilling hashed SetOps and aggregates to disk

2018-06-21 Thread Jeff Davis
On Thu, 2018-06-21 at 11:04 -0700, David Gershuni wrote:
> This approach seems functionally correct, but I don't like the idea
> of
> transforming O(N) tuples of disk I/O into O(S*N) tuples of disk I/O
> (in the worst case).

It's the same amount of I/O as the idea you suggested of putting the
hash tables into separate phases, but it may consume more disk space at
once in the worst case.

We can mitigate that if it becomes a problem later.

> b) is accomplished by not sorting groups by their actual grouping
> keys, but
> instead sorting by their hash values. This works because we don't
> need a 
> true sort. We just need to process identical groups in a consistent
> order
> during the merge step. As long as we're careful about collisions
> during the
> merge, this should work.

Can you expand on how we should handle collisions? If all we can do is
hash and compare equality, then it seems complex to draw group
boundaries.

Regards,
Jeff Davis




Re: Spilling hashed SetOps and aggregates to disk

2018-06-21 Thread David Gershuni


> On Jun 19, 2018, at 10:36 PM, Jeff Davis  wrote:
> 
> But I am worried that I am missing something, because it appears that
> for AGG_MIXED, we wait until the end of the last phase to emit the
> contents of the hash tables. Wouldn't they be complete after the first
> phase?

You're right. They're complete after the first phase, but I believe
they're not emitted until the end in order to minimize the complexity of
the code. I don't think the possibility of exceeding work_mem was taken
into account when this decision was made.


> I was imagining that (in the simplest approach) each hash table would
> have its own set of spilled tuples. If grouping set A can be advanced
> (because it's present in the hash table) but B cannot (because it's not
> in the hash table and we are at the memory limit), then it goes into
> the spill file for B but not A. That means that A is still finished
> after the first pass.
...
> I realize that this simple approach could be inefficient if multiple
> hash tables are spilling because we'd duplicate the spilled tuples. But
> let me know if it won't work at all.

This approach seems functionally correct, but I don't like the idea of
transforming O(N) tuples of disk I/O into O(S*N) tuples of disk I/O
(in the worst case). As of yesterday, I think I have a better approach
that will work even for non-sortable grouping keys. (See below).


> I am having a hard time trying to satisfy all of the constraints that
> have been raised so far:
> 
> * handle data types with hash ops but no btree ops
> * handle many different group size distributions and input orders
> efficiently
> * avoid regressions or inefficiencies for grouping sets
> * handle variable-size (particularly O(n)-space) transition states
> 
> without taking on a lot of complexity. I like to work toward the right
> design, but I think simplicity also factors in to the right design.
> NodeAgg.c is already one of the larger and more complicated files that
> we have.

Yes, there are a lot of moving pieces here! And it will be non-trivial to
satisfy them all with one approach. I'm working on a new aggstrategy
that could be used in conjunction with AGG_SORT. This approach could
replace the current AGG_HASH and would satisfy all of the requirements
you mentioned for aggregates that have serial/deserial functions. 

My approach is based on the HashSort algorithm that I mentioned before
but with new modifications to a) handle multiple grouping sets and b) not
require btree ops on the grouping keys.

a) is accomplished by hashing all grouping sets simultaneously and putting
each set in its own memory context. Whenever work_mem is exceeded, we
sort and spill the hash table with the largest memory context.

b) is accomplished by not sorting groups by their actual grouping keys, but
instead sorting by their hash values. This works because we don't need a 
true sort. We just need to process identical groups in a consistent order
during the merge step. As long as we're careful about collisions during the
merge, this should work.
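
As a rough illustration of (a), the bookkeeping could look something like
this standalone sketch (plain C with made-up types; each grouping set gets
its own table with separately tracked memory, and the largest one is sorted
by hash value and spilled when we exceed the budget):

/* Standalone sketch: one hash table per grouping set, spill the largest,
 * writing its entries in hash-value order (no btree opclass needed). */
#include <stdio.h>
#include <stdlib.h>
#include <stdint.h>

typedef struct Entry { uint32_t hash; int key; long count; } Entry;

typedef struct SetTable {
    const char *name;       /* which grouping set this table belongs to */
    Entry   entries[16];    /* stand-in for the real hash table */
    int     nentries;
    size_t  mem_used;       /* what a per-set memory context would track */
} SetTable;

static int cmp_hash(const void *a, const void *b)
{
    uint32_t ha = ((const Entry *) a)->hash;
    uint32_t hb = ((const Entry *) b)->hash;

    return (ha > hb) - (ha < hb);
}

/* When the total exceeds the budget, sort and spill the biggest table. */
static void maybe_spill(SetTable *sets, int nsets, size_t budget)
{
    size_t  total = 0;
    int     biggest = 0;
    int     i;

    for (i = 0; i < nsets; i++)
    {
        total += sets[i].mem_used;
        if (sets[i].mem_used > sets[biggest].mem_used)
            biggest = i;
    }
    if (total <= budget)
        return;

    qsort(sets[biggest].entries, sets[biggest].nentries,
          sizeof(Entry), cmp_hash);

    printf("spilling set %s (%zu bytes), hash-ordered:\n",
           sets[biggest].name, sets[biggest].mem_used);
    for (i = 0; i < sets[biggest].nentries; i++)
        printf("  hash=%u key=%d count=%ld\n",
               sets[biggest].entries[i].hash,
               sets[biggest].entries[i].key,
               sets[biggest].entries[i].count);

    sets[biggest].nentries = 0;     /* table is emptied after the spill */
    sets[biggest].mem_used = 0;
}

int main(void)
{
    SetTable sets[2] = {
        {"(a,b)", {{7, 1, 2}, {3, 2, 5}, {9, 3, 1}}, 3, 3000},
        {"(a)",   {{5, 1, 4}},                       1, 1000},
    };

    maybe_spill(sets, 2, 2048);     /* pretend work_mem is 2kB */
    return 0;
}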

> A lot of the complexity we already have is that grouping sets are
> performed in one giant executor node rather than exposing the
> individual phases as separate executor nodes. I suppose the reason
> (correct me if I'm wrong) is that there are two outputs from a phase:
> the aggregated groups, and the original input tuples in a new sort
> order. That seems solvable -- the executor passes bitmaps around until
> BitmapHeapScan turns them into tuples.

That is an interesting idea, but it's somewhat orthogonal to the spill
implementation.
Still, it could be a nice way to decompose aggregate nodes that require multiple
phases/strategies. I'm curious to hear what others think.

Best,
David


Re: Spilling hashed SetOps and aggregates to disk

2018-06-19 Thread Jeff Davis
On Fri, 2018-06-15 at 12:30 -0700, David Gershuni wrote:
> For example, imagine a tuple that belongs to a group G in grouping
> set A had to be
> spilled because it also belonged to an evicted group from grouping
> set B. Then group
> G would remain in set A’s hash table at the end of the phase, but it
> wouldn’t have
> aggregated the values from the spilled tuple. Of course, work-arounds 
> are possible,
> but the current approach would not work as is.

I was imagining that (in the simplest approach) each hash table would
have its own set of spilled tuples. If grouping set A can be advanced
(because it's present in the hash table) but B cannot (because it's not
in the hash table and we are at the memory limit), then it goes into
the spill file for B but not A. That means that A is still finished
after the first pass.

But I am worried that I am missing something, because it appears that
for AGG_MIXED, we wait until the end of the last phase to emit the
contents of the hash tables. Wouldn't they be complete after the first
phase?

I realize that this simple approach could be inefficient if multiple
hash tables are spilling because we'd duplicate the spilled tuples. But
let me know if it won't work at all.

> But as you mentioned, this solution relies on all grouping keys being
> sortable, so we
> would need a fallback plan. For this, we could use your hash-based
> approach, but we
> would have to make modifications. One idea would be to split each
> grouping set into its
> own aggregate phase, so that only one hash table is in memory at a
> time. This would
> eliminate the performance benefits of grouping sets when keys aren’t
> sortable, but at
> least they would still work. 

I am having a hard time trying to satisfy all of the constraints that
have been raised so far:

* handle data types with hash ops but no btree ops
* handle many different group size distributions and input orders
efficiently
* avoid regressions or inefficiencies for grouping sets
* handle variable-size (particularly O(n)-space) transition states

without taking on a lot of complexity. I like to work toward the right
design, but I think simplicity also factors in to the right design.
NodeAgg.c is already one of the larger and more complicated files that
we have.

A lot of the complexity we already have is that grouping sets are
performed in one giant executor node rather than exposing the
individual phases as separate executor nodes. I suppose the reason
(correct me if I'm wrong) is that there are two outputs from a phase:
the aggregated groups, and the original input tuples in a new sort
order. That seems solvable -- the executor passes bitmaps around until
BitmapHeapScan turns them into tuples.

Regards,
Jeff Davis




Re: Spilling hashed SetOps and aggregates to disk

2018-06-15 Thread David Gershuni


> On Jun 13, 2018, at 12:53 PM, Jeff Davis  wrote:
> 
>> 
>> An adaptive hash agg node would start as a hash agg, and turn into a
>> "partial hash agg + sort + final group agg" when OOM is detected.
>> 
>> The benefit over ordinary sort+group agg is that the sort is
>> happening
>> on a potentially much smaller data set. When the buffer is large
>> enough to capture enough key repetitions, the output of the partial
>> hash agg can be orders of magnitude smaller than the scan.
>> 
>> My proposal was to use this for all group aggs, not only when the
>> planner chooses a hash agg, because it can speed up the sort and
>> reduce temp storage considerably. I guess the trick lies in
>> estimating
>> that cardinality reduction to avoid applying this when there's no
>> hope
>> of getting a payoff. The overhead of such a lazy hash table isn't
>> much, really. But yes, its cache locality is terrible, so it needs to
>> be considered.
> 
> I think this is a promising approach because it means the planner has
> one less decision to make. And the planner doesn't have great information
> to make its decision on, anyway (ndistinct estimates are hard enough on
> plain tables, and all bets are off a few levels up in the plan).
> 
> Unfortunately, it means that either all data types must support hashing
> and sorting[1], or we need to have a fallback path that can get by with
> hashing alone (which would not be tested nearly as well as more typical
> paths).

Jeff, I agree with you, but we should also take grouping sets into
consideration because they can cause the executor to create multiple
hash tables in memory simultaneously, each growing at a different rate.
Not only does this make eviction more complicated, but it actually
prevents your original approach from working because it violates the
assumption that all entries left in the hash table at the end of a
phase are complete and can be flushed to output.

For example, imagine a tuple that belongs to a group G in grouping set
A had to be spilled because it also belonged to an evicted group from
grouping set B. Then group G would remain in set A’s hash table at the
end of the phase, but it wouldn’t have aggregated the values from the
spilled tuple. Of course, work-arounds are possible, but the current
approach would not work as is.

Claudio’s proposal mostly solves this problem. In fact, his proposal is
very similar to the HashSort algorithm from [1] that I mentioned. We
would still need to think about how to choose which hash table to
evict from. For example, we could evict a group from the largest hash
table (if we tracked memory usage independently for each one).

But as you mentioned, this solution relies on all grouping keys being
sortable, so we would need a fallback plan. For this, we could use your
hash-based approach, but we would have to make modifications. One idea
would be to split each grouping set into its own aggregate phase, so
that only one hash table is in memory at a time. This would eliminate
the performance benefits of grouping sets when keys aren’t sortable,
but at least they would still work.

Best,
David
Salesforce

[1] Revisiting Aggregation for Data Intensive Applications: A Performance Study
https://arxiv.org/pdf/1311.0059.pdf





Re: Spilling hashed SetOps and aggregates to disk

2018-06-13 Thread Jeff Davis
On Wed, 2018-06-13 at 11:50 -0300, Claudio Freire wrote:
> In essence, the technique I've been using always uses a fixed-size
> hash table to do partial grouping. The table is never grown, when
> collisions do happen, the existing entry in the hash table is flushed
> to disk and the aggregate state in that bucket reset for the incoming
> key. To avoid excessive spilling due to frequent collisions, we use a
> kind of "lazy cuckoo" hash table. Lazy in the sense that it does no
> relocations, it just checks 2 hash values, and if it has to evict, it
> evicts the "oldest" value (with some cheap definition of "old").

...

> An adaptive hash agg node would start as a hash agg, and turn into a
> "partial hash agg + sort + final group agg" when OOM is detected.
> 
> The benefit over ordinary sort+group agg is that the sort is
> happening
> on a potentially much smaller data set. When the buffer is large
> enough to capture enough key repetitions, the output of the partial
> hash agg can be orders of magnitude smaller than the scan.
> 
> My proposal was to use this for all group aggs, not only when the
> planner chooses a hash agg, because it can speed up the sort and
> reduce temp storage considerably. I guess the trick lies in
> estimating
> that cardinality reduction to avoid applying this when there's no
> hope
> of getting a payoff. The overhead of such a lazy hash table isn't
> much, really. But yes, its cache locality is terrible, so it needs to
> be considered.

I think this is a promising approach because it means the planner has
one less decision to make. And the planner doesn't have great information
to make its decision on, anyway (ndistinct estimates are hard enough on
plain tables, and all bets are off a few levels up in the plan).

Unfortunately, it means that either all data types must support hashing
and sorting[1], or we need to have a fallback path that can get by with
hashing alone (which would not be tested nearly as well as more typical
paths).

I still like this idea, though.

Regards,
Jeff Davis


[1] https://www.postgresql.org/message-id/9584.1528739975%40sss.pgh.pa.us




Re: Spilling hashed SetOps and aggregates to disk

2018-06-13 Thread Jeff Davis
On Wed, 2018-06-13 at 10:08 -0400, Robert Haas wrote:
> I wasn't using the term "average" in a mathematical sense.  I suppose
> I really meant "typical".  I agree that thinking about cases where
> the
> group size is nonuniform is a good idea, but I don't think I agree
> that all uniform distributions are created equal.  Uniform
> distributions with 1 row per group are very different than uniform
> distributions with 1000 rows per group.

The only mechanism we have for dealing efficiently with skewed groups
is hashing; no alternative has been proposed.

I think what you are saying is that the case of medium-large groups
with clustered input are kind of like skewed groups because they have
enough locality to benefit from grouping before spilling. I can see
that.

So how do we handle both of these cases (skewed groups and clustered
groups) well?

I tend toward a simple approach, which is to just put whatever we can
in the hash table. Once it's full, if the hit rate on the hash table
(the whole table) drops below a certain threshold, we just dump all the
transition states out to disk and empty the hash table.

That would handle both skewed groups and clustering fairly well.
Clustering would cause the hit rate to rapidly go to zero when we are
past the current batch of groups, causing fairly quick switching to new
groups which should handle Tomas's case[1]. And it would also handle
ordinary skewed groups fairly well -- it may write them out sometimes
(depending on how smart our eviction strategy is), but the cost of that
is not necessarily bad because it will go back into the hash table
quickly.
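
To illustrate, a toy version of that policy might look like the following
(standalone C, with a counter standing in for the transition state and
stdout standing in for the spill file; the table size and threshold are
arbitrary):

/* Toy sketch of "flush the whole table when the hit rate drops". */
#include <stdio.h>
#include <string.h>

#define TABLE_SIZE   8          /* pretend this is all work_mem allows */
#define MIN_HIT_RATE 0.5

typedef struct Slot { int used; int key; long count; } Slot;

static Slot   table_[TABLE_SIZE];
static long   lookups, hits;

static void flush_table(void)
{
    int i;

    for (i = 0; i < TABLE_SIZE; i++)
        if (table_[i].used)
            printf("spill transition state: key=%d count=%ld\n",
                   table_[i].key, table_[i].count);
    memset(table_, 0, sizeof(table_));
    lookups = hits = 0;
}

static void advance(int key)
{
    int i, free_slot = -1;

    lookups++;
    for (i = 0; i < TABLE_SIZE; i++)
    {
        if (table_[i].used && table_[i].key == key)
        {
            table_[i].count++;          /* group already present: a hit */
            hits++;
            return;
        }
        if (!table_[i].used && free_slot < 0)
            free_slot = i;
    }

    if (free_slot >= 0)
    {
        table_[free_slot].used = 1;     /* room left: start a new group */
        table_[free_slot].key = key;
        table_[free_slot].count = 1;
        return;
    }

    /* Table is full and this key missed.  If the hit rate has collapsed,
     * the input has probably moved on to a new batch of groups: dump
     * everything and let the new groups take over. */
    if (lookups >= 32 && (double) hits / lookups < MIN_HIT_RATE)
    {
        flush_table();
        advance(key);                   /* now there is room for this group */
        return;
    }
    printf("spill input tuple: key=%d\n", key);
}

int main(void)
{
    int i;

    for (i = 0; i < 200; i++)
        advance(i / 10);        /* clustered input: ten tuples per group */
    flush_table();              /* emit whatever is still in memory */
    return 0;
}

The point is just that a single whole-table hit-rate check is enough to
notice that the input has moved on to a new cluster of groups.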

It also offers an incremental implementation strategy: something
resembling my patch could be first (which doesn't dump transition
states at all), followed by something that can dump transition states,
followed by some tweaking to make it smarter.

That still leaves the question about what to do with the small groups:
partition them (like my patch does) or feed them into a sort and group
them?

Regards,
Jeff Davis

[1] https://www.postgresql.org/message-id/46734151-3245-54ac-76fc-17742fb0e6d9%402ndquadrant.com



Re: Spilling hashed SetOps and aggregates to disk

2018-06-13 Thread Claudio Freire
On Tue, Jun 5, 2018 at 5:05 AM Tomas Vondra
 wrote:
>
> On 06/05/2018 07:46 AM, Jeff Davis wrote:
> > On Tue, 2018-06-05 at 07:04 +0200, Tomas Vondra wrote:
> >> I expect the eviction strategy to be the primary design challenge of
> >> this patch. The other bits will be mostly determined by this one
> >> piece.
> >
> > Not sure I agree that this is the primary challenge.
> >
> > The cases that benefit from eviction are probably a minority. I see two
> > categories that would benefit:
> >
> >* Natural clustering in the heap. This sounds fairly common, but a
> > lot of the cases that come to mind are too low-cardinality to be
> > compelling; e.g. timestamps grouped by hour/day/month. If someone has
> > run into a high-cardinality natural grouping case, let me know.
> >* ARRAY_AGG (or similar): individual state values can be large enough
> > that we need to evict to avoid exceeding work_mem even if not adding
> > any new groups.
> >
> > In either case, it seems like a fairly simple eviction strategy would
> > work. For instance, we could just evict the entire hash table if
> > work_mem is exceeded or if the hit rate on the hash table falls below a
> > certain threshold. If there was really something important that should
> > have stayed in the hash table, it will go back in soon anyway.
> >
> > So why should eviction be a major driver for the entire design? I agree
> > it should be an area of improvement for the future, so let me know if
> > you see a major problem, but I haven't been as focused on eviction.
> >
>
> My concern is more about what happens when the input tuple ordering is
> inherently incompatible with the eviction strategy, greatly increasing
> the amount of data written to disk during evictions.
>
> Say for example that we can fit 1000 groups into work_mem, and that
> there are 2000 groups in the input data set. If the input is correlated
> with the groups, everything is peachy because we'll evict the first
> batch, and then group the remaining 1000 groups (or something like
> that). But if the input data is random (which can easily happen, e.g.
> for IP addresses, UUIDs and such) we'll hit the limit repeatedly and
> will evict much sooner.
>
> I know you suggested simply dumping the whole hash table and starting
> from scratch while we talked about this at pgcon, but ISTM it has
> exactly this issue.
>
> But I don't know if there actually is a better option - maybe we simply
> have to accept this problem. After all, running slowly is still better
> than OOM (which may or may not happen now).
>
> I wonder if we can somehow detect this at run-time and maybe fall-back
> to groupagg. E.g. we could compare number of groups vs. number of input
> tuples when we first hit the limit. It's a rough heuristics, but maybe
> sufficient.

I've been applying a strategy like that to do massive streaming
aggregation quite successfully.

The code I have is in python and in a private repo. There have been
talks of both opensourcing it, and of porting it into postgres as a
kind of aggregate node, because it sounds like a good idea. It seems
very related to this thread.

In essence, the technique I've been using always uses a fixed-size
hash table to do partial grouping. The table is never grown, when
collisions do happen, the existing entry in the hash table is flushed
to disk and the aggregate state in that bucket reset for the incoming
key. To avoid excessive spilling due to frequent collisions, we use a
kind of "lazy cuckoo" hash table. Lazy in the sense that it does no
relocations, it just checks 2 hash values, and if it has to evict, it
evicts the "oldest" value (with some cheap definition of "old").

The technique works very well to reduce temporary data size with a
fixed amount of working memory. The resulting spill file can then be
processed again to finalize the computation.
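
For illustration, a stripped-down version of that table might look like this
(standalone C, counts instead of real aggregate states, spilling to stdout;
the two hash functions, the table size, and the "age" counter are arbitrary
choices for the sketch):

/* Toy fixed-size "lazy cuckoo" partial aggregation: two candidate buckets
 * per key, no relocation, evict the older candidate on collision. */
#include <stdio.h>
#include <stdint.h>

#define NBUCKETS 8      /* fixed size: this never grows */

typedef struct Bucket {
    int      used;
    uint32_t key;
    long     count;     /* stand-in for the aggregate state */
    long     stamp;     /* "age": when this bucket was last touched */
} Bucket;

static Bucket buckets[NBUCKETS];
static long   clock_;

static uint32_t hash1(uint32_t k) { return (k * 2654435761u) % NBUCKETS; }
static uint32_t hash2(uint32_t k) { return (k * 40503u + 1) % NBUCKETS; }

static void spill(const Bucket *b)
{
    printf("spill partial group: key=%u count=%ld\n", b->key, b->count);
}

static void advance(uint32_t key)
{
    uint32_t h1 = hash1(key), h2 = hash2(key);
    Bucket  *b1 = &buckets[h1], *b2 = &buckets[h2];
    Bucket  *victim;

    clock_++;

    /* Already present in either candidate bucket? Just advance it. */
    if (b1->used && b1->key == key) { b1->count++; b1->stamp = clock_; return; }
    if (b2->used && b2->key == key) { b2->count++; b2->stamp = clock_; return; }

    /* An empty candidate bucket? Start the group there. */
    victim = !b1->used ? b1 : (!b2->used ? b2 : NULL);

    /* Otherwise evict the "older" of the two candidates to the spill. */
    if (victim == NULL)
    {
        victim = (b1->stamp <= b2->stamp) ? b1 : b2;
        spill(victim);
    }

    victim->used = 1;
    victim->key = key;
    victim->count = 1;
    victim->stamp = clock_;
}

int main(void)
{
    uint32_t keys[] = {1, 2, 1, 3, 1, 4, 5, 6, 7, 8, 9, 1, 2, 10, 1};
    size_t   i;

    for (i = 0; i < sizeof(keys) / sizeof(keys[0]); i++)
        advance(keys[i]);

    /* The spill output plus these leftovers feed the final sort+combine. */
    for (i = 0; i < NBUCKETS; i++)
        if (buckets[i].used)
            printf("in-memory group: key=%u count=%ld\n",
                   buckets[i].key, buckets[i].count);
    return 0;
}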

What I was pondering PG could do, is feed the spilled tuples to a sort
node, using the partial hash aggregation as a data-reducing node only.

scan -> partial hash agg -> sort -> final group agg

The group agg would have to know to consume and combine aggregate
states instead of producing them, but in essence it seems a relatively
efficient process.

An adaptive hash agg node would start as a hash agg, and turn into a
"partial hash agg + sort + final group agg" when OOM is detected.

The benefit over ordinary sort+group agg is that the sort is happening
on a potentially much smaller data set. When the buffer is large
enough to capture enough key repetitions, the output of the partial
hash agg can be orders of magnitude smaller than the scan.

My proposal was to use this for all group aggs, not only when the
planner chooses a hash agg, because it can speed up the sort and
reduce temp storage considerably. I guess the trick lies in estimating
that cardinality reduction to avoid applying this when there's no hope
of getting a payoff. The overhead of such a lazy hash table isn't
much, really. But yes, its cache locality is terrible, so it needs to
be considered.

Re: Spilling hashed SetOps and aggregates to disk

2018-06-13 Thread Robert Haas
On Mon, Jun 11, 2018 at 1:50 PM, Jeff Davis  wrote:
> I think average group size is the wrong way to look at it, because
> averages are only useful for a normal distribution. The two most
> interesting cases are:
>
> 1. Fairly uniform group size (e.g. normal dist)
> 2. Skew values, power law distribution, etc., where some groups are
> very large and most are tiny by comparison. I am calling the very large
> groups "skewed groups".

I wasn't using the term "average" in a mathematical sense.  I suppose
I really meant "typical".  I agree that thinking about cases where the
group size is nonuniform is a good idea, but I don't think I agree
that all uniform distributions are created equal.  Uniform
distributions with 1 row per group are very different than uniform
distributions with 1000 rows per group.

> If we get the skewed groups into the hash table, and avoid OOM, I think
> that is a success. My patch did that, except it didn't account for two
> cases:
>
>   (a) clustered input, where skewed groups may not appear until the
> hash table is already full; and
>   (b) variable-sized transition values that grow with the group size

I think that many of the algorithms under consideration could be
implemented without worrying about variable-sized transition values,
and then the approach could be extended later.  However, whether or
not a given algorithm can handle clustered input seems like a fairly
basic property of the algorithm.  I don't think we should set the bar
too high because no algorithm is going to be perfect in every case; at
the same time, clustered input is pretty common in real-world
scenarios, and an algorithm that handles such cases better has a
significant leg up over one that can't, all other things being equal.
I'm not sure I remember precisely what your proposal was any more.

-- 
Robert Haas
EnterpriseDB: http://www.enterprisedb.com
The Enterprise PostgreSQL Company



Re: Spilling hashed SetOps and aggregates to disk

2018-06-11 Thread Tomas Vondra
On 06/11/2018 08:13 PM, Jeff Davis wrote:
> On Mon, 2018-06-11 at 19:33 +0200, Tomas Vondra wrote:
>> For example we hit the work_mem limit after processing 10% tuples,
>>  switching to sort would mean spill+sort of 900GB of data. Or we 
>> might say - hmm, we're 10% through, so we expect hitting the limit
>> 10x, so let's spill the hash table and then do sort on that,
>> writing and sorting only 10GB of data. (Or merging it in some
>> hash-based way, per Robert's earlier message.)
> 
> Your example depends on large groups and a high degree of group
> clustering. That's fine, but it's a special case,
> 

True, it's a special case and it won't work for other cases. It was
merely an example for Andres.

OTOH it's not entirely unrealistic, I think. Consider something like

  SELECT
extract(year from ts) AS y,
extract(month from ts) AS m,
extract(day from ts) AS d,
    string_agg(x, ','),
array_agg(y)
  FROM fact_table
  GROUP BY y, m, d;

which is likely very correlated (assuming new data is appended to the
table), and the string_agg/array_agg are likely to produce fairly large
groups (about proportional to the number of tuples in the group).

Another example might be an HLL aggregate, although in that case the
transition state does not grow, so it may not be that bad (and the
default estimate of 1kB would work pretty nicely). But there certainly
are other aggregates with large transition state, where this might not
be the case, and we currently have no way to communicate that to the
planner - except for setting work_mem much lower :-/

However, I now realize I've ignored the fact that we typically don't
sort the whole table but only a very few columns, so the example was not
entirely fair - we would not sort the whole remaining 900GB but likely
much less.

> and complexity does have a cost, too.

Sure.


regards

-- 
Tomas Vondra  http://www.2ndQuadrant.com
PostgreSQL Development, 24x7 Support, Remote DBA, Training & Services



Re: Spilling hashed SetOps and aggregates to disk

2018-06-11 Thread Tom Lane
Jeff Davis  writes:
> Also, I am not sure we should really be designing around data types
> where it makes sense to group and then don't supply a btree opclass.
> Seems like they are likely to hit a problem soon anyway.

It's not that unreasonable to have a hash opclass and no btree opclass;
the datatype might not have a natural linear ordering.

But in any case, I think Robert's point was that he'd prefer to avoid
having a poorly-tested special-case code path for that situation, which
seems like a good idea independently of performance considerations.

regards, tom lane



Re: Spilling hashed SetOps and aggregates to disk

2018-06-11 Thread Jeff Davis
On Mon, 2018-06-11 at 11:55 -0400, Robert Haas wrote:
> performance degradation is not necessarily much better than OOM.  I
> suspect that part of the reason why both Andres and David proposed
> algorithms that combined hashing and sorting is out of a desire to
> cater somewhat to both few-tuples-per-group and many-tuples-per-
> group.
> 

I think average group size is the wrong way to look at it, because
averages are only useful for a normal distribution. The two most
interesting cases are:

1. Fairly uniform group size (e.g. normal dist)
2. Skew values, power law distribution, etc., where some groups are
very large and most are tiny by comparison. I am calling the very large
groups "skewed groups".

For #1, hashing and sorting are both reasonable, and it depends on a
lot of factors (number of groups, clustering, available memory).

For #2, hashing is really good and sorting is really bad. That's
because (at present) we sort all of the tuples before doing any
aggregation, so it expends a lot of effort on the skewed groups.
Hashing can keep skewed groups in memory and avoid spilling a large
fraction of the input tuples at all.

If we get the skewed groups into the hash table, and avoid OOM, I think
that is a success. My patch did that, except it didn't account for two
cases:

  (a) clustered input, where skewed groups may not appear until the
hash table is already full; and
  (b) variable-sized transition values that grow with the group size

> One big advantage to just partitioning the input tuples by hash code
> and then proceeding batch by batch is that it works for any aggregate
> that can support hash aggregation in the first place.  You do not

Agreed, but I think we should evaluate Andres's idea of feeding spilled
tuples to a Sort, because the overall complexity might be lower even
accounting for the special cases you're worried about.

Also, I am not sure we should really be designing around data types
where it makes sense to group and then don't supply a btree opclass.
Seems like they are likely to hit a problem soon anyway.

Regards,
Jeff Davis





Re: Spilling hashed SetOps and aggregates to disk

2018-06-11 Thread Tomas Vondra




On 06/11/2018 07:14 PM, Andres Freund wrote:
> Hi,
> 
> On 2018-06-11 17:29:52 +0200, Tomas Vondra wrote:
>> It would be great to get something that performs better than just falling
>> back to sort (and I was advocating for that), but I'm worried we might be
>> moving the goalposts way too far.
> 
> I'm unclear on why that'd have that bad performance in relevant
> cases. You're not going to hit the path unless the number of groups is
> pretty large (or work_mem is ridiculously small, in which case we don't
> care). With a large number of groups the sorting path isn't particularly
> inefficient, because repeatedly storing the input values isn't such a
> large fraction in comparison to the number of groups (and their
> transition values).  Which scenarios are you concerned about?

Say you have a 1TB table, and keeping the groups in memory would require 
 work_mem=2GB. After hitting the work_mem limit, there may be pretty 
large amount of tuples you'd have to spill to disk and sort.


For example we hit the work_mem limit after processing 10% tuples, 
switching to sort would mean spill+sort of 900GB of data. Or we might 
say - hmm, we're 10% through, so we expect hitting the limit 10x, so 
let's spill the hash table and then do sort on that, writing and sorting 
only 10GB of data. (Or merging it in some hash-based way, per Robert's 
earlier message.)


I don't quite understand the argument that the number of groups needs to 
be pretty large for us to hit this. So what if the groups need 2x or 10x
more memory than work_mem? It can still be cheaper than switching to a
sort-based approach, no?



regards

--
Tomas Vondra  http://www.2ndQuadrant.com
PostgreSQL Development, 24x7 Support, Remote DBA, Training & Services



Re: Spilling hashed SetOps and aggregates to disk

2018-06-11 Thread Andres Freund
Hi,

On 2018-06-11 17:29:52 +0200, Tomas Vondra wrote:
> It would be great to get something that performs better than just falling
> back to sort (and I was advocating for that), but I'm worried we might be
> moving the goalposts way too far.

I'm unclear on why that'd have that bad performance in relevant
cases. You're not going to hit the path unless the number of groups is
pretty large (or work_mem is ridiculously small, in which case we don't
care). With a large number of groups the sorting path isn't particularly
inefficient, because repeatedly storing the input values isn't such a
large fraction in comparison to the number of groups (and their
transition values).  Which scenarios are you concerned about?

Greetings,

Andres Freund



Re: Spilling hashed SetOps and aggregates to disk

2018-06-11 Thread Robert Haas
On Mon, Jun 11, 2018 at 11:29 AM, Tomas Vondra
 wrote:
> I think the underlying question is whether we want to treat this as an
> emergency safety against OOM (which should be a rare thing in most cases) or
> something allowing us to pick hash aggregate more often.
>
> It would be great to get something that performs better than just falling
> back to sort (and I was advocating for that), but I'm worried we might be
> moving the goalposts way too far.

Well, the scope of the first patch is always negotiable, but I think
we should try to think beyond the first patch in design discussions,
so that we don't paint ourselves into a corner.  I also think it's
worth noting that an emergency safety which also causes a 10x
performance degradation is not necessarily much better than OOM.  I
suspect that part of the reason why both Andres and David proposed
algorithms that combined hashing and sorting is out of a desire to
cater somewhat to both few-tuples-per-group and many-tuples-per-group.
AFAICS, Andres's algorithm will be better for few-tuples-per-group
(maybe with a few large groups mixed in) and David's algorithm will be
better for many-tuples-per-group (maybe with some small groups mixed
in), but in each case the inclusion of a sorting component seems like
it will ease the pain in the opposite use case.  However, I wonder
whether it will be better still to just acknowledge that those two
cases really need separate algorithms and then think about the best
approach for each one individually.

One big advantage to just partitioning the input tuples by hash code
and then proceeding batch by batch is that it works for any aggregate
that can support hash aggregation in the first place.  You do not need
a btree opclass.  You do not need support for serialize/deserialize or
combine.  If you have serialize/deserialize, it's better, because now
you can spill growing transition values out of the hash table on the
fly.  But the basic contours of the algorithm don't change even if
such support is lacking.  That's a significant advantage in my view,
because with some of these more complex strategies, we'll end up with
code paths for data types lacking the necessary support which are
almost never exercised in practice.  That's likely to lead to bugs.

-- 
Robert Haas
EnterpriseDB: http://www.enterprisedb.com
The Enterprise PostgreSQL Company



Re: Spilling hashed SetOps and aggregates to disk

2018-06-11 Thread Tomas Vondra

On 06/11/2018 04:25 PM, Robert Haas wrote:
> ...
>
> Maybe that's not exactly what Tomas (or you) meant by eviction
> strategy, though.  Not sure.  But it does seem to me that we need to
> pick the algorithm we're going to use, more or less, in order to
> decide what infrastructure we need, and at least to some extent, we
> ought to let our choice of algorithm be informed by the desire not to
> need too much infrastructure.



I was using eviction strategy in a somewhat narrower sense - pretty much 
just "Which groups to evict from the hash table?" but you're right the 
question is more general and depends on which scheme we end up using 
(just hashagg, hash+sort, something else ...)


regards

--
Tomas Vondra  http://www.2ndQuadrant.com
PostgreSQL Development, 24x7 Support, Remote DBA, Training & Services



Re: Spilling hashed SetOps and aggregates to disk

2018-06-11 Thread Tomas Vondra




On 06/11/2018 05:16 PM, Robert Haas wrote:
> On Wed, Jun 6, 2018 at 8:16 PM, Tomas Vondra
>  wrote:
>> ... and this is pretty much what Jeff Davis suggested, I think. The
>> trouble is, each of those cases behaves nicely/terribly in different
>> corner cases.
> 
> That's a really good point.  If the number of groups is pretty small
> compared to the number of input tuples, then you really only ever want
> to dump out transition values.  By doing so, you minimize the amount
> of data you have to write.  But if the number of groups is, say, half
> the number of input tuples, then computing transition values before
> you have all the values that belong to that group is probably a waste
> of time.  I wonder if we could decide what to do by comparing the
> number of input tuples consumed to the number of groups created at the
> time we run out of memory.  If we've got less than, I dunno, five
> tuples per group, then assume most groups are small.  Pick a strategy
> that (mainly) spools input tuples.  Otherwise, pick a strategy that
> spools transition tuples.
> 
> In either case, it seems like we can pick a pure hashing strategy or
> switch to using both hashing and sorting.  For example, IIUC, Andres's
> proposal involves spooling mostly input tuples, but can also spool
> transition tuples if the transition values consume more memory as they
> absorb more tuples.  However, you could do a similar kind of thing
> without needing sort support.  When you see a value that doesn't
> fit in your in-memory hash table, use the hash code to route it to 1
> of N batch files.  Have a second set of batch files for transition
> tuples in case you need to kick things out of the in-memory hash
> table.  Once you finish reading the input, emit all the values that
> remain in the in-memory hash table and then process each batch file
> separately.
> 
> Similarly, David's strategy involves spooling only transition tuples
> and then sorting on the group key, but it's again possible to do
> something similar without relying on sorting.  Instead of flushing the
> in-memory hash table to a tuple store, split the transition tuples it
> contains among N batch files based on the hash code.  Once you've read
> all of the input, go back and reprocess each batch file, combining
> transition values whenever the same group keys appear in more than one
> transition tuple.

Yeah, essentially something like the batching in hash joins.

> To me, the pure-hashing strategies look a little simpler, but maybe
> there's enough performance benefit from combining hashing and sorting
> that it's worth the complexity, or maybe we should just accept
> whichever variant somebody's willing to code.  But I think we almost
> have to have separate handling for many-row-per-group and
> few-rows-per-group, because those seem fundamentally different.



I think the underlying question is whether we want to treat this as an 
emergency safety against OOM (which should be a rare thing in most 
cases) or something allowing us to pick hash aggregate more often.


It would be great to get something that performs better than just 
falling back to sort (and I was advocating for that), but I'm worried we 
might be moving the goalposts way too far.


regards

--
Tomas Vondra  http://www.2ndQuadrant.com
PostgreSQL Development, 24x7 Support, Remote DBA, Training & Services



Re: Spilling hashed SetOps and aggregates to disk

2018-06-11 Thread Robert Haas
On Wed, Jun 6, 2018 at 8:16 PM, Tomas Vondra
 wrote:
> ... and this is pretty much what Jeff Davis suggested, I think. The
> trouble is, each of those cases behaves nicely/terribly in different
> corner cases.

That's a really good point.  If the number of groups is pretty small
compared to the number of input tuples, then you really only ever want
to dump out transition values.  By doing so, you minimize the amount
of data you have to write.  But if the number of groups is, say, half
the number of input tuples, then computing transition values before
you have all the values that belong to that group is probably a waste
of time.  I wonder if we could decide what to do by comparing the
number of input tuples consumed to the number of groups created at the
time we run out of memory.  If we've got less than, I dunno, five
tuples per group, then assume most groups are small.  Pick a strategy
that (mainly) spools input tuples.  Otherwise, pick a strategy that
spools transition tuples.

In either case, it seems like we can pick a pure hashing strategy or
switch to using both hashing and sorting.  For example, IIUC, Andres's
proposal involves spooling mostly input tuples, but can also spool
transition tuples if the transition values consume more memory as they
absorb more tuples.  However, you could do a similar kind of thing
without needing sort support.  When you see a value that doesn't
fit in your in-memory hash table, use the hash code to route it to 1
of N batch files.  Have a second set of batch files for transition
tuples in case you need to kick things out of the in-memory hash
table.  Once you finish reading the input, emit all the values that
remain in the in-memory hash table and then process each batch file
separately.

Similarly, David's strategy involves spooling only transition tuples
and then sorting on the group key, but it's again possible to do
something similar without relying on sorting.  Instead of flushing the
in-memory hash table to a tuple store, split the transition tuples it
contains among N batch files based on the hash code.  Once you've read
all of the input, go back and reprocess each batch file, combining
transition values whenever the same group keys appear in more than one
transition tuple.
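
Something like the following standalone sketch (plain C, made-up types,
counts for transition values, in-memory arrays standing in for the batch
files) shows the shape of the pure-hashing variant.  It only shows the
input-tuple side -- the second set of batch files for evicted transition
values is left out -- and the point is that nothing in it needs ordering,
only hashing:

/* Minimal sketch of hash-partitioned spilling: anything that misses the
 * full in-memory hash table is routed to one of N batches by hash code,
 * and each batch is re-aggregated on its own in a second pass. */
#include <stdio.h>
#include <stdint.h>

#define NBATCHES 4
#define MAXGROUPS 4         /* pretend work_mem only fits four groups */

typedef struct Group { uint32_t key; long count; } Group;

static Group    mem[MAXGROUPS];
static int      nmem;
static uint32_t batches[NBATCHES][64];  /* spooled input "tuples" (keys) */
static int      nbatch[NBATCHES];

static uint32_t hash_key(uint32_t k) { return k * 2654435761u; }

static int aggregate(Group *groups, int ngroups, int maxgroups, uint32_t key)
{
    int i;

    for (i = 0; i < ngroups; i++)
        if (groups[i].key == key) { groups[i].count++; return ngroups; }
    if (ngroups < maxgroups)
    {
        groups[ngroups].key = key;
        groups[ngroups].count = 1;
        return ngroups + 1;
    }
    return -1;                  /* no room: caller must spool the tuple */
}

int main(void)
{
    uint32_t input[] = {1, 2, 3, 1, 4, 5, 6, 2, 7, 5, 8, 1, 9, 6};
    size_t   i;
    int      b, g;

    /* First pass: aggregate what fits, route the rest by hash code. */
    for (i = 0; i < sizeof(input) / sizeof(input[0]); i++)
    {
        int r = aggregate(mem, nmem, MAXGROUPS, input[i]);

        if (r >= 0)
            nmem = r;
        else
        {
            b = hash_key(input[i]) % NBATCHES;  /* equal keys share a batch */
            batches[b][nbatch[b]++] = input[i];
        }
    }
    for (g = 0; g < nmem; g++)
        printf("pass 1: key=%u count=%ld\n", mem[g].key, mem[g].count);

    /* Second pass: each batch gets its own (small) hash table. */
    for (b = 0; b < NBATCHES; b++)
    {
        Group batchgroups[64];
        int   n = 0;

        for (i = 0; i < (size_t) nbatch[b]; i++)
            n = aggregate(batchgroups, n, 64, batches[b][i]);
        for (g = 0; g < n; g++)
            printf("batch %d: key=%u count=%ld\n", b, batchgroups[g].key,
                   batchgroups[g].count);
    }
    return 0;
}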

To me, the pure-hashing strategies look a little simpler, but maybe
there's enough performance benefit from combining hashing and sorting
that it's worth the complexity, or maybe we should just accept
whichever variant somebody's willing to code.  But I think we almost
have to have separate handling for many-row-per-group and
few-rows-per-group, because those seem fundamentally different.

-- 
Robert Haas
EnterpriseDB: http://www.enterprisedb.com
The Enterprise PostgreSQL Company



Re: Spilling hashed SetOps and aggregates to disk

2018-06-11 Thread Robert Haas
On Tue, Jun 5, 2018 at 1:46 AM, Jeff Davis  wrote:
> On Tue, 2018-06-05 at 07:04 +0200, Tomas Vondra wrote:
>> I expect the eviction strategy to be the primary design challenge of
>> this patch. The other bits will be mostly determined by this one
>> piece.
>
> Not sure I agree that this is the primary challenge.
>
> The cases that benefit from eviction are probably a minority. I see two
> categories that would benefit:
>
>   * Natural clustering in the heap. This sounds fairly common, but a
> lot of the cases that come to mind are too low-cardinality to be
> compelling; e.g. timestamps grouped by hour/day/month. If someone has
> run into a high-cardinality natural grouping case, let me know.
>   * ARRAY_AGG (or similar): individual state values can be large enough
> that we need to evict to avoid exceeding work_mem even if not adding
> any new groups.
>
> In either case, it seems like a fairly simple eviction strategy would
> work. For instance, we could just evict the entire hash table if
> work_mem is exceeded or if the hit rate on the hash table falls below a
> certain threshold. If there was really something important that should
> have stayed in the hash table, it will go back in soon anyway.
>
> So why should eviction be a major driver for the entire design? I agree
> it should be an area of improvement for the future, so let me know if
> you see a major problem, but I haven't been as focused on eviction.

Hmm, I think the eviction strategy is pretty important.  For example,
as we discussed in the hallway at PGCon, if we're flushing the whole
hash table (or one of several hash tables), we can account for memory
usage chunk-by-chunk instead of palloc-by-palloc, which as you pointed
out is more accurate and substantially less expensive.  If we are
evicting individual tuples, though, it's quite possible that evicting
even a large number of tuples might not free up any chunks, so we'd
probably have to do accounting palloc-by-palloc.  Working harder to
get a less accurate answer isn't real appealing even if the absolute
overhead is low.

As Andres mentioned, it also matters what kind of thing we evict.  If
we push raw input tuples to a spool file in lieu of creating new
groups and deal with those in a second pass with a new hash table, we
don't need anything extra at all.  If the second pass uses sort +
group, we need the aggregates to be both hashable and sortable.  If we
evict transition tuples rather than input tuples, we need
serialization + deserialization functions.  If we do that in a way
that might create multiple transition values for the same combination
of grouping values, then we also need combine functions.

Maybe that's not exactly what Tomas (or you) meant by eviction
strategy, though.  Not sure.  But it does seem to me that we need to
pick the algorithm we're going to use, more or less, in order to
decide what infrastructure we need, and at least to some extent, we
ought to let our choice of algorithm be informed by the desire not to
need too much infrastructure.

-- 
Robert Haas
EnterpriseDB: http://www.enterprisedb.com
The Enterprise PostgreSQL Company



Re: Spilling hashed SetOps and aggregates to disk

2018-06-07 Thread David Gershuni
As Serge mentioned, we’ve implemented spill-to-disk for SetOps and Aggregates 
at Salesforce. We were hitting OOMs often enough that this became a high 
priority for us. However, our current spill implementation is based on dynahash 
from 9.6, and we’re not happy with its performance (it was primarily an OOM 
stop-gap and was not focused on optimizing performance).  

Because of this, we’ve spec’d out a new spill-to-disk design for hash-based 
aggregates (and later SetOps), and we plan to begin implementation very soon.  
Since this is an important fix for the community as well, we would be happy 
(and would prefer) to share our spill-to-disk implementation. 

Our new spill approach is very similar to Jeff’s and Heikki’s and is designed 
to use simplehash.h. It’s based on an algorithm called “HybridHash with 
Pre-Partitioning” found in [1]. It may later make sense to implement the 
“HashSort” Algorithm from [1] as well, which works better for highly skewed 
grouping keys. The optimizer could eventually choose between the two based on 
the stats available. We also like Heikki’s suggestions to use logtape.c to 
reduce disk usage and a trie-based approach to control the size of partitions 
dynamically.

We’ve also been grappling with how to handle the implementation challenges 
pointed out in this thread. These include:
1) tracking memory usage
2) choosing a smart eviction policy (which is touched on in [2])
3) serializing opaque user-defined transition values when eviction is
   required

For 1), we plan to use our WithStats memory context, which Serge mentioned.
For 2), we plan to start with a simple non-eviction policy since we don’t have 
the stats to do anything smarter (i.e. insert until we fill the hash table, then 
spill until we finish processing the batch, with evictions only happening if a 
group’s transition value grows too large).
For 3), we don’t have a good solution yet. We could serialize/deserialize for 
built-in types and rely on users to provide serialize/deserialize functions for 
user-defined aggregates going forward.
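
As a small standalone sketch, the policy in 2) boils down to a decision
function like this (made-up types and names, just to pin down the cases):

/* Sketch of the planned no-eviction policy as a pure decision function. */
#include <stdbool.h>
#include <stddef.h>
#include <stdio.h>

typedef enum { POLICY_ADVANCE, POLICY_INSERT, POLICY_SPILL_TUPLE,
               POLICY_EVICT_GROUP } SpillAction;

typedef struct PolicyState {
    bool    table_full;         /* have we hit work_mem for this batch? */
    size_t  state_size_limit;   /* per-group cap on transition value size */
} PolicyState;

static SpillAction
choose_action(const PolicyState *p, bool group_in_table, size_t group_state_size)
{
    if (group_in_table)
    {
        /* Only evict if this group's transition value has grown too large. */
        if (group_state_size > p->state_size_limit)
            return POLICY_EVICT_GROUP;
        return POLICY_ADVANCE;
    }
    /* New group: insert until the table is full, then spill the tuple and
     * keep spilling until the current batch is finished. */
    return p->table_full ? POLICY_SPILL_TUPLE : POLICY_INSERT;
}

int main(void)
{
    PolicyState p = {true, 1024};

    printf("%d\n", choose_action(&p, false, 0));      /* spill the tuple  */
    printf("%d\n", choose_action(&p, true, 4096));    /* evict that group */
    return 0;
}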

But we’re open to suggestions :)

Regards,
David
Salesforce

[1] Revisiting Aggregation for Data Intensive Applications: A Performance Study
https://arxiv.org/pdf/1311.0059.pdf

[2] DB2 with BLU acceleration: so much more than just a column store
Proc. VLDB Endow. 6(11), 2013.


Re: Spilling hashed SetOps and aggregates to disk

2018-06-07 Thread Tomas Vondra

On 06/07/2018 02:18 AM, Andres Freund wrote:
> On 2018-06-06 17:17:52 -0700, Andres Freund wrote:
>> On 2018-06-07 12:11:37 +1200, David Rowley wrote:
>>> On 7 June 2018 at 08:11, Tomas Vondra  wrote:
>>>> On 06/06/2018 04:11 PM, Andres Freund wrote:
>>>>> Consider e.g. a scheme where we'd switch from hashed aggregation to
>>>>> sorted aggregation due to memory limits, but already have a number of
>>>>> transition values in the hash table. Whenever the size of the transition
>>>>> values in the hashtable exceeds memory size, we write one of them to the
>>>>> tuplesort (with serialized transition value). From then on further input
>>>>> rows for that group would only be written to the tuplesort, as the group
>>>>> isn't present in the hashtable anymore.
>>>>
>>>> Ah, so you're suggesting that during the second pass we'd deserialize
>>>> the transition value and then add the tuples to it, instead of building
>>>> a new transition value. Got it.
>>>
>>> Having to deserialize every time we add a new tuple sounds terrible
>>> from a performance point of view.
>>
>> I didn't mean that we do that, and I don't think David understood it as
>> that either. I was talking about the approach where the second pass is a
>> sort rather than hash based aggregation.  Then we would *not* need to
>> deserialize more than exactly once.
> 
> s/David/Tomas/, obviously. Sorry, it's been a long day.



Solution is simple: drink more coffee. ;-)

regards

--
Tomas Vondra  http://www.2ndQuadrant.com
PostgreSQL Development, 24x7 Support, Remote DBA, Training & Services



Re: Spilling hashed SetOps and aggregates to disk

2018-06-06 Thread David Rowley
On 6 June 2018 at 01:17, David Rowley  wrote:
> On 6 June 2018 at 01:09, Andres Freund  wrote:
>> On 2018-06-06 01:06:39 +1200, David Rowley wrote:
>>> My concern is that only accounting memory for the group and not the
>>> state is only solving half the problem. It might be fine for
>>> aggregates that don't stray far from their aggtransspace, but for the
>>> other ones, we could still see OOM.
>>
>>> If solving the problem completely is too hard, then a half fix (maybe
>>> 3/4) is better than nothing, but if we can get a design for a full fix
>>> before too much work is done, then isn't that better?
>>
>> I don't think we actually disagree.  I was really primarily talking
>> about the case where we can't really do better because we don't have
>> serialization support.  I mean we could just rescan from scratch, using
>> a groupagg, but that obviously sucks.
>
> I don't think we do. To take yours to the 100% solution might just
> take adding the memory accounting to palloc that Jeff proposed a few
> years ago and use that accounting to decide when we should switch
> method.
>
> However, I don't quite fully recall how the patch accounted for memory
> consumed by sub-contexts and if getting the entire consumption
> required recursively looking at subcontexts. If that's the case then
> checking the consumption would likely cost too much if it was done
> after each tuple was aggregated.

I wonder if the whole internal state memory accounting problem could
be solved by just adding an aggregate supporting function for internal
state aggregates that returns the number of bytes consumed by the
state. It might be good enough to fall back on aggtransspace when the
function is not defined. Such a function would be about 3 lines long
for string_agg and array_agg, and these are the problem aggregates.
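
For instance, for string_agg such a function might look roughly like the
sketch below. To be clear, this is hypothetical -- no such support function
exists today -- and it assumes the transition state is the StringInfo that
string_agg accumulates into:

/* Hypothetical "state size" support function for string_agg; not part of
 * PostgreSQL today.  Assumes the transition state is a StringInfo. */
#include "postgres.h"
#include "fmgr.h"
#include "lib/stringinfo.h"

PG_FUNCTION_INFO_V1(string_agg_statesize);

Datum
string_agg_statesize(PG_FUNCTION_ARGS)
{
    StringInfo  state = (StringInfo) PG_GETARG_POINTER(0);
    int64       size = sizeof(StringInfoData);

    if (state != NULL)
        size += state->maxlen;      /* allocated buffer, not just ->len */

    PG_RETURN_INT64(size);
}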


-- 
 David Rowley   http://www.2ndQuadrant.com/
 PostgreSQL Development, 24x7 Support, Training & Services



Re: Spilling hashed SetOps and aggregates to disk

2018-06-06 Thread Andres Freund
On 2018-06-06 17:17:52 -0700, Andres Freund wrote:
> On 2018-06-07 12:11:37 +1200, David Rowley wrote:
> > On 7 June 2018 at 08:11, Tomas Vondra  wrote:
> > > On 06/06/2018 04:11 PM, Andres Freund wrote:
> > >> Consider e.g. a scheme where we'd switch from hashed aggregation to
> > >> sorted aggregation due to memory limits, but already have a number of
> > >> transition values in the hash table. Whenever the size of the transition
> > >> values in the hashtable exceeds memory size, we write one of them to the
> > >> tuplesort (with serialized transition value). From then on further input
> > >> rows for that group would only be written to the tuplesort, as the group
> > >> isn't present in the hashtable anymore.
> > >>
> > >
> > > Ah, so you're suggesting that during the second pass we'd deserialize
> > > the transition value and then add the tuples to it, instead of building
> > > a new transition value. Got it.
> > 
> > Having to deserialize every time we add a new tuple sounds terrible
> > from a performance point of view.
> 
> I didn't mean that we do that, and I don't think David understood it as
> that either. I was talking about the approach where the second pass is a
> sort rather than hash based aggregation.  Then we would *not* need to
> deserialize more than exactly once.

s/David/Tomas/, obviously. Sorry, it's been a long day.

Greetings,

Andres Freund



Re: Spilling hashed SetOps and aggregates to disk

2018-06-06 Thread Andres Freund
On 2018-06-07 12:11:37 +1200, David Rowley wrote:
> On 7 June 2018 at 08:11, Tomas Vondra  wrote:
> > On 06/06/2018 04:11 PM, Andres Freund wrote:
> >> Consider e.g. a scheme where we'd switch from hashed aggregation to
> >> sorted aggregation due to memory limits, but already have a number of
> >> transition values in the hash table. Whenever the size of the transition
> >> values in the hashtable exceeds memory size, we write one of them to the
> >> tuplesort (with serialized transition value). From then on further input
> >> rows for that group would only be written to the tuplesort, as the group
> >> isn't present in the hashtable anymore.
> >>
> >
> > Ah, so you're suggesting that during the second pass we'd deserialize
> > the transition value and then add the tuples to it, instead of building
> > a new transition value. Got it.
> 
> Having to deserialize every time we add a new tuple sounds terrible
> from a performance point of view.

I didn't mean that we do that, and I don't think David understood it as
that either. I was talking about the approach where the second pass is a
sort rather than hash based aggregation.  Then we would *not* need to
deserialize more than exactly once.


> Can't we just:
> 
> 1. HashAgg until the hash table reaches work_mem.
> 2. Spill the entire table to disk.
> 3. Destroy the table and create a new one.
> 4. If more tuples: goto 1
> 5. Merge sort and combine each dumped set of tuples.

Well, that requires sorting without really much advantage.  I was more
thinking of

1.  HashAgg until the hash table reaches work_mem.
2.  If entry is in HashTable advance() there, goto 3a. Otherwise put into
tuplestore, goto 2.
3.  If this increases the size of transition values too much, spill
tuple into tuplestore.
4.  Project tuples from hashtable, destroy.
5.  Sort tuplestore with a comparator that sorts extra column with
serialized states first. Project.
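
To make that flow concrete, a standalone toy sketch in plain C: a sum
aggregate, a tiny fixed-size table standing in for the work_mem-bounded
hash table, and an array standing in for the tuplestore/tuplesort. None of
the real executor or tuplesort APIs are used, and the "transition values
too big" case is not modeled.

#include <stdio.h>
#include <stdlib.h>

#define MAX_GROUPS 4            /* pretend this is work_mem */

typedef struct { int key; long sum; int used; } Group;        /* hashed groups   */
typedef struct { int key; long val; int is_state; } Spilled;  /* "tuplesort" row */

static Group groups[MAX_GROUPS];
static Spilled spill[64];
static int nspill;

static Group *lookup(int key, int create)
{
    for (int i = 0; i < MAX_GROUPS; i++)
        if (groups[i].used && groups[i].key == key)
            return &groups[i];
    if (!create) return NULL;
    for (int i = 0; i < MAX_GROUPS; i++)
        if (!groups[i].used) { groups[i] = (Group){key, 0, 1}; return &groups[i]; }
    return NULL;                /* table "full": caller must spill */
}

static int cmp(const void *a, const void *b)
{
    const Spilled *x = a, *y = b;
    if (x->key != y->key) return x->key - y->key;
    return y->is_state - x->is_state;   /* serialized states sort first */
}

int main(void)
{
    int input[][2] = {{1,10},{2,20},{3,30},{4,40},{5,50},{1,11},{5,51},{2,21}};

    /* Step 1/2: known groups keep advancing in the table; new groups spill. */
    for (int i = 0; i < 8; i++)
    {
        Group *g = lookup(input[i][0], 1);
        if (g) g->sum += input[i][1];
        else   spill[nspill++] = (Spilled){input[i][0], input[i][1], 0};
    }

    /* Step 4: project the in-memory states into the spill as "serialized" rows. */
    for (int i = 0; i < MAX_GROUPS; i++)
        if (groups[i].used)
            spill[nspill++] = (Spilled){groups[i].key, groups[i].sum, 1};

    /* Step 5: sort and make one combining pass per key. */
    qsort(spill, nspill, sizeof(Spilled), cmp);
    for (int i = 0; i < nspill; )
    {
        int key = spill[i].key;
        long sum = 0;
        for (; i < nspill && spill[i].key == key; i++)
            sum += spill[i].val;        /* state rows and raw rows combine alike for sum */
        printf("key=%d sum=%ld\n", key, sum);
    }
    return 0;
}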

Greetings,

Andres Freund



Re: Spilling hashed SetOps and aggregates to disk

2018-06-06 Thread Tomas Vondra



On 06/07/2018 02:11 AM, David Rowley wrote:
> On 7 June 2018 at 08:11, Tomas Vondra  wrote:
>> On 06/06/2018 04:11 PM, Andres Freund wrote:
>>> Consider e.g. a scheme where we'd switch from hashed aggregation to
>>> sorted aggregation due to memory limits, but already have a number of
>>> transition values in the hash table. Whenever the size of the transition
>>> values in the hashtable exceeds memory size, we write one of them to the
>>> tuplesort (with serialized transition value). From then on further input
>>> rows for that group would only be written to the tuplesort, as the group
>>> isn't present in the hashtable anymore.
>>>
>>
>> Ah, so you're suggesting that during the second pass we'd deserialize
>> the transition value and then add the tuples to it, instead of building
>> a new transition value. Got it.
> 
> Having to deserialize every time we add a new tuple sounds terrible
> from a performance point of view.
> 

I don't think Andres was suggesting that. I think the scheme was to sort
the tuples and read all the tuples for a group at once (so we would
deserialize just once).

> Can't we just:
> 
> 1. HashAgg until the hash table reaches work_mem.
> 2. Spill the entire table to disk.
> 3. Destroy the table and create a new one.
> 4. If more tuples: goto 1
> 5. Merge sort and combine each dumped set of tuples.
> 

... and this is pretty much what Jeff Davis suggested, I think. The
trouble is, each of those cases behaves nicely/terribly in different
corner cases.

regards

-- 
Tomas Vondra  http://www.2ndQuadrant.com
PostgreSQL Development, 24x7 Support, Remote DBA, Training & Services



Re: Spilling hashed SetOps and aggregates to disk

2018-06-06 Thread David Rowley
On 7 June 2018 at 08:11, Tomas Vondra  wrote:
> On 06/06/2018 04:11 PM, Andres Freund wrote:
>> Consider e.g. a scheme where we'd switch from hashed aggregation to
>> sorted aggregation due to memory limits, but already have a number of
>> transition values in the hash table. Whenever the size of the transition
>> values in the hashtable exceeds memory size, we write one of them to the
>> tuplesort (with serialized transition value). From then on further input
>> rows for that group would only be written to the tuplesort, as the group
>> isn't present in the hashtable anymore.
>>
>
> Ah, so you're suggesting that during the second pass we'd deserialize
> the transition value and then add the tuples to it, instead of building
> a new transition value. Got it.

Having to deserialize every time we add a new tuple sounds terrible
from a performance point of view.

Can't we just:

1. HashAgg until the hash table reaches work_mem.
2. Spill the entire table to disk.
3. Destroy the table and create a new one.
4. If more tuples: goto 1
5. Merge sort and combine each dumped set of tuples.
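
A minimal standalone sketch of that loop (plain C, sum aggregate, toy
arrays instead of the real hash table and spill files), just to show the
dump-and-restart control flow and the final combine pass:

#include <stdio.h>
#include <stdlib.h>

#define TABLE_CAP 3                       /* stand-in for work_mem */

typedef struct { int key; long sum; } Partial;

static Partial table[TABLE_CAP];
static int ngroups;
static Partial dumped[64];
static int ndumped;

static int cmp_key(const void *a, const void *b)
{
    return ((const Partial *) a)->key - ((const Partial *) b)->key;
}

static void dump_and_reset(void)
{
    for (int i = 0; i < ngroups; i++)
        dumped[ndumped++] = table[i];     /* step 2: spill the whole table */
    ngroups = 0;                          /* step 3: destroy, start fresh  */
}

int main(void)
{
    int input[][2] = {{1,10},{2,20},{3,30},{4,40},{1,11},{5,50},{2,21},{4,41}};

    for (int i = 0; i < 8; i++)           /* steps 1 and 4 */
    {
        int j;
        for (j = 0; j < ngroups; j++)
            if (table[j].key == input[i][0]) { table[j].sum += input[i][1]; break; }
        if (j == ngroups)
        {
            if (ngroups == TABLE_CAP)
                dump_and_reset();
            table[ngroups++] = (Partial){input[i][0], input[i][1]};
        }
    }
    dump_and_reset();

    /* step 5: "merge sort and combine each dumped set" */
    qsort(dumped, ndumped, sizeof(Partial), cmp_key);
    for (int i = 0; i < ndumped; )
    {
        long sum = 0;
        int key = dumped[i].key;
        for (; i < ndumped && dumped[i].key == key; i++)
            sum += dumped[i].sum;         /* the combinefn for sum is just addition */
        printf("key=%d sum=%ld\n", key, sum);
    }
    return 0;
}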

-- 
 David Rowley   http://www.2ndQuadrant.com/
 PostgreSQL Development, 24x7 Support, Training & Services



Re: Spilling hashed SetOps and aggregates to disk

2018-06-06 Thread Tomas Vondra
On 06/06/2018 04:11 PM, Andres Freund wrote:
> On 2018-06-06 16:06:18 +0200, Tomas Vondra wrote:
>> On 06/06/2018 04:01 PM, Andres Freund wrote:
>>> Hi,
>>>
>>> On 2018-06-06 15:58:16 +0200, Tomas Vondra wrote:
 The other issue is that serialize/deserialize is only a part of a problem -
 you also need to know how to do "combine", and not all aggregates can do
 that ... (certainly not in universal way).
>>>
>>> There are several schemes where only serialize/deserialize are needed,
>>> no?  There are a number of fairly sensible schemes where there won't be
>>> multiple transition values for the same group, no?
>>>
>>
>> Possibly, not sure what schemes you have in mind exactly ...
>>
>> But if you know there's only a single transition value, why would you need
>> serialize/deserialize at all. Why couldn't you just finalize the value and
>> serialize that?
> 
> Because you don't necessarily have all the necessary input rows
> yet.
> 
> Consider e.g. a scheme where we'd switch from hashed aggregation to
> sorted aggregation due to memory limits, but already have a number of
> transition values in the hash table. Whenever the size of the transition
> values in the hashtable exceeds memory size, we write one of them to the
> tuplesort (with serialized transition value). From then on further input
> rows for that group would only be written to the tuplesort, as the group
> isn't present in the hashtable anymore.
> 

Ah, so you're suggesting that during the second pass we'd deserialize
the transition value and then add the tuples to it, instead of building
a new transition value. Got it.

That being said, I'm not sure if such generic serialize/deserialize can
work, but I'd guess no, otherwise we'd probably use it when implementing
the parallel aggregate.

regards

-- 
Tomas Vondra  http://www.2ndQuadrant.com
PostgreSQL Development, 24x7 Support, Remote DBA, Training & Services



Re: Spilling hashed SetOps and aggregates to disk

2018-06-06 Thread Andres Freund
On 2018-06-06 16:06:18 +0200, Tomas Vondra wrote:
> On 06/06/2018 04:01 PM, Andres Freund wrote:
> > Hi,
> > 
> > On 2018-06-06 15:58:16 +0200, Tomas Vondra wrote:
> > > The other issue is that serialize/deserialize is only a part of a problem 
> > > -
> > > you also need to know how to do "combine", and not all aggregates can do
> > > that ... (certainly not in universal way).
> > 
> > There are several schemes where only serialize/deserialize are needed,
> > no?  There are a number of fairly sensible schemes where there won't be
> > multiple transition values for the same group, no?
> > 
> 
> Possibly, not sure what schemes you have in mind exactly ...
> 
> But if you know there's only a single transition value, why would you need
> serialize/deserialize at all. Why couldn't you just finalize the value and
> serialize that?

Because you don't necessarily have all the necessary input rows
yet.

Consider e.g. a scheme where we'd switch from hashed aggregation to
sorted aggregation due to memory limits, but already have a number of
transition values in the hash table. Whenever the size of the transition
values in the hashtable exceeds memory size, we write one of them to the
tuplesort (with serialized transition value). From then on further input
rows for that group would only be written to the tuplesort, as the group
isn't present in the hashtable anymore.

Greetings,

Andres Freund



Re: Spilling hashed SetOps and aggregates to disk

2018-06-06 Thread Tomas Vondra

On 06/06/2018 04:01 PM, Andres Freund wrote:

Hi,

On 2018-06-06 15:58:16 +0200, Tomas Vondra wrote:

The other issue is that serialize/deserialize is only a part of a problem -
you also need to know how to do "combine", and not all aggregates can do
that ... (certainly not in universal way).


There are several schemes where only serialize/deserialize are needed,
no?  There are a number of fairly sensible schemes where there won't be
multiple transition values for the same group, no?



Possibly, not sure what schemes you have in mind exactly ...

But if you know there's only a single transition value, why would you 
need serialize/deserialize at all. Why couldn't you just finalize the 
value and serialize that?


regards

--
Tomas Vondra  http://www.2ndQuadrant.com
PostgreSQL Development, 24x7 Support, Remote DBA, Training & Services



Re: Spilling hashed SetOps and aggregates to disk

2018-06-06 Thread Tomas Vondra




On 06/05/2018 07:39 PM, David Fetter wrote:

On Tue, Jun 05, 2018 at 01:27:01PM -0400, Tom Lane wrote:

David Fetter  writes:

On Tue, Jun 05, 2018 at 02:56:23PM +1200, David Rowley wrote:

True. Although not all built in aggregates have those defined.



Just out of curiosity, which ones don't? As of
3f85c62d9e825eedd1315d249ef1ad793ca78ed4, pg_aggregate has both of
those as NOT NULL.


NOT NULL isn't too relevant; that's just protecting the fixed-width
nature of the catalog rows.  What's important is which ones are zero.


Thanks for helping me understand this better.


# select aggfnoid::regprocedure, aggkind from pg_aggregate where (aggserialfn=0 
or aggdeserialfn=0) and aggtranstype = 'internal'::regtype;
aggfnoid   | aggkind
--+-
[snip]
(19 rows)

Probably the ordered-set/hypothetical ones aren't relevant for this
issue.

Whether or not we feel like fixing the above "normal" aggs for this,
the patch would have to not fail on extension aggregates that don't
support serialization.


Could there be some kind of default serialization with reasonable
properties?



Not really, because the aggregates often use "internal" i.e. a pointer 
referencing whothehellknowswhat, and how do you serialize/deserialize 
that? The other issue is that serialize/deserialize is only a part of a 
problem - you also need to know how to do "combine", and not all 
aggregates can do that ... (certainly not in universal way).
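
To make the objection concrete, a toy sketch of the three aggregate-specific
pieces an "internal" state would need before any generic spill scheme can
use it (hypothetical avg-style state, plain C, nothing from the real
numeric aggregates):

#include <stdint.h>
#include <stdlib.h>
#include <string.h>

/* The state lives behind an opaque pointer, so only the aggregate itself
 * knows how to flatten it, rebuild it, or merge two of them. */
typedef struct AvgState { int64_t sum; int64_t count; } AvgState;

/* serialize: internal pointer -> flat bytes that could be written to disk */
size_t avg_serialize(const void *internal_state, unsigned char *out)
{
    memcpy(out, internal_state, sizeof(AvgState));
    return sizeof(AvgState);
}

/* deserialize: flat bytes -> freshly allocated internal pointer */
void *avg_deserialize(const unsigned char *in)
{
    AvgState *s = malloc(sizeof(AvgState));
    memcpy(s, in, sizeof(AvgState));
    return s;
}

/* combine: merge a second partial state into the first */
void *avg_combine(void *state1, const void *state2)
{
    AvgState *a = state1;
    const AvgState *b = state2;
    a->sum += b->sum;
    a->count += b->count;
    return a;
}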


regards

--
Tomas Vondra  http://www.2ndQuadrant.com
PostgreSQL Development, 24x7 Support, Remote DBA, Training & Services



Re: Spilling hashed SetOps and aggregates to disk

2018-06-05 Thread Andres Freund
Hi,

On 2018-06-05 10:47:49 -0700, Jeff Davis wrote:
> The thing I don't like about it is that it requires running two memory-
> hungry operations at once. How much of work_mem do we use for sorted
> runs, and how much do we use for the hash table?

Is that necessarily true? I'd assume that we'd use a small amount of
memory for the tuplesort, enough to avoid unnecessary disk spills for
each tuple. But a few kB should be enough - I think it's fine to
aggressively spill to disk, since we have already handled the case of a
smaller number of input rows.  Then at the end of the run, we empty
out the hashtable and free it. Only then do we go to the sort.

One thing this wouldn't handle is datatypes that support hashing, but
no sorting. Not exactly common.

Greetings,

Andres Freund



Re: Spilling hashed SetOps and aggregates to disk

2018-06-05 Thread Jeff Davis
On Tue, 2018-06-05 at 05:42 -0700, Andres Freund wrote:
> > That's an interesting idea, but it seems simpler to stick to hashing
> > rather than using a combination strategy. It also seems like it would
> > take less CPU effort.
> 
> Isn't the locality of access going to be considerably better with the
> sort based approach?

I think I see what you mean, but without measuring it's hard to say.
You may be able to achieve similar locality in the hashing approach by
using a smaller hash table -- an effect that I think I observed
previously but I'd have to rerun the numbers.

> > What advantages do you have in mind? My patch partitions the spilled
> > data, so it should have similar disk costs as a sort approach.
> 
> I think one part of it is that I think the amount of code is going to be
> lower - we essentially already have all the code to handle sort based
> aggs, and to have both sort and hash based aggs in the same query. We'd
> mostly need a way to scan the hashtable and stuff it into a tuplesort,
> that's not hard.  nodeAgg.c is already more than complex enough, I'm not
> sure that full blown partitioning is worth the cost.

The thing I like about your idea is that we wouldn't need to make a
choice at plan time, we just always do the hybrid hashing/sorting
unless we know that it's already sorted.

The thing I don't like about it is that it requires running two memory-
hungry operations at once. How much of work_mem do we use for sorted
runs, and how much do we use for the hash table?

Regards,
Jeff Davis




Re: Re: Spilling hashed SetOps and aggregates to disk

2018-06-05 Thread Jeff Davis
On Tue, 2018-06-05 at 08:39 -0700, se...@rielau.com wrote:
> Strange. We don't see this overhead and measure a lot more than just
> a single metric:

Let's investigate again. I wasn't able to repro the overhead on x86;
Robert saw it on a POWER machine, and it was somewhat noisy. I don't
think we were ever very sure the overhead existed.

My basic opinion is that we make small changes all the time that may
have a small performance impact, and we shouldn't let that become a
blocker for an important feature. Nor should we let it make the design
overly complicated or awkward. We should just see what we can
reasonably do to understand and mitigate it.

Regards,
Jeff Davis




Re: Spilling hashed SetOps and aggregates to disk

2018-06-05 Thread Tom Lane
David Fetter  writes:
> On Tue, Jun 05, 2018 at 02:56:23PM +1200, David Rowley wrote:
>> True. Although not all built in aggregates have those defined.

> Just out of curiosity, which ones don't? As of
> 3f85c62d9e825eedd1315d249ef1ad793ca78ed4, pg_aggregate has both of
> those as NOT NULL.

NOT NULL isn't too relevant; that's just protecting the fixed-width
nature of the catalog rows.  What's important is which ones are zero.

# select aggfnoid::regprocedure, aggkind from pg_aggregate where (aggserialfn=0 
or aggdeserialfn=0) and aggtranstype = 'internal'::regtype;
   aggfnoid   | aggkind 
--+-
 array_agg(anynonarray)   | n
 array_agg(anyarray)  | n
 string_agg(text,text)| n
 string_agg(bytea,bytea)  | n
 json_agg(anyelement) | n
 json_object_agg("any","any") | n
 jsonb_agg(anyelement)| n
 jsonb_object_agg("any","any")| n
 percentile_disc(double precision,anyelement) | o
 percentile_cont(double precision,double precision)   | o
 percentile_cont(double precision,interval)   | o
 percentile_disc(double precision[],anyelement)   | o
 percentile_cont(double precision[],double precision) | o
 percentile_cont(double precision[],interval) | o
 mode(anyelement) | o
 rank("any")  | h
 percent_rank("any")  | h
 cume_dist("any") | h
 dense_rank("any")| h
(19 rows)

Probably the ordered-set/hypothetical ones aren't relevant for this
issue.

Whether or not we feel like fixing the above "normal" aggs for this,
the patch would have to not fail on extension aggregates that don't
support serialization.

regards, tom lane



Re: Spilling hashed SetOps and aggregates to disk

2018-06-05 Thread Andres Freund
On 2018-06-05 19:04:11 +0200, David Fetter wrote:
> On Tue, Jun 05, 2018 at 02:56:23PM +1200, David Rowley wrote:
> > On 5 June 2018 at 06:52, Andres Freund  wrote:
> > > That part has gotten a bit easier since, because we have serialize
> > > / deserialize operations for aggregates these days.
> > 
> > True. Although not all built in aggregates have those defined.
> 
> Just out of curiosity, which ones don't? As of
> 3f85c62d9e825eedd1315d249ef1ad793ca78ed4, pg_aggregate has both of
> those as NOT NULL.

That doesn't mean much. We commonly store 0 / InvalidOid for such things
in the catalog. Allows us to still map the tuples to structs (due to
fixed width).

Greetings,

Andres Freund



Re: Spilling hashed SetOps and aggregates to disk

2018-06-05 Thread David Fetter
On Tue, Jun 05, 2018 at 02:56:23PM +1200, David Rowley wrote:
> On 5 June 2018 at 06:52, Andres Freund  wrote:
> > That part has gotten a bit easier since, because we have serialize
> > / deserialize operations for aggregates these days.
> 
> True. Although not all built in aggregates have those defined.

Just out of curiosity, which ones don't? As of
3f85c62d9e825eedd1315d249ef1ad793ca78ed4, pg_aggregate has both of
those as NOT NULL.

Best,
David.
-- 
David Fetter  http://fetter.org/
Phone: +1 415 235 3778

Remember to vote!
Consider donating to Postgres: http://www.postgresql.org/about/donate



Re: Spilling hashed SetOps and aggregates to disk

2018-06-05 Thread David Fetter
On Tue, Jun 05, 2018 at 02:56:23PM +1200, David Rowley wrote:
> On 5 June 2018 at 06:52, Andres Freund  wrote:
> > That part has gotten a bit easier since, because we have serialize /
> > deserialize operations for aggregates these days.
> 
> True. Although not all built in aggregates have those defined.

That is a SMoP which we could, at some point, enforce by requiring
that they always be defined.  Is there something other than round
tuits that's preventing that?

Best,
David.
-- 
David Fetter  http://fetter.org/
Phone: +1 415 235 3778

Remember to vote!
Consider donating to Postgres: http://www.postgresql.org/about/donate



Re: Spilling hashed SetOps and aggregates to disk

2018-06-05 Thread Tomas Vondra

On 06/05/2018 03:41 PM, se...@rielau.com wrote:
In our code base we have added a WithStats-Flavor for creating memory 
contexts.
This api accepts a pointer to metric for accounting and it is inherited 
by all subcontexts unless overridden.
So we only needed to change context creation API where we wanted (such 
as TopTransactionContext, Message Context, ..)

That's quite trivial, actually.


I think that's pretty much what we tried when this patch was first 
discussed in 2015, but it added measurable overhead (1-3%) even when the 
accounting was disabled. Which is not quite desirable, of course.


Also we have fixed all those missing hash spills - albeit based on the 
9.6 hash table design I think.


I don't think this part of the code changed all that much.


If there is interest by the community we are very willing to share.


Absolutely! I think it'd be great to share both the code and the 
reasoning behind the design.



Cheers
Serge
Salesforce


cheers

--
Tomas Vondra  http://www.2ndQuadrant.com
PostgreSQL Development, 24x7 Support, Remote DBA, Training & Services



RE: Re: Spilling hashed SetOps and aggregates to disk

2018-06-05 Thread serge
In our code base we have added a WithStats-Flavor for creating memory contexts.
This api accepts a pointer to metric for accounting and it is inherited by all 
subcontexts unless overridden.
So we only needed to change context creation API where we wanted (such as 
TopTransactionContext, Message Context, ..)
That's quite trivial, actually.
 
Also we have fixed all those missing hash spills - albeit based on the 9.6 hash 
table design I think.
 
If there is interest by the community we are very willing to share.
 
Cheers
Serge
Salesforce


Re: Spilling hashed SetOps and aggregates to disk

2018-06-05 Thread Tomas Vondra

On 06/05/2018 03:17 PM, David Rowley wrote:

On 6 June 2018 at 01:09, Andres Freund  wrote:

On 2018-06-06 01:06:39 +1200, David Rowley wrote:

My concern is that only accounting memory for the group and not the
state is only solving half the problem. It might be fine for
aggregates that don't stray far from their aggtransspace, but for the
other ones, we could still see OOM.



If solving the problem completely is too hard, then a half fix (maybe
3/4) is better than nothing, but if we can get a design for a full fix
before too much work is done, then isn't that better?


I don't think we actually disagree.  I was really primarily talking
about the case where we can't really do better because we don't have
serialization support.  I mean we could just rescan from scratch, using
a groupagg, but that obviously sucks.


I don't think we do. To take yours to the 100% solution might just
take adding the memory accounting to palloc that Jeff proposed a few
years ago and use that accounting to decide when we should switch
method.

However, I don't quite fully recall how the patch accounted for
memory consumed by sub-contexts and if getting the entire
consumption required recursively looking at subcontexts. If that's
the case then checking the consumption would likely cost too much if
it was done after each tuple was aggregated.


Yeah, a simple wrapper would not really fix the issue, because 
allocating memory in the subcontext would not update the accounting 
information. So we'd be oblivious of possibly large amounts of memory. I 
don't quite see how is this related to (not) having serialization 
support, as it's more about not knowing we already hit the memory limit.



That being said, I don't think array_agg actually has the issue, at 
least since b419865a814abbca12bdd6eef6a3d5ed67f432e1 (it does behave 
differently in aggregate and non-aggregate contexts, IIRC).


I don't know how common this issue is, though. I don't think any 
built-in aggregates create additional contexts, but I may be wrong. But 
we have this damn extensibility thing, where users can write their own 
aggregates ...


regards

--
Tomas Vondra  http://www.2ndQuadrant.com
PostgreSQL Development, 24x7 Support, Remote DBA, Training & Services



Re: Spilling hashed SetOps and aggregates to disk

2018-06-05 Thread David Rowley
On 6 June 2018 at 01:09, Andres Freund  wrote:
> On 2018-06-06 01:06:39 +1200, David Rowley wrote:
>> My concern is that only accounting memory for the group and not the
>> state is only solving half the problem. It might be fine for
>> aggregates that don't stray far from their aggtransspace, but for the
>> other ones, we could still see OOM.
>
>> If solving the problem completely is too hard, then a half fix (maybe
>> 3/4) is better than nothing, but if we can get a design for a full fix
>> before too much work is done, then isn't that better?
>
> I don't think we actually disagree.  I was really primarily talking
> about the case where we can't really do better because we don't have
> serialization support.  I mean we could just rescan from scratch, using
> a groupagg, but that obviously sucks.

I don't think we do. To take yours to the 100% solution might just
take adding the memory accounting to palloc that Jeff proposed a few
years ago and use that accounting to decide when we should switch
method.

However, I don't quite fully recall how the patch accounted for memory
consumed by sub-contexts and if getting the entire consumption
required recursively looking at subcontexts. If that's the case then
checking the consumption would likely cost too much if it was done
after each tuple was aggregated.


-- 
 David Rowley   http://www.2ndQuadrant.com/
 PostgreSQL Development, 24x7 Support, Training & Services



Re: Spilling hashed SetOps and aggregates to disk

2018-06-05 Thread Tomas Vondra

On 06/05/2018 02:49 PM, Andres Freund wrote:

Hi,

On 2018-06-05 10:05:35 +0200, Tomas Vondra wrote:

My concern is more about what happens when the input tuple ordering is
inherently incompatible with the eviction strategy, greatly increasing the
amount of data written to disk during evictions.

Say for example that we can fit 1000 groups into work_mem, and that there
are 2000 groups in the input data set. If the input is correlated with the
groups, everything is peachy because we'll evict the first batch, and then
group the remaining 1000 groups (or something like that). But if the input
data is random (which can easily happen, e.g. for IP addresses, UUIDs and
such) we'll hit the limit repeatedly and will evict much sooner.



I know you suggested simply dumping the whole hash table and starting from
scratch while we talked about this at pgcon, but ISTM it has exactly this
issue.


Yea, that's the case I was thinking of where going to sorting will very
likely have better performance.

I think it'd even be sensible to have a "skew tuple" like
optimization. When detecting getting closer to memory exhaustion, move
new groups to the tuplesort, but already hashed tuples stay in the
hashtable.  That'd still need tuples being moved to the sort in the
cases where the transition values get too big (say array_agg), but that
should be comparatively rare.  I'm sure we could do better in selecting
the hash-tabled values than just taking the first incoming ones, but
that shouldn't be too bad.



Not sure. I'd imagine the "compression" due to aggregating many tuples 
into much smaller amount of memory would be a clear win, so I don't find 
the "let's just dump all input tuples into a file" very attractive.


I think evicting a fraction of the aggregate state (say, ~25%) would 
work better - choosing the "oldest" items seems OK, as those likely 
represent many input tuples (thus having a high "compaction" ratio). Or 
we could evict items representing rare groups, to make space for the 
common ones. Perhaps a combined eviction strategy (10% of each type) 
would work well. I'm conveniently ignoring the fact that we don't have 
information to determine this, at the moment, of course.
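
A toy sketch of the "evict a fraction, oldest first" idea (plain C,
arbitrary 25% fraction, printing standing in for writing to a spill file,
and insertion order standing in for "has already absorbed many rows"):

#include <stdio.h>

typedef struct { int key; long sum; } Group;

static int
evict_oldest_quarter(Group *groups, int ngroups)
{
    int nevict = ngroups / 4 > 0 ? ngroups / 4 : 1;

    /* write out the oldest quarter of the aggregate states */
    for (int i = 0; i < nevict; i++)
        printf("spill key=%d partial_sum=%ld\n", groups[i].key, groups[i].sum);

    /* compact the survivors to the front */
    for (int i = nevict; i < ngroups; i++)
        groups[i - nevict] = groups[i];

    return ngroups - nevict;    /* new group count */
}

int main(void)
{
    Group g[] = {{1, 100}, {2, 50}, {3, 7}, {4, 9}, {5, 3}, {6, 1}, {7, 2}, {8, 4}};
    int n = 8;

    n = evict_oldest_quarter(g, n);
    printf("groups remaining: %d\n", n);
    return 0;
}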


I'm sure it's possible to make better decisions based on some cost 
estimates, both at plan time and then during execution.


That being said, I don't want to over-think / over-engineer this. 
Getting something that reduces the risk of OOM in the first step is a 
good enough improvement. If we can do something better next, great.


regards

--
Tomas Vondra  http://www.2ndQuadrant.com
PostgreSQL Development, 24x7 Support, Remote DBA, Training & Services



Re: Spilling hashed SetOps and aggregates to disk

2018-06-05 Thread Andres Freund
Hi,

On 2018-06-06 01:06:39 +1200, David Rowley wrote:
> On 6 June 2018 at 00:57, Andres Freund  wrote:
> > I think it's ok to only handle this gracefully if serialization is
> > supported.
> >
> > But I think my proposal to continue to use a hashtable for the already
> > known groups, and sorting for additional groups, would largely address
> > that, right?  We couldn't deal with groups becoming too large, but
> > easily with the number of groups becoming too large.
> 
> My concern is that only accounting memory for the group and not the
> state is only solving half the problem. It might be fine for
> aggregates that don't stray far from their aggtransspace, but for the
> other ones, we could still see OOM.

> If solving the problem completely is too hard, then a half fix (maybe
> 3/4) is better than nothing, but if we can get a design for a full fix
> before too much work is done, then isn't that better?

I don't think we actually disagree.  I was really primarily talking
about the case where we can't really do better because we don't have
serialization support.  I mean we could just rescan from scratch, using
a groupagg, but that obviously sucks.

Greetings,

Andres Freund



Re: Spilling hashed SetOps and aggregates to disk

2018-06-05 Thread David Rowley
On 6 June 2018 at 00:57, Andres Freund  wrote:
> On 2018-06-06 00:53:42 +1200, David Rowley wrote:
>> On 6 June 2018 at 00:45, Andres Freund  wrote:
>> > On 2018-06-05 09:35:13 +0200, Tomas Vondra wrote:
>> >> I wonder if an aggregate might use a custom context
>> >> internally (I don't recall anything like that). The accounting capability
>> >> seems potentially useful for other places, and those might not use 
>> >> AllocSet
>> >> (or at least not directly).
>> >
>> > Yea, that seems like a big issue.
>>
>> Unfortunately, at least one of the built-in ones do. See initArrayResultArr.
>
> I think it's ok to only handle this gracefully if serialization is
> supported.
>
> But I think my proposal to continue to use a hashtable for the already
> known groups, and sorting for additional groups, would largely address
> that, right?  We couldn't deal with groups becoming too large, but
> easily with the number of groups becoming too large.

My concern is that only accounting memory for the group and not the
state is only solving half the problem. It might be fine for
aggregates that don't stray far from their aggtransspace, but for the
other ones, we could still see OOM.

If solving the problem completely is too hard, then a half fix (maybe
3/4) is better than nothing, but if we can get a design for a full fix
before too much work is done, then isn't that better?


-- 
 David Rowley   http://www.2ndQuadrant.com/
 PostgreSQL Development, 24x7 Support, Training & Services



Re: Spilling hashed SetOps and aggregates to disk

2018-06-05 Thread Andres Freund
On 2018-06-06 00:53:42 +1200, David Rowley wrote:
> On 6 June 2018 at 00:45, Andres Freund  wrote:
> > On 2018-06-05 09:35:13 +0200, Tomas Vondra wrote:
> >> I wonder if an aggregate might use a custom context
> >> internally (I don't recall anything like that). The accounting capability
> >> seems potentially useful for other places, and those might not use AllocSet
> >> (or at least not directly).
> >
> > Yea, that seems like a big issue.
> 
> Unfortunately, at least one of the built-in ones do. See initArrayResultArr.

I think it's ok to only handle this gracefully if serialization is
supported.

But I think my proposal to continue to use a hashtable for the already
known groups, and sorting for additional groups, would largely address
that, right?  We couldn't deal with groups becoming too large, but
easily with the number of groups becoming too large.

Greetings,

Andres Freund



Re: Spilling hashed SetOps and aggregates to disk

2018-06-05 Thread David Rowley
On 6 June 2018 at 00:45, Andres Freund  wrote:
> On 2018-06-05 09:35:13 +0200, Tomas Vondra wrote:
>> I wonder if an aggregate might use a custom context
>> internally (I don't recall anything like that). The accounting capability
>> seems potentially useful for other places, and those might not use AllocSet
>> (or at least not directly).
>
> Yea, that seems like a big issue.

Unfortunately, at least one of the built-in ones do. See initArrayResultArr.

-- 
 David Rowley   http://www.2ndQuadrant.com/
 PostgreSQL Development, 24x7 Support, Training & Services



Re: Spilling hashed SetOps and aggregates to disk

2018-06-05 Thread Andres Freund
Hi,

On 2018-06-05 10:05:35 +0200, Tomas Vondra wrote:
> My concern is more about what happens when the input tuple ordering is
> inherently incompatible with the eviction strategy, greatly increasing the
> amount of data written to disk during evictions.
> 
> Say for example that we can fit 1000 groups into work_mem, and that there
> are 2000 groups in the input data set. If the input is correlated with the
> groups, everything is peachy because we'll evict the first batch, and then
> group the remaining 1000 groups (or something like that). But if the input
> data is random (which can easily happen, e.g. for IP addresses, UUIDs and
> such) we'll hit the limit repeatedly and will evict much sooner.

> I know you suggested simply dumping the whole hash table and starting from
> scratch while we talked about this at pgcon, but ISTM it has exactly this
> issue.

Yea, that's the case I was thinking of where going to sorting will very
likely have better performance.

I think it'd even be sensible to have a "skew tuple" like
optimization. When detecting getting closer to memory exhaustion, move
new groups to the tuplesort, but already hashed tuples stay in the
hashtable.  That'd still need tuples being moved to the sort in the
cases where the transition values get to big (say array_agg), but that
should be comparatively rare.  I'm sure we could do better in selecting
the hash-tabled values than just taking the first incoming ones, but
that shouldn't be too bad.

Greetings,

Andres Freund



Re: Spilling hashed SetOps and aggregates to disk

2018-06-05 Thread Tomas Vondra

On 06/05/2018 07:46 AM, Jeff Davis wrote:

On Tue, 2018-06-05 at 07:04 +0200, Tomas Vondra wrote:

I expect the eviction strategy to be the primary design challenge of
this patch. The other bits will be mostly determined by this one
piece.


Not sure I agree that this is the primary challenge.

The cases that benefit from eviction are probably a minority. I see two
categories that would benefit:

   * Natural clustering in the heap. This sounds fairly common, but a
lot of the cases that come to mind are too low-cardinality to be
compelling; e.g. timestamps grouped by hour/day/month. If someone has
run into a high-cardinality natural grouping case, let me know.
   * ARRAY_AGG (or similar): individual state values can be large enough
that we need to evict to avoid exceeding work_mem even if not adding
any new groups.

In either case, it seems like a fairly simple eviction strategy would
work. For instance, we could just evict the entire hash table if
work_mem is exceeded or if the hit rate on the hash table falls below a
certain threshold. If there was really something important that should
have stayed in the hash table, it will go back in soon anyway.

So why should eviction be a major driver for the entire design? I agree
it should be an area of improvement for the future, so let me know if
you see a major problem, but I haven't been as focused on eviction.



My concern is more about what happens when the input tuple ordering is 
inherently incompatible with the eviction strategy, greatly increasing 
the amount of data written to disk during evictions.


Say for example that we can fit 1000 groups into work_mem, and that 
there are 2000 groups in the input data set. If the input is correlated 
with the groups, everything is peachy because we'll evict the first 
batch, and then group the remaining 1000 groups (or something like 
that). But if the input data is random (which can easily happen, e.g. 
for IP addresses, UUIDs and such) we'll hit the limit repeatedly and 
will evict much sooner.


I know you suggested simply dumping the whole hash table and starting 
from scratch while we talked about this at pgcon, but ISTM it has 
exactly this issue.


But I don't know if there actually is a better option - maybe we simply 
have to accept this problem. After all, running slowly is still better 
than OOM (which may or may not happen now).


I wonder if we can somehow detect this at run-time and maybe fall-back 
to groupagg. E.g. we could compare number of groups vs. number of input 
tuples when we first hit the limit. It's a rough heuristic, but maybe
sufficient.
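
Sketched as code, the heuristic could be as simple as the following (the
0.75 cutoff is a placeholder, not a tuned constant):

/* At the moment the hash table first hits the memory limit, compare how
 * many distinct groups we have against how many input tuples produced
 * them.  A ratio near 1 means hashing is buying little "compression",
 * so falling back to a sort-based aggregate is likely cheaper. */
int
should_fall_back_to_sort(long ngroups_so_far, long ntuples_so_far)
{
    if (ntuples_so_far == 0)
        return 0;
    return (double) ngroups_so_far / (double) ntuples_so_far > 0.75;
}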



While the primary goal of the patch is addressing the OOM risks in
hash
aggregate, I think it'd be a mistake to see it just that way. I see
it
could allow us to perform hash aggregate more often, even if we know
the
groups won't fit into work_mem. If we could estimate how much of the
aggregate state we'll have to spill to disk, we could still prefer
hashagg over groupagg. We would pay the price for eviction, but on
large
data sets that can be massively cheaper than having to do sort.


Agreed. I ran some tests of my patch in the last round, and they
strongly supported choosing HashAgg a lot more often. A lot of sort
improvements have been made though, so I really need to run some new
numbers.



Yeah, Peter made the sort stuff a lot faster. But I think there still is 
a lot of cases where the hashagg would greatly reduce the amount of data 
that needs to be written to disk, and that's not something the sort 
improvements could improve. If you need to aggregate a 1TB table, it's 
still going to be ~1TB of data you need to write to disk during on-disk 
sort. But if there is only a reasonable number of groups, that greatly 
simplifies things.



far away), but it would be unfortunate to make this improvement
impossible/more difficult in the future.


If you see anything that would make this difficult in the future, let
me know.



Sure. Sorry for being too hand-wavy in this thread, and possibly moving 
the goal posts further away :-/ I got excited by you planning to work on 
this again and perhaps a bit carried away ;-)



regards

--
Tomas Vondra  http://www.2ndQuadrant.com
PostgreSQL Development, 24x7 Support, Remote DBA, Training & Services



Re: Spilling hashed SetOps and aggregates to disk

2018-06-05 Thread David Rowley
On 5 June 2018 at 17:04, Tomas Vondra  wrote:
> On 06/05/2018 04:56 AM, David Rowley wrote:
>> Isn't there still a problem determining when the memory exhaustion
>> actually happens though? As far as I know, we still have little
>> knowledge of how much memory each aggregate state occupies.
>>
>> Jeff tried to solve this in [1], but from what I remember, there was
>> too much concern about the overhead of the additional accounting code.
>>
>> [1] 
>> https://www.postgresql.org/message-id/flat/CAKJS1f8yvvvj-sVDv_bcxkzcZKq0ZOTVhX0dHfnYDct2Mycq5Q%40mail.gmail.com#cakjs1f8yvvvj-svdv_bcxkzczkq0zotvhx0dhfnydct2myc...@mail.gmail.com
>>
>
> I had a chat with Jeff Davis at pgcon about this, and IIRC he suggested
> a couple of people who were originally worried about the overhead seem
> to be accepting it now.

Is there any great need to make everything pay the small price for
this? Couldn't we just create a new MemoryContextMethod that
duplicates aset.c, but has the required additional accounting built in
at the implementation level, then make execGrouping.c use that
allocator for its hash table? The code would not really need to be
duplicated, we could just do things the same way as like.c does with
like_match.c and include a .c file. We'd need another interface
function in MemoryContextMethods to support getting the total memory
allocated in the context, but that does not seem like a problem.
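
A toy sketch of such an accounting wrapper, using a per-context pointer to
a shared byte counter (inherited by child contexts unless overridden) and
plain malloc underneath instead of aset.c:

#include <stdint.h>
#include <stdlib.h>
#include <stdio.h>

typedef struct AcctContext
{
    int64_t *total_bytes;       /* shared with the parent unless overridden */
} AcctContext;

typedef struct ChunkHeader { size_t size; } ChunkHeader;

static void *
acct_alloc(AcctContext *cxt, size_t size)
{
    ChunkHeader *h = malloc(sizeof(ChunkHeader) + size);

    h->size = size;             /* remember the size for the free path */
    *cxt->total_bytes += size;
    return h + 1;
}

static void
acct_free(AcctContext *cxt, void *ptr)
{
    ChunkHeader *h = (ChunkHeader *) ptr - 1;

    *cxt->total_bytes -= h->size;
    free(h);
}

int main(void)
{
    int64_t agg_mem = 0;
    AcctContext parent = {&agg_mem};
    AcctContext child = {&agg_mem};     /* child inherits the same counter */

    void *a = acct_alloc(&parent, 128);
    void *b = acct_alloc(&child, 512);

    printf("accounted bytes: %lld\n", (long long) agg_mem);  /* 640 */
    acct_free(&child, b);
    acct_free(&parent, a);
    return 0;
}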


-- 
 David Rowley   http://www.2ndQuadrant.com/
 PostgreSQL Development, 24x7 Support, Training & Services



Re: Spilling hashed SetOps and aggregates to disk

2018-06-05 Thread Tomas Vondra



On 06/05/2018 09:22 AM, David Rowley wrote:

On 5 June 2018 at 17:04, Tomas Vondra  wrote:

On 06/05/2018 04:56 AM, David Rowley wrote:

Isn't there still a problem determining when the memory exhaustion
actually happens though? As far as I know, we still have little
knowledge of how much memory each aggregate state occupies.

Jeff tried to solve this in [1], but from what I remember, there was
too much concern about the overhead of the additional accounting code.

[1] 
https://www.postgresql.org/message-id/flat/CAKJS1f8yvvvj-sVDv_bcxkzcZKq0ZOTVhX0dHfnYDct2Mycq5Q%40mail.gmail.com#cakjs1f8yvvvj-svdv_bcxkzczkq0zotvhx0dhfnydct2myc...@mail.gmail.com



I had a chat with Jeff Davis at pgcon about this, and IIRC he suggested
a couple of people who were originally worried about the overhead seem
to be accepting it now.


Is there any great need to make everything pay the small price for
this? Couldn't we just create a new MemoryContextMethod that
duplicates aset.c, but has the required additional accounting built in
at the implementation level, then make execGrouping.c use that
allocator for its hash table? The code would not really need to be
duplicated, we could just do things the same way as like.c does with
like_match.c and include a .c file. We'd need another interface
function in MemoryContextMethods to support getting the total memory
allocated in the context, but that does not seem like a problem.



There probably is not a need, but there was not a great way to enable it 
explicitly only for some contexts. IIRC what was originally considered 
back in 2015 was some sort of a flag in the memory context, and the 
overhead was about the same as with the int64 arithmetic alone.


But I don't think we've considered copying the whole AllocSet. Maybe 
that would work, not sure. I wonder if an aggregate might use a custom 
context internally (I don't recall anything like that). The accounting 
capability seems potentially useful for other places, and those might 
not use AllocSet (or at least not directly).


All of this of course assumes the overhead is still there. I sure will 
do some new measurements.


regards

--
Tomas Vondra  http://www.2ndQuadrant.com
PostgreSQL Development, 24x7 Support, Remote DBA, Training & Services



Re: Spilling hashed SetOps and aggregates to disk

2018-06-04 Thread Jeff Davis
On Tue, 2018-06-05 at 07:04 +0200, Tomas Vondra wrote:
> I expect the eviction strategy to be the primary design challenge of
> this patch. The other bits will be mostly determined by this one
> piece.

Not sure I agree that this is the primary challenge.

The cases that benefit from eviction are probably a minority. I see two
categories that would benefit:

  * Natural clustering in the heap. This sounds fairly common, but a
lot of the cases that come to mind are too low-cardinality to be
compelling; e.g. timestamps grouped by hour/day/month. If someone has
run into a high-cardinality natural grouping case, let me know.
  * ARRAY_AGG (or similar): individual state values can be large enough
that we need to evict to avoid exceeding work_mem even if not adding
any new groups.

In either case, it seems like a fairly simple eviction strategy would
work. For instance, we could just evict the entire hash table if
work_mem is exceeded or if the hit rate on the hash table falls below a
certain threshold. If there was really something important that should
have stayed in the hash table, it will go back in soon anyway.
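
Sketched as code, such a trigger might look like this (all thresholds
illustrative):

/* Flush the whole hash table either when it exceeds the memory budget or
 * when the recent hit rate drops below some threshold. */
typedef struct HashAggStats
{
    long   mem_used;        /* bytes currently held by the hash table  */
    long   mem_limit;       /* work_mem equivalent                     */
    long   lookups;         /* lookups since the last flush            */
    long   hits;            /* lookups that found an existing group    */
} HashAggStats;

int
should_flush_hash_table(const HashAggStats *s)
{
    if (s->mem_used > s->mem_limit)
        return 1;
    if (s->lookups >= 1000 &&
        (double) s->hits / (double) s->lookups < 0.10)
        return 1;               /* mostly new groups: hashing isn't helping */
    return 0;
}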

So why should eviction be a major driver for the entire design? I agree
it should be an area of improvement for the future, so let me know if
you see a major problem, but I haven't been as focused on eviction.

> While the primary goal of the patch is addressing the OOM risks in
> hash
> aggregate, I think it'd be a mistake to see it just that way. I see
> it
> could allow us to perform hash aggregate more often, even if we know
> the
> groups won't fit into work_mem. If we could estimate how much of the
> aggregate state we'll have to spill to disk, we could still prefer
> hashagg over groupagg. We would pay the price for eviction, but on
> large
> data sets that can be massively cheaper than having to do sort.

Agreed. I ran some tests of my patch in the last round, and they
strongly supported choosing HashAgg a lot more often. A lot of sort
improvements have been made though, so I really need to run some new
numbers.

> far away), but it would be unfortunate to make this improvement
> impossible/more difficult in the future.

If you see anything that would make this difficult in the future, let
me know.

Regards,
     Jeff Davis




Re: Spilling hashed SetOps and aggregates to disk

2018-06-04 Thread Jeff Davis
On Mon, 2018-06-04 at 11:52 -0700, Andres Freund wrote:
> I wonder whether, at least for aggregates, the better fix wouldn't be
> to
> switch to feeding the tuples into tuplesort upon memory exhaustion
> and
> doing a sort based aggregate.  We have most of the infrastructure to
> do

That's an interesting idea, but it seems simpler to stick to hashing
rather than using a combination strategy. It also seems like it would
take less CPU effort.

What advantages do you have in mind? My patch partitions the spilled
data, so it should have similar disk costs as a sort approach.
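
For readers following along, the partitioning amounts to routing each
spilled tuple to one of N spill files by reusing bits of its group hash; a
toy sketch with illustrative constants (not the patch itself):

#include <stdio.h>

#define NUM_SPILL_PARTITIONS 4   /* power of two keeps the masking cheap */

static int
spill_partition_for(unsigned int group_hash)
{
    /* use high bits so they stay independent of the in-memory bucket bits */
    return (group_hash >> 27) & (NUM_SPILL_PARTITIONS - 1);
}

int main(void)
{
    unsigned int some_hashes[] = {0x12345678u, 0x9abcdef0u, 0x0fedcba9u, 0xdeadbeefu};

    for (int i = 0; i < 4; i++)
        printf("hash %#x -> partition %d\n",
               some_hashes[i], spill_partition_for(some_hashes[i]));
    return 0;
}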

Regards,
Jeff Davis




Re: Spilling hashed SetOps and aggregates to disk

2018-06-04 Thread Tomas Vondra
On 06/05/2018 04:56 AM, David Rowley wrote:
> On 5 June 2018 at 06:52, Andres Freund  wrote:
>> That part has gotten a bit easier since, because we have serialize /
>> deserialize operations for aggregates these days.
> 
> True. Although not all built in aggregates have those defined.

Not sure what to do about those, unfortunately. We could stop adding
more groups and hope the aggregate state does not grow further, but
that's about it.

> 
>> I wonder whether, at least for aggregates, the better fix wouldn't be to
>> switch to feeding the tuples into tuplesort upon memory exhaustion and
>> doing a sort based aggregate.  We have most of the infrastructure to do
>> that due to grouping sets. It's just the pre-existing in-memory tuples
>> that'd be problematic, in that the current transition values would need
>> to serialized as well.  But with a stable sort that'd not be
>> particularly problematic, and that could easily be achieved.

That's one of the possible solutions, yes. It's hard to say if it's
better or worse than the other approaches, because that depends on the
number of tuples vs. number of groups etc.

Evicting some (or all) of the in-memory groups can be easily better or
worse, depending on the data set. And I'm not sure the code complexity
would be significantly different.

I expect the eviction strategy to be the primary design challenge of
this patch. The other bits will be mostly determined by this one piece.

While the primary goal of the patch is addressing the OOM risks in hash
aggregate, I think it'd be a mistake to see it just that way. I see it
could allow us to perform hash aggregate more often, even if we know the
groups won't fit into work_mem. If we could estimate how much of the
aggregate state we'll have to spill to disk, we could still prefer
hashagg over groupagg. We would pay the price for eviction, but on large
data sets that can be massively cheaper than having to do sort.

I admit this is a bit hand-wavy, and the costing depends on us being
able to estimate the amount of evicted data. I certainly don't expect to
get this right away (that would move the goal posts for the patch very
far away), but it would be unfortunate to make this improvement
impossible/more difficult in the future.

> 
> Isn't there still a problem determining when the memory exhaustion
> actually happens though? As far as I know, we still have little
> knowledge of how much memory each aggregate state occupies.
> 
> Jeff tried to solve this in [1], but from what I remember, there was
> too much concern about the overhead of the additional accounting code.
> 
> [1] 
> https://www.postgresql.org/message-id/flat/CAKJS1f8yvvvj-sVDv_bcxkzcZKq0ZOTVhX0dHfnYDct2Mycq5Q%40mail.gmail.com#cakjs1f8yvvvj-svdv_bcxkzczkq0zotvhx0dhfnydct2myc...@mail.gmail.com
> 

I had a chat with Jeff Davis at pgcon about this, and IIRC he suggested
a couple of people who were originally worried about the overhead seem
to be accepting it now.

IMHO we do want a memory-bound hash aggregate, and doing some sort of
memory accounting is a must-have part of that. I don't see a way around
this, really. We need to minimize the overhead, of course.

I do not recall the exact approach we ended up with in 2015, though, or
what the measurements with that version were. I see 1-3% mentioned early
in the thread, and there were some additional improvements in subsequent
patch version I think.

I don't think we can realistically improve this (accounting at block
level), and there was a discussion if this is actually an overhead or
merely due to different binary layout.

regards

-- 
Tomas Vondra  http://www.2ndQuadrant.com
PostgreSQL Development, 24x7 Support, Remote DBA, Training & Services



Re: Spilling hashed SetOps and aggregates to disk

2018-06-04 Thread David Rowley
On 5 June 2018 at 06:52, Andres Freund  wrote:
> That part has gotten a bit easier since, because we have serialize /
> deserialize operations for aggregates these days.

True. Although not all built in aggregates have those defined.

> I wonder whether, at least for aggregates, the better fix wouldn't be to
> switch to feeding the tuples into tuplesort upon memory exhaustion and
> doing a sort based aggregate.  We have most of the infrastructure to do
> that due to grouping sets. It's just the pre-existing in-memory tuples
> that'd be problematic, in that the current transition values would need
> to serialized as well.  But with a stable sort that'd not be
> particularly problematic, and that could easily be achieved.

Isn't there still a problem determining when the memory exhaustion
actually happens though? As far as I know, we still have little
knowledge of how much memory each aggregate state occupies.

Jeff tried to solve this in [1], but from what I remember, there was
too much concern about the overhead of the additional accounting code.

[1] 
https://www.postgresql.org/message-id/flat/CAKJS1f8yvvvj-sVDv_bcxkzcZKq0ZOTVhX0dHfnYDct2Mycq5Q%40mail.gmail.com#cakjs1f8yvvvj-svdv_bcxkzczkq0zotvhx0dhfnydct2myc...@mail.gmail.com

-- 
 David Rowley   http://www.2ndQuadrant.com/
 PostgreSQL Development, 24x7 Support, Training & Services



Re: Spilling hashed SetOps and aggregates to disk

2018-06-04 Thread Andres Freund
Hi,

On 2018-06-04 10:32:47 +0200, Heikki Linnakangas wrote:
> Hash Aggs and SetOps are currently not spilled to disk. If the planner's
> estimate on the number of entries is badly off, you might run out of memory
> at execution time, if all the entries don't fit in memory.
> 
> For HashAggs, this was discussed in depth a couple of years ago at [1].
> SetOps have the same issue, but fixing that is simpler, as you don't need to
> handle arbitrary aggregate transition values and functions.

That part has gotten a bit easier since, because we have serialize /
deserialize operations for aggregates these days.

I wonder whether, at least for aggregates, the better fix wouldn't be to
switch to feeding the tuples into tuplesort upon memory exhaustion and
doing a sort based aggregate.  We have most of the infrastructure to do
that due to grouping sets. It's just the pre-existing in-memory tuples
that'd be problematic, in that the current transition values would need
to serialized as well.  But with a stable sort that'd not be
particularly problematic, and that could easily be achieved.

Greetings,

Andres Freund



Spilling hashed SetOps and aggregates to disk

2018-06-04 Thread Heikki Linnakangas

Hi,

Hash Aggs and SetOps are currently not spilled to disk. If the planner's 
estimate on the number of entries is badly off, you might run out of 
memory at execution time, if all the entries don't fit in memory.


For HashAggs, this was discussed in depth a couple of years ago at [1]. 
SetOps have the same issue, but fixing that is simpler, as you don't 
need to handle arbitrary aggregate transition values and functions.


So a while back, I started hacking on spilling SetOps, with the idea 
that the code to deal with that could later be reused to deal with 
HashAggs, too. I didn't get very far, but I'm posting this in this very 
unfinished form to show what I've got, because I had a chat on this with 
Jeff Davis and some others last week.


The logtape.c interface would be very useful for this. When you start 
spilling, you want to create many spill files, so that when reloaded, 
each file will fit comfortably in memory. With logtape.c, you can have 
many logical tapes, without the overhead of real files. Furthermore, if 
you need to re-spill because a spill file grows too big in the first
pass, logtape.c allows reusing the space "on-the-fly". The only problem 
with the current logtape interface is that it requires specifying the 
number of "tapes" upfront, when the tapeset is created. However, I was 
planning to change that, anyway [2].
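
A back-of-the-envelope sketch of why cheap logical tapes matter here: the
natural number of spill targets is roughly the expected spill size divided
by work_mem, which can easily run into the hundreds (illustrative numbers
only):

#include <stdio.h>

static int
choose_num_spill_tapes(long long estimated_spill_bytes, long long work_mem_bytes)
{
    /* one tape per work_mem-sized chunk, so each reloads comfortably */
    long long n = (estimated_spill_bytes + work_mem_bytes - 1) / work_mem_bytes;

    return n < 1 ? 1 : (int) n;
}

int main(void)
{
    long long spill = 10LL * 1024 * 1024 * 1024;   /* ~10 GB expected to spill */
    long long work_mem = 64LL * 1024 * 1024;       /* 64 MB */

    printf("logical tapes needed: %d\n", choose_num_spill_tapes(spill, work_mem));
    return 0;
}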


[1] 
https://www.postgresql.org/message-id/1407706010.6623.16.camel%40jeff-desktop


[2] 
https://www.postgresql.org/message-id/420a0ec7-602c-d406-1e75-1ef7ddc58d83%40iki.fi


- Heikki

>From 1513a777ca1aa1df57f054ed8d15db9a734adf91 Mon Sep 17 00:00:00 2001
From: Heikki Linnakangas 
Date: Thu, 3 May 2018 09:44:13 +0300
Subject: [PATCH 1/2] Optimize memory usage in SetOp executor node.

---
 src/backend/executor/nodeSetOp.c | 68 +++-
 src/include/nodes/execnodes.h|  4 ++-
 2 files changed, 56 insertions(+), 16 deletions(-)

diff --git a/src/backend/executor/nodeSetOp.c b/src/backend/executor/nodeSetOp.c
index 3fa4a5fcc6..8dd017b2ef 100644
--- a/src/backend/executor/nodeSetOp.c
+++ b/src/backend/executor/nodeSetOp.c
@@ -51,6 +51,8 @@
 #include "utils/memutils.h"
 
 
+#define NUM_PERGROUPS_PER_ALLOC 100
+
 /*
  * SetOpStatePerGroupData - per-group working state
  *
@@ -60,11 +62,19 @@
  * In SETOP_SORTED mode, we need only one of these structs, and it's kept in
  * the plan state node.  In SETOP_HASHED mode, the hash table contains one
  * of these for each tuple group.
+ *
+ * Unused per-group structs are kept in a linked list, in
+ * SetOpState.free_pergroups.  In that case, 'next' points to the next struct
+ * in the free-list.
  */
-typedef struct SetOpStatePerGroupData
+typedef union SetOpStatePerGroupData
 {
-	long		numLeft;		/* number of left-input dups in group */
-	long		numRight;		/* number of right-input dups in group */
+	struct
+	{
+		uint64		numLeft;		/* number of left-input dups in group */
+		uint64		numRight;		/* number of right-input dups in group */
+	} data;
+	SetOpStatePerGroup next;		/* next unused entry in the free list */
 }			SetOpStatePerGroupData;
 
 
@@ -79,7 +89,7 @@ static TupleTableSlot *setop_retrieve_hash_table(SetOpState *setopstate);
 static inline void
 initialize_counts(SetOpStatePerGroup pergroup)
 {
-	pergroup->numLeft = pergroup->numRight = 0;
+	pergroup->data.numLeft = pergroup->data.numRight = 0;
 }
 
 /*
@@ -89,9 +99,39 @@ static inline void
 advance_counts(SetOpStatePerGroup pergroup, int flag)
 {
 	if (flag)
-		pergroup->numRight++;
+		pergroup->data.numRight++;
 	else
-		pergroup->numLeft++;
+		pergroup->data.numLeft++;
+}
+
+/*
+ * Allocate a new per-group struct.
+ *
+ * To save on memory and palloc() call overhead, we allocate the per-group
+ * structs in batches.
+ */
+static SetOpStatePerGroup
+alloc_pergroup(SetOpState *setopstate)
+{
+	SetOpStatePerGroup pergroup;
+
+	if (!setopstate->free_pergroups)
+	{
+		int			i;
+
+		setopstate->free_pergroups =
+			MemoryContextAlloc(setopstate->hashtable->tablecxt,
+			   NUM_PERGROUPS_PER_ALLOC * sizeof(SetOpStatePerGroupData));
+
+		for (i = 0; i < NUM_PERGROUPS_PER_ALLOC - 1; i++)
+			setopstate->free_pergroups[i].next = &setopstate->free_pergroups[i + 1];
+		setopstate->free_pergroups[NUM_PERGROUPS_PER_ALLOC - 1].next = NULL;
+	}
+
+	pergroup = setopstate->free_pergroups;
+	setopstate->free_pergroups = pergroup->next;
+
+	return pergroup;
 }
 
 /*
@@ -152,26 +192,26 @@ set_output_count(SetOpState *setopstate, SetOpStatePerGroup pergroup)
 	switch (plannode->cmd)
 	{
 		case SETOPCMD_INTERSECT:
-			if (pergroup->numLeft > 0 && pergroup->numRight > 0)
+			if (pergroup->data.numLeft > 0 && pergroup->data.numRight > 0)
 setopstate->numOutput = 1;
 			else
 setopstate->numOutput = 0;
 			break;
 		case SETOPCMD_INTERSECT_ALL:
 			setopstate->numOutput =
-(pergroup->numLeft < pergroup->numRight) ?
-pergroup->numLeft : pergroup->numRight;
+(pergroup->data.numLeft < pergroup->data.numRight) ?
+pergroup->data.numLeft :