I'm trying to find communities in Facebook graphs with Giraph on Hadoop. I'm
using NetBeans and have imported all related libraries into the project
(and installed Hadoop and Giraph on Ubuntu 14.04 desktop). I have
converted all the graph data
(http://snap.stanford.edu/data/egonets-Facebook.html) into a txt file as
follows:
"328#306,275,4,218,78,195,181,273#127,76,78,55,4,132"
which means user_id: 328,
user_friends_in_fb (edges): 306,275,4,218,78,195,181,273,
properties_of_user: anonymized feature numbers {127,76,78,55,4,132}.
I want to import all the data into vertices and edges and then perform
MapReduce jobs on them, but I am not familiar with the process...
I have created 3 classes: FbGraphInputFormat, FbGraphOutputFormat,
FbGraphState
import java.io.IOException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Map;
import org.apache.giraph.edge.Edge;
import org.apache.giraph.edge.EdgeFactory;
import org.apache.giraph.graph.Vertex;
import org.apache.giraph.io.formats.TextVertexInputFormat;
import org.apache.hadoop.io.DoubleWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
public class FbGraphInputFormat extends TextVertexInputFormat<Text,
FbGraphState, DoubleWritable> {
@Override
public TextVertexReader createVertexReader(InputSplit is,
TaskAttemptContext tac) throws IOException {
return new FbGraphReader();
}
protected class FbGraphReader extends TextVertexReader {
@Override
public boolean nextVertex() throws IOException,
InterruptedException {
return getRecordReader().nextKeyValue();
}
@Override
public Vertex<Text, FbGraphState, DoubleWritable>
getCurrentVertex() throws IOException, InterruptedException {
String line = getRecordReader().getCurrentValue().toString();
String[] tokens = line.trim().split("#");
if (tokens.length < 2) {
throw new IllegalArgumentException("Invalid line: (" +
line + ")");
}
FbGraphState state = new FbGraphState();
Text id = new Text(tokens[0]);
state.setValue(id.toString());
state.setNodeWeight(1.0);
Map<Text, DoubleWritable> edgeMap = new HashMap<>();
ArrayList<Edge<Text, DoubleWritable>> edgesList = new
ArrayList<>();
String[] edges = (tokens.length > 2) ? tokens[1].split(",")
: new String[0];
for (int i = 0; i < edges.length; i++) {
double weight = 1.0;
Text edgeKey = new Text(edges[i]);
edgeMap.put(edgeKey, new DoubleWritable(weight));
// edgesList.add(EdgeFactory.create(new
// LongWritable(edgeKey),new LongWritable(weight)));
}
for (Map.Entry<Text, DoubleWritable> entry
: edgeMap.entrySet()) {
edgesList.add(EdgeFactory.create(entry.getKey(),
entry.getValue()));
}
Vertex<Text, FbGraphState, DoubleWritable> vertex =
this.getConf().createVertex();
vertex.initialize(id, state, edgesList);
return vertex;
}
}
--------------------------------------------------
import java.io.IOException;
import org.apache.giraph.edge.Edge;
import org.apache.giraph.graph.Vertex;
import org.apache.giraph.io.formats.TextVertexOutputFormat;
import org.apache.hadoop.io.DoubleWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
/**
 * Giraph vertex output format: writes each vertex as
 * {@code id <TAB> value <TAB> weight <TAB> target:weight,target:weight,...}.
 */
public class FbGraphOutputFormat extends
        TextVertexOutputFormat<Text, FbGraphState, DoubleWritable> {

    @Override
    public TextVertexWriter createVertexWriter(TaskAttemptContext tac)
            throws IOException, InterruptedException {
        return new FbGraphWriter();
    }

    // BUG FIX: was "private static class" — TextVertexWriter is a non-static
    // inner class of TextVertexOutputFormat, so a static nested subclass
    // cannot compile; the writer must itself be an inner class.
    protected class FbGraphWriter extends TextVertexWriter {

        /**
         * Serializes one vertex to a tab-separated line; the edge section
         * lists comma-separated {@code target:weight} pairs.
         */
        @Override
        public void writeVertex(Vertex<Text, FbGraphState,
                DoubleWritable> vertex) throws IOException, InterruptedException {
            StringBuilder b = new StringBuilder();
            b.append(vertex.getValue().getValue());
            b.append("\t");
            b.append(vertex.getValue().getNodeWeight());
            b.append("\t");
            boolean hasEdges = false;
            for (Edge<Text, DoubleWritable> e : vertex.getEdges()) {
                b.append(e.getTargetVertexId());
                b.append(":");
                b.append(e.getValue());
                b.append(",");
                hasEdges = true;
            }
            // BUG FIX: only strip the trailing comma when edges were written;
            // the old unconditional setLength() chopped off the tab separator
            // for vertices that have no edges at all.
            if (hasEdges) {
                b.setLength(b.length() - 1);
            }
            getRecordWriter().write(vertex.getId(), new Text(b.toString()));
        }
    }
}
-------------------------------------------------
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.io.WritableUtils;
/**
 * Per-vertex state for the Facebook graph: the vertex id stored as a
 * string plus a double node weight. Implements Hadoop's {@link Writable}
 * so Giraph can serialize it between supersteps.
 */
public class FbGraphState implements Writable {

    private String value;
    private double nodeWeight;

    /** Writable implementations require a public no-arg constructor. */
    public FbGraphState() {
    }

    @Override
    public void write(DataOutput d) throws IOException {
        WritableUtils.writeString(d, value);
        d.writeDouble(nodeWeight);
    }

    @Override
    public void readFields(DataInput di) throws IOException {
        value = WritableUtils.readString(di);
        // BUG FIX: write() emits a double, so readFields() must read a
        // double back. The old readLong() reinterpreted the 8 raw bytes as
        // a long, producing garbage weights after deserialization.
        nodeWeight = di.readDouble();
    }

    public void setValue(String value) {
        this.value = value;
    }

    public void setNodeWeight(double nodeWeight) {
        this.nodeWeight = nodeWeight;
    }

    public String getValue() {
        return value;
    }

    public double getNodeWeight() {
        return nodeWeight;
    }
}
Am I correct so far? How do I run this? I mean, from NetBeans, what other
class should I create in order to input the txt file with the data and
see the results? And if I accomplish that, how do I perform a MapReduce
job on the data? Are they saved somewhere in this form so that I can
retrieve them later? Or do I read them from the txt file every time,
create vertices on the fly, and perform jobs in the same class?
Is there any example of graph data with properties as input in Giraph?
The shortest-paths example is not a fit for me; I don't have the same
data input format.
Thanks, Anu