LuciferYang commented on code in PR #56427:
URL: https://github.com/apache/spark/pull/56427#discussion_r3443883464


##########
connector/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaRelationSuite.scala:
##########
@@ -675,12 +675,21 @@ abstract class KafkaRelationSuiteBase extends SharedSparkSession with KafkaTest
   test("resolved start offset greater than end offset (without latest)") {
     val topic = newTopic()
     testUtils.createTopic(topic, partitions = 3)
-    val timestamp1 =
-      testUtils.sendMessages(topic, Seq("0", "0").toArray, Some(0))(1)._2.timestamp()
-    val timestamp2 =
-      testUtils.sendMessages(topic, Seq("0", "0").toArray, Some(1))(1)._2.timestamp()
-    val timestamp3 =
-      testUtils.sendMessages(topic, Seq("0", "0").toArray, Some(2))(1)._2.timestamp()
+    // Two messages per partition with explicit, increasing timestamps so the second message
+    // (offset 1) is deterministically resolved by offsetsForTimes. Without explicit timestamps
+    // both messages can share a CreateTime millisecond and resolve to offset 0, making the test
+    // flaky (the `eventually` below does not help: produced timestamps are fixed at produce time,

Review Comment:
   The new comment at `KafkaRelationSuite.scala:681` admits: *"the `eventually`
below does not help: produced timestamps are fixed at produce time, so retrying
always resolves the same wrong offset."* Once that is acknowledged, the wrapper
is dead code.
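
   The quoted reasoning can be checked against a small model of the documented
`KafkaConsumer.offsetsForTimes` rule (the earliest offset whose timestamp is
greater than or equal to the target). This is a sketch, not Kafka code: it
assumes a fresh partition whose offsets start at 0, and the 1000 ms timestamp is
hypothetical.

   ```scala
   object OffsetsForTimesModel {
     // Models the documented offsetsForTimes lookup for one partition: the
     // earliest offset whose timestamp is >= target, or None if there is none.
     // `timestamps(i)` is the record timestamp at offset i (fresh partition).
     def offsetForTime(timestamps: Seq[Long], target: Long): Option[Long] = {
       val idx = timestamps.indexWhere(_ >= target)
       if (idx < 0) None else Some(idx.toLong)
     }

     def main(args: Array[String]): Unit = {
       val t = 1000L // hypothetical CreateTime in ms

       // Both records produced in the same millisecond: looking up the second
       // record's timestamp lands on offset 0.
       val sameMs = Seq(t, t)
       println(offsetForTime(sameMs, sameMs(1))) // Some(0)

       // Explicit, increasing timestamps: the lookup lands on offset 1.
       val distinct = Seq(t, t + 1)
       println(offsetForTime(distinct, distinct(1))) // Some(1)

       // Timestamps never change after produce, so re-running the lookup (as an
       // `eventually` retry would) yields the same answer every time.
       val retries = Seq.fill(5)(offsetForTime(sameMs, sameMs(1)))
       println(retries.distinct) // List(Some(0))
     }
   }
   ```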
   
   No other source of intermittency exists on the path either: `createTopic`
already calls `waitUntilMetadataIsPropagated` (`KafkaTestUtils.scala:362-380`);
`producer.send().get(10s)` waits for `acks=all` (`KafkaTestUtils.scala:434`);
and the `RESOLVED_START_OFFSET_GREATER_THAN_END_OFFSET` check fires
synchronously on the driver before any task runs (V1 via
`KafkaRelation.buildScan` → `getOffsetRangesFromUnresolvedOffsets`, V2 via
`KafkaBatch.planInputPartitions` → the same method). So `df.collect()` throws
immediately in all four concrete suites (V1, V2, WithAdminV1, WithAdminV2).
Keeping `eventually` only adds up to 60 s of silent retries to any future
unrelated flake, masking exactly the kind of issue this PR aims to surface. The
twin test `(with latest)` at `KafkaRelationSuite.scala:760` already uses a
plain `intercept` without `eventually`; symmetry argues for the same here.
   
   Suggested change: replace the `eventually(timeout(60.seconds)) { val e = 
intercept[...] { df.collect() }; assert(...) }` block with a flat `val e = 
intercept[...] { df.collect() }; assert(...)`.
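
   The flat shape can be sketched in a self-contained way. Everything here is a
stand-in, labeled as such: `intercept` is a minimal re-implementation of the
ScalaTest idea (not ScalaTest itself), and `collect()` is a hypothetical stub
for `df.collect()` whose exception type and message are placeholders, since the
comment elides the real ones.

   ```scala
   import scala.reflect.ClassTag

   object FlatInterceptSketch {
     // Minimal stand-in for ScalaTest's `intercept[T]`: evaluates `body` exactly
     // once and returns the thrown exception if it is a T; anything else fails.
     def intercept[T <: Throwable](body: => Any)(implicit ct: ClassTag[T]): T = {
       val caught: Option[Throwable] =
         try { body; None } catch { case t: Throwable => Some(t) }
       caught match {
         case Some(t) if ct.runtimeClass.isInstance(t) => t.asInstanceOf[T]
         case Some(t) => throw new AssertionError(s"Unexpected exception: $t")
         case None => throw new AssertionError(s"Expected ${ct.runtimeClass.getName}")
       }
     }

     var collectCalls = 0

     // Hypothetical stand-in for `df.collect()`: the offset check runs on the
     // driver during planning, so the call fails on the first attempt.
     def collect(): Array[String] = {
       collectCalls += 1
       throw new IllegalStateException("RESOLVED_START_OFFSET_GREATER_THAN_END_OFFSET")
     }

     def main(args: Array[String]): Unit = {
       // Flat form: no retry wrapper, one evaluation, then the assertion.
       val e = intercept[IllegalStateException] { collect() }
       assert(e.getMessage.contains("GREATER_THAN_END_OFFSET"))
       println(s"collect() was called $collectCalls time(s)") // collect() was called 1 time(s)
     }
   }
   ```

   With no retry loop, a deterministic failure is reported after one call, and an
unexpected outcome fails fast instead of being retried for up to 60 s.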
   



##########
connector/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaRelationSuite.scala:
##########
@@ -675,12 +675,21 @@ abstract class KafkaRelationSuiteBase extends SharedSparkSession with KafkaTest
   test("resolved start offset greater than end offset (without latest)") {
     val topic = newTopic()
     testUtils.createTopic(topic, partitions = 3)
-    val timestamp1 =
-      testUtils.sendMessages(topic, Seq("0", "0").toArray, Some(0))(1)._2.timestamp()
-    val timestamp2 =
-      testUtils.sendMessages(topic, Seq("0", "0").toArray, Some(1))(1)._2.timestamp()
-    val timestamp3 =
-      testUtils.sendMessages(topic, Seq("0", "0").toArray, Some(2))(1)._2.timestamp()
+    // Two messages per partition with explicit, increasing timestamps so the second message
+    // (offset 1) is deterministically resolved by offsetsForTimes. Without explicit timestamps
+    // both messages can share a CreateTime millisecond and resolve to offset 0, making the test
+    // flaky (the `eventually` below does not help: produced timestamps are fixed at produce time,
+    // so retrying always resolves the same wrong offset).
+    def sendTwoWithDistinctTs(part: Int): Long = {

Review Comment:
   This function ends with `testUtils.sendMessages(...)(1)._2.timestamp()` to
read back the metadata timestamp of the second record. With explicit producer
timestamps and a CreateTime topic (Spark's `KafkaTestUtils` doesn't override
`log.message.timestamp.type`, so the broker default `CreateTime` applies),
`metadata.timestamp()` returns exactly `base + 1`. Skipping the round-trip and
returning `base + 1` directly makes the intent unambiguous and removes an
implicit dependency on broker-side timestamp semantics. If the round-trip is
intentionally defensive (e.g. against a hypothetical future LogAppendTime
override), say so in a one-line comment.
   
   Suggested change:
   
   ```scala
   def sendTwoWithDistinctTs(part: Int): Long = {
     val base = System.currentTimeMillis()
     testUtils.sendMessages(Seq(
       new RecordBuilder(topic, "0").partition(part).timestamp(base).build(),
       new RecordBuilder(topic, "0").partition(part).timestamp(base + 1).build()
     ))
     base + 1
   }
   ```
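
   Why the read-back equals `base + 1` only under CreateTime can be shown with a
simplified model of the two `log.message.timestamp.type` settings. This is a
sketch under stated assumptions: `storedTimestamp` is a hypothetical helper, and
the broker clock value is made up.

   ```scala
   object TimestampTypeModel {
     sealed trait TimestampType
     case object CreateTime extends TimestampType
     case object LogAppendTime extends TimestampType

     // Simplified model of what RecordMetadata.timestamp() reports: CreateTime
     // keeps the producer-supplied timestamp, LogAppendTime replaces it with the
     // broker's clock at append time.
     def storedTimestamp(tsType: TimestampType, producerTs: Long, brokerNow: Long): Long =
       tsType match {
         case CreateTime    => producerTs
         case LogAppendTime => brokerNow
       }

     def main(args: Array[String]): Unit = {
       val base = 1000L      // hypothetical System.currentTimeMillis() value
       val brokerNow = 5000L // hypothetical broker clock at append time

       // Broker default (CreateTime): read-back equals base + 1, so returning
       // base + 1 directly is equivalent.
       println(storedTimestamp(CreateTime, base + 1, brokerNow)) // 1001

       // Under a LogAppendTime override the read-back and base + 1 diverge.
       println(storedTimestamp(LogAppendTime, base + 1, brokerNow)) // 5000
     }
   }
   ```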



##########
connector/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaRelationSuite.scala:
##########
@@ -675,12 +675,21 @@ abstract class KafkaRelationSuiteBase extends SharedSparkSession with KafkaTest
   test("resolved start offset greater than end offset (without latest)") {
     val topic = newTopic()
     testUtils.createTopic(topic, partitions = 3)
-    val timestamp1 =
-      testUtils.sendMessages(topic, Seq("0", "0").toArray, Some(0))(1)._2.timestamp()
-    val timestamp2 =
-      testUtils.sendMessages(topic, Seq("0", "0").toArray, Some(1))(1)._2.timestamp()
-    val timestamp3 =
-      testUtils.sendMessages(topic, Seq("0", "0").toArray, Some(2))(1)._2.timestamp()
+    // Two messages per partition with explicit, increasing timestamps so the second message
+    // (offset 1) is deterministically resolved by offsetsForTimes. Without explicit timestamps
+    // both messages can share a CreateTime millisecond and resolve to offset 0, making the test
+    // flaky (the `eventually` below does not help: produced timestamps are fixed at produce time,
+    // so retrying always resolves the same wrong offset).
+    def sendTwoWithDistinctTs(part: Int): Long = {
+      val base = System.currentTimeMillis()
+      testUtils.sendMessages(Seq(
+        new RecordBuilder(topic, "0").partition(part).timestamp(base).build(),
+        new RecordBuilder(topic, "0").partition(part).timestamp(base + 1).build()
+      ))(1)._2.timestamp() // = base + 1, resolves to offset 1
+    }
+    val timestamp1 = sendTwoWithDistinctTs(0)

Review Comment:
   Pure style nit: `(0 to 2).map(sendTwoWithDistinctTs)` instead of separate
calls per partition would match the surrounding helper
`prepareTimestampRelatedUnitTest` (`KafkaRelationSuite.scala:366-369`).
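
   The suggested style, sketched with a hypothetical stub in place of the real
helper (which needs the suite's `testUtils` and `topic`); the returned
timestamps are made up.

   ```scala
   object MapStyleSketch {
     // Hypothetical stub for the suite's helper: pretends the second record on
     // partition `part` got timestamp 1000 * (part + 1) + 1.
     def sendTwoWithDistinctTs(part: Int): Long = 1000L * (part + 1) + 1

     def main(args: Array[String]): Unit = {
       // One call per partition, in partition order, instead of three separate vals.
       val timestamps = (0 to 2).map(sendTwoWithDistinctTs)
       println(timestamps.mkString(", ")) // 1001, 2001, 3001
     }
   }
   ```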



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
