On Wed, Jan 1, 2014 at 9:35 AM, K. Shankari <[email protected]> wrote:
> Why not use a zipped RDD?
>
> http://spark.incubator.apache.org/docs/latest/api/core/index.html#org.apache.spark.rdd.ZippedRDD
>

I do not know why no one else suggested this. Of course it has 3 extra
loops (one for counting the rdd, one for generating the range, one for
zipping). Apart from this performance problem, are there any other caveats?

>
> I have used something like this in the past.
>
> val index = sc.parallelize(Range.Long(0, rdd.count, 1),
>   rdd.partitions.size)
> val rddWithIndex = rdd.zip(index)
>
> If that doesn't work, then you could try zipPartitions as well, since it
> has slightly more relaxed constraints.
>
> Thanks,
> Shankari
>
>
> On Tue, Dec 31, 2013 at 11:39 AM, Christopher Nguyen <[email protected]> wrote:
>
>> It's a reasonable ask (row indices) in some interactive use cases we've
>> come across. We're working on providing support for this at a higher level
>> of abstraction.
>>
>> Sent while mobile. Pls excuse typos etc.
>> On Dec 31, 2013 11:34 AM, "Aureliano Buendia" <[email protected]> wrote:
>>
>>> On Mon, Dec 30, 2013 at 8:31 PM, Andrew Ash <[email protected]> wrote:
>>>
>>>> Hi Aureliano,
>>>>
>>>> It's very easy to get lines into (start byte number, line) using
>>>> Hadoop's TextInputFormat. See how SparkContext's textFile() method does it
>>>> here:
>>>> https://github.com/apache/incubator-spark/blob/master/core/src/main/scala/org/apache/spark/SparkContext.scala#L291
>>>
>>> Thanks for pointing this out. While the start byte number provides a
>>> globally unique index for each line, my application needs the line number.
>>>
>>> It seems best to go with the source file containing the line numbers,
>>> instead of recreating this in Hadoop/Spark.
>>>
>>>> What is the use case where you must have the global line number in the
>>>> file, vs a global ordered unique identifier (my suggestion above) or a
>>>> partition-local line number (discussed extensively below)?
>>>>
>>>> Also if you have any way to do this in plain Hadoop, Spark can use that
>>>> as well.
>>>>
>>>> The fundamental difficulty is that knowing the global line number breaks
>>>> the assumption Hadoop makes everywhere that each record is independent of
>>>> all the others. Maybe you should consider adding a line number to the
>>>> beginning of every line at import time into HDFS instead of doing it
>>>> afterwards in Spark.
>>>>
>>>> Cheers!
>>>> Andrew
>>>>
>>>> On Mon, Dec 30, 2013 at 12:15 PM, Aureliano Buendia <[email protected]> wrote:
>>>>
>>>>> I assumed that the number of lines in each partition, except the last
>>>>> partition, is equal. Isn't this the case? In that case Guillaume's
>>>>> approach makes sense.
>>>>>
>>>>> All of these methods are inefficient. Spark needs to support this
>>>>> feature at a lower level, as Michael suggested.
>>>>>
>>>>> On Mon, Dec 30, 2013 at 8:01 PM, Guillaume Pitel <[email protected]> wrote:
>>>>>
>>>>>> You're assuming each partition has the same line count. I don't
>>>>>> think it's true (actually, I'm almost certain it's false). And anyway
>>>>>> your code also requires two maps.
>>>>>>
>>>>>> In my code, the sorting as well as the other operations are performed
>>>>>> on a very small dataset: one element per partition.
>>>>>>
>>>>>> Guillaume
>>>>>>
>>>>>>> Did you try the code I sent? I think the sortBy is probably in the
>>>>>>> wrong direction, so change it with -i instead of i
>>>>>>
>>>>>> I'm confused why we would need in-memory sorting. We just use a loop
>>>>>> like any other loop in Spark. Why shouldn't this solve the problem?:
>>>>>>
>>>>>> val count = lines.count() // lines is the rdd
>>>>>> val partitionLinesCount = count / lines.partitions.length
>>>>>> val linesWithIndex = lines.mapPartitionsWithIndex { (pi, it) =>
>>>>>>   var i = pi * partitionLinesCount
>>>>>>   it.map { line =>
>>>>>>     val numbered = (i, line)
>>>>>>     i += 1
>>>>>>     numbered
>>>>>>   }
>>>>>> }
>>>>>>
>>>>>> --
>>>>>> *Guillaume PITEL, Président*
>>>>>> +33(0)6 25 48 86 80 / +33(0)9 70 44 67 53
>>>>>>
>>>>>> eXenSa S.A.S. <http://www.exensa.com/>
>>>>>> 41, rue Périer - 92120 Montrouge - FRANCE
>>>>>> Tel +33(0)1 84 16 36 77 / Fax +33(0)9 72 28 37 05
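
The byte-offset route Andrew describes can be sketched roughly as follows (my code, not his; the input path is hypothetical). TextInputFormat keys each record by the starting byte offset of the line in the file, which gives a globally unique, ordered identifier, though not an actual line number.

    import org.apache.hadoop.io.{LongWritable, Text}
    import org.apache.hadoop.mapred.TextInputFormat

    // Hypothetical input path; the key is each line's starting byte offset.
    val withOffsets = sc
      .hadoopFile[LongWritable, Text, TextInputFormat]("hdfs:///path/to/input.txt")
      .map { case (offset, text) => (offset.get, text.toString) } // copy out of reused Writables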
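
If exact global line numbers really are needed, a minimal sketch (not from the thread, variable names and input path are mine) that avoids the equal-partition-size assumption discussed above: one pass counts the lines in each partition, the driver turns the counts into cumulative start offsets, and a second pass adds each partition's offset to a partition-local counter.

    // Hypothetical input; any RDD[String] works the same way.
    val lines = sc.textFile("hdfs:///path/to/input.txt")

    // Pass 1: count the lines in each partition (one small pair per partition).
    val counts: Array[(Int, Long)] = lines
      .mapPartitionsWithIndex { (pi, it) => Iterator((pi, it.size.toLong)) }
      .collect()

    // Turn the per-partition counts into cumulative start offsets.
    val offsets: Map[Int, Long] = counts
      .sortBy(_._1)
      .map(_._2)
      .scanLeft(0L)(_ + _)
      .zipWithIndex
      .map { case (offset, pi) => (pi, offset) }
      .toMap
    val offsetsBc = sc.broadcast(offsets)

    // Pass 2: global line number = partition start offset + local position.
    val linesWithIndex = lines.mapPartitionsWithIndex { (pi, it) =>
      var i = offsetsBc.value(pi)
      it.map { line =>
        val numbered = (i, line)
        i += 1
        numbered
      }
    }

This is still two passes over the data, so it does not remove the cost concern raised earlier in the thread; it only removes the assumption that every partition holds the same number of lines.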
