[GitHub] [spark] HeartSaVioR commented on a diff in pull request #38911: [SPARK-41387][SS] Assert current end offset from Kafka data source for Trigger.AvailableNow
HeartSaVioR commented on code in PR #38911:
URL: https://github.com/apache/spark/pull/38911#discussion_r1047895867


##########
connector/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaMicroBatchStream.scala:
##########
@@ -316,6 +320,50 @@ private[kafka010] class KafkaMicroBatchStream(
     }
   }
 
+  private def assertEndOffsetForTriggerAvailableNow(

Review Comment:
   Yeah, my initial intention was to guard against code bugs, but now it covers more than that intention. Good point.
[GitHub] [spark] HeartSaVioR commented on a diff in pull request #38911: [SPARK-41387][SS] Assert current end offset from Kafka data source for Trigger.AvailableNow
HeartSaVioR commented on code in PR #38911:
URL: https://github.com/apache/spark/pull/38911#discussion_r1047895383


##########
connector/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaMicroBatchStream.scala:
##########
@@ -316,6 +320,50 @@ private[kafka010] class KafkaMicroBatchStream(
     }
   }
 
+  private def assertEndOffsetForTriggerAvailableNow(
+      endPartitionOffsets: Map[TopicPartition, Long]): Unit = {
+    val tpsForPrefetched = allDataForTriggerAvailableNow.keySet
+    val tpsForEndOffset = endPartitionOffsets.keySet
+
+    if (tpsForPrefetched != tpsForEndOffset) {
+      throw KafkaExceptions.topicPartitionsInEndOffsetAreNotSameWithPrefetched(
+        tpsForPrefetched, tpsForEndOffset)
+    }
+
+    val endOffsetHasGreaterThanPrefetched = {
+      allDataForTriggerAvailableNow.keySet.exists { tp =>
+        val offsetFromPrefetched = allDataForTriggerAvailableNow(tp)
+        val offsetFromEndOffset = endPartitionOffsets(tp)
+        offsetFromEndOffset > offsetFromPrefetched
+      }
+    }
+    if (endOffsetHasGreaterThanPrefetched) {
+      throw KafkaExceptions.endOffsetHasGreaterOffsetForTopicPartitionThanPrefetched(
+        allDataForTriggerAvailableNow, endPartitionOffsets)
+    }
+
+    val latestOffsets = kafkaOffsetReader.fetchLatestOffsets(Some(endPartitionOffsets))

Review Comment:
   Yeah, sorry, I meant if it's not turned on (turned off).

   > If we tolerate with 'processing time' trigger, any reason why we shouldn't tolerate here (and avoid waiting forever).
   > E.g. we could reset the offsets.

   The source may be able to do something smart, e.g. the end offset built during preparation could change based on changes to the latest offset, but that would also be tricky. If someone has a bright idea for rebuilding the end offset with failOnDataLoss taken into account, that would be better.
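To make the "reset/rebuild the offsets" idea above concrete, here is a minimal sketch of what such a rebuild could look like. This is not the PR's implementation; `rebuildEndOffsets` and `fetchLatest` are hypothetical names, and the tolerant branch is just one possible policy when failOnDataLoss is disabled.

    import org.apache.kafka.common.TopicPartition

    def rebuildEndOffsets(
        prepared: Map[TopicPartition, Long],
        fetchLatest: () => Map[TopicPartition, Long],
        failOnDataLoss: Boolean): Map[TopicPartition, Long] = {
      val latest = fetchLatest()
      if (failOnDataLoss) {
        // Strict mode: any partition lost since preparation is an error.
        require(prepared.keySet.subsetOf(latest.keySet),
          s"Topic partitions lost: ${prepared.keySet.diff(latest.keySet)}")
        prepared
      } else {
        // Tolerant mode: drop lost partitions and clamp offsets that went backward.
        prepared.collect {
          case (tp, offset) if latest.contains(tp) => tp -> math.min(offset, latest(tp))
        }
      }
    }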
[GitHub] [spark] HeartSaVioR commented on a diff in pull request #38911: [SPARK-41387][SS] Assert current end offset from Kafka data source for Trigger.AvailableNow
HeartSaVioR commented on code in PR #38911:
URL: https://github.com/apache/spark/pull/38911#discussion_r1042916237


##########
connector/kafka-0-10-sql/src/test/resources/error/kafka-error-classes.json:
##########
@@ -0,0 +1,26 @@
+{
+  "TOPIC_PARTITIONS_IN_END_OFFSET_ARE_NOT_SAME_WITH_PREFETCHED" : {

Review Comment:
   I just went with MISMATCHED_TOPIC_PARTITIONS_BETWEEN_END_OFFSET_AND_PREFETCHED, although it is still lengthy. I probably can't change this until I understand the purpose and the preference, via answers to the two questions above.
[GitHub] [spark] HeartSaVioR commented on a diff in pull request #38911: [SPARK-41387][SS] Assert current end offset from Kafka data source for Trigger.AvailableNow
HeartSaVioR commented on code in PR #38911:
URL: https://github.com/apache/spark/pull/38911#discussion_r1042892403


##########
connector/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaMicroBatchSourceSuite.scala:
##########
@@ -234,6 +235,108 @@ abstract class KafkaMicroBatchSourceSuiteBase extends KafkaSourceSuiteBase {
     assert(index == 3)
   }
 
+  test("Query with Trigger.AvailableNow should throw error when topic partitions got unavailable " +
+    "during subsequent batches") {
+    val topic = newTopic()
+    testUtils.createTopic(topic, partitions = 5)
+
+    testUtils.sendMessages(topic, (0 until 15).map { case x =>
+      s"foo-$x"
+    }.toArray, Some(0))
+
+    val reader = spark
+      .readStream
+      .format("kafka")
+      .option("kafka.bootstrap.servers", testUtils.brokerAddress)
+      .option("kafka.metadata.max.age.ms", "1")
+      .option("maxOffsetsPerTrigger", 5)
+      .option("subscribe", topic)
+      .option("startingOffsets", "earliest")
+      // the query should fail regardless of this option
+      .option("failOnDataLoss", "true")
+      .load()
+
+    def startTriggerAvailableNowQuery(): StreamingQuery = {
+      reader.writeStream
+        .foreachBatch((_: Dataset[Row], batchId: Long) => {
+          testUtils.deleteTopic(topic)
+          // create partitions less than the kafka data source figured out as an end state
+          testUtils.createTopic(topic, partitions = 3)
+          // offset will keep the same
+          testUtils.sendMessages(topic, (0 until 15).map { case x =>
+            s"foo-$x"
+          }.toArray, Some(0))
+          null.asInstanceOf[Unit]
+        })
+        .trigger(Trigger.AvailableNow)
+        .start()
+    }
+
+    val exc = intercept[Exception] {
+      val query = startTriggerAvailableNowQuery()
+      try {
+        assert(query.awaitTermination(streamingTimeout.toMillis))
+      } finally {
+        query.stop()
+      }
+    }
+    TestUtils.assertExceptionMsg(exc, "Some of partitions in Kafka topic(s) have been lost " +

Review Comment:
   Anyway, the need to improve the error class framework should not be scoped to this PR. We could leverage the Kafka data source to dogfood the framework for 3rd party data source implementations, but that needs to be a separate effort.
[GitHub] [spark] HeartSaVioR commented on a diff in pull request #38911: [SPARK-41387][SS] Assert current end offset from Kafka data source for Trigger.AvailableNow
HeartSaVioR commented on code in PR #38911:
URL: https://github.com/apache/spark/pull/38911#discussion_r1042860224


##########
connector/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaMicroBatchStream.scala:
##########
@@ -316,6 +320,50 @@ private[kafka010] class KafkaMicroBatchStream(
     }
   }
 
+  private def assertEndOffsetForTriggerAvailableNow(
+      endPartitionOffsets: Map[TopicPartition, Long]): Unit = {
+    val tpsForPrefetched = allDataForTriggerAvailableNow.keySet
+    val tpsForEndOffset = endPartitionOffsets.keySet
+
+    if (tpsForPrefetched != tpsForEndOffset) {
+      throw KafkaExceptions.topicPartitionsInEndOffsetAreNotSameWithPrefetched(
+        tpsForPrefetched, tpsForEndOffset)
+    }
+
+    val endOffsetHasGreaterThanPrefetched = {
+      allDataForTriggerAvailableNow.keySet.exists { tp =>
+        val offsetFromPrefetched = allDataForTriggerAvailableNow(tp)
+        val offsetFromEndOffset = endPartitionOffsets(tp)
+        offsetFromEndOffset > offsetFromPrefetched
+      }
+    }
+    if (endOffsetHasGreaterThanPrefetched) {
+      throw KafkaExceptions.endOffsetHasGreaterOffsetForTopicPartitionThanPrefetched(
+        allDataForTriggerAvailableNow, endPartitionOffsets)
+    }
+
+    val latestOffsets = kafkaOffsetReader.fetchLatestOffsets(Some(endPartitionOffsets))

Review Comment:
   The thing is, when the query is running with AvailableNow, there is an "end" condition, and if the offsets move away from the direction needed to reach that end condition, the query will run infinitely, which is definitely not what users desire. That's a huge difference from other triggers. Also, other triggers fetch the latest information in each batch already.
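A minimal sketch of why the prepared offsets act as an "end" condition for Trigger.AvailableNow: each microbatch advances toward the snapshot taken at query start, and the query terminates only once that snapshot is reached. The names below are illustrative, not Spark's actual driver loop.

    import org.apache.kafka.common.TopicPartition

    // True once every prepared (snapshot) offset has been consumed.
    def reachedEnd(
        current: Map[TopicPartition, Long],
        prepared: Map[TopicPartition, Long]): Boolean =
      prepared.forall { case (tp, end) => current.getOrElse(tp, 0L) >= end }

If `current` can never catch up to `prepared` (e.g. the topic was dropped and recreated with fewer partitions or smaller offsets), `reachedEnd` stays false forever and the query never terminates — exactly the failure mode the new assertions guard against.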
[GitHub] [spark] HeartSaVioR commented on a diff in pull request #38911: [SPARK-41387][SS] Assert current end offset from Kafka data source for Trigger.AvailableNow
HeartSaVioR commented on code in PR #38911:
URL: https://github.com/apache/spark/pull/38911#discussion_r1042858825


##########
connector/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaMicroBatchStream.scala:
##########
@@ -194,6 +194,10 @@ private[kafka010] class KafkaMicroBatchStream(
     val startPartitionOffsets = start.asInstanceOf[KafkaSourceOffset].partitionToOffsets
     val endPartitionOffsets = end.asInstanceOf[KafkaSourceOffset].partitionToOffsets
 
+    if (allDataForTriggerAvailableNow != null) {

Review Comment:
   This is effectively done via the two assertions (topic partitions, offsets) in assertEndOffsetForTriggerAvailableNow. Do you mean having a sanity check for the non-Trigger.AvailableNow case? KafkaOffsetReader.getOffsetRangesFromResolvedOffsets handles it, and we have to lean on that, because setting failOnDataLoss to false allows end topic offsets to be smaller than start topic offsets. (I would argue it is scary how many fault scenarios the option simply tolerates (ignores), but it's not something we can change.)
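An illustrative sketch of the tolerance described above (assumed behavior, not the actual getOffsetRangesFromResolvedOffsets code): with failOnDataLoss=false, an end offset smaller than the start offset is silently clamped to an empty range rather than rejected, which is why a generic start <= end sanity check cannot simply be added here.

    import org.apache.kafka.common.TopicPartition

    case class OffsetRange(tp: TopicPartition, fromOffset: Long, untilOffset: Long)

    def resolveRange(
        tp: TopicPartition,
        start: Long,
        end: Long,
        failOnDataLoss: Boolean): OffsetRange = {
      if (end < start) {
        if (failOnDataLoss) {
          throw new IllegalStateException(
            s"Offsets for $tp went backward: start=$start, end=$end")
        } else {
          // Tolerated: produce an empty range instead of failing the query.
          OffsetRange(tp, start, start)
        }
      } else {
        OffsetRange(tp, start, end)
      }
    }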
[GitHub] [spark] HeartSaVioR commented on a diff in pull request #38911: [SPARK-41387][SS] Assert current end offset from Kafka data source for Trigger.AvailableNow
HeartSaVioR commented on code in PR #38911:
URL: https://github.com/apache/spark/pull/38911#discussion_r1042839180


##########
connector/kafka-0-10-sql/src/test/resources/error/kafka-error-classes.json:
##########
@@ -0,0 +1,26 @@
+{
+  "TOPIC_PARTITIONS_IN_END_OFFSET_ARE_NOT_SAME_WITH_PREFETCHED" : {

Review Comment:
   I admit I just wrote a sentence with underscores. But would it work if I used an abbreviation, e.g. TPS for topic partitions, to make it shorter? Which should we prefer, shorter or clearer?

   `MISMATCHED_TOPIC_PARTITIONS_BETWEEN_END_OFFSET_AND_PREFETCHED` is still very long. We can't drop the part naming the two targets, because we have a similar error type with different targets of comparison.
[GitHub] [spark] HeartSaVioR commented on a diff in pull request #38911: [SPARK-41387][SS] Assert current end offset from Kafka data source for Trigger.AvailableNow
HeartSaVioR commented on code in PR #38911:
URL: https://github.com/apache/spark/pull/38911#discussion_r1042805980


##########
connector/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaMicroBatchSourceSuite.scala:
##########
@@ -234,6 +235,108 @@ abstract class KafkaMicroBatchSourceSuiteBase extends KafkaSourceSuiteBase {
     assert(index == 3)
   }
 
+  test("Query with Trigger.AvailableNow should throw error when topic partitions got unavailable " +
+    "during subsequent batches") {
+    val topic = newTopic()
+    testUtils.createTopic(topic, partitions = 5)
+
+    testUtils.sendMessages(topic, (0 until 15).map { case x =>
+      s"foo-$x"
+    }.toArray, Some(0))
+
+    val reader = spark
+      .readStream
+      .format("kafka")
+      .option("kafka.bootstrap.servers", testUtils.brokerAddress)
+      .option("kafka.metadata.max.age.ms", "1")
+      .option("maxOffsetsPerTrigger", 5)
+      .option("subscribe", topic)
+      .option("startingOffsets", "earliest")
+      // the query should fail regardless of this option
+      .option("failOnDataLoss", "true")
+      .load()
+
+    def startTriggerAvailableNowQuery(): StreamingQuery = {
+      reader.writeStream
+        .foreachBatch((_: Dataset[Row], batchId: Long) => {
+          testUtils.deleteTopic(topic)
+          // create partitions less than the kafka data source figured out as an end state
+          testUtils.createTopic(topic, partitions = 3)
+          // offset will keep the same
+          testUtils.sendMessages(topic, (0 until 15).map { case x =>
+            s"foo-$x"
+          }.toArray, Some(0))
+          null.asInstanceOf[Unit]
+        })
+        .trigger(Trigger.AvailableNow)
+        .start()
+    }
+
+    val exc = intercept[Exception] {
+      val query = startTriggerAvailableNowQuery()
+      try {
+        assert(query.awaitTermination(streamingTimeout.toMillis))
+      } finally {
+        query.stop()
+      }
+    }
+    TestUtils.assertExceptionMsg(exc, "Some of partitions in Kafka topic(s) have been lost " +

Review Comment:
   Put yourself in the shoes of a 3rd party developer and walk through the journey of using these classes to integrate their data source with the error class framework. I know there is a README file, but I don't think it's friendly to 3rd party developers. https://github.com/apache/spark/tree/master/core/src/main/resources/error/README.md

   1. They have to define their own error-class JSON, as Spark's file is not modifiable.
   2. We guide them to leverage SparkThrowable, but its default implementation is tied to Spark's error class JSON, so they will be surprised that it just doesn't work and that they have to override every default implementation. There is no documentation for this. (See the sketch after this list.)
   3. They may understand what an error class intends to do, but if they have to go with an uncategorized error, they have no idea how to pick an unused sequence number.
   4. I believe classifying internal vs. user-facing errors is one of the key points for the UX of the error class framework, but there is no mention of it in the README. Someone would actually have no idea how the error class framework will show the exception to end users; if they expect the same presentation, they will be surprised by how we handle internal errors separately.

   This error class framework guideline is not mentioned anywhere in the data source implementation doc (we don't have one, actually) or in code comments.
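As referenced in item 2 above, here is a rough sketch of what a 3rd party connector currently has to build by hand: its own error-class catalog (inlined as a Map for brevity instead of a JSON file), its own template formatter, and an exception type. Wiring this into SparkThrowable would additionally require overriding the default implementations that are tied to Spark's own error-classes JSON. All names here are hypothetical.

    // Hypothetical 3rd-party error catalog and formatter.
    object ConnectorErrors {
      private val templates: Map[String, String] = Map(
        "LOST_TOPIC_PARTITIONS_IN_END_OFFSET_WITH_TRIGGER_AVAILABLENOW" ->
          ("Some of partitions in Kafka topic(s) have been lost during running " +
            "query with Trigger.AvailableNow. Lost partitions: <lost>.")
      )

      // Substitute <param> placeholders in the message template.
      def format(errorClass: String, params: Map[String, String]): String =
        params.foldLeft(templates(errorClass)) { case (msg, (k, v)) =>
          msg.replace(s"<$k>", v)
        }
    }

    class ConnectorException(val errorClass: String, params: Map[String, String])
      extends Exception(ConnectorErrors.format(errorClass, params))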
[GitHub] [spark] HeartSaVioR commented on a diff in pull request #38911: [SPARK-41387][SS] Assert current end offset from Kafka data source for Trigger.AvailableNow
HeartSaVioR commented on code in PR #38911:
URL: https://github.com/apache/spark/pull/38911#discussion_r1042795449


##########
connector/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaMicroBatchStream.scala:
##########
@@ -316,6 +320,50 @@ private[kafka010] class KafkaMicroBatchStream(
     }
   }
 
+  private def assertEndOffsetForTriggerAvailableNow(
+      endPartitionOffsets: Map[TopicPartition, Long]): Unit = {
+    val tpsForPrefetched = allDataForTriggerAvailableNow.keySet
+    val tpsForEndOffset = endPartitionOffsets.keySet
+
+    if (tpsForPrefetched != tpsForEndOffset) {
+      throw KafkaExceptions.topicPartitionsInEndOffsetAreNotSameWithPrefetched(
+        tpsForPrefetched, tpsForEndOffset)
+    }
+
+    val endOffsetHasGreaterThanPrefetched = {
+      allDataForTriggerAvailableNow.keySet.exists { tp =>
+        val offsetFromPrefetched = allDataForTriggerAvailableNow(tp)
+        val offsetFromEndOffset = endPartitionOffsets(tp)
+        offsetFromEndOffset > offsetFromPrefetched
+      }
+    }
+    if (endOffsetHasGreaterThanPrefetched) {
+      throw KafkaExceptions.endOffsetHasGreaterOffsetForTopicPartitionThanPrefetched(
+        allDataForTriggerAvailableNow, endPartitionOffsets)
+    }
+
+    val latestOffsets = kafkaOffsetReader.fetchLatestOffsets(Some(endPartitionOffsets))

Review Comment:
   If failOnDataLoss is turned on, we tolerate the removal of topic partitions, and allow offsets to go "backward".
[GitHub] [spark] HeartSaVioR commented on a diff in pull request #38911: [SPARK-41387][SS] Assert current end offset from Kafka data source for Trigger.AvailableNow
HeartSaVioR commented on code in PR #38911:
URL: https://github.com/apache/spark/pull/38911#discussion_r1042172068


##########
connector/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaMicroBatchSourceSuite.scala:
##########
@@ -234,6 +235,108 @@ abstract class KafkaMicroBatchSourceSuiteBase extends KafkaSourceSuiteBase {
     assert(index == 3)
   }
 
+  test("Query with Trigger.AvailableNow should throw error when topic partitions got unavailable " +
+    "during subsequent batches") {
+    val topic = newTopic()
+    testUtils.createTopic(topic, partitions = 5)
+
+    testUtils.sendMessages(topic, (0 until 15).map { case x =>
+      s"foo-$x"
+    }.toArray, Some(0))
+
+    val reader = spark
+      .readStream
+      .format("kafka")
+      .option("kafka.bootstrap.servers", testUtils.brokerAddress)
+      .option("kafka.metadata.max.age.ms", "1")
+      .option("maxOffsetsPerTrigger", 5)
+      .option("subscribe", topic)
+      .option("startingOffsets", "earliest")
+      // the query should fail regardless of this option
+      .option("failOnDataLoss", "true")
+      .load()
+
+    def startTriggerAvailableNowQuery(): StreamingQuery = {
+      reader.writeStream
+        .foreachBatch((_: Dataset[Row], batchId: Long) => {
+          testUtils.deleteTopic(topic)
+          // create partitions less than the kafka data source figured out as an end state
+          testUtils.createTopic(topic, partitions = 3)
+          // offset will keep the same
+          testUtils.sendMessages(topic, (0 until 15).map { case x =>
+            s"foo-$x"
+          }.toArray, Some(0))
+          null.asInstanceOf[Unit]
+        })
+        .trigger(Trigger.AvailableNow)
+        .start()
+    }
+
+    val exc = intercept[Exception] {
+      val query = startTriggerAvailableNowQuery()
+      try {
+        assert(query.awaitTermination(streamingTimeout.toMillis))
+      } finally {
+        query.stop()
+      }
+    }
+    TestUtils.assertExceptionMsg(exc, "Some of partitions in Kafka topic(s) have been lost " +

Review Comment:
   E.g. the default implementation of SparkThrowable is tightly coupled with SparkThrowableHelper, whose error class reader is tied to the Spark project's global one. If a 3rd party data source developer decides to (and technically has to) go with a different error class JSON file, then the default implementation no longer works. Maybe there is room for improvement: make the utility class/object extensible.
[GitHub] [spark] HeartSaVioR commented on a diff in pull request #38911: [SPARK-41387][SS] Assert current end offset from Kafka data source for Trigger.AvailableNow
HeartSaVioR commented on code in PR #38911:
URL: https://github.com/apache/spark/pull/38911#discussion_r1042164208


##########
connector/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaMicroBatchSourceSuite.scala:
##########
@@ -234,6 +235,108 @@ abstract class KafkaMicroBatchSourceSuiteBase extends KafkaSourceSuiteBase {
     assert(index == 3)
   }
 
+  test("Query with Trigger.AvailableNow should throw error when topic partitions got unavailable " +
+    "during subsequent batches") {
+    val topic = newTopic()
+    testUtils.createTopic(topic, partitions = 5)
+
+    testUtils.sendMessages(topic, (0 until 15).map { case x =>
+      s"foo-$x"
+    }.toArray, Some(0))
+
+    val reader = spark
+      .readStream
+      .format("kafka")
+      .option("kafka.bootstrap.servers", testUtils.brokerAddress)
+      .option("kafka.metadata.max.age.ms", "1")
+      .option("maxOffsetsPerTrigger", 5)
+      .option("subscribe", topic)
+      .option("startingOffsets", "earliest")
+      // the query should fail regardless of this option
+      .option("failOnDataLoss", "true")
+      .load()
+
+    def startTriggerAvailableNowQuery(): StreamingQuery = {
+      reader.writeStream
+        .foreachBatch((_: Dataset[Row], batchId: Long) => {
+          testUtils.deleteTopic(topic)
+          // create partitions less than the kafka data source figured out as an end state
+          testUtils.createTopic(topic, partitions = 3)
+          // offset will keep the same
+          testUtils.sendMessages(topic, (0 until 15).map { case x =>
+            s"foo-$x"
+          }.toArray, Some(0))
+          null.asInstanceOf[Unit]
+        })
+        .trigger(Trigger.AvailableNow)
+        .start()
+    }
+
+    val exc = intercept[Exception] {
+      val query = startTriggerAvailableNowQuery()
+      try {
+        assert(query.awaitTermination(streamingTimeout.toMillis))
+      } finally {
+        query.stop()
+      }
+    }
+    TestUtils.assertExceptionMsg(exc, "Some of partitions in Kafka topic(s) have been lost " +

Review Comment:
   The new exceptions don't define a new error class. I don't feel like we have established a best practice for applying the error class framework to data sources, especially 3rd party ones. (Kafka is built-in, sure, but it is also a reference implementation for 3rd party developers.)
[GitHub] [spark] HeartSaVioR commented on a diff in pull request #38911: [SPARK-41387][SS] Assert current end offset from Kafka data source for Trigger.AvailableNow
HeartSaVioR commented on code in PR #38911:
URL: https://github.com/apache/spark/pull/38911#discussion_r1041879197


##########
connector/kafka-0-10-sql/src/test/resources/error/kafka-error-classes.json:
##########
@@ -0,0 +1,26 @@
+{
+  "TOPIC_PARTITIONS_IN_END_OFFSET_ARE_NOT_SAME_WITH_PREFETCHED" : {

Review Comment:
   I'm not sure about the doc, since this is not part of Spark's centralized error classes. I intended to keep this one separate for the Kafka data source. The Kafka data source is considered a reference implementation, hence I wanted to see how a 3rd party can integrate with Spark's error class framework.
[GitHub] [spark] HeartSaVioR commented on a diff in pull request #38911: [SPARK-41387][SS] Assert current end offset from Kafka data source for Trigger.AvailableNow
HeartSaVioR commented on code in PR #38911:
URL: https://github.com/apache/spark/pull/38911#discussion_r1041866576


##########
connector/kafka-0-10-sql/src/test/resources/error/kafka-error-classes.json:
##########
@@ -0,0 +1,26 @@
+{
+  "TOPIC_PARTITIONS_IN_END_OFFSET_ARE_NOT_SAME_WITH_PREFETCHED" : {
+    "message" : [
+      "Kafka data source in Trigger.AvailableNow should provide the same topic partitions in pre-fetched offset to end offset for each microbatch. ",
+      "topic-partitions for pre-fetched offset: , topic-partitions for end offset: ."
+    ]
+  },
+  "END_OFFSET_HAS_GREATER_OFFSET_FOR_TOPIC_PARTITION_THAN_PREFETCHED" : {
+    "message" : [
+      "For Kafka data source with Trigger.AvailableNow, end offset should have lower or equal offset per each topic partition than pre-fetched offset.",
+      "pre-fetched offset: , end offset: ."
+    ]
+  },
+  "LOST_TOPIC_PARTITIONS_IN_END_OFFSET_WITH_TRIGGER_AVAILABLENOW" : {
+    "message" : [
+      "Some of partitions in Kafka topic(s) have been lost during running query with Trigger.AvailableNow. Make sure topic partitions are not dropped during the query run.",

Review Comment:
   Yup, it's probably not a typical case that users drop (and recreate) a topic which is being read by a running query. Better guidance may be to just say: restart your query and it will work well.
[GitHub] [spark] HeartSaVioR commented on a diff in pull request #38911: [SPARK-41387][SS] Assert current end offset from Kafka data source for Trigger.AvailableNow
HeartSaVioR commented on code in PR #38911:
URL: https://github.com/apache/spark/pull/38911#discussion_r1041864100


##########
connector/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaMicroBatchStream.scala:
##########
@@ -316,6 +320,50 @@ private[kafka010] class KafkaMicroBatchStream(
     }
   }
 
+  private def assertEndOffsetForTriggerAvailableNow(
+      endPartitionOffsets: Map[TopicPartition, Long]): Unit = {
+    val tpsForPrefetched = allDataForTriggerAvailableNow.keySet
+    val tpsForEndOffset = endPartitionOffsets.keySet
+
+    if (tpsForPrefetched != tpsForEndOffset) {
+      throw KafkaExceptions.topicPartitionsInEndOffsetAreNotSameWithPrefetched(
+        tpsForPrefetched, tpsForEndOffset)
+    }
+
+    val endOffsetHasGreaterThanPrefetched = {
+      allDataForTriggerAvailableNow.keySet.exists { tp =>
+        val offsetFromPrefetched = allDataForTriggerAvailableNow(tp)
+        val offsetFromEndOffset = endPartitionOffsets(tp)
+        offsetFromEndOffset > offsetFromPrefetched
+      }
+    }
+    if (endOffsetHasGreaterThanPrefetched) {
+      throw KafkaExceptions.endOffsetHasGreaterOffsetForTopicPartitionThanPrefetched(
+        allDataForTriggerAvailableNow, endPartitionOffsets)
+    }
+
+    val latestOffsets = kafkaOffsetReader.fetchLatestOffsets(Some(endPartitionOffsets))

Review Comment:
   It's not safe to just pretend that Kafka resources won't change during the query run. If they do, it will end up in unexpected behavior, e.g. with consumer based grouping, polling from a non-existent topic partition may lead to a timeout of the metadata fetch.
[GitHub] [spark] HeartSaVioR commented on a diff in pull request #38911: [SPARK-41387][SS] Assert current end offset from Kafka data source for Trigger.AvailableNow
HeartSaVioR commented on code in PR #38911:
URL: https://github.com/apache/spark/pull/38911#discussion_r1041860498


##########
connector/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaMicroBatchStream.scala:
##########
@@ -316,6 +320,50 @@ private[kafka010] class KafkaMicroBatchStream(
     }
   }
 
+  private def assertEndOffsetForTriggerAvailableNow(
+      endPartitionOffsets: Map[TopicPartition, Long]): Unit = {
+    val tpsForPrefetched = allDataForTriggerAvailableNow.keySet
+    val tpsForEndOffset = endPartitionOffsets.keySet
+
+    if (tpsForPrefetched != tpsForEndOffset) {
+      throw KafkaExceptions.topicPartitionsInEndOffsetAreNotSameWithPrefetched(
+        tpsForPrefetched, tpsForEndOffset)
+    }
+
+    val endOffsetHasGreaterThanPrefetched = {
+      allDataForTriggerAvailableNow.keySet.exists { tp =>
+        val offsetFromPrefetched = allDataForTriggerAvailableNow(tp)
+        val offsetFromEndOffset = endPartitionOffsets(tp)
+        offsetFromEndOffset > offsetFromPrefetched
+      }
+    }
+    if (endOffsetHasGreaterThanPrefetched) {
+      throw KafkaExceptions.endOffsetHasGreaterOffsetForTopicPartitionThanPrefetched(
+        allDataForTriggerAvailableNow, endPartitionOffsets)
+    }
+
+    val latestOffsets = kafkaOffsetReader.fetchLatestOffsets(Some(endPartitionOffsets))

Review Comment:
   If we don't fetch the latest offsets, with the latest topic-partitions, from Kafka again, what are we trying to guard against? If someone turns on failOnDataLoss and runs the query with Trigger.AvailableNow, it will just move on when a specific topic / partition is dropped, leading to never reaching the end state (the prepared offset). Same for topic recreation.
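The hunk above is truncated right after the fetchLatestOffsets call; here is a sketch of the kind of guard that plausibly follows it, based on the error classes introduced in this PR. This is an assumed shape, not the PR's verbatim code: it verifies that the partitions known at preparation time still exist, so a dropped or recreated topic fails fast instead of stalling the query forever.

    import org.apache.kafka.common.TopicPartition

    def assertNoLostPartitions(
        endPartitionOffsets: Map[TopicPartition, Long],
        latestOffsets: Map[TopicPartition, Long]): Unit = {
      // Partitions that existed when the end offsets were prepared but are gone now.
      val lost = endPartitionOffsets.keySet.diff(latestOffsets.keySet)
      if (lost.nonEmpty) {
        throw new IllegalStateException(
          "Some of partitions in Kafka topic(s) have been lost during running " +
            s"query with Trigger.AvailableNow. Lost partitions: $lost.")
      }
    }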
[GitHub] [spark] HeartSaVioR commented on a diff in pull request #38911: [SPARK-41387][SS] Assert current end offset from Kafka data source for Trigger.AvailableNow
HeartSaVioR commented on code in PR #38911:
URL: https://github.com/apache/spark/pull/38911#discussion_r1041856552


##########
connector/kafka-0-10-sql/src/test/resources/error/kafka-error-classes.json:
##########
@@ -0,0 +1,26 @@
+{
+  "TOPIC_PARTITIONS_IN_END_OFFSET_ARE_NOT_SAME_WITH_PREFETCHED" : {
+    "message" : [
+      "Kafka data source in Trigger.AvailableNow should provide the same topic partitions in pre-fetched offset to end offset for each microbatch. ",

Review Comment:
   We are guarding against arbitrary bugs, so there is no 100% guarantee that this would only happen transiently, but I agree that restarting the query would mitigate the issue in most cases.
[GitHub] [spark] HeartSaVioR commented on a diff in pull request #38911: [SPARK-41387][SS] Assert current end offset from Kafka data source for Trigger.AvailableNow
HeartSaVioR commented on code in PR #38911:
URL: https://github.com/apache/spark/pull/38911#discussion_r1041779007


##########
connector/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaMicroBatchStream.scala:
##########
@@ -316,6 +320,54 @@ private[kafka010] class KafkaMicroBatchStream(
     }
   }
 
+  private def assertEndOffsetForTriggerAvailableNow(
+      endPartitionOffsets: Map[TopicPartition, Long]): Unit = {
+    val tpsForPrefetched = allDataForTriggerAvailableNow.keySet
+    val tpsForEndOffset = endPartitionOffsets.keySet
+    assert(tpsForPrefetched == tpsForEndOffset,
+      "Kafka data source in Trigger.AvailableNow should provide the same topic partitions in " +
+        "pre-fetched offset to end offset for each microbatch. " +
+        s"topic-partitions for pre-fetched offset: $tpsForPrefetched, " +
+        s"topic-partitions for end offset: $tpsForEndOffset.")
+
+    val endOffsetHasGreaterOrEqualOffsetComparedToPrefetched = {
+      allDataForTriggerAvailableNow.keySet.forall { tp =>
+        val offsetFromPrefetched = allDataForTriggerAvailableNow(tp)
+        val offsetFromEndOffset = endPartitionOffsets(tp)
+        offsetFromEndOffset <= offsetFromPrefetched
+      }
+    }
+    assert(endOffsetHasGreaterOrEqualOffsetComparedToPrefetched,

Review Comment:
   I just adopted the error class framework (a separate one for the Kafka data source), as well as making the errors non-internal (user-facing).
[GitHub] [spark] HeartSaVioR commented on a diff in pull request #38911: [SPARK-41387][SS] Assert current end offset from Kafka data source for Trigger.AvailableNow
HeartSaVioR commented on code in PR #38911:
URL: https://github.com/apache/spark/pull/38911#discussion_r1041651140


##########
connector/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaMicroBatchStream.scala:
##########
@@ -316,6 +320,54 @@ private[kafka010] class KafkaMicroBatchStream(
     }
   }
 
+  private def assertEndOffsetForTriggerAvailableNow(
+      endPartitionOffsets: Map[TopicPartition, Long]): Unit = {
+    val tpsForPrefetched = allDataForTriggerAvailableNow.keySet
+    val tpsForEndOffset = endPartitionOffsets.keySet
+    assert(tpsForPrefetched == tpsForEndOffset,
+      "Kafka data source in Trigger.AvailableNow should provide the same topic partitions in " +
+        "pre-fetched offset to end offset for each microbatch. " +
+        s"topic-partitions for pre-fetched offset: $tpsForPrefetched, " +
+        s"topic-partitions for end offset: $tpsForEndOffset.")
+
+    val endOffsetHasGreaterOrEqualOffsetComparedToPrefetched = {
+      allDataForTriggerAvailableNow.keySet.forall { tp =>
+        val offsetFromPrefetched = allDataForTriggerAvailableNow(tp)
+        val offsetFromEndOffset = endPartitionOffsets(tp)
+        offsetFromEndOffset <= offsetFromPrefetched
+      }
+    }
+    assert(endOffsetHasGreaterOrEqualOffsetComparedToPrefetched,

Review Comment:
   Yeah... it's a bit tricky. The initial rationale for the assertion "was" to point out a bug quickly and let the streaming query fail fast rather than run infinitely. But it is also the user who can make arbitrary changes to topic partitions externally during a Trigger.AvailableNow run and mess up the query. So we actually have two different audiences.

   If we consider only the possible-bug cases, we would need to leave this as it is, so that it is treated as an "INTERNAL ERROR". (Not sure we have to go with the error framework for this case as well. Maybe @MaxGekk ?) If not, we should probably change the error to leverage the error framework and not mark it as an internal error.

   My feeling is that it'd be rare for users to modify the topic during the query run, so it still makes sense to target internal first, but I'm OK either way.
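For clarity, a small sketch of the two framings weighed above; names and shapes are illustrative only. An assertion surfaces as an internal error ("please report a bug"), while an error-class-style exception carries user-actionable detail — the direction this PR ultimately took.

    import org.apache.kafka.common.TopicPartition

    def checkTopicPartitions(
        prefetched: Set[TopicPartition], endOffset: Set[TopicPartition]): Unit = {
      // Option A: internal-error framing, appropriate if only code bugs can trip it.
      // assert(prefetched == endOffset, s"mismatch: $prefetched vs $endOffset")

      // Option B: user-facing framing, since users can also trip this by
      // dropping/recreating topics mid-run.
      if (prefetched != endOffset) {
        throw new IllegalStateException(
          "MISMATCHED_TOPIC_PARTITIONS_BETWEEN_END_OFFSET_AND_PREFETCHED: " +
            s"prefetched=$prefetched, endOffset=$endOffset")
      }
    }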
[GitHub] [spark] HeartSaVioR commented on a diff in pull request #38911: [SPARK-41387][SS] Assert current end offset from Kafka data source for Trigger.AvailableNow
HeartSaVioR commented on code in PR #38911:
URL: https://github.com/apache/spark/pull/38911#discussion_r1041067010


##########
connector/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaSource.scala:
##########
@@ -349,6 +354,54 @@ private[kafka010] class KafkaSource(
     }
   }
 
+  private def assertEndOffsetForTriggerAvailableNow(

Review Comment:
   In the end we want to remove the DSv1 implementation of the Kafka data source. (There is a semantic issue on the batch side.) Until then, I'd like to duplicate the code, as we have been doing.