Thanks Naresh.

But i am still not sure what should be a proper value for the numAckers()?
I set it to 1000 and that does not seem to be improve? I keep getting
thousands of failed acks.

--
Kushan Maskey
817.403.7500

On Wed, Sep 10, 2014 at 12:53 PM, Naresh Kosgi <nareshko...@gmail.com>
wrote:

> u can set it using the config object.  The method I believe is called
> num_ackers(int)
>
> On Wed, Sep 10, 2014 at 11:53 AM, Kushan Maskey <
> kushan.mas...@mmillerassociates.com> wrote:
>
>> Thanks for the responses.
>>
>> Based on the document
>> https://storm.incubator.apache.org/documentation/Running-topologies-on-a-production-cluster.html,
>> I am able to set the TOPOLOGY_MAX_SPOUT_PENDING, but how can you set the 
>> TOPOLOGY_AKERS?
>> I see TOPOLOGY_AKERS_EXECUTORS. Does that mean the same? How do you
>> determine the TOPOLOGY_AKERS for a batch load of more than 100K data at 20
>> messages per seconds? Can any of you give me an idea on this?
>>
>>
>> --
>> Kushan Maskey
>> 817.403.7500
>>
>> On Wed, Sep 10, 2014 at 4:18 AM, Spico Florin <spicoflo...@gmail.com>
>> wrote:
>>
>>> Hello!
>>>   I'll consider to slow down the spout. Set up a value
>>> for Config.TOPOLOGY_MAX_SPOUT_PENDING. It can happen that Cassanda, Solr
>>> and CouchDatabase do not cope with the requency that you emit your messages
>>> and thus you have backpressure.
>>>   Also, the spout should emit the messages anchored (( not sure here see
>>> my post about this))in order that the set up of
>>>  Config.TOPOLOGY_MAX_SPOUT_PENDING to take effect.
>>> Check this blog:
>>> http://brianoneill.blogspot.ro/2012/08/a-big-data-trifecta-storm-kafka-and.html
>>>
>>> Hope that these help.
>>> Best regards,
>>>  Florin
>>>
>>>
>>> On Wed, Sep 10, 2014 at 6:25 AM, Vikas Agarwal <vi...@infoobjects.com>
>>> wrote:
>>>
>>>> Kafka still contains the logs and they would be there upto the
>>>> configured time of log retention period. Check server.properties of kafka
>>>> and update the log retention period to 5 min and restart kafka and when
>>>> kafka stablizes, shut down it and restart the it with original value of log
>>>> retentions period property.
>>>>
>>>>
>>>> On Tue, Sep 9, 2014 at 10:40 PM, Kushan Maskey <
>>>> kushan.mas...@mmillerassociates.com> wrote:
>>>>
>>>>> I hope it did because I dont see the multiple tuple failure error. But
>>>>> I see another issue.
>>>>> I have stopped loading the batch process that sends messages to
>>>>>  Kafka. I killed my topology and then restarted again. I still see that
>>>>> message are been loaded into Cassandra. Does that mean that storm still
>>>>> trying to process the failed messages? Is htere a way to flush the old
>>>>> message out from storm so I can fresh start it?
>>>>>
>>>>> --
>>>>> Kushan Maskey
>>>>> 817.403.7500
>>>>>
>>>>> On Tue, Sep 9, 2014 at 10:09 AM, Naresh Kosgi <nareshko...@gmail.com>
>>>>> wrote:
>>>>>
>>>>>> Yes, that is what I was talking about.  Hopefully that fixes it.
>>>>>>
>>>>>> On Tue, Sep 9, 2014 at 10:59 AM, Kushan Maskey <
>>>>>> kushan.mas...@mmillerassociates.com> wrote:
>>>>>>
>>>>>>> Just realized that the tuple timeout you are talking about is the 
>>>>>>> "topology.message.timeout.secs"
>>>>>>> which was set to 30 sec and now I made to to 120.
>>>>>>>
>>>>>>> --
>>>>>>> Kushan Maskey
>>>>>>> 817.403.7500
>>>>>>>
>>>>>>> On Tue, Sep 9, 2014 at 9:43 AM, Kushan Maskey <
>>>>>>> kushan.mas...@mmillerassociates.com> wrote:
>>>>>>>
>>>>>>>>
>>>>>>>> Thanks and apologies, I should I mentioned that in my question
>>>>>>>> earlier. I am using storm 0.9.2 and using the inbuilt KafkaSpout. I do 
>>>>>>>> not
>>>>>>>> implement any failure my self. Do I need to create my own custom
>>>>>>>> KafkaSpout?
>>>>>>>>
>>>>>>>> I have not set timeout for tuples. In fact I dont know where to set
>>>>>>>> that. Here is my storm config if that is where I need to set the time 
>>>>>>>> out.
>>>>>>>> But non of them say anything about tuple timeout.
>>>>>>>>
>>>>>>>> dev.zookeeper.path/tmp/dev-storm-zookeeperdrpc.childopts-Xmx768m
>>>>>>>> drpc.invocations.port3773drpc.port3772drpc.queue.size128
>>>>>>>> drpc.request.timeout.secs600drpc.worker.threads64java.library.path
>>>>>>>> /usr/local/lib:/opt/local/lib:/usr/liblogviewer.appender.nameA1
>>>>>>>> logviewer.childopts-Xmx128mlogviewer.port8000nimbus.childopts
>>>>>>>> -Xmx1024mnimbus.cleanup.inbox.freq.secs600
>>>>>>>> nimbus.file.copy.expiration.secs600nimbus.hostnmcxstrmd001
>>>>>>>> nimbus.inbox.jar.expiration.secs3600nimbus.monitor.freq.secs10
>>>>>>>> nimbus.reassigntruenimbus.supervisor.timeout.secs60
>>>>>>>> nimbus.task.launch.secs120nimbus.task.timeout.secs30
>>>>>>>> nimbus.thrift.max_buffer_size1048576nimbus.thrift.port6627
>>>>>>>> nimbus.topology.validator
>>>>>>>> backtype.storm.nimbus.DefaultTopologyValidatorstorm.cluster.mode
>>>>>>>> distributedstorm.local.dir/data/disk00/storm/localdir
>>>>>>>> storm.local.mode.zmqfalsestorm.messaging.netty.buffer_size5242880
>>>>>>>> storm.messaging.netty.client_worker_threads1
>>>>>>>> storm.messaging.netty.flush.check.interval.ms10
>>>>>>>> storm.messaging.netty.max_retries30
>>>>>>>> storm.messaging.netty.max_wait_ms1000
>>>>>>>> storm.messaging.netty.min_wait_ms100
>>>>>>>> storm.messaging.netty.server_worker_threads1
>>>>>>>> storm.messaging.netty.transfer.batch.size262144
>>>>>>>> storm.messaging.transportbacktype.storm.messaging.netty.Context
>>>>>>>> storm.thrift.transport
>>>>>>>> backtype.storm.security.auth.SimpleTransportPlugin
>>>>>>>> storm.zookeeper.connection.timeout15000storm.zookeeper.port2181
>>>>>>>> storm.zookeeper.retry.interval1000
>>>>>>>> storm.zookeeper.retry.intervalceiling.millis30000
>>>>>>>> storm.zookeeper.retry.times5storm.zookeeper.root/storm
>>>>>>>> storm.zookeeper.serversnmcxstrmd001storm.zookeeper.session.timeout
>>>>>>>> 20000supervisor.childopts-Xmx256msupervisor.enabletrue
>>>>>>>> supervisor.heartbeat.frequency.secs5
>>>>>>>> supervisor.monitor.frequency.secs3supervisor.slots.ports
>>>>>>>> 6700,6701,6702,6703,6704,6705,6706,6707,6708,6709,6710,6711,6712,6713,6714,6715,6716,6717,6718,6719,6720,6721,6722,6723,6724,6725,6726,6727,6728
>>>>>>>> supervisor.worker.start.timeout.secs120
>>>>>>>> supervisor.worker.timeout.secs30task.heartbeat.frequency.secs3
>>>>>>>> task.refresh.poll.secs10topology.acker.executors
>>>>>>>> topology.builtin.metrics.bucket.size.secs60topology.debugfalse
>>>>>>>> topology.disruptor.wait.strategy
>>>>>>>> com.lmax.disruptor.BlockingWaitStrategy
>>>>>>>> topology.enable.message.timeoutstrue
>>>>>>>> topology.error.throttle.interval.secs10
>>>>>>>> topology.executor.receive.buffer.size1024
>>>>>>>> topology.executor.send.buffer.size1024
>>>>>>>> topology.fall.back.on.java.serializationtruetopology.kryo.factory
>>>>>>>> backtype.storm.serialization.DefaultKryoFactory
>>>>>>>> topology.max.error.report.per.interval5topology.max.spout.pending
>>>>>>>> topology.max.task.parallelismtopology.message.timeout.secs30
>>>>>>>> topology.multilang.serializer
>>>>>>>> backtype.storm.multilang.JsonSerializer
>>>>>>>> topology.receiver.buffer.size8
>>>>>>>> topology.skip.missing.kryo.registrationsfalse
>>>>>>>> topology.sleep.spout.wait.strategy.time.ms1
>>>>>>>> topology.spout.wait.strategy
>>>>>>>> backtype.storm.spout.SleepSpoutWaitStrategy
>>>>>>>> topology.state.synchronization.timeout.secs60
>>>>>>>> topology.stats.sample.rate0.05topology.tasks
>>>>>>>> topology.tick.tuple.freq.secstopology.transfer.buffer.size1024
>>>>>>>> topology.trident.batch.emit.interval.millis500
>>>>>>>> topology.tuple.serializer
>>>>>>>> backtype.storm.serialization.types.ListDelegateSerializer
>>>>>>>> topology.worker.childoptstopology.worker.receiver.thread.count1
>>>>>>>> topology.worker.shared.thread.pool.size4topology.workers1
>>>>>>>> transactional.zookeeper.porttransactional.zookeeper.root
>>>>>>>> /transactionaltransactional.zookeeper.serversui.childopts-Xmx768m
>>>>>>>> ui.port8080worker.childopts-Xmx768mworker.heartbeat.frequency.secs1
>>>>>>>> zmq.hwm0zmq.linger.millis5000zmq.threads1
>>>>>>>>
>>>>>>>> --
>>>>>>>> Kushan Maskey
>>>>>>>> 817.403.7500
>>>>>>>>
>>>>>>>> On Tue, Sep 9, 2014 at 9:23 AM, Naresh Kosgi <nareshko...@gmail.com
>>>>>>>> > wrote:
>>>>>>>>
>>>>>>>>> What is your timeout setting for failing a tuple? Its hard to say
>>>>>>>>> what is causing this issue without more information but the default 
>>>>>>>>> timeout
>>>>>>>>> on tuples is 30 seconds and for some tuples it maybe taking longer 
>>>>>>>>> then 30
>>>>>>>>> seconds to process.  Try increasing the timeout to 1 or 2 min?
>>>>>>>>>
>>>>>>>>>
>>>>>>>>> "Why the ack/failure ack counts are so much higher than the
>>>>>>>>> number of records I am trying to process?"
>>>>>>>>>
>>>>>>>>> how are you implementing the fail() method in your spout?  on
>>>>>>>>> failure, this method is called by the framework.  It could be you are
>>>>>>>>> reemitting the tuple to be processed and its failing again.  This 
>>>>>>>>> could be
>>>>>>>>> a reason why u have more failed tuples then records
>>>>>>>>>
>>>>>>>>> On Tue, Sep 9, 2014 at 10:06 AM, Kushan Maskey <
>>>>>>>>> kushan.mas...@mmillerassociates.com> wrote:
>>>>>>>>>
>>>>>>>>>> I have a batch job where I process more than 100k records from
>>>>>>>>>> file. I post all these message to Kafka topic. I have a topology 
>>>>>>>>>> that goes
>>>>>>>>>> and fetches these records and dumps them into Cassandra database and 
>>>>>>>>>> also
>>>>>>>>>> updates solr and couch databases.
>>>>>>>>>>
>>>>>>>>>> I have been trying to run the process multiple times to make sure
>>>>>>>>>> that the process completes successfully. It does run successfully 
>>>>>>>>>> sometimes
>>>>>>>>>> and sometimes it errors out saying the following error that says 
>>>>>>>>>> "Too many
>>>>>>>>>> tuple failures" in the storm UI.
>>>>>>>>>>
>>>>>>>>>> java.lang.RuntimeException: java.lang.RuntimeException: Too many
>>>>>>>>>> tuple failures at
>>>>>>>>>> backtype.storm.utils.DisruptorQueue.consumeBatchToCursor(DisruptorQueue.java:128)
>>>>>>>>>> at 
>>>>>>>>>> backtype.storm.utils.DisruptorQueue.consumeBatch(DisruptorQueue.java:87)
>>>>>>>>>> at backtype.storm.disruptor$consume_batch.invoke(disruptor.clj:76) at
>>>>>>>>>> backtype.storm.daemon.executor$fn__5573$fn__5588$fn__5617.invoke(executor.clj:540)
>>>>>>>>>> at backtype.storm.util$async_loop$fn__457.invoke(util.clj:431) at
>>>>>>>>>> clojure.lang.AFn.run(AFn.java:24) at 
>>>>>>>>>> java.lang.Thread.run(Thread.java:744)
>>>>>>>>>> Caused by: java.lang.RuntimeException: Too many tuple failures at
>>>>>>>>>> storm.kafka.PartitionManager.fail(PartitionManager.java:210) at
>>>>>>>>>> storm.kafka.KafkaSpout.fail(KafkaSpout.java:174) at
>>>>>>>>>> backtype.storm.daemon.executor$fail_spout_msg.invoke(executor.clj:370)
>>>>>>>>>>  at
>>>>>>>>>> backtype.storm.daemon.executor$fn$reify__5576.expire(executor.clj:430)
>>>>>>>>>>  at
>>>>>>>>>> backtype.storm.utils.RotatingMap.rotate(RotatingMap.java:73) at
>>>>>>>>>> backtype.storm.daemon.executor$fn__5573$tuple_action_fn__5579.invoke(executor.clj:435)
>>>>>>>>>> at
>>>>>>>>>> backtype.storm.daemon.executor$mk_task_receiver$fn__5564.invoke(executor.clj:402)
>>>>>>>>>> at
>>>>>>>>>> backtype.storm.disruptor$clojure_handler$reify__745.onEvent(disruptor.clj:58)
>>>>>>>>>> at
>>>>>>>>>> backtype.storm.utils.DisruptorQueue.consumeBatchToCursor(DisruptorQueue.java:125)
>>>>>>>>>> ... 6 more
>>>>>>>>>>
>>>>>>>>>> once this failure happens, i also see that the number of records
>>>>>>>>>> stored in Cassandra database if way much higher than the actual batch
>>>>>>>>>> records count. How do I handle this error? Also when there is any 
>>>>>>>>>> kind of
>>>>>>>>>> error/exception occurs then the ack failed values goes up form 0 to
>>>>>>>>>> thousands. Why the ack/failure ack counts are so much higher thank 
>>>>>>>>>> the
>>>>>>>>>> number of records I am trying to process?
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>> --
>>>>>>>>>> Kushan Maskey
>>>>>>>>>>
>>>>>>>>>
>>>>>>>>>
>>>>>>>>
>>>>>>>
>>>>>>
>>>>>
>>>>
>>>>
>>>> --
>>>> Regards,
>>>> Vikas Agarwal
>>>> 91 – 9928301411
>>>>
>>>> InfoObjects, Inc.
>>>> Execution Matters
>>>> http://www.infoobjects.com
>>>> 2041 Mission College Boulevard, #280
>>>> Santa Clara, CA 95054
>>>> +1 (408) 988-2000 Work
>>>> +1 (408) 716-2726 Fax
>>>>
>>>>
>>>
>>
>

Reply via email to