Github user uncleGen commented on a diff in the pull request:

    https://github.com/apache/spark/pull/16936#discussion_r101223265
  
    --- Diff: streaming/src/main/scala/org/apache/spark/streaming/scheduler/ReceiverTracker.scala ---
    @@ -437,6 +438,74 @@ class ReceiverTracker(ssc: StreamingContext, skipReceiverLaunch: Boolean = false
       }
     
       /**
    +   * Check if existing resource is enough to run job.
    +   */
    +  private def checkResourceValid(): Unit = {
    +    val coresPerTask = ssc.conf.get(CPUS_PER_TASK)
    +
    +    def localCpuCount: Int = Runtime.getRuntime.availableProcessors()
    +
    +    ssc.conf.get("spark.master") match {
    +      case m if m.startsWith("yarn") =>
    +        val numCoresPerExecutor = ssc.conf.get(EXECUTOR_CORES)
    +        val numExecutors = getTargetExecutorNumber()
    +        if (numExecutors * numCoresPerExecutor / coresPerTask < numReceivers) {
    +          throw new SparkException("There are no enough resource to run Spark Streaming job: " +
    +            s"existing resource can only be used to scheduler some of receivers." +
    +            s"$numExecutors executors, $numCoresPerExecutor cores per executor, $coresPerTask " +
    +            s"cores per task and $numReceivers receivers.")
    +        }
    +      case m if m.startsWith("spark") || m.startsWith("mesos") =>
    +        val coresMax = ssc.conf.get(CORES_MAX).getOrElse(0)
    +        if (coresMax / coresPerTask < numReceivers) {
    +          throw new SparkException("There are no enough resource to run Spark Streaming job: " +
    +            s"existing resource can only be used to scheduler some of receivers." +
    +            s"$coresMax cores totally, $coresPerTask cores per task and $numReceivers receivers.")
    +        }
    +      case m if m.startsWith("local") =>
    +        m match {
    +          case "local" =>
    +            throw new SparkException("There are no enough resource to run Spark Streaming job.")
    +          case SparkMasterRegex.LOCAL_N_REGEX(threads) =>
    +            val threadCount = if (threads == "*") localCpuCount else threads.toInt
    +            if (threadCount / coresPerTask < numReceivers) {
    +              throw new SparkException("There are no enough resource to run Spark Streaming job: " +
    +                s"existing resource can only be used to scheduler some of receivers." +
    +                s"$threadCount threads, $coresPerTask threads per task and $numReceivers " +
    +                s"receivers.")
    +            }
    +          case SparkMasterRegex.LOCAL_N_FAILURES_REGEX(threads, maxFailures) =>
    +            val threadCount = if (threads == "*") localCpuCount else threads.toInt
    +            if (threadCount / coresPerTask < numReceivers) {
    +              throw new SparkException("There are no enough resource to run Spark Streaming job: " +
    +                s"existing resource can only be used to scheduler some of receivers." +
    +                s"$threadCount threads, $coresPerTask threads per task and $numReceivers " +
    +                s"receivers.")
    +            }
    +          case SparkMasterRegex.LOCAL_CLUSTER_REGEX(numSlaves, coresPerSlave, memoryPerSlave) =>
    +            val coresMax = numSlaves.toInt * coresPerSlave.toInt
    +            if (coresMax / coresPerTask < numReceivers) {
    +              throw new SparkException("There are no enough resource to run Spark Streaming job: " +
    +                s"existing resource can only be used to scheduler some of receivers." +
    +                s"$numSlaves slaves, $coresPerSlave cores per slave, $coresPerTask " +
    +                s"cores per task and $numReceivers receivers.")
    +            }
    +        }
    +    }
    +  }
    +
    +  private def getTargetExecutorNumber(): Int = {
    +    if (Utils.isDynamicAllocationEnabled(ssc.conf)) {
    +      ssc.conf.get(DYN_ALLOCATION_MAX_EXECUTORS)
    +    } else {
    +      val targetNumExecutors =
    +        sys.env.get("SPARK_EXECUTOR_INSTANCES").map(_.toInt).getOrElse(2)
    +      // System property can override environment variable.
    --- End diff --
    
    here "2" refers to "YarnSparkHadoopUtil.DEFAULT_NUMBER_EXECUTORS"


