Github user rxin commented on a diff in the pull request:
https://github.com/apache/spark/pull/3054#discussion_r20124833
--- Diff:
graphx/src/main/scala/org/apache/spark/graphx/impl/EdgePartition.scala ---
@@ -30,54 +31,71 @@ import
org.apache.spark.graphx.util.collection.GraphXPrimitiveKeyOpenHashMap
* @tparam ED the edge attribute type
* @tparam VD the vertex attribute type
*
- * @param srcIds the source vertex id of each edge
- * @param dstIds the destination vertex id of each edge
+ * @param localSrcIds the local source vertex id of each edge as an index
into `local2global` and
+ * `vertexAttrs`
+ * @param localDstIds the local destination vertex id of each edge as an
index into `local2global`
+ * and `vertexAttrs`
* @param data the attribute associated with each edge
- * @param index a clustered index on source vertex id
- * @param vertices a map from referenced vertex ids to their corresponding
attributes. Must
- * contain all vertex ids from `srcIds` and `dstIds`, though not
necessarily valid attributes for
- * those vertex ids. The mask is not used.
+ * @param index a clustered index on source vertex id as a map from each
global source vertex id to
+ * the offset in the edge arrays where the cluster for that vertex id
begins
+ * @param global2local a map from referenced vertex ids to local ids which
index into vertexAttrs
+ * @param local2global an array of global vertex ids where the offsets are
local vertex ids
+ * @param vertexAttrs an array of vertex attributes where the offsets are
local vertex ids
* @param activeSet an optional active vertex set for filtering
computation on the edges
*/
private[graphx]
class EdgePartition[
@specialized(Char, Int, Boolean, Byte, Long, Float, Double) ED:
ClassTag, VD: ClassTag](
- val srcIds: Array[VertexId] = null,
- val dstIds: Array[VertexId] = null,
+ val localSrcIds: Array[Int] = null,
+ val localDstIds: Array[Int] = null,
val data: Array[ED] = null,
val index: GraphXPrimitiveKeyOpenHashMap[VertexId, Int] = null,
- val vertices: VertexPartition[VD] = null,
+ val global2local: GraphXPrimitiveKeyOpenHashMap[VertexId, Int] = null,
+ val local2global: Array[VertexId] = null,
+ val vertexAttrs: Array[VD] = null,
val activeSet: Option[VertexSet] = None
) extends Serializable {
/** Return a new `EdgePartition` with the specified edge data. */
- def withData[ED2: ClassTag](data_ : Array[ED2]): EdgePartition[ED2, VD]
= {
- new EdgePartition(srcIds, dstIds, data_, index, vertices, activeSet)
- }
-
- /** Return a new `EdgePartition` with the specified vertex partition. */
- def withVertices[VD2: ClassTag](
- vertices_ : VertexPartition[VD2]): EdgePartition[ED, VD2] = {
- new EdgePartition(srcIds, dstIds, data, index, vertices_, activeSet)
+ def withData[ED2: ClassTag](data: Array[ED2]): EdgePartition[ED2, VD] = {
+ new EdgePartition(
+ localSrcIds, localDstIds, data, index, global2local, local2global,
vertexAttrs, activeSet)
}
/** Return a new `EdgePartition` with the specified active set, provided
as an iterator. */
def withActiveSet(iter: Iterator[VertexId]): EdgePartition[ED, VD] = {
- val newActiveSet = new VertexSet
- iter.foreach(newActiveSet.add(_))
- new EdgePartition(srcIds, dstIds, data, index, vertices,
Some(newActiveSet))
- }
-
- /** Return a new `EdgePartition` with the specified active set. */
- def withActiveSet(activeSet_ : Option[VertexSet]): EdgePartition[ED, VD]
= {
- new EdgePartition(srcIds, dstIds, data, index, vertices, activeSet_)
+ val activeSet = new VertexSet
+ while (iter.hasNext) { activeSet.add(iter.next()) }
+ new EdgePartition(
+ localSrcIds, localDstIds, data, index, global2local, local2global,
vertexAttrs,
+ Some(activeSet))
}
/** Return a new `EdgePartition` with updates to vertex attributes
specified in `iter`. */
def updateVertices(iter: Iterator[(VertexId, VD)]): EdgePartition[ED,
VD] = {
- this.withVertices(vertices.innerJoinKeepLeft(iter))
+ val newVertexAttrs = new Array[VD](vertexAttrs.length)
+ System.arraycopy(vertexAttrs, 0, newVertexAttrs, 0, vertexAttrs.length)
+ while (iter.hasNext) {
+ val kv = iter.next()
+ newVertexAttrs(global2local(kv._1)) = kv._2
+ }
+ new EdgePartition(
+ localSrcIds, localDstIds, data, index, global2local, local2global,
newVertexAttrs,
+ activeSet)
}
+ /** Return a new `EdgePartition` without any locally cached vertex
attributes. */
+ def clearVertices[VD2: ClassTag](): EdgePartition[ED, VD2] = {
+ val newVertexAttrs = new Array[VD2](vertexAttrs.length)
+ new EdgePartition(
+ localSrcIds, localDstIds, data, index, global2local, local2global,
newVertexAttrs,
+ activeSet)
+ }
+
+ private def srcIds(pos: Int): VertexId = local2global(localSrcIds(pos))
--- End diff --
You can add `@inline` to both `srcIds` and `dstIds`.
---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and would like it, or if the feature is enabled but not working, please
contact infrastructure at [email protected] or file a JIRA ticket
with INFRA.
---
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]