[ https://issues.apache.org/jira/browse/KAFKA-9279?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16991513#comment-16991513 ]

Andrew Klopper edited comment on KAFKA-9279 at 12/9/19 11:53 AM:
-----------------------------------------------------------------

The transaction is supposed to fail if any of the individual sends fails, which 
is clearly stated in the [Kafka Producer 
documentation|https://kafka.apache.org/23/javadoc/org/apache/kafka/clients/producer/KafkaProducer.html]:
{quote}
When used as part of a transaction, it is not necessary to define a callback or 
check the result of the future in order to detect errors from {{send}}. If any 
of the send calls failed with an irrecoverable error, the final 
[{{commitTransaction()}}|https://kafka.apache.org/23/javadoc/org/apache/kafka/clients/producer/KafkaProducer.html#commitTransaction--]
 call will fail and throw the exception from the last failed send. When this 
happens, your application should call 
[{{abortTransaction()}}|https://kafka.apache.org/23/javadoc/org/apache/kafka/clients/producer/KafkaProducer.html#abortTransaction--]
 to reset the state and continue to send data.
{quote}
The examples on that page also do not check the result of the individual 
producer send calls that form part of a transaction, which further reinforces 
the idea that failing the transaction on any send failure is the expected 
behaviour.
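Until this is fixed, a defensive workaround is to keep the {{Future}} returned by every {{send()}} and call {{get()}} on each of them before {{commitTransaction()}}, so that any send failure surfaces as an exception and the application can call {{abortTransaction()}} instead. The sketch below only illustrates that pattern: the Kafka calls are replaced by stdlib futures so it runs without a broker, and the class and method names are invented for the example.

```java
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;

public class CheckSendsBeforeCommit {

    // Returns true only if every pending send completed without an error.
    // With a real producer this would be called on the Futures returned by
    // producer.send(...) just before producer.commitTransaction().
    static boolean allSucceeded(final List<? extends Future<?>> pendingSends) {
        for (final Future<?> f : pendingSends) {
            try {
                f.get(); // blocks until the send is acknowledged or fails
            } catch (InterruptedException | ExecutionException e) {
                return false; // a failed send means the transaction must abort
            }
        }
        return true;
    }

    public static void main(final String[] args) {
        // Stand-ins for send() results: one success and one failure, the
        // failure playing the role of a record that exceeds max.request.size.
        final List<Future<String>> sends = List.of(
            CompletableFuture.completedFuture("ok"),
            CompletableFuture.failedFuture(new IllegalStateException("record too large")));
        // Commit only when every send succeeded; otherwise abort.
        System.out.println(allSucceeded(sends) ? "COMMIT" : "ABORT"); // prints "ABORT"
    }
}
```

The same loop works unchanged against the real API, since {{KafkaProducer.send()}} returns a {{Future<RecordMetadata>}}; blocking on each future does sacrifice some batching throughput, so an alternative is to record failures from the send callback and check that flag before committing.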
 


> Silent data loss in Kafka producer
> ----------------------------------
>
>                 Key: KAFKA-9279
>                 URL: https://issues.apache.org/jira/browse/KAFKA-9279
>             Project: Kafka
>          Issue Type: Bug
>          Components: producer 
>    Affects Versions: 2.3.0
>            Reporter: Andrew Klopper
>            Priority: Major
>
> It appears that it is possible for a producer.commitTransaction() call to 
> succeed even if an individual producer.send() call has failed. The following 
> code demonstrates the issue:
> {code:java}
> package org.example.dataloss;
> import java.nio.charset.StandardCharsets;
> import java.util.Properties;
> import java.util.Random;
> import org.apache.kafka.clients.producer.KafkaProducer;
> import org.apache.kafka.clients.producer.ProducerConfig;
> import org.apache.kafka.clients.producer.ProducerRecord;
> import org.apache.kafka.common.serialization.ByteArraySerializer;
> public class Main {
>     public static void main(final String[] args) {
>         final Properties producerProps = new Properties();
>         if (args.length != 2) {
>             System.err.println("Invalid command-line arguments");
>             System.exit(1);
>         }
>         final String bootstrapServer = args[0];
>         final String topic = args[1];
>         producerProps.setProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServer);
>         producerProps.setProperty(ProducerConfig.BATCH_SIZE_CONFIG, "500000");
>         producerProps.setProperty(ProducerConfig.LINGER_MS_CONFIG, "1000");
>         producerProps.setProperty(ProducerConfig.MAX_REQUEST_SIZE_CONFIG, "1000000");
>         producerProps.setProperty(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, "true");
>         producerProps.setProperty(ProducerConfig.CLIENT_ID_CONFIG, "dataloss_01");
>         producerProps.setProperty(ProducerConfig.TRANSACTIONAL_ID_CONFIG, "dataloss_01");
>         try (final KafkaProducer<byte[], byte[]> producer = new KafkaProducer<>(producerProps, new ByteArraySerializer(), new ByteArraySerializer())) {
>             producer.initTransactions();
>             producer.beginTransaction();
>             final Random random = new Random();
>             final byte[] largePayload = new byte[2000000];
>             random.nextBytes(largePayload);
>             producer.send(
>                 new ProducerRecord<>(
>                     topic,
>                     "large".getBytes(StandardCharsets.UTF_8),
>                     largePayload
>                 ),
>                 (metadata, e) -> {
>                     if (e == null) {
>                         System.out.println("INFO: Large payload succeeded");
>                     } else {
>                         System.err.printf("ERROR: Large payload failed: %s\n", e.getMessage());
>                     }
>                 }
>             );
>             producer.commitTransaction();
>             System.out.println("Commit succeeded");
>         } catch (final Exception e) {
>             System.err.printf("FATAL ERROR: %s", e.getMessage());
>         }
>     }
> }
> {code}
> The code prints the following output:
> {code}
> ERROR: Large payload failed: The message is 2000093 bytes when serialized which is larger than the maximum request size you have configured with the max.request.size configuration.
> Commit succeeded
> {code}
>  



--
This message was sent by Atlassian Jira
(v8.3.4#803005)