Hi Jun,

I modified UpdateOffsetsInZK so that it can take a custom offset and set
it. I am not sure of the consequences, though.
The ZK consumer can now get messages from the desired offset, so it is
working as of now. I still need to test more, though.


Modified method
----------------------------------
  def getAndSetOffsets(zkClient: ZkClient, offsetOption: String,
                       config: ConsumerConfig, topic: String): Unit = {
    val cluster = ZkUtils.getCluster(zkClient)
    val partitionsPerTopicMap =
      ZkUtils.getPartitionsForTopics(zkClient, List(topic).iterator)
    var partitions: List[String] = Nil

    // Look up the topic's partitions and sort them.
    partitionsPerTopicMap.get(topic) match {
      case Some(l) => partitions = l.sortWith((s, t) => s < t)
      case _ => throw new RuntimeException("Can't find topic " + topic)
    }

    var numParts = 0
    for (partString <- partitions) {
      val part = Partition.parse(partString)
      // Fail fast if the broker that owns this partition is not registered.
      val broker = cluster.getBroker(part.brokerId) match {
        case Some(b) => b
        case None => throw new IllegalStateException("Broker " +
          part.brokerId + " is unavailable. Cannot issue " +
          "getOffsetsBefore request")
      }

      val topicDirs = new ZKGroupTopicDirs(config.groupId, topic)

      // Write the caller-supplied offset to this partition's offset path.
      println("updating partition " + part.name + " with new offset: " +
        offsetOption)
      ZkUtils.updatePersistentPath(zkClient,
        topicDirs.consumerOffsetDir + "/" + part.name, offsetOption)
      numParts += 1
    }
    println("updated the offset for " + numParts + " partitions")
  }
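
One possible explanation for why the tool's ZkClient works while the
direct calls through getZkClinet() quoted below fail. This is an
assumption, not something confirmed in this thread: a ZkClient that is
constructed without Kafka's ZKStringSerializer falls back to Java object
serialization, so the znode would hold a binary stream instead of the
plain text that the consumer parses as a long. The sketch below uses only
the JDK (no Kafka or ZooKeeper classes), and the class and method names
are hypothetical. It only shows that the same offset parses as a number
in one encoding and not in the other.

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectOutputStream;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;

// Hypothetical demo class; not part of Kafka or ZkClient.
class OffsetEncodingDemo {

    // Encode the offset as plain text, the way a string serializer would.
    static byte[] asUtf8(String offset) {
        return offset.getBytes(StandardCharsets.UTF_8);
    }

    // Encode the offset with Java object serialization, which prepends
    // a binary stream header and a type tag before the characters.
    static byte[] asJavaSerialized(String offset) {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
            out.writeObject(offset);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return bytes.toByteArray();
    }

    // Decode the stored bytes as text and try to parse them as an offset.
    static boolean parsesAsOffset(byte[] stored) {
        try {
            Long.parseLong(new String(stored, StandardCharsets.UTF_8));
            return true;
        } catch (NumberFormatException e) {
            return false;
        }
    }

    public static void main(String[] args) {
        System.out.println("plain text parses:      "
            + parsesAsOffset(asUtf8("22274")));
        System.out.println("Java-serialized parses: "
            + parsesAsOffset(asJavaSerialized("22274")));
    }
}
```

If that is indeed the cause, constructing the client with a string
serializer should let the consumer read a directly written offset, but I
have not verified this against the setup in this thread.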

Thanks
Ajit Koti





On Thu, Sep 13, 2012 at 8:46 AM, Ajit Koti <ajitk...@gmail.com> wrote:

> Hi Jun,
>
> I was trying out this last option; otherwise I would use the SimpleConsumer.
>
> I am trying to set the offset of my choice using
>
> getZkClinet().createPersistent("/consumers/test_group/offsets/golf4/0-0",
> "22274");
>
> or
>  
> ZkUtils.updatePersistentPath(getZkClinet(),"/consumers/test_group/offsets/golf4/0-0",
> "22274");
>
> But I was getting this exception:
>
>  ZookeeperConsumerConnector:66 -
> test_group_ajits-machine-1347505711889-9df96cdb exception during rebalance
> java.lang.NumberFormatException: For input string: "��
>   at java.lang.NumberFormatException.forInputString(NumberFormatException.java:65)
>   at java.lang.Long.parseLong(Long.java:441)
>   at java.lang.Long.parseLong(Long.java:483)
>   at scala.collection.immutable.StringLike$class.toLong(StringLike.scala:232)
>   at scala.collection.immutable.StringOps.toLong(StringOps.scala:31)
>   at kafka.consumer.ZookeeperConsumerConnector$ZKRebalancerListener.kafka$consumer$ZookeeperConsumerConnector$ZKRebalancerListener$$addPartitionTopicInfo(ZookeeperConsumerConnector.scala:644)
>   at kafka.consumer.ZookeeperConsumerConnector$ZKRebalancerListener$$anonfun$kafka$consumer$ZookeeperConsumerConnector$ZKRebalancerListener$$rebalance$2$$anonfun$apply$11$$anonfun$apply$13.apply(ZookeeperConsumerConnector.scala:523)
>   at kafka.consumer.ZookeeperConsumerConnector$ZKRebalancerListener$$anonfun$kafka$consumer$ZookeeperConsumerConnector$ZKRebalancerListener$$rebalance$2$$anonfun$apply$11$$anonfun$apply$13.apply(ZookeeperConsumerConnector.scala:520)
>   at scala.collection.immutable.Range.foreach(Range.scala:78)
>   at kafka.consumer.ZookeeperConsumerConnector$ZKRebalancerListener$$anonfun$kafka$consumer$ZookeeperConsumerConnector$ZKRebalancerListener$$rebalance$2$$anonfun$apply$11.apply(ZookeeperConsumerConnector.scala:520)
>   at kafka.consumer.ZookeeperConsumerConnector$ZKRebalancerListener$$anonfun$kafka$consumer$ZookeeperConsumerConnector$ZKRebalancerListener$$rebalance$2$$anonfun$apply$11.apply(ZookeeperConsumerConnector.scala:507)
>   at scala.collection.mutable.HashSet.foreach(HashSet.scala:72)
>   at kafka.consumer.ZookeeperConsumerConnector$ZKRebalancerListener$$anonfun$kafka$consumer$ZookeeperConsumerConnector$ZKRebalancerListener$$rebalance$2.apply(ZookeeperConsumerConnector.scala:507)
>   at kafka.consumer.ZookeeperConsumerConnector$ZKRebalancerListener$$anonfun$kafka$consumer$ZookeeperConsumerConnector$ZKRebalancerListener$$rebalance$2.apply(ZookeeperConsumerConnector.scala:494)
>   at scala.collection.mutable.HashMap$$anonfun$foreach$1.apply(HashMap.scala:93)
>   at scala.collection.mutable.HashMap$$anonfun$foreach$1.apply(HashMap.scala:93)
>   at scala.collection.Iterator$class.foreach(Iterator.scala:652)
>   at scala.collection.mutable.HashTable$$anon$1.foreach(HashTable.scala:157)
>   at scala.collection.mutable.HashTable$class.foreachEntry(HashTable.scala:190)
>   at scala.collection.mutable.HashMap.foreachEntry(HashMap.scala:43)
>
>
>
> Is there anything else I could do to resolve this issue?
>
> Thanks
> Ajit Koti
>
>
> On Wed, Sep 12, 2012 at 9:50 AM, Jun Rao <jun...@gmail.com> wrote:
>
>> If you also need this for Hadoop, then it's more difficult. Unfortunately,
>> we don't have an API for changing offsets programmatically right now. We
>> will revisit this post 0.8.0.
>>
>> Thanks,
>>
>> Jun
>>
>> On Tue, Sep 11, 2012 at 8:32 AM, Ajit Koti <ajitk...@gmail.com> wrote:
>>
>> > Hi Jun,
>> >
>> > Setting the log segment size and the log retention size to the same
>> > value means I will lose messages, right? I don't want that to happen,
>> > because the Hadoop consumer would not yet have consumed those messages
>> > and persisted them into the database.
>> >
>> > What I was trying to do instead is set autooffset.reset to largest and
>> > delete the corresponding offsets, as described here:
>> >
>> >
>> >
>> http://mail-archives.apache.org/mod_mbox/incubator-kafka-users/201202.mbox/%3ccalxo6kityw_xtbvxkkv2x31wynxfwoz47o-e6dbfo21msmq...@mail.gmail.com%3E
>> >
>> > I am not sure how to delete the offsets. Using ZkUtils, I was trying to
>> > delete this path before consuming any messages:
>> > /consumers/[group_id]/offsets/[topic]/[broker_id-partition_id]
>> >
>> > I am not sure this is the right approach.
>> >
>> >
>> > Thanks
>> > Ajit
>> >
>> > On Tue, Sep 11, 2012 at 8:05 PM, Jun Rao <jun...@gmail.com> wrote:
>> >
>> > > That's an interesting use case, and it is not what the ZK consumer is
>> > > really designed for. First, having a topic per user may not be ideal,
>> > > especially if you have millions of users. Second, the ZK consumer is
>> > > designed for continuously consuming data, not skipping data all the
>> > > time.
>> > >
>> > > With that said, you can probably try the following trick and see if it
>> > > works. Set both the log segment size and the log retention size to a
>> > > value that matches the size of about 100 messages. This should make
>> > > sure that at any given point in time, only the most recent 100 messages
>> > > are kept in the broker. Set autooffset.reset to smallest. This will
>> > > make sure that if the consumer's offset is out of range, it will
>> > > automatically switch to the current smallest offset.
>> > >
>> > > Thanks,
>> > >
>> > > Jun
>> > >
>> > > On Mon, Sep 10, 2012 at 9:07 PM, Ajit Koti <ajitk...@gmail.com> wrote:
>> > >
>> > > > Hi,
>> > > >
>> > > > No, the consumers are not falling behind.
>> > > > What happens is that the users of the consumers don't fetch messages
>> > > > for a certain period of time, and when they come back they want to
>> > > > fetch the latest messages from the topic.
>> > > >
>> > > > For example, when displaying a news feed, each user has their own
>> > > > topic. If a user doesn't log in for a couple of days, he won't
>> > > > consume the messages from the topic. But when he logs in, he wants to
>> > > > get the latest updates, for which we have to fetch the latest
>> > > > messages from the topic.
>> > > >
>> > > > So is it possible to use the ZK consumer, set the offset to the last
>> > > > 100 messages, and then fetch them?
>> > > >
>> > > > Thanks
>> > > > Ajit Koti
>> > > >
>> > > >
>> > > > On Tue, Sep 11, 2012 at 9:04 AM, Jun Rao <jun...@gmail.com> wrote:
>> > > >
>> > > > > Is consumption falling behind? Can you have more consumers in the
>> > > > > same group to increase the consumption throughput?
>> > > > >
>> > > > > Thanks,
>> > > > >
>> > > > > Jun
>> > > > >
>> > > > > On Mon, Sep 10, 2012 at 12:22 PM, Ajit Koti <ajitk...@gmail.com> wrote:
>> > > > >
>> > > > > > Hi all,
>> > > > > >
>> > > > > > I was happily consuming messages with the consumer.
>> > > > > > Recently I got a new requirement where I always have to process
>> > > > > > the latest messages.
>> > > > > > Is there any way I can fetch the latest 100 messages? I know
>> > > > > > that, apart from the simple consumer, I cannot specify the
>> > > > > > offset. But I am still wondering whether there is any way to
>> > > > > > start consuming from a particular offset.
>> > > > > >
>> > > > > >
>> > > > > > Note: I have only one consumer per topic.
>> > > > > >
>> > > > > >
>> > > > > >
>> > > > > > Thanks
>> > > > > > Ajit
>> > > > > >
>> > > > >
>> > > >
>> > >
>> >
>>
>
>
