David Jacot created KAFKA-16194:
-----------------------------------

             Summary: KafkaConsumer.groupMetadata() should be correct when 
first records are returned
                 Key: KAFKA-16194
                 URL: https://issues.apache.org/jira/browse/KAFKA-16194
             Project: Kafka
          Issue Type: Sub-task
            Reporter: David Jacot


The following code returns records before the group metadata is updated. This 
fails the first transactions ever run by the Producer/Consumer.

 
{code:java}
Producer<String, String> txnProducer = new KafkaProducer<>(txnProducerProps);
Consumer<String, String> consumer = new KafkaConsumer<>(consumerProps);

txnProducer.initTransactions();
System.out.println("Init transactions called");

try {
    txnProducer.beginTransaction();
    System.out.println("Begin transactions called");

    consumer.subscribe(Collections.singletonList("input"));
    System.out.println("Consumer subscribed to topic -> KIP848-topic-2 ");

    ConsumerRecords<String, String> records = 
consumer.poll(Duration.ofSeconds(10));
    System.out.println("Returned " + records.count() + " records.");

    // Process and send txn messages.
    for (ConsumerRecord<String, String> processedRecord : records) {
        txnProducer.send(new ProducerRecord<>("output", processedRecord.key(), 
"Processed: " + processedRecord.value()));
    }

    ConsumerGroupMetadata groupMetadata = consumer.groupMetadata();
    System.out.println("Group metadata inside test" + groupMetadata);

    Map<TopicPartition, OffsetAndMetadata> offsetsToCommit = new HashMap<>();
    for (ConsumerRecord<String, String> record : records) {
        offsetsToCommit.put(new TopicPartition(record.topic(), 
record.partition()),
            new OffsetAndMetadata(record.offset() + 1));
    }
    System.out.println("Offsets to commit" + offsetsToCommit);
    // Send offsets to transaction with ConsumerGroupMetadata.
    txnProducer.sendOffsetsToTransaction(offsetsToCommit, groupMetadata);
    System.out.println("Send offsets to transaction done");

    // Commit the transaction.
    txnProducer.commitTransaction();
    System.out.println("Commit transaction done");
} catch (ProducerFencedException | OutOfOrderSequenceException | 
AuthorizationException e) {
    e.printStackTrace();
    txnProducer.close();
} catch (KafkaException e) {
    e.printStackTrace();
    txnProducer.abortTransaction();
} finally {
    txnProducer.close();
    consumer.close();
} {code}
The issue seems to be that while it waits in `poll`, the event to update the 
group metadata is not processed.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)

Reply via email to