[
https://issues.apache.org/jira/browse/KAFKA-5880?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
]
Ray Chiang updated KAFKA-5880:
------------------------------
Component/s: clients
> Transactional producer and read committed consumer causes consumer to stuck
> ---------------------------------------------------------------------------
>
> Key: KAFKA-5880
> URL: https://issues.apache.org/jira/browse/KAFKA-5880
> Project: Kafka
> Issue Type: Bug
> Components: clients
> Reporter: Lae
> Priority: Major
> Attachments: index-updates-3.zip
>
>
> We use transactional producers and have configured the consumer's isolation
> level to read only committed data. The consumer has somehow become stuck: it
> can no longer move forward because the Kafka server always returns an empty
> list of records, even though there are thousands more successful
> transactions after that offset.
> This is example producer code:
> {code:java}
> Properties config = new Properties();
> config.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
> config.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, UUID.randomUUID().toString());
> config.put(ProducerConfig.COMPRESSION_TYPE_CONFIG, "lz4");
> config.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
> config.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
> try (Producer<String, String> producer = new KafkaProducer<>(config)) {
>     producer.initTransactions();
>     try {
>         producer.beginTransaction();
>         // Multiple producer.send(...) here
>         producer.commitTransaction();
>     } catch (Throwable e) {
>         producer.abortTransaction();
>     }
> }
> {code}
> This is the test consumer code:
> {code:java}
> Properties config = new Properties();
> config.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
> config.put(ConsumerConfig.GROUP_ID_CONFIG, "my-group");
> config.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
> config.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");
> config.put(ConsumerConfig.ISOLATION_LEVEL_CONFIG,
>         IsolationLevel.READ_COMMITTED.toString().toLowerCase(ENGLISH));
> config.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
> config.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
> try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(config)) {
>     consumer.subscribe(Collections.singleton("index-updates"));
>     while (true) {
>         ConsumerRecords<String, String> records = consumer.poll(5000);
>         for (ConsumerRecord<String, String> record : records) {
>             System.err.println(record.value());
>         }
>         consumer.commitSync();
>     }
> }
> {code}
> I have also attached the problematic partition data, index-updates-3.zip. To
> reproduce the issue with this data, run a local Kafka instance, create a
> topic called "index-updates" with 10 partitions, replace the content of the
> index-updates-3 log directory with the attached content, and then run the
> above consumer code.
> The consumer will then get stuck at some point (not always at the same
> offset) and make no further progress, even if you send new data into the
> partition (other partitions seem fine). In the following example the
> consumer was stuck at offset 46644, and the Kafka server always returned an
> empty list of records when the consumer fetched from 46644:
> {noformat}
> root@0b1e67f0c34b:/# /opt/kafka/bin/kafka-consumer-groups.sh --describe --group my-group --bootstrap-server localhost:9092
> Note: This will only show information about consumers that use the Java consumer API (non-ZooKeeper-based consumers).
> TOPIC          PARTITION  CURRENT-OFFSET  LOG-END-OFFSET  LAG    CONSUMER-ID                                      HOST          CLIENT-ID
> index-updates  0          15281           15281           0      consumer-1-c8f3fede-ef6f-4d12-a426-44aa9c47e71f  /10.100.1.97  consumer-1
> index-updates  1          0               0               0      consumer-1-c8f3fede-ef6f-4d12-a426-44aa9c47e71f  /10.100.1.97  consumer-1
> index-updates  2          0               0               0      consumer-1-c8f3fede-ef6f-4d12-a426-44aa9c47e71f  /10.100.1.97  consumer-1
> index-updates  3          46644           65735           19091  consumer-1-c8f3fede-ef6f-4d12-a426-44aa9c47e71f  /10.100.1.97  consumer-1
> index-updates  4          0               0               0      consumer-1-c8f3fede-ef6f-4d12-a426-44aa9c47e71f  /10.100.1.97  consumer-1
> index-updates  5          0               0               0      consumer-1-c8f3fede-ef6f-4d12-a426-44aa9c47e71f  /10.100.1.97  consumer-1
> index-updates  6          0               0               0      consumer-1-c8f3fede-ef6f-4d12-a426-44aa9c47e71f  /10.100.1.97  consumer-1
> index-updates  7          0               0               0      consumer-1-c8f3fede-ef6f-4d12-a426-44aa9c47e71f  /10.100.1.97  consumer-1
> index-updates  8          0               0               0      consumer-1-c8f3fede-ef6f-4d12-a426-44aa9c47e71f  /10.100.1.97  consumer-1
> index-updates  9          0               0               0      consumer-1-c8f3fede-ef6f-4d12-a426-44aa9c47e71f  /10.100.1.97  consumer-1
> root@0b1e67f0c34b:/#
> {noformat}
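
The symptom described above (a position that stops advancing while the partition still reports lag) can be checked mechanically. The following is only a minimal sketch under stated assumptions: `StallCheck`, `lag`, and `isStalled` are hypothetical names, not part of the Kafka client API, and the list of positions is assumed to have been sampled by the caller, for example from `consumer.position(...)` after each poll. The numbers in `main` are taken from the partition 3 row of the kafka-consumer-groups.sh output.

```java
import java.util.Arrays;
import java.util.List;

/** Hypothetical helper for illustration; not part of the Kafka client API. */
final class StallCheck {
    private StallCheck() {}

    /** Lag as shown by kafka-consumer-groups.sh: log end offset minus current offset. */
    static long lag(long currentOffset, long logEndOffset) {
        return logEndOffset - currentOffset;
    }

    /**
     * Returns true when the last `window` sampled positions are all identical
     * while the partition still has unread data (lag greater than zero).
     */
    static boolean isStalled(List<Long> positions, long logEndOffset, int window) {
        if (window < 2 || positions.size() < window) {
            return false; // not enough samples to decide
        }
        long last = positions.get(positions.size() - 1).longValue();
        for (int i = positions.size() - window; i < positions.size(); i++) {
            if (positions.get(i).longValue() != last) {
                return false; // the position moved within the window
            }
        }
        return lag(last, logEndOffset) > 0;
    }

    public static void main(String[] args) {
        // Partition 3 from the report: current offset 46644, log end offset 65735.
        System.out.println(lag(46644L, 65735L)); // prints 19091
        // Three consecutive samples at the same position with lag remaining.
        System.out.println(isStalled(Arrays.asList(46644L, 46644L, 46644L), 65735L, 3)); // prints true
    }
}
```

A partition that is fully caught up, such as partition 0 in the output (15281 of 15281), is not flagged, because its lag is zero even though its position does not move.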