Github user squito commented on a diff in the pull request:
https://github.com/apache/spark/pull/19194#discussion_r139821899
--- Diff: core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala ---
@@ -619,6 +625,47 @@ private[spark] class ExecutorAllocationManager(
     // place the executors.
     private val stageIdToExecutorPlacementHints = new mutable.HashMap[Int, (Int, Map[String, Int])]
+    override def onJobStart(jobStart: SparkListenerJobStart): Unit = {
+      jobStart.stageInfos.foreach(stageInfo =>
+        stageIdToJobId(stageInfo.stageId) = jobStart.jobId)
+
+      var jobGroupId = if (jobStart.properties != null) {
+        jobStart.properties.getProperty(SparkContext.SPARK_JOB_GROUP_ID)
+      } else {
+        null
+      }
+
+      val maxConTasks = if (jobGroupId != null &&
+        conf.contains(s"spark.job.$jobGroupId.maxConcurrentTasks")) {
+        conf.get(s"spark.job.$jobGroupId.maxConcurrentTasks").toInt
+      } else {
+        Int.MaxValue
+      }
+
+      if (maxConTasks <= 0) {
+        throw new IllegalArgumentException(
+          "Maximum Concurrent Tasks should be set greater than 0 for the job to progress.")
+      }
+
+      if (jobGroupId == null ||
+        !conf.contains(s"spark.job.$jobGroupId.maxConcurrentTasks")) {
+        jobGroupId = DEFAULT_JOB_GROUP
+      }
+
+      jobIdToJobGroup(jobStart.jobId) = jobGroupId
+      if (!jobGroupToMaxConTasks.contains(jobGroupId)) {
--- End diff ---
this is probably a weird / unusual situation, but is this really the
behavior you want if there are multiple jobs submitted for the same job group?
Wouldn't you just take the conf for the job group at the time each job was
submitted?
Worst case with this approach: say you are *always* submitting multiple
jobs for each job group, and you start the next job before the previous one
finishes, so that the new one partially overlaps the old one. Then even if you
change the conf, all jobs will keep using the old value forever.
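Roughly what I had in mind (just an untested sketch, reusing jobIdToJobGroup,
jobGroupToMaxConTasks and maxConTasks from this patch): drop the contains
check and refresh the group's limit every time a job starts, e.g.

      jobIdToJobGroup(jobStart.jobId) = jobGroupId
      // take whatever the conf says at the time *this* job is submitted,
      // rather than keeping the value cached when the group's first job started
      jobGroupToMaxConTasks(jobGroupId) = maxConTasks

That way an overlapping stream of jobs in the same group would still pick up
conf changes as new jobs start.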