Not a problem. Glad that you're not seeing it anymore now.

If it occurs again, please feel free to reach out to the community.


Guozhang

On Thu, Jan 24, 2019 at 2:32 PM Niklas Lönn <niklas.l...@gmail.com> wrote:

> Hi.
>
> I have something good (and personally mysterious) to report.
>
> We do indeed run 1.1.x in production.
>
> And today, when I was almost finished cleaning up my test case for public
> display (I had been forced by corp policies to update macOS in the meantime),
> and once my test was no longer in a "hacky improvised piece of iteration test
> code not asserting stuff" mode, I couldn't recreate the issue anymore, with
> either the new or the old code.
>
> I suspect I was unlucky and hit some other issue in my OS/firmware with very
> similar symptoms to what we had in production; I ran my test on another
> computer without this update and it was fine there as well.
>
> I guess that concludes that you are most likely right about this 1.1 bug, and
> I was just super unlucky to be able to recreate it locally due to other
> issues.
>
> Thanks for the support and rubber ducking :)
>
> Kind regards
> Niklas
>
> On Thu 24. Jan 2019 at 02:08, Guozhang Wang <wangg...@gmail.com> wrote:
>
> > I see (btw, attachments are usually not allowed on the AK mailing list, but
> > if you have it somewhere like a git repo and can share the URL, that works).
> >
> > Could you let me know how many physical cores you have in total hosting
> > your app, and how many threads you configured? From your current
> > description there should be at least 40 tasks (20 reading from the source
> > topics and writing to the repartition topics, and 20 reading from the
> > repartition topics), and I'd like to know how these tasks are assigned to
> > threads, and how many threads can execute in parallel on the hardware.
> >
> >
> > Guozhang
> >
> >
> > On Wed, Jan 23, 2019 at 1:21 PM Niklas Lönn <niklas.l...@gmail.com>
> wrote:
> >
> > > I have to double-check what version of the broker we run in production,
> > > but when testing and verifying the issue locally I did reproduce it with
> > > both broker and client version 2.1.0.
> > >
> > > Kind regards
> > > Niklas
> > >
> > > On Wed 23. Jan 2019 at 18:24, Guozhang Wang <wangg...@gmail.com>
> wrote:
> > >
> > > > I see.
> > > >
> > > > What you described is a known issue in older versions of Kafka: some
> > > > high-traffic topics in bootstrap mode may effectively "starve" other
> > > > topics in the fetch response, since brokers used to naively fill in
> > > > bytes until the max.bytes configuration was met and then return. This
> > > > was fixed in version 1.1 via incremental fetch requests:
> > > >
> > > > https://cwiki.apache.org/confluence/display/KAFKA/KIP-227%3A+Introduce+Incremental+FetchRequests+to+Increase+Partition+Scalability
> > > >
> > > > The basic idea is to not always request topics in the same order, like
> > > > A,B,C; instead, if the previous request asked for topics A,B,C and got
> > > > all of its data from A, then the next request would be ordered B,C,A,
> > > > etc. So if you are on an older version of Kafka I'd suggest you upgrade
> > > > to a newer one.
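> > > >
> > > > (Just to illustrate the rotation idea with a toy sketch, this is not the
> > > > broker's actual code:)
> > > >
> > > >     import java.util.ArrayList;
> > > >     import java.util.Arrays;
> > > >     import java.util.Collections;
> > > >     import java.util.List;
> > > >
> > > >     public class FetchOrderRotation {
> > > >         public static void main(String[] args) {
> > > >             // Order in which topics were requested last time.
> > > >             List<String> fetchOrder = new ArrayList<>(Arrays.asList("A", "B", "C"));
> > > >             // The previous response was filled entirely from A, so rotate
> > > >             // it to the back; the next request asks in order B, C, A.
> > > >             Collections.rotate(fetchOrder, -1);
> > > >             System.out.println(fetchOrder); // prints [B, C, A]
> > > >         }
> > > >     }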
> > > >
> > > > If you cannot upgrade at the moment, another suggestion, as I mentioned
> > > > above, is to change the segment size so you can have much larger, and
> > > > hence fewer, segment files.
> > > >
> > > > Guozhang
> > > >
> > > >
> > > > On Wed, Jan 23, 2019 at 8:54 AM Niklas Lönn <niklas.l...@gmail.com>
> > > wrote:
> > > >
> > > > > Hi Guozhang,
> > > > >
> > > > > I think I got a bit ahead of myself in describing the situation. I had
> > > > > an attachment with the context in detail; maybe it was filtered out.
> > > > > Let's try again =)
> > > > >
> > > > > We have a topology looking something like this:
> > > > >
> > > > > input-topic[20 partitions, compacted]
> > > > >     |
> > > > > use-case-repartition[20 partitions, infinite retention, segment.ms=10min]
> > > > >     |
> > > > > use-case-changelog
> > > > >
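> > > > > (For context, a minimal sketch of roughly what the app does; the
> > > > > names, serdes and the count() are placeholders, and in reality the
> > > > > internal topic names get the application.id prefix:)
> > > > >
> > > > >     import java.util.Properties;
> > > > >     import org.apache.kafka.common.serialization.Serdes;
> > > > >     import org.apache.kafka.streams.KafkaStreams;
> > > > >     import org.apache.kafka.streams.StreamsBuilder;
> > > > >     import org.apache.kafka.streams.StreamsConfig;
> > > > >     import org.apache.kafka.streams.kstream.Grouped;
> > > > >     import org.apache.kafka.streams.kstream.Materialized;
> > > > >
> > > > >     public class UseCaseApp {
> > > > >         public static void main(String[] args) {
> > > > >             StreamsBuilder builder = new StreamsBuilder();
> > > > >             builder.<String, String>stream("input-topic")
> > > > >                    // key-changing step, forces the repartition topic
> > > > >                    .selectKey((k, v) -> v)
> > > > >                    .groupByKey(Grouped.as("use-case"))   // -> ...-use-case-repartition
> > > > >                    .count(Materialized.as("use-case"));  // -> ...-use-case-changelog
> > > > >
> > > > >             Properties props = new Properties();
> > > > >             props.put(StreamsConfig.APPLICATION_ID_CONFIG, "use-case-app");
> > > > >             props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
> > > > >             props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
> > > > >             props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());
> > > > >             new KafkaStreams(builder.build(), props).start();
> > > > >         }
> > > > >     }
> > > > >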
> > > > > We previously hit the TooManyOpenFiles issue and "solved" it by
> > > > > raising the limit to something extreme.
> > > > > Later we found out that we wanted replication factor 3 on all internal
> > > > > topics, so we reset the app and BOOM, now we hit a "too many memory
> > > > > mapped files" limit instead.
> > > > >
> > > > > The input topic contains 30 days of data, where we pretty much have
> > > > > records in every 10-minute window for every partition.
> > > > > This means that if nothing consumes the repartition topic, we will have
> > > > > 6 (10-minute slots per hour) * 24 hours * 30 days * 20 partitions * 3
> > > > > (.index, .log, .timeindex files) * 3 (replication factor) / 5 (brokers
> > > > > in the cluster) = *155.520* open files per broker just to have this
> > > > > repartition topic in place.
> > > > >
> > > > > You might say: yeah, but no problem, as segments would be deleted and
> > > > > you would never reach such high numbers? That doesn't seem to be the
> > > > > case, though.
> > > > > What happened in our case is that, due to how the broker multiplexes
> > > > > the topic partitions for the subscribers, the streams application piled
> > > > > up all the repartition records, and only once it had caught up did all
> > > > > the downstream processing start taking place. I see this as a design
> > > > > flaw in some component, probably the broker; it can't be the desired
> > > > > behaviour. How many open files would I need to be allowed to have with
> > > > > a year of data when resetting/reprocessing an application?
> > > > >
> > > > > By adding more threads than input topic partitions, I managed to force
> > > > > the broker to give out these records earlier, and the issue was
> > > > > mitigated.
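> > > > >
> > > > > (The workaround is basically just a config change along these lines;
> > > > > 40 is only an example value, anything larger than the number of input
> > > > > partitions did the trick for us:)
> > > > >
> > > > >     Properties props = new Properties();
> > > > >     // More stream threads than input-topic partitions, so some threads
> > > > >     // end up owning only repartition-topic tasks and can consume them
> > > > >     // while other threads are still working through the input topic.
> > > > >     props.put(StreamsConfig.NUM_STREAM_THREADS_CONFIG, 40);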
> > > > >
> > > > > Ideally, the downstream records should be processed somewhere near in
> > > > > time to the source records.
> > > > >
> > > > > Let's take one partition containing 1.000.000 records; this is the
> > > > > behaviour I have observed (somewhat simplified):
> > > > >
> > > > > Time  | Consumer offset input topic | Records in input topic | Consumer offset repartition topic | Records in repartition topic
> > > > > 00:00 | 0                           | 1.000.000              | 0                                 | 0
> > > > > 00:01 | 100.000                     | 1.000.000              | 0                                 | 100.000
> > > > > 00:02 | 200.000                     | 1.000.000              | 0                                 | 200.000
> > > > > 00:03 | 300.000                     | 1.000.000              | 0                                 | 300.000
> > > > > 00:04 | 400.000                     | 1.000.000              | 0                                 | 400.000
> > > > > 00:05 | 500.000                     | 1.000.000              | 0                                 | 500.000
> > > > > 00:06 | 600.000                     | 1.000.000              | 0                                 | 600.000
> > > > > 00:07 | 700.000                     | 1.000.000              | 0                                 | 700.000
> > > > > 00:08 | 800.000                     | 1.000.000              | 0                                 | 800.000
> > > > > 00:09 | 900.000                     | 1.000.000              | 0                                 | 900.000
> > > > > 00:10 | 1.000.000                   | 1.000.000              | 0                                 | 1.000.000
> > > > > 00:11 | 1.000.000                   | 1.000.000              | 100.000                           | 1.000.000
> > > > > 00:12 | 1.000.000                   | 1.000.000              | 200.000                           | 1.000.000
> > > > > 00:13 | 1.000.000                   | 1.000.000              | 300.000                           | 1.000.000
> > > > > 00:14 | 1.000.000                   | 1.000.000              | 400.000                           | 1.000.000
> > > > > 00:15 | 1.000.000                   | 1.000.000              | 500.000                           | 1.000.000
> > > > > 00:16 | 1.000.000                   | 1.000.000              | 600.000                           | 1.000.000
> > > > > 00:17 | 1.000.000                   | 1.000.000              | 700.000                           | 1.000.000
> > > > > 00:18 | 1.000.000                   | 1.000.000              | 800.000                           | 1.000.000
> > > > > 00:19 | 1.000.000                   | 1.000.000              | 900.000                           | 1.000.000
> > > > > 00:20 | 1.000.000                   | 1.000.000              | 1.000.000                         | 1.000.000
> > > > >
> > > > > As you can see, there is no parallel execution, and that's because the
> > > > > broker does not hand out any records from the repartition topic until
> > > > > the input topic is depleted.
> > > > > By adding more threads than input partitions I managed to mitigate this
> > > > > behaviour somewhat, but it is still not close to balanced.
> > > > >
> > > > > Ideally, in such a situation where we rebuild stream state, I would
> > > > > rather expect behaviour like this:
> > > > >
> > > > > Time  | Consumer offset input topic | Records in input topic | Consumer offset repartition topic | Records in repartition topic
> > > > > 00:00 | 0                           | 1.000.000              | 0                                 | 0
> > > > > 00:01 | 100.000                     | 1.000.000              | 0                                 | 100.000
> > > > > 00:02 | 200.000                     | 1.000.000              | 100.000                           | 200.000
> > > > > 00:03 | 300.000                     | 1.000.000              | 200.000                           | 300.000
> > > > > 00:04 | 400.000                     | 1.000.000              | 300.000                           | 400.000
> > > > > 00:05 | 500.000                     | 1.000.000              | 400.000                           | 500.000
> > > > > 00:06 | 600.000                     | 1.000.000              | 500.000                           | 600.000
> > > > > 00:07 | 700.000                     | 1.000.000              | 600.000                           | 700.000
> > > > > 00:08 | 800.000                     | 1.000.000              | 700.000                           | 800.000
> > > > > 00:09 | 900.000                     | 1.000.000              | 800.000                           | 900.000
> > > > > 00:10 | 1.000.000                   | 1.000.000              | 900.000                           | 1.000.000
> > > > > 00:11 | 1.000.000                   | 1.000.000              | 1.000.000                         | 1.000.000
> > > > >
> > > > >
> > > > > Kind regards
> > > > > Niklas
> > > > >
> > > > > On Tue, Jan 22, 2019 at 6:48 PM Guozhang Wang <wangg...@gmail.com>
> > > > wrote:
> > > > >
> > > > > > Hello Niklas,
> > > > > >
> > > > > > If you can monitor your repartition topic's consumer lag, and it is
> > > > > > increasing consistently, it means your downstream processor simply
> > > > > > cannot keep up with the throughput of the upstream processor. Usually
> > > > > > it means your downstream operators are heavier (e.g. aggregations and
> > > > > > joins, which are stateful) than your upstream ones (e.g. simply
> > > > > > shuffling the data to repartition topics), and since task assignment
> > > > > > only considers a task as the smallest unit of work and does not
> > > > > > differentiate "heavy" and "light" tasks, such an imbalance of task
> > > > > > assignment may happen. At the moment, to resolve this you should add
> > > > > > more resources to make sure the heavy tasks get enough computational
> > > > > > resources assigned (more threads, e.g.).
> > > > > >
> > > > > > If your observed consumer lag plateaus after increasing to some
> > > > > > point, it means your consumer can actually keep up, with some
> > > > > > constant lag; if you hit your open-file limits before seeing this, it
> > > > > > means you either need to increase your open-file limits, OR you can
> > > > > > simply increase the segment size to reduce the number of files, by
> > > > > > using "StreamsConfig.TOPIC_PREFIX" to set the value of
> > > > > > TopicConfig.SEGMENT_BYTES_CONFIG.
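> > > > > >
> > > > > > (A minimal sketch of that config; the 1 GiB value is just an example,
> > > > > > not a recommendation:)
> > > > > >
> > > > > >     Properties props = new Properties();
> > > > > >     // StreamsConfig.topicPrefix() prepends the "topic." (TOPIC_PREFIX)
> > > > > >     // prefix, so this override is applied to the internal repartition
> > > > > >     // and changelog topics the app creates: larger segments, hence
> > > > > >     // fewer files per partition.
> > > > > >     props.put(StreamsConfig.topicPrefix(TopicConfig.SEGMENT_BYTES_CONFIG),
> > > > > >               1024 * 1024 * 1024); // 1 GiB segments, example value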
> > > > > >
> > > > > >
> > > > > > Guozhang
> > > > > >
> > > > > >
> > > > > > On Tue, Jan 22, 2019 at 4:38 AM Niklas Lönn <niklas.l...@gmail.com>
> > > > > > wrote:
> > > > > >
> > > > > > > Hi Kafka Devs & Users,
> > > > > > >
> > > > > > > We recently had an issue where we processed a lot of old data and
> > > > > > > we crashed our brokers due to too many memory-mapped files.
> > > > > > > It seems to me that the nature of Kafka / Kafka Streams is a bit
> > > > > > > suboptimal in terms of resource management. (Keeping all files open
> > > > > > > all the time; maybe there should be something managing this more
> > > > > > > on-demand?)
> > > > > > >
> > > > > > > In the issue I described, the repartition topic was produced very
> > > > > > > fast but not consumed, causing a lot of segments and files to be
> > > > > > > open at the same time.
> > > > > > >
> > > > > > > I have worked around the issue by making sure I have more threads
> > > > > > > than partitions, to force tasks to subscribe to internal topics
> > > > > > > only, but it seems a bit hacky, and maybe there should be some
> > > > > > > guidance in the documentation if this is considered part of the
> > > > > > > design.
> > > > > > >
> > > > > > > After quite some testing and code reverse-engineering, it seems
> > > > > > > that the nature of this imbalance lies in how the broker
> > > > > > > multiplexes the consumed topic-partitions.
> > > > > > >
> > > > > > > I have attached a slide that I will present to my team to explain
> > > > > > > the issue in a bit more detail; it might be good to check it out to
> > > > > > > understand the context.
> > > > > > >
> > > > > > > Any thoughts about my findings and concerns?
> > > > > > >
> > > > > > > Kind regards
> > > > > > > Niklas
> > > > > > >
> > > > > >
> > > > > >
> > > > > > --
> > > > > > -- Guozhang
> > > > > >
> > > > >
> > > >
> > > >
> > > > --
> > > > -- Guozhang
> > > >
> > >
> >
> >
> > --
> > -- Guozhang
> >
>


-- 
-- Guozhang
