Github user dhruve commented on a diff in the pull request:
https://github.com/apache/spark/pull/19194#discussion_r140361162
--- Diff: core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala ---
@@ -619,6 +625,47 @@ private[spark] class ExecutorAllocationManager(
     // place the executors.
     private val stageIdToExecutorPlacementHints = new mutable.HashMap[Int, (Int, Map[String, Int])]
+    override def onJobStart(jobStart: SparkListenerJobStart): Unit = {
+      jobStart.stageInfos.foreach(stageInfo => stageIdToJobId(stageInfo.stageId) = jobStart.jobId)
+
+      var jobGroupId = if (jobStart.properties != null) {
+        jobStart.properties.getProperty(SparkContext.SPARK_JOB_GROUP_ID)
+      } else {
+        null
+      }
+
+      val maxConTasks = if (jobGroupId != null &&
+        conf.contains(s"spark.job.$jobGroupId.maxConcurrentTasks")) {
+        conf.get(s"spark.job.$jobGroupId.maxConcurrentTasks").toInt
+      } else {
+        Int.MaxValue
+      }
+
+      if (maxConTasks <= 0) {
+        throw new IllegalArgumentException(
+          "Maximum Concurrent Tasks should be set greater than 0 for the job to progress.")
+      }
+
+      if (jobGroupId == null || !conf.contains(s"spark.job.$jobGroupId.maxConcurrentTasks")) {
+        jobGroupId = DEFAULT_JOB_GROUP
+      }
+
+      jobIdToJobGroup(jobStart.jobId) = jobGroupId
+      if (!jobGroupToMaxConTasks.contains(jobGroupId)) {
--- End diff --
@squito Thanks for elaborating on this. I believe the last paragraph of your
explanation summarizes your viewpoint. In a way that makes sense: if you want
to increase your max concurrent tasks, you know what you are doing, so if you
see weird behavior with other threads that you created, that's fine. However,
when you don't control the thread creation, I feel it's best to set the limit
just once to avoid the weirdness. It's much easier to use a different job group
than to explain one more weird behavior.
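
To make the usage concrete, here is a rough sketch of how a caller might pair
a job group with the per-group limit proposed in this diff. The config key
pattern `spark.job.<groupId>.maxConcurrentTasks` comes from the diff above; the
group name "etlGroup" and the limit of 10 are made up for illustration, and the
capping behavior itself is only what this PR proposes, not something Spark does
today.

import org.apache.spark.{SparkConf, SparkContext}

// Hypothetical config: cap jobs in group "etlGroup" at 10 concurrent tasks
// (key pattern taken from the diff above; name and value are examples only).
val conf = new SparkConf()
  .setAppName("concurrency-limit-example")
  .set("spark.job.etlGroup.maxConcurrentTasks", "10")

val sc = new SparkContext(conf)

// Jobs submitted while this group is active would be limited to 10 concurrent
// tasks under the proposed change; jobs without a configured group fall back
// to DEFAULT_JOB_GROUP and an effectively unlimited cap (Int.MaxValue).
sc.setJobGroup("etlGroup", "jobs that should run with limited concurrency")
sc.parallelize(1 to 1000, numSlices = 100).map(_ * 2).count()

// Clearing the group means subsequent jobs are not associated with "etlGroup".
sc.clearJobGroup()

Under the "set it just once" behavior I am arguing for, the first job seen for
"etlGroup" would fix its limit; changing the conf afterwards would require
using a different group name rather than silently changing a running group.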
---
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]