I'm still unclear on whether orderBy/groupBy + aggregates is a viable approach, or when one can safely rely on the last or first aggregate functions, but a working alternative is to use window functions with row_number and a filter, something like this:
import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions._
import spark.implicits._

// Reversed sort order, so row_number() == 1 marks the last row per group
// when sorted ascending by (a, b).
val reverseOrdering = Seq("a", "b").map(c => df(c).desc)
val orderedWindow = Window.partitionBy("group_id").orderBy(reverseOrdering: _*)
// Separate unordered window for the sum; with an orderBy, sum() becomes a
// running total over the default frame instead of the group total.
val groupWindow = Window.partitionBy("group_id")

df.select($"group_id", $"row_id",
    sum("col_to_sum").over(groupWindow).as("total"),
    row_number().over(orderedWindow).as("row_number"))
  .filter($"row_number" === 1)
  .select($"group_id", $"row_id".as("last_row_id"), $"total")

Would love to know if there's a better way!

On Mon, Aug 28, 2017 at 9:19 AM, Everett Anderson <ever...@nuna.com> wrote:

> Hi,
>
> I'm struggling a little with some unintuitive behavior in the Scala API
> (Spark 2.0.2).
>
> I wrote something like
>
> df.orderBy("a", "b")
>   .groupBy("group_id")
>   .agg(sum("col_to_sum").as("total"),
>        last("row_id").as("last_row_id"))
>
> and expected a result with a unique group_id column, a column called
> "total" that's the sum of all col_to_sum in each group, and a column
> called "last_row_id" that's the last row_id seen in each group when the
> rows of each group are sorted by columns a and b.
>
> However, the result is actually non-deterministic and changes based on
> the initial sorting and partitioning of df.
>
> I also tried
>
> df.orderBy("group_id", "a", "b")
>   .groupBy("group_id")
>   .agg(sum("col_to_sum").as("total"),
>        last("row_id").as("last_row_id"))
>
> thinking the problem might be that the groupBy does another shuffle that
> loses the ordering, but that also doesn't seem to work.
>
> Looking through the code
> <https://github.com/apache/spark/blob/branch-2.0/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/aggregate/Last.scala>,
> both the Last and First aggregate functions have this comment:
>
> Even if [[Last]] is used on an already sorted column, if we do partial
> aggregation and final aggregation (when mergeExpression is used) its
> result will not be deterministic (unless the input table is sorted and
> has a single partition, and we use a single reducer to do the
> aggregation).
>
> Some questions:
>
> 1. What's the best way to take some values from the last row in an
> ordered group while performing some other aggregates over the entire
> group?
>
> 2. Given these comments on last and first, when would these functions
> be useful? It would be rare to bring an entire Spark table to a single
> partition.
>
> Thanks!
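
One more idea: you can often skip the window entirely by packing the sort
keys and the value you want into a struct and taking max(), since structs
compare field by field. An untested sketch against the same columns
(assumes struct ordering behaves for these column types):

import org.apache.spark.sql.functions._

// max() on a struct compares fields left to right, so this picks row_id
// from the row that sorts last by (a, b) within each group; ties on
// (a, b) fall through to row_id as a tiebreaker.
df.groupBy("group_id")
  .agg(sum("col_to_sum").as("total"),
       max(struct("a", "b", "row_id")).getField("row_id").as("last_row_id"))

This keeps everything in a single ordinary aggregation, and the result is
deterministic because the full struct comparison breaks any ties.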