Storm applies a 30-second timeout (the default TOPOLOGY_MESSAGE_TIMEOUT_SECS) to every tuple emitted by the spout. If your bolt takes around 1 second per tuple, then whenever more than 30 tuples are in flight, the ones at the back of the queue will time out and be replayed from the spout. I would try setting TOPOLOGY_MAX_SPOUT_PENDING to 20 and see how your numbers look.
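A minimal sketch of what that looks like in code, assuming Storm's pre-1.0 `backtype.storm` package layout (newer releases use `org.apache.storm` instead) and that you apply the config where the topology is submitted:

```scala
import backtype.storm.Config

val conf = new Config()
// Cap the number of un-acked tuples the spout may have in flight at once.
conf.setMaxSpoutPending(20)
// The per-tuple timeout discussed above; 30 seconds is the default.
conf.setMessageTimeoutSecs(30)
```

With the cap in place, a backed-up bolt causes the spout to stall instead of flooding the topology with tuples that are doomed to time out.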
On 17 April 2015 at 17:44, Bae, Jae Hyeon <[email protected]> wrote:

> Hi Carlos
>
> Thanks for your answer, but I am concerned the pending property is not the
> answer, because the Storm UI shows that the number of spout-acked messages
> is greater than the bolt's.
>
> I need to study Storm more, but I am wondering what I am doing wrong here.
> I am using KafkaSpout.
>
> On Fri, Apr 17, 2015 at 2:22 AM, Carlos Perelló Marín
> <[email protected]> wrote:
>
>> Hi Jae,
>>
>> I think the setting you want is config.TOPOLOGY_MAX_SPOUT_PENDING. Once
>> you set it to a value, that value caps the number of messages emitted by
>> your spout that are still being processed in your topology.
>>
>> So if you set it to 100 in your previous scenario, the last bolt in your
>> stream may have 40 messages acked while the spout shows around 140
>> messages emitted: 40 fully processed and 100 split between all your
>> bolts, pending full processing. At least that's the theory as I
>> understand it...
>>
>> On 17 April 2015 at 07:51, Bae, Jae Hyeon <[email protected]> wrote:
>>
>>> Hi Storm users
>>>
>>> I am a newbie to Storm and I am implementing guaranteed delivery with
>>> anchored emit and ack, but it's not working as I expected.
>>>
>>> I implemented the Bolt as follows:
>>>
>>> class MetricsTestBolt(config: Config, topology: String) extends BaseRichBolt {
>>>   private[this] val logger =
>>>     LoggerFactory.getLogger(classOf[MetricsTestBolt])
>>>   private var collector: OutputCollector = _
>>>   private var throughput: Int = 1
>>>
>>>   override def prepare(stormConf: util.Map[_, _], context: TopologyContext,
>>>                        collector: OutputCollector): Unit = {
>>>     this.collector = collector
>>>     // Assign the field rather than shadowing it with a local val,
>>>     // otherwise execute() always sees the default of 1.
>>>     throughput =
>>>       config.getConfig("topologies." + topology).getInt("rate-limit")
>>>     logger.info("creating RateLimiter with the threshold: " + throughput)
>>>   }
>>>
>>>   override def declareOutputFields(declarer: OutputFieldsDeclarer): Unit = ()
>>>
>>>   override def execute(input: Tuple): Unit = {
>>>     Thread.sleep(1000 / throughput)     // simulate the bolt being backed up
>>>     collector.emit(input, new Values()) // anchored emit
>>>     collector.ack(input)
>>>   }
>>> }
>>>
>>> The Thread.sleep() is there to simulate the bolt being backed up.
>>> MetricsTestBolt is actually the final output stage, so it does not have
>>> to emit anything.
>>>
>>> My question is about the following Storm UI:
>>>
>>> [image: Inline image 2]
>>>
>>> The Spout has acked 1120 messages, but the Bolt has acked only 40.
>>>
>>> Do you have any idea how to block the Spout when the Bolt is backed up?
>>>
>>> Thank you
>>> Best, Jae
>>
>> --
>> Carlos Perelló Marín
>> https://www.serverdensity.com
>

--
Carlos Perelló Marín
https://www.serverdensity.com
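The timeout arithmetic behind the replay explanation at the top of the thread can be sketched without Storm at all. This is a toy model, assuming the 1-tuple-per-second bolt rate mentioned in the thread, the default 30 s tuple timeout, and a hypothetical max-spout-pending of 100:

```scala
// Back-of-envelope model: with a bolt draining 1 tuple per second,
// the Nth tuple in flight waits roughly N seconds before it is processed.
object TimeoutSketch extends App {
  val timeoutSecs      = 30  // Storm's default TOPOLOGY_MESSAGE_TIMEOUT_SECS
  val boltTuplesPerSec = 1   // throughput observed in the thread
  val inFlight         = 100 // hypothetical TOPOLOGY_MAX_SPOUT_PENDING

  // Approximate queueing delay of each in-flight tuple.
  val waitSecs = (1 to inFlight).map(_.toDouble / boltTuplesPerSec)
  val replayed = waitSecs.count(_ > timeoutSecs)

  println(s"$replayed of $inFlight in-flight tuples exceed the $timeoutSecs s timeout")
  // Those tuples time out and are replayed by the spout, which inflates the
  // spout-side emit/ack counters relative to the bolt's ack counter.
}
```

In this model 70 of the 100 in-flight tuples blow past the timeout, which is one plausible reason the spout-side numbers in the screenshot run far ahead of the bolt's; keeping max-spout-pending below the timeout-times-throughput product avoids the replay churn.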
