Github user dhruve commented on a diff in the pull request:
https://github.com/apache/spark/pull/19194#discussion_r140293577
--- Diff: core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala ---
@@ -619,6 +625,47 @@ private[spark] class ExecutorAllocationManager(
     // place the executors.
     private val stageIdToExecutorPlacementHints = new mutable.HashMap[Int, (Int, Map[String, Int])]
+    override def onJobStart(jobStart: SparkListenerJobStart): Unit = {
+      jobStart.stageInfos.foreach(stageInfo => stageIdToJobId(stageInfo.stageId) = jobStart.jobId)
+
+      var jobGroupId = if (jobStart.properties != null) {
+        jobStart.properties.getProperty(SparkContext.SPARK_JOB_GROUP_ID)
+      } else {
+        null
+      }
+
+      val maxConTasks = if (jobGroupId != null &&
+        conf.contains(s"spark.job.$jobGroupId.maxConcurrentTasks")) {
+        conf.get(s"spark.job.$jobGroupId.maxConcurrentTasks").toInt
+      } else {
+        Int.MaxValue
+      }
+
+      if (maxConTasks <= 0) {
+        throw new IllegalArgumentException(
+          "Maximum Concurrent Tasks should be set greater than 0 for the job to progress.")
+      }
+
+      if (jobGroupId == null || !conf.contains(s"spark.job.$jobGroupId.maxConcurrentTasks")) {
+        jobGroupId = DEFAULT_JOB_GROUP
+      }
+
+      jobIdToJobGroup(jobStart.jobId) = jobGroupId
+      if (!jobGroupToMaxConTasks.contains(jobGroupId)) {
--- End diff --
I understand your reasoning about setting maxConTasks every time a job is
started. However, I am not able to follow the scenario you described. If a job
completes and a new one kicks off immediately, how does the new job partially
overlap? It's only when all the stages and underlying tasks for the previous
job have finished that we would mark it as complete, so a new job won't
overlap with a completed one. Am I missing something here?
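
Separately, in case it helps the discussion, here is a rough sketch of how I'd
expect a user to drive the per-group limit that onJobStart reads above. The
group id "etlGroup", the app name/master, and the job itself are made up for
illustration; only the spark.job.<groupId>.maxConcurrentTasks key and the
setJobGroup call come from the code in this diff:

    import org.apache.spark.{SparkConf, SparkContext}

    val conf = new SparkConf()
      .setMaster("local[4]")
      .setAppName("max-concurrent-tasks-example")
      // hypothetical group id; the key matches the one parsed in onJobStart above
      .set("spark.job.etlGroup.maxConcurrentTasks", "8")

    val sc = new SparkContext(conf)
    // jobs submitted under this group id would be capped at 8 concurrent tasks
    sc.setJobGroup("etlGroup", "illustrative job group")
    sc.parallelize(1 to 1000, 100).map(_ * 2).count()
    sc.stop()

Jobs submitted without a group id, or without the property set for their
group, fall back to DEFAULT_JOB_GROUP with Int.MaxValue, per the code above.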
---