jolshan commented on code in PR #18448:
URL: https://github.com/apache/kafka/pull/18448#discussion_r1920444227


##########
core/src/test/scala/integration/kafka/coordinator/transaction/ProducerIntegrationTest.scala:
##########
@@ -85,6 +88,50 @@ class ProducerIntegrationTest {
     } finally if (producer != null) producer.close()
   }
 
+  @ClusterTests(Array(
+    new ClusterTest(
+      features = Array(
+        new ClusterFeature(feature = Feature.TRANSACTION_VERSION, version = 
0))),
+    new ClusterTest(
+      features = Array(
+        new ClusterFeature(feature = Feature.TRANSACTION_VERSION, version = 
1))),
+    new ClusterTest(
+      features = Array(
+        new ClusterFeature(feature = Feature.TRANSACTION_VERSION, version = 
2))),
+  ))
+  def testTransactionWithInvalidSend(cluster: ClusterInstance): Unit = {
+    val topic = new NewTopic("foobar", 1, 
1.toShort).configs(Collections.singletonMap(TopicConfig.MAX_MESSAGE_BYTES_CONFIG,
 "1"))
+    val admin = cluster.admin()
+    var txnVersion: Short = 0
+    try {
+      txnVersion = 
Option(admin.describeFeatures().featureMetadata().get().finalizedFeatures().get(Feature.TRANSACTION_VERSION))
+        .map(finalizedFeatures => finalizedFeatures.maxVersionLevel())
+        .getOrElse(0)
+      admin.createTopics(List(topic).asJava)
+    } finally if (admin != null) admin.close()
+
+    val properties = new util.HashMap[String, Object]
+    properties.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, "foobar")
+    properties.put(ProducerConfig.CLIENT_ID_CONFIG, "test")
+    properties.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, "true")
+
+    val producer: Producer[Array[Byte], Array[Byte]] = 
cluster.producer(properties)
+    try {
+      producer.initTransactions()
+      producer.beginTransaction()
+      assertInstanceOf(classOf[RecordTooLargeException],
+        assertThrows(classOf[ExecutionException],
+          () => producer.send(new ProducerRecord[Array[Byte], 
Array[Byte]](topic.name(), "key".getBytes, "value".getBytes)).get()).getCause)
+
+      val commitError = assertThrows(classOf[KafkaException], () => 
producer.commitTransaction()) // fail due to last send failed
+      assertInstanceOf(classOf[RecordTooLargeException], commitError.getCause)
+
+      if (txnVersion == 2) {

Review Comment:
   Let me ponder this a bit. Thanks @brandboat for the deeper investigation. Is 
this because the TV0, TV1, send intProducerId when it needs to bump the epoch? 
This is a difference between TV0/1 and TV2. 
   
   If this is correct, the issue is that any request after EndTxn will see 
concurrent transactions, which makes sense and is expected.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to