[GitHub] spark pull request: [SPARK-7889] [CORE] HistoryServer to refresh c...

2016-02-12 Thread steveloughran
Github user steveloughran closed the pull request at:

https://github.com/apache/spark/pull/6935



[GitHub] spark pull request: [SPARK-7889] [CORE] HistoryServer to refresh c...

2016-02-11 Thread squito
Github user squito commented on the pull request:

https://github.com/apache/spark/pull/6935#issuecomment-183169031
  
ps can you close this one now?



[GitHub] spark pull request: [SPARK-7889] [CORE] HistoryServer to refresh c...

2016-02-08 Thread squito
Github user squito commented on a diff in the pull request:

https://github.com/apache/spark/pull/6935#discussion_r52184854
  
--- Diff: 
core/src/main/scala/org/apache/spark/deploy/history/ApplicationCache.scala ---
@@ -0,0 +1,668 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.deploy.history
+
+import java.util.NoSuchElementException
+import javax.servlet.http.{HttpServletRequest, HttpServletResponse}
+import javax.servlet.{DispatcherType, Filter, FilterChain, FilterConfig, 
ServletException, ServletRequest, ServletResponse}
+
+import scala.collection.JavaConverters._
+import scala.util.control.NonFatal
+
+import com.codahale.metrics.{Counter, MetricRegistry, Timer}
+import com.google.common.cache.{CacheBuilder, CacheLoader, LoadingCache, 
RemovalListener, RemovalNotification}
+import org.eclipse.jetty.servlet.FilterHolder
+
+import org.apache.spark.Logging
+import org.apache.spark.metrics.source.Source
+import org.apache.spark.ui.SparkUI
+import org.apache.spark.util.Clock
+
+/**
+ * Cache for applications.
+ *
+ * Completed applications are cached for as long as there is capacity for 
them.
+ * Incomplete applications have their update time checked on every
+ * retrieval; if the cached entry is out of date, it is refreshed.
+ *
+ * @note there must be only one instance of [[ApplicationCache]] in a
+ * JVM at a time. This is because a static field in 
[[ApplicationCacheCheckFilterRelay]]
+ * keeps a reference to the cache so that HTTP requests on the 
attempt-specific web UIs
+ * can probe the current cache to see if the attempts have changed.
+ *
+ * Creating multiple instances will break this routing.
+ * @param operations implementation of record access operations
+ * @param refreshInterval interval between refreshes in milliseconds.
+ * @param retainedApplications number of retained applications
+ * @param clock time source
+ */
+private[history] class ApplicationCache(
+val operations: ApplicationCacheOperations,
+val refreshInterval: Long,
+val retainedApplications: Int,
+val clock: Clock) extends Logging {
+
+  /**
+   * Services the load request from the cache.
+   */
+  private val appLoader = new CacheLoader[CacheKey, CacheEntry] {
+
+/** the cache key doesn't match a cached entry ... attempt to load it 
 */
+override def load(key: CacheKey): CacheEntry = {
+  loadApplicationEntry(key.appId, key.attemptId)
+}
+
+  }
+
+  /**
+   * Handler for callbacks from the cache of entry removal.
+   */
+  private val removalListener = new RemovalListener[CacheKey, CacheEntry] {
+
+/**
+ * Removal event notifies the provider to detach the UI.
+ * @param rm removal notification
+ */
+override def onRemoval(rm: RemovalNotification[CacheKey, CacheEntry]): 
Unit = {
+  metrics.evictionCount.inc()
+  val key = rm.getKey
+  logDebug(s"Evicting entry ${key}")
+  operations.detachSparkUI(key.appId, key.attemptId, rm.getValue().ui)
+}
+  }
+
+  /**
+   * The cache of applications.
+   *
+   * Tagged as `protected` so as to allow subclasses in tests to access 
it directly
+   */
+  protected val appCache: LoadingCache[CacheKey, CacheEntry] = {
+CacheBuilder.newBuilder()
+.maximumSize(retainedApplications)
+.removalListener(removalListener)
+.build(appLoader)
+  }
+
+  /**
+   * The metrics which are updated as the cache is used.
+   */
+  val metrics = new CacheMetrics("history.cache")
+
+  init()
+
+  /**
+   * Perform any startup operations.
+   *
+   * This includes declaring this instance as the cache to use in the
+   * [[ApplicationCacheCheckFilterRelay]].
+   */
+  private def init(): Unit = {
+ApplicationCacheCheckFilterRelay.setApplicationCache(this)
   

[GitHub] spark pull request: [SPARK-7889] [CORE] HistoryServer to refresh c...

2016-02-08 Thread squito
Github user squito commented on the pull request:

https://github.com/apache/spark/pull/6935#issuecomment-181426395
  
Jenkins, test this please



[GitHub] spark pull request: [SPARK-7889] [CORE] HistoryServer to refresh c...

2016-02-08 Thread SparkQA
Github user SparkQA commented on the pull request:

https://github.com/apache/spark/pull/6935#issuecomment-181440993
  
**[Test build #2522 has finished](https://amplab.cs.berkeley.edu/jenkins/job/NewSparkPullRequestBuilder/2522/consoleFull)** for PR 6935 at commit [`728b12c`](https://github.com/apache/spark/commit/728b12c53b8b933a325b0777b185c23f77063e76).
 * This patch **fails Scala style tests**.
 * This patch merges cleanly.
 * This patch adds no public classes.



[GitHub] spark pull request: [SPARK-7889] [CORE] HistoryServer to refresh c...

2016-02-08 Thread SparkQA
Github user SparkQA commented on the pull request:

https://github.com/apache/spark/pull/6935#issuecomment-181440518
  
**[Test build #2522 has started](https://amplab.cs.berkeley.edu/jenkins/job/NewSparkPullRequestBuilder/2522/consoleFull)** for PR 6935 at commit [`728b12c`](https://github.com/apache/spark/commit/728b12c53b8b933a325b0777b185c23f77063e76).



[GitHub] spark pull request: [SPARK-7889] [CORE] HistoryServer to refresh c...

2016-02-08 Thread squito
Github user squito commented on a diff in the pull request:

https://github.com/apache/spark/pull/6935#discussion_r52184490
  
--- Diff: 
core/src/main/scala/org/apache/spark/deploy/history/ApplicationCache.scala ---
@@ -0,0 +1,668 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.deploy.history
+
+import java.util.NoSuchElementException
+import javax.servlet.http.{HttpServletRequest, HttpServletResponse}
+import javax.servlet.{DispatcherType, Filter, FilterChain, FilterConfig, 
ServletException, ServletRequest, ServletResponse}
+
+import scala.collection.JavaConverters._
+import scala.util.control.NonFatal
+
+import com.codahale.metrics.{Counter, MetricRegistry, Timer}
+import com.google.common.cache.{CacheBuilder, CacheLoader, LoadingCache, 
RemovalListener, RemovalNotification}
+import org.eclipse.jetty.servlet.FilterHolder
+
+import org.apache.spark.Logging
+import org.apache.spark.metrics.source.Source
+import org.apache.spark.ui.SparkUI
+import org.apache.spark.util.Clock
+
+/**
+ * Cache for applications.
+ *
+ * Completed applications are cached for as long as there is capacity for 
them.
+ * Incomplete applications have their update time checked on every
+ * retrieval; if the cached entry is out of date, it is refreshed.
+ *
+ * @note there must be only one instance of [[ApplicationCache]] in a
+ * JVM at a time. This is because a static field in 
[[ApplicationCacheCheckFilterRelay]]
+ * keeps a reference to the cache so that HTTP requests on the 
attempt-specific web UIs
+ * can probe the current cache to see if the attempts have changed.
+ *
+ * Creating multiple instances will break this routing.
+ * @param operations implementation of record access operations
+ * @param refreshInterval interval between refreshes in milliseconds.
+ * @param retainedApplications number of retained applications
+ * @param clock time source
+ */
+private[history] class ApplicationCache(
+val operations: ApplicationCacheOperations,
+val refreshInterval: Long,
+val retainedApplications: Int,
+val clock: Clock) extends Logging {
+
+  /**
+   * Services the load request from the cache.
+   */
+  private val appLoader = new CacheLoader[CacheKey, CacheEntry] {
+
+/** the cache key doesn't match a cached entry ... attempt to load it 
 */
+override def load(key: CacheKey): CacheEntry = {
+  loadApplicationEntry(key.appId, key.attemptId)
+}
+
+  }
+
+  /**
+   * Handler for callbacks from the cache of entry removal.
+   */
+  private val removalListener = new RemovalListener[CacheKey, CacheEntry] {
+
+/**
+ * Removal event notifies the provider to detach the UI.
+ * @param rm removal notification
+ */
+override def onRemoval(rm: RemovalNotification[CacheKey, CacheEntry]): 
Unit = {
+  metrics.evictionCount.inc()
+  val key = rm.getKey
+  logDebug(s"Evicting entry ${key}")
+  operations.detachSparkUI(key.appId, key.attemptId, rm.getValue().ui)
+}
+  }
+
+  /**
+   * The cache of applications.
+   *
+   * Tagged as `protected` so as to allow subclasses in tests to access 
it directly
+   */
+  protected val appCache: LoadingCache[CacheKey, CacheEntry] = {
+CacheBuilder.newBuilder()
+.maximumSize(retainedApplications)
+.removalListener(removalListener)
+.build(appLoader)
+  }
+
+  /**
+   * The metrics which are updated as the cache is used.
+   */
+  val metrics = new CacheMetrics("history.cache")
+
+  init()
+
+  /**
+   * Perform any startup operations.
+   *
+   * This includes declaring this instance as the cache to use in the
+   * [[ApplicationCacheCheckFilterRelay]].
+   */
+  private def init(): Unit = {
+ApplicationCacheCheckFilterRelay.setApplicationCache(this)
   

[GitHub] spark pull request: [SPARK-7889] [CORE] HistoryServer to refresh c...

2016-02-08 Thread squito
Github user squito commented on a diff in the pull request:

https://github.com/apache/spark/pull/6935#discussion_r52202867
  
--- Diff: 
core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala ---
@@ -415,8 +488,59 @@ private[history] class FsHistoryProvider(conf: 
SparkConf, clock: Clock)
 }
 newIterator.foreach(addIfAbsent)
 oldIterator.foreach(addIfAbsent)
+logDebug(s" completed apps=${mergedApps.count(_._2.completed)}")
+logDebug(s" incomplete apps=${mergedApps.count(!_._2.completed)}")
+mergedApps
+  }
+
 
-applications = mergedApps
+  /**
+   * Build a list of incomplete apps that have been updated since they were 
last examined.
+   *
+   * After the scan, if there were any updated attempts, [[applications]] 
is updated
+   * with the new values.
+   *
+   * 1. No attempt to replay the application is made; this scan is a low 
cost operation.
+   * 2. As this overwrites [[applications]] with a new value, it must not 
run concurrently
+   * with the main scan for new applications. That is: it must be in the 
[[checkForLogs()]]
+   * operation.
+   * 3. If an attempt's files are no longer present, the existing attempt 
is not considered
+   * out of date or otherwise modified.
+   */
+  private[history] def scanAndUpdateIncompleteAttemptInfo(): Unit = {
+val newAttempts: Iterable[FsApplicationAttemptInfo] = applications
+.filter( e => !e._2.completed)
+.flatMap { e =>
+  // build list of (false, attempt) or (true, attempt') values
+  e._2.attempts.flatMap { prevInfo: FsApplicationAttemptInfo =>
+val path = new Path(logDir, prevInfo.logPath)
+try {
+  val status = fs.getFileStatus(path)
+  val size = status.getLen
+  val aS = prevInfo.fileSize
+  if (size > aS) {
+logDebug(s"Attempt ${prevInfo.name}/${prevInfo.appId} size 
=> $size")
+Some(new FsApplicationAttemptInfo(prevInfo.logPath, 
prevInfo.name, prevInfo.appId,
+  prevInfo.attemptId, prevInfo.startTime, 
prevInfo.endTime, prevInfo.lastUpdated,
+  prevInfo.sparkUser, prevInfo.completed, size))
--- End diff --

this is the part which got me thinking about the alternate version 
in https://github.com/apache/spark/pull/8.  I thought it seemed wrong that 
you were using `prevInfo.completed` -- if the app had transitioned to 
completed, wouldn't you need to put the new status in?  Well, it turns out you 
don't, because `checkForLogs` / `mergeApplicationListings` will already re-scan 
the existing, in-progress attempts, and update them if necessary (though there 
is a filesize vs. timestamp issue).  And that is ultimately what transitions 
the app to complete status, since this scan can't do it.

And that led me to think that maybe we should just leverage that scan 
behavior rather than doing something more complicated.
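
To make that concrete, here is a minimal, self-contained sketch of the cheap 
size-only refresh being discussed (not the PR's code; the simplified 
`AttemptInfo` type is a stand-in for `FsApplicationAttemptInfo`). It 
deliberately carries the old `completed` flag over:

```scala
// Simplified stand-in for FsApplicationAttemptInfo, for illustration only.
case class AttemptInfo(
    logPath: String,
    appId: String,
    attemptId: Option[String],
    completed: Boolean,
    fileSize: Long)

// The low-cost scan: if the event log grew, copy the attempt with only the
// size updated. The completed flag is carried over unchanged; the full
// checkForLogs() replay is what eventually flips it to true.
def refreshedAttempt(prev: AttemptInfo, currentSize: Long): Option[AttemptInfo] =
  if (currentSize > prev.fileSize) Some(prev.copy(fileSize = currentSize))
  else None
```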



[GitHub] spark pull request: [SPARK-7889] [CORE] HistoryServer to refresh c...

2016-02-05 Thread steveloughran
Github user steveloughran commented on the pull request:

https://github.com/apache/spark/pull/6935#issuecomment-180374115
  
rebased against master; switched from scanning the HTML view to using the REST 
API to enumerate listings of complete/incomplete apps, and added @squito's `?` 
arg redirection and test



[GitHub] spark pull request: [SPARK-7889] [CORE] HistoryServer to refresh c...

2016-02-04 Thread steveloughran
Github user steveloughran commented on the pull request:

https://github.com/apache/spark/pull/6935#issuecomment-179849703
  
...thx for the feedback. I'm fixing the merge, which is now triggering a 
regression (maybe a race condition in test startup).

Apps should go from incomplete -> complete; that's one of the recurrent 
issues we have today. When the attempt is replayed, its complete flag is 
changed and the fs history `applications` field is updated. When the history 
server asks for the list of apps, it gets the updated histories, and so picks 
up the change.



[GitHub] spark pull request: [SPARK-7889] [CORE] HistoryServer to refresh c...

2016-02-04 Thread steveloughran
Github user steveloughran commented on the pull request:

https://github.com/apache/spark/pull/6935#issuecomment-179850363
  
...I should add that it depends on the head attempt in the list being 
complete; the filter in HistoryServer is very sensitive to ordering. If there's 
an incomplete history older than a complete one, the app is considered 
incomplete.
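
A minimal sketch of that ordering sensitivity, assuming attempts are sorted 
newest-first (the record type here is a simplified stand-in, not the PR's 
code):

```scala
// An app lists as complete only if the head (newest) attempt is complete.
case class Attempt(completed: Boolean)

def appIsComplete(attempts: Seq[Attempt]): Boolean =
  attempts.headOption.exists(_.completed)

// An incomplete attempt at the head hides an older completed one:
// appIsComplete(Seq(Attempt(false), Attempt(true))) == false
```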



[GitHub] spark pull request: [SPARK-7889] [CORE] HistoryServer to refresh c...

2016-02-01 Thread squito
Github user squito commented on the pull request:

https://github.com/apache/spark/pull/6935#issuecomment-178139026
  
jenkins, add to whitelist



[GitHub] spark pull request: [SPARK-7889] [CORE] HistoryServer to refresh c...

2016-02-01 Thread squito
Github user squito commented on the pull request:

https://github.com/apache/spark/pull/6935#issuecomment-178142661
  
hi @steveloughran, sorry for another really long delay on my end.  Mostly 
this looks fine; there are some style nits and a couple of comments that need 
updating.  I also looked into the query param thing -- I played with it a bit 
more and realized it's kind of a nuisance to test, but I did write a test on 
`ApplicationCache` for it which I'll send to you.

I just had one more concern with the last read-through -- what happens when 
an app goes from incomplete to complete?  I did some manual testing, and things 
seem to work.  But I have a fear that there is some lingering state that isn't 
getting cleaned up.  I will try to walk through things more carefully, but maybe 
you understand it well enough that you can reassure me (or perhaps you should 
take another look yourself as well ...).

And of course, there are now merge conflicts which need to be fixed, sorry.



[GitHub] spark pull request: [SPARK-7889] [CORE] HistoryServer to refresh c...

2016-02-01 Thread squito
Github user squito commented on a diff in the pull request:

https://github.com/apache/spark/pull/6935#discussion_r51446687
  
--- Diff: 
core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala ---
@@ -42,6 +42,35 @@ import org.apache.spark.util.{Clock, SystemClock, 
ThreadUtils, Utils}
  * A class that provides application history from event logs stored in the 
file system.
  * This provider checks for new finished applications in the background 
periodically and
  * renders the history application UI by parsing the associated event logs.
+ *
+ * ==How new and updated attempts are detected==
+ *
+ * - New attempts are detected in [[checkForLogs]]: the log dir is 
scanned, and any
+ * entries in the log dir whose modification time is greater than the last 
scan time
+ * are considered new or updated. These are replayed to create a new 
[[FsApplicationAttemptInfo]]
+ * entry and update or create a matching [[FsApplicationHistoryInfo]] 
element in the list
+ * of applications.
+ * - Updated attempts are checked by scanning all known attempts, and if 
their file size
+ * has changed, considering them as updated. A new 
[[FsApplicationAttemptInfo]] instance
+ * is created copying over all the original data, the current size, and an 
incremented version
+ * counter. Accordingly, the fact the attempt is updated is detected, but 
there is no replay
+ * cost.
+ * - When [[updateProbe()]] is invoked to check if a loaded [[SparkUI]]
+ * instance is out of date, the version counter of the application attempt 
loaded is
+ * compared with that attempt's current value; the loaded UI is considered 
out of date
+ * if its version is less than that of the current listing.
+ *
+ * The use of a version counter, rather than simply relying on 
modification times, is needed to
+ * address the following issues
+ * - some filesystems do not appear to update the `modtime` value whenever 
data is flushed to
+ * an open file output stream. Changes to the history may not be picked up.
+ * - the granularity of the `modtime` field may be 2+ seconds. Rapid 
changes to the FS can be
+ * missed.
--- End diff --

the comments about version counters are out of date now; you can replace them 
with file size.  (Definitely keep the explanation of why modtime is 
insufficient.)
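
As a hedged illustration of the file-size approach that replaced the version 
counter (names here are illustrative, not the PR's code), the two probes 
compare as follows; only the size probe is robust against the modtime caveats 
quoted above:

```scala
// modtime probe: can miss updates when the filesystem rounds timestamps
// to ~2s granularity, or never updates modtime on flush.
def staleByModTime(prevModTime: Long, currentModTime: Long): Boolean =
  currentModTime > prevModTime

// file-size probe: any visible growth of the event log is detected,
// independent of timestamp behaviour.
def staleBySize(prevFileSize: Long, currentFileSize: Long): Boolean =
  currentFileSize > prevFileSize
```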



[GitHub] spark pull request: [SPARK-7889] [CORE] HistoryServer to refresh c...

2016-02-01 Thread squito
Github user squito commented on a diff in the pull request:

https://github.com/apache/spark/pull/6935#discussion_r51449903
  
--- Diff: 
core/src/test/scala/org/apache/spark/deploy/history/ApplicationCacheSuite.scala 
---
@@ -0,0 +1,484 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.deploy.history
+
+import java.util.{Date, NoSuchElementException}
+import javax.servlet.Filter
+
+import scala.collection.mutable
+import scala.collection.mutable.ListBuffer
+import scala.language.postfixOps
+
+import com.codahale.metrics.Counter
+import com.google.common.cache.LoadingCache
+import com.google.common.util.concurrent.UncheckedExecutionException
+import org.eclipse.jetty.servlet.ServletContextHandler
+import org.mockito.Mockito._
+import org.scalatest.Matchers
+import org.scalatest.mock.MockitoSugar
+
+import org.apache.spark.status.api.v1.{ApplicationAttemptInfo => 
AttemptInfo, ApplicationInfo}
+import org.apache.spark.ui.SparkUI
+import org.apache.spark.util.{Clock, ManualClock, Utils}
+import org.apache.spark.{Logging, SparkFunSuite}
--- End diff --

class imports sort first, so this line goes first in the "spark" group (the 
style checker will complain about this now)



[GitHub] spark pull request: [SPARK-7889] [CORE] HistoryServer to refresh c...

2016-02-01 Thread squito
Github user squito commented on a diff in the pull request:

https://github.com/apache/spark/pull/6935#discussion_r51451010
  
--- Diff: 
core/src/test/scala/org/apache/spark/deploy/history/HistoryServerSuite.scala ---
@@ -282,6 +297,197 @@ class HistoryServerSuite extends SparkFunSuite with 
BeforeAndAfter with Matchers
 all (siteRelativeLinks) should startWith (uiRoot)
   }
 
+  test("incomplete apps get refreshed") {
+
+implicit val webDriver: WebDriver = new HtmlUnitDriver
+implicit val formats = org.json4s.DefaultFormats
+
+// this test dir is explicitly deleted on successful runs; retained for 
diagnostics when
+// not
+val logDir = 
Utils.createDirectory(System.getProperty("java.io.tmpdir", "logs"))
+
+// a new conf is used with the background thread set and running at 
its fastest
+// allowed refresh rate (1Hz)
+val myConf = new SparkConf()
+  .set("spark.history.fs.logDirectory", logDir.getAbsolutePath)
+  .set("spark.eventLog.dir", logDir.getAbsolutePath)
+  .set("spark.history.fs.update.interval", "1s")
+  .set("spark.eventLog.enabled", "true")
+  .set("spark.history.cache.window", "250ms")
+  .remove("spark.testing")
+val provider = new FsHistoryProvider(myConf)
+val securityManager = new SecurityManager(myConf)
+
+sc = new SparkContext("local", "test", myConf)
+val logDirUri = logDir.toURI
+val logDirPath = new Path(logDirUri)
+val fs = FileSystem.get(logDirUri, sc.hadoopConfiguration)
+
+def listDir(dir: Path): Seq[FileStatus] = {
+  val statuses = fs.listStatus(dir)
+  statuses.flatMap(
+stat => if (stat.isDirectory) listDir(stat.getPath) else Seq(stat))
+}
+
+def dumpLogDir(msg: String = ""): Unit = {
+  if (log.isDebugEnabled) {
+logDebug(msg)
+listDir(logDirPath).foreach { status =>
+  val s = status.toString
+  logDebug(s)
+}
+  }
+}
+
+// stop the server with the old config, and start the new one
+server.stop()
+server = new HistoryServer(myConf, provider, securityManager, 18080)
+server.initialize()
+server.bind()
+val port = server.boundPort
+val metrics = server.cacheMetrics
+
+// assert that a metric has a value; if not dump the whole metrics 
instance
+def assertMetric(name: String, counter: Counter, expected: Long): Unit 
= {
+  val actual = counter.getCount
+  if (actual != expected) {
+// this is here because Scalatest loses stack depth
+fail(s"Wrong $name value - expected $expected but got $actual" +
+s" in metrics\n$metrics")
+  }
+}
+
+// build a URL for an app or app/attempt plus a page underneath
+def buildURL(appId: String, suffix: String): URL = {
+  new URL(s"http://localhost:$port/history/$appId$suffix;)
+}
+
+// build a rest URL for the application and suffix.
+def applications(appId: String, suffix: String): URL = {
+  new URL(s"http://localhost:$port/api/v1/applications/$appId$suffix;)
+}
+
+val historyServerRoot = new URL(s"http://localhost:$port/")
+val historyServerIncompleted = new URL(historyServerRoot, 
"?page=1&showIncomplete=true")
+
+// assert the body of a URL contains a string; return that body
+def assertUrlContains(url: URL, str: String): String = {
+  val body = HistoryServerSuite.getUrl(url)
+  assert(body.contains(str), s"did not find $str at $url : $body")
+  body
+}
+
+// start initial job
+val d = sc.parallelize(1 to 10)
+d.count()
+val stdInterval = interval(100 milliseconds)
+val appId = eventually(timeout(20 seconds), stdInterval) {
+  val json = getContentAndCode("applications", port)._2.get
+  val apps = parse(json).asInstanceOf[JArray].arr
+  apps should have size 1
+  (apps.head \ "id").extract[String]
+}
+
+// which lists as incomplete
+assertUrlContains(historyServerIncompleted, appId)
+
+val appIdRoot = buildURL(appId, "")
+val rootAppPage = HistoryServerSuite.getUrl(appIdRoot)
+logDebug(s"$appIdRoot ->[${rootAppPage.length}] \n$rootAppPage")
+// sanity check to make sure filter is chaining calls
+rootAppPage should not be empty
+
+def getAppUI: SparkUI = {
+  provider.getAppUI(appId, None).get.ui
+}
+
+// selenium isn't that useful on failures...add our own reporting
+def getNumJobs(suffix: String): Int = {
+  val target = buildURL(appId, suffix)
+  val targetBody = HistoryServerSuite.getUrl(target)
+

[GitHub] spark pull request: [SPARK-7889] [CORE] HistoryServer to refresh c...

2016-02-01 Thread squito
Github user squito commented on a diff in the pull request:

https://github.com/apache/spark/pull/6935#discussion_r51445356
  
--- Diff: 
core/src/main/scala/org/apache/spark/deploy/history/ApplicationCache.scala ---
@@ -0,0 +1,658 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.deploy.history
+
+import java.util.NoSuchElementException
+
+import javax.servlet.http.{HttpServletRequest, HttpServletResponse}
+import javax.servlet.{DispatcherType, Filter, FilterChain, FilterConfig, 
ServletException, ServletRequest, ServletResponse}
+
+import scala.collection.JavaConverters._
+
+import com.codahale.metrics.{Counter, MetricRegistry, Timer}
+import com.google.common.cache.{CacheBuilder, CacheLoader, LoadingCache, 
RemovalListener, RemovalNotification}
+import org.eclipse.jetty.servlet.FilterHolder
+
+import org.apache.spark.Logging
+import org.apache.spark.metrics.source.Source
+import org.apache.spark.ui.SparkUI
+import org.apache.spark.util.Clock
+
+/**
+ * Cache for applications.
+ *
+ * Completed applications are cached for as long as there is capacity for 
them.
+ * Incomplete applications have their update time checked on every
+ * retrieval; if the cached entry is out of date, it is refreshed.
+ *
+ * @note there must be only one instance of [[ApplicationCache]] in a
+ * JVM at a time. This is because a static field in 
[[ApplicationCacheCheckFilterRelay]]
+ * keeps a reference to the cache so that HTTP requests on the 
attempt-specific web UIs
+ * can probe the current cache to see if the attempts have changed.
+ *
+ * Creating multiple instances will break this routing.
+ * @param operations implementation of record access operations
+ * @param refreshInterval interval between refreshes in milliseconds.
+ * @param retainedApplications number of retained applications
+ * @param clock time source
+ */
+private[history] class ApplicationCache(
+val operations: ApplicationCacheOperations,
+val refreshInterval: Long,
+val retainedApplications: Int,
+val clock: Clock) extends Logging {
+
+  /**
+   * Services the load request from the cache.
+   */
+  private val appLoader = new CacheLoader[CacheKey, CacheEntry] {
+
+/** the cache key doesn't match a cached entry ... attempt to load it 
 */
+override def load(key: CacheKey): CacheEntry = {
+  loadApplicationEntry(key.appId, key.attemptId)
+}
+
+  }
+
+  /**
+   * Handler for callbacks from the cache of entry removal.
+   */
+  private val removalListener = new RemovalListener[CacheKey, CacheEntry] {
+
+/**
+ * Removal event notifies the provider to detach the UI.
+ * @param rm removal notification
+ */
+override def onRemoval(rm: RemovalNotification[CacheKey, CacheEntry]): 
Unit = {
+  metrics.evictionCount.inc()
+  val key = rm.getKey
+  logDebug(s"Evicting entry ${key}")
+  operations.detachSparkUI(key.appId, key.attemptId, rm.getValue().ui)
+}
+  }
+
+  /**
+   * The cache of applications.
+   *
+   * Tagged as `protected` so as to allow subclasses in tests to access 
it directly
+   */
+  protected val appCache: LoadingCache[CacheKey, CacheEntry] = {
+CacheBuilder.newBuilder()
+.maximumSize(retainedApplications)
+.removalListener(removalListener)
+.build(appLoader)
+  }
+
+  /**
+   * The metrics which are updated as the cache is used
+   */
+  val metrics = new CacheMetrics("history.cache")
+
+  init()
+
+  /**
+   * Perform any startup operations.
+   *
+   * This includes declaring this instance as the cache to use in the
+   * [[ApplicationCacheCheckFilterRelay]].
+   */
+  private def init(): Unit = {
+ApplicationCacheCheckFilterRelay.setApplicationCache(this)
+  }
+
+  /**
+   * 

[GitHub] spark pull request: [SPARK-7889] [CORE] HistoryServer to refresh c...

2016-02-01 Thread squito
Github user squito commented on a diff in the pull request:

https://github.com/apache/spark/pull/6935#discussion_r51444707
  
--- Diff: 
core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala ---
@@ -430,8 +517,55 @@ private[history] class FsHistoryProvider(conf: 
SparkConf, clock: Clock)
 }
 newIterator.foreach(addIfAbsent)
 oldIterator.foreach(addIfAbsent)
+mergedApps
+  }
+
+
+  /**
+   * Scan through all known incomplete applications, and check for any 
that have been updated.
+   *
+   * After the scan, if there were any updated attempts, [[applications]] 
is updated
+   * with the new values.
+   *
+   * 1. No attempt to replay the application is made; this scan is a low 
cost operation.
+   * 2. As this overwrites [[applications]] with a new value, it must not 
run concurrently
+   * with the main scan for new applications. That is: it must be in the 
[[checkForLogs()]]
+   * operation.
+   */
+  private[history] def scanAndUpdateIncompleteAttemptInfo(): Unit = {
+val newAttempts: Iterable[FsApplicationAttemptInfo] = applications
+.filter( e => !e._2.completed)
+.flatMap { e =>
+  // build list of (false, attempt) or (true, attempt') values
--- End diff --

looks like this was never updated



[GitHub] spark pull request: [SPARK-7889] [CORE] HistoryServer to refresh c...

2016-02-01 Thread squito
Github user squito commented on a diff in the pull request:

https://github.com/apache/spark/pull/6935#discussion_r51447527
  
--- Diff: 
core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala ---
@@ -666,6 +804,40 @@ private[history] class FsHistoryProvider(conf: 
SparkConf, clock: Clock)
 dfs.setSafeMode(HdfsConstants.SafeModeAction.SAFEMODE_GET)
   }
 
+  /**
+   * String description for diagnostics
+   * @return a summary of the component state
+   */
+  override def toString: String = {
+val header = s"""
+  | FsHistoryProvider: logdir=$logDir,
+  | last scan time=$lastScanTime
| Cached application count =${applications.size}
+""".stripMargin
+val sb = new StringBuilder(header)
+applications.foreach(entry => sb.append(entry._2).append("\n"))
+sb.toString
+  }
+
+  /**
+   * The update probe is the generational counter of attempts:
+   * if the filesize is less than that of the latest attempt's size, it is 
out of date.
+   * @param appId application to probe
+   * @param attemptId attempt to probe
+   * @param fileSize the file size of the last attempt's logs
+   */
+  private def updateProbe(
+appId: String,
+attemptId: Option[String],
+fileSize: Long)(): Boolean = {
--- End diff --

rename to `prevFileSize`
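
For context, a self-contained sketch of how such a curried probe can be wired 
up, with the suggested `prevFileSize` name applied (`lookup` and 
`AttemptListing` are hypothetical stand-ins for the provider's internals, not 
the PR's actual code):

```scala
// Hypothetical stand-ins for the provider's internal state.
case class AttemptListing(fileSize: Long)
def lookup(appId: String, attemptId: Option[String]): Option[AttemptListing] =
  None // stub

// Curried: the trailing empty parameter list lets the cache hold a
// () => Boolean staleness probe that closes over load-time state.
def updateProbe(
    appId: String,
    attemptId: Option[String],
    prevFileSize: Long)(): Boolean =
  lookup(appId, attemptId).exists(_.fileSize > prevFileSize)

// Partially apply to capture the state at UI load time.
val probe: () => Boolean = updateProbe("app-1", None, 1024L) _
```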



[GitHub] spark pull request: [SPARK-7889] [CORE] HistoryServer to refresh c...

2016-02-01 Thread squito
Github user squito commented on a diff in the pull request:

https://github.com/apache/spark/pull/6935#discussion_r51447480
  
--- Diff: 
core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala ---
@@ -666,6 +804,40 @@ private[history] class FsHistoryProvider(conf: 
SparkConf, clock: Clock)
 dfs.setSafeMode(HdfsConstants.SafeModeAction.SAFEMODE_GET)
   }
 
+  /**
+   * String description for diagnostics
+   * @return a summary of the component state
+   */
+  override def toString: String = {
+val header = s"""
+  | FsHistoryProvider: logdir=$logDir,
+  | last scan time=$lastScanTime
| Cached application count =${applications.size}
+""".stripMargin
+val sb = new StringBuilder(header)
+applications.foreach(entry => sb.append(entry._2).append("\n"))
+sb.toString
+  }
+
+  /**
+   * The update probe is the generational counter of attempts:
+   * if the filesize is less than that of the latest attempt's size, it is 
out of date.
--- End diff --

also remove the comment on generational counter here.



[GitHub] spark pull request: [SPARK-7889] [CORE] HistoryServer to refresh c...

2016-02-01 Thread squito
Github user squito commented on a diff in the pull request:

https://github.com/apache/spark/pull/6935#discussion_r51447272
  
--- Diff: 
core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala ---
@@ -613,13 +739,23 @@ private[history] class FsHistoryProvider(conf: 
SparkConf, clock: Clock)
   private def getModificationTime(fsEntry: FileStatus): Option[Long] = {
 if (isLegacyLogDirectory(fsEntry)) {
   val statusList = fs.listStatus(fsEntry.getPath)
-  if (!statusList.isEmpty) 
Some(statusList.map(_.getModificationTime()).max) else None
+  if (statusList.nonEmpty) 
Some(statusList.map(_.getModificationTime()).max) else None
 } else {
   Some(fsEntry.getModificationTime())
 }
   }
 
   /**
+   * Get the size of the log, or `None` if there isn't one in the child
+   * directory of a legacy log entry
--- End diff --

I think the comment should just be " ... `None` for a legacy log entry".  
That is, if there is a legacy log entry, you don't try to look at any of the 
files in the directory, so you'll never update incomplete apps.  Which I think 
is fine.
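
A hedged sketch of the agreed behavior (`getLogSize` is an illustrative name; 
`FileStatus` is the Hadoop type used in the diff): legacy, directory-based 
entries yield no size, so the size-based update check simply never fires for 
them:

```scala
import org.apache.hadoop.fs.FileStatus

// None for a legacy (directory-based) log entry, Some(size) otherwise;
// incomplete legacy apps are therefore never refreshed.
def getLogSize(entry: FileStatus): Option[Long] =
  if (entry.isDirectory) None else Some(entry.getLen)
```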



[GitHub] spark pull request: [SPARK-7889] [CORE] HistoryServer to refresh c...

2016-02-01 Thread squito
Github user squito commented on a diff in the pull request:

https://github.com/apache/spark/pull/6935#discussion_r51450618
  
--- Diff: 
core/src/test/scala/org/apache/spark/deploy/history/HistoryServerSuite.scala ---
@@ -44,7 +57,7 @@ import org.apache.spark.util.ResetSystemProperties
  * are considered part of Spark's public api.
  */
 class HistoryServerSuite extends SparkFunSuite with BeforeAndAfter with 
Matchers with MockitoSugar
-  with JsonTestUtils with ResetSystemProperties {
+  with JsonTestUtils with Eventually with WebBrowser with 
LocalSparkContext with ResetSystemProperties {
--- End diff --

line too long



[GitHub] spark pull request: [SPARK-7889] [CORE] HistoryServer to refresh c...

2016-02-01 Thread squito
Github user squito commented on a diff in the pull request:

https://github.com/apache/spark/pull/6935#discussion_r51450584
  
--- Diff: 
core/src/test/scala/org/apache/spark/deploy/history/HistoryServerSuite.scala ---
@@ -21,16 +21,29 @@ import java.net.{HttpURLConnection, URL}
 import java.util.zip.ZipInputStream
 import javax.servlet.http.{HttpServletRequest, HttpServletResponse}
 
+import scala.concurrent.duration._
+import scala.language.postfixOps
+
+import com.codahale.metrics.Counter
 import com.google.common.base.Charsets
 import com.google.common.io.{ByteStreams, Files}
 import org.apache.commons.io.{FileUtils, IOUtils}
+import org.apache.hadoop.fs.{FileStatus, FileSystem, Path}
+import org.json4s.JsonAST.JArray
+import org.json4s.jackson.JsonMethods._
 import org.mockito.Mockito.when
+import org.openqa.selenium.htmlunit.HtmlUnitDriver
+import org.openqa.selenium.WebDriver
+import org.scalatest.concurrent.Eventually
+import org.scalatest.selenium.WebBrowser
 import org.scalatest.{BeforeAndAfter, Matchers}
 import org.scalatest.mock.MockitoSugar
--- End diff --

ordering of the scalatest import



[GitHub] spark pull request: [SPARK-7889] [CORE] HistoryServer to refresh c...

2016-02-01 Thread squito
Github user squito commented on a diff in the pull request:

https://github.com/apache/spark/pull/6935#discussion_r51444027
  
--- Diff: 
core/src/main/scala/org/apache/spark/deploy/history/ApplicationCache.scala ---
@@ -0,0 +1,667 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.deploy.history
+
+import java.util.NoSuchElementException
+import javax.servlet.http.{HttpServletRequest, HttpServletResponse}
+import javax.servlet.{DispatcherType, Filter, FilterChain, FilterConfig, 
ServletException, ServletRequest, ServletResponse}
+
+import scala.collection.JavaConverters._
+import scala.util.control.NonFatal
+
+import com.codahale.metrics.{Counter, MetricRegistry, Timer}
+import com.google.common.cache.{CacheBuilder, CacheLoader, LoadingCache, 
RemovalListener, RemovalNotification}
+import org.eclipse.jetty.servlet.FilterHolder
+
+import org.apache.spark.Logging
+import org.apache.spark.metrics.source.Source
+import org.apache.spark.ui.SparkUI
+import org.apache.spark.util.Clock
+
+/**
+ * Cache for applications.
+ *
+ * Completed applications are cached for as long as there is capacity for 
them.
+ * Incomplete applications have their update time checked on every
+ * retrieval; if the cached entry is out of date, it is refreshed.
+ *
+ * @note there must be only one instance of [[ApplicationCache]] in a
+ * JVM at a time. This is because a static field in 
[[ApplicationCacheCheckFilterRelay]]
+ * keeps a reference to the cache so that HTTP requests on the 
attempt-specific web UIs
+ * can probe the current cache to see if the attempts have changed.
+ *
+ * Creating multiple instances will break this routing.
+ * @param operations implementation of record access operations
+ * @param refreshInterval interval between refreshes in milliseconds.
+ * @param retainedApplications number of retained applications
+ * @param clock time source
+ */
+private[history] class ApplicationCache(
+val operations: ApplicationCacheOperations,
+val refreshInterval: Long,
+val retainedApplications: Int,
+val clock: Clock) extends Logging {
+
+  /**
+   * Services the load request from the cache.
+   */
+  private val appLoader = new CacheLoader[CacheKey, CacheEntry] {
+
+/** the cache key doesn't match a cached entry ... attempt to load it 
 */
--- End diff --

can you add to the comment -- "or the cached entry has expired for an 
incomplete app"



[GitHub] spark pull request: [SPARK-7889] [CORE] HistoryServer to refresh c...

2016-02-01 Thread squito
Github user squito commented on a diff in the pull request:

https://github.com/apache/spark/pull/6935#discussion_r51446839
  
--- Diff: 
core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala ---
@@ -386,24 +437,49 @@ private[history] class FsHistoryProvider(conf: 
SparkConf, clock: Clock)
   }
 }
 
-if (newAttempts.isEmpty) {
-  return
+updateApplicationsWithNewAttempts(newAttempts)
+  }
+
+  /**
+   * Merge in all new attempts with those in [[applications]], updating 
the [[applications]]
+   * field afterwards. It _must not_ be executed concurrently, else 
attempt information may
+   * be lost.
+   * @param newAttempts a possibly empty list of new attempts
+   */
+  private def updateApplicationsWithNewAttempts(
+  newAttempts: Iterable[FsApplicationAttemptInfo]): Unit = {
+if (newAttempts.nonEmpty) {
+  applications = mergeAttempts(newAttempts, applications)
 }
+  }
+
+  /**
+   * Build a map containing all apps that contain new attempts. The app 
information in this map
+   * contains both the new app attempt, and those that were already loaded 
in the existing apps
+   * map. If an attempt has been updated, it replaces the old attempt in 
the list.
+   * The ordering is maintained
+   * @param newAttempts new attempt list
+   * @param current the current attempt list
+   * @return the updated list
+   */
+  private def mergeAttempts(
+  newAttempts: Iterable[FsApplicationAttemptInfo],
+  current: mutable.LinkedHashMap[String, FsApplicationHistoryInfo])
+  : mutable.LinkedHashMap[String, FsApplicationHistoryInfo] = {
 
-// Build a map containing all apps that contain new attempts. The app 
information in this map
-// contains both the new app attempt, and those that were already 
loaded in the existing apps
-// map. If an attempt has been updated, it replaces the old attempt in 
the list.
 val newAppMap = new mutable.HashMap[String, FsApplicationHistoryInfo]()
 newAttempts.foreach { attempt =>
   val appInfo = newAppMap.get(attempt.appId)
-.orElse(applications.get(attempt.appId))
+.orElse(current.get(attempt.appId))
 .map { app =>
   val attempts =
-app.attempts.filter(_.attemptId != attempt.attemptId).toList 
++ List(attempt)
+app.attempts.filter(_.attemptId != attempt.attemptId) ++ 
List(attempt)
+  var sortedAttempts = attempts.sortWith(compareAttemptInfo)
--- End diff --

can be a `val`



[GitHub] spark pull request: [SPARK-7889] [CORE] HistoryServer to refresh c...

2016-02-01 Thread squito
Github user squito commented on a diff in the pull request:

https://github.com/apache/spark/pull/6935#discussion_r51450652
  
--- Diff: 
core/src/test/scala/org/apache/spark/deploy/history/HistoryServerSuite.scala ---
@@ -21,16 +21,29 @@ import java.net.{HttpURLConnection, URL}
 import java.util.zip.ZipInputStream
 import javax.servlet.http.{HttpServletRequest, HttpServletResponse}
 
+import scala.concurrent.duration._
+import scala.language.postfixOps
+
+import com.codahale.metrics.Counter
 import com.google.common.base.Charsets
 import com.google.common.io.{ByteStreams, Files}
 import org.apache.commons.io.{FileUtils, IOUtils}
+import org.apache.hadoop.fs.{FileStatus, FileSystem, Path}
+import org.json4s.JsonAST.JArray
+import org.json4s.jackson.JsonMethods._
 import org.mockito.Mockito.when
+import org.openqa.selenium.htmlunit.HtmlUnitDriver
+import org.openqa.selenium.WebDriver
+import org.scalatest.concurrent.Eventually
+import org.scalatest.selenium.WebBrowser
 import org.scalatest.{BeforeAndAfter, Matchers}
 import org.scalatest.mock.MockitoSugar
 
-import org.apache.spark.{JsonTestUtils, SecurityManager, SparkConf, 
SparkFunSuite}
 import org.apache.spark.ui.{SparkUI, UIUtils}
+import org.apache.spark.ui.jobs.UIData.JobUIData
 import org.apache.spark.util.ResetSystemProperties
+import org.apache.spark._
--- End diff --

first in the spark group



[GitHub] spark pull request: [SPARK-7889] [CORE] HistoryServer to refresh c...

2016-02-01 Thread squito
Github user squito commented on a diff in the pull request:

https://github.com/apache/spark/pull/6935#discussion_r51451496
  
--- Diff: 
core/src/main/scala/org/apache/spark/deploy/history/ApplicationCache.scala ---
@@ -0,0 +1,667 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.deploy.history
+
+import java.util.NoSuchElementException
+import javax.servlet.http.{HttpServletRequest, HttpServletResponse}
+import javax.servlet.{DispatcherType, Filter, FilterChain, FilterConfig, 
ServletException, ServletRequest, ServletResponse}
--- End diff --

import order (above javax.servlet.http)



[GitHub] spark pull request: [SPARK-7889] [CORE] HistoryServer to refresh c...

2016-01-28 Thread steveloughran
Github user steveloughran commented on the pull request:

https://github.com/apache/spark/pull/6935#issuecomment-176209826
  
@squito, have you had a chance to look at the latest version? I think 
I've addressed all your issues, and it is building and testing against the 
current master.



[GitHub] spark pull request: [SPARK-7889] [CORE] HistoryServer to refresh c...

2016-01-26 Thread steveloughran
Github user steveloughran commented on the pull request:

https://github.com/apache/spark/pull/6935#issuecomment-175048241
  
I'm going to add a warning note here pointing to 
[HDFS-5478](https://issues.apache.org/jira/browse/HDFS-5478): some versions of 
HDFS aren't picking up changes immediately. I think this affects older versions 
of Hadoop, not the current 2.6+ line.





[GitHub] spark pull request: [SPARK-7889] [CORE] HistoryServer to refresh c...

2015-12-29 Thread squito
Github user squito commented on a diff in the pull request:

https://github.com/apache/spark/pull/6935#discussion_r48548650
  
--- Diff: 
core/src/test/scala/org/apache/spark/deploy/history/ApplicationCacheSuite.scala 
---
@@ -0,0 +1,476 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.deploy.history
+
+import java.util.{Date, NoSuchElementException}
+
+import javax.servlet.Filter
+
+import scala.collection.mutable
+import scala.collection.mutable.ListBuffer
+import scala.language.postfixOps
+
+import com.codahale.metrics.Counter
+import com.google.common.cache.LoadingCache
+import com.google.common.util.concurrent.UncheckedExecutionException
+import org.eclipse.jetty.servlet.ServletContextHandler
+import org.mockito.Mockito._
+import org.scalatest.Matchers
+import org.scalatest.mock.MockitoSugar
+
+import org.apache.spark.status.api.v1.{ApplicationAttemptInfo => 
AttemptInfo, ApplicationInfo}
+import org.apache.spark.ui.SparkUI
+import org.apache.spark.util.{Clock, ManualClock, Utils}
+import org.apache.spark.{Logging, SparkFunSuite}
+
+class ApplicationCacheSuite extends SparkFunSuite with Logging with 
MockitoSugar with Matchers {
+
+  /**
+   * subclass with access to the cache internals
+   * @param refreshInterval interval between refreshes in milliseconds.
+   * @param retainedApplications number of retained applications
+   */
+  class TestApplicationCache(
+  operations: ApplicationCacheOperations = new StubCacheOperations(),
+  refreshInterval: Long,
+  retainedApplications: Int,
+  clock: Clock = new ManualClock(0))
+  extends ApplicationCache(operations, refreshInterval, 
retainedApplications, clock) {
+
+def cache(): LoadingCache[CacheKey, CacheEntry] = appCache
+  }
+
+  /**
+   * Stub cache operations.
+   * The state is kept in a map of [[CacheKey]] to [[CacheEntry]],
+   * the `probeTime` field in the cache entry setting the timestamp of the 
entry
+   */
+  class StubCacheOperations extends ApplicationCacheOperations with 
Logging {
+
+/** map to UI instances, including timestamps, which are used in 
update probes */
+val instances = mutable.HashMap.empty[CacheKey, CacheEntry]
+
+/** Map of attached spark UIs */
+val attached = mutable.HashMap.empty[CacheKey, SparkUI]
+
+var getAppUICount = 0L
+var attachCount = 0L
+var detachCount = 0L
+var updateProbeCount = 0L
+
+/**
+ * Get the application UI
+ * @param appId application ID
+ * @param attemptId attempt ID
+ * @return If found, the Spark UI and any history information to be 
used in the cache
+ */
+override def getAppUI(appId: String, attemptId: Option[String]): 
Option[LoadedAppUI] = {
+  logDebug(s"getAppUI($appId, $attemptId)")
+  getAppUICount += 1
+  instances.get(CacheKey(appId, attemptId)).map( e =>
+LoadedAppUI(e.ui, Some(new StubHistoryProviderUpdateState(e.probeTime))))
+}
+
+override def attachSparkUI(appId: String, attemptId: Option[String], 
ui: SparkUI,
+completed: Boolean): Unit = {
+  logDebug(s"attachSparkUI($appId, $attemptId, $ui)")
+  attachCount += 1
+  attached += (CacheKey(appId, attemptId) -> ui)
+}
+
+def putAndAttach(appId: String, attemptId: Option[String], completed: 
Boolean, started: Long,
+ended: Long, timestamp: Long): SparkUI = {
+  val ui = putAppUI(appId, attemptId, completed, started, ended, 
timestamp)
+  attachSparkUI(appId, attemptId, ui, completed)
+  ui
+}
+
+def putAppUI(appId: String, attemptId: Option[String], completed: 
Boolean, started: Long,
+ended: Long, timestamp: Long): SparkUI = {
+  val ui = newUI(appId, attemptId, completed, started, ended)
+  putInstance(appId, attemptId, ui, completed, 

[GitHub] spark pull request: [SPARK-7889] [CORE] HistoryServer to refresh c...

2015-12-28 Thread squito
Github user squito commented on the pull request:

https://github.com/apache/spark/pull/6935#issuecomment-167676684
  
hi @steveloughran, I think maybe some commits didn't get pushed? I 
noticed a few places where things didn't seem to quite line up with your 
comments. (Also there is a merge conflict with master while you are touching it 
again.)





[GitHub] spark pull request: [SPARK-7889] [CORE] HistoryServer to refresh c...

2015-12-23 Thread steveloughran
Github user steveloughran commented on a diff in the pull request:

https://github.com/apache/spark/pull/6935#discussion_r48363721
  
--- Diff: 
core/src/main/scala/org/apache/spark/deploy/history/ApplicationCache.scala ---
@@ -0,0 +1,658 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.deploy.history
+
+import java.util.NoSuchElementException
+
+import javax.servlet.http.{HttpServletRequest, HttpServletResponse}
+import javax.servlet.{DispatcherType, Filter, FilterChain, FilterConfig, 
ServletException, ServletRequest, ServletResponse}
+
+import scala.collection.JavaConverters._
+
+import com.codahale.metrics.{Counter, MetricRegistry, Timer}
+import com.google.common.cache.{CacheBuilder, CacheLoader, LoadingCache, 
RemovalListener, RemovalNotification}
+import org.eclipse.jetty.servlet.FilterHolder
+
+import org.apache.spark.Logging
+import org.apache.spark.metrics.source.Source
+import org.apache.spark.ui.SparkUI
+import org.apache.spark.util.Clock
+
+/**
+ * Cache for applications.
+ *
+ * Completed applications are cached for as long as there is capacity for 
them.
+ * Incomplete applications have their update time checked on every
+ * retrieval; if the cached entry is out of date, it is refreshed.
+ *
+ * @note there must be only one instance of [[ApplicationCache]] in a
+ * JVM at a time. This is because a static field in 
[[ApplicationCacheCheckFilterRelay]]
+ * keeps a reference to the cache so that HTTP requests on the 
attempt-specific web UIs
+ * can probe the current cache to see if the attempts have changed.
+ *
+ * Creating multiple instances will break this routing.
+ * @param operations implementation of record access operations
+ * @param refreshInterval interval between refreshes in milliseconds.
+ * @param retainedApplications number of retained applications
+ * @param clock time source
+ */
+private[history] class ApplicationCache(
+val operations: ApplicationCacheOperations,
+val refreshInterval: Long,
+val retainedApplications: Int,
+val clock: Clock) extends Logging {
+
+  /**
+   * Services the load request from the cache.
+   */
+  private val appLoader = new CacheLoader[CacheKey, CacheEntry] {
+
+/** the cache key doesn't match a cached entry ... attempt to load it 
 */
+override def load(key: CacheKey): CacheEntry = {
+  loadApplicationEntry(key.appId, key.attemptId)
+}
+
+  }
+
+  /**
+   * Handler for callbacks from the cache of entry removal.
+   */
+  private val removalListener = new RemovalListener[CacheKey, CacheEntry] {
+
+/**
+ * Removal event notifies the provider to detach the UI.
+ * @param rm removal notification
+ */
+override def onRemoval(rm: RemovalNotification[CacheKey, CacheEntry]): 
Unit = {
+  metrics.evictionCount.inc()
+  val key = rm.getKey
+  logDebug(s"Evicting entry ${key}")
+  operations.detachSparkUI(key.appId, key.attemptId, rm.getValue().ui)
+}
+  }
+
+  /**
+   * The cache of applications.
+   *
+   * Tagged as `protected` so as to allow subclasses in tests to access 
it directly
+   */
+  protected val appCache: LoadingCache[CacheKey, CacheEntry] = {
+CacheBuilder.newBuilder()
+.maximumSize(retainedApplications)
+.removalListener(removalListener)
+.build(appLoader)
+  }
+
+  /**
+   * The metrics which are updated as the cache is used
+   */
+  val metrics = new CacheMetrics("history.cache")
+
+  init()
+
+  /**
+   * Perform any startup operations.
+   *
+   * This includes declaring this instance as the cache to use in the
+   * [[ApplicationCacheCheckFilterRelay]].
+   */
+  private def init(): Unit = {
+ApplicationCacheCheckFilterRelay.setApplicationCache(this)
+  }
+
+  /**

[GitHub] spark pull request: [SPARK-7889] [CORE] HistoryServer to refresh c...

2015-12-23 Thread steveloughran
Github user steveloughran commented on a diff in the pull request:

https://github.com/apache/spark/pull/6935#discussion_r48364543
  
--- Diff: 
core/src/test/scala/org/apache/spark/deploy/history/ApplicationCacheSuite.scala 
---
@@ -0,0 +1,476 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.deploy.history
+
+import java.util.{Date, NoSuchElementException}
+
+import javax.servlet.Filter
+
+import scala.collection.mutable
+import scala.collection.mutable.ListBuffer
+import scala.language.postfixOps
+
+import com.codahale.metrics.Counter
+import com.google.common.cache.LoadingCache
+import com.google.common.util.concurrent.UncheckedExecutionException
+import org.eclipse.jetty.servlet.ServletContextHandler
+import org.mockito.Mockito._
+import org.scalatest.Matchers
+import org.scalatest.mock.MockitoSugar
+
+import org.apache.spark.status.api.v1.{ApplicationAttemptInfo => 
AttemptInfo, ApplicationInfo}
+import org.apache.spark.ui.SparkUI
+import org.apache.spark.util.{Clock, ManualClock, Utils}
+import org.apache.spark.{Logging, SparkFunSuite}
+
+class ApplicationCacheSuite extends SparkFunSuite with Logging with 
MockitoSugar with Matchers {
+
+  /**
+   * subclass with access to the cache internals
+   * @param refreshInterval interval between refreshes in milliseconds.
+   * @param retainedApplications number of retained applications
+   */
+  class TestApplicationCache(
+  operations: ApplicationCacheOperations = new StubCacheOperations(),
+  refreshInterval: Long,
+  retainedApplications: Int,
+  clock: Clock = new ManualClock(0))
+  extends ApplicationCache(operations, refreshInterval, 
retainedApplications, clock) {
+
+def cache(): LoadingCache[CacheKey, CacheEntry] = appCache
+  }
+
+  /**
+   * Stub cache operations.
+   * The state is kept in a map of [[CacheKey]] to [[CacheEntry]],
+   * the `probeTime` field in the cache entry setting the timestamp of the 
entry
+   */
+  class StubCacheOperations extends ApplicationCacheOperations with 
Logging {
+
+/** map to UI instances, including timestamps, which are used in 
update probes */
+val instances = mutable.HashMap.empty[CacheKey, CacheEntry]
+
+/** Map of attached spark UIs */
+val attached = mutable.HashMap.empty[CacheKey, SparkUI]
+
+var getAppUICount = 0L
+var attachCount = 0L
+var detachCount = 0L
+var updateProbeCount = 0L
+
+/**
+ * Get the application UI
+ * @param appId application ID
+ * @param attemptId attempt ID
+ * @return If found, the Spark UI and any history information to be 
used in the cache
+ */
+override def getAppUI(appId: String, attemptId: Option[String]): 
Option[LoadedAppUI] = {
+  logDebug(s"getAppUI($appId, $attemptId)")
+  getAppUICount += 1
+  instances.get(CacheKey(appId, attemptId)).map( e =>
+LoadedAppUI(e.ui, Some(new StubHistoryProviderUpdateState(e.probeTime))))
+}
+
+override def attachSparkUI(appId: String, attemptId: Option[String], 
ui: SparkUI,
+completed: Boolean): Unit = {
+  logDebug(s"attachSparkUI($appId, $attemptId, $ui)")
+  attachCount += 1
+  attached += (CacheKey(appId, attemptId) -> ui)
+}
+
+def putAndAttach(appId: String, attemptId: Option[String], completed: 
Boolean, started: Long,
+ended: Long, timestamp: Long): SparkUI = {
+  val ui = putAppUI(appId, attemptId, completed, started, ended, 
timestamp)
+  attachSparkUI(appId, attemptId, ui, completed)
+  ui
+}
+
+def putAppUI(appId: String, attemptId: Option[String], completed: 
Boolean, started: Long,
+ended: Long, timestamp: Long): SparkUI = {
+  val ui = newUI(appId, attemptId, completed, started, ended)
+  putInstance(appId, attemptId, ui, 

[GitHub] spark pull request: [SPARK-7889] [CORE] HistoryServer to refresh c...

2015-12-23 Thread steveloughran
Github user steveloughran commented on a diff in the pull request:

https://github.com/apache/spark/pull/6935#discussion_r48367142
  
--- Diff: docs/monitoring.md ---
@@ -69,36 +83,53 @@ follows:
   
 
 
+### Spark configuration options
+
 
  Property Name | Default | Meaning
   
 spark.history.provider
-org.apache.spark.deploy.history.FsHistoryProvider
+org.apache.spark.deploy.history.FsHistoryProvider
 Name of the class implementing the application history backend. 
Currently there is only
 one implementation, provided by Spark, which looks for application 
logs stored in the
 file system.
   
   
+spark.history.retainedApplications
+50
--- End diff --

I'm not entirely sure why that ended up there. You could make the case for 
separating fs options from the others, but really the most important one is the 
log dir value; without it, nothing else works.
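
To make that concrete, a minimal spark-defaults.conf sketch using the properties and defaults from the quoted monitoring.md table; the log directory URL is an example value only:

# history server basics; defaults as documented in monitoring.md
spark.history.provider               org.apache.spark.deploy.history.FsHistoryProvider
spark.history.retainedApplications   50
# nothing else works without this entry (example URL):
spark.history.fs.logDirectory        hdfs://namenode/shared/spark-logs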





[GitHub] spark pull request: [SPARK-7889] [CORE] HistoryServer to refresh c...

2015-12-23 Thread steveloughran
Github user steveloughran commented on a diff in the pull request:

https://github.com/apache/spark/pull/6935#discussion_r48368082
  
--- Diff: docs/monitoring.md ---
@@ -69,36 +83,53 @@ follows:
   
 
 
+### Spark configuration options
+
 
  Property Name | Default | Meaning
   
 spark.history.provider
-org.apache.spark.deploy.history.FsHistoryProvider
+org.apache.spark.deploy.history.FsHistoryProvider
 Name of the class implementing the application history backend. 
Currently there is only
 one implementation, provided by Spark, which looks for application 
logs stored in the
 file system.
   
   
+spark.history.retainedApplications
+50
+
+  The number of application UIs to retain. If this cap is exceeded, 
then the oldest
+  applications will be removed.
+
+  
+  
 spark.history.fs.logDirectory
 file:/tmp/spark-events
 
- Directory that contains application event logs to be loaded by the 
history server
+For the filesystem history provider, the URL to the directory 
containing application event
+logs to load. This can be a local file:// path,
+an HDFS path hdfs://namenode/shared/spark-logs
+or that of an alternative filesystem supported by the Hadoop APIs.
 
   
   
 spark.history.fs.update.interval
 10s
 
-  The period at which information displayed by this history server is 
updated.
-  Each update checks for any changes made to the event logs in 
persisted storage.
+  The period at which the filesystem history provider checks for 
new or
+  updated logs in the log directory. A shorter interval detects new 
applications faster,
+  at the expense of more server load re-reading updated applications.
+  As soon as an update has completed, listings of the completed and 
incomplete applications
+  will reflect the changes. For performance reasons, the UIs of web 
applications are
+  only updated at a slower interval, the one defined in 
spark.history.cache.window 
--- End diff --

There are three costs in the system: listing cost, probe cost and replay 
cost.

* Listing cost is pretty expensive in the history server, as it replays the 
entire history just to get a few flags which could be cached alongside 
(completed flag, etc). That's why it can be slow to start up. After startup the 
async replay is only done on changed data. Load on HDFS: negligible.
* Probe cost: simply checking the internal state of things updated in the 
update thread; ~0.
* Replay cost: expensive, O(events), so essentially O(filesize). Again, 
HDFS doesn't notice.

The rationale for having a probe interval is not so much code cost as 
replay cost: a 15s probe interval would mean "a user clicking through the 
UI of a busy app could trigger a reload every 15s". I don't have the stats to 
decide how good or bad that is, but a longer interval worries me less.

FWIW, the Yarn timeline provider costs are:
* listing cost is less expensive than for the FS history provider, but it 
does move some of the load into the timeline server (search of database, 
serialization of result).
* probe cost: ~0 again.
* replay cost: the same replay cost as for the FS provider, but now with json 
serialization and transmission over HTTP to add.

I suspect there you'd want a longer interval for probes, just to keep those 
replays down.

Again: more data is needed here. I've added the metrics to the cache as a 
start; add metrics publishing to the history server and this code is 
ready to be hooked up to show the numbers on cache reload operations.
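
A minimal sketch of the windowed-probe idea behind those numbers, assuming a single cache.window-style interval gates each replay; the names here (ProbeGate, shouldProbe) are illustrative, not the patch's API:

// Only let an update probe through once per window, so a user clicking
// through a busy app's UI cannot trigger a replay on every request.
class ProbeGate(windowMillis: Long, now: () => Long) {
  private var lastProbeTime = 0L
  def shouldProbe(): Boolean = synchronized {
    val t = now()
    if (t - lastProbeTime >= windowMillis) { lastProbeTime = t; true }
    else false
  }
}

// e.g. a 250ms window, as in the test configuration later in this thread:
// val gate = new ProbeGate(250L, () => System.currentTimeMillis())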





[GitHub] spark pull request: [SPARK-7889] [CORE] HistoryServer to refresh c...

2015-12-23 Thread steveloughran
Github user steveloughran commented on a diff in the pull request:

https://github.com/apache/spark/pull/6935#discussion_r48364727
  
--- Diff: 
core/src/test/scala/org/apache/spark/deploy/history/ApplicationCacheSuite.scala 
---
@@ -0,0 +1,476 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.deploy.history
+
+import java.util.{Date, NoSuchElementException}
+
+import javax.servlet.Filter
+
+import scala.collection.mutable
+import scala.collection.mutable.ListBuffer
+import scala.language.postfixOps
+
+import com.codahale.metrics.Counter
+import com.google.common.cache.LoadingCache
+import com.google.common.util.concurrent.UncheckedExecutionException
+import org.eclipse.jetty.servlet.ServletContextHandler
+import org.mockito.Mockito._
+import org.scalatest.Matchers
+import org.scalatest.mock.MockitoSugar
+
+import org.apache.spark.status.api.v1.{ApplicationAttemptInfo => 
AttemptInfo, ApplicationInfo}
+import org.apache.spark.ui.SparkUI
+import org.apache.spark.util.{Clock, ManualClock, Utils}
+import org.apache.spark.{Logging, SparkFunSuite}
+
+class ApplicationCacheSuite extends SparkFunSuite with Logging with 
MockitoSugar with Matchers {
+
+  /**
+   * subclass with access to the cache internals
+   * @param refreshInterval interval between refreshes in milliseconds.
+   * @param retainedApplications number of retained applications
+   */
+  class TestApplicationCache(
+  operations: ApplicationCacheOperations = new StubCacheOperations(),
+  refreshInterval: Long,
+  retainedApplications: Int,
+  clock: Clock = new ManualClock(0))
+  extends ApplicationCache(operations, refreshInterval, 
retainedApplications, clock) {
+
+def cache(): LoadingCache[CacheKey, CacheEntry] = appCache
+  }
+
+  /**
+   * Stub cache operations.
+   * The state is kept in a map of [[CacheKey]] to [[CacheEntry]],
+   * the `probeTime` field in the cache entry setting the timestamp of the 
entry
+   */
+  class StubCacheOperations extends ApplicationCacheOperations with 
Logging {
+
+/** map to UI instances, including timestamps, which are used in 
update probes */
+val instances = mutable.HashMap.empty[CacheKey, CacheEntry]
+
+/** Map of attached spark UIs */
+val attached = mutable.HashMap.empty[CacheKey, SparkUI]
+
+var getAppUICount = 0L
+var attachCount = 0L
+var detachCount = 0L
+var updateProbeCount = 0L
+
+/**
+ * Get the application UI
+ * @param appId application ID
+ * @param attemptId attempt ID
+ * @return If found, the Spark UI and any history information to be 
used in the cache
+ */
+override def getAppUI(appId: String, attemptId: Option[String]): 
Option[LoadedAppUI] = {
+  logDebug(s"getAppUI($appId, $attemptId)")
+  getAppUICount += 1
+  instances.get(CacheKey(appId, attemptId)).map( e =>
+LoadedAppUI(e.ui, Some(new StubHistoryProviderUpdateState(e.probeTime))))
+}
+
+override def attachSparkUI(appId: String, attemptId: Option[String], 
ui: SparkUI,
+completed: Boolean): Unit = {
+  logDebug(s"attachSparkUI($appId, $attemptId, $ui)")
+  attachCount += 1
+  attached += (CacheKey(appId, attemptId) -> ui)
+}
+
+def putAndAttach(appId: String, attemptId: Option[String], completed: 
Boolean, started: Long,
+ended: Long, timestamp: Long): SparkUI = {
+  val ui = putAppUI(appId, attemptId, completed, started, ended, 
timestamp)
+  attachSparkUI(appId, attemptId, ui, completed)
+  ui
+}
+
+def putAppUI(appId: String, attemptId: Option[String], completed: 
Boolean, started: Long,
+ended: Long, timestamp: Long): SparkUI = {
+  val ui = newUI(appId, attemptId, completed, started, ended)
+  putInstance(appId, attemptId, ui, 

[GitHub] spark pull request: [SPARK-7889] [CORE] HistoryServer to refresh c...

2015-12-23 Thread steveloughran
Github user steveloughran commented on a diff in the pull request:

https://github.com/apache/spark/pull/6935#discussion_r48364720
  
--- Diff: 
core/src/test/scala/org/apache/spark/deploy/history/ApplicationCacheSuite.scala 
---
@@ -0,0 +1,476 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.deploy.history
+
+import java.util.{Date, NoSuchElementException}
+
+import javax.servlet.Filter
+
+import scala.collection.mutable
+import scala.collection.mutable.ListBuffer
+import scala.language.postfixOps
+
+import com.codahale.metrics.Counter
+import com.google.common.cache.LoadingCache
+import com.google.common.util.concurrent.UncheckedExecutionException
+import org.eclipse.jetty.servlet.ServletContextHandler
+import org.mockito.Mockito._
+import org.scalatest.Matchers
+import org.scalatest.mock.MockitoSugar
+
+import org.apache.spark.status.api.v1.{ApplicationAttemptInfo => 
AttemptInfo, ApplicationInfo}
+import org.apache.spark.ui.SparkUI
+import org.apache.spark.util.{Clock, ManualClock, Utils}
+import org.apache.spark.{Logging, SparkFunSuite}
+
+class ApplicationCacheSuite extends SparkFunSuite with Logging with 
MockitoSugar with Matchers {
+
+  /**
+   * subclass with access to the cache internals
+   * @param refreshInterval interval between refreshes in milliseconds.
+   * @param retainedApplications number of retained applications
+   */
+  class TestApplicationCache(
+  operations: ApplicationCacheOperations = new StubCacheOperations(),
+  refreshInterval: Long,
+  retainedApplications: Int,
+  clock: Clock = new ManualClock(0))
+  extends ApplicationCache(operations, refreshInterval, 
retainedApplications, clock) {
+
+def cache(): LoadingCache[CacheKey, CacheEntry] = appCache
+  }
+
+  /**
+   * Stub cache operations.
+   * The state is kept in a map of [[CacheKey]] to [[CacheEntry]],
+   * the `probeTime` field in the cache entry setting the timestamp of the 
entry
+   */
+  class StubCacheOperations extends ApplicationCacheOperations with 
Logging {
+
+/** map to UI instances, including timestamps, which are used in 
update probes */
+val instances = mutable.HashMap.empty[CacheKey, CacheEntry]
+
+/** Map of attached spark UIs */
+val attached = mutable.HashMap.empty[CacheKey, SparkUI]
+
+var getAppUICount = 0L
+var attachCount = 0L
+var detachCount = 0L
+var updateProbeCount = 0L
+
+/**
+ * Get the application UI
+ * @param appId application ID
+ * @param attemptId attempt ID
+ * @return If found, the Spark UI and any history information to be 
used in the cache
+ */
+override def getAppUI(appId: String, attemptId: Option[String]): 
Option[LoadedAppUI] = {
+  logDebug(s"getAppUI($appId, $attemptId)")
+  getAppUICount += 1
+  instances.get(CacheKey(appId, attemptId)).map( e =>
+LoadedAppUI(e.ui, Some(new StubHistoryProviderUpdateState(e.probeTime))))
+}
+
+override def attachSparkUI(appId: String, attemptId: Option[String], 
ui: SparkUI,
+completed: Boolean): Unit = {
+  logDebug(s"attachSparkUI($appId, $attemptId, $ui)")
+  attachCount += 1
+  attached += (CacheKey(appId, attemptId) -> ui)
+}
+
+def putAndAttach(appId: String, attemptId: Option[String], completed: 
Boolean, started: Long,
+ended: Long, timestamp: Long): SparkUI = {
+  val ui = putAppUI(appId, attemptId, completed, started, ended, 
timestamp)
+  attachSparkUI(appId, attemptId, ui, completed)
+  ui
+}
+
+def putAppUI(appId: String, attemptId: Option[String], completed: 
Boolean, started: Long,
+ended: Long, timestamp: Long): SparkUI = {
+  val ui = newUI(appId, attemptId, completed, started, ended)
+  putInstance(appId, attemptId, ui, 

[GitHub] spark pull request: [SPARK-7889] [CORE] HistoryServer to refresh c...

2015-12-23 Thread steveloughran
Github user steveloughran commented on a diff in the pull request:

https://github.com/apache/spark/pull/6935#discussion_r48364439
  
--- Diff: 
core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala ---
@@ -430,8 +517,55 @@ private[history] class FsHistoryProvider(conf: 
SparkConf, clock: Clock)
 }
 newIterator.foreach(addIfAbsent)
 oldIterator.foreach(addIfAbsent)
+mergedApps
+  }
+
+
+  /**
+   * Scan through all known incomplete applications, and check for any 
that have been updated.
+   *
+   * After the scan, if there were any updated attempts, [[applications]] 
is updated
+   * with the new values.
+   *
+   * 1. No attempt to replay the application is made; this scan is a low 
cost operation.
+   * 2. As this overwrites [[applications]] with a new value, it must not 
run concurrently
+   * with the main scan for new applications. That is: it must be in the 
[[checkForLogs()]]
+   * operation.
+   */
+  private[history] def scanAndUpdateIncompleteAttemptInfo(): Unit = {
+val newAttempts: Iterable[FsApplicationAttemptInfo] = applications
+.filter( e => !e._2.completed)
+.flatMap { e =>
+  // build list of (false, attempt) or (true, attempt') values
+  e._2.attempts.flatMap { prevInfo: FsApplicationAttemptInfo =>
+val path = new Path(logDir, prevInfo.logPath)
+try {
+  val status = fs.getFileStatus(path)
+  val size = getLogSize(status).getOrElse(-1L)
+  val aS = prevInfo.fileSize
+  if (size > aS) {
+logDebug(s"Attempt ${prevInfo.name}/${prevInfo.appId} size 
=> $size")
+Some(new FsApplicationAttemptInfo(prevInfo.logPath, 
prevInfo.name, prevInfo.appId,
+  prevInfo.attemptId, prevInfo.startTime, 
prevInfo.endTime, prevInfo.lastUpdated,
+  prevInfo.sparkUser, prevInfo.completed, size, 
attemptCounter.incrementAndGet()))
--- End diff --

done





[GitHub] spark pull request: [SPARK-7889] [CORE] HistoryServer to refresh c...

2015-12-23 Thread steveloughran
Github user steveloughran commented on a diff in the pull request:

https://github.com/apache/spark/pull/6935#discussion_r48364502
  
--- Diff: 
core/src/test/scala/org/apache/spark/deploy/history/ApplicationCacheSuite.scala 
---
@@ -0,0 +1,476 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.deploy.history
+
+import java.util.{Date, NoSuchElementException}
+
+import javax.servlet.Filter
--- End diff --

fixed





[GitHub] spark pull request: [SPARK-7889] [CORE] HistoryServer to refresh c...

2015-12-23 Thread steveloughran
Github user steveloughran commented on a diff in the pull request:

https://github.com/apache/spark/pull/6935#discussion_r48366527
  
--- Diff: 
core/src/test/scala/org/apache/spark/deploy/history/HistoryServerSuite.scala ---
@@ -21,15 +21,28 @@ import java.net.{HttpURLConnection, URL}
 import java.util.zip.ZipInputStream
 import javax.servlet.http.{HttpServletRequest, HttpServletResponse}
 
+import scala.concurrent.duration._
+import scala.language.postfixOps
+
+import com.codahale.metrics.Counter
 import com.google.common.base.Charsets
 import com.google.common.io.{ByteStreams, Files}
 import org.apache.commons.io.{FileUtils, IOUtils}
+import org.apache.hadoop.fs.{FileStatus, FileSystem, Path}
+import org.json4s.JsonAST.JArray
+import org.json4s.jackson.JsonMethods._
 import org.mockito.Mockito.when
+import org.openqa.selenium.htmlunit.HtmlUnitDriver
+import org.openqa.selenium.WebDriver
+import org.scalatest.concurrent.Eventually
+import org.scalatest.selenium.WebBrowser
 import org.scalatest.{BeforeAndAfter, Matchers}
 import org.scalatest.mock.MockitoSugar
 
-import org.apache.spark.{JsonTestUtils, SecurityManager, SparkConf, 
SparkFunSuite}
+import org.apache.spark.ui.jobs.UIData.JobUIData
 import org.apache.spark.ui.{SparkUI, UIUtils}
+import org.apache.spark._
--- End diff --

I'll try and use that plugin very carefully and see what happens...





[GitHub] spark pull request: [SPARK-7889] [CORE] HistoryServer to refresh c...

2015-12-23 Thread steveloughran
Github user steveloughran commented on a diff in the pull request:

https://github.com/apache/spark/pull/6935#discussion_r48366891
  
--- Diff: 
core/src/test/scala/org/apache/spark/deploy/history/HistoryServerSuite.scala ---
@@ -281,6 +296,191 @@ class HistoryServerSuite extends SparkFunSuite with 
BeforeAndAfter with Matchers
 all (siteRelativeLinks) should startWith (uiRoot)
   }
 
+  test("incomplete apps get refreshed") {
+
+implicit val webDriver: WebDriver = new HtmlUnitDriver
+implicit val formats = org.json4s.DefaultFormats
+
+// this test dir is explicitly deleted on successful runs; retained for 
diagnostics when
+// not
+val logDir = 
Utils.createDirectory(System.getProperty("java.io.tmpdir", "logs"))
+
+// a new conf is used with the background thread set and running at 
its fastest
+// allowed refresh rate (1Hz)
+val myConf = new SparkConf()
+  .set("spark.history.fs.logDirectory", logDir.getAbsolutePath)
+  .set("spark.eventLog.dir", logDir.getAbsolutePath)
+  .set("spark.history.fs.update.interval", "1s")
+  .set("spark.eventLog.enabled", "true")
+  .set("spark.history.cache.window", "250ms")
+  .remove("spark.testing")
+val provider = new FsHistoryProvider(myConf)
+val securityManager = new SecurityManager(myConf)
+
+val sc = new SparkContext("local", "test", myConf)
+val logDirUri = logDir.toURI
+val logDirPath = new Path(logDirUri)
+val fs = FileSystem.get(logDirUri, sc.hadoopConfiguration)
+
+def listDir(dir: Path): Seq[FileStatus] = {
+  val statuses = fs.listStatus(dir)
+  statuses.flatMap(
+stat => if (stat.isDirectory) listDir(stat.getPath) else Seq(stat))
+}
+
+def dumpLogDir(msg: String = ""): Unit = {
+  if (log.isDebugEnabled) {
+logDebug(msg)
+listDir(logDirPath).foreach { status =>
+  val s = status.toString
+  logDebug(s)
+}
+  }
+}
+
+// stop the server with the old config, and start the new one
+server.stop()
+server = new HistoryServer(myConf, provider, securityManager, 18080)
+server.initialize()
+server.bind()
+try {
+  val port = server.boundPort
+  val metrics = server.cacheMetrics
+
+  // assert that a metric has a value; if not dump the whole metrics 
instance
+  def assertMetric(name: String, counter: Counter, expected: Long): 
Unit = {
+val actual = counter.getCount
+if (actual != expected) {
+  // this is here because Scalatest loses stack depth
+  fail(s"Wrong $name value - expected $expected but got $actual" +
+  s" in metrics\n$metrics")
+}
+  }
+
+  // build a URL for an app or app/attempt plus a page underneath
+  def buildURL(appId: String, suffix: String): URL = {
+new URL(s"http://localhost:$port/history/$appId$suffix;)
+  }
+  val historyServerRoot = new URL(s"http://localhost:$port/")
+  val historyServerIncompleted = new URL(historyServerRoot, 
"?page=1&showIncomplete=true")
+
+  // assert the body of a URL contains a string; return that body
+  def assertUrlContains(url: URL, str: String): String = {
+val body = HistoryServerSuite.getUrl(url)
+assert(body.contains(str), s"did not find $str at $url : $body")
+body
+  }
+
+  // start initial job
+  val d = sc.parallelize(1 to 10)
+  d.count()
+  val stdInterval = interval(100 milliseconds)
+  val appId = eventually(timeout(20 seconds), stdInterval) {
+val json = getContentAndCode("applications", port)._2.get
+val apps = parse(json).asInstanceOf[JArray].arr
+apps should have size 1
+(apps.head \ "id").extract[String]
+  }
+
+  // which lists as incomplete
+  assertUrlContains(historyServerIncompleted, appId)
+
+  val appIdRoot = buildURL(appId, "")
+  val rootAppPage = HistoryServerSuite.getUrl(appIdRoot)
+  logDebug(s"$appIdRoot ->[${rootAppPage.length}] \n$rootAppPage")
+  // sanity check to make sure filter is chaining calls
+  rootAppPage should not be empty
+
+  def getAppUI: SparkUI = {
+provider.getAppUI(appId, None).get.ui
+  }
+
+  // selenium isn't that useful on failures...add our own reporting
+  def getNumJobs(suffix: String): Int = {
+val target = buildURL(appId, suffix)
+val targetBody = HistoryServerSuite.getUrl(target)
+try {
+  go to target.toExternalForm
+  findAll(cssSelector("tbody 

[GitHub] spark pull request: [SPARK-7889] [CORE] HistoryServer to refresh c...

2015-12-23 Thread steveloughran
Github user steveloughran commented on a diff in the pull request:

https://github.com/apache/spark/pull/6935#discussion_r48363399
  
--- Diff: 
core/src/main/scala/org/apache/spark/deploy/history/ApplicationCache.scala ---
@@ -0,0 +1,658 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.deploy.history
+
+import java.util.NoSuchElementException
+
+import javax.servlet.http.{HttpServletRequest, HttpServletResponse}
+import javax.servlet.{DispatcherType, Filter, FilterChain, FilterConfig, 
ServletException, ServletRequest, ServletResponse}
+
+import scala.collection.JavaConverters._
+
+import com.codahale.metrics.{Counter, MetricRegistry, Timer}
+import com.google.common.cache.{CacheBuilder, CacheLoader, LoadingCache, 
RemovalListener, RemovalNotification}
+import org.eclipse.jetty.servlet.FilterHolder
+
+import org.apache.spark.Logging
+import org.apache.spark.metrics.source.Source
+import org.apache.spark.ui.SparkUI
+import org.apache.spark.util.Clock
+
+/**
+ * Cache for applications.
+ *
+ * Completed applications are cached for as long as there is capacity for 
them.
+ * Incomplete applications have their update time checked on every
+ * retrieval; if the cached entry is out of date, it is refreshed.
+ *
+ * @note there must be only one instance of [[ApplicationCache]] in a
+ * JVM at a time. This is because a static field in 
[[ApplicationCacheCheckFilterRelay]]
+ * keeps a reference to the cache so that HTTP requests on the 
attempt-specific web UIs
+ * can probe the current cache to see if the attempts have changed.
+ *
+ * Creating multiple instances will break this routing.
+ * @param operations implementation of record access operations
+ * @param refreshInterval interval between refreshes in milliseconds.
+ * @param retainedApplications number of retained applications
+ * @param clock time source
+ */
+private[history] class ApplicationCache(
+val operations: ApplicationCacheOperations,
+val refreshInterval: Long,
+val retainedApplications: Int,
+val clock: Clock) extends Logging {
+
+  /**
+   * Services the load request from the cache.
+   */
+  private val appLoader = new CacheLoader[CacheKey, CacheEntry] {
+
+/** the cache key doesn't match a cached entry ... attempt to load it 
 */
+override def load(key: CacheKey): CacheEntry = {
+  loadApplicationEntry(key.appId, key.attemptId)
+}
+
+  }
+
+  /**
+   * Handler for callbacks from the cache of entry removal.
+   */
+  private val removalListener = new RemovalListener[CacheKey, CacheEntry] {
+
+/**
+ * Removal event notifies the provider to detach the UI.
+ * @param rm removal notification
+ */
+override def onRemoval(rm: RemovalNotification[CacheKey, CacheEntry]): 
Unit = {
+  metrics.evictionCount.inc()
+  val key = rm.getKey
+  logDebug(s"Evicting entry ${key}")
+  operations.detachSparkUI(key.appId, key.attemptId, rm.getValue().ui)
+}
+  }
+
+  /**
+   * The cache of applications.
+   *
+   * Tagged as `protected` so as to allow subclasses in tests to access 
it directly
+   */
+  protected val appCache: LoadingCache[CacheKey, CacheEntry] = {
+CacheBuilder.newBuilder()
+.maximumSize(retainedApplications)
+.removalListener(removalListener)
+.build(appLoader)
+  }
+
+  /**
+   * The metrics which are updated as the cache is used
+   */
+  val metrics = new CacheMetrics("history.cache")
+
+  init()
+
+  /**
+   * Perform any startup operations.
+   *
+   * This includes declaring this instance as the cache to use in the
+   * [[ApplicationCacheCheckFilterRelay]].
+   */
+  private def init(): Unit = {
+ApplicationCacheCheckFilterRelay.setApplicationCache(this)
+  }
+
+  /**

[GitHub] spark pull request: [SPARK-7889] [CORE] HistoryServer to refresh c...

2015-12-23 Thread steveloughran
Github user steveloughran commented on a diff in the pull request:

https://github.com/apache/spark/pull/6935#discussion_r48364066
  
--- Diff: 
core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala ---
@@ -430,8 +517,55 @@ private[history] class FsHistoryProvider(conf: 
SparkConf, clock: Clock)
 }
 newIterator.foreach(addIfAbsent)
 oldIterator.foreach(addIfAbsent)
+mergedApps
+  }
+
+
+  /**
+   * Scan through all known incomplete applications, and check for any 
that have been updated.
+   *
+   * After the scan, if there were any updated attempts, [[applications]] 
is updated
+   * with the new values.
+   *
+   * 1. No attempt to replay the application is made; this scan is a low 
cost operation.
+   * 2. As this overwrites [[applications]] with a new value, it must not 
run concurrently
+   * with the main scan for new applications. That is: it must be in the 
[[checkForLogs()]]
+   * operation.
+   */
+  private[history] def scanAndUpdateIncompleteAttemptInfo(): Unit = {
+val newAttempts: Iterable[FsApplicationAttemptInfo] = applications
+.filter( e => !e._2.completed)
+.flatMap { e =>
+  // build list of (false, attempt) or (true, attempt') values
--- End diff --

pasted in your text. Also noted that if a file isn't found, that isn't an 
update/state change.





[GitHub] spark pull request: [SPARK-7889] [CORE] HistoryServer to refresh c...

2015-12-23 Thread steveloughran
Github user steveloughran commented on a diff in the pull request:

https://github.com/apache/spark/pull/6935#discussion_r48366085
  
--- Diff: 
core/src/test/scala/org/apache/spark/deploy/history/ApplicationCacheSuite.scala 
---
@@ -0,0 +1,476 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.deploy.history
+
+import java.util.{Date, NoSuchElementException}
+
+import javax.servlet.Filter
+
+import scala.collection.mutable
+import scala.collection.mutable.ListBuffer
+import scala.language.postfixOps
+
+import com.codahale.metrics.Counter
+import com.google.common.cache.LoadingCache
+import com.google.common.util.concurrent.UncheckedExecutionException
+import org.eclipse.jetty.servlet.ServletContextHandler
+import org.mockito.Mockito._
+import org.scalatest.Matchers
+import org.scalatest.mock.MockitoSugar
+
+import org.apache.spark.status.api.v1.{ApplicationAttemptInfo => 
AttemptInfo, ApplicationInfo}
+import org.apache.spark.ui.SparkUI
+import org.apache.spark.util.{Clock, ManualClock, Utils}
+import org.apache.spark.{Logging, SparkFunSuite}
+
+class ApplicationCacheSuite extends SparkFunSuite with Logging with 
MockitoSugar with Matchers {
+
+  /**
+   * subclass with access to the cache internals
+   * @param refreshInterval interval between refreshes in milliseconds.
+   * @param retainedApplications number of retained applications
+   */
+  class TestApplicationCache(
+  operations: ApplicationCacheOperations = new StubCacheOperations(),
+  refreshInterval: Long,
+  retainedApplications: Int,
+  clock: Clock = new ManualClock(0))
+  extends ApplicationCache(operations, refreshInterval, 
retainedApplications, clock) {
+
+def cache(): LoadingCache[CacheKey, CacheEntry] = appCache
+  }
+
+  /**
+   * Stub cache operations.
+   * The state is kept in a map of [[CacheKey]] to [[CacheEntry]],
+   * the `probeTime` field in the cache entry setting the timestamp of the 
entry
+   */
+  class StubCacheOperations extends ApplicationCacheOperations with 
Logging {
+
+/** map to UI instances, including timestamps, which are used in 
update probes */
+val instances = mutable.HashMap.empty[CacheKey, CacheEntry]
+
+/** Map of attached spark UIs */
+val attached = mutable.HashMap.empty[CacheKey, SparkUI]
+
+var getAppUICount = 0L
+var attachCount = 0L
+var detachCount = 0L
+var updateProbeCount = 0L
+
+/**
+ * Get the application UI
+ * @param appId application ID
+ * @param attemptId attempt ID
+ * @return If found, the Spark UI and any history information to be 
used in the cache
+ */
+override def getAppUI(appId: String, attemptId: Option[String]): 
Option[LoadedAppUI] = {
+  logDebug(s"getAppUI($appId, $attemptId)")
+  getAppUICount += 1
+  instances.get(CacheKey(appId, attemptId)).map( e =>
+LoadedAppUI(e.ui, Some(new StubHistoryProviderUpdateState(e.probeTime))))
+}
+
+override def attachSparkUI(appId: String, attemptId: Option[String], 
ui: SparkUI,
+completed: Boolean): Unit = {
+  logDebug(s"attachSparkUI($appId, $attemptId, $ui)")
+  attachCount += 1
+  attached += (CacheKey(appId, attemptId) -> ui)
+}
+
+def putAndAttach(appId: String, attemptId: Option[String], completed: 
Boolean, started: Long,
+ended: Long, timestamp: Long): SparkUI = {
+  val ui = putAppUI(appId, attemptId, completed, started, ended, 
timestamp)
+  attachSparkUI(appId, attemptId, ui, completed)
+  ui
+}
+
+def putAppUI(appId: String, attemptId: Option[String], completed: 
Boolean, started: Long,
+ended: Long, timestamp: Long): SparkUI = {
+  val ui = newUI(appId, attemptId, completed, started, ended)
+  putInstance(appId, attemptId, ui, 

[GitHub] spark pull request: [SPARK-7889] [CORE] HistoryServer to refresh c...

2015-12-23 Thread steveloughran
Github user steveloughran commented on a diff in the pull request:

https://github.com/apache/spark/pull/6935#discussion_r48366097
  
--- Diff: 
core/src/test/scala/org/apache/spark/deploy/history/ApplicationCacheSuite.scala 
---
@@ -0,0 +1,476 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.deploy.history
+
+import java.util.{Date, NoSuchElementException}
+
+import javax.servlet.Filter
+
+import scala.collection.mutable
+import scala.collection.mutable.ListBuffer
+import scala.language.postfixOps
+
+import com.codahale.metrics.Counter
+import com.google.common.cache.LoadingCache
+import com.google.common.util.concurrent.UncheckedExecutionException
+import org.eclipse.jetty.servlet.ServletContextHandler
+import org.mockito.Mockito._
+import org.scalatest.Matchers
+import org.scalatest.mock.MockitoSugar
+
+import org.apache.spark.status.api.v1.{ApplicationAttemptInfo => 
AttemptInfo, ApplicationInfo}
+import org.apache.spark.ui.SparkUI
+import org.apache.spark.util.{Clock, ManualClock, Utils}
+import org.apache.spark.{Logging, SparkFunSuite}
+
+class ApplicationCacheSuite extends SparkFunSuite with Logging with 
MockitoSugar with Matchers {
+
+  /**
+   * subclass with access to the cache internals
+   * @param refreshInterval interval between refreshes in milliseconds.
+   * @param retainedApplications number of retained applications
+   */
+  class TestApplicationCache(
+  operations: ApplicationCacheOperations = new StubCacheOperations(),
+  refreshInterval: Long,
+  retainedApplications: Int,
+  clock: Clock = new ManualClock(0))
+  extends ApplicationCache(operations, refreshInterval, 
retainedApplications, clock) {
+
+def cache(): LoadingCache[CacheKey, CacheEntry] = appCache
+  }
+
+  /**
+   * Stub cache operations.
+   * The state is kept in a map of [[CacheKey]] to [[CacheEntry]],
+   * the `probeTime` field in the cache entry setting the timestamp of the 
entry
+   */
+  class StubCacheOperations extends ApplicationCacheOperations with 
Logging {
+
+/** map to UI instances, including timestamps, which are used in 
update probes */
+val instances = mutable.HashMap.empty[CacheKey, CacheEntry]
+
+/** Map of attached spark UIs */
+val attached = mutable.HashMap.empty[CacheKey, SparkUI]
+
+var getAppUICount = 0L
+var attachCount = 0L
+var detachCount = 0L
+var updateProbeCount = 0L
+
+/**
+ * Get the application UI
+ * @param appId application ID
+ * @param attemptId attempt ID
+ * @return If found, the Spark UI and any history information to be 
used in the cache
+ */
+override def getAppUI(appId: String, attemptId: Option[String]): 
Option[LoadedAppUI] = {
+  logDebug(s"getAppUI($appId, $attemptId)")
+  getAppUICount += 1
+  instances.get(CacheKey(appId, attemptId)).map( e =>
+LoadedAppUI(e.ui, Some(new StubHistoryProviderUpdateState(e.probeTime))))
+}
+
+override def attachSparkUI(appId: String, attemptId: Option[String], 
ui: SparkUI,
+completed: Boolean): Unit = {
+  logDebug(s"attachSparkUI($appId, $attemptId, $ui)")
+  attachCount += 1
+  attached += (CacheKey(appId, attemptId) -> ui)
+}
+
+def putAndAttach(appId: String, attemptId: Option[String], completed: 
Boolean, started: Long,
+ended: Long, timestamp: Long): SparkUI = {
+  val ui = putAppUI(appId, attemptId, completed, started, ended, 
timestamp)
+  attachSparkUI(appId, attemptId, ui, completed)
+  ui
+}
+
+def putAppUI(appId: String, attemptId: Option[String], completed: 
Boolean, started: Long,
+ended: Long, timestamp: Long): SparkUI = {
+  val ui = newUI(appId, attemptId, completed, started, ended)
+  putInstance(appId, attemptId, ui, 

[GitHub] spark pull request: [SPARK-7889] [CORE] HistoryServer to refresh c...

2015-12-23 Thread steveloughran
Github user steveloughran commented on a diff in the pull request:

https://github.com/apache/spark/pull/6935#discussion_r48364810
  
--- Diff: 
core/src/test/scala/org/apache/spark/deploy/history/ApplicationCacheSuite.scala 
---
@@ -0,0 +1,476 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.deploy.history
+
+import java.util.{Date, NoSuchElementException}
+
+import javax.servlet.Filter
+
+import scala.collection.mutable
+import scala.collection.mutable.ListBuffer
+import scala.language.postfixOps
+
+import com.codahale.metrics.Counter
+import com.google.common.cache.LoadingCache
+import com.google.common.util.concurrent.UncheckedExecutionException
+import org.eclipse.jetty.servlet.ServletContextHandler
+import org.mockito.Mockito._
+import org.scalatest.Matchers
+import org.scalatest.mock.MockitoSugar
+
+import org.apache.spark.status.api.v1.{ApplicationAttemptInfo => 
AttemptInfo, ApplicationInfo}
+import org.apache.spark.ui.SparkUI
+import org.apache.spark.util.{Clock, ManualClock, Utils}
+import org.apache.spark.{Logging, SparkFunSuite}
+
+class ApplicationCacheSuite extends SparkFunSuite with Logging with 
MockitoSugar with Matchers {
+
+  /**
+   * subclass with access to the cache internals
+   * @param refreshInterval interval between refreshes in milliseconds.
+   * @param retainedApplications number of retained applications
+   */
+  class TestApplicationCache(
+  operations: ApplicationCacheOperations = new StubCacheOperations(),
+  refreshInterval: Long,
+  retainedApplications: Int,
+  clock: Clock = new ManualClock(0))
+  extends ApplicationCache(operations, refreshInterval, 
retainedApplications, clock) {
+
+def cache(): LoadingCache[CacheKey, CacheEntry] = appCache
+  }
+
+  /**
+   * Stub cache operations.
+   * The state is kept in a map of [[CacheKey]] to [[CacheEntry]],
+   * with the `probeTime` field of each entry recording its timestamp.
+   */
+  class StubCacheOperations extends ApplicationCacheOperations with 
Logging {
+
+/** map to UI instances, including timestamps, which are used in 
update probes */
+val instances = mutable.HashMap.empty[CacheKey, CacheEntry]
+
+/** Map of attached spark UIs */
+val attached = mutable.HashMap.empty[CacheKey, SparkUI]
+
+var getAppUICount = 0L
+var attachCount = 0L
+var detachCount = 0L
+var updateProbeCount = 0L
+
+/**
+ * Get the application UI
+ * @param appId application ID
+ * @param attemptId attempt ID
+ * @return If found, the Spark UI and any history information to be 
used in the cache
+ */
+override def getAppUI(appId: String, attemptId: Option[String]): 
Option[LoadedAppUI] = {
+  logDebug(s"getAppUI($appId, $attemptId)")
+  getAppUICount += 1
+  instances.get(CacheKey(appId, attemptId)).map( e =>
+LoadedAppUI(e.ui, Some(new StubHistoryProviderUpdateState(e.probeTime)))
+}
+
+override def attachSparkUI(appId: String, attemptId: Option[String], 
ui: SparkUI,
+completed: Boolean): Unit = {
+  logDebug(s"attachSparkUI($appId, $attemptId, $ui)")
+  attachCount += 1
+  attached += (CacheKey(appId, attemptId) -> ui)
+}
+
+def putAndAttach(appId: String, attemptId: Option[String], completed: 
Boolean, started: Long,
+ended: Long, timestamp: Long): SparkUI = {
+  val ui = putAppUI(appId, attemptId, completed, started, ended, 
timestamp)
+  attachSparkUI(appId, attemptId, ui, completed)
+  ui
+}
+
+def putAppUI(appId: String, attemptId: Option[String], completed: 
Boolean, started: Long,
+ended: Long, timestamp: Long): SparkUI = {
+  val ui = newUI(appId, attemptId, completed, started, ended)
+  putInstance(appId, attemptId, ui, 

[GitHub] spark pull request: [SPARK-7889] [CORE] HistoryServer to refresh c...

2015-12-23 Thread steveloughran
Github user steveloughran commented on the pull request:

https://github.com/apache/spark/pull/6935#issuecomment-166966798
  
I've updated the patch with the comments, and reworked how the update 
probe works, removing the need to have provider-specific state cached, returned 
in the callback, and cast.

Now, when an app UI is loaded, a probe is returned whose {{isUpdated()}} 
predicate is evaluated; if it returns true, a cache refresh is triggered. The 
FS probe looks at a file size; the YARN one will use a version counter; 
anything else can do its own thing.

This is a lot cleaner and purer: the only more puritanical option would be 
to go from an abstract probe class to simply passing a function 
{{Unit=>Boolean}} around.
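
As a sketch of that shape (all names here are illustrative assumptions, not 
the exact API in the patch):

    // Hypothetical probe: evaluated on each retrieval of a cached, incomplete UI.
    trait HistoryUpdateProbe {
      /** @return true if the attempt has changed since the UI was loaded. */
      def isUpdated(): Boolean
    }

    // A filesystem-flavoured probe: "updated" means the event log has grown
    // past the size recorded when the UI was loaded.
    class FsUpdateProbe(
        fs: org.apache.hadoop.fs.FileSystem,
        log: org.apache.hadoop.fs.Path,
        sizeAtLoad: Long) extends HistoryUpdateProbe {
      override def isUpdated(): Boolean = fs.getFileStatus(log).getLen > sizeAtLoad
    }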





[GitHub] spark pull request: [SPARK-7889] [CORE] HistoryServer to refresh c...

2015-12-15 Thread steveloughran
Github user steveloughran commented on the pull request:

https://github.com/apache/spark/pull/6935#issuecomment-164740423
  
yeah, I found it surprisingly complex too ... if I'd known at the start I'd 
have steered clear. And don't worry about any delay; there are some other spark 
PRs of mine which are older, and I have been neglecting too many hadoop core 
patches to have any right to feel even mildly neglected.

I'll look at the issues.

I also want to extend the tests to the REST API, including the actual 
getlogs operation.

I'll have something up by the weekend.








[GitHub] spark pull request: [SPARK-7889] [CORE] HistoryServer to refresh c...

2015-12-14 Thread squito
Github user squito commented on the pull request:

https://github.com/apache/spark/pull/6935#issuecomment-164540400
  
Jenkins, retest this please





[GitHub] spark pull request: [SPARK-7889] [CORE] HistoryServer to refresh c...

2015-12-14 Thread squito
Github user squito commented on the pull request:

https://github.com/apache/spark/pull/6935#issuecomment-164540371
  
Jenkins, add to whitelist





[GitHub] spark pull request: [SPARK-7889] [CORE] HistoryServer to refresh c...

2015-12-14 Thread squito
Github user squito commented on a diff in the pull request:

https://github.com/apache/spark/pull/6935#discussion_r47552578
  
--- Diff: 
core/src/test/scala/org/apache/spark/deploy/history/ApplicationCacheSuite.scala 
---
@@ -0,0 +1,476 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.deploy.history
+
+import java.util.{Date, NoSuchElementException}
+
+import javax.servlet.Filter
--- End diff --

same group for java & javax





[GitHub] spark pull request: [SPARK-7889] [CORE] HistoryServer to refresh c...

2015-12-14 Thread squito
Github user squito commented on a diff in the pull request:

https://github.com/apache/spark/pull/6935#discussion_r47553423
  
--- Diff: 
core/src/test/scala/org/apache/spark/deploy/history/ApplicationCacheSuite.scala 
---
[duplicate quoted diff omitted -- it repeats the ApplicationCacheSuite.scala excerpt quoted in full earlier in this thread; the review comment itself was cut off in the archive]

[GitHub] spark pull request: [SPARK-7889] [CORE] HistoryServer to refresh c...

2015-12-14 Thread squito
Github user squito commented on a diff in the pull request:

https://github.com/apache/spark/pull/6935#discussion_r47549529
  
--- Diff: 
core/src/main/scala/org/apache/spark/deploy/history/ApplicationCache.scala ---
@@ -0,0 +1,658 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.deploy.history
+
+import java.util.NoSuchElementException
+
+import javax.servlet.http.{HttpServletRequest, HttpServletResponse}
--- End diff --

nit: no blank line between java & javax imports





[GitHub] spark pull request: [SPARK-7889] [CORE] HistoryServer to refresh c...

2015-12-14 Thread squito
Github user squito commented on a diff in the pull request:

https://github.com/apache/spark/pull/6935#discussion_r47550432
  
--- Diff: 
core/src/main/scala/org/apache/spark/deploy/history/ApplicationCache.scala ---
@@ -0,0 +1,658 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.deploy.history
+
+import java.util.NoSuchElementException
+
+import javax.servlet.http.{HttpServletRequest, HttpServletResponse}
+import javax.servlet.{DispatcherType, Filter, FilterChain, FilterConfig, 
ServletException, ServletRequest, ServletResponse}
+
+import scala.collection.JavaConverters._
+
+import com.codahale.metrics.{Counter, MetricRegistry, Timer}
+import com.google.common.cache.{CacheBuilder, CacheLoader, LoadingCache, 
RemovalListener, RemovalNotification}
+import org.eclipse.jetty.servlet.FilterHolder
+
+import org.apache.spark.Logging
+import org.apache.spark.metrics.source.Source
+import org.apache.spark.ui.SparkUI
+import org.apache.spark.util.Clock
+
+/**
+ * Cache for applications.
+ *
+ * Completed applications are cached for as long as there is capacity for 
them.
+ * Incompleted applications have their update time checked on every
+ * retrieval; if the cached entry is out of date, it is refreshed.
+ *
+ * @note there must be only one instance of [[ApplicationCache]] in a
+ * JVM at a time. This is because a static field in 
[[ApplicationCacheCheckFilterRelay]]
+ * keeps a reference to the cache so that HTTP requests on the 
attempt-specific web UIs
+ * can probe the current cache to see if the attempts have changed.
+ *
+ * Creating multiple instances will break this routing.
+ * @param operations implementation of record access operations
+ * @param refreshInterval interval between refreshes in milliseconds.
+ * @param retainedApplications number of retained applications
+ * @param clock time source
+ */
+private[history] class ApplicationCache(
+val operations: ApplicationCacheOperations,
+val refreshInterval: Long,
+val retainedApplications: Int,
+val clock: Clock) extends Logging {
+
+  /**
+   * Services the load request from the cache.
+   */
+  private val appLoader = new CacheLoader[CacheKey, CacheEntry] {
+
+/** the cache key doesn't match a cached entry ... attempt to load it 
 */
+override def load(key: CacheKey): CacheEntry = {
+  loadApplicationEntry(key.appId, key.attemptId)
+}
+
+  }
+
+  /**
+   * Handler for callbacks from the cache of entry removal.
+   */
+  private val removalListener = new RemovalListener[CacheKey, CacheEntry] {
+
+/**
+ * Removal event notifies the provider to detach the UI.
+ * @param rm removal notification
+ */
+override def onRemoval(rm: RemovalNotification[CacheKey, CacheEntry]): 
Unit = {
+  metrics.evictionCount.inc()
+  val key = rm.getKey
+  logDebug(s"Evicting entry ${key}")
+  operations.detachSparkUI(key.appId, key.attemptId, rm.getValue().ui)
+}
+  }
+
+  /**
+   * The cache of applications.
+   *
+   * Tagged as `protected` so as to allow subclasses in tests to access 
it directly
+   */
+  protected val appCache: LoadingCache[CacheKey, CacheEntry] = {
+CacheBuilder.newBuilder()
+.maximumSize(retainedApplications)
+.removalListener(removalListener)
+.build(appLoader)
+  }
+
+  /**
+   * The metrics which are updated as the cache is used
+   */
+  val metrics = new CacheMetrics("history.cache")
+
+  init()
+
+  /**
+   * Perform any startup operations.
+   *
+   * This includes declaring this instance as the cache to use in the
+   * [[ApplicationCacheCheckFilterRelay]].
+   */
+  private def init(): Unit = {
+ApplicationCacheCheckFilterRelay.setApplicationCache(this)
+  }
+
+  /**
+   * 

[GitHub] spark pull request: [SPARK-7889] [CORE] HistoryServer to refresh c...

2015-12-14 Thread squito
Github user squito commented on a diff in the pull request:

https://github.com/apache/spark/pull/6935#discussion_r47551365
  
--- Diff: 
core/src/main/scala/org/apache/spark/deploy/history/ApplicationHistoryProvider.scala
 ---
@@ -33,7 +33,35 @@ private[spark] case class ApplicationAttemptInfo(
 private[spark] case class ApplicationHistoryInfo(
 id: String,
 name: String,
-attempts: List[ApplicationAttemptInfo])
+attempts: List[ApplicationAttemptInfo]) {
+
+  /**
+   * Has this application completed?
+   * @return true if the most recent attempt has completed
+   */
+  def completed: Boolean = {
+attempts.nonEmpty && attempts.head.completed
+  }
+}
+
+/**
+ * Case class which can be subclassed by any provider to store information 
which
+ * can then be used to determine whether an application attempt has been 
updated
+ * since the last time it was retrieved.
+ *
+ * Examples include: a timestamp, a counter or a checksum.
+ */
+private[history] case class HistoryProviderUpdateState()
--- End diff --

you probably don't want this to be a case class; case classes aren't meant 
to be extended.  The specific sub-classes could be case classes if you want.  
But I guess this should just be a trait.
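
As a sketch of that shape, with hypothetical provider-specific subclasses 
(FsUpdateState and YarnUpdateState are made-up examples, not classes from 
this patch):

    // A bare trait as the extension point; the concrete subclasses can
    // still be case classes.
    trait HistoryProviderUpdateState
    case class FsUpdateState(fileSize: Long) extends HistoryProviderUpdateState
    case class YarnUpdateState(version: Long) extends HistoryProviderUpdateState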





[GitHub] spark pull request: [SPARK-7889] [CORE] HistoryServer to refresh c...

2015-12-14 Thread squito
Github user squito commented on a diff in the pull request:

https://github.com/apache/spark/pull/6935#discussion_r47551552
  
--- Diff: 
core/src/main/scala/org/apache/spark/deploy/history/ApplicationHistoryProvider.scala
 ---
@@ -73,4 +102,18 @@ private[history] abstract class 
ApplicationHistoryProvider {
   @throws(classOf[SparkException])
   def writeEventLogs(appId: String, attemptId: Option[String], zipStream: 
ZipOutputStream): Unit
 
+  /**
+   * Probe for an update to an (incompleted) application
+   * @param appId application ID
+   * @param attemptId optional attempt ID
+   * @param updateState state information needed by the provider to 
determine age
+   * @return true if the application has been updated
+   */
+  def isUpdated(
+  appId: String,
+  attemptId: Option[String],
+  updateState: Option[HistoryProviderUpdateState]): Boolean = {
+false
--- End diff --

why provide a default implementation here?  This is unlikely to be the 
right implementation; it just means somebody implementing this interface is 
more likely to overlook supplying the right definition.
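
That is, leave it abstract, roughly (a sketch only; the signature just 
follows the diff above):

    trait UpdateProbing {
      // No default body: each provider must decide how staleness is detected.
      def isUpdated(
          appId: String,
          attemptId: Option[String],
          updateState: Option[HistoryProviderUpdateState]): Boolean
    }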





[GitHub] spark pull request: [SPARK-7889] [CORE] HistoryServer to refresh c...

2015-12-14 Thread squito
Github user squito commented on a diff in the pull request:

https://github.com/apache/spark/pull/6935#discussion_r47551814
  
--- Diff: 
core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala ---
@@ -385,24 +448,48 @@ private[history] class FsHistoryProvider(conf: 
SparkConf, clock: Clock)
   }
 }
 
-if (newAttempts.isEmpty) {
-  return
+updateApplicationsWithNewAttempts(newAttempts)
+  }
+
+  /**
+   * Merge in all new attempts with those in [[applications]], updating 
the [[applications]]
+   * field afterwards. It _must not_ be executed concurrently, else 
attempt information may
+   * be lost.
+   * @param newAttempts a possibly empty list of new attempts
+   */
+  private def updateApplicationsWithNewAttempts(newAttempts: 
Iterable[FsApplicationAttemptInfo])
+: Unit = {
--- End diff --

nit: move the arg to the second line, indent 4 spaces
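
Roughly this layout, with the same body as in the diff above:

    private def updateApplicationsWithNewAttempts(
        newAttempts: Iterable[FsApplicationAttemptInfo]): Unit = {
      if (newAttempts.nonEmpty) {
        applications = mergeAttempts(newAttempts, applications)
      }
    }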





[GitHub] spark pull request: [SPARK-7889] [CORE] HistoryServer to refresh c...

2015-12-14 Thread squito
Github user squito commented on a diff in the pull request:

https://github.com/apache/spark/pull/6935#discussion_r47551842
  
--- Diff: 
core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala ---
@@ -385,24 +448,48 @@ private[history] class FsHistoryProvider(conf: 
SparkConf, clock: Clock)
   }
 }
 
-if (newAttempts.isEmpty) {
-  return
+updateApplicationsWithNewAttempts(newAttempts)
+  }
+
+  /**
+   * Merge in all new attempts with those in [[applications]], updating 
the [[applications]]
+   * field afterwards. It _must not_ be executed concurrently, else 
attempt information may
+   * be lost.
+   * @param newAttempts a possibly empty list of new attempts
+   */
+  private def updateApplicationsWithNewAttempts(newAttempts: 
Iterable[FsApplicationAttemptInfo])
+: Unit = {
+if (newAttempts.nonEmpty) {
+  applications = mergeAttempts(newAttempts, applications)
 }
+  }
+
+  /**
+   * Build a map containing all apps that contain new attempts. The app 
information in this map
+   * contains both the new app attempt, and those that were already loaded 
in the existing apps
+   * map. If an attempt has been updated, it replaces the old attempt in 
the list.
+   * The ordering is maintained
+   * @param newAttempts new attempt list
+   * @param current the current attempt list
+   * @return the updated list
+   */
+  private def mergeAttempts(newAttempts: 
Iterable[FsApplicationAttemptInfo],
--- End diff --

nit: first arg on its own line





[GitHub] spark pull request: [SPARK-7889] [CORE] HistoryServer to refresh c...

2015-12-14 Thread squito
Github user squito commented on a diff in the pull request:

https://github.com/apache/spark/pull/6935#discussion_r47552043
  
--- Diff: 
core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala ---
@@ -430,8 +517,55 @@ private[history] class FsHistoryProvider(conf: 
SparkConf, clock: Clock)
 }
 newIterator.foreach(addIfAbsent)
 oldIterator.foreach(addIfAbsent)
+mergedApps
+  }
+
+
+  /**
+   * Scan through all known incomplete applications, and check for any 
that have been updated.
+   *
+   * After the scan, if there were any updated attempts, [[applications]] 
is updated
+   * with the new values.
+   *
+   * 1. No attempt to replay the application is made; this scan is a low 
cost operation.
+   * 2. As this overwrites [[applications]] with a new value, it must not 
run concurrently
+   * with the main scan for new applications. That is: it must be in the 
[[checkForLogs()]]
+   * operation.
+   */
+  private[history] def scanAndUpdateIncompleteAttemptInfo(): Unit = {
+val newAttempts: Iterable[FsApplicationAttemptInfo] = applications
+.filter( e => !e._2.completed)
+.flatMap { e =>
+  // build list of (false, attempt) or (true, attempt') values
--- End diff --

comment is still outdated, right?  should be something like "build list of 
incomplete apps that have been updated"?





[GitHub] spark pull request: [SPARK-7889] [CORE] HistoryServer to refresh c...

2015-12-14 Thread squito
Github user squito commented on a diff in the pull request:

https://github.com/apache/spark/pull/6935#discussion_r47553168
  
--- Diff: 
core/src/test/scala/org/apache/spark/deploy/history/ApplicationCacheSuite.scala 
---
[duplicate quoted diff omitted -- it repeats the ApplicationCacheSuite.scala excerpt quoted in full earlier in this thread; the review comment itself was cut off in the archive]

[GitHub] spark pull request: [SPARK-7889] [CORE] HistoryServer to refresh c...

2015-12-14 Thread squito
Github user squito commented on a diff in the pull request:

https://github.com/apache/spark/pull/6935#discussion_r47553490
  
--- Diff: 
core/src/test/scala/org/apache/spark/deploy/history/ApplicationCacheSuite.scala 
---
[duplicate quoted diff omitted -- it repeats the ApplicationCacheSuite.scala excerpt quoted in full earlier in this thread; the review comment itself was cut off in the archive]

[GitHub] spark pull request: [SPARK-7889] [CORE] HistoryServer to refresh c...

2015-12-14 Thread squito
Github user squito commented on a diff in the pull request:

https://github.com/apache/spark/pull/6935#discussion_r47553757
  
--- Diff: 
core/src/test/scala/org/apache/spark/deploy/history/ApplicationCacheSuite.scala 
---
[duplicate quoted diff omitted -- it repeats the ApplicationCacheSuite.scala excerpt quoted in full earlier in this thread; the review comment itself was cut off in the archive]

[GitHub] spark pull request: [SPARK-7889] [CORE] HistoryServer to refresh c...

2015-12-14 Thread squito
Github user squito commented on a diff in the pull request:

https://github.com/apache/spark/pull/6935#discussion_r47554225
  
--- Diff: 
core/src/test/scala/org/apache/spark/deploy/history/HistoryServerSuite.scala ---
@@ -21,15 +21,28 @@ import java.net.{HttpURLConnection, URL}
 import java.util.zip.ZipInputStream
 import javax.servlet.http.{HttpServletRequest, HttpServletResponse}
 
+import scala.concurrent.duration._
+import scala.language.postfixOps
+
+import com.codahale.metrics.Counter
 import com.google.common.base.Charsets
 import com.google.common.io.{ByteStreams, Files}
 import org.apache.commons.io.{FileUtils, IOUtils}
+import org.apache.hadoop.fs.{FileStatus, FileSystem, Path}
+import org.json4s.JsonAST.JArray
+import org.json4s.jackson.JsonMethods._
 import org.mockito.Mockito.when
+import org.openqa.selenium.htmlunit.HtmlUnitDriver
+import org.openqa.selenium.WebDriver
+import org.scalatest.concurrent.Eventually
+import org.scalatest.selenium.WebBrowser
 import org.scalatest.{BeforeAndAfter, Matchers}
 import org.scalatest.mock.MockitoSugar
 
-import org.apache.spark.{JsonTestUtils, SecurityManager, SparkConf, 
SparkFunSuite}
+import org.apache.spark.ui.jobs.UIData.JobUIData
 import org.apache.spark.ui.{SparkUI, UIUtils}
+import org.apache.spark._
--- End diff --

super nit: class imports sort before subclass imports; it's not just 
alphabetical -- so even with "_" the line still sorts first in the 
spark group.  I really recommend using the IntelliJ import plugin (not the 
builtin behavior), as explained here: 
https://cwiki.apache.org/confluence/display/SPARK/Spark+Code+Style+Guide#SparkCodeStyleGuide-Imports
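
Concretely, for the spark-group imports quoted above, the style-guide 
ordering would be:

    import org.apache.spark._
    import org.apache.spark.ui.{SparkUI, UIUtils}
    import org.apache.spark.ui.jobs.UIData.JobUIData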





[GitHub] spark pull request: [SPARK-7889] [CORE] HistoryServer to refresh c...

2015-12-14 Thread squito
Github user squito commented on a diff in the pull request:

https://github.com/apache/spark/pull/6935#discussion_r47555787
  
--- Diff: docs/monitoring.md ---
@@ -69,36 +83,53 @@ follows:
   
 
 
+### Spark configuration options
+
 
   Property NameDefaultMeaning
   
 spark.history.provider
-org.apache.spark.deploy.history.FsHistoryProvider
+org.apache.spark.deploy.history.FsHistoryProvider
 Name of the class implementing the application history backend. 
Currently there is only
 one implementation, provided by Spark, which looks for application 
logs stored in the
 file system.
   
   
+spark.history.retainedApplications
+50
--- End diff --

any particular reason to reorder this option?  I'm biased toward keeping 
the old ordering, just for git history





[GitHub] spark pull request: [SPARK-7889] [CORE] HistoryServer to refresh c...

2015-12-14 Thread squito
Github user squito commented on a diff in the pull request:

https://github.com/apache/spark/pull/6935#discussion_r47556139
  
--- Diff: docs/monitoring.md ---
@@ -69,36 +83,53 @@ follows:
   
 
 
+### Spark configuration options
+
 
   Property NameDefaultMeaning
   
 spark.history.provider
-org.apache.spark.deploy.history.FsHistoryProvider
+org.apache.spark.deploy.history.FsHistoryProvider
 Name of the class implementing the application history backend. 
Currently there is only
 one implementation, provided by Spark, which looks for application 
logs stored in the
 file system.
   
   
+spark.history.retainedApplications
+50
+
+  The number of application UIs to retain. If this cap is exceeded, 
then the oldest
+  applications will be removed.
+
+  
+  
 spark.history.fs.logDirectory
 file:/tmp/spark-events
 
- Directory that contains application event logs to be loaded by the 
history server
+For the filesystem history provider, the URL to the directory 
containing application event
+logs to load. This can be a local file:// path,
+an HDFS path hdfs://namenode/shared/spark-logs
+or that of an alternative filesystem supported by the Hadoop APIs.
 
   
   
 spark.history.fs.update.interval
 10s
 
-  The period at which information displayed by this history server is 
updated.
-  Each update checks for any changes made to the event logs in 
persisted storage.
+  The period at which the filesystem history provider checks for 
new or
+  updated logs in the log directory. A shorter interval detects new 
applications faster,
+  at the expense of more server load re-reading updated applications.
+  As soon as an update has completed, listings of the completed and 
incomplete applications
+  will reflect the changes. For performance reasons, the UIs of web 
applications are
+  only updated at a slower interval, the one defined in 
spark.history.cache.window 
--- End diff --

Reading this, I'm wondering if it makes sense for 
"spark.history.cache.window" to default to 
"spark.history.fs.update.interval".  This might be more intuitive 
out-of-the-box behavior for the user.  And the knob is still there for bigger 
clusters, where someone will need to look through these options more carefully 
in any case.

I'm not entirely convinced myself -- what do you think?
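
One way the defaulting could look, given a SparkConf named conf (a sketch 
only, not code from this patch):

    // Fall back to the update interval when no explicit window is set.
    val updateIntervalS = conf.getTimeAsSeconds("spark.history.fs.update.interval", "10s")
    val cacheWindowS =
      conf.getTimeAsSeconds("spark.history.cache.window", s"${updateIntervalS}s")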





[GitHub] spark pull request: [SPARK-7889] [CORE] HistoryServer to refresh c...

2015-12-14 Thread squito
Github user squito commented on a diff in the pull request:

https://github.com/apache/spark/pull/6935#discussion_r47554002
  
--- Diff: 
core/src/test/scala/org/apache/spark/deploy/history/ApplicationCacheSuite.scala 
---
[duplicate quoted diff omitted -- it repeats the ApplicationCacheSuite.scala excerpt quoted in full earlier in this thread; the review comment itself was cut off in the archive]

[GitHub] spark pull request: [SPARK-7889] [CORE] HistoryServer to refresh c...

2015-12-14 Thread squito
Github user squito commented on a diff in the pull request:

https://github.com/apache/spark/pull/6935#discussion_r47554403
  
--- Diff: 
core/src/test/scala/org/apache/spark/deploy/history/HistoryServerSuite.scala ---
@@ -55,7 +68,7 @@ class HistoryServerSuite extends SparkFunSuite with 
BeforeAndAfter with Matchers
   def init(): Unit = {
 val conf = new SparkConf()
   .set("spark.history.fs.logDirectory", logDir.getAbsolutePath)
-  .set("spark.history.fs.updateInterval", "0")
+  .set("spark.history.fs.update.interval", "0")
--- End diff --

good catch





[GitHub] spark pull request: [SPARK-7889] [CORE] HistoryServer to refresh c...

2015-12-14 Thread squito
Github user squito commented on a diff in the pull request:

https://github.com/apache/spark/pull/6935#discussion_r47554883
  
--- Diff: 
core/src/test/scala/org/apache/spark/deploy/history/HistoryServerSuite.scala ---
@@ -281,6 +296,191 @@ class HistoryServerSuite extends SparkFunSuite with 
BeforeAndAfter with Matchers
 all (siteRelativeLinks) should startWith (uiRoot)
   }
 
+  test("incomplete apps get refreshed") {
+
+implicit val webDriver: WebDriver = new HtmlUnitDriver
+implicit val formats = org.json4s.DefaultFormats
+
+// this test dir is explicitly deleted on successful runs; retained for 
diagnostics when
+// not
+val logDir = 
Utils.createDirectory(System.getProperty("java.io.tmpdir", "logs"))
+
+// a new conf is used with the background thread set and running at 
its fastest
+// allowed refresh rate (1Hz)
+val myConf = new SparkConf()
+  .set("spark.history.fs.logDirectory", logDir.getAbsolutePath)
+  .set("spark.eventLog.dir", logDir.getAbsolutePath)
+  .set("spark.history.fs.update.interval", "1s")
+  .set("spark.eventLog.enabled", "true")
+  .set("spark.history.cache.window", "250ms")
+  .remove("spark.testing")
+val provider = new FsHistoryProvider(myConf)
+val securityManager = new SecurityManager(myConf)
+
+val sc = new SparkContext("local", "test", myConf)
+val logDirUri = logDir.toURI
+val logDirPath = new Path(logDirUri)
+val fs = FileSystem.get(logDirUri, sc.hadoopConfiguration)
+
+def listDir(dir: Path): Seq[FileStatus] = {
+  val statuses = fs.listStatus(dir)
+  statuses.flatMap(
+stat => if (stat.isDirectory) listDir(stat.getPath) else Seq(stat))
+}
+
+def dumpLogDir(msg: String = ""): Unit = {
+  if (log.isDebugEnabled) {
+logDebug(msg)
+listDir(logDirPath).foreach { status =>
+  val s = status.toString
+  logDebug(s)
+}
+  }
+}
+
+// stop the server with the old config, and start the new one
+server.stop()
+server = new HistoryServer(myConf, provider, securityManager, 18080)
+server.initialize()
+server.bind()
+try {
+  val port = server.boundPort
+  val metrics = server.cacheMetrics
+
+  // assert that a metric has a value; if not dump the whole metrics 
instance
+  def assertMetric(name: String, counter: Counter, expected: Long): 
Unit = {
+val actual = counter.getCount
+if (actual != expected) {
+  // this is here because Scalatest loses stack depth
+  fail(s"Wrong $name value - expected $expected but got $actual" +
+  s" in metrics\n$metrics")
+}
+  }
+
+  // build a URL for an app or app/attempt plus a page underneath
+  def buildURL(appId: String, suffix: String): URL = {
+new URL(s"http://localhost:$port/history/$appId$suffix;)
+  }
+  val historyServerRoot = new URL(s"http://localhost:$port/")
+  val historyServerIncompleted = new URL(historyServerRoot, 
"?page=1=true")
+
+  // assert the body of a URL contains a string; return that body
+  def assertUrlContains(url: URL, str: String): String = {
+val body = HistoryServerSuite.getUrl(url)
+assert(body.contains(str), s"did not find $str at $url : $body")
+body
+  }
+
+  // start initial job
+  val d = sc.parallelize(1 to 10)
+  d.count()
+  val stdInterval = interval(100 milliseconds)
+  val appId = eventually(timeout(20 seconds), stdInterval) {
+val json = getContentAndCode("applications", port)._2.get
+val apps = parse(json).asInstanceOf[JArray].arr
+apps should have size 1
+(apps.head \ "id").extract[String]
+  }
+
+  // which lists as incomplete
+  assertUrlContains(historyServerIncompleted, appId)
+
+  val appIdRoot = buildURL(appId, "")
+  val rootAppPage = HistoryServerSuite.getUrl(appIdRoot)
+  logDebug(s"$appIdRoot ->[${rootAppPage.length}] \n$rootAppPage")
+  // sanity check to make sure filter is chaining calls
+  rootAppPage should not be empty
+
+  def getAppUI: SparkUI = {
+provider.getAppUI(appId, None).get.ui
+  }
+
+  // selenium isn't that useful on failures...add our own reporting
+  def getNumJobs(suffix: String): Int = {
+val target = buildURL(appId, suffix)
+val targetBody = HistoryServerSuite.getUrl(target)
+try {
+  go to target.toExternalForm
+  findAll(cssSelector("tbody 

[GitHub] spark pull request: [SPARK-7889] [CORE] HistoryServer to refresh c...

2015-12-14 Thread squito
Github user squito commented on a diff in the pull request:

https://github.com/apache/spark/pull/6935#discussion_r47554923
  
--- Diff: 
core/src/test/scala/org/apache/spark/deploy/history/HistoryServerSuite.scala ---
@@ -281,6 +296,191 @@ class HistoryServerSuite extends SparkFunSuite with 
BeforeAndAfter with Matchers
 all (siteRelativeLinks) should startWith (uiRoot)
   }
 
+  test("incomplete apps get refreshed") {
+
+implicit val webDriver: WebDriver = new HtmlUnitDriver
+implicit val formats = org.json4s.DefaultFormats
+
+// this test dir is explicitly deleted on successful runs; retained for 
diagnostics when
+// not
+val logDir = 
Utils.createDirectory(System.getProperty("java.io.tmpdir", "logs"))
+
+// a new conf is used with the background thread set and running at 
its fastest
+// allowed refresh rate (1Hz)
+val myConf = new SparkConf()
+  .set("spark.history.fs.logDirectory", logDir.getAbsolutePath)
+  .set("spark.eventLog.dir", logDir.getAbsolutePath)
+  .set("spark.history.fs.update.interval", "1s")
+  .set("spark.eventLog.enabled", "true")
+  .set("spark.history.cache.window", "250ms")
+  .remove("spark.testing")
+val provider = new FsHistoryProvider(myConf)
+val securityManager = new SecurityManager(myConf)
+
+val sc = new SparkContext("local", "test", myConf)
+val logDirUri = logDir.toURI
+val logDirPath = new Path(logDirUri)
+val fs = FileSystem.get(logDirUri, sc.hadoopConfiguration)
+
+def listDir(dir: Path): Seq[FileStatus] = {
+  val statuses = fs.listStatus(dir)
+  statuses.flatMap(
+stat => if (stat.isDirectory) listDir(stat.getPath) else Seq(stat))
+}
+
+def dumpLogDir(msg: String = ""): Unit = {
+  if (log.isDebugEnabled) {
+logDebug(msg)
+listDir(logDirPath).foreach { status =>
+  val s = status.toString
+  logDebug(s)
+}
+  }
+}
+
+// stop the server with the old config, and start the new one
+server.stop()
+server = new HistoryServer(myConf, provider, securityManager, 18080)
+server.initialize()
+server.bind()
+try {
+  val port = server.boundPort
+  val metrics = server.cacheMetrics
+
+  // assert that a metric has a value; if not dump the whole metrics 
instance
+  def assertMetric(name: String, counter: Counter, expected: Long): 
Unit = {
+val actual = counter.getCount
+if (actual != expected) {
+  // this is here because Scalatest loses stack depth
+  fail(s"Wrong $name value - expected $expected but got $actual" +
+  s" in metrics\n$metrics")
+}
+  }
+
+  // build a URL for an app or app/attempt plus a page underneath
+  def buildURL(appId: String, suffix: String): URL = {
+new URL(s"http://localhost:$port/history/$appId$suffix;)
+  }
+  val historyServerRoot = new URL(s"http://localhost:$port/")
+  val historyServerIncompleted = new URL(historyServerRoot, 
"?page=1=true")
+
+  // assert the body of a URL contains a string; return that body
+  def assertUrlContains(url: URL, str: String): String = {
+val body = HistoryServerSuite.getUrl(url)
+assert(body.contains(str), s"did not find $str at $url : $body")
+body
+  }
+
+  // start initial job
+  val d = sc.parallelize(1 to 10)
+  d.count()
+  val stdInterval = interval(100 milliseconds)
+  val appId = eventually(timeout(20 seconds), stdInterval) {
+val json = getContentAndCode("applications", port)._2.get
+val apps = parse(json).asInstanceOf[JArray].arr
+apps should have size 1
+(apps.head \ "id").extract[String]
+  }
+
+  // which lists as incomplete
+  assertUrlContains(historyServerIncompleted, appId)
+
+  val appIdRoot = buildURL(appId, "")
+  val rootAppPage = HistoryServerSuite.getUrl(appIdRoot)
+  logDebug(s"$appIdRoot ->[${rootAppPage.length}] \n$rootAppPage")
+  // sanity check to make sure filter is chaining calls
+  rootAppPage should not be empty
+
+  def getAppUI: SparkUI = {
+provider.getAppUI(appId, None).get.ui
+  }
+
+  // selenium isn't that useful on failures...add our own reporting
+  def getNumJobs(suffix: String): Int = {
+val target = buildURL(appId, suffix)
+val targetBody = HistoryServerSuite.getUrl(target)
+try {
+  go to target.toExternalForm
+  findAll(cssSelector("tbody 

[GitHub] spark pull request: [SPARK-7889] [CORE] HistoryServer to refresh c...

2015-12-14 Thread squito
Github user squito commented on a diff in the pull request:

https://github.com/apache/spark/pull/6935#discussion_r47555335
  
--- Diff: 
core/src/test/scala/org/apache/spark/deploy/history/HistoryServerSuite.scala ---
@@ -281,6 +296,191 @@ class HistoryServerSuite extends SparkFunSuite with 
BeforeAndAfter with Matchers
 all (siteRelativeLinks) should startWith (uiRoot)
   }
 
+  test("incomplete apps get refreshed") {
+
+implicit val webDriver: WebDriver = new HtmlUnitDriver
+implicit val formats = org.json4s.DefaultFormats
+
+// this test dir is explicitly deleted on successful runs; retained for 
diagnostics when
+// not
+val logDir = 
Utils.createDirectory(System.getProperty("java.io.tmpdir", "logs"))
+
+// a new conf is used with the background thread set and running at 
its fastest
+// allowed refresh rate (1Hz)
+val myConf = new SparkConf()
+  .set("spark.history.fs.logDirectory", logDir.getAbsolutePath)
+  .set("spark.eventLog.dir", logDir.getAbsolutePath)
+  .set("spark.history.fs.update.interval", "1s")
+  .set("spark.eventLog.enabled", "true")
+  .set("spark.history.cache.window", "250ms")
+  .remove("spark.testing")
+val provider = new FsHistoryProvider(myConf)
+val securityManager = new SecurityManager(myConf)
+
+val sc = new SparkContext("local", "test", myConf)
+val logDirUri = logDir.toURI
+val logDirPath = new Path(logDirUri)
+val fs = FileSystem.get(logDirUri, sc.hadoopConfiguration)
+
+def listDir(dir: Path): Seq[FileStatus] = {
+  val statuses = fs.listStatus(dir)
+  statuses.flatMap(
+stat => if (stat.isDirectory) listDir(stat.getPath) else Seq(stat))
+}
+
+def dumpLogDir(msg: String = ""): Unit = {
+  if (log.isDebugEnabled) {
+logDebug(msg)
+listDir(logDirPath).foreach { status =>
+  val s = status.toString
+  logDebug(s)
+}
+  }
+}
+
+// stop the server with the old config, and start the new one
+server.stop()
+server = new HistoryServer(myConf, provider, securityManager, 18080)
+server.initialize()
+server.bind()
+try {
--- End diff --

there is room here for something to go wrong before you enter the try, and 
so you'd never clean up the spark context.  That could screw up other tests. I 
think the easiest way to handle this is to just use `LocalSparkContext`, which 
will automatically clean up its `sc` after each test.
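For reference, a minimal sketch of that pattern (assuming the suite can mix in
Spark's `LocalSparkContext` test trait, which holds a `var sc` and stops it
after every test; everything else here is illustrative, not the PR's code):

```
import org.apache.spark.{LocalSparkContext, SparkConf, SparkContext, SparkFunSuite}

class RefreshSuite extends SparkFunSuite with LocalSparkContext {
  test("incomplete apps get refreshed") {
    // assigning to the trait's `var sc` means the context is stopped after
    // the test even if later setup throws before any try/finally is entered
    sc = new SparkContext("local", "test", new SparkConf())
    assert(sc.parallelize(1 to 10).count() === 10)
  }
}
```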





[GitHub] spark pull request: [SPARK-7889] [CORE] HistoryServer to refresh c...

2015-12-14 Thread squito
Github user squito commented on a diff in the pull request:

https://github.com/apache/spark/pull/6935#discussion_r47572920
  
--- Diff: 
core/src/main/scala/org/apache/spark/deploy/history/ApplicationCache.scala ---
@@ -0,0 +1,658 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.deploy.history
+
+import java.util.NoSuchElementException
+
+import javax.servlet.http.{HttpServletRequest, HttpServletResponse}
+import javax.servlet.{DispatcherType, Filter, FilterChain, FilterConfig, 
ServletException, ServletRequest, ServletResponse}
+
+import scala.collection.JavaConverters._
+
+import com.codahale.metrics.{Counter, MetricRegistry, Timer}
+import com.google.common.cache.{CacheBuilder, CacheLoader, LoadingCache, 
RemovalListener, RemovalNotification}
+import org.eclipse.jetty.servlet.FilterHolder
+
+import org.apache.spark.Logging
+import org.apache.spark.metrics.source.Source
+import org.apache.spark.ui.SparkUI
+import org.apache.spark.util.Clock
+
+/**
+ * Cache for applications.
+ *
+ * Completed applications are cached for as long as there is capacity for 
them.
+ * Incompleted applications have their update time checked on every
+ * retrieval; if the cached entry is out of date, it is refreshed.
+ *
+ * @note there must be only one instance of [[ApplicationCache]] in a
+ * JVM at a time. This is because a static field in 
[[ApplicationCacheCheckFilterRelay]]
+ * keeps a reference to the cache so that HTTP requests on the 
attempt-specific web UIs
+ * can probe the current cache to see if the attempts have changed.
+ *
+ * Creating multiple instances will break this routing.
+ * @param operations implementation of record access operations
+ * @param refreshInterval interval between refreshes in milliseconds.
+ * @param retainedApplications number of retained applications
+ * @param clock time source
+ */
+private[history] class ApplicationCache(
+val operations: ApplicationCacheOperations,
+val refreshInterval: Long,
+val retainedApplications: Int,
+val clock: Clock) extends Logging {
+
+  /**
+   * Services the load request from the cache.
+   */
+  private val appLoader = new CacheLoader[CacheKey, CacheEntry] {
+
+/** the cache key doesn't match a cached entry ... attempt to load it 
 */
+override def load(key: CacheKey): CacheEntry = {
+  loadApplicationEntry(key.appId, key.attemptId)
+}
+
+  }
+
+  /**
+   * Handler for callbacks from the cache of entry removal.
+   */
+  private val removalListener = new RemovalListener[CacheKey, CacheEntry] {
+
+/**
+ * Removal event notifies the provider to detach the UI.
+ * @param rm removal notification
+ */
+override def onRemoval(rm: RemovalNotification[CacheKey, CacheEntry]): 
Unit = {
+  metrics.evictionCount.inc()
+  val key = rm.getKey
+  logDebug(s"Evicting entry ${key}")
+  operations.detachSparkUI(key.appId, key.attemptId, rm.getValue().ui)
+}
+  }
+
+  /**
+   * The cache of applications.
+   *
+   * Tagged as `protected` so as to allow subclasses in tests to access 
it directly
+   */
+  protected val appCache: LoadingCache[CacheKey, CacheEntry] = {
+CacheBuilder.newBuilder()
+.maximumSize(retainedApplications)
+.removalListener(removalListener)
+.build(appLoader)
+  }
+
+  /**
+   * The metrics which are updated as the cache is used
+   */
+  val metrics = new CacheMetrics("history.cache")
+
+  init()
+
+  /**
+   * Perform any startup operations.
+   *
+   * This includes declaring this instance as the cache to use in the
+   * [[ApplicationCacheCheckFilterRelay]].
+   */
+  private def init(): Unit = {
+ApplicationCacheCheckFilterRelay.setApplicationCache(this)
+  }
+
+  /**
+   * 

[GitHub] spark pull request: [SPARK-7889] [CORE] HistoryServer to refresh c...

2015-12-14 Thread squito
Github user squito commented on a diff in the pull request:

https://github.com/apache/spark/pull/6935#discussion_r47573757
  
--- Diff: 
core/src/main/scala/org/apache/spark/deploy/history/ApplicationCache.scala ---
@@ -0,0 +1,658 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.deploy.history
+
+import java.util.NoSuchElementException
+
+import javax.servlet.http.{HttpServletRequest, HttpServletResponse}
+import javax.servlet.{DispatcherType, Filter, FilterChain, FilterConfig, 
ServletException, ServletRequest, ServletResponse}
+
+import scala.collection.JavaConverters._
+
+import com.codahale.metrics.{Counter, MetricRegistry, Timer}
+import com.google.common.cache.{CacheBuilder, CacheLoader, LoadingCache, 
RemovalListener, RemovalNotification}
+import org.eclipse.jetty.servlet.FilterHolder
+
+import org.apache.spark.Logging
+import org.apache.spark.metrics.source.Source
+import org.apache.spark.ui.SparkUI
+import org.apache.spark.util.Clock
+
+/**
+ * Cache for applications.
+ *
+ * Completed applications are cached for as long as there is capacity for 
them.
+ * Incompleted applications have their update time checked on every
+ * retrieval; if the cached entry is out of date, it is refreshed.
+ *
+ * @note there must be only one instance of [[ApplicationCache]] in a
+ * JVM at a time. This is because a static field in 
[[ApplicationCacheCheckFilterRelay]]
+ * keeps a reference to the cache so that HTTP requests on the 
attempt-specific web UIs
+ * can probe the current cache to see if the attempts have changed.
+ *
+ * Creating multiple instances will break this routing.
+ * @param operations implementation of record access operations
+ * @param refreshInterval interval between refreshes in milliseconds.
+ * @param retainedApplications number of retained applications
+ * @param clock time source
+ */
+private[history] class ApplicationCache(
+val operations: ApplicationCacheOperations,
+val refreshInterval: Long,
+val retainedApplications: Int,
+val clock: Clock) extends Logging {
+
+  /**
+   * Services the load request from the cache.
+   */
+  private val appLoader = new CacheLoader[CacheKey, CacheEntry] {
+
+/** the cache key doesn't match a cached entry ... attempt to load it 
 */
+override def load(key: CacheKey): CacheEntry = {
+  loadApplicationEntry(key.appId, key.attemptId)
+}
+
+  }
+
+  /**
+   * Handler for callbacks from the cache of entry removal.
+   */
+  private val removalListener = new RemovalListener[CacheKey, CacheEntry] {
+
+/**
+ * Removal event notifies the provider to detach the UI.
+ * @param rm removal notification
+ */
+override def onRemoval(rm: RemovalNotification[CacheKey, CacheEntry]): 
Unit = {
+  metrics.evictionCount.inc()
+  val key = rm.getKey
+  logDebug(s"Evicting entry ${key}")
+  operations.detachSparkUI(key.appId, key.attemptId, rm.getValue().ui)
+}
+  }
+
+  /**
+   * The cache of applications.
+   *
+   * Tagged as `protected` so as to allow subclasses in tests to access 
it directly
+   */
+  protected val appCache: LoadingCache[CacheKey, CacheEntry] = {
+CacheBuilder.newBuilder()
+.maximumSize(retainedApplications)
+.removalListener(removalListener)
+.build(appLoader)
+  }
+
+  /**
+   * The metrics which are updated as the cache is used
+   */
+  val metrics = new CacheMetrics("history.cache")
+
+  init()
+
+  /**
+   * Perform any startup operations.
+   *
+   * This includes declaring this instance as the cache to use in the
+   * [[ApplicationCacheCheckFilterRelay]].
+   */
+  private def init(): Unit = {
+ApplicationCacheCheckFilterRelay.setApplicationCache(this)
+  }
+
+  /**
+   * 

[GitHub] spark pull request: [SPARK-7889] [CORE] HistoryServer to refresh c...

2015-12-14 Thread squito
Github user squito commented on a diff in the pull request:

https://github.com/apache/spark/pull/6935#discussion_r47573952
  
--- Diff: 
core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala ---
@@ -371,9 +434,9 @@ private[history] class FsHistoryProvider(conf: 
SparkConf, clock: Clock)
 val bus = new ReplayListenerBus()
 val res = replay(fileStatus, bus)
 res match {
-  case Some(r) => logDebug(s"Application log ${r.logPath} loaded 
successfully.")
+  case Some(r) => logDebug(s"Application log ${r.logPath} loaded 
successfully: $r")
   case None => logWarning(s"Failed to load application log 
${fileStatus.getPath}. " +
-"The application may have not started.")
+  "The application may have not started.")
--- End diff --

original indentation was correct





[GitHub] spark pull request: [SPARK-7889] [CORE] HistoryServer to refresh c...

2015-12-14 Thread squito
Github user squito commented on a diff in the pull request:

https://github.com/apache/spark/pull/6935#discussion_r47574113
  
--- Diff: 
core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala ---
@@ -430,8 +517,55 @@ private[history] class FsHistoryProvider(conf: 
SparkConf, clock: Clock)
 }
 newIterator.foreach(addIfAbsent)
 oldIterator.foreach(addIfAbsent)
+mergedApps
+  }
+
+
+  /**
+   * Scan through all known incomplete applications, and check for any 
that have been updated.
+   *
+   * After the scan, if there were any updated attempts, [[applications]] 
is updated
+   * with the new values.
+   *
+   * 1. No attempt to replay the application is made; this scan is a low 
cost operation.
+   * 2. As this overwrites [[applications]] with a new value, it must not 
run concurrently
+   * with the main scan for new applications. That is: it must be in the 
[[checkForLogs()]]
+   * operation.
+   */
+  private[history] def scanAndUpdateIncompleteAttemptInfo(): Unit = {
+val newAttempts: Iterable[FsApplicationAttemptInfo] = applications
+.filter( e => !e._2.completed)
+.flatMap { e =>
+  // build list of (false, attempt) or (true, attempt') values
+  e._2.attempts.flatMap { prevInfo: FsApplicationAttemptInfo =>
+val path = new Path(logDir, prevInfo.logPath)
+try {
+  val status = fs.getFileStatus(path)
+  val size = getLogSize(status).getOrElse(-1L)
+  val aS = prevInfo.fileSize
+  if (size > aS) {
+logDebug(s"Attempt ${prevInfo.name}/${prevInfo.appId} size 
=> $size")
+Some(new FsApplicationAttemptInfo(prevInfo.logPath, 
prevInfo.name, prevInfo.appId,
+  prevInfo.attemptId, prevInfo.startTime, 
prevInfo.endTime, prevInfo.lastUpdated,
+  prevInfo.sparkUser, prevInfo.completed, size, 
attemptCounter.incrementAndGet()))
--- End diff --

since you're only using filesize now, do you need the attemptCounter?  
Can't you just use filesize?  It seems like it will work exactly the same.  And 
you still let other providers use something else entirely, since the counter 
was also local to FsHistoryProvider
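A sketch of the file-size-only probe being suggested (names here are
illustrative, not the PR's actual API):

```
object UpdateProbe {
  // cached view of an attempt: the probe only needs the last-seen log size
  final case class AttemptState(logPath: String, fileSize: Long)

  // an attempt counts as updated iff its event log grew since it was
  // cached; no replay of the log is required
  def isUpdated(cached: AttemptState, currentSize: Long): Boolean =
    currentSize > cached.fileSize
}
```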





[GitHub] spark pull request: [SPARK-7889] [CORE] HistoryServer to refresh c...

2015-12-14 Thread squito
Github user squito commented on a diff in the pull request:

https://github.com/apache/spark/pull/6935#discussion_r47574767
  
--- Diff: 
core/src/main/scala/org/apache/spark/deploy/history/HistoryServer.scala ---
@@ -103,7 +93,7 @@ class HistoryServer(
   // Note we don't use the UI retrieved from the cache; the cache 
loader above will register
   // the app's UI, and all we need to do is redirect the user to the 
same URI that was
   // requested, and the proper data should be served at that point.
-  res.sendRedirect(res.encodeRedirectURL(req.getRequestURI()))
+  res.sendRedirect(res.encodeRedirectURL(req.getRequestURI() + "/ui"))
--- End diff --

why "/ui"?





[GitHub] spark pull request: [SPARK-7889] [CORE] HistoryServer to refresh c...

2015-12-14 Thread squito
Github user squito commented on a diff in the pull request:

https://github.com/apache/spark/pull/6935#discussion_r47574667
  
--- Diff: 
core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala ---
@@ -678,6 +826,52 @@ private[history] class FsHistoryProvider(conf: 
SparkConf, clock: Clock)
 method.invoke(dfs, action).asInstanceOf[Boolean]
   }
 
+  /**
+   * String description for diagnostics
+   * @return a summary of the component state
+   */
+  override def toString: String = {
+val header = s"""
+  | FsHistoryProvider: logdir=$logDir,
+  | last scan time=$lastScanTime
+  | Cached application count = ${applications.size}
+""".stripMargin
+val sb = new StringBuilder(header)
+applications.foreach(entry => sb.append(entry._2).append("\n"))
+sb.toString
+  }
+
+  /**
+   * Probe for an update to an (incompleted) application.
+   *
+   * @param appId application ID
+   * @param attemptId optional attempt ID
+   * @param state an [[FsHistoryProviderUpdateState]] instance
+   * @return true if the attempt has been updated.
+   */
+  override def isUpdated(
+  appId: String,
+  attemptId: Option[String],
+  state: Option[HistoryProviderUpdateState]): Boolean = {
+val updateState = state.get.asInstanceOf[FsHistoryProviderUpdateState]
--- End diff --

I still don't love this.  I haven't taken a look at the integration w/ the 
ATS yet, but I feel that (a) as far as this pr is concerned, you could just 
change the type of `state` to be `FsHistoryProviderUpdateState` directly (or 
perhaps eliminate that abstraction completely and just make it the 
attemptCounter / filesize). you can always add this when you need it, since its 
not buying us anything here. and (b) instanceOf is a code smell ... if we 
really want it, we should use generics.  at the very least, I'd add a comment 
that we know the type of `state` because it originally came from this provider, 
and is just making its way back to the same fshistoryprovider.
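If the abstraction is kept, a generics-based alternative to the cast could
look roughly like this (hypothetical signatures, not the PR's code):

```
trait UpdateProbe[S] {
  // S is the provider-specific update state, e.g. a file size or checksum
  def isUpdated(appId: String, attemptId: Option[String], state: Option[S]): Boolean
}

class FsUpdateProbe extends UpdateProbe[Long] {
  override def isUpdated(
      appId: String,
      attemptId: Option[String],
      state: Option[Long]): Boolean = {
    val currentSize = 0L  // stub; the real provider would ask the filesystem
    state.exists(cachedSize => currentSize > cachedSize)
  }
}
```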





[GitHub] spark pull request: [SPARK-7889] [CORE] HistoryServer to refresh c...

2015-12-14 Thread squito
Github user squito commented on the pull request:

https://github.com/apache/spark/pull/6935#issuecomment-164591472
  
thanks for updating again @steveloughran. Sorry for the long break in my 
reviews. Looks really close; I think I just found one bug with the redirects. 
Other than that, smaller style stuff.

This has turned out to be a lot more complicated than it seemed initially; 
thanks for sticking with it.





[GitHub] spark pull request: [SPARK-7889] [CORE] HistoryServer to refresh c...

2015-12-09 Thread SparkQA
Github user SparkQA commented on the pull request:

https://github.com/apache/spark/pull/6935#issuecomment-163168780
  
**[Test build #47424 has 
started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/47424/consoleFull)**
 for PR 6935 at commit 
[`9b76c44`](https://github.com/apache/spark/commit/9b76c44f8eefe2a4a6aab5b63724f92b5575b5f6).





[GitHub] spark pull request: [SPARK-7889] [CORE] HistoryServer to refresh c...

2015-12-09 Thread SparkQA
Github user SparkQA commented on the pull request:

https://github.com/apache/spark/pull/6935#issuecomment-163196873
  
**[Test build #47424 has 
finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/47424/consoleFull)**
 for PR 6935 at commit 
[`9b76c44`](https://github.com/apache/spark/commit/9b76c44f8eefe2a4a6aab5b63724f92b5575b5f6).
 * This patch **fails PySpark unit tests**.
 * This patch merges cleanly.
 * This patch adds no public classes.





[GitHub] spark pull request: [SPARK-7889] [CORE] HistoryServer to refresh c...

2015-12-09 Thread AmplabJenkins
Github user AmplabJenkins commented on the pull request:

https://github.com/apache/spark/pull/6935#issuecomment-163196933
  
Merged build finished. Test FAILed.





[GitHub] spark pull request: [SPARK-7889] [CORE] HistoryServer to refresh c...

2015-12-09 Thread AmplabJenkins
Github user AmplabJenkins commented on the pull request:

https://github.com/apache/spark/pull/6935#issuecomment-163196935
  
Test FAILed.
Refer to this link for build results (access rights to CI server needed): 
https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/47424/
Test FAILed.





[GitHub] spark pull request: [SPARK-7889] [CORE] HistoryServer to refresh c...

2015-12-09 Thread steveloughran
Github user steveloughran commented on the pull request:

https://github.com/apache/spark/pull/6935#issuecomment-163203560
  
Failing test is pyspark; pretty unlikely to be related:
```
==
FAIL: test_count_by_value_and_window (__main__.WindowFunctionTests)
--
Traceback (most recent call last):
  File 
"/home/jenkins/workspace/SparkPullRequestBuilder/python/pyspark/streaming/tests.py",
 line 635, in test_count_by_value_and_window
self._test_func(input, func, expected)
  File 
"/home/jenkins/workspace/SparkPullRequestBuilder/python/pyspark/streaming/tests.py",
 line 160, in _test_func
self.assertEqual(expected, result)
AssertionError: Lists differ: [[1], [2], [3], [4], [5], [6], [6], [6], [6], 
[6]] != [[1], [2], [3], [4], [5], [6]]

First list contains 4 additional elements.
First extra element 6:
[6]

- [[1], [2], [3], [4], [5], [6], [6], [6], [6], [6]]
?   

+ [[1], [2], [3], [4], [5], [6]]

--
Ran 7 tests in 43.333s
```





[GitHub] spark pull request: [SPARK-7889] [CORE] HistoryServer to refresh c...

2015-12-08 Thread squito
Github user squito commented on a diff in the pull request:

https://github.com/apache/spark/pull/6935#discussion_r46989032
  
--- Diff: 
core/src/main/scala/org/apache/spark/deploy/history/ApplicationCache.scala ---
@@ -0,0 +1,648 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.deploy.history
+
+import java.util.NoSuchElementException
+
+import javax.servlet.http.{HttpServletRequest, HttpServletResponse}
+import javax.servlet.{DispatcherType, Filter, FilterChain, FilterConfig, 
ServletException, ServletRequest, ServletResponse}
+
+import scala.collection.JavaConverters._
+
+import com.codahale.metrics.{Counter, MetricRegistry, Timer}
+import com.google.common.cache.{CacheBuilder, CacheLoader, LoadingCache, 
RemovalListener, RemovalNotification}
+import org.eclipse.jetty.servlet.FilterHolder
+
+import org.apache.spark.Logging
+import org.apache.spark.metrics.source.Source
+import org.apache.spark.ui.SparkUI
+import org.apache.spark.util.Clock
+
+/**
+ * Cache for applications.
+ *
+ * Completed applications are cached for as long as there is capacity for 
them.
+ * Incompleted applications have their update time checked on every
+ * retrieval; if the cached entry is out of date, it is refreshed.
+ *
+ * @note there must be only one instance of [[ApplicationCache]] in a
+ * JVM at a time. This is because a static field in 
[[ApplicationCacheCheckFilterRelay]]
+ * keeps a reference to the cache so that HTTP requests on the 
attempt-specific web UIs
+ * can probe the current cache to see if the attempts have changed.
+ *
+ * Creating multiple instances will break this routing.
+ * @param operations implementation of record access operations
+ * @param refreshInterval interval between refreshes in milliseconds.
+ * @param retainedApplications number of retained applications
+ * @param clock time source
+ */
+private[history] class ApplicationCache(val operations: 
ApplicationCacheOperations,
+val refreshInterval: Long,
+val retainedApplications: Int,
+val clock: Clock) extends Logging {
+
+  /**
+   * Services the load request from the cache.
+   */
+  private val appLoader = new CacheLoader[CacheKey, CacheEntry] {
+
+/** the cache key doesn't match a cached entry ... attempt to load it 
 */
+override def load(key: CacheKey): CacheEntry = {
+  loadApplicationEntry(key.appId, key.attemptId)
+}
+
+  }
+
+  /**
+   * Handler for callbacks from the cache of entry removal.
+   */
+  private val removalListener = new RemovalListener[CacheKey, CacheEntry] {
+
+/**
+ * Removal event notifies the provider to detach the UI.
+ * @param rm removal notification
+ */
+override def onRemoval(rm: RemovalNotification[CacheKey, CacheEntry]): 
Unit = {
+  metrics.evictionCount.inc()
+  val key = rm.getKey
+  logDebug(s"Evicting entry ${key}")
+  operations.detachSparkUI(key.appId, key.attemptId, rm.getValue().ui)
+}
+  }
+
+  /**
+   * The cache of applications.
+   *
+   * Tagged as `protected` so as to allow subclasses in tests to access 
it directly
+   */
+  protected val appCache: LoadingCache[CacheKey, CacheEntry] = 
CacheBuilder.newBuilder()
+  .maximumSize(retainedApplications)
+  .removalListener(removalListener)
+  .build(appLoader)
+
+  /**
+   * The metrics which are updated as the cache is used
+   */
+  val metrics = new CacheMetrics("history.cache")
+
+  init()
+
+  /**
+   * Perform any startup operations.
+   *
+   * This includes declaring this instance as the cache to use in the
+   * [[ApplicationCacheCheckFilterRelay]].
+   */
+  private def init(): Unit = {
+ApplicationCacheCheckFilterRelay.setApplicationCache(this)
+  }
+
+  /**
+   * Stop the cache.
+   * This will 

[GitHub] spark pull request: [SPARK-7889] [CORE] HistoryServer to refresh c...

2015-12-08 Thread steveloughran
Github user steveloughran commented on a diff in the pull request:

https://github.com/apache/spark/pull/6935#discussion_r46993661
  
--- Diff: 
core/src/main/scala/org/apache/spark/deploy/history/ApplicationCache.scala ---
@@ -0,0 +1,648 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.deploy.history
+
+import java.util.NoSuchElementException
+
+import javax.servlet.http.{HttpServletRequest, HttpServletResponse}
+import javax.servlet.{DispatcherType, Filter, FilterChain, FilterConfig, 
ServletException, ServletRequest, ServletResponse}
+
+import scala.collection.JavaConverters._
+
+import com.codahale.metrics.{Counter, MetricRegistry, Timer}
+import com.google.common.cache.{CacheBuilder, CacheLoader, LoadingCache, 
RemovalListener, RemovalNotification}
+import org.eclipse.jetty.servlet.FilterHolder
+
+import org.apache.spark.Logging
+import org.apache.spark.metrics.source.Source
+import org.apache.spark.ui.SparkUI
+import org.apache.spark.util.Clock
+
+/**
+ * Cache for applications.
+ *
+ * Completed applications are cached for as long as there is capacity for 
them.
+ * Incompleted applications have their update time checked on every
+ * retrieval; if the cached entry is out of date, it is refreshed.
+ *
+ * @note there must be only one instance of [[ApplicationCache]] in a
+ * JVM at a time. This is because a static field in 
[[ApplicationCacheCheckFilterRelay]]
+ * keeps a reference to the cache so that HTTP requests on the 
attempt-specific web UIs
+ * can probe the current cache to see if the attempts have changed.
+ *
+ * Creating multiple instances will break this routing.
+ * @param operations implementation of record access operations
+ * @param refreshInterval interval between refreshes in milliseconds.
+ * @param retainedApplications number of retained applications
+ * @param clock time source
+ */
+private[history] class ApplicationCache(val operations: 
ApplicationCacheOperations,
+val refreshInterval: Long,
+val retainedApplications: Int,
+val clock: Clock) extends Logging {
+
+  /**
+   * Services the load request from the cache.
+   */
+  private val appLoader = new CacheLoader[CacheKey, CacheEntry] {
+
+/** the cache key doesn't match a cached entry ... attempt to load it 
 */
+override def load(key: CacheKey): CacheEntry = {
+  loadApplicationEntry(key.appId, key.attemptId)
+}
+
+  }
+
+  /**
+   * Handler for callbacks from the cache of entry removal.
+   */
+  private val removalListener = new RemovalListener[CacheKey, CacheEntry] {
+
+/**
+ * Removal event notifies the provider to detach the UI.
+ * @param rm removal notification
+ */
+override def onRemoval(rm: RemovalNotification[CacheKey, CacheEntry]): 
Unit = {
+  metrics.evictionCount.inc()
+  val key = rm.getKey
+  logDebug(s"Evicting entry ${key}")
+  operations.detachSparkUI(key.appId, key.attemptId, rm.getValue().ui)
+}
+  }
+
+  /**
+   * The cache of applications.
+   *
+   * Tagged as `protected` so as to allow subclasses in tests to access 
it directly
+   */
+  protected val appCache: LoadingCache[CacheKey, CacheEntry] = 
CacheBuilder.newBuilder()
+  .maximumSize(retainedApplications)
+  .removalListener(removalListener)
+  .build(appLoader)
+
+  /**
+   * The metrics which are updated as the cache is used
+   */
+  val metrics = new CacheMetrics("history.cache")
+
+  init()
+
+  /**
+   * Perform any startup operations.
+   *
+   * This includes declaring this instance as the cache to use in the
+   * [[ApplicationCacheCheckFilterRelay]].
+   */
+  private def init(): Unit = {
+ApplicationCacheCheckFilterRelay.setApplicationCache(this)
+  }
+
+  /**
+   * Stop the cache.
+   * 

[GitHub] spark pull request: [SPARK-7889] [CORE] HistoryServer to refresh c...

2015-12-08 Thread squito
Github user squito commented on a diff in the pull request:

https://github.com/apache/spark/pull/6935#discussion_r46995257
  
--- Diff: 
core/src/main/scala/org/apache/spark/deploy/history/ApplicationCache.scala ---
@@ -0,0 +1,648 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.deploy.history
+
+import java.util.NoSuchElementException
+
+import javax.servlet.http.{HttpServletRequest, HttpServletResponse}
+import javax.servlet.{DispatcherType, Filter, FilterChain, FilterConfig, 
ServletException, ServletRequest, ServletResponse}
+
+import scala.collection.JavaConverters._
+
+import com.codahale.metrics.{Counter, MetricRegistry, Timer}
+import com.google.common.cache.{CacheBuilder, CacheLoader, LoadingCache, 
RemovalListener, RemovalNotification}
+import org.eclipse.jetty.servlet.FilterHolder
+
+import org.apache.spark.Logging
+import org.apache.spark.metrics.source.Source
+import org.apache.spark.ui.SparkUI
+import org.apache.spark.util.Clock
+
+/**
+ * Cache for applications.
+ *
+ * Completed applications are cached for as long as there is capacity for 
them.
+ * Incompleted applications have their update time checked on every
+ * retrieval; if the cached entry is out of date, it is refreshed.
+ *
+ * @note there must be only one instance of [[ApplicationCache]] in a
+ * JVM at a time. This is because a static field in 
[[ApplicationCacheCheckFilterRelay]]
+ * keeps a reference to the cache so that HTTP requests on the 
attempt-specific web UIs
+ * can probe the current cache to see if the attempts have changed.
+ *
+ * Creating multiple instances will break this routing.
+ * @param operations implementation of record access operations
+ * @param refreshInterval interval between refreshes in milliseconds.
+ * @param retainedApplications number of retained applications
+ * @param clock time source
+ */
+private[history] class ApplicationCache(val operations: 
ApplicationCacheOperations,
+val refreshInterval: Long,
+val retainedApplications: Int,
+val clock: Clock) extends Logging {
+
+  /**
+   * Services the load request from the cache.
+   */
+  private val appLoader = new CacheLoader[CacheKey, CacheEntry] {
+
+/** the cache key doesn't match a cached entry ... attempt to load it 
 */
+override def load(key: CacheKey): CacheEntry = {
+  loadApplicationEntry(key.appId, key.attemptId)
+}
+
+  }
+
+  /**
+   * Handler for callbacks from the cache of entry removal.
+   */
+  private val removalListener = new RemovalListener[CacheKey, CacheEntry] {
+
+/**
+ * Removal event notifies the provider to detach the UI.
+ * @param rm removal notification
+ */
+override def onRemoval(rm: RemovalNotification[CacheKey, CacheEntry]): 
Unit = {
+  metrics.evictionCount.inc()
+  val key = rm.getKey
+  logDebug(s"Evicting entry ${key}")
+  operations.detachSparkUI(key.appId, key.attemptId, rm.getValue().ui)
+}
+  }
+
+  /**
+   * The cache of applications.
+   *
+   * Tagged as `protected` so as to allow subclasses in tests to access 
it directly
+   */
+  protected val appCache: LoadingCache[CacheKey, CacheEntry] = 
CacheBuilder.newBuilder()
+  .maximumSize(retainedApplications)
+  .removalListener(removalListener)
+  .build(appLoader)
+
+  /**
+   * The metrics which are updated as the cache is used
+   */
+  val metrics = new CacheMetrics("history.cache")
+
+  init()
+
+  /**
+   * Perform any startup operations.
+   *
+   * This includes declaring this instance as the cache to use in the
+   * [[ApplicationCacheCheckFilterRelay]].
+   */
+  private def init(): Unit = {
+ApplicationCacheCheckFilterRelay.setApplicationCache(this)
+  }
+
+  /**
+   * Stop the cache.
+   * This will 

[GitHub] spark pull request: [SPARK-7889] [CORE] HistoryServer to refresh c...

2015-12-08 Thread squito
Github user squito commented on a diff in the pull request:

https://github.com/apache/spark/pull/6935#discussion_r46995458
  
--- Diff: 
core/src/main/scala/org/apache/spark/deploy/history/ApplicationHistoryProvider.scala
 ---
@@ -73,4 +101,17 @@ private[history] abstract class 
ApplicationHistoryProvider {
   @throws(classOf[SparkException])
   def writeEventLogs(appId: String, attemptId: Option[String], zipStream: 
ZipOutputStream): Unit
 
+  /**
+   * Probe for an update to an (incompleted) application
+   * @param appId application ID
+   * @param attemptId optional attempt ID
+   * @param updateTimeMillis time in milliseconds to use as the threshold 
for an update.
+   * @param data any other data the operations implementation can use to 
determine age
+   * @return true if the application was updated since `updateTimeMillis`
+   */
+  def isUpdated(appId: String, attemptId: Option[String], 
updateTimeMillis: Long,
+  data: Option[HistoryProviderUpdateState]): Boolean = {
--- End diff --

nit: each arg on its own line
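
For reference, the layout used elsewhere in Spark puts each parameter on its
own line with a four-space indent, roughly:

```
def isUpdated(
    appId: String,
    attemptId: Option[String],
    updateTimeMillis: Long,
    data: Option[HistoryProviderUpdateState]): Boolean = {
  false  // probe logic unchanged; stubbed here
}
```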





[GitHub] spark pull request: [SPARK-7889] [CORE] HistoryServer to refresh c...

2015-12-08 Thread squito
Github user squito commented on a diff in the pull request:

https://github.com/apache/spark/pull/6935#discussion_r46995405
  
--- Diff: 
core/src/main/scala/org/apache/spark/deploy/history/ApplicationHistoryProvider.scala
 ---
@@ -33,7 +33,35 @@ private[spark] case class ApplicationAttemptInfo(
 private[spark] case class ApplicationHistoryInfo(
 id: String,
 name: String,
-attempts: List[ApplicationAttemptInfo])
+attempts: List[ApplicationAttemptInfo]) {
+
+  /**
+   * Has this application completed?
+   * @return true if the most recent attempt has completed
+   */
+  def completed: Boolean = {
+attempts.nonEmpty && attempts.head.completed
+  }
+}
+
+/**
+ * Case class which can be subclassed by any provider to store information 
which
+ * can then be used to determine whether an application attempt has been 
updated
+ * since the last time it was retrieved.
+ *
+ * Examples include: a timestamp, a counter or a checksum.
+ */
+private[history] case class HistoryProviderUpdateState()
+
+/**
+ * All the information returned from a call to getAppUI
+ * @param ui Spark UI
+ * @param timestamp timestamp of the loaded data
+ * @param updateState any provider-specific update state
+ */
+private[history] case class LoadedAppUI(ui: SparkUI,
--- End diff --

nit: each arg on its own line





[GitHub] spark pull request: [SPARK-7889] [CORE] HistoryServer to refresh c...

2015-12-08 Thread steveloughran
Github user steveloughran commented on a diff in the pull request:

https://github.com/apache/spark/pull/6935#discussion_r46995031
  
--- Diff: 
core/src/main/scala/org/apache/spark/deploy/history/ApplicationCache.scala ---
@@ -0,0 +1,648 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.deploy.history
+
+import java.util.NoSuchElementException
+
+import javax.servlet.http.{HttpServletRequest, HttpServletResponse}
+import javax.servlet.{DispatcherType, Filter, FilterChain, FilterConfig, 
ServletException, ServletRequest, ServletResponse}
+
+import scala.collection.JavaConverters._
+
+import com.codahale.metrics.{Counter, MetricRegistry, Timer}
+import com.google.common.cache.{CacheBuilder, CacheLoader, LoadingCache, 
RemovalListener, RemovalNotification}
+import org.eclipse.jetty.servlet.FilterHolder
+
+import org.apache.spark.Logging
+import org.apache.spark.metrics.source.Source
+import org.apache.spark.ui.SparkUI
+import org.apache.spark.util.Clock
+
+/**
+ * Cache for applications.
+ *
+ * Completed applications are cached for as long as there is capacity for 
them.
+ * Incompleted applications have their update time checked on every
+ * retrieval; if the cached entry is out of date, it is refreshed.
+ *
+ * @note there must be only one instance of [[ApplicationCache]] in a
+ * JVM at a time. This is because a static field in 
[[ApplicationCacheCheckFilterRelay]]
+ * keeps a reference to the cache so that HTTP requests on the 
attempt-specific web UIs
+ * can probe the current cache to see if the attempts have changed.
+ *
+ * Creating multiple instances will break this routing.
+ * @param operations implementation of record access operations
+ * @param refreshInterval interval between refreshes in milliseconds.
+ * @param retainedApplications number of retained applications
+ * @param clock time source
+ */
+private[history] class ApplicationCache(val operations: 
ApplicationCacheOperations,
+val refreshInterval: Long,
+val retainedApplications: Int,
+val clock: Clock) extends Logging {
+
+  /**
+   * Services the load request from the cache.
+   */
+  private val appLoader = new CacheLoader[CacheKey, CacheEntry] {
+
+/** the cache key doesn't match a cached entry ... attempt to load it 
 */
+override def load(key: CacheKey): CacheEntry = {
+  loadApplicationEntry(key.appId, key.attemptId)
+}
+
+  }
+
+  /**
+   * Handler for callbacks from the cache of entry removal.
+   */
+  private val removalListener = new RemovalListener[CacheKey, CacheEntry] {
+
+/**
+ * Removal event notifies the provider to detach the UI.
+ * @param rm removal notification
+ */
+override def onRemoval(rm: RemovalNotification[CacheKey, CacheEntry]): 
Unit = {
+  metrics.evictionCount.inc()
+  val key = rm.getKey
+  logDebug(s"Evicting entry ${key}")
+  operations.detachSparkUI(key.appId, key.attemptId, rm.getValue().ui)
+}
+  }
+
+  /**
+   * The cache of applications.
+   *
+   * Tagged as `protected` so as to allow subclasses in tests to access 
it directly
+   */
+  protected val appCache: LoadingCache[CacheKey, CacheEntry] = 
CacheBuilder.newBuilder()
--- End diff --

that was lifted as-is from HistoryServer. I'll fix it here





[GitHub] spark pull request: [SPARK-7889] [CORE] HistoryServer to refresh c...

2015-12-08 Thread steveloughran
Github user steveloughran commented on a diff in the pull request:

https://github.com/apache/spark/pull/6935#discussion_r46995495
  
--- Diff: 
core/src/main/scala/org/apache/spark/deploy/history/ApplicationCache.scala ---
@@ -0,0 +1,579 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.deploy.history
+
+import java.util.NoSuchElementException
+
+import javax.servlet.http.{HttpServletRequest, HttpServletResponse}
+import javax.servlet.{DispatcherType, Filter, FilterChain, FilterConfig, 
ServletException, ServletRequest, ServletResponse}
+
+import scala.collection.JavaConverters._
+
+import com.codahale.metrics.{Counter, MetricRegistry, Timer}
+import com.google.common.cache.{CacheBuilder, CacheLoader, LoadingCache, 
RemovalListener, RemovalNotification}
+import com.google.common.util.concurrent.ListenableFuture
+import org.eclipse.jetty.servlet.FilterHolder
+
+import org.apache.spark.Logging
+import org.apache.spark.metrics.source.Source
+import org.apache.spark.ui.SparkUI
+import org.apache.spark.util.Clock
+
+/**
+ * Cache for applications.
+ *
+ * Completed applications are cached for as long as there is capacity for 
them.
+ * Incompleted applications have their update time checked on every
+ * retrieval; if the cached entry is out of date, it is refreshed.
+ *
+ * @param operations implementation of record access operations
+ * @param refreshInterval interval between refreshes in milliseconds.
+ * @param retainedApplications number of retained applications
+ * @param clock time source
+ */
+private[history] class ApplicationCache(val operations: 
ApplicationCacheOperations,
+val refreshInterval: Long,
+val retainedApplications: Int,
+val clock: Clock) extends Logging {
+
+  /**
+   * Services the load request from the cache.
+   */
+  private val appLoader = new CacheLoader[CacheKey, CacheEntry] {
+
+/** the cache key doesn't match a cached entry ... attempt to load it 
 */
+override def load(key: CacheKey): CacheEntry = {
+  loadApplicationEntry(key.appId, key.attemptId)
+}
+
+override def reload(key: CacheKey, oldValue: CacheEntry): 
ListenableFuture[CacheEntry] = super
+.reload(key, oldValue)
+  }
+
+  /**
+   * Handler for callbacks from the cache of entry removal.
+   */
+  private val removalListener = new RemovalListener[CacheKey, CacheEntry] {
+
+/**
+ * Removal event notifies the provider to detach the UI.
+ * @param rm removal notification
+ */
+override def onRemoval(rm: RemovalNotification[CacheKey, CacheEntry]): 
Unit = {
+  metrics.evictionCount.inc()
+  val key = rm.getKey
+  logDebug(s"Evicting entry ${key}")
+  operations.detachSparkUI(key.appId, key.attemptId, rm.getValue().ui)
+}
+  }
+
+  /**
+   * The cache of applications.
+   * Tagged as protected so as to allow subclasses in tests to access it 
directly
+   */
+  protected val appCache: LoadingCache[CacheKey, CacheEntry] = 
CacheBuilder.newBuilder()
+  .maximumSize(retainedApplications)
+  .removalListener(removalListener)
+  .build(appLoader)
+
+  /**
+   * The metrics which are updated as the cache is used
+   */
+  val metrics = new CacheMetrics("history.cache")
+
+  init()
+
+  /**
+   * Perform any startup operations. This includes declaring this instance
+   * as the cache to use in the [[ApplicationCacheCheckFilterRelay]].
+   */
+  private def init(): Unit = {
+    ApplicationCacheCheckFilterRelay.setApplicationCache(this)
+  }
+
+  /** Stop the cache. */
+  def stop(): Unit = {
+    ApplicationCacheCheckFilterRelay.resetApplicationCache()
+  }
+
+  /**
+   * Get an entry. Cache fetch/refresh will have taken place by
+   * the time this method returns.
+   * @param 

[GitHub] spark pull request: [SPARK-7889] [CORE] HistoryServer to refresh c...

2015-12-08 Thread squito
Github user squito commented on a diff in the pull request:

https://github.com/apache/spark/pull/6935#discussion_r46997833
  
--- Diff: core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala ---
@@ -678,6 +827,54 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock)
 method.invoke(dfs, action).asInstanceOf[Boolean]
   }
 
+  /**
+   * String description for diagnostics
+   * @return a summary of the component staet
--- End diff --

typo: state





[GitHub] spark pull request: [SPARK-7889] [CORE] HistoryServer to refresh c...

2015-12-08 Thread squito
Github user squito commented on a diff in the pull request:

https://github.com/apache/spark/pull/6935#discussion_r46998222
  
--- Diff: core/src/main/scala/org/apache/spark/deploy/history/HistoryServer.scala ---
@@ -146,22 +136,51 @@ class HistoryServer(
   override def stop() {
 super.stop()
 provider.stop()
+appCache.stop()
   }
 
   /** Attach a reconstructed UI to this server. Only valid after bind(). */
-  private def attachSparkUI(ui: SparkUI) {
+  override def attachSparkUI(appId: String,
--- End diff --

nit: each arg on its own line
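
For reference, the requested layout would look something like this (a sketch
only; the parameter list after `appId` is assumed from the surrounding diff
and may differ in the real signature):

    // Hypothetical reformatting per the nit: one argument per line.
    override def attachSparkUI(
        appId: String,
        attemptId: Option[String],
        ui: SparkUI,
        completed: Boolean): Unit = {
      // ... body unchanged ...
    }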





[GitHub] spark pull request: [SPARK-7889] [CORE] HistoryServer to refresh c...

2015-12-08 Thread squito
Github user squito commented on a diff in the pull request:

https://github.com/apache/spark/pull/6935#discussion_r46999417
  
--- Diff: core/src/test/scala/org/apache/spark/deploy/history/HistoryServerSuite.scala ---
@@ -281,6 +296,202 @@ class HistoryServerSuite extends SparkFunSuite with BeforeAndAfter with Matchers
 all (siteRelativeLinks) should startWith (uiRoot)
   }
 
+  test("incomplete apps get refreshed") {
+
+implicit val webDriver: WebDriver = new HtmlUnitDriver
+implicit val formats = org.json4s.DefaultFormats
+
+// this test dir is explicitly deleted on successful runs; retained for diagnostics when not
+val logDir = Utils.createDirectory(System.getProperty("java.io.tmpdir", "logs"))
+
+// a new conf is used with the background thread set and running at its fastest
+// allowed refresh rate (1Hz)
+val myConf = new SparkConf()
+  .set("spark.history.fs.logDirectory", logDir.getAbsolutePath)
+  .set("spark.eventLog.dir", logDir.getAbsolutePath)
+  .set("spark.history.fs.update.interval", "1s")
+  .set("spark.eventLog.enabled", "true")
+  .set("spark.history.cache.window", "250ms")
+  .remove("spark.testing")
+val provider = new FsHistoryProvider(myConf)
+val securityManager = new SecurityManager(myConf)
+
+val sc = new SparkContext("local", "test", myConf)
+val logDirUri = logDir.toURI
+val logDirPath = new Path(logDirUri)
+val fs = FileSystem.get(logDirUri, sc.hadoopConfiguration)
+
+def listDir(dir: Path): Seq[FileStatus] = {
+  val statuses = fs.listStatus(dir)
+  statuses.flatMap(
+stat => if (stat.isDirectory) listDir(stat.getPath) else Seq(stat))
+}
+
+def dumpLogDir(msg: String = ""): Unit = {
+  if (log.isDebugEnabled) {
+logDebug(msg)
+listDir(logDirPath).foreach { status =>
+  val s = status.toString
+  logDebug(s)
+}
+  }
+}
+
+// stop the server with the old config, and start the new one
+server.stop()
+server = new HistoryServer(myConf, provider, securityManager, 18080)
+server.initialize()
+server.bind()
+try {
+  val port = server.boundPort
+  val metrics = server.cacheMetrics
+
+  // assert that a metric has a value; if not dump the whole metrics instance
+  def assertMetric(name: String, counter: Counter, expected: Long): Unit = {
+val actual = counter.getCount
+if (actual != expected) {
+  // this is here because Scalatest loses stack depth
+  throw new scala.Exception(s"Wrong $name value - expected $expected but got $actual" +
--- End diff --

this is weird. (a) why `scala.Exception`? (b) I don't understand the comment
about losing stack depth -- isn't this basically the same as
`assert(actual == expected, "...")`? It seems you only remove one stack frame;
I can't imagine that matters.
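
For comparison, a plain-assert version of that helper might look like this (a
sketch only, assuming the same `metrics` and `Counter` values are in scope;
`withClue` comes from ScalaTest's `Assertions`):

    // Sketch: ScalaTest-idiomatic equivalent of the custom throw.
    // withClue prefixes the metrics dump onto any assertion failure message.
    def assertMetric(name: String, counter: Counter, expected: Long): Unit = {
      val actual = counter.getCount
      withClue(s"Wrong $name value in metrics\n$metrics\n") {
        assert(actual === expected)
      }
    }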





[GitHub] spark pull request: [SPARK-7889] [CORE] HistoryServer to refresh c...

2015-12-08 Thread squito
Github user squito commented on a diff in the pull request:

https://github.com/apache/spark/pull/6935#discussion_r46997864
  
--- Diff: core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala ---
@@ -678,6 +827,54 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock)
 method.invoke(dfs, action).asInstanceOf[Boolean]
   }
 
+  /**
+   * String description for diagnostics
+   * @return a summary of the component staet
+   */
+  override def toString: String = {
+val header = s"""
+  | FsHistoryProvider: logdir=$logDir,
+  | last scan time=$lastScanTime
+  | Cached application count =${applications.size}}
+  |
+""".stripMargin
+val sb = new StringBuilder(header)
+applications.foreach(entry => sb.append(entry._2).append("\n"))
+sb.toString
+  }
+
+  /**
+   * Probe for an update to an (incompleted) application.
+   *
+   * @param appId application ID
+   * @param attemptId optional attempt ID
+   * @param updateTimeMillis time in milliseconds to use as the threshold for an update.
+   * @param data an [[FsHistoryProviderUpdateState]]
+   * @return true if the attempt has been updated.
+   */
+  override def isUpdated(appId: String,
--- End diff --

nit: each arg on its own line





[GitHub] spark pull request: [SPARK-7889] [CORE] HistoryServer to refresh c...

2015-12-08 Thread squito
Github user squito commented on a diff in the pull request:

https://github.com/apache/spark/pull/6935#discussion_r47004101
  
--- Diff: core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala ---
@@ -678,6 +827,54 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock)
 method.invoke(dfs, action).asInstanceOf[Boolean]
   }
 
+  /**
+   * String description for diagnostics
+   * @return a summary of the component staet
+   */
+  override def toString: String = {
+val header = s"""
+  | FsHistoryProvider: logdir=$logDir,
+  | last scan time=$lastScanTime
+  | Cached application count =${applications.size}}
+  |
+""".stripMargin
+val sb = new StringBuilder(header)
+applications.foreach(entry => sb.append(entry._2).append("\n"))
+sb.toString
+  }
+
+  /**
+   * Probe for an update to an (incompleted) application.
+   *
+   * @param appId application ID
+   * @param attemptId optional attempt ID
+   * @param updateTimeMillis time in milliseconds to use as the threshold for an update.
+   * @param data an [[FsHistoryProviderUpdateState]]
+   * @return true if the attempt has been updated.
+   */
+  override def isUpdated(appId: String,
+  attemptId: Option[String],
+  updateTimeMillis: Long,
+  data: Option[HistoryProviderUpdateState]): Boolean = {
+val updateState = data.get.asInstanceOf[FsHistoryProviderUpdateState]
+lookup(appId, attemptId) match {
+  case None =>
+logDebug(s"Application Attempt $appId/$attemptId not found")
+false
+  case Some(attempt) =>
+updateState.version < attempt.version
--- End diff --

`updateTimeMillis` isn't used here at all.  I'm not sure it actually needs to
be checked, though -- it seems like relying on versions alone is enough?  In
that case, you could remove it completely from `isUpdated`.
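
If versions alone are authoritative, the probe could shrink to something like
this (a sketch against the names in the quoted diff; the matching abstract
declaration in `ApplicationHistoryProvider` would need the same signature
change):

    // Sketch: version-only update probe, with the unused updateTimeMillis
    // parameter dropped and one argument per line per the earlier style nits.
    override def isUpdated(
        appId: String,
        attemptId: Option[String],
        data: Option[HistoryProviderUpdateState]): Boolean = {
      val updateState = data.get.asInstanceOf[FsHistoryProviderUpdateState]
      lookup(appId, attemptId) match {
        case None =>
          logDebug(s"Application Attempt $appId/$attemptId not found")
          false
        case Some(attempt) =>
          updateState.version < attempt.version
      }
    }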





[GitHub] spark pull request: [SPARK-7889] [CORE] HistoryServer to refresh c...

2015-12-08 Thread squito
Github user squito commented on a diff in the pull request:

https://github.com/apache/spark/pull/6935#discussion_r47006645
  
--- Diff: core/src/test/scala/org/apache/spark/deploy/history/HistoryServerSuite.scala ---
@@ -281,6 +296,202 @@ class HistoryServerSuite extends SparkFunSuite with BeforeAndAfter with Matchers
 all (siteRelativeLinks) should startWith (uiRoot)
   }
 
+  test("incomplete apps get refreshed") {
+
+implicit val webDriver: WebDriver = new HtmlUnitDriver
+implicit val formats = org.json4s.DefaultFormats
+
+// this test dir is explicitly deleted on successful runs; retained for diagnostics when not
+val logDir = Utils.createDirectory(System.getProperty("java.io.tmpdir", "logs"))
+
+// a new conf is used with the background thread set and running at its fastest
+// allowed refresh rate (1Hz)
+val myConf = new SparkConf()
+  .set("spark.history.fs.logDirectory", logDir.getAbsolutePath)
+  .set("spark.eventLog.dir", logDir.getAbsolutePath)
+  .set("spark.history.fs.update.interval", "1s")
+  .set("spark.eventLog.enabled", "true")
+  .set("spark.history.cache.window", "250ms")
+  .remove("spark.testing")
+val provider = new FsHistoryProvider(myConf)
+val securityManager = new SecurityManager(myConf)
+
+val sc = new SparkContext("local", "test", myConf)
+val logDirUri = logDir.toURI
+val logDirPath = new Path(logDirUri)
+val fs = FileSystem.get(logDirUri, sc.hadoopConfiguration)
+
+def listDir(dir: Path): Seq[FileStatus] = {
+  val statuses = fs.listStatus(dir)
+  statuses.flatMap(
+stat => if (stat.isDirectory) listDir(stat.getPath) else Seq(stat))
+}
+
+def dumpLogDir(msg: String = ""): Unit = {
+  if (log.isDebugEnabled) {
+logDebug(msg)
+listDir(logDirPath).foreach { status =>
+  val s = status.toString
+  logDebug(s)
+}
+  }
+}
+
+// stop the server with the old config, and start the new one
+server.stop()
+server = new HistoryServer(myConf, provider, securityManager, 18080)
+server.initialize()
+server.bind()
+try {
+  val port = server.boundPort
+  val metrics = server.cacheMetrics
+
+  // assert that a metric has a value; if not dump the whole metrics instance
+  def assertMetric(name: String, counter: Counter, expected: Long): Unit = {
+val actual = counter.getCount
+if (actual != expected) {
+  // this is here because Scalatest loses stack depth
+  throw new scala.Exception(s"Wrong $name value - expected $expected but got $actual" +
+  s" in metrics\n$metrics")
+}
+  }
+
+  // build a URL for an app or app/attempt plus a page underneath
+  def buildURL(appId: String, suffix: String): URL = {
+new URL(s"http://localhost:$port/history/$appId$suffix;)
+  }
+  val historyServerRoot = new URL(s"http://localhost:$port/;)
+  val historyServerIncompleted = new URL(historyServerRoot, 
"?page=1=true")
+
+  // assert the body of a URL contains a string; return that body
+  def assertUrlContains(url: URL, str: String): String = {
+val body = HistoryServerSuite.getUrl(url)
+assert(body.contains(str), s"did not find $str at $url : $body")
+body
+  }
+
+  // start initial job
+  val d = sc.parallelize(1 to 10)
+  d.count()
+  val stdInterval = interval(100 milliseconds)
+  val appId = eventually(timeout(20 seconds), stdInterval) {
+val json = getContentAndCode("applications", port)._2.get
+val apps = parse(json).asInstanceOf[JArray].arr
+apps should have size 1
+(apps.head \ "id").extract[String]
+  }
+
+  // which lists as incomplete
+  assertUrlContains(historyServerIncompleted, appId)
+
+  val appIdRoot = buildURL(appId, "")
+  val rootAppPage = HistoryServerSuite.getUrl(appIdRoot)
+  logDebug(s"$appIdRoot ->[${rootAppPage.length}] \n$rootAppPage")
+  // sanity check to make sure filter is chaining calls
+  rootAppPage should not be empty
+
+  def getAppUI: SparkUI = {
+provider.getAppUI(appId, None).get.ui
+  }
+
+  // selenium isn't that useful on failures...add our own reporting
+  def getNumJobs(suffix: String): Int = {
+val target = buildURL(appId, suffix)
+val targetBody = HistoryServerSuite.getUrl(target)
+try {
+  go to target.toExternalForm
+  findAll(cssSelector("tbody 

[GitHub] spark pull request: [SPARK-7889] [CORE] HistoryServer to refresh c...

2015-12-08 Thread squito
Github user squito commented on the pull request:

https://github.com/apache/spark/pull/6935#issuecomment-163002749
  
hi @steveloughran thanks for the updates.  The only real issue I see now is
some inconsistency between whether the checks are timestamp or filesize based
(or both).  Perhaps I am missing something, but I think it's just some bits of
leftover code from changing the logic around while working on it.  I think
it's just style stuff after that.





[GitHub] spark pull request: [SPARK-7889] [CORE] HistoryServer to refresh c...

2015-12-08 Thread squito
Github user squito commented on a diff in the pull request:

https://github.com/apache/spark/pull/6935#discussion_r47005658
  
--- Diff: core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala ---
@@ -430,8 +519,54 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock)
 }
 newIterator.foreach(addIfAbsent)
 oldIterator.foreach(addIfAbsent)
+mergedApps
+  }
+
+
+  /**
+   * Scan through all incomplete applications, and check for any that have changed.
+   *
+   * After the scan, if there were any updated attempts, [[applications]] is updated
+   * with the new values.
+   *
+   * 1. No attempt to replay the application is made; this scan is a low cost operation.
+   * 2. As this overwrites [[applications]] with a new value, it must not run concurrently.
+   */
+  private[history] def scanAndUpdateIncompleteAttemptInfo(): Unit = {
+val now = System.currentTimeMillis();
+val newAttempts: Iterable[FsApplicationAttemptInfo] = applications
+.filter( e => !e._2.completed)
+.flatMap { e =>
+  // build list of (false, attempt) or (true, attempt') values
+  e._2.attempts.flatMap { attempt: FsApplicationAttemptInfo =>
+val path = new Path(logDir, attempt.logPath)
+try {
+  val status = fs.getFileStatus(path)
+  val size = getLogSize(status).getOrElse(-1L)
+  val aS = attempt.fileSize
+  if (size > aS) {
--- End diff --

if it should update on timestamp (even if filesizes stay the same), then 
you should check timestamp here too
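
A combined probe could look roughly like this (a sketch only; `attempt.modTime`
is a hypothetical field that `FsApplicationAttemptInfo` would need to carry for
the timestamp comparison):

    // Sketch: treat an attempt as updated if the log grew or its
    // modification time advanced; modTime is assumed, not existing code.
    val status = fs.getFileStatus(path)
    val size = getLogSize(status).getOrElse(-1L)
    val sizeChanged = size > attempt.fileSize
    val timeChanged = status.getModificationTime > attempt.modTime
    if (sizeChanged || timeChanged) {
      // rebuild the attempt info with the new size/time here
    }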





[GitHub] spark pull request: [SPARK-7889] [CORE] HistoryServer to refresh c...

2015-12-08 Thread steveloughran
Github user steveloughran commented on a diff in the pull request:

https://github.com/apache/spark/pull/6935#discussion_r47010882
  
--- Diff: core/src/test/scala/org/apache/spark/deploy/history/HistoryServerSuite.scala ---
@@ -281,6 +296,202 @@ class HistoryServerSuite extends SparkFunSuite with BeforeAndAfter with Matchers
 all (siteRelativeLinks) should startWith (uiRoot)
   }
 
+  test("incomplete apps get refreshed") {
+
+implicit val webDriver: WebDriver = new HtmlUnitDriver
+implicit val formats = org.json4s.DefaultFormats
+
+// this test dir is explicitly deleted on successful runs; retained for diagnostics when not
+val logDir = Utils.createDirectory(System.getProperty("java.io.tmpdir", "logs"))
+
+// a new conf is used with the background thread set and running at its fastest
+// allowed refresh rate (1Hz)
+val myConf = new SparkConf()
+  .set("spark.history.fs.logDirectory", logDir.getAbsolutePath)
+  .set("spark.eventLog.dir", logDir.getAbsolutePath)
+  .set("spark.history.fs.update.interval", "1s")
+  .set("spark.eventLog.enabled", "true")
+  .set("spark.history.cache.window", "250ms")
+  .remove("spark.testing")
+val provider = new FsHistoryProvider(myConf)
+val securityManager = new SecurityManager(myConf)
+
+val sc = new SparkContext("local", "test", myConf)
+val logDirUri = logDir.toURI
+val logDirPath = new Path(logDirUri)
+val fs = FileSystem.get(logDirUri, sc.hadoopConfiguration)
+
+def listDir(dir: Path): Seq[FileStatus] = {
+  val statuses = fs.listStatus(dir)
+  statuses.flatMap(
+stat => if (stat.isDirectory) listDir(stat.getPath) else Seq(stat))
+}
+
+def dumpLogDir(msg: String = ""): Unit = {
+  if (log.isDebugEnabled) {
+logDebug(msg)
+listDir(logDirPath).foreach { status =>
+  val s = status.toString
+  logDebug(s)
+}
+  }
+}
+
+// stop the server with the old config, and start the new one
+server.stop()
+server = new HistoryServer(myConf, provider, securityManager, 18080)
+server.initialize()
+server.bind()
+try {
+  val port = server.boundPort
+  val metrics = server.cacheMetrics
+
+  // assert that a metric has a value; if not dump the whole metrics instance
+  def assertMetric(name: String, counter: Counter, expected: Long): Unit = {
+val actual = counter.getCount
+if (actual != expected) {
+  // this is here because Scalatest loses stack depth
+  throw new scala.Exception(s"Wrong $name value - expected $expected but got $actual" +
+  s" in metrics\n$metrics")
+}
+  }
+
+  // build a URL for an app or app/attempt plus a page underneath
+  def buildURL(appId: String, suffix: String): URL = {
+new URL(s"http://localhost:$port/history/$appId$suffix;)
+  }
+  val historyServerRoot = new URL(s"http://localhost:$port/;)
+  val historyServerIncompleted = new URL(historyServerRoot, 
"?page=1=true")
+
+  // assert the body of a URL contains a string; return that body
+  def assertUrlContains(url: URL, str: String): String = {
+val body = HistoryServerSuite.getUrl(url)
+assert(body.contains(str), s"did not find $str at $url : $body")
+body
+  }
+
+  // start initial job
+  val d = sc.parallelize(1 to 10)
+  d.count()
+  val stdInterval = interval(100 milliseconds)
+  val appId = eventually(timeout(20 seconds), stdInterval) {
+val json = getContentAndCode("applications", port)._2.get
+val apps = parse(json).asInstanceOf[JArray].arr
+apps should have size 1
+(apps.head \ "id").extract[String]
+  }
+
+  // which lists as incomplete
+  assertUrlContains(historyServerIncompleted, appId)
+
+  val appIdRoot = buildURL(appId, "")
+  val rootAppPage = HistoryServerSuite.getUrl(appIdRoot)
+  logDebug(s"$appIdRoot ->[${rootAppPage.length}] \n$rootAppPage")
+  // sanity check to make sure filter is chaining calls
+  rootAppPage should not be empty
+
+  def getAppUI: SparkUI = {
+provider.getAppUI(appId, None).get.ui
+  }
+
+  // selenium isn't that useful on failures...add our own reporting
+  def getNumJobs(suffix: String): Int = {
+val target = buildURL(appId, suffix)
+val targetBody = HistoryServerSuite.getUrl(target)
+try {
+  go to target.toExternalForm
+  

[GitHub] spark pull request: [SPARK-7889] [CORE] HistoryServer to refresh c...

2015-12-08 Thread steveloughran
Github user steveloughran commented on a diff in the pull request:

https://github.com/apache/spark/pull/6935#discussion_r47012771
  
--- Diff: core/src/main/scala/org/apache/spark/deploy/history/ApplicationHistoryProvider.scala ---
@@ -73,4 +101,17 @@ private[history] abstract class ApplicationHistoryProvider {
   @throws(classOf[SparkException])
   def writeEventLogs(appId: String, attemptId: Option[String], zipStream: 
ZipOutputStream): Unit
 
+  /**
+   * Probe for an update to an (incompleted) application
+   * @param appId application ID
+   * @param attemptId optional attempt ID
+   * @param updateTimeMillis time in milliseconds to use as the threshold for an update.
+   * @param data any other data the operations implementation can use to 
determine age
+   * @return true if the application was updated since `updateTimeMillis`
+   */
+  def isUpdated(appId: String, attemptId: Option[String], updateTimeMillis: Long,
+  data: Option[HistoryProviderUpdateState]): Boolean = {
--- End diff --

done




