I tested the same scenario with ExclamationTopology and it worked well. I will investigate why my first topology didn't.
On Fri, Apr 17, 2015 at 9:11 AM, Bae, Jae Hyeon <[email protected]> wrote:

> If the tuples in the spout timed out, why were they acked?
>
> On Fri, Apr 17, 2015 at 8:59 AM, Carlos Perelló Marín <[email protected]> wrote:
>
>> Storm has a 30-second timeout on every single tuple that has been
>> emitted by the spout. If your bolt is taking around 1 second per tuple to
>> complete, that means that if your spout emits more than 30 tuples, all the
>> tuples beyond the first 30 will time out and go back to the spout to be
>> emitted again. I would give it a try and set TOPOLOGY_MAX_SPOUT_PENDING
>> to 20 and see what your numbers look like.
>>
>> On 17 April 2015 at 17:44, Bae, Jae Hyeon <[email protected]> wrote:
>>
>>> Hi Carlos
>>>
>>> Thanks for your answer, but I am concerned that the pending property is
>>> not the answer, because Storm UI is showing that the number of messages
>>> acked by the spout is greater than the number acked by the bolt.
>>>
>>> I need to study Storm more, but I am wondering what I am doing
>>> wrong here. I am using KafkaSpout.
>>>
>>> On Fri, Apr 17, 2015 at 2:22 AM, Carlos Perelló Marín <[email protected]> wrote:
>>>
>>>> Hi Jae,
>>>>
>>>> I think the setting you want is config.TOPOLOGY_MAX_SPOUT_PENDING. Once
>>>> you set that config setting to a value, that value will be the maximum
>>>> number of messages emitted by your spout that are still being processed
>>>> in your topology.
>>>>
>>>> So if you set that to 100 in your previous scenario, the last bolt in your
>>>> stream may have 40 messages acked and the spout would have around 140
>>>> messages emitted: 40 fully processed and 100 split between all your bolts,
>>>> pending to be fully processed. At least that's the theory and how I
>>>> understood it...
>>>>
>>>> On 17 April 2015 at 07:51, Bae, Jae Hyeon <[email protected]> wrote:
>>>>
>>>>> Hi Storm users
>>>>>
>>>>> I am new to Storm and I am implementing guaranteed delivery with
>>>>> anchored emit and ack, but it's not working as I expected.
>>>>>
>>>>> I implemented the bolt as follows:
>>>>>
>>>>> class MetricsTestBolt(config: Config, topology: String) extends BaseRichBolt {
>>>>>   private[this] val logger = LoggerFactory.getLogger(classOf[MetricsTestBolt])
>>>>>   var throughput: Int = 1
>>>>>
>>>>>   override def prepare(stormConf: util.Map[_, _], context: TopologyContext, collector: OutputCollector): Unit = {
>>>>>     super.prepare(stormConf, context, collector)
>>>>>
>>>>>     val throughput = config.getConfig("topologies."+topology).getInt("rate-limit")
>>>>>     logger.info("creating RateLimiter with the threshold: " + throughput)
>>>>>   }
>>>>>   override def declareOutputFields(declarer: OutputFieldsDeclarer): Unit = ()
>>>>>
>>>>>   override def execute(input: Tuple): Unit = {
>>>>>     Thread.sleep(1000 / throughput)
>>>>>     emit(input, new Values())
>>>>>     ack(input)
>>>>>   }
>>>>> }
>>>>>
>>>>> Thread.sleep() is for simulating the bolt being backed up. Actually
>>>>> MetricsTestBolt is the final output stage and it does not have to emit
>>>>> anything.
>>>>>
>>>>> My question is about the following Storm UI output:
>>>>>
>>>>> [image: Inline image 2]
>>>>>
>>>>> The spout is acking 1120 messages but the bolts acked only 40 messages.
>>>>>
>>>>> Do you have any idea how to block the spout when the bolt is backed up?
>>>>>
>>>>> Thank you
>>>>> Best, Jae
>>>>>
>>>>
>>>> --
>>>>
>>>> Carlos Perelló Marín
>>>> https://www.serverdensity.com
>>>>
>>>
>>
>> --
>>
>> Carlos Perelló Marín
>> https://www.serverdensity.com
>>
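To make the timeout arithmetic from Carlos's reply concrete, here is a minimal, self-contained sketch. It does not use Storm at all. It assumes a single bolt that processes tuples strictly one after another, that all in-flight tuples were emitted at the same moment, and the 30-second timeout and roughly 1 second per tuple mentioned in the thread. The names SpoutPendingSketch, completionSecs and timedOut are made up for illustration.

```scala
object SpoutPendingSketch {
  /** Seconds until the tuple at 1-based queue `position` finishes,
    * assuming one bolt that handles tuples serially (an assumption). */
  def completionSecs(position: Int, secsPerTuple: Double): Double =
    position * secsPerTuple

  /** Number of tuples, out of `inFlight` emitted at once, that finish
    * only after `timeoutSecs` and would therefore be replayed. */
  def timedOut(inFlight: Int, secsPerTuple: Double, timeoutSecs: Double): Int =
    (1 to inFlight).count(k => completionSecs(k, secsPerTuple) > timeoutSecs)

  def main(args: Array[String]): Unit = {
    val timeoutSecs = 30.0  // timeout quoted in the thread
    val secsPerTuple = 1.0  // Thread.sleep(1000 / 1) in the bolt
    for (pending <- Seq(20, 30, 100))
      println(s"pending=$pending timedOut=${timedOut(pending, secsPerTuple, timeoutSecs)}")
  }
}
```

Under these assumptions a cap of 20 or 30 pending tuples leaves none timing out, while 100 in flight leaves 70 finishing after the timeout, which is why capping pending tuples below the timeout-to-latency ratio is suggested. A real topology has scheduling, batching and multiple executors that this sketch ignores.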
