+1 Michael Rose. A Hystrix circuit breaker (CB) is the way to go if REST APIs are invoked from the
topology.

*Timeout in topology becomes controllable :)*
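
A minimal sketch of the idea, assuming plain java.util.concurrent as a stand-in for a Hystrix command: it bounds how long one external call may run and returns a fallback when the budget is exceeded. It covers only the timeout-and-fallback half and keeps no circuit-breaker state. TimeBoundedCall and callWithTimeout are hypothetical names, not part of Storm or Hystrix.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

// Hypothetical helper (not a Storm or Hystrix API): runs one external call
// with a time budget and returns a fallback if the budget is exceeded.
class TimeBoundedCall {
    // Daemon threads, so a stuck call never keeps the JVM alive.
    private static final ExecutorService POOL = Executors.newCachedThreadPool(r -> {
        Thread t = new Thread(r, "bounded-io");
        t.setDaemon(true);
        return t;
    });

    static <T> T callWithTimeout(Callable<T> call, long timeoutMillis, T fallback) {
        Future<T> future = POOL.submit(call);
        try {
            return future.get(timeoutMillis, TimeUnit.MILLISECONDS);
        } catch (TimeoutException e) {
            future.cancel(true);   // interrupt the slow call
            return fallback;
        } catch (ExecutionException e) {
            return fallback;       // the call itself threw
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            future.cancel(true);
            return fallback;
        }
    }

    public static void main(String[] args) {
        // A fast call returns its own value; a slow one is cut off at 200 ms.
        String fast = callWithTimeout(() -> "response", 1000, "fallback");
        String slow = callWithTimeout(() -> {
            Thread.sleep(5000);
            return "response";
        }, 200, "fallback");
        System.out.println(fast + " / " + slow);
    }
}
```

Inside a bolt, the budget would be chosen well below topology.message.timeout.secs so that execute() returns before the tuple tree can time out.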


Regards
Sai

On Thu, Oct 23, 2014 at 1:57 PM, Michael Rose <[email protected]>
wrote:

> It's a last ditch mechanism for replaying work which might have gotten
> stuck. Storm is an at-least-once processing system and doesn't aim to
> provide exactly once / transactional behavior with base Storm. Trident aims
> to implement that on top of the underlying at-least-once system.
>
> Timed out in-flight tuples will *not* be cleared; this is true.
> Controlling latencies within a topology is a key to making Storm work. We
> have IO work isolated by Hystrix commands to ensure we're always coming in
> under our timeout period. We've experimented with using global streams to
> "kill" a particular tuple tree, essentially adding some unique work to a
> time-based cache to drop it at each bolt. It ultimately wasn't really
> necessary once we improved the consistency of external IO through
> circuit breaking.
>
> If you take away nothing else, remember that Storm is at least once
> processing. Its goal is to ensure processing eventually happens for
> everything, no matter how many times it might take. It's up to you to
> remove bad input or park it.
>
> Michael Rose (@Xorlev <https://twitter.com/xorlev>)
> Senior Platform Engineer, FullContact <http://www.fullcontact.com/>
> [email protected]
>
> On Thu, Oct 23, 2014 at 2:00 PM, Sam Mati <[email protected]> wrote:
>
>>  Hi all.  I'm hoping somebody can explain this behavior to me because it
>> seems pretty unexpected.
>>
>>  I'm seeing that timing out tuples does *nothing* except call "fail" on
>> the Spout.  The tuple itself will still be processed through the Topology,
>> except acking/failing will have no effect.  Another problem is that the
>> number of pending tuples will increase — timed out tuples do not count as
>> pending even though they will flow through the topology.  Unless I'm
>> missing something, these two combined problems make timing out tuples, at
>> best, utterly pointless, and at worst very problematic (as it will just
>> throw more tuples into a topology that is already maxed out).
>>
>>  Here's my topology:
>>  - I have a spout.  On nextTuple, it either re-emits a tuple that has
>> failed or, if none are present, creates a new tuple.
>>  - I have a bolt that takes 4 seconds to ack a tuple.
>>  - topology.max.spout.pending = 5
>>  - topology.message.timeout.secs = 5
>>
>>  I would expect 1 or 2 tuples to get acked, and 4 or 3 tuples to time out
>> — then the Bolt would next process the *resent* tuples.  Over time, more
>> and more tuples would be acked (though they would frequently time out).
>>
>>  What I'm seeing instead is that even though tuples are timed-out, they
>> are still being processed by the Bolt.  I'm assuming there is a buffer/queue
>> for the Bolt, and that timed-out tuples are not cleared from it.
>> Regardless, this leads to all tuples timing out, since the Bolt will
>> eventually only process tuples that have been timed out.
>>
>>  I'm assuming, and hoping, that I'm missing something obvious here…
>>
>>  Two questions:
>> 1.  Can I prevent Bolts from processing already-timed-out tuples?
>> 2.  What is the point of timing out tuples?  It does *nothing* but call
>> *fail* on the Spout even though the tuple will still be processed by the
>> rest of the Topology!
>>
>>  Thanks,
>> -Sam
>>
>>
>>  Spout:
>>  public class SampleSpout extends BaseRichSpout {
>>     private static Logger logger =
>> LoggerFactory.getLogger(SampleSpout.class);
>>
>>      SpoutOutputCollector collector;
>>     Map<Integer, List<Object>> pending_map = new HashMap<Integer,
>> List<Object>>();
>>     Queue<List<Object>> replay_queue = new
>> LinkedBlockingQueue<List<Object>>();
>>
>>      int contentCounter;
>>     int curMsgId;
>>
>>      @Override
>>     public void declareOutputFields(OutputFieldsDeclarer declarer) {
>>         // msg-id increments every time we emit, including replays.
>>         // content is generated only when a new tuple is created.
>>        declarer.declare(new Fields("msg-id", "content"));
>>     }
>>
>>      @Override
>>     public void open(Map conf, TopologyContext context,
>> SpoutOutputCollector spoutOutputCollector) {
>>         collector = spoutOutputCollector;
>>     }
>>
>>      @Override
>>     public void nextTuple() {
>>         // either replay a failed tuple, or create a new one
>>         List<Object> tuple = null;
>>         if (replay_queue.size() > 0){
>>             tuple = replay_queue.poll();
>>         }else{
>>             tuple = new ArrayList<Object>();
>>             tuple.add(null);
>>             tuple.add("Content #" + contentCounter++);
>>         }
>>
>>          // increment msgId and set it as the first item in the tuple
>>         int msgId = this.curMsgId++;
>>         tuple.set(0, msgId);
>>         logger.info("Emitting: " + tuple);
>>         // add this tuple to the 'pending' map, and emit it.
>>         pending_map.put(msgId, tuple);
>>         collector.emit(tuple, msgId);
>>         Utils.sleep(100);
>>     }
>>
>>      @Override
>>     public void ack(Object msgId){
>>         // remove tuple from pending_map since it's no longer pending
>>         List<Object> acked_tuple = pending_map.remove(msgId);
>>         logger.info("Acked: " + acked_tuple);
>>     }
>>
>>      @Override
>>     public void fail(Object msgId){
>>         // remove tuple from pending_map since it's no longer pending
>>         List<Object> failed_tuple = pending_map.remove(msgId);
>>         logger.info("Failed: " + failed_tuple);
>>
>>          // put a copy into the replay queue
>>         ArrayList<Object> copy = new ArrayList<Object>(failed_tuple);
>>         replay_queue.add(copy);
>>     }
>> }
>>
>>
>>  Bolt:
>>  public class SamplePrintBolt extends BaseRichBolt {
>>
>>      private static Logger logger =
>> LoggerFactory.getLogger(SamplePrintBolt.class);
>>
>>      OutputCollector collector;
>>
>>      @Override
>>     public void prepare(Map stormConf, TopologyContext context,
>> OutputCollector outputCollector) {
>>         collector = outputCollector;
>>     }
>>
>>      @Override
>>     public void execute(Tuple input) {
>>         logger.info("I see: " + input.getValues() + ". Will now sleep...");
>>         Utils.sleep(4000);
>>         logger.info("Done sleeping. Acking: "  + input.getValues());
>>         collector.ack(input);
>>     }
>>
>>      @Override
>>     public void declareOutputFields(OutputFieldsDeclarer declarer) {
>>         // doesn't emit
>>     }
>> }
>>
>>
>>  Main:
>> public static void main(String[] args) throws Exception {
>>          Config conf = new Config();
>>         conf.setMaxSpoutPending(5);
>>         conf.setMessageTimeoutSecs(5);
>>
>>          TopologyBuilder builder = new TopologyBuilder();
>>         builder.setSpout("spout", new SampleSpout());
>>         builder.setBolt("bolt1", new
>> SamplePrintBolt()).shuffleGrouping("spout");
>>
>>          LocalCluster cluster = new LocalCluster();
>>         cluster.submitTopology("local", conf, builder.createTopology());
>>  }
>>
>>
>>  Output:
>>  30084 [Thread-10-spout] INFO
>>  com.appnexus.bsg.billing.storminator.SampleSpout - Emitting: [0, Content
>> #0]
>> 30085 [Thread-8-bolt1] INFO
>>  com.appnexus.bsg.billing.storminator.SamplePrintBolt - I see: [0, Content
>> #0]. Will now sleep...
>> 30097 [Thread-10-spout] INFO
>>  com.appnexus.bsg.billing.storminator.SampleSpout - Emitting: [1, Content
>> #1]
>> 30097 [Thread-10-spout] INFO
>>  com.appnexus.bsg.billing.storminator.SampleSpout - Emitting: [2, Content
>> #2]
>> 30097 [Thread-10-spout] INFO
>>  com.appnexus.bsg.billing.storminator.SampleSpout - Emitting: [3, Content
>> #3]
>> 30097 [Thread-10-spout] INFO
>>  com.appnexus.bsg.billing.storminator.SampleSpout - Emitting: [4, Content
>> #4]
>> 34086 [Thread-8-bolt1] INFO
>>  com.appnexus.bsg.billing.storminator.SamplePrintBolt - Done sleeping.
>> Acking: [0, Content #0]
>> 34086 [Thread-8-bolt1] INFO
>>  com.appnexus.bsg.billing.storminator.SamplePrintBolt - I see: [1, Content
>> #1]. Will now sleep...
>> 34087 [Thread-10-spout] INFO
>>  com.appnexus.bsg.billing.storminator.SampleSpout - Acked: [0, Content #0]
>> 34087 [Thread-10-spout] INFO
>>  com.appnexus.bsg.billing.storminator.SampleSpout - Emitting: [5, Content
>> #5]
>> 38087 [Thread-8-bolt1] INFO
>>  com.appnexus.bsg.billing.storminator.SamplePrintBolt - Done sleeping.
>> Acking: [1, Content #1]
>> 38087 [Thread-8-bolt1] INFO
>>  com.appnexus.bsg.billing.storminator.SamplePrintBolt - I see: [2, Content
>> #2]. Will now sleep...
>> 38089 [Thread-10-spout] INFO
>>  com.appnexus.bsg.billing.storminator.SampleSpout - Acked: [1, Content #1]
>> 38089 [Thread-10-spout] INFO
>>  com.appnexus.bsg.billing.storminator.SampleSpout - Emitting: [6, Content
>> #6]
>> *-- So far, so good… however, now it's time for things to time out.*
>> 40082 [Thread-10-spout] INFO
>>  com.appnexus.bsg.billing.storminator.SampleSpout - Failed: [5, Content #5]
>> 40082 [Thread-10-spout] INFO
>>  com.appnexus.bsg.billing.storminator.SampleSpout - Failed: [4, Content #4]
>> 40082 [Thread-10-spout] INFO
>>  com.appnexus.bsg.billing.storminator.SampleSpout - Failed: [3, Content #3]
>> 40083 [Thread-10-spout] INFO
>>  com.appnexus.bsg.billing.storminator.SampleSpout - Failed: [2, Content #2]
>> 40083 [Thread-10-spout] INFO
>>  com.appnexus.bsg.billing.storminator.SampleSpout - Emitting: [7, Content
>> #5]
>> 40084 [Thread-10-spout] INFO
>>  com.appnexus.bsg.billing.storminator.SampleSpout - Emitting: [8, Content
>> #4]
>> 40084 [Thread-10-spout] INFO
>>  com.appnexus.bsg.billing.storminator.SampleSpout - Emitting: [9, Content
>> #3]
>> 40085 [Thread-10-spout] INFO
>>  com.appnexus.bsg.billing.storminator.SampleSpout - Emitting: [10, Content
>> #2]
>> 42088 [Thread-8-bolt1] INFO
>>  com.appnexus.bsg.billing.storminator.SamplePrintBolt - Done sleeping.
>> Acking: [2, Content #2]
>> *-- Acking a timed-out tuple… this does nothing.*
>> 42088 [Thread-8-bolt1] INFO
>>  com.appnexus.bsg.billing.storminator.SamplePrintBolt - I see: [3, Content
>> #3]. Will now sleep…
>> *-- Why is it looking at tuple #3?  This has already failed.*
>> 45084 [Thread-10-spout] INFO
>>  com.appnexus.bsg.billing.storminator.SampleSpout - Failed: [6, Content #6]
>> 45085 [Thread-10-spout] INFO
>>  com.appnexus.bsg.billing.storminator.SampleSpout - Emitting: [11, Content
>> #6]
>> 46089 [Thread-8-bolt1] INFO
>>  com.appnexus.bsg.billing.storminator.SamplePrintBolt - Done sleeping.
>> Acking: [3, Content #3]
>> 46089 [Thread-8-bolt1] INFO
>>  com.appnexus.bsg.billing.storminator.SamplePrintBolt - I see: [4, Content
>> #4]. Will now sleep...
>> 50084 [Thread-10-spout] INFO
>>  com.appnexus.bsg.billing.storminator.SampleSpout - Failed: [10, Content #2]
>> 50085 [Thread-10-spout] INFO
>>  com.appnexus.bsg.billing.storminator.SampleSpout - Failed: [7, Content #5]
>> 50085 [Thread-10-spout] INFO
>>  com.appnexus.bsg.billing.storminator.SampleSpout - Failed: [8, Content #4]
>> 50085 [Thread-10-spout] INFO
>>  com.appnexus.bsg.billing.storminator.SampleSpout - Failed: [9, Content #3]
>> 50085 [Thread-10-spout] INFO
>>  com.appnexus.bsg.billing.storminator.SampleSpout - Emitting: [12, Content
>> #2]
>> 50085 [Thread-10-spout] INFO
>>  com.appnexus.bsg.billing.storminator.SampleSpout - Emitting: [13, Content
>> #5]
>> 50085 [Thread-10-spout] INFO
>>  com.appnexus.bsg.billing.storminator.SampleSpout - Emitting: [14, Content
>> #4]
>> 50085 [Thread-10-spout] INFO
>>  com.appnexus.bsg.billing.storminator.SampleSpout - Emitting: [15, Content
>> #3]
>> *-- More timeouts**…*
>> 50090 [Thread-8-bolt1] INFO
>>  com.appnexus.bsg.billing.storminator.SamplePrintBolt - Done sleeping.
>> Acking: [4, Content #4]
>> 50090 [Thread-8-bolt1] INFO
>>  com.appnexus.bsg.billing.storminator.SamplePrintBolt - I see: [5, Content
>> #5]. Will now sleep...
>> 54091 [Thread-8-bolt1] INFO
>>  com.appnexus.bsg.billing.storminator.SamplePrintBolt - Done sleeping.
>> Acking: [5, Content #5]
>> *-- Yet the Bolt looks at tuple #5 which timed out 15 seconds ago…*
>> 54091 [Thread-8-bolt1] INFO
>>  com.appnexus.bsg.billing.storminator.SamplePrintBolt - I see: [6, Content
>> #6]. Will now sleep...
>> 55085 [Thread-10-spout] INFO
>>  com.appnexus.bsg.billing.storminator.SampleSpout - Failed: [11, Content #6]
>> 55085 [Thread-10-spout] INFO
>>  com.appnexus.bsg.billing.storminator.SampleSpout - Emitting: [16, Content
>> #6]
>> 58091 [Thread-8-bolt1] INFO
>>  com.appnexus.bsg.billing.storminator.SamplePrintBolt - Done sleeping.
>> Acking: [6, Content #6]
>> 58092 [Thread-8-bolt1] INFO
>>  com.appnexus.bsg.billing.storminator.SamplePrintBolt - I see: [7, Content
>> #5]. Will now sleep...
>> 60085 [Thread-10-spout] INFO
>>  com.appnexus.bsg.billing.storminator.SampleSpout - Failed: [15, Content #3]
>> 60086 [Thread-10-spout] INFO
>>  com.appnexus.bsg.billing.storminator.SampleSpout - Failed: [12, Content #2]
>> 60086 [Thread-10-spout] INFO
>>  com.appnexus.bsg.billing.storminator.SampleSpout - Failed: [13, Content #5]
>> 60086 [Thread-10-spout] INFO
>>  com.appnexus.bsg.billing.storminator.SampleSpout - Failed: [14, Content #4]
>> 60086 [Thread-10-spout] INFO
>>  com.appnexus.bsg.billing.storminator.SampleSpout - Emitting: [17, Content
>> #3]
>> 60086 [Thread-10-spout] INFO
>>  com.appnexus.bsg.billing.storminator.SampleSpout - Emitting: [18, Content
>> #2]
>> 60086 [Thread-10-spout] INFO
>>  com.appnexus.bsg.billing.storminator.SampleSpout - Emitting: [19, Content
>> #5]
>> 60086 [Thread-10-spout] INFO
>>  com.appnexus.bsg.billing.storminator.SampleSpout - Emitting: [20, Content
>> #4]
>> 62093 [Thread-8-bolt1] INFO
>>  com.appnexus.bsg.billing.storminator.SamplePrintBolt - Done sleeping.
>> Acking: [7, Content #5]
>> 62093 [Thread-8-bolt1] INFO
>>  com.appnexus.bsg.billing.storminator.SamplePrintBolt - I see: [8, Content
>> #4]. Will now sleep…
>> *-- It's clear that the Bolt looks at tuples even if they have
>> timed out.  Its queue will get longer and longer and tuples will always
>> time out.*
>> 65086 [Thread-10-spout] INFO
>>  com.appnexus.bsg.billing.storminator.SampleSpout - Failed: [16, Content #6]
>> 65087 [Thread-10-spout] INFO
>>  com.appnexus.bsg.billing.storminator.SampleSpout - Emitting: [21, Content
>> #6]
>> 66094 [Thread-8-bolt1] INFO
>>  com.appnexus.bsg.billing.storminator.SamplePrintBolt - Done sleeping.
>> Acking: [8, Content #4]
>> 66094 [Thread-8-bolt1] INFO
>>  com.appnexus.bsg.billing.storminator.SamplePrintBolt - I see: [9, Content
>> #3]. Will now sleep...
>> 70087 [Thread-10-spout] INFO
>>  com.appnexus.bsg.billing.storminator.SampleSpout - Failed: [20, Content #4]
>> 70087 [Thread-10-spout] INFO
>>  com.appnexus.bsg.billing.storminator.SampleSpout - Failed: [19, Content #5]
>> 70087 [Thread-10-spout] INFO
>>  com.appnexus.bsg.billing.storminator.SampleSpout - Failed: [18, Content #2]
>> 70088 [Thread-10-spout] INFO
>>  com.appnexus.bsg.billing.storminator.SampleSpout - Failed: [17, Content #3]
>> 70088 [Thread-10-spout] INFO
>>  com.appnexus.bsg.billing.storminator.SampleSpout - Emitting: [22, Content
>> #4]
>> 70088 [Thread-10-spout] INFO
>>  com.appnexus.bsg.billing.storminator.SampleSpout - Emitting: [23, Content
>> #5]
>> 70088 [Thread-10-spout] INFO
>>  com.appnexus.bsg.billing.storminator.SampleSpout - Emitting: [24, Content
>> #2]
>> 70088 [Thread-10-spout] INFO
>>  com.appnexus.bsg.billing.storminator.SampleSpout - Emitting: [25, Content
>> #3]
>> 70095 [Thread-8-bolt1] INFO
>>  com.appnexus.bsg.billing.storminator.SamplePrintBolt - Done sleeping.
>> Acking: [9, Content #3]
>> 70095 [Thread-8-bolt1] INFO
>>  com.appnexus.bsg.billing.storminator.SamplePrintBolt - I see: [10, Content
>> #2]. Will now sleep...
>> 74096 [Thread-8-bolt1] INFO
>>  com.appnexus.bsg.billing.storminator.SamplePrintBolt - Done sleeping.
>> Acking: [10, Content #2]
>> 74096 [Thread-8-bolt1] INFO
>>  com.appnexus.bsg.billing.storminator.SamplePrintBolt - I see: [11, Content
>> #6]. Will now sleep...
>> 75088 [Thread-10-spout] INFO
>>  com.appnexus.bsg.billing.storminator.SampleSpout - Failed: [21, Content #6]
>> 75088 [Thread-10-spout] INFO
>>  com.appnexus.bsg.billing.storminator.SampleSpout - Emitting: [26, Content
>> #6]
>> 78097 [Thread-8-bolt1] INFO
>>  com.appnexus.bsg.billing.storminator.SamplePrintBolt - Done sleeping.
>> Acking: [11, Content #6]
>> 78097 [Thread-8-bolt1] INFO
>>  com.appnexus.bsg.billing.storminator.SamplePrintBolt - I see: [12, Content
>> #2]. Will now sleep...
>> 80087 [Thread-10-spout] INFO
>>  com.appnexus.bsg.billing.storminator.SampleSpout - Failed: [25, Content #3]
>> 80087 [Thread-10-spout] INFO
>>  com.appnexus.bsg.billing.storminator.SampleSpout - Failed: [24, Content #2]
>> 80087 [Thread-10-spout] INFO
>>  com.appnexus.bsg.billing.storminator.SampleSpout - Failed: [23, Content #5]
>> 80087 [Thread-10-spout] INFO
>>  com.appnexus.bsg.billing.storminator.SampleSpout - Failed: [22, Content #4]
>> 80087 [Thread-10-spout] INFO
>>  com.appnexus.bsg.billing.storminator.SampleSpout - Emitting: [27, Content
>> #3]
>> 80087 [Thread-10-spout] INFO
>>  com.appnexus.bsg.billing.storminator.SampleSpout - Emitting: [28, Content
>> #2]
>> 80088 [Thread-10-spout] INFO
>>  com.appnexus.bsg.billing.storminator.SampleSpout - Emitting: [29, Content
>> #5]
>> 80088 [Thread-10-spout] INFO
>>  com.appnexus.bsg.billing.storminator.SampleSpout - Emitting: [30, Content
>> #4]
>> 82098 [Thread-8-bolt1] INFO
>>  com.appnexus.bsg.billing.storminator.SamplePrintBolt - Done sleeping.
>> Acking: [12, Content #2]
>> 82098 [Thread-8-bolt1] INFO
>>  com.appnexus.bsg.billing.storminator.SamplePrintBolt - I see: [13, Content
>> #5]. Will now sleep...
>> 85088 [Thread-10-spout] INFO
>>  com.appnexus.bsg.billing.storminator.SampleSpout - Failed: [26, Content #6]
>> 85088 [Thread-10-spout] INFO
>>  com.appnexus.bsg.billing.storminator.SampleSpout - Emitting: [31, Content
>> #6]
>> 86098 [Thread-8-bolt1] INFO
>>  com.appnexus.bsg.billing.storminator.SamplePrintBolt - Done sleeping.
>> Acking: [13, Content #5]
>> 86099 [Thread-8-bolt1] INFO
>>  com.appnexus.bsg.billing.storminator.SamplePrintBolt - I see: [14, Content
>> #4]. Will now sleep...
>> 90100 [Thread-8-bolt1] INFO
>>  com.appnexus.bsg.billing.storminator.SamplePrintBolt - Done sleeping.
>> Acking: [14, Content #4]
>> 90101 [Thread-8-bolt1] INFO
>>  com.appnexus.bsg.billing.storminator.SamplePrintBolt - I see: [15, Content
>> #3]. Will now sleep...
>> 90216 [Thread-10-spout] INFO
>>  com.appnexus.bsg.billing.storminator.SampleSpout - Failed: [29, Content #5]
>> 90216 [Thread-10-spout] INFO
>>  com.appnexus.bsg.billing.storminator.SampleSpout - Failed: [30, Content #4]
>> 90216 [Thread-10-spout] INFO
>>  com.appnexus.bsg.billing.storminator.SampleSpout - Failed: [28, Content #2]
>> 90217 [Thread-10-spout] INFO
>>  com.appnexus.bsg.billing.storminator.SampleSpout - Failed: [27, Content #3]
>> 90217 [Thread-10-spout] INFO
>>  com.appnexus.bsg.billing.storminator.SampleSpout - Emitting: [32, Content
>> #5]
>> 90217 [Thread-10-spout] INFO
>>  com.appnexus.bsg.billing.storminator.SampleSpout - Emitting: [33, Content
>> #4]
>> 90217 [Thread-10-spout] INFO
>>  com.appnexus.bsg.billing.storminator.SampleSpout - Emitting: [34, Content
>> #2]
>> 90217 [Thread-10-spout] INFO
>>  com.appnexus.bsg.billing.storminator.SampleSpout - Emitting: [35, Content
>> #3]
>> 94101 [Thread-8-bolt1] INFO
>>  com.appnexus.bsg.billing.storminator.SamplePrintBolt - Done sleeping.
>> Acking: [15, Content #3]
>> 94101 [Thread-8-bolt1] INFO
>>  com.appnexus.bsg.billing.storminator.SamplePrintBolt - I see: [16, Content
>> #6]. Will now sleep…
>>  *-- Problem gets exacerbated…  Bolt is now looking at tuples that
>> failed 30 seconds ago.*
>>
>
>
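
On question 1 in the quoted message (keeping bolts from processing already-timed-out tuples), one possible approach is for the spout to add an emit timestamp to each tuple and for the bolt to skip tuples older than the message timeout. This is a simpler technique than the global "kill" stream described in the thread, and it is only a sketch under stated assumptions: StaleTupleCheck is a hypothetical helper, not a Storm API, and it assumes the spout and bolt clocks agree. The sample times are taken from the quoted log.

```java
// Hypothetical helper (not a Storm API): decides whether a tuple is already
// older than the topology's message timeout and can be skipped by a bolt.
class StaleTupleCheck {
    private final long timeoutMillis;

    StaleTupleCheck(int messageTimeoutSecs) {
        this.timeoutMillis = messageTimeoutSecs * 1000L;
    }

    // emittedAtMillis is assumed to be a value the spout attaches at emit time.
    boolean isStale(long emittedAtMillis, long nowMillis) {
        return nowMillis - emittedAtMillis >= timeoutMillis;
    }

    public static void main(String[] args) {
        StaleTupleCheck check = new StaleTupleCheck(5);
        // [3, Content #3]: emitted at 30097, reached the bolt at 42088.
        System.out.println(check.isStale(30097L, 42088L));
        // [1, Content #1]: emitted at 30097, reached the bolt at 34086.
        System.out.println(check.isStale(30097L, 34086L));
    }
}
```

The check is only approximate: in the quoted log, tuples emitted at 30097 were failed at about 40082, roughly 10 seconds later rather than exactly 5, so a tuple can pass this age test and still be failed at the spout, or the reverse.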
