Github user zsxwing commented on a diff in the pull request: https://github.com/apache/spark/pull/20253#discussion_r161351849 --- Diff: external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaContinuousReader.scala --- @@ -192,11 +194,30 @@ class KafkaContinuousDataReader( override def next(): Boolean = { var r: ConsumerRecord[Array[Byte], Array[Byte]] = null while (r == null) { - r = consumer.get( - nextKafkaOffset, - untilOffset = Long.MaxValue, - pollTimeoutMs = Long.MaxValue, - failOnDataLoss) + if (TaskContext.get().isInterrupted() || TaskContext.get().isCompleted()) return false + // Our consumer.get is not interruptible, so we have to set a low poll timeout, leaving + // interrupt points to end the query rather than waiting for new data that might never come. + try { + r = consumer.get( + nextKafkaOffset, + untilOffset = Long.MaxValue, + pollTimeoutMs = 1000, + failOnDataLoss) + } catch { + // We didn't read within the timeout. We're supposed to block indefinitely for new data, so + // swallow and ignore this. + case _: TimeoutException => + // This is a failOnDataLoss exception. Retry if nextKafkaOffset is within the data range, + // or if it's the endpoint of the data range (i.e. the "true" next offset). + case e: IllegalStateException => + val range = consumer.getAvailableOffsetRange() + if (e.getCause.isInstanceOf[OffsetOutOfRangeException] && --- End diff -- nit: move this condition up into a pattern guard on the case clause, i.e. `case e: IllegalStateException if e.getCause.isInstanceOf[OffsetOutOfRangeException] =>`
--- --------------------------------------------------------------------- To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org For additional commands, e-mail: reviews-help@spark.apache.org