Github user steveloughran commented on a diff in the pull request: https://github.com/apache/spark/pull/5423#discussion_r33466420 --- Diff: yarn/history/src/main/scala/org/apache/spark/deploy/history/yarn/YarnHistoryProvider.scala --- @@ -0,0 +1,1015 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.spark.deploy.history.yarn + +import java.io.FileNotFoundException +import java.net.URI +import java.util.Date +import java.util.concurrent.LinkedBlockingQueue +import java.util.concurrent.atomic.{AtomicLong, AtomicBoolean} +import java.util.zip.ZipOutputStream + +import scala.collection.JavaConversions._ + +import org.apache.hadoop.security.UserGroupInformation +import org.apache.hadoop.yarn.api.records.timeline.TimelineEntity +import org.apache.hadoop.yarn.conf.YarnConfiguration + +import org.apache.spark.deploy.SparkHadoopUtil +import org.apache.spark.deploy.history.yarn.YarnTimelineUtils._ +import org.apache.spark.deploy.history.yarn.rest.{JerseyBinding, TimelineQueryClient} +import org.apache.spark.deploy.history.{ApplicationHistoryInfo, ApplicationHistoryProvider, HistoryServer} +import org.apache.spark.scheduler.{ApplicationEventListener, SparkListenerBus} +import org.apache.spark.ui.SparkUI +import org.apache.spark.{SparkException, Logging, SecurityManager, SparkConf} + +/** + * A History provider which reads in the history from + * the YARN Timeline Service. + * + * The service is a remote HTTP service, so failure modes are + * different from simple file IO. + * + * 1. Application listings are asynchronous, and made on a schedule, though + * they can be forced (and the schedule disabled). + * 2. The results are cached and can be retrieved with [[getApplications()]]. + * 3. The most recent failure of any operation is stored, + * The [[getLastFailure()]] call will return the last exception + * or `None`. It is shared across threads so is primarily there for + * tests and basic diagnostics. + * 4. Listing the details of a single application in [[getAppUI()]] + * is synchronous and *not* cached. + * 5. the [[maybeCheckHealth()]] call performs a health check as the initial + * binding operation of this instance. 
This call invokes [[TimelineQueryClient.healthCheck()]] + * for better diagnostics on binding failures -particularly configuration problems. + * 6. Every REST call, synchronous or asynchronous, will invoke [[maybeCheckHealth()]] until + * the health check eventually succeeds. + * <p> + * If the timeline is not enabled, the API calls used by the web UI + * downgrade gracefully (returning empty entries), rather than fail. + * + * + * @param sparkConf configuration of the provider + */ +private[spark] class YarnHistoryProvider(sparkConf: SparkConf) + extends ApplicationHistoryProvider with Logging { + + /** + * The configuration here is a YarnConfiguration built off the spark configuration + * supplied in the constructor; this operation ensures that `yarn-default.xml` + * and `yarn-site.xml` are pulled in. Options in the spark conf will override + * those in the -default and -site XML resources which are not marked as final. + */ + private val yarnConf = { + new YarnConfiguration(SparkHadoopUtil.get.newConfiguration(sparkConf)) + } + + /** + * UI ACL option + */ + private val uiAclsEnabled = sparkConf.getBoolean("spark.history.ui.acls.enable", false) + + private val detailedInfo = sparkConf.getBoolean(YarnHistoryProvider.OPTION_DETAILED_INFO, false) + private val NOT_STARTED = "<Not Started>" + + /* minimum interval between each check for event log updates */ + private val refreshInterval = sparkConf.getLong(YarnHistoryProvider.OPTION_MIN_REFRESH_INTERVAL, + YarnHistoryProvider.DEFAULT_MIN_REFRESH_INTERVAL_SECONDS) * 1000 + + /** + * Window limit in milliseconds + */ + private val windowLimitMs = sparkConf.getLong(YarnHistoryProvider.OPTION_WINDOW_LIMIT, + YarnHistoryProvider.DEFAULT_WINDOW_LIMIT) * 1000 + + /** + * Number of events to get + */ + private val eventFetchLimit = sparkConf.getLong(YarnHistoryProvider.OPTION_EVENT_FETCH_LIMIT, + YarnHistoryProvider.DEFAULT_EVENT_FETCH_LIMIT) + + private val eventFetchOption: Option[Long] = if (eventFetchLimit > 0) 
Some(eventFetchLimit) else None + + /** + * Start time. Doesn't use the `now` call as tests can subclass that and + * it won't be valid until after the subclass has been constructed + */ + val serviceStartTime = System.currentTimeMillis() + + /** + * Timeline endpoint URI + */ + protected val timelineEndpoint = createTimelineEndpoint() + + /** + * The timeline query client which uses the `jersey` + * Jersey instance to talk to a timeline service running + * at [[timelineEndpoint]], and creates a timeline (write) client instance + * to handle token renewal + * + */ + protected val timelineQueryClient = { + createTimelineQueryClient() + } + + + /** + * Override point: create the timeline endpoint + * @return a URI to the timeline web service + */ + protected def createTimelineEndpoint(): URI = { + getTimelineEndpoint(yarnConf) + } + + /** + * Override point: create the timeline query client. + * This is called during instance creation. + * @return a timeline query client ot use for the duration + * of this instance + */ + protected def createTimelineQueryClient(): TimelineQueryClient = { + new TimelineQueryClient(timelineEndpoint, yarnConf, JerseyBinding.createClientConfig()) + } + + /** + * The empty listing, with a timestamp to indicate that the listing + * has never taken place. + */ + private val emptyListing = new ApplicationListingResults(0, Nil, None) + + /** + * List of applications. Initial result is empty + */ + private var applications: ApplicationListingResults = emptyListing + + /** + * Last exception seen and when + */ + protected var lastFailureCause: Option[(Throwable, Date)] = None + + private val refreshCount = new AtomicLong(0) + private val refreshFailedCount = new AtomicLong(0) + + /** + * Health marker + */ + private val healthy = new AtomicBoolean(false) + + /** + * Enabled flag + */ + private val _enabled = timelineServiceEnabled(yarnConf) + + /** + * Atomic boolean used to signal to the refresh thread that it + * must exit its loop. 
+ */ + private val stopRefresh = new AtomicBoolean(false) + + /** + * refresher + */ + val refresher = new Refresher() + + /** + * Initialize the provider + */ + init() + + /** + * Check the configuration and log whether or not it is enabled; + * if it is enabled then the URL is logged too. + */ + private def init(): Unit = { + if (!enabled) { + logError(YarnHistoryProvider.TEXT_SERVICE_DISABLED) + } else { + logInfo(YarnHistoryProvider.TEXT_SERVICE_ENABLED) + logInfo(YarnHistoryProvider.KEY_SERVICE_URL + ": " + timelineEndpoint) + logDebug(sparkConf.toDebugString) + // get the thread time + logInfo(s"refresh interval $refreshInterval milliseconds") + if (refreshInterval < 0) { + throw new Exception(YarnHistoryProvider.TEXT_INVALID_UPDATE_INTERVAL + + s": ${refreshInterval/1000}") + } + startRefreshThread() + } + } + + + /** + * Stop the service. After this point operations will fail. + */ + override def stop(): Unit = { + logDebug(s"Stopping $this") + // attempt to stop the refresh thread + if (!stopRefreshThread()) { + closeQueryClient() + } + + } + + /** + * Close the query client + */ + def closeQueryClient(): Unit = { + // and otherwise, stop the query client + logDebug("Stopping Timeline client") + timelineQueryClient.close() + } + + /** + * Is the timeline service (and therefore this provider) enabled. + * (override point for tests). + * + * Important: this is called during construction, so test-time subclasses + * will be invoked before their own construction has taken place. + * Code appropriately. + * @return true if the provider/YARN configuration enables the timeline + * service. + */ + def enabled: Boolean = { + _enabled + } + + /** + * Get the timeline query client. Used internally to ease testing + * @return the client. 
+ */ + def getTimelineQueryClient(): TimelineQueryClient = { + timelineQueryClient + } + + /** + * Set the last exception + * @param ex exception seen + */ + private def setLastFailure(ex: Throwable): Unit = { + setLastFailure(ex, now()) + } + + /** + * Set the last exception + * @param ex exception seen + * @param timestamp the timestamp of the failure + */ + private def setLastFailure(ex: Throwable, timestamp: Long): Unit = { + this.synchronized { + lastFailureCause = Some(ex, new Date(timestamp)) + } + } + + /** + * Reset the failure info + */ + private def resetLastFailure(): Unit = { + this.synchronized { + lastFailureCause = None + } + } + + /** + * Get the last exception + * @return the last exception or null + */ + def getLastFailure(): Option[(Throwable, Date)] = { + this.synchronized { + lastFailureCause + } + } + + /** + * Query for the connection being healthy + * @return + */ + def isHealthy(): Boolean = { + healthy.get() + } + + /** + * Get that the health flag itself. This allows test code to initialize it properly. + * Also: if accessed and set to false, it will trigger another health chek. + * @return + */ + protected def getHealthFlag(): AtomicBoolean = { + healthy; --- End diff -- It tries to bootstrap the connection with some basic health checks, rather than just exceptions. The common problem, based on my own experience, is getting the URL to the timeline service wrong, so having some HTML coming back instead of JSON. Jersey just generates some json-unmarshalling error there which is meaningless except to people that recognise that stack trace as "something other than Jersey came back". It's flipped to true briefly to disable the health check & stop re-entrant calls on different threads; if the single executing thread fails it is reverted back to unhealthy. That is: the first check assumes it will work.
--- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes to enable it, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- --------------------------------------------------------------------- To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org