I'm still unclear on whether orderBy/groupBy + aggregates is a viable approach,
or when one could rely on the last or first aggregate functions, but a working
alternative is to use window functions with row_number and a filter, something
like this:

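import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions.row_number
import spark.implicits._

// Descending order, so the last row by (a, b) in each group gets row_number 1
val reverseOrdering = Seq("a", "b").map(col => df(col).desc)

val windowSpec = Window.partitionBy("group_id").orderBy(reverseOrdering: _*)

df.withColumn("row_number", row_number().over(windowSpec))
  .filter("row_number == 1")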
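The window approach can also carry the group-wide total, which is what the original question asks for. Below is a minimal sketch, assuming df has the columns group_id, a, b, row_id and col_to_sum from the question; the helper name lastRowWithTotal is hypothetical. The sum uses a second window with no ordering, whose frame is the whole partition, so it covers every row in the group before the filter keeps only the top-ranked row.

```scala
import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions.{col, row_number, sum}

// Hypothetical helper: for each group_id, return the sum of col_to_sum over
// the whole group plus the row_id of the row that sorts last by (a, b).
def lastRowWithTotal(df: DataFrame): DataFrame = {
  // Descending order so the "last" row by (a, b) gets row_number 1
  val ordered = Window.partitionBy("group_id").orderBy(col("a").desc, col("b").desc)
  // No orderBy: the frame is the entire partition, so the sum sees every row
  val wholeGroup = Window.partitionBy("group_id")

  df.withColumn("total", sum("col_to_sum").over(wholeGroup))
    .withColumn("rn", row_number().over(ordered))
    .filter(col("rn") === 1)
    .select(col("group_id"), col("total"), col("row_id").as("last_row_id"))
}
```

If two rows tie on (a, b), row_number picks one of them arbitrarily, so adding a tiebreaker column to the ordering may be worth it.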

Would love to know if there's a better way!
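For the first question in the quoted message, one alternative not mentioned in this thread is to aggregate a struct whose leading fields are the sort keys. Spark orders structs field by field, so max(struct(a, b, row_id)) selects the row with the greatest (a, b). Because max gives the same answer no matter how partial results are merged, it avoids the problem described in the Last comment, and it can sit in the same agg call as the sum. A minimal sketch, assuming the column names from the question; the helper name lastRowViaStruct is hypothetical, and how nulls in a or b should be treated is worth checking against real data before relying on it.

```scala
import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.functions.{col, max, struct, sum}

// Hypothetical helper: a single groupBy/agg, with no window and no prior orderBy.
def lastRowViaStruct(df: DataFrame): DataFrame =
  df.groupBy("group_id")
    .agg(
      sum("col_to_sum").as("total"),
      // Structs compare field by field, so the max is the row with the
      // greatest (a, b); row_id rides along and breaks exact ties.
      max(struct(col("a"), col("b"), col("row_id"))).as("last_row")
    )
    .select(col("group_id"), col("total"), col("last_row.row_id").as("last_row_id"))
```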

On Mon, Aug 28, 2017 at 9:19 AM, Everett Anderson <ever...@nuna.com> wrote:

> Hi,
> I'm struggling a little with some unintuitive behavior with the Scala API.
> (Spark 2.0.2)
> I wrote something like
> df.orderBy("a", "b")
>   .groupBy("group_id")
>   .agg(sum("col_to_sum").as("total"),
>        last("row_id").as("last_row_id"))
> and expected a result with a unique group_id column, a column called
> "total" that's the sum of all col_to_sum in each group, and a column called
> "last_row_id" that's the last row_id seen in each group when the groups are
> sorted by columns a and b.
> However, the result is actually non-deterministic and changes based on the
> initial sorting and partitioning of df.
> I also tried
> df.orderBy("group_id", "a", "b")
>   .groupBy("group_id")
>   .agg(sum("col_to_sum").as("total"),
>        last("row_id").as("last_row_id"))
> thinking the problem might be that the groupBy does another shuffle that
> loses the ordering, but that also doesn't seem to work.
> Looking through the code
> <https://github.com/apache/spark/blob/branch-2.0/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/aggregate/Last.scala>,
> both the Last and First aggregate functions have this comment:
> Even if [[Last]] is used on an already sorted column, if we do partial
> aggregation and final aggregation (when mergeExpression is used) its result
> will not be deterministic (unless the input table is sorted and has a single
> partition, and we use a single reducer to do the aggregation.).
> Some questions:
>    1. What's the best way to take some values from the last row in an
>    ordered group while performing some other aggregates over the entire group?
>    2. Given these comments on last and first, when would these functions
>    be useful? It would be rare to bring an entire Spark table to a single
>    partition.
> Thanks!
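The comment on Last and First quoted above can be illustrated without Spark. The following is a simplified model, not Spark's actual implementation: each input partition computes a partial `last` over its own rows, and the final step keeps whichever partial result arrives last. Since that arrival order depends on the shuffle, two runs over identically sorted data can disagree. The names partialLast and finalLast are hypothetical.

```scala
// Simplified model of two-phase aggregation of `last` (not Spark's real code).

// Phase 1: each partition reports the last value it saw, in its own row order.
def partialLast[A](partition: Seq[A]): Option[A] = partition.lastOption

// Phase 2: merge partial results; the winner is whichever partial arrives last.
def finalLast[A](partials: Seq[Option[A]]): Option[A] =
  partials.flatten.lastOption

// Globally sorted rows split across two partitions
val partitions = Seq(Seq("r1", "r2"), Seq("r3", "r4"))
val partials = partitions.map(p => partialLast(p))

// The final reducer may receive the partials in either order
val inOrder = finalLast(partials)           // Some("r4")
val reordered = finalLast(partials.reverse) // Some("r2")
```

This is also why the comment's escape hatch is a single sorted partition with a single reducer: only then is there exactly one partial result and no merge order to vary.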

