Hi Storm users
I am a newbie for Storm and I am implement guaranteed delivery with
anchoring emit and ack but it's not working as I expected.
I implemented the Bolt as the following:
class MetricsTestBolt(config: Config, topology: String) extends
BaseRichBolt {
private[this] val logger = LoggerFactory.getLogger(classOf[MetricsTestBolt])
var throughput: Int = 1
override def prepare(stormConf: util.Map[_, _], context: TopologyContext,
collector: OutputCollector): Unit = {
super.prepare(stormConf, context, collector)
val throughput =
config.getConfig("topologies."+topology).getInt("rate-limit")
logger.info("creating RateLimiter with the threshold: " + throughput)
}
override def declareOutputFields(declarer: OutputFieldsDeclarer): Unit = ()
override def execute(input: Tuple): Unit = {
Thread.sleep(1000 / throughput)
emit(input, new Values())
ack(input)
}
}
Thread.sleep() is for simulating the bolt being backed up. Actually
MetricsTestBolt is the final output stage and it does not have to emit
anything.
My question is the following Storm UI:
[image: Inline image 2]
Spout is acking 1120 messages but Bolts acked only 40 messages.
Do you have any idea how to block Spout when Bolt is backed up?
Thank you
Best, Jae