[
https://issues.apache.org/jira/browse/KAFKA-19788?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=18029980#comment-18029980
]
zhuming commented on KAFKA-19788:
---------------------------------
Thanks. I found the cause of this ClusterAuthorizationException. It involves
the enable.idempotence config.
https://issues.apache.org/jira/browse/KAFKA-13673
Per that kafka-clients 3.1.1 issue: {color:#de350b}b. enable.idempotence unset
&& acks=all => enable idempotence.{color}
So idempotence is enabled based on the producer properties. However, the Kafka
server has not granted the IdempotentWrite right to the admin user, so it
throws this ClusterAuthorizationException.
After the following command is executed on the Kafka server, the producer can
send records successfully with acks=-1:
{code:java}
sh kafka-acls.sh --authorizer-properties zookeeper.connect=xxxx:2191/xxx --add
--allow-principal User:admin --allow-host "*" --operation IdempotentWrite
--cluster {code}
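An alternative, client-side workaround (a sketch based on the KAFKA-13673 default-resolution rules above, not verified against this cluster; broker address is a placeholder) is to set enable.idempotence=false explicitly, so that acks=all no longer turns idempotence on and the producer needs no IdempotentWrite ACL:

```java
import java.util.Properties;

// Sketch: opt out of idempotence on the client so that acks=all does not
// implicitly enable it (kafka-clients >= 3.0 defaults enable.idempotence
// to true when it is left unset).
public class NonIdempotentProducerConfig {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "xxx:9092"); // placeholder broker list
        props.put("acks", "-1");
        // Explicitly disable idempotence; the producer then does not need
        // the IdempotentWrite permission on the Cluster resource.
        props.put("enable.idempotence", "false");
        System.out.println("enable.idempotence=" + props.getProperty("enable.idempotence"));
        // prints: enable.idempotence=false
    }
}
```

Note that this trades away the exactly-once delivery guarantees of the idempotent producer, so granting the ACL is preferable where possible.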
> Kafka server ClusterAuthorization doesn't support acks=-1 for kafka-client
> version > 3.1.0
> --------------------------------------------------------------------------------------
>
> Key: KAFKA-19788
> URL: https://issues.apache.org/jira/browse/KAFKA-19788
> Project: Kafka
> Issue Type: Bug
> Components: producer
> Affects Versions: 3.1.1
> Reporter: zhuming
> Priority: Major
>
> * Kafka server version is 2.5.1
> * kafka-clients version greater than 3.1.0
>
> {code:java}
> import org.apache.kafka.clients.producer.KafkaProducer;
> import org.apache.kafka.clients.producer.Producer;
> import org.apache.kafka.clients.producer.ProducerRecord;
>
> import java.util.Properties;
>
> public class KafkaSendTest {
>     public static void main(String[] args) {
>         Properties props = new Properties();
>         props.put("bootstrap.servers", "xxx:9092,xxxx:9092");
>         props.put("key.serializer", "org.apache.kafka.common.serialization.ByteArraySerializer");
>         props.put("value.serializer", "org.apache.kafka.common.serialization.ByteArraySerializer");
>         props.put("transaction.timeout.ms", "300000");
>         props.put("acks", "-1"); // If acks=1 or acks=0 it sends successfully
>         props.put("compression.type", "lz4");
>         props.put("security.protocol", "SASL_PLAINTEXT");
>         props.put("sasl.mechanism", "SCRAM-SHA-256");
>         props.put("sasl.jaas.config",
>                 "org.apache.kafka.common.security.scram.ScramLoginModule required username=\"xxx\" password=\"xxxx\";");
>
>         Producer<byte[], byte[]> producer = new KafkaProducer<>(props);
>         try {
>             String topic = "topic1";
>             byte[] value = new byte[]{1, 2}; // example payload
>             ProducerRecord<byte[], byte[]> record = new ProducerRecord<>(topic, null, value);
>             producer.send(record, (metadata, exception) -> {
>                 if (exception == null) {
>                     System.out.printf("Sent record(key=%s value=%s) meta(partition=%d, offset=%d)%n",
>                             record.key(), new String(record.value()),
>                             metadata.partition(), metadata.offset());
>                 } else {
>                     exception.printStackTrace();
>                 }
>             });
>         } catch (Exception e) {
>             e.printStackTrace();
>         } finally {
>             producer.close();
>         }
>     }
> } {code}
> pom.xml config
> {code:java}
> <dependency>
> <groupId>org.apache.kafka</groupId>
> <artifactId>kafka-clients</artifactId>
> <version>3.4.0</version>
> </dependency> {code}
> When the Kafka producer uses acks=-1, it throws this exception:
>
> {code:java}
> org.apache.kafka.common.KafkaException: Cannot execute transactional method because we are in an error state
>     at org.apache.kafka.clients.producer.internals.TransactionManager.maybeFailWithError(TransactionManager.java:1010)
>     at org.apache.kafka.clients.producer.internals.TransactionManager.maybeAddPartition(TransactionManager.java:328)
>     at org.apache.kafka.clients.producer.KafkaProducer.doSend(KafkaProducer.java:1061)
>     at org.apache.kafka.clients.producer.KafkaProducer.send(KafkaProducer.java:962)
>     at com.mvad.realtime.show.converter.DataEntryToRTLogConverterTest.main(DataEntryToRTLogConverterTest.java:34)
> Caused by: org.apache.kafka.common.errors.ClusterAuthorizationException: Cluster authorization failed. {code}
> If acks=1 or acks=0, it sends successfully:
> {code:java}
> Sent record(key=null ) meta(partition=6, offset=321496) {code}
> acks=-1 is just a parameter; how does it affect cluster authorization for the
> Kafka producer?
> Is this a bug, or intended behavior?
>
> If I change the kafka-clients version to 3.1.0, the producer with acks=-1
> sends successfully:
> {code:java}
> <dependency>
> <groupId>org.apache.kafka</groupId>
> <artifactId>kafka-clients</artifactId>
> <version>3.1.0</version>
> </dependency> {code}
>
--
This message was sent by Atlassian Jira
(v8.20.10#820010)