HeartSaVioR commented on a change in pull request #31495:
URL: https://github.com/apache/spark/pull/31495#discussion_r571792973
##########
File path:
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/OffsetSeqLog.scala
##########
@@ -46,6 +47,23 @@ import org.apache.spark.sql.connector.read.streaming.{Offset
=> OffsetV2}
class OffsetSeqLog(sparkSession: SparkSession, path: String)
extends HDFSMetadataLog[OffsetSeq](sparkSession, path) {
+ private val cachedMetadata = new ju.TreeMap[Long, OffsetSeq]()
+
+ override def add(batchId: Long, metadata: OffsetSeq): Boolean = {
+ val added = super.add(batchId, metadata)
+ if (added) {
+ // cache metadata as it will be read again
+ cachedMetadata.put(batchId, metadata)
+ // we don't access metadata for (batchId - 2) batches; evict them
Review comment:
```
import java.lang.{Long => JLong}
import java.util.{ArrayList, Collections, TreeMap}
/**
 * Times a fixed workload against `treeMap`: insert keys 1, 2 and 3, then clear the
 * inclusive head view up to key 2 (which removes keys 1 and 2 from the backing map).
 *
 * @param treeMap map that is mutated by the workload
 * @return elapsed time of the workload as a `System.nanoTime()` difference
 */
def c(treeMap: TreeMap[Long, String]): Long = {
  val startNanos = System.nanoTime()
  treeMap.put(1L, "1")
  treeMap.put(2L, "3")
  treeMap.put(3L, "3")
  // headMap returns a view backed by treeMap, so clearing it evicts from treeMap itself.
  treeMap.headMap(2L, true).clear()
  val endNanos = System.nanoTime()
  endNanos - startNanos
}
/**
 * Times one insert-then-evict step: put `idx -> value`, then clear every entry whose
 * key is less than or equal to `idx - 2`.
 *
 * @param treeMap map that is mutated by the step
 * @param idx     key to insert; also determines the eviction bound `idx - 2`
 * @param value   value stored under `idx`
 * @return elapsed time of the step as a `System.nanoTime()` difference
 */
def d(treeMap: TreeMap[Long, String], idx: Long, value: String): Long = {
  val startNanos = System.nanoTime()
  treeMap.put(idx, value)
  // Inclusive bound: the key idx - 2 itself is removed as well.
  treeMap.headMap(idx - 2, true).clear()
  val endNanos = System.nanoTime()
  endNanos - startNanos
}
/**
 * Runs `c` against a fresh, empty TreeMap on every iteration and prints the latency
 * distribution of the measured iterations via `printLatencies`.
 *
 * The first `warmupCount` iterations are executed but their timings are discarded;
 * the following `runCount` iterations are recorded.
 */
def experimentC(): Unit = {
  val warmupCount = 1000000
  val runCount = 10000000

  // Warm-up iterations: same workload, timings thrown away.
  (1 to warmupCount).foreach { _ =>
    c(new TreeMap[Long, String]())
  }

  // Pre-size the result list so it never has to grow while measurements are collected.
  val latencies = new ArrayList[JLong](runCount)
  (1 to runCount).foreach { _ =>
    latencies.add(JLong.valueOf(c(new TreeMap[Long, String]())))
  }

  // printLatencies sorts the list itself, so sorting here as well would only repeat
  // an O(n log n) pass over all recorded entries.
  printLatencies(latencies)
}
/**
 * Runs `d` with increasing keys against one long-lived TreeMap and prints the latency
 * distribution of the measured iterations via `printLatencies`.
 *
 * Warm-up iterations use their own map and their timings are discarded; the measured
 * iterations start again from key 1 on a second, initially empty map.
 */
def experimentD(): Unit = {
  val warmupCount = 1000000
  val runCount = 10000000

  val warmupMap = new TreeMap[Long, String]()
  for (i <- 1 to warmupCount) {
    d(warmupMap, i, i.toString)
  }

  val latencies = new ArrayList[JLong]()
  val measuredMap = new TreeMap[Long, String]()
  for (i <- 1 to runCount) {
    val elapsed = d(measuredMap, i, i.toString)
    latencies.add(JLong.valueOf(elapsed))
  }

  printLatencies(latencies)
}
/**
 * Sorts `latencies` in place (ascending) and prints, via `printLatency`, the entries at
 * index 0, at the positions `size * f` for f in 0.5, 0.9, 0.99, 0.999, 0.9999, 0.99999
 * and 0.999999 (truncated to Int), and at the last index.
 *
 * An empty list prints nothing; previously it failed with an IndexOutOfBoundsException
 * on the first lookup.
 *
 * @param latencies recorded timings; note that the caller's list is reordered by this call
 */
def printLatencies(latencies: ArrayList[JLong]): Unit = {
  val arraySize = latencies.size()
  if (arraySize > 0) {
    java.util.Collections.sort(latencies)

    val fractions = Seq(0.5, 0.9, 0.99, 0.999, 0.9999, 0.99999, 0.999999)
    val percentileIdxs = fractions.map(f => (arraySize * f).toInt)
    val minIdx = 0
    val maxIdx = arraySize - 1

    (minIdx +: percentileIdxs :+ maxIdx).foreach { idx =>
      printLatency(latencies, idx)
    }
  }
}
/**
 * Prints the entry at position `idx` of `latencies`, treating the stored value as
 * nanoseconds and reporting it in whole microseconds and whole milliseconds
 * (integer division, so fractions are truncated).
 *
 * @param latencies list to read from
 * @param idx       position of the entry to print; must be a valid index of the list
 */
def printLatency(latencies: ArrayList[JLong], idx: Int): Unit = {
  val nanos: Long = latencies.get(idx)
  val micros = nanos / 1000
  val millis = nanos / 1000000
  // Kept on one line: a single-quoted interpolated string cannot span lines.
  println(s"$idx th : $micros microseconds = $millis milliseconds")
}
// Driver section: experimentC is commented out and only experimentD runs.
// The block comment after each call looks like the output of that experiment pasted
// back in (presumably from an earlier run — not verifiable from this script alone).
// Each line has the shape printed by printLatency: "<index in sorted list> th : ...".
// experimentC()
/*
0 th : 0 microseconds = 0 milliseconds
5000000 th : 0 microseconds = 0 milliseconds
9000000 th : 0 microseconds = 0 milliseconds
9900000 th : 0 microseconds = 0 milliseconds
9990000 th : 1 microseconds = 0 milliseconds
9999000 th : 9 microseconds = 0 milliseconds
9999900 th : 37 microseconds = 0 milliseconds
9999990 th : 223 microseconds = 0 milliseconds
9999999 th : 53612 microseconds = 53 milliseconds
*/
experimentD()
/*
0 th : 0 microseconds = 0 milliseconds
5000000 th : 0 microseconds = 0 milliseconds
9000000 th : 0 microseconds = 0 milliseconds
9900000 th : 0 microseconds = 0 milliseconds
9990000 th : 0 microseconds = 0 milliseconds
9999000 th : 6 microseconds = 0 milliseconds
9999900 th : 25 microseconds = 0 milliseconds
9999990 th : 150 microseconds = 0 milliseconds
9999999 th : 57887 microseconds = 57 milliseconds
*/
```
2018 13-inch MBP, i7 quad-core 2.7GHz
```
./bin/spark-shell --driver-memory 2g
...
Welcome to
____ __
/ __/__ ___ _____/ /__
_\ \/ _ \/ _ `/ __/ '_/
/___/ .__/\_,_/_/ /_/\_\ version 3.0.1
/_/
Using Scala version 2.12.10 (Java HotSpot(TM) 64-Bit Server VM, Java
1.8.0_191)
```
Still think this really matters?
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]