[ 
https://issues.apache.org/jira/browse/GIRAPH-114?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Avery Ching reassigned GIRAPH-114:
----------------------------------

    Assignee: Sebastian Schelter
    
> Inconsistent message map handling in 
> BasicRPCCommunications.LargeMessageFlushExecutor
> -------------------------------------------------------------------------------------
>
>                 Key: GIRAPH-114
>                 URL: https://issues.apache.org/jira/browse/GIRAPH-114
>             Project: Giraph
>          Issue Type: Bug
>    Affects Versions: 0.70.0
>            Reporter: Sebastian Schelter
>            Assignee: Sebastian Schelter
>            Priority: Critical
>         Attachments: GIRAPH-114.patch
>
>
> I'm currently implementing a simple algorithm to identify all the connected 
> components of a graph. The algorithm ran well in a local IDE unit tests on 
> toy data and in a local single node hadoop instance using a graph of ~100k 
> edges.
> When I tested it on a real cluster with the wikipedia pagelink graph (5.7M 
> vertices, 130M edges), I ran into strange exceptions like this:
> {noformat} 
> 2011-12-21 12:03:57,015 INFO org.apache.hadoop.mapred.TaskInProgress: Error 
> from attempt_201112131541_0034_m_000027_0: java.lang.IllegalStateException: 
> run: Caught an unrecoverable exception flush: Got ExecutionException
>       at org.apache.giraph.graph.GraphMapper.run(GraphMapper.java:641)
>       at org.apache.hadoop.mapred.MapTask.runNewMapper(MapTask.java:763)
>       at org.apache.hadoop.mapred.MapTask.run(MapTask.java:369)
>       at org.apache.hadoop.mapred.Child$4.run(Child.java:259)
>       at java.security.AccessController.doPrivileged(Native Method)
>       at javax.security.auth.Subject.doAs(Subject.java:396)
>       at 
> org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1059)
>       at org.apache.hadoop.mapred.Child.main(Child.java:253)
> Caused by: java.lang.IllegalStateException: flush: Got ExecutionException
>       at 
> org.apache.giraph.comm.BasicRPCCommunications.flush(BasicRPCCommunications.java:946)
>       at 
> org.apache.giraph.graph.BspServiceWorker.finishSuperstep(BspServiceWorker.java:916)
>       at org.apache.giraph.graph.GraphMapper.map(GraphMapper.java:588)
>       at org.apache.giraph.graph.GraphMapper.run(GraphMapper.java:632)
>       ... 7 more
> Caused by: java.util.concurrent.ExecutionException: 
> java.lang.IllegalStateException: run: Impossible for no messages in 1603276
>       at java.util.concurrent.FutureTask$Sync.innerGet(FutureTask.java:222)
>       at java.util.concurrent.FutureTask.get(FutureTask.java:83)
>       at 
> org.apache.giraph.comm.BasicRPCCommunications.flush(BasicRPCCommunications.java:941)
>       ... 10 more
> Caused by: java.lang.IllegalStateException: run: Impossible for no messages 
> in 1603276
>       at 
> org.apache.giraph.comm.BasicRPCCommunications$PeerFlushExecutor.run(BasicRPCCommunications.java:245)
>       at 
> java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:441)
>       at java.util.concurrent.FutureTask$Sync.innerRun(FutureTask.java:303)
>       at java.util.concurrent.FutureTask.run(FutureTask.java:138)
>       at 
> java.util.concurrent.ThreadPoolExecutor$Worker.runTask(ThreadPoolExecutor.java:886)
>       at 
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:908)
>       at java.lang.Thread.run(Thread.java:662)
> {noformat} 
> The exception is thrown because a vertex with no message to send to is found 
> in the datastructure holding the outgoing messages.
> I tracked this behavior down:
> In *BasicRPCCommunications:541-546* the map holding the outgoing messages for 
> vertices of a particular machine is created. It's stored in two places 
> _BasicRPCCommunications.outMessages_ and as member variable 
> _outMessagesPerPeer_ of its _PeerConnection_ :
> {noformat} 
> outMsgMap = new HashMap<I, MsgList<M>>();
> outMessages.put(addrUnresolved, outMsgMap);
> PeerConnection peerConnection = new PeerConnection(outMsgMap, peer, isProxy);
> {noformat} 
>       
> In case that there are a lot of messages available for a particular vertex, a 
> large flush is trigged via _LargeMessageFlushExecutor_ (I guess this only 
> happened in the wikipedia test). During this flush the list of messages for 
> the vertex is sent out and replaced with an empty list in 
> *BasicRPCCommunications:341*
> {noformat}
> outMessageList = peerConnection.outMessagesPerPeer.get(destVertex);
> peerConnection.outMessagesPerPeer.put(destVertex, new MsgList<M>());
> {noformat}
> Now in the last flush that is trigggered at the end of the superstep we 
> encounter an empty message list for the vertex and therefore the exception is 
> thrown in *BasicRPCCommunications:228-247*
> {noformat}
> for (Entry<I, MsgList<M>> entry : 
> peerConnection.outMessagesPerPeer.entrySet()) {
> ...
>   if (entry.getValue().isEmpty()) {
>     throw new IllegalStateException(...);
> }
> {noformat}
> Simply removing the list for the vertex when executing the large flush solved 
> the issue (patch to come).
> I'd like to note that it is generally very dangerous to let different classes 
> have access to a datastructure directly and it produces subtle bugs like 
> this. It would be better to think of a centralized way of handling the 
> datastructure. 

--
This message is automatically generated by JIRA.
If you think it was sent incorrectly, please contact your JIRA administrators: 
https://issues.apache.org/jira/secure/ContactAdministrators!default.jspa
For more information on JIRA, see: http://www.atlassian.com/software/jira

        

Reply via email to