kamalcph commented on code in PR #16653: URL: https://github.com/apache/kafka/pull/16653#discussion_r1688147139
########## core/src/main/scala/kafka/server/ReplicaManager.scala: ########## @@ -1749,7 +1749,7 @@ class ReplicaManager(val config: KafkaConfig, // 1) remote log manager is enabled and it is available // 2) `log` instance should not be null here as that would have been caught earlier with NotLeaderForPartitionException or ReplicaNotAvailableException. // 3) fetch offset is within the offset range of the remote storage layer - if (remoteLogManager.isDefined && log != null && log.remoteLogEnabled() && + if (remoteLogManager.isDefined && log != null && log.remoteLogEnabledOrRetainPolicy() && Review Comment: Thanks for handling the empty follower joining the ISR case during the disablement! ########## core/src/main/java/kafka/log/remote/RemoteLogManager.java: ########## @@ -512,12 +531,17 @@ public void stopPartitions(Set<StopPartition> stopPartitions, .filter(sp -> sp.deleteLocalLog() && topicIdByPartitionMap.containsKey(sp.topicPartition())) .map(sp -> new TopicIdPartition(topicIdByPartitionMap.get(sp.topicPartition()), sp.topicPartition())) .collect(Collectors.toSet()); + Review Comment: Can we update the comment in L529? It is no longer true with topic disablement and can confuse the reader: ``` // Note `deleteLocalLog` will always be true when `deleteRemoteLog` is true but not the other way around. 
``` ########## core/src/main/scala/kafka/log/UnifiedLog.scala: ########## @@ -183,17 +184,18 @@ class UnifiedLog(@volatile var logStartOffset: Long, } def updateLogStartOffsetFromRemoteTier(remoteLogStartOffset: Long): Unit = { - if (!remoteLogEnabled()) { - error("Ignoring the call as the remote log storage is disabled") - return - } maybeIncrementLogStartOffset(remoteLogStartOffset, LogStartOffsetIncrementReason.SegmentDeletion) } def remoteLogEnabled(): Boolean = { UnifiedLog.isRemoteLogEnabled(remoteStorageSystemEnable, config, topicPartition.topic()) } + def remoteLogEnabledOrRetainPolicy(): Boolean = { Review Comment: This method will always return `true` for topics that do not have tiered storage enabled. Can we add a doc comment for clarity? Should we also check whether LSO == LLSO? If they are equal and `remoteLogEnabled()` is false, the method should return `false` instead. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org