Thanks Guozhang.
Yuanjia Li

From: Guozhang Wang
Date: 2016-07-26 07:06
To: users@kafka.apache.org
Subject: Re: Re: KafkaConsumer position block

Hi Yuanjia,

If the consumer has just been created and has no metadata yet, seeking to the latest offset requires at least two round-trips to the brokers: first to get the metadata of the partitions, and then to get the offsets from the brokers hosting those partitions.

Note that consumer.assign() and consumer.seekToEnd() do not trigger any requests to the brokers, i.e. they are "lazily evaluated". Only consumer.position() triggers sending the request for offsets and waits for the response, and I think that is what you have observed.

Regarding the delete topic issue, it seems that topic "test-topic" did not exist when the delete request was sent. You may want to wipe out your testing broker cluster, set the delete-enable config (delete.topic.enable) correctly, and retry to see if this issue is reproducible.

Guozhang

On Thu, Jul 21, 2016 at 11:52 PM, yuanjia8...@163.com <yuanjia8...@163.com> wrote:

> Hi Guozhang,
>
> I want to get the latest offset; the code is as follows:
> consumer.assign(topicPartitionList);
> consumer.seekToEnd(topicPartitionList);
> long offset = consumer.position(topicPartition);
>
> I noticed that the topic is marked for deletion but "delete.topic.enable" is
> not set to true.
> Could that be the cause?
>
> After setting delete.topic.enable to true and restarting, I ran into another
> problem; the log is as follows:
> [2016-07-21 18:25:27,740] INFO [delete-topics-thread-61], Starting (kafka.controller.TopicDeletionManager$DeleteTopicsThread)
> [2016-07-21 18:25:27,740] INFO [Controller 61]: Controller startup complete (kafka.controller.KafkaController)
> [2016-07-21 18:25:27,801] INFO [delete-topics-thread-61], Handling deletion for topics test-topic (kafka.controller.TopicDeletionManager$DeleteTopicsThread)
> [2016-07-21 18:25:27,806] ERROR [delete-topics-thread-61], Error due to (kafka.controller.TopicDeletionManager$DeleteTopicsThread)
> java.util.NoSuchElementException: key not found: test-topic
>     at scala.collection.MapLike$class.default(MapLike.scala:228)
>     at scala.collection.AbstractMap.default(Map.scala:59)
>     at scala.collection.mutable.HashMap.apply(HashMap.scala:65)
>     at kafka.controller.PartitionStateMachine.deregisterPartitionChangeListener(PartitionStateMachine.scala:387)
>     at kafka.controller.TopicDeletionManager.kafka$controller$TopicDeletionManager$$completeDeleteTopic(TopicDeletionManager.scala:282)
>     at kafka.controller.TopicDeletionManager$DeleteTopicsThread$$anonfun$doWork$1$$anonfun$apply$mcV$sp$4.apply(TopicDeletionManager.scala:407)
>     at kafka.controller.TopicDeletionManager$DeleteTopicsThread$$anonfun$doWork$1$$anonfun$apply$mcV$sp$4.apply(TopicDeletionManager.scala:403)
>     at scala.collection.immutable.Set$Set1.foreach(Set.scala:94)
>     at kafka.controller.TopicDeletionManager$DeleteTopicsThread$$anonfun$doWork$1.apply$mcV$sp(TopicDeletionManager.scala:403)
>     at kafka.controller.TopicDeletionManager$DeleteTopicsThread$$anonfun$doWork$1.apply(TopicDeletionManager.scala:397)
>     at kafka.controller.TopicDeletionManager$DeleteTopicsThread$$anonfun$doWork$1.apply(TopicDeletionManager.scala:397)
>     at kafka.utils.CoreUtils$.inLock(CoreUtils.scala:231)
>     at kafka.controller.TopicDeletionManager$DeleteTopicsThread.doWork(TopicDeletionManager.scala:397)
>     at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:63)
> [2016-07-21 18:25:27,809] INFO [delete-topics-thread-61], Stopped (kafka.controller.TopicDeletionManager$DeleteTopicsThread)
>
> YuanJia Li
>
> From: Guozhang Wang
> Date: 2016-07-22 02:07
> To: users@kafka.apache.org
> Subject: Re: KafkaConsumer position block
> Hello Yuanjia,
>
> Could you share your code example for calling consumer.position()? Is the
> partition that you are getting the offset from assigned to the consumer?
>
> Guozhang
>
> On Wed, Jul 20, 2016 at 11:50 PM, yuanjia8...@163.com <yuanjia8...@163.com>
> wrote:
>
> > Hi,
> > With kafka-clients-0.10.0.0, I use KafkaConsumer.position() to get
> > the offset, and the process blocks in ConsumerNetworkClient.awaitMetadataUpdate.
> > That method blocks until the metadata has been refreshed.
> > My questions are:
> > 1. Why is the metadata not refreshed?
> > 2. Could it use a timeout or throw an exception instead of blocking?
> >
> > Thanks.
> >
> > YuanJia Li
>
> --
> -- Guozhang

--
-- Guozhang
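
Editor's sketch, on question 2 in the original message (a timeout instead of an indefinite block): one possible workaround is to run the blocking call on a worker thread and bound the wait with Future.get. This is only a sketch under assumptions. BoundedCall and callWithTimeout are hypothetical names and are not part of kafka-clients, and the lambdas in main are stand-ins for the real assign()/seekToEnd()/position() sequence. Because KafkaConsumer is documented as not safe for multi-threaded access, a real task would likely need to create, use, and close the consumer entirely inside the worker thread. Whether a blocked position() call in 0.10.0.0 reacts to the interrupt sent on timeout is not verified here.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

// Hypothetical helper: not part of the Kafka client library.
final class BoundedCall {

    // Runs `call` on a worker thread and waits at most `timeoutMs` for its result.
    // Throws TimeoutException if the call has not finished in time.
    static <T> T callWithTimeout(Callable<T> call, long timeoutMs) throws Exception {
        ExecutorService executor = Executors.newSingleThreadExecutor();
        Future<T> future = executor.submit(call);
        try {
            return future.get(timeoutMs, TimeUnit.MILLISECONDS);
        } catch (TimeoutException e) {
            // Ask the worker to stop; this only helps if the call honors interrupts.
            future.cancel(true);
            throw e;
        } finally {
            // Interrupts the worker if it is still running and releases the executor.
            executor.shutdownNow();
        }
    }

    public static void main(String[] args) throws Exception {
        // Stand-in for a task that creates a consumer, calls assign(),
        // seekToEnd() and position(), closes the consumer, and returns the offset.
        Long offset = callWithTimeout(() -> 42L, 1000);
        System.out.println("offset = " + offset);

        try {
            // Stand-in for a position() call that never gets its metadata.
            callWithTimeout(() -> { Thread.sleep(5_000); return 0L; }, 100);
        } catch (TimeoutException e) {
            System.out.println("timed out instead of blocking indefinitely");
        }
    }
}
```

The design choice here is to bound the caller's wait rather than change the consumer's behavior: the caller gets a TimeoutException it can handle, while the worker may keep running until the underlying call returns or reacts to the interrupt.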