Hi Jun,

I modified UpdateOffsetsInZK so that it can take a custom offset and write it to ZooKeeper; I am not sure of the consequences yet. The ZK consumer now picks up messages from the desired offset, so it is working for now, but it needs more testing.

Modified method
----------------------------------

def getAndSetOffsets(zkClient: ZkClient, offsetOption: String, config: ConsumerConfig, topic: String): Unit = {
  val cluster = ZkUtils.getCluster(zkClient)
  val partitionsPerTopicMap = ZkUtils.getPartitionsForTopics(zkClient, List(topic).iterator)
  var partitions: List[String] = Nil
  partitionsPerTopicMap.get(topic) match {
    case Some(l) => partitions = l.sortWith((s, t) => s < t)
    case _ => throw new RuntimeException("Can't find topic " + topic)
  }
  var numParts = 0
  for (partString <- partitions) {
    val part = Partition.parse(partString)
    // Look up the broker only to fail fast if it is unavailable.
    val broker = cluster.getBroker(part.brokerId) match {
      case Some(b) => b
      case None => throw new IllegalStateException("Broker " + part.brokerId +
        " is unavailable. Cannot issue getOffsetsBefore request")
    }
    val topicDirs = new ZKGroupTopicDirs(config.groupId, topic)
    println("updating partition " + part.name + " with new offset: " + offsetOption)
    ZkUtils.updatePersistentPath(zkClient, topicDirs.consumerOffsetDir + "/" + part.name, offsetOption)
    numParts += 1
  }
  println("updated the offset for " + numParts + " partitions")
}
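For reference, a minimal sketch of how the method might be invoked; the ZooKeeper address, group id, topic, and the offset value "22274" are placeholders, and the property names assume a 0.7-era ConsumerConfig:

import java.util.Properties
import kafka.consumer.ConsumerConfig
import kafka.utils.ZKStringSerializer
import org.I0Itec.zkclient.ZkClient

val props = new Properties()
props.put("zk.connect", "localhost:2181")  // placeholder ZK ensemble
props.put("groupid", "test_group")         // placeholder consumer group
val config = new ConsumerConfig(props)

// The client must use Kafka's string serializer so the offset is
// stored as a plain UTF-8 string (see the exception discussed below).
val zkClient = new ZkClient("localhost:2181", 30000, 30000, ZKStringSerializer)
getAndSetOffsets(zkClient, "22274", config, "golf4")
zkClient.close()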
Thanks
Ajit Koti

On Thu, Sep 13, 2012 at 8:46 AM, Ajit Koti <ajitk...@gmail.com> wrote:

> Hi Jun,
>
> I was trying this out as a last option; otherwise I would use the
> SimpleConsumer.
>
> I am trying to set an offset of my choice using
>
> getZkClient().createPersistent("/consumers/test_group/offsets/golf4/0-0",
>     "22274");
>
> or
>
> ZkUtils.updatePersistentPath(getZkClient(),
>     "/consumers/test_group/offsets/golf4/0-0", "22274");
>
> but I get this exception:
>
> ZookeeperConsumerConnector:66 -
> test_group_ajits-machine-1347505711889-9df96cdb exception during rebalance
> java.lang.NumberFormatException: For input string: "��
>   at java.lang.NumberFormatException.forInputString(NumberFormatException.java:65)
>   at java.lang.Long.parseLong(Long.java:441)
>   at java.lang.Long.parseLong(Long.java:483)
>   at scala.collection.immutable.StringLike$class.toLong(StringLike.scala:232)
>   at scala.collection.immutable.StringOps.toLong(StringOps.scala:31)
>   at kafka.consumer.ZookeeperConsumerConnector$ZKRebalancerListener.kafka$consumer$ZookeeperConsumerConnector$ZKRebalancerListener$$addPartitionTopicInfo(ZookeeperConsumerConnector.scala:644)
>   at kafka.consumer.ZookeeperConsumerConnector$ZKRebalancerListener$$anonfun$kafka$consumer$ZookeeperConsumerConnector$ZKRebalancerListener$$rebalance$2$$anonfun$apply$11$$anonfun$apply$13.apply(ZookeeperConsumerConnector.scala:523)
>   at kafka.consumer.ZookeeperConsumerConnector$ZKRebalancerListener$$anonfun$kafka$consumer$ZookeeperConsumerConnector$ZKRebalancerListener$$rebalance$2$$anonfun$apply$11$$anonfun$apply$13.apply(ZookeeperConsumerConnector.scala:520)
>   at scala.collection.immutable.Range.foreach(Range.scala:78)
>   at kafka.consumer.ZookeeperConsumerConnector$ZKRebalancerListener$$anonfun$kafka$consumer$ZookeeperConsumerConnector$ZKRebalancerListener$$rebalance$2$$anonfun$apply$11.apply(ZookeeperConsumerConnector.scala:520)
>   at kafka.consumer.ZookeeperConsumerConnector$ZKRebalancerListener$$anonfun$kafka$consumer$ZookeeperConsumerConnector$ZKRebalancerListener$$rebalance$2$$anonfun$apply$11.apply(ZookeeperConsumerConnector.scala:507)
>   at scala.collection.mutable.HashSet.foreach(HashSet.scala:72)
>   at kafka.consumer.ZookeeperConsumerConnector$ZKRebalancerListener$$anonfun$kafka$consumer$ZookeeperConsumerConnector$ZKRebalancerListener$$rebalance$2.apply(ZookeeperConsumerConnector.scala:507)
>   at kafka.consumer.ZookeeperConsumerConnector$ZKRebalancerListener$$anonfun$kafka$consumer$ZookeeperConsumerConnector$ZKRebalancerListener$$rebalance$2.apply(ZookeeperConsumerConnector.scala:494)
>   at scala.collection.mutable.HashMap$$anonfun$foreach$1.apply(HashMap.scala:93)
>   at scala.collection.mutable.HashMap$$anonfun$foreach$1.apply(HashMap.scala:93)
>   at scala.collection.Iterator$class.foreach(Iterator.scala:652)
>   at scala.collection.mutable.HashTable$$anon$1.foreach(HashTable.scala:157)
>   at scala.collection.mutable.HashTable$class.foreachEntry(HashTable.scala:190)
>   at scala.collection.mutable.HashMap.foreachEntry(HashMap.scala:43)
>
> Is there anything I can do here, or another way to resolve this issue?
>
> Thanks
> Ajit Koti
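The NumberFormatException above is most likely a serialization mismatch: a ZkClient built with its default SerializableSerializer Java-serializes the offset string into binary (hence the "��" in the message), and the consumer's rebalance listener then fails when it calls toLong on the node's contents. A sketch of the likely fix, constructing the client with Kafka's ZKStringSerializer so the value is stored as plain UTF-8:

import kafka.utils.{ZKStringSerializer, ZkUtils}
import org.I0Itec.zkclient.ZkClient

// ZKStringSerializer writes and reads znode data as UTF-8 strings,
// matching what the consumer expects when it parses the offset.
val zkClient = new ZkClient("localhost:2181", 30000, 30000, ZKStringSerializer)
ZkUtils.updatePersistentPath(zkClient,
  "/consumers/test_group/offsets/golf4/0-0", "22274")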
>
> On Wed, Sep 12, 2012 at 9:50 AM, Jun Rao <jun...@gmail.com> wrote:
>
>> If you also need this for Hadoop, then it's more difficult. Unfortunately,
>> we don't have an API for changing offsets programmatically right now. We
>> will revisit this post 0.8.0.
>>
>> Thanks,
>>
>> Jun
>>
>> On Tue, Sep 11, 2012 at 8:32 AM, Ajit Koti <ajitk...@gmail.com> wrote:
>>
>> > Hi Jun,
>> >
>> > Keeping the log segment size and the log retention size at the same
>> > value means I will lose messages, right? I don't want that to happen,
>> > because the Hadoop consumer would not yet have consumed the messages
>> > and persisted them to the database.
>> >
>> > Instead, I was trying to set autooffset.reset to largest and delete
>> > the corresponding offsets, as described in
>> >
>> > http://mail-archives.apache.org/mod_mbox/incubator-kafka-users/201202.mbox/%3ccalxo6kityw_xtbvxkkv2x31wynxfwoz47o-e6dbfo21msmq...@mail.gmail.com%3E
>> >
>> > I am not particularly sure how to delete the offsets; using ZkUtils,
>> > I was trying to delete this path before consuming any messages:
>> > /consumers/[group_id]/offsets/[topic]/[broker_id-partition_id]
>> >
>> > I am not sure of the right approach.
>> >
>> > Thanks
>> > Ajit
>> >
>> > On Tue, Sep 11, 2012 at 8:05 PM, Jun Rao <jun...@gmail.com> wrote:
>> >
>> > > That's an interesting use case, and it is not what the ZK consumer
>> > > is really designed for. First, having a topic per user may not be
>> > > ideal, especially if you have millions of users. Second, the ZK
>> > > consumer is designed for continuously consuming data, not skipping
>> > > data all the time.
>> > >
>> > > With that said, you can probably try the following trick and see if
>> > > it works. Set both the log segment size and the log retention size
>> > > to a value that matches the size of about 100 messages. This should
>> > > make sure that at any given point in time, only the most recent 100
>> > > messages are kept in the broker. Set autooffset.reset to smallest.
>> > > This will make sure that if the consumer's offset is out of range,
>> > > it will automatically switch to the current smallest offset.
>> > >
>> > > Thanks,
>> > >
>> > > Jun
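A sketch of what Jun's suggestion might look like in configuration; the property names assume a 0.7-era broker and consumer, and the ~1 KB average message size is an assumption:

# broker (server.properties) -- ~100 KB holds roughly the latest
# 100 messages, assuming ~1 KB per message
log.file.size=102400
log.retention.size=102400

# consumer (consumer.properties) -- jump to the smallest available
# offset whenever the committed offset has been aged out
autooffset.reset=smallest

For deleting the committed offset instead (the approach in the linked thread), a plain zkClient.delete("/consumers/test_group/offsets/golf4/0-0") on the I0Itec ZkClient, issued while the consumer group is shut down, should do it; deletion does not involve the serializer.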
>> > >
>> > > On Mon, Sep 10, 2012 at 9:07 PM, Ajit Koti <ajitk...@gmail.com> wrote:
>> > >
>> > > > Hi,
>> > > >
>> > > > No, the consumers are not falling behind. What happens is that the
>> > > > users of the consumers don't fetch messages for a certain period
>> > > > of time, and when they come back they want to fetch the latest
>> > > > messages from the topic.
>> > > >
>> > > > For example, when displaying a news feed, each user has his own
>> > > > topic. If a user doesn't log in for a couple of days, he won't
>> > > > consume the messages from his topic. But when he logs in, he wants
>> > > > the latest updates, for which we have to fetch the latest messages
>> > > > from the topic.
>> > > >
>> > > > So is it possible to use the ZK consumer, set the offset to the
>> > > > last 100 messages, and then fetch them?
>> > > >
>> > > > Thanks
>> > > > Ajit Koti
>> > > >
>> > > > On Tue, Sep 11, 2012 at 9:04 AM, Jun Rao <jun...@gmail.com> wrote:
>> > > >
>> > > > > Is consumption falling behind? Can you add more consumers in the
>> > > > > same group to increase the consumption throughput?
>> > > > >
>> > > > > Thanks,
>> > > > >
>> > > > > Jun
>> > > > >
>> > > > > On Mon, Sep 10, 2012 at 12:22 PM, Ajit Koti <ajitk...@gmail.com> wrote:
>> > > > >
>> > > > > > Hi All,
>> > > > > >
>> > > > > > I was happily consuming messages with the consumer. Recently I
>> > > > > > got a new requirement where I always have to process the latest
>> > > > > > messages. Is there any way I can fetch the latest 100 messages?
>> > > > > > I know that apart from the SimpleConsumer I cannot specify the
>> > > > > > offset, but I am still wondering whether there is any way to
>> > > > > > start consuming from a particular offset.
>> > > > > >
>> > > > > > Note: I have only one consumer per topic.
>> > > > > >
>> > > > > > Thanks
>> > > > > > Ajit
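For the original question, a minimal sketch of reading from near the head of the log with the 0.7-era SimpleConsumer; the host, port, topic "golf4", partition, and buffer sizes are placeholders, and since offsets in this version are byte positions, "the latest 100 messages" can only be approximated by starting at the newest segment boundary:

import kafka.api.{FetchRequest, OffsetRequest}
import kafka.consumer.SimpleConsumer

object LatestMessages {
  def main(args: Array[String]): Unit = {
    val consumer = new SimpleConsumer("localhost", 9092, 100000, 64 * 1024)
    // Valid fetch offsets are message boundaries; getOffsetsBefore returns
    // segment boundaries plus, for LatestTime, the current log end.
    val offsets = consumer.getOffsetsBefore("golf4", 0, OffsetRequest.LatestTime, 2)
    // offsets(0) is the log end; offsets(1), when present, is the start of
    // the newest segment -- begin there and scan forward to the end.
    var offset = if (offsets.length > 1) offsets(1) else 0L
    val messages = consumer.fetch(new FetchRequest("golf4", 0, offset, 1024 * 1024))
    for (msgAndOffset <- messages) {
      println("consumed message, next offset: " + msgAndOffset.offset)
      offset = msgAndOffset.offset // byte offset to resume from
    }
    consumer.close()
  }
}

With the broker configured as in Jun's trick above, the newest segment holds roughly the latest 100 messages, so this scan approximates the requirement.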