Hello,
I am not sure this is the right way to submit my problem with giraph otherwise
I'm sorry.I have developped an algorithm inspired from the giraph shortest path
example but where the graphis constructed during the first supersteps and
before the shortest path search. However, my application works fine when used
with one worker (on one machine). However when more workers (4 on one machine
or on a cluster) are used the following error often appears:
...2013-12-31 16:27:33,472 INFO org.apache.giraph.comm.netty.NettyClient: Using
Netty without authentication.2013-12-31 16:27:33,478 INFO
org.apache.giraph.comm.netty.NettyServer: start: Using Netty without
authentication.2013-12-31 16:27:33,480 INFO
org.apache.giraph.comm.netty.NettyServer: start: Using Netty without
authentication.2013-12-31 16:27:33,482 INFO
org.apache.giraph.comm.netty.NettyServer: start: Using Netty without
authentication.2013-12-31 16:27:33,484 INFO
org.apache.giraph.comm.netty.NettyClient: Using Netty without
authentication.2013-12-31 16:27:33,485 INFO
org.apache.giraph.comm.netty.NettyClient: Using Netty without
authentication.2013-12-31 16:27:33,487 INFO
org.apache.giraph.comm.netty.NettyClient: Using Netty without
authentication.2013-12-31 16:27:33,494 INFO
org.apache.giraph.comm.netty.NettyClient: connectAllAddresses: Successfully
added 4 connections, (4 total connected) 0 failed, 0 failures total.2013-12-31
16:27:33,501 INFO org.apache.giraph.worker.BspServiceWorker: loadInputSplits:
Using 1 thread(s), originally 1 threads(s) for 1 total splits.2013-12-31
16:27:33,508 INFO org.apache.giraph.comm.SendPartitionCache:
SendPartitionCache: maxVerticesPerTransfer = 100002013-12-31 16:27:33,508 INFO
org.apache.giraph.comm.SendPartitionCache: SendPartitionCache:
maxEdgesPerTransfer = 800002013-12-31 16:27:33,524 INFO
org.apache.giraph.worker.InputSplitsCallable: call: Loaded 0 input splits in
0.020270009 secs, (v=0, e=0) 0.0 vertices/sec, 0.0 edges/sec2013-12-31
16:27:33,527 INFO org.apache.giraph.comm.netty.NettyClient: waitAllRequests:
Finished all requests. MBytes/sec sent = 0, MBytes/sec received = 0, MBytesSent
= 0, MBytesReceived = 0, ave sent req MBytes = 0, ave received req MBytes = 0,
secs waited = 0.6562013-12-31 16:27:33,527 INFO
org.apache.giraph.worker.BspServiceWorker: setup: Finally loaded a total of
(v=0, e=0)2013-12-31 16:27:33,598 INFO
org.apache.giraph.comm.netty.handler.RequestDecoder: decode: Server window
metrics MBytes/sec sent = 0, MBytes/sec received = 0, MBytesSent = 0,
MBytesReceived = 0, ave sent req MBytes = 0, ave received req MBytes = 0, secs
waited = 0.8162013-12-31 16:27:33,605 WARN
org.apache.giraph.comm.netty.handler.RequestServerHandler: exceptionCaught:
Channel failed with remote address /172.16.45.53:59257java.io.EOFException
at
org.jboss.netty.buffer.ChannelBufferInputStream.checkAvailable(ChannelBufferInputStream.java:231)
at
org.jboss.netty.buffer.ChannelBufferInputStream.readInt(ChannelBufferInputStream.java:174)
at org.apache.giraph.edge.ByteArrayEdges.readFields(ByteArrayEdges.java:172)
at
org.apache.giraph.utils.WritableUtils.reinitializeVertexFromDataInput(WritableUtils.java:480)
at
org.apache.giraph.utils.WritableUtils.readVertexFromDataInput(WritableUtils.java:511)
at
org.apache.giraph.partition.SimplePartition.readFields(SimplePartition.java:126)
at
org.apache.giraph.comm.requests.SendVertexRequest.readFieldsRequest(SendVertexRequest.java:66)
at
org.apache.giraph.comm.requests.WritableRequest.readFields(WritableRequest.java:120)
at
org.apache.giraph.comm.netty.handler.RequestDecoder.decode(RequestDecoder.java:92)
at
org.jboss.netty.handler.codec.oneone.OneToOneDecoder.handleUpstream(OneToOneDecoder.java:72)
at
org.jboss.netty.handler.execution.ChannelEventRunnable.run(ChannelEventRunnable.java:69)
at
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
at
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
at java.lang.Thread.run(Thread.java:724)
the code for my vertex compute function :
public class MergeVertex extendsVertex<LongWritable,DoubleWritable,
DoubleWritable, NodeMessage> {
...
/*** * Convert a Vertex Id from its LongWritable format to Point format (2
Element Array Format) * @param lng LongWritable Format of the VertexId
* @return Alignment point Array */ public static int[]
cvtLongToPoint(LongWritable lng){ int[] point={0,0};
point[0]=(int) (lng.get()/1000); point[1]=(int)
(lng.get()% 1000);
return point; }
@Override public void compute(Iterable<NodeMessage> messages)
throws IOException {
int currentId[]= cvtLongToPoint(getId());
if (getSuperstep()==0) {
//NodeValue nv=new NodeValue();
setValue(new DoubleWritable(0d)); }
_signallength=getContext().getConfiguration().getInt("SignalLength",0);
if((getSuperstep() < _signallength && getId().get()!=0L) ||
(getSuperstep()== 0 && getId().get()==0L)){
LongWritable dstId=new LongWritable();
//Nodes which are on Graph "Spine" //Remaining Edges
Construction if(currentId[0]== currentId[1]){
//right Side for
(int i=currentId[1]+1;i<_signallength;i++){
dstId=cvtPointToLong(currentId[0]+1,i);
addVertexRequest(dstId,new DoubleWritable(Double.MAX_VALUE));
addEdgeRequest(getId(),EdgeFactory.create(dstId, new
DoubleWritable(computeCost(getId(),dstId)))); }
//Left Side for
(int i=currentId[0]+2;i<_signallength;i++){
dstId=cvtPointToLong(i,currentId[1]+1);
addVertexRequest(dstId,new DoubleWritable(Double.MAX_VALUE));
addEdgeRequest(getId(),EdgeFactory.create(dstId, new
DoubleWritable(computeCost(getId(),dstId)))); }
//Nodes which are not on Graph "Spine"
//Remaining Edges Construction
}else{
//right Side
if(currentId[0]+1<_signallength){ for
(int i=currentId[1]+1;i<_signallength;i++){
dstId=cvtPointToLong(currentId[0]+1,i);
addEdgeRequest(getId(),EdgeFactory.create(dstId, new
DoubleWritable(computeCost(getId(),dstId))));
} }
//Left Side
if(currentId[1]+1<_signallength){ for
(int i=currentId[0]+2;i<_signallength;i++){
dstId=cvtPointToLong(i,currentId[1]+1);
addEdgeRequest(getId(),EdgeFactory.create(dstId, new
DoubleWritable(computeCost(getId(),dstId))));
} }
}
//No need to other vertex than source to be active
if(getId().get() != 0L){
voteToHalt(); }
}else if (getSuperstep() >= _signallength && getSuperstep() <
2*_signallength){
double minDist; long minSource=0L;
if(getId().get() == 0L){
minDist=0; }else{
minDist=Double.MAX_VALUE; }
for(NodeMessage message : messages){
if(minDist > message.get()){
minDist=message.get();
minSource=message.getSourceID(); }
}
if (minDist < getValue().get()){
setValue(new DoubleWritable(minDist));
for (Edge<LongWritable, DoubleWritable> edge :
getEdges()) { double distance = minDist +
edge.getValue().get();
sendMessage(edge.getTargetVertexId(),
new NodeMessage(distance,getId().get()));
} }
//Only last Node is active
if(currentId[0] != _signallength-1 || currentId[1] != _signallength-1){
voteToHalt(); }
}else if(getSuperstep() >= 2*_signallength){
voteToHalt(); }
} If you need more details please don't hesitate.
Thanks in advance,Chadi