GitHub user jose-torres commented on a diff in the pull request:

    https://github.com/apache/spark/pull/20253#discussion_r161347074
  
    --- Diff: external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaContinuousReader.scala ---
    @@ -192,11 +193,18 @@ class KafkaContinuousDataReader(
       override def next(): Boolean = {
         var r: ConsumerRecord[Array[Byte], Array[Byte]] = null
         while (r == null) {
    -      r = consumer.get(
    -        nextKafkaOffset,
    -        untilOffset = Long.MaxValue,
    -        pollTimeoutMs = Long.MaxValue,
    -        failOnDataLoss)
     +      // Our consumer.get is not interruptible, so we have to set a low poll timeout, leaving
     +      // interrupt points to end the query rather than waiting for new data that might never come.
     +      // We simply swallow any resulting timeout exceptions and retry.
    +      try {
    +        r = consumer.get(
    +          nextKafkaOffset,
    +          untilOffset = Long.MaxValue,
    +          pollTimeoutMs = 1000,
    +          failOnDataLoss = false)
    --- End diff ---
    
    Kafka consumers don't respect thread interrupts, and our consumer wrapper only partly mitigates this by running them in an UninterruptibleThread. As it stands, even after its parent task ends, the reader will never actually close until it reads a new value or the JVM dies.
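    
    For illustration, here is a minimal sketch of the retry pattern in plain Scala. `Record`, `fetch`, and `nextRecord` are hypothetical stand-ins for `ConsumerRecord`, the consumer wrapper's `get`, and the reader's `next` loop; the assumption is that `fetch` blocks for at most the given timeout and throws `TimeoutException` when no data arrives in time:
    
        import java.util.concurrent.TimeoutException
        
        object InterruptiblePoll {
          // Hypothetical stand-in for ConsumerRecord[Array[Byte], Array[Byte]].
          final case class Record(offset: Long, value: Array[Byte])
        
          // Polls `fetch` until it yields a record. `fetch` is assumed to block
          // for at most `pollTimeoutMs` and throw TimeoutException when no data
          // arrives in time. Timeouts are swallowed and retried; a pending thread
          // interrupt is surfaced at the next timeout, which is why the per-call
          // timeout is kept short.
          def nextRecord(fetch: Long => Record, pollTimeoutMs: Long = 1000L): Record = {
            var r: Record = null
            while (r == null) {
              try {
                r = fetch(pollTimeoutMs)
              } catch {
                case _: TimeoutException =>
                  // No data yet: give a task-ending interrupt a chance to stop
                  // the loop, then poll again.
                  if (Thread.interrupted()) throw new InterruptedException
              }
            }
            r
          }
        }
    
    With a one-second timeout, a pending interrupt is observed within about a second of the task ending, instead of only when new data finally arrives.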

