mimaison commented on code in PR #19387:
URL: https://github.com/apache/kafka/pull/19387#discussion_r2032797613


##########
storage/src/main/java/org/apache/kafka/storage/internals/log/LogCleaner.java:
##########
@@ -0,0 +1,652 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.storage.internals.log;
+
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.config.AbstractConfig;
+import org.apache.kafka.common.config.ConfigException;
+import org.apache.kafka.common.errors.KafkaStorageException;
+import org.apache.kafka.common.utils.LogContext;
+import org.apache.kafka.common.utils.Time;
+import org.apache.kafka.server.config.BrokerReconfigurable;
+import org.apache.kafka.server.config.ServerConfigs;
+import org.apache.kafka.server.metrics.KafkaMetricsGroup;
+import org.apache.kafka.server.util.ShutdownableThread;
+import org.apache.kafka.storage.internals.utils.Throttler;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.File;
+import java.io.IOException;
+import java.security.DigestException;
+import java.security.NoSuchAlgorithmException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Set;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.TimeUnit;
+import java.util.function.Function;
+import java.util.stream.IntStream;
+
+/**
+ * The cleaner is responsible for removing obsolete records from logs which have the "compact" retention strategy.
+ * A message with key K and offset O is obsolete if there exists a message with key K and offset O' such that O < O'.
+ * <p>
+ * Each log can be thought of as being split into two sections of segments: a "clean" section which has previously been cleaned followed by a
+ * "dirty" section that has not yet been cleaned. The dirty section is further divided into the "cleanable" section followed by an "uncleanable" section.
+ * The uncleanable section is excluded from cleaning. The active log segment is always uncleanable. If there is a
+ * compaction lag time set, segments whose largest message timestamp is within the compaction lag time of the cleaning operation are also uncleanable.
+ * <p>
+ * The cleaning is carried out by a pool of background threads. Each thread chooses the dirtiest log that has the "compact" retention policy
+ * and cleans that. The dirtiness of the log is estimated by taking the ratio of bytes in the dirty section of the log to the total bytes in the log.
+ * <p>
+ * To clean a log the cleaner first builds a mapping of key=>last_offset for the dirty section of the log. See {@link OffsetMap} for details of
+ * the implementation of the mapping.
+ * <p>
+ * Once the key=>last_offset map is built, the log is cleaned by recopying each log segment but omitting any key that appears in the offset map with a
+ * higher offset than what is found in the segment (i.e. messages with a key that appears in the dirty section of the log).
+ * <p>
+ * To avoid segments shrinking to very small sizes with repeated cleanings, we merge successive segments when
+ * doing a cleaning if their combined log and index sizes are less than the maximum log and index size prior to the clean beginning.
+ * <p>
+ * Cleaned segments are swapped into the log as they become available.
+ * <p>
+ * One nuance that the cleaner must handle is log truncation. If a log is truncated while it is being cleaned the cleaning of that log is aborted.
+ * <p>
+ * Messages with null payload are treated as deletes for the purpose of log compaction. This means that they receive special treatment by the cleaner.
+ * The cleaner will only retain delete records for a period of time to avoid accumulating space indefinitely. This period of time is configurable on a per-topic
+ * basis and is measured from the time the segment enters the clean portion of the log (at which point any prior message with that key has been removed).
+ * Delete markers in the clean section of the log that are older than this time will not be retained when log segments are being recopied as part of cleaning.
+ * This time is tracked by setting the base timestamp of a record batch with delete markers when the batch is recopied in the first cleaning that encounters
+ * it. The relative timestamps of the records in the batch are also modified when recopied in this cleaning according to the new base timestamp of the batch.
+ * <p>
+ * Note that cleaning is more complicated with the idempotent/transactional producer capabilities. The following
+ * are the key points:
+ * <p>
+ * <ol>
+ * <li>In order to maintain sequence number continuity for active producers, we always retain the last batch
+ *    from each producerId, even if all the records from the batch have been removed. The batch will be removed
+ *    once the producer either writes a new batch or is expired due to inactivity.</li>
+ * <li>We do not clean beyond the last stable offset. This ensures that all records observed by the cleaner have
+ *    been decided (i.e. committed or aborted). In particular, this allows us to use the transaction index to
+ *    collect the aborted transactions ahead of time.</li>
+ * <li>Records from aborted transactions are removed by the cleaner immediately without regard to record keys.</li>
+ * <li>Transaction markers are retained until all record batches from the same transaction have been removed and
+ *    a sufficient amount of time has passed to reasonably ensure that an active consumer wouldn't consume any
+ *    data from the transaction prior to reaching the offset of the marker. This follows the same logic used for
+ *    tombstone deletion.</li>
+ * </ol>
+ */
+public class LogCleaner implements BrokerReconfigurable {
+    private static final Logger LOG = LoggerFactory.getLogger(LogCleaner.class);
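
For readers skimming the excerpt: the "dirtiest log first" selection described in the Javadoc reduces to a ratio of dirty bytes to total bytes, compared against the `min.cleanable.dirty.ratio` topic config. A minimal sketch of that computation (illustrative only; `CandidateLog` and `pickDirtiest` are not names from the PR):

```java
import java.util.Comparator;
import java.util.List;
import java.util.Optional;

// Sketch: pick the log with the highest dirty ratio, skipping logs whose
// ratio is still below the configured minimum (min.cleanable.dirty.ratio).
class DirtiestLogSketch {
    record CandidateLog(String topicPartition, long cleanBytes, long dirtyBytes) {
        double dirtyRatio() {
            long total = cleanBytes + dirtyBytes;
            return total == 0 ? 0.0 : (double) dirtyBytes / total;
        }
    }

    static Optional<CandidateLog> pickDirtiest(List<CandidateLog> logs, double minDirtyRatio) {
        return logs.stream()
                .filter(l -> l.dirtyRatio() >= minDirtyRatio) // not yet worth cleaning
                .max(Comparator.comparingDouble(CandidateLog::dirtyRatio));
    }
}
```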

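The recopy phase described in the Javadoc can likewise be pictured with a plain `HashMap` standing in for the PR's `OffsetMap`. A hedged sketch with illustrative names (`Rec`, `shouldRetain`), including the tombstone rule from the Javadoc:

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Sketch of the two-phase compaction pass described in the Javadoc above.
class CompactionPassSketch {
    record Rec(String key, long offset, byte[] value) {}

    // Phase 1: scan the dirty section so each key maps to its last offset;
    // later offsets overwrite earlier ones.
    static Map<String, Long> buildOffsetMap(List<Rec> dirtySection) {
        Map<String, Long> lastOffset = new HashMap<>();
        for (Rec r : dirtySection) {
            lastOffset.put(r.key(), r.offset());
        }
        return lastOffset;
    }

    // Phase 2: while recopying a segment, keep a record only if no later
    // offset exists for its key; a tombstone (null value) additionally
    // survives only until the per-topic delete retention time has elapsed.
    static boolean shouldRetain(Rec r, Map<String, Long> lastOffset, boolean pastDeleteHorizon) {
        Long latest = lastOffset.get(r.key());
        if (latest != null && latest > r.offset()) {
            return false; // shadowed by a newer record with the same key
        }
        return !(r.value() == null && pastDeleteHorizon); // drop expired tombstones
    }
}
```
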
Review Comment:
   There are also instances in `Cleaner` and `CleanerThread`.
   
   As I said, I'm unsure which approach to take, so I'd like to get input from other committers too.
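
For context, the choice being discussed presumably contrasts a static per-class logger with a per-instance logger built from a `LogContext` (which the file already imports). A rough sketch of the two styles (illustrative only; the real constructors differ):

```java
import org.apache.kafka.common.utils.LogContext;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

class LoggerStyles {
    // (a) static per-class logger, as in the diff above:
    private static final Logger LOG = LoggerFactory.getLogger(LoggerStyles.class);

    // (b) per-instance logger from a LogContext, so every line carries a
    // common prefix such as the cleaner thread id:
    private final Logger logger;

    LoggerStyles(int threadId) {
        this.logger = new LogContext("[kafka-log-cleaner-thread-" + threadId + "] ")
                .logger(LoggerStyles.class);
    }
}
```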


