Carl,

I doubt the change you proposed gives an at-least-once guarantee.
consumedOffset is the offset that follows the message being returned from
iterator.next(). For example, if the message returned is A with offset 1,
then consumedOffset, which is set on currentTopicInfo, will be 2. While the
consumer is processing message A, offset 2 can be committed whether we put
currentTopicInfo.resetConsumeOffset(..) before or after super.next. If the
consumer fails at this stage, message A will not be reprocessed when the
consumer is restarted, since offset 2 has already been committed.
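
To make the scenario concrete, here is a minimal sketch in plain Java. It
does not use any Kafka classes: PartitionState, resumeOffsetEagerUpdate and
resumeOffsetAfterProcessing are names made up for illustration, and the
model assumes a single partition and a commit that fires exactly while one
message is being processed, after which the consumer dies. It only compares
advancing the committable offset as soon as a message is handed out with
advancing it once processing has completed.

```java
final class OffsetCommitSketch {

    /** Hypothetical stand-in for per-partition state; not Kafka's PartitionTopicInfo. */
    static final class PartitionState {
        long consumedOffset;   // offset a commit would record (the next one to read)
        long committedOffset;  // offset a restarted consumer resumes from

        PartitionState(long startOffset) {
            this.consumedOffset = startOffset;
            this.committedOffset = startOffset;
        }

        void commit() {
            committedOffset = consumedOffset;
        }
    }

    /** The committable offset advances as soon as the message is handed out. */
    static long resumeOffsetEagerUpdate(long first, long crashAt) {
        requireValid(first, crashAt);
        PartitionState state = new PartitionState(first);
        for (long offset = first; ; offset++) {
            state.consumedOffset = offset + 1;   // updated when the message is returned
            if (offset == crashAt) {
                state.commit();                  // commit fires during processing
                return state.committedOffset;    // consumer dies before finishing
            }
            // processing of this message completes here
        }
    }

    /** The committable offset advances only after processing has completed. */
    static long resumeOffsetAfterProcessing(long first, long crashAt) {
        requireValid(first, crashAt);
        PartitionState state = new PartitionState(first);
        for (long offset = first; ; offset++) {
            if (offset == crashAt) {
                state.commit();                  // commit fires during processing
                return state.committedOffset;    // consumer dies before finishing
            }
            // processing of this message completes here
            state.consumedOffset = offset + 1;   // updated only after processing
        }
    }

    private static void requireValid(long first, long crashAt) {
        if (crashAt < first) {
            throw new IllegalArgumentException("crashAt must be >= first");
        }
    }

    public static void main(String[] args) {
        // Message A has offset 1 and the consumer dies while processing it.
        System.out.println(resumeOffsetEagerUpdate(1L, 1L));      // prints 2: A is skipped
        System.out.println(resumeOffsetAfterProcessing(1L, 1L));  // prints 1: A is replayed
    }
}
```

In the first variant a restart resumes at offset 2 and message A is never
reprocessed; in the second it resumes at offset 1 and A is delivered again,
which is what at-least-once requires.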


On Mon, 22 Jun 2015 at 15:36 Carl Heymann <ch.heym...@gmail.com> wrote:

> OK, thanks. I agree, the current code is better if you get lots of
> rebalancing, and you can do your own thing for stronger guarantees.
>
> For the new consumer, it looks like it should be possible to use multiple
> threads, as long as partition order is preserved in the processing, right?
> So, one can build a custom API similar to the current connector + streams.
> But I guess that's a different discussion.
>
> With the new consumer API, rebalancing is handled during poll(), which is
> called from a client. What if some client stops polling, will this cause
> rebalancing hiccups for all consumers in the cluster? Let me know if this
> has already been discussed.
>
> On Mon, Jun 22, 2015 at 8:50 AM, Jiangjie Qin <j...@linkedin.com.invalid>
> wrote:
>
> > Yes, your approach works. I am not sure we should make it the default
> > solution, though. Users can build a simple wrapper plus a customized
> > rebalance listener. The tricky part is that the rebalance listener might
> > need different implementations. So it looks like the current API provides
> > enough simplicity and enough flexibility.
> >
> > For the new consumer, if there is only one user thread, this might not be
> > an issue. If the consumer is shared by multiple threads (which is not
> > recommended), a similar principle applies: commit offsets only after the
> > corresponding messages have been processed.
> >
> > Thanks,
> >
> > Jiangjie (Becket) Qin
> >
> > On 6/21/15, 10:50 PM, "Carl Heymann" <ch.heym...@gmail.com> wrote:
> >
> > >Thanks Jiangjie
> > >
> > >So you agree that with the modified ConsumerIterator.next() code, the high
> > >level consumer becomes at-least-once, even with auto-commit enabled? That
> > >is what I really want to know.
> > >
> > >I'll have a look at the rebalancing code. I think I understand: during
> > >rebalancing, with auto-commit enabled, the offsets are committed
> > >in ZookeeperConsumerConnector.closeFetchersForQueues(..). Some processing
> > >might still be happening at this point. The rebalance listener is called
> > >only after this commit. So the current code (without my change) would lead
> > >to fewer duplicate messages, because it assumes that these transactions
> > >normally complete. This seems prudent, since rebalancing happens much more
> > >frequently than java processes being killed unexpectedly. On the other
> > >hand, it means giving up at-least-once guarantees for message processing
> > >when a java process actually does die unexpectedly.
> > >
> > >So I see it would be better to create a custom offset tracking and commit
> > >component, with some ability to wait a reasonable amount of time, on
> > >rebalance, for consumer threads on streams to complete their current
> > >transaction before committing from a rebalance listener.
> > >
> > >Is it OK to block for a second or two
> > >in consumerRebalanceListener.beforeReleasingPartitions(..), to wait for
> > >processing threads to complete? Will this hold up the whole cluster's
> > >rebalancing?
> > >
> > >The new KafkaConsumer code doesn't appear to do a commit in the same way
> > >during rebalance, when autocommit is enabled. So if current users of the
> > >high level consumer switch to the new consumer, they might get more
> > >duplicates on rebalance, right?
> > >
> > >Regards
> > >Carl
> > >
> > >
> > >On Sun, Jun 21, 2015 at 1:43 AM, Jiangjie Qin <j...@linkedin.com.invalid>
> > >wrote:
> > >
> > >> Hi Carl,
> > >>
> > >> Generally, your approach works to guarantee at-least-once consumption:
> > >> basically, people have to commit an offset only after they have
> > >> processed the message.
> > >> The only problem is that in the old high level consumer, the consumer
> > >> will (and should) commit offsets during a rebalance. To guarantee
> > >> at-least-once and avoid unnecessary duplicates on rebalance, ideally we
> > >> should wait until all the messages returned by the iterator have been
> > >> processed before committing offsets.
> > >>
> > >> At LinkedIn, we have a wrapper around the open source consumer iterator
> > >> where we can implement that logic.
> > >>
> > >> Jiangjie (Becket) Qin
> > >>
> > >> On 6/19/15, 12:22 AM, "Carl Heymann" <ch.heym...@gmail.com> wrote:
> > >>
> > >> >Thanks Bhavesh.
> > >> >
> > >> >I understand that "exactly once" processing of a message requires
> > >> >some de-duplication. What I'm saying is that the current high level
> > >> >consumer, with automatic offset commits enabled, gives neither "at most
> > >> >once" nor "at least once" guarantees: a consumer group might get
> > >> >duplicate messages, but might also never fully process some messages
> > >> >(which is a bigger problem for me).
> > >> >
> > >> >With the code change I propose, I think it changes to "at least once",
> > >> >i.e. one can then do the deduplication you describe, without worrying
> > >> >about "losing" messages. Messages should not get committed without
> > >> >being fully processed. I want to know if this code change has any
> > >> >obvious problems.
> > >> >
> > >> >Regards
> > >> >Carl
> > >> >
> > >> >
> > >> >On Thu, Jun 18, 2015 at 11:19 PM, Bhavesh Mistry
> > >> ><mistry.p.bhav...@gmail.com> wrote:
> > >> >
> > >> >> Hi Carl,
> > >> >>
> > >> >> Producer-side retries can result in duplicate messages being sent to
> > >> >> the brokers: the same message stored at different offsets. You may
> > >> >> also get duplicates when the High Level Consumer offset has not been
> > >> >> saved or committed but you have already processed the data and your
> > >> >> server restarts, etc.
> > >> >>
> > >> >> To guarantee at-least-once processing across partitions (and across
> > >> >> servers), you will need to store a message hash or primary key in a
> > >> >> distributed LRU cache (with an eviction policy) such as Hazelcast
> > >> >> <http://www.hazelcast.com> and do de-duplication across partitions.
> > >> >>
> > >> >> I hope this helps!
> > >> >>
> > >> >>
> > >> >>
> > >> >> Thanks,
> > >> >>
> > >> >> Bhavesh
> > >> >>
> > >> >>
> > >> >> On Wed, Jun 17, 2015 at 1:49 AM, yewton <yew...@gmail.com> wrote:
> > >> >>
> > >> >> > So Carl Heymann's ConsumerIterator.next hack approach is not
> > >> >> > reasonable?
> > >> >> >
> > >> >> > On 2015-06-17 08:12:50 +0000, Stevo Slavić wrote:
> > >> >> >
> > >> >> >> With auto-commit one can only have an at-most-once delivery
> > >> >> >> guarantee: after the commit but before the message is delivered for
> > >> >> >> processing, or even after it is delivered but before it is
> > >> >> >> processed, things can fail, causing the event not to be processed,
> > >> >> >> which is basically the same outcome as if it had not been delivered.
> > >> >> >>
> > >> >> >>
> > >> >> >>
> > >> >> >> On Mon, Jun 15, 2015 at 9:12 PM, Carl Heymann <ch.heym...@gmail.com>
> > >> >> >> wrote:
> > >> >> >>
> > >> >> >>
> > >> >> >>
> > >> >> >> > Hi
> > >> >> >> >
> > >> >> >> > ** Disclaimer: I know there's a new consumer API on the way, this
> > >> >> >> > mail is about the currently available API. I also apologise if the
> > >> >> >> > below has already been discussed previously. I did try to check
> > >> >> >> > previous discussions on ConsumerIterator **
> > >> >> >> >
> > >> >> >> > It seems to me that the high-level consumer would be able to support
> > >> >> >> > at-least-once messaging, even if one uses auto-commit, by changing
> > >> >> >> > kafka.consumer.ConsumerIterator.next() to call
> > >> >> >> > currentTopicInfo.resetConsumeOffset(..) _before_ super.next(). This
> > >> >> >> > way, a consumer thread for a KafkaStream could just loop:
> > >> >> >> >
> > >> >> >> > while (true) {
> > >> >> >> >     MyMessage message = iterator.next().message();
> > >> >> >> >     process(message);
> > >> >> >> > }
> > >> >> >> >
> > >> >> >> > Each call to "iterator.next()" then updates the offset to commit to
> > >> >> >> > the end of the message that was just processed. When offsets are
> > >> >> >> > committed for the ConsumerConnector (either automatically or
> > >> >> >> > manually), the commit will not include offsets of messages that
> > >> >> >> > haven't been fully processed.
> > >> >> >> >
> > >> >> >> > I've tested the following ConsumerIterator.next(), and it seems to
> > >> >> >> > work as I expect:
> > >> >> >> >
> > >> >> >> >   override def next(): MessageAndMetadata[K, V] = {
> > >> >> >> >     // New code: reset consumer offset to the end of the previously
> > >> >> >> >     // consumed message:
> > >> >> >> >     if (consumedOffset > -1L && currentTopicInfo != null) {
> > >> >> >> >         currentTopicInfo.resetConsumeOffset(consumedOffset)
> > >> >> >> >         val topic = currentTopicInfo.topic
> > >> >> >> >         trace("Setting %s consumed offset to %d".format(topic, consumedOffset))
> > >> >> >> >     }
> > >> >> >> >
> > >> >> >> >     // Old code, excluding reset:
> > >> >> >> >     val item = super.next()
> > >> >> >> >     if(consumedOffset < 0)
> > >> >> >> >       throw new KafkaException("Offset returned by the message set is invalid %d".format(consumedOffset))
> > >> >> >> >     val topic = currentTopicInfo.topic
> > >> >> >> >     consumerTopicStats.getConsumerTopicStats(topic).messageRate.mark()
> > >> >> >> >     consumerTopicStats.getConsumerAllTopicStats().messageRate.mark()
> > >> >> >> >     item
> > >> >> >> >   }
> > >> >> >> >
> > >> >> >> > I've seen several people asking about managing commit offsets
> > >> >> >> > manually with the high level consumer. I suspect that this approach
> > >> >> >> > (the modified ConsumerIterator) would scale better than having a
> > >> >> >> > separate ConsumerConnector per stream just so that you can commit
> > >> >> >> > offsets with at-least-once semantics. The downside of this approach
> > >> >> >> > is more duplicate deliveries after recovery from hard failure (but
> > >> >> >> > this is "at least once", right, not "exactly once").
> > >> >> >> >
> > >> >> >> > I don't propose that the code necessarily be changed like this in
> > >> >> >> > trunk, I just want to know if the approach seems reasonable.
> > >> >> >> >
> > >> >> >> > Regards
> > >> >> >> > Carl Heymann
> > >> >> >>
> > >> >> >> >
> > >> >> >>
> > >> >> >>
> > >> >> >>
> > >> >> >>
> > >> >> >>
> > >> >> >>
> > >> >> >>
> > >> >> >
> > >> >> >
> > >> >> >
> > >> >>
> > >>
> > >>
> >
> >
>
