Hi Jun,

I was trying out this last option; otherwise I would use the SimpleConsumer.
I am trying to set the offset of my choice using

    getZkClient().createPersistent("/consumers/test_group/offsets/golf4/0-0", "22274");

or

    ZkUtils.updatePersistentPath(getZkClient(), "/consumers/test_group/offsets/golf4/0-0", "22274");

but I am getting this exception during rebalance:

    ZookeeperConsumerConnector:66 - test_group_ajits-machine-1347505711889-9df96cdb exception during rebalance
    java.lang.NumberFormatException: For input string: "��
        at java.lang.NumberFormatException.forInputString(NumberFormatException.java:65)
        at java.lang.Long.parseLong(Long.java:441)
        at java.lang.Long.parseLong(Long.java:483)
        at scala.collection.immutable.StringLike$class.toLong(StringLike.scala:232)
        at scala.collection.immutable.StringOps.toLong(StringOps.scala:31)
        at kafka.consumer.ZookeeperConsumerConnector$ZKRebalancerListener.kafka$consumer$ZookeeperConsumerConnector$ZKRebalancerListener$$addPartitionTopicInfo(ZookeeperConsumerConnector.scala:644)
        at kafka.consumer.ZookeeperConsumerConnector$ZKRebalancerListener$$anonfun$kafka$consumer$ZookeeperConsumerConnector$ZKRebalancerListener$$rebalance$2$$anonfun$apply$11$$anonfun$apply$13.apply(ZookeeperConsumerConnector.scala:523)
        at kafka.consumer.ZookeeperConsumerConnector$ZKRebalancerListener$$anonfun$kafka$consumer$ZookeeperConsumerConnector$ZKRebalancerListener$$rebalance$2$$anonfun$apply$11$$anonfun$apply$13.apply(ZookeeperConsumerConnector.scala:520)
        at scala.collection.immutable.Range.foreach(Range.scala:78)
        at kafka.consumer.ZookeeperConsumerConnector$ZKRebalancerListener$$anonfun$kafka$consumer$ZookeeperConsumerConnector$ZKRebalancerListener$$rebalance$2$$anonfun$apply$11.apply(ZookeeperConsumerConnector.scala:520)
        at kafka.consumer.ZookeeperConsumerConnector$ZKRebalancerListener$$anonfun$kafka$consumer$ZookeeperConsumerConnector$ZKRebalancerListener$$rebalance$2$$anonfun$apply$11.apply(ZookeeperConsumerConnector.scala:507)
        at scala.collection.mutable.HashSet.foreach(HashSet.scala:72)
        at kafka.consumer.ZookeeperConsumerConnector$ZKRebalancerListener$$anonfun$kafka$consumer$ZookeeperConsumerConnector$ZKRebalancerListener$$rebalance$2.apply(ZookeeperConsumerConnector.scala:507)
        at kafka.consumer.ZookeeperConsumerConnector$ZKRebalancerListener$$anonfun$kafka$consumer$ZookeeperConsumerConnector$ZKRebalancerListener$$rebalance$2.apply(ZookeeperConsumerConnector.scala:494)
        at scala.collection.mutable.HashMap$$anonfun$foreach$1.apply(HashMap.scala:93)
        at scala.collection.mutable.HashMap$$anonfun$foreach$1.apply(HashMap.scala:93)
        at scala.collection.Iterator$class.foreach(Iterator.scala:652)
        at scala.collection.mutable.HashTable$$anon$1.foreach(HashTable.scala:157)
        at scala.collection.mutable.HashTable$class.foreachEntry(HashTable.scala:190)
        at scala.collection.mutable.HashMap.foreachEntry(HashMap.scala:43)

Is there anything else I could do here, or another way to resolve this issue?

Thanks
Ajit Koti

On Wed, Sep 12, 2012 at 9:50 AM, Jun Rao <jun...@gmail.com> wrote:

> If you also need this for Hadoop, then it's more difficult. Unfortunately,
> we don't have an API for changing offsets programmatically right now. We
> will revisit this post 0.8.0.
>
> Thanks,
>
> Jun
>
> On Tue, Sep 11, 2012 at 8:32 AM, Ajit Koti <ajitk...@gmail.com> wrote:
>
> > Hi Jun,
> >
> > Keeping the log segment size and the log retention size at the same
> > value means I will lose messages, right? I don't want that to happen,
> > as the Hadoop consumer would not yet have consumed the messages
> > and persisted them into the database.
> >
> > I was trying to do this: set the autooffset to largest and delete the
> > corresponding offsets, as described here:
> >
> > http://mail-archives.apache.org/mod_mbox/incubator-kafka-users/201202.mbox/%3ccalxo6kityw_xtbvxkkv2x31wynxfwoz47o-e6dbfo21msmq...@mail.gmail.com%3E
> >
> > I am not particularly sure how to delete the offsets; using ZkUtils, I
> > was trying to delete this path before consuming any messages:
> > /consumers/[group_id]/offsets/[topic]/[broker_id-partition_id]
> >
> > I am not sure about the right approach.
> >
> > Thanks
> > Ajit
> >
> > On Tue, Sep 11, 2012 at 8:05 PM, Jun Rao <jun...@gmail.com> wrote:
> >
> > > That's an interesting use case, and it is not what the ZK consumer is
> > > really designed for. First, having a topic per user may not be ideal,
> > > especially if you have millions of users. Second, the ZK consumer is
> > > designed for continuously consuming data, not skipping data all the
> > > time.
> > >
> > > With that said, you can probably try the following trick and see if it
> > > works. Set both the log segment size and the log retention size to a
> > > value that matches the size of about 100 messages. This should make
> > > sure that at any given point in time, only the most recent 100
> > > messages are kept in the broker. Set autooffset.reset to earliest.
> > > This will make sure that if the consumer's offset is out of range, it
> > > will automatically switch to the current smallest offset.
> > >
> > > Thanks,
> > >
> > > Jun
> > >
> > > On Mon, Sep 10, 2012 at 9:07 PM, Ajit Koti <ajitk...@gmail.com> wrote:
> > >
> > > > Hi,
> > > >
> > > > No, consumers are not falling behind.
> > > > What happens is that the users of the consumers don't fetch messages
> > > > for a certain period of time, and then the users want to fetch the
> > > > latest messages from the topic.
> > > >
> > > > For example, in the case of displaying a news feed, each user has
> > > > his own topic. If a user doesn't log in for a couple of days, he
> > > > wouldn't consume the messages from his topic. But when he logs in,
> > > > he wants the latest updates, for which we have to fetch the latest
> > > > messages from the topic.
> > > >
> > > > So is it possible to use the zkconsumer, set the offset to the last
> > > > 100 messages, and then fetch them?
> > > > Thanks
> > > > Ajit Koti
> > > >
> > > > On Tue, Sep 11, 2012 at 9:04 AM, Jun Rao <jun...@gmail.com> wrote:
> > > >
> > > > > Is consumption falling behind? Can you have more consumers in the
> > > > > same group to increase the consumption throughput?
> > > > >
> > > > > Thanks,
> > > > >
> > > > > Jun
> > > > >
> > > > > On Mon, Sep 10, 2012 at 12:22 PM, Ajit Koti <ajitk...@gmail.com>
> > > > > wrote:
> > > > >
> > > > > > Hi All,
> > > > > >
> > > > > > I was happily consuming messages from the consumer.
> > > > > > Recently I got a new requirement where I always have to process
> > > > > > the latest messages.
> > > > > > Is there any way I can fetch the latest 100 messages? I know
> > > > > > that apart from the simple consumer I cannot specify the offset,
> > > > > > but I am still wondering whether there is any way to start
> > > > > > consuming from a particular offset.
> > > > > >
> > > > > > Note: I have only one consumer per topic.
> > > > > >
> > > > > > Thanks
> > > > > > Ajit
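A plausible cause of the NumberFormatException at the top of this thread (a diagnosis sketched here as an assumption, not confirmed anywhere in the thread): if the ZkClient returned by the poster's getZkClient() helper was constructed without an explicit serializer, ZkClient Java-serializes the offset value it writes, while Kafka's ZookeeperConsumerConnector reads the znode payload back as a plain string and calls toLong on it, so it chokes on the serialization header bytes. The JDK-only program below reproduces that mismatch:

```java
import java.io.ByteArrayOutputStream;
import java.io.ObjectOutputStream;
import java.nio.charset.StandardCharsets;

public class OffsetEncoding {
    public static void main(String[] args) throws Exception {
        String offset = "22274";

        // What ZkClient's default serializer stores: Java serialization
        // bytes (starting with the 0xAC 0xED stream header), not the
        // characters "22274".
        ByteArrayOutputStream bos = new ByteArrayOutputStream();
        ObjectOutputStream oos = new ObjectOutputStream(bos);
        oos.writeObject(offset);
        oos.close();

        // The consumer reads the znode back as a string and parses it,
        // which fails on the serialization header bytes.
        String readBack = new String(bos.toByteArray(), StandardCharsets.UTF_8);
        try {
            Long.parseLong(readBack);
            System.out.println("default serializer parsed cleanly (unexpected)");
        } catch (NumberFormatException e) {
            System.out.println("default serializer -> NumberFormatException");
        }

        // What a plain string serializer stores: raw UTF-8 bytes, which
        // parse back to the intended offset.
        byte[] utf8 = offset.getBytes(StandardCharsets.UTF_8);
        long parsed = Long.parseLong(new String(utf8, StandardCharsets.UTF_8));
        System.out.println("string serializer -> " + parsed);
    }
}
```

If this is indeed the cause, constructing the ZkClient with a UTF-8 string serializer (Kafka ships one as kafka.utils.ZKStringSerializer) before calling createPersistent or ZkUtils.updatePersistentPath should leave an offset the consumer can parse.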