[GitHub] [kafka] satishd commented on a diff in pull request #13535: KAFKA-9579 Fetch implementation for records in the remote storage through a specific purgatory.
satishd commented on code in PR #13535:
URL: https://github.com/apache/kafka/pull/13535#discussion_r1184873142

## core/src/main/java/kafka/log/remote/RemoteLogManager.java: ##
@@ -670,6 +875,14 @@ public void close() {
         } catch (InterruptedException e) {
             // ignore
         }
+        remoteStorageReaderThreadPool.shutdownNow();
+        //waits for 2 mins to terminate the current tasks
+        try {
+            remoteStorageReaderThreadPool.awaitTermination(2, TimeUnit.MINUTES);

Review Comment:
It does not need to complete within 5 mins. `lifecycleManager.controlledShutdownFuture` is about processing the controlled shutdown event with the controller for this broker. It waits up to 5 mins before proceeding with the rest of the shutdown sequence, and it is not affected by the code introduced here. The logging subsystem handles unclean shutdown of log segments, and that has already finished by the time RemoteLogManager is closed, so it is not affected by this timeout either. Still, we can use a shorter duration here, such as 10 secs, and revisit introducing a config if one is really needed for closing the remote log subsystem.

--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org
For queries about this service, please contact Infrastructure at: us...@infra.apache.org
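For reference, the shorter bounded wait suggested in the comment can be sketched with plain `java.util.concurrent`: interrupt in-flight reads with `shutdownNow()`, then wait a short, fixed time. This is a minimal sketch under stated assumptions; the class `ReaderPoolShutdown`, the helper `closeReaderPool`, and the 10-second value (taken from the comment's suggestion) are illustrative, not the code that was merged.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

public class ReaderPoolShutdown {

    /**
     * Hypothetical helper: interrupts in-flight reads and waits at most the given
     * timeout for the pool to terminate. Returns true if it terminated in time.
     */
    static boolean closeReaderPool(ExecutorService pool, long timeout, TimeUnit unit) {
        // Interrupt running tasks and discard queued ones, as shutdownNow() does in the diff.
        pool.shutdownNow();
        try {
            // A short, bounded wait instead of the 2 minutes in the original hunk.
            return pool.awaitTermination(timeout, unit);
        } catch (InterruptedException e) {
            // Restore the interrupt flag so the caller's shutdown sequence can observe it.
            Thread.currentThread().interrupt();
            return false;
        }
    }

    public static void main(String[] args) {
        ExecutorService pool = Executors.newFixedThreadPool(2);
        // Simulated remote read that exits promptly when interrupted.
        pool.submit(() -> {
            try {
                Thread.sleep(60_000);
            } catch (InterruptedException ignored) {
                // Interrupted by shutdownNow(); fall through and finish.
            }
        });
        System.out.println("terminated=" + closeReaderPool(pool, 10, TimeUnit.SECONDS));
    }
}
```

If the wait times out, the helper returns `false` and shutdown proceeds; reads that do not respond to interruption may still be running at that point.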
satishd commented on code in PR #13535:
URL: https://github.com/apache/kafka/pull/13535#discussion_r1194768781

## core/src/main/java/kafka/log/remote/RemoteLogManager.java: ##
@@ -600,25 +623,204 @@ public String toString() {
         }
     }
-    long findHighestRemoteOffset(TopicIdPartition topicIdPartition) throws RemoteStorageException {
-        Optional offset = Optional.empty();
-        Optional maybeLog = fetchLog.apply(topicIdPartition.topicPartition());
-        if (maybeLog.isPresent()) {
-            UnifiedLog log = maybeLog.get();
-            Option maybeLeaderEpochFileCache = log.leaderEpochCache();
-            if (maybeLeaderEpochFileCache.isDefined()) {
-                LeaderEpochFileCache cache = maybeLeaderEpochFileCache.get();
-                OptionalInt epoch = cache.latestEpoch();
-                while (!offset.isPresent() && epoch.isPresent()) {
-                    offset = remoteLogMetadataManager.highestOffsetForEpoch(topicIdPartition, epoch.getAsInt());
-                    epoch = cache.previousEpoch(epoch.getAsInt());
+    public FetchDataInfo read(RemoteStorageFetchInfo remoteStorageFetchInfo) throws RemoteStorageException, IOException {
+        int fetchMaxBytes = remoteStorageFetchInfo.fetchMaxBytes;
+        TopicPartition tp = remoteStorageFetchInfo.topicPartition;
+        FetchRequest.PartitionData fetchInfo = remoteStorageFetchInfo.fetchInfo;
+
+        boolean includeAbortedTxns = remoteStorageFetchInfo.fetchIsolation == FetchIsolation.TXN_COMMITTED;
+
+        long offset = fetchInfo.fetchOffset;
+        int maxBytes = Math.min(fetchMaxBytes, fetchInfo.maxBytes);
+
+        Optional logOptional = fetchLog.apply(tp);
+        OptionalInt epoch = OptionalInt.empty();
+
+        if (logOptional.isPresent()) {
+            Option leaderEpochCache = logOptional.get().leaderEpochCache();
+            if (leaderEpochCache.isDefined()) {
+                epoch = leaderEpochCache.get().epochForOffset(offset);
+            }
+        }
+
+        Optional rlsMetadataOptional = epoch.isPresent()
+                ? fetchRemoteLogSegmentMetadata(tp, epoch.getAsInt(), offset)
+                : Optional.empty();
+
+        if (!rlsMetadataOptional.isPresent()) {
+            String epochStr = (epoch.isPresent()) ? Integer.toString(epoch.getAsInt()) : "NOT AVAILABLE";
+            throw new OffsetOutOfRangeException("Received request for offset " + offset + " for leader epoch "
+                    + epochStr + " and partition " + tp + " which does not exist in remote tier.");
+        }
+
+        RemoteLogSegmentMetadata remoteLogSegmentMetadata = rlsMetadataOptional.get();
+        int startPos = lookupPositionForOffset(remoteLogSegmentMetadata, offset);
+        InputStream remoteSegInputStream = null;
+        try {
+            // Search forward for the position of the last offset that is greater than or equal to the target offset
+            remoteSegInputStream = remoteLogStorageManager.fetchLogSegment(remoteLogSegmentMetadata, startPos);
+            RemoteLogInputStream remoteLogInputStream = new RemoteLogInputStream(remoteSegInputStream);
+
+            RecordBatch firstBatch = findFirstBatch(remoteLogInputStream, offset);
+
+            if (firstBatch == null)
+                return new FetchDataInfo(new LogOffsetMetadata(offset), MemoryRecords.EMPTY, false,
+                        includeAbortedTxns ? Optional.of(Collections.emptyList()) : Optional.empty());
+
+            int firstBatchSize = firstBatch.sizeInBytes();
+            // An empty record is sent instead of an incomplete batch when
+            // - there is no minimum-one-message constraint and
+            // - the first batch size is more than maximum bytes that can be sent and
+            // - for FetchRequest version 3 or above.
+            if (!remoteStorageFetchInfo.minOneMessage &&
+                    !remoteStorageFetchInfo.hardMaxBytesLimit &&
+                    firstBatchSize > maxBytes) {
+                return new FetchDataInfo(new LogOffsetMetadata(offset), MemoryRecords.EMPTY);
+            }
+
+            int updatedFetchSize =
+                    remoteStorageFetchInfo.minOneMessage && firstBatchSize > maxBytes ? firstBatchSize : maxBytes;
+
+            ByteBuffer buffer = ByteBuffer.allocate(updatedFetchSize);
+            int remainingBytes = updatedFetchSize;
+
+            firstBatch.writeTo(buffer);
+            remainingBytes -= firstBatchSize;
+
+            if (remainingBytes > 0) {
+                // read the input stream until min of (EOF stream or buffer's remaining capacity).
+                Utils.readFully(remoteSegInputStream, buffer);
+            }
+            buffer.flip();
+
+            FetchDataInfo fetchDataInfo = new FetchDataInfo(
+                    new LogOffsetMetadata(offset, remoteLogSegmentMetadata.startOffset(), startPos),
+                    MemoryRecords.readableRecords(buffer));
+
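The buffer-size decision in `read()` above can be restated as a small pure function, which makes its three outcomes easy to check. A minimal sketch: `FetchSizing` and `bufferSizeFor` are hypothetical names, and returning `0` stands in for the empty `FetchDataInfo` the diff returns. It mirrors only the branch order shown in the hunk, where `!hardMaxBytesLimit` corresponds to "FetchRequest version 3 or above".

```java
public class FetchSizing {

    /**
     * Hypothetical restatement of the sizing rule in read(): returns the buffer
     * size to allocate, or 0 when an empty response should be sent instead.
     */
    static int bufferSizeFor(int firstBatchSize, int maxBytes,
                             boolean minOneMessage, boolean hardMaxBytesLimit) {
        // No minOneMessage and no hard limit (FetchRequest v3+): an oversized first
        // batch yields an empty response rather than an incomplete batch.
        if (!minOneMessage && !hardMaxBytesLimit && firstBatchSize > maxBytes) {
            return 0;
        }
        // With minOneMessage, grow the buffer so the whole first batch fits.
        return minOneMessage && firstBatchSize > maxBytes ? firstBatchSize : maxBytes;
    }

    public static void main(String[] args) {
        System.out.println(bufferSizeFor(2048, 1024, true, false));  // prints 2048
        System.out.println(bufferSizeFor(2048, 1024, false, false)); // prints 0
        System.out.println(bufferSizeFor(512, 1024, false, false));  // prints 1024
    }
}
```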
satishd commented on code in PR #13535:
URL: https://github.com/apache/kafka/pull/13535#discussion_r1192195822

## core/src/test/scala/integration/kafka/server/DelayedRemoteFetchTest.scala: ##
@@ -0,0 +1,179 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package kafka.server
+
+import kafka.cluster.Partition
+import org.apache.kafka.common.errors.NotLeaderOrFollowerException
+import org.apache.kafka.common.protocol.{ApiKeys, Errors}
+import org.apache.kafka.common.record.MemoryRecords
+import org.apache.kafka.common.requests.FetchRequest
+import org.apache.kafka.common.{TopicIdPartition, Uuid}
+import org.apache.kafka.storage.internals.log._
+import org.junit.jupiter.api.Assertions._
+import org.junit.jupiter.api.Test
+import org.mockito.Mockito.{mock, when}
+
+import java.util.Optional
+import java.util.concurrent.CompletableFuture
+
+import scala.collection._

Review Comment:
We have a few more tests that need to be pulled out of the existing 2.8.x repo along with other changes and refactored from EasyMock to Mockito. We can convert this test to Java in that PR.
satishd commented on code in PR #13535:
URL: https://github.com/apache/kafka/pull/13535#discussion_r1192210616

## core/src/main/java/kafka/log/remote/RemoteLogManager.java: ##
@@ -600,25 +623,210 @@ public String toString() {
         }
     }
-    long findHighestRemoteOffset(TopicIdPartition topicIdPartition) throws RemoteStorageException {
-        Optional offset = Optional.empty();
-        Optional maybeLog = fetchLog.apply(topicIdPartition.topicPartition());
-        if (maybeLog.isPresent()) {
-            UnifiedLog log = maybeLog.get();
-            Option maybeLeaderEpochFileCache = log.leaderEpochCache();
-            if (maybeLeaderEpochFileCache.isDefined()) {
-                LeaderEpochFileCache cache = maybeLeaderEpochFileCache.get();
-                OptionalInt epoch = cache.latestEpoch();
-                while (!offset.isPresent() && epoch.isPresent()) {
-                    offset = remoteLogMetadataManager.highestOffsetForEpoch(topicIdPartition, epoch.getAsInt());
-                    epoch = cache.previousEpoch(epoch.getAsInt());
+    public FetchDataInfo read(RemoteStorageFetchInfo remoteStorageFetchInfo) throws RemoteStorageException, IOException {
+        int fetchMaxBytes = remoteStorageFetchInfo.fetchMaxBytes;
+        TopicPartition tp = remoteStorageFetchInfo.topicPartition;
+        FetchRequest.PartitionData fetchInfo = remoteStorageFetchInfo.fetchInfo;
+
+        boolean includeAbortedTxns = remoteStorageFetchInfo.fetchIsolation == FetchIsolation.TXN_COMMITTED;
+
+        long offset = fetchInfo.fetchOffset;
+        int maxBytes = Math.min(fetchMaxBytes, fetchInfo.maxBytes);
+
+        Optional logOptional = fetchLog.apply(tp);
+        OptionalInt epoch = OptionalInt.empty();
+
+        if (logOptional.isPresent()) {
+            Option leaderEpochCache = logOptional.get().leaderEpochCache();
+            if (leaderEpochCache.isDefined()) {
+                epoch = leaderEpochCache.get().epochForOffset(offset);
+            }
+        }
+
+        Optional rlsMetadataOptional = epoch.isPresent()
+                ? fetchRemoteLogSegmentMetadata(tp, epoch.getAsInt(), offset)
+                : Optional.empty();
+
+        if (!rlsMetadataOptional.isPresent()) {
+            String epochStr = (epoch.isPresent()) ? Integer.toString(epoch.getAsInt()) : "NOT AVAILABLE";
+            throw new OffsetOutOfRangeException("Received request for offset " + offset + " for leader epoch "
+                    + epochStr + " and partition " + tp + " which does not exist in remote tier.");
+        }
+
+        RemoteLogSegmentMetadata remoteLogSegmentMetadata = rlsMetadataOptional.get();
+        int startPos = lookupPositionForOffset(remoteLogSegmentMetadata, offset);
+        InputStream remoteSegInputStream = null;
+        try {
+            // Search forward for the position of the last offset that is greater than or equal to the target offset
+            remoteSegInputStream = remoteLogStorageManager.fetchLogSegment(remoteLogSegmentMetadata, startPos);
+            RemoteLogInputStream remoteLogInputStream = new RemoteLogInputStream(remoteSegInputStream);
+
+            RecordBatch firstBatch = findFirstBatch(remoteLogInputStream, offset);
+
+            if (firstBatch == null)
+                return new FetchDataInfo(new LogOffsetMetadata(offset), MemoryRecords.EMPTY, false,
+                        includeAbortedTxns ? Optional.of(Collections.emptyList()) : Optional.empty());
+
+            int firstBatchSize = firstBatch.sizeInBytes();
+            // An empty record is sent instead of an incomplete batch when
+            // - there is no minimum-one-message constraint and
+            // - the first batch size is more than maximum bytes that can be sent.
+            // - for FetchRequest version 3 or above and
+            if (!remoteStorageFetchInfo.minOneMessage &&
+                    !remoteStorageFetchInfo.hardMaxBytesLimit &&
+                    firstBatchSize > maxBytes) {
+                return new FetchDataInfo(new LogOffsetMetadata(offset), MemoryRecords.EMPTY);
+            }
+
+            int updatedFetchSize =
+                    remoteStorageFetchInfo.minOneMessage && firstBatchSize > maxBytes ? firstBatchSize : maxBytes;
+
+            ByteBuffer buffer = ByteBuffer.allocate(updatedFetchSize);
+            int remainingBytes = updatedFetchSize;
+
+            firstBatch.writeTo(buffer);
+            remainingBytes -= firstBatchSize;
+
+            if (remainingBytes > 0) {
+                // read the input stream until min of (EOF stream or buffer's remaining capacity).
+                Utils.readFully(remoteSegInputStream, buffer);
+            }
+            buffer.flip();
+
+            FetchDataInfo fetchDataInfo = new FetchDataInfo(
+                    new LogOffsetMetadata(offset, remoteLogSegmentMetadata.startOffset(), startPos),
+                    MemoryRecords.readableRecords(buffer));
+
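The `Utils.readFully(remoteSegInputStream, buffer)` step is described in the diff as reading the input stream until EOF or until the buffer has no remaining capacity, whichever comes first. The sketch below illustrates that contract with a hand-rolled loop; `ReadFullySketch` is a hypothetical stand-in written for illustration, not Kafka's `Utils` implementation.

```java
import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.nio.ByteBuffer;

public class ReadFullySketch {

    /**
     * Illustrative stand-in: copies bytes from the stream into the buffer until the
     * buffer is full or the stream reaches EOF. Returns the number of bytes copied.
     */
    static int readFully(InputStream in, ByteBuffer dst) throws IOException {
        int total = 0;
        byte[] chunk = new byte[4096];
        while (dst.hasRemaining()) {
            int want = Math.min(chunk.length, dst.remaining());
            int n = in.read(chunk, 0, want);
            if (n < 0) {
                break; // EOF before the buffer filled up
            }
            dst.put(chunk, 0, n);
            total += n;
        }
        return total;
    }

    public static void main(String[] args) throws IOException {
        ByteBuffer buffer = ByteBuffer.allocate(8);
        buffer.put(new byte[] {1, 2, 3}); // stands in for firstBatch.writeTo(buffer)
        int copied = readFully(new ByteArrayInputStream(new byte[] {4, 5, 6, 7, 8, 9, 10}), buffer);
        buffer.flip(); // switch to read mode, as the diff does before building MemoryRecords
        System.out.println(copied + " " + buffer.remaining()); // prints "5 8"
    }
}
```

Because the loop stops at the buffer's capacity, the tail of the stream may contain a partial batch; the diff leaves that to `MemoryRecords.readableRecords(buffer)` and its consumers.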
satishd commented on code in PR #13535:
URL: https://github.com/apache/kafka/pull/13535#discussion_r1192196398

## core/src/test/scala/integration/kafka/server/DelayedRemoteFetchTest.scala: ##
@@ -0,0 +1,179 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package kafka.server
+
+import kafka.cluster.Partition
+import org.apache.kafka.common.errors.NotLeaderOrFollowerException
+import org.apache.kafka.common.protocol.{ApiKeys, Errors}
+import org.apache.kafka.common.record.MemoryRecords
+import org.apache.kafka.common.requests.FetchRequest
+import org.apache.kafka.common.{TopicIdPartition, Uuid}
+import org.apache.kafka.storage.internals.log._
+import org.junit.jupiter.api.Assertions._
+import org.junit.jupiter.api.Test
+import org.mockito.Mockito.{mock, when}
+
+import java.util.Optional
+import java.util.concurrent.CompletableFuture
+
+import scala.collection._
+
+class DelayedRemoteFetchTest {
+  private val maxBytes = 1024
+  private val replicaManager: ReplicaManager = mock(classOf[ReplicaManager])
+  private val topicIdPartition = new TopicIdPartition(Uuid.randomUuid(), 0, "topic")
+  private val fetchOffset = 500L
+  private val logStartOffset = 0L
+  private val currentLeaderEpoch = Optional.of[Integer](10)
+  private val replicaId = 1
+
+  private val fetchStatus = FetchPartitionStatus(
+    startOffsetMetadata = new LogOffsetMetadata(fetchOffset),
+    fetchInfo = new FetchRequest.PartitionData(Uuid.ZERO_UUID, fetchOffset, logStartOffset, maxBytes, currentLeaderEpoch))
+  private val fetchParams = buildFollowerFetchParams(replicaId, maxWaitMs = 500)
+
+  @Test
+  def testFetchWithFencedEpoch(): Unit = {

Review Comment:
This test is not meant to exercise a fenced epoch. That case is covered in another test.
satishd commented on code in PR #13535:
URL: https://github.com/apache/kafka/pull/13535#discussion_r1192183587

## core/src/test/java/kafka/log/remote/RemoteLogReaderTest.java: ##
@@ -0,0 +1,82 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package kafka.log.remote;
+
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.errors.OffsetOutOfRangeException;
+import org.apache.kafka.common.record.Records;
+import org.apache.kafka.server.log.remote.storage.RemoteStorageException;
+import org.apache.kafka.storage.internals.log.FetchDataInfo;
+import org.apache.kafka.storage.internals.log.LogOffsetMetadata;
+import org.apache.kafka.storage.internals.log.RemoteLogReadResult;
+import org.apache.kafka.storage.internals.log.RemoteStorageFetchInfo;
+import org.junit.jupiter.api.Test;
+import org.mockito.ArgumentCaptor;
+
+import java.io.IOException;
+import java.util.function.Consumer;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+public class RemoteLogReaderTest {

Review Comment:
Sure, you can add more unit tests. We have a few in Scala that will be converted to Java and raised in a follow-up PR.
satishd commented on code in PR #13535:
URL: https://github.com/apache/kafka/pull/13535#discussion_r1189455928

## core/src/main/java/kafka/log/remote/RemoteLogManager.java: ##
@@ -600,25 +622,208 @@ public String toString() {
         }
     }
-    long findHighestRemoteOffset(TopicIdPartition topicIdPartition) throws RemoteStorageException {
-        Optional offset = Optional.empty();
-        Optional maybeLog = fetchLog.apply(topicIdPartition.topicPartition());
-        if (maybeLog.isPresent()) {
-            UnifiedLog log = maybeLog.get();
-            Option maybeLeaderEpochFileCache = log.leaderEpochCache();
-            if (maybeLeaderEpochFileCache.isDefined()) {
-                LeaderEpochFileCache cache = maybeLeaderEpochFileCache.get();
-                OptionalInt epoch = cache.latestEpoch();
-                while (!offset.isPresent() && epoch.isPresent()) {
-                    offset = remoteLogMetadataManager.highestOffsetForEpoch(topicIdPartition, epoch.getAsInt());
-                    epoch = cache.previousEpoch(epoch.getAsInt());
+    public FetchDataInfo read(RemoteStorageFetchInfo remoteStorageFetchInfo) throws RemoteStorageException, IOException {
+        int fetchMaxBytes = remoteStorageFetchInfo.fetchMaxBytes;
+        TopicPartition tp = remoteStorageFetchInfo.topicPartition;
+        FetchRequest.PartitionData fetchInfo = remoteStorageFetchInfo.fetchInfo;
+
+        boolean includeAbortedTxns = remoteStorageFetchInfo.fetchIsolation == FetchIsolation.TXN_COMMITTED;
+
+        long offset = fetchInfo.fetchOffset;
+        int maxBytes = Math.min(fetchMaxBytes, fetchInfo.maxBytes);
+
+        Optional logOptional = fetchLog.apply(tp);
+        OptionalInt epoch = OptionalInt.empty();
+
+        if (logOptional.isPresent()) {
+            Option leaderEpochCache = logOptional.get().leaderEpochCache();
+            if (leaderEpochCache.isDefined()) {
+                epoch = leaderEpochCache.get().epochForOffset(offset);
+            }
+        }
+
+        Optional rlsMetadataOptional = epoch.isPresent()
+                ? fetchRemoteLogSegmentMetadata(tp, epoch.getAsInt(), offset)
+                : Optional.empty();
+
+        if (!rlsMetadataOptional.isPresent()) {
+            String epochStr = (epoch.isPresent()) ? Integer.toString(epoch.getAsInt()) : "NOT AVAILABLE";
+            throw new OffsetOutOfRangeException("Received request for offset " + offset + " for leader epoch "
+                    + epochStr + " and partition " + tp + " which does not exist in remote tier.");
+        }
+
+        RemoteLogSegmentMetadata remoteLogSegmentMetadata = rlsMetadataOptional.get();
+        int startPos = lookupPositionForOffset(remoteLogSegmentMetadata, offset);
+        InputStream remoteSegInputStream = null;
+        try {
+            // Search forward for the position of the last offset that is greater than or equal to the target offset
+            remoteSegInputStream = remoteLogStorageManager.fetchLogSegment(remoteLogSegmentMetadata, startPos);
+            RemoteLogInputStream remoteLogInputStream = new RemoteLogInputStream(remoteSegInputStream);
+
+            RecordBatch firstBatch = findFirstBatch(remoteLogInputStream, offset);
+
+            if (firstBatch == null)
+                return new FetchDataInfo(new LogOffsetMetadata(offset), MemoryRecords.EMPTY, false,
+                        includeAbortedTxns ? Optional.of(Collections.emptyList()) : Optional.empty());
+
+            // An empty record is sent instead of an incomplete batch when there is no minimum-one-message constraint
+            // and for FetchRequest version 3 and above and the first batch size is more than maximum bytes that can be sent.
+            int firstBatchSize = firstBatch.sizeInBytes();
+            if (!remoteStorageFetchInfo.minOneMessage &&
+                    !remoteStorageFetchInfo.hardMaxBytesLimit &&
+                    firstBatchSize > maxBytes) {
+                return new FetchDataInfo(new LogOffsetMetadata(offset), MemoryRecords.EMPTY);
+            }
+
+            int updatedFetchSize =
+                    remoteStorageFetchInfo.minOneMessage && firstBatchSize > maxBytes ? firstBatchSize : maxBytes;
+
+            ByteBuffer buffer = ByteBuffer.allocate(updatedFetchSize);
+            int remainingBytes = updatedFetchSize;
+
+            firstBatch.writeTo(buffer);
+            remainingBytes -= firstBatchSize;
+
+            if (remainingBytes > 0) {
+                // read the input stream until min of (EOF stream or buffer's remaining capacity).
+                Utils.readFully(remoteSegInputStream, buffer);
+            }
+            buffer.flip();
+
+            FetchDataInfo fetchDataInfo = new FetchDataInfo(
+                    new LogOffsetMetadata(offset, remoteLogSegmentMetadata.startOffset(), startPos),
+                    MemoryRecords.readableRecords(buffer));
+            if (includeAbortedTxns) {
+
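Before any segment is fetched, `read()` maps the fetch offset to a leader epoch with `epochForOffset`, and when no epoch is found it reports `NOT AVAILABLE` in the `OffsetOutOfRangeException` message. The sketch below models that lookup as a floor search over epoch start offsets. This is a simplifying assumption for illustration only; `EpochLookupSketch` is hypothetical and is not `LeaderEpochFileCache`.

```java
import java.util.Map;
import java.util.OptionalInt;
import java.util.TreeMap;

public class EpochLookupSketch {

    // Hypothetical model of a leader epoch cache: epoch start offset -> leader epoch.
    private final TreeMap<Long, Integer> startOffsetToEpoch = new TreeMap<>();

    void assign(int epoch, long startOffset) {
        startOffsetToEpoch.put(startOffset, epoch);
    }

    /** The epoch whose start offset is the greatest one <= offset, if any. */
    OptionalInt epochForOffset(long offset) {
        Map.Entry<Long, Integer> entry = startOffsetToEpoch.floorEntry(offset);
        return entry == null ? OptionalInt.empty() : OptionalInt.of(entry.getValue());
    }

    /** Mirrors how read() renders a missing epoch in its exception message. */
    static String epochString(OptionalInt epoch) {
        return epoch.isPresent() ? Integer.toString(epoch.getAsInt()) : "NOT AVAILABLE";
    }

    public static void main(String[] args) {
        EpochLookupSketch cache = new EpochLookupSketch();
        cache.assign(0, 0L);
        cache.assign(3, 100L);
        cache.assign(5, 250L);
        System.out.println(epochString(cache.epochForOffset(120L))); // prints 3
        System.out.println(epochString(cache.epochForOffset(250L))); // prints 5
    }
}
```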
satishd commented on code in PR #13535:
URL: https://github.com/apache/kafka/pull/13535#discussion_r1188566365

## core/src/main/scala/kafka/server/ReplicaManager.scala: ##
@@ -1288,17 +1373,46 @@ class ReplicaManager(val config: KafkaConfig,
                  _: FencedLeaderEpochException |
                  _: ReplicaNotAvailableException |
                  _: KafkaStorageException |
-                 _: OffsetOutOfRangeException |
                  _: InconsistentTopicIdException) =>
-          LogReadResult(info = new FetchDataInfo(LogOffsetMetadata.UNKNOWN_OFFSET_METADATA, MemoryRecords.EMPTY),
-            divergingEpoch = None,
-            highWatermark = UnifiedLog.UnknownOffset,
-            leaderLogStartOffset = UnifiedLog.UnknownOffset,
-            leaderLogEndOffset = UnifiedLog.UnknownOffset,
-            followerLogStartOffset = UnifiedLog.UnknownOffset,
-            fetchTimeMs = -1L,
-            lastStableOffset = None,
-            exception = Some(e))
+          createLogReadResult(e)
+        case e: OffsetOutOfRangeException =>
+          // In case of offset out of range errors, check for remote log manager for non-compacted topics
+          // to fetch from remote storage. `log` instance should not be null here as that would have been caught earlier
+          // with NotLeaderForPartitionException or ReplicaNotAvailableException.
+          // If it is from a follower then send the offset metadata only as the data is already available in remote
+          // storage.
+          if (remoteLogManager.isDefined && log != null && log.remoteLogEnabled() &&
+            // Check that the fetch offset is within the offset range within the remote storage layer.
+            log.logStartOffset <= offset && offset < log.localLogStartOffset()) {
+            // For follower fetch requests, throw an error saying that this offset is moved to tiered storage.
+            val highWatermark = log.highWatermark
+            val leaderLogStartOffset = log.logStartOffset

Review Comment:
That is fine. The offset can be updated at any time, so we send whatever value is available when it is read.
satishd commented on code in PR #13535:
URL: https://github.com/apache/kafka/pull/13535#discussion_r1188479417

## core/src/main/scala/kafka/server/ReplicaManager.scala: ##
@@ -1288,17 +1373,46 @@ class ReplicaManager(val config: KafkaConfig,
                  _: FencedLeaderEpochException |
                  _: ReplicaNotAvailableException |
                  _: KafkaStorageException |
-                 _: OffsetOutOfRangeException |
                  _: InconsistentTopicIdException) =>
-          LogReadResult(info = new FetchDataInfo(LogOffsetMetadata.UNKNOWN_OFFSET_METADATA, MemoryRecords.EMPTY),
-            divergingEpoch = None,
-            highWatermark = UnifiedLog.UnknownOffset,
-            leaderLogStartOffset = UnifiedLog.UnknownOffset,
-            leaderLogEndOffset = UnifiedLog.UnknownOffset,
-            followerLogStartOffset = UnifiedLog.UnknownOffset,
-            fetchTimeMs = -1L,
-            lastStableOffset = None,
-            exception = Some(e))
+          createLogReadResult(e)
+        case e: OffsetOutOfRangeException =>
+          // In case of offset out of range errors, check for remote log manager for non-compacted topics
+          // to fetch from remote storage. `log` instance should not be null here as that would have been caught earlier
+          // with NotLeaderForPartitionException or ReplicaNotAvailableException.
+          // If it is from a follower then send the offset metadata only as the data is already available in remote
+          // storage.
+          if (remoteLogManager.isDefined && log != null && log.remoteLogEnabled() &&
+            // Check that the fetch offset is within the offset range within the remote storage layer.
+            log.logStartOffset <= offset && offset < log.localLogStartOffset()) {

Review Comment:
That should work fine, because an offset out-of-range error will eventually be thrown if the target offset does not exist.
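The guard under discussion only decides whether an out-of-range fetch should be served from remote storage: remote log must be enabled and the offset must fall in `[logStartOffset, localLogStartOffset)`. A minimal sketch of that predicate, with the hypothetical name `servedFromRemote` and plain `long` parameters standing in for the `UnifiedLog` accessors:

```java
public class RemoteTierRange {

    /**
     * Hypothetical restatement of the ReplicaManager guard: serve from remote storage
     * only if remote log is enabled and logStartOffset <= offset < localLogStartOffset.
     */
    static boolean servedFromRemote(boolean remoteLogEnabled, long logStartOffset,
                                    long localLogStartOffset, long offset) {
        return remoteLogEnabled && logStartOffset <= offset && offset < localLogStartOffset;
    }

    public static void main(String[] args) {
        // Offsets [100, 400) exist only in remote storage; 400 onwards is in the local log.
        System.out.println(servedFromRemote(true, 100, 400, 250)); // prints true
        System.out.println(servedFromRemote(true, 100, 400, 400)); // prints false (local log)
        System.out.println(servedFromRemote(true, 100, 400, 50));  // prints false (below log start)
    }
}
```

Passing the guard does not guarantee that a remote segment covers the offset; in that case `read()` fails with `OffsetOutOfRangeException`, which is the behavior the comment relies on.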
satishd commented on code in PR #13535:
URL: https://github.com/apache/kafka/pull/13535#discussion_r1188470147

## core/src/test/java/kafka/log/remote/RemoteLogReaderTest.java: ##
@@ -0,0 +1,83 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package kafka.log.remote;
+
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.errors.OffsetOutOfRangeException;
+import org.apache.kafka.common.record.Records;
+import org.apache.kafka.server.log.remote.storage.RemoteStorageException;
+import org.apache.kafka.storage.internals.log.FetchDataInfo;
+import org.apache.kafka.storage.internals.log.LogOffsetMetadata;
+import org.apache.kafka.storage.internals.log.RemoteLogReadResult;
+import org.apache.kafka.storage.internals.log.RemoteStorageFetchInfo;
+import org.junit.jupiter.api.Test;
+import org.mockito.ArgumentCaptor;
+
+import java.io.IOException;
+import java.util.function.Consumer;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+public class RemoteLogReaderTest {
+    RemoteLogManager mockRLM = mock(RemoteLogManager.class);
+    LogOffsetMetadata logOffsetMetadata = new LogOffsetMetadata(100);
+    Records records = mock(Records.class);
+
+
+    @Test
+    public void testRemoteLogReaderWithoutError() throws RemoteStorageException, IOException {
+        FetchDataInfo fetchDataInfo = new FetchDataInfo(logOffsetMetadata, records);
+        when(mockRLM.read(any(RemoteStorageFetchInfo.class))).thenReturn(fetchDataInfo);
+
+        Consumer callback = mock(Consumer.class);
+        RemoteStorageFetchInfo remoteStorageFetchInfo = new RemoteStorageFetchInfo(0, false, new TopicPartition("test", 0), null, null, false);
+        RemoteLogReader remoteLogReader = new RemoteLogReader(remoteStorageFetchInfo, mockRLM, callback);
+        remoteLogReader.call();
+
+        // verify the callback did get invoked with the expected remoteLogReadResult
+        ArgumentCaptor remoteLogReadResultArg = ArgumentCaptor.forClass(RemoteLogReadResult.class);
+        verify(callback, times(1)).accept(remoteLogReadResultArg.capture());
+        RemoteLogReadResult actualRemoteLogReadResult = remoteLogReadResultArg.getValue();
+        assertFalse(actualRemoteLogReadResult.error.isPresent());
+        assertTrue(actualRemoteLogReadResult.fetchDataInfo.isPresent());
+        assertEquals(fetchDataInfo, actualRemoteLogReadResult.fetchDataInfo.get());
+    }
+
+    @Test
+    public void testRemoteLogReaderWithError() throws RemoteStorageException, IOException {

Review Comment:
We will add more tests in follow-up PRs.
satishd commented on code in PR #13535: URL: https://github.com/apache/kafka/pull/13535#discussion_r1187999169

## core/src/main/scala/kafka/server/ReplicaManager.scala: ##
@@ -1083,48 +1095,100 @@ class ReplicaManager(val config: KafkaConfig,
           fetchPartitionStatus += (topicIdPartition -> FetchPartitionStatus(logOffsetMetadata, partitionData))
         })
       }
-      val delayedFetch = new DelayedFetch(
-        params = params,
-        fetchPartitionStatus = fetchPartitionStatus,
-        replicaManager = this,
-        quota = quota,
-        responseCallback = responseCallback
-      )
-
-      // create a list of (topic, partition) pairs to use as keys for this delayed fetch operation
-      val delayedFetchKeys = fetchPartitionStatus.map { case (tp, _) => TopicPartitionOperationKey(tp) }
-
-      // try to complete the request immediately, otherwise put it into the purgatory;
-      // this is because while the delayed fetch operation is being created, new requests
-      // may arrive and hence make this operation completable.
-      delayedFetchPurgatory.tryCompleteElseWatch(delayedFetch, delayedFetchKeys)
+
+      if (remoteFetchInfo.isPresent) {

Review Comment:
Sure, that check was missed while pulling the changes. Good catch. Updated it with the latest commit.
satishd commented on code in PR #13535: URL: https://github.com/apache/kafka/pull/13535#discussion_r1187999169

## core/src/main/scala/kafka/server/ReplicaManager.scala: ##
@@ -1083,48 +1095,100 @@ class ReplicaManager(val config: KafkaConfig,
           fetchPartitionStatus += (topicIdPartition -> FetchPartitionStatus(logOffsetMetadata, partitionData))
         })
       }
-      val delayedFetch = new DelayedFetch(
-        params = params,
-        fetchPartitionStatus = fetchPartitionStatus,
-        replicaManager = this,
-        quota = quota,
-        responseCallback = responseCallback
-      )
-
-      // create a list of (topic, partition) pairs to use as keys for this delayed fetch operation
-      val delayedFetchKeys = fetchPartitionStatus.map { case (tp, _) => TopicPartitionOperationKey(tp) }
-
-      // try to complete the request immediately, otherwise put it into the purgatory;
-      // this is because while the delayed fetch operation is being created, new requests
-      // may arrive and hence make this operation completable.
-      delayedFetchPurgatory.tryCompleteElseWatch(delayedFetch, delayedFetchKeys)
+
+      if (remoteFetchInfo.isPresent) {

Review Comment:
Sure, that check was missed. Good catch. Updated it with the latest commit.
satishd commented on code in PR #13535: URL: https://github.com/apache/kafka/pull/13535#discussion_r1187999169

## core/src/main/scala/kafka/server/ReplicaManager.scala: ##
@@ -1083,48 +1095,100 @@ class ReplicaManager(val config: KafkaConfig,
           fetchPartitionStatus += (topicIdPartition -> FetchPartitionStatus(logOffsetMetadata, partitionData))
         })
       }
-      val delayedFetch = new DelayedFetch(
-        params = params,
-        fetchPartitionStatus = fetchPartitionStatus,
-        replicaManager = this,
-        quota = quota,
-        responseCallback = responseCallback
-      )
-
-      // create a list of (topic, partition) pairs to use as keys for this delayed fetch operation
-      val delayedFetchKeys = fetchPartitionStatus.map { case (tp, _) => TopicPartitionOperationKey(tp) }
-
-      // try to complete the request immediately, otherwise put it into the purgatory;
-      // this is because while the delayed fetch operation is being created, new requests
-      // may arrive and hence make this operation completable.
-      delayedFetchPurgatory.tryCompleteElseWatch(delayedFetch, delayedFetchKeys)
+
+      if (remoteFetchInfo.isPresent) {

Review Comment:
Sure, updated it with the latest commit.
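The removed comment in this hunk describes the delayed-fetch pattern. The request is completed immediately if possible. Otherwise it is watched under its partition keys and checked again, because new data may arrive while the operation is being registered. Below is a hypothetical, heavily simplified model of that pattern. The `MiniPurgatory` and `Operation` names are made up, and Kafka's `DelayedOperationPurgatory` additionally handles timeouts, expiration and locking. `checkAndComplete` stands in for the call made when something relevant to a key changes.

```java
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Queue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.BooleanSupplier;

// Hypothetical, simplified "try to complete, otherwise watch" model (no timeouts or expiration).
final class MiniPurgatory {

    static final class Operation {
        private final BooleanSupplier canComplete; // e.g. "enough bytes are readable now"
        private final Runnable onComplete;         // e.g. "send the fetch response"
        private final AtomicBoolean completed = new AtomicBoolean(false);

        Operation(BooleanSupplier canComplete, Runnable onComplete) {
            this.canComplete = canComplete;
            this.onComplete = onComplete;
        }

        // Returns true only for the call that actually completed the operation (at most once).
        boolean tryComplete() {
            if (!completed.get() && canComplete.getAsBoolean() && completed.compareAndSet(false, true)) {
                onComplete.run();
                return true;
            }
            return false;
        }

        boolean isCompleted() {
            return completed.get();
        }
    }

    private final Map<String, Queue<Operation>> watchers = new ConcurrentHashMap<>();

    // Returns true if the operation is complete by the time this method returns.
    boolean tryCompleteElseWatch(Operation op, List<String> keys) {
        if (op.tryComplete()) {
            return true;
        }
        for (String key : keys) {
            watchers.computeIfAbsent(key, k -> new ConcurrentLinkedQueue<>()).add(op);
        }
        // Check again: the state may have changed while the watchers were being registered.
        op.tryComplete();
        return op.isCompleted();
    }

    // Returns how many watched operations this call completed.
    int checkAndComplete(String key) {
        Queue<Operation> ops = watchers.get(key);
        if (ops == null) {
            return 0;
        }
        int completedNow = 0;
        Iterator<Operation> it = ops.iterator();
        while (it.hasNext()) {
            Operation op = it.next();
            if (op.tryComplete()) {
                completedNow++;
            }
            if (op.isCompleted()) {
                it.remove(); // completed operations no longer need watching
            }
        }
        return completedNow;
    }

    public static void main(String[] args) {
        MiniPurgatory purgatory = new MiniPurgatory();
        AtomicBoolean dataAvailable = new AtomicBoolean(false);
        AtomicInteger responsesSent = new AtomicInteger();
        Operation fetch = new Operation(dataAvailable::get, responsesSent::incrementAndGet);

        System.out.println(purgatory.tryCompleteElseWatch(fetch, List.of("topic-0"))); // false
        dataAvailable.set(true);
        System.out.println(purgatory.checkAndComplete("topic-0"));                      // 1
        System.out.println(responsesSent.get());                                        // 1
    }
}
```

The model depends on registering the watchers before the second `tryComplete`. A state change that lands between the first check and the registration is caught by the second check, and any later change is caught by `checkAndComplete`.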
satishd commented on code in PR #13535: URL: https://github.com/apache/kafka/pull/13535#discussion_r1187412666

## core/src/main/scala/kafka/server/ReplicaManager.scala: ##
@@ -1083,48 +1095,100 @@ class ReplicaManager(val config: KafkaConfig,
           fetchPartitionStatus += (topicIdPartition -> FetchPartitionStatus(logOffsetMetadata, partitionData))
         })
       }
-      val delayedFetch = new DelayedFetch(
-        params = params,
-        fetchPartitionStatus = fetchPartitionStatus,
-        replicaManager = this,
-        quota = quota,
-        responseCallback = responseCallback
-      )
-
-      // create a list of (topic, partition) pairs to use as keys for this delayed fetch operation
-      val delayedFetchKeys = fetchPartitionStatus.map { case (tp, _) => TopicPartitionOperationKey(tp) }
-
-      // try to complete the request immediately, otherwise put it into the purgatory;
-      // this is because while the delayed fetch operation is being created, new requests
-      // may arrive and hence make this operation completable.
-      delayedFetchPurgatory.tryCompleteElseWatch(delayedFetch, delayedFetchKeys)
+
+      if (remoteFetchInfo.isPresent) {

Review Comment:
Do you mean that we should not return immediately if `remoteFetchInfo` exists? The remote fetch still has to be served, and otherwise remote fetches may starve as long as there is enough data immediately available to be sent. If so, the condition becomes:
```
if (!remoteFetchInfo.isPresent && (params.maxWaitMs <= 0 || fetchInfos.isEmpty || bytesReadable >= params.minBytes || errorReadingData || hasDivergingEpoch || hasPreferredReadReplica))
```
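The proposed condition can be read as a single predicate. The sketch below is a hypothetical standalone Java restatement, and the class and parameter names are illustrative rather than taken from `ReplicaManager`. It shows that a pending remote fetch forces the delayed path even when local data alone would satisfy `minBytes`.

```java
// Hypothetical restatement of the proposed "respond immediately" condition; names are illustrative.
final class FetchResponseTiming {
    private FetchResponseTiming() {}

    /**
     * True when the fetch can be answered now instead of going through a delayed-fetch purgatory.
     * A pending remote read always forces the delayed path, even if local data satisfies minBytes.
     */
    static boolean respondImmediately(boolean hasRemoteFetch,
                                      long maxWaitMs,
                                      boolean fetchInfosEmpty,
                                      long bytesReadable,
                                      long minBytes,
                                      boolean errorReadingData,
                                      boolean hasDivergingEpoch,
                                      boolean hasPreferredReadReplica) {
        return !hasRemoteFetch
                && (maxWaitMs <= 0
                    || fetchInfosEmpty
                    || bytesReadable >= minBytes
                    || errorReadingData
                    || hasDivergingEpoch
                    || hasPreferredReadReplica);
    }

    public static void main(String[] args) {
        // Plenty of local data and no remote read: answer right away.
        System.out.println(respondImmediately(false, 500, false, 1024, 1, false, false, false)); // true
        // Same local data, but one partition must be read from remote storage: delay.
        System.out.println(respondImmediately(true, 500, false, 1024, 1, false, false, false));  // false
    }
}
```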
satishd commented on code in PR #13535: URL: https://github.com/apache/kafka/pull/13535#discussion_r1186813113

## core/src/main/java/kafka/log/remote/RemoteLogManager.java: ##
@@ -600,25 +622,208 @@ public String toString() {
         }
     }
 
-    long findHighestRemoteOffset(TopicIdPartition topicIdPartition) throws RemoteStorageException {
-        Optional<Long> offset = Optional.empty();
-        Optional<UnifiedLog> maybeLog = fetchLog.apply(topicIdPartition.topicPartition());
-        if (maybeLog.isPresent()) {
-            UnifiedLog log = maybeLog.get();
-            Option<LeaderEpochFileCache> maybeLeaderEpochFileCache = log.leaderEpochCache();
-            if (maybeLeaderEpochFileCache.isDefined()) {
-                LeaderEpochFileCache cache = maybeLeaderEpochFileCache.get();
-                OptionalInt epoch = cache.latestEpoch();
-                while (!offset.isPresent() && epoch.isPresent()) {
-                    offset = remoteLogMetadataManager.highestOffsetForEpoch(topicIdPartition, epoch.getAsInt());
-                    epoch = cache.previousEpoch(epoch.getAsInt());
+    public FetchDataInfo read(RemoteStorageFetchInfo remoteStorageFetchInfo) throws RemoteStorageException, IOException {
+        int fetchMaxBytes = remoteStorageFetchInfo.fetchMaxBytes;
+        TopicPartition tp = remoteStorageFetchInfo.topicPartition;
+        FetchRequest.PartitionData fetchInfo = remoteStorageFetchInfo.fetchInfo;
+
+        boolean includeAbortedTxns = remoteStorageFetchInfo.fetchIsolation == FetchIsolation.TXN_COMMITTED;
+
+        long offset = fetchInfo.fetchOffset;
+        int maxBytes = Math.min(fetchMaxBytes, fetchInfo.maxBytes);
+
+        Optional<UnifiedLog> logOptional = fetchLog.apply(tp);
+        OptionalInt epoch = OptionalInt.empty();
+
+        if (logOptional.isPresent()) {
+            Option<LeaderEpochFileCache> leaderEpochCache = logOptional.get().leaderEpochCache();
+            if (leaderEpochCache.isDefined()) {
+                epoch = leaderEpochCache.get().epochForOffset(offset);
+            }
+        }
+
+        Optional<RemoteLogSegmentMetadata> rlsMetadataOptional = epoch.isPresent()
+                ? fetchRemoteLogSegmentMetadata(tp, epoch.getAsInt(), offset)
+                : Optional.empty();
+
+        if (!rlsMetadataOptional.isPresent()) {
+            String epochStr = (epoch.isPresent()) ? Integer.toString(epoch.getAsInt()) : "NOT AVAILABLE";
+            throw new OffsetOutOfRangeException("Received request for offset " + offset + " for leader epoch "
+                    + epochStr + " and partition " + tp + " which does not exist in remote tier.");
+        }
+
+        RemoteLogSegmentMetadata remoteLogSegmentMetadata = rlsMetadataOptional.get();
+        int startPos = lookupPositionForOffset(remoteLogSegmentMetadata, offset);
+        InputStream remoteSegInputStream = null;
+        try {
+            // Search forward for the position of the last offset that is greater than or equal to the target offset
+            remoteSegInputStream = remoteLogStorageManager.fetchLogSegment(remoteLogSegmentMetadata, startPos);
+            RemoteLogInputStream remoteLogInputStream = new RemoteLogInputStream(remoteSegInputStream);
+
+            RecordBatch firstBatch = findFirstBatch(remoteLogInputStream, offset);
+
+            if (firstBatch == null)
+                return new FetchDataInfo(new LogOffsetMetadata(offset), MemoryRecords.EMPTY, false,
+                        includeAbortedTxns ? Optional.of(Collections.emptyList()) : Optional.empty());
+
+            // An empty record is sent instead of an incomplete batch when there is no minimum-one-message constraint
+            // and for FetchRequest version 3 and above and the first batch size is more than maximum bytes that can be sent.
+            int firstBatchSize = firstBatch.sizeInBytes();
+            if (!remoteStorageFetchInfo.minOneMessage &&
+                    !remoteStorageFetchInfo.hardMaxBytesLimit &&
+                    firstBatchSize > maxBytes) {
+                return new FetchDataInfo(new LogOffsetMetadata(offset), MemoryRecords.EMPTY);
+            }
+
+            int updatedFetchSize =
+                    remoteStorageFetchInfo.minOneMessage && firstBatchSize > maxBytes ? firstBatchSize : maxBytes;
+
+            ByteBuffer buffer = ByteBuffer.allocate(updatedFetchSize);
+            int remainingBytes = updatedFetchSize;
+
+            firstBatch.writeTo(buffer);
+            remainingBytes -= firstBatchSize;
+
+            if (remainingBytes > 0) {
+                // read the input stream until min of (EOF stream or buffer's remaining capacity).
+                Utils.readFully(remoteSegInputStream, buffer);
+            }
+            buffer.flip();
+
+            FetchDataInfo fetchDataInfo = new FetchDataInfo(
+                    new LogOffsetMetadata(offset, remoteLogSegmentMetadata.startOffset(), startPos),
+                    MemoryRecords.readableRecords(buffer));
+            if (includeAbortedTxns) {
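The opening steps of `read()` in this hunk map the fetch offset to a leader epoch, then look up the remote segment for that epoch and offset, and fail with `OffsetOutOfRangeException` when either is missing. Below is a minimal standalone sketch of that lookup. It assumes the epoch cache and the remote segment metadata can be modelled as sorted maps, with each segment belonging to a single epoch. `RemoteSegmentLocator` and its methods are hypothetical, and `IllegalArgumentException` stands in for Kafka's exception.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.NavigableMap;
import java.util.Optional;
import java.util.OptionalInt;
import java.util.TreeMap;

// Hypothetical model of the lookup at the top of read(): offset -> leader epoch -> remote segment.
final class RemoteSegmentLocator {

    // A remote segment reduced to its offset range (both ends inclusive).
    static final class Segment {
        final long startOffset;
        final long endOffset;

        Segment(long startOffset, long endOffset) {
            this.startOffset = startOffset;
            this.endOffset = endOffset;
        }
    }

    // Start offset of each epoch -> epoch, like the entries of a leader epoch checkpoint.
    private final NavigableMap<Long, Integer> epochByStartOffset = new TreeMap<>();
    // Epoch -> (segment start offset -> segment); in this model a segment belongs to one epoch.
    private final Map<Integer, NavigableMap<Long, Segment>> segmentsByEpoch = new HashMap<>();

    void addEpoch(int epoch, long startOffset) {
        epochByStartOffset.put(startOffset, epoch);
    }

    void addSegment(int epoch, long startOffset, long endOffset) {
        segmentsByEpoch.computeIfAbsent(epoch, e -> new TreeMap<>())
                .put(startOffset, new Segment(startOffset, endOffset));
    }

    // The epoch in effect at `offset` is the one with the greatest start offset <= offset.
    OptionalInt epochForOffset(long offset) {
        Map.Entry<Long, Integer> entry = epochByStartOffset.floorEntry(offset);
        return entry == null ? OptionalInt.empty() : OptionalInt.of(entry.getValue());
    }

    // The segment of `epoch` whose offset range contains `offset`, if any.
    Optional<Segment> segmentFor(int epoch, long offset) {
        NavigableMap<Long, Segment> segments = segmentsByEpoch.get(epoch);
        if (segments == null) {
            return Optional.empty();
        }
        Map.Entry<Long, Segment> entry = segments.floorEntry(offset);
        if (entry == null || entry.getValue().endOffset < offset) {
            return Optional.empty();
        }
        return Optional.of(entry.getValue());
    }

    // Same shape as read(): a missing epoch or segment is reported as out of range.
    Segment locate(long offset) {
        OptionalInt epoch = epochForOffset(offset);
        Optional<Segment> segment = epoch.isPresent()
                ? segmentFor(epoch.getAsInt(), offset)
                : Optional.empty();
        if (!segment.isPresent()) {
            String epochStr = epoch.isPresent() ? Integer.toString(epoch.getAsInt()) : "NOT AVAILABLE";
            throw new IllegalArgumentException("Offset " + offset + " for leader epoch " + epochStr
                    + " does not exist in remote tier.");
        }
        return segment.get();
    }

    public static void main(String[] args) {
        RemoteSegmentLocator locator = new RemoteSegmentLocator();
        locator.addEpoch(0, 0L);
        locator.addEpoch(1, 150L);
        locator.addSegment(0, 0L, 99L);
        locator.addSegment(0, 100L, 149L);
        locator.addSegment(1, 150L, 299L);

        System.out.println(locator.locate(120L).startOffset); // 100
        System.out.println(locator.locate(200L).startOffset); // 150
    }
}
```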
satishd commented on code in PR #13535: URL: https://github.com/apache/kafka/pull/13535#discussion_r1186812350

## core/src/main/java/kafka/log/remote/RemoteLogManager.java: ##
@@ -600,6 +622,176 @@ public String toString() {
         }
     }
 
+    public FetchDataInfo read(RemoteStorageFetchInfo remoteStorageFetchInfo) throws RemoteStorageException, IOException {
+        int fetchMaxBytes = remoteStorageFetchInfo.fetchMaxBytes;
+        TopicPartition tp = remoteStorageFetchInfo.topicPartition;
+        FetchRequest.PartitionData fetchInfo = remoteStorageFetchInfo.fetchInfo;
+
+        boolean includeAbortedTxns = remoteStorageFetchInfo.fetchIsolation == FetchIsolation.TXN_COMMITTED;
+
+        long offset = fetchInfo.fetchOffset;
+        int maxBytes = Math.min(fetchMaxBytes, fetchInfo.maxBytes);
+
+        Optional<UnifiedLog> logOptional = fetchLog.apply(tp);
+        OptionalInt epoch = OptionalInt.empty();
+
+        if (logOptional.isPresent()) {
+            Option<LeaderEpochFileCache> leaderEpochCache = logOptional.get().leaderEpochCache();
+            if (leaderEpochCache.isDefined()) {
+                epoch = leaderEpochCache.get().epochForOffset(offset);
+            }
+        }
+
+        Optional<RemoteLogSegmentMetadata> rlsMetadata = epoch.isPresent()
+                ? fetchRemoteLogSegmentMetadata(tp, epoch.getAsInt(), offset)
+                : Optional.empty();
+
+        if (!rlsMetadata.isPresent()) {
+            String epochStr = (epoch.isPresent()) ? Integer.toString(epoch.getAsInt()) : "NOT AVAILABLE";
+            throw new OffsetOutOfRangeException("Received request for offset " + offset + " for leader epoch "
+                    + epochStr + " and partition " + tp + " which does not exist in remote tier.");
+        }
+
+        int startPos = lookupPositionForOffset(rlsMetadata.get(), offset);
+        InputStream remoteSegInputStream = null;
+        try {
+            // Search forward for the position of the last offset that is greater than or equal to the target offset
+            remoteSegInputStream = remoteLogStorageManager.fetchLogSegment(rlsMetadata.get(), startPos);
+            RemoteLogInputStream remoteLogInputStream = new RemoteLogInputStream(remoteSegInputStream);
+
+            RecordBatch firstBatch = findFirstBatch(remoteLogInputStream, offset);
+
+            if (firstBatch == null)
+                return new FetchDataInfo(new LogOffsetMetadata(offset), MemoryRecords.EMPTY, false,
+                        includeAbortedTxns ? Optional.of(Collections.emptyList()) : Optional.empty());
+
+            int updatedFetchSize =
+                    remoteStorageFetchInfo.minOneMessage && firstBatch.sizeInBytes() > maxBytes
+                            ? firstBatch.sizeInBytes() : maxBytes;
+
+            ByteBuffer buffer = ByteBuffer.allocate(updatedFetchSize);
+            int remainingBytes = updatedFetchSize;
+
+            firstBatch.writeTo(buffer);
+            remainingBytes -= firstBatch.sizeInBytes();
+
+            if (remainingBytes > 0) {
+                // input stream is read till (startPos - 1) while getting the batch of records earlier.
+                // read the input stream until min of (EOF stream or buffer's remaining capacity).
+                Utils.readFully(remoteSegInputStream, buffer);
+            }
+            buffer.flip();
+
+            FetchDataInfo fetchDataInfo = new FetchDataInfo(new LogOffsetMetadata(offset), MemoryRecords.readableRecords(buffer));
+            if (includeAbortedTxns) {
+                fetchDataInfo = addAbortedTransactions(firstBatch.baseOffset(), rlsMetadata.get(), fetchDataInfo);
+            }
+
+            return fetchDataInfo;
+        } finally {
+            Utils.closeQuietly(remoteSegInputStream, "RemoteLogSegmentInputStream");
+        }
+    }
+
+    private int lookupPositionForOffset(RemoteLogSegmentMetadata remoteLogSegmentMetadata, long offset) {
+        return indexCache.lookupOffset(remoteLogSegmentMetadata, offset);
+    }
+
+    private FetchDataInfo addAbortedTransactions(long startOffset,
+                                                 RemoteLogSegmentMetadata segmentMetadata,
+                                                 FetchDataInfo fetchInfo) throws RemoteStorageException {
+        int fetchSize = fetchInfo.records.sizeInBytes();
+        OffsetPosition startOffsetPosition = new OffsetPosition(fetchInfo.fetchOffsetMetadata.messageOffset,
+                fetchInfo.fetchOffsetMetadata.relativePositionInSegment);
+
+        OffsetIndex offsetIndex = indexCache.getIndexEntry(segmentMetadata).offsetIndex();
+        long upperBoundOffset = offsetIndex.fetchUpperBoundOffset(startOffsetPosition, fetchSize)
+                .map(x -> x.offset).orElse(segmentMetadata.endOffset() + 1);
+
+        final List<FetchResponseData.AbortedTransaction> abortedTransactions = new ArrayList<>();
+
+        Consumer<List<AbortedTxn>> accumulator =
+                abortedTxns -> abortedTransactions.addAll(abortedTxns.stream()
+                        .map(AbortedTxn
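`addAbortedTransactions` in this hunk first bounds the fetched range with the offset index, then collects the aborted transactions that fall inside that range. Below is a hypothetical standalone Java model of those two steps, based on two assumptions. The first is that `fetchUpperBoundOffset` returns the first index entry at or beyond `startPosition + fetchSize`. The second is that a transaction is relevant when its offsets overlap `[fetchOffset, upperBoundOffset)`. The `AbortedTxnRange` and `Txn` types are illustrative and are not Kafka's `AbortedTxn`.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.NavigableMap;
import java.util.TreeMap;

// Hypothetical standalone model of the two steps in addAbortedTransactions(); not Kafka's API.
final class AbortedTxnRange {

    // Minimal stand-in for an aborted-transaction index entry (offsets inclusive).
    static final class Txn {
        final long producerId;
        final long firstOffset;
        final long lastOffset;

        Txn(long producerId, long firstOffset, long lastOffset) {
            this.producerId = producerId;
            this.firstOffset = firstOffset;
            this.lastOffset = lastOffset;
        }
    }

    // `index` maps physical position -> offset, like a sparse offset index. Assumed behaviour:
    // take the first entry at or beyond startPosition + fetchSize, else fall back to endOffset + 1.
    static long upperBoundOffset(NavigableMap<Integer, Long> index, int startPosition,
                                 int fetchSize, long segmentEndOffset) {
        Map.Entry<Integer, Long> entry = index.ceilingEntry(startPosition + fetchSize);
        return entry == null ? segmentEndOffset + 1 : entry.getValue();
    }

    // Keep every aborted transaction whose offsets overlap [fetchOffset, upperBoundOffset).
    static List<Txn> collect(List<Txn> abortedTxns, long fetchOffset, long upperBoundOffset) {
        List<Txn> result = new ArrayList<>();
        for (Txn txn : abortedTxns) {
            if (txn.lastOffset >= fetchOffset && txn.firstOffset < upperBoundOffset) {
                result.add(txn);
            }
        }
        return result;
    }

    public static void main(String[] args) {
        NavigableMap<Integer, Long> index = new TreeMap<>();
        index.put(0, 100L);
        index.put(4096, 140L);
        index.put(8192, 180L);

        long upper = upperBoundOffset(index, 0, 4096, 199L);
        System.out.println(upper); // 140

        List<Txn> txns = List.of(
                new Txn(1, 90, 95),    // ends before the fetch offset: skipped
                new Txn(2, 98, 120),   // overlaps the start of the range: kept
                new Txn(3, 130, 200),  // starts inside the range: kept
                new Txn(4, 150, 160)); // starts after the upper bound: skipped
        for (Txn txn : collect(txns, 100L, upper)) {
            System.out.println(txn.producerId); // 2, then 3
        }
    }
}
```

The offset index is sparse, so the bound can lie past the last fetched offset. In that case the collected list may be a superset of what the returned records strictly need.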
satishd commented on code in PR #13535: URL: https://github.com/apache/kafka/pull/13535#discussion_r1186714886

## core/src/main/java/kafka/log/remote/RemoteLogReader.java: ##
@@ -0,0 +1,70 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package kafka.log.remote;
+
+import org.apache.kafka.common.errors.OffsetOutOfRangeException;
+import org.apache.kafka.common.utils.LogContext;
+import org.apache.kafka.storage.internals.log.FetchDataInfo;
+import org.apache.kafka.storage.internals.log.RemoteLogReadResult;
+import org.apache.kafka.storage.internals.log.RemoteStorageFetchInfo;
+import org.slf4j.Logger;
+
+import java.util.Optional;
+import java.util.concurrent.Callable;
+import java.util.function.Consumer;
+
+public class RemoteLogReader implements Callable<Void> {
+    private final Logger logger;
+    private final RemoteStorageFetchInfo fetchInfo;
+    private final RemoteLogManager rlm;
+    private final Consumer<RemoteLogReadResult> callback;
+
+    public RemoteLogReader(RemoteStorageFetchInfo fetchInfo,
+                           RemoteLogManager rlm,
+                           Consumer<RemoteLogReadResult> callback) {
+        this.fetchInfo = fetchInfo;
+        this.rlm = rlm;
+        this.callback = callback;
+        logger = new LogContext() {
+            @Override
+            public String logPrefix() {
+                return "[" + Thread.currentThread().getName() + "]";
+            }
+        }.logger(RemoteLogReader.class);
+    }
+
+    @Override
+    public Void call() {
+        RemoteLogReadResult result;
+        try {
+            logger.debug("Reading records from remote storage for topic partition {}", fetchInfo.topicPartition);
+
+            FetchDataInfo fetchDataInfo = rlm.read(fetchInfo);
+            result = new RemoteLogReadResult(Optional.of(fetchDataInfo), Optional.empty());
+        } catch (OffsetOutOfRangeException e) {
+            result = new RemoteLogReadResult(Optional.empty(), Optional.of(e));
+        } catch (Exception e) {

Review Comment:
As the issue is not related to this set of changes, we can look into it later.
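`RemoteLogReader` turns every outcome of `rlm.read(...)` into a result object and hands it to a callback, so exceptions do not escape to the executor. The sketch below is a self-contained version of that shape. The `ReadOutcome` and `CallbackReader` types are hypothetical stand-ins for `RemoteLogReadResult` and `RemoteLogReader`, and a `Callable<String>` stands in for the remote read. The sketch has no `OffsetOutOfRangeException` to single out, so it uses one `catch (Exception e)` branch.

```java
import java.util.Optional;
import java.util.concurrent.Callable;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.function.Consumer;

// Hypothetical stand-in for RemoteLogReadResult: exactly one of `data` / `error` is present.
final class ReadOutcome {
    final Optional<String> data;
    final Optional<Throwable> error;

    private ReadOutcome(Optional<String> data, Optional<Throwable> error) {
        this.data = data;
        this.error = error;
    }

    static ReadOutcome success(String data) {
        return new ReadOutcome(Optional.of(data), Optional.empty());
    }

    static ReadOutcome failure(Throwable error) {
        return new ReadOutcome(Optional.empty(), Optional.of(error));
    }
}

// Hypothetical analogue of RemoteLogReader: every outcome is delivered through the callback.
final class CallbackReader implements Callable<Void> {
    private final Callable<String> read;          // stands in for rlm.read(fetchInfo)
    private final Consumer<ReadOutcome> callback;

    CallbackReader(Callable<String> read, Consumer<ReadOutcome> callback) {
        this.read = read;
        this.callback = callback;
    }

    @Override
    public Void call() {
        ReadOutcome outcome;
        try {
            outcome = ReadOutcome.success(read.call());
        } catch (Exception e) {
            outcome = ReadOutcome.failure(e);
        }
        callback.accept(outcome);
        return null;
    }

    public static void main(String[] args) throws Exception {
        ExecutorService pool = Executors.newSingleThreadExecutor();
        try {
            CompletableFuture<ReadOutcome> result = new CompletableFuture<>();
            pool.submit(new CallbackReader(() -> "records", result::complete));
            System.out.println(result.get().data.orElse("none")); // records
        } finally {
            pool.shutdown();
        }
    }
}
```

In `main`, the callback completes a `CompletableFuture`, which is one way for another thread to pick up the result. The `ReplicaManager` diff in this PR does something similar with `remoteFetchResult.complete(result)`.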
satishd commented on code in PR #13535: URL: https://github.com/apache/kafka/pull/13535#discussion_r1184873142

## core/src/main/java/kafka/log/remote/RemoteLogManager.java: ##
@@ -670,6 +875,14 @@ public void close() {
             } catch (InterruptedException e) {
                 // ignore
             }
+            remoteStorageReaderThreadPool.shutdownNow();
+            //waits for 2 mins to terminate the current tasks
+            try {
+                remoteStorageReaderThreadPool.awaitTermination(2, TimeUnit.MINUTES);

Review Comment:
That code, `lifecycleManager.controlledShutdownFuture`, is about processing the broker's controlled shutdown event with the controller. It waits for 5 mins before proceeding with the rest of the shutdown sequence, and it is not affected by the code introduced here. The logging subsystem handles unclean shutdown of log segments, and that will already have finished before RemoteLogManager is closed, so it is not affected by this timeout either. Still, we can use a shorter duration here, such as 10 secs, and revisit introducing a config if one is really needed for closing the remote log subsystem.
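The pattern under discussion is `shutdownNow()` followed by a bounded `awaitTermination`, and it can be wrapped in a small helper. The sketch below assumes a plain `ExecutorService` and uses the shorter 10-second bound suggested above. `ReaderPoolShutdown` and `shutdownWithin` are hypothetical names.

```java
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

// Hypothetical helper: interrupt in-flight remote reads, then wait a bounded time for them to exit.
final class ReaderPoolShutdown {
    private ReaderPoolShutdown() {}

    /** Returns true if the pool terminated within the timeout. */
    static boolean shutdownWithin(ExecutorService pool, long timeout, TimeUnit unit) {
        // Stops accepting tasks, interrupts running ones and returns the queued tasks that never started.
        List<Runnable> neverStarted = pool.shutdownNow();
        if (!neverStarted.isEmpty()) {
            System.err.println(neverStarted.size() + " queued read(s) dropped during shutdown");
        }
        try {
            return pool.awaitTermination(timeout, unit);
        } catch (InterruptedException e) {
            // Keep the interrupt visible to the rest of the shutdown sequence.
            Thread.currentThread().interrupt();
            return false;
        }
    }

    public static void main(String[] args) {
        ExecutorService pool = Executors.newFixedThreadPool(2);
        pool.submit(() -> {
            try {
                Thread.sleep(60_000); // simulates a slow remote read
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt(); // exit promptly once shutdownNow() interrupts us
            }
        });
        // 10 seconds, the shorter bound suggested in the review comment.
        System.out.println("terminated: " + shutdownWithin(pool, 10, TimeUnit.SECONDS));
    }
}
```

The helper restores the interrupt flag instead of ignoring the exception, so the rest of the shutdown sequence can still see that the thread was interrupted.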
satishd commented on code in PR #13535: URL: https://github.com/apache/kafka/pull/13535#discussion_r1181519032

## core/src/main/java/kafka/log/remote/RemoteLogManager.java: ##
@@ -600,25 +622,208 @@ public String toString() {
         }
     }
 
-    long findHighestRemoteOffset(TopicIdPartition topicIdPartition) throws RemoteStorageException {
-        Optional<Long> offset = Optional.empty();
-        Optional<UnifiedLog> maybeLog = fetchLog.apply(topicIdPartition.topicPartition());
-        if (maybeLog.isPresent()) {
-            UnifiedLog log = maybeLog.get();
-            Option<LeaderEpochFileCache> maybeLeaderEpochFileCache = log.leaderEpochCache();
-            if (maybeLeaderEpochFileCache.isDefined()) {
-                LeaderEpochFileCache cache = maybeLeaderEpochFileCache.get();
-                OptionalInt epoch = cache.latestEpoch();
-                while (!offset.isPresent() && epoch.isPresent()) {
-                    offset = remoteLogMetadataManager.highestOffsetForEpoch(topicIdPartition, epoch.getAsInt());
-                    epoch = cache.previousEpoch(epoch.getAsInt());
+    public FetchDataInfo read(RemoteStorageFetchInfo remoteStorageFetchInfo) throws RemoteStorageException, IOException {
+        int fetchMaxBytes = remoteStorageFetchInfo.fetchMaxBytes;
+        TopicPartition tp = remoteStorageFetchInfo.topicPartition;
+        FetchRequest.PartitionData fetchInfo = remoteStorageFetchInfo.fetchInfo;
+
+        boolean includeAbortedTxns = remoteStorageFetchInfo.fetchIsolation == FetchIsolation.TXN_COMMITTED;
+
+        long offset = fetchInfo.fetchOffset;
+        int maxBytes = Math.min(fetchMaxBytes, fetchInfo.maxBytes);
+
+        Optional<UnifiedLog> logOptional = fetchLog.apply(tp);
+        OptionalInt epoch = OptionalInt.empty();
+
+        if (logOptional.isPresent()) {
+            Option<LeaderEpochFileCache> leaderEpochCache = logOptional.get().leaderEpochCache();
+            if (leaderEpochCache.isDefined()) {
+                epoch = leaderEpochCache.get().epochForOffset(offset);
+            }
+        }
+
+        Optional<RemoteLogSegmentMetadata> rlsMetadataOptional = epoch.isPresent()
+                ? fetchRemoteLogSegmentMetadata(tp, epoch.getAsInt(), offset)
+                : Optional.empty();
+
+        if (!rlsMetadataOptional.isPresent()) {
+            String epochStr = (epoch.isPresent()) ? Integer.toString(epoch.getAsInt()) : "NOT AVAILABLE";
+            throw new OffsetOutOfRangeException("Received request for offset " + offset + " for leader epoch "
+                    + epochStr + " and partition " + tp + " which does not exist in remote tier.");
+        }
+
+        RemoteLogSegmentMetadata remoteLogSegmentMetadata = rlsMetadataOptional.get();
+        int startPos = lookupPositionForOffset(remoteLogSegmentMetadata, offset);
+        InputStream remoteSegInputStream = null;
+        try {
+            // Search forward for the position of the last offset that is greater than or equal to the target offset
+            remoteSegInputStream = remoteLogStorageManager.fetchLogSegment(remoteLogSegmentMetadata, startPos);
+            RemoteLogInputStream remoteLogInputStream = new RemoteLogInputStream(remoteSegInputStream);
+
+            RecordBatch firstBatch = findFirstBatch(remoteLogInputStream, offset);
+
+            if (firstBatch == null)
+                return new FetchDataInfo(new LogOffsetMetadata(offset), MemoryRecords.EMPTY, false,
+                        includeAbortedTxns ? Optional.of(Collections.emptyList()) : Optional.empty());
+
+            // An empty record is sent instead of an incomplete batch when there is no minimum-one-message constraint
+            // and for FetchRequest version 3 and above and the first batch size is more than maximum bytes that can be sent.
+            if (!remoteStorageFetchInfo.minOneMessage &&
+                    !remoteStorageFetchInfo.hardMaxBytesLimit &&
+                    firstBatch.sizeInBytes() > maxBytes) {

Review Comment:
After the recent change that extracts `firstBatch.sizeInBytes()` into a local variable, the existing code looks simpler.
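With the first batch size extracted, the sizing rules in `read()` reduce to a small pure function. The sketch below is a hypothetical Java restatement with illustrative names. Following the diff's own comment, it treats `hardMaxBytesLimit == false` as meaning FetchRequest version 3 and above. It returns the buffer size to allocate, or empty when an empty response should be sent instead of an incomplete batch.

```java
import java.util.OptionalInt;

// Hypothetical restatement of the first-batch sizing rules in RemoteLogManager.read().
final class RemoteFetchSizing {
    private RemoteFetchSizing() {}

    // The per-partition limit is capped by the request-wide limit, as in read().
    static int effectiveMaxBytes(int fetchMaxBytes, int partitionMaxBytes) {
        return Math.min(fetchMaxBytes, partitionMaxBytes);
    }

    /**
     * Returns the buffer size to allocate, or empty when an empty response should be sent
     * instead of an incomplete first batch.
     */
    static OptionalInt bufferSize(int firstBatchSize, int maxBytes,
                                  boolean minOneMessage, boolean hardMaxBytesLimit) {
        if (!minOneMessage && !hardMaxBytesLimit && firstBatchSize > maxBytes) {
            return OptionalInt.empty();
        }
        // With minOneMessage, an oversized first batch is still returned whole.
        return OptionalInt.of(minOneMessage && firstBatchSize > maxBytes ? firstBatchSize : maxBytes);
    }

    public static void main(String[] args) {
        int maxBytes = effectiveMaxBytes(1_048_576, 4096);              // 4096
        System.out.println(bufferSize(10_000, maxBytes, false, false)); // OptionalInt.empty
        System.out.println(bufferSize(10_000, maxBytes, true, false));  // OptionalInt[10000]
        System.out.println(bufferSize(1_000, maxBytes, false, false));  // OptionalInt[4096]
    }
}
```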
satishd commented on code in PR #13535: URL: https://github.com/apache/kafka/pull/13535#discussion_r1181518738

## core/src/main/java/kafka/log/remote/RemoteLogManager.java: ##
@@ -600,25 +622,208 @@ public String toString() {
         }
     }
 
-    long findHighestRemoteOffset(TopicIdPartition topicIdPartition) throws RemoteStorageException {
-        Optional<Long> offset = Optional.empty();
-        Optional<UnifiedLog> maybeLog = fetchLog.apply(topicIdPartition.topicPartition());
-        if (maybeLog.isPresent()) {
-            UnifiedLog log = maybeLog.get();
-            Option<LeaderEpochFileCache> maybeLeaderEpochFileCache = log.leaderEpochCache();
-            if (maybeLeaderEpochFileCache.isDefined()) {
-                LeaderEpochFileCache cache = maybeLeaderEpochFileCache.get();
-                OptionalInt epoch = cache.latestEpoch();
-                while (!offset.isPresent() && epoch.isPresent()) {
-                    offset = remoteLogMetadataManager.highestOffsetForEpoch(topicIdPartition, epoch.getAsInt());
-                    epoch = cache.previousEpoch(epoch.getAsInt());
+    public FetchDataInfo read(RemoteStorageFetchInfo remoteStorageFetchInfo) throws RemoteStorageException, IOException {
+        int fetchMaxBytes = remoteStorageFetchInfo.fetchMaxBytes;
+        TopicPartition tp = remoteStorageFetchInfo.topicPartition;
+        FetchRequest.PartitionData fetchInfo = remoteStorageFetchInfo.fetchInfo;
+
+        boolean includeAbortedTxns = remoteStorageFetchInfo.fetchIsolation == FetchIsolation.TXN_COMMITTED;
+
+        long offset = fetchInfo.fetchOffset;
+        int maxBytes = Math.min(fetchMaxBytes, fetchInfo.maxBytes);
+
+        Optional<UnifiedLog> logOptional = fetchLog.apply(tp);
+        OptionalInt epoch = OptionalInt.empty();
+
+        if (logOptional.isPresent()) {
+            Option<LeaderEpochFileCache> leaderEpochCache = logOptional.get().leaderEpochCache();
+            if (leaderEpochCache.isDefined()) {
+                epoch = leaderEpochCache.get().epochForOffset(offset);
+            }
+        }
+
+        Optional<RemoteLogSegmentMetadata> rlsMetadataOptional = epoch.isPresent()
+                ? fetchRemoteLogSegmentMetadata(tp, epoch.getAsInt(), offset)
+                : Optional.empty();
+
+        if (!rlsMetadataOptional.isPresent()) {
+            String epochStr = (epoch.isPresent()) ? Integer.toString(epoch.getAsInt()) : "NOT AVAILABLE";
+            throw new OffsetOutOfRangeException("Received request for offset " + offset + " for leader epoch "
+                    + epochStr + " and partition " + tp + " which does not exist in remote tier.");
+        }
+
+        RemoteLogSegmentMetadata remoteLogSegmentMetadata = rlsMetadataOptional.get();
+        int startPos = lookupPositionForOffset(remoteLogSegmentMetadata, offset);
+        InputStream remoteSegInputStream = null;
+        try {
+            // Search forward for the position of the last offset that is greater than or equal to the target offset
+            remoteSegInputStream = remoteLogStorageManager.fetchLogSegment(remoteLogSegmentMetadata, startPos);
+            RemoteLogInputStream remoteLogInputStream = new RemoteLogInputStream(remoteSegInputStream);

Review Comment:
Sure, we can definitely look into that. We did not see many GC issues at remote read throughputs of ~750 MBps on a broker, but there are plans to improve this by exploring buffer pool mechanisms (variations of the pool used in producers).
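The buffer pool idea mentioned above could start by reusing buffers of one poolable size. This is in the spirit of the producer's `BufferPool`, which pools buffers of `batch.size`. Below is a minimal hypothetical Java sketch, which is not Kafka's `BufferPool` API. It never blocks: it falls back to `ByteBuffer.allocate` when the pool is empty or the requested size does not match.

```java
import java.nio.ByteBuffer;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

// Hypothetical, non-blocking pool for buffers of a single size; not Kafka's BufferPool API.
final class SimpleBufferPool {
    private final int poolableSize;
    private final BlockingQueue<ByteBuffer> free;

    SimpleBufferPool(int poolableSize, int maxPooled) {
        this.poolableSize = poolableSize;
        this.free = new ArrayBlockingQueue<>(maxPooled);
    }

    // Reuse a pooled buffer when the size matches, otherwise allocate a fresh one.
    ByteBuffer allocate(int size) {
        if (size == poolableSize) {
            ByteBuffer pooled = free.poll();
            if (pooled != null) {
                return pooled;
            }
        }
        return ByteBuffer.allocate(size);
    }

    // Return a buffer for reuse; odd sizes, or buffers beyond the pool's capacity, are left to the GC.
    void release(ByteBuffer buffer) {
        if (buffer.capacity() == poolableSize) {
            buffer.clear();
            free.offer(buffer); // returns false (buffer dropped) when the pool is full
        }
    }

    int pooledCount() {
        return free.size();
    }

    public static void main(String[] args) {
        SimpleBufferPool pool = new SimpleBufferPool(1024 * 1024, 4);
        ByteBuffer first = pool.allocate(1024 * 1024);
        pool.release(first);
        ByteBuffer second = pool.allocate(1024 * 1024);
        System.out.println(first == second); // true: the released buffer was reused
    }
}
```

`read()` sizes its buffer per request (`updatedFetchSize`), so a pool like this would only help requests that happen to hit the poolable size. Covering other requests would need size classes or slicing of larger pooled buffers.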
[GitHub] [kafka] satishd commented on a diff in pull request #13535: KAFKA-9579 Fetch implementation for records in the remote storage through a specific purgatory.
satishd commented on code in PR #13535: URL: https://github.com/apache/kafka/pull/13535#discussion_r1181518576 ## core/src/main/scala/kafka/server/ReplicaManager.scala: ## @@ -1160,48 +1171,100 @@ class ReplicaManager(val config: KafkaConfig, fetchPartitionStatus += (topicIdPartition -> FetchPartitionStatus(logOffsetMetadata, partitionData)) }) } - val delayedFetch = new DelayedFetch( -params = params, -fetchPartitionStatus = fetchPartitionStatus, -replicaManager = this, -quota = quota, -responseCallback = responseCallback - ) - - // create a list of (topic, partition) pairs to use as keys for this delayed fetch operation - val delayedFetchKeys = fetchPartitionStatus.map { case (tp, _) => TopicPartitionOperationKey(tp) } - - // try to complete the request immediately, otherwise put it into the purgatory; - // this is because while the delayed fetch operation is being created, new requests - // may arrive and hence make this operation completable. - delayedFetchPurgatory.tryCompleteElseWatch(delayedFetch, delayedFetchKeys) + + if (remoteFetchInfo.isPresent) { +val key = new TopicPartitionOperationKey(remoteFetchInfo.get.topicPartition.topic(), remoteFetchInfo.get.topicPartition.partition()) +val remoteFetchResult = new CompletableFuture[RemoteLogReadResult] +var remoteFetchTask: Future[Void] = null +try { + remoteFetchTask = remoteLogManager.get.asyncRead(remoteFetchInfo.get, (result: RemoteLogReadResult) => { +remoteFetchResult.complete(result) +delayedRemoteFetchPurgatory.checkAndComplete(key) + }) +} catch { + // if the task queue of remote storage reader thread pool is full, return what we currently have + // (the data read from local log segment for the other topic-partitions) and an error for the topic-partition that + // we couldn't read from remote storage + case e: RejectedExecutionException => +val fetchPartitionData = logReadResults.map { case (tp, result) => + val r = { +if (tp.topicPartition().equals(remoteFetchInfo.get.topicPartition)) + 
createLogReadResult(e) Review Comment: This error is [propagated](https://github.com/apache/kafka/blob/trunk/core/src/main/scala/kafka/server/ReplicaManager.scala#L86) as an unexpected error (UnknownServerException) to the consumer client, where it is already handled. ## core/src/main/scala/kafka/server/DelayedRemoteFetch.scala: ## @@ -0,0 +1,112 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package kafka.server + +import org.apache.kafka.common.TopicIdPartition +import org.apache.kafka.common.errors._ +import org.apache.kafka.storage.internals.log.{FetchParams, FetchPartitionData, LogOffsetMetadata, RemoteLogReadResult, RemoteStorageFetchInfo} + +import java.util.concurrent.{CompletableFuture, Future} +import java.util.{Optional, OptionalInt, OptionalLong} +import scala.collection._ + +/** + * A remote fetch operation that can be created by the replica manager and watched + * in the remote fetch operation purgatory + */ +class DelayedRemoteFetch(remoteFetchTask: Future[Void], + remoteFetchResult: CompletableFuture[RemoteLogReadResult], + remoteFetchInfo: RemoteStorageFetchInfo, + fetchPartitionStatus: Seq[(TopicIdPartition, FetchPartitionStatus)], + fetchParams: FetchParams, + localReadResults: Seq[(TopicIdPartition, LogReadResult)], + replicaManager: ReplicaManager, + responseCallback: Seq[(TopicIdPartition, FetchPartitionData)] => Unit) + extends DelayedOperation(fetchParams.maxWaitMs) { + + /** + * The operation can be completed if: + * + * Case a: This broker is no longer the leader of the partition it tries to fetch + * Case b: This broker does not know the partition it tries to fetch + * Case c: The remote storage read request completed (succeeded or failed) + * Case d
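The `RejectedExecutionException` case in the diff above assumes the remote storage reader pool has a bounded task queue. A minimal sketch of that pattern with plain `java.util.concurrent` follows; `BoundedReaderPool` and `trySubmit` are hypothetical names, not the PR's `RemoteStorageThreadPool`. With a fixed thread count and an `ArrayBlockingQueue`, the executor's default `AbortPolicy` throws `RejectedExecutionException` once both threads and queue are saturated, and the caller can then report an error for that one partition instead of blocking the fetch.

```java
import java.util.Optional;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.Callable;
import java.util.concurrent.Future;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

class BoundedReaderPool {
    private final ThreadPoolExecutor executor;

    BoundedReaderPool(int threads, int queueCapacity) {
        // Fixed number of threads and a bounded queue; the default AbortPolicy
        // throws RejectedExecutionException once threads and queue are saturated.
        this.executor = new ThreadPoolExecutor(threads, threads, 0L, TimeUnit.MILLISECONDS,
                new ArrayBlockingQueue<>(queueCapacity));
    }

    /** Submits the read, or returns empty if the pool rejected it. */
    <T> Optional<Future<T>> trySubmit(Callable<T> read) {
        try {
            return Optional.of(executor.submit(read));
        } catch (RejectedExecutionException e) {
            // Caller builds an error result for this partition only.
            return Optional.empty();
        }
    }

    void shutdown() {
        executor.shutdownNow();
    }
}
```

Returning an empty `Optional` keeps the rejection local to the caller, which can still return the data already read from local segments for the other partitions.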
[GitHub] [kafka] satishd commented on a diff in pull request #13535: KAFKA-9579 Fetch implementation for records in the remote storage through a specific purgatory.
satishd commented on code in PR #13535: URL: https://github.com/apache/kafka/pull/13535#discussion_r1172535684 ## core/src/main/java/kafka/log/remote/RemoteLogManager.java: ## @@ -600,6 +622,176 @@ public String toString() { } } +public FetchDataInfo read(RemoteStorageFetchInfo remoteStorageFetchInfo) throws RemoteStorageException, IOException { +int fetchMaxBytes = remoteStorageFetchInfo.fetchMaxBytes; +TopicPartition tp = remoteStorageFetchInfo.topicPartition; +FetchRequest.PartitionData fetchInfo = remoteStorageFetchInfo.fetchInfo; + +boolean includeAbortedTxns = remoteStorageFetchInfo.fetchIsolation == FetchIsolation.TXN_COMMITTED; + +long offset = fetchInfo.fetchOffset; +int maxBytes = Math.min(fetchMaxBytes, fetchInfo.maxBytes); + +Optional logOptional = fetchLog.apply(tp); +OptionalInt epoch = OptionalInt.empty(); + +if (logOptional.isPresent()) { +Option leaderEpochCache = logOptional.get().leaderEpochCache(); +if (leaderEpochCache.isDefined()) { +epoch = leaderEpochCache.get().epochForOffset(offset); +} +} + +Optional rlsMetadata = epoch.isPresent() +? fetchRemoteLogSegmentMetadata(tp, epoch.getAsInt(), offset) +: Optional.empty(); + +if (!rlsMetadata.isPresent()) { +String epochStr = (epoch.isPresent()) ? 
Integer.toString(epoch.getAsInt()) : "NOT AVAILABLE"; +throw new OffsetOutOfRangeException("Received request for offset " + offset + " for leader epoch " ++ epochStr + " and partition " + tp + " which does not exist in remote tier."); +} + +int startPos = lookupPositionForOffset(rlsMetadata.get(), offset); +InputStream remoteSegInputStream = null; +try { +// Search forward for the position of the last offset that is greater than or equal to the target offset +remoteSegInputStream = remoteLogStorageManager.fetchLogSegment(rlsMetadata.get(), startPos); +RemoteLogInputStream remoteLogInputStream = new RemoteLogInputStream(remoteSegInputStream); + +RecordBatch firstBatch = findFirstBatch(remoteLogInputStream, offset); + +if (firstBatch == null) +return new FetchDataInfo(new LogOffsetMetadata(offset), MemoryRecords.EMPTY, false, +includeAbortedTxns ? Optional.of(Collections.emptyList()) : Optional.empty()); + +int updatedFetchSize = Review Comment: There is no risk here, but it is good to be consistent with the local read pattern and return empty records in that case. Updated in the latest commit.
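The "no batch found" case can be modelled as below: scan forward and return the first batch whose last offset is at or beyond the target offset, or nothing if the segment stream is exhausted, in which case the caller replies with empty records as the local read path does. `Batch` and `findFirstBatch` here are hypothetical stand-ins for `RecordBatch` and the PR's helper, sketched under the assumption that batches arrive in offset order.

```java
import java.util.Iterator;
import java.util.Optional;

class FirstBatchLookup {
    /** Hypothetical stand-in for a record batch; both offsets are inclusive. */
    static final class Batch {
        final long baseOffset;
        final long lastOffset;

        Batch(long baseOffset, long lastOffset) {
            this.baseOffset = baseOffset;
            this.lastOffset = lastOffset;
        }
    }

    /**
     * Returns the first batch whose last offset is >= targetOffset, i.e. the batch
     * containing targetOffset (or the next one after a gap). Returns empty if the
     * stream is exhausted, so the caller can respond with empty records.
     */
    static Optional<Batch> findFirstBatch(Iterator<Batch> batches, long targetOffset) {
        while (batches.hasNext()) {
            Batch batch = batches.next();
            if (batch.lastOffset >= targetOffset) {
                return Optional.of(batch);
            }
        }
        return Optional.empty();
    }
}
```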
[GitHub] [kafka] satishd commented on a diff in pull request #13535: KAFKA-9579 Fetch implementation for records in the remote storage through a specific purgatory.
satishd commented on code in PR #13535: URL: https://github.com/apache/kafka/pull/13535#discussion_r1171245421 ## core/src/main/java/kafka/log/remote/RemoteLogManager.java: ## @@ -600,6 +622,176 @@ public String toString() { } } +public FetchDataInfo read(RemoteStorageFetchInfo remoteStorageFetchInfo) throws RemoteStorageException, IOException { +int fetchMaxBytes = remoteStorageFetchInfo.fetchMaxBytes; +TopicPartition tp = remoteStorageFetchInfo.topicPartition; +FetchRequest.PartitionData fetchInfo = remoteStorageFetchInfo.fetchInfo; + +boolean includeAbortedTxns = remoteStorageFetchInfo.fetchIsolation == FetchIsolation.TXN_COMMITTED; + +long offset = fetchInfo.fetchOffset; +int maxBytes = Math.min(fetchMaxBytes, fetchInfo.maxBytes); + +Optional logOptional = fetchLog.apply(tp); +OptionalInt epoch = OptionalInt.empty(); + +if (logOptional.isPresent()) { +Option leaderEpochCache = logOptional.get().leaderEpochCache(); +if (leaderEpochCache.isDefined()) { +epoch = leaderEpochCache.get().epochForOffset(offset); +} +} + +Optional rlsMetadata = epoch.isPresent() +? fetchRemoteLogSegmentMetadata(tp, epoch.getAsInt(), offset) +: Optional.empty(); + +if (!rlsMetadata.isPresent()) { +String epochStr = (epoch.isPresent()) ? 
Integer.toString(epoch.getAsInt()) : "NOT AVAILABLE"; +throw new OffsetOutOfRangeException("Received request for offset " + offset + " for leader epoch " ++ epochStr + " and partition " + tp + " which does not exist in remote tier."); +} + +int startPos = lookupPositionForOffset(rlsMetadata.get(), offset); +InputStream remoteSegInputStream = null; +try { +// Search forward for the position of the last offset that is greater than or equal to the target offset +remoteSegInputStream = remoteLogStorageManager.fetchLogSegment(rlsMetadata.get(), startPos); +RemoteLogInputStream remoteLogInputStream = new RemoteLogInputStream(remoteSegInputStream); + +RecordBatch firstBatch = findFirstBatch(remoteLogInputStream, offset); + +if (firstBatch == null) +return new FetchDataInfo(new LogOffsetMetadata(offset), MemoryRecords.EMPTY, false, +includeAbortedTxns ? Optional.of(Collections.emptyList()) : Optional.empty()); + +int updatedFetchSize = Review Comment: Good point. There is no risk here, but it is good to be consistent with the local read pattern and return empty records in that case. I will update the PR with the changes.
[GitHub] [kafka] satishd commented on a diff in pull request #13535: KAFKA-9579 Fetch implementation for records in the remote storage through a specific purgatory.
satishd commented on code in PR #13535: URL: https://github.com/apache/kafka/pull/13535#discussion_r1171250580 ## core/src/main/scala/kafka/server/ReplicaManager.scala: ## @@ -1118,9 +1122,13 @@ class ReplicaManager(val config: KafkaConfig, responseCallback: Seq[(TopicIdPartition, FetchPartitionData)] => Unit ): Unit = { // check if this fetch request can be satisfied right away -val logReadResults = readFromLocalLog(params, fetchInfos, quota, readFromPurgatory = false) +val logReadResults = readFromLog(params, fetchInfos, quota, readFromPurgatory = false) var bytesReadable: Long = 0 var errorReadingData = false + +// The 1st topic-partition that has to be read from remote storage +var remoteFetchInfo: Optional[RemoteStorageFetchInfo] = Optional.empty() Review Comment: As I already called out in the PR description, this will be addressed in a follow-up PR. There we will describe the different config options and their respective scenarios. The default will be to fetch from multiple partitions, as is done with local log segments.
[GitHub] [kafka] satishd commented on a diff in pull request #13535: KAFKA-9579 Fetch implementation for records in the remote storage through a specific purgatory.
satishd commented on code in PR #13535: URL: https://github.com/apache/kafka/pull/13535#discussion_r1171246205 ## core/src/main/java/kafka/log/remote/RemoteLogManager.java: ## @@ -600,6 +622,176 @@ public String toString() { } } +public FetchDataInfo read(RemoteStorageFetchInfo remoteStorageFetchInfo) throws RemoteStorageException, IOException { +int fetchMaxBytes = remoteStorageFetchInfo.fetchMaxBytes; +TopicPartition tp = remoteStorageFetchInfo.topicPartition; +FetchRequest.PartitionData fetchInfo = remoteStorageFetchInfo.fetchInfo; + +boolean includeAbortedTxns = remoteStorageFetchInfo.fetchIsolation == FetchIsolation.TXN_COMMITTED; + +long offset = fetchInfo.fetchOffset; +int maxBytes = Math.min(fetchMaxBytes, fetchInfo.maxBytes); + +Optional logOptional = fetchLog.apply(tp); +OptionalInt epoch = OptionalInt.empty(); + +if (logOptional.isPresent()) { +Option leaderEpochCache = logOptional.get().leaderEpochCache(); +if (leaderEpochCache.isDefined()) { +epoch = leaderEpochCache.get().epochForOffset(offset); +} +} + +Optional rlsMetadata = epoch.isPresent() +? fetchRemoteLogSegmentMetadata(tp, epoch.getAsInt(), offset) +: Optional.empty(); + +if (!rlsMetadata.isPresent()) { +String epochStr = (epoch.isPresent()) ? Integer.toString(epoch.getAsInt()) : "NOT AVAILABLE"; +throw new OffsetOutOfRangeException("Received request for offset " + offset + " for leader epoch " ++ epochStr + " and partition " + tp + " which does not exist in remote tier."); +} + +int startPos = lookupPositionForOffset(rlsMetadata.get(), offset); +InputStream remoteSegInputStream = null; +try { +// Search forward for the position of the last offset that is greater than or equal to the target offset +remoteSegInputStream = remoteLogStorageManager.fetchLogSegment(rlsMetadata.get(), startPos); Review Comment: We will look into it in a follow-up PR.
[GitHub] [kafka] satishd commented on a diff in pull request #13535: KAFKA-9579 Fetch implementation for records in the remote storage through a specific purgatory.
satishd commented on code in PR #13535: URL: https://github.com/apache/kafka/pull/13535#discussion_r1171245421 ## core/src/main/java/kafka/log/remote/RemoteLogManager.java: ## @@ -600,6 +622,176 @@ public String toString() { } } +public FetchDataInfo read(RemoteStorageFetchInfo remoteStorageFetchInfo) throws RemoteStorageException, IOException { +int fetchMaxBytes = remoteStorageFetchInfo.fetchMaxBytes; +TopicPartition tp = remoteStorageFetchInfo.topicPartition; +FetchRequest.PartitionData fetchInfo = remoteStorageFetchInfo.fetchInfo; + +boolean includeAbortedTxns = remoteStorageFetchInfo.fetchIsolation == FetchIsolation.TXN_COMMITTED; + +long offset = fetchInfo.fetchOffset; +int maxBytes = Math.min(fetchMaxBytes, fetchInfo.maxBytes); + +Optional logOptional = fetchLog.apply(tp); +OptionalInt epoch = OptionalInt.empty(); + +if (logOptional.isPresent()) { +Option leaderEpochCache = logOptional.get().leaderEpochCache(); +if (leaderEpochCache.isDefined()) { +epoch = leaderEpochCache.get().epochForOffset(offset); +} +} + +Optional rlsMetadata = epoch.isPresent() +? fetchRemoteLogSegmentMetadata(tp, epoch.getAsInt(), offset) +: Optional.empty(); + +if (!rlsMetadata.isPresent()) { +String epochStr = (epoch.isPresent()) ? 
Integer.toString(epoch.getAsInt()) : "NOT AVAILABLE"; +throw new OffsetOutOfRangeException("Received request for offset " + offset + " for leader epoch " ++ epochStr + " and partition " + tp + " which does not exist in remote tier."); +} + +int startPos = lookupPositionForOffset(rlsMetadata.get(), offset); +InputStream remoteSegInputStream = null; +try { +// Search forward for the position of the last offset that is greater than or equal to the target offset +remoteSegInputStream = remoteLogStorageManager.fetchLogSegment(rlsMetadata.get(), startPos); +RemoteLogInputStream remoteLogInputStream = new RemoteLogInputStream(remoteSegInputStream); + +RecordBatch firstBatch = findFirstBatch(remoteLogInputStream, offset); + +if (firstBatch == null) +return new FetchDataInfo(new LogOffsetMetadata(offset), MemoryRecords.EMPTY, false, +includeAbortedTxns ? Optional.of(Collections.emptyList()) : Optional.empty()); + +int updatedFetchSize = Review Comment: Good point. There is no risk here, but it is good to be consistent with the local read pattern and return empty records in that case.
[GitHub] [kafka] satishd commented on a diff in pull request #13535: KAFKA-9579 Fetch implementation for records in the remote storage through a specific purgatory.
satishd commented on code in PR #13535: URL: https://github.com/apache/kafka/pull/13535#discussion_r1171242181 ## core/src/main/java/kafka/log/remote/RemoteLogManager.java: ## @@ -600,6 +622,176 @@ public String toString() { } } +public FetchDataInfo read(RemoteStorageFetchInfo remoteStorageFetchInfo) throws RemoteStorageException, IOException { +int fetchMaxBytes = remoteStorageFetchInfo.fetchMaxBytes; +TopicPartition tp = remoteStorageFetchInfo.topicPartition; +FetchRequest.PartitionData fetchInfo = remoteStorageFetchInfo.fetchInfo; + +boolean includeAbortedTxns = remoteStorageFetchInfo.fetchIsolation == FetchIsolation.TXN_COMMITTED; + +long offset = fetchInfo.fetchOffset; +int maxBytes = Math.min(fetchMaxBytes, fetchInfo.maxBytes); + +Optional logOptional = fetchLog.apply(tp); +OptionalInt epoch = OptionalInt.empty(); + +if (logOptional.isPresent()) { +Option leaderEpochCache = logOptional.get().leaderEpochCache(); +if (leaderEpochCache.isDefined()) { +epoch = leaderEpochCache.get().epochForOffset(offset); +} +} + +Optional rlsMetadata = epoch.isPresent() +? fetchRemoteLogSegmentMetadata(tp, epoch.getAsInt(), offset) +: Optional.empty(); + +if (!rlsMetadata.isPresent()) { +String epochStr = (epoch.isPresent()) ? 
Integer.toString(epoch.getAsInt()) : "NOT AVAILABLE"; +throw new OffsetOutOfRangeException("Received request for offset " + offset + " for leader epoch " ++ epochStr + " and partition " + tp + " which does not exist in remote tier."); +} + +int startPos = lookupPositionForOffset(rlsMetadata.get(), offset); +InputStream remoteSegInputStream = null; +try { +// Search forward for the position of the last offset that is greater than or equal to the target offset +remoteSegInputStream = remoteLogStorageManager.fetchLogSegment(rlsMetadata.get(), startPos); +RemoteLogInputStream remoteLogInputStream = new RemoteLogInputStream(remoteSegInputStream); + +RecordBatch firstBatch = findFirstBatch(remoteLogInputStream, offset); + +if (firstBatch == null) +return new FetchDataInfo(new LogOffsetMetadata(offset), MemoryRecords.EMPTY, false, +includeAbortedTxns ? Optional.of(Collections.emptyList()) : Optional.empty()); + +int updatedFetchSize = +remoteStorageFetchInfo.minOneMessage && firstBatch.sizeInBytes() > maxBytes +? firstBatch.sizeInBytes() : maxBytes; + +ByteBuffer buffer = ByteBuffer.allocate(updatedFetchSize); +int remainingBytes = updatedFetchSize; + +firstBatch.writeTo(buffer); +remainingBytes -= firstBatch.sizeInBytes(); + +if (remainingBytes > 0) { +// input stream is read till (startPos - 1) while getting the batch of records earlier. +// read the input stream until min of (EOF stream or buffer's remaining capacity). 
+Utils.readFully(remoteSegInputStream, buffer); +} +buffer.flip(); + +FetchDataInfo fetchDataInfo = new FetchDataInfo(new LogOffsetMetadata(offset), MemoryRecords.readableRecords(buffer)); +if (includeAbortedTxns) { +fetchDataInfo = addAbortedTransactions(firstBatch.baseOffset(), rlsMetadata.get(), fetchDataInfo); +} + +return fetchDataInfo; +} finally { +Utils.closeQuietly(remoteSegInputStream, "RemoteLogSegmentInputStream"); +} +} + +private int lookupPositionForOffset(RemoteLogSegmentMetadata remoteLogSegmentMetadata, long offset) { +return indexCache.lookupOffset(remoteLogSegmentMetadata, offset); +} + +private FetchDataInfo addAbortedTransactions(long startOffset, + RemoteLogSegmentMetadata segmentMetadata, + FetchDataInfo fetchInfo) throws RemoteStorageException { +int fetchSize = fetchInfo.records.sizeInBytes(); +OffsetPosition startOffsetPosition = new OffsetPosition(fetchInfo.fetchOffsetMetadata.messageOffset, +fetchInfo.fetchOffsetMetadata.relativePositionInSegment); + +OffsetIndex offsetIndex = indexCache.getIndexEntry(segmentMetadata).offsetIndex(); +long upperBoundOffset = offsetIndex.fetchUpperBoundOffset(startOffsetPosition, fetchSize) +.map(x -> x.offset).orElse(segmentMetadata.endOffset() + 1); + +final List abortedTransactions = new ArrayList<>(); + +Consumer> accumulator = +abortedTxns -> abortedTransactions.addAll(abortedTxns.stream() + .map(AbortedTxn
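The comment in the diff above describes reading the segment stream until EOF or until the buffer's remaining capacity is used up. A minimal sketch of such a loop against a plain `InputStream` and a heap `ByteBuffer` follows; `StreamReads.readFully` is an illustrative helper, not a copy of Kafka's `Utils.readFully`.

```java
import java.io.IOException;
import java.io.InputStream;
import java.nio.ByteBuffer;

class StreamReads {
    /**
     * Reads from the stream into a heap ByteBuffer until the buffer is full or EOF
     * is reached, advancing the buffer's position. Returns the number of bytes read.
     */
    static int readFully(InputStream in, ByteBuffer dst) throws IOException {
        if (!dst.hasArray()) {
            throw new IllegalArgumentException("this sketch only supports heap buffers");
        }
        int total = 0;
        while (dst.hasRemaining()) {
            // read() blocks until at least one byte is available, or returns -1 at EOF
            int n = in.read(dst.array(), dst.arrayOffset() + dst.position(), dst.remaining());
            if (n < 0) {
                break; // EOF: keep whatever was read so far
            }
            dst.position(dst.position() + n);
            total += n;
        }
        return total;
    }
}
```

After the loop the caller would `flip()` the buffer, as the diff does, so the bytes read become readable.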
[GitHub] [kafka] satishd commented on a diff in pull request #13535: KAFKA-9579 Fetch implementation for records in the remote storage through a specific purgatory.
satishd commented on code in PR #13535: URL: https://github.com/apache/kafka/pull/13535#discussion_r1171240495 ## core/src/main/scala/kafka/server/ReplicaManager.scala: ## @@ -1273,17 +1328,45 @@ class ReplicaManager(val config: KafkaConfig, _: FencedLeaderEpochException | _: ReplicaNotAvailableException | _: KafkaStorageException | - _: OffsetOutOfRangeException | _: InconsistentTopicIdException) => - LogReadResult(info = new FetchDataInfo(LogOffsetMetadata.UNKNOWN_OFFSET_METADATA, MemoryRecords.EMPTY), -divergingEpoch = None, -highWatermark = UnifiedLog.UnknownOffset, -leaderLogStartOffset = UnifiedLog.UnknownOffset, -leaderLogEndOffset = UnifiedLog.UnknownOffset, -followerLogStartOffset = UnifiedLog.UnknownOffset, -fetchTimeMs = -1L, -lastStableOffset = None, -exception = Some(e)) + createLogReadResult(e) +case e: OffsetOutOfRangeException => Review Comment: Yes.
[GitHub] [kafka] satishd commented on a diff in pull request #13535: KAFKA-9579 Fetch implementation for records in the remote storage through a specific purgatory.
satishd commented on code in PR #13535: URL: https://github.com/apache/kafka/pull/13535#discussion_r1170269949 ## core/src/main/scala/kafka/server/ReplicaManager.scala: ## @@ -1083,48 +1095,100 @@ class ReplicaManager(val config: KafkaConfig, fetchPartitionStatus += (topicIdPartition -> FetchPartitionStatus(logOffsetMetadata, partitionData)) }) } - val delayedFetch = new DelayedFetch( -params = params, -fetchPartitionStatus = fetchPartitionStatus, -replicaManager = this, -quota = quota, -responseCallback = responseCallback - ) - - // create a list of (topic, partition) pairs to use as keys for this delayed fetch operation - val delayedFetchKeys = fetchPartitionStatus.map { case (tp, _) => TopicPartitionOperationKey(tp) } - - // try to complete the request immediately, otherwise put it into the purgatory; - // this is because while the delayed fetch operation is being created, new requests - // may arrive and hence make this operation completable. - delayedFetchPurgatory.tryCompleteElseWatch(delayedFetch, delayedFetchKeys) + + if (remoteFetchInfo.isPresent) { Review Comment: I am not sure line 1082 is the same line you meant, as the file may have been updated since. Please clarify.
[GitHub] [kafka] satishd commented on a diff in pull request #13535: KAFKA-9579 Fetch implementation for records in the remote storage through a specific purgatory.
satishd commented on code in PR #13535: URL: https://github.com/apache/kafka/pull/13535#discussion_r1170273242 ## core/src/main/java/kafka/log/remote/RemoteLogManager.java: ## @@ -600,6 +622,176 @@ public String toString() { } } +public FetchDataInfo read(RemoteStorageFetchInfo remoteStorageFetchInfo) throws RemoteStorageException, IOException { +int fetchMaxBytes = remoteStorageFetchInfo.fetchMaxBytes; +TopicPartition tp = remoteStorageFetchInfo.topicPartition; +FetchRequest.PartitionData fetchInfo = remoteStorageFetchInfo.fetchInfo; + +boolean includeAbortedTxns = remoteStorageFetchInfo.fetchIsolation == FetchIsolation.TXN_COMMITTED; + +long offset = fetchInfo.fetchOffset; +int maxBytes = Math.min(fetchMaxBytes, fetchInfo.maxBytes); + +Optional logOptional = fetchLog.apply(tp); +OptionalInt epoch = OptionalInt.empty(); + +if (logOptional.isPresent()) { +Option leaderEpochCache = logOptional.get().leaderEpochCache(); +if (leaderEpochCache.isDefined()) { +epoch = leaderEpochCache.get().epochForOffset(offset); +} +} + +Optional rlsMetadata = epoch.isPresent() +? fetchRemoteLogSegmentMetadata(tp, epoch.getAsInt(), offset) +: Optional.empty(); + +if (!rlsMetadata.isPresent()) { +String epochStr = (epoch.isPresent()) ? 
Integer.toString(epoch.getAsInt()) : "NOT AVAILABLE"; +throw new OffsetOutOfRangeException("Received request for offset " + offset + " for leader epoch " ++ epochStr + " and partition " + tp + " which does not exist in remote tier."); +} + +int startPos = lookupPositionForOffset(rlsMetadata.get(), offset); +InputStream remoteSegInputStream = null; +try { +// Search forward for the position of the last offset that is greater than or equal to the target offset +remoteSegInputStream = remoteLogStorageManager.fetchLogSegment(rlsMetadata.get(), startPos); +RemoteLogInputStream remoteLogInputStream = new RemoteLogInputStream(remoteSegInputStream); + +RecordBatch firstBatch = findFirstBatch(remoteLogInputStream, offset); + +if (firstBatch == null) +return new FetchDataInfo(new LogOffsetMetadata(offset), MemoryRecords.EMPTY, false, +includeAbortedTxns ? Optional.of(Collections.emptyList()) : Optional.empty()); + +int updatedFetchSize = +remoteStorageFetchInfo.minOneMessage && firstBatch.sizeInBytes() > maxBytes +? firstBatch.sizeInBytes() : maxBytes; + +ByteBuffer buffer = ByteBuffer.allocate(updatedFetchSize); +int remainingBytes = updatedFetchSize; + +firstBatch.writeTo(buffer); +remainingBytes -= firstBatch.sizeInBytes(); + +if (remainingBytes > 0) { +// input stream is read till (startPos - 1) while getting the batch of records earlier. +// read the input stream until min of (EOF stream or buffer's remaining capacity). +Utils.readFully(remoteSegInputStream, buffer); +} +buffer.flip(); + +FetchDataInfo fetchDataInfo = new FetchDataInfo(new LogOffsetMetadata(offset), MemoryRecords.readableRecords(buffer)); Review Comment: Good catch, addressed it in the latest commit.
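The buffer sizing in the diff above can be isolated as a small pure function, which makes the `minOneMessage` rule easy to test on its own. This is a sketch; `FetchSizing` and its method names are hypothetical.

```java
class FetchSizing {
    /** Effective byte limit: the smaller of the request-level and partition-level limits. */
    static int effectiveMaxBytes(int fetchMaxBytes, int partitionMaxBytes) {
        return Math.min(fetchMaxBytes, partitionMaxBytes);
    }

    /**
     * Buffer size for the read: normally maxBytes, but grown to the first batch's
     * size when minOneMessage is set and that batch alone exceeds maxBytes, so at
     * least one batch can be returned.
     */
    static int fetchBufferSize(boolean minOneMessage, int firstBatchSize, int maxBytes) {
        return minOneMessage && firstBatchSize > maxBytes ? firstBatchSize : maxBytes;
    }
}
```

When `minOneMessage` is false and the first batch is larger than `maxBytes`, this rule yields a buffer smaller than the batch, so a caller would need to handle that case before writing the batch into the buffer.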
[GitHub] [kafka] satishd commented on a diff in pull request #13535: KAFKA-9579 Fetch implementation for records in the remote storage through a specific purgatory.
satishd commented on code in PR #13535: URL: https://github.com/apache/kafka/pull/13535#discussion_r1168261768 ## core/src/main/scala/kafka/server/ReplicaManager.scala: ## @@ -1243,6 +1327,33 @@ class ReplicaManager(val config: KafkaConfig, result } + def createLogReadResult(highWatermark: Long, Review Comment: `createLogReadResult(e: Throwable)` cannot be private as it is used in `DelayedRemoteFetch`. But this method can be used. It is going to be used in test classes that we are going to add in this PR or a follow-up PR. ## core/src/main/java/kafka/log/remote/RemoteLogManager.java: ## @@ -600,6 +622,176 @@ public String toString() { } } +public FetchDataInfo read(RemoteStorageFetchInfo remoteStorageFetchInfo) throws RemoteStorageException, IOException { +int fetchMaxBytes = remoteStorageFetchInfo.fetchMaxBytes; +TopicPartition tp = remoteStorageFetchInfo.topicPartition; +FetchRequest.PartitionData fetchInfo = remoteStorageFetchInfo.fetchInfo; + +boolean includeAbortedTxns = remoteStorageFetchInfo.fetchIsolation == FetchIsolation.TXN_COMMITTED; + +long offset = fetchInfo.fetchOffset; +int maxBytes = Math.min(fetchMaxBytes, fetchInfo.maxBytes); + +Optional logOptional = fetchLog.apply(tp); +OptionalInt epoch = OptionalInt.empty(); Review Comment: AFAIK, `lastFetchedEpoch` is the epoch of the last fetched record, which can be different from the fetch offset's epoch. We should find the epoch for the target offset and use that to find the remote log segment metadata. ## core/src/main/java/kafka/log/remote/RemoteLogReader.java: ## @@ -0,0 +1,70 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License.
You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package kafka.log.remote;

Review Comment:
`RemoteLogReader` cannot be moved to the storage module as it currently depends on `RemoteLogManager`. I will move it along with `RemoteLogManager` later. `RemoteLogReadResult` and `RemoteStorageThreadPool` have been moved to the storage module.

## core/src/main/scala/kafka/server/ReplicaManager.scala: ##
@@ -1083,48 +1095,100 @@ class ReplicaManager(val config: KafkaConfig,
 fetchPartitionStatus += (topicIdPartition -> FetchPartitionStatus(logOffsetMetadata, partitionData))
 })
 }
- val delayedFetch = new DelayedFetch(
-params = params,
-fetchPartitionStatus = fetchPartitionStatus,
-replicaManager = this,
-quota = quota,
-responseCallback = responseCallback
- )
-
- // create a list of (topic, partition) pairs to use as keys for this delayed fetch operation
- val delayedFetchKeys = fetchPartitionStatus.map { case (tp, _) => TopicPartitionOperationKey(tp) }
-
- // try to complete the request immediately, otherwise put it into the purgatory;
- // this is because while the delayed fetch operation is being created, new requests
- // may arrive and hence make this operation completable.
- delayedFetchPurgatory.tryCompleteElseWatch(delayedFetch, delayedFetchKeys)
+
+ if (remoteFetchInfo.isPresent) {

Review Comment:
I did not understand the comment here.
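The removed lines in the `ReplicaManager` hunk above explain why `delayedFetchPurgatory.tryCompleteElseWatch` tries to complete the delayed fetch before watching it: new requests may arrive while the operation is being created and make it completable. Below is a minimal Java sketch of that try-then-watch pattern. `ToyOperation` and `ToyPurgatory` are hypothetical names. The sketch leaves out expiration timeouts and per-key locking, so it is not Kafka's `DelayedOperationPurgatory`. The second `tryComplete()` after registering is this sketch's own assumption: it closes the window between the first check and the registration.

```java
import java.util.List;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.BooleanSupplier;

// Hypothetical toy operation: completes (at most once) when its condition holds.
final class ToyOperation {
    private final BooleanSupplier canComplete;
    private final AtomicBoolean completed = new AtomicBoolean(false);

    ToyOperation(BooleanSupplier canComplete) {
        this.canComplete = canComplete;
    }

    // Returns true only for the call that actually completes the operation.
    boolean tryComplete() {
        return canComplete.getAsBoolean() && completed.compareAndSet(false, true);
    }

    boolean isCompleted() {
        return completed.get();
    }
}

// Hypothetical toy purgatory: no expiration timer, no per-key locks.
final class ToyPurgatory {
    private final ConcurrentHashMap<String, List<ToyOperation>> watchers = new ConcurrentHashMap<>();

    // Returns true if this call completed the operation.
    boolean tryCompleteElseWatch(ToyOperation op, List<String> keys) {
        if (op.tryComplete()) {
            return true; // fast path: already completable, nothing to watch
        }
        for (String key : keys) {
            watchers.computeIfAbsent(key, k -> new CopyOnWriteArrayList<>()).add(op);
        }
        // Re-check after registering: the state may have changed (and its
        // checkAndComplete may have run) before the operation was being watched.
        return op.tryComplete();
    }

    // Called when the state behind `key` changes, e.g. new records are appended.
    int checkAndComplete(String key) {
        List<ToyOperation> ops = watchers.get(key);
        if (ops == null) {
            return 0;
        }
        int completedNow = 0;
        for (ToyOperation op : ops) {
            if (op.tryComplete()) {
                completedNow++;
            }
        }
        ops.removeIf(ToyOperation::isCompleted); // drop finished operations
        return completedNow;
    }
}
```

In this sketch, a fetch that cannot be satisfied yet is watched under its key and completed later by `checkAndComplete`. An operation whose condition already holds returns on the fast path without being watched at all.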
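This concerns the `lastFetchedEpoch` review comment on `RemoteLogManager.read` earlier in this thread. The comment says the epoch used to find the remote log segment metadata should be the one that contains the fetch offset, not the epoch of the last fetched record. The Java sketch below shows that lookup under stated assumptions. `EpochLookupSketch`, `assign`, `epochForOffset` and `effectiveMaxBytes` are hypothetical names. The leader epoch cache is modeled as a sorted map from each epoch's start offset to its epoch number, which is not the real `LeaderEpochFileCache` API. The sketch also ignores the log end offset, so offsets past the end of the log simply map to the latest epoch.

```java
import java.util.Map;
import java.util.NavigableMap;
import java.util.OptionalInt;
import java.util.TreeMap;

// Hypothetical, simplified leader epoch lookup (NOT Kafka's LeaderEpochFileCache API).
// The cache is modeled as "epoch start offset -> epoch number".
final class EpochLookupSketch {
    private final NavigableMap<Long, Integer> startOffsetToEpoch = new TreeMap<>();

    // Records that `epoch` started at `startOffset`.
    void assign(int epoch, long startOffset) {
        startOffsetToEpoch.put(startOffset, epoch);
    }

    // Epoch containing `offset`: the entry with the greatest start offset <= offset.
    // Empty if `offset` precedes every known epoch.
    OptionalInt epochForOffset(long offset) {
        Map.Entry<Long, Integer> entry = startOffsetToEpoch.floorEntry(offset);
        return entry == null ? OptionalInt.empty() : OptionalInt.of(entry.getValue());
    }

    // Mirrors `Math.min(fetchMaxBytes, fetchInfo.maxBytes)` from the diff:
    // the request-level limit caps the per-partition limit.
    static int effectiveMaxBytes(int fetchMaxBytes, int partitionMaxBytes) {
        return Math.min(fetchMaxBytes, partitionMaxBytes);
    }
}
```

Consider epochs 0, 1 and 2 starting at offsets 0, 100 and 250. Here `epochForOffset(250)` returns 2, while offset 249 is still in epoch 1. Assuming the last fetched record sits just before the fetch offset, a fetch starting at 250 would carry `lastFetchedEpoch` 1. That is the mismatch the comment describes.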

## core/src/main/java/kafka/log/remote/RemoteLogManager.java: ##
@@ -600,6 +622,176 @@ public String toString() {
 }
 }
+public FetchDataInfo read(RemoteStorageFetchInfo remoteStorageFetchInfo) throws RemoteStorageException, IOException {
+int fetchMaxBytes = remoteStorageFetchInfo.fetchMaxBytes;
+TopicPartition tp = remoteStorageFetchInfo.topicPartition;
+FetchRequest.PartitionData fetchInfo = remoteStorageFetchInfo.fetchInfo;
+
+boolean includeAbortedTxns = remoteStorageFetchInfo.fetchIsolation == FetchIsolation.TXN_COMMITTED;
+
+long offset = fetchInfo.fetchOffset;
+int maxBytes = Math.min(fetchMaxBytes, fetchInfo.maxBytes);
+
+Optional logOptional = fetchLog.apply(tp);
+OptionalInt epoch = OptionalInt.empty();
+
+if (logOptional.isPresent()) {
+Option leaderEpochCache = logOptional.get().leaderEpochCache();
+if (leaderEpochCache.isDef