jerrypeng commented on code in PR #52620:
URL: https://github.com/apache/spark/pull/52620#discussion_r2461485055
##########
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/RealTimeStreamScanExec.scala:
##########
@@ -37,3 +49,121 @@ object LowLatencyClock {
clock = inputClock
}
}
+
+/**
+ * A wrapper reader that turns a PartitionReader extending SupportsRealTimeRead
+ * into a normal PartitionReader, honors the task termination time
+ * `lowLatencyEndTime`, and reports the end offsets to `endOffsets` when it
+ * finishes.
+ */
+case class LowLatencyReaderWrap(
+ reader: SupportsRealTimeRead[InternalRow],
+ lowLatencyEndTime: Long,
+ endOffsets: CollectionAccumulator[PartitionOffsetWithIndex])
+ extends PartitionReader[InternalRow] {
+
+ override def next(): Boolean = {
+ val curTime = LowLatencyClock.getTimeMillis()
+ val ret = if (curTime >= lowLatencyEndTime) {
+ RecordStatus.newStatusWithoutArrivalTime(false)
+ } else {
+ reader.nextWithTimeout(lowLatencyEndTime - curTime)
+ }
+
+ if (!ret.hasRecord) {
+ // Using TaskContext.get().partitionId() to map to a partition may be
+ // fragile, as it relies on thread locals.
Review Comment:
It is currently not a problem, but the implementation may be fragile to
changes, e.g. a scenario where this method is called from a different thread
than the task thread.
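
To make the concern concrete, here is a minimal, standalone Scala sketch (not
Spark code; `taskContext` is a hypothetical stand-in for Spark's real
TaskContext thread local) showing that a value installed in a ThreadLocal on
the task thread is not visible from any other thread that might end up
calling next():

    // Standalone sketch of the thread-local hazard; `taskContext` is a
    // hypothetical stand-in for Spark's TaskContext thread local.
    object ThreadLocalFragilityDemo {
      private val taskContext = new ThreadLocal[Integer]()

      def main(args: Array[String]): Unit = {
        taskContext.set(7) // the "task thread" installs its partition id
        println(s"task thread sees partitionId = ${taskContext.get()}") // 7

        val other = new Thread(() => {
          // A different thread sees null, the analogue of TaskContext.get()
          // misbehaving when next() is called off the task thread.
          println(s"other thread sees partitionId = ${taskContext.get()}")
        })
        other.start()
        other.join()
      }
    }

One possible hardening, offered only as a suggestion, would be to capture
TaskContext.get().partitionId() once at reader construction time, on the task
thread, and store it in a field of LowLatencyReaderWrap, so that next() never
needs to consult the thread local.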