Recently I have been facing a scenario in which the topology simply hangs.
In the stack trace, I can see that the KafkaSpout executor thread is stuck in
metrics_tick.invoke because it cannot find a free slot in the disruptor
queue. The acker task is blocked on the disruptor queue as well. I noticed
that Nathan had fixed a deadlock scenario by introducing an overflow buffer,
but this overflow buffer is not used when metrics are emitted. Could that
not result in the same deadlock?
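To make the concern concrete, here is a minimal Java sketch. It assumes a plain java.util.concurrent bounded queue as a stand-in for the disruptor queue. `OverflowSender`, `sendBlocking`, `sendWithOverflow` and `flushOverflow` are hypothetical names, not Storm's API. The sketch only contrasts a send that parks when the queue is full, which is roughly what the metrics path in the trace appears to do, with one that spills into a thread-local overflow buffer and retries later.

```java
import java.util.ArrayDeque;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;

/**
 * Hypothetical sketch, not Storm code: a sender that never parks its own thread.
 * When the bounded transfer queue is full, items go to a local overflow buffer
 * and are retried on the next send or flush.
 */
public class OverflowSender<T> {
    private final BlockingQueue<T> transferQueue;              // stand-in for the bounded disruptor queue
    private final ArrayDeque<T> overflow = new ArrayDeque<>(); // unbounded, touched only by the owning thread

    public OverflowSender(BlockingQueue<T> transferQueue) {
        this.transferQueue = transferQueue;
    }

    /** Blocking-style send. A timeout is used only so the demo terminates; a plain publish would wait indefinitely. */
    public boolean sendBlocking(T item, long timeoutMs) throws InterruptedException {
        return transferQueue.offer(item, timeoutMs, TimeUnit.MILLISECONDS);
    }

    /** Non-blocking send: flush the overflow first so ordering is preserved, then try the queue. */
    public void sendWithOverflow(T item) {
        flushOverflow();
        // If older items are still buffered, do not jump ahead of them.
        if (!overflow.isEmpty() || !transferQueue.offer(item)) {
            overflow.addLast(item);
        }
    }

    /** Moves as many buffered items as currently fit into the transfer queue. */
    public void flushOverflow() {
        while (!overflow.isEmpty() && transferQueue.offer(overflow.peekFirst())) {
            overflow.pollFirst();
        }
    }

    public int overflowSize() {
        return overflow.size();
    }

    public static void main(String[] args) throws InterruptedException {
        BlockingQueue<String> q = new ArrayBlockingQueue<>(2);
        OverflowSender<String> sender = new OverflowSender<>(q);
        sender.sendWithOverflow("tuple-1");
        sender.sendWithOverflow("tuple-2");
        // The queue is full, so the metrics tuple is buffered instead of parking the thread.
        sender.sendWithOverflow("metrics-tick");
        System.out.println("overflow=" + sender.overflowSize());                         // overflow=1
        // The blocking path can only give up via its timeout while nothing drains the queue.
        System.out.println("blocking ok=" + sender.sendBlocking("metrics-tick-2", 50)); // blocking ok=false
        q.take();              // a downstream consumer frees one slot
        sender.flushOverflow();
        System.out.println("overflow=" + sender.overflowSize() + " queue=" + q); // overflow=0 queue=[tuple-2, metrics-tick]
    }
}
```

The overflow buffer trades bounded memory for liveness. The owning thread keeps draining its own input even when downstream is saturated, which is the property the metrics path would lose if it publishes with a blocking call.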
Here is the stack trace of the KafkaSpout thread:
"Thread-109-KafkaSpout" #145 prio=5 os_prio=0 tid=0x00007f7f20aae800 nid=0x3c81 runnable [0x00007f7cceae9000]
   java.lang.Thread.State: TIMED_WAITING (parking)
	at sun.misc.Unsafe.park(Native Method)
	at java.util.concurrent.locks.LockSupport.parkNanos(LockSupport.java:338)
	at com.lmax.disruptor.SingleThreadedClaimStrategy.waitForFreeSlotAt(SingleThreadedClaimStrategy.java:129)
	at com.lmax.disruptor.SingleThreadedClaimStrategy.incrementAndGet(SingleThreadedClaimStrategy.java:81)
	at com.lmax.disruptor.Sequencer.next(Sequencer.java:127)
	at backtype.storm.utils.DisruptorQueue.publishDirect(DisruptorQueue.java:174)
	at backtype.storm.utils.DisruptorQueue.publish(DisruptorQueue.java:167)
	at backtype.storm.disruptor$publish.invoke(disruptor.clj:66)
	at backtype.storm.daemon.executor$mk_executor_transfer_fn$this__6431.invoke(executor.clj:190)
	at backtype.storm.daemon.executor$mk_executor_transfer_fn$this__6431.invoke(executor.clj:197)
	at backtype.storm.daemon.task$send_unanchored.invoke(task.clj:112)
	at backtype.storm.daemon.task$send_unanchored.invoke(task.clj:117)
	at backtype.storm.daemon.executor$metrics_tick.invoke(executor.clj:296)
	at backtype.storm.daemon.executor$fn__6579$tuple_action_fn__6585.invoke(executor.clj:438)
	at backtype.storm.daemon.executor$mk_task_receiver$fn__6570.invoke(executor.clj:404)
	at backtype.storm.disruptor$clojure_handler$reify__1605.onEvent(disruptor.clj:58)
	at backtype.storm.utils.DisruptorQueue.consumeBatchToCursor(DisruptorQueue.java:125)
	at backtype.storm.utils.DisruptorQueue.consumeBatch(DisruptorQueue.java:87)
	at backtype.storm.disruptor$consume_batch.invoke(disruptor.clj:76)
	at backtype.storm.daemon.executor$fn__6579$fn__6594$fn__6623.invoke(executor.clj:542)
	at backtype.storm.util$async_loop$fn__459.invoke(util.clj:463)
	at clojure.lang.AFn.run(AFn.java:24)
	at java.lang.Thread.run(Thread.java:745)
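The top frames show the publisher parked in `SingleThreadedClaimStrategy.waitForFreeSlotAt`. Roughly, a single producer may claim the next slot only once the slowest consumer has moved past the entry that the claim would overwrite; until then it parks. The following is a simplified model of that wrap-point check, written for illustration only and loosely based on how the Disruptor 2.x claim strategy is described. `RingClaimSketch`, `claim` and `consumerSequence` are hypothetical names. Unlike the real publisher, this sketch gives up after a timeout so the demo terminates.

```java
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.locks.LockSupport;

/**
 * Hypothetical, simplified model of single-producer slot claiming on a ring buffer.
 * Not the LMAX source: it only shows why a publisher parks while the consumer
 * has not yet freed the slot it needs.
 */
public class RingClaimSketch {
    private final int capacity;
    private long nextSequence = -1;                                // last sequence claimed by the producer
    public final AtomicLong consumerSequence = new AtomicLong(-1); // last sequence processed by the consumer

    public RingClaimSketch(int capacity) {
        this.capacity = capacity;
    }

    /** True if claiming one more slot would not overwrite an entry the consumer has not read yet. */
    public boolean hasFreeSlot() {
        long wrapPoint = (nextSequence + 1) - capacity;
        return wrapPoint <= consumerSequence.get();
    }

    /** Claims the next sequence, parking in 1 ms steps; returns -1 if the deadline passes first. */
    public long claim(long timeoutNanos) {
        long deadline = System.nanoTime() + timeoutNanos;
        while (!hasFreeSlot()) {
            if (System.nanoTime() >= deadline) {
                return -1; // the real publisher has no deadline and would keep parking here
            }
            LockSupport.parkNanos(1_000_000L);
        }
        return ++nextSequence;
    }

    public static void main(String[] args) {
        RingClaimSketch ring = new RingClaimSketch(2);
        System.out.println(ring.claim(0));           // 0
        System.out.println(ring.claim(0));           // 1
        // The ring is full and the consumer has processed nothing, so this claim cannot proceed.
        System.out.println(ring.claim(10_000_000L)); // -1
        ring.consumerSequence.set(0);                // the consumer finishes sequence 0
        System.out.println(ring.claim(0));           // 2
    }
}
```

In this model, the producer stays parked for as long as nothing downstream advances its sequence. So if the threads that should free the slot are themselves waiting on queues that this thread is supposed to drain, none of them makes progress.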
--
Regards,
Abhishek Agarwal