kamalcph commented on code in PR #16653:
URL: https://github.com/apache/kafka/pull/16653#discussion_r1688147139


##########
core/src/main/scala/kafka/server/ReplicaManager.scala:
##########
@@ -1749,7 +1749,7 @@ class ReplicaManager(val config: KafkaConfig,
     //   1) remote log manager is enabled and it is available
     //   2) `log` instance should not be null here as that would have been caught earlier with NotLeaderForPartitionException or ReplicaNotAvailableException.
     //   3) fetch offset is within the offset range of the remote storage layer
-    if (remoteLogManager.isDefined && log != null && log.remoteLogEnabled() &&
+    if (remoteLogManager.isDefined && log != null && log.remoteLogEnabledOrRetainPolicy() &&

Review Comment:
   Thanks for handling the empty follower joining the ISR case during the 
disablement!



##########
core/src/main/java/kafka/log/remote/RemoteLogManager.java:
##########
@@ -512,12 +531,17 @@ public void stopPartitions(Set<StopPartition> stopPartitions,
                 .filter(sp -> sp.deleteLocalLog() && topicIdByPartitionMap.containsKey(sp.topicPartition()))
                 .map(sp -> new TopicIdPartition(topicIdByPartitionMap.get(sp.topicPartition()), sp.topicPartition()))
                 .collect(Collectors.toSet());
+

Review Comment:
   Can we update the comment in L529? It is no longer true with topic 
disablement and can confuse the reader:
   
   ```
   // Note `deleteLocalLog` will always be true when `deleteRemoteLog` is true but not the other way around.
   ```



##########
core/src/main/scala/kafka/log/UnifiedLog.scala:
##########
@@ -183,17 +184,18 @@ class UnifiedLog(@volatile var logStartOffset: Long,
   }
 
   def updateLogStartOffsetFromRemoteTier(remoteLogStartOffset: Long): Unit = {
-    if (!remoteLogEnabled()) {
-      error("Ignoring the call as the remote log storage is disabled")
-      return
-    }
     maybeIncrementLogStartOffset(remoteLogStartOffset, LogStartOffsetIncrementReason.SegmentDeletion)
   }
 
   def remoteLogEnabled(): Boolean = {
     UnifiedLog.isRemoteLogEnabled(remoteStorageSystemEnable, config, topicPartition.topic())
   }
 
+  def remoteLogEnabledOrRetainPolicy(): Boolean = {

Review Comment:
   This method always returns `true` for topics that do not have tiered storage enabled. Can we add a doc comment for clarity?
   
   Should we also check whether the log start offset (LSO) equals the local log start offset (LLSO)? If they are equal and `remoteLogEnabled()` is false, the method should return `false` instead.
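
To make the suggestion concrete, here is a minimal sketch of the extra guard, written as a standalone Java helper rather than as a change to `UnifiedLog`. The class name, method name, and the `currentResult` parameter are hypothetical; `currentResult` stands in for whatever `remoteLogEnabledOrRetainPolicy()` returns today, which is not shown in this thread.

```java
// Hypothetical sketch: names are illustrative and not taken from the PR.
final class RemoteReadCheckSketch {
    private RemoteReadCheckSketch() {}

    /**
     * @param remoteLogEnabled    whether tiered storage is currently enabled for the topic
     * @param currentResult       assumed stand-in for the existing method's return value
     * @param logStartOffset      log start offset (LSO)
     * @param localLogStartOffset local log start offset (LLSO)
     * @return false when tiering is off and LSO == LLSO, otherwise currentResult
     */
    static boolean shouldConsultRemote(boolean remoteLogEnabled,
                                       boolean currentResult,
                                       long logStartOffset,
                                       long localLogStartOffset) {
        // With tiering disabled and LSO == LLSO, no offsets exist below the
        // local log, so there is nothing to read from remote storage.
        if (!remoteLogEnabled && logStartOffset == localLogStartOffset) {
            return false;
        }
        // Otherwise keep the existing behaviour unchanged.
        return currentResult;
    }
}
```

With this guard, a topic that never had tiered storage enabled (where LSO and LLSO are equal) would no longer pass the remote-read condition in `ReplicaManager`, while a topic that still retains remote data after disablement (LSO below LLSO) would.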


