brandboat commented on code in PR #18448:
URL: https://github.com/apache/kafka/pull/18448#discussion_r1915562323
##########
core/src/test/scala/integration/kafka/coordinator/transaction/ProducerIntegrationTest.scala:
##########
@@ -85,6 +88,50 @@ class ProducerIntegrationTest {
} finally if (producer != null) producer.close()
}
+ @ClusterTests(Array(
+ new ClusterTest(
+ features = Array(
+ new ClusterFeature(feature = Feature.TRANSACTION_VERSION, version =
0))),
+ new ClusterTest(
+ features = Array(
+ new ClusterFeature(feature = Feature.TRANSACTION_VERSION, version =
1))),
+ new ClusterTest(
+ features = Array(
+ new ClusterFeature(feature = Feature.TRANSACTION_VERSION, version =
2))),
+ ))
+ def testTransactionWithInvalidSend(cluster: ClusterInstance): Unit = {
+ val topic = new NewTopic("foobar", 1,
1.toShort).configs(Collections.singletonMap(TopicConfig.MAX_MESSAGE_BYTES_CONFIG,
"1"))
+ val admin = cluster.admin()
+ var txnVersion: Short = 0
+ try {
+ txnVersion =
Option(admin.describeFeatures().featureMetadata().get().finalizedFeatures().get(Feature.TRANSACTION_VERSION))
+ .map(finalizedFeatures => finalizedFeatures.maxVersionLevel())
+ .getOrElse(0)
+ admin.createTopics(List(topic).asJava)
+ } finally if (admin != null) admin.close()
+
+ val properties = new util.HashMap[String, Object]
+ properties.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, "foobar")
+ properties.put(ProducerConfig.CLIENT_ID_CONFIG, "test")
+ properties.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, "true")
+
+ val producer: Producer[Array[Byte], Array[Byte]] =
cluster.producer(properties)
+ try {
+ producer.initTransactions()
+ producer.beginTransaction()
+ assertInstanceOf(classOf[RecordTooLargeException],
+ assertThrows(classOf[ExecutionException],
+ () => producer.send(new ProducerRecord[Array[Byte],
Array[Byte]](topic.name(), "key".getBytes, "value".getBytes)).get()).getCause)
+
+ val commitError = assertThrows(classOf[KafkaException], () =>
producer.commitTransaction()) // fail due to last send failed
+ assertInstanceOf(classOf[RecordTooLargeException], commitError.getCause)
+
+ if (txnVersion == 2) {
Review Comment:
> small nit -- is there a reason to run this test on the other versions? I
> think we should either have cases to test for versions 0 and 1 or not run those
> for this one. WDYT?
Yeah, though I think the test case itself is perhaps wrong. I observed that
the abortTransaction in this test case only works under TV_2, not under TV_0
or TV_1. I haven't looked into it yet, so I'm not sure why.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]