Thanks Guozhang.


Yuanjia Li
 
From: Guozhang Wang
Date: 2016-07-26 07:06
To: users@kafka.apache.org
Subject: Re: Re: KafkaConsumer position block
Hi Yuanjia,
 
If the consumer has just been created and there is no metadata in it yet,
seeking to the latest offset would require at least two round-trips to the
broker to first get the metadata of the partitions, and then get the
offsets from the partition hosted brokers. Note that consumer.assign() and
consumer.seekToEnd() does not trigger any requests to the brokers, i.e.
they are "lazily evaluated", and only consumer.position() will trigger
sending the request for offsets and wait for the response, and I think that
is what you have observed.
 
Regarding the delete topic issue, it seems like topic "test-topic" was not
exist when the delete request was sent. You may want to wipe-out your
testing broker cluster, set the enable delete config right, and re-try to
see if this issue is re-producible.
 
 
Guozhang
 
 
 
On Thu, Jul 21, 2016 at 11:52 PM, yuanjia8...@163.com <yuanjia8...@163.com>
wrote:
 
> Hi Guozhang,
>
> I want to get the latest offset, code as follows:
> consumer.assign(topicPartitionList);
> consumer.seekToEnd(topicPartitionList);
> long offset = consumer.position(topicPartition);
>
> I note that the topic is marked for deletion but "delete.topic.enable" is
> not set to true.
> Maybe it cause that?
>
> After set delete.topic.enable to true and restart, I encounter another
> question, log as follows:
> [2016-07-21 18:25:27,740] INFO [delete-topics-thread-61], Starting
> (kafka.controller.TopicDeletionManager$DeleteTopicsThread)
> [2016-07-21 18:25:27,740] INFO [Controller 61]: Controller startup
> complete (kafka.controller.KafkaController)
> [2016-07-21 18:25:27,801] INFO [delete-topics-thread-61], Handling
> deletion for topics test-topic
> (kafka.controller.TopicDeletionManager$DeleteTopicsThread)
> [2016-07-21 18:25:27,806] ERROR [delete-topics-thread-61], Error due to
> (kafka.controller.TopicDeletionManager$DeleteTopicsThread)
> java.util.NoSuchElementException: key not found: test-topic
>         at scala.collection.MapLike$class.default(MapLike.scala:228)
>         at scala.collection.AbstractMap.default(Map.scala:59)
>         at scala.collection.mutable.HashMap.apply(HashMap.scala:65)
>         at
> kafka.controller.PartitionStateMachine.deregisterPartitionChangeListener(PartitionStateMachine.scala:387)
>         at
> kafka.controller.TopicDeletionManager.kafka$controller$TopicDeletionManager$$completeDeleteTopic(TopicDeletionManager.scala:282)
>         at
> kafka.controller.TopicDeletionManager$DeleteTopicsThread$$anonfun$doWork$1$$anonfun$apply$mcV$sp$4.apply(TopicDeletionManager.scala:407)
>         at
> kafka.controller.TopicDeletionManager$DeleteTopicsThread$$anonfun$doWork$1$$anonfun$apply$mcV$sp$4.apply(TopicDeletionManager.scala:403)
>         at scala.collection.immutable.Set$Set1.foreach(Set.scala:94)
>         at
> kafka.controller.TopicDeletionManager$DeleteTopicsThread$$anonfun$doWork$1.apply$mcV$sp(TopicDeletionManager.scala:403)
>         at
> kafka.controller.TopicDeletionManager$DeleteTopicsThread$$anonfun$doWork$1.apply(TopicDeletionManager.scala:397)
>         at
> kafka.controller.TopicDeletionManager$DeleteTopicsThread$$anonfun$doWork$1.apply(TopicDeletionManager.scala:397)
>         at kafka.utils.CoreUtils$.inLock(CoreUtils.scala:231)
>         at
> kafka.controller.TopicDeletionManager$DeleteTopicsThread.doWork(TopicDeletionManager.scala:397)
>         at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:63)
> [2016-07-21 18:25:27,809] INFO [delete-topics-thread-61], Stopped
> (kafka.controller.TopicDeletionManager$DeleteTopicsThread)
>
>
>
> YuanJia Li
>
> From: Guozhang Wang
> Date: 2016-07-22 02:07
> To: users@kafka.apache.org
> Subject: Re: KafkaConsumer position block
> Hello Yuanjia,
>
> Could you share your code example on calling consumer.position()? Is the
> partition that you are getting the offset from assigned to the consumer?
>
>
> Guozhang
>
> On Wed, Jul 20, 2016 at 11:50 PM, yuanjia8...@163.com <yuanjia8...@163.com
> >
> wrote:
>
> > Hi,
> >      With kafka-clients-0.10.0.0, I use KafkaConsumer.position() to get
> > the offset, the process block in
> ConsumerNetworkClient.awaitMetadataUpdate.
> > Block until the meadata has been refreshed.
> >      My questions are:
> >      1. Why the metadata not refresh?
> >      2. Could it use timeout or throw exception instead of block?
> >
> >      Thanks.
> >
> >
> >
> > YuanJia Li
> >
>
>
>
> --
> -- Guozhang
>
 
 
 
-- 
-- Guozhang

Reply via email to