Hi Jun,

Thanks.  I will stick with using the deep iterator then, so that I am not
affected by any internal changes.
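
(For illustration, here is a minimal self-contained sketch of that
boundary detection: process messages from the deep iterator, and treat an
increase in the reported offset as the end of a compressed unit, i.e. the
point where it is safe to persist consumer state.  MessageAndOffset below
is a simplified stand-in for the kafka class, not the real API, and the
offsets are made up:)

```java
import java.util.ArrayList;
import java.util.List;

// Simplified stand-in for kafka's MessageAndOffset (illustrative only):
// a payload plus the offset reported by the deep iterator.  Messages in
// the same compressed unit report the same offset.
record MessageAndOffset(String message, long offset) {}

public class BatchBoundaryDemo {

    // Returns the offsets at which a compressed unit was completed,
    // i.e. the points where it is safe to persist consumer state.
    static List<Long> completedBatchOffsets(List<MessageAndOffset> fetched) {
        List<Long> boundaries = new ArrayList<>();
        long lastOffset = -1L;
        for (MessageAndOffset mo : fetched) {
            if (lastOffset >= 0 && mo.offset() != lastOffset) {
                // Offset increased: the previous compressed unit is done.
                boundaries.add(lastOffset);
            }
            // ... process mo.message() here ...
            lastOffset = mo.offset();
        }
        if (lastOffset >= 0) {
            boundaries.add(lastOffset); // end of fetch closes the last unit
        }
        return boundaries;
    }

    public static void main(String[] args) {
        // Simulated deep-iterator output: m1-m3 share one compressed
        // batch, m4-m5 the next.
        List<MessageAndOffset> fetched = List.of(
                new MessageAndOffset("m1", 100L),
                new MessageAndOffset("m2", 100L),
                new MessageAndOffset("m3", 100L),
                new MessageAndOffset("m4", 250L),
                new MessageAndOffset("m5", 250L));
        System.out.println(completedBatchOffsets(fetched)); // [100, 250]
    }
}
```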

Are you able to comment on
http://mail-archives.apache.org/mod_mbox/incubator-kafka-users/201205.mbox/%3CCAM%2BbZhhjGSDuR9_4-rgbTx3tZ4B%2BHscjX%2B6STXp9kLZUVnj0PQ%40mail.gmail.com%3E?

In particular I just wanted to check whether SyncProducer, AsyncProducer,
and SimpleConsumer are considered part of the "public" API, so that they
will not disappear?

Thanks,
Ross


On 3 June 2012 02:19, Jun Rao <jun...@gmail.com> wrote:

> You can get the same offset using deep iterator. Whenever the offset
> increases, you know you have crossed the compressed unit.
>
> Jun
>
> On Sat, Jun 2, 2012 at 12:18 AM, Ross Black <ross.w.bl...@gmail.com>
> wrote:
>
> > Hi Jun,
> >
> > The only reason I would like the compressed messages exposed is so that I
> > know the boundary to be able to safely persist my state with the offset.
> > Is there a better way to achieve that?
> > In my (probably poor) example attempt to expose batch messages, the only
> > things you can do with a compressed message set are - get the offset, get
> > the serialized form, and iterate over the contained messages.
> >
> > Is kafka attempting to support exactly-once semantics? If so, it would
> > seem that something needs to be exposed in the API to make it a bit more
> > explicit than having to keep track of offsets changing for individual
> > messages.
> >
> > Thanks,
> > Ross
> >
> >
> >
> > On 2 June 2012 14:19, Jun Rao <jun...@gmail.com> wrote:
> >
> > > Ross,
> > >
> > > The shallow iterator is intended for efficient mirroring between kafka
> > > clusters. Not sure if it's a good idea to expose it as an external api.
> > > Note that you really can't do much on a compressed message set other
> > > than store it as raw bytes somewhere else.
> > >
> > > Thanks,
> > >
> > > Jun
> > >
> > > On Thu, May 31, 2012 at 11:32 PM, Ross Black <ross.w.bl...@gmail.com>
> > > wrote:
> > >
> > > > Hi Jun.
> > > >
> > > > I did find a way to process by batch, but it probably reaches a little
> > > > too deep into the internals of kafka?
> > > >
> > > >     FetchRequest request = new FetchRequest("topic",
> > > >             partitionNumber, requestOffset, bufferSize);
> > > >     ByteBufferMessageSet messageSet = simpleConsumer.fetch(request);
> > > >     Iterator<MessageAndOffset> batchIterator =
> > > >             messageSet.underlying().shallowIterator();
> > > >     while (batchIterator.hasNext()) {
> > > >         MessageAndOffset messageAndOffset = batchIterator.next();
> > > >         Message batchMessage = messageAndOffset.message();
> > > >         long offset = messageAndOffset.offset();
> > > >         Iterator<MessageAndOffset> messages =
> > > >                 CompressionUtils.decompress(batchMessage).iterator();
> > > >         // process the batch of messages and persist with the offset
> > > >     }
> > > >
> > > > This should work ok, but I am concerned that it is using internal kafka
> > > > classes.  The code has to reach into the underlying (scala)
> > > > ByteBufferMessageSet because shallowIterator is not exposed by the java
> > > > variant.  The code also has to understand that the message is
> > > > potentially compressed and then call CompressionUtils.
> > > >
> > > > How likely is the above approach to work with subsequent releases?
> > > > Is it worth exposing the concept of batches in ByteBufferMessageSet to
> > > > make it explicit?
> > > >
> > > > eg ByteBufferMessageSet.batchIterator : BatchMessage
> > > > where BatchMessage is a simple extension of Message that has an
> > > > additional method to allow getting a ByteBufferMessageSet (ie. wraps
> > > > the call to CompressionUtils).
> > > >
> > > >
> > > > Thoughts?
> > > >
> > > > Thanks,
> > > > Ross
> > > >
> > > >
> > > >
> > > > On 1 June 2012 14:51, Jun Rao <jun...@gmail.com> wrote:
> > > >
> > > > > Ross,
> > > > >
> > > > > With compression enabled, it's a bit hard to implement exactly-once
> > > > > since offsets are only advanced after a compressed batch of messages
> > > > > has been consumed. So, you will have to make sure that each batch of
> > > > > messages can be consumed together as a unit. The other option is to
> > > > > compress with a batch size of 1.
> > > > >
> > > > > Thanks,
> > > > >
> > > > > Jun
> > > > >
> > > > > On Thu, May 31, 2012 at 8:05 PM, Ross Black <ross.w.bl...@gmail.com>
> > > > > wrote:
> > > > >
> > > > > > Hi,
> > > > > >
> > > > > > Using SimpleConsumer, I get the offset of a message (from
> > > > > > MessageAndOffset) and persist it with my consumer data to get
> > > > > > exactly-once semantics for consumer state (as described in the
> > > > > > kafka design docs).  If the consumer fails then it is simply a
> > > > > > matter of starting replay of messages from the persisted index.
> > > > > >
> > > > > > When using compression, the offset from MessageAndOffset appears
> > > > > > to be the offset of the compressed batch.  e.g. For a batch of 10
> > > > > > messages, the offset returned for messages 1-9 is the start of the
> > > > > > *current* batch, and the offset for message 10 is the start of the
> > > > > > *next* batch.
> > > > > >
> > > > > > How can I get the exactly-once semantics for consumer state?
> > > > > > Is there a way that I can get a batch of messages from
> > > > > > SimpleConsumer?  (otherwise I have to reconstruct a batch by
> > > > > > watching for a change in the offset between messages)
> > > > > >
> > > > > > Thanks,
> > > > > > Ross
> > > > > >
> > > > >
> > > >
> > >
> >
>