Sorry, I meant my example to be:

    eg. ByteBufferMessageSet.batchIterator : Iterator<BatchMessage>

where BatchMessage is a simple extension of Message that has an additional
method to allow getting a ByteBufferMessageSet (i.e. it wraps the call to
CompressionUtils).
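[A rough sketch of the proposed API shape. Every name here (Message, BatchMessage, messageIterator) is a simplified, hypothetical stand-in, not an existing Kafka class; in the real proposal BatchMessage would decompress its payload internally.]

```java
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;

// Simplified stand-in for a Kafka message: just an opaque payload.
class Message {
    private final byte[] payload;
    Message(byte[] payload) { this.payload = payload; }
    byte[] payload() { return payload; }
}

// A BatchMessage is an ordinary Message plus a way to expand it into the
// individual messages it contains, hiding the decompression step from the
// caller. Here it simply returns pre-built messages for illustration.
class BatchMessage extends Message {
    private final List<Message> contained;
    BatchMessage(byte[] payload, List<Message> contained) {
        super(payload);
        this.contained = contained;
    }
    Iterator<Message> messageIterator() { return contained.iterator(); }
}
```

With a shape like this, ByteBufferMessageSet.batchIterator could hand back BatchMessage objects, and the consumer would never need to touch CompressionUtils directly.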
On 1 June 2012 16:32, Ross Black <ross.w.bl...@gmail.com> wrote:

> Hi Jun.
>
> I did find a way to process by batch, but it probably reaches a little too
> deep into the internals of kafka?
>
>     FetchRequest request = new FetchRequest("topic",
>             partitionNumber, requestOffset, bufferSize);
>     ByteBufferMessageSet messageSet = simpleConsumer.fetch(request);
>     Iterator<MessageAndOffset> batchIterator =
>             messageSet.underlying().shallowIterator();
>     while (batchIterator.hasNext()) {
>         MessageAndOffset messageAndOffset = batchIterator.next();
>         Message batchMessage = messageAndOffset.message();
>         long offset = messageAndOffset.offset();
>         Iterator<MessageAndOffset> messages =
>                 CompressionUtils.decompress(batchMessage).iterator();
>         // process the batch of messages and persist with the offset
>     }
>
> This should work ok, but I am concerned that it is using internal kafka
> classes. The code has to reach into the underlying (scala)
> ByteBufferMessageSet because shallowIterator is not exposed by the java
> variant. The code also has to understand that the message is potentially
> compressed and then call CompressionUtils.
>
> How likely is the above approach to work with subsequent releases?
> Is it worth exposing the concept of batches in ByteBufferMessageSet to
> make it explicit?
>
> eg ByteBufferMessageSet.batchIterator : BatchMessage
> where BatchMessage is a simple extension of Message that has an additional
> method to allow getting a ByteBufferMessageSet (ie. wraps the call to
> CompressionUtils).
>
> Thoughts?
>
> Thanks,
> Ross
>
> On 1 June 2012 14:51, Jun Rao <jun...@gmail.com> wrote:
>
>> Ross,
>>
>> With compression enabled, it's a bit hard to implement exactly-once since
>> offsets are only advanced after a compressed batch of messages has been
>> consumed. So, you will have to make sure that each batch of messages can
>> be consumed together as a unit. The other option is to compress with a
>> batch size of 1.
>>
>> Thanks,
>>
>> Jun
>>
>> On Thu, May 31, 2012 at 8:05 PM, Ross Black <ross.w.bl...@gmail.com>
>> wrote:
>>
>> > Hi,
>> >
>> > Using SimpleConsumer, I get the offset of a message (from
>> > MessageAndOffset) and persist it with my consumer data to get
>> > exactly-once semantics for consumer state (as described in the kafka
>> > design docs). If the consumer fails then it is simply a matter of
>> > starting replay of messages from the persisted index.
>> >
>> > When using compression, the offset from MessageAndOffset appears to be
>> > the offset of the compressed batch. e.g. For a batch of 10 messages,
>> > the offset returned for messages 1-9 is the start of the *current*
>> > batch, and the offset for message 10 is the start of the *next* batch.
>> >
>> > How can I get the exactly-once semantics for consumer state?
>> > Is there a way that I can get a batch of messages from SimpleConsumer?
>> > (otherwise I have to reconstruct a batch by watching for a change in
>> > the offset between messages)
>> >
>> > Thanks,
>> > Ross
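[For reference, Ross's fallback idea in the original question (reconstructing a batch by watching for a change in the offset between messages) could be sketched roughly as below. All class names are simplified stand-ins, not the real Kafka types; the sketch assumes the offset semantics described above, where messages 1..n-1 of a batch report the current batch's start offset and the last message reports the next batch's start offset.]

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// One consumed message together with the offset Kafka reported for it.
class MessageAndOffset {
    final String message;  // simplified payload for illustration
    final long offset;
    MessageAndOffset(String message, long offset) {
        this.message = message;
        this.offset = offset;
    }
}

// A reconstructed batch: its messages plus the offset to persist with
// the consumer state, i.e. the start of the *next* batch, taken from
// the batch's last message.
class Batch {
    final List<String> messages = new ArrayList<>();
    long nextOffset;
}

class BatchReconstructor {
    // Groups a flat stream of messages back into compressed batches by
    // watching for the point where the reported offset changes: that
    // message is the last one of its batch, and its offset is where
    // replay should resume after the batch is persisted.
    static List<Batch> reconstruct(List<MessageAndOffset> stream) {
        List<Batch> batches = new ArrayList<>();
        Batch current = null;
        long batchOffset = 0;
        for (MessageAndOffset m : stream) {
            if (current == null) {
                current = new Batch();
                batchOffset = m.offset;  // start offset of this batch
            }
            current.messages.add(m.message);
            if (m.offset != batchOffset) {
                // Offset changed: close the batch and record the
                // resume point.
                current.nextOffset = m.offset;
                batches.add(current);
                current = null;
            }
        }
        return batches;  // a trailing incomplete batch is not emitted
    }
}
```

Note this detection only works for batches of two or more messages; a single-message batch never shows an offset change within itself, which is one more reason an explicit batch iterator in the API would be cleaner.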