Got the issue resolved.
1. I was not anchoring to the incoming tuple, so effectively all the bolts
after impactBolt were outside the tuple tree: the ack from impactBolt alone
caused the spout's ack to be called, and the proper DAG was never built. So
the number I was seeing in WIP was not the true number of pending tuples,
which made the whole thing confusing. After I put in proper anchoring (see
the first sketch below) and the fix in point 2, I can see pendingTuples
occasionally dropping to zero, which means it is working as expected.
2. I was not doing a *parts.put(...)* after *parts = new
HashMap<GlobalStreamId, Tuple>();* above (a slip-up), so the first tuple for
each id never made it into the map, the join never completed, and
pendingTuples leaked entries. A corrected sketch follows below.
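
For point 1, here is a minimal sketch of what the anchoring change looks like
in one of the intermediate bolts. It is not the actual impactBolt code:
computeImpact() is a made-up placeholder, and the imports assume the
pre-Apache backtype.storm packages this thread refers to.

import java.util.Map;

import backtype.storm.task.OutputCollector;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.base.BaseRichBolt;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Tuple;
import backtype.storm.tuple.Values;

public class ImpactBolt extends BaseRichBolt {
    private OutputCollector collector;

    @Override
    public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) {
        this.collector = collector;
    }

    @Override
    public void execute(Tuple input) {
        String id = input.getStringByField("id");
        String json = computeImpact(id); // hypothetical processing step

        // Anchored emit: passing 'input' as the anchor keeps every downstream
        // bolt in the same tuple tree, so the spout is acked only after the
        // whole DAG has acked.
        collector.emit("impactBolt_stream", input, new Values(id, json));

        // The unanchored form (the original mistake) would have been:
        // collector.emit("impactBolt_stream", new Values(id, json));

        collector.ack(input);
    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        declarer.declareStream("impactBolt_stream", new Fields("id", "json"));
    }

    private String computeImpact(String id) {
        return "{}"; // placeholder for the real lookup/merge logic
    }
}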
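
For point 2, here is the join bookkeeping with the missing put restored,
factored into a small stand-alone class so it compiles on its own. JoinBuffer
and add() are made-up names; pendingTuples and the source count correspond to
the fields in the CombinerBolt code quoted below.

import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

import backtype.storm.generated.GlobalStreamId;
import backtype.storm.tuple.Tuple;

// Stand-in for the join bookkeeping inside CombinerBolt.execute(..).
public class JoinBuffer {
    private final Map<List<Object>, Map<GlobalStreamId, Tuple>> pendingTuples =
            new HashMap<List<Object>, Map<GlobalStreamId, Tuple>>();
    private final int numSources; // context.getThisSources().size() in prepare(..)

    public JoinBuffer(int numSources) {
        this.numSources = numSources;
    }

    // Returns the joined tuples once all sources have arrived, otherwise null.
    public List<Tuple> add(List<Object> idList, GlobalStreamId streamId, Tuple tuple) {
        Map<GlobalStreamId, Tuple> parts = pendingTuples.get(idList);
        if (parts == null) {
            parts = new HashMap<GlobalStreamId, Tuple>();
            pendingTuples.put(idList, parts);
        }
        // The fix: the tuple is stored for new ids as well, not only on the
        // "already pending" branch, so entries complete and are removed
        // instead of leaking.
        parts.put(streamId, tuple);
        if (parts.size() == numSources) {
            pendingTuples.remove(idList);
            return new ArrayList<Tuple>(parts.values());
        }
        return null;
    }
}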

Thanks,
Prasun


On Mon, May 19, 2014 at 1:12 AM, P Ghosh <[email protected]> wrote:

> I have a topology that looks like this:
> *All bolts emit the fields "id" and "json"*
> *The spout emits only "id"*
> *All bolts and the spout emit on a stream named <ComponentName>_stream*
>
> *Topology Definition*
>
> *==========================================*
>
> builder.setSpout("citySpout", citySpout, 10);
>
> builder.setBolt("impactBolt", impactBolt, 5)
>         .fieldsGrouping("citySpout", "citySpout_stream", new Fields("id"));
>
> builder.setBolt("restaurantBolt", restaurantBolt, 5)
>         .fieldsGrouping("impactBolt", "impactBolt_stream", new Fields("id"));
>
> builder.setBolt("theatresBolt", theatresBolt, 5)
>         .fieldsGrouping("impactBolt", "impactBolt_stream", new Fields("id"));
>
> builder.setBolt("libraryBolt", libraryBolt, 5)
>         .fieldsGrouping("impactBolt", "impactBolt_stream", new Fields("id"));
>
> builder.setBolt("transportBolt", transportBolt, 5)
>         .fieldsGrouping("impactBolt", "impactBolt_stream", new Fields("id"));
>
> builder.setBolt("crimeBolt", crimeBolt, 5)
>         .fieldsGrouping("impactBolt", "impactBolt_stream", new Fields("id"));
>
> builder.setBolt("combinerBolt", combinerBolt, 5)
>         .fieldsGrouping("restaurantBolt", "restaurantBolt_stream", new Fields("id"))
>         .fieldsGrouping("theatresBolt", "theatresBolt_stream", new Fields("id"))
>         .fieldsGrouping("libraryBolt", "libraryBolt_stream", new Fields("id"))
>         .fieldsGrouping("transportBolt", "transportBolt_stream", new Fields("id"))
>         .fieldsGrouping("crimeBolt", "crimeBolt_stream", new Fields("id"));
>
>
> *CombinerBolt*
>
> *============== *
>
> public class CombinerBolt extends BaseRichBolt {
>
>     ...
>
>     public void execute(Tuple tuple) {
>         String id = getId(tuple); // gets the value of the "id" field from the tuple
>         List<Object> idList = Arrays.asList(new Object[] { id });
>         GlobalStreamId streamId = new GlobalStreamId(tuple.getSourceComponent(),
>                 tuple.getSourceStreamId());
>
>         Map<GlobalStreamId, Tuple> parts;
>         if (!pendingTuples.containsKey(idList)) {
>             parts = new HashMap<GlobalStreamId, Tuple>();
>             pendingTuples.put(idList, parts);
>         } else {
>             parts = pendingTuples.get(idList);
>             logger.debug("Pending Tuples [Count:" + pendingTuples.size() + "]");
>             if (parts.containsKey(streamId)) {
>                 logger.warn("Received same side of single join twice, overriding");
>             }
>             parts.put(streamId, tuple);
>             if (parts.size() == _numSources) { // _numSources is computed in prepare(..) from context.getThisSources().size()
>                 pendingTuples.remove(idList);
>                 List<Tuple> tuples = new ArrayList<Tuple>(parts.values());
>                 try {
>                     processTuple(tuples); // this is where the actual document merging is done
>                 } catch (Exception exc) {
>                     logger.error("There was an exception processing Tuples [" + tuples + "]", exc);
>                 }
>             }
>         }
>
>         getOutputCollector().ack(tuple);
>     }
>
>     ...
>
> }
>
> In my citySpout I have a work-in-progress (WIP) set in Redis; using a set
> ensures we don't run multiple transactions for the same city at the same
> time. Every id (city) that is emitted is added to the set, and it is removed
> when the corresponding ack or fail is invoked on the spout.
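>
> A hypothetical sketch of the ack/fail handling described above (Jedis as the
> Redis client, the "wip" key name, nextCityId(), and the jedis/collector
> fields are assumptions, not taken from the original message):
>
>     @Override
>     public void ack(Object msgId) {
>         jedis.srem("wip", msgId.toString()); // city leaves WIP once the whole tree is acked
>     }
>
>     @Override
>     public void fail(Object msgId) {
>         jedis.srem("wip", msgId.toString()); // removed on failure too, so the city can be retried
>     }
>
>     @Override
>     public void nextTuple() {
>         String id = nextCityId(); // hypothetical source of city ids
>         if (id != null && jedis.sadd("wip", id) == 1) { // emit only if not already in WIP
>             collector.emit("citySpout_stream", new Values(id), id); // id doubles as the message id
>         }
>     }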
>
> *1. *I'm seeing a lot of "Received same side of single join twice,
> overriding". My expectation was otherwise: since I'm acking without waiting
> for the join, there shouldn't be many retries happening.
>
> *2. *I deactivated the topology and saw the WIP drop to 0 within a few
> seconds, yet my bolts kept working for another few seconds even though the
> WIP was empty (items are removed from the WIP only when an ack/fail reaches
> the spout). Based on the details in
> https://github.com/nathanmarz/storm/wiki/Acking-framework-implementation, my
> expectation was that processing would stop the moment the WIP hit 0.
>
> I'm curious why my bolts are still getting data.
>
> I will continue to investigate. In the meantime, if you see any glaring
> issue with this approach, please let me know.
>
> Thanks,
> Prasun
>
