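Hi!

Trident is a transactional topology that is coordinated in micro-batches, and aggregate() after groupBy() computes its result separately for each batch. Because you set MAX_BATCH_SIZE_CONF to 100, each batch holds at most 100 tuples. The first 5 lines of your output are the first batch (21 + 20 + 21 + 21 + 17 = 100 tuples emitted from CSVSpout), and the remaining 4 lines are the second batch. Account 01524655555 has tuples in both batches, so it is printed once per batch: 17 and then 4.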
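
As a rough illustration of that split, here is a small plain-Java sketch. It is not Storm or Trident code: the class BatchCountSketch, its methods, and the simplified two-batch sample data are all made up for this example. Only the 17 and 4 counts for account 01524655555 are taken from the output in this thread. It contrasts counting within each batch with folding the per-batch counts into one running total.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Plain-Java illustration only; none of these names come from Storm or Trident.
final class BatchCountSketch {

    // Counts keys within a single batch, which is all a per-batch aggregate sees.
    static Map<String, Long> countBatch(List<String> batch) {
        Map<String, Long> counts = new LinkedHashMap<>();
        for (String key : batch) {
            counts.merge(key, 1L, Long::sum);
        }
        return counts;
    }

    // Folds every batch's counts into one map, the role a State plays across batches.
    static Map<String, Long> runningTotals(List<List<String>> batches) {
        Map<String, Long> totals = new LinkedHashMap<>();
        for (List<String> batch : batches) {
            countBatch(batch).forEach((key, n) -> totals.merge(key, n, Long::sum));
        }
        return totals;
    }

    // Hypothetical data: account 01524655555 has 17 tuples in the first batch
    // and 4 in the second; the other accounts are filler.
    static List<List<String>> sampleBatches() {
        List<String> first = new ArrayList<>(Collections.nCopies(17, "01524655555"));
        first.addAll(Collections.nCopies(21, "01822777777"));
        List<String> second = new ArrayList<>(Collections.nCopies(4, "01524655555"));
        second.addAll(Collections.nCopies(21, "01209222222"));
        return Arrays.asList(first, second);
    }

    public static void main(String[] args) {
        List<List<String>> batches = sampleBatches();
        for (List<String> batch : batches) {
            System.out.println("per batch: " + countBatch(batch));
        }
        System.out.println("across batches: " + runningTotals(batches));
    }
}
```

Running main reports 17 and then 4 for account 01524655555 in the per-batch lines, and 21 in the across-batches line.
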

You may want to use Trident State (for example, persistentAggregate in place of aggregate) to persist and query your intermediate results when you need a count aggregated across all batches.

Hope this helps.

Thanks!
Jungtaek Lim (HeartSaVioR)

2015-06-03 5:55 GMT+09:00 Ashish Soni <[email protected]>:

> Hi All,
>
> Can someone please help me why trident is printing count separately for
> same account
> Please see below for the account 01524655555 instead of printing total as
> 21 it is printing total in 2 steps 4 and 17
>
> [01822777777, 21]
> [01959111111, 20]
> [01697799999, 21]
> [05051111111, 21]
> [01524655555, 17]
> [01209222222, 21]
> [01723888888, 21]
> [01339500000, 21]
> [01524655555, 4]
>
> TridentTopology topology = new TridentTopology();
> topology.newStream("cdrevent", new CSVSpout("C:\\Users\\eassoni\\Downloads\\testdata.csv", ',', false))
>     .groupBy(new Fields("field_1"))
>     .aggregate(new Fields("field_1"), new Count(), new Fields("count"))
>     .parallelismHint(5)
>     .each(new Fields("field_1", "count"), new Utils.PrintFilter())
>     .parallelismHint(5);
> Config config = new Config();
> config.put(RichSpoutBatchExecutor.MAX_BATCH_SIZE_CONF, 100);
> LocalCluster cluster = new LocalCluster();
> cluster.submitTopology("cdreventTopology", config, topology.build());
> backtype.storm.utils.Utils.sleep(10000);
> cluster.killTopology("cdreventTopology");

--
Name : 임 정택
Blog : http://www.heartsavior.net / http://dev.heartsavior.net
Twitter : http://twitter.com/heartsavior
LinkedIn : http://www.linkedin.com/in/heartsavior
