I am testing the same situation with ExclamationTopology and it worked
well. I will investigate why my first topology didn't work well.

On Fri, Apr 17, 2015 at 9:11 AM, Bae, Jae Hyeon <[email protected]> wrote:

> If the tuples in the spout are timed out, why were they acked?
>
> On Fri, Apr 17, 2015 at 8:59 AM, Carlos Perelló Marín <
> [email protected]> wrote:
>
>> Storm has a 30 seconds timeout on every single tuple that has been
>> emitted by the spout. If your bolt is taking around 1 second per tuple to
>> complete, that means that if your bolt emits more than 30 tuples, all those
>> tuples will timeout and go back to the spout to be emitted again. I would
>> give it a try and set the TOPOLOGY_MAX_SPOUT_PENDING to 20 and see how your
>> numbers look like.
>>
>> On 17 April 2015 at 17:44, Bae, Jae Hyeon <[email protected]> wrote:
>>
>>> Hi Carlos
>>>
>>> Thanks for your answer but I am concerning pending property is not the
>>> answer because Storm UI is showing the number of spout acked messages is
>>> greater than the bolt's one.
>>>
>>> I need to study about Storm more but I am wondering what I am doing
>>> wrong here. I am using KafkaSpout.
>>>
>>> On Fri, Apr 17, 2015 at 2:22 AM, Carlos Perelló Marín <
>>> [email protected]> wrote:
>>>
>>>> Hi Jae,
>>>>
>>>> I think the setting you want is config.TOPOLOGY_MAX_SPOUT_PENDING. Once
>>>> you set that config setting to a value, that value will be the number of
>>>> messages emitted by your spout that are still being processed in your
>>>> topology.
>>>>
>>>> So if you set that to 100 in your previous scenario last bolt in your
>>>> stream may have 40 messages acked and spout would have around 140 messages
>>>> emitted, 40 fully processed and 100 split between all your bolts pending to
>>>> be fully processed. At least that's the theory and how I did understand
>>>> it...
>>>>
>>>> On 17 April 2015 at 07:51, Bae, Jae Hyeon <[email protected]> wrote:
>>>>
>>>>> Hi Storm users
>>>>>
>>>>> I am a newbie for Storm and I am implement guaranteed delivery with
>>>>> anchoring emit and ack but it's not working as I expected.
>>>>>
>>>>> I implemented the Bolt as the following:
>>>>>
>>>>> class MetricsTestBolt(config: Config, topology: String) extends
>>>>> BaseRichBolt {
>>>>> private[this] val logger =
>>>>> LoggerFactory.getLogger(classOf[MetricsTestBolt])
>>>>> var throughput: Int = 1
>>>>>
>>>>> override def prepare(stormConf: util.Map[_, _], context:
>>>>> TopologyContext, collector: OutputCollector): Unit = {
>>>>> super.prepare(stormConf, context, collector)
>>>>>
>>>>> val throughput =
>>>>> config.getConfig("topologies."+topology).getInt("rate-limit")
>>>>> logger.info("creating RateLimiter with the threshold: " + throughput)
>>>>> }
>>>>> override def declareOutputFields(declarer: OutputFieldsDeclarer): Unit
>>>>> = ()
>>>>>
>>>>> override def execute(input: Tuple): Unit = {
>>>>> Thread.sleep(1000 / throughput)
>>>>> emit(input, new Values())
>>>>> ack(input)
>>>>> }
>>>>> }
>>>>>
>>>>> Thread.sleep() is for simulating the bolt being backed up. Actually
>>>>> MetricsTestBolt is the final output stage and it does not have to emit
>>>>> anything.
>>>>>
>>>>> My question is the following Storm UI:
>>>>>
>>>>> [image: Inline image 2]
>>>>>
>>>>> Spout is acking 1120 messages but Bolts acked only 40 messages.
>>>>>
>>>>> Do you have any idea how to block Spout when Bolt is backed up?
>>>>>
>>>>> Thank you
>>>>> Best, Jae
>>>>>
>>>>
>>>>
>>>>
>>>> --
>>>>
>>>> Carlos Perelló Marínhttps://www.serverdensity.com
>>>>
>>>>
>>>
>>
>>
>> --
>>
>> Carlos Perelló Marínhttps://www.serverdensity.com
>>
>>
>

Reply via email to