Just curious, is this set to be merged at some point?

On Thu Jan 22 2015 at 4:34:46 PM Ankur Dave <ankurd...@gmail.com> wrote:
> At 2015-01-22 02:06:37 -0800, NicolasC <nicolas.ch...@inria.fr> wrote:
> > I try to execute a simple program that runs the ShortestPaths algorithm
> > (org.apache.spark.graphx.lib.ShortestPaths) on a small grid graph.
> > I use Spark 1.2.0 downloaded from spark.apache.org.
> >
> > This program runs for more than 2 hours when the grid size is 70x70 as
> > above, and is then killed by the resource manager of the cluster (Torque).
> > After 5-6 minutes of execution, the Spark master UI does not even respond.
> >
> > For a grid size of 30x30, the program terminates in about 20 seconds, and
> > for a grid size of 50x50 it finishes in about 80 seconds. The problem
> > appears for a grid size of 70x70 and above.
>
> Unfortunately this problem is due to a Spark bug. In later iterations of
> iterative algorithms, the lineage maintained for fault tolerance grows long
> and causes Spark to consume increasing amounts of resources for scheduling
> and task serialization.
>
> The workaround is to checkpoint the graph periodically, which writes it to
> stable storage and interrupts the lineage chain before it grows too long.
>
> If you're able to recompile Spark, you can do this by applying the patch to
> GraphX at the end of this mail and, before running graph algorithms, calling
>
>     sc.setCheckpointDir("/tmp")
>
> to set the checkpoint directory as desired.
> Ankur
>
> === patch begins here ===
>
> diff --git a/graphx/src/main/scala/org/apache/spark/graphx/Pregel.scala b/graphx/src/main/scala/org/apache/spark/graphx/Pregel.scala
> index 5e55620..1fbbb87 100644
> --- a/graphx/src/main/scala/org/apache/spark/graphx/Pregel.scala
> +++ b/graphx/src/main/scala/org/apache/spark/graphx/Pregel.scala
> @@ -126,6 +126,8 @@ object Pregel extends Logging {
>      // Loop
>      var prevG: Graph[VD, ED] = null
>      var i = 0
> +    val checkpoint = g.vertices.context.getCheckpointDir.nonEmpty
> +    val checkpointFrequency = 25
>      while (activeMessages > 0 && i < maxIterations) {
>        // Receive the messages. Vertices that didn't get any messages do not appear in newVerts.
>        val newVerts = g.vertices.innerJoin(messages)(vprog).cache()
> @@ -139,6 +141,14 @@ object Pregel extends Logging {
>        // get to send messages. We must cache messages so it can be materialized on the next line,
>        // allowing us to uncache the previous iteration.
>        messages = g.mapReduceTriplets(sendMsg, mergeMsg, Some((newVerts, activeDirection))).cache()
> +
> +      if (checkpoint && i % checkpointFrequency == checkpointFrequency - 1) {
> +        logInfo("Checkpointing in iteration " + i)
> +        g.vertices.checkpoint()
> +        g.edges.checkpoint()
> +        messages.checkpoint()
> +      }
> +
>        // The call to count() materializes `messages`, `newVerts`, and the
>        // vertices of `g`. This hides oldMessages (depended on by newVerts),
>        // newVerts (depended on by messages), and the vertices of prevG
>        // (depended on by newVerts, oldMessages, and the vertices of g).
>
> ---------------------------------------------------------------------
> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
> For additional commands, e-mail: user-h...@spark.apache.org
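For anyone reading along: the patch only checkpoints on every 25th iteration (at i = 24, 49, 74, ...), and only when a checkpoint directory has been set. The cadence check is easy to get off by one, so here is a minimal standalone Scala sketch of just that logic, with no Spark dependency. The object and method names (`CheckpointCadence`, `shouldCheckpoint`) are hypothetical and exist only for this illustration; in the real patch the test is inlined in Pregel's loop.

```scala
// Standalone sketch of the checkpoint cadence from the patch above.
// Checkpointing fires when i % checkpointFrequency == checkpointFrequency - 1,
// i.e. on iterations 24, 49, 74, ... when the frequency is 25.
object CheckpointCadence {
  val checkpointFrequency = 25 // same constant the patch hard-codes

  // `checkpointEnabled` stands in for `getCheckpointDir.nonEmpty` in the patch.
  def shouldCheckpoint(i: Int, checkpointEnabled: Boolean): Boolean =
    checkpointEnabled && i % checkpointFrequency == checkpointFrequency - 1

  def main(args: Array[String]): Unit = {
    val firing = (0 until 100).filter(i => shouldCheckpoint(i, checkpointEnabled = true))
    println(firing.mkString(",")) // prints 24,49,74,99
  }
}
```

Note that if no checkpoint directory is set via `sc.setCheckpointDir(...)`, the guard is false for every iteration and the patched Pregel behaves exactly as before.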