[GitHub] spark pull request #15820: [SPARK-18373][SS][Kafka]Make failOnDataLoss=false...
Github user tdas commented on a diff in the pull request: https://github.com/apache/spark/pull/15820#discussion_r89018842

--- Diff: external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/CachedKafkaConsumer.scala ---
@@ -34,53 +38,211 @@ import org.apache.spark.internal.Logging
 private[kafka010] case class CachedKafkaConsumer private(
     topicPartition: TopicPartition,
     kafkaParams: ju.Map[String, Object]) extends Logging {
+  import CachedKafkaConsumer._
 
   private val groupId = kafkaParams.get(ConsumerConfig.GROUP_ID_CONFIG).asInstanceOf[String]
 
-  private val consumer = {
+  private var consumer = createConsumer
+
+  /** Iterator to the already fetch data */
+  private var fetchedData = ju.Collections.emptyIterator[ConsumerRecord[Array[Byte], Array[Byte]]]
+  private var nextOffsetInFetchedData = UNKNOWN_OFFSET
+
+  /** Create a KafkaConsumer to fetch records for `topicPartition` */
+  private def createConsumer: KafkaConsumer[Array[Byte], Array[Byte]] = {
     val c = new KafkaConsumer[Array[Byte], Array[Byte]](kafkaParams)
     val tps = new ju.ArrayList[TopicPartition]()
     tps.add(topicPartition)
     c.assign(tps)
     c
   }
 
-  /** Iterator to the already fetch data */
-  private var fetchedData = ju.Collections.emptyIterator[ConsumerRecord[Array[Byte], Array[Byte]]]
-  private var nextOffsetInFetchedData = -2L
-
   /**
-   * Get the record for the given offset, waiting up to timeout ms if IO is necessary.
-   * Sequential forward access will use buffers, but random access will be horribly inefficient.
+   * Get the record for the given offset if available. Otherwise it will either throw error
+   * (if failOnDataLoss = true), or return the next available offset within [offset, untilOffset).
+   *
+   * @param offset the offset to fetch.
+   * @param untilOffset the max offset to fetch. Exclusive.
+   * @param pollTimeoutMs timeout in milliseconds to poll data from Kafka.
+   * @param failOnDataLoss When `failOnDataLoss` is `true`, this method will either return record at
+   *                       offset if available, or throw exception.when `failOnDataLoss` is `false`,
+   *                       this method will either return record at offset if available, or return
+   *                       the next earliest available record less than untilOffset, or null. It
+   *                       will not throw any exception.
    */
-  def get(offset: Long, pollTimeoutMs: Long): ConsumerRecord[Array[Byte], Array[Byte]] = {
+  def get(
--- End diff --

Can you also document that it can return null.

---
If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA.
---
-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org
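The requested documentation boils down to a three-way contract: return the record at `offset`; otherwise throw when `failOnDataLoss` is true; otherwise return the earliest surviving record below `untilOffset`, or `null`. Below is a minimal sketch of that contract and of a caller that treats `null` as "nothing left in this range", assuming the partition is modeled as an in-memory `TreeMap` from offset to record. `Record`, `readRange`, and this simplified `get` are hypothetical names, not the Spark implementation (which polls a `KafkaConsumer`).

```scala
import scala.collection.immutable.TreeMap

object NullableGetSketch {
  // Hypothetical stand-in for a Kafka record; only offsets present in the map are still retained.
  final case class Record(offset: Long, value: String)

  /**
   * Returns the record at `offset` if it is retained. Otherwise throws when `failOnDataLoss` is
   * true, or returns the earliest retained record in [offset, untilOffset), or null if none exists.
   */
  def get(
      records: TreeMap[Long, Record],
      offset: Long,
      untilOffset: Long,
      failOnDataLoss: Boolean): Record = {
    require(offset < untilOffset, s"offset must be less than untilOffset [$offset, $untilOffset)")
    records.get(offset) match {
      case Some(r) => r
      case None if failOnDataLoss =>
        throw new IllegalStateException(s"Cannot fetch offset $offset")
      case None =>
        // iteratorFrom yields entries with key >= offset in ascending key order.
        val it = records.iteratorFrom(offset)
        if (it.hasNext) {
          val (o, r) = it.next()
          if (o < untilOffset) r else null
        } else {
          null
        }
    }
  }

  /** Hypothetical caller: collects values in [from, until), stopping when get returns null. */
  def readRange(records: TreeMap[Long, Record], from: Long, until: Long): Seq[String] = {
    val out = Seq.newBuilder[String]
    var next = from
    while (next < until) {
      val r = get(records, next, until, failOnDataLoss = false)
      if (r == null) {
        next = until // nothing is retained in the rest of the range
      } else {
        out += r.value
        next = r.offset + 1
      }
    }
    out.result()
  }
}
```

Making `null` part of the documented contract lets the caller stop early instead of treating a missing record as an error.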
[GitHub] spark pull request #15820: [SPARK-18373][SS][Kafka]Make failOnDataLoss=false...
Github user tdas commented on a diff in the pull request: https://github.com/apache/spark/pull/15820#discussion_r89020354

--- Diff: external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaSourceSuite.scala ---
@@ -413,13 +451,59 @@ class KafkaSourceSuite extends KafkaSourceTest {
     )
   }
 
+  test("Delete a topic when a Spark job is running") {
--- End diff --

nit: D -> d, for consistency.
[GitHub] spark pull request #15820: [SPARK-18373][SS][Kafka]Make failOnDataLoss=false...
Github user tdas commented on a diff in the pull request: https://github.com/apache/spark/pull/15820#discussion_r89019523

--- Diff: external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/CachedKafkaConsumer.scala ---
@@ -34,53 +38,211 @@ import org.apache.spark.internal.Logging
 private[kafka010] case class CachedKafkaConsumer private(
     topicPartition: TopicPartition,
     kafkaParams: ju.Map[String, Object]) extends Logging {
+  import CachedKafkaConsumer._
 
   private val groupId = kafkaParams.get(ConsumerConfig.GROUP_ID_CONFIG).asInstanceOf[String]
 
-  private val consumer = {
+  private var consumer = createConsumer
+
+  /** Iterator to the already fetch data */
+  private var fetchedData = ju.Collections.emptyIterator[ConsumerRecord[Array[Byte], Array[Byte]]]
+  private var nextOffsetInFetchedData = UNKNOWN_OFFSET
+
+  /** Create a KafkaConsumer to fetch records for `topicPartition` */
+  private def createConsumer: KafkaConsumer[Array[Byte], Array[Byte]] = {
     val c = new KafkaConsumer[Array[Byte], Array[Byte]](kafkaParams)
     val tps = new ju.ArrayList[TopicPartition]()
     tps.add(topicPartition)
     c.assign(tps)
     c
   }
 
-  /** Iterator to the already fetch data */
-  private var fetchedData = ju.Collections.emptyIterator[ConsumerRecord[Array[Byte], Array[Byte]]]
-  private var nextOffsetInFetchedData = -2L
-
   /**
-   * Get the record for the given offset, waiting up to timeout ms if IO is necessary.
-   * Sequential forward access will use buffers, but random access will be horribly inefficient.
+   * Get the record for the given offset if available. Otherwise it will either throw error
+   * (if failOnDataLoss = true), or return the next available offset within [offset, untilOffset).
+   *
+   * @param offset the offset to fetch.
+   * @param untilOffset the max offset to fetch. Exclusive.
+   * @param pollTimeoutMs timeout in milliseconds to poll data from Kafka.
+   * @param failOnDataLoss When `failOnDataLoss` is `true`, this method will either return record at
+   *                       offset if available, or throw exception.when `failOnDataLoss` is `false`,
+   *                       this method will either return record at offset if available, or return
+   *                       the next earliest available record less than untilOffset, or null. It
+   *                       will not throw any exception.
    */
-  def get(offset: Long, pollTimeoutMs: Long): ConsumerRecord[Array[Byte], Array[Byte]] = {
+  def get(
+      offset: Long,
+      untilOffset: Long,
+      pollTimeoutMs: Long,
+      failOnDataLoss: Boolean): ConsumerRecord[Array[Byte], Array[Byte]] = {
+    require(offset < untilOffset,
+      s"offset must always be less than untilOffset [offset: $offset, untilOffset: $untilOffset]")
     logDebug(s"Get $groupId $topicPartition nextOffset $nextOffsetInFetchedData requested $offset")
-    if (offset != nextOffsetInFetchedData) {
-      logInfo(s"Initial fetch for $topicPartition $offset")
-      seek(offset)
-      poll(pollTimeoutMs)
+    // The following loop is basically for `failOnDataLoss = false`. When `failOnDataLoss` is
+    // `false`, firstly, we will try to fetch the record at `offset`. If no such record, then we
+    // will move to the next available offset within `[offset, untilOffset)` and retry.
+    // If `failOnDataLoss` is `true`, the loop body will be executed only once.
+    var toFetchOffset = offset
+    while (toFetchOffset != UNKNOWN_OFFSET) {
+      try {
+        return fetchData(toFetchOffset, pollTimeoutMs)
+      } catch {
+        case e: OffsetOutOfRangeException =>
+          // When there is some error thrown, it's better to use a new consumer to drop all cached
+          // states in the old consumer. We don't need to worry about the performance because this
+          // is not a normal path.
--- End diff --

normal --> common
[GitHub] spark pull request #15820: [SPARK-18373][SS][Kafka]Make failOnDataLoss=false...
Github user tdas commented on a diff in the pull request: https://github.com/apache/spark/pull/15820#discussion_r89020163

--- Diff: external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaSource.scala ---
@@ -458,6 +457,21 @@ private[kafka010] case class KafkaSource(
 
 /** Companion object for the [[KafkaSource]]. */
 private[kafka010] object KafkaSource {
+  val INSTRUCTION_FOR_FAIL_ON_DATA_LOSS_FALSE =
+    """
+      |There may have been some data loss because some data may have been aged out in Kafka or
+      | the topic has been deleted and is therefore unavailable for processing. If you want your
--- End diff --

nit: better grammar

Some data may have been lost because they are not available in Kafka any more; either the data was aged out by Kafka or the topic may have been deleted before all the data in the topic was processed.

Similarly change below.
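The instruction text is appended to data-loss reports on both the `failOnDataLoss = true` and `false` paths. The sketch below shows one way the suggested wording could feed a `reportDataLoss(failOnDataLoss, message, cause)` helper like the one called in `CachedKafkaConsumer`. The object name, the `warnings` buffer (standing in for `logWarning`), and the closing sentence of each instruction are assumptions, not the merged text.

```scala
import scala.collection.mutable.ArrayBuffer

object DataLossReportingSketch {
  // First sentence follows the reviewer's suggestion; the last sentence is an assumption.
  val InstructionForFailOnDataLossFalse: String =
    """
      |Some data may have been lost because they are not available in Kafka any more; either the
      | data was aged out by Kafka or the topic may have been deleted before all the data in the
      | topic was processed. If you want your streaming query to fail on such cases, set the source
      | option "failOnDataLoss" to "true".
    """.stripMargin

  val InstructionForFailOnDataLossTrue: String =
    """
      |Some data may have been lost because they are not available in Kafka any more; either the
      | data was aged out by Kafka or the topic may have been deleted before all the data in the
      | topic was processed. If you don't want your streaming query to fail on such cases, set the
      | source option "failOnDataLoss" to "false".
    """.stripMargin

  // Collects warnings in place of Spark's logWarning so the sketch runs standalone.
  val warnings: ArrayBuffer[String] = ArrayBuffer.empty[String]

  /** Throws when failOnDataLoss is true; otherwise records a warning and returns normally. */
  def reportDataLoss(failOnDataLoss: Boolean, message: String, cause: Throwable = null): Unit = {
    val instruction =
      if (failOnDataLoss) InstructionForFailOnDataLossTrue else InstructionForFailOnDataLossFalse
    val finalMessage = s"$message $instruction"
    if (failOnDataLoss) {
      throw new IllegalStateException(finalMessage, cause)
    } else {
      warnings += finalMessage
    }
  }
}
```

Keeping both instructions next to each other makes the "Similarly change below" request a matter of editing one place.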
[GitHub] spark pull request #15820: [SPARK-18373][SS][Kafka]Make failOnDataLoss=false...
Github user tdas commented on a diff in the pull request: https://github.com/apache/spark/pull/15820#discussion_r89013168

--- Diff: external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/CachedKafkaConsumer.scala ---
@@ -34,53 +38,211 @@ import org.apache.spark.internal.Logging
 private[kafka010] case class CachedKafkaConsumer private(
     topicPartition: TopicPartition,
     kafkaParams: ju.Map[String, Object]) extends Logging {
+  import CachedKafkaConsumer._
 
   private val groupId = kafkaParams.get(ConsumerConfig.GROUP_ID_CONFIG).asInstanceOf[String]
 
-  private val consumer = {
+  private var consumer = createConsumer
+
+  /** Iterator to the already fetch data */
+  private var fetchedData = ju.Collections.emptyIterator[ConsumerRecord[Array[Byte], Array[Byte]]]
+  private var nextOffsetInFetchedData = UNKNOWN_OFFSET
+
+  /** Create a KafkaConsumer to fetch records for `topicPartition` */
+  private def createConsumer: KafkaConsumer[Array[Byte], Array[Byte]] = {
     val c = new KafkaConsumer[Array[Byte], Array[Byte]](kafkaParams)
     val tps = new ju.ArrayList[TopicPartition]()
     tps.add(topicPartition)
     c.assign(tps)
     c
   }
 
-  /** Iterator to the already fetch data */
-  private var fetchedData = ju.Collections.emptyIterator[ConsumerRecord[Array[Byte], Array[Byte]]]
-  private var nextOffsetInFetchedData = -2L
-
   /**
-   * Get the record for the given offset, waiting up to timeout ms if IO is necessary.
-   * Sequential forward access will use buffers, but random access will be horribly inefficient.
+   * Get the record for the given offset if available. Otherwise it will either throw error
+   * (if failOnDataLoss = true), or return the next available offset within [offset, untilOffset).
+   *
+   * @param offset the offset to fetch.
+   * @param untilOffset the max offset to fetch. Exclusive.
+   * @param pollTimeoutMs timeout in milliseconds to poll data from Kafka.
+   * @param failOnDataLoss When `failOnDataLoss` is `true`, this method will either return record at
+   *                       offset if available, or throw exception.when `failOnDataLoss` is `false`,
+   *                       this method will either return record at offset if available, or return
+   *                       the next earliest available record less than untilOffset, or null. It
+   *                       will not throw any exception.
    */
-  def get(offset: Long, pollTimeoutMs: Long): ConsumerRecord[Array[Byte], Array[Byte]] = {
+  def get(
+      offset: Long,
+      untilOffset: Long,
+      pollTimeoutMs: Long,
+      failOnDataLoss: Boolean): ConsumerRecord[Array[Byte], Array[Byte]] = {
+    require(offset < untilOffset,
+      s"offset must always be less than untilOffset [offset: $offset, untilOffset: $untilOffset]")
     logDebug(s"Get $groupId $topicPartition nextOffset $nextOffsetInFetchedData requested $offset")
-    if (offset != nextOffsetInFetchedData) {
-      logInfo(s"Initial fetch for $topicPartition $offset")
-      seek(offset)
-      poll(pollTimeoutMs)
+    // The following loop is basically for `failOnDataLoss = false`. When `failOnDataLoss` is
+    // `false`, firstly, we will try to fetch the record at `offset`. If no such record, then we
--- End diff --

nit: firstly --> first, no such record exists

overall +1, thanks for this explanation.
[GitHub] spark pull request #15820: [SPARK-18373][SS][Kafka]Make failOnDataLoss=false...
Github user zsxwing commented on a diff in the pull request: https://github.com/apache/spark/pull/15820#discussion_r88963845

--- Diff: external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/CachedKafkaConsumer.scala ---
@@ -34,53 +38,211 @@ import org.apache.spark.internal.Logging
 private[kafka010] case class CachedKafkaConsumer private(
     topicPartition: TopicPartition,
     kafkaParams: ju.Map[String, Object]) extends Logging {
+  import CachedKafkaConsumer._
 
   private val groupId = kafkaParams.get(ConsumerConfig.GROUP_ID_CONFIG).asInstanceOf[String]
 
-  private val consumer = {
+  private var consumer = createConsumer
+
+  /** Iterator to the already fetch data */
+  private var fetchedData = ju.Collections.emptyIterator[ConsumerRecord[Array[Byte], Array[Byte]]]
+  private var nextOffsetInFetchedData = UNKNOWN_OFFSET
+
+  /** Create a KafkaConsumer to fetch records for `topicPartition` */
+  private def createConsumer: KafkaConsumer[Array[Byte], Array[Byte]] = {
     val c = new KafkaConsumer[Array[Byte], Array[Byte]](kafkaParams)
     val tps = new ju.ArrayList[TopicPartition]()
     tps.add(topicPartition)
     c.assign(tps)
     c
   }
 
-  /** Iterator to the already fetch data */
-  private var fetchedData = ju.Collections.emptyIterator[ConsumerRecord[Array[Byte], Array[Byte]]]
-  private var nextOffsetInFetchedData = -2L
-
   /**
-   * Get the record for the given offset, waiting up to timeout ms if IO is necessary.
-   * Sequential forward access will use buffers, but random access will be horribly inefficient.
+   * Get the record for the given offset if available. Otherwise it will either throw error
+   * (if failOnDataLoss = true), or return the next available offset within [offset, untilOffset).
+   *
+   * @param offset the offset to fetch.
+   * @param untilOffset the max offset to fetch. Exclusive.
+   * @param pollTimeoutMs timeout in milliseconds to poll data from Kafka.
+   * @param failOnDataLoss When `failOnDataLoss` is `true`, this method will either return record at
+   *                       offset if available, or throw exception.when `failOnDataLoss` is `false`,
+   *                       this method will either return record at offset if available, or return
+   *                       the next earliest available record less than untilOffset, or null. It
+   *                       will not throw any exception.
    */
-  def get(offset: Long, pollTimeoutMs: Long): ConsumerRecord[Array[Byte], Array[Byte]] = {
+  def get(
+      offset: Long,
+      untilOffset: Long,
+      pollTimeoutMs: Long,
+      failOnDataLoss: Boolean): ConsumerRecord[Array[Byte], Array[Byte]] = {
+    require(offset < untilOffset,
+      s"offset must always be less than untilOffset [offset: $offset, untilOffset: $untilOffset]")
     logDebug(s"Get $groupId $topicPartition nextOffset $nextOffsetInFetchedData requested $offset")
-    if (offset != nextOffsetInFetchedData) {
-      logInfo(s"Initial fetch for $topicPartition $offset")
-      seek(offset)
-      poll(pollTimeoutMs)
+    // The following loop is basically for `failOnDataLoss = false`. When `failOnDataLoss` is
+    // `false`, firstly, we will try to fetch the record at `offset`. If no such record, then we
+    // will move to the next available offset within `[offset, untilOffset)` and retry.
+    // If `failOnDataLoss` is `true`, the loop body will be executed only once.
+    var toFetchOffset = offset
+    while (toFetchOffset != UNKNOWN_OFFSET) {
+      try {
+        return fetchData(toFetchOffset, pollTimeoutMs)
+      } catch {
+        case e: OffsetOutOfRangeException =>
+          // When there is some error thrown, it's better to use a new consumer to drop all cached
+          // states in the old consumer. We don't need to worry about the performance because this
+          // is not a normal path.
+          resetConsumer()
+          reportDataLoss(failOnDataLoss, s"Cannot fetch offset $toFetchOffset", e)
+          toFetchOffset = getEarliestAvailableOffsetBetween(toFetchOffset, untilOffset)
+      }
     }
+    resetFetchedData()
+    null
+  }
 
-    if (!fetchedData.hasNext()) { poll(pollTimeoutMs) }
-    assert(fetchedData.hasNext(),
-      s"Failed to get records for $groupId $topicPartition $offset " +
-        s"after polling for $pollTimeoutMs")
-    var record = fetchedData.next()
+  /**
+   * Return the next earliest available offset in [offset, untilOffset). If all offsets in
+   * [offset, untilOffset) are invalid (e.g., the topic is deleted and recreated), it will return
+   * `UNKNOWN_OFFSET`.
+   */
[GitHub] spark pull request #15820: [SPARK-18373][SS][Kafka]Make failOnDataLoss=false...
Github user tdas commented on a diff in the pull request: https://github.com/apache/spark/pull/15820#discussion_r88765764

--- Diff: external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/CachedKafkaConsumer.scala ---
@@ -47,40 +51,190 @@ private[kafka010] case class CachedKafkaConsumer private(
 
   /** Iterator to the already fetch data */
   private var fetchedData = ju.Collections.emptyIterator[ConsumerRecord[Array[Byte], Array[Byte]]]
-  private var nextOffsetInFetchedData = -2L
+  private var nextOffsetInFetchedData = UNKNOWN_OFFSET
 
   /**
-   * Get the record for the given offset, waiting up to timeout ms if IO is necessary.
-   * Sequential forward access will use buffers, but random access will be horribly inefficient.
+   * Get the record for the given offset if available. Otherwise it will either throw error
+   * (if failOnDataLoss = true), or return the next available offset within [offset, untilOffset).
+   *
+   * @param offset the offset to fetch.
+   * @param untilOffset the max offset to fetch. Exclusive.
+   * @param pollTimeoutMs timeout in milliseconds to poll data from Kafka.
+   * @param failOnDataLoss When `failOnDataLoss` is `true`, this method will either return record at
+   *                       offset if available, or throw exception.when `failOnDataLoss` is `false`,
+   *                       this method will either return record at offset if available, or return
+   *                       the next earliest available record less than untilOffset, or null. It
+   *                       will not throw any exception.
    */
-  def get(offset: Long, pollTimeoutMs: Long): ConsumerRecord[Array[Byte], Array[Byte]] = {
+  def get(
+      offset: Long,
+      untilOffset: Long,
+      pollTimeoutMs: Long,
+      failOnDataLoss: Boolean): ConsumerRecord[Array[Byte], Array[Byte]] = {
+    require(offset < untilOffset,
+      s"offset must always be less than untilOffset [offset: $offset, untilOffset: $untilOffset]")
     logDebug(s"Get $groupId $topicPartition nextOffset $nextOffsetInFetchedData requested $offset")
-    if (offset != nextOffsetInFetchedData) {
-      logInfo(s"Initial fetch for $topicPartition $offset")
-      seek(offset)
-      poll(pollTimeoutMs)
+    var toFetchOffset = offset
+    while (toFetchOffset != UNKNOWN_OFFSET) {
--- End diff --

add docs for this logic.
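The loop the reviewer wants documented can be sketched against a fake partition that retains only `[earliest, latest)`. With `failOnDataLoss = true`, the first out-of-range fetch throws, so the body runs once. With `false`, the loop jumps to the earliest retained offset still below `untilOffset` and retries, giving up when nothing in range survives. `FakePartition`, `OffsetOutOfRange`, the `-2L` sentinel, and the `Option` return (where the PR returns `null`) are assumptions made for illustration; this is not the Spark code.

```scala
object RetryLoopSketch {
  // Assumed sentinel, mirroring the UNKNOWN_OFFSET that replaces -2L in the diff.
  val UnknownOffset: Long = -2L

  // Hypothetical stand-in for Kafka's OffsetOutOfRangeException.
  final class OffsetOutOfRange(offset: Long)
    extends RuntimeException(s"Offset $offset is out of range")

  /** Fake partition that still retains only offsets in [earliest, latest). */
  final class FakePartition(val earliest: Long, val latest: Long) {
    def fetch(offset: Long): Long =
      if (offset < earliest || offset >= latest) throw new OffsetOutOfRange(offset) else offset
  }

  /** Earliest retained offset in [offset, untilOffset), or UnknownOffset if there is none. */
  def earliestAvailableBetween(p: FakePartition, offset: Long, untilOffset: Long): Long =
    if (offset >= p.latest || p.earliest >= untilOffset) UnknownOffset
    else math.max(offset, p.earliest)

  def get(p: FakePartition, offset: Long, untilOffset: Long, failOnDataLoss: Boolean): Option[Long] = {
    require(offset < untilOffset, s"offset must be less than untilOffset [$offset, $untilOffset)")
    var toFetchOffset = offset
    // failOnDataLoss = true: the first failure throws, so the body runs at most once.
    // failOnDataLoss = false: each failure moves toFetchOffset forward or ends the loop.
    while (toFetchOffset != UnknownOffset) {
      try {
        return Some(p.fetch(toFetchOffset))
      } catch {
        case e: OffsetOutOfRange =>
          if (failOnDataLoss) {
            throw new IllegalStateException(s"Cannot fetch offset $toFetchOffset", e)
          }
          toFetchOffset = earliestAvailableBetween(p, toFetchOffset, untilOffset)
      }
    }
    None // the PR returns null at this point
  }
}
```

The loop terminates because every retry either moves strictly forward inside the retained range or yields the sentinel.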
[GitHub] spark pull request #15820: [SPARK-18373][SS][Kafka]Make failOnDataLoss=false...
Github user tdas commented on a diff in the pull request: https://github.com/apache/spark/pull/15820#discussion_r88759061

--- Diff: external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaSourceSuite.scala ---
@@ -223,52 +228,184 @@ class KafkaSourceSuite extends KafkaSourceTest {
     )
   }
 
-  test("assign from latest offsets") {
-    val topic = newTopic()
-    testFromLatestOffsets(topic, false, "assign" -> assignString(topic, 0 to 4))
-  }
+  test("access offset 0 in Spark job but the topic has been deleted") {
+    KafkaSourceSuite.collectedData.clear()
 
-  test("assign from earliest offsets") {
     val topic = newTopic()
-    testFromEarliestOffsets(topic, false, "assign" -> assignString(topic, 0 to 4))
-  }
+    testUtils.createTopic(topic, partitions = 1)
+    testUtils.sendMessages(topic, (1 to 10).map(_.toString).toArray)
 
-  test("assign from specific offsets") {
-    val topic = newTopic()
-    testFromSpecificOffsets(topic, "assign" -> assignString(topic, 0 to 4))
-  }
+    val reader = spark
+      .readStream
+      .format("kafka")
+      .option("kafka.bootstrap.servers", testUtils.brokerAddress)
+      .option("kafka.metadata.max.age.ms", "1")
+      .option("subscribe", topic)
+      // If a topic is deleted and we try to poll data starting from offset 0,
+      // the Kafka consumer will just block until timeout and return an empty result.
+      // So set the timeout to 1 second to make this test fast.
+      .option("kafkaConsumer.pollTimeoutMs", "1000")
+      .option("startingOffsets", "earliest")
+      .option("failOnDataLoss", "false")
+    val kafka = reader.load()
+      .selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
+      .as[(String, String)]
+    KafkaSourceSuite.globalTestUtils = testUtils
+    val query = kafka.map(kv => kv._2.toInt).writeStream.foreach(new ForeachWriter[Int] {
--- End diff --

move this test lower. after the basic tests.
[GitHub] spark pull request #15820: [SPARK-18373][SS][Kafka]Make failOnDataLoss=false...
Github user tdas commented on a diff in the pull request: https://github.com/apache/spark/pull/15820#discussion_r88759005

--- Diff: external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaSourceSuite.scala ---
@@ -223,52 +228,184 @@ class KafkaSourceSuite extends KafkaSourceTest {
     )
   }
 
-  test("assign from latest offsets") {
-    val topic = newTopic()
-    testFromLatestOffsets(topic, false, "assign" -> assignString(topic, 0 to 4))
-  }
+  test("access offset 0 in Spark job but the topic has been deleted") {
+    KafkaSourceSuite.collectedData.clear()
 
-  test("assign from earliest offsets") {
     val topic = newTopic()
-    testFromEarliestOffsets(topic, false, "assign" -> assignString(topic, 0 to 4))
-  }
+    testUtils.createTopic(topic, partitions = 1)
+    testUtils.sendMessages(topic, (1 to 10).map(_.toString).toArray)
 
-  test("assign from specific offsets") {
-    val topic = newTopic()
-    testFromSpecificOffsets(topic, "assign" -> assignString(topic, 0 to 4))
-  }
+    val reader = spark
+      .readStream
+      .format("kafka")
+      .option("kafka.bootstrap.servers", testUtils.brokerAddress)
+      .option("kafka.metadata.max.age.ms", "1")
+      .option("subscribe", topic)
+      // If a topic is deleted and we try to poll data starting from offset 0,
+      // the Kafka consumer will just block until timeout and return an empty result.
+      // So set the timeout to 1 second to make this test fast.
+      .option("kafkaConsumer.pollTimeoutMs", "1000")
+      .option("startingOffsets", "earliest")
+      .option("failOnDataLoss", "false")
+    val kafka = reader.load()
+      .selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
+      .as[(String, String)]
+    KafkaSourceSuite.globalTestUtils = testUtils
+    val query = kafka.map(kv => kv._2.toInt).writeStream.foreach(new ForeachWriter[Int] {
+      override def open(partitionId: Long, version: Long): Boolean = {
+        KafkaSourceSuite.globalTestUtils.deleteTopic(topic)
+        true
+      }
 
-  test("subscribing topic by name from latest offsets") {
-    val topic = newTopic()
-    testFromLatestOffsets(topic, true, "subscribe" -> topic)
-  }
+      override def process(value: Int): Unit = {
+        KafkaSourceSuite.collectedData.add(value)
+      }
 
-  test("subscribing topic by name from earliest offsets") {
-    val topic = newTopic()
-    testFromEarliestOffsets(topic, true, "subscribe" -> topic)
+      override def close(errorOrNull: Throwable): Unit = {}
+    }).start()
+    query.processAllAvailable()
+    assert(KafkaSourceSuite.collectedData.isEmpty)
+    query.stop()
+    // `failOnDataLoss` is `false`, we should not fail the query
+    assert(query.exception.isEmpty)
   }
 
-  test("subscribing topic by name from specific offsets") {
+  test("access non-zero offset in Spark job but the topic has been deleted") {
--- End diff --

remove this test.
[GitHub] spark pull request #15820: [SPARK-18373][SS][Kafka]Make failOnDataLoss=false...
Github user tdas commented on a diff in the pull request: https://github.com/apache/spark/pull/15820#discussion_r88758879

--- Diff: external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaSourceSuite.scala ---
@@ -223,52 +228,184 @@ class KafkaSourceSuite extends KafkaSourceTest {
     )
   }
 
-  test("assign from latest offsets") {
-    val topic = newTopic()
-    testFromLatestOffsets(topic, false, "assign" -> assignString(topic, 0 to 4))
-  }
+  test("access offset 0 in Spark job but the topic has been deleted") {
+    KafkaSourceSuite.collectedData.clear()
 
-  test("assign from earliest offsets") {
     val topic = newTopic()
-    testFromEarliestOffsets(topic, false, "assign" -> assignString(topic, 0 to 4))
-  }
+    testUtils.createTopic(topic, partitions = 1)
+    testUtils.sendMessages(topic, (1 to 10).map(_.toString).toArray)
 
-  test("assign from specific offsets") {
-    val topic = newTopic()
-    testFromSpecificOffsets(topic, "assign" -> assignString(topic, 0 to 4))
-  }
+    val reader = spark
+      .readStream
+      .format("kafka")
+      .option("kafka.bootstrap.servers", testUtils.brokerAddress)
+      .option("kafka.metadata.max.age.ms", "1")
+      .option("subscribe", topic)
+      // If a topic is deleted and we try to poll data starting from offset 0,
+      // the Kafka consumer will just block until timeout and return an empty result.
+      // So set the timeout to 1 second to make this test fast.
+      .option("kafkaConsumer.pollTimeoutMs", "1000")
+      .option("startingOffsets", "earliest")
+      .option("failOnDataLoss", "false")
+    val kafka = reader.load()
+      .selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
+      .as[(String, String)]
+    KafkaSourceSuite.globalTestUtils = testUtils
+    val query = kafka.map(kv => kv._2.toInt).writeStream.foreach(new ForeachWriter[Int] {
--- End diff --

add explanation.
[GitHub] spark pull request #15820: [SPARK-18373][SS][Kafka]Make failOnDataLoss=false...
Github user tdas commented on a diff in the pull request: https://github.com/apache/spark/pull/15820#discussion_r88758556

--- Diff: external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaSourceSuite.scala ---
@@ -626,3 +776,106 @@ class KafkaSourceStressSuite extends KafkaSourceTest {
       iterations = 50)
   }
 }
+
+class KafkaSourceStressForDontFailOnDataLossSuite extends StreamTest with SharedSQLContext {
+
+  import testImplicits._
+
+  private var testUtils: KafkaTestUtils = _
+
+  private val topicId = new AtomicInteger(0)
+
+  private def newTopic(): String = s"failOnDataLoss-${topicId.getAndIncrement()}"
+
+  override def beforeAll(): Unit = {
+    super.beforeAll()
+    testUtils = new KafkaTestUtils {
+      override def brokerConfiguration: Properties = {
+        val props = super.brokerConfiguration
+        // Try to make Kafka clean up messages as fast as possible. However, there is a hard-code
+        // 30 seconds delay (kafka.log.LogManager.InitialTaskDelayMs) so this test should run at
+        // least 30 seconds.
+        props.put("log.cleaner.backoff.ms", "100")
+        props.put("log.segment.bytes", "40")
+        props.put("log.retention.bytes", "40")
+        props.put("log.retention.check.interval.ms", "100")
+        props.put("delete.retention.ms", "10")
+        props.put("log.flush.scheduler.interval.ms", "10")
+        props
+      }
+    }
+    testUtils.setup()
+  }
+
+  override def afterAll(): Unit = {
+    if (testUtils != null) {
+      testUtils.teardown()
+      testUtils = null
+      super.afterAll()
+    }
+  }
+
+  test("stress test for failOnDataLoss=false") {
+    val reader = spark
+      .readStream
+      .format("kafka")
+      .option("kafka.bootstrap.servers", testUtils.brokerAddress)
+      .option("kafka.metadata.max.age.ms", "1")
+      .option("subscribePattern", "failOnDataLoss.*")
+      .option("startingOffsets", "earliest")
+      .option("failOnDataLoss", "false")
+    val kafka = reader.load()
+      .selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
+      .as[(String, String)]
+    val query = kafka.map(kv => kv._2.toInt).writeStream.foreach(new ForeachWriter[Int] {
+
+      override def open(partitionId: Long, version: Long): Boolean = {
+        true
+      }
+
+      override def process(value: Int): Unit = {
+        // Slow down the processing speed so that messages may be aged out.
+        Thread.sleep(Random.nextInt(100))
+      }
+
+      override def close(errorOrNull: Throwable): Unit = {
+      }
+    }).start()
+
+    val testTime = 1.minutes
+    val startTime = System.currentTimeMillis()
+    val topics = mutable.ArrayBuffer[String]()
+    while (System.currentTimeMillis() - testTime.toMillis < startTime) {
+      Random.nextInt(6) match {
+        case 0 =>
+          val topic = newTopic()
+          topics += topic
+          testUtils.createTopic(topic, partitions = 1)
+        case 1 =>
--- End diff --

Update test to recreate same topics.
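In the diff, `newTopic()` always produces a fresh name (`failOnDataLoss-N` from an `AtomicInteger`), so a deleted topic is never recreated. One way to address the request is to draw names from a small fixed pool so that a deleted name is later created again. The sketch below only plans the actions. The `plan` function, the action types, and the 1-in-3 delete ratio are hypothetical choices; a real test would execute each step with `testUtils.createTopic`, `deleteTopic`, and `sendMessages` instead of recording it.

```scala
import scala.collection.mutable
import scala.util.Random

object RecreateTopicsSketch {
  sealed trait Action
  final case class Create(topic: String) extends Action
  final case class Delete(topic: String) extends Action
  final case class Send(topic: String) extends Action

  /**
   * Plans `steps` random actions over a fixed pool of topic names, so a deleted topic is later
   * created again under the same name.
   */
  def plan(seed: Long, poolSize: Int, steps: Int): Seq[Action] = {
    val rand = new Random(seed)
    val pool = (0 until poolSize).map(i => s"failOnDataLoss-$i")
    val live = mutable.Set.empty[String]
    val actions = mutable.ArrayBuffer.empty[Action]
    for (_ <- 0 until steps) {
      val topic = pool(rand.nextInt(poolSize))
      val action: Action =
        if (!live.contains(topic)) {
          live += topic // first use of this name, or re-creation after a delete
          Create(topic)
        } else if (rand.nextInt(3) == 0) {
          live -= topic // the next pick of this name will recreate it
          Delete(topic)
        } else {
          Send(topic)
        }
      actions += action
    }
    actions.toList
  }
}
```

A small pool keeps recreation frequent, which is what exercises the "topic deleted and recreated" branch of the offset logic.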
[GitHub] spark pull request #15820: [SPARK-18373][SS][Kafka]Make failOnDataLoss=false...
Github user tdas commented on a diff in the pull request: https://github.com/apache/spark/pull/15820#discussion_r88758444 --- Diff: external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/CachedKafkaConsumer.scala --- @@ -47,40 +51,191 @@ private[kafka010] case class CachedKafkaConsumer private( /** Iterator to the already fetch data */ private var fetchedData = ju.Collections.emptyIterator[ConsumerRecord[Array[Byte], Array[Byte]]] - private var nextOffsetInFetchedData = -2L + private var nextOffsetInFetchedData = UNKNOWN_OFFSET /** * Get the record for the given offset, waiting up to timeout ms if IO is necessary. * Sequential forward access will use buffers, but random access will be horribly inefficient. + * + * When `failOnDataLoss` is `true`, this will either return record at offset if available, or + * throw exception. + * + * When `failOnDataLoss` is `false`, this will either return record at offset if available, or + * return the next earliest available record less than untilOffset, or null. It will not throw + * any exception. 
*/ - def get(offset: Long, pollTimeoutMs: Long): ConsumerRecord[Array[Byte], Array[Byte]] = { + def get( + offset: Long, + untilOffset: Long, + pollTimeoutMs: Long, + failOnDataLoss: Boolean): ConsumerRecord[Array[Byte], Array[Byte]] = { +require(offset < untilOffset, + s"offset must always be less than untilOffset [offset: $offset, untilOffset: $untilOffset]") logDebug(s"Get $groupId $topicPartition nextOffset $nextOffsetInFetchedData requested $offset") -if (offset != nextOffsetInFetchedData) { - logInfo(s"Initial fetch for $topicPartition $offset") - seek(offset) - poll(pollTimeoutMs) +var toFetchOffset = offset +while (toFetchOffset != UNKNOWN_OFFSET) { + try { +val record = fetchData(toFetchOffset, untilOffset, pollTimeoutMs, failOnDataLoss) +if (record == null) { + reset() +} +return record + } catch { +case e: OffsetOutOfRangeException => + val message = +if (failOnDataLoss) { + s"Cannot fetch offset $toFetchOffset" +} else { + s"Cannot fetch offset $toFetchOffset. Some data may be lost. " + +"Recovering from the earliest offset" +} + reportDataLoss(failOnDataLoss, message, e) + toFetchOffset = getNextEarliestOffset(toFetchOffset, untilOffset) + } } +reset() +null + } -if (!fetchedData.hasNext()) { poll(pollTimeoutMs) } -assert(fetchedData.hasNext(), - s"Failed to get records for $groupId $topicPartition $offset " + -s"after polling for $pollTimeoutMs") -var record = fetchedData.next() + /** + * Return the next earliest available offset in [offset, untilOffset). If all offsets in + * [offset, untilOffset) are invalid (e.g., the topic is deleted and recreated), it will return + * `UNKNOWN_OFFSET`. 
+ */ + private def getNextEarliestOffset(offset: Long, untilOffset: Long): Long = { +val (earliestOffset, latestOffset) = getAvailableOffsetRange() +if (offset >= latestOffset || earliestOffset >= untilOffset) { + // [offset, untilOffset) and [earliestOffset, latestOffset) have no overlap, + // either + // + // ^ ^ ^ ^ + // | | | | + // earliestOffset latestOffset offset untilOffset + // + // or + // + // ^ ^ ^^ + // | | || + // offset untilOffset earliestOffset latestOffset + val warningMessage = +s""" + |The current available offset range is [$earliestOffset, $latestOffset). + | Offset ${offset} is out of range, and records in [$offset, $untilOffset) will be + | skipped ${additionalMessage(failOnDataLoss = false)} +""".stripMargin + logWarning(warningMessage) + UNKNOWN_OFFSET +} else if (offset >= earliestOffset) { + // - + // ^^ ^ ^ + // || | | + // earliestOffset offset
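The overlap check in `getNextEarliestOffset` can be read as a pure function of four offsets. The sketch below rests on three assumptions:

* `UNKNOWN_OFFSET` is `-2L`, the literal it replaces in the diff.
* The partition is non-empty (earliest < latest).
* The final branch, which returns `earliestOffset`, follows the doc comment ("the next earliest available offset in [offset, untilOffset)"). The diff above is cut off before that branch.

```scala
object NextEarliestOffsetSketch {
  // Assumed value of UNKNOWN_OFFSET: the diff replaces the literal -2L with this constant.
  val UnknownOffset: Long = -2L

  /**
   * First offset lying in both the requested range [offset, untilOffset) and the available
   * range [earliestOffset, latestOffset), or UnknownOffset if the ranges do not overlap.
   * Assumes a non-empty partition (earliestOffset < latestOffset).
   */
  def nextEarliestOffset(
      offset: Long,
      untilOffset: Long,
      earliestOffset: Long,
      latestOffset: Long): Long = {
    if (offset >= latestOffset || earliestOffset >= untilOffset) {
      UnknownOffset  // no overlap: everything in [offset, untilOffset) is skipped
    } else if (offset >= earliestOffset) {
      offset         // the requested offset is still available
    } else {
      earliestOffset // [offset, earliestOffset) was lost; resume at earliestOffset
    }
  }
}
```

For a request of [5, 10) against an available range of [7, 20), the sketch resumes at 7. If the available range starts at 10 instead, it returns `UnknownOffset`, because `untilOffset` is exclusive.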
[GitHub] spark pull request #15820: [SPARK-18373][SS][Kafka]Make failOnDataLoss=false...
Github user tdas commented on a diff in the pull request: https://github.com/apache/spark/pull/15820#discussion_r88758179 --- Diff: external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/CachedKafkaConsumer.scala --- @@ -47,40 +51,191 @@ private[kafka010] case class CachedKafkaConsumer private( /** Iterator to the already fetch data */ private var fetchedData = ju.Collections.emptyIterator[ConsumerRecord[Array[Byte], Array[Byte]]] - private var nextOffsetInFetchedData = -2L + private var nextOffsetInFetchedData = UNKNOWN_OFFSET /** * Get the record for the given offset, waiting up to timeout ms if IO is necessary. --- End diff --
nit: update the docs to make clear up front that this may not return the record at `offset`, e.g. "Get the record for the given offset if available. Otherwise it will either throw an error (if failOnDataLoss = true), or return the next available offset within [offset, untilOffset)." Use @param to explain pollTimeoutMs and the other parameters in more detail.
[GitHub] spark pull request #15820: [SPARK-18373][SS][Kafka]Make failOnDataLoss=false...
Github user tdas commented on a diff in the pull request: https://github.com/apache/spark/pull/15820#discussion_r88757785 --- Diff: external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/CachedKafkaConsumer.scala --- @@ -47,40 +51,191 @@ private[kafka010] case class CachedKafkaConsumer private( /** Iterator to the already fetch data */ private var fetchedData = ju.Collections.emptyIterator[ConsumerRecord[Array[Byte], Array[Byte]]] - private var nextOffsetInFetchedData = -2L + private var nextOffsetInFetchedData = UNKNOWN_OFFSET /** * Get the record for the given offset, waiting up to timeout ms if IO is necessary. * Sequential forward access will use buffers, but random access will be horribly inefficient. + * + * When `failOnDataLoss` is `true`, this will either return record at offset if available, or + * throw exception. + * + * When `failOnDataLoss` is `false`, this will either return record at offset if available, or + * return the next earliest available record less than untilOffset, or null. It will not throw + * any exception. 
*/ - def get(offset: Long, pollTimeoutMs: Long): ConsumerRecord[Array[Byte], Array[Byte]] = { + def get( + offset: Long, + untilOffset: Long, + pollTimeoutMs: Long, + failOnDataLoss: Boolean): ConsumerRecord[Array[Byte], Array[Byte]] = { +require(offset < untilOffset, + s"offset must always be less than untilOffset [offset: $offset, untilOffset: $untilOffset]") logDebug(s"Get $groupId $topicPartition nextOffset $nextOffsetInFetchedData requested $offset") -if (offset != nextOffsetInFetchedData) { - logInfo(s"Initial fetch for $topicPartition $offset") - seek(offset) - poll(pollTimeoutMs) +var toFetchOffset = offset +while (toFetchOffset != UNKNOWN_OFFSET) { + try { +val record = fetchData(toFetchOffset, untilOffset, pollTimeoutMs, failOnDataLoss) +if (record == null) { + reset() +} +return record + } catch { +case e: OffsetOutOfRangeException => + val message = +if (failOnDataLoss) { + s"Cannot fetch offset $toFetchOffset" +} else { + s"Cannot fetch offset $toFetchOffset. Some data may be lost. " + +"Recovering from the earliest offset" +} + reportDataLoss(failOnDataLoss, message, e) + toFetchOffset = getNextEarliestOffset(toFetchOffset, untilOffset) + } } +reset() +null + } -if (!fetchedData.hasNext()) { poll(pollTimeoutMs) } -assert(fetchedData.hasNext(), - s"Failed to get records for $groupId $topicPartition $offset " + -s"after polling for $pollTimeoutMs") -var record = fetchedData.next() + /** + * Return the next earliest available offset in [offset, untilOffset). If all offsets in + * [offset, untilOffset) are invalid (e.g., the topic is deleted and recreated), it will return + * `UNKNOWN_OFFSET`. + */ + private def getNextEarliestOffset(offset: Long, untilOffset: Long): Long = { --- End diff --
nit: rename to `getEarliestAvailableOffsetBetween(...)`.
[GitHub] spark pull request #15820: [SPARK-18373][SS][Kafka]Make failOnDataLoss=false...
Github user tdas commented on a diff in the pull request: https://github.com/apache/spark/pull/15820#discussion_r88366565 --- Diff: external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/CachedKafkaConsumer.scala --- @@ -47,40 +51,229 @@ private[kafka010] case class CachedKafkaConsumer private( /** Iterator to the already fetch data */ private var fetchedData = ju.Collections.emptyIterator[ConsumerRecord[Array[Byte], Array[Byte]]] - private var nextOffsetInFetchedData = -2L + private var nextOffsetInFetchedData = UNKNOWN_OFFSET /** * Get the record for the given offset, waiting up to timeout ms if IO is necessary. * Sequential forward access will use buffers, but random access will be horribly inefficient. + * + * If `failOnDataLoss` is `false`, it will try to get the earliest record in + * `[offset, untilOffset)` when some illegal state happens. Otherwise, an `IllegalStateException` --- End diff --
Does "Otherwise" mean "when failOnDataLoss is false but no illegal state happens"? It's confusing. Rather, just rewrite it as:
When `failOnDataLoss` is `true`, this will either return the record at offset if available, or throw an exception.
When `failOnDataLoss` is `false`, this will either return the record at offset if available, or return the next earliest available record < untilOffset, or null. It will not throw any exception.
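To make the two-mode contract concrete, here is a self-contained sketch against a hypothetical in-memory partition that retains offsets [earliest, latest). In the sketch:

* `OutOfRange` stands in for Kafka's `OffsetOutOfRangeException`.
* `FakePartition.fetch` stands in for the real seek/poll.

The retry loop follows the shape of the `get` diff in this thread. It is a model only, not the PR's code. For example, the real method also resets its buffer before returning null.

```scala
object GetContractSketch {
  // Assumed sentinel, mirroring UNKNOWN_OFFSET.
  val UnknownOffset: Long = -2L

  final case class Record(offset: Long, value: String)

  /** Hypothetical stand-in for OffsetOutOfRangeException. */
  final class OutOfRange(val offset: Long) extends RuntimeException(s"offset $offset out of range")

  /** Hypothetical partition that still retains the records in [earliest, latest). */
  final class FakePartition(val earliest: Long, val latest: Long) {
    def fetch(offset: Long): Record =
      if (offset < earliest || offset >= latest) throw new OutOfRange(offset)
      else Record(offset, s"value-$offset")
  }

  // First offset in both [offset, untilOffset) and [earliest, latest), else UnknownOffset.
  private def nextEarliest(offset: Long, untilOffset: Long, p: FakePartition): Long =
    if (offset >= p.latest || p.earliest >= untilOffset) UnknownOffset
    else math.max(offset, p.earliest)

  /**
   * failOnDataLoss = true:  returns the record at `offset`, or throws.
   * failOnDataLoss = false: returns the record at `offset`, or the earliest available record
   *                         before `untilOffset`, or null; never throws for lost data.
   */
  def get(p: FakePartition, offset: Long, untilOffset: Long, failOnDataLoss: Boolean): Record = {
    require(offset < untilOffset,
      s"offset must always be less than untilOffset [offset: $offset, untilOffset: $untilOffset]")
    var toFetch = offset
    while (toFetch != UnknownOffset) {
      try {
        return p.fetch(toFetch)
      } catch {
        case e: OutOfRange =>
          if (failOnDataLoss) throw new IllegalStateException(s"Cannot fetch offset $toFetch", e)
          toFetch = nextEarliest(toFetch, untilOffset, p)
      }
    }
    null
  }
}
```

The loop always terminates. After a failed fetch, the next candidate is either `UnknownOffset` or an offset inside the retained range. Even for an empty partition, the second failed fetch yields `UnknownOffset`.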
[GitHub] spark pull request #15820: [SPARK-18373][SS][Kafka]Make failOnDataLoss=false...
Github user tdas commented on a diff in the pull request: https://github.com/apache/spark/pull/15820#discussion_r88363378 --- Diff: external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/CachedKafkaConsumer.scala --- @@ -96,10 +289,20 @@ private[kafka010] case class CachedKafkaConsumer private( logDebug(s"Polled $groupId ${p.partitions()} ${r.size}") fetchedData = r.iterator } + + private def getCurrentOffsetRange(): (Long, Long) = { --- End diff --
Rename `getCurrentOffsetRange` to `getValidOffsetRange` or `getAvailableOffsetRange` to make it clearer what this returns, and add docs.
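One way to make the return value self-describing, as suggested, is to replace the `(Long, Long)` tuple with a small documented case class. `AvailableOffsetRange` and its helper methods are hypothetical names for this sketch. The half-open convention matches the `[earliestOffset, latestOffset)` ranges used elsewhere in the diff.

```scala
/**
 * Hypothetical replacement for the `(Long, Long)` tuple: the offsets of the records a partition
 * currently retains, as the half-open range [earliest, latest). `latest` is exclusive, i.e. one
 * past the last available record.
 */
final case class AvailableOffsetRange(earliest: Long, latest: Long) {
  require(earliest <= latest, s"earliest ($earliest) must not exceed latest ($latest)")

  /** True if the record at `offset` is still available. */
  def contains(offset: Long): Boolean = offset >= earliest && offset < latest

  /** True if the requested range [offset, untilOffset) shares at least one offset with this one. */
  def overlaps(offset: Long, untilOffset: Long): Boolean =
    math.max(offset, earliest) < math.min(untilOffset, latest)
}
```

Named fields remove the need to remember which tuple element is which. The `require` also documents the invariant that the overlap logic relies on.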
[GitHub] spark pull request #15820: [SPARK-18373][SS][Kafka]Make failOnDataLoss=false...
Github user tdas commented on a diff in the pull request: https://github.com/apache/spark/pull/15820#discussion_r88367402 --- Diff: external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/CachedKafkaConsumer.scala --- @@ -47,40 +51,229 @@ private[kafka010] case class CachedKafkaConsumer private( /** Iterator to the already fetch data */ private var fetchedData = ju.Collections.emptyIterator[ConsumerRecord[Array[Byte], Array[Byte]]] - private var nextOffsetInFetchedData = -2L + private var nextOffsetInFetchedData = UNKNOWN_OFFSET /** * Get the record for the given offset, waiting up to timeout ms if IO is necessary. * Sequential forward access will use buffers, but random access will be horribly inefficient. + * + * If `failOnDataLoss` is `false`, it will try to get the earliest record in + * `[offset, untilOffset)` when some illegal state happens. Otherwise, an `IllegalStateException` + * will be thrown. + * + * It returns `null` only when `failOnDataLoss` is `false` and it cannot fetch any record between + * [offset, untilOffset). */ - def get(offset: Long, pollTimeoutMs: Long): ConsumerRecord[Array[Byte], Array[Byte]] = { + def get( + offset: Long, + untilOffset: Long, + pollTimeoutMs: Long, + failOnDataLoss: Boolean): ConsumerRecord[Array[Byte], Array[Byte]] = { +require(offset < untilOffset, s"offset: $offset, untilOffset: $untilOffset") logDebug(s"Get $groupId $topicPartition nextOffset $nextOffsetInFetchedData requested $offset") +try { + fetchDataIfNeeded(offset, pollTimeoutMs) + getRecordFromFetchedData(offset, untilOffset, failOnDataLoss) +} catch { + case e: OffsetOutOfRangeException => +val message = + if (failOnDataLoss) { +s"""Cannot fetch offset $offset. (GroupId: $groupId, TopicPartition: $topicPartition). + | $INSTRUCTION_FOR_FAIL_ON_DATA_LOSS_TRUE""".stripMargin + } else { +s"""Cannot fetch offset $offset. Some data may be lost. 
Recovering from the earliest + | offset (GroupId: $groupId, TopicPartition: $topicPartition).""".stripMargin + } +reportDataLoss(failOnDataLoss, message, e) +advanceToEarliestOffsetAndFetch(offset, untilOffset, pollTimeoutMs) +} + } + + /** + * Check the pre-fetched data with `offset` and try to fetch from Kafka if they don't match. + */ + private def fetchDataIfNeeded(offset: Long, pollTimeoutMs: Long): Unit = { if (offset != nextOffsetInFetchedData) { logInfo(s"Initial fetch for $topicPartition $offset") seek(offset) poll(pollTimeoutMs) +} else if (!fetchedData.hasNext()) { + // The last pre-fetched data has been drained. + // Seek to the offset because we may call seekToBeginning or seekToEnd before this. + seek(offset) + poll(pollTimeoutMs) } + } -if (!fetchedData.hasNext()) { poll(pollTimeoutMs) } -assert(fetchedData.hasNext(), - s"Failed to get records for $groupId $topicPartition $offset " + -s"after polling for $pollTimeoutMs") -var record = fetchedData.next() + /** + * Try to advance to the beginning offset and fetch again. `earliestOffset` should be in + * `[offset, untilOffset]`. If not, it will try to fetch `offset` again if it's in + * `[earliestOffset, latestOffset)`. Otherwise, it will return null and reset the pre-fetched + * data. + */ + private def advanceToEarliestOffsetAndFetch( + offset: Long, + untilOffset: Long, + pollTimeoutMs: Long): ConsumerRecord[Array[Byte], Array[Byte]] = { +val (earliestOffset, latestOffset) = getCurrentOffsetRange() +if (offset >= latestOffset || earliestOffset >= untilOffset) { + // [offset, untilOffset) and [earliestOffset, latestOffset) have no overlap, + // either + // + // ^ ^ ^ ^ + // | | | | + // earliestOffset latestOffset offset untilOffset + // + // or + // + // ^ ^ ^^ + // | | || + // offset untilOffset earliestOffset latestOffset + val warningMessage = +s""" + |The current available offset range is [$earliestOffset, $latestOffset). + | Offset ${offset} is out of range, and records in
[GitHub] spark pull request #15820: [SPARK-18373][SS][Kafka]Make failOnDataLoss=false...
Github user tdas commented on a diff in the pull request: https://github.com/apache/spark/pull/15820#discussion_r88364958 --- Diff: external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/CachedKafkaConsumer.scala --- @@ -47,40 +51,229 @@ private[kafka010] case class CachedKafkaConsumer private( /** Iterator to the already fetch data */ private var fetchedData = ju.Collections.emptyIterator[ConsumerRecord[Array[Byte], Array[Byte]]] - private var nextOffsetInFetchedData = -2L + private var nextOffsetInFetchedData = UNKNOWN_OFFSET /** * Get the record for the given offset, waiting up to timeout ms if IO is necessary. * Sequential forward access will use buffers, but random access will be horribly inefficient. + * + * If `failOnDataLoss` is `false`, it will try to get the earliest record in --- End diff --
nit: "earliest available record"
[GitHub] spark pull request #15820: [SPARK-18373][SS][Kafka]Make failOnDataLoss=false...
Github user tdas commented on a diff in the pull request: https://github.com/apache/spark/pull/15820#discussion_r88363098 --- Diff: external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/CachedKafkaConsumer.scala --- @@ -47,40 +51,229 @@ private[kafka010] case class CachedKafkaConsumer private( /** Iterator to the already fetch data */ private var fetchedData = ju.Collections.emptyIterator[ConsumerRecord[Array[Byte], Array[Byte]]] - private var nextOffsetInFetchedData = -2L + private var nextOffsetInFetchedData = UNKNOWN_OFFSET /** * Get the record for the given offset, waiting up to timeout ms if IO is necessary. * Sequential forward access will use buffers, but random access will be horribly inefficient. + * + * If `failOnDataLoss` is `false`, it will try to get the earliest record in + * `[offset, untilOffset)` when some illegal state happens. Otherwise, an `IllegalStateException` + * will be thrown. + * + * It returns `null` only when `failOnDataLoss` is `false` and it cannot fetch any record between + * [offset, untilOffset). */ - def get(offset: Long, pollTimeoutMs: Long): ConsumerRecord[Array[Byte], Array[Byte]] = { + def get( + offset: Long, + untilOffset: Long, + pollTimeoutMs: Long, + failOnDataLoss: Boolean): ConsumerRecord[Array[Byte], Array[Byte]] = { +require(offset < untilOffset, s"offset: $offset, untilOffset: $untilOffset") --- End diff --
Give a better error message, e.g. "offset must always be less than untilOffset".
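Scala's `require` throws an `IllegalArgumentException` whose message is `"requirement failed: "` followed by the supplied message. The suggested wording therefore appears verbatim in the failure. A quick check, where the object and method names are hypothetical:

```scala
object RequireMessageSketch {
  def checkRange(offset: Long, untilOffset: Long): Unit =
    require(offset < untilOffset,
      s"offset must always be less than untilOffset [offset: $offset, untilOffset: $untilOffset]")

  /** Message of the exception thrown for an invalid range, or None if the range is valid. */
  def failureMessage(offset: Long, untilOffset: Long): Option[String] =
    try { checkRange(offset, untilOffset); None }
    catch { case e: IllegalArgumentException => Some(e.getMessage) }
}
```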
[GitHub] spark pull request #15820: [SPARK-18373][SS][Kafka]Make failOnDataLoss=false...
Github user tdas commented on a diff in the pull request: https://github.com/apache/spark/pull/15820#discussion_r88363196 --- Diff: external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/CachedKafkaConsumer.scala --- @@ -47,40 +51,229 @@ private[kafka010] case class CachedKafkaConsumer private( /** Iterator to the already fetch data */ private var fetchedData = ju.Collections.emptyIterator[ConsumerRecord[Array[Byte], Array[Byte]]] - private var nextOffsetInFetchedData = -2L + private var nextOffsetInFetchedData = UNKNOWN_OFFSET /** * Get the record for the given offset, waiting up to timeout ms if IO is necessary. * Sequential forward access will use buffers, but random access will be horribly inefficient. + * + * If `failOnDataLoss` is `false`, it will try to get the earliest record in + * `[offset, untilOffset)` when some illegal state happens. Otherwise, an `IllegalStateException` + * will be thrown. + * + * It returns `null` only when `failOnDataLoss` is `false` and it cannot fetch any record between + * [offset, untilOffset). */ - def get(offset: Long, pollTimeoutMs: Long): ConsumerRecord[Array[Byte], Array[Byte]] = { + def get( + offset: Long, + untilOffset: Long, + pollTimeoutMs: Long, + failOnDataLoss: Boolean): ConsumerRecord[Array[Byte], Array[Byte]] = { +require(offset < untilOffset, s"offset: $offset, untilOffset: $untilOffset") logDebug(s"Get $groupId $topicPartition nextOffset $nextOffsetInFetchedData requested $offset") +try { + fetchDataIfNeeded(offset, pollTimeoutMs) + getRecordFromFetchedData(offset, untilOffset, failOnDataLoss) +} catch { + case e: OffsetOutOfRangeException => +val message = + if (failOnDataLoss) { +s"""Cannot fetch offset $offset. (GroupId: $groupId, TopicPartition: $topicPartition). + | $INSTRUCTION_FOR_FAIL_ON_DATA_LOSS_TRUE""".stripMargin + } else { +s"""Cannot fetch offset $offset. Some data may be lost. 
Recovering from the earliest + | offset (GroupId: $groupId, TopicPartition: $topicPartition).""".stripMargin + } +reportDataLoss(failOnDataLoss, message, e) +advanceToEarliestOffsetAndFetch(offset, untilOffset, pollTimeoutMs) +} + } + + /** + * Check the pre-fetched data with `offset` and try to fetch from Kafka if they don't match. + */ + private def fetchDataIfNeeded(offset: Long, pollTimeoutMs: Long): Unit = { if (offset != nextOffsetInFetchedData) { logInfo(s"Initial fetch for $topicPartition $offset") seek(offset) poll(pollTimeoutMs) +} else if (!fetchedData.hasNext()) { + // The last pre-fetched data has been drained. + // Seek to the offset because we may call seekToBeginning or seekToEnd before this. + seek(offset) + poll(pollTimeoutMs) --- End diff --
Why haven't these two cases been merged?
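Apart from the log line, the two branches do the same thing: seek to `offset`, then poll. The sketch below shows the merged condition. It uses a hypothetical in-memory buffer with no Kafka types:

* `seek` sets a position.
* `poll` hands back `batchSize` consecutive offsets starting at that position.

```scala
import scala.collection.mutable

final class FetchBufferSketch(batchSize: Int) {
  private var position: Long = 0L                  // consumer position: set by seek, advanced by poll
  private var fetched: Iterator[Long] = Iterator.empty
  private var nextOffsetInFetched: Long = -2L      // plays the role of UNKNOWN_OFFSET
  val calls: mutable.ArrayBuffer[String] = mutable.ArrayBuffer.empty[String]

  private def seek(offset: Long): Unit = { calls += s"seek($offset)"; position = offset }

  private def poll(): Unit = {
    calls += "poll"
    fetched = (position until position + batchSize).iterator
    position += batchSize
  }

  /** Both cache-miss cases (wrong offset, or buffer drained) seek before polling, so merge them. */
  def fetchDataIfNeeded(offset: Long): Unit =
    if (offset != nextOffsetInFetched || !fetched.hasNext) {
      seek(offset)
      poll()
    }

  def get(offset: Long): Long = {
    fetchDataIfNeeded(offset)
    val record = fetched.next()
    nextOffsetInFetched = record + 1
    record
  }
}
```

Always seeking before a refill costs little and covers the case the diff's comment mentions, where an earlier `seekToBeginning`/`seekToEnd` moved the consumer position.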
[GitHub] spark pull request #15820: [SPARK-18373][SS][Kafka]Make failOnDataLoss=false...
Github user tdas commented on a diff in the pull request: https://github.com/apache/spark/pull/15820#discussion_r88366202 --- Diff: external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/CachedKafkaConsumer.scala --- @@ -47,40 +51,229 @@ private[kafka010] case class CachedKafkaConsumer private( /** Iterator to the already fetch data */ private var fetchedData = ju.Collections.emptyIterator[ConsumerRecord[Array[Byte], Array[Byte]]] - private var nextOffsetInFetchedData = -2L + private var nextOffsetInFetchedData = UNKNOWN_OFFSET /** * Get the record for the given offset, waiting up to timeout ms if IO is necessary. * Sequential forward access will use buffers, but random access will be horribly inefficient. + * + * If `failOnDataLoss` is `false`, it will try to get the earliest record in + * `[offset, untilOffset)` when some illegal state happens. Otherwise, an `IllegalStateException` + * will be thrown. + * + * It returns `null` only when `failOnDataLoss` is `false` and it cannot fetch any record between + * [offset, untilOffset). */ - def get(offset: Long, pollTimeoutMs: Long): ConsumerRecord[Array[Byte], Array[Byte]] = { + def get( + offset: Long, + untilOffset: Long, + pollTimeoutMs: Long, + failOnDataLoss: Boolean): ConsumerRecord[Array[Byte], Array[Byte]] = { +require(offset < untilOffset, s"offset: $offset, untilOffset: $untilOffset") logDebug(s"Get $groupId $topicPartition nextOffset $nextOffsetInFetchedData requested $offset") +try { + fetchDataIfNeeded(offset, pollTimeoutMs) + getRecordFromFetchedData(offset, untilOffset, failOnDataLoss) +} catch { + case e: OffsetOutOfRangeException => +val message = + if (failOnDataLoss) { +s"""Cannot fetch offset $offset. (GroupId: $groupId, TopicPartition: $topicPartition). + | $INSTRUCTION_FOR_FAIL_ON_DATA_LOSS_TRUE""".stripMargin + } else { +s"""Cannot fetch offset $offset. Some data may be lost. 
Recovering from the earliest + | offset (GroupId: $groupId, TopicPartition: $topicPartition).""".stripMargin + } +reportDataLoss(failOnDataLoss, message, e) +advanceToEarliestOffsetAndFetch(offset, untilOffset, pollTimeoutMs) +} + } + + /** + * Check the pre-fetched data with `offset` and try to fetch from Kafka if they don't match. + */ + private def fetchDataIfNeeded(offset: Long, pollTimeoutMs: Long): Unit = { if (offset != nextOffsetInFetchedData) { logInfo(s"Initial fetch for $topicPartition $offset") seek(offset) poll(pollTimeoutMs) +} else if (!fetchedData.hasNext()) { + // The last pre-fetched data has been drained. + // Seek to the offset because we may call seekToBeginning or seekToEnd before this. + seek(offset) + poll(pollTimeoutMs) } + } -if (!fetchedData.hasNext()) { poll(pollTimeoutMs) } -assert(fetchedData.hasNext(), - s"Failed to get records for $groupId $topicPartition $offset " + -s"after polling for $pollTimeoutMs") -var record = fetchedData.next() + /** + * Try to advance to the beginning offset and fetch again. `earliestOffset` should be in + * `[offset, untilOffset]`. If not, it will try to fetch `offset` again if it's in + * `[earliestOffset, latestOffset)`. Otherwise, it will return null and reset the pre-fetched + * data. + */ + private def advanceToEarliestOffsetAndFetch( + offset: Long, + untilOffset: Long, + pollTimeoutMs: Long): ConsumerRecord[Array[Byte], Array[Byte]] = { +val (earliestOffset, latestOffset) = getCurrentOffsetRange() +if (offset >= latestOffset || earliestOffset >= untilOffset) { + // [offset, untilOffset) and [earliestOffset, latestOffset) have no overlap, + // either + // + // ^ ^ ^ ^ + // | | | | + // earliestOffset latestOffset offset untilOffset + // + // or + // + // ^ ^ ^^ + // | | || + // offset untilOffset earliestOffset latestOffset + val warningMessage = +s""" + |The current available offset range is [$earliestOffset, $latestOffset). + | Offset ${offset} is out of range, and records in
[GitHub] spark pull request #15820: [SPARK-18373][SS][Kafka]Make failOnDataLoss=false...
Github user koeninger commented on a diff in the pull request: https://github.com/apache/spark/pull/15820#discussion_r88360922 --- Diff: external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/CachedKafkaConsumer.scala --- @@ -83,6 +86,129 @@ private[kafka010] case class CachedKafkaConsumer private( record } + /** + * Get the record at the `offset`. If it doesn't exist, try to get the earliest record in + * `[offset, untilOffset)`. + */ + def getAndIgnoreLostData( + offset: Long, + untilOffset: Long, + pollTimeoutMs: Long): ConsumerRecord[Array[Byte], Array[Byte]] = { +// scalastyle:off +// When `failOnDataLoss` is `false`, we need to handle the following cases (note: untilOffset and latestOffset are exclusive): +// 1. Some data are aged out, and `offset < beginningOffset <= untilOffset - 1 <= latestOffset - 1` +// Seek to the beginningOffset and fetch the data. +// 2. Some data are aged out, and `offset <= untilOffset - 1 < beginningOffset`. +// There is nothing to fetch, return null. +// 3. The topic is deleted. +// There is nothing to fetch, return null. +// 4. The topic is deleted and recreated, and `beginningOffset <= offset <= untilOffset - 1 <= latestOffset - 1`. +// We cannot detect this case. We can still fetch data like nothing happens. +// 5. The topic is deleted and recreated, and `beginningOffset <= offset < latestOffset - 1 < untilOffset - 1`. +// Same as 4. +// 6. The topic is deleted and recreated, and `beginningOffset <= latestOffset - 1 < offset <= untilOffset - 1`. +// There is nothing to fetch, return null. +// 7. The topic is deleted and recreated, and `offset < beginningOffset <= untilOffset - 1`. +// Same as 1. +// 8. The topic is deleted and recreated, and `offset <= untilOffset - 1 < beginningOffset`. +// There is nothing to fetch, return null. 
+// scalastyle:on +require(offset < untilOffset, s"offset: $offset, untilOffset: $untilOffset") +logDebug(s"Get $groupId $topicPartition nextOffset $nextOffsetInFetchedData requested $offset") +try { + if (offset != nextOffsetInFetchedData) { +logInfo(s"Initial fetch for $topicPartition $offset") +seek(offset) +poll(pollTimeoutMs) + } else if (!fetchedData.hasNext()) { +// The last pre-fetched data has been drained. +poll(pollTimeoutMs) + } + getRecordFromFetchedData(offset, untilOffset) +} catch { + case e: OffsetOutOfRangeException => +logWarning(s"Cannot fetch offset $offset, try to recover from the beginning offset", e) +advanceToBeginningOffsetAndFetch(offset, untilOffset, pollTimeoutMs) +} + } + + /** + * Try to advance to the beginning offset and fetch again. `beginningOffset` should be in + * `[offset, untilOffset]`. If not, it will try to fetch `offset` again if it's in + * `[beginningOffset, latestOffset)`. Otherwise, it will return null and reset the pre-fetched + * data. + */ + private def advanceToBeginningOffsetAndFetch( + offset: Long, + untilOffset: Long, + pollTimeoutMs: Long): ConsumerRecord[Array[Byte], Array[Byte]] = { +val beginningOffset = getBeginningOffset() +if (beginningOffset <= offset) { + val latestOffset = getLatestOffset() + if (latestOffset <= offset) { +// beginningOffset <= latestOffset - 1 < offset <= untilOffset - 1 +logWarning(s"Offset ${offset} is later than the latest offset $latestOffset. 
" + + s"Skipped [$offset, $untilOffset)") +reset() +null + } else { +// beginningOffset <= offset <= min(latestOffset - 1, untilOffset - 1) +getAndIgnoreLostData(offset, untilOffset, pollTimeoutMs) + } +} else { + if (beginningOffset >= untilOffset) { +// offset <= untilOffset - 1 < beginningOffset +logWarning(s"Buffer miss for $groupId $topicPartition [$offset, $untilOffset)") +reset() +null + } else { +// offset < beginningOffset <= untilOffset - 1 +logWarning(s"Buffer miss for $groupId $topicPartition [$offset, $beginningOffset)") +getAndIgnoreLostData(beginningOffset, untilOffset, pollTimeoutMs) + } +} + } + + /** + * Get the earliest record in [offset, untilOffset) from the fetched data. If there is no such + * record, returns null. Must be called after `poll`. + */ + private def getRecordFromFetchedData( + offset: Long, + untilOffset: Long):
[GitHub] spark pull request #15820: [SPARK-18373][SS][Kafka]Make failOnDataLoss=false...
Github user tdas commented on a diff in the pull request: https://github.com/apache/spark/pull/15820#discussion_r88139985 --- Diff: external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/CachedKafkaConsumer.scala --- @@ -83,6 +86,139 @@ private[kafka010] case class CachedKafkaConsumer private( record } + /** + * Get the record at the `offset`. If it doesn't exist, try to get the earliest record in + * `[offset, untilOffset)`. + */ + def getAndIgnoreLostData( + offset: Long, + untilOffset: Long, + pollTimeoutMs: Long): ConsumerRecord[Array[Byte], Array[Byte]] = { +// scalastyle:off +// When `failOnDataLoss` is `false`, we need to handle the following cases (note: untilOffset and latestOffset are exclusive): +// 1. Some data are aged out, and `offset < beginningOffset <= untilOffset - 1 <= latestOffset - 1` +// Seek to the beginningOffset and fetch the data. +// 2. Some data are aged out, and `offset <= untilOffset - 1 < beginningOffset`. +// There is nothing to fetch, return null. +// 3. The topic is deleted. +// There is nothing to fetch, return null. +// 4. The topic is deleted and recreated, and `beginningOffset <= offset <= untilOffset - 1 <= latestOffset - 1`. +// We cannot detect this case. We can still fetch data like nothing happens. +// 5. The topic is deleted and recreated, and `beginningOffset <= offset < latestOffset - 1 < untilOffset - 1`. +// Same as 4. +// 6. The topic is deleted and recreated, and `beginningOffset <= latestOffset - 1 < offset <= untilOffset - 1`. +// There is nothing to fetch, return null. +// 7. The topic is deleted and recreated, and `offset < beginningOffset <= untilOffset - 1`. +// Same as 1. +// 8. The topic is deleted and recreated, and `offset <= untilOffset - 1 < beginningOffset`. +// There is nothing to fetch, return null. 
+// scalastyle:on +require(offset < untilOffset, s"offset: $offset, untilOffset: $untilOffset") +logDebug(s"Get $groupId $topicPartition nextOffset $nextOffsetInFetchedData requested $offset") +try { + if (offset != nextOffsetInFetchedData) { +logInfo(s"Initial fetch for $topicPartition $offset") +seek(offset) +poll(pollTimeoutMs) + } else if (!fetchedData.hasNext()) { +// The last pre-fetched data has been drained. +poll(pollTimeoutMs) + } + getRecordFromFetchedData(offset, untilOffset) +} catch { + case e: OffsetOutOfRangeException => +logWarning(s"Cannot fetch offset $offset. Some data may be lost. Recovering from " + + "the beginning offset", e) +advanceToBeginningOffsetAndFetch(offset, untilOffset, pollTimeoutMs) +} + } + + /** + * Try to advance to the beginning offset and fetch again. `beginningOffset` should be in + * `[offset, untilOffset]`. If not, it will try to fetch `offset` again if it's in + * `[beginningOffset, latestOffset)`. Otherwise, it will return null and reset the pre-fetched + * data. + */ + private def advanceToBeginningOffsetAndFetch( + offset: Long, + untilOffset: Long, + pollTimeoutMs: Long): ConsumerRecord[Array[Byte], Array[Byte]] = { +val beginningOffset = getBeginningOffset() +if (beginningOffset <= offset) { --- End diff --
nit: it's easier to reason about when `offset` comes first in the condition, i.e. `offset >= beginningOffset`.
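Writing every comparison with `offset` first, as suggested, gives the nested form below. It is checked against the flat overlap test that a later revision of the PR uses in `getNextEarliestOffset`. Both functions return the offset to resume from, or an assumed `-2L` sentinel for "skip." The agreement holds under the invariants `offset < untilOffset` and `beginning <= latest`. The exhaustive test over small offsets is a sanity check of that claim, not a proof.

```scala
object OffsetFirstConditions {
  // Assumed sentinel, mirroring UNKNOWN_OFFSET.
  val UnknownOffset: Long = -2L

  /** Nested branches of advanceToBeginningOffsetAndFetch, with `offset` written first. */
  def nested(offset: Long, untilOffset: Long, beginning: Long, latest: Long): Long =
    if (offset >= beginning) {
      if (offset >= latest) UnknownOffset else offset
    } else {
      if (untilOffset <= beginning) UnknownOffset else beginning
    }

  /** Flat overlap check: skip unless [offset, untilOffset) meets [beginning, latest). */
  def flat(offset: Long, untilOffset: Long, beginning: Long, latest: Long): Long =
    if (offset >= latest || beginning >= untilOffset) UnknownOffset
    else if (offset >= beginning) offset
    else beginning
}
```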
[GitHub] spark pull request #15820: [SPARK-18373][SS][Kafka]Make failOnDataLoss=false...
Github user tdas commented on a diff in the pull request: https://github.com/apache/spark/pull/15820#discussion_r88140653 --- Diff: external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/CachedKafkaConsumer.scala --- @@ -83,6 +86,139 @@ private[kafka010] case class CachedKafkaConsumer private( record } + /** + * Get the record at the `offset`. If it doesn't exist, try to get the earliest record in + * `[offset, untilOffset)`. + */ + def getAndIgnoreLostData( + offset: Long, + untilOffset: Long, + pollTimeoutMs: Long): ConsumerRecord[Array[Byte], Array[Byte]] = { +// scalastyle:off +// When `failOnDataLoss` is `false`, we need to handle the following cases (note: untilOffset and latestOffset are exclusive): +// 1. Some data are aged out, and `offset < beginningOffset <= untilOffset - 1 <= latestOffset - 1` +// Seek to the beginningOffset and fetch the data. +// 2. Some data are aged out, and `offset <= untilOffset - 1 < beginningOffset`. +// There is nothing to fetch, return null. +// 3. The topic is deleted. +// There is nothing to fetch, return null. +// 4. The topic is deleted and recreated, and `beginningOffset <= offset <= untilOffset - 1 <= latestOffset - 1`. +// We cannot detect this case. We can still fetch data like nothing happens. +// 5. The topic is deleted and recreated, and `beginningOffset <= offset < latestOffset - 1 < untilOffset - 1`. +// Same as 4. +// 6. The topic is deleted and recreated, and `beginningOffset <= latestOffset - 1 < offset <= untilOffset - 1`. +// There is nothing to fetch, return null. +// 7. The topic is deleted and recreated, and `offset < beginningOffset <= untilOffset - 1`. +// Same as 1. +// 8. The topic is deleted and recreated, and `offset <= untilOffset - 1 < beginningOffset`. +// There is nothing to fetch, return null. 
+// scalastyle:on +require(offset < untilOffset, s"offset: $offset, untilOffset: $untilOffset") +logDebug(s"Get $groupId $topicPartition nextOffset $nextOffsetInFetchedData requested $offset") +try { + if (offset != nextOffsetInFetchedData) { +logInfo(s"Initial fetch for $topicPartition $offset") +seek(offset) +poll(pollTimeoutMs) + } else if (!fetchedData.hasNext()) { +// The last pre-fetched data has been drained. +poll(pollTimeoutMs) + } + getRecordFromFetchedData(offset, untilOffset) +} catch { + case e: OffsetOutOfRangeException => +logWarning(s"Cannot fetch offset $offset. Some data may be lost. Recovering from " + + "the beginning offset", e) +advanceToBeginningOffsetAndFetch(offset, untilOffset, pollTimeoutMs) +} + } + + /** + * Try to advance to the beginning offset and fetch again. `beginningOffset` should be in + * `[offset, untilOffset]`. If not, it will try to fetch `offset` again if it's in + * `[beginningOffset, latestOffset)`. Otherwise, it will return null and reset the pre-fetched + * data. + */ + private def advanceToBeginningOffsetAndFetch( + offset: Long, + untilOffset: Long, + pollTimeoutMs: Long): ConsumerRecord[Array[Byte], Array[Byte]] = { +val beginningOffset = getBeginningOffset() +if (beginningOffset <= offset) { + val latestOffset = getLatestOffset() + if (latestOffset <= offset) { +// beginningOffset <= latestOffset - 1 < offset <= untilOffset - 1 +logWarning(s"Offset ${offset} is later than the latest offset $latestOffset. " + + s"Skipped [$offset, $untilOffset)") +reset() +null + } else { +// beginningOffset <= offset <= min(latestOffset - 1, untilOffset - 1) +// +// This will happen when a topic is deleted and recreated, and new data are pushed very fast +// , then we will see `offset` disappears first then appears again. Although the parameters +// are same, the state in Kafka cluster is changed, so it's not an endless loop. 
+// +// In addition, the stack here won't be deep unless the user keeps deleting and creating the +// topic very fast. +// +// Therefore, this recursive call is safe. +getAndIgnoreLostData(offset, untilOffset, pollTimeoutMs) + } +} else { + if (beginningOffset >= untilOffset) { +// offset <= untilOffset - 1 < beginningOffset +logWarning(s"Buffer miss for $groupId $topicPartition [$offset, $untilOffset)") +reset() +
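The recursive retry between `getAndIgnoreLostData` and `advanceToBeginningOffsetAndFetch` can also be written as a loop, which removes the stack-depth question entirely. This is a minimal sketch, assuming a hypothetical `attempt` callback that stands in for seek plus poll: it returns `Right(recordOffset)` when a record is read and `Left((earliest, latest))` when the offset is out of range at that moment. It returns `Option` rather than `null`. Like the recursive version, it only terminates in the "offset is inside the available range" branch if the Kafka state changes between attempts.

```scala
object LoopedFetch {
  /**
   * Hypothetical loop form of the getAndIgnoreLostData -> advanceToBeginningOffsetAndFetch
   * recursion. Returns the offset of the record read, or None if nothing in
   * [offset, untilOffset) can be read.
   */
  def fetch(
      offset: Long,
      untilOffset: Long,
      attempt: Long => Either[(Long, Long), Long]): Option[Long] = {
    require(offset < untilOffset, s"offset: $offset, untilOffset: $untilOffset")
    var current = offset
    var result: Option[Long] = None
    var done = false
    while (!done) {
      attempt(current) match {
        case Right(recordOffset) =>
          result = Some(recordOffset)
          done = true
        case Left((earliest, latest)) =>
          if (current >= earliest) {
            // Not aged out: give up if past the end, otherwise retry the same offset.
            if (current >= latest) done = true
          } else if (untilOffset <= earliest) {
            // The whole remaining range aged out.
            done = true
          } else {
            // Part of the range aged out: resume at the earliest available offset.
            current = earliest
          }
      }
    }
    result
  }
}
```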
[GitHub] spark pull request #15820: [SPARK-18373][SS][Kafka]Make failOnDataLoss=false...
Github user tdas commented on a diff in the pull request: https://github.com/apache/spark/pull/15820#discussion_r88137338 --- Diff: external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/CachedKafkaConsumer.scala
[GitHub] spark pull request #15820: [SPARK-18373][SS][Kafka]Make failOnDataLoss=false...
Github user tdas commented on a diff in the pull request: https://github.com/apache/spark/pull/15820#discussion_r88140639 --- Diff: external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/CachedKafkaConsumer.scala --- @@ -83,6 +86,139 @@ private[kafka010] case class CachedKafkaConsumer private( record } + /** + * Get the record at the `offset`. If it doesn't exist, try to get the earliest record in + * `[offset, untilOffset)`. + */ + def getAndIgnoreLostData( + offset: Long, + untilOffset: Long, + pollTimeoutMs: Long): ConsumerRecord[Array[Byte], Array[Byte]] = { +// scalastyle:off +// When `failOnDataLoss` is `false`, we need to handle the following cases (note: untilOffset and latestOffset are exclusive): +// 1. Some data are aged out, and `offset < beginningOffset <= untilOffset - 1 <= latestOffset - 1` +// Seek to the beginningOffset and fetch the data. +// 2. Some data are aged out, and `offset <= untilOffset - 1 < beginningOffset`. +// There is nothing to fetch, return null. +// 3. The topic is deleted. +// There is nothing to fetch, return null. +// 4. The topic is deleted and recreated, and `beginningOffset <= offset <= untilOffset - 1 <= latestOffset - 1`. +// We cannot detect this case. We can still fetch data like nothing happens. +// 5. The topic is deleted and recreated, and `beginningOffset <= offset < latestOffset - 1 < untilOffset - 1`. +// Same as 4. +// 6. The topic is deleted and recreated, and `beginningOffset <= latestOffset - 1 < offset <= untilOffset - 1`. +// There is nothing to fetch, return null. +// 7. The topic is deleted and recreated, and `offset < beginningOffset <= untilOffset - 1`. +// Same as 1. +// 8. The topic is deleted and recreated, and `offset <= untilOffset - 1 < beginningOffset`. +// There is nothing to fetch, return null. 
+// scalastyle:on +require(offset < untilOffset, s"offset: $offset, untilOffset: $untilOffset") +logDebug(s"Get $groupId $topicPartition nextOffset $nextOffsetInFetchedData requested $offset") +try { + if (offset != nextOffsetInFetchedData) { +logInfo(s"Initial fetch for $topicPartition $offset") +seek(offset) +poll(pollTimeoutMs) + } else if (!fetchedData.hasNext()) { +// The last pre-fetched data has been drained. +poll(pollTimeoutMs) + } + getRecordFromFetchedData(offset, untilOffset) +} catch { + case e: OffsetOutOfRangeException => +logWarning(s"Cannot fetch offset $offset. Some data may be lost. Recovering from " + + "the beginning offset", e) +advanceToBeginningOffsetAndFetch(offset, untilOffset, pollTimeoutMs) +} + } + + /** + * Try to advance to the beginning offset and fetch again. `beginningOffset` should be in + * `[offset, untilOffset]`. If not, it will try to fetch `offset` again if it's in + * `[beginningOffset, latestOffset)`. Otherwise, it will return null and reset the pre-fetched + * data. + */ + private def advanceToBeginningOffsetAndFetch( + offset: Long, + untilOffset: Long, + pollTimeoutMs: Long): ConsumerRecord[Array[Byte], Array[Byte]] = { +val beginningOffset = getBeginningOffset() +if (beginningOffset <= offset) { + val latestOffset = getLatestOffset() + if (latestOffset <= offset) { +// beginningOffset <= latestOffset - 1 < offset <= untilOffset - 1 +logWarning(s"Offset ${offset} is later than the latest offset $latestOffset. " + + s"Skipped [$offset, $untilOffset)") +reset() +null + } else { +// beginningOffset <= offset <= min(latestOffset - 1, untilOffset - 1) +// +// This will happen when a topic is deleted and recreated, and new data are pushed very fast +// , then we will see `offset` disappears first then appears again. Although the parameters +// are same, the state in Kafka cluster is changed, so it's not an endless loop. 
+// +// In addition, the stack here won't be deep unless the user keeps deleting and creating the +// topic very fast. +// +// Therefore, this recursive call is safe. +getAndIgnoreLostData(offset, untilOffset, pollTimeoutMs) + } +} else { + if (beginningOffset >= untilOffset) { +// offset <= untilOffset - 1 < beginningOffset --- End diff -- Also improve the printed warning; I gave an example in another comment.
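One way to act on the request for clearer warnings is to state each case in plain words and include both the requested and the available range. A minimal sketch; `DataLossMessages.describe` and its wording are hypothetical and not text from the PR. `earliest`/`latest` correspond to the PR's `beginningOffset`/`latestOffset`, both end-exclusive.

```scala
object DataLossMessages {
  /**
   * Hypothetical plain-word warning for a requested range [offset, untilOffset) when
   * Kafka currently holds [earliest, latest). Both ranges are end-exclusive.
   */
  def describe(
      topicPartition: String,
      offset: Long,
      untilOffset: Long,
      earliest: Long,
      latest: Long): String = {
    val requested = s"requested offsets [$offset, $untilOffset)"
    if (offset >= latest) {
      s"$topicPartition: $requested start at or after the latest offset $latest; " +
        s"the topic may have been deleted and recreated. Skipping [$offset, $untilOffset)."
    } else if (untilOffset <= earliest) {
      s"$topicPartition: $requested are all earlier than the earliest available offset " +
        s"$earliest; the data may have aged out. Skipping [$offset, $untilOffset)."
    } else if (offset < earliest) {
      s"$topicPartition: offsets [$offset, $earliest) are earlier than the earliest available " +
        s"offset and may have aged out. Resuming from offset $earliest."
    } else {
      s"$topicPartition: offset $offset is within the available range [$earliest, $latest)."
    }
  }
}
```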
[GitHub] spark pull request #15820: [SPARK-18373][SS][Kafka]Make failOnDataLoss=false...
Github user tdas commented on a diff in the pull request: https://github.com/apache/spark/pull/15820#discussion_r88139536 --- Diff: external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/CachedKafkaConsumer.scala --- @@ -83,6 +86,139 @@ private[kafka010] case class CachedKafkaConsumer private( record } + /** + * Get the record at the `offset`. If it doesn't exist, try to get the earliest record in + * `[offset, untilOffset)`. + */ + def getAndIgnoreLostData( + offset: Long, + untilOffset: Long, + pollTimeoutMs: Long): ConsumerRecord[Array[Byte], Array[Byte]] = { +// scalastyle:off +// When `failOnDataLoss` is `false`, we need to handle the following cases (note: untilOffset and latestOffset are exclusive): +// 1. Some data are aged out, and `offset < beginningOffset <= untilOffset - 1 <= latestOffset - 1` +// Seek to the beginningOffset and fetch the data. +// 2. Some data are aged out, and `offset <= untilOffset - 1 < beginningOffset`. +// There is nothing to fetch, return null. +// 3. The topic is deleted. +// There is nothing to fetch, return null. +// 4. The topic is deleted and recreated, and `beginningOffset <= offset <= untilOffset - 1 <= latestOffset - 1`. +// We cannot detect this case. We can still fetch data like nothing happens. +// 5. The topic is deleted and recreated, and `beginningOffset <= offset < latestOffset - 1 < untilOffset - 1`. +// Same as 4. +// 6. The topic is deleted and recreated, and `beginningOffset <= latestOffset - 1 < offset <= untilOffset - 1`. +// There is nothing to fetch, return null. +// 7. The topic is deleted and recreated, and `offset < beginningOffset <= untilOffset - 1`. +// Same as 1. +// 8. The topic is deleted and recreated, and `offset <= untilOffset - 1 < beginningOffset`. +// There is nothing to fetch, return null. 
+// scalastyle:on +require(offset < untilOffset, s"offset: $offset, untilOffset: $untilOffset") +logDebug(s"Get $groupId $topicPartition nextOffset $nextOffsetInFetchedData requested $offset") +try { + if (offset != nextOffsetInFetchedData) { +logInfo(s"Initial fetch for $topicPartition $offset") +seek(offset) +poll(pollTimeoutMs) + } else if (!fetchedData.hasNext()) { --- End diff -- Can these two conditions be merged as follows?
```
if (offset != nextOffsetInFetchedData || !fetchedData.hasNext()) {
  seek(offset)
  poll(pollTimeoutMs)
}
```
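The merged condition can be exercised without a Kafka cluster by using a small in-memory stand-in. In this minimal sketch, `FakeLog` and `BufferedReader` are hypothetical classes that only mimic the seek/poll/buffer interplay; they are not the `KafkaConsumer` API. The sketch counts seeks so the trade-off is visible: with the merged condition, a drained buffer triggers a `seek` to the position the consumer already holds. Whether that extra seek costs anything in the real consumer is not shown here.

```scala
// Hypothetical in-memory stand-in for a single partition; not the KafkaConsumer API.
final class FakeLog(records: Vector[Long], batchSize: Int) {
  private var position = 0L
  var seeks = 0

  def seek(offset: Long): Unit = { seeks += 1; position = offset }

  // Returns up to `batchSize` record offsets at or after the current position.
  def poll(): Iterator[Long] = {
    val batch = records.filter(_ >= position).take(batchSize)
    batch.lastOption.foreach(last => position = last + 1)
    batch.iterator
  }
}

final class BufferedReader(log: FakeLog) {
  private var fetched: Iterator[Long] = Iterator.empty
  private var nextOffsetInFetched = -2L // sentinel: nothing fetched yet

  /** Returns the next available record offset at or after `offset`, if any. */
  def get(offset: Long): Option[Long] = {
    // Merged condition from the review: refetch when the buffer is not positioned
    // at `offset` or has been drained.
    if (offset != nextOffsetInFetched || !fetched.hasNext) {
      log.seek(offset)
      fetched = log.poll()
    }
    if (fetched.hasNext) {
      val recordOffset = fetched.next()
      nextOffsetInFetched = recordOffset + 1
      Some(recordOffset)
    } else {
      None
    }
  }
}
```

Reading offsets 0 to 9 sequentially with a batch size of 3 performs one seek per batch (four in total), so sequential access still goes through the buffer.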
[GitHub] spark pull request #15820: [SPARK-18373][SS][Kafka]Make failOnDataLoss=false...
Github user tdas commented on a diff in the pull request: https://github.com/apache/spark/pull/15820#discussion_r88137586 --- Diff: external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/CachedKafkaConsumer.scala
[GitHub] spark pull request #15820: [SPARK-18373][SS][Kafka]Make failOnDataLoss=false...
Github user tdas commented on a diff in the pull request: https://github.com/apache/spark/pull/15820#discussion_r88137989 --- Diff: external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/CachedKafkaConsumer.scala --- @@ -83,6 +86,139 @@ private[kafka010] case class CachedKafkaConsumer private( record } + /** + * Get the record at the `offset`. If it doesn't exist, try to get the earliest record in + * `[offset, untilOffset)`. + */ + def getAndIgnoreLostData( --- End diff -- Can't these two code paths be merged? I envision a single method that takes a "failOnDataLoss" parameter; it would try to get the next fetched offset and then either [throw an error] OR [retry with a warning], based on that flag. It's worth simplifying the code further for future maintainability.
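The single-method shape proposed in this comment can be sketched as one function that takes `failOnDataLoss` and either throws or warns, with everything else shared. This is a minimal, dependency-free sketch under stated assumptions: `UnifiedGet.resolve`, `DataLossException`, and the `warn` callback are hypothetical; the sketch works on offsets rather than `ConsumerRecord`s and returns `Option` where the PR returns `null`.

```scala
object UnifiedGet {
  final class DataLossException(message: String) extends IllegalStateException(message)

  /**
   * Hypothetical single entry point. Requested range: [offset, untilOffset);
   * available in Kafka: [earliest, latest). Returns the offset to read next,
   * or None when nothing in the requested range is available.
   */
  def resolve(
      offset: Long,
      untilOffset: Long,
      earliest: Long,
      latest: Long,
      failOnDataLoss: Boolean,
      warn: String => Unit = msg => Console.err.println(s"WARN $msg")): Option[Long] = {
    require(offset < untilOffset, s"offset: $offset, untilOffset: $untilOffset")
    val start = math.max(offset, earliest)
    val end = math.min(untilOffset, latest)
    val next = if (start < end) Some(start) else None
    if (!next.contains(offset)) {
      val message = s"Cannot read offset $offset: available [$earliest, $latest), " +
        s"requested [$offset, $untilOffset)"
      val resumeAt = next.map(_.toString).getOrElse("nothing (range skipped)")
      // The two modes differ only in how data loss is reported.
      if (failOnDataLoss) throw new DataLossException(message)
      else warn(s"$message; continuing from $resumeAt")
    }
    next
  }
}
```

Keeping the flag as the only branch point means the range arithmetic is written once, which is the maintainability argument made in the comment.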
[GitHub] spark pull request #15820: [SPARK-18373][SS][Kafka]Make failOnDataLoss=false...
Github user tdas commented on a diff in the pull request: https://github.com/apache/spark/pull/15820#discussion_r88135087 --- Diff: external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/CachedKafkaConsumer.scala
[GitHub] spark pull request #15820: [SPARK-18373][SS][Kafka]Make failOnDataLoss=false...
Github user tdas commented on a diff in the pull request: https://github.com/apache/spark/pull/15820#discussion_r88140370 --- Diff: external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/CachedKafkaConsumer.scala --- @@ -83,6 +86,139 @@ private[kafka010] case class CachedKafkaConsumer private( record } + /** + * Get the record at the `offset`. If it doesn't exist, try to get the earliest record in + * `[offset, untilOffset)`. + */ + def getAndIgnoreLostData( + offset: Long, + untilOffset: Long, + pollTimeoutMs: Long): ConsumerRecord[Array[Byte], Array[Byte]] = { +// scalastyle:off +// When `failOnDataLoss` is `false`, we need to handle the following cases (note: untilOffset and latestOffset are exclusive): +// 1. Some data are aged out, and `offset < beginningOffset <= untilOffset - 1 <= latestOffset - 1` +// Seek to the beginningOffset and fetch the data. +// 2. Some data are aged out, and `offset <= untilOffset - 1 < beginningOffset`. +// There is nothing to fetch, return null. +// 3. The topic is deleted. +// There is nothing to fetch, return null. +// 4. The topic is deleted and recreated, and `beginningOffset <= offset <= untilOffset - 1 <= latestOffset - 1`. +// We cannot detect this case. We can still fetch data like nothing happens. +// 5. The topic is deleted and recreated, and `beginningOffset <= offset < latestOffset - 1 < untilOffset - 1`. +// Same as 4. +// 6. The topic is deleted and recreated, and `beginningOffset <= latestOffset - 1 < offset <= untilOffset - 1`. +// There is nothing to fetch, return null. +// 7. The topic is deleted and recreated, and `offset < beginningOffset <= untilOffset - 1`. +// Same as 1. +// 8. The topic is deleted and recreated, and `offset <= untilOffset - 1 < beginningOffset`. +// There is nothing to fetch, return null. 
+// scalastyle:on +require(offset < untilOffset, s"offset: $offset, untilOffset: $untilOffset") +logDebug(s"Get $groupId $topicPartition nextOffset $nextOffsetInFetchedData requested $offset") +try { + if (offset != nextOffsetInFetchedData) { +logInfo(s"Initial fetch for $topicPartition $offset") +seek(offset) +poll(pollTimeoutMs) + } else if (!fetchedData.hasNext()) { +// The last pre-fetched data has been drained. +poll(pollTimeoutMs) + } + getRecordFromFetchedData(offset, untilOffset) +} catch { + case e: OffsetOutOfRangeException => +logWarning(s"Cannot fetch offset $offset. Some data may be lost. Recovering from " + + "the beginning offset", e) +advanceToBeginningOffsetAndFetch(offset, untilOffset, pollTimeoutMs) +} + } + + /** + * Try to advance to the beginning offset and fetch again. `beginningOffset` should be in + * `[offset, untilOffset]`. If not, it will try to fetch `offset` again if it's in + * `[beginningOffset, latestOffset)`. Otherwise, it will return null and reset the pre-fetched + * data. + */ + private def advanceToBeginningOffsetAndFetch( + offset: Long, + untilOffset: Long, + pollTimeoutMs: Long): ConsumerRecord[Array[Byte], Array[Byte]] = { +val beginningOffset = getBeginningOffset() +if (beginningOffset <= offset) { + val latestOffset = getLatestOffset() + if (latestOffset <= offset) { +// beginningOffset <= latestOffset - 1 < offset <= untilOffset - 1 +logWarning(s"Offset ${offset} is later than the latest offset $latestOffset. " + + s"Skipped [$offset, $untilOffset)") +reset() +null + } else { +// beginningOffset <= offset <= min(latestOffset - 1, untilOffset - 1) +// +// This will happen when a topic is deleted and recreated, and new data are pushed very fast +// , then we will see `offset` disappears first then appears again. Although the parameters +// are same, the state in Kafka cluster is changed, so it's not an endless loop. 
+// +// In addition, the stack here won't be deep unless the user keeps deleting and creating the +// topic very fast. +// +// Therefore, this recursive call is safe. +getAndIgnoreLostData(offset, untilOffset, pollTimeoutMs) + } +} else { + if (beginningOffset >= untilOffset) { +// offset <= untilOffset - 1 < beginningOffset --- End diff -- Improve the docs by adding a plain-word explanation, e.g. required offset is earlier than the available
[GitHub] spark pull request #15820: [SPARK-18373][SS][Kafka]Make failOnDataLoss=false...
Github user tdas commented on a diff in the pull request: https://github.com/apache/spark/pull/15820#discussion_r88135655 --- Diff: external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/CachedKafkaConsumer.scala
[GitHub] spark pull request #15820: [SPARK-18373][SS][Kafka]Make failOnDataLoss=false...
Github user tdas commented on a diff in the pull request: https://github.com/apache/spark/pull/15820#discussion_r88136863 --- Diff: external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/CachedKafkaConsumer.scala --- @@ -83,6 +86,139 @@ private[kafka010] case class CachedKafkaConsumer private( record } + /** + * Get the record at the `offset`. If it doesn't exist, try to get the earliest record in + * `[offset, untilOffset)`. + */ + def getAndIgnoreLostData( + offset: Long, + untilOffset: Long, + pollTimeoutMs: Long): ConsumerRecord[Array[Byte], Array[Byte]] = { +// scalastyle:off +// When `failOnDataLoss` is `false`, we need to handle the following cases (note: untilOffset and latestOffset are exclusive): --- End diff -- Okay, I can guess that you used beginningOffset because of consumer.seekToBeginning. But then we should be consistent with seekToEnd as well by calling the latest offset endOffset. Rather, let's be consistent with the more well-known names "earliest" and "latest".
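A minimal Scala sketch of the case analysis above, written with the "earliest"/"latest" naming suggested in this comment. It reduces the eight commented cases to two outcomes: fetch starting at some offset, or skip the range. `OffsetAction`, `FetchFrom`, `SkipRange` and `classify` are hypothetical names, not code from the PR. The sketch assumes `earliest` and `latest` have already been queried from the broker, with `latest` and `untilOffset` both exclusive. Case 3 (topic deleted) has no offsets to compare and is not modeled.

```scala
// Hypothetical sketch, not the PR's code: the branch structure of
// advanceToBeginningOffsetAndFetch using "earliest"/"latest" names.
sealed trait OffsetAction
final case class FetchFrom(offset: Long) extends OffsetAction
case object SkipRange extends OffsetAction

object OffsetCases {
  /** Decide what to do for [offset, untilOffset) given the broker's [earliest, latest). */
  def classify(offset: Long, untilOffset: Long, earliest: Long, latest: Long): OffsetAction = {
    require(offset < untilOffset, s"offset: $offset, untilOffset: $untilOffset")
    if (earliest <= offset) {
      if (latest <= offset) SkipRange // nothing at or after `offset` yet (case 6)
      else FetchFrom(offset)          // `offset` is still readable (cases 4 and 5)
    } else if (earliest >= untilOffset) {
      SkipRange                       // the whole range was aged out (cases 2 and 8)
    } else {
      FetchFrom(earliest)             // skip the lost prefix [offset, earliest) (cases 1 and 7)
    }
  }
}
```

With these names the two queries map directly to `seekToBeginning` and `seekToEnd`, and the comments can refer to both bounds symmetrically.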
[GitHub] spark pull request #15820: [SPARK-18373][SS][Kafka]Make failOnDataLoss=false...
Github user tdas commented on a diff in the pull request: https://github.com/apache/spark/pull/15820#discussion_r88139654 --- Diff: external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/CachedKafkaConsumer.scala --- @@ -83,6 +86,139 @@ private[kafka010] case class CachedKafkaConsumer private( record } + /** + * Get the record at the `offset`. If it doesn't exist, try to get the earliest record in + * `[offset, untilOffset)`. + */ + def getAndIgnoreLostData( + offset: Long, + untilOffset: Long, + pollTimeoutMs: Long): ConsumerRecord[Array[Byte], Array[Byte]] = { +// scalastyle:off +// When `failOnDataLoss` is `false`, we need to handle the following cases (note: untilOffset and latestOffset are exclusive): +// 1. Some data are aged out, and `offset < beginningOffset <= untilOffset - 1 <= latestOffset - 1` +// Seek to the beginningOffset and fetch the data. +// 2. Some data are aged out, and `offset <= untilOffset - 1 < beginningOffset`. +// There is nothing to fetch, return null. +// 3. The topic is deleted. +// There is nothing to fetch, return null. +// 4. The topic is deleted and recreated, and `beginningOffset <= offset <= untilOffset - 1 <= latestOffset - 1`. +// We cannot detect this case. We can still fetch data like nothing happens. +// 5. The topic is deleted and recreated, and `beginningOffset <= offset < latestOffset - 1 < untilOffset - 1`. +// Same as 4. +// 6. The topic is deleted and recreated, and `beginningOffset <= latestOffset - 1 < offset <= untilOffset - 1`. +// There is nothing to fetch, return null. +// 7. The topic is deleted and recreated, and `offset < beginningOffset <= untilOffset - 1`. +// Same as 1. +// 8. The topic is deleted and recreated, and `offset <= untilOffset - 1 < beginningOffset`. +// There is nothing to fetch, return null. 
+// scalastyle:on +require(offset < untilOffset, s"offset: $offset, untilOffset: $untilOffset") +logDebug(s"Get $groupId $topicPartition nextOffset $nextOffsetInFetchedData requested $offset") +try { + if (offset != nextOffsetInFetchedData) { +logInfo(s"Initial fetch for $topicPartition $offset") +seek(offset) +poll(pollTimeoutMs) + } else if (!fetchedData.hasNext()) { +// The last pre-fetched data has been drained. +poll(pollTimeoutMs) + } + getRecordFromFetchedData(offset, untilOffset) +} catch { + case e: OffsetOutOfRangeException => +logWarning(s"Cannot fetch offset $offset. Some data may be lost. Recovering from " + + "the beginning offset", e) +advanceToBeginningOffsetAndFetch(offset, untilOffset, pollTimeoutMs) +} + } + + /** + * Try to advance to the beginning offset and fetch again. `beginningOffset` should be in + * `[offset, untilOffset]`. If not, it will try to fetch `offset` again if it's in + * `[beginningOffset, latestOffset)`. Otherwise, it will return null and reset the pre-fetched + * data. + */ + private def advanceToBeginningOffsetAndFetch( + offset: Long, + untilOffset: Long, + pollTimeoutMs: Long): ConsumerRecord[Array[Byte], Array[Byte]] = { +val beginningOffset = getBeginningOffset() +if (beginningOffset <= offset) { + val latestOffset = getLatestOffset() + if (latestOffset <= offset) { +// beginningOffset <= latestOffset - 1 < offset <= untilOffset - 1 +logWarning(s"Offset ${offset} is later than the latest offset $latestOffset. " + + s"Skipped [$offset, $untilOffset)") +reset() +null + } else { +// beginningOffset <= offset <= min(latestOffset - 1, untilOffset - 1) +// +// This will happen when a topic is deleted and recreated, and new data are pushed very fast +// , then we will see `offset` disappears first then appears again. Although the parameters --- End diff -- nit: comma after new line
[GitHub] spark pull request #15820: [SPARK-18373][SS][Kafka]Make failOnDataLoss=false...
Github user tdas commented on a diff in the pull request: https://github.com/apache/spark/pull/15820#discussion_r88134704 --- Diff: external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/CachedKafkaConsumer.scala --- @@ -83,6 +86,139 @@ private[kafka010] case class CachedKafkaConsumer private( record } + /** + * Get the record at the `offset`. If it doesn't exist, try to get the earliest record in + * `[offset, untilOffset)`. + */ + def getAndIgnoreLostData( + offset: Long, + untilOffset: Long, + pollTimeoutMs: Long): ConsumerRecord[Array[Byte], Array[Byte]] = { +// scalastyle:off +// When `failOnDataLoss` is `false`, we need to handle the following cases (note: untilOffset and latestOffset are exclusive): +// 1. Some data are aged out, and `offset < beginningOffset <= untilOffset - 1 <= latestOffset - 1` +// Seek to the beginningOffset and fetch the data. +// 2. Some data are aged out, and `offset <= untilOffset - 1 < beginningOffset`. +// There is nothing to fetch, return null. +// 3. The topic is deleted. +// There is nothing to fetch, return null. +// 4. The topic is deleted and recreated, and `beginningOffset <= offset <= untilOffset - 1 <= latestOffset - 1`. +// We cannot detect this case. We can still fetch data like nothing happens. --- End diff -- nit: nothing happened.
[GitHub] spark pull request #15820: [SPARK-18373][SS][Kafka]Make failOnDataLoss=false...
Github user tdas commented on a diff in the pull request: https://github.com/apache/spark/pull/15820#discussion_r88137647 --- Diff: external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/CachedKafkaConsumer.scala --- @@ -83,6 +86,139 @@ private[kafka010] case class CachedKafkaConsumer private( record } + /** + * Get the record at the `offset`. If it doesn't exist, try to get the earliest record in + * `[offset, untilOffset)`. + */ + def getAndIgnoreLostData( + offset: Long, + untilOffset: Long, + pollTimeoutMs: Long): ConsumerRecord[Array[Byte], Array[Byte]] = { +// scalastyle:off +// When `failOnDataLoss` is `false`, we need to handle the following cases (note: untilOffset and latestOffset are exclusive): +// 1. Some data are aged out, and `offset < beginningOffset <= untilOffset - 1 <= latestOffset - 1` +// Seek to the beginningOffset and fetch the data. +// 2. Some data are aged out, and `offset <= untilOffset - 1 < beginningOffset`. +// There is nothing to fetch, return null. +// 3. The topic is deleted. +// There is nothing to fetch, return null. +// 4. The topic is deleted and recreated, and `beginningOffset <= offset <= untilOffset - 1 <= latestOffset - 1`. +// We cannot detect this case. We can still fetch data like nothing happens. +// 5. The topic is deleted and recreated, and `beginningOffset <= offset < latestOffset - 1 < untilOffset - 1`. +// Same as 4. +// 6. The topic is deleted and recreated, and `beginningOffset <= latestOffset - 1 < offset <= untilOffset - 1`. +// There is nothing to fetch, return null. +// 7. The topic is deleted and recreated, and `offset < beginningOffset <= untilOffset - 1`. +// Same as 1. +// 8. The topic is deleted and recreated, and `offset <= untilOffset - 1 < beginningOffset`. +// There is nothing to fetch, return null. 
+// scalastyle:on +require(offset < untilOffset, s"offset: $offset, untilOffset: $untilOffset") +logDebug(s"Get $groupId $topicPartition nextOffset $nextOffsetInFetchedData requested $offset") +try { + if (offset != nextOffsetInFetchedData) { +logInfo(s"Initial fetch for $topicPartition $offset") +seek(offset) +poll(pollTimeoutMs) + } else if (!fetchedData.hasNext()) { +// The last pre-fetched data has been drained. +poll(pollTimeoutMs) + } + getRecordFromFetchedData(offset, untilOffset) +} catch { + case e: OffsetOutOfRangeException => +logWarning(s"Cannot fetch offset $offset. Some data may be lost. Recovering from " + + "the beginning offset", e) +advanceToBeginningOffsetAndFetch(offset, untilOffset, pollTimeoutMs) +} + } + + /** + * Try to advance to the beginning offset and fetch again. `beginningOffset` should be in + * `[offset, untilOffset]`. If not, it will try to fetch `offset` again if it's in + * `[beginningOffset, latestOffset)`. Otherwise, it will return null and reset the pre-fetched + * data. + */ + private def advanceToBeginningOffsetAndFetch( + offset: Long, + untilOffset: Long, + pollTimeoutMs: Long): ConsumerRecord[Array[Byte], Array[Byte]] = { +val beginningOffset = getBeginningOffset() +if (beginningOffset <= offset) { + val latestOffset = getLatestOffset() + if (latestOffset <= offset) { +// beginningOffset <= latestOffset - 1 < offset <= untilOffset - 1 +logWarning(s"Offset ${offset} is later than the latest offset $latestOffset. " + + s"Skipped [$offset, $untilOffset)") +reset() +null + } else { +// beginningOffset <= offset <= min(latestOffset - 1, untilOffset - 1) +// +// This will happen when a topic is deleted and recreated, and new data are pushed very fast +// , then we will see `offset` disappears first then appears again. Although the parameters +// are same, the state in Kafka cluster is changed, so it's not an endless loop. 
+// +// In addition, the stack here won't be deep unless the user keeps deleting and creating the +// topic very fast. +// +// Therefore, this recursive call is safe. +getAndIgnoreLostData(offset, untilOffset, pollTimeoutMs) + } +} else { + if (beginningOffset >= untilOffset) { +// offset <= untilOffset - 1 < beginningOffset +logWarning(s"Buffer miss for $groupId $topicPartition [$offset, $untilOffset)") +reset() +
[GitHub] spark pull request #15820: [SPARK-18373][SS][Kafka]Make failOnDataLoss=false...
Github user zsxwing commented on a diff in the pull request: https://github.com/apache/spark/pull/15820#discussion_r88134526 --- Diff: external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/CachedKafkaConsumer.scala --- @@ -83,6 +86,139 @@ private[kafka010] case class CachedKafkaConsumer private( record } + /** + * Get the record at the `offset`. If it doesn't exist, try to get the earliest record in + * `[offset, untilOffset)`. + */ + def getAndIgnoreLostData( + offset: Long, + untilOffset: Long, + pollTimeoutMs: Long): ConsumerRecord[Array[Byte], Array[Byte]] = { +// scalastyle:off +// When `failOnDataLoss` is `false`, we need to handle the following cases (note: untilOffset and latestOffset are exclusive): --- End diff -- `latestOffset` means the latest offset that we get from Kafka.
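To make the exclusivity concrete: the end offset a Kafka consumer sees (for example via `seekToEnd` followed by `position`) is the offset that the next appended record will receive. The readable offsets are therefore `[earliest, latest)`, and `latest` is exclusive in the same way as `untilOffset`. The toy model below is a hypothetical illustration, not a Kafka API. It intersects a requested `[offset, untilOffset)` with that window.

```scala
// Hypothetical toy model of one partition (not a Kafka class).
final class ToyPartition {
  private var earliestOffset = 0L
  private var latestOffset = 0L // offset the next appended record will get

  def earliest: Long = earliestOffset
  def latest: Long = latestOffset

  /** Append `n` records; they get offsets latest, latest + 1, ... */
  def append(n: Int): Unit = latestOffset += n

  /** Simulate retention deleting every record below `offset`. */
  def ageOutBelow(offset: Long): Unit =
    earliestOffset = math.max(earliestOffset, math.min(offset, latestOffset))

  /** The still-readable part of [offset, untilOffset), if any. */
  def readable(offset: Long, untilOffset: Long): Option[(Long, Long)] = {
    val from = math.max(offset, earliestOffset)
    val until = math.min(untilOffset, latestOffset) // both upper bounds are exclusive
    if (from < until) Some((from, until)) else None
  }
}
```

For example, after appending 100 records and aging out everything below 40, a request for `[10, 50)` can only return `[40, 50)`, and a request starting at 100 finds nothing yet.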
[GitHub] spark pull request #15820: [SPARK-18373][SS][Kafka]Make failOnDataLoss=false...
Github user zsxwing commented on a diff in the pull request: https://github.com/apache/spark/pull/15820#discussion_r87527804 --- Diff: external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/CachedKafkaConsumer.scala --- @@ -83,6 +86,129 @@ private[kafka010] case class CachedKafkaConsumer private( record } + /** + * Get the record at the `offset`. If it doesn't exist, try to get the earliest record in + * `[offset, untilOffset)`. + */ + def getAndIgnoreLostData( + offset: Long, + untilOffset: Long, + pollTimeoutMs: Long): ConsumerRecord[Array[Byte], Array[Byte]] = { +// scalastyle:off +// When `failOnDataLoss` is `false`, we need to handle the following cases (note: untilOffset and latestOffset are exclusive): +// 1. Some data are aged out, and `offset < beginningOffset <= untilOffset - 1 <= latestOffset - 1` +// Seek to the beginningOffset and fetch the data. +// 2. Some data are aged out, and `offset <= untilOffset - 1 < beginningOffset`. +// There is nothing to fetch, return null. +// 3. The topic is deleted. +// There is nothing to fetch, return null. +// 4. The topic is deleted and recreated, and `beginningOffset <= offset <= untilOffset - 1 <= latestOffset - 1`. +// We cannot detect this case. We can still fetch data like nothing happens. +// 5. The topic is deleted and recreated, and `beginningOffset <= offset < latestOffset - 1 < untilOffset - 1`. +// Same as 4. +// 6. The topic is deleted and recreated, and `beginningOffset <= latestOffset - 1 < offset <= untilOffset - 1`. +// There is nothing to fetch, return null. +// 7. The topic is deleted and recreated, and `offset < beginningOffset <= untilOffset - 1`. +// Same as 1. +// 8. The topic is deleted and recreated, and `offset <= untilOffset - 1 < beginningOffset`. +// There is nothing to fetch, return null. 
+// scalastyle:on +require(offset < untilOffset, s"offset: $offset, untilOffset: $untilOffset") +logDebug(s"Get $groupId $topicPartition nextOffset $nextOffsetInFetchedData requested $offset") +try { + if (offset != nextOffsetInFetchedData) { +logInfo(s"Initial fetch for $topicPartition $offset") +seek(offset) +poll(pollTimeoutMs) + } else if (!fetchedData.hasNext()) { +// The last pre-fetched data has been drained. +poll(pollTimeoutMs) + } + getRecordFromFetchedData(offset, untilOffset) +} catch { + case e: OffsetOutOfRangeException => +logWarning(s"Cannot fetch offset $offset, try to recover from the beginning offset", e) +advanceToBeginningOffsetAndFetch(offset, untilOffset, pollTimeoutMs) +} + } + + /** + * Try to advance to the beginning offset and fetch again. `beginningOffset` should be in + * `[offset, untilOffset]`. If not, it will try to fetch `offset` again if it's in + * `[beginningOffset, latestOffset)`. Otherwise, it will return null and reset the pre-fetched + * data. + */ + private def advanceToBeginningOffsetAndFetch( + offset: Long, + untilOffset: Long, + pollTimeoutMs: Long): ConsumerRecord[Array[Byte], Array[Byte]] = { +val beginningOffset = getBeginningOffset() +if (beginningOffset <= offset) { + val latestOffset = getLatestOffset() + if (latestOffset <= offset) { +// beginningOffset <= latestOffset - 1 < offset <= untilOffset - 1 +logWarning(s"Offset ${offset} is later than the latest offset $latestOffset. 
" + + s"Skipped [$offset, $untilOffset)") +reset() +null + } else { +// beginningOffset <= offset <= min(latestOffset - 1, untilOffset - 1) +getAndIgnoreLostData(offset, untilOffset, pollTimeoutMs) + } +} else { + if (beginningOffset >= untilOffset) { +// offset <= untilOffset - 1 < beginningOffset +logWarning(s"Buffer miss for $groupId $topicPartition [$offset, $untilOffset)") +reset() +null + } else { +// offset < beginningOffset <= untilOffset - 1 +logWarning(s"Buffer miss for $groupId $topicPartition [$offset, $beginningOffset)") +getAndIgnoreLostData(beginningOffset, untilOffset, pollTimeoutMs) + } +} + } + + /** + * Get the earliest record in [offset, untilOffset) from the fetched data. If there is no such + * record, returns null. Must be called after `poll`. + */ + private def getRecordFromFetchedData( + offset: Long, + untilOffset: Long):
[GitHub] spark pull request #15820: [SPARK-18373][SS][Kafka]Make failOnDataLoss=false...
Github user huitseeker commented on a diff in the pull request: https://github.com/apache/spark/pull/15820#discussion_r87483402 --- Diff: external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/CachedKafkaConsumer.scala --- @@ -83,6 +86,129 @@ private[kafka010] case class CachedKafkaConsumer private( record } + /** + * Get the record at the `offset`. If it doesn't exist, try to get the earliest record in + * `[offset, untilOffset)`. + */ + def getAndIgnoreLostData( + offset: Long, + untilOffset: Long, + pollTimeoutMs: Long): ConsumerRecord[Array[Byte], Array[Byte]] = { +// scalastyle:off +// When `failOnDataLoss` is `false`, we need to handle the following cases (note: untilOffset and latestOffset are exclusive): +// 1. Some data are aged out, and `offset < beginningOffset <= untilOffset - 1 <= latestOffset - 1` +// Seek to the beginningOffset and fetch the data. +// 2. Some data are aged out, and `offset <= untilOffset - 1 < beginningOffset`. +// There is nothing to fetch, return null. +// 3. The topic is deleted. +// There is nothing to fetch, return null. +// 4. The topic is deleted and recreated, and `beginningOffset <= offset <= untilOffset - 1 <= latestOffset - 1`. +// We cannot detect this case. We can still fetch data like nothing happens. +// 5. The topic is deleted and recreated, and `beginningOffset <= offset < latestOffset - 1 < untilOffset - 1`. +// Same as 4. +// 6. The topic is deleted and recreated, and `beginningOffset <= latestOffset - 1 < offset <= untilOffset - 1`. +// There is nothing to fetch, return null. +// 7. The topic is deleted and recreated, and `offset < beginningOffset <= untilOffset - 1`. +// Same as 1. +// 8. The topic is deleted and recreated, and `offset <= untilOffset - 1 < beginningOffset`. +// There is nothing to fetch, return null. 
+// scalastyle:on +require(offset < untilOffset, s"offset: $offset, untilOffset: $untilOffset") +logDebug(s"Get $groupId $topicPartition nextOffset $nextOffsetInFetchedData requested $offset") +try { + if (offset != nextOffsetInFetchedData) { +logInfo(s"Initial fetch for $topicPartition $offset") +seek(offset) +poll(pollTimeoutMs) + } else if (!fetchedData.hasNext()) { +// The last pre-fetched data has been drained. +poll(pollTimeoutMs) + } + getRecordFromFetchedData(offset, untilOffset) +} catch { + case e: OffsetOutOfRangeException => +logWarning(s"Cannot fetch offset $offset, try to recover from the beginning offset", e) +advanceToBeginningOffsetAndFetch(offset, untilOffset, pollTimeoutMs) +} + } + + /** + * Try to advance to the beginning offset and fetch again. `beginningOffset` should be in + * `[offset, untilOffset]`. If not, it will try to fetch `offset` again if it's in + * `[beginningOffset, latestOffset)`. Otherwise, it will return null and reset the pre-fetched + * data. + */ + private def advanceToBeginningOffsetAndFetch( + offset: Long, + untilOffset: Long, + pollTimeoutMs: Long): ConsumerRecord[Array[Byte], Array[Byte]] = { +val beginningOffset = getBeginningOffset() +if (beginningOffset <= offset) { + val latestOffset = getLatestOffset() + if (latestOffset <= offset) { +// beginningOffset <= latestOffset - 1 < offset <= untilOffset - 1 +logWarning(s"Offset ${offset} is later than the latest offset $latestOffset. " + + s"Skipped [$offset, $untilOffset)") +reset() +null + } else { +// beginningOffset <= offset <= min(latestOffset - 1, untilOffset - 1) +getAndIgnoreLostData(offset, untilOffset, pollTimeoutMs) --- End diff -- +1, especially since, with the loss of `@tailrec`, this must now prove it will terminate within a limited stack size, and should prove it will do so under most stack size configurations.
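One way to address the stack-depth concern is to fold `getAndIgnoreLostData` and `advanceToBeginningOffsetAndFetch` into a single loop that the compiler can check with `@tailrec`. The sketch below uses a hypothetical `PartitionView` in place of the consumer. In it, `Left(())` stands for a thrown `OffsetOutOfRangeException`, and the loop returns offsets instead of records. The `maxAttempts` bound is this sketch's own addition, since the PR has no retry limit. Throwing when that bound is hit is also a design choice of the sketch, and it departs from the no-exception contract of `failOnDataLoss = false`.

```scala
import scala.annotation.tailrec

// Hypothetical stand-in for the consumer plus broker metadata (not a Kafka API).
trait PartitionView {
  /** Right(first available offset at or after `from`), or Left(()) where the real
    * consumer would throw OffsetOutOfRangeException. */
  def fetch(from: Long): Either[Unit, Long]
  def earliest: Long
  def latest: Long // exclusive
}

object LossTolerantGet {
  /** First available offset in [offset, untilOffset), or None if nothing is left. */
  def get(view: PartitionView, offset: Long, untilOffset: Long, maxAttempts: Int = 100): Option[Long] = {
    require(offset < untilOffset, s"offset: $offset, untilOffset: $untilOffset")

    @tailrec
    def loop(from: Long, attempt: Int): Option[Long] = {
      if (attempt >= maxAttempts) {
        // Bound added by this sketch only; the PR retries without a limit.
        throw new IllegalStateException(s"Offset $from kept moving after $attempt attempts")
      }
      view.fetch(from) match {
        case Right(found) =>
          if (found < untilOffset) Some(found) else None
        case Left(_) =>
          val earliest = view.earliest
          if (earliest <= from) {
            // Out of range, yet `from` may now lie in [earliest, latest): the broker state
            // changed between calls (e.g. topic recreated), so retry the same offset.
            if (view.latest <= from) None else loop(from, attempt + 1)
          } else if (earliest >= untilOffset) {
            None // the whole remaining range was aged out
          } else {
            loop(earliest, attempt + 1) // skip the lost prefix [from, earliest)
          }
      }
    }

    loop(offset, 0)
  }
}
```

With `@tailrec` the stack depth stays constant no matter how often the broker state changes. The retry bound only matters if an unbounded number of same-argument retries is considered unacceptable.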
[GitHub] spark pull request #15820: [SPARK-18373][SS][Kafka]Make failOnDataLoss=false...
Github user marmbrus commented on a diff in the pull request: https://github.com/apache/spark/pull/15820#discussion_r87450274 --- Diff: external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/CachedKafkaConsumer.scala --- @@ -83,6 +86,129 @@ private[kafka010] case class CachedKafkaConsumer private( record } + /** + * Get the record at the `offset`. If it doesn't exist, try to get the earliest record in + * `[offset, untilOffset)`. + */ + def getAndIgnoreLostData( + offset: Long, + untilOffset: Long, + pollTimeoutMs: Long): ConsumerRecord[Array[Byte], Array[Byte]] = { +// scalastyle:off +// When `failOnDataLoss` is `false`, we need to handle the following cases (note: untilOffset and latestOffset are exclusive): +// 1. Some data are aged out, and `offset < beginningOffset <= untilOffset - 1 <= latestOffset - 1` +// Seek to the beginningOffset and fetch the data. +// 2. Some data are aged out, and `offset <= untilOffset - 1 < beginningOffset`. +// There is nothing to fetch, return null. +// 3. The topic is deleted. +// There is nothing to fetch, return null. +// 4. The topic is deleted and recreated, and `beginningOffset <= offset <= untilOffset - 1 <= latestOffset - 1`. +// We cannot detect this case. We can still fetch data like nothing happens. +// 5. The topic is deleted and recreated, and `beginningOffset <= offset < latestOffset - 1 < untilOffset - 1`. +// Same as 4. +// 6. The topic is deleted and recreated, and `beginningOffset <= latestOffset - 1 < offset <= untilOffset - 1`. +// There is nothing to fetch, return null. +// 7. The topic is deleted and recreated, and `offset < beginningOffset <= untilOffset - 1`. +// Same as 1. +// 8. The topic is deleted and recreated, and `offset <= untilOffset - 1 < beginningOffset`. +// There is nothing to fetch, return null. 
+// scalastyle:on +require(offset < untilOffset, s"offset: $offset, untilOffset: $untilOffset") +logDebug(s"Get $groupId $topicPartition nextOffset $nextOffsetInFetchedData requested $offset") +try { + if (offset != nextOffsetInFetchedData) { +logInfo(s"Initial fetch for $topicPartition $offset") +seek(offset) +poll(pollTimeoutMs) + } else if (!fetchedData.hasNext()) { +// The last pre-fetched data has been drained. +poll(pollTimeoutMs) + } + getRecordFromFetchedData(offset, untilOffset) +} catch { + case e: OffsetOutOfRangeException => +logWarning(s"Cannot fetch offset $offset, try to recover from the beginning offset", e) +advanceToBeginningOffsetAndFetch(offset, untilOffset, pollTimeoutMs) +} + } + + /** + * Try to advance to the beginning offset and fetch again. `beginningOffset` should be in + * `[offset, untilOffset]`. If not, it will try to fetch `offset` again if it's in + * `[beginningOffset, latestOffset)`. Otherwise, it will return null and reset the pre-fetched + * data. + */ + private def advanceToBeginningOffsetAndFetch( + offset: Long, + untilOffset: Long, + pollTimeoutMs: Long): ConsumerRecord[Array[Byte], Array[Byte]] = { +val beginningOffset = getBeginningOffset() +if (beginningOffset <= offset) { + val latestOffset = getLatestOffset() + if (latestOffset <= offset) { +// beginningOffset <= latestOffset - 1 < offset <= untilOffset - 1 +logWarning(s"Offset ${offset} is later than the latest offset $latestOffset. 
" + + s"Skipped [$offset, $untilOffset)") +reset() +null + } else { +// beginningOffset <= offset <= min(latestOffset - 1, untilOffset - 1) +getAndIgnoreLostData(offset, untilOffset, pollTimeoutMs) + } +} else { + if (beginningOffset >= untilOffset) { +// offset <= untilOffset - 1 < beginningOffset +logWarning(s"Buffer miss for $groupId $topicPartition [$offset, $untilOffset)") +reset() +null + } else { +// offset < beginningOffset <= untilOffset - 1 +logWarning(s"Buffer miss for $groupId $topicPartition [$offset, $beginningOffset)") +getAndIgnoreLostData(beginningOffset, untilOffset, pollTimeoutMs) + } +} + } + + /** + * Get the earliest record in [offset, untilOffset) from the fetched data. If there is no such + * record, returns null. Must be called after `poll`. + */ + private def getRecordFromFetchedData( + offset: Long, + untilOffset: Long):
[GitHub] spark pull request #15820: [SPARK-18373][SS][Kafka]Make failOnDataLoss=false...
Github user koeninger commented on a diff in the pull request: https://github.com/apache/spark/pull/15820#discussion_r87439798 --- Diff: external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/CachedKafkaConsumer.scala --- @@ -83,6 +86,129 @@ private[kafka010] case class CachedKafkaConsumer private( record } + /** + * Get the record at the `offset`. If it doesn't exist, try to get the earliest record in + * `[offset, untilOffset)`. + */ + def getAndIgnoreLostData( + offset: Long, + untilOffset: Long, + pollTimeoutMs: Long): ConsumerRecord[Array[Byte], Array[Byte]] = { +// scalastyle:off +// When `failOnDataLoss` is `false`, we need to handle the following cases (note: untilOffset and latestOffset are exclusive): +// 1. Some data are aged out, and `offset < beginningOffset <= untilOffset - 1 <= latestOffset - 1` +// Seek to the beginningOffset and fetch the data. +// 2. Some data are aged out, and `offset <= untilOffset - 1 < beginningOffset`. +// There is nothing to fetch, return null. +// 3. The topic is deleted. +// There is nothing to fetch, return null. +// 4. The topic is deleted and recreated, and `beginningOffset <= offset <= untilOffset - 1 <= latestOffset - 1`. +// We cannot detect this case. We can still fetch data like nothing happens. +// 5. The topic is deleted and recreated, and `beginningOffset <= offset < latestOffset - 1 < untilOffset - 1`. +// Same as 4. +// 6. The topic is deleted and recreated, and `beginningOffset <= latestOffset - 1 < offset <= untilOffset - 1`. +// There is nothing to fetch, return null. +// 7. The topic is deleted and recreated, and `offset < beginningOffset <= untilOffset - 1`. +// Same as 1. +// 8. The topic is deleted and recreated, and `offset <= untilOffset - 1 < beginningOffset`. +// There is nothing to fetch, return null. 
+// scalastyle:on +require(offset < untilOffset, s"offset: $offset, untilOffset: $untilOffset") +logDebug(s"Get $groupId $topicPartition nextOffset $nextOffsetInFetchedData requested $offset") +try { + if (offset != nextOffsetInFetchedData) { +logInfo(s"Initial fetch for $topicPartition $offset") +seek(offset) +poll(pollTimeoutMs) + } else if (!fetchedData.hasNext()) { +// The last pre-fetched data has been drained. +poll(pollTimeoutMs) + } + getRecordFromFetchedData(offset, untilOffset) +} catch { + case e: OffsetOutOfRangeException => +logWarning(s"Cannot fetch offset $offset, try to recover from the beginning offset", e) +advanceToBeginningOffsetAndFetch(offset, untilOffset, pollTimeoutMs) +} + } + + /** + * Try to advance to the beginning offset and fetch again. `beginningOffset` should be in + * `[offset, untilOffset]`. If not, it will try to fetch `offset` again if it's in + * `[beginningOffset, latestOffset)`. Otherwise, it will return null and reset the pre-fetched + * data. + */ + private def advanceToBeginningOffsetAndFetch( + offset: Long, + untilOffset: Long, + pollTimeoutMs: Long): ConsumerRecord[Array[Byte], Array[Byte]] = { +val beginningOffset = getBeginningOffset() +if (beginningOffset <= offset) { + val latestOffset = getLatestOffset() + if (latestOffset <= offset) { +// beginningOffset <= latestOffset - 1 < offset <= untilOffset - 1 +logWarning(s"Offset ${offset} is later than the latest offset $latestOffset. 
" + + s"Skipped [$offset, $untilOffset)") +reset() +null + } else { +// beginningOffset <= offset <= min(latestOffset - 1, untilOffset - 1) +getAndIgnoreLostData(offset, untilOffset, pollTimeoutMs) + } +} else { + if (beginningOffset >= untilOffset) { +// offset <= untilOffset - 1 < beginningOffset +logWarning(s"Buffer miss for $groupId $topicPartition [$offset, $untilOffset)") +reset() +null + } else { +// offset < beginningOffset <= untilOffset - 1 +logWarning(s"Buffer miss for $groupId $topicPartition [$offset, $beginningOffset)") +getAndIgnoreLostData(beginningOffset, untilOffset, pollTimeoutMs) + } +} + } + + /** + * Get the earliest record in [offset, untilOffset) from the fetched data. If there is no such + * record, returns null. Must be called after `poll`. + */ + private def getRecordFromFetchedData( + offset: Long, + untilOffset: Long):
[GitHub] spark pull request #15820: [SPARK-18373][SS][Kafka]Make failOnDataLoss=false...
Github user koeninger commented on a diff in the pull request: https://github.com/apache/spark/pull/15820#discussion_r87438222 --- Diff: external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/CachedKafkaConsumer.scala --- @@ -83,6 +86,129 @@ private[kafka010] case class CachedKafkaConsumer private( record } + /** + * Get the record at the `offset`. If it doesn't exist, try to get the earliest record in + * `[offset, untilOffset)`. + */ + def getAndIgnoreLostData( + offset: Long, + untilOffset: Long, + pollTimeoutMs: Long): ConsumerRecord[Array[Byte], Array[Byte]] = { +// scalastyle:off +// When `failOnDataLoss` is `false`, we need to handle the following cases (note: untilOffset and latestOffset are exclusive): +// 1. Some data are aged out, and `offset < beginningOffset <= untilOffset - 1 <= latestOffset - 1` +// Seek to the beginningOffset and fetch the data. +// 2. Some data are aged out, and `offset <= untilOffset - 1 < beginningOffset`. +// There is nothing to fetch, return null. +// 3. The topic is deleted. +// There is nothing to fetch, return null. +// 4. The topic is deleted and recreated, and `beginningOffset <= offset <= untilOffset - 1 <= latestOffset - 1`. +// We cannot detect this case. We can still fetch data like nothing happens. +// 5. The topic is deleted and recreated, and `beginningOffset <= offset < latestOffset - 1 < untilOffset - 1`. +// Same as 4. +// 6. The topic is deleted and recreated, and `beginningOffset <= latestOffset - 1 < offset <= untilOffset - 1`. +// There is nothing to fetch, return null. +// 7. The topic is deleted and recreated, and `offset < beginningOffset <= untilOffset - 1`. +// Same as 1. +// 8. The topic is deleted and recreated, and `offset <= untilOffset - 1 < beginningOffset`. +// There is nothing to fetch, return null. 
+// scalastyle:on +require(offset < untilOffset, s"offset: $offset, untilOffset: $untilOffset") +logDebug(s"Get $groupId $topicPartition nextOffset $nextOffsetInFetchedData requested $offset") +try { + if (offset != nextOffsetInFetchedData) { +logInfo(s"Initial fetch for $topicPartition $offset") +seek(offset) +poll(pollTimeoutMs) + } else if (!fetchedData.hasNext()) { +// The last pre-fetched data has been drained. +poll(pollTimeoutMs) + } + getRecordFromFetchedData(offset, untilOffset) +} catch { + case e: OffsetOutOfRangeException => +logWarning(s"Cannot fetch offset $offset, try to recover from the beginning offset", e) +advanceToBeginningOffsetAndFetch(offset, untilOffset, pollTimeoutMs) +} + } + + /** + * Try to advance to the beginning offset and fetch again. `beginningOffset` should be in + * `[offset, untilOffset]`. If not, it will try to fetch `offset` again if it's in + * `[beginningOffset, latestOffset)`. Otherwise, it will return null and reset the pre-fetched + * data. + */ + private def advanceToBeginningOffsetAndFetch( + offset: Long, + untilOffset: Long, + pollTimeoutMs: Long): ConsumerRecord[Array[Byte], Array[Byte]] = { +val beginningOffset = getBeginningOffset() +if (beginningOffset <= offset) { + val latestOffset = getLatestOffset() + if (latestOffset <= offset) { +// beginningOffset <= latestOffset - 1 < offset <= untilOffset - 1 +logWarning(s"Offset ${offset} is later than the latest offset $latestOffset. " + + s"Skipped [$offset, $untilOffset)") +reset() +null + } else { +// beginningOffset <= offset <= min(latestOffset - 1, untilOffset - 1) +getAndIgnoreLostData(offset, untilOffset, pollTimeoutMs) --- End diff -- I'm clearer on why this terminates, but I think it's worth a comment, since it's a mutually recursive call without changing arguments.
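One way to make the termination argument easier to state in a comment is to pull the branch logic of `advanceToBeginningOffsetAndFetch` into a pure function. The sketch below is hypothetical (`OffsetResolutionSketch.resumeFrom` is not part of the PR) and mirrors the branches quoted above. Whenever it returns an offset, that offset is never smaller than `offset` and lies in `[beginningOffset, untilOffset)`; in the "retry the same offset" branch it is also below `latestOffset`. The retry therefore targets an offset the broker just reported as present, so termination rests on the assumption that the log does not change again between the offset lookup and the retry. The code does not guarantee that by itself.

```scala
object OffsetResolutionSketch {
  /**
   * Decides where to resume after an out-of-range fetch, following the case analysis
   * in this thread. Returns None when [offset, untilOffset) has nothing left to read.
   * `untilOffset` and `latestOffset` are exclusive.
   */
  def resumeFrom(offset: Long, untilOffset: Long, beginningOffset: Long, latestOffset: Long): Option[Long] = {
    require(offset < untilOffset, s"offset: $offset, untilOffset: $untilOffset")
    if (beginningOffset <= offset) {
      if (latestOffset <= offset) None // cases 3 and 6: offset is past the end of the log
      else Some(offset)                // cases 4 and 5: offset looks valid, retry it once
    } else {
      if (beginningOffset >= untilOffset) None // cases 2 and 8: the whole range aged out
      else Some(beginningOffset)               // cases 1 and 7: skip the lost prefix
    }
  }
}
```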
[GitHub] spark pull request #15820: [SPARK-18373][SS][Kafka]Make failOnDataLoss=false...
Github user koeninger commented on a diff in the pull request: https://github.com/apache/spark/pull/15820#discussion_r87438788 --- Diff: external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/CachedKafkaConsumer.scala --- @@ -83,6 +86,129 @@ private[kafka010] case class CachedKafkaConsumer private( record } + /** + * Get the record at the `offset`. If it doesn't exist, try to get the earliest record in + * `[offset, untilOffset)`. + */ + def getAndIgnoreLostData( + offset: Long, + untilOffset: Long, + pollTimeoutMs: Long): ConsumerRecord[Array[Byte], Array[Byte]] = { +// scalastyle:off +// When `failOnDataLoss` is `false`, we need to handle the following cases (note: untilOffset and latestOffset are exclusive): +// 1. Some data are aged out, and `offset < beginningOffset <= untilOffset - 1 <= latestOffset - 1` +// Seek to the beginningOffset and fetch the data. +// 2. Some data are aged out, and `offset <= untilOffset - 1 < beginningOffset`. +// There is nothing to fetch, return null. +// 3. The topic is deleted. +// There is nothing to fetch, return null. +// 4. The topic is deleted and recreated, and `beginningOffset <= offset <= untilOffset - 1 <= latestOffset - 1`. +// We cannot detect this case. We can still fetch data like nothing happens. +// 5. The topic is deleted and recreated, and `beginningOffset <= offset < latestOffset - 1 < untilOffset - 1`. +// Same as 4. +// 6. The topic is deleted and recreated, and `beginningOffset <= latestOffset - 1 < offset <= untilOffset - 1`. +// There is nothing to fetch, return null. +// 7. The topic is deleted and recreated, and `offset < beginningOffset <= untilOffset - 1`. +// Same as 1. +// 8. The topic is deleted and recreated, and `offset <= untilOffset - 1 < beginningOffset`. +// There is nothing to fetch, return null. 
+// scalastyle:on +require(offset < untilOffset, s"offset: $offset, untilOffset: $untilOffset") +logDebug(s"Get $groupId $topicPartition nextOffset $nextOffsetInFetchedData requested $offset") +try { + if (offset != nextOffsetInFetchedData) { +logInfo(s"Initial fetch for $topicPartition $offset") +seek(offset) +poll(pollTimeoutMs) + } else if (!fetchedData.hasNext()) { +// The last pre-fetched data has been drained. +poll(pollTimeoutMs) + } + getRecordFromFetchedData(offset, untilOffset) +} catch { + case e: OffsetOutOfRangeException => +logWarning(s"Cannot fetch offset $offset, try to recover from the beginning offset", e) --- End diff -- I think it's worth having the warning explicitly state that data has been lost.
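Here is a hypothetical helper showing what a warning that states the loss explicitly might look like. The wording and the `lostDataWarning` name are illustrative and do not come from the PR. It reports the exact range being skipped, so operators can see how much data was dropped and which option controls the behavior.

```scala
object DataLossMessageSketch {
  /**
   * Builds a warning that says plainly that data was lost, including the skipped range.
   * `lostUntil` is exclusive, matching the [offset, untilOffset) convention in this thread.
   */
  def lostDataWarning(topicPartition: String, lostFrom: Long, lostUntil: Long): String =
    s"Data loss detected for $topicPartition: offsets [$lostFrom, $lostUntil) " +
      s"(${lostUntil - lostFrom} offsets) are no longer available and will be skipped. " +
      "Set failOnDataLoss=true to fail the query instead."
}
```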
[GitHub] spark pull request #15820: [SPARK-18373][SS][Kafka]Make failOnDataLoss=false...
Github user koeninger commented on a diff in the pull request: https://github.com/apache/spark/pull/15820#discussion_r87129126 --- Diff: external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/CachedKafkaConsumer.scala --- @@ -83,6 +86,113 @@ private[kafka010] case class CachedKafkaConsumer private( record } + @tailrec + final def getAndIgnoreLostData( + offset: Long, + untilOffset: Long, + pollTimeoutMs: Long): ConsumerRecord[Array[Byte], Array[Byte]] = { +// scalastyle:off +// When `failOnDataLoss` is `false`, we need to handle the following cases (note: untilOffset and latestOffset are exclusive): +// 1. Some data are aged out, and `offset < beginningOffset <= untilOffset - 1 <= latestOffset - 1` +// Seek to the beginningOffset and fetch the data. +// 2. Some data are aged out, and `offset <= untilOffset - 1 < beginningOffset`. +// There is nothing to fetch, return null. +// 3. The topic is deleted. +// There is nothing to fetch, return null. +// 4. The topic is deleted and recreated, and `beginningOffset <= offset <= untilOffset - 1 <= latestOffset - 1`. +// We cannot detect this case. We can still fetch data like nothing happens. +// 5. The topic is deleted and recreated, and `beginningOffset <= offset < latestOffset - 1 < untilOffset - 1`. +// Same as 4. +// 6. The topic is deleted and recreated, and `beginningOffset <= latestOffset - 1 < offset <= untilOffset - 1`. +// There is nothing to fetch, return null. +// 7. The topic is deleted and recreated, and `offset < beginningOffset <= untilOffset - 1`. +// Same as 1. +// 8. The topic is deleted and recreated, and `offset <= untilOffset - 1 < beginningOffset`. +// There is nothing to fetch, return null. 
+// scalastyle:on +if (offset >= untilOffset) { + // Case 2 or 8 + // We seek to beginningOffset but beginningOffset >= untilOffset + reset() + return null +} + +logDebug(s"Get $groupId $topicPartition nextOffset $nextOffsetInFetchedData requested $offset") +var outOfOffset = false +if (offset != nextOffsetInFetchedData) { + logInfo(s"Initial fetch for $topicPartition $offset") + seek(offset) + try { +poll(pollTimeoutMs) + } catch { +case e: OffsetOutOfRangeException => + logWarning(s"Cannot fetch offset $offset, try to recover from the beginning offset", e) + outOfOffset = true + } +} else if (!fetchedData.hasNext()) { + // The last pre-fetched data has been drained. + seek(offset) + try { +poll(pollTimeoutMs) + } catch { +case e: OffsetOutOfRangeException => + logWarning(s"Cannot fetch offset $offset, try to recover from the beginning offset", e) + outOfOffset = true + } +} +if (outOfOffset) { + val beginningOffset = getBeginningOffset() + if (beginningOffset <= offset) { +val latestOffset = getLatestOffset() +if (latestOffset <= offset) { + // Case 3 or 6 + logWarning(s"Offset ${offset} is later than the latest offset $latestOffset. " + +s"Skipped [$offset, $untilOffset)") + reset() + return null +} else { + // Case 4 or 5 + getAndIgnoreLostData(offset, untilOffset, pollTimeoutMs) --- End diff -- - Why is this not an early return? - The arguments to the recursive function have not changed at this point, right? Why does it terminate?
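Both questions become easier to answer if the method is written as a single expression and only recurses when the offset strictly increases. The sketch below is a hypothetical model, not the PR's code. `FakeLog` replaces the Kafka consumer, and `Left(())` stands in for `OffsetOutOfRangeException`. Because the log is static, the "retry the same offset" branch (cases 4 and 5) is not modeled. The only recursive call passes `beginningOffset > offset`, and the method returns as soon as `offset >= untilOffset`, so the recursion is visibly bounded and `@tailrec` can verify that the call is in tail position.

```scala
import scala.annotation.tailrec

// Hypothetical in-memory log used only to illustrate control flow; it is not a Kafka API.
final case class FakeLog(beginningOffset: Long, latestOffset: Long) {
  /** Left(()) models OffsetOutOfRangeException; Right(offset) models a fetched record. */
  def fetch(offset: Long): Either[Unit, Long] =
    if (offset < beginningOffset || offset >= latestOffset) Left(()) else Right(offset)
}

object ExpressionStyleSketch {
  /** Each branch yields the result directly, so no `return` is needed anywhere. */
  @tailrec
  def getAndIgnoreLostData(log: FakeLog, offset: Long, untilOffset: Long): Option[Long] =
    if (offset >= untilOffset) {
      None // nothing left in [offset, untilOffset)
    } else {
      log.fetch(offset) match {
        case Right(found) => Some(found)
        case Left(_) if log.beginningOffset > offset =>
          // Lost prefix: this is the only recursive call, and its offset is strictly larger.
          getAndIgnoreLostData(log, log.beginningOffset, untilOffset)
        case Left(_) =>
          // beginningOffset <= offset but still out of range: offset is past the end of the log.
          None
      }
    }
}
```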
[GitHub] spark pull request #15820: [SPARK-18373][SS][Kafka]Make failOnDataLoss=false...
Github user koeninger commented on a diff in the pull request: https://github.com/apache/spark/pull/15820#discussion_r87129981 --- Diff: external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/CachedKafkaConsumer.scala --- @@ -83,6 +86,113 @@ private[kafka010] case class CachedKafkaConsumer private( record } + @tailrec + final def getAndIgnoreLostData( + offset: Long, + untilOffset: Long, + pollTimeoutMs: Long): ConsumerRecord[Array[Byte], Array[Byte]] = { +// scalastyle:off +// When `failOnDataLoss` is `false`, we need to handle the following cases (note: untilOffset and latestOffset are exclusive): +// 1. Some data are aged out, and `offset < beginningOffset <= untilOffset - 1 <= latestOffset - 1` +// Seek to the beginningOffset and fetch the data. +// 2. Some data are aged out, and `offset <= untilOffset - 1 < beginningOffset`. +// There is nothing to fetch, return null. +// 3. The topic is deleted. +// There is nothing to fetch, return null. +// 4. The topic is deleted and recreated, and `beginningOffset <= offset <= untilOffset - 1 <= latestOffset - 1`. +// We cannot detect this case. We can still fetch data like nothing happens. +// 5. The topic is deleted and recreated, and `beginningOffset <= offset < latestOffset - 1 < untilOffset - 1`. +// Same as 4. +// 6. The topic is deleted and recreated, and `beginningOffset <= latestOffset - 1 < offset <= untilOffset - 1`. +// There is nothing to fetch, return null. +// 7. The topic is deleted and recreated, and `offset < beginningOffset <= untilOffset - 1`. +// Same as 1. +// 8. The topic is deleted and recreated, and `offset <= untilOffset - 1 < beginningOffset`. +// There is nothing to fetch, return null. 
+// scalastyle:on +if (offset >= untilOffset) { + // Case 2 or 8 + // We seek to beginningOffset but beginningOffset >= untilOffset + reset() + return null +} + +logDebug(s"Get $groupId $topicPartition nextOffset $nextOffsetInFetchedData requested $offset") +var outOfOffset = false --- End diff -- Can this var be eliminated by just using a single try around the if / else? It's the same catch condition in either case.
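Here is a sketch of the suggested restructuring. It uses hypothetical stand-ins (`ToyConsumer`, `OffsetOutOfRange`) instead of the real Kafka classes. A single `try` wraps both branches, the shared `catch` handles the out-of-range case once, and the value of the whole expression replaces the `outOfOffset` flag. The later revision quoted in this thread (the version that calls `advanceToBeginningOffsetAndFetch`) has the same shape.

```scala
import scala.collection.mutable.ArrayBuffer

// Hypothetical stand-in for Kafka's OffsetOutOfRangeException.
final class OffsetOutOfRange(val offset: Long) extends RuntimeException(s"offset $offset is out of range")

/** Toy consumer over offsets [beginningOffset, latestOffset) that logs each call it receives. */
final class ToyConsumer(beginningOffset: Long, latestOffset: Long) {
  val calls = ArrayBuffer.empty[String]
  private var position = beginningOffset

  def seek(offset: Long): Unit = { calls += s"seek($offset)"; position = offset }

  def poll(): Unit = {
    calls += "poll"
    if (position < beginningOffset || position >= latestOffset) throw new OffsetOutOfRange(position)
  }
}

object SingleTrySketch {
  /** Returns false if the fetch hit an out-of-range offset and true otherwise, with no mutable flag. */
  def fetched(c: ToyConsumer, offset: Long, nextOffsetInFetchedData: Long, drained: Boolean): Boolean =
    try {
      if (offset != nextOffsetInFetchedData) {
        c.seek(offset)
        c.poll()
      } else if (drained) {
        c.poll()
      }
      true
    } catch {
      // One handler covers both branches.
      case _: OffsetOutOfRange => false
    }
}
```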
[GitHub] spark pull request #15820: [SPARK-18373][SS][Kafka]Make failOnDataLoss=false...
Github user koeninger commented on a diff in the pull request: https://github.com/apache/spark/pull/15820#discussion_r87129817 --- Diff: external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/CachedKafkaConsumer.scala --- @@ -83,6 +86,113 @@ private[kafka010] case class CachedKafkaConsumer private( record } + @tailrec + final def getAndIgnoreLostData( + offset: Long, + untilOffset: Long, + pollTimeoutMs: Long): ConsumerRecord[Array[Byte], Array[Byte]] = { +// scalastyle:off +// When `failOnDataLoss` is `false`, we need to handle the following cases (note: untilOffset and latestOffset are exclusive): +// 1. Some data are aged out, and `offset < beginningOffset <= untilOffset - 1 <= latestOffset - 1` +// Seek to the beginningOffset and fetch the data. +// 2. Some data are aged out, and `offset <= untilOffset - 1 < beginningOffset`. +// There is nothing to fetch, return null. +// 3. The topic is deleted. +// There is nothing to fetch, return null. +// 4. The topic is deleted and recreated, and `beginningOffset <= offset <= untilOffset - 1 <= latestOffset - 1`. +// We cannot detect this case. We can still fetch data like nothing happens. +// 5. The topic is deleted and recreated, and `beginningOffset <= offset < latestOffset - 1 < untilOffset - 1`. +// Same as 4. +// 6. The topic is deleted and recreated, and `beginningOffset <= latestOffset - 1 < offset <= untilOffset - 1`. +// There is nothing to fetch, return null. +// 7. The topic is deleted and recreated, and `offset < beginningOffset <= untilOffset - 1`. +// Same as 1. +// 8. The topic is deleted and recreated, and `offset <= untilOffset - 1 < beginningOffset`. +// There is nothing to fetch, return null. 
+// scalastyle:on +if (offset >= untilOffset) { + // Case 2 or 8 + // We seek to beginningOffset but beginningOffset >= untilOffset + reset() + return null +} + +logDebug(s"Get $groupId $topicPartition nextOffset $nextOffsetInFetchedData requested $offset") +var outOfOffset = false +if (offset != nextOffsetInFetchedData) { + logInfo(s"Initial fetch for $topicPartition $offset") + seek(offset) + try { +poll(pollTimeoutMs) + } catch { +case e: OffsetOutOfRangeException => + logWarning(s"Cannot fetch offset $offset, try to recover from the beginning offset", e) + outOfOffset = true + } +} else if (!fetchedData.hasNext()) { + // The last pre-fetched data has been drained. + seek(offset) --- End diff -- I don't think it's necessary to seek every time the fetched data is empty; in normal operation the poll should return the next offset, right?
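The reviewer's point relies on the consumer tracking its own position. After `poll` returns records, the next `poll` continues from the offset following the last record returned, so an explicit `seek` is only needed when the requested offset differs from `nextOffsetInFetchedData`. The toy reader below is a hypothetical model of that position tracking, not the Kafka client; `maxPollRecords` only loosely mirrors the consumer's batch-size limit.

```scala
/** Toy reader that, like a Kafka consumer, remembers its position between polls. */
final class ToyPartitionReader(offsets: Vector[Long], maxPollRecords: Int) {
  private var position = 0L

  def seek(offset: Long): Unit = { position = offset }

  /** Returns up to maxPollRecords offsets at or after the current position, then advances it. */
  def poll(): Vector[Long] = {
    val batch = offsets.filter(_ >= position).take(maxPollRecords)
    if (batch.nonEmpty) position = batch.last + 1
    batch
  }
}
```

A single `seek` followed by repeated `poll` calls reads the partition sequentially. No further seeks are needed while the reads stay contiguous.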
[GitHub] spark pull request #15820: [SPARK-18373][SS][Kafka]Make failOnDataLoss=false...
Github user koeninger commented on a diff in the pull request: https://github.com/apache/spark/pull/15820#discussion_r87129927 --- Diff: external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/CachedKafkaConsumer.scala --- @@ -83,6 +86,113 @@ private[kafka010] case class CachedKafkaConsumer private( record } + @tailrec + final def getAndIgnoreLostData( + offset: Long, + untilOffset: Long, + pollTimeoutMs: Long): ConsumerRecord[Array[Byte], Array[Byte]] = { +// scalastyle:off +// When `failOnDataLoss` is `false`, we need to handle the following cases (note: untilOffset and latestOffset are exclusive): +// 1. Some data are aged out, and `offset < beginningOffset <= untilOffset - 1 <= latestOffset - 1` +// Seek to the beginningOffset and fetch the data. +// 2. Some data are aged out, and `offset <= untilOffset - 1 < beginningOffset`. +// There is nothing to fetch, return null. +// 3. The topic is deleted. +// There is nothing to fetch, return null. +// 4. The topic is deleted and recreated, and `beginningOffset <= offset <= untilOffset - 1 <= latestOffset - 1`. +// We cannot detect this case. We can still fetch data like nothing happens. +// 5. The topic is deleted and recreated, and `beginningOffset <= offset < latestOffset - 1 < untilOffset - 1`. +// Same as 4. +// 6. The topic is deleted and recreated, and `beginningOffset <= latestOffset - 1 < offset <= untilOffset - 1`. +// There is nothing to fetch, return null. +// 7. The topic is deleted and recreated, and `offset < beginningOffset <= untilOffset - 1`. +// Same as 1. +// 8. The topic is deleted and recreated, and `offset <= untilOffset - 1 < beginningOffset`. +// There is nothing to fetch, return null. 
+// scalastyle:on +if (offset >= untilOffset) { + // Case 2 or 8 + // We seek to beginningOffset but beginningOffset >= untilOffset + reset() + return null +} + +logDebug(s"Get $groupId $topicPartition nextOffset $nextOffsetInFetchedData requested $offset") +var outOfOffset = false +if (offset != nextOffsetInFetchedData) { + logInfo(s"Initial fetch for $topicPartition $offset") + seek(offset) + try { +poll(pollTimeoutMs) + } catch { +case e: OffsetOutOfRangeException => + logWarning(s"Cannot fetch offset $offset, try to recover from the beginning offset", e) + outOfOffset = true + } +} else if (!fetchedData.hasNext()) { + // The last pre-fetched data has been drained. + seek(offset) + try { +poll(pollTimeoutMs) + } catch { +case e: OffsetOutOfRangeException => + logWarning(s"Cannot fetch offset $offset, try to recover from the beginning offset", e) + outOfOffset = true + } +} +if (outOfOffset) { + val beginningOffset = getBeginningOffset() + if (beginningOffset <= offset) { +val latestOffset = getLatestOffset() +if (latestOffset <= offset) { + // Case 3 or 6 + logWarning(s"Offset ${offset} is later than the latest offset $latestOffset. " + +s"Skipped [$offset, $untilOffset)") + reset() + return null +} else { + // Case 4 or 5 + getAndIgnoreLostData(offset, untilOffset, pollTimeoutMs) +} + } else { +// Case 1 or 7 +logWarning(s"Buffer miss for $groupId $topicPartition [$offset, $beginningOffset})") +return getAndIgnoreLostData(beginningOffset, untilOffset, pollTimeoutMs) + } +} else { + if (!fetchedData.hasNext()) { +// We cannot fetch anything after `polling`. Two possible cases: +// - `beginningOffset` is `offset` but there is nothing for `beginningOffset` right now. +// - Cannot fetch any date before timeout. +// Because there is no way to distinguish, just skip the rest offsets in the current +// partition. 
+logWarning(s"Buffer miss for $groupId $topicPartition [$offset, $untilOffset)") +reset() +return null + } + + val record = fetchedData.next() + if (record.offset >= untilOffset) { +// Case 2 +logWarning(s"Buffer miss for $groupId $topicPartition [$offset, $untilOffset)") +reset() +return null + } else { +if (record.offset != offset) { + // Case 1 + logWarning(s"Buffer miss for $groupId $topicPartition [$offset,
[GitHub] spark pull request #15820: [SPARK-18373][SS][Kafka]Make failOnDataLoss=false...
Github user koeninger commented on a diff in the pull request: https://github.com/apache/spark/pull/15820#discussion_r87127811 --- Diff: external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/CachedKafkaConsumer.scala --- @@ -83,6 +86,113 @@ private[kafka010] case class CachedKafkaConsumer private( record } + @tailrec + final def getAndIgnoreLostData( + offset: Long, + untilOffset: Long, + pollTimeoutMs: Long): ConsumerRecord[Array[Byte], Array[Byte]] = { +// scalastyle:off +// When `failOnDataLoss` is `false`, we need to handle the following cases (note: untilOffset and latestOffset are exclusive): +// 1. Some data are aged out, and `offset < beginningOffset <= untilOffset - 1 <= latestOffset - 1` +// Seek to the beginningOffset and fetch the data. +// 2. Some data are aged out, and `offset <= untilOffset - 1 < beginningOffset`. +// There is nothing to fetch, return null. +// 3. The topic is deleted. +// There is nothing to fetch, return null. +// 4. The topic is deleted and recreated, and `beginningOffset <= offset <= untilOffset - 1 <= latestOffset - 1`. +// We cannot detect this case. We can still fetch data like nothing happens. +// 5. The topic is deleted and recreated, and `beginningOffset <= offset < latestOffset - 1 < untilOffset - 1`. +// Same as 4. +// 6. The topic is deleted and recreated, and `beginningOffset <= latestOffset - 1 < offset <= untilOffset - 1`. +// There is nothing to fetch, return null. +// 7. The topic is deleted and recreated, and `offset < beginningOffset <= untilOffset - 1`. +// Same as 1. +// 8. The topic is deleted and recreated, and `offset <= untilOffset - 1 < beginningOffset`. +// There is nothing to fetch, return null. 
+// scalastyle:on +if (offset >= untilOffset) { + // Case 2 or 8 + // We seek to beginningOffset but beginningOffset >= untilOffset + reset() + return null +} + +logDebug(s"Get $groupId $topicPartition nextOffset $nextOffsetInFetchedData requested $offset") +var outOfOffset = false +if (offset != nextOffsetInFetchedData) { + logInfo(s"Initial fetch for $topicPartition $offset") + seek(offset) + try { +poll(pollTimeoutMs) + } catch { +case e: OffsetOutOfRangeException => + logWarning(s"Cannot fetch offset $offset, try to recover from the beginning offset", e) + outOfOffset = true + } +} else if (!fetchedData.hasNext()) { + // The last pre-fetched data has been drained. + seek(offset) + try { +poll(pollTimeoutMs) + } catch { +case e: OffsetOutOfRangeException => + logWarning(s"Cannot fetch offset $offset, try to recover from the beginning offset", e) + outOfOffset = true + } +} +if (outOfOffset) { + val beginningOffset = getBeginningOffset() + if (beginningOffset <= offset) { +val latestOffset = getLatestOffset() +if (latestOffset <= offset) { + // Case 3 or 6 + logWarning(s"Offset ${offset} is later than the latest offset $latestOffset. " + +s"Skipped [$offset, $untilOffset)") + reset() + return null +} else { + // Case 4 or 5 + getAndIgnoreLostData(offset, untilOffset, pollTimeoutMs) +} + } else { +// Case 1 or 7 +logWarning(s"Buffer miss for $groupId $topicPartition [$offset, $beginningOffset})") +return getAndIgnoreLostData(beginningOffset, untilOffset, pollTimeoutMs) + } +} else { + if (!fetchedData.hasNext()) { +// We cannot fetch anything after `polling`. Two possible cases: +// - `beginningOffset` is `offset` but there is nothing for `beginningOffset` right now. +// - Cannot fetch any date before timeout. +// Because there is no way to distinguish, just skip the rest offsets in the current +// partition. 
+logWarning(s"Buffer miss for $groupId $topicPartition [$offset, $untilOffset)") +reset() +return null + } + + val record = fetchedData.next() + if (record.offset >= untilOffset) { +// Case 2 +logWarning(s"Buffer miss for $groupId $topicPartition [$offset, $untilOffset)") +reset() +return null + } else { +if (record.offset != offset) { + // Case 1 + logWarning(s"Buffer miss for $groupId $topicPartition [$offset,
[GitHub] spark pull request #15820: [SPARK-18373][SS][Kafka]Make failOnDataLoss=false...
Github user koeninger commented on a diff in the pull request: https://github.com/apache/spark/pull/15820#discussion_r87130059 --- Diff: external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/CachedKafkaConsumer.scala --- @@ -83,6 +86,113 @@ private[kafka010] case class CachedKafkaConsumer private( record } + @tailrec + final def getAndIgnoreLostData( + offset: Long, + untilOffset: Long, + pollTimeoutMs: Long): ConsumerRecord[Array[Byte], Array[Byte]] = { +// scalastyle:off +// When `failOnDataLoss` is `false`, we need to handle the following cases (note: untilOffset and latestOffset are exclusive): +// 1. Some data are aged out, and `offset < beginningOffset <= untilOffset - 1 <= latestOffset - 1` +// Seek to the beginningOffset and fetch the data. +// 2. Some data are aged out, and `offset <= untilOffset - 1 < beginningOffset`. +// There is nothing to fetch, return null. +// 3. The topic is deleted. +// There is nothing to fetch, return null. +// 4. The topic is deleted and recreated, and `beginningOffset <= offset <= untilOffset - 1 <= latestOffset - 1`. +// We cannot detect this case. We can still fetch data like nothing happens. +// 5. The topic is deleted and recreated, and `beginningOffset <= offset < latestOffset - 1 < untilOffset - 1`. +// Same as 4. +// 6. The topic is deleted and recreated, and `beginningOffset <= latestOffset - 1 < offset <= untilOffset - 1`. +// There is nothing to fetch, return null. +// 7. The topic is deleted and recreated, and `offset < beginningOffset <= untilOffset - 1`. +// Same as 1. +// 8. The topic is deleted and recreated, and `offset <= untilOffset - 1 < beginningOffset`. +// There is nothing to fetch, return null. 
+// scalastyle:on +if (offset >= untilOffset) { + // Case 2 or 8 + // We seek to beginningOffset but beginningOffset >= untilOffset + reset() + return null +} + +logDebug(s"Get $groupId $topicPartition nextOffset $nextOffsetInFetchedData requested $offset") +var outOfOffset = false +if (offset != nextOffsetInFetchedData) { + logInfo(s"Initial fetch for $topicPartition $offset") + seek(offset) + try { +poll(pollTimeoutMs) + } catch { +case e: OffsetOutOfRangeException => + logWarning(s"Cannot fetch offset $offset, try to recover from the beginning offset", e) + outOfOffset = true + } +} else if (!fetchedData.hasNext()) { + // The last pre-fetched data has been drained. + seek(offset) + try { +poll(pollTimeoutMs) + } catch { +case e: OffsetOutOfRangeException => + logWarning(s"Cannot fetch offset $offset, try to recover from the beginning offset", e) + outOfOffset = true + } +} +if (outOfOffset) { + val beginningOffset = getBeginningOffset() + if (beginningOffset <= offset) { +val latestOffset = getLatestOffset() +if (latestOffset <= offset) { + // Case 3 or 6 + logWarning(s"Offset ${offset} is later than the latest offset $latestOffset. " + +s"Skipped [$offset, $untilOffset)") + reset() + return null +} else { + // Case 4 or 5 + getAndIgnoreLostData(offset, untilOffset, pollTimeoutMs) --- End diff -- - Why isn't this an early return? - Unless I'm misreading, this is a recursive call without changing the arguments. Why is it guaranteed to terminate?
[GitHub] spark pull request #15820: [SPARK-18373][SS][Kafka]Make failOnDataLoss=false...
Github user koeninger commented on a diff in the pull request: https://github.com/apache/spark/pull/15820#discussion_r87128373 --- Diff: external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/CachedKafkaConsumer.scala --- @@ -83,6 +86,113 @@ private[kafka010] case class CachedKafkaConsumer private( record } + @tailrec + final def getAndIgnoreLostData( + offset: Long, + untilOffset: Long, + pollTimeoutMs: Long): ConsumerRecord[Array[Byte], Array[Byte]] = { +// scalastyle:off +// When `failOnDataLoss` is `false`, we need to handle the following cases (note: untilOffset and latestOffset are exclusive): +// 1. Some data are aged out, and `offset < beginningOffset <= untilOffset - 1 <= latestOffset - 1` +// Seek to the beginningOffset and fetch the data. +// 2. Some data are aged out, and `offset <= untilOffset - 1 < beginningOffset`. +// There is nothing to fetch, return null. +// 3. The topic is deleted. +// There is nothing to fetch, return null. +// 4. The topic is deleted and recreated, and `beginningOffset <= offset <= untilOffset - 1 <= latestOffset - 1`. +// We cannot detect this case. We can still fetch data like nothing happens. +// 5. The topic is deleted and recreated, and `beginningOffset <= offset < latestOffset - 1 < untilOffset - 1`. +// Same as 4. +// 6. The topic is deleted and recreated, and `beginningOffset <= latestOffset - 1 < offset <= untilOffset - 1`. +// There is nothing to fetch, return null. +// 7. The topic is deleted and recreated, and `offset < beginningOffset <= untilOffset - 1`. +// Same as 1. +// 8. The topic is deleted and recreated, and `offset <= untilOffset - 1 < beginningOffset`. +// There is nothing to fetch, return null. 
+// scalastyle:on +if (offset >= untilOffset) { + // Case 2 or 8 + // We seek to beginningOffset but beginningOffset >= untilOffset + reset() + return null +} + +logDebug(s"Get $groupId $topicPartition nextOffset $nextOffsetInFetchedData requested $offset") +var outOfOffset = false +if (offset != nextOffsetInFetchedData) { + logInfo(s"Initial fetch for $topicPartition $offset") + seek(offset) + try { +poll(pollTimeoutMs) + } catch { +case e: OffsetOutOfRangeException => + logWarning(s"Cannot fetch offset $offset, try to recover from the beginning offset", e) + outOfOffset = true + } +} else if (!fetchedData.hasNext()) { + // The last pre-fetched data has been drained. + seek(offset) + try { +poll(pollTimeoutMs) + } catch { +case e: OffsetOutOfRangeException => + logWarning(s"Cannot fetch offset $offset, try to recover from the beginning offset", e) + outOfOffset = true + } +} +if (outOfOffset) { + val beginningOffset = getBeginningOffset() + if (beginningOffset <= offset) { +val latestOffset = getLatestOffset() +if (latestOffset <= offset) { + // Case 3 or 6 + logWarning(s"Offset ${offset} is later than the latest offset $latestOffset. " + +s"Skipped [$offset, $untilOffset)") + reset() + return null +} else { + // Case 4 or 5 + getAndIgnoreLostData(offset, untilOffset, pollTimeoutMs) +} + } else { +// Case 1 or 7 +logWarning(s"Buffer miss for $groupId $topicPartition [$offset, $beginningOffset})") +return getAndIgnoreLostData(beginningOffset, untilOffset, pollTimeoutMs) + } +} else { + if (!fetchedData.hasNext()) { +// We cannot fetch anything after `polling`. Two possible cases: +// - `beginningOffset` is `offset` but there is nothing for `beginningOffset` right now. +// - Cannot fetch any date before timeout. +// Because there is no way to distinguish, just skip the rest offsets in the current +// partition. 
+logWarning(s"Buffer miss for $groupId $topicPartition [$offset, $untilOffset)") +reset() +return null + } + + val record = fetchedData.next() + if (record.offset >= untilOffset) { +// Case 2 +logWarning(s"Buffer miss for $groupId $topicPartition [$offset, $untilOffset)") +reset() +return null + } else { +if (record.offset != offset) { + // Case 1 + logWarning(s"Buffer miss for $groupId $topicPartition [$offset,
[GitHub] spark pull request #15820: [SPARK-18373][SS][Kafka]Make failOnDataLoss=false...
Github user koeninger commented on a diff in the pull request:

https://github.com/apache/spark/pull/15820#discussion_r87129204

--- Diff: external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/CachedKafkaConsumer.scala ---
@@ -83,6 +86,113 @@ private[kafka010] case class CachedKafkaConsumer private(
       record
     }
 
+  @tailrec
+  final def getAndIgnoreLostData(
+      offset: Long,
+      untilOffset: Long,
+      pollTimeoutMs: Long): ConsumerRecord[Array[Byte], Array[Byte]] = {
+    // scalastyle:off
+    // When `failOnDataLoss` is `false`, we need to handle the following cases (note: untilOffset and latestOffset are exclusive):
+    // 1. Some data are aged out, and `offset < beginningOffset <= untilOffset - 1 <= latestOffset - 1`
+    //    Seek to the beginningOffset and fetch the data.
+    // 2. Some data are aged out, and `offset <= untilOffset - 1 < beginningOffset`.
+    //    There is nothing to fetch, return null.
+    // 3. The topic is deleted.
+    //    There is nothing to fetch, return null.
+    // 4. The topic is deleted and recreated, and `beginningOffset <= offset <= untilOffset - 1 <= latestOffset - 1`.
+    //    We cannot detect this case. We can still fetch data like nothing happens.
+    // 5. The topic is deleted and recreated, and `beginningOffset <= offset < latestOffset - 1 < untilOffset - 1`.
+    //    Same as 4.
+    // 6. The topic is deleted and recreated, and `beginningOffset <= latestOffset - 1 < offset <= untilOffset - 1`.
+    //    There is nothing to fetch, return null.
+    // 7. The topic is deleted and recreated, and `offset < beginningOffset <= untilOffset - 1`.
+    //    Same as 1.
+    // 8. The topic is deleted and recreated, and `offset <= untilOffset - 1 < beginningOffset`.
+    //    There is nothing to fetch, return null.
+    // scalastyle:on
+    if (offset >= untilOffset) {
+      // Case 2 or 8
+      // We seek to beginningOffset but beginningOffset >= untilOffset
+      reset()
+      return null
+    }
+
+    logDebug(s"Get $groupId $topicPartition nextOffset $nextOffsetInFetchedData requested $offset")
+    var outOfOffset = false
--- End diff --

Can't this var be eliminated with a single try around the following if/else?
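The case table in the diff and koeninger's question about the `outOfOffset` flag can both be illustrated with a small, self-contained Scala sketch. It does not use Kafka. `FakePartition`, `OutOfRange`, and `LostDataSketch` are hypothetical names. A partition is modelled as the retained offset range `[beginning, latest)`, a record is represented only by its offset, and `None` plays the role of the `null` returned in the diff. The sketch shows one way a single `try`, used as an expression, can replace a mutable flag. It is not the code that was merged.

```scala
import scala.annotation.tailrec

// Hypothetical stand-in for Kafka's OffsetOutOfRangeException (not the real class).
final class OutOfRange(msg: String) extends RuntimeException(msg)

// Hypothetical in-memory partition that still retains offsets [beginning, latest).
final case class FakePartition(beginning: Long, latest: Long) {
  /** Returns the record (modelled as its offset), None past the end, or throws if aged out. */
  def fetch(offset: Long): Option[Long] =
    if (offset < beginning) throw new OutOfRange(s"offset $offset < beginning $beginning")
    else if (offset >= latest) None
    else Some(offset)
}

object LostDataSketch {
  /** Returns the first retained offset in [offset, untilOffset), or None (cf. `null`). */
  @tailrec
  def getAndIgnoreLostData(p: FakePartition, offset: Long, untilOffset: Long): Option[Long] =
    if (offset >= untilOffset) None // nothing left in range, e.g. cases 2 and 8 after the retry
    else {
      // A single try used as an expression replaces a mutable `outOfOffset` flag.
      val attempt: Either[Long, Option[Long]] =
        try Right(p.fetch(offset))
        catch { case _: OutOfRange => Left(p.beginning) }
      attempt match {
        // A hit (e.g. cases 4 and 5), or None when offset is past the end (case 6).
        case Right(result) => result
        // Data aged out (cases 1 and 7): retry from the earliest retained offset.
        case Left(beginning) => getAndIgnoreLostData(p, beginning, untilOffset)
      }
    }

  def main(args: Array[String]): Unit = {
    val p = FakePartition(beginning = 10, latest = 20)
    println(getAndIgnoreLostData(p, 5, 15))  // Some(10): offsets 5 to 9 were aged out
    println(getAndIgnoreLostData(p, 2, 8))   // None: everything in [2, 8) was aged out
    println(getAndIgnoreLostData(p, 12, 15)) // Some(12): no data loss
    println(getAndIgnoreLostData(p, 25, 30)) // None: offset is past the latest offset
  }
}
```

The recursive call sits in the `match` after the `try` rather than inside it. A call inside a `try` body is not in tail position, so this placement lets `@tailrec` still compile.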
[GitHub] spark pull request #15820: [SPARK-18373][SS][Kafka]Make failOnDataLoss=false...
GitHub user zsxwing opened a pull request:

https://github.com/apache/spark/pull/15820

[SPARK-18373][SS][Kafka]Make failOnDataLoss=false work with Spark jobs

## What changes were proposed in this pull request?

This PR adds `CachedKafkaConsumer.getAndIgnoreLostData` to handle corner cases of `failOnDataLoss=false`. I listed all the cases we should support in the code comment.

## How was this patch tested?

Because I cannot find any way to manually control the Kafka server to clean up logs, it's impossible to write unit tests for each corner case. Therefore, I just created `test("stress test for failOnDataLoss=false")`, which should cover most of the corner cases.

I also modified some existing tests to test both `failOnDataLoss=false` and `failOnDataLoss=true` to make sure the change doesn't break existing logic.

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/zsxwing/spark failOnDataLoss

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/spark/pull/15820.patch

To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message:

This closes #15820

commit e8eff9ff3b32320ab8d969089cded97e9ec29a52
Author: Shixiong Zhu
Date: 2016-10-28T17:46:54Z

Make failOnDataLoss=false stable for Spark jobs