Github user tdas commented on a diff in the pull request:
https://github.com/apache/spark/pull/20096#discussion_r159985929
--- Diff:
external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaOffsetReader.scala
---
@@ -145,6 +149,19 @@ private[kafka010] class KafkaOffsetReader(
}
}
+ partitionOffsets.foreach {
+ case (tp, off) if off != KafkaOffsetRangeLimit.LATEST &&
+ off != KafkaOffsetRangeLimit.EARLIEST =>
+ if (fetched(tp) != off) {
+ reportDataLoss(
+ s"startingOffsets for $tp was $off but consumer reset to
${fetched(tp)}")
+ }
+ case _ =>
+ // no real way to check that beginning or end is reasonable
--- End diff --
nit: wrong indent
---
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]