Hi Jae,

I think the setting you want is config.TOPOLOGY_MAX_SPOUT_PENDING. Once you
set that config setting to a value, that value will be the number of
messages emitted by your spout that are still being processed in your
topology.

So if you set that to 100 in your previous scenario last bolt in your
stream may have 40 messages acked and spout would have around 140 messages
emitted, 40 fully processed and 100 split between all your bolts pending to
be fully processed. At least that's the theory and how I did understand
it...

On 17 April 2015 at 07:51, Bae, Jae Hyeon <[email protected]> wrote:

> Hi Storm users
>
> I am a newbie for Storm and I am implement guaranteed delivery with
> anchoring emit and ack but it's not working as I expected.
>
> I implemented the Bolt as the following:
>
> class MetricsTestBolt(config: Config, topology: String) extends
> BaseRichBolt {
> private[this] val logger =
> LoggerFactory.getLogger(classOf[MetricsTestBolt])
> var throughput: Int = 1
>
> override def prepare(stormConf: util.Map[_, _], context: TopologyContext,
> collector: OutputCollector): Unit = {
> super.prepare(stormConf, context, collector)
>
> val throughput =
> config.getConfig("topologies."+topology).getInt("rate-limit")
> logger.info("creating RateLimiter with the threshold: " + throughput)
> }
> override def declareOutputFields(declarer: OutputFieldsDeclarer): Unit = ()
>
> override def execute(input: Tuple): Unit = {
> Thread.sleep(1000 / throughput)
> emit(input, new Values())
> ack(input)
> }
> }
>
> Thread.sleep() is for simulating the bolt being backed up. Actually
> MetricsTestBolt is the final output stage and it does not have to emit
> anything.
>
> My question is the following Storm UI:
>
> [image: Inline image 2]
>
> Spout is acking 1120 messages but Bolts acked only 40 messages.
>
> Do you have any idea how to block Spout when Bolt is backed up?
>
> Thank you
> Best, Jae
>



-- 

Carlos Perelló Marínhttps://www.serverdensity.com

Reply via email to