I created the following bug report based on this conversation: https://issues.apache.org/jira/browse/KAFKA-7506
//Niklas

On Thu, Oct 11, 2018 at 4:12 PM Niklas Lönn <niklas.l...@gmail.com> wrote:

> Hi again,
>
> On another note, does it really make much sense to limit by time this
> way? Wouldn't it be sufficient with just the 50 MB? In use cases like
> ours, restarting/rebuilding the state opened almost 30K extra files on
> each broker (it seems the repartitioning runs much faster than the
> remaining transformations, so the repartition topic becomes quite a big
> "buffer").
>
> It did not look possible to configure this (hence I had to patch the
> file directly); maybe it would be desirable to make it configurable for
> use cases like this one, if it is not considered a main use case?
>
> Once my app caught up (it took about 8 hours), the number of open files
> decreased again, and it looks like the cleanup is doing its job.
>
> Kind regards
> Niklas
>
> On Wed, Oct 10, 2018 at 4:38 PM Niklas Lönn <niklas.l...@gmail.com> wrote:
>
>> Thanks Guozhang,
>>
>> Thanks for a very good answer! I now understand: the idea is that the
>> client cleans up after itself, so that there is a minimal amount of
>> garbage in the repartition topic.
>>
>> We actually figured out that we were hitting another max-open-files
>> limit, and after adjusting it we successfully started our application
>> without crashing the brokers.
>>
>> However, I think I discovered a bug in the repartitioning setup. Let me
>> first explain our setup: we have a compacted topic containing mostly
>> short-lived values, where tombstones are normally created within a few
>> hours but can be delayed by as much as a month. I suspect the
>> repartition segments honor the timestamps of the records, and when
>> resetting the application we process records that are quite old,
>> therefore creating very many segments and a lot of open files as a
>> result.
>>
>> When running my application I noticed these messages:
>>
>> Fetch offset 213792 is out of range for partition
>> app-id-KTABLE-AGGREGATE-STATE-STORE-0000000015-repartition-7, resetting offset
>> Fetch offset 110227 is out of range for partition
>> app-id-KTABLE-AGGREGATE-STATE-STORE-0000000015-repartition-2, resetting offset
>> Resetting offset for partition
>> app-id-KTABLE-AGGREGATE-STATE-STORE-0000000015-repartition-7 to offset 233302.
>> Resetting offset for partition
>> app-id-KTABLE-AGGREGATE-STATE-STORE-0000000015-repartition-2 to offset 119914.
>>
>> This effectively made my application skip messages. By patching
>> RepartitionTopicConfig.java I verified that it is due to the undefined
>> retention.ms, which leaves the default retention on the records,
>> meaning that my application was competing with the log cleaner.
>>
>> By adding this line I got rid of these messages:
>>
>> tempTopicDefaultOverrides.put(TopicConfig.RETENTION_MS_CONFIG, "-1"); // Infinite
>>
>> My understanding is that this should be safe, as the cleanup is handled
>> by the client invoking the admin API?
>>
>> Kind regards
>> Niklas
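[For reference, the patch described in the quoted mail amounts to one
extra line in the default-overrides map that Kafka Streams applies when
creating repartition topics. The sketch below paraphrases the 2.0-era
RepartitionTopicConfig.java; the wrapper class name here is illustrative,
and only the put() calls mirror the real defaults:]

    import java.util.Collections;
    import java.util.HashMap;
    import java.util.Map;
    import org.apache.kafka.common.config.TopicConfig;

    // Illustrative stand-in for Kafka Streams' internal
    // RepartitionTopicConfig default overrides.
    public final class PatchedRepartitionDefaults {
        static final Map<String, String> REPARTITION_TOPIC_DEFAULT_OVERRIDES;

        static {
            final Map<String, String> tempTopicDefaultOverrides = new HashMap<>();
            tempTopicDefaultOverrides.put(TopicConfig.CLEANUP_POLICY_CONFIG,
                TopicConfig.CLEANUP_POLICY_DELETE);
            tempTopicDefaultOverrides.put(TopicConfig.SEGMENT_BYTES_CONFIG, "52428800");       // 50 MB
            tempTopicDefaultOverrides.put(TopicConfig.SEGMENT_INDEX_BYTES_CONFIG, "52428800");
            tempTopicDefaultOverrides.put(TopicConfig.SEGMENT_MS_CONFIG, "600000");            // 10 min
            // The added line: disable time-based retention so the broker's
            // log cleaner never deletes records the application has not yet
            // consumed.
            tempTopicDefaultOverrides.put(TopicConfig.RETENTION_MS_CONFIG, "-1");              // Infinite
            REPARTITION_TOPIC_DEFAULT_OVERRIDES =
                Collections.unmodifiableMap(tempTopicDefaultOverrides);
        }
    }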
>> On Tue, Oct 9, 2018 at 8:47 PM Guozhang Wang <wangg...@gmail.com> wrote:
>>
>>> Hi Niklas,
>>>
>>> The default value of segment.ms was set to 10 min as part of this
>>> project (introduced in Kafka 1.1.0):
>>>
>>> https://jira.apache.org/jira/browse/KAFKA-6150
>>> https://cwiki.apache.org/confluence/display/KAFKA/KIP-204+%3A+Adding+records+deletion+operation+to+the+new+Admin+Client+API
>>>
>>> In KIP-204 (KAFKA-6150) we added an admin request that deletes records
>>> immediately after offsets are committed, to make repartition topics
>>> truly "transient", and along with it we set the default segment.ms to
>>> 10 min. The rationale is that for record purging to be effective we
>>> need a smaller segment size, so that segment files can be deleted once
>>> the purged offset is larger than the segment's last offset.
>>>
>>> Which Kafka version are you using currently? Did you observe that data
>>> purging did not happen (otherwise segment files should be garbage
>>> collected quickly)? Or is your traffic very small, or do you commit
>>> infrequently, either of which would result in ineffective purging?
>>>
>>> Guozhang
>>>
>>> On Tue, Oct 9, 2018 at 4:07 AM, Niklas Lönn <niklas.l...@gmail.com> wrote:
>>>
>>> > Hi,
>>> >
>>> > Recently we experienced a problem when resetting a streams
>>> > application that does quite a lot of operations based on two
>>> > compacted source topics with 20 partitions.
>>> >
>>> > We crashed the entire broker cluster with a TooManyOpenFiles
>>> > exception (we already have a multi-million open-files limit).
>>> >
>>> > When inspecting the internal topics' configuration I noticed that
>>> > the repartition topics have a default config of:
>>> >
>>> > Configs:segment.bytes=52428800,segment.index.bytes=52428800,cleanup.policy=delete,segment.ms=600000
>>> >
>>> > My source topic is a compacted topic used as a KTable. Assuming
>>> > there is data for every 10-minute segment, I would quickly get 144
>>> > segments per partition per day.
>>> >
>>> > Since this repartition topic is not even compacted, I can't
>>> > understand the reasoning behind defaults of a 10-minute segment.ms
>>> > and 50 MB segment.bytes.
>>> >
>>> > Is there any best practice regarding this? Potentially we could
>>> > crash the cluster every time we need to reset an application.
>>> >
>>> > And does it make sense that it keeps so many open files at the same
>>> > time in the first place? Could it be a bug in the file management of
>>> > the Kafka broker?
>>> >
>>> > Kind regards
>>> > Niklas
>>>
>>> --
>>> -- Guozhang
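[To illustrate the client-driven purging Guozhang describes, here is a
minimal standalone sketch against the public AdminClient API. The
bootstrap server, topic, partition, and offset are hypothetical,
borrowed from the log lines quoted above:]

    import java.util.Collections;
    import java.util.Map;
    import java.util.Properties;

    import org.apache.kafka.clients.admin.AdminClient;
    import org.apache.kafka.clients.admin.AdminClientConfig;
    import org.apache.kafka.clients.admin.DeleteRecordsResult;
    import org.apache.kafka.clients.admin.RecordsToDelete;
    import org.apache.kafka.common.TopicPartition;

    public class RepartitionTopicPurgeExample {
        public static void main(final String[] args) throws Exception {
            final Properties props = new Properties();
            props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");

            try (AdminClient admin = AdminClient.create(props)) {
                // Hypothetical repartition partition and committed offset.
                final TopicPartition tp = new TopicPartition(
                    "app-id-KTABLE-AGGREGATE-STATE-STORE-0000000015-repartition", 7);
                final long committedOffset = 233302L;

                // Ask the brokers to drop every record below the committed
                // offset. Once a whole segment falls below the new log
                // start offset, the broker can delete the segment file and
                // release its file handles.
                final Map<TopicPartition, RecordsToDelete> toDelete =
                    Collections.singletonMap(tp, RecordsToDelete.beforeOffset(committedOffset));
                final DeleteRecordsResult result = admin.deleteRecords(toDelete);
                result.all().get(); // block until the request completes
            }
        }
    }

[Kafka Streams issues the equivalent delete-records request internally
after each commit, which is why small segments matter: a segment can only
be garbage collected once all of its records fall below the purged
offset.]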