Repository: spark
Updated Branches:
  refs/heads/master 335491704 -> 1fec3ce4e


[SPARK-11496][GRAPHX] Parallel implementation of personalized pagerank

(Updated version of [PR-9457](https://github.com/apache/spark/pull/9457), 
rebased on latest Spark master, and using mllib-local).

This implements a parallel version of personalized PageRank, which runs the 
propagations for a whole list of source vertices simultaneously.
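
As a quick illustration, here is a minimal usage sketch of the new API. The 
edge-list path, the source ids, and the active SparkContext `sc` are 
placeholders, not part of this commit:

```scala
import org.apache.spark.graphx.{Graph, GraphLoader, VertexId}
import org.apache.spark.ml.linalg.Vector

// Load any graph; the edge-list file below is hypothetical.
val graph = GraphLoader.edgeListFile(sc, "hdfs:///data/edges.txt")
val sources: Array[VertexId] = Array(1L, 2L, 3L)

// One call runs personalized PageRank for all sources at once.
val ranks: Graph[Vector, Double] =
  graph.staticParallelPersonalizedPageRank(sources, numIter = 10)

// Index i of each vertex's vector is its rank relative to sources(i).
val ranksFromFirstSource = ranks.vertices.mapValues(v => v(0))
```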

I ran a few benchmarks on the full [DBpedia](http://dbpedia.org/) graph. When 
running personalized PageRank for a single source node, the existing 
implementation is twice as fast as the parallel one (because of the 
SparseVector overhead). However, for 10 source nodes the parallel 
implementation is four times as fast, and the gap keeps widening as the 
number of source nodes grows.

![image](https://cloud.githubusercontent.com/assets/2491/10927702/dd82e4fa-8256-11e5-89a8-4799b407f502.png)

Author: Yves Raimond <yraim...@netflix.com>

Closes #14998 from moustaki/parallel-ppr.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/1fec3ce4
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/1fec3ce4
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/1fec3ce4

Branch: refs/heads/master
Commit: 1fec3ce4e19664aa9f9238d9491b0cb1511f9be1
Parents: 3354917
Author: Yves Raimond <yraim...@netflix.com>
Authored: Sat Sep 10 00:15:59 2016 -0700
Committer: DB Tsai <d...@netflix.com>
Committed: Sat Sep 10 00:15:59 2016 -0700

----------------------------------------------------------------------
 graphx/pom.xml                                  |  5 ++
 .../org/apache/spark/graphx/GraphOps.scala      | 12 ++-
 .../org/apache/spark/graphx/lib/PageRank.scala  | 81 ++++++++++++++++++++
 .../apache/spark/graphx/lib/PageRankSuite.scala | 24 ++++++
 4 files changed, 121 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/1fec3ce4/graphx/pom.xml
----------------------------------------------------------------------
diff --git a/graphx/pom.xml b/graphx/pom.xml
index bd4e533..10d5ba9 100644
--- a/graphx/pom.xml
+++ b/graphx/pom.xml
@@ -47,6 +47,11 @@
       <scope>test</scope>
     </dependency>
     <dependency>
+      <groupId>org.apache.spark</groupId>
+      <artifactId>spark-mllib-local_${scala.binary.version}</artifactId>
+      <version>${project.version}</version>
+    </dependency>
+    <dependency>
       <groupId>org.apache.xbean</groupId>
       <artifactId>xbean-asm5-shaded</artifactId>
     </dependency>

http://git-wip-us.apache.org/repos/asf/spark/blob/1fec3ce4/graphx/src/main/scala/org/apache/spark/graphx/GraphOps.scala
----------------------------------------------------------------------
diff --git a/graphx/src/main/scala/org/apache/spark/graphx/GraphOps.scala b/graphx/src/main/scala/org/apache/spark/graphx/GraphOps.scala
index 868658d..9090730 100644
--- a/graphx/src/main/scala/org/apache/spark/graphx/GraphOps.scala
+++ b/graphx/src/main/scala/org/apache/spark/graphx/GraphOps.scala
@@ -20,9 +20,10 @@ package org.apache.spark.graphx
 import scala.reflect.ClassTag
 import scala.util.Random
 
-import org.apache.spark.SparkException
 import org.apache.spark.graphx.lib._
+import org.apache.spark.ml.linalg.Vector
 import org.apache.spark.rdd.RDD
+import org.apache.spark.SparkException
 
 /**
  * Contains additional functionality for [[Graph]]. All operations are expressed in terms of the
@@ -392,6 +393,15 @@ class GraphOps[VD: ClassTag, ED: ClassTag](graph: Graph[VD, ED]) extends Seriali
   }
 
   /**
+   * Run parallel personalized PageRank for a given array of source vertices, such
+   * that all random walks are started relative to the source vertices
+   */
+  def staticParallelPersonalizedPageRank(sources: Array[VertexId], numIter: Int,
+    resetProb: Double = 0.15) : Graph[Vector, Double] = {
+    PageRank.runParallelPersonalizedPageRank(graph, numIter, resetProb, sources)
+  }
+
+  /**
    * Run Personalized PageRank for a fixed number of iterations, with
    * all iterations originating at the source node,
    * returning a graph with vertex attributes

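For reference, the GraphOps method added above is a thin wrapper around the
lib object, so the following direct call is equivalent (a sketch, reusing the
hypothetical `graph` and `sources` from the example in the commit message):

```scala
import org.apache.spark.graphx.lib.PageRank

// Equivalent to graph.staticParallelPersonalizedPageRank(sources, 10)
val ranks = PageRank.runParallelPersonalizedPageRank(
  graph, numIter = 10, resetProb = 0.15, sources = sources)
```
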
http://git-wip-us.apache.org/repos/asf/spark/blob/1fec3ce4/graphx/src/main/scala/org/apache/spark/graphx/lib/PageRank.scala
----------------------------------------------------------------------
diff --git a/graphx/src/main/scala/org/apache/spark/graphx/lib/PageRank.scala b/graphx/src/main/scala/org/apache/spark/graphx/lib/PageRank.scala
index 2f5bd4e..f4b0075 100644
--- a/graphx/src/main/scala/org/apache/spark/graphx/lib/PageRank.scala
+++ b/graphx/src/main/scala/org/apache/spark/graphx/lib/PageRank.scala
@@ -19,8 +19,11 @@ package org.apache.spark.graphx.lib
 
 import scala.reflect.ClassTag
 
+import breeze.linalg.{Vector => BV}
+
 import org.apache.spark.graphx._
 import org.apache.spark.internal.Logging
+import org.apache.spark.ml.linalg.{Vector, Vectors}
 
 /**
  * PageRank algorithm implementation. There are two implementations of PageRank.
@@ -163,6 +166,84 @@ object PageRank extends Logging {
   }
 
   /**
+   * Run Personalized PageRank for a fixed number of iterations, for a
+   * set of starting nodes in parallel. Returns a graph with vertex attributes
+   * containing the pagerank relative to all starting nodes (as a sparse vector) and
+   * edge attributes the normalized edge weight
+   *
+   * @tparam VD The original vertex attribute (not used)
+   * @tparam ED The original edge attribute (not used)
+   *
+   * @param graph The graph on which to compute personalized pagerank
+   * @param numIter The number of iterations to run
+   * @param resetProb The random reset probability
+   * @param sources The list of sources to compute personalized pagerank from
+   * @return the graph with vertex attributes
+   *         containing the pagerank relative to all starting nodes (as a sparse vector) and
+   *         edge attributes the normalized edge weight
+   */
+  def runParallelPersonalizedPageRank[VD: ClassTag, ED: ClassTag](graph: Graph[VD, ED],
+    numIter: Int, resetProb: Double = 0.15,
+    sources: Array[VertexId]): Graph[Vector, Double] = {
+    // TODO if one source vertex id is outside of the Int range
+    // we won't be able to store its activations in a sparse vector
+    val zero = Vectors.sparse(sources.size, List()).asBreeze
+    val sourcesInitMap = sources.zipWithIndex.map { case (vid, i) =>
+      val v = Vectors.sparse(sources.size, Array(i), Array(resetProb)).asBreeze
+      (vid, v)
+    }.toMap
+    val sc = graph.vertices.sparkContext
+    val sourcesInitMapBC = sc.broadcast(sourcesInitMap)
+    // Initialize the PageRank graph with each edge attribute having weight
+    // 1/outDegree and each source vertex seeded with activation resetProb.
+    var rankGraph = graph
+      // Associate the degree with each vertex
+      .outerJoinVertices(graph.outDegrees) { (vid, vdata, deg) => deg.getOrElse(0) }
+      // Set the weight on the edges based on the degree
+      .mapTriplets(e => 1.0 / e.srcAttr, TripletFields.Src)
+      .mapVertices { (vid, attr) =>
+        if (sourcesInitMapBC.value contains vid) {
+          sourcesInitMapBC.value(vid)
+        } else {
+          zero
+        }
+      }
+
+    var i = 0
+    while (i < numIter) {
+      val prevRankGraph = rankGraph
+      // Propagate the messages along outbound edges,
+      // then add the start nodes back in with activation resetProb
+      val rankUpdates = rankGraph.aggregateMessages[BV[Double]](
+        ctx => ctx.sendToDst(ctx.srcAttr :* ctx.attr),
+        (a : BV[Double], b : BV[Double]) => a :+ b, TripletFields.Src)
+
+      rankGraph = rankGraph.joinVertices(rankUpdates) {
+        (vid, oldRank, msgSum) =>
+          val popActivations: BV[Double] = msgSum :* (1.0 - resetProb)
+          val resetActivations = if (sourcesInitMapBC.value contains vid) {
+            sourcesInitMapBC.value(vid)
+          } else {
+            zero
+          }
+          popActivations :+ resetActivations
+        }.cache()
+
+      rankGraph.edges.foreachPartition(x => {}) // also materializes rankGraph.vertices
+      prevRankGraph.vertices.unpersist(false)
+      prevRankGraph.edges.unpersist(false)
+
+      logInfo(s"Parallel Personalized PageRank finished iteration $i.")
+
+      i += 1
+    }
+
+    rankGraph.mapVertices { (vid, attr) =>
+      Vectors.fromBreeze(attr)
+    }
+  }
+
+  /**
    * Run a dynamic version of PageRank returning a graph with vertex attributes containing the
    * PageRank and edge attributes containing the normalized edge weight.
    *

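To make the sparse-vector encoding above concrete, here is a sketch of reading
per-source ranks back out of the result; the source ids are placeholders:

```scala
import org.apache.spark.graphx.VertexId
import org.apache.spark.graphx.lib.PageRank

val sources: Array[VertexId] = Array(42L, 99L) // hypothetical ids
val ranks = PageRank.runParallelPersonalizedPageRank(graph, 10, 0.15, sources)

// Top 5 vertices by rank relative to sources(0).
val topForFirst = ranks.vertices
  .map { case (vid, vec) => (vid, vec(0)) }
  .top(5)(Ordering.by[(VertexId, Double), Double](_._2))
```
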
http://git-wip-us.apache.org/repos/asf/spark/blob/1fec3ce4/graphx/src/test/scala/org/apache/spark/graphx/lib/PageRankSuite.scala
----------------------------------------------------------------------
diff --git a/graphx/src/test/scala/org/apache/spark/graphx/lib/PageRankSuite.scala b/graphx/src/test/scala/org/apache/spark/graphx/lib/PageRankSuite.scala
index bdff314..b6305c8 100644
--- a/graphx/src/test/scala/org/apache/spark/graphx/lib/PageRankSuite.scala
+++ b/graphx/src/test/scala/org/apache/spark/graphx/lib/PageRankSuite.scala
@@ -118,11 +118,29 @@ class PageRankSuite extends SparkFunSuite with LocalSparkContext {
       val dynamicRanks = starGraph.personalizedPageRank(0, 0, resetProb).vertices.cache()
       assert(compareRanks(staticRanks2, dynamicRanks) < errorTol)
 
+      val parallelStaticRanks1 = starGraph
+        .staticParallelPersonalizedPageRank(Array(0), 1, resetProb).mapVertices {
+          case (vertexId, vector) => vector(0)
+        }.vertices.cache()
+      assert(compareRanks(staticRanks1, parallelStaticRanks1) < errorTol)
+
+      val parallelStaticRanks2 = starGraph
+        .staticParallelPersonalizedPageRank(Array(0, 1), 2, resetProb).mapVertices {
+          case (vertexId, vector) => vector(0)
+        }.vertices.cache()
+      assert(compareRanks(staticRanks2, parallelStaticRanks2) < errorTol)
+
       // We have one outbound edge from 1 to 0
       val otherStaticRanks2 = starGraph.staticPersonalizedPageRank(1, numIter = 2, resetProb)
         .vertices.cache()
       val otherDynamicRanks = starGraph.personalizedPageRank(1, 0, resetProb).vertices.cache()
+      val otherParallelStaticRanks2 = starGraph
+        .staticParallelPersonalizedPageRank(Array(0, 1), 2, resetProb).mapVertices {
+          case (vertexId, vector) => vector(1)
+        }.vertices.cache()
       assert(compareRanks(otherDynamicRanks, otherStaticRanks2) < errorTol)
+      assert(compareRanks(otherStaticRanks2, otherParallelStaticRanks2) < errorTol)
+      assert(compareRanks(otherDynamicRanks, otherParallelStaticRanks2) < errorTol)
     }
   } // end of test Star PersonalPageRank
 
@@ -177,6 +195,12 @@ class PageRankSuite extends SparkFunSuite with LocalSparkContext {
       val dynamicRanks = chain.personalizedPageRank(4, tol, resetProb).vertices
 
       assert(compareRanks(staticRanks, dynamicRanks) < errorTol)
+
+      val parallelStaticRanks = chain
+        .staticParallelPersonalizedPageRank(Array(4), numIter, resetProb).mapVertices {
+          case (vertexId, vector) => vector(0)
+        }.vertices.cache()
+      assert(compareRanks(staticRanks, parallelStaticRanks) < errorTol)
     }
   }
 }

