Carl, I doubt that the change you proposed gives an at-least-once guarantee. consumedOffset is the offset of the next message after the one returned by iterator.next(). For example, if the returned message is A with offset 1, then consumedOffset is set to 2 on currentTopicInfo. While the consumer is still processing message A, it is possible that offset 2 will be committed, regardless of whether we put currentTopicInfo.resetConsumeOffset(..) before or after super.next(). If the consumer fails at this stage, message A will not be reprocessed the next time the consumer is started, since offset 2 has already been committed.
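To make the "commit only after processing" idea concrete, here is a minimal sketch against the old high-level consumer API, with auto-commit disabled and an explicit commit after each message is processed. This is not Carl's patch; the topic name, group id, ZooKeeper address and process() are placeholders, and the exact commitOffsets signature varies slightly between 0.8.x releases.

import java.util.Properties
import kafka.consumer.{Consumer, ConsumerConfig}

object AtLeastOnceLoop extends App {
  def process(payload: Array[Byte]): Unit = { /* placeholder application logic */ }

  val props = new Properties()
  props.put("zookeeper.connect", "localhost:2181")   // assumed local setup
  props.put("group.id", "example-group")             // hypothetical group id
  props.put("auto.commit.enable", "false")           // no auto-commit; we commit explicitly below

  val connector = Consumer.create(new ConsumerConfig(props))
  // Single stream for a hypothetical topic
  val stream = connector.createMessageStreams(Map("example-topic" -> 1))("example-topic").head
  val it = stream.iterator

  while (it.hasNext) {
    val msg = it.next()
    process(msg.message())    // process first...
    connector.commitOffsets   // ...then commit, so unprocessed offsets are never committed
  }
}

Committing after every message is expensive (each commit is an offset write), so in practice one would batch the commits every N messages or M seconds; that keeps the same at-least-once property while reducing overhead.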
On Mon, 22 Jun 2015 at 15:36 Carl Heymann <ch.heym...@gmail.com> wrote:

> OK, thanks. I agree, the current code is better if you get lots of
> rebalancing, and you can do your own thing for stronger guarantees.
>
> For the new consumer, it looks like it should be possible to use multiple
> threads, as long as partition order is preserved in the processing, right?
> So, one can build a custom API similar to the current connector + streams.
> But I guess that's a different discussion.
>
> With the new consumer API, rebalancing is handled during poll(), which is
> called from a client. What if some client stops polling, will this cause
> rebalancing hiccups for all consumers in the cluster? Let me know if this
> has already been discussed.
>
> On Mon, Jun 22, 2015 at 8:50 AM, Jiangjie Qin <j...@linkedin.com.invalid>
> wrote:
>
> > Yes, your approach works. I am not sure if we should take this as the
> > default solution, though. Users can have a simple wrapper + customized
> > rebalance listener. The tricky part is that the rebalance listener might
> > need different implementations. So it looks like the current API
> > provides enough simplicity and enough flexibility.
> >
> > For the new consumer, if there is only one user thread, this might not
> > be an issue. If the consumer is shared by multiple threads (which is not
> > recommended), a similar principle applies - commit offsets only after
> > processing them.
> >
> > Thanks,
> >
> > Jiangjie (Becket) Qin
> >
> > On 6/21/15, 10:50 PM, "Carl Heymann" <ch.heym...@gmail.com> wrote:
> >
> > >Thanks Jiangjie
> > >
> > >So you agree that with the modified ConsumerIterator.next() code, the
> > >high level consumer becomes at-least-once, even with auto-commit
> > >enabled? That is what I really want to know.
> > >
> > >I'll have a look at the rebalancing code. I think I understand: during
> > >rebalancing, with auto-commit enabled, the offsets are committed
> > >in ZookeeperConsumerConnector.closeFetchersForQueues(..). Some
> > >processing might still be happening at this point. The rebalance
> > >listener is called only after this commit. So the current code (without
> > >my change) would lead to fewer duplicate messages, because it assumes
> > >that these transactions normally complete. This seems prudent, since
> > >rebalancing happens much more frequently than Java processes being
> > >killed unexpectedly. On the other hand, it means giving up
> > >at-least-once guarantees for message processing when a Java process
> > >actually does die unexpectedly.
> > >
> > >So I think it would be better to create a custom offset tracking &
> > >commit component, with some ability to wait a reasonable amount of time
> > >for consumer threads on streams to complete their current transaction
> > >on rebalance, before committing from a rebalance listener.
> > >
> > >Is it OK to block for a second or two
> > >in consumerRebalanceListener.beforeReleasingPartitions(..), to wait for
> > >processing threads to complete? Will this hold up the whole cluster's
> > >rebalancing?
> > >
> > >The new KafkaConsumer code doesn't appear to do a commit in the same
> > >way during rebalance, when autocommit is enabled. So if current users
> > >of the high level consumer switch to the new consumer, they might get
> > >more duplicates on rebalance, right?
> > >
> > >Regards
> > >Carl
> > >
> > >On Sun, Jun 21, 2015 at 1:43 AM, Jiangjie Qin <j...@linkedin.com.invalid>
> > >wrote:
> > >
> > >> Hi Carl,
> > >>
> > >> Generally, your approach works to guarantee at-least-once consumption -
> > >> basically people have to commit offsets only after they have processed
> > >> the message.
> > >> The only problem is that in the old high level consumer, during
> > >> consumer rebalance the consumer will (and should) commit offsets. To
> > >> guarantee at-least-once and avoid unnecessary duplicates on rebalance,
> > >> ideally we should wait until all the messages returned by the iterator
> > >> have been processed before committing offsets.
> > >>
> > >> At LinkedIn, we have a wrapper around the open source consumer
> > >> iterator where we can implement that logic.
> > >>
> > >> Jiangjie (Becket) Qin
> > >>
> > >> On 6/19/15, 12:22 AM, "Carl Heymann" <ch.heym...@gmail.com> wrote:
> > >>
> > >> >Thanks Bhavesh.
> > >> >
> > >> >I understand that to get "exactly once" processing of a message
> > >> >requires some de-duplication. What I'm saying is that the current
> > >> >high level consumer, with automatic offset commits enabled, gives
> > >> >neither "at most once" nor "at least once" guarantees: a consumer
> > >> >group might get duplicate messages, but might also never fully
> > >> >process some messages (which is a bigger problem for me).
> > >> >
> > >> >With the code change I propose, I think it changes to "at least
> > >> >once", i.e. one can then do the deduplication you describe, without
> > >> >worrying about "losing" messages. Messages should not get committed
> > >> >without being fully processed. I want to know if this code change has
> > >> >any obvious problems.
> > >> >
> > >> >Regards
> > >> >Carl
> > >> >
> > >> >On Thu, Jun 18, 2015 at 11:19 PM, Bhavesh Mistry
> > >> ><mistry.p.bhav...@gmail.com> wrote:
> > >> >
> > >> >> Hi Carl,
> > >> >>
> > >> >> Producer-side retries can result in the same message being sent to
> > >> >> the brokers more than once, at different offsets. You may also get
> > >> >> duplicates when the High Level Consumer offset has not been saved
> > >> >> or committed but you have already processed the data and your
> > >> >> server restarts, etc.
> > >> >>
> > >> >> To guarantee at-least-once processing across partitions (and across
> > >> >> servers), you will need to store a message hash or primary key in a
> > >> >> distributed LRU cache (with an eviction policy) like Hazelcast
> > >> >> <http://www.hazelcast.com> and do de-duplication across partitions.
> > >> >>
> > >> >> I hope this helps!
> > >> >>
> > >> >> Thanks,
> > >> >>
> > >> >> Bhavesh
> > >> >>
> > >> >> On Wed, Jun 17, 2015 at 1:49 AM, yewton <yew...@gmail.com> wrote:
> > >> >>
> > >> >> > So Carl Heymann's ConsumerIterator.next hack approach is not
> > >> >> > reasonable?
> > >> >> >
> > >> >> > On 2015-06-17 08:12:50 +0000, Stevo Slavić wrote:
> > >> >> >
> > >> >> >> With auto-commit one can only have an at-most-once delivery
> > >> >> >> guarantee - after the commit but before the message is delivered
> > >> >> >> for processing, or even after it is delivered but before it is
> > >> >> >> processed, things can fail, causing the event not to be
> > >> >> >> processed, which is basically the same outcome as if it had not
> > >> >> >> been delivered.
> > >> >> >>
> > >> >> >> On Mon, Jun 15, 2015 at 9:12 PM, Carl Heymann
> > >> >> >> <ch.heym...@gmail.com> wrote:
> > >> >> >>
> > >> >> >> > Hi
> > >> >> >> >
> > >> >> >> > ** Disclaimer: I know there's a new consumer API on the way,
> > >> >> >> > this mail is about the currently available API. I also
> > >> >> >> > apologise if the below has already been discussed previously.
> > >> >> >> > I did try to check previous discussions on ConsumerIterator **
> > >> >> >> >
> > >> >> >> > It seems to me that the high-level consumer would be able to
> > >> >> >> > support at-least-once messaging, even if one uses auto-commit,
> > >> >> >> > by changing kafka.consumer.ConsumerIterator.next() to call
> > >> >> >> > currentTopicInfo.resetConsumeOffset(..) _before_ super.next().
> > >> >> >> > This way, a consumer thread for a KafkaStream could just loop:
> > >> >> >> >
> > >> >> >> > while (true) {
> > >> >> >> >     MyMessage message = iterator.next().message();
> > >> >> >> >     process(message);
> > >> >> >> > }
> > >> >> >> >
> > >> >> >> > Each call to "iterator.next()" then updates the offset to
> > >> >> >> > commit to the end of the message that was just processed. When
> > >> >> >> > offsets are committed for the ConsumerConnector (either
> > >> >> >> > automatically or manually), the commit will not include
> > >> >> >> > offsets of messages that haven't been fully processed.
> > >> >> >> >
> > >> >> >> > I've tested the following ConsumerIterator.next(), and it
> > >> >> >> > seems to work as I expect:
> > >> >> >> >
> > >> >> >> > override def next(): MessageAndMetadata[K, V] = {
> > >> >> >> >   // New code: reset consumer offset to the end of the
> > >> >> >> >   // previously consumed message:
> > >> >> >> >   if (consumedOffset > -1L && currentTopicInfo != null) {
> > >> >> >> >     currentTopicInfo.resetConsumeOffset(consumedOffset)
> > >> >> >> >     val topic = currentTopicInfo.topic
> > >> >> >> >     trace("Setting %s consumed offset to %d".format(topic, consumedOffset))
> > >> >> >> >   }
> > >> >> >> >
> > >> >> >> >   // Old code, excluding reset:
> > >> >> >> >   val item = super.next()
> > >> >> >> >   if (consumedOffset < 0)
> > >> >> >> >     throw new KafkaException("Offset returned by the message set is invalid %d".format(consumedOffset))
> > >> >> >> >   val topic = currentTopicInfo.topic
> > >> >> >> >   consumerTopicStats.getConsumerTopicStats(topic).messageRate.mark()
> > >> >> >> >   consumerTopicStats.getConsumerAllTopicStats().messageRate.mark()
> > >> >> >> >   item
> > >> >> >> > }
> > >> >> >> >
> > >> >> >> > I've seen several people asking about managing commit offsets
> > >> >> >> > manually with the high level consumer. I suspect that this
> > >> >> >> > approach (the modified ConsumerIterator) would scale better
> > >> >> >> > than having a separate ConsumerConnector per stream just so
> > >> >> >> > that you can commit offsets with at-least-once semantics. The
> > >> >> >> > downside of this approach is more duplicate deliveries after
> > >> >> >> > recovery from hard failure (but this is "at least once",
> > >> >> >> > right, not "exactly once").
> > >> >> >> >
> > >> >> >> > I don't propose that the code necessarily be changed like this
> > >> >> >> > in trunk, I just want to know if the approach seems reasonable.
> > >> >> >> >
> > >> >> >> > Regards
> > >> >> >> > Carl Heymann
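For comparison with the new-consumer points raised earlier in the thread, here is a hedged sketch of the same "process first, then commit" pattern against the new KafkaConsumer API as it eventually shipped in 0.9 (the trunk API discussed above may differ). Auto-commit is disabled, offsets are committed only after the records from a poll have been processed, and a rebalance listener commits before partitions are handed off. The topic, group id, bootstrap server and process() are placeholders.

import java.util.{Collections, Properties}
import scala.collection.JavaConverters._
import org.apache.kafka.clients.consumer.{ConsumerRebalanceListener, ConsumerRecord, KafkaConsumer}
import org.apache.kafka.common.TopicPartition

object NewConsumerAtLeastOnce extends App {
  def process(record: ConsumerRecord[String, String]): Unit = { /* placeholder application logic */ }

  val props = new Properties()
  props.put("bootstrap.servers", "localhost:9092")   // assumed local setup
  props.put("group.id", "example-group")             // hypothetical group id
  props.put("enable.auto.commit", "false")           // commit manually, after processing
  props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer")
  props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer")

  val consumer = new KafkaConsumer[String, String](props)

  val listener = new ConsumerRebalanceListener {
    // Commit what has already been processed before giving up partitions,
    // so the next owner starts from the last fully processed position.
    override def onPartitionsRevoked(partitions: java.util.Collection[TopicPartition]): Unit =
      consumer.commitSync()
    override def onPartitionsAssigned(partitions: java.util.Collection[TopicPartition]): Unit = ()
  }
  consumer.subscribe(Collections.singletonList("example-topic"), listener)

  while (true) {
    val records = consumer.poll(1000L)
    records.asScala.foreach(process)   // process the whole batch first...
    consumer.commitSync()              // ...then commit: at-least-once, duplicates only on failure
  }
}

As Jiangjie notes, if the consumer is shared by multiple threads (not recommended), the same principle applies: only offsets of fully processed records should ever reach commitSync().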