GitHub user gaborgsomogyi opened a pull request:

    https://github.com/apache/spark/pull/20997

    [SPARK-19185] [DSTREAMS] Avoid concurrent use of cached consumers in 
CachedKafkaConsumer

    ## What changes were proposed in this pull request?
    
    `CachedKafkaConsumer` in the kafka-0-10 project is designed to maintain a
pool of KafkaConsumers that can be reused. However, it was built on the
assumption that only one thread will try to read the same Kafka
TopicPartition at a time. This assumption does not always hold, and when it
is violated it can inadvertently lead to `ConcurrentModificationException`
or, worse, to silently reading incorrect data.
    
    A better design is for the consumer pool to be smart enough to avoid
concurrent use of a cached consumer: if there is a request for the same
TopicPartition as a currently in-use consumer, the pool should automatically
return a fresh consumer instead.
    
    - There are effectively two kinds of consumers that may be handed out
      - Cached consumer - this should be returned to the pool at task end
      - Non-cached consumer - this should be closed at task end
    - A trait called `KafkaDataConsumer` is introduced to hide this difference
from users of the consumer, so that client code does not have to reason
about whether to stop or release. Callers simply use `val consumer =
KafkaDataConsumer.acquire` and later `consumer.release`.
    - If there is a request for a consumer that is in use, a new consumer is
generated.
    - If there is a request for a consumer from a task reattempt, any existing
cached consumer for that TopicPartition is invalidated and a new consumer is
generated. This could fix potential issues where the source of the reattempt
is a malfunctioning consumer.
    - In addition, I renamed the `CachedKafkaConsumer` class to
`KafkaDataConsumer`, because the old name is a misnomer: what the pool
returns may or may not be cached.
    
    ## How was this patch tested?
    
    A new stress test that verifies it is safe to concurrently get consumers 
for the same TopicPartition from the consumer pool.
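
    A hedged sketch of such a stress test, using a minimal self-contained
stand-in for the pool (the names here are illustrative, not the PR's actual
test code): several threads hammer the same partition, and each acquired
consumer must be held by exactly one thread at a time.

```scala
import java.util.concurrent.ConcurrentHashMap
import java.util.concurrent.atomic.{AtomicBoolean, AtomicInteger}

final class StubConsumer {
  val inUse = new AtomicBoolean(false)  // guard used by the pool
  val holders = new AtomicInteger(0)    // test-only: concurrent holder count
}

object StubPool {
  private val cached = new ConcurrentHashMap[Int, StubConsumer]()
  def acquire(partition: Int): StubConsumer = {
    val c = cached.computeIfAbsent(partition, _ => new StubConsumer)
    if (c.inUse.compareAndSet(false, true)) c
    else {
      // Cached consumer busy: return a fresh one exclusive to this caller.
      val fresh = new StubConsumer
      fresh.inUse.set(true)
      fresh
    }
  }
  def release(c: StubConsumer): Unit = c.inUse.set(false)
}

val violations = new AtomicInteger(0)
val threads = (1 to 8).map { _ =>
  new Thread(() => (1 to 2000).foreach { _ =>
    val c = StubPool.acquire(0)
    // If two threads ever hold the same consumer, the count exceeds 1.
    if (c.holders.incrementAndGet() != 1) violations.incrementAndGet()
    c.holders.decrementAndGet()
    StubPool.release(c)
  })
}
threads.foreach(_.start())
threads.foreach(_.join())
```

    After all threads finish, `violations` should be zero, demonstrating that
the pool never hands the same consumer to two threads at once.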


You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/gaborgsomogyi/spark SPARK-19185

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/spark/pull/20997.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

    This closes #20997
    
----
commit 0fe456b48d93ed24cc59446b79ccfb32694295bc
Author: Gabor Somogyi <gabor.g.somogyi@...>
Date:   2018-03-20T03:04:04Z

    [SPARK-19185] [DSTREAMS] Avoid concurrent use of cached consumers in 
CachedKafkaConsumer

----


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org
