Github user rxin commented on a diff in the pull request:

    https://github.com/apache/spark/pull/3054#discussion_r20125227
  
    --- Diff: 
graphx/src/main/scala/org/apache/spark/graphx/impl/EdgePartition.scala ---
    @@ -285,50 +337,116 @@ class EdgePartition[
       }
     
       /**
    -   * Upgrade the given edge iterator into a triplet iterator.
    +   * Send messages along edges and aggregate them at the receiving 
vertices. Implemented by scanning
    +   * all edges sequentially and filtering them with `idPred`.
    +   *
    +   * @param mapFunc the edge map function which generates messages to 
neighboring vertices
    +   * @param reduceFunc the combiner applied to messages destined to the 
same vertex
    +   * @param mapUsesSrcAttr whether or not `mapFunc` uses the edge's source 
vertex attribute
    +   * @param mapUsesDstAttr whether or not `mapFunc` uses the edge's 
destination vertex attribute
    +   * @param idPred a predicate to filter edges based on their source and 
destination vertex ids
        *
    -   * Be careful not to keep references to the objects from this iterator. 
To improve GC performance
    -   * the same object is re-used in `next()`.
    +   * @return iterator aggregated messages keyed by the receiving vertex id
        */
    -  def upgradeIterator(
    -      edgeIter: Iterator[Edge[ED]], includeSrc: Boolean = true, 
includeDst: Boolean = true)
    -    : Iterator[EdgeTriplet[VD, ED]] = {
    -    new ReusingEdgeTripletIterator(edgeIter, this, includeSrc, includeDst)
    +  def mapReduceTriplets[A: ClassTag](
    +      mapFunc: EdgeTriplet[VD, ED] => Iterator[(VertexId, A)],
    +      reduceFunc: (A, A) => A,
    +      mapUsesSrcAttr: Boolean,
    +      mapUsesDstAttr: Boolean,
    +      idPred: (VertexId, VertexId) => Boolean): Iterator[(VertexId, A)] = {
    +    val aggregates = new Array[A](vertexAttrs.length)
    +    val bitset = new BitSet(vertexAttrs.length)
    +
    +    var edge = new EdgeTriplet[VD, ED]
    +    var i = 0
    +    while (i < size) {
    +      val localSrcId = localSrcIds(i)
    +      val srcId = local2global(localSrcId)
    +      val localDstId = localDstIds(i)
    +      val dstId = local2global(localDstId)
    +      if (idPred(srcId, dstId)) {
    +        edge.srcId = srcId
    +        edge.dstId = dstId
    +        edge.attr = data(i)
    +        if (mapUsesSrcAttr) { edge.srcAttr = vertexAttrs(localSrcId) }
    +        if (mapUsesDstAttr) { edge.dstAttr = vertexAttrs(localDstId) }
    +
    +        mapFunc(edge).foreach { kv =>
    +          val globalId = kv._1
    +          val msg = kv._2
    +          val localId = if (globalId == srcId) localSrcId else localDstId
    +          if (bitset.get(localId)) {
    +            aggregates(localId) = reduceFunc(aggregates(localId), msg)
    +          } else {
    +            aggregates(localId) = msg
    +            bitset.set(localId)
    +          }
    +        }
    +      }
    +      i += 1
    +    }
    +
    +    bitset.iterator.map { localId => (local2global(localId), 
aggregates(localId)) }
       }
     
       /**
    -   * Get an iterator over the edges in this partition whose source vertex 
ids match srcIdPred. The
    -   * iterator is generated using an index scan, so it is efficient at 
skipping edges that don't
    -   * match srcIdPred.
    +   * Send messages along edges and aggregate them at the receiving 
vertices. Implemented by
    +   * filtering the source vertex index with `srcIdPred`, then scanning 
edge clusters and filtering
    +   * with `dstIdPred`. Both `srcIdPred` and `dstIdPred` must match for an 
edge to run.
        *
    -   * Be careful not to keep references to the objects from this iterator. 
To improve GC performance
    -   * the same object is re-used in `next()`.
    -   */
    -  def indexIterator(srcIdPred: VertexId => Boolean): Iterator[Edge[ED]] =
    -    index.iterator.filter(kv => 
srcIdPred(kv._1)).flatMap(Function.tupled(clusterIterator))
    -
    -  /**
    -   * Get an iterator over the cluster of edges in this partition with 
source vertex id `srcId`. The
    -   * cluster must start at position `index`.
    +   * @param mapFunc the edge map function which generates messages to 
neighboring vertices
    +   * @param reduceFunc the combiner applied to messages destined to the 
same vertex
    +   * @param mapUsesSrcAttr whether or not `mapFunc` uses the edge's source 
vertex attribute
    +   * @param mapUsesDstAttr whether or not `mapFunc` uses the edge's 
destination vertex attribute
    +   * @param srcIdPred a predicate to filter edges based on their source 
vertex id
    +   * @param dstIdPred a predicate to filter edges based on their 
destination vertex id
        *
    -   * Be careful not to keep references to the objects from this iterator. 
To improve GC performance
    -   * the same object is re-used in `next()`.
    +   * @return iterator aggregated messages keyed by the receiving vertex id
        */
    -  private def clusterIterator(srcId: VertexId, index: Int) = new 
Iterator[Edge[ED]] {
    -    private[this] val edge = new Edge[ED]
    -    private[this] var pos = index
    +  def mapReduceTripletsWithIndex[A: ClassTag](
    --- End diff --
    
    Suggested name for this method: `mapReduceTripletsIndexScan` (instead of `mapReduceTripletsWithIndex`).


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and would like it, or if the feature is enabled but not working,
please contact infrastructure at [email protected] or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to