Why not use a zipped RDD? http://spark.incubator.apache.org/docs/latest/api/core/index.html#org.apache.spark.rdd.ZippedRDD
I have used something like this in the past.

> val index = sc.parallelize(Range.Long(0, rdd.count, 1), rdd.partitions.size)
> val rddWithIndex = rdd.zip(index)

If that doesn't work, then you could try zipPartitions as well, since it has slightly more relaxed constraints.

Thanks,
Shankari

On Tue, Dec 31, 2013 at 11:39 AM, Christopher Nguyen <[email protected]> wrote:

> It's a reasonable ask (row indices) in some interactive use cases we've come across. We're working on providing support for this at a higher level of abstraction.
>
> Sent while mobile. Pls excuse typos etc.
>
> On Dec 31, 2013 11:34 AM, "Aureliano Buendia" <[email protected]> wrote:
>
>> On Mon, Dec 30, 2013 at 8:31 PM, Andrew Ash <[email protected]> wrote:
>>
>>> Hi Aureliano,
>>>
>>> It's very easy to get lines into (start byte number, line) pairs using Hadoop's TextInputFormat. See how SparkContext's textFile() method does it here:
>>> https://github.com/apache/incubator-spark/blob/master/core/src/main/scala/org/apache/spark/SparkContext.scala#L291
>>
>> Thanks for pointing this out. While the start byte number provides a globally unique index for each line, my application needs the actual line number.
>>
>> It seems best to have the source file contain the line numbers, instead of recreating them in Hadoop/Spark.
>>
>>> What is the use case where you must have the global line number in the file, vs. a global ordered unique identifier (my suggestion above) or a partition-local line number (discussed extensively below)?
>>>
>>> Also, if you have any way to do this in plain Hadoop, Spark can use that as well.
>>>
>>> The fundamental difficulty is that knowing the global line number breaks the assumption Hadoop makes everywhere that each record is independent of all the others. Maybe you should consider adding a line number to the beginning of every line at import time into HDFS, instead of doing it afterwards in Spark.
>>>
>>> Cheers!
>>> Andrew
>>>
>>> On Mon, Dec 30, 2013 at 12:15 PM, Aureliano Buendia <[email protected]> wrote:
>>>
>>>> I assumed that the number of lines in each partition, except the last partition, is equal. Isn't this the case? If it is, Guillaume's approach makes sense.
>>>>
>>>> All of these methods are inefficient. Spark needs to support this feature at a lower level, as Michael suggested.
>>>>
>>>> On Mon, Dec 30, 2013 at 8:01 PM, Guillaume Pitel <[email protected]> wrote:
>>>>
>>>>> You're assuming each partition has the same line count. I don't think that's true (actually, I'm almost certain it's false). And anyway, your code also requires two maps.
>>>>>
>>>>> In my code, the sorting as well as the other operations are performed on a very small dataset: one element per partition.
>>>>>
>>>>> Guillaume
>>>>>
>>>>>> Did you try the code I sent? I think the sortBy is probably in the wrong direction, so change it to use -i instead of i.
>>>>>
>>>>> I'm confused why we would need in-memory sorting. We just use a loop like any other loop in Spark.
>>>>> Why shouldn't this solve the problem?
>>>>>
>>>>> val count = lines.count() // lines is the RDD
>>>>> val partitionLinesCount = count / lines.partitions.length
>>>>> val linesWithIndex = lines.mapPartitionsWithIndex { (pi, it) =>
>>>>>   var i = pi * partitionLinesCount
>>>>>   it.map { line =>
>>>>>     val indexed = (i, line)
>>>>>     i += 1
>>>>>     indexed
>>>>>   }
>>>>> }
>>>>>
>>>>> --
>>>>> Guillaume PITEL, Président
>>>>> +33(0)6 25 48 86 80 / +33(0)9 70 44 67 53
>>>>>
>>>>> eXenSa S.A.S. <http://www.exensa.com/>
>>>>> 41, rue Périer - 92120 Montrouge - FRANCE
>>>>> Tel +33(0)1 84 16 36 77 / Fax +33(0)9 72 28 37 05
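
For reference, an exact two-pass version of the approach Guillaume describes (count the lines in each partition first, then assign consecutive global line numbers) could look roughly like the sketch below. This is only an illustration, not Guillaume's actual code: it assumes lines is an RDD[String], and the names countsPerPartition, startOffsets and numberedLines are made up.

// Pass 1: count the lines in each partition. Only one small element per
// partition is collected to the driver, not the data itself.
val countsPerPartition = lines
  .mapPartitionsWithIndex { (pi, it) => Iterator((pi, it.size)) }
  .collect()
  .sortBy(_._1) // order by partition index
  .map(_._2)

// startOffsets(pi) is the global line number of the first line in partition pi.
val startOffsets = countsPerPartition.scanLeft(0L)(_ + _)

// Pass 2: number the lines consecutively within each partition, starting
// from that partition's offset.
val numberedLines = lines.mapPartitionsWithIndex { (pi, it) =>
  var i = startOffsets(pi)
  it.map { line =>
    val numbered = (i, line)
    i += 1
    numbered
  }
}

Unlike the code quoted above, this does not assume that every partition holds the same number of lines, at the cost of one extra pass over the data.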
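Andrew's byte-offset suggestion can also be sketched with Hadoop's TextInputFormat directly. Again this is only an illustration (the input path is a placeholder and offsetAndLine is a made-up name); TextInputFormat's key is the byte offset at which each line starts, which is unique and ordered within a file but not a consecutive line number.

import org.apache.hadoop.io.{LongWritable, Text}
import org.apache.hadoop.mapred.TextInputFormat

// Each record is (byte offset of the line within its file, line contents).
val offsetAndLine =
  sc.hadoopFile[LongWritable, Text, TextInputFormat]("hdfs:///path/to/input")
    .map { case (offset, text) => (offset.get, text.toString) } // copy out of reused Writables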
