[GitHub] [spark] HeartSaVioR commented on a diff in pull request #38911: [SPARK-41387][SS] Add defensive assertions to Kafka data source for Trigger.AvailableNow
HeartSaVioR commented on code in PR #38911:
URL: https://github.com/apache/spark/pull/38911#discussion_r1040675168

## connector/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaMicroBatchStream.scala:

@@ -316,6 +320,26 @@ private[kafka010] class KafkaMicroBatchStream(
     }
   }
 
+  private def assertEndOffsetForTriggerAvailableNow(
+      endPartitionOffsets: Map[TopicPartition, Long]): Unit = {
+    val tpsForPrefetched = allDataForTriggerAvailableNow.keySet
+    val tpsForEndOffset = endPartitionOffsets.keySet
+    assert(tpsForPrefetched == tpsForEndOffset,
+      "Kafka data source in Trigger.AvailableNow should provide the same topic partitions in " +
+        "pre-fetched offset to end offset for each microbatch. " +
+        s"topic-partitions for pre-fetched offset: $tpsForPrefetched, " +
+        s"topic-partitions for end offset: $tpsForEndOffset.")
+
+    assert(allDataForTriggerAvailableNow.keySet.forall { tp =>

Review Comment:
   And this never fails with the current implementation of Trigger.AvailableNow (once it figures out the final end offset, it never looks at the available offset range in the real topic), hence it's not feasible to create a test for this. This is to identify future bugs quickly.

--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For queries about this service, please contact Infrastructure at: us...@infra.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org
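The first assertion in the diff compares only the *sets* of topic partitions, not the offset values. A minimal standalone sketch of that check (the object and method names, and the `TopicPartition` case class standing in for `org.apache.kafka.common.TopicPartition`, are illustrative, not Spark's actual code):

```scala
// Sketch of the partition-set equality assertion discussed above: the topic
// partitions captured when Trigger.AvailableNow pre-fetched offsets must
// exactly match the topic partitions in each microbatch's end offsets.
object PartitionSetAssertionSketch {
  // Stand-in for org.apache.kafka.common.TopicPartition.
  final case class TopicPartition(topic: String, partition: Int)

  def assertSamePartitions(
      prefetched: Map[TopicPartition, Long],
      endPartitionOffsets: Map[TopicPartition, Long]): Unit = {
    val tpsForPrefetched = prefetched.keySet
    val tpsForEndOffset = endPartitionOffsets.keySet
    assert(tpsForPrefetched == tpsForEndOffset,
      "Kafka data source in Trigger.AvailableNow should provide the same topic partitions " +
        "in pre-fetched offset to end offset for each microbatch. " +
        s"topic-partitions for pre-fetched offset: $tpsForPrefetched, " +
        s"topic-partitions for end offset: $tpsForEndOffset.")
  }

  def main(args: Array[String]): Unit = {
    val tp0 = TopicPartition("events", 0)
    // Same key set on both sides: passes even though the offset values differ,
    // because only the partition sets are compared.
    assertSamePartitions(Map(tp0 -> 100L), Map(tp0 -> 42L))
  }
}
```

A partition appearing (or disappearing) between pre-fetch and a microbatch's end offsets would trip this assertion, which is exactly the "future bug" the comment says it is meant to surface quickly.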
[GitHub] [spark] HeartSaVioR commented on a diff in pull request #38911: [SPARK-41387][SS] Add defensive assertions to Kafka data source for Trigger.AvailableNow

HeartSaVioR commented on code in PR #38911:
URL: https://github.com/apache/spark/pull/38911#discussion_r1040641431

## connector/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaMicroBatchStream.scala:

@@ -316,6 +320,26 @@ private[kafka010] class KafkaMicroBatchStream(
     }
   }
 
+  private def assertEndOffsetForTriggerAvailableNow(
+      endPartitionOffsets: Map[TopicPartition, Long]): Unit = {
+    val tpsForPrefetched = allDataForTriggerAvailableNow.keySet
+    val tpsForEndOffset = endPartitionOffsets.keySet
+    assert(tpsForPrefetched == tpsForEndOffset,
+      "Kafka data source in Trigger.AvailableNow should provide the same topic partitions in " +
+        "pre-fetched offset to end offset for each microbatch. " +
+        s"topic-partitions for pre-fetched offset: $tpsForPrefetched, " +
+        s"topic-partitions for end offset: $tpsForEndOffset.")
+
+    assert(allDataForTriggerAvailableNow.keySet.forall { tp =>

Review Comment:
   The check is also done in `KafkaOffsetReader.getOffsetRangesFromResolvedOffsets`, but we do not fail the query when `failondataloss` is set to `true`. We do this specifically for Trigger.AvailableNow, since we want to let the query fail even though users specify the option `failondataloss` as `true`.
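The diff quoted in this thread is truncated at the second `assert`, which checks the end offsets themselves rather than the partition sets. A hypothetical completion of that check (the assertion body and all names here are illustrative, not Spark's actual code): for every pre-fetched topic partition, the end offset chosen for a microbatch must not run past the offset captured at pre-fetch time.

```scala
// Sketch of the second, truncated assertion. Unlike the similar check in
// KafkaOffsetReader.getOffsetRangesFromResolvedOffsets, this one is
// unconditional: it fires regardless of any failOnDataLoss-style option.
object EndOffsetBoundSketch {
  // Stand-in for org.apache.kafka.common.TopicPartition.
  final case class TopicPartition(topic: String, partition: Int)

  def assertEndWithinPrefetched(
      prefetched: Map[TopicPartition, Long],
      endPartitionOffsets: Map[TopicPartition, Long]): Unit = {
    assert(prefetched.keySet.forall { tp =>
      // A partition absent from the end offsets is vacuously within bounds;
      // the partition-set equality assertion covers that case separately.
      endPartitionOffsets.get(tp).forall(_ <= prefetched(tp))
    }, "end offset must not exceed pre-fetched offset for any topic partition; " +
      s"pre-fetched: $prefetched, end: $endPartitionOffsets")
  }

  def main(args: Array[String]): Unit = {
    val tp0 = TopicPartition("events", 0)
    assertEndWithinPrefetched(Map(tp0 -> 100L), Map(tp0 -> 100L)) // at the bound: ok
    assertEndWithinPrefetched(Map(tp0 -> 100L), Map(tp0 -> 50L))  // within range: ok
  }
}
```

An end offset beyond the pre-fetched one would mean the microbatch planner looked past the snapshot Trigger.AvailableNow took at query start, which is the invariant these defensive assertions guard.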