GitHub user avulanov commented on a diff in the pull request:

    https://github.com/apache/spark/pull/3883#discussion_r35621506
  
    --- Diff: graphx/src/main/scala/org/apache/spark/graphx/Pregel.scala ---
    @@ -158,4 +159,91 @@ object Pregel extends Logging {
         g
       } // end of apply
     
    +
    +  /**
    +   * An additional functionality for [[Pregel.apply()]] using `aggregateMessages`
    +   *
    +   * @tparam VD the vertex data type
    +   * @tparam ED the edge data type
    +   * @tparam A the Pregel message type
    +   * @param graph the input graph.
    +   * @param initialMsg the message each vertex will receive at the on
    +   * the first iteration
    +   * @param maxIterations the maximum number of iterations to run for
    +   *
    +   * @param tripletFields which fields should be included in the [[EdgeContext]]
    +   * passed to the `sendMsg` function. If not all fields are needed,
    +   * specifying this can improve performance.
    +   *
    +   * @param vprog the user-defined vertex program which runs on each
    +   * vertex and receives the inbound message and computes a new vertex
    +   * value.  On the first iteration the vertex program is invoked on
    +   * all vertices and is passed the default message.  On subsequent
    +   * iterations the vertex program is only invoked on those vertices
    +   * that receive messages.
    +   *
    +   * @param sendMsg a user supplied function that is applied to out
    +   * edges of vertices that received messages in the current
    +   * iteration
    +   *
    +   * @param mergeMsg a user supplied function that takes two incoming
    +   * messages of type A and merges them into a single message of type
    +   * A.  ''This function must be commutative and associative and
    +   * ideally the size of A should not increase.''
    +   *
    +   * @return the resulting graph at the end of the computation
    +   */
    +  def apply2[VD: ClassTag, ED: ClassTag, A: ClassTag]
    +  (graph: Graph[VD, ED],
    +    initialMsg: A,
    +    maxIterations: Int = Int.MaxValue,
    +    tripletFields: TripletFields = TripletFields.All)
    +    (vprog: (VertexId, VD, A) => VD,
    +      sendMsg: EdgeContext[VD, ED, A] => Unit,
    +      mergeMsg: (A, A) => A)
    +  : Graph[VD, ED] = {
    +
    +    var g = graph.mapVertices((vid, vdata) => vprog(vid, vdata, initialMsg)).cache()
    +    // compute the messages
    +    var messages = g.aggregateMessages(sendMsg, mergeMsg)
    +    var activeMessages = messages.count()
    +    // Loop
    +    var prevG: Graph[VD, ED] = null
    +    var i = 0
    +    while (activeMessages > 0 && i < maxIterations) {
    +      // Receive the messages. Vertices that didn't get any messages do not appear in newVerts.
    +      val newVerts = g.vertices.innerJoin(messages)(vprog).cache()
    +      // Update the graph with the new vertices.
    +      prevG = g
    +      g = g.outerJoinVertices(newVerts) { (vid, old, newOpt) => newOpt.getOrElse(old) }
    --- End diff --
    
    It seems that `newVerts` is no longer needed as a separate variable, because
    `aggregateMessages` does not need a working set the way `mapReduceTriplets`
    does. If `newVerts` is removed from lines 215 and 235, then I think the
    following should work:
    ```
          g = g.outerJoinVertices(messages) { (vid, old, msgOpt) =>
            msgOpt match {
              case Some(msg) => vprog(vid, old, msg)
              case None => old
            }
          }
    ```
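
    To illustrate why the two forms should agree, here is a minimal sketch that
    models the joins with plain Scala maps instead of a real `VertexRDD`. The
    object name `JoinEquivalenceSketch`, the helpers `twoStep` and `oneStep`,
    and the toy `vprog` are all hypothetical and exist only for this
    illustration; the sketch says nothing about caching or materialization in
    GraphX, only about the resulting vertex values.

    ```scala
    // Plain-collections model of the two update strategies; no Spark required.
    // Assumption: Map[VertexId, Int] stands in for a VertexRDD.
    object JoinEquivalenceSketch {
      type VertexId = Long

      // Hypothetical vertex program: add the incoming message to the vertex value.
      def vprog(id: VertexId, value: Int, msg: Int): Int = value + msg

      // Two-step update as in the diff: an inner join with the messages,
      // then an outer join that falls back to the old value.
      def twoStep(vertices: Map[VertexId, Int],
                  messages: Map[VertexId, Int]): Map[VertexId, Int] = {
        val newVerts = vertices.collect {
          case (id, value) if messages.contains(id) => id -> vprog(id, value, messages(id))
        }
        vertices.map { case (id, old) => id -> newVerts.getOrElse(id, old) }
      }

      // One-step update as suggested above: a single outer join with the
      // messages, applying vprog only where a message is present.
      def oneStep(vertices: Map[VertexId, Int],
                  messages: Map[VertexId, Int]): Map[VertexId, Int] =
        vertices.map { case (id, old) =>
          id -> (messages.get(id) match {
            case Some(msg) => vprog(id, old, msg)
            case None => old
          })
        }

      def main(args: Array[String]): Unit = {
        val vertices = Map(1L -> 10, 2L -> 20, 3L -> 30)
        val messages = Map(1L -> 1, 3L -> 3) // vertex 2 receives no message
        assert(twoStep(vertices, messages) == oneStep(vertices, messages))
        assert(oneStep(vertices, messages) == Map(1L -> 11, 2L -> 20, 3L -> 33))
      }
    }
    ```

    In this model, vertices without a message keep their old value in both
    variants, which is the behaviour the single `outerJoinVertices` call is
    meant to preserve.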

