Hi Nathan,

Thanks a lot for your patient explanation.  It is clear to me now.

Best!

From: Nathan Leung <[email protected]>
Reply-To: "[email protected]" <[email protected]>
Date: Wednesday, July 30, 2014 at 12:09 PM
To: user <[email protected]>
Subject: Re: why complete latency and failure rate is so high of my spout.

Your ack sequence is right, but the complete latency that gets compared against 
the timeout is not just the simple sum of these times.  Consider a simple 
topology similar to yours:

spout s1 -> bolt b1, with 1 s1 task and 1 b1 task

s1 emits very quickly (quickly enough that the time taken doesn't matter for 
the example)
b1 takes 100 ms to process

If max spout pending is 1000, then the spout can have up to 1000 un-acked 
messages in flight at any time.  If we emit 1000 messages right away, the spout 
task reaches max spout pending, and the 1000th message has to wait for all 999 
messages in front of it to be processed before it gets executed by b1.  
Therefore its complete latency will be nearly 100s (999 messages * 100ms / 
message in b1).  If I keep emitting at the spout, then all subsequent messages 
will also have nearly 100s complete latency.  You can see in this example that 
if you lower the max spout pending to 100, then message 100 would have an 
expected complete latency of about 10s, and all subsequent messages would be 
the same.
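
As a rough sketch of the arithmetic above, here is a toy model of the s1 -> b1 
example in Python.  It is not Storm code: it assumes emitting takes no time, 
that b1 handles one tuple at a time in order, and that a tuple is acked the 
instant b1 finishes it.  The function name and parameters are made up for 
illustration.

```python
def complete_latencies_ms(max_pending, service_ms, total_messages):
    """Toy model of the s1 -> b1 example: one spout task, one bolt task.

    Assumptions (not Storm internals): emitting takes no time, b1 handles one
    tuple at a time in order, and a tuple is acked the instant b1 finishes it.
    """
    finish = []      # finish[i] = time (ms) at which b1 finishes tuple i
    latencies = []   # latencies[i] = finish[i] - emit time of tuple i
    for i in range(total_messages):
        # The spout may emit tuple i only while fewer than max_pending tuples
        # are un-acked, i.e. once tuple (i - max_pending) has been acked.
        emit = 0 if i < max_pending else finish[i - max_pending]
        # b1 starts tuple i once it has been emitted and the previous one is done.
        start = max(emit, finish[i - 1]) if i > 0 else emit
        finish.append(start + service_ms)
        latencies.append(finish[i] - emit)
    return latencies


if __name__ == "__main__":
    for pending in (1000, 100):
        last = complete_latencies_ms(pending, service_ms=100, total_messages=5000)[-1]
        print(f"max pending {pending}: steady-state complete latency {last / 1000} s")
```

In this model the steady-state complete latency works out to max_pending * 
service_ms, which is where the roughly 100s and 10s figures come from.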

You do have to be careful because if topology.max.spout.pending is too low you 
can cause your topology to be starved for data.  I think the general 
recommendation is to set it to some initial value like 1024 and keep increasing 
it until you stop gaining throughput.  For your topology where the bolts seem 
to have high processing times you could probably start with a lower value than 
1024.


On Wed, Jul 30, 2014 at 1:34 PM, Wei, Xin <[email protected]> wrote:
Hi Nathan,

Thanks a lot for your explanation.  There are still some things I'm not very 
clear about:
1.  "The topology.max.spout.pending configures how many messages can be 
un-acked from each spout before it stops sending messages."  How does a message 
from the spout get acked?
In my case, the topology is like this:
ActiveMQ -> JMS_SPOUT -> MESSAGEFILTER_BOLT -> AGGREGATOR_BOLT -> 
OFFER_GENERATOR_BOLT

There is no cycle there.  According to my understanding, a message from the 
spout gets acked only after all of the following have happened:
1). jms_spout gets the message from ActiveMQ and emits it to message_filter 
(assume this takes time t1)
2). MessageFilter_bolt receives it, acks it, then emits to the aggregator 
(assume time t2)
3). Aggregator receives it and acks it (assume time t3)
4). Offergenerator bolt receives it and acks it (assume time t4)
5). Finally the jms_spout acks this message (assume time t5)
Only if
t1+t2+t3+t4+t5 <= message timeout secs

can this message be counted as successful.  Is my understanding correct or 
not?

2. "The complete latency seems kind of high, but maybe setting max spout pending 
to a lower value would help reduce it."

Why would setting max spout pending to a lower value reduce the complete 
latency?  I would have thought that increasing the value would reduce the 
latency.
In our code, I batch read 5000 messages at a time, so 2 spouts read 10000 
messages at a time.  Assume I set max spout pending to 1000: if there are 2000 
un-acked messages, the spouts stop sending, and the remaining 8000 messages 
just sit idle waiting to be sent.  But if max spout pending is 10000, the 
spouts keep sending.  Wouldn't a max pending of 10000 be more efficient?

Or is my understanding wrong, and I should think of it this way: 2000 messages 
are already un-acked, but you still keep sending messages.  By analogy, the 
traffic is already jammed, but you keep the meter off and let all the cars 
flow onto the crowded highway.  The better way is to turn on the meter, let 
the cars wait a little bit, and have them enter the highway one by one.

This got a little long; thanks a lot for your patience in reading it.

From: Nathan Leung <[email protected]>
Reply-To: "[email protected]" <[email protected]>
Date: Wednesday, July 30, 2014 at 9:44 AM
To: user <[email protected]>
Subject: Re: why complete latency and failure rate is so high of my spout.

The topology.max.spout.pending configures how many messages can be un-acked 
from each spout before it stops sending messages.  So in your example, each 
spout task can have 10 thousand messages waiting to be acked before it 
throttles itself and stops emitting.  Of course if some of those messages are 
acked, then it will be able to emit more messages.  This is important because 
you do not want to have too much data pending.  If you have a lot of data 
pending, then it will increase the amount of time that it takes to process the 
message, because the complete latency is counted starting when you emit the 
tuple even if it's just waiting in the spout's output queue.  If the message 
times out without getting acked at the spout, then it will get counted as a 
failure, which is why you were seeing so many failures.  Changing the timeout 
to 90s probably also played a big role in reducing your failure count.
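
To make the timeout point concrete, here is a small Python check using the 
complete latency reported in this thread (43077.391 ms) against the old and new 
timeouts (30s and 90s).  The function name is made up for illustration, and the 
UI figure is an average, so individual tuples will vary; treat this as a rough 
illustration only.

```python
def counted_as_failure(complete_latency_ms, message_timeout_secs):
    """True if a tuple needing this long would exceed the message timeout."""
    return complete_latency_ms > message_timeout_secs * 1000


if __name__ == "__main__":
    # "Complete latency (ms)" reported for the spout in this thread (an average).
    observed_ms = 43077.391
    for timeout_secs in (30, 90):
        verdict = "failure" if counted_as_failure(observed_ms, timeout_secs) else "ok"
        print(f"timeout {timeout_secs}s: {verdict}")
```

A tuple that takes about 43s to be fully acked is over a 30s timeout but well 
under a 90s one.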

The complete latency seems kind of high, but maybe setting max spout pending to 
a lower value would help reduce it.


On Wed, Jul 30, 2014 at 12:38 PM, Wei, Xin <[email protected]> wrote:
Hi There,

Yesterday I changed some Storm configuration settings, and now the spout 
failure rate has dropped to 0, as shown below:

Topology stats
Window       Emitted  Transferred  Complete latency (ms)  Acked  Failed
10m 0s       8766     8766         43077.391              5290   0
3h 0m 0s     8766     8766         43077.391              5290   0
1d 0h 0m 0s  8766     8766         43077.391              5290   0
All time     8766     8766         43077.391              5290   0

Spouts (All time)
Id               Executors  Tasks  Emitted  Transferred  Complete latency (ms)  Acked  Failed  Last error
JMS_QUEUE_SPOUT  2          2      5290     5290         43077.391              5290   0

Bolts (All time)
Id                    Executors  Tasks  Emitted  Transferred  Capacity (last 10m)  Execute latency (ms)  Executed  Process latency (ms)  Acked  Failed  Last error
AGGREGATOR_BOLT       8          8      1738     1738         0.080                83.264                1738      81.243                1738   0
MESSAGEFILTER_BOLT    8          8      1738     1738         0.091                29.833                5290      24.918                5290   0
OFFER_GENERATOR_BOLT  8          8      0        0            0.031                25.993                1738      24.296                1738   0

The topology configuration is listed below:

Topology Configuration
Key     Value
dev.zookeeper.path      /tmp/dev-storm-zookeeper
drpc.childopts  -Xmx768m
drpc.invocations.port   3773
drpc.port       3772
drpc.queue.size 128
drpc.request.timeout.secs       600
drpc.worker.threads     64
java.library.path       /usr/local/lib
logviewer.appender.name A1
logviewer.childopts     -Xmx128m
logviewer.port  8000
nimbus.childopts        -Xmx1024m -Djava.net.preferIPv4Stack=true
nimbus.cleanup.inbox.freq.secs  600
nimbus.file.copy.expiration.secs        600
nimbus.host     zookeeper
nimbus.inbox.jar.expiration.secs        3600
nimbus.monitor.freq.secs        10
nimbus.reassign true
nimbus.supervisor.timeout.secs  60
nimbus.task.launch.secs 120
nimbus.task.timeout.secs        30
nimbus.thrift.port      6627
nimbus.topology.validator       backtype.storm.nimbus.DefaultTopologyValidator
storm.cluster.mode      distributed
storm.id        nearline-3-1406737061
storm.local.dir /app_local/storm
storm.local.mode.zmq    false
storm.messaging.netty.buffer_size       5242880
storm.messaging.netty.client_worker_threads     1
storm.messaging.netty.max_retries       30
storm.messaging.netty.max_wait_ms       1000
storm.messaging.netty.min_wait_ms       100
storm.messaging.netty.server_worker_threads     1
storm.messaging.transport       backtype.storm.messaging.zmq
storm.thrift.transport  backtype.storm.security.auth.SimpleTransportPlugin
storm.zookeeper.connection.timeout      15000
storm.zookeeper.port    2181
storm.zookeeper.retry.interval  1000
storm.zookeeper.retry.intervalceiling.millis    30000
storm.zookeeper.retry.times     5
storm.zookeeper.root    /storm
storm.zookeeper.servers ["zookeeper"]
storm.zookeeper.session.timeout 20000
supervisor.childopts    -Xmx256m -Djava.net.preferIPv4Stack=true
supervisor.enable       true
supervisor.heartbeat.frequency.secs     5
supervisor.monitor.frequency.secs       3
supervisor.slots.ports  [6700 6701 6702 6703]
supervisor.worker.start.timeout.secs    120
supervisor.worker.timeout.secs  30
task.heartbeat.frequency.secs   3
task.refresh.poll.secs  10
topology.acker.executors        4
topology.builtin.metrics.bucket.size.secs       60
topology.debug  false
topology.disruptor.wait.strategy        com.lmax.disruptor.BlockingWaitStrategy
topology.enable.message.timeouts        true
topology.error.throttle.interval.secs   10
topology.executor.receive.buffer.size   16384
topology.executor.send.buffer.size      16384
topology.fall.back.on.java.serialization        true
topology.kryo.decorators        []
topology.kryo.factory   backtype.storm.serialization.DefaultKryoFactory
topology.kryo.register
topology.max.error.report.per.interval  5
topology.max.spout.pending      10000
topology.max.task.parallelism
topology.message.timeout.secs   90
topology.name   nearline
topology.optimize       true
topology.receiver.buffer.size   8
topology.skip.missing.kryo.registrations        false
topology.sleep.spout.wait.strategy.time.ms      1
topology.spout.wait.strategy    backtype.storm.spout.SleepSpoutWaitStrategy
topology.state.synchronization.timeout.secs     60
topology.stats.sample.rate      1
topology.tasks
topology.tick.tuple.freq.secs
topology.transfer.buffer.size   32
topology.trident.batch.emit.interval.millis     500
topology.tuple.serializer       backtype.storm.serialization.types.ListDelegateSerializer
topology.worker.childopts
topology.worker.shared.thread.pool.size 4
topology.workers        4
transactional.zookeeper.port
transactional.zookeeper.root    /transactional
transactional.zookeeper.servers
ui.childopts    -Xmx768m
ui.port 8080
worker.childopts        -Xmx768m -Djava.net.preferIPv4Stack=false 
-DNEARLINE_DATA_ENV=dev -DNEARLINE_APP_ENV=dev -DNEARLINE_QUEUES_ENV=dev 
-Dauthfilter.appcred.default.encrypt.file=/home/xwei/FP_AppCred_Encrypt.txt 
-Dauthfilter.appcred.default.passphrase.file=/home/xwei/FP_AppCred_Passphrase.txt
worker.heartbeat.frequency.secs 1
zmq.hwm 0
zmq.linger.millis       5000
zmq.threads     1

The settings I changed:
1. topology.acker.executors: set to 4
2. topology.max.spout.pending: set to 10000
3. topology.message.timeout.secs: changed from 30 to 90 secs

I think setting no. 2, topology.max.spout.pending, is the critical factor that 
made the big difference.  Can anybody tell me what that setting does?


Thanks a lot for your help.




