Thanks Naresh. But I am still not sure what a proper value for numAckers() should be. I set it to 1000 and that does not seem to improve anything; I keep getting thousands of failed acks.

--
Kushan Maskey
817.403.7500
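For concreteness, here is a minimal sketch of how these knobs are typically set per topology on Storm 0.9.x through backtype.storm.Config; the topology name, the builder wiring and the numbers are illustrative placeholders, not recommendations. Note that topology.acker.executors only controls how many executors track ack trees, so a small value is normally enough; thousands of failed acks usually mean tuples are timing out downstream rather than that there are too few ackers.

    import backtype.storm.Config;
    import backtype.storm.StormSubmitter;
    import backtype.storm.topology.TopologyBuilder;

    public class SubmitWithAckSettings {
        public static void main(String[] args) throws Exception {
            TopologyBuilder builder = new TopologyBuilder();
            // ... builder.setSpout(...) / builder.setBolt(...) wiring elided ...

            Config conf = new Config();
            conf.setNumAckers(2);            // topology.acker.executors: ack-tracking parallelism
            conf.setMaxSpoutPending(500);    // topology.max.spout.pending: cap on un-acked tuples per spout task
            conf.setMessageTimeoutSecs(120); // topology.message.timeout.secs: raised from the 30 s default
            conf.setNumWorkers(2);           // topology.workers

            // "kafka-ingest-topology" is a placeholder name
            StormSubmitter.submitTopology("kafka-ingest-topology", conf, builder.createTopology());
        }
    }

With topology.max.spout.pending set, the spout stops emitting once that many tuples are in flight, which gives Cassandra/Solr/Couch time to keep up instead of letting tuples pile up until they hit the timeout.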
On Wed, Sep 10, 2014 at 12:53 PM, Naresh Kosgi <nareshko...@gmail.com> wrote:

> You can set it using the config object. The method, I believe, is called
> num_ackers(int).
>
> On Wed, Sep 10, 2014 at 11:53 AM, Kushan Maskey <kushan.mas...@mmillerassociates.com> wrote:
>
>> Thanks for the responses.
>>
>> Based on the document
>> https://storm.incubator.apache.org/documentation/Running-topologies-on-a-production-cluster.html,
>> I am able to set TOPOLOGY_MAX_SPOUT_PENDING, but how do you set
>> TOPOLOGY_ACKERS? I see TOPOLOGY_ACKER_EXECUTORS. Does that mean the same
>> thing? How do you determine TOPOLOGY_ACKERS for a batch load of more than
>> 100K records at 20 messages per second? Can any of you give me an idea on
>> this?
>>
>> --
>> Kushan Maskey
>> 817.403.7500
>>
>> On Wed, Sep 10, 2014 at 4:18 AM, Spico Florin <spicoflo...@gmail.com> wrote:
>>
>>> Hello!
>>> I would consider slowing down the spout. Set a value for
>>> Config.TOPOLOGY_MAX_SPOUT_PENDING. It can happen that Cassandra, Solr
>>> and CouchDB cannot cope with the frequency at which you emit your
>>> messages, and thus you have backpressure.
>>> Also, the spout should emit the messages anchored (not sure here, see my
>>> post about this) in order for Config.TOPOLOGY_MAX_SPOUT_PENDING to take
>>> effect.
>>> Check this blog:
>>> http://brianoneill.blogspot.ro/2012/08/a-big-data-trifecta-storm-kafka-and.html
>>>
>>> Hope that this helps.
>>> Best regards,
>>> Florin
>>>
>>> On Wed, Sep 10, 2014 at 6:25 AM, Vikas Agarwal <vi...@infoobjects.com> wrote:
>>>
>>>> Kafka still contains the logs, and they will be there up to the
>>>> configured log retention period. Check Kafka's server.properties and
>>>> update the log retention period to 5 minutes, then restart Kafka; once
>>>> Kafka stabilizes, shut it down and restart it with the original value of
>>>> the log retention property.
>>>>
>>>> On Tue, Sep 9, 2014 at 10:40 PM, Kushan Maskey <kushan.mas...@mmillerassociates.com> wrote:
>>>>
>>>>> I hope it did, because I don't see the multiple-tuple-failure error any
>>>>> more. But I see another issue. I have stopped the batch process that
>>>>> sends messages to Kafka. I killed my topology and then restarted it. I
>>>>> still see that messages are being loaded into Cassandra. Does that mean
>>>>> that Storm is still trying to process the failed messages? Is there a
>>>>> way to flush the old messages out of Storm so I can start it fresh?
>>>>>
>>>>> --
>>>>> Kushan Maskey
>>>>> 817.403.7500
>>>>>
>>>>> On Tue, Sep 9, 2014 at 10:09 AM, Naresh Kosgi <nareshko...@gmail.com> wrote:
>>>>>
>>>>>> Yes, that is what I was talking about. Hopefully that fixes it.
>>>>>>
>>>>>> On Tue, Sep 9, 2014 at 10:59 AM, Kushan Maskey <kushan.mas...@mmillerassociates.com> wrote:
>>>>>>
>>>>>>> Just realized that the tuple timeout you are talking about is
>>>>>>> "topology.message.timeout.secs", which was set to 30 seconds and which
>>>>>>> I have now raised to 120.
>>>>>>>
>>>>>>> --
>>>>>>> Kushan Maskey
>>>>>>> 817.403.7500
>>>>>>>
>>>>>>> On Tue, Sep 9, 2014 at 9:43 AM, Kushan Maskey <kushan.mas...@mmillerassociates.com> wrote:
>>>>>>>
>>>>>>>> Thanks, and apologies, I should have mentioned this in my question
>>>>>>>> earlier. I am using Storm 0.9.2 with the built-in KafkaSpout. I do not
>>>>>>>> implement any failure handling myself. Do I need to create my own
>>>>>>>> custom KafkaSpout?
>>>>>>>>
>>>>>>>> I have not set a timeout for tuples.
>>>>>>>> In fact, I don't know where to set that. Here is my Storm config, if
>>>>>>>> that is where I need to set the timeout. But none of these settings say
>>>>>>>> anything about a tuple timeout.
>>>>>>>>
>>>>>>>> dev.zookeeper.path: /tmp/dev-storm-zookeeper
>>>>>>>> drpc.childopts: -Xmx768m
>>>>>>>> drpc.invocations.port: 3773
>>>>>>>> drpc.port: 3772
>>>>>>>> drpc.queue.size: 128
>>>>>>>> drpc.request.timeout.secs: 600
>>>>>>>> drpc.worker.threads: 64
>>>>>>>> java.library.path: /usr/local/lib:/opt/local/lib:/usr/lib
>>>>>>>> logviewer.appender.name: A1
>>>>>>>> logviewer.childopts: -Xmx128m
>>>>>>>> logviewer.port: 8000
>>>>>>>> nimbus.childopts: -Xmx1024m
>>>>>>>> nimbus.cleanup.inbox.freq.secs: 600
>>>>>>>> nimbus.file.copy.expiration.secs: 600
>>>>>>>> nimbus.host: nmcxstrmd001
>>>>>>>> nimbus.inbox.jar.expiration.secs: 3600
>>>>>>>> nimbus.monitor.freq.secs: 10
>>>>>>>> nimbus.reassign: true
>>>>>>>> nimbus.supervisor.timeout.secs: 60
>>>>>>>> nimbus.task.launch.secs: 120
>>>>>>>> nimbus.task.timeout.secs: 30
>>>>>>>> nimbus.thrift.max_buffer_size: 1048576
>>>>>>>> nimbus.thrift.port: 6627
>>>>>>>> nimbus.topology.validator: backtype.storm.nimbus.DefaultTopologyValidator
>>>>>>>> storm.cluster.mode: distributed
>>>>>>>> storm.local.dir: /data/disk00/storm/localdir
>>>>>>>> storm.local.mode.zmq: false
>>>>>>>> storm.messaging.netty.buffer_size: 5242880
>>>>>>>> storm.messaging.netty.client_worker_threads: 1
>>>>>>>> storm.messaging.netty.flush.check.interval.ms: 10
>>>>>>>> storm.messaging.netty.max_retries: 30
>>>>>>>> storm.messaging.netty.max_wait_ms: 1000
>>>>>>>> storm.messaging.netty.min_wait_ms: 100
>>>>>>>> storm.messaging.netty.server_worker_threads: 1
>>>>>>>> storm.messaging.netty.transfer.batch.size: 262144
>>>>>>>> storm.messaging.transport: backtype.storm.messaging.netty.Context
>>>>>>>> storm.thrift.transport: backtype.storm.security.auth.SimpleTransportPlugin
>>>>>>>> storm.zookeeper.connection.timeout: 15000
>>>>>>>> storm.zookeeper.port: 2181
>>>>>>>> storm.zookeeper.retry.interval: 1000
>>>>>>>> storm.zookeeper.retry.intervalceiling.millis: 30000
>>>>>>>> storm.zookeeper.retry.times: 5
>>>>>>>> storm.zookeeper.root: /storm
>>>>>>>> storm.zookeeper.servers: nmcxstrmd001
>>>>>>>> storm.zookeeper.session.timeout: 20000
>>>>>>>> supervisor.childopts: -Xmx256m
>>>>>>>> supervisor.enable: true
>>>>>>>> supervisor.heartbeat.frequency.secs: 5
>>>>>>>> supervisor.monitor.frequency.secs: 3
>>>>>>>> supervisor.slots.ports: 6700,6701,6702,6703,6704,6705,6706,6707,6708,6709,6710,6711,6712,6713,6714,6715,6716,6717,6718,6719,6720,6721,6722,6723,6724,6725,6726,6727,6728
>>>>>>>> supervisor.worker.start.timeout.secs: 120
>>>>>>>> supervisor.worker.timeout.secs: 30
>>>>>>>> task.heartbeat.frequency.secs: 3
>>>>>>>> task.refresh.poll.secs: 10
>>>>>>>> topology.acker.executors:
>>>>>>>> topology.builtin.metrics.bucket.size.secs: 60
>>>>>>>> topology.debug: false
>>>>>>>> topology.disruptor.wait.strategy: com.lmax.disruptor.BlockingWaitStrategy
>>>>>>>> topology.enable.message.timeouts: true
>>>>>>>> topology.error.throttle.interval.secs: 10
>>>>>>>> topology.executor.receive.buffer.size: 1024
>>>>>>>> topology.executor.send.buffer.size: 1024
>>>>>>>> topology.fall.back.on.java.serialization: true
>>>>>>>> topology.kryo.factory: backtype.storm.serialization.DefaultKryoFactory
>>>>>>>> topology.max.error.report.per.interval: 5
>>>>>>>> topology.max.spout.pending:
>>>>>>>> topology.max.task.parallelism:
>>>>>>>> topology.message.timeout.secs: 30
>>>>>>>> topology.multilang.serializer: backtype.storm.multilang.JsonSerializer
>>>>>>>> topology.receiver.buffer.size: 8
>>>>>>>> topology.skip.missing.kryo.registrations: false
>>>>>>>> topology.sleep.spout.wait.strategy.time.ms: 1
>>>>>>>> topology.spout.wait.strategy: backtype.storm.spout.SleepSpoutWaitStrategy
>>>>>>>> topology.state.synchronization.timeout.secs: 60
>>>>>>>> topology.stats.sample.rate: 0.05
>>>>>>>> topology.tasks:
>>>>>>>> topology.tick.tuple.freq.secs:
>>>>>>>> topology.transfer.buffer.size: 1024
>>>>>>>> topology.trident.batch.emit.interval.millis: 500
>>>>>>>> topology.tuple.serializer: backtype.storm.serialization.types.ListDelegateSerializer
>>>>>>>> topology.worker.childopts:
>>>>>>>> topology.worker.receiver.thread.count: 1
>>>>>>>> topology.worker.shared.thread.pool.size: 4
>>>>>>>> topology.workers: 1
>>>>>>>> transactional.zookeeper.port:
>>>>>>>> transactional.zookeeper.root: /transactional
>>>>>>>> transactional.zookeeper.servers:
>>>>>>>> ui.childopts: -Xmx768m
>>>>>>>> ui.port: 8080
>>>>>>>> worker.childopts: -Xmx768m
>>>>>>>> worker.heartbeat.frequency.secs: 1
>>>>>>>> zmq.hwm: 0
>>>>>>>> zmq.linger.millis: 5000
>>>>>>>> zmq.threads: 1
>>>>>>>>
>>>>>>>> --
>>>>>>>> Kushan Maskey
>>>>>>>> 817.403.7500
>>>>>>>>
>>>>>>>> On Tue, Sep 9, 2014 at 9:23 AM, Naresh Kosgi <nareshko...@gmail.com> wrote:
>>>>>>>>
>>>>>>>>> What is your timeout setting for failing a tuple? It is hard to say
>>>>>>>>> what is causing this issue without more information, but the default
>>>>>>>>> timeout on tuples is 30 seconds, and some tuples may be taking longer
>>>>>>>>> than 30 seconds to process. Try increasing the timeout to 1 or 2
>>>>>>>>> minutes?
>>>>>>>>>
>>>>>>>>> "Why are the ack/failure ack counts so much higher than the number of
>>>>>>>>> records I am trying to process?"
>>>>>>>>>
>>>>>>>>> How are you implementing the fail() method in your spout? On failure,
>>>>>>>>> this method is called by the framework. It could be that you are
>>>>>>>>> re-emitting the tuple to be processed and it is failing again. This
>>>>>>>>> could be a reason why you have more failed tuples than records.
>>>>>>>>>
>>>>>>>>> On Tue, Sep 9, 2014 at 10:06 AM, Kushan Maskey <kushan.mas...@mmillerassociates.com> wrote:
>>>>>>>>>
>>>>>>>>>> I have a batch job in which I process more than 100k records from a
>>>>>>>>>> file. I post all these messages to a Kafka topic. I have a topology
>>>>>>>>>> that fetches these records and dumps them into a Cassandra database,
>>>>>>>>>> and also updates Solr and Couch databases.
>>>>>>>>>>
>>>>>>>>>> I have been running the process multiple times to make sure that it
>>>>>>>>>> completes successfully. It does run successfully sometimes, and
>>>>>>>>>> sometimes it errors out with the following "Too many tuple failures"
>>>>>>>>>> error in the Storm UI.
>>>>>>>>>>
>>>>>>>>>> java.lang.RuntimeException: java.lang.RuntimeException: Too many tuple failures
>>>>>>>>>>     at backtype.storm.utils.DisruptorQueue.consumeBatchToCursor(DisruptorQueue.java:128)
>>>>>>>>>>     at backtype.storm.utils.DisruptorQueue.consumeBatch(DisruptorQueue.java:87)
>>>>>>>>>>     at backtype.storm.disruptor$consume_batch.invoke(disruptor.clj:76)
>>>>>>>>>>     at backtype.storm.daemon.executor$fn__5573$fn__5588$fn__5617.invoke(executor.clj:540)
>>>>>>>>>>     at backtype.storm.util$async_loop$fn__457.invoke(util.clj:431)
>>>>>>>>>>     at clojure.lang.AFn.run(AFn.java:24)
>>>>>>>>>>     at java.lang.Thread.run(Thread.java:744)
>>>>>>>>>> Caused by: java.lang.RuntimeException: Too many tuple failures
>>>>>>>>>>     at storm.kafka.PartitionManager.fail(PartitionManager.java:210)
>>>>>>>>>>     at storm.kafka.KafkaSpout.fail(KafkaSpout.java:174)
>>>>>>>>>>     at backtype.storm.daemon.executor$fail_spout_msg.invoke(executor.clj:370)
>>>>>>>>>>     at backtype.storm.daemon.executor$fn$reify__5576.expire(executor.clj:430)
>>>>>>>>>>     at backtype.storm.utils.RotatingMap.rotate(RotatingMap.java:73)
>>>>>>>>>>     at backtype.storm.daemon.executor$fn__5573$tuple_action_fn__5579.invoke(executor.clj:435)
>>>>>>>>>>     at backtype.storm.daemon.executor$mk_task_receiver$fn__5564.invoke(executor.clj:402)
>>>>>>>>>>     at backtype.storm.disruptor$clojure_handler$reify__745.onEvent(disruptor.clj:58)
>>>>>>>>>>     at backtype.storm.utils.DisruptorQueue.consumeBatchToCursor(DisruptorQueue.java:125)
>>>>>>>>>>     ... 6 more
>>>>>>>>>>
>>>>>>>>>> Once this failure happens, I also see that the number of records
>>>>>>>>>> stored in the Cassandra database is much higher than the actual batch
>>>>>>>>>> record count. How do I handle this error? Also, whenever any kind of
>>>>>>>>>> error/exception occurs, the failed-ack count goes up from 0 to
>>>>>>>>>> thousands. Why are the ack/failure ack counts so much higher than the
>>>>>>>>>> number of records I am trying to process?
>>>>>>>>>>
>>>>>>>>>> --
>>>>>>>>>> Kushan Maskey
>>>>
>>>> --
>>>> Regards,
>>>> Vikas Agarwal
>>>> 91 – 9928301411
>>>>
>>>> InfoObjects, Inc.
>>>> Execution Matters
>>>> http://www.infoobjects.com
>>>> 2041 Mission College Boulevard, #280
>>>> Santa Clara, CA 95054
>>>> +1 (408) 988-2000 Work
>>>> +1 (408) 716-2726 Fax
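On why the ack/fail counts dwarf the 100K records and why Cassandra ends up with more rows than the batch: when a tuple that was emitted with a message id fails or times out, Storm calls the spout's fail() method, and a reliable spout re-emits the message, so every replay counts again in the emitted/acked/failed statistics and performs the downstream writes again. The bundled storm.kafka.KafkaSpout already behaves this way by re-fetching failed offsets, and, as the stack trace above shows, it is storm.kafka.PartitionManager.fail() that raises "Too many tuple failures" once failed offsets pile up, so a custom spout is not required. The class below is only a hypothetical, simplified spout sketching the mechanism; the class name, the in-memory queue, and the "record" field are invented for the illustration.

    import java.util.Map;
    import java.util.UUID;
    import java.util.concurrent.ConcurrentHashMap;
    import java.util.concurrent.ConcurrentLinkedQueue;

    import backtype.storm.spout.SpoutOutputCollector;
    import backtype.storm.task.TopologyContext;
    import backtype.storm.topology.OutputFieldsDeclarer;
    import backtype.storm.topology.base.BaseRichSpout;
    import backtype.storm.tuple.Fields;
    import backtype.storm.tuple.Values;

    // Hypothetical illustration of reliable (anchored) emits plus replay on failure.
    public class ReplayingSpout extends BaseRichSpout {
        private SpoutOutputCollector collector;
        // Messages waiting to be emitted; how they get here is out of scope.
        private final ConcurrentLinkedQueue<String> pending = new ConcurrentLinkedQueue<String>();
        // In-flight tuples keyed by message id, so fail() can re-queue them.
        private final Map<Object, String> inFlight = new ConcurrentHashMap<Object, String>();

        public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) {
            this.collector = collector;
        }

        public void nextTuple() {
            String msg = pending.poll();
            if (msg == null) {
                return;
            }
            Object msgId = UUID.randomUUID();
            inFlight.put(msgId, msg);
            // Emitting WITH a message id is what makes topology.max.spout.pending
            // and topology.message.timeout.secs apply to this tuple.
            collector.emit(new Values(msg), msgId);
        }

        public void ack(Object msgId) {
            inFlight.remove(msgId); // fully processed by the downstream bolts
        }

        public void fail(Object msgId) {
            String msg = inFlight.remove(msgId);
            if (msg != null) {
                pending.offer(msg); // replay: this is why counts can exceed the record count
            }
        }

        public void declareOutputFields(OutputFieldsDeclarer declarer) {
            declarer.declare(new Fields("record"));
        }
    }

Unless the Cassandra/Solr/Couch writes use a key derived deterministically from the record, so that a replay overwrites the same row instead of inserting a new one, each retry adds another copy, which would explain row counts far above the batch size. Raising topology.message.timeout.secs and capping topology.max.spout.pending, as discussed above, reduces how often those replays happen in the first place.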