Hi all. I'm hoping somebody can explain this behavior to me because it seems
pretty unexpected.
I'm seeing that timing out tuples does nothing except call "fail" on the Spout.
The tuple itself will still be processed through the Topology, except
acking/failing will have no effect. Another problem is that the number of
tuples actually in flight can exceed topology.max.spout.pending: timed-out
tuples no longer count as pending even though they still flow through the
topology. Unless I'm missing something, these two problems combined make
timing out tuples at best utterly pointless,
and at worst very problematic (as it will just throw more tuples into a
topology that is already maxed out).
Here's my topology:
- I have a spout. On nextTuple, it either re-emits a tuple that has failed
or, if none are present, creates a new tuple.
- I have a bolt that takes 4 seconds to ack a tuple.
- topology.max.spout.pending = 5
- topology.message.timeout.secs = 5
I would expect 1 or 2 tuples to get acked, and the other 3 or 4 to time out;
the Bolt would then process the resent tuples. Over time, more and more tuples
would be acked (though they would frequently time out).
What I'm seeing instead is that even though tuples have timed out, they are
still being processed by the Bolt. I'm assuming there is a buffer/queue for the
Bolt, and that timed-out tuples are not cleared from it. Regardless, this
leads to all tuples timing out, since the Bolt will eventually only process
tuples that have been timed out.
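To convince myself that this really snowballs, here is a toy model of what I think is happening. It is a sketch under my own assumptions, not Storm code: time advances in whole seconds, the timeout fires exactly at the configured value (in the real log it fires later than that), a single bolt executor works through a FIFO queue that is never purged, and the spout tops itself back up to max pending as soon as tuples fail. The names BacklogModel and run are made up for this example.

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;

// Toy model only: no Storm classes, whole-second ticks, one bolt executor.
class BacklogModel {
    int acked = 0;    // tuples acked while still pending at the spout
    int failed = 0;   // tuples the spout saw time out
    int backlog = 0;  // tuples still waiting in the bolt's queue at the end

    static BacklogModel run(int maxPending, int timeoutSecs, int processSecs, int totalSecs) {
        BacklogModel m = new BacklogModel();
        Map<Integer, Integer> pending = new HashMap<>(); // msgId -> emit time
        Deque<Integer> boltQueue = new ArrayDeque<>();   // never purged on timeout
        int nextId = 0;
        int current = -1;   // msgId the bolt is working on, -1 if idle
        int busyUntil = 0;

        for (int t = 0; t <= totalSecs; t++) {
            // 1. Bolt finishes its tuple; the ack only counts if still pending.
            if (current >= 0 && t >= busyUntil) {
                if (pending.remove(current) != null) {
                    m.acked++;
                }
                current = -1;
            }
            // 2. Spout fails every tuple that has been pending too long.
            Iterator<Map.Entry<Integer, Integer>> it = pending.entrySet().iterator();
            while (it.hasNext()) {
                if (t - it.next().getValue() >= timeoutSecs) {
                    it.remove();
                    m.failed++;
                }
            }
            // 3. Spout tops up to maxPending (replays and new tuples alike).
            while (pending.size() < maxPending) {
                pending.put(nextId, t);
                boltQueue.add(nextId++);
            }
            // 4. Idle bolt takes the oldest queued tuple, timed out or not.
            if (current < 0 && !boltQueue.isEmpty()) {
                current = boltQueue.poll();
                busyUntil = t + processSecs;
            }
        }
        m.backlog = boltQueue.size();
        return m;
    }

    public static void main(String[] args) {
        for (int secs : new int[] {20, 40, 60}) {
            BacklogModel m = run(5, 5, 4, secs);
            System.out.println(secs + "s: acked=" + m.acked
                    + " failed=" + m.failed + " backlog=" + m.backlog);
        }
    }
}
```

With my settings (5 pending, 5 s timeout, 4 s per tuple) the model's spout hands the bolt roughly one tuple per second while the bolt finishes one every four seconds, so the backlog keeps growing the longer the model runs.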
I'm assuming, and hoping, that I'm missing something obvious here…
Two questions:
1. Can I prevent Bolts from processing already-timed-out tuples?
2. What is the point of timing out tuples? It does nothing but call fail on
the Spout even though the tuple will still be processed by the rest of the
Topology!
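For question 1, the only workaround I can think of is to do the check myself. This is a sketch of an idea, not something Storm provides as far as I know: the spout would add its emit time as an extra field, and the bolt would skip any tuple older than the timeout. StaleTupleCheck, isStale and the timestamp field are hypothetical names; only the plain-Java comparison is shown here.

```java
// Hypothetical helper, not part of Storm. The spout is assumed to put
// System.currentTimeMillis() into each tuple as an extra field at emit time.
class StaleTupleCheck {

    // True if the tuple has been in flight for at least the message timeout.
    static boolean isStale(long emittedAtMillis, long nowMillis, int timeoutSecs) {
        return nowMillis - emittedAtMillis >= timeoutSecs * 1000L;
    }

    public static void main(String[] args) {
        long emittedAt = System.currentTimeMillis() - 6000; // emitted 6 s ago
        System.out.println(isStale(emittedAt, System.currentTimeMillis(), 5));
    }
}
```

In execute() the bolt would return early when isStale(...) is true instead of sleeping for 4 seconds. This only works if the spout and bolt clocks are reasonably in sync, and it does not remove anything from the bolt's queue; it just makes the stale entries cheap to skip.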
Thanks,
-Sam
Spout:
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Queue;
import java.util.concurrent.LinkedBlockingQueue;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

// Storm packages are backtype.storm.* before 1.0 and org.apache.storm.* from 1.0 on.
import backtype.storm.spout.SpoutOutputCollector;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.base.BaseRichSpout;
import backtype.storm.tuple.Fields;
import backtype.storm.utils.Utils;

public class SampleSpout extends BaseRichSpout {
    private static final Logger logger = LoggerFactory.getLogger(SampleSpout.class);
    SpoutOutputCollector collector;
    Map<Integer, List<Object>> pending_map = new HashMap<Integer, List<Object>>();
    Queue<List<Object>> replay_queue = new LinkedBlockingQueue<List<Object>>();
    int contentCounter;
    int curMsgId;

    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        // msg-id increments on every emit, including replays.
        // content stays the same when a failed tuple is replayed.
        declarer.declare(new Fields("msg-id", "content"));
    }

    @Override
    public void open(Map conf, TopologyContext context, SpoutOutputCollector spoutOutputCollector) {
        collector = spoutOutputCollector;
    }

    @Override
    public void nextTuple() {
        // either replay a failed tuple, or create a new one
        List<Object> tuple = null;
        if (!replay_queue.isEmpty()) {
            tuple = replay_queue.poll();
        } else {
            tuple = new ArrayList<Object>();
            tuple.add(null); // placeholder for msg-id, set below
            tuple.add("Content #" + contentCounter++);
        }
        // increment msgId and set it as the first item in the tuple
        int msgId = this.curMsgId++;
        tuple.set(0, msgId);
        logger.info("Emitting: " + tuple);
        // add this tuple to the 'pending' map, and emit it.
        pending_map.put(msgId, tuple);
        collector.emit(tuple, msgId);
        Utils.sleep(100);
    }

    @Override
    public void ack(Object msgId) {
        // remove tuple from pending_map since it's no longer pending
        List<Object> acked_tuple = pending_map.remove(msgId);
        logger.info("Acked: " + acked_tuple);
    }

    @Override
    public void fail(Object msgId) {
        // remove tuple from pending_map since it's no longer pending
        List<Object> failed_tuple = pending_map.remove(msgId);
        logger.info("Failed: " + failed_tuple);
        if (failed_tuple == null) {
            return; // unknown msgId, nothing to replay
        }
        // put a copy into the replay queue
        ArrayList<Object> copy = new ArrayList<Object>(failed_tuple);
        replay_queue.add(copy);
    }
}
Bolt:
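import java.util.Map;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

// Storm packages are backtype.storm.* before 1.0 and org.apache.storm.* from 1.0 on.
import backtype.storm.task.OutputCollector;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.base.BaseRichBolt;
import backtype.storm.tuple.Tuple;
import backtype.storm.utils.Utils;

public class SamplePrintBolt extends BaseRichBolt {
    private static final Logger logger = LoggerFactory.getLogger(SamplePrintBolt.class);
    OutputCollector collector;

    @Override
    public void prepare(Map stormConf, TopologyContext context, OutputCollector outputCollector) {
        collector = outputCollector;
    }

    @Override
    public void execute(Tuple input) {
        logger.info("I see: " + input.getValues() + ". Will now sleep...");
        Utils.sleep(4000); // simulate 4 seconds of work per tuple
        logger.info("Done sleeping. Acking: " + input.getValues());
        collector.ack(input);
    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        // doesn't emit
    }
}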
Main:
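// Storm packages are backtype.storm.* before 1.0 and org.apache.storm.* from 1.0 on.
import backtype.storm.Config;
import backtype.storm.LocalCluster;
import backtype.storm.topology.TopologyBuilder;

// The enclosing class name is arbitrary.
public class SampleTopology {
    public static void main(String[] args) throws Exception {
        Config conf = new Config();
        conf.setMaxSpoutPending(5);     // topology.max.spout.pending
        conf.setMessageTimeoutSecs(5);  // topology.message.timeout.secs

        TopologyBuilder builder = new TopologyBuilder();
        builder.setSpout("spout", new SampleSpout());
        builder.setBolt("bolt1", new SamplePrintBolt()).shuffleGrouping("spout");

        LocalCluster cluster = new LocalCluster();
        cluster.submitTopology("local", conf, builder.createTopology());
    }
}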
Output:
30084 [Thread-10-spout] INFO com.appnexus.bsg.billing.storminator.SampleSpout
- Emitting: [0, Content #0]
30085 [Thread-8-bolt1] INFO
com.appnexus.bsg.billing.storminator.SamplePrintBolt - I see: [0, Content #0].
Will now sleep...
30097 [Thread-10-spout] INFO com.appnexus.bsg.billing.storminator.SampleSpout
- Emitting: [1, Content #1]
30097 [Thread-10-spout] INFO com.appnexus.bsg.billing.storminator.SampleSpout
- Emitting: [2, Content #2]
30097 [Thread-10-spout] INFO com.appnexus.bsg.billing.storminator.SampleSpout
- Emitting: [3, Content #3]
30097 [Thread-10-spout] INFO com.appnexus.bsg.billing.storminator.SampleSpout
- Emitting: [4, Content #4]
34086 [Thread-8-bolt1] INFO
com.appnexus.bsg.billing.storminator.SamplePrintBolt - Done sleeping. Acking:
[0, Content #0]
34086 [Thread-8-bolt1] INFO
com.appnexus.bsg.billing.storminator.SamplePrintBolt - I see: [1, Content #1].
Will now sleep...
34087 [Thread-10-spout] INFO com.appnexus.bsg.billing.storminator.SampleSpout
- Acked: [0, Content #0]
34087 [Thread-10-spout] INFO com.appnexus.bsg.billing.storminator.SampleSpout
- Emitting: [5, Content #5]
38087 [Thread-8-bolt1] INFO
com.appnexus.bsg.billing.storminator.SamplePrintBolt - Done sleeping. Acking:
[1, Content #1]
38087 [Thread-8-bolt1] INFO
com.appnexus.bsg.billing.storminator.SamplePrintBolt - I see: [2, Content #2].
Will now sleep...
38089 [Thread-10-spout] INFO com.appnexus.bsg.billing.storminator.SampleSpout
- Acked: [1, Content #1]
38089 [Thread-10-spout] INFO com.appnexus.bsg.billing.storminator.SampleSpout
- Emitting: [6, Content #6]
-- So far, so good… however, now it's time for things to time out.
40082 [Thread-10-spout] INFO com.appnexus.bsg.billing.storminator.SampleSpout
- Failed: [5, Content #5]
40082 [Thread-10-spout] INFO com.appnexus.bsg.billing.storminator.SampleSpout
- Failed: [4, Content #4]
40082 [Thread-10-spout] INFO com.appnexus.bsg.billing.storminator.SampleSpout
- Failed: [3, Content #3]
40083 [Thread-10-spout] INFO com.appnexus.bsg.billing.storminator.SampleSpout
- Failed: [2, Content #2]
40083 [Thread-10-spout] INFO com.appnexus.bsg.billing.storminator.SampleSpout
- Emitting: [7, Content #5]
40084 [Thread-10-spout] INFO com.appnexus.bsg.billing.storminator.SampleSpout
- Emitting: [8, Content #4]
40084 [Thread-10-spout] INFO com.appnexus.bsg.billing.storminator.SampleSpout
- Emitting: [9, Content #3]
40085 [Thread-10-spout] INFO com.appnexus.bsg.billing.storminator.SampleSpout
- Emitting: [10, Content #2]
42088 [Thread-8-bolt1] INFO
com.appnexus.bsg.billing.storminator.SamplePrintBolt - Done sleeping. Acking:
[2, Content #2]
-- Acking a timed-out tuple… this does nothing.
42088 [Thread-8-bolt1] INFO
com.appnexus.bsg.billing.storminator.SamplePrintBolt - I see: [3, Content #3].
Will now sleep…
-- Why is it looking at tuple #3? This has already failed.
45084 [Thread-10-spout] INFO com.appnexus.bsg.billing.storminator.SampleSpout
- Failed: [6, Content #6]
45085 [Thread-10-spout] INFO com.appnexus.bsg.billing.storminator.SampleSpout
- Emitting: [11, Content #6]
46089 [Thread-8-bolt1] INFO
com.appnexus.bsg.billing.storminator.SamplePrintBolt - Done sleeping. Acking:
[3, Content #3]
46089 [Thread-8-bolt1] INFO
com.appnexus.bsg.billing.storminator.SamplePrintBolt - I see: [4, Content #4].
Will now sleep...
50084 [Thread-10-spout] INFO com.appnexus.bsg.billing.storminator.SampleSpout
- Failed: [10, Content #2]
50085 [Thread-10-spout] INFO com.appnexus.bsg.billing.storminator.SampleSpout
- Failed: [7, Content #5]
50085 [Thread-10-spout] INFO com.appnexus.bsg.billing.storminator.SampleSpout
- Failed: [8, Content #4]
50085 [Thread-10-spout] INFO com.appnexus.bsg.billing.storminator.SampleSpout
- Failed: [9, Content #3]
50085 [Thread-10-spout] INFO com.appnexus.bsg.billing.storminator.SampleSpout
- Emitting: [12, Content #2]
50085 [Thread-10-spout] INFO com.appnexus.bsg.billing.storminator.SampleSpout
- Emitting: [13, Content #5]
50085 [Thread-10-spout] INFO com.appnexus.bsg.billing.storminator.SampleSpout
- Emitting: [14, Content #4]
50085 [Thread-10-spout] INFO com.appnexus.bsg.billing.storminator.SampleSpout
- Emitting: [15, Content #3]
-- More timeouts…
50090 [Thread-8-bolt1] INFO
com.appnexus.bsg.billing.storminator.SamplePrintBolt - Done sleeping. Acking:
[4, Content #4]
50090 [Thread-8-bolt1] INFO
com.appnexus.bsg.billing.storminator.SamplePrintBolt - I see: [5, Content #5].
Will now sleep...
54091 [Thread-8-bolt1] INFO
com.appnexus.bsg.billing.storminator.SamplePrintBolt - Done sleeping. Acking:
[5, Content #5]
-- Yet the Bolt processed tuple #5, which had already timed out about 10 seconds before the Bolt even picked it up…
54091 [Thread-8-bolt1] INFO
com.appnexus.bsg.billing.storminator.SamplePrintBolt - I see: [6, Content #6].
Will now sleep...
55085 [Thread-10-spout] INFO com.appnexus.bsg.billing.storminator.SampleSpout
- Failed: [11, Content #6]
55085 [Thread-10-spout] INFO com.appnexus.bsg.billing.storminator.SampleSpout
- Emitting: [16, Content #6]
58091 [Thread-8-bolt1] INFO
com.appnexus.bsg.billing.storminator.SamplePrintBolt - Done sleeping. Acking:
[6, Content #6]
58092 [Thread-8-bolt1] INFO
com.appnexus.bsg.billing.storminator.SamplePrintBolt - I see: [7, Content #5].
Will now sleep...
60085 [Thread-10-spout] INFO com.appnexus.bsg.billing.storminator.SampleSpout
- Failed: [15, Content #3]
60086 [Thread-10-spout] INFO com.appnexus.bsg.billing.storminator.SampleSpout
- Failed: [12, Content #2]
60086 [Thread-10-spout] INFO com.appnexus.bsg.billing.storminator.SampleSpout
- Failed: [13, Content #5]
60086 [Thread-10-spout] INFO com.appnexus.bsg.billing.storminator.SampleSpout
- Failed: [14, Content #4]
60086 [Thread-10-spout] INFO com.appnexus.bsg.billing.storminator.SampleSpout
- Emitting: [17, Content #3]
60086 [Thread-10-spout] INFO com.appnexus.bsg.billing.storminator.SampleSpout
- Emitting: [18, Content #2]
60086 [Thread-10-spout] INFO com.appnexus.bsg.billing.storminator.SampleSpout
- Emitting: [19, Content #5]
60086 [Thread-10-spout] INFO com.appnexus.bsg.billing.storminator.SampleSpout
- Emitting: [20, Content #4]
62093 [Thread-8-bolt1] INFO
com.appnexus.bsg.billing.storminator.SamplePrintBolt - Done sleeping. Acking:
[7, Content #5]
62093 [Thread-8-bolt1] INFO
com.appnexus.bsg.billing.storminator.SamplePrintBolt - I see: [8, Content #4].
Will now sleep…
-- It's clear that the Bolt looks at tuples even if they have timed out. Its
queue will get longer and longer, and tuples will always time out.
65086 [Thread-10-spout] INFO com.appnexus.bsg.billing.storminator.SampleSpout
- Failed: [16, Content #6]
65087 [Thread-10-spout] INFO com.appnexus.bsg.billing.storminator.SampleSpout
- Emitting: [21, Content #6]
66094 [Thread-8-bolt1] INFO
com.appnexus.bsg.billing.storminator.SamplePrintBolt - Done sleeping. Acking:
[8, Content #4]
66094 [Thread-8-bolt1] INFO
com.appnexus.bsg.billing.storminator.SamplePrintBolt - I see: [9, Content #3].
Will now sleep...
70087 [Thread-10-spout] INFO com.appnexus.bsg.billing.storminator.SampleSpout
- Failed: [20, Content #4]
70087 [Thread-10-spout] INFO com.appnexus.bsg.billing.storminator.SampleSpout
- Failed: [19, Content #5]
70087 [Thread-10-spout] INFO com.appnexus.bsg.billing.storminator.SampleSpout
- Failed: [18, Content #2]
70088 [Thread-10-spout] INFO com.appnexus.bsg.billing.storminator.SampleSpout
- Failed: [17, Content #3]
70088 [Thread-10-spout] INFO com.appnexus.bsg.billing.storminator.SampleSpout
- Emitting: [22, Content #4]
70088 [Thread-10-spout] INFO com.appnexus.bsg.billing.storminator.SampleSpout
- Emitting: [23, Content #5]
70088 [Thread-10-spout] INFO com.appnexus.bsg.billing.storminator.SampleSpout
- Emitting: [24, Content #2]
70088 [Thread-10-spout] INFO com.appnexus.bsg.billing.storminator.SampleSpout
- Emitting: [25, Content #3]
70095 [Thread-8-bolt1] INFO
com.appnexus.bsg.billing.storminator.SamplePrintBolt - Done sleeping. Acking:
[9, Content #3]
70095 [Thread-8-bolt1] INFO
com.appnexus.bsg.billing.storminator.SamplePrintBolt - I see: [10, Content #2].
Will now sleep...
74096 [Thread-8-bolt1] INFO
com.appnexus.bsg.billing.storminator.SamplePrintBolt - Done sleeping. Acking:
[10, Content #2]
74096 [Thread-8-bolt1] INFO
com.appnexus.bsg.billing.storminator.SamplePrintBolt - I see: [11, Content #6].
Will now sleep...
75088 [Thread-10-spout] INFO com.appnexus.bsg.billing.storminator.SampleSpout
- Failed: [21, Content #6]
75088 [Thread-10-spout] INFO com.appnexus.bsg.billing.storminator.SampleSpout
- Emitting: [26, Content #6]
78097 [Thread-8-bolt1] INFO
com.appnexus.bsg.billing.storminator.SamplePrintBolt - Done sleeping. Acking:
[11, Content #6]
78097 [Thread-8-bolt1] INFO
com.appnexus.bsg.billing.storminator.SamplePrintBolt - I see: [12, Content #2].
Will now sleep...
80087 [Thread-10-spout] INFO com.appnexus.bsg.billing.storminator.SampleSpout
- Failed: [25, Content #3]
80087 [Thread-10-spout] INFO com.appnexus.bsg.billing.storminator.SampleSpout
- Failed: [24, Content #2]
80087 [Thread-10-spout] INFO com.appnexus.bsg.billing.storminator.SampleSpout
- Failed: [23, Content #5]
80087 [Thread-10-spout] INFO com.appnexus.bsg.billing.storminator.SampleSpout
- Failed: [22, Content #4]
80087 [Thread-10-spout] INFO com.appnexus.bsg.billing.storminator.SampleSpout
- Emitting: [27, Content #3]
80087 [Thread-10-spout] INFO com.appnexus.bsg.billing.storminator.SampleSpout
- Emitting: [28, Content #2]
80088 [Thread-10-spout] INFO com.appnexus.bsg.billing.storminator.SampleSpout
- Emitting: [29, Content #5]
80088 [Thread-10-spout] INFO com.appnexus.bsg.billing.storminator.SampleSpout
- Emitting: [30, Content #4]
82098 [Thread-8-bolt1] INFO
com.appnexus.bsg.billing.storminator.SamplePrintBolt - Done sleeping. Acking:
[12, Content #2]
82098 [Thread-8-bolt1] INFO
com.appnexus.bsg.billing.storminator.SamplePrintBolt - I see: [13, Content #5].
Will now sleep...
85088 [Thread-10-spout] INFO com.appnexus.bsg.billing.storminator.SampleSpout
- Failed: [26, Content #6]
85088 [Thread-10-spout] INFO com.appnexus.bsg.billing.storminator.SampleSpout
- Emitting: [31, Content #6]
86098 [Thread-8-bolt1] INFO
com.appnexus.bsg.billing.storminator.SamplePrintBolt - Done sleeping. Acking:
[13, Content #5]
86099 [Thread-8-bolt1] INFO
com.appnexus.bsg.billing.storminator.SamplePrintBolt - I see: [14, Content #4].
Will now sleep...
90100 [Thread-8-bolt1] INFO
com.appnexus.bsg.billing.storminator.SamplePrintBolt - Done sleeping. Acking:
[14, Content #4]
90101 [Thread-8-bolt1] INFO
com.appnexus.bsg.billing.storminator.SamplePrintBolt - I see: [15, Content #3].
Will now sleep...
90216 [Thread-10-spout] INFO com.appnexus.bsg.billing.storminator.SampleSpout
- Failed: [29, Content #5]
90216 [Thread-10-spout] INFO com.appnexus.bsg.billing.storminator.SampleSpout
- Failed: [30, Content #4]
90216 [Thread-10-spout] INFO com.appnexus.bsg.billing.storminator.SampleSpout
- Failed: [28, Content #2]
90217 [Thread-10-spout] INFO com.appnexus.bsg.billing.storminator.SampleSpout
- Failed: [27, Content #3]
90217 [Thread-10-spout] INFO com.appnexus.bsg.billing.storminator.SampleSpout
- Emitting: [32, Content #5]
90217 [Thread-10-spout] INFO com.appnexus.bsg.billing.storminator.SampleSpout
- Emitting: [33, Content #4]
90217 [Thread-10-spout] INFO com.appnexus.bsg.billing.storminator.SampleSpout
- Emitting: [34, Content #2]
90217 [Thread-10-spout] INFO com.appnexus.bsg.billing.storminator.SampleSpout
- Emitting: [35, Content #3]
94101 [Thread-8-bolt1] INFO
com.appnexus.bsg.billing.storminator.SamplePrintBolt - Done sleeping. Acking:
[15, Content #3]
94101 [Thread-8-bolt1] INFO
com.appnexus.bsg.billing.storminator.SamplePrintBolt - I see: [16, Content #6].
Will now sleep…
-- Problem gets exacerbated… Bolt is now looking at tuples that failed about 30
seconds ago.