GitHub user steveloughran commented on a diff in the pull request:

    https://github.com/apache/spark/pull/5423#discussion_r28507293
  
    --- Diff: 
yarn/history/src/main/scala/org/apache/spark/deploy/history/yarn/YarnHistoryService.scala
 ---
    @@ -0,0 +1,630 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.spark.deploy.history.yarn
    +
    +import java.net.{ConnectException, URL}
    +import java.util.concurrent.LinkedBlockingQueue
    +import java.util.concurrent.atomic.{AtomicBoolean, AtomicInteger}
    +
    +import scala.collection.mutable.LinkedList
    +
    +import org.apache.hadoop.conf.Configuration
    +import org.apache.hadoop.security.UserGroupInformation
    +import org.apache.hadoop.service.{AbstractService, Service}
    +import org.apache.hadoop.yarn.api.records.ApplicationId
    +import org.apache.hadoop.yarn.api.records.timeline.{TimelineDomain, 
TimelineEntity, TimelinePutResponse}
    +import org.apache.hadoop.yarn.client.api.TimelineClient
    +import org.apache.hadoop.yarn.conf.YarnConfiguration
    +import org.apache.hadoop.yarn.exceptions.YarnException
    +
    +import org.apache.spark.deploy.history.yarn.YarnTimelineUtils._
    +import org.apache.spark.scheduler._
    +import org.apache.spark.scheduler.cluster.YarnService
    +import org.apache.spark.{Logging, SparkContext}
    +
    +/**
    + * Implements a Hadoop service with the init/start logic replaced by that
    + * of the YarnService.
    + * <p>
    + * As <code>AbstractService</code> implements <code>close()</code>, routing
    + * to its <code>stop</code> method, calling <code>close()</code> is 
sufficient
    + * to stop the service instance.
    + * <p>
    + * However, when registered to receive spark events, the service will 
continue to
    + * receive them until the spark context is stopped. Events received when 
this service
    + * is in a <code>STOPPED</code> state will be discarded.
    + */
    +private[spark] class YarnHistoryService  extends AbstractService("History 
Service")
    +  with YarnService with Logging {
    +
    // Spark context handed to start(); its Hadoop configuration is used to
    // initialise the timeline client and its listener bus receives our listener.
    private var sc: SparkContext = _
    // Application id handed to start(); used to build the timeline domain id.
    private var appId: ApplicationId = _
    // None until createTimelineClient runs; reset to None by resetTimelineClient().
    private var timelineClient: Option[TimelineClient] = None
    // Non-null once registerListener has attached to the Spark listener bus.
    private var listener: YarnEventListener = _
    // NOTE(review): presumably populated from application lifecycle events; confirm.
    private var appName: String = null
    private var userName: String = null
    private var startTime: Long = _

    // May be overridden from SparkConf (YarnHistoryService.BATCH_SIZE) in start().
    private var batchSize: Int = YarnHistoryService.DEFAULT_BATCH_SIZE

    // Events are queued here so that the posting thread is not blocked.
    private val eventQueue = new LinkedBlockingQueue[TimestampEvent]()
    // Cache layer used to ride out timeline client failures.
    // NOTE(review): scala.collection.mutable.LinkedList is deprecated since
    // Scala 2.11; consider ListBuffer/Queue when the users of this field are revisited.
    private var entityList: LinkedList[TimelineEntity] = new LinkedList[TimelineEntity]
    private var curEntity: Option[TimelineEntity] = None
    // Whether the current entity has enough information filled in
    // (application start / application end).
    private var bAppStart = false
    private var bAppEnd = false
    // Number of events saved so far.
    private var curEventNum = 0
    // Diagnostic counters, exposed through the get* accessors and toString.
    private val eventsProcessed = new AtomicInteger(0)
    private val eventsQueued = new AtomicInteger(0)
    private val eventPostFailures = new AtomicInteger(0)
    private val flushCount = new AtomicInteger(0)
    // Runs Dequeue; created and started by serviceStart when ATS is enabled.
    private var eventHandlingThread: Thread = null
    // Starts out true; enqueue() discards events while this flag is set.
    private val stopped = new AtomicBoolean(true)
    // NOTE(review): presumably a monitor for synchronized sections; confirm.
    private final val lock = new AnyRef
    // NOTE(review): presumably bounds the shutdown wait; confirm units.
    private var maxTimeToWaitOnShutdown: Long = YarnHistoryService.SHUTDOWN_WAIT_TIME
    // Incremented on every createTimelineClient call, so it counts client
    // creations (including the first one), not only failures.
    private var clientFailure = 0
    // Timeline domain id assigned in serviceStart; null when no domain is used.
    private var domainId: String = null
    // ATS web application URL, resolved in serviceStart when ATS is enabled.
    private var timelineWebappAddress: URL = _
    +
    +
    +  /**
    +   * Create a timeline client
    +   * @return the timeline client
    +   */
    def createTimelineClient: TimelineClient = {
      // Counts every creation (the first included); it is only logged here.
      clientFailure += 1
      logInfo(s"Creating timelineClient $clientFailure")
      val client = TimelineClient.createTimelineClient()
      // The client is configured from the Spark context's Hadoop configuration.
      client.init(sc.hadoopConfiguration)
      client.start()
      // Cache the client so getTimelineClient reuses it.
      timelineClient = Some(client)
      client
    }
    +
    +  /**
    +   * Get the timeline client; this will create it if needed
    +   * @return the client
    +   */
    def getTimelineClient: TimelineClient = timelineClient match {
      // Reuse the cached client when one exists ...
      case Some(existing) => existing
      // ... otherwise build, start and cache a new one.
      case None => createTimelineClient
    }
    +
    /** Current value of the `eventsProcessed` counter. */
    def getEventsProcessed: Int = eventsProcessed.get()

    /** Current value of the `eventsQueued` counter (incremented by innerEnqueue). */
    def getEventsQueued: Int = eventsQueued.get()

    /** Number of events currently waiting in `eventQueue`. */
    def getQueueSize: Int = eventQueue.size()

    /** Batch size in effect; start() may have overridden the default from SparkConf. */
    def getBatchSize: Int = batchSize

    /** Current value of the `eventPostFailures` counter. */
    def getEventPostFailures: Int = eventPostFailures.get()
    +
    +  /**
    +   * Reset the timeline client
    +   * <p>
    +   * 1. Stop the timeline client service if running.
    +   * 2. set the <code>timelineClient</code> field to <code>None</code>
    +   */
    def resetTimelineClient(): Unit = {
      val current = timelineClient
      // Stop the cached client (if any), then forget it so that the next
      // getTimelineClient call creates a fresh one.
      stopOptionalService(current)
      timelineClient = None
    }
    +
    +  /**
    +   * Split a comma separated String, filter out any empty items, and 
return a Set of strings
    +   */
    private def stringToSet(list: String): Set[String] =
      // Trim each comma-separated token and drop the blank ones.
      list.split(',').iterator.map(_.trim).filterNot(_.isEmpty).toSet
    +
    /**
     * Work out the timeline domain for this application's entities.
     *
     * - If ACLs are off (`spark.acls.enable`, falling back to
     *   `spark.ui.acls.enable`, default false), no domain is used: returns null.
     * - If `spark.ui.domain` is set, that predefined domain id is returned.
     * - Otherwise a domain `DOMAIN_ID_PREFIX + appId` is registered with ATS.
     *   Its readers are the current user plus the admin, modify and view ACLs.
     *   Its writers are the current user plus the admin and modify ACLs.
     *   Returns null if ATS rejects the domain with a YarnException.
     *
     * @return the domain id to use, or null for none
     */
    private def createTimelineDomain(): String = {
      val sparkConf = sc.getConf
      val aclsOn = sparkConf.getOption("spark.acls.enable").getOrElse(
        sparkConf.get("spark.ui.acls.enable", "false")).toBoolean
      if (!aclsOn) {
        null
      } else {
        sparkConf.getOption("spark.ui.domain") match {
          case Some(predefined) =>
            // Previously this set domainId but returned null. The caller then
            // assigned the null back to domainId and lost the predefined domain.
            domainId = predefined
            predefined
          case None =>
            val current = UserGroupInformation.getCurrentUser.getShortUserName
            val adminAcls = stringToSet(sparkConf.get("spark.admin.acls", ""))
            val viewAcls = stringToSet(sparkConf.get("spark.ui.view.acls", ""))
            val modifyAcls = stringToSet(sparkConf.get("spark.modify.acls", ""))

            val readers = (current +: (adminAcls ++ modifyAcls ++ viewAcls).toSeq).mkString(" ")
            val writers = (current +: (adminAcls ++ modifyAcls).toSeq).mkString(" ")
            val newId = YarnHistoryService.DOMAIN_ID_PREFIX + appId
            logInfo(s"Creating domain $newId with  readers: $readers and writers:$writers")
            val timelineDomain = new TimelineDomain()
            timelineDomain.setId(newId)
            timelineDomain.setReaders(readers)
            timelineDomain.setWriters(writers)
            try {
              getTimelineClient.putDomain(timelineDomain)
              newId
            } catch {
              case e: YarnException =>
                // Fall back to the default (no domain), but keep the cause in the log.
                logError("cannot create the domain", e)
                null
            }
        }
      }
    }
    +
    /**
     * Bind the service to a Spark context and application, then drive it
     * through the Hadoop init/start lifecycle.
     *
     * @param context Spark context supplying configuration and the listener bus
     * @param id      YARN application id
     * @return true if the YARN timeline service is enabled in the configuration
     *         derived from the context's Hadoop configuration
     */
    def start(context: SparkContext, id: ApplicationId): Boolean = {
      sc = context
      appId = id
      val yarnConf = new YarnConfiguration(context.hadoopConfiguration)
      // init() enforces the Hadoop service state machine and rejects
      // out-of-sequence invocations.
      init(yarnConf)
      batchSize = sc.conf.getInt(YarnHistoryService.BATCH_SIZE, batchSize)
      start()
      val timelineEnabled = yarnConf.getBoolean(
        YarnConfiguration.TIMELINE_SERVICE_ENABLED,
        YarnConfiguration.DEFAULT_TIMELINE_SERVICE_ENABLED)
      if (!timelineEnabled) {
        logInfo("Yarn timeline service not available, disabling client.")
      }
      timelineEnabled
    }
    +
    /**
     * Hadoop service start hook.
     *
     * If the YARN timeline service is enabled in the service configuration,
     * this method:
     * - resolves the ATS web application address,
     * - creates the timeline client and the timeline domain,
     * - starts the event handling thread.
     *
     * The Spark listener is registered either way.
     */
    override protected def serviceStart(): Unit = {
      super.serviceStart()
      val conf: Configuration = getConfig
      val timelineEnabled = conf.getBoolean(
        YarnConfiguration.TIMELINE_SERVICE_ENABLED,
        YarnConfiguration.DEFAULT_TIMELINE_SERVICE_ENABLED)
      if (timelineEnabled) {
        timelineWebappAddress = timelineWebappUri(conf).toURL
        createTimelineClient
        domainId = createTimelineDomain()
        // NOTE(review): `stopped` is initialised to true and is not cleared here.
        // enqueue() discards events while it is set, so confirm that it is cleared
        // elsewhere (e.g. by the event handling thread).
        eventHandlingThread = new Thread(new Dequeue(), "HistoryEventHandlingThread")
        eventHandlingThread.start()
      } else {
        logInfo("Yarn timeline service not available")
      }
      // Irrespective of ATS state, hook up to the Spark listener bus.
      registerListener
      logInfo(s"$this")
    }
    +
    +  /**
    +   * Return a summary of the service state to help diagnose problems
    +   * during test runs, possibly even production
    +   * @return a summary of the current service state
    +   */
    override def toString: String = {
      // `${...}` is required around getFlushCount: the old "$getFlushCount()"
      // interpolated only the identifier and appended a literal "()".
      // The cumulative enqueue counter and the live queue size are now labelled separately.
      super.toString +
        s" endpoint=$timelineWebappAddress" +
        s" bonded to ATS=$bondedToATS" +
        s" listening=$listening" +
        s" batchSize=$batchSize" +
        s" flush count=${getFlushCount}" +
        s" events queued=$getEventsQueued queue size=$getQueueSize" +
        s" processed=$getEventsProcessed" +
        s" post failures = $getEventPostFailures"
    }
    +
    /**
     * Whether registerListener has attached a listener to the Spark listener bus.
     * @return true once a listener has been created
     */
    def listening: Boolean = listener != null
    +
    +  /**
    +   * Is the service hooked up to an ATS server. This does not
    +   * check the validity of the link, only whether or not the service
    +   * has been set up to talk to ATS.
    +   * @return true if the service has a timeline client
    +   */
    def bondedToATS: Boolean = timelineClient.isDefined
    +
    +  /**
    +   * Add the listener if it is not disabled.
    +   * This is accessible in the same package purely for testing
    +   * @return true if the register was enabled
    +   */
    private[yarn] def registerListener: Boolean = {
      assert(sc != null, "Null context")
      // Listener registration can be switched off via YarnHistoryService.REGISTER_LISTENER.
      if (sc.conf.getBoolean(YarnHistoryService.REGISTER_LISTENER, true)) {
        // Use the Logging helpers, as the rest of the class does.
        logDebug("Registering listener to spark context")
        listener = new YarnEventListener(sc, this)
        sc.listenerBus.addListener(listener)
        true
      } else {
        logInfo("Not listening to spark context events")
        false
      }
    }
    +
    +  /**
    +   * Queue an event, or if the service's <code>stopped</code> flag
    +   * is set, discard the event.
    +   * @param event
    +   * @return true if the event was queued
    +   */
    def enqueue(event: TimestampEvent): Boolean = {
      // Sample the flag once; the same value decides both the action and the result.
      val accepting = !stopped.get()
      if (accepting) {
        innerEnqueue(event)
      } else {
        logInfo(s"History service stopped; ignoring queued event : ${event}")
      }
      accepting
    }
    +
    +  /**
    +   * Inner operation to queue the event. This does not check for service 
state
    +   * @param event
    +   */
    +  private def innerEnqueue(event: TimestampEvent) = {
    +    eventsQueued.incrementAndGet();
    +    log.debug(s"Enqueue ${event}")
    --- End diff --
    
    fixed


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and would like it, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org

Reply via email to