Github user cloud-fan commented on a diff in the pull request:
https://github.com/apache/spark/pull/19751#discussion_r157639788
--- Diff: core/src/main/scala/org/apache/spark/status/ElementTrackingStore.scala ---
@@ -0,0 +1,160 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.status
+
+import java.util.concurrent.TimeUnit
+
+import scala.collection.mutable.{HashMap, ListBuffer}
+
+import com.google.common.util.concurrent.MoreExecutors
+
+import org.apache.spark.SparkConf
+import org.apache.spark.util.{ThreadUtils, Utils}
+import org.apache.spark.util.kvstore._
+
+/**
+ * A KVStore wrapper that allows tracking the number of elements of specific types, and triggering
+ * actions once they reach a threshold. This allows writers, for example, to control how much data
+ * is stored by potentially deleting old data as new data is added.
+ *
+ * This store is used when populating data either from a live UI or an event log. On top of firing
+ * triggers when elements reach a certain threshold, it provides two extra bits of functionality:
+ *
+ * - a generic worker thread that can be used to run expensive tasks asynchronously; the tasks can
+ *   be configured to run on the calling thread when more determinism is desired (e.g. unit tests).
+ * - a generic flush mechanism so that listeners can be notified about when they should flush
+ *   internal state to the store (e.g. after the SHS finishes parsing an event log).
+ *
+ * The configured triggers are run on a separate thread by default; they can be forced to run on
+ * the calling thread by setting the `ASYNC_TRACKING_ENABLED` configuration to `false`.
+ */
+private[spark] class ElementTrackingStore(store: KVStore, conf: SparkConf) extends KVStore {
+
+ import config._
+
+ private val triggers = new HashMap[Class[_], Seq[Trigger[_]]]()
+ private val flushTriggers = new ListBuffer[() => Unit]()
+ private val executor = if (conf.get(ASYNC_TRACKING_ENABLED)) {
+ ThreadUtils.newDaemonSingleThreadExecutor("element-tracking-store-worker")
+ } else {
+ MoreExecutors.sameThreadExecutor()
+ }
+
+ @volatile private var stopped = false
+
+ /**
+ * Register a trigger that will be fired once the number of elements of a given type reaches
+ * the given threshold.
+ *
+ * @param klass The type to monitor.
+ * @param threshold The number of elements that should trigger the action.
+ * @param action Action to run when the threshold is reached; takes as a parameter the number
+ * of elements of the registered type currently known to be in the store.
+ */
+ def addTrigger(klass: Class[_], threshold: Long)(action: Long => Unit): Unit = {
+ val existing = triggers.getOrElse(klass, Seq())
+ triggers(klass) = existing :+ Trigger(threshold, action)
+ }
+
+ /**
+ * Adds a trigger to be executed before the store is flushed. This normally happens before
+ * closing, and is useful for flushing intermediate state to the store, e.g. when replaying
+ * in-progress applications through the SHS.
+ *
+ * Flush triggers are called synchronously in the same thread that is closing the store.
+ */
+ def onFlush(action: => Unit): Unit = {
+ flushTriggers += { () => action }
+ }
+
+ /**
+ * Enqueues an action to be executed asynchronously. The task will run on the calling thread if
+ * `ASYNC_TRACKING_ENABLED` is `false`.
+ */
+ def doAsync(fn: => Unit): Unit = {
+ executor.submit(new Runnable() {
+ override def run(): Unit = Utils.tryLog { fn }
+ })
+ }
+
+ override def read[T](klass: Class[T], naturalKey: Any): T = store.read(klass, naturalKey)
+
+ override def write(value: Any): Unit = store.write(value)
+
+ /** Write an element to the store, optionally checking for whether to fire triggers. */
+ def write(value: Any, checkTriggers: Boolean): Unit = {
+ write(value)
+
+ if (checkTriggers && !stopped) {
+ triggers.get(value.getClass()).foreach { list =>
+ doAsync {
+ val count = store.count(value.getClass())
+ list.foreach { t =>
+ if (count > t.threshold) {
+ t.action(count)
+ }
+ }
+ }
+ }
+ }
+ }
+
+ override def delete(klass: Class[_], naturalKey: Any): Unit = store.delete(klass, naturalKey)
+
+ override def getMetadata[T](klass: Class[T]): T = store.getMetadata(klass)
+
+ override def setMetadata(value: Any): Unit = store.setMetadata(value)
+
+ override def view[T](klass: Class[T]): KVStoreView[T] = store.view(klass)
+
+ override def count(klass: Class[_]): Long = store.count(klass)
+
+ override def count(klass: Class[_], index: String, indexedValue: Any): Long = {
+ store.count(klass, index, indexedValue)
+ }
+
+ override def close(): Unit = {
+ close(true)
+ }
+
+ /** A close() method that optionally leaves the parent store open. */
+ def close(closeParent: Boolean): Unit = synchronized {
+ if (stopped) {
+ return
+ }
+
+ stopped = true
+ executor.shutdown()
+ if (!executor.awaitTermination(5, TimeUnit.SECONDS)) {
+ executor.shutdownNow()
+ }
+
+ flushTriggers.foreach { trigger =>
--- End diff --
`flush` sounds like we would do it periodically, how about `closeTriggers`?
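
For reference (not part of this diff), a minimal sketch of how a caller would wire these hooks up; the `DemoElement` type, the `InMemoryStore` backing, and the actions are just illustrative, and the code would have to live under the `org.apache.spark` package since the class is `private[spark]`. The count-based trigger can fire after any write, while the `flush` trigger only ever fires inside `close()`:

```scala
import org.apache.spark.SparkConf
import org.apache.spark.status.ElementTrackingStore
import org.apache.spark.util.kvstore.InMemoryStore

// Hypothetical element type, only so the example compiles; real callers register
// the UI wrapper classes they actually write to the store.
class DemoElement(val id: String)

val store = new ElementTrackingStore(new InMemoryStore(), new SparkConf())

// Count-based trigger: checked after writes and run asynchronously by default
// (set ASYNC_TRACKING_ENABLED to false to run it on the writing thread instead).
store.addTrigger(classOf[DemoElement], 1000L) { count =>
  println(s"would prune old elements, current count = $count") // hypothetical cleanup action
}

// "Flush" trigger: runs once, synchronously, inside close() -- for instance after
// the SHS finishes replaying an event log -- not on any periodic schedule.
store.onFlush {
  println("flushing buffered state before the store closes") // hypothetical flush action
}

store.close()
```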
---