Github user steveloughran commented on a diff in the pull request:
https://github.com/apache/spark/pull/8744#discussion_r42531200
--- Diff:
yarn/history/src/main/scala/org/apache/spark/deploy/history/yarn/YarnHistoryService.scala
---
@@ -0,0 +1,1048 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.deploy.history.yarn
+
+import java.net.{ConnectException, URI}
+import java.util.concurrent.atomic.{AtomicLong, AtomicBoolean,
AtomicInteger}
+import java.util.concurrent.{TimeUnit, LinkedBlockingDeque}
+
+import scala.collection.JavaConverters._
+import scala.collection.mutable
+
+import org.apache.hadoop.security.UserGroupInformation
+import org.apache.hadoop.yarn.api.records.timeline.{TimelineDomain,
TimelineEntity, TimelineEvent, TimelinePutResponse}
+import org.apache.hadoop.yarn.api.records.{ApplicationAttemptId,
ApplicationId}
+import org.apache.hadoop.yarn.client.api.TimelineClient
+import org.apache.hadoop.yarn.conf.YarnConfiguration
+
+import org.apache.spark.deploy.history.yarn.YarnTimelineUtils._
+import org.apache.spark.scheduler._
+import org.apache.spark.scheduler.cluster.{YarnExtensionService,
YarnExtensionServiceBinding}
+import org.apache.spark.util.{SystemClock, Utils}
+import org.apache.spark.{Logging, SparkContext}
+
+/**
+ * A Yarn Extension Service to post lifecycle events to a registered
+ * YARN Timeline Server.
+ */
+private[spark] class YarnHistoryService extends YarnExtensionService with
Logging {
+
+ import org.apache.spark.deploy.history.yarn.YarnHistoryService._
+
+ /** Simple state model implemented in an atomic integer */
+ private val _serviceState = new AtomicInteger(CreatedState)
+
+ /** Current lifecycle state of the service, as held in the atomic state integer. */
+ def serviceState: Int = _serviceState.get()
+ /**
+ * Atomically move the service into a new state.
+ * The previous state is taken from the same atomic `getAndSet` call whose
+ * result is returned, so the debug log cannot report a different prior
+ * state than the one actually replaced.
+ * @param state the state to enter
+ * @return the state the service was in immediately before this call
+ */
+ def enterState(state: Int): Int = {
+ val previous = _serviceState.getAndSet(state)
+ logDebug(s"Entering state $state from $previous")
+ previous
+ }
+
+ /**
+ * Spark context; valid once started
+ */
+ private var sparkContext: SparkContext = _
+
+ /** YARN configuration from the spark context */
+ private var config: YarnConfiguration = _
+
+ /** application ID. */
+ private var _applicationId: ApplicationId = _
+
+ /** Attempt ID; this will be `None` if the service is started in yarn-client mode. */
+ private var _attemptId: Option[ApplicationAttemptId] = None
+
+ /** YARN timeline client */
+ private var _timelineClient: Option[TimelineClient] = None
+
+ /** registered event listener */
+ private var listener: Option[YarnEventListener] = None
+
+ /** Application name from the spark start event */
+ private var applicationName: String = _
+
+ /** Spark application ID, if known. */
+ private var sparkApplicationId: Option[String] = None
+
+ /** Optional Attempt ID from the spark start event */
+ private var sparkApplicationAttemptId: Option[String] = None
+
+ /** user name as derived from `SPARK_USER` env var or `UGI` */
+ private var userName = Utils.getCurrentUserName()
+
+ /** Clock for recording time */
+ private val clock = new SystemClock()
+
+ /**
+ * Start time of the application, as received in the start event.
+ */
+ private var startTime: Long = _
+
+ /**
+ * End time of the application, as received in the end event.
+ */
+ private var endTime: Long = _
+
+ /** Number of events to batch up before posting. */
+ private var _batchSize = DEFAULT_BATCH_SIZE
+
+ /** Queue of timeline entities awaiting asynchronous posting; each entity may carry multiple events. */
+ private var _entityQueue = new LinkedBlockingDeque[TimelineEntity]()
+
+ /** limit on the total number of events permitted */
+ private var _postQueueLimit = DEFAULT_POST_QUEUE_LIMIT
+
+ /**
+ * List of events which will be pulled into a timeline
+ * entity when created
+ */
+ private var pendingEvents = new mutable.LinkedList[TimelineEvent]()
+
+ private var applicationStartEvent: Option[SparkListenerApplicationStart]
= None
+ private var applicationEndEvent: Option[SparkListenerApplicationEnd] =
None
+
+ /** Has a start event been processed? */
+ private val appStartEventProcessed = new AtomicBoolean(false)
+
+ /** Has the application end event been processed? */
+ private val appEndEventProcessed = new AtomicBoolean(false)
+
+ /** Counter of events processed, that is, events that have been through `handleEvent()`. */
+ private val _eventsProcessed = new AtomicLong(0)
+
+ /** counter of events queued. */
+ private val _eventsQueued = new AtomicLong(0)
+
+ private val _entityPostAttempts = new AtomicLong(0)
+ private val _entityPostSuccesses = new AtomicLong(0)
+ /** how many entity postings failed? */
+ private val _entityPostFailures = new AtomicLong(0)
+ private val _eventsDropped = new AtomicLong(0)
+
+ /** how many flushes have taken place? */
+ private val flushCount = new AtomicLong(0)
+
+ /** Event handler */
+ private var eventHandlingThread: Option[Thread] = None
+
+ /**
+ * Flag to indicate the thread is stopped; events aren't being
+ * processed.
+ */
+ private val stopped = new AtomicBoolean(true)
+
+ /**
+ * Boolean to track whether a thread is active or not, for tests to
+ * monitor and see if the thread has completed.
+ */
+ private val postThreadActive = new AtomicBoolean(false)
+
+ /** How long to wait for shutdown before giving up, in milliseconds. */
+ private var shutdownWaitTime = 0L
--- End diff --
The units are milliseconds; this is now noted in the Javadocs.
---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at [email protected] or file a JIRA ticket
with INFRA.
---
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]