micheal-o commented on code in PR #52527:
URL: https://github.com/apache/spark/pull/52527#discussion_r2408433814
##########
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDBMemoryManager.scala:
##########
@@ -117,10 +117,45 @@ object RocksDBMemoryManager extends Logging with UnmanagedMemoryConsumer {
totalMemoryUsage / numBoundedInstances
} else {
// In unbounded memory mode, each instance has its own memory
+
+ // Question: Should we return 0L if the instance is not registed in instanceMemoryMap?
+ // but when it is not registed?
totalMemoryUsage
}
}
+ /**
+ * Get the pinned blocks memory usage for a specific instance, accounting for bounded memory
+ * sharing.
+ * @param uniqueId The instance's unique identifier
+ * @param globalPinnedUsage The total pinned usage from the cache
+ * @return The adjusted pinned blocks memory usage accounting for sharing in bounded memory mode
+ */
+ def getInstancePinnedBlocksMemUsage(
+ uniqueId: String,
+ globalPinnedUsage: Long,
+ requestedPinnedUsage: Long): Long = {
+ if (!instanceMemoryMap.containsKey(uniqueId)) {
+ // Instance not registered, return 0
+ return 0L
+ }
+
+ val instanceInfo = instanceMemoryMap.get(uniqueId)
+ if (instanceInfo.isBoundedMemory) {
+ // In bounded memory mode, divide by the number of bounded instances
+ // since they share the same cache
+ val numBoundedInstances = getNumRocksDBInstances(true /* boundedMemory */)
+ if (numBoundedInstances > 0) {
+ globalPinnedUsage / numBoundedInstances
+ } else {
+ 0L
Review Comment:
When would this happen? We already found a bounded-memory instance for this
`uniqueId` when we called `instanceMemoryMap.get(uniqueId)` above, so I don't
think this branch is necessary.
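For illustration, here is a minimal sketch of the method without the defensive branch. The `InstanceInfo` case class, the `ConcurrentHashMap`-backed `instanceMemoryMap`, the counting logic in `getNumRocksDBInstances`, and the value returned for unbounded instances are all simplified stand-ins assumed for this example, not the actual `RocksDBMemoryManager` internals:

```scala
import java.util.concurrent.ConcurrentHashMap
import scala.jdk.CollectionConverters._

// Hypothetical stand-in for the manager's per-instance bookkeeping.
final case class InstanceInfo(isBoundedMemory: Boolean)

object PinnedUsageSketch {
  // Assumed registry shape: uniqueId -> instance info.
  val instanceMemoryMap = new ConcurrentHashMap[String, InstanceInfo]()

  // Assumed implementation: count registered instances in the given mode.
  def getNumRocksDBInstances(boundedMemory: Boolean): Long =
    instanceMemoryMap.values().asScala.count(_.isBoundedMemory == boundedMemory).toLong

  def getInstancePinnedBlocksMemUsage(
      uniqueId: String,
      globalPinnedUsage: Long,
      requestedPinnedUsage: Long): Long = {
    Option(instanceMemoryMap.get(uniqueId)) match {
      case None =>
        // Instance not registered: 0L, as in the PR under review.
        0L
      case Some(info) if info.isBoundedMemory =>
        // The instance we just looked up is itself bounded, so the count is
        // at least 1 as long as nothing unregisters it concurrently.
        globalPinnedUsage / getNumRocksDBInstances(boundedMemory = true)
      case Some(_) =>
        // Assumption: unbounded instances report their own requested usage.
        requestedPinnedUsage
    }
  }
}
```

One caveat with this shape: if an instance could be unregistered between the lookup and the count, the divisor could reach zero, so dropping the branch is only safe if that race cannot happen.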
##########
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDBMemoryManager.scala:
##########
@@ -117,10 +117,45 @@ object RocksDBMemoryManager extends Logging with UnmanagedMemoryConsumer {
totalMemoryUsage / numBoundedInstances
} else {
// In unbounded memory mode, each instance has its own memory
+
+ // Question: Should we return 0L if the instance is not registed in instanceMemoryMap?
+ // but when it is not registed?
totalMemoryUsage
}
}
+ /**
+ * Get the pinned blocks memory usage for a specific instance, accounting for bounded memory
+ * sharing.
+ * @param uniqueId The instance's unique identifier
+ * @param globalPinnedUsage The total pinned usage from the cache
+ * @return The adjusted pinned blocks memory usage accounting for sharing in bounded memory mode
+ */
+ def getInstancePinnedBlocksMemUsage(
+ uniqueId: String,
+ globalPinnedUsage: Long,
+ requestedPinnedUsage: Long): Long = {
+ if (!instanceMemoryMap.containsKey(uniqueId)) {
+ // Instance not registered, return 0
+ return 0L
Review Comment:
Why not just return `requestedPinnedUsage` instead of 0L?
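A minimal sketch of that alternative, in which an unregistered instance falls back to the caller-supplied value. The `boundedById` map, the `perInstanceShare` parameter, and the method name are hypothetical and exist only for this example; they are not the PR's actual code:

```scala
import java.util.concurrent.ConcurrentHashMap

object UnregisteredFallbackSketch {
  // Hypothetical registry: uniqueId -> whether the instance uses bounded memory.
  val boundedById = new ConcurrentHashMap[String, java.lang.Boolean]()

  def pinnedUsageOrRequested(
      uniqueId: String,
      perInstanceShare: Long,
      requestedPinnedUsage: Long): Long = {
    Option(boundedById.get(uniqueId)) match {
      case Some(bounded) if bounded.booleanValue =>
        // Registered and bounded: report this instance's share of the shared cache.
        perInstanceShare
      case _ =>
        // Unbounded or not registered: pass the requested value through unchanged.
        requestedPinnedUsage
    }
  }
}
```

With this shape, a caller that queries before registration still sees the value it supplied rather than a silent zero.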
##########
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDBMemoryManager.scala:
##########
@@ -117,10 +117,45 @@ object RocksDBMemoryManager extends Logging with UnmanagedMemoryConsumer {
totalMemoryUsage / numBoundedInstances
} else {
// In unbounded memory mode, each instance has its own memory
+
+ // Question: Should we return 0L if the instance is not registed in instanceMemoryMap?
+ // but when it is not registed?
Review Comment:
If the instance is not registered, we currently just return the
`totalMemoryUsage` passed in. Why do you think 0L is better?