I'm not sure about this, but I suspect the answer is: Spark doesn't
guarantee a stable sort, nor does it plan to, so the implementation has
more flexibility.

But you might be interested in the work being done on secondary sort, which
could give you the guarantees you want:
https://issues.apache.org/jira/browse/SPARK-3655
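
In the meantime, the usual workaround is to do the secondary sort
yourself: move the timestamp into the key, repartition and sort with
repartitionAndSortWithinPartitions (which I believe is already in 1.2)
using a partitioner that only looks at the real key, and then walk each
partition with mapPartitions. A rough, untested sketch -- the class and
function names here are just placeholders:

    import org.apache.spark.Partitioner
    import org.apache.spark.SparkContext._
    import org.apache.spark.rdd.RDD

    // Partition on the "real" key only, so every date for a key lands in
    // the same partition even though the RDD key is (key, date).
    class KeyOnlyPartitioner(override val numPartitions: Int) extends Partitioner {
      override def getPartition(compositeKey: Any): Int = {
        val key = compositeKey.asInstanceOf[(String, Int)]._1
        val mod = key.hashCode % numPartitions
        if (mod < 0) mod + numPartitions else mod
      }
    }

    // records: ("key", date, payload) triples
    def aggregateInDateOrder(records: RDD[(String, Int, String)]): RDD[(String, List[String])] =
      records
        .map { case (key, date, payload) => ((key, date), payload) }
        // sorts each partition by (key, date) via the implicit tuple ordering
        .repartitionAndSortWithinPartitions(new KeyOnlyPartitioner(records.partitions.length))
        .mapPartitions { iter =>
          // all records for a key are now consecutive and in date order,
          // so one pass builds each per-key aggregate
          val buffered = iter.buffered
          new Iterator[(String, List[String])] {
            def hasNext: Boolean = buffered.hasNext
            def next(): (String, List[String]) = {
              val key = buffered.head._1._1
              val values = scala.collection.mutable.ListBuffer.empty[String]
              while (buffered.hasNext && buffered.head._1._1 == key) {
                values += buffered.next()._2
              }
              (key, values.toList)
            }
          }
        }

Because the records for each key arrive consecutively and already sorted
by date, the per-key aggregation no longer depends on the order in which
combineByKey merges spilled combiners.
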
On Jan 19, 2015 4:52 PM, "justin.uang" <justin.u...@gmail.com> wrote:

> Hi,
>
> I am trying to aggregate the values for each key in timestamp order, and I
> believe that spilling to disk is changing the order of the data fed into
> the combiner.
>
> I have some timeseries data that is of the form: ("key", "date", "other
> data")
>
>     Partition 1
>     ("A", 2, ...)
>     ("B", 4, ...)
>     ("A", 1, ...)
>     ("A", 3, ...)
>     ("B", 6, ...)
>
> which I then partition by key and sort by date within each partition (a
> simplified sketch of this follows the example):
>
>     Partition 1
>     ("A", 1, ...)
>     ("A", 2, ...)
>     ("A", 3, ...)
>     ("A", 4, ...)
>
>     Partition 2
>     ("B", 4, ...)
>     ("B", 6, ...)
>
> If I run a combineByKey with the same partitioner, then the items for each
> key will be fed into the ExternalAppendOnlyMap in the correct order.
> However, if the map spills, the time slices are written to disk as multiple
> partial combiners. When it's time to merge the spilled combiners for each
> key, they can be combined in the wrong order.
>
> For example, if during a groupByKey, [("A", 1, ...), ("A", 2, ...)] and
> [("A", 3, ...), ("A", 4, ...)] are spilled separately, the combiners may be
> merged in the wrong order, e.g. as [("A", 3, ...), ("A", 4, ...),
> ("A", 1, ...), ("A", 2, ...)], which breaks the invariant that all the
> values for A are passed to the combiner in order.
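>
> The combine step itself looks roughly like this (again simplified and
> continuing the sketch above; the combiner just collects values, but it
> assumes they arrive in date order):
>
>     def createCombiner(v: (Int, String)) = List(v)
>     def mergeValue(acc: List[(Int, String)], v: (Int, String)) = acc :+ v
>     // this merge is where the order of the spilled partial combiners matters
>     def mergeCombiners(a: List[(Int, String)], b: List[(Int, String)]) = a ++ b
>
>     def combineInOrder(sorted: RDD[(String, (Int, String))]) =
>       sorted.combineByKey(createCombiner, mergeValue, mergeCombiners,
>                           new HashPartitioner(numPartitions))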
>
> I'm not an expert, but I suspect this is because we use a heap ordered by
> key when iterating, which doesn't retain the order of the spilled
> combiners. Perhaps we could order the mergeHeap by (hash_key, spill_index),
> where spill_index is incremented each time we spill? Then we would pop and
> merge the combiners for each key in order, resulting in [("A", 1, ...),
> ("A", 2, ...), ("A", 3, ...), ("A", 4, ...)].
>
> Thanks in advance for the help! If there is a way to do this already in
> Spark 1.2, can someone point it out to me?
>
> Best,
>
> Justin
>
>
>