GitHub user mgaido91 commented on a diff in the pull request: https://github.com/apache/spark/pull/20253#discussion_r161343982 --- Diff: external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaContinuousReader.scala --- @@ -192,11 +193,18 @@ class KafkaContinuousDataReader( override def next(): Boolean = { var r: ConsumerRecord[Array[Byte], Array[Byte]] = null while (r == null) { - r = consumer.get( - nextKafkaOffset, - untilOffset = Long.MaxValue, - pollTimeoutMs = Long.MaxValue, - failOnDataLoss) + // Our consumer.get is not interruptible, so we have to set a low poll timeout, leaving + // interrupt points to end the query rather than waiting for new data that might never come. + // We simply swallow any resulting timeout exceptions and retry. + try { + r = consumer.get( + nextKafkaOffset, + untilOffset = Long.MaxValue, + pollTimeoutMs = 1000, + failOnDataLoss = false) --- End diff -- May I kindly ask why we have to change this, i.e. why `failOnDataLoss` is now hard-coded to `false` instead of passing the existing `failOnDataLoss` value through?
--- --------------------------------------------------------------------- To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org