Thanks, that's why I was worried and tested my application again :).

On 24 April 2015 at 23:22, Michal Michalski <michal.michal...@boxever.com>
wrote:

> Yes.
>
> Kind regards,
> Michał Michalski,
> michal.michal...@boxever.com
>
> On 24 April 2015 at 17:12, Jeetendra Gangele <gangele...@gmail.com> wrote:
>
>> Did you use zipWithUniqueId?
>>
>> On 24 April 2015 at 21:28, Michal Michalski <michal.michal...@boxever.com
>> > wrote:
>>
>>> I somehow missed zipWithIndex (and Sean's email), thanks for the hint. I
>>> mean - I saw it before, but I just thought it wasn't doing what I want. I've
>>> re-read the description now and it looks like it might actually be what I
>>> need. Thanks.
>>>
>>> Kind regards,
>>> Michał Michalski,
>>> michal.michal...@boxever.com
>>>
>>> On 24 April 2015 at 16:26, Ganelin, Ilya <ilya.gane...@capitalone.com>
>>> wrote:
>>>
>>>>  To maintain the order you can use zipWithIndex as Sean Owen pointed
>>>> out. This is the same as zipWithUniqueId except the assigned number is the
>>>> index of the data in the RDD which I believe matches the order of data as
>>>> it's stored on HDFS.
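
A minimal sketch of the zipWithIndex approach described above, assuming a local Spark setup. The object name, the helper linesInFileOrder and the temporary numbers file are made up for illustration; textFile, zipWithIndex, sortBy and collect are the regular RDD calls. Whether the index really matches file order for a given input format is worth checking on your own data, which is what the final line does.

```scala
import java.nio.file.Files
import org.apache.spark.{SparkConf, SparkContext}

object ZipWithIndexOrder {
  /** Reads `path` with at least `minPartitions` partitions and returns
    * (line, index) pairs ordered by the index assigned by zipWithIndex. */
  def linesInFileOrder(sc: SparkContext, path: String, minPartitions: Int): Array[(String, Long)] =
    sc.textFile(path, minPartitions)
      .zipWithIndex()   // (line, position counted across all partitions)
      .sortBy(_._2)     // sort explicitly by position rather than relying on partition order
      .collect()

  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("zip-with-index-order").setMaster("local[2]")
    val sc = new SparkContext(conf)
    try {
      // Hypothetical input: the numbers 0..999, one per line.
      val file = Files.createTempFile("numbers", ".txt")
      Files.write(file, (0 until 1000).mkString("\n").getBytes("UTF-8"))

      val indexed = linesInFileOrder(sc, file.toString, 4)
      // If file order is preserved, every line equals its own index.
      println(indexed.forall { case (line, idx) => line.toLong == idx })
    } finally sc.stop()
  }
}
```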
>>>>
>>>> -----Original Message-----
>>>> *From: *Michal Michalski [michal.michal...@boxever.com]
>>>> *Sent: *Friday, April 24, 2015 11:18 AM Eastern Standard Time
>>>> *To: *Ganelin, Ilya
>>>> *Cc: *Spico Florin; user
>>>> *Subject: *Re: Does HadoopRDD.zipWithIndex method preserve the order
>>>> of the input data from Hadoop?
>>>>
>>>> I read it one by one as I need to maintain the order, but it doesn't
>>>> mean that I process them one by one later. Input lines refer to different
>>>> entities I update, so once I read them in order, I group them by the id of
>>>> the entity I want to update, sort the updates on a per-entity basis and
>>>> process them further in parallel (including writing data to C* and Kafka at
>>>> the very end). That's what I use Spark for - the first step I ask about is
>>>> just a requirement related to the input format I get and need to support.
>>>> Everything that happens after that is just a normal data processing job
>>>> that you want to distribute.
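
The read-in-order, group-by-entity, sort-per-entity flow described above could be sketched roughly as follows. This is only an illustration under assumptions: the "entityId,update" line format, the entityId helper and the object name are hypothetical, since the real input format is not given in the thread. The index from zipWithIndex stands in for the missing timestamp.

```scala
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.rdd.RDD

object PerEntityOrdering {
  // Hypothetical line format "entityId,update".
  def entityId(line: String): String = line.takeWhile(_ != ',')

  /** Groups lines by entity and restores the original input order inside each group. */
  def updatesPerEntity(lines: RDD[String]): RDD[(String, Seq[String])] =
    lines
      .zipWithIndex()                                           // remember each line's position
      .map { case (line, idx) => (entityId(line), (idx, line)) }
      .groupByKey()                                             // shuffle: order within a group is not guaranteed
      .mapValues(updates => updates.toSeq.sortBy(_._1).map(_._2)) // so sort each group by position again

  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("per-entity-ordering").setMaster("local[2]")
    val sc = new SparkContext(conf)
    try {
      val lines = sc.parallelize(Seq("a,set 1", "b,set 5", "a,add 2", "b,add 1", "a,mul 3"), 3)
      updatesPerEntity(lines).collect().foreach(println)
    } finally sc.stop()
  }
}
```

Note that groupByKey holds all updates of one entity in memory at once, so this shape only suits entities with a moderate number of updates.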
>>>>
>>>>  Kind regards,
>>>> Michał Michalski,
>>>> michal.michal...@boxever.com
>>>>
>>>> On 24 April 2015 at 16:10, Ganelin, Ilya <ilya.gane...@capitalone.com>
>>>> wrote:
>>>>
>>>>> If you're reading a file line by line then you should simply use Java's
>>>>> Hadoop FileSystem class to read the file with a BufferedInputStream. I
>>>>> don't think you need an RDD here.
>>>>>
>>>>> -----Original Message-----
>>>>> *From: *Michal Michalski [michal.michal...@boxever.com]
>>>>>  *Sent: *Friday, April 24, 2015 11:04 AM Eastern Standard Time
>>>>> *To: *Ganelin, Ilya
>>>>> *Cc: *Spico Florin; user
>>>>> *Subject: *Re: Does HadoopRDD.zipWithIndex method preserve the order
>>>>> of the input data from Hadoop?
>>>>>
>>>>> The problem I'm facing is that I need to process lines from the input file
>>>>> in the order they're stored in the file, as they define the order of
>>>>> updates I need to apply to some data, and these updates are not commutative,
>>>>> so that order matters. Unfortunately the input is purely order-based,
>>>>> there's no timestamp per line etc. in the file and I'd prefer to avoid
>>>>> preparing the file in advance by adding ordinals before / after each line.
>>>>> Of the approaches you suggested, the first two won't work as there's nothing
>>>>> I could sort by. I'm not sure about the third one - I'm just not sure what
>>>>> you meant there, to be honest :-)
>>>>>
>>>>>  Kind regards,
>>>>> Michał Michalski,
>>>>> michal.michal...@boxever.com
>>>>>
>>>>> On 24 April 2015 at 15:48, Ganelin, Ilya <ilya.gane...@capitalone.com>
>>>>> wrote:
>>>>>
>>>>>> Michal - you need to sort your RDD. Check out the shuffle
>>>>>> documentation in the Spark Programming Guide. It talks about this
>>>>>> specifically. You can resolve this in a couple of ways - either by
>>>>>> collecting your RDD and sorting it, using sortBy, or not worrying about
>>>>>> the internal ordering. You can still extract elements in order by using a
>>>>>> filter with the zip, e.g. rdd.zipWithIndex().filter(s => s._2 < 50).sortBy(_._2)
>>>>>>
>>>>>> -----Original Message-----
>>>>>> *From: *Michal Michalski [michal.michal...@boxever.com]
>>>>>> *Sent: *Friday, April 24, 2015 10:41 AM Eastern Standard Time
>>>>>> *To: *Spico Florin
>>>>>> *Cc: *user
>>>>>> *Subject: *Re: Does HadoopRDD.zipWithIndex method preserve the order
>>>>>> of the input data from Hadoop?
>>>>>>
>>>>>> Of course, after you do it, you probably want to call
>>>>>> repartition(somevalue) on your RDD to "get your parallelism back".
>>>>>>
>>>>>>  Kind regards,
>>>>>> Michał Michalski,
>>>>>> michal.michal...@boxever.com
>>>>>>
>>>>>> On 24 April 2015 at 15:28, Michal Michalski <
>>>>>> michal.michal...@boxever.com> wrote:
>>>>>>
>>>>>>> I did a quick test as I was curious about it too. I created a file
>>>>>>> with numbers from 0 to 999, in order, line by line. Then I did:
>>>>>>>
>>>>>>> scala> val numbers = sc.textFile("./numbers.txt")
>>>>>>> scala> val zipped = numbers.zipWithUniqueId
>>>>>>> scala> zipped.foreach(i => println(i))
>>>>>>>
>>>>>>> Expected result if the order was preserved would be something like:
>>>>>>> (0, 0), (1, 1) etc.
>>>>>>> Unfortunately, the output looks like this:
>>>>>>>
>>>>>>>  (126,1)
>>>>>>> (223,2)
>>>>>>> (320,3)
>>>>>>> (1,0)
>>>>>>> (127,11)
>>>>>>> (2,10)
>>>>>>>  (...)
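
The ids in the output above look consistent with the numbering scheme documented for zipWithUniqueId: with n partitions, items in partition k get the ids k, n+k, 2n+k and so on, so the ids are unique but do not reflect the position in the file. (foreach also runs per partition, so the print order by itself says little.) Below is a plain-Scala sketch of both numbering schemes, with no Spark involved; the object and helper names are made up for illustration and each inner Seq stands in for one partition.

```scala
object UniqueIdSketch {
  // Mimics the documented zipWithUniqueId scheme: the i-th item (0-based)
  // of partition k, out of n partitions, receives id k + i * n.
  def zipWithUniqueIdLocal[T](partitions: Seq[Seq[T]]): Seq[(T, Long)] = {
    val n = partitions.length
    partitions.zipWithIndex.flatMap { case (items, k) =>
      items.zipWithIndex.map { case (item, i) => (item, k.toLong + i.toLong * n) }
    }
  }

  // Mimics zipWithIndex: items are numbered consecutively, first by
  // partition index and then by position inside the partition.
  def zipWithIndexLocal[T](partitions: Seq[Seq[T]]): Seq[(T, Long)] =
    partitions.flatten.zipWithIndex.map { case (item, i) => (item, i.toLong) }

  def main(args: Array[String]): Unit = {
    val partitions = Seq(Seq("l0", "l1", "l2"), Seq("l3", "l4", "l5"))
    println(zipWithUniqueIdLocal(partitions)) // ids interleave across partitions
    println(zipWithIndexLocal(partitions))    // ids follow the original order
  }
}
```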
>>>>>>>
>>>>>>> The workaround I found that works for me for my specific use case
>>>>>>> (relatively small input files) is setting explicitly the number of
>>>>>>> partitions to 1 when reading a single *text* file:
>>>>>>>
>>>>>>> scala> val numbers_sp = sc.textFile("./numbers.txt", 1)
>>>>>>>
>>>>>>> Then the output is exactly as I would expect.
>>>>>>>
>>>>>>> I didn't dive into the code too much, but I took a very quick look
>>>>>>> at it and figured out - correct me if I missed something, it's Friday
>>>>>>> afternoon! ;-) - that this workaround will work fine for all the input
>>>>>>> formats inheriting from org.apache.hadoop.mapred.FileInputFormat,
>>>>>>> including TextInputFormat, of course - see the implementation of the
>>>>>>> getSplits() method there (
>>>>>>> http://grepcode.com/file/repo1.maven.org/maven2/org.jvnet.hudson.hadoop/hadoop-core/0.19.1-hudson-2/org/apache/hadoop/mapred/FileInputFormat.java#FileInputFormat.getSplits%28org.apache.hadoop.mapred.JobConf%2Cint%29
>>>>>>> ).
>>>>>>> The numSplits variable passed there is exactly the same value as you
>>>>>>> provide as the second argument to textFile, which is minPartitions.
>>>>>>> However, while *min* suggests that we can only define a minimal number
>>>>>>> of partitions and have no control over the max, from what I can see in
>>>>>>> the code, that value specifies the *exact* number of partitions per the
>>>>>>> FileInputFormat.getSplits implementation. Of course it can differ for
>>>>>>> other input formats, but in this case it should work just fine.
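
For reference, here is a simplified plain-Scala rendering of how that getSplits implementation appears to size its splits, as far as I can reconstruct it: the split size is max(minSize, min(totalSize / numSplits, blockSize)). This is a sketch under assumptions, not the Hadoop code itself: it handles a single file only, the object and method names are made up, and the 1.1 slack factor and the default minimum size of 1 are taken from my reading of the old mapred API. If this reading is right, numSplits = 1 gives one partition only while the file fits in one block, which matches the "relatively small input files" case above; for larger files, checking numbers_sp.partitions.length is a cheap safeguard.

```scala
object SplitSketch {
  // Slack factor: the last split may be up to 10% larger than splitSize.
  val SplitSlop = 1.1

  // Split size is the goal size, capped by the block size, but never below minSize.
  def splitSize(goalSize: Long, minSize: Long, blockSize: Long): Long =
    math.max(minSize, math.min(goalSize, blockSize))

  /** Number of splits for one file of `fileLength` bytes (single-file simplification). */
  def countSplits(fileLength: Long, blockSize: Long, numSplits: Int, minSize: Long = 1L): Int = {
    val goalSize = fileLength / (if (numSplits == 0) 1 else numSplits)
    val size = splitSize(goalSize, minSize, blockSize)
    var remaining = fileLength
    var count = 0
    while (remaining.toDouble / size > SplitSlop) {
      count += 1
      remaining -= size
    }
    if (remaining != 0) count += 1 // whatever is left becomes the final split
    count
  }

  def main(args: Array[String]): Unit = {
    val mb = 1024L * 1024L
    println(countSplits(1000L, 64 * mb, 1))     // small file, one requested split
    println(countSplits(200 * mb, 64 * mb, 1))  // file larger than one block
  }
}
```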
>>>>>>>
>>>>>>>
>>>>>>>  Kind regards,
>>>>>>> Michał Michalski,
>>>>>>> michal.michal...@boxever.com
>>>>>>>
>>>>>>> On 24 April 2015 at 14:05, Spico Florin <spicoflo...@gmail.com>
>>>>>>> wrote:
>>>>>>>
>>>>>>>> Hello!
>>>>>>>>   I know that HadoopRDD partitions are built based on the number of
>>>>>>>> splits in HDFS. I'm wondering if these partitions preserve the initial
>>>>>>>> order of data in file.
>>>>>>>> As an example, if I have an HDFS file (myTextFile) that has these
>>>>>>>> splits:
>>>>>>>>
>>>>>>>> split 0 -> line 1, ..., line k
>>>>>>>> split 1 -> line k+1, ..., line k+n
>>>>>>>> split 2 -> line k+n+1, ..., line k+n+m
>>>>>>>>
>>>>>>>> and the code
>>>>>>>> val lines=sc.textFile("hdfs://mytextFile")
>>>>>>>> lines.zipWithIndex()
>>>>>>>>
>>>>>>>> will the order of lines be preserved?
>>>>>>>> (line 1, zipIndex 1), ..., (line k, zipIndex k), and so on.
>>>>>>>>
>>>>>>>> I found this question on stackoverflow (
>>>>>>>> http://stackoverflow.com/questions/26046410/how-can-i-obtain-an-element-position-in-sparks-rdd)
>>>>>>>> whose answer intrigued me:
>>>>>>>> "Essentially, RDD's zipWithIndex() method seems to do this, but it
>>>>>>>> won't preserve the original ordering of the data the RDD was created
>>>>>>>> from"
>>>>>>>>
>>>>>>>> Can you please confirm whether this is the correct answer?
>>>>>>>>
>>>>>>>> Thanks.
>>>>>>>>  Florin
>>>>>>>>
>>>>>>>>
>>>>>>>>
>>>>>>>>
>>>>>>>>
>>>>>>>>
>>>>>>>
>>>>>>
>>>>>>  ------------------------------
>>>>>>
>>>>>> The information contained in this e-mail is confidential and/or
>>>>>> proprietary to Capital One and/or its affiliates. The information
>>>>>> transmitted herewith is intended only for use by the individual or entity
>>>>>> to which it is addressed.  If the reader of this message is not the
>>>>>> intended recipient, you are hereby notified that any review,
>>>>>> retransmission, dissemination, distribution, copying or other use of, or
>>>>>> taking of any action in reliance upon this information is strictly
>>>>>> prohibited. If you have received this communication in error, please
>>>>>> contact the sender and delete the material from your computer.
>>>>>>
>>>>>
>>>>>
>>>>
>>>>
>>>
>>>
>>
>>
>

