Github user squito commented on a diff in the pull request:
https://github.com/apache/spark/pull/6935#discussion_r47553490
--- Diff:
core/src/test/scala/org/apache/spark/deploy/history/ApplicationCacheSuite.scala
---
@@ -0,0 +1,476 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.deploy.history
+
+import java.util.{Date, NoSuchElementException}
+
+import javax.servlet.Filter
+
+import scala.collection.mutable
+import scala.collection.mutable.ListBuffer
+import scala.language.postfixOps
+
+import com.codahale.metrics.Counter
+import com.google.common.cache.LoadingCache
+import com.google.common.util.concurrent.UncheckedExecutionException
+import org.eclipse.jetty.servlet.ServletContextHandler
+import org.mockito.Mockito._
+import org.scalatest.Matchers
+import org.scalatest.mock.MockitoSugar
+
+import org.apache.spark.status.api.v1.{ApplicationAttemptInfo =>
AttemptInfo, ApplicationInfo}
+import org.apache.spark.ui.SparkUI
+import org.apache.spark.util.{Clock, ManualClock, Utils}
+import org.apache.spark.{Logging, SparkFunSuite}
+
+class ApplicationCacheSuite extends SparkFunSuite with Logging with
MockitoSugar with Matchers {
+
+ /**
+ * subclass with access to the cache internals
+ * @param refreshInterval interval between refreshes in milliseconds.
+ * @param retainedApplications number of retained applications
+ */
+ class TestApplicationCache(
+ operations: ApplicationCacheOperations = new StubCacheOperations(),
+ refreshInterval: Long,
+ retainedApplications: Int,
+ clock: Clock = new ManualClock(0))
+ extends ApplicationCache(operations, refreshInterval,
retainedApplications, clock) {
+
+ def cache(): LoadingCache[CacheKey, CacheEntry] = appCache
+ }
+
+ /**
+ * Stub cache operations.
+ * The state is kept in a map of [[CacheKey]] to [[CacheEntry]],
+ * the `probeTime` field in the cache entry setting the timestamp of the
entry
+ */
+ class StubCacheOperations extends ApplicationCacheOperations with
Logging {
+
+ /** map to UI instances, including timestamps, which are used in
update probes */
+ val instances = mutable.HashMap.empty[CacheKey, CacheEntry]
+
+ /** Map of attached spark UIs */
+ val attached = mutable.HashMap.empty[CacheKey, SparkUI]
+
+ var getAppUICount = 0L
+ var attachCount = 0L
+ var detachCount = 0L
+ var updateProbeCount = 0L
+
+ /**
+ * Get the application UI
+ * @param appId application ID
+ * @param attemptId attempt ID
+ * @return If found, the Spark UI and any history information to be
used in the cache
+ */
+ override def getAppUI(appId: String, attemptId: Option[String]):
Option[LoadedAppUI] = {
+ logDebug(s"getAppUI($appId, $attemptId)")
+ getAppUICount += 1
+ instances.get(CacheKey(appId, attemptId)).map( e =>
+ LoadedAppUI(e.ui, Some(new
StubHistoryProviderUpdateState(e.probeTime))))
+ }
+
+ override def attachSparkUI(appId: String, attemptId: Option[String],
ui: SparkUI,
+ completed: Boolean): Unit = {
+ logDebug(s"attachSparkUI($appId, $attemptId, $ui)")
+ attachCount += 1
+ attached += (CacheKey(appId, attemptId) -> ui)
+ }
+
+ def putAndAttach(appId: String, attemptId: Option[String], completed:
Boolean, started: Long,
+ ended: Long, timestamp: Long): SparkUI = {
+ val ui = putAppUI(appId, attemptId, completed, started, ended,
timestamp)
+ attachSparkUI(appId, attemptId, ui, completed)
+ ui
+ }
+
+ def putAppUI(appId: String, attemptId: Option[String], completed:
Boolean, started: Long,
+ ended: Long, timestamp: Long): SparkUI = {
+ val ui = newUI(appId, attemptId, completed, started, ended)
+ putInstance(appId, attemptId, ui, completed, timestamp)
+ ui
+ }
+
+ def putInstance(appId: String, attemptId: Option[String], ui: SparkUI,
completed: Boolean,
+ timestamp: Long): Unit = {
+ instances += (CacheKey(appId, attemptId) ->
+ new CacheEntry(ui, completed, None, timestamp))
+ }
+
+ /**
+ * Detach a reconstructed UI
+ *
+ * @param ui Spark UI
+ */
+ override def detachSparkUI(appId: String, attemptId: Option[String],
ui: SparkUI): Unit = {
+ logDebug(s"detachSparkUI($appId, $attemptId, $ui)")
+ detachCount += 1
+ var name = ui.getAppName
+ val key = CacheKey(appId, attemptId)
+ attached.getOrElse(key, { throw new
java.util.NoSuchElementException() })
+ attached -= key
+ }
+
+ /**
+ * Update state probe.
+ * @param appId application ID
+ * @param attemptId optional attempt ID
+ * @param updateState state containing the timestamp of the data
previously loaded.
+ * @return true if the application has been updated
+ */
+ override def isUpdated(
+ appId: String,
+ attemptId: Option[String],
+ updateState: Option[HistoryProviderUpdateState]): Boolean = {
+ updateProbeCount += 1
+ val updateTimeMillis =
updateState.get.asInstanceOf[StubHistoryProviderUpdateState].updateTime
+ logDebug(s"isUpdated($appId, $attemptId, $updateTimeMillis)")
+ val entry = instances.get(CacheKey(appId, attemptId)).get
+ val updated = entry.probeTime > updateTimeMillis
+ logDebug(s"entry = $entry; updated = $updated")
+ updated
+ }
+
+ /**
+ * Lookup from the internal cache of attached UIs
+ */
+ def getAttached(appId: String, attemptId: Option[String]):
Option[SparkUI] = {
+ attached.get(CacheKey(appId, attemptId))
+ }
+
+ }
+
+ /**
+ * The update state for the [[StubCacheOperations]]
+ * @param updateTime a timestamp
+ */
+ private[history] class StubHistoryProviderUpdateState(val updateTime:
Long)
+ extends HistoryProviderUpdateState
+
+ /**
+ * Create a new UI. The info/attempt info classes here are from the
package
+ * `org.apache.spark.status.api.v1`, not the near-equivalents from the
history package
+ */
+ def newUI(name: String, attemptId: Option[String], completed: Boolean,
started: Long,
+ ended: Long): SparkUI = {
+ val info = new ApplicationInfo(name, name, Some(1), Some(1), Some(1),
Some(64),
+ Seq(new AttemptInfo(attemptId, new Date(started), new Date(ended),
"user", completed)))
+ val ui = mock[SparkUI]
+ when(ui.getApplicationInfoList).thenReturn(List(info).iterator)
+ when(ui.getAppName).thenReturn(name)
+ when(ui.appName).thenReturn(name)
+ val handler = new ServletContextHandler()
+ when(ui.getHandlers).thenReturn(Seq(handler))
+ ui
+ }
+
+ /**
+ * Test operations on completed UIs: they are loaded on demand, entries
+ * are removed on overload.
+ *
+ * This effectively tests the original behavior of the history server's
cache.
+ */
+ test("Completed UI get") {
+ val operations = new StubCacheOperations()
+ val clock = new ManualClock(1)
+ implicit val cache = new ApplicationCache(operations, 5, 2, clock)
+ val metrics = cache.metrics
+ // cache misses
+ val app1 = "app-1"
+ assertNotFound(app1, None)
+ assertMetric("lookupCount", metrics.lookupCount, 1)
+ assertMetric("lookupFailureCount", metrics.lookupFailureCount, 1)
+ assert(1 === operations.getAppUICount, "getAppUICount")
+ assertNotFound(app1, None)
+ assert(2 === operations.getAppUICount, "getAppUICount")
+ assert(0 === operations.attachCount, "attachCount")
+
+ val now = clock.getTimeMillis()
+ // add the entry
+ operations.putAppUI(app1, None, true, now, now, now)
+
+ // make sure its local
+ operations.getAppUI(app1, None).get
+ operations.getAppUICount = 0
+ // now expect it to be found
+ val cacheEntry = cache.lookupCacheEntry(app1, None)
+ assert(1 === cacheEntry.probeTime)
+ assert(cacheEntry.completed)
+ // assert about queries made of the opereations
+ assert(1 === operations.getAppUICount, "getAppUICount")
+ assert(1 === operations.attachCount, "attachCount")
+
+ // and in the map of attached
+ assert(operations.getAttached(app1, None).isDefined, s"attached entry
'1' from $cache")
+
+ // go forward in time
+ clock.setTime(10)
+ val time2 = clock.getTimeMillis()
+ val cacheEntry2 = cache.get(app1)
+ // no more refresh as this is a completed app
+ assert(1 === operations.getAppUICount, "getAppUICount")
+ assert(0 === operations.updateProbeCount, "updateProbeCount")
+ assert(0 === operations.detachCount, "attachCount")
+
+ // evict the entry
+ operations.putAndAttach("2", None, true, time2, time2, time2)
+ operations.putAndAttach("3", None, true, time2, time2, time2)
+ cache.get("2")
+ cache.get("3")
+
+ // there should have been a detachment here
+ assert(1 === operations.detachCount, s"detach count from $cache")
+ // and entry app1 no longer attached
+ assert(operations.getAttached(app1, None).isEmpty, s"get($app1) in
$cache")
+ val appId = "app1"
+ val attemptId = Some("_01")
+ val time3 = clock.getTimeMillis()
+ operations.putAppUI(appId, attemptId, false, time3, 0, time3)
+ // expect an error here
+ assertNotFound(appId, None)
+ }
+
+ /**
+ * Test that if an attempt ID is is set, it must be used in lookups
+ */
+ test("Naming") {
+ val operations = new StubCacheOperations()
+ val clock = new ManualClock(1)
+ implicit val cache = new ApplicationCache(operations,
+ refreshInterval = 5, retainedApplications = 10, clock = clock)
+ val appId = "app1"
+ val attemptId = Some("_01")
+ operations.putAppUI(appId, attemptId, false, clock.getTimeMillis(), 0,
0)
+ assertNotFound(appId, None)
+ }
+
+ /**
+ * Test that incomplete apps are not probed for updates during the time
window,
+ * but that they are checked if that window has expired and they are not
completed.
+ * Then, if they have changed, the old entry is replaced by a new one.
+ */
+ test("Incomplete apps refreshed") {
+ val operations = new StubCacheOperations()
+ val clock = new ManualClock(50)
+ val window = 500
+ val halfw = window / 2
+ implicit val cache = new ApplicationCache(operations,
+ refreshInterval = window, retainedApplications = 5, clock = clock)
+ val metrics = cache.metrics
+ // add the incomplete app
+ // add the entry
+ val started = clock.getTimeMillis()
+ val appId = "app1"
+ val attemptId = Some("001")
+ operations.putAppUI(appId, attemptId, false, started, 0, started)
+ val firstEntry = cache.lookupCacheEntry(appId, attemptId)
+ assert(started === firstEntry.probeTime, s"timestamp in $firstEntry")
+ assert(!firstEntry.completed, s"entry is complete: $firstEntry")
+ assertMetric("lookupCount", metrics.lookupCount, 1)
+
+ assert(0 === operations.updateProbeCount, "expected no update probe on
that first get")
+
+ // lookups within the refresh window returns the same value
+ clock.setTime(halfw)
+ assertCacheEntryEquals(appId, attemptId, firstEntry)
+ assertMetric("updateProbeCount", metrics.updateProbeCount, 0)
+ assertMetric("lookupCount", metrics.lookupCount, 2)
+ assert(0 === operations.updateProbeCount, "expected no updated probe
within the time window")
+
+ // but now move the ticker past that refresh
+ val checkTime = window * 2
+ clock.setTime(checkTime)
+ assert((clock.getTimeMillis() - firstEntry.probeTime) >
cache.refreshInterval)
+ val entry3 = cache.lookupCacheEntry(appId, attemptId)
+ assert(firstEntry !== entry3, s"updated entry test from $cache")
+ assertMetric("lookupCount", metrics.lookupCount, 3)
+ assertMetric("updateProbeCount", metrics.updateProbeCount, 1)
+ assertMetric("updateTriggeredCount", metrics.updateTriggeredCount, 0)
+ assert(1 === operations.updateProbeCount, s"refresh count in $cache")
+ assert(0 === operations.detachCount, s"detach count")
+ assert(entry3.probeTime === checkTime)
+
+ val updateTime = window * 2 + halfw
+ // update the cached value. This won't get picked up on until after
the refresh interval
+ val updatedApp = operations.putAppUI(appId, attemptId, true, started,
updateTime, updateTime)
+
+ // go a little bit forward again and the refresh window means no new
probe
+ clock.setTime(updateTime + 1)
+ assertCacheEntryEquals(appId, attemptId, entry3)
+ assertMetric("lookupCount", metrics.lookupCount, 4)
+ assertMetric("updateProbeCount", metrics.updateProbeCount, 1)
+
+ // and but once past the window again, a probe is triggered and the
change collected
+ val endTime = window * 10
+ clock.setTime(endTime)
+ logDebug(s"Before operation = $cache")
+ val entry5 = cache.lookupCacheEntry(appId, attemptId)
+ assertMetric("lookupCount", metrics.lookupCount, 5)
+ assertMetric("updateProbeCount", metrics.updateProbeCount, 2)
+ // the update was triggered
+ assertMetric("updateTriggeredCount", metrics.updateTriggeredCount, 1)
+ assert(updatedApp === entry5.ui, s"UI {$updatedApp} did not match
entry {$entry5} in $cache")
+
+ // at which point, the refreshes stop
+ clock.setTime(window * 20)
+ assertCacheEntryEquals(appId, attemptId, entry5)
+ assertMetric("updateProbeCount", metrics.updateProbeCount, 2)
+ }
+
+ /**
+ * Assert that a metric counter has a specific value; failure raises an
exception
+ * including the cache's toString value
+ * @param name counter name (for exceptions)
+ * @param counter counter
+ * @param expected expected value.
+ * @param cache cache
+ */
+ def assertMetric(name: String, counter: Counter, expected:
Long)(implicit cache: ApplicationCache)
+ : Unit = {
+ val actual = counter.getCount
+ if (actual != expected) {
+ // this is here because Scalatest loses stack depth
+ throw new Exception(s"Wrong $name value - expected $expected but got
$actual in $cache")
+ }
+ }
+
+ /**
+ * Look up the cache entry and assert that it maches in the expected
value.
+ * This assertion works if the two CacheEntries are different -it looks
at the fields.
+ * UI are compared on object equality; the timestamp and completed flags
directly.
+ * @param appId application ID
+ * @param attemptId attempt ID
+ * @param expected expected value
+ * @param cache app cache
+ */
+ def assertCacheEntryEquals(appId: String, attemptId: Option[String],
+ expected: CacheEntry)(implicit cache: ApplicationCache): Unit = {
+ val actual = cache.lookupCacheEntry(appId, attemptId)
+ val errorText = s"Expected get($appId, $attemptId) -> $expected, but
got $actual from $cache"
+ assert(expected.ui === actual.ui, errorText + " SparkUI reference")
+ assert(expected.completed === actual.completed, errorText + "
-completed flag")
+ assert(expected.probeTime === actual.probeTime, errorText + "
-timestamp")
+ }
+
+ /**
+ * Assert that a key wasn't found in cache or loaded.
+ *
+ * Looks for the specific nested exception raised by [[ApplicationCache]]
+ * @param appId application ID
+ * @param attemptId attempt ID
+ * @param cache app cache
+ */
+ def assertNotFound(appId: String, attemptId: Option[String])
+ (implicit cache: ApplicationCache): Unit = {
--- End diff --
nit: each arg on its own line
---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at [email protected] or file a JIRA ticket
with INFRA.
---
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]