Github user zsxwing commented on a diff in the pull request:
https://github.com/apache/spark/pull/22042#discussion_r211786183
--- Diff: external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaDataConsumer.scala ---
@@ -31,22 +31,21 @@ import org.apache.spark.sql.kafka010.KafkaDataConsumer.AvailableOffsetRange
 import org.apache.spark.sql.kafka010.KafkaSourceProvider._
 import org.apache.spark.util.UninterruptibleThread
-/**
- * An exception to indicate there is a missing offset in the records returned by Kafka consumer.
- * This means it's either a transaction (commit or abort) marker, or an aborted message if
- * "isolation.level" is "read_committed". The offsets in the range [offset, nextOffsetToFetch) are
- * missing. In order words, `nextOffsetToFetch` indicates the next offset to fetch.
- */
-private[kafka010] class MissingOffsetException(
-    val offset: Long,
-    val nextOffsetToFetch: Long) extends Exception(
-  s"Offset $offset is missing. The next offset to fetch is: $nextOffsetToFetch")
-
 private[kafka010] sealed trait KafkaDataConsumer {
   /**
-   * Get the record for the given offset if available. Otherwise it will either throw error
-   * (if failOnDataLoss = true), or return the next available offset within [offset, untilOffset),
-   * or null.
+   * Get the record for the given offset if available.
+   *
+   * If the record is invisible (either a transaction message, or an aborted message when the
+   * consumer's `isolation.level` is `read_committed`), it will be skipped and this method will
+   * try to fetch next available record within [offset, untilOffset).
+   *
+   * This method also will try the best to detect data loss. If `failOnDataLoss` is `false`, it will
--- End diff --
Good catch
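
For readers following the thread, the skip-and-retry behavior that the new doc comment describes can be sketched roughly as below. This is an illustrative model only, not the actual `KafkaDataConsumer` implementation: `Record`, its `visible` flag, and the in-memory `log` map are hypothetical stand-ins for Kafka's `ConsumerRecord` and the broker's transaction-marker/aborted-message bookkeeping.

```scala
// Hypothetical stand-in for a fetched Kafka record; `visible = false` models
// a transaction (commit/abort) marker or an aborted message under the
// `read_committed` isolation level.
final case class Record(offset: Long, value: String, visible: Boolean)

object GetSketch {
  /**
   * Return the first visible record at or after `offset` within
   * [offset, untilOffset), skipping invisible offsets along the way;
   * null if no visible record remains in the range.
   */
  def get(log: Map[Long, Record], offset: Long, untilOffset: Long): Record = {
    var cur = offset
    while (cur < untilOffset) {
      log.get(cur) match {
        case Some(r) if r.visible => return r
        case _                    => cur += 1 // skip marker/aborted/missing offset
      }
    }
    null
  }
}
```

The sketch deliberately omits the `failOnDataLoss` handling under discussion; in the real code a missing offset may instead be reported as data loss rather than silently skipped.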
---