vanzin commented on a change in pull request #23649: [SPARK-26726] The amount 
of memory used by the broadcast variable is …
URL: https://github.com/apache/spark/pull/23649#discussion_r251545277
 
 

 ##########
 File path: core/src/main/scala/org/apache/spark/status/AppStatusListener.scala
 ##########
 @@ -995,6 +996,34 @@ private[spark] class AppStatusListener(
     }
   }
 
+  private def updateBroadcastBlock(event: SparkListenerBlockUpdated,
+     broadcast: BroadcastBlockId): Unit = {
+    val now = System.nanoTime()
+    val executorId = event.blockUpdatedInfo.blockManagerId.executorId
+    val storageLevel = event.blockUpdatedInfo.storageLevel
+
+    // Whether values are being added to or removed from the existing 
accounting.
+    val diskDelta = event.blockUpdatedInfo.diskSize * (if 
(storageLevel.useDisk) 1 else -1)
+    val memoryDelta = event.blockUpdatedInfo.memSize * (if 
(storageLevel.useMemory) 1 else -1)
+
+    // Function to apply a delta to a value, but ensure that it doesn't go 
negative.
+    def newValue(old: Long, delta: Long): Long = math.max(0, old + delta)
+
+    val maybeExec = liveExecutors.get(executorId)
+    maybeExec.foreach { exec =>
+      if (exec.hasMemoryInfo) {
 
 Review comment:
   This block exists in a very similar form in two other places. Feels like 
time to have a helper method.

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
[email protected]


With regards,
Apache Git Services

---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to