Hi,
I am running a Giraph job that consistently fails with the
communication-related errors below. In superstep 0, most of the workers
succeed, but a few fail with these errors, and the machine that resets
the connection is different for each failed worker. To rule out a
cluster setup problem, I also ran a different job, and it worked fine,
so the error must be specific to this particular Giraph job. It is an
ordinary message-propagation job, except that the messages are not all
of one type. I therefore defined a special message type (also copied in
this email) that combines two kinds of messages: an integer message and
a double-array message. I have tried all day but still could not
pinpoint the source of the bug. Can anyone give me some hints about what
may have caused this error?
Thanks a lot,
java.lang.IllegalStateException: run: Caught an unrecoverable exception flush: Got ExecutionException
    at org.apache.giraph.graph.GraphMapper.run(GraphMapper.java:859)
    at org.apache.hadoop.mapred.MapTask.runNewMapper(MapTask.java:763)
    at org.apache.hadoop.mapred.MapTask.run(MapTask.java:369)
    at org.apache.hadoop.mapred.Child$4.run(Child.java:259)
    at java.security.AccessController.doPrivileged(Native Method)
    at javax.security.auth.Subject.doAs(Subject.java:396)
    at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1059)
    at org.apache.hadoop.mapred.Child.main(Child.java:253)
Caused by: java.lang.IllegalStateException: flush: Got ExecutionException
    at org.apache.giraph.comm.BasicRPCCommunications.flush(BasicRPCCommunications.java:1082)
    at org.apache.giraph.graph.BspServiceWorker.finishSuperstep(BspServiceWorker.java:1080)
    at org.apache.giraph.graph.GraphMapper.map(GraphMapper.java:806)
    at org.apache.giraph.graph.GraphMapper.run(GraphMapper.java:850)
    ... 7 more
Caused by: java.util.concurrent.ExecutionException: java.lang.reflect.UndeclaredThrowableException
    at java.util.concurrent.FutureTask$Sync.innerGet(FutureTask.java:222)
    at java.util.concurrent.FutureTask.get(FutureTask.java:83)
    at org.apache.giraph.comm.BasicRPCCommunications.flush(BasicRPCCommunications.java:1077)
    ... 10 more
Caused by: java.lang.reflect.UndeclaredThrowableException
    at $Proxy3.getName(Unknown Source)
    at org.apache.giraph.comm.BasicRPCCommunications$PeerFlushExecutor.run(BasicRPCCommunications.java:335)
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:441)
    at java.util.concurrent.FutureTask$Sync.innerRun(FutureTask.java:303)
    at java.util.concurrent.FutureTask.run(FutureTask.java:138)
    at java.util.concurrent.ThreadPoolExecutor$Worker.runTask(ThreadPoolExecutor.java:886)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:908)
    at java.lang.Thread.run(Thread.java:662)
Caused by: java.io.IOException: Call to idp35.almaden.ibm.com/172.16.0.35:30083 failed on local exception: java.io.IOException: Connection reset by peer
    at org.apache.hadoop.ipc.Client.wrapException(Client.java:1065)
    at org.apache.hadoop.ipc.Client.call(Client.java:1033)
    at org.apache.hadoop.ipc.RPC$Invoker.invoke(RPC.java:224)
    ... 8 more
Caused by: java.io.IOException: Connection reset by peer
    at sun.nio.ch.FileDispatcher.read0(Native Method)
    at sun.nio.ch.SocketDispatcher.read(SocketDispatcher.java:21)
    at sun.nio.ch.IOUtil.readIntoNativeBuffer(IOUtil.java:202)
    at sun.nio.ch.IOUtil.read(IOUtil.java:175)
    at sun.nio.ch.SocketChannelImpl.read(SocketChannelImpl.java:243)
    at org.apache.hadoop.net.SocketInputStream$Reader.performIO(SocketInputStream.java:55)
    at org.apache.hadoop.net.SocketIOWithTimeout.doIO(SocketIOWithTimeout.java:142)
    at org.apache.hadoop.net.SocketInputStream.read(SocketInputStream.java:155)
    at org.apache.hadoop.net.SocketInputStream.read(SocketInputStream.java:128)
    at java.io.FilterInputStream.read(FilterInputStream.java:116)
    at org.apache.hadoop.ipc.Client$Connection$PingInputStream.read(Client.java:343)
    at java.io.BufferedInputStream.fill(BufferedInputStream.java:218)
    at java.io.BufferedInputStream.read(BufferedInputStream.java:237)
    at java.io.DataInputStream.readInt(DataInputStream.java:370)
    at org.apache.hadoop.ipc.Client$Connection.receiveResponse(Client.java:767)
    at org.apache.hadoop.ipc.Client$Connection.run(Client.java:712)
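The trace above nests several exceptions, and the innermost "Caused by" entry (the connection reset) is the one that describes what actually happened on the socket. As a minimal sketch of how such a chain can be walked in code, the hypothetical helper below (the class name `RootCause` and the method `rootCause` are illustrative and not part of Giraph or Hadoop) follows `getCause()` links down to the last one. The exception chain built in `main` only imitates the shape of the trace above.

```java
import java.io.IOException;
import java.lang.reflect.UndeclaredThrowableException;
import java.util.concurrent.ExecutionException;

public class RootCause {
    /** Follows getCause() links until no further cause is available. */
    public static Throwable rootCause(Throwable t) {
        Throwable current = t;
        // Stop on a null cause, and guard against a self-referencing cause.
        while (current.getCause() != null && current.getCause() != current) {
            current = current.getCause();
        }
        return current;
    }

    public static void main(String[] args) {
        // Imitates the nesting seen in the trace: ISE -> ExecutionException
        // -> UndeclaredThrowableException -> IOException.
        Throwable nested = new IllegalStateException(
            "flush: Got ExecutionException",
            new ExecutionException(
                new UndeclaredThrowableException(
                    new IOException("Connection reset by peer"))));
        System.out.println(rootCause(nested));
    }
}
```

A connection reset seen on the calling side says only that the remote end closed the socket, so the logs of the worker that was being called may hold a more specific error than this trace does.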
My special message type:
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import org.apache.hadoop.io.Writable;

public class MyMessageWritable implements Writable {
    // Types 1 and 4 carry intMsg; types 2 and 3 carry arrayMsg.
    public byte msgType = 0;
    public long vertexID = -1;
    public double[] arrayMsg = null;
    public int intMsg = -1;

    public MyMessageWritable() {
    }

    public MyMessageWritable(long id, byte tp, int msg) {
        vertexID = id;
        msgType = tp;
        intMsg = msg;
    }

    public MyMessageWritable(long id, byte tp, double[] arr) {
        vertexID = id;
        msgType = tp;
        arrayMsg = arr;
    }

    @Override
    public void readFields(DataInput in) throws IOException {
        vertexID = in.readLong();
        msgType = in.readByte();
        switch (msgType) {
            case 1:
            case 4:
                intMsg = in.readInt();
                break;
            case 2:
            case 3:
                // The array length is not on the wire; it is taken from MyVertex.K.
                if (arrayMsg == null)
                    arrayMsg = new double[MyVertex.K];
                for (int i = 0; i < MyVertex.K; i++)
                    arrayMsg[i] = in.readDouble();
                break;
            default:
                throw new IOException("message type invalid: " + msgType);
        }
    }

    @Override
    public void write(DataOutput out) throws IOException {
        out.writeLong(vertexID);
        out.writeByte(msgType);
        switch (msgType) {
            case 1:
            case 4:
                out.writeInt(intMsg);
                break;
            case 2:
            case 3:
                if (arrayMsg == null)
                    throw new IOException("array message is null");
                for (int i = 0; i < MyVertex.K; i++)
                    out.writeDouble(arrayMsg[i]);
                break;
            default:
                throw new IOException("message type invalid: " + msgType);
        }
    }
}
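One way to check the serialization logic above separately from the cluster is a local round trip through a byte buffer. The sketch below is not Giraph code: `MessageRoundTrip`, its nested `Message` class, and the constant `K` are hypothetical stand-ins for `MyMessageWritable` and `MyVertex.K`, written against `java.io` only so that it runs without Hadoop on the classpath. It reproduces the same wire format and verifies that every written byte is read back.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInput;
import java.io.DataInputStream;
import java.io.DataOutput;
import java.io.DataOutputStream;
import java.io.IOException;
import java.util.Arrays;

public class MessageRoundTrip {
    // Hypothetical stand-in for MyVertex.K (the fixed array length).
    public static final int K = 3;

    /** Stand-in for MyMessageWritable, using the same wire format. */
    public static class Message {
        public byte msgType = 0;
        public long vertexID = -1;
        public double[] arrayMsg = null;
        public int intMsg = -1;

        public void write(DataOutput out) throws IOException {
            out.writeLong(vertexID);
            out.writeByte(msgType);
            switch (msgType) {
                case 1:
                case 4:
                    out.writeInt(intMsg);
                    break;
                case 2:
                case 3:
                    if (arrayMsg == null)
                        throw new IOException("array message is null");
                    for (int i = 0; i < K; i++)
                        out.writeDouble(arrayMsg[i]);
                    break;
                default:
                    throw new IOException("message type invalid: " + msgType);
            }
        }

        public void readFields(DataInput in) throws IOException {
            vertexID = in.readLong();
            msgType = in.readByte();
            switch (msgType) {
                case 1:
                case 4:
                    intMsg = in.readInt();
                    break;
                case 2:
                case 3:
                    if (arrayMsg == null)
                        arrayMsg = new double[K];
                    for (int i = 0; i < K; i++)
                        arrayMsg[i] = in.readDouble();
                    break;
                default:
                    throw new IOException("message type invalid: " + msgType);
            }
        }
    }

    /** Serializes m into memory, reads it back into a fresh object, and returns the copy. */
    public static Message roundTrip(Message m) throws IOException {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        DataOutputStream out = new DataOutputStream(bytes);
        m.write(out);
        out.flush();
        DataInputStream in =
            new DataInputStream(new ByteArrayInputStream(bytes.toByteArray()));
        Message copy = new Message();
        copy.readFields(in);
        // Leftover bytes would mean write() and readFields() disagree on the format.
        if (in.available() != 0)
            throw new IOException("trailing bytes after readFields");
        return copy;
    }

    public static void main(String[] args) throws IOException {
        Message intMessage = new Message();
        intMessage.vertexID = 7L;
        intMessage.msgType = 1;
        intMessage.intMsg = 42;
        System.out.println(roundTrip(intMessage).intMsg);

        Message arrayMessage = new Message();
        arrayMessage.vertexID = 8L;
        arrayMessage.msgType = 2;
        arrayMessage.arrayMsg = new double[] {0.5, 1.5, 2.5};
        System.out.println(Arrays.toString(roundTrip(arrayMessage).arrayMsg));
    }
}
```

Two properties of the format become visible this way. A message created with the no-argument constructor keeps `msgType` 0 and cannot be written, because `write` throws for that type. The array length is never written to the stream, so the sender and the receiver have to agree on the value of `K`. Whether either of these plays a role in the failure above is not something this sketch can establish.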