[ https://issues.apache.org/jira/browse/KAFKA-7143?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Raman Gupta updated KAFKA-7143:
-------------------------------
    Description: 
I am using a new KafkaConsumer via Kotlin. My application makes use of Kotlin 
[coroutines|https://kotlinlang.org/docs/reference/coroutines.html], which 
support a style of async programming that avoids the need for callbacks (and 
existing callback-based APIs such as Kafka's can easily be adapted to this). 
With coroutines, a method that takes a callback is suspended and then resumed 
once the call completes. With this approach, while access to the KafkaConsumer 
is done in a thread-safe way, it does NOT happen from a single thread -- a 
different underlying thread may actually execute the code after the suspension 
point.
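
To illustrate the thread hop, here is a minimal sketch in plain Java (the 
language of the KafkaConsumer API), with CompletableFuture standing in for 
coroutine suspension. Everything in it is hypothetical: `fetchAsync` is an 
invented callback-style API and the thread names are arbitrary. It only shows 
that code after the "suspension point" can run on a different thread than the 
caller, even though nothing runs concurrently.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.function.Consumer;

class CallbackAdapter {
    // Hypothetical callback-style API (not part of Kafka): does its work on
    // the given executor and reports the result through the callback.
    static void fetchAsync(ExecutorService io, Consumer<String> callback) {
        io.execute(() -> callback.accept("done"));
    }

    // Adapter: exposes the callback API as a future that completes when the
    // callback fires, similar in spirit to suspending a coroutine.
    static CompletableFuture<String> fetch(ExecutorService io) {
        CompletableFuture<String> result = new CompletableFuture<>();
        fetchAsync(io, result::complete);
        return result;
    }

    // Returns {name of the calling thread, name of the thread that ran the
    // code after the suspension point}.
    static String[] run() throws Exception {
        ExecutorService io =
            Executors.newSingleThreadExecutor(r -> new Thread(r, "io-thread"));
        ExecutorService resume =
            Executors.newSingleThreadExecutor(r -> new Thread(r, "resume-thread"));
        try {
            String caller = Thread.currentThread().getName();
            // The continuation is dispatched to the "resume" executor, so it
            // runs on a different thread than the one that started the call.
            String continuation = fetch(io)
                .thenApplyAsync(v -> Thread.currentThread().getName(), resume)
                .get();
            return new String[] {caller, continuation};
        } finally {
            io.shutdown();
            resume.shutdown();
        }
    }

    public static void main(String[] args) throws Exception {
        String[] names = run();
        System.out.println("called from " + names[0] + ", resumed on " + names[1]);
    }
}
```

The calls here are strictly sequential, yet the continuation runs on 
`resume-thread` rather than on the caller's thread, which is the situation a 
thread-id check would reject.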

However, the KafkaConsumer includes additional checks that verify not only the 
thread safety of the client, but also that the *same thread* is being used -- 
if a call comes from a different thread (by id), the consumer throws an 
exception like:

{code}
Exception in thread "ForkJoinPool.commonPool-worker-25" java.util.ConcurrentModificationException: KafkaConsumer is not safe for multi-threaded access
        at org.apache.kafka.clients.consumer.KafkaConsumer.acquire(KafkaConsumer.java:1824)
        at org.apache.kafka.clients.consumer.KafkaConsumer.acquireAndEnsureOpen(KafkaConsumer.java:1808)
        at org.apache.kafka.clients.consumer.KafkaConsumer.commitAsync(KafkaConsumer.java:1321)
{code}

I understand this check is present to protect people from themselves, but I'd 
like the ability to disable this check so that this code can be used 
effectively by libraries such as Kotlin coroutines.

There is a workaround for the above: run the consumer in a coroutine with a 
single-thread context, which isn't ideal because it dedicates a thread to the 
consumer. However, further problems await -- the `commitAsync` method also 
cannot be used with coroutines because the callback is never executed, and 
therefore the coroutine is never resumed past the suspension point. The 
callback seems to be executed only on future calls to poll, which never happen 
because of the suspension, so we have a deadlock. I guess the idea behind this 
API design is that consuming new messages may continue even though commits of 
previous offsets have not necessarily been processed. However, with a 
coroutine-based API, commitAsync could be sequenced before the next poll, like 
commitSync, yet still happen asynchronously without tying up a client 
application thread.
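
The deadlock can be reproduced with a toy model. The sketch below is NOT 
Kafka's implementation: `ToyConsumer` and its methods are hypothetical 
stand-ins that encode only the behavior observed above, namely that a commit 
callback runs only inside a later call to poll. CompletableFuture again plays 
the role of the suspended coroutine.

```java
import java.util.ArrayDeque;
import java.util.Queue;
import java.util.concurrent.CompletableFuture;

class ToyConsumer {
    // Callbacks for commits that have been requested but not yet reported.
    private final Queue<Runnable> pendingCallbacks = new ArrayDeque<>();

    // Toy stand-in for commitAsync: the callback is only queued here.
    void commitAsync(Runnable callback) {
        pendingCallbacks.add(callback);
    }

    // Toy stand-in for poll: the only place queued callbacks are run.
    void poll() {
        Runnable callback;
        while ((callback = pendingCallbacks.poll()) != null) {
            callback.run();
        }
    }

    // Adapter in the style of a suspending call: the returned future
    // completes when the commit callback fires.
    CompletableFuture<Void> commit() {
        CompletableFuture<Void> done = new CompletableFuture<>();
        commitAsync(() -> done.complete(null));
        return done;
    }

    public static void main(String[] args) {
        ToyConsumer consumer = new ToyConsumer();
        CompletableFuture<Void> commit = consumer.commit();
        // Code that waits for `commit` before polling again would wait
        // forever, because only poll() can complete it.
        System.out.println("before poll: " + commit.isDone()); // prints "before poll: false"
        consumer.poll();
        System.out.println("after poll: " + commit.isDone());  // prints "after poll: true"
    }
}
```

In this model, a caller that awaits the commit before issuing the next poll 
never makes progress, which matches the behavior reported here.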

  was:
I am using a new KafkaConsumer via Kotlin. My application makes use of Kotlin 
[coroutines|https://kotlinlang.org/docs/reference/coroutines.html], which 
supports a style of async programming that avoids the need for callbacks (and 
existing callback-based API's such as Kafka's can easily be adapted to this). 
With coroutines, methods with callbacks are suspended, and resumed once the 
call is complete. With this approach, while access to the KafkaConsumer is done 
in a thread-safe way, it does NOT happen from a single thread -- a different 
underlying thread may actually execute the code after the suspension point.

However, the KafkaConsumer includes additional checks to verify not only the 
thread safety of the client, but that the *same thread* is being used -- if the 
same thread (by id) is not being used the consumer throws an exception like:

{code}
Exception in thread "ForkJoinPool.commonPool-worker-25" java.util.ConcurrentModificationException: KafkaConsumer is not safe for multi-threaded access
        at org.apache.kafka.clients.consumer.KafkaConsumer.acquire(KafkaConsumer.java:1824)
        at org.apache.kafka.clients.consumer.KafkaConsumer.acquireAndEnsureOpen(KafkaConsumer.java:1808)
        at org.apache.kafka.clients.consumer.KafkaConsumer.commitAsync(KafkaConsumer.java:1321)
{code}

I understand this check is present to protect people from themselves, but I'd 
like the ability to disable this check so that this code can be used 
effectively by libraries such as Kotlin coroutines.


> Cannot use KafkaConsumer with Kotlin coroutines due to various issues
> ---------------------------------------------------------------------
>
>                 Key: KAFKA-7143
>                 URL: https://issues.apache.org/jira/browse/KAFKA-7143
>             Project: Kafka
>          Issue Type: Improvement
>          Components: clients
>    Affects Versions: 1.1.0
>            Reporter: Raman Gupta
>            Priority: Major



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
