junrao commented on code in PR #15968: URL: https://github.com/apache/kafka/pull/15968#discussion_r1868206289
########## core/src/main/scala/kafka/server/ReplicaManager.scala: ########## @@ -788,7 +814,9 @@ class ReplicaManager(val config: KafkaConfig, * @param requiredAcks number of replicas who must acknowledge the append before sending the response * @param internalTopicsAllowed boolean indicating whether internal topics can be appended to * @param origin source of the append request (ie, client, replication, coordinator) - * @param entriesPerPartition the records per partition to be appended + * @param entriesPerPartition the records per topic partition to be appended. + * If topic partition contains Uuid.ZERO_UUID or null as topicId the method + * will fall back to the old behaviour and relay on topic name. Review Comment: relay => rely ########## core/src/main/scala/kafka/server/ReplicaManager.scala: ########## @@ -1447,37 +1475,36 @@ class ReplicaManager(val config: KafkaConfig, if (traceEnabled) trace(s"Append [$entriesPerPartition] to local log") - entriesPerPartition.map { case (topicPartition, records) => - brokerTopicStats.topicStats(topicPartition.topic).totalProduceRequestRate.mark() + entriesPerPartition.map { case (topicIdPartition, records) => + brokerTopicStats.topicStats(topicIdPartition.topic).totalProduceRequestRate.mark() brokerTopicStats.allTopicsStats.totalProduceRequestRate.mark() // reject appending to internal topics if it is not allowed - if (Topic.isInternal(topicPartition.topic) && !internalTopicsAllowed) { - (new TopicOptionalIdPartition(Optional.empty(), topicPartition), LogAppendResult( + Review Comment: extra new line ########## core/src/main/scala/kafka/server/ReplicaManager.scala: ########## @@ -788,7 +814,9 @@ class ReplicaManager(val config: KafkaConfig, * @param requiredAcks number of replicas who must acknowledge the append before sending the response * @param internalTopicsAllowed boolean indicating whether internal topics can be appended to * @param origin source of the append request (ie, client, replication, coordinator) - * @param entriesPerPartition the records per partition to be appended + * @param entriesPerPartition the records per topic partition to be appended. + * If topic partition contains Uuid.ZERO_UUID or null as topicId the method Review Comment: It seems that topicId can only be 0, but not null? If null is indeed allowed, we need to check that in all places where we check for 0. ########## core/src/test/scala/unit/kafka/coordinator/group/CoordinatorPartitionWriterTest.scala: ########## @@ -136,11 +138,10 @@ class CoordinatorPartitionWriterTest { VerificationGuard.SENTINEL, batch )) - assertEquals( batch, - recordsCapture.getValue.getOrElse(tp, - throw new AssertionError(s"No records for $tp")) + recordsCapture.getValue.find(_._1 == new TopicIdPartition(topicId, tp)).getOrElse( Review Comment: Could we use case to avoid unnamed references? ########## core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala: ########## @@ -92,9 +92,12 @@ class AuthorizerIntegrationTest extends AbstractAuthorizerIntegrationTest { val requestKeyToError = (topicNames: Map[Uuid, String], version: Short) => Map[ApiKeys, Nothing => Errors]( ApiKeys.METADATA -> ((resp: requests.MetadataResponse) => resp.errors.asScala.find(_._1 == topic).getOrElse(("test", Errors.NONE))._2), ApiKeys.PRODUCE -> ((resp: requests.ProduceResponse) => { + + val topicId = topicNames.find { case (topicId, topicName) => topicName == topic}.map(_._1).getOrElse(Uuid.ZERO_UUID) Review Comment: It's probably clearer if we do `val topicId = topicNames.find { case (_, topicName) => topicName == topic}.map{ case (topicId, _) => topicId).getOrElse(Uuid.ZERO_UUID)` ########## core/src/main/scala/kafka/server/ReplicaManager.scala: ########## @@ -953,8 +980,8 @@ class ReplicaManager(val config: KafkaConfig, } private def buildProducePartitionStatus( - results: Map[TopicPartition, LogAppendResult] - ): Map[TopicPartition, ProducePartitionStatus] = { + results: Map[TopicIdPartition, LogAppendResult] + ): Map[TopicIdPartition, ProducePartitionStatus] = { Review Comment: Could we change topicPartition to topicIdPartition in the line below? ########## core/src/main/scala/kafka/server/ReplicaManager.scala: ########## @@ -1486,16 +1513,17 @@ class ReplicaManager(val config: KafkaConfig, _: RecordTooLargeException | _: RecordBatchTooLargeException | _: CorruptRecordException | - _: KafkaStorageException) => - (new TopicOptionalIdPartition(Optional.empty(), topicPartition), LogAppendResult(LogAppendInfo.UNKNOWN_LOG_APPEND_INFO, Some(e), hasCustomErrorMessage = false)) + _: KafkaStorageException | + _: UnknownTopicIdException) => Review Comment: Could we add the new error code in ProduceResponse? Also, it seems that the original KIP doesn't include this new error code for the producer. It would be useful to update the KIP and the email thread. ########## core/src/test/scala/unit/kafka/server/KafkaApisTest.scala: ########## @@ -2636,20 +2636,22 @@ class KafkaApisTest extends Logging { @Test def shouldReplaceProducerFencedWithInvalidProducerEpochInProduceResponse(): Unit = { val topic = "topic" - addTopicToMetadataCache(topic, numPartitions = 2) + val topicId = Uuid.fromString("d2Gg8tgzJa2JYK2eTHUapg") + val tp = new TopicIdPartition(topicId, 0, "topic") + addTopicToMetadataCache(topic, numPartitions = 2, topicId = topicId) for (version <- ApiKeys.PRODUCE.oldestVersion to ApiKeys.PRODUCE.latestVersion) { reset(replicaManager, clientQuotaManager, clientRequestQuotaManager, requestChannel, txnCoordinator) - val responseCallback: ArgumentCaptor[Map[TopicPartition, PartitionResponse] => Unit] = ArgumentCaptor.forClass(classOf[Map[TopicPartition, PartitionResponse] => Unit]) - - val tp = new TopicPartition("topic", 0) + val responseCallback: ArgumentCaptor[Map[TopicIdPartition, PartitionResponse] => Unit] = ArgumentCaptor.forClass(classOf[Map[TopicIdPartition, PartitionResponse] => Unit]) val produceRequest = ProduceRequest.forCurrentMagic(new ProduceRequestData() .setTopicData(new ProduceRequestData.TopicProduceDataCollection( Collections.singletonList(new ProduceRequestData.TopicProduceData() - .setName(tp.topic).setPartitionData(Collections.singletonList( + .setName(tp.topic) Review Comment: There is no need to set the topic name? Ditto below. ########## core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala: ########## @@ -3143,7 +3147,7 @@ class ReplicaManagerTest { requiredAcks = requiredAcks, internalTopicsAllowed = false, transactionalId = transactionalId, - entriesPerPartition = entriesToAppend, + entriesPerPartition = entriesToAppend.map(e => replicaManager.topicIdPartition(e._1) -> e._2), Review Comment: Could we use case to avoid unnamed references? ########## core/src/main/scala/kafka/server/KafkaApis.scala: ########## @@ -2478,7 +2493,8 @@ class KafkaApis(val requestChannel: RequestChannel, } else { // Otherwise, the regular appendRecords path is used for all the non __consumer_offsets // partitions or for all partitions when the new group coordinator is disabled. - controlRecords += partition -> MemoryRecords.withEndTransactionMarker( + // If topicIdPartition contains Uuid.ZERO_UUid or null all functionality will fall back on topic name. Review Comment: The topicId can't be null, right? ########## core/src/main/scala/kafka/server/ReplicaManager.scala: ########## @@ -860,20 +886,21 @@ class ReplicaManager(val config: KafkaConfig, requiredAcks: Short, internalTopicsAllowed: Boolean, transactionalId: String, - entriesPerPartition: Map[TopicPartition, MemoryRecords], - responseCallback: Map[TopicPartition, PartitionResponse] => Unit, - recordValidationStatsCallback: Map[TopicPartition, RecordValidationStats] => Unit = _ => (), + entriesPerPartition: Map[TopicIdPartition, MemoryRecords], Review Comment: It would be useful to document that unlike `appendRecords`, the topicIds in entriesPerPartition are always present. I am still a bit concerned about this discrepancy. It would be better if these two apis are consistent. ########## core/src/test/scala/unit/kafka/server/KafkaApisTest.scala: ########## @@ -2892,18 +2914,20 @@ class KafkaApisTest extends Logging { val topic = "topic" val transactionalId = "txn1" - addTopicToMetadataCache(topic, numPartitions = 2) + val topicId = Uuid.fromString("d2Gg8tgzJa2JYK2eTHUapg") + val tp = new TopicIdPartition(topicId, 0, "topic") + addTopicToMetadataCache(topic, numPartitions = 2, topicId = tp.topicId()) for (version <- 3 to ApiKeys.PRODUCE.latestVersion) { reset(replicaManager, clientQuotaManager, clientRequestQuotaManager, requestChannel, txnCoordinator) - val tp = new TopicPartition("topic", 0) - val produceRequest = ProduceRequest.forCurrentMagic(new ProduceRequestData() .setTopicData(new ProduceRequestData.TopicProduceDataCollection( Collections.singletonList(new ProduceRequestData.TopicProduceData() - .setName(tp.topic).setPartitionData(Collections.singletonList( + .setName(tp.topic) Review Comment: Should we set either the topic name or the topic id as we did in `testProduceResponseMetadataLookupErrorOnNotLeaderOrFollower`? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org