spark git commit: [SPARK-18588][TESTS] Fix flaky test: KafkaSourceStressForDontFailOnDataLossSuite
Repository: spark
Updated Branches:
  refs/heads/master bb94f61a7 -> edc87e189

[SPARK-18588][TESTS] Fix flaky test: KafkaSourceStressForDontFailOnDataLossSuite

## What changes were proposed in this pull request?

Fixed the following failures:

```
org.scalatest.exceptions.TestFailedDueToTimeoutException: The code passed to eventually never returned normally. Attempted 3745 times over 1.79085165 minutes. Last failure message: assertion failed: failOnDataLoss-0 not deleted after timeout.
```

```
sbt.ForkMain$ForkError: org.apache.spark.sql.streaming.StreamingQueryException: Query query-66 terminated with exception: null
  at org.apache.spark.sql.execution.streaming.StreamExecution.org$apache$spark$sql$execution$streaming$StreamExecution$$runBatches(StreamExecution.scala:252)
  at org.apache.spark.sql.execution.streaming.StreamExecution$$anon$1.run(StreamExecution.scala:146)
Caused by: sbt.ForkMain$ForkError: java.lang.NullPointerException: null
  at java.util.ArrayList.addAll(ArrayList.java:577)
  at org.apache.kafka.clients.Metadata.getClusterForCurrentTopics(Metadata.java:257)
  at org.apache.kafka.clients.Metadata.update(Metadata.java:177)
  at org.apache.kafka.clients.NetworkClient$DefaultMetadataUpdater.handleResponse(NetworkClient.java:605)
  at org.apache.kafka.clients.NetworkClient$DefaultMetadataUpdater.maybeHandleCompletedReceive(NetworkClient.java:582)
  at org.apache.kafka.clients.NetworkClient.handleCompletedReceives(NetworkClient.java:450)
  at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:269)
  at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.clientPoll(ConsumerNetworkClient.java:360)
  at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:224)
  at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:192)
  at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.awaitPendingRequests(ConsumerNetworkClient.java:260)
  at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.ensureActiveGroup(AbstractCoordinator.java:222)
  at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.ensurePartitionAssignment(ConsumerCoordinator.java:366)
  at org.apache.kafka.clients.consumer.KafkaConsumer.pollOnce(KafkaConsumer.java:978)
  at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:938)
  at ...
```

## How was this patch tested?

Tested in #16048 by running many times.

Author: Shixiong Zhu

Closes #16109 from zsxwing/fix-kafka-flaky-test.
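For context, the first failure comes from a ScalaTest `eventually` loop in the Kafka test utilities that waits for a deleted topic to actually disappear before the stress test proceeds. Below is a minimal sketch of that wait pattern, not the patch's actual code; the `waitUntilTopicDeleted` name and the `topicExists` callback are illustrative assumptions.

```scala
import org.scalatest.concurrent.Eventually._
import org.scalatest.time.SpanSugar._

// Hypothetical sketch of the wait-for-deletion pattern behind the first
// failure: keep re-checking until the topic is gone, surfacing the observed
// "not deleted after timeout" assertion message if it never disappears.
object TopicDeletionWait {
  def waitUntilTopicDeleted(topic: String)(topicExists: String => Boolean): Unit = {
    eventually(timeout(60.seconds)) {
      assert(!topicExists(topic), s"$topic not deleted after timeout")
    }
  }
}
```

If the broker never finishes the deletion, or the check races with topic re-creation, the `eventually` block retries until its timeout and then fails with exactly the kind of message seen above, which is why the test had to make topic deletion reliably observable.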
Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/edc87e18
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/edc87e18
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/edc87e18

Branch: refs/heads/master
Commit: edc87e18922b98be47c298cdc3daa2b049a737e9
Parents: bb94f61
Author: Shixiong Zhu
Authored: Wed Dec 7 13:47:44 2016 -0800
Committer: Tathagata Das
Committed: Wed Dec 7 13:47:44 2016 -0800

----------------------------------------------------------------------
 .../sql/kafka010/CachedKafkaConsumer.scala      | 39 --
 .../apache/spark/sql/kafka010/KafkaSource.scala |  2 +-
 .../spark/sql/kafka010/KafkaSourceSuite.scala   | 11 ++-
 .../spark/sql/kafka010/KafkaTestUtils.scala     | 75 +---
 .../spark/sql/test/SharedSQLContext.scala       |  8 ++-
 5 files changed, 96 insertions(+), 39 deletions(-)
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/spark/blob/edc87e18/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/CachedKafkaConsumer.scala
----------------------------------------------------------------------
diff --git a/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/CachedKafkaConsumer.scala b/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/CachedKafkaConsumer.scala
index 3f438e9..3f396a7 100644
--- a/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/CachedKafkaConsumer.scala
+++ b/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/CachedKafkaConsumer.scala
@@ -86,7 +86,7 @@ private[kafka010] case class CachedKafkaConsumer private(
     var toFetchOffset = offset
     while (toFetchOffset != UNKNOWN_OFFSET) {
       try {
-        return fetchData(toFetchOffset, pollTimeoutMs)
+        return fetchData(toFetchOffset, untilOffset, pollTimeoutMs, failOnDataLoss)
       } catch {
         case e: OffsetOutOfRangeException =>
           // When there is some error thrown, it's better to use a new consumer to drop all cached
@@ -159,14 +159,18 @@ private[kafka010] case class CachedKafkaConsumer private(
   }

   /**
-   * Get the record at `offset`.
+   * Get the record for the
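The hunk above shows the core change to `CachedKafkaConsumer`: `fetchData` now also receives `untilOffset` and `failOnDataLoss`, so the consumer can decide, when an offset is out of range, whether to fail or to skip ahead within the requested range. A self-contained, hypothetical rendering of that control flow is sketched below; the `getRecord` wrapper and its function parameters are illustrative stand-ins, not the patch's actual signatures.

```scala
import org.apache.kafka.clients.consumer.OffsetOutOfRangeException

object FetchLoopSketch {
  // Sentinel mirroring CachedKafkaConsumer's UNKNOWN_OFFSET (value assumed here).
  val UNKNOWN_OFFSET: Long = -2L

  // Retry fetching until a record is returned or no offset remains in range.
  // `fetchData` may throw OffsetOutOfRangeException; `earliestAvailableOffset`
  // resolves a fresh starting point (or UNKNOWN_OFFSET if the range is empty).
  def getRecord[R](
      startOffset: Long,
      fetchData: Long => R,
      earliestAvailableOffset: Long => Long): Option[R] = {
    var toFetchOffset = startOffset
    while (toFetchOffset != UNKNOWN_OFFSET) {
      try {
        return Some(fetchData(toFetchOffset))
      } catch {
        case _: OffsetOutOfRangeException =>
          // Requested data was lost (e.g. deleted by retention): resolve a new
          // offset instead of reusing stale consumer state, then try again.
          toFetchOffset = earliestAvailableOffset(toFetchOffset)
      }
    }
    None // nothing left in the requested range; caller handles the data loss
  }
}
```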
spark git commit: [SPARK-18588][TESTS] Fix flaky test: KafkaSourceStressForDontFailOnDataLossSuite
Repository: spark
Updated Branches:
  refs/heads/branch-2.1 76e1f1651 -> e9b3afac9

[SPARK-18588][TESTS] Fix flaky test: KafkaSourceStressForDontFailOnDataLossSuite
(cherry picked from commit edc87e18922b98be47c298cdc3daa2b049a737e9)
Signed-off-by: Tathagata Das

Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/e9b3afac
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/e9b3afac
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/e9b3afac

Branch: refs/heads/branch-2.1
Commit: e9b3afac9ce5ea4bffb8201a58856598c521a3a9
Parents: 76e1f16
Author: Shixiong Zhu
Authored: Wed Dec 7 13:47:44 2016 -0800
Committer: Tathagata Das
Committed: Wed Dec 7 13:47:54 2016 -0800