Thanks for the pointer.
I have been reading the code and trying to understand how to create an
efficient aggregate function, but I must be missing something, because it seems
to me that creating any kind of aggregation function which uses non-primitive
types would have a high overhead.
Consider the following simple example: we have a column which contains the
numbers 1-10, and we want to calculate a histogram of these values.
By analogy to the hand-written code in
https://databricks.com/blog/2016/05/23/apache-spark-as-a-compiler-joining-a-billion-rows-per-second-on-a-laptop.html
the trivial solution (a student's solution) would look something like this:
  // values 1..10 map to bins 0..9
  val hist = new Array[Int](10)
  for (v <- col) {
    hist(v - 1) += 1
  }
The problem is that, as far as I understand, Spark wouldn't generate it this way.
Instead I would need to do something like "update hist at position v by +1",
which in practice means the array will be copied at least 3 times:
first it is copied from its unsafe implementation to a Scala sequence
(even worse, since arrays always use offsets, the copying has to be done
element by element instead of a single memcpy); then, since the array is
immutable, a new version of it has to be created (by copying it and changing
just the relevant element); and finally it is copied back to the unsafe version.
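To illustrate the middle step, here is a rough Scala-level sketch of the
copy-on-update cost on an immutable value (just an illustration of the pattern,
not Spark's actual generated code; hist and v are placeholders):

  val hist: Seq[Int] = Seq.fill(10)(0)                    // immutable histogram buffer
  val v = 3                                               // bin to increment
  val updated: Seq[Int] = hist.updated(v, hist(v) + 1)    // allocates a whole new sequence for one +1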
I tried to look for examples in the code which have an intermediate buffer that
is not a simple structure. Basically, I see two such types of examples:
distinct operations (which, if I understand correctly, internally use a
hashmap to hold the distinct values, but I can't find the code which
generates it) and the collect functions (collect_list, collect_set), which do not
appear to do any code generation but define their own buffer as they see fit (the
buffer is NOT of a regular type).
So I was wondering what the right way is to implement logic like the above
efficiently.
I see two options:
1. Using a UDAF – in this case I would define the buffer to have 10 integer
fields and manipulate each one (see the sketch after this list). This solution
suffers from two problems: first, it is slow (especially if there are other
aggregations in the query which use Spark SQL expressions), and second, it is
limited (I can't change the size of the array in the middle of the aggregation.
For example, suppose the above histogram is computed per group in a groupBy and
I know beforehand that in 99% of the cases there are 3 values but in 1% of the
cases there are 100 values. If I could use an array, I would just switch to a
bigger array the first time I see a value from the 100-value range).
2. Implementing it similarly to collect_list and collect_set. According to the
documentation of the Collect class, it uses the slower sort-based aggregation
path because the number of elements cannot be determined in advance, even
though in the basic case above we do know the size (and I am not sure
how its performance would compare to the UDAF option). This appears to be
simpler than a UDAF because I can use the data types I want directly; however, I
can't figure out how the code generation is done, as I do not see the relevant
functions when running debugCodegen on the result.
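For reference, here is a minimal sketch of what I mean in option 1: a UDAF whose
buffer is 10 fixed long fields. The class name, field names and column names are
placeholders I made up, I assume the input values are in 1..10, and I have not
benchmarked this:

  import org.apache.spark.sql.Row
  import org.apache.spark.sql.expressions.{MutableAggregationBuffer, UserDefinedAggregateFunction}
  import org.apache.spark.sql.types._

  class HistogramUDAF extends UserDefinedAggregateFunction {
    private val numBins = 10

    override def inputSchema: StructType = StructType(StructField("value", IntegerType) :: Nil)

    // One field per bin -- the buffer size is fixed when the UDAF is defined.
    override def bufferSchema: StructType =
      StructType((0 until numBins).map(i => StructField(s"bin$i", LongType)))

    override def dataType: DataType = ArrayType(LongType)
    override def deterministic: Boolean = true

    override def initialize(buffer: MutableAggregationBuffer): Unit =
      (0 until numBins).foreach(i => buffer(i) = 0L)

    override def update(buffer: MutableAggregationBuffer, input: Row): Unit = {
      if (!input.isNullAt(0)) {
        val bin = input.getInt(0) - 1              // values 1..10 map to bins 0..9
        buffer(bin) = buffer.getLong(bin) + 1L
      }
    }

    override def merge(buffer1: MutableAggregationBuffer, buffer2: Row): Unit =
      (0 until numBins).foreach(i => buffer1(i) = buffer1.getLong(i) + buffer2.getLong(i))

    override def evaluate(buffer: Row): Any = (0 until numBins).map(i => buffer.getLong(i))
  }

  // Hypothetical usage ("key" and "value" column names are made up):
  //   df.groupBy(col("key")).agg(new HistogramUDAF()(col("value")))

Note that the whole point of my question is that every update here goes through
the generic Row interface, and the number of bins cannot change at runtime.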
I also believe there should be a third option: actually implementing a proper
expression, but I have no idea how to do that.
Can anyone point me in the right direction?
From: rxin [via Apache Spark Developers List]
[mailto:ml-node+s1001551n18985...@n3.nabble.com]
Sent: Monday, September 19, 2016 12:23 AM
To: Mendelson, Assaf
Subject: Re: Memory usage for spark types
Take a look at UnsafeArrayData and UnsafeMapData.
On Sun, Sep 18, 2016 at 9:06 AM, assaf.mendelson <[hidden
email]> wrote:
Hi,
I am trying to understand how Spark types are kept in memory and accessed.
I tried to look at the code, at the definitions of MapType and ArrayType for
example, but I can't seem to find the relevant code for their actual
implementation.
I am trying to figure out how these two types are implemented to understand how
they match my needs.
In general, it appears the size of a map is the same as two arrays, which is
about double the naïve array implementation: if I have 1000 rows, each with a
map of 10K integer keys to 10K integer values, I find through caching the
dataframe that the total is ~150MB (the naïve implementation of two arrays
would cost 1000 * 10,000 * (4 + 4) bytes, or a total of ~80MB). I see the same
size if I use two arrays.
Second, what would be the performance of updating the map/arrays, given that
they are immutable (i.e. some copying is required)?
The reason I am asking is that I want to write an aggregate function
which calculates a variation of a histogram.
The most naïve solution for this would be to have a map from the bin to the
count (see the sketch below). But since we are talking about an immutable map,
wouldn't that cost a lot more?
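For example, a rough sketch of that naive map-based update (just an
illustration; "values" stands in for the input integers):

  var hist = Map.empty[Int, Long]                          // immutable Map
  for (v <- values) {
    hist = hist.updated(v, hist.getOrElse(v, 0L) + 1L)     // a new map is allocated per update
  }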
An even further optimization would be to use a mutable array where we combine
the key and value into a single value (key and value are both int in my case).
Assuming the maximum