On Mon, Dec 30, 2013 at 8:31 PM, Andrew Ash <[email protected]> wrote:

> Hi Aureliano,
>
> It's very easy to get lines as (start byte number, line) pairs using Hadoop's
> TextInputFormat.  See how SparkContext's textFile() method does it here:
> https://github.com/apache/incubator-spark/blob/master/core/src/main/scala/org/apache/spark/SparkContext.scala#L291
>
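
(For anyone reading the archive: a minimal sketch of what Andrew describes,
mirroring what textFile() does internally but keeping the key. It assumes a
SparkContext named sc and an input path; untested.)

    import org.apache.hadoop.io.{LongWritable, Text}
    import org.apache.hadoop.mapred.TextInputFormat

    // TextInputFormat's key is the byte offset at which each line starts.
    // Convert the reused Writable objects to plain values right away.
    val offsetLines = sc.hadoopFile(path, classOf[TextInputFormat],
        classOf[LongWritable], classOf[Text])
      .map { case (offset, text) => (offset.get, text.toString) }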

Thanks for pointing this out. While the start byte number provides a
globally unique index for each line, my application needs the actual line
number.

It seems best to go with a source file that already contains the line
numbers, instead of recreating this in Hadoop/Spark.


>
> What is the use case where you must have the global line number in the
> file, vs a global ordered unique identifier (my suggestion above) or a
> partition-local line number (discussed extensively below)?
>
> Also if you have any way to do this in plain Hadoop, Spark can use that as
> well.
>
> The fundamental difficulty is that knowing the global line number breaks
> the assumption Hadoop makes everywhere that each record is independent of
> all the others.  Maybe you should consider adding a line number to the
> beginning of every line at import time into HDFS, instead of doing it
> afterwards in Spark.
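
(A sketch of that import-time numbering, for the archive. Plain Scala, run
before the upload to HDFS; the file names are illustrative and this is
untested.)

    import scala.io.Source
    import java.io.PrintWriter

    // Prefix every line with its line number before the file goes into
    // HDFS, so the number travels with the record and survives any
    // later partitioning.
    val in = Source.fromFile("input.txt")
    val out = new PrintWriter("input.numbered.txt")
    try {
      for ((line, i) <- in.getLines().zipWithIndex)
        out.println(i + "\t" + line)
    } finally {
      out.close()
      in.close()
    }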
>
> Cheers!
> Andrew
>
>
> On Mon, Dec 30, 2013 at 12:15 PM, Aureliano Buendia
> <[email protected]> wrote:
>
>> I assumed that the number of lines in each partition, except the last
>> partition, is equal. Isn't that the case? If it is, Guillaume's approach
>> makes sense.
>>
>> All of these methods are inefficient. Spark needs to support this feature
>> at a lower level, as Michael suggested.
>>
>> On Mon, Dec 30, 2013 at 8:01 PM, Guillaume Pitel <
>> [email protected]> wrote:
>>
>>>  You're assuming each partition has the same line count. I don't think
>>> that's true (actually, I'm almost certain it's false). And anyway, your
>>> code also requires two maps.
>>>
>>> In my code, the sorting and the other operations are performed on a
>>> very small dataset: one element per partition.
>>>
>>> Guillaume
>>>
>>>
>>>
>>>
>>>> Did you try the code I sent? I think the sortBy is probably in the
>>>> wrong direction, so change it to use -i instead of i.
>>>>
>>>
>>>  I'm confused about why we would need in-memory sorting. We just use a
>>> loop like any other loop in Spark. Why wouldn't this solve the problem?
>>>
>>>     val count = lines.count() // lines is the RDD
>>>     val partitionLinesCount = count / lines.partitions.length
>>>     val linesWithIndex = lines.mapPartitionsWithIndex { (pi, it) =>
>>>       var i = pi * partitionLinesCount
>>>       it.map { line =>
>>>         val pair = (i, line)
>>>         i += 1
>>>         pair
>>>       }
>>>     }
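
(For completeness, a sketch of the two-pass scheme Guillaume is describing:
first count each partition, which yields the very small
one-element-per-partition dataset he mentions, then turn the cumulative
counts into per-partition starting offsets. Assumes lines is an RDD[String];
untested.)

    // Pass 1: line count of each partition (tiny: one element apiece).
    val counts = lines.mapPartitionsWithIndex { (pi, it) =>
      Iterator((pi, it.size))
    }.collect().sortBy(_._1).map(_._2)

    // offsets(pi) = total number of lines in all partitions before pi.
    val offsets = counts.scanLeft(0L)(_ + _)

    // Pass 2: assign exact global line numbers.
    val numbered = lines.mapPartitionsWithIndex { (pi, it) =>
      var i = offsets(pi)
      it.map { line =>
        val pair = (i, line)
        i += 1
        pair
      }
    }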
>>>
>>>
>>>
>>> --
>>>  *Guillaume PITEL, Président*
>>> +33(0)6 25 48 86 80 / +33(0)9 70 44 67 53
>>>
>>>  eXenSa S.A.S. <http://www.exensa.com/>
>>>  41, rue Périer - 92120 Montrouge - FRANCE
>>> Tel +33(0)1 84 16 36 77 / Fax +33(0)9 72 28 37 05
>>>
>>
>>
>
