Github user uncleGen commented on a diff in the pull request: https://github.com/apache/spark/pull/16936#discussion_r101223265 --- Diff: streaming/src/main/scala/org/apache/spark/streaming/scheduler/ReceiverTracker.scala --- @@ -437,6 +438,74 @@ class ReceiverTracker(ssc: StreamingContext, skipReceiverLaunch: Boolean = false } /** + * Check if existing resource is enough to run job. + */ + private def checkResourceValid(): Unit = { + val coresPerTask = ssc.conf.get(CPUS_PER_TASK) + + def localCpuCount: Int = Runtime.getRuntime.availableProcessors() + + ssc.conf.get("spark.master") match { + case m if m.startsWith("yarn") => + val numCoresPerExecutor = ssc.conf.get(EXECUTOR_CORES) + val numExecutors = getTargetExecutorNumber() + if (numExecutors * numCoresPerExecutor / coresPerTask < numReceivers) { + throw new SparkException("There are no enough resource to run Spark Streaming job: " + + s"existing resource can only be used to scheduler some of receivers." + + s"$numExecutors executors, $numCoresPerExecutor cores per executor, $coresPerTask " + + s"cores per task and $numReceivers receivers.") + } + case m if m.startsWith("spark") || m.startsWith("mesos") => + val coresMax = ssc.conf.get(CORES_MAX).getOrElse(0) + if (coresMax / coresPerTask < numReceivers) { + throw new SparkException("There are no enough resource to run Spark Streaming job: " + + s"existing resource can only be used to scheduler some of receivers." 
+ + s"$coresMax cores totally, $coresPerTask cores per task and $numReceivers receivers.") + } + case m if m.startsWith("local") => + m match { + case "local" => + throw new SparkException("There are no enough resource to run Spark Streaming job.") + case SparkMasterRegex.LOCAL_N_REGEX(threads) => + val threadCount = if (threads == "*") localCpuCount else threads.toInt + if (threadCount / coresPerTask < numReceivers) { + throw new SparkException("There are no enough resource to run Spark Streaming job: " + + s"existing resource can only be used to scheduler some of receivers." + + s"$threadCount threads, $coresPerTask threads per task and $numReceivers " + + s"receivers.") + } + case SparkMasterRegex.LOCAL_N_FAILURES_REGEX(threads, maxFailures) => + val threadCount = if (threads == "*") localCpuCount else threads.toInt + if (threadCount / coresPerTask < numReceivers) { + throw new SparkException("There are no enough resource to run Spark Streaming job: " + + s"existing resource can only be used to scheduler some of receivers." + + s"$threadCount threads, $coresPerTask threads per task and $numReceivers " + + s"receivers.") + } + case SparkMasterRegex.LOCAL_CLUSTER_REGEX(numSlaves, coresPerSlave, memoryPerSlave) => + val coresMax = numSlaves.toInt * coresPerSlave.toInt + if (coresMax / coresPerTask < numReceivers) { + throw new SparkException("There are no enough resource to run Spark Streaming job: " + + s"existing resource can only be used to scheduler some of receivers." + + s"$numSlaves slaves, $coresPerSlave cores per slave, $coresPerTask " + + s"cores per task and $numReceivers receivers.") + } + } + } + } + + private def getTargetExecutorNumber(): Int = { + if (Utils.isDynamicAllocationEnabled(ssc.conf)) { + ssc.conf.get(DYN_ALLOCATION_MAX_EXECUTORS) + } else { + val targetNumExecutors = + sys.env.get("SPARK_EXECUTOR_INSTANCES").map(_.toInt).getOrElse(2) + // System property can override environment variable. 
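--- End diff -- Here, the literal "2" (the fallback executor count used when SPARK_EXECUTOR_INSTANCES is not set) is the value of "YarnSparkHadoopUtil.DEFAULT_NUMBER_EXECUTORS"; referencing that constant would make the intent clearer.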
--- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastructure@apache.org or file a JIRA ticket with INFRA. --- --------------------------------------------------------------------- To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org For additional commands, e-mail: reviews-help@spark.apache.org