Thanks Matei. We have tested the fix and it's working perfectly.

Andrew, we set spark.shuffle.spill=false but the application goes out of 
memory. I think that is expected.

Regards,Ajay 


On Friday, June 6, 2014 3:49 AM, Andrew Ash <and...@andrewash.com> wrote:
 


Hi Ajay,

Can you please try running the same code with spark.shuffle.spill=false and see 
if the numbers turn out correctly?  That parameter controls whether or not the 
buggy code that Matei fixed in ExternalAppendOnlyMap is used.

FWIW I saw similar issues in 0.9.0 but no longer in 0.9.1 after I think some 
fixes in spilling landed.

Andrew



On Thu, Jun 5, 2014 at 3:05 PM, Matei Zaharia <matei.zaha...@gmail.com> wrote:

Hey Ajay, thanks for reporting this. There was indeed a bug, specifically in 
the way join tasks spill to disk (which happened when you had more concurrent 
tasks competing for memory). I’ve posted a patch for it here: 
https://github.com/apache/spark/pull/986. Feel free to try that if you’d like; 
it will also be in 0.9.2 and 1.0.1.
>
>
>Matei
>
>
>On Jun 5, 2014, at 12:19 AM, Ajay Srivastava <a_k_srivast...@yahoo.com> wrote:
>
>Sorry for replying late. It was night here.
>>
>>
>>Lian/Matei,
>>Here is the code snippet -
>>    sparkConf.set("spark.executor.memory", "10g")
>>    sparkConf.set("spark.cores.max", "5")
>>    
>>    val sc = new SparkContext(sparkConf)
>>    
>>    val accId2LocRDD = 
>>sc.textFile("hdfs://bbr-dev178:9000/data/subDbSpark/account2location").map(getKeyValueFromString(_,
>> 0, ',', true))
>>      
>>    val accId2DemoRDD = 
>>sc.textFile("hdfs://bbr-dev178:9000/data/subDbSpark/account2demographic_planType").map(getKeyValueFromString(_,
>> 0, ',', true))
>>    
>>    val joinedRDD = accId2LocRDD.join(accId2DemoRDD)
>>
>>
>>  def getKeyValueFromString(line: String, keyIndex: Int, delimit: Char, 
>>retFullLine: Boolean): Tuple2[String, String] = {
>>    val splits = line.split(delimit)
>>    if (splits.length <= 1) {
>>      (null, null)
>>    } else if (retFullLine) {
>>      (splits(keyIndex), line)
>>    } else{
>>        (splits(keyIndex), splits(splits.length-keyIndex-1))
>>    }
>>  }
>>
>>    
>>
>>Both of these files have 10 M records with same unique keys. Size of the file 
>>is nearly 280 MB and block size in hdfs is 256 MB. The output of join should 
>>contain 10 M records.
>>
>>
>>
>>We have done some more experiments -
>>1) Running cogroup instead of join - it also gives incorrect count.
>>2) Running union followed by groupbykey and then filtering records with two 
>>entries in sequence - It also gives incorrect count.
>>3) Increase spark.executor.memory to 50 g and everything works fine. Count 
>>comes 10 M for join,cogroup and union/groupbykey/filter transformations.
>>
>>
>>
>>I thought that 10g is enough memory for executors but even if the memory is 
>>less it should not result in incorrect computation. Probably there is a 
>>problem in reconstructing RDDs when memory is not enough. 
>>
>>
>>
>>Thanks Chen for your observation. I get this problem on single worker so 
>>there will not be any mismatch of jars. On two workers, since executor memory 
>>gets doubled the code works fine.
>>
>>
>>
>>Regards,
>>Ajay
>>
>>
>>
>>
>>On Thursday, June 5, 2014 1:35 AM, Matei Zaharia <matei.zaha...@gmail.com> 
>>wrote:
>> 
>>
>>
>>If this isn’t the problem, it would be great if you can post the code for the 
>>program.
>>
>>
>>Matei
>>
>>
>>
>>On Jun 4, 2014, at 12:58 PM, Xu (Simon) Chen <xche...@gmail.com> wrote:
>>
>>Maybe your two workers have different assembly jar files?
>>>I just ran into a similar problem that my spark-shell is using a different 
>>>jar file than my workers - got really confusing results.
>>>On Jun 4, 2014 8:33 AM, "Ajay Srivastava" <a_k_srivast...@yahoo.com> wrote:
>>>
>>>Hi,
>>>>
>>>>
>>>>I am doing join of two RDDs which giving different results ( counting 
>>>>number of records ) each time I run this code on same input.
>>>>
>>>>
>>>>The input files are large enough to be divided in two splits. When the 
>>>>program runs on two workers with single core assigned to these, output is 
>>>>consistent and looks correct. But when single worker is used with two or 
>>>>more than two cores, the result seems to be random. Every time, count of 
>>>>joined record is different.
>>>>
>>>>
>>>>Does this sound like a defect or I need to take care of something while 
>>>>using join ? I am using spark-0.9.1.
>>>>
>>>>
>>>>
>>>>Regards
>>>>Ajay
>>
>>
>>
>

Reply via email to