[ https://issues.apache.org/jira/browse/SPARK-5036?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14263786#comment-14263786 ]
Apache Spark commented on SPARK-5036:
-------------------------------------

User 'shijinkui' has created a pull request for this issue:
https://github.com/apache/spark/pull/3866

> Better support sending partial messages in Pregel API
> -----------------------------------------------------
>
>                 Key: SPARK-5036
>                 URL: https://issues.apache.org/jira/browse/SPARK-5036
>             Project: Spark
>          Issue Type: Improvement
>          Components: GraphX
>            Reporter: sjk
>         Attachments: s1.jpeg, s2.jpeg
>
>
> Better support sending partial messages in Pregel API
> 1. The requirement
> In many iterative graph algorithms, only a subset of the vertices (we call them
> ActiveVertexes) needs to send messages to its neighbours in each iteration.
> In many cases, ActiveVertexes are the vertices whose attributes changed
> between the previous and the current iteration. To implement this
> requirement, we can use the Pregel API plus a flag (e.g., `bool isAttrChanged`) in
> each vertex's attribute.
> However, after the `aggregateMessages` or `mapReduceTriplets` call of each iteration,
> we need to reset this flag to its initial value in every vertex, which requires a
> heavy `joinVertices`.
> We found a more efficient way to meet this requirement and want to discuss it
> here.
> Consider the following simple example.
> In the i-th iteration, the previous attribute of each vertex is `Attr` and the
> newly computed attribute is `NewAttr`:
> | VID | Attr | NewAttr | Neighbours |
> |:----|:-----|:--------|:-----------|
> | 1   | 4    | 5       | 2, 3       |
> | 2   | 3    | 2       | 1, 4       |
> | 3   | 2    | 2       | 1, 4       |
> | 4   | 3    | 4       | 1, 2, 3    |
> Our requirement is:
> 1. Set each vertex's `Attr` to `NewAttr` in the i-th iteration.
> 2. For each vertex whose `Attr != NewAttr`, send messages to its neighbours
> in the next iteration's `aggregateMessages`.
> We found it hard to implement this requirement efficiently with the current Pregel API.
> The reason is that we not only need to perform `pregel()` to
> compute `NewAttr` (2) but also need to perform `outerJoinVertices()` to satisfy
> (1).
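To make the requirement concrete, here is a minimal plain-Scala sketch (no Spark needed) that applies the two rules to the four rows of the example table. The names `ActiveVertexDemo` and `Row` are made up for illustration and are not part of GraphX or of the pull request.

```scala
object ActiveVertexDemo {
  // One row of the example table: (VID, Attr, NewAttr, Neighbours).
  // `Row` is a hypothetical helper type, not a GraphX class.
  final case class Row(vid: Long, attr: Int, newAttr: Int, neighbours: Seq[Long])

  val rows: Seq[Row] = Seq(
    Row(1L, 4, 5, Seq(2L, 3L)),
    Row(2L, 3, 2, Seq(1L, 4L)),
    Row(3L, 2, 2, Seq(1L, 4L)),
    Row(4L, 3, 4, Seq(1L, 2L, 3L))
  )

  // Requirement 1: every vertex adopts NewAttr as its attribute.
  val updated: Map[Long, Int] = rows.map(r => r.vid -> r.newAttr).toMap

  // Requirement 2: a vertex is active only if its attribute changed.
  val active: Set[Long] = rows.filter(r => r.attr != r.newAttr).map(_.vid).toSet

  // (sender, receiver) pairs the next iteration should carry:
  // only active vertices send to their neighbours.
  val messages: Seq[(Long, Long)] =
    for { r <- rows if active(r.vid); dst <- r.neighbours } yield (r.vid, dst)
}
```

With the table's data, vertex 3 is the only one whose attribute is unchanged, so `active` contains vertices 1, 2 and 4, and 7 of the 9 listed neighbour links carry a message.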
> A simple idea is to keep an `isAttrChanged: Boolean` (solution 1) or a
> `flag: Int` (solution 2) in each vertex's attribute.
> 2. Two solutions
> -----------
> 2.1 Solution 1: label and reset `isAttrChanged: Boolean` in the vertex attribute
> ![alt text](s1.jpeg "Title")
> 1. Initialize the messages with `aggregateMessages`.
>    It returns a message RDD.
> 2. `innerJoin`
>    Apply the user-defined function `vprog` to the vertices that received
>    messages. This returns a new VertexRDD holding the computed values, with
>    `isAttrChanged = true`.
> 3. `outerJoinVertices`
>    Merge the changed vertices back into the whole graph. The graph is now up to date.
> 4. `aggregateMessages`. It returns a message RDD.
> 5. `joinVertices`: reset `isAttrChanged` to false in every vertex attribute.
> ```
> // here reset the isAttrChanged to false
> g = updateG.joinVertices(updateG.vertices) {
>   (vid, oriVertex, updateGVertex) => updateGVertex.reset()
> }
> ```
> The flag in each vertex attribute object has to be reset to false here.
> If `isAttrChanged` is not reset, the vertex will send messages again in the
> next iteration.
> **result:**
> * Edges: 890041895
> * Vertices: 181640208
> * Iterations: 150
> * Total cost: 8.4h
> * Could not run to convergence (zero messages)
> 2.2 Solution 2: color vertex
> ![alt text](s2.jpeg "Title")
> Iteration process:
> 1. `innerJoin`
>    `vprog` is used as a partially applied function, like `vprog(curIter, _:
>    VertexId, _: VD, _: A)`, with
>    ` i = i + 1; val curIter = i`.
>    In `vprog`, the user can read `curIter` and assign it to `flag`.
> 2. `outerJoinVertices`
>    `graph = graph.outerJoinVertices(changedVerts) { (vid, old, newOpt) =>
>    newOpt.getOrElse(old)}.cache()`
> 3. `aggregateMessages`
>    `sendMsg` is a partially applied function, like `sendMsg(curIter, _:
>    EdgeContext[VD, ED, A])`.
>    **In `sendMsg`, compare `curIter` with `flag` to decide whether to send
>    a message.**
> #### result
> raw data:
> * vertices: 181640208
> * edges: 890041895
> |            | average cost per iteration | cost of 150 iterations | cost of 420 iterations |
> | ---------- | -------------------------- | ---------------------- | ---------------------- |
> | solution 1 | 188m                       | 7.8h                   | cannot finish          |
> | solution 2 | 24                         | 1.2h                   | 3.1h                   |
> | compare    | 7x                         | 6.5x                   | finished in 3.1h       |
>
> ## the end
>
> I think the second solution (Pregel + a flag) is better.
> It properly supports iterative graph algorithms in which only part of the
> vertices send messages to their neighbours in each iteration.
> We will use it in our production environment.
> pr: https://github.com/apache/spark/pull/3866
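The idea behind solution 2 (stamp each changed vertex with the iteration number, then let `sendMsg` compare that stamp with `curIter`) can be sketched in plain Scala without Spark. This is only an illustration under stated assumptions, not the code from the pull request: `IterationFlagDemo`, `Attr`, the four-vertex path graph and the choice of min-label propagation as the algorithm are all hypothetical, and ordinary collections stand in for `aggregateMessages`, `innerJoin` and `outerJoinVertices`.

```scala
object IterationFlagDemo {
  // Vertex attribute: the value plus the iteration in which it last changed.
  // `Attr` is a hypothetical type chosen for this sketch.
  final case class Attr(value: Long, flag: Int)

  // Assumed tiny undirected graph: a path 1 - 2 - 3 - 4.
  val neighbours: Map[Long, Seq[Long]] =
    Map(1L -> Seq(2L), 2L -> Seq(1L, 3L), 3L -> Seq(2L, 4L), 4L -> Seq(3L))

  // vprog takes the current iteration first: stamp the flag only when the value changes.
  def vprog(curIter: Int, vid: Long, attr: Attr, msg: Long): Attr =
    if (msg < attr.value) Attr(msg, curIter) else attr

  // sendMsg takes the current iteration first: only vertices stamped in this
  // iteration send, so no reset pass over all vertices is needed.
  def sendMsg(curIter: Int, src: Long, attr: Attr): Seq[(Long, Long)] =
    if (attr.flag == curIter) neighbours(src).map(dst => dst -> attr.value) else Seq.empty

  // Runs min-label propagation; returns the final attributes and the number
  // of messages sent in each round.
  def run(): (Map[Long, Attr], List[Int]) = {
    var curIter = 0
    // Every vertex starts stamped with iteration 0 so that all of them send once.
    var attrs: Map[Long, Attr] = neighbours.keys.map(v => v -> Attr(v, 0)).toMap
    var sentPerIter = List.empty[Int]
    var msgs = attrs.toSeq.flatMap { case (v, a) => sendMsg(curIter, v, a) }
    while (msgs.nonEmpty) {
      sentPerIter = sentPerIter :+ msgs.size
      curIter += 1
      val iter = curIter // fixed value captured by the functions below
      // Merge messages per receiver (min), the role aggregateMessages plays.
      val merged = msgs.groupBy(_._1).map { case (dst, ms) => dst -> ms.map(_._2).min }
      // Only receivers run vprog; all other vertices keep their old attribute.
      attrs = attrs.map { case (v, a) =>
        v -> merged.get(v).map(m => vprog(iter, v, a, m)).getOrElse(a)
      }
      msgs = attrs.toSeq.flatMap { case (v, a) => sendMsg(iter, v, a) }
    }
    (attrs, sentPerIter)
  }
}
```

Calling `IterationFlagDemo.run()` on this path graph converges with every vertex holding the value 1, and the per-round message counts shrink as fewer vertices change. A stale stamp simply stops matching `curIter`, which is why no reset step like the `joinVertices` of solution 1 is required.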