I see. For windowed aggregations the disk space (i.e.
"/tmp/kafka-streams/appname") as well as memory consumption on RocksDB
should not keep increasing forever. One thing to note is that you are using
a tumbling window where a new window will be created every minute, so
within 20 minutes of "event time", note this is not your processing time,
you will get 20 windows, and each update could be applied to each one of
the window in the worst case.

Hence, your space consumption would roughly be #. input traffic (mb / sec)
* avg #. windows for each input (worst case 20) * RocksDB space
amplification.

We have been using 0.10.0.1 in production with 10+ aggregations and we do
see memory usage climbing up to 50GB for a single node, again depending on
the input traffic, but it did not climbing forever.

Guozhang

On Wed, Nov 30, 2016 at 4:09 AM, Jon Yeargers <jon.yearg...@cedexis.com>
wrote:

> My apologies. In fact the 'aggregate' step includes this:
> 'TimeWindows.of(20
> * 60 * 1000L).advanceBy(60 * 1000L)'
>
> On Tue, Nov 29, 2016 at 9:12 PM, Guozhang Wang <wangg...@gmail.com> wrote:
>
> > Where does the "20 minutes" come from? I thought the "aggregate" operator
> > in your
> >
> > stream->aggregate->filter->foreach
> >
> > topology is not a windowed aggregation, so the aggregate results will
> keep
> > accumulating.
> >
> >
> > Guozhang
> >
> >
> > On Tue, Nov 29, 2016 at 8:40 PM, Jon Yeargers <jon.yearg...@cedexis.com>
> > wrote:
> >
> > > "keep increasing" - why? It seems (to me) that the aggregates should be
> > 20
> > > minutes long. After that the memory should be released.
> > >
> > > Not true?
> > >
> > > On Tue, Nov 29, 2016 at 3:53 PM, Guozhang Wang <wangg...@gmail.com>
> > wrote:
> > >
> > > > Jon,
> > > >
> > > > Note that in your "aggregate" function, if it is now windowed
> aggregate
> > > > then the aggregation results will keep increasing in your local state
> > > > stores unless you're pretty sure that the aggregate key space is
> > bounded.
> > > > This is not only related to disk space but also memory since the
> > current
> > > > default persistent state store, RocksDB, takes its own block cache
> both
> > > on
> > > > heap and out of heap.
> > > >
> > > > In addition, RocksDB has a write / space amplification. That is, if
> > your
> > > > estimate size of the aggregated state store takes 1GB, it will
> actually
> > > > take 1 * max-amplification-factor in worst case, and similarly for
> > block
> > > > cache and write buffer.
> > > >
> > > >
> > > > Guozhang
> > > >
> > > > On Tue, Nov 29, 2016 at 12:25 PM, Jon Yeargers <
> > jon.yearg...@cedexis.com
> > > >
> > > > wrote:
> > > >
> > > > > App eventually got OOM-killed. Consumed 53G of swap space.
> > > > >
> > > > > Does it require a different GC? Some extra settings for the java
> cmd
> > > > line?
> > > > >
> > > > > On Tue, Nov 29, 2016 at 12:05 PM, Jon Yeargers <
> > > jon.yearg...@cedexis.com
> > > > >
> > > > > wrote:
> > > > >
> > > > > >  I cloned/built 10.2.0-SNAPSHOT
> > > > > >
> > > > > > App hasn't been OOM-killed yet but it's up to 66% mem.
> > > > > >
> > > > > > App takes > 10 min to start now. Needless to say this is
> > problematic.
> > > > > >
> > > > > > The 'kafka-streams' scratch space has consumed 37G and still
> > > climbing.
> > > > > >
> > > > > >
> > > > > > On Tue, Nov 29, 2016 at 10:48 AM, Jon Yeargers <
> > > > jon.yearg...@cedexis.com
> > > > > >
> > > > > > wrote:
> > > > > >
> > > > > >> Does every broker need to be updated or just my client app(s)?
> > > > > >>
> > > > > >> On Tue, Nov 29, 2016 at 10:46 AM, Matthias J. Sax <
> > > > > matth...@confluent.io>
> > > > > >> wrote:
> > > > > >>
> > > > > >>> What version do you use?
> > > > > >>>
> > > > > >>> There is a memory leak in the latest version 0.10.1.0. The bug
> > got
> > > > > >>> already fixed in trunk and 0.10.1 branch.
> > > > > >>>
> > > > > >>> There is already a discussion about a 0.10.1.1 bug fix release.
> > For
> > > > > now,
> > > > > >>> you could build the Kafka Streams from the sources by yourself.
> > > > > >>>
> > > > > >>> -Matthias
> > > > > >>>
> > > > > >>>
> > > > > >>> On 11/29/16 10:30 AM, Jon Yeargers wrote:
> > > > > >>> > My KStreams app seems to be having some memory issues.
> > > > > >>> >
> > > > > >>> > 1. I start it `java -Xmx8G -jar <app>.jar`
> > > > > >>> >
> > > > > >>> > 2. Wait 5-10 minutes - see lots of 'org.apache.zookeeper.
> > > > ClientCnxn
> > > > > -
> > > > > >>> Got
> > > > > >>> > ping response for sessionid: 0xc58abee3e000013 after 0ms'
> > > messages
> > > > > >>> >
> > > > > >>> > 3. When it _finally_ starts reading values it typically goes
> > for
> > > a
> > > > > >>> minute
> > > > > >>> > or so, reads a few thousand values and then the OS kills it
> > with
> > > > 'Out
> > > > > >>> of
> > > > > >>> > memory' error.
> > > > > >>> >
> > > > > >>> > The topology is (essentially):
> > > > > >>> >
> > > > > >>> > stream->aggregate->filter->foreach
> > > > > >>> >
> > > > > >>> > It's reading values and creating a rolling average.
> > > > > >>> >
> > > > > >>> > During phase 2 (above) I see lots of IO wait and the
> 'scratch'
> > > > buffer
> > > > > >>> > (usually "/tmp/kafka-streams/appname") fills with 10s of Gb
> of
> > > .. ?
> > > > > (I
> > > > > >>> had
> > > > > >>> > to create a special scratch partition with 100Gb of space as
> > > kafka
> > > > > >>> would
> > > > > >>> > fill the / partition and make the system v v unhappy)
> > > > > >>> >
> > > > > >>> > Have I misconfigured something?
> > > > > >>> >
> > > > > >>>
> > > > > >>>
> > > > > >>
> > > > > >
> > > > >
> > > >
> > > >
> > > >
> > > > --
> > > > -- Guozhang
> > > >
> > >
> >
> >
> >
> > --
> > -- Guozhang
> >
>



-- 
-- Guozhang

Reply via email to