Hi Carl,

Generally, your approach works to guarantee at-least-once consumption:
basically, people have to commit the offset only after they have processed
the message.
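For reference, here is a minimal sketch of that pattern with the current
high level consumer, with auto-commit disabled and the offset committed only
after processing. The topic name, group id, ZooKeeper address and the
process() method are just placeholders, and it assumes a single stream and a
single consuming thread:

import java.util.HashMap;
import java.util.Map;
import java.util.Properties;
import kafka.consumer.Consumer;
import kafka.consumer.ConsumerConfig;
import kafka.consumer.ConsumerIterator;
import kafka.consumer.KafkaStream;
import kafka.javaapi.consumer.ConsumerConnector;
import kafka.message.MessageAndMetadata;

public class AtLeastOnceLoop {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("zookeeper.connect", "localhost:2181"); // placeholder
        props.put("group.id", "my-group");                // placeholder
        props.put("auto.commit.enable", "false");         // commit manually instead
        ConsumerConnector connector =
            Consumer.createJavaConsumerConnector(new ConsumerConfig(props));

        Map<String, Integer> topicCounts = new HashMap<String, Integer>();
        topicCounts.put("my-topic", 1);                   // placeholder topic
        KafkaStream<byte[], byte[]> stream =
            connector.createMessageStreams(topicCounts).get("my-topic").get(0);
        ConsumerIterator<byte[], byte[]> it = stream.iterator();

        while (it.hasNext()) {
            MessageAndMetadata<byte[], byte[]> record = it.next();
            process(record.message());  // finish processing first
            connector.commitOffsets();  // then commit, so a committed offset
                                        // never covers an unprocessed message
        }
    }

    private static void process(byte[] message) {
        // application-specific processing goes here
    }
}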
The only problem is that in the old high level consumer, the consumer will
(and should) commit offsets during a consumer rebalance. To guarantee
at-least-once and avoid unnecessary duplicates on rebalance, ideally we
should wait until all the messages returned by the iterator have been
processed before committing offsets.

At LinkedIn, we have a wrapper around the open source consumer iterator
where we can implant this logic.
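Purely as an illustrative sketch (the class and method names below are made
up, and this is not our actual wrapper), the idea is just to count messages
that have been handed out but not yet acknowledged, so whoever triggers a
commit (for example around a rebalance) can wait until the count drops to
zero:

import java.util.concurrent.atomic.AtomicInteger;
import kafka.consumer.ConsumerIterator;
import kafka.message.MessageAndMetadata;

// Illustrative only: tracks in-flight messages so that offset commits can be
// delayed until everything handed out has been processed.
public class AckTrackingIterator<K, V> {
    private final ConsumerIterator<K, V> inner;
    private final AtomicInteger inFlight = new AtomicInteger(0);

    public AckTrackingIterator(ConsumerIterator<K, V> inner) {
        this.inner = inner;
    }

    public MessageAndMetadata<K, V> next() {
        MessageAndMetadata<K, V> m = inner.next();
        inFlight.incrementAndGet();   // handed to the application, not yet processed
        return m;
    }

    public void ack() {
        inFlight.decrementAndGet();   // application calls this after processing
    }

    public boolean allProcessed() {
        return inFlight.get() == 0;   // commit / rebalance hook can wait on this
    }
}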

Jiangjie (Becket) Qin

On 6/19/15, 12:22 AM, "Carl Heymann" <ch.heym...@gmail.com> wrote:

>Thanks Bhavesh.
>
>I understand that getting "exactly once" processing of a message requires
>some de-duplication. What I'm saying is that the current high level
>consumer, with automatic offset commits enabled, gives neither "at most
>once" nor "at least once" guarantees: A consumer group might get duplicate
>messages, but might also never fully process some messages (which is a
>bigger problem for me).
>
>With the code change I propose, I think it changes to "at least once",
>i.e.
>one can then do the deduplication you describe, without worrying about
>"losing" messages. Messages should not get committed without being fully
>processed. I want to know if this code change has any obvious problems.
>
>Regards
>Carl
>
>
>On Thu, Jun 18, 2015 at 11:19 PM, Bhavesh Mistry <mistry.p.bhav...@gmail.com>
>wrote:
>
>> Hi Carl,
>>
>> Producer-side retries can result in duplicate messages being sent to the
>> brokers, with the same message appearing at different offsets. Also, you
>> may get duplicates when the High Level Consumer offset has not been saved
>> or committed, but you have already processed the data and your server
>> restarts, etc.
>>
>>
>>
>> To guarantee at-least-once processing across partitions (and across
>> servers), you will need to store a message hash or primary key in a
>> distributed LRU cache (with an eviction policy) like Hazelcast
>> <http://www.hazelcast.com> and do de-duplication across partitions.
>>
>>
>>
>> I hope this helps!
>>
>>
>>
>> Thanks,
>>
>> Bhavesh
>>
>>
>> On Wed, Jun 17, 2015 at 1:49 AM, yewton <yew...@gmail.com> wrote:
>>
>> > So Carl Heymann's ConsumerIterator.next hack approach is not
>> > reasonable?
>> >
>> > On 2015-06-17 08:12:50 +0000, Stevo Slavić wrote:
>> >
>> >>
>> >> With auto-commit one can only have an at-most-once delivery guarantee:
>> >> after the commit but before the message is delivered for processing, or
>> >> even after it is delivered but before it is processed, things can fail,
>> >> causing the event not to be processed, which is basically the same
>> >> outcome as if it was not delivered.
>> >>
>> >> On Mon, Jun 15, 2015 at 9:12 PM, Carl Heymann <ch.heym...@gmail.com>
>> >> wrote:
>> >>
>> >> > Hi
>> >> >
>> >> > ** Disclaimer: I know there's a new consumer API on the way, this
>> >> > mail is about the currently available API. I also apologise if the
>> >> > below has already been discussed previously. I did try to check
>> >> > previous discussions on ConsumerIterator **
>> >> >
>> >> > It seems to me that the high-level consumer would be able to support
>> >> > at-least-once messaging, even if one uses auto-commit, by changing
>> >> > kafka.consumer.ConsumerIterator.next() to call
>> >> > currentTopicInfo.resetConsumeOffset(..) _before_ super.next(). This
>> >> > way, a consumer thread for a KafkaStream could just loop:
>> >> >
>> >> > while (true) {
>> >> >     MyMessage message = iterator.next().message();
>> >> >     process(message);
>> >> > }
>> >> >
>> >> > Each call to "iterator.next()" then updates the offset to commit to
>> >> > the end of the message that was just processed. When offsets are
>> >> > committed for the ConsumerConnector (either automatically or
>> >> > manually), the commit will not include offsets of messages that
>> >> > haven't been fully processed.
>> >> >
>> >> > I've tested the following ConsumerIterator.next(), and it seems to
>> >> > work as I expect:
>> >> >
>> >> >   override def next(): MessageAndMetadata[K, V] = {
>> >> >     // New code: reset consumer offset to the end of the previously
>> >> >     // consumed message:
>> >> >     if (consumedOffset > -1L && currentTopicInfo != null) {
>> >> >         currentTopicInfo.resetConsumeOffset(consumedOffset)
>> >> >         val topic = currentTopicInfo.topic
>> >> >         trace("Setting %s consumed offset to %d".format(topic, consumedOffset))
>> >> >     }
>> >> >
>> >> >     // Old code, excluding reset:
>> >> >     val item = super.next()
>> >> >     if(consumedOffset < 0)
>> >> >       throw new KafkaException("Offset returned by the message set is invalid %d".format(consumedOffset))
>> >> >     val topic = currentTopicInfo.topic
>> >> >     consumerTopicStats.getConsumerTopicStats(topic).messageRate.mark()
>> >> >     consumerTopicStats.getConsumerAllTopicStats().messageRate.mark()
>> >> >     item
>> >> >   }
>> >> >
>> >> > I've seen several people asking about managing commit offsets
>> >> > manually with the high level consumer. I suspect that this approach
>> >> > (the modified ConsumerIterator) would scale better than having a
>> >> > separate ConsumerConnector per stream just so that you can commit
>> >> > offsets with at-least-once semantics. The downside of this approach
>> >> > is more duplicate deliveries after recovery from hard failure (but
>> >> > this is "at least once", right, not "exactly once").
>> >> >
>> >> > I don't propose that the code necessarily be changed like this in
>> >> > trunk, I just want to know if the approach seems reasonable.
>> >> >
>> >> > Regards
>> >> > Carl Heymann
>> >> >
>> >>
>> >
>> >
>> >
>>
