Driver OOM while using reduceByKey
Hi,

I gave the driver JVM 1g of memory and got an OOM error on the driver side before reduceByKey ran. After analyzing the heap dump, the biggest object is org.apache.spark.MapStatus, which occupied over 900MB of memory. Here are my questions:

1. Are there any optimization switches I can tune to avoid this? I have already enabled compression on the output with spark.io.compression.codec.
2. Why do the workers send all the data back to the driver to run reduceByKey? With the current implementation, if I use reduceByKey on TBs of data, that will be a disaster for the driver. Maybe my assumption about the Spark implementation is wrong.

And here's my code snippet:

```
val cntNew = spark.accumulator(0)
val cntOld = spark.accumulator(0)
val cntErr = spark.accumulator(0)

val sequenceFileUrl = args(0)
val seq = spark.sequenceFile[Text, BytesWritable](sequenceFileUrl)
val stat = seq
  .map(pair => convertData(pair._2, cntNew, cntOld, cntErr))
  .reduceByKey(_ + _)
stat.saveAsSequenceFile(args(1))
```

Thanks.

--
haitao.yao@China
Re: Driver OOM while using reduceByKey
That hash map is just a list of where each task ran; it's not the actual data. How many map and reduce tasks do you have? Maybe you need to give the driver a bit more memory, or use fewer tasks (e.g. do reduceByKey(_ + _, 100) to use only 100 tasks).

Matei

On May 29, 2014, at 2:03 AM, haitao .yao yao.e...@gmail.com wrote:
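Matei's point about driver-side bookkeeping can be illustrated with rough arithmetic. The driver keeps one MapStatus per map task, each recording a size estimate for every reduce partition, so the footprint grows with mapTasks × reducePartitions. The task counts and per-entry cost below are hypothetical, chosen only to show why capping the number of reduce tasks shrinks this so dramatically:

```scala
object MapStatusFootprint {
  // Rough model of driver-side shuffle bookkeeping: one MapStatus per
  // map task, each holding a size estimate per reduce partition.
  // bytesPerEntry is a hypothetical per-entry cost, not a measured figure.
  def approxBytes(mapTasks: Long, reducePartitions: Long, bytesPerEntry: Long = 8L): Long =
    mapTasks * reducePartitions * bytesPerEntry

  def main(args: Array[String]): Unit = {
    // Hypothetical task counts for illustration only.
    println(f"50k maps x 50k reduces: ${approxBytes(50000L, 50000L) / 1e9}%.1f GB")
    // Capping reduce tasks, as in reduceByKey(_ + _, 100), shrinks this:
    println(f"50k maps x 100 reduces: ${approxBytes(50000L, 100L) / 1e6}%.1f MB")
  }
}
```

The product structure is the key point: halving either factor halves the driver's bookkeeping, and capping reduce tasks at a small constant makes it linear in the number of map tasks.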
Re: Driver OOM while using reduceByKey
Thanks, it worked.

2014-05-30 1:53 GMT+08:00 Matei Zaharia matei.zaha...@gmail.com:

--
haitao.yao@Beijing
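For anyone skimming the thread later: reduceByKey aggregates per key on the executors rather than shipping data to the driver. Its semantics can be modeled locally on plain Scala collections; this is only a sketch of what it computes, not how Spark executes it, and the extra numPartitions argument from Matei's suggestion only controls shuffle parallelism, never the result:

```scala
object LocalReduceByKey {
  // Local model of reduceByKey's semantics: group pairs by key and
  // fold each group's values with the supplied associative function.
  def reduceByKey[K, V](pairs: Seq[(K, V)])(f: (V, V) => V): Map[K, V] =
    pairs.groupBy(_._1).map { case (k, vs) => k -> vs.map(_._2).reduce(f) }

  def main(args: Array[String]): Unit = {
    val counts = reduceByKey(Seq(("a", 1), ("b", 2), ("a", 3)))(_ + _)
    println(counts) // Map(a -> 4, b -> 2)
  }
}
```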