You shouldn't use LinearDRPC; use Trident + DRPC instead.

On Sat, Dec 21, 2013 at 1:38 PM, Ozone <[email protected]> wrote:

> Hello,
>
> I'm trying to get a DRPC topology where
>
> NumberBolt(generated numbers) --> PartialCount (partial counts) --> Count
> (counts the partial counts to give the sum)
>
> When I run the below attached code, I get the numbers only from the first
> PartialCount bolt.
> Any suggestions on how to get this working?
>
> Thanks
>
> >>>>>>>>>>>>>>>>>>>>>class storm.starter.NumberDRPC$NumberBolt::prepare
> taskid :2 baseNumber : 6
> >>>>>>>>>>>>>>>>>>>>>class storm.starter.NumberDRPC$NumberBolt::prepare
> taskid :3 baseNumber : 6
> >>>>>>>>>>>>>>>>>>>>>class storm.starter.NumberDRPC$NumberBolt::execute
> taskid :3  BatchId : 5275468440815904402 Number : 9
> >>>>>>>>>>>>>>>>>>>>>class storm.starter.NumberDRPC$NumberBolt::execute
> taskid :2  BatchId : 5275468440815904402 Number : 8
> >>>>>>>>>>>>>>>>>>>>>class
> storm.starter.NumberDRPC$PartialCount::FinishBatch BatchId :
> 5275468440815904402 PartialCount : 9
> >>>>>>>>>>>>>>>>>>>>>class
> storm.starter.NumberDRPC$PartialCount::FinishBatch BatchId :
> 5275468440815904402 PartialCount : 0
> >>>>>>>>>>>>>>>>>>>>>class storm.starter.NumberDRPC$Count::FinishBatch
> BatchId : 5275468440815904402 Count : 9
> >>>>>>>>>>>>>>>>>>>>> results--> 9
>
>
> I was expecting the result to be 17, not 9
>
>
> Attached is the code:
>
> package storm.starter;
>
> import java.util.Map;
>
> import backtype.storm.Config;
> import backtype.storm.LocalCluster;
> import backtype.storm.LocalDRPC;
> import backtype.storm.coordination.BatchOutputCollector;
> import backtype.storm.drpc.LinearDRPCTopologyBuilder;
> import backtype.storm.task.TopologyContext;
> import backtype.storm.topology.BasicOutputCollector;
> import backtype.storm.topology.OutputFieldsDeclarer;
> import backtype.storm.topology.base.BaseBasicBolt;
> import backtype.storm.topology.base.BaseBatchBolt;
> import backtype.storm.tuple.Fields;
> import backtype.storm.tuple.Tuple;
> import backtype.storm.tuple.Values;
>
> public class NumberDRPC {
>
> public static class NumberBolt extends BaseBasicBolt {
> Integer taskIndex;
> Integer baseNumber;
>
> @Override
> public void prepare(Map stormConf, TopologyContext context) {
> taskIndex = context.getThisTaskId();
>  baseNumber = 6;
>
> System.out.println(">>>>>>>>>>>>>>>>>>>>>" + this.getClass()
>  + "::prepare taskid :" + taskIndex + " baseNumber : "
> + baseNumber);
> }
>
> @Override
> public void execute(Tuple input, BasicOutputCollector collector) {
> Object id = input.getValue(0);
>
> Integer v1 = baseNumber + taskIndex;
> System.out.println(">>>>>>>>>>>>>>>>>>>>>" + this.getClass()
>  + "::execute taskid :" + taskIndex + "  BatchId : " + id
> + " Number : " + v1);
>  collector.emit(new Values(id, 6 + taskIndex));
>
> }
>
> @Override
>  public void declareOutputFields(OutputFieldsDeclarer declarer) {
> declarer.declare(new Fields("id", "number"));
>  }
>
> }
>
> public static class PartialCount extends BaseBatchBolt {
>
> BatchOutputCollector _collector;
> Object _id;
> Integer _count = 0;
>  Integer taskIndex;
>
> @Override
> public void prepare(Map conf, TopologyContext context,
>  BatchOutputCollector collector, Object id) {
> _collector = collector;
> _id = id;
>  taskIndex = context.getThisTaskId();
>
> }
>
> @Override
>  public void execute(Tuple tuple) {
> _count += tuple.getInteger(1);
> System.out.println(">>>>>>>>>>>>>>>>>>>>>" + this.getClass()
>  + "::execute taskid :" + taskIndex + " BatchId : " + _id + " Number : "
> + _count);
>
> }
>
> @Override
> public void finishBatch() {
>  _collector.emit(new Values(_id, _count));
> System.out.println(">>>>>>>>>>>>>>>>>>>>>" + this.getClass()
>  + "::FinishBatch taskid :" + taskIndex + " BatchId : " + _id + "
> PartialCount : "
> + _count);
>
> }
>
> @Override
> public void declareOutputFields(OutputFieldsDeclarer declarer) {
>  declarer.declare(new Fields("id", "partial-count"));
> }
> }
>
> public static class Count extends BaseBatchBolt {
>
> BatchOutputCollector _collector;
>  Object _id;
> Integer _count = 0;
>
> @Override
> public void prepare(Map conf, TopologyContext context,
>  BatchOutputCollector collector, Object id) {
> _collector = collector;
> _id = id;
>
> }
>
> @Override
> public void execute(Tuple tuple) {
>  _count += tuple.getInteger(1);
>
> }
>
> @Override
>  public void finishBatch() {
> _collector.emit(new Values(_id, _count));
> System.out.println(">>>>>>>>>>>>>>>>>>>>>" + this.getClass()
>  + "::FinishBatch BatchId : " + _id + " Count : "
> + _count);
>
> }
>
> @Override
> public void declareOutputFields(OutputFieldsDeclarer declarer) {
> declarer.declare(new Fields("id", "count"));
>
> }
>
> }
>
> public static void main(String[] args) throws Throwable {
>
> LinearDRPCTopologyBuilder builder = new LinearDRPCTopologyBuilder(
> "count");
>
>  builder.addBolt(new NumberBolt(), 2).allGrouping();
> builder.addBolt(new PartialCount(), 2).fieldsGrouping(
>  new Fields("id", "number"));
> builder.addBolt(new Count()).fieldsGrouping(
> new Fields("id", "partial-count"));
>
> Config conf = new Config();
> conf.setMaxTaskParallelism(2);
> LocalDRPC drpc = new LocalDRPC();
>  LocalCluster cluster = new LocalCluster();
> cluster.submitTopology("count-drpc", conf,
> builder.createLocalTopology(drpc));
>
> System.out.println(">>>>>>>>>>>>>>>>>>>>> results--> "
> + drpc.execute("count", ""));
>
> Thread.sleep(1000);
> cluster.shutdown();
> drpc.shutdown();
>
> }
>
> }
>

Reply via email to