Taewoo Kim created ASTERIXDB-1892:
-------------------------------------

             Summary: A similarity join fails with "Failed to insert a new 
buffer into the aggregate operator! [HyracksDataException]" exception.
                 Key: ASTERIXDB-1892
                 URL: https://issues.apache.org/jira/browse/ASTERIXDB-1892
             Project: Apache AsterixDB
          Issue Type: Bug
            Reporter: Taewoo Kim


Note: this currently happens in the similarity join branch; it doesn't happen 
in the master branch. However, the same symptom can occur in any situation where 
the input data to the group-by is large. 

The following query fails with a "Failed to insert a new buffer into the 
aggregate operator! [HyracksDataException]" exception. The query fetches 
50,000 tuples and uses them as the inner side of the similarity join. 
The original dataset is about 20 GB, and the cluster has 8 nodes (each with two 
partitions).

{code}
use dataverse exp;

count(
  for $o in dataset "AmazonReviewNoDup"
  for $p in dataset "AmazonReviewProductID"
  for $i in dataset "AmazonReviewNoDup"
  where $p.asin /* +indexnl */ = $i.asin
    and $p.id >= int64("3748")
    and $p.id <= int64("8747")
    and /* +skip-index */ similarity-jaccard(word-tokens($o.summary),
                                             word-tokens($i.summary)) >= 0.8
    and $i.id < $o.id
  return {"oid": $o.id, "iid": $i.id}
);
{code}

compiler.groupmemory size: 128MB

The cause of this issue:
When the optimizer sees a group-by operator during plan optimization, it calls 
the ExternalGroupByPOperator.calculateGroupByTableCardinality() method to set 
the cardinality of the hash table. This is meant to ensure that the hash table 
never grows beyond the "compiler.groupmemory" budget.
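As a rough illustration of that plan-time cap (a sketch only; the class/method 
names and the per-entry byte costs are assumptions, not the actual implementation):

{code}
// Sketch of deriving a hash-table cardinality cap from the group-by memory budget.
// The per-entry byte costs below are illustrative assumptions, not the real constants.
public final class GroupByCardinalityCap {

    // Assumed cost per hash-table entry: an 8-byte header slot (frame index +
    // tuple index) plus an estimated footprint in the content frames.
    private static final long HEADER_SLOT_BYTES = 8L;

    public static long maxCardinalityForBudget(long groupMemoryBytes,
                                               long estimatedBytesPerGroup) {
        long bytesPerEntry = HEADER_SLOT_BYTES + estimatedBytesPerGroup;
        return Math.max(1L, groupMemoryBytes / bytesPerEntry);
    }

    public static void main(String[] args) {
        // 128 MB budget, assuming ~64 bytes of content per group.
        long cap = maxCardinalityForBudget(128L * 1024 * 1024, 64L);
        System.out.println("Cardinality cap for a 128 MB budget: " + cap);
    }
}
{code}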

The actual external hash group-by operator descriptor has two phases: build and 
merge. If the input data fits in memory, the final result is produced entirely 
in the build phase. If not, some partitions are spilled to disk, and these 
spilled partitions are gradually merged during the merge phase.
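Schematically, the two phases look like the toy sketch below (names and structure 
are mine, not the operator's; spilled partitions are plain lists so the sketch 
stays self-contained, and the different hash seed used by the real merge phase is 
omitted):

{code}
import java.util.*;

// Toy sketch of the build/merge flow of an external hash group-by.
// The real Hyracks operator works on frames and spill files; here a spilled
// partition is just a list of keys that did not fit in the in-memory table.
public class ExternalGroupBySketch {

    static Map<String, Long> groupBy(List<String> input, int memoryLimitGroups) {
        Map<String, Long> inMemory = new HashMap<>();
        List<String> spilled = new ArrayList<>();

        // Build phase: aggregate in memory until the table hits its budget,
        // then route further (new) keys to the spilled partition.
        for (String key : input) {
            if (inMemory.containsKey(key) || inMemory.size() < memoryLimitGroups) {
                inMemory.merge(key, 1L, Long::sum);
            } else {
                spilled.add(key);
            }
        }

        // Merge phase: re-aggregate the spilled tuples, recursing until they fit.
        if (!spilled.isEmpty()) {
            Map<String, Long> merged = groupBy(spilled, memoryLimitGroups);
            merged.forEach((k, v) -> inMemory.merge(k, v, Long::sum));
        }
        return inMemory;
    }

    public static void main(String[] args) {
        List<String> data = Arrays.asList("a", "b", "a", "c", "b", "a", "d", "c");
        System.out.println(groupBy(data, 2));
    }
}
{code}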

So, when the given memory is small compared to the data size, the operator first 
builds an intermediate result (the spilled partitions). It then tries to 
aggregate them using a different seed for the hash function. During this merge 
phase, the cardinality of the hash table is set to the number of tuples in each 
spilled partition. This cardinality is applied as-is, without considering that 
the hash table can grow beyond the budget. 

And for the hash table, we only compact the content frames, not the header 
frames. The header part can grow up to a certain size, and that size can be 
bigger than the allocated memory. The structure of the hash table is described 
in the following:

https://docs.google.com/presentation/d/1AExoTqQlx9va-AaiZ6OSPxBuQ3NJqz-cG5NGrjdk5FU/edit

So, a possible solution would be:
During the merge phase, calculate the cardinality of the hash table based on 
the memory budget, compare it to the cardinality that the caller wants to set, 
and pick the smaller of the two as the hash-table cardinality. Then we will be 
safe. Compacting the header part of the hash table would help to some extent, 
but it doesn't solve the issue fundamentally. 
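A minimal sketch of that clamp (hypothetical names; the 8-byte header-slot size 
and the per-group content estimate are assumptions):

{code}
// Sketch of the proposed fix: during the merge phase, never use the requested
// cardinality (tuple count of the spilled partition) directly; clamp it to what
// the memory budget can hold.
public final class MergePhaseCardinality {

    private static final long HEADER_SLOT_BYTES = 8L;

    static long chooseCardinality(long requestedCardinality,
                                  long memoryBudgetBytes,
                                  long estimatedBytesPerGroup) {
        long budgetCardinality =
                memoryBudgetBytes / (HEADER_SLOT_BYTES + estimatedBytesPerGroup);
        // Pick the smaller one so the hash table cannot outgrow the budget.
        return Math.min(requestedCardinality, Math.max(1L, budgetCardinality));
    }

    public static void main(String[] args) {
        // Observed case: 51,395,283 requested entries vs. a 128 MB budget.
        System.out.println(chooseCardinality(51_395_283L, 128L * 1024 * 1024, 64L));
    }
}
{code}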

In the observed case, the cardinality of the hash table during the merge phase 
was 51,395,283. The header part alone would take 392 MB if all tuples were 
unique, while the budget was 128 MB. The number of partitions was 24, a spilled 
partition was 2.8 GB in size, and all of these partitions were spilled to disk. 
At that point the hash table alone took 126 MB, not including the data-table 
part, so no additional memory could be allocated.
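The arithmetic behind those numbers (the 8 bytes per header slot is inferred from 
the reported 392 MB for ~51.4 million entries, not taken from the code):

{code}
// Back-of-the-envelope check of the observed numbers.
public class HeaderSizeCheck {
    public static void main(String[] args) {
        long cardinality = 51_395_283L;
        long headerSlotBytes = 8L;                       // frame index + tuple index
        long headerBytes = cardinality * headerSlotBytes;
        long budgetBytes = 128L * 1024 * 1024;           // compiler.groupmemory

        System.out.printf("header part : %,d bytes (~%.0f MB)%n",
                headerBytes, headerBytes / (1024.0 * 1024.0));
        System.out.printf("budget      : %,d bytes (128 MB)%n", budgetBytes);
        // ~392 MB of header slots alone versus a 128 MB budget: the table cannot
        // fit even before any data-table (content) frames are counted.
    }
}
{code}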



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)
