Github user dbtsai commented on a diff in the pull request:

    https://github.com/apache/spark/pull/14998#discussion_r78089652
  
    --- Diff: graphx/src/main/scala/org/apache/spark/graphx/lib/PageRank.scala 
---
    @@ -163,6 +166,85 @@ object PageRank extends Logging {
       }
     
       /**
    +   * Run Personalized PageRank for a fixed number of iterations, for a
    +   * set of starting nodes in parallel. Returns a graph with vertex 
attributes
    +   * containing the pagerank relative to all starting nodes (as a sparse 
vector) and
    +   * edge attributes the normalized edge weight
    +   *
    +   * @tparam VD The original vertex attribute (not used)
    +   * @tparam ED The original edge attribute (not used)
    +   *
    +   * @param graph The graph on which to compute personalized pagerank
    +   * @param numIter The number of iterations to run
    +   * @param resetProb The random reset probability
    +   * @param sources The list of sources to compute personalized pagerank 
from
    +   * @return the graph with vertex attributes
    +   *         containing the pagerank relative to all starting nodes (as a 
sparse vector) and
    +   *         edge attributes the normalized edge weight
    +   */
    +  def runParallelPersonalizedPageRank[VD: ClassTag, ED: ClassTag](graph: 
Graph[VD, ED],
    +    numIter: Int, resetProb: Double = 0.15,
    +    sources: Array[VertexId]): Graph[Vector, Double] = {
    +    // TODO if one sources vertex id is outside of the int range
    +    // we won't be able to store its activations in a sparse vector
    +    val zero = Vectors.sparse(sources.size, List()).asBreeze
    +    val sourcesInitMap = sources.zipWithIndex.map { case (vid, i) =>
    +      val v = Vectors.sparse(sources.size, Array(i), 
Array(resetProb)).asBreeze
    +      (vid, v)
    +    }.toMap
    +    val sc = graph.vertices.sparkContext
    +    val sourcesInitMapBC = sc.broadcast(sourcesInitMap)
    +    // Initialize the PageRank graph with each edge attribute having
    +    // weight 1/outDegree and each source vertex with attribute 1.0.
    +    var rankGraph = graph
    +      // Associate the degree with each vertex
    +      .outerJoinVertices(graph.outDegrees) { (vid, vdata, deg) => 
deg.getOrElse(0) }
    +      // Set the weight on the edges based on the degree
    +      .mapTriplets(e => 1.0 / e.srcAttr, TripletFields.Src)
    +      .mapVertices { (vid, attr) =>
    +        if (sourcesInitMapBC.value contains vid) {
    +          sourcesInitMapBC.value(vid)
    +        } else {
    +          zero
    +        }
    +      }
    +
    +    var i = 0
    +    while (i < numIter) {
    +      val prevRankGraph = rankGraph
    +      // Propagates the message along outbound edges
    +      // and adding start nodes back in with activation resetProb
    +      val rankUpdates = rankGraph.aggregateMessages[BV[Double]](
    +        ctx => ctx.sendToDst(ctx.srcAttr :* ctx.attr),
    +        (a : BV[Double], b : BV[Double]) => a :+ b, TripletFields.Src)
    +
    +      rankGraph = rankGraph.joinVertices(rankUpdates) {
    +        (vid, oldRank, msgSum) =>
    +          val popActivations: BV[Double] = msgSum :* (1.0 - resetProb)
    +          val resetActivations = if (sourcesInitMapBC.value contains vid) {
    +            sourcesInitMapBC.value(vid)
    +          } else {
    +            zero
    +          }
    +          popActivations :+ resetActivations
    +        }.cache()
    +
    +      rankGraph.edges.foreachPartition(x => {}) // also materializes 
rankGraph.vertices
    +      prevRankGraph.vertices.unpersist(false)
    +      prevRankGraph.edges.unpersist(false)
    +
    --- End diff --
    
    add `sourcesInitMapBC.destroy(false)`


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org

Reply via email to