GitHub user avulanov commented on a diff in the pull request:
https://github.com/apache/spark/pull/3883#discussion_r35625660
--- Diff: graphx/src/main/scala/org/apache/spark/graphx/Pregel.scala ---
@@ -158,4 +159,91 @@ object Pregel extends Logging {
g
} // end of apply
+
+ /**
+ * An additional functionality for [[Pregel.apply()]] using `aggregateMessages`
+ *
+ * @tparam VD the vertex data type
+ * @tparam ED the edge data type
+ * @tparam A the Pregel message type
+ * @param graph the input graph.
+ * @param initialMsg the message each vertex will receive on
+ * the first iteration
+ * @param maxIterations the maximum number of iterations to run for
+ *
+ * @param tripletFields which fields should be included in the [[EdgeContext]]
+ * passed to the `sendMsg` function. If not all fields are needed,
+ * specifying this can improve performance.
+ *
+ * @param vprog the user-defined vertex program which runs on each
+ * vertex and receives the inbound message and computes a new vertex
+ * value. On the first iteration the vertex program is invoked on
+ * all vertices and is passed the default message. On subsequent
+ * iterations the vertex program is only invoked on those vertices
+ * that receive messages.
+ *
+ * @param sendMsg a user supplied function that is applied to out
+ * edges of vertices that received messages in the current
+ * iteration
+ *
+ * @param mergeMsg a user supplied function that takes two incoming
+ * messages of type A and merges them into a single message of type
+ * A. ''This function must be commutative and associative and
+ * ideally the size of A should not increase.''
+ *
+ * @return the resulting graph at the end of the computation
+ */
+ def apply2[VD: ClassTag, ED: ClassTag, A: ClassTag]
+ (graph: Graph[VD, ED],
+ initialMsg: A,
+ maxIterations: Int = Int.MaxValue,
+ tripletFields: TripletFields = TripletFields.All)
+ (vprog: (VertexId, VD, A) => VD,
+ sendMsg: EdgeContext[VD, ED, A] => Unit,
+ mergeMsg: (A, A) => A)
+ : Graph[VD, ED] = {
+
+ var g = graph.mapVertices((vid, vdata) => vprog(vid, vdata, initialMsg)).cache()
+ // compute the messages
+ var messages = g.aggregateMessages(sendMsg, mergeMsg)
+ var activeMessages = messages.count()
+ // Loop
+ var prevG: Graph[VD, ED] = null
+ var i = 0
+ while (activeMessages > 0 && i < maxIterations) {
+ // Receive the messages. Vertices that didn't get any messages do not appear in newVerts.
+ val newVerts = g.vertices.innerJoin(messages)(vprog).cache()
+ // Update the graph with the new vertices.
+ prevG = g
+ g = g.outerJoinVertices(newVerts) { (vid, old, newOpt) => newOpt.getOrElse(old) }
--- End diff --
Just realized that this would not work: the aggregation stage needs the active
set of vertices, i.e. the vertices that will emit messages. It seems that you
need to use `aggregateMessagesWithActiveSet` instead of `aggregateMessages`.
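
To illustrate the difference, here is a toy model in plain Scala collections. It is only a sketch and does not use the GraphX API: `ActiveSetSketch`, `Edge`, `aggregateAll`, and `aggregateActive` are hypothetical names, and each vertex simply sends its own `Int` attribute along its out-edges.

```scala
// Toy model of message aggregation; NOT the GraphX API.
object ActiveSetSketch {
  type VertexId = Long
  final case class Edge(src: VertexId, dst: VertexId)

  // Sends along every edge, whether or not the source vertex received a
  // message (roughly what a plain aggregation over all edges would do).
  def aggregateAll(
      edges: Seq[Edge],
      attr: Map[VertexId, Int],
      mergeMsg: (Int, Int) => Int): Map[VertexId, Int] =
    edges
      .map(e => e.dst -> attr(e.src))   // one message per edge
      .groupBy(_._1)                    // group messages by destination
      .map { case (dst, msgs) => dst -> msgs.map(_._2).reduce(mergeMsg) }

  // Sends only along out-edges of vertices in the active set, i.e. the
  // vertices that received a message in the current iteration.
  def aggregateActive(
      edges: Seq[Edge],
      attr: Map[VertexId, Int],
      active: Set[VertexId],
      mergeMsg: (Int, Int) => Int): Map[VertexId, Int] =
    aggregateAll(edges.filter(e => active.contains(e.src)), attr, mergeMsg)

  def main(args: Array[String]): Unit = {
    val edges = Seq(Edge(1L, 2L), Edge(2L, 3L)) // chain 1 -> 2 -> 3
    val attr = Map(1L -> 10, 2L -> 20, 3L -> 30)
    val sum = (a: Int, b: Int) => a + b
    println(aggregateAll(edges, attr, sum))
    println(aggregateActive(edges, attr, Set(2L), sum))
  }
}
```

With only vertex 2 active, `aggregateActive` delivers a message to vertex 3 alone, while `aggregateAll` also delivers one to vertex 2. In this toy model an empty active set yields no messages, which is what lets a `while (activeMessages > 0 ...)` loop stop.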