[jira] [Updated] (SPARK-23623) Avoid concurrent use of cached KafkaConsumer in CachedKafkaConsumer (kafka-0-10-sql)

2018-03-17 Thread Shixiong Zhu (JIRA)

 [ 
https://issues.apache.org/jira/browse/SPARK-23623?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Shixiong Zhu updated SPARK-23623:
-
Fix Version/s: 2.3.1

> Avoid concurrent use of cached KafkaConsumer in CachedKafkaConsumer 
> (kafka-0-10-sql)
> 
>
> Key: SPARK-23623
> URL: https://issues.apache.org/jira/browse/SPARK-23623
> Project: Spark
>  Issue Type: Bug
>  Components: Structured Streaming
>Affects Versions: 2.3.0
>Reporter: Tathagata Das
>Assignee: Tathagata Das
>Priority: Critical
> Fix For: 2.3.1, 2.4.0
>
>
> CachedKafkaConsumer in the project `kafka-0-10-sql` is designed to maintain a 
> pool of KafkaConsumers that can be reused. However, it was built on the 
> assumption that only one task will try to read the same Kafka 
> TopicPartition at a time. Hence, the cache is keyed by the 
> TopicPartition a consumer is supposed to read. For any case where this 
> assumption may not hold, there is a SparkPlan flag to disable use of the 
> cache. So it was up to the planner to correctly identify when it was not safe 
> to use the cache and set the flag accordingly. 
> Fundamentally, this is the wrong way to approach the problem. It is HARD for 
> a high-level planner to reason about the low-level execution model, i.e. whether 
> there will be multiple tasks in the same query trying to read the same 
> partition. Case in point: 2.3.0 introduced stream-stream joins, so you can 
> build a streaming self-join query on Kafka. It is non-trivial to see how 
> this leads to two tasks reading the same partition, possibly 
> concurrently. Due to that non-triviality, it is hard to detect this in 
> the planner and set the flag to bypass the cache / consumer pool, and this can 
> inadvertently lead to {{ConcurrentModificationException}}, or worse, silent 
> reading of incorrect data.
> Here is a better way to design this. The planner shouldn't have to understand 
> these low-level optimizations. Rather, the consumer pool should be smart 
> enough to avoid concurrent use of a cached consumer. Currently, it tries to do 
> so but incorrectly (the flag {{inuse}} is not checked when returning a cached 
> consumer, see 
> [this|https://github.com/apache/spark/blob/master/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/CachedKafkaConsumer.scala#L403]).
> If there is another request for the same partition as a currently in-use 
> consumer, the pool should automatically return a fresh consumer that is 
> closed when the task is done.
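The pooling behavior proposed above can be sketched as follows. This is an illustrative Java model, not Spark's actual Scala implementation; the class and method names (ConsumerPool, PooledConsumer, acquire, release) are invented for the sketch. The key point is that acquire checks the in-use flag, and a concurrent request for the same partition gets a fresh, non-cached consumer instead of the shared one.

```java
import java.util.HashMap;
import java.util.Map;

// Illustrative model of the proposed pool (names are hypothetical,
// not Spark's API). One consumer is cached per topic-partition key;
// if the cached instance is already in use, the caller receives a
// fresh, non-cached consumer that should be closed on release.
class ConsumerPool {
    static final class PooledConsumer {
        final String topicPartition;
        final boolean cached;   // false => close on release, never pooled
        boolean inUse;

        PooledConsumer(String topicPartition, boolean cached) {
            this.topicPartition = topicPartition;
            this.cached = cached;
        }
    }

    private final Map<String, PooledConsumer> cache = new HashMap<>();

    // Hand out the cached consumer only if it is not currently in use.
    synchronized PooledConsumer acquire(String topicPartition) {
        PooledConsumer c = cache.get(topicPartition);
        if (c == null) {
            c = new PooledConsumer(topicPartition, true);
            cache.put(topicPartition, c);
        } else if (c.inUse) {
            // Concurrent request for the same partition: return a
            // fresh consumer rather than sharing the cached one.
            return new PooledConsumer(topicPartition, false);
        }
        c.inUse = true;
        return c;
    }

    // Cached consumers return to the pool; fresh ones would have their
    // underlying KafkaConsumer closed by the caller at this point.
    synchronized void release(PooledConsumer c) {
        if (c.cached) {
            c.inUse = false;
        }
    }
}
```

With this design the planner needs no flag at all: correctness no longer depends on predicting at plan time whether two tasks will read the same partition.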



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org


