LuciferYang commented on code in PR #56427:
URL: https://github.com/apache/spark/pull/56427#discussion_r3443883464
##########
connector/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaRelationSuite.scala:
##########
@@ -675,12 +675,21 @@ abstract class KafkaRelationSuiteBase extends SharedSparkSession with KafkaTest
test("resolved start offset greater than end offset (without latest)") {
val topic = newTopic()
testUtils.createTopic(topic, partitions = 3)
- val timestamp1 =
- testUtils.sendMessages(topic, Seq("0", "0").toArray, Some(0))(1)._2.timestamp()
- val timestamp2 =
- testUtils.sendMessages(topic, Seq("0", "0").toArray, Some(1))(1)._2.timestamp()
- val timestamp3 =
- testUtils.sendMessages(topic, Seq("0", "0").toArray, Some(2))(1)._2.timestamp()
+ // Two messages per partition with explicit, increasing timestamps so the second message
+ // (offset 1) is deterministically resolved by offsetsForTimes. Without explicit timestamps
+ // both messages can share a CreateTime millisecond and resolve to offset 0, making the test
+ // flaky (the `eventually` below does not help: produced timestamps are fixed at produce time,
Review Comment:
The new comment at `KafkaRelationSuite.scala:681` admits: *"the `eventually`
below does not help: produced timestamps are fixed at produce time, so retrying
always resolves the same wrong offset."* Given that, the `eventually` wrapper
is dead code and should be removed.
No other source of intermittency exists on this path either: `createTopic`
already calls `waitUntilMetadataIsPropagated` (`KafkaTestUtils.scala:362-380`);
`producer.send().get(10s)` waits on `acks=all` (`KafkaTestUtils.scala:434`); and
the `RESOLVED_START_OFFSET_GREATER_THAN_END_OFFSET` check fires synchronously
on the driver before any task runs (V1 via `KafkaRelation.buildScan` →
`getOffsetRangesFromUnresolvedOffsets`, V2 via `KafkaBatch.planInputPartitions`
→ the same method). As a result, `df.collect()` gets the exception immediately
on all four concrete suites (V1, V2, WithAdminV1, WithAdminV2). Keeping
`eventually` only adds up to 60 s of silent retries for any future unrelated
flake, masking the very kind of issue this PR aims to surface. The twin test
`(with latest)` at `KafkaRelationSuite.scala:760` already uses a plain
`intercept` without `eventually`; symmetry argues for the same here.
Suggested change: replace the `eventually(timeout(60.seconds)) { val e =
intercept[...] { df.collect() }; assert(...) }` block with a flat `val e =
intercept[...] { df.collect() }; assert(...)`.
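To illustrate why retrying cannot help, here is a minimal sketch in plain Scala, with no Kafka dependency. It models only the documented contract of `offsetsForTimes` (the earliest offset whose timestamp is greater than or equal to the target), not the broker implementation. `Record`, `offsetForTime`, and the value `1000L` are hypothetical names and numbers chosen for the illustration.

```scala
object OffsetLookupSketch {
  // Hypothetical, simplified view of one record in a partition log.
  final case class Record(offset: Long, timestamp: Long)

  // Model of the lookup contract: earliest offset with timestamp >= target.
  def offsetForTime(log: Seq[Record], target: Long): Option[Long] =
    log.sortBy(_.offset).find(_.timestamp >= target).map(_.offset)

  val base = 1000L // assumed CreateTime in milliseconds

  // Both records were produced within the same millisecond.
  val sameMs = Seq(Record(0L, base), Record(1L, base))
  // Explicit, increasing timestamps, as in the diff.
  val distinctMs = Seq(Record(0L, base), Record(1L, base + 1))

  // Look up the timestamp of the second record in each log.
  val collided = offsetForTime(sameMs, sameMs(1).timestamp)         // Some(0)
  val resolved = offsetForTime(distinctMs, distinctMs(1).timestamp) // Some(1)

  // The log does not change after produce, so repeating the lookup
  // returns the same (wrong) offset every time.
  val retries = Seq.fill(3)(offsetForTime(sameMs, base))

  def main(args: Array[String]): Unit = {
    println(s"same millisecond: $collided")
    println(s"distinct timestamps: $resolved")
    println(s"retries: $retries")
  }
}
```

In this model the collision case resolves to offset 0 on every attempt, which is the point of the PR comment: an `eventually` retry loop has nothing to wait for.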
##########
connector/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaRelationSuite.scala:
##########
@@ -675,12 +675,21 @@ abstract class KafkaRelationSuiteBase extends SharedSparkSession with KafkaTest
test("resolved start offset greater than end offset (without latest)") {
val topic = newTopic()
testUtils.createTopic(topic, partitions = 3)
- val timestamp1 =
- testUtils.sendMessages(topic, Seq("0", "0").toArray, Some(0))(1)._2.timestamp()
- val timestamp2 =
- testUtils.sendMessages(topic, Seq("0", "0").toArray, Some(1))(1)._2.timestamp()
- val timestamp3 =
- testUtils.sendMessages(topic, Seq("0", "0").toArray, Some(2))(1)._2.timestamp()
+ // Two messages per partition with explicit, increasing timestamps so the second message
+ // (offset 1) is deterministically resolved by offsetsForTimes. Without explicit timestamps
+ // both messages can share a CreateTime millisecond and resolve to offset 0, making the test
+ // flaky (the `eventually` below does not help: produced timestamps are fixed at produce time,
+ // so retrying always resolves the same wrong offset).
+ def sendTwoWithDistinctTs(part: Int): Long = {
Review Comment:
This function ends with `testUtils.sendMessages(...)(1)._2.timestamp()` to
read back the metadata timestamp of the second record. With explicit producer
timestamps and a CreateTime topic (Spark's `KafkaTestUtils` doesn't override
`log.message.timestamp.type`, so the broker default `CreateTime` applies),
`metadata.timestamp()` returns exactly `base + 1`. Skipping the round-trip and
returning `base + 1` directly makes the intent unambiguous and removes an
implicit dependency on broker-side timestamp semantics. If the round-trip is
intentionally defensive (e.g. for a hypothetical future LogAppendTime
override), say so in a one-line comment.
Suggested change:
```scala
def sendTwoWithDistinctTs(part: Int): Long = {
  val base = System.currentTimeMillis()
  testUtils.sendMessages(Seq(
    new RecordBuilder(topic, "0").partition(part).timestamp(base).build(),
    new RecordBuilder(topic, "0").partition(part).timestamp(base + 1).build()
  ))
  base + 1
}
```
##########
connector/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaRelationSuite.scala:
##########
@@ -675,12 +675,21 @@ abstract class KafkaRelationSuiteBase extends SharedSparkSession with KafkaTest
test("resolved start offset greater than end offset (without latest)") {
val topic = newTopic()
testUtils.createTopic(topic, partitions = 3)
- val timestamp1 =
- testUtils.sendMessages(topic, Seq("0", "0").toArray, Some(0))(1)._2.timestamp()
- val timestamp2 =
- testUtils.sendMessages(topic, Seq("0", "0").toArray, Some(1))(1)._2.timestamp()
- val timestamp3 =
- testUtils.sendMessages(topic, Seq("0", "0").toArray, Some(2))(1)._2.timestamp()
+ // Two messages per partition with explicit, increasing timestamps so the second message
+ // (offset 1) is deterministically resolved by offsetsForTimes. Without explicit timestamps
+ // both messages can share a CreateTime millisecond and resolve to offset 0, making the test
+ // flaky (the `eventually` below does not help: produced timestamps are fixed at produce time,
+ // so retrying always resolves the same wrong offset).
+ def sendTwoWithDistinctTs(part: Int): Long = {
+ val base = System.currentTimeMillis()
+ testUtils.sendMessages(Seq(
+ new RecordBuilder(topic, "0").partition(part).timestamp(base).build(),
+ new RecordBuilder(topic, "0").partition(part).timestamp(base + 1).build()
+ ))(1)._2.timestamp() // = base + 1, resolves to offset 1
+ }
+ val timestamp1 = sendTwoWithDistinctTs(0)
Review Comment:
Pure style: `(0 to 2).map(sendTwoWithDistinctTs)` matches the surrounding
helper `prepareTimestampRelatedUnitTest` (`KafkaRelationSuite.scala:366-369`).
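A rough sketch of the mapped form, in case it helps. The helper body here is a hypothetical stand-in that returns a fixed value per partition instead of producing to Kafka, and destructuring into three names is only an assumption about how the rest of the test consumes the timestamps; indexing into the resulting sequence would work equally well.

```scala
object TimestampsSketch {
  // Hypothetical stand-in for the helper in the diff: pretends to send two
  // records to the partition and returns the second record's timestamp.
  def sendTwoWithDistinctTs(part: Int): Long = 1000L * part + 1

  // One call per partition, evaluated in partition order (0, 1, 2).
  val Seq(timestamp1, timestamp2, timestamp3) = (0 to 2).map(sendTwoWithDistinctTs)

  def main(args: Array[String]): Unit =
    println(Seq(timestamp1, timestamp2, timestamp3))
}
```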
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]