GitHub user eatoncys opened a pull request:

    https://github.com/apache/spark/pull/19819

    [SPARK-22606][Streaming]Add threadId to the CachedKafkaConsumer key

    ## What changes were proposed in this pull request?
    If 'spark.streaming.concurrentJobs' is greater than one and 
'spark.executor.cores' is greater than one, two or more tasks in one executor 
may use the same Kafka consumer at the same time, which throws the exception 
"KafkaConsumer is not safe for multi-threaded access".
    For example, with:
    spark.streaming.concurrentJobs=2
    spark.executor.cores=2
    spark.cores.max=2
    if there is only one topic with one partition ('topic1', 0) to consume, 
two jobs will run at the same time, and both will use the same cache 
key ('groupid', 'topic1', 0) to look up the CachedKafkaConsumer in 'private 
var cache: ju.LinkedHashMap[CacheKey, CachedKafkaConsumer[_, _]]', so they 
get the same CachedKafkaConsumer.
    
    This PR adds the thread id to the CachedKafkaConsumer key so that two 
threads can never use the same consumer at the same time.
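    The keying change can be sketched as follows; this is a minimal 
illustration of the idea, not Spark's actual internals, and the class and 
method names below are hypothetical:

```scala
import java.util.{LinkedHashMap => JLinkedHashMap}

// Illustrative stand-in for Spark's CacheKey: before this PR the key was
// (groupId, topic, partition); the PR adds the calling thread's id.
case class CacheKey(groupId: String, topic: String, partition: Int, threadId: Long)

object ConsumerCache {
  // Stand-in for 'private var cache: ju.LinkedHashMap[CacheKey, CachedKafkaConsumer[_, _]]';
  // a String plays the role of the cached consumer here.
  private val cache = new JLinkedHashMap[CacheKey, String]()

  // Because the key includes Thread.currentThread().getId, two concurrently
  // running tasks in the same executor can never receive the same consumer.
  def getOrCreate(groupId: String, topic: String, partition: Int): String =
    cache.synchronized {
      val key = CacheKey(groupId, topic, partition, Thread.currentThread().getId)
      if (!cache.containsKey(key)) cache.put(key, s"consumer-for-$key")
      cache.get(key)
    }
}
```

    With the old three-part key, both concurrent jobs in the example above 
would look up ('groupid', 'topic1', 0) and share one consumer; with the 
thread id included, each task's lookup resolves to its own entry.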
    
    
    
    ## How was this patch tested?
    existing unit tests


You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/eatoncys/spark kafka

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/spark/pull/19819.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

    This closes #19819
    
----
commit aa02d8904fcbaa91df47ac224d90345bd555a372
Author: 10129659 <chen.yans...@zte.com.cn>
Date:   2017-11-25T08:15:17Z

    Add threadId to CachedKafkaConsumer key

----


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org
