Hi all

Here is the code of my topology:

topology.newStream(this.getTopologyName(), spout)
        .each(new Fields("event"), new EventTypeFilter(MyEvent.class))
        .each(new Fields("event"), flattern, new Fields("a", "b", "c"))
        .parallelismHint(1)
        .groupBy(new Fields("a", "b", "c"))
        .persistentAggregate(new MemoryMapState.StateFactory<>(),
                new Fields("event"),
                new EventAggregator(),
                new Fields("aggr"))
        .parallelismHint(1);

I ran the topology with no parallelism and a throughput of 20 events per
second, to avoid any kind of contention...


By adding some traces, it looks like ~8 ms are spent between the end of the
"flattern" bolt and the getAll() method implemented in the MemoryMapState.

It looks weird, but is it the expected behavior? Do you have any idea what
could cause this delay (which is as regular as a metronome)?
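Conceptually, the traces look like this (an illustrative, self-contained sketch, not the exact code; the class and method names are made up): a nanosecond timestamp is recorded when the flatten function emits, and another on entry to getAll(), and the gap is reported in milliseconds.

```java
import java.util.concurrent.ConcurrentHashMap;

// Illustrative sketch of the timing traces: record a nanosecond timestamp
// when the flatten function emits a tuple, and another when getAll() is
// entered, then report the gap in milliseconds.
public class StageTrace {
    private final ConcurrentHashMap<Long, Long> emitTimes = new ConcurrentHashMap<>();

    // Called at the end of the flatten function, keyed by a tuple/batch id.
    public void markEmitted(long id) {
        emitTimes.put(id, System.nanoTime());
    }

    // Called on entry to MemoryMapState.getAll(); returns the gap in ms,
    // or -1 if no matching emit was recorded for this id.
    public double gapMs(long id) {
        Long start = emitTimes.remove(id);
        return start == null ? -1.0 : (System.nanoTime() - start) / 1_000_000.0;
    }

    public static void main(String[] args) throws InterruptedException {
        StageTrace trace = new StageTrace();
        trace.markEmitted(1L);
        Thread.sleep(8); // stand-in for the hop between the two stages
        System.out.printf("gap: %.1f ms%n", trace.gapMs(1L));
    }
}
```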

I am running storm 0.9.4.

Regards.

olivier.




On Mon, Jun 29, 2015 at 6:41 PM, Olivier Mallassi <
[email protected]> wrote:

> Hello all
>
> I am facing an issue, or at least something I cannot figure out, regarding
> the end-to-end latency of my topology.
> I hope you will be able to help me.
>
> *Topology *
> I am running a trident topology
> - an IBatchSpout emits events of different types (1 spout only)
> - a filter checks the "instance of" the events; in most cases, this filter
> returns true (b-1)
> - a bolt (b-0) flattens the event
> - the events are grouped by 3 of their fields
> - the events are aggregated using the persistentAggregate method, a custom
> reducer, and a simple MemoryMapState object
>
> Due to my current data set, the events are grouped into 19 different
> aggregates.
>
> In both cases, I run:
> - a single worker with a single spout and multiple executors (10)
> - emit interval millis = 1
> - TOPOLOGY_EXECUTOR_RECEIVE / SEND_BUFFER_SIZE set to a large value (16384)
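> As a sketch, this configuration corresponds roughly to the following (the
> constant names below are the standard Storm 0.9.x ones; please double-check
> them against your version):

```java
// Approximate configuration sketch (verify constant names for Storm 0.9.4)
import backtype.storm.Config;

Config conf = new Config();
conf.setNumWorkers(1);
conf.put(Config.TOPOLOGY_TRIDENT_BATCH_EMIT_INTERVAL_MILLIS, 1);
conf.put(Config.TOPOLOGY_EXECUTOR_RECEIVE_BUFFER_SIZE, 16384);
conf.put(Config.TOPOLOGY_EXECUTOR_SEND_BUFFER_SIZE, 16384);
```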
>
> I measure my end-to-end latency by adding a timestamp to the event just
> before the _collector.emit() call and then measuring the delta in the
> reducer. So this latency is the time an event needs to be aggregated.
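> The 99th-percentile figures below are computed from those deltas, roughly
> like this (a self-contained sketch under my assumptions, not the actual
> measurement code; the class and method names are illustrative):

```java
import java.util.Arrays;

// Sketch: turn per-event emit timestamps into latencies, then take a
// nearest-rank percentile over the recorded samples.
public class LatencyStats {
    // Delta between now and the nanosecond timestamp stored in the event.
    public static double latencyMs(long emitNanos) {
        return (System.nanoTime() - emitNanos) / 1_000_000.0;
    }

    // Nearest-rank percentile (p in [0, 100]) over latencies in ms.
    public static double percentile(double[] latenciesMs, double p) {
        double[] sorted = latenciesMs.clone();
        Arrays.sort(sorted);
        int rank = (int) Math.ceil(p / 100.0 * sorted.length);
        return sorted[Math.max(0, rank - 1)];
    }

    public static void main(String[] args) {
        double[] samples = {1.2, 0.8, 9.5, 2.1, 1.0};
        System.out.println(percentile(samples, 99)); // prints 9.5
    }
}
```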
>
> Here are the numbers I got (latencies in ms, from the Storm UI):
>
> case #1 - events throughput 40000
>
>   component           capacity   execute lat.   process lat.   complete lat.
>   mastercoord-bg0     -          -              -              10.031
>   $spoutcoord-spout0  0.022      0.243          0.181          -
>   __acker             0.004      0.004          0.002          -
>   b-0                 0.045      0.006          0.002          -
>   b-1                 0.159      0.004          0.003          -
>   spout0              0.150      1.673          1.919          -
>
>   measured end2end latency (99th percentile): 9.5
>
> case #2 - events throughput 250
>
>   component           capacity   execute lat.   process lat.   complete lat.
>   mastercoord-bg0     -          -              -              10.026
>   $spoutcoord-spout0  0.024      0.255          0.190          -
>   __acker             0.006      0.005          0.002          -
>   b-0                 0.006      0.020          0.024          -
>   b-1                 0.004      0.018          0.016          -
>   spout0              0.017      0.185          0.135          -
>
>   measured end2end latency (99th percentile): 9.5
>
>
> The good news is that the end-to-end latency is quite stable even though the
> throughput of generated events has increased significantly.
> The not-so-good news is that I would like the latency to be under 1 ms
> (again, with a single worker and at least for low throughput). Do you have
> any ideas on how to configure the topology? Could avoiding Trident and using
> the Storm API directly help?
>
> Do you know how I can better understand where the time is spent in this
> topology?
>
> Note that I have tried increasing max.spout.pending, but the end-to-end
> latency gets worse (as if everything were, in the end, single-threaded).
>
> For sure, there is something I do not understand. Many thanks for your help.
>
> Olivier
>
> PS: when I run the second case in local mode (on my dev laptop), I observe
> an end-to-end latency of around 1 to 2 ms.
>
>
>
