+1 Michael Rose

Hystrix circuit breakers are the way to go if REST APIs are invoked from the topology.
*Timeout in topology becomes controllable :)*

Regards
Sai

On Thu, Oct 23, 2014 at 1:57 PM, Michael Rose <[email protected]> wrote:

> It's a last ditch mechanism for replaying work which might have gotten
> stuck. Storm is an at-least-once processing system and doesn't aim to
> provide exactly once / transactional behavior with base Storm. Trident aims
> to implement that on top of the underlying at-least-once system.
>
> Timed-out in-flight tuples will *not* be cleared; this is true.
> Controlling latencies within a topology is a key to making Storm work. We
> have IO work isolated by Hystrix commands to ensure we're always coming in
> under our timeout period. We've experimented with using global streams to
> "kill" a particular tuple tree, essentially adding some unique work to a
> time-based cache to drop it at each bolt. It ultimately wasn't really
> necessary once we improved the consistency of external IO through
> circuit breaking.
>
> If you take away nothing else, remember that Storm is at-least-once
> processing. Its goal is to ensure processing eventually happens for
> everything, no matter how many times it might take. It's up to you to
> remove bad input or park it.
>
> Michael Rose (@Xorlev <https://twitter.com/xorlev>)
> Senior Platform Engineer, FullContact <http://www.fullcontact.com/>
> [email protected]
>
> On Thu, Oct 23, 2014 at 2:00 PM, Sam Mati <[email protected]> wrote:
>
>> Hi all. I'm hoping somebody can explain this behavior to me because it
>> seems pretty unexpected.
>>
>> I'm seeing that timing out tuples does *nothing* except call "fail" on
>> the Spout. The tuple itself will still be processed through the Topology,
>> except acking/failing will have no effect. Another problem is that the
>> number of pending tuples will increase: timed-out tuples do not count as
>> pending even though they will flow through the topology. Unless I'm
>> missing something, these two combined problems make timing out tuples, at
>> best, utterly pointless, and at worst very problematic (as it will just
>> throw more tuples into a topology that is already maxed out).
>>
>> Here's my topology:
>> - I have a spout. On nextTuple, it either re-emits a tuple that has
>> failed or, if none are present, creates a new tuple.
>> - I have a bolt that takes 4 seconds to ack a tuple.
>> - topology.max.spout.pending = 5
>> - topology.message.timeout.secs = 5
>>
>> I would expect 1 or 2 tuples to get acked, and 4 or 3 tuples to time out;
>> then the Bolt would next process the *resent* tuples. Over time, more
>> and more tuples would be acked (though they would frequently time out).
>>
>> What I'm seeing instead is that even though tuples are timed out, they
>> are still being processed by the Bolt. I'm assuming there is a buffer/queue
>> for the Bolt, and that timed-out tuples are not cleared from it.
>> Regardless, this leads to all tuples timing out, since the Bolt will
>> eventually only process tuples that have been timed out.
>>
>> I'm assuming, and hoping, that I'm missing something obvious here…
>>
>> Two questions:
>> 1. Can I prevent Bolts from processing already-timed-out tuples?
>> 2. What is the point of timing out tuples? It does *nothing* but call
>> *fail* on the Spout even though the tuple will still be processed by the
>> rest of the Topology!
>>
>> Thanks,
>> -Sam
>>
>>
>> Spout:
>> public class SampleSpout extends BaseRichSpout {
>>     private static Logger logger = LoggerFactory.getLogger(SampleSpout.class);
>>
>>     SpoutOutputCollector collector;
>>     Map<Integer, List<Object>> pending_map = new HashMap<Integer, List<Object>>();
>>     Queue<List<Object>> replay_queue = new LinkedBlockingQueue<List<Object>>();
>>
>>     int contentCounter;
>>     int curMsgId;
>>
>>     @Override
>>     public void declareOutputFields(OutputFieldsDeclarer declarer) {
>>         // msg-id increments on every emit, including replays.
>>         // content is created only when a new tuple is generated.
>>         declarer.declare(new Fields("msg-id", "content"));
>>     }
>>
>>     @Override
>>     public void open(Map conf, TopologyContext context, SpoutOutputCollector spoutOutputCollector) {
>>         collector = spoutOutputCollector;
>>     }
>>
>>     @Override
>>     public void nextTuple() {
>>         // either replay a failed tuple, or create a new one
>>         List<Object> tuple = null;
>>         if (replay_queue.size() > 0) {
>>             tuple = replay_queue.poll();
>>         } else {
>>             tuple = new ArrayList<Object>();
>>             tuple.add(null);
>>             tuple.add("Content #" + contentCounter++);
>>         }
>>
>>         // increment msgId and set it as the first item in the tuple
>>         int msgId = this.curMsgId++;
>>         tuple.set(0, msgId);
>>         logger.info("Emitting: " + tuple);
>>         // add this tuple to the 'pending' map, and emit it.
>>         pending_map.put(msgId, tuple);
>>         collector.emit(tuple, msgId);
>>         Utils.sleep(100);
>>     }
>>
>>     @Override
>>     public void ack(Object msgId) {
>>         // remove tuple from pending_map since it's no longer pending
>>         List<Object> acked_tuple = pending_map.remove(msgId);
>>         logger.info("Acked: " + acked_tuple);
>>     }
>>
>>     @Override
>>     public void fail(Object msgId) {
>>         // remove tuple from pending_map since it's no longer pending
>>         List<Object> failed_tuple = pending_map.remove(msgId);
>>         logger.info("Failed: " + failed_tuple);
>>
>>         // put a copy into the replay queue
>>         ArrayList<Object> copy = new ArrayList<Object>(failed_tuple);
>>         replay_queue.add(copy);
>>     }
>> }
>>
>>
>> Bolt:
>> public class SamplePrintBolt extends BaseRichBolt {
>>
>>     private static Logger logger = LoggerFactory.getLogger(SamplePrintBolt.class);
>>
>>     OutputCollector collector;
>>
>>     @Override
>>     public void prepare(Map stormConf, TopologyContext context, OutputCollector outputCollector) {
>>         collector = outputCollector;
>>     }
>>
>>     @Override
>>     public void execute(Tuple input) {
>>         // message text matches the "Will now sleep..." lines in the output below
>>         logger.info("I see: " + input.getValues() + ". Will now sleep...");
>>         Utils.sleep(4000);
>>         logger.info("Done sleeping. Acking: " + input.getValues());
>>         collector.ack(input);
>>     }
>>
>>     @Override
>>     public void declareOutputFields(OutputFieldsDeclarer declarer) {
>>         // doesn't emit
>>     }
>> }
>>
>>
>> Main:
>> public static void main(String[] args) throws Exception {
>>     Config conf = new Config();
>>     conf.setMaxSpoutPending(5);
>>     conf.setMessageTimeoutSecs(5);
>>
>>     TopologyBuilder builder = new TopologyBuilder();
>>     builder.setSpout("spout", new SampleSpout());
>>     builder.setBolt("bolt1", new SamplePrintBolt()).shuffleGrouping("spout");
>>
>>     LocalCluster cluster = new LocalCluster();
>>     cluster.submitTopology("local", conf, builder.createTopology());
>> }
>>
>>
>> Output:
>> 30084 [Thread-10-spout] INFO com.appnexus.bsg.billing.storminator.SampleSpout - Emitting: [0, Content #0]
>> 30085 [Thread-8-bolt1] INFO com.appnexus.bsg.billing.storminator.SamplePrintBolt - I see: [0, Content #0]. Will now sleep...
>> 30097 [Thread-10-spout] INFO com.appnexus.bsg.billing.storminator.SampleSpout - Emitting: [1, Content #1]
>> 30097 [Thread-10-spout] INFO com.appnexus.bsg.billing.storminator.SampleSpout - Emitting: [2, Content #2]
>> 30097 [Thread-10-spout] INFO com.appnexus.bsg.billing.storminator.SampleSpout - Emitting: [3, Content #3]
>> 30097 [Thread-10-spout] INFO com.appnexus.bsg.billing.storminator.SampleSpout - Emitting: [4, Content #4]
>> 34086 [Thread-8-bolt1] INFO com.appnexus.bsg.billing.storminator.SamplePrintBolt - Done sleeping. Acking: [0, Content #0]
>> 34086 [Thread-8-bolt1] INFO com.appnexus.bsg.billing.storminator.SamplePrintBolt - I see: [1, Content #1]. Will now sleep...
>> 34087 [Thread-10-spout] INFO com.appnexus.bsg.billing.storminator.SampleSpout - Acked: [0, Content #0]
>> 34087 [Thread-10-spout] INFO com.appnexus.bsg.billing.storminator.SampleSpout - Emitting: [5, Content #5]
>> 38087 [Thread-8-bolt1] INFO com.appnexus.bsg.billing.storminator.SamplePrintBolt - Done sleeping.
>> Acking: [1, Content #1]
>> 38087 [Thread-8-bolt1] INFO com.appnexus.bsg.billing.storminator.SamplePrintBolt - I see: [2, Content #2]. Will now sleep...
>> 38089 [Thread-10-spout] INFO com.appnexus.bsg.billing.storminator.SampleSpout - Acked: [1, Content #1]
>> 38089 [Thread-10-spout] INFO com.appnexus.bsg.billing.storminator.SampleSpout - Emitting: [6, Content #6]
>> *-- So far, so good… however, now it's time for things to time out.*
>> 40082 [Thread-10-spout] INFO com.appnexus.bsg.billing.storminator.SampleSpout - Failed: [5, Content #5]
>> 40082 [Thread-10-spout] INFO com.appnexus.bsg.billing.storminator.SampleSpout - Failed: [4, Content #4]
>> 40082 [Thread-10-spout] INFO com.appnexus.bsg.billing.storminator.SampleSpout - Failed: [3, Content #3]
>> 40083 [Thread-10-spout] INFO com.appnexus.bsg.billing.storminator.SampleSpout - Failed: [2, Content #2]
>> 40083 [Thread-10-spout] INFO com.appnexus.bsg.billing.storminator.SampleSpout - Emitting: [7, Content #5]
>> 40084 [Thread-10-spout] INFO com.appnexus.bsg.billing.storminator.SampleSpout - Emitting: [8, Content #4]
>> 40084 [Thread-10-spout] INFO com.appnexus.bsg.billing.storminator.SampleSpout - Emitting: [9, Content #3]
>> 40085 [Thread-10-spout] INFO com.appnexus.bsg.billing.storminator.SampleSpout - Emitting: [10, Content #2]
>> 42088 [Thread-8-bolt1] INFO com.appnexus.bsg.billing.storminator.SamplePrintBolt - Done sleeping. Acking: [2, Content #2]
>> *-- Acking a timed-out tuple… this does nothing.*
>> 42088 [Thread-8-bolt1] INFO com.appnexus.bsg.billing.storminator.SamplePrintBolt - I see: [3, Content #3]. Will now sleep...
>> *-- Why is it looking at tuple #3? This has already failed.*
>> 45084 [Thread-10-spout] INFO com.appnexus.bsg.billing.storminator.SampleSpout - Failed: [6, Content #6]
>> 45085 [Thread-10-spout] INFO com.appnexus.bsg.billing.storminator.SampleSpout - Emitting: [11, Content #6]
>> 46089 [Thread-8-bolt1] INFO com.appnexus.bsg.billing.storminator.SamplePrintBolt - Done sleeping. Acking: [3, Content #3]
>> 46089 [Thread-8-bolt1] INFO com.appnexus.bsg.billing.storminator.SamplePrintBolt - I see: [4, Content #4]. Will now sleep...
>> 50084 [Thread-10-spout] INFO com.appnexus.bsg.billing.storminator.SampleSpout - Failed: [10, Content #2]
>> 50085 [Thread-10-spout] INFO com.appnexus.bsg.billing.storminator.SampleSpout - Failed: [7, Content #5]
>> 50085 [Thread-10-spout] INFO com.appnexus.bsg.billing.storminator.SampleSpout - Failed: [8, Content #4]
>> 50085 [Thread-10-spout] INFO com.appnexus.bsg.billing.storminator.SampleSpout - Failed: [9, Content #3]
>> 50085 [Thread-10-spout] INFO com.appnexus.bsg.billing.storminator.SampleSpout - Emitting: [12, Content #2]
>> 50085 [Thread-10-spout] INFO com.appnexus.bsg.billing.storminator.SampleSpout - Emitting: [13, Content #5]
>> 50085 [Thread-10-spout] INFO com.appnexus.bsg.billing.storminator.SampleSpout - Emitting: [14, Content #4]
>> 50085 [Thread-10-spout] INFO com.appnexus.bsg.billing.storminator.SampleSpout - Emitting: [15, Content #3]
>> *-- More timeouts…*
>> 50090 [Thread-8-bolt1] INFO com.appnexus.bsg.billing.storminator.SamplePrintBolt - Done sleeping. Acking: [4, Content #4]
>> 50090 [Thread-8-bolt1] INFO com.appnexus.bsg.billing.storminator.SamplePrintBolt - I see: [5, Content #5]. Will now sleep...
>> 54091 [Thread-8-bolt1] INFO com.appnexus.bsg.billing.storminator.SamplePrintBolt - Done sleeping. Acking: [5, Content #5]
>> *-- Yet the Bolt looks at tuple #5 which timed out 15 seconds ago…*
>> 54091 [Thread-8-bolt1] INFO com.appnexus.bsg.billing.storminator.SamplePrintBolt - I see: [6, Content #6]. Will now sleep...
>> 55085 [Thread-10-spout] INFO com.appnexus.bsg.billing.storminator.SampleSpout - Failed: [11, Content #6]
>> 55085 [Thread-10-spout] INFO com.appnexus.bsg.billing.storminator.SampleSpout - Emitting: [16, Content #6]
>> 58091 [Thread-8-bolt1] INFO com.appnexus.bsg.billing.storminator.SamplePrintBolt - Done sleeping. Acking: [6, Content #6]
>> 58092 [Thread-8-bolt1] INFO com.appnexus.bsg.billing.storminator.SamplePrintBolt - I see: [7, Content #5]. Will now sleep...
>> 60085 [Thread-10-spout] INFO com.appnexus.bsg.billing.storminator.SampleSpout - Failed: [15, Content #3]
>> 60086 [Thread-10-spout] INFO com.appnexus.bsg.billing.storminator.SampleSpout - Failed: [12, Content #2]
>> 60086 [Thread-10-spout] INFO com.appnexus.bsg.billing.storminator.SampleSpout - Failed: [13, Content #5]
>> 60086 [Thread-10-spout] INFO com.appnexus.bsg.billing.storminator.SampleSpout - Failed: [14, Content #4]
>> 60086 [Thread-10-spout] INFO com.appnexus.bsg.billing.storminator.SampleSpout - Emitting: [17, Content #3]
>> 60086 [Thread-10-spout] INFO com.appnexus.bsg.billing.storminator.SampleSpout - Emitting: [18, Content #2]
>> 60086 [Thread-10-spout] INFO com.appnexus.bsg.billing.storminator.SampleSpout - Emitting: [19, Content #5]
>> 60086 [Thread-10-spout] INFO com.appnexus.bsg.billing.storminator.SampleSpout - Emitting: [20, Content #4]
>> 62093 [Thread-8-bolt1] INFO com.appnexus.bsg.billing.storminator.SamplePrintBolt - Done sleeping. Acking: [7, Content #5]
>> 62093 [Thread-8-bolt1] INFO com.appnexus.bsg.billing.storminator.SamplePrintBolt - I see: [8, Content #4]. Will now sleep...
>> *-- It's clear that the Bolt looks at tuples even if they have timed out.
>> Its queue will get longer and longer and tuples will always time out.*
>> 65086 [Thread-10-spout] INFO com.appnexus.bsg.billing.storminator.SampleSpout - Failed: [16, Content #6]
>> 65087 [Thread-10-spout] INFO com.appnexus.bsg.billing.storminator.SampleSpout - Emitting: [21, Content #6]
>> 66094 [Thread-8-bolt1] INFO com.appnexus.bsg.billing.storminator.SamplePrintBolt - Done sleeping. Acking: [8, Content #4]
>> 66094 [Thread-8-bolt1] INFO com.appnexus.bsg.billing.storminator.SamplePrintBolt - I see: [9, Content #3]. Will now sleep...
>> 70087 [Thread-10-spout] INFO com.appnexus.bsg.billing.storminator.SampleSpout - Failed: [20, Content #4]
>> 70087 [Thread-10-spout] INFO com.appnexus.bsg.billing.storminator.SampleSpout - Failed: [19, Content #5]
>> 70087 [Thread-10-spout] INFO com.appnexus.bsg.billing.storminator.SampleSpout - Failed: [18, Content #2]
>> 70088 [Thread-10-spout] INFO com.appnexus.bsg.billing.storminator.SampleSpout - Failed: [17, Content #3]
>> 70088 [Thread-10-spout] INFO com.appnexus.bsg.billing.storminator.SampleSpout - Emitting: [22, Content #4]
>> 70088 [Thread-10-spout] INFO com.appnexus.bsg.billing.storminator.SampleSpout - Emitting: [23, Content #5]
>> 70088 [Thread-10-spout] INFO com.appnexus.bsg.billing.storminator.SampleSpout - Emitting: [24, Content #2]
>> 70088 [Thread-10-spout] INFO com.appnexus.bsg.billing.storminator.SampleSpout - Emitting: [25, Content #3]
>> 70095 [Thread-8-bolt1] INFO com.appnexus.bsg.billing.storminator.SamplePrintBolt - Done sleeping. Acking: [9, Content #3]
>> 70095 [Thread-8-bolt1] INFO com.appnexus.bsg.billing.storminator.SamplePrintBolt - I see: [10, Content #2]. Will now sleep...
>> 74096 [Thread-8-bolt1] INFO com.appnexus.bsg.billing.storminator.SamplePrintBolt - Done sleeping. Acking: [10, Content #2]
>> 74096 [Thread-8-bolt1] INFO com.appnexus.bsg.billing.storminator.SamplePrintBolt - I see: [11, Content #6]. Will now sleep...
>> 75088 [Thread-10-spout] INFO com.appnexus.bsg.billing.storminator.SampleSpout - Failed: [21, Content #6]
>> 75088 [Thread-10-spout] INFO com.appnexus.bsg.billing.storminator.SampleSpout - Emitting: [26, Content #6]
>> 78097 [Thread-8-bolt1] INFO com.appnexus.bsg.billing.storminator.SamplePrintBolt - Done sleeping. Acking: [11, Content #6]
>> 78097 [Thread-8-bolt1] INFO com.appnexus.bsg.billing.storminator.SamplePrintBolt - I see: [12, Content #2]. Will now sleep...
>> 80087 [Thread-10-spout] INFO com.appnexus.bsg.billing.storminator.SampleSpout - Failed: [25, Content #3]
>> 80087 [Thread-10-spout] INFO com.appnexus.bsg.billing.storminator.SampleSpout - Failed: [24, Content #2]
>> 80087 [Thread-10-spout] INFO com.appnexus.bsg.billing.storminator.SampleSpout - Failed: [23, Content #5]
>> 80087 [Thread-10-spout] INFO com.appnexus.bsg.billing.storminator.SampleSpout - Failed: [22, Content #4]
>> 80087 [Thread-10-spout] INFO com.appnexus.bsg.billing.storminator.SampleSpout - Emitting: [27, Content #3]
>> 80087 [Thread-10-spout] INFO com.appnexus.bsg.billing.storminator.SampleSpout - Emitting: [28, Content #2]
>> 80088 [Thread-10-spout] INFO com.appnexus.bsg.billing.storminator.SampleSpout - Emitting: [29, Content #5]
>> 80088 [Thread-10-spout] INFO com.appnexus.bsg.billing.storminator.SampleSpout - Emitting: [30, Content #4]
>> 82098 [Thread-8-bolt1] INFO com.appnexus.bsg.billing.storminator.SamplePrintBolt - Done sleeping. Acking: [12, Content #2]
>> 82098 [Thread-8-bolt1] INFO com.appnexus.bsg.billing.storminator.SamplePrintBolt - I see: [13, Content #5]. Will now sleep...
>> 85088 [Thread-10-spout] INFO com.appnexus.bsg.billing.storminator.SampleSpout - Failed: [26, Content #6]
>> 85088 [Thread-10-spout] INFO com.appnexus.bsg.billing.storminator.SampleSpout - Emitting: [31, Content #6]
>> 86098 [Thread-8-bolt1] INFO com.appnexus.bsg.billing.storminator.SamplePrintBolt - Done sleeping. Acking: [13, Content #5]
>> 86099 [Thread-8-bolt1] INFO com.appnexus.bsg.billing.storminator.SamplePrintBolt - I see: [14, Content #4]. Will now sleep...
>> 90100 [Thread-8-bolt1] INFO com.appnexus.bsg.billing.storminator.SamplePrintBolt - Done sleeping. Acking: [14, Content #4]
>> 90101 [Thread-8-bolt1] INFO com.appnexus.bsg.billing.storminator.SamplePrintBolt - I see: [15, Content #3]. Will now sleep...
>> 90216 [Thread-10-spout] INFO com.appnexus.bsg.billing.storminator.SampleSpout - Failed: [29, Content #5]
>> 90216 [Thread-10-spout] INFO com.appnexus.bsg.billing.storminator.SampleSpout - Failed: [30, Content #4]
>> 90216 [Thread-10-spout] INFO com.appnexus.bsg.billing.storminator.SampleSpout - Failed: [28, Content #2]
>> 90217 [Thread-10-spout] INFO com.appnexus.bsg.billing.storminator.SampleSpout - Failed: [27, Content #3]
>> 90217 [Thread-10-spout] INFO com.appnexus.bsg.billing.storminator.SampleSpout - Emitting: [32, Content #5]
>> 90217 [Thread-10-spout] INFO com.appnexus.bsg.billing.storminator.SampleSpout - Emitting: [33, Content #4]
>> 90217 [Thread-10-spout] INFO com.appnexus.bsg.billing.storminator.SampleSpout - Emitting: [34, Content #2]
>> 90217 [Thread-10-spout] INFO com.appnexus.bsg.billing.storminator.SampleSpout - Emitting: [35, Content #3]
>> 94101 [Thread-8-bolt1] INFO com.appnexus.bsg.billing.storminator.SamplePrintBolt - Done sleeping. Acking: [15, Content #3]
>> 94101 [Thread-8-bolt1] INFO com.appnexus.bsg.billing.storminator.SamplePrintBolt - I see: [16, Content #6]. Will now sleep...
>> *-- Problem gets exacerbated… Bolt is now looking at tuples that
>> failed 30 seconds ago.*
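
On Sam's first question (can a bolt avoid processing already-timed-out tuples?), the thread says Storm does not clear them itself. One workaround, in the spirit of the time-based drop Michael mentions, is for the bolt to compare an emit timestamp against the message timeout and skip stale work. The sketch below is only an illustration in plain Java with no Storm dependency: the `StaleTupleFilter` class, the idea of the spout adding an emit-timestamp field, and the `simulate` helper are all assumptions, not code from the thread or a Storm API. The simulated clock uses the numbers from Sam's setup (5 s timeout, 4 s of work per tuple, 5 pending tuples).

```java
import java.util.ArrayList;
import java.util.List;

/**
 * Hypothetical helper, not part of Storm: decides whether a tuple is older
 * than the topology message timeout, based on an emit timestamp that the
 * spout is assumed to have put into the tuple.
 */
final class StaleTupleFilter {
    private final long timeoutMillis;

    StaleTupleFilter(long timeoutMillis) {
        if (timeoutMillis <= 0) {
            throw new IllegalArgumentException("timeoutMillis must be positive");
        }
        this.timeoutMillis = timeoutMillis;
    }

    /** True once the tuple has been in flight for at least the timeout. */
    boolean isStale(long emittedAtMillis, long nowMillis) {
        return nowMillis - emittedAtMillis >= timeoutMillis;
    }

    /**
     * Simulated clock: all tuples are emitted at t = 0 and a single bolt
     * spends workMillis on each tuple it actually processes. Stale tuples
     * are skipped and cost no time.
     */
    static List<String> simulate(long timeoutMillis, long workMillis, int tupleCount) {
        StaleTupleFilter filter = new StaleTupleFilter(timeoutMillis);
        List<String> decisions = new ArrayList<String>();
        long now = 0L;
        for (int id = 0; id < tupleCount; id++) {
            if (filter.isStale(0L, now)) {
                decisions.add("skip " + id);
            } else {
                decisions.add("process " + id);
                now += workMillis; // the bolt is busy for workMillis
            }
        }
        return decisions;
    }

    public static void main(String[] args) {
        // 5 s timeout, 4 s of work per tuple, 5 tuples pending.
        for (String decision : simulate(5000L, 4000L, 5)) {
            System.out.println(decision);
        }
    }
}
```

With these numbers the simulation processes tuples 0 and 1 and skips 2, 3 and 4, which is the "1 or 2 acked, the rest timed out" behaviour Sam expected. In a real bolt the check would presumably sit at the top of `execute`, using `System.currentTimeMillis()` and the timestamp field, and it only holds if clocks agree across machines. The log above also shows the spout's `fail` arriving up to about 10 seconds after the emit (tuples emitted at 30097 failed at 40082), so a wall-clock check is an approximation of what the spout has actually failed. Keeping the bolt's latency under the timeout, as Michael recommends, is still the main fix.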
