rondagostino commented on a change in pull request #10282:
URL: https://github.com/apache/kafka/pull/10282#discussion_r590548220



##########
File path: core/src/main/scala/kafka/cluster/Partition.scala
##########
@@ -425,36 +425,35 @@ class Partition(val topicPartition: TopicPartition,
   }
 
   /**
-   * Checks if the topic ID provided in the request is consistent with the topic ID in the log.
+   * Checks if the topic ID received is consistent with the topic ID in the log.
    * If a valid topic ID is provided, and the log exists but has no ID set, set the log ID to be the request ID.
    *
-   * @param requestTopicId the topic ID from the request
+   * @param receivedTopicId the topic ID from the LeaderAndIsr request or from the metadata records
    * @return true if the request topic id is consistent, false otherwise
    */
-  def checkOrSetTopicId(requestTopicId: Uuid): Boolean = {
-    // If the request had an invalid topic ID, then we assume that topic IDs are not supported.
-    // The topic ID was not inconsistent, so return true.
-    // If the log is empty, then we can not say that topic ID is inconsistent, so return true.
-    if (requestTopicId == null || requestTopicId == Uuid.ZERO_UUID)
-      true
-    else {
+  def checkOrSetTopicId(receivedTopicId: Uuid, usingRaft: Boolean): Boolean = {
+    // If the request had an invalid topic ID, then we assume that topic IDs are not supported so ID is consistent.
+    // This is only the case when from LeaderAndIsr Request. Raft code should never have invalid topic IDs.
+    if (receivedTopicId == null || receivedTopicId == Uuid.ZERO_UUID) {
+      if (usingRaft) false else true

Review comment:
       This could be just `!usingRaft // only okay when not using Raft`.
   
   Is a null or zero topic ID still okay when using ZooKeeper with `interBrokerProtocolVersion >= KAFKA_2_8_IV0`?
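
A minimal sketch of why the shorter form is equivalent. `NegationSketch`, `verbose`, and `concise` are hypothetical names used only for illustration; they are not part of the PR.

```scala
object NegationSketch {
  // The form currently in the diff.
  def verbose(usingRaft: Boolean): Boolean = if (usingRaft) false else true

  // The suggested form: a missing topic ID is only okay when not using Raft.
  def concise(usingRaft: Boolean): Boolean = !usingRaft

  // Both forms agree on every possible input.
  val equivalent: Boolean = Seq(true, false).forall(b => verbose(b) == concise(b))
}
```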

##########
File path: core/src/main/scala/kafka/server/ReplicaManager.scala
##########
@@ -1369,7 +1369,7 @@ class ReplicaManager(val config: KafkaConfig,
               val requestLeaderEpoch = partitionState.leaderEpoch
               val requestTopicId = topicIds.get(topicPartition.topic)
 
-              if (!partition.checkOrSetTopicId(requestTopicId)) {
+              if (!partition.checkOrSetTopicId(requestTopicId, false)) {

Review comment:
       `s/false/usingRaft = false/` for clarity?

##########
File path: core/src/main/scala/kafka/cluster/Partition.scala
##########
@@ -425,36 +425,35 @@ class Partition(val topicPartition: TopicPartition,
   }
 
   /**
-   * Checks if the topic ID provided in the request is consistent with the topic ID in the log.
+   * Checks if the topic ID received is consistent with the topic ID in the log.
    * If a valid topic ID is provided, and the log exists but has no ID set, set the log ID to be the request ID.
    *
-   * @param requestTopicId the topic ID from the request
+   * @param receivedTopicId the topic ID from the LeaderAndIsr request or from the metadata records
    * @return true if the request topic id is consistent, false otherwise

Review comment:
       s/request/received/

##########
File path: core/src/main/scala/kafka/server/KafkaConfig.scala
##########
@@ -1892,8 +1892,9 @@ class KafkaConfig(val props: java.util.Map[_, _], doLog: Boolean, dynamicConfigO
     }
   }
 
+  // Topic IDs are used with all self-managed quorum clusters and ZK cluster with IBP greater than or equal to 2.8
   def usesTopicId: Boolean =
-    interBrokerProtocolVersion >= KAFKA_2_8_IV0
+    usesSelfManagedQuorum || (requiresZookeeper && interBrokerProtocolVersion >= KAFKA_2_8_IV0)

Review comment:
       `requiresZookeeper` is always the inverse of `usesSelfManagedQuorum`, so this can simply be `usesSelfManagedQuorum || interBrokerProtocolVersion >= KAFKA_2_8_IV0`.
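
A small sketch that checks the simplification over all inputs, assuming `requiresZookeeper` really is `!usesSelfManagedQuorum`. The object name and the `ibpAtLeast28` parameter (standing in for `interBrokerProtocolVersion >= KAFKA_2_8_IV0`) are hypothetical.

```scala
object UsesTopicIdSketch {
  // The expression as written in the diff, with requiresZookeeper derived
  // from usesSelfManagedQuorum (the assumption stated above).
  def original(usesSelfManagedQuorum: Boolean, ibpAtLeast28: Boolean): Boolean = {
    val requiresZookeeper = !usesSelfManagedQuorum
    usesSelfManagedQuorum || (requiresZookeeper && ibpAtLeast28)
  }

  // The suggested simplification: A || (!A && B) reduces to A || B.
  def simplified(usesSelfManagedQuorum: Boolean, ibpAtLeast28: Boolean): Boolean =
    usesSelfManagedQuorum || ibpAtLeast28

  // Exhaustive check over the four input combinations.
  val equivalent: Boolean = {
    val bools = Seq(true, false)
    bools.forall(q => bools.forall(ibp => original(q, ibp) == simplified(q, ibp)))
  }
}
```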

##########
File path: core/src/main/scala/kafka/server/RaftReplicaManager.scala
##########
@@ -251,6 +251,14 @@ class RaftReplicaManager(config: KafkaConfig,
               (Some(Partition(topicPartition, time, configRepository, this)), None)
           }
           partition.foreach { partition =>
+            builder.topicNameToId(partition.topic) match {
+              case Some(id) =>
+                if (!partition.checkOrSetTopicId(id, true))

Review comment:
       `s/true/usingRaft = true/` to make it clear what this boolean is? Also, since this same section of code appears twice, maybe refactor it out into a helper method?
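
One possible shape for such a helper, as a rough sketch only. `PartitionLike`, `validateTopicId`, the `Either` return type, and the error messages are all hypothetical; the diff shown here does not reveal what the real code does on a mismatch or a missing ID, so the failure handling below is an assumption.

```scala
object TopicIdHelperSketch {
  import java.util.UUID

  // Hypothetical stand-in for kafka.cluster.Partition, reduced to what the helper needs.
  // java.util.UUID is used here in place of Kafka's Uuid to keep the sketch self-contained.
  trait PartitionLike {
    def topic: String
    def checkOrSetTopicId(receivedTopicId: UUID, usingRaft: Boolean): Boolean
  }

  // Hypothetical helper shared by both call sites. It returns Left with a message
  // on failure; the real code may prefer to throw or log instead.
  def validateTopicId(partition: PartitionLike,
                      topicNameToId: String => Option[UUID]): Either[String, UUID] =
    topicNameToId(partition.topic) match {
      case Some(id) =>
        // The named argument makes the meaning of the boolean obvious at the call site.
        if (partition.checkOrSetTopicId(id, usingRaft = true)) Right(id)
        else Left(s"Inconsistent topic ID for topic ${partition.topic}")
      case None =>
        Left(s"No topic ID found for topic ${partition.topic}")
    }
}
```

Both call sites could then reduce to a single `validateTopicId(partition, builder.topicNameToId)` call, assuming `builder.topicNameToId` has the `String => Option[...]` shape the diff suggests.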




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

