I have a topology that looks like this:
*All bolts emit the fields "id", "json"*
*The spout emits only "id"*
*All bolts and the spout emit on a stream named <ComponentName>_stream*
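To illustrate the convention, this is roughly what each component declares and emits (a simplified sketch, not the actual code; impactBolt is used as an example):

public void declareOutputFields(OutputFieldsDeclarer declarer) {
    // every component declares its own named stream
    declarer.declareStream("impactBolt_stream", new Fields("id", "json"));
}

// ...and in execute(..), the emit is anchored to the input tuple:
collector.emit("impactBolt_stream", tuple, new Values(id, json));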
*Topology Definition*
*==========================================*
builder.setSpout("citySpout", citySpout, 10);
builder.setBolt("impactBolt", impactBolt, 5)
       .fieldsGrouping("citySpout", "citySpout_stream", new Fields("id"));
builder.setBolt("restaurantBolt", restaurantBolt, 5)
       .fieldsGrouping("impactBolt", "impactBolt_stream", new Fields("id"));
builder.setBolt("theatresBolt", theatresBolt, 5)
       .fieldsGrouping("impactBolt", "impactBolt_stream", new Fields("id"));
builder.setBolt("libraryBolt", libraryBolt, 5)
       .fieldsGrouping("impactBolt", "impactBolt_stream", new Fields("id"));
builder.setBolt("transportBolt", transportBolt, 5)
       .fieldsGrouping("impactBolt", "impactBolt_stream", new Fields("id"));
builder.setBolt("crimeBolt", crimeBolt, 5)
       .fieldsGrouping("impactBolt", "impactBolt_stream", new Fields("id"));
builder.setBolt("combinerBolt", combinerBolt, 5)
       .fieldsGrouping("restaurantBolt", "restaurantBolt_stream", new Fields("id"))
       .fieldsGrouping("theatresBolt", "theatresBolt_stream", new Fields("id"))
       .fieldsGrouping("libraryBolt", "libraryBolt_stream", new Fields("id"))
       .fieldsGrouping("transportBolt", "transportBolt_stream", new Fields("id"))
       .fieldsGrouping("crimeBolt", "crimeBolt_stream", new Fields("id"));
*CombinerBolt*
*==============*
public class CombinerBolt extends BaseRichBolt {
    ...
    ...
    public void execute(Tuple tuple) {
        String id = getId(tuple); // gets the value corresponding to "id" from the tuple
        List<Object> idList = Arrays.asList(new Object[] { id });
        GlobalStreamId streamId = new GlobalStreamId(tuple.getSourceComponent(),
                tuple.getSourceStreamId());
        Map<GlobalStreamId, Tuple> parts;
        if (!pendingTuples.containsKey(idList)) {
            parts = new HashMap<GlobalStreamId, Tuple>();
            pendingTuples.put(idList, parts);
        } else {
            parts = pendingTuples.get(idList);
            logger.debug("Pending Tuples [Count:" + pendingTuples.size() + "]");
        }
        if (parts.containsKey(streamId)) {
            logger.warn("Received same side of single join twice, overriding");
        }
        parts.put(streamId, tuple);
        // _numSources is computed in prepare(..) using context.getThisSources().size()
        if (parts.size() == _numSources) {
            pendingTuples.remove(idList);
            List<Tuple> tuples = new ArrayList<Tuple>(parts.values());
            try {
                processTuple(tuples); // this is where the actual document merging is done
            } catch (Exception exc) {
                logger.error("There was an exception processing Tuples [" + tuples + "]", exc);
            }
        }
        getOutputCollector().ack(tuple);
    }
    ...
    ...
}
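For completeness, this is roughly how _numSources and the pending map are set up in prepare(..) (a simplified sketch, not the exact code):

    private int _numSources;
    private Map<List<Object>, Map<GlobalStreamId, Tuple>> pendingTuples;

    public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) {
        // the collector is stored elsewhere and exposed via getOutputCollector()
        this.pendingTuples = new HashMap<List<Object>, Map<GlobalStreamId, Tuple>>();
        // getThisSources() returns one entry per (component, stream) feeding this bolt;
        // with the topology above that is 5 (restaurant, theatres, library, transport, crime)
        this._numSources = context.getThisSources().size();
    }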
In my citySpout I maintain a work-in-progress (WIP) set in Redis (using a set
ensures that we don't have multiple transactions for the same city at the same
time): every id (city) that is emitted is added to the set, and it is removed
when the corresponding ack or fail is invoked on the spout.
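Roughly, the WIP bookkeeping in the spout looks like this (a stripped-down sketch, not the actual code; assumes a Jedis client, a Redis set key "wip", and the city id doubling as the Storm message id):

public void nextTuple() {
    String id = nextCityId(); // hypothetical helper that supplies the next city id
    if (id != null && jedis.sadd("wip", id) == 1) { // only emit if not already in flight
        collector.emit("citySpout_stream", new Values(id), id); // id is also the message id
    }
}

public void ack(Object msgId) {
    jedis.srem("wip", (String) msgId); // done; this city may be emitted again
}

public void fail(Object msgId) {
    jedis.srem("wip", (String) msgId); // failed; remove from WIP (and possibly re-queue)
}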
*1. *I'm seeing a lot of "Received same side of single join twice,
overriding" warnings. I expected otherwise: since I ack each tuple without
waiting for the join, there shouldn't be many retries happening.
*2. *I deactivated the topology and could see the WIP drop to 0 within a few
seconds; however, my bolts kept working for another few seconds even after the
WIP was empty. Items are removed from the WIP only when an ack/fail is
received at the spout. Based on the details provided in
https://github.com/nathanmarz/storm/wiki/Acking-framework-implementation ,
my expectation was that processing would stop the moment the WIP reaches 0.
I'm curious why my bolts are still getting data.
I will continue to investigate. In the meantime, if you see any glaring issue
with this approach, please let me know.
Thanks,
Prasun