Yes, that’s what I meant. Sure, the numbers might no longer be sorted,
but the order of rows is semantically preserved throughout non-shuffling
transforms. I’m on board with you on union as well.
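
To make that concrete, here is a minimal sketch in plain Scala (no Spark
involved). It models an RDD as an ordered list of partitions, which is my
assumption about what determines row order. OrderSketch, Partitions, and
mapValuesLike are hypothetical names made up for this illustration, not
Spark APIs. It shows a per-partition transform leaving every row in its
original position even though the values are no longer sorted:

```scala
// Plain-Scala model, not Spark: an "RDD" is an ordered list of partitions.
object OrderSketch {
  type Partitions[A] = Vector[Vector[A]]

  // Hypothetical stand-in for a non-shuffling transform such as mapValues:
  // it rewrites each value in place, partition by partition.
  def mapValuesLike[K, V, W](rdd: Partitions[(K, V)])(f: V => W): Partitions[(K, W)] =
    rdd.map(partition => partition.map { case (k, v) => (k, f(v)) })

  // A "sorted" RDD spread over two partitions.
  val sorted: Partitions[(Int, Int)] =
    Vector(Vector((1, 1), (2, 2)), Vector((3, 3)))

  // Same function as in the quoted example: x => 3 - x.
  val mapped: Partitions[(Int, Int)] = mapValuesLike(sorted)(v => 3 - v)

  // Reading the partitions back in order gives the row order.
  val keysInOrder: Vector[Int] = mapped.flatten.map(_._1)
  val valuesInOrder: Vector[Int] = mapped.flatten.map(_._2)
}
```

Read back in partition order, the keys are still 1, 2, 3 while the values
are 2, 1, 0: the positions are unchanged, but the result is not sorted by
value.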

Back to the original question, then: why is it important to coalesce to a
single partition? When you union two RDDs, for example rdd1 = [“a, b,
c”] and rdd2 = [“1, 2, 3”, “4, 5, 6”], then
rdd1.union(rdd2).saveAsTextFile(…) should result in a file with three
lines, “a, b, c”, “1, 2, 3”, and “4, 5, 6”, because the partitions from the
two RDDs are concatenated.
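
Here is the model I have in mind, as a minimal plain-Scala sketch (no Spark
involved). The assumptions are mine: union is modelled as appending one
list of partitions to the other, rdd2 is assumed to be a single partition
holding two rows, and UnionSketch, Partitions, and unionLike are
hypothetical names, not Spark APIs. Since saveAsTextFile writes one part
file per partition, the sketch also pairs each partition with a
part-NNNNN name:

```scala
// Plain-Scala model, not Spark: an "RDD" is an ordered list of partitions.
object UnionSketch {
  type Partitions[A] = Vector[Vector[A]]

  // Assumption: union appends the right-hand partitions after the left-hand ones.
  def unionLike[A](left: Partitions[A], right: Partitions[A]): Partitions[A] =
    left ++ right

  val rdd1: Partitions[String] = Vector(Vector("a, b, c"))
  // Assumption: both rows of rdd2 live in one partition.
  val rdd2: Partitions[String] = Vector(Vector("1, 2, 3", "4, 5, 6"))

  val unioned: Partitions[String] = unionLike(rdd1, rdd2)

  // One output part per partition, numbered in partition order.
  val partFiles: Vector[(String, Vector[String])] =
    unioned.zipWithIndex.map { case (rows, i) => (f"part-$i%05d", rows) }

  // Lines as they would be read back, part by part.
  val linesInOrder: Vector[String] = partFiles.flatMap(_._2)
}
```

Under these assumptions the three lines come back in the expected order,
spread over two parts rather than one.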

Mingyu




On 4/29/14, 10:55 PM, "Patrick Wendell" <pwend...@gmail.com> wrote:

>If you call map() on an RDD it will retain the ordering it had before,
>but that is not necessarily a correct sort order for the new RDD.
>
>var rdd = sc.parallelize([2, 1, 3]);
>var sorted = rdd.map(x => (x, x)).sort(); // should be [1, 2, 3]
>var mapped = sorted.mapValues(x => 3 - x); // should be [2, 1, 0]
>
>Note that mapped is no longer sorted.
>
>When you union two RDDs together, it will effectively concatenate the
>two orderings, which is also not a valid sorted order on the new RDD:
>
>rdd1 = [1,2,3]
>rdd2 = [1,4,5]
>
>rdd1.union(rdd2) = [1,2,3,1,4,5]
>
>On Tue, Apr 29, 2014 at 10:44 PM, Mingyu Kim <m...@palantir.com> wrote:
>> Thanks for the quick response!
>>
>> To better understand it, the reason a sorted RDD has a well-defined
>> ordering is that sortedRDD.getPartitions() returns the partitions in
>> the right order and each partition internally is properly sorted. So, if
>> you have
>>
>> var rdd = sc.parallelize([2, 1, 3]);
>> var sorted = rdd.map(x => (x, x)).sort(); // should be [1, 2, 3]
>> var mapped = sorted.mapValues(x => x + 1); // should be [2, 3, 4]
>>
>> Since mapValues changes neither the order of partitions nor the
>> order of rows within the partitions, I think “mapped” should have the
>> exact same order as “sorted”. Sure, if a transform involves shuffling,
>> the order will change. Am I mistaken? Is there an extra detail in sortedRDD
>> that guarantees a well-defined ordering?
>>
>> If it’s true that the order of partitions returned by
>> RDD.getPartitions() and the row orders within the partitions determine
>> the row order, I’m not sure why union doesn’t respect the order, because
>> the union operation simply concatenates the two lists of partitions from
>> the two RDDs.
>>
>> Mingyu
>>
>>
>>
>>
>> On 4/29/14, 10:25 PM, "Patrick Wendell" <pwend...@gmail.com> wrote:
>>
>>>You are right, once you sort() the RDD, then yes it has a well defined
>>>ordering.
>>>
>>>But that ordering is lost as soon as you transform the RDD, including
>>>if you union it with another RDD.
>>>
>>>On Tue, Apr 29, 2014 at 10:22 PM, Mingyu Kim <m...@palantir.com> wrote:
>>>> Hi Patrick,
>>>>
>>>> I’m a little confused about your comment that RDDs are not ordered. As
>>>> far as I know, RDDs keep a list of partitions that is ordered, and this
>>>> is why I can call RDD.take() and get the same first k rows every time I
>>>> call it, and why RDD.take() returns the same entries as
>>>> RDD.map(…).take(), because map preserves the partition order. RDD order
>>>> is also what allows me to get the top k out of an RDD by doing
>>>> RDD.sort().take().
>>>>
>>>> Am I misunderstanding it? Or is it just when an RDD is written to disk
>>>> that the order is not well preserved? Thanks in advance!
>>>>
>>>> Mingyu
>>>>
>>>>
>>>>
>>>>
>>>> On 1/22/14, 4:46 PM, "Patrick Wendell" <pwend...@gmail.com> wrote:
>>>>
>>>>>Ah somehow after all this time I've never seen that!
>>>>>
>>>>>On Wed, Jan 22, 2014 at 4:45 PM, Aureliano Buendia
>>>>><buendia...@gmail.com>
>>>>>wrote:
>>>>>>
>>>>>>
>>>>>>
>>>>>> On Thu, Jan 23, 2014 at 12:37 AM, Patrick Wendell
>>>>>><pwend...@gmail.com>
>>>>>> wrote:
>>>>>>>
>>>>>>> What is the ++ operator here? Is this something you defined?
>>>>>>
>>>>>>
>>>>>> No, it's an alias for union defined in RDD.scala:
>>>>>>
>>>>>> def ++(other: RDD[T]): RDD[T] = this.union(other)
>>>>>>
>>>>>>>
>>>>>>>
>>>>>>> Another issue is that RDDs are not ordered, so when you union two
>>>>>>> together, the result doesn't have a well-defined ordering.
>>>>>>>
>>>>>>> If you do want to do this, you could coalesce into one partition,
>>>>>>> then call mapPartitions and return an iterator that first adds your
>>>>>>> header and then the rest of the file, then call saveAsTextFile. Keep
>>>>>>> in mind this will only work if you coalesce into a single partition.
>>>>>>
>>>>>>
>>>>>> Thanks! I'll give this a try.
>>>>>>
>>>>>>>
>>>>>>>
>>>>>>> myRdd.coalesce(1)
>>>>>>>   .map(_.mkString(","))
>>>>>>>   .mapPartitions(it => (Seq("col1,col2,col3") ++ it).iterator)
>>>>>>>   .saveAsTextFile("out.csv")
>>>>>>>
>>>>>>> - Patrick
>>>>>>>
>>>>>>> On Wed, Jan 22, 2014 at 11:12 AM, Aureliano Buendia
>>>>>>> <buendia...@gmail.com> wrote:
>>>>>>> > Hi,
>>>>>>> >
>>>>>>> > I'm trying to find a way to create a csv header when using
>>>>>>> > saveAsTextFile,
>>>>>>> > and I came up with this:
>>>>>>> >
>>>>>>> > (sc.makeRDD(Array("col1,col2,col3"), 1) ++
>>>>>>> > myRdd.coalesce(1).map(_.mkString(",")))
>>>>>>> >       .saveAsTextFile("out.csv")
>>>>>>> >
>>>>>>> > But it only saves the header part. Why is it that the union method
>>>>>>> > does not return both RDDs?
>>>>>>
>>>>>>
