Hi Aureliano,

It's very easy to get lines as (start byte offset, line) pairs using Hadoop's TextInputFormat. See how SparkContext's textFile() method does it here: https://github.com/apache/incubator-spark/blob/master/core/src/main/scala/org/apache/spark/SparkContext.scala#L291
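
To make the (start byte offset, line) idea concrete, here is a minimal sketch in plain Scala that mimics the shape of the records TextInputFormat produces. It does not call Hadoop or Spark. The names ByteOffsetDemo and withByteOffsets are made up for illustration, and the sketch assumes UTF-8 text with single "\n" line endings.

```scala
object ByteOffsetDemo {
  // Pairs each line with the byte offset at which it starts in the file.
  // Assumption: UTF-8 encoding and a one-byte "\n" terminator after every line.
  def withByteOffsets(lines: Seq[String]): Seq[(Long, String)] = {
    // Size of each line on disk, including its newline byte.
    val sizes = lines.map(_.getBytes("UTF-8").length.toLong + 1L)
    // Running total of sizes; dropping the last element leaves one start per line.
    val starts = sizes.scanLeft(0L)(_ + _).init
    starts.zip(lines)
  }

  def main(args: Array[String]): Unit = {
    withByteOffsets(Seq("alpha", "be", "gamma")).foreach {
      case (offset, line) => println(s"$offset\t$line")
    }
  }
}
```

The offsets are unique and increase with position in the file, so they work as an ordered identifier, but they are not line numbers: the gap between two offsets depends on the length of the line between them.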
What is the use case where you must have the global line number in the file, as opposed to a globally ordered unique identifier (my suggestion above) or a partition-local line number (discussed extensively below)? Also, if you have any way to do this in plain Hadoop, Spark can use that as well.

The fundamental difficulty is that knowing the global line number breaks the assumption Hadoop makes everywhere that each record is independent of all the others. Maybe you should consider adding a line number to the beginning of every line at import time into HDFS instead of doing it afterwards in Spark.

Cheers!
Andrew

On Mon, Dec 30, 2013 at 12:15 PM, Aureliano Buendia <[email protected]> wrote:

> I assumed that the number of lines in each partition, except the last
> partition, is equal. Isn't this the case? In that case Guillaume's approach
> makes sense.
>
> All of these methods are inefficient. Spark needs to support this feature
> at a lower level, as Michael suggested.
>
>
> On Mon, Dec 30, 2013 at 8:01 PM, Guillaume Pitel <
> [email protected]> wrote:
>
>> You're assuming each partition has the same line count. I don't think
>> it's true (actually, I'm almost certain it's false). And anyway your code
>> also requires two maps.
>>
>> In my code, the sorting as well as the other operations are performed on
>> a very small dataset: one element per partition.
>>
>> Guillaume
>>
>>
>>> Did you try the code I sent? I think the sortBy is probably in the
>>> wrong direction, so change it to use -i instead of i.
>>>
>>
>> I'm confused why we would need in-memory sorting. We just use a loop like
>> any other loop in Spark.
>> Why shouldn't this solve the problem?:
>>
>> val count = lines.count() // lines is the RDD
>> // assumes every partition holds the same number of lines
>> val partitionLinesCount = count / lines.partitions.length
>> val linesWithIndex = lines.mapPartitionsWithIndex { (pi, it) =>
>>   var i = pi * partitionLinesCount
>>   it.map { line =>
>>     val indexed = (i, line) // build the pair before advancing the counter
>>     i += 1
>>     indexed
>>   }
>> }
>>
>>
>> --
>> *Guillaume PITEL, Président*
>> +33(0)6 25 48 86 80 / +33(0)9 70 44 67 53
>>
>> eXenSa S.A.S. <http://www.exensa.com/>
>> 41, rue Périer - 92120 Montrouge - FRANCE
>> Tel +33(0)1 84 16 36 77 / Fax +33(0)9 72 28 37 05
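
The quoted disagreement is about whether partitions can be assumed to hold equal numbers of lines. A rough sketch of a numbering scheme that avoids that assumption is below: count each partition first, turn the counts into running start offsets, then add each partition's start to its partition-local index. This is plain Scala in which each inner Seq stands in for one partition; it is not Spark code and not necessarily the code Guillaume sent. GlobalLineNumbers and number are names invented for the illustration.

```scala
object GlobalLineNumbers {
  // Each inner Seq plays the role of one partition; sizes may differ.
  def number(partitions: Seq[Seq[String]]): Seq[Seq[(Long, String)]] = {
    // Pass 1: one count per partition. This is the only data that has to be
    // gathered in one place, and it has one element per partition.
    val counts = partitions.map(_.size.toLong)
    // Running total: starts(pi) is the global index of partition pi's first line.
    val starts = counts.scanLeft(0L)(_ + _)
    // Pass 2: global index = partition start + partition-local index.
    partitions.zipWithIndex.map { case (part, pi) =>
      part.zipWithIndex.map { case (line, i) => (starts(pi) + i, line) }
    }
  }

  def main(args: Array[String]): Unit = {
    val parts = Seq(Seq("a", "b", "c"), Seq("d"), Seq("e", "f"))
    number(parts).flatten.foreach { case (n, line) => println(s"$n\t$line") }
  }
}
```

In Spark, each pass would presumably be a mapPartitionsWithIndex call, with the per-partition counts collected to the driver in between. That costs two passes over the data, which is the inefficiency raised in the thread.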
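
For the suggestion of adding line numbers at import time, one possible shape is a small sequential pre-processing step run before the file is copied into HDFS. The sketch below is plain Scala with no Hadoop or Spark calls. NumberLinesOnImport, prefixLineNumbers, the tab separator, and the two command-line arguments are all assumptions made for the example.

```scala
import java.io.{File, PrintWriter}
import scala.io.Source

object NumberLinesOnImport {
  // Prefixes every line with its zero-based line number and a tab.
  // A Long counter is used so very large files do not overflow an Int index.
  def prefixLineNumbers(lines: Iterator[String]): Iterator[String] =
    lines.zip(Iterator.iterate(0L)(_ + 1L)).map { case (line, n) => s"$n\t$line" }

  def main(args: Array[String]): Unit = {
    // Hypothetical usage: NumberLinesOnImport <input file> <output file>
    require(args.length == 2, "usage: NumberLinesOnImport <input> <output>")
    val source = Source.fromFile(args(0), "UTF-8")
    val writer = new PrintWriter(new File(args(1)), "UTF-8")
    try {
      prefixLineNumbers(source.getLines()).foreach(l => writer.println(l))
    } finally {
      writer.close()
      source.close()
    }
  }
}
```

Because the file is read sequentially here, the line number is known without any coordination, and every record that later reaches Spark already carries its own number.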
