Hi All,

I have a UDAF that seems to perform poorly when its input is skewed. I have been debugging the UDAF implementation, but I don't see any code that would cause the performance to degrade. More details on the data and the experiments I have run are below.
Dataset: assume 3 columns, with Column1 as the key. A sample of the data:

    Column1  Column2  Column3
    a        1        x
    a        2        x
    a        3        x
    a        4        x
    a        5        x
    a        6        z
    ...               (5 million rows for a)
    a        1000000  y
    b        9        y
    b        9        y
    b        10       y
    ...               (3 million rows for b)
    ...               (more rows)

In total there are 100 million rows in the input to the UDAF aggregation. Key a has 5 million rows, with 1 million unique values in Column2. Key b has 3 million rows, with 800,000 unique values in Column2. Column3 has only hundreds of unique values (not millions) for both a and b. The skew is in keys a and b; all other rows can be ignored, as they do not cause any performance issues or hot partitions.

The code does dataSet.groupBy("Column1").agg(udaf("Column2", "Column3")).

I commented out the bodies of the UDAF's update and merge methods, so the UDAF effectively does nothing. Even with these empty update and merge methods, processing takes 16 minutes per micro-batch, where each micro-batch contains 100 million rows, including 5 million rows for a with 1 million unique Column2 values.

But when I pass empty values for Column2 and change nothing else, which reduces the 1 million unique Column2 values to a single unique (empty) value, the batch processing time drops to 4 minutes.

So I am trying to understand why there is such a big performance difference. What in the UDAF causes the processing time to increase roughly fourfold when the data is skewed as described above? Any insight from Spark developers, contributors, or anyone else with a deeper understanding of UDAFs would be helpful.

Thanks,
Bharath
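For anyone trying to reproduce the input shape, here is a minimal, scaled-down sketch of the two input variants described above, in plain Python (no Spark). It only models the data for keys a and b, not Spark's aggregation. Assumptions, none of which come from the original description: every count is divided by a hypothetical `scale` factor, Column3 is given exactly 300 distinct values (the text only says "hundreds"), and the helper names `make_rows` and `profile` are illustrative.

```python
from collections import defaultdict

# Counts from the description: key -> (rows, distinct Column2 values).
SPEC = {"a": (5_000_000, 1_000_000), "b": (3_000_000, 800_000)}
N_COLUMN3 = 300  # assumption: the text only says "hundreds" of Column3 values


def make_rows(scale=1000, blank_column2=False):
    """Build a scaled-down copy of the skewed rows for keys a and b.

    scale (hypothetical) divides every count; blank_column2=True mimics the
    second experiment, where Column2 is always the empty string.
    """
    rows = []
    for key, (n_rows, n_unique) in SPEC.items():
        n_rows, n_unique = n_rows // scale, n_unique // scale
        for i in range(n_rows):
            # Cycling through n_unique values makes every value appear.
            column2 = "" if blank_column2 else str(i % n_unique)
            column3 = f"v{i % N_COLUMN3}"
            rows.append((key, column2, column3))
    return rows


def profile(rows):
    """Return key -> (row count, distinct Column2, distinct Column3)."""
    count = defaultdict(int)
    col2 = defaultdict(set)
    col3 = defaultdict(set)
    for key, c2, c3 in rows:
        count[key] += 1
        col2[key].add(c2)
        col3[key].add(c3)
    return {k: (count[k], len(col2[k]), len(col3[k])) for k in count}


print(profile(make_rows()))
# {'a': (5000, 1000, 300), 'b': (3000, 800, 300)}
print(profile(make_rows(blank_column2=True)))
# {'a': (5000, 1, 300), 'b': (3000, 1, 300)}
```

Row counts per key stay the same between the two variants; only the Column2 cardinality changes, which is the single variable the 16-minute vs 4-minute comparison toggles.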