Github user andrewor14 commented on a diff in the pull request:
https://github.com/apache/spark/pull/5144#discussion_r27171549
--- Diff: core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosClusterScheduler.scala ---
@@ -0,0 +1,552 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.scheduler.cluster.mesos
+
+import java.io.File
+import java.text.SimpleDateFormat
+import java.util.{Date, List => JList}
+import java.util.concurrent.atomic.AtomicLong
+import java.util.concurrent.locks.ReentrantLock
+
+import scala.collection.JavaConversions._
+import scala.collection.mutable
+import scala.collection.mutable.ArrayBuffer
+import scala.concurrent.duration.Duration
+
+import org.apache.mesos.{Scheduler, SchedulerDriver}
+import org.apache.mesos.Protos._
+import org.apache.mesos.Protos.Environment.Variable
+import org.apache.mesos.Protos.TaskStatus.Reason
+
+import org.apache.spark.{SparkConf, SparkException}
+import org.apache.spark.deploy.master.DriverState
+import org.apache.spark.deploy.master.DriverState.DriverState
+import org.apache.spark.deploy.mesos.MesosDriverDescription
+import org.apache.spark.util.Utils
+
+private[spark] class DriverSubmission(
+    val submissionId: String,
+    val desc: MesosDriverDescription,
+    val submitDate: Date) extends Serializable {
+
+  def canEqual(other: Any): Boolean = other.isInstanceOf[DriverSubmission]
+
+  override def equals(other: Any): Boolean = other match {
+    case that: DriverSubmission =>
+      (that canEqual this) && submissionId == that.submissionId
+    case _ => false
+  }
+
+  // Keep hashCode consistent with equals, which compares only submissionId.
+  override def hashCode(): Int = submissionId.hashCode
+}
+
+private[spark] case class ClusterTaskState(
+    submission: DriverSubmission,
+    taskId: TaskID,
+    slaveId: SlaveID,
+    var taskState: Option[TaskStatus],
+    var driverState: DriverState,
+    var startDate: Date,
+    lastRetry: Option[RetryState] = None) extends Serializable {
+
+  def copy(): ClusterTaskState = {
+    ClusterTaskState(submission, taskId, slaveId, taskState, driverState,
+      startDate, lastRetry)
+  }
+}
+
+private[spark] case class SubmitResponse(id: String, success: Boolean, message: String)
+
+private[spark] case class StatusResponse(
+    id: String,
+    success: Boolean,
+    state: String,
+    status: Option[TaskStatus] = None)
+
+private[spark] case class KillResponse(id: String, success: Boolean, message: String)
+
+private[spark] case class ClusterSchedulerState(
+    appId: String,
+    queuedDrivers: Iterable[DriverSubmission],
+    launchedDrivers: Iterable[ClusterTaskState],
+    finishedDrivers: Iterable[ClusterTaskState],
+    retryList: Iterable[RetryState])
+
+private[spark] trait ClusterScheduler {
+  def submitDriver(desc: MesosDriverDescription): SubmitResponse
+
+  def killDriver(submissionId: String): KillResponse
+
+  def getStatus(submissionId: String): StatusResponse
+
+  def getState(): ClusterSchedulerState
+}
+
+private[spark] class MesosClusterScheduler(
+    engineFactory: ClusterPersistenceEngineFactory,
+    conf: SparkConf) extends Scheduler with MesosSchedulerHelper with ClusterScheduler {
+
+  var frameworkUrl: String = _
+
+  val master = conf.get("spark.master")
+  val appName = conf.get("spark.app.name")
+  val queuedCapacity = conf.getInt("spark.deploy.mesos.queuedDrivers", 200)
+  val retainedDrivers = conf.getInt("spark.deploy.retainedDrivers", 200)
+  val maxRetryWaitTime = conf.getInt("spark.mesos.cluster.retry.wait.max", 60) // 1 minute
+  val state = engineFactory.createEngine("scheduler")
+  val stateTimeout =
+    Duration.create(conf.getLong("spark.mesos.cluster.recover.timeout", 30), "seconds")
+
+  val stateLock = new ReentrantLock()
+
+  val finishedDrivers = new mutable.ArrayBuffer[ClusterTaskState](retainedDrivers)
+
+  val nextDriverNumber: AtomicLong = new AtomicLong(0)
+  var appId: String = null
+
+  private var launchedDrivers: LaunchedDrivers = _
+
+  private var queue: DriverQueue = _
+
+  def createDateFormat = new SimpleDateFormat("yyyyMMddHHmmss") // For application IDs
+
+  private var superviseRetryList: SuperviseRetryList = _
+
+  private def newDriverId(submitDate: Date): String = {
+    "driver-%s-%04d".format(
+      createDateFormat.format(submitDate), nextDriverNumber.incrementAndGet())
+  }
+
+  def submitDriver(desc: MesosDriverDescription): SubmitResponse = {
+    stateLock.synchronized {
+      if (queue.isFull) {
+        return SubmitResponse("", false, "Already reached maximum submission size")
+      }
+
+      val submitDate: Date = new Date()
+      val submissionId: String = newDriverId(submitDate)
+      val submission = new DriverSubmission(submissionId, desc, submitDate)
+      queue.offer(submission)
+      SubmitResponse(submissionId, true, "")
+    }
+  }
+
+  def killDriver(submissionId: String): KillResponse = {
+    stateLock.synchronized {
+      // We look for the requested driver in the following places:
+      // 1. Check if submission is running or launched.
+      // 2. Check if it's still queued.
+      // 3. Check if it's in the retry list.
+      if (launchedDrivers.contains(submissionId)) {
+        val task = launchedDrivers.get(submissionId)
+        driver.killTask(task.taskId)
+        return KillResponse(submissionId, true, "Killing running driver")
+      } else if (queue.remove(submissionId)) {
+        return KillResponse(submissionId, true, "Removed driver while it's still pending")
+      } else if (superviseRetryList.remove(submissionId)) {
+        return KillResponse(submissionId, true, "Removed driver while it's retrying")
+      } else {
+        return KillResponse(submissionId, false, "Cannot find driver")
--- End diff --
no need to use `return`; we're in scalaland. The last expression of each branch is already the value of the whole `if` / `else if` chain, so the method behaves the same without them.
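
For illustration, here is roughly how `killDriver` could read with the `return`s dropped. This is just a sketch of the suggestion, written against the same helpers already used in this diff (`launchedDrivers`, `queue`, `superviseRetryList`, `driver`), not the final patch:

```scala
def killDriver(submissionId: String): KillResponse = {
  stateLock.synchronized {
    // Each branch is an expression; whichever branch runs produces the
    // method's result, so no explicit `return` is needed.
    if (launchedDrivers.contains(submissionId)) {
      val task = launchedDrivers.get(submissionId)
      driver.killTask(task.taskId)
      KillResponse(submissionId, true, "Killing running driver")
    } else if (queue.remove(submissionId)) {
      KillResponse(submissionId, true, "Removed driver while it's still pending")
    } else if (superviseRetryList.remove(submissionId)) {
      KillResponse(submissionId, true, "Removed driver while it's retrying")
    } else {
      KillResponse(submissionId, false, "Cannot find driver")
    }
  }
}
```

As a bonus, avoiding `return` sidesteps the general Scala gotcha that a `return` inside a closure is implemented by throwing `NonLocalReturnControl`, which can surprise once code gets refactored into higher-order functions.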