majialoong commented on code in PR #22426: URL: https://github.com/apache/kafka/pull/22426#discussion_r3331819533
########## storage/src/test/java/org/apache/kafka/storage/internals/log/LogLoaderTest.java: ########## @@ -0,0 +1,1863 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.storage.internals.log; + +import org.apache.kafka.common.TopicPartition; +import org.apache.kafka.common.compress.Compression; +import org.apache.kafka.common.errors.KafkaStorageException; +import org.apache.kafka.common.message.AbortedTxn; +import org.apache.kafka.common.record.TimestampType; +import org.apache.kafka.common.record.internal.ControlRecordType; +import org.apache.kafka.common.record.internal.DefaultRecordBatch; +import org.apache.kafka.common.record.internal.MemoryRecords; +import org.apache.kafka.common.record.internal.MemoryRecordsBuilder; +import org.apache.kafka.common.record.internal.Record; +import org.apache.kafka.common.record.internal.RecordBatch; +import org.apache.kafka.common.record.internal.SimpleRecord; +import org.apache.kafka.common.utils.Time; +import org.apache.kafka.common.utils.Utils; +import org.apache.kafka.coordinator.transaction.TransactionLogConfig; +import org.apache.kafka.metadata.MockConfigRepository; +import org.apache.kafka.server.common.TransactionVersion; +import 
org.apache.kafka.server.util.MockTime; +import org.apache.kafka.server.util.Scheduler; +import org.apache.kafka.storage.internals.checkpoint.CleanShutdownFileHandler; +import org.apache.kafka.storage.internals.epoch.LeaderEpochFileCache; +import org.apache.kafka.storage.log.metrics.BrokerTopicStats; +import org.apache.kafka.test.TestUtils; + +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.params.ParameterizedTest; +import org.junit.jupiter.params.provider.CsvSource; + +import java.io.BufferedWriter; +import java.io.File; +import java.io.FileWriter; +import java.io.IOException; +import java.nio.ByteBuffer; +import java.nio.file.Files; +import java.nio.file.NoSuchFileException; +import java.nio.file.Path; +import java.nio.file.Paths; +import java.util.ArrayList; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Optional; +import java.util.OptionalLong; +import java.util.Set; +import java.util.concurrent.Callable; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentMap; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.function.Consumer; +import java.util.function.Function; +import java.util.function.Supplier; +import java.util.stream.Collectors; +import java.util.stream.IntStream; +import java.util.stream.Stream; + +import static org.junit.jupiter.api.Assertions.assertDoesNotThrow; +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertFalse; +import static org.junit.jupiter.api.Assertions.assertNotEquals; +import static org.junit.jupiter.api.Assertions.assertThrows; +import static org.junit.jupiter.api.Assertions.assertTrue; +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.ArgumentMatchers.anyBoolean; +import static org.mockito.ArgumentMatchers.anyInt; +import static org.mockito.ArgumentMatchers.anyList; +import static 
org.mockito.ArgumentMatchers.anyLong; +import static org.mockito.ArgumentMatchers.eq; +import static org.mockito.Mockito.doAnswer; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.reset; +import static org.mockito.Mockito.spy; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; + +public class LogLoaderTest { + + private final BrokerTopicStats brokerTopicStats = new BrokerTopicStats(); + private final ProducerStateManagerConfig producerStateManagerConfig = + new ProducerStateManagerConfig(TransactionLogConfig.PRODUCER_ID_EXPIRATION_MS_DEFAULT, false); + private final File tmpDir = TestUtils.tempDirectory(); + private final File logDir = TestUtils.randomPartitionLogDir(tmpDir); + private final List<UnifiedLog> logsToClose = new ArrayList<>(); + private final MockTime mockTime = new MockTime(); + + @AfterEach + public void tearDown() throws IOException { + brokerTopicStats.close(); + logsToClose.forEach(l -> Utils.closeQuietly(l, "UnifiedLog")); + Utils.delete(tmpDir); + } + + private enum ErrorType { + IO_EXCEPTION, + RUNTIME_EXCEPTION, + KAFKA_STORAGE_EXCEPTION_WITH_IO_EXCEPTION_CAUSE, + KAFKA_STORAGE_EXCEPTION_WITHOUT_IO_EXCEPTION_CAUSE + } + + private static class SimulateError { + boolean hasError = false; + ErrorType errorType = ErrorType.RUNTIME_EXCEPTION; + } + + private record LogAndSegment(UnifiedLog log, LogSegment segment) { } + + @Test + public void testLogRecoveryIsCalledUponBrokerCrash() throws Exception { + // LogManager must realize correctly if the last shutdown was not clean and the logs need + // to run recovery while loading upon subsequent broker boot up. 
+ File logDir = TestUtils.tempDirectory(); + LogConfig logConfig = new LogConfig(Map.of()); + List<File> logDirs = List.of(logDir); + TopicPartition topicPartition = new TopicPartition("foo", 0); + MockTime time = new MockTime(); + LogDirFailureChannel logDirFailureChannel = new LogDirFailureChannel(logDirs.size()); + + AtomicBoolean cleanShutdownInterceptedValue = new AtomicBoolean(false); + SimulateError simulateError = new SimulateError(); + + CleanShutdownFileHandler cleanShutdownFileHandler = new CleanShutdownFileHandler(logDir.getPath()); + { + LogManager logManager = initializeLogManagerForSimulatingErrorTest(logConfig, logDirs, topicPartition, time, cleanShutdownInterceptedValue, simulateError); + + // Load logs after a clean shutdown + cleanShutdownFileHandler.write(0L); + cleanShutdownInterceptedValue.set(false); + LogConfig defaultConfig = logManager.currentDefaultConfig(); + logManager.loadLogs(defaultConfig, logManager.fetchTopicConfigOverrides(defaultConfig, Set.of()), l -> false); + assertTrue(cleanShutdownInterceptedValue.get(), "Unexpected value intercepted for clean shutdown flag"); + assertFalse(cleanShutdownFileHandler.exists(), "Clean shutdown file must not exist after loadLogs has completed"); + // Load logs without clean shutdown file + cleanShutdownInterceptedValue.set(true); + defaultConfig = logManager.currentDefaultConfig(); + logManager.loadLogs(defaultConfig, logManager.fetchTopicConfigOverrides(defaultConfig, Set.of()), l -> false); + assertFalse(cleanShutdownInterceptedValue.get(), "Unexpected value intercepted for clean shutdown flag"); + assertFalse(cleanShutdownFileHandler.exists(), "Clean shutdown file must not exist after loadLogs has completed"); + // Create clean shutdown file and then simulate error while loading logs such that log loading does not complete. 
+ cleanShutdownFileHandler.write(0L); + logManager.shutdown(); + } + + { + LogManager logManager = initializeLogManagerForSimulatingErrorTest(logConfig, logDirs, topicPartition, + logDirFailureChannel, time, cleanShutdownInterceptedValue, simulateError); + + Callable<Void> runLoadLogs = () -> { + LogConfig defaultConfig = logManager.currentDefaultConfig(); + logManager.loadLogs(defaultConfig, logManager.fetchTopicConfigOverrides(defaultConfig, Set.of()), l -> false); + return null; + }; + + // Simulate Runtime error + simulateError.hasError = true; + simulateError.errorType = ErrorType.RUNTIME_EXCEPTION; + assertThrows(RuntimeException.class, runLoadLogs::call); + assertFalse(cleanShutdownFileHandler.exists(), "Clean shutdown file must not have existed"); + assertFalse(logDirFailureChannel.hasOfflineLogDir(logDir.getAbsolutePath()), "log dir should not turn offline when Runtime Exception thrown"); + + // Simulate Kafka storage error with IOException cause + // in this case, the logDir will be added into offline list before KafkaStorageThrown. 
So we don't verify it here + simulateError.errorType = ErrorType.KAFKA_STORAGE_EXCEPTION_WITH_IO_EXCEPTION_CAUSE; + assertDoesNotThrow(runLoadLogs::call, "KafkaStorageException with IOException cause should be caught and handled"); + + // Simulate Kafka storage error without IOException cause + simulateError.errorType = ErrorType.KAFKA_STORAGE_EXCEPTION_WITHOUT_IO_EXCEPTION_CAUSE; + assertThrows(KafkaStorageException.class, runLoadLogs::call, "should throw exception when KafkaStorageException without IOException cause"); + assertFalse(logDirFailureChannel.hasOfflineLogDir(logDir.getAbsolutePath()), "log dir should not turn offline when KafkaStorageException without IOException cause thrown"); + + // Simulate IO error + simulateError.errorType = ErrorType.IO_EXCEPTION; + assertDoesNotThrow(runLoadLogs::call, "IOException should be caught and handled"); + assertTrue(logDirFailureChannel.hasOfflineLogDir(logDir.getAbsolutePath()), "the log dir should turn offline after IOException thrown"); + + // Do not simulate error on next call to LogManager#loadLogs. LogManager must understand that log had unclean shutdown the last time. 
+ simulateError.hasError = false; + cleanShutdownInterceptedValue.set(true); + LogConfig defaultConfig = logManager.currentDefaultConfig(); + logManager.loadLogs(defaultConfig, logManager.fetchTopicConfigOverrides(defaultConfig, Set.of()), l -> false); + assertFalse(cleanShutdownInterceptedValue.get(), "Unexpected value for clean shutdown flag"); + logManager.shutdown(); + } + } + + private LogManager initializeLogManagerForSimulatingErrorTest( + LogConfig logConfig, + List<File> logDirs, + TopicPartition topicPartition, + MockTime time, + AtomicBoolean cleanShutdownInterceptedValue, + SimulateError simulateError + ) throws IOException { + return initializeLogManagerForSimulatingErrorTest(logConfig, logDirs, topicPartition, + new LogDirFailureChannel(logDirs.size()), time, cleanShutdownInterceptedValue, simulateError); + } + + private LogManager initializeLogManagerForSimulatingErrorTest( + LogConfig logConfig, + List<File> logDirs, + TopicPartition topicPartition, + LogDirFailureChannel logDirFailureChannel, + MockTime time, + AtomicBoolean cleanShutdownInterceptedValue, + SimulateError simulateError + ) throws IOException { + LogManager logManager = interceptedLogManager(logConfig, logDirs, logDirFailureChannel, time, + cleanShutdownInterceptedValue, simulateError); + logManager.getOrCreateLog(topicPartition, true, false, Optional.empty()); + assertFalse(logDirFailureChannel.hasOfflineLogDir(logDir.getAbsolutePath()), "log dir should not be offline before load logs"); Review Comment: Thanks for the patch! It looks like this is now using the class-level `logDir`, whereas the Scala version used the `logDir` created inside `testLogRecoveryIsCalledUponBrokerCrash`. This means the assertion may check a different directory after the migration. Should this use the local `logDir` instead? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. 
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at: [email protected]
