[GitHub] spark pull request #15820: [SPARK-18373][SS][Kafka]Make failOnDataLoss=false...

2016-11-21 Thread tdas
Github user tdas commented on a diff in the pull request:

https://github.com/apache/spark/pull/15820#discussion_r89018842
  
--- Diff: external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/CachedKafkaConsumer.scala ---
@@ -34,53 +38,211 @@ import org.apache.spark.internal.Logging
 private[kafka010] case class CachedKafkaConsumer private(
     topicPartition: TopicPartition,
     kafkaParams: ju.Map[String, Object]) extends Logging {
+  import CachedKafkaConsumer._
 
   private val groupId = kafkaParams.get(ConsumerConfig.GROUP_ID_CONFIG).asInstanceOf[String]
 
-  private val consumer = {
+  private var consumer = createConsumer
+
+  /** Iterator to the already fetched data */
+  private var fetchedData =
+    ju.Collections.emptyIterator[ConsumerRecord[Array[Byte], Array[Byte]]]
+  private var nextOffsetInFetchedData = UNKNOWN_OFFSET
+
+  /** Create a KafkaConsumer to fetch records for `topicPartition` */
+  private def createConsumer: KafkaConsumer[Array[Byte], Array[Byte]] = {
     val c = new KafkaConsumer[Array[Byte], Array[Byte]](kafkaParams)
     val tps = new ju.ArrayList[TopicPartition]()
     tps.add(topicPartition)
     c.assign(tps)
     c
   }
 
-  /** Iterator to the already fetched data */
-  private var fetchedData =
-    ju.Collections.emptyIterator[ConsumerRecord[Array[Byte], Array[Byte]]]
-  private var nextOffsetInFetchedData = -2L
-
   /**
-   * Get the record for the given offset, waiting up to timeout ms if IO is necessary.
-   * Sequential forward access will use buffers, but random access will be horribly inefficient.
+   * Get the record for the given offset if available. Otherwise it will either throw an error
+   * (if failOnDataLoss = true), or return the next available offset within [offset, untilOffset).
+   *
+   * @param offset the offset to fetch.
+   * @param untilOffset the max offset to fetch. Exclusive.
+   * @param pollTimeoutMs timeout in milliseconds to poll data from Kafka.
+   * @param failOnDataLoss When `failOnDataLoss` is `true`, this method will either return the
+   *                       record at offset if available, or throw an exception. When
+   *                       `failOnDataLoss` is `false`, this method will either return the record
+   *                       at offset if available, or return the next earliest available record
+   *                       less than untilOffset, or null. It will not throw any exception.
    */
-  def get(offset: Long, pollTimeoutMs: Long): ConsumerRecord[Array[Byte], Array[Byte]] = {
+  def get(
--- End diff --

Can you also document that it can return null. 
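A minimal sketch of the requested note, as it might be added to the scaladoc above (illustrative wording, not the final text):

    /**
     * @return the record at `offset` if available, or the next earliest available record in
     *         [offset, untilOffset), or `null` when `failOnDataLoss` is `false` and no record
     *         in [offset, untilOffset) can be fetched.
     */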





[GitHub] spark pull request #15820: [SPARK-18373][SS][Kafka]Make failOnDataLoss=false...

2016-11-21 Thread tdas
Github user tdas commented on a diff in the pull request:

https://github.com/apache/spark/pull/15820#discussion_r89020354
  
--- Diff: external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaSourceSuite.scala ---
@@ -413,13 +451,59 @@ class KafkaSourceSuite extends KafkaSourceTest {
 )
   }
 
+  test("Delete a topic when a Spark job is running") {
--- End diff --

nit: D -> d, for consistency.





[GitHub] spark pull request #15820: [SPARK-18373][SS][Kafka]Make failOnDataLoss=false...

2016-11-21 Thread tdas
Github user tdas commented on a diff in the pull request:

https://github.com/apache/spark/pull/15820#discussion_r89019523
  
--- Diff: external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/CachedKafkaConsumer.scala ---
@@ -34,53 +38,211 @@ import org.apache.spark.internal.Logging
 private[kafka010] case class CachedKafkaConsumer private(
     topicPartition: TopicPartition,
     kafkaParams: ju.Map[String, Object]) extends Logging {
+  import CachedKafkaConsumer._
 
   private val groupId = kafkaParams.get(ConsumerConfig.GROUP_ID_CONFIG).asInstanceOf[String]
 
-  private val consumer = {
+  private var consumer = createConsumer
+
+  /** Iterator to the already fetched data */
+  private var fetchedData =
+    ju.Collections.emptyIterator[ConsumerRecord[Array[Byte], Array[Byte]]]
+  private var nextOffsetInFetchedData = UNKNOWN_OFFSET
+
+  /** Create a KafkaConsumer to fetch records for `topicPartition` */
+  private def createConsumer: KafkaConsumer[Array[Byte], Array[Byte]] = {
     val c = new KafkaConsumer[Array[Byte], Array[Byte]](kafkaParams)
     val tps = new ju.ArrayList[TopicPartition]()
     tps.add(topicPartition)
     c.assign(tps)
     c
   }
 
-  /** Iterator to the already fetched data */
-  private var fetchedData =
-    ju.Collections.emptyIterator[ConsumerRecord[Array[Byte], Array[Byte]]]
-  private var nextOffsetInFetchedData = -2L
-
   /**
-   * Get the record for the given offset, waiting up to timeout ms if IO is necessary.
-   * Sequential forward access will use buffers, but random access will be horribly inefficient.
+   * Get the record for the given offset if available. Otherwise it will either throw an error
+   * (if failOnDataLoss = true), or return the next available offset within [offset, untilOffset).
+   *
+   * @param offset the offset to fetch.
+   * @param untilOffset the max offset to fetch. Exclusive.
+   * @param pollTimeoutMs timeout in milliseconds to poll data from Kafka.
+   * @param failOnDataLoss When `failOnDataLoss` is `true`, this method will either return the
+   *                       record at offset if available, or throw an exception. When
+   *                       `failOnDataLoss` is `false`, this method will either return the record
+   *                       at offset if available, or return the next earliest available record
+   *                       less than untilOffset, or null. It will not throw any exception.
    */
-  def get(offset: Long, pollTimeoutMs: Long): ConsumerRecord[Array[Byte], Array[Byte]] = {
+  def get(
+      offset: Long,
+      untilOffset: Long,
+      pollTimeoutMs: Long,
+      failOnDataLoss: Boolean): ConsumerRecord[Array[Byte], Array[Byte]] = {
+    require(offset < untilOffset,
+      s"offset must always be less than untilOffset [offset: $offset, untilOffset: $untilOffset]")
     logDebug(s"Get $groupId $topicPartition nextOffset $nextOffsetInFetchedData requested $offset")
-    if (offset != nextOffsetInFetchedData) {
-      logInfo(s"Initial fetch for $topicPartition $offset")
-      seek(offset)
-      poll(pollTimeoutMs)
+    // The following loop is basically for `failOnDataLoss = false`. When `failOnDataLoss` is
+    // `false`, firstly, we will try to fetch the record at `offset`. If no such record, then we
+    // will move to the next available offset within `[offset, untilOffset)` and retry.
+    // If `failOnDataLoss` is `true`, the loop body will be executed only once.
+    var toFetchOffset = offset
+    while (toFetchOffset != UNKNOWN_OFFSET) {
+      try {
+        return fetchData(toFetchOffset, pollTimeoutMs)
+      } catch {
+        case e: OffsetOutOfRangeException =>
+          // When there is some error thrown, it's better to use a new consumer to drop all cached
+          // states in the old consumer. We don't need to worry about the performance because this
+          // is not a normal path.
--- End diff --

normal --> common 





[GitHub] spark pull request #15820: [SPARK-18373][SS][Kafka]Make failOnDataLoss=false...

2016-11-21 Thread tdas
Github user tdas commented on a diff in the pull request:

https://github.com/apache/spark/pull/15820#discussion_r89020163
  
--- Diff: external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaSource.scala ---
@@ -458,6 +457,21 @@ private[kafka010] case class KafkaSource(
 /** Companion object for the [[KafkaSource]]. */
 private[kafka010] object KafkaSource {
 
+  val INSTRUCTION_FOR_FAIL_ON_DATA_LOSS_FALSE =
+    """
+      |There may have been some data loss because some data may have been aged out in Kafka or
+      | the topic has been deleted and is therefore unavailable for processing. If you want your
--- End diff --

nit: better grammar

Some data may have been lost because they are not available in Kafka any more; either the data was aged out by Kafka or the topic may have been deleted before all the data in the topic was processed.

Similarly change below.
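Applied to the constant above, the suggestion might read as follows (the last sentence is a guess at the truncated remainder of the quoted message):

    val INSTRUCTION_FOR_FAIL_ON_DATA_LOSS_FALSE =
      """
        |Some data may have been lost because they are not available in Kafka any more; either the
        | data was aged out by Kafka or the topic may have been deleted before all the data in the
        | topic was processed. If you want your streaming query to fail on such cases, set the
        | source option "failOnDataLoss" to "true".
      """.stripMargin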





[GitHub] spark pull request #15820: [SPARK-18373][SS][Kafka]Make failOnDataLoss=false...

2016-11-21 Thread tdas
Github user tdas commented on a diff in the pull request:

https://github.com/apache/spark/pull/15820#discussion_r89013168
  
--- Diff: external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/CachedKafkaConsumer.scala ---
@@ -34,53 +38,211 @@ import org.apache.spark.internal.Logging
 private[kafka010] case class CachedKafkaConsumer private(
     topicPartition: TopicPartition,
     kafkaParams: ju.Map[String, Object]) extends Logging {
+  import CachedKafkaConsumer._
 
   private val groupId = kafkaParams.get(ConsumerConfig.GROUP_ID_CONFIG).asInstanceOf[String]
 
-  private val consumer = {
+  private var consumer = createConsumer
+
+  /** Iterator to the already fetched data */
+  private var fetchedData =
+    ju.Collections.emptyIterator[ConsumerRecord[Array[Byte], Array[Byte]]]
+  private var nextOffsetInFetchedData = UNKNOWN_OFFSET
+
+  /** Create a KafkaConsumer to fetch records for `topicPartition` */
+  private def createConsumer: KafkaConsumer[Array[Byte], Array[Byte]] = {
     val c = new KafkaConsumer[Array[Byte], Array[Byte]](kafkaParams)
     val tps = new ju.ArrayList[TopicPartition]()
     tps.add(topicPartition)
     c.assign(tps)
     c
   }
 
-  /** Iterator to the already fetched data */
-  private var fetchedData =
-    ju.Collections.emptyIterator[ConsumerRecord[Array[Byte], Array[Byte]]]
-  private var nextOffsetInFetchedData = -2L
-
   /**
-   * Get the record for the given offset, waiting up to timeout ms if IO is necessary.
-   * Sequential forward access will use buffers, but random access will be horribly inefficient.
+   * Get the record for the given offset if available. Otherwise it will either throw an error
+   * (if failOnDataLoss = true), or return the next available offset within [offset, untilOffset).
+   *
+   * @param offset the offset to fetch.
+   * @param untilOffset the max offset to fetch. Exclusive.
+   * @param pollTimeoutMs timeout in milliseconds to poll data from Kafka.
+   * @param failOnDataLoss When `failOnDataLoss` is `true`, this method will either return the
+   *                       record at offset if available, or throw an exception. When
+   *                       `failOnDataLoss` is `false`, this method will either return the record
+   *                       at offset if available, or return the next earliest available record
+   *                       less than untilOffset, or null. It will not throw any exception.
    */
-  def get(offset: Long, pollTimeoutMs: Long): ConsumerRecord[Array[Byte], Array[Byte]] = {
+  def get(
+      offset: Long,
+      untilOffset: Long,
+      pollTimeoutMs: Long,
+      failOnDataLoss: Boolean): ConsumerRecord[Array[Byte], Array[Byte]] = {
+    require(offset < untilOffset,
+      s"offset must always be less than untilOffset [offset: $offset, untilOffset: $untilOffset]")
     logDebug(s"Get $groupId $topicPartition nextOffset $nextOffsetInFetchedData requested $offset")
-    if (offset != nextOffsetInFetchedData) {
-      logInfo(s"Initial fetch for $topicPartition $offset")
-      seek(offset)
-      poll(pollTimeoutMs)
+    // The following loop is basically for `failOnDataLoss = false`. When `failOnDataLoss` is
+    // `false`, firstly, we will try to fetch the record at `offset`. If no such record, then we
--- End diff --

nit: "firstly" --> "first"; "If no such record" --> "If no such record exists"

overall +1, thanks for this explanation.
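With the nit applied, the quoted comment would read:

    // The following loop is basically for `failOnDataLoss = false`. When `failOnDataLoss` is
    // `false`, first, we will try to fetch the record at `offset`. If no such record exists,
    // then we will move to the next available offset within `[offset, untilOffset)` and retry.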






[GitHub] spark pull request #15820: [SPARK-18373][SS][Kafka]Make failOnDataLoss=false...

2016-11-21 Thread zsxwing
Github user zsxwing commented on a diff in the pull request:

https://github.com/apache/spark/pull/15820#discussion_r88963845
  
--- Diff: external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/CachedKafkaConsumer.scala ---
@@ -34,53 +38,211 @@ import org.apache.spark.internal.Logging
 private[kafka010] case class CachedKafkaConsumer private(
     topicPartition: TopicPartition,
     kafkaParams: ju.Map[String, Object]) extends Logging {
+  import CachedKafkaConsumer._
 
   private val groupId = kafkaParams.get(ConsumerConfig.GROUP_ID_CONFIG).asInstanceOf[String]
 
-  private val consumer = {
+  private var consumer = createConsumer
+
+  /** Iterator to the already fetched data */
+  private var fetchedData =
+    ju.Collections.emptyIterator[ConsumerRecord[Array[Byte], Array[Byte]]]
+  private var nextOffsetInFetchedData = UNKNOWN_OFFSET
+
+  /** Create a KafkaConsumer to fetch records for `topicPartition` */
+  private def createConsumer: KafkaConsumer[Array[Byte], Array[Byte]] = {
     val c = new KafkaConsumer[Array[Byte], Array[Byte]](kafkaParams)
     val tps = new ju.ArrayList[TopicPartition]()
     tps.add(topicPartition)
     c.assign(tps)
     c
   }
 
-  /** Iterator to the already fetched data */
-  private var fetchedData =
-    ju.Collections.emptyIterator[ConsumerRecord[Array[Byte], Array[Byte]]]
-  private var nextOffsetInFetchedData = -2L
-
   /**
-   * Get the record for the given offset, waiting up to timeout ms if IO is necessary.
-   * Sequential forward access will use buffers, but random access will be horribly inefficient.
+   * Get the record for the given offset if available. Otherwise it will either throw an error
+   * (if failOnDataLoss = true), or return the next available offset within [offset, untilOffset).
+   *
+   * @param offset the offset to fetch.
+   * @param untilOffset the max offset to fetch. Exclusive.
+   * @param pollTimeoutMs timeout in milliseconds to poll data from Kafka.
+   * @param failOnDataLoss When `failOnDataLoss` is `true`, this method will either return the
+   *                       record at offset if available, or throw an exception. When
+   *                       `failOnDataLoss` is `false`, this method will either return the record
+   *                       at offset if available, or return the next earliest available record
+   *                       less than untilOffset, or null. It will not throw any exception.
    */
-  def get(offset: Long, pollTimeoutMs: Long): ConsumerRecord[Array[Byte], Array[Byte]] = {
+  def get(
+      offset: Long,
+      untilOffset: Long,
+      pollTimeoutMs: Long,
+      failOnDataLoss: Boolean): ConsumerRecord[Array[Byte], Array[Byte]] = {
+    require(offset < untilOffset,
+      s"offset must always be less than untilOffset [offset: $offset, untilOffset: $untilOffset]")
     logDebug(s"Get $groupId $topicPartition nextOffset $nextOffsetInFetchedData requested $offset")
-    if (offset != nextOffsetInFetchedData) {
-      logInfo(s"Initial fetch for $topicPartition $offset")
-      seek(offset)
-      poll(pollTimeoutMs)
+    // The following loop is basically for `failOnDataLoss = false`. When `failOnDataLoss` is
+    // `false`, firstly, we will try to fetch the record at `offset`. If no such record, then we
+    // will move to the next available offset within `[offset, untilOffset)` and retry.
+    // If `failOnDataLoss` is `true`, the loop body will be executed only once.
+    var toFetchOffset = offset
+    while (toFetchOffset != UNKNOWN_OFFSET) {
+      try {
+        return fetchData(toFetchOffset, pollTimeoutMs)
+      } catch {
+        case e: OffsetOutOfRangeException =>
+          // When there is some error thrown, it's better to use a new consumer to drop all cached
+          // states in the old consumer. We don't need to worry about the performance because this
+          // is not a normal path.
+          resetConsumer()
+          reportDataLoss(failOnDataLoss, s"Cannot fetch offset $toFetchOffset", e)
+          toFetchOffset = getEarliestAvailableOffsetBetween(toFetchOffset, untilOffset)
+      }
     }
+    resetFetchedData()
+    null
+  }
 
-    if (!fetchedData.hasNext()) { poll(pollTimeoutMs) }
-    assert(fetchedData.hasNext(),
-      s"Failed to get records for $groupId $topicPartition $offset " +
-        s"after polling for $pollTimeoutMs")
-    var record = fetchedData.next()
+  /**
+   * Return the next earliest available offset in [offset, untilOffset). If all offsets in
+   * [offset, untilOffset) are invalid (e.g., the topic is deleted and recreated), it will return
+   * `UNKNOWN_OFFSET`.
+   */

[GitHub] spark pull request #15820: [SPARK-18373][SS][Kafka]Make failOnDataLoss=false...

2016-11-18 Thread tdas
Github user tdas commented on a diff in the pull request:

https://github.com/apache/spark/pull/15820#discussion_r88765764
  
--- Diff: external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/CachedKafkaConsumer.scala ---
@@ -47,40 +51,190 @@ private[kafka010] case class CachedKafkaConsumer private(
 
   /** Iterator to the already fetched data */
   private var fetchedData =
     ju.Collections.emptyIterator[ConsumerRecord[Array[Byte], Array[Byte]]]
-  private var nextOffsetInFetchedData = -2L
+  private var nextOffsetInFetchedData = UNKNOWN_OFFSET
 
   /**
-   * Get the record for the given offset, waiting up to timeout ms if IO is necessary.
-   * Sequential forward access will use buffers, but random access will be horribly inefficient.
+   * Get the record for the given offset if available. Otherwise it will either throw an error
+   * (if failOnDataLoss = true), or return the next available offset within [offset, untilOffset).
+   *
+   * @param offset the offset to fetch.
+   * @param untilOffset the max offset to fetch. Exclusive.
+   * @param pollTimeoutMs timeout in milliseconds to poll data from Kafka.
+   * @param failOnDataLoss When `failOnDataLoss` is `true`, this method will either return the
+   *                       record at offset if available, or throw an exception. When
+   *                       `failOnDataLoss` is `false`, this method will either return the record
+   *                       at offset if available, or return the next earliest available record
+   *                       less than untilOffset, or null. It will not throw any exception.
    */
-  def get(offset: Long, pollTimeoutMs: Long): ConsumerRecord[Array[Byte], Array[Byte]] = {
+  def get(
+      offset: Long,
+      untilOffset: Long,
+      pollTimeoutMs: Long,
+      failOnDataLoss: Boolean): ConsumerRecord[Array[Byte], Array[Byte]] = {
+    require(offset < untilOffset,
+      s"offset must always be less than untilOffset [offset: $offset, untilOffset: $untilOffset]")
     logDebug(s"Get $groupId $topicPartition nextOffset $nextOffsetInFetchedData requested $offset")
-    if (offset != nextOffsetInFetchedData) {
-      logInfo(s"Initial fetch for $topicPartition $offset")
-      seek(offset)
-      poll(pollTimeoutMs)
+    var toFetchOffset = offset
+    while (toFetchOffset != UNKNOWN_OFFSET) {
--- End diff --

add docs for this logic.
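A sketch of such docs; the revised diff quoted earlier in this digest adds a comment along these lines:

    // This loop is basically for `failOnDataLoss = false`: first try to fetch the record at
    // `toFetchOffset`; if it is gone, move to the next available offset within
    // [offset, untilOffset) and retry. When `failOnDataLoss` is `true`, the loop body is
    // executed only once and any data loss throws instead.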





[GitHub] spark pull request #15820: [SPARK-18373][SS][Kafka]Make failOnDataLoss=false...

2016-11-18 Thread tdas
Github user tdas commented on a diff in the pull request:

https://github.com/apache/spark/pull/15820#discussion_r88759061
  
--- Diff: external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaSourceSuite.scala ---
@@ -223,52 +228,184 @@ class KafkaSourceSuite extends KafkaSourceTest {
     )
   }
 
-  test("assign from latest offsets") {
-    val topic = newTopic()
-    testFromLatestOffsets(topic, false, "assign" -> assignString(topic, 0 to 4))
-  }
+  test("access offset 0 in Spark job but the topic has been deleted") {
+    KafkaSourceSuite.collectedData.clear()
 
-  test("assign from earliest offsets") {
     val topic = newTopic()
-    testFromEarliestOffsets(topic, false, "assign" -> assignString(topic, 0 to 4))
-  }
+    testUtils.createTopic(topic, partitions = 1)
+    testUtils.sendMessages(topic, (1 to 10).map(_.toString).toArray)
 
-  test("assign from specific offsets") {
-    val topic = newTopic()
-    testFromSpecificOffsets(topic, "assign" -> assignString(topic, 0 to 4))
-  }
+    val reader = spark
+      .readStream
+      .format("kafka")
+      .option("kafka.bootstrap.servers", testUtils.brokerAddress)
+      .option("kafka.metadata.max.age.ms", "1")
+      .option("subscribe", topic)
+      // If a topic is deleted and we try to poll data starting from offset 0,
+      // the Kafka consumer will just block until timeout and return an empty result.
+      // So set the timeout to 1 second to make this test fast.
+      .option("kafkaConsumer.pollTimeoutMs", "1000")
+      .option("startingOffsets", "earliest")
+      .option("failOnDataLoss", "false")
+    val kafka = reader.load()
+      .selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
+      .as[(String, String)]
+    KafkaSourceSuite.globalTestUtils = testUtils
+    val query = kafka.map(kv => kv._2.toInt).writeStream.foreach(new ForeachWriter[Int] {
--- End diff --

move this test lower, after the basic tests.





[GitHub] spark pull request #15820: [SPARK-18373][SS][Kafka]Make failOnDataLoss=false...

2016-11-18 Thread tdas
Github user tdas commented on a diff in the pull request:

https://github.com/apache/spark/pull/15820#discussion_r88759005
  
--- Diff: external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaSourceSuite.scala ---
@@ -223,52 +228,184 @@ class KafkaSourceSuite extends KafkaSourceTest {
     )
   }
 
-  test("assign from latest offsets") {
-    val topic = newTopic()
-    testFromLatestOffsets(topic, false, "assign" -> assignString(topic, 0 to 4))
-  }
+  test("access offset 0 in Spark job but the topic has been deleted") {
+    KafkaSourceSuite.collectedData.clear()
 
-  test("assign from earliest offsets") {
     val topic = newTopic()
-    testFromEarliestOffsets(topic, false, "assign" -> assignString(topic, 0 to 4))
-  }
+    testUtils.createTopic(topic, partitions = 1)
+    testUtils.sendMessages(topic, (1 to 10).map(_.toString).toArray)
 
-  test("assign from specific offsets") {
-    val topic = newTopic()
-    testFromSpecificOffsets(topic, "assign" -> assignString(topic, 0 to 4))
-  }
+    val reader = spark
+      .readStream
+      .format("kafka")
+      .option("kafka.bootstrap.servers", testUtils.brokerAddress)
+      .option("kafka.metadata.max.age.ms", "1")
+      .option("subscribe", topic)
+      // If a topic is deleted and we try to poll data starting from offset 0,
+      // the Kafka consumer will just block until timeout and return an empty result.
+      // So set the timeout to 1 second to make this test fast.
+      .option("kafkaConsumer.pollTimeoutMs", "1000")
+      .option("startingOffsets", "earliest")
+      .option("failOnDataLoss", "false")
+    val kafka = reader.load()
+      .selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
+      .as[(String, String)]
+    KafkaSourceSuite.globalTestUtils = testUtils
+    val query = kafka.map(kv => kv._2.toInt).writeStream.foreach(new ForeachWriter[Int] {
+      override def open(partitionId: Long, version: Long): Boolean = {
+        KafkaSourceSuite.globalTestUtils.deleteTopic(topic)
+        true
+      }
 
-  test("subscribing topic by name from latest offsets") {
-    val topic = newTopic()
-    testFromLatestOffsets(topic, true, "subscribe" -> topic)
-  }
+      override def process(value: Int): Unit = {
+        KafkaSourceSuite.collectedData.add(value)
+      }
 
-  test("subscribing topic by name from earliest offsets") {
-    val topic = newTopic()
-    testFromEarliestOffsets(topic, true, "subscribe" -> topic)
+      override def close(errorOrNull: Throwable): Unit = {}
+    }).start()
+    query.processAllAvailable()
+    assert(KafkaSourceSuite.collectedData.isEmpty)
+    query.stop()
+    // `failOnDataLoss` is `false`, we should not fail the query
+    assert(query.exception.isEmpty)
   }
 
-  test("subscribing topic by name from specific offsets") {
+  test("access non-zero offset in Spark job but the topic has been deleted") {
--- End diff --

remove this test. 





[GitHub] spark pull request #15820: [SPARK-18373][SS][Kafka]Make failOnDataLoss=false...

2016-11-18 Thread tdas
Github user tdas commented on a diff in the pull request:

https://github.com/apache/spark/pull/15820#discussion_r88758879
  
--- Diff: external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaSourceSuite.scala ---
@@ -223,52 +228,184 @@ class KafkaSourceSuite extends KafkaSourceTest {
     )
   }
 
-  test("assign from latest offsets") {
-    val topic = newTopic()
-    testFromLatestOffsets(topic, false, "assign" -> assignString(topic, 0 to 4))
-  }
+  test("access offset 0 in Spark job but the topic has been deleted") {
+    KafkaSourceSuite.collectedData.clear()
 
-  test("assign from earliest offsets") {
     val topic = newTopic()
-    testFromEarliestOffsets(topic, false, "assign" -> assignString(topic, 0 to 4))
-  }
+    testUtils.createTopic(topic, partitions = 1)
+    testUtils.sendMessages(topic, (1 to 10).map(_.toString).toArray)
 
-  test("assign from specific offsets") {
-    val topic = newTopic()
-    testFromSpecificOffsets(topic, "assign" -> assignString(topic, 0 to 4))
-  }
+    val reader = spark
+      .readStream
+      .format("kafka")
+      .option("kafka.bootstrap.servers", testUtils.brokerAddress)
+      .option("kafka.metadata.max.age.ms", "1")
+      .option("subscribe", topic)
+      // If a topic is deleted and we try to poll data starting from offset 0,
+      // the Kafka consumer will just block until timeout and return an empty result.
+      // So set the timeout to 1 second to make this test fast.
+      .option("kafkaConsumer.pollTimeoutMs", "1000")
+      .option("startingOffsets", "earliest")
+      .option("failOnDataLoss", "false")
+    val kafka = reader.load()
+      .selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
+      .as[(String, String)]
+    KafkaSourceSuite.globalTestUtils = testUtils
+    val query = kafka.map(kv => kv._2.toInt).writeStream.foreach(new ForeachWriter[Int] {
--- End diff --

add explanation.
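A sketch of the kind of explanation being requested (wording is illustrative, not the author's):

    // Delete the topic from inside ForeachWriter.open() so that the deletion happens while the
    // Spark job is running, after the batch's offset range has already been resolved. The
    // subsequent fetch then hits a deleted topic, which is the failOnDataLoss=false path under
    // test, and no value should ever reach process().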





[GitHub] spark pull request #15820: [SPARK-18373][SS][Kafka]Make failOnDataLoss=false...

2016-11-18 Thread tdas
Github user tdas commented on a diff in the pull request:

https://github.com/apache/spark/pull/15820#discussion_r88758556
  
--- Diff: external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaSourceSuite.scala ---
@@ -626,3 +776,106 @@ class KafkaSourceStressSuite extends KafkaSourceTest {
       iterations = 50)
   }
 }
+
+class KafkaSourceStressForDontFailOnDataLossSuite extends StreamTest with SharedSQLContext {
+
+  import testImplicits._
+
+  private var testUtils: KafkaTestUtils = _
+
+  private val topicId = new AtomicInteger(0)
+
+  private def newTopic(): String = s"failOnDataLoss-${topicId.getAndIncrement()}"
+
+  override def beforeAll(): Unit = {
+super.beforeAll()
+testUtils = new KafkaTestUtils {
+  override def brokerConfiguration: Properties = {
+val props = super.brokerConfiguration
+        // Try to make Kafka clean up messages as fast as possible. However, there is a
+        // hard-coded 30 seconds delay (kafka.log.LogManager.InitialTaskDelayMs), so this test
+        // should run at least 30 seconds.
+props.put("log.cleaner.backoff.ms", "100")
+props.put("log.segment.bytes", "40")
+props.put("log.retention.bytes", "40")
+props.put("log.retention.check.interval.ms", "100")
+props.put("delete.retention.ms", "10")
+props.put("log.flush.scheduler.interval.ms", "10")
+props
+  }
+}
+testUtils.setup()
+  }
+
+  override def afterAll(): Unit = {
+if (testUtils != null) {
+  testUtils.teardown()
+  testUtils = null
+  super.afterAll()
+}
+  }
+
+  test("stress test for failOnDataLoss=false") {
+val reader = spark
+  .readStream
+  .format("kafka")
+  .option("kafka.bootstrap.servers", testUtils.brokerAddress)
+  .option("kafka.metadata.max.age.ms", "1")
+  .option("subscribePattern", "failOnDataLoss.*")
+  .option("startingOffsets", "earliest")
+  .option("failOnDataLoss", "false")
+val kafka = reader.load()
+  .selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
+  .as[(String, String)]
+    val query = kafka.map(kv => kv._2.toInt).writeStream.foreach(new ForeachWriter[Int] {
+
+  override def open(partitionId: Long, version: Long): Boolean = {
+true
+  }
+
+  override def process(value: Int): Unit = {
+// Slow down the processing speed so that messages may be aged out.
+Thread.sleep(Random.nextInt(100))
+  }
+
+  override def close(errorOrNull: Throwable): Unit = {
+  }
+}).start()
+
+val testTime = 1.minutes
+val startTime = System.currentTimeMillis()
+val topics = mutable.ArrayBuffer[String]()
+while (System.currentTimeMillis() - testTime.toMillis < startTime) {
+  Random.nextInt(6) match {
+case 0 =>
+  val topic = newTopic()
+  topics += topic
+  testUtils.createTopic(topic, partitions = 1)
+case 1 =>
--- End diff --

Update test to recreate same topics.
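One way the random action could recreate a previously created topic, reusing the `testUtils` helpers quoted above (a sketch; the merged test may differ):

    case 1 =>
      if (topics.nonEmpty) {
        val topic = topics(Random.nextInt(topics.size))
        testUtils.deleteTopic(topic)
        testUtils.createTopic(topic, partitions = 1)
      }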





[GitHub] spark pull request #15820: [SPARK-18373][SS][Kafka]Make failOnDataLoss=false...

2016-11-18 Thread tdas
Github user tdas commented on a diff in the pull request:

https://github.com/apache/spark/pull/15820#discussion_r88758444
  
--- Diff: external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/CachedKafkaConsumer.scala ---
@@ -47,40 +51,191 @@ private[kafka010] case class CachedKafkaConsumer private(
 
   /** Iterator to the already fetched data */
   private var fetchedData =
     ju.Collections.emptyIterator[ConsumerRecord[Array[Byte], Array[Byte]]]
-  private var nextOffsetInFetchedData = -2L
+  private var nextOffsetInFetchedData = UNKNOWN_OFFSET
 
   /**
    * Get the record for the given offset, waiting up to timeout ms if IO is necessary.
    * Sequential forward access will use buffers, but random access will be horribly inefficient.
+   *
+   * When `failOnDataLoss` is `true`, this will either return record at offset if available, or
+   * throw exception.
+   *
+   * When `failOnDataLoss` is `false`, this will either return record at offset if available, or
+   * return the next earliest available record less than untilOffset, or null. It will not throw
+   * any exception.
    */
-  def get(offset: Long, pollTimeoutMs: Long): ConsumerRecord[Array[Byte], Array[Byte]] = {
+  def get(
+      offset: Long,
+      untilOffset: Long,
+      pollTimeoutMs: Long,
+      failOnDataLoss: Boolean): ConsumerRecord[Array[Byte], Array[Byte]] = {
+    require(offset < untilOffset,
+      s"offset must always be less than untilOffset [offset: $offset, untilOffset: $untilOffset]")
     logDebug(s"Get $groupId $topicPartition nextOffset $nextOffsetInFetchedData requested $offset")
-    if (offset != nextOffsetInFetchedData) {
-      logInfo(s"Initial fetch for $topicPartition $offset")
-      seek(offset)
-      poll(pollTimeoutMs)
+    var toFetchOffset = offset
+    while (toFetchOffset != UNKNOWN_OFFSET) {
+      try {
+        val record = fetchData(toFetchOffset, untilOffset, pollTimeoutMs, failOnDataLoss)
+        if (record == null) {
+          reset()
+        }
+        return record
+      } catch {
+        case e: OffsetOutOfRangeException =>
+          val message =
+            if (failOnDataLoss) {
+              s"Cannot fetch offset $toFetchOffset"
+            } else {
+              s"Cannot fetch offset $toFetchOffset. Some data may be lost. " +
+                "Recovering from the earliest offset"
+            }
+          reportDataLoss(failOnDataLoss, message, e)
+          toFetchOffset = getNextEarliestOffset(toFetchOffset, untilOffset)
+      }
     }
+    reset()
+    null
+  }
 
-    if (!fetchedData.hasNext()) { poll(pollTimeoutMs) }
-    assert(fetchedData.hasNext(),
-      s"Failed to get records for $groupId $topicPartition $offset " +
-        s"after polling for $pollTimeoutMs")
-    var record = fetchedData.next()
+  /**
+   * Return the next earliest available offset in [offset, untilOffset). If all offsets in
+   * [offset, untilOffset) are invalid (e.g., the topic is deleted and recreated), it will return
+   * `UNKNOWN_OFFSET`.
+   */
+  private def getNextEarliestOffset(offset: Long, untilOffset: Long): Long = {
+    val (earliestOffset, latestOffset) = getAvailableOffsetRange()
+    if (offset >= latestOffset || earliestOffset >= untilOffset) {
+      // [offset, untilOffset) and [earliestOffset, latestOffset) have no overlap,
+      // either
+      // --------------------------------------------------------
+      //         ^                 ^         ^          ^
+      //         |                 |         |          |
+      //   earliestOffset   latestOffset   offset   untilOffset
+      //
+      // or
+      // --------------------------------------------------------
+      //      ^            ^              ^               ^
+      //      |            |              |               |
+      //   offset   untilOffset   earliestOffset   latestOffset
+      val warningMessage =
+        s"""
+          |The current available offset range is [$earliestOffset, $latestOffset).
+          | Offset ${offset} is out of range, and records in [$offset, $untilOffset) will be
+          | skipped ${additionalMessage(failOnDataLoss = false)}
+        """.stripMargin
+      logWarning(warningMessage)
+      UNKNOWN_OFFSET
+    } else if (offset >= earliestOffset) {
+      // --------------------------------------------------------
+      //      ^            ^         ^                  ^
+      //      |            |         |                  |
+      //   earliestOffset   offset

[GitHub] spark pull request #15820: [SPARK-18373][SS][Kafka]Make failOnDataLoss=false...

2016-11-18 Thread tdas
Github user tdas commented on a diff in the pull request:

https://github.com/apache/spark/pull/15820#discussion_r88758179
  
--- Diff: external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/CachedKafkaConsumer.scala ---
@@ -47,40 +51,191 @@ private[kafka010] case class CachedKafkaConsumer private(
 
   /** Iterator to the already fetched data */
   private var fetchedData =
     ju.Collections.emptyIterator[ConsumerRecord[Array[Byte], Array[Byte]]]
-  private var nextOffsetInFetchedData = -2L
+  private var nextOffsetInFetchedData = UNKNOWN_OFFSET
 
   /**
    * Get the record for the given offset, waiting up to timeout ms if IO is necessary.
--- End diff --

nit: update the docs to clarify earlier that this may not return the record at the requested offset:

"Get the record for the given offset if available. Otherwise it will either throw an error (if failOnDataLoss = true), or return the next available offset within [offset, untilOffset)."

Use @param to explain pollTimeoutMs and the other parameters in more detail.







[GitHub] spark pull request #15820: [SPARK-18373][SS][Kafka]Make failOnDataLoss=false...

2016-11-18 Thread tdas
Github user tdas commented on a diff in the pull request:

https://github.com/apache/spark/pull/15820#discussion_r88757785
  
--- Diff: external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/CachedKafkaConsumer.scala ---
@@ -47,40 +51,191 @@ private[kafka010] case class CachedKafkaConsumer private(
 
   /** Iterator to the already fetched data */
   private var fetchedData =
     ju.Collections.emptyIterator[ConsumerRecord[Array[Byte], Array[Byte]]]
-  private var nextOffsetInFetchedData = -2L
+  private var nextOffsetInFetchedData = UNKNOWN_OFFSET
 
   /**
    * Get the record for the given offset, waiting up to timeout ms if IO is necessary.
    * Sequential forward access will use buffers, but random access will be horribly inefficient.
+   *
+   * When `failOnDataLoss` is `true`, this will either return record at offset if available, or
+   * throw exception.
+   *
+   * When `failOnDataLoss` is `false`, this will either return record at offset if available, or
+   * return the next earliest available record less than untilOffset, or null. It will not throw
+   * any exception.
    */
-  def get(offset: Long, pollTimeoutMs: Long): ConsumerRecord[Array[Byte], Array[Byte]] = {
+  def get(
+      offset: Long,
+      untilOffset: Long,
+      pollTimeoutMs: Long,
+      failOnDataLoss: Boolean): ConsumerRecord[Array[Byte], Array[Byte]] = {
+    require(offset < untilOffset,
+      s"offset must always be less than untilOffset [offset: $offset, untilOffset: $untilOffset]")
     logDebug(s"Get $groupId $topicPartition nextOffset $nextOffsetInFetchedData requested $offset")
-    if (offset != nextOffsetInFetchedData) {
-      logInfo(s"Initial fetch for $topicPartition $offset")
-      seek(offset)
-      poll(pollTimeoutMs)
+    var toFetchOffset = offset
+    while (toFetchOffset != UNKNOWN_OFFSET) {
+      try {
+        val record = fetchData(toFetchOffset, untilOffset, pollTimeoutMs, failOnDataLoss)
+        if (record == null) {
+          reset()
+        }
+        return record
+      } catch {
+        case e: OffsetOutOfRangeException =>
+          val message =
+            if (failOnDataLoss) {
+              s"Cannot fetch offset $toFetchOffset"
+            } else {
+              s"Cannot fetch offset $toFetchOffset. Some data may be lost. " +
+                "Recovering from the earliest offset"
+            }
+          reportDataLoss(failOnDataLoss, message, e)
+          toFetchOffset = getNextEarliestOffset(toFetchOffset, untilOffset)
+      }
     }
+    reset()
+    null
+  }
 
-    if (!fetchedData.hasNext()) { poll(pollTimeoutMs) }
-    assert(fetchedData.hasNext(),
-      s"Failed to get records for $groupId $topicPartition $offset " +
-        s"after polling for $pollTimeoutMs")
-    var record = fetchedData.next()
+  /**
+   * Return the next earliest available offset in [offset, untilOffset). If all offsets in
+   * [offset, untilOffset) are invalid (e.g., the topic is deleted and recreated), it will return
+   * `UNKNOWN_OFFSET`.
+   */
+  private def getNextEarliestOffset(offset: Long, untilOffset: Long): Long = {
--- End diff --

nit: `getEarliestAvailableOffsetBetween(...)`





[GitHub] spark pull request #15820: [SPARK-18373][SS][Kafka]Make failOnDataLoss=false...

2016-11-16 Thread tdas
Github user tdas commented on a diff in the pull request:

https://github.com/apache/spark/pull/15820#discussion_r88366565
  
--- Diff: external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/CachedKafkaConsumer.scala ---
@@ -47,40 +51,229 @@ private[kafka010] case class CachedKafkaConsumer private(
 
   /** Iterator to the already fetched data */
   private var fetchedData =
     ju.Collections.emptyIterator[ConsumerRecord[Array[Byte], Array[Byte]]]
-  private var nextOffsetInFetchedData = -2L
+  private var nextOffsetInFetchedData = UNKNOWN_OFFSET
 
   /**
    * Get the record for the given offset, waiting up to timeout ms if IO is necessary.
    * Sequential forward access will use buffers, but random access will be horribly inefficient.
+   *
+   * If `failOnDataLoss` is `false`, it will try to get the earliest record in
+   * `[offset, untilOffset)` when some illegal state happens. Otherwise, an `IllegalStateException`
--- End diff --

"Otherwise" means, when failOnDataLoss is false but not some illegal state? 
its confusing. 

Rather just rewrite it as
When `failOnDataLoss` is `true`, this will either return record at offset 
if available, or throw exception.
When `failOnDataLoss` is `false`, this will either return record at offset 
if available, or return the next earliest available record < untilOffset, or 
null. It will not throw any exception.





[GitHub] spark pull request #15820: [SPARK-18373][SS][Kafka]Make failOnDataLoss=false...

2016-11-16 Thread tdas
Github user tdas commented on a diff in the pull request:

https://github.com/apache/spark/pull/15820#discussion_r88363378
  
--- Diff: external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/CachedKafkaConsumer.scala ---
@@ -96,10 +289,20 @@ private[kafka010] case class CachedKafkaConsumer private(
 logDebug(s"Polled $groupId ${p.partitions()}  ${r.size}")
 fetchedData = r.iterator
   }
+
+  private def getCurrentOffsetRange(): (Long, Long) = {
--- End diff --

getCurrentOffsetRange -> getValidOffsetRange or getAvailableOffsetRange, to make it clearer what this is. And add docs.
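A sketch of the renamed, documented method, built only from consumer calls already mentioned in this file (seekToBeginning/seekToEnd/position); the exact merged version may differ:

    /**
     * Return the offset range [earliestOffset, latestOffset) currently available in Kafka for
     * `topicPartition`. Offsets outside this range have been aged out or were never written.
     */
    private def getAvailableOffsetRange(): (Long, Long) = {
      val tps = new ju.ArrayList[TopicPartition]()
      tps.add(topicPartition)
      consumer.seekToBeginning(tps)
      val earliestOffset = consumer.position(topicPartition)
      consumer.seekToEnd(tps)
      val latestOffset = consumer.position(topicPartition)
      (earliestOffset, latestOffset)
    }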





[GitHub] spark pull request #15820: [SPARK-18373][SS][Kafka]Make failOnDataLoss=false...

2016-11-16 Thread tdas
Github user tdas commented on a diff in the pull request:

https://github.com/apache/spark/pull/15820#discussion_r88367402
  
--- Diff: external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/CachedKafkaConsumer.scala ---
@@ -47,40 +51,229 @@ private[kafka010] case class CachedKafkaConsumer private(
 
   /** Iterator to the already fetched data */
   private var fetchedData =
     ju.Collections.emptyIterator[ConsumerRecord[Array[Byte], Array[Byte]]]
-  private var nextOffsetInFetchedData = -2L
+  private var nextOffsetInFetchedData = UNKNOWN_OFFSET
 
   /**
    * Get the record for the given offset, waiting up to timeout ms if IO is necessary.
    * Sequential forward access will use buffers, but random access will be horribly inefficient.
+   *
+   * If `failOnDataLoss` is `false`, it will try to get the earliest record in
+   * `[offset, untilOffset)` when some illegal state happens. Otherwise, an `IllegalStateException`
+   * will be thrown.
+   *
+   * It returns `null` only when `failOnDataLoss` is `false` and it cannot fetch any record between
+   * [offset, untilOffset).
    */
-  def get(offset: Long, pollTimeoutMs: Long): ConsumerRecord[Array[Byte], Array[Byte]] = {
+  def get(
+      offset: Long,
+      untilOffset: Long,
+      pollTimeoutMs: Long,
+      failOnDataLoss: Boolean): ConsumerRecord[Array[Byte], Array[Byte]] = {
+    require(offset < untilOffset, s"offset: $offset, untilOffset: $untilOffset")
     logDebug(s"Get $groupId $topicPartition nextOffset $nextOffsetInFetchedData requested $offset")
+    try {
+      fetchDataIfNeeded(offset, pollTimeoutMs)
+      getRecordFromFetchedData(offset, untilOffset, failOnDataLoss)
+    } catch {
+      case e: OffsetOutOfRangeException =>
+        val message =
+          if (failOnDataLoss) {
+            s"""Cannot fetch offset $offset. (GroupId: $groupId, TopicPartition: $topicPartition).
+               | $INSTRUCTION_FOR_FAIL_ON_DATA_LOSS_TRUE""".stripMargin
+          } else {
+            s"""Cannot fetch offset $offset. Some data may be lost. Recovering from the earliest
+               | offset (GroupId: $groupId, TopicPartition: $topicPartition).""".stripMargin
+          }
+        reportDataLoss(failOnDataLoss, message, e)
+        advanceToEarliestOffsetAndFetch(offset, untilOffset, pollTimeoutMs)
+    }
+  }
+
+  /**
+   * Check the pre-fetched data with `offset` and try to fetch from Kafka if they don't match.
+   */
+  private def fetchDataIfNeeded(offset: Long, pollTimeoutMs: Long): Unit = {
     if (offset != nextOffsetInFetchedData) {
       logInfo(s"Initial fetch for $topicPartition $offset")
       seek(offset)
       poll(pollTimeoutMs)
+    } else if (!fetchedData.hasNext()) {
+      // The last pre-fetched data has been drained.
+      // Seek to the offset because we may call seekToBeginning or seekToEnd before this.
+      seek(offset)
+      poll(pollTimeoutMs)
     }
+  }
 
-    if (!fetchedData.hasNext()) { poll(pollTimeoutMs) }
-    assert(fetchedData.hasNext(),
-      s"Failed to get records for $groupId $topicPartition $offset " +
-        s"after polling for $pollTimeoutMs")
-    var record = fetchedData.next()
+  /**
+   * Try to advance to the beginning offset and fetch again. `earliestOffset` should be in
+   * `[offset, untilOffset]`. If not, it will try to fetch `offset` again if it's in
+   * `[earliestOffset, latestOffset)`. Otherwise, it will return null and reset the pre-fetched
+   * data.
+   */
+  private def advanceToEarliestOffsetAndFetch(
+      offset: Long,
+      untilOffset: Long,
+      pollTimeoutMs: Long): ConsumerRecord[Array[Byte], Array[Byte]] = {
+    val (earliestOffset, latestOffset) = getCurrentOffsetRange()
+    if (offset >= latestOffset || earliestOffset >= untilOffset) {
+      // [offset, untilOffset) and [earliestOffset, latestOffset) have no overlap,
+      // either
+      // --------------------------------------------------------
+      //         ^                 ^         ^          ^
+      //         |                 |         |          |
+      //   earliestOffset   latestOffset   offset   untilOffset
+      //
+      // or
+      // --------------------------------------------------------
+      //      ^            ^              ^               ^
+      //      |            |              |               |
+      //   offset   untilOffset   earliestOffset   latestOffset
+      val warningMessage =
+        s"""
+          |The current available offset range is [$earliestOffset, $latestOffset).
+          | Offset ${offset} is out of range, and records in

[GitHub] spark pull request #15820: [SPARK-18373][SS][Kafka]Make failOnDataLoss=false...

2016-11-16 Thread tdas
Github user tdas commented on a diff in the pull request:

https://github.com/apache/spark/pull/15820#discussion_r88364958
  
--- Diff: external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/CachedKafkaConsumer.scala ---
@@ -47,40 +51,229 @@ private[kafka010] case class CachedKafkaConsumer private(
 
   /** Iterator to the already fetched data */
   private var fetchedData =
     ju.Collections.emptyIterator[ConsumerRecord[Array[Byte], Array[Byte]]]
-  private var nextOffsetInFetchedData = -2L
+  private var nextOffsetInFetchedData = UNKNOWN_OFFSET
 
   /**
    * Get the record for the given offset, waiting up to timeout ms if IO is necessary.
    * Sequential forward access will use buffers, but random access will be horribly inefficient.
+   *
+   * If `failOnDataLoss` is `false`, it will try to get the earliest record in
--- End diff --

nit: earliest available record





[GitHub] spark pull request #15820: [SPARK-18373][SS][Kafka]Make failOnDataLoss=false...

2016-11-16 Thread tdas
Github user tdas commented on a diff in the pull request:

https://github.com/apache/spark/pull/15820#discussion_r88363098
  
--- Diff: external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/CachedKafkaConsumer.scala ---
@@ -47,40 +51,229 @@ private[kafka010] case class CachedKafkaConsumer private(
 
   /** Iterator to the already fetched data */
   private var fetchedData =
     ju.Collections.emptyIterator[ConsumerRecord[Array[Byte], Array[Byte]]]
-  private var nextOffsetInFetchedData = -2L
+  private var nextOffsetInFetchedData = UNKNOWN_OFFSET
 
   /**
    * Get the record for the given offset, waiting up to timeout ms if IO is necessary.
    * Sequential forward access will use buffers, but random access will be horribly inefficient.
+   *
+   * If `failOnDataLoss` is `false`, it will try to get the earliest record in
+   * `[offset, untilOffset)` when some illegal state happens. Otherwise, an `IllegalStateException`
+   * will be thrown.
+   *
+   * It returns `null` only when `failOnDataLoss` is `false` and it cannot fetch any record between
+   * [offset, untilOffset).
    */
-  def get(offset: Long, pollTimeoutMs: Long): ConsumerRecord[Array[Byte], Array[Byte]] = {
+  def get(
+      offset: Long,
+      untilOffset: Long,
+      pollTimeoutMs: Long,
+      failOnDataLoss: Boolean): ConsumerRecord[Array[Byte], Array[Byte]] = {
+    require(offset < untilOffset, s"offset: $offset, untilOffset: $untilOffset")
--- End diff --

give a better error message, saying "offset must always be less than untilOffset".





[GitHub] spark pull request #15820: [SPARK-18373][SS][Kafka]Make failOnDataLoss=false...

2016-11-16 Thread tdas
Github user tdas commented on a diff in the pull request:

https://github.com/apache/spark/pull/15820#discussion_r88363196
  
--- Diff: external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/CachedKafkaConsumer.scala ---
@@ -47,40 +51,229 @@ private[kafka010] case class CachedKafkaConsumer private(
 
   /** Iterator to the already fetched data */
   private var fetchedData =
     ju.Collections.emptyIterator[ConsumerRecord[Array[Byte], Array[Byte]]]
-  private var nextOffsetInFetchedData = -2L
+  private var nextOffsetInFetchedData = UNKNOWN_OFFSET
 
   /**
    * Get the record for the given offset, waiting up to timeout ms if IO is necessary.
    * Sequential forward access will use buffers, but random access will be horribly inefficient.
+   *
+   * If `failOnDataLoss` is `false`, it will try to get the earliest record in
+   * `[offset, untilOffset)` when some illegal state happens. Otherwise, an `IllegalStateException`
+   * will be thrown.
+   *
+   * It returns `null` only when `failOnDataLoss` is `false` and it cannot fetch any record between
+   * [offset, untilOffset).
    */
-  def get(offset: Long, pollTimeoutMs: Long): ConsumerRecord[Array[Byte], Array[Byte]] = {
+  def get(
+      offset: Long,
+      untilOffset: Long,
+      pollTimeoutMs: Long,
+      failOnDataLoss: Boolean): ConsumerRecord[Array[Byte], Array[Byte]] = {
+    require(offset < untilOffset, s"offset: $offset, untilOffset: $untilOffset")
     logDebug(s"Get $groupId $topicPartition nextOffset $nextOffsetInFetchedData requested $offset")
+    try {
+      fetchDataIfNeeded(offset, pollTimeoutMs)
+      getRecordFromFetchedData(offset, untilOffset, failOnDataLoss)
+    } catch {
+      case e: OffsetOutOfRangeException =>
+        val message =
+          if (failOnDataLoss) {
+            s"""Cannot fetch offset $offset. (GroupId: $groupId, TopicPartition: $topicPartition).
+               | $INSTRUCTION_FOR_FAIL_ON_DATA_LOSS_TRUE""".stripMargin
+          } else {
+            s"""Cannot fetch offset $offset. Some data may be lost. Recovering from the earliest
+               | offset (GroupId: $groupId, TopicPartition: $topicPartition).""".stripMargin
+          }
+        reportDataLoss(failOnDataLoss, message, e)
+        advanceToEarliestOffsetAndFetch(offset, untilOffset, pollTimeoutMs)
+    }
+  }
+
+  /**
+   * Check the pre-fetched data with `offset` and try to fetch from Kafka if they don't match.
+   */
+  private def fetchDataIfNeeded(offset: Long, pollTimeoutMs: Long): Unit = {
     if (offset != nextOffsetInFetchedData) {
       logInfo(s"Initial fetch for $topicPartition $offset")
       seek(offset)
       poll(pollTimeoutMs)
+    } else if (!fetchedData.hasNext()) {
+      // The last pre-fetched data has been drained.
+      // Seek to the offset because we may call seekToBeginning or seekToEnd before this.
+      seek(offset)
+      poll(pollTimeoutMs)
--- End diff --

Why haven't these two cases been merged?
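Merged, the two branches collapse to one condition (a sketch; the only behavior change is losing the "Initial fetch" log line):

    if (offset != nextOffsetInFetchedData || !fetchedData.hasNext()) {
      // Either this is the first fetch for this offset, or the pre-fetched data has been
      // drained; in both cases, seek and poll again.
      seek(offset)
      poll(pollTimeoutMs)
    }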





[GitHub] spark pull request #15820: [SPARK-18373][SS][Kafka]Make failOnDataLoss=false...

2016-11-16 Thread tdas
Github user tdas commented on a diff in the pull request:

https://github.com/apache/spark/pull/15820#discussion_r88366202
  
--- Diff: 
external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/CachedKafkaConsumer.scala
 ---
@@ -47,40 +51,229 @@ private[kafka010] case class CachedKafkaConsumer 
private(
 
   /** Iterator to the already fetch data */
   private var fetchedData = 
ju.Collections.emptyIterator[ConsumerRecord[Array[Byte], Array[Byte]]]
-  private var nextOffsetInFetchedData = -2L
+  private var nextOffsetInFetchedData = UNKNOWN_OFFSET
 
   /**
* Get the record for the given offset, waiting up to timeout ms if IO 
is necessary.
* Sequential forward access will use buffers, but random access will be 
horribly inefficient.
+   *
+   * If `failOnDataLoss` is `false`, it will try to get the earliest 
record in
+   * `[offset, untilOffset)` when some illegal state happens. Otherwise, 
an `IllegalStateException`
+   * will be thrown.
+   *
+   * It returns `null` only when `failOnDataLoss` is `false` and it cannot 
fetch any record between
+   * [offset, untilOffset).
*/
-  def get(offset: Long, pollTimeoutMs: Long): ConsumerRecord[Array[Byte], 
Array[Byte]] = {
+  def get(
+  offset: Long,
+  untilOffset: Long,
+  pollTimeoutMs: Long,
+  failOnDataLoss: Boolean): ConsumerRecord[Array[Byte], Array[Byte]] = 
{
+require(offset < untilOffset, s"offset: $offset, untilOffset: 
$untilOffset")
 logDebug(s"Get $groupId $topicPartition nextOffset 
$nextOffsetInFetchedData requested $offset")
+try {
+  fetchDataIfNeeded(offset, pollTimeoutMs)
+  getRecordFromFetchedData(offset, untilOffset, failOnDataLoss)
+} catch {
+  case e: OffsetOutOfRangeException =>
+val message =
+  if (failOnDataLoss) {
+s"""Cannot fetch offset $offset. (GroupId: $groupId, 
TopicPartition: $topicPartition).
+   | $INSTRUCTION_FOR_FAIL_ON_DATA_LOSS_TRUE""".stripMargin
+  } else {
+s"""Cannot fetch offset $offset. Some data may be lost. 
Recovering from the earliest
+   | offset (GroupId: $groupId, TopicPartition: 
$topicPartition).""".stripMargin
+  }
+reportDataLoss(failOnDataLoss, message, e)
+advanceToEarliestOffsetAndFetch(offset, untilOffset, pollTimeoutMs)
+}
+  }
+
+  /**
+   * Check the pre-fetched data with `offset` and try to fetch from Kafka 
if they don't match.
+   */
+  private def fetchDataIfNeeded(offset: Long, pollTimeoutMs: Long): Unit = 
{
 if (offset != nextOffsetInFetchedData) {
   logInfo(s"Initial fetch for $topicPartition $offset")
   seek(offset)
   poll(pollTimeoutMs)
+} else if (!fetchedData.hasNext()) {
+  // The last pre-fetched data has been drained.
+  // Seek to the offset because we may call seekToBeginning or 
seekToEnd before this.
+  seek(offset)
+  poll(pollTimeoutMs)
 }
+  }
 
-if (!fetchedData.hasNext()) { poll(pollTimeoutMs) }
-assert(fetchedData.hasNext(),
-  s"Failed to get records for $groupId $topicPartition $offset " +
-s"after polling for $pollTimeoutMs")
-var record = fetchedData.next()
+  /**
+   * Try to advance to the beginning offset and fetch again. 
`earliestOffset` should be in
+   * `[offset, untilOffset]`. If not, it will try to fetch `offset` again 
if it's in
+   * `[earliestOffset, latestOffset)`. Otherwise, it will return null and 
reset the pre-fetched
+   * data.
+   */
+  private def advanceToEarliestOffsetAndFetch(
+  offset: Long,
+  untilOffset: Long,
+  pollTimeoutMs: Long): ConsumerRecord[Array[Byte], Array[Byte]] = {
+val (earliestOffset, latestOffset) = getCurrentOffsetRange()
+if (offset >= latestOffset || earliestOffset >= untilOffset) {
+  // [offset, untilOffset) and [earliestOffset, latestOffset) have no 
overlap,
+  // either
+  // ------------------------------------------------------------------
+  //           ^                    ^              ^             ^
+  //           |                    |              |             |
+  //     earliestOffset       latestOffset      offset      untilOffset
+  //
+  // or
+  // ------------------------------------------------------------------
+  //        ^                ^                   ^                  ^
+  //        |                |                   |                  |
+  //     offset        untilOffset        earliestOffset      latestOffset
+  val warningMessage =
+s"""
+  |The current available offset range is [$earliestOffset, 
$latestOffset).
+  | Offset ${offset} is out of range, and records in 

[GitHub] spark pull request #15820: [SPARK-18373][SS][Kafka]Make failOnDataLoss=false...

2016-11-16 Thread koeninger
Github user koeninger commented on a diff in the pull request:

https://github.com/apache/spark/pull/15820#discussion_r88360922
  
--- Diff: 
external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/CachedKafkaConsumer.scala
 ---
@@ -83,6 +86,129 @@ private[kafka010] case class CachedKafkaConsumer 
private(
 record
   }
 
+  /**
+   * Get the record at the `offset`. If it doesn't exist, try to get the 
earliest record in
+   * `[offset, untilOffset)`.
+   */
+  def getAndIgnoreLostData(
+  offset: Long,
+  untilOffset: Long,
+  pollTimeoutMs: Long): ConsumerRecord[Array[Byte], Array[Byte]] = {
+// scalastyle:off
+// When `failOnDataLoss` is `false`, we need to handle the following 
cases (note: untilOffset and latestOffset are exclusive):
+// 1. Some data are aged out, and `offset < beginningOffset <= 
untilOffset - 1 <= latestOffset - 1`
+//  Seek to the beginningOffset and fetch the data.
+// 2. Some data are aged out, and `offset <= untilOffset - 1 < 
beginningOffset`.
+//  There is nothing to fetch, return null.
+// 3. The topic is deleted.
+//  There is nothing to fetch, return null.
+// 4. The topic is deleted and recreated, and `beginningOffset <= 
offset <= untilOffset - 1 <= latestOffset - 1`.
+//  We cannot detect this case. We can still fetch data like 
nothing happens.
+// 5. The topic is deleted and recreated, and `beginningOffset <= 
offset < latestOffset - 1 < untilOffset - 1`.
+//  Same as 4.
+// 6. The topic is deleted and recreated, and `beginningOffset <= 
latestOffset - 1 < offset <= untilOffset - 1`.
+//  There is nothing to fetch, return null.
+// 7. The topic is deleted and recreated, and `offset < 
beginningOffset <= untilOffset - 1`.
+//  Same as 1.
+// 8. The topic is deleted and recreated, and `offset <= untilOffset - 
1 < beginningOffset`.
+//  There is nothing to fetch, return null.
+// scalastyle:on
+require(offset < untilOffset, s"offset: $offset, untilOffset: 
$untilOffset")
+logDebug(s"Get $groupId $topicPartition nextOffset 
$nextOffsetInFetchedData requested $offset")
+try {
+  if (offset != nextOffsetInFetchedData) {
+logInfo(s"Initial fetch for $topicPartition $offset")
+seek(offset)
+poll(pollTimeoutMs)
+  } else if (!fetchedData.hasNext()) {
+// The last pre-fetched data has been drained.
+poll(pollTimeoutMs)
+  }
+  getRecordFromFetchedData(offset, untilOffset)
+} catch {
+  case e: OffsetOutOfRangeException =>
+logWarning(s"Cannot fetch offset $offset, try to recover from the 
beginning offset", e)
+advanceToBeginningOffsetAndFetch(offset, untilOffset, 
pollTimeoutMs)
+}
+  }
+
+  /**
+   * Try to advance to the beginning offset and fetch again. 
`beginningOffset` should be in
+   * `[offset, untilOffset]`. If not, it will try to fetch `offset` again 
if it's in
+   * `[beginningOffset, latestOffset)`. Otherwise, it will return null and 
reset the pre-fetched
+   * data.
+   */
+  private def advanceToBeginningOffsetAndFetch(
+  offset: Long,
+  untilOffset: Long,
+  pollTimeoutMs: Long): ConsumerRecord[Array[Byte], Array[Byte]] = {
+val beginningOffset = getBeginningOffset()
+if (beginningOffset <= offset) {
+  val latestOffset = getLatestOffset()
+  if (latestOffset <= offset) {
+// beginningOffset <= latestOffset - 1 < offset <= untilOffset - 1
+logWarning(s"Offset ${offset} is later than the latest offset 
$latestOffset. " +
+  s"Skipped [$offset, $untilOffset)")
+reset()
+null
+  } else {
+// beginningOffset <= offset <= min(latestOffset - 1, untilOffset 
- 1)
+getAndIgnoreLostData(offset, untilOffset, pollTimeoutMs)
+  }
+} else {
+  if (beginningOffset >= untilOffset) {
+// offset <= untilOffset - 1 < beginningOffset
+logWarning(s"Buffer miss for $groupId $topicPartition [$offset, 
$untilOffset)")
+reset()
+null
+  } else {
+// offset < beginningOffset <= untilOffset - 1
+logWarning(s"Buffer miss for $groupId $topicPartition [$offset, 
$beginningOffset)")
+getAndIgnoreLostData(beginningOffset, untilOffset, pollTimeoutMs)
+  }
+}
+  }
+
+  /**
+   * Get the earliest record in [offset, untilOffset) from the fetched 
data. If there is no such
+   * record, returns null. Must be called after `poll`.
+   */
+  private def getRecordFromFetchedData(
+  offset: Long,
+  untilOffset: Long): 

[GitHub] spark pull request #15820: [SPARK-18373][SS][Kafka]Make failOnDataLoss=false...

2016-11-15 Thread tdas
Github user tdas commented on a diff in the pull request:

https://github.com/apache/spark/pull/15820#discussion_r88139985
  
--- Diff: 
external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/CachedKafkaConsumer.scala
 ---
@@ -83,6 +86,139 @@ private[kafka010] case class CachedKafkaConsumer 
private(
 record
   }
 
+  /**
+   * Get the record at the `offset`. If it doesn't exist, try to get the 
earliest record in
+   * `[offset, untilOffset)`.
+   */
+  def getAndIgnoreLostData(
+  offset: Long,
+  untilOffset: Long,
+  pollTimeoutMs: Long): ConsumerRecord[Array[Byte], Array[Byte]] = {
+// scalastyle:off
+// When `failOnDataLoss` is `false`, we need to handle the following 
cases (note: untilOffset and latestOffset are exclusive):
+// 1. Some data are aged out, and `offset < beginningOffset <= 
untilOffset - 1 <= latestOffset - 1`
+//  Seek to the beginningOffset and fetch the data.
+// 2. Some data are aged out, and `offset <= untilOffset - 1 < 
beginningOffset`.
+//  There is nothing to fetch, return null.
+// 3. The topic is deleted.
+//  There is nothing to fetch, return null.
+// 4. The topic is deleted and recreated, and `beginningOffset <= 
offset <= untilOffset - 1 <= latestOffset - 1`.
+//  We cannot detect this case. We can still fetch data like 
nothing happens.
+// 5. The topic is deleted and recreated, and `beginningOffset <= 
offset < latestOffset - 1 < untilOffset - 1`.
+//  Same as 4.
+// 6. The topic is deleted and recreated, and `beginningOffset <= 
latestOffset - 1 < offset <= untilOffset - 1`.
+//  There is nothing to fetch, return null.
+// 7. The topic is deleted and recreated, and `offset < 
beginningOffset <= untilOffset - 1`.
+//  Same as 1.
+// 8. The topic is deleted and recreated, and `offset <= untilOffset - 
1 < beginningOffset`.
+//  There is nothing to fetch, return null.
+// scalastyle:on
+require(offset < untilOffset, s"offset: $offset, untilOffset: 
$untilOffset")
+logDebug(s"Get $groupId $topicPartition nextOffset 
$nextOffsetInFetchedData requested $offset")
+try {
+  if (offset != nextOffsetInFetchedData) {
+logInfo(s"Initial fetch for $topicPartition $offset")
+seek(offset)
+poll(pollTimeoutMs)
+  } else if (!fetchedData.hasNext()) {
+// The last pre-fetched data has been drained.
+poll(pollTimeoutMs)
+  }
+  getRecordFromFetchedData(offset, untilOffset)
+} catch {
+  case e: OffsetOutOfRangeException =>
+logWarning(s"Cannot fetch offset $offset. Some data may be lost. 
Recovering from " +
+  "the beginning offset", e)
+advanceToBeginningOffsetAndFetch(offset, untilOffset, 
pollTimeoutMs)
+}
+  }
+
+  /**
+   * Try to advance to the beginning offset and fetch again. 
`beginningOffset` should be in
+   * `[offset, untilOffset]`. If not, it will try to fetch `offset` again 
if it's in
+   * `[beginningOffset, latestOffset)`. Otherwise, it will return null and 
reset the pre-fetched
+   * data.
+   */
+  private def advanceToBeginningOffsetAndFetch(
+  offset: Long,
+  untilOffset: Long,
+  pollTimeoutMs: Long): ConsumerRecord[Array[Byte], Array[Byte]] = {
+val beginningOffset = getBeginningOffset()
+if (beginningOffset <= offset) {
--- End diff --

nit: it's easier to reason about the condition when "offset" comes first, i.e. 
`offset >= beginningOffset`.
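
(Concretely, the suggestion amounts to leading each comparison with the
requested offset; a hypothetical standalone illustration, with placeholder
branch outcomes rather than the PR's actual logic:)

```
object OffsetChecks {
  // Each comparison reads "offset >= ...", per the review nit.
  def classify(offset: Long, beginningOffset: Long, latestOffset: Long): String = {
    if (offset >= beginningOffset) {
      if (offset >= latestOffset) "skip: offset is past the latest offset"
      else "fetch: offset is still available"
    } else {
      "recover: offset was aged out, restart from beginningOffset"
    }
  }
}
```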






[GitHub] spark pull request #15820: [SPARK-18373][SS][Kafka]Make failOnDataLoss=false...

2016-11-15 Thread tdas
Github user tdas commented on a diff in the pull request:

https://github.com/apache/spark/pull/15820#discussion_r88140653
  
--- Diff: 
external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/CachedKafkaConsumer.scala
 ---
@@ -83,6 +86,139 @@ private[kafka010] case class CachedKafkaConsumer 
private(
 record
   }
 
+  /**
+   * Get the record at the `offset`. If it doesn't exist, try to get the 
earliest record in
+   * `[offset, untilOffset)`.
+   */
+  def getAndIgnoreLostData(
+  offset: Long,
+  untilOffset: Long,
+  pollTimeoutMs: Long): ConsumerRecord[Array[Byte], Array[Byte]] = {
+// scalastyle:off
+// When `failOnDataLoss` is `false`, we need to handle the following 
cases (note: untilOffset and latestOffset are exclusive):
+// 1. Some data are aged out, and `offset < beginningOffset <= 
untilOffset - 1 <= latestOffset - 1`
+//  Seek to the beginningOffset and fetch the data.
+// 2. Some data are aged out, and `offset <= untilOffset - 1 < 
beginningOffset`.
+//  There is nothing to fetch, return null.
+// 3. The topic is deleted.
+//  There is nothing to fetch, return null.
+// 4. The topic is deleted and recreated, and `beginningOffset <= 
offset <= untilOffset - 1 <= latestOffset - 1`.
+//  We cannot detect this case. We can still fetch data like 
nothing happens.
+// 5. The topic is deleted and recreated, and `beginningOffset <= 
offset < latestOffset - 1 < untilOffset - 1`.
+//  Same as 4.
+// 6. The topic is deleted and recreated, and `beginningOffset <= 
latestOffset - 1 < offset <= untilOffset - 1`.
+//  There is nothing to fetch, return null.
+// 7. The topic is deleted and recreated, and `offset < 
beginningOffset <= untilOffset - 1`.
+//  Same as 1.
+// 8. The topic is deleted and recreated, and `offset <= untilOffset - 
1 < beginningOffset`.
+//  There is nothing to fetch, return null.
+// scalastyle:on
+require(offset < untilOffset, s"offset: $offset, untilOffset: 
$untilOffset")
+logDebug(s"Get $groupId $topicPartition nextOffset 
$nextOffsetInFetchedData requested $offset")
+try {
+  if (offset != nextOffsetInFetchedData) {
+logInfo(s"Initial fetch for $topicPartition $offset")
+seek(offset)
+poll(pollTimeoutMs)
+  } else if (!fetchedData.hasNext()) {
+// The last pre-fetched data has been drained.
+poll(pollTimeoutMs)
+  }
+  getRecordFromFetchedData(offset, untilOffset)
+} catch {
+  case e: OffsetOutOfRangeException =>
+logWarning(s"Cannot fetch offset $offset. Some data may be lost. 
Recovering from " +
+  "the beginning offset", e)
+advanceToBeginningOffsetAndFetch(offset, untilOffset, 
pollTimeoutMs)
+}
+  }
+
+  /**
+   * Try to advance to the beginning offset and fetch again. 
`beginningOffset` should be in
+   * `[offset, untilOffset]`. If not, it will try to fetch `offset` again 
if it's in
+   * `[beginningOffset, latestOffset)`. Otherwise, it will return null and 
reset the pre-fetched
+   * data.
+   */
+  private def advanceToBeginningOffsetAndFetch(
+  offset: Long,
+  untilOffset: Long,
+  pollTimeoutMs: Long): ConsumerRecord[Array[Byte], Array[Byte]] = {
+val beginningOffset = getBeginningOffset()
+if (beginningOffset <= offset) {
+  val latestOffset = getLatestOffset()
+  if (latestOffset <= offset) {
+// beginningOffset <= latestOffset - 1 < offset <= untilOffset - 1
+logWarning(s"Offset ${offset} is later than the latest offset 
$latestOffset. " +
+  s"Skipped [$offset, $untilOffset)")
+reset()
+null
+  } else {
+// beginningOffset <= offset <= min(latestOffset - 1, untilOffset 
- 1)
+//
+// This will happen when a topic is deleted and recreated, and new 
data are pushed very fast
+// , then we will see `offset` disappears first then appears 
again. Although the parameters
+// are same, the state in Kafka cluster is changed, so it's not an 
endless loop.
+//
+// In addition, the stack here won't be deep unless the user keeps 
deleting and creating the
+// topic very fast.
+//
+// Therefore, this recursive call is safe.
+getAndIgnoreLostData(offset, untilOffset, pollTimeoutMs)
+  }
+} else {
+  if (beginningOffset >= untilOffset) {
+// offset <= untilOffset - 1 < beginningOffset
+logWarning(s"Buffer miss for $groupId $topicPartition [$offset, 
$untilOffset)")
+reset()
+

[GitHub] spark pull request #15820: [SPARK-18373][SS][Kafka]Make failOnDataLoss=false...

2016-11-15 Thread tdas
Github user tdas commented on a diff in the pull request:

https://github.com/apache/spark/pull/15820#discussion_r88137338
  
--- Diff: 
external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/CachedKafkaConsumer.scala
 ---
@@ -83,6 +86,139 @@ private[kafka010] case class CachedKafkaConsumer 
private(
 record
   }
 
+  /**
+   * Get the record at the `offset`. If it doesn't exist, try to get the 
earliest record in
+   * `[offset, untilOffset)`.
+   */
+  def getAndIgnoreLostData(
+  offset: Long,
+  untilOffset: Long,
+  pollTimeoutMs: Long): ConsumerRecord[Array[Byte], Array[Byte]] = {
+// scalastyle:off
+// When `failOnDataLoss` is `false`, we need to handle the following 
cases (note: untilOffset and latestOffset are exclusive):
+// 1. Some data are aged out, and `offset < beginningOffset <= 
untilOffset - 1 <= latestOffset - 1`
+//  Seek to the beginningOffset and fetch the data.
+// 2. Some data are aged out, and `offset <= untilOffset - 1 < 
beginningOffset`.
+//  There is nothing to fetch, return null.
+// 3. The topic is deleted.
+//  There is nothing to fetch, return null.
+// 4. The topic is deleted and recreated, and `beginningOffset <= 
offset <= untilOffset - 1 <= latestOffset - 1`.
+//  We cannot detect this case. We can still fetch data like 
nothing happens.
+// 5. The topic is deleted and recreated, and `beginningOffset <= 
offset < latestOffset - 1 < untilOffset - 1`.
+//  Same as 4.
+// 6. The topic is deleted and recreated, and `beginningOffset <= 
latestOffset - 1 < offset <= untilOffset - 1`.
+//  There is nothing to fetch, return null.
+// 7. The topic is deleted and recreated, and `offset < 
beginningOffset <= untilOffset - 1`.
+//  Same as 1.
+// 8. The topic is deleted and recreated, and `offset <= untilOffset - 
1 < beginningOffset`.
+//  There is nothing to fetch, return null.
+// scalastyle:on
+require(offset < untilOffset, s"offset: $offset, untilOffset: 
$untilOffset")
+logDebug(s"Get $groupId $topicPartition nextOffset 
$nextOffsetInFetchedData requested $offset")
+try {
+  if (offset != nextOffsetInFetchedData) {
+logInfo(s"Initial fetch for $topicPartition $offset")
+seek(offset)
+poll(pollTimeoutMs)
+  } else if (!fetchedData.hasNext()) {
+// The last pre-fetched data has been drained.
+poll(pollTimeoutMs)
+  }
+  getRecordFromFetchedData(offset, untilOffset)
+} catch {
+  case e: OffsetOutOfRangeException =>
+logWarning(s"Cannot fetch offset $offset. Some data may be lost. 
Recovering from " +
+  "the beginning offset", e)
+advanceToBeginningOffsetAndFetch(offset, untilOffset, 
pollTimeoutMs)
+}
+  }
+
+  /**
+   * Try to advance to the beginning offset and fetch again. 
`beginningOffset` should be in
+   * `[offset, untilOffset]`. If not, it will try to fetch `offset` again 
if it's in
+   * `[beginningOffset, latestOffset)`. Otherwise, it will return null and 
reset the pre-fetched
+   * data.
+   */
+  private def advanceToBeginningOffsetAndFetch(
+  offset: Long,
+  untilOffset: Long,
+  pollTimeoutMs: Long): ConsumerRecord[Array[Byte], Array[Byte]] = {
+val beginningOffset = getBeginningOffset()
+if (beginningOffset <= offset) {
+  val latestOffset = getLatestOffset()
+  if (latestOffset <= offset) {
+// beginningOffset <= latestOffset - 1 < offset <= untilOffset - 1
+logWarning(s"Offset ${offset} is later than the latest offset 
$latestOffset. " +
+  s"Skipped [$offset, $untilOffset)")
+reset()
+null
+  } else {
+// beginningOffset <= offset <= min(latestOffset - 1, untilOffset 
- 1)
+//
+// This will happen when a topic is deleted and recreated, and new 
data are pushed very fast
+// , then we will see `offset` disappears first then appears 
again. Although the parameters
+// are same, the state in Kafka cluster is changed, so it's not an 
endless loop.
+//
+// In addition, the stack here won't be deep unless the user keeps 
deleting and creating the
+// topic very fast.
+//
+// Therefore, this recursive call is safe.
+getAndIgnoreLostData(offset, untilOffset, pollTimeoutMs)
+  }
+} else {
+  if (beginningOffset >= untilOffset) {
+// offset <= untilOffset - 1 < beginningOffset
+logWarning(s"Buffer miss for $groupId $topicPartition [$offset, 
$untilOffset)")
+reset()
+

[GitHub] spark pull request #15820: [SPARK-18373][SS][Kafka]Make failOnDataLoss=false...

2016-11-15 Thread tdas
Github user tdas commented on a diff in the pull request:

https://github.com/apache/spark/pull/15820#discussion_r88140639
  
--- Diff: 
external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/CachedKafkaConsumer.scala
 ---
@@ -83,6 +86,139 @@ private[kafka010] case class CachedKafkaConsumer 
private(
 record
   }
 
+  /**
+   * Get the record at the `offset`. If it doesn't exist, try to get the 
earliest record in
+   * `[offset, untilOffset)`.
+   */
+  def getAndIgnoreLostData(
+  offset: Long,
+  untilOffset: Long,
+  pollTimeoutMs: Long): ConsumerRecord[Array[Byte], Array[Byte]] = {
+// scalastyle:off
+// When `failOnDataLoss` is `false`, we need to handle the following 
cases (note: untilOffset and latestOffset are exclusive):
+// 1. Some data are aged out, and `offset < beginningOffset <= 
untilOffset - 1 <= latestOffset - 1`
+//  Seek to the beginningOffset and fetch the data.
+// 2. Some data are aged out, and `offset <= untilOffset - 1 < 
beginningOffset`.
+//  There is nothing to fetch, return null.
+// 3. The topic is deleted.
+//  There is nothing to fetch, return null.
+// 4. The topic is deleted and recreated, and `beginningOffset <= 
offset <= untilOffset - 1 <= latestOffset - 1`.
+//  We cannot detect this case. We can still fetch data like 
nothing happens.
+// 5. The topic is deleted and recreated, and `beginningOffset <= 
offset < latestOffset - 1 < untilOffset - 1`.
+//  Same as 4.
+// 6. The topic is deleted and recreated, and `beginningOffset <= 
latestOffset - 1 < offset <= untilOffset - 1`.
+//  There is nothing to fetch, return null.
+// 7. The topic is deleted and recreated, and `offset < 
beginningOffset <= untilOffset - 1`.
+//  Same as 1.
+// 8. The topic is deleted and recreated, and `offset <= untilOffset - 
1 < beginningOffset`.
+//  There is nothing to fetch, return null.
+// scalastyle:on
+require(offset < untilOffset, s"offset: $offset, untilOffset: 
$untilOffset")
+logDebug(s"Get $groupId $topicPartition nextOffset 
$nextOffsetInFetchedData requested $offset")
+try {
+  if (offset != nextOffsetInFetchedData) {
+logInfo(s"Initial fetch for $topicPartition $offset")
+seek(offset)
+poll(pollTimeoutMs)
+  } else if (!fetchedData.hasNext()) {
+// The last pre-fetched data has been drained.
+poll(pollTimeoutMs)
+  }
+  getRecordFromFetchedData(offset, untilOffset)
+} catch {
+  case e: OffsetOutOfRangeException =>
+logWarning(s"Cannot fetch offset $offset. Some data may be lost. 
Recovering from " +
+  "the beginning offset", e)
+advanceToBeginningOffsetAndFetch(offset, untilOffset, 
pollTimeoutMs)
+}
+  }
+
+  /**
+   * Try to advance to the beginning offset and fetch again. 
`beginningOffset` should be in
+   * `[offset, untilOffset]`. If not, it will try to fetch `offset` again 
if it's in
+   * `[beginningOffset, latestOffset)`. Otherwise, it will return null and 
reset the pre-fetched
+   * data.
+   */
+  private def advanceToBeginningOffsetAndFetch(
+  offset: Long,
+  untilOffset: Long,
+  pollTimeoutMs: Long): ConsumerRecord[Array[Byte], Array[Byte]] = {
+val beginningOffset = getBeginningOffset()
+if (beginningOffset <= offset) {
+  val latestOffset = getLatestOffset()
+  if (latestOffset <= offset) {
+// beginningOffset <= latestOffset - 1 < offset <= untilOffset - 1
+logWarning(s"Offset ${offset} is later than the latest offset 
$latestOffset. " +
+  s"Skipped [$offset, $untilOffset)")
+reset()
+null
+  } else {
+// beginningOffset <= offset <= min(latestOffset - 1, untilOffset 
- 1)
+//
+// This will happen when a topic is deleted and recreated, and new 
data are pushed very fast
+// , then we will see `offset` disappears first then appears 
again. Although the parameters
+// are same, the state in Kafka cluster is changed, so it's not an 
endless loop.
+//
+// In addition, the stack here won't be deep unless the user keeps 
deleting and creating the
+// topic very fast.
+//
+// Therefore, this recursive call is safe.
+getAndIgnoreLostData(offset, untilOffset, pollTimeoutMs)
+  }
+} else {
+  if (beginningOffset >= untilOffset) {
+// offset <= untilOffset - 1 < beginningOffset
--- End diff --

Also improve the printed warning; I gave an example in another comment.
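
(One plausible plain-word shape for that warning, sketched as a standalone
helper; the name and wording are illustrative, not taken from the PR.)

```
object DataLossMessages {
  // Hypothetical plain-word warning for a range that was aged out entirely.
  def lostRangeWarning(offset: Long, untilOffset: Long, beginningOffset: Long): String =
    s"Requested offsets [$offset, $untilOffset) were aged out before they could " +
    s"be read; the earliest available offset is now $beginningOffset. " +
    "Skipping the range; some data may have been lost."
}
```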



[GitHub] spark pull request #15820: [SPARK-18373][SS][Kafka]Make failOnDataLoss=false...

2016-11-15 Thread tdas
Github user tdas commented on a diff in the pull request:

https://github.com/apache/spark/pull/15820#discussion_r88139536
  
--- Diff: 
external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/CachedKafkaConsumer.scala
 ---
@@ -83,6 +86,139 @@ private[kafka010] case class CachedKafkaConsumer 
private(
 record
   }
 
+  /**
+   * Get the record at the `offset`. If it doesn't exist, try to get the 
earliest record in
+   * `[offset, untilOffset)`.
+   */
+  def getAndIgnoreLostData(
+  offset: Long,
+  untilOffset: Long,
+  pollTimeoutMs: Long): ConsumerRecord[Array[Byte], Array[Byte]] = {
+// scalastyle:off
+// When `failOnDataLoss` is `false`, we need to handle the following 
cases (note: untilOffset and latestOffset are exclusive):
+// 1. Some data are aged out, and `offset < beginningOffset <= 
untilOffset - 1 <= latestOffset - 1`
+//  Seek to the beginningOffset and fetch the data.
+// 2. Some data are aged out, and `offset <= untilOffset - 1 < 
beginningOffset`.
+//  There is nothing to fetch, return null.
+// 3. The topic is deleted.
+//  There is nothing to fetch, return null.
+// 4. The topic is deleted and recreated, and `beginningOffset <= 
offset <= untilOffset - 1 <= latestOffset - 1`.
+//  We cannot detect this case. We can still fetch data like 
nothing happens.
+// 5. The topic is deleted and recreated, and `beginningOffset <= 
offset < latestOffset - 1 < untilOffset - 1`.
+//  Same as 4.
+// 6. The topic is deleted and recreated, and `beginningOffset <= 
latestOffset - 1 < offset <= untilOffset - 1`.
+//  There is nothing to fetch, return null.
+// 7. The topic is deleted and recreated, and `offset < 
beginningOffset <= untilOffset - 1`.
+//  Same as 1.
+// 8. The topic is deleted and recreated, and `offset <= untilOffset - 
1 < beginningOffset`.
+//  There is nothing to fetch, return null.
+// scalastyle:on
+require(offset < untilOffset, s"offset: $offset, untilOffset: 
$untilOffset")
+logDebug(s"Get $groupId $topicPartition nextOffset 
$nextOffsetInFetchedData requested $offset")
+try {
+  if (offset != nextOffsetInFetchedData) {
+logInfo(s"Initial fetch for $topicPartition $offset")
+seek(offset)
+poll(pollTimeoutMs)
+  } else if (!fetchedData.hasNext()) {
--- End diff --

can these two conditions be merged as 
```
if (offset != nextOffsetInFetchedData || !fetchedData.hasNext()) {
  seek(offset)
  poll(pollTimeoutMs)
}
```
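
(Note: the later revision quoted near the top of this digest keeps the two
branches but adds `seek(offset)` to the drained-buffer branch too, with a
comment that `seekToBeginning` or `seekToEnd` may have been called before it,
so the branches end up differing only in their log line.)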





[GitHub] spark pull request #15820: [SPARK-18373][SS][Kafka]Make failOnDataLoss=false...

2016-11-15 Thread tdas
Github user tdas commented on a diff in the pull request:

https://github.com/apache/spark/pull/15820#discussion_r88137586
  
--- Diff: 
external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/CachedKafkaConsumer.scala
 ---
@@ -83,6 +86,139 @@ private[kafka010] case class CachedKafkaConsumer 
private(
 record
   }
 
+  /**
+   * Get the record at the `offset`. If it doesn't exist, try to get the 
earliest record in
+   * `[offset, untilOffset)`.
+   */
+  def getAndIgnoreLostData(
+  offset: Long,
+  untilOffset: Long,
+  pollTimeoutMs: Long): ConsumerRecord[Array[Byte], Array[Byte]] = {
+// scalastyle:off
+// When `failOnDataLoss` is `false`, we need to handle the following 
cases (note: untilOffset and latestOffset are exclusive):
+// 1. Some data are aged out, and `offset < beginningOffset <= 
untilOffset - 1 <= latestOffset - 1`
+//  Seek to the beginningOffset and fetch the data.
+// 2. Some data are aged out, and `offset <= untilOffset - 1 < 
beginningOffset`.
+//  There is nothing to fetch, return null.
+// 3. The topic is deleted.
+//  There is nothing to fetch, return null.
+// 4. The topic is deleted and recreated, and `beginningOffset <= 
offset <= untilOffset - 1 <= latestOffset - 1`.
+//  We cannot detect this case. We can still fetch data like 
nothing happens.
+// 5. The topic is deleted and recreated, and `beginningOffset <= 
offset < latestOffset - 1 < untilOffset - 1`.
+//  Same as 4.
+// 6. The topic is deleted and recreated, and `beginningOffset <= 
latestOffset - 1 < offset <= untilOffset - 1`.
+//  There is nothing to fetch, return null.
+// 7. The topic is deleted and recreated, and `offset < 
beginningOffset <= untilOffset - 1`.
+//  Same as 1.
+// 8. The topic is deleted and recreated, and `offset <= untilOffset - 
1 < beginningOffset`.
+//  There is nothing to fetch, return null.
+// scalastyle:on
+require(offset < untilOffset, s"offset: $offset, untilOffset: 
$untilOffset")
+logDebug(s"Get $groupId $topicPartition nextOffset 
$nextOffsetInFetchedData requested $offset")
+try {
+  if (offset != nextOffsetInFetchedData) {
+logInfo(s"Initial fetch for $topicPartition $offset")
+seek(offset)
+poll(pollTimeoutMs)
+  } else if (!fetchedData.hasNext()) {
+// The last pre-fetched data has been drained.
+poll(pollTimeoutMs)
+  }
+  getRecordFromFetchedData(offset, untilOffset)
+} catch {
+  case e: OffsetOutOfRangeException =>
+logWarning(s"Cannot fetch offset $offset. Some data may be lost. 
Recovering from " +
+  "the beginning offset", e)
+advanceToBeginningOffsetAndFetch(offset, untilOffset, 
pollTimeoutMs)
+}
+  }
+
+  /**
+   * Try to advance to the beginning offset and fetch again. 
`beginningOffset` should be in
+   * `[offset, untilOffset]`. If not, it will try to fetch `offset` again 
if it's in
+   * `[beginningOffset, latestOffset)`. Otherwise, it will return null and 
reset the pre-fetched
+   * data.
+   */
+  private def advanceToBeginningOffsetAndFetch(
+  offset: Long,
+  untilOffset: Long,
+  pollTimeoutMs: Long): ConsumerRecord[Array[Byte], Array[Byte]] = {
+val beginningOffset = getBeginningOffset()
+if (beginningOffset <= offset) {
+  val latestOffset = getLatestOffset()
+  if (latestOffset <= offset) {
+// beginningOffset <= latestOffset - 1 < offset <= untilOffset - 1
+logWarning(s"Offset ${offset} is later than the latest offset 
$latestOffset. " +
+  s"Skipped [$offset, $untilOffset)")
+reset()
+null
+  } else {
+// beginningOffset <= offset <= min(latestOffset - 1, untilOffset 
- 1)
+//
+// This will happen when a topic is deleted and recreated, and new 
data are pushed very fast
+// , then we will see `offset` disappears first then appears 
again. Although the parameters
+// are same, the state in Kafka cluster is changed, so it's not an 
endless loop.
+//
+// In addition, the stack here won't be deep unless the user keeps 
deleting and creating the
+// topic very fast.
+//
+// Therefore, this recursive call is safe.
+getAndIgnoreLostData(offset, untilOffset, pollTimeoutMs)
+  }
+} else {
+  if (beginningOffset >= untilOffset) {
+// offset <= untilOffset - 1 < beginningOffset
+logWarning(s"Buffer miss for $groupId $topicPartition [$offset, 
$untilOffset)")
+reset()
+

[GitHub] spark pull request #15820: [SPARK-18373][SS][Kafka]Make failOnDataLoss=false...

2016-11-15 Thread tdas
Github user tdas commented on a diff in the pull request:

https://github.com/apache/spark/pull/15820#discussion_r88137989
  
--- Diff: 
external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/CachedKafkaConsumer.scala
 ---
@@ -83,6 +86,139 @@ private[kafka010] case class CachedKafkaConsumer 
private(
 record
   }
 
+  /**
+   * Get the record at the `offset`. If it doesn't exist, try to get the 
earliest record in
+   * `[offset, untilOffset)`.
+   */
+  def getAndIgnoreLostData(
--- End diff --

Can't these two code paths be merged? I envision a single method that takes a 
"failOnDataLoss" parameter and, based on that flag, either [throws an error] 
or [retries with a warning] when it tries to get the next fetched offset.

I think it's good to simplify the code further for future maintainability.
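
(A sketch of the error path such a unified method would hinge on; it matches
the `reportDataLoss(failOnDataLoss, message, e)` call visible in the later
revision of the diff, but the body below is a guess, not the merged
implementation.)

```
import org.apache.spark.internal.Logging

trait DataLossReporter extends Logging {
  // Either fail the query or warn and let the caller recover, per the flag.
  def reportDataLoss(failOnDataLoss: Boolean, message: String, cause: Throwable): Unit = {
    if (failOnDataLoss) {
      throw new IllegalStateException(message, cause)
    } else {
      logWarning(message, cause)
    }
  }
}
```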





[GitHub] spark pull request #15820: [SPARK-18373][SS][Kafka]Make failOnDataLoss=false...

2016-11-15 Thread tdas
Github user tdas commented on a diff in the pull request:

https://github.com/apache/spark/pull/15820#discussion_r88135087
  
--- Diff: 
external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/CachedKafkaConsumer.scala
 ---
@@ -83,6 +86,139 @@ private[kafka010] case class CachedKafkaConsumer 
private(
 record
   }
 
+  /**
+   * Get the record at the `offset`. If it doesn't exist, try to get the 
earliest record in
+   * `[offset, untilOffset)`.
+   */
+  def getAndIgnoreLostData(
+  offset: Long,
+  untilOffset: Long,
+  pollTimeoutMs: Long): ConsumerRecord[Array[Byte], Array[Byte]] = {
+// scalastyle:off
+// When `failOnDataLoss` is `false`, we need to handle the following 
cases (note: untilOffset and latestOffset are exclusive):
+// 1. Some data are aged out, and `offset < beginningOffset <= 
untilOffset - 1 <= latestOffset - 1`
+//  Seek to the beginningOffset and fetch the data.
+// 2. Some data are aged out, and `offset <= untilOffset - 1 < 
beginningOffset`.
+//  There is nothing to fetch, return null.
+// 3. The topic is deleted.
+//  There is nothing to fetch, return null.
+// 4. The topic is deleted and recreated, and `beginningOffset <= 
offset <= untilOffset - 1 <= latestOffset - 1`.
+//  We cannot detect this case. We can still fetch data like 
nothing happens.
+// 5. The topic is deleted and recreated, and `beginningOffset <= 
offset < latestOffset - 1 < untilOffset - 1`.
+//  Same as 4.
+// 6. The topic is deleted and recreated, and `beginningOffset <= 
latestOffset - 1 < offset <= untilOffset - 1`.
+//  There is nothing to fetch, return null.
+// 7. The topic is deleted and recreated, and `offset < 
beginningOffset <= untilOffset - 1`.
+//  Same as 1.
+// 8. The topic is deleted and recreated, and `offset <= untilOffset - 
1 < beginningOffset`.
+//  There is nothing to fetch, return null.
+// scalastyle:on
+require(offset < untilOffset, s"offset: $offset, untilOffset: 
$untilOffset")
+logDebug(s"Get $groupId $topicPartition nextOffset 
$nextOffsetInFetchedData requested $offset")
+try {
+  if (offset != nextOffsetInFetchedData) {
+logInfo(s"Initial fetch for $topicPartition $offset")
+seek(offset)
+poll(pollTimeoutMs)
+  } else if (!fetchedData.hasNext()) {
+// The last pre-fetched data has been drained.
+poll(pollTimeoutMs)
+  }
+  getRecordFromFetchedData(offset, untilOffset)
+} catch {
+  case e: OffsetOutOfRangeException =>
+logWarning(s"Cannot fetch offset $offset. Some data may be lost. 
Recovering from " +
+  "the beginning offset", e)
+advanceToBeginningOffsetAndFetch(offset, untilOffset, 
pollTimeoutMs)
+}
+  }
+
+  /**
+   * Try to advance to the beginning offset and fetch again. 
`beginningOffset` should be in
+   * `[offset, untilOffset]`. If not, it will try to fetch `offset` again 
if it's in
+   * `[beginningOffset, latestOffset)`. Otherwise, it will return null and 
reset the pre-fetched
+   * data.
+   */
+  private def advanceToBeginningOffsetAndFetch(
+  offset: Long,
+  untilOffset: Long,
+  pollTimeoutMs: Long): ConsumerRecord[Array[Byte], Array[Byte]] = {
+val beginningOffset = getBeginningOffset()
+if (beginningOffset <= offset) {
+  val latestOffset = getLatestOffset()
+  if (latestOffset <= offset) {
+// beginningOffset <= latestOffset - 1 < offset <= untilOffset - 1
+logWarning(s"Offset ${offset} is later than the latest offset 
$latestOffset. " +
+  s"Skipped [$offset, $untilOffset)")
+reset()
+null
+  } else {
+// beginningOffset <= offset <= min(latestOffset - 1, untilOffset 
- 1)
+//
+// This will happen when a topic is deleted and recreated, and new 
data are pushed very fast
+// , then we will see `offset` disappears first then appears 
again. Although the parameters
+// are same, the state in Kafka cluster is changed, so it's not an 
endless loop.
+//
+// In addition, the stack here won't be deep unless the user keeps 
deleting and creating the
+// topic very fast.
+//
+// Therefore, this recursive call is safe.
+getAndIgnoreLostData(offset, untilOffset, pollTimeoutMs)
+  }
+} else {
+  if (beginningOffset >= untilOffset) {
+// offset <= untilOffset - 1 < beginningOffset
+logWarning(s"Buffer miss for $groupId $topicPartition [$offset, 
$untilOffset)")
+reset()
+

[GitHub] spark pull request #15820: [SPARK-18373][SS][Kafka]Make failOnDataLoss=false...

2016-11-15 Thread tdas
Github user tdas commented on a diff in the pull request:

https://github.com/apache/spark/pull/15820#discussion_r88140370
  
--- Diff: 
external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/CachedKafkaConsumer.scala
 ---
@@ -83,6 +86,139 @@ private[kafka010] case class CachedKafkaConsumer 
private(
 record
   }
 
+  /**
+   * Get the record at the `offset`. If it doesn't exist, try to get the 
earliest record in
+   * `[offset, untilOffset)`.
+   */
+  def getAndIgnoreLostData(
+  offset: Long,
+  untilOffset: Long,
+  pollTimeoutMs: Long): ConsumerRecord[Array[Byte], Array[Byte]] = {
+// scalastyle:off
+// When `failOnDataLoss` is `false`, we need to handle the following 
cases (note: untilOffset and latestOffset are exclusive):
+// 1. Some data are aged out, and `offset < beginningOffset <= 
untilOffset - 1 <= latestOffset - 1`
+//  Seek to the beginningOffset and fetch the data.
+// 2. Some data are aged out, and `offset <= untilOffset - 1 < 
beginningOffset`.
+//  There is nothing to fetch, return null.
+// 3. The topic is deleted.
+//  There is nothing to fetch, return null.
+// 4. The topic is deleted and recreated, and `beginningOffset <= 
offset <= untilOffset - 1 <= latestOffset - 1`.
+//  We cannot detect this case. We can still fetch data like 
nothing happens.
+// 5. The topic is deleted and recreated, and `beginningOffset <= 
offset < latestOffset - 1 < untilOffset - 1`.
+//  Same as 4.
+// 6. The topic is deleted and recreated, and `beginningOffset <= 
latestOffset - 1 < offset <= untilOffset - 1`.
+//  There is nothing to fetch, return null.
+// 7. The topic is deleted and recreated, and `offset < 
beginningOffset <= untilOffset - 1`.
+//  Same as 1.
+// 8. The topic is deleted and recreated, and `offset <= untilOffset - 
1 < beginningOffset`.
+//  There is nothing to fetch, return null.
+// scalastyle:on
+require(offset < untilOffset, s"offset: $offset, untilOffset: 
$untilOffset")
+logDebug(s"Get $groupId $topicPartition nextOffset 
$nextOffsetInFetchedData requested $offset")
+try {
+  if (offset != nextOffsetInFetchedData) {
+logInfo(s"Initial fetch for $topicPartition $offset")
+seek(offset)
+poll(pollTimeoutMs)
+  } else if (!fetchedData.hasNext()) {
+// The last pre-fetched data has been drained.
+poll(pollTimeoutMs)
+  }
+  getRecordFromFetchedData(offset, untilOffset)
+} catch {
+  case e: OffsetOutOfRangeException =>
+logWarning(s"Cannot fetch offset $offset. Some data may be lost. 
Recovering from " +
+  "the beginning offset", e)
+advanceToBeginningOffsetAndFetch(offset, untilOffset, 
pollTimeoutMs)
+}
+  }
+
+  /**
+   * Try to advance to the beginning offset and fetch again. 
`beginningOffset` should be in
+   * `[offset, untilOffset]`. If not, it will try to fetch `offset` again 
if it's in
+   * `[beginningOffset, latestOffset)`. Otherwise, it will return null and 
reset the pre-fetched
+   * data.
+   */
+  private def advanceToBeginningOffsetAndFetch(
+  offset: Long,
+  untilOffset: Long,
+  pollTimeoutMs: Long): ConsumerRecord[Array[Byte], Array[Byte]] = {
+val beginningOffset = getBeginningOffset()
+if (beginningOffset <= offset) {
+  val latestOffset = getLatestOffset()
+  if (latestOffset <= offset) {
+// beginningOffset <= latestOffset - 1 < offset <= untilOffset - 1
+logWarning(s"Offset ${offset} is later than the latest offset 
$latestOffset. " +
+  s"Skipped [$offset, $untilOffset)")
+reset()
+null
+  } else {
+// beginningOffset <= offset <= min(latestOffset - 1, untilOffset 
- 1)
+//
+// This will happen when a topic is deleted and recreated, and new 
data are pushed very fast
+// , then we will see `offset` disappears first then appears 
again. Although the parameters
+// are same, the state in Kafka cluster is changed, so it's not an 
endless loop.
+//
+// In addition, the stack here won't be deep unless the user keeps 
deleting and creating the
+// topic very fast.
+//
+// Therefore, this recursive call is safe.
+getAndIgnoreLostData(offset, untilOffset, pollTimeoutMs)
+  }
+} else {
+  if (beginningOffset >= untilOffset) {
+// offset <= untilOffset - 1 < beginningOffset
--- End diff --

Improve the docs by adding a plain-word explanation, e.g. the required offset 
is earlier than the earliest available offset.

[GitHub] spark pull request #15820: [SPARK-18373][SS][Kafka]Make failOnDataLoss=false...

2016-11-15 Thread tdas
Github user tdas commented on a diff in the pull request:

https://github.com/apache/spark/pull/15820#discussion_r88135655
  
--- Diff: 
external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/CachedKafkaConsumer.scala
 ---
@@ -83,6 +86,139 @@ private[kafka010] case class CachedKafkaConsumer 
private(
 record
   }
 
+  /**
+   * Get the record at the `offset`. If it doesn't exist, try to get the 
earliest record in
+   * `[offset, untilOffset)`.
+   */
+  def getAndIgnoreLostData(
+  offset: Long,
+  untilOffset: Long,
+  pollTimeoutMs: Long): ConsumerRecord[Array[Byte], Array[Byte]] = {
+// scalastyle:off
+// When `failOnDataLoss` is `false`, we need to handle the following 
cases (note: untilOffset and latestOffset are exclusive):
+// 1. Some data are aged out, and `offset < beginningOffset <= 
untilOffset - 1 <= latestOffset - 1`
+//  Seek to the beginningOffset and fetch the data.
+// 2. Some data are aged out, and `offset <= untilOffset - 1 < 
beginningOffset`.
+//  There is nothing to fetch, return null.
+// 3. The topic is deleted.
+//  There is nothing to fetch, return null.
+// 4. The topic is deleted and recreated, and `beginningOffset <= 
offset <= untilOffset - 1 <= latestOffset - 1`.
+//  We cannot detect this case. We can still fetch data like 
nothing happens.
+// 5. The topic is deleted and recreated, and `beginningOffset <= 
offset < latestOffset - 1 < untilOffset - 1`.
+//  Same as 4.
+// 6. The topic is deleted and recreated, and `beginningOffset <= 
latestOffset - 1 < offset <= untilOffset - 1`.
+//  There is nothing to fetch, return null.
+// 7. The topic is deleted and recreated, and `offset < 
beginningOffset <= untilOffset - 1`.
+//  Same as 1.
+// 8. The topic is deleted and recreated, and `offset <= untilOffset - 
1 < beginningOffset`.
+//  There is nothing to fetch, return null.
+// scalastyle:on
+require(offset < untilOffset, s"offset: $offset, untilOffset: 
$untilOffset")
+logDebug(s"Get $groupId $topicPartition nextOffset 
$nextOffsetInFetchedData requested $offset")
+try {
+  if (offset != nextOffsetInFetchedData) {
+logInfo(s"Initial fetch for $topicPartition $offset")
+seek(offset)
+poll(pollTimeoutMs)
+  } else if (!fetchedData.hasNext()) {
+// The last pre-fetched data has been drained.
+poll(pollTimeoutMs)
+  }
+  getRecordFromFetchedData(offset, untilOffset)
+} catch {
+  case e: OffsetOutOfRangeException =>
+logWarning(s"Cannot fetch offset $offset. Some data may be lost. 
Recovering from " +
+  "the beginning offset", e)
+advanceToBeginningOffsetAndFetch(offset, untilOffset, 
pollTimeoutMs)
+}
+  }
+
+  /**
+   * Try to advance to the beginning offset and fetch again. 
`beginningOffset` should be in
+   * `[offset, untilOffset]`. If not, it will try to fetch `offset` again 
if it's in
+   * `[beginningOffset, latestOffset)`. Otherwise, it will return null and 
reset the pre-fetched
+   * data.
+   */
+  private def advanceToBeginningOffsetAndFetch(
+  offset: Long,
+  untilOffset: Long,
+  pollTimeoutMs: Long): ConsumerRecord[Array[Byte], Array[Byte]] = {
+val beginningOffset = getBeginningOffset()
+if (beginningOffset <= offset) {
+  val latestOffset = getLatestOffset()
+  if (latestOffset <= offset) {
+// beginningOffset <= latestOffset - 1 < offset <= untilOffset - 1
+logWarning(s"Offset ${offset} is later than the latest offset 
$latestOffset. " +
+  s"Skipped [$offset, $untilOffset)")
+reset()
+null
+  } else {
+// beginningOffset <= offset <= min(latestOffset - 1, untilOffset 
- 1)
+//
+// This will happen when a topic is deleted and recreated, and new 
data are pushed very fast
+// , then we will see `offset` disappears first then appears 
again. Although the parameters
+// are same, the state in Kafka cluster is changed, so it's not an 
endless loop.
+//
+// In addition, the stack here won't be deep unless the user keeps 
deleting and creating the
+// topic very fast.
+//
+// Therefore, this recursive call is safe.
+getAndIgnoreLostData(offset, untilOffset, pollTimeoutMs)
+  }
+} else {
+  if (beginningOffset >= untilOffset) {
+// offset <= untilOffset - 1 < beginningOffset
+logWarning(s"Buffer miss for $groupId $topicPartition [$offset, 
$untilOffset)")
+reset()
+

[GitHub] spark pull request #15820: [SPARK-18373][SS][Kafka]Make failOnDataLoss=false...

2016-11-15 Thread tdas
Github user tdas commented on a diff in the pull request:

https://github.com/apache/spark/pull/15820#discussion_r88136863
  
--- Diff: 
external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/CachedKafkaConsumer.scala
 ---
@@ -83,6 +86,139 @@ private[kafka010] case class CachedKafkaConsumer 
private(
 record
   }
 
+  /**
+   * Get the record at the `offset`. If it doesn't exist, try to get the 
earliest record in
+   * `[offset, untilOffset)`.
+   */
+  def getAndIgnoreLostData(
+  offset: Long,
+  untilOffset: Long,
+  pollTimeoutMs: Long): ConsumerRecord[Array[Byte], Array[Byte]] = {
+// scalastyle:off
+// When `failOnDataLoss` is `false`, we need to handle the following 
cases (note: untilOffset and latestOffset are exclusive):
--- End diff --

Okay, I can guess that you used beginningOffset because of 
consumer.seekToBeginning. But then, to be consistent with seekToEnd, the 
latest offset would have to be called endOffset. Rather, let's be consistent 
with the more well-known names "earliest" and "latest".
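
(Under that convention the two lookups might read as below; hypothetical
helper names, wrapped in a standalone class so the snippet is self-contained.
`seekToBeginning`/`seekToEnd` followed by `position` is the usual way to read
these positions with the 0.10 consumer.)

```
import java.{util => ju}

import org.apache.kafka.clients.consumer.KafkaConsumer
import org.apache.kafka.common.TopicPartition

class OffsetRangeHelpers(
    consumer: KafkaConsumer[Array[Byte], Array[Byte]],
    topicPartition: TopicPartition) {

  // "Earliest", mirroring consumer.seekToBeginning.
  def getEarliestOffset(): Long = {
    consumer.seekToBeginning(ju.Arrays.asList(topicPartition))
    consumer.position(topicPartition)
  }

  // "Latest", mirroring consumer.seekToEnd.
  def getLatestOffset(): Long = {
    consumer.seekToEnd(ju.Arrays.asList(topicPartition))
    consumer.position(topicPartition)
  }
}
```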





[GitHub] spark pull request #15820: [SPARK-18373][SS][Kafka]Make failOnDataLoss=false...

2016-11-15 Thread tdas
Github user tdas commented on a diff in the pull request:

https://github.com/apache/spark/pull/15820#discussion_r88139654
  
--- Diff: 
external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/CachedKafkaConsumer.scala
 ---
@@ -83,6 +86,139 @@ private[kafka010] case class CachedKafkaConsumer 
private(
 record
   }
 
+  /**
+   * Get the record at the `offset`. If it doesn't exist, try to get the 
earliest record in
+   * `[offset, untilOffset)`.
+   */
+  def getAndIgnoreLostData(
+  offset: Long,
+  untilOffset: Long,
+  pollTimeoutMs: Long): ConsumerRecord[Array[Byte], Array[Byte]] = {
+// scalastyle:off
+// When `failOnDataLoss` is `false`, we need to handle the following 
cases (note: untilOffset and latestOffset are exclusive):
+// 1. Some data are aged out, and `offset < beginningOffset <= 
untilOffset - 1 <= latestOffset - 1`
+//  Seek to the beginningOffset and fetch the data.
+// 2. Some data are aged out, and `offset <= untilOffset - 1 < 
beginningOffset`.
+//  There is nothing to fetch, return null.
+// 3. The topic is deleted.
+//  There is nothing to fetch, return null.
+// 4. The topic is deleted and recreated, and `beginningOffset <= 
offset <= untilOffset - 1 <= latestOffset - 1`.
+//  We cannot detect this case. We can still fetch data like 
nothing happens.
+// 5. The topic is deleted and recreated, and `beginningOffset <= 
offset < latestOffset - 1 < untilOffset - 1`.
+//  Same as 4.
+// 6. The topic is deleted and recreated, and `beginningOffset <= 
latestOffset - 1 < offset <= untilOffset - 1`.
+//  There is nothing to fetch, return null.
+// 7. The topic is deleted and recreated, and `offset < 
beginningOffset <= untilOffset - 1`.
+//  Same as 1.
+// 8. The topic is deleted and recreated, and `offset <= untilOffset - 
1 < beginningOffset`.
+//  There is nothing to fetch, return null.
+// scalastyle:on
+require(offset < untilOffset, s"offset: $offset, untilOffset: 
$untilOffset")
+logDebug(s"Get $groupId $topicPartition nextOffset 
$nextOffsetInFetchedData requested $offset")
+try {
+  if (offset != nextOffsetInFetchedData) {
+logInfo(s"Initial fetch for $topicPartition $offset")
+seek(offset)
+poll(pollTimeoutMs)
+  } else if (!fetchedData.hasNext()) {
+// The last pre-fetched data has been drained.
+poll(pollTimeoutMs)
+  }
+  getRecordFromFetchedData(offset, untilOffset)
+} catch {
+  case e: OffsetOutOfRangeException =>
+logWarning(s"Cannot fetch offset $offset. Some data may be lost. 
Recovering from " +
+  "the beginning offset", e)
+advanceToBeginningOffsetAndFetch(offset, untilOffset, 
pollTimeoutMs)
+}
+  }
+
+  /**
+   * Try to advance to the beginning offset and fetch again. 
`beginningOffset` should be in
+   * `[offset, untilOffset]`. If not, it will try to fetch `offset` again 
if it's in
+   * `[beginningOffset, latestOffset)`. Otherwise, it will return null and 
reset the pre-fetched
+   * data.
+   */
+  private def advanceToBeginningOffsetAndFetch(
+  offset: Long,
+  untilOffset: Long,
+  pollTimeoutMs: Long): ConsumerRecord[Array[Byte], Array[Byte]] = {
+val beginningOffset = getBeginningOffset()
+if (beginningOffset <= offset) {
+  val latestOffset = getLatestOffset()
+  if (latestOffset <= offset) {
+// beginningOffset <= latestOffset - 1 < offset <= untilOffset - 1
+logWarning(s"Offset ${offset} is later than the latest offset 
$latestOffset. " +
+  s"Skipped [$offset, $untilOffset)")
+reset()
+null
+  } else {
+// beginningOffset <= offset <= min(latestOffset - 1, untilOffset 
- 1)
+//
+// This will happen when a topic is deleted and recreated, and new 
data are pushed very fast
+// , then we will see `offset` disappears first then appears 
again. Although the parameters
--- End diff --

nit: the comma ended up at the start of a new line; it should close the previous line.





[GitHub] spark pull request #15820: [SPARK-18373][SS][Kafka]Make failOnDataLoss=false...

2016-11-15 Thread tdas
Github user tdas commented on a diff in the pull request:

https://github.com/apache/spark/pull/15820#discussion_r88134704
  
--- Diff: 
external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/CachedKafkaConsumer.scala
 ---
@@ -83,6 +86,139 @@ private[kafka010] case class CachedKafkaConsumer 
private(
 record
   }
 
+  /**
+   * Get the record at the `offset`. If it doesn't exist, try to get the 
earliest record in
+   * `[offset, untilOffset)`.
+   */
+  def getAndIgnoreLostData(
+  offset: Long,
+  untilOffset: Long,
+  pollTimeoutMs: Long): ConsumerRecord[Array[Byte], Array[Byte]] = {
+// scalastyle:off
+// When `failOnDataLoss` is `false`, we need to handle the following 
cases (note: untilOffset and latestOffset are exclusive):
+// 1. Some data are aged out, and `offset < beginningOffset <= 
untilOffset - 1 <= latestOffset - 1`
+//  Seek to the beginningOffset and fetch the data.
+// 2. Some data are aged out, and `offset <= untilOffset - 1 < 
beginningOffset`.
+//  There is nothing to fetch, return null.
+// 3. The topic is deleted.
+//  There is nothing to fetch, return null.
+// 4. The topic is deleted and recreated, and `beginningOffset <= 
offset <= untilOffset - 1 <= latestOffset - 1`.
+//  We cannot detect this case. We can still fetch data like 
nothing happens.
--- End diff --

nit: nothing happened.





[GitHub] spark pull request #15820: [SPARK-18373][SS][Kafka]Make failOnDataLoss=false...

2016-11-15 Thread tdas
Github user tdas commented on a diff in the pull request:

https://github.com/apache/spark/pull/15820#discussion_r88137647
  
--- Diff: 
external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/CachedKafkaConsumer.scala
 ---
@@ -83,6 +86,139 @@ private[kafka010] case class CachedKafkaConsumer 
private(
 record
   }
 
+  /**
+   * Get the record at the `offset`. If it doesn't exist, try to get the 
earliest record in
+   * `[offset, untilOffset)`.
+   */
+  def getAndIgnoreLostData(
+  offset: Long,
+  untilOffset: Long,
+  pollTimeoutMs: Long): ConsumerRecord[Array[Byte], Array[Byte]] = {
+// scalastyle:off
+// When `failOnDataLoss` is `false`, we need to handle the following 
cases (note: untilOffset and latestOffset are exclusive):
+// 1. Some data are aged out, and `offset < beginningOffset <= 
untilOffset - 1 <= latestOffset - 1`
+//  Seek to the beginningOffset and fetch the data.
+// 2. Some data are aged out, and `offset <= untilOffset - 1 < 
beginningOffset`.
+//  There is nothing to fetch, return null.
+// 3. The topic is deleted.
+//  There is nothing to fetch, return null.
+// 4. The topic is deleted and recreated, and `beginningOffset <= 
offset <= untilOffset - 1 <= latestOffset - 1`.
+//  We cannot detect this case. We can still fetch data like 
nothing happens.
+// 5. The topic is deleted and recreated, and `beginningOffset <= 
offset < latestOffset - 1 < untilOffset - 1`.
+//  Same as 4.
+// 6. The topic is deleted and recreated, and `beginningOffset <= 
latestOffset - 1 < offset <= untilOffset - 1`.
+//  There is nothing to fetch, return null.
+// 7. The topic is deleted and recreated, and `offset < 
beginningOffset <= untilOffset - 1`.
+//  Same as 1.
+// 8. The topic is deleted and recreated, and `offset <= untilOffset - 
1 < beginningOffset`.
+//  There is nothing to fetch, return null.
+// scalastyle:on
+require(offset < untilOffset, s"offset: $offset, untilOffset: 
$untilOffset")
+logDebug(s"Get $groupId $topicPartition nextOffset 
$nextOffsetInFetchedData requested $offset")
+try {
+  if (offset != nextOffsetInFetchedData) {
+logInfo(s"Initial fetch for $topicPartition $offset")
+seek(offset)
+poll(pollTimeoutMs)
+  } else if (!fetchedData.hasNext()) {
+// The last pre-fetched data has been drained.
+poll(pollTimeoutMs)
+  }
+  getRecordFromFetchedData(offset, untilOffset)
+} catch {
+  case e: OffsetOutOfRangeException =>
+logWarning(s"Cannot fetch offset $offset. Some data may be lost. 
Recovering from " +
+  "the beginning offset", e)
+advanceToBeginningOffsetAndFetch(offset, untilOffset, 
pollTimeoutMs)
+}
+  }
+
+  /**
+   * Try to advance to the beginning offset and fetch again. 
`beginningOffset` should be in
+   * `[offset, untilOffset]`. If not, it will try to fetch `offset` again 
if it's in
+   * `[beginningOffset, latestOffset)`. Otherwise, it will return null and 
reset the pre-fetched
+   * data.
+   */
+  private def advanceToBeginningOffsetAndFetch(
+  offset: Long,
+  untilOffset: Long,
+  pollTimeoutMs: Long): ConsumerRecord[Array[Byte], Array[Byte]] = {
+val beginningOffset = getBeginningOffset()
+if (beginningOffset <= offset) {
+  val latestOffset = getLatestOffset()
+  if (latestOffset <= offset) {
+// beginningOffset <= latestOffset - 1 < offset <= untilOffset - 1
+logWarning(s"Offset ${offset} is later than the latest offset 
$latestOffset. " +
+  s"Skipped [$offset, $untilOffset)")
+reset()
+null
+  } else {
+// beginningOffset <= offset <= min(latestOffset - 1, untilOffset 
- 1)
+//
+// This will happen when a topic is deleted and recreated, and new data are
+// pushed very fast: we will see `offset` disappear first and then appear
+// again. Although the parameters are the same, the state in the Kafka
+// cluster has changed, so it's not an endless loop.
+//
+// In addition, the stack here won't be deep unless the user keeps 
deleting and creating the
+// topic very fast.
+//
+// Therefore, this recursive call is safe.
+getAndIgnoreLostData(offset, untilOffset, pollTimeoutMs)
+  }
+} else {
+  if (beginningOffset >= untilOffset) {
+// offset <= untilOffset - 1 < beginningOffset
+logWarning(s"Buffer miss for $groupId $topicPartition [$offset, 
$untilOffset)")
+reset()
+

[GitHub] spark pull request #15820: [SPARK-18373][SS][Kafka]Make failOnDataLoss=false...

2016-11-15 Thread zsxwing
Github user zsxwing commented on a diff in the pull request:

https://github.com/apache/spark/pull/15820#discussion_r88134526
  
--- Diff: 
external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/CachedKafkaConsumer.scala
 ---
@@ -83,6 +86,139 @@ private[kafka010] case class CachedKafkaConsumer 
private(
 record
   }
 
+  /**
+   * Get the record at the `offset`. If it doesn't exist, try to get the 
earliest record in
+   * `[offset, untilOffset)`.
+   */
+  def getAndIgnoreLostData(
+  offset: Long,
+  untilOffset: Long,
+  pollTimeoutMs: Long): ConsumerRecord[Array[Byte], Array[Byte]] = {
+// scalastyle:off
+// When `failOnDataLoss` is `false`, we need to handle the following 
cases (note: untilOffset and latestOffset are exclusive):
--- End diff --

`latestOffset` means the latest offset that we get from Kafka.
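
For reference, a minimal sketch of how such a `getLatestOffset` helper can be
written against the Kafka 0.10 consumer API (an illustration only; the PR's
actual helper may differ):

    // Move the consumer's position to the end of the partition, then read
    // that position back. `consumer` and `topicPartition` are the fields of
    // CachedKafkaConsumer; `ju` is the file's alias for java.util.
    private def getLatestOffset(): Long = {
      consumer.seekToEnd(ju.Arrays.asList(topicPartition))
      consumer.position(topicPartition)
    }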





[GitHub] spark pull request #15820: [SPARK-18373][SS][Kafka]Make failOnDataLoss=false...

2016-11-10 Thread zsxwing
Github user zsxwing commented on a diff in the pull request:

https://github.com/apache/spark/pull/15820#discussion_r87527804
  
--- Diff: 
external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/CachedKafkaConsumer.scala
 ---
@@ -83,6 +86,129 @@ private[kafka010] case class CachedKafkaConsumer 
private(
 record
   }
 
+  /**
+   * Get the record at the `offset`. If it doesn't exist, try to get the 
earliest record in
+   * `[offset, untilOffset)`.
+   */
+  def getAndIgnoreLostData(
+  offset: Long,
+  untilOffset: Long,
+  pollTimeoutMs: Long): ConsumerRecord[Array[Byte], Array[Byte]] = {
+// scalastyle:off
+// When `failOnDataLoss` is `false`, we need to handle the following 
cases (note: untilOffset and latestOffset are exclusive):
+// 1. Some data are aged out, and `offset < beginningOffset <= 
untilOffset - 1 <= latestOffset - 1`
+//  Seek to the beginningOffset and fetch the data.
+// 2. Some data are aged out, and `offset <= untilOffset - 1 < 
beginningOffset`.
+//  There is nothing to fetch, return null.
+// 3. The topic is deleted.
+//  There is nothing to fetch, return null.
+// 4. The topic is deleted and recreated, and `beginningOffset <= 
offset <= untilOffset - 1 <= latestOffset - 1`.
+//  We cannot detect this case. We can still fetch data like 
nothing happens.
+// 5. The topic is deleted and recreated, and `beginningOffset <= 
offset < latestOffset - 1 < untilOffset - 1`.
+//  Same as 4.
+// 6. The topic is deleted and recreated, and `beginningOffset <= 
latestOffset - 1 < offset <= untilOffset - 1`.
+//  There is nothing to fetch, return null.
+// 7. The topic is deleted and recreated, and `offset < 
beginningOffset <= untilOffset - 1`.
+//  Same as 1.
+// 8. The topic is deleted and recreated, and `offset <= untilOffset - 
1 < beginningOffset`.
+//  There is nothing to fetch, return null.
+// scalastyle:on
+require(offset < untilOffset, s"offset: $offset, untilOffset: 
$untilOffset")
+logDebug(s"Get $groupId $topicPartition nextOffset 
$nextOffsetInFetchedData requested $offset")
+try {
+  if (offset != nextOffsetInFetchedData) {
+logInfo(s"Initial fetch for $topicPartition $offset")
+seek(offset)
+poll(pollTimeoutMs)
+  } else if (!fetchedData.hasNext()) {
+// The last pre-fetched data has been drained.
+poll(pollTimeoutMs)
+  }
+  getRecordFromFetchedData(offset, untilOffset)
+} catch {
+  case e: OffsetOutOfRangeException =>
+logWarning(s"Cannot fetch offset $offset, try to recover from the 
beginning offset", e)
+advanceToBeginningOffsetAndFetch(offset, untilOffset, 
pollTimeoutMs)
+}
+  }
+
+  /**
+   * Try to advance to the beginning offset and fetch again. 
`beginningOffset` should be in
+   * `[offset, untilOffset]`. If not, it will try to fetch `offset` again 
if it's in
+   * `[beginningOffset, latestOffset)`. Otherwise, it will return null and 
reset the pre-fetched
+   * data.
+   */
+  private def advanceToBeginningOffsetAndFetch(
+  offset: Long,
+  untilOffset: Long,
+  pollTimeoutMs: Long): ConsumerRecord[Array[Byte], Array[Byte]] = {
+val beginningOffset = getBeginningOffset()
+if (beginningOffset <= offset) {
+  val latestOffset = getLatestOffset()
+  if (latestOffset <= offset) {
+// beginningOffset <= latestOffset - 1 < offset <= untilOffset - 1
+logWarning(s"Offset ${offset} is later than the latest offset 
$latestOffset. " +
+  s"Skipped [$offset, $untilOffset)")
+reset()
+null
+  } else {
+// beginningOffset <= offset <= min(latestOffset - 1, untilOffset 
- 1)
+getAndIgnoreLostData(offset, untilOffset, pollTimeoutMs)
+  }
+} else {
+  if (beginningOffset >= untilOffset) {
+// offset <= untilOffset - 1 < beginningOffset
+logWarning(s"Buffer miss for $groupId $topicPartition [$offset, 
$untilOffset)")
+reset()
+null
+  } else {
+// offset < beginningOffset <= untilOffset - 1
+logWarning(s"Buffer miss for $groupId $topicPartition [$offset, 
$beginningOffset)")
+getAndIgnoreLostData(beginningOffset, untilOffset, pollTimeoutMs)
+  }
+}
+  }
+
+  /**
+   * Get the earliest record in [offset, untilOffset) from the fetched 
data. If there is no such
+   * record, returns null. Must be called after `poll`.
+   */
+  private def getRecordFromFetchedData(
+  offset: Long,
+  untilOffset: Long): 

[GitHub] spark pull request #15820: [SPARK-18373][SS][Kafka]Make failOnDataLoss=false...

2016-11-10 Thread huitseeker
Github user huitseeker commented on a diff in the pull request:

https://github.com/apache/spark/pull/15820#discussion_r87483402
  
--- Diff: 
external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/CachedKafkaConsumer.scala
 ---
@@ -83,6 +86,129 @@ private[kafka010] case class CachedKafkaConsumer 
private(
 record
   }
 
+  /**
+   * Get the record at the `offset`. If it doesn't exist, try to get the 
earliest record in
+   * `[offset, untilOffset)`.
+   */
+  def getAndIgnoreLostData(
+  offset: Long,
+  untilOffset: Long,
+  pollTimeoutMs: Long): ConsumerRecord[Array[Byte], Array[Byte]] = {
+// scalastyle:off
+// When `failOnDataLoss` is `false`, we need to handle the following 
cases (note: untilOffset and latestOffset are exclusive):
+// 1. Some data are aged out, and `offset < beginningOffset <= 
untilOffset - 1 <= latestOffset - 1`
+//  Seek to the beginningOffset and fetch the data.
+// 2. Some data are aged out, and `offset <= untilOffset - 1 < 
beginningOffset`.
+//  There is nothing to fetch, return null.
+// 3. The topic is deleted.
+//  There is nothing to fetch, return null.
+// 4. The topic is deleted and recreated, and `beginningOffset <= 
offset <= untilOffset - 1 <= latestOffset - 1`.
+//  We cannot detect this case. We can still fetch data like 
nothing happens.
+// 5. The topic is deleted and recreated, and `beginningOffset <= 
offset < latestOffset - 1 < untilOffset - 1`.
+//  Same as 4.
+// 6. The topic is deleted and recreated, and `beginningOffset <= 
latestOffset - 1 < offset <= untilOffset - 1`.
+//  There is nothing to fetch, return null.
+// 7. The topic is deleted and recreated, and `offset < 
beginningOffset <= untilOffset - 1`.
+//  Same as 1.
+// 8. The topic is deleted and recreated, and `offset <= untilOffset - 
1 < beginningOffset`.
+//  There is nothing to fetch, return null.
+// scalastyle:on
+require(offset < untilOffset, s"offset: $offset, untilOffset: 
$untilOffset")
+logDebug(s"Get $groupId $topicPartition nextOffset 
$nextOffsetInFetchedData requested $offset")
+try {
+  if (offset != nextOffsetInFetchedData) {
+logInfo(s"Initial fetch for $topicPartition $offset")
+seek(offset)
+poll(pollTimeoutMs)
+  } else if (!fetchedData.hasNext()) {
+// The last pre-fetched data has been drained.
+poll(pollTimeoutMs)
+  }
+  getRecordFromFetchedData(offset, untilOffset)
+} catch {
+  case e: OffsetOutOfRangeException =>
+logWarning(s"Cannot fetch offset $offset, try to recover from the 
beginning offset", e)
+advanceToBeginningOffsetAndFetch(offset, untilOffset, 
pollTimeoutMs)
+}
+  }
+
+  /**
+   * Try to advance to the beginning offset and fetch again. 
`beginningOffset` should be in
+   * `[offset, untilOffset]`. If not, it will try to fetch `offset` again 
if it's in
+   * `[beginningOffset, latestOffset)`. Otherwise, it will return null and 
reset the pre-fetched
+   * data.
+   */
+  private def advanceToBeginningOffsetAndFetch(
+  offset: Long,
+  untilOffset: Long,
+  pollTimeoutMs: Long): ConsumerRecord[Array[Byte], Array[Byte]] = {
+val beginningOffset = getBeginningOffset()
+if (beginningOffset <= offset) {
+  val latestOffset = getLatestOffset()
+  if (latestOffset <= offset) {
+// beginningOffset <= latestOffset - 1 < offset <= untilOffset - 1
+logWarning(s"Offset ${offset} is later than the latest offset 
$latestOffset. " +
+  s"Skipped [$offset, $untilOffset)")
+reset()
+null
+  } else {
+// beginningOffset <= offset <= min(latestOffset - 1, untilOffset 
- 1)
+getAndIgnoreLostData(offset, untilOffset, pollTimeoutMs)
--- End diff --

+1
Especially since `@tailrec` is no longer used, this now has to be shown to 
terminate within a bounded stack depth, and ideally under most stack-size 
configurations.
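
One way to sidestep the stack-depth question entirely is to replace the
mutual recursion with a loop plus an explicit retry bound, which makes
termination trivial to argue. A rough, hypothetical rewrite sketch, written
as if inside CachedKafkaConsumer (this is not what the PR does; `maxAttempts`
is made up, and `seek`, `poll`, `getRecordFromFetchedData`,
`getBeginningOffset`, and `reset` mirror the PR's helpers):

    // Iterative sketch: constant stack depth, and the loop runs at most
    // `maxAttempts` times, so termination is obvious. Callers must still
    // handle the null return.
    private def getAndIgnoreLostDataLoop(
        initialOffset: Long,
        untilOffset: Long,
        pollTimeoutMs: Long,
        maxAttempts: Int = 3): ConsumerRecord[Array[Byte], Array[Byte]] = {
      var offset = initialOffset
      var attempts = 0
      while (attempts < maxAttempts) {
        attempts += 1
        try {
          seek(offset)
          poll(pollTimeoutMs)
          return getRecordFromFetchedData(offset, untilOffset)
        } catch {
          case e: OffsetOutOfRangeException =>
            logWarning(s"Cannot fetch offset $offset; data may have been lost", e)
            val beginningOffset = getBeginningOffset()
            if (beginningOffset >= untilOffset) {
              // offset <= untilOffset - 1 < beginningOffset: nothing to fetch.
              reset()
              return null
            }
            // Data before beginningOffset is lost; skip forward and retry.
            offset = math.max(offset, beginningOffset)
        }
      }
      reset()
      null
    }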





[GitHub] spark pull request #15820: [SPARK-18373][SS][Kafka]Make failOnDataLoss=false...

2016-11-10 Thread marmbrus
Github user marmbrus commented on a diff in the pull request:

https://github.com/apache/spark/pull/15820#discussion_r87450274
  
--- Diff: 
external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/CachedKafkaConsumer.scala
 ---
@@ -83,6 +86,129 @@ private[kafka010] case class CachedKafkaConsumer 
private(
 record
   }
 
+  /**
+   * Get the record at the `offset`. If it doesn't exist, try to get the 
earliest record in
+   * `[offset, untilOffset)`.
+   */
+  def getAndIgnoreLostData(
+  offset: Long,
+  untilOffset: Long,
+  pollTimeoutMs: Long): ConsumerRecord[Array[Byte], Array[Byte]] = {
+// scalastyle:off
+// When `failOnDataLoss` is `false`, we need to handle the following 
cases (note: untilOffset and latestOffset are exclusive):
+// 1. Some data are aged out, and `offset < beginningOffset <= 
untilOffset - 1 <= latestOffset - 1`
+//  Seek to the beginningOffset and fetch the data.
+// 2. Some data are aged out, and `offset <= untilOffset - 1 < 
beginningOffset`.
+//  There is nothing to fetch, return null.
+// 3. The topic is deleted.
+//  There is nothing to fetch, return null.
+// 4. The topic is deleted and recreated, and `beginningOffset <= 
offset <= untilOffset - 1 <= latestOffset - 1`.
+//  We cannot detect this case. We can still fetch data like 
nothing happens.
+// 5. The topic is deleted and recreated, and `beginningOffset <= 
offset < latestOffset - 1 < untilOffset - 1`.
+//  Same as 4.
+// 6. The topic is deleted and recreated, and `beginningOffset <= 
latestOffset - 1 < offset <= untilOffset - 1`.
+//  There is nothing to fetch, return null.
+// 7. The topic is deleted and recreated, and `offset < 
beginningOffset <= untilOffset - 1`.
+//  Same as 1.
+// 8. The topic is deleted and recreated, and `offset <= untilOffset - 
1 < beginningOffset`.
+//  There is nothing to fetch, return null.
+// scalastyle:on
+require(offset < untilOffset, s"offset: $offset, untilOffset: 
$untilOffset")
+logDebug(s"Get $groupId $topicPartition nextOffset 
$nextOffsetInFetchedData requested $offset")
+try {
+  if (offset != nextOffsetInFetchedData) {
+logInfo(s"Initial fetch for $topicPartition $offset")
+seek(offset)
+poll(pollTimeoutMs)
+  } else if (!fetchedData.hasNext()) {
+// The last pre-fetched data has been drained.
+poll(pollTimeoutMs)
+  }
+  getRecordFromFetchedData(offset, untilOffset)
+} catch {
+  case e: OffsetOutOfRangeException =>
+logWarning(s"Cannot fetch offset $offset, try to recover from the 
beginning offset", e)
+advanceToBeginningOffsetAndFetch(offset, untilOffset, 
pollTimeoutMs)
+}
+  }
+
+  /**
+   * Try to advance to the beginning offset and fetch again. 
`beginningOffset` should be in
+   * `[offset, untilOffset]`. If not, it will try to fetch `offset` again 
if it's in
+   * `[beginningOffset, latestOffset)`. Otherwise, it will return null and 
reset the pre-fetched
+   * data.
+   */
+  private def advanceToBeginningOffsetAndFetch(
+  offset: Long,
+  untilOffset: Long,
+  pollTimeoutMs: Long): ConsumerRecord[Array[Byte], Array[Byte]] = {
+val beginningOffset = getBeginningOffset()
+if (beginningOffset <= offset) {
+  val latestOffset = getLatestOffset()
+  if (latestOffset <= offset) {
+// beginningOffset <= latestOffset - 1 < offset <= untilOffset - 1
+logWarning(s"Offset ${offset} is later than the latest offset 
$latestOffset. " +
+  s"Skipped [$offset, $untilOffset)")
+reset()
+null
+  } else {
+// beginningOffset <= offset <= min(latestOffset - 1, untilOffset 
- 1)
+getAndIgnoreLostData(offset, untilOffset, pollTimeoutMs)
+  }
+} else {
+  if (beginningOffset >= untilOffset) {
+// offset <= untilOffset - 1 < beginningOffset
+logWarning(s"Buffer miss for $groupId $topicPartition [$offset, 
$untilOffset)")
+reset()
+null
+  } else {
+// offset < beginningOffset <= untilOffset - 1
+logWarning(s"Buffer miss for $groupId $topicPartition [$offset, 
$beginningOffset)")
+getAndIgnoreLostData(beginningOffset, untilOffset, pollTimeoutMs)
+  }
+}
+  }
+
+  /**
+   * Get the earliest record in [offset, untilOffset) from the fetched 
data. If there is no such
+   * record, returns null. Must be called after `poll`.
+   */
+  private def getRecordFromFetchedData(
+  offset: Long,
+  untilOffset: Long): 

[GitHub] spark pull request #15820: [SPARK-18373][SS][Kafka]Make failOnDataLoss=false...

2016-11-10 Thread koeninger
Github user koeninger commented on a diff in the pull request:

https://github.com/apache/spark/pull/15820#discussion_r87439798
  
--- Diff: 
external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/CachedKafkaConsumer.scala
 ---
@@ -83,6 +86,129 @@ private[kafka010] case class CachedKafkaConsumer 
private(
 record
   }
 
+  /**
+   * Get the record at the `offset`. If it doesn't exist, try to get the 
earliest record in
+   * `[offset, untilOffset)`.
+   */
+  def getAndIgnoreLostData(
+  offset: Long,
+  untilOffset: Long,
+  pollTimeoutMs: Long): ConsumerRecord[Array[Byte], Array[Byte]] = {
+// scalastyle:off
+// When `failOnDataLoss` is `false`, we need to handle the following 
cases (note: untilOffset and latestOffset are exclusive):
+// 1. Some data are aged out, and `offset < beginningOffset <= 
untilOffset - 1 <= latestOffset - 1`
+//  Seek to the beginningOffset and fetch the data.
+// 2. Some data are aged out, and `offset <= untilOffset - 1 < 
beginningOffset`.
+//  There is nothing to fetch, return null.
+// 3. The topic is deleted.
+//  There is nothing to fetch, return null.
+// 4. The topic is deleted and recreated, and `beginningOffset <= 
offset <= untilOffset - 1 <= latestOffset - 1`.
+//  We cannot detect this case. We can still fetch data like 
nothing happens.
+// 5. The topic is deleted and recreated, and `beginningOffset <= 
offset < latestOffset - 1 < untilOffset - 1`.
+//  Same as 4.
+// 6. The topic is deleted and recreated, and `beginningOffset <= 
latestOffset - 1 < offset <= untilOffset - 1`.
+//  There is nothing to fetch, return null.
+// 7. The topic is deleted and recreated, and `offset < 
beginningOffset <= untilOffset - 1`.
+//  Same as 1.
+// 8. The topic is deleted and recreated, and `offset <= untilOffset - 
1 < beginningOffset`.
+//  There is nothing to fetch, return null.
+// scalastyle:on
+require(offset < untilOffset, s"offset: $offset, untilOffset: 
$untilOffset")
+logDebug(s"Get $groupId $topicPartition nextOffset 
$nextOffsetInFetchedData requested $offset")
+try {
+  if (offset != nextOffsetInFetchedData) {
+logInfo(s"Initial fetch for $topicPartition $offset")
+seek(offset)
+poll(pollTimeoutMs)
+  } else if (!fetchedData.hasNext()) {
+// The last pre-fetched data has been drained.
+poll(pollTimeoutMs)
+  }
+  getRecordFromFetchedData(offset, untilOffset)
+} catch {
+  case e: OffsetOutOfRangeException =>
+logWarning(s"Cannot fetch offset $offset, try to recover from the 
beginning offset", e)
+advanceToBeginningOffsetAndFetch(offset, untilOffset, 
pollTimeoutMs)
+}
+  }
+
+  /**
+   * Try to advance to the beginning offset and fetch again. 
`beginningOffset` should be in
+   * `[offset, untilOffset]`. If not, it will try to fetch `offset` again 
if it's in
+   * `[beginningOffset, latestOffset)`. Otherwise, it will return null and 
reset the pre-fetched
+   * data.
+   */
+  private def advanceToBeginningOffsetAndFetch(
+  offset: Long,
+  untilOffset: Long,
+  pollTimeoutMs: Long): ConsumerRecord[Array[Byte], Array[Byte]] = {
+val beginningOffset = getBeginningOffset()
+if (beginningOffset <= offset) {
+  val latestOffset = getLatestOffset()
+  if (latestOffset <= offset) {
+// beginningOffset <= latestOffset - 1 < offset <= untilOffset - 1
+logWarning(s"Offset ${offset} is later than the latest offset 
$latestOffset. " +
+  s"Skipped [$offset, $untilOffset)")
+reset()
+null
+  } else {
+// beginningOffset <= offset <= min(latestOffset - 1, untilOffset 
- 1)
+getAndIgnoreLostData(offset, untilOffset, pollTimeoutMs)
+  }
+} else {
+  if (beginningOffset >= untilOffset) {
+// offset <= untilOffset - 1 < beginningOffset
+logWarning(s"Buffer miss for $groupId $topicPartition [$offset, 
$untilOffset)")
+reset()
+null
+  } else {
+// offset < beginningOffset <= untilOffset - 1
+logWarning(s"Buffer miss for $groupId $topicPartition [$offset, 
$beginningOffset)")
+getAndIgnoreLostData(beginningOffset, untilOffset, pollTimeoutMs)
+  }
+}
+  }
+
+  /**
+   * Get the earliest record in [offset, untilOffset) from the fetched 
data. If there is no such
+   * record, returns null. Must be called after `poll`.
+   */
+  private def getRecordFromFetchedData(
+  offset: Long,
+  untilOffset: Long): 

[GitHub] spark pull request #15820: [SPARK-18373][SS][Kafka]Make failOnDataLoss=false...

2016-11-10 Thread koeninger
Github user koeninger commented on a diff in the pull request:

https://github.com/apache/spark/pull/15820#discussion_r87438222
  
--- Diff: 
external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/CachedKafkaConsumer.scala
 ---
@@ -83,6 +86,129 @@ private[kafka010] case class CachedKafkaConsumer 
private(
 record
   }
 
+  /**
+   * Get the record at the `offset`. If it doesn't exist, try to get the 
earliest record in
+   * `[offset, untilOffset)`.
+   */
+  def getAndIgnoreLostData(
+  offset: Long,
+  untilOffset: Long,
+  pollTimeoutMs: Long): ConsumerRecord[Array[Byte], Array[Byte]] = {
+// scalastyle:off
+// When `failOnDataLoss` is `false`, we need to handle the following 
cases (note: untilOffset and latestOffset are exclusive):
+// 1. Some data are aged out, and `offset < beginningOffset <= 
untilOffset - 1 <= latestOffset - 1`
+//  Seek to the beginningOffset and fetch the data.
+// 2. Some data are aged out, and `offset <= untilOffset - 1 < 
beginningOffset`.
+//  There is nothing to fetch, return null.
+// 3. The topic is deleted.
+//  There is nothing to fetch, return null.
+// 4. The topic is deleted and recreated, and `beginningOffset <= 
offset <= untilOffset - 1 <= latestOffset - 1`.
+//  We cannot detect this case. We can still fetch data like 
nothing happens.
+// 5. The topic is deleted and recreated, and `beginningOffset <= 
offset < latestOffset - 1 < untilOffset - 1`.
+//  Same as 4.
+// 6. The topic is deleted and recreated, and `beginningOffset <= 
latestOffset - 1 < offset <= untilOffset - 1`.
+//  There is nothing to fetch, return null.
+// 7. The topic is deleted and recreated, and `offset < 
beginningOffset <= untilOffset - 1`.
+//  Same as 1.
+// 8. The topic is deleted and recreated, and `offset <= untilOffset - 
1 < beginningOffset`.
+//  There is nothing to fetch, return null.
+// scalastyle:on
+require(offset < untilOffset, s"offset: $offset, untilOffset: 
$untilOffset")
+logDebug(s"Get $groupId $topicPartition nextOffset 
$nextOffsetInFetchedData requested $offset")
+try {
+  if (offset != nextOffsetInFetchedData) {
+logInfo(s"Initial fetch for $topicPartition $offset")
+seek(offset)
+poll(pollTimeoutMs)
+  } else if (!fetchedData.hasNext()) {
+// The last pre-fetched data has been drained.
+poll(pollTimeoutMs)
+  }
+  getRecordFromFetchedData(offset, untilOffset)
+} catch {
+  case e: OffsetOutOfRangeException =>
+logWarning(s"Cannot fetch offset $offset, try to recover from the 
beginning offset", e)
+advanceToBeginningOffsetAndFetch(offset, untilOffset, 
pollTimeoutMs)
+}
+  }
+
+  /**
+   * Try to advance to the beginning offset and fetch again. 
`beginningOffset` should be in
+   * `[offset, untilOffset]`. If not, it will try to fetch `offset` again 
if it's in
+   * `[beginningOffset, latestOffset)`. Otherwise, it will return null and 
reset the pre-fetched
+   * data.
+   */
+  private def advanceToBeginningOffsetAndFetch(
+  offset: Long,
+  untilOffset: Long,
+  pollTimeoutMs: Long): ConsumerRecord[Array[Byte], Array[Byte]] = {
+val beginningOffset = getBeginningOffset()
+if (beginningOffset <= offset) {
+  val latestOffset = getLatestOffset()
+  if (latestOffset <= offset) {
+// beginningOffset <= latestOffset - 1 < offset <= untilOffset - 1
+logWarning(s"Offset ${offset} is later than the latest offset 
$latestOffset. " +
+  s"Skipped [$offset, $untilOffset)")
+reset()
+null
+  } else {
+// beginningOffset <= offset <= min(latestOffset - 1, untilOffset 
- 1)
+getAndIgnoreLostData(offset, untilOffset, pollTimeoutMs)
--- End diff --

I'm clearer on why this terminates, but I think it's worth a comment, since 
it's a mutually recursive call without changing arguments.





[GitHub] spark pull request #15820: [SPARK-18373][SS][Kafka]Make failOnDataLoss=false...

2016-11-10 Thread koeninger
Github user koeninger commented on a diff in the pull request:

https://github.com/apache/spark/pull/15820#discussion_r87438788
  
--- Diff: 
external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/CachedKafkaConsumer.scala
 ---
@@ -83,6 +86,129 @@ private[kafka010] case class CachedKafkaConsumer 
private(
 record
   }
 
+  /**
+   * Get the record at the `offset`. If it doesn't exist, try to get the 
earliest record in
+   * `[offset, untilOffset)`.
+   */
+  def getAndIgnoreLostData(
+  offset: Long,
+  untilOffset: Long,
+  pollTimeoutMs: Long): ConsumerRecord[Array[Byte], Array[Byte]] = {
+// scalastyle:off
+// When `failOnDataLoss` is `false`, we need to handle the following 
cases (note: untilOffset and latestOffset are exclusive):
+// 1. Some data are aged out, and `offset < beginningOffset <= 
untilOffset - 1 <= latestOffset - 1`
+//  Seek to the beginningOffset and fetch the data.
+// 2. Some data are aged out, and `offset <= untilOffset - 1 < 
beginningOffset`.
+//  There is nothing to fetch, return null.
+// 3. The topic is deleted.
+//  There is nothing to fetch, return null.
+// 4. The topic is deleted and recreated, and `beginningOffset <= 
offset <= untilOffset - 1 <= latestOffset - 1`.
+//  We cannot detect this case. We can still fetch data like 
nothing happens.
+// 5. The topic is deleted and recreated, and `beginningOffset <= 
offset < latestOffset - 1 < untilOffset - 1`.
+//  Same as 4.
+// 6. The topic is deleted and recreated, and `beginningOffset <= 
latestOffset - 1 < offset <= untilOffset - 1`.
+//  There is nothing to fetch, return null.
+// 7. The topic is deleted and recreated, and `offset < 
beginningOffset <= untilOffset - 1`.
+//  Same as 1.
+// 8. The topic is deleted and recreated, and `offset <= untilOffset - 
1 < beginningOffset`.
+//  There is nothing to fetch, return null.
+// scalastyle:on
+require(offset < untilOffset, s"offset: $offset, untilOffset: 
$untilOffset")
+logDebug(s"Get $groupId $topicPartition nextOffset 
$nextOffsetInFetchedData requested $offset")
+try {
+  if (offset != nextOffsetInFetchedData) {
+logInfo(s"Initial fetch for $topicPartition $offset")
+seek(offset)
+poll(pollTimeoutMs)
+  } else if (!fetchedData.hasNext()) {
+// The last pre-fetched data has been drained.
+poll(pollTimeoutMs)
+  }
+  getRecordFromFetchedData(offset, untilOffset)
+} catch {
+  case e: OffsetOutOfRangeException =>
+logWarning(s"Cannot fetch offset $offset, try to recover from the 
beginning offset", e)
--- End diff --

I think it's worth the warning explicitly stating that data has been lost
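
The revised wording that later landed in the PR (quoted in the newer
messages earlier in this digest) does exactly that:

    logWarning(s"Cannot fetch offset $offset. Some data may be lost. Recovering from " +
      "the beginning offset", e)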





[GitHub] spark pull request #15820: [SPARK-18373][SS][Kafka]Make failOnDataLoss=false...

2016-11-08 Thread koeninger
Github user koeninger commented on a diff in the pull request:

https://github.com/apache/spark/pull/15820#discussion_r87129126
  
--- Diff: 
external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/CachedKafkaConsumer.scala
 ---
@@ -83,6 +86,113 @@ private[kafka010] case class CachedKafkaConsumer 
private(
 record
   }
 
+  @tailrec
+  final def getAndIgnoreLostData(
+  offset: Long,
+  untilOffset: Long,
+  pollTimeoutMs: Long): ConsumerRecord[Array[Byte], Array[Byte]] = {
+// scalastyle:off
+// When `failOnDataLoss` is `false`, we need to handle the following 
cases (note: untilOffset and latestOffset are exclusive):
+// 1. Some data are aged out, and `offset < beginningOffset <= 
untilOffset - 1 <= latestOffset - 1`
+//  Seek to the beginningOffset and fetch the data.
+// 2. Some data are aged out, and `offset <= untilOffset - 1 < 
beginningOffset`.
+//  There is nothing to fetch, return null.
+// 3. The topic is deleted.
+//  There is nothing to fetch, return null.
+// 4. The topic is deleted and recreated, and `beginningOffset <= 
offset <= untilOffset - 1 <= latestOffset - 1`.
+//  We cannot detect this case. We can still fetch data like 
nothing happens.
+// 5. The topic is deleted and recreated, and `beginningOffset <= 
offset < latestOffset - 1 < untilOffset - 1`.
+//  Same as 4.
+// 6. The topic is deleted and recreated, and `beginningOffset <= 
latestOffset - 1 < offset <= untilOffset - 1`.
+//  There is nothing to fetch, return null.
+// 7. The topic is deleted and recreated, and `offset < 
beginningOffset <= untilOffset - 1`.
+//  Same as 1.
+// 8. The topic is deleted and recreated, and `offset <= untilOffset - 
1 < beginningOffset`.
+//  There is nothing to fetch, return null.
+// scalastyle:on
+if (offset >= untilOffset) {
+  // Case 2 or 8
+  // We seek to beginningOffset but beginningOffset >= untilOffset
+  reset()
+  return null
+}
+
+logDebug(s"Get $groupId $topicPartition nextOffset 
$nextOffsetInFetchedData requested $offset")
+var outOfOffset = false
+if (offset != nextOffsetInFetchedData) {
+  logInfo(s"Initial fetch for $topicPartition $offset")
+  seek(offset)
+  try {
+poll(pollTimeoutMs)
+  } catch {
+case e: OffsetOutOfRangeException =>
+  logWarning(s"Cannot fetch offset $offset, try to recover from 
the beginning offset", e)
+  outOfOffset = true
+  }
+} else if (!fetchedData.hasNext()) {
+  // The last pre-fetched data has been drained.
+  seek(offset)
+  try {
+poll(pollTimeoutMs)
+  } catch {
+case e: OffsetOutOfRangeException =>
+  logWarning(s"Cannot fetch offset $offset, try to recover from 
the beginning offset", e)
+  outOfOffset = true
+  }
+}
+if (outOfOffset) {
+  val beginningOffset = getBeginningOffset()
+  if (beginningOffset <= offset) {
+val latestOffset = getLatestOffset()
+if (latestOffset <= offset) {
+  // Case 3 or 6
+  logWarning(s"Offset ${offset} is later than the latest offset 
$latestOffset. " +
+s"Skipped [$offset, $untilOffset)")
+  reset()
+  return null
+} else {
+  // Case 4 or 5
+  getAndIgnoreLostData(offset, untilOffset, pollTimeoutMs)
--- End diff --

- Why is this not an early return?
- The arguments to the recursive function have not changed at this point, 
right?  Why does it terminate?





[GitHub] spark pull request #15820: [SPARK-18373][SS][Kafka]Make failOnDataLoss=false...

2016-11-08 Thread koeninger
Github user koeninger commented on a diff in the pull request:

https://github.com/apache/spark/pull/15820#discussion_r87129981
  
--- Diff: 
external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/CachedKafkaConsumer.scala
 ---
@@ -83,6 +86,113 @@ private[kafka010] case class CachedKafkaConsumer 
private(
 record
   }
 
+  @tailrec
+  final def getAndIgnoreLostData(
+  offset: Long,
+  untilOffset: Long,
+  pollTimeoutMs: Long): ConsumerRecord[Array[Byte], Array[Byte]] = {
+// scalastyle:off
+// When `failOnDataLoss` is `false`, we need to handle the following 
cases (note: untilOffset and latestOffset are exclusive):
+// 1. Some data are aged out, and `offset < beginningOffset <= 
untilOffset - 1 <= latestOffset - 1`
+//  Seek to the beginningOffset and fetch the data.
+// 2. Some data are aged out, and `offset <= untilOffset - 1 < 
beginningOffset`.
+//  There is nothing to fetch, return null.
+// 3. The topic is deleted.
+//  There is nothing to fetch, return null.
+// 4. The topic is deleted and recreated, and `beginningOffset <= 
offset <= untilOffset - 1 <= latestOffset - 1`.
+//  We cannot detect this case. We can still fetch data like 
nothing happens.
+// 5. The topic is deleted and recreated, and `beginningOffset <= 
offset < latestOffset - 1 < untilOffset - 1`.
+//  Same as 4.
+// 6. The topic is deleted and recreated, and `beginningOffset <= 
latestOffset - 1 < offset <= untilOffset - 1`.
+//  There is nothing to fetch, return null.
+// 7. The topic is deleted and recreated, and `offset < 
beginningOffset <= untilOffset - 1`.
+//  Same as 1.
+// 8. The topic is deleted and recreated, and `offset <= untilOffset - 
1 < beginningOffset`.
+//  There is nothing to fetch, return null.
+// scalastyle:on
+if (offset >= untilOffset) {
+  // Case 2 or 8
+  // We seek to beginningOffset but beginningOffset >= untilOffset
+  reset()
+  return null
+}
+
+logDebug(s"Get $groupId $topicPartition nextOffset 
$nextOffsetInFetchedData requested $offset")
+var outOfOffset = false
--- End diff --

Can this var be eliminated by just using a single try around the if / else? 
 It's the same catch condition in either case.
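
For reference, a minimal sketch of that refactor, with a single try wrapping
the whole if/else; this matches the shape of the later revision quoted in the
newer messages of this digest:

    try {
      if (offset != nextOffsetInFetchedData) {
        logInfo(s"Initial fetch for $topicPartition $offset")
        seek(offset)
        poll(pollTimeoutMs)
      } else if (!fetchedData.hasNext()) {
        // The last pre-fetched data has been drained.
        poll(pollTimeoutMs)
      }
      getRecordFromFetchedData(offset, untilOffset)
    } catch {
      case e: OffsetOutOfRangeException =>
        logWarning(s"Cannot fetch offset $offset, try to recover from the beginning offset", e)
        advanceToBeginningOffsetAndFetch(offset, untilOffset, pollTimeoutMs)
    }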





[GitHub] spark pull request #15820: [SPARK-18373][SS][Kafka]Make failOnDataLoss=false...

2016-11-08 Thread koeninger
Github user koeninger commented on a diff in the pull request:

https://github.com/apache/spark/pull/15820#discussion_r87129817
  
--- Diff: 
external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/CachedKafkaConsumer.scala
 ---
@@ -83,6 +86,113 @@ private[kafka010] case class CachedKafkaConsumer 
private(
 record
   }
 
+  @tailrec
+  final def getAndIgnoreLostData(
+  offset: Long,
+  untilOffset: Long,
+  pollTimeoutMs: Long): ConsumerRecord[Array[Byte], Array[Byte]] = {
+// scalastyle:off
+// When `failOnDataLoss` is `false`, we need to handle the following 
cases (note: untilOffset and latestOffset are exclusive):
+// 1. Some data are aged out, and `offset < beginningOffset <= 
untilOffset - 1 <= latestOffset - 1`
+//  Seek to the beginningOffset and fetch the data.
+// 2. Some data are aged out, and `offset <= untilOffset - 1 < 
beginningOffset`.
+//  There is nothing to fetch, return null.
+// 3. The topic is deleted.
+//  There is nothing to fetch, return null.
+// 4. The topic is deleted and recreated, and `beginningOffset <= 
offset <= untilOffset - 1 <= latestOffset - 1`.
+//  We cannot detect this case. We can still fetch data like 
nothing happens.
+// 5. The topic is deleted and recreated, and `beginningOffset <= 
offset < latestOffset - 1 < untilOffset - 1`.
+//  Same as 4.
+// 6. The topic is deleted and recreated, and `beginningOffset <= 
latestOffset - 1 < offset <= untilOffset - 1`.
+//  There is nothing to fetch, return null.
+// 7. The topic is deleted and recreated, and `offset < 
beginningOffset <= untilOffset - 1`.
+//  Same as 1.
+// 8. The topic is deleted and recreated, and `offset <= untilOffset - 
1 < beginningOffset`.
+//  There is nothing to fetch, return null.
+// scalastyle:on
+if (offset >= untilOffset) {
+  // Case 2 or 8
+  // We seek to beginningOffset but beginningOffset >= untilOffset
+  reset()
+  return null
+}
+
+logDebug(s"Get $groupId $topicPartition nextOffset 
$nextOffsetInFetchedData requested $offset")
+var outOfOffset = false
+if (offset != nextOffsetInFetchedData) {
+  logInfo(s"Initial fetch for $topicPartition $offset")
+  seek(offset)
+  try {
+poll(pollTimeoutMs)
+  } catch {
+case e: OffsetOutOfRangeException =>
+  logWarning(s"Cannot fetch offset $offset, try to recover from 
the beginning offset", e)
+  outOfOffset = true
+  }
+} else if (!fetchedData.hasNext()) {
+  // The last pre-fetched data has been drained.
+  seek(offset)
--- End diff --

I don't think it's necessary to seek every time the fetched data is empty; 
in normal operation the poll should return the next offset, right?
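
The later revision quoted in the newer messages of this digest does drop the
redundant seek: when the pre-fetched data is drained, the consumer's position
is already at the next offset, so the relevant branch just polls:

    } else if (!fetchedData.hasNext()) {
      // The last pre-fetched data has been drained: the consumer is already
      // positioned at the next offset, so polling again is enough.
      poll(pollTimeoutMs)
    }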





[GitHub] spark pull request #15820: [SPARK-18373][SS][Kafka]Make failOnDataLoss=false...

2016-11-08 Thread koeninger
Github user koeninger commented on a diff in the pull request:

https://github.com/apache/spark/pull/15820#discussion_r87129927
  
--- Diff: 
external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/CachedKafkaConsumer.scala
 ---
@@ -83,6 +86,113 @@ private[kafka010] case class CachedKafkaConsumer 
private(
 record
   }
 
+  @tailrec
+  final def getAndIgnoreLostData(
+  offset: Long,
+  untilOffset: Long,
+  pollTimeoutMs: Long): ConsumerRecord[Array[Byte], Array[Byte]] = {
+// scalastyle:off
+// When `failOnDataLoss` is `false`, we need to handle the following 
cases (note: untilOffset and latestOffset are exclusive):
+// 1. Some data are aged out, and `offset < beginningOffset <= 
untilOffset - 1 <= latestOffset - 1`
+//  Seek to the beginningOffset and fetch the data.
+// 2. Some data are aged out, and `offset <= untilOffset - 1 < 
beginningOffset`.
+//  There is nothing to fetch, return null.
+// 3. The topic is deleted.
+//  There is nothing to fetch, return null.
+// 4. The topic is deleted and recreated, and `beginningOffset <= 
offset <= untilOffset - 1 <= latestOffset - 1`.
+//  We cannot detect this case. We can still fetch data like 
nothing happens.
+// 5. The topic is deleted and recreated, and `beginningOffset <= 
offset < latestOffset - 1 < untilOffset - 1`.
+//  Same as 4.
+// 6. The topic is deleted and recreated, and `beginningOffset <= 
latestOffset - 1 < offset <= untilOffset - 1`.
+//  There is nothing to fetch, return null.
+// 7. The topic is deleted and recreated, and `offset < 
beginningOffset <= untilOffset - 1`.
+//  Same as 1.
+// 8. The topic is deleted and recreated, and `offset <= untilOffset - 
1 < beginningOffset`.
+//  There is nothing to fetch, return null.
+// scalastyle:on
+if (offset >= untilOffset) {
+  // Case 2 or 8
+  // We seek to beginningOffset but beginningOffset >= untilOffset
+  reset()
+  return null
+}
+
+logDebug(s"Get $groupId $topicPartition nextOffset 
$nextOffsetInFetchedData requested $offset")
+var outOfOffset = false
+if (offset != nextOffsetInFetchedData) {
+  logInfo(s"Initial fetch for $topicPartition $offset")
+  seek(offset)
+  try {
+poll(pollTimeoutMs)
+  } catch {
+case e: OffsetOutOfRangeException =>
+  logWarning(s"Cannot fetch offset $offset, try to recover from 
the beginning offset", e)
+  outOfOffset = true
+  }
+} else if (!fetchedData.hasNext()) {
+  // The last pre-fetched data has been drained.
+  seek(offset)
+  try {
+poll(pollTimeoutMs)
+  } catch {
+case e: OffsetOutOfRangeException =>
+  logWarning(s"Cannot fetch offset $offset, try to recover from 
the beginning offset", e)
+  outOfOffset = true
+  }
+}
+if (outOfOffset) {
+  val beginningOffset = getBeginningOffset()
+  if (beginningOffset <= offset) {
+val latestOffset = getLatestOffset()
+if (latestOffset <= offset) {
+  // Case 3 or 6
+  logWarning(s"Offset ${offset} is later than the latest offset 
$latestOffset. " +
+s"Skipped [$offset, $untilOffset)")
+  reset()
+  return null
+} else {
+  // Case 4 or 5
+  getAndIgnoreLostData(offset, untilOffset, pollTimeoutMs)
+}
+  } else {
+// Case 1 or 7
+logWarning(s"Buffer miss for $groupId $topicPartition [$offset, 
$beginningOffset})")
+return getAndIgnoreLostData(beginningOffset, untilOffset, 
pollTimeoutMs)
+  }
+} else {
+  if (!fetchedData.hasNext()) {
+// We cannot fetch anything after `poll`. Two possible cases:
+// - `beginningOffset` is `offset` but there is nothing for `beginningOffset`
+//   right now.
+// - Cannot fetch any data before the timeout.
+// Because there is no way to distinguish them, just skip the remaining
+// offsets in the current partition.
+logWarning(s"Buffer miss for $groupId $topicPartition [$offset, 
$untilOffset)")
+reset()
+return null
+  }
+
+  val record = fetchedData.next()
+  if (record.offset >= untilOffset) {
+// Case 2
+logWarning(s"Buffer miss for $groupId $topicPartition [$offset, 
$untilOffset)")
+reset()
+return null
+  } else {
+if (record.offset != offset) {
+  // Case 1
+  logWarning(s"Buffer miss for $groupId $topicPartition [$offset, 

[GitHub] spark pull request #15820: [SPARK-18373][SS][Kafka]Make failOnDataLoss=false...

2016-11-08 Thread koeninger
Github user koeninger commented on a diff in the pull request:

https://github.com/apache/spark/pull/15820#discussion_r87127811
  
--- Diff: 
external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/CachedKafkaConsumer.scala
 ---
@@ -83,6 +86,113 @@ private[kafka010] case class CachedKafkaConsumer 
private(
 record
   }
 
+  @tailrec
+  final def getAndIgnoreLostData(
+  offset: Long,
+  untilOffset: Long,
+  pollTimeoutMs: Long): ConsumerRecord[Array[Byte], Array[Byte]] = {
+// scalastyle:off
+// When `failOnDataLoss` is `false`, we need to handle the following 
cases (note: untilOffset and latestOffset are exclusive):
+// 1. Some data are aged out, and `offset < beginningOffset <= 
untilOffset - 1 <= latestOffset - 1`
+//  Seek to the beginningOffset and fetch the data.
+// 2. Some data are aged out, and `offset <= untilOffset - 1 < 
beginningOffset`.
+//  There is nothing to fetch, return null.
+// 3. The topic is deleted.
+//  There is nothing to fetch, return null.
+// 4. The topic is deleted and recreated, and `beginningOffset <= 
offset <= untilOffset - 1 <= latestOffset - 1`.
+//  We cannot detect this case. We can still fetch data like 
nothing happens.
+// 5. The topic is deleted and recreated, and `beginningOffset <= 
offset < latestOffset - 1 < untilOffset - 1`.
+//  Same as 4.
+// 6. The topic is deleted and recreated, and `beginningOffset <= 
latestOffset - 1 < offset <= untilOffset - 1`.
+//  There is nothing to fetch, return null.
+// 7. The topic is deleted and recreated, and `offset < 
beginningOffset <= untilOffset - 1`.
+//  Same as 1.
+// 8. The topic is deleted and recreated, and `offset <= untilOffset - 
1 < beginningOffset`.
+//  There is nothing to fetch, return null.
+// scalastyle:on
+if (offset >= untilOffset) {
+  // Case 2 or 8
+  // We seek to beginningOffset but beginningOffset >= untilOffset
+  reset()
+  return null
+}
+
+logDebug(s"Get $groupId $topicPartition nextOffset 
$nextOffsetInFetchedData requested $offset")
+var outOfOffset = false
+if (offset != nextOffsetInFetchedData) {
+  logInfo(s"Initial fetch for $topicPartition $offset")
+  seek(offset)
+  try {
+poll(pollTimeoutMs)
+  } catch {
+case e: OffsetOutOfRangeException =>
+  logWarning(s"Cannot fetch offset $offset, try to recover from 
the beginning offset", e)
+  outOfOffset = true
+  }
+} else if (!fetchedData.hasNext()) {
+  // The last pre-fetched data has been drained.
+  seek(offset)
+  try {
+poll(pollTimeoutMs)
+  } catch {
+case e: OffsetOutOfRangeException =>
+  logWarning(s"Cannot fetch offset $offset, try to recover from 
the beginning offset", e)
+  outOfOffset = true
+  }
+}
+if (outOfOffset) {
+  val beginningOffset = getBeginningOffset()
+  if (beginningOffset <= offset) {
+val latestOffset = getLatestOffset()
+if (latestOffset <= offset) {
+  // Case 3 or 6
+  logWarning(s"Offset ${offset} is later than the latest offset 
$latestOffset. " +
+s"Skipped [$offset, $untilOffset)")
+  reset()
+  return null
+} else {
+  // Case 4 or 5
+  getAndIgnoreLostData(offset, untilOffset, pollTimeoutMs)
+}
+  } else {
+// Case 1 or 7
+logWarning(s"Buffer miss for $groupId $topicPartition [$offset, 
$beginningOffset})")
+return getAndIgnoreLostData(beginningOffset, untilOffset, 
pollTimeoutMs)
+  }
+} else {
+  if (!fetchedData.hasNext()) {
+// We cannot fetch anything after `poll`. Two possible cases:
+// - `beginningOffset` is `offset` but there is nothing for `beginningOffset`
+//   right now.
+// - Cannot fetch any data before the timeout.
+// Because there is no way to distinguish them, just skip the remaining
+// offsets in the current partition.
+logWarning(s"Buffer miss for $groupId $topicPartition [$offset, 
$untilOffset)")
+reset()
+return null
+  }
+
+  val record = fetchedData.next()
+  if (record.offset >= untilOffset) {
+// Case 2
+logWarning(s"Buffer miss for $groupId $topicPartition [$offset, 
$untilOffset)")
+reset()
+return null
+  } else {
+if (record.offset != offset) {
+  // Case 1
+  logWarning(s"Buffer miss for $groupId $topicPartition [$offset, 

[GitHub] spark pull request #15820: [SPARK-18373][SS][Kafka]Make failOnDataLoss=false...

2016-11-08 Thread koeninger
Github user koeninger commented on a diff in the pull request:

https://github.com/apache/spark/pull/15820#discussion_r87130059
  
--- Diff: 
external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/CachedKafkaConsumer.scala
 ---
@@ -83,6 +86,113 @@ private[kafka010] case class CachedKafkaConsumer 
private(
 record
   }
 
+  @tailrec
+  final def getAndIgnoreLostData(
+  offset: Long,
+  untilOffset: Long,
+  pollTimeoutMs: Long): ConsumerRecord[Array[Byte], Array[Byte]] = {
+// scalastyle:off
+// When `failOnDataLoss` is `false`, we need to handle the following 
cases (note: untilOffset and latestOffset are exclusive):
+// 1. Some data are aged out, and `offset < beginningOffset <= 
untilOffset - 1 <= latestOffset - 1`
+//  Seek to the beginningOffset and fetch the data.
+// 2. Some data are aged out, and `offset <= untilOffset - 1 < 
beginningOffset`.
+//  There is nothing to fetch, return null.
+// 3. The topic is deleted.
+//  There is nothing to fetch, return null.
+// 4. The topic is deleted and recreated, and `beginningOffset <= 
offset <= untilOffset - 1 <= latestOffset - 1`.
+//  We cannot detect this case. We can still fetch data like 
nothing happens.
+// 5. The topic is deleted and recreated, and `beginningOffset <= 
offset < latestOffset - 1 < untilOffset - 1`.
+//  Same as 4.
+// 6. The topic is deleted and recreated, and `beginningOffset <= 
latestOffset - 1 < offset <= untilOffset - 1`.
+//  There is nothing to fetch, return null.
+// 7. The topic is deleted and recreated, and `offset < 
beginningOffset <= untilOffset - 1`.
+//  Same as 1.
+// 8. The topic is deleted and recreated, and `offset <= untilOffset - 
1 < beginningOffset`.
+//  There is nothing to fetch, return null.
+// scalastyle:on
+if (offset >= untilOffset) {
+  // Case 2 or 8
+  // We seek to beginningOffset but beginningOffset >= untilOffset
+  reset()
+  return null
+}
+
+logDebug(s"Get $groupId $topicPartition nextOffset 
$nextOffsetInFetchedData requested $offset")
+var outOfOffset = false
+if (offset != nextOffsetInFetchedData) {
+  logInfo(s"Initial fetch for $topicPartition $offset")
+  seek(offset)
+  try {
+poll(pollTimeoutMs)
+  } catch {
+case e: OffsetOutOfRangeException =>
+  logWarning(s"Cannot fetch offset $offset, try to recover from 
the beginning offset", e)
+  outOfOffset = true
+  }
+} else if (!fetchedData.hasNext()) {
+  // The last pre-fetched data has been drained.
+  seek(offset)
+  try {
+poll(pollTimeoutMs)
+  } catch {
+case e: OffsetOutOfRangeException =>
+  logWarning(s"Cannot fetch offset $offset, try to recover from 
the beginning offset", e)
+  outOfOffset = true
+  }
+}
+if (outOfOffset) {
+  val beginningOffset = getBeginningOffset()
+  if (beginningOffset <= offset) {
+val latestOffset = getLatestOffset()
+if (latestOffset <= offset) {
+  // Case 3 or 6
+  logWarning(s"Offset ${offset} is later than the latest offset 
$latestOffset. " +
+s"Skipped [$offset, $untilOffset)")
+  reset()
+  return null
+} else {
+  // Case 4 or 5
+  getAndIgnoreLostData(offset, untilOffset, pollTimeoutMs)
--- End diff --

- Why isn't this an early return?
- Unless I'm misreading, this is a recursive call without changing the 
arguments.  Why is it guaranteed to terminate?





[GitHub] spark pull request #15820: [SPARK-18373][SS][Kafka]Make failOnDataLoss=false...

2016-11-08 Thread koeninger
Github user koeninger commented on a diff in the pull request:

https://github.com/apache/spark/pull/15820#discussion_r87128373
  
--- Diff: 
external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/CachedKafkaConsumer.scala
 ---
@@ -83,6 +86,113 @@ private[kafka010] case class CachedKafkaConsumer 
private(
 record
   }
 
+  @tailrec
+  final def getAndIgnoreLostData(
+  offset: Long,
+  untilOffset: Long,
+  pollTimeoutMs: Long): ConsumerRecord[Array[Byte], Array[Byte]] = {
+// scalastyle:off
+// When `failOnDataLoss` is `false`, we need to handle the following 
cases (note: untilOffset and latestOffset are exclusive):
+// 1. Some data are aged out, and `offset < beginningOffset <= 
untilOffset - 1 <= latestOffset - 1`
+//  Seek to the beginningOffset and fetch the data.
+// 2. Some data are aged out, and `offset <= untilOffset - 1 < 
beginningOffset`.
+//  There is nothing to fetch, return null.
+// 3. The topic is deleted.
+//  There is nothing to fetch, return null.
+// 4. The topic is deleted and recreated, and `beginningOffset <= 
offset <= untilOffset - 1 <= latestOffset - 1`.
+//  We cannot detect this case. We can still fetch data like 
nothing happens.
+// 5. The topic is deleted and recreated, and `beginningOffset <= 
offset < latestOffset - 1 < untilOffset - 1`.
+//  Same as 4.
+// 6. The topic is deleted and recreated, and `beginningOffset <= 
latestOffset - 1 < offset <= untilOffset - 1`.
+//  There is nothing to fetch, return null.
+// 7. The topic is deleted and recreated, and `offset < 
beginningOffset <= untilOffset - 1`.
+//  Same as 1.
+// 8. The topic is deleted and recreated, and `offset <= untilOffset - 
1 < beginningOffset`.
+//  There is nothing to fetch, return null.
+// scalastyle:on
+if (offset >= untilOffset) {
+  // Case 2 or 8
+  // We seek to beginningOffset but beginningOffset >= untilOffset
+  reset()
+  return null
+}
+
+logDebug(s"Get $groupId $topicPartition nextOffset 
$nextOffsetInFetchedData requested $offset")
+var outOfOffset = false
+if (offset != nextOffsetInFetchedData) {
+  logInfo(s"Initial fetch for $topicPartition $offset")
+  seek(offset)
+  try {
+poll(pollTimeoutMs)
+  } catch {
+case e: OffsetOutOfRangeException =>
+  logWarning(s"Cannot fetch offset $offset, try to recover from 
the beginning offset", e)
+  outOfOffset = true
+  }
+} else if (!fetchedData.hasNext()) {
+  // The last pre-fetched data has been drained.
+  seek(offset)
+  try {
+poll(pollTimeoutMs)
+  } catch {
+case e: OffsetOutOfRangeException =>
+  logWarning(s"Cannot fetch offset $offset, try to recover from 
the beginning offset", e)
+  outOfOffset = true
+  }
+}
+if (outOfOffset) {
+  val beginningOffset = getBeginningOffset()
+  if (beginningOffset <= offset) {
+val latestOffset = getLatestOffset()
+if (latestOffset <= offset) {
+  // Case 3 or 6
+  logWarning(s"Offset ${offset} is later than the latest offset 
$latestOffset. " +
+s"Skipped [$offset, $untilOffset)")
+  reset()
+  return null
+} else {
+  // Case 4 or 5
+  getAndIgnoreLostData(offset, untilOffset, pollTimeoutMs)
+}
+  } else {
+// Case 1 or 7
+logWarning(s"Buffer miss for $groupId $topicPartition [$offset, 
$beginningOffset})")
+return getAndIgnoreLostData(beginningOffset, untilOffset, 
pollTimeoutMs)
+  }
+} else {
+  if (!fetchedData.hasNext()) {
+// We cannot fetch anything after `poll`. Two possible cases:
+// - `beginningOffset` is `offset` but there is nothing for `beginningOffset`
+//   right now.
+// - Cannot fetch any data before the timeout.
+// Because there is no way to distinguish them, just skip the remaining
+// offsets in the current partition.
+logWarning(s"Buffer miss for $groupId $topicPartition [$offset, 
$untilOffset)")
+reset()
+return null
+  }
+
+  val record = fetchedData.next()
+  if (record.offset >= untilOffset) {
+// Case 2
+logWarning(s"Buffer miss for $groupId $topicPartition [$offset, 
$untilOffset)")
+reset()
+return null
+  } else {
+if (record.offset != offset) {
+  // Case 1
+  logWarning(s"Buffer miss for $groupId $topicPartition [$offset, 

[GitHub] spark pull request #15820: [SPARK-18373][SS][Kafka]Make failOnDataLoss=false...

2016-11-08 Thread koeninger
Github user koeninger commented on a diff in the pull request:

https://github.com/apache/spark/pull/15820#discussion_r87129204
  
--- Diff: 
external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/CachedKafkaConsumer.scala
 ---
@@ -83,6 +86,113 @@ private[kafka010] case class CachedKafkaConsumer 
private(
 record
   }
 
+  @tailrec
+  final def getAndIgnoreLostData(
+  offset: Long,
+  untilOffset: Long,
+  pollTimeoutMs: Long): ConsumerRecord[Array[Byte], Array[Byte]] = {
+// scalastyle:off
+// When `failOnDataLoss` is `false`, we need to handle the following 
cases (note: untilOffset and latestOffset are exclusive):
+// 1. Some data are aged out, and `offset < beginningOffset <= 
untilOffset - 1 <= latestOffset - 1`
+//  Seek to the beginningOffset and fetch the data.
+// 2. Some data are aged out, and `offset <= untilOffset - 1 < 
beginningOffset`.
+//  There is nothing to fetch, return null.
+// 3. The topic is deleted.
+//  There is nothing to fetch, return null.
+// 4. The topic is deleted and recreated, and `beginningOffset <= 
offset <= untilOffset - 1 <= latestOffset - 1`.
+//  We cannot detect this case. We can still fetch data like 
nothing happens.
+// 5. The topic is deleted and recreated, and `beginningOffset <= 
offset < latestOffset - 1 < untilOffset - 1`.
+//  Same as 4.
+// 6. The topic is deleted and recreated, and `beginningOffset <= 
latestOffset - 1 < offset <= untilOffset - 1`.
+//  There is nothing to fetch, return null.
+// 7. The topic is deleted and recreated, and `offset < 
beginningOffset <= untilOffset - 1`.
+//  Same as 1.
+// 8. The topic is deleted and recreated, and `offset <= untilOffset - 
1 < beginningOffset`.
+//  There is nothing to fetch, return null.
+// scalastyle:on
+if (offset >= untilOffset) {
+  // Case 2 or 8
+  // We seek to beginningOffset but beginningOffset >= untilOffset
+  reset()
+  return null
+}
+
+logDebug(s"Get $groupId $topicPartition nextOffset 
$nextOffsetInFetchedData requested $offset")
+var outOfOffset = false
--- End diff --

Can't this var be eliminated with a single try around the following if/else?





[GitHub] spark pull request #15820: [SPARK-18373][SS][Kafka]Make failOnDataLoss=false...

2016-11-08 Thread zsxwing
GitHub user zsxwing opened a pull request:

https://github.com/apache/spark/pull/15820

[SPARK-18373][SS][Kafka]Make failOnDataLoss=false work with Spark jobs

## What changes were proposed in this pull request?

This PR adds `CachedKafkaConsumer.getAndIgnoreLostData` to handle corner 
cases of `failOnDataLoss=false`. I listed all kinds of cases we should support 
in the code comment.

## How was this patch tested?

Because I cannot find any way to manually control the Kafka server to clean 
up logs, it's impossible to write unit tests for each corner case. Therefore, 
I just created `test("stress test for failOnDataLoss=false")`, which should 
cover most of the corner cases.

I also modified some existing tests to test for both `failOnDataLoss=false` 
and `failOnDataLoss=true` to make sure it doesn't break existing logic.
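
For readers unfamiliar with the harness, a stress test of this shape
interleaves topic deletion and recreation with a running query and asserts
that the query never fails. A rough, hypothetical sketch, assuming the
suite's `spark` session and a `testUtils` fixture with
createTopic/deleteTopic/sendMessages helpers (assumptions here, not
necessarily the PR's exact test API):

    test("stress test for failOnDataLoss=false") {
      val topic = "failOnDataLoss-stress"
      testUtils.createTopic(topic, partitions = 1)
      val df = spark.readStream
        .format("kafka")
        .option("kafka.bootstrap.servers", testUtils.brokerAddress)
        .option("subscribe", topic)
        .option("failOnDataLoss", "false")
        .load()
      val query = df.writeStream.format("memory").queryName("stress").start()
      (1 to 50).foreach { i =>
        testUtils.sendMessages(topic, Array(i.toString))
        if (i % 10 == 0) {
          // Periodically delete and recreate the topic to age data out abruptly.
          testUtils.deleteTopic(topic)
          testUtils.createTopic(topic, partitions = 1)
        }
      }
      // With failOnDataLoss=false this must not throw, even though data was lost.
      query.processAllAvailable()
      query.stop()
    }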




You can merge this pull request into a Git repository by running:

$ git pull https://github.com/zsxwing/spark failOnDataLoss

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/spark/pull/15820.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #15820


commit e8eff9ff3b32320ab8d969089cded97e9ec29a52
Author: Shixiong Zhu 
Date:   2016-10-28T17:46:54Z

Make failOnDataLoss=false stable for Spark jobs



