Github user jose-torres commented on a diff in the pull request:

    https://github.com/apache/spark/pull/20253#discussion_r161347074

    --- Diff: external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaContinuousReader.scala ---
    @@ -192,11 +193,18 @@ class KafkaContinuousDataReader(
       override def next(): Boolean = {
         var r: ConsumerRecord[Array[Byte], Array[Byte]] = null
         while (r == null) {
    -      r = consumer.get(
    -        nextKafkaOffset,
    -        untilOffset = Long.MaxValue,
    -        pollTimeoutMs = Long.MaxValue,
    -        failOnDataLoss)
    +      // Our consumer.get is not interruptible, so we have to set a low poll timeout, leaving
    +      // interrupt points to end the query rather than waiting for new data that might never come.
    +      // We simply swallow any resulting timeout exceptions and retry.
    +      try {
    +        r = consumer.get(
    +          nextKafkaOffset,
    +          untilOffset = Long.MaxValue,
    +          pollTimeoutMs = 1000,
    +          failOnDataLoss = false)
    --- End diff --

    Kafka consumers don't respect thread interrupts, and our consumer wrapper only partly mitigates this by running them in an UninterruptibleThread. As it stands, even when its parent task ends, the reader will never actually close until it reads a new value or the JVM dies.
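The bounded-poll-and-retry pattern the diff introduces can be illustrated with a minimal, self-contained Scala sketch. Note this is an assumption-laden stand-in, not the actual connector code: `fetchOnce` simulates the blocking `consumer.get` call (which is not shown here), and the `closed` flag stands in for the reader's real shutdown signal. The key idea is that a short poll timeout turns an unbounded block into a loop whose condition is re-checked every second, giving the query a chance to end even if no new data ever arrives.

```scala
import java.util.concurrent.TimeoutException
import java.util.concurrent.atomic.AtomicBoolean

// Hypothetical sketch of the retry loop from the diff. All names here
// (fetchOnce, closed, pollUntilAvailable) are illustrative stand-ins,
// not the Kafka connector's real API.
object BoundedPollSketch {
  // Stand-in for the reader's shutdown signal (e.g. the parent task ending).
  private val closed = new AtomicBoolean(false)

  // Simulated fetch: throws TimeoutException until data "arrives" on the
  // third attempt, mimicking consumer.get with a short pollTimeoutMs.
  private var attempts = 0
  private def fetchOnce(pollTimeoutMs: Long): String = {
    attempts += 1
    if (attempts < 3) throw new TimeoutException("no data within poll timeout")
    "record"
  }

  // The pattern from the diff: use a low poll timeout, swallow the resulting
  // timeout exceptions, and retry. Because the loop condition is re-evaluated
  // on every iteration, a shutdown request is noticed within one timeout
  // interval instead of blocking forever on a poll that may never return.
  def pollUntilAvailable(): Option[String] = {
    var r: Option[String] = None
    while (r.isEmpty && !closed.get()) {
      try {
        r = Some(fetchOnce(pollTimeoutMs = 1000))
      } catch {
        case _: TimeoutException => // swallow and retry; loop check is the exit point
      }
    }
    r
  }

  def main(args: Array[String]): Unit = {
    assert(pollUntilAvailable().contains("record"))
    println("ok")
  }
}
```

With `pollTimeoutMs = Long.MaxValue` (the pre-diff behavior), the simulated `fetchOnce` would never return when data is absent, and setting `closed` would have no effect until the JVM died, which is exactly the hang the review comment describes.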