Row indices are a reasonable ask; they come up in some interactive use cases we've come across. We're working on providing support for this at a higher level of abstraction.
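
As a rough illustration only (not the eventual built-in API), here is a two-pass sketch of how a global row index can be assigned today, assuming lines is an RDD[String]; it mirrors the partition-counting approach discussed in the quoted thread below:

    // Pass 1: count the records in each partition; the result is tiny
    // (one pair per partition), so collecting it to the driver is cheap.
    val countsPerPartition = lines
      .mapPartitionsWithIndex { (pi, it) => Iterator((pi, it.size)) }
      .collect()
      .sortBy(_._1)
      .map(_._2)

    // Prefix-sum the counts so each partition knows the global index of its first line.
    val startIndices = countsPerPartition.scanLeft(0L)(_ + _)

    // Pass 2: assign global line numbers within each partition, without shuffling the data.
    val linesWithIndex = lines.mapPartitionsWithIndex { (pi, it) =>
      it.zipWithIndex.map { case (line, i) => (startIndices(pi) + i, line) }
    }

Only the per-partition counts are moved to the driver; the lines themselves stay put, which is the point Guillaume makes below about operating on a dataset of one element per partition.
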
Sent while mobile. Please excuse typos.

On Dec 31, 2013 11:34 AM, "Aureliano Buendia" <[email protected]> wrote:
>
> On Mon, Dec 30, 2013 at 8:31 PM, Andrew Ash <[email protected]> wrote:
>
>> Hi Aureliano,
>>
>> It's very easy to get lines into (start byte number, line) using Hadoop's
>> TextInputFormat. See how SparkContext's textFile() method does it here:
>> https://github.com/apache/incubator-spark/blob/master/core/src/main/scala/org/apache/spark/SparkContext.scala#L291
>
> Thanks for pointing this out. While the start byte number provides a globally
> unique index for each line, my application needs the line number.
>
> It seems best to go with a source file that already contains the line numbers,
> instead of recreating this in Hadoop/Spark.
>
>> What is the use case where you must have the global line number in the
>> file, vs a global ordered unique identifier (my suggestion above) or a
>> partition-local line number (discussed extensively below)?
>>
>> Also, if you have any way to do this in plain Hadoop, Spark can use that
>> as well.
>>
>> The fundamental difficulty is that knowing the global line number breaks the
>> assumption Hadoop makes everywhere that each record is independent of all
>> the others. Maybe you should consider adding a line number to the
>> beginning of every line at import time into HDFS instead of doing it
>> afterwards in Spark.
>>
>> Cheers!
>> Andrew
>>
>> On Mon, Dec 30, 2013 at 12:15 PM, Aureliano Buendia <[email protected]> wrote:
>>
>>> I assumed that the number of lines in each partition, except the last
>>> partition, is equal. Isn't this the case? In that case Guillaume's approach
>>> makes sense.
>>>
>>> All of these methods are inefficient. Spark needs to support this
>>> feature at a lower level, as Michael suggested.
>>>
>>> On Mon, Dec 30, 2013 at 8:01 PM, Guillaume Pitel <[email protected]> wrote:
>>>
>>>> You're assuming each partition has the same line count. I don't think
>>>> that's true (actually, I'm almost certain it's false). And anyway, your
>>>> code also requires two maps.
>>>>
>>>> In my code, the sorting and the other operations are performed
>>>> on a very small dataset: one element per partition.
>>>>
>>>> Guillaume
>>>>
>>>>> Did you try the code I sent? I think the sortBy is probably in the
>>>>> wrong direction, so change it to -i instead of i.
>>>>
>>>> I'm confused why we would need in-memory sorting. We just use a loop
>>>> like any other loop in Spark. Why shouldn't this solve the problem?
>>>>
>>>>   val count = lines.count() // lines is the RDD
>>>>   val partitionLinesCount = count / lines.partitions.length
>>>>   val linesWithIndex = lines.mapPartitionsWithIndex { (pi, it) =>
>>>>     var i = pi * partitionLinesCount
>>>>     it.map { line =>
>>>>       val indexed = (i, line)
>>>>       i += 1
>>>>       indexed
>>>>     }
>>>>   }
>>>>
>>>> --
>>>> Guillaume PITEL, Président
>>>> +33(0)6 25 48 86 80 / +33(0)9 70 44 67 53
>>>>
>>>> eXenSa S.A.S. <http://www.exensa.com/>
>>>> 41, rue Périer - 92120 Montrouge - FRANCE
>>>> Tel +33(0)1 84 16 36 77 / Fax +33(0)9 72 28 37 05
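
For reference, a minimal sketch of the (byte offset, line) pairing Andrew describes above, assuming sc is the SparkContext and using a placeholder input path. textFile() throws the offset away, but the underlying hadoopFile() call exposes it:

    import org.apache.hadoop.io.{LongWritable, Text}
    import org.apache.hadoop.mapred.TextInputFormat

    // Each record is (byte offset of the line within its file, line contents).
    // The offset is a globally unique, ordered identifier, but not a line number.
    // Copy out of the Writables immediately: Hadoop reuses those objects.
    val offsetsAndLines = sc
      .hadoopFile[LongWritable, Text, TextInputFormat]("hdfs:///path/to/input")
      .map { case (offset, text) => (offset.get, text.toString) }

If true line numbers are required, the offsets alone are not enough; the per-partition counting approach sketched earlier (or numbering the lines at import time, as Andrew suggests) is still needed.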
