Hi, since all my tests fail around the same code, I'm starting to think there must be a problem there :) even though the code is quite simple and the failures happen for different reasons (one is even a divide by zero).
Basically, I've refactored BasicRPCCommunications.putMsg*() and putVertexIdMessagesList(). As in the previous code, I need to collect the set of vertexIds that are addressed to vertices that don't exist in the current partition, since they are going to be resolved later (resolveVertexIndexSet in the trunk code). I changed this code because it depends on inMessages being a mutable HashMap, which is not the case with out-of-core SequenceFiles. Basically, the trunk code does:

    for (partition in partitions)
        for (vertex in partition.vertices)
            messages = inMessages.remove(vertex)
            [...]

which leaves inMessages, at the end of the loops, with only the messages directed to the aforementioned soon-to-be-created vertices. As this strategy doesn't work for me, I changed the putMsg* methods to check this online, with something like this:

    @Override
    public final void putMsg(I vertex, M msg) throws IOException {
        if (LOG.isDebugEnabled()) {
            LOG.debug("putMsg: Adding msg " + msg + " on vertex " + vertex);
        }
        checkForMessageToNonExistentVertex(vertex);
        inMessages.addMessage(vertex, msg);
    }

where:

    private void checkForMessageToNonExistentVertex(I vertexId) {
        // Populate resolveVertexIndexSet in case a message was sent
        // to a non-existent vertex. It will be taken care of later by
        // vertexResolver at prepareSuperstep()
        Partition<I, V, E, M> partition;
        if ((partition = service.getPartition(vertexId)) == null) {
            throw new IllegalStateException(
                "Impossible that this worker " + service.getWorkerInfo() +
                " was sent " + blablablablab...;
        } else {
            if (partition.getVertex(vertexId) == null) {
                synchronized(nonExistentVerticesIndexSet) {
                    nonExistentVerticesIndexSet.add(vertexId);
                }
            }
        }
    }

This way I can collect the vertexIds as they arrive, without searching for them later (which I actually cannot do). In short, instead of building resolveVertexIds at the end, I spread that work across the incoming-message handling.
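To make the difference between the two strategies concrete, here is a minimal, self-contained sketch. It is a toy model, not Giraph code: vertex ids are Longs, messages are Strings, a "partition" is just a set of vertex ids, and the names MessageResolutionSketch, OnlineCollector, leftoverDestinations and partitionLookup are all hypothetical (partitionLookup stands in for service.getPartition()). It also swaps the synchronized block for ConcurrentHashMap.newKeySet(), one possible way to get a thread-safe set, and it simply assumes the partition lookup is safe to call concurrently.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.TreeSet;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Function;

// Hypothetical toy model: vertex ids are Longs, messages are Strings,
// and a "partition" is just the set of vertex ids it currently holds.
public class MessageResolutionSketch {

    // Trunk-style: remove the messages of every existing vertex; the keys
    // left over are destinations that do not exist yet. Needs a mutable map.
    static Set<Long> leftoverDestinations(Map<Long, List<String>> inMessages,
                                          List<Set<Long>> partitions) {
        Map<Long, List<String>> remaining = new HashMap<>(inMessages);
        for (Set<Long> partition : partitions) {
            for (Long vertexId : partition) {
                remaining.remove(vertexId); // messages would be delivered here
            }
        }
        return new TreeSet<>(remaining.keySet());
    }

    // Online style: record missing destinations as each message arrives.
    static class OnlineCollector {
        // Concurrent collections, assuming putMsg() may run on several RPC threads.
        final Set<Long> nonExistentVertexIds = ConcurrentHashMap.newKeySet();
        final Map<Long, List<String>> inMessages = new ConcurrentHashMap<>();
        // Hypothetical stand-in for service.getPartition(vertexId);
        // assumed here to be safe to call from multiple threads.
        final Function<Long, Set<Long>> partitionLookup;

        OnlineCollector(Function<Long, Set<Long>> partitionLookup) {
            this.partitionLookup = partitionLookup;
        }

        void putMsg(Long vertexId, String msg) {
            Set<Long> partition = partitionLookup.apply(vertexId);
            if (partition == null) {
                throw new IllegalStateException("No local partition for vertex " + vertexId);
            }
            if (!partition.contains(vertexId)) {
                nonExistentVertexIds.add(vertexId); // resolve later, e.g. at superstep start
            }
            inMessages.computeIfAbsent(vertexId,
                    k -> Collections.synchronizedList(new ArrayList<>())).add(msg);
        }
    }

    // Runs both strategies on the same messages: [trunk-style, online-style].
    static List<Set<Long>> demo() {
        // Partition 0 holds even ids, partition 1 holds odd ids.
        List<Set<Long>> partitions = Arrays.asList(
                new HashSet<>(Arrays.asList(2L, 4L)),
                new HashSet<>(Arrays.asList(1L, 3L)));
        OnlineCollector online =
                new OnlineCollector(id -> partitions.get((int) (id % 2)));
        Map<Long, List<String>> inbox = new HashMap<>();
        for (long target : new long[] {1L, 2L, 5L, 6L}) { // 5 and 6 do not exist
            inbox.computeIfAbsent(target, k -> new ArrayList<>()).add("hello");
            online.putMsg(target, "hello");
        }
        return Arrays.asList(leftoverDestinations(inbox, partitions),
                new TreeSet<>(online.nonExistentVertexIds));
    }

    public static void main(String[] args) {
        System.out.println(demo()); // [[5, 6], [5, 6]]
    }
}
```

Both strategies find the same missing destinations; the online one never mutates inMessages, but it is only correct if the partition lookup itself can be called safely from the RPC threads while partitions may be changing, which is exactly what I'm unsure about.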
All my failures happen in this last method: one because the HashPartitioner divides by zero, another because it returns null for a vertex this worker actually owns, etc. Given that I haven't changed any code related to partitioning whatsoever, I guess it must be a thread-safety issue in this code. So my questions are:

1) Can I safely access service.getPartition(), and everything underneath it, from within the RPC code such as putMsg(), putMsgList(), etc.?
2) If not, what strategy would you suggest for collecting the nonExistentVerticesIndexSet that I need later in prepareSuperstep()?

--
Claudio Martella
claudio.marte...@gmail.com