Haha! Always good to see my problems resulting from my own foolishness :) Per your advice, I replaced

edgeList.add(
    new DefaultEdge<Text, NullWritable>(
        new Text(tokens[i]),
        null
    )
);

with

new EdgeNoValue<Text>(new Text(tokens[i]))

and it worked beautifully. Thank you very much for your assistance!

I do, if you have the time, have one remaining question: why did the
previous code work when run with only one worker?

On Wed, Feb 13, 2013 at 4:15 PM, Alessandro Presta <[email protected]> wrote:

> edgeList.add(
>     new DefaultEdge<Text, NullWritable>(
>         new Text(tokens[i]),
>         null
>     )
> );
>
> This is the issue: NullWritable and null are not the same.
> You should replace null with NullWritable.get(), or even better use:
>
> new EdgeNoValue<Text>(new Text(tokens[i]))
>
> Let me know if this doesn't work.
>
> From: Zachary Hanif <[email protected]>
> Reply-To: "[email protected]" <[email protected]>
> Date: Wednesday, February 13, 2013 1:11 PM
> To: "[email protected]" <[email protected]>
> Subject: Re: Giraph/Netty issues on a cluster
>
> Sure thing!
>
> Credit where it's due, this is heavily cribbed from
> https://github.com/castagna/jena-grande/tree/master/src/main/java/org/apache/jena/grande/giraph
>
>> import java.io.IOException;
>> import java.util.ArrayList;
>> import java.util.List;
>> import java.util.Map;
>>
>> import org.apache.giraph.bsp.BspUtils;
>> import org.apache.giraph.vertex.Vertex;
>> import org.apache.giraph.graph.DefaultEdge;
>> import org.apache.giraph.graph.Edge;
>> import org.apache.giraph.io.formats.TextVertexInputFormat;
>> import org.apache.hadoop.conf.Configuration;
>> import org.apache.hadoop.io.DoubleWritable;
>> import org.apache.hadoop.io.FloatWritable;
>> import org.apache.hadoop.io.LongWritable;
>> import org.apache.hadoop.io.NullWritable;
>> import org.apache.hadoop.io.Text;
>> import org.apache.hadoop.mapreduce.InputSplit;
>> import org.apache.hadoop.mapreduce.TaskAttemptContext;
>> import org.slf4j.Logger;
>> import org.slf4j.LoggerFactory;
>>
>> import com.google.common.collect.Lists;
>> import com.google.common.collect.Maps;
>>
>> public class TestingVertexInputFormat extends
>>         TextVertexInputFormat<Text, DoubleWritable, NullWritable, DoubleWritable> {
>>
>>     private static final Logger log =
>>             LoggerFactory.getLogger(TestingVertexReader.class);
>>
>>     @Override
>>     public TextVertexReader createVertexReader(InputSplit split,
>>             TaskAttemptContext context) throws IOException {
>>         return new TestingVertexReader();
>>     }
>>
>>     public class TestingVertexReader extends
>>             TextVertexInputFormat<Text, DoubleWritable, NullWritable, DoubleWritable>.TextVertexReader {
>>
>>         @Override
>>         public boolean nextVertex() throws IOException, InterruptedException {
>>             boolean result = getRecordReader().nextKeyValue();
>>             return result;
>>         }
>>
>>         @Override
>>         public Vertex<Text, DoubleWritable, NullWritable, DoubleWritable>
>>                 getCurrentVertex() throws IOException, InterruptedException {
>>             Configuration conf = getContext().getConfiguration();
>>             String line = getRecordReader().getCurrentValue().toString();
>>             Vertex<Text, DoubleWritable, NullWritable, DoubleWritable> vertex =
>>                     BspUtils.createVertex(conf);
>>             log.info("tokens() --> {}", "originalString = ", line);
>>             String tokens[] = line.split(",");
>>             Text vertexId = new Text(tokens[0]);
>>
>>             DoubleWritable vertexValue = new DoubleWritable(Double.valueOf(tokens[1]));
>>             List<Edge<Text, NullWritable>> edgeList = Lists.newArrayList();
>>             for ( int i = 2; i < tokens.length; i++ ) {
>>                 if ( !tokens[0].equals(tokens[i]) ) {
>>                     edgeList.add(
>>                         new DefaultEdge<Text, NullWritable>(
>>                             new Text(tokens[i]),
>>                             null
>>                         )
>>                     );
>>                 }
>>             }
>>             if (vertexValue.get() != -1.0 || vertexValue.get() != 1.0) {
>>                 vertexValue = new DoubleWritable(Double.valueOf(1.0));
>>                 log.info("tokens() --> {}", "val1 = ", tokens[0]);
>>                 log.info("tokens() --> {}", "val2 = ", tokens[1]);
>>                 log.info("tokens() --> {}", "val2 = ", line);
>>                 log.info("tokens() --> {}", "key = ", vertexId);
>>                 log.info("tokens() --> {}", "value = ", vertexValue);
>>             }
>>             vertex.initialize( vertexId, vertexValue, edgeList );
>>             return vertex;
>>         }
>>     }
>> }
>
> On Wed, Feb 13, 2013 at 3:59 PM, Alessandro Presta <[email protected]> wrote:
>
>> Can you post your VertexInputFormat code?
>>
>> From: Zachary Hanif <[email protected]>
>> Reply-To: "[email protected]" <[email protected]>
>> Date: Wednesday, February 13, 2013 12:31 PM
>> To: "[email protected]" <[email protected]>
>> Subject: Re: Giraph/Netty issues on a cluster
>>
>> It is my own code. I'm staring at my VertexInputFormat class right now.
>> It extends TextVertexInputFormat<Text, DoubleWritable, NullWritable,
>> DoubleWritable>. I cannot imagine why a value would not be set for these
>> vertexes, but I'll drop in some code to more stringently ensure value
>> creation.
>>
>> Why would this begin to fail on a distributed deployment (multiple
>> workers) but not with a single worker? The dataset is identical between
>> the two executions.
>>
>> On Wed, Feb 13, 2013 at 2:35 PM, Alessandro Presta <[email protected]> wrote:
>>
>>> Hi Zachary,
>>>
>>> Are you running one of the examples or your own code?
>>> It seems to me that a call to edge.getValue() is returning null, which
>>> should never happen.
>>>
>>> Alessandro
>>>
>>> From: Zachary Hanif <[email protected]>
>>> Reply-To: "[email protected]" <[email protected]>
>>> Date: Wednesday, February 13, 2013 11:29 AM
>>> To: "[email protected]" <[email protected]>
>>> Subject: Giraph/Netty issues on a cluster
>>>
>>> (How embarrassing! I forgot a subject header in a previous attempt to
>>> post this. Please reply to this thread, not the other.)
>>>
>>> Hi everyone,
>>>
>>> I am having some odd issues when trying to run a Giraph 0.2 job across
>>> my CDH 3u3 cluster.
>>> After building the jar and deploying it across the cluster, I start to
>>> notice a handful of my nodes reporting the following error:
>>>
>>>> 2013-02-13 17:47:43,341 WARN
>>>> org.apache.giraph.comm.netty.handler.ResponseClientHandler:
>>>> exceptionCaught: Channel failed with remote address
>>>> <EDITED_INTERNAL_DNS>/10.2.0.16:30001
>>>> java.lang.NullPointerException
>>>>     at org.apache.giraph.vertex.EdgeListVertexBase.write(EdgeListVertexBase.java:106)
>>>>     at org.apache.giraph.partition.SimplePartition.write(SimplePartition.java:169)
>>>>     at org.apache.giraph.comm.requests.SendVertexRequest.writeRequest(SendVertexRequest.java:71)
>>>>     at org.apache.giraph.comm.requests.WritableRequest.write(WritableRequest.java:127)
>>>>     at org.apache.giraph.comm.netty.handler.RequestEncoder.encode(RequestEncoder.java:96)
>>>>     at org.jboss.netty.handler.codec.oneone.OneToOneEncoder.handleDownstream(OneToOneEncoder.java:61)
>>>>     at org.jboss.netty.handler.execution.ExecutionHandler.handleDownstream(ExecutionHandler.java:185)
>>>>     at org.jboss.netty.channel.Channels.write(Channels.java:712)
>>>>     at org.jboss.netty.channel.Channels.write(Channels.java:679)
>>>>     at org.jboss.netty.channel.AbstractChannel.write(AbstractChannel.java:246)
>>>>     at org.apache.giraph.comm.netty.NettyClient.sendWritableRequest(NettyClient.java:655)
>>>>     at org.apache.giraph.comm.netty.NettyWorkerClient.sendWritableRequest(NettyWorkerClient.java:144)
>>>>     at org.apache.giraph.comm.netty.NettyWorkerClientRequestProcessor.doRequest(NettyWorkerClientRequestProcessor.java:425)
>>>>     at org.apache.giraph.comm.netty.NettyWorkerClientRequestProcessor.sendPartitionRequest(NettyWorkerClientRequestProcessor.java:195)
>>>>     at org.apache.giraph.comm.netty.NettyWorkerClientRequestProcessor.flush(NettyWorkerClientRequestProcessor.java:365)
>>>>     at org.apache.giraph.worker.InputSplitsCallable.call(InputSplitsCallable.java:190)
>>>>     at org.apache.giraph.worker.InputSplitsCallable.call(InputSplitsCallable.java:58)
>>>>     at java.util.concurrent.FutureTask$Sync.innerRun(FutureTask.java:334)
>>>>     at java.util.concurrent.FutureTask.run(FutureTask.java:166)
>>>>     at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1110)
>>>>     at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:603)
>>>>     at java.lang.Thread.run(Thread.java:722)
>>>
>>> What would be causing this? All other Hadoop jobs run well on the
>>> cluster, and when the Giraph job is run with only one worker, it
>>> completes without any issues. When run with any number of workers > 1,
>>> the above error occurs. I have referenced this post
>>> <http://mail-archives.apache.org/mod_mbox/giraph-user/201209.mbox/%3ccaeq6y7shc4in-l73nr7abizspmrrfw9sfa8tmi3myqml8vk...@mail.gmail.com%3E>
>>> where superficially similar issues were discussed, but the root cause
>>> appears to be different, and the suggested methods of resolution are not
>>> panning out.
>>>
>>> As extra background, the 'remote address' changes as the error cycles
>>> through my available cluster nodes, and the failing workers do not seem
>>> to favor one physical machine over another. Not all nodes present this
>>> issue, only a handful per job. Is there something simple that I am
>>> missing?
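
For anyone who lands on this thread later, here is a minimal sketch of the
corrected edge-building loop in the TestingVertexReader above, showing both of
the fixes Alessandro suggested. The surrounding names (tokens, edgeList) come
from the code posted earlier in the thread; the import path for EdgeNoValue is
not shown there, so check the package in your Giraph build.

// Option 1: keep DefaultEdge, but store the NullWritable singleton instead of
// a literal null. The edge value gets serialized when the vertex is written
// out, so it must never be null.
for (int i = 2; i < tokens.length; i++) {
    if (!tokens[0].equals(tokens[i])) {
        edgeList.add(
            new DefaultEdge<Text, NullWritable>(
                new Text(tokens[i]),
                NullWritable.get()
            )
        );
    }
}

// Option 2 (the fix that was adopted in the thread): an edge type that carries
// no value at all.
for (int i = 2; i < tokens.length; i++) {
    if (!tokens[0].equals(tokens[i])) {
        edgeList.add(new EdgeNoValue<Text>(new Text(tokens[i])));
    }
}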

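And a tiny self-contained illustration of why the literal null only blows up at
serialization time (this uses plain Hadoop NullWritable; nothing here is
Giraph-specific, and the class name is just for the example):

import org.apache.hadoop.io.NullWritable;

import java.io.ByteArrayOutputStream;
import java.io.DataOutputStream;
import java.io.IOException;

public class NullWritableDemo {
    public static void main(String[] args) throws IOException {
        DataOutputStream out = new DataOutputStream(new ByteArrayOutputStream());

        // NullWritable.get() returns a singleton whose write() is a no-op,
        // so it is always safe to serialize.
        NullWritable ok = NullWritable.get();
        ok.write(out);

        // A literal null edge value is only dereferenced when the vertex is
        // written out, e.g. while a worker ships a partition to another worker
        // (see SendVertexRequest in the stack trace above), which is
        // presumably why the single-worker run never hit it.
        NullWritable broken = null;
        // broken.write(out);  // would throw NullPointerException
    }
}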