Got the issue resolved.

1. I was not anchoring to the incoming tuple, so effectively all the bolts
after impactBolt were outside the tracked tuple tree (not transactional):
the ack from impactBolt caused the spout's ack to be called, and the proper
DAG was never created. So the number I was seeing in the WIP was not the
true number of pending tuples, which made the whole thing confusing. After
putting in proper anchoring and the fix below, I can see pendingTuples
occasionally dropping to zero, which means it is working as expected.

2. I was not doing a *parts.put(...)* after *parts = new
HashMap<GlobalStreamId, Tuple>();* in the code quoted below (a slip-up), so
the first part of each join was never stored. This was resulting in a leak.
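For anyone hitting the same issue, here is a minimal sketch of the two
fixes (Storm 0.9.x API; the exact emit is illustrative and assumes a bolt
that forwards "id" and "json" on its <ComponentName>_stream, as in the
topology quoted below):

// Fix 1: anchor each downstream emit to the incoming tuple, so the tuple
// tree extends past impactBolt and the spout's ack/fail fires only when
// the whole DAG completes.
//
// Unanchored (what I had):
//   getOutputCollector().emit("impactBolt_stream", new Values(id, json));
// Anchored:
getOutputCollector().emit("impactBolt_stream", tuple, new Values(id, json));

// Fix 2: in CombinerBolt.execute(), store the first arriving part too;
// without this put, the first part of each join is silently dropped and
// the entry lingers in pendingTuples.
if (!pendingTuples.containsKey(idList)) {
    parts = new HashMap<GlobalStreamId, Tuple>();
    parts.put(streamId, tuple); // <-- this was the missing line
    pendingTuples.put(idList, parts);
}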
Thanks,
Prasun

On Mon, May 19, 2014 at 1:12 AM, P Ghosh <[email protected]> wrote:

> I have a topology that looks like this:
>
> *All bolts emit the fields "id", "json"*
> *The spout emits only "id"*
> *All bolts/spouts use the stream name <ComponentName>_stream while emitting*
>
> *Topology Definition*
> *==========================================*
>
> builder.setSpout("citySpout", citySpout, 10);
>
> builder.setBolt("impactBolt", impactBolt, 5)
>     .fieldsGrouping("citySpout", "citySpout_stream", new Fields("id"));
>
> builder.setBolt("restaurantBolt", restaurantBolt, 5)
>     .fieldsGrouping("impactBolt", "impactBolt_stream", new Fields("id"));
>
> builder.setBolt("theatresBolt", theatresBolt, 5)
>     .fieldsGrouping("impactBolt", "impactBolt_stream", new Fields("id"));
>
> builder.setBolt("libraryBolt", libraryBolt, 5)
>     .fieldsGrouping("impactBolt", "impactBolt_stream", new Fields("id"));
>
> builder.setBolt("transportBolt", transportBolt, 5)
>     .fieldsGrouping("impactBolt", "impactBolt_stream", new Fields("id"));
>
> builder.setBolt("crimeBolt", crimeBolt, 5)
>     .fieldsGrouping("impactBolt", "impactBolt_stream", new Fields("id"));
>
> builder.setBolt("combinerBolt", combinerBolt, 5)
>     .fieldsGrouping("restaurantBolt", "restaurantBolt_stream", new Fields("id"))
>     .fieldsGrouping("theatresBolt", "theatresBolt_stream", new Fields("id"))
>     .fieldsGrouping("libraryBolt", "libraryBolt_stream", new Fields("id"))
>     .fieldsGrouping("transportBolt", "transportBolt_stream", new Fields("id"))
>     .fieldsGrouping("crimeBolt", "crimeBolt_stream", new Fields("id"));
>
> *CombinerBolt*
> *==============*
>
> public class CombinerBolt extends BaseRichBolt {
>
>     ...
>
>     public void execute(Tuple tuple) {
>         String id = getId(tuple); // Gets the value corresponding to "id" from the tuple
>         List<Object> idList = Arrays.asList(new Object[] { id });
>         GlobalStreamId streamId = new GlobalStreamId(tuple.getSourceComponent(),
>                 tuple.getSourceStreamId());
>         Map<GlobalStreamId, Tuple> parts;
>         if (!pendingTuples.containsKey(idList)) {
>             parts = new HashMap<GlobalStreamId, Tuple>();
>             pendingTuples.put(idList, parts);
>         } else {
>             parts = pendingTuples.get(idList);
>             logger.debug("Pending Tuples [Count:" + pendingTuples.size() + "]");
>             if (parts.containsKey(streamId)) {
>                 logger.warn("Received same side of single join twice, overriding");
>             }
>             parts.put(streamId, tuple);
>             if (parts.size() == _numSources) { // _numSources is computed in prepare(..)
>                                                // using context.getThisSources().size()
>                 pendingTuples.remove(idList);
>                 List<Tuple> tuples = new ArrayList<Tuple>(parts.values());
>                 try {
>                     processTuple(tuples); // This is where the actual document merging is done
>                 } catch (Exception exc) {
>                     logger.error("There was an exception processing Tuples [" + tuples + "]", exc);
>                 }
>             }
>         }
>         getOutputCollector().ack(tuple);
>     }
>
>     ...
> }
>
> In my citySpout I have a work-in-progress (WIP) set (in Redis); having a
> set ensures that we don't have multiple transactions for the same city at
> the same time. Every id (city) that is emitted is put into the set, and it
> is removed when the corresponding ack or fail is invoked on the spout.
>
> *1.* I'm seeing a lot of "Received same side of single join twice,
> overriding". My expectation was otherwise: since I'm acking without
> waiting for the join, there shouldn't be a lot of retries happening.
>
> *2.* I deactivated the topology and could see the WIP go down to 0 within
> a few seconds; however, I continued to see my bolts working for another
> few seconds even when the WIP had nothing in it. Items are removed from
> the WIP only when an ack/fail is received at the spout. Based on the
> details provided in
> https://github.com/nathanmarz/storm/wiki/Acking-framework-implementation,
> my expectation was that processing would stop the moment the WIP reached 0.
>
> I'm curious why my bolts are still getting data.
>
> I will continue to do more investigation. In the meantime, if you see any
> glaring issue with this approach, please let me know.
>
> Thanks,
> Prasun
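For reference, the spout-side WIP bookkeeping described above boils down to
something like the following. This is only a sketch: it assumes Jedis as
the Redis client, and the "city:wip" key and nextCityId() helper are
made-up names for illustration.

import java.util.Map;

import backtype.storm.spout.SpoutOutputCollector;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.base.BaseRichSpout;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Values;
import redis.clients.jedis.Jedis;

public class CitySpout extends BaseRichSpout {

    private transient SpoutOutputCollector collector;
    private transient Jedis jedis; // Jedis is not serializable; create it in open()

    public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) {
        this.collector = collector;
        this.jedis = new Jedis("localhost");
    }

    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        declarer.declareStream("citySpout_stream", new Fields("id"));
    }

    public void nextTuple() {
        String id = nextCityId(); // hypothetical source of city ids
        if (id == null) {
            return;
        }
        // SADD returns 0 if the id is already in the WIP set, which is what
        // prevents two concurrent transactions for the same city.
        if (jedis.sadd("city:wip", id) == 1) {
            // Passing the id as the message id makes Storm call ack/fail with it.
            collector.emit("citySpout_stream", new Values(id), id);
        }
    }

    public void ack(Object msgId) {
        jedis.srem("city:wip", (String) msgId); // done: drop from WIP
    }

    public void fail(Object msgId) {
        jedis.srem("city:wip", (String) msgId); // failed: allow a retry to re-enter
    }

    private String nextCityId() {
        return null; // stub; replace with the real feed of city ids
    }
}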
