Github user vanzin commented on a diff in the pull request:

    https://github.com/apache/spark/pull/8744#discussion_r39577849
  
    --- Diff: 
yarn/history/src/main/scala/org/apache/spark/deploy/history/yarn/YarnHistoryService.scala
 ---
    @@ -0,0 +1,981 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.spark.deploy.history.yarn
    +
    +import java.net.{ConnectException, URI}
    +import java.util.concurrent.LinkedBlockingQueue
    +import java.util.concurrent.atomic.{AtomicBoolean, AtomicInteger}
    +
    +import scala.collection.mutable.LinkedList
    +import scala.util.control.NonFatal
    +
    +import org.apache.hadoop.conf.Configuration
    +import org.apache.hadoop.security.UserGroupInformation
    +import org.apache.hadoop.service.{AbstractService, Service}
    +import org.apache.hadoop.yarn.api.records.{ApplicationAttemptId, 
ApplicationId}
    +import org.apache.hadoop.yarn.api.records.timeline.{TimelineDomain, 
TimelineEntity, TimelinePutResponse}
    +import org.apache.hadoop.yarn.client.api.TimelineClient
    +import org.apache.hadoop.yarn.conf.YarnConfiguration
    +
    +import org.apache.spark.deploy.history.yarn.YarnTimelineUtils._
    +import org.apache.spark.scheduler._
    +import org.apache.spark.scheduler.cluster.YarnExtensionService
    +import org.apache.spark.util.{SystemClock, Clock, Utils}
    +import org.apache.spark.{Logging, SparkContext}
    +
    +/**
    + * Implements a Hadoop service with the init/start logic replaced by that
    + * of the YarnService.
    + * <p>
    + * As [[AbstractService]] implements `close()`, routing
    + * to its `stop` method, calling `close()` is sufficient
    + * to stop the service instance.
    + * <p>
    + * However, when registered to receive spark events, the service will 
continue to
    + * receive them until the spark context is stopped. Events received when 
this service
    + * is in a `STOPPED` state will be discarded.
    + */
    +private[spark] class YarnHistoryService  extends AbstractService("History 
Service")
    +  with YarnExtensionService with Logging {
    +
    +  import org.apache.spark.deploy.history.yarn.YarnHistoryService._
    +  /**
    +   * Spark context; valid once started
    +   */
    +  private var sparkContext: SparkContext = _
    +
    +  /** application ID. */
    +  private var _applicationId: ApplicationId = _
    +
    +  /** attempt ID; this will be `None` if the service is started in yarn-client mode */
    +  private var _attemptId: Option[ApplicationAttemptId] = None
    +
    +  /** YARN timeline client */
    +  private var timelineClient: Option[TimelineClient] = None
    +
    +  /** registered event listener */
    +  private var listener: Option[YarnEventListener] = None
    +
    +  /** Application name */
    +  private var appName: String = _
    +
    +  /** Application ID from the spark start event */
    +  private var sparkApplicationId: Option[String] = None
    +
    +  /** Optional Attempt ID from the spark start event */
    +  private var sparkApplicationAttemptId: Option[String] = None
    +
    +  /** user name as derived from `SPARK_USER` env var or `UGI` */
    +  private var userName: String = Utils.getCurrentUserName
    +
    +  /**
    +   * Clock for recording time
    +   */
    +  private val clock: Clock = new SystemClock()
    +
    +  /**
    +   * Start time of the application, as received in the start event.
    +   */
    +  private var startTime: Long = _
    +
    +  /** number of events to batch up before posting*/
    +  private var batchSize: Int = DEFAULT_BATCH_SIZE
    +
    +  /** queue of actions*/
    +  private val actionQueue = new LinkedBlockingQueue[QueuedAction]
    +
    +  /** cache layer to handle timeline client failure.*/
    +  private var entityList = new LinkedList[TimelineEntity]
    +
    +  /** current entity; will be created on demand. */
    +  private var curEntity: Option[TimelineEntity] = None
    +
    +  /** Has a start event been processed? */
    +  private val appStartEventProcessed = new AtomicBoolean(false)
    +
    +  /** Has the application end event been processed? */
    +  private val appEndEventProcessed = new AtomicBoolean(false)
    +
    +  /** How many events have been received in the current entity */
    +  private var currentEventCount = 0
    +
    +  /** counter of events processed - that is, events which have been through handleEvent() */
    +  private val eventsProcessed: AtomicInteger = new AtomicInteger(0)
    +
    +  /** counter of events queued. */
    +  private val eventsQueued: AtomicInteger = new AtomicInteger(0)
    +
    +  /** how many event postings failed? */
    +  private val eventPostFailures: AtomicInteger = new AtomicInteger(0)
    +
    +  /** how many flushes have taken place? */
    +  private val flushCount = new AtomicInteger(0)
    +
    +  /** Event handler */
    +  private var eventHandlingThread: Option[Thread] = None
    +
    +  /**
    +   * Flag to indicate the thread is stopped; events aren't being
    +   * processed.
    +   */
    +  private val stopped: AtomicBoolean = new AtomicBoolean(true)
    +
    +  /**
    +   * boolean to track whether a thread is active or not, for tests to
    +   * monitor and see if the thread has completed.
    +   */
    +  private val postThreadActive: AtomicBoolean = new AtomicBoolean(false)
    +
    +  /**
    +   * object used for a lock on entity operations
    +   */
    +  private val entityLock: AnyRef = new AnyRef
    +
    +  /**
    +   * How long to wait for shutdown before giving up
    +   */
    +  private var maxTimeToWaitOnShutdown: Long = SHUTDOWN_WAIT_TIME
    +
    +  /** Domain ID for entities: may be null */
    +  private var domainId: String = null
    +
    +  /** URI to timeline web application -valid after `serviceStart()` */
    +  private var timelineWebappAddress: URI = _
    +
    +  /**
    +   * Build a new timeline client by delegating to
    +   * `YarnTimelineUtils.createTimelineClient`, passing the spark context.
    +   * The `timelineClient` field is not assigned here; the method only
    +   * checks that the field is still empty and fails the `require` otherwise.
    +   *
    +   * NOTE(review): the original comment states that the returned client has been
    +   * started; that depends on the delegate, which is not visible here - confirm.
    +   *
    +   * Package-private so that tests (some mock tests override client creation)
    +   * can reach it.
    +   * @return the newly created timeline client
    +   */
    +  private[yarn] def createTimelineClient(): TimelineClient = {
    +    // guard: refuse to create a second client while one is still registered
    +    require(timelineClient.isEmpty, "timeline client already set")
    +    val newClient = YarnTimelineUtils.createTimelineClient(sparkContext)
    +    newClient
    +  }
    +
    +  /**
    +   * Get the timeline client.
    +   * @return the client currently held in the `timelineClient` field
    +   * @throws Exception if the `timelineClient` field is empty, that is, no
    +   *                   timeline client is currently running
    +   */
    +  def getTimelineClient: TimelineClient = {
    +    timelineClient.getOrElse(throw new Exception("Timeline client not 
running"))
    +  }
    +
    +  /**
    +   * Total number of events processed so far.
    +   * @return the current value of the `eventsProcessed` counter
    +   */
    +  def getEventsProcessed: Int = eventsProcessed.get
    +
    +  /**
    +   * Total number of events queued so far.
    +   * @return the current value of the `eventsQueued` counter
    +   */
    +  def getEventsQueued: Int = eventsQueued.get
    +
    +  /**
    +   * Current length of the action queue.
    +   * @return the number of actions held in `actionQueue` at the time of the call
    +   */
    +  def getQueueSize: Int = actionQueue.size
    +
    +  /**
    +   * Current batch size: the number of events to batch up before posting.
    +   * @return the value of the `batchSize` field
    +   */
    +  def getBatchSize: Int = batchSize
    +
    +  /**
    +   * Total number of event postings which have failed.
    +   * @return the current value of the `eventPostFailures` counter
    +   */
    +  def getEventPostFailures: Int = eventPostFailures.get
    +
    +  /**
    +   * Is the asynchronous posting thread active?
    +   * @return the current value of the `postThreadActive` flag: true while the
    +   *         post thread is marked as active; false if it has not yet (or never)
    +   *         started, or if it has finished.
    +   */
    +  def isPostThreadActive: Boolean = postThreadActive.get
    +
    +  /**
    +   * The YARN application ID of this history service.
    +   * @return the value held in the `_applicationId` field; the field has no
    +   *         initial value, so this is null until one has been assigned
    +   */
    +  def applicationId: ApplicationId = _applicationId
    +
    +  /**
    +   * The YARN attempt ID of this history service.
    +   * @return the value held in the `_attemptId` field; `None` until an
    +   *         attempt ID has been assigned
    +   */
    +  def attemptId: Option[ApplicationAttemptId] = _attemptId
    +
    +  /**
    +   * Reset the timeline client.
    +   * <p>
    +   * 1. Hand the current `timelineClient` value to `stopOptionalService`.
    +   * 2. Clear the `timelineClient` field by setting it to `None`.
    +   */
    +  def stopTimelineClient(): Unit = {
    +    // capture the current client (if any) before the field is cleared
    +    val currentClient = timelineClient
    +    stopOptionalService(currentClient)
    +    timelineClient = None
    +  }
    +
    +  /**
    +   * Create the timeline domain
    --- End diff --
    
    It would be nice to have a better doc - like, explaining what the timeline 
domain is.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org

Reply via email to