Github user tdas commented on a diff in the pull request:
https://github.com/apache/spark/pull/22042#discussion_r212521083
--- Diff:
external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaDataConsumer.scala
---
@@ -239,56 +335,74 @@ private[kafka010] case class InternalKafkaConsumer(
}
/**
- * Get the record for the given offset if available. Otherwise it will
either throw error
- * (if failOnDataLoss = true), or return the next available offset
within [offset, untilOffset),
- * or null.
+ * Get the fetched record for the given offset if available.
+ *
+ * If the record is invisible (either a transaction message, or an
aborted message when the
+ * consumer's `isolation.level` is `read_committed`), it will return a
`FetchedRecord` with the
+ * next offset to fetch.
+ *
+ * This method also will try its best to detect data loss. If
`failOnDataLoss` is `true`, it will
+ * throw an exception when we detect an unavailable offset. If
`failOnDataLoss` is `false`, this
+ * method will return `null` if the next available record is within
[offset, untilOffset).
*
* @throws OffsetOutOfRangeException if `offset` is out of range
* @throws TimeoutException if cannot fetch the record in
`pollTimeoutMs` milliseconds.
*/
- private def fetchData(
+ private def fetchRecord(
offset: Long,
untilOffset: Long,
pollTimeoutMs: Long,
- failOnDataLoss: Boolean): ConsumerRecord[Array[Byte], Array[Byte]] =
{
- if (offset != nextOffsetInFetchedData || !fetchedData.hasNext()) {
- // This is the first fetch, or the last pre-fetched data has been
drained.
- // Seek to the offset because we may call seekToBeginning or
seekToEnd before this.
- seek(offset)
- poll(pollTimeoutMs)
- }
-
- if (!fetchedData.hasNext()) {
- // We cannot fetch anything after `poll`. Two possible cases:
- // - `offset` is out of range so that Kafka returns nothing. Just
throw
- // `OffsetOutOfRangeException` to let the caller handle it.
- // - Cannot fetch any data before timeout. TimeoutException will be
thrown.
- val range = getAvailableOffsetRange()
- if (offset < range.earliest || offset >= range.latest) {
- throw new OffsetOutOfRangeException(
- Map(topicPartition -> java.lang.Long.valueOf(offset)).asJava)
+ failOnDataLoss: Boolean): FetchedRecord = {
+ if (offset != fetchedData.nextOffsetInFetchedData) {
+ // This is the first fetch, or the fetched data has been reset.
+ // Fetch records from Kafka and update `fetchedData`.
+ fetchData(offset, pollTimeoutMs)
+ } else if (!fetchedData.hasNext) { // The last pre-fetched data has
been drained.
+ if (offset < fetchedData.offsetAfterPoll) {
+ // Offsets in [offset, fetchedData.offsetAfterPoll) are invisible.
Return a record to ask
+ // the next call to start from `fetchedData.offsetAfterPoll`.
+ fetchedData.reset()
+ return fetchedRecord.withRecord(null, fetchedData.offsetAfterPoll)
} else {
- throw new TimeoutException(
- s"Cannot fetch record for offset $offset in $pollTimeoutMs
milliseconds")
+ // Fetch records from Kafka and update `fetchedData`.
+ fetchData(offset, pollTimeoutMs)
}
+ }
+
+ if (!fetchedData.hasNext) {
+ // When we reach here, we have already tried to poll from Kafka. As
`fetchedData` is still
+ // empty, all messages in [offset, fetchedData.offsetAfterPoll) are
invisible. Return a
+ // record to ask the next call to start from
`fetchedData.offsetAfterPoll`.
+ assert(offset <= fetchedData.offsetAfterPoll,
+ s"seek to $offset and poll but the offset was reset to
${fetchedData.offsetAfterPoll}")
+ fetchedRecord.withRecord(null, fetchedData.offsetAfterPoll)
} else {
val record = fetchedData.next()
- nextOffsetInFetchedData = record.offset + 1
// In general, Kafka uses the specified offset as the start point,
and tries to fetch the next
// available offset. Hence we need to handle offset mismatch.
if (record.offset > offset) {
+ val range = getAvailableOffsetRange()
+ if (range.earliest <= offset) {
+ // `offset` is still valid but the corresponding message is
invisible. We should skip it
+ // and jump to `record.offset`. Here we move `fetchedData` back
so that the next call of
+ // `fetchRecord` can just return `record` directly.
+ fetchedData.previous()
+ return fetchedRecord.withRecord(null, record.offset)
+ }
// This may happen when some records aged out but their offsets
already got verified
if (failOnDataLoss) {
reportDataLoss(true, s"Cannot fetch records in [$offset,
${record.offset})")
// Never happen as "reportDataLoss" will throw an exception
- null
+ throw new IllegalStateException(
+ "reportDataLoss didn't throw an exception when
'failOnDataLoss' is true")
} else {
if (record.offset >= untilOffset) {
reportDataLoss(false, s"Skip missing records in [$offset,
$untilOffset)")
- null
+ // Set `nextOffsetToFetch` to `untilOffset` to finish the
current batch.
+ fetchedRecord.withRecord(null, untilOffset)
} else {
reportDataLoss(false, s"Skip missing records in [$offset,
${record.offset})")
- record
+ fetchedRecord.withRecord(record,
fetchedData.nextOffsetInFetchedData)
}
--- End diff --
nit: This can be unnested.
if ... else { if ... else ... } -> if ... else if .. else
---
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]