Hi Jun,

Thanks. I will stick with using the deep iterator, then, to avoid any changes to Kafka internals.
Are you able to comment on this earlier message?
http://mail-archives.apache.org/mod_mbox/incubator-kafka-users/201205.mbox/%3CCAM%2BbZhhjGSDuR9_4-rgbTx3tZ4B%2BHscjX%2B6STXp9kLZUVnj0PQ%40mail.gmail.com%3E

In particular, I just wanted to check whether SyncProducer, AsyncProducer, and SimpleConsumer are considered part of the "public" API, so that they will not disappear in a later release.

Thanks,
Ross

On 3 June 2012 02:19, Jun Rao <jun...@gmail.com> wrote:

> You can get the same offset using the deep iterator. Whenever the offset
> increases, you know you have crossed the compressed unit.
>
> Jun
>
> On Sat, Jun 2, 2012 at 12:18 AM, Ross Black <ross.w.bl...@gmail.com> wrote:
>
> > Hi Jun,
> >
> > The only reason I would like the compressed messages exposed is so that I
> > know the boundary, to be able to safely persist my state with the offset.
> > Is there a better way to achieve that?
> > In my (probably poor) example attempt to expose batch messages, the only
> > things you can do with a compressed message set are: get the offset, get
> > the serialized form, and iterate over the contained messages.
> >
> > Is Kafka attempting to support exactly-once semantics? If so, it would
> > seem that something needs to be exposed in the API to make this a bit
> > more explicit than having to keep track of offsets changing for
> > individual messages.
> >
> > Thanks,
> > Ross
> >
> > On 2 June 2012 14:19, Jun Rao <jun...@gmail.com> wrote:
> >
> > > Ross,
> > >
> > > The shallow iterator is intended for efficient mirroring between Kafka
> > > clusters. Not sure if it's a good idea to expose it as an external API.
> > > Note that you really can't do much with a compressed message set other
> > > than store it as raw bytes somewhere else.
> > >
> > > Thanks,
> > >
> > > Jun
> > >
> > > On Thu, May 31, 2012 at 11:32 PM, Ross Black <ross.w.bl...@gmail.com> wrote:
> > >
> > > > Hi Jun.
> > > >
> > > > I did find a way to process by batch, but it probably reaches a little
> > > > too deep into the internals of Kafka?
> > > >
> > > >     FetchRequest request = new FetchRequest("topic",
> > > >             partitionNumber, requestOffset, bufferSize);
> > > >     ByteBufferMessageSet messageSet = simpleConsumer.fetch(request);
> > > >     Iterator<MessageAndOffset> batchIterator =
> > > >             messageSet.underlying().shallowIterator();
> > > >     while (batchIterator.hasNext()) {
> > > >         MessageAndOffset messageAndOffset = batchIterator.next();
> > > >         Message batchMessage = messageAndOffset.message();
> > > >         long offset = messageAndOffset.offset();
> > > >         Iterator<MessageAndOffset> messages =
> > > >                 CompressionUtils.decompress(batchMessage).iterator();
> > > >         // process the batch of messages and persist with the offset
> > > >     }
> > > >
> > > > This should work OK, but I am concerned that it is using internal Kafka
> > > > classes. The code has to reach into the underlying (Scala)
> > > > ByteBufferMessageSet because shallowIterator is not exposed by the Java
> > > > variant. The code also has to understand that the message is
> > > > potentially compressed and then call CompressionUtils.
> > > >
> > > > How likely is the above approach to work with subsequent releases?
> > > > Is it worth exposing the concept of batches in ByteBufferMessageSet to
> > > > make it explicit?
> > > >
> > > > e.g. ByteBufferMessageSet.batchIterator : BatchMessage
> > > > where BatchMessage is a simple extension of Message that has an
> > > > additional method to allow getting a ByteBufferMessageSet (i.e. it wraps
> > > > the call to CompressionUtils).
> > > >
> > > > Thoughts?
> > > >
> > > > Thanks,
> > > > Ross
> > > >
> > > > On 1 June 2012 14:51, Jun Rao <jun...@gmail.com> wrote:
> > > >
> > > > > Ross,
> > > > >
> > > > > With compression enabled, it's a bit hard to implement exactly-once
> > > > > since offsets are only advanced after a compressed batch of messages
> > > > > has been consumed. So, you will have to make sure that each batch of
> > > > > messages can be consumed together as a unit. The other option is to
> > > > > compress with a batch size of 1.
> > > > >
> > > > > Thanks,
> > > > >
> > > > > Jun
> > > > >
> > > > > On Thu, May 31, 2012 at 8:05 PM, Ross Black <ross.w.bl...@gmail.com> wrote:
> > > > >
> > > > > > Hi,
> > > > > >
> > > > > > Using SimpleConsumer, I get the offset of a message (from
> > > > > > MessageAndOffset) and persist it with my consumer data to get
> > > > > > exactly-once semantics for consumer state (as described in the Kafka
> > > > > > design docs). If the consumer fails, then it is simply a matter of
> > > > > > starting replay of messages from the persisted index.
> > > > > >
> > > > > > When using compression, the offset from MessageAndOffset appears to
> > > > > > be the offset of the compressed batch. E.g. for a batch of 10
> > > > > > messages, the offset returned for messages 1-9 is the start of the
> > > > > > *current* batch, and the offset for message 10 is the start of the
> > > > > > *next* batch.
> > > > > >
> > > > > > How can I get exactly-once semantics for consumer state?
> > > > > > Is there a way that I can get a batch of messages from
> > > > > > SimpleConsumer? (Otherwise I have to reconstruct a batch by
> > > > > > watching for a change in the offset between messages.)
> > > > > >
> > > > > > Thanks,
> > > > > > Ross
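A minimal sketch of the deep-iterator approach Jun suggests in the thread above: walk the messages one at a time and treat the point where the reported offset increases as the end of a compressed batch. It assumes, per Ross's observation, that every message in a compressed batch except the last reports the batch's start offset and the last one reports the start of the next batch. MsgAndOffset, Batch and groupByOffset are hypothetical names for illustration, not Kafka classes, and the offsets are made up; in a real consumer the pairs would come from iterating the fetched message set.

```java
import java.util.ArrayList;
import java.util.List;

/**
 * Regroups a deep (per-message) iteration into compressed batches by closing a
 * batch whenever the offset increases. All types here are hypothetical stand-ins.
 */
public class BatchBoundaries {

    /** Stand-in for a (message, offset) pair yielded by a deep iterator. */
    static final class MsgAndOffset {
        final String payload;
        final long offset; // offset to resume fetching from after this message
        MsgAndOffset(String payload, long offset) {
            this.payload = payload;
            this.offset = offset;
        }
    }

    /** A reconstructed batch plus the offset that is safe to persist with it. */
    static final class Batch {
        final List<String> payloads;
        final long resumeOffset;
        Batch(List<String> payloads, long resumeOffset) {
            this.payloads = payloads;
            this.resumeOffset = resumeOffset;
        }
    }

    /**
     * Groups messages fetched from requestOffset into batches. A batch ends at
     * the first message whose offset is greater than the last boundary seen.
     * Trailing messages without such an increase are not returned, since their
     * batch is not known to be complete.
     */
    static List<Batch> groupByOffset(long requestOffset, List<MsgAndOffset> messages) {
        List<Batch> batches = new ArrayList<>();
        List<String> current = new ArrayList<>();
        long lastOffset = requestOffset;
        for (MsgAndOffset m : messages) {
            current.add(m.payload);
            if (m.offset > lastOffset) {
                // The offset moved past the compressed unit: this batch is complete.
                batches.add(new Batch(current, m.offset));
                current = new ArrayList<>();
                lastOffset = m.offset;
            }
        }
        return batches;
    }

    public static void main(String[] args) {
        // Made-up offsets: a compressed batch of three messages at bytes 0-99,
        // then a batch of two messages at bytes 100-179.
        List<MsgAndOffset> fetched = List.of(
                new MsgAndOffset("a1", 0), new MsgAndOffset("a2", 0),
                new MsgAndOffset("a3", 100), new MsgAndOffset("b1", 100),
                new MsgAndOffset("b2", 180));
        for (Batch b : groupByOffset(0, fetched)) {
            System.out.println(b.payloads + " -> persist offset " + b.resumeOffset);
        }
    }
}
```

Running it prints `[a1, a2, a3] -> persist offset 100` and `[b1, b2] -> persist offset 180`. With uncompressed messages every offset increases, so each message simply becomes a batch of one.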
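To make the exactly-once pattern discussed in the thread concrete (persist consumer state together with the offset, and only once a whole compressed batch has been consumed as a unit), here is a small in-memory sketch. LOG, Snapshot, persisted and run are hypothetical stand-ins for a broker fetch and for durable storage; replacing the persisted reference models an atomic write of state plus offset and is not a Kafka API.

```java
import java.util.List;

/**
 * Exactly-once consumer state sketch: state and resume offset are committed
 * together, and only after a whole compressed batch has been applied.
 */
public class ExactlyOnceSketch {

    /** A compressed batch: where it starts, where the next one starts, its messages. */
    static final class LogBatch {
        final long startOffset;
        final long nextOffset;
        final List<Integer> values;
        LogBatch(long startOffset, long nextOffset, List<Integer> values) {
            this.startOffset = startOffset;
            this.nextOffset = nextOffset;
            this.values = values;
        }
    }

    /** Consumer state (a running sum) stored together with its resume offset. */
    static final class Snapshot {
        final long sum;
        final long offset;
        Snapshot(long sum, long offset) {
            this.sum = sum;
            this.offset = offset;
        }
    }

    static final List<LogBatch> LOG = List.of(
            new LogBatch(0, 100, List.of(1, 2, 3)),
            new LogBatch(100, 180, List.of(4, 5)),
            new LogBatch(180, 250, List.of(6)));

    /** Stand-in for durable storage; replacing the reference models an atomic write. */
    static Snapshot persisted = new Snapshot(0, 0);

    /**
     * Replays from the persisted offset. If crashAt matches a batch's start
     * offset, the run stops after applying that batch in memory but before
     * committing it, simulating a crash in the middle of the batch.
     */
    static void run(long crashAt) {
        Snapshot s = persisted;
        for (LogBatch b : LOG) {
            if (b.startOffset < s.offset) {
                continue; // already reflected in the persisted state
            }
            long sum = s.sum;
            for (int v : b.values) {
                sum += v;
            }
            if (b.startOffset == crashAt) {
                return; // crash: the in-memory sum for this batch is lost
            }
            s = new Snapshot(sum, b.nextOffset);
            persisted = s; // commit state and offset as one unit
        }
    }

    public static void main(String[] args) {
        run(100); // crash while handling the batch that starts at offset 100
        System.out.println("after crash:   sum=" + persisted.sum + " offset=" + persisted.offset);
        run(-1);  // restart with no crash; resumes from the persisted offset
        System.out.println("after restart: sum=" + persisted.sum + " offset=" + persisted.offset);
    }
}
```

After the simulated crash the stored state is `sum=6 offset=100`; the restart re-applies the interrupted batch once and ends at `sum=21 offset=250`, the same as a run with no crash. Committing after each message instead would risk storing the start-of-batch offset alongside state that already includes part of that batch, so a replay would count those messages twice.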