[GitHub] spark pull request #22507: [SPARK-25495][SS]FetchedData.reset should reset a...
Github user asfgit closed the pull request at: https://github.com/apache/spark/pull/22507

---
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org
Github user zsxwing commented on a diff in the pull request: https://github.com/apache/spark/pull/22507#discussion_r220286600

--- Diff: external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaMicroBatchSourceSuite.scala ---
@@ -874,6 +874,57 @@ abstract class KafkaMicroBatchSourceSuiteBase extends KafkaSourceSuiteBase {
     )
   }
 }
+
+  test("SPARK-25495: FetchedData.reset should reset all fields") {
+    val topic = newTopic()
+    val topicPartition = new TopicPartition(topic, 0)
+    testUtils.createTopic(topic, partitions = 1)
+
+    val ds = spark
+      .readStream
+      .format("kafka")
+      .option("kafka.bootstrap.servers", testUtils.brokerAddress)
+      .option("kafka.metadata.max.age.ms", "1")
+      .option("kafka.isolation.level", "read_committed")
+      .option("subscribe", topic)
+      .option("startingOffsets", "earliest")
+      .load()
+      .select($"value".as[String])
+
+    testUtils.withTranscationalProducer { producer =>
+      producer.beginTransaction()
+      (0 to 3).foreach { i =>
+        producer.send(new ProducerRecord[String, String](topic, i.toString)).get()
+      }
+      producer.commitTransaction()
+    }
+    testUtils.waitUntilOffsetAppears(topicPartition, 5)
+
+    val q = ds.writeStream.foreachBatch { (ds, epochId) =>
+      if (epochId == 0) {
+        // Post more messages to Kafka so that the executors will fetch messages in the next batch
+        // and drop them. In this case, if we forget to reset `FetchedData._nextOffsetInFetchedData`
--- End diff --

Send more messages here so that the executor will prefetch 8 records even if the driver tells it to fetch just 4 records. As there is a gap caused by the commit marker, `InternalKafkaConsumer` will call `reset` to drop the prefetched data. However, before my fix, `reset` did not reset `_nextOffsetInFetchedData`.
When running the next batch, the offset we try to fetch will match `_nextOffsetInFetchedData` and hit this line: https://github.com/apache/spark/blob/master/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaDataConsumer.scala#L360 As we have already dropped the records, `fetchedData.hasNext` will be `false`. In addition, since `offset` is also less than `fetchedData.offsetAfterPoll`, we will just skip the records between `offset` and `fetchedData.offsetAfterPoll`.
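The failure mode described above can be sketched with a small standalone model. This is not Spark's actual `FetchedData` class; the field names mirror the ones discussed in this thread, but the class, `UnknownOffset` sentinel, and method names here are simplified assumptions for illustration only.

```scala
// Minimal sketch (not Spark's real code) of why a partial reset of the
// prefetch cache is dangerous: dropping the buffered records while keeping
// the offset bookkeeping leaves stale state behind.
object FetchedDataSketch {
  final val UnknownOffset = -2L

  class FetchedData(var records: Iterator[Long]) {
    // Next offset we expect to serve from the prefetched buffer.
    var nextOffsetInFetchedData: Long = UnknownOffset
    // First offset after the last poll() result.
    var offsetAfterPoll: Long = UnknownOffset

    // Buggy version: drops the records but keeps the offsets, so a later
    // request that matches the stale nextOffsetInFetchedData can silently
    // skip everything up to the stale offsetAfterPoll.
    def resetBuggy(): Unit = {
      records = Iterator.empty
    }

    // Fixed version: reset *all* fields, as SPARK-25495 requires.
    def resetFixed(): Unit = {
      records = Iterator.empty
      nextOffsetInFetchedData = UnknownOffset
      offsetAfterPoll = UnknownOffset
    }
  }

  def main(args: Array[String]): Unit = {
    val d = new FetchedData(Iterator(0L, 1L, 2L, 3L))
    d.nextOffsetInFetchedData = 4L
    d.offsetAfterPoll = 9L

    d.resetBuggy()
    // Stale state survives the buggy reset.
    assert(d.nextOffsetInFetchedData == 4L && d.offsetAfterPoll == 9L)

    d.resetFixed()
    assert(d.nextOffsetInFetchedData == UnknownOffset)
    assert(d.offsetAfterPoll == UnknownOffset)
  }
}
```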
Github user tdas commented on a diff in the pull request: https://github.com/apache/spark/pull/22507#discussion_r220061053

--- Diff: external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaMicroBatchSourceSuite.scala (same hunk as quoted above) ---
+        // Post more messages to Kafka so that the executors will fetch messages in the next batch
+        // and drop them. In this case, if we forget to reset `FetchedData._nextOffsetInFetchedData`
--- End diff --

Make it clear that you want to send more messages *before* the tasks of the current batch start reading the current batch data, so that the executors prefetch them. Also, I am not entirely sure how this causes `fetchedData.reset()`, thus creating the issue. Are you sure this fails without your fix?
Github user tdas commented on a diff in the pull request: https://github.com/apache/spark/pull/22507#discussion_r220059919

--- Diff: external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaMicroBatchSourceSuite.scala (same hunk as quoted above) ---
+        // Post more messages to Kafka so that the executors will fetch messages in the next batch
+        // and drop them. In this case, if we forget to reset `FetchedData._nextOffsetInFetchedData`
--- End diff --

What do you mean by "drop them"?
GitHub user zsxwing opened a pull request: https://github.com/apache/spark/pull/22507

[SPARK-25495][SS] FetchedData.reset should reset all fields

## What changes were proposed in this pull request?

`FetchedData.reset` should reset `_nextOffsetInFetchedData` and `_offsetAfterPoll`. Otherwise it will cause inconsistent cached data and may make the Kafka connector return wrong results.

## How was this patch tested?

The new unit test.

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/zsxwing/spark fix-kafka-reset

Alternatively you can review and apply these changes as the patch at: https://github.com/apache/spark/pull/22507.patch

To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #22507

commit 1c8ef21a0d68154d9229afa2b117d3f19688a1a6
Author: Shixiong Zhu
Date: 2018-09-20T22:49:19Z

    fix reset
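The wrong-result path discussed in the review (requested offset matches the stale `_nextOffsetInFetchedData`, the buffer is empty, and the offset is below the stale `_offsetAfterPoll`, so records are silently skipped) can be modeled with a small sketch. This is an illustrative simplification under assumed names, not the actual branch in `KafkaDataConsumer`.

```scala
// Simplified model (assumption, not Spark's real code) of the skip decision:
// when the buffer is empty and the requested offset is below offsetAfterPoll,
// the consumer treats the gap as compacted/aborted data and jumps ahead.
// With stale state left by a partial reset, that jump silently skips records.
object SkipPathSketch {
  def nextFetchedOffset(
      requested: Long,
      nextOffsetInFetchedData: Long,
      bufferHasNext: Boolean,
      offsetAfterPoll: Long): Long = {
    if (requested == nextOffsetInFetchedData && !bufferHasNext &&
        requested < offsetAfterPoll) {
      // Records between `requested` and `offsetAfterPoll` are skipped.
      offsetAfterPoll
    } else {
      // Otherwise the consumer would fetch/poll normally from `requested`.
      requested
    }
  }

  def main(args: Array[String]): Unit = {
    // Stale state from the buggy reset: next expected offset 4, poll end 9.
    assert(nextFetchedOffset(4L, 4L, bufferHasNext = false, 9L) == 9L)
    // After a full reset the offsets no longer match, so no silent skip.
    assert(nextFetchedOffset(4L, -2L, bufferHasNext = false, -2L) == 4L)
  }
}
```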