HeartSaVioR commented on code in PR #38911:
URL: https://github.com/apache/spark/pull/38911#discussion_r1040675168


##########
connector/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaMicroBatchStream.scala:
##########
@@ -316,6 +320,26 @@ private[kafka010] class KafkaMicroBatchStream(
     }
   }
 
+  private def assertEndOffsetForTriggerAvailableNow(
+      endPartitionOffsets: Map[TopicPartition, Long]): Unit = {
+    val tpsForPrefetched = allDataForTriggerAvailableNow.keySet
+    val tpsForEndOffset = endPartitionOffsets.keySet
+    assert(tpsForPrefetched == tpsForEndOffset,
+      "Kafka data source in Trigger.AvailableNow should provide the same topic partitions in " +
+        "pre-fetched offset to end offset for each microbatch. " +
+        s"topic-partitions for pre-fetched offset: $tpsForPrefetched, " +
+        s"topic-partitions for end offset: $tpsForEndOffset.")
+
+    assert(allDataForTriggerAvailableNow.keySet.forall { tp =>

Review Comment:
   And this never fails with the current implementation of Trigger.AvailableNow (once it figures out the final end offset, it never looks at the available offset range in the real topic), hence it's not feasible to create a test for this. The assertion is there to identify future bugs quickly.
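The reviewer's reasoning can be illustrated with a small, self-contained Scala sketch. It is a simplified model under stated assumptions, not the actual `KafkaMicroBatchStream` code. `TopicPartition` here is a stand-in case class for Kafka's class. `AvailableNowSketch`, `nextEndOffset` and `maxRecordsPerPartition` are hypothetical names. The `forall` assertion is cut off in this excerpt, so the sketch reproduces only the fully visible topic-partition check. Because each end offset in the model is derived solely from the pre-fetched map and never from the live topic, the check cannot fail. It would only trip if a future change built the end offset from some other source, which is the kind of bug the assertion is meant to surface early.

```scala
// Stand-in for org.apache.kafka.common.TopicPartition so the sketch
// compiles without the Kafka client on the classpath (assumption).
final case class TopicPartition(topic: String, partition: Int)

object AvailableNowSketch {
  // Hypothetical helper: computes a microbatch end offset purely from the
  // pre-fetched (final) offsets, advancing each partition by at most
  // `maxRecordsPerPartition` records past its start offset. It never
  // consults the real topic, so its key set always equals the key set of
  // `prefetched`, and no partition ends beyond its pre-fetched offset.
  def nextEndOffset(
      prefetched: Map[TopicPartition, Long],
      start: Map[TopicPartition, Long],
      maxRecordsPerPartition: Long): Map[TopicPartition, Long] =
    prefetched.map { case (tp, finalOffset) =>
      val from = start.getOrElse(tp, 0L)
      tp -> math.min(finalOffset, from + maxRecordsPerPartition)
    }

  // Same invariant as the first assertion in the diff: fail fast when an
  // end offset covers a different set of topic partitions than the
  // pre-fetched offsets. Throws java.lang.AssertionError on mismatch.
  def assertSameTopicPartitions(
      prefetched: Map[TopicPartition, Long],
      endPartitionOffsets: Map[TopicPartition, Long]): Unit = {
    val tpsForPrefetched = prefetched.keySet
    val tpsForEndOffset = endPartitionOffsets.keySet
    assert(tpsForPrefetched == tpsForEndOffset,
      s"topic-partitions for pre-fetched offset: $tpsForPrefetched, " +
        s"topic-partitions for end offset: $tpsForEndOffset.")
  }
}
```

For example, with pre-fetched offsets `Map(tp0 -> 100L, tp1 -> 40L)`, start offsets `Map(tp0 -> 0L, tp1 -> 30L)` and a cap of `50L`, `nextEndOffset` returns `Map(tp0 -> 50L, tp1 -> 40L)` and the check passes. Passing a hypothetical buggy end offset that drops `tp1` makes `assertSameTopicPartitions` throw a `java.lang.AssertionError`, which is why such a check is useful as a tripwire even when no test can drive the real source into that state.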



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org
