Sorry, I meant my example to be:

e.g. ByteBufferMessageSet.batchIterator : Iterator<BatchMessage>
where BatchMessage is a simple extension of Message that has an additional
method to allow getting a ByteBufferMessageSet (i.e. it wraps the call to
CompressionUtils).
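
For illustration, a rough sketch of the shape such an API could take. All
class bodies below are hypothetical stand-ins (the real Message,
ByteBufferMessageSet, etc. live in Kafka; a plain List stands in for the
returned message set), so this is only a sketch of the proposal, not Kafka
code:

```java
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;

// Hypothetical stand-in for kafka.message.Message; payload details elided.
class Message { }

// A BatchMessage represents one (possibly compressed) batch and can expand
// itself, hiding the CompressionUtils.decompress call from the caller.
class BatchMessage extends Message {
    private final List<Message> expanded;

    BatchMessage(List<Message> expanded) {
        this.expanded = expanded;
    }

    // In the proposal this would return a ByteBufferMessageSet; a plain
    // List stands in for it in this sketch.
    List<Message> messageSet() {
        return expanded;
    }
}

// The proposed batchIterator would yield one BatchMessage per compressed
// batch, so a consumer can process a batch and persist its offset as a unit.
class ByteBufferMessageSetSketch {
    private final List<BatchMessage> batches;

    ByteBufferMessageSetSketch(List<BatchMessage> batches) {
        this.batches = batches;
    }

    Iterator<BatchMessage> batchIterator() {
        return batches.iterator();
    }
}
```

A consumer loop would then iterate batches, expand each one via
messageSet(), process the contained messages, and persist the batch offset
in a single unit, without touching shallowIterator or CompressionUtils
directly.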


On 1 June 2012 16:32, Ross Black <ross.w.bl...@gmail.com> wrote:

> Hi Jun.
>
> I did find a way to process by batch, but it probably reaches a little too
> deep into the internals of Kafka?
>
>             FetchRequest request = new FetchRequest("topic",
>                     partitionNumber, requestOffset, bufferSize);
>             ByteBufferMessageSet messageSet = simpleConsumer.fetch(request);
>             Iterator<MessageAndOffset> batchIterator =
>                     messageSet.underlying().shallowIterator();
>             while (batchIterator.hasNext()) {
>                 MessageAndOffset messageAndOffset = batchIterator.next();
>                 Message batchMessage = messageAndOffset.message();
>                 long offset = messageAndOffset.offset();
>                 Iterator<MessageAndOffset> messages =
>                         CompressionUtils.decompress(batchMessage).iterator();
>                 // process the batch of messages and persist with the offset
>             }
>
> This should work OK, but I am concerned that it is using internal Kafka
> classes.  The code has to reach into the underlying (Scala)
> ByteBufferMessageSet because shallowIterator is not exposed by the Java
> variant.  The code also has to understand that the message is potentially
> compressed and then call CompressionUtils.
>
> How likely is the above approach to work with subsequent releases?
> Is it worth exposing the concept of batches in ByteBufferMessageSet to
> make it explicit?
>
> e.g. ByteBufferMessageSet.batchIterator : BatchMessage
> where BatchMessage is a simple extension of Message that has an additional
> method to allow getting a ByteBufferMessageSet (i.e. it wraps the call to
> CompressionUtils).
>
>
> Thoughts?
>
> Thanks,
> Ross
>
>
>
>
> On 1 June 2012 14:51, Jun Rao <jun...@gmail.com> wrote:
>
>> Ross,
>>
>> With compression enabled, it's a bit hard to implement exactly-once since
>> offsets are only advanced after a compressed batch of messages has been
>> consumed. So, you will have to make sure that each batch of messages can
>> be consumed together as a unit. The other option is to compress with a
>> batch size of 1.
>>
>> Thanks,
>>
>> Jun
>>
>> On Thu, May 31, 2012 at 8:05 PM, Ross Black <ross.w.bl...@gmail.com>
>> wrote:
>>
>> > Hi,
>> >
>> > Using SimpleConsumer, I get the offset of a message (from
>> > MessageAndOffset) and persist it with my consumer data to get
>> > exactly-once semantics for consumer state (as described in the Kafka
>> > design docs).  If the consumer fails then it is simply a matter of
>> > starting replay of messages from the persisted index.
>> >
>> > When using compression, the offset from MessageAndOffset appears to be
>> > the offset of the compressed batch.  E.g. for a batch of 10 messages,
>> > the offset returned for messages 1-9 is the start of the *current*
>> > batch, and the offset for message 10 is the start of the *next* batch.
>> >
>> > How can I get the exactly-once semantics for consumer state?
>> > Is there a way that I can get a batch of messages from SimpleConsumer?
>> > (otherwise I have to reconstruct a batch by watching for a change in the
>> > offset between messages)
>> >
>> > Thanks,
>> > Ross
>> >
>>
>
>
