HeartSaVioR commented on a change in pull request #31495:
URL: https://github.com/apache/spark/pull/31495#discussion_r571829604
##########
File path: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/OffsetSeqLog.scala
##########
@@ -46,6 +47,23 @@ import org.apache.spark.sql.connector.read.streaming.{Offset => OffsetV2}
class OffsetSeqLog(sparkSession: SparkSession, path: String)
extends HDFSMetadataLog[OffsetSeq](sparkSession, path) {
+ private val cachedMetadata = new ju.TreeMap[Long, OffsetSeq]()
+
+ override def add(batchId: Long, metadata: OffsetSeq): Boolean = {
+ val added = super.add(batchId, metadata)
+ if (added) {
+ // cache metadata as it will be read again
+ cachedMetadata.put(batchId, metadata)
+ // we don't access metadata for (batchId - 2) batches; evict them
Review comment:
This is another sort of micro-optimization; the realistic latency of a
micro-batch is 1s+ (and it doesn't matter much even if we consider a very tight
micro-batch, like 500ms), yet here we worry about creating "an" object per such
period, one which will be marked as "unused" after a couple of batches.
This is a clear example of why micro-optimizing without understanding the
full context is bad: an optimization should be evaluated by its impact and
applied only when it contributes at least 1% (though I'd rather not even be
concerned about 1% if the sub-optimal code is more intuitive).
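To make the eviction being discussed concrete (the diff above is truncated at
the commented line, per GitHub's review display), here is a minimal,
self-contained sketch. `OffsetSeq` is aliased to `String` as a hypothetical
stand-in, and `cacheAndEvict` is an illustrative helper rather than the PR's
actual `add` override:
```scala
import java.{util => ju}

object OffsetCacheSketch {
  // Hypothetical stand-in for OffsetSeq; only the cache shape matters here.
  type OffsetSeq = String

  // Sorted cache, as in the diff above: batchId -> offsets for that batch.
  private val cachedMetadata = new ju.TreeMap[Long, OffsetSeq]()

  // Mirrors the cache-and-evict step inside add().
  def cacheAndEvict(batchId: Long, metadata: OffsetSeq): Unit = {
    // cache metadata as it will be read again
    cachedMetadata.put(batchId, metadata)
    // headMap(batchId - 1) is a live view of keys < batchId - 1, i.e.
    // batches at or below (batchId - 2); clearing the view removes those
    // entries from the backing TreeMap.
    cachedMetadata.headMap(batchId - 1).clear()
  }

  def main(args: Array[String]): Unit = {
    (0L to 3L).foreach(id => cacheAndEvict(id, s"offsets-$id"))
    println(cachedMetadata) // {2=offsets-2, 3=offsets-3}
  }
}
```
Either way, the cost being discussed is one short-lived object per micro-batch,
which is negligible at 500ms-1s+ batch intervals.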
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]