GitHub user tdas opened a pull request:
https://github.com/apache/spark/pull/20848
[SPARK-23623][SS] Avoid concurrent use of cached consumers in
CachedKafkaConsumer (branch-2.3)
This is a backport of #20767 to branch-2.3.
## What changes were proposed in this pull request?
CachedKafkaConsumer in the project `kafka-0-10-sql` is designed to maintain
a pool of KafkaConsumers that can be reused. However, it was built with the
assumption that only one task will try to read the same Kafka
TopicPartition at a time. Hence, the cache was keyed by the
TopicPartition a consumer is supposed to read. For any case where this
assumption may not hold, we have a SparkPlan flag to disable the use of the
cache. So it was up to the planner to correctly identify when it was not safe
to use the cache and set the flag accordingly.
Fundamentally, this is the wrong way to approach the problem. It is HARD
for a high-level planner to reason about the low-level execution model and
decide whether multiple tasks in the same query will try to read the same
partition. Case in point: 2.3.0 introduced stream-stream joins, and you can
build a streaming self-join query on Kafka. It is non-trivial to figure
out how this leads to two tasks reading the same partition, possibly
concurrently. Because of that, it is hard for the planner to detect this
case and set the flag to avoid the cache / consumer pool. This can
inadvertently lead to a ConcurrentModificationException or, worse, silent
reading of incorrect data.
Here is a better way to design this. The planner shouldn't have to
understand these low-level optimizations. Rather, the consumer pool should be
smart enough to avoid concurrent use of a cached consumer. Currently, it tries
to do so but does it incorrectly (the flag `inuse` is not checked when
returning a cached consumer, see
[this](https://github.com/apache/spark/blob/master/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/CachedKafkaConsumer.scala#L403)).
If there is another request for the same partition as a currently in-use
consumer, the pool should automatically return a fresh consumer that is
closed when the task is done. Then the planner does not need a flag to
avoid reuse.
This PR is a step towards that goal. It does the following.
- There are effectively two kinds of consumer that may be generated:
  - Cached consumer - this should be returned to the pool at task end
  - Non-cached consumer - this should be closed at task end
- A trait called KafkaConsumer is introduced to hide this difference from
the users of the consumer, so that client code does not have to reason about
whether to close or release. It simply calls `val consumer =
KafkaConsumer.acquire` and then `consumer.release()`.
- If there is a request for a consumer that is in use, then a new consumer is
generated.
- If there is a concurrent attempt of the same task, then a new consumer is
generated, and the existing cached consumer is marked for close upon release.
- In addition, I renamed the classes because CachedKafkaConsumer is a
misnomer given that what it returns may or may not be cached.
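To make the acquire/release contract concrete, here is a minimal sketch of the idea, not the code in this PR. All names (`Partition`, `PooledConsumer`, `ConsumerPool`, `Underlying`) are hypothetical stand-ins, and the "mark for close on a concurrent task attempt" case is left out for brevity.

```scala
import scala.collection.mutable

// Hypothetical names throughout; this is NOT the actual Spark implementation.
final case class Partition(topic: String, id: Int)

// What client code sees: it never needs to know whether the consumer is cached.
sealed trait PooledConsumer {
  def partition: Partition
  def release(): Unit
}

object ConsumerPool {
  // Stand-in for a real Kafka consumer; `closed` only records the lifecycle.
  final class Underlying(val partition: Partition) {
    @volatile var closed = false
    def close(): Unit = closed = true
  }

  // Cached consumer: release() hands it back to the pool.
  private final class Cached(val underlying: Underlying) extends PooledConsumer {
    var inUse = false // guarded by ConsumerPool's lock
    def partition: Partition = underlying.partition
    def release(): Unit = ConsumerPool.synchronized { inUse = false }
  }

  // Non-cached consumer: release() closes it.
  private final class NonCached(val underlying: Underlying) extends PooledConsumer {
    def partition: Partition = underlying.partition
    def release(): Unit = underlying.close()
  }

  private val cache = mutable.Map.empty[Partition, Cached]

  def acquire(p: Partition): PooledConsumer = synchronized {
    cache.get(p) match {
      case Some(c) if !c.inUse =>
        // Cached and idle: reuse it.
        c.inUse = true
        c
      case Some(_) =>
        // Cached but busy: hand out a fresh consumer that is closed on release.
        new NonCached(new Underlying(p))
      case None =>
        // Nothing cached yet: create, cache, and mark as in use.
        val c = new Cached(new Underlying(p))
        c.inUse = true
        cache(p) = c
        c
    }
  }
}
```

With this shape, a caller writes `val consumer = ConsumerPool.acquire(p)` and later `consumer.release()`, and the pool decides whether that release means "return to the cache" or "close".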
To keep this patch safe enough to merge into branch-2.3, this PR does not
remove the planner flag that avoids reuse. That can be done later in master only.
## How was this patch tested?
A new stress test verifies that it is safe to concurrently get consumers
for the same partition from the consumer pool.
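The following is a rough sketch of what such a stress test could look like, not the test added in this PR. `FakeConsumer`, `TinyPool`, and `StressTest` are hypothetical; the fake consumer throws a ConcurrentModificationException if two threads use it at once, which is the failure the pool is meant to prevent.

```scala
import java.util.ConcurrentModificationException
import java.util.concurrent.{CountDownLatch, Executors, TimeUnit}
import java.util.concurrent.atomic.{AtomicBoolean, AtomicInteger}

// Hypothetical stand-in for a consumer: `busy` detects overlapping use.
final class FakeConsumer {
  private val busy = new AtomicBoolean(false)
  def read(): Unit = {
    if (!busy.compareAndSet(false, true))
      throw new ConcurrentModificationException("consumer used concurrently")
    try Thread.sleep(1) finally busy.set(false)
  }
}

// Hypothetical single-partition pool: one cached consumer, and a fresh
// consumer whenever the cached one is already in use.
object TinyPool {
  private val cached = new FakeConsumer
  private val inUse = new AtomicBoolean(false)
  def acquire(): FakeConsumer =
    if (inUse.compareAndSet(false, true)) cached else new FakeConsumer
  def release(c: FakeConsumer): Unit =
    if (c eq cached) inUse.set(false) // fresh consumers are simply dropped
}

object StressTest {
  // Runs `tasks` acquire/read/release cycles on `threads` threads and
  // returns the number of cycles that failed (a timeout counts as one).
  def runStress(threads: Int, tasks: Int): Int = {
    val executor = Executors.newFixedThreadPool(threads)
    val failures = new AtomicInteger(0)
    val done = new CountDownLatch(tasks)
    for (_ <- 1 to tasks) executor.execute(new Runnable {
      def run(): Unit = {
        try {
          val c = TinyPool.acquire()
          try c.read() finally TinyPool.release(c)
        } catch {
          case _: Throwable => failures.incrementAndGet()
        } finally {
          done.countDown()
        }
      }
    })
    val finished = done.await(30, TimeUnit.SECONDS)
    executor.shutdown()
    if (!finished) failures.incrementAndGet()
    failures.get()
  }
}
```

A pool that skipped the in-use check and always returned the cached consumer would be expected to produce failures under the same load.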
You can merge this pull request into a Git repository by running:
$ git pull https://github.com/tdas/spark SPARK-23623-2.3
Alternatively you can review and apply these changes as the patch at:
https://github.com/apache/spark/pull/20848.patch
To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:
This closes #20848
----
commit 9e440a4e788980e0dc475aa7966c3e56010e7cf7
Author: Tathagata Das <tathagata.das1565@...>
Date: 2018-03-16T18:11:07Z
[SPARK-23623][SS] Avoid concurrent use of cached consumers in
CachedKafkaConsumer
Author: Tathagata Das <[email protected]>
Closes #20767 from tdas/SPARK-23623.
----