Hi Jun. I did find a way to process by batch, but it probably reaches a little too deep into the internals of Kafka?
    FetchRequest request = new FetchRequest("topic", partitionNumber, requestOffset, bufferSize);
    ByteBufferMessageSet messageSet = simpleConsumer.fetch(request);
    // Iterate over whole (possibly compressed) batches rather than individual messages.
    Iterator<MessageAndOffset> batchIterator = messageSet.underlying().shallowIterator();
    while (batchIterator.hasNext()) {
        MessageAndOffset messageAndOffset = batchIterator.next();
        Message batchMessage = messageAndOffset.message();
        long offset = messageAndOffset.offset();
        Iterator<MessageAndOffset> messages = CompressionUtils.decompress(batchMessage).iterator();
        // process the batch of messages and persist with the offset
    }

This should work OK, but I am concerned that it is using internal Kafka classes. The code has to reach into the underlying (Scala) ByteBufferMessageSet because shallowIterator is not exposed by the Java variant. The code also has to understand that the message is potentially compressed and then call CompressionUtils.

How likely is the above approach to keep working with subsequent releases? Is it worth exposing the concept of batches in ByteBufferMessageSet to make it explicit? e.g.

    ByteBufferMessageSet.batchIterator : Iterator<BatchMessage>

where BatchMessage is a simple extension of Message that has an additional method for getting a ByteBufferMessageSet (i.e. it wraps the call to CompressionUtils).

Thoughts?

Thanks,
Ross

On 1 June 2012 14:51, Jun Rao <jun...@gmail.com> wrote:
> Ross,
>
> With compression enabled, it's a bit hard to implement exactly-once since
> offsets are only advanced after a compressed batch of messages has been
> consumed. So, you will have to make sure that each batch of messages can
> be consumed together as a unit. The other option is to compress with a
> batch size of 1.
>
> Thanks,
>
> Jun
>
> On Thu, May 31, 2012 at 8:05 PM, Ross Black <ross.w.bl...@gmail.com> wrote:
>
> > Hi,
> >
> > Using SimpleConsumer, I get the offset of a message (from MessageAndOffset)
> > and persist it with my consumer data to get exactly-once semantics for
> > consumer state (as described in the kafka design docs). If the consumer
> > fails then it is simply a matter of starting replay of messages from the
> > persisted index.
> >
> > When using compression, the offset from MessageAndOffset appears to be the
> > offset of the compressed batch. e.g. For a batch of 10 messages, the
> > offset returned for messages 1-9 is the start of the *current* batch, and
> > the offset for message 10 is the start of the *next* batch.
> >
> > How can I get the exactly-once semantics for consumer state?
> > Is there a way that I can get a batch of messages from SimpleConsumer?
> > (otherwise I have to reconstruct a batch by watching for a change in the
> > offset between messages)
> >
> > Thanks,
> > Ross
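[For reference, the fallback workaround mentioned in the original message (reconstructing a batch by watching for a change in the reported offset between messages) could be sketched in plain Java roughly as follows. The Fetched record is a hypothetical stand-in for MessageAndOffset, and the offsets are made up; per the thread, messages 1..n-1 of a compressed batch carry the current batch's offset while the last message carries the next batch's offset.]

```java
import java.util.ArrayList;
import java.util.List;

public class BatchReconstruction {

    // Hypothetical stand-in for MessageAndOffset: a payload plus the
    // offset that was reported alongside it.
    record Fetched(long offset, String payload) {}

    // Group consecutive messages into batches by watching for a change in
    // the reported offset: when the offset jumps, the current message is
    // the last one of the current batch.
    static List<List<String>> reconstructBatches(List<Fetched> fetched) {
        List<List<String>> batches = new ArrayList<>();
        List<String> current = new ArrayList<>();
        Long batchOffset = null;
        for (Fetched f : fetched) {
            if (batchOffset == null) {
                batchOffset = f.offset();
            }
            current.add(f.payload());
            if (f.offset() != batchOffset) {
                // Offset advanced to the start of the next batch, so this
                // message closes out the current batch.
                batches.add(current);
                current = new ArrayList<>();
                batchOffset = null;
            }
        }
        if (!current.isEmpty()) {
            batches.add(current); // trailing batch with no offset jump seen
        }
        return batches;
    }

    public static void main(String[] args) {
        // Two batches of three messages each, with made-up offsets 0 and 30.
        List<Fetched> fetched = List.of(
                new Fetched(0, "a"), new Fetched(0, "b"), new Fetched(30, "c"),
                new Fetched(30, "d"), new Fetched(30, "e"), new Fetched(60, "f"));
        List<List<String>> batches = reconstructBatches(fetched);
        System.out.println(batches); // [[a, b, c], [d, e, f]]
    }
}
```

[Once a batch has been assembled this way, it can be processed and persisted together with its offset as a single unit, which is the property Jun's reply says exactly-once consumption depends on.]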