Hi, I have a Giraph application that runs fine; however, when I add a MasterCompute object (definition below), all of the map tasks time out. I have Hadoop configured to run with 8 map processes and Giraph configured to use one worker.
Here's the definition of the MasterCompute object:
class BPMasterComputer extends MasterCompute {
  override def compute() {
    // Halt the job once every vertex voted to stop in the previous superstep.
    val agg =
      getAggregator("VOTE_TO_STOP_AGG").asInstanceOf[BooleanAndAggregator]
    val res = agg.getAggregatedValue.get
    if (res) haltComputation
    // Reset to true so the next superstep's votes are ANDed from scratch.
    agg.setAggregatedValue(true)
  }

  override def initialize() {
    registerAggregator("VOTE_TO_STOP_AGG", classOf[BooleanAndAggregator])
    val agg =
      getAggregator("VOTE_TO_STOP_AGG").asInstanceOf[BooleanAndAggregator]
    agg.setAggregatedValue(true)
  }

  // No master state to persist.
  override def write(out: DataOutput) {}
  override def readFields(in: DataInput) {}
}
(As far as I can tell, there is no state that needs to be read/written.) I then register this class as the MasterCompute class in the Giraph job:
job.setMasterComputeClass(classOf[BPMasterComputer])
and then use the aggregator in the compute method of my vertices:
class BPVertex extends EdgeListVertex[IntWritable, WrappedValue, Text,
    PackagedMessage] with Loggable {
  override def compute(msgs: java.util.Iterator[PackagedMessage]) {
    ...
    var stop = false
    val agg =
      getAggregator("VOTE_TO_STOP_AGG").asInstanceOf[BooleanAndAggregator]
    ... code to modify stop and vote to halt ...
    agg.aggregate(stop)
  }
}
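To make the protocol I'm aiming for concrete, here is a plain-Scala model of the AND-aggregator handshake between master and vertices. It has no Giraph dependency; BooleanAnd and runSuperstep are illustrative names, not Giraph API, and it only models the intended semantics (master reads last superstep's result, resets the aggregator to true, then the vertices AND their votes in):

```scala
// Plain-Scala model of the boolean-AND vote-to-stop protocol
// (illustrative only; no Giraph dependency, names are hypothetical).
object VoteToStopModel {
  // Mirrors a boolean-AND aggregator: starts at true, ANDs in each vote.
  class BooleanAnd {
    private var value = true
    def aggregate(v: Boolean): Unit = { value = value && v }
    def get: Boolean = value
    def reset(): Unit = { value = true }
  }

  // One superstep: the master reads the previous round's result, then
  // resets the aggregator before the vertices vote again.
  def runSuperstep(agg: BooleanAnd, votes: Seq[Boolean]): Boolean = {
    val halt = agg.get           // master: getAggregatedValue
    agg.reset()                  // master: setAggregatedValue(true)
    votes.foreach(agg.aggregate) // vertices: aggregate(stop)
    halt
  }

  def main(args: Array[String]): Unit = {
    val agg = new BooleanAnd
    // Aggregator is initialized to true, so a halt check before any
    // votes have been aggregated sees true.
    println(runSuperstep(agg, Seq(true, false, true))) // true
    // Previous round had a dissenter, so no halt.
    println(runSuperstep(agg, Seq(true, true, true)))  // false
    // Previous round was unanimous, so the master would halt here.
    println(runSuperstep(agg, Seq(true, true, true)))  // true
  }
}
```

(Note that in this model the very first halt check already sees the initial value of true, which is why the ordering of the reset relative to the vertices' votes matters.)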
Is there some other method that I am not calling that I should? Or some step
that I'm missing? Any suggestions as to why/how these additions are causing
the processes to block would be appreciated!
Thanks,
Nick West
Benchmark Solutions
101 Park Avenue - 7th Floor
New York, NY 10178
Tel +1.212.220.4739 | Mobile +1.646.267.4324
www.benchmarksolutions.com
[cid:[email protected]]
<<inline: image001.png>>
