Hi guys, I'm having some problems with custom classes in Giraph. I made a
VertexInputFormat and a VertexOutputFormat, but I always get the following
error:
*java.io.IOException: ensureRemaining: Only * bytes remaining, trying to
read **
with different values where the "*" are placed. The problem happens when a
VertexIterator calls next() and there are no vertices left. The iterator is
invoked from a flush method, but basically I don't understand why the
"next()" method is failing. Here are some logs and classes...
*My log is the following*:
15/09/08 00:52:21 INFO bsp.BspService: BspService: Connecting to ZooKeeper
with job giraph_yarn_application_1441683854213_0001, 1 on localhost:22181
15/09/08 00:52:21 INFO zookeeper.ZooKeeper: Client
environment:zookeeper.version=3.4.5-1392090, built on 09/30/2012 17:52 GMT
15/09/08 00:52:21 INFO zookeeper.ZooKeeper: Client environment:host.name
=localhost
15/09/08 00:52:21 INFO zookeeper.ZooKeeper: Client
environment:java.version=1.7.0_79
15/09/08 00:52:21 INFO zookeeper.ZooKeeper: Client
environment:java.vendor=Oracle Corporation
15/09/08 00:52:21 INFO zookeeper.ZooKeeper: Client
environment:java.home=/usr/lib/jvm/java-7-openjdk-amd64/jre
15/09/08 00:52:21 INFO zookeeper.ZooKeeper: Client
environment:java.class.path=.:${CLASSPATH}:./*:$HADOOP_CONF_DIR:$HADOOP_COMMON_HOME/share/hadoop/common/*:$HADOOP_COMMON_HOME/share/hadoop/common/lib/*:$HADOOP_H$
15/09/08 00:52:21 INFO zookeeper.ZooKeeper: Client
environment:java.library.path=/usr/java/packages/lib/amd64:/usr/lib/x86_64-linux-gnu/jni:/lib/x86_64-linux-gnu:/usr/lib/x86_64-linux-gnu:/usr/lib/jni:/lib:/usr/l$
15/09/08 00:52:21 INFO zookeeper.ZooKeeper: Client
environment:java.io.tmpdir=/tmp
15/09/08 00:52:21 INFO zookeeper.ZooKeeper: Client
environment:java.compiler=<NA>
15/09/08 00:52:21 INFO zookeeper.ZooKeeper: Client environment:os.name=Linux
15/09/08 00:52:21 INFO zookeeper.ZooKeeper: Client environment:os.arch=amd64
15/09/08 00:52:21 INFO zookeeper.ZooKeeper: Client
environment:os.version=3.13.0-62-generic
15/09/08 00:52:21 INFO zookeeper.ZooKeeper: Client environment:user.name
=hduser
15/09/08 00:52:21 INFO zookeeper.ZooKeeper: Client
environment:user.home=/home/hduser
15/09/08 00:52:21 INFO zookeeper.ZooKeeper: Client
environment:user.dir=/app/hadoop/tmp/nm-local-dir/usercache/hduser/appcache/application_1441683854213_0001/container_1441683854213_0001_01_000003
15/09/08 00:52:21 INFO zookeeper.ZooKeeper: Initiating client connection,
connectString=localhost:22181 sessionTimeout=60000
watcher=org.apache.giraph.worker.BspServiceWorker@4256d3a0
15/09/08 00:52:21 INFO zookeeper.ClientCnxn: Opening socket connection to
server localhost/127.0.0.1:22181. Will not attempt to authenticate using
SASL (unknown error)
15/09/08 00:52:21 INFO zookeeper.ClientCnxn: Socket connection established
to localhost/127.0.0.1:22181, initiating session
15/09/08 00:52:21 INFO zookeeper.ClientCnxn: Session establishment complete
on server localhost/127.0.0.1:22181, sessionid = 0x14fab0de0bb0002,
negotiated timeout = 40000
15/09/08 00:52:21 INFO bsp.BspService: process: Asynchronous connection
complete.
15/09/08 00:52:21 INFO netty.NettyServer: NettyServer: Using execution
group with 8 threads for requestFrameDecoder.
15/09/08 00:52:21 INFO Configuration.deprecation: mapred.map.tasks is
deprecated. Instead, use mapreduce.job.maps
15/09/08 00:52:21 INFO netty.NettyServer: start: Started server
communication server: localhost/127.0.0.1:30001 with up to 16 threads on
bind attempt 0 with sendBufferSize = 32768 receiveBufferSize = 524288
15/09/08 00:52:21 INFO netty.NettyClient: NettyClient: Using execution
handler with 8 threads after request-encoder.
15/09/08 00:52:21 INFO graph.GraphTaskManager: setup: Registering health of
this worker...
15/09/08 00:52:21 INFO yarn.GiraphYarnTask: [STATUS: task-1] WORKER_ONLY
starting...
15/09/08 00:52:22 INFO bsp.BspService: getJobState: Job state already
exists
(/_hadoopBsp/giraph_yarn_application_1441683854213_0001/_masterJobState)
15/09/08 00:52:22 INFO bsp.BspService: getApplicationAttempt: Node
/_hadoopBsp/giraph_yarn_application_1441683854213_0001/_applicationAttemptsDir
already exists!
15/09/08 00:52:22 INFO bsp.BspService: getApplicationAttempt: Node
/_hadoopBsp/giraph_yarn_application_1441683854213_0001/_applicationAttemptsDir
already exists!
15/09/08 00:52:22 INFO worker.BspServiceWorker: registerHealth: Created my
health node for attempt=0, superstep=-1 with
/_hadoopBsp/giraph_yarn_application_1441683854213_0001/_applicationAttemptsDir/0/_superstepD$
15/09/08 00:52:22 INFO netty.NettyServer: start: Using Netty without
authentication.
15/09/08 00:52:22 INFO bsp.BspService: process:
partitionAssignmentsReadyChanged (partitions are assigned)
15/09/08 00:52:22 INFO worker.BspServiceWorker: startSuperstep:
Master(hostname=localhost, MRtaskID=0, port=30000)
15/09/08 00:52:22 INFO worker.BspServiceWorker: startSuperstep: Ready for
computation on superstep -1 since worker selection and vertex range
assignments are done in /_hadoopBsp/giraph_yarn_application_1441683854$
15/09/08 00:52:22 INFO yarn.GiraphYarnTask: [STATUS: task-1]
startSuperstep: WORKER_ONLY - Attempt=0, Superstep=-1
15/09/08 00:52:22 INFO netty.NettyClient: Using Netty without
authentication.
15/09/08 00:52:22 INFO netty.NettyClient: Using Netty without
authentication.
15/09/08 00:52:22 INFO netty.NettyClient: connectAllAddresses: Successfully
added 2 connections, (2 total connected) 0 failed, 0 failures total.
15/09/08 00:52:22 INFO netty.NettyServer: start: Using Netty without
authentication.
15/09/08 00:52:22 INFO handler.RequestDecoder: decode: Server window
metrics MBytes/sec received = 0, MBytesReceived = 0.0001, ave received req
MBytes = 0.0001, secs waited = 1.44168435E9
15/09/08 00:52:22 INFO worker.BspServiceWorker: loadInputSplits: Using 1
thread(s), originally 1 threads(s) for 1 total splits.
15/09/08 00:52:22 INFO worker.InputSplitsHandler: reserveInputSplit:
Reserved input split path
/_hadoopBsp/giraph_yarn_application_1441683854213_0001/_vertexInputSplitDir/0,
overall roughly 0.0% input splits rese$
15/09/08 00:52:22 INFO worker.InputSplitsCallable: getInputSplit: Reserved
/_hadoopBsp/giraph_yarn_application_1441683854213_0001/_vertexInputSplitDir/0
from ZooKeeper and got input split 'hdfs://hdnode01:54310/u$
15/09/08 00:52:22 INFO worker.InputSplitsCallable: loadFromInputSplit:
Finished loading
/_hadoopBsp/giraph_yarn_application_1441683854213_0001/_vertexInputSplitDir/0
(v=6, e=10)
15/09/08 00:52:22 INFO worker.InputSplitsCallable: call: Loaded 1 input
splits in 0.16241108 secs, (v=6, e=10) 36.94329 vertices/sec, 61.572155
edges/sec
15/09/08 00:52:22 ERROR utils.LogStacktraceCallable: Execution of callable
failed
java.lang.IllegalStateException: next: IOException
at
org.apache.giraph.utils.VertexIterator.next(VertexIterator.java:101)
at
org.apache.giraph.partition.BasicPartition.addPartitionVertices(BasicPartition.java:99)
at
org.apache.giraph.comm.requests.SendWorkerVerticesRequest.doRequest(SendWorkerVerticesRequest.java:115)
at
org.apache.giraph.comm.netty.NettyWorkerClientRequestProcessor.doRequest(NettyWorkerClientRequestProcessor.java:466)
at
org.apache.giraph.comm.netty.NettyWorkerClientRequestProcessor.flush(NettyWorkerClientRequestProcessor.java:412)
at
org.apache.giraph.worker.InputSplitsCallable.call(InputSplitsCallable.java:241)
at
org.apache.giraph.worker.InputSplitsCallable.call(InputSplitsCallable.java:60)
at
org.apache.giraph.utils.LogStacktraceCallable.call(LogStacktraceCallable.java:51)
at java.util.concurrent.FutureTask.run(FutureTask.java:262)
at
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
at
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
at java.lang.Thread.run(Thread.java:745)
Caused by: java.io.IOException: ensureRemaining: Only 0 bytes remaining,
trying to read 1
at
org.apache.giraph.utils.UnsafeReads.ensureRemaining(UnsafeReads.java:77)
at
org.apache.giraph.utils.UnsafeArrayReads.readByte(UnsafeArrayReads.java:123)
at
org.apache.giraph.utils.UnsafeReads.readLine(UnsafeReads.java:100)
at
pruebas.TextAndDoubleComplexWritable.readFields(TextAndDoubleComplexWritable.java:37)
at
org.apache.giraph.utils.WritableUtils.reinitializeVertexFromDataInput(WritableUtils.java:540)
at
org.apache.giraph.utils.VertexIterator.next(VertexIterator.java:98)
... 11 more
15/09/08 00:52:22 ERROR worker.BspServiceWorker: unregisterHealth: Got
failure, unregistering health on
/_hadoopBsp/giraph_yarn_application_1441683854213_0001/_applicationAttemptsDir/0/_superstepDir/-1/_workerHea$
15/09/08 00:52:22 ERROR yarn.GiraphYarnTask: GiraphYarnTask threw a
top-level exception, failing task
java.lang.RuntimeException: run: Caught an unrecoverable exception waitFor:
ExecutionException occurred while waiting for
org.apache.giraph.utils.ProgressableUtils$FutureWaitable@4bbf48f0
at
org.apache.giraph.yarn.GiraphYarnTask.run(GiraphYarnTask.java:104)
at
org.apache.giraph.yarn.GiraphYarnTask.main(GiraphYarnTask.java:183)
Caused by: java.lang.IllegalStateException: waitFor: ExecutionException
occurred while waiting for
org.apache.giraph.utils.ProgressableUtils$FutureWaitable@4bbf48f0
at
org.apache.giraph.utils.ProgressableUtils.waitFor(ProgressableUtils.java:193)
at
org.apache.giraph.utils.ProgressableUtils.waitForever(ProgressableUtils.java:151)
at
org.apache.giraph.utils.ProgressableUtils.waitForever(ProgressableUtils.java:136)
at
org.apache.giraph.utils.ProgressableUtils.getFutureResult(ProgressableUtils.java:99)
at
org.apache.giraph.utils.ProgressableUtils.getResultsWithNCallables(ProgressableUtils.java:233)
at
org.apache.giraph.worker.BspServiceWorker.loadInputSplits(BspServiceWorker.java:316)
at
org.apache.giraph.worker.BspServiceWorker.loadVertices(BspServiceWorker.java:409)
at
org.apache.giraph.worker.BspServiceWorker.setup(BspServiceWorker.java:629)
at
org.apache.giraph.graph.GraphTaskManager.execute(GraphTaskManager.java:284)
at org.apache.giraph.yarn.GiraphYarnTask.run(GiraphYarnTask.java:92)
... 1 more
Caused by: java.util.concurrent.ExecutionException:
java.lang.IllegalStateException: next: IOException
at java.util.concurrent.FutureTask.report(FutureTask.java:122)
at java.util.concurrent.FutureTask.get(FutureTask.java:202)
at
org.apache.giraph.utils.ProgressableUtils$FutureWaitable.waitFor(ProgressableUtils.java:312)
at
org.apache.giraph.utils.ProgressableUtils.waitFor(ProgressableUtils.java:185)
... 10 more
Caused by: java.lang.IllegalStateException: next: IOException
at
org.apache.giraph.utils.VertexIterator.next(VertexIterator.java:101)
at
org.apache.giraph.partition.BasicPartition.addPartitionVertices(BasicPartition.java:99)
at
org.apache.giraph.comm.requests.SendWorkerVerticesRequest.doRequest(SendWorkerVerticesRequest.java:115)
at
org.apache.giraph.comm.netty.NettyWorkerClientRequestProcessor.doRequest(NettyWorkerClientRequestProcessor.java:466)
at
org.apache.giraph.comm.netty.NettyWorkerClientRequestProcessor.flush(NettyWorkerClientRequestProcessor.java:412)
at
org.apache.giraph.worker.InputSplitsCallable.call(InputSplitsCallable.java:241)
at
org.apache.giraph.worker.InputSplitsCallable.call(InputSplitsCallable.java:60)
at
org.apache.giraph.utils.LogStacktraceCallable.call(LogStacktraceCallable.java:51)
at java.util.concurrent.FutureTask.run(FutureTask.java:262)
at
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
at
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
at java.lang.Thread.run(Thread.java:745)
*Caused by: java.io.IOException: ensureRemaining: Only 0 bytes remaining,
trying to read 1*
at
org.apache.giraph.utils.UnsafeReads.ensureRemaining(UnsafeReads.java:77)
at
org.apache.giraph.utils.UnsafeArrayReads.readByte(UnsafeArrayReads.java:123)
at
org.apache.giraph.utils.UnsafeReads.readLine(UnsafeReads.java:100)
at
pruebas.TextAndDoubleComplexWritable.readFields(TextAndDoubleComplexWritable.java:37)
at
org.apache.giraph.utils.WritableUtils.reinitializeVertexFromDataInput(WritableUtils.java:540)
at
org.apache.giraph.utils.VertexIterator.next(VertexIterator.java:98)
... 11 more
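For reference, the failing readFields at TextAndDoubleComplexWritable.java:37 calls UnsafeReads.readLine, which looks for a newline in the serialized bytes. My understanding is that write() and readFields() must be exactly byte-symmetric, with no line-oriented calls. This is a minimal standalone sketch of such a symmetric pair (the field names mirror my class, but the sketch is my assumption and uses plain java.io instead of implementing org.apache.hadoop.io.Writable, so it can run by itself):

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInput;
import java.io.DataInputStream;
import java.io.DataOutput;
import java.io.DataOutputStream;
import java.io.IOException;

// Hypothetical sketch of a TextAndDoubleComplexWritable-like class.
// The real class would `implements org.apache.hadoop.io.Writable`;
// plain java.io is used here so the round trip runs standalone.
public class TextAndDoubleComplexWritableSketch {
    private double vertexData;
    private String idsVerticesAnteriores = "";

    public void setVertexData(double d) { vertexData = d; }
    public double getVertexData() { return vertexData; }
    public void setIds_vertices_anteriores(String s) { idsVerticesAnteriores = s; }
    public String getIds_vertices_anteriores() { return idsVerticesAnteriores; }

    // write() and readFields() must mirror each other byte for byte:
    // every value written is read back in the same order and encoding.
    public void write(DataOutput out) throws IOException {
        out.writeDouble(vertexData);
        out.writeUTF(idsVerticesAnteriores); // length-prefixed, unlike readLine()
    }

    public void readFields(DataInput in) throws IOException {
        vertexData = in.readDouble();
        idsVerticesAnteriores = in.readUTF();
    }

    public static void main(String[] args) throws IOException {
        TextAndDoubleComplexWritableSketch original = new TextAndDoubleComplexWritableSketch();
        original.setVertexData(2.0);
        original.setIds_vertices_anteriores("Portada\tNeil");

        // Serialize, then deserialize into a fresh instance.
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        original.write(new DataOutputStream(bytes));
        TextAndDoubleComplexWritableSketch copy = new TextAndDoubleComplexWritableSketch();
        copy.readFields(new DataInputStream(new ByteArrayInputStream(bytes.toByteArray())));

        System.out.println(copy.getVertexData() + " " + copy.getIds_vertices_anteriores());
    }
}
```

With a pair like this the reader consumes exactly the bytes the writer produced, so ensureRemaining never runs past the end of the buffer.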
*My computation class:*
import java.io.IOException;

import org.apache.giraph.conf.StrConfOption;
import org.apache.giraph.edge.Edge;
import org.apache.giraph.examples.Algorithm;
import org.apache.giraph.graph.BasicComputation;
import org.apache.giraph.graph.Vertex;
import org.apache.hadoop.io.DoubleWritable;
import org.apache.hadoop.io.Text;
import org.apache.log4j.Logger;

import pruebas.TextAndDoubleComplexWritable;

/**
 * Finds all navigational paths. Adaptation of
 * https://github.com/MarcoLotz
 * /GiraphBFSSO/blob/master/src/uk/co/qmul/giraph/structurebfs
 * /SimpleBFSStructureComputation.java by @MarcoLotz, applied to the
 * wikiquotes data generated by the generacion_de_grafo_wikiquotes project.
 */
@Algorithm(name = "Navigational path search", description =
    "Finds all navigational paths of a graph")
public class BusquedaDeCaminosNavegacionalesWikiquote extends
    BasicComputation<Text, TextAndDoubleComplexWritable, DoubleWritable, Text> {

  /**
   * Source id of the navigational-path search; indicates the first vertex
   * to process in superstep 0.
   */
  public static final StrConfOption SOURCE_ID = new StrConfOption(
      "BusquedaDeCaminosNavegacionales.sourceId", "Portada",
      "The source vertex of the search for all navigational paths");

  // public static final LongConfOption DEST_ID = new LongConfOption(
  //     "BusquedaDeCaminosNavegacionales.destId", 2,
  //     "The destination vertex of the search for all navigational paths");

  /** Class logger */
  private static final Logger LOG = Logger
      .getLogger(BusquedaDeCaminosNavegacionalesWikiquote.class);

  /** Maximum number of supersteps */
  public final int MAX_SUPERSTEPS = 5;

  /**
   * Is this vertex the source id?
   *
   * @param vertex Vertex
   * @return True if it is the source id
   */
  private boolean isStart(Vertex<Text, ?, ?> vertex) {
    return vertex.getId().toString()
        .equals(SOURCE_ID.get(getConf()).toString());
  }

  /**
   * Sends messages to all the connected vertices. The content of the
   * messages is not important, since just the event of receiving a message
   * removes the vertex from the inactive status.
   *
   * @param vertex Vertex
   */
  public void BFSMessages(
      Vertex<Text, TextAndDoubleComplexWritable, DoubleWritable> vertex) {
    for (Edge<Text, DoubleWritable> edge : vertex.getEdges()) {
      sendMessage(edge.getTargetVertexId(), vertex.getId());
    }
  }

  @Override
  public void compute(
      Vertex<Text, TextAndDoubleComplexWritable, DoubleWritable> vertex,
      Iterable<Text> messages) throws IOException {
    // Forces convergence at the maximum superstep
    if (!(getSuperstep() == MAX_SUPERSTEPS)) {
      // Only the start vertex should work in the first superstep.
      // All the others should vote to halt and wait for messages.
      // When run from JUnit the first superstep is 1, but when invoked
      // from the command line it is 0.
      if (getSuperstep() == 0) {
        if (isStart(vertex)) {
          vertex.getValue().setVertexData(new Double(getSuperstep()));
          BFSMessages(vertex);
          if (LOG.isInfoEnabled()) {
            LOG.info("[Start Vertex] Vertex ID: " + vertex.getId());
          }
        } else { // Initialise the other vertices with infinite depth
          vertex.getValue().setVertexData(new Double(Integer.MAX_VALUE));
        }
      }
      // If it is not the first superstep (superstep 0):
      // check the vertex ID.
      else {
        // It is the first time that this vertex is being computed
        if (vertex.getValue().getVertexData() == Integer.MAX_VALUE) {
          // The depth has the same value as the superstep
          vertex.getValue().setVertexData(new Double(getSuperstep()));
          String idsDeVerticesPredecesores = "";
          for (Text message : messages) {
            idsDeVerticesPredecesores += message.toString();
          }
          vertex.getValue().setIds_vertices_anteriores(
              idsDeVerticesPredecesores);
          // Continue on the structure
          BFSMessages(vertex);
        }
        // Else this vertex was already analysed in a previous iteration.
      }
      vertex.voteToHalt();
    }
  }
}
My *Input format:*

package pruebas;

import org.apache.giraph.edge.Edge;
import org.apache.giraph.edge.EdgeFactory;
import org.apache.giraph.io.formats.AdjacencyListTextVertexInputFormat;
import org.apache.hadoop.io.DoubleWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.TaskAttemptContext;

/**
 * @author hduser
 */
public class IdTextWithComplexValueInputFormat extends
    AdjacencyListTextVertexInputFormat<Text, TextAndDoubleComplexWritable, DoubleWritable> {

  @Override
  public AdjacencyListTextVertexReader createVertexReader(InputSplit split,
      TaskAttemptContext context) {
    return new TextComplexValueDoubleAdjacencyListVertexReader();
  }

  protected class TextComplexValueDoubleAdjacencyListVertexReader extends
      AdjacencyListTextVertexReader {

    /** Default constructor. */
    public TextComplexValueDoubleAdjacencyListVertexReader() {
      super();
    }

    @Override
    public Text decodeId(String s) {
      return new Text(s);
    }

    @Override
    public TextAndDoubleComplexWritable decodeValue(String s) {
      TextAndDoubleComplexWritable valorComplejo =
          new TextAndDoubleComplexWritable();
      valorComplejo.setVertexData(Double.valueOf(s));
      valorComplejo.setIds_vertices_anteriores("");
      return valorComplejo;
    }

    @Override
    public Edge<Text, DoubleWritable> decodeEdge(String s1, String s2) {
      return EdgeFactory.create(new Text(s1),
          new DoubleWritable(Double.valueOf(s2)));
    }
  }
}
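To sanity-check the reader against my input file below: as far as I understand, AdjacencyListTextVertexInputFormat splits each line on the default tab separator, takes token 0 as the id, token 1 as the vertex value, and the remaining tokens as (target, edge value) pairs. A rough standalone sketch of that tokenization (the default separator and token layout are my assumption, not pulled from the Giraph source):

```java
// Rough sketch of how I understand AdjacencyListTextVertexInputFormat
// tokenizes a line: id <TAB> value <TAB> target1 <TAB> edgeValue1 <TAB> ...
public class AdjacencyLineSketch {
    public static String[] tokenize(String line) {
        return line.split("\t"); // assumed default separator
    }

    public static void main(String[] args) {
        String line = "Neil\t0.0\tLuna\t1.0\tideal\t1.0\tverdad\t1.0";
        String[] tokens = tokenize(line);
        String id = tokens[0];                    // would go to decodeId(...)
        double value = Double.valueOf(tokens[1]); // would go to decodeValue(...)
        System.out.println("id=" + id + " value=" + value);
        // Remaining tokens come in (targetId, edgeValue) pairs -> decodeEdge(...)
        for (int i = 2; i + 1 < tokens.length; i += 2) {
            System.out.println("edge " + tokens[i] + " -> " + tokens[i + 1]);
        }
    }
}
```

If this layout is right, each line of the input file should decode to one vertex with zero or more weighted edges.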
My *Output format:*

package pruebas;

import java.io.IOException;

import org.apache.giraph.graph.Vertex;
import org.apache.giraph.io.formats.IdWithValueTextOutputFormat;
import org.apache.giraph.io.formats.TextVertexOutputFormat;
import org.apache.hadoop.io.DoubleWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.TaskAttemptContext;

/**
 * @author hduser
 */
public class IdTextWithComplexValueOutputFormat extends
    TextVertexOutputFormat<Text, TextAndDoubleComplexWritable, DoubleWritable> {

  /** Specify the output delimiter */
  public static final String LINE_TOKENIZE_VALUE = "output.delimiter";
  /** Default output delimiter */
  public static final String LINE_TOKENIZE_VALUE_DEFAULT = "\t";
  /** Reverse id and value order? */
  public static final String REVERSE_ID_AND_VALUE = "reverse.id.and.value";
  /** Default is to not reverse id and value order. */
  public static final boolean REVERSE_ID_AND_VALUE_DEFAULT = false;

  @Override
  public TextVertexOutputFormat<Text, TextAndDoubleComplexWritable, DoubleWritable>.TextVertexWriter createVertexWriter(
      TaskAttemptContext context) throws IOException, InterruptedException {
    return new IdWithValueVertexWriter();
  }

  /**
   * Vertex writer used with {@link IdWithValueTextOutputFormat}.
   */
  protected class IdWithValueVertexWriter extends TextVertexWriterToEachLine {
    /** Saved delimiter */
    private String delimiter;
    /** Cached reverse option */
    private boolean reverseOutput;

    @Override
    public void initialize(TaskAttemptContext context) throws IOException,
        InterruptedException {
      super.initialize(context);
      delimiter = getConf().get(LINE_TOKENIZE_VALUE,
          LINE_TOKENIZE_VALUE_DEFAULT);
      reverseOutput = getConf().getBoolean(REVERSE_ID_AND_VALUE,
          REVERSE_ID_AND_VALUE_DEFAULT);
    }

    @Override
    protected Text convertVertexToLine(
        Vertex<Text, TextAndDoubleComplexWritable, DoubleWritable> vertex)
        throws IOException {
      StringBuilder str = new StringBuilder();
      if (reverseOutput) {
        imprimirCaminosNavegacionales(vertex, str);
        str.append(delimiter);
        str.append(vertex.getId().toString());
      } else {
        str.append(vertex.getId().toString());
        str.append(delimiter);
        imprimirCaminosNavegacionales(vertex, str);
      }
      return new Text(str.toString());
    }

    private void imprimirCaminosNavegacionales(
        Vertex<Text, TextAndDoubleComplexWritable, DoubleWritable> vertex,
        StringBuilder str) {
      str.append(vertex.getId());
      str.append(LINE_TOKENIZE_VALUE_DEFAULT);
      for (String idVerticeAnterior : vertex.getValue()
          .getIds_vertices_anteriores().split(LINE_TOKENIZE_VALUE_DEFAULT)) {
        str.append(idVerticeAnterior + "/" + vertex.getId());
        str.append(LINE_TOKENIZE_VALUE_DEFAULT);
      }
      str.append(vertex.getValue().toString());
    }
  }
}
*My input file:*
Portada 0.0 Sugerencias 1.0
Proverbios 0.0
Neil 0.0 Luna 1.0 ideal 1.0 verdad 1.0
Categoria:Ingenieros 2.0 Categoria:Estadounidenses 2.0
Categoria:Astronautas 2.0
Categoria:Ingenieros 1.0 Neil 2.0
Categoria:Estadounidenses 1.0 Neil 2.0
Categoria:Astronautas 1.0 Neil 2.0
and I execute it with this *command*:
$HADOOP_HOME/bin/yarn jar
$GIRAPH_HOME/giraph-examples/target/giraph-examples-1.1.0-for-hadoop-2.4.0-jar-with-dependencies.jar
org.apache.giraph.GiraphRunner
lectura_de_grafo.BusquedaDeCaminosNavegacionalesWikiquote -vif
pruebas.IdTextWithComplexValueInputFormat -vip
/user/hduser/input/wiki-graph-chiquito.txt -vof
pruebas.IdTextWithComplexValueOutputFormat -op
/user/hduser/output/caminosNavegacionales -w 2 -yh 250