mimaison commented on code in PR #19387:
URL: https://github.com/apache/kafka/pull/19387#discussion_r2032797613
########## storage/src/main/java/org/apache/kafka/storage/internals/log/LogCleaner.java: ##########
@@ -0,0 +1,652 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.storage.internals.log;
+
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.config.AbstractConfig;
+import org.apache.kafka.common.config.ConfigException;
+import org.apache.kafka.common.errors.KafkaStorageException;
+import org.apache.kafka.common.utils.LogContext;
+import org.apache.kafka.common.utils.Time;
+import org.apache.kafka.server.config.BrokerReconfigurable;
+import org.apache.kafka.server.config.ServerConfigs;
+import org.apache.kafka.server.metrics.KafkaMetricsGroup;
+import org.apache.kafka.server.util.ShutdownableThread;
+import org.apache.kafka.storage.internals.utils.Throttler;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.File;
+import java.io.IOException;
+import java.security.DigestException;
+import java.security.NoSuchAlgorithmException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Set;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.TimeUnit;
+import java.util.function.Function;
+import java.util.stream.IntStream;
+
+/**
+ * The cleaner is responsible for removing obsolete records from logs which have the "compact" retention strategy.
+ * A message with key K and offset O is obsolete if there exists a message with key K and offset O' such that O < O'.
+ * <p>
+ * Each log can be thought of as being split into two sections of segments: a "clean" section which has previously been
+ * cleaned, followed by a "dirty" section that has not yet been cleaned. The dirty section is further divided into a
+ * "cleanable" section followed by an "uncleanable" section. The uncleanable section is excluded from cleaning. The
+ * active log segment is always uncleanable. If there is a compaction lag time set, segments whose largest message
+ * timestamp is within the compaction lag time of the cleaning operation are also uncleanable.
+ * <p>
+ * The cleaning is carried out by a pool of background threads. Each thread chooses the dirtiest log that has the
+ * "compact" retention policy and cleans that. The dirtiness of the log is estimated by taking the ratio of bytes in
+ * the dirty section of the log to the total bytes in the log.
+ * <p>
+ * To clean a log the cleaner first builds a mapping of key=>last_offset for the dirty section of the log. See
+ * {@link OffsetMap} for details of the implementation of the mapping.
+ * <p>
+ * Once the key=>last_offset map is built, the log is cleaned by recopying each log segment but omitting any record
+ * whose key appears in the offset map with a higher offset than the one found in the segment (i.e. messages with a
+ * key that also appears later in the dirty section of the log).
+ * <p>
+ * To avoid segments shrinking to very small sizes with repeated cleanings, we implement a rule by which we merge
+ * successive segments when doing a cleaning if their log and index sizes are less than the maximum log and index
+ * size prior to the clean beginning.
+ * <p>
+ * Cleaned segments are swapped into the log as they become available.
+ * <p>
+ * One nuance that the cleaner must handle is log truncation. If a log is truncated while it is being cleaned, the
+ * cleaning of that log is aborted.
+ * <p>
+ * Messages with a null payload are treated as deletes for the purpose of log compaction, so they receive special
+ * treatment by the cleaner. The cleaner only retains delete records for a period of time, to avoid accumulating
+ * space indefinitely. This period of time is configurable on a per-topic basis and is measured from the time the
+ * segment enters the clean portion of the log (at which point any prior message with that key has been removed).
+ * Delete markers in the clean section of the log that are older than this time are not retained when log segments
+ * are recopied as part of cleaning. This time is tracked by setting the base timestamp of a record batch containing
+ * delete markers when the batch is recopied in the first cleaning that encounters it. The relative timestamps of the
+ * records in the batch are also adjusted to the new base timestamp of the batch when recopied in this cleaning.
+ * <p>
+ * Note that cleaning is more complicated with the idempotent/transactional producer capabilities. The following
+ * are the key points:
+ * <p>
+ * <ol>
+ * <li>In order to maintain sequence number continuity for active producers, we always retain the last batch
+ * from each producerId, even if all the records from the batch have been removed. The batch will be removed
+ * once the producer either writes a new batch or is expired due to inactivity.</li>
+ * <li>We do not clean beyond the last stable offset. This ensures that all records observed by the cleaner have
+ * been decided (i.e. committed or aborted). In particular, this allows us to use the transaction index to
+ * collect the aborted transactions ahead of time.</li>
+ * <li>Records from aborted transactions are removed by the cleaner immediately without regard to record keys.</li>
+ * <li>Transaction markers are retained until all record batches from the same transaction have been removed and
+ * a sufficient amount of time has passed to reasonably ensure that an active consumer wouldn't consume any
+ * data from the transaction prior to reaching the offset of the marker. This follows the same logic used for
+ * tombstone deletion.</li>
+ * </ol>
+ */
+public class LogCleaner implements BrokerReconfigurable {
+    private static final Logger LOG = LoggerFactory.getLogger(LogCleaner.class);

Review Comment:
   There are also instances in `Cleaner` and `CleanerThread`. As I said, I'm unsure which approach to take, so I'd like to get input from other committers too.
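For readers following the discussion, the "dirtiest log" selection the Javadoc describes reduces to a simple ratio comparison. The sketch below is a hypothetical reduction of that heuristic, not code from this PR: the `LogState` record and its fields are invented for illustration, and the threshold parameter plays the role of Kafka's real `min.cleanable.dirty.ratio` topic config.

```java
import java.util.Comparator;
import java.util.List;
import java.util.Optional;

// Illustrative sketch only, not the PR's code. A log's "dirtiness" is the
// fraction of its bytes that sit in the not-yet-cleaned (dirty) section.
record LogState(String topicPartition, long cleanBytes, long dirtyBytes) {
    double dirtyRatio() {
        long total = cleanBytes + dirtyBytes;
        return total == 0 ? 0.0 : (double) dirtyBytes / total;
    }
}

class DirtiestLogSelector {
    // Pick the log with the highest dirty ratio, skipping logs whose ratio
    // falls below the minimum cleanable threshold.
    static Optional<LogState> selectDirtiest(List<LogState> logs, double minCleanableRatio) {
        return logs.stream()
                .filter(log -> log.dirtyRatio() >= minCleanableRatio)
                .max(Comparator.comparingDouble(LogState::dirtyRatio));
    }
}
```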
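Similarly, the two-phase clean outlined in the Javadoc (build a key=>last_offset map over the dirty section, then recopy segments while dropping superseded records) can be sketched with a drastically simplified in-memory model. All names here (`SimpleRecord`, `compact`, `shouldRetain`) are hypothetical; the real cleaner operates on segments, record batches, and an `OffsetMap`, and additionally preserves empty batches for producer sequence continuity and respects the last stable offset, which this sketch omits.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Illustrative sketch only: a minimal model of the compaction pass.
record SimpleRecord(long offset, String key, String value) {
    boolean isTombstone() {
        return value == null; // null payload = delete marker
    }
}

class CompactionSketch {
    static List<SimpleRecord> compact(List<SimpleRecord> clean,
                                      List<SimpleRecord> dirty,
                                      boolean tombstoneRetentionExpired) {
        // Phase 1: map each key to its last (highest) offset in the dirty section.
        Map<String, Long> lastOffset = new HashMap<>();
        for (SimpleRecord r : dirty)
            lastOffset.put(r.key(), r.offset()); // later records overwrite earlier ones

        // Phase 2: recopy clean then dirty records, keeping a record only if
        // no later offset exists for its key.
        List<SimpleRecord> out = new ArrayList<>();
        for (SimpleRecord r : clean)
            if (shouldRetain(r, lastOffset, tombstoneRetentionExpired))
                out.add(r);
        for (SimpleRecord r : dirty)
            if (shouldRetain(r, lastOffset, tombstoneRetentionExpired))
                out.add(r);
        return out;
    }

    private static boolean shouldRetain(SimpleRecord r,
                                        Map<String, Long> lastOffset,
                                        boolean tombstoneRetentionExpired) {
        Long latest = lastOffset.get(r.key());
        if (latest != null && latest > r.offset())
            return false; // superseded by a later record with the same key
        if (r.isTombstone() && tombstoneRetentionExpired)
            return false; // delete marker past its retention period
        return true;
    }
}
```

The boolean flag is a stand-in for the per-topic tombstone retention check the Javadoc describes; in the actual cleaner that decision is made per batch from the batch's base timestamp, not globally.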