Github user jerryshao commented on a diff in the pull request:

    https://github.com/apache/spark/pull/8744#discussion_r43221438
  
    --- Diff: 
yarn/src/history/main/scala/org/apache/spark/deploy/history/yarn/YarnHistoryService.scala
 ---
    @@ -0,0 +1,1328 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.spark.deploy.history.yarn
    +
    +import java.io.InterruptedIOException
    +import java.net.{ConnectException, URI}
    +import java.util.concurrent.atomic.{AtomicBoolean, AtomicInteger}
    +import java.util.concurrent.{LinkedBlockingDeque, TimeUnit}
    +
    +import scala.collection.JavaConverters._
    +import scala.collection.mutable
    +
    +import com.codahale.metrics.{Counter, MetricRegistry, Timer}
    +import org.apache.hadoop.security.UserGroupInformation
    +import org.apache.hadoop.yarn.api.records.timeline.{TimelineDomain, 
TimelineEntity, TimelineEvent}
    +import org.apache.hadoop.yarn.api.records.{ApplicationAttemptId, 
ApplicationId}
    +import org.apache.hadoop.yarn.client.api.TimelineClient
    +import org.apache.hadoop.yarn.conf.YarnConfiguration
    +
    +import org.apache.spark.deploy.history.yarn.YarnTimelineUtils._
    +import org.apache.spark.metrics.source.Source
    +import org.apache.spark.scheduler._
    +import org.apache.spark.scheduler.cluster.{SchedulerExtensionService, 
SchedulerExtensionServiceBinding}
    +import org.apache.spark.util.{SystemClock, Utils}
    +import org.apache.spark.{Logging, SparkContext}
    +
    +/**
    + * A Yarn Extension Service to post lifecycle events to a registered YARN 
Timeline Server.
    + *
    + * Posting algorithm
    + *
    + * 1. The service subscribes to all events coming from the Spark Context.
    + * 1. These events are serialized into JSON objects for publishing to the 
timeline service through
    + * HTTP(S) posts.
    + * 1. Events are buffered into `pendingEvents` until a batch is aggregated 
into a
    + * [[TimelineEntity]] for posting.
    + * 1. That aggregation happens when a lifecycle event (application 
start/stop) takes place,
    + * or the number of pending events in a running application exceeds the 
limit set in
    + * `spark.hadoop.yarn.timeline.batch.size`.
    + * 1. Posting operations take place in a separate thread from the spark 
event listener.
    + * 1. If an attempt to post to the timeline server fails, the service 
sleeps and then
    + * it is re-attempted after the retry period defined by
    + * `spark.hadoop.yarn.timeline.post.retry.interval`.
    + * 1. If the number of events buffered in the history service exceeds the 
limit set in
    + * `spark.hadoop.yarn.timeline.post.limit`, then further events other than 
application start/stop
    + * are dropped.
    + * 1. When the service is stopped, it will make a best-effort attempt to 
post all queued events.
    + * The call of [[stop()]] can block up to the duration of
    + * `spark.hadoop.yarn.timeline.shutdown.waittime` for this to take place.
    + * 1. No events are posted until the service receives a 
[[SparkListenerApplicationStart]] event.
    + *
    + * If the spark context has a metrics registry, then the internal counters 
of queued entities,
    + * post failures and successes, and the performance of the posting 
operation are all registered
    + * as metrics.
    + *
    + * The shutdown logic is somewhat convoluted, as the posting thread may be 
blocked on HTTP IO
    + * when the shutdown process begins. In this situation, the thread 
continues to be blocked, and
    + * will be interrupted once the wait time has expired. All time consumed 
during the ongoing
    + * operation will be counted as part of the shutdown time period.
    + */
    +private[spark] class YarnHistoryService extends SchedulerExtensionService
    +  with Logging with Source {
    +
    +  import org.apache.spark.deploy.history.yarn.YarnHistoryService._
    +
    +  /** Simple state model implemented in an atomic integer */
    +  private val _serviceState = new AtomicInteger(CreatedState)
    +
    +  /** Get the current state */
    +  def serviceState: Int = {
    +    _serviceState.get()
    +  }
    +
    +  /**
    +   * Enter a new state, return the old one. Atomic.
    +   * There are no checks on state model.
    +   * @param state new state
    +   * @return previous state
    +   */
    +  private def enterState(state: Int): Int = {
    +    logDebug(s"Entering state $state from $serviceState")
    +    _serviceState.getAndSet(state)
    +  }
    +
    +  /** Spark context; valid once started */
    +  private var sparkContext: SparkContext = _
    +
    +  /** YARN configuration from the spark context */
    +  private var config: YarnConfiguration = _
    +
    +  /** Application ID. */
    +  private var _applicationId: ApplicationId = _
    +
    +  /** Attempt ID; this will be `None` if the service is started in 
yarn-client mode */
    +  private var _attemptId: Option[ApplicationAttemptId] = None
    +
    +  /** YARN timeline client */
    +  private var _timelineClient: Option[TimelineClient] = None
    +
    +  /** Registered event listener */
    +  private var listener: Option[YarnEventListener] = None
    +
    +  /** Application name  from the spark start event */
    +  private var applicationName: String = _
    +
    +  /** Application ID */
    +  private var sparkApplicationId: Option[String] = None
    +
    +  /** Optional Attempt ID from the spark start event */
    +  private var sparkApplicationAttemptId: Option[String] = None
    +
    +  /** User name as derived from `SPARK_USER` env var or `UGI` */
    +  private var userName = Utils.getCurrentUserName()
    +
    +  /** Clock for recording time */
    +  private val clock = new SystemClock()
    +
    +  /** Start time of the application, as received in the start event. */
    +  private var startTime: Long = _
    +
    +  /** End time of the application, as received in the end event. */
    +  private var endTime: Long = _
    +
    +  /** Number of events to batch up before posting */
    +  private var _batchSize = DEFAULT_BATCH_SIZE
    +
    +  /** Queue of entities to asynchronously post, plus the number of events 
in each entry */
    +  private val _postingQueue = new LinkedBlockingDeque[PostQueueAction]()
    +
    +  /** Size of post queue events */
    +  private val postQueueEventSize = new Counter()
    +
    +  /** Limit on the total number of events permitted */
    +  private var _postQueueLimit = DEFAULT_POST_EVENT_LIMIT
    +
    +  /** List of events which will be pulled into a timeline entity when 
created */
    +  private var pendingEvents = new mutable.LinkedList[TimelineEvent]()
    +
    +  /** The received application started event; `None` if no event has been 
received */
    +  private var applicationStartEvent: Option[SparkListenerApplicationStart] 
= None
    +
    +  /** The received application end event; `None` if no event has been 
received */
    +  private var applicationEndEvent: Option[SparkListenerApplicationEnd] = 
None
    +
    +  /** Has a start event been processed? */
    +  private val appStartEventProcessed = new AtomicBoolean(false)
    +
    +  /** Has the application end event been processed? */
    +  private val appEndEventProcessed = new AtomicBoolean(false)
    +
    +  /** Counter of events processed -that is have been through handleEvent() 
*/
    +  private val _eventsProcessed = new Counter()
    +
    +  /** Counter of events queued. */
    +  private val _eventsQueued = new Counter()
    +
    +  /** Counter of number of attempts to post entities. */
    +  private val _entityPostAttempts = new Counter()
    +
    +  /** Counter of number of successful entity post operations. */
    +  private val _entityPostSuccesses = new Counter()
    +
    +  /** How many entity postings failed? */
    +  private val _entityPostFailures = new Counter()
    +
    +  /** How many entity postings were rejected? */
    +  private val _entityPostRejections = new Counter()
    +
    +  /** The number of events which were dropped as the backlog of pending 
posts was too big. */
    +  private val _eventsDropped = new Counter()
    +
    +  /** How many flushes have taken place? */
    +  private val flushCount = new Counter()
    +
    +  /** Event handler thread */
    +  private var entityPostThread: Option[Thread] = None
    +
    +  /** Flag to indicate the queue is stopped; events aren't being 
processed. */
    +  private val queueStopped = new AtomicBoolean(true)
    +
    +  /** Boolean to track when the post thread is active; Set and reset in 
the thread itself. */
    +  private val postThreadActive = new AtomicBoolean(false)
    +
    +  /** How long to wait in milliseconds for shutdown before giving up? */
    +  private var shutdownWaitTime = 0L
    +
    +  /** What is the initial and incrementing interval for POST retries? */
    +  private var retryInterval = 0L
    +
    +  /** Domain ID for entities: may be `None` */
    +  private var domainId: Option[String] = None
    +
    +  /** URI to timeline web application -valid after `serviceStart()` */
    +  private var _timelineWebappAddress: URI = _
    +
    +  /** Name for metrics: yarn_history */
    +  override val sourceName = METRICS_NAME
    +
    +  /** Metrics registry */
    +  override val metricRegistry = new MetricRegistry()
    +
    +  /** Timer to build up statistics on post operation times */
    +  private val postOperationTimer: Timer = 
metricRegistry.timer(MetricRegistry.name("posts"))
    +
    +  /**
    +   * Create a timeline client and start it. This does not update the
    +   * `timelineClient` field, though it does verify that the field
    +   * is unset.
    +   *
    +   * The method is private to the package so that tests can access it, 
which
    +   * some of the mock tests do to override the timeline client creation.
    +   * @return the timeline client
    +   */
    +  private[yarn] def createTimelineClient(): TimelineClient = {
    +    require(_timelineClient.isEmpty, "timeline client already set")
    +    YarnTimelineUtils.createTimelineClient(sparkContext)
    +  }
    +
    +  /**
    +   * Get the timeline client.
    +   * @return the client
    +   * @throws Exception if the timeline client is not currently running
    +   */
    +  def timelineClient: TimelineClient = {
    +    synchronized { _timelineClient.get }
    +  }
    +
    +  /**
    +   * Get the total number of events dropped due to the queue of
    +   * outstanding posts being too long.
    +   * @return counter of events dropped
    +   */
    +
    +  def eventsDropped: Long = {
    +    _eventsDropped.getCount
    +  }
    +
    +  /**
    +   * Get the total number of processed events, those handled in the 
back-end thread without
    +   * being rejected.
    +   *
    +   * @return counter of events processed
    +   */
    +  def eventsProcessed: Long = {
    +    _eventsProcessed.getCount
    +  }
    +
    +  /**
    +   * Get the total number of events queued.
    +   *
    +   * @return the total event count
    +   */
    +  def eventsQueued: Long = {
    +    _eventsQueued.getCount
    +  }
    +
    +  /**
    +   * Get the current size of the queue.
    +   *
    +   * @return the current queue length
    +   */
    +  def getQueueSize: Int = {
    +    _postingQueue.size()
    +  }
    +
    +  /**
    +   * Get the current batch size.
    +   *
    +   * @return the batch size
    +   */
    +  def batchSize: Int = {
    +    _batchSize
    +  }
    +
    +  /**
    +   * Query the counter of attempts to post entities to the timeline 
service.
    +   *
    +   * @return the current value
    +   */
    +  def postAttempts: Long = _entityPostAttempts.getCount
    +
    +  /**
    +   * Get the total number of failed post operations.
    +   *
    +   * @return counter of timeline post operations which failed
    +   */
    +  def postFailures: Long = {
    +    _entityPostFailures.getCount
    +  }
    +
    +  /**
    +   * Query the counter of successful post operations (this is not the same 
as the
    +   * number of events posted).
    +   *
    +   * @return the number of successful post operations.
    +   */
    +  def postSuccesses: Long = _entityPostSuccesses.getCount
    +
    +  /**
    +   * Is the asynchronous posting thread active?
    +   *
    +   * @return true if the post thread has started; false if it has not 
yet/ever started, or
    +   *         if it has finished.
    +   */
    +  def isPostThreadActive: Boolean = {
    +    postThreadActive.get
    +  }
    +
    +  /**
    +   * The YARN application ID of this history service.
    +   *
    +   * @return the application ID provided when the service started
    +   */
    +  def applicationId: ApplicationId = _applicationId
    +
    +  /**
    +   * The YARN attempt ID of this history service.
    +   *
    +   * @return the attempt ID provided when the service started
    +   */
    +  def attemptId: Option[ApplicationAttemptId] = _attemptId
    +
    +  /**
    +   * Reset the timeline client. Idempotent.
    +   *
    +   * 1. Stop the timeline client service if running.
    +   * 2. set the `timelineClient` field to `None`
    +   */
    +  def stopTimelineClient(): Unit = {
    +    synchronized {
    +      _timelineClient.foreach(_.stop())
    +      _timelineClient = None
    +    }
    +  }
    +
    +  /**
    +   * Create the timeline domain.
    +   *
    +   * A Timeline Domain is a uniquely identified 'namespace' for accessing 
parts of the timeline.
    +   * Security levels are managed at the domain level, so one is 
created if the
    +   * spark acls are enabled. Full access is then granted to the current 
user,
    +   * all users in the configuration options `"spark.modify.acls"` and 
`"spark.admin.acls"`;
    +   * read access to those users and those listed in `"spark.ui.view.acls"`
    +   *
    +   * @return an optional domain string. If `None`, then no domain was 
created.
    +   */
    +  private def createTimelineDomain(): Option[String] = {
    +    val sparkConf = sparkContext.getConf
    +    val aclsOn = sparkConf.getBoolean("spark.ui.acls.enable",
    +        sparkConf.getBoolean("spark.acls.enable", false))
    +    if (!aclsOn) {
    +      logDebug("ACLs are disabled; not creating the timeline domain")
    +      return None
    +    }
    +    val predefDomain = sparkConf.getOption(TIMELINE_DOMAIN)
    +    if (predefDomain.isDefined) {
    +      logDebug(s"Using predefined domain $predefDomain")
    +      return predefDomain
    +    }
    +    val current = UserGroupInformation.getCurrentUser.getShortUserName
    +    val adminAcls = stringToSet(sparkConf.get("spark.admin.acls", ""))
    +    val viewAcls = stringToSet(sparkConf.get("spark.ui.view.acls", ""))
    +    val modifyAcls = stringToSet(sparkConf.get("spark.modify.acls", ""))
    +
    +    val readers = (Seq(current) ++ adminAcls ++ modifyAcls ++ 
viewAcls).mkString(" ")
    +    val writers = (Seq(current) ++ adminAcls ++ modifyAcls).mkString(" ")
    +    val domain = DOMAIN_ID_PREFIX + _applicationId
    +    logInfo(s"Creating domain $domain with readers: $readers and writers: 
$writers")
    +
    +    // create the timeline domain with the reader and writer permissions
    +    val timelineDomain = new TimelineDomain()
    +    timelineDomain.setId(domain)
    +    timelineDomain.setReaders(readers)
    +    timelineDomain.setWriters(writers)
    +    try {
    +      timelineClient.putDomain(timelineDomain)
    +      Some(domain)
    +    } catch {
    +      case e: Exception =>
    +        logError(s"cannot create the domain $domain", e)
    +        // fallback to default
    +        None
    +    }
    +  }
    +
    +  /**
    +   * Start the service.
    +   *
    +   * @param binding binding to the spark application and YARN
    +   */
    +  override def start(binding: SchedulerExtensionServiceBinding): Unit = {
    +    val oldstate = enterState(StartedState)
    +    if (oldstate != CreatedState) {
    +      // state model violation
    +      _serviceState.set(oldstate)
    +      throw new IllegalArgumentException(s"Cannot start the service from 
state $oldstate")
    +    }
    +    val context = binding.sparkContext
    +    val appId = binding.applicationId
    +    val attemptId = binding.attemptId
    +    require(context != null, "Null context parameter")
    +    bindToYarnApplication(appId, attemptId)
    +    this.sparkContext = context
    +    this.config = new YarnConfiguration(context.hadoopConfiguration)
    +    val sparkConf = sparkContext.conf
    +
    +    // work out the attempt ID from the YARN attempt ID. No attempt, 
assume "1".
    +    val attempt1 = attemptId match {
    +      case Some(attempt) => attempt.getAttemptId.toString
    +      case None => CLIENT_BACKEND_ATTEMPT_ID
    +    }
    +    setContextAppAndAttemptInfo(Some(appId.toString), Some(attempt1))
    +    _batchSize = sparkConf.getInt(BATCH_SIZE, _batchSize)
    +    _postQueueLimit = sparkConf.getInt(POST_EVENT_LIMIT, _postQueueLimit)
    +    retryInterval = 1000 * sparkConf.getTimeAsSeconds(POST_RETRY_INTERVAL,
    +      DEFAULT_POST_RETRY_INTERVAL)
    +    shutdownWaitTime = 1000 * 
sparkConf.getTimeAsSeconds(SHUTDOWN_WAIT_TIME,
    +      DEFAULT_SHUTDOWN_WAIT_TIME)
    +
    +    // register metrics
    +    // the local registration always takes place, so test runs catch 
regressions.
    +    registerMetrics()
    +
    +    // the full metrics integration happens if the spark context has a 
metrics system
    +    val metrics = sparkContext.metricsSystem
    +    if (metrics != null) {
    +      metrics.registerSource(this)
    +    }
    +
    +    // set up the timeline service, unless it's been disabled for testing
    +    if (timelineServiceEnabled) {
    +      _timelineWebappAddress = getTimelineEndpoint(config)
    +
    +      logInfo(s"Starting $this")
    +      logInfo(s"Spark events will be published to the Timeline service" +
    +          s" at${_timelineWebappAddress}")
    +      _timelineClient = Some(createTimelineClient())
    +      domainId = createTimelineDomain()
    +      // declare that the processing is started
    +      queueStopped.set(false)
    +      val thread = new Thread(new EntityPoster(), "EventPoster")
    +      entityPostThread = Some(thread)
    +      thread.setDaemon(true)
    +      thread.start()
    +    } else {
    +      logInfo("Timeline service is disabled")
    +    }
    +    if (registerListener()) {
    +      logInfo(s"History Service listening for events: $this")
    +    } else {
    +      logInfo(s"History Service is not listening for events: $this")
    +    }
    +  }
    +
    +  /**
    +   * Check the service configuration to see if the timeline service is 
enabled.
    +   *
    +   * @return true if `YarnConfiguration.TIMELINE_SERVICE_ENABLED` is set.
    +   */
    +  def timelineServiceEnabled: Boolean = {
    +    YarnTimelineUtils.timelineServiceEnabled(config)
    +  }
    +
    +  /**
    +   * Register all system metrics into the [[metricRegistry]].
    +   */
    +  private def registerMetrics(): Unit = {
    +    metricRegistry.register("eventsProcessed", _eventsProcessed)
    +    metricRegistry.register("eventsQueued", _eventsQueued)
    +    metricRegistry.register("entityPostAttempts", _entityPostAttempts)
    +    metricRegistry.register("entityPostSuccesses", _entityPostSuccesses)
    +    metricRegistry.register("entityPostFailures", _entityPostFailures)
    +    metricRegistry.register("entityPostRejections", _entityPostRejections)
    +    metricRegistry.register("eventsDropped", _eventsDropped)
    +    metricRegistry.register("flushCount", flushCount)
    +  }
    +
    +  /**
    +   * Return a summary of the service state to help diagnose problems
    +   * during test runs, possibly even production.
    +   *
    +   * @return a summary of the current service state
    +   */
    +  override def toString(): String = {
    --- End diff --
    
    It would be better here to use:
    
    ```scala
    s"""
    |
    |
    """.stripMargin
    ```



---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at [email protected] or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to