junrao commented on code in PR #15968:
URL: https://github.com/apache/kafka/pull/15968#discussion_r1868206289


##########
core/src/main/scala/kafka/server/ReplicaManager.scala:
##########
@@ -788,7 +814,9 @@ class ReplicaManager(val config: KafkaConfig,
    * @param requiredAcks                  number of replicas who must 
acknowledge the append before sending the response
    * @param internalTopicsAllowed         boolean indicating whether internal 
topics can be appended to
    * @param origin                        source of the append request (ie, 
client, replication, coordinator)
-   * @param entriesPerPartition           the records per partition to be 
appended
+   * @param entriesPerPartition           the records per topic partition to 
be appended.
+   *                                      If topic partition contains 
Uuid.ZERO_UUID or null as topicId the method
+   *                                      will fall back to the old behaviour 
and relay on topic name.

Review Comment:
   relay => rely



##########
core/src/main/scala/kafka/server/ReplicaManager.scala:
##########
@@ -1447,37 +1475,36 @@ class ReplicaManager(val config: KafkaConfig,
     if (traceEnabled)
       trace(s"Append [$entriesPerPartition] to local log")
 
-    entriesPerPartition.map { case (topicPartition, records) =>
-      
brokerTopicStats.topicStats(topicPartition.topic).totalProduceRequestRate.mark()
+    entriesPerPartition.map { case (topicIdPartition, records) =>
+      
brokerTopicStats.topicStats(topicIdPartition.topic).totalProduceRequestRate.mark()
       brokerTopicStats.allTopicsStats.totalProduceRequestRate.mark()
 
       // reject appending to internal topics if it is not allowed
-      if (Topic.isInternal(topicPartition.topic) && !internalTopicsAllowed) {
-        (new TopicOptionalIdPartition(Optional.empty(), topicPartition), 
LogAppendResult(
+

Review Comment:
   extra new line



##########
core/src/main/scala/kafka/server/ReplicaManager.scala:
##########
@@ -788,7 +814,9 @@ class ReplicaManager(val config: KafkaConfig,
    * @param requiredAcks                  number of replicas who must 
acknowledge the append before sending the response
    * @param internalTopicsAllowed         boolean indicating whether internal 
topics can be appended to
    * @param origin                        source of the append request (ie, 
client, replication, coordinator)
-   * @param entriesPerPartition           the records per partition to be 
appended
+   * @param entriesPerPartition           the records per topic partition to 
be appended.
+   *                                      If topic partition contains 
Uuid.ZERO_UUID or null as topicId the method

Review Comment:
   It seems that topicId can only be 0, but not null? If null is indeed 
allowed, we need to check that in all places where we check for 0.



##########
core/src/test/scala/unit/kafka/coordinator/group/CoordinatorPartitionWriterTest.scala:
##########
@@ -136,11 +138,10 @@ class CoordinatorPartitionWriterTest {
       VerificationGuard.SENTINEL,
       batch
     ))
-
     assertEquals(
       batch,
-      recordsCapture.getValue.getOrElse(tp,
-        throw new AssertionError(s"No records for $tp"))
+      recordsCapture.getValue.find(_._1 == new TopicIdPartition(topicId, 
tp)).getOrElse(

Review Comment:
   Could we use case to avoid unnamed references?



##########
core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala:
##########
@@ -92,9 +92,12 @@ class AuthorizerIntegrationTest extends 
AbstractAuthorizerIntegrationTest {
   val requestKeyToError = (topicNames: Map[Uuid, String], version: Short) => 
Map[ApiKeys, Nothing => Errors](
     ApiKeys.METADATA -> ((resp: requests.MetadataResponse) => 
resp.errors.asScala.find(_._1 == topic).getOrElse(("test", Errors.NONE))._2),
     ApiKeys.PRODUCE -> ((resp: requests.ProduceResponse) => {
+
+      val topicId = topicNames.find { case (topicId, topicName) => topicName 
== topic}.map(_._1).getOrElse(Uuid.ZERO_UUID)

Review Comment:
   It's probably clearer if we do
   `val topicId = topicNames.find { case (_, topicName) => topicName == 
topic}.map{ case (topicId, _) => topicId).getOrElse(Uuid.ZERO_UUID)`



##########
core/src/main/scala/kafka/server/ReplicaManager.scala:
##########
@@ -953,8 +980,8 @@ class ReplicaManager(val config: KafkaConfig,
   }
 
   private def buildProducePartitionStatus(
-    results: Map[TopicPartition, LogAppendResult]
-  ): Map[TopicPartition, ProducePartitionStatus] = {
+    results: Map[TopicIdPartition, LogAppendResult]
+  ): Map[TopicIdPartition, ProducePartitionStatus] = {

Review Comment:
   Could we change topicPartition to topicIdPartition in the line below?



##########
core/src/main/scala/kafka/server/ReplicaManager.scala:
##########
@@ -1486,16 +1513,17 @@ class ReplicaManager(val config: KafkaConfig,
                    _: RecordTooLargeException |
                    _: RecordBatchTooLargeException |
                    _: CorruptRecordException |
-                   _: KafkaStorageException) =>
-            (new TopicOptionalIdPartition(Optional.empty(), topicPartition), 
LogAppendResult(LogAppendInfo.UNKNOWN_LOG_APPEND_INFO, Some(e), 
hasCustomErrorMessage = false))
+                   _: KafkaStorageException |
+                   _: UnknownTopicIdException) =>

Review Comment:
   Could we add the new error code in ProduceResponse? Also, it seems that the 
original KIP doesn't include this new error code for the producer. It would be 
useful to update the KIP and the email thread.



##########
core/src/test/scala/unit/kafka/server/KafkaApisTest.scala:
##########
@@ -2636,20 +2636,22 @@ class KafkaApisTest extends Logging {
   @Test
   def shouldReplaceProducerFencedWithInvalidProducerEpochInProduceResponse(): 
Unit = {
     val topic = "topic"
-    addTopicToMetadataCache(topic, numPartitions = 2)
+    val topicId = Uuid.fromString("d2Gg8tgzJa2JYK2eTHUapg")
+    val tp = new TopicIdPartition(topicId, 0, "topic")
+    addTopicToMetadataCache(topic, numPartitions = 2, topicId = topicId)
 
     for (version <- ApiKeys.PRODUCE.oldestVersion to 
ApiKeys.PRODUCE.latestVersion) {
 
       reset(replicaManager, clientQuotaManager, clientRequestQuotaManager, 
requestChannel, txnCoordinator)
 
-      val responseCallback: ArgumentCaptor[Map[TopicPartition, 
PartitionResponse] => Unit] = 
ArgumentCaptor.forClass(classOf[Map[TopicPartition, PartitionResponse] => Unit])
-
-      val tp = new TopicPartition("topic", 0)
+      val responseCallback: ArgumentCaptor[Map[TopicIdPartition, 
PartitionResponse] => Unit] = 
ArgumentCaptor.forClass(classOf[Map[TopicIdPartition, PartitionResponse] => 
Unit])
 
       val produceRequest = ProduceRequest.forCurrentMagic(new 
ProduceRequestData()
         .setTopicData(new ProduceRequestData.TopicProduceDataCollection(
           Collections.singletonList(new ProduceRequestData.TopicProduceData()
-            .setName(tp.topic).setPartitionData(Collections.singletonList(
+            .setName(tp.topic)

Review Comment:
   There is no need to set the topic name? Ditto below.



##########
core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala:
##########
@@ -3143,7 +3147,7 @@ class ReplicaManagerTest {
       requiredAcks = requiredAcks,
       internalTopicsAllowed = false,
       transactionalId = transactionalId,
-      entriesPerPartition = entriesToAppend,
+      entriesPerPartition = entriesToAppend.map(e => 
replicaManager.topicIdPartition(e._1) -> e._2),

Review Comment:
   Could we use case to avoid unnamed references?



##########
core/src/main/scala/kafka/server/KafkaApis.scala:
##########
@@ -2478,7 +2493,8 @@ class KafkaApis(val requestChannel: RequestChannel,
           } else {
             // Otherwise, the regular appendRecords path is used for all the 
non __consumer_offsets
             // partitions or for all partitions when the new group coordinator 
is disabled.
-            controlRecords += partition -> 
MemoryRecords.withEndTransactionMarker(
+            // If topicIdPartition contains Uuid.ZERO_UUid or null all 
functionality will fall back on topic name.

Review Comment:
   The topicId can't be null, right?



##########
core/src/main/scala/kafka/server/ReplicaManager.scala:
##########
@@ -860,20 +886,21 @@ class ReplicaManager(val config: KafkaConfig,
                           requiredAcks: Short,
                           internalTopicsAllowed: Boolean,
                           transactionalId: String,
-                          entriesPerPartition: Map[TopicPartition, 
MemoryRecords],
-                          responseCallback: Map[TopicPartition, 
PartitionResponse] => Unit,
-                          recordValidationStatsCallback: Map[TopicPartition, 
RecordValidationStats] => Unit = _ => (),
+                          entriesPerPartition: Map[TopicIdPartition, 
MemoryRecords],

Review Comment:
   It would be useful to document that unlike `appendRecords`, the topicIds in 
entriesPerPartition are always present. I am still a bit concerned about this 
discrepancy. It would be better if these two apis are consistent.



##########
core/src/test/scala/unit/kafka/server/KafkaApisTest.scala:
##########
@@ -2892,18 +2914,20 @@ class KafkaApisTest extends Logging {
     val topic = "topic"
     val transactionalId = "txn1"
 
-    addTopicToMetadataCache(topic, numPartitions = 2)
+    val topicId = Uuid.fromString("d2Gg8tgzJa2JYK2eTHUapg")
+    val tp = new TopicIdPartition(topicId, 0, "topic")
+    addTopicToMetadataCache(topic, numPartitions = 2, topicId = tp.topicId())
 
     for (version <- 3 to ApiKeys.PRODUCE.latestVersion) {
 
       reset(replicaManager, clientQuotaManager, clientRequestQuotaManager, 
requestChannel, txnCoordinator)
 
-      val tp = new TopicPartition("topic", 0)
-
       val produceRequest = ProduceRequest.forCurrentMagic(new 
ProduceRequestData()
         .setTopicData(new ProduceRequestData.TopicProduceDataCollection(
           Collections.singletonList(new ProduceRequestData.TopicProduceData()
-            .setName(tp.topic).setPartitionData(Collections.singletonList(
+            .setName(tp.topic)

Review Comment:
   Should we set either the topic name or the topic id as we did in 
`testProduceResponseMetadataLookupErrorOnNotLeaderOrFollower`?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to