[ 
https://issues.apache.org/jira/browse/KAFKA-10334?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Luis Araujo updated KAFKA-10334:
--------------------------------
    Description: 
I'm using the transactions provided by the Kafka Producer API in a Scala 
project built with SBT. The dependency used in the project is: 
{code:java}
"org.apache.kafka" % "kafka-clients" % "2.1.0" {code}
I followed the documentation and expected *commitTransaction()* to fail if any 
problem occurred while sending a message, as described in the 
[documentation|https://kafka.apache.org/10/javadoc/org/apache/kafka/clients/producer/KafkaProducer.html#send-org.apache.kafka.clients.producer.ProducerRecord-org.apache.kafka.clients.producer.Callback-].

Unfortunately, when I test this behaviour with a message larger than the size 
accepted by the Kafka broker/cluster, the transactions do not work as expected.

I tested with a cluster of 3 Kafka brokers and the default maximum message 
size of 1MB:
 - when the message is 1MB, the transaction is aborted and an exception is 
raised when calling *commitTransaction()*
 - when the message is bigger than 1MB, the transaction completes 
successfully *without* the message being written. No exception is thrown.

For example, when I produce 9 messages of 1KB and 1 message of 1.1MB in the 
same transaction, the transaction completes but only 9 messages are written to 
the Kafka cluster.

I tested this behaviour with Kafka versions 2.1.0 and 2.3.0, for both the 
Kafka cluster and the Kafka Producer API.

These are the configs I use to create the KafkaProducer in order to use 
transactions:
{code:java}
new Properties() {
  {
    put(BOOTSTRAP_SERVERS_CONFIG, "localhost:29092,localhost:29093,localhost:29094")
    put(ACKS_CONFIG, "-1")
    put(MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION, "1")
    put(KEY_SERIALIZER_CLASS_CONFIG, Class.forName(classOf[StringSerializer].getName))
    put(VALUE_SERIALIZER_CLASS_CONFIG, Class.forName(classOf[ByteArraySerializer].getName))
    put(CLIENT_ID_CONFIG, "app")
    put(TRANSACTIONAL_ID_CONFIG, "app")
    put(ENABLE_IDEMPOTENCE_CONFIG, "true")
  }
}
{code}
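
As a possible client-side guard while this is investigated, the futures returned by *send()* could be checked before committing. The sketch below is only an illustration and is not confirmed as the cause of, or the fix for, this issue. The names SendChecks and firstSendFailure are hypothetical and not part of the Kafka API; the helper relies only on *send()* returning a java.util.concurrent.Future.

```scala
import java.util.concurrent.{ExecutionException, Future}

// Hypothetical helper, not part of kafka-clients.
object SendChecks {
  /** Blocks on each future in order and returns the cause of the first
    * failed one, or None if every future completed normally. */
  def firstSendFailure(futures: Seq[Future[_]]): Option[Throwable] =
    futures.iterator.map { f =>
      try { f.get(); None }                                   // completed normally
      catch { case e: ExecutionException => Some(e.getCause) } // failed: unwrap cause
    }.collectFirst { case Some(cause) => cause }
}
```

In the scenario above, the futures from the 10 *send()* calls would be collected, *flush()* called, and then *abortTransaction()* called if firstSendFailure returns a value, or *commitTransaction()* otherwise.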


> Transactions not working properly
> ---------------------------------
>
>                 Key: KAFKA-10334
>                 URL: https://issues.apache.org/jira/browse/KAFKA-10334
>             Project: Kafka
>          Issue Type: Bug
>          Components: clients, producer 
>    Affects Versions: 2.1.0, 2.3.0
>            Reporter: Luis Araujo
>            Priority: Major
>
> I'm using the transactions provided by the Kafka Producer API in a Scala 
> project built with SBT. The dependency used in the project is: 
> {code:java}
> "org.apache.kafka" % "kafka-clients" % "2.1.0" {code}
> I followed the documentation and expected *commitTransaction()* to fail if 
> any problem occurred while sending a message, as described in the 
> [documentation|https://kafka.apache.org/10/javadoc/org/apache/kafka/clients/producer/KafkaProducer.html#send-org.apache.kafka.clients.producer.ProducerRecord-org.apache.kafka.clients.producer.Callback-].
> Unfortunately, when I test this behaviour with a message larger than the 
> size accepted by the Kafka broker/cluster, the transactions do not work as 
> expected.
> I tested with a cluster of 3 Kafka brokers and the default maximum message 
> size of 1MB:
>  - when the message is 1MB, the transaction is aborted and an exception is 
> raised when calling *commitTransaction()*
>  - when the message is bigger than 1MB, the transaction completes 
> successfully *without* the message being written. No exception is thrown.
> For example, when I produce 9 messages of 1KB and 1 message of 1.1MB in the 
> same transaction, the transaction completes but only 9 messages are written 
> to the Kafka cluster.
> I tested this behaviour with Kafka versions 2.1.0 and 2.3.0, for both the 
> Kafka cluster and the Kafka Producer API.
> These are the configs I use to create the KafkaProducer in order to use 
> transactions:
> {code:java}
> new Properties() {
>   {
>     put(BOOTSTRAP_SERVERS_CONFIG, "localhost:29092,localhost:29093,localhost:29094")
>     put(ACKS_CONFIG, "-1")
>     put(MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION, "1")
>     put(KEY_SERIALIZER_CLASS_CONFIG, Class.forName(classOf[StringSerializer].getName))
>     put(VALUE_SERIALIZER_CLASS_CONFIG, Class.forName(classOf[ByteArraySerializer].getName))
>     put(CLIENT_ID_CONFIG, "app")
>     put(TRANSACTIONAL_ID_CONFIG, "app")
>     put(ENABLE_IDEMPOTENCE_CONFIG, "true")
>   }
> }
> {code}



--
This message was sent by Atlassian Jira
(v8.3.4#803005)
