Github user squito commented on a diff in the pull request:

    https://github.com/apache/spark/pull/6935#discussion_r47553168
  
    --- Diff: 
core/src/test/scala/org/apache/spark/deploy/history/ApplicationCacheSuite.scala 
---
    @@ -0,0 +1,476 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.spark.deploy.history
    +
    +import java.util.{Date, NoSuchElementException}
    +
    +import javax.servlet.Filter
    +
    +import scala.collection.mutable
    +import scala.collection.mutable.ListBuffer
    +import scala.language.postfixOps
    +
    +import com.codahale.metrics.Counter
    +import com.google.common.cache.LoadingCache
    +import com.google.common.util.concurrent.UncheckedExecutionException
    +import org.eclipse.jetty.servlet.ServletContextHandler
    +import org.mockito.Mockito._
    +import org.scalatest.Matchers
    +import org.scalatest.mock.MockitoSugar
    +
    +import org.apache.spark.status.api.v1.{ApplicationAttemptInfo => 
AttemptInfo, ApplicationInfo}
    +import org.apache.spark.ui.SparkUI
    +import org.apache.spark.util.{Clock, ManualClock, Utils}
    +import org.apache.spark.{Logging, SparkFunSuite}
    +
    +class ApplicationCacheSuite extends SparkFunSuite with Logging with 
MockitoSugar with Matchers {
    +
    +  /**
    +   * subclass with access to the cache internals
    +   * @param refreshInterval interval between refreshes in milliseconds.
    +   * @param retainedApplications number of retained applications
    +   */
    +  class TestApplicationCache(
    +      operations: ApplicationCacheOperations = new StubCacheOperations(),
    +      refreshInterval: Long,
    +      retainedApplications: Int,
    +      clock: Clock = new ManualClock(0))
    +      extends ApplicationCache(operations, refreshInterval, 
retainedApplications, clock) {
    +
    +    def cache(): LoadingCache[CacheKey, CacheEntry] = appCache
    +  }
    +
    +  /**
    +   * Stub cache operations.
    +   * The state is kept in a map of [[CacheKey]] to [[CacheEntry]],
    +   * the `probeTime` field in the cache entry setting the timestamp of the 
entry
    +   */
    +  class StubCacheOperations extends ApplicationCacheOperations with 
Logging {
    +
    +    /** map to UI instances, including timestamps, which are used in 
update probes */
    +    val instances = mutable.HashMap.empty[CacheKey, CacheEntry]
    +
    +    /** Map of attached spark UIs */
    +    val attached = mutable.HashMap.empty[CacheKey, SparkUI]
    +
    +    var getAppUICount = 0L
    +    var attachCount = 0L
    +    var detachCount = 0L
    +    var updateProbeCount = 0L
    +
    +    /**
    +     * Get the application UI
    +     * @param appId application ID
    +     * @param attemptId attempt ID
    +     * @return If found, the Spark UI and any history information to be 
used in the cache
    +     */
    +    override def getAppUI(appId: String, attemptId: Option[String]): 
Option[LoadedAppUI] = {
    +      logDebug(s"getAppUI($appId, $attemptId)")
    +      getAppUICount += 1
    +      instances.get(CacheKey(appId, attemptId)).map( e =>
    +        LoadedAppUI(e.ui, Some(new 
StubHistoryProviderUpdateState(e.probeTime))))
    +    }
    +
    +    override def attachSparkUI(appId: String, attemptId: Option[String], 
ui: SparkUI,
    +        completed: Boolean): Unit = {
    +      logDebug(s"attachSparkUI($appId, $attemptId, $ui)")
    +      attachCount += 1
    +      attached += (CacheKey(appId, attemptId) -> ui)
    +    }
    +
    +    def putAndAttach(appId: String, attemptId: Option[String], completed: 
Boolean, started: Long,
    +        ended: Long, timestamp: Long): SparkUI = {
    +      val ui = putAppUI(appId, attemptId, completed, started, ended, 
timestamp)
    +      attachSparkUI(appId, attemptId, ui, completed)
    +      ui
    +    }
    +
    +    def putAppUI(appId: String, attemptId: Option[String], completed: 
Boolean, started: Long,
    +        ended: Long, timestamp: Long): SparkUI = {
    +      val ui = newUI(appId, attemptId, completed, started, ended)
    +      putInstance(appId, attemptId, ui, completed, timestamp)
    +      ui
    +    }
    +
    +    def putInstance(appId: String, attemptId: Option[String], ui: SparkUI, 
completed: Boolean,
    +        timestamp: Long): Unit = {
    +      instances += (CacheKey(appId, attemptId) ->
    +          new CacheEntry(ui, completed, None, timestamp))
    +    }
    +
    +    /**
    +     * Detach a reconstructed UI
    +     *
    +     * @param ui Spark UI
    +     */
    +    override def detachSparkUI(appId: String, attemptId: Option[String], 
ui: SparkUI): Unit = {
    +      logDebug(s"detachSparkUI($appId, $attemptId, $ui)")
    +      detachCount += 1
    +      var name = ui.getAppName
    +      val key = CacheKey(appId, attemptId)
    +      attached.getOrElse(key, { throw new 
java.util.NoSuchElementException() })
    +      attached -= key
    +    }
    +
    +    /**
    +     * Update state probe.
    +     * @param appId application ID
    +     * @param attemptId optional attempt ID
    +     * @param updateState state containing the timestamp of the data 
previously loaded.
    +     * @return true if the application has been updated
    +     */
    +    override def isUpdated(
    +        appId: String,
    +        attemptId: Option[String],
    +        updateState: Option[HistoryProviderUpdateState]): Boolean = {
    +      updateProbeCount += 1
    +      val updateTimeMillis = 
updateState.get.asInstanceOf[StubHistoryProviderUpdateState].updateTime
    +      logDebug(s"isUpdated($appId, $attemptId, $updateTimeMillis)")
    +      val entry = instances.get(CacheKey(appId, attemptId)).get
    +      val updated = entry.probeTime > updateTimeMillis
    +      logDebug(s"entry = $entry; updated = $updated")
    +      updated
    +    }
    +
    +    /**
    +     * Lookup from the internal cache of attached UIs
    +     */
    +    def getAttached(appId: String, attemptId: Option[String]): 
Option[SparkUI] = {
    +      attached.get(CacheKey(appId, attemptId))
    +    }
    +
    +  }
    +
    +  /**
    +   * The update state for the [[StubCacheOperations]]
    +   * @param updateTime a timestamp
    +   */
    +  private[history] class StubHistoryProviderUpdateState(val updateTime: 
Long)
    +      extends HistoryProviderUpdateState
    +
    +  /**
    +   * Create a new UI. The info/attempt info classes here are from the 
package
    +   * `org.apache.spark.status.api.v1`, not the near-equivalents from the 
history package
    +   */
    +  def newUI(name: String, attemptId: Option[String], completed: Boolean, 
started: Long,
    +      ended: Long): SparkUI = {
    +    val info = new ApplicationInfo(name, name, Some(1), Some(1), Some(1), 
Some(64),
    +      Seq(new AttemptInfo(attemptId, new Date(started), new Date(ended), 
"user", completed)))
    +    val ui = mock[SparkUI]
    +    when(ui.getApplicationInfoList).thenReturn(List(info).iterator)
    +    when(ui.getAppName).thenReturn(name)
    +    when(ui.appName).thenReturn(name)
    +    val handler = new ServletContextHandler()
    +    when(ui.getHandlers).thenReturn(Seq(handler))
    +    ui
    +  }
    +
    +  /**
    +   * Test operations on completed UIs: they are loaded on demand, entries
    +   * are removed on overload.
    +   *
    +   * This effectively tests the original behavior of the history server's 
cache.
    +   */
    +  test("Completed UI get") {
    +    val operations = new StubCacheOperations()
    +    val clock = new ManualClock(1)
    +    implicit val cache = new ApplicationCache(operations, 5, 2, clock)
    +    val metrics = cache.metrics
    +    // cache misses
    +    val app1 = "app-1"
    +    assertNotFound(app1, None)
    +    assertMetric("lookupCount", metrics.lookupCount, 1)
    +    assertMetric("lookupFailureCount", metrics.lookupFailureCount, 1)
    +    assert(1 === operations.getAppUICount, "getAppUICount")
    +    assertNotFound(app1, None)
    +    assert(2 === operations.getAppUICount, "getAppUICount")
    +    assert(0 === operations.attachCount, "attachCount")
    +
    +    val now = clock.getTimeMillis()
    +    // add the entry
    +    operations.putAppUI(app1, None, true, now, now, now)
    +
    +    // make sure its local
    +    operations.getAppUI(app1, None).get
    +    operations.getAppUICount = 0
    +    // now expect it to be found
    +    val cacheEntry = cache.lookupCacheEntry(app1, None)
    +    assert(1 === cacheEntry.probeTime)
    +    assert(cacheEntry.completed)
    +    // assert about queries made of the opereations
    +    assert(1 === operations.getAppUICount, "getAppUICount")
    +    assert(1 === operations.attachCount, "attachCount")
    +
    +    // and in the map of attached
    +    assert(operations.getAttached(app1, None).isDefined, s"attached entry 
'1' from $cache")
    +
    +    // go forward in time
    +    clock.setTime(10)
    +    val time2 = clock.getTimeMillis()
    +    val cacheEntry2 = cache.get(app1)
    +    // no more refresh as this is a completed app
    +    assert(1 === operations.getAppUICount, "getAppUICount")
    +    assert(0 === operations.updateProbeCount, "updateProbeCount")
    +    assert(0 === operations.detachCount, "attachCount")
    +
    +    // evict the entry
    +    operations.putAndAttach("2", None, true, time2, time2, time2)
    +    operations.putAndAttach("3", None, true, time2, time2, time2)
    +    cache.get("2")
    +    cache.get("3")
    +
    +    // there should have been a detachment here
    +    assert(1 === operations.detachCount, s"detach count from $cache")
    +    // and entry app1 no longer attached
    +    assert(operations.getAttached(app1, None).isEmpty, s"get($app1) in 
$cache")
    +    val appId = "app1"
    +    val attemptId = Some("_01")
    +    val time3 = clock.getTimeMillis()
    +    operations.putAppUI(appId, attemptId, false, time3, 0, time3)
    +    // expect an error here
    +    assertNotFound(appId, None)
    +  }
    +
    +  /**
    +   * Test that if an attempt ID is is set, it must be used in lookups
    +   */
    +  test("Naming") {
    --- End diff --
    
    can you just put the comment as the test name, "if an attempt ID is is set, 
it must be used in lookups"


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at [email protected] or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to