[jira] [Created] (KAFKA-8520) TimeoutException in client side doesn't have stack trace
Shixiong Zhu created KAFKA-8520:
---
Summary: TimeoutException in client side doesn't have stack trace
Key: KAFKA-8520
URL: https://issues.apache.org/jira/browse/KAFKA-8520
Project: Kafka
Issue Type: New Feature
Components: clients
Reporter: Shixiong Zhu

When a TimeoutException is thrown directly on the client side, it doesn't have any stack trace because it inherits from "org.apache.kafka.common.errors.ApiException". This makes timeout issues hard for users to debug, because there is no way to tell which line of user code threw the TimeoutException. It would be great to add a new client-side TimeoutException that contains the stack trace.

--
This message was sent by Atlassian JIRA (v7.6.3#76005)
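For context, the missing trace comes from the common pattern of overriding `fillInStackTrace()` to make exception construction cheap. A minimal sketch of that pattern, plus a variant that keeps the trace (the class names below are illustrative stand-ins, not Kafka's actual classes):

```java
// Sketch of why such exceptions carry no trace, and a variant that keeps it.
// TracelessTimeoutException / ClientTimeoutException are hypothetical names.
public class StackTraceDemo {
    // Mimics the ApiException-style optimization: skipping stack-trace
    // capture makes construction cheap, but getStackTrace() is empty.
    static class TracelessTimeoutException extends RuntimeException {
        TracelessTimeoutException(String msg) { super(msg); }
        @Override
        public synchronized Throwable fillInStackTrace() { return this; }
    }

    // A plain RuntimeException records where it was constructed.
    static class ClientTimeoutException extends RuntimeException {
        ClientTimeoutException(String msg) { super(msg); }
    }

    public static void main(String[] args) {
        System.out.println("traceless frames: "
            + new TracelessTimeoutException("timed out").getStackTrace().length);
        System.out.println("client frames > 0: "
            + (new ClientTimeoutException("timed out").getStackTrace().length > 0));
    }
}
```

Running this prints `traceless frames: 0` for the first class, which is exactly the debugging problem the issue describes.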
[jira] [Created] (KAFKA-8258) Verbose logs in org.apache.kafka.clients.consumer.internals.Fetcher
Shixiong Zhu created KAFKA-8258:
---
Summary: Verbose logs in org.apache.kafka.clients.consumer.internals.Fetcher
Key: KAFKA-8258
URL: https://issues.apache.org/jira/browse/KAFKA-8258
Project: Kafka
Issue Type: Bug
Affects Versions: 1.1.0
Reporter: Shixiong Zhu

We noticed that Spark's Kafka connector outputs a lot of the following verbose logs:
{code}
19/04/18 04:31:06 INFO Fetcher: [Consumer clientId=consumer-2, groupId=...] Resetting offset for partition ... to offset
{code}
It comes from https://github.com/hachikuji/kafka/blob/76c796ca128c3c97231f3ebda994a07bb06b26aa/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java#L567

This log was added in https://github.com/apache/kafka/pull/4557

In Spark, we use `seekToEnd` to discover the latest offsets of a topic. If there are thousands of partitions in this topic, it will output thousands of INFO logs. Is it intentional? If not, can we change it to DEBUG?

--
This message was sent by Atlassian JIRA (v7.6.3#76005)
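Until the log level is lowered upstream, one workaround (a sketch assuming a Log4j 1.x properties configuration, as is typical for Spark deployments) is to raise the threshold for the Fetcher logger only:

```properties
# Silence the per-partition "Resetting offset" INFO lines from the consumer
# without touching the log level of other Kafka client classes.
log4j.logger.org.apache.kafka.clients.consumer.internals.Fetcher=WARN
```

This keeps WARN and ERROR output from the Fetcher intact while suppressing the thousands of INFO lines produced by a `seekToEnd` over a many-partition topic.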
[jira] [Created] (KAFKA-7703) KafkaConsumer.position may return a wrong offset after "seekToEnd" is called
Shixiong Zhu created KAFKA-7703:
---
Summary: KafkaConsumer.position may return a wrong offset after "seekToEnd" is called
Key: KAFKA-7703
URL: https://issues.apache.org/jira/browse/KAFKA-7703
Project: Kafka
Issue Type: Bug
Components: clients
Affects Versions: 2.1.0
Reporter: Shixiong Zhu

After "seekToEnd" is called, "KafkaConsumer.position" may return a wrong offset set by another reset request.

Here is a reproducer: https://github.com/zsxwing/kafka/commit/4e1aa11bfa99a38ac1e2cb0872c055db56b33246

In this reproducer, "poll(0)" will send an "earliest" request in the background. However, because "Fetcher.resetOffsetIfNeeded" is not atomic, "seekToEnd" can happen between https://github.com/zsxwing/kafka/commit/4e1aa11bfa99a38ac1e2cb0872c055db56b33246#diff-b45245913eaae46aa847d2615d62cde0R585 and https://github.com/zsxwing/kafka/commit/4e1aa11bfa99a38ac1e2cb0872c055db56b33246#diff-b45245913eaae46aa847d2615d62cde0R605, and "KafkaConsumer.position" may then return an "earliest" offset.

--
This message was sent by Atlassian JIRA (v7.6.3#76005)
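The race is a classic check-then-act problem: the background reset decides an offset reset is needed, `seekToEnd` runs in between, and the stale decision then overwrites the seek. A deterministic single-threaded sketch of that interleaving (the `OffsetState` class and its fields are hypothetical simplifications, not Kafka's internals):

```java
// Deterministic sketch of the non-atomic reset race described in KAFKA-7703.
// OffsetState is a hypothetical stand-in for the consumer's subscription state.
class OffsetState {
    Long position = null;
    String resetStrategy = "earliest"; // set by the background poll(0) request
    boolean awaitingReset() { return resetStrategy != null; }
    void seek(long offset) { position = offset; resetStrategy = null; }
}

public class ResetRaceSketch {
    public static void main(String[] args) {
        OffsetState state = new OffsetState();

        // Background reset, step 1: observe that a reset is still pending.
        boolean needsReset = state.awaitingReset();

        // Caller thread interleaves here: seekToEnd sets the latest offset.
        state.seek(100L);

        // Background reset, step 2: apply the stale "earliest" decision,
        // clobbering the offset seekToEnd just set.
        if (needsReset) state.seek(0L);

        System.out.println("position = " + state.position); // 0, not 100
    }
}
```

Making the check and the seek atomic (e.g. re-validating the pending reset under the same lock that applies it) removes the window.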
[jira] [Created] (KAFKA-4879) KafkaConsumer.position may hang forever when deleting a topic
Shixiong Zhu created KAFKA-4879:
---
Summary: KafkaConsumer.position may hang forever when deleting a topic
Key: KAFKA-4879
URL: https://issues.apache.org/jira/browse/KAFKA-4879
Project: Kafka
Issue Type: Bug
Components: consumer
Affects Versions: 0.10.2.0
Reporter: Shixiong Zhu

KafkaConsumer.position may hang forever when deleting a topic. The problem is this line: https://github.com/apache/kafka/blob/022bf129518e33e165f9ceefc4ab9e622952d3bd/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java#L374

The timeout is "Long.MAX_VALUE", so it will just retry forever on UnknownTopicOrPartitionException.

Here is a reproducer:
{code}
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.serialization.StringDeserializer;

import java.util.Collections;
import java.util.Properties;
import java.util.Set;

public class KafkaReproducer {
  public static void main(String[] args) {
    // Make sure "delete.topic.enable" is set to true.
    // Please create the topic "test" with 3 partitions manually.
    // The issue is gone when there is only one partition.
    String topic = "test";
    Properties props = new Properties();
    props.put("bootstrap.servers", "localhost:9092");
    props.put("group.id", "testgroup");
    props.put("value.deserializer", StringDeserializer.class.getName());
    props.put("key.deserializer", StringDeserializer.class.getName());
    props.put("enable.auto.commit", "false");
    KafkaConsumer<String, String> kc = new KafkaConsumer<>(props);
    kc.subscribe(Collections.singletonList(topic));
    kc.poll(0);
    Set<TopicPartition> partitions = kc.assignment();
    System.out.println("partitions: " + partitions);
    kc.pause(partitions);
    kc.seekToEnd(partitions);
    System.out.println("please delete the topic in 30 seconds");
    try {
      // Sleep 30 seconds to give us enough time to delete the topic.
      Thread.sleep(30000);
    } catch (InterruptedException e) {
      e.printStackTrace();
    }
    System.out.println("sleep end");
    for (TopicPartition p : partitions) {
      System.out.println(p + " offset: " + kc.position(p));
    }
    System.out.println("cannot reach here");
    kc.close();
  }
}
{code}

--
This message was sent by Atlassian JIRA (v6.3.15#6346)
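The fix direction the issue implies is to bound the retry with a deadline instead of `Long.MAX_VALUE`. A minimal broker-free sketch of that loop shape (`fetchOffset` is a hypothetical stand-in for the metadata lookup that keeps failing while the topic is being deleted):

```java
// Sketch: bounding an offset-lookup retry loop with a deadline instead of
// retrying forever. fetchOffset() is a hypothetical stand-in for the call
// that keeps failing with UnknownTopicOrPartitionException.
import java.time.Duration;

public class BoundedRetrySketch {
    // Fails (returns null) for the first 3 attempts, then succeeds.
    static Long fetchOffset(int attempt) { return attempt < 3 ? null : 42L; }

    static long positionWithTimeout(Duration timeout) {
        long deadline = System.nanoTime() + timeout.toNanos();
        int attempt = 0;
        while (System.nanoTime() < deadline) {
            Long offset = fetchOffset(attempt++);
            if (offset != null) return offset; // success within the deadline
            // otherwise retry, but only until the deadline expires
        }
        throw new RuntimeException("timed out waiting for offset");
    }

    public static void main(String[] args) {
        System.out.println("offset: " + positionWithTimeout(Duration.ofSeconds(5)));
    }
}
```

With a deadline, deleting the topic makes `position` fail fast with a timeout error rather than hanging the caller forever.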
[jira] [Commented] (KAFKA-1894) Avoid long or infinite blocking in the consumer
[ https://issues.apache.org/jira/browse/KAFKA-1894?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15651799#comment-15651799 ]

Shixiong Zhu commented on KAFKA-1894:
-

Hit the same issue in the Spark Structured Streaming Kafka Source. These loops should check the current thread's interrupted status.

> Avoid long or infinite blocking in the consumer
> ---
>
> Key: KAFKA-1894
> URL: https://issues.apache.org/jira/browse/KAFKA-1894
> Project: Kafka
> Issue Type: Sub-task
> Components: consumer
> Reporter: Jay Kreps
> Assignee: Jason Gustafson
> Fix For: 0.10.2.0
>
> The new consumer has a lot of loops that look something like
> {code}
> while(!isThingComplete())
>   client.poll();
> {code}
> This occurs both in KafkaConsumer and in NetworkClient.completeAll. These retry loops are mostly the behavior we want, but there are several cases where they may cause problems:
> - In the case of a hard failure we may hang for a long time or indefinitely before realizing the connection is lost.
> - In the case where the cluster is malfunctioning or down we may retry forever.
> It would probably be better to give a timeout to these. The proposed approach would be to add something like retry.time.ms=6 and only continue retrying for that period of time.

--
This message was sent by Atlassian JIRA (v6.3.4#6332)
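The comment's suggestion, that these retry loops should check the current thread's interrupted status, can be sketched in isolation (the busy loop below is a hypothetical stand-in for the `client.poll()` retry loop quoted above):

```java
// Sketch: making a poll-style retry loop responsive to interruption by
// checking the thread's interrupted status on every iteration.
public class InterruptibleLoopSketch {
    public static void main(String[] args) throws Exception {
        Thread worker = new Thread(() -> {
            long polls = 0;
            // Instead of while(!isThingComplete()) looping blindly,
            // also bail out once the thread has been interrupted.
            while (!Thread.currentThread().isInterrupted()) {
                polls++; // stand-in for client.poll()
            }
            System.out.println("loop exited after interrupt: " + (polls >= 0));
        });
        worker.start();
        Thread.sleep(50);   // let the worker spin briefly
        worker.interrupt(); // request cancellation
        worker.join();      // the loop exits instead of spinning forever
    }
}
```

Without the `isInterrupted()` check, `interrupt()` has no effect on a loop that never blocks, which is exactly how a caller (such as a Spark task being cancelled) ends up stuck behind the consumer.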