Github user jose-torres commented on a diff in the pull request:
https://github.com/apache/spark/pull/20253#discussion_r161347074
--- Diff: external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaContinuousReader.scala ---
@@ -192,11 +193,18 @@ class KafkaContinuousDataReader(
   override def next(): Boolean = {
     var r: ConsumerRecord[Array[Byte], Array[Byte]] = null
     while (r == null) {
-      r = consumer.get(
-        nextKafkaOffset,
-        untilOffset = Long.MaxValue,
-        pollTimeoutMs = Long.MaxValue,
-        failOnDataLoss)
+      // Our consumer.get is not interruptible, so we have to set a low poll timeout, leaving
+      // interrupt points to end the query rather than waiting for new data that might never come.
+      // We simply swallow any resulting timeout exceptions and retry.
+      try {
+        r = consumer.get(
+          nextKafkaOffset,
+          untilOffset = Long.MaxValue,
+          pollTimeoutMs = 1000,
+          failOnDataLoss = false)
--- End diff ---
Kafka consumers don't respect thread interrupts, and our consumer wrapper
only partly mitigates this by running them in an UninterruptibleThread. As it
stands, even after its parent task ends, the reader will not actually close
until it either reads a new value or the JVM dies.
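
To make the pattern concrete, here is a minimal, self-contained Scala sketch
of the poll-and-retry loop described above. The `PollLoopSketch` object and
its `fetchRecord` helper are hypothetical stand-ins for `consumer.get` and
its surroundings, not the actual Spark or Kafka API:

import java.util.concurrent.TimeoutException

object PollLoopSketch {
  @volatile private var attempts = 0

  // Stand-in for consumer.get: "polls" for up to pollTimeoutMs and throws
  // TimeoutException until data finally arrives. Hypothetical helper, not
  // the real Spark consumer wrapper.
  private def fetchRecord(pollTimeoutMs: Long): String = {
    Thread.sleep(pollTimeoutMs)
    attempts += 1
    if (attempts < 3) throw new TimeoutException("no data yet") else "record"
  }

  def nextRecord(): String = {
    var r: String = null
    while (r == null) {
      try {
        // A short timeout bounds each blocking call, so control returns to
        // this loop regularly instead of waiting forever for new data.
        r = fetchRecord(pollTimeoutMs = 1000)
      } catch {
        case _: TimeoutException =>
          // Swallow the timeout and retry; this is the interrupt point: a
          // pending interrupt ends the loop instead of being ignored.
          if (Thread.interrupted()) throw new InterruptedException()
      }
    }
    r
  }

  def main(args: Array[String]): Unit = println(nextRecord())
}

The key design point is that each blocking call is bounded by the short
timeout, so the loop reaches the catch block at least once a second and a
pending interrupt or stop request takes effect there rather than being
deferred indefinitely.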
---