"Can you paste your cluster information ?"
What kind of information do you need? How can I get this information?
"What are your message types?"
The message type is just LongWritable. I don't use collections during the graph
processing. I use collections only to load the input graph, and that seems to
work perfectly. Is it possible to avoid allocating primitive writables
(like LongWritable) to improve performance and use less memory?
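On the allocation question: Hadoop's LongWritable is mutable (it has a set(long) method), so one common way to avoid creating a new object for every update is to overwrite the existing value in place. The sketch below only illustrates that idea under stated assumptions: MutableLong is a hypothetical stand-in for LongWritable so the example runs without Hadoop on the classpath, and updateToMin is an invented helper, not a Giraph API.

```java
public class ReuseValueSketch {
    /** Hypothetical stand-in for org.apache.hadoop.io.LongWritable (get/set only). */
    static final class MutableLong {
        private long value;
        MutableLong(long value) { this.value = value; }
        long get() { return value; }
        void set(long value) { this.value = value; }
    }

    /**
     * Lowers 'current' to the smallest incoming value, in place.
     * Returns true if the value changed, false if it was already minimal.
     * No object is allocated here: the existing holder is overwritten.
     */
    static boolean updateToMin(MutableLong current, long[] incoming) {
        long min = Long.MAX_VALUE;
        for (long m : incoming) {
            if (m < min) {
                min = m;
            }
        }
        if (current.get() <= min) {
            return false;
        }
        current.set(min);
        return true;
    }

    public static void main(String[] args) {
        MutableLong value = new MutableLong(7);
        boolean changed = updateToMin(value, new long[] {9L, 3L, 5L});
        System.out.println(changed + " " + value.get()); // prints "true 3"
    }
}
```

In a compute method this would correspond to calling set(min) on the object returned by vertex.getValue() instead of building a new LongWritable(min). Whether an in-place change is picked up without also calling vertex.setValue may depend on the Giraph version and on how vertices are stored, so treat that part as an assumption to verify.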
"How you invoke the job?"
Here is the command I typed in my terminal to start the job:

hadoop jar hadoop/jars/test-connexity.jar \
    lifo.giraph.test.Main \
    /test-connexity \
    /test-connexity-output \
    10

The first Giraph argument is the input file, the second is the output file, and
the last is the number of workers.

Please find attached the code of my Giraph application. Main.java configures
and starts the job. VertexComputation.java computes the data, and the last two
files define how to load the input graph and save the output graph.
PS: I'm not a native English speaker, so I apologize for any language mistakes.
Thanks for your help.
Date: Fri, 26 Jul 2013 08:13:22 -0700
From: [email protected]
To: [email protected]
Subject: Re: Scaling Problem
Hi guys,
At some point, we do need to help with a guide for conserving
memory, but this is a generic Java problem. You can work around
it by avoiding objects as much as possible by using primitives
directly. If you need primitive collections, see fastutil,
Trove, etc. Combiners also save a lot of memory for messages.
What are your message types?
Avery
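To illustrate why a combiner saves memory for this kind of job, here is a minimal sketch in plain Java. It deliberately does not use Giraph's combiner API, whose class names and signatures differ between versions; MinCombinerSketch, send, pendingCount and messageFor are hypothetical names chosen for the example. The idea shown is that when only the minimum message matters, each destination vertex needs one pending value instead of a list of every message sent to it.

```java
import java.util.HashMap;
import java.util.Map;

public class MinCombinerSketch {
    /** Pending messages: one combined value per destination vertex id. */
    private final Map<Long, Long> pending = new HashMap<>();

    /** Keeps only the minimum of all messages sent to the same destination. */
    void send(long destination, long message) {
        pending.merge(destination, message, Long::min);
    }

    /** Number of destinations that currently have a pending message. */
    int pendingCount() {
        return pending.size();
    }

    /** Combined message for a destination that has received at least one message. */
    long messageFor(long destination) {
        return pending.get(destination);
    }

    public static void main(String[] args) {
        MinCombinerSketch store = new MinCombinerSketch();
        store.send(42L, 17L);
        store.send(42L, 5L);
        store.send(42L, 9L);
        store.send(7L, 3L);
        // Four messages were sent, but only two values are stored.
        System.out.println(store.pendingCount() + " " + store.messageFor(42L)); // prints "2 5"
    }
}
```

The HashMap with boxed Long keys is used only to keep the sketch short; a real implementation would rely on the framework's message store or on primitive collections to avoid that boxing.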
On 7/26/13 6:53 AM, Puneet Jain wrote:
Can you paste your cluster information? I am also
struggling to make it work on 75M vertices and hundreds of millions
of edges.
On Fri, Jul 26, 2013 at 8:02 AM, jerome richard <[email protected]> wrote:
Hi,

I encountered a critical scaling problem using Giraph. I wrote a very simple
algorithm to test Giraph on large graphs: a connexity (connectivity) test. It
works on relatively large graphs (3 072 441 nodes and 117 185 083 edges) but
not on very large graphs (52 000 000 nodes and 2 000 000 000 edges).

In fact, during the processing of the biggest graph, the Giraph core seems to
fail after superstep 14 (15 on some jobs). The input graph size is 30 GB
stored as text, and the output is also stored as text. 9 working jobs are used
to compute the graph.

Here is the stack trace of the jobs (it is the same for all 9 jobs):

java.lang.IllegalStateException: run: Caught an unrecoverable exception exists: Failed to check /_hadoopBsp/job_201307260439_0006/_applicationAttemptsDir/0/_superstepDir/97/_addressesAndPartitions after 3 tries!
    at org.apache.giraph.graph.GraphMapper.run(GraphMapper.java:101)
    at org.apache.hadoop.mapred.MapTask.runNewMapper(MapTask.java:764)
    at org.apache.hadoop.mapred.MapTask.run(MapTask.java:370)
    at org.apache.hadoop.mapred.Child$4.run(Child.java:255)
    at java.security.AccessController.doPrivileged(Native Method)
    at javax.security.auth.Subject.doAs(Unknown Source)
    at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1093)
    at org.apache.hadoop.mapred.Child.main(Child.java:249)
Caused by: java.lang.IllegalStateException: exists: Failed to check /_hadoopBsp/job_201307260439_0006/_applicationAttemptsDir/0/_superstepDir/97/_addressesAndPartitions after 3 tries!
    at org.apache.giraph.zk.ZooKeeperExt.exists(ZooKeeperExt.java:369)
    at org.apache.giraph.worker.BspServiceWorker.startSuperstep(BspServiceWorker.java:678)
    at org.apache.giraph.graph.GraphTaskManager.execute(GraphTaskManager.java:248)
    at org.apache.giraph.graph.GraphMapper.run(GraphMapper.java:91)
    ... 7 more

Could you help me solve this problem?

If you need the code of the program, I can post it here (the code is
relatively small).

Thanks,
Jérôme.
--
--Puneet

// Main.java: configures and starts the Giraph job.
package lifo.giraph.test;

import org.apache.giraph.conf.GiraphConfiguration;
import org.apache.giraph.job.GiraphJob;
import org.apache.giraph.io.formats.GiraphFileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.fs.Path;

public class Main
{
    //public static int SOURCE_ID = 0;

    public static void main(String[] args) throws Exception
    {
        if(args.length != 3)
        {
            String err = "Must have 3 arguments: <input path> <output path> <number of workers>";
            throw new IllegalArgumentException(err);
        }

        String jobName = "Giraph test";
        Path inputPath = new Path(args[0]);
        Path outputPath = new Path(args[1]);
        int nbWorkers = Integer.parseInt(args[2]);

        GiraphConfiguration configuration = new GiraphConfiguration();
        configuration.setComputationClass(VertexComputation.class);
        configuration.setVertexInputFormatClass(VertexInputFormat.class);
        configuration.setVertexOutputFormatClass(VertexOutputFormat.class);
        // Require exactly nbWorkers workers, all of which must respond.
        configuration.setWorkerConfiguration(nbWorkers, nbWorkers, 100.0f);
        GiraphFileInputFormat.addVertexInputPath(configuration, inputPath);

        GiraphJob job = new GiraphJob(configuration, jobName);
        FileOutputFormat.setOutputPath(job.getInternalJob(), outputPath);
        //job.getConfiguration().setLong(SOURCE_ID, sourceId);

        if(!job.run(true))
            System.exit(1);
        System.exit(0);
    }
}

// VertexComputation.java: propagates the smallest vertex id through each component.
package lifo.giraph.test;

import org.apache.giraph.graph.Vertex;
import org.apache.giraph.graph.Computation;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;

public class VertexComputation extends Computation<LongWritable, LongWritable,
    NullWritable, LongWritable, LongWritable>
{
    @Override
    public void compute(final Vertex<LongWritable, LongWritable, NullWritable> vertex,
        final Iterable<LongWritable> messages)
    {
        // Superstep 0: every vertex sends its own value to its neighbors.
        if(getSuperstep() == 0)
        {
            sendMessageToAllEdges(vertex, vertex.getValue());
            return;
        }

        // Find the smallest value received in this superstep.
        long min = Long.MAX_VALUE;
        for(final LongWritable m : messages)
            if(m.get() < min)
                min = m.get();

        // Nothing smaller was received: the vertex has nothing to propagate.
        if(vertex.getValue().get() <= min)
        {
            vertex.voteToHalt();
            return;
        }

        // Adopt the smaller value and forward it to the neighbors.
        final LongWritable newValue = new LongWritable(min);
        vertex.setValue(newValue);
        sendMessageToAllEdges(vertex, newValue);
    }
}

// VertexInputFormat.java: loads the input graph.
package lifo.giraph.test;

import com.google.common.collect.Lists;
import org.apache.giraph.conf.ImmutableClassesGiraphConfigurable;
import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration;
import org.apache.giraph.io.formats.TextVertexInputFormat;
import org.apache.giraph.edge.Edge;
import org.apache.giraph.edge.EdgeFactory;
import org.apache.giraph.graph.Vertex;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import java.io.IOException;
import java.util.List;
import java.util.regex.Pattern;

/**
 * Simple text-based VertexInputFormat for unweighted graphs with long ids.
 * Each line consists of: vertex neighbor1 neighbor2 ...
 */
public class VertexInputFormat
    extends TextVertexInputFormat<LongWritable, LongWritable, NullWritable>
    implements ImmutableClassesGiraphConfigurable<LongWritable, LongWritable, NullWritable>
{
    private ImmutableClassesGiraphConfiguration<LongWritable, LongWritable, NullWritable> conf;

    @Override
    public TextVertexReader createVertexReader(InputSplit split,
        TaskAttemptContext context) throws IOException
    {
        return new VertexReader();
    }

    @Override
    public void setConf(ImmutableClassesGiraphConfiguration<LongWritable,
        LongWritable, NullWritable> configuration)
    {
        this.conf = configuration;
    }

    @Override
    public ImmutableClassesGiraphConfiguration<LongWritable, LongWritable,
        NullWritable> getConf()
    {
        return conf;
    }

    public class VertexReader extends TextVertexInputFormat<LongWritable,
        LongWritable, NullWritable>.TextVertexReader
    {
        // Separator of the vertex and neighbors
        private final Pattern separator = Pattern.compile("[\t ]");

        @Override
        public Vertex<LongWritable, LongWritable, NullWritable> getCurrentVertex()
            throws IOException, InterruptedException
        {
            Vertex<LongWritable, LongWritable, NullWritable> vertex = conf.createVertex();
            String[] tokens =
                separator.split(getRecordReader().getCurrentValue().toString());

            List<Edge<LongWritable, NullWritable>> edges =
                Lists.newArrayListWithCapacity(tokens.length - 1);
            for(int n=1 ; n<tokens.length ; n++)
                edges.add(EdgeFactory.create(
                    new LongWritable(Long.parseLong(tokens[n])), NullWritable.get()));

            final long vertexId = Long.parseLong(tokens[0]);
            vertex.initialize(new LongWritable(vertexId), new LongWritable(vertexId), edges);
            return vertex;
        }

        @Override
        public boolean nextVertex() throws IOException, InterruptedException
        {
            return getRecordReader().nextKeyValue();
        }
    }
}

// VertexOutputFormat.java: saves each vertex id with its final value.
package lifo.giraph.test;

import org.apache.giraph.graph.Vertex;
import org.apache.giraph.io.formats.TextVertexOutputFormat;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import java.io.IOException;

public class VertexOutputFormat extends TextVertexOutputFormat<LongWritable,
    LongWritable, NullWritable>
{
    private class VertexWriter extends TextVertexWriter
    {
        @Override
        public void writeVertex(Vertex<LongWritable, LongWritable, NullWritable> vertex)
            throws IOException, InterruptedException
        {
            final String nodeId = vertex.getId().toString();
            final String result = vertex.getValue().toString();
            getRecordWriter().write(new Text(nodeId), new Text(result));
        }
    }

    @Override
    public TextVertexWriter createVertexWriter(TaskAttemptContext context)
        throws IOException, InterruptedException
    {
        return new VertexWriter();
    }
}