HeartSaVioR commented on a change in pull request #32747:
URL: https://github.com/apache/spark/pull/32747#discussion_r654408254



##########
File path: 
external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaOffsetReaderConsumer.scala
##########
@@ -270,6 +262,35 @@ private[kafka010] class KafkaOffsetReaderConsumer(
       fnAssertFetchedOffsets)
   }
 
+  private def readTimestampOffsets(
+      tpToOffsetMap: Map[TopicPartition, OffsetAndTimestamp],
+      isStartingOffsets: Boolean,
+      strategyOnNoMatchStartingOffset: StrategyOnNoMatchStartingOffset.Value,
+      partitionTimestampFn: TopicPartition => Long): Map[TopicPartition, Long] 
= {
+
+    tpToOffsetMap.map { case (tp, offsetSpec) =>
+      val offset = if (offsetSpec == null) {
+        if (isStartingOffsets) {
+          strategyOnNoMatchStartingOffset match {
+            case StrategyOnNoMatchStartingOffset.ERROR =>
+              throw new IllegalArgumentException("No offset " +

Review comment:
       Ah OK we used assert, not require. Good point.

##########
File path: 
external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaMicroBatchSourceSuite.scala
##########
@@ -1456,8 +1456,7 @@ abstract class KafkaSourceSuiteBase extends 
KafkaSourceTest {
       testFromSpecificOffsets(
         topic,
         failOnDataLoss = failOnDataLoss,
-        "assign" -> assignString(topic, 0 to 4),
-        "failOnDataLoss" -> failOnDataLoss.toString)

Review comment:
       Yes, we set the same config unnecessarily twice.

##########
File path: 
external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaMicroBatchSourceSuite.scala
##########
@@ -1568,6 +1565,137 @@ abstract class KafkaSourceSuiteBase extends 
KafkaSourceTest {
     }
   }
 
+  test("subscribing topic by name from specific timestamps with non-matching 
starting offset") {
+    val topic = newTopic()
+    testFromSpecificTimestampsWithNoMatchingStartingOffset(topic, "subscribe" 
-> topic)
+  }
+
+  test("subscribing topic by name from global timestamp per topic with " +
+    "non-matching starting offset") {
+    val topic = newTopic()
+    testFromGlobalTimestampWithNoMatchingStartingOffset(topic, "subscribe" -> 
topic)
+  }
+
+  test("subscribing topic by pattern from specific timestamps with " +
+    "non-matching starting offset") {
+    val topicPrefix = newTopic()
+    val topic = topicPrefix + "-suffix"
+    testFromSpecificTimestampsWithNoMatchingStartingOffset(topic,
+      "subscribePattern" -> s"$topicPrefix-.*")
+  }
+
+  test("subscribing topic by pattern from global timestamp per topic with " +
+    "non-matching starting offset") {
+    val topicPrefix = newTopic()
+    val topic = topicPrefix + "-suffix"
+    testFromGlobalTimestampWithNoMatchingStartingOffset(topic,
+      "subscribePattern" -> s"$topicPrefix-.*")
+  }
+
+  private def testFromSpecificTimestampsWithNoMatchingStartingOffset(
+      topic: String,
+      options: (String, String)*): Unit = {
+    testUtils.createTopic(topic, partitions = 5)
+
+    val firstTimestamp = System.currentTimeMillis() - 5000
+    val secondTimestamp = firstTimestamp + 1000
+    setupTestMessagesForTestOnTimestampOffsets(topic, firstTimestamp, 
secondTimestamp)
+    // no data after second timestamp for partition 4
+
+    require(testUtils.getLatestOffsets(Set(topic)).size === 5)
+
+    // here we starts from second timestamp for all partitions, whereas we 
know there's
+    // no data in partition 4 matching second timestamp
+    val startPartitionTimestamps: Map[TopicPartition, Long] =
+    (0 to 4).map(new TopicPartition(topic, _) -> secondTimestamp).toMap
+    val startingTimestamps = 
JsonUtils.partitionTimestamps(startPartitionTimestamps)
+
+    val mapped = setupDataFrameForTestOnTimestampOffsets(startingTimestamps, 
failOnDataLoss = true,
+      options: _*)
+    assertQueryFailOnStartOffsetStrategyAsError(mapped)
+
+    val mapped2 = setupDataFrameForTestOnTimestampOffsets(startingTimestamps, 
failOnDataLoss = true,
+      options :+ ("startingoffsetsbytimestampstrategy", "error"): _*)
+    assertQueryFailOnStartOffsetStrategyAsError(mapped2)
+
+    val mapped3 = setupDataFrameForTestOnTimestampOffsets(startingTimestamps, 
failOnDataLoss = true,
+      options :+ ("startingoffsetsbytimestampstrategy", "latest"): _*)
+
+    testStream(mapped3)(
+      makeSureGetOffsetCalled,
+      Execute { q =>
+        val partitions = (0 to 4).map(new TopicPartition(topic, _))
+        // wait to reach the last offset in every partition
+        q.awaitOffset(
+          0, KafkaSourceOffset(partitions.map(tp => tp -> 3L).toMap), 
streamingTimeout.toMillis)
+      },
+      CheckAnswer(-21, -22, -11, -12, 2, 12),
+      Execute { q =>
+        sendMessagesWithTimestamp(topic, Array(23, 24, 25).map(_.toString), 4, 
secondTimestamp)
+        // wait to reach the new last offset in every partition
+        val partitions = (0 to 3).map(new TopicPartition(topic, _)).map(tp => 
tp -> 3L) ++
+          Seq(new TopicPartition(topic, 4) -> 6L)
+        q.awaitOffset(
+          0, KafkaSourceOffset(partitions.toMap), streamingTimeout.toMillis)
+      },
+      CheckNewAnswer(23, 24, 25)
+    )
+  }
+
+  private def testFromGlobalTimestampWithNoMatchingStartingOffset(
+      topic: String,
+      options: (String, String)*): Unit = {
+    testUtils.createTopic(topic, partitions = 5)
+
+    val firstTimestamp = System.currentTimeMillis() - 5000
+    val secondTimestamp = firstTimestamp + 1000
+    setupTestMessagesForTestOnTimestampOffsets(topic, firstTimestamp, 
secondTimestamp)
+
+    require(testUtils.getLatestOffsets(Set(topic)).size === 5)
+
+    // here we starts from second timestamp for all partitions, whereas we 
know there's
+    // no data in partition 4 matching second timestamp
+
+    val mapped = setupDataFrameForTestOnGlobalTimestamp(secondTimestamp, 
failOnDataLoss = true,
+      options: _*)
+    assertQueryFailOnStartOffsetStrategyAsError(mapped)
+
+    val mapped2 = setupDataFrameForTestOnGlobalTimestamp(secondTimestamp, 
failOnDataLoss = true,
+      options :+ ("startingoffsetsbytimestampstrategy", "error"): _*)
+    assertQueryFailOnStartOffsetStrategyAsError(mapped2)
+
+    val mapped3 = setupDataFrameForTestOnGlobalTimestamp(secondTimestamp, 
failOnDataLoss = true,
+      options :+ ("startingoffsetsbytimestampstrategy", "latest"): _*)
+
+    testStream(mapped3)(
+      makeSureGetOffsetCalled,
+      Execute { q =>
+        val partitions = (0 to 4).map(new TopicPartition(topic, _))
+        // wait to reach the last offset in every partition
+        q.awaitOffset(
+          0, KafkaSourceOffset(partitions.map(tp => tp -> 3L).toMap), 
streamingTimeout.toMillis)
+      },
+      CheckAnswer(-21, -22, -11, -12, 2, 12),
+      Execute { q =>
+        sendMessagesWithTimestamp(topic, Array(23, 24, 25).map(_.toString), 4, 
secondTimestamp)
+        // wait to reach the new last offset in every partition
+        val partitions = (0 to 3).map(new TopicPartition(topic, _)).map(tp => 
tp -> 3L) ++
+          Seq(new TopicPartition(topic, 4) -> 6L)
+        q.awaitOffset(
+          0, KafkaSourceOffset(partitions.toMap), streamingTimeout.toMillis)
+      },
+      CheckNewAnswer(23, 24, 25)
+    )
+  }
+
+  private def assertQueryFailOnStartOffsetStrategyAsError(df: Dataset[_]): 
Unit = {
+    // In continuous mode, the origin exception is not caught here 
unfortunately, so we have to
+    // stick with checking general exception instead of verifying 
IllegalArgumentException.
+    intercept[Exception] {

Review comment:
       In continuous mode, the error message is completely different than we 
expect. This is a log message I get from `exc.getMessage()`:
   
   ```
   21/06/18 22:10:20.680 ScalaTest-run-running-KafkaContinuousSourceSuite WARN 
KafkaContinuousSourceSuite: 
   IncrementalExecution was not created: The code passed to eventually never 
returned normally. Attempted 1794 times over 30.009420817000002 seconds. Last 
failure message: null equaled null.
   ```
   
   The actual exception is logged, but we can't intercept the original 
exception here.
   
   ```
   21/06/18 22:09:53.747 stream execution thread for [id = 
bc35e8a2-077e-4949-b8b6-47670d1c057f, runId = 
d528f706-e553-4603-b022-fc0fe7cc0432] ERROR ContinuousExecution: Query [id = 
bc35e8a2-077e-4949-b8b6-47670d1c057f, runId = 
d528f706-e553-4603-b022-fc0fe7cc0432] terminated with error
   java.lang.AssertionError: No offset matched from request of topic-partition 
topic-31-4 and timestamp 1624021786438.
        at 
org.apache.spark.sql.kafka010.KafkaOffsetReaderConsumer.$anonfun$readTimestampOffsets$1(KafkaOffsetReaderConsumer.scala:280)
        at 
scala.collection.TraversableLike.$anonfun$map$1(TraversableLike.scala:286)
        at 
scala.collection.immutable.HashMap$HashMap1.foreach(HashMap.scala:400)
        at 
scala.collection.immutable.HashMap$HashTrieMap.foreach(HashMap.scala:728)
        at scala.collection.TraversableLike.map(TraversableLike.scala:286)
        at scala.collection.TraversableLike.map$(TraversableLike.scala:279)
        at scala.collection.AbstractTraversable.map(Traversable.scala:108)
        at 
org.apache.spark.sql.kafka010.KafkaOffsetReaderConsumer.readTimestampOffsets(KafkaOffsetReaderConsumer.scala:271)
        at 
org.apache.spark.sql.kafka010.KafkaOffsetReaderConsumer.$anonfun$fetchSpecificTimestampBasedOffsets$4(KafkaOffsetReaderConsumer.scala:227)
        at 
org.apache.spark.sql.kafka010.KafkaOffsetReaderConsumer.$anonfun$fetchSpecificOffsets0$1(KafkaOffsetReaderConsumer.scala:305)
        at 
org.apache.spark.sql.kafka010.KafkaOffsetReaderConsumer.$anonfun$partitionsAssignedToConsumer$2(KafkaOffsetReaderConsumer.scala:561)
        at 
org.apache.spark.sql.kafka010.KafkaOffsetReaderConsumer.$anonfun$withRetriesWithoutInterrupt$1(KafkaOffsetReaderConsumer.scala:594)
        at 
scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
        at 
org.apache.spark.util.UninterruptibleThread.runUninterruptibly(UninterruptibleThread.scala:77)
        at 
org.apache.spark.sql.kafka010.KafkaOffsetReaderConsumer.withRetriesWithoutInterrupt(KafkaOffsetReaderConsumer.scala:593)
        at 
org.apache.spark.sql.kafka010.KafkaOffsetReaderConsumer.$anonfun$partitionsAssignedToConsumer$1(KafkaOffsetReaderConsumer.scala:547)
        at 
org.apache.spark.util.UninterruptibleThreadRunner.runUninterruptibly(UninterruptibleThreadRunner.scala:48)
        at 
org.apache.spark.sql.kafka010.KafkaOffsetReaderConsumer.partitionsAssignedToConsumer(KafkaOffsetReaderConsumer.scala:547)
        at 
org.apache.spark.sql.kafka010.KafkaOffsetReaderConsumer.fetchSpecificOffsets0(KafkaOffsetReaderConsumer.scala:301)
        at 
org.apache.spark.sql.kafka010.KafkaOffsetReaderConsumer.fetchSpecificTimestampBasedOffsets(KafkaOffsetReaderConsumer.scala:233)
        at 
org.apache.spark.sql.kafka010.KafkaContinuousStream.initialOffset(KafkaContinuousStream.scala:74)
        at 
org.apache.spark.sql.execution.streaming.continuous.ContinuousExecution$$anonfun$3.$anonfun$applyOrElse$4(ContinuousExecution.scala:170)
   ...
   ```

##########
File path: 
external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaMicroBatchSourceSuite.scala
##########
@@ -1568,6 +1565,137 @@ abstract class KafkaSourceSuiteBase extends 
KafkaSourceTest {
     }
   }
 
+  test("subscribing topic by name from specific timestamps with non-matching 
starting offset") {
+    val topic = newTopic()
+    testFromSpecificTimestampsWithNoMatchingStartingOffset(topic, "subscribe" 
-> topic)
+  }
+
+  test("subscribing topic by name from global timestamp per topic with " +
+    "non-matching starting offset") {
+    val topic = newTopic()
+    testFromGlobalTimestampWithNoMatchingStartingOffset(topic, "subscribe" -> 
topic)
+  }
+
+  test("subscribing topic by pattern from specific timestamps with " +
+    "non-matching starting offset") {
+    val topicPrefix = newTopic()
+    val topic = topicPrefix + "-suffix"
+    testFromSpecificTimestampsWithNoMatchingStartingOffset(topic,
+      "subscribePattern" -> s"$topicPrefix-.*")
+  }
+
+  test("subscribing topic by pattern from global timestamp per topic with " +
+    "non-matching starting offset") {
+    val topicPrefix = newTopic()
+    val topic = topicPrefix + "-suffix"
+    testFromGlobalTimestampWithNoMatchingStartingOffset(topic,
+      "subscribePattern" -> s"$topicPrefix-.*")
+  }
+
+  private def testFromSpecificTimestampsWithNoMatchingStartingOffset(
+      topic: String,
+      options: (String, String)*): Unit = {
+    testUtils.createTopic(topic, partitions = 5)
+
+    val firstTimestamp = System.currentTimeMillis() - 5000
+    val secondTimestamp = firstTimestamp + 1000
+    setupTestMessagesForTestOnTimestampOffsets(topic, firstTimestamp, 
secondTimestamp)
+    // no data after second timestamp for partition 4
+
+    require(testUtils.getLatestOffsets(Set(topic)).size === 5)
+
+    // here we starts from second timestamp for all partitions, whereas we 
know there's
+    // no data in partition 4 matching second timestamp
+    val startPartitionTimestamps: Map[TopicPartition, Long] =
+    (0 to 4).map(new TopicPartition(topic, _) -> secondTimestamp).toMap
+    val startingTimestamps = 
JsonUtils.partitionTimestamps(startPartitionTimestamps)
+
+    val mapped = setupDataFrameForTestOnTimestampOffsets(startingTimestamps, 
failOnDataLoss = true,
+      options: _*)
+    assertQueryFailOnStartOffsetStrategyAsError(mapped)
+
+    val mapped2 = setupDataFrameForTestOnTimestampOffsets(startingTimestamps, 
failOnDataLoss = true,
+      options :+ ("startingoffsetsbytimestampstrategy", "error"): _*)
+    assertQueryFailOnStartOffsetStrategyAsError(mapped2)
+
+    val mapped3 = setupDataFrameForTestOnTimestampOffsets(startingTimestamps, 
failOnDataLoss = true,
+      options :+ ("startingoffsetsbytimestampstrategy", "latest"): _*)
+
+    testStream(mapped3)(
+      makeSureGetOffsetCalled,
+      Execute { q =>
+        val partitions = (0 to 4).map(new TopicPartition(topic, _))
+        // wait to reach the last offset in every partition
+        q.awaitOffset(
+          0, KafkaSourceOffset(partitions.map(tp => tp -> 3L).toMap), 
streamingTimeout.toMillis)
+      },
+      CheckAnswer(-21, -22, -11, -12, 2, 12),
+      Execute { q =>
+        sendMessagesWithTimestamp(topic, Array(23, 24, 25).map(_.toString), 4, 
secondTimestamp)
+        // wait to reach the new last offset in every partition
+        val partitions = (0 to 3).map(new TopicPartition(topic, _)).map(tp => 
tp -> 3L) ++
+          Seq(new TopicPartition(topic, 4) -> 6L)
+        q.awaitOffset(
+          0, KafkaSourceOffset(partitions.toMap), streamingTimeout.toMillis)
+      },
+      CheckNewAnswer(23, 24, 25)
+    )
+  }
+
+  private def testFromGlobalTimestampWithNoMatchingStartingOffset(
+      topic: String,
+      options: (String, String)*): Unit = {
+    testUtils.createTopic(topic, partitions = 5)
+
+    val firstTimestamp = System.currentTimeMillis() - 5000
+    val secondTimestamp = firstTimestamp + 1000
+    setupTestMessagesForTestOnTimestampOffsets(topic, firstTimestamp, 
secondTimestamp)
+
+    require(testUtils.getLatestOffsets(Set(topic)).size === 5)
+
+    // here we starts from second timestamp for all partitions, whereas we 
know there's
+    // no data in partition 4 matching second timestamp
+
+    val mapped = setupDataFrameForTestOnGlobalTimestamp(secondTimestamp, 
failOnDataLoss = true,
+      options: _*)
+    assertQueryFailOnStartOffsetStrategyAsError(mapped)
+
+    val mapped2 = setupDataFrameForTestOnGlobalTimestamp(secondTimestamp, 
failOnDataLoss = true,
+      options :+ ("startingoffsetsbytimestampstrategy", "error"): _*)
+    assertQueryFailOnStartOffsetStrategyAsError(mapped2)
+
+    val mapped3 = setupDataFrameForTestOnGlobalTimestamp(secondTimestamp, 
failOnDataLoss = true,
+      options :+ ("startingoffsetsbytimestampstrategy", "latest"): _*)
+
+    testStream(mapped3)(
+      makeSureGetOffsetCalled,
+      Execute { q =>
+        val partitions = (0 to 4).map(new TopicPartition(topic, _))
+        // wait to reach the last offset in every partition
+        q.awaitOffset(
+          0, KafkaSourceOffset(partitions.map(tp => tp -> 3L).toMap), 
streamingTimeout.toMillis)
+      },
+      CheckAnswer(-21, -22, -11, -12, 2, 12),
+      Execute { q =>
+        sendMessagesWithTimestamp(topic, Array(23, 24, 25).map(_.toString), 4, 
secondTimestamp)
+        // wait to reach the new last offset in every partition
+        val partitions = (0 to 3).map(new TopicPartition(topic, _)).map(tp => 
tp -> 3L) ++
+          Seq(new TopicPartition(topic, 4) -> 6L)
+        q.awaitOffset(
+          0, KafkaSourceOffset(partitions.toMap), streamingTimeout.toMillis)
+      },
+      CheckNewAnswer(23, 24, 25)
+    )
+  }
+
+  private def assertQueryFailOnStartOffsetStrategyAsError(df: Dataset[_]): 
Unit = {
+    // In continuous mode, the origin exception is not caught here 
unfortunately, so we have to
+    // stick with checking general exception instead of verifying 
IllegalArgumentException.
+    intercept[Exception] {

Review comment:
       In continuous mode, the error message is completely different than we 
expect. This is a log message I get from `exc.getMessage()`:
   
   ```
   21/06/18 22:10:20.680 ScalaTest-run-running-KafkaContinuousSourceSuite WARN 
KafkaContinuousSourceSuite: 
   IncrementalExecution was not created: The code passed to eventually never 
returned normally. Attempted 1794 times over 30.009420817000002 seconds. Last 
failure message: null equaled null.
   ```
   
   The actual exception is logged, but we can't intercept the original 
exception here, hence can't find the relevant message from the exception being 
intercepted.
   
   ```
   21/06/18 22:09:53.747 stream execution thread for [id = 
bc35e8a2-077e-4949-b8b6-47670d1c057f, runId = 
d528f706-e553-4603-b022-fc0fe7cc0432] ERROR ContinuousExecution: Query [id = 
bc35e8a2-077e-4949-b8b6-47670d1c057f, runId = 
d528f706-e553-4603-b022-fc0fe7cc0432] terminated with error
   java.lang.AssertionError: No offset matched from request of topic-partition 
topic-31-4 and timestamp 1624021786438.
        at 
org.apache.spark.sql.kafka010.KafkaOffsetReaderConsumer.$anonfun$readTimestampOffsets$1(KafkaOffsetReaderConsumer.scala:280)
        at 
scala.collection.TraversableLike.$anonfun$map$1(TraversableLike.scala:286)
        at 
scala.collection.immutable.HashMap$HashMap1.foreach(HashMap.scala:400)
        at 
scala.collection.immutable.HashMap$HashTrieMap.foreach(HashMap.scala:728)
        at scala.collection.TraversableLike.map(TraversableLike.scala:286)
        at scala.collection.TraversableLike.map$(TraversableLike.scala:279)
        at scala.collection.AbstractTraversable.map(Traversable.scala:108)
        at 
org.apache.spark.sql.kafka010.KafkaOffsetReaderConsumer.readTimestampOffsets(KafkaOffsetReaderConsumer.scala:271)
        at 
org.apache.spark.sql.kafka010.KafkaOffsetReaderConsumer.$anonfun$fetchSpecificTimestampBasedOffsets$4(KafkaOffsetReaderConsumer.scala:227)
        at 
org.apache.spark.sql.kafka010.KafkaOffsetReaderConsumer.$anonfun$fetchSpecificOffsets0$1(KafkaOffsetReaderConsumer.scala:305)
        at 
org.apache.spark.sql.kafka010.KafkaOffsetReaderConsumer.$anonfun$partitionsAssignedToConsumer$2(KafkaOffsetReaderConsumer.scala:561)
        at 
org.apache.spark.sql.kafka010.KafkaOffsetReaderConsumer.$anonfun$withRetriesWithoutInterrupt$1(KafkaOffsetReaderConsumer.scala:594)
        at 
scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
        at 
org.apache.spark.util.UninterruptibleThread.runUninterruptibly(UninterruptibleThread.scala:77)
        at 
org.apache.spark.sql.kafka010.KafkaOffsetReaderConsumer.withRetriesWithoutInterrupt(KafkaOffsetReaderConsumer.scala:593)
        at 
org.apache.spark.sql.kafka010.KafkaOffsetReaderConsumer.$anonfun$partitionsAssignedToConsumer$1(KafkaOffsetReaderConsumer.scala:547)
        at 
org.apache.spark.util.UninterruptibleThreadRunner.runUninterruptibly(UninterruptibleThreadRunner.scala:48)
        at 
org.apache.spark.sql.kafka010.KafkaOffsetReaderConsumer.partitionsAssignedToConsumer(KafkaOffsetReaderConsumer.scala:547)
        at 
org.apache.spark.sql.kafka010.KafkaOffsetReaderConsumer.fetchSpecificOffsets0(KafkaOffsetReaderConsumer.scala:301)
        at 
org.apache.spark.sql.kafka010.KafkaOffsetReaderConsumer.fetchSpecificTimestampBasedOffsets(KafkaOffsetReaderConsumer.scala:233)
        at 
org.apache.spark.sql.kafka010.KafkaContinuousStream.initialOffset(KafkaContinuousStream.scala:74)
        at 
org.apache.spark.sql.execution.streaming.continuous.ContinuousExecution$$anonfun$3.$anonfun$applyOrElse$4(ContinuousExecution.scala:170)
   ...
   ```




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
[email protected]



---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to