It looks like each time the emitDirect/consumeBatch methods are involved
(to send an ack back to mastercoord), I can expect a wait of a few milliseconds:
2015-07-02T16:10:00.128+0200 backtype.storm.daemon.task Thread-9-b-1 [INFO] Emitting: b-1 __ack_ack [4382299576881565710 1329423615189887198]
Caller+0 at clojure.tools.logging$eval1$fn__7.invoke(NO_SOURCE_FILE:0)
Caller+1 at clojure.tools.logging.impl$fn__15$G__8__26.invoke(impl.clj:16)
Caller+2 at clojure.tools.logging$log_STAR_.invoke(logging.clj:59)
Caller+3 at backtype.storm.daemon.task$mk_tasks_fn$fn__3639.invoke(task.clj:152)
Caller+4 at backtype.storm.daemon.task$send_unanchored.invoke(task.clj:111)
Caller+5 at backtype.storm.daemon.task$send_unanchored.invoke(task.clj:117)
Caller+6 at backtype.storm.daemon.executor$fn__4722$fn$reify__4767.ack(executor.clj:707)
Caller+7 at backtype.storm.task.OutputCollector.ack(OutputCollector.java:213)

2015-07-02T16:10:00.128+0200 backtype.storm.daemon.executor Thread-5-__acker [INFO] Processing received message source: b-1:5, stream: __ack_ack, id: {}, [4382299576881565710 1329423615189887198]
Caller+0 at clojure.tools.logging$eval1$fn__7.invoke(NO_SOURCE_FILE:0)
Caller+1 at clojure.tools.logging.impl$fn__15$G__8__26.invoke(impl.clj:16)
Caller+2 at clojure.tools.logging$log_STAR_.invoke(logging.clj:59)
Caller+3 at backtype.storm.daemon.executor$mk_task_receiver$fn__4645.invoke(executor.clj:399)
Caller+4 at backtype.storm.disruptor$clojure_handler$reify__1446.onEvent(disruptor.clj:58)
Caller+5 at backtype.storm.utils.DisruptorQueue.consumeBatchToCursor(DisruptorQueue.java:125)
Caller+6 at backtype.storm.utils.DisruptorQueue.consumeBatchWhenAvailable(DisruptorQueue.java:99)
Caller+7 at backtype.storm.disruptor$consume_batch_when_available.invoke(disruptor.clj:80)

2015-07-02T16:10:00.128+0200 backtype.storm.daemon.task Thread-5-__acker [INFO] Emitting direct: 1; __acker __ack_ack [4382299576881565710]
Caller+0 at clojure.tools.logging$eval1$fn__7.invoke(NO_SOURCE_FILE:0)
Caller+1 at clojure.tools.logging.impl$fn__15$G__8__26.invoke(impl.clj:16)
Caller+2 at clojure.tools.logging$log_STAR_.invoke(logging.clj:59)
Caller+3 at backtype.storm.daemon.task$mk_tasks_fn$fn__3639.invoke(task.clj:134)
Caller+4 at backtype.storm.daemon.executor$fn__4722$fn__4734$bolt_emit__4761.invoke(executor.clj:662)
Caller+5 at backtype.storm.daemon.executor$fn__4722$fn$reify__4767.emitDirect(executor.clj:700)
Caller+6 at backtype.storm.task.OutputCollector.emitDirect(OutputCollector.java:208)
Caller+7 at backtype.storm.task.OutputCollector.emitDirect(OutputCollector.java:135)

2015-07-02T16:10:00.133+0200 backtype.storm.daemon.executor Thread-15-$mastercoord-bg0 [INFO] Processing received message source: __acker:3, stream: __ack_ack, id: {}, [4382299576881565710]
Caller+0 at clojure.tools.logging$eval1$fn__7.invoke(NO_SOURCE_FILE:0)
Caller+1 at clojure.tools.logging.impl$fn__15$G__8__26.invoke(impl.clj:16)
Caller+2 at clojure.tools.logging$log_STAR_.invoke(logging.clj:59)
Caller+3 at backtype.storm.daemon.executor$mk_task_receiver$fn__4645.invoke(executor.clj:399)
Caller+4 at backtype.storm.disruptor$clojure_handler$reify__1446.onEvent(disruptor.clj:58)
Caller+5 at backtype.storm.utils.DisruptorQueue.consumeBatchToCursor(DisruptorQueue.java:125)
Caller+6 at backtype.storm.utils.DisruptorQueue.consumeBatch(DisruptorQueue.java:87)
Caller+7 at backtype.storm.disruptor$consume_batch.invoke(disruptor.clj:76)
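
Here the acker's "Emitting direct" is logged at 16:10:00.128 and mastercoord's
"Processing received message" at 16:10:00.133, i.e. about 5 ms apart. For what
it is worth, this is roughly how I compute these deltas from the worker logs
(just a sketch; the timestamp format is the one I see in my logs, everything
else is my own scaffolding):

import java.time.Duration;
import java.time.OffsetDateTime;
import java.time.format.DateTimeFormatter;

public class AckGap {
    // timestamp layout used by my worker logs, e.g. 2015-07-02T16:10:00.128+0200
    private static final DateTimeFormatter TS =
            DateTimeFormatter.ofPattern("yyyy-MM-dd'T'HH:mm:ss.SSSZ");

    // gap between an "Emitting direct ... __ack_ack" line and the matching
    // "Processing received message ... __ack_ack" line on $mastercoord-bg0
    static long gapMillis(String emittedAt, String processedAt) {
        return Duration.between(OffsetDateTime.parse(emittedAt, TS),
                                OffsetDateTime.parse(processedAt, TS)).toMillis();
    }

    public static void main(String[] args) {
        // values taken from the trace above -> prints "5 ms"
        System.out.println(gapMillis("2015-07-02T16:10:00.128+0200",
                                     "2015-07-02T16:10:00.133+0200") + " ms");
    }
}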
Is this something anyone has observed on the Linux platform? I am on Solaris.
On Thu, Jul 2, 2015 at 12:04 PM, Olivier Mallassi <
[email protected]> wrote:
> Hi all
>
> I investigated in more detail (1 event per second, emitTimeInterval of 1 s)
> and it appears that most of my 9 ms is spent in the exchange with __acker
> (stream __ack_ack).
>
> Looking at the traces below, if I understood them correctly, I spend 7 to 8
> ms between the "Emitting direct: 1; __acker __ack_ack" line and the
> corresponding "Processing received message" line.
>
> 2015-07-02T08:57:49.311+0200 backtype.storm.daemon.task 0 [INFO] Emitting:
> b-0 __ack_ack [671062448715243437 2548740402887507265]
> 2015-07-02T08:57:49.311+0200 backtype.storm.daemon.executor 0 [INFO]
> Processing received message source: b-0:4, stream: __ack_ack, id: {},
> [671062448715243437 2548740402887507265]
> 2015-07-02T08:57:49.311+0200 backtype.storm.daemon.task 0 [INFO] Emitting
> direct: 1; __acker __ack_ack [671062448715243437]
> ******************************************
> 2015-07-02T08:57:49.318+0200 backtype.storm.daemon.executor 0 [INFO]
> Processing received message source: __acker:3, stream: __ack_ack, id: {},
> [671062448715243437]
> 2015-07-02T08:57:49.318+0200 backtype.storm.daemon.executor 0 [INFO]
> Acking message 175873:0
>
> 2015-07-02T08:57:49.318+0200 backtype.storm.daemon.task 0 [INFO] Emitting:
> $mastercoord-bg0 $commit [175873:0]
> 2015-07-02T08:57:49.319+0200 backtype.storm.daemon.task 0 [INFO] Emitting:
> $mastercoord-bg0 __ack_init [-4460222877314276996 -348189169909316069 1]
> 2015-07-02T08:57:49.319+0200 backtype.storm.daemon.executor 0 [INFO]
> Processing received message source: $mastercoord-bg0:1, stream: $commit,
> id: {-4460222877314276996=-348189169909316069}, [175873:0]
> 2015-07-02T08:57:49.319+0200 backtype.storm.daemon.executor 0 [INFO]
> Processing received message source: $mastercoord-bg0:1, stream: __ack_init,
> id: {}, [-4460222877314276996 -348189169909316069 1]
> [------------------------------------------ the Aggregator is called ]
> 2015-07-02T08:57:49.319+0200 backtype.storm.daemon.task 0 [INFO] Emitting:
> b-0 __ack_ack [-4460222877314276996 -348189169909316069]
> 2015-07-02T08:57:49.319+0200 backtype.storm.daemon.executor 0 [INFO]
> Processing received message source: b-0:4, stream: __ack_ack, id: {},
> [-4460222877314276996 -348189169909316069]
> 2015-07-02T08:57:49.319+0200 backtype.storm.daemon.task 0 [INFO] Emitting
> direct: 1; __acker __ack_ack [-4460222877314276996]
> ******************************************
> 2015-07-02T08:57:49.329+0200 backtype.storm.daemon.executor 0 [INFO]
> Processing received message source: __acker:3, stream: __ack_ack, id: {},
> [-4460222877314276996]
> 2015-07-02T08:57:49.329+0200 backtype.storm.daemon.executor 0 [INFO]
> Acking message 175873:0
>
>
> This delay is systematic for all tuples (and is not related to GC pauses).
> Once again, when running the same topology (same code and config) in local
> mode, I do not observe this kind of wait.
>
> Does anyone have an idea of what is going on? Is there a particular code
> section that I could look at?
>
> thx.
>
> olivier.
>
>
>
> On Tue, Jun 30, 2015 at 5:37 PM, Olivier Mallassi <
> [email protected]> wrote:
>
>> Hi all
>>
>> Here is the code of my topology
>>
>> topology.newStream(this.getTopologyName(), spout)
>>         .each(new Fields("event"), new EventTypeFilter(MyEvent.class))
>>         .each(new Fields("event"), flattern, new Fields("a", "b", "c"))
>>         .parallelismHint(1)
>>         .groupBy(new Fields("a", "b", "c"))
>>         .persistentAggregate(new MemoryMapState.Factory(),
>>                              new Fields("event"),
>>                              new EventAggregator(),
>>                              new Fields("aggr"))
>>         .parallelismHint(1);
>>
>> I ran the topology with no parallelism and a throughput of 20 events per
>> second, to avoid any kind of contention.
>>
>>
>> By adding some traces, it looks like I spend ~8 ms between the end of the
>> "flattern" bolt and the getAll() call on the MemoryMapState.
>>
>> This looks weird, but is it the expected behavior? Do you have any idea of
>> what can cause this delay (which is as regular as a metronome)?
>>
>> I am running storm 0.9.4.
>>
>> Regards.
>>
>> olivier.
>>
>>
>>
>>
>> On Mon, Jun 29, 2015 at 6:41 PM, Olivier Mallassi <
>> [email protected]> wrote:
>>
>>> Hello all
>>>
>>> I am facing an issue or at least something I cannot figure out regarding
>>> the end-to-end latency of my topology.
>>> So I guess you will be able to help me.
>>>
>>> *Topology*
>>> I am running a Trident topology:
>>> - an IBatchSpout emits events of different types (only 1 spout)
>>> - a filter checks the type ("instance of") of the events; in most cases
>>> this filter returns true (b-1)
>>> - a bolt (b-0) flattens the event
>>> - a groupBy on 3 fields of the event is done
>>> - the events are aggregated through persistentAggregate, with a custom
>>> reducer and a simple MemoryMapState object
>>>
>>> Due to my current data set, the events are grouped into 19 different
>>> aggregates.
>>>
>>> In these cases, I run (see the sketch below):
>>> - a single worker with a single spout and multiple executors (10)
>>> - emit interval millis = 1
>>> - TOPOLOGY_EXECUTOR_RECEIVE_BUFFER_SIZE / TOPOLOGY_EXECUTOR_SEND_BUFFER_SIZE
>>> set to a large value (16384)
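>>>
>>> Concretely, the relevant part of the config looks roughly like this (a
>>> sketch from memory; in particular I am assuming that "emit interval millis"
>>> corresponds to topology.trident.batch.emit.interval.millis):
>>>
>>> import backtype.storm.Config;
>>>
>>> public class LatencyTestConfig {
>>>     public static Config build() {
>>>         Config conf = new Config();
>>>         conf.setNumWorkers(1);  // single worker, single spout
>>>         // assumption: this is the knob behind "emit interval millis = 1"
>>>         conf.put(Config.TOPOLOGY_TRIDENT_BATCH_EMIT_INTERVAL_MILLIS, 1);
>>>         // large executor queues, to rule out back-pressure on the buffers
>>>         conf.put(Config.TOPOLOGY_EXECUTOR_RECEIVE_BUFFER_SIZE, 16384);
>>>         conf.put(Config.TOPOLOGY_EXECUTOR_SEND_BUFFER_SIZE, 16384);
>>>         return conf;
>>>     }
>>> }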
>>>
>>> I measure my end-to-end latency by adding a timestamp to the event just
>>> before the _collector.emit(), and then measuring the delta in the reducer.
>>> So this latency is the time an event needs to reach the aggregation (see
>>> the sketch below).
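>>>
>>> The measurement itself is roughly this (a simplified sketch: the real
>>> EventAggregator does the business aggregation as well, and MyEvent with its
>>> emitMillis field is a stand-in for my actual event class):
>>>
>>> import storm.trident.operation.ReducerAggregator;
>>> import storm.trident.tuple.TridentTuple;
>>>
>>> public class EventAggregator implements ReducerAggregator<Long> {
>>>
>>>     // minimal stand-in for my real event class
>>>     public static class MyEvent implements java.io.Serializable {
>>>         private final long emitMillis;  // set just before _collector.emit()
>>>         public MyEvent(long emitMillis) { this.emitMillis = emitMillis; }
>>>         public long getEmitMillis() { return emitMillis; }
>>>     }
>>>
>>>     @Override
>>>     public Long init() {
>>>         return 0L;  // running count, just to keep the sketch minimal
>>>     }
>>>
>>>     @Override
>>>     public Long reduce(Long curr, TridentTuple tuple) {
>>>         MyEvent event = (MyEvent) tuple.getValueByField("event");
>>>         // delta between "now" and the timestamp put in the event by the
>>>         // spout: this is the end-to-end latency quoted in this thread
>>>         long latencyMs = System.currentTimeMillis() - event.getEmitMillis();
>>>         System.out.println("end-to-end latency: " + latencyMs + " ms");
>>>         return curr + 1;
>>>     }
>>> }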
>>>
>>> Here are the numbers I got
>>>
>>> case #1 - events throughput: 40000
>>>
>>>                        capacity   execute latency (ms)   process latency (ms)   complete latency (ms)
>>> mastercoord-bg0                                                                  10.031
>>> $spoutcoord-spout0     0.022      0.243                   0.181
>>> __acker                0.004      0.004                   0.002
>>> b-0                    0.045      0.006                   0.002
>>> b-1                    0.159      0.004                   0.003
>>> spout0                 0.150      1.673                   1.919
>>>
>>> measured end-to-end latency (99th percentile): 9.5 ms
>>>
>>> case #2 - events throughput: 250
>>>
>>>                        capacity   execute latency (ms)   process latency (ms)   complete latency (ms)
>>> mastercoord-bg0                                                                  10.026
>>> $spoutcoord-spout0     0.024      0.255                   0.190
>>> __acker                0.006      0.005                   0.002
>>> b-0                    0.006      0.020                   0.024
>>> b-1                    0.004      0.018                   0.016
>>> spout0                 0.017      0.185                   0.135
>>>
>>> measured end-to-end latency (99th percentile): 9.5 ms
>>>
>>>
>>> The good news is that the end-to-end latency stays quite stable even
>>> though the throughput of generated events has significantly increased.
>>> The 'not so good' news is that I would like my latency to be under 1 ms
>>> (again, with a single worker and at least for low throughput). Do you have
>>> any ideas about how to configure the topology? Maybe avoiding Trident and
>>> using the Storm API directly would help?
>>>
>>> Do you know how I can better understand where I spend my time in this
>>> topology?
>>>
>>> Note that I have tried to increase max.spout.pending (see the one-liner
>>> below), but the end-to-end latency is worse (as if everything were, in the
>>> end, single-threaded).
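>>>
>>> That experiment was just this change on top of the config above (the value
>>> here is only an example of the kind of increase I tried):
>>>
>>> conf.setMaxSpoutPending(100);  // example value; "conf" is the Config above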
>>>
>>> For sure, there is something I do not understand. Many thanks for your help.
>>>
>>> Olivier
>>>
>>> PS: when I run the second case in local mode (on my dev laptop), I observe
>>> an end-to-end latency of around 1 to 2 ms. The only difference between the
>>> two runs is how the topology is submitted (see the sketch below).
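>>>
>>> Roughly (a sketch: buildTopology() stands for my actual Trident topology
>>> construction, and "my-topology" is a placeholder name):
>>>
>>> import backtype.storm.Config;
>>> import backtype.storm.LocalCluster;
>>> import backtype.storm.StormSubmitter;
>>> import storm.trident.TridentTopology;
>>>
>>> public class SubmitIt {
>>>     // placeholder for my actual topology construction (omitted here)
>>>     static TridentTopology buildTopology() { return new TridentTopology(); }
>>>
>>>     public static void main(String[] args) throws Exception {
>>>         TridentTopology topology = buildTopology();
>>>         Config conf = new Config();
>>>         conf.setNumWorkers(1);
>>>
>>>         if (args.length > 0 && "local".equals(args[0])) {
>>>             // local mode (dev laptop): ~1-2 ms end-to-end
>>>             new LocalCluster().submitTopology("my-topology", conf, topology.build());
>>>         } else {
>>>             // remote mode, still a single worker: ~9-10 ms end-to-end
>>>             StormSubmitter.submitTopology("my-topology", conf, topology.build());
>>>         }
>>>     }
>>> }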
>>>
>>>
>>>
>>
>