Thanks, that's why I was worried and tested my application again :).

On 24 April 2015 at 23:22, Michal Michalski <michal.michal...@boxever.com> wrote:
> Yes.
>
> Kind regards,
> Michał Michalski,
> michal.michal...@boxever.com
>
> On 24 April 2015 at 17:12, Jeetendra Gangele <gangele...@gmail.com> wrote:
>
>> you used ZipWithUniqueID?
>>
>> On 24 April 2015 at 21:28, Michal Michalski <michal.michal...@boxever.com> wrote:
>>
>>> I somehow missed zipWithIndex (and Sean's email), thanks for the hint.
>>> I mean - I saw it before, but I just thought it wasn't doing what I want.
>>> I've re-read the description now and it looks like it might actually be
>>> what I need. Thanks.
>>>
>>> Kind regards,
>>> Michał Michalski,
>>> michal.michal...@boxever.com
>>>
>>> On 24 April 2015 at 16:26, Ganelin, Ilya <ilya.gane...@capitalone.com> wrote:
>>>
>>>> To maintain the order you can use zipWithIndex, as Sean Owen pointed
>>>> out. This is the same as zipWithUniqueId except that the assigned number
>>>> is the index of the data in the RDD, which I believe matches the order of
>>>> the data as it's stored on HDFS.
>>>>
>>>> Sent with Good (www.good.com)
>>>>
>>>> -----Original Message-----
>>>> From: Michal Michalski [michal.michal...@boxever.com]
>>>> Sent: Friday, April 24, 2015 11:18 AM Eastern Standard Time
>>>> To: Ganelin, Ilya
>>>> Cc: Spico Florin; user
>>>> Subject: Re: Does HadoopRDD.zipWithIndex method preserve the order of the input data from Hadoop?
>>>>
>>>> I read it line by line as I need to maintain the order, but that doesn't
>>>> mean I process the lines one by one later. Input lines refer to different
>>>> entities I update, so once I read them in order, I group them by the id of
>>>> the entity I want to update, sort the updates on a per-entity basis and
>>>> process them further in parallel (including writing data to C* and Kafka
>>>> at the very end). That's what I use Spark for - the first step I ask about
>>>> is just a requirement related to the input format I get and need to support.
>>>> Everything that happens after that is just a normal data processing job
>>>> that you want to distribute.
>>>>
>>>> Kind regards,
>>>> Michał Michalski,
>>>> michal.michal...@boxever.com
>>>>
>>>> On 24 April 2015 at 16:10, Ganelin, Ilya <ilya.gane...@capitalone.com> wrote:
>>>>
>>>>> If you're reading a file line by line then you should simply use Java's
>>>>> Hadoop FileSystem class to read the file with a BufferedInputStream. I
>>>>> don't think you need an RDD here.
>>>>>
>>>>> Sent with Good (www.good.com)
>>>>>
>>>>> -----Original Message-----
>>>>> From: Michal Michalski [michal.michal...@boxever.com]
>>>>> Sent: Friday, April 24, 2015 11:04 AM Eastern Standard Time
>>>>> To: Ganelin, Ilya
>>>>> Cc: Spico Florin; user
>>>>> Subject: Re: Does HadoopRDD.zipWithIndex method preserve the order of the input data from Hadoop?
>>>>>
>>>>> The problem I'm facing is that I need to process lines from the input
>>>>> file in the order they're stored in the file, as they define the order of
>>>>> updates I need to apply to some data, and these updates are not
>>>>> commutative, so the order matters. Unfortunately the input is purely
>>>>> order-based - there's no timestamp per line etc. in the file - and I'd
>>>>> prefer to avoid preparing the file in advance by adding ordinals before /
>>>>> after each line. Of the approaches you suggested, the first two won't work
>>>>> as there's nothing I could sort by. I'm not sure about the third one -
>>>>> I'm just not sure what you meant there, to be honest :-)
>>>>>
>>>>> Kind regards,
>>>>> Michał Michalski,
>>>>> michal.michal...@boxever.com
>>>>>
>>>>> On 24 April 2015 at 15:48, Ganelin, Ilya <ilya.gane...@capitalone.com> wrote:
>>>>>
>>>>>> Michael - you need to sort your RDD. Check out the shuffle
>>>>>> documentation in the Spark Programming Guide. It talks about this
>>>>>> specifically.
>>>>>> You can resolve this in a couple of ways - either by collecting your
>>>>>> RDD and sorting it, by using sortBy, or by not worrying about the
>>>>>> internal ordering. You can still extract elements in order by filtering
>>>>>> on the zipped index, e.g. rdd.filter(s => s._2 < 50).sortBy(_._2)
>>>>>>
>>>>>> Sent with Good (www.good.com)
>>>>>>
>>>>>> -----Original Message-----
>>>>>> From: Michal Michalski [michal.michal...@boxever.com]
>>>>>> Sent: Friday, April 24, 2015 10:41 AM Eastern Standard Time
>>>>>> To: Spico Florin
>>>>>> Cc: user
>>>>>> Subject: Re: Does HadoopRDD.zipWithIndex method preserve the order of the input data from Hadoop?
>>>>>>
>>>>>> Of course after you do it, you probably want to call
>>>>>> repartition(somevalue) on your RDD to "get your parallelism back".
>>>>>>
>>>>>> Kind regards,
>>>>>> Michał Michalski,
>>>>>> michal.michal...@boxever.com
>>>>>>
>>>>>> On 24 April 2015 at 15:28, Michal Michalski <michal.michal...@boxever.com> wrote:
>>>>>>
>>>>>>> I did a quick test as I was curious about it too. I created a file
>>>>>>> with the numbers from 0 to 999, in order, line by line. Then I did:
>>>>>>>
>>>>>>> scala> val numbers = sc.textFile("./numbers.txt")
>>>>>>> scala> val zipped = numbers.zipWithUniqueId
>>>>>>> scala> zipped.foreach(i => println(i))
>>>>>>>
>>>>>>> The expected result, if the order was preserved, would be something
>>>>>>> like (0,0), (1,1) etc. Unfortunately, the output looks like this:
>>>>>>>
>>>>>>> (126,1)
>>>>>>> (223,2)
>>>>>>> (320,3)
>>>>>>> (1,0)
>>>>>>> (127,11)
>>>>>>> (2,10)
>>>>>>> (...)
>>>>>>>
>>>>>>> The workaround I found that works for my specific use case
>>>>>>> (relatively small input files) is explicitly setting the number of
>>>>>>> partitions to 1 when reading a single *text* file:
>>>>>>>
>>>>>>> scala> val numbers_sp = sc.textFile("./numbers.txt", 1)
>>>>>>>
>>>>>>> Then the output is exactly as I would expect.
>>>>>>>
>>>>>>> I didn't dive into the code too much, but I took a very quick look
>>>>>>> at it and figured out - correct me if I missed something, it's Friday
>>>>>>> afternoon! ;-) - that this workaround will work fine for all the input
>>>>>>> formats inheriting from org.apache.hadoop.mapred.FileInputFormat,
>>>>>>> including TextInputFormat of course - see the implementation of the
>>>>>>> getSplits() method there (
>>>>>>> http://grepcode.com/file/repo1.maven.org/maven2/org.jvnet.hudson.hadoop/hadoop-core/0.19.1-hudson-2/org/apache/hadoop/mapred/FileInputFormat.java#FileInputFormat.getSplits%28org.apache.hadoop.mapred.JobConf%2Cint%29
>>>>>>> ). The numSplits variable passed there is exactly the value you
>>>>>>> provide as the second argument to textFile, which is minPartitions.
>>>>>>> However, while *min* suggests that we can only define a minimum number
>>>>>>> of partitions and have no control over the max, from what I can see in
>>>>>>> the code, that value specifies the *exact* number of partitions in the
>>>>>>> FileInputFormat.getSplits implementation. Of course this can differ for
>>>>>>> other input formats, but in this case it should work just fine.
>>>>>>>
>>>>>>> Kind regards,
>>>>>>> Michał Michalski,
>>>>>>> michal.michal...@boxever.com
>>>>>>>
>>>>>>> On 24 April 2015 at 14:05, Spico Florin <spicoflo...@gmail.com> wrote:
>>>>>>>
>>>>>>>> Hello!
>>>>>>>> I know that HadoopRDD partitions are built based on the number of
>>>>>>>> splits in HDFS. I'm wondering if these partitions preserve the initial
>>>>>>>> order of the data in the file.
>>>>>>>> As an example, if I have an HDFS file (myTextFile) that has these
>>>>>>>> splits:
>>>>>>>>
>>>>>>>> split 0 -> line 1, ..., line k
>>>>>>>> split 1 -> line k+1, ..., line k+n
>>>>>>>> split 2 -> line k+n+1, ..., line k+n+m
>>>>>>>>
>>>>>>>> and the code
>>>>>>>>
>>>>>>>> val lines = sc.textFile("hdfs://mytextFile")
>>>>>>>> lines.zipWithIndex()
>>>>>>>>
>>>>>>>> will the order of the lines be preserved?
>>>>>>>> (line 1, zipIndex 1), ..., (line k, zipIndex k), and so on.
>>>>>>>>
>>>>>>>> I found this question on stackoverflow (
>>>>>>>> http://stackoverflow.com/questions/26046410/how-can-i-obtain-an-element-position-in-sparks-rdd)
>>>>>>>> whose answer intrigued me:
>>>>>>>> "Essentially, RDD's zipWithIndex() method seems to do this, but it
>>>>>>>> won't preserve the original ordering of the data the RDD was created
>>>>>>>> from"
>>>>>>>>
>>>>>>>> Can you please confirm that this is the correct answer?
>>>>>>>>
>>>>>>>> Thanks.
>>>>>>>> Florin
>>>>>>
>>>>>> ------------------------------
>>>>>>
>>>>>> The information contained in this e-mail is confidential and/or
>>>>>> proprietary to Capital One and/or its affiliates. The information
>>>>>> transmitted herewith is intended only for use by the individual or entity
>>>>>> to which it is addressed. If the reader of this message is not the
>>>>>> intended recipient, you are hereby notified that any review,
>>>>>> retransmission, dissemination, distribution, copying or other use of, or
>>>>>> taking of any action in reliance upon this information is strictly
>>>>>> prohibited. If you have received this communication in error, please
>>>>>> contact the sender and delete the material from your computer.
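A footnote on the interleaved ids in Michal's test earlier in the thread: zipWithUniqueId gives the k-th item of partition p the id p + k*n, where n is the number of partitions, so ids from different partitions interleave and do not reflect global file order. A plain-Scala sketch of that assignment rule (zipWithUniqueIdSim is a made-up illustration helper, not a Spark API; no Spark needed to run it):

```scala
// Simulate zipWithUniqueId's documented id assignment:
// the k-th item of partition p (out of n partitions) gets id p + k * n.
def zipWithUniqueIdSim[A](partitions: Seq[Seq[A]]): Seq[(A, Long)] = {
  val n = partitions.length
  partitions.zipWithIndex.flatMap { case (part, p) =>
    part.zipWithIndex.map { case (item, k) => (item, p + k.toLong * n) }
  }
}

// With two partitions the ids interleave, so sorting by id would not
// restore the original line order across partitions.
val parts = Seq(Seq("line1", "line2"), Seq("line3", "line4"))
println(zipWithUniqueIdSim(parts))
// List((line1,0), (line2,2), (line3,1), (line4,3))
```

zipWithIndex avoids this by assigning each item its global position instead, at the cost of triggering a Spark job to compute per-partition offsets.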
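Pulling the thread's conclusions together, a rough sketch of the approach Michal describes: tag each line with its file position via zipWithIndex, then group and sort per entity. This assumes a spark-shell session with `sc` in scope; the Update case class, the parseLine helper, the "entityId,payload" line format and the input path are all hypothetical illustrations, not details from the thread:

```scala
// Hypothetical record type and parser - the real line format isn't given
// in the thread, so a simple "entityId,payload" format is assumed here.
case class Update(entityId: String, payload: String)
def parseLine(line: String): Update = {
  val Array(id, rest) = line.split(",", 2)
  Update(id, rest)
}

// zipWithIndex tags each line with its global position; per the thread,
// index order matches file order for FileInputFormat-based inputs.
val indexed = sc.textFile("hdfs://input/updates.txt")
  .zipWithIndex()                                   // RDD[(String, Long)]
  .map { case (line, pos) => (parseLine(line), pos) }

// Group updates by entity and sort each group by file position, so the
// non-commutative updates are applied in the right order per entity.
val perEntity = indexed
  .groupBy { case (u, _) => u.entityId }
  .mapValues(_.toSeq.sortBy(_._2).map(_._1))

// Entities can now be processed in parallel, e.g. written to C* / Kafka.
perEntity.foreach { case (id, updates) =>
  updates.foreach(u => println(s"$id -> ${u.payload}"))
}
```

The groupBy shuffles the data across partitions, which also restores the parallelism that the single-partition textFile workaround gives up.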