Re: [PR] KAFKA-16399: Add JBOD support in tiered storage [kafka]
showuon merged PR #15690: URL: https://github.com/apache/kafka/pull/15690 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-16399: Add JBOD support in tiered storage [kafka]
showuon commented on PR #15690: URL: https://github.com/apache/kafka/pull/15690#issuecomment-2136661825 > @showuon Thanks for addressing [KAFKA-16709](https://issues.apache.org/jira/browse/KAFKA-16709) and [KAFKA-16711](https://issues.apache.org/jira/browse/KAFKA-16711). Do you want to rebase the PR onto trunk to pull in those changes that remove the test flakiness?

Just rebased. Thanks.
Re: [PR] KAFKA-16399: Add JBOD support in tiered storage [kafka]
satishd commented on PR #15690: URL: https://github.com/apache/kafka/pull/15690#issuecomment-2135628611 @showuon Thanks for addressing KAFKA-16709 and KAFKA-16711. Do you want to rebase the PR onto trunk to pull in those changes that remove the test flakiness?
Re: [PR] KAFKA-16399: Add JBOD support in tiered storage [kafka]
showuon commented on code in PR #15690: URL: https://github.com/apache/kafka/pull/15690#discussion_r1598169071 ## storage/src/test/java/org/apache/kafka/tiered/storage/utils/TieredStorageTestUtils.java: ## @@ -55,7 +57,7 @@ public class TieredStorageTestUtils { // Log cleanup interval is configured to be 500 ms. We need to wait at least that amount of time before // segments eligible for deletion gets physically removed. -public static final Integer STORAGE_WAIT_TIMEOUT_SEC = 5; +public static final Integer STORAGE_WAIT_TIMEOUT_SEC = 10; Review Comment: I've identified that the test is flaky because of these issues: [KAFKA-16709](https://issues.apache.org/jira/browse/KAFKA-16709) and [KAFKA-16711](https://issues.apache.org/jira/browse/KAFKA-16711). I'll work on fixing them first.
Re: [PR] KAFKA-16399: Add JBOD support in tiered storage [kafka]
showuon commented on code in PR #15690: URL: https://github.com/apache/kafka/pull/15690#discussion_r1593390067 ## storage/src/test/java/org/apache/kafka/tiered/storage/utils/TieredStorageTestUtils.java: ## @@ -55,7 +57,7 @@ public class TieredStorageTestUtils { // Log cleanup interval is configured to be 500 ms. We need to wait at least that amount of time before // segments eligible for deletion gets physically removed. -public static final Integer STORAGE_WAIT_TIMEOUT_SEC = 5; +public static final Integer STORAGE_WAIT_TIMEOUT_SEC = 10; Review Comment: The test is still flaky. Investigating.
Re: [PR] KAFKA-16399: Add JBOD support in tiered storage [kafka]
showuon commented on code in PR #15690: URL: https://github.com/apache/kafka/pull/15690#discussion_r1592297330 ## storage/src/test/java/org/apache/kafka/tiered/storage/utils/TieredStorageTestUtils.java: ## @@ -55,7 +57,7 @@ public class TieredStorageTestUtils { // Log cleanup interval is configured to be 500 ms. We need to wait at least that amount of time before // segments eligible for deletion gets physically removed. -public static final Integer STORAGE_WAIT_TIMEOUT_SEC = 5; +public static final Integer STORAGE_WAIT_TIMEOUT_SEC = 10; Review Comment: Yes, I think the test is flaky because the CI environment is quite slow, and the IO may be slower than we expected. From the log, `0002.log` was copied to remote storage at `08:59:27,187`, and only about one second later (`08:59:28,300`) the wait timed out and all resources started to close. The test waits 10 seconds for the log deletion to complete, but that is evidently not enough in the CI environment, so I've increased it to 20 seconds to see whether that fixes the issue. I think we've already done what we can to make it faster (i.e., set the configs to speed up the tests).
```
[2024-05-04 08:59:27,187] INFO [RemoteLogManager=0 partition=DcnVRVRSQd675ZLtCIn21A:topicB-0] Copied 0002.log to remote storage with segment-id: RemoteLogSegmentId{topicIdPartition=DcnVRVRSQd675ZLtCIn21A:topicB-0, id=gcVp790dRlmFCr_0tN0NTg} (kafka.log.remote.RemoteLogManager$RLMTask:792)
[2024-05-04 08:59:28,300] INFO Closing topic-based RLMM resources
[2024-05-04 08:59:28,304] INFO Closing the instance (org.apache.kafka.server.log.remote.metadata.storage.ConsumerTask:328)
[2024-05-04 08:59:28,308] INFO Exited from consumer task thread (org.apache.kafka.server.log.remote.metadata.storage.ConsumerTask:151)
```
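The back-and-forth over `STORAGE_WAIT_TIMEOUT_SEC` (5, then 10, then 20 seconds per the comment above) is about how long a test keeps polling before it gives up. A minimal sketch of a deadline-based polling helper, assuming a plain `BooleanSupplier` condition; `WaitUtil` and `waitForCondition` are hypothetical names for illustration and are not the Kafka test utilities' actual API:

```java
import java.util.concurrent.TimeUnit;
import java.util.function.BooleanSupplier;

// Hypothetical helper, not Kafka's TestUtils.
public final class WaitUtil {
    private WaitUtil() { }

    /**
     * Polls {@code condition} every {@code pollMs} milliseconds until it returns true or
     * {@code timeoutMs} elapses. Returns whether the condition was observed to hold.
     */
    public static boolean waitForCondition(BooleanSupplier condition, long timeoutMs, long pollMs) {
        long deadline = System.nanoTime() + TimeUnit.MILLISECONDS.toNanos(timeoutMs);
        while (true) {
            if (condition.getAsBoolean()) {
                return true;
            }
            long remainingNs = deadline - System.nanoTime();
            if (remainingNs <= 0) {
                return false;
            }
            // Sleep for one poll interval, but never much past the deadline.
            long sleepMs = Math.max(1, Math.min(pollMs, TimeUnit.NANOSECONDS.toMillis(remainingNs)));
            try {
                Thread.sleep(sleepMs);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt(); // preserve the interrupt for the caller
                return false;
            }
        }
    }

    public static void main(String[] args) {
        int[] polls = {0};
        // The condition starts holding on the third poll, well before the 1 s deadline.
        boolean met = waitForCondition(() -> ++polls[0] >= 3, 1_000, 10);
        // A condition that never holds gives up after roughly 50 ms.
        boolean gaveUp = !waitForCondition(() -> false, 50, 10);
        System.out.println("met=" + met + ", polls=" + polls[0] + ", gaveUp=" + gaveUp);
    }
}
```

Raising the timeout only moves the deadline: a condition that is met early still returns as soon as it is observed, so a larger value mainly costs time when the test is actually going to fail.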
Re: [PR] KAFKA-16399: Add JBOD support in tiered storage [kafka]
satishd commented on code in PR #15690: URL: https://github.com/apache/kafka/pull/15690#discussion_r1590586656 ## storage/src/test/java/org/apache/kafka/tiered/storage/utils/TieredStorageTestUtils.java: ## @@ -55,7 +57,7 @@ public class TieredStorageTestUtils { // Log cleanup interval is configured to be 500 ms. We need to wait at least that amount of time before // segments eligible for deletion gets physically removed. -public static final Integer STORAGE_WAIT_TIMEOUT_SEC = 5; +public static final Integer STORAGE_WAIT_TIMEOUT_SEC = 10; Review Comment: Why was it increased to 10 seconds?
Re: [PR] KAFKA-16399: Add JBOD support in tiered storage [kafka]
kamalcph commented on code in PR #15690: URL: https://github.com/apache/kafka/pull/15690#discussion_r1590764089 ## storage/src/test/java/org/apache/kafka/tiered/storage/utils/TieredStorageTestUtils.java: ## @@ -55,7 +57,7 @@ public class TieredStorageTestUtils { // Log cleanup interval is configured to be 500 ms. We need to wait at least that amount of time before // segments eligible for deletion gets physically removed. -public static final Integer STORAGE_WAIT_TIMEOUT_SEC = 5; +public static final Integer STORAGE_WAIT_TIMEOUT_SEC = 10; Review Comment: The AlterLogDirTest is still flaky:
```
Build / JDK 17 and Scala 2.13 / executeTieredStorageTest(String).quorum=kraft – org.apache.kafka.tiered.storage.integration.AlterLogDirTest

Error
java.lang.AssertionError: [BrokerId=0] The base offset of the first log segment of topicB-0 in the log directory is 2 which is smaller than the expected offset 3. The directory of topicB-0 is made of the following files:
0003.timeindex
0002.log
0002.timeindex
leader-epoch-checkpoint
0003.snapshot
0002.snapshot
partition.metadata
0003.index
0002.index
0003.log

Stacktrace
java.lang.AssertionError: [BrokerId=0] The base offset of the first log segment of topicB-0 in the log directory is 2 which is smaller than the expected offset 3. The directory of topicB-0 is made of the following files:
0003.timeindex
0002.log
0002.timeindex
leader-epoch-checkpoint
0003.snapshot
0002.snapshot
partition.metadata
0003.index
0002.index
0003.log
    at org.apache.kafka.tiered.storage.utils.BrokerLocalStorage.waitForOffset(BrokerLocalStorage.java:129)
    at org.apache.kafka.tiered.storage.utils.BrokerLocalStorage.waitForEarliestLocalOffset(BrokerLocalStorage.java:86)
    at org.apache.kafka.tiered.storage.actions.ProduceAction.doExecute(ProduceAction.java:124)
    at org.apache.kafka.tiered.storage.TieredStorageTestAction.execute(TieredStorageTestAction.java:25)
```
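The assertion compares the base offset of the earliest local segment with the expected earliest local offset. Kafka names each segment file after its base offset, so the check can be reproduced from the file listing alone. A minimal sketch, assuming only the file names shown in the failure; `SegmentOffsets` and `earliestSegmentBaseOffset` are hypothetical and do not reproduce `BrokerLocalStorage.waitForOffset`:

```java
import java.util.List;
import java.util.OptionalLong;

// Hypothetical helper for illustration only.
public final class SegmentOffsets {
    private SegmentOffsets() { }

    // Returns the smallest base offset among "<digits>.log" file names, if any exist.
    public static OptionalLong earliestSegmentBaseOffset(List<String> fileNames) {
        return fileNames.stream()
                .filter(name -> name.endsWith(".log"))
                .map(name -> name.substring(0, name.length() - ".log".length()))
                .filter(prefix -> !prefix.isEmpty() && prefix.chars().allMatch(Character::isDigit))
                .mapToLong(Long::parseLong)
                .min();
    }

    public static void main(String[] args) {
        // File listing copied from the AlterLogDirTest failure message.
        List<String> files = List.of(
                "0003.timeindex", "0002.log", "0002.timeindex", "leader-epoch-checkpoint",
                "0003.snapshot", "0002.snapshot", "partition.metadata", "0003.index",
                "0002.index", "0003.log");
        long expected = 3;
        long actual = earliestSegmentBaseOffset(files).orElseThrow();
        // 0002.log is still present locally, so the earliest base offset is 2, not 3.
        System.out.println("earliest=" + actual + ", expected=" + expected + ", ok=" + (actual >= expected));
    }
}
```

With this listing the earliest base offset is 2, which matches the failure: `0002.log` had been copied to remote storage but had not yet been deleted locally when the wait expired.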
Re: [PR] KAFKA-16399: Add JBOD support in tiered storage [kafka]
kamalcph commented on code in PR #15690: URL: https://github.com/apache/kafka/pull/15690#discussion_r1590764089 ## storage/src/test/java/org/apache/kafka/tiered/storage/utils/TieredStorageTestUtils.java: ## @@ -55,7 +57,7 @@ public class TieredStorageTestUtils { // Log cleanup interval is configured to be 500 ms. We need to wait at least that amount of time before // segments eligible for deletion gets physically removed. -public static final Integer STORAGE_WAIT_TIMEOUT_SEC = 5; +public static final Integer STORAGE_WAIT_TIMEOUT_SEC = 10; Review Comment: The AlterLogDirTest seems flaky:
```
Build / JDK 17 and Scala 2.13 / executeTieredStorageTest(String).quorum=kraft – org.apache.kafka.tiered.storage.integration.AlterLogDirTest

Error
java.lang.AssertionError: [BrokerId=0] The base offset of the first log segment of topicB-0 in the log directory is 2 which is smaller than the expected offset 3. The directory of topicB-0 is made of the following files:
0003.timeindex
0002.log
0002.timeindex
leader-epoch-checkpoint
0003.snapshot
0002.snapshot
partition.metadata
0003.index
0002.index
0003.log

Stacktrace
java.lang.AssertionError: [BrokerId=0] The base offset of the first log segment of topicB-0 in the log directory is 2 which is smaller than the expected offset 3. The directory of topicB-0 is made of the following files:
0003.timeindex
0002.log
0002.timeindex
leader-epoch-checkpoint
0003.snapshot
0002.snapshot
partition.metadata
0003.index
0002.index
0003.log
    at org.apache.kafka.tiered.storage.utils.BrokerLocalStorage.waitForOffset(BrokerLocalStorage.java:129)
    at org.apache.kafka.tiered.storage.utils.BrokerLocalStorage.waitForEarliestLocalOffset(BrokerLocalStorage.java:86)
    at org.apache.kafka.tiered.storage.actions.ProduceAction.doExecute(ProduceAction.java:124)
    at org.apache.kafka.tiered.storage.TieredStorageTestAction.execute(TieredStorageTestAction.java:25)
```
Re: [PR] KAFKA-16399: Add JBOD support in tiered storage [kafka]
kamalcph commented on code in PR #15690: URL: https://github.com/apache/kafka/pull/15690#discussion_r1590764089 ## storage/src/test/java/org/apache/kafka/tiered/storage/utils/TieredStorageTestUtils.java: ## @@ -55,7 +57,7 @@ public class TieredStorageTestUtils { // Log cleanup interval is configured to be 500 ms. We need to wait at least that amount of time before // segments eligible for deletion gets physically removed. -public static final Integer STORAGE_WAIT_TIMEOUT_SEC = 5; +public static final Integer STORAGE_WAIT_TIMEOUT_SEC = 10; Review Comment: The AlterLogDirTest seems flaky:
```
Build / JDK 17 and Scala 2.13 / executeTieredStorageTest(String).quorum=kraft – org.apache.kafka.tiered.storage.integration.AlterLogDirTest
```
Re: [PR] KAFKA-16399: Add JBOD support in tiered storage [kafka]
kamalcph commented on code in PR #15690: URL: https://github.com/apache/kafka/pull/15690#discussion_r1590762158 ## storage/src/test/java/org/apache/kafka/tiered/storage/integration/AlterLogDirTest.java: ## @@ -0,0 +1,73 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.tiered.storage.integration; + +import org.apache.kafka.tiered.storage.TieredStorageTestBuilder; +import org.apache.kafka.tiered.storage.specs.KeyValueSpec; + +import java.util.Collections; +import java.util.List; + +import static org.apache.kafka.common.utils.Utils.mkEntry; +import static org.apache.kafka.common.utils.Utils.mkMap; + +public final class AlterLogDirTest extends BaseReassignReplicaTest { Review Comment: Can we extend `TieredStorageTestHarness` instead of `BaseReassignReplicaTest`?
Re: [PR] KAFKA-16399: Add JBOD support in tiered storage [kafka]
kamalcph commented on code in PR #15690: URL: https://github.com/apache/kafka/pull/15690#discussion_r1590749298 ## storage/src/test/java/org/apache/kafka/tiered/storage/utils/TieredStorageTestUtils.java: ## @@ -55,7 +57,7 @@ public class TieredStorageTestUtils { // Log cleanup interval is configured to be 500 ms. We need to wait at least that amount of time before // segments eligible for deletion gets physically removed. -public static final Integer STORAGE_WAIT_TIMEOUT_SEC = 5; +public static final Integer STORAGE_WAIT_TIMEOUT_SEC = 10; Review Comment: To get some context: why was the timeout increased?
Re: [PR] KAFKA-16399: Add JBOD support in tiered storage [kafka]
satishd commented on code in PR #15690: URL: https://github.com/apache/kafka/pull/15690#discussion_r1590570923 ## core/src/main/java/kafka/server/TierStateMachine.java: ## @@ -40,19 +90,176 @@ public interface TierStateMachine { */ PartitionFetchState start(TopicPartition topicPartition, PartitionFetchState currentFetchState, - PartitionData fetchPartitionData) throws Exception; + PartitionData fetchPartitionData) throws Exception { +OffsetAndEpoch epochAndLeaderLocalStartOffset = leader.fetchEarliestLocalOffset(topicPartition, currentFetchState.currentLeaderEpoch()); +int epoch = epochAndLeaderLocalStartOffset.leaderEpoch(); +long leaderLocalStartOffset = epochAndLeaderLocalStartOffset.offset(); + +long offsetToFetch; + replicaMgr.brokerTopicStats().topicStats(topicPartition.topic()).buildRemoteLogAuxStateRequestRate().mark(); + replicaMgr.brokerTopicStats().allTopicsStats().buildRemoteLogAuxStateRequestRate().mark(); + +UnifiedLog unifiedLog; +if (useFutureLog) { +unifiedLog = replicaMgr.futureLogOrException(topicPartition); +} else { +unifiedLog = replicaMgr.localLogOrException(topicPartition); +} + +try { +offsetToFetch = buildRemoteLogAuxState(topicPartition, currentFetchState.currentLeaderEpoch(), leaderLocalStartOffset, epoch, fetchPartitionData.logStartOffset(), unifiedLog); +} catch (RemoteStorageException e) { + replicaMgr.brokerTopicStats().topicStats(topicPartition.topic()).failedBuildRemoteLogAuxStateRate().mark(); + replicaMgr.brokerTopicStats().allTopicsStats().failedBuildRemoteLogAuxStateRate().mark(); +throw e; +} + +OffsetAndEpoch fetchLatestOffsetResult = leader.fetchLatestOffset(topicPartition, currentFetchState.currentLeaderEpoch()); +long leaderEndOffset = fetchLatestOffsetResult.offset(); + +long initialLag = leaderEndOffset - offsetToFetch; + +return PartitionFetchState.apply(currentFetchState.topicId(), offsetToFetch, Option.apply(initialLag), currentFetchState.currentLeaderEpoch(), +Fetching$.MODULE$, unifiedLog.latestEpoch()); + +} + 
+private OffsetForLeaderEpochResponseData.EpochEndOffset fetchEarlierEpochEndOffset(Integer epoch, + TopicPartition partition, + Integer currentLeaderEpoch) { +int previousEpoch = epoch - 1; + +// Find the end-offset for the epoch earlier to the given epoch from the leader +Map partitionsWithEpochs = new HashMap<>(); +partitionsWithEpochs.put(partition, new OffsetForLeaderEpochRequestData.OffsetForLeaderPartition().setPartition(partition.partition()).setCurrentLeaderEpoch(currentLeaderEpoch).setLeaderEpoch(previousEpoch)); +Option maybeEpochEndOffset = leader.fetchEpochEndOffsets(JavaConverters.mapAsScalaMap(partitionsWithEpochs)).get(partition); +if (maybeEpochEndOffset.isEmpty()) { +throw new KafkaException("No response received for partition: " + partition); +} + +OffsetForLeaderEpochResponseData.EpochEndOffset epochEndOffset = maybeEpochEndOffset.get(); +if (epochEndOffset.errorCode() != Errors.NONE.code()) { +throw Errors.forCode(epochEndOffset.errorCode()).exception(); +} + +return epochEndOffset; +} + +private List readLeaderEpochCheckpoint(RemoteLogManager rlm, + RemoteLogSegmentMetadata remoteLogSegmentMetadata) throws IOException, RemoteStorageException { +InputStream inputStream = rlm.storageManager().fetchIndex(remoteLogSegmentMetadata, RemoteStorageManager.IndexType.LEADER_EPOCH); +try (BufferedReader bufferedReader = new BufferedReader(new InputStreamReader(inputStream, StandardCharsets.UTF_8))) { +CheckpointFile.CheckpointReadBuffer readBuffer = new CheckpointFile.CheckpointReadBuffer<>("", bufferedReader, 0, LeaderEpochCheckpointFile.FORMATTER); +return readBuffer.read(); +} +} + +private void buildProducerSnapshotFile(UnifiedLog unifiedLog, + long nextOffset, + RemoteLogSegmentMetadata remoteLogSegmentMetadata, + RemoteLogManager rlm) throws IOException, RemoteStorageException { +// Restore producer snapshot +File snapshotFile = LogFileUtils.producerSnapshotFile(unifiedLog.dir(), nextOffset); +Path tmpSnapshotFile = 
Paths.get(snapshotFile.getAbsolutePath() + ".tmp"); +// Copy it to snapshot file in atomic mann
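The comment cut off above (`// Copy it to snapshot file in atomic mann...`) describes restoring the producer snapshot by writing to a `.tmp` sibling and then moving it into place. A plain-JDK sketch of that write-then-rename pattern, assuming the snapshot arrives as an `InputStream`; `AtomicRestore` and `restoreAtomically` are hypothetical names, and the PR's actual implementation is truncated in this archive:

```java
import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.nio.charset.StandardCharsets;
import java.nio.file.AtomicMoveNotSupportedException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardCopyOption;

// Hypothetical helper illustrating the tmp-file-then-rename pattern.
public final class AtomicRestore {
    private AtomicRestore() { }

    // Writes `in` to "<target>.tmp", then renames it to `target`, so readers never
    // observe a partially written file under the real name.
    public static void restoreAtomically(InputStream in, Path target) throws IOException {
        Path tmp = target.resolveSibling(target.getFileName() + ".tmp");
        Files.copy(in, tmp, StandardCopyOption.REPLACE_EXISTING);
        try {
            Files.move(tmp, target, StandardCopyOption.ATOMIC_MOVE);
        } catch (AtomicMoveNotSupportedException e) {
            // Fall back to a non-atomic replace on file systems without atomic rename.
            Files.move(tmp, target, StandardCopyOption.REPLACE_EXISTING);
        }
    }

    public static void main(String[] args) throws IOException {
        Path dir = Files.createTempDirectory("snapshot-demo");
        Path target = dir.resolve("00000000000000000003.snapshot");
        byte[] payload = "producer-state".getBytes(StandardCharsets.UTF_8);
        try (InputStream in = new ByteArrayInputStream(payload)) {
            restoreAtomically(in, target);
        }
        boolean tmpLeft = Files.exists(dir.resolve(target.getFileName() + ".tmp"));
        System.out.println(Files.readString(target) + " tmpLeft=" + tmpLeft);
    }
}
```

The rename is what makes the restore safe: a crash before the move leaves only the `.tmp` file behind, never a half-written snapshot under the real name. The `REPLACE_EXISTING` fallback gives up that guarantee on file systems that cannot rename atomically.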
Re: [PR] KAFKA-16399: Add JBOD support in tiered storage [kafka]
satishd commented on code in PR #15690: URL: https://github.com/apache/kafka/pull/15690#discussion_r1590567527 ## core/src/test/scala/unit/kafka/server/KafkaConfigTest.scala: ## @@ -1858,16 +1858,6 @@ class KafkaConfigTest { } } - @Test - def testMultipleLogDirectoriesNotSupportedWithRemoteLogStorage(): Unit = { Review Comment: Can we also add a positive test for multiple log dirs, like the one below for a single dir? Maybe refactor the test below to cover both scenarios. ## core/src/main/java/kafka/server/TierStateMachine.java: ## @@ -40,19 +90,176 @@ public interface TierStateMachine { */ PartitionFetchState start(TopicPartition topicPartition, PartitionFetchState currentFetchState, - PartitionData fetchPartitionData) throws Exception; + PartitionData fetchPartitionData) throws Exception { +OffsetAndEpoch epochAndLeaderLocalStartOffset = leader.fetchEarliestLocalOffset(topicPartition, currentFetchState.currentLeaderEpoch()); +int epoch = epochAndLeaderLocalStartOffset.leaderEpoch(); +long leaderLocalStartOffset = epochAndLeaderLocalStartOffset.offset(); + +long offsetToFetch; + replicaMgr.brokerTopicStats().topicStats(topicPartition.topic()).buildRemoteLogAuxStateRequestRate().mark(); + replicaMgr.brokerTopicStats().allTopicsStats().buildRemoteLogAuxStateRequestRate().mark(); + +UnifiedLog unifiedLog; +if (useFutureLog) { +unifiedLog = replicaMgr.futureLogOrException(topicPartition); +} else { +unifiedLog = replicaMgr.localLogOrException(topicPartition); +} + +try { +offsetToFetch = buildRemoteLogAuxState(topicPartition, currentFetchState.currentLeaderEpoch(), leaderLocalStartOffset, epoch, fetchPartitionData.logStartOffset(), unifiedLog); +} catch (RemoteStorageException e) { + replicaMgr.brokerTopicStats().topicStats(topicPartition.topic()).failedBuildRemoteLogAuxStateRate().mark(); + replicaMgr.brokerTopicStats().allTopicsStats().failedBuildRemoteLogAuxStateRate().mark(); +throw e; +} + +OffsetAndEpoch fetchLatestOffsetResult =
leader.fetchLatestOffset(topicPartition, currentFetchState.currentLeaderEpoch()); +long leaderEndOffset = fetchLatestOffsetResult.offset(); + +long initialLag = leaderEndOffset - offsetToFetch; + +return PartitionFetchState.apply(currentFetchState.topicId(), offsetToFetch, Option.apply(initialLag), currentFetchState.currentLeaderEpoch(), +Fetching$.MODULE$, unifiedLog.latestEpoch()); + +} + +private OffsetForLeaderEpochResponseData.EpochEndOffset fetchEarlierEpochEndOffset(Integer epoch, + TopicPartition partition, + Integer currentLeaderEpoch) { +int previousEpoch = epoch - 1; + +// Find the end-offset for the epoch earlier to the given epoch from the leader +Map partitionsWithEpochs = new HashMap<>(); +partitionsWithEpochs.put(partition, new OffsetForLeaderEpochRequestData.OffsetForLeaderPartition().setPartition(partition.partition()).setCurrentLeaderEpoch(currentLeaderEpoch).setLeaderEpoch(previousEpoch)); +Option maybeEpochEndOffset = leader.fetchEpochEndOffsets(JavaConverters.mapAsScalaMap(partitionsWithEpochs)).get(partition); +if (maybeEpochEndOffset.isEmpty()) { +throw new KafkaException("No response received for partition: " + partition); +} + +OffsetForLeaderEpochResponseData.EpochEndOffset epochEndOffset = maybeEpochEndOffset.get(); +if (epochEndOffset.errorCode() != Errors.NONE.code()) { +throw Errors.forCode(epochEndOffset.errorCode()).exception(); +} + +return epochEndOffset; +} + +private List readLeaderEpochCheckpoint(RemoteLogManager rlm, + RemoteLogSegmentMetadata remoteLogSegmentMetadata) throws IOException, RemoteStorageException { +InputStream inputStream = rlm.storageManager().fetchIndex(remoteLogSegmentMetadata, RemoteStorageManager.IndexType.LEADER_EPOCH); +try (BufferedReader bufferedReader = new BufferedReader(new InputStreamReader(inputStream, StandardCharsets.UTF_8))) { +CheckpointFile.CheckpointReadBuffer readBuffer = new CheckpointFile.CheckpointReadBuffer<>("", bufferedReader, 0, LeaderEpochCheckpointFile.FORMATTER); +return 
readBuffer.read(); +} +} + +private void buildProducerSnapshotFile(UnifiedLog unifiedLog, + long nextOffset, + RemoteLogSegmentMe
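`fetchEarlierEpochEndOffset` above asks the leader for the end offset of `epoch - 1`, and `readLeaderEpochCheckpoint` parses the leader-epoch checkpoint fetched from remote storage. A self-contained sketch of both ideas, assuming the checkpoint text layout (a version line, an entry-count line, then one `epoch startOffset` pair per line) and the convention that an epoch ends where the next known epoch starts. `EpochLookup`, `parseCheckpoint`, and `endOffsetFor` are hypothetical, and this simplified lookup does not reproduce the edge cases of Kafka's `CheckpointFile` or its `OffsetForLeaderEpoch` handling:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.TreeMap;

// Hypothetical, simplified model of a leader-epoch checkpoint and an end-offset lookup.
public final class EpochLookup {
    private EpochLookup() { }

    /** One checkpoint line: an epoch and the offset at which it starts. */
    public static final class Entry {
        public final int epoch;
        public final long startOffset;

        public Entry(int epoch, long startOffset) {
            this.epoch = epoch;
            this.startOffset = startOffset;
        }
    }

    /** Lookup result: the matched epoch and the offset at which that epoch ends. */
    public static final class EpochEnd {
        public final int epoch;
        public final long endOffset;

        public EpochEnd(int epoch, long endOffset) {
            this.epoch = epoch;
            this.endOffset = endOffset;
        }

        @Override
        public String toString() {
            return "EpochEnd(epoch=" + epoch + ", endOffset=" + endOffset + ")";
        }
    }

    // Parses "version\ncount\nepoch startOffset\n..." (version 0 assumed) into entries.
    public static List<Entry> parseCheckpoint(String text) {
        String[] lines = text.strip().split("\\R");
        if (lines.length < 2 || Integer.parseInt(lines[0].trim()) != 0) {
            throw new IllegalArgumentException("Unsupported checkpoint header");
        }
        int count = Integer.parseInt(lines[1].trim());
        if (lines.length - 2 != count) {
            throw new IllegalArgumentException(
                    "Expected " + count + " entries but found " + (lines.length - 2));
        }
        List<Entry> entries = new ArrayList<>(count);
        for (int i = 2; i < lines.length; i++) {
            String[] parts = lines[i].trim().split("\\s+");
            entries.add(new Entry(Integer.parseInt(parts[0]), Long.parseLong(parts[1])));
        }
        return entries;
    }

    // Simplified lookup: take the largest known epoch <= requestedEpoch. It ends where the
    // next known epoch starts, or at logEndOffset if it is the latest epoch.
    public static Optional<EpochEnd> endOffsetFor(List<Entry> entries, int requestedEpoch, long logEndOffset) {
        TreeMap<Integer, Long> startByEpoch = new TreeMap<>();
        for (Entry e : entries) {
            startByEpoch.put(e.epoch, e.startOffset);
        }
        Map.Entry<Integer, Long> floor = startByEpoch.floorEntry(requestedEpoch);
        if (floor == null) {
            return Optional.empty(); // requested epoch predates every known epoch
        }
        Map.Entry<Integer, Long> next = startByEpoch.higherEntry(floor.getKey());
        long end = (next == null) ? logEndOffset : next.getValue();
        return Optional.of(new EpochEnd(floor.getKey(), end));
    }

    public static void main(String[] args) {
        List<Entry> entries = parseCheckpoint("0\n3\n0 0\n2 100\n5 250\n");
        int epoch = 5;
        // Like the "epoch - 1" request in fetchEarlierEpochEndOffset: epoch 4 was never used,
        // so the lookup falls back to epoch 2, which ends where epoch 5 starts.
        System.out.println(endOffsetFor(entries, epoch - 1, 400L));
    }
}
```

Because epochs need not be contiguous, asking for `epoch - 1` can resolve to an older epoch, as the epoch 4 to epoch 2 fallback in `main` shows.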
Re: [PR] KAFKA-16399: Add JBOD support in tiered storage [kafka]
soarez commented on PR #15690: URL: https://github.com/apache/kafka/pull/15690#issuecomment-2092980307 Thanks for fixing the import. There are some failing tests; please have a look. I think we need to delete `kafka.server.KafkaConfigTest#testMultipleLogDirectoriesNotSupportedWithRemoteLogStorage()`. There is also at least one other related test failure, in `AlterLogDirTest`.
Re: [PR] KAFKA-16399: Add JBOD support in tiered storage [kafka]
kamalcph commented on PR #15690: URL: https://github.com/apache/kafka/pull/15690#issuecomment-2092058377 The latest changes lgtm.
Re: [PR] KAFKA-16399: Add JBOD support in tiered storage [kafka]
soarez commented on code in PR #15690: URL: https://github.com/apache/kafka/pull/15690#discussion_r1587465657 ## core/src/main/java/kafka/server/TierStateMachine.java: ## @@ -17,15 +17,66 @@ package kafka.server; +import java.io.BufferedReader; +import java.io.File; +import java.io.IOException; +import java.io.InputStream; +import java.io.InputStreamReader; +import java.nio.charset.StandardCharsets; +import java.nio.file.Files; +import java.nio.file.Path; +import java.nio.file.Paths; +import java.nio.file.StandardCopyOption; +import java.util.HashMap; +import java.util.List; +import java.util.Map; import java.util.Optional; Review Comment: `:core:checkstyleMain` is failing because this import is unused:
```
> Task :core:checkstyleMain FAILED
[ant:checkstyle] [ERROR] (...)/kafka/core/src/main/java/kafka/server/TierStateMachine.java:33:8: Unused import - java.util.Optional. [UnusedImports]
```
Re: [PR] KAFKA-16399: Add JBOD support in tiered storage [kafka]
showuon commented on code in PR #15690: URL: https://github.com/apache/kafka/pull/15690#discussion_r1587440706 ## core/src/main/java/kafka/server/TierStateMachine.java: ## @@ -17,15 +17,69 @@ package kafka.server; +import java.io.BufferedReader; +import java.io.File; +import java.io.IOException; +import java.io.InputStream; +import java.io.InputStreamReader; +import java.nio.charset.StandardCharsets; +import java.nio.file.Files; +import java.nio.file.Path; +import java.nio.file.Paths; +import java.nio.file.StandardCopyOption; +import java.util.HashMap; +import java.util.List; +import java.util.Map; import java.util.Optional; +import kafka.cluster.Partition; +import kafka.log.UnifiedLog; +import kafka.log.remote.RemoteLogManager; +import org.apache.kafka.common.KafkaException; import org.apache.kafka.common.TopicPartition; import org.apache.kafka.common.message.FetchResponseData.PartitionData; +import org.apache.kafka.common.message.OffsetForLeaderEpochRequestData; +import org.apache.kafka.common.message.OffsetForLeaderEpochResponseData; +import org.apache.kafka.common.protocol.Errors; +import org.apache.kafka.common.utils.Utils; +import org.apache.kafka.server.common.CheckpointFile; +import org.apache.kafka.server.common.OffsetAndEpoch; +import org.apache.kafka.server.log.remote.storage.RemoteLogSegmentMetadata; +import org.apache.kafka.server.log.remote.storage.RemoteStorageException; +import org.apache.kafka.server.log.remote.storage.RemoteStorageManager; +import org.apache.kafka.storage.internals.checkpoint.LeaderEpochCheckpointFile; +import org.apache.kafka.storage.internals.log.EpochEntry; +import org.apache.kafka.storage.internals.log.LogFileUtils; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import scala.Option; +import scala.collection.JavaConverters; + +import static org.apache.kafka.storage.internals.log.LogStartOffsetIncrementReason.LeaderOffsetIncremented; /** - * This interface defines the APIs needed to handle any state transitions 
related to tiering + * This class defines the APIs and implementation needed to handle any state transitions related to tiering + * + * Currently, the tier state machine follows a synchronous execution, and we only need to start the machine. + * There is no need to advance the state. + * Review Comment: Updated. Thanks.
Re: [PR] KAFKA-16399: Add JBOD support in tiered storage [kafka]
soarez commented on code in PR #15690: URL: https://github.com/apache/kafka/pull/15690#discussion_r1587367900 ## core/src/main/java/kafka/server/TierStateMachine.java: ## @@ -17,15 +17,69 @@ package kafka.server; +import java.io.BufferedReader; +import java.io.File; +import java.io.IOException; +import java.io.InputStream; +import java.io.InputStreamReader; +import java.nio.charset.StandardCharsets; +import java.nio.file.Files; +import java.nio.file.Path; +import java.nio.file.Paths; +import java.nio.file.StandardCopyOption; +import java.util.HashMap; +import java.util.List; +import java.util.Map; import java.util.Optional; +import kafka.cluster.Partition; +import kafka.log.UnifiedLog; +import kafka.log.remote.RemoteLogManager; +import org.apache.kafka.common.KafkaException; import org.apache.kafka.common.TopicPartition; import org.apache.kafka.common.message.FetchResponseData.PartitionData; +import org.apache.kafka.common.message.OffsetForLeaderEpochRequestData; +import org.apache.kafka.common.message.OffsetForLeaderEpochResponseData; +import org.apache.kafka.common.protocol.Errors; +import org.apache.kafka.common.utils.Utils; +import org.apache.kafka.server.common.CheckpointFile; +import org.apache.kafka.server.common.OffsetAndEpoch; +import org.apache.kafka.server.log.remote.storage.RemoteLogSegmentMetadata; +import org.apache.kafka.server.log.remote.storage.RemoteStorageException; +import org.apache.kafka.server.log.remote.storage.RemoteStorageManager; +import org.apache.kafka.storage.internals.checkpoint.LeaderEpochCheckpointFile; +import org.apache.kafka.storage.internals.log.EpochEntry; +import org.apache.kafka.storage.internals.log.LogFileUtils; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import scala.Option; +import scala.collection.JavaConverters; + +import static org.apache.kafka.storage.internals.log.LogStartOffsetIncrementReason.LeaderOffsetIncremented; /** - * This interface defines the APIs needed to handle any state transitions 
related to tiering + * This class defines the APIs and implementation needed to handle any state transitions related to tiering + * + * Currently, the tier state machine follows a synchronous execution, and we only need to start the machine. + * There is no need to advance the state. + * Review Comment: Do we still need this?
Re: [PR] KAFKA-16399: Add JBOD support in tiered storage [kafka]
showuon commented on PR #15690: URL: https://github.com/apache/kafka/pull/15690#issuecomment-2090048261 @soarez @kamalcph , thanks for the comments. I've updated the PR in this commit: https://github.com/apache/kafka/pull/15690/commits/97e2b47c68254928c0baf065eee14a7aeb6e12b2 .
Re: [PR] KAFKA-16399: Add JBOD support in tiered storage [kafka]
showuon commented on code in PR #15690: URL: https://github.com/apache/kafka/pull/15690#discussion_r1587357829 ## core/src/main/java/kafka/server/TierStateMachine.java: ## @@ -54,5 +180,109 @@ PartitionFetchState start(TopicPartition topicPartition, * @return the new PartitionFetchState if the tier state machine was advanced, otherwise, return the currentFetchState */ Optional maybeAdvanceState(TopicPartition topicPartition, -PartitionFetchState currentFetchState); +PartitionFetchState currentFetchState) { +// This is currently a no-op but will be used for implementing async tiering logic in KAFKA-13560. +return Optional.of(currentFetchState); +} + +/** + * It tries to build the required state for this partition from leader and remote storage so that it can start + * fetching records from the leader. The return value is the next offset to fetch from the leader, which is the + * next offset following the end offset of the remote log portion. + */ +private Long buildRemoteLogAuxState(TopicPartition topicPartition, +Integer currentLeaderEpoch, +Long leaderLocalLogStartOffset, +Integer epochForLeaderLocalLogStartOffset, +Long leaderLogStartOffset, +UnifiedLog unifiedLog) throws IOException, RemoteStorageException { + +long nextOffset; + +if (unifiedLog.remoteStorageSystemEnable() && unifiedLog.config().remoteStorageEnable()) { +if (replicaMgr.remoteLogManager().isEmpty()) throw new IllegalStateException("RemoteLogManager is not yet instantiated"); + +RemoteLogManager rlm = replicaMgr.remoteLogManager().get(); + +// Find the respective leader epoch for (leaderLocalLogStartOffset - 1). We need to build the leader epoch cache +// until that offset +long previousOffsetToLeaderLocalLogStartOffset = leaderLocalLogStartOffset - 1; +int targetEpoch; +// If the existing epoch is 0, no need to fetch from earlier epoch as the desired offset(leaderLogStartOffset - 1) +// will have the same epoch. 
+if (epochForLeaderLocalLogStartOffset == 0) { +targetEpoch = epochForLeaderLocalLogStartOffset; +} else { Review Comment: For this, I'd prefer keeping the original version, which is more readable to me.
Re: [PR] KAFKA-16399: Add JBOD support in tiered storage [kafka]
showuon commented on code in PR #15690: URL: https://github.com/apache/kafka/pull/15690#discussion_r1587356992 ## core/src/main/java/kafka/server/TierStateMachine.java: ## @@ -54,5 +180,109 @@ PartitionFetchState start(TopicPartition topicPartition, * @return the new PartitionFetchState if the tier state machine was advanced, otherwise, return the currentFetchState */ Optional maybeAdvanceState(TopicPartition topicPartition, -PartitionFetchState currentFetchState); +PartitionFetchState currentFetchState) { +// This is currently a no-op but will be used for implementing async tiering logic in KAFKA-13560. Review Comment: I think it makes sense to remove this unused placeholder for now and add it back once it is implemented.
Re: [PR] KAFKA-16399: Add JBOD support in tiered storage [kafka]
showuon commented on code in PR #15690: URL: https://github.com/apache/kafka/pull/15690#discussion_r1587346384 ## storage/src/test/java/org/apache/kafka/tiered/storage/TieredStorageTestHarness.java: ## @@ -154,7 +154,7 @@ public static List remoteStorageManagers(Seq br @SuppressWarnings("deprecation") public static List localStorages(Seq brokers) { return JavaConverters.seqAsJavaList(brokers).stream() -.map(b -> new BrokerLocalStorage(b.config().brokerId(), b.config().logDirs().head(), +.map(b -> new BrokerLocalStorage(b.config().brokerId(), JavaConverters.asJava(b.config().logDirs().toSet()), Review Comment: You're right! Thanks.
Re: [PR] KAFKA-16399: Add JBOD support in tiered storage [kafka]
showuon commented on code in PR #15690: URL: https://github.com/apache/kafka/pull/15690#discussion_r1587318846 ## core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala: ## @@ -3287,11 +3287,9 @@ class ReplicaManagerTest { val path1 = TestUtils.tempRelativeDir("data").getAbsolutePath val path2 = TestUtils.tempRelativeDir("data2").getAbsolutePath if (enableRemoteStorage) { Review Comment: You're right!
Re: [PR] KAFKA-16399: Add JBOD support in tiered storage [kafka]
showuon commented on code in PR #15690: URL: https://github.com/apache/kafka/pull/15690#discussion_r1587154091 ## core/src/main/java/kafka/server/TierStateMachine.java: ## @@ -54,5 +180,109 @@ PartitionFetchState start(TopicPartition topicPartition, * @return the new PartitionFetchState if the tier state machine was advanced, otherwise, return the currentFetchState */ Optional maybeAdvanceState(TopicPartition topicPartition, -PartitionFetchState currentFetchState); +PartitionFetchState currentFetchState) { +// This is currently a no-op but will be used for implementing async tiering logic in KAFKA-13560. +return Optional.of(currentFetchState); +} + +/** + * It tries to build the required state for this partition from leader and remote storage so that it can start + * fetching records from the leader. The return value is the next offset to fetch from the leader, which is the + * next offset following the end offset of the remote log portion. + */ +private Long buildRemoteLogAuxState(TopicPartition topicPartition, +Integer currentLeaderEpoch, +Long leaderLocalLogStartOffset, +Integer epochForLeaderLocalLogStartOffset, +Long leaderLogStartOffset, +UnifiedLog unifiedLog) throws IOException, RemoteStorageException { + +long nextOffset; + +if (unifiedLog.remoteStorageSystemEnable() && unifiedLog.config().remoteStorageEnable()) { +if (replicaMgr.remoteLogManager().isEmpty()) throw new IllegalStateException("RemoteLogManager is not yet instantiated"); + +RemoteLogManager rlm = replicaMgr.remoteLogManager().get(); + +// Find the respective leader epoch for (leaderLocalLogStartOffset - 1). We need to build the leader epoch cache +// until that offset +long previousOffsetToLeaderLocalLogStartOffset = leaderLocalLogStartOffset - 1; +int targetEpoch; +// If the existing epoch is 0, no need to fetch from earlier epoch as the desired offset(leaderLogStartOffset - 1) +// will have the same epoch. 
+if (epochForLeaderLocalLogStartOffset == 0) { +targetEpoch = epochForLeaderLocalLogStartOffset; +} else { +// Fetch the earlier epoch/end-offset(exclusive) from the leader. +OffsetForLeaderEpochResponseData.EpochEndOffset earlierEpochEndOffset = fetchEarlierEpochEndOffset(epochForLeaderLocalLogStartOffset, topicPartition, currentLeaderEpoch); +// Check if the target offset lies within the range of earlier epoch. Here, epoch's end-offset is exclusive. +if (earlierEpochEndOffset.endOffset() > previousOffsetToLeaderLocalLogStartOffset) { +// Always use the leader epoch from returned earlierEpochEndOffset. +// This gives the respective leader epoch, that will handle any gaps in epochs. +// For ex, leader epoch cache contains: +// leader-epoch start-offset +// 0 20 +// 1 85 +// <2> - gap no messages were appended in this leader epoch. +// 3 90 +// 4 98 +// There is a gap in leader epoch. For leaderLocalLogStartOffset as 90, leader-epoch is 3. +// fetchEarlierEpochEndOffset(2) will return leader-epoch as 1, end-offset as 90. +// So, for offset 89, we should return leader epoch as 1 like below. +targetEpoch = earlierEpochEndOffset.leaderEpoch(); +} else { +targetEpoch = epochForLeaderLocalLogStartOffset; +} +} + +Optional maybeRlsm = rlm.fetchRemoteLogSegmentMetadata(topicPartition, targetEpoch, previousOffsetToLeaderLocalLogStartOffset); + +if (maybeRlsm.isPresent()) { +RemoteLogSegmentMetadata remoteLogSegmentMetadata = maybeRlsm.get(); Review Comment: Nice refactor! Thanks.
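The gap example in the quoted code comment can be checked with a small model of the leader's epoch-cache lookup. This is a simplified sketch, not Kafka's leader epoch cache: it assumes the cache is a `TreeMap` from epoch to start offset, the names `EpochLookupSketch` and `endOffsetFor` are hypothetical, and edge cases such as undefined epochs are not modeled.

```java
import java.util.Map;
import java.util.Optional;
import java.util.TreeMap;

// Hypothetical, simplified model of an OffsetsForLeaderEpoch lookup; not Kafka's implementation.
class EpochLookupSketch {

    static final class EpochEndOffset {
        final int leaderEpoch;
        final long endOffset; // exclusive

        EpochEndOffset(int leaderEpoch, long endOffset) {
            this.leaderEpoch = leaderEpoch;
            this.endOffset = endOffset;
        }
    }

    /**
     * Returns the largest cached epoch <= requestedEpoch, paired with the start offset of the
     * next higher cached epoch as its exclusive end offset. When no higher epoch exists, the
     * log end offset is used instead. Returns empty if requestedEpoch predates the cache.
     */
    static Optional<EpochEndOffset> endOffsetFor(TreeMap<Integer, Long> epochToStartOffset,
                                                 int requestedEpoch,
                                                 long logEndOffset) {
        Map.Entry<Integer, Long> floor = epochToStartOffset.floorEntry(requestedEpoch);
        if (floor == null) {
            return Optional.empty();
        }
        Map.Entry<Integer, Long> next = epochToStartOffset.higherEntry(requestedEpoch);
        long endOffset = (next == null) ? logEndOffset : next.getValue();
        return Optional.of(new EpochEndOffset(floor.getKey(), endOffset));
    }

    public static void main(String[] args) {
        // The cache from the review comment; epoch 2 has no entry because nothing was appended in it.
        TreeMap<Integer, Long> cache = new TreeMap<>();
        cache.put(0, 20L);
        cache.put(1, 85L);
        cache.put(3, 90L);
        cache.put(4, 98L);

        // Offset 90 belongs to epoch 3, so the follower asks about the epoch before it (2).
        EpochEndOffset earlier = endOffsetFor(cache, 2, 120L).get();
        System.out.println(earlier.leaderEpoch + " " + earlier.endOffset); // prints "1 90"
    }
}
```

Because the returned exclusive end offset (90) is greater than 89, offset 89 is attributed to epoch 1 rather than to the empty epoch 2.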
Re: [PR] KAFKA-16399: Add JBOD support in tiered storage [kafka]
soarez commented on code in PR #15690: URL: https://github.com/apache/kafka/pull/15690#discussion_r1582791175 ## core/src/main/java/kafka/server/TierStateMachine.java: ## @@ -54,5 +180,109 @@ PartitionFetchState start(TopicPartition topicPartition, * @return the new PartitionFetchState if the tier state machine was advanced, otherwise, return the currentFetchState */ Optional maybeAdvanceState(TopicPartition topicPartition, -PartitionFetchState currentFetchState); +PartitionFetchState currentFetchState) { +// This is currently a no-op but will be used for implementing async tiering logic in KAFKA-13560. +return Optional.of(currentFetchState); +} + +/** + * It tries to build the required state for this partition from leader and remote storage so that it can start + * fetching records from the leader. The return value is the next offset to fetch from the leader, which is the + * next offset following the end offset of the remote log portion. + */ +private Long buildRemoteLogAuxState(TopicPartition topicPartition, +Integer currentLeaderEpoch, +Long leaderLocalLogStartOffset, +Integer epochForLeaderLocalLogStartOffset, +Long leaderLogStartOffset, +UnifiedLog unifiedLog) throws IOException, RemoteStorageException { + +long nextOffset; + +if (unifiedLog.remoteStorageSystemEnable() && unifiedLog.config().remoteStorageEnable()) { +if (replicaMgr.remoteLogManager().isEmpty()) throw new IllegalStateException("RemoteLogManager is not yet instantiated"); + +RemoteLogManager rlm = replicaMgr.remoteLogManager().get(); + +// Find the respective leader epoch for (leaderLocalLogStartOffset - 1). We need to build the leader epoch cache +// until that offset +long previousOffsetToLeaderLocalLogStartOffset = leaderLocalLogStartOffset - 1; +int targetEpoch; +// If the existing epoch is 0, no need to fetch from earlier epoch as the desired offset(leaderLogStartOffset - 1) +// will have the same epoch. 
+if (epochForLeaderLocalLogStartOffset == 0) { +targetEpoch = epochForLeaderLocalLogStartOffset; +} else { Review Comment: @kamalcph what specific part of the review are you concerned with? My thinking is that a refactoring is already in place with the merge of `ReplicaAlterLogDirsTierStateMachine` and `ReplicaFetcherTierStateMachine`. In my experience, it's generally easier to reason about what's happening after the code is simplified.
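To make the readability trade-off in this thread concrete, here is a sketch of the `targetEpoch` selection written in the "initialize, then override" shape proposed in review. It is illustrative only: `TargetEpochSketch`, its nested `EpochEndOffset` stand-in, and the `IntFunction` callback replacing `fetchEarlierEpochEndOffset(...)` are hypothetical, not the merged code.

```java
import java.util.function.IntFunction;

// Hypothetical sketch of the targetEpoch selection in buildRemoteLogAuxState; not the merged code.
class TargetEpochSketch {

    /** Stand-in for OffsetForLeaderEpochResponseData.EpochEndOffset (end offset is exclusive). */
    static final class EpochEndOffset {
        final int leaderEpoch;
        final long endOffset;

        EpochEndOffset(int leaderEpoch, long endOffset) {
            this.leaderEpoch = leaderEpoch;
            this.endOffset = endOffset;
        }
    }

    /**
     * Returns the epoch to use when looking up the remote segment that contains
     * leaderLocalLogStartOffset - 1. The callback asks the leader for the epoch preceding the given one.
     */
    static int targetEpoch(long leaderLocalLogStartOffset,
                           int epochForLeaderLocalLogStartOffset,
                           IntFunction<EpochEndOffset> fetchEarlierEpochEndOffset) {
        long previousOffset = leaderLocalLogStartOffset - 1;
        int targetEpoch = epochForLeaderLocalLogStartOffset;
        // Epoch 0 has no earlier epoch, so the previous offset must share it.
        if (epochForLeaderLocalLogStartOffset != 0) {
            EpochEndOffset earlier = fetchEarlierEpochEndOffset.apply(epochForLeaderLocalLogStartOffset);
            // The earlier epoch owns previousOffset only if its exclusive end lies beyond it.
            if (earlier.endOffset > previousOffset) {
                targetEpoch = earlier.leaderEpoch;
            }
        }
        return targetEpoch;
    }

    public static void main(String[] args) {
        // Leader's answer in the gap example: the epoch before 3 is 1, ending at offset 90.
        IntFunction<EpochEndOffset> leader = epoch -> new EpochEndOffset(1, 90L);
        System.out.println(targetEpoch(90L, 3, leader)); // 1: offset 89 belongs to epoch 1
        System.out.println(targetEpoch(95L, 3, leader)); // 3: offset 94 is already in epoch 3
    }
}
```

Initializing `targetEpoch` up front drops the `else` branches that reassign the same value; whether that reads better than the explicit `if/else` is exactly the judgment call discussed here.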
Re: [PR] KAFKA-16399: Add JBOD support in tiered storage [kafka]
kamalcph commented on code in PR #15690: URL: https://github.com/apache/kafka/pull/15690#discussion_r1582059485 ## core/src/main/java/kafka/server/TierStateMachine.java: ## @@ -54,5 +180,109 @@ PartitionFetchState start(TopicPartition topicPartition, * @return the new PartitionFetchState if the tier state machine was advanced, otherwise, return the currentFetchState */ Optional maybeAdvanceState(TopicPartition topicPartition, -PartitionFetchState currentFetchState); +PartitionFetchState currentFetchState) { +// This is currently a no-op but will be used for implementing async tiering logic in KAFKA-13560. +return Optional.of(currentFetchState); +} + +/** + * It tries to build the required state for this partition from leader and remote storage so that it can start + * fetching records from the leader. The return value is the next offset to fetch from the leader, which is the + * next offset following the end offset of the remote log portion. + */ +private Long buildRemoteLogAuxState(TopicPartition topicPartition, +Integer currentLeaderEpoch, +Long leaderLocalLogStartOffset, +Integer epochForLeaderLocalLogStartOffset, +Long leaderLogStartOffset, +UnifiedLog unifiedLog) throws IOException, RemoteStorageException { + +long nextOffset; + +if (unifiedLog.remoteStorageSystemEnable() && unifiedLog.config().remoteStorageEnable()) { +if (replicaMgr.remoteLogManager().isEmpty()) throw new IllegalStateException("RemoteLogManager is not yet instantiated"); + +RemoteLogManager rlm = replicaMgr.remoteLogManager().get(); + +// Find the respective leader epoch for (leaderLocalLogStartOffset - 1). We need to build the leader epoch cache +// until that offset +long previousOffsetToLeaderLocalLogStartOffset = leaderLocalLogStartOffset - 1; +int targetEpoch; +// If the existing epoch is 0, no need to fetch from earlier epoch as the desired offset(leaderLogStartOffset - 1) +// will have the same epoch. 
+if (epochForLeaderLocalLogStartOffset == 0) { +targetEpoch = epochForLeaderLocalLogStartOffset; +} else { Review Comment: I would suggest moving the refactoring to a separate follow-up PR so that this PR can be reviewed effectively.
Re: [PR] KAFKA-16399: Add JBOD support in tiered storage [kafka]
kamalcph commented on code in PR #15690: URL: https://github.com/apache/kafka/pull/15690#discussion_r1582055490 ## storage/src/test/java/org/apache/kafka/tiered/storage/integration/TransactionsWithTieredStoreTest.java: ## @@ -74,7 +74,7 @@ public Properties topicConfig() { public void maybeWaitForAtLeastOneSegmentUpload(scala.collection.Seq topicPartitions) { JavaConverters.seqAsJavaList(topicPartitions).forEach(topicPartition -> { List localStorages = JavaConverters.bufferAsJavaList(brokers()).stream() -.map(b -> new BrokerLocalStorage(b.config().brokerId(), b.config().logDirs().head(), STORAGE_WAIT_TIMEOUT_SEC)) +.map(b -> new BrokerLocalStorage(b.config().brokerId(), JavaConverters.asJava(b.config().logDirs().toSet()), STORAGE_WAIT_TIMEOUT_SEC)) Review Comment: ditto ## storage/src/test/java/org/apache/kafka/tiered/storage/utils/BrokerLocalStorage.java: ## @@ -31,31 +31,36 @@ import java.util.List; import java.util.Objects; import java.util.Optional; +import java.util.Set; import java.util.concurrent.TimeUnit; import java.util.function.Function; import java.util.stream.Collectors; public final class BrokerLocalStorage { private final Integer brokerId; -private final File brokerStorageDirectory; +private final Set brokerStorageDirectorys; Review Comment: nit: `brokerStorageDirectorys` -> `brokerStorageDirectories` ## core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala: ## @@ -3287,11 +3287,9 @@ class ReplicaManagerTest { val path1 = TestUtils.tempRelativeDir("data").getAbsolutePath val path2 = TestUtils.tempRelativeDir("data2").getAbsolutePath if (enableRemoteStorage) { Review Comment: nit: do we need this `if` check? 
## storage/src/test/java/org/apache/kafka/tiered/storage/utils/BrokerLocalStorage.java: ## @@ -141,7 +146,11 @@ private boolean isOffsetPresentInFirstLocalSegment(TopicPartition topicPartition if (offsetToSearch.equals(firstLogFileBaseOffset)) { return true; } -File partitionDir = new File(brokerStorageDirectory.getAbsolutePath(), topicPartition.toString()); +File partitionDir = brokerStorageDirectorys.stream() +.filter(dir -> dirContainsTopicPartition(topicPartition, dir)) +.findFirst() +.orElseThrow(() -> new IllegalArgumentException(String.format("[BrokerId=%d] Directory for the topic-partition %s " + +"was not found", brokerId, topicPartition))); Review Comment: Previously, `partitionDir` was the topic-partition directory, not the log dir that contains it: ```suggestion File logDir = brokerStorageDirectorys.stream() .filter(dir -> dirContainsTopicPartition(topicPartition, dir)) .findFirst() .orElseThrow(() -> new IllegalArgumentException(String.format("[BrokerId=%d] Directory for the topic-partition %s " + "was not found", brokerId, topicPartition))); File partitionDir = new File(logDir.getAbsolutePath(), topicPartition.toString()); ``` ## storage/src/test/java/org/apache/kafka/tiered/storage/utils/BrokerLocalStorage.java: ## @@ -31,31 +31,36 @@ import java.util.List; import java.util.Objects; import java.util.Optional; +import java.util.Set; import java.util.concurrent.TimeUnit; import java.util.function.Function; import java.util.stream.Collectors; public final class BrokerLocalStorage { private final Integer brokerId; -private final File brokerStorageDirectory; +private final Set brokerStorageDirectorys; private final Integer storageWaitTimeoutSec; private final int storagePollPeriodSec = 1; private final Time time = Time.SYSTEM; public BrokerLocalStorage(Integer brokerId, - String storageDirname, + Set storageDirnames, Integer storageWaitTimeoutSec) { this.brokerId = brokerId; -this.brokerStorageDirectory = new File(storageDirname); +this.brokerStorageDirectorys =
storageDirnames.stream().map(File::new).collect(Collectors.toSet()); this.storageWaitTimeoutSec = storageWaitTimeoutSec; } public Integer getBrokerId() { return brokerId; } +public Set getBrokerStorageDirectory() { Review Comment: rename `getBrokerStorageDirectory` -> `getBrokerStorageDirectories` ## storage/src/test/java/org/apache/kafka/tiered/storage/TieredStorageTestBuilder.java: ## @@ -313,6 +314,14 @@ public TieredStorageTestBuilder reassignReplica(String topic, return this; } +public TieredStorageTestBuilder alterLogDir(String topic, +Integer partition, Review Comment: nit: parameter alignment
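The suggested fix boils down to two steps: find the log dir that holds the partition, then resolve the partition directory inside it. A minimal sketch, assuming the partition directory is named by the `topic-partition` string (as `TopicPartition.toString()` produces, e.g. `topicA-0`); `findPartitionDir` is a hypothetical helper standing in for the `dirContainsTopicPartition` filter in the test utility.

```java
import java.io.File;
import java.io.IOException;
import java.nio.file.Files;
import java.util.Set;

// Hypothetical helper: search every log dir for the partition, then return the partition
// directory itself rather than the log dir that contains it.
class PartitionDirLookupSketch {

    static File findPartitionDir(Set<File> logDirs, String topicPartitionName, int brokerId) {
        File logDir = logDirs.stream()
                .filter(dir -> new File(dir, topicPartitionName).isDirectory())
                .findFirst()
                .orElseThrow(() -> new IllegalArgumentException(String.format(
                        "[BrokerId=%d] Directory for the topic-partition %s was not found",
                        brokerId, topicPartitionName)));
        // Resolve the partition directory inside the log dir that holds it.
        return new File(logDir, topicPartitionName);
    }

    public static void main(String[] args) throws IOException {
        File data = Files.createTempDirectory("data").toFile();
        File data2 = Files.createTempDirectory("data2").toFile();
        // Only the second log dir holds the partition.
        new File(data2, "topicA-0").mkdirs();
        System.out.println(findPartitionDir(Set.of(data, data2), "topicA-0", 0));
    }
}
```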
Re: [PR] KAFKA-16399: Add JBOD support in tiered storage [kafka]
soarez commented on code in PR #15690: URL: https://github.com/apache/kafka/pull/15690#discussion_r1580567800 ## core/src/main/java/kafka/server/TierStateMachine.java: ## @@ -54,5 +180,109 @@ PartitionFetchState start(TopicPartition topicPartition, * @return the new PartitionFetchState if the tier state machine was advanced, otherwise, return the currentFetchState */ Optional maybeAdvanceState(TopicPartition topicPartition, -PartitionFetchState currentFetchState); +PartitionFetchState currentFetchState) { +// This is currently a no-op but will be used for implementing async tiering logic in KAFKA-13560. +return Optional.of(currentFetchState); +} + +/** + * It tries to build the required state for this partition from leader and remote storage so that it can start + * fetching records from the leader. The return value is the next offset to fetch from the leader, which is the + * next offset following the end offset of the remote log portion. + */ +private Long buildRemoteLogAuxState(TopicPartition topicPartition, +Integer currentLeaderEpoch, +Long leaderLocalLogStartOffset, +Integer epochForLeaderLocalLogStartOffset, +Long leaderLogStartOffset, +UnifiedLog unifiedLog) throws IOException, RemoteStorageException { + +long nextOffset; + +if (unifiedLog.remoteStorageSystemEnable() && unifiedLog.config().remoteStorageEnable()) { +if (replicaMgr.remoteLogManager().isEmpty()) throw new IllegalStateException("RemoteLogManager is not yet instantiated"); + +RemoteLogManager rlm = replicaMgr.remoteLogManager().get(); + +// Find the respective leader epoch for (leaderLocalLogStartOffset - 1). We need to build the leader epoch cache +// until that offset +long previousOffsetToLeaderLocalLogStartOffset = leaderLocalLogStartOffset - 1; +int targetEpoch; +// If the existing epoch is 0, no need to fetch from earlier epoch as the desired offset(leaderLogStartOffset - 1) +// will have the same epoch. 
+if (epochForLeaderLocalLogStartOffset == 0) { +targetEpoch = epochForLeaderLocalLogStartOffset; +} else { Review Comment: This can also be simplified:
```java
int targetEpoch = epochForLeaderLocalLogStartOffset;
// If the existing epoch is 0, no need to fetch from earlier epoch as the
// desired offset (leaderLogStartOffset - 1) will have the same epoch.
if (epochForLeaderLocalLogStartOffset != 0) {
    ...
}
```
## core/src/main/java/kafka/server/TierStateMachine.java: ## @@ -54,5 +180,109 @@ PartitionFetchState start(TopicPartition topicPartition, * @return the new PartitionFetchState if the tier state machine was advanced, otherwise, return the currentFetchState */ Optional maybeAdvanceState(TopicPartition topicPartition, -PartitionFetchState currentFetchState); +PartitionFetchState currentFetchState) { +// This is currently a no-op but will be used for implementing async tiering logic in KAFKA-13560. +return Optional.of(currentFetchState); +} + +/** + * It tries to build the required state for this partition from leader and remote storage so that it can start + * fetching records from the leader. The return value is the next offset to fetch from the leader, which is the + * next offset following the end offset of the remote log portion. + */ +private Long buildRemoteLogAuxState(TopicPartition topicPartition, +Integer currentLeaderEpoch, +Long leaderLocalLogStartOffset, +Integer epochForLeaderLocalLogStartOffset, +Long leaderLogStartOffset, +UnifiedLog unifiedLog) throws IOException, RemoteStorageException { + +long nextOffset; + +if (unifiedLog.remoteStorageSystemEnable() && unifiedLog.config().remoteStorageEnable()) { +if (replicaMgr.remoteLogManager().isEmpty()) throw new IllegalStateException("RemoteLogManager is not yet instantiated"); + +RemoteLogManager rlm = replicaMgr.remoteLogManager().get(); + +// Find the respective leader epoch for (leaderLocalLogStartOffset - 1).
We need to build the leader epoch cache +// until that offset +long previousOffsetToLeaderLocalLogStartOffset = lead
Re: [PR] KAFKA-16399: Add JBOD support in tiered storage [kafka]
showuon commented on PR #15690: URL: https://github.com/apache/kafka/pull/15690#issuecomment-2076987711 @mimaison @soarez , PR updated. Please take a look when available. Thanks.
Re: [PR] KAFKA-16399: Add JBOD support in tiered storage [kafka]
showuon commented on code in PR #15690: URL: https://github.com/apache/kafka/pull/15690#discussion_r1579328510 ## storage/src/test/java/org/apache/kafka/tiered/storage/actions/AlterLogDirAction.java: ## @@ -0,0 +1,76 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.kafka.tiered.storage.actions; + +import org.apache.kafka.common.TopicPartition; +import org.apache.kafka.common.TopicPartitionReplica; +import org.apache.kafka.test.TestUtils; +import org.apache.kafka.tiered.storage.TieredStorageTestAction; +import org.apache.kafka.tiered.storage.TieredStorageTestContext; +import org.apache.kafka.tiered.storage.utils.BrokerLocalStorage; + +import java.io.File; +import java.io.PrintStream; +import java.util.Collections; +import java.util.Map; +import java.util.Optional; +import java.util.concurrent.ExecutionException; + +public final class AlterLogDirAction implements TieredStorageTestAction { + +private final TopicPartition topicPartition; +private final int brokerId; + +public AlterLogDirAction(TopicPartition topicPartition, + int brokerId) { +this.topicPartition = topicPartition; +this.brokerId = brokerId; +} + +@Override +public void doExecute(TieredStorageTestContext context) throws InterruptedException, ExecutionException { +Optional localStorage = context.localStorages().stream().filter(storage -> storage.getBrokerId().intValue() == brokerId).findFirst(); +if (!localStorage.isPresent()) { +throw new IllegalArgumentException("cannot find local storage for this topic partition:" + topicPartition + " in this broker id:" + brokerId); +} + +Optional sourceDir = localStorage.get().getBrokerStorageDirectory().stream().filter(dir -> localStorage.get().isTopicPartitionFileExistInDir(topicPartition, dir)).findFirst(); +Optional targetDir = localStorage.get().getBrokerStorageDirectory().stream().filter(dir -> !localStorage.get().isTopicPartitionFileExistInDir(topicPartition, dir)).findFirst(); +if (!sourceDir.isPresent()) { +throw new IllegalArgumentException("No log dir with topic partition:" + topicPartition + " in this broker id:" + brokerId); +} + +if (!targetDir.isPresent()) { +throw new IllegalArgumentException("No log dir without topic partition:" + topicPartition + " in this broker id:" + brokerId); +} + 
+// build alterReplicaLogDirs request content to move from sourceDir to targetDir +Map logDirs = Collections.singletonMap(new TopicPartitionReplica(topicPartition.topic(), topicPartition.partition(), brokerId), targetDir.get().getAbsolutePath()); + +context.admin().alterReplicaLogDirs(logDirs); Review Comment: Nice suggestion. Updated.
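The directory selection at the top of `doExecute` can be isolated from the Kafka test context. This is a sketch under the assumption that a plain `Predicate<File>` can stand in for `BrokerLocalStorage#isTopicPartitionFileExistInDir`; `LogDirMoveSketch` and `pickSourceAndTarget` are hypothetical names.

```java
import java.io.File;
import java.util.AbstractMap;
import java.util.List;
import java.util.Map;
import java.util.function.Predicate;

// Hypothetical sketch of how the test action picks the directories for a replica move.
class LogDirMoveSketch {

    /** Returns (sourceDir, targetDir): the first dir holding the partition and the first one not holding it. */
    static Map.Entry<File, File> pickSourceAndTarget(List<File> logDirs, Predicate<File> hasPartition) {
        File source = logDirs.stream()
                .filter(hasPartition)
                .findFirst()
                .orElseThrow(() -> new IllegalArgumentException("No log dir holds the partition"));
        File target = logDirs.stream()
                .filter(hasPartition.negate())
                .findFirst()
                .orElseThrow(() -> new IllegalArgumentException("No log dir without the partition"));
        return new AbstractMap.SimpleImmutableEntry<>(source, target);
    }

    public static void main(String[] args) {
        List<File> dirs = List.of(new File("/tmp/data"), new File("/tmp/data2"));
        // Pretend only "data" currently holds the partition.
        Map.Entry<File, File> move = pickSourceAndTarget(dirs, dir -> dir.getName().equals("data"));
        System.out.println(move.getKey() + " -> " + move.getValue());
    }
}
```

In the action itself, the target's `getAbsolutePath()` becomes the value in the `Map<TopicPartitionReplica, String>` passed to `admin().alterReplicaLogDirs(...)`.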
Re: [PR] KAFKA-16399: Add JBOD support in tiered storage [kafka]
showuon commented on code in PR #15690: URL: https://github.com/apache/kafka/pull/15690#discussion_r1579328199 ## core/src/main/java/kafka/server/TierStateMachine.java: ## @@ -40,7 +92,72 @@ public interface TierStateMachine { */ PartitionFetchState start(TopicPartition topicPartition, PartitionFetchState currentFetchState, - PartitionData fetchPartitionData) throws Exception; + PartitionData fetchPartitionData) throws Exception { +OffsetAndEpoch epochAndLeaderLocalStartOffset = leader.fetchEarliestLocalOffset(topicPartition, currentFetchState.currentLeaderEpoch()); +int epoch = epochAndLeaderLocalStartOffset.leaderEpoch(); +long leaderLocalStartOffset = epochAndLeaderLocalStartOffset.offset(); + +long offsetToFetch; + replicaMgr.brokerTopicStats().topicStats(topicPartition.topic()).buildRemoteLogAuxStateRequestRate().mark(); + replicaMgr.brokerTopicStats().allTopicsStats().buildRemoteLogAuxStateRequestRate().mark(); + +try { +offsetToFetch = buildRemoteLogAuxState(topicPartition, currentFetchState.currentLeaderEpoch(), leaderLocalStartOffset, epoch, fetchPartitionData.logStartOffset()); +} catch (RemoteStorageException e) { + replicaMgr.brokerTopicStats().topicStats(topicPartition.topic()).failedBuildRemoteLogAuxStateRate().mark(); + replicaMgr.brokerTopicStats().allTopicsStats().failedBuildRemoteLogAuxStateRate().mark(); +throw e; +} + +OffsetAndEpoch fetchLatestOffsetResult = leader.fetchLatestOffset(topicPartition, currentFetchState.currentLeaderEpoch()); +long leaderEndOffset = fetchLatestOffsetResult.offset(); + +long initialLag = leaderEndOffset - offsetToFetch; + +return PartitionFetchState.apply(currentFetchState.topicId(), offsetToFetch, Option.apply(initialLag), currentFetchState.currentLeaderEpoch(), +Fetching$.MODULE$, replicaMgr.localLogOrException(topicPartition).latestEpoch()); Review Comment: I've checked: because we've already done truncation earlier, we'll return the Fetching state, and in the Fetching state `latestEpoch` is ignored. `latestEpoch` is only used when doing truncation. But I still changed it to the correct value in case we use it in the future.
Re: [PR] KAFKA-16399: Add JBOD support in tiered storage [kafka]
showuon commented on code in PR #15690: URL: https://github.com/apache/kafka/pull/15690#discussion_r1579325677

## core/src/test/scala/unit/kafka/server/TierStateMachineTest.scala: ##
@@ -153,12 +157,13 @@ class ReplicaFetcherTierStateMachineTest {
     assertEquals(11L, replicaState.logEndOffset)
   }
 
-  @Test
-  def testFencedOffsetResetAfterMovedToRemoteTier(): Unit = {
+  @ParameterizedTest
+  @ArgumentsSource(classOf[TierStateMachineTest.Params])
+  def testFencedOffsetResetAfterMovedToRemoteTier(truncateOnFetch: Boolean, useFutureLog: Boolean): Unit = {
     val partition = new TopicPartition("topic", 0)
     var isErrorHandled = false
     val mockLeaderEndpoint = new MockLeaderEndPoint(truncateOnFetch = truncateOnFetch, version = version)
-    val mockTierStateMachine = new MockTierStateMachine(mockLeaderEndpoint) {
+    val mockTierStateMachine = new MockTierStateMachine(mockLeaderEndpoint, useFutureLog) {

Review Comment:
Unfortunately, you're correct! I removed `useFutureLog`. I'll think about how to unit test it. Thanks
Re: [PR] KAFKA-16399: Add JBOD support in tiered storage [kafka]
showuon commented on code in PR #15690: URL: https://github.com/apache/kafka/pull/15690#discussion_r1579212500

## storage/src/test/java/org/apache/kafka/tiered/storage/actions/AlterLogDirAction.java: ##
@@ -0,0 +1,76 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.tiered.storage.actions;
+
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.TopicPartitionReplica;
+import org.apache.kafka.test.TestUtils;
+import org.apache.kafka.tiered.storage.TieredStorageTestAction;
+import org.apache.kafka.tiered.storage.TieredStorageTestContext;
+import org.apache.kafka.tiered.storage.utils.BrokerLocalStorage;
+
+import java.io.File;
+import java.io.PrintStream;
+import java.util.Collections;
+import java.util.Map;
+import java.util.Optional;
+import java.util.concurrent.ExecutionException;
+
+public final class AlterLogDirAction implements TieredStorageTestAction {
+
+    private final TopicPartition topicPartition;
+    private final int brokerId;
+
+    public AlterLogDirAction(TopicPartition topicPartition,
+                             int brokerId) {
+        this.topicPartition = topicPartition;
+        this.brokerId = brokerId;
+    }
+
+    @Override
+    public void doExecute(TieredStorageTestContext context) throws InterruptedException, ExecutionException {
+        Optional<BrokerLocalStorage> localStorage = context.localStorages().stream().filter(storage -> storage.getBrokerId().intValue() == brokerId).findFirst();
+        if (!localStorage.isPresent()) {
+            throw new IllegalArgumentException("cannot find local storage for this topic partition:" + topicPartition + " in this broker id:" + brokerId);
+        }
+
+        Optional<File> sourceDir = localStorage.get().getBrokerStorageDirectory().stream().filter(dir -> localStorage.get().isTopicPartitionFileExistInDir(topicPartition, dir)).findFirst();
+        Optional<File> targetDir = localStorage.get().getBrokerStorageDirectory().stream().filter(dir -> !localStorage.get().isTopicPartitionFileExistInDir(topicPartition, dir)).findFirst();
+        if (!sourceDir.isPresent()) {
+            throw new IllegalArgumentException("No log dir with topic partition:" + topicPartition + " in this broker id:" + brokerId);
+        }
+
+        if (!targetDir.isPresent()) {
+            throw new IllegalArgumentException("No log dir without topic partition:" + topicPartition + " in this broker id:" + brokerId);
+        }
+
+        // build alterReplicaLogDirs request content to move from sourceDir to targetDir
+        Map<TopicPartitionReplica, String> logDirs = Collections.singletonMap(new TopicPartitionReplica(topicPartition.topic(), topicPartition.partition(), brokerId), targetDir.get().getAbsolutePath());
+
+        context.admin().alterReplicaLogDirs(logDirs);
+
+        // wait until the topic partition folder disappearing from source dir and appearing in the target dir
+        TestUtils.waitForCondition(() -> localStorage.get().isTopicPartitionFileExistInDir(topicPartition, targetDir.get()) &&
+                !localStorage.get().isTopicPartitionFileExistInDir(topicPartition, sourceDir.get()),
+            "Failed to alter dir:" + logDirs);
+    }
+
+    @Override
+    public void describe(PrintStream output) {
+        output.print("alter di for topic partition:" + topicPartition + " in this broker id:" + brokerId);

Review Comment:
Ah, nice catch!
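The action above waits until the partition directory has disappeared from the source log dir and appeared in the target one. The following is a minimal, self-contained sketch of that polling pattern, not the PR's code. `MoveWait`, `isMoved` and `waitForCondition` are hypothetical names used only for illustration; the real test relies on Kafka's `TestUtils.waitForCondition`. The sketch also assumes the partition directory is named `topic-partition`, for example `topicA-0`.

```java
import java.io.File;
import java.util.function.BooleanSupplier;

final class MoveWait {

    /** True once the partition directory exists under targetDir and no longer exists under sourceDir. */
    static boolean isMoved(String partitionDirName, File sourceDir, File targetDir) {
        return new File(targetDir, partitionDirName).isDirectory()
                && !new File(sourceDir, partitionDirName).exists();
    }

    /**
     * Polls the condition every pollMs until it holds, failing with an AssertionError after timeoutMs.
     * Hypothetical stand-in for a test helper such as TestUtils.waitForCondition.
     */
    static void waitForCondition(BooleanSupplier condition, long timeoutMs, long pollMs, String message)
            throws InterruptedException {
        long deadline = System.nanoTime() + timeoutMs * 1_000_000L;
        while (!condition.getAsBoolean()) {
            // overflow-safe comparison of nanoTime values
            if (System.nanoTime() - deadline >= 0) {
                throw new AssertionError("Condition not met within " + timeoutMs + " ms: " + message);
            }
            Thread.sleep(pollMs);
        }
    }
}
```

Both checks are needed. Checking only the target directory could pass while a copy still exists in the source directory, before the move has completed.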
Re: [PR] KAFKA-16399: Add JBOD support in tiered storage [kafka]
showuon commented on code in PR #15690: URL: https://github.com/apache/kafka/pull/15690#discussion_r1565635052

## core/src/main/java/kafka/server/TierStateMachine.java: ##
@@ -40,7 +92,72 @@ public interface TierStateMachine {
      */
     PartitionFetchState start(TopicPartition topicPartition,
                               PartitionFetchState currentFetchState,
-                              PartitionData fetchPartitionData) throws Exception;
+                              PartitionData fetchPartitionData) throws Exception {
+        OffsetAndEpoch epochAndLeaderLocalStartOffset = leader.fetchEarliestLocalOffset(topicPartition, currentFetchState.currentLeaderEpoch());
+        int epoch = epochAndLeaderLocalStartOffset.leaderEpoch();
+        long leaderLocalStartOffset = epochAndLeaderLocalStartOffset.offset();
+
+        long offsetToFetch;
+        replicaMgr.brokerTopicStats().topicStats(topicPartition.topic()).buildRemoteLogAuxStateRequestRate().mark();
+        replicaMgr.brokerTopicStats().allTopicsStats().buildRemoteLogAuxStateRequestRate().mark();
+
+        try {
+            offsetToFetch = buildRemoteLogAuxState(topicPartition, currentFetchState.currentLeaderEpoch(), leaderLocalStartOffset, epoch, fetchPartitionData.logStartOffset());
+        } catch (RemoteStorageException e) {
+            replicaMgr.brokerTopicStats().topicStats(topicPartition.topic()).failedBuildRemoteLogAuxStateRate().mark();
+            replicaMgr.brokerTopicStats().allTopicsStats().failedBuildRemoteLogAuxStateRate().mark();
+            throw e;
+        }
+
+        OffsetAndEpoch fetchLatestOffsetResult = leader.fetchLatestOffset(topicPartition, currentFetchState.currentLeaderEpoch());
+        long leaderEndOffset = fetchLatestOffsetResult.offset();
+
+        long initialLag = leaderEndOffset - offsetToFetch;
+
+        return PartitionFetchState.apply(currentFetchState.topicId(), offsetToFetch, Option.apply(initialLag), currentFetchState.currentLeaderEpoch(),
+                Fetching$.MODULE$, replicaMgr.localLogOrException(topicPartition).latestEpoch());

Review Comment:
Ah, nice catch! Interestingly, no test caught this error. Let me write a test for it.
Re: [PR] KAFKA-16399: Add JBOD support in tiered storage [kafka]
mimaison commented on code in PR #15690: URL: https://github.com/apache/kafka/pull/15690#discussion_r1560966402

## storage/src/test/java/org/apache/kafka/tiered/storage/actions/AlterLogDirAction.java: ##
@@ -0,0 +1,76 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.tiered.storage.actions;
+
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.TopicPartitionReplica;
+import org.apache.kafka.test.TestUtils;
+import org.apache.kafka.tiered.storage.TieredStorageTestAction;
+import org.apache.kafka.tiered.storage.TieredStorageTestContext;
+import org.apache.kafka.tiered.storage.utils.BrokerLocalStorage;
+
+import java.io.File;
+import java.io.PrintStream;
+import java.util.Collections;
+import java.util.Map;
+import java.util.Optional;
+import java.util.concurrent.ExecutionException;
+
+public final class AlterLogDirAction implements TieredStorageTestAction {
+
+    private final TopicPartition topicPartition;
+    private final int brokerId;
+
+    public AlterLogDirAction(TopicPartition topicPartition,
+                             int brokerId) {
+        this.topicPartition = topicPartition;
+        this.brokerId = brokerId;
+    }
+
+    @Override
+    public void doExecute(TieredStorageTestContext context) throws InterruptedException, ExecutionException {
+        Optional<BrokerLocalStorage> localStorage = context.localStorages().stream().filter(storage -> storage.getBrokerId().intValue() == brokerId).findFirst();
+        if (!localStorage.isPresent()) {
+            throw new IllegalArgumentException("cannot find local storage for this topic partition:" + topicPartition + " in this broker id:" + brokerId);
+        }
+
+        Optional<File> sourceDir = localStorage.get().getBrokerStorageDirectory().stream().filter(dir -> localStorage.get().isTopicPartitionFileExistInDir(topicPartition, dir)).findFirst();
+        Optional<File> targetDir = localStorage.get().getBrokerStorageDirectory().stream().filter(dir -> !localStorage.get().isTopicPartitionFileExistInDir(topicPartition, dir)).findFirst();
+        if (!sourceDir.isPresent()) {

Review Comment:
Should we move this before the computation of `targetDir`?
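mimaison's suggestion amounts to failing fast: look up the source dir and throw if it is missing, before scanning for a target dir at all. The sketch below shows that ordering under stated assumptions; it is not the code that was merged. The partition check is passed in as a `Predicate<File>`, and the `LogDirSelection` class and its `Move` holder are hypothetical.

```java
import java.io.File;
import java.util.Collection;
import java.util.Optional;
import java.util.function.Predicate;

final class LogDirSelection {

    /** Hypothetical holder for the chosen source and target log dirs. */
    static final class Move {
        final File source;
        final File target;

        Move(File source, File target) {
            this.source = source;
            this.target = target;
        }
    }

    /**
     * Finds the dir that holds the partition first and fails fast if there is none;
     * only then looks for a dir that does not hold it.
     */
    static Move select(Collection<File> logDirs, Predicate<File> containsPartition) {
        Optional<File> source = logDirs.stream().filter(containsPartition).findFirst();
        if (!source.isPresent()) {
            throw new IllegalArgumentException("No log dir contains the partition: " + logDirs);
        }
        Optional<File> target = logDirs.stream().filter(containsPartition.negate()).findFirst();
        if (!target.isPresent()) {
            throw new IllegalArgumentException("No log dir without the partition: " + logDirs);
        }
        return new Move(source.get(), target.get());
    }
}
```

Using `containsPartition.negate()` for the target keeps the two lookups symmetric, and the second stream is only evaluated once a source is known to exist.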

## storage/src/test/java/org/apache/kafka/tiered/storage/actions/AlterLogDirAction.java: ##
@@ -0,0 +1,76 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.tiered.storage.actions;
+
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.TopicPartitionReplica;
+import org.apache.kafka.test.TestUtils;
+import org.apache.kafka.tiered.storage.TieredStorageTestAction;
+import org.apache.kafka.tiered.storage.TieredStorageTestContext;
+import org.apache.kafka.tiered.storage.utils.BrokerLocalStorage;
+
+import java.io.File;
+import java.io.PrintStream;
+import java.util.Collections;
+import java.util.Map;
+import java.util.Optional;
+import java.util.concurrent.ExecutionException;
+
+public final class AlterLogDirAction implements TieredStorageTestAction {
+
+    private final TopicPartition topicPartition;
+    private final int brokerId;
+
+    public AlterLogDirAction(TopicPartition topicPartition,
+                             int brokerId) {
+        this.topicPartition = topicPartition;
+        this.brokerId = brokerId;
+    }
+
+    @Override
+    public void doExecute(TieredStorageTestContext context) throws InterruptedException, ExecutionException {
+        Optiona
Re: [PR] KAFKA-16399: Add JBOD support in tiered storage [kafka]
soarez commented on code in PR #15690: URL: https://github.com/apache/kafka/pull/15690#discussion_r1560139446

## core/src/main/java/kafka/server/TierStateMachine.java: ##
@@ -40,7 +92,72 @@ public interface TierStateMachine {
      */
     PartitionFetchState start(TopicPartition topicPartition,
                               PartitionFetchState currentFetchState,
-                              PartitionData fetchPartitionData) throws Exception;
+                              PartitionData fetchPartitionData) throws Exception {
+        OffsetAndEpoch epochAndLeaderLocalStartOffset = leader.fetchEarliestLocalOffset(topicPartition, currentFetchState.currentLeaderEpoch());
+        int epoch = epochAndLeaderLocalStartOffset.leaderEpoch();
+        long leaderLocalStartOffset = epochAndLeaderLocalStartOffset.offset();
+
+        long offsetToFetch;
+        replicaMgr.brokerTopicStats().topicStats(topicPartition.topic()).buildRemoteLogAuxStateRequestRate().mark();
+        replicaMgr.brokerTopicStats().allTopicsStats().buildRemoteLogAuxStateRequestRate().mark();
+
+        try {
+            offsetToFetch = buildRemoteLogAuxState(topicPartition, currentFetchState.currentLeaderEpoch(), leaderLocalStartOffset, epoch, fetchPartitionData.logStartOffset());
+        } catch (RemoteStorageException e) {
+            replicaMgr.brokerTopicStats().topicStats(topicPartition.topic()).failedBuildRemoteLogAuxStateRate().mark();
+            replicaMgr.brokerTopicStats().allTopicsStats().failedBuildRemoteLogAuxStateRate().mark();
+            throw e;
+        }
+
+        OffsetAndEpoch fetchLatestOffsetResult = leader.fetchLatestOffset(topicPartition, currentFetchState.currentLeaderEpoch());
+        long leaderEndOffset = fetchLatestOffsetResult.offset();
+
+        long initialLag = leaderEndOffset - offsetToFetch;
+
+        return PartitionFetchState.apply(currentFetchState.topicId(), offsetToFetch, Option.apply(initialLag), currentFetchState.currentLeaderEpoch(),
+                Fetching$.MODULE$, replicaMgr.localLogOrException(topicPartition).latestEpoch());

Review Comment:
Should `replicaMgr.futureLogOrException` be used instead, if `useFutureLog`?
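soarez's question comes down to which log the epoch is read from. While a replica is moving between log dirs, that should be the future log being fetched into; otherwise it is the current log. The sketch below illustrates that choice using a hypothetical `ReplicaLog` interface in place of the broker's real log class. `useFutureLog` mirrors the flag named in the comment. The suppliers keep the lookup lazy, so only the selected log is resolved; this matters if, as their names suggest, `localLogOrException` and `futureLogOrException` throw when the log is absent.

```java
import java.util.Optional;
import java.util.function.Supplier;

final class FetchLogChoice {

    /** Hypothetical minimal view of a replica log; not Kafka's actual log class. */
    interface ReplicaLog {
        Optional<Integer> latestEpoch();
    }

    /**
     * Returns the latest epoch of the future log when useFutureLog is set, otherwise of
     * the current log. Only the selected supplier is invoked.
     */
    static Optional<Integer> latestEpoch(boolean useFutureLog,
                                         Supplier<ReplicaLog> currentLog,
                                         Supplier<ReplicaLog> futureLog) {
        ReplicaLog log = useFutureLog ? futureLog.get() : currentLog.get();
        return log.latestEpoch();
    }
}
```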

## storage/src/test/java/org/apache/kafka/tiered/storage/utils/BrokerLocalStorage.java: ##
@@ -175,8 +186,38 @@ private OffsetHolder getEarliestLocalOffset(TopicPartition topicPartition) {
         return new OffsetHolder(LogFileUtils.offsetFromFileName(firstLogFile.get()), partitionFiles);
     }
-    private List<File> getTopicPartitionFiles(TopicPartition topicPartition) {
-        File[] files = brokerStorageDirectory.listFiles((dir, name) -> name.equals(topicPartition.toString()));
+    public boolean isTopicPartitionFileExistInDir(TopicPartition topicPartition, File logDir) {

Review Comment:
Maybe `dirContainsTopicPartition` is a better name for this method?

## storage/src/test/java/org/apache/kafka/tiered/storage/utils/BrokerLocalStorage.java: ##
@@ -31,31 +31,36 @@ import java.util.List;
 import java.util.Objects;
 import java.util.Optional;
+import java.util.Set;
 import java.util.concurrent.TimeUnit;
 import java.util.function.Function;
 import java.util.stream.Collectors;
 
 public final class BrokerLocalStorage {
 
     private final Integer brokerId;
-    private final File brokerStorageDirectory;
+    private final Set<File> brokerStorageDirectory;
     private final Integer storageWaitTimeoutSec;
 
     private final int storagePollPeriodSec = 1;
     private final Time time = Time.SYSTEM;
 
     public BrokerLocalStorage(Integer brokerId,
-                              String storageDirname,
+                              Set<String> storageDirname,
                               Integer storageWaitTimeoutSec) {
         this.brokerId = brokerId;
-        this.brokerStorageDirectory = new File(storageDirname);
+        this.brokerStorageDirectory = storageDirname.stream().map(File::new).collect(Collectors.toSet());

Review Comment:
The names for parameter `storageDirname` and field `brokerStorageDirectory` should be pluralized.
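With JBOD, `BrokerLocalStorage` tracks a set of log dirs instead of a single one, so checking whether a partition is present becomes a per-directory test. The minimal sketch below applies both review suggestions: the `dirContainsTopicPartition` name and the pluralized field and parameter names. The class name `MultiDirStorage` and the `dirsContaining` helper are hypothetical. The sketch assumes the partition directory is named `topic + "-" + partition`, which is the string `TopicPartition.toString()` produces and which the original code matched against.

```java
import java.io.File;
import java.util.Set;
import java.util.stream.Collectors;

final class MultiDirStorage {

    private final Set<File> brokerStorageDirectories;

    MultiDirStorage(Set<String> storageDirnames) {
        this.brokerStorageDirectories = storageDirnames.stream().map(File::new).collect(Collectors.toSet());
    }

    /** True if logDir has an entry named after the partition, e.g. "topicA-0". */
    static boolean dirContainsTopicPartition(String topic, int partition, File logDir) {
        String partitionDirName = topic + "-" + partition;
        File[] matches = logDir.listFiles((dir, name) -> name.equals(partitionDirName));
        // listFiles returns null when logDir does not exist or cannot be read
        return matches != null && matches.length > 0;
    }

    /** All configured log dirs that currently hold the partition. */
    Set<File> dirsContaining(String topic, int partition) {
        return brokerStorageDirectories.stream()
                .filter(dir -> dirContainsTopicPartition(topic, partition, dir))
                .collect(Collectors.toSet());
    }
}
```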

## core/src/test/scala/unit/kafka/server/TierStateMachineTest.scala: ##
@@ -23,20 +23,23 @@ import org.apache.kafka.common.protocol.ApiKeys
 import org.apache.kafka.common.record._
 import org.apache.kafka.common.{TopicPartition, Uuid}
 import org.junit.jupiter.api.Assertions._
-import org.junit.jupiter.api.Test
 import kafka.server.FetcherThreadTestUtils.{initialFetchState, mkBatch}
+import org.junit.jupiter.api.extension.ExtensionContext
+import org.junit.jupiter.params.ParameterizedTest
+import org.junit.jupiter.params.provider.{Arguments, ArgumentsProvider, ArgumentsSource}
 
 import scala.collection.Map
 
-class ReplicaFetcherTierStateMachineTest {
+class TierStateMachineTest {
 
-  val truncateOnFetch = true
+  val truncateOnFetch = false

Review Comment:
This can be removed

## core/src/test/scala/unit/kafka/server/T
[PR] KAFKA-16399: Add JBOD support in tiered storage [kafka]
showuon opened a new pull request, #15690: URL: https://github.com/apache/kafka/pull/15690

*More detailed description of your change, if necessary. The PR title and PR message become the squashed commit message, so use a separate comment to ping reviewers.*

*Summary of testing strategy (including rationale) for the feature or bug fix. Unit and/or integration tests are expected for any behaviour change and system tests should be considered for larger changes.*

### Committer Checklist (excluded from commit message)
- [ ] Verify design and implementation
- [ ] Verify test coverage and CI build status
- [ ] Verify documentation (including upgrade notes)