Taewoo Kim created ASTERIXDB-1892:
-------------------------------------
Summary: A similarity join fails with "Failed to insert a new
buffer into the aggregate operator! [HyracksDataException]" exception.
Key: ASTERIXDB-1892
URL: https://issues.apache.org/jira/browse/ASTERIXDB-1892
Project: Apache AsterixDB
Issue Type: Bug
Reporter: Taewoo Kim
Note: this currently happens in the similarity join branch, not in the master
branch. However, the same symptom can occur in any situation where the input
data to the group-by is large.
The following query fails with a "Failed to insert a new buffer into the
aggregate operator! [HyracksDataException]" exception. The query fetches
50,000 tuples and uses them as the inner side of the similarity join.
The original dataset is about 20GB, and there are 8 nodes (each with two
partitions).
{code}
use dataverse exp;

count(
  for $o in dataset "AmazonReviewNoDup"
  for $p in dataset "AmazonReviewProductID"
  for $i in dataset "AmazonReviewNoDup"
  where $p.asin /* +indexnl */ = $i.asin
    and $p.id >= int64("3748")
    and $p.id <= int64("8747")
    and /* +skip-index */ similarity-jaccard(word-tokens($o.summary),
                                             word-tokens($i.summary)) >= 0.8
    and $i.id < $o.id
  return {"oid": $o.id, "iid": $i.id}
);
{code}
compiler.groupmemory size: 128MB
The cause of this issue:
When the optimizer sees a group-by operator during plan optimization, it calls
the ExternalGroupByPOperator.calculateGroupByTableCardinality() method to set
the cardinality of the hash table. This is supposed to ensure that the hash
table never grows beyond the "compiler.groupmemory" budget.
The actual external hash group-by operator descriptor has two phases: build
and merge. If the input data fits in memory, the final result is generated in
the build phase. If not, some partitions are spilled to disk, and these
spilled partitions are gradually merged during the merge phase.
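The build/merge flow can be illustrated with a toy sketch (this is not
AsterixDB code; the "budget" here counts distinct groups instead of frames,
and the merge simply re-aggregates each spilled partition in memory):

```java
import java.util.*;

// Toy illustration of an external hash group-by (count aggregate):
// the build phase aggregates in memory until the budget is exhausted;
// overflowing keys spill into partitions that are aggregated afterwards.
public final class ExternalGroupBySketch {
    static final int BUDGET = 4;    // max in-memory groups (stand-in for a frame budget)
    static final int NUM_PARTS = 2; // number of spill partitions

    public static Map<String, Integer> groupCount(List<String> input) {
        Map<String, Integer> table = new HashMap<>();
        List<List<String>> spilled = new ArrayList<>();
        for (int p = 0; p < NUM_PARTS; p++) spilled.add(new ArrayList<>());

        // Build phase: aggregate in place while the group fits in the budget.
        for (String key : input) {
            if (table.containsKey(key) || table.size() < BUDGET) {
                table.merge(key, 1, Integer::sum);
            } else {
                spilled.get(Math.abs(key.hashCode()) % NUM_PARTS).add(key);
            }
        }
        Map<String, Integer> result = new HashMap<>(table);

        // Merge phase: re-aggregate each spilled partition (assumed to fit now).
        for (List<String> part : spilled) {
            for (String key : part) result.merge(key, 1, Integer::sum);
        }
        return result;
    }

    public static void main(String[] args) {
        List<String> data = Arrays.asList("a", "b", "c", "d", "e", "a", "e", "b");
        System.out.println(groupCount(data));
    }
}
```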
So, when the given memory is small compared to the data size, the operator
first builds an intermediate result (the spilled partitions). Then it tries to
aggregate these partitions using a different seed for the hash function.
During this merge phase, the cardinality of the hash table is set to the
number of tuples in each spilled partition. This cardinality is applied
without considering that the hash table can grow beyond the budget.
And for the hash table, we only compact the content frames, not the header
frames. The header frames can grow up to a certain size, which can be bigger
than the allocated memory. The structure of the hash table is described in
the following:
https://docs.google.com/presentation/d/1AExoTqQlx9va-AaiZ6OSPxBuQ3NJqz-cG5NGrjdk5FU/edit
So, a possible solution would be:
During the merge phase, calculate the cardinality of the hash table based on
the memory budget, and compare this number to the cardinality that the caller
wants to set. Pick the smaller of the two as the cardinality of the hash
table; then we will be safe. Compacting the header part of the hash table
would help to some extent, but it doesn't solve the issue fundamentally.
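A minimal sketch of the proposed guard, under the assumption that each header
(slot) entry costs a fixed number of bytes; the class and method names here
are hypothetical, not the actual AsterixDB API:

```java
public final class HashTableCardinalityGuard {

    // Assumption: a header entry holds a 4-byte frame index plus a
    // 4-byte in-frame tuple index, i.e. 8 bytes per slot.
    private static final long HEADER_ENTRY_BYTES = 8L;

    /**
     * Cardinality to use for the merge-phase hash table: the caller's
     * requested cardinality, capped by what the memory budget can hold
     * for the header part alone.
     */
    public static long cappedCardinality(long requestedCardinality, long memoryBudgetBytes) {
        long budgetBasedCardinality = memoryBudgetBytes / HEADER_ENTRY_BYTES;
        return Math.min(requestedCardinality, budgetBasedCardinality);
    }

    public static void main(String[] args) {
        long budget = 128L * 1024 * 1024;  // compiler.groupmemory = 128 MB
        long requested = 51_395_283L;      // tuples in a spilled partition
        System.out.println(cappedCardinality(requested, budget));
    }
}
```

With the observed numbers, the cap falls well below the requested
51,395,283, so the header part can no longer outgrow the budget.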
In the observed case, the cardinality of the hash table during the merge phase
was 51,395,283. The header part alone would take 392 MB if all tuples were
unique, while the budget was 128 MB. There were 24 partitions; a spilled
partition was about 2.8GB, and all of these partitions were spilled to disk.
At that point the hash table alone took 126MB, not including the data table
part, so no additional memory could be allocated.
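The 392 MB figure is consistent with an 8-byte header entry per unique tuple
(392 MB / 51,395,283 is roughly 8); the entry size is an assumption inferred
from these numbers, not read from the source code:

```java
public final class HeaderSizeCheck {
    public static void main(String[] args) {
        long cardinality = 51_395_283L; // merge-phase hash table cardinality
        long entryBytes = 8L;           // assumed: 4-byte frame index + 4-byte tuple index
        double headerMB = cardinality * entryBytes / (1024.0 * 1024.0);
        // Roughly 392 MB of header alone, against a 128 MB budget.
        System.out.printf("header ~ %.0f MB%n", headerMB);
    }
}
```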
--
This message was sent by Atlassian JIRA
(v6.3.15#6346)