Ah, so I misunderstood you too :).

My reading of org/apache/spark/Aggregator.scala is that your function will
always see the items in the order that they are in the input RDD. An RDD
partition is always accessed as an iterator, so it will not be read out of
order.
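
To illustrate what I mean, here is a minimal sketch in plain Scala (no Spark).
It is not Spark's Aggregator code: foldByKeyInOrder and PartitionOrderSketch
are hypothetical names I made up, and the sketch only models a single
partition being read through an iterator. It says nothing about how results
from several partitions are merged after a shuffle.

```scala
import scala.collection.mutable

object PartitionOrderSketch {
  // Hypothetical helper, not part of Spark: folds the values of each key
  // in exactly the order in which the partition iterator yields them.
  def foldByKeyInOrder[K, V](partition: Iterator[(K, V)], zero: V)(f: (V, V) => V): Map[K, V] = {
    // The map may store keys in any order, but each key's values are
    // still combined in iteration order.
    val combiners = mutable.HashMap.empty[K, V]
    for ((k, v) <- partition) {
      combiners(k) = f(combiners.getOrElse(k, zero), v)
    }
    combiners.toMap
  }

  def main(args: Array[String]): Unit = {
    // Stand-in for one sorted partition: values 1 to 8, all under key 1.
    val partition = (1 to 8).iterator.map(i => (1, i))
    // Keeping the right-hand argument returns the last value seen per key.
    val result = foldByKeyInOrder(partition, 0)((a, b) => b)
    println(result) // prints Map(1 -> 8)
  }
}
```

Because the iterator is consumed front to back, the last value the function
sees for key 1 is 8, which matches what you observed with foldByKey.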

On Wed, Nov 19, 2014 at 2:28 PM, Aniket Bhatnagar <
aniket.bhatna...@gmail.com> wrote:

> Thanks Daniel. I can understand that the keys will not be in sorted order
> but what I am trying to understand is whether the functions are passed
> values in sorted order in a given partition.
>
> For example:
>
> sc.parallelize(1 to 8).map(i => (1, i)).sortBy(t => t._2).foldByKey(0)((a, b) => b).collect
> res0: Array[(Int, Int)] = Array((1,8))
>
> The fold always gives me 8 as the last value, which suggests that values keep
> the sort order defined in an earlier stage of the DAG?
>
> On Wed Nov 19 2014 at 18:10:11 Daniel Darabos <
> daniel.dara...@lynxanalytics.com> wrote:
>
>> Akhil, I think Aniket uses the word "persisted" in a different way than
>> what you mean. I.e. not in the RDD.persist() way. Aniket asks if running
>> combineByKey on a sorted RDD will result in a sorted RDD. (I.e. the sorting
>> is preserved.)
>>
>> I think the answer is no. combineByKey uses AppendOnlyMap, which is a
>> hashmap. That will shuffle your keys. You can quickly verify it in
>> spark-shell:
>>
>> scala> sc.parallelize(7 to 8).map(_ -> 1).reduceByKey(_ + _).collect
>> res0: Array[(Int, Int)] = Array((8,1), (7,1))
>>
>> (The initial size of the AppendOnlyMap seems to be 8, so 8 is the first
>> number that demonstrates this.)
>>
>> On Wed, Nov 19, 2014 at 9:05 AM, Akhil Das <ak...@sigmoidanalytics.com>
>> wrote:
>>
>>> If something is persisted, you can easily see it under the Storage tab
>>> in the web UI.
>>>
>>> Thanks
>>> Best Regards
>>>
>>> On Tue, Nov 18, 2014 at 7:26 PM, Aniket Bhatnagar <
>>> aniket.bhatna...@gmail.com> wrote:
>>>
>>>> I am trying to figure out if sorting is persisted after applying Pair
>>>> RDD transformations and I am not able to decisively tell after reading the
>>>> documentation.
>>>>
>>>> For example:
>>>> val numbers = .. // RDD of numbers
>>>> val pairedNumbers = numbers.map(number => (number % 100, number))
>>>> val sortedPairedNumbers = pairedNumbers.sortBy(pairedNumber =>
>>>> pairedNumber._2) // Sort by values in the pair
>>>> val aggregates = sortedPairedNumbers.combineByKey(..)
>>>>
>>>> In this example, will the combine functions see values in sorted order?
>>>> What if I had done groupByKey and then combineByKey? What transformations
>>>> can unsort already sorted data?
>>>>
>>>
>>>
>>
