httfighter closed pull request #23436: [SPARK-26524] If the application 
directory fails to be created on the SPARK_WORKER_DIR on some worker nodes (for 
example, bad disk or disk has no capacity), the application executor will be 
allocated indefinitely.
URL: https://github.com/apache/spark/pull/23436
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

Because this pull request comes from a fork, GitHub would not otherwise
display its diff, so the diff is supplied below:

diff --git a/core/src/main/resources/org/apache/spark/ui/static/utils.js 
b/core/src/main/resources/org/apache/spark/ui/static/utils.js
index deeafad4eb5f5..22985e31a7808 100644
--- a/core/src/main/resources/org/apache/spark/ui/static/utils.js
+++ b/core/src/main/resources/org/apache/spark/ui/static/utils.js
@@ -40,9 +40,9 @@ function formatDuration(milliseconds) {
 function formatBytes(bytes, type) {
     if (type !== 'display') return bytes;
     if (bytes == 0) return '0.0 B';
-    var k = 1000;
+    var k = 1024;
     var dm = 1;
-    var sizes = ['B', 'KB', 'MB', 'GB', 'TB', 'PB', 'EB', 'ZB', 'YB'];
+    var sizes = ['B', 'KiB', 'MiB', 'GiB', 'TiB', 'PiB', 'EiB', 'ZiB', 'YiB'];
     var i = Math.floor(Math.log(bytes) / Math.log(k));
     return parseFloat((bytes / Math.pow(k, i)).toFixed(dm)) + ' ' + sizes[i];
 }
diff --git a/core/src/main/scala/org/apache/spark/deploy/DeployMessage.scala 
b/core/src/main/scala/org/apache/spark/deploy/DeployMessage.scala
index 49a319abb3238..866cc39953dac 100644
--- a/core/src/main/scala/org/apache/spark/deploy/DeployMessage.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/DeployMessage.scala
@@ -63,7 +63,8 @@ private[deploy] object DeployMessages {
       execId: Int,
       state: ExecutorState,
       message: Option[String],
-      exitStatus: Option[Int])
+      exitStatus: Option[Int],
+      workerId: String = "")
     extends DeployMessage
 
   case class DriverStateChanged(
diff --git a/core/src/main/scala/org/apache/spark/deploy/master/Master.scala 
b/core/src/main/scala/org/apache/spark/deploy/master/Master.scala
index e1184248af460..d9cd2f2bed8de 100644
--- a/core/src/main/scala/org/apache/spark/deploy/master/Master.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/master/Master.scala
@@ -25,11 +25,10 @@ import scala.collection.mutable.{ArrayBuffer, HashMap, 
HashSet}
 import scala.util.Random
 
 import org.apache.spark.{SecurityManager, SparkConf, SparkException}
-import org.apache.spark.deploy.{ApplicationDescription, DriverDescription,
-  ExecutorState, SparkHadoopUtil}
+import org.apache.spark.deploy.{ApplicationDescription, DriverDescription, 
ExecutorState, SparkHadoopUtil}
 import org.apache.spark.deploy.DeployMessages._
 import org.apache.spark.deploy.master.DriverState.DriverState
-import org.apache.spark.deploy.master.MasterMessages._
+import 
org.apache.spark.deploy.master.MasterMessages.{CheckForWorkerBlackTimeOut, _}
 import org.apache.spark.deploy.master.ui.MasterWebUI
 import org.apache.spark.deploy.rest.StandaloneRestServer
 import org.apache.spark.internal.Logging
@@ -60,6 +59,9 @@ private[deploy] class Master(
   private val REAPER_ITERATIONS = conf.getInt("spark.dead.worker.persistence", 
15)
   private val RECOVERY_MODE = conf.get("spark.deploy.recoveryMode", "NONE")
   private val MAX_EXECUTOR_RETRIES = 
conf.getInt("spark.deploy.maxExecutorRetries", 10)
+  private val MAX_EXECUTOR_THRESHOLD = conf.getInt(
+    "spark.deploy.executorFailedPerWorkerThreshold", 10)
+  private val WORKER_BLACK_TIMEOUT_MS = 
conf.getLong("spark.worker.black.timeout", 3600) * 1000
 
   val workers = new HashSet[WorkerInfo]
   val idToApp = new HashMap[String, ApplicationInfo]
@@ -108,6 +110,8 @@ private[deploy] class Master(
 
   private var checkForWorkerTimeOutTask: ScheduledFuture[_] = _
 
+  private var checkForWorkerBlackTimeOutTask: ScheduledFuture[_] = _
+
   // As a temporary workaround before better ways of configuring memory, we 
allow users to set
   // a flag that will perform round-robin scheduling across the nodes 
(spreading out each app
   // among all the nodes) instead of trying to consolidate each app onto a 
small # of nodes.
@@ -151,6 +155,12 @@ private[deploy] class Master(
       }
     }, 0, WORKER_TIMEOUT_MS, TimeUnit.MILLISECONDS)
 
+    checkForWorkerBlackTimeOutTask = 
forwardMessageThread.scheduleAtFixedRate(new Runnable {
+      override def run(): Unit = Utils.tryLogNonFatalError {
+        self.send(CheckForWorkerBlackTimeOut)
+      }
+    }, 0, WORKER_BLACK_TIMEOUT_MS, TimeUnit.MILLISECONDS)
+
     if (restServerEnabled) {
       val port = conf.getInt("spark.master.rest.port", 6066)
       restServer = Some(new StandaloneRestServer(address.host, port, conf, 
self, masterUrl))
@@ -199,6 +209,9 @@ private[deploy] class Master(
     if (checkForWorkerTimeOutTask != null) {
       checkForWorkerTimeOutTask.cancel(true)
     }
+    if (checkForWorkerBlackTimeOutTask != null) {
+      checkForWorkerBlackTimeOutTask.cancel(true)
+    }
     forwardMessageThread.shutdownNow()
     webUi.stop()
     restServer.foreach(_.stop())
@@ -278,7 +291,7 @@ private[deploy] class Master(
         schedule()
       }
 
-    case ExecutorStateChanged(appId, execId, state, message, exitStatus) =>
+    case ExecutorStateChanged(appId, execId, state, message, exitStatus, 
workerId) =>
       val execOption = idToApp.get(appId).flatMap(app => 
app.executors.get(execId))
       execOption match {
         case Some(exec) =>
@@ -318,6 +331,15 @@ private[deploy] class Master(
                 removeApplication(appInfo, ApplicationState.FAILED)
               }
             }
+
+            if (state == ExecutorState.FAILED && exitStatus.isEmpty && 
workerId.nonEmpty) {
+
+              // Only retry certain number of times so we don't go into an 
infinite loop
+              // to allocate executor on the same worker
+              workers.filter(_.id == workerId).foreach(worker =>
+                handleExecutorLaunchFail(worker, appInfo.id))
+            }
+
           }
           schedule()
         case None =>
@@ -416,6 +438,18 @@ private[deploy] class Master(
     case CheckForWorkerTimeOut =>
       timeOutDeadWorkers()
 
+    case CheckForWorkerBlackTimeOut =>
+      timeOutBlackedWorkers()
+
+  }
+
+  def handleExecutorLaunchFail(worker: WorkerInfo, appId: String): Unit = {
+     worker.increaseFailedAppCount(appId)
+      if (MAX_EXECUTOR_THRESHOLD >= 0 && worker.getFailedFailedCount(appId) >=
+        MAX_EXECUTOR_THRESHOLD) {
+        worker.setIsBlack()
+        worker.removeFailedAppCount(appId)
+      }
   }
 
   override def receiveAndReply(context: RpcCallContext): PartialFunction[Any, 
Unit] = {
@@ -681,7 +715,7 @@ private[deploy] class Master(
         // Filter out workers that don't have enough resources to launch an 
executor
         val usableWorkers = workers.toArray.filter(_.state == 
WorkerState.ALIVE)
           .filter(worker => worker.memoryFree >= app.desc.memoryPerExecutorMB 
&&
-            worker.coresFree >= coresPerExecutor)
+            worker.coresFree >= coresPerExecutor && !worker.isBlack)
           .sortBy(_.coresFree).reverse
         val assignedCores = scheduleExecutorsOnWorkers(app, usableWorkers, 
spreadOutApps)
 
@@ -858,6 +892,7 @@ private[deploy] class Master(
 
   private def finishApplication(app: ApplicationInfo) {
     removeApplication(app, ApplicationState.FINISHED)
+    workers.filter(_.isBlack).foreach(_.removeFailedAppCount(app.id))
   }
 
   def removeApplication(app: ApplicationInfo, state: ApplicationState.Value) {
@@ -1001,6 +1036,13 @@ private[deploy] class Master(
     }
   }
 
+  /** Clear the black flag on any worker whose black timeout has expired. */
+  private def timeOutBlackedWorkers() {
+    val currentTime = System.currentTimeMillis()
+    workers.filter(w => w.isBlack && (w.lastBlackTime < currentTime - 
WORKER_BLACK_TIMEOUT_MS))
+      .foreach(w => w.unsetBlack())
+  }
+
   private def newDriverId(submitDate: Date): String = {
     val appId = "driver-%s-%04d".format(createDateFormat.format(submitDate), 
nextDriverNumber)
     nextDriverNumber += 1
diff --git 
a/core/src/main/scala/org/apache/spark/deploy/master/MasterMessages.scala 
b/core/src/main/scala/org/apache/spark/deploy/master/MasterMessages.scala
index a952cee36eb44..c6d1f570e9ae5 100644
--- a/core/src/main/scala/org/apache/spark/deploy/master/MasterMessages.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/master/MasterMessages.scala
@@ -32,6 +32,8 @@ private[master] object MasterMessages {
 
   case object CheckForWorkerTimeOut
 
+  case object CheckForWorkerBlackTimeOut
+
   case class BeginRecovery(storedApps: Seq[ApplicationInfo], storedWorkers: 
Seq[WorkerInfo])
 
   case object CompleteRecovery
diff --git 
a/core/src/main/scala/org/apache/spark/deploy/master/WorkerInfo.scala 
b/core/src/main/scala/org/apache/spark/deploy/master/WorkerInfo.scala
index c87d6e24b78c6..62ddfb9a2b285 100644
--- a/core/src/main/scala/org/apache/spark/deploy/master/WorkerInfo.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/master/WorkerInfo.scala
@@ -37,12 +37,14 @@ private[spark] class WorkerInfo(
 
   @transient var executors: mutable.HashMap[String, ExecutorDesc] = _ // 
executorId => info
   @transient var drivers: mutable.HashMap[String, DriverInfo] = _ // driverId 
=> info
+  @transient var appIdToRetryCount: mutable.HashMap[String, Int] = _
   @transient var state: WorkerState.Value = _
   @transient var coresUsed: Int = _
   @transient var memoryUsed: Int = _
+  @transient var isBlack: Boolean = _
 
   @transient var lastHeartbeat: Long = _
-
+  @transient var lastBlackTime: Long = _
   init()
 
   def coresFree: Int = cores - coresUsed
@@ -56,10 +58,13 @@ private[spark] class WorkerInfo(
   private def init() {
     executors = new mutable.HashMap
     drivers = new mutable.HashMap
+    appIdToRetryCount = new mutable.HashMap
     state = WorkerState.ALIVE
     coresUsed = 0
     memoryUsed = 0
     lastHeartbeat = System.currentTimeMillis()
+    isBlack = false
+    lastBlackTime = 0
   }
 
   def hostPort: String = {
@@ -101,5 +106,27 @@ private[spark] class WorkerInfo(
     this.state = state
   }
 
+  def setIsBlack(): Unit = {
+    this.isBlack = true
+    this.lastBlackTime = System.currentTimeMillis()
+  }
+
+  def unsetBlack(): Unit = {
+    this.isBlack = false
+    this.lastBlackTime = 0
+  }
+
+  def increaseFailedAppCount(appId: String) {
+    appIdToRetryCount(appId) = appIdToRetryCount.get(appId).map(_ + 
1).getOrElse(1)
+  }
+
+  def getFailedFailedCount(appId: String): Int = {
+    appIdToRetryCount.getOrElse(appId, 1)
+  }
+
+  def removeFailedAppCount(appId: String) {
+    appIdToRetryCount.remove(appId)
+  }
+
   def isAlive(): Boolean = this.state == WorkerState.ALIVE
 }
diff --git a/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala 
b/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala
index d5ea2523c628b..f47663bdd5cf8 100755
--- a/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala
@@ -551,11 +551,11 @@ private[deploy] class Worker(
               executors -= appId + "/" + execId
             }
             sendToMaster(ExecutorStateChanged(appId, execId, 
ExecutorState.FAILED,
-              Some(e.toString), None))
+              Some(e.toString), None, workerId))
         }
       }
 
-    case executorStateChanged @ ExecutorStateChanged(appId, execId, state, 
message, exitStatus) =>
+    case executorStateChanged @ ExecutorStateChanged(_, _, _, _, _, _) =>
       handleExecutorStateChanged(executorStateChanged)
 
     case KillExecutor(masterUrl, appId, execId) =>
diff --git a/core/src/main/scala/org/apache/spark/util/Utils.scala 
b/core/src/main/scala/org/apache/spark/util/Utils.scala
index 227c9e734f0af..cab01a15aa4db 100644
--- a/core/src/main/scala/org/apache/spark/util/Utils.scala
+++ b/core/src/main/scala/org/apache/spark/util/Utils.scala
@@ -1103,30 +1103,30 @@ private[spark] object Utils extends Logging {
   def bytesToString(size: Long): String = bytesToString(BigInt(size))
 
   def bytesToString(size: BigInt): String = {
-    val EB = 1L << 60
-    val PB = 1L << 50
-    val TB = 1L << 40
-    val GB = 1L << 30
-    val MB = 1L << 20
-    val KB = 1L << 10
-
-    if (size >= BigInt(1L << 11) * EB) {
+    val EiB = 1L << 60
+    val PiB = 1L << 50
+    val TiB = 1L << 40
+    val GiB = 1L << 30
+    val MiB = 1L << 20
+    val KiB = 1L << 10
+
+    if (size >= BigInt(1L << 11) * EiB) {
       // The number is too large, show it in scientific notation.
       BigDecimal(size, new MathContext(3, RoundingMode.HALF_UP)).toString() + 
" B"
     } else {
       val (value, unit) = {
-        if (size >= 2 * EB) {
-          (BigDecimal(size) / EB, "EB")
-        } else if (size >= 2 * PB) {
-          (BigDecimal(size) / PB, "PB")
-        } else if (size >= 2 * TB) {
-          (BigDecimal(size) / TB, "TB")
-        } else if (size >= 2 * GB) {
-          (BigDecimal(size) / GB, "GB")
-        } else if (size >= 2 * MB) {
-          (BigDecimal(size) / MB, "MB")
-        } else if (size >= 2 * KB) {
-          (BigDecimal(size) / KB, "KB")
+        if (size >= 2 * EiB) {
+          (BigDecimal(size) / EiB, "EiB")
+        } else if (size >= 2 * PiB) {
+          (BigDecimal(size) / PiB, "PiB")
+        } else if (size >= 2 * TiB) {
+          (BigDecimal(size) / TiB, "TiB")
+        } else if (size >= 2 * GiB) {
+          (BigDecimal(size) / GiB, "GiB")
+        } else if (size >= 2 * MiB) {
+          (BigDecimal(size) / MiB, "MiB")
+        } else if (size >= 2 * KiB) {
+          (BigDecimal(size) / KiB, "KiB")
         } else {
           (BigDecimal(size), "B")
         }
diff --git a/core/src/test/scala/org/apache/spark/storage/DiskStoreSuite.scala 
b/core/src/test/scala/org/apache/spark/storage/DiskStoreSuite.scala
index 959cf58fa0536..6f60b08088cd1 100644
--- a/core/src/test/scala/org/apache/spark/storage/DiskStoreSuite.scala
+++ b/core/src/test/scala/org/apache/spark/storage/DiskStoreSuite.scala
@@ -128,7 +128,7 @@ class DiskStoreSuite extends SparkFunSuite {
 
     assert(e.getMessage ===
       s"requirement failed: can't create a byte buffer of size 
${blockData.size}" +
-      " since it exceeds 10.0 KB.")
+      " since it exceeds 10.0 KiB.")
   }
 
   test("block data encryption") {
diff --git a/core/src/test/scala/org/apache/spark/util/UtilsSuite.scala 
b/core/src/test/scala/org/apache/spark/util/UtilsSuite.scala
index 901a724da8a1b..64a4302b81a7c 100644
--- a/core/src/test/scala/org/apache/spark/util/UtilsSuite.scala
+++ b/core/src/test/scala/org/apache/spark/util/UtilsSuite.scala
@@ -189,13 +189,13 @@ class UtilsSuite extends SparkFunSuite with 
ResetSystemProperties with Logging {
   test("bytesToString") {
     assert(Utils.bytesToString(10) === "10.0 B")
     assert(Utils.bytesToString(1500) === "1500.0 B")
-    assert(Utils.bytesToString(2000000) === "1953.1 KB")
-    assert(Utils.bytesToString(2097152) === "2.0 MB")
-    assert(Utils.bytesToString(2306867) === "2.2 MB")
-    assert(Utils.bytesToString(5368709120L) === "5.0 GB")
-    assert(Utils.bytesToString(5L * (1L << 40)) === "5.0 TB")
-    assert(Utils.bytesToString(5L * (1L << 50)) === "5.0 PB")
-    assert(Utils.bytesToString(5L * (1L << 60)) === "5.0 EB")
+    assert(Utils.bytesToString(2000000) === "1953.1 KiB")
+    assert(Utils.bytesToString(2097152) === "2.0 MiB")
+    assert(Utils.bytesToString(2306867) === "2.2 MiB")
+    assert(Utils.bytesToString(5368709120L) === "5.0 GiB")
+    assert(Utils.bytesToString(5L * (1L << 40)) === "5.0 TiB")
+    assert(Utils.bytesToString(5L * (1L << 50)) === "5.0 PiB")
+    assert(Utils.bytesToString(5L * (1L << 60)) === "5.0 EiB")
     assert(Utils.bytesToString(BigInt(1L << 11) * (1L << 60)) === "2.36E+21 B")
   }
 
diff --git 
a/sql/core/src/test/scala/org/apache/spark/sql/StatisticsCollectionSuite.scala 
b/sql/core/src/test/scala/org/apache/spark/sql/StatisticsCollectionSuite.scala
index cb562d65b6147..02dc32d5f90ba 100644
--- 
a/sql/core/src/test/scala/org/apache/spark/sql/StatisticsCollectionSuite.scala
+++ 
b/sql/core/src/test/scala/org/apache/spark/sql/StatisticsCollectionSuite.scala
@@ -227,12 +227,12 @@ class StatisticsCollectionSuite extends 
StatisticsCollectionTestBase with Shared
       BigInt(0) -> (("0.0 B", "0")),
       BigInt(100) -> (("100.0 B", "100")),
       BigInt(2047) -> (("2047.0 B", "2.05E+3")),
-      BigInt(2048) -> (("2.0 KB", "2.05E+3")),
-      BigInt(3333333) -> (("3.2 MB", "3.33E+6")),
-      BigInt(4444444444L) -> (("4.1 GB", "4.44E+9")),
-      BigInt(5555555555555L) -> (("5.1 TB", "5.56E+12")),
-      BigInt(6666666666666666L) -> (("5.9 PB", "6.67E+15")),
-      BigInt(1L << 10 ) * (1L << 60) -> (("1024.0 EB", "1.18E+21")),
+      BigInt(2048) -> (("2.0 KiB", "2.05E+3")),
+      BigInt(3333333) -> (("3.2 MiB", "3.33E+6")),
+      BigInt(4444444444L) -> (("4.1 GiB", "4.44E+9")),
+      BigInt(5555555555555L) -> (("5.1 TiB", "5.56E+12")),
+      BigInt(6666666666666666L) -> (("5.9 PiB", "6.67E+15")),
+      BigInt(1L << 10 ) * (1L << 60) -> (("1024.0 EiB", "1.18E+21")),
       BigInt(1L << 11) * (1L << 60) -> (("2.36E+21 B", "2.36E+21"))
     )
     numbers.foreach { case (input, (expectedSize, expectedRows)) =>


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
[email protected]


With regards,
Apache Git Services

---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to