HeartSaVioR commented on code in PR #38911:
URL: https://github.com/apache/spark/pull/38911#discussion_r1041651140
##########
connector/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaMicroBatchStream.scala:
##########
@@ -316,6 +320,54 @@ private[kafka010] class KafkaMicroBatchStream(
}
}
+ private def assertEndOffsetForTriggerAvailableNow(
+ endPartitionOffsets: Map[TopicPartition, Long]): Unit = {
+ val tpsForPrefetched = allDataForTriggerAvailableNow.keySet
+ val tpsForEndOffset = endPartitionOffsets.keySet
+ assert(tpsForPrefetched == tpsForEndOffset,
+ "Kafka data source in Trigger.AvailableNow should provide the same topic partitions in " +
+ "pre-fetched offset to end offset for each microbatch. " +
+ s"topic-partitions for pre-fetched offset: $tpsForPrefetched, " +
+ s"topic-partitions for end offset: $tpsForEndOffset.")
+
+ val endOffsetHasGreaterOrEqualOffsetComparedToPrefetched = {
+ allDataForTriggerAvailableNow.keySet.forall { tp =>
+ val offsetFromPrefetched = allDataForTriggerAvailableNow(tp)
+ val offsetFromEndOffset = endPartitionOffsets(tp)
+ offsetFromEndOffset <= offsetFromPrefetched
+ }
+ }
+ assert(endOffsetHasGreaterOrEqualOffsetComparedToPrefetched,
Review Comment:
Yeah... it's a bit tricky. The initial rationale for the assertion
was to surface the bug quickly and let the streaming query fail fast rather
than run indefinitely. But a user can also make arbitrary changes to the
topic partitions externally while a Trigger.AvailableNow query is running
and break the query.
So we actually have two different audiences. If we only consider possible
bugs, we would leave this as it is, so that it is treated as an
"INTERNAL ERROR". (Not sure whether we have to go through the error framework
for this case as well. Maybe @MaxGekk ?) If not, we should probably change the
error to use the error framework and not mark it as an internal error.
My feeling is that it'd be rare for users to modify the topic during the
query run, so it still makes sense to treat this as an internal error first,
but I'm OK either way.
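
To make the second option more concrete, here is a rough sketch of what a
non-internal error could look like. It is only an illustration under
assumptions: `TopicPartition` here is a local stand-in for Kafka's class,
and `KafkaOffsetMismatchException`, `EndOffsetCheck`, and both error class
names are hypothetical, not part of Spark's error framework. The
comparisons mirror the two assertions in the diff.

```scala
// Local stand-in for org.apache.kafka.common.TopicPartition (assumption).
final case class TopicPartition(topic: String, partition: Int)

// Hypothetical user-facing exception that carries an error-class-like name
// instead of being reported as an internal error.
final class KafkaOffsetMismatchException(val errorClass: String, message: String)
  extends RuntimeException(s"[$errorClass] $message")

object EndOffsetCheck {
  // Same two checks as the assertions in the diff, but throwing a
  // user-facing exception rather than an AssertionError.
  def validate(
      prefetched: Map[TopicPartition, Long],
      endOffsets: Map[TopicPartition, Long]): Unit = {
    if (prefetched.keySet != endOffsets.keySet) {
      throw new KafkaOffsetMismatchException(
        "MISMATCHED_TOPIC_PARTITIONS", // hypothetical error class name
        s"topic-partitions for pre-fetched offset: ${prefetched.keySet}, " +
          s"topic-partitions for end offset: ${endOffsets.keySet}.")
    }
    // The end offset must not exceed the pre-fetched offset for any partition.
    val offending = endOffsets.filter { case (tp, end) => end > prefetched(tp) }
    if (offending.nonEmpty) {
      throw new KafkaOffsetMismatchException(
        "END_OFFSET_GREATER_THAN_PREFETCHED", // hypothetical error class name
        s"end offsets exceeding the pre-fetched offsets: $offending.")
    }
  }
}
```

With something like this, a topic change made by the user during the run
would surface as a named, non-internal error, while the message could still
include the offending partitions for debugging.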