Re: Kryo Serialization in Spark

2015-12-10 Thread manasdebashiskar
Are you sure you are using Kryo serialization? 
You are getting a Java serialization error.
Are you setting up your SparkContext with Kryo serialization enabled?
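For reference, a minimal sketch of enabling Kryo (assuming the Spark 1.x APIs used in this thread; the registrator name matches the MyRegistrator class from the original post and would need its fully qualified name if it lives in a package):

```scala
import org.apache.spark.{SparkConf, SparkContext}

// Kryo must be configured on the SparkConf *before* the SparkContext is
// created; otherwise Spark falls back to Java serialization, which matches
// the java.io.NotSerializableException seen below.
val conf = new SparkConf()
  .setAppName("GraphBFS")
  .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
  .set("spark.kryo.registrator", "MyRegistrator")

val sc = new SparkContext(conf)
```

The same settings can also be passed at launch time with `spark-submit --conf spark.serializer=org.apache.spark.serializer.KryoSerializer` if the code cannot be changed.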




--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Kryo-Serialization-in-Spark-tp25628p25678.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Kryo Serialization in Spark

2015-12-07 Thread prasad223
Hi All,

I'm unable to use the Kryo serializer in my Spark program.
I'm loading a graph from an edge-list file using GraphLoader and performing a
BFS using the Pregel API, but I get the error below when I run it.
Can anybody tell me the right way to serialize a class in Spark, and
which functions need to be implemented?


class MyRegistrator extends KryoRegistrator {
  override def registerClasses(kryo: Kryo) {
    kryo.register(classOf[GraphBFS])
    kryo.register(classOf[Config])
    kryo.register(classOf[Iterator[(Long, Double)]])
  }
}


class GraphBFS {

  def vprog(id: VertexId, attr: Double, msg: Double): Double =
    math.min(attr, msg)

  def sendMessage(triplet: EdgeTriplet[Double, Int]): Iterator[(VertexId, Double)] = {
    var iter: Iterator[(VertexId, Double)] = Iterator.empty
    val isSrcMarked = triplet.srcAttr != Double.PositiveInfinity
    val isDstMarked = triplet.dstAttr != Double.PositiveInfinity
    if (!(isSrcMarked && isDstMarked)) {
      if (isSrcMarked) {
        iter = Iterator((triplet.dstId, triplet.srcAttr + 1))
      } else {
        iter = Iterator((triplet.srcId, triplet.dstAttr + 1))
      }
    }
    iter
  }

  def reduceMessage(a: Double, b: Double) = math.min(a, b)

  def main() {
    ..
    val bfs = initialGraph.pregel(initialMessage, maxIterations,
      activeEdgeDirection)(vprog, sendMessage, reduceMessage)
    .
  }
}



15/12/07 21:52:49 INFO BlockManager: Removing RDD 8
15/12/07 21:52:49 INFO BlockManager: Removing RDD 2
Exception in thread "main" org.apache.spark.SparkException: Task not serializable
    at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:315)
    at org.apache.spark.util.ClosureCleaner$.org$apache$spark$util$ClosureCleaner$$clean(ClosureCleaner.scala:305)
    at org.apache.spark.util.ClosureCleaner$.clean(ClosureCleaner.scala:132)
    at org.apache.spark.SparkContext.clean(SparkContext.scala:1893)
    at org.apache.spark.rdd.RDD$$anonfun$mapPartitions$1.apply(RDD.scala:683)
    at org.apache.spark.rdd.RDD$$anonfun$mapPartitions$1.apply(RDD.scala:682)
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:147)
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:108)
    at org.apache.spark.rdd.RDD.withScope(RDD.scala:286)
    at org.apache.spark.rdd.RDD.mapPartitions(RDD.scala:682)
    at org.apache.spark.graphx.impl.VertexRDDImpl.mapVertexPartitions(VertexRDDImpl.scala:96)
    at org.apache.spark.graphx.impl.GraphImpl.mapVertices(GraphImpl.scala:132)
    at org.apache.spark.graphx.Pregel$.apply(Pregel.scala:122)
    at org.apache.spark.graphx.GraphOps.pregel(GraphOps.scala:362)
    at GraphBFS.main(GraphBFS.scala:241)
    at run$.main(GraphBFS.scala:268)
    at run.main(GraphBFS.scala)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:497)
    at org.apache.spark.deploy.SparkSubmit$.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:665)
    at org.apache.spark.deploy.SparkSubmit$.doRunMain$1(SparkSubmit.scala:170)
    at org.apache.spark.deploy.SparkSubmit$.submit(SparkSubmit.scala:193)
    at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:112)
    at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
Caused by: java.io.NotSerializableException: GraphBFS
Serialization stack:
    - object not serializable (class: GraphBFS, value: GraphBFS@575c3e9b)
    - field (class: GraphBFS$$anonfun$17, name: $outer, type: class GraphBFS)
    - object (class GraphBFS$$anonfun$17, )
    - field (class: org.apache.spark.graphx.Pregel$$anonfun$1, name: vprog$1, type: interface scala.Function3)
    - object (class org.apache.spark.graphx.Pregel$$anonfun$1, )
    - field (class: org.apache.spark.graphx.impl.GraphImpl$$anonfun$5, name: f$1, type: interface scala.Function2)
    - object (class org.apache.spark.graphx.impl.GraphImpl$$anonfun$5, )
    - field (class: org.apache.spark.graphx.impl.VertexRDDImpl$$anonfun$1, name: f$1, type: interface scala.Function1)
    - object (class org.apache.spark.graphx.impl.VertexRDDImpl$$anonfun$1, )
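The serialization stack shows the failure mode independent of Kryo: `vprog` is a method, so the closure Pregel ships to executors captures the enclosing `GraphBFS` instance via its `$outer` field, and `GraphBFS` is not serializable. A minimal plain-JVM sketch of this (no Spark needed; the class names here are illustrative, and marking the enclosing class `Serializable` is one common workaround, not a fix confirmed in this thread):

```scala
import java.io.{ByteArrayOutputStream, NotSerializableException, ObjectOutputStream}

// A closure over an instance of this class cannot be Java-serialized,
// because the captured instance itself is not Serializable.
class NotOk { def vprog(a: Double, b: Double): Double = math.min(a, b) }

// Same method, but the enclosing class is marked Serializable.
class Ok extends Serializable { def vprog(a: Double, b: Double): Double = math.min(a, b) }

object ClosureDemo {
  // Returns true if `f` survives Java serialization, false on NotSerializableException.
  def trySerialize(f: (Double, Double) => Double): Boolean =
    try {
      new ObjectOutputStream(new ByteArrayOutputStream()).writeObject(f)
      true
    } catch { case _: NotSerializableException => false }

  def main(args: Array[String]): Unit = {
    val bad  = new NotOk()
    val good = new Ok()
    // Each lambda captures its instance; serializability of the capture decides the outcome.
    println(trySerialize((a, b) => bad.vprog(a, b)))  // false
    println(trySerialize((a, b) => good.vprog(a, b))) // true
  }
}
```

The alternative workaround with the same effect is to avoid the capture entirely, e.g. by holding `vprog`, `sendMessage`, and `reduceMessage` as standalone function values rather than methods of the class.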

Thanks,
Prasad




--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Kryo-Serialization-in-Spark-tp25628.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.