Hi Nathan,

Thanks a lot for your patient explanation. Now I'm clear.

Best!
From: Nathan Leung <[email protected]>
Reply-To: "[email protected]" <[email protected]>
Date: Wednesday, July 30, 2014 at 12:09 PM
To: user <[email protected]>
Subject: Re: why complete latency and failure rate is so high of my spout.

Your ack sequence is right, but the timeout is not just the simple sum of those times. Consider a simple topology similar to yours: spout s1 -> bolt b1, with 1 s1 task and 1 b1 task. s1 emits very quickly (quickly enough that the time it takes doesn't matter for the example), and b1 takes 100 ms to process each message.

If max spout pending is 1000, then when the spout emits it can have up to 1000 messages in flight that it must wait for. If it emits 1000 messages right away, the spout task reaches max spout pending, and the 1000th message has to wait for all 999 messages in front of it to be processed before it gets executed by b1. Its complete latency will therefore be nearly 100 s (999 messages * 100 ms per message in b1). If the spout keeps emitting, all subsequent messages will also have a complete latency of nearly 100 s.

You can see from this example that if you lower max spout pending to 100, message 100 would have an expected complete latency of 10 s, and all subsequent messages would be the same. You do have to be careful, because if topology.max.spout.pending is too low you can starve your topology of data. I think the general recommendation is to set it to some initial value like 1024 and keep increasing it until you stop gaining throughput. For your topology, where the bolts seem to have high processing times, you could probably start with a lower value than 1024.

On Wed, Jul 30, 2014 at 1:34 PM, Wei, Xin <[email protected]> wrote:

Hi Nathan,

Thanks a lot for your explanation. There are still a couple of things I'm not clear on:

1. "topology.max.spout.pending configures how many messages can be un-acked from each spout before it stops sending messages." How does a message in the spout get acked? In my case, the topology looks like this:

ActiveMQ -> JMS_QUEUE_SPOUT -> MESSAGEFILTER_BOLT -> AGGREGATOR_BOLT -> OFFER_GENERATOR_BOLT

There is no cycle there. My understanding is that a message in the spout gets acked only when:

1) JMS_QUEUE_SPOUT gets the message from ActiveMQ and emits it to MESSAGEFILTER_BOLT (taking time t1);
2) MESSAGEFILTER_BOLT receives it, acks it, and emits it to AGGREGATOR_BOLT (time t2);
3) AGGREGATOR_BOLT receives it and acks it (time t3);
4) OFFER_GENERATOR_BOLT receives it and acks it (time t4);
5) finally, JMS_QUEUE_SPOUT acks the message (time t5).

Only if t1 + t2 + t3 + t4 + t5 <= the message timeout (secs) is the message counted as successful. Is my understanding correct or not?
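(For reference, steps 2)-4) correspond to the anchor-and-ack pattern in bolt code. The sketch below is a made-up example, not the actual MESSAGEFILTER_BOLT; the class name and filter logic are placeholders for illustration only.)

import java.util.Map;

import backtype.storm.task.OutputCollector;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.base.BaseRichBolt;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Tuple;
import backtype.storm.tuple.Values;

// Hypothetical filter bolt showing the anchor-and-ack pattern from steps 2)-4).
public class FilterBoltSketch extends BaseRichBolt {
    private OutputCollector collector;

    @Override
    public void prepare(Map conf, TopologyContext context, OutputCollector collector) {
        this.collector = collector;
    }

    @Override
    public void execute(Tuple input) {
        String message = input.getString(0);
        if (isInteresting(message)) {
            // Anchored emit: the new tuple is tied to the input tuple, so the
            // spout's tuple tree stays incomplete until downstream bolts ack too.
            collector.emit(input, new Values(message));
        }
        // This bolt's ack; every bolt in the chain must ack its input before
        // the spout finally sees the message as acked (step 5).
        collector.ack(input);
    }

    // Placeholder filter logic for the sketch.
    private boolean isInteresting(String message) {
        return message != null && !message.isEmpty();
    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        declarer.declare(new Fields("message"));
    }
}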
2. "The complete latency seems kind of high, but maybe setting max spout pending to a lower value would help reduce it." Why would setting max spout pending to a lower value reduce the complete latency? I was thinking that increasing the value would reduce the latency. In our code, each spout batch-reads 5000 messages at a time, so the 2 spouts read 10000 messages at a time. Assume I set max spout pending to 1000: once 2000 messages are un-acked, the spouts stop sending, and the remaining 8000 messages just sit idle waiting to be sent. But if max spout pending is 10000, the spouts keep sending. Wouldn't a max pending of 10000 be more efficient?

Or is my understanding wrong, and should I think of it this way: 2000 messages are already un-acked, yet we keep sending more. To give an analogy, the traffic is already jammed, but we keep the ramp meter off and let all the cars flow onto the crowded highway. The better approach is to turn the meter on, make each car wait a little, and let them enter the highway one by one.

This got a little long; thanks a lot for your patience in reading it.

From: Nathan Leung <[email protected]>
Reply-To: "[email protected]" <[email protected]>
Date: Wednesday, July 30, 2014 at 9:44 AM
To: user <[email protected]>
Subject: Re: why complete latency and failure rate is so high of my spout.

topology.max.spout.pending configures how many messages can be un-acked from each spout before it stops sending messages. So in your example, each spout task can have 10 thousand messages waiting to be acked before it throttles itself and stops emitting. Of course, if some of those messages get acked, it will be able to emit more messages.

This matters because you do not want too much data pending. If you have a lot of data pending, it increases the time it takes to process each message, because the complete latency is counted from the moment you emit the tuple, even if it is just sitting in the spout's output queue. If a message times out without getting acked at the spout, it is counted as a failure, which is why you were seeing so many failures. Changing the timeout to 90 s probably also played a big role in reducing your failure count. The complete latency still seems kind of high, but maybe setting max spout pending to a lower value would help reduce it.
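(Illustration: the ack and failure described above surface at the spout through its ack()/fail() callbacks. The skeleton below is a made-up sketch, not the actual JMS spout; the in-memory queue, pending map, and replay-on-fail behavior are assumptions for the example.)

import java.util.Map;
import java.util.Queue;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentLinkedQueue;

import backtype.storm.spout.SpoutOutputCollector;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.base.BaseRichSpout;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Values;

// Hypothetical spout skeleton (not the real JMS spout) showing where the
// final ack or failure of a tuple tree is reported back to the spout.
public class QueueSpoutSketch extends BaseRichSpout {
    private SpoutOutputCollector collector;
    private final Queue<String> incoming = new ConcurrentLinkedQueue<String>();
    private final Map<Object, String> pending = new ConcurrentHashMap<Object, String>();

    @Override
    public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) {
        this.collector = collector;
    }

    @Override
    public void nextTuple() {
        String message = incoming.poll();
        if (message == null) {
            return; // nothing to emit right now
        }
        Object msgId = UUID.randomUUID().toString();
        pending.put(msgId, message);
        // Emitting with a message id makes the tuple tracked; complete latency
        // starts counting here, even while the tuple waits in output queues.
        collector.emit(new Values(message), msgId);
    }

    @Override
    public void ack(Object msgId) {
        // Called only after every bolt in the chain has acked: success.
        pending.remove(msgId);
    }

    @Override
    public void fail(Object msgId) {
        // Called when the tuple tree times out (topology.message.timeout.secs)
        // or a bolt fails it; this is what shows up in the Failed column.
        String message = pending.remove(msgId);
        if (message != null) {
            incoming.offer(message); // simple replay strategy (an assumption)
        }
    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        declarer.declare(new Fields("message"));
    }
}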
On Wed, Jul 30, 2014 at 12:38 PM, Wei, Xin <[email protected]> wrote:

Hi there,

Yesterday I changed some of the Storm settings, and the spout failure rate has now dropped to 0, as shown below:

Topology stats
Window | Emitted | Transferred | Complete latency (ms) | Acked | Failed
10m 0s | 8766 | 8766 | 43077.391 | 5290 | 0
3h 0m 0s | 8766 | 8766 | 43077.391 | 5290 | 0
1d 0h 0m 0s | 8766 | 8766 | 43077.391 | 5290 | 0
All time | 8766 | 8766 | 43077.391 | 5290 | 0

Spouts (All time)
Id | Executors | Tasks | Emitted | Transferred | Complete latency (ms) | Acked | Failed | Last error
JMS_QUEUE_SPOUT | 2 | 2 | 5290 | 5290 | 43077.391 | 5290 | 0 |

Bolts (All time)
Id | Executors | Tasks | Emitted | Transferred | Capacity (last 10m) | Execute latency (ms) | Executed | Process latency (ms) | Acked | Failed | Last error
AGGREGATOR_BOLT | 8 | 8 | 1738 | 1738 | 0.080 | 83.264 | 1738 | 81.243 | 1738 | 0 |
MESSAGEFILTER_BOLT | 8 | 8 | 1738 | 1738 | 0.091 | 29.833 | 5290 | 24.918 | 5290 | 0 |
OFFER_GENERATOR_BOLT | 8 | 8 | 0 | 0 | 0.031 | 25.993 | 1738 | 24.296 | 1738 | 0 |

The topology configuration is listed below:

Topology Configuration
Key Value
dev.zookeeper.path /tmp/dev-storm-zookeeper
drpc.childopts -Xmx768m
drpc.invocations.port 3773
drpc.port 3772
drpc.queue.size 128
drpc.request.timeout.secs 600
drpc.worker.threads 64
java.library.path /usr/local/lib
logviewer.appender.name A1
logviewer.childopts -Xmx128m
logviewer.port 8000
nimbus.childopts -Xmx1024m -Djava.net.preferIPv4Stack=true
nimbus.cleanup.inbox.freq.secs 600
nimbus.file.copy.expiration.secs 600
nimbus.host zookeeper
nimbus.inbox.jar.expiration.secs 3600
nimbus.monitor.freq.secs 10
nimbus.reassign true
nimbus.supervisor.timeout.secs 60
nimbus.task.launch.secs 120
nimbus.task.timeout.secs 30
nimbus.thrift.port 6627
nimbus.topology.validator backtype.storm.nimbus.DefaultTopologyValidator
storm.cluster.mode distributed
storm.id nearline-3-1406737061
storm.local.dir /app_local/storm
storm.local.mode.zmq false
storm.messaging.netty.buffer_size 5242880
storm.messaging.netty.client_worker_threads 1
storm.messaging.netty.max_retries 30
storm.messaging.netty.max_wait_ms 1000
storm.messaging.netty.min_wait_ms 100
storm.messaging.netty.server_worker_threads 1
storm.messaging.transport backtype.storm.messaging.zmq
storm.thrift.transport backtype.storm.security.auth.SimpleTransportPlugin
storm.zookeeper.connection.timeout 15000
storm.zookeeper.port 2181
storm.zookeeper.retry.interval 1000
storm.zookeeper.retry.intervalceiling.millis 30000
storm.zookeeper.retry.times 5
storm.zookeeper.root /storm
storm.zookeeper.servers ["zookeeper"]
storm.zookeeper.session.timeout 20000
supervisor.childopts -Xmx256m -Djava.net.preferIPv4Stack=true
supervisor.enable true
supervisor.heartbeat.frequency.secs 5
supervisor.monitor.frequency.secs 3
supervisor.slots.ports [6700 6701 6702 6703]
supervisor.worker.start.timeout.secs 120
supervisor.worker.timeout.secs 30
task.heartbeat.frequency.secs 3
task.refresh.poll.secs 10
topology.acker.executors 4
topology.builtin.metrics.bucket.size.secs 60
topology.debug false
topology.disruptor.wait.strategy com.lmax.disruptor.BlockingWaitStrategy
topology.enable.message.timeouts true
topology.error.throttle.interval.secs 10
topology.executor.receive.buffer.size 16384
topology.executor.send.buffer.size 16384
topology.fall.back.on.java.serialization true
topology.kryo.decorators []
topology.kryo.factory backtype.storm.serialization.DefaultKryoFactory
topology.kryo.register
topology.max.error.report.per.interval 5
topology.max.spout.pending 10000
topology.max.task.parallelism
topology.message.timeout.secs 90
topology.name nearline
topology.optimize true
topology.receiver.buffer.size 8
topology.skip.missing.kryo.registrations false
topology.sleep.spout.wait.strategy.time.ms 1
topology.spout.wait.strategy backtype.storm.spout.SleepSpoutWaitStrategy
topology.state.synchronization.timeout.secs 60
topology.stats.sample.rate 1
topology.tasks
topology.tick.tuple.freq.secs
topology.transfer.buffer.size 32
topology.trident.batch.emit.interval.millis 500
topology.tuple.serializer backtype.storm.serialization.types.ListDelegateSerializer
topology.worker.childopts
topology.worker.shared.thread.pool.size 4
topology.workers 4
transactional.zookeeper.port
transactional.zookeeper.root /transactional
transactional.zookeeper.servers
ui.childopts -Xmx768m
ui.port 8080
worker.childopts -Xmx768m -Djava.net.preferIPv4Stack=false -DNEARLINE_DATA_ENV=dev -DNEARLINE_APP_ENV=dev -DNEARLINE_QUEUES_ENV=dev -Dauthfilter.appcred.default.encrypt.file=/home/xwei/FP_AppCred_Encrypt.txt -Dauthfilter.appcred.default.passphrase.file=/home/xwei/FP_AppCred_Passphrase.txt
worker.heartbeat.frequency.secs 1
zmq.hwm 0
zmq.linger.millis 5000
zmq.threads 1

The settings I changed:

1. topology.acker.executors: adjusted to 4.
2. topology.max.spout.pending: changed to 10000.
3. topology.message.timeout.secs: changed from 30 to 90 seconds.

I think No. 2, topology.max.spout.pending, is the critical factor that made the big difference. Can anybody tell me what that setting does? Thanks a lot for the help.
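(For reference, a minimal sketch of how the three settings above are typically set in topology-submission code with the 0.9.x Java API; the class name, topology wiring, and topology name here are placeholders, not the actual nearline submission code.)

import backtype.storm.Config;
import backtype.storm.StormSubmitter;
import backtype.storm.generated.StormTopology;
import backtype.storm.topology.TopologyBuilder;

// Hedged sketch: how the three changed settings map to Config calls.
public class SubmitNearlineSketch {
    public static void main(String[] args) throws Exception {
        TopologyBuilder builder = new TopologyBuilder();
        // ... setSpout/setBolt wiring for the real topology would go here ...
        StormTopology topology = builder.createTopology();

        Config conf = new Config();
        conf.setNumWorkers(4);
        // 1. topology.acker.executors
        conf.setNumAckers(4);
        // 2. topology.max.spout.pending (applies per spout task; the usual advice
        //    is to start low, e.g. 1024, and raise it only while throughput improves)
        conf.setMaxSpoutPending(10000);
        // 3. topology.message.timeout.secs (was 30, raised to 90)
        conf.setMessageTimeoutSecs(90);

        StormSubmitter.submitTopology("nearline", conf, topology);
    }
}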
