C0urante commented on a change in pull request #11508:
URL: https://github.com/apache/kafka/pull/11508#discussion_r792021213
##########
File path: clients/src/test/java/org/apache/kafka/clients/producer/KafkaProducerTest.java
##########
@@ -1019,6 +1022,149 @@ public void testMeasureAbortTransactionDuration() {
         }
     }
 
+    @Test
+    public void testCommitTransactionWithRecordTooLargeException() throws Exception {
+        Map<String, Object> configs = new HashMap<>();
+        configs.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, "some.id");
+        configs.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9000");
+        configs.put(ProducerConfig.MAX_REQUEST_SIZE_CONFIG, 1000);
+        Time time = new MockTime(1);
+        MetadataResponse initialUpdateResponse = RequestTestUtils.metadataUpdateWith(1, singletonMap("topic", 1));
+        ProducerMetadata metadata = mock(ProducerMetadata.class);
+        MockClient client = new MockClient(time, metadata);
+        client.updateMetadata(initialUpdateResponse);
+        client.prepareResponse(FindCoordinatorResponse.prepareResponse(Errors.NONE, "some.id", host1));
+        client.prepareResponse(initProducerIdResponse(1L, (short) 5, Errors.NONE));
+
+        when(metadata.fetch()).thenReturn(onePartitionCluster);
+
+        String largeString = IntStream.range(0, 1000).mapToObj(i -> "**********").collect(Collectors.joining());

Review comment:
   Haha fair, will do.

##########
File path: clients/src/test/java/org/apache/kafka/clients/producer/KafkaProducerTest.java
##########
@@ -1019,6 +1022,149 @@ public void testMeasureAbortTransactionDuration() {
         }
     }
 
+    @Test
+    public void testCommitTransactionWithRecordTooLargeException() throws Exception {
+        Map<String, Object> configs = new HashMap<>();
+        configs.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, "some.id");
+        configs.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9000");
+        configs.put(ProducerConfig.MAX_REQUEST_SIZE_CONFIG, 1000);
+        Time time = new MockTime(1);
+        MetadataResponse initialUpdateResponse = RequestTestUtils.metadataUpdateWith(1, singletonMap("topic", 1));
+        ProducerMetadata metadata = mock(ProducerMetadata.class);
+        MockClient client = new MockClient(time, metadata);
+        client.updateMetadata(initialUpdateResponse);
+        client.prepareResponse(FindCoordinatorResponse.prepareResponse(Errors.NONE, "some.id", host1));
+        client.prepareResponse(initProducerIdResponse(1L, (short) 5, Errors.NONE));
+
+        when(metadata.fetch()).thenReturn(onePartitionCluster);
+
+        String largeString = IntStream.range(0, 1000).mapToObj(i -> "**********").collect(Collectors.joining());
+        ProducerRecord<String, String> largeRecord = new ProducerRecord<>(topic, "large string", largeString);
+
+        try (KafkaProducer<String, String> producer = kafkaProducer(configs, new StringSerializer(),
+                new StringSerializer(), metadata, client, null, time)) {
+            producer.initTransactions();
+
+            client.prepareResponse(endTxnResponse(Errors.NONE));
+            producer.beginTransaction();
+            TestUtils.assertFutureError(producer.send(largeRecord), RecordTooLargeException.class);
+            assertThrows(KafkaException.class, producer::commitTransaction);
+        }
+    }
+
+    @Test
+    public void testCommitTransactionWithMetadataTimeoutForMissingTopic() throws Exception {
+        Map<String, Object> configs = new HashMap<>();
+        configs.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, "some.id");
+        configs.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9999");
+        configs.put(ProducerConfig.MAX_BLOCK_MS_CONFIG, 60000);
+
+        // Create a record for a not-yet-created topic
+        ProducerRecord<String, String> record = new ProducerRecord<>(topic, 2, null, "value");

Review comment:
   Good call, done.