GitHub user squito commented on a diff in the pull request:

    https://github.com/apache/spark/pull/11118#discussion_r52203370
  
    --- Diff: core/src/main/scala/org/apache/spark/deploy/history/ApplicationCache.scala ---
    @@ -0,0 +1,669 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.spark.deploy.history
    +
    +import java.util.NoSuchElementException
    +import javax.servlet.{DispatcherType, Filter, FilterChain, FilterConfig, ServletException, ServletRequest, ServletResponse}
    +import javax.servlet.http.{HttpServletRequest, HttpServletResponse}
    +
    +import scala.collection.JavaConverters._
    +import scala.util.control.NonFatal
    +
    +import com.codahale.metrics.{Counter, MetricRegistry, Timer}
    +import com.google.common.cache.{CacheBuilder, CacheLoader, LoadingCache, RemovalListener, RemovalNotification}
    +import org.eclipse.jetty.servlet.FilterHolder
    +
    +import org.apache.spark.Logging
    +import org.apache.spark.metrics.source.Source
    +import org.apache.spark.ui.SparkUI
    +import org.apache.spark.util.Clock
    +
    +/**
    + * Cache for applications.
    + *
    + * Completed applications are cached for as long as there is capacity for them.
    + * Incomplete applications have their update time checked on every
    + * retrieval; if the cached entry is out of date, it is refreshed.
    + *
    + * @note there must be only one instance of [[ApplicationCache]] in a
    + * JVM at a time. This is because a static field in [[ApplicationCacheCheckFilterRelay]]
    + * keeps a reference to the cache so that HTTP requests on the attempt-specific web UIs
    + * can probe the current cache to see if the attempts have changed.
    + *
    + * Creating multiple instances will break this routing.
    + * @param operations implementation of record access operations
    + * @param refreshInterval interval between refreshes in milliseconds.
    + * @param retainedApplications number of retained applications
    + * @param clock time source
    + */
    +private[history] class ApplicationCache(
    +    val operations: ApplicationCacheOperations,
    +    val refreshInterval: Long,
    +    val retainedApplications: Int,
    +    val clock: Clock) extends Logging {
    +
    +  /**
    +   * Services the load request from the cache.
    +   */
    +  private val appLoader = new CacheLoader[CacheKey, CacheEntry] {
    +
    +    /** the cache key doesn't match a cached entry, or the entry is out-of-date, so load it. */
    +    override def load(key: CacheKey): CacheEntry = {
    +      loadApplicationEntry(key.appId, key.attemptId)
    +    }
    +
    +  }
    +
    +  /**
    +   * Handler for callbacks from the cache of entry removal.
    +   */
    +  private val removalListener = new RemovalListener[CacheKey, CacheEntry] {
    +
    +    /**
    +     * Removal event notifies the provider to detach the UI.
    +     * @param rm removal notification
    +     */
    +    override def onRemoval(rm: RemovalNotification[CacheKey, CacheEntry]): Unit = {
    +      metrics.evictionCount.inc()
    +      val key = rm.getKey
    +      logDebug(s"Evicting entry ${key}")
    +      operations.detachSparkUI(key.appId, key.attemptId, rm.getValue().ui)
    +    }
    +  }
    +
    +  /**
    +   * The cache of applications.
    +   *
    +   * Tagged as `protected` to allow subclasses in tests to access it directly.
    +   */
    +  protected val appCache: LoadingCache[CacheKey, CacheEntry] = {
    +    CacheBuilder.newBuilder()
    +        .maximumSize(retainedApplications)
    +        .removalListener(removalListener)
    +        .build(appLoader)
    +  }
    +
    +  /**
    +   * The metrics which are updated as the cache is used.
    +   */
    +  val metrics = new CacheMetrics("history.cache")
    +
    +  init()
    +
    +  /**
    +   * Perform any startup operations.
    +   *
    +   * This includes declaring this instance as the cache to use in the
    +   * [[ApplicationCacheCheckFilterRelay]].
    +   */
    +  private def init(): Unit = {
    +    ApplicationCacheCheckFilterRelay.setApplicationCache(this)
    +  }
    +
    +  /**
    +   * Stop the cache.
    +   * This will reset the relay in [[ApplicationCacheCheckFilterRelay]].
    +   */
    +  def stop(): Unit = {
    +    ApplicationCacheCheckFilterRelay.resetApplicationCache()
    +  }
    +
    +  /**
    +   * Get an entry.
    +   *
    +   * Cache fetch/refresh will have taken place by the time this method returns.
    +   * @param appAndAttempt application to look up in the format needed by the history server web UI,
    +   *                      `appId/attemptId` or `appId`.
    +   * @return the entry
    +   */
    +  def get(appAndAttempt: String): SparkUI = {
    +    val parts = splitAppAndAttemptKey(appAndAttempt)
    +    get(parts._1, parts._2)
    +  }
    +
    +  /**
    +   * Get the Spark UI, converting a lookup failure from an exception to `None`.
    +   * @param appAndAttempt application to look up in the format needed by the history server web UI,
    +   *                      `appId/attemptId` or `appId`.
    +   * @return the entry
    +   */
    +  def getSparkUI(appAndAttempt: String): Option[SparkUI] = {
    +    try {
    +      val ui = get(appAndAttempt)
    +      Some(ui)
    +    } catch {
    +      case NonFatal(e) => e.getCause() match {
    +        case nsee: NoSuchElementException =>
    +          None
    +        case cause: Exception => throw cause
    +      }
    +    }
    +  }
    +
    +  /**
    +   * Get the associated Spark UI.
    +   *
    +   * Cache fetch/refresh will have taken place by the time this method returns.
    +   * @param appId application ID
    +   * @param attemptId optional attempt ID
    +   * @return the entry
    +   */
    +  def get(appId: String, attemptId: Option[String]): SparkUI = {
    +    lookupAndUpdate(appId, attemptId)._1.ui
    +  }
    +
    +  /**
    +   * Look up the entry; update it if needed.
    +   * @param appId application ID
    +   * @param attemptId optional attempt ID
    +   * @return the underlying cache entry, which can have its timestamp changed, and a flag
    +   *         indicating whether the entry has changed
    +   */
    +  private def lookupAndUpdate(appId: String, attemptId: Option[String]): (CacheEntry, Boolean) = {
    +    metrics.lookupCount.inc()
    +    val cacheKey = CacheKey(appId, attemptId)
    +    var entry = appCache.getIfPresent(cacheKey)
    +    var updated = false
    +    if (entry == null) {
    +      // no entry, so fetch without any post-fetch probes for out-of-dateness
    +      // this will trigger a callback to loadApplicationEntry()
    +      entry = appCache.get(cacheKey)
    +    } else if (!entry.completed) {
    +      val now = clock.getTimeMillis()
    +      if (now - entry.probeTime > refreshInterval) {
    +        log.debug(s"Probing at time $now for updated application $cacheKey -> $entry")
    +        metrics.updateProbeCount.inc()
    +        updated = time(metrics.updateProbeTimer) {
    +          entry.updateProbe()
    --- End diff --
    
    Note that this check is now extremely cheap (at least with the 
    `FSHistoryProvider`). Actually checking for an update to the logs happens on 
    its own schedule, since that scans the logs looking for both new apps and 
    updates to existing ones. That suggests we could either drop this extra 
    interval entirely and do this check on every request, or, if we want to keep 
    it for other `HistoryProvider`s, at least make the default very short.
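    
    To make the suggestion concrete, here is a minimal, self-contained sketch of 
    the two probe policies. The names `completed`, `probeTime`, and 
    `refreshInterval` mirror this diff; `ProbePolicy` and its methods are 
    hypothetical helpers written only for illustration, not part of the PR.
    
    ```scala
    // Sketch only: models the staleness-probe decision in lookupAndUpdate(),
    // not the real cache.
    object ProbePolicy {
    
      /** Current policy in this diff: probe an incomplete app at most once per interval. */
      def shouldProbe(completed: Boolean, now: Long, probeTime: Long, refreshIntervalMs: Long): Boolean = {
        !completed && (now - probeTime > refreshIntervalMs)
      }
    
      /** Suggested alternative: the probe is cheap, so probe on every request. */
      def shouldProbeAlways(completed: Boolean): Boolean = !completed
    
      def main(args: Array[String]): Unit = {
        // With a 10s interval, a request 5s after the last probe does not re-probe today...
        println(shouldProbe(completed = false, now = 15000L, probeTime = 10000L, refreshIntervalMs = 10000L)) // false
        // ...but would under the "probe on every request" variant.
        println(shouldProbeAlways(completed = false)) // true
      }
    }
    ```
    
    Either way, the actual log scan still happens on the provider's own schedule; 
    only the cheap staleness probe would run more often.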

