spark git commit: [SPARK-18588][TESTS] Fix flaky test: KafkaSourceStressForDontFailOnDataLossSuite

2016-12-07 Thread tdas
Repository: spark
Updated Branches:
  refs/heads/master bb94f61a7 -> edc87e189


[SPARK-18588][TESTS] Fix flaky test: KafkaSourceStressForDontFailOnDataLossSuite

## What changes were proposed in this pull request?

Fixed the following failures:

```
org.scalatest.exceptions.TestFailedDueToTimeoutException: The code passed to 
eventually never returned normally. Attempted 3745 times over 
1.79085165 minutes. Last failure message: assertion failed: 
failOnDataLoss-0 not deleted after timeout.
```

```
sbt.ForkMain$ForkError: org.apache.spark.sql.streaming.StreamingQueryException: Query query-66 terminated with exception: null
	at org.apache.spark.sql.execution.streaming.StreamExecution.org$apache$spark$sql$execution$streaming$StreamExecution$$runBatches(StreamExecution.scala:252)
	at org.apache.spark.sql.execution.streaming.StreamExecution$$anon$1.run(StreamExecution.scala:146)
Caused by: sbt.ForkMain$ForkError: java.lang.NullPointerException: null
	at java.util.ArrayList.addAll(ArrayList.java:577)
	at org.apache.kafka.clients.Metadata.getClusterForCurrentTopics(Metadata.java:257)
	at org.apache.kafka.clients.Metadata.update(Metadata.java:177)
	at org.apache.kafka.clients.NetworkClient$DefaultMetadataUpdater.handleResponse(NetworkClient.java:605)
	at org.apache.kafka.clients.NetworkClient$DefaultMetadataUpdater.maybeHandleCompletedReceive(NetworkClient.java:582)
	at org.apache.kafka.clients.NetworkClient.handleCompletedReceives(NetworkClient.java:450)
	at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:269)
	at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.clientPoll(ConsumerNetworkClient.java:360)
	at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:224)
	at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:192)
	at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.awaitPendingRequests(ConsumerNetworkClient.java:260)
	at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.ensureActiveGroup(AbstractCoordinator.java:222)
	at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.ensurePartitionAssignment(ConsumerCoordinator.java:366)
	at org.apache.kafka.clients.consumer.KafkaConsumer.pollOnce(KafkaConsumer.java:978)
	at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:938)
	at ...
```
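
The diff further down threads `untilOffset` and `failOnDataLoss` through `fetchData` and, on `OffsetOutOfRangeException`, retries with a fresh consumer rather than failing. A minimal self-contained sketch of that retry pattern follows; the identifiers `UNKNOWN_OFFSET`, `fetchData`, and `get` come from the diff, but the helper bodies and the offset-skipping logic here are illustrative assumptions, not the actual Spark implementation.

```scala
// Hedged sketch of the retry-with-fresh-consumer loop in CachedKafkaConsumer.
// All bodies below are stand-ins: the real code recreates the Kafka consumer
// and consults the broker for the earliest still-available offset.
object FetchRetrySketch {
  val UNKNOWN_OFFSET = -2L

  final class OffsetOutOfRangeException(msg: String) extends RuntimeException(msg)

  // Stand-in for the patched fetchData(offset, untilOffset, pollTimeoutMs,
  // failOnDataLoss): throws when the requested offset was aged out of the log.
  private def fetchData(offset: Long, untilOffset: Long, earliest: Long): Long =
    if (offset < earliest) throw new OffsetOutOfRangeException(s"$offset < $earliest")
    else offset

  // Retry loop: on an out-of-range offset, skip forward to the earliest offset
  // still available, or give up (UNKNOWN_OFFSET) when nothing in range remains.
  def get(offset: Long, untilOffset: Long, earliest: Long): Long = {
    var toFetchOffset = offset
    while (toFetchOffset != UNKNOWN_OFFSET) {
      try {
        return fetchData(toFetchOffset, untilOffset, earliest)
      } catch {
        case _: OffsetOutOfRangeException =>
          // In the real code: reset the cached consumer, then recompute.
          toFetchOffset = if (earliest < untilOffset) earliest else UNKNOWN_OFFSET
      }
    }
    UNKNOWN_OFFSET
  }
}
```

The key design point is that the loop never retries the same offset with the same consumer: each failure either advances the fetch position or terminates the loop, so the stress test cannot spin forever on a deleted topic.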

## How was this patch tested?

Tested in #16048 by running the suite many times.

Author: Shixiong Zhu 

Closes #16109 from zsxwing/fix-kafka-flaky-test.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/edc87e18
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/edc87e18
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/edc87e18

Branch: refs/heads/master
Commit: edc87e18922b98be47c298cdc3daa2b049a737e9
Parents: bb94f61
Author: Shixiong Zhu 
Authored: Wed Dec 7 13:47:44 2016 -0800
Committer: Tathagata Das 
Committed: Wed Dec 7 13:47:44 2016 -0800

--
 .../sql/kafka010/CachedKafkaConsumer.scala  | 39 --
 .../apache/spark/sql/kafka010/KafkaSource.scala |  2 +-
 .../spark/sql/kafka010/KafkaSourceSuite.scala   | 11 ++-
 .../spark/sql/kafka010/KafkaTestUtils.scala | 75 +---
 .../spark/sql/test/SharedSQLContext.scala   |  8 ++-
 5 files changed, 96 insertions(+), 39 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/edc87e18/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/CachedKafkaConsumer.scala
--
diff --git a/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/CachedKafkaConsumer.scala b/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/CachedKafkaConsumer.scala
index 3f438e9..3f396a7 100644
--- a/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/CachedKafkaConsumer.scala
+++ b/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/CachedKafkaConsumer.scala
@@ -86,7 +86,7 @@ private[kafka010] case class CachedKafkaConsumer private(
     var toFetchOffset = offset
     while (toFetchOffset != UNKNOWN_OFFSET) {
       try {
-        return fetchData(toFetchOffset, pollTimeoutMs)
+        return fetchData(toFetchOffset, untilOffset, pollTimeoutMs, failOnDataLoss)
       } catch {
         case e: OffsetOutOfRangeException =>
           // When there is some error thrown, it's better to use a new consumer to drop all cached
@@ -159,14 +159,18 @@ private[kafka010] case class CachedKafkaConsumer private(
   }
 
   /**
-   * Get the record at `offset`.
+   * Get the record for the
spark git commit: [SPARK-18588][TESTS] Fix flaky test: KafkaSourceStressForDontFailOnDataLossSuite

2016-12-07 Thread tdas
Repository: spark
Updated Branches:
  refs/heads/branch-2.1 76e1f1651 -> e9b3afac9


[SPARK-18588][TESTS] Fix flaky test: KafkaSourceStressForDontFailOnDataLossSuite

## What changes were proposed in this pull request?

Fixed the following failures:

```
org.scalatest.exceptions.TestFailedDueToTimeoutException: The code passed to 
eventually never returned normally. Attempted 3745 times over 
1.79085165 minutes. Last failure message: assertion failed: 
failOnDataLoss-0 not deleted after timeout.
```

```
sbt.ForkMain$ForkError: org.apache.spark.sql.streaming.StreamingQueryException: Query query-66 terminated with exception: null
	at org.apache.spark.sql.execution.streaming.StreamExecution.org$apache$spark$sql$execution$streaming$StreamExecution$$runBatches(StreamExecution.scala:252)
	at org.apache.spark.sql.execution.streaming.StreamExecution$$anon$1.run(StreamExecution.scala:146)
Caused by: sbt.ForkMain$ForkError: java.lang.NullPointerException: null
	at java.util.ArrayList.addAll(ArrayList.java:577)
	at org.apache.kafka.clients.Metadata.getClusterForCurrentTopics(Metadata.java:257)
	at org.apache.kafka.clients.Metadata.update(Metadata.java:177)
	at org.apache.kafka.clients.NetworkClient$DefaultMetadataUpdater.handleResponse(NetworkClient.java:605)
	at org.apache.kafka.clients.NetworkClient$DefaultMetadataUpdater.maybeHandleCompletedReceive(NetworkClient.java:582)
	at org.apache.kafka.clients.NetworkClient.handleCompletedReceives(NetworkClient.java:450)
	at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:269)
	at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.clientPoll(ConsumerNetworkClient.java:360)
	at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:224)
	at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:192)
	at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.awaitPendingRequests(ConsumerNetworkClient.java:260)
	at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.ensureActiveGroup(AbstractCoordinator.java:222)
	at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.ensurePartitionAssignment(ConsumerCoordinator.java:366)
	at org.apache.kafka.clients.consumer.KafkaConsumer.pollOnce(KafkaConsumer.java:978)
	at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:938)
	at ...
```

## How was this patch tested?

Tested in #16048 by running the suite many times.

Author: Shixiong Zhu 

Closes #16109 from zsxwing/fix-kafka-flaky-test.

(cherry picked from commit edc87e18922b98be47c298cdc3daa2b049a737e9)
Signed-off-by: Tathagata Das 


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/e9b3afac
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/e9b3afac
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/e9b3afac

Branch: refs/heads/branch-2.1
Commit: e9b3afac9ce5ea4bffb8201a58856598c521a3a9
Parents: 76e1f16
Author: Shixiong Zhu 
Authored: Wed Dec 7 13:47:44 2016 -0800
Committer: Tathagata Das 
Committed: Wed Dec 7 13:47:54 2016 -0800

--
 .../sql/kafka010/CachedKafkaConsumer.scala  | 39 --
 .../apache/spark/sql/kafka010/KafkaSource.scala |  2 +-
 .../spark/sql/kafka010/KafkaSourceSuite.scala   | 11 ++-
 .../spark/sql/kafka010/KafkaTestUtils.scala | 75 +---
 .../spark/sql/test/SharedSQLContext.scala   |  8 ++-
 5 files changed, 96 insertions(+), 39 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/e9b3afac/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/CachedKafkaConsumer.scala
--
diff --git a/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/CachedKafkaConsumer.scala b/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/CachedKafkaConsumer.scala
index 3f438e9..3f396a7 100644
--- a/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/CachedKafkaConsumer.scala
+++ b/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/CachedKafkaConsumer.scala
@@ -86,7 +86,7 @@ private[kafka010] case class CachedKafkaConsumer private(
     var toFetchOffset = offset
     while (toFetchOffset != UNKNOWN_OFFSET) {
       try {
-        return fetchData(toFetchOffset, pollTimeoutMs)
+        return fetchData(toFetchOffset, untilOffset, pollTimeoutMs, failOnDataLoss)
       } catch {
         case e: OffsetOutOfRangeException =>
           // When there is some error thrown, it's better to use a new consumer to drop all cached
@@ -159,14 +159,18 @@ private[kafka010] case class CachedKafkaConsumer private(