Hello everyone,
I ran into some issues while trying to figure out how to correctly use
aggregators, since I want to implement a global priority queue that
"schedules" processing on vertices. As a simple test to better
understand aggregator useage I ended up modifying the SimpleShortestPathsVertex
example and added the SumAggregator code
from the SimplePageRankVertex example to it (Workercontext and
compute()) (code posted below).
Though this test code does not do anything useful I was surprised to
see the following worker NullPointerExceptions during execution.
2012-05-23 14:44:59,267 INFO
org.apache.hadoop.mapred.TaskLogsTruncater: Initializing logs'
truncater with mapRetainSize=-1 and reduceRetainSize=-1
2012-05-23 14:44:59,469 INFO org.apache.hadoop.io.nativeio.NativeIO:
Initialized cache for UID to User mapping with a cache timeout of
14400 seconds.
2012-05-23 14:44:59,470 INFO org.apache.hadoop.io.nativeio.NativeIO:
Got UserName hadoop00 for UID 508 from the native implementation
2012-05-23 14:44:59,472 WARN org.apache.hadoop.mapred.Child: Error running child
java.lang.NullPointerException
at
org.apache.giraph.examples.linda.LindaAggregatorTest.compute(LindaAggregatorTest.java:104)
at org.apache.giraph.graph.GraphMapper.map(GraphMapper.java:593)
at org.apache.giraph.graph.GraphMapper.run(GraphMapper.java:648)
at org.apache.hadoop.mapred.MapTask.runNewMapper(MapTask.java:763)
at org.apache.hadoop.mapred.MapTask.run(MapTask.java:369)
at org.apache.hadoop.mapred.Child$4.run(Child.java:259)
at java.security.AccessController.doPrivileged(Native Method)
at javax.security.auth.Subject.doAs(Subject.java:396)
at
org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1059)
at org.apache.hadoop.mapred.Child.main(Child.java:253)
2012-05-23 14:44:59,475 INFO org.apache.hadoop.mapred.Task: Runnning
cleanup for the task
So my question is. What are the pitfalls (method call order, setup,
superstep count) of aggregator usage, as following the description in
useAggregator did
not seem to help, so I am obviously missing some detail.
JAVA Code:
@Override
public void compute(Iterator<DoubleWritable> msgIterator) {
LongSumAggregator sumAggreg = (LongSumAggregator) getAggregator("sum");
if (getSuperstep() == 0) {
setVertexValue(new DoubleWritable(Double.MAX_VALUE));
}
double minDist = isSource() ? 0d : Double.MAX_VALUE;
while (msgIterator.hasNext()) {
minDist = Math.min(minDist, msgIterator.next().get());
}
if (LOG.isDebugEnabled()) {
LOG.debug("Vertex " + getVertexId() + " got minDist = " + minDist +
" vertex value = " + getVertexValue());
}
if (getSuperstep() >= 0) {
sumAggreg.aggregate(1L); // NPE at Line 104
}
if (minDist < getVertexValue().get()) {
setVertexValue(new DoubleWritable(minDist));
for (LongWritable targetVertexId : this) {
FloatWritable edgeValue = getEdgeValue(targetVertexId);
if (LOG.isDebugEnabled()) {
LOG.debug("Vertex " + getVertexId() + " sent to " +
targetVertexId + " = " +
(minDist + edgeValue.get()));
}
sendMsg(targetVertexId,
new DoubleWritable(minDist + edgeValue.get()));
}
}
voteToHalt();
}
public static class MyVertexWorkerContext extends
WorkerContext {
/** Final sum value for verification for local jobs */
private static long FINAL_SUM;
public static long getFinalSum() {
return FINAL_SUM;
}
@Override
public void preApplication()
throws InstantiationException, IllegalAccessException {
registerAggregator("sum", LongSumAggregator.class);
}
@Override
public void postApplication() {
System.out.println("PreApp");
LongSumAggregator sumAggreg =
(LongSumAggregator) getAggregator("sum");
FINAL_SUM = sumAggreg.getAggregatedValue().get();
LOG.info("aggregatedNumVertices=" + FINAL_SUM);
}
@Override
public void preSuperstep() {
System.out.println("PreSuperStep");
LongSumAggregator sumAggreg =
(LongSumAggregator) getAggregator("sum");
this.useAggregator("sum");
sumAggreg.setAggregatedValue(new LongWritable(0L));
}
@Override
public void postSuperstep() {}
}
Rest as in SimpleShortestPathsVertex.
Regards,
Nils
--
NEU: FreePhone 3-fach-Flat mit kostenlosem Smartphone!
Jetzt informieren: http://mobile.1und1.de/?ac=OM.PW.PW003K20328T7073a