Thanks Matei. We have tested the fix and it's working perfectly.

Andrew, we set spark.shuffle.spill=false, but then the application runs out of memory. I think that is expected.
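For completeness, the setting was applied roughly like this (a minimal sketch - the app name is made up, and it assumes the flag is set on the SparkConf before the SparkContext is created):

    import org.apache.spark.{SparkConf, SparkContext}

    val sparkConf = new SparkConf().setAppName("JoinTest") // hypothetical app name
    // Disable shuffle spilling, so the ExternalAppendOnlyMap code path that
    // Matei fixed is bypassed. Everything must then fit in memory, which is
    // presumably why we hit OOM with 10g executors.
    sparkConf.set("spark.shuffle.spill", "false")
    val sc = new SparkContext(sparkConf)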
Regards,
Ajay

On Friday, June 6, 2014 3:49 AM, Andrew Ash <and...@andrewash.com> wrote:

Hi Ajay,

Can you please try running the same code with spark.shuffle.spill=false and see if the numbers turn out correctly? That parameter controls whether or not the buggy code that Matei fixed in ExternalAppendOnlyMap is used.

FWIW, I saw similar issues in 0.9.0 but no longer in 0.9.1, after some fixes to spilling landed, I think.

Andrew

On Thu, Jun 5, 2014 at 3:05 PM, Matei Zaharia <matei.zaha...@gmail.com> wrote:

Hey Ajay, thanks for reporting this. There was indeed a bug, specifically in the way join tasks spill to disk (which happened when you had more concurrent tasks competing for memory). I've posted a patch for it here: https://github.com/apache/spark/pull/986. Feel free to try that if you'd like; it will also be in 0.9.2 and 1.0.1.

Matei

On Jun 5, 2014, at 12:19 AM, Ajay Srivastava <a_k_srivast...@yahoo.com> wrote:

Sorry for replying late. It was night here.

Lian/Matei,
Here is the code snippet (imports and the SparkConf construction added here for completeness) -

    import org.apache.spark.{SparkConf, SparkContext}
    import org.apache.spark.SparkContext._ // pair-RDD operations such as join

    val sparkConf = new SparkConf() // added; the original snippet assumed an existing conf
    sparkConf.set("spark.executor.memory", "10g")
    sparkConf.set("spark.cores.max", "5")

    val sc = new SparkContext(sparkConf)

    // Key each record by its first field; keep the full line as the value.
    val accId2LocRDD = sc
      .textFile("hdfs://bbr-dev178:9000/data/subDbSpark/account2location")
      .map(getKeyValueFromString(_, 0, ',', true))

    val accId2DemoRDD = sc
      .textFile("hdfs://bbr-dev178:9000/data/subDbSpark/account2demographic_planType")
      .map(getKeyValueFromString(_, 0, ',', true))

    val joinedRDD = accId2LocRDD.join(accId2DemoRDD)

    def getKeyValueFromString(line: String, keyIndex: Int, delimit: Char,
        retFullLine: Boolean): (String, String) = {
      val splits = line.split(delimit)
      if (splits.length <= 1) {
        (null, null) // malformed line: no delimiter found
      } else if (retFullLine) {
        (splits(keyIndex), line) // keep the whole line as the value
      } else {
        (splits(keyIndex), splits(splits.length - keyIndex - 1))
      }
    }

Both files have 10 M records with the same set of unique keys. Each file is nearly 280 MB and the HDFS block size is 256 MB, so each file is read as two splits. The output of the join should contain 10 M records.

We have done some more experiments -
1) Running cogroup instead of join - it also gives an incorrect count.
2) Running union followed by groupByKey, then keeping only the keys with two entries in the grouped sequence (see the sketch after this list) - it also gives an incorrect count.
3) Increasing spark.executor.memory to 50g - everything works fine. The count comes out as 10 M for the join, cogroup, and union/groupByKey/filter transformations.
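Roughly, experiment 2 looks like this (a minimal sketch - the side tags and the exact filter are illustrative, not our production code; accId2LocRDD and accId2DemoRDD are the RDDs from the snippet above):

    // Tag each side so the records can be told apart after the union.
    val tagged = accId2LocRDD.mapValues(v => ("loc", v))
      .union(accId2DemoRDD.mapValues(v => ("demo", v)))

    // A key present in both inputs appears exactly twice after grouping,
    // so this count should match the join count (10 M). It does not,
    // until spark.executor.memory is raised to 50g.
    val viaGroupBy = tagged.groupByKey().filter(_._2.size == 2)
    println(viaGroupBy.count())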
I thought that 10g is enough memory for the executors, but even if the memory were insufficient, it should not result in an incorrect computation. Probably there is a problem in reconstructing RDDs when memory is not enough.

Thanks Chen for your observation. I get this problem on a single worker, so there cannot be any mismatch of jars. On two workers the code works fine, since the executor memory gets doubled.

Regards,
Ajay

On Thursday, June 5, 2014 1:35 AM, Matei Zaharia <matei.zaha...@gmail.com> wrote:

If this isn't the problem, it would be great if you could post the code for the program.

Matei

On Jun 4, 2014, at 12:58 PM, Xu (Simon) Chen <xche...@gmail.com> wrote:

Maybe your two workers have different assembly jar files? I just ran into a similar problem where my spark-shell was using a different jar file than my workers - I got really confusing results.

On Jun 4, 2014 8:33 AM, "Ajay Srivastava" <a_k_srivast...@yahoo.com> wrote:

Hi,

I am doing a join of two RDDs, and it gives a different result (counting the number of records) each time I run the code on the same input.

The input files are large enough to be divided into two splits each. When the program runs on two workers with a single core assigned to each, the output is consistent and looks correct. But when a single worker is used with two or more cores, the result seems to be random - every run, the count of joined records is different.

Does this sound like a defect, or do I need to take care of something while using join? I am using spark-0.9.1.

Regards,
Ajay
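For reference, the check that exposes the nondeterminism is simply counting the joined RDD (a minimal sketch; the RDDs are built as in the snippet earlier in the thread, and we actually re-ran the whole program rather than looping, but an uncached RDD is recomputed on every count anyway):

    // On a single worker with 2+ cores the printed counts vary from run
    // to run; on two single-core workers they are consistently 10 M.
    val joined = accId2LocRDD.join(accId2DemoRDD)
    for (i <- 1 to 3) {
      println("run " + i + ": " + joined.count())
    }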