Dave, Kafka internally keeps track of the valid offsets. You just need to call commitOffsets() for the offsets to be checkpointed in ZK. You don't need to know the offset directly.
Thanks,

Jun

On Thu, Sep 22, 2011 at 3:37 PM, Dave Fayram <dfay...@gmail.com> wrote:
> Hi, I'm trying to write a system that takes a stream of many, many
> small events (an N-way split of a Kafka topic over partitions) and
> compresses these into one larger interval, which is then flushed to a
> write-through cache. For consistency, I'd like to turn off the
> autocommit of the partition and only update the read offset when I
> actually flush the aggregated result down to the write-through cache.
> For durability, I'd also like to record the offset as part of the
> record I flush.
>
> But I am having trouble figuring out exactly how to do this
> with the Scala API. It seems like the client code can't easily get
> access to this data. I know that the Hadoop consumer does it
> (according to the other thread on offsets and message lengths), but I
> can't really figure out how.
>
> Any advice on how to do this? I thought I could use
> ZookeeperConsumerStats, but this class seems to have been removed.
>
> --
> Dave Fayram
> dfay...@gmail.com
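For what it's worth, the pattern Jun describes — autocommit off, flush the aggregate (with its offset) to the cache, then call commitOffsets() — could be sketched roughly like this. The `IntervalAggregator` and `Flush` types below are hypothetical names for illustration, not part of the Kafka API; only commitOffsets() and the idea of disabling autocommit in the consumer config come from the thread.

```scala
// Hypothetical aggregator: buffers small events and remembers the offset of
// the last event added, so the offset can be written out alongside the
// aggregated record for durability.
case class Flush(records: Seq[String], lastOffset: Long)

class IntervalAggregator(batchSize: Int) {
  private val buf = scala.collection.mutable.ArrayBuffer.empty[String]
  private var lastOffset: Long = -1L

  // Returns Some(Flush) once the interval is full. The caller writes the
  // Flush (including lastOffset) to the write-through cache, and only then
  // calls connector.commitOffsets() so ZK is updated after the data is safe.
  def add(record: String, offset: Long): Option[Flush] = {
    buf += record
    lastOffset = offset
    if (buf.size >= batchSize) {
      val out = Flush(buf.toList, lastOffset)
      buf.clear()
      Some(out)
    } else None
  }
}
```

The consumer-side wiring would then look something like: create the connector with autocommit disabled in the ConsumerConfig, iterate the stream feeding `add`, and on every `Some(flush)` write to the cache first and call commitOffsets() second. That ordering gives at-least-once delivery: a crash between the cache write and the commit replays the last interval rather than losing it.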