edgeList.add(
    new DefaultEdge<Text, NullWritable>(
        new Text(tokens[i]),
        null
    )
);
This is the issue: NullWritable and null are not the same.
You should replace null with NullWritable.get(), or even better use:
new EdgeNoValue<Text>(new Text(tokens[i]))
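With the first option, the edge-creation loop would look roughly like this (a minimal sketch, assuming the same two-argument DefaultEdge constructor your snippet already uses):

// Sketch: give every edge a real NullWritable instance instead of null,
// so the edge value can be serialized when partitions are sent between workers.
for (int i = 2; i < tokens.length; i++) {
    if (!tokens[0].equals(tokens[i])) {
        edgeList.add(
            new DefaultEdge<Text, NullWritable>(
                new Text(tokens[i]),
                NullWritable.get()   // never null
            )
        );
    }
}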
Let me know if this doesn't work.
From: Zachary Hanif <[email protected]>
Reply-To: "[email protected]" <[email protected]>
Date: Wednesday, February 13, 2013 1:11 PM
To: "[email protected]" <[email protected]>
Subject: Re: Giraph/Netty issues on a cluster
Sure thing!
Credit where it's due, this is heavily cribbed from
https://github.com/castagna/jena-grande/tree/master/src/main/java/org/apache/jena/grande/giraph
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import org.apache.giraph.bsp.BspUtils;
import org.apache.giraph.vertex.Vertex;
import org.apache.giraph.graph.DefaultEdge;
import org.apache.giraph.graph.Edge;
import org.apache.giraph.io.formats.TextVertexInputFormat;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.DoubleWritable;
import org.apache.hadoop.io.FloatWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
public class TestingVertexInputFormat extends
    TextVertexInputFormat<Text, DoubleWritable, NullWritable, DoubleWritable> {

  private static final Logger log =
      LoggerFactory.getLogger(TestingVertexReader.class);

  @Override
  public TextVertexReader createVertexReader(InputSplit split,
      TaskAttemptContext context) throws IOException {
    return new TestingVertexReader();
  }

  public class TestingVertexReader extends
      TextVertexInputFormat<Text, DoubleWritable, NullWritable, DoubleWritable>.TextVertexReader {

    @Override
    public boolean nextVertex() throws IOException, InterruptedException {
      boolean result = getRecordReader().nextKeyValue();
      return result;
    }

    @Override
    public Vertex<Text, DoubleWritable, NullWritable, DoubleWritable>
        getCurrentVertex() throws IOException, InterruptedException {
      Configuration conf = getContext().getConfiguration();
      String line = getRecordReader().getCurrentValue().toString();
      Vertex<Text, DoubleWritable, NullWritable, DoubleWritable> vertex =
          BspUtils.createVertex(conf);

      log.info("tokens() --> {}", "originalString = ", line);

      String[] tokens = line.split(",");
      Text vertexId = new Text(tokens[0]);
      DoubleWritable vertexValue = new DoubleWritable(Double.valueOf(tokens[1]));

      List<Edge<Text, NullWritable>> edgeList = Lists.newArrayList();
      for (int i = 2; i < tokens.length; i++) {
        if (!tokens[0].equals(tokens[i])) {
          edgeList.add(
              new DefaultEdge<Text, NullWritable>(
                  new Text(tokens[i]),
                  null
              )
          );
        }
      }

      if (vertexValue.get() != -1.0 || vertexValue.get() != 1.0) {
        vertexValue = new DoubleWritable(Double.valueOf(1.0));
        log.info("tokens() --> {}", "val1 = ", tokens[0]);
        log.info("tokens() --> {}", "val2 = ", tokens[1]);
        log.info("tokens() --> {}", "val2 = ", line);
        log.info("tokens() --> {}", "key = ", vertexId);
        log.info("tokens() --> {}", "value = ", vertexValue);
      }

      vertex.initialize(vertexId, vertexValue, edgeList);
      return vertex;
    }
  }
}
On Wed, Feb 13, 2013 at 3:59 PM, Alessandro Presta <[email protected]> wrote:
Can you post your VertexInputFormat code?
From: Zachary Hanif <[email protected]>
Reply-To: "[email protected]" <[email protected]>
Date: Wednesday, February 13, 2013 12:31 PM
To: "[email protected]" <[email protected]>
Subject: Re: Giraph/Netty issues on a cluster
It is my own code. I'm staring at my VertexInputFormat class right now. It
extends TextVertexInputFormat<Text, DoubleWritable, NullWritable,
DoubleWritable>. I cannot imagine why a value would not be set for these
vertexes, but I'll drop in some code to more stringently ensure value creation.
Why would this begin to fail on a distributed deployment (multiple workers) but
not with a single worker? The dataset is identical between the two executions.
On Wed, Feb 13, 2013 at 2:35 PM, Alessandro Presta <[email protected]> wrote:
Hi Zachary,
Are you running one of the examples or your own code?
It seems to me that a call to edge.getValue() is returning null, which should
never happen.
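To illustrate (a hypothetical sketch, not Giraph's actual serialization code; EdgeWriteSketch and writeEdge are made-up names), the write path does something equivalent to this, which is where a null edge value blows up:

import java.io.DataOutput;
import java.io.IOException;
import org.apache.giraph.graph.Edge;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;

// Hypothetical helper, NOT Giraph's actual code: it only mirrors the pattern
// that fails inside EdgeListVertexBase.write() when an edge value is null.
public class EdgeWriteSketch {
  static void writeEdge(DataOutput out, Edge<Text, NullWritable> edge)
      throws IOException {
    edge.getTargetVertexId().write(out);
    // If the edge was built with a null value instead of NullWritable.get(),
    // this dereference throws the NullPointerException seen in the trace.
    edge.getValue().write(out);
  }
}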
Alessandro
From: Zachary Hanif <[email protected]>
Reply-To: "[email protected]" <[email protected]>
Date: Wednesday, February 13, 2013 11:29 AM
To: "[email protected]" <[email protected]>
Subject: Giraph/Netty issues on a cluster
(How embarrassing! I forgot a subject header in a previous attempt to post
this. Please reply to this thread, not the other.)
Hi everyone,
I am having some odd issues when trying to run a Giraph 0.2 job across my CDH
3u3 cluster. After building the jar, and deploying it across the cluster, I
start to notice a handful of my nodes reporting the following error:
2013-02-13 17:47:43,341 WARN org.apache.giraph.comm.netty.handler.ResponseClientHandler: exceptionCaught: Channel failed with remote address <EDITED_INTERNAL_DNS>/10.2.0.16:30001
java.lang.NullPointerException
    at org.apache.giraph.vertex.EdgeListVertexBase.write(EdgeListVertexBase.java:106)
    at org.apache.giraph.partition.SimplePartition.write(SimplePartition.java:169)
    at org.apache.giraph.comm.requests.SendVertexRequest.writeRequest(SendVertexRequest.java:71)
    at org.apache.giraph.comm.requests.WritableRequest.write(WritableRequest.java:127)
    at org.apache.giraph.comm.netty.handler.RequestEncoder.encode(RequestEncoder.java:96)
    at org.jboss.netty.handler.codec.oneone.OneToOneEncoder.handleDownstream(OneToOneEncoder.java:61)
    at org.jboss.netty.handler.execution.ExecutionHandler.handleDownstream(ExecutionHandler.java:185)
    at org.jboss.netty.channel.Channels.write(Channels.java:712)
    at org.jboss.netty.channel.Channels.write(Channels.java:679)
    at org.jboss.netty.channel.AbstractChannel.write(AbstractChannel.java:246)
    at org.apache.giraph.comm.netty.NettyClient.sendWritableRequest(NettyClient.java:655)
    at org.apache.giraph.comm.netty.NettyWorkerClient.sendWritableRequest(NettyWorkerClient.java:144)
    at org.apache.giraph.comm.netty.NettyWorkerClientRequestProcessor.doRequest(NettyWorkerClientRequestProcessor.java:425)
    at org.apache.giraph.comm.netty.NettyWorkerClientRequestProcessor.sendPartitionRequest(NettyWorkerClientRequestProcessor.java:195)
    at org.apache.giraph.comm.netty.NettyWorkerClientRequestProcessor.flush(NettyWorkerClientRequestProcessor.java:365)
    at org.apache.giraph.worker.InputSplitsCallable.call(InputSplitsCallable.java:190)
    at org.apache.giraph.worker.InputSplitsCallable.call(InputSplitsCallable.java:58)
    at java.util.concurrent.FutureTask$Sync.innerRun(FutureTask.java:334)
    at java.util.concurrent.FutureTask.run(FutureTask.java:166)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1110)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:603)
    at java.lang.Thread.run(Thread.java:722)
What would be causing this? All other Hadoop jobs run well on the cluster, and
when the Giraph job is run with only one worker, it completes without any
issues. When run with any number of workers >1, the above error occurs. I have
referenced this post
(http://mail-archives.apache.org/mod_mbox/giraph-user/201209.mbox/%3ccaeq6y7shc4in-l73nr7abizspmrrfw9sfa8tmi3myqml8vk...@mail.gmail.com%3E),
where superficially similar issues were discussed, but the root cause appears
to be different, and the suggested methods of resolution are not panning out.
As extra background, the 'remote address' changes as the error cycles through
my available cluster nodes, and the failing workers do not seem to favor one
physical machine over another. Not all nodes present this issue, only a handful
per job. Is there something simple that I am missing?