Actually, the program hangs even on a plain dataAllRDD.count(). I suspect that creating the RDD does not succeed when its elements are too big. With nY = 3000, dataAllRDD.count() works (each element of dataAll is 3000 * 400 * 64 bits = 9.6 MB), but with nY = 4000 it hangs (4000 * 400 * 64 bits = 12.8 MB).
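The size arithmetic above can be checked with a small standalone Scala sketch. It compares the raw payload of one dataAll element (nY * nX doubles at 8 bytes each) with the size of that element after plain Java serialization, which adds per-array headers on top of the raw data. Using plain Java serialization here is an assumption, a stand-in for however Spark actually ships the data. The names ElementSize, rawBytes and serializedBytes are hypothetical helpers, not part of Spark.

```scala
import java.io.{ByteArrayOutputStream, ObjectOutputStream}

// Hypothetical helper object for estimating the size of one dataAll element.
object ElementSize {
  // Raw payload: nY rows of nX doubles, 64 bits (8 bytes) per double.
  def rawBytes(nY: Int, nX: Int): Long = nY.toLong * nX * 8L

  // Assumption: plain Java serialization as a stand-in for Spark's task serialization.
  // Builds one (key, Array[Array[Double]]) element and returns its serialized size in bytes.
  def serializedBytes(nY: Int, nX: Int): Int = {
    val elem = (1, Array.fill(nY)(Array.fill(nX)(1.0)))
    val bytes = new ByteArrayOutputStream()
    val out = new ObjectOutputStream(bytes)
    out.writeObject(elem)
    out.close()
    bytes.size()
  }

  def main(args: Array[String]): Unit = {
    // Compare the working case (3000) with the hanging cases (4000, 5000).
    for (nY <- Seq(3000, 4000, 5000)) {
      val mb = rawBytes(nY, 400) / 1e6
      println(f"nY = $nY%d: raw $mb%.1f MB, serialized ${serializedBytes(nY, 400)}%d bytes")
    }
  }
}
```

For nY = 5000 the raw payload alone is 16 MB, which is in the same range as the 16154060-byte serialized tasks in the quoted log.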
What are the limiting factors to the size of the elements of an RDD?


sparkuser2345 wrote
> I have an array 'dataAll' of key-value pairs where each value is an array
> of arrays. I would like to parallelize a task over the elements of
> 'dataAll' to the workers. In the dummy example below, the number of
> elements in 'dataAll' is 3, but in the real application it would be tens
> to hundreds.
>
> Without parallelizing dataAll, 'result' is calculated in less than a
> second:
>
> import org.jblas.DoubleMatrix
>
> val nY = 5000
> val nX = 400
>
> val dataAll = Array((1, Array.fill(nY)(Array.fill(nX)(1.0))),
>                     (2, Array.fill(nY)(Array.fill(nX)(1.0))),
>                     (3, Array.fill(nY)(Array.fill(nX)(1.0))))
>
> val w1 = DoubleMatrix.ones(400)
>
> // This finishes in less than a second:
> val result = dataAll.map { dat =>
>   val c = dat._1
>   val dataArr = dat._2
>   // Map over the Arrays within dataArr:
>   val test = dataArr.map { arr =>
>     val test2 = new DoubleMatrix(arr.length, 1, arr:_*)
>     val out = test2.dot(w1)
>     out
>   }
>   (c, test)
> }
>
> However, when I parallelize dataAll, the same task freezes:
>
> val dataAllRDD = sc.parallelize(dataAll, 3)
>
> // This doesn't finish in several minutes:
> val result = dataAllRDD.map { dat =>
>   val c = dat._1
>   val dataArr = dat._2
>   // Map over the Arrays within dataArr:
>   val test = dataArr.map { arr =>
>     val test2 = new DoubleMatrix(arr.length, 1, arr:_*)
>     val out = test2.dot(w1)
>     out
>   }
>   (c, test)
> }.collect
>
> After sending the above task, nothing is written to the worker logs (as
> viewed through the web UI), but the following output is printed in the
> Spark shell where I'm running the task:
>
> 14/08/11 18:17:31 INFO SparkContext: Starting job: collect at <console>:33
> 14/08/11 18:17:31 INFO DAGScheduler: Got job 0 (collect at <console>:33) with 3 output partitions (allowLocal=false)
> 14/08/11 18:17:31 INFO DAGScheduler: Final stage: Stage 0 (collect at <console>:33)
> 14/08/11 18:17:31 INFO DAGScheduler: Parents of final stage: List()
> 14/08/11 18:17:31 INFO DAGScheduler: Missing parents: List()
> 14/08/11 18:17:31 INFO DAGScheduler: Submitting Stage 0 (MappedRDD[1] at map at <console>:23), which has no missing parents
> 14/08/11 18:17:32 INFO DAGScheduler: Submitting 3 missing tasks from Stage 0 (MappedRDD[1] at map at <console>:23)
> 14/08/11 18:17:32 INFO TaskSchedulerImpl: Adding task set 0.0 with 3 tasks
> 14/08/11 18:17:32 INFO TaskSetManager: Starting task 0.0:0 as TID 0 on executor 2: <executor_2_IP> (PROCESS_LOCAL)
> 14/08/11 18:17:32 INFO TaskSetManager: Serialized task 0.0:0 as 16154060 bytes in 69 ms
> 14/08/11 18:17:32 INFO TaskSetManager: Starting task 0.0:1 as TID 1 on executor 1: <executor_1_IP> (PROCESS_LOCAL)
> 14/08/11 18:17:32 INFO TaskSetManager: Serialized task 0.0:1 as 16154060 bytes in 81 ms
> 14/08/11 18:17:32 INFO TaskSetManager: Starting task 0.0:2 as TID 2 on executor 0: <executor_0_IP> (PROCESS_LOCAL)
> 14/08/11 18:17:32 INFO TaskSetManager: Serialized task 0.0:2 as 16154060 bytes in 66 ms
>
> dataAllRDD.map does work with smaller arrays, though (e.g. nY = 100
> finishes in less than a second).
>
> Why is dataAllRDD.map so much slower than dataAll.map, or even not
> executing at all?
>
> The Spark version I'm using is 0.9.0.

--
View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Parallelizing-a-task-makes-it-freeze-tp11900p11967.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.