Hi all

I investigated in more detail (1 event per sec, emitTimeInterval = 1 sec) and
it appears most of my 9 ms are spent in the exchange with the __acker (stream
__ack_ack).
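
(For reference, the traces below come from running with topology debug
logging enabled; a minimal sketch of that setup, assuming the standard
backtype.storm.Config / StormSubmitter API and a placeholder topology name:)

    Config conf = new Config();
    // log every emit and every received message at INFO level,
    // which produces the kind of traces shown below
    conf.setDebug(true);
    StormSubmitter.submitTopology("my-topology", conf, topology.build());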

Looking at the traces below, if I understood correctly, I spend 7-8 ms
between the "Emitting direct: ... __acker __ack_ack" line and the
corresponding "Processing received message" line (the two gaps are marked
with asterisks):

2015-07-02T08:57:49.311+0200 backtype.storm.daemon.task 0 [INFO] Emitting:
b-0 __ack_ack [671062448715243437 2548740402887507265]
2015-07-02T08:57:49.311+0200 backtype.storm.daemon.executor 0 [INFO]
Processing received message source: b-0:4, stream: __ack_ack, id: {},
[671062448715243437 2548740402887507265]
2015-07-02T08:57:49.311+0200 backtype.storm.daemon.task 0 [INFO] Emitting
direct: 1; __acker __ack_ack [671062448715243437]
******************************************
2015-07-02T08:57:49.318+0200 backtype.storm.daemon.executor 0 [INFO]
Processing received message source: __acker:3, stream: __ack_ack, id: {},
[671062448715243437]
2015-07-02T08:57:49.318+0200 backtype.storm.daemon.executor 0 [INFO] Acking
message 175873:0

2015-07-02T08:57:49.318+0200 backtype.storm.daemon.task 0 [INFO] Emitting:
$mastercoord-bg0 $commit [175873:0]
2015-07-02T08:57:49.319+0200 backtype.storm.daemon.task 0 [INFO] Emitting:
$mastercoord-bg0 __ack_init [-4460222877314276996 -348189169909316069 1]
2015-07-02T08:57:49.319+0200 backtype.storm.daemon.executor 0 [INFO]
Processing received message source: $mastercoord-bg0:1, stream: $commit,
id: {-4460222877314276996=-348189169909316069}, [175873:0]
2015-07-02T08:57:49.319+0200 backtype.storm.daemon.executor 0 [INFO]
Processing received message source: $mastercoord-bg0:1, stream: __ack_init,
id: {}, [-4460222877314276996 -348189169909316069 1]
[------------------------------------------ the Aggregator is called ]
2015-07-02T08:57:49.319+0200 backtype.storm.daemon.task 0 [INFO] Emitting:
b-0 __ack_ack [-4460222877314276996 -348189169909316069]
2015-07-02T08:57:49.319+0200 backtype.storm.daemon.executor 0 [INFO]
Processing received message source: b-0:4, stream: __ack_ack, id: {},
[-4460222877314276996 -348189169909316069]
2015-07-02T08:57:49.319+0200 backtype.storm.daemon.task 0 [INFO] Emitting
direct: 1; __acker __ack_ack [-4460222877314276996]
******************************************
2015-07-02T08:57:49.329+0200 backtype.storm.daemon.executor 0 [INFO]
Processing received message source: __acker:3, stream: __ack_ack, id: {},
[-4460222877314276996]
2015-07-02T08:57:49.329+0200 backtype.storm.daemon.executor 0 [INFO] Acking
message 175873:0


This delay is systematic for all tuples (and not related to GC pauses...).
Once again, when running the same topology (code + config) in local mode, I
do not observe this kind of wait.

Does anyone have an idea of what's going on? Is there a particular code
section I could look at?

thx.

olivier.



On Tue, Jun 30, 2015 at 5:37 PM, Olivier Mallassi <
[email protected]> wrote:

> Hi all
>
> Here is the code of my topology
>
> topology.newStream(this.getTopologyName(), spout)
>         .each(new Fields("event"), new EventTypeFilter(MyEvent.class))
>         .each(new Fields("event"), flattern, new Fields("a", "b", "c"))
>         .parallelismHint(1)
>         .groupBy(new Fields("a", "b", "c"))
>         .persistentAggregate(new MemoryMapState.StateFactory<>(),
>                 new Fields("event"),
>                 new EventAggregator(),
>                 new Fields("aggr"))
>         .parallelismHint(1);
>
> I ran the topology with no parallelism and a throughput of 20 events per
> sec, to avoid any kind of contention...
>
>
> By adding some traces, it looks like I spend ~8 ms between the end of the
> "flattern" bolt and the getAll() method implemented in the MemoryMapState.
>
> This looks weird, but is it the expected behavior? Do you have any idea of
> what can cause this delay (which is as regular as a metronome)?
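>
> (The traces are nothing fancy, just nanosecond timestamps printed at the
> interesting points; something like this hypothetical helper, called at the
> end of the flattern function and at the start of getAll():)
>
> public final class Trace {
>     // print a nanosecond timestamp plus a label for the call site
>     public static void log(String where) {
>         System.out.printf("%d TRACE %s%n", System.nanoTime(), where);
>     }
> }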
>
> I am running storm 0.9.4.
>
> Regards.
>
> olivier.
>
>
>
>
> On Mon, Jun 29, 2015 at 6:41 PM, Olivier Mallassi <
> [email protected]> wrote:
>
>> Hello all
>>
>> I am facing an issue, or at least something I cannot figure out, regarding
>> the end-to-end latency of my topology.
>> So I hope you will be able to help me.
>>
>> *Topology*
>> I am running a trident topology:
>> - an IBatchSpout emits events of different types (1 spout only)
>> - a filter checks the "instance of" of each event; in most cases, this
>> filter returns true (b-1)
>> - a bolt (b-0) flattens the event (both steps are sketched just below)
>> - a groupBy on 3 fields of the event is done
>> - the events are aggregated using the persistentAggregate method, a
>> custom reducer, and a simple MemoryMapState object
>>
>> Due to my current data set, the events are grouped into 19 different
>> aggregates.
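>>
>> For reference, the filter and flatten steps look roughly like this
>> (simplified sketch; the class names and MyEvent getters are placeholders
>> for my real code):
>>
>> import backtype.storm.tuple.Values;
>> import storm.trident.operation.BaseFilter;
>> import storm.trident.operation.BaseFunction;
>> import storm.trident.operation.TridentCollector;
>> import storm.trident.tuple.TridentTuple;
>>
>> public class EventTypeFilter extends BaseFilter {
>>     private final Class<?> type;
>>     public EventTypeFilter(Class<?> type) { this.type = type; }
>>     @Override
>>     public boolean isKeep(TridentTuple tuple) {
>>         // keep the tuple only when the event has the expected type
>>         return type.isInstance(tuple.getValue(0));
>>     }
>> }
>>
>> public class Flattern extends BaseFunction {
>>     @Override
>>     public void execute(TridentTuple tuple, TridentCollector collector) {
>>         MyEvent event = (MyEvent) tuple.getValue(0);
>>         // emit the three fields used later by the groupBy
>>         collector.emit(new Values(event.getA(), event.getB(), event.getC()));
>>     }
>> }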
>>
>> In these cases, I run:
>> - a single worker with a single spout and multiple executors (10)
>> - emit interval millis = 1
>> - TOPOLOGY_EXECUTOR_RECEIVE / SEND_BUFFER_SIZE set to a large value
>> (16384), as sketched below
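>>
>> In code, those settings are roughly the following (a sketch; the constant
>> names are from backtype.storm.Config):
>>
>> Config conf = new Config();
>> conf.setNumWorkers(1);
>> // trident batch emit interval = 1 ms
>> conf.put(Config.TOPOLOGY_TRIDENT_BATCH_EMIT_INTERVAL_MILLIS, 1);
>> // large executor queues (sizes must be powers of two)
>> conf.put(Config.TOPOLOGY_EXECUTOR_RECEIVE_BUFFER_SIZE, 16384);
>> conf.put(Config.TOPOLOGY_EXECUTOR_SEND_BUFFER_SIZE, 16384);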
>>
>> I measure my end-to-end latency by adding a timestamp to the event just
>> before the _collector.emit(), and then, in the reducer, measuring the
>> delta. So this latency is the time an event needs to be aggregated.
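>>
>> Concretely, the measurement looks like this (a sketch; the timestamp
>> field and its accessors are my own additions to the event class):
>>
>> // spout side: stamp the event just before emitting it
>> event.setEmitNanos(System.nanoTime());
>> _collector.emit(new Values(event));
>>
>> // reducer side: compute the delta when the event reaches the aggregation
>> long latencyMs = (System.nanoTime() - event.getEmitNanos()) / 1_000_000;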
>>
>> Here are the numbers I got:
>>
>> *case #1: events throughput 40000*
>>
>>                     capacity  execute       process       complete
>>                               latency (ms)  latency (ms)  latency (ms)
>> mastercoord-bg0                                           10.031
>> $spoutcoord-spout0  0.022     0.243         0.181
>> __acker             0.004     0.004         0.002
>> b-0                 0.045     0.006         0.002
>> b-1                 0.159     0.004         0.003
>> spout0              0.150     1.673         1.919
>> measured end-to-end latency (99th percentile): *9.5 ms*
>>
>> *case #2: events throughput 250*
>>
>>                     capacity  execute       process       complete
>>                               latency (ms)  latency (ms)  latency (ms)
>> mastercoord-bg0                                           10.026
>> $spoutcoord-spout0  0.024     0.255         0.190
>> __acker             0.006     0.005         0.002
>> b-0                 0.006     0.020         0.024
>> b-1                 0.004     0.018         0.016
>> spout0              0.017     0.185         0.135
>> measured end-to-end latency (99th percentile): *9.5 ms*
>>
>>
>> The good news is that the end-to-end latency is quite stable even though
>> the throughput of generated events has increased significantly.
>> The 'not so good' news is that I would like my latency to be under 1 ms
>> (again, with a single worker and at least for low throughput). Do you have
>> any ideas about how to configure the topology? Maybe avoiding Trident and
>> using the Storm API directly could help?
>>
>> Do you know how I can better understand where I spend my time in this
>> topology?
>>
>> Note that I have tried to increase max.spout.pending, but the end-to-end
>> latency is worse (as if everything was, in the end, single-threaded).
>>
>> For sure, there is something I do not understand. Many thanks for your help.
>>
>> Olivier
>>
>> PS: when I run the second case in local mode (on my dev laptop), I
>> observe an end-to-end latency of around 1 to 2 ms.
>>
>>
>>
>
