Re: Checkpoint bugs in GraphX

2014-11-10 Thread GuoQiang Li
I have been trying to fix this bug.
The related PR:
https://github.com/apache/spark/pull/2631


-- Original --
From:  Xu Lijie;lijie@gmail.com;
Date:  Tue, Nov 11, 2014 10:19 AM
To:  useru...@spark.apache.org; devdev@spark.apache.org; 

Subject:  Checkpoint bugs in GraphX



Hi, all. I'm not sure whether someone has reported this bug:


There should be a checkpoint() method in EdgeRDD and VertexRDD as follows:

override def checkpoint(): Unit = { partitionsRDD.checkpoint() }


The current EdgeRDD and VertexRDD inherit RDD.checkpoint(), which only
checkpoints the edges/vertices but not the critical partitionsRDD.
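The fix simply delegates checkpointing to the wrapped RDD. The delegation pattern can be sketched without a Spark dependency using minimal stand-ins (FakeRDD and WrapperRDD below are hypothetical names for illustration, not Spark classes):

```scala
// Minimal stand-in for an RDD that records whether checkpoint() was called.
class FakeRDD {
  var checkpointed = false
  def checkpoint(): Unit = { checkpointed = true }
}

// A wrapper in the style of EdgeRDD/VertexRDD. Without the override,
// calling checkpoint() on the wrapper would mark only the wrapper itself
// and never reach the wrapped partitionsRDD.
class WrapperRDD(val partitionsRDD: FakeRDD) extends FakeRDD {
  override def checkpoint(): Unit = { partitionsRDD.checkpoint() }
}

val inner = new FakeRDD
val wrapper = new WrapperRDD(inner)
wrapper.checkpoint()
// the call is forwarded, so inner.checkpointed is now true
```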


Also, the variables (partitionsRDD and targetStorageLevel) in EdgeRDD and
VertexRDD should be marked @transient.

class EdgeRDD[@specialized ED: ClassTag, VD: ClassTag](
    @transient val partitionsRDD: RDD[(PartitionID, EdgePartition[ED, VD])],
    @transient val targetStorageLevel: StorageLevel = StorageLevel.MEMORY_ONLY)
  extends RDD[Edge[ED]](partitionsRDD.context,
    List(new OneToOneDependency(partitionsRDD))) {


class VertexRDD[@specialized VD: ClassTag](
    @transient val partitionsRDD: RDD[ShippableVertexPartition[VD]],
    @transient val targetStorageLevel: StorageLevel = StorageLevel.MEMORY_ONLY)
  extends RDD[(VertexId, VD)](partitionsRDD.context,
    List(new OneToOneDependency(partitionsRDD))) {
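The effect of @transient can be checked outside Spark by round-tripping an object through plain Java serialization; the Holder class and roundTrip helper below are hypothetical, for illustration only:

```scala
import java.io.{ByteArrayInputStream, ByteArrayOutputStream,
  ObjectInputStream, ObjectOutputStream}

// `kept` survives serialization; the @transient `dropped` does not and
// comes back as null after deserialization.
class Holder(val kept: String, @transient val dropped: String)
  extends Serializable

// Serialize an object to bytes and read it back.
def roundTrip[T](obj: T): T = {
  val buf = new ByteArrayOutputStream()
  val out = new ObjectOutputStream(buf)
  out.writeObject(obj)
  out.close()
  val in = new ObjectInputStream(new ByteArrayInputStream(buf.toByteArray))
  in.readObject().asInstanceOf[T]
}

val copy = roundTrip(new Holder("stays", "goes"))
// copy.kept == "stays", copy.dropped == null
```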


These two bugs usually lead to a StackOverflowError in iterative
applications written in GraphX.

Re: Checkpoint bugs in GraphX

2014-11-10 Thread GuoQiang Li
Many methods do not require serializing EdgeRDD or VertexRDD (e.g.,
graph.edges.count). Moreover, partitionsRDD (and targetStorageLevel) are
needed only on the driver, so it has no effect if partitionsRDD (or
targetStorageLevel) is not serialized.




-- Original --
From:  Xu Lijie;lijie@gmail.com;
Date:  Tue, Nov 11, 2014 11:40 AM
To:  GuoQiang Liwi...@qq.com; 
Cc:  useru...@spark.apache.org; devdev@spark.apache.org; 
Subject:  Re: Checkpoint bugs in GraphX



Nice. We currently encounter a StackOverflowError caused by this bug.

We also found that partitionsRDD: RDD[(PartitionID, EdgePartition[ED, VD])]
and targetStorageLevel: StorageLevel = StorageLevel.MEMORY_ONLY will not
be serialized even without adding @transient.

However, @transient can affect the JVM stack. Our guess is:

If we do not add @transient, pointers to partitionsRDD and
targetStorageLevel will be kept on the stack. Otherwise, the stack will not
keep any information about the two variables during
serialization/deserialization.

I'm wondering whether this guess is right.
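One way to probe this guess is to compare serialized sizes: a @transient field's contents never enter the serialization stream, so the object graph behind it is not traversed at all. A plain-Scala sketch (the class names below are made up for illustration):

```scala
import java.io.{ByteArrayOutputStream, ObjectOutputStream}

// Two otherwise identical holders; only the @transient annotation differs.
class WithField(val payload: Array[Byte]) extends Serializable
class WithTransient(@transient val payload: Array[Byte]) extends Serializable

// Measure how many bytes Java serialization produces for an object.
def serializedSize(obj: AnyRef): Int = {
  val buf = new ByteArrayOutputStream()
  val out = new ObjectOutputStream(buf)
  out.writeObject(obj)
  out.close()
  buf.size()
}

val data = Array.fill[Byte](1024)(1)
val full = serializedSize(new WithField(data))
val trimmed = serializedSize(new WithTransient(data))
// full > trimmed: the transient payload is excluded from the stream
```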
