OK, great, thanks. I've been working with 0.1. I can get things to
compile (see the code below), but the job still doesn't run: the map
tasks hang until they are killed (log also below). I have no idea how
to fix it. I may try porting the code that compiles to 0.2 and see if
it works then. The only difference I can see is that 0.2 requires
everything to have a "message".
-bash-3.2$ hadoop jar target/giraph-0.1-jar-with-dependencies.jar
com.SimpleGiraphSumEdgeWeights /user/rfcompton/giraphTSPInput
/user/rfcompton/giraphTSPOutput 3 3
13/02/04 15:48:23 INFO mapred.JobClient: Running job: job_201301230932_1199
13/02/04 15:48:24 INFO mapred.JobClient: map 0% reduce 0%
13/02/04 15:48:35 INFO mapred.JobClient: map 25% reduce 0%
13/02/04 15:58:40 INFO mapred.JobClient: Task Id :
attempt_201301230932_1199_m_000003_0, Status : FAILED
java.lang.IllegalStateException: run: Caught an unrecoverable
exception setup: Offlining servers due to exception...
at org.apache.giraph.graph.GraphMapper.run(GraphMapper.java:641)
at org.apache.hadoop.mapred.MapTask.runNewMapper(MapTask.java:647)
at org.apache.hadoop.mapred.MapTask.run(MapTask.java:323)
at org.apache.hadoop.mapred.Child$4.run(Child.java:270)
at java.security.AccessController.doPrivileged(Native Method)
at javax.security.auth.Subject.doAs(Subject.java:396)
at
org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1157)
at org.apache.hadoop.mapred.Child.main(Child.java:264)
Caused by: java.lang.RuntimeException: setup: Offlining servers due to
exception...
at org.apache.giraph.graph.GraphMapper.setup(GraphMapper.java:466)
at org.apache.giraph.graph.GraphMapper.run(GraphMapper.java:630)
... 7 more
Caused by: java.lang.IllegalStateException: setup: loadVertices failed
at org.apache.giraph.graph.BspServiceWorker.setup(BspServiceWorker.java:582)
at org.apache.
Task attempt_201301230932_1199_m_000003_0 failed to report status for
600 seconds. Killing!
13/02/04 15:58:43 INFO mapred.JobClient: Task Id :
attempt_201301230932_1199_m_000002_0, Status : FAILED
Task attempt_201301230932_1199_m_000002_0 failed to report status for
600 seconds. Killing!
13/02/04 15:58:43 INFO mapred.JobClient: Task Id :
attempt_201301230932_1199_m_000000_0, Status : FAILED
=================================================================================================
This is the code I was using:
import com.google.common.base.Preconditions;
import com.google.common.collect.Maps;
import org.apache.giraph.comm.ArrayListWritable;
import org.apache.giraph.graph.BasicVertex;
import org.apache.giraph.graph.BspUtils;
import org.apache.giraph.graph.GiraphJob;
import org.apache.giraph.graph.EdgeListVertex;
import org.apache.giraph.graph.VertexReader;
import org.apache.giraph.graph.VertexWriter;
import org.apache.giraph.lib.TextVertexInputFormat;
import org.apache.giraph.lib.TextVertexInputFormat.TextVertexReader;
import org.apache.giraph.lib.TextVertexOutputFormat;
import org.apache.giraph.lib.TextVertexOutputFormat.TextVertexWriter;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.DoubleWritable;
import org.apache.hadoop.io.FloatWritable;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.RecordWriter;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
import org.apache.log4j.Logger;
import org.json.JSONArray;
import org.json.JSONException;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.Map;
import java.util.StringTokenizer;
/**
* Shows an example of a brute-force implementation of the Travelling
* Salesman Problem
*/
public class SimpleGiraphSumEdgeWeights extends
EdgeListVertex<LongWritable, ArrayListWritable<DoubleWritable>,
FloatWritable, ArrayListWritable<Text>> implements Tool {
/** Configuration */
private Configuration conf;
/** Class logger */
private static final Logger LOG =
Logger.getLogger(SimpleGiraphSumEdgeWeights.class);
/** The shortest paths id */
public static String SOURCE_ID = "SimpleShortestPathsVertex.sourceId";
/** Default shortest paths id */
public static long SOURCE_ID_DEFAULT = 1;
/**
* Is this vertex the source id?
*
* @return True if the source id
*/
private boolean isSource() {
return (getVertexId().get() ==
getContext().getConfiguration().getLong(SOURCE_ID,
SOURCE_ID_DEFAULT));
}
public class Message extends ArrayListWritable<Text> {
public Message() {
super();
}
@Override
public void setClass() {
// TODO Auto-generated method stub
}
}
public class Valeur extends ArrayListWritable<DoubleWritable> {
public Valeur() {
super();
}
@Override
public void setClass() {
// TODO Auto-generated method stub
}
}
@Override
public void compute(Iterator<ArrayListWritable<Text>> msgIterator) {
System.out.println("**** LAUNCHING COMPUTATION FOR VERTEX "
+ this.getVertexId().get() + ", SUPERSTEP " + this.getSuperstep()
+ " ****");
//We get the source ID, we will need it
String sourceID = new
LongWritable(this.getContext().getConfiguration().getLong(SOURCE_ID,
SOURCE_ID_DEFAULT)).toString();
//We get the total number of vertices, and the current superstep
//number, we will need it too
int J=1;
voteToHalt();
}
/**
* VertexInputFormat that supports {@link SimpleGiraphSumEdgeWeights}
*/
public static class SimpleShortestPathsVertexInputFormat extends
TextVertexInputFormat<LongWritable, ArrayListWritable<DoubleWritable>,
FloatWritable,
DoubleWritable> {
@Override
public VertexReader<LongWritable,
ArrayListWritable<DoubleWritable>, FloatWritable, DoubleWritable>
createVertexReader(InputSplit split,
TaskAttemptContext context)
throws IOException {
return new SimpleShortestPathsVertexReader(
textInputFormat.createRecordReader(split, context));
}
}
/**
* VertexReader that supports {@link SimpleGiraphSumEdgeWeights}. In this
* case, the edge values are not used. The files should be in the
* following JSON format:
* JSONArray(<vertex id>, <vertex value>,
* JSONArray(JSONArray(<dest vertex id>, <edge value>), ...))
* Here is an example with vertex id 1, vertex value 4.3, and two edges.
* First edge has a destination vertex 2, edge value 2.1.
* Second edge has a destination vertex 3, edge value 0.7.
* [1,4.3,[[2,2.1],[3,0.7]]]
*/
public static class SimpleShortestPathsVertexReader extends
TextVertexReader<LongWritable,
ArrayListWritable<DoubleWritable>, FloatWritable, DoubleWritable> {
public SimpleShortestPathsVertexReader(
RecordReader<LongWritable, Text> lineRecordReader) {
super(lineRecordReader);
}
public class Valeur extends ArrayListWritable<DoubleWritable> {
public Valeur() {
super();
}
@Override
public void setClass() {
// TODO Auto-generated method stub
}
}
@Override
public BasicVertex<LongWritable,
ArrayListWritable<DoubleWritable>, FloatWritable,
DoubleWritable> getCurrentVertex()
throws IOException, InterruptedException {
BasicVertex<LongWritable, ArrayListWritable<DoubleWritable>,
FloatWritable,
DoubleWritable> vertex = BspUtils.<LongWritable,
ArrayListWritable<DoubleWritable>, FloatWritable,
DoubleWritable>createVertex(getContext().getConfiguration());
Text line = getRecordReader().getCurrentValue();
try {
JSONArray jsonVertex = new JSONArray(line.toString());
LongWritable vertexId = new LongWritable(jsonVertex.getLong(0));
Valeur vertexValue = new Valeur();
vertexValue.add(new DoubleWritable(jsonVertex.getDouble(1)));
Map<LongWritable, FloatWritable> edges = Maps.newHashMap();
JSONArray jsonEdgeArray = jsonVertex.getJSONArray(2);
for (int i = 0; i < jsonEdgeArray.length(); ++i) {
JSONArray jsonEdge = jsonEdgeArray.getJSONArray(i);
edges.put(new LongWritable(jsonEdge.getLong(0)),
new FloatWritable((float) jsonEdge.getDouble(1)));
}
vertex.initialize(vertexId, vertexValue, edges, null);
} catch (JSONException e) {
throw new IllegalArgumentException(
"next: Couldn't get vertex from line " +
line.toString(), e);
}
return vertex;
}
@Override
public boolean nextVertex() throws IOException, InterruptedException {
return getRecordReader().nextKeyValue();
}
}
/**
* VertexOutputFormat that supports {@link SimpleGiraphSumEdgeWeights}
*/
public static class SimpleShortestPathsVertexOutputFormat extends
TextVertexOutputFormat<LongWritable,
ArrayListWritable<DoubleWritable>,
FloatWritable> {
@Override
public VertexWriter<LongWritable,
ArrayListWritable<DoubleWritable>, FloatWritable>
createVertexWriter(TaskAttemptContext context)
throws IOException, InterruptedException {
RecordWriter<Text, Text> recordWriter =
textOutputFormat.getRecordWriter(context);
return new SimpleShortestPathsVertexWriter(recordWriter);
}
}
/**
* VertexWriter that supports {@link SimpleGiraphSumEdgeWeights}
*/
public static class SimpleShortestPathsVertexWriter extends
TextVertexWriter<LongWritable,
ArrayListWritable<DoubleWritable>, FloatWritable> {
public SimpleShortestPathsVertexWriter(
RecordWriter<Text, Text> lineRecordWriter) {
super(lineRecordWriter);
}
@Override
public void writeVertex(BasicVertex<LongWritable,
ArrayListWritable<DoubleWritable>,
FloatWritable, ?> vertex)
throws IOException, InterruptedException {
String sourceID = new
LongWritable(vertex.getContext().getConfiguration().getLong(SOURCE_ID,
SOURCE_ID_DEFAULT)).toString();
JSONArray jsonVertex = new JSONArray();
try {
jsonVertex.put(vertex.getVertexId().get());
jsonVertex.put(vertex.getVertexValue().toString());
JSONArray jsonEdgeArray = new JSONArray();
for (LongWritable targetVertexId : vertex) {
JSONArray jsonEdge = new JSONArray();
jsonEdge.put(targetVertexId.get());
jsonEdge.put(vertex.getEdgeValue(targetVertexId).get());
jsonEdgeArray.put(jsonEdge);
}
jsonVertex.put(jsonEdgeArray);
} catch (JSONException e) {
throw new IllegalArgumentException(
"writeVertex: Couldn't write vertex " + vertex, e);
}
getRecordWriter().write(new Text(jsonVertex.toString()), null);
}
}
@Override
public Configuration getConf() {
return conf;
}
@Override
public void setConf(Configuration conf) {
this.conf = conf;
}
@Override
public int run(String[] argArray) throws Exception {
Preconditions.checkArgument(argArray.length == 4,
"run: Must have 4 arguments <input path> <output path> " +
"<source vertex id> <# of workers>");
GiraphJob job = new GiraphJob(getConf(), getClass().getName());
job.setVertexClass(getClass());
job.setVertexInputFormatClass(
SimpleShortestPathsVertexInputFormat.class);
job.setVertexOutputFormatClass(
SimpleShortestPathsVertexOutputFormat.class);
FileInputFormat.addInputPath(job, new Path(argArray[0]));
FileOutputFormat.setOutputPath(job, new Path(argArray[1]));
job.getConfiguration().setLong(SimpleGiraphSumEdgeWeights.SOURCE_ID,
Long.parseLong(argArray[2]));
job.setWorkerConfiguration(Integer.parseInt(argArray[3]),
Integer.parseInt(argArray[3]),
100.0f);
return job.run(true) ? 0 : -1;
}
public static void main(String[] args) throws Exception {
System.exit(ToolRunner.run(new SimpleGiraphSumEdgeWeights(), args));
}
}
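Since the trace above bottoms out in "loadVertices failed", one cheap thing to rule out is a malformed input line. Below is a minimal sketch, assuming the line format described in the reader's Javadoc ([id, value, [[dest, weight], ...]]). It uses only the JDK, not org.json or Giraph; VertexLineCheck, ParsedVertex, and parse are hypothetical names invented for this illustration. It is stricter than JSONArray, so a line it rejects is not necessarily one the real reader would reject.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

/**
 * Hypothetical local checker (not part of Giraph) for input lines of the
 * form [vertexId, vertexValue, [[destId, edgeValue], ...]].
 */
class VertexLineCheck {
    private static final String ID = "-?\\d+";
    private static final String NUM = "-?\\d+(?:\\.\\d+)?(?:[eE][+-]?\\d+)?";
    /** Whole line: id, value, then the raw edge list. */
    private static final Pattern LINE =
        Pattern.compile("\\[(" + ID + "),(" + NUM + "),\\[(.*)\\]\\]");
    /** One [destId, edgeValue] pair. */
    private static final Pattern EDGE =
        Pattern.compile("\\[(" + ID + "),(" + NUM + ")\\]");

    /** Parsed form of one input line. */
    static final class ParsedVertex {
        final long id;
        final double value;
        final Map<Long, Float> edges;

        ParsedVertex(long id, double value, Map<Long, Float> edges) {
            this.id = id;
            this.value = value;
            this.edges = edges;
        }
    }

    /** Parses one line, or throws IllegalArgumentException if it is malformed. */
    static ParsedVertex parse(String line) {
        String s = line.replaceAll("\\s+", ""); // whitespace is not significant
        Matcher m = LINE.matcher(s);
        if (!m.matches()) {
            throw new IllegalArgumentException("Couldn't get vertex from line " + line);
        }
        long id = Long.parseLong(m.group(1));
        double value = Double.parseDouble(m.group(2));

        // Walk the edge list pair by pair so stray text between pairs is caught.
        String edgeList = m.group(3);
        Map<Long, Float> edges = new LinkedHashMap<>();
        Matcher e = EDGE.matcher(edgeList);
        int pos = 0;
        while (pos < edgeList.length()) {
            if (pos > 0) {
                if (edgeList.charAt(pos) != ',') {
                    throw new IllegalArgumentException("Bad edge list in line " + line);
                }
                pos++; // skip the comma between pairs
            }
            e.region(pos, edgeList.length());
            if (!e.lookingAt()) {
                throw new IllegalArgumentException("Bad edge in line " + line);
            }
            // Edge values are narrowed to float, as in the reader above.
            edges.put(Long.parseLong(e.group(1)), (float) Double.parseDouble(e.group(2)));
            pos = e.end();
        }
        return new ParsedVertex(id, value, edges);
    }
}
```

Calling VertexLineCheck.parse("[1,4.3,[[2,2.1],[3,0.7]]]") on the example line from the Javadoc gives id 1, value 4.3, and two edges. Running every line of the input file through a check like this locally is quicker than waiting for a task to hit its 600-second timeout.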
On Fri, Feb 1, 2013 at 5:37 PM, Eli Reisman <[email protected]> wrote:
> Your best bet is to look over the two code components that users most often
> have to tweak or implement to write application code. That is, the Vertex
> implementations in examples/ and benchmark/ and the IO formats and related
> goodies like RecordReaders etc. that are mostly in the io/ dir. You might
> also take a look at the test suite for some quick ideas of how some of the
> moving parts fit together.
>
> If you have real work to do with Giraph, you're going to need to get used to
> 0.2 and its API. The old API is both limited in what kind of data it will
> process, and not compatible into the future. The API we have now, while
> evolving, is much much closer to being "final" than anything in 0.1. And
> regardless, we now have (in hindsight) the sure knowledge that none of the
> code you write for 0.1 will be portable into the future.
>
> I am first in line to be sorry about the state of the docs. There are
> efforts underway now to fix this. We all owe the users a collective apology
> for this. In lieu of proper apologies, feel free to ask any and all
> questions, no matter how dumb, they can't be as dumb as mine! The codebase
> is under heavy development and has a lot of confusingly-named moving parts
> so first get used to the plumbing an app writer has to know to function, get
> some apps up and running, then dig into the framework code and it will make
> more sense.
>
> One string to pull on to begin to look inside the framework is bin/giraph ->
> org.apache.giraph.GiraphRunner (hands job to Hadoop) -> ... ->
> o.a.g.graph.GraphMapper (is a mapper instance on a Hadoop cluster, started
> according to the Job submitted to Hadoop, but running our BSP code instead)
> -> o.a.g.graph.GraphTaskManager -> lots of places from there...
>
> The overarching BSP activity management for a single job run is basically
> all stemming out of GraphTaskManager now. You can look at setup() and
> execute() and get a decent idea of the major events in a job run, and where
> to look to get a better peek under the hood at any given task or event. Good
> luck!
>
>
> On Fri, Feb 1, 2013 at 4:59 PM, Gustavo Enrique Salazar Torres
> <[email protected]> wrote:
>>
>> Hi Ryan:
>>
>> It's the simplest thing:
>> 1. Define the type parameters for your type of Vertex (for example
>> EdgeListVertex)
>> 2. Implement compute method.
>>
>> From what I saw out there in the M/R world, Giraph provides the simplest
>> way to work with graphs.
>>
>> Take a look at
>> https://cwiki.apache.org/confluence/display/GIRAPH/Shortest+Paths+Example
>> and use release 0.1 (http://www.apache.org/dyn/closer.cgi/incubator/giraph/)
>> because 0.2-SNAPSHOT is under heavy work.
>>
>> Hope this helps you.
>>
>> Gustavo
>>
>> On Fri, Feb 1, 2013 at 9:17 PM, Ryan Compton <[email protected]>
>> wrote:
>>>
>>> I am having trouble understanding what all the classes do and the
>>> documentation looks like it might be out of date. I searched around
>>> and found this: https://github.com/edaboussi/Giraph but it won't
>>> compile with 0.2, any suggestions?
>>
>>
>>
>>
>