[GitHub] spark pull request #22507: [SPARK-25495][SS]FetchedData.reset should reset a...

2018-09-25 Thread asfgit
Github user asfgit closed the pull request at:

https://github.com/apache/spark/pull/22507


---




[GitHub] spark pull request #22507: [SPARK-25495][SS]FetchedData.reset should reset a...

2018-09-25 Thread zsxwing
Github user zsxwing commented on a diff in the pull request:

https://github.com/apache/spark/pull/22507#discussion_r220286600
  
--- Diff: external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaMicroBatchSourceSuite.scala ---
@@ -874,6 +874,57 @@ abstract class KafkaMicroBatchSourceSuiteBase extends KafkaSourceSuiteBase {
       )
     }
   }
+
+  test("SPARK-25495: FetchedData.reset should reset all fields") {
+    val topic = newTopic()
+    val topicPartition = new TopicPartition(topic, 0)
+    testUtils.createTopic(topic, partitions = 1)
+
+    val ds = spark
+      .readStream
+      .format("kafka")
+      .option("kafka.bootstrap.servers", testUtils.brokerAddress)
+      .option("kafka.metadata.max.age.ms", "1")
+      .option("kafka.isolation.level", "read_committed")
+      .option("subscribe", topic)
+      .option("startingOffsets", "earliest")
+      .load()
+      .select($"value".as[String])
+
+    testUtils.withTranscationalProducer { producer =>
+      producer.beginTransaction()
+      (0 to 3).foreach { i =>
+        producer.send(new ProducerRecord[String, String](topic, i.toString)).get()
+      }
+      producer.commitTransaction()
+    }
+    testUtils.waitUntilOffsetAppears(topicPartition, 5)
+
+    val q = ds.writeStream.foreachBatch { (ds, epochId) =>
+      if (epochId == 0) {
+        // Post more messages to Kafka so that the executors will fetch messages in the next batch
+        // and drop them. In this case, if we forget to reset `FetchedData._nextOffsetInFetchedData`
--- End diff --

Send more messages here so that the executor will prefetch 8 records even
if the driver tells it to fetch just 4 records. As there is a gap caused by the
commit marker, `InternalKafkaConsumer` will call `reset` to drop the prefetched
data. However, before my fix, it will not reset `_nextOffsetInFetchedData`.
When running the next batch, the offset we try to fetch will match
`_nextOffsetInFetchedData` and hit this line:
https://github.com/apache/spark/blob/master/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaDataConsumer.scala#L360

As we already dropped the records, `fetchedData.hasNext` will be `false`. In
addition, since `offset` is also < `fetchedData.offsetAfterPoll`, we will just
skip the records between `offset` and `fetchedData.offsetAfterPoll`.
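
A minimal, self-contained sketch of that failure mode. This is not the real
KafkaDataConsumer/FetchedData code: the field names are simplified, `fetch` is
a stand-in for the real fetch logic, and the offsets (stopped at 4, prefetched
up to 8) are made up to mirror the scenario above.

    object FetchedDataResetSketch {

      final class FetchedData(resetAllFields: Boolean) {
        var records: Iterator[Long] = Iterator.empty // prefetched record offsets
        var nextOffsetInFetchedData: Long = -2L      // stands in for _nextOffsetInFetchedData
        var offsetAfterPoll: Long = -2L              // stands in for _offsetAfterPoll

        def reset(): Unit = {
          records = Iterator.empty                   // the prefetched data is always dropped
          if (resetAllFields) {                      // ...but before the fix the offsets were kept
            nextOffsetInFetchedData = -2L
            offsetAfterPoll = -2L
          }
        }
      }

      // Roughly what happens when the next batch asks for `offset`.
      def fetch(data: FetchedData, offset: Long): String = {
        if (offset == data.nextOffsetInFetchedData) {
          if (!data.records.hasNext && offset < data.offsetAfterPoll) {
            s"skip records in [$offset, ${data.offsetAfterPoll}) -- silent data loss"
          } else {
            s"serve offset $offset from the fetched buffer"
          }
        } else {
          s"poll Kafka again starting at offset $offset" // the correct behaviour
        }
      }

      def main(args: Array[String]): Unit = {
        for (fixed <- Seq(false, true)) {
          val data = new FetchedData(resetAllFields = fixed)
          data.records = (0L to 7L).iterator // executor prefetched 8 records
          data.nextOffsetInFetchedData = 4L  // where the consumer stopped when it hit the gap
          data.offsetAfterPoll = 8L          // the poll had actually fetched up to offset 8
          data.reset()                       // triggered by the gap at the commit marker
          println(s"resetAllFields=$fixed -> ${fetch(data, 4L)}")
        }
        // resetAllFields=false -> skip records in [4, 8) -- silent data loss
        // resetAllFields=true  -> poll Kafka again starting at offset 4
      }
    }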


---




[GitHub] spark pull request #22507: [SPARK-25495][SS]FetchedData.reset should reset a...

2018-09-24 Thread tdas
Github user tdas commented on a diff in the pull request:

https://github.com/apache/spark/pull/22507#discussion_r220061053
  
--- Diff: external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaMicroBatchSourceSuite.scala ---
@@ -874,6 +874,57 @@ abstract class KafkaMicroBatchSourceSuiteBase extends KafkaSourceSuiteBase {
       )
     }
   }
+
+  test("SPARK-25495: FetchedData.reset should reset all fields") {
+    val topic = newTopic()
+    val topicPartition = new TopicPartition(topic, 0)
+    testUtils.createTopic(topic, partitions = 1)
+
+    val ds = spark
+      .readStream
+      .format("kafka")
+      .option("kafka.bootstrap.servers", testUtils.brokerAddress)
+      .option("kafka.metadata.max.age.ms", "1")
+      .option("kafka.isolation.level", "read_committed")
+      .option("subscribe", topic)
+      .option("startingOffsets", "earliest")
+      .load()
+      .select($"value".as[String])
+
+    testUtils.withTranscationalProducer { producer =>
+      producer.beginTransaction()
+      (0 to 3).foreach { i =>
+        producer.send(new ProducerRecord[String, String](topic, i.toString)).get()
+      }
+      producer.commitTransaction()
+    }
+    testUtils.waitUntilOffsetAppears(topicPartition, 5)
+
+    val q = ds.writeStream.foreachBatch { (ds, epochId) =>
+      if (epochId == 0) {
+        // Post more messages to Kafka so that the executors will fetch messages in the next batch
+        // and drop them. In this case, if we forget to reset `FetchedData._nextOffsetInFetchedData`
--- End diff --

Make it clear that you want to send more messages *before* the tasks of
the current batch start reading the current batch data, so that the executors ...

Also, I am not entirely sure how this causes `fetchedData.reset()` and thus
creates the issue. Are you sure this fails without your fix?


---




[GitHub] spark pull request #22507: [SPARK-25495][SS]FetchedData.reset should reset a...

2018-09-24 Thread tdas
Github user tdas commented on a diff in the pull request:

https://github.com/apache/spark/pull/22507#discussion_r220059919
  
--- Diff: external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaMicroBatchSourceSuite.scala ---
@@ -874,6 +874,57 @@ abstract class KafkaMicroBatchSourceSuiteBase extends KafkaSourceSuiteBase {
       )
     }
   }
+
+  test("SPARK-25495: FetchedData.reset should reset all fields") {
+    val topic = newTopic()
+    val topicPartition = new TopicPartition(topic, 0)
+    testUtils.createTopic(topic, partitions = 1)
+
+    val ds = spark
+      .readStream
+      .format("kafka")
+      .option("kafka.bootstrap.servers", testUtils.brokerAddress)
+      .option("kafka.metadata.max.age.ms", "1")
+      .option("kafka.isolation.level", "read_committed")
+      .option("subscribe", topic)
+      .option("startingOffsets", "earliest")
+      .load()
+      .select($"value".as[String])
+
+    testUtils.withTranscationalProducer { producer =>
+      producer.beginTransaction()
+      (0 to 3).foreach { i =>
+        producer.send(new ProducerRecord[String, String](topic, i.toString)).get()
+      }
+      producer.commitTransaction()
+    }
+    testUtils.waitUntilOffsetAppears(topicPartition, 5)
+
+    val q = ds.writeStream.foreachBatch { (ds, epochId) =>
+      if (epochId == 0) {
+        // Post more messages to Kafka so that the executors will fetch messages in the next batch
+        // and drop them. In this case, if we forget to reset `FetchedData._nextOffsetInFetchedData`
--- End diff --

What do you mean by "drop them"?


---




[GitHub] spark pull request #22507: [SPARK-25495][SS]FetchedData.reset should reset a...

2018-09-20 Thread zsxwing
GitHub user zsxwing opened a pull request:

https://github.com/apache/spark/pull/22507

[SPARK-25495][SS]FetchedData.reset should reset all fields

## What changes were proposed in this pull request?

`FetchedData.reset` should reset `_nextOffsetInFetchedData` and
`_offsetAfterPoll`. Otherwise it will cause inconsistent cached data and may
make the Kafka connector return wrong results.
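
For illustration, roughly the shape of the fix as a compilable sketch rather
than the verbatim patch: `reset` clears the cached offsets in addition to the
record buffer. The wrapper object, the simplified record type (String), and the
local `UNKNOWN_OFFSET` value are assumptions made for this sketch.

    import java.{util => ju}

    object FetchedDataFixSketch {
      private val UNKNOWN_OFFSET = -2L // placeholder sentinel for "no cached offset"

      private case class FetchedData(
          private var _records: ju.ListIterator[String],
          private var _nextOffsetInFetchedData: Long,
          private var _offsetAfterPoll: Long) {

        def reset(): Unit = {
          _records = ju.Collections.emptyListIterator()
          _nextOffsetInFetchedData = UNKNOWN_OFFSET // reset added by this change
          _offsetAfterPoll = UNKNOWN_OFFSET         // reset added by this change
        }

        def hasNext: Boolean = _records.hasNext
        def offsetAfterPoll: Long = _offsetAfterPoll
      }

      def main(args: Array[String]): Unit = {
        val data = FetchedData(ju.Collections.emptyListIterator[String](), 4L, 8L)
        data.reset()
        // Both cached offsets are back to UNKNOWN_OFFSET, so the next fetch
        // cannot accidentally match stale state.
        println((data.hasNext, data.offsetAfterPoll)) // (false, -2)
      }
    }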

## How was this patch tested?

The new unit test.

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/zsxwing/spark fix-kafka-reset

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/spark/pull/22507.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #22507


commit 1c8ef21a0d68154d9229afa2b117d3f19688a1a6
Author: Shixiong Zhu 
Date:   2018-09-20T22:49:19Z

fix reset




---
