Github user dhruve commented on a diff in the pull request:

    https://github.com/apache/spark/pull/19194#discussion_r140124886

    --- Diff: core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala ---
    @@ -619,6 +625,47 @@ private[spark] class ExecutorAllocationManager(
       // place the executors.
       private val stageIdToExecutorPlacementHints = new mutable.HashMap[Int, (Int, Map[String, Int])]
     
    +  override def onJobStart(jobStart: SparkListenerJobStart): Unit = {
    +    jobStart.stageInfos.foreach(stageInfo => stageIdToJobId(stageInfo.stageId) = jobStart.jobId)
    +
    +    var jobGroupId = if (jobStart.properties != null) {
    +      jobStart.properties.getProperty(SparkContext.SPARK_JOB_GROUP_ID)
    +    } else {
    +      null
    +    }
    +
    +    val maxConTasks = if (jobGroupId != null &&
    +      conf.contains(s"spark.job.$jobGroupId.maxConcurrentTasks")) {
    +      conf.get(s"spark.job.$jobGroupId.maxConcurrentTasks").toInt
    +    } else {
    +      Int.MaxValue
    +    }
    +
    +    if (maxConTasks <= 0) {
    +      throw new IllegalArgumentException(
    +        "Maximum Concurrent Tasks should be set greater than 0 for the job to progress.")
    +    }
    +
    +    if (jobGroupId == null || !conf.contains(s"spark.job.$jobGroupId.maxConcurrentTasks")) {
    +      jobGroupId = DEFAULT_JOB_GROUP
    +    }
    +
    +    jobIdToJobGroup(jobStart.jobId) = jobGroupId
    +    if (!jobGroupToMaxConTasks.contains(jobGroupId)) {
    --- End diff --

From my understanding, and in the context of the current code, you group jobs together when you want their concurrency to be limited. If you want different concurrency limits for different jobs, you would put them in different job groups altogether. If multiple jobs in the same job group run concurrently and one of them sets a different value, which one wins for the existing jobs and for the new job?

If we wanted a different value for every job, the user would need a way to identify a Spark job in their application code, probably by a job id; only by identifying a job could the user set the config for that job. But job ids cannot be known a priori, and I don't know of an easy way for the user to discover which underlying Spark job corresponds to a given action. Hence we apply the setting at the job-group level, which lets the user control concurrency without having to know Spark's internal job details.

Let me know if anything is unclear here or if you have more questions.
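To make the job-group-level setting concrete, here is a minimal sketch of how a user would apply it from application code, assuming the `spark.job.<jobGroupId>.maxConcurrentTasks` key proposed in this PR; the group name "etl", the master URL, and the RDD pipeline are illustrative:

    import org.apache.spark.{SparkConf, SparkContext}

    object ConcurrencyLimitExample {
      def main(args: Array[String]): Unit = {
        val conf = new SparkConf()
          .setAppName("concurrency-limit-example")
          .setMaster("local[8]")
          // Cap the "etl" job group at 4 concurrent tasks
          // (config key proposed in this PR, not in released Spark).
          .set("spark.job.etl.maxConcurrentTasks", "4")

        val sc = new SparkContext(conf)

        // Every job submitted while this group is active shares the same cap,
        // so the user never needs to know the underlying job ids.
        sc.setJobGroup("etl", "nightly ETL jobs")
        sc.parallelize(1 to 1000, numSlices = 100).map(_ * 2).count()

        sc.clearJobGroup() // later jobs fall back to the default group (no cap)
        sc.stop()
      }
    }

The point of the sketch is that the user only ever names the group via the existing `setJobGroup` API; all jobs triggered under that group pick up the one configured limit.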