kete1987 commented on code in PR #56427:
URL: https://github.com/apache/spark/pull/56427#discussion_r3444604070
##########
connector/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaRelationSuite.scala:
##########
@@ -675,12 +675,21 @@ abstract class KafkaRelationSuiteBase extends SharedSparkSession with KafkaTest
test("resolved start offset greater than end offset (without latest)") {
val topic = newTopic()
testUtils.createTopic(topic, partitions = 3)
- val timestamp1 =
- testUtils.sendMessages(topic, Seq("0", "0").toArray, Some(0))(1)._2.timestamp()
- val timestamp2 =
- testUtils.sendMessages(topic, Seq("0", "0").toArray, Some(1))(1)._2.timestamp()
- val timestamp3 =
- testUtils.sendMessages(topic, Seq("0", "0").toArray, Some(2))(1)._2.timestamp()
+ // Two messages per partition with explicit, increasing timestamps so the second message
+ // (offset 1) is deterministically resolved by offsetsForTimes. Without explicit timestamps
+ // both messages can share a CreateTime millisecond and resolve to offset 0, making the test
+ // flaky (the `eventually` below does not help: produced timestamps are fixed at produce time,
Review Comment:
Agreed — removed the `eventually(60.seconds)` wrapper and replaced it with a
flat `intercept`, matching the `(with latest)` twin. As you noted, the
resolved-offset check fires synchronously on the driver and the produced
timestamps are fixed, so retrying could never change the outcome.
##########
connector/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaRelationSuite.scala:
##########
@@ -675,12 +675,21 @@ abstract class KafkaRelationSuiteBase extends SharedSparkSession with KafkaTest
test("resolved start offset greater than end offset (without latest)") {
val topic = newTopic()
testUtils.createTopic(topic, partitions = 3)
- val timestamp1 =
- testUtils.sendMessages(topic, Seq("0", "0").toArray, Some(0))(1)._2.timestamp()
- val timestamp2 =
- testUtils.sendMessages(topic, Seq("0", "0").toArray, Some(1))(1)._2.timestamp()
- val timestamp3 =
- testUtils.sendMessages(topic, Seq("0", "0").toArray, Some(2))(1)._2.timestamp()
+ // Two messages per partition with explicit, increasing timestamps so the second message
+ // (offset 1) is deterministically resolved by offsetsForTimes. Without explicit timestamps
+ // both messages can share a CreateTime millisecond and resolve to offset 0, making the test
+ // flaky (the `eventually` below does not help: produced timestamps are fixed at produce time,
+ // so retrying always resolves the same wrong offset).
+ def sendTwoWithDistinctTs(part: Int): Long = {
Review Comment:
Done — `sendTwoWithDistinctTs` now returns `base + 1` directly instead of
reading back the metadata timestamp. Added a one-line note in the comment that
this relies on the default CreateTime semantics (broker echoes the producer
timestamp unchanged).
##########
connector/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaRelationSuite.scala:
##########
@@ -675,12 +675,21 @@ abstract class KafkaRelationSuiteBase extends SharedSparkSession with KafkaTest
test("resolved start offset greater than end offset (without latest)") {
val topic = newTopic()
testUtils.createTopic(topic, partitions = 3)
- val timestamp1 =
- testUtils.sendMessages(topic, Seq("0", "0").toArray, Some(0))(1)._2.timestamp()
- val timestamp2 =
- testUtils.sendMessages(topic, Seq("0", "0").toArray, Some(1))(1)._2.timestamp()
- val timestamp3 =
- testUtils.sendMessages(topic, Seq("0", "0").toArray, Some(2))(1)._2.timestamp()
+ // Two messages per partition with explicit, increasing timestamps so the second message
+ // (offset 1) is deterministically resolved by offsetsForTimes. Without explicit timestamps
+ // both messages can share a CreateTime millisecond and resolve to offset 0, making the test
+ // flaky (the `eventually` below does not help: produced timestamps are fixed at produce time,
+ // so retrying always resolves the same wrong offset).
+ def sendTwoWithDistinctTs(part: Int): Long = {
+ val base = System.currentTimeMillis()
+ testUtils.sendMessages(Seq(
+ new RecordBuilder(topic, "0").partition(part).timestamp(base).build(),
+ new RecordBuilder(topic, "0").partition(part).timestamp(base + 1).build()
+ ))(1)._2.timestamp() // = base + 1, resolves to offset 1
+ }
+ val timestamp1 = sendTwoWithDistinctTs(0)
Review Comment:
Done — switched to `(0 to 2).map(sendTwoWithDistinctTs)` to match
`prepareTimestampRelatedUnitTest`.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]