[GitHub] [spark] HeartSaVioR commented on a diff in pull request #38911: [SPARK-41387][SS] Add defensive assertions to Kafka data source for Trigger.AvailableNow

2022-12-06 Thread GitBox


HeartSaVioR commented on code in PR #38911:
URL: https://github.com/apache/spark/pull/38911#discussion_r1040675168


##
connector/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaMicroBatchStream.scala:
##
@@ -316,6 +320,26 @@ private[kafka010] class KafkaMicroBatchStream(
 }
   }
 
+  private def assertEndOffsetForTriggerAvailableNow(
+      endPartitionOffsets: Map[TopicPartition, Long]): Unit = {
+    val tpsForPrefetched = allDataForTriggerAvailableNow.keySet
+    val tpsForEndOffset = endPartitionOffsets.keySet
+    assert(tpsForPrefetched == tpsForEndOffset,
+      "Kafka data source in Trigger.AvailableNow should provide the same topic partitions in " +
+        "pre-fetched offset to end offset for each microbatch. " +
+        s"topic-partitions for pre-fetched offset: $tpsForPrefetched, " +
+        s"topic-partitions for end offset: $tpsForEndOffset.")
+
+    assert(allDataForTriggerAvailableNow.keySet.forall { tp =>

Review Comment:
   And this never fails with the current implementation of Trigger.AvailableNow
(once it figures out the final end offset, it never looks at the available
offset range in the real topic again), hence it's not feasible to create a test
for this. It is there to identify future bugs quickly.
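   As a reference, here is a rough standalone sketch of what the truncated second
assertion is checking, under the assumption that the end offset resolved for each
microbatch must not go beyond the offset pre-fetched when Trigger.AvailableNow
captured its final range. The helper name and message below are illustrative only,
not the exact code in the PR:

       import org.apache.kafka.common.TopicPartition

       // Hypothetical sketch, not the PR's code: no topic-partition's resolved end
       // offset may exceed the offset pre-fetched for Trigger.AvailableNow.
       // Assumes the key sets were already checked to be equal (first assertion).
       def assertEndOffsetNotBeyondPrefetched(
           prefetched: Map[TopicPartition, Long],
           endPartitionOffsets: Map[TopicPartition, Long]): Unit = {
         assert(prefetched.keySet.forall { tp =>
           endPartitionOffsets(tp) <= prefetched(tp)
         },
           "Kafka data source in Trigger.AvailableNow should not resolve an end offset " +
             s"beyond the pre-fetched offset. pre-fetched: $prefetched, end: $endPartitionOffsets.")
       }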



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] [spark] HeartSaVioR commented on a diff in pull request #38911: [SPARK-41387][SS] Add defensive assertions to Kafka data source for Trigger.AvailableNow

2022-12-06 Thread GitBox


HeartSaVioR commented on code in PR #38911:
URL: https://github.com/apache/spark/pull/38911#discussion_r1040641431


##
connector/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaMicroBatchStream.scala:
##
@@ -316,6 +320,26 @@ private[kafka010] class KafkaMicroBatchStream(
 }
   }
 
+  private def assertEndOffsetForTriggerAvailableNow(
+      endPartitionOffsets: Map[TopicPartition, Long]): Unit = {
+    val tpsForPrefetched = allDataForTriggerAvailableNow.keySet
+    val tpsForEndOffset = endPartitionOffsets.keySet
+    assert(tpsForPrefetched == tpsForEndOffset,
+      "Kafka data source in Trigger.AvailableNow should provide the same topic partitions in " +
+        "pre-fetched offset to end offset for each microbatch. " +
+        s"topic-partitions for pre-fetched offset: $tpsForPrefetched, " +
+        s"topic-partitions for end offset: $tpsForEndOffset.")
+
+    assert(allDataForTriggerAvailableNow.keySet.forall { tp =>

Review Comment:
   The check is also done in `KafkaOffsetReader.getOffsetRangesFromResolvedOffsets`,
but that path fails the query only when `failOnDataLoss` is set to `true`. We do
this specifically for Trigger.AvailableNow since we want to let the query fail even
when users set `failOnDataLoss` to `false`.
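   To illustrate the difference in behavior, a simplified sketch (not the actual
Spark internals) of the two paths, assuming the documented `failOnDataLoss`
semantics (true fails the query, false only logs a warning):

       // Existing path: violations are routed through a reportDataLoss-style
       // callback, which honors failOnDataLoss.
       def reportDataLoss(failOnDataLoss: Boolean)(message: String): Unit =
         if (failOnDataLoss) throw new IllegalStateException(message) // fail the query
         else println(s"WARN: $message")                              // warn and continue

       // New path for Trigger.AvailableNow: a plain assert, so the query stops on a
       // pre-fetched/end offset mismatch regardless of the failOnDataLoss setting.
       def assertForAvailableNow(condition: Boolean, message: => String): Unit =
         assert(condition, message)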



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org