[GitHub] spark pull request #20253: [SPARK-22908][SS] fix continuous Kafka data reade...

2018-01-12 Thread zsxwing
Github user zsxwing commented on a diff in the pull request:

https://github.com/apache/spark/pull/20253#discussion_r161351849
  
--- Diff: external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaContinuousReader.scala ---
@@ -192,11 +194,30 @@ class KafkaContinuousDataReader(
   override def next(): Boolean = {
     var r: ConsumerRecord[Array[Byte], Array[Byte]] = null
     while (r == null) {
-      r = consumer.get(
-        nextKafkaOffset,
-        untilOffset = Long.MaxValue,
-        pollTimeoutMs = Long.MaxValue,
-        failOnDataLoss)
+      if (TaskContext.get().isInterrupted() || TaskContext.get().isCompleted()) return false
+      // Our consumer.get is not interruptible, so we have to set a low poll timeout, leaving
+      // interrupt points to end the query rather than waiting for new data that might never come.
+      try {
+        r = consumer.get(
+          nextKafkaOffset,
+          untilOffset = Long.MaxValue,
+          pollTimeoutMs = 1000,
+          failOnDataLoss)
+      } catch {
+        // We didn't read within the timeout. We're supposed to block indefinitely for new data, so
+        // swallow and ignore this.
+        case _: TimeoutException =>
+        // This is a failOnDataLoss exception. Retry if nextKafkaOffset is within the data range,
+        // or if it's the endpoint of the data range (i.e. the "true" next offset).
+        case e: IllegalStateException =>
+          val range = consumer.getAvailableOffsetRange()
+          if (e.getCause.isInstanceOf[OffsetOutOfRangeException] &&
--- End diff --

nit: move this condition up into a pattern guard, like `case e: IllegalStateException if e.getCause.isInstanceOf[OffsetOutOfRangeException] =>`
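
For reference, a fragment sketch of how the whole catch block might read with that guard (same code as the diff above, with the range-check body elided):

```scala
} catch {
  // Poll timed out: we mean to block indefinitely for new data, so just retry.
  case _: TimeoutException =>
  // failOnDataLoss error: handle the offset-out-of-range cause via a guard.
  case e: IllegalStateException
      if e.getCause.isInstanceOf[OffsetOutOfRangeException] =>
    val range = consumer.getAvailableOffsetRange()
    // ... retry if nextKafkaOffset is inside `range` or equal to its endpoint,
    // otherwise rethrow ...
}
```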


---




[GitHub] spark pull request #20253: [SPARK-22908][SS] fix continuous Kafka data reade...

2018-01-12 Thread zsxwing
Github user zsxwing commented on a diff in the pull request:

https://github.com/apache/spark/pull/20253#discussion_r161349767
  
--- Diff: external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaContinuousReader.scala ---
@@ -192,11 +194,26 @@ class KafkaContinuousDataReader(
   override def next(): Boolean = {
     var r: ConsumerRecord[Array[Byte], Array[Byte]] = null
     while (r == null) {
-      r = consumer.get(
-        nextKafkaOffset,
-        untilOffset = Long.MaxValue,
-        pollTimeoutMs = Long.MaxValue,
-        failOnDataLoss)
+      if (TaskContext.get().isInterrupted() || TaskContext.get().isCompleted()) return false
+      // Our consumer.get is not interruptible, so we have to set a low poll timeout, leaving
+      // interrupt points to end the query rather than waiting for new data that might never come.
+      try {
+        r = consumer.get(
+          nextKafkaOffset,
+          untilOffset = Long.MaxValue,
+          pollTimeoutMs = 1000,
+          failOnDataLoss)
+      } catch {
+        // We didn't read within the timeout. We're supposed to block indefinitely for new data, so
+        // swallow and ignore this.
+        case _: TimeoutException =>
+        // Data loss is reported on both sides - but we expect scenarios where the offset we're
+        // looking for isn't available yet. Don't propagate the exception unless it's because
+        // the offset we're looking for is below the available range.
+        case e: IllegalStateException
+            if e.getCause.isInstanceOf[OffsetOutOfRangeException] &&
+              !(consumer.getAvailableOffsetRange().earliest > nextKafkaOffset) =>
--- End diff --

I would change this condition to `consumer.getAvailableOffsetRange().latest == nextKafkaOffset` for safety.
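
A fragment sketch of the clause with that stricter check (same names as the diff above):

```scala
case e: IllegalStateException
    if e.getCause.isInstanceOf[OffsetOutOfRangeException] &&
      consumer.getAvailableOffsetRange().latest == nextKafkaOffset =>
  // The requested offset is exactly the endpoint of the available range, i.e.
  // the record just hasn't been produced yet: retry rather than report data loss.
```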


---




[GitHub] spark pull request #20253: [SPARK-22908][SS] fix continuous Kafka data reade...

2018-01-12 Thread jose-torres
Github user jose-torres commented on a diff in the pull request:

https://github.com/apache/spark/pull/20253#discussion_r161347769
  
--- Diff: external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaContinuousReader.scala ---
@@ -192,11 +193,18 @@ class KafkaContinuousDataReader(
   override def next(): Boolean = {
     var r: ConsumerRecord[Array[Byte], Array[Byte]] = null
     while (r == null) {
-      r = consumer.get(
-        nextKafkaOffset,
-        untilOffset = Long.MaxValue,
-        pollTimeoutMs = Long.MaxValue,
-        failOnDataLoss)
+      // Our consumer.get is not interruptible, so we have to set a low poll timeout, leaving
+      // interrupt points to end the query rather than waiting for new data that might never come.
+      // We simply swallow any resulting timeout exceptions and retry.
+      try {
+        r = consumer.get(
+          nextKafkaOffset,
+          untilOffset = Long.MaxValue,
+          pollTimeoutMs = 1000,
+          failOnDataLoss = false)
+      } catch {
+        case _: TimeoutException => ()
--- End diff --

Done.


---




[GitHub] spark pull request #20253: [SPARK-22908][SS] fix continuous Kafka data reade...

2018-01-12 Thread jose-torres
Github user jose-torres commented on a diff in the pull request:

https://github.com/apache/spark/pull/20253#discussion_r161347074
  
--- Diff: external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaContinuousReader.scala ---
@@ -192,11 +193,18 @@ class KafkaContinuousDataReader(
   override def next(): Boolean = {
     var r: ConsumerRecord[Array[Byte], Array[Byte]] = null
     while (r == null) {
-      r = consumer.get(
-        nextKafkaOffset,
-        untilOffset = Long.MaxValue,
-        pollTimeoutMs = Long.MaxValue,
-        failOnDataLoss)
+      // Our consumer.get is not interruptible, so we have to set a low poll timeout, leaving
+      // interrupt points to end the query rather than waiting for new data that might never come.
+      // We simply swallow any resulting timeout exceptions and retry.
+      try {
+        r = consumer.get(
+          nextKafkaOffset,
+          untilOffset = Long.MaxValue,
+          pollTimeoutMs = 1000,
+          failOnDataLoss = false)
--- End diff --

Kafka consumers don't respect thread interrupts, and our consumer wrapper 
only partly mitigates this by running them in an UninterruptibleThread. As it 
stands, even when its parent task ends, the reader will never actually close 
until it reads a new value or the JVM dies.
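
For context, a minimal self-contained sketch of this cooperative-shutdown pattern (the `TaskState` and `Consumer` traits are hypothetical stand-ins so the snippet compiles on its own; the real reader uses `TaskContext` and the Kafka consumer wrapper shown in the diff):

```scala
import java.util.concurrent.TimeoutException

object PollSketch {
  // Hypothetical stand-ins for the reader's collaborators.
  trait TaskState { def isInterrupted(): Boolean; def isCompleted(): Boolean }
  trait Consumer { def get(offset: Long, pollTimeoutMs: Long): String }

  def nextRecord(task: TaskState, consumer: Consumer, offset: Long): Option[String] = {
    var r: String = null
    while (r == null) {
      // Re-check task state between polls: a single poll with
      // pollTimeoutMs = Long.MaxValue would never reach this exit point
      // once new data stops arriving.
      if (task.isInterrupted() || task.isCompleted()) return None
      try {
        r = consumer.get(offset, pollTimeoutMs = 1000) // short timeout = interrupt point
      } catch {
        case _: TimeoutException => // no data yet; loop back to the state check
      }
    }
    Some(r)
  }
}
```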


---




[GitHub] spark pull request #20253: [SPARK-22908][SS] fix continuous Kafka data reade...

2018-01-12 Thread mgaido91
Github user mgaido91 commented on a diff in the pull request:

https://github.com/apache/spark/pull/20253#discussion_r161344562
  
--- Diff: external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaContinuousReader.scala ---
@@ -192,11 +193,18 @@ class KafkaContinuousDataReader(
   override def next(): Boolean = {
     var r: ConsumerRecord[Array[Byte], Array[Byte]] = null
     while (r == null) {
-      r = consumer.get(
-        nextKafkaOffset,
-        untilOffset = Long.MaxValue,
-        pollTimeoutMs = Long.MaxValue,
-        failOnDataLoss)
+      // Our consumer.get is not interruptible, so we have to set a low poll timeout, leaving
+      // interrupt points to end the query rather than waiting for new data that might never come.
+      // We simply swallow any resulting timeout exceptions and retry.
+      try {
+        r = consumer.get(
+          nextKafkaOffset,
+          untilOffset = Long.MaxValue,
+          pollTimeoutMs = 1000,
+          failOnDataLoss = false)
+      } catch {
+        case _: TimeoutException => ()
--- End diff --

nit: I think this can be `case _: TimeoutException =>`, or better yet, add a comment, like `case _: TimeoutException => // ignore the exception and continue polling`, or whatever you think is most appropriate.


---




[GitHub] spark pull request #20253: [SPARK-22908][SS] fix continuous Kafka data reade...

2018-01-12 Thread mgaido91
Github user mgaido91 commented on a diff in the pull request:

https://github.com/apache/spark/pull/20253#discussion_r161343982
  
--- Diff: 
external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaContinuousReader.scala
 ---
@@ -192,11 +193,18 @@ class KafkaContinuousDataReader(
   override def next(): Boolean = {
 var r: ConsumerRecord[Array[Byte], Array[Byte]] = null
 while (r == null) {
-  r = consumer.get(
-nextKafkaOffset,
-untilOffset = Long.MaxValue,
-pollTimeoutMs = Long.MaxValue,
-failOnDataLoss)
+  // Our consumer.get is not interruptible, so we have to set a low 
poll timeout, leaving
+  // interrupt points to end the query rather than waiting for new 
data that might never come.
+  // We simply swallow any resulting timeout exceptions and retry.
+  try {
+r = consumer.get(
+  nextKafkaOffset,
+  untilOffset = Long.MaxValue,
+  pollTimeoutMs = 1000,
+  failOnDataLoss = false)
--- End diff --

May I kindly ask why we have to change this?


---




[GitHub] spark pull request #20253: [SPARK-22908][SS] fix continuous Kafka data reade...

2018-01-12 Thread jose-torres
GitHub user jose-torres opened a pull request:

https://github.com/apache/spark/pull/20253

[SPARK-22908][SS] fix continuous Kafka data reader

## What changes were proposed in this pull request?

The Kafka reader is now interruptible and can close itself.

Note that this means we no longer have full failOnDataLoss support. We 
should not block on that, since this is causing serious test flakiness.

## How was this patch tested?

I locally ran one of the ContinuousKafkaSourceSuite tests in a tight loop. 
Before the fix, my machine ran out of open file descriptors a few iterations 
in; now it works fine.


You can merge this pull request into a Git repository by running:

$ git pull https://github.com/jose-torres/spark fix-data-reader

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/spark/pull/20253.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #20253


commit 0cb70f66fd64fb935a54a9098f9161368843b146
Author: Jose Torres 
Date:   2018-01-12T22:40:45Z

fix data reader




---
