I'm still unclear on whether orderBy/groupBy + aggregates is a viable
approach, or when one can safely rely on the last or first aggregate
functions, but a working alternative is to use window functions with
row_number and a filter, something like this:

import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions.{row_number, sum}
import spark.implicits._

// Sort descending so that row_number 1 marks the last row by (a, b).
val reverseOrdering = Seq("a", "b").map(col => df(col).desc)

val orderedWindow = Window.partitionBy("group_id").orderBy(reverseOrdering: _*)

// The sum gets its own unordered window; with an orderBy, the default
// frame would make it a running sum instead of the group total.
val groupWindow = Window.partitionBy("group_id")

df.select($"group_id",
          $"row_id",
          sum("col_to_sum").over(groupWindow).as("total"),
          row_number().over(orderedWindow).as("row_number"))
  .filter($"row_number" === 1)
  .select($"group_id",
          $"row_id".as("last_row_id"),
          $"total")

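Another option that seems to avoid the window entirely is to aggregate a
struct keyed by the ordering columns and take its max; structs compare
field by field, so the max carries along the row_id from the greatest
(a, b) in each group. Untested sketch, reusing the column names above:

import org.apache.spark.sql.functions.{max, struct, sum}

// Struct comparison is lexicographic over its fields, so max picks
// the row_id belonging to the largest (a, b) within each group.
df.groupBy("group_id")
  .agg(sum("col_to_sum").as("total"),
       max(struct($"a", $"b", $"row_id")).as("last"))
  .select($"group_id",
          $"last.row_id".as("last_row_id"),
          $"total")
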
Would love to know if there's a better way!

On Mon, Aug 28, 2017 at 9:19 AM, Everett Anderson <ever...@nuna.com> wrote:

> Hi,
>
> I'm struggling a little with some unintuitive behavior with the Scala API.
> (Spark 2.0.2)
>
> I wrote something like
>
> df.orderBy("a", "b")
>   .groupBy("group_id")
>   .agg(sum("col_to_sum").as("total"),
>        last("row_id").as("last_row_id"))
>
> and expected a result with one row per group_id, a column called
> "total" that's the sum of all col_to_sum values in each group, and a
> column called "last_row_id" that's the last row_id seen in each group
> when its rows are sorted by columns a and b.
>
> However, the result is actually non-deterministic and changes based on the
> initial sorting and partitioning of df.
>
> I also tried
>
> df.orderBy("group_id", "a", "b")
>   .groupBy("group_id")
>   .agg(sum("col_to_sum").as("total"),
>        last("row_id").as("last_row_id"))
>
> thinking the problem might be that the groupBy does another shuffle that
> loses the ordering, but that also doesn't seem to work.
>
> Looking through the code
> <https://github.com/apache/spark/blob/branch-2.0/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/aggregate/Last.scala>,
> both the Last and First aggregate functions have this comment:
>
> Even if [[Last]] is used on an already sorted column, if we do partial
> aggregation and final aggregation (when mergeExpression is used) its
> result will not be deterministic (unless the input table is sorted and
> has a single partition, and we use a single reducer to do the
> aggregation).
>
>
> Some questions:
>
>    1. What's the best way to take some values from the last row in an
>    ordered group while performing some other aggregates over the entire group?
>
>    2. Given these comments on last and first, when would these functions
>    be useful? It would be rare to bring an entire Spark table to a single
>    partition.
>
> Thanks!
>
>
