eason-yuchen-liu commented on code in PR #54550:
URL: https://github.com/apache/spark/pull/54550#discussion_r2868431609
##########
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/sources/LowLatencyMemoryStream.scala:
##########
@@ -278,21 +278,20 @@ class LowLatencyMemoryStreamPartitionReader(
throw new IllegalStateException("Task context was not set!")
}
override def nextWithTimeout(
- startTime: java.lang.Long, timeout: java.lang.Long): RecordStatus = {
+ startTimeMs: java.lang.Long, timeoutMs: java.lang.Long): RecordStatus = {
    // SPARK-55699: Use the reference time passed in by the caller instead of getting the latest
    // time from LowLatencyClock, to avoid inconsistent reading when LowLatencyClock is a
    // manual clock.
- val startReadTime = startTime
var elapsedTimeMs = 0L
current = getRecordWithTimestamp
while (current.isEmpty) {
val POLL_TIME = 10L
- if (elapsedTimeMs >= timeout) {
+ if (elapsedTimeMs >= timeoutMs) {
return RecordStatus.newStatusWithoutArrivalTime(false)
}
Thread.sleep(POLL_TIME)
current = getRecordWithTimestamp
- elapsedTimeMs = (clock.nanoTime() - startReadTime) / 1000 / 1000
+ elapsedTimeMs = clock.getTimeMillis() - startTimeMs
Review Comment:
@HeartSaVioR This is the main change. Basically, in the last PR we were
subtracting a millisecond timestamp from a nanosecond reading, which is wrong.
I am not sure why no unit test is failing; maybe this reveals that our test
coverage is limited.
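
To illustrate the unit mismatch (a standalone sketch, not Spark's actual code; the names `elapsedMs` and `buggyElapsedMs` are hypothetical): `System.nanoTime()` has an arbitrary origin and is not comparable to a millisecond wall-clock timestamp, so subtracting one from the other yields a meaningless duration even after converting to milliseconds.

```scala
object ElapsedTimeDemo {
  // Fixed approach: both operands come from the same millisecond clock,
  // mirroring `clock.getTimeMillis() - startTimeMs` in the new code.
  def elapsedMs(startTimeMs: Long, nowMs: Long): Long =
    nowMs - startTimeMs

  // Buggy approach from the previous revision: subtracts a millisecond
  // timestamp from a nanosecond reading, then divides by 1e6. The two
  // values have different units and different origins, so the result
  // bears no relation to the real elapsed time.
  def buggyElapsedMs(startTimeMs: Long, nowNanos: Long): Long =
    (nowNanos - startTimeMs) / 1000 / 1000
}
```

With a start time of 1000 ms and a current time of 1500 ms, `elapsedMs` returns 500, while `buggyElapsedMs` collapses the difference to 0 because it treats the millisecond values as nanoseconds.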
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]