Just curious, is this set to be merged at some point?

On Thu Jan 22 2015 at 4:34:46 PM Ankur Dave <ankurd...@gmail.com> wrote:

> At 2015-01-22 02:06:37 -0800, NicolasC <nicolas.ch...@inria.fr> wrote:
> > I am trying to execute a simple program that runs the ShortestPaths
> > algorithm (org.apache.spark.graphx.lib.ShortestPaths) on a small grid graph.
> > I am using Spark 1.2.0 downloaded from spark.apache.org.
> >
> > This program runs for more than 2 hours when the grid size is 70x70 as
> > above, and is then killed by the resource manager of the cluster (Torque).
> > After 5-6 minutes of execution, the Spark master UI does not even respond.
> >
> > For a grid size of 30x30, the program terminates in about 20 seconds, and
> > for a grid size of 50x50 it finishes in about 80 seconds. The problem
> > appears for a grid size of 70x70 and above.
>
> Unfortunately, this problem is due to a Spark bug. In later iterations of
> iterative algorithms, the lineage maintained for fault tolerance grows long
> and causes Spark to consume increasing amounts of resources for scheduling
> and task serialization.
>
> The workaround is to checkpoint the graph periodically, which writes it to
> stable storage and interrupts the lineage chain before it grows too long.
>
> If you're able to recompile Spark, you can do this by applying the GraphX
> patch at the end of this mail and then, before running graph algorithms,
> calling
>
>     sc.setCheckpointDir("/tmp")
>
> to set the checkpoint directory as desired.
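>
> For example, with the patch applied, a driver for the 70x70 grid case could
> look roughly like the sketch below. The graph construction and the landmark
> choice are only illustrative, and it assumes an existing SparkContext sc
> (e.g. the spark-shell):
>
>     import org.apache.spark.graphx.lib.ShortestPaths
>     import org.apache.spark.graphx.util.GraphGenerators
>
>     sc.setCheckpointDir("/tmp")                       // required for the patched Pregel to checkpoint
>     val grid = GraphGenerators.gridGraph(sc, 70, 70)  // 70x70 grid graph
>     val result = ShortestPaths.run(grid, Seq(0L))     // distances to landmark vertex 0
>     println(result.vertices.take(5).mkString("\n"))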
>
> Ankur
>
> === patch begins here ===
>
> diff --git a/graphx/src/main/scala/org/apache/spark/graphx/Pregel.scala b/graphx/src/main/scala/org/apache/spark/graphx/Pregel.scala
> index 5e55620..1fbbb87 100644
> --- a/graphx/src/main/scala/org/apache/spark/graphx/Pregel.scala
> +++ b/graphx/src/main/scala/org/apache/spark/graphx/Pregel.scala
> @@ -126,6 +126,8 @@ object Pregel extends Logging {
>      // Loop
>      var prevG: Graph[VD, ED] = null
>      var i = 0
> +    val checkpoint = g.vertices.context.getCheckpointDir.nonEmpty
> +    val checkpointFrequency = 25
>      while (activeMessages > 0 && i < maxIterations) {
>        // Receive the messages. Vertices that didn't get any messages do not appear in newVerts.
>        val newVerts = g.vertices.innerJoin(messages)(vprog).cache()
> @@ -139,6 +141,14 @@ object Pregel extends Logging {
>        // get to send messages. We must cache messages so it can be materialized on the next line,
>        // allowing us to uncache the previous iteration.
>        messages = g.mapReduceTriplets(sendMsg, mergeMsg, Some((newVerts, activeDirection))).cache()
> +
> +      if (checkpoint && i % checkpointFrequency == checkpointFrequency - 1) {
> +        logInfo("Checkpointing in iteration " + i)
> +        g.vertices.checkpoint()
> +        g.edges.checkpoint()
> +        messages.checkpoint()
> +      }
> +
>        // The call to count() materializes `messages`, `newVerts`, and the vertices of `g`. This
>        // hides oldMessages (depended on by newVerts), newVerts (depended on by messages), and the
>        // vertices of prevG (depended on by newVerts, oldMessages, and the vertices of g).
>
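> The same lineage-truncation pattern applies to any long-running iterative
> RDD computation, not only to Pregel. As a standalone illustration of the
> technique (the data, update function, and iteration counts are arbitrary):
>
>     sc.setCheckpointDir("/tmp")
>     var data = sc.parallelize(1 to 1000000).map(_.toDouble).cache()
>     for (i <- 0 until 200) {
>       val next = data.map(_ * 0.99 + 0.01).cache()  // one iteration of some update
>       if (i % 25 == 24) {
>         next.checkpoint()  // save to stable storage, cutting the lineage chain here
>       }
>       next.count()         // materialize `next` before releasing its parent
>       data.unpersist(blocking = false)
>       data = next
>     }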