Driver OOM while using reduceByKey

2014-05-29 Thread haitao .yao
Hi,

 I used 1g of memory for the driver java process and got an OOM error on the
driver side before reduceByKey. After analyzing the heap dump, the biggest
object is org.apache.spark.MapStatus, which occupied over 900MB of memory.

Here's my question:


1. Are there any optimization switches I can tune to avoid this? I have
already enabled output compression with spark.io.compression.codec.

2. Why do the workers send all the data back to the driver to run reduceByKey?
With the current implementation, if I use reduceByKey on TBs of data, that
will be a disaster for the driver. Maybe my assumption about the Spark
implementation is wrong.


And here's my code snippet:


```
val cntNew = spark.accumulator(0)
val cntOld = spark.accumulator(0)
val cntErr = spark.accumulator(0)

val sequenceFileUrl = args(0)
val seq = spark.sequenceFile[Text, BytesWritable](sequenceFileUrl)

val stat = seq.map(pair => convertData(
  pair._2, cntNew, cntOld, cntErr
)).reduceByKey(_ + _)

stat.saveAsSequenceFile(args(1))
```


Thanks.


-- 

haitao.yao@China


Re: Driver OOM while using reduceByKey

2014-05-29 Thread Matei Zaharia
That hash map is just a list of where each task ran; it's not the actual data.
How many map and reduce tasks do you have? Maybe you need to give the driver a
bit more memory, or use fewer tasks (e.g. do reduceByKey(_ + _, 100) to use
only 100 tasks).

Matei
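To make the answer above concrete: reduceByKey merges values on the executors (first within each map partition, then across partitions after the shuffle), so only per-task metadata such as MapStatus reaches the driver. Here is a plain-Scala sketch of that merge pattern; no Spark is involved, and the object name and sample data are purely illustrative:

```
object ReduceByKeySketch {
  // Plain-Scala model of reduceByKey(_ + _): combine per partition, then merge.
  def localReduceByKey(partitions: Seq[Seq[(String, Int)]]): Map[String, Int] = {
    // Map-side combine: each partition sums its own keys locally.
    val partials = partitions.map(
      _.groupBy(_._1).map { case (k, vs) => k -> vs.map(_._2).sum }
    )
    // Reduce side: merge the per-partition partial sums.
    partials.reduce { (m1, m2) =>
      (m1.keySet ++ m2.keySet)
        .map(k => k -> (m1.getOrElse(k, 0) + m2.getOrElse(k, 0)))
        .toMap
    }
  }

  def main(args: Array[String]): Unit = {
    val partitions = Seq(
      Seq(("a", 1), ("b", 2), ("a", 3)), // partition 0
      Seq(("b", 4), ("c", 5))            // partition 1
    )
    println(localReduceByKey(partitions)) // sums per key: a -> 4, b -> 6, c -> 5
  }
}
```

In the real job, the second argument in reduceByKey(_ + _, 100) caps how many reduce tasks the shuffle uses; with fewer tasks there are fewer MapStatus entries for the driver to track.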



Re: Driver OOM while using reduceByKey

2014-05-29 Thread haitao .yao
Thanks, it worked.





-- 
haitao.yao@Beijing