It's a reasonable ask (row indices) in some interactive use cases we've
come across. We're working on providing support for this at a higher level
of abstraction.

Sent while mobile. Pls excuse typos etc.
On Dec 31, 2013 11:34 AM, "Aureliano Buendia" <[email protected]> wrote:

>
>
>
> On Mon, Dec 30, 2013 at 8:31 PM, Andrew Ash <[email protected]> wrote:
>
>> Hi Aureliano,
>>
>> It's very easy to get lines as (start byte number, line) pairs using Hadoop's
>> TextInputFormat.  See how SparkContext's textFile() method does it here:
>> https://github.com/apache/incubator-spark/blob/master/core/src/main/scala/org/apache/spark/SparkContext.scala#L291
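>>
>> For example, something along these lines keeps that byte-offset key instead
>> of dropping it the way textFile() does (the path below is just a placeholder):
>>
>>     import org.apache.hadoop.io.{LongWritable, Text}
>>     import org.apache.hadoop.mapred.TextInputFormat
>>
>>     // Key = byte offset of the line within its file, value = the line itself.
>>     // Convert to primitives right away, since Hadoop reuses the Writable objects.
>>     val offsetAndLine = sc
>>       .hadoopFile(path, classOf[TextInputFormat], classOf[LongWritable], classOf[Text])
>>       .map { case (offset, text) => (offset.get, text.toString) }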
>>
>
> Thanks for pointing this out. While the start byte number provides a globally
> unique index for each line, my application needs the actual line number.
>
> It seems best to go with a source file that already contains the line numbers,
> instead of recreating this in Hadoop/Spark.
>
>
>>
>> What is the use case where you must have the global line number in the
>> file, vs a global ordered unique identifier (my suggestion above) or a
>> partition-local line number (discussed extensively below)?
>>
>> Also if you have any way to do this in plain Hadoop, Spark can use that
>> as well.
>>
>> The fundamental difficulty is that knowing the global line number breaks the
>> assumption Hadoop makes everywhere that each record is independent of all
>> the others.  Maybe you should consider adding a line number to the
>> beginning of every line at import time into HDFS instead of doing it
>> afterwards in Spark.
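>>
>> (A toy sketch of that pre-numbering step -- the file names are made up, and
>> for large files you would want to stream this rather than write a local copy
>> before the hdfs put:)
>>
>>     import java.io.PrintWriter
>>     import scala.io.Source
>>
>>     // Prefix each line with its 0-based line number, tab-separated, so the
>>     // number travels with the record once the file lands in HDFS.
>>     val out = new PrintWriter("input.numbered.txt")
>>     try {
>>       Source.fromFile("input.txt").getLines().zipWithIndex.foreach {
>>         case (line, i) => out.println(i + "\t" + line)
>>       }
>>     } finally out.close()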
>>
>> Cheers!
>> Andrew
>>
>>
>> On Mon, Dec 30, 2013 at 12:15 PM, Aureliano Buendia <[email protected]> wrote:
>>
>>> I assumed that the number of lines in each partition, except the last
>>> partition, is equal. Isn't this the case? If it is, Guillaume's approach
>>> makes sense.
>>>
>>> All of these methods are inefficient. Spark needs to support this
>>> feature at a lower level, as Michael suggested.
>>>
>>>
>>> On Mon, Dec 30, 2013 at 8:01 PM, Guillaume Pitel <[email protected]> wrote:
>>>
>>>> You're assuming each partition has the same line count. I don't think
>>>> that's true (actually, I'm almost certain it's false). And anyway, your code
>>>> also requires two maps.
>>>>
>>>> In my code, the sorting as well as the other operations are performed
>>>> on a very small dataset: one element per partition.
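>>>>
>>>> (Roughly, the idea is something like the sketch below -- simplified for
>>>> illustration, not the exact code sent earlier:)
>>>>
>>>>     // Pass 1: count lines per partition; this yields one small record per
>>>>     // partition, so collecting and sorting it on the driver is cheap.
>>>>     val counts = lines
>>>>       .mapPartitionsWithIndex { (pi, it) => Iterator((pi, it.size)) }
>>>>       .collect()
>>>>       .sortBy(_._1)
>>>>       .map(_._2)
>>>>
>>>>     // Prefix sums: the global line number at which each partition starts.
>>>>     val offsets = counts.scanLeft(0L)(_ + _)
>>>>
>>>>     // Pass 2: number lines within each partition, starting at its offset.
>>>>     val numbered = lines.mapPartitionsWithIndex { (pi, it) =>
>>>>       it.zipWithIndex.map { case (line, i) => (offsets(pi) + i, line) }
>>>>     }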
>>>>
>>>> Guillaume
>>>>
>>>>
>>>>
>>>>
>>>>> Did you try the code I sent? I think the sortBy is probably in the
>>>>> wrong direction, so change it to sort by -i instead of i.
>>>>>
>>>>
>>>>  I'm confused about why we would need in-memory sorting. We can just use a
>>>> loop like any other loop in Spark. Why shouldn't this solve the problem?:
>>>>
>>>>     val count = lines.count() // lines is the RDD
>>>>     val partitionLinesCount = count / lines.partitions.length
>>>>     val linesWithIndex = lines.mapPartitionsWithIndex { (pi, it) =>
>>>>       // assumes every partition (except possibly the last) has the same line count
>>>>       var i = pi * partitionLinesCount
>>>>       it.map { line =>
>>>>         val numbered = (i, line)
>>>>         i += 1
>>>>         numbered
>>>>       }
>>>>     }
>>>>
>>>>
>>>>
>>>> --
>>>> Guillaume PITEL, Président
>>>> +33(0)6 25 48 86 80 / +33(0)9 70 44 67 53
>>>>
>>>>  eXenSa S.A.S. <http://www.exensa.com/>
>>>>  41, rue Périer - 92120 Montrouge - FRANCE
>>>> Tel +33(0)1 84 16 36 77 / Fax +33(0)9 72 28 37 05
>>>>
>>>
>>>
>>
>
