EshwarSR commented on a change in pull request #24230: [SPARK-27295][GraphX] Provision to provide the initial scores for source nodes while running Personalized Page Rank
URL: https://github.com/apache/spark/pull/24230#discussion_r270094583
 
 

 ##########
 File path: graphx/src/main/scala/org/apache/spark/graphx/lib/PageRank.scala
 ##########
 @@ -196,11 +196,45 @@ object PageRank extends Logging {
     require(sources.nonEmpty, s"The list of sources must be non-empty," +
       s" but got ${sources.mkString("[", ",", "]")}")
 
+    val sourcesWithScores = sources zip Array.fill(sources.size)(1.0)
+    runParallelPersonalizedPageRank(graph, numIter, resetProb, 
sourcesWithScores)
+  }
+
+  /**
+   * Run Personalized PageRank for a fixed number of iterations, for a
+   * set of starting nodes in parallel. Returns a graph with vertex attributes
+   * containing the pagerank relative to all starting nodes (as a sparse 
vector) and
+   * edge attributes the normalized edge weight
+   *
+   * @tparam VD The original vertex attribute (not used)
+   * @tparam ED The original edge attribute (not used)
+   *
+   * @param graph The graph on which to compute personalized pagerank
+   * @param numIter The number of iterations to run
+   * @param resetProb The random reset probability
+   * @param sources The list of (source, initial score) to compute 
personalized pagerank from
+   * @return the graph with vertex attributes
+   *         containing the pagerank relative to all starting nodes (as a 
sparse vector
+   *         indexed by the position of nodes in the sources list) and
+   *         edge attributes the normalized edge weight
+   */
+  def runParallelPersonalizedPageRank[VD: ClassTag, ED: ClassTag](
+      graph: Graph[VD, ED],
+      numIter: Int,
+      resetProb: Double = 0.15,
+      sources: Array[(VertexId, Double)]): Graph[Vector, Double] = {
+    require(numIter > 0, s"Number of iterations must be greater than 0," +
+      s" but got ${numIter}")
+    require(resetProb >= 0 && resetProb <= 1, s"Random reset probability must 
belong" +
+      s" to [0, 1], but got ${resetProb}")
+    require(sources.nonEmpty, s"The list of sources must be non-empty," +
+      s" but got ${sources.mkString("[", ",", "]")}")
+
     val zero = Vectors.sparse(sources.size, List()).asBreeze
-    // map of vid -> vector where for each vid, the _position of vid in 
source_ is set to 1.0
+    // map of vid -> vector where for each vid, the _position of vid in 
source_ is set to provided score
     val sourcesInitMap = sources.zipWithIndex.map { case (vid, i) =>
 
 Review comment:
   Done

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
[email protected]


With regards,
Apache Git Services

---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to