[GitHub] spark pull request #20253: [SPARK-22908][SS] fix continuous Kafka data reader

Github user zsxwing commented on a diff in the pull request:

https://github.com/apache/spark/pull/20253#discussion_r161351849

--- Diff: external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaContinuousReader.scala ---
@@ -192,11 +194,30 @@ class KafkaContinuousDataReader(
   override def next(): Boolean = {
     var r: ConsumerRecord[Array[Byte], Array[Byte]] = null
     while (r == null) {
-      r = consumer.get(
-        nextKafkaOffset,
-        untilOffset = Long.MaxValue,
-        pollTimeoutMs = Long.MaxValue,
-        failOnDataLoss)
+      if (TaskContext.get().isInterrupted() || TaskContext.get().isCompleted()) return false
+      // Our consumer.get is not interruptible, so we have to set a low poll timeout, leaving
+      // interrupt points to end the query rather than waiting for new data that might never come.
+      try {
+        r = consumer.get(
+          nextKafkaOffset,
+          untilOffset = Long.MaxValue,
+          pollTimeoutMs = 1000,
+          failOnDataLoss)
+      } catch {
+        // We didn't read within the timeout. We're supposed to block indefinitely for new data, so
+        // swallow and ignore this.
+        case _: TimeoutException =>
+        // This is a failOnDataLoss exception. Retry if nextKafkaOffset is within the data range,
+        // or if it's the endpoint of the data range (i.e. the "true" next offset).
+        case e: IllegalStateException =>
+          val range = consumer.getAvailableOffsetRange()
+          if (e.getCause.isInstanceOf[OffsetOutOfRangeException] &&
--- End diff --

nit: move this condition above like `case e: IllegalStateException if e.getCause.isInstanceOf[OffsetOutOfRangeException] =>`
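For readers following the thread, a minimal sketch of the restructuring zsxwing suggests, with the cause check moved into a pattern guard. The wrapper and its parameters (`fetchRecord`, `poll`, `onDataLoss`) are illustrative placeholders, not the reader's real code, and the `OffsetOutOfRangeException` import is assumed to be the consumer-side class:

```scala
import java.util.concurrent.TimeoutException

import org.apache.kafka.clients.consumer.OffsetOutOfRangeException

object GuardClauseSketch {
  // Placeholder for the reader's fetch logic; only the catch structure matters here.
  def fetchRecord(poll: () => Unit, onDataLoss: IllegalStateException => Unit): Unit = {
    try {
      poll()
    } catch {
      // Timed out waiting for data: fall through and let the caller retry.
      case _: TimeoutException =>
      // The suggested form: filter on the cause in a pattern guard instead of
      // re-checking it inside the case body.
      case e: IllegalStateException if e.getCause.isInstanceOf[OffsetOutOfRangeException] =>
        onDataLoss(e)
    }
  }
}
```

One side effect of the guard: an `IllegalStateException` with any other cause is no longer caught and simply propagates, rather than being handled in the data-loss branch.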
[GitHub] spark pull request #20253: [SPARK-22908][SS] fix continuous Kafka data reader

Github user zsxwing commented on a diff in the pull request:

https://github.com/apache/spark/pull/20253#discussion_r161349767

--- Diff: external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaContinuousReader.scala ---
@@ -192,11 +194,26 @@ class KafkaContinuousDataReader(
   override def next(): Boolean = {
     var r: ConsumerRecord[Array[Byte], Array[Byte]] = null
     while (r == null) {
-      r = consumer.get(
-        nextKafkaOffset,
-        untilOffset = Long.MaxValue,
-        pollTimeoutMs = Long.MaxValue,
-        failOnDataLoss)
+      if (TaskContext.get().isInterrupted() || TaskContext.get().isCompleted()) return false
+      // Our consumer.get is not interruptible, so we have to set a low poll timeout, leaving
+      // interrupt points to end the query rather than waiting for new data that might never come.
+      try {
+        r = consumer.get(
+          nextKafkaOffset,
+          untilOffset = Long.MaxValue,
+          pollTimeoutMs = 1000,
+          failOnDataLoss)
+      } catch {
+        // We didn't read within the timeout. We're supposed to block indefinitely for new data, so
+        // swallow and ignore this.
+        case _: TimeoutException =>
+        // Data loss is reported on both sides - but we expect scenarios where the offset we're
+        // looking for isn't available yet. Don't propagate the exception unless it's because
+        // the offset we're looking for is below the available range.
+        case e: IllegalStateException
+          if e.getCause.isInstanceOf[OffsetOutOfRangeException] &&
+            !(consumer.getAvailableOffsetRange().earliest > nextKafkaOffset) =>
--- End diff --

I would change this condition to `this.getAvailableOffsetRange().latest == nextKafkaOffset` for safety.
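To make the difference concrete, a small side-by-side sketch of the two retry conditions. The `AvailableOffsetRange` case class and the function names here are illustrative stand-ins for what `getAvailableOffsetRange()` returns, not the module's actual types:

```scala
object RetryConditionSketch {
  // Illustrative stand-in for the (earliest, latest) pair returned by getAvailableOffsetRange().
  case class AvailableOffsetRange(earliest: Long, latest: Long)

  // The condition as written in the PR: keep retrying as long as the wanted offset has not
  // fallen below the earliest available offset.
  def retryAsWritten(range: AvailableOffsetRange, nextKafkaOffset: Long): Boolean =
    !(range.earliest > nextKafkaOffset)

  // The reviewer's stricter condition: only retry when the wanted offset is exactly the end of
  // the available range, i.e. we are simply waiting for the next record to be produced.
  def retryAsSuggested(range: AvailableOffsetRange, nextKafkaOffset: Long): Boolean =
    range.latest == nextKafkaOffset
}
```

Under the stricter check, any offset that is not exactly at the end of the range surfaces the data-loss error instead of being silently retried.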
[GitHub] spark pull request #20253: [SPARK-22908][SS] fix continuous Kafka data reader

Github user jose-torres commented on a diff in the pull request:

https://github.com/apache/spark/pull/20253#discussion_r161347769

--- Diff: external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaContinuousReader.scala ---
@@ -192,11 +193,18 @@ class KafkaContinuousDataReader(
   override def next(): Boolean = {
     var r: ConsumerRecord[Array[Byte], Array[Byte]] = null
     while (r == null) {
-      r = consumer.get(
-        nextKafkaOffset,
-        untilOffset = Long.MaxValue,
-        pollTimeoutMs = Long.MaxValue,
-        failOnDataLoss)
+      // Our consumer.get is not interruptible, so we have to set a low poll timeout, leaving
+      // interrupt points to end the query rather than waiting for new data that might never come.
+      // We simply swallow any resulting timeout exceptions and retry.
+      try {
+        r = consumer.get(
+          nextKafkaOffset,
+          untilOffset = Long.MaxValue,
+          pollTimeoutMs = 1000,
+          failOnDataLoss = false)
+      } catch {
+        case _: TimeoutException => ()
--- End diff --

Done.
[GitHub] spark pull request #20253: [SPARK-22908][SS] fix continuous Kafka data reader

Github user jose-torres commented on a diff in the pull request:

https://github.com/apache/spark/pull/20253#discussion_r161347074

--- Diff: external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaContinuousReader.scala ---
@@ -192,11 +193,18 @@ class KafkaContinuousDataReader(
   override def next(): Boolean = {
     var r: ConsumerRecord[Array[Byte], Array[Byte]] = null
     while (r == null) {
-      r = consumer.get(
-        nextKafkaOffset,
-        untilOffset = Long.MaxValue,
-        pollTimeoutMs = Long.MaxValue,
-        failOnDataLoss)
+      // Our consumer.get is not interruptible, so we have to set a low poll timeout, leaving
+      // interrupt points to end the query rather than waiting for new data that might never come.
+      // We simply swallow any resulting timeout exceptions and retry.
+      try {
+        r = consumer.get(
+          nextKafkaOffset,
+          untilOffset = Long.MaxValue,
+          pollTimeoutMs = 1000,
+          failOnDataLoss = false)
--- End diff --

Kafka consumers don't respect thread interrupts, and our consumer wrapper only partly mitigates this by running them in an UninterruptibleThread. As it stands, even when its parent task ends, the reader will never actually close until it reads a new value or the JVM dies.
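To illustrate the shape of the fix being discussed (a self-contained sketch, not the actual Spark code), here is a poll loop that re-checks the task's state between short polls so the reader can exit even if no new data ever arrives. `WrappedConsumer` and `nextRecord` are hypothetical stand-ins for the module's consumer wrapper and reader logic:

```scala
import java.util.concurrent.TimeoutException

import org.apache.kafka.clients.consumer.ConsumerRecord
import org.apache.spark.TaskContext

object InterruptiblePollSketch {
  // Hypothetical stand-in for the consumer wrapper used by the reader.
  trait WrappedConsumer {
    def get(
        offset: Long,
        untilOffset: Long,
        pollTimeoutMs: Long,
        failOnDataLoss: Boolean): ConsumerRecord[Array[Byte], Array[Byte]]
  }

  // Poll with a short timeout and re-check the task between polls, so the reader can
  // return (and then be closed) even when the topic never produces another record.
  def nextRecord(
      consumer: WrappedConsumer,
      nextKafkaOffset: Long): Option[ConsumerRecord[Array[Byte], Array[Byte]]] = {
    var r: ConsumerRecord[Array[Byte], Array[Byte]] = null
    while (r == null) {
      // Exit once the enclosing task is interrupted or completed, instead of blocking
      // forever inside consumer.get.
      if (TaskContext.get().isInterrupted() || TaskContext.get().isCompleted()) return None
      try {
        r = consumer.get(nextKafkaOffset, untilOffset = Long.MaxValue,
          pollTimeoutMs = 1000, failOnDataLoss = false)
      } catch {
        case _: TimeoutException => // no data within this poll; loop and check the task again
      }
    }
    Some(r)
  }
}
```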
[GitHub] spark pull request #20253: [SPARK-22908][SS] fix continuous Kafka data reader

Github user mgaido91 commented on a diff in the pull request:

https://github.com/apache/spark/pull/20253#discussion_r161344562

--- Diff: external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaContinuousReader.scala ---
@@ -192,11 +193,18 @@ class KafkaContinuousDataReader(
   override def next(): Boolean = {
     var r: ConsumerRecord[Array[Byte], Array[Byte]] = null
     while (r == null) {
-      r = consumer.get(
-        nextKafkaOffset,
-        untilOffset = Long.MaxValue,
-        pollTimeoutMs = Long.MaxValue,
-        failOnDataLoss)
+      // Our consumer.get is not interruptible, so we have to set a low poll timeout, leaving
+      // interrupt points to end the query rather than waiting for new data that might never come.
+      // We simply swallow any resulting timeout exceptions and retry.
+      try {
+        r = consumer.get(
+          nextKafkaOffset,
+          untilOffset = Long.MaxValue,
+          pollTimeoutMs = 1000,
+          failOnDataLoss = false)
+      } catch {
+        case _: TimeoutException => ()
--- End diff --

nit: I think this can be `case _: TimeoutException =>`, or better still with a comment like `case _: TimeoutException => // ignore the exception and continue polling`, or whatever you think is most appropriate.
[GitHub] spark pull request #20253: [SPARK-22908][SS] fix continuous Kafka data reader

Github user mgaido91 commented on a diff in the pull request:

https://github.com/apache/spark/pull/20253#discussion_r161343982

--- Diff: external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaContinuousReader.scala ---
@@ -192,11 +193,18 @@ class KafkaContinuousDataReader(
   override def next(): Boolean = {
     var r: ConsumerRecord[Array[Byte], Array[Byte]] = null
     while (r == null) {
-      r = consumer.get(
-        nextKafkaOffset,
-        untilOffset = Long.MaxValue,
-        pollTimeoutMs = Long.MaxValue,
-        failOnDataLoss)
+      // Our consumer.get is not interruptible, so we have to set a low poll timeout, leaving
+      // interrupt points to end the query rather than waiting for new data that might never come.
+      // We simply swallow any resulting timeout exceptions and retry.
+      try {
+        r = consumer.get(
+          nextKafkaOffset,
+          untilOffset = Long.MaxValue,
+          pollTimeoutMs = 1000,
+          failOnDataLoss = false)
--- End diff --

may I kindly ask you why we have to change this?
[GitHub] spark pull request #20253: [SPARK-22908][SS] fix continuous Kafka data reader

GitHub user jose-torres opened a pull request:

https://github.com/apache/spark/pull/20253

[SPARK-22908][SS] fix continuous Kafka data reader

## What changes were proposed in this pull request?

The Kafka reader is now interruptible and can close itself. Note that this means we no longer have full failOnDataLoss support. We should not block on that, since this is causing serious test flakiness.

## How was this patch tested?

I locally ran one of the ContinuousKafkaSourceSuite tests in a tight loop. Before the fix, my machine ran out of open file descriptors a few iterations in; now it works fine.

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/jose-torres/spark fix-data-reader

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/spark/pull/20253.patch

To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message:

    This closes #20253

commit 0cb70f66fd64fb935a54a9098f9161368843b146
Author: Jose Torres
Date: 2018-01-12T22:40:45Z

    fix data reader
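For anyone reproducing the leak locally, one rough way to watch descriptor usage while looping a test is the JVM's own counter. This is a sketch under the assumption of a UNIX-like OS and a HotSpot-style JVM exposing `UnixOperatingSystemMXBean`; it is not part of the PR or of Spark's test utilities:

```scala
import java.lang.management.ManagementFactory

import com.sun.management.UnixOperatingSystemMXBean

object OpenFdCheck {
  // Returns the current process's open file descriptor count, or -1 if the bean is unavailable.
  def openFileDescriptors(): Long =
    ManagementFactory.getOperatingSystemMXBean match {
      case os: UnixOperatingSystemMXBean => os.getOpenFileDescriptorCount
      case _ => -1L
    }
}
```

Logging this value before and after each iteration of the suite makes a leak like the one described above visible after only a few runs.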