Hi,

I'm not exactly sure what your code looks like, but this seems to be the correct
behaviour: if the total size of the task results that the driver fetches exceeds
spark.driver.maxResultSize, the driver throws this exception.
(See
https://github.com/apache/spark/blob/master/core/src/main/scala/org/apache/spark/scheduler/TaskResultGetter.scala#L68
)
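For reference, RDD#reduce sends each partition's partial result back to the driver
as a task result and merges them there, and the limit above is checked against the
total size of all task results for the stage. A rough sketch of what I think your
pattern looks like (Model, train, and merge are just placeholders here, not your
actual classes):

  // Each task trains one model on its partition...
  val models = data.mapPartitions(iter => Iterator(train(iter)))
  // ...and reduce ships every partition's model back to the driver,
  // which merges the incoming results locally with the given function.
  val merged = models.reduce((a, b) => a.merge(b))
  // With 4 partitions and ~330 MB per model, the driver receives ~1.3 GB of
  // serialized task results, which exceeds the 1 GB default limit.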
So, in your case, the driver tries to fetch 1345.5 MB of data (the 4 models)
from the executors, and the job fails.
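If you do need the driver to receive that much data, you can simply raise the
limit, for example (a sketch; adjust the value to your model sizes):

  import org.apache.spark.{SparkConf, SparkContext}

  val conf = new SparkConf()
    .set("spark.driver.maxResultSize", "2g")  // default is 1g; "0" means unlimited
  val sc = new SparkContext(conf)

  // or equivalently on the command line:
  //   spark-submit --conf spark.driver.maxResultSize=2g ...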
Thanks,

On Sat, Mar 5, 2016 at 6:11 AM, James Jia <james...@berkeley.edu> wrote:

> I'm running a distributed KMeans algorithm with 4 executors.
>
> I have an RDD[Data]. I use mapPartitions to run a learner on each data 
> partition, and then call reduce with my custom model-merge function to 
> combine the per-partition models before starting a new iteration.
>
> The model size is around 330 MB. I would expect the memory required for the 
> serialized results at the driver to be at most 2*330 MB, since only two models 
> need to be reduced at a time, but it looks like Spark is serializing all of the 
> models to the driver before reducing.
>
> The error message says that the total size of the serialized results is 
> 1345.5 MB, which is approximately 4 * 330 MB. I know I can set the driver's 
> max result size, but I just want to confirm that this is expected behavior.
>
> Thanks!
>
> James
>
> [Stage 0:==============>                                            (1 + 3) / 4]
> 16/02/19 05:59:28 ERROR TaskSetManager: Total size of serialized results of 
> 4 tasks (1345.5 MB) is bigger than spark.driver.maxResultSize (1024.0 MB)
>
> org.apache.spark.SparkException: Job aborted due to stage failure: Total size 
> of serialized results of 4 tasks (1345.5 MB) is bigger than 
> spark.driver.maxResultSize (1024.0 MB)
>
>   at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1283)
>   at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1271)
>   at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1270)
>   at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
>   at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
>   at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1270)
>   at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:697)
>   at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:697)
>   at scala.Option.foreach(Option.scala:257)
>   at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:697)
>   at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:1496)
>   at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1458)
>   at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1447)
>   at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48)
>   at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:567)
>   at org.apache.spark.SparkContext.runJob(SparkContext.scala:1824)
>   at org.apache.spark.SparkContext.runJob(SparkContext.scala:1944)
>   at org.apache.spark.rdd.RDD$$anonfun$reduce$1.apply(RDD.scala:1007)
>   at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:147)
>   at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:108)
>   at org.apache.spark.rdd.RDD.withScope(RDD.scala:310)
>   at org.apache.spark.rdd.RDD.reduce(RDD.scala:989)
>   at BIDMach.RunOnSpark$.runOnSpark(RunOnSpark.scala:50)
>   ... 50 elided
>
>


-- 
---
Takeshi Yamamuro
