I see.

What you described is a known issue in older versions of Kafka: some
high-traffic topics being bootstrapped may effectively "starve" other topics
in the fetch response, since brokers used to naively fill the response up to
the max.bytes configuration and return it. This was fixed in version 1.1 via
incremental fetch requests (KIP-227):
https://cwiki.apache.org/confluence/display/KAFKA/KIP-227%3A+Introduce+Incremental+FetchRequests+to+Increase+Partition+Scalability

The basic idea is to not always request topics in the same order, like A, B,
C; instead, if the previous request asked for topics A, B, C and got all of
its data from A, the next request would ask for B, C, A, and so on. So if you
are on an older version of Kafka, I'd suggest you upgrade to a newer one.
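
As a toy illustration of that rotation (plain Java, not the actual broker or
KIP-227 code): a topic that was fully served in the previous fetch goes to the
back of the next request, so one high-traffic topic cannot starve the others
forever.

import java.util.ArrayList;
import java.util.List;

public class FetchOrderSketch {

    // Given the previous fetch order and the topic that was fully served,
    // build the order for the next fetch request.
    static List<String> nextOrder(List<String> previous, String fullyServed) {
        List<String> next = new ArrayList<>(previous);
        next.remove(fullyServed);   // e.g. drop "A" from [A, B, C]
        next.add(fullyServed);      // ...and append it again: [B, C, A]
        return next;
    }

    public static void main(String[] args) {
        System.out.println(nextOrder(List.of("A", "B", "C"), "A")); // [B, C, A]
    }
}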

If you cannot upgrade at the moment, another suggestion, as I mentioned
above, is to increase the segment size so you end up with much larger, and
hence fewer, segment files.
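
As a rough sketch (the application id, broker address and the 256 MB value
are placeholders, not recommendations), the per-topic override mentioned
further down in this thread would look something like this:

import java.util.Properties;

import org.apache.kafka.common.config.TopicConfig;
import org.apache.kafka.streams.StreamsConfig;

public class SegmentSizeExample {

    // Sketch only: prefix a topic-level config with StreamsConfig.TOPIC_PREFIX
    // (via topicPrefix()) so every internal topic created by the Streams app
    // gets a larger segment.bytes, and hence fewer .log/.index/.timeindex files.
    public static Properties streamsProps() {
        Properties props = new Properties();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "my-app");          // placeholder
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "broker:9092");  // placeholder
        props.put(StreamsConfig.topicPrefix(TopicConfig.SEGMENT_BYTES_CONFIG),
                  String.valueOf(256 * 1024 * 1024));                      // placeholder size
        return props;
    }
}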

Guozhang


On Wed, Jan 23, 2019 at 8:54 AM Niklas Lönn <niklas.l...@gmail.com> wrote:

> Hi Guozhang,
>
> I think I got a bit ahead of myself in describing the situation; I had an
> attachment with the context in detail, but maybe it was filtered out. Let's
> try again =)
>
> We have a topology looking something like this:
>
> input-topic[20 partitions, compacted]
>     |
> use-case-repartition[20 partitions, infinite retention, segment.ms=10min]
>     |
> use-case-changelog
>
> We previously hit the TooManyOpenFiles issue and "solved" it by raising the
> limit to something extreme.
> Later we found out that we wanted replication factor 3 on all internal
> topics, so we reset the app and BOOM, now we hit the memory-mapped-files
> limit instead.
>
> The input topic contains 30 days of data, with records in pretty much every
> 10-minute window for every partition.
> This means that if nothing consumes the repartition topic, we will have
> 6 (10-minute slots) * 24 hours * 30 days * 20 partitions * 3 (.index, .log,
> .timeindex files) * 3 replication factor / 5 brokers in the cluster =
> 155.520 open files just to have this repartition topic in place.
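
(The same arithmetic as a quick sanity check in plain Java; the division by 5
just spreads the files evenly over the brokers:)

public class OpenFileEstimate {
    public static void main(String[] args) {
        long segmentsPerPartition = 6 * 24 * 30;  // 10-minute segments over 30 days = 4.320
        long filesPerBroker = segmentsPerPartition
                * 20    // partitions
                * 3     // .log, .index and .timeindex per segment
                * 3     // replication factor
                / 5;    // brokers in the cluster
        System.out.println(filesPerBroker);       // 155520
    }
}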
>
> You might say: yeah, but no problem, since the data would be deleted and you
> would never reach such high numbers? That doesn't seem to be the case.
> What happened in our case is that, due to how the broker multiplexes the
> topic partitions for the subscribers, the streams application piled up all
> the repartition records, and only once it had caught up did all the
> downstream processing start. I see this as a design flaw in some component,
> probably the broker; it can't be the desired behaviour. How many open files
> would I need to be allowed with a year of data when resetting/reprocessing
> an application?
>
> By adding more threads than input topic partitions, I managed to force the
> broker to give out these records earlier, and the issue was mitigated.
>
> Ideally the downstream records should be processed at roughly the same time
> as their source records.
>
> Let's take one partition containing 1.000.000 records; this is the observed
> behaviour I have seen (somewhat simplified):
>
> Time     Input topic offset   Input topic records   Repartition offset   Repartition records
> 00:00    0                    1.000.000             0                    0
> 00:01    100.000              1.000.000             0                    100.000
> 00:02    200.000              1.000.000             0                    200.000
> 00:03    300.000              1.000.000             0                    300.000
> 00:04    400.000              1.000.000             0                    400.000
> 00:05    500.000              1.000.000             0                    500.000
> 00:06    600.000              1.000.000             0                    600.000
> 00:07    700.000              1.000.000             0                    700.000
> 00:08    800.000              1.000.000             0                    800.000
> 00:09    900.000              1.000.000             0                    900.000
> 00:10    1.000.000            1.000.000             0                    1.000.000
> 00:11    1.000.000            1.000.000             100.000              1.000.000
> 00:12    1.000.000            1.000.000             200.000              1.000.000
> 00:13    1.000.000            1.000.000             300.000              1.000.000
> 00:14    1.000.000            1.000.000             400.000              1.000.000
> 00:15    1.000.000            1.000.000             500.000              1.000.000
> 00:16    1.000.000            1.000.000             600.000              1.000.000
> 00:17    1.000.000            1.000.000             700.000              1.000.000
> 00:18    1.000.000            1.000.000             800.000              1.000.000
> 00:19    1.000.000            1.000.000             900.000              1.000.000
> 00:20    1.000.000            1.000.000             1.000.000            1.000.000
>
> As you can see, there is no parallel execution, because the broker does not
> give out any records from the repartition topic until the input topic is
> depleted.
> By adding more threads than input partitions I managed to mitigate this
> behaviour somewhat, but it is still not close to balanced.
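
(For reference, a minimal sketch of that thread-count workaround; the
application id, broker address and the value 24 are purely illustrative,
assuming the 20 input partitions above:)

import java.util.Properties;

import org.apache.kafka.streams.StreamsConfig;

public class ThreadCountWorkaround {

    // Sketch of the mitigation described above: run more stream threads than
    // there are input partitions, so some consumers end up fetching only the
    // internal repartition topic. Values are illustrative, not prescriptive.
    public static Properties streamsProps() {
        Properties props = new Properties();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "my-app");          // placeholder
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "broker:9092");  // placeholder
        props.put(StreamsConfig.NUM_STREAM_THREADS_CONFIG, 24);            // > 20 input partitions
        return props;
    }
}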
>
> Ideally, in a situation like this where we rebuild stream state, I would
> expect behaviour more like this:
>
> Time     Input topic offset   Input topic records   Repartition offset   Repartition records
> 00:00    0                    1.000.000             0                    0
> 00:01    100.000              1.000.000             0                    100.000
> 00:02    200.000              1.000.000             100.000              200.000
> 00:03    300.000              1.000.000             200.000              300.000
> 00:04    400.000              1.000.000             300.000              400.000
> 00:05    500.000              1.000.000             400.000              500.000
> 00:06    600.000              1.000.000             500.000              600.000
> 00:07    700.000              1.000.000             600.000              700.000
> 00:08    800.000              1.000.000             700.000              800.000
> 00:09    900.000              1.000.000             800.000              900.000
> 00:10    1.000.000            1.000.000             900.000              1.000.000
> 00:11    1.000.000            1.000.000             1.000.000            1.000.000
>
>
> Kind regards
> Niklas
>
> On Tue, Jan 22, 2019 at 6:48 PM Guozhang Wang <wangg...@gmail.com> wrote:
>
> > Hello Niklas,
> >
> > If you can monitor your repartition topic's consumer lag and it is
> > increasing consistently, it means your downstream processor simply cannot
> > keep up with the throughput of the upstream processor. Usually that means
> > your downstream operators are heavier (e.g. aggregations and joins, which
> > are all stateful) than your upstream ones (e.g. simply shuffling the data
> > to repartition topics), and since task assignment only considers a task
> > as the smallest unit of work and does not differentiate "heavy" from
> > "light" tasks, such an imbalance of task assignment may happen. At the
> > moment, to resolve this you should add more resources to make sure the
> > heavy tasks get enough computational resources assigned (more threads,
> > e.g.).
> >
> > If your observed consumer lag plateaus after increasing to some point, it
> > means your consumer can actually keep up, with some constant lag; if you
> > hit your open file limit before seeing this, you either need to increase
> > the open file limit, OR you can simply increase the segment size to
> > reduce the number of files, via StreamsConfig.TOPIC_PREFIX to set the
> > value of TopicConfig.SEGMENT_BYTES_CONFIG.
> >
> >
> > Guozhang
> >
> >
> > On Tue, Jan 22, 2019 at 4:38 AM Niklas Lönn <niklas.l...@gmail.com> wrote:
> >
> > > Hi Kafka Devs & Users,
> > >
> > > We recently had an issue where we processed a lot of old data and we
> > > crashed our brokers due to too many memory-mapped files.
> > > It seems to me that the nature of Kafka / Kafka Streams is a bit
> > > suboptimal in terms of resource management (keeping all files open all
> > > the time; maybe there should be something managing this more on demand?).
> > >
> > > In the issue I described, the repartition topic was produced very fast
> > > but not consumed, causing a lot of segments and files to be open at the
> > > same time.
> > >
> > > I have worked around the issue by making sure I have more threads than
> > > partitions, to force some tasks to subscribe to internal topics only,
> > > but that seems a bit hacky, and maybe there should be some guidance in
> > > the documentation if this is considered part of the design.
> > >
> > > After quite some testing and code reversing, it seems that the root of
> > > this imbalance lies in how the broker multiplexes the consumed
> > > topic-partitions.
> > >
> > > I have attached a slide that I will present to my team to explain the
> > > issue in a bit more detail; it might be good to check it out to
> > > understand the context.
> > >
> > > Any thoughts about my findings and concerns?
> > >
> > > Kind regards
> > > Niklas
> > >
> >
> >
> > --
> > -- Guozhang
> >
>


-- 
-- Guozhang
