Hi all,

To continue and go a little further: it looks like this is specific to
Solaris.

I have run the same topology, in local mode (LocalCluster):
- on Ubuntu 14.x and Solaris
- 1 event every 10 ms (emit_batch_interval)
- the same command line on both machines: java -server -Xmx2g -Xms2g
-XX:MaxNewSize=1g -XX:+UseParNewGC -XX:+UseConcMarkSweepGC .... topology
main class, custom params
- Java 1.8
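
For completeness, here is a minimal sketch of how such a topology is submitted
in local mode (class and topology names are placeholders, not my actual code;
the real main class and custom params are the ones elided in the command line
above):

import backtype.storm.Config;
import backtype.storm.LocalCluster;
import backtype.storm.generated.StormTopology;

public class LocalModeRunner {
    public static void main(String[] args) throws Exception {
        Config conf = new Config();
        conf.setNumWorkers(1);   // single worker, as in the tests described above

        // buildTopology() stands in for the Trident topology quoted later in this thread
        StormTopology topology = buildTopology();

        LocalCluster cluster = new LocalCluster();
        cluster.submitTopology("latency-test", conf, topology);
    }

    private static StormTopology buildTopology() {
        // placeholder for the newStream(...).persistentAggregate(...) code below
        throw new UnsupportedOperationException("elided");
    }
}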


The behavior on Linux is "as expected":
2015-07-03T13:43:*34.243*+0200 [Thread-11-__acker] INFO
 backtype.storm.daemon.task - Emitting direct: 1; __acker __ack_ack
[7224296586433968782]
2015-07-03T13:43:*34.243*+0200 [Thread-21-$mastercoord-bg0] INFO
 backtype.storm.daemon.executor - Processing received message source:
__acker:3, stream: __ack_ack, id: {}, [7224296586433968782]

but not on Solaris:
2015-07-03T14:28:*15.736*+0200 [Thread-11-__acker] INFO
 backtype.storm.daemon.task - Emitting direct: 1; __acker __ack_ack
[2253796205321724272]
2015-07-03T14:28:*15.744*+0200 [Thread-21-$mastercoord-bg0] INFO
 backtype.storm.daemon.executor - Processing received message source:
__acker:3, stream: __ack_ack, id: {}, [2253796205321724272]


Looking at the code and the traces, it looks like this could be related to
DisruptorQueue.consumeBatch, which internally uses a SequenceBarrier.

Am I completely wrong? Can someone help me confirm or refute this (and
whether it may be related to Disruptor 2.10 behavior)?
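
To illustrate the kind of effect I have in mind (this is not Storm's or the
Disruptor's actual code, just a self-contained sketch): if the consumer side of
a queue falls back to a timed park of ~10 ms instead of being woken as soon as
an event is published, you get exactly this pattern of a few-millisecond gap
between the "Emitting direct" and "Processing received message" lines, even at
very low throughput.

import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.locks.LockSupport;

// Illustration only: a consumer whose "wait strategy" is a fixed timed park
// before re-checking the queue. The producer publishes immediately, but the
// observed hand-off latency is bounded below by the park interval.
public class TimedParkConsumer {
    public static void main(String[] args) throws InterruptedException {
        ConcurrentLinkedQueue<Long> queue = new ConcurrentLinkedQueue<>();

        Thread consumer = new Thread(() -> {
            while (!Thread.currentThread().isInterrupted()) {
                Long emittedAt = queue.poll();
                if (emittedAt != null) {
                    System.out.println("hand-off: " + (System.currentTimeMillis() - emittedAt) + " ms");
                } else {
                    LockSupport.parkNanos(10_000_000L);   // ~10 ms park before the next check
                }
            }
        });
        consumer.start();

        for (int i = 0; i < 10; i++) {
            queue.offer(System.currentTimeMillis());      // "emit"
            Thread.sleep(100);
        }
        consumer.interrupt();
        consumer.join();
    }
}

If the wait/notify path used by the SequenceBarrier behaves differently on
Solaris than on Linux (or effectively degrades to its timeout), that would
explain why the gap shows up only there; that is the part I would like someone
to confirm or refute.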

Regards

On Thu, Jul 2, 2015 at 5:41 PM, Olivier Mallassi <[email protected]
> wrote:

> and it looks like each time emitDirect/consumeBatch is called (to send an
> ack to mastercoord), I can expect a wait of a few ms
>
> 2015-07-02T16:10:00.*128+0200* backtype.storm.daemon.task Thread-9-b-1
> [INFO] Emitting: b-1 __ack_ack [4382299576881565710 1329423615189887198]
>  Caller+0        at
> clojure.tools.logging$eval1$fn__7.invoke(NO_SOURCE_FILE:0)
> Caller+1         at
> clojure.tools.logging.impl$fn__15$G__8__26.invoke(impl.clj:16)
> Caller+2         at clojure.tools.logging$log_STAR_.invoke(logging.clj:59)
> Caller+3         at
> backtype.storm.daemon.task$mk_tasks_fn$fn__3639.invoke(task.clj:152)
> Caller+4         at
> backtype.storm.daemon.task$send_unanchored.invoke(task.clj:111)
> Caller+5         at
> backtype.storm.daemon.task$send_unanchored.invoke(task.clj:117)
> Caller+6         at
> backtype.storm.daemon.executor$fn__4722$fn$reify__4767.ack(executor.clj:707)
> Caller+7         at backtype.storm.task.OutputCollector.*ack*
> (OutputCollector.java:213)
> 2015-07-02T16:10:00.*128+0200* backtype.storm.daemon.executor
> Thread-5-__acker [INFO] Processing received message source: b-1:5, stream:
> __ack_ack, id: {}, [4382299576881565710 1329423615189887198]
>  Caller+0        at
> clojure.tools.logging$eval1$fn__7.invoke(NO_SOURCE_FILE:0)
> Caller+1         at
> clojure.tools.logging.impl$fn__15$G__8__26.invoke(impl.clj:16)
> Caller+2         at clojure.tools.logging$log_STAR_.invoke(logging.clj:59)
> Caller+3         at
> backtype.storm.daemon.executor$mk_task_receiver$fn__4645.invoke(executor.clj:399)
> Caller+4         at
> backtype.storm.disruptor$clojure_handler$reify__1446.onEvent(disruptor.clj:58)
> Caller+5         at
> backtype.storm.utils.DisruptorQueue.consumeBatchToCursor(DisruptorQueue.java:125)
> Caller+6         at
> backtype.storm.utils.DisruptorQueue.consumeBatchWhenAvailable(DisruptorQueue.java:99)
> Caller+7         at backtype.storm.disruptor$
> *consume_batch_when_available*.invoke(disruptor.clj:80)
> 2015-07-02T16:10:00.*128+0200* backtype.storm.daemon.task
> *Thread-5-__acker* [INFO] Emitting direct: 1; __acker __ack_ack
> [4382299576881565710]
>  Caller+0        at
> clojure.tools.logging$eval1$fn__7.invoke(NO_SOURCE_FILE:0)
> Caller+1         at
> clojure.tools.logging.impl$fn__15$G__8__26.invoke(impl.clj:16)
> Caller+2         at clojure.tools.logging$log_STAR_.invoke(logging.clj:59)
> Caller+3         at
> backtype.storm.daemon.task$mk_tasks_fn$fn__3639.invoke(task.clj:134)
> Caller+4         at
> backtype.storm.daemon.executor$fn__4722$fn__4734$bolt_emit__4761.invoke(executor.clj:662)
> Caller+5         at
> backtype.storm.daemon.executor$fn__4722$fn$reify__4767.emitDirect(executor.clj:700)
> Caller+6         at
> backtype.storm.task.OutputCollector.emitDirect(OutputCollector.java:208)
> Caller+7         at backtype.storm.task.OutputCollector.*emitDirect*
> (OutputCollector.java:135)
> 2015-07-02T16:10:00.*133+0200* backtype.storm.daemon.executor
> *Thread-15-$mastercoord-bg0* [INFO] Processing received message source:
> __acker:3, stream: __ack_ack, id: {}, [4382299576881565710]
>  Caller+0        at
> clojure.tools.logging$eval1$fn__7.invoke(NO_SOURCE_FILE:0)
> Caller+1         at
> clojure.tools.logging.impl$fn__15$G__8__26.invoke(impl.clj:16)
> Caller+2         at clojure.tools.logging$log_STAR_.invoke(logging.clj:59)
> Caller+3         at
> backtype.storm.daemon.executor$mk_task_receiver$fn__4645.invoke(executor.clj:399)
> Caller+4         at
> backtype.storm.disruptor$clojure_handler$reify__1446.onEvent(disruptor.clj:58)
> Caller+5         at
> backtype.storm.utils.DisruptorQueue.consumeBatchToCursor(DisruptorQueue.java:125)
> Caller+6         at
> backtype.storm.utils.DisruptorQueue.consumeBatch(DisruptorQueue.java:87)
> Caller+7         at backtype.storm.disruptor$*consume_batch*
> .invoke(disruptor.clj:76)
>
>
> Has anyone observed this on a Linux platform? I am on Solaris.
>
> On Thu, Jul 2, 2015 at 12:04 PM, Olivier Mallassi <
> [email protected]> wrote:
>
>> Hi all
>>
>> I investigated in more detail (1 event per second, emitTimeInterval 1 sec)
>> and it appears that most of my 9 ms are spent in the exchange with __acker
>> (stream __ack_ack).
>>
>> Looking at the following traces, if I understood correctly, I spend 7-8 ms
>> between the "Emitting direct: 1; __acker __ack_ack" and the "Processing" lines.
>>
>> 2015-07-02T08:57:49.311+0200 backtype.storm.daemon.task 0 [INFO]
>> Emitting: b-0 __ack_ack [671062448715243437 2548740402887507265]
>> 2015-07-02T08:57:49.311+0200 backtype.storm.daemon.executor 0 [INFO]
>> Processing received message source: b-0:4, stream: __ack_ack, id: {},
>> [671062448715243437 2548740402887507265]
>> 2015-07-02T08:57:49.311+0200 backtype.storm.daemon.task 0 [INFO] Emitting
>> direct: 1; __acker __ack_ack [671062448715243437]
>> ******************************************
>> 2015-07-02T08:57:49.318+0200 backtype.storm.daemon.executor 0 [INFO]
>> Processing received message source: __acker:3, stream: __ack_ack, id: {},
>> [671062448715243437]
>> 2015-07-02T08:57:49.318+0200 backtype.storm.daemon.executor 0 [INFO]
>> Acking message 175873:0
>>
>> 2015-07-02T08:57:49.318+0200 backtype.storm.daemon.task 0 [INFO]
>> Emitting: $mastercoord-bg0 $commit [175873:0]
>> 2015-07-02T08:57:49.319+0200 backtype.storm.daemon.task 0 [INFO]
>> Emitting: $mastercoord-bg0 __ack_init [-4460222877314276996
>> -348189169909316069 1]
>> 2015-07-02T08:57:49.319+0200 backtype.storm.daemon.executor 0 [INFO]
>> Processing received message source: $mastercoord-bg0:1, stream: $commit,
>> id: {-4460222877314276996=-348189169909316069}, [175873:0]
>> 2015-07-02T08:57:49.319+0200 backtype.storm.daemon.executor 0 [INFO]
>> Processing received message source: $mastercoord-bg0:1, stream: __ack_init,
>> id: {}, [-4460222877314276996 -348189169909316069 1]
>> [------------------------------------------ the Aggregator is called ]
>> 2015-07-02T08:57:49.319+0200 backtype.storm.daemon.task 0 [INFO]
>> Emitting: b-0 __ack_ack [-4460222877314276996 -348189169909316069]
>> 2015-07-02T08:57:49.319+0200 backtype.storm.daemon.executor 0 [INFO]
>> Processing received message source: b-0:4, stream: __ack_ack, id: {},
>> [-4460222877314276996 -348189169909316069]
>> 2015-07-02T08:57:49.319+0200 backtype.storm.daemon.task 0 [INFO] Emitting
>> direct: 1; __acker __ack_ack [-4460222877314276996]
>> ******************************************
>> 2015-07-02T08:57:49.329+0200 backtype.storm.daemon.executor 0 [INFO]
>> Processing received message source: __acker:3, stream: __ack_ack, id: {},
>> [-4460222877314276996]
>> 2015-07-02T08:57:49.329+0200 backtype.storm.daemon.executor 0 [INFO]
>> Acking message 175873:0
>>
>>
>> This delay is systematic for all tuples (and not related to GC pauses...).
>> Once again, when running the same topology (code + config) in local mode, I
>> do not observe this kind of wait.
>>
>> Does anyone have an idea of what is going on? Is there a particular code
>> section I can look at?
>>
>> thx.
>>
>> olivier.
>>
>>
>>
>> On Tue, Jun 30, 2015 at 5:37 PM, Olivier Mallassi <
>> [email protected]> wrote:
>>
>>> Hi all
>>>
>>> Here is the code of my topology
>>>
>>> topology.newStream(this.getTopologyName(), spout)
>>>         .each(new Fields("event"), new EventTypeFilter(MyEvent.class))
>>>         .each(new Fields("event"), flattern, new Fields("a", "b", "c"))
>>>         .parallelismHint(1)
>>>         .groupBy(new Fields("a", "b", "c"))
>>>         .persistentAggregate(new MemoryMapState.StateFactory<>(),
>>>                 new Fields("event"),
>>>                 new EventAggregator(),
>>>                 new Fields("aggr"))
>>>         .parallelismHint(1);
>>>
>>> I ran the topology with no parallelism and a throughput of 20 events per
>>> second, to avoid any kind of contention...
>>>
>>>
>>> By adding some traces, it looks like I spend ~8 ms between the end of
>>> the "flattern bolt" and the getAll() method implemented in the
>>> MemoryMapState.
>>>
>>> It looks weird, but is it the expected behavior? Do you have any idea of
>>> what can cause this delay (which is as regular as a metronome)?
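>>>
>>> In case it helps to narrow this down, here is a rough sketch (assuming the
>>> state is exposed through Trident's IBackingMap; the class is illustrative,
>>> not my actual code) of a wrapper that logs how long each state access takes,
>>> to separate time spent inside the state from time spent before reaching it:
>>>
>>> import java.util.List;
>>> import storm.trident.state.map.IBackingMap;
>>>
>>> // Illustrative timing wrapper: delegates to any IBackingMap and logs the
>>> // duration of each access. If multiGet itself is fast, the ~8 ms are spent
>>> // upstream (queues/acking), not in the MemoryMapState.
>>> public class TimingBackingMap<T> implements IBackingMap<T> {
>>>     private final IBackingMap<T> delegate;
>>>
>>>     public TimingBackingMap(IBackingMap<T> delegate) {
>>>         this.delegate = delegate;
>>>     }
>>>
>>>     @Override
>>>     public List<T> multiGet(List<List<Object>> keys) {
>>>         long start = System.nanoTime();
>>>         List<T> result = delegate.multiGet(keys);
>>>         System.out.println("multiGet: " + (System.nanoTime() - start) / 1000000.0 + " ms");
>>>         return result;
>>>     }
>>>
>>>     @Override
>>>     public void multiPut(List<List<Object>> keys, List<T> vals) {
>>>         long start = System.nanoTime();
>>>         delegate.multiPut(keys, vals);
>>>         System.out.println("multiPut: " + (System.nanoTime() - start) / 1000000.0 + " ms");
>>>     }
>>> }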
>>>
>>> I am running storm 0.9.4.
>>>
>>> Regards.
>>>
>>> olivier.
>>>
>>>
>>>
>>>
>>> On Mon, Jun 29, 2015 at 6:41 PM, Olivier Mallassi <
>>> [email protected]> wrote:
>>>
>>>> Hello all
>>>>
>>>> I am facing an issue, or at least something I cannot figure out,
>>>> regarding the end-to-end latency of my topology.
>>>> I hope you will be able to help me.
>>>>
>>>> *Topology*
>>>> I am running a Trident topology:
>>>> - an IBatchSpout emits events of different types (1 spout only)
>>>> - a filter checks the type ("instanceof") of the events; in most cases
>>>> this filter returns true (b-1)
>>>> - a bolt (b-0) flattens the event
>>>> - a groupBy on 3 fields of the event is done
>>>> - the events are aggregated using the persistentAggregate method, a
>>>> custom reducer and a simple MemoryMapState object
>>>>
>>>> Given my current data set, the events are grouped into 19 different
>>>> aggregates.
>>>>
>>>> In these cases, I run with:
>>>> - a single worker with a single spout and multiple executors (10)
>>>> - emit interval millis = 1
>>>> - TOPOLOGY_EXECUTOR_RECEIVE_BUFFER_SIZE / TOPOLOGY_EXECUTOR_SEND_BUFFER_SIZE
>>>> set to a large value, 16384 (see the sketch just below)
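>>>>
>>>> For clarity, a minimal sketch of that configuration (the Trident batch emit
>>>> interval key is my assumption of the relevant Config constant, so treat the
>>>> names as illustrative rather than a verbatim copy of my code):
>>>>
>>>> import backtype.storm.Config;
>>>>
>>>> public class TopologyConfSketch {
>>>>     static Config build() {
>>>>         Config conf = new Config();
>>>>         conf.setNumWorkers(1);   // single worker
>>>>         // batch emit interval = 1 ms (assumed key for "emit interval millis")
>>>>         conf.put(Config.TOPOLOGY_TRIDENT_BATCH_EMIT_INTERVAL_MILLIS, 1);
>>>>         // large executor receive/send buffers
>>>>         conf.put(Config.TOPOLOGY_EXECUTOR_RECEIVE_BUFFER_SIZE, 16384);
>>>>         conf.put(Config.TOPOLOGY_EXECUTOR_SEND_BUFFER_SIZE, 16384);
>>>>         return conf;
>>>>     }
>>>> }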
>>>>
>>>> I measure my end-to-end latency by adding a timestamp to the event,
>>>> just before the _collector.emit() call, and then measuring the delta in
>>>> the reducer. So this latency is the time an event needs to be aggregated.
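>>>>
>>>> Roughly, the measurement looks like this (a simplified sketch with
>>>> illustrative field/class names, not my exact code):
>>>>
>>>> import backtype.storm.tuple.Values;
>>>> import storm.trident.operation.TridentCollector;
>>>> import storm.trident.tuple.TridentTuple;
>>>>
>>>> // Sketch of the measurement: stamp on emit, diff in the reducer.
>>>> public class LatencyMeasurementSketch {
>>>>
>>>>     // Spout side: add a timestamp just before emitting the event.
>>>>     static void emitWithTimestamp(TridentCollector collector, Object event) {
>>>>         collector.emit(new Values(event, System.currentTimeMillis()));
>>>>     }
>>>>
>>>>     // Reducer side: the delta is the end-to-end latency for this event.
>>>>     static long endToEndLatencyMs(TridentTuple tuple) {
>>>>         long emittedAt = tuple.getLongByField("emittedAt");
>>>>         return System.currentTimeMillis() - emittedAt;
>>>>     }
>>>> }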
>>>>
>>>> Here are the numbers I got
>>>>
>>>> case #1 - events throughput: 40000
>>>>
>>>>   component            capacity   execute latency (ms)   process latency (ms)   complete latency (ms)
>>>>   mastercoord-bg0                                                                10.031
>>>>   $spoutcoord-spout0   0.022      0.243                   0.181
>>>>   __acker              0.004      0.004                   0.002
>>>>   b-0                  0.045      0.006                   0.002
>>>>   b-1                  0.159      0.004                   0.003
>>>>   spout0               0.150      1.673                   1.919
>>>>
>>>>   measured end-to-end latency (99th percentile): 9.5 ms
>>>>
>>>> case #2 - events throughput: 250
>>>>
>>>>   component            capacity   execute latency (ms)   process latency (ms)   complete latency (ms)
>>>>   mastercoord-bg0                                                                10.026
>>>>   $spoutcoord-spout0   0.024      0.255                   0.190
>>>>   __acker              0.006      0.005                   0.002
>>>>   b-0                  0.006      0.020                   0.024
>>>>   b-1                  0.004      0.018                   0.016
>>>>   spout0               0.017      0.185                   0.135
>>>>
>>>>   measured end-to-end latency (99th percentile): 9.5 ms
>>>>
>>>>
>>>> The good news is that the end-to-end latency stays quite stable even
>>>> though the throughput of generated events has increased significantly.
>>>> The 'not so good' news is that I would like my latency to be under 1 ms
>>>> (again, with a single worker and at least for low throughput). Do you have
>>>> any ideas about how to configure the topology? Would avoiding Trident and
>>>> using the Storm API directly help?
>>>>
>>>> Do you know how I can better understand where I spend my time in this
>>>> topology?
>>>>
>>>> Note that I have tried to increase max.spout.pending but the
>>>> end-to-end latency gets worse (as if everything were, in the end,
>>>> single-threaded).
>>>>
>>>> There is certainly something I do not understand. Many thanks for your
>>>> help.
>>>>
>>>> Olivier
>>>>
>>>> PS: when I run the second case in local mode (on my dev laptop), I
>>>> observe an end-to-end latency of around 1 to 2 ms.
>>>>
>>>>
>>>>
>>>
>>
>
