Github user squito commented on a diff in the pull request:

    https://github.com/apache/spark/pull/19751#discussion_r156775928
  
    --- Diff: core/src/main/scala/org/apache/spark/status/ElementTrackingStore.scala ---
    @@ -0,0 +1,168 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.spark.status
    +
    +import java.util.concurrent.TimeUnit
    +
    +import scala.collection.mutable.{HashMap, ListBuffer}
    +
    +import org.apache.spark.SparkConf
    +import org.apache.spark.util.{ThreadUtils, Utils}
    +import org.apache.spark.util.kvstore._
    +
    +/**
    + * A KVStore wrapper that allows tracking the number of elements of specific types, and triggering
    + * actions once they reach a threshold. This allows writers, for example, to control how much data
    + * is stored by potentially deleting old data as new data is added.
    + *
    + * This store is used when populating data either from a live UI or an event log. On top of firing
    + * triggers when elements reach a certain threshold, it provides two extra bits of functionality:
    + *
    + * - a generic worker thread that can be used to run expensive tasks asynchronously; the tasks can
    + *   be configured to run on the calling thread when more determinism is desired (e.g. unit tests).
    + * - a generic flush mechanism so that listeners can be notified about when they should flush
    + *   internal state to the store (e.g. after the SHS finishes parsing an event log).
    + *
    + * The configured triggers are run on the same thread that triggered the write, after the write
    + * has completed.
    + */
    +private[spark] class ElementTrackingStore(store: KVStore, conf: SparkConf) extends KVStore {
    +
    +  import config._
    +
    +  private val triggers = new HashMap[Class[_], Seq[Trigger[_]]]()
    +  private val flushTriggers = new ListBuffer[() => Unit]()
    +  private val executor = if (conf.get(ASYNC_TRACKING_ENABLED)) {
    +    Some(ThreadUtils.newDaemonSingleThreadExecutor("element-tracking-store-worker"))
    +  } else {
    +    None
    +  }
    +
    +  @volatile private var stopped = false
    +
    +  /**
    +   * Register a trigger that will be fired once the number of elements of a given type reaches
    +   * the given threshold.
    +   *
    +   * Triggers are fired in a separate thread, so that they can do more expensive operations
    +   * than would be allowed on the main threads populating the store.
    +   *
    +   * @param klass The type to monitor.
    +   * @param threshold The number of elements that should trigger the action.
    +   * @param action Action to run when the threshold is reached; takes as a parameter the number
    +   *               of elements of the registered type currently known to be in the store.
    +   */
    +  def addTrigger(klass: Class[_], threshold: Long)(action: Long => Unit): Unit = {
    +    val existing = triggers.getOrElse(klass, Seq())
    +    triggers(klass) = existing :+ Trigger(threshold, action)
    +  }
    +
    +  /**
    +   * Adds a trigger to be executed before the store is flushed. This normally happens before
    +   * closing, and is useful for flushing intermediate state to the store, e.g. when replaying
    +   * in-progress applications through the SHS.
    +   *
    +   * Flush triggers are called synchronously in the same thread that is closing the store.
    +   */
    +  def onFlush(action: => Unit): Unit = {
    +    flushTriggers += { () => action }
    +  }
    +
    +  /**
    +   * Enqueues an action to be executed asynchronously.
    +   */
    +  def doAsync(fn: => Unit): Unit = {
    +    executor match {
    +      case Some(exec) =>
    +        exec.submit(new Runnable() {
    +          override def run(): Unit = Utils.tryLog { fn }
    +        })
    +
    +      case _ =>
    +        fn
    +    }
    +  }
    +
    +  override def read[T](klass: Class[T], naturalKey: Any): T = store.read(klass, naturalKey)
    +
    +  override def write(value: Any): Unit = store.write(value)
    +
    +  /** Write an element to the store, optionally checking for whether to fire triggers. */
    +  def write(value: Any, checkTriggers: Boolean): Unit = {
    +    write(value)
    +
    +    if (checkTriggers && !stopped) {
    +      triggers.get(value.getClass()).foreach { list =>
    +        doAsync {
    +          val count = store.count(value.getClass())
    --- End diff --
    
    Ok, on another read I see that tasks are actually tracked specially and don't use
    this trigger mechanism. Still, this API isn't really that useful in the end -- it
    works for jobs and stages, but it isn't actually the right count for executors, and
    you don't use it for tasks. It might still be easier to just track those other
    counts directly, without going through `kvstore.count()`.
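    
    Rough sketch of what I mean, with made-up names (`activeJobCount`, `maxRetainedJobs`,
    and `cleanupJobs` aren't in this PR) -- the writer maintains its own counter, so the
    trigger check doesn't need a `kvstore.count()` scan on every write:
    
    ```scala
    import java.util.concurrent.atomic.AtomicLong
    
    // Hypothetical counter owned by the writer, kept exact as elements are
    // added and removed, instead of re-counting the store on every write.
    private val activeJobCount = new AtomicLong(0L)
    private val maxRetainedJobs = 1000L  // made-up threshold
    
    def writeJob(job: AnyRef): Unit = {
      store.write(job)
      if (activeJobCount.incrementAndGet() > maxRetainedJobs) {
        doAsync {
          // cleanupJobs would delete old entries and decrement activeJobCount
          cleanupJobs(activeJobCount.get())
        }
      }
    }
    ```
    
    That would also handle the executor case, since the counter can track just the
    number you actually care about (e.g. active executors) rather than every instance
    of the type in the store.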


---
