zsxwing closed pull request #23365: [SPARK-26267][SS] Retry when detecting incorrect offsets from Kafka (2.4)
URL: https://github.com/apache/spark/pull/23365
This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for the sake of provenance:
diff --git a/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaContinuousReader.scala b/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaContinuousReader.scala
index 8ce56a249622d..561d501359321 100644
--- a/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaContinuousReader.scala
+++ b/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaContinuousReader.scala
@@ -73,7 +73,7 @@ class KafkaContinuousReader(
offset = start.orElse {
val offsets = initialOffsets match {
case EarliestOffsetRangeLimit => KafkaSourceOffset(offsetReader.fetchEarliestOffsets())
- case LatestOffsetRangeLimit => KafkaSourceOffset(offsetReader.fetchLatestOffsets())
+ case LatestOffsetRangeLimit => KafkaSourceOffset(offsetReader.fetchLatestOffsets(None))
case SpecificOffsetRangeLimit(p) => offsetReader.fetchSpecificOffsets(p, reportDataLoss)
}
logInfo(s"Initial offsets: $offsets")
@@ -128,7 +128,7 @@ class KafkaContinuousReader(
}
override def needsReconfiguration(): Boolean = {
- knownPartitions != null && offsetReader.fetchLatestOffsets().keySet != knownPartitions
+ knownPartitions != null && offsetReader.fetchLatestOffsets(None).keySet != knownPartitions
}
override def toString(): String = s"KafkaSource[$offsetReader]"
diff --git a/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaMicroBatchReader.scala b/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaMicroBatchReader.scala
index 8cc989fce1976..b6c803545578d 100644
--- a/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaMicroBatchReader.scala
+++ b/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaMicroBatchReader.scala
@@ -93,7 +93,8 @@ private[kafka010] class KafkaMicroBatchReader(
endPartitionOffsets = Option(end.orElse(null))
.map(_.asInstanceOf[KafkaSourceOffset].partitionToOffsets)
.getOrElse {
- val latestPartitionOffsets = kafkaOffsetReader.fetchLatestOffsets()
+ val latestPartitionOffsets =
+ kafkaOffsetReader.fetchLatestOffsets(Some(startPartitionOffsets))
maxOffsetsPerTrigger.map { maxOffsets =>
rateLimit(maxOffsets, startPartitionOffsets, latestPartitionOffsets)
}.getOrElse {
@@ -132,10 +133,21 @@ private[kafka010] class KafkaMicroBatchReader(
}.toSeq
logDebug("TopicPartitions: " + topicPartitions.mkString(", "))
+ val fromOffsets = startPartitionOffsets ++ newPartitionInitialOffsets
+ val untilOffsets = endPartitionOffsets
+ untilOffsets.foreach { case (tp, untilOffset) =>
+ fromOffsets.get(tp).foreach { fromOffset =>
+ if (untilOffset < fromOffset) {
+ reportDataLoss(s"Partition $tp's offset was changed from " +
+ s"$fromOffset to $untilOffset, some data may have been missed")
+ }
+ }
+ }
+
// Calculate offset ranges
val offsetRanges = rangeCalculator.getRanges(
- fromOffsets = startPartitionOffsets ++ newPartitionInitialOffsets,
- untilOffsets = endPartitionOffsets,
+ fromOffsets = fromOffsets,
+ untilOffsets = untilOffsets,
executorLocations = getSortedExecutorList())
// Reuse Kafka consumers only when all the offset ranges have distinct TopicPartitions,
@@ -192,7 +204,7 @@ private[kafka010] class KafkaMicroBatchReader(
case EarliestOffsetRangeLimit =>
KafkaSourceOffset(kafkaOffsetReader.fetchEarliestOffsets())
case LatestOffsetRangeLimit =>
- KafkaSourceOffset(kafkaOffsetReader.fetchLatestOffsets())
+ KafkaSourceOffset(kafkaOffsetReader.fetchLatestOffsets(None))
case SpecificOffsetRangeLimit(p) =>
kafkaOffsetReader.fetchSpecificOffsets(p, reportDataLoss)
}
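
The sanity check added above compares each partition's end offset against its start offset: if the end has moved behind the start (for example because the topic was deleted and recreated), the discrepancy is reported instead of silently producing an invalid range. A minimal, self-contained sketch of the same check, outside the Spark classes and with a hypothetical reportDataLoss callback, could look like this:

    import org.apache.kafka.common.TopicPartition

    object OffsetSanityCheck {
      type PartitionOffsetMap = Map[TopicPartition, Long]

      // Report every partition whose end offset fell behind its start offset; records in
      // between can no longer be read, i.e. data may have been lost.
      def checkOffsets(
          fromOffsets: PartitionOffsetMap,
          untilOffsets: PartitionOffsetMap,
          reportDataLoss: String => Unit): Unit = {
        untilOffsets.foreach { case (tp, untilOffset) =>
          fromOffsets.get(tp).foreach { fromOffset =>
            if (untilOffset < fromOffset) {
              reportDataLoss(s"Partition $tp's offset was changed from " +
                s"$fromOffset to $untilOffset, some data may have been missed")
            }
          }
        }
      }
    }

In the real source, reportDataLoss either fails the query or only logs a warning, depending on the failOnDataLoss option; the new test at the end of this diff relies on the failing behaviour.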
diff --git a/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaOffsetRangeCalculator.scala b/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaOffsetRangeCalculator.scala
index fb209c724afba..6008794924052 100644
--- a/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaOffsetRangeCalculator.scala
+++ b/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaOffsetRangeCalculator.scala
@@ -37,6 +37,8 @@ private[kafka010] class KafkaOffsetRangeCalculator(val minPartitions: Option[Int
* the read tasks of the skewed partitions to multiple Spark tasks.
* The number of Spark tasks will be *approximately* `numPartitions`. It can be less or more
* depending on rounding errors or Kafka partitions that didn't receive any new data.
+ *
+ * Empty ranges (`KafkaOffsetRange.size <= 0`) will be dropped.
*/
def getRanges(
fromOffsets: PartitionOffsetMap,
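
The added doc line only states the contract; as a minimal sketch, with a simplified stand-in for KafkaOffsetRange that keeps the same size semantics (untilOffset - fromOffset), dropping empty ranges is just a filter:

    import org.apache.kafka.common.TopicPartition

    object EmptyRangeFilter {
      // Simplified stand-in for the real KafkaOffsetRange; size <= 0 means nothing to read.
      case class KafkaOffsetRange(topicPartition: TopicPartition,
          fromOffset: Long, untilOffset: Long) {
        def size: Long = untilOffset - fromOffset
      }

      def dropEmptyRanges(ranges: Seq[KafkaOffsetRange]): Seq[KafkaOffsetRange] =
        ranges.filter(_.size > 0)
    }

Making this explicit matters for this PR: when failOnDataLoss only logs a warning, an end offset behind a start offset yields a range with non-positive size, which is dropped rather than turned into an invalid read task.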
diff --git a/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaOffsetReader.scala b/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaOffsetReader.scala
index 82066697cb95a..fc443d22bf5a2 100644
--- a/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaOffsetReader.scala
+++ b/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaOffsetReader.scala
@@ -21,6 +21,7 @@ import java.{util => ju}
import java.util.concurrent.{Executors, ThreadFactory}
import scala.collection.JavaConverters._
+import scala.collection.mutable.ArrayBuffer
import scala.concurrent.{ExecutionContext, Future}
import scala.concurrent.duration.Duration
import scala.util.control.NonFatal
@@ -137,6 +138,12 @@ private[kafka010] class KafkaOffsetReader(
// Poll to get the latest assigned partitions
consumer.poll(0)
val partitions = consumer.assignment()
+
+ // Call `position` to wait until the potential offset request triggered by `poll(0)` is
+ // done. This is a workaround for KAFKA-7703, which an async `seekToBeginning` triggered by
+ // `poll(0)` may reset offsets that should have been set by another request.
+ partitions.asScala.map(p => p -> consumer.position(p)).foreach(_ => {})
+
consumer.pause(partitions)
assert(partitions.asScala == partitionOffsets.keySet,
"If startingOffsets contains specific offsets, you must specify all
TopicPartitions.\n" +
@@ -192,19 +199,82 @@ private[kafka010] class KafkaOffsetReader(
/**
* Fetch the latest offsets for the topic partitions that are indicated
* in the [[ConsumerStrategy]].
+ *
+ * Kafka may return earliest offsets when we are requesting latest offsets if `poll` is called
+ * right before `seekToEnd` (KAFKA-7703). As a workaround, we will call `position` right after
+ * `poll` to wait until the potential offset request triggered by `poll(0)` is done.
+ *
+ * In addition, to avoid other unknown issues, we also use the given `knownOffsets` to audit the
+ * latest offsets returned by Kafka. If we find some incorrect offsets (a latest offset is less
+ * than an offset in `knownOffsets`), we will retry at most `maxOffsetFetchAttempts` times. When
+ * a topic is recreated, the latest offsets may be less than offsets in `knownOffsets`. We cannot
+ * distinguish this with KAFKA-7703, so we just return whatever we get from Kafka after retrying.
*/
- def fetchLatestOffsets(): Map[TopicPartition, Long] = runUninterruptibly {
+ def fetchLatestOffsets(
+ knownOffsets: Option[PartitionOffsetMap]): PartitionOffsetMap = runUninterruptibly {
withRetriesWithoutInterrupt {
// Poll to get the latest assigned partitions
consumer.poll(0)
val partitions = consumer.assignment()
+
+ // Call `position` to wait until the potential offset request triggered by `poll(0)` is
+ // done. This is a workaround for KAFKA-7703, which an async `seekToBeginning` triggered by
+ // `poll(0)` may reset offsets that should have been set by another request.
+ partitions.asScala.map(p => p -> consumer.position(p)).foreach(_ => {})
+
consumer.pause(partitions)
logDebug(s"Partitions assigned to consumer: $partitions. Seeking to the
end.")
- consumer.seekToEnd(partitions)
- val partitionOffsets = partitions.asScala.map(p => p -> consumer.position(p)).toMap
- logDebug(s"Got latest offsets for partition : $partitionOffsets")
- partitionOffsets
+ if (knownOffsets.isEmpty) {
+ consumer.seekToEnd(partitions)
+ partitions.asScala.map(p => p -> consumer.position(p)).toMap
+ } else {
+ var partitionOffsets: PartitionOffsetMap = Map.empty
+
+ /**
+ * Compare `knownOffsets` and `partitionOffsets`. Returns all partitions that have incorrect
+ * latest offset (offset in `knownOffsets` is greater than the one in `partitionOffsets`).
+ */
+ def findIncorrectOffsets(): Seq[(TopicPartition, Long, Long)] = {
+ var incorrectOffsets = ArrayBuffer[(TopicPartition, Long, Long)]()
+ partitionOffsets.foreach { case (tp, offset) =>
+ knownOffsets.foreach(_.get(tp).foreach { knownOffset =>
+ if (knownOffset > offset) {
+ val incorrectOffset = (tp, knownOffset, offset)
+ incorrectOffsets += incorrectOffset
+ }
+ })
+ }
+ incorrectOffsets
+ }
+
+ // Retry to fetch latest offsets when detecting incorrect offsets. We don't use
+ // `withRetriesWithoutInterrupt` to retry because:
+ //
+ // - `withRetriesWithoutInterrupt` will reset the consumer for each attempt but a fresh
+ // consumer has a much bigger chance to hit KAFKA-7703.
+ // - Avoid calling `consumer.poll(0)` which may cause KAFKA-7703.
+ var incorrectOffsets: Seq[(TopicPartition, Long, Long)] = Nil
+ var attempt = 0
+ do {
+ consumer.seekToEnd(partitions)
+ partitionOffsets = partitions.asScala.map(p => p -> consumer.position(p)).toMap
+ attempt += 1
+
+ incorrectOffsets = findIncorrectOffsets()
+ if (incorrectOffsets.nonEmpty) {
+ logWarning("Found incorrect offsets in some partitions " +
+ s"(partition, previous offset, fetched offset):
$incorrectOffsets")
+ if (attempt < maxOffsetFetchAttempts) {
+ logWarning("Retrying to fetch latest offsets because of
incorrect offsets")
+ Thread.sleep(offsetFetchAttemptIntervalMs)
+ }
+ }
+ } while (incorrectOffsets.nonEmpty && attempt < maxOffsetFetchAttempts)
+
+ logDebug(s"Got latest offsets for partition : $partitionOffsets")
+ partitionOffsets
+ }
}
}
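
Taken together, the changes to this file implement a fetch-and-audit loop: poll to get the assignment, force any pending offset reset to finish by calling position (the KAFKA-7703 workaround), seek to the end, and re-read the end offsets while any of them is behind an offset that has already been processed. A minimal, self-contained sketch of that loop, assuming an already-assigned KafkaConsumer and hypothetical maxAttempts / retryIntervalMs parameters (the reader itself uses its existing maxOffsetFetchAttempts and offsetFetchAttemptIntervalMs settings), could look like this:

    import scala.collection.JavaConverters._
    import org.apache.kafka.clients.consumer.KafkaConsumer
    import org.apache.kafka.common.TopicPartition

    object LatestOffsetsSketch {
      def fetchLatestOffsets(
          consumer: KafkaConsumer[_, _],
          knownOffsets: Map[TopicPartition, Long],
          maxAttempts: Int,
          retryIntervalMs: Long): Map[TopicPartition, Long] = {
        consumer.poll(0)
        val partitions = consumer.assignment()
        // KAFKA-7703 workaround: `position` blocks until any offset reset started
        // asynchronously by `poll(0)` has completed, so the `seekToEnd` below cannot
        // be overridden by it.
        partitions.asScala.foreach(p => consumer.position(p))
        consumer.pause(partitions)

        var offsets: Map[TopicPartition, Long] = Map.empty
        var incorrect: Seq[(TopicPartition, Long, Long)] = Nil
        var attempt = 0
        do {
          consumer.seekToEnd(partitions)
          offsets = partitions.asScala.map(p => p -> consumer.position(p)).toMap
          attempt += 1
          // A fetched offset is suspicious if it is behind an offset we have already seen.
          incorrect = offsets.toSeq.collect {
            case (tp, offset) if knownOffsets.get(tp).exists(_ > offset) =>
              (tp, knownOffsets(tp), offset)
          }
          if (incorrect.nonEmpty && attempt < maxAttempts) Thread.sleep(retryIntervalMs)
        } while (incorrect.nonEmpty && attempt < maxAttempts)
        offsets
      }
    }

Retrying in place rather than through withRetriesWithoutInterrupt keeps the same consumer alive, which, as the comments above note, makes it far less likely to hit KAFKA-7703 again; and because a genuinely recreated topic is indistinguishable from the bug, the method returns whatever Kafka reports after the last attempt.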
diff --git a/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaSource.scala b/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaSource.scala
index 66ec7e0cd084a..d65b3cea632c4 100644
--- a/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaSource.scala
+++ b/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaSource.scala
@@ -130,7 +130,7 @@ private[kafka010] class KafkaSource(
metadataLog.get(0).getOrElse {
val offsets = startingOffsets match {
case EarliestOffsetRangeLimit => KafkaSourceOffset(kafkaReader.fetchEarliestOffsets())
- case LatestOffsetRangeLimit => KafkaSourceOffset(kafkaReader.fetchLatestOffsets())
+ case LatestOffsetRangeLimit => KafkaSourceOffset(kafkaReader.fetchLatestOffsets(None))
case SpecificOffsetRangeLimit(p) => kafkaReader.fetchSpecificOffsets(p, reportDataLoss)
}
metadataLog.add(0, offsets)
@@ -148,7 +148,8 @@ private[kafka010] class KafkaSource(
// Make sure initialPartitionOffsets is initialized
initialPartitionOffsets
- val latest = kafkaReader.fetchLatestOffsets()
+ val latest = kafkaReader.fetchLatestOffsets(
+ currentPartitionOffsets.orElse(Some(initialPartitionOffsets)))
val offsets = maxOffsetsPerTrigger match {
case None =>
latest
diff --git a/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaMicroBatchSourceSuite.scala b/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaMicroBatchSourceSuite.scala
index d89e45e1e77fe..5f058332de52d 100644
--- a/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaMicroBatchSourceSuite.scala
+++ b/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaMicroBatchSourceSuite.scala
@@ -327,6 +327,54 @@ abstract class KafkaMicroBatchSourceSuiteBase extends KafkaSourceSuiteBase {
)
}
+ test("subscribe topic by pattern with topic recreation between batches") {
+ val topicPrefix = newTopic()
+ val topic = topicPrefix + "-good"
+ val topic2 = topicPrefix + "-bad"
+ testUtils.createTopic(topic, partitions = 1)
+ testUtils.sendMessages(topic, Array("1", "3"))
+ testUtils.createTopic(topic2, partitions = 1)
+ testUtils.sendMessages(topic2, Array("2", "4"))
+
+ val reader = spark
+ .readStream
+ .format("kafka")
+ .option("kafka.bootstrap.servers", testUtils.brokerAddress)
+ .option("kafka.metadata.max.age.ms", "1")
+ .option("kafka.default.api.timeout.ms", "3000")
+ .option("startingOffsets", "earliest")
+ .option("subscribePattern", s"$topicPrefix-.*")
+
+ val ds = reader.load()
+ .selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
+ .as[(String, String)]
+ .map(kv => kv._2.toInt)
+
+ testStream(ds)(
+ StartStream(),
+ AssertOnQuery { q =>
+ q.processAllAvailable()
+ true
+ },
+ CheckAnswer(1, 2, 3, 4),
+ // Restart the stream in this test to make the test stable. When recreating a topic when a
+ // consumer is alive, it may not be able to see the recreated topic even if a fresh consumer
+ // has seen it.
+ StopStream,
+ // Recreate `topic2` and wait until it's available
+ WithOffsetSync(new TopicPartition(topic2, 0), expectedOffset = 1) { () =>
+ testUtils.deleteTopic(topic2)
+ testUtils.createTopic(topic2)
+ testUtils.sendMessages(topic2, Array("6"))
+ },
+ StartStream(),
+ ExpectFailure[IllegalStateException](e => {
+ // The offset of `topic2` should be changed from 2 to 1
+ assert(e.getMessage.contains("was changed from 2 to 1"))
+ })
+ )
+ }
+
test("ensure that initial offset are written with an extra byte in the
beginning (SPARK-19517)") {
withTempDir { metadataPath =>
val topic = "kafka-initial-offset-current"
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]
With regards,
Apache Git Services
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]