Actually, the program hangs just from calling dataAllRDD.count(). I suspect
that creating the RDD fails when its elements are too big. When nY =
3000, dataAllRDD.count() works (each element of dataAll is 3000*400*64 bits =
9.6 MB), but when nY = 4000, it hangs (4000*400*64 bits = 12.8 MB).
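
The element sizes above follow from nY * nX doubles at 8 bytes (64 bits) each. The sketch below counts only the raw array payload and ignores JVM object headers and serialization overhead. `elementBytes` and `elementMB` are hypothetical helper names, and MB here means 10^6 bytes, as in the figures above:

```scala
object ElementSize {
  // Raw payload of one dataAll value: nY rows of nX doubles, 8 bytes (64 bits) each.
  // Ignores per-array JVM headers and serialization overhead.
  def elementBytes(nY: Int, nX: Int): Long = nY.toLong * nX * 8L

  // Decimal megabytes (1 MB = 1,000,000 bytes), matching the figures in the text.
  def elementMB(nY: Int, nX: Int): Double = elementBytes(nY, nX) / 1e6

  def main(args: Array[String]): Unit = {
    // nX = 400 as in the quoted example; nY values taken from this thread.
    for (nY <- Seq(100, 3000, 4000, 5000)) {
      println(f"nY = $nY%5d -> ${elementMB(nY, 400)}%.1f MB")
    }
  }
}
```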

What are the limiting factors on the size of the elements of an RDD?

sparkuser2345 wrote:
> I have an array 'dataAll' of key-value pairs where each value is an array
> of arrays. I would like to parallelize a task over the elements of
> 'dataAll' to the workers. In the dummy example below, the number of
> elements in 'dataAll' is 3, but in the real application it would be tens to
> hundreds.
> Without parallelizing dataAll, 'result' is calculated in less than a
> second: 
> import org.jblas.DoubleMatrix  
> val nY = 5000
> val nX = 400
> val dataAll = Array((1, Array.fill(nY)(Array.fill(nX)(1.0))),
>                     (2, Array.fill(nY)(Array.fill(nX)(1.0))),
>                     (3, Array.fill(nY)(Array.fill(nX)(1.0))))
> val w1 = DoubleMatrix.ones(400)
> // This finishes in less than a second: 
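> val result = dataAll.map { dat =>
>   val c       = dat._1
>   val dataArr = dat._2
>   // Map over the Arrays within dataArr:
>   val test = dataArr.map { arr =>
>     // Wrap the row as an arr.length x 1 column vector and take its dot product with w1
>     val test2 = new DoubleMatrix(arr.length, 1, arr:_*)
>     val out = test2.dot(w1)
>     out
>   }
>   (c, test)
> }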
> However, when I parallelize dataAll, the same task freezes: 
> val dataAllRDD = sc.parallelize(dataAll, 3)
> // This doesn't finish in several minutes: 
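> val result = dataAllRDD.map { dat =>
>   val c       = dat._1
>   val dataArr = dat._2
>   // Map over the Arrays within dataArr:
>   val test = dataArr.map { arr =>
>     // Wrap the row as an arr.length x 1 column vector and take its dot product with w1
>     val test2 = new DoubleMatrix(arr.length, 1, arr:_*)
>     val out = test2.dot(w1)
>     out
>   }
>   (c, test)
> }.collect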
> After submitting the above task, nothing is written to the worker logs (as
> viewed through the web UI), but the following output is printed in the
> Spark shell where I'm running the task:
> 14/08/11 18:17:31 INFO SparkContext: Starting job: collect at <console>:33
> 14/08/11 18:17:31 INFO DAGScheduler: Got job 0 (collect at <console>:33) with 3 output partitions (allowLocal=false)
> 14/08/11 18:17:31 INFO DAGScheduler: Final stage: Stage 0 (collect at <console>:33)
> 14/08/11 18:17:31 INFO DAGScheduler: Parents of final stage: List()
> 14/08/11 18:17:31 INFO DAGScheduler: Missing parents: List()
> 14/08/11 18:17:31 INFO DAGScheduler: Submitting Stage 0 (MappedRDD[1] at map at <console>:23), which has no missing parents
> 14/08/11 18:17:32 INFO DAGScheduler: Submitting 3 missing tasks from Stage 0 (MappedRDD[1] at map at <console>:23)
> 14/08/11 18:17:32 INFO TaskSchedulerImpl: Adding task set 0.0 with 3 tasks
> 14/08/11 18:17:32 INFO TaskSetManager: Starting task 0.0:0 as TID 0 on executor 2: <executor_2_IP>
> 14/08/11 18:17:32 INFO TaskSetManager: Serialized task 0.0:0 as 16154060 bytes in 69 ms
> 14/08/11 18:17:32 INFO TaskSetManager: Starting task 0.0:1 as TID 1 on executor 1: <executor_1_IP>
> 14/08/11 18:17:32 INFO TaskSetManager: Serialized task 0.0:1 as 16154060 bytes in 81 ms
> 14/08/11 18:17:32 INFO TaskSetManager: Starting task 0.0:2 as TID 2 on executor 0: <executor_0_IP>
> 14/08/11 18:17:32 INFO TaskSetManager: Serialized task 0.0:2 as 16154060 bytes in 66 ms
> dataAllRDD.map does work with smaller arrays, though (e.g. with nY = 100 it
> finishes in less than a second).
> Why is dataAllRDD.map so much slower than dataAll.map, or even not
> executing at all?
> The Spark version I'm using is 0.9.0.
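
In the quoted run (nY = 5000, nX = 400), sc.parallelize(dataAll, 3) spreads three elements over three slices, so each task should carry one whole element. The sketch below compares that element's raw payload with the 16154060-byte serialized task size printed in the log. The even, contiguous split in `sliceSizes` is an assumption about how the slices are formed, and both helper names are hypothetical. The leftover bytes are presumably serialization and closure overhead, which this arithmetic cannot attribute precisely:

```scala
object TaskPayload {
  // Assumed even, contiguous split of `length` elements into `numSlices` slices:
  // slice i covers indices [i * length / numSlices, (i + 1) * length / numSlices).
  def sliceSizes(length: Int, numSlices: Int): Seq[Int] =
    (0 until numSlices).map { i =>
      val start = (i.toLong * length / numSlices).toInt
      val end = ((i + 1).toLong * length / numSlices).toInt
      end - start
    }

  // Raw payload of one element: nY rows of nX doubles, 8 bytes each.
  def elementBytes(nY: Int, nX: Int): Long = nY.toLong * nX * 8L

  def main(args: Array[String]): Unit = {
    val nY = 5000
    val nX = 400
    // Taken from the log line "Serialized task 0.0:0 as 16154060 bytes".
    val loggedTaskBytes = 16154060L

    val perSlice = sliceSizes(length = 3, numSlices = 3)
    val rawPerTask = perSlice.map(_ * elementBytes(nY, nX))

    println(s"elements per slice: ${perSlice.mkString(", ")}")
    println(s"raw payload per task: ${rawPerTask.mkString(", ")} bytes")
    println(s"logged minus raw: ${loggedTaskBytes - rawPerTask.head} bytes")
  }
}
```

Under these assumptions the raw data accounts for almost all of the logged task size, which fits the observation that the hang depends on how large each element is.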

Sent from the Apache Spark User List mailing list archive.
