[ https://issues.apache.org/jira/browse/KAFKA-7143?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Raman Gupta updated KAFKA-7143:
-------------------------------
    Description:
I am using a new KafkaConsumer via Kotlin. My application makes use of Kotlin [coroutines|https://kotlinlang.org/docs/reference/coroutines.html], which support a style of asynchronous programming that avoids the need for callbacks (existing callback-based APIs such as Kafka's can easily be adapted to this style). With coroutines, a method that would take a callback instead suspends, and it resumes once the call is complete. With this approach, access to the KafkaConsumer is thread-safe, but it does NOT happen from a single thread: a different underlying thread may execute the code after the suspension point.

However, the KafkaConsumer includes additional checks that verify not only the thread safety of the client, but also that the *same thread* is being used. If the same thread (by id) is not being used, the consumer throws an exception like:

{code}
Exception in thread "ForkJoinPool.commonPool-worker-25" java.util.ConcurrentModificationException: KafkaConsumer is not safe for multi-threaded access
    at org.apache.kafka.clients.consumer.KafkaConsumer.acquire(KafkaConsumer.java:1824)
    at org.apache.kafka.clients.consumer.KafkaConsumer.acquireAndEnsureOpen(KafkaConsumer.java:1808)
    at org.apache.kafka.clients.consumer.KafkaConsumer.commitAsync(KafkaConsumer.java:1321)
{code}

I understand this check is present to protect people from themselves, but I'd like the ability to disable it so that the consumer can be used effectively with libraries such as Kotlin coroutines.

There is a workaround for the above: run the consumer in a coroutine with a single-thread context. This isn't ideal because it dedicates a thread to the consumer. However, further problems await: the `commitAsync` method also cannot be used with coroutines, because its callback is never executed and therefore the coroutine is never resumed past the suspension point.
The callback seems to be executed only during later calls to poll. Those calls never happen because the coroutine is suspended, so we have a deadlock. I guess the idea behind this API design is that consuming new messages may continue even though commits of previous offsets have not necessarily completed. However, with a coroutine-based API, commitAsync could be sequential (completing before the next poll, just like commitSync) yet still happen asynchronously without tying up a client application thread.
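Below is a sketch of the single-thread workaround mentioned above. It confines every client call to one dedicated thread and is written in stand-alone Java so it runs without Kafka or kotlinx.coroutines. `FakeConsumer` and `ConfinedConsumer` are hypothetical names. `FakeConsumer` is a simplified model of a thread-ownership check, not Kafka's implementation, and its `poll`/`commitSync` only mimic the shape of the real methods. Poll and commit are submitted as one task, so each commit completes before the next poll starts, and the caller only receives a `CompletableFuture`.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.ConcurrentModificationException;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Function;

// HYPOTHETICAL stand-in for a client that must not be entered by two threads at
// once. A simplified model of a thread-ownership check, NOT the KafkaConsumer source.
class FakeConsumer {
    private static final long NO_OWNER = -1L;
    private final AtomicLong owner = new AtomicLong(NO_OWNER);
    private long committedOffset = -1L;
    // Records which thread made each call, so the confinement can be checked.
    final List<Long> callingThreads = Collections.synchronizedList(new ArrayList<>());

    // Claim the client for the current thread; fail if another thread holds it.
    private void acquire() {
        long id = Thread.currentThread().getId();
        if (owner.get() != id && !owner.compareAndSet(NO_OWNER, id)) {
            throw new ConcurrentModificationException("not safe for multi-threaded access");
        }
        callingThreads.add(id);
    }

    private void release() {
        owner.set(NO_OWNER);
    }

    // Pretend to fetch one record whose offset follows the last committed one.
    long poll() {
        acquire();
        try { return committedOffset + 1; } finally { release(); }
    }

    // Pretend to commit synchronously; blocks only the thread that calls it.
    void commitSync(long offset) {
        acquire();
        try { committedOffset = offset; } finally { release(); }
    }

    long committed() {
        acquire();
        try { return committedOffset; } finally { release(); }
    }
}

// Funnels every operation on the wrapped client through ONE dedicated thread.
class ConfinedConsumer implements AutoCloseable {
    private final FakeConsumer consumer;
    private final ExecutorService consumerThread = Executors.newSingleThreadExecutor();

    ConfinedConsumer(FakeConsumer consumer) {
        this.consumer = consumer;
    }

    // Runs op on the dedicated thread; the caller gets a future instead of blocking.
    <T> CompletableFuture<T> submit(Function<FakeConsumer, T> op) {
        return CompletableFuture.supplyAsync(() -> op.apply(consumer), consumerThread);
    }

    @Override
    public void close() {
        consumerThread.shutdown();
    }
}

class ConfinementDemo {
    public static void main(String[] args) {
        try (ConfinedConsumer confined = new ConfinedConsumer(new FakeConsumer())) {
            for (int i = 0; i < 3; i++) {
                // Poll and commit as ONE task, so the commit finishes before the next poll.
                long offset = confined.submit(c -> {
                    long polled = c.poll();
                    c.commitSync(polled);
                    return polled;
                }).join();
                // prints offsets 0, 1, 2 in order
                System.out.println("processed and committed offset " + offset);
            }
        }
    }
}
```

The trade-off is the one described above. A thread is still dedicated to the consumer, and `commitSync` blocks that thread (though not the caller) until the commit completes. From a coroutine, the returned future could be suspended on with the `await()` extension in kotlinx.coroutines' `kotlinx.coroutines.future` package instead of `join()`.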
> Cannot use KafkaConsumer with Kotlin coroutines due to various issues
> ---------------------------------------------------------------------
>
>                 Key: KAFKA-7143
>                 URL: https://issues.apache.org/jira/browse/KAFKA-7143
>             Project: Kafka
>          Issue Type: Improvement
>          Components: clients
>    Affects Versions: 1.1.0
>            Reporter: Raman Gupta
>            Priority: Major
--
This message was sent by Atlassian JIRA
(v7.6.3#76005)