Lingxiao WANG created KAFKA-6782:
------------------------------------

             Summary: GlobalStateStore never finishes restoring when consuming 
transactional messages
                 Key: KAFKA-6782
                 URL: https://issues.apache.org/jira/browse/KAFKA-6782
             Project: Kafka
          Issue Type: Bug
          Components: streams
    Affects Versions: 1.0.1, 1.1.0
            Reporter: Lingxiao WANG


Some problem with https://issues.apache.org/jira/browse/KAFKA-6190, but his 
proposition :
{code:java}
while (offset < highWatermark) {
 final ConsumerRecords<byte[], byte[]> records = consumer.poll(100);
 for (ConsumerRecord<byte[], byte[]> record : records) {
 if (record.key() != null) {
   stateRestoreCallback.restore(record.key(), record.value());
 }
 offset = consumer.position(topicPartition);
 }
 }{code}
doesn't work for me. In my situation, there are chance to have several 
transaction markers appear in sequence in one partition. In this case, the 
consumer is blocked and can't poll any records, and the code 'offset = 
consumer.position(topicPartition)' doesn't have any opportunity to execute.

 So I propose to move the code 'offset = consumer.position(topicPartition)' 
outside of the cycle to guarantee that event if no records are polled, the 
offset can always be updated.
{code:java}
while (offset < highWatermark) {
 final ConsumerRecords<byte[], byte[]> records = consumer.poll(100);
 for (ConsumerRecord<byte[], byte[]> record : records) {
 if (record.key() != null) {
   stateRestoreCallback.restore(record.key(), record.value());
 }
 }
 offset = consumer.position(topicPartition);
 }{code}
 



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

Reply via email to