Hi Storm users

I am new to Storm and am trying to implement guaranteed delivery with
anchored emits and acks, but it is not working as I expected.

I implemented the bolt as follows:

class MetricsTestBolt(config: Config, topology: String) extends BaseRichBolt {
  private[this] val logger = LoggerFactory.getLogger(classOf[MetricsTestBolt])
  // Kept so that execute() can emit and ack through it.
  private[this] var collector: OutputCollector = _
  var throughput: Int = 1

  override def prepare(stormConf: util.Map[_, _], context: TopologyContext,
                       collector: OutputCollector): Unit = {
    this.collector = collector

    // Assign the field; a local `val throughput` here would shadow it and
    // leave the field at its default of 1.
    throughput = config.getConfig("topologies." + topology).getInt("rate-limit")
    logger.info("creating RateLimiter with the threshold: " + throughput)
  }

  override def declareOutputFields(declarer: OutputFieldsDeclarer): Unit = ()

  override def execute(input: Tuple): Unit = {
    // Integer division: the delay is 0 ms once throughput exceeds 1000.
    Thread.sleep(1000 / throughput)
    collector.emit(input, new Values()) // emit anchored to the input tuple
    collector.ack(input)
  }
}

Thread.sleep() simulates the bolt being backed up. MetricsTestBolt is
actually the final output stage, so it does not need to emit anything.

My question is about what the Storm UI shows:

[image: Inline image 2]

The spout has acked 1120 messages, but the bolt has acked only 40.

Do you have any idea how to block the spout when the bolt is backed up?

Thank you
Best, Jae
