Raman Gupta created KAFKA-7143:
----------------------------------

             Summary: Cannot use KafkaConsumer with Kotlin coroutines due to 
Thread id check
                 Key: KAFKA-7143
                 URL: https://issues.apache.org/jira/browse/KAFKA-7143
             Project: Kafka
          Issue Type: Improvement
          Components: clients
    Affects Versions: 1.1.0
            Reporter: Raman Gupta


I am using a new KafkaConsumer via Kotlin. My application makes use of Kotlin 
[coroutines|https://kotlinlang.org/docs/reference/coroutines.html], which 
supports a style of async programming that avoids the need for callbacks (and 
existing callback-based API's such as Kafka's can easily be adapted to this). 
With coroutines, methods with callbacks are suspended, and resumed once the 
call is complete. With this approach, while access to the KafkaConsumer is done 
in a thread-safe way, it does NOT happen from a single thread -- a different 
underlying thread may actually execute the code after the suspension point.

However, the KafkaConsumer includes additional checks to verify not only the 
thread safety of the client, but that the *same thread* is being used -- if the 
same thread (by id) is not being used the consumer throws an exception like:

{code}
Exception in thread "ForkJoinPool.commonPool-worker-25" 
java.util.ConcurrentModificationException: KafkaConsumer is not safe for 
multi-threaded access
        at 
org.apache.kafka.clients.consumer.KafkaConsumer.acquire(KafkaConsumer.java:1824)
        at 
org.apache.kafka.clients.consumer.KafkaConsumer.acquireAndEnsureOpen(KafkaConsumer.java:1808)
        at 
org.apache.kafka.clients.consumer.KafkaConsumer.commitAsync(KafkaConsumer.java:1321)
{code}

I understand this check is present to protect people from themselves, but I'd 
like the ability to disable this check so that this code can be used 
effectively by libraries such as Kotlin coroutines.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

Reply via email to