Github user rxin commented on a diff in the pull request:
https://github.com/apache/spark/pull/3100#discussion_r20126158
--- Diff: graphx/src/main/scala/org/apache/spark/graphx/Graph.scala ---
@@ -326,8 +336,54 @@ abstract class Graph[VD: ClassTag, ED: ClassTag] protected () extends Serializab
: VertexRDD[A]
/**
- * Joins the vertices with entries in the `table` RDD and merges the results using `mapFunc`. The
- * input table should contain at most one entry for each vertex. If no entry in `other` is
+ * Aggregates values from the neighboring edges and vertices of each vertex. The user-supplied
+ * `sendMsg` function is invoked on each edge of the graph, generating 0 or more messages to be
+ * sent to either vertex in the edge. The `mergeMsg` function is then used to combine all messages
+ * destined to the same vertex.
+ *
+ * @tparam A the type of message to be sent to each vertex
+ *
+ * @param sendMsg runs on each edge, sending messages to neighboring vertices using the
+ * [[EdgeContext]].
+ * @param mergeMsg used to combine messages from `sendMsg` destined to the same vertex. This
+ * combiner should be commutative and associative.
+ * @param tripletFields which fields should be included in the [[EdgeContext]] passed to the
+ * `sendMsg` function. If not all fields are needed, specifying this can improve performance.
+ * @param activeSetOpt an efficient way to run the aggregation on a subset of the edges if
+ * desired. This is done by specifying a set of "active" vertices and an edge direction. The
+ * `sendMsg` function will then run on only edges connected to active vertices by edges in the
+ * specified direction. If the direction is `In`, `sendMsg` will only be run on edges with
+ * destination in the active set. If the direction is `Out`, `sendMsg` will only be run on edges
+ * originating from vertices in the active set. If the direction is `Either`, `sendMsg` will be
+ * run on edges with *either* vertex in the active set. If the direction is `Both`, `sendMsg`
+ * will be run on edges with *both* vertices in the active set. The active set must have the
+ * same index as the graph's vertices.
+ *
+ * @example We can use this function to compute the in-degree of each
+ * vertex
+ * {{{
+ * val rawGraph: Graph[_, _] = Graph.textFile("twittergraph")
+ * val inDeg: RDD[(VertexId, Int)] =
+ * aggregateMessages[Int](ctx => ctx.sendToDst(1), _ + _)
+ * }}}
+ *
+ * @note By expressing computation at the edge level we achieve
+ * maximum parallelism. This is one of the core functions in the
+ * Graph API in that enables neighborhood level computation. For
+ * example this function can be used to count neighbors satisfying a
+ * predicate or implement PageRank.
+ *
+ */
+ def aggregateMessages[A: ClassTag](
+ sendMsg: EdgeContext[VD, ED, A] => Unit,
+ mergeMsg: (A, A) => A,
+ tripletFields: TripletFields = TripletFields.All,
+ activeSetOpt: Option[(VertexRDD[_], EdgeDirection)] = None)
--- End diff --
It'd be better to create two versions of this: a public one that doesn't
take `activeSetOpt`, and an internal private one that takes all four
parameters. Basically, it'd be better not to expose `activeSetOpt`, since it
is kind of complicated and scary to new users.
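
To make the suggestion concrete, here is a minimal sketch of the split. It is not the GraphX implementation: `ToyGraph`, the simplified `EdgeContext`, the local `EdgeDirection`, and the name `aggregateMessagesWithActiveSet` are hypothetical stand-ins that run on plain Scala collections, and `tripletFields` is left out for brevity.

```scala
// Toy stand-ins for illustration only; none of these are the real GraphX types.
object AggregateMessagesSketch {
  type VertexId = Long

  sealed trait EdgeDirection
  object EdgeDirection {
    case object In extends EdgeDirection
    case object Out extends EdgeDirection
    case object Either extends EdgeDirection
    case object Both extends EdgeDirection
  }

  final case class Edge(srcId: VertexId, dstId: VertexId)

  // Simplified context: only lets sendMsg emit a message to either endpoint.
  final class EdgeContext[A](
      val srcId: VertexId,
      val dstId: VertexId,
      emit: (VertexId, A) => Unit) {
    def sendToSrc(msg: A): Unit = emit(srcId, msg)
    def sendToDst(msg: A): Unit = emit(dstId, msg)
  }

  final class ToyGraph(edges: Seq[Edge]) {

    // Public version: no activeSetOpt, so new users never see it.
    def aggregateMessages[A](
        sendMsg: EdgeContext[A] => Unit,
        mergeMsg: (A, A) => A): Map[VertexId, A] =
      aggregateMessagesWithActiveSet(sendMsg, mergeMsg, None)

    // Internal version (hypothetical name): keeps the active-set option.
    private def aggregateMessagesWithActiveSet[A](
        sendMsg: EdgeContext[A] => Unit,
        mergeMsg: (A, A) => A,
        activeSetOpt: Option[(Set[VertexId], EdgeDirection)]): Map[VertexId, A] = {
      // Restrict to edges touching the active set, following the doc comment.
      val selected = activeSetOpt match {
        case None => edges
        case Some((active, dir)) =>
          edges.filter { e =>
            val src = active.contains(e.srcId)
            val dst = active.contains(e.dstId)
            dir match {
              case EdgeDirection.In     => dst
              case EdgeDirection.Out    => src
              case EdgeDirection.Either => src || dst
              case EdgeDirection.Both   => src && dst
            }
          }
      }
      // Merge all messages destined to the same vertex with mergeMsg.
      val merged = scala.collection.mutable.Map.empty[VertexId, A]
      selected.foreach { e =>
        val ctx = new EdgeContext[A](e.srcId, e.dstId, (id, msg) =>
          merged.update(id, merged.get(id).map(mergeMsg(_, msg)).getOrElse(msg)))
        sendMsg(ctx)
      }
      merged.toMap
    }
  }

  // In-degree example from the doc comment, using only the public overload.
  def inDegreeDemo(): Map[VertexId, Int] = {
    val g = new ToyGraph(Seq(Edge(1L, 2L), Edge(3L, 2L), Edge(2L, 3L)))
    g.aggregateMessages[Int](ctx => ctx.sendToDst(1), _ + _)
  }
}
```

Callers of the public method only ever pass `sendMsg` and `mergeMsg`; internal callers that need the active-set filter go through the private method, so its semantics can change without touching the public API.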