HeartSaVioR commented on a change in pull request #32747:
URL: https://github.com/apache/spark/pull/32747#discussion_r654408254
##########
File path:
external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaOffsetReaderConsumer.scala
##########
@@ -270,6 +262,35 @@ private[kafka010] class KafkaOffsetReaderConsumer(
fnAssertFetchedOffsets)
}
+ private def readTimestampOffsets(
+ tpToOffsetMap: Map[TopicPartition, OffsetAndTimestamp],
+ isStartingOffsets: Boolean,
+ strategyOnNoMatchStartingOffset: StrategyOnNoMatchStartingOffset.Value,
+ partitionTimestampFn: TopicPartition => Long): Map[TopicPartition, Long] = {
+
+ tpToOffsetMap.map { case (tp, offsetSpec) =>
+ val offset = if (offsetSpec == null) {
+ if (isStartingOffsets) {
+ strategyOnNoMatchStartingOffset match {
+ case StrategyOnNoMatchStartingOffset.ERROR =>
+ throw new IllegalArgumentException("No offset " +
Review comment:
Ah OK we used assert, not require. Good point.
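For anyone following along, a minimal sketch of the difference being discussed (plain Scala `Predef` behavior; the message text is illustrative):
```scala
object RequireVsAssert {
  def main(args: Array[String]): Unit = {
    val offsetSpec: AnyRef = null

    // require(...) throws IllegalArgumentException when the condition is false ...
    try require(offsetSpec != null, "No offset matched from request")
    catch { case e: IllegalArgumentException => println(e) }

    // ... while assert(...) throws AssertionError, which is what the stack trace
    // further below reports for the continuous-mode run.
    try assert(offsetSpec != null, "No offset matched from request")
    catch { case e: AssertionError => println(e) }
  }
}
```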
##########
File path:
external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaMicroBatchSourceSuite.scala
##########
@@ -1456,8 +1456,7 @@ abstract class KafkaSourceSuiteBase extends KafkaSourceTest {
testFromSpecificOffsets(
topic,
failOnDataLoss = failOnDataLoss,
- "assign" -> assignString(topic, 0 to 4),
- "failOnDataLoss" -> failOnDataLoss.toString)
Review comment:
Yes, we set the same config unnecessarily twice.
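To illustrate why the removed line was redundant, here is a hypothetical sketch of the helper's shape (its real body is not part of this diff, so the names and structure below are assumptions):
```scala
// Hypothetical shape of the helper, for illustration only: the named failOnDataLoss
// parameter already ends up in the Kafka option map, so the removed "failOnDataLoss"
// vararg entry set the same option a second time.
object TestHelperSketch {
  def testFromSpecificOffsets(
      topic: String,
      failOnDataLoss: Boolean,
      options: (String, String)*): Unit = {
    val allOptions = options.toMap + ("failOnDataLoss" -> failOnDataLoss.toString)
    println(allOptions) // a duplicate vararg entry would simply be overwritten here
  }
}
```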
##########
File path:
external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaMicroBatchSourceSuite.scala
##########
@@ -1568,6 +1565,137 @@ abstract class KafkaSourceSuiteBase extends KafkaSourceTest {
}
}
+ test("subscribing topic by name from specific timestamps with non-matching
starting offset") {
+ val topic = newTopic()
+ testFromSpecificTimestampsWithNoMatchingStartingOffset(topic, "subscribe" -> topic)
+ }
+
+ test("subscribing topic by name from global timestamp per topic with " +
+ "non-matching starting offset") {
+ val topic = newTopic()
+ testFromGlobalTimestampWithNoMatchingStartingOffset(topic, "subscribe" -> topic)
+ }
+
+ test("subscribing topic by pattern from specific timestamps with " +
+ "non-matching starting offset") {
+ val topicPrefix = newTopic()
+ val topic = topicPrefix + "-suffix"
+ testFromSpecificTimestampsWithNoMatchingStartingOffset(topic,
+ "subscribePattern" -> s"$topicPrefix-.*")
+ }
+
+ test("subscribing topic by pattern from global timestamp per topic with " +
+ "non-matching starting offset") {
+ val topicPrefix = newTopic()
+ val topic = topicPrefix + "-suffix"
+ testFromGlobalTimestampWithNoMatchingStartingOffset(topic,
+ "subscribePattern" -> s"$topicPrefix-.*")
+ }
+
+ private def testFromSpecificTimestampsWithNoMatchingStartingOffset(
+ topic: String,
+ options: (String, String)*): Unit = {
+ testUtils.createTopic(topic, partitions = 5)
+
+ val firstTimestamp = System.currentTimeMillis() - 5000
+ val secondTimestamp = firstTimestamp + 1000
+ setupTestMessagesForTestOnTimestampOffsets(topic, firstTimestamp, secondTimestamp)
+ // no data after second timestamp for partition 4
+
+ require(testUtils.getLatestOffsets(Set(topic)).size === 5)
+
+ // here we start from the second timestamp for all partitions, whereas we know there's
+ // no data in partition 4 matching the second timestamp
+ val startPartitionTimestamps: Map[TopicPartition, Long] =
+ (0 to 4).map(new TopicPartition(topic, _) -> secondTimestamp).toMap
+ val startingTimestamps = JsonUtils.partitionTimestamps(startPartitionTimestamps)
+
+ val mapped = setupDataFrameForTestOnTimestampOffsets(startingTimestamps, failOnDataLoss = true,
+ options: _*)
+ assertQueryFailOnStartOffsetStrategyAsError(mapped)
+
+ val mapped2 = setupDataFrameForTestOnTimestampOffsets(startingTimestamps, failOnDataLoss = true,
+ options :+ ("startingoffsetsbytimestampstrategy", "error"): _*)
+ assertQueryFailOnStartOffsetStrategyAsError(mapped2)
+
+ val mapped3 = setupDataFrameForTestOnTimestampOffsets(startingTimestamps, failOnDataLoss = true,
+ options :+ ("startingoffsetsbytimestampstrategy", "latest"): _*)
+
+ testStream(mapped3)(
+ makeSureGetOffsetCalled,
+ Execute { q =>
+ val partitions = (0 to 4).map(new TopicPartition(topic, _))
+ // wait to reach the last offset in every partition
+ q.awaitOffset(
+ 0, KafkaSourceOffset(partitions.map(tp => tp -> 3L).toMap), streamingTimeout.toMillis)
+ },
+ CheckAnswer(-21, -22, -11, -12, 2, 12),
+ Execute { q =>
+ sendMessagesWithTimestamp(topic, Array(23, 24, 25).map(_.toString), 4, secondTimestamp)
+ // wait to reach the new last offset in every partition
+ val partitions = (0 to 3).map(new TopicPartition(topic, _)).map(tp => tp -> 3L) ++
+ Seq(new TopicPartition(topic, 4) -> 6L)
+ q.awaitOffset(
+ 0, KafkaSourceOffset(partitions.toMap), streamingTimeout.toMillis)
+ },
+ CheckNewAnswer(23, 24, 25)
+ )
+ }
+
+ private def testFromGlobalTimestampWithNoMatchingStartingOffset(
+ topic: String,
+ options: (String, String)*): Unit = {
+ testUtils.createTopic(topic, partitions = 5)
+
+ val firstTimestamp = System.currentTimeMillis() - 5000
+ val secondTimestamp = firstTimestamp + 1000
+ setupTestMessagesForTestOnTimestampOffsets(topic, firstTimestamp, secondTimestamp)
+
+ require(testUtils.getLatestOffsets(Set(topic)).size === 5)
+
+ // here we start from the second timestamp for all partitions, whereas we know there's
+ // no data in partition 4 matching the second timestamp
+
+ val mapped = setupDataFrameForTestOnGlobalTimestamp(secondTimestamp, failOnDataLoss = true,
+ options: _*)
+ assertQueryFailOnStartOffsetStrategyAsError(mapped)
+
+ val mapped2 = setupDataFrameForTestOnGlobalTimestamp(secondTimestamp, failOnDataLoss = true,
+ options :+ ("startingoffsetsbytimestampstrategy", "error"): _*)
+ assertQueryFailOnStartOffsetStrategyAsError(mapped2)
+
+ val mapped3 = setupDataFrameForTestOnGlobalTimestamp(secondTimestamp, failOnDataLoss = true,
+ options :+ ("startingoffsetsbytimestampstrategy", "latest"): _*)
+
+ testStream(mapped3)(
+ makeSureGetOffsetCalled,
+ Execute { q =>
+ val partitions = (0 to 4).map(new TopicPartition(topic, _))
+ // wait to reach the last offset in every partition
+ q.awaitOffset(
+ 0, KafkaSourceOffset(partitions.map(tp => tp -> 3L).toMap), streamingTimeout.toMillis)
+ },
+ CheckAnswer(-21, -22, -11, -12, 2, 12),
+ Execute { q =>
+ sendMessagesWithTimestamp(topic, Array(23, 24, 25).map(_.toString), 4, secondTimestamp)
+ // wait to reach the new last offset in every partition
+ val partitions = (0 to 3).map(new TopicPartition(topic, _)).map(tp => tp -> 3L) ++
+ Seq(new TopicPartition(topic, 4) -> 6L)
+ q.awaitOffset(
+ 0, KafkaSourceOffset(partitions.toMap), streamingTimeout.toMillis)
+ },
+ CheckNewAnswer(23, 24, 25)
+ )
+ }
+
+ private def assertQueryFailOnStartOffsetStrategyAsError(df: Dataset[_]): Unit = {
+ // In continuous mode, the original exception is unfortunately not caught here, so we have to
+ // stick with checking for a general exception instead of verifying IllegalArgumentException.
+ intercept[Exception] {
Review comment:
In continuous mode, the error message is completely different from what we expect. This is the log message I get from `exc.getMessage()`:
```
21/06/18 22:10:20.680 ScalaTest-run-running-KafkaContinuousSourceSuite WARN KafkaContinuousSourceSuite:
IncrementalExecution was not created: The code passed to eventually never returned normally. Attempted 1794 times over 30.009420817000002 seconds. Last failure message: null equaled null.
```
The actual exception is logged, but we can't intercept the original exception here, so we can't check for the relevant message on the intercepted exception.
```
21/06/18 22:09:53.747 stream execution thread for [id = bc35e8a2-077e-4949-b8b6-47670d1c057f, runId = d528f706-e553-4603-b022-fc0fe7cc0432] ERROR ContinuousExecution: Query [id = bc35e8a2-077e-4949-b8b6-47670d1c057f, runId = d528f706-e553-4603-b022-fc0fe7cc0432] terminated with error
java.lang.AssertionError: No offset matched from request of topic-partition topic-31-4 and timestamp 1624021786438.
    at org.apache.spark.sql.kafka010.KafkaOffsetReaderConsumer.$anonfun$readTimestampOffsets$1(KafkaOffsetReaderConsumer.scala:280)
    at scala.collection.TraversableLike.$anonfun$map$1(TraversableLike.scala:286)
    at scala.collection.immutable.HashMap$HashMap1.foreach(HashMap.scala:400)
    at scala.collection.immutable.HashMap$HashTrieMap.foreach(HashMap.scala:728)
    at scala.collection.TraversableLike.map(TraversableLike.scala:286)
    at scala.collection.TraversableLike.map$(TraversableLike.scala:279)
    at scala.collection.AbstractTraversable.map(Traversable.scala:108)
    at org.apache.spark.sql.kafka010.KafkaOffsetReaderConsumer.readTimestampOffsets(KafkaOffsetReaderConsumer.scala:271)
    at org.apache.spark.sql.kafka010.KafkaOffsetReaderConsumer.$anonfun$fetchSpecificTimestampBasedOffsets$4(KafkaOffsetReaderConsumer.scala:227)
    at org.apache.spark.sql.kafka010.KafkaOffsetReaderConsumer.$anonfun$fetchSpecificOffsets0$1(KafkaOffsetReaderConsumer.scala:305)
    at org.apache.spark.sql.kafka010.KafkaOffsetReaderConsumer.$anonfun$partitionsAssignedToConsumer$2(KafkaOffsetReaderConsumer.scala:561)
    at org.apache.spark.sql.kafka010.KafkaOffsetReaderConsumer.$anonfun$withRetriesWithoutInterrupt$1(KafkaOffsetReaderConsumer.scala:594)
    at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
    at org.apache.spark.util.UninterruptibleThread.runUninterruptibly(UninterruptibleThread.scala:77)
    at org.apache.spark.sql.kafka010.KafkaOffsetReaderConsumer.withRetriesWithoutInterrupt(KafkaOffsetReaderConsumer.scala:593)
    at org.apache.spark.sql.kafka010.KafkaOffsetReaderConsumer.$anonfun$partitionsAssignedToConsumer$1(KafkaOffsetReaderConsumer.scala:547)
    at org.apache.spark.util.UninterruptibleThreadRunner.runUninterruptibly(UninterruptibleThreadRunner.scala:48)
    at org.apache.spark.sql.kafka010.KafkaOffsetReaderConsumer.partitionsAssignedToConsumer(KafkaOffsetReaderConsumer.scala:547)
    at org.apache.spark.sql.kafka010.KafkaOffsetReaderConsumer.fetchSpecificOffsets0(KafkaOffsetReaderConsumer.scala:301)
    at org.apache.spark.sql.kafka010.KafkaOffsetReaderConsumer.fetchSpecificTimestampBasedOffsets(KafkaOffsetReaderConsumer.scala:233)
    at org.apache.spark.sql.kafka010.KafkaContinuousStream.initialOffset(KafkaContinuousStream.scala:74)
    at org.apache.spark.sql.execution.streaming.continuous.ContinuousExecution$$anonfun$3.$anonfun$applyOrElse$4(ContinuousExecution.scala:170)
    ...
```
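For reference, a rough usage sketch of the strategy option these tests exercise, assuming an active `SparkSession`; the broker, topic, and timestamp values are placeholders, and the strategy option key is taken verbatim from the test code above (Kafka source options are matched case-insensitively):
```scala
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().appName("timestamp-offset-sketch").getOrCreate()

// Sketch only: starting a query by timestamp and choosing what happens for
// partitions with no record at or after that timestamp.
val df = spark.readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "localhost:9092") // placeholder broker
  .option("subscribe", "topic-31")                      // placeholder topic (5 partitions assumed)
  .option("startingOffsetsByTimestamp",
    """{"topic-31": {"0": 1624021786438, "1": 1624021786438, "2": 1624021786438,
      |"3": 1624021786438, "4": 1624021786438}}""".stripMargin)
  // "error" fails the query with the "No offset matched ..." error shown above;
  // "latest" lets a non-matching partition start from its latest offset instead.
  .option("startingoffsetsbytimestampstrategy", "latest")
  .load()
```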
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]