Github user squito commented on a diff in the pull request:

    https://github.com/apache/spark/pull/21221#discussion_r190405223
  
    --- Diff: 
core/src/main/scala/org/apache/spark/scheduler/PeakExecutorMetrics.scala ---
    @@ -0,0 +1,127 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.spark.scheduler
    +
    +import org.apache.spark.executor.ExecutorMetrics
    +import org.apache.spark.status.api.v1.PeakMemoryMetrics
    +
    +/**
    + * Records the peak values for executor level metrics. If 
jvmUsedHeapMemory is -1, then no
    + * values have been recorded yet.
    + */
    +private[spark] class PeakExecutorMetrics {
    +  private var _jvmUsedHeapMemory = -1L;
    +  private var _jvmUsedNonHeapMemory = 0L;
    +  private var _onHeapExecutionMemory = 0L
    +  private var _offHeapExecutionMemory = 0L
    +  private var _onHeapStorageMemory = 0L
    +  private var _offHeapStorageMemory = 0L
    +  private var _onHeapUnifiedMemory = 0L
    +  private var _offHeapUnifiedMemory = 0L
    +  private var _directMemory = 0L
    +  private var _mappedMemory = 0L
    +
    +  def jvmUsedHeapMemory: Long = _jvmUsedHeapMemory
    +
    +  def jvmUsedNonHeapMemory: Long = _jvmUsedNonHeapMemory
    +
    +  def onHeapExecutionMemory: Long = _onHeapExecutionMemory
    +
    +  def offHeapExecutionMemory: Long = _offHeapExecutionMemory
    +
    +  def onHeapStorageMemory: Long = _onHeapStorageMemory
    +
    +  def offHeapStorageMemory: Long = _offHeapStorageMemory
    +
    +  def onHeapUnifiedMemory: Long = _onHeapUnifiedMemory
    +
    +  def offHeapUnifiedMemory: Long = _offHeapUnifiedMemory
    +
    +  def directMemory: Long = _directMemory
    +
    +  def mappedMemory: Long = _mappedMemory
    +
    +  /**
    +   * Compare the specified memory values with the saved peak executor 
memory
    +   * values, and update if there is a new peak value.
    +   *
    +   * @param executorMetrics the executor metrics to compare
    +   * @return if there is a new peak value for any metric
    +   */
    +  def compareAndUpdate(executorMetrics: ExecutorMetrics): Boolean = {
    +    var updated: Boolean = false
    +
    +    if (executorMetrics.jvmUsedHeapMemory > _jvmUsedHeapMemory) {
    +      _jvmUsedHeapMemory = executorMetrics.jvmUsedHeapMemory
    +      updated = true
    +    }
    +    if (executorMetrics.jvmUsedNonHeapMemory > _jvmUsedNonHeapMemory) {
    +      _jvmUsedNonHeapMemory = executorMetrics.jvmUsedNonHeapMemory
    +      updated = true
    +    }
    +    if (executorMetrics.onHeapExecutionMemory > _onHeapExecutionMemory) {
    +      _onHeapExecutionMemory = executorMetrics.onHeapExecutionMemory
    +      updated = true
    +    }
    +    if (executorMetrics.offHeapExecutionMemory > _offHeapExecutionMemory) {
    +      _offHeapExecutionMemory = executorMetrics.offHeapExecutionMemory
    +      updated = true
    +    }
    +    if (executorMetrics.onHeapStorageMemory > _onHeapStorageMemory) {
    +      _onHeapStorageMemory = executorMetrics.onHeapStorageMemory
    +      updated = true
    +    }
    +    if (executorMetrics.offHeapStorageMemory > _offHeapStorageMemory) {
    +      _offHeapStorageMemory = executorMetrics.offHeapStorageMemory
    --- End diff --
    
    I know Spark has this kind of code all over the place already, but I really
    hate how error-prone it is -- it's way too easy for a copy-paste error to result in
    comparing the wrong two metrics, or updating the wrong value, or forgetting to
    update this when another metric is added, etc.
    
    I just opened https://github.com/edwinalu/spark/pull/1 as another way
    to do this that would eliminate a ton of boilerplate, IMO.


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org

Reply via email to