Github user gengliangwang commented on a diff in the pull request:
https://github.com/apache/spark/pull/19751#discussion_r151321285
--- Diff:
core/src/main/scala/org/apache/spark/status/ElementTrackingStore.scala ---
@@ -0,0 +1,168 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.status
+
+import java.util.concurrent.TimeUnit
+
+import scala.collection.mutable.{HashMap, ListBuffer}
+
+import org.apache.spark.SparkConf
+import org.apache.spark.util.{ThreadUtils, Utils}
+import org.apache.spark.util.kvstore._
+
+/**
+ * A KVStore wrapper that allows tracking the number of elements of
specific types, and triggering
+ * actions once they reach a threshold. This allows writers, for example,
to control how much data
+ * is stored by potentially deleting old data as new data is added.
+ *
+ * This store is used when populating data either from a live UI or an
event log. On top of firing
+ * triggers when elements reach a certain threshold, it provides two extra
bits of functionality:
+ *
+ * - a generic worker thread that can be used to run expensive tasks
asynchronously; the tasks can
+ * be configured to run on the calling thread when more determinism is
desired (e.g. unit tests).
+ * - a generic flush mechanism so that listeners can be notified about
when they should flush
+ * internal state to the store (e.g. after the SHS finishes parsing an
event log).
+ *
+ * The configured triggers are run on the same thread that triggered the
write, after the write
+ * has completed.
+ */
+private[spark] class ElementTrackingStore(store: KVStore, conf: SparkConf)
extends KVStore {
+
+ import config._
+
+ private val triggers = new HashMap[Class[_], Seq[Trigger[_]]]()
+ private val flushTriggers = new ListBuffer[() => Unit]()
+ private val executor = if (conf.get(ASYNC_TRACKING_ENABLED)) {
+
Some(ThreadUtils.newDaemonSingleThreadExecutor("element-tracking-store-worker"))
+ } else {
+ None
+ }
+
+ @volatile private var stopped = false
+
+ /**
+ * Register a trigger that will be fired once the number of elements of
a given type reaches
+ * the given threshold.
+ *
+ * Triggers are fired in a separate thread, so that they can do more
expensive operations
+ * than would be allowed on the main threads populating the store.
+ *
+ * @param klass The type to monitor.
+ * @param threshold The number of elements that should trigger the
action.
+ * @param action Action to run when the threshold is reached; takes as a
parameter the number
+ * of elements of the registered type currently known to
be in the store.
+ */
+ def addTrigger(klass: Class[_], threshold: Long)(action: Long => Unit):
Unit = {
+ val existing = triggers.getOrElse(klass, Seq())
+ triggers(klass) = existing :+ Trigger(threshold, action)
+ }
+
+ /**
+ * Adds a trigger to be executed before the store is flushed. This
normally happens before
+ * closing, and is useful for flushing intermediate state to the store,
e.g. when replaying
+ * in-progress applications through the SHS.
+ *
+ * Flush triggers are called synchronously in the same thread that is
closing the store.
+ */
+ def onFlush(action: => Unit): Unit = {
+ flushTriggers += { () => action }
+ }
+
+ /**
+ * Enqueues an action to be executed asynchronously.
+ */
+ def doAsync(fn: => Unit): Unit = {
+ executor match {
+ case Some(exec) =>
+ exec.submit(new Runnable() {
+ override def run(): Unit = Utils.tryLog { fn }
+ })
+
+ case _ =>
+ fn
+ }
+ }
+
+ override def read[T](klass: Class[T], naturalKey: Any): T =
store.read(klass, naturalKey)
+
+ override def write(value: Any): Unit = store.write(value)
+
+ /** Write an element to the store, optionally checking for whether to
fire triggers. */
+ def write(value: Any, checkTriggers: Boolean): Unit = {
+ write(value)
+
+ if (checkTriggers && !stopped) {
+ triggers.get(value.getClass()).foreach { list =>
--- End diff --
We should remove the empty parentheses after `getClass`: it is a pure accessor with no side effects, so by Scala convention it should be invoked without parens.
---
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]