[jira] [Updated] (KAFKA-4879) KafkaConsumer.position may hang forever when deleting a topic
[ https://issues.apache.org/jira/browse/KAFKA-4879?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Damian Guy updated KAFKA-4879: -- Fix Version/s: (was: 1.1.0) 2.0.0 > KafkaConsumer.position may hang forever when deleting a topic > - > > Key: KAFKA-4879 > URL: https://issues.apache.org/jira/browse/KAFKA-4879 > Project: Kafka > Issue Type: Bug > Components: consumer >Affects Versions: 0.10.2.0 >Reporter: Shixiong Zhu >Assignee: Jason Gustafson >Priority: Major > Fix For: 2.0.0 > > > KafkaConsumer.position may hang forever when deleting a topic. The problem is > this line > https://github.com/apache/kafka/blob/022bf129518e33e165f9ceefc4ab9e622952d3bd/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java#L374 > The timeout is "Long.MAX_VALUE", and it will just retry forever for > UnknownTopicOrPartitionException. > Here is a reproducer > {code} > import org.apache.kafka.clients.consumer.KafkaConsumer; > import org.apache.kafka.common.TopicPartition; > import org.apache.kafka.common.serialization.StringDeserializer; > import java.util.Collections; > import java.util.Properties; > import java.util.Set; > public class KafkaReproducer { > public static void main(String[] args) { > // Make sure "delete.topic.enable" is set to true. > // Please create the topic test with "3" partitions manually. > // The issue is gone when there is only one partition. > String topic = "test"; > Properties props = new Properties(); > props.put("bootstrap.servers", "localhost:9092"); > props.put("group.id", "testgroup"); > props.put("value.deserializer", StringDeserializer.class.getName()); > props.put("key.deserializer", StringDeserializer.class.getName()); > props.put("enable.auto.commit", "false"); > KafkaConsumer kc = new KafkaConsumer(props); > kc.subscribe(Collections.singletonList(topic)); > kc.poll(0); > Set partitions = kc.assignment(); > System.out.println("partitions: " + partitions); > kc.pause(partitions); > kc.seekToEnd(partitions); > System.out.println("please delete the topic in 30 seconds"); > try { > // Sleep 30 seconds to give us enough time to delete the topic. > Thread.sleep(3); > } catch (InterruptedException e) { > e.printStackTrace(); > } > System.out.println("sleep end"); > for (TopicPartition p : partitions) { > System.out.println(p + " offset: " + kc.position(p)); > } > System.out.println("cannot reach here"); > kc.close(); > } > } > {code} -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Updated] (KAFKA-4879) KafkaConsumer.position may hang forever when deleting a topic
[ https://issues.apache.org/jira/browse/KAFKA-4879?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Ismael Juma updated KAFKA-4879: --- Fix Version/s: (was: 1.0.0) 1.1.0 > KafkaConsumer.position may hang forever when deleting a topic > - > > Key: KAFKA-4879 > URL: https://issues.apache.org/jira/browse/KAFKA-4879 > Project: Kafka > Issue Type: Bug > Components: consumer >Affects Versions: 0.10.2.0 >Reporter: Shixiong Zhu >Assignee: Balint Molnar > Fix For: 1.1.0 > > > KafkaConsumer.position may hang forever when deleting a topic. The problem is > this line > https://github.com/apache/kafka/blob/022bf129518e33e165f9ceefc4ab9e622952d3bd/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java#L374 > The timeout is "Long.MAX_VALUE", and it will just retry forever for > UnknownTopicOrPartitionException. > Here is a reproducer > {code} > import org.apache.kafka.clients.consumer.KafkaConsumer; > import org.apache.kafka.common.TopicPartition; > import org.apache.kafka.common.serialization.StringDeserializer; > import java.util.Collections; > import java.util.Properties; > import java.util.Set; > public class KafkaReproducer { > public static void main(String[] args) { > // Make sure "delete.topic.enable" is set to true. > // Please create the topic test with "3" partitions manually. > // The issue is gone when there is only one partition. > String topic = "test"; > Properties props = new Properties(); > props.put("bootstrap.servers", "localhost:9092"); > props.put("group.id", "testgroup"); > props.put("value.deserializer", StringDeserializer.class.getName()); > props.put("key.deserializer", StringDeserializer.class.getName()); > props.put("enable.auto.commit", "false"); > KafkaConsumer kc = new KafkaConsumer(props); > kc.subscribe(Collections.singletonList(topic)); > kc.poll(0); > Set partitions = kc.assignment(); > System.out.println("partitions: " + partitions); > kc.pause(partitions); > kc.seekToEnd(partitions); > System.out.println("please delete the topic in 30 seconds"); > try { > // Sleep 30 seconds to give us enough time to delete the topic. > Thread.sleep(3); > } catch (InterruptedException e) { > e.printStackTrace(); > } > System.out.println("sleep end"); > for (TopicPartition p : partitions) { > System.out.println(p + " offset: " + kc.position(p)); > } > System.out.println("cannot reach here"); > kc.close(); > } > } > {code} -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[jira] [Updated] (KAFKA-4879) KafkaConsumer.position may hang forever when deleting a topic
[ https://issues.apache.org/jira/browse/KAFKA-4879?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Guozhang Wang updated KAFKA-4879: - *Reminder to the contributor / reviewer of the PR*: please note that the code deadline for 1.0.0 is less than 2 weeks away (Oct. 4th). Please re-evaluate your JIRA and see if it still makes sense to be merged into 1.0.0 or it could be pushed out to 1.1.0, or be closed directly if the JIRA itself is not valid any more, or re-assign yourself as contributor / committer if you are no longer working on the JIRA. > KafkaConsumer.position may hang forever when deleting a topic > - > > Key: KAFKA-4879 > URL: https://issues.apache.org/jira/browse/KAFKA-4879 > Project: Kafka > Issue Type: Bug > Components: consumer >Affects Versions: 0.10.2.0 >Reporter: Shixiong Zhu >Assignee: Balint Molnar > Fix For: 1.0.0 > > > KafkaConsumer.position may hang forever when deleting a topic. The problem is > this line > https://github.com/apache/kafka/blob/022bf129518e33e165f9ceefc4ab9e622952d3bd/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java#L374 > The timeout is "Long.MAX_VALUE", and it will just retry forever for > UnknownTopicOrPartitionException. > Here is a reproducer > {code} > import org.apache.kafka.clients.consumer.KafkaConsumer; > import org.apache.kafka.common.TopicPartition; > import org.apache.kafka.common.serialization.StringDeserializer; > import java.util.Collections; > import java.util.Properties; > import java.util.Set; > public class KafkaReproducer { > public static void main(String[] args) { > // Make sure "delete.topic.enable" is set to true. > // Please create the topic test with "3" partitions manually. > // The issue is gone when there is only one partition. > String topic = "test"; > Properties props = new Properties(); > props.put("bootstrap.servers", "localhost:9092"); > props.put("group.id", "testgroup"); > props.put("value.deserializer", StringDeserializer.class.getName()); > props.put("key.deserializer", StringDeserializer.class.getName()); > props.put("enable.auto.commit", "false"); > KafkaConsumer kc = new KafkaConsumer(props); > kc.subscribe(Collections.singletonList(topic)); > kc.poll(0); > Set partitions = kc.assignment(); > System.out.println("partitions: " + partitions); > kc.pause(partitions); > kc.seekToEnd(partitions); > System.out.println("please delete the topic in 30 seconds"); > try { > // Sleep 30 seconds to give us enough time to delete the topic. > Thread.sleep(3); > } catch (InterruptedException e) { > e.printStackTrace(); > } > System.out.println("sleep end"); > for (TopicPartition p : partitions) { > System.out.println(p + " offset: " + kc.position(p)); > } > System.out.println("cannot reach here"); > kc.close(); > } > } > {code} -- This message was sent by Atlassian JIRA (v6.4.14#64029)