Re: Task not serializable: java.io.NotSerializableException: org.apache.spark.SparkContext
Which code did you use? Is the problem caused by your own code, or by something in Spark itself?

On Tue, Jul 22, 2014 at 8:50 AM, hsy...@gmail.com wrote:

> I have the same problem.

On Sat, Jul 19, 2014 at 12:31 AM, lihu lihu...@gmail.com wrote:

> Hi, everyone. I have the following piece of code. When I run it, the error below occurs. It seems that the SparkContext is not serializable, but I do not use the SparkContext for anything except the broadcast. [In fact, this code is in MLlib; I only added the broadcast of the centerArrays.] It succeeds at the reduceByKey operation but fails at the collect operation, which confuses me.
>
>     INFO DAGScheduler: Failed to run collect at KMeans.scala:235
>     [error] (run-main-0) org.apache.spark.SparkException: Job aborted: Task not serializable: java.io.NotSerializableException: org.apache.spark.SparkContext
>     org.apache.spark.SparkException: Job aborted: Task not serializable: java.io.NotSerializableException: org.apache.spark.SparkContext
>         at org.apache.spark.scheduler.DAGScheduler$$anonfun$org$apache$spark$scheduler$DAGScheduler$$abortStage$1.apply(DAGScheduler.scala:1028)
>         at org.apache.spark.scheduler.DAGScheduler$$anonfun$org$apache$spark$scheduler$DAGScheduler$$abortStage$1.apply(DAGScheduler.scala:1026)
>
>     private def initKMeansParallel(data: RDD[Array[Double]]): Array[ClusterCenters] = {
>       @transient val sc = data.sparkContext // I tried adding the @transient annotation here, but it doesn't work
>
>       // Initialize each run's center to a random point
>       val seed = new XORShiftRandom().nextInt()
>       val sample = data.takeSample(true, runs, seed).toSeq
>       val centers = Array.tabulate(runs)(r => ArrayBuffer(sample(r)))
>
>       // On each step, sample 2 * k points on average for each run with probability proportional
>       // to their squared distance from that run's current centers
>       for (step <- 0 until initializationSteps) {
>         val centerArrays = sc.broadcast(centers.map(_.toArray))
>         val sumCosts = data.flatMap { point =>
>           for (r <- 0 until runs) yield (r, KMeans.pointCost(centerArrays.value(r), point))
>         }.reduceByKey(_ + _).collectAsMap() // can pass at this point
>         val chosen = data.mapPartitionsWithIndex { (index, points) =>
>           val rand = new XORShiftRandom(seed ^ (step << 16) ^ index)
>           for {
>             p <- points
>             r <- 0 until runs
>             if rand.nextDouble() < KMeans.pointCost(centerArrays.value(r), p) * 2 * k / sumCosts(r)
>           } yield (r, p)
>         }.collect() // failed at this point
>         for ((r, p) <- chosen) {
>           centers(r) += p
>         }
>       }
>
> --
> Best Wishes!
> Li Hu(李浒) | Graduate Student
> Institute for Interdisciplinary Information Sciences (IIIS, http://iiis.tsinghua.edu.cn/)
> Tsinghua University, China
> Email: lihu...@gmail.com
> Tel: +86 15120081920
> Homepage: http://iiis.tsinghua.edu.cn/zh/lihu/
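For anyone hitting the same error: the usual cause is that a closure passed to an RDD operation references a field of its enclosing class, so Java serialization tries to write out the whole enclosing instance, SparkContext field included (which is why `@transient` on a local `val` does not help by itself). Here is a minimal sketch of that mechanism in plain Java; all class names are invented for illustration, and `Context` merely stands in for SparkContext:

```java
import java.io.*;

public class CaptureDemo implements Serializable {
    // Stand-in for SparkContext: deliberately not Serializable.
    static class Context { }

    final Context ctx = new Context();
    final int[] centers = {1, 2, 3};

    // An anonymous Serializable object created in an instance method keeps a
    // hidden reference to `this` because it touches an outer field, so
    // serializing it drags in the enclosing instance, Context included.
    Serializable badTask() {
        return new Serializable() {
            int cost() { return centers.length; } // forces capture of `this`
        };
    }

    // The usual fix: copy what the task needs into a local variable first,
    // so the serialized object holds only the data it actually uses.
    Serializable goodTask() {
        final int[] localCenters = centers;
        return (Runnable & Serializable) () -> {
            int unused = localCenters.length;
        };
    }

    static boolean serializes(Object o) {
        try (ObjectOutputStream out =
                 new ObjectOutputStream(new ByteArrayOutputStream())) {
            out.writeObject(o);
            return true;
        } catch (NotSerializableException e) {
            System.out.println("NotSerializableException: " + e.getMessage());
            return false;
        } catch (IOException e) {
            throw new RuntimeException(e);
        }
    }

    public static void main(String[] args) {
        CaptureDemo demo = new CaptureDemo();
        System.out.println("bad task serializes:  " + serializes(demo.badTask()));
        System.out.println("good task serializes: " + serializes(demo.goodTask()));
    }
}
```

Running this, `badTask()` fails with a NotSerializableException naming the nested Context class, while `goodTask()` serializes cleanly; the same local-copy trick is the standard workaround in Spark closures.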
Re: Task not serializable: java.io.NotSerializableException: org.apache.spark.SparkContext
You can set the Java option -Dsun.io.serialization.extendedDebugInfo=true to have more information about the offending object printed. It will help you trace down how the SparkContext is getting included in some kind of closure.

TD

On Thu, Jul 24, 2014 at 9:48 AM, lihu lihu...@gmail.com wrote:

> Which code did you use? Is the problem caused by your own code, or by something in Spark itself?
>
> [...]
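If you cannot restart the JVM with that flag, a similar (if coarser) trace can be produced in code: ObjectOutputStream will call replaceObject() for every object it visits once you opt in with enableReplaceObject(true), which shows exactly what a task drags into serialization. A small sketch, with invented class names standing in for a task and its captured state:

```java
import java.io.*;
import java.util.*;

// A tiny tracing stream: records the class of every object the serializer
// visits. The -Dsun.io.serialization.extendedDebugInfo=true flag gives a
// richer trace (including field paths) with no code changes.
class TracingStream extends ObjectOutputStream {
    final List<String> visited = new ArrayList<>();

    TracingStream(OutputStream out) throws IOException {
        super(out);
        enableReplaceObject(true); // opt in to replaceObject() callbacks
    }

    @Override
    protected Object replaceObject(Object obj) {
        visited.add(obj.getClass().getName()); // record, don't replace
        return obj;
    }
}

public class TraceDemo {
    static class Center implements Serializable { }

    static class Task implements Serializable {
        Center center = new Center(); // pulled into the stream with Task
    }

    public static void main(String[] args) throws IOException {
        try (TracingStream out = new TracingStream(new ByteArrayOutputStream())) {
            out.writeObject(new Task());
            System.out.println("objects serialized: " + out.visited);
        }
    }
}
```

The printed list shows both the Task and the Center it references; an unexpected entry in such a list (a SparkContext, say) points at the field being captured.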
Re: Task not serializable: java.io.NotSerializableException: org.apache.spark.SparkContext
I have the same problem.

On Sat, Jul 19, 2014 at 12:31 AM, lihu lihu...@gmail.com wrote:

> Hi, everyone. I have the following piece of code. When I run it, the error below occurs. It seems that the SparkContext is not serializable, but I do not use the SparkContext for anything except the broadcast. [In fact, this code is in MLlib; I only added the broadcast of the centerArrays.] It succeeds at the reduceByKey operation but fails at the collect operation, which confuses me.
>
> [...]