If the tuples in the spout timed out, why were they acked?

On Fri, Apr 17, 2015 at 8:59 AM, Carlos Perelló Marín <[email protected]> wrote:
> Storm has a 30 second timeout on every tuple emitted by the spout. If your
> bolt takes around 1 second per tuple to complete, then once more than 30
> tuples are waiting for your bolt, those tuples will time out and go back to
> the spout to be emitted again. I would give it a try: set
> TOPOLOGY_MAX_SPOUT_PENDING to 20 and see how your numbers look.
>
> On 17 April 2015 at 17:44, Bae, Jae Hyeon <[email protected]> wrote:
>
>> Hi Carlos
>>
>> Thanks for your answer, but I am concerned that the pending property is not
>> the answer, because the Storm UI shows that the number of messages acked by
>> the spout is greater than the number acked by the bolt.
>>
>> I need to study Storm more, but I am wondering what I am doing wrong here.
>> I am using KafkaSpout.
>>
>> On Fri, Apr 17, 2015 at 2:22 AM, Carlos Perelló Marín <
>> [email protected]> wrote:
>>
>>> Hi Jae,
>>>
>>> I think the setting you want is config.TOPOLOGY_MAX_SPOUT_PENDING. Once
>>> you set it to a value, that value caps the number of messages emitted by
>>> your spout that are still being processed in your topology.
>>>
>>> So if you set it to 100 in your previous scenario, the last bolt in your
>>> stream might have 40 messages acked while the spout would have around 140
>>> messages emitted: 40 fully processed and 100 split between all your bolts,
>>> pending to be fully processed. At least that's the theory and how I
>>> understood it...
>>>
>>> On 17 April 2015 at 07:51, Bae, Jae Hyeon <[email protected]> wrote:
>>>
>>>> Hi Storm users
>>>>
>>>> I am a newbie to Storm and I am implementing guaranteed delivery with
>>>> anchored emit and ack, but it's not working as I expected.
>>>>
>>>> I implemented the bolt as follows:
>>>>
>>>> class MetricsTestBolt(config: Config, topology: String) extends BaseRichBolt {
>>>>   private[this] val logger = LoggerFactory.getLogger(classOf[MetricsTestBolt])
>>>>   private var collector: OutputCollector = _
>>>>   var throughput: Int = 1
>>>>
>>>>   override def prepare(stormConf: util.Map[_, _], context: TopologyContext,
>>>>                        collector: OutputCollector): Unit = {
>>>>     this.collector = collector
>>>>     throughput = config.getConfig("topologies." + topology).getInt("rate-limit")
>>>>     logger.info("creating RateLimiter with the threshold: " + throughput)
>>>>   }
>>>>
>>>>   override def declareOutputFields(declarer: OutputFieldsDeclarer): Unit = ()
>>>>
>>>>   override def execute(input: Tuple): Unit = {
>>>>     Thread.sleep(1000 / throughput)
>>>>     collector.emit(input, new Values())
>>>>     collector.ack(input)
>>>>   }
>>>> }
>>>>
>>>> Thread.sleep() simulates the bolt being backed up. MetricsTestBolt is
>>>> actually the final output stage, so it does not have to emit anything.
>>>>
>>>> My question is about the following Storm UI:
>>>>
>>>> [image: Inline image 2]
>>>>
>>>> The spout has acked 1120 messages but the bolt has acked only 40.
>>>>
>>>> Do you have any idea how to block the spout when the bolt is backed up?
>>>>
>>>> Thank you
>>>> Best, Jae
>>>
>>>
>>> --
>>>
>>> Carlos Perelló Marín
>>> https://www.serverdensity.com
>>
>
>
> --
>
> Carlos Perelló Marín
> https://www.serverdensity.com
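For reference, a minimal sketch of the throttling Carlos describes, assuming a pre-1.0 Storm release (backtype.storm packages, current in April 2015); the object name and the values passed to the setters are illustrative, not taken from the thread:

import backtype.storm.Config

object ThrottleSettings {
  def throttledConf(): Config = {
    val conf = new Config()
    // Cap the number of un-acked tuples per spout task. While the cap is
    // reached Storm stops calling the spout's nextTuple(), so a backed-up
    // bolt throttles the KafkaSpout instead of letting tuples time out.
    conf.setMaxSpoutPending(20)
    // Optionally allow more than the default 30 seconds before an un-acked
    // tuple is considered failed and replayed.
    conf.setMessageTimeoutSecs(120)
    conf
  }
}

The returned Config would then be passed to StormSubmitter.submitTopology (or LocalCluster.submitTopology) along with the topology.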
