I have a topology that looks like this:
*All bolts emit the fields "id" and "json"*
*The spout emits only "id"*
*All bolts/spout emit on a stream named <ComponentName>_stream* (see the sketch below)
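
To make the stream-naming convention concrete, each component's declareOutputFields would look something like this (a sketch, with ImpactBolt standing in for any of the bolts; not the actual code):

// In ImpactBolt: every bolt declares "id" and "json" on "<componentName>_stream";
// the spout declares only "id" on "citySpout_stream".
public void declareOutputFields(OutputFieldsDeclarer declarer) {
    declarer.declareStream("impactBolt_stream", new Fields("id", "json"));
}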

*Topology Definition*

*==========================================*

builder.setSpout("citySpout", citySpout, 10);

builder.setBolt("impactBolt", impactBolt, 5)
        .fieldsGrouping("citySpout", "citySpout_stream", new Fields("id"));

builder.setBolt("restaurantBolt", restaurantBolt, 5)
        .fieldsGrouping("impactBolt", "impactBolt_stream", new Fields("id"));

builder.setBolt("theatresBolt", theatresBolt, 5)
        .fieldsGrouping("impactBolt", "impactBolt_stream", new Fields("id"));

builder.setBolt("libraryBolt", libraryBolt, 5)
        .fieldsGrouping("impactBolt", "impactBolt_stream", new Fields("id"));

builder.setBolt("transportBolt", transportBolt, 5)
        .fieldsGrouping("impactBolt", "impactBolt_stream", new Fields("id"));

builder.setBolt("crimeBolt", crimeBolt, 5)
        .fieldsGrouping("impactBolt", "impactBolt_stream", new Fields("id"));

builder.setBolt("combinerBolt", combinerBolt, 5)
        .fieldsGrouping("restaurantBolt", "restaurantBolt_stream", new Fields("id"))
        .fieldsGrouping("theatresBolt", "theatresBolt_stream", new Fields("id"))
        .fieldsGrouping("libraryBolt", "libraryBolt_stream", new Fields("id"))
        .fieldsGrouping("transportBolt", "transportBolt_stream", new Fields("id"))
        .fieldsGrouping("crimeBolt", "crimeBolt_stream", new Fields("id"));


*CombinerBolt*

*============== *

public class CombinerBolt extends BaseRichBolt {

...

...

public void execute(Tuple tuple) {

    String id = getId(tuple); // gets the value of the "id" field from the tuple
    List<Object> idList = Arrays.asList(new Object[] { id });
    GlobalStreamId streamId = new GlobalStreamId(tuple.getSourceComponent(),
            tuple.getSourceStreamId());

    Map<GlobalStreamId, Tuple> parts;
    if (!pendingTuples.containsKey(idList)) {
        parts = new HashMap<GlobalStreamId, Tuple>();
        pendingTuples.put(idList, parts);
    } else {
        parts = pendingTuples.get(idList);
        logger.debug("Pending Tuples [Count:" + pendingTuples.size() + "]");
    }

    if (parts.containsKey(streamId)) {
        logger.warn("Received same side of single join twice, overriding");
    }
    parts.put(streamId, tuple);

    // _numSources is computed in prepare(..) using context.getThisSources().size()
    if (parts.size() == _numSources) {
        pendingTuples.remove(idList);
        List<Tuple> tuples = new ArrayList<Tuple>(parts.values());
        try {
            processTuple(tuples); // this is where the actual document merging is done
        } catch (Exception exc) {
            logger.error("There was an exception processing Tuples [" + tuples + "]", exc);
        }
    }

    getOutputCollector().ack(tuple);
}

...

...

}
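
For reference, _numSources in the snippet above would be set in prepare(..), roughly like this (a sketch; it assumes pendingTuples and the collector are fields of the bolt):

public void prepare(Map conf, TopologyContext context, OutputCollector collector) {
    this.collector = collector;
    this.pendingTuples = new HashMap<List<Object>, Map<GlobalStreamId, Tuple>>();
    // One entry per (component, stream) subscription feeding this bolt, i.e.
    // the five fieldsGrouping declarations in the topology definition above.
    this._numSources = context.getThisSources().size();
}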

In my citySpout I keep a work-in-progress (WIP) set in Redis [using a set
ensures that we don't have multiple transactions for the same city at the
same time]: every id (city) that is emitted is added to the set, and it is
removed when the corresponding ack or fail is invoked on the spout.
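
Roughly, the spout's WIP bookkeeping looks like this (a sketch using Jedis; the "wip" key and the nextCityId() helper are illustrative, not the actual code):

private Jedis jedis; // initialised in open(..)

public void nextTuple() {
    String id = nextCityId(); // illustrative: fetch the next city id to process
    if (id != null && jedis.sadd("wip", id) == 1) { // skip cities already in flight
        collector.emit("citySpout_stream", new Values(id), id); // the id doubles as the message id
    }
}

public void ack(Object msgId) {
    jedis.srem("wip", (String) msgId); // tuple tree fully processed: city leaves WIP
}

public void fail(Object msgId) {
    jedis.srem("wip", (String) msgId); // failed: city leaves WIP and may be retried
}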

*1. *I'm seeing a lot of "Received same side of single join twice,
overriding" warnings. My expectation was otherwise: since I'm acking without
waiting for the join, there shouldn't be many retries happening.
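
For context on the retries: whether a downstream failure replays a tuple from the spout depends on whether the intermediate bolts anchor their emits (the bolt code isn't shown above, so this is the assumed pattern):

// Anchored emit: the output joins the input's tuple tree, so a failure or
// timeout anywhere downstream replays the whole tree from the spout.
collector.emit("impactBolt_stream", input, new Values(id, json));
collector.ack(input);

// Unanchored emit: starts a fresh tuple tree; downstream failures do not
// trigger replays from the spout.
collector.emit("impactBolt_stream", new Values(id, json));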

*2. *I deactivated the topology and watched the WIP set drop to 0 within a
few seconds, yet my bolts kept processing for another few seconds after the
WIP set was empty. Items are removed from the WIP set only when an ack/fail
is received at the spout. Based on the details in
https://github.com/nathanmarz/storm/wiki/Acking-framework-implementation ,
my expectation was that processing would stop the moment the WIP set reaches
0. I'm curious why my bolts are still getting data.

I will continue to investigate. In the meantime, if you see any glaring
issue with this approach, please let me know.

Thanks,
Prasun
