kete1987 commented on code in PR #56427:
URL: https://github.com/apache/spark/pull/56427#discussion_r3444604070


##########
connector/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaRelationSuite.scala:
##########
@@ -675,12 +675,21 @@ abstract class KafkaRelationSuiteBase extends SharedSparkSession with KafkaTest
   test("resolved start offset greater than end offset (without latest)") {
     val topic = newTopic()
     testUtils.createTopic(topic, partitions = 3)
-    val timestamp1 =
-      testUtils.sendMessages(topic, Seq("0", "0").toArray, Some(0))(1)._2.timestamp()
-    val timestamp2 =
-      testUtils.sendMessages(topic, Seq("0", "0").toArray, Some(1))(1)._2.timestamp()
-    val timestamp3 =
-      testUtils.sendMessages(topic, Seq("0", "0").toArray, Some(2))(1)._2.timestamp()
+    // Two messages per partition with explicit, increasing timestamps so the second message
+    // (offset 1) is deterministically resolved by offsetsForTimes. Without explicit timestamps
+    // both messages can share a CreateTime millisecond and resolve to offset 0, making the test
+    // flaky (the `eventually` below does not help: produced timestamps are fixed at produce time,

Review Comment:
   Agreed — removed the `eventually(60.seconds)` wrapper and replaced it with a 
flat `intercept`, matching the `(with latest)` twin. As you noted, the 
resolved-offset check fires synchronously on the driver and the produced 
timestamps are fixed, so retrying could never change the outcome.
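
The point about retries can be illustrated with a minimal, Kafka-free sketch. Everything in it is hypothetical: `retry` is a simplified stand-in for ScalaTest's `eventually`, and the two-element vector models a partition whose two records share one timestamp. It is not the suite's code, only a demonstration that a check over fixed inputs gives the same result on every attempt.

```scala
import scala.util.Try

object RetrySketch {
  // Hypothetical, simplified stand-in for `eventually`: re-runs `check` until it
  // stops throwing or the attempts run out. Returns (attempts used, succeeded).
  def retry(maxAttempts: Int)(check: => Unit): (Int, Boolean) = {
    var attempts = 0
    var ok = false
    while (!ok && attempts < maxAttempts) {
      attempts += 1
      ok = Try(check).isSuccess
    }
    (attempts, ok)
  }

  // Timestamps already written to the log (index = offset). Nothing changes
  // them between attempts, which is the whole point.
  val fixedTimestamps: Vector[Long] = Vector(1000L, 1000L)

  // Looks up the second record's timestamp using the "earliest offset whose
  // timestamp is >= target" rule and insists that it resolves to offset 1.
  def secondResolvesToOffset1(): Unit = {
    val resolved = fixedTimestamps.indexWhere(_ >= fixedTimestamps(1))
    require(resolved == 1, s"resolved offset $resolved, expected 1")
  }

  def main(args: Array[String]): Unit = {
    // Every attempt sees the same data, so all attempts fail identically.
    println(retry(5)(secondResolvesToOffset1()))
  }
}
```

Because the inputs are immutable, the retry loop only adds wall-clock time; a single direct assertion carries the same information.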



##########
connector/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaRelationSuite.scala:
##########
@@ -675,12 +675,21 @@ abstract class KafkaRelationSuiteBase extends SharedSparkSession with KafkaTest
   test("resolved start offset greater than end offset (without latest)") {
     val topic = newTopic()
     testUtils.createTopic(topic, partitions = 3)
-    val timestamp1 =
-      testUtils.sendMessages(topic, Seq("0", "0").toArray, Some(0))(1)._2.timestamp()
-    val timestamp2 =
-      testUtils.sendMessages(topic, Seq("0", "0").toArray, Some(1))(1)._2.timestamp()
-    val timestamp3 =
-      testUtils.sendMessages(topic, Seq("0", "0").toArray, Some(2))(1)._2.timestamp()
+    // Two messages per partition with explicit, increasing timestamps so the second message
+    // (offset 1) is deterministically resolved by offsetsForTimes. Without explicit timestamps
+    // both messages can share a CreateTime millisecond and resolve to offset 0, making the test
+    // flaky (the `eventually` below does not help: produced timestamps are fixed at produce time,
+    // so retrying always resolves the same wrong offset).
+    def sendTwoWithDistinctTs(part: Int): Long = {

Review Comment:
   Done — `sendTwoWithDistinctTs` now returns `base + 1` directly instead of 
reading back the metadata timestamp. Added a one-line note in the comment that 
this relies on the default CreateTime semantics (broker echoes the producer 
timestamp unchanged).
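
Why `base + 1` must resolve to offset 1 can be shown with a small model of the lookup rule. This is a sketch under stated assumptions, not the connector's code: `offsetForTime` is a hypothetical helper that only imitates the documented behavior of `KafkaConsumer.offsetsForTimes` (earliest offset whose timestamp is greater than or equal to the target), and a partition is modeled as a vector whose index is the offset.

```scala
object OffsetsForTimesSketch {
  // Hypothetical model of one partition: index = offset, value = record timestamp.
  // Returns the earliest offset whose timestamp is >= target, or None if there
  // is no such record.
  def offsetForTime(timestamps: Vector[Long], target: Long): Option[Int] = {
    val idx = timestamps.indexWhere(_ >= target)
    if (idx >= 0) Some(idx) else None
  }

  def main(args: Array[String]): Unit = {
    val base = 1700000000000L // arbitrary fixed value, for illustration only

    // Both records share one millisecond: looking up the second record's
    // timestamp finds the first record instead.
    val sameMs = Vector(base, base)
    println(offsetForTime(sameMs, sameMs(1)))

    // Explicit, strictly increasing timestamps: base + 1 can only match offset 1.
    val distinct = Vector(base, base + 1)
    println(offsetForTime(distinct, base + 1))
  }
}
```

Under this model the helper's return value no longer needs to come from the record metadata, provided the stored timestamp equals the producer-supplied one, which is the CreateTime assumption noted above.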



##########
connector/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaRelationSuite.scala:
##########
@@ -675,12 +675,21 @@ abstract class KafkaRelationSuiteBase extends SharedSparkSession with KafkaTest
   test("resolved start offset greater than end offset (without latest)") {
     val topic = newTopic()
     testUtils.createTopic(topic, partitions = 3)
-    val timestamp1 =
-      testUtils.sendMessages(topic, Seq("0", "0").toArray, Some(0))(1)._2.timestamp()
-    val timestamp2 =
-      testUtils.sendMessages(topic, Seq("0", "0").toArray, Some(1))(1)._2.timestamp()
-    val timestamp3 =
-      testUtils.sendMessages(topic, Seq("0", "0").toArray, Some(2))(1)._2.timestamp()
+    // Two messages per partition with explicit, increasing timestamps so the second message
+    // (offset 1) is deterministically resolved by offsetsForTimes. Without explicit timestamps
+    // both messages can share a CreateTime millisecond and resolve to offset 0, making the test
+    // flaky (the `eventually` below does not help: produced timestamps are fixed at produce time,
+    // so retrying always resolves the same wrong offset).
+    def sendTwoWithDistinctTs(part: Int): Long = {
+      val base = System.currentTimeMillis()
+      testUtils.sendMessages(Seq(
+        new RecordBuilder(topic, "0").partition(part).timestamp(base).build(),
+        new RecordBuilder(topic, "0").partition(part).timestamp(base + 1).build()
+      ))(1)._2.timestamp() // = base + 1, resolves to offset 1
+    }
+    val timestamp1 = sendTwoWithDistinctTs(0)

Review Comment:
   Done — switched to `(0 to 2).map(sendTwoWithDistinctTs)` to match 
`prepareTimestampRelatedUnitTest`.
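
For readers skimming the thread, the shape of that change is roughly the following. This is a sketch only: the `sendTwoWithDistinctTs` below is a hypothetical stub that sends nothing and derives a deterministic fake `base` from the partition number, so the result can be checked without a broker.

```scala
object PerPartitionSketch {
  // Hypothetical stub for the test helper: no Kafka involved. It returns the
  // timestamp the second record would carry (base + 1) for a made-up base.
  def sendTwoWithDistinctTs(part: Int): Long = {
    val base = 1000L * (part + 1) // stand-in for System.currentTimeMillis()
    base + 1
  }

  // One lookup timestamp per partition, indexed by partition number.
  val timestamps: IndexedSeq[Long] = (0 to 2).map(sendTwoWithDistinctTs)

  def main(args: Array[String]): Unit = {
    timestamps.zipWithIndex.foreach { case (ts, part) =>
      println(s"partition $part -> $ts")
    }
  }
}
```

Mapping over the partition range keeps the three sends in one expression and makes the partition count a single edit if it ever changes.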



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

