Github user rxin commented on a diff in the pull request:
https://github.com/apache/spark/pull/3100#discussion_r20204491
--- Diff:
graphx/src/main/scala/org/apache/spark/graphx/impl/EdgePartition.scala ---
@@ -280,55 +344,197 @@ class EdgePartition[
* It is safe to keep references to the objects from this iterator.
*/
def tripletIterator(
- includeSrc: Boolean = true, includeDst: Boolean = true):
Iterator[EdgeTriplet[VD, ED]] = {
- new EdgeTripletIterator(this, includeSrc, includeDst)
+ includeSrc: Boolean = true, includeDst: Boolean = true)
+ : Iterator[EdgeTriplet[VD, ED]] = new Iterator[EdgeTriplet[VD, ED]] {
+ private[this] var pos = 0
+
+ override def hasNext: Boolean = pos < EdgePartition.this.size
+
+ override def next() = {
+ val triplet = new EdgeTriplet[VD, ED]
+ val localSrcId = localSrcIds(pos)
+ val localDstId = localDstIds(pos)
+ triplet.srcId = local2global(localSrcId)
+ triplet.dstId = local2global(localDstId)
+ if (includeSrc) {
+ triplet.srcAttr = vertexAttrs(localSrcId)
+ }
+ if (includeDst) {
+ triplet.dstAttr = vertexAttrs(localDstId)
+ }
+ triplet.attr = data(pos)
+ pos += 1
+ triplet
+ }
}
/**
- * Upgrade the given edge iterator into a triplet iterator.
+ * Send messages along edges and aggregate them at the receiving
vertices. Implemented by scanning
+ * all edges sequentially.
+ *
+ * @param sendMsg generates messages to neighboring vertices of an edge
+ * @param mergeMsg the combiner applied to messages destined to the same
vertex
+ * @param tripletFields which triplet fields `sendMsg` uses
+ * @param srcMustBeActive if true, edges will only be considered if
their source vertex is in the
+ * active set
+ * @param dstMustBeActive if true, edges will only be considered if
their destination vertex is in
+ * the active set
+ * @param maySatisfyEither if true, only one vertex need be in the
active set for an edge to be
+ * considered
*
- * Be careful not to keep references to the objects from this iterator.
To improve GC performance
- * the same object is re-used in `next()`.
+ * @return iterator of aggregated messages keyed by the receiving vertex id
*/
- def upgradeIterator(
- edgeIter: Iterator[Edge[ED]], includeSrc: Boolean = true,
includeDst: Boolean = true)
- : Iterator[EdgeTriplet[VD, ED]] = {
- new ReusingEdgeTripletIterator(edgeIter, this, includeSrc, includeDst)
+ def aggregateMessagesEdgeScan[A: ClassTag](
+ sendMsg: EdgeContext[VD, ED, A] => Unit,
+ mergeMsg: (A, A) => A,
+ tripletFields: TripletFields,
+ srcMustBeActive: Boolean,
--- End diff --
note: srcMustBeActive, dstMustBeActive, and maySatisfyEither are confusing
---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes to enable it, or if the feature is enabled but not working, please
contact infrastructure at [email protected] or file a JIRA ticket
with INFRA.
---
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]