Re: Task not serializable: java.io.NotSerializableException: org.apache.spark.SparkContext

2014-07-24 Thread lihu
Which code did you use? Is the problem caused by your own code or by something in
Spark itself?


On Tue, Jul 22, 2014 at 8:50 AM, hsy...@gmail.com wrote:

 I have the same problem


 On Sat, Jul 19, 2014 at 12:31 AM, lihu lihu...@gmail.com wrote:

 Hi, everyone. I have the following piece of code. When I run it,
 the error below occurs. It seems that the SparkContext is not
 serializable, but I do not use the SparkContext anywhere except for the broadcast.
 [In fact, this code is from MLlib; I just try to broadcast the
  centerArrays.]

 It succeeds at the reduceByKey operation but fails at the
 collect operation, which confuses me.


 INFO DAGScheduler: Failed to run collect at KMeans.scala:235
 [error] (run-main-0) org.apache.spark.SparkException: Job aborted: Task
 not serializable: java.io.NotSerializableException:
 org.apache.spark.SparkContext
 org.apache.spark.SparkException: Job aborted: Task not serializable:
 java.io.NotSerializableException: org.apache.spark.SparkContext
 at
 org.apache.spark.scheduler.DAGScheduler$$anonfun$org$apache$spark$scheduler$DAGScheduler$$abortStage$1.apply(DAGScheduler.scala:1028)
  at
 org.apache.spark.scheduler.DAGScheduler$$anonfun$org$apache$spark$scheduler$DAGScheduler$$abortStage$1.apply(DAGScheduler.scala:1026)




 private def initKMeansParallel(data: RDD[Array[Double]]): Array[ClusterCenters] = {

   // I tried to add the @transient annotation here, but it doesn't work
   @transient val sc = data.sparkContext

   // Initialize each run's center to a random point
   val seed = new XORShiftRandom().nextInt()
   val sample = data.takeSample(true, runs, seed).toSeq
   val centers = Array.tabulate(runs)(r => ArrayBuffer(sample(r)))

   // On each step, sample 2 * k points on average for each run with probability
   // proportional to their squared distance from that run's current centers
   for (step <- 0 until initializationSteps) {
     val centerArrays = sc.broadcast(centers.map(_.toArray))
     val sumCosts = data.flatMap { point =>
       for (r <- 0 until runs) yield (r, KMeans.pointCost(centerArrays.value(r), point))
     }.reduceByKey(_ + _).collectAsMap()
     // can pass at this point
     val chosen = data.mapPartitionsWithIndex { (index, points) =>
       val rand = new XORShiftRandom(seed ^ (step << 16) ^ index)
       for {
         p <- points
         r <- 0 until runs
         if rand.nextDouble() < KMeans.pointCost(centerArrays.value(r), p) * 2 * k / sumCosts(r)
       } yield (r, p)
     }.collect()
     // failed at this point
     for ((r, p) <- chosen) {
       centers(r) += p
     }
   }
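
For reference: the usual reason a SparkContext ends up inside a task closure is
that the closure refers to members of an enclosing object (here runs, k and
initializationSteps come from the enclosing KMeans instance), so Spark must
serialize that whole instance, and anything it transitively references goes with
it. @transient on a method-local val does not help, because @transient only
affects fields of an object that is itself being serialized. Below is a minimal,
hypothetical sketch of the common workaround, copying the needed fields into
local vals before the closure is built (the Driver class and its fields are
illustrative, not the MLlib code above):

  import org.apache.spark.SparkContext
  import org.apache.spark.rdd.RDD

  class Driver(val sc: SparkContext, val k: Int) {

    // Problematic (sketch): k is a field, so the closure captures `this`,
    // and the whole Driver (including its SparkContext) would have to be
    // serialized => Task not serializable.
    def scaledLengths(data: RDD[Array[Double]]): RDD[Double] =
      data.map(p => p.length.toDouble * k)

    // Workaround (sketch): copy the field into a local val first; only the
    // local Int is captured, not the enclosing Driver instance.
    def scaledLengthsLocal(data: RDD[Array[Double]]): RDD[Double] = {
      val localK = k
      data.map(p => p.length.toDouble * localK)
    }
  }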








-- 
*Best Wishes!*

 *Li Hu(李浒) | Graduate Student*

*Institute for Interdisciplinary Information Sciences (IIIS, http://iiis.tsinghua.edu.cn/)*
*Tsinghua University, China*

*Email: lihu...@gmail.com*
*Tel  : +86 15120081920*
*Homepage: http://iiis.tsinghua.edu.cn/zh/lihu/*


Re: Task not serializable: java.io.NotSerializableException: org.apache.spark.SparkContext

2014-07-24 Thread Tathagata Das
You can set the Java option -Dsun.io.serialization.extendedDebugInfo=true to
have more information about the offending object printed. It will help you trace
down how the SparkContext is getting included in some kind of closure.
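
For reference, a sketch of how the flag might be passed (assuming the job is
started with "sbt run", as the [error] (run-main-0) output above suggests; the
build.sbt lines below are illustrative, not from the original thread):

  // build.sbt (sketch): forward the JVM flag to the run task
  fork := true   // javaOptions only take effect for forked JVMs
  javaOptions += "-Dsun.io.serialization.extendedDebugInfo=true"

  // Alternatively, when launching with spark-submit, something like
  //   spark-submit --driver-java-options "-Dsun.io.serialization.extendedDebugInfo=true" ...
  // works, since the Task-not-serializable check happens on the driver.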

TD


 On Thu, Jul 24, 2014 at 9:48 AM, lihu lihu...@gmail.com wrote:

 Which code did you use? Is the problem caused by your own code or by something
 in Spark itself?


 On Tue, Jul 22, 2014 at 8:50 AM, hsy...@gmail.com wrote:

 I have the same problem


 On Sat, Jul 19, 2014 at 12:31 AM, lihu lihu...@gmail.com wrote:

 Hi, everyone. I have the following piece of code. When I run it,
 the error below occurs. It seems that the SparkContext is not
 serializable, but I do not use the SparkContext anywhere except for the broadcast.
 [In fact, this code is from MLlib; I just try to broadcast the
  centerArrays.]

 It succeeds at the reduceByKey operation but fails at the
 collect operation, which confuses me.


 INFO DAGScheduler: Failed to run collect at KMeans.scala:235
 [error] (run-main-0) org.apache.spark.SparkException: Job aborted: Task
 not serializable: java.io.NotSerializableException:
 org.apache.spark.SparkContext
 org.apache.spark.SparkException: Job aborted: Task not serializable:
 java.io.NotSerializableException: org.apache.spark.SparkContext
 at
 org.apache.spark.scheduler.DAGScheduler$$anonfun$org$apache$spark$scheduler$DAGScheduler$$abortStage$1.apply(DAGScheduler.scala:1028)
  at
 org.apache.spark.scheduler.DAGScheduler$$anonfun$org$apache$spark$scheduler$DAGScheduler$$abortStage$1.apply(DAGScheduler.scala:1026)




 private def initKMeansParallel(data: RDD[Array[Double]]): Array[ClusterCenters] = {

   // I tried to add the @transient annotation here, but it doesn't work
   @transient val sc = data.sparkContext

   // Initialize each run's center to a random point
   val seed = new XORShiftRandom().nextInt()
   val sample = data.takeSample(true, runs, seed).toSeq
   val centers = Array.tabulate(runs)(r => ArrayBuffer(sample(r)))

   // On each step, sample 2 * k points on average for each run with probability
   // proportional to their squared distance from that run's current centers
   for (step <- 0 until initializationSteps) {
     val centerArrays = sc.broadcast(centers.map(_.toArray))
     val sumCosts = data.flatMap { point =>
       for (r <- 0 until runs) yield (r, KMeans.pointCost(centerArrays.value(r), point))
     }.reduceByKey(_ + _).collectAsMap()
     // can pass at this point
     val chosen = data.mapPartitionsWithIndex { (index, points) =>
       val rand = new XORShiftRandom(seed ^ (step << 16) ^ index)
       for {
         p <- points
         r <- 0 until runs
         if rand.nextDouble() < KMeans.pointCost(centerArrays.value(r), p) * 2 * k / sumCosts(r)
       } yield (r, p)
     }.collect()
     // failed at this point
     for ((r, p) <- chosen) {
       centers(r) += p
     }
   }








 --
 *Best Wishes!*

  *Li Hu(李浒) | Graduate Student*

 *Institute for Interdisciplinary Information Sciences (IIIS, http://iiis.tsinghua.edu.cn/)*
 *Tsinghua University, China*

 *Email: lihu...@gmail.com*
 *Tel  : +86 15120081920*
 *Homepage: http://iiis.tsinghua.edu.cn/zh/lihu/*





Re: Task not serializable: java.io.NotSerializableException: org.apache.spark.SparkContext

2014-07-21 Thread hsy...@gmail.com
I have the same problem


On Sat, Jul 19, 2014 at 12:31 AM, lihu lihu...@gmail.com wrote:

 Hi, everyone. I have the following piece of code. When I run it,
 the error below occurs. It seems that the SparkContext is not
 serializable, but I do not use the SparkContext anywhere except for the broadcast.
 [In fact, this code is from MLlib; I just try to broadcast the
  centerArrays.]

 It succeeds at the reduceByKey operation but fails at the
 collect operation, which confuses me.


 INFO DAGScheduler: Failed to run collect at KMeans.scala:235
 [error] (run-main-0) org.apache.spark.SparkException: Job aborted: Task
 not serializable: java.io.NotSerializableException:
 org.apache.spark.SparkContext
 org.apache.spark.SparkException: Job aborted: Task not serializable:
 java.io.NotSerializableException: org.apache.spark.SparkContext
 at
 org.apache.spark.scheduler.DAGScheduler$$anonfun$org$apache$spark$scheduler$DAGScheduler$$abortStage$1.apply(DAGScheduler.scala:1028)
  at
 org.apache.spark.scheduler.DAGScheduler$$anonfun$org$apache$spark$scheduler$DAGScheduler$$abortStage$1.apply(DAGScheduler.scala:1026)




 private def initKMeansParallel(data: RDD[Array[Double]]): Array[ClusterCenters] = {

   // I tried to add the @transient annotation here, but it doesn't work
   @transient val sc = data.sparkContext

   // Initialize each run's center to a random point
   val seed = new XORShiftRandom().nextInt()
   val sample = data.takeSample(true, runs, seed).toSeq
   val centers = Array.tabulate(runs)(r => ArrayBuffer(sample(r)))

   // On each step, sample 2 * k points on average for each run with probability
   // proportional to their squared distance from that run's current centers
   for (step <- 0 until initializationSteps) {
     val centerArrays = sc.broadcast(centers.map(_.toArray))
     val sumCosts = data.flatMap { point =>
       for (r <- 0 until runs) yield (r, KMeans.pointCost(centerArrays.value(r), point))
     }.reduceByKey(_ + _).collectAsMap()
     // can pass at this point
     val chosen = data.mapPartitionsWithIndex { (index, points) =>
       val rand = new XORShiftRandom(seed ^ (step << 16) ^ index)
       for {
         p <- points
         r <- 0 until runs
         if rand.nextDouble() < KMeans.pointCost(centerArrays.value(r), p) * 2 * k / sumCosts(r)
       } yield (r, p)
     }.collect()
     // failed at this point
     for ((r, p) <- chosen) {
       centers(r) += p
     }
   }