Re: [DISCUSS] KIP-983: Full speed async processing during rebalance
Thanks Philip, That sounds pretty good. Meanwhile I'll continue to study KIP-848. It is a bit too much to digest in one go. Do you have a rough timeline for when the new consumer implementation can be tried out in non-production environments? Kind regards, Erik. On 14-10-2023 at 20:48, Philip Nee wrote: Hi Erik, Thanks for the KIP, again. I am also very much interested in the idea of this KIP, and I want to let you know that we are rewriting the Kafka consumer using an event-driven approach, so I think the new impl would make this KIP much easier to implement. In a nutshell, the network IO will become completely asynchronous to the application thread, so that the blocking APIs won't stall the network send/receive. In the new impl, the main roles of poll are 1. check if there are any background events such as error or callback invocation, 2. notify the background that the user is polling, and 3. check if there is any data to return to the user. Because the background thread and application thread are inherently working in an async fashion, it is possible to continue to process and commit during the revocation period; however, we have to be very careful with the state of partition ownership as you mentioned in the KIP. Please keep an eye on the new consumer implementation; in case you are interested in digging in, it is the PrototypeKafkaConsumer module. It is not fully functional but we are working full speed to get this to a good state. Thanks - I am still reading the KIP and your previous KIP to see if I can make more constructive suggestions here. P On Fri, Oct 13, 2023 at 11:54 PM Erik van Oosten wrote: Hello David, Thanks, I am happy to hear we agree on the problem. All the tiny details of an implementation are less important. I will read KIP-848 first to answer your question about its relation with KIP-983. But for sure it makes sense to complete the implementation of KIP-848 first. Kind regards, Erik. 
Re: [DISCUSS] KIP-983: Full speed async processing during rebalance
Hello David, Thanks, I am happy to hear we agree on the problem. All the tiny details of an implementation are less important. I will read KIP-848 first to answer you question about its relation with KIP-983. But for sure it makes sense to complete the implementation of KIP-848 first. Kind regards, Erik. Op 13-10-2023 om 21:00 schreef David Jacot: Hi Erik, Thanks for the KIP. I haven’t fully read the KIP yet but I agree with the weaknesses that you point out in it. I will continue to read it. For your information, we are working full speed on implementing KIP-848 while also changing the internal threading model of consumer. Those changes are already extremely large so I would rather prefer to complete them before adding more on top of them. Moreover, I think that this KIP should build on top of KIP-848 now. Would you agree with this? Best, David Le ven. 13 oct. 2023 à 20:44, Erik van Oosten a écrit : Thanks Philip, No worries, I am not in a hurry. Knowing this is not forgotten is enough for me. If there is anything I can do to help the process please let me know. Kind regards, Erik. Op 13-10-2023 om 20:29 schreef Philip Nee: Hi Erik, Sorry for the delay, I have not finished reviewing the KIP, but I also have not forgotten about it! In general, KIP review process can be lengthy, so I think mailing list is the best bet to get the committer's attention. P On Fri, Oct 13, 2023 at 10:55 AM Erik van Oosten wrote: Hi client developers, The text is updated so that it is more clear that you can only use auto-commit when doing synchronous processing (approach 1). I am assuming that auto-commit commits whatever was consumed in the previous poll. I am wondering why this KIP doesn't get more attention. Is async processing not something that the kafka client wants to support? Kind regards, Erik. Op 25-09-2023 om 18:17 schreef Erik van Oosten: Hi Viktor, Good questions! 1. Auto-commits would only work with approach 1 in the KIP. 
Any async solution is incompatible with auto-commits. Do you think the text will improve when this is mentioned? 2. That is entirely correct. If you use async commits you can await completion by doing a single sync commit with an empty offsets Map (this will work as of Kafka 3.6.0). Is there anything I can do to make the text clearer? Kind regards, Erik. Op 25-09-2023 om 17:04 schreef Viktor Somogyi-Vass: Hi Erik, I'm still trying to wrap my head around the KIP, however I have a few questions that weren't clear to me regarding offset commits: 1. Would auto-commits interfere with the behavior defined in your KIP or would it work the same as manual commits? 2. As I see you don't separate offset commits by whether they're sync or async. For sync commits timing isn't really a problem but how would you change work in case of async offset commits? There can be a few caveats there as you may not know whether a commit is finished or not until your callback is called. Thanks, Viktor On Sat, Sep 23, 2023 at 4:00 PM Erik van Oosten wrote: Hi all, I would like to start the discussion on KIP-983: Full speed async processing during rebalance [1]. The idea is that we can prevent the drop in throughput during a cooperative rebalance. I am curious to your ideas and comments. Kind regards, Erik. [1] https://cwiki.apache.org/confluence/display/KAFKA/KIP-983%3A+Full+speed+async+processing+during+rebalance -- Erik van Oosten e.vanoos...@grons.nl https://day-to-day-stuff.blogspot.com
Re: [DISCUSS] KIP-983: Full speed async processing during rebalance
Thanks Philip, No worries, I am not in a hurry. Knowing this is not forgotten is enough for me. If there is anything I can do to help the process please let me know. Kind regards, Erik. Op 13-10-2023 om 20:29 schreef Philip Nee: Hi Erik, Sorry for the delay, I have not finished reviewing the KIP, but I also have not forgotten about it! In general, KIP review process can be lengthy, so I think mailing list is the best bet to get the committer's attention. P On Fri, Oct 13, 2023 at 10:55 AM Erik van Oosten wrote: Hi client developers, The text is updated so that it is more clear that you can only use auto-commit when doing synchronous processing (approach 1). I am assuming that auto-commit commits whatever was consumed in the previous poll. I am wondering why this KIP doesn't get more attention. Is async processing not something that the kafka client wants to support? Kind regards, Erik. Op 25-09-2023 om 18:17 schreef Erik van Oosten: Hi Viktor, Good questions! 1. Auto-commits would only work with approach 1 in the KIP. Any async solution is incompatible with auto-commits. Do you think the text will improve when this is mentioned? 2. That is entirely correct. If you use async commits you can await completion by doing a single sync commit with an empty offsets Map (this will work as of Kafka 3.6.0). Is there anything I can do to make the text clearer? Kind regards, Erik. Op 25-09-2023 om 17:04 schreef Viktor Somogyi-Vass: Hi Erik, I'm still trying to wrap my head around the KIP, however I have a few questions that weren't clear to me regarding offset commits: 1. Would auto-commits interfere with the behavior defined in your KIP or would it work the same as manual commits? 2. As I see you don't separate offset commits by whether they're sync or async. For sync commits timing isn't really a problem but how would you change work in case of async offset commits? 
There can be a few caveats there as you may not know whether a commit is finished or not until your callback is called. Thanks, Viktor On Sat, Sep 23, 2023 at 4:00 PM Erik van Oosten wrote: Hi all, I would like to start the discussion on KIP-983: Full speed async processing during rebalance [1]. The idea is that we can prevent the drop in throughput during a cooperative rebalance. I am curious to your ideas and comments. Kind regards, Erik. [1] https://cwiki.apache.org/confluence/display/KAFKA/KIP-983%3A+Full+speed+async+processing+during+rebalance -- Erik van Oosten e.vanoos...@grons.nl https://day-to-day-stuff.blogspot.com -- Erik van Oosten e.vanoos...@grons.nl https://day-to-day-stuff.blogspot.com
Re: [DISCUSS] KIP-983: Full speed async processing during rebalance
Hi client developers, The text is updated so that it is more clear that you can only use auto-commit when doing synchronous processing (approach 1). I am assuming that auto-commit commits whatever was consumed in the previous poll. I am wondering why this KIP doesn't get more attention. Is async processing not something that the kafka client wants to support? Kind regards, Erik. Op 25-09-2023 om 18:17 schreef Erik van Oosten: Hi Viktor, Good questions! 1. Auto-commits would only work with approach 1 in the KIP. Any async solution is incompatible with auto-commits. Do you think the text will improve when this is mentioned? 2. That is entirely correct. If you use async commits you can await completion by doing a single sync commit with an empty offsets Map (this will work as of Kafka 3.6.0). Is there anything I can do to make the text clearer? Kind regards, Erik. Op 25-09-2023 om 17:04 schreef Viktor Somogyi-Vass: Hi Erik, I'm still trying to wrap my head around the KIP, however I have a few questions that weren't clear to me regarding offset commits: 1. Would auto-commits interfere with the behavior defined in your KIP or would it work the same as manual commits? 2. As I see you don't separate offset commits by whether they're sync or async. For sync commits timing isn't really a problem but how would you change work in case of async offset commits? There can be a few caveats there as you may not know whether a commit is finished or not until your callback is called. Thanks, Viktor On Sat, Sep 23, 2023 at 4:00 PM Erik van Oosten wrote: Hi all, I would like to start the discussion on KIP-983: Full speed async processing during rebalance [1]. The idea is that we can prevent the drop in throughput during a cooperative rebalance. I am curious to your ideas and comments. Kind regards, Erik. 
[1] https://cwiki.apache.org/confluence/display/KAFKA/KIP-983%3A+Full+speed+async+processing+during+rebalance -- Erik van Oosten e.vanoos...@grons.nl https://day-to-day-stuff.blogspot.com
Re: [DISCUSS] KIP-983: Full speed async processing during rebalance
Hi Viktor, Good questions! 1. Auto-commits would only work with approach 1 in the KIP. Any async solution is incompatible with auto-commits. Do you think the text will improve when this is mentioned? 2. That is entirely correct. If you use async commits you can await completion by doing a single sync commit with an empty offsets Map (this will work as of Kafka 3.6.0). Is there anything I can do to make the text clearer? Kind regards, Erik. Op 25-09-2023 om 17:04 schreef Viktor Somogyi-Vass: Hi Erik, I'm still trying to wrap my head around the KIP, however I have a few questions that weren't clear to me regarding offset commits: 1. Would auto-commits interfere with the behavior defined in your KIP or would it work the same as manual commits? 2. As I see you don't separate offset commits by whether they're sync or async. For sync commits timing isn't really a problem but how would you change work in case of async offset commits? There can be a few caveats there as you may not know whether a commit is finished or not until your callback is called. Thanks, Viktor On Sat, Sep 23, 2023 at 4:00 PM Erik van Oosten wrote: Hi all, I would like to start the discussion on KIP-983: Full speed async processing during rebalance [1]. The idea is that we can prevent the drop in throughput during a cooperative rebalance. I am curious to your ideas and comments. Kind regards, Erik. [1] https://cwiki.apache.org/confluence/display/KAFKA/KIP-983%3A+Full+speed+async+processing+during+rebalance -- Erik van Oosten e.vanoos...@grons.nl https://day-to-day-stuff.blogspot.com
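The point about awaiting async commits with one sync commit can be pictured with a small model. The sketch below does not use the Kafka client at all: `CommitPipelineSketch`, `commitAsync(long)` and `commitSyncEmpty()` are hypothetical names, and the single-threaded executor is only an assumed stand-in for an in-order commit channel. It shows the general barrier idea, namely that an empty request queued behind the async ones completes only after they have completed.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical model of an in-order commit channel; none of these names are Kafka APIs.
class CommitPipelineSketch implements AutoCloseable {
    // One worker thread, so requests complete in the order they were submitted.
    private final ExecutorService channel = Executors.newSingleThreadExecutor();
    private final AtomicInteger completed = new AtomicInteger();

    // Fire-and-forget commit: returns immediately, completes later on the worker.
    CompletableFuture<Void> commitAsync(long offset) {
        return CompletableFuture.runAsync(() -> {
            try {
                Thread.sleep(10); // simulated round trip
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
            completed.incrementAndGet();
        }, channel);
    }

    // Empty synchronous commit used as a barrier: it is queued behind all
    // earlier async commits, so when it returns they have all completed.
    void commitSyncEmpty() {
        CompletableFuture.runAsync(() -> { }, channel).join();
    }

    int completedCommits() {
        return completed.get();
    }

    @Override
    public void close() {
        channel.shutdown();
    }

    public static void main(String[] args) {
        try (CommitPipelineSketch pipeline = new CommitPipelineSketch()) {
            pipeline.commitAsync(10);
            pipeline.commitAsync(20);
            pipeline.commitSyncEmpty();
            System.out.println("completed async commits: " + pipeline.completedCommits());
        }
    }
}
```

Whether the real client behaves this way depends on the version; the message above only states that the empty-map sync commit works as of Kafka 3.6.0.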
[DISCUSS] KIP-983: Full speed async processing during rebalance
Hi all, I would like to start the discussion on KIP-983: Full speed async processing during rebalance [1]. The idea is that we can prevent the drop in throughput during a cooperative rebalance. I am curious about your ideas and comments. Kind regards, Erik. [1] https://cwiki.apache.org/confluence/display/KAFKA/KIP-983%3A+Full+speed+async+processing+during+rebalance -- Erik van Oosten e.vanoos...@grons.nl https://day-to-day-stuff.blogspot.com
[jira] [Resolved] (KAFKA-14972) Make KafkaConsumer usable in async runtimes
[ https://issues.apache.org/jira/browse/KAFKA-14972?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Erik van Oosten resolved KAFKA-14972. Resolution: Won't Fix. Summary: Make KafkaConsumer usable in async runtimes. Key: KAFKA-14972. URL: https://issues.apache.org/jira/browse/KAFKA-14972. Project: Kafka. Issue Type: Wish. Components: consumer. Reporter: Erik van Oosten. Priority: Major. Labels: needs-kip. Description: KafkaConsumer contains a check that rejects nested invocations from different threads (method acquire). For users of an async runtime, this is an almost impossible requirement. Examples of async runtimes that are affected are Kotlin coroutines (see KAFKA-7143) and Zio. It should be possible for a thread to pass on its capability to access the consumer to another thread. See KIP-944 (https://cwiki.apache.org/confluence/x/chw0Dw) for a proposal and https://github.com/apache/kafka/pull/13914 for an implementation.
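To make the rejected scenario concrete, here is a minimal sketch of a single-thread ownership check of the kind the issue describes. `GuardedResource` is a hypothetical stand-in written with plain JDK classes; it is not the KafkaConsumer code, and the exact fields and exception message are assumptions. The demo method holds the guard on one thread (as if inside poll) and then tries to enter from a second thread (as if a callback had been moved to a pool thread).

```java
import java.util.ConcurrentModificationException;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;

// Hypothetical stand-in for a single-thread access check; not the Kafka class itself.
class GuardedResource {
    private static final long NO_CURRENT_THREAD = -1L;
    private final AtomicLong currentThread = new AtomicLong(NO_CURRENT_THREAD);
    private final AtomicInteger refCount = new AtomicInteger(0);

    // Claims the resource for the calling thread; nested calls from the owner are allowed.
    void acquire() {
        long threadId = Thread.currentThread().getId();
        if (threadId != currentThread.get()
                && !currentThread.compareAndSet(NO_CURRENT_THREAD, threadId)) {
            throw new ConcurrentModificationException("resource is owned by another thread");
        }
        refCount.incrementAndGet();
    }

    // Gives up ownership once the outermost call on the owning thread has finished.
    void release() {
        if (refCount.decrementAndGet() == 0) {
            currentThread.set(NO_CURRENT_THREAD);
        }
    }

    // Returns true when a call from a second thread, nested inside a call on
    // the owning thread, is rejected by the check.
    static boolean nestedCallFromOtherThreadIsRejected() {
        GuardedResource resource = new GuardedResource();
        boolean[] rejected = {false};
        resource.acquire(); // the owning thread is now "inside" the resource
        try {
            Thread other = new Thread(() -> {
                try {
                    resource.acquire();
                    resource.release();
                } catch (ConcurrentModificationException e) {
                    rejected[0] = true;
                }
            });
            other.start();
            other.join(); // join makes the write to rejected[0] visible here
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        } finally {
            resource.release();
        }
        return rejected[0];
    }

    public static void main(String[] args) {
        System.out.println("rejected: " + nestedCallFromOtherThreadIsRejected());
    }
}
```

An async runtime that decides by itself which thread runs a callback ends up in the second-thread branch even when the calls are strictly sequential, which is the situation the issue asks to support.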
Re: [VOTE] KIP-944 Support async runtimes in consumer, votes needed!
Hi Philip, Colin, Chris, Matthias, Kirk, David, Xiangyuan LI, KIP-944 was extended a bit more to explain why effect systems like Zio and Cats-effects make it impossible to run code on a specific thread. I understand that using an effect system is pretty far removed from writing Java in transaction script style, the style that is probably used by most Kafka committers. It took me quite some time to get comfortable with effects. It is not the academic fringe tool that many perceive it to be. For me it is a way to quickly and correctly write serious data processing applications. Even so, we both use the same Kafka ecosystem, and supporting different styles only makes it richer. IMHO it would be a shame if we cannot live together using the same code base. Philip, thanks for your support. I hope I have convinced the others as well by now. If not, I am giving up and I will spend my energy elsewhere. Kind regards, Erik. On 24-07-2023 at 18:12, Erik van Oosten wrote: Hello Xiangyuan LI, I am not familiar with coroutines, nor with Kotlin. You will have to work with the documentation: https://kotlinlang.org/docs/coroutines-overview.html However, I am familiar with Zio and Cats-effects (both Scala libraries). In both Zio and Cats-effects one creates effects (aka workflows) which are descriptions of a computation. For example, when executing the Scala code `val effect = ZIO.attempt(println("Hello world!"))` one creates only a description; it does not print anything yet. The language to describe these effects is rich enough to describe entire applications including things like concurrency. In fact, the language is so rich that it is the most convenient way that I know to safely write highly concurrent and async applications. For many developer teams the performance penalty (which is real but not big) is worth it. To execute a Zio or Cats effect one gives it to the runtime. The runtime then schedules the work on one of the threads in its thread-pool. 
Neither Zio nor Cats-effects supports running an effect on the thread that manages the thread-pool. I hope this is clear enough. Kind regards, Erik. On 24-07-2023 at 05:21, Xiangyuan LI wrote: Hi Erik: I read KIP-944 and the email list roughly. It seems most Java developers are not familiar with the concept of a "coroutine", so they cannot imagine why the code of one function, without Thread.start(), may run in separate threads without the developer being able to control it. Maybe you need a more elaborate description to demonstrate how coroutine code runs. On Sunday, 23 July 2023 at 17:47, Erik van Oosten wrote: -- Erik van Oosten e.vanoos...@grons.nl https://day-to-day-stuff.blogspot.com
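For readers who know Java but not Scala, the "description versus execution" distinction above can be approximated with plain JDK types. This is only an analogy under stated assumptions: `EffectSketch`, `describe` and `runOnPool` are made-up names, a `Supplier` stands in for an effect, and a fixed thread pool stands in for the effect runtime. It is not how Zio or Cats-effects are implemented.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Supplier;

// Hypothetical analogy for an effect system, built from JDK classes only.
class EffectSketch {
    // Builds a description of a computation. Nothing runs here: the counter
    // is only touched when the description is executed later.
    static Supplier<String> describe(AtomicInteger runs) {
        return () -> {
            runs.incrementAndGet();
            return Thread.currentThread().getName();
        };
    }

    // Stand-in for the runtime: it picks the thread, not the caller.
    // Returns the name of the thread that actually ran the description.
    static String runOnPool(Supplier<String> effect) {
        ExecutorService pool = Executors.newFixedThreadPool(2);
        try {
            Callable<String> task = effect::get;
            return pool.submit(task).get();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        } catch (ExecutionException e) {
            throw new IllegalStateException(e.getCause());
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        AtomicInteger runs = new AtomicInteger();
        Supplier<String> effect = describe(runs);
        System.out.println("runs after describing: " + runs.get());
        System.out.println("ran on: " + runOnPool(effect));
        System.out.println("caller: " + Thread.currentThread().getName());
    }
}
```

The caller has no say in which pool thread runs the work, which is why a library that insists on "the same thread that called poll" is hard to use from such a runtime.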
Re: [VOTE] KIP-944 Support async runtimes in consumer, votes needed!
Hello Xiangyuan LI, I am not familiar with coroutines, nor with Kotlin. You will have to work with the documentation: https://kotlinlang.org/docs/coroutines-overview.html However, I am familiar with Zio and Cats-effects (both Scala libraries). In both Zio and Cats-effects one creates effects (aka workflows) which are descriptions of a computation. For example, when executing the Scala code `val effect = ZIO.attempt(println("Hello world!"))` one creates only a description; it does not print anything yet. The language to describe these effects is rich enough to describe entire applications including things like concurrency. In fact, the language is so rich, that it is the most convenient way that I know to safely write highly concurrent and async applications. For many developer teams the performance penalty (which is real but not big) is worth it. To execute a Zio or Cats effect one gives it to the runtime. The runtime then schedules the work on one of the threads in its thread-pool. Zio, nor Cats-effects supports running an effect on the thread that manages the thread-pool. I hope this clear enough. Kind regards, Erik. Op 24-07-2023 om 05:21 schreef Xiangyuan LI: Hi Erik: I read KIP-944 and email list roughly, it seems most Java developer not familiar with the conception of "coroutine" so cannot imagine why code of one function without Thread.start() may run in separate threads and even developer couldn't control it. Maybe you need a more elaborate description to demonstrate how coroutine code run. Erik van Oosten 于2023年7月23日周日 17:47写道: Hi David, > Could you elaborate a bit more on why the callbacks must be ran in another thread vs in the invoker thread? I have been thinking on how to explain this for 2 months now and it is not easy. It has something to do with that you cannot control what a thread is doing if you have to also run on that thread. But I just realized that /for me/ it really comes down to this: We want to use Zio in the callback. 
Zio does not support it. There are more reasons as can be read in KAFKA-7143. But I do not know anything about Kotlin so I cannot elaborate on that. Kind regards, Erik. Op 22-07-2023 om 21:39 schreef David Jacot: Hi Erik, Thanks for the KIP. I would like to better understand the motivation of this KIP. I am not familiar with async runtimes so please excuse me if I ask stupid questions. Could you elaborate a bit more on why the callbacks must be ran in another thread vs in the invoker thread? This is not clear to me. In the example that you use with the ConsumerRebalanceListener, I would have thought that calling commitSync (without changing thread) would have achieved the same. The invoker has to wait anyway on the offset commit completion so using another thread does not bring any benefit here. I suppose that I am missing something here… Regarding Chris’ proposal, this feels like a hack to me. The issue with it is that we cannot guarantee it in the long term if it is not part of *the* Consumer interface. I second what Chris said. We are all trying to understand the motivation in order to find a good solution for Kafka. I apologize if this creates frustration. This is definitely not our goal. Best, David PS: I just saw that you opened a new KIP based on Chris’ idea. This is not necessary. You can just update the current KIP based on the discussion. Le sam. 22 juil. 2023 à 18:34, Erik van Oosten a écrit : Colin, Matthias, Chris, I have expanded the use case description in KIP-944. I hope it is more clear what we're trying to achieve. https://cwiki.apache.org/confluence/x/chw0Dw Kind regards, Erik. Op 22-07-2023 om 17:23 schreef Erik van Oosten: Hello Chris, Thanks for elaborating Matthias' words. Apparently the use case description is too terse. Indeed, that is not FUD and that is something I can work with. 
It's also worth mentioning that what's proposed in the KIP is only blocked by the private access modifier on the KafkaConsumer::acquire and KafkaConsumer::release methods. If we upgraded the visibility of these methods from private to protected, it would be possible for subclasses to implement the proposal in KIP-944, without any KIPs or other changes to the official Java clients library. That is absolutely brilliant! Since I am pretty sure I am using the consumer correctly, I could replace acquire and release with an empty method body and be done. /Is making acquire and release protected something that other people can live with?/ If yes, I will create a new PR with just that change. Kind regards, Erik. Op 22-07-2023 om 16:39 schreef Chris Egerton: Hi Erik, I don't think Matthias is bringing FUD to the discussion. Many of the people who maintain Kafka are familiar with Kafka client internals and the Java programming language, but not necessarily other JVM languages or asynchronous runtimes. I think it's reasonable to ask fo
Re: [VOTE] KIP-944 Support async runtimes in consumer, votes needed!
Hi David, > Could you elaborate a bit more on why the callbacks must be ran in another thread vs in the invoker thread? I have been thinking on how to explain this for 2 months now and it is not easy. It has something to do with that you cannot control what a thread is doing if you have to also run on that thread. But I just realized that /for me/ it really comes down to this: We want to use Zio in the callback. Zio does not support it. There are more reasons as can be read in KAFKA-7143. But I do not know anything about Kotlin so I cannot elaborate on that. Kind regards, Erik. Op 22-07-2023 om 21:39 schreef David Jacot: Hi Erik, Thanks for the KIP. I would like to better understand the motivation of this KIP. I am not familiar with async runtimes so please excuse me if I ask stupid questions. Could you elaborate a bit more on why the callbacks must be ran in another thread vs in the invoker thread? This is not clear to me. In the example that you use with the ConsumerRebalanceListener, I would have thought that calling commitSync (without changing thread) would have achieved the same. The invoker has to wait anyway on the offset commit completion so using another thread does not bring any benefit here. I suppose that I am missing something here… Regarding Chris’ proposal, this feels like a hack to me. The issue with it is that we cannot guarantee it in the long term if it is not part of *the* Consumer interface. I second what Chris said. We are all trying to understand the motivation in order to find a good solution for Kafka. I apologize if this creates frustration. This is definitely not our goal. Best, David PS: I just saw that you opened a new KIP based on Chris’ idea. This is not necessary. You can just update the current KIP based on the discussion. Le sam. 22 juil. 2023 à 18:34, Erik van Oosten a écrit : Colin, Matthias, Chris, I have expanded the use case description in KIP-944. I hope it is more clear what we're trying to achieve. 
https://cwiki.apache.org/confluence/x/chw0Dw Kind regards, Erik. Op 22-07-2023 om 17:23 schreef Erik van Oosten: Hello Chris, Thanks for elaborating Matthias' words. Apparently the use case description is too terse. Indeed, that is not FUD and that is something I can work with. It's also worth mentioning that what's proposed in the KIP is only blocked by the private access modifier on the KafkaConsumer::acquire and KafkaConsumer::release methods. If we upgraded the visibility of these methods from private to protected, it would be possible for subclasses to implement the proposal in KIP-944, without any KIPs or other changes to the official Java clients library. That is absolutely brilliant! Since I am pretty sure I am using the consumer correctly, I could replace acquire and release with an empty method body and be done. /Is making acquire and release protected something that other people can live with?/ If yes, I will create a new PR with just that change. Kind regards, Erik. Op 22-07-2023 om 16:39 schreef Chris Egerton: Hi Erik, I don't think Matthias is bringing FUD to the discussion. Many of the people who maintain Kafka are familiar with Kafka client internals and the Java programming language, but not necessarily other JVM languages or asynchronous runtimes. I think it's reasonable to ask for a code snippet or two that demonstrates what you'd like to do with the consumer today that you can't because of restrictions around concurrent access, and this is not already addressed in the KIP. Linking to a docs page on Kotlin coroutines is helpful but still requires reviewers to gather a lot of context on their own that could more easily be provided in the KIP, and although the description of KAFKA-7143 is more detailed, I find it a little hard to follow as someone who isn't already familiar with the environment the user is working in. 
It's also worth mentioning that what's proposed in the KIP is only blocked by the private access modifier on the KafkaConsumer::acquire and KafkaConsumer::release methods. If we upgraded the visibility of these methods from private to protected, it would be possible for subclasses to implement the proposal in KIP-944, without any KIPs or other changes to the official Java clients library. Best, Chris On Sat, Jul 22, 2023 at 4:24 AM Erik van Oosten wrote: Hi Matthias, I am getting a bit frustrated here. All the concerns and questions I have seen so far are addressed in KIP-944. Please let me know if they are not clear enough, but please do not come with FUD. Kind regards, Erik. Op 21-07-2023 om 21:13 schreef Matthias J. Sax: I am not a clients (or threading) expert, but I tend to agree to Colin's concerns. In particular, it would be nice to see an example how you intent to use the API (I am not familiar with Kotlin or it's co-routins), to better understand what this changes help to solve to begin with. Opening up the consumer sounds potentially danger
[DISCUSS] KIP-957 Support async runtimes in consumer
Hello developers of the Java consumer, This is a simpler alternative to KIP-944, as proposed by Chris Egerton. In this proposal we make the methods acquire and release of the KafkaConsumer class protected. This allows anyone to implement these methods as appropriate for their environment. The wiki page for KIP-957 contains more details: https://cwiki.apache.org/confluence/x/lY6zDw This is a call for discussion. If possible I would like to include this change in Kafka 3.6. Any questions, comments, ideas and other additions are welcome! Kind regards, Erik. -- Erik van Oosten e.vanoos...@grons.nl https://day-to-day-stuff.blogspot.com
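What the proposal would allow can be sketched without the Kafka client. In the sketch below `SketchConsumer` is a hypothetical class whose guard methods are already protected, and `UnguardedConsumer` is a hypothetical subclass for a runtime that serializes access by other means; the `poll(Runnable)` and `commit()` methods are simplified assumptions, not the real consumer API.

```java
import java.util.ConcurrentModificationException;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;

// Hypothetical consumer whose thread guard can be overridden by subclasses.
class SketchConsumer {
    private static final long NO_OWNER = -1L;
    private final AtomicLong owner = new AtomicLong(NO_OWNER);
    private final AtomicInteger depth = new AtomicInteger();
    private final AtomicInteger commits = new AtomicInteger();

    protected void acquire() {
        long id = Thread.currentThread().getId();
        if (id != owner.get() && !owner.compareAndSet(NO_OWNER, id)) {
            throw new ConcurrentModificationException("not safe for multi-threaded access");
        }
        depth.incrementAndGet();
    }

    protected void release() {
        if (depth.decrementAndGet() == 0) {
            owner.set(NO_OWNER);
        }
    }

    // Runs the callback while the guard is held, like a rebalance callback inside poll.
    public void poll(Runnable callback) {
        acquire();
        try {
            callback.run();
        } finally {
            release();
        }
    }

    public void commit() {
        acquire();
        try {
            commits.incrementAndGet();
        } finally {
            release();
        }
    }

    public int commitCount() {
        return commits.get();
    }
}

// For environments that guarantee sequential access themselves: the guard becomes a no-op.
class UnguardedConsumer extends SketchConsumer {
    @Override protected void acquire() { }
    @Override protected void release() { }
}

class Kip957Sketch {
    // Calls commit() from a second thread while poll's callback is running;
    // returns true if the commit was accepted.
    static boolean commitFromOtherThreadInsideCallback(SketchConsumer consumer) {
        boolean[] accepted = {false};
        consumer.poll(() -> {
            Thread worker = new Thread(() -> {
                try {
                    consumer.commit();
                    accepted[0] = true;
                } catch (ConcurrentModificationException e) {
                    accepted[0] = false;
                }
            });
            worker.start();
            try {
                worker.join();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        return accepted[0];
    }

    public static void main(String[] args) {
        System.out.println("guarded:   " + commitFromOtherThreadInsideCallback(new SketchConsumer()));
        System.out.println("unguarded: " + commitFromOtherThreadInsideCallback(new UnguardedConsumer()));
    }
}
```

The trade-off is visible in the subclass: with the guard removed, correctness rests entirely on the caller never using the consumer from two threads at the same time.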
Re: [VOTE] KIP-944 Support async runtimes in consumer, votes needed!
Colin, Matthias, Chris, I have expanded the use case description in KIP-944. I hope it is more clear what we're trying to achieve. https://cwiki.apache.org/confluence/x/chw0Dw Kind regards, Erik. Op 22-07-2023 om 17:23 schreef Erik van Oosten: Hello Chris, Thanks for elaborating Matthias' words. Apparently the use case description is too terse. Indeed, that is not FUD and that is something I can work with. > It's also worth mentioning that what's proposed in the KIP is only blocked by the private access modifier on the KafkaConsumer::acquire and KafkaConsumer::release methods. If we upgraded the visibility of these methods from private to protected, it would be possible for subclasses to implement the proposal in KIP-944, without any KIPs or other changes to the official Java clients library. That is absolutely brilliant! Since I am pretty sure I am using the consumer correctly, I could replace acquire and release with an empty method body and be done. /Is making acquire and release protected something that other people can live with?/ If yes, I will create a new PR with just that change. Kind regards, Erik. Op 22-07-2023 om 16:39 schreef Chris Egerton: Hi Erik, I don't think Matthias is bringing FUD to the discussion. Many of the people who maintain Kafka are familiar with Kafka client internals and the Java programming language, but not necessarily other JVM languages or asynchronous runtimes. I think it's reasonable to ask for a code snippet or two that demonstrates what you'd like to do with the consumer today that you can't because of restrictions around concurrent access, and this is not already addressed in the KIP. Linking to a docs page on Kotlin coroutines is helpful but still requires reviewers to gather a lot of context on their own that could more easily be provided in the KIP, and although the description of KAFKA-7143 is more detailed, I find it a little hard to follow as someone who isn't already familiar with the environment the user is working in. 
It's also worth mentioning that what's proposed in the KIP is only blocked by the private access modifier on the KafkaConsumer::acquire and KafkaConsumer::release methods. If we upgraded the visibility of these methods from private to protected, it would be possible for subclasses to implement the proposal in KIP-944, without any KIPs or other changes to the official Java clients library. Best, Chris On Sat, Jul 22, 2023 at 4:24 AM Erik van Oosten wrote: Hi Matthias, I am getting a bit frustrated here. All the concerns and questions I have seen so far are addressed in KIP-944. Please let me know if they are not clear enough, but please do not come with FUD. Kind regards, Erik. Op 21-07-2023 om 21:13 schreef Matthias J. Sax: I am not a clients (or threading) expert, but I tend to agree with Colin's concerns. In particular, it would be nice to see an example of how you intend to use the API (I am not familiar with Kotlin or its coroutines), to better understand what this change helps to solve to begin with. Opening up the consumer sounds potentially dangerous and we should weigh opportunity and risk before making a decision. So far, I see risks but do not understand the opportunity you are after. -Matthias On 7/14/23 11:43 AM, Kirk True wrote: Hi Erik, Thanks for the KIP! I empathize with your frustration over the radio silence. It gets like that sometimes, and I apologize for my lack of feedback. I’d personally like to see this lively exchange move over to the DISCUSS thread you’d created before. Thanks, Kirk On Jul 14, 2023, at 1:33 AM, Erik van Oosten wrote: Hi Colin, The way I understood Philip's message is that KIP-944 also plays nice with KIP-945. But I might be mistaken. Regardless, KIP-945 does /not/ resolve the underlying problem (the need for nested consumer invocations) because it has the explicit goal of not changing the user facing API. > ... 
KIP-945 but haven't posted a DISCUSS thread yet There is a thread called 'KafkaConsumer refactor proposal', but indeed no official discussion yet. > I really don't want to be debugging complex interactions between Java thread-local variables and green threads. In that email thread, I proposed an API change in which callbacks are no longer needed. The proposal completely removes the need for such complex interactions. In addition, it gives clients the ability to process at full speed even while a cooperative rebalance is ongoing. Regards, Erik. Op 14-07-2023 om 00:36 schreef Colin McCabe: Hi Philip & Erik, Hmm... if we agree that KIP-945 addresses this use case, I think it would be better to just focus on that KIP. Fundamentally it's a better and cleaner model than a complex scheme involving thread-local variables. I really don't want to be debugging complex interactions between Java thread-local variables and green threads. It also generally helps to have some use-cases in mind when
Re: [VOTE] KIP-944 Support async runtimes in consumer, votes needed!
Hi all, I have created https://github.com/apache/kafka/pull/14071 to implement Chris' idea. Kind regards, Erik. Op 22-07-2023 om 16:39 schreef Chris Egerton: Hi Erik, I don't think Matthias is bringing FUD to the discussion. Many of the people who maintain Kafka are familiar with Kafka client internals and the Java programming language, but not necessarily other JVM languages or asynchronous runtimes. I think it's reasonable to ask for a code snippet or two that demonstrates what you'd like to do with the consumer today that you can't because of restrictions around concurrent access, and this is not already addressed in the KIP. Linking to a docs page on Kotlin coroutines is helpful but still requires reviewers to gather a lot of context on their own that could more easily be provided in the KIP, and although the description of KAFKA-7143 is more detailed, I find it a little hard to follow as someone who isn't already familiar with the environment the user is working in. It's also worth mentioning that what's proposed in the KIP is only blocked by the private access modifier on the KafkaConsumer::acquire and KafkaConsumer::release methods. If we upgraded the visibility of these methods from private to protected, it would be possible for subclasses to implement the proposal in KIP-944, without any KIPs or other changes to the official Java clients library. Best, Chris On Sat, Jul 22, 2023 at 4:24 AM Erik van Oosten wrote: Hi Matthias, I am getting a bit frustrated here. All the concerns and questions I have seen so far are addressed in KIP-944. Please let me know if they are not clear enough, but please do not come with FUD. Kind regards, Erik. Op 21-07-2023 om 21:13 schreef Matthias J. Sax: I am not a clients (or threading) expert, but I tend to agree to Colin's concerns. 
In particular, it would be nice to see an example of how you intend to use the API (I am not familiar with Kotlin or its coroutines), to better understand what this change helps to solve to begin with. Opening up the consumer sounds potentially dangerous and we should weigh opportunity and risk before making a decision. So far, I see risks but do not understand the opportunity you are after. -Matthias On 7/14/23 11:43 AM, Kirk True wrote: Hi Erik, Thanks for the KIP! I empathize with your frustration over the radio silence. It gets like that sometimes, and I apologize for my lack of feedback. I’d personally like to see this lively exchange move over to the DISCUSS thread you’d created before. Thanks, Kirk On Jul 14, 2023, at 1:33 AM, Erik van Oosten wrote: Hi Colin, The way I understood Philip's message is that KIP-944 also plays nice with KIP-945. But I might be mistaken. Regardless, KIP-945 does /not/ resolve the underlying problem (the need for nested consumer invocations) because it has the explicit goal of not changing the user facing API. > ... KIP-945 but haven't posted a DISCUSS thread yet There is a thread called 'KafkaConsumer refactor proposal', but indeed no official discussion yet. > I really don't want to be debugging complex interactions between Java thread-local variables and green threads. In that email thread, I proposed an API change in which callbacks are no longer needed. The proposal completely removes the need for such complex interactions. In addition, it gives clients the ability to process at full speed even while a cooperative rebalance is ongoing. Regards, Erik. Op 14-07-2023 om 00:36 schreef Colin McCabe: Hi Philip & Erik, Hmm... if we agree that KIP-945 addresses this use case, I think it would be better to just focus on that KIP. Fundamentally it's a better and cleaner model than a complex scheme involving thread-local variables. I really don't want to be debugging complex interactions between Java thread-local variables and green threads. 
It also generally helps to have some use-cases in mind when writing these things. If we get feedback about what would be useful for async runtimes, that would probably help improve and focus KIP-945. By the way, I can see you have a draft on the wiki for KIP-945 but haven't posted a DISCUSS thread yet, so I assume it's not ready for review yet ;) best, Colin On Tue, Jul 11, 2023, at 12:24, Philip Nee wrote: Hey Erik - Another thing I want to add to my comment is. We are in-process of re-writing the KafkaConsumer, and I think your proposal would work in the new consumer because we are going to separate the user thread and the background thread. Here is the 1-pager, and we are in process of converting this in to KIP-945. Thanks, P On Tue, Jul 11, 2023 at 10:33 AM Philip Nee wrote: Hey Erik, Sorry for holding up this email for a few days since Colin's response includes some of my concerns. I'm in favor of this KIP, and I think your approach seems safe. Of course, I probably missed something therefore I think this KIP needs to cover different use cases to demonstrate it doesn't cause any unsafe ac
Re: [VOTE] KIP-944 Support async runtimes in consumer, votes needed!
Hello Chris, Thanks for elaborating Matthias' words. Apparently the use case description is too terse. Indeed, that is not FUD and that is something I can work with. > It's also worth mentioning that what's proposed in the KIP is only blocked by the private access modifier on the KafkaConsumer::acquire and KafkaConsumer::release methods. If we upgraded the visibility of these methods from private to protected, it would be possible for subclasses to implement the proposal in KIP-944, without any KIPs or other changes to the official Java clients library. That is absolutely brilliant! Since I am pretty sure I am using the consumer correctly, I could replace acquire and release with an empty method body and be done. /Is making acquire and release protected something that other people can live with?/ If yes, I will create a new PR with just that change. Kind regards, Erik. Op 22-07-2023 om 16:39 schreef Chris Egerton: Hi Erik, I don't think Matthias is bringing FUD to the discussion. Many of the people who maintain Kafka are familiar with Kafka client internals and the Java programming language, but not necessarily other JVM languages or asynchronous runtimes. I think it's reasonable to ask for a code snippet or two that demonstrates what you'd like to do with the consumer today that you can't because of restrictions around concurrent access, and this is not already addressed in the KIP. Linking to a docs page on Kotlin coroutines is helpful but still requires reviewers to gather a lot of context on their own that could more easily be provided in the KIP, and although the description of KAFKA-7143 is more detailed, I find it a little hard to follow as someone who isn't already familiar with the environment the user is working in. It's also worth mentioning that what's proposed in the KIP is only blocked by the private access modifier on the KafkaConsumer::acquire and KafkaConsumer::release methods. 
If we upgraded the visibility of these methods from private to protected, it would be possible for subclasses to implement the proposal in KIP-944, without any KIPs or other changes to the official Java clients library. Best, Chris On Sat, Jul 22, 2023 at 4:24 AM Erik van Oosten wrote: Hi Matthias, I am getting a bit frustrated here. All the concerns and questions I have seen so far are addressed in KIP-944. Please let me know if they are not clear enough, but please do not come with FUD. Kind regards, Erik. Op 21-07-2023 om 21:13 schreef Matthias J. Sax: I am not a clients (or threading) expert, but I tend to agree with Colin's concerns. In particular, it would be nice to see an example of how you intend to use the API (I am not familiar with Kotlin or its coroutines), to better understand what this change helps to solve to begin with. Opening up the consumer sounds potentially dangerous and we should weigh opportunity and risk before making a decision. So far, I see risks but do not understand the opportunity you are after. -Matthias On 7/14/23 11:43 AM, Kirk True wrote: Hi Erik, Thanks for the KIP! I empathize with your frustration over the radio silence. It gets like that sometimes, and I apologize for my lack of feedback. I’d personally like to see this lively exchange move over to the DISCUSS thread you’d created before. Thanks, Kirk On Jul 14, 2023, at 1:33 AM, Erik van Oosten wrote: Hi Colin, The way I understood Philip's message is that KIP-944 also plays nice with KIP-945. But I might be mistaken. Regardless, KIP-945 does /not/ resolve the underlying problem (the need for nested consumer invocations) because it has the explicit goal of not changing the user facing API. > ... KIP-945 but haven't posted a DISCUSS thread yet There is a thread called 'KafkaConsumer refactor proposal', but indeed no official discussion yet. > I really don't want to be debugging complex interactions between Java thread-local variables and green threads. 
In that email thread, I proposed an API change in which callbacks are no longer needed. The proposal completely removes the need for such complex interactions. In addition, it gives clients the ability to process at full speed even while a cooperative rebalance is ongoing. Regards, Erik. Op 14-07-2023 om 00:36 schreef Colin McCabe: Hi Philip & Erik, Hmm... if we agree that KIP-945 addresses this use case, I think it would be better to just focus on that KIP. Fundamentally it's a better and cleaner model than a complex scheme involving thread-local variables. I really don't want to be debugging complex interactions between Java thread-local variables and green threads. It also generally helps to have some use-cases in mind when writing these things. If we get feedback about what would be useful for async runtimes, that would probably help improve and focus KIP-945. By the way, I can see you have a draft on the wiki for KIP-945 but haven't posted a DISCUSS thread yet, so I assume it's not ready for
Re: [VOTE] KIP-944 Support async runtimes in consumer, votes needed!
Hi Matthias, I am getting a bit frustrated here. All the concerns and questions I have seen so far are addressed in KIP-944. Please let me know if they are not clear enough, but please do not come with FUD. Kind regards, Erik. Op 21-07-2023 om 21:13 schreef Matthias J. Sax: I am not a clients (or threading) expert, but I tend to agree with Colin's concerns. In particular, it would be nice to see an example of how you intend to use the API (I am not familiar with Kotlin or its coroutines), to better understand what this change helps to solve to begin with. Opening up the consumer sounds potentially dangerous and we should weigh opportunity and risk before making a decision. So far, I see risks but do not understand the opportunity you are after. -Matthias On 7/14/23 11:43 AM, Kirk True wrote: Hi Erik, Thanks for the KIP! I empathize with your frustration over the radio silence. It gets like that sometimes, and I apologize for my lack of feedback. I’d personally like to see this lively exchange move over to the DISCUSS thread you’d created before. Thanks, Kirk On Jul 14, 2023, at 1:33 AM, Erik van Oosten wrote: Hi Colin, The way I understood Philip's message is that KIP-944 also plays nice with KIP-945. But I might be mistaken. Regardless, KIP-945 does /not/ resolve the underlying problem (the need for nested consumer invocations) because it has the explicit goal of not changing the user facing API. > ... KIP-945 but haven't posted a DISCUSS thread yet There is a thread called 'KafkaConsumer refactor proposal', but indeed no official discussion yet. > I really don't want to be debugging complex interactions between Java thread-local variables and green threads. In that email thread, I proposed an API change in which callbacks are no longer needed. The proposal completely removes the need for such complex interactions. In addition, it gives clients the ability to process at full speed even while a cooperative rebalance is ongoing. Regards, Erik. 
Op 14-07-2023 om 00:36 schreef Colin McCabe: HI Philip & Erik, Hmm... if we agree that KIP-945 addresses this use case, I think it would be better to just focus on that KIP. Fundamentally it's a better and cleaner model than a complex scheme involving thread-local variables. I really don't want to be debugging complex interactions between Java thread-local variables and green threads. It also generally helps to have some use-cases in mind when writing these things. If we get feedback about what would be useful for async runtimes, that would probably help improve and focus KIP-945. By the way, I can see you have a draft on the wiki for KIP-945 but haven't posted a DISCUSS thread yet, so I assume it's not ready for review yet ;) best, Colin On Tue, Jul 11, 2023, at 12:24, Philip Nee wrote: Hey Erik - Another thing I want to add to my comment is. We are in-process of re-writing the KafkaConsumer, and I think your proposal would work in the new consumer because we are going to separate the user thread and the background thread. Here is the 1-pager, and we are in process of converting this in to KIP-945. Thanks, P On Tue, Jul 11, 2023 at 10:33 AM Philip Nee wrote: Hey Erik, Sorry for holding up this email for a few days since Colin's response includes some of my concerns. I'm in favor of this KIP, and I think your approach seems safe. Of course, I probably missed something therefore I think this KIP needs to cover different use cases to demonstrate it doesn't cause any unsafe access. I think this can be demonstrated via diagrams and some code in the KIP. Thanks, P On Sat, Jul 8, 2023 at 12:28 PM Erik van Oosten wrote: Hello Colin, >> In KIP-944, the callback thread can only delegate to another thread after reading from and writing to a threadlocal variable, providing the barriers right there. > I don't see any documentation that accessing thread local variables provides a total store or load barrier. Do you have such documentation? 
It seems like if this were the case, we could eliminate volatile variables from most of the code base. Now I was imprecise. The thread-locals are only somewhat involved. In the KIP proposal the callback thread reads an access key from a thread-local variable. It then needs to pass that access key to another thread, which then can set it on its own thread-local variable. The act of passing a value from one thread to another implies that a memory barrier needs to be passed. However, this is all not so relevant since there is no need to pass the access key back when the other thread is done. But now I think about it a bit more, the locking mechanism runs in a synchronized block. If I remember correctly this should be enough to pass read and write barriers. >> In the current implementation the consumer is also invoked from random threads. If it works now, it should continue to work. > I'
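To make the point about the synchronized block concrete: the Java memory model guarantees that releasing a monitor happens-before every later acquisition of the same monitor, so a plain (non-volatile) field written under the lock is visible to the next thread that takes the lock. The sketch below is only an illustration of that rule. The names `KeySlot`, `put`, `take` and `handOff` are made up for this example and are not the locking code from KIP-944 or its pull request.

```java
/** Hypothetical sketch; this is not the locking code from the KIP-944 pull request. */
final class AccessKeyHandoffSketch {

    /** One-slot mailbox guarded by its own monitor. The field is deliberately not volatile. */
    static final class KeySlot {
        private Long accessKey;  // plain field, only touched while holding the monitor

        synchronized void put(long key) {
            accessKey = key;
            notifyAll();  // wake up a thread waiting in take()
        }

        synchronized long take() throws InterruptedException {
            while (accessKey == null) {
                wait();  // releases the monitor while waiting
            }
            long key = accessKey;
            accessKey = null;
            return key;
        }
    }

    /** Starts a "callback" thread that hands over a key, then receives it on the calling thread. */
    static long handOff(long key) {
        KeySlot slot = new KeySlot();
        Thread callbackThread = new Thread(() -> slot.put(key), "callback-thread");
        callbackThread.start();
        try {
            long received = slot.take();
            callbackThread.join();
            return received;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while waiting for the access key", e);
        }
    }

    public static void main(String[] args) {
        System.out.println("received access key " + handOff(42L));
    }
}
```

The visibility guarantee here comes from both threads synchronizing on the same `KeySlot` instance; reading or writing a thread-local variable on its own would not provide it.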
Re: [DISCUSS] KIP-944 Support async runtimes in consumer
Hi Colin, Philip, Kirk,

As far as I am aware all concerns about KIP-944 have been addressed, including those about dirty reads between threads and green threads because of missing memory barriers. If you agree, I would like to open the vote again. If not, please let me know.

I'll open another KIP with a proposal on how to improve the consumer API so that we don't need any thread trickery anymore. I would rather not wait for that one because there will be a lot of work before that can even be implemented.

Once KIP-944 has been accepted, I'll work on adding the unit tests that are described in the KIP.

Kind regards, Erik.

Op 30-06-2023 om 07:56 schreef Erik van Oosten:

[This is a resend with the correct KIP number.]

Hello developers of the Java based consumer,

I submitted https://github.com/apache/kafka/pull/13914 to fix a long standing problem that the Kafka consumer on the JVM is not usable from asynchronous runtimes such as Kotlin co-routines and ZIO. However, since it extends the public API I was requested to create a KIP. So here it is:

KIP-944 Support async runtimes in consumer https://cwiki.apache.org/confluence/x/chw0Dw

Any questions, comments, ideas and other additions are welcome! The KIP should be complete except for the testing section. As far as I am aware there are no tests for the current behavior. Any help in this area would be appreciated.

Kind regards, Erik.

-- Erik van Oosten e.vanoos...@grons.nl https://day-to-day-stuff.blogspot.com
Re: [VOTE] KIP-944 Support async runtimes in consumer, votes needed!
Hi Colin, The way I understood Philip's message is that KIP-944 also plays nice with KIP-945. But I might be mistaken. Regardless, KIP-945 does /not/ resolve the underlying problem (the need for nested consumer invocations) because it has the explicit goal of not changing the user facing API. > ... KIP-945 but haven't posted a DISCUSS thread yet There is a thread called 'KafkaConsumer refactor proposal', but indeed no official discussion yet. > I really don't want to be debugging complex interactions between Java thread-local variables and green threads. In that email thread, I proposed an API change in which callbacks are no longer needed. The proposal completely removes the need for such complex interactions. In addition, it gives clients the ability to process at full speed even while a cooperative rebalance is ongoing. Regards, Erik. Op 14-07-2023 om 00:36 schreef Colin McCabe: Hi Philip & Erik, Hmm... if we agree that KIP-945 addresses this use case, I think it would be better to just focus on that KIP. Fundamentally it's a better and cleaner model than a complex scheme involving thread-local variables. I really don't want to be debugging complex interactions between Java thread-local variables and green threads. It also generally helps to have some use-cases in mind when writing these things. If we get feedback about what would be useful for async runtimes, that would probably help improve and focus KIP-945. By the way, I can see you have a draft on the wiki for KIP-945 but haven't posted a DISCUSS thread yet, so I assume it's not ready for review yet ;) best, Colin On Tue, Jul 11, 2023, at 12:24, Philip Nee wrote: Hey Erik - Another thing I want to add to my comment is. We are in-process of re-writing the KafkaConsumer, and I think your proposal would work in the new consumer because we are going to separate the user thread and the background thread. Here is the 1-pager, and we are in process of converting this in to KIP-945. 
Thanks, P On Tue, Jul 11, 2023 at 10:33 AM Philip Nee wrote: Hey Erik, Sorry for holding up this email for a few days since Colin's response includes some of my concerns. I'm in favor of this KIP, and I think your approach seems safe. Of course, I probably missed something therefore I think this KIP needs to cover different use cases to demonstrate it doesn't cause any unsafe access. I think this can be demonstrated via diagrams and some code in the KIP. Thanks, P On Sat, Jul 8, 2023 at 12:28 PM Erik van Oosten wrote: Hello Colin, >> In KIP-944, the callback thread can only delegate to another thread after reading from and writing to a threadlocal variable, providing the barriers right there. > I don't see any documentation that accessing thread local variables provides a total store or load barrier. Do you have such documentation? It seems like if this were the case, we could eliminate volatile variables from most of the code base. Now I was imprecise. The thread-locals are only somewhat involved. In the KIP proposal the callback thread reads an access key from a thread-local variable. It then needs to pass that access key to another thread, which then can set it on its own thread-local variable. The act of passing a value from one thread to another implies that a memory barrier needs to be passed. However, this is all not so relevant since there is no need to pass the access key back when the other thread is done. But now I think about it a bit more, the locking mechanism runs in a synchronized block. If I remember correctly this should be enough to pass read and write barriers. >> In the current implementation the consumer is also invoked from random threads. If it works now, it should continue to work. > I'm not sure what you're referring to. Can you expand on this? Any invocation of the consumer (e.g. method poll) is not from a thread managed by the consumer. This is what I was assuming you meant with the term 'random thread'. 
> Hmm, not sure what you mean by "cooperate with blocking code." If you have 10 green threads you're multiplexing on to one CPU thread, and that CPU thread gets blocked because of what one green thread is doing, the other 9 green threads are blocked too, right? I guess it's "just" a performance problem, but it still seems like it could be a serious one. There are several ways to deal with this. All async runtimes I know (Akka, Zio, Cats-effects) support this by letting you mark a task as blocking. The runtime will then either schedule it to another thread-pool, or it will grow the thread-pool to accommodate. In any case 'the other 9 green threads' will simply be scheduled to another real thread. In addition, some of these runtimes detect long running tasks and will reschedule waiting tasks to another thread. This is all a bit off topic though. > I don't see why this has to be "inherently multi-threaded." Why c
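For readers who have not used such runtimes, the idea of "marking a task as blocking" can be approximated in plain Java by sending blocking calls to a dedicated thread pool, so the pool that runs the short tasks stays free. This is only an analogy under that assumption. Akka, ZIO and Cats Effect each have their own API for this, and the names `namedPool`, `offload` and `demo` below are invented for the sketch.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicInteger;

/** Plain-Java analogy; Akka, ZIO and Cats Effect each have their own API for this. */
final class BlockingOffloadSketch {

    /** Fixed pool of daemon threads named "<prefix>-<n>" (helper for this sketch). */
    static ExecutorService namedPool(String prefix, int threads) {
        AtomicInteger counter = new AtomicInteger();
        return Executors.newFixedThreadPool(threads, runnable -> {
            Thread thread = new Thread(runnable, prefix + "-" + counter.incrementAndGet());
            thread.setDaemon(true);
            return thread;
        });
    }

    /** Runs a blocking call on blockingPool; completes with the name of the thread that ran it. */
    static CompletableFuture<String> offload(ExecutorService blockingPool, long blockMillis) {
        return CompletableFuture.supplyAsync(() -> {
            try {
                Thread.sleep(blockMillis);  // stands in for a blocking call
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
            return Thread.currentThread().getName();
        }, blockingPool);
    }

    /** Returns {thread that ran the quick task, thread that ran the blocking task}. */
    static String[] demo() {
        ExecutorService computePool = namedPool("compute", 1);
        ExecutorService blockingPool = namedPool("blocking", 4);
        try {
            // The blocking work is started first but does not occupy the single compute thread,
            CompletableFuture<String> blocking = offload(blockingPool, 200);
            // so a quick task submitted afterwards can still run on the compute pool.
            CompletableFuture<String> quick =
                CompletableFuture.supplyAsync(() -> Thread.currentThread().getName(), computePool);
            return new String[] { quick.join(), blocking.join() };
        } finally {
            computePool.shutdown();
            blockingPool.shutdown();
        }
    }

    public static void main(String[] args) {
        String[] threads = demo();
        System.out.println("quick task ran on " + threads[0] + ", blocking task ran on " + threads[1]);
    }
}
```

With a single shared pool of one thread, the quick task would have had to wait for the sleep to finish. Splitting the pools is what keeps 'the other 9 green threads' running.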
Re: KafkaConsumer refactor proposal
Hi Philip,

I have been scanning through https://cwiki.apache.org/confluence/display/KAFKA/Consumer+threading+refactor+design and KIP-848 and from this I understand that the Kafka consumer API will not change.

Perhaps the refactoring and/or KIP-848 is a good opportunity to improve the API somewhat. In this email I explain why and also give a rough idea of what that could look like.

In the current API, the rebalance listener callback gives the user a chance to commit all work in progress before a partition is actually revoked and assigned to another consumer. While the callback is doing all this, the main user thread is not able to process new incoming data. So the rebalance listener affects latency and throughput for non-revoked partitions during a rebalance.

In addition, I feel that doing a lot of stuff /in/ a callback is always quite awkward. Better to use it only to trigger some processing elsewhere.

Therefore, I would like to propose a new API that does not have these problems and is easy to use (and I hope still easy to implement). In my ideal world, poll is the only method that you need. Let's call it poll2 (to do: come up with a less crappy name). Poll2 returns more than just the polled records: it will also contain newly assigned partitions, partitions that will be revoked during the next call to poll2, partitions that were lost, and perhaps it will even contain the offsets committed so far.

The most important idea here is that partitions are not revoked immediately, but in the next call to poll2.

With this API, a user can commit offsets at their own pace during a rebalance. Optionally, for the case that processing of data from the to-be-revoked partition is still ongoing, we allow the user to postpone the actual revocation in the next poll, so that polling can continue for other partitions.

Since we are no longer blocking the main user thread, partitions that are not revoked can be processed at full speed.

Removal of the rebalance listener also makes the API safer; there is no more need for the thread-id check (nor KIP-944) because concurrent invocations are simply no longer needed. (Of course, if backward compatibility is a goal, not all of these things can be done.)

Curious about your thoughts and kind regards, Erik.

-- Erik van Oosten e.vanoos...@grons.nl https://day-to-day-stuff.blogspot.com
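To make the poll2 idea above easier to discuss, here is a minimal sketch of what such a result type and the "revoke in the next call" rule could look like. Everything in it is an assumption made for illustration: `PollResult`, `ToyConsumer`, `poll2`, `assign` and `requestRevocation` do not exist in kafka-clients, partitions are plain ints, records are strings, and lost partitions and committed offsets are not modelled.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Set;
import java.util.TreeSet;

/** Hypothetical sketch of the "poll2" idea. None of these types exist in kafka-clients. */
final class Poll2Sketch {

    /** What one hypothetical poll2 call returns. Partitions are plain ints in this toy. */
    static final class PollResult {
        final List<String> records;            // fake records, one per owned partition
        final Set<Integer> newlyAssigned;      // partitions assigned by this call
        final Set<Integer> revokedOnNextPoll;  // partitions the next call will take away
        final Set<Integer> lost;               // not modelled in this toy, always empty

        PollResult(List<String> records, Set<Integer> newlyAssigned,
                   Set<Integer> revokedOnNextPoll, Set<Integer> lost) {
            this.records = records;
            this.newlyAssigned = newlyAssigned;
            this.revokedOnNextPoll = revokedOnNextPoll;
            this.lost = lost;
        }
    }

    /** Toy in-memory stand-in for a consumer; it only models partition ownership. */
    static final class ToyConsumer {
        private final Set<Integer> owned = new TreeSet<>();
        private final Set<Integer> toAssign = new TreeSet<>();
        private final Set<Integer> requested = new TreeSet<>();  // revocations not yet announced
        private final Set<Integer> announced = new TreeSet<>();  // announced by the previous poll2

        // These two methods simulate decisions made by the group coordinator.
        void assign(int partition) { toAssign.add(partition); }
        void requestRevocation(int partition) { requested.add(partition); }

        /** @param postpone announced partitions whose revocation the caller wants to delay */
        PollResult poll2(Set<Integer> postpone) {
            // 1. Revoke what the previous call announced, unless the caller postponed it.
            Set<Integer> revokeNow = new TreeSet<>(announced);
            revokeNow.removeAll(postpone);
            owned.removeAll(revokeNow);
            announced.removeAll(revokeNow);
            // 2. Announce newly requested revocations; they stay owned until the next call.
            announced.addAll(requested);
            requested.clear();
            // 3. Take ownership of new assignments.
            Set<Integer> newlyAssigned = Set.copyOf(toAssign);
            owned.addAll(toAssign);
            toAssign.clear();
            // 4. Fetch one fake record for every partition that is still owned.
            List<String> records = new ArrayList<>();
            for (int p : owned) records.add("record-from-partition-" + p);
            return new PollResult(records, newlyAssigned, Set.copyOf(announced), Set.of());
        }
    }

    public static void main(String[] args) {
        ToyConsumer consumer = new ToyConsumer();
        consumer.assign(0);
        consumer.assign(1);
        System.out.println(consumer.poll2(Set.of()).newlyAssigned);
        consumer.requestRevocation(1);
        PollResult announcedResult = consumer.poll2(Set.of());
        // Partition 1 is only announced here, so the user can still finish and commit its work.
        System.out.println(announcedResult.revokedOnNextPoll + " " + announcedResult.records.size());
        System.out.println(consumer.poll2(Set.of()).records);  // partition 1 is gone now
    }
}
```

The `postpone` argument is one possible shape for the optional "postpone the actual revocation" step. Whether a real API would keep fetching from a postponed partition, as this toy does, is an open design question that the email does not settle.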
Re: [VOTE] KIP-944 Support async runtimes in consumer, votes needed!
Hi Colin, Philip, I have added a section to KIP-944 to address your concerns around memory consistency over multiple threads. You can read it here: https://cwiki.apache.org/confluence/display/KAFKA/KIP-944%3A+Support+async+runtimes+in+consumer#KIP944:Supportasyncruntimesinconsumer-Threadsafety Kind regards, Erik. Op 12-07-2023 om 13:24 schreef Erik van Oosten: Thanks Philip, > I think this can be demonstrated via diagrams and some code in the KIP. There are some diagrams in KIP-944. How can they be improved? I will add some code to address the concerns around memory barriers. > We are in-process of re-writing the KafkaConsumer Nice! I will read the KIP. Hopefully we don't need complex logic in callbacks after the rewrite. Kind regards, Erik. Op 11-07-2023 om 19:33 schreef Philip Nee: Hey Erik, Sorry for holding up this email for a few days since Colin's response includes some of my concerns. I'm in favor of this KIP, and I think your approach seems safe. Of course, I probably missed something therefore I think this KIP needs to cover different use cases to demonstrate it doesn't cause any unsafe access. I think this can be demonstrated via diagrams and some code in the KIP. Thanks, P On Sat, Jul 8, 2023 at 12:28 PM Erik van Oosten wrote: Hello Colin, >> In KIP-944, the callback thread can only delegate to another thread after reading from and writing to a threadlocal variable, providing the barriers right there. > I don't see any documentation that accessing thread local variables provides a total store or load barrier. Do you have such documentation? It seems like if this were the case, we could eliminate volatile variables from most of the code base. Now I was imprecise. The thread-locals are only somewhat involved. In the KIP proposal the callback thread reads an access key from a thread-local variable. It then needs to pass that access key to another thread, which then can set it on its own thread-local variable. 
The act of passing a value from one thread to another implies that a memory barrier needs to be passed. However, this is all not so relevant since there is no need to pass the access key back when the other thread is done. But now I think about it a bit more, the locking mechanism runs in a synchronized block. If I remember correctly this should be enough to pass read and write barriers. >> In the current implementation the consumer is also invoked from random threads. If it works now, it should continue to work. > I'm not sure what you're referring to. Can you expand on this? Any invocation of the consumer (e.g. method poll) is not from a thread managed by the consumer. This is what I was assuming you meant with the term 'random thread'. > Hmm, not sure what you mean by "cooperate with blocking code." If you have 10 green threads you're multiplexing on to one CPU thread, and that CPU thread gets blocked because of what one green thread is doing, the other 9 green threads are blocked too, right? I guess it's "just" a performance problem, but it still seems like it could be a serious one. There are several ways to deal with this. All async runtimes I know (Akka, Zio, Cats-effects) support this by letting you mark a task as blocking. The runtime will then either schedule it to another thread-pool, or it will grow the thread-pool to accommodate. In any case 'the other 9 green threads' will simply be scheduled to another real thread. In addition, some of these runtimes detect long running tasks and will reschedule waiting tasks to another thread. This is all a bit off topic though. > I don't see why this has to be "inherently multi-threaded." Why can't we have the other threads report back what messages they've processed to the worker thread. Then it will be able to handle these callbacks without involving the other threads. Please consider the context which is that we are running inside the callback of the rebalance listener. 
The only way to execute something and also have a timeout on it is to run the something on another thread. Kind regards, Erik. Op 08-07-2023 om 19:17 schreef Colin McCabe: On Sat, Jul 8, 2023, at 02:41, Erik van Oosten wrote: Hi Colin, Thanks for your thoughts and taking the time to reply. Let me take away your concerns. None of your worries are an issue with the algorithm described in KIP-944. Here it goes: > It's not clear to me that it's safe to access the Kafka consumer or producer concurrently from different threads. Concurrent access is /not/ a design goal of KIP-944. In fact, it goes to great lengths to make sure that this cannot happen. *The only design goal is to allow callbacks to call the consumer from another thread.* To make sure there are no more misunderstandings about this, I have added this goal to the KIP. Hi Erik, Sorry, I spoke imprecisely. My concern is not concurrent ac
Re: [VOTE] KIP-944 Support async runtimes in consumer, votes needed!
Thanks Philip, > I think this can be demonstrated via diagrams and some code in the KIP. There are some diagrams in KIP-944. How can they be improved? I will add some code to address the concerns around memory barriers. > We are in-process of re-writing the KafkaConsumer Nice! I will read the KIP. Hopefully we don't need complex logic in callbacks after the rewrite. Kind regards, Erik. Op 11-07-2023 om 19:33 schreef Philip Nee: Hey Erik, Sorry for holding up this email for a few days since Colin's response includes some of my concerns. I'm in favor of this KIP, and I think your approach seems safe. Of course, I probably missed something therefore I think this KIP needs to cover different use cases to demonstrate it doesn't cause any unsafe access. I think this can be demonstrated via diagrams and some code in the KIP. Thanks, P On Sat, Jul 8, 2023 at 12:28 PM Erik van Oosten wrote: Hello Colin, >> In KIP-944, the callback thread can only delegate to another thread after reading from and writing to a threadlocal variable, providing the barriers right there. > I don't see any documentation that accessing thread local variables provides a total store or load barrier. Do you have such documentation? It seems like if this were the case, we could eliminate volatile variables from most of the code base. Now I was imprecise. The thread-locals are only somewhat involved. In the KIP proposal the callback thread reads an access key from a thread-local variable. It then needs to pass that access key to another thread, which then can set it on its own thread-local variable. The act of passing a value from one thread to another implies that a memory barrier needs to be passed. However, this is all not so relevant since there is no need to pass the access key back when the other thread is done. But now I think about it a bit more, the locking mechanism runs in a synchronized block. If I remember correctly this should be enough to pass read and write barriers. 
>> In the current implementation the consumer is also invoked from random threads. If it works now, it should continue to work. > I'm not sure what you're referring to. Can you expand on this? Any invocation of the consumer (e.g. method poll) is not from a thread managed by the consumer. This is what I was assuming you meant with the term 'random thread'. > Hmm, not sure what you mean by "cooperate with blocking code." If you have 10 green threads you're multiplexing on to one CPU thread, and that CPU thread gets blocked because of what one green thread is doing, the other 9 green threads are blocked too, right? I guess it's "just" a performance problem, but it still seems like it could be a serious one. There are several ways to deal with this. All async runtimes I know (Akka, Zio, Cats-effects) support this by letting you mark a task as blocking. The runtime will then either schedule it to another thread-pool, or it will grow the thread-pool to accommodate. In any case 'the other 9 green threads' will simply be scheduled to another real thread. In addition, some of these runtimes detect long running tasks and will reschedule waiting tasks to another thread. This is all a bit off topic though. > I don't see why this has to be "inherently multi-threaded." Why can't we have the other threads report back what messages they've processed to the worker thread. Then it will be able to handle these callbacks without involving the other threads. Please consider the context which is that we are running inside the callback of the rebalance listener. The only way to execute something and also have a timeout on it is to run the something on another thread. Kind regards, Erik. Op 08-07-2023 om 19:17 schreef Colin McCabe: On Sat, Jul 8, 2023, at 02:41, Erik van Oosten wrote: Hi Colin, Thanks for your thoughts and taking the time to reply. Let me take away your concerns. None of your worries are an issue with the algorithm described in KIP-944. 
Here it goes: > It's not clear to me that it's safe to access the Kafka consumer or producer concurrently from different threads. Concurrent access is /not/ a design goal of KIP-944. In fact, it goes to great lengths to make sure that this cannot happen. *The only design goal is to allow callbacks to call the consumer from another thread.* To make sure there are no more misunderstandings about this, I have added this goal to the KIP. Hi Erik, Sorry, I spoke imprecisely. My concern is not concurrent access, but multithreaded access in general. Basically cache line visibility issues. > This is true even if the accesses happen at different times, because modern CPUs require memory barriers to guarantee inter-thread visibility of loads and stores. In KIP-944, the callback thread can only delegate to another thread after reading from and writing to a threadlocal variable, providing the barriers
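The point raised in this message, that a synchronized block should be enough to pass the read and write barriers, can be illustrated with a small sketch. It relies on the Java memory model rule that releasing a monitor happens-before a later acquisition of the same monitor. The class `AccessKeyHandoff` and its methods are hypothetical names made up for this illustration; they are not part of KIP-944 or of the Kafka client.

```java
// Illustrative sketch only: AccessKeyHandoff is a hypothetical helper,
// not part of KIP-944 or the Kafka client.
final class AccessKeyHandoff {
    private final Object lock = new Object();
    // Deliberately not volatile: visibility comes from the monitor alone.
    private Long key;

    // Called by the thread that owns the key (e.g. the callback thread).
    void publish(long value) {
        synchronized (lock) {
            key = value;
            lock.notifyAll();
        }
    }

    // Called by the receiving thread; blocks until a key was published.
    long await() throws InterruptedException {
        synchronized (lock) {
            while (key == null) {
                lock.wait();
            }
            return key;
        }
    }

    // Hands the value 42 to a second thread and returns what that thread saw.
    static long demo() {
        AccessKeyHandoff handoff = new AccessKeyHandoff();
        long[] seen = new long[1];
        Thread other = new Thread(() -> {
            try {
                seen[0] = handoff.await();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        other.start();
        handoff.publish(42L);
        try {
            other.join(); // join also orders the write to seen[0] before our read
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return seen[0];
    }

    public static void main(String[] args) {
        System.out.println(demo());
    }
}
```

The field is intentionally left non-volatile to show that the monitor on `lock` alone carries the visibility guarantee for the handed-over value.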
Re: [VOTE] KIP-944 Support async runtimes in consumer, votes needed!
Hello Colin, >> In KIP-944, the callback thread can only delegate to another thread after reading from and writing to a threadlocal variable, providing the barriers right there. > I don't see any documentation that accessing thread local variables provides a total store or load barrier. Do you have such documentation? It seems like if this were the case, we could eliminate volatile variables from most of the code base. Now I was imprecise. The thread-locals are only somewhat involved. In the KIP proposal the callback thread reads an access key from a thread-local variable. It then needs to pass that access key to another thread, which then can set it on its own thread-local variable. The act of passing a value from one thread to another implies that a memory barrier needs to be passed. However, this is all not so relevant since there is no need to pass the access key back when the other thread is done. But now that I think about it a bit more, the locking mechanism runs in a synchronized block. If I remember correctly this should be enough to pass read and write barriers. >> In the current implementation the consumer is also invoked from random threads. If it works now, it should continue to work. > I'm not sure what you're referring to. Can you expand on this? Any invocation of the consumer (e.g. method poll) is not from a thread managed by the consumer. This is what I was assuming you meant with the term 'random thread'. > Hmm, not sure what you mean by "cooperate with blocking code." If you have 10 green threads you're multiplexing on to one CPU thread, and that CPU thread gets blocked because of what one green thread is doing, the other 9 green threads are blocked too, right? I guess it's "just" a performance problem, but it still seems like it could be a serious one. There are several ways to deal with this. All async runtimes I know (Akka, Zio, Cats-effects) support this by letting you mark a task as blocking. 
The runtime will then either schedule it to another thread-pool, or it will grow the thread-pool to accommodate. In any case 'the other 9 green threads' will simply be scheduled to another real thread. In addition, some of these runtimes detect long running tasks and will reschedule waiting tasks to another thread. This is all a bit off topic though. > I don't see why this has to be "inherently multi-threaded." Why can't we have the other threads report back what messages they've processed to the worker thread. Then it will be able to handle these callbacks without involving the other threads. Please consider the context which is that we are running inside the callback of the rebalance listener. The only way to execute something and also have a timeout on it is to run that something on another thread. Kind regards, Erik. On 08-07-2023 at 19:17, Colin McCabe wrote: On Sat, Jul 8, 2023, at 02:41, Erik van Oosten wrote: Hi Colin, Thanks for your thoughts and taking the time to reply. Let me take away your concerns. None of your worries are an issue with the algorithm described in KIP-944. Here it goes: > It's not clear to me that it's safe to access the Kafka consumer or producer concurrently from different threads. Concurrent access is /not/ a design goal of KIP-944. In fact, it goes to great lengths to make sure that this cannot happen. *The only design goal is to allow callbacks to call the consumer from another thread.* To make sure there are no more misunderstandings about this, I have added this goal to the KIP. Hi Erik, Sorry, I spoke imprecisely. My concern is not concurrent access, but multithreaded access in general. Basically cache line visibility issues. > This is true even if the accesses happen at different times, because modern CPUs require memory barriers to guarantee inter-thread visibility of loads and stores. 
In KIP-944, the callback thread can only delegate to another thread after reading from and writing to a threadlocal variable, providing the barriers right there. I don't see any documentation that accessing thread local variables provides a total store or load barrier. Do you have such documentation? It seems like if this were the case, we could eliminate volatile variables from most of the code base. > I know that there are at least a few locks in the consumer code now, due to our need to send heartbeats from a worker thread. I don't think those would be sufficient to protect a client that is making calls from random threads. In the current implementation the consumer is also invoked from random threads. If it works now, it should continue to work. I'm not sure what you're referring to. Can you expand on this? > There has been some discussion of moving to a more traditional model where people make calls to the client and the client passes the given data to a single background worker thread. This would avo
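The remark in this message that running something with a timeout from inside the rebalance listener callback requires a second thread can be sketched with plain `java.util.concurrent`. This is a minimal illustration under assumptions: `TimedCallbackWork`, `runWithTimeout` and `sleepMillis` are hypothetical names, and the work passed in stands in for whatever the callback needs to finish (for example waiting for in-flight records to be processed).

```java
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

// Illustrative sketch only: hypothetical helper, not part of the Kafka client.
final class TimedCallbackWork {

    // Runs the work on a helper thread and waits at most timeoutMs for it.
    // Returns true if the work completed in time, false otherwise.
    static boolean runWithTimeout(Runnable work, long timeoutMs) {
        ExecutorService helper = Executors.newSingleThreadExecutor();
        Future<?> future = helper.submit(work);
        try {
            future.get(timeoutMs, TimeUnit.MILLISECONDS);
            return true;
        } catch (TimeoutException e) {
            future.cancel(true); // interrupt the helper thread
            return false;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            future.cancel(true);
            return false;
        } catch (ExecutionException e) {
            throw new IllegalStateException(e.getCause());
        } finally {
            helper.shutdownNow();
        }
    }

    // Sleeps for the given time; returns early when interrupted.
    static void sleepMillis(long millis) {
        try {
            Thread.sleep(millis);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    public static void main(String[] args) {
        System.out.println(runWithTimeout(() -> sleepMillis(10), 1000));
        System.out.println(runWithTimeout(() -> sleepMillis(5000), 50));
    }
}
```

The calling thread can only enforce the deadline because it is not the thread doing the work. If that work has to call the consumer, it runs into the thread-id check that KIP-944 wants to relax.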
Re: [VOTE] KIP-944 Support async runtimes in consumer, votes needed!
than to try to re-engineer the Kafka client to work from random threads. There is actually some good advice about how to handle multiple threads in the KafkaConsumer.java header file itself. Check the sections "One Consumer Per Thread" and "Decouple Consumption and Processing." What I'm recommending here is essentially the latter. I do understand that it's frustrating to not get a quick response. However, overall I think this one needs a lot more discussion before getting anywhere near a vote. I will leave a -1 just as a procedural step. Maybe some of the people working in the client area can also chime in. best, Colin On Thu, Jul 6, 2023, at 12:02, Erik van Oosten wrote: Dear PMCs, So far there have been 0 responses to KIP-944. I understand this may not be something that keeps you busy, but this KIP is important to people that use async runtimes like Zio, Cats and Kotlin. Is there anything you need to come to a decision? Kind regards, Erik. On 05-07-2023 at 11:38, Erik van Oosten wrote: Hello all, I'd like to call a vote on KIP-944 Support async runtimes in consumer. It has been 'under discussion' for 7 days now. 'Under discussion' between quotes, because there were 0 comments so far. I hope the KIP is clear! KIP description: https://cwiki.apache.org/confluence/x/chw0Dw Kind regards, Erik. -- Erik van Oosten e.vanoos...@grons.nl https://day-to-day-stuff.blogspot.com
Re: [VOTE] KIP-944 Support async runtimes in consumer, votes needed!
Dear PMCs, So far there have been 0 responses to KIP-944. I understand this may not be something that keeps you busy, but this KIP is important to people that use async runtimes like Zio, Cats and Kotlin. Is there anything you need to come to a decision? Kind regards, Erik. On 05-07-2023 at 11:38, Erik van Oosten wrote: Hello all, I'd like to call a vote on KIP-944 Support async runtimes in consumer. It has been 'under discussion' for 7 days now. 'Under discussion' between quotes, because there were 0 comments so far. I hope the KIP is clear! KIP description: https://cwiki.apache.org/confluence/x/chw0Dw Kind regards, Erik. -- Erik van Oosten e.vanoos...@grons.nl https://day-to-day-stuff.blogspot.com
[VOTE] KIP-944 Support async runtimes in consumer
Hello all, I'd like to call a vote on KIP-944 Support async runtimes in consumer. It has been 'under discussion' for 7 days now. 'Under discussion' between quotes, because there were 0 comments so far. I hope the KIP is clear! KIP description: https://cwiki.apache.org/confluence/x/chw0Dw Kind regards, Erik.
[DISCUSS] KIP-944 Support async runtimes in consumer
[This is a resend with the correct KIP number.] Hello developers of the Java based consumer, I submitted https://github.com/apache/kafka/pull/13914 to fix a long standing problem that the Kafka consumer on the JVM is not usable from asynchronous runtimes such as Kotlin co-routines and ZIO. However, since it extends the public API I was requested to create a KIP. So here it is: KIP-944 Support async runtimes in consumer https://cwiki.apache.org/confluence/x/chw0Dw Any questions, comments, ideas and other additions are welcome! The KIP should be complete except for the testing section. As far as I am aware there are no tests for the current behavior. Any help in this area would be appreciated. Kind regards, Erik. -- Erik van Oosten e.vanoos...@grons.nl https://day-to-day-stuff.blogspot.com
Re: [DISCUSS] KIP-944 Support async runtimes in consumer
It seems KIP-941 was already taken. Updated to: KIP-944. On 28-06-2023 at 10:11, Erik van Oosten wrote: Hello developers of the Java based consumer, I submitted https://github.com/apache/kafka/pull/13914 to fix a long standing problem that the Kafka consumer on the JVM is not usable from asynchronous runtimes such as Kotlin co-routines and ZIO. However, since it extends the public API I was requested to create a KIP. So here it is: KIP-944 Support async runtimes in consumer https://cwiki.apache.org/confluence/x/chw0Dw Any questions, comments, ideas and other additions are welcome! The KIP should be complete except for the testing section. As far as I am aware there are no tests for the current behavior. Any help in this area would be appreciated. Kind regards, Erik. -- Erik van Oosten e.vanoos...@grons.nl https://day-to-day-stuff.blogspot.com
[DISCUSS] KIP-941 Support async runtimes in consumer
Hello developers of the Java based consumer, I submitted https://github.com/apache/kafka/pull/13914 to fix a long standing problem that the Kafka consumer on the JVM is not usable from asynchronous runtimes such as Kotlin co-routines and ZIO. However, since it extends the public API I was requested to create a KIP. So here it is: KIP-941 Support async runtimes in consumer https://cwiki.apache.org/confluence/x/chw0Dw Any questions, comments, ideas and other additions are welcome! The KIP should be complete except for the testing section. As far as I am aware there are no tests for the current behavior. Any help in this area would be appreciated. Kind regards, Erik. -- Erik van Oosten e.vanoos...@grons.nl https://day-to-day-stuff.blogspot.com
Permissions to contribute to Apache Kafka
Dear reader, I would like to create a KIP and understand I need to request permissions for that. my wiki username: e.vanoos...@chello.nl (note, this is /not/ my email address) my Jira username: erikvanoosten Kind regards, Erik. -- Erik van Oosten e.vanoos...@grons.nl https://day-to-day-stuff.blogspot.com
Review request for Java Kafka consumer
Dear Kafka developers, I submitted https://github.com/apache/kafka/pull/13914 to fix a long standing problem that the Kafka consumer on the JVM is not usable from asynchronous runtimes such as Kotlin co-routines and ZIO. Your review is much appreciated. Kind regards, Erik. -- Erik van Oosten e.vanoos...@grons.nl https://day-to-day-stuff.blogspot.com
[jira] [Created] (KAFKA-14972) Make KafkaConsumer usable in async runtimes
Erik van Oosten created KAFKA-14972: --- Summary: Make KafkaConsumer usable in async runtimes Key: KAFKA-14972 URL: https://issues.apache.org/jira/browse/KAFKA-14972 Project: Kafka Issue Type: Wish Components: consumer Reporter: Erik van Oosten KafkaConsumer contains a check that rejects nested invocations from different threads (method {{acquire}}). For users that use an async runtime, this is an almost impossible requirement. Examples of async runtimes that are affected are Kotlin co-routines (see KAFKA-7143) and Zio. We propose to replace the thread-id check with an access-id that is stored on a thread-local variable. Existing programs will not be affected. Developers that work in an async runtime can pick up the access-id and set it on the thread-local variable in a thread of their choosing. Every time a callback is invoked a new access-id is generated. When the callback completes, the previous access-id is restored. This proposal does not make it impossible to use the client incorrectly. However, we think it strikes a good balance between making correct usage from an async runtime possible while making incorrect usage difficult. Alternatives considered: # Configuration that switches off the check completely. -- This message was sent by Atlassian Jira (v8.20.10#820010)
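The access-id proposal in this issue can be modelled in a few lines. The sketch below is a toy model under stated assumptions, not the code of the linked pull request and not KafkaConsumer's actual implementation: the class `AccessIdGuard` and the methods `currentAccessId`, `adoptAccessId` and `invokeCallback` are hypothetical names chosen for the illustration. It shows the three properties described above: each thread has its own access-id by default, a callback runs under a fresh access-id that another thread may adopt, and the previous access-id is restored afterwards.

```java
import java.util.ConcurrentModificationException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;

// Toy model of the access-id idea; all names are hypothetical.
final class AccessIdGuard {
    private static final long NO_OWNER = 0L;
    private static final AtomicLong ID_SOURCE = new AtomicLong();
    // Every thread starts with its own unique access-id, so single-threaded
    // programs behave as they do with a thread-id check.
    private static final ThreadLocal<Long> ACCESS_ID =
            ThreadLocal.withInitial(ID_SOURCE::incrementAndGet);

    private final AtomicLong owner = new AtomicLong(NO_OWNER);
    private final AtomicInteger refCount = new AtomicInteger();

    static long currentAccessId() { return ACCESS_ID.get(); }
    static void adoptAccessId(long id) { ACCESS_ID.set(id); }

    // Rejects callers whose access-id differs from the current owner's.
    void acquire() {
        long id = ACCESS_ID.get();
        if (id != owner.get() && !owner.compareAndSet(NO_OWNER, id)) {
            throw new ConcurrentModificationException("access-id does not match");
        }
        refCount.incrementAndGet();
    }

    void release() {
        if (refCount.decrementAndGet() == 0) {
            owner.set(NO_OWNER);
        }
    }

    // Runs a callback under a fresh access-id and restores the previous one.
    // Assumes the calling thread currently holds the guard, as poll() would.
    void invokeCallback(Runnable callback) {
        long previous = ACCESS_ID.get();
        long fresh = ID_SOURCE.incrementAndGet();
        ACCESS_ID.set(fresh);
        owner.set(fresh);
        try {
            callback.run();
        } finally {
            ACCESS_ID.set(previous);
            owner.set(previous);
        }
    }

    private boolean tryUse() {
        try { acquire(); release(); return true; }
        catch (ConcurrentModificationException e) { return false; }
    }

    // True when a helper thread is rejected without the id and accepted with it.
    static boolean demo() {
        AccessIdGuard guard = new AccessIdGuard();
        ExecutorService helper = Executors.newSingleThreadExecutor();
        boolean[] accepted = new boolean[2];
        guard.acquire(); // like entering poll()
        try {
            guard.invokeCallback(() -> {
                long key = currentAccessId();
                try {
                    accepted[0] = helper.submit(() -> guard.tryUse()).get();
                    accepted[1] = helper.submit(() -> {
                        adoptAccessId(key);
                        return guard.tryUse();
                    }).get();
                } catch (Exception e) {
                    throw new IllegalStateException(e);
                }
            });
        } finally {
            guard.release();
            helper.shutdown();
        }
        return !accepted[0] && accepted[1];
    }

    public static void main(String[] args) {
        System.out.println(demo());
    }
}
```

As the issue itself notes, a scheme like this does not make incorrect use impossible: any thread that has adopted the access-id passes the check, so the async runtime remains responsible for not calling the consumer from two threads at once.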
Re: Kafka client needs KAFKA-10337 to cover async commit use case
Thanks! Here is Tom Lee's PR recreated on trunk: https://github.com/apache/kafka/pull/13678 I believe that this PR might not be complete though. When we only call commitAsync (repeatedly) from the rebalance listener callback method, will the client's poll method ever be invoked? I suspect that no polling takes place in this scenario and that async commits will still not complete. With the changes of this PR, commitSync can be used as a workaround. I guess we can fix this by adding `client.pollNoWakeup()`, e.g. at the start of `ConsumerCoordinator.commitOffsetsAsync`. Is that an acceptable change? Kind regards, Erik. On 05-05-2023 at 20:20, Philip Nee wrote: Hey Erik, Maybe it's more straightforward to open a new PR. Thanks! P On Fri, May 5, 2023 at 9:36 AM Erik van Oosten wrote: If I were to rebase the old pull request and re-open KAFKA-10337, would it be considered for merging? Kind regards, Erik. On 03-05-2023 at 09:21, Erik van Oosten wrote: Hi Philip, Firstly, could you explain the situation in which you would prefer to invoke commitAsync over commitSync in the rebalance listener? Of course! Short answer: we prefer commitAsync because we want to handle multiple partitions concurrently using the ZIO runtime. Long answer: this is in the context of zio-kafka. In zio-kafka the user writes code for a stream that processes data and does commits. In the library we intercept those commits and pass them to the KafkaConsumer. We also keep track of the offsets of handed out records. Together this information allows us to track when a stream is done processing a partition and that it is safe to start the rebalance. All of this happens concurrently and asynchronously using the ZIO runtime. When calling commit inside the onPartitionsRevoked callback the library does a thread-id check; we can only call the KafkaConsumer from the same thread that invoked us. 
This is unfortunate because it forces us to spin up a specialized single-threaded ZIO runtime inside the callback method. Though this runtime can run blocking methods like commitSync, it will need careful programming since all other tasks need to wait. (BTW, it would be great if there were an option to disable the thread-id check. It has more use cases, see for example KAFKA-7143.) Is it your concern that we currently don't have a way to invoke the callback, and the user won't be able to correctly handle these failed/successful async commits? Yes, that is correct. Sorry - I dug a bit into the old PR. Seems like the issue is a broken contract: commitSync won't wait for the previous async commits to complete if it tries to commit an empty offset map. Indeed! I am assuming the same is true for commitAsync. The important thing is that we need something to get those callbacks. I would prefer commitAsync but if only commitSync gets fixed we can use that as well. If there is another method completely for this task, that would be good as well. Kind regards, Erik. Philip Nee wrote on 2023-05-02 21:49: Hey Erik, Just a couple of questions to you: Firstly, could you explain the situation in which you would prefer to invoke commitAsync over commitSync in the rebalance listener? Typically we would use the synchronous method to ensure the commits are completed before moving on with the rebalancing, which leads to my second comment/question. Is it your concern that we currently don't have a way to invoke the callback, and the user won't be able to correctly handle these failed/successful async commits? Thanks, P On Tue, May 2, 2023 at 12:22 PM Erik van Oosten wrote: Dear developers of the Kafka Java client, It seems I have found a feature gap in the Kafka Java client. KAFKA-10337 and its associated pull request on Github (from 2020!) would solve this, but it was closed without merging. We would love to see it being reconsidered for merging. 
This mail has the arguments for doing so. The javadoc of `ConsumerRebalanceListener` method `onPartitionsRevoked` recommends you commit all offsets within the method, thereby holding up the rebalance until those commits are done. The (perceived) feature gap is when the user is trying to do async commits from the rebalance listener; there is nothing available to trigger the callbacks of completed commits. Without these callbacks, there is no way to know when it is safe to return from onPartitionsRevoked. (We cannot call `poll` because the rebalance listener is already called from inside a poll.) Calling `commitAsync` with an empty offsets parameter seems a perfect candidate for triggering callbacks of earlier commits. Unfortunately, commitAsync doesn't behave that way. This is fixed by the mentioned pull request. The pull request conversation has a comment saying that calling `commit` with an empty offsets parameter is not something that should happen. I found this a strange thing to say. First of all, the method does have special handling for this situation, negating
Re: Kafka client needs KAFKA-10337 to cover async commit use case
If I were to rebase the old pull request and re-open KAFKA-10337, would it be considered for merging? Kind regards, Erik. On 03-05-2023 at 09:21, Erik van Oosten wrote: Hi Philip, Firstly, could you explain the situation in which you would prefer to invoke commitAsync over commitSync in the rebalance listener? Of course! Short answer: we prefer commitAsync because we want to handle multiple partitions concurrently using the ZIO runtime. Long answer: this is in the context of zio-kafka. In zio-kafka the user writes code for a stream that processes data and does commits. In the library we intercept those commits and pass them to the KafkaConsumer. We also keep track of the offsets of handed out records. Together this information allows us to track when a stream is done processing a partition and that it is safe to start the rebalance. All of this happens concurrently and asynchronously using the ZIO runtime. When calling commit inside the onPartitionsRevoked callback the library does a thread-id check; we can only call the KafkaConsumer from the same thread that invoked us. This is unfortunate because it forces us to spin up a specialized single-threaded ZIO runtime inside the callback method. Though this runtime can run blocking methods like commitSync, it will need careful programming since all other tasks need to wait. (BTW, it would be great if there were an option to disable the thread-id check. It has more use cases, see for example KAFKA-7143.) Is it your concern that we currently don't have a way to invoke the callback, and the user won't be able to correctly handle these failed/successful async commits? Yes, that is correct. Sorry - I dug a bit into the old PR. Seems like the issue is a broken contract: commitSync won't wait for the previous async commits to complete if it tries to commit an empty offset map. Indeed! I am assuming the same is true for commitAsync. The important thing is that we need something to get those callbacks. 
I would prefer commitAsync but if only commitSync gets fixed we can use that as well. If there is another method completely for this task, that would be good as well. Kind regards, Erik. Philip Nee wrote on 2023-05-02 21:49: Hey Erik, Just a couple of questions to you: Firstly, could you explain the situation in which you would prefer to invoke commitAsync over commitSync in the rebalance listener? Typically we would use the synchronous method to ensure the commits are completed before moving on with the rebalancing, which leads to my second comment/question. Is it your concern that we currently don't have a way to invoke the callback, and the user won't be able to correctly handle these failed/successful async commits? Thanks, P On Tue, May 2, 2023 at 12:22 PM Erik van Oosten wrote: Dear developers of the Kafka Java client, It seems I have found a feature gap in the Kafka Java client. KAFKA-10337 and its associated pull request on Github (from 2020!) would solve this, but it was closed without merging. We would love to see it being reconsidered for merging. This mail has the arguments for doing so. The javadoc of `ConsumerRebalanceListener` method `onPartitionsRevoked` recommends you commit all offsets within the method, thereby holding up the rebalance until those commits are done. The (perceived) feature gap is when the user is trying to do async commits from the rebalance listener; there is nothing available to trigger the callbacks of completed commits. Without these callbacks, there is no way to know when it is safe to return from onPartitionsRevoked. (We cannot call `poll` because the rebalance listener is already called from inside a poll.) Calling `commitAsync` with an empty offsets parameter seems a perfect candidate for triggering callbacks of earlier commits. Unfortunately, commitAsync doesn't behave that way. This is fixed by the mentioned pull request. 
The pull request conversation has a comment saying that calling `commit` with an empty offsets parameter is not something that should happen. I found this a strange thing to say. First of all, the method does have special handling for this situation, negating the comment outright. In addition this special handling violates the contract of the method (as specified in the javadoc section about ordering). Therefore, this pull request has 2 advantages: 1. KafkaConsumer.commitAsync will be more in line with its javadoc, 2. the feature gap is gone. Of course, it might be that I missed something and that there are other ways to trigger the commit callbacks. I would be very happy to hear about that because it means I do not have to wait for a release cycle. If you agree these arguments are sound, I would be happy to make the pull request mergeable again. Curious about your thoughts and kind regards, Erik. -- Erik van Oosten e.vanoos...@grons.nl https://day-to-day-stuff.blogspot.com Committer of zio-kafka https://github.com
Re: Kafka client needs KAFKA-10337 to cover async commit use case
Hi Philip, Firstly, could you explain the situation in which you would prefer to invoke commitAsync over commitSync in the rebalance listener? Of course! Short answer: we prefer commitAsync because we want to handle multiple partitions concurrently using the ZIO runtime. Long answer: this is in the context of zio-kafka. In zio-kafka the user writes code for a stream that processes data and does commits. In the library we intercept those commits and pass them to the KafkaConsumer. We also keep track of the offsets of handed out records. Together this information allows us to track when a stream is done processing a partition and that it is safe to start the rebalance. All of this happens concurrently and asynchronously using the ZIO runtime. When calling commit inside the onPartitionsRevoked callback the library does a thread-id check; we can only call the KafkaConsumer from the same thread that invoked us. This is unfortunate because it forces us to spin up a specialized single-threaded ZIO runtime inside the callback method. Though this runtime can run blocking methods like commitSync, it will need careful programming since all other tasks need to wait. (BTW, it would be great if there were an option to disable the thread-id check. It has more use cases, see for example KAFKA-7143.) Is it your concern that we currently don't have a way to invoke the callback, and the user won't be able to correctly handle these failed/successful async commits? Yes, that is correct. Sorry - I dug a bit into the old PR. Seems like the issue is a broken contract: commitSync won't wait for the previous async commits to complete if it tries to commit an empty offset map. Indeed! I am assuming the same is true for commitAsync. The important thing is that we need something to get those callbacks. I would prefer commitAsync but if only commitSync gets fixed we can use that as well. If there is another method completely for this task, that would be good as well. 
Kind regards, Erik. Philip Nee wrote on 2023-05-02 21:49: Hey Erik, Just a couple of questions to you: Firstly, could you explain the situation in which you would prefer to invoke commitAsync over commitSync in the rebalance listener? Typically we would use the synchronous method to ensure the commits are completed before moving on with the rebalancing, which leads to my second comment/question. Is it your concern that we currently don't have a way to invoke the callback, and the user won't be able to correctly handle these failed/successful async commits? Thanks, P On Tue, May 2, 2023 at 12:22 PM Erik van Oosten wrote: Dear developers of the Kafka Java client, It seems I have found a feature gap in the Kafka Java client. KAFKA-10337 and its associated pull request on Github (from 2020!) would solve this, but it was closed without merging. We would love to see it being reconsidered for merging. This mail has the arguments for doing so. The javadoc of `ConsumerRebalanceListener` method `onPartitionsRevoked` recommends you commit all offsets within the method, thereby holding up the rebalance until those commits are done. The (perceived) feature gap is when the user is trying to do async commits from the rebalance listener; there is nothing available to trigger the callbacks of completed commits. Without these callbacks, there is no way to know when it is safe to return from onPartitionsRevoked. (We cannot call `poll` because the rebalance listener is already called from inside a poll.) Calling `commitAsync` with an empty offsets parameter seems a perfect candidate for triggering callbacks of earlier commits. Unfortunately, commitAsync doesn't behave that way. This is fixed by the mentioned pull request. The pull request conversation has a comment saying that calling `commit` with an empty offsets parameter is not something that should happen. I found this a strange thing to say. 
First of all, the method does have special handling for this situation, negating the comment outright. In addition this special handling violates the contract of the method (as specified in the javadoc section about ordering). Therefore, this pull request has 2 advantages: 1. KafkaConsumer.commitAsync will be more in line with its javadoc, 2. the feature gap is gone. Of course, it might be that I missed something and that there are other ways to trigger the commit callbacks. I would be very happy to hear about that because it means I do not have to wait for a release cycle. If you agree these arguments are sound, I would be happy to make the pull request mergeable again. Curious about your thoughts and kind regards, Erik. -- Erik van Oosten e.vanoos...@grons.nl https://day-to-day-stuff.blogspot.com Committer of zio-kafka https://github.com/zio/zio-kafka
Kafka client needs KAFKA-10337 to cover async commit use case
Dear developers of the Kafka Java client, It seems I have found a feature gap in the Kafka Java client. KAFKA-10337 and its associated pull request on Github (from 2020!) would solve this, but it was closed without merging. We would love to see it being reconsidered for merging. This mail has the arguments for doing so. The javadoc of `ConsumerRebalanceListener` method `onPartitionsRevoked` recommends you commit all offsets within the method, thereby holding up the rebalance until those commits are done. The (perceived) feature gap is when the user is trying to do async commits from the rebalance listener; there is nothing available to trigger the callbacks of completed commits. Without these callbacks, there is no way to know when it is safe to return from onPartitionsRevoked. (We cannot call `poll` because the rebalance listener is already called from inside a poll.) Calling `commitAsync` with an empty offsets parameter seems a perfect candidate for triggering callbacks of earlier commits. Unfortunately, commitAsync doesn't behave that way. This is fixed by the mentioned pull request. The pull request conversation has a comment saying that calling `commit` with an empty offsets parameter is not something that should happen. I found this a strange thing to say. First of all, the method does have special handling for this situation, negating the comment outright. In addition this special handling violates the contract of the method (as specified in the javadoc section about ordering). Therefore, this pull request has 2 advantages: 1. KafkaConsumer.commitAsync will be more in line with its javadoc, 2. the feature gap is gone. Of course, it might be that I missed something and that there are other ways to trigger the commit callbacks. I would be very happy to hear about that because it means I do not have to wait for a release cycle. If you agree these arguments are sound, I would be happy to make the pull request mergeable again. 
Curious about your thoughts and kind regards, Erik. -- Erik van Oosten e.vanoos...@grons.nl https://day-to-day-stuff.blogspot.com Committer of zio-kafka https://github.com/zio/zio-kafka
[jira] [Commented] (KAFKA-960) Upgrade Metrics to 3.x
[ https://issues.apache.org/jira/browse/KAFKA-960?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14194990#comment-14194990 ] Erik van Oosten commented on KAFKA-960: --- If 2.20 and 2.1.5 are indeed binary compatible (how do you test that?), _all existing_ releases could be patched by simply replacing a jar :) Upgrade Metrics to 3.x -- Key: KAFKA-960 URL: https://issues.apache.org/jira/browse/KAFKA-960 Project: Kafka Issue Type: Improvement Affects Versions: 0.8.1 Reporter: Cosmin Lehene Now that metrics 3.0 has been released (http://metrics.codahale.com/about/release-notes/) we can upgrade back -- This message was sent by Atlassian JIRA (v6.3.4#6332)