If the tuples in the spout timed out, why were they acked?

On Fri, Apr 17, 2015 at 8:59 AM, Carlos Perelló Marín <
[email protected]> wrote:

> Storm has a 30-second timeout on every single tuple emitted by the spout.
> If your bolt takes around 1 second per tuple to complete, then whenever more
> than 30 tuples are in flight, the excess tuples will time out and go back to
> the spout to be emitted again. I would give it a try: set
> TOPOLOGY_MAX_SPOUT_PENDING to 20 and see how your numbers look.
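Carlos's arithmetic can be sketched in a few lines of plain Scala (a hypothetical helper for illustration, not part of the Storm API): if one bolt instance needs `perTupleSecs` per tuple and `pending` tuples are queued behind it, the i-th tuple waits roughly `i * perTupleSecs` before it is acked, and anything waiting longer than the timeout is replayed.

```scala
// Hypothetical helper illustrating the back-of-the-envelope math,
// not part of the Storm API.
def tuplesThatTimeOut(pending: Int, perTupleSecs: Double, timeoutSecs: Double): Int =
  (1 to pending).count(i => i * perTupleSecs > timeoutSecs)

// 100 pending tuples at 1 s each against a 30 s timeout:
// tuples 31..100 wait longer than 30 s and are replayed.
println(tuplesThatTimeOut(pending = 100, perTupleSecs = 1.0, timeoutSecs = 30.0)) // 70
// With max spout pending capped at 20, nothing exceeds the timeout.
println(tuplesThatTimeOut(pending = 20, perTupleSecs = 1.0, timeoutSecs = 30.0))  // 0
```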
>
> On 17 April 2015 at 17:44, Bae, Jae Hyeon <[email protected]> wrote:
>
>> Hi Carlos
>>
>> Thanks for your answer, but I am concerned that the pending property is
>> not the answer, because the Storm UI shows that the number of messages
>> acked by the spout is greater than the number acked by the bolt.
>>
>> I need to study Storm more, but I am wondering what I am doing wrong
>> here. I am using KafkaSpout.
>>
>> On Fri, Apr 17, 2015 at 2:22 AM, Carlos Perelló Marín <
>> [email protected]> wrote:
>>
>>> Hi Jae,
>>>
>>> I think the setting you want is Config.TOPOLOGY_MAX_SPOUT_PENDING. Once
>>> you set it, that value caps the number of messages emitted by your spout
>>> that may still be in flight (not yet acked) in your topology.
>>>
>>> So if you set it to 100 in your previous scenario, the last bolt in your
>>> stream may have 40 messages acked while the spout would have emitted
>>> around 140 messages: 40 fully processed and 100 split between all your
>>> bolts, pending full processing. At least that's the theory as I
>>> understand it...
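Setting this is a couple of lines on the topology config. A minimal sketch, assuming the pre-1.0 `backtype.storm` package layout that KafkaSpout shipped with at the time (the class moved to `org.apache.storm` in later releases):

```scala
import backtype.storm.Config

val conf = new Config()
// Cap the number of in-flight (un-acked) tuples per spout task; Storm
// stops calling nextTuple() on the spout once the cap is reached.
conf.setMaxSpoutPending(20)
// The tuple timeout Carlos mentions; 30 seconds is the default.
conf.setMessageTimeoutSecs(30)
```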
>>>
>>> On 17 April 2015 at 07:51, Bae, Jae Hyeon <[email protected]> wrote:
>>>
>>>> Hi Storm users
>>>>
>>>> I am a Storm newbie and I am implementing guaranteed delivery with
>>>> anchored emits and acks, but it is not working as I expected.
>>>>
>>>> I implemented the bolt as follows:
>>>>
>>>> import java.util
>>>>
>>>> import backtype.storm.task.{OutputCollector, TopologyContext}
>>>> import backtype.storm.topology.OutputFieldsDeclarer
>>>> import backtype.storm.topology.base.BaseRichBolt
>>>> import backtype.storm.tuple.{Tuple, Values}
>>>> import com.typesafe.config.Config
>>>> import org.slf4j.LoggerFactory
>>>>
>>>> class MetricsTestBolt(config: Config, topology: String) extends BaseRichBolt {
>>>>   private[this] val logger = LoggerFactory.getLogger(classOf[MetricsTestBolt])
>>>>   private var collector: OutputCollector = _
>>>>   var throughput: Int = 1
>>>>
>>>>   override def prepare(stormConf: util.Map[_, _], context: TopologyContext,
>>>>                        collector: OutputCollector): Unit = {
>>>>     this.collector = collector
>>>>     // assign the field (a `val` here would shadow it, leaving it at 1)
>>>>     throughput = config.getConfig("topologies." + topology).getInt("rate-limit")
>>>>     logger.info("creating RateLimiter with the threshold: " + throughput)
>>>>   }
>>>>
>>>>   override def declareOutputFields(declarer: OutputFieldsDeclarer): Unit = ()
>>>>
>>>>   override def execute(input: Tuple): Unit = {
>>>>     Thread.sleep(1000 / throughput)     // simulate a slow, backed-up bolt
>>>>     collector.emit(input, new Values()) // anchored emit
>>>>     collector.ack(input)
>>>>   }
>>>> }
>>>>
>>>> The Thread.sleep() simulates the bolt being backed up. MetricsTestBolt is
>>>> actually the final output stage, so it does not need to emit anything.
>>>>
>>>> My question concerns the following Storm UI screenshot:
>>>>
>>>> [image: Inline image 2]
>>>>
>>>> The spout has acked 1120 messages, but the bolt has acked only 40.
>>>>
>>>> Do you have any idea how to throttle the spout when the bolt is backed up?
>>>>
>>>> Thank you
>>>> Best, Jae
>>>>
>>>
>>>
>>>
>>> --
>>>
>>> Carlos Perelló Marín
>>> https://www.serverdensity.com
>>>
>>>
>>
>
>
> --
>
> Carlos Perelló Marín
> https://www.serverdensity.com
>
>
