HeartSaVioR commented on a change in pull request #31495:
URL: https://github.com/apache/spark/pull/31495#discussion_r571841510
##########
File path: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/OffsetSeqLog.scala
##########
@@ -46,6 +47,23 @@ import org.apache.spark.sql.connector.read.streaming.{Offset => OffsetV2}
 class OffsetSeqLog(sparkSession: SparkSession, path: String)
   extends HDFSMetadataLog[OffsetSeq](sparkSession, path) {
+  private val cachedMetadata = new ju.TreeMap[Long, OffsetSeq]()
+
+  override def add(batchId: Long, metadata: OffsetSeq): Boolean = {
+    val added = super.add(batchId, metadata)
+    if (added) {
+      // cache metadata as it will be read again
+      cachedMetadata.put(batchId, metadata)
+      // we don't access metadata for (batchId - 2) batches; evict them
Review comment:
The reason I am concerned about treating this as a possible micro-optimization
is that it shifts the evaluation of the change toward performance alone and
pushes other viewpoints aside. There are plenty of points worth discussing on
this change: is it better to put the caching in OffsetSeq, or in both
MicroBatchExecution and ContinuousExecution (and why)? Which placement would be
most intuitive to maintainers and contributors? And so on. Once the performance
perspective comes in and gains traction, other perspectives tend to be
disregarded. Performance should be just one viewpoint among several, not an
overriding one.
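For context on the mechanism under discussion, here is a minimal standalone sketch (not the PR's actual code) of the cache-and-evict pattern visible in the truncated diff above. It assumes a java.util.TreeMap keyed by batch id, substitutes a placeholder String metadata type for OffsetSeq, and evicts everything at or below batchId - 2 through a headMap view; the class and method names are illustrative only.

import java.{util => ju}

// Standalone sketch of the cache-and-evict pattern; String stands in for OffsetSeq.
class CachedBatchLog {
  private val cachedMetadata = new ju.TreeMap[Long, String]()

  def add(batchId: Long, metadata: String): Boolean = {
    // cache the metadata, since it will be read again by the following batch
    cachedMetadata.put(batchId, metadata)
    // batches at or below (batchId - 2) are assumed not to be read again;
    // headMap(toKey, inclusive = true) is a live view, so clear() evicts them
    cachedMetadata.headMap(batchId - 2, true).clear()
    true
  }

  def get(batchId: Long): Option[String] = Option(cachedMetadata.get(batchId))
}

With this retention window, after add(10, ...) the map holds only batches 9 and 10, which mirrors the "keep roughly the last two batches" behavior the eviction comment in the diff describes.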