Max Seiden created SPARK-7629:
---------------------------------

             Summary: Unroll AggregateFunction "update" in the case of a single input expression
                 Key: SPARK-7629
                 URL: https://issues.apache.org/jira/browse/SPARK-7629
             Project: Spark
          Issue Type: Improvement
          Components: SQL
    Affects Versions: 1.2.1
            Reporter: Max Seiden
            Priority: Minor


h3. Summary
This was observed on 1.2.1. Many of the AggregateFunctions take a list of 
Expressions as input, wrap them in an InterpretedProjection, and evaluate that 
projection to get the arguments for the actual aggregate expression. In the 
case where there is only a single input expression, however, this leads to 
gross inefficiency.

Take CountDistinctFunction as an example, and assume it has a single input 
expression of type String. In this case Spark uses an OpenHashSet[Any] to 
collect a distinct set of GenericRow(Array[Any](string)) instances. This is 
hugely inefficient, since every String object must be wrapped first in an 
Array[Any] and then in a GenericRow before being inserted into the 
OpenHashSet, where all hashing and equality comparisons happen on the 
GenericRow.
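For reference, the update path in question has roughly the following shape (a 
paraphrase of the 1.2-era catalyst code, trimmed for illustration; not the 
verbatim source):

{code:scala}
import org.apache.spark.sql.catalyst.expressions.{Expression, InterpretedProjection, Row}
import org.apache.spark.util.collection.OpenHashSet

// Paraphrased shape of CountDistinctFunction's update path in 1.2:
case class CountDistinctFunctionSketch(expr: Seq[Expression]) {
  val seen = new OpenHashSet[Any]()

  @transient val distinctValue = new InterpretedProjection(expr)

  def update(input: Row): Unit = {
    // Even with a single input expression, this allocates a
    // GenericRow(Array[Any](value)) for every input row.
    val evaluatedExpr = distinctValue(input)
    if (!evaluatedExpr.anyNull) {
      seen.add(evaluatedExpr) // hashing and equality go through the Row
    }
  }
}
{code}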

This means that every OpenHashSet entry carries unnecessary overhead from the 
GenericRow and Array[Any], and that all hashCode and equality operations must 
go through the Row. In the case of hashCode, every invocation requires a while 
loop and a pattern match. In the case of equality, Seq[Any].equals is used, 
which requires (for both the candidate and existing objects) a pattern match, 
a call to "canEqual", and a call to "sameElements"; the last of these (in 
IterableLike, as far as I can tell) constructs a Scala Iterator over the 
Array[Any] and does element-by-element comparisons.
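To make the per-probe cost concrete, the hashCode path has roughly this shape 
(a paraphrase keeping only the structure relevant here; GenericRowSketch and 
its members are illustrative stand-ins, not the real class):

{code:scala}
// Paraphrased shape of the Row hashCode path (illustrative, not verbatim):
class GenericRowSketch(val values: Array[Any]) {
  def isNullAt(i: Int): Boolean = values(i) == null

  override def hashCode: Int = {
    var result = 37
    var i = 0
    while (i < values.length) { // length == 1 in the single-expression case
      val update =
        if (isNullAt(i)) 0
        else values(i) match {  // per-element type dispatch on every probe
          case b: Boolean => if (b) 0 else 1
          case other      => other.hashCode()
        }
      result = 37 * result + update
      i += 1
    }
    result
  }
}
{code}

Every insert and lookup against the OpenHashSet pays this loop, plus the 
Iterator-based equality walk described above, for what is logically a single 
value.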

Needless to say, this burns far too many cycles in the case of a single input 
Expression, carries a high and unnecessary memory overhead, and generates a 
ton of garbage. To give a concrete example, I am unable to compute a grand 
distinct over an input of 15M unique IP addresses in a 2GB JVM. After a few 
seconds of running, I start to see substantial GCs, and eventually the JVM 
gets stuck doing full GCs. Additionally, a profile of the executor thread 
shows that 85% of the time is spent rehashing (perhaps explainable at that 
scale), but that 65% of the time is spent in GenericRow.hashCode.

My proposed improvement is to unroll these aggregate functions in the case of 
a single input expression so that the inefficiencies described above can be 
avoided altogether. The only tricky bits are dealing with NULLs (Row handles 
that for us today) and efficiently supporting both the single-input and 
multi-input cases within the same aggregate function implementation.
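A minimal sketch of what the unrolling could look like, again using 
CountDistinctFunction as the example (hypothetical code: the dispatch point, 
the names, and the construction-time single-expression check are assumptions, 
not an actual patch):

{code:scala}
import org.apache.spark.sql.catalyst.expressions.{Expression, InterpretedProjection, Row}
import org.apache.spark.util.collection.OpenHashSet

// Hypothetical unrolled update: raw values in the single-expression case.
case class UnrolledCountDistinctSketch(expr: Seq[Expression]) {
  private val seen = new OpenHashSet[Any]()

  // Decided once at construction, not per row.
  private val singleExpr: Option[Expression] =
    if (expr.size == 1) Some(expr.head) else None

  @transient private lazy val projection = new InterpretedProjection(expr)

  def update(input: Row): Unit = singleExpr match {
    case Some(e) =>
      // Unrolled path: no InterpretedProjection, Array[Any], or GenericRow;
      // hashing and equality operate directly on the raw value.
      val v = e.eval(input)
      if (v != null) seen.add(v) // explicit NULL check replaces Row.anyNull
    case None =>
      // Existing multi-expression path, unchanged.
      val row = projection(input)
      if (!row.anyNull) seen.add(row)
  }

  def eval(): Any = seen.size.toLong
}
{code}

The same shape would apply to the other projection-based AggregateFunctions: 
the single-vs-multi decision is made once at construction time, so the per-row 
cost in the single-expression case is just one expression evaluation, a NULL 
check, and a hash of the raw value.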


