You shouldn't use LinearDRPC, but use Trident + DRPC instead.
On Sat, Dec 21, 2013 at 1:38 PM, Ozone <[email protected]> wrote: > Hello, > > I'm trying to get a DRPC topology where > > NumberBolt(generated numbers) --> PartialCount (partial counts) --> Count > (counts the partial counts to give the sum) > > When I run the below attached code,I get the numbers only from the first > PartialCount bolt. > Suggestions on how to get this working ? > > Thanks > > >>>>>>>>>>>>>>>>>>>>>class storm.starter.NumberDRPC$NumberBolt::prepare > taskid :2 baseNumber : 6 > >>>>>>>>>>>>>>>>>>>>>class storm.starter.NumberDRPC$NumberBolt::prepare > taskid :3 baseNumber : 6 > >>>>>>>>>>>>>>>>>>>>>class storm.starter.NumberDRPC$NumberBolt::execute > taskid :3 BatchId : 5275468440815904402 Number : 9 > >>>>>>>>>>>>>>>>>>>>>class storm.starter.NumberDRPC$NumberBolt::execute > taskid :2 BatchId : 5275468440815904402 Number : 8 > >>>>>>>>>>>>>>>>>>>>>class > storm.starter.NumberDRPC$PartialCount::FinishBatch BatchId : > 5275468440815904402 PartialCount : 9 > >>>>>>>>>>>>>>>>>>>>>class > storm.starter.NumberDRPC$PartialCount::FinishBatch BatchId : > 5275468440815904402 PartialCount : 0 > >>>>>>>>>>>>>>>>>>>>>class storm.starter.NumberDRPC$Count::FinishBatch > BatchId : 5275468440815904402 Count : 9 > >>>>>>>>>>>>>>>>>>>>> results--> 9 > > > I was expecting the result to be 17, not 9 > > > Attached is the code.. > > package storm.starter; > > import java.util.Map; > > import backtype.storm.Config; > import backtype.storm.LocalCluster; > import backtype.storm.LocalDRPC; > import backtype.storm.coordination.BatchOutputCollector; > import backtype.storm.drpc.LinearDRPCTopologyBuilder; > import backtype.storm.task.TopologyContext; > import backtype.storm.topology.BasicOutputCollector; > import backtype.storm.topology.OutputFieldsDeclarer; > import backtype.storm.topology.base.BaseBasicBolt; > import backtype.storm.topology.base.BaseBatchBolt; > import backtype.storm.tuple.Fields; > import backtype.storm.tuple.Tuple; > import backtype.storm.tuple.Values; > > public class NumberDRPC { > > public static class NumberBolt extends BaseBasicBolt { > Integer taskIndex; > Integer baseNumber; > > @Override > public void prepare(Map stormConf, TopologyContext context) { > taskIndex = context.getThisTaskId(); > baseNumber = 6; > > System.out.println(">>>>>>>>>>>>>>>>>>>>>" + this.getClass() > + "::prepare taskid :" + taskIndex + " baseNumber : " > + baseNumber); > } > > @Override > public void execute(Tuple input, BasicOutputCollector collector) { > Object id = input.getValue(0); > > Integer v1 = baseNumber + taskIndex; > System.out.println(">>>>>>>>>>>>>>>>>>>>>" + this.getClass() > + "::execute taskid :" + taskIndex + " BatchId : " + id > + " Number : " + v1); > collector.emit(new Values(id, 6 + taskIndex)); > > } > > @Override > public void declareOutputFields(OutputFieldsDeclarer declarer) { > declarer.declare(new Fields("id", "number")); > } > > } > > public static class PartialCount extends BaseBatchBolt { > > BatchOutputCollector _collector; > Object _id; > Integer _count = 0; > Integer taskIndex; > > @Override > public void prepare(Map conf, TopologyContext context, > BatchOutputCollector collector, Object id) { > _collector = collector; > _id = id; > taskIndex = context.getThisTaskId(); > > } > > @Override > public void execute(Tuple tuple) { > _count += tuple.getInteger(1); > System.out.println(">>>>>>>>>>>>>>>>>>>>>" + this.getClass() > + "::execute taskid :" + taskIndex + " BatchId : " + _id + " Number : " > + _count); > > } > > @Override > public void finishBatch() { > _collector.emit(new Values(_id, _count)); > System.out.println(">>>>>>>>>>>>>>>>>>>>>" + this.getClass() > + "::FinishBatch taskid :" + taskIndex + " BatchId : " + _id + " > PartialCount : " > + _count); > > } > > @Override > public void declareOutputFields(OutputFieldsDeclarer declarer) { > declarer.declare(new Fields("id", "partial-count")); > } > } > > public static class Count extends BaseBatchBolt { > > BatchOutputCollector _collector; > Object _id; > Integer _count = 0; > > @Override > public void prepare(Map conf, TopologyContext context, > BatchOutputCollector collector, Object id) { > _collector = collector; > _id = id; > > } > > @Override > public void execute(Tuple tuple) { > _count += tuple.getInteger(1); > > } > > @Override > public void finishBatch() { > _collector.emit(new Values(_id, _count)); > System.out.println(">>>>>>>>>>>>>>>>>>>>>" + this.getClass() > + "::FinishBatch BatchId : " + _id + " Count : " > + _count); > > } > > @Override > public void declareOutputFields(OutputFieldsDeclarer declarer) { > declarer.declare(new Fields("id", "count")); > > } > > } > > public static void main(String[] args) throws Throwable { > > LinearDRPCTopologyBuilder builder = new LinearDRPCTopologyBuilder( > "count"); > > builder.addBolt(new NumberBolt(), 2).allGrouping(); > builder.addBolt(new PartialCount(), 2).fieldsGrouping( > new Fields("id", "number")); > builder.addBolt(new Count()).fieldsGrouping( > new Fields("id", "partial-count")); > > Config conf = new Config(); > conf.setMaxTaskParallelism(2); > LocalDRPC drpc = new LocalDRPC(); > LocalCluster cluster = new LocalCluster(); > cluster.submitTopology("count-drpc", conf, > builder.createLocalTopology(drpc)); > > System.out.println(">>>>>>>>>>>>>>>>>>>>> results--> " > + drpc.execute("count", "")); > > Thread.sleep(1000); > cluster.shutdown(); > drpc.shutdown(); > > } > > } >
