[GitHub] spark issue #22288: [SPARK-22148][SPARK-15815][Scheduler] Acquire new execut...
Github user dhruve commented on the issue: https://github.com/apache/spark/pull/22288 Thanks for the reviews and feedback, @tgravescs and @squito! --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #22288: [SPARK-22148][SPARK-15815][Scheduler] Acquire new execut...
Github user dhruve commented on the issue: https://github.com/apache/spark/pull/22288 @squito I have tested it again with both scenarios and was able to verify the expected behavior. For the cases that are not covered in the PR, I will mention them in the JIRA.
[GitHub] spark issue #22288: [SPARK-22148][SPARK-15815][Scheduler] Acquire new execut...
Github user dhruve commented on the issue: https://github.com/apache/spark/pull/22288 @tgravescs I have fixed a nit and it's ready to be reviewed. @squito I have updated the comment; let me know if it's okay. Thanks for the reviews.
[GitHub] spark pull request #22288: [SPARK-22148][SPARK-15815][Scheduler] Acquire new...
Github user dhruve commented on a diff in the pull request: https://github.com/apache/spark/pull/22288#discussion_r228637254 --- Diff: core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala --- @@ -415,9 +420,54 @@ private[spark] class TaskSchedulerImpl( launchedAnyTask |= launchedTaskAtCurrentMaxLocality } while (launchedTaskAtCurrentMaxLocality) } + if (!launchedAnyTask) { - taskSet.abortIfCompletelyBlacklisted(hostToExecutors) + taskSet.getCompletelyBlacklistedTaskIfAny(hostToExecutors) match { +case Some(taskIndex) => // Returns the taskIndex which was unschedulable + + // If the taskSet is unschedulable we try to find an existing idle blacklisted + // executor. If we cannot find one, we abort immediately. Else we kill the idle + // executor and kick off an abortTimer which if it doesn't schedule a task within the + // the timeout will abort the taskSet if we were unable to schedule any task from the + // taskSet. + // Note 1: We keep track of schedulability on a per taskSet basis rather than on a per + // task basis. + // Note 2: The taskSet can still be aborted when there are more than one idle + // blacklisted executors and dynamic allocation is on. This can happen when a killed + // idle executor isn't replaced in time by ExecutorAllocationManager as it relies on + // pending tasks and doesn't kill executors on idle timeouts, resulting in the abort + // timer to expire and abort the taskSet. 
+ executorIdToRunningTaskIds.find(x => !isExecutorBusy(x._1)) match { +case Some ((executorId, _)) => + if (!unschedulableTaskSetToExpiryTime.contains(taskSet)) { +blacklistTrackerOpt.foreach(blt => blt.killBlacklistedIdleExecutor(executorId)) + +val timeout = conf.get(config.UNSCHEDULABLE_TASKSET_TIMEOUT) * 1000 +unschedulableTaskSetToExpiryTime(taskSet) = clock.getTimeMillis() + timeout +logInfo(s"Waiting for $timeout ms for completely " + + s"blacklisted task to be schedulable again before aborting $taskSet.") +abortTimer.schedule( + createUnschedulableTaskSetAbortTimer(taskSet, taskIndex), timeout) + } +case _ => // Abort Immediately + logInfo("Cannot schedule any task because of complete blacklisting. No idle" + +s" executors can be found to kill. Aborting $taskSet." ) + taskSet.abortSinceCompletelyBlacklisted(taskIndex) + } +case _ => // Do nothing if no tasks completely blacklisted. + } +} else { + // We want to defer killing any taskSets as long as we have a non blacklisted executor + // which can be used to schedule a task from any active taskSets. This ensures that the + // job can make progress and if we encounter a flawed taskSet it will eventually either + // fail or abort due to being completely blacklisted. --- End diff -- Your understanding is correct. I will update the comment.
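The branch in the diff above (kill an idle executor and start the abort timer, or abort immediately when no idle executor exists) can be modeled as a pure decision function. This is a minimal sketch, not Spark code: `BlacklistAction`, `UnschedulableDecision.decide` and its parameters are hypothetical names, executors are plain strings, and an executor is assumed to be idle when its set of running task IDs is empty.

```scala
// Hypothetical model of what happens when no task from a taskSet could be launched.
sealed trait BlacklistAction
case object NothingBlacklisted extends BlacklistAction
case object AlreadyWaiting extends BlacklistAction
final case class KillIdleAndWait(executorId: String, expiryMillis: Long) extends BlacklistAction
final case class AbortNow(taskIndex: Int) extends BlacklistAction

object UnschedulableDecision {
  def decide(
      blacklistedTask: Option[Int],                   // plays the role of getCompletelyBlacklistedTaskIfAny
      runningTasksByExecutor: Map[String, Set[Long]], // plays the role of executorIdToRunningTaskIds
      alreadyWaiting: Boolean,                        // taskSet already has an expiry time recorded
      nowMillis: Long,
      timeoutSeconds: Long): BlacklistAction =
    blacklistedTask match {
      case None => NothingBlacklisted
      case Some(taskIndex) =>
        // Assumption: an executor counts as idle when it has no running tasks.
        val idleExecutor = runningTasksByExecutor.collectFirst {
          case (executorId, tasks) if tasks.isEmpty => executorId
        }
        idleExecutor match {
          // A timer is already pending for this taskSet: do not kill another executor.
          case Some(_) if alreadyWaiting => AlreadyWaiting
          // Kill the idle executor and record when the taskSet should be aborted.
          case Some(executorId) => KillIdleAndWait(executorId, nowMillis + timeoutSeconds * 1000)
          // Nothing idle to kill, so there is no way to get a fresh executor: abort now.
          case None => AbortNow(taskIndex)
        }
    }
}
```

Unlike the scheduler, this sketch performs no side effects; it only reports which branch the diff would take, which keeps the branching easy to check in isolation.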
[GitHub] spark pull request #22288: [SPARK-22148][SPARK-15815][Scheduler] Acquire new...
Github user dhruve commented on a diff in the pull request: https://github.com/apache/spark/pull/22288#discussion_r228636999 --- Diff: core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala --- @@ -415,9 +420,54 @@ private[spark] class TaskSchedulerImpl( launchedAnyTask |= launchedTaskAtCurrentMaxLocality } while (launchedTaskAtCurrentMaxLocality) } + if (!launchedAnyTask) { - taskSet.abortIfCompletelyBlacklisted(hostToExecutors) + taskSet.getCompletelyBlacklistedTaskIfAny(hostToExecutors) match { +case Some(taskIndex) => // Returns the taskIndex which was unschedulable + + // If the taskSet is unschedulable we try to find an existing idle blacklisted + // executor. If we cannot find one, we abort immediately. Else we kill the idle + // executor and kick off an abortTimer which if it doesn't schedule a task within the + // the timeout will abort the taskSet if we were unable to schedule any task from the + // taskSet. + // Note 1: We keep track of schedulability on a per taskSet basis rather than on a per + // task basis. + // Note 2: The taskSet can still be aborted when there are more than one idle + // blacklisted executors and dynamic allocation is on. This can happen when a killed + // idle executor isn't replaced in time by ExecutorAllocationManager as it relies on + // pending tasks and doesn't kill executors on idle timeouts, resulting in the abort + // timer to expire and abort the taskSet. 
+ executorIdToRunningTaskIds.find(x => !isExecutorBusy(x._1)) match { +case Some ((executorId, _)) => + if (!unschedulableTaskSetToExpiryTime.contains(taskSet)) { +blacklistTrackerOpt.foreach(blt => blt.killBlacklistedIdleExecutor(executorId)) + +val timeout = conf.get(config.UNSCHEDULABLE_TASKSET_TIMEOUT) * 1000 +unschedulableTaskSetToExpiryTime(taskSet) = clock.getTimeMillis() + timeout +logInfo(s"Waiting for $timeout ms for completely " + + s"blacklisted task to be schedulable again before aborting $taskSet.") +abortTimer.schedule( + createUnschedulableTaskSetAbortTimer(taskSet, taskIndex), timeout) + } +case _ => // Abort Immediately --- End diff -- Makes sense. Will update it.
[GitHub] spark pull request #22288: [SPARK-22148][SPARK-15815][Scheduler] Acquire new...
Github user dhruve commented on a diff in the pull request: https://github.com/apache/spark/pull/22288#discussion_r228636880 --- Diff: core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala --- @@ -415,9 +420,54 @@ private[spark] class TaskSchedulerImpl( launchedAnyTask |= launchedTaskAtCurrentMaxLocality } while (launchedTaskAtCurrentMaxLocality) } + if (!launchedAnyTask) { - taskSet.abortIfCompletelyBlacklisted(hostToExecutors) + taskSet.getCompletelyBlacklistedTaskIfAny(hostToExecutors) match { +case Some(taskIndex) => // Returns the taskIndex which was unschedulable + + // If the taskSet is unschedulable we try to find an existing idle blacklisted + // executor. If we cannot find one, we abort immediately. Else we kill the idle + // executor and kick off an abortTimer which if it doesn't schedule a task within the + // the timeout will abort the taskSet if we were unable to schedule any task from the + // taskSet. + // Note 1: We keep track of schedulability on a per taskSet basis rather than on a per + // task basis. + // Note 2: The taskSet can still be aborted when there are more than one idle + // blacklisted executors and dynamic allocation is on. This can happen when a killed + // idle executor isn't replaced in time by ExecutorAllocationManager as it relies on + // pending tasks and doesn't kill executors on idle timeouts, resulting in the abort + // timer to expire and abort the taskSet. 
+ executorIdToRunningTaskIds.find(x => !isExecutorBusy(x._1)) match { +case Some ((executorId, _)) => + if (!unschedulableTaskSetToExpiryTime.contains(taskSet)) { +blacklistTrackerOpt.foreach(blt => blt.killBlacklistedIdleExecutor(executorId)) + +val timeout = conf.get(config.UNSCHEDULABLE_TASKSET_TIMEOUT) * 1000 +unschedulableTaskSetToExpiryTime(taskSet) = clock.getTimeMillis() + timeout +logInfo(s"Waiting for $timeout ms for completely " + + s"blacklisted task to be schedulable again before aborting $taskSet.") +abortTimer.schedule( + createUnschedulableTaskSetAbortTimer(taskSet, taskIndex), timeout) + } +case _ => // Abort Immediately + logInfo("Cannot schedule any task because of complete blacklisting. No idle" + +s" executors can be found to kill. Aborting $taskSet." ) + taskSet.abortSinceCompletelyBlacklisted(taskIndex) + } +case _ => // Do nothing if no tasks completely blacklisted. --- End diff -- I have seen this style elsewhere in the code base. Is it the norm here (just curious)? I have read about a few scenarios where it is the better choice. Personally, though, whenever I read a `foreach` I instinctively assume it is invoked on an iterable rather than an `Option`, so it feels a bit odd.
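For an `Option`, a `match` whose `None` branch does nothing and a call to `foreach` behave identically: `foreach` runs its body once for `Some` and never for `None`, because an `Option` acts like a collection of zero or one elements. A minimal sketch; `KillLog` and both method names are hypothetical stand-ins for `blacklistTrackerOpt` and its kill call.

```scala
import scala.collection.mutable.ListBuffer

object OptionForeachVsMatch {
  // Hypothetical stand-in for blacklistTrackerOpt: records killed executor IDs.
  type KillLog = ListBuffer[String]

  // Pattern-match style: the None branch is an explicit no-op.
  def killViaMatch(trackerOpt: Option[KillLog], executorId: String): Unit =
    trackerOpt match {
      case Some(log) => log += executorId
      case None => // no tracker, nothing to do
    }

  // foreach style: the body runs only when the Option is defined.
  def killViaForeach(trackerOpt: Option[KillLog], executorId: String): Unit =
    trackerOpt.foreach(log => log += executorId)
}
```

Which form to prefer is a readability choice; both compile to the same observable behavior.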
[GitHub] spark issue #22288: [SPARK-22148][SPARK-15815][Scheduler] Acquire new execut...
Github user dhruve commented on the issue: https://github.com/apache/spark/pull/22288 @squito, for the locality wait, the behavior would be the same as when the taskSet is not completely blacklisted. I have added a test for this. If we want to guarantee the ordering between the timeout expiring and the task being scheduled, we would have to add some more delay. Let me know if we want to do that, or whether the test suffices.
[GitHub] spark pull request #22288: [SPARK-22148][SPARK-15815][Scheduler] Acquire new...
Github user dhruve commented on a diff in the pull request: https://github.com/apache/spark/pull/22288#discussion_r227080534 --- Diff: core/src/test/scala/org/apache/spark/scheduler/TaskSchedulerImplSuite.scala --- @@ -503,6 +507,145 @@ class TaskSchedulerImplSuite extends SparkFunSuite with LocalSparkContext with B verify(tsm).abort(anyString(), anyObject()) } + test("SPARK-22148 abort timer should kick in when task is completely blacklisted & no new " + +"executor can be acquired") { +// set the abort timer to fail immediately +taskScheduler = setupSchedulerWithMockTaskSetBlacklist( + config.UNSCHEDULABLE_TASKSET_TIMEOUT.key -> "0") + +// We have only 1 task remaining with 1 executor +val taskSet = FakeTask.createTaskSet(numTasks = 1, stageAttemptId = 0) +taskScheduler.submitTasks(taskSet) +val tsm = stageToMockTaskSetManager(0) + +// submit an offer with one executor +val firstTaskAttempts = taskScheduler.resourceOffers(IndexedSeq( + WorkerOffer("executor0", "host0", 1) +)).flatten + +// Fail the running task +val failedTask = firstTaskAttempts.find(_.executorId == "executor0").get +taskScheduler.statusUpdate(failedTask.taskId, TaskState.FAILED, ByteBuffer.allocate(0)) +// we explicitly call the handleFailedTask method here to avoid adding a sleep in the test suite +// Reason being - handleFailedTask is run by an executor service and there is a momentary delay +// before it is launched and this fails the assertion check. +tsm.handleFailedTask(failedTask.taskId, TaskState.FAILED, UnknownReason) +when(tsm.taskSetBlacklistHelperOpt.get.isExecutorBlacklistedForTask( + "executor0", failedTask.index)).thenReturn(true) + +// make an offer on the blacklisted executor. We won't schedule anything, and set the abort +// timer to kick in immediately +assert(taskScheduler.resourceOffers(IndexedSeq( + WorkerOffer("executor0", "host0", 1) +)).flatten.size === 0) +// Wait for the abort timer to kick in. Without sleep the test exits before the timer is +// triggered. 
+eventually(timeout(500.milliseconds)) { + assert(tsm.isZombie) +} + } + + test("SPARK-22148 try to acquire a new executor when task is unschedulable with 1 executor") { +taskScheduler = setupSchedulerWithMockTaskSetBlacklist( + config.UNSCHEDULABLE_TASKSET_TIMEOUT.key -> "10") + +// We have only 1 task remaining with 1 executor +val taskSet = FakeTask.createTaskSet(numTasks = 1, stageAttemptId = 0) +taskScheduler.submitTasks(taskSet) +val tsm = stageToMockTaskSetManager(0) + +// submit an offer with one executor +val firstTaskAttempts = taskScheduler.resourceOffers(IndexedSeq( + WorkerOffer("executor0", "host0", 1) +)).flatten + +// Fail the running task +val failedTask = firstTaskAttempts.find(_.executorId == "executor0").get +taskScheduler.statusUpdate(failedTask.taskId, TaskState.FAILED, ByteBuffer.allocate(0)) +// we explicitly call the handleFailedTask method here to avoid adding a sleep in the test suite +// Reason being - handleFailedTask is run by an executor service and there is a momentary delay +// before it is launched and this fails the assertion check. +tsm.handleFailedTask(failedTask.taskId, TaskState.FAILED, UnknownReason) +when(tsm.taskSetBlacklistHelperOpt.get.isExecutorBlacklistedForTask( + "executor0", failedTask.index)).thenReturn(true) + +// make an offer on the blacklisted executor. We won't schedule anything, and set the abort +// timer to expire if no new executors could be acquired. We kill the existing idle blacklisted +// executor and try to acquire a new one. +assert(taskScheduler.resourceOffers(IndexedSeq( + WorkerOffer("executor0", "host0", 1) +)).flatten.size === 0) +assert(!tsm.isZombie) + +// Offer a new executor which should be accepted +assert(taskScheduler.resourceOffers(IndexedSeq( + WorkerOffer("executor1", "host0", 1) +)).flatten.size === 1) --- End diff -- We specify the config in `seconds`. The expectation here is that the timer should expire after `10 seconds`, which I think is sufficient to account for GC pauses.
However, we could also just remove this override, since the default is 120s.
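Because the timeout is configured in seconds while the scheduler clock works in milliseconds, the diff multiplies the configured value by 1000 before adding it to `clock.getTimeMillis()`. Here is a small sketch of the same arithmetic using `java.util.concurrent.TimeUnit`; `AbortTimeout` and its methods are hypothetical names, and the configured value is assumed to already be a whole number of seconds.

```scala
import java.util.concurrent.TimeUnit

object AbortTimeout {
  // Same arithmetic as `conf.get(config.UNSCHEDULABLE_TASKSET_TIMEOUT) * 1000`.
  def timeoutMillis(timeoutSeconds: Long): Long = TimeUnit.SECONDS.toMillis(timeoutSeconds)

  // Absolute clock time (ms) after which the taskSet would be aborted.
  def expiryMillis(nowMillis: Long, timeoutSeconds: Long): Long =
    nowMillis + timeoutMillis(timeoutSeconds)
}
```

With the test's setting of `10`, the timer fires 10,000 ms after the unschedulable offer; with the 120s default, it fires after 120,000 ms.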
[GitHub] spark pull request #22288: [SPARK-22148][SPARK-15815][Scheduler] Acquire new...
Github user dhruve commented on a diff in the pull request: https://github.com/apache/spark/pull/22288#discussion_r227082382 --- Diff: core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala --- @@ -415,9 +420,55 @@ private[spark] class TaskSchedulerImpl( launchedAnyTask |= launchedTaskAtCurrentMaxLocality } while (launchedTaskAtCurrentMaxLocality) } + if (!launchedAnyTask) { - taskSet.abortIfCompletelyBlacklisted(hostToExecutors) + taskSet.getCompletelyBlacklistedTaskIfAny(hostToExecutors) match { +case Some(taskIndex) => // Returns the taskIndex which was unschedulable + + // If the taskSet is unschedulable we try to find an existing idle blacklisted + // executor. If we cannot find one, we abort immediately. Else we kill the idle + // executor and kick off an abortTimer which if it doesn't schedule a task within the + // the timeout will abort the taskSet if we were unable to schedule any task from the + // taskSet. + // Note 1: We keep track of schedulability on a per taskSet basis rather than on a per + // task basis. + // Note 2: The taskSet can still be aborted when there are more than one idle + // blacklisted executors and dynamic allocation is on. This can happen when a killed + // idle executor isn't replaced in time by ExecutorAllocationManager as it relies on + // pending tasks and doesn't kill executors on idle timeouts, resulting in the abort + // timer to expire and abort the taskSet. + executorIdToRunningTaskIds.find(x => !isExecutorBusy(x._1)) match { --- End diff -- I preferred to keep the code readable. Since this scenario doesn't run frequently, maybe we could keep it as is. Thoughts?
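For comparison, here is a sketch that puts the readable lookup from the diff next to an equivalent form that inspects each entry's task set directly. It assumes that `isExecutorBusy` means "has at least one running task" and that the map holds running task IDs per executor; `IdleExecutorLookup` and both method names are hypothetical.

```scala
final class IdleExecutorLookup(runningTasks: Map[String, Set[Long]]) {
  // Assumed to match isExecutorBusy: busy iff at least one task is running.
  def isExecutorBusy(execId: String): Boolean = runningTasks.get(execId).exists(_.nonEmpty)

  // Form used in the diff: readable, but looks each executor up in the map a second time.
  def findIdleReadable: Option[String] =
    runningTasks.find(x => !isExecutorBusy(x._1)).map(_._1)

  // Equivalent form that checks each entry's task set in place.
  def findIdleDirect: Option[String] =
    runningTasks.collectFirst { case (execId, tasks) if tasks.isEmpty => execId }
}
```

Both forms walk the same map in the same order, so they pick the same executor. The extra lookup in the readable form only costs something on this rarely taken path.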
[GitHub] spark pull request #22288: [SPARK-22148][SPARK-15815][Scheduler] Acquire new...
Github user dhruve commented on a diff in the pull request: https://github.com/apache/spark/pull/22288#discussion_r227095389 --- Diff: core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala --- @@ -415,9 +420,55 @@ private[spark] class TaskSchedulerImpl( launchedAnyTask |= launchedTaskAtCurrentMaxLocality } while (launchedTaskAtCurrentMaxLocality) } + if (!launchedAnyTask) { - taskSet.abortIfCompletelyBlacklisted(hostToExecutors) + taskSet.getCompletelyBlacklistedTaskIfAny(hostToExecutors) match { +case Some(taskIndex) => // Returns the taskIndex which was unschedulable + + // If the taskSet is unschedulable we try to find an existing idle blacklisted + // executor. If we cannot find one, we abort immediately. Else we kill the idle --- End diff -- By clearing the abort timer as soon as a task is launched, we relax this situation. If there is a large backlog of tasks:
- If we acquire new executors or launch new tasks, we defer the check.
- If we cannot acquire new executors, long-running tasks prevent any new task from launching, and we have fewer executors than max failures, then this will end up being harsh.
This can happen, but it seems like a very specific edge case.
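The relaxation described above can be sketched as a per-taskSet expiry map that is cleared as soon as any task launches. A diff comment elsewhere in this PR says the expiry is cleared for all taskSets, not just the one that launched. This is a hypothetical model, not the scheduler's code: `UnschedulableExpiryTracker` and its methods are invented names, taskSets are identified by strings, and time is passed in explicitly.

```scala
import scala.collection.mutable

// Hypothetical model of unschedulableTaskSetToExpiryTime and how it is reset.
final class UnschedulableExpiryTracker(timeoutMillis: Long) {
  private val expiryByTaskSet = mutable.HashMap[String, Long]()

  // Record an expiry only once per taskSet, so a pending timer is not extended.
  def markUnschedulable(taskSet: String, nowMillis: Long): Unit =
    if (!expiryByTaskSet.contains(taskSet)) expiryByTaskSet(taskSet) = nowMillis + timeoutMillis

  // Any successful launch shows a usable executor exists, so every pending expiry is dropped.
  def onAnyTaskLaunched(): Unit = expiryByTaskSet.clear()

  // The abort timer only aborts a taskSet whose expiry is still recorded and has passed.
  def shouldAbort(taskSet: String, nowMillis: Long): Boolean =
    expiryByTaskSet.get(taskSet).exists(_ <= nowMillis)
}
```

Clearing every entry, rather than only the launching taskSet's entry, keeps a second blacklisted taskSet from being aborted while it could still run on the executor that was just acquired.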
[GitHub] spark pull request #22288: [SPARK-22148][SPARK-15815][Scheduler] Acquire new...
Github user dhruve commented on a diff in the pull request: https://github.com/apache/spark/pull/22288#discussion_r227080844 --- Diff: core/src/test/scala/org/apache/spark/scheduler/TaskSchedulerImplSuite.scala --- @@ -503,6 +507,145 @@ class TaskSchedulerImplSuite extends SparkFunSuite with LocalSparkContext with B verify(tsm).abort(anyString(), anyObject()) } + test("SPARK-22148 abort timer should kick in when task is completely blacklisted & no new " + +"executor can be acquired") { +// set the abort timer to fail immediately +taskScheduler = setupSchedulerWithMockTaskSetBlacklist( + config.UNSCHEDULABLE_TASKSET_TIMEOUT.key -> "0") + +// We have only 1 task remaining with 1 executor +val taskSet = FakeTask.createTaskSet(numTasks = 1, stageAttemptId = 0) +taskScheduler.submitTasks(taskSet) +val tsm = stageToMockTaskSetManager(0) + +// submit an offer with one executor +val firstTaskAttempts = taskScheduler.resourceOffers(IndexedSeq( + WorkerOffer("executor0", "host0", 1) +)).flatten + +// Fail the running task +val failedTask = firstTaskAttempts.find(_.executorId == "executor0").get +taskScheduler.statusUpdate(failedTask.taskId, TaskState.FAILED, ByteBuffer.allocate(0)) +// we explicitly call the handleFailedTask method here to avoid adding a sleep in the test suite +// Reason being - handleFailedTask is run by an executor service and there is a momentary delay +// before it is launched and this fails the assertion check. +tsm.handleFailedTask(failedTask.taskId, TaskState.FAILED, UnknownReason) +when(tsm.taskSetBlacklistHelperOpt.get.isExecutorBlacklistedForTask( + "executor0", failedTask.index)).thenReturn(true) + +// make an offer on the blacklisted executor. We won't schedule anything, and set the abort +// timer to kick in immediately +assert(taskScheduler.resourceOffers(IndexedSeq( + WorkerOffer("executor0", "host0", 1) +)).flatten.size === 0) +// Wait for the abort timer to kick in. Without sleep the test exits before the timer is +// triggered. 
+eventually(timeout(500.milliseconds)) { + assert(tsm.isZombie) +} + } + + test("SPARK-22148 try to acquire a new executor when task is unschedulable with 1 executor") { +taskScheduler = setupSchedulerWithMockTaskSetBlacklist( + config.UNSCHEDULABLE_TASKSET_TIMEOUT.key -> "10") + +// We have only 1 task remaining with 1 executor +val taskSet = FakeTask.createTaskSet(numTasks = 1, stageAttemptId = 0) +taskScheduler.submitTasks(taskSet) +val tsm = stageToMockTaskSetManager(0) + +// submit an offer with one executor +val firstTaskAttempts = taskScheduler.resourceOffers(IndexedSeq( + WorkerOffer("executor0", "host0", 1) +)).flatten + +// Fail the running task +val failedTask = firstTaskAttempts.find(_.executorId == "executor0").get +taskScheduler.statusUpdate(failedTask.taskId, TaskState.FAILED, ByteBuffer.allocate(0)) +// we explicitly call the handleFailedTask method here to avoid adding a sleep in the test suite +// Reason being - handleFailedTask is run by an executor service and there is a momentary delay +// before it is launched and this fails the assertion check. +tsm.handleFailedTask(failedTask.taskId, TaskState.FAILED, UnknownReason) +when(tsm.taskSetBlacklistHelperOpt.get.isExecutorBlacklistedForTask( + "executor0", failedTask.index)).thenReturn(true) + +// make an offer on the blacklisted executor. We won't schedule anything, and set the abort +// timer to expire if no new executors could be acquired. We kill the existing idle blacklisted +// executor and try to acquire a new one. 
+assert(taskScheduler.resourceOffers(IndexedSeq( + WorkerOffer("executor0", "host0", 1) +)).flatten.size === 0) +assert(!tsm.isZombie) + +// Offer a new executor which should be accepted +assert(taskScheduler.resourceOffers(IndexedSeq( + WorkerOffer("executor1", "host0", 1) +)).flatten.size === 1) + +assert(!tsm.isZombie) + } + + // This is to test a scenario where we have two taskSets completely blacklisted and on acquiring + // a new executor we don't want the abort timer for the second taskSet to expire and abort the job + test("SPARK-22148 abort timer should clear unschedulableTaskSetToExpiryTime for all TaskSets")
[GitHub] spark pull request #22288: [SPARK-22148][SPARK-15815][Scheduler] Acquire new...
Github user dhruve commented on a diff in the pull request: https://github.com/apache/spark/pull/22288#discussion_r227077071 --- Diff: core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala --- @@ -453,6 +504,25 @@ private[spark] class TaskSchedulerImpl( return tasks } + private def createUnschedulableTaskSetAbortTimer( + taskSet: TaskSetManager, + taskIndex: Int, + timeout: Long): TimerTask = { --- End diff -- good catch.
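The revision of the scheduler hunk quoted above stores the expiry as `clock.getTimeMillis() + timeout` and calls `createUnschedulableTaskSetAbortTimer(taskSet, taskIndex)` without a `timeout` argument. With that design, the timer task only needs to compare the stored expiry with the current time. Below is a minimal sketch of such a factory using `java.util.TimerTask`. `AbortTimerSketch`, `createAbortTask`, and the function-valued clock and abort callback are hypothetical, and the sketch leaves out the thread-safety a real scheduler would need.

```scala
import java.util.TimerTask
import scala.collection.mutable

object AbortTimerSketch {
  def createAbortTask(
      expiryByTaskSet: mutable.Map[String, Long], // absolute expiry times, timeout already added
      taskSet: String,
      now: () => Long,
      abort: String => Unit): TimerTask = new TimerTask {
    override def run(): Unit = {
      // Abort only if the expiry is still recorded (no task launched meanwhile) and has passed.
      if (expiryByTaskSet.get(taskSet).exists(_ <= now())) {
        abort(taskSet)
      } else {
        this.cancel()
      }
    }
  }
}
```

Since the expiry already includes the timeout, passing `timeout` into the factory as well would be redundant.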
[GitHub] spark issue #22288: [SPARK-22148][SPARK-15815][Scheduler] Acquire new execut...
Github user dhruve commented on the issue: https://github.com/apache/spark/pull/22288 It applies to both dynamic allocation (DA) and static allocation (SA). I have updated the description.
[GitHub] spark issue #22288: [SPARK-22148][SPARK-15815][Scheduler] Acquire new execut...
Github user dhruve commented on the issue: https://github.com/apache/spark/pull/22288 @squito I have made the changes and updated the description.
[GitHub] spark issue #22288: [SPARK-22148][SPARK-15815][Scheduler] Acquire new execut...
Github user dhruve commented on the issue: https://github.com/apache/spark/pull/22288 test this please
[GitHub] spark pull request #22288: [SPARK-22148][SPARK-15815][Scheduler] Acquire new...
Github user dhruve commented on a diff in the pull request: https://github.com/apache/spark/pull/22288#discussion_r226754849 --- Diff: core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala --- @@ -415,9 +420,65 @@ private[spark] class TaskSchedulerImpl( launchedAnyTask |= launchedTaskAtCurrentMaxLocality } while (launchedTaskAtCurrentMaxLocality) } + if (!launchedAnyTask) { - taskSet.abortIfCompletelyBlacklisted(hostToExecutors) + taskSet.getCompletelyBlacklistedTaskIfAny(hostToExecutors) match { +case taskIndex: Some[Int] => // Returns the taskIndex which was unschedulable + + // If the taskSet is unschedulable we try to find an existing idle blacklisted + // executor. If we cannot find one, we abort immediately. Else we kill the idle + // executor and kick off an abortTimer which if it doesn't schedule a task within the + // the timeout will abort the taskSet if we were unable to schedule any task from the + // taskSet. + // Note 1: We keep track of schedulability on a per taskSet basis rather than on a per + // task basis. + // Note 2: The taskSet can still be aborted when there are more than one idle + // blacklisted executors and dynamic allocation is on. This can happen when a killed + // idle executor isn't replaced in time by ExecutorAllocationManager as it relies on + // pending tasks and doesn't kill executors on idle timeouts, resulting in the abort + // timer to expire and abort the taskSet. 
+ executorIdToRunningTaskIds.find(x => !isExecutorBusy(x._1)) match { +case Some (x) => + val executorId = x._1 + if (!unschedulableTaskSetToExpiryTime.contains(taskSet)) { +blacklistTrackerOpt.foreach(blt => blt.killBlacklistedIdleExecutor(executorId)) + +unschedulableTaskSetToExpiryTime(taskSet) = clock.getTimeMillis() +val timeout = conf.get(config.UNSCHEDULABLE_TASKSET_TIMEOUT) * 1000 +logInfo(s"Waiting for $timeout ms for completely " + + s"blacklisted task to be schedulable again before aborting $taskSet.") +abortTimer.schedule(new TimerTask() { + override def run() { +if (unschedulableTaskSetToExpiryTime.contains(taskSet) && + (unschedulableTaskSetToExpiryTime(taskSet) + timeout) +<= clock.getTimeMillis() +) { + logInfo("Cannot schedule any task because of complete blacklisting. " + +s"Wait time for scheduling expired. Aborting $taskSet.") + taskSet.abortSinceCompletelyBlacklisted(taskIndex.get) +} else { + this.cancel() +} + } +}, timeout) + } +case _ => // Abort Immediately + logInfo("Cannot schedule any task because of complete blacklisting. No idle" + +s" executors can be found to kill. Aborting $taskSet." ) + taskSet.abortSinceCompletelyBlacklisted(taskIndex.get) + } +case _ => // Do nothing if no tasks completely blacklisted. + } +} else { + // If a task was scheduled, we clear the expiry time for all the taskSets. This ensures + // that we have got atleast a non blacklisted executor and the job can progress. The + // abort timer checks this entry to decide if we want to abort the taskSet. --- End diff -- That is correct. It also covers another scenario that @tgravescs originally pointed out. Let's say you have multiple taskSets running that are all completely blacklisted, and you were able to get an executor but cleared the timer only for that specific taskSet.
Then, due to resource constraints, if you weren't able to obtain another executor within the timeout for the other taskSet, you would abort the other taskSet even though it could have waited to be scheduled on the newly obtained executor. Clearing the timer for all the taskSets reflects that we are currently not in a completely blacklisted state and should try to run to completion. However, if the taskSet itself is flawed, it would eventually fail. This coul
[GitHub] spark issue #22288: [SPARK-22148][SPARK-15815][Scheduler] Acquire new execut...
Github user dhruve commented on the issue: https://github.com/apache/spark/pull/22288 The failure is unrelated.
[GitHub] spark issue #22288: [SPARK-22148][SPARK-15815][Scheduler] Acquire new execut...
Github user dhruve commented on the issue: https://github.com/apache/spark/pull/22288 retest this please
[GitHub] spark pull request #22288: [SPARK-22148][SPARK-15815][Scheduler] Acquire new...
Github user dhruve commented on a diff in the pull request: https://github.com/apache/spark/pull/22288#discussion_r224892495 --- Diff: core/src/main/scala/org/apache/spark/scheduler/BlacklistTracker.scala --- @@ -146,21 +146,31 @@ private[scheduler] class BlacklistTracker ( nextExpiryTime = math.min(execMinExpiry, nodeMinExpiry) } + private def killExecutor(exec: String, msg: String): Unit = { +allocationClient match { + case Some(a) => +logInfo(msg) +a.killExecutors(Seq(exec), adjustTargetNumExecutors = false, countFailures = false, + force = true) + case None => +logInfo(s"Not attempting to kill blacklisted executor id $exec " + + s"since allocation client is not defined.") +} + } + private def killBlacklistedExecutor(exec: String): Unit = { if (conf.get(config.BLACKLIST_KILL_ENABLED)) { - allocationClient match { -case Some(a) => - logInfo(s"Killing blacklisted executor id $exec " + -s"since ${config.BLACKLIST_KILL_ENABLED.key} is set.") - a.killExecutors(Seq(exec), adjustTargetNumExecutors = false, countFailures = false, -force = true) -case None => - logWarning(s"Not attempting to kill blacklisted executor id $exec " + -s"since allocation client is not defined.") - } + killExecutor(exec, +s"Killing blacklisted executor id $exec since ${config.BLACKLIST_KILL_ENABLED.key} is set.") } } + private[scheduler] def killBlacklistedIdleExecutor(exec: String): Unit = { +killExecutor(exec, --- End diff -- It doesn't make sense to have a flag for it: if it were `off`, your job would always fail whenever all the executors are blacklisted and no task can be scheduled.
[GitHub] spark pull request #22288: [SPARK-22148][SPARK-15815][Scheduler] Acquire new...
Github user dhruve commented on a diff in the pull request: https://github.com/apache/spark/pull/22288#discussion_r224873268 --- Diff: core/src/main/scala/org/apache/spark/scheduler/BlacklistTracker.scala --- @@ -146,21 +146,31 @@ private[scheduler] class BlacklistTracker ( nextExpiryTime = math.min(execMinExpiry, nodeMinExpiry) } + private def killExecutor(exec: String, msg: String): Unit = { +allocationClient match { + case Some(a) => +logInfo(msg) +a.killExecutors(Seq(exec), adjustTargetNumExecutors = false, countFailures = false, + force = true) + case None => +logInfo(s"Not attempting to kill blacklisted executor id $exec " + + s"since allocation client is not defined.") +} + } + private def killBlacklistedExecutor(exec: String): Unit = { if (conf.get(config.BLACKLIST_KILL_ENABLED)) { - allocationClient match { -case Some(a) => - logInfo(s"Killing blacklisted executor id $exec " + -s"since ${config.BLACKLIST_KILL_ENABLED.key} is set.") - a.killExecutors(Seq(exec), adjustTargetNumExecutors = false, countFailures = false, -force = true) -case None => - logWarning(s"Not attempting to kill blacklisted executor id $exec " + -s"since allocation client is not defined.") - } + killExecutor(exec, +s"Killing blacklisted executor id $exec since ${config.BLACKLIST_KILL_ENABLED.key} is set.") } } + private[scheduler] def killBlacklistedIdleExecutor(exec: String): Unit = { +killExecutor(exec, --- End diff -- We want to kill an idle executor that is completely blacklisted without having to enable killing for all blacklisted executors. That is why we made the change; otherwise we would have kept it as is.
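The refactor in this diff can be sketched like this: both kill paths share one helper, but only `killBlacklistedExecutor` is gated by the kill flag. This is a simplified model, not Spark code. `KillClient` stands in for the allocation client and has a single-argument `killExecutors` (the real call in the diff also passes `adjustTargetNumExecutors`, `countFailures`, and `force`). Log messages are collected in a buffer. The idle-executor message is a hypothetical placeholder, since the diff is cut off before it.

```scala
import scala.collection.mutable.ListBuffer

// Hypothetical stand-in for the allocation client.
trait KillClient {
  def killExecutors(ids: Seq[String]): Seq[String]
}

final class BlacklistKillSketch(allocationClient: Option[KillClient], killEnabled: Boolean) {
  val messages: ListBuffer[String] = ListBuffer.empty[String]

  // Shared helper: kill through the client if there is one, otherwise just log.
  private def killExecutor(exec: String, msg: String): Unit =
    allocationClient match {
      case Some(client) =>
        messages += msg
        client.killExecutors(Seq(exec))
      case None =>
        messages += s"Not attempting to kill blacklisted executor id $exec " +
          "since allocation client is not defined."
    }

  // Gated by the flag, like killBlacklistedExecutor in the diff.
  def killBlacklistedExecutor(exec: String): Unit =
    if (killEnabled) killExecutor(exec, s"Killing blacklisted executor id $exec since kill is enabled.")

  // Not gated: intended only for an idle executor of a completely blacklisted taskSet.
  def killBlacklistedIdleExecutor(exec: String): Unit =
    killExecutor(exec, s"Killing idle blacklisted executor id $exec") // placeholder message
}
```

With the flag off, only the idle-executor path kills anything. That matches the reasoning above: the job can still obtain a fresh executor without turning on killing for every blacklisted executor.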
[GitHub] spark pull request #22288: [SPARK-22148][SPARK-15815][Scheduler] Acquire new...
Github user dhruve commented on a diff in the pull request: https://github.com/apache/spark/pull/22288#discussion_r224167756 --- Diff: core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala --- @@ -415,9 +419,61 @@ private[spark] class TaskSchedulerImpl( launchedAnyTask |= launchedTaskAtCurrentMaxLocality } while (launchedTaskAtCurrentMaxLocality) } + if (!launchedAnyTask) { - taskSet.abortIfCompletelyBlacklisted(hostToExecutors) + taskSet.getCompletelyBlacklistedTaskIfAny(hostToExecutors) match { +case taskIndex: Some[Int] => // Returns the taskIndex which was unschedulable + + // If the taskSet is unschedulable we try to find an existing idle blacklisted + // executor. If we cannot find one, we abort immediately. Else we kill the idle + // executor and kick off an abortTimer which if it doesn't schedule a task within the + // the timeout will abort the taskSet if we were unable to schedule any task from the + // taskSet. + // Note 1: We keep track of schedulability on a per taskSet basis rather than on a per + // task basis. + // Note 2: The taskSet can still be aborted when there are more than one idle + // blacklisted executors and dynamic allocation is on. This can happen when a killed + // idle executor isn't replaced in time by ExecutorAllocationManager as it relies on + // pending tasks and doesn't kill executors on idle timeouts, resulting in the abort + // timer to expire and abort the taskSet. 
+ executorIdToRunningTaskIds.find(x => !isExecutorBusy(x._1)) match { +case Some (x) => + val executorId = x._1 + if (!unschedulableTaskSetToExpiryTime.contains(taskSet)) { +blacklistTrackerOpt.foreach(blt => blt.killBlacklistedIdleExecutor(executorId)) + +unschedulableTaskSetToExpiryTime(taskSet) = clock.getTimeMillis() +val timeout = conf.get(config.UNSCHEDULABLE_TASKSET_TIMEOUT) * 1000 +logInfo(s"Waiting for $timeout ms for completely " + + s"blacklisted task to be schedulable again before aborting $taskSet.") +abortTimer.schedule(new TimerTask() { + override def run() { +if (unschedulableTaskSetToExpiryTime.contains(taskSet) && + (unschedulableTaskSetToExpiryTime(taskSet) + timeout) +<= clock.getTimeMillis() --- End diff -- It doesn't fit within the 100-character line limit.
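The abort-timer logic in this diff follows a common pattern: record when a task set first became unschedulable, schedule a one-shot timer, and abort only if the task set is still marked unschedulable once the timeout has elapsed. Below is a minimal sketch under stated assumptions: `UnschedulableTrackerSketch` is hypothetical, and the clock is injected so the expiry check can be exercised directly without waiting on the real timer.

```scala
import java.util.{Timer, TimerTask}
import scala.collection.mutable

// Hypothetical, simplified model of "abort unless schedulable again within the timeout".
class UnschedulableTrackerSketch[T](timeoutMs: Long, now: () => Long, abort: T => Unit) {
  private val firstUnschedulableAt = mutable.Map.empty[T, Long]
  // Daemon timer so a pending task never keeps the JVM alive.
  private val timer = new Timer("abort-timer-sketch", true)

  // Start the clock only the first time a task set is seen as unschedulable.
  def markUnschedulable(taskSet: T): Unit = synchronized {
    if (!firstUnschedulableAt.contains(taskSet)) {
      firstUnschedulableAt(taskSet) = now()
      timer.schedule(new TimerTask {
        override def run(): Unit = abortIfExpired(taskSet)
      }, timeoutMs)
    }
  }

  // Called once any task from the set is scheduled; cancels the pending abort.
  def markSchedulable(taskSet: T): Unit = synchronized {
    firstUnschedulableAt.remove(taskSet)
  }

  // Same condition as the diff: still tracked, and start + timeout <= now.
  def abortIfExpired(taskSet: T): Unit = synchronized {
    firstUnschedulableAt.get(taskSet) match {
      case Some(start) if start + timeoutMs <= now() =>
        firstUnschedulableAt.remove(taskSet)
        abort(taskSet)
      case _ => // schedulable again, or timeout not yet reached
    }
  }
}
```

Re-checking the map inside the timer callback is what lets a later successful scheduling silently cancel the abort without touching the timer itself.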
[GitHub] spark issue #22288: [SPARK-22148][SPARK-15815][Scheduler] Acquire new execut...
Github user dhruve commented on the issue: https://github.com/apache/spark/pull/22288 @tgravescs I have addressed the review comments.
[GitHub] spark pull request #22612: [SPARK-24958] Add executors' process tree total m...
Github user dhruve commented on a diff in the pull request: https://github.com/apache/spark/pull/22612#discussion_r223741374 --- Diff: core/src/main/scala/org/apache/spark/executor/ProcfsBasedSystems.scala --- @@ -0,0 +1,272 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.executor + +import java.io._ +import java.nio.charset.Charset +import java.nio.file.{Files, Paths} +import java.util.Locale + +import scala.collection.mutable +import scala.collection.mutable.ArrayBuffer + +import org.apache.spark.{SparkEnv, SparkException} +import org.apache.spark.internal.{config, Logging} +import org.apache.spark.util.Utils + +private[spark] case class ProcfsBasedSystemsMetrics( +jvmVmemTotal: Long, +jvmRSSTotal: Long, +pythonVmemTotal: Long, +pythonRSSTotal: Long, +otherVmemTotal: Long, +otherRSSTotal: Long) + +// Some of the ideas here are taken from the ProcfsBasedProcessTree class in hadoop +// project. 
+private[spark] class ProcfsBasedSystems(val procfsDir: String = "/proc/") extends Logging { + val procfsStatFile = "stat" + val testing = sys.env.contains("SPARK_TESTING") || sys.props.contains("spark.testing") + var pageSize = computePageSize() + var isAvailable: Boolean = isProcfsAvailable + private val pid = computePid() + private val ptree = mutable.Map[ Int, Set[Int]]() + + var allMetrics: ProcfsBasedSystemsMetrics = ProcfsBasedSystemsMetrics(0, 0, 0, 0, 0, 0) + private var latestJVMVmemTotal = 0L + private var latestJVMRSSTotal = 0L + private var latestPythonVmemTotal = 0L + private var latestPythonRSSTotal = 0L + private var latestOtherVmemTotal = 0L + private var latestOtherRSSTotal = 0L + + computeProcessTree() + + private def isProcfsAvailable: Boolean = { +if (testing) { + return true +} +try { + if (!Files.exists(Paths.get(procfsDir))) { +return false + } +} +catch { + case f: FileNotFoundException => return false +} +val shouldLogStageExecutorMetrics = + SparkEnv.get.conf.get(config.EVENT_LOG_STAGE_EXECUTOR_METRICS) +val shouldLogStageExecutorProcessTreeMetrics = + SparkEnv.get.conf.get(config.EVENT_LOG_PROCESS_TREE_METRICS) +shouldLogStageExecutorProcessTreeMetrics && shouldLogStageExecutorMetrics + } + + private def computePid(): Int = { +if (!isAvailable || testing) { + return -1; +} +try { + // This can be simplified in java9: + // https://docs.oracle.com/javase/9/docs/api/java/lang/ProcessHandle.html + val cmd = Array("bash", "-c", "echo $PPID") + val length = 10 + val out2 = Utils.executeAndGetOutput(cmd) + val pid = Integer.parseInt(out2.split("\n")(0)) + return pid; +} +catch { + case e: SparkException => logDebug("IO Exception when trying to compute process tree." 
+ +" As a result reporting of ProcessTree metrics is stopped", e) +isAvailable = false +return -1 +} + } + + private def computePageSize(): Long = { +if (testing) { + return 0; +} +val cmd = Array("getconf", "PAGESIZE") +val out2 = Utils.executeAndGetOutput(cmd) +return Integer.parseInt(out2.split("\n")(0)) + } + + private def computeProcessTree(): Unit = { +if (!isAvailable || testing) { + return +} +val queue = mutable.Queue.empty[Int] +queue += pid +while( !queue.isEmpty ) { + val p = queue.dequeue() + val c = getChildPids(p) + if(!c.isEmpty) { +queue ++= c +ptree += (p -> c.toSet)
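`computeProcessTree` walks the process tree breadth-first from the executor's PID, recording each parent's children. A minimal sketch of that traversal, assuming a hypothetical `childrenOf` lookup is injected in place of the procfs-backed `getChildPids`, so it runs without `/proc`:

```scala
import scala.collection.mutable

object ProcessTreeSketch {
  // Breadth-first walk from `root`, recording parent -> children edges.
  // `childrenOf` is a hypothetical lookup (e.g. backed by /proc in real code).
  def buildTree(root: Int, childrenOf: Int => Seq[Int]): Map[Int, Set[Int]] = {
    val tree = mutable.Map.empty[Int, Set[Int]]
    val queue = mutable.Queue(root)
    while (queue.nonEmpty) {
      val p = queue.dequeue()
      val children = childrenOf(p)
      if (children.nonEmpty) {
        queue ++= children          // visit grandchildren later
        tree(p) = children.toSet    // leaf processes get no entry
      }
    }
    tree.toMap
  }
}
```

Because parent/child PIDs form a tree, the walk needs no visited set; a lookup that could return cycles would need one.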
[GitHub] spark pull request #22612: [SPARK-24958] Add executors' process tree total m...
Github user dhruve commented on a diff in the pull request: https://github.com/apache/spark/pull/22612#discussion_r222783753 --- Diff: core/src/main/scala/org/apache/spark/executor/ProcfsBasedSystems.scala --- @@ -0,0 +1,266 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.executor + +import java.io._ +import java.nio.charset.Charset +import java.nio.file.{Files, Paths} +import java.util.Locale + +import scala.collection.mutable +import scala.collection.mutable.ArrayBuffer +import scala.collection.mutable.Queue + +import org.apache.spark.SparkEnv +import org.apache.spark.internal.{config, Logging} + +private[spark] case class ProcfsBasedSystemsMetrics( +jvmVmemTotal: Long, +jvmRSSTotal: Long, +pythonVmemTotal: Long, +pythonRSSTotal: Long, +otherVmemTotal: Long, +otherRSSTotal: Long) + +// Some of the ideas here are taken from the ProcfsBasedProcessTree class in hadoop +// project. 
+private[spark] class ProcfsBasedSystems(val procfsDir: String = "/proc/") extends Logging { + val procfsStatFile = "stat" + var pageSize = computePageSize() + var isAvailable: Boolean = isProcfsAvailable + private val pid = computePid() + private val ptree = mutable.Map[ Int, Set[Int]]() + + var allMetrics: ProcfsBasedSystemsMetrics = ProcfsBasedSystemsMetrics(0, 0, 0, 0, 0, 0) + private var latestJVMVmemTotal = 0L + private var latestJVMRSSTotal = 0L + private var latestPythonVmemTotal = 0L + private var latestPythonRSSTotal = 0L + private var latestOtherVmemTotal = 0L + private var latestOtherRSSTotal = 0L + + computeProcessTree() + + private def isProcfsAvailable: Boolean = { +val testing = sys.env.contains("SPARK_TESTING") || sys.props.contains("spark.testing") +if (testing) { + return true +} +try { + if (!Files.exists(Paths.get(procfsDir))) { +return false + } +} +catch { + case f: FileNotFoundException => return false +} +val shouldLogStageExecutorMetrics = + SparkEnv.get.conf.get(config.EVENT_LOG_STAGE_EXECUTOR_METRICS) +val shouldLogStageExecutorProcessTreeMetrics = + SparkEnv.get.conf.get(config.EVENT_LOG_PROCESS_TREE_METRICS) +shouldLogStageExecutorProcessTreeMetrics && shouldLogStageExecutorMetrics + } + + private def computePid(): Int = { +if (!isAvailable) { + return -1; +} +try { + // This can be simplified in java9: + // https://docs.oracle.com/javase/9/docs/api/java/lang/ProcessHandle.html + val cmd = Array("bash", "-c", "echo $PPID") + val length = 10 + val out = Array.fill[Byte](length)(0) + Runtime.getRuntime.exec(cmd).getInputStream.read(out) + val pid = Integer.parseInt(new String(out, "UTF-8").trim) + return pid; +} +catch { + case e: IOException => logDebug("IO Exception when trying to compute process tree." 
+ +" As a result reporting of ProcessTree metrics is stopped", e) +isAvailable = false +return -1 +} + } + + private def computePageSize(): Long = { +val testing = sys.env.contains("SPARK_TESTING") || sys.props.contains("spark.testing") +if (testing) { + return 0; +} +val cmd = Array("getconf", "PAGESIZE") +val out = Array.fill[Byte](10)(0) +Runtime.getRuntime.exec(cmd).getInputStream.read(out) +return Integer.parseInt(new String(out, "UTF-8").trim) + } + + private def computeProcessTree(): Unit = { +if (!isAvailable) { + return +} +val queue = mutable.Queue.empty[Int] +queue += pi
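`computePageSize` matters because, per the Linux proc(5) man page, field 23 of `/proc/[pid]/stat` (`vsize`) is reported in bytes while field 24 (`rss`) is reported in pages, so RSS has to be multiplied by the page size. A minimal parsing sketch, assuming a single stat line as input; `ProcStatSketch` is hypothetical and not the PR's parser. The process-name field is parenthesized and may contain spaces, so the sketch splits only after the last `)`.

```scala
object ProcStatSketch {
  // Returns (virtual memory in bytes, RSS in bytes) from one /proc/[pid]/stat line.
  def vmemAndRss(statLine: String, pageSize: Long): (Long, Long) = {
    // Skip "pid (comm) "; comm can contain spaces, so anchor on the last ')'.
    val afterComm = statLine.substring(statLine.lastIndexOf(')') + 2)
    val fields = afterComm.split(" ")
    // fields(0) is field 3 (state), so field N lives at index N - 3.
    val vsizeBytes = fields(23 - 3).toLong
    val rssPages = fields(24 - 3).toLong
    (vsizeBytes, rssPages * pageSize)
  }
}
```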
[GitHub] spark pull request #22612: [SPARK-24958] Add executors' process tree total m...
Github user dhruve commented on a diff in the pull request: https://github.com/apache/spark/pull/22612#discussion_r222739514 --- Diff: core/src/main/scala/org/apache/spark/executor/ProcfsBasedSystems.scala --- @@ -0,0 +1,274 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.executor + +import java.io._ +import java.nio.charset.Charset +import java.nio.file.{Files, Paths} + +import scala.collection.mutable +import scala.collection.mutable.ArrayBuffer +import scala.collection.mutable.Queue + +import org.apache.spark.SparkEnv +import org.apache.spark.internal.{config, Logging} + +private[spark] case class ProcfsBasedSystemsMetrics( +jvmVmemTotal: Long, +jvmRSSTotal: Long, +pythonVmemTotal: Long, +pythonRSSTotal: Long, +otherVmemTotal: Long, +otherRSSTotal: Long) + +// Some of the ideas here are taken from the ProcfsBasedProcessTree class in hadoop +// project. 
+private[spark] class ProcfsBasedSystems(procfsDir: String = "/proc/") extends Logging { + val procfsStatFile = "stat" + var pageSize: Long = computePageSize() + var isAvailable: Boolean = isProcfsAvailable + private val pid: Int = computePid() + private val ptree: scala.collection.mutable.Map[ Int, Set[Int]] = +scala.collection.mutable.Map[ Int, Set[Int]]() + + var allMetrics: ProcfsBasedSystemsMetrics = ProcfsBasedSystemsMetrics(0, 0, 0, 0, 0, 0) + private var latestJVMVmemTotal: Long = 0 + private var latestJVMRSSTotal: Long = 0 + private var latestPythonVmemTotal: Long = 0 + private var latestPythonRSSTotal: Long = 0 + private var latestOtherVmemTotal: Long = 0 + private var latestOtherRSSTotal: Long = 0 + + computeProcessTree() + + private def isProcfsAvailable: Boolean = { +val testing = sys.env.contains("SPARK_TESTING") || sys.props.contains("spark.testing") +if (testing) { + return true +} +try { + if (!Files.exists(Paths.get(procfsDir))) { +return false + } +} +catch { + case f: FileNotFoundException => return false +} +val shouldLogStageExecutorMetrics = + SparkEnv.get.conf.get(config.EVENT_LOG_STAGE_EXECUTOR_METRICS) +val shouldLogStageExecutorProcessTreeMetrics = + SparkEnv.get.conf.get(config.EVENT_LOG_PROCESS_TREE_METRICS) +shouldLogStageExecutorProcessTreeMetrics && shouldLogStageExecutorMetrics + } + + private def computePid(): Int = { +if (!isAvailable) { + return -1; +} +try { + // This can be simplified in java9: + // https://docs.oracle.com/javase/9/docs/api/java/lang/ProcessHandle.html + val cmd = Array("bash", "-c", "echo $PPID") + val length = 10 + val out: Array[Byte] = Array.fill[Byte](length)(0) + Runtime.getRuntime.exec(cmd).getInputStream.read(out) --- End diff -- I agree with @mccheah here; we should use the Utils class. In fact, wouldn't it be better to move the logic for getting the child PIDs into the Utils class itself?
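As the `computePid` comment notes, Java 9+ can return the JVM's own PID without spawning `bash`. The sketch below (hypothetical `PidSketch`; assumes a Java 9+ runtime) shows that call, plus a helper that parses the first line of command output, mirroring the `split("\n")(0)` step used in the later revision instead of reading into a fixed 10-byte buffer.

```scala
object PidSketch {
  // Java 9+: the current JVM's PID, no child process required.
  def currentPid(): Long = ProcessHandle.current().pid()

  // Parse the first line of a command's full output; trim guards against
  // stray spaces or a trailing '\r'.
  def firstLineAsLong(output: String): Long =
    output.split("\n")(0).trim.toLong
}
```

Reading the whole output and parsing its first line avoids silently truncating values longer than the buffer.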
[GitHub] spark pull request #22612: [SPARK-24958] Add executors' process tree total m...
Github user dhruve commented on a diff in the pull request: https://github.com/apache/spark/pull/22612#discussion_r222784195 --- Diff: core/src/main/scala/org/apache/spark/executor/ProcfsBasedSystems.scala --- @@ -0,0 +1,266 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.executor + +import java.io._ +import java.nio.charset.Charset +import java.nio.file.{Files, Paths} +import java.util.Locale + +import scala.collection.mutable +import scala.collection.mutable.ArrayBuffer +import scala.collection.mutable.Queue + +import org.apache.spark.SparkEnv +import org.apache.spark.internal.{config, Logging} + +private[spark] case class ProcfsBasedSystemsMetrics( +jvmVmemTotal: Long, +jvmRSSTotal: Long, +pythonVmemTotal: Long, +pythonRSSTotal: Long, +otherVmemTotal: Long, +otherRSSTotal: Long) + +// Some of the ideas here are taken from the ProcfsBasedProcessTree class in hadoop +// project. 
+private[spark] class ProcfsBasedSystems(val procfsDir: String = "/proc/") extends Logging { + val procfsStatFile = "stat" + var pageSize = computePageSize() + var isAvailable: Boolean = isProcfsAvailable + private val pid = computePid() + private val ptree = mutable.Map[ Int, Set[Int]]() + + var allMetrics: ProcfsBasedSystemsMetrics = ProcfsBasedSystemsMetrics(0, 0, 0, 0, 0, 0) + private var latestJVMVmemTotal = 0L + private var latestJVMRSSTotal = 0L + private var latestPythonVmemTotal = 0L + private var latestPythonRSSTotal = 0L + private var latestOtherVmemTotal = 0L + private var latestOtherRSSTotal = 0L + + computeProcessTree() + + private def isProcfsAvailable: Boolean = { +val testing = sys.env.contains("SPARK_TESTING") || sys.props.contains("spark.testing") +if (testing) { + return true +} +try { + if (!Files.exists(Paths.get(procfsDir))) { +return false + } +} +catch { + case f: FileNotFoundException => return false +} +val shouldLogStageExecutorMetrics = + SparkEnv.get.conf.get(config.EVENT_LOG_STAGE_EXECUTOR_METRICS) +val shouldLogStageExecutorProcessTreeMetrics = + SparkEnv.get.conf.get(config.EVENT_LOG_PROCESS_TREE_METRICS) +shouldLogStageExecutorProcessTreeMetrics && shouldLogStageExecutorMetrics + } + + private def computePid(): Int = { +if (!isAvailable) { + return -1; +} +try { + // This can be simplified in java9: + // https://docs.oracle.com/javase/9/docs/api/java/lang/ProcessHandle.html + val cmd = Array("bash", "-c", "echo $PPID") + val length = 10 + val out = Array.fill[Byte](length)(0) + Runtime.getRuntime.exec(cmd).getInputStream.read(out) + val pid = Integer.parseInt(new String(out, "UTF-8").trim) + return pid; +} +catch { + case e: IOException => logDebug("IO Exception when trying to compute process tree." 
+ +" As a result reporting of ProcessTree metrics is stopped", e) +isAvailable = false +return -1 +} + } + + private def computePageSize(): Long = { +val testing = sys.env.contains("SPARK_TESTING") || sys.props.contains("spark.testing") +if (testing) { + return 0; +} +val cmd = Array("getconf", "PAGESIZE") +val out = Array.fill[Byte](10)(0) +Runtime.getRuntime.exec(cmd).getInputStream.read(out) +return Integer.parseInt(new String(out, "UTF-8").trim) + } + + private def computeProcessTree(): Unit = { +if (!isAvailable) { + return +} +val queue = mutable.Queue.empty[Int] +queue += pi
[GitHub] spark pull request #22612: [SPARK-24958] Add executors' process tree total m...
Github user dhruve commented on a diff in the pull request: https://github.com/apache/spark/pull/22612#discussion_r222785629 --- Diff: core/src/main/scala/org/apache/spark/metrics/ExecutorMetricType.scala --- @@ -59,6 +60,43 @@ case object JVMOffHeapMemory extends ExecutorMetricType { } } +case object ProcessTreeJVMRSSMemory extends ExecutorMetricType { + override private[spark] def getMetricValue(memoryManager: MemoryManager): Long = { +ExecutorMetricType.pTreeInfo.updateAllMetrics() --- End diff -- We should make ProcessTreeMemory extend ExecutorMetricType so that the individual metrics can be returned from it. The current code also assumes that the metrics are computed only in ProcessTreeJVMRSSMemory and that subsequent calls reuse them. We shouldn't depend on that ordering.
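One way to remove the ordering dependency described above is for a single metric type to take one snapshot per poll and return every process-tree value from it. This is a hedged sketch with hypothetical names (`MultiValueMetricSketch`, `ProcessTreeMemorySketch`, and the metric names), not Spark's `ExecutorMetricType` API:

```scala
// Hypothetical interface: one type, several named values.
trait MultiValueMetricSketch {
  def names: Seq[String]
  def getMetricValues(): Array[Long]
}

// One type owns the snapshot, so no per-metric object has to be polled "first".
class ProcessTreeMemorySketch(snapshot: () => Map[String, Long])
    extends MultiValueMetricSketch {
  val names: Seq[String] = Seq("JVMVMemory", "JVMRSSMemory", "PythonVMemory",
    "PythonRSSMemory", "OtherVMemory", "OtherRSSMemory")

  def getMetricValues(): Array[Long] = {
    val s = snapshot() // computed exactly once per poll
    names.map(n => s.getOrElse(n, 0L)).toArray
  }
}
```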
[GitHub] spark pull request #22612: [SPARK-24958] Add executors' process tree total m...
Github user dhruve commented on a diff in the pull request: https://github.com/apache/spark/pull/22612#discussion_r222783588 --- Diff: core/src/main/scala/org/apache/spark/executor/ProcfsBasedSystems.scala --- @@ -0,0 +1,266 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.executor + +import java.io._ +import java.nio.charset.Charset +import java.nio.file.{Files, Paths} +import java.util.Locale + +import scala.collection.mutable +import scala.collection.mutable.ArrayBuffer +import scala.collection.mutable.Queue + +import org.apache.spark.SparkEnv +import org.apache.spark.internal.{config, Logging} + +private[spark] case class ProcfsBasedSystemsMetrics( +jvmVmemTotal: Long, +jvmRSSTotal: Long, +pythonVmemTotal: Long, +pythonRSSTotal: Long, +otherVmemTotal: Long, +otherRSSTotal: Long) + +// Some of the ideas here are taken from the ProcfsBasedProcessTree class in hadoop +// project. 
+private[spark] class ProcfsBasedSystems(val procfsDir: String = "/proc/") extends Logging { + val procfsStatFile = "stat" + var pageSize = computePageSize() + var isAvailable: Boolean = isProcfsAvailable + private val pid = computePid() + private val ptree = mutable.Map[ Int, Set[Int]]() + + var allMetrics: ProcfsBasedSystemsMetrics = ProcfsBasedSystemsMetrics(0, 0, 0, 0, 0, 0) + private var latestJVMVmemTotal = 0L + private var latestJVMRSSTotal = 0L + private var latestPythonVmemTotal = 0L + private var latestPythonRSSTotal = 0L + private var latestOtherVmemTotal = 0L + private var latestOtherRSSTotal = 0L + + computeProcessTree() + + private def isProcfsAvailable: Boolean = { +val testing = sys.env.contains("SPARK_TESTING") || sys.props.contains("spark.testing") +if (testing) { + return true +} +try { + if (!Files.exists(Paths.get(procfsDir))) { +return false + } +} +catch { + case f: FileNotFoundException => return false +} +val shouldLogStageExecutorMetrics = + SparkEnv.get.conf.get(config.EVENT_LOG_STAGE_EXECUTOR_METRICS) +val shouldLogStageExecutorProcessTreeMetrics = + SparkEnv.get.conf.get(config.EVENT_LOG_PROCESS_TREE_METRICS) +shouldLogStageExecutorProcessTreeMetrics && shouldLogStageExecutorMetrics + } + + private def computePid(): Int = { +if (!isAvailable) { + return -1; +} +try { + // This can be simplified in java9: + // https://docs.oracle.com/javase/9/docs/api/java/lang/ProcessHandle.html + val cmd = Array("bash", "-c", "echo $PPID") + val length = 10 + val out = Array.fill[Byte](length)(0) + Runtime.getRuntime.exec(cmd).getInputStream.read(out) + val pid = Integer.parseInt(new String(out, "UTF-8").trim) + return pid; +} +catch { + case e: IOException => logDebug("IO Exception when trying to compute process tree." 
+ +" As a result reporting of ProcessTree metrics is stopped", e) +isAvailable = false +return -1 +} + } + + private def computePageSize(): Long = { +val testing = sys.env.contains("SPARK_TESTING") || sys.props.contains("spark.testing") +if (testing) { + return 0; +} +val cmd = Array("getconf", "PAGESIZE") +val out = Array.fill[Byte](10)(0) +Runtime.getRuntime.exec(cmd).getInputStream.read(out) +return Integer.parseInt(new String(out, "UTF-8").trim) + } + + private def computeProcessTree(): Unit = { +if (!isAvailable) { + return +} +val queue = mutable.Queue.empty[Int] +queue += pi
[GitHub] spark pull request #22612: [SPARK-24958] Add executors' process tree total m...
Github user dhruve commented on a diff in the pull request: https://github.com/apache/spark/pull/22612#discussion_r222781707 --- Diff: core/src/main/scala/org/apache/spark/executor/ProcfsBasedSystems.scala --- @@ -0,0 +1,266 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.executor + +import java.io._ +import java.nio.charset.Charset +import java.nio.file.{Files, Paths} +import java.util.Locale + +import scala.collection.mutable +import scala.collection.mutable.ArrayBuffer +import scala.collection.mutable.Queue + +import org.apache.spark.SparkEnv +import org.apache.spark.internal.{config, Logging} + +private[spark] case class ProcfsBasedSystemsMetrics( +jvmVmemTotal: Long, +jvmRSSTotal: Long, +pythonVmemTotal: Long, +pythonRSSTotal: Long, +otherVmemTotal: Long, +otherRSSTotal: Long) + +// Some of the ideas here are taken from the ProcfsBasedProcessTree class in hadoop +// project. 
+private[spark] class ProcfsBasedSystems(val procfsDir: String = "/proc/") extends Logging { + val procfsStatFile = "stat" + var pageSize = computePageSize() + var isAvailable: Boolean = isProcfsAvailable + private val pid = computePid() + private val ptree = mutable.Map[ Int, Set[Int]]() + + var allMetrics: ProcfsBasedSystemsMetrics = ProcfsBasedSystemsMetrics(0, 0, 0, 0, 0, 0) + private var latestJVMVmemTotal = 0L + private var latestJVMRSSTotal = 0L + private var latestPythonVmemTotal = 0L + private var latestPythonRSSTotal = 0L + private var latestOtherVmemTotal = 0L + private var latestOtherRSSTotal = 0L + + computeProcessTree() + + private def isProcfsAvailable: Boolean = { +val testing = sys.env.contains("SPARK_TESTING") || sys.props.contains("spark.testing") +if (testing) { + return true +} +try { + if (!Files.exists(Paths.get(procfsDir))) { +return false + } +} +catch { + case f: FileNotFoundException => return false +} +val shouldLogStageExecutorMetrics = + SparkEnv.get.conf.get(config.EVENT_LOG_STAGE_EXECUTOR_METRICS) +val shouldLogStageExecutorProcessTreeMetrics = + SparkEnv.get.conf.get(config.EVENT_LOG_PROCESS_TREE_METRICS) +shouldLogStageExecutorProcessTreeMetrics && shouldLogStageExecutorMetrics + } + + private def computePid(): Int = { +if (!isAvailable) { + return -1; +} +try { + // This can be simplified in java9: + // https://docs.oracle.com/javase/9/docs/api/java/lang/ProcessHandle.html + val cmd = Array("bash", "-c", "echo $PPID") + val length = 10 + val out = Array.fill[Byte](length)(0) + Runtime.getRuntime.exec(cmd).getInputStream.read(out) + val pid = Integer.parseInt(new String(out, "UTF-8").trim) + return pid; +} +catch { + case e: IOException => logDebug("IO Exception when trying to compute process tree." 
+ +" As a result reporting of ProcessTree metrics is stopped", e) +isAvailable = false +return -1 +} + } + + private def computePageSize(): Long = { +val testing = sys.env.contains("SPARK_TESTING") || sys.props.contains("spark.testing") +if (testing) { + return 0; +} +val cmd = Array("getconf", "PAGESIZE") +val out = Array.fill[Byte](10)(0) +Runtime.getRuntime.exec(cmd).getInputStream.read(out) +return Integer.parseInt(new String(out, "UTF-8").trim) + } + + private def computeProcessTree(): Unit = { +if (!isAvailable) { + return +} +val queue = mutable.Queue.empty[Int] +queue += pi
[GitHub] spark pull request #22612: [SPARK-24958] Add executors' process tree total m...
Github user dhruve commented on a diff in the pull request: https://github.com/apache/spark/pull/22612#discussion_r222739822 --- Diff: core/src/main/scala/org/apache/spark/executor/ProcfsBasedSystems.scala --- @@ -0,0 +1,266 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.executor + +import java.io._ +import java.nio.charset.Charset +import java.nio.file.{Files, Paths} +import java.util.Locale + +import scala.collection.mutable +import scala.collection.mutable.ArrayBuffer +import scala.collection.mutable.Queue + +import org.apache.spark.SparkEnv +import org.apache.spark.internal.{config, Logging} + +private[spark] case class ProcfsBasedSystemsMetrics( +jvmVmemTotal: Long, +jvmRSSTotal: Long, +pythonVmemTotal: Long, +pythonRSSTotal: Long, +otherVmemTotal: Long, +otherRSSTotal: Long) + +// Some of the ideas here are taken from the ProcfsBasedProcessTree class in hadoop +// project. 
+private[spark] class ProcfsBasedSystems(val procfsDir: String = "/proc/") extends Logging { + val procfsStatFile = "stat" + var pageSize = computePageSize() + var isAvailable: Boolean = isProcfsAvailable + private val pid = computePid() + private val ptree = mutable.Map[ Int, Set[Int]]() + + var allMetrics: ProcfsBasedSystemsMetrics = ProcfsBasedSystemsMetrics(0, 0, 0, 0, 0, 0) + private var latestJVMVmemTotal = 0L + private var latestJVMRSSTotal = 0L + private var latestPythonVmemTotal = 0L + private var latestPythonRSSTotal = 0L + private var latestOtherVmemTotal = 0L + private var latestOtherRSSTotal = 0L + + computeProcessTree() + + private def isProcfsAvailable: Boolean = { +val testing = sys.env.contains("SPARK_TESTING") || sys.props.contains("spark.testing") +if (testing) { + return true +} +try { + if (!Files.exists(Paths.get(procfsDir))) { +return false + } +} +catch { + case f: FileNotFoundException => return false +} +val shouldLogStageExecutorMetrics = + SparkEnv.get.conf.get(config.EVENT_LOG_STAGE_EXECUTOR_METRICS) +val shouldLogStageExecutorProcessTreeMetrics = + SparkEnv.get.conf.get(config.EVENT_LOG_PROCESS_TREE_METRICS) +shouldLogStageExecutorProcessTreeMetrics && shouldLogStageExecutorMetrics + } + + private def computePid(): Int = { +if (!isAvailable) { + return -1; +} +try { + // This can be simplified in java9: + // https://docs.oracle.com/javase/9/docs/api/java/lang/ProcessHandle.html + val cmd = Array("bash", "-c", "echo $PPID") + val length = 10 + val out = Array.fill[Byte](length)(0) + Runtime.getRuntime.exec(cmd).getInputStream.read(out) + val pid = Integer.parseInt(new String(out, "UTF-8").trim) + return pid; +} +catch { + case e: IOException => logDebug("IO Exception when trying to compute process tree." 
+ +" As a result reporting of ProcessTree metrics is stopped", e) +isAvailable = false +return -1 +} + } + + private def computePageSize(): Long = { +val testing = sys.env.contains("SPARK_TESTING") || sys.props.contains("spark.testing") +if (testing) { + return 0; +} +val cmd = Array("getconf", "PAGESIZE") +val out = Array.fill[Byte](10)(0) +Runtime.getRuntime.exec(cmd).getInputStream.read(out) +return Integer.parseInt(new String(out, "UTF-8").trim) + } + + private def computeProcessTree(): Unit = { +if (!isAvailable) { + return +} +val queue = mutable.Queue.empty[Int] +queue += pi
[GitHub] spark pull request #22612: [SPARK-24958] Add executors' process tree total m...
Github user dhruve commented on a diff in the pull request: https://github.com/apache/spark/pull/22612#discussion_r222779045 --- Diff: core/src/main/scala/org/apache/spark/executor/ProcfsBasedSystems.scala --- @@ -0,0 +1,266 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.executor + +import java.io._ +import java.nio.charset.Charset +import java.nio.file.{Files, Paths} +import java.util.Locale + +import scala.collection.mutable +import scala.collection.mutable.ArrayBuffer +import scala.collection.mutable.Queue + +import org.apache.spark.SparkEnv +import org.apache.spark.internal.{config, Logging} + +private[spark] case class ProcfsBasedSystemsMetrics( +jvmVmemTotal: Long, +jvmRSSTotal: Long, +pythonVmemTotal: Long, +pythonRSSTotal: Long, +otherVmemTotal: Long, +otherRSSTotal: Long) + +// Some of the ideas here are taken from the ProcfsBasedProcessTree class in hadoop +// project. 
+private[spark] class ProcfsBasedSystems(val procfsDir: String = "/proc/") extends Logging { + val procfsStatFile = "stat" + var pageSize = computePageSize() + var isAvailable: Boolean = isProcfsAvailable + private val pid = computePid() + private val ptree = mutable.Map[ Int, Set[Int]]() + + var allMetrics: ProcfsBasedSystemsMetrics = ProcfsBasedSystemsMetrics(0, 0, 0, 0, 0, 0) + private var latestJVMVmemTotal = 0L --- End diff -- These don't seem to be needed as long as we have ProcfsBasedSystemsMetrics. As others have pointed out, it makes more sense to have a Map of named metrics.
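Here is a minimal sketch of the Map-of-named-metrics idea, in plain Scala with no Spark dependencies. The object name `NamedProcfsMetrics` and the metric name strings are hypothetical. They were chosen to mirror the six fields of `ProcfsBasedSystemsMetrics` in the diff, and they are not the API that was eventually merged.

```scala
// Hypothetical sketch: process-tree totals kept in one Map keyed by metric
// name, instead of one mutable var per metric.
object NamedProcfsMetrics {
  // Illustrative names mirroring the fields of ProcfsBasedSystemsMetrics.
  val MetricNames: Seq[String] = Seq(
    "JVMVMemory", "JVMRSSMemory",
    "PythonVMemory", "PythonRSSMemory",
    "OtherVMemory", "OtherRSSMemory")

  // Every metric starts at zero.
  def empty: Map[String, Long] = MetricNames.map(_ -> 0L).toMap

  // Add one sample (e.g. a single process's vmem/rss) to the running totals.
  def accumulate(totals: Map[String, Long], sample: Map[String, Long]): Map[String, Long] =
    sample.foldLeft(totals) { case (acc, (name, value)) =>
      acc.updated(name, acc.getOrElse(name, 0L) + value)
    }
}
```

With named entries, a new metric only needs a new name, not a new field plus a new `var`.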
[GitHub] spark issue #22288: [SPARK-22148][SPARK-15815][Scheduler] Acquire new execut...
Github user dhruve commented on the issue: https://github.com/apache/spark/pull/22288 The failures seem to be unrelated; I wasn't able to reproduce them.
[GitHub] spark issue #22288: [SPARK-22148][SPARK-15815][Scheduler] Acquire new execut...
Github user dhruve commented on the issue: https://github.com/apache/spark/pull/22288 retest this please
[GitHub] spark pull request #22288: [SPARK-22148][SPARK-15815][Scheduler] Acquire new...
Github user dhruve commented on a diff in the pull request: https://github.com/apache/spark/pull/22288#discussion_r216795021 --- Diff: core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala --- @@ -414,9 +425,48 @@ private[spark] class TaskSchedulerImpl( launchedAnyTask |= launchedTaskAtCurrentMaxLocality } while (launchedTaskAtCurrentMaxLocality) } + if (!launchedAnyTask) { - taskSet.abortIfCompletelyBlacklisted(hostToExecutors) -} + taskSet.getCompletelyBlacklistedTaskIfAny(hostToExecutors) match { +case taskIndex: Some[Int] => // Returns the taskIndex which was unschedulable + + // If the taskSet is unschedulable we kill an existing blacklisted executor/s and + // kick off an abortTimer which after waiting will abort the taskSet if we were + // unable to schedule any task from the taskSet. + // Note: We keep a track of schedulability on a per taskSet basis rather than on a + // per task basis. + val executor = hostToExecutors.valuesIterator.next().iterator.next() --- End diff -- That's a nice suggestion. In one case only a few executors are running (say, 3), and all of them are blacklisted but still running some tasks. To handle it, I had started changing this code to kill the executor with the fewest running tasks. I'll look into this some more.
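Here is a sketch of that selection. It assumes the scheduler can supply the set of blacklisted executor IDs and a running-task count per executor. Both inputs and the `ExecutorToKill` object are hypothetical. Ties are broken by executor ID so the choice is deterministic.

```scala
// Hypothetical sketch: among blacklisted executors, pick the one whose loss
// would fail the fewest running tasks.
object ExecutorToKill {
  // runningTasks: executorId -> number of tasks currently running on it;
  // executors missing from the map are treated as idle (0 tasks).
  def pick(blacklisted: Set[String], runningTasks: Map[String, Int]): Option[String] =
    if (blacklisted.isEmpty) None
    else Some(blacklisted.minBy(id => (runningTasks.getOrElse(id, 0), id)))
}
```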
[GitHub] spark pull request #22288: [SPARK-22148][Scheduler] Acquire new executors to...
Github user dhruve commented on a diff in the pull request: https://github.com/apache/spark/pull/22288#discussion_r216788096 --- Diff: core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala --- @@ -414,9 +425,48 @@ private[spark] class TaskSchedulerImpl( launchedAnyTask |= launchedTaskAtCurrentMaxLocality } while (launchedTaskAtCurrentMaxLocality) } + if (!launchedAnyTask) { - taskSet.abortIfCompletelyBlacklisted(hostToExecutors) -} + taskSet.getCompletelyBlacklistedTaskIfAny(hostToExecutors) match { +case taskIndex: Some[Int] => // Returns the taskIndex which was unschedulable + + // If the taskSet is unschedulable we kill an existing blacklisted executor/s and + // kick off an abortTimer which after waiting will abort the taskSet if we were + // unable to schedule any task from the taskSet. + // Note: We keep a track of schedulability on a per taskSet basis rather than on a + // per task basis. + val executor = hostToExecutors.valuesIterator.next().iterator.next() + logDebug("Killing executor because of task unschedulability: " + executor) --- End diff -- noted.
[GitHub] spark pull request #22288: [SPARK-22148][Scheduler] Acquire new executors to...
Github user dhruve commented on a diff in the pull request: https://github.com/apache/spark/pull/22288#discussion_r216788079 --- Diff: core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala --- @@ -414,9 +425,48 @@ private[spark] class TaskSchedulerImpl( launchedAnyTask |= launchedTaskAtCurrentMaxLocality } while (launchedTaskAtCurrentMaxLocality) } + if (!launchedAnyTask) { - taskSet.abortIfCompletelyBlacklisted(hostToExecutors) -} + taskSet.getCompletelyBlacklistedTaskIfAny(hostToExecutors) match { +case taskIndex: Some[Int] => // Returns the taskIndex which was unschedulable + + // If the taskSet is unschedulable we kill an existing blacklisted executor/s and + // kick off an abortTimer which after waiting will abort the taskSet if we were + // unable to schedule any task from the taskSet. + // Note: We keep a track of schedulability on a per taskSet basis rather than on a + // per task basis. + val executor = hostToExecutors.valuesIterator.next().iterator.next() + logDebug("Killing executor because of task unschedulability: " + executor) + blacklistTrackerOpt.foreach(blt => blt.killBlacklistedExecutor(executor)) + + if (!unschedulableTaskSetToExpiryTime.contains(taskSet)) { + unschedulableTaskSetToExpiryTime(taskSet) = clock.getTimeMillis() --- End diff -- noted.
[GitHub] spark pull request #22288: [SPARK-22148][Scheduler] Acquire new executors to...
Github user dhruve commented on a diff in the pull request: https://github.com/apache/spark/pull/22288#discussion_r216788016 --- Diff: core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala --- @@ -623,8 +623,9 @@ private[spark] class TaskSetManager( * * It is possible that this taskset has become impossible to schedule *anywhere* due to the * blacklist. The most common scenario would be if there are fewer executors than - * spark.task.maxFailures. We need to detect this so we can fail the task set, otherwise the job - * will hang. + * spark.task.maxFailures. We need to detect this so we can avoid the job from being hung. + * If dynamic allocation is enabled we try to acquire new executor/s by killing the existing one. + * In case of static allocation we abort the taskSet immediately to fail the job. --- End diff -- Yes. Removing a single executor handles static allocation as well. I will update the comments.
[GitHub] spark pull request #22288: [SPARK-22148][Scheduler] Acquire new executors to...
Github user dhruve commented on a diff in the pull request: https://github.com/apache/spark/pull/22288#discussion_r215036162 --- Diff: core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala --- @@ -414,9 +425,54 @@ private[spark] class TaskSchedulerImpl( launchedAnyTask |= launchedTaskAtCurrentMaxLocality } while (launchedTaskAtCurrentMaxLocality) } + if (!launchedAnyTask) { - taskSet.abortIfCompletelyBlacklisted(hostToExecutors) -} + taskSet.getCompletelyBlacklistedTaskIfAny(hostToExecutors) match { +case taskIndex: Some[Int] => // Returns the taskIndex which was unschedulable + if (conf.getBoolean("spark.dynamicAllocation.enabled", false)) { +// If the taskSet is unschedulable we kill the existing blacklisted executor/s and +// kick off an abortTimer which after waiting will abort the taskSet if we were +// unable to get new executors and couldn't schedule a task from the taskSet. +// Note: We keep a track of schedulability on a per taskSet basis rather than on a +// per task basis. +if (!unschedulableTaskSetToExpiryTime.contains(taskSet)) { + hostToExecutors.valuesIterator.foreach(executors => executors.foreach({ +executor => + logDebug("Killing executor because of task unschedulability: " + executor) + blacklistTrackerOpt.foreach(blt => blt.killBlacklistedExecutor(executor)) --- End diff -- - To refresh executors, you need to enable `spark.blacklist.killBlacklistedExecutors`. - Thinking about it more, killing all the executors is a little too harsh. Killing only a single executor would mitigate this, although it would still fail the tasks running on that executor.
[GitHub] spark issue #22288: [SPARK-22148][Scheduler] Acquire new executors to avoid ...
Github user dhruve commented on the issue: https://github.com/apache/spark/pull/22288 @squito @tgravescs Can you review this PR? Thanks.
[GitHub] spark pull request #22288: [SPARK-22148] Acquire new executors to avoid hang...
GitHub user dhruve opened a pull request: https://github.com/apache/spark/pull/22288 [SPARK-22148] Acquire new executors to avoid hang because of blacklisting

## What changes were proposed in this pull request?

Currently, when a task becomes unschedulable because it is blacklisted on every available executor (most commonly because there are fewer executors than `spark.task.maxFailures`), we abort the taskSet, which fails the job. If dynamic allocation is turned on, this change instead tries to acquire new executors so that the job can complete successfully.

## How was this patch tested?

I performed some manual tests to check and validate the behavior.

```scala
import org.apache.spark.TaskContext

val rdd = sc.parallelize(Seq(1 to 10), 3)
val mapped = rdd.mapPartitionsWithIndex { (index, iterator) =>
  if (index == 2) {
    Thread.sleep(30 * 1000)
    val attemptNum = TaskContext.get.attemptNumber
    // Fail the first three attempts of partition 2 so the task gets blacklisted.
    if (attemptNum < 3) throw new Exception("Fail for blacklisting")
  }
  iterator.toList.map(x => x + " -> " + index).iterator
}
mapped.collect
```

Note: I am putting up this PR as an initial draft to review the approach.

Todo list:
- Add unit tests
- Agree upon the conf name & value and update the docs

We can build on this approach further by:
- Taking static allocation into account
- Querying the RM to find out whether it's a small cluster, and then either waiting longer or aborting immediately
- Distinguishing between time spent waiting to acquire an executor and time spent unable to schedule a task

Open to suggestions.
You can merge this pull request into a Git repository by running: $ git pull https://github.com/dhruve/spark bug/SPARK-22148 Alternatively you can review and apply these changes as the patch at: https://github.com/apache/spark/pull/22288.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #22288 commit 5253b3134119b2a28cdaa1406d7bafb55f62cbc1 Author: Dhruve Ashar Date: 2018-08-30T18:08:58Z [SPARK-22148] Acquire new executors to avoid hang because of blacklisting
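Here is a simplified sketch of the per-taskSet abort timer this approach relies on. It assumes a caller that reports the current time in milliseconds. `UnschedulableTracker`, the string taskSet IDs and `timeoutMs` are hypothetical stand-ins. The diffs quoted in this thread use a map named `unschedulableTaskSetToExpiryTime`, but this sketch is not that code.

```scala
import scala.collection.mutable

// Hypothetical sketch of a per-taskSet abort timer. The first time a taskSet is
// found completely unschedulable, a deadline is recorded. If the taskSet is
// still unschedulable when the deadline passes, it should be aborted.
// Launching any task from the taskSet clears its timer.
class UnschedulableTracker(timeoutMs: Long) {
  private val expiryTime = mutable.HashMap.empty[String, Long] // taskSetId -> deadline

  // Called when no task from `taskSetId` could be scheduled at `nowMs`.
  // Returns true if the taskSet should be aborted now.
  def onUnschedulable(taskSetId: String, nowMs: Long): Boolean = {
    val deadline = expiryTime.getOrElseUpdate(taskSetId, nowMs + timeoutMs)
    nowMs >= deadline
  }

  // Called when a task from `taskSetId` was launched, so it is schedulable again.
  def onTaskLaunched(taskSetId: String): Unit = {
    expiryTime.remove(taskSetId)
    ()
  }

  def isTimerRunning(taskSetId: String): Boolean = expiryTime.contains(taskSetId)
}
```

Tracking by taskSet rather than by task keeps only one deadline per taskSet. This matches the note in the quoted diff comments.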
[GitHub] spark pull request #22121: [SPARK-25133][SQL][Doc]Avro data source guide
Github user dhruve commented on a diff in the pull request: https://github.com/apache/spark/pull/22121#discussion_r212075677 --- Diff: docs/avro-data-source-guide.md --- @@ -0,0 +1,377 @@ +--- +layout: global +title: Apache Avro Data Source Guide +--- + +* This will become a table of contents (this text will be scraped). +{:toc} + +Since Spark 2.4 release, [Spark SQL](https://spark.apache.org/docs/latest/sql-programming-guide.html) provides built-in support for reading and writing Apache Avro data. + +## Deploying +The `spark-avro` module is external and not included in `spark-submit` or `spark-shell` by default. + +As with any Spark applications, `spark-submit` is used to launch your application. `spark-avro_{{site.SCALA_BINARY_VERSION}}` +and its dependencies can be directly added to `spark-submit` using `--packages`, such as, + +./bin/spark-submit --packages org.apache.spark:spark-avro_{{site.SCALA_BINARY_VERSION}}:{{site.SPARK_VERSION_SHORT}} ... + +For experimenting on `spark-shell`, you can also use `--packages` to add `org.apache.spark:spark-avro_{{site.SCALA_BINARY_VERSION}}` and its dependencies directly, + +./bin/spark-shell --packages org.apache.spark:spark-avro_{{site.SCALA_BINARY_VERSION}}:{{site.SPARK_VERSION_SHORT}} ... + +See [Application Submission Guide](submitting-applications.html) for more details about submitting applications with external dependencies. + +## Load and Save Functions + +Since `spark-avro` module is external, there is no `.avro` API in +`DataFrameReader` or `DataFrameWriter`. + +To load/save data in Avro format, you need to specify the data source option `format` as `avro`(or `org.apache.spark.sql.avro`). 
+ + +{% highlight scala %} + +val usersDF = spark.read.format("avro").load("examples/src/main/resources/users.avro") +usersDF.select("name", "favorite_color").write.format("avro").save("namesAndFavColors.avro") + +{% endhighlight %} + + +{% highlight java %} + +Dataset usersDF = spark.read().format("avro").load("examples/src/main/resources/users.avro"); +usersDF.select("name", "favorite_color").write().format("avro").save("namesAndFavColors.avro"); + +{% endhighlight %} + + +{% highlight python %} + +df = spark.read.format("avro").load("examples/src/main/resources/users.avro") +df.select("name", "favorite_color").write.format("avro").save("namesAndFavColors.avro") + +{% endhighlight %} + + +{% highlight r %} + +df <- read.df("examples/src/main/resources/users.avro", "avro") +write.df(select(df, "name", "favorite_color"), "namesAndFavColors.avro", "avro") + +{% endhighlight %} + + + +## to_avro() and from_avro() +Spark SQL provides function `to_avro` to encode a struct as a string and `from_avro()` to retrieve the struct as a complex type. + +Using Avro record as columns are useful when reading from or writing to a streaming source like Kafka. Each +Kafka key-value record will be augmented with some metadata, such as the ingestion timestamp into Kafka, the offset in Kafka, etc. +* If the "value" field that contains your data is in Avro, you could use `from_avro()` to extract your data, enrich it, clean it, and then push it downstream to Kafka again or write it out to a file. +* `to_avro()` can be used to turn structs into Avro records. This method is particularly useful when you would like to re-encode multiple columns into a single one when writing data out to Kafka. + +Both methods are presently only available in Scala and Java. + + + +{% highlight scala %} +import org.apache.spark.sql.avro._ + +// `from_avro` requires Avro schema in JSON string format. 
+val jsonFormatSchema = new String(Files.readAllBytes(Paths.get("./examples/src/main/resources/user.avsc"))) + +val df = spark + .readStream + .format("kafka") + .option("kafka.bootstrap.servers", "host1:port1,host2:port2") + .option("subscribe", "topic1") + .load() + +// 1. Decode the Avro data into a struct; +// 2. Filter by column `favorite_color`; +// 3. Encode the column `name` in Avro format. +val output = df + .select(from_avro('value, jsonFormatSchema) as 'user) + .where("user.favorite_color == \"red\"") + .select(to_avro($"user.name") as 'value) + +val ds = output + .writeStream + .format("kafka") +
[GitHub] spark pull request #22121: [SPARK-25133][SQL][Doc]Avro data source guide
Github user dhruve commented on a diff in the pull request: https://github.com/apache/spark/pull/22121#discussion_r212032223 --- Diff: docs/avro-data-source-guide.md --- @@ -0,0 +1,377 @@ +--- +layout: global +title: Apache Avro Data Source Guide +--- + +* This will become a table of contents (this text will be scraped). +{:toc} + +Since Spark 2.4 release, [Spark SQL](https://spark.apache.org/docs/latest/sql-programming-guide.html) provides built-in support for reading and writing Apache Avro data. + +## Deploying +The `spark-avro` module is external and not included in `spark-submit` or `spark-shell` by default. + +As with any Spark applications, `spark-submit` is used to launch your application. `spark-avro_{{site.SCALA_BINARY_VERSION}}` +and its dependencies can be directly added to `spark-submit` using `--packages`, such as, + +./bin/spark-submit --packages org.apache.spark:spark-avro_{{site.SCALA_BINARY_VERSION}}:{{site.SPARK_VERSION_SHORT}} ... + +For experimenting on `spark-shell`, you can also use `--packages` to add `org.apache.spark:spark-avro_{{site.SCALA_BINARY_VERSION}}` and its dependencies directly, + +./bin/spark-shell --packages org.apache.spark:spark-avro_{{site.SCALA_BINARY_VERSION}}:{{site.SPARK_VERSION_SHORT}} ... + +See [Application Submission Guide](submitting-applications.html) for more details about submitting applications with external dependencies. + +## Load and Save Functions + +Since `spark-avro` module is external, there is no `.avro` API in +`DataFrameReader` or `DataFrameWriter`. + +To load/save data in Avro format, you need to specify the data source option `format` as `avro`(or `org.apache.spark.sql.avro`). 
+ + +{% highlight scala %} + +val usersDF = spark.read.format("avro").load("examples/src/main/resources/users.avro") +usersDF.select("name", "favorite_color").write.format("avro").save("namesAndFavColors.avro") + +{% endhighlight %} + + +{% highlight java %} + +Dataset usersDF = spark.read().format("avro").load("examples/src/main/resources/users.avro"); +usersDF.select("name", "favorite_color").write().format("avro").save("namesAndFavColors.avro"); + +{% endhighlight %} + + +{% highlight python %} + +df = spark.read.format("avro").load("examples/src/main/resources/users.avro") +df.select("name", "favorite_color").write.format("avro").save("namesAndFavColors.avro") + +{% endhighlight %} + + +{% highlight r %} + +df <- read.df("examples/src/main/resources/users.avro", "avro") +write.df(select(df, "name", "favorite_color"), "namesAndFavColors.avro", "avro") + +{% endhighlight %} + + + +## to_avro() and from_avro() +Spark SQL provides function `to_avro` to encode a struct as a string and `from_avro()` to retrieve the struct as a complex type. + +Using Avro record as columns are useful when reading from or writing to a streaming source like Kafka. Each +Kafka key-value record will be augmented with some metadata, such as the ingestion timestamp into Kafka, the offset in Kafka, etc. +* If the "value" field that contains your data is in Avro, you could use `from_avro()` to extract your data, enrich it, clean it, and then push it downstream to Kafka again or write it out to a file. +* `to_avro()` can be used to turn structs into Avro records. This method is particularly useful when you would like to re-encode multiple columns into a single one when writing data out to Kafka. + +Both methods are presently only available in Scala and Java. + + + +{% highlight scala %} +import org.apache.spark.sql.avro._ + +// `from_avro` requires Avro schema in JSON string format. 
+val jsonFormatSchema = new String(Files.readAllBytes(Paths.get("./examples/src/main/resources/user.avsc"))) + +val df = spark + .readStream + .format("kafka") + .option("kafka.bootstrap.servers", "host1:port1,host2:port2") + .option("subscribe", "topic1") + .load() + +// 1. Decode the Avro data into a struct; +// 2. Filter by column `favorite_color`; +// 3. Encode the column `name` in Avro format. +val output = df + .select(from_avro('value, jsonFormatSchema) as 'user) + .where("user.favorite_color == \"red\"") + .select(to_avro($"user.name") as 'value) + +val ds = output + .writeStream + .format("kafka") +
[GitHub] spark issue #22015: [SPARK-20286][SPARK-24786][Core][DynamicAllocation] Rele...
Github user dhruve commented on the issue: https://github.com/apache/spark/pull/22015 This modifies the SparkListenerUnpersistRDD case class, which fails the MiMa tests. If developers use this class heavily, I can either add another ListenerBus message or change the PR to make a direct RPC call to the ExecutorAllocationManager.
[GitHub] spark pull request #22015: [SPARK-20286] Release executors on unpersisting R...
GitHub user dhruve opened a pull request: https://github.com/apache/spark/pull/22015 [SPARK-20286] Release executors on unpersisting RDD

## What changes were proposed in this pull request?

Currently, executors acquired through dynamic allocation are not released when a cached RDD is unpersisted, which wastes cluster resources. With this change, once a cached RDD is unpersisted, we check whether each executor has any running tasks. If it has none, we do the following:

1 - If the executor still has cached RDD blocks from other RDDs, we don't change anything.
2 - If the executor has no more cached RDD blocks and no running tasks, we update its removal time based on the conf `spark.dynamicAllocation.cachedExecutorIdleTimeout` so the idle executor can be released.

## How was this patch tested?

Manually, using a code snippet.

```scala
val rdd = sc.textFile("smallFile")
rdd.cache
val rdd2 = sc.textFile("largeFile")
rdd2.cache
rdd2.count // Cached data on around 500+ executors
Thread.sleep(30000) // sleep for 30s
rdd.count // Cached data on around 20 executors
// Verify only 20 executors remain; the rest will time out based on idleTimeout, which I set to 60s
rdd2.unpersist
// Eventually all executors will be released, as there are no tasks running on any executor.
```

You can merge this pull request into a Git repository by running: $ git pull https://github.com/dhruve/spark bug/SPARK-20286 Alternatively you can review and apply these changes as the patch at: https://github.com/apache/spark/pull/22015.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #22015 commit 7e229dc120d0f1542cc8d7dbac1027baac36665e Author: Dhruve Ashar Date: 2018-08-06T20:32:47Z [SPARK-20286] Release executors on unpersisting RDD
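Here is a sketch of the per-executor decision described in the two numbered cases. It assumes the executor's running-task count and the IDs of the RDDs it caches are known. `UnpersistRelease`, `ExecutorState` and `idleTimeoutMs` are hypothetical. `idleTimeoutMs` stands for whichever idle timeout the allocation manager applies. These are not `ExecutorAllocationManager`'s actual fields.

```scala
// Hypothetical sketch of what happens to one executor after an RDD is unpersisted.
object UnpersistRelease {
  final case class ExecutorState(runningTasks: Int, cachedRddIds: Set[Int])

  // Returns the executor's new removal time, or None to leave it unchanged.
  def newRemovalTime(
      state: ExecutorState,
      unpersistedRddId: Int,
      nowMs: Long,
      idleTimeoutMs: Long): Option[Long] = {
    val remaining = state.cachedRddIds - unpersistedRddId
    if (state.runningTasks > 0) None  // still busy: not idle, nothing to do
    else if (remaining.nonEmpty) None // case 1: still caches blocks of other RDDs
    else Some(nowMs + idleTimeoutMs)  // case 2: idle and holds no cached blocks
  }
}
```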
[GitHub] spark issue #19194: [SPARK-20589] Allow limiting task concurrency per stage
Github user dhruve commented on the issue: https://github.com/apache/spark/pull/19194 @squito I have made the requested changes. Can you take another look? Thanks.
[GitHub] spark issue #21601: [SPARK-24610] fix reading small files via wholeTextFiles
Github user dhruve commented on the issue: https://github.com/apache/spark/pull/21601 @attilapiros I will modify the test to add an assert that makes it easy to follow and validate what the test is trying to achieve. I didn't add test cases for the remaining scenarios. Those involve Hadoop configs rather than Spark ones, and they relate more to `CombineFileInputFormat` than to `WholeTextFileInputFormat`.
[GitHub] spark pull request #21601: [SPARK-24610] fix reading small files via wholeTe...
Github user dhruve commented on a diff in the pull request: https://github.com/apache/spark/pull/21601#discussion_r199597945 --- Diff: core/src/main/scala/org/apache/spark/input/WholeTextFileInputFormat.scala --- @@ -53,6 +53,19 @@ private[spark] class WholeTextFileInputFormat val totalLen = files.map(file => if (file.isDirectory) 0L else file.getLen).sum val maxSplitSize = Math.ceil(totalLen * 1.0 / (if (minPartitions == 0) 1 else minPartitions)).toLong + +// For small files we need to ensure the min split size per node & rack <= maxSplitSize +val config = context.getConfiguration +val minSplitSizePerNode = config.getLong(CombineFileInputFormat.SPLIT_MINSIZE_PERNODE, 0L) +val minSplitSizePerRack = config.getLong(CombineFileInputFormat.SPLIT_MINSIZE_PERRACK, 0L) + +if (maxSplitSize < minSplitSizePerNode) { + super.setMinSplitSizeNode(maxSplitSize) --- End diff -- AFAIU, if we set these to `0L` unconditionally, any leftover data that wasn't combined into a split would end up in a split of its own, because minSplitSizePerNode is `0L`. This isn't a problem for a small number of files. With a large number of small files in the same situation, however, we would get more splits instead of combining them into fewer.
[GitHub] spark pull request #21601: [SPARK-24610] fix reading small files via wholeTe...
Github user dhruve commented on a diff in the pull request: https://github.com/apache/spark/pull/21601#discussion_r199602993 --- Diff: core/src/main/scala/org/apache/spark/input/WholeTextFileInputFormat.scala --- @@ -53,6 +53,19 @@ private[spark] class WholeTextFileInputFormat val totalLen = files.map(file => if (file.isDirectory) 0L else file.getLen).sum val maxSplitSize = Math.ceil(totalLen * 1.0 / (if (minPartitions == 0) 1 else minPartitions)).toLong + +// For small files we need to ensure the min split size per node & rack <= maxSplitSize +val config = context.getConfiguration +val minSplitSizePerNode = config.getLong(CombineFileInputFormat.SPLIT_MINSIZE_PERNODE, 0L) +val minSplitSizePerRack = config.getLong(CombineFileInputFormat.SPLIT_MINSIZE_PERRACK, 0L) + +if (maxSplitSize < minSplitSizePerNode) { + super.setMinSplitSizeNode(maxSplitSize) --- End diff -- Also, when a user specifies these via configs, this change ensures they don't break the code. Setting them to `0L` in that case would still fail, because `CombineFileInputFormat` checks whether the setting is `0L` and, if it is, falls back to the value from the config: https://github.com/apache/hadoop/blob/release-2.8.2-RC0/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/lib/input/CombineFileInputFormat.java#L182 So we would at least have to set the value explicitly to avoid hitting the error.
[GitHub] spark issue #21601: [SPARK-24610] fix reading small files via wholeTextFiles
Github user dhruve commented on the issue: https://github.com/apache/spark/pull/21601 @vanzin Can you review this PR?
[GitHub] spark pull request #21601: [SPARK-24610] fix reading small files via wholeTe...
GitHub user dhruve opened a pull request: https://github.com/apache/spark/pull/21601 [SPARK-24610] fix reading small files via wholeTextFiles

## What changes were proposed in this pull request?

`WholeTextFileInputFormat` determines the `maxSplitSize` for the file(s) read by the `wholeTextFiles` method. This works well for large files. For small files, though, the maxSplitSize can be smaller than the minimum split sizes set in configs like hive-site.xml or passed explicitly as `mapreduce.input.fileinputformat.split.minsize.per.node` or `mapreduce.input.fileinputformat.split.minsize.per.rack`, and in that case it throws an exception:

```java
java.io.IOException: Minimum split size pernode 123456 cannot be larger than maximum split size 9962
  at org.apache.hadoop.mapreduce.lib.input.CombineFileInputFormat.getSplits(CombineFileInputFormat.java:200)
  at org.apache.spark.rdd.WholeTextFileRDD.getPartitions(WholeTextFileRDD.scala:50)
  at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:252)
  at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:250)
  at scala.Option.getOrElse(Option.scala:121)
  at org.apache.spark.rdd.RDD.partitions(RDD.scala:250)
  at org.apache.spark.rdd.MapPartitionsRDD.getPartitions(MapPartitionsRDD.scala:35)
  at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:252)
  at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:250)
  at scala.Option.getOrElse(Option.scala:121)
  at org.apache.spark.rdd.RDD.partitions(RDD.scala:250)
  at org.apache.spark.SparkContext.runJob(SparkContext.scala:2096)
  at org.apache.spark.rdd.RDD.count(RDD.scala:1158)
  ... 48 elided
```

This change compares maxSplitSize against minSplitSizePerNode and minSplitSizePerRack, and sets each of them to maxSplitSize when `maxSplitSize < minSplitSizePerNode/Rack`.

## How was this patch tested?

Tested manually by setting the conf when launching the job, and added a unit test.
You can merge this pull request into a Git repository by running: $ git pull https://github.com/dhruve/spark bug/SPARK-24610 Alternatively you can review and apply these changes as the patch at: https://github.com/apache/spark/pull/21601.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #21601 commit 2369e3acee730b7d4e45175870de0ecac601069b Author: Dhruve Ashar Date: 2018-06-20T16:34:36Z [SPARK-24610] fix reading small files via wholeTextFiles
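Here is a dependency-free sketch of the check described in the PR. It assumes the file lengths are already known. It reproduces the `maxSplitSize` formula from the diff and caps each configured minimum at that value. `SplitSizes` is a hypothetical name, and no Hadoop classes are used.

```scala
// Hypothetical sketch: compute maxSplitSize as in the diff, then cap the
// per-node and per-rack minimum split sizes so neither exceeds it.
object SplitSizes {
  final case class Sizes(maxSplit: Long, minPerNode: Long, minPerRack: Long)

  def compute(fileLengths: Seq[Long], minPartitions: Int,
              minPerNode: Long, minPerRack: Long): Sizes = {
    val totalLen = fileLengths.sum
    val maxSplit =
      math.ceil(totalLen * 1.0 / (if (minPartitions == 0) 1 else minPartitions)).toLong
    // Lowering a minimum only when it exceeds maxSplit is equivalent to min().
    Sizes(maxSplit, math.min(minPerNode, maxSplit), math.min(minPerRack, maxSplit))
  }
}
```

Take the values from the stack trace above: a per-node minimum of 123456 and a maximum split size of 9962. The sketch lowers the per-node minimum to 9962, so it no longer exceeds the maximum split size.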
[GitHub] spark issue #19194: [SPARK-20589] Allow limiting task concurrency per stage
Github user dhruve commented on the issue: https://github.com/apache/spark/pull/19194 Hi @squito. Sorry for the late response. I want to get back to this. As soon as I get a chance, I will work on it and update the PR.
[GitHub] spark pull request #19194: [SPARK-20589] Allow limiting task concurrency per...
Github user dhruve commented on a diff in the pull request: https://github.com/apache/spark/pull/19194#discussion_r141482741 --- Diff: core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala --- @@ -512,6 +535,9 @@ private[spark] class TaskSetManager( serializedTask) } } else { + if (runningTasks >= maxConcurrentTasks) { +logDebug("Already running max. no. of concurrent tasks.") --- End diff -- I'll make this change and also update the comments to explain the behavior so far. Also, the earlier reply didn't make clear to me how we settled accounting for the activeJobId. Do you still have any input?
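Here is a sketch of the gate in the quoted diff, assuming a single-threaded caller. `ConcurrencyGate` is hypothetical. In the PR, the check lives inside `TaskSetManager` and compares `runningTasks` with `maxConcurrentTasks`.

```scala
// Hypothetical sketch: a task set only launches a new task while fewer than
// maxConcurrentTasks of its tasks are running. Not thread-safe.
final class ConcurrencyGate(maxConcurrentTasks: Int) {
  require(maxConcurrentTasks > 0, "maxConcurrentTasks must be > 0 for the job to progress")
  private var runningTasks = 0

  // Returns true and counts the task if a slot is free; false otherwise.
  def tryLaunch(): Boolean =
    if (runningTasks >= maxConcurrentTasks) false
    else { runningTasks += 1; true }

  // Frees a slot when a task finishes.
  def onTaskFinished(): Unit = if (runningTasks > 0) runningTasks -= 1

  def running: Int = runningTasks
}
```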
[GitHub] spark issue #19194: [SPARK-20589] Allow limiting task concurrency per stage
Github user dhruve commented on the issue: https://github.com/apache/spark/pull/19194 Configuring this at the stage level seems to be the more appropriate and deterministic choice. If we agree on changing the API, we can start a separate effort in that direction. Until then, we can mark this feature as experimental or leave the config undocumented.
[GitHub] spark issue #19194: [SPARK-20589] Allow limiting task concurrency per stage
Github user dhruve commented on the issue: https://github.com/apache/spark/pull/19194 @squito Thanks for pointing that out. What you mentioned makes sense, and I dug further into `DAGScheduler` and `activeJobForStage` to gather more context. We could take the active job's properties into account when the stage is submitted, but the behavior would be nondeterministic. Say we have two jobs from two different job groups with different task concurrency thresholds. Whichever job is submitted first wins, because the stage won't be recomputed for the second job, and nothing controls which job gets submitted first (unless the user explicitly serializes them). The problem gets worse as the gap between the two thresholds grows: picking the wrong value can completely take down your remote service. For deterministic behavior, I believe the best approach is to handle this in the stage properties, as originally asked. Since that involves an API change with a potentially much broader scope, I didn't go that route. If there are more fundamental use cases that need something like this at the stage level, we should pursue that direction, provided the community is open to an API change.
[GitHub] spark pull request #19194: [SPARK-20589] Allow limiting task concurrency per...
Github user dhruve commented on a diff in the pull request: https://github.com/apache/spark/pull/19194#discussion_r140361162

--- Diff: core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala ---
@@ -619,6 +625,47 @@ private[spark] class ExecutorAllocationManager(
     // place the executors.
     private val stageIdToExecutorPlacementHints = new mutable.HashMap[Int, (Int, Map[String, Int])]
+    override def onJobStart(jobStart: SparkListenerJobStart): Unit = {
+      jobStart.stageInfos.foreach(stageInfo => stageIdToJobId(stageInfo.stageId) = jobStart.jobId)
+
+      var jobGroupId = if (jobStart.properties != null) {
+        jobStart.properties.getProperty(SparkContext.SPARK_JOB_GROUP_ID)
+      } else {
+        null
+      }
+
+      val maxConTasks = if (jobGroupId != null &&
+        conf.contains(s"spark.job.$jobGroupId.maxConcurrentTasks")) {
+        conf.get(s"spark.job.$jobGroupId.maxConcurrentTasks").toInt
+      } else {
+        Int.MaxValue
+      }
+
+      if (maxConTasks <= 0) {
+        throw new IllegalArgumentException(
+          "Maximum Concurrent Tasks should be set greater than 0 for the job to progress.")
+      }
+
+      if (jobGroupId == null || !conf.contains(s"spark.job.$jobGroupId.maxConcurrentTasks")) {
+        jobGroupId = DEFAULT_JOB_GROUP
+      }
+
+      jobIdToJobGroup(jobStart.jobId) = jobGroupId
+      if (!jobGroupToMaxConTasks.contains(jobGroupId)) {
--- End diff --

@squito Thanks for elaborating on this. I believe the last paragraph of your explanation summarizes your viewpoint. In a way that makes sense: if you want to increase your max concurrent tasks, you know what you are doing, so if you see weird behavior with other threads that you created, that's fine. However, when you don't control thread creation, I feel it's best to set the value just once to avoid the weirdness. It's much easier to use a different job group than to explain one more weird behavior.
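A minimal, non-Spark sketch of the per-job bookkeeping in the quoted `onJobStart` diff, assuming a plain `Map[String, String]` in place of `SparkConf` and an `Option` in place of the `null` job group. The class name `JobGroupLimits` and the string value of its default group are hypothetical.

```scala
import scala.collection.mutable

// Hypothetical, simplified model of the onJobStart bookkeeping quoted above.
// `conf` stands in for SparkConf; DefaultGroup stands in for the diff's
// DEFAULT_JOB_GROUP, and its string value here is made up.
class JobGroupLimits(conf: Map[String, String]) {
  val DefaultGroup = "__default__"
  val jobIdToJobGroup = mutable.HashMap.empty[Int, String]
  val jobGroupToMaxConTasks = mutable.HashMap.empty[String, Int]

  private def key(group: String): String = s"spark.job.$group.maxConcurrentTasks"

  def onJobStart(jobId: Int, jobGroupId: Option[String]): Unit = {
    // A group keeps its own identity only if a limit is configured for it.
    val configured = jobGroupId.filter(g => conf.contains(key(g)))
    val maxConTasks = configured.map(g => conf(key(g)).toInt).getOrElse(Int.MaxValue)
    if (maxConTasks <= 0) {
      throw new IllegalArgumentException(
        "Maximum Concurrent Tasks should be set greater than 0 for the job to progress.")
    }
    val group = configured.getOrElse(DefaultGroup)
    jobIdToJobGroup(jobId) = group
    // As in the diff, a group's limit is recorded only the first time it is seen.
    if (!jobGroupToMaxConTasks.contains(group)) {
      jobGroupToMaxConTasks(group) = maxConTasks
    }
  }
}

val tracker = new JobGroupLimits(Map("spark.job.group1.maxConcurrentTasks" -> "2"))
tracker.onJobStart(0, Some("group1"))
tracker.onJobStart(1, Some("unconfiguredGroup"))
tracker.onJobStart(2, None)
println(tracker.jobGroupToMaxConTasks("group1"))            // 2
println(tracker.jobIdToJobGroup(1) == tracker.DefaultGroup) // true
```

Jobs in groups without a configured limit, and jobs with no group at all, are folded into one unlimited default group, so only explicitly configured groups are throttled.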
[GitHub] spark pull request #19194: [SPARK-20589] Allow limiting task concurrency per...
Github user dhruve commented on a diff in the pull request: https://github.com/apache/spark/pull/19194#discussion_r140321640

--- Diff: core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala ---
@@ -619,6 +625,47 @@ private[spark] class ExecutorAllocationManager(
     // place the executors.
     private val stageIdToExecutorPlacementHints = new mutable.HashMap[Int, (Int, Map[String, Int])]
+    override def onJobStart(jobStart: SparkListenerJobStart): Unit = {
+      jobStart.stageInfos.foreach(stageInfo => stageIdToJobId(stageInfo.stageId) = jobStart.jobId)
+
+      var jobGroupId = if (jobStart.properties != null) {
+        jobStart.properties.getProperty(SparkContext.SPARK_JOB_GROUP_ID)
+      } else {
+        null
+      }
+
+      val maxConTasks = if (jobGroupId != null &&
+        conf.contains(s"spark.job.$jobGroupId.maxConcurrentTasks")) {
+        conf.get(s"spark.job.$jobGroupId.maxConcurrentTasks").toInt
+      } else {
+        Int.MaxValue
+      }
+
+      if (maxConTasks <= 0) {
+        throw new IllegalArgumentException(
+          "Maximum Concurrent Tasks should be set greater than 0 for the job to progress.")
+      }
+
+      if (jobGroupId == null || !conf.contains(s"spark.job.$jobGroupId.maxConcurrentTasks")) {
+        jobGroupId = DEFAULT_JOB_GROUP
+      }
+
+      jobIdToJobGroup(jobStart.jobId) = jobGroupId
+      if (!jobGroupToMaxConTasks.contains(jobGroupId)) {
--- End diff --

Okay. We did consider job-server style deployments; however, I am not able to follow the example/scenario you mentioned. When you say that different sets of users are assigned different groups, they have to be served by different threads, because the job group can only be set on a per-thread basis. While it is possible and valid for different threads to belong to the same job group, a single thread cannot have multiple job groups active simultaneously. So in this scenario, if you change maxConcTasks for a given job group in one thread, the change would be visible in the other thread as well, because both threads belong to the same job group.

In this case, it seems the job groups weren't configured correctly. I am still not able to follow how setting the value every single time would help here.
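The per-thread argument can be modeled with a plain `ThreadLocal`: a thread has exactly one current job group, and limits are looked up by group name, so a change to one group's limit is seen by every thread in that group. This is a hypothetical model (`ThreadJobGroups` is not a Spark class), although Spark's `setJobGroup` does keep the group in thread-local properties.

```scala
import java.util.concurrent.ConcurrentHashMap

// Hypothetical model (not Spark code): each thread carries exactly one current
// job group in a ThreadLocal, while limits are keyed by group name, so all
// threads in the same group share a single limit.
object ThreadJobGroups {
  private val currentGroup = new ThreadLocal[String]
  val groupLimits = new ConcurrentHashMap[String, Int]()

  def setJobGroup(group: String): Unit = currentGroup.set(group)

  // Limit visible to the calling thread; unlimited if its group has none.
  def currentLimit: Int = groupLimits.getOrDefault(currentGroup.get, Int.MaxValue)

  // Starts a fresh thread that joins `group` and reports the limit it sees.
  def limitSeenFromNewThread(group: String): Int = {
    var seen = -1
    val worker = new Thread(() => {
      setJobGroup(group)
      seen = currentLimit
    })
    worker.start()
    worker.join() // join() makes the worker's write to `seen` visible here
    seen
  }
}

ThreadJobGroups.groupLimits.put("group1", 4)
println(ThreadJobGroups.limitSeenFromNewThread("group1")) // 4
// Changing the group's limit affects every thread that uses "group1".
ThreadJobGroups.groupLimits.put("group1", 8)
println(ThreadJobGroups.limitSeenFromNewThread("group1")) // 8
```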
[GitHub] spark pull request #19194: [SPARK-20589] Allow limiting task concurrency per...
Github user dhruve commented on a diff in the pull request: https://github.com/apache/spark/pull/19194#discussion_r140293577

--- Diff: core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala ---
@@ -619,6 +625,47 @@ private[spark] class ExecutorAllocationManager(
     // place the executors.
     private val stageIdToExecutorPlacementHints = new mutable.HashMap[Int, (Int, Map[String, Int])]
+    override def onJobStart(jobStart: SparkListenerJobStart): Unit = {
+      jobStart.stageInfos.foreach(stageInfo => stageIdToJobId(stageInfo.stageId) = jobStart.jobId)
+
+      var jobGroupId = if (jobStart.properties != null) {
+        jobStart.properties.getProperty(SparkContext.SPARK_JOB_GROUP_ID)
+      } else {
+        null
+      }
+
+      val maxConTasks = if (jobGroupId != null &&
+        conf.contains(s"spark.job.$jobGroupId.maxConcurrentTasks")) {
+        conf.get(s"spark.job.$jobGroupId.maxConcurrentTasks").toInt
+      } else {
+        Int.MaxValue
+      }
+
+      if (maxConTasks <= 0) {
+        throw new IllegalArgumentException(
+          "Maximum Concurrent Tasks should be set greater than 0 for the job to progress.")
+      }
+
+      if (jobGroupId == null || !conf.contains(s"spark.job.$jobGroupId.maxConcurrentTasks")) {
+        jobGroupId = DEFAULT_JOB_GROUP
+      }
+
+      jobIdToJobGroup(jobStart.jobId) = jobGroupId
+      if (!jobGroupToMaxConTasks.contains(jobGroupId)) {
--- End diff --

I understand your reasoning about setting maxConTasks every time a job starts. However, I am not able to understand the scenario you described. If a job completes and a new one kicks off immediately, how does the new job partially overlap? A job is marked complete only after all of its stages and their underlying tasks have finished, so a new job won't overlap with a completed one. Am I missing something here?
[GitHub] spark pull request #19194: [SPARK-20589] Allow limiting task concurrency per...
Github user dhruve commented on a diff in the pull request: https://github.com/apache/spark/pull/19194#discussion_r140122744 --- Diff: core/src/test/scala/org/apache/spark/scheduler/TaskSetManagerSuite.scala --- @@ -1255,6 +1255,97 @@ class TaskSetManagerSuite extends SparkFunSuite with LocalSparkContext with Logg assert(taskOption4.get.addedJars === addedJarsMidTaskSet) } + test("limit max concurrent running tasks in a job group when configured ") { +val conf = new SparkConf(). + set(config.BLACKLIST_ENABLED, true). + set("spark.job.testJobGroup.maxConcurrentTasks", "2") // Limit max concurrent tasks to 2 + +sc = new SparkContext("local", "test", conf) +sched = new FakeTaskScheduler(sc, ("exec1", "host1"), ("exec2", "host2")) +val props = new Properties(); +props.setProperty(SparkContext.SPARK_JOB_GROUP_ID, "testJobGroup") // set the job group + +val tasks = Array.tabulate[Task[_]](10) { i => + new FakeTask(0, i, Nil) +} +val tsm = new TaskSetManager(sched, new TaskSet(tasks, 0, 0, 0, props), 2) + +// make some offers to our taskset +var taskDescs = Seq( + "exec1" -> "host1", + "exec2" -> "host1" +).flatMap { case (exec, host) => + // offer each executor twice (simulating 2 cores per executor) + (0 until 2).flatMap{ _ => tsm.resourceOffer(exec, host, TaskLocality.ANY)} +} +assert(taskDescs.size === 2) // Out of the 4 offers, tsm can accept up to maxConcurrentTasks. 
+ +// make 4 more offers +val taskDescs2 = Seq( + "exec1" -> "host1", + "exec2" -> "host1" +).flatMap { case (exec, host) => + (0 until 2).flatMap{ _ => tsm.resourceOffer(exec, host, TaskLocality.ANY)} +} +assert(taskDescs2.size === 0) // tsm doesn't accept any as it is already running at max tasks + +// inform tsm that one task has completed +val directTaskResult = createTaskResult(0) +tsm.handleSuccessfulTask(taskDescs(0).taskId, directTaskResult) + +// make 4 more offers after previous task completed +taskDescs = Seq( + "exec1" -> "host1", + "exec2" -> "host1" +).flatMap { case (exec, host) => + (0 until 2).flatMap{ _ => tsm.resourceOffer(exec, host, TaskLocality.ANY)} +} +assert(taskDescs.size === 1) // tsm accepts one as it can run one more task + } + + test("do not limit max concurrent running tasks in a job group by default") { --- End diff -- The previous test covers all the necessary checks introduced by the change. I added this to cover the default scenario when no job group is specified. Can do away with this. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
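The offer/accept sequence this test checks (4 offers yield 2 launches, then 0, then 1 after a task finishes) can be reproduced with a small stand-in. `ConcurrencyGate` is a hypothetical class, not the real `TaskSetManager`; it ignores locality, executors and blacklisting and only models the running-task cap.

```scala
// Hypothetical stand-in (not the real TaskSetManager): an offer launches a task
// only while fewer than maxConcurrentTasks tasks are running and tasks remain.
class ConcurrencyGate(numTasks: Int, maxConcurrentTasks: Int) {
  private var nextIndex = 0
  private var running = 0

  /** Returns Some(taskIndex) if this offer launches a task, None otherwise. */
  def resourceOffer(): Option[Int] =
    if (running < maxConcurrentTasks && nextIndex < numTasks) {
      running += 1
      nextIndex += 1
      Some(nextIndex - 1)
    } else {
      None
    }

  def handleSuccessfulTask(): Unit = running -= 1
}

object ConcurrencyGateDemo {
  // Two executors offered twice each: 4 offers per round, as in the test above.
  def offerRound(gate: ConcurrencyGate): Seq[Int] =
    (0 until 4).flatMap(_ => gate.resourceOffer())
}

val gate = new ConcurrencyGate(numTasks = 10, maxConcurrentTasks = 2)
println(ConcurrencyGateDemo.offerRound(gate).size) // 2
println(ConcurrencyGateDemo.offerRound(gate).size) // 0
gate.handleSuccessfulTask()
println(ConcurrencyGateDemo.offerRound(gate).size) // 1
```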
[GitHub] spark pull request #19194: [SPARK-20589] Allow limiting task concurrency per...
Github user dhruve commented on a diff in the pull request: https://github.com/apache/spark/pull/19194#discussion_r140124886

--- Diff: core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala ---
@@ -619,6 +625,47 @@ private[spark] class ExecutorAllocationManager(
     // place the executors.
     private val stageIdToExecutorPlacementHints = new mutable.HashMap[Int, (Int, Map[String, Int])]
+    override def onJobStart(jobStart: SparkListenerJobStart): Unit = {
+      jobStart.stageInfos.foreach(stageInfo => stageIdToJobId(stageInfo.stageId) = jobStart.jobId)
+
+      var jobGroupId = if (jobStart.properties != null) {
+        jobStart.properties.getProperty(SparkContext.SPARK_JOB_GROUP_ID)
+      } else {
+        null
+      }
+
+      val maxConTasks = if (jobGroupId != null &&
+        conf.contains(s"spark.job.$jobGroupId.maxConcurrentTasks")) {
+        conf.get(s"spark.job.$jobGroupId.maxConcurrentTasks").toInt
+      } else {
+        Int.MaxValue
+      }
+
+      if (maxConTasks <= 0) {
+        throw new IllegalArgumentException(
+          "Maximum Concurrent Tasks should be set greater than 0 for the job to progress.")
+      }
+
+      if (jobGroupId == null || !conf.contains(s"spark.job.$jobGroupId.maxConcurrentTasks")) {
+        jobGroupId = DEFAULT_JOB_GROUP
+      }
+
+      jobIdToJobGroup(jobStart.jobId) = jobGroupId
+      if (!jobGroupToMaxConTasks.contains(jobGroupId)) {
--- End diff --

From my understanding, and in the context of the current code, you would group jobs together when you want their concurrency to be limited. If you want different concurrency limits for different jobs, you would put them in different job groups altogether. If multiple jobs in the same job group run concurrently and one of them sets a different value, which one wins for the existing jobs and for the new job? If we want a different value for every job, the user would need a way to identify a Spark job in their application code, probably by a job ID. Only by identifying a job would the user be able to set the config for that job.

This cannot be known a priori, and I don't know of an easy way for the user to find out which underlying Spark job corresponds to an action. Hence we apply the setting at the job group level, which lets the user control concurrency easily without knowing Spark-specific details of the underlying jobs. Let me know if anything is unclear here or if you have more questions.
[GitHub] spark pull request #19194: [SPARK-20589] Allow limiting task concurrency per...
Github user dhruve commented on a diff in the pull request: https://github.com/apache/spark/pull/19194#discussion_r140122769 --- Diff: core/src/test/scala/org/apache/spark/scheduler/TaskSetManagerSuite.scala --- @@ -1255,6 +1255,97 @@ class TaskSetManagerSuite extends SparkFunSuite with LocalSparkContext with Logg assert(taskOption4.get.addedJars === addedJarsMidTaskSet) } + test("limit max concurrent running tasks in a job group when configured ") { +val conf = new SparkConf(). + set(config.BLACKLIST_ENABLED, true). + set("spark.job.testJobGroup.maxConcurrentTasks", "2") // Limit max concurrent tasks to 2 + +sc = new SparkContext("local", "test", conf) +sched = new FakeTaskScheduler(sc, ("exec1", "host1"), ("exec2", "host2")) +val props = new Properties(); +props.setProperty(SparkContext.SPARK_JOB_GROUP_ID, "testJobGroup") // set the job group + +val tasks = Array.tabulate[Task[_]](10) { i => + new FakeTask(0, i, Nil) +} +val tsm = new TaskSetManager(sched, new TaskSet(tasks, 0, 0, 0, props), 2) + +// make some offers to our taskset +var taskDescs = Seq( + "exec1" -> "host1", + "exec2" -> "host1" +).flatMap { case (exec, host) => + // offer each executor twice (simulating 2 cores per executor) + (0 until 2).flatMap{ _ => tsm.resourceOffer(exec, host, TaskLocality.ANY)} +} +assert(taskDescs.size === 2) // Out of the 4 offers, tsm can accept up to maxConcurrentTasks. 
+ +// make 4 more offers +val taskDescs2 = Seq( + "exec1" -> "host1", + "exec2" -> "host1" +).flatMap { case (exec, host) => + (0 until 2).flatMap{ _ => tsm.resourceOffer(exec, host, TaskLocality.ANY)} +} +assert(taskDescs2.size === 0) // tsm doesn't accept any as it is already running at max tasks + +// inform tsm that one task has completed +val directTaskResult = createTaskResult(0) +tsm.handleSuccessfulTask(taskDescs(0).taskId, directTaskResult) + +// make 4 more offers after previous task completed +taskDescs = Seq( + "exec1" -> "host1", + "exec2" -> "host1" +).flatMap { case (exec, host) => + (0 until 2).flatMap{ _ => tsm.resourceOffer(exec, host, TaskLocality.ANY)} +} +assert(taskDescs.size === 1) // tsm accepts one as it can run one more task + } + + test("do not limit max concurrent running tasks in a job group by default") { +val conf = new SparkConf(). + set(config.BLACKLIST_ENABLED, true) + +sc = new SparkContext("local", "test", conf) +sched = new FakeTaskScheduler(sc, ("exec1", "host1"), ("exec2", "host2")) + +val tasks = Array.tabulate[Task[_]](10) { i => + new FakeTask(0, i, Nil) +} +val tsm = new TaskSetManager(sched, new TaskSet(tasks, 0, 0, 0, null), 2) + +// make 5 offers to our taskset +var taskDescs = Seq( + "exec1" -> "host1", + "exec2" -> "host2" +).flatMap { case (exec, host) => + // offer each executor twice (simulating 3 cores per executor) --- End diff -- okay. Won't be needed as will be removing the test case. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #19194: [SPARK-20589] Allow limiting task concurrency per...
Github user dhruve commented on a diff in the pull request: https://github.com/apache/spark/pull/19194#discussion_r140123047 --- Diff: core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala --- @@ -758,11 +825,52 @@ private[spark] class ExecutorAllocationManager( allocationManager.synchronized { stageIdToNumSpeculativeTasks(stageId) = stageIdToNumSpeculativeTasks.getOrElse(stageId, 0) + 1 +maxConcurrentTasks = getMaxConTasks +logDebug(s"Setting max concurrent tasks to $maxConcurrentTasks on spec. task submitted.") allocationManager.onSchedulerBacklogged() } } /** + * Calculate the maximum no. of concurrent tasks that can run currently. + */ +def getMaxConTasks(): Int = { + // We can limit the no. of concurrent tasks by a job group. A job group can have multiple jobs + // with multiple stages. We need to get all the active stages belonging to a job group to + // calculate the total no. of pending + running tasks to decide the maximum no. of executors + // we need at that time to serve the outstanding tasks. This is capped by the minimum no. of + // outstanding tasks and the max concurrent limit specified for the job group if any. + + def getIncompleteTasksForStage(stageId: Int, numTasks: Int): Int = { +totalPendingTasks(stageId) + totalRunningTasks(stageId) + } + + def sumIncompleteTasksForStages: (Int, (Int, Int)) => Int = (totalTasks, stageToNumTasks) => { +val activeTasks = getIncompleteTasksForStage(stageToNumTasks._1, stageToNumTasks._2) +sumOrMax(totalTasks, activeTasks) + } + // Get the total running & pending tasks for all stages in a job group. 
+ def getIncompleteTasksForJobGroup(stagesItr: mutable.HashMap[Int, Int]): Int = { +stagesItr.foldLeft(0)(sumIncompleteTasksForStages) + } + + def sumIncompleteTasksForJobGroup: (Int, (String, mutable.HashMap[Int, Int])) => Int = { +(maxConTasks, x) => { + val totalIncompleteTasksForJobGroup = getIncompleteTasksForJobGroup(x._2) + val maxTasks = Math.min(jobGroupToMaxConTasks(x._1), totalIncompleteTasksForJobGroup) + sumOrMax(maxConTasks, maxTasks) +} + } + + def sumOrMax(a: Int, b: Int): Int = if (doesSumOverflow(a, b)) Int.MaxValue else (a + b) + + def doesSumOverflow(a: Int, b: Int): Boolean = b > (Int.MaxValue - a) + + val stagesByJobGroup = stageIdToNumTasks.groupBy(x => jobIdToJobGroup(stageIdToJobId(x._1))) --- End diff -- I like the idea. I think this can be done. Will update the PR. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
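The aggregation in the quoted `getMaxConTasks` can be restated as a standalone function, assuming the per-stage pending + running counts are already known: cap each job group's outstanding tasks at the group's limit, then add the groups together with the same saturating `sumOrMax` the diff uses. `MaxConTasks.compute` and its parameters are hypothetical.

```scala
// Hypothetical restatement of getMaxConTasks: per job group, cap outstanding
// (pending + running) tasks at the group's limit, then sum across groups,
// saturating at Int.MaxValue instead of overflowing.
object MaxConTasks {
  // Assumes a and b are non-negative, as task counts are.
  def sumOrMax(a: Int, b: Int): Int = if (b > Int.MaxValue - a) Int.MaxValue else a + b

  /**
   * @param outstandingByStage stageId -> pending + running tasks
   * @param groupOfStage       stageId -> job group
   * @param groupLimit         job group -> max concurrent tasks
   */
  def compute(
      outstandingByStage: Map[Int, Int],
      groupOfStage: Map[Int, String],
      groupLimit: Map[String, Int]): Int =
    outstandingByStage
      .groupBy { case (stageId, _) => groupOfStage(stageId) }
      .foldLeft(0) { case (total, (group, stages)) =>
        val outstanding = stages.values.foldLeft(0)(sumOrMax)
        sumOrMax(total, math.min(groupLimit(group), outstanding))
      }
}

val outstandingTasks = Map(0 -> 10, 1 -> 10, 3 -> 50)
val stageGroups = Map(0 -> "group1", 1 -> "group1", 3 -> "default")
val limitsByGroup = Map("group1" -> 2, "default" -> Int.MaxValue)
println(MaxConTasks.compute(outstandingTasks, stageGroups, limitsByGroup)) // 52
```

Here `group1` contributes min(2, 20) = 2 and the unlimited default group contributes its 50 outstanding tasks.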
[GitHub] spark issue #19194: [SPARK-20589] Allow limiting task concurrency per stage
Github user dhruve commented on the issue: https://github.com/apache/spark/pull/19194 @tgraves I have addressed the comments and tried to cover the possible cases for job groups and speculation in the existing tests. Kindly let me know if we need to add or address more use cases.
[GitHub] spark pull request #19194: [SPARK-20589] Allow limiting task concurrency per...
Github user dhruve commented on a diff in the pull request: https://github.com/apache/spark/pull/19194#discussion_r139163830 --- Diff: core/src/test/scala/org/apache/spark/scheduler/TaskSetManagerSuite.scala --- @@ -1255,6 +1255,97 @@ class TaskSetManagerSuite extends SparkFunSuite with LocalSparkContext with Logg assert(taskOption4.get.addedJars === addedJarsMidTaskSet) } + test("limit max concurrent running tasks in a job group when configured ") { +val conf = new SparkConf(). + set(config.BLACKLIST_ENABLED, true). + set("spark.job.testJobGroup.maxConcurrentTasks", "2") // Limit max concurrent tasks to 2 + +sc = new SparkContext("local", "test", conf) +sched = new FakeTaskScheduler(sc, ("exec1", "host1"), ("exec2", "host2")) +val props = new Properties(); +props.setProperty(SparkContext.SPARK_JOB_GROUP_ID, "testJobGroup") // set the job group + +val tasks = Array.tabulate[Task[_]](10) { i => + new FakeTask(0, i, Nil) +} +val tsm = new TaskSetManager(sched, new TaskSet(tasks, 0, 0, 0, props), 2) + +// make some offers to our taskset +var taskDescs = Seq( + "exec1" -> "host1", + "exec2" -> "host1" +).flatMap { case (exec, host) => + // offer each executor twice (simulating 2 cores per executor) + (0 until 2).flatMap{ _ => tsm.resourceOffer(exec, host, TaskLocality.ANY)} +} +assert(taskDescs.size === 2) // Out of the 4 offers, tsm can accept up to maxConcurrentTasks. 
+ +// make 4 more offers +val taskDescs2 = Seq( + "exec1" -> "host1", + "exec2" -> "host1" +).flatMap { case (exec, host) => + (0 until 2).flatMap{ _ => tsm.resourceOffer(exec, host, TaskLocality.ANY)} +} +assert(taskDescs2.size === 0) // tsm doesn't accept any as it is already running at max tasks + +// inform tsm that one task has completed +val directTaskResult = createTaskResult(0) +tsm.handleSuccessfulTask(taskDescs(0).taskId, directTaskResult) + +// make 4 more offers after previous task completed +taskDescs = Seq( + "exec1" -> "host1", + "exec2" -> "host1" +).flatMap { case (exec, host) => + (0 until 2).flatMap{ _ => tsm.resourceOffer(exec, host, TaskLocality.ANY)} +} +assert(taskDescs.size === 1) // tsm accepts one as it can run one more task + } + + test("do not limit max concurrent running tasks in a job group by default") { +val conf = new SparkConf(). + set(config.BLACKLIST_ENABLED, true) + +sc = new SparkContext("local", "test", conf) +sched = new FakeTaskScheduler(sc, ("exec1", "host1"), ("exec2", "host2")) + +val tasks = Array.tabulate[Task[_]](10) { i => + new FakeTask(0, i, Nil) +} +val tsm = new TaskSetManager(sched, new TaskSet(tasks, 0, 0, 0, null), 2) + +// make 5 offers to our taskset +var taskDescs = Seq( + "exec1" -> "host1", + "exec2" -> "host2" +).flatMap { case (exec, host) => + // offer each executor twice (simulating 2 cores per executor) --- End diff -- yes. It should be 2. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #19194: [SPARK-20589] Allow limiting task concurrency per...
Github user dhruve commented on a diff in the pull request: https://github.com/apache/spark/pull/19194#discussion_r139163463 --- Diff: core/src/test/scala/org/apache/spark/ExecutorAllocationManagerSuite.scala --- @@ -210,22 +216,282 @@ class ExecutorAllocationManagerSuite assert(numExecutorsToAdd(manager) === 1) // Verify that running a task doesn't affect the target -sc.listenerBus.postToAll(SparkListenerTaskStart(1, 0, createTaskInfo(0, 0, "executor-1"))) +sc.listenerBus.postToAll(SparkListenerTaskStart(0, 0, createTaskInfo(0, 0, "executor-1"))) assert(numExecutorsTarget(manager) === 5) assert(addExecutors(manager) === 0) assert(numExecutorsToAdd(manager) === 1) // Verify that running a speculative task doesn't affect the target -sc.listenerBus.postToAll(SparkListenerTaskStart(1, 0, createTaskInfo(0, 0, "executor-2", true))) +sc.listenerBus.postToAll(SparkListenerTaskStart(0, 0, createTaskInfo(0, 0, "executor-2", true))) assert(numExecutorsTarget(manager) === 5) assert(addExecutors(manager) === 0) assert(numExecutorsToAdd(manager) === 1) } + test("add executors capped by max concurrent tasks for a job group with single core executors") { +val conf = new SparkConf() + .setMaster("myDummyLocalExternalClusterManager") + .setAppName("test-executor-allocation-manager") + .set("spark.dynamicAllocation.enabled", "true") + .set("spark.dynamicAllocation.testing", "true") + .set("spark.job.group1.maxConcurrentTasks", "2") + .set("spark.job.group2.maxConcurrentTasks", "5") +val sc = new SparkContext(conf) +contexts += sc +sc.setJobGroup("group1", "", false) + +val manager = sc.executorAllocationManager.get +val stages = Seq(createStageInfo(0, 10), createStageInfo(1, 10)) +// Submit the job and stage start/submit events +sc.listenerBus.postToAll(SparkListenerJobStart(0, 0, stages, sc.getLocalProperties)) +sc.listenerBus.postToAll(SparkListenerStageSubmitted(stages(0))) + +// Verify that we're capped at number of max concurrent tasks in the stage 
+assert(maxNumExecutorsNeeded(manager) === 2) + +// Submit another stage in the same job +sc.listenerBus.postToAll(SparkListenerStageSubmitted(stages(1))) +assert(maxNumExecutorsNeeded(manager) === 2) + +sc.listenerBus.postToAll(SparkListenerStageCompleted(stages(0))) +sc.listenerBus.postToAll(SparkListenerStageCompleted(stages(1))) +sc.listenerBus.postToAll(SparkListenerJobEnd(0, 10, JobSucceeded)) + +// Submit a new job in the same job group +val stage2 = createStageInfo(2, 20) +sc.listenerBus.postToAll(SparkListenerJobStart(1, 0, Seq(stage2), sc.getLocalProperties)) +sc.listenerBus.postToAll(SparkListenerStageSubmitted(stage2)) +assert(maxNumExecutorsNeeded(manager) === 2) + +sc.listenerBus.postToAll(SparkListenerStageCompleted(stage2)) +sc.listenerBus.postToAll(SparkListenerJobEnd(1, 10, JobSucceeded)) + +// Set another jobGroup +sc.setJobGroup("group2", "", false) + +val stage3 = createStageInfo(3, 20) +sc.listenerBus.postToAll(SparkListenerJobStart(2, 0, Seq(stage3), sc.getLocalProperties)) +sc.listenerBus.postToAll(SparkListenerStageSubmitted(stage3)) +assert(maxNumExecutorsNeeded(manager) === 5) + +sc.listenerBus.postToAll(SparkListenerStageCompleted(stage3)) +sc.listenerBus.postToAll(SparkListenerJobEnd(2, 10, JobSucceeded)) + +// Clear jobGroup +sc.clearJobGroup() + +val stage4 = createStageInfo(4, 50) +sc.listenerBus.postToAll(SparkListenerJobStart(2, 0, Seq(stage4), sc.getLocalProperties)) +sc.listenerBus.postToAll(SparkListenerStageSubmitted(stage4)) +assert(maxNumExecutorsNeeded(manager) === 50) + } + + test("add executors capped by max concurrent tasks for a job group with multi cores executors") { +val conf = new SparkConf() + .setMaster("myDummyLocalExternalClusterManager") + .setAppName("test-executor-allocation-manager") + .set("spark.dynamicAllocation.enabled", "true") + .set("spark.dynamicAllocation.testing", "true") + .set("spark.job.group1.maxConcurrentTasks", "2") + .set("spark.job.group2.maxConcurrentTasks", "5") + 
.set("spark.executor.cores", "3") +val sc = new SparkContext(conf)
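Turning the task cap into an executor count is a ceiling division by the number of tasks each executor can run. The sketch below assumes one CPU per task and that the allocation manager rounds up; `ExecutorsNeeded` is a hypothetical helper. The single-core results match the 2, 5 and 50 asserted in the first test; the quoted multi-core test is cut off, so the three-core figures are only this arithmetic, not values from the PR.

```scala
// Hypothetical helper: executors needed to run `tasks` tasks at once, assuming
// each executor runs executorCores / cpusPerTask tasks and counts round up.
// Uses quotient + remainder check instead of (tasks + n - 1) / n so that
// tasks = Int.MaxValue cannot overflow.
object ExecutorsNeeded {
  def apply(tasks: Int, executorCores: Int, cpusPerTask: Int = 1): Int = {
    val tasksPerExecutor = executorCores / cpusPerTask
    require(tasksPerExecutor > 0, "each executor must fit at least one task")
    tasks / tasksPerExecutor + (if (tasks % tasksPerExecutor == 0) 0 else 1)
  }
}

// Single-core executors: one executor per concurrently running task.
println(Seq(2, 5, 50).map(t => ExecutorsNeeded(t, executorCores = 1))) // List(2, 5, 50)
// Three cores per executor (spark.executor.cores = 3).
println(Seq(2, 5, 50).map(t => ExecutorsNeeded(t, executorCores = 3))) // List(1, 2, 17)
```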
[GitHub] spark pull request #19194: [SPARK-20589] Allow limiting task concurrency per...
Github user dhruve commented on a diff in the pull request: https://github.com/apache/spark/pull/19194#discussion_r139163529 --- Diff: core/src/test/scala/org/apache/spark/ExecutorAllocationManagerSuite.scala --- @@ -210,22 +216,282 @@ class ExecutorAllocationManagerSuite assert(numExecutorsToAdd(manager) === 1) // Verify that running a task doesn't affect the target -sc.listenerBus.postToAll(SparkListenerTaskStart(1, 0, createTaskInfo(0, 0, "executor-1"))) +sc.listenerBus.postToAll(SparkListenerTaskStart(0, 0, createTaskInfo(0, 0, "executor-1"))) assert(numExecutorsTarget(manager) === 5) assert(addExecutors(manager) === 0) assert(numExecutorsToAdd(manager) === 1) // Verify that running a speculative task doesn't affect the target -sc.listenerBus.postToAll(SparkListenerTaskStart(1, 0, createTaskInfo(0, 0, "executor-2", true))) +sc.listenerBus.postToAll(SparkListenerTaskStart(0, 0, createTaskInfo(0, 0, "executor-2", true))) assert(numExecutorsTarget(manager) === 5) assert(addExecutors(manager) === 0) assert(numExecutorsToAdd(manager) === 1) } + test("add executors capped by max concurrent tasks for a job group with single core executors") { +val conf = new SparkConf() + .setMaster("myDummyLocalExternalClusterManager") + .setAppName("test-executor-allocation-manager") + .set("spark.dynamicAllocation.enabled", "true") + .set("spark.dynamicAllocation.testing", "true") + .set("spark.job.group1.maxConcurrentTasks", "2") + .set("spark.job.group2.maxConcurrentTasks", "5") +val sc = new SparkContext(conf) +contexts += sc +sc.setJobGroup("group1", "", false) + +val manager = sc.executorAllocationManager.get +val stages = Seq(createStageInfo(0, 10), createStageInfo(1, 10)) +// Submit the job and stage start/submit events +sc.listenerBus.postToAll(SparkListenerJobStart(0, 0, stages, sc.getLocalProperties)) +sc.listenerBus.postToAll(SparkListenerStageSubmitted(stages(0))) + +// Verify that we're capped at number of max concurrent tasks in the stage 
+assert(maxNumExecutorsNeeded(manager) === 2) + +// Submit another stage in the same job +sc.listenerBus.postToAll(SparkListenerStageSubmitted(stages(1))) +assert(maxNumExecutorsNeeded(manager) === 2) + +sc.listenerBus.postToAll(SparkListenerStageCompleted(stages(0))) +sc.listenerBus.postToAll(SparkListenerStageCompleted(stages(1))) +sc.listenerBus.postToAll(SparkListenerJobEnd(0, 10, JobSucceeded)) + +// Submit a new job in the same job group +val stage2 = createStageInfo(2, 20) +sc.listenerBus.postToAll(SparkListenerJobStart(1, 0, Seq(stage2), sc.getLocalProperties)) +sc.listenerBus.postToAll(SparkListenerStageSubmitted(stage2)) +assert(maxNumExecutorsNeeded(manager) === 2) + +sc.listenerBus.postToAll(SparkListenerStageCompleted(stage2)) +sc.listenerBus.postToAll(SparkListenerJobEnd(1, 10, JobSucceeded)) + +// Set another jobGroup +sc.setJobGroup("group2", "", false) + +val stage3 = createStageInfo(3, 20) +sc.listenerBus.postToAll(SparkListenerJobStart(2, 0, Seq(stage3), sc.getLocalProperties)) +sc.listenerBus.postToAll(SparkListenerStageSubmitted(stage3)) +assert(maxNumExecutorsNeeded(manager) === 5) + +sc.listenerBus.postToAll(SparkListenerStageCompleted(stage3)) +sc.listenerBus.postToAll(SparkListenerJobEnd(2, 10, JobSucceeded)) + +// Clear jobGroup +sc.clearJobGroup() + +val stage4 = createStageInfo(4, 50) +sc.listenerBus.postToAll(SparkListenerJobStart(2, 0, Seq(stage4), sc.getLocalProperties)) +sc.listenerBus.postToAll(SparkListenerStageSubmitted(stage4)) +assert(maxNumExecutorsNeeded(manager) === 50) + } + + test("add executors capped by max concurrent tasks for a job group with multi cores executors") { +val conf = new SparkConf() + .setMaster("myDummyLocalExternalClusterManager") + .setAppName("test-executor-allocation-manager") + .set("spark.dynamicAllocation.enabled", "true") + .set("spark.dynamicAllocation.testing", "true") + .set("spark.job.group1.maxConcurrentTasks", "2") + .set("spark.job.group2.maxConcurrentTasks", "5") + 
.set("spark.executor.cores", "3") +val sc = new SparkContext(conf)
[GitHub] spark pull request #19194: [SPARK-20589] Allow limiting task concurrency per...
Github user dhruve commented on a diff in the pull request: https://github.com/apache/spark/pull/19194#discussion_r139163109

--- Diff: core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala ---
@@ -758,11 +812,58 @@ private[spark] class ExecutorAllocationManager(
       allocationManager.synchronized {
         stageIdToNumSpeculativeTasks(stageId) =
           stageIdToNumSpeculativeTasks.getOrElse(stageId, 0) + 1
+        maxConcurrentTasks = getMaxConTasks
+        logDebug(s"Setting max concurrent tasks to $maxConcurrentTasks on spec. task submitted.")
         allocationManager.onSchedulerBacklogged()
       }
     }
     /**
+     * Calculate the maximum no. of concurrent tasks that can run currently.
+     */
+    def getMaxConTasks(): Int = {
+      // We can limit the no. of concurrent tasks by a job group. A job group can have multiple jobs
+      // with multiple stages. We need to get all the active stages belonging to a job group to
+      // calculate the total no. of pending + running tasks to decide the maximum no. of executors
+      // we need at that time to serve the outstanding tasks. This is capped by the minimum of no.
+      // of outstanding tasks and the max concurrent limit specified for the job group if any.
+
+      def getIncompleteTasksForStage(stageId: Int, numTasks: Int): Int = {
+        var runningTasks = 0
+        if (stageIdToTaskIndices.contains(stageId)) {
+          runningTasks =
+            stageIdToTaskIndices(stageId).size - stageIdToCompleteTaskCount.getOrElse(stageId, 0)
--- End diff --

Yes, nice catch. We do need to account for all the tasks of a stage, and that should include speculative ones as well.
[GitHub] spark pull request #19194: [SPARK-20589] Allow limiting task concurrency per...
Github user dhruve commented on a diff in the pull request: https://github.com/apache/spark/pull/19194#discussion_r139162871

--- Diff: core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala ---
@@ -758,11 +812,58 @@ private[spark] class ExecutorAllocationManager(
       allocationManager.synchronized {
         stageIdToNumSpeculativeTasks(stageId) =
           stageIdToNumSpeculativeTasks.getOrElse(stageId, 0) + 1
+        maxConcurrentTasks = getMaxConTasks
+        logDebug(s"Setting max concurrent tasks to $maxConcurrentTasks on spec. task submitted.")
         allocationManager.onSchedulerBacklogged()
       }
     }
     /**
+     * Calculate the maximum no. of concurrent tasks that can run currently.
+     */
+    def getMaxConTasks(): Int = {
+      // We can limit the no. of concurrent tasks by a job group. A job group can have multiple jobs
+      // with multiple stages. We need to get all the active stages belonging to a job group to
+      // calculate the total no. of pending + running tasks to decide the maximum no. of executors
+      // we need at that time to serve the outstanding tasks. This is capped by the minimum of no.
--- End diff --

Okay.
[GitHub] spark issue #19194: [SPARK-20589] Allow limiting task concurrency per stage
Github user dhruve commented on the issue: https://github.com/apache/spark/pull/19194 Rebased this PR with current master and have squashed the earlier commits.
[GitHub] spark pull request #19194: [SPARK-20589] Allow limiting task concurrency per...
GitHub user dhruve opened a pull request: https://github.com/apache/spark/pull/19194

[SPARK-20589] Allow limiting task concurrency per stage

## What changes were proposed in this pull request?

This change lets the user specify the maximum number of tasks running in a given job group. (See the comments on the JIRA for more context on why this is implemented at the job-group level rather than the stage level.)

This is useful when the user wants to avoid a DoS on an external service accessed from multiple executors, without having to repartition or coalesce existing RDDs.

This change introduces a new user-level configuration, `spark.job.[userJobGroup].maxConcurrentTasks`. It sets the maximum number of tasks of that job group that can be active at any given point in time. To use the feature, set the appropriate job group and pass the conf:

```scala
conf.set("spark.job.group1.maxConcurrentTasks", "10")
...
sc.setJobGroup("group1", "", false)
sc.parallelize(1 to 10, 10).map(x => x + 1).count
sc.clearJobGroup()
```

When both the job group and the max-concurrency config are set, this limits the number of tasks that can run simultaneously in that job group and its subsequent jobs and stages. In turn, it also limits the number of executors to be acquired.

## How was this patch tested?

Ran unit tests and multiple manual tests with various combinations of:
- single/multiple/no job groups
- executors with single/multi cores
- dynamic allocation on/off

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/dhruve/spark impr/SPARK-20589

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/spark/pull/19194.patch

To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message:

    This closes #19194

commit 4281151df9010b4e9fe91e588c07e872b8e0dd69
Author: Dhruve Ashar <dhruveas...@gmail.com>
Date: 2017-09-11T16:45:49Z

    [SPARK-20589] Allow limiting task concurrency per stage
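The description above caps how many tasks of a job group can run at once. Below is a minimal sketch of that admission rule alone. It assumes a scheduler that asks a gate before launching each task and releases the slot when the task ends. `JobGroupTaskGate`, `tryLaunch` and `taskEnded` are hypothetical names, not Spark APIs. The patch itself modifies `ExecutorAllocationManager` and `TaskSetManager` rather than adding a standalone class like this.

```scala
import scala.collection.mutable

// Hypothetical gate keyed by job-group name; not part of Spark's scheduler API.
final class JobGroupTaskGate(limits: Map[String, Int]) {
  // Running-task count per job group; groups not seen so far read as 0.
  private val running = mutable.Map.empty[String, Int].withDefaultValue(0)

  // Groups without a configured limit are unbounded, mirroring the Int.MaxValue default.
  private def limitFor(group: String): Int = limits.getOrElse(group, Int.MaxValue)

  /** Claim a slot for one task of `group`; returns false if the group is at its limit. */
  def tryLaunch(group: String): Boolean = synchronized {
    if (running(group) < limitFor(group)) {
      running(group) += 1
      true
    } else {
      false
    }
  }

  /** Release the slot when a task of `group` ends, whether it succeeded or failed. */
  def taskEnded(group: String): Unit = synchronized {
    running(group) = math.max(0, running(group) - 1)
  }

  def runningTasks(group: String): Int = synchronized { running(group) }
}
```

With `Map("group1" -> 2)`, a third `tryLaunch("group1")` is refused until one of the first two tasks ends. Tasks outside any limited group are never held back.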
[GitHub] spark pull request #19157: [SPARK-20589][Core][Scheduler] Allow limiting tas...
Github user dhruve closed the pull request at: https://github.com/apache/spark/pull/19157
[GitHub] spark issue #19157: [SPARK-20589][Core][Scheduler] Allow limiting task concu...
Github user dhruve commented on the issue: https://github.com/apache/spark/pull/19157 @HyukjinKwon Thanks for pointing this out. I will rebase and push again. The message from AppVeyor wasn't very clear, so I didn't notice.
[GitHub] spark issue #19157: [SPARK-20589][Core][Scheduler] Allow limiting task concu...
Github user dhruve commented on the issue: https://github.com/apache/spark/pull/19157 Reopened this because CI was having issues with the previous PR, [18950](https://github.com/apache/spark/pull/18950).
[GitHub] spark issue #19157: [SPARK-20589][Core][Scheduler] Allow limiting task concu...
Github user dhruve commented on the issue: https://github.com/apache/spark/pull/19157 @squito @markhamstra @tgravescs Can you review this PR? Thanks.
[GitHub] spark pull request #19157: [SPARK-20589][Core][Scheduler] Allow limiting tas...
GitHub user dhruve opened a pull request: https://github.com/apache/spark/pull/19157

[SPARK-20589][Core][Scheduler] Allow limiting task concurrency per job group

## What changes were proposed in this pull request?

This change lets the user specify the maximum number of tasks running in a given job group. (See the comments on the JIRA for more context on why this is implemented at the job-group level rather than the stage level.)

This is useful when the user wants to avoid a DoS on an external service accessed from multiple executors, without having to repartition or coalesce existing RDDs.

This change introduces a new user-level configuration, `spark.job.[userJobGroup].maxConcurrentTasks`. It sets the maximum number of tasks of that job group that can be active at any given point in time. To use the feature, set the appropriate job group and pass the conf:

```scala
conf.set("spark.job.group1.maxConcurrentTasks", "10")
...
sc.setJobGroup("group1", "", false)
sc.parallelize(1 to 10, 10).map(x => x + 1).count
sc.clearJobGroup()
```

When both the job group and the max-concurrency config are set, this limits the number of tasks that can run simultaneously in that job group and its subsequent jobs and stages. In turn, it also limits the number of executors to be acquired.

## How was this patch tested?

Ran unit tests and multiple manual tests with various combinations of:
- single/multiple/no job groups
- executors with single/multi cores
- dynamic allocation on/off

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/dhruve/spark impr/SPARK-20589

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/spark/pull/19157.patch

To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message:

    This closes #19157

commit 824396c82977171c38ab5d7f6c0f84bc19eccaba
Author: Dhruve Ashar <dhruveas...@gmail.com>
Date: 2017-08-15T14:18:21Z

    [SPARK-20589] Allow limiting task concurrency per stage

commit d3f8162dab4ca7065d7f296fd03528ce6ddfb923
Author: Dhruve Ashar <dhruveas...@gmail.com>
Date: 2017-08-15T14:45:18Z

    Merge branch 'master' of github.com:apache/spark into impr/SPARK-20589

commit 824621286ffb107010409c4d0d3442550628247d
Author: Dhruve Ashar <dhruveas...@gmail.com>
Date: 2017-08-21T16:51:41Z

    Allow limiting task concurrency per stage in concurrent job groups

commit 517acb490ae5938a22c4175347f6bbc24b47781f
Author: Dhruve Ashar <dhruveas...@gmail.com>
Date: 2017-08-21T19:30:17Z

    Remove comment

commit 65941f7884551e84a13a6cc2e7488a01e7d8beec
Author: Dhruve Ashar <dhruveas...@gmail.com>
Date: 2017-08-21T19:42:05Z

    Fix comment style

commit 7aba73a31808f6b1017b85dfd4dd19e28365bd97
Author: Dhruve Ashar <dhruveas...@gmail.com>
Date: 2017-08-22T14:54:10Z

    Merge branch 'master' of github.com:apache/spark into impr/SPARK-20589

commit 0e518f00ce97fd5d17fe89792c2503d2514b0473
Author: Dhruve Ashar <dhruveas...@gmail.com>
Date: 2017-08-22T15:38:01Z

    Fix new unit test and add comments

commit 8b3830004d69bd5f109fd9846f59583c23a910c7
Author: Dhruve Ashar <dhruveas...@gmail.com>
Date: 2017-09-05T20:14:02Z

    Resolve merge conflict and add test for speculative task
[GitHub] spark issue #18950: [SPARK-20589][Core][Scheduler] Allow limiting task concu...
Github user dhruve commented on the issue: https://github.com/apache/spark/pull/18950 CI is having issues downloading my repo. Closing this PR and opening a new one.
[GitHub] spark pull request #18950: [SPARK-20589][Core][Scheduler] Allow limiting tas...
Github user dhruve closed the pull request at: https://github.com/apache/spark/pull/18950
[GitHub] spark pull request #18950: [SPARK-20589][Core][Scheduler] Allow limiting tas...
Github user dhruve commented on a diff in the pull request: https://github.com/apache/spark/pull/18950#discussion_r134608269

--- Diff: core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala ---
```diff
@@ -598,13 +600,58 @@ private[spark] class ExecutorAllocationManager(
     private val executorIdToTaskIds = new mutable.HashMap[String, mutable.HashSet[Long]]
     // Number of tasks currently running on the cluster. Should be 0 when no stages are active.
     private var numRunningTasks: Int = _
+    private val jobGroupToMaxConTasks = new mutable.HashMap[String, Int]
+    private val jobIdToJobGroup = new mutable.HashMap[Int, String]
+    private val stageIdToJobId = new mutable.HashMap[Int, Int]
+    private val stageIdToCompleteTaskCount = new mutable.HashMap[Int, Int]
     // stageId to tuple (the number of task with locality preferences, a map where each pair is a
     // node and the number of tasks that would like to be scheduled on that node) map,
     // maintain the executor placement hints for each stage Id used by resource framework to better
     // place the executors.
     private val stageIdToExecutorPlacementHints = new mutable.HashMap[Int, (Int, Map[String, Int])]
+    override def onJobStart(jobStart: SparkListenerJobStart): Unit = {
+      jobStart.stageInfos.foreach(stageInfo => stageIdToJobId(stageInfo.stageId) = jobStart.jobId)
+
+      var jobGroupId = if (jobStart.properties != null) {
+        jobStart.properties.getProperty(SparkContext.SPARK_JOB_GROUP_ID)
+      } else {
+        null
+      }
+
+      val maxConTasks = if (jobGroupId != null &&
+        conf.contains(s"spark.job.$jobGroupId.maxConcurrentTasks")) {
+        conf.get(s"spark.job.$jobGroupId.maxConcurrentTasks").toInt
+      } else {
+        Int.MaxValue
+      }
+
+      if (maxConTasks <= 0) {
+        throw new IllegalArgumentException(
+          "Maximum Concurrent Tasks should be set greater than 0 for the job to progress.")
+      }
+
+      if (jobGroupId == null || !conf.contains(s"spark.job.$jobGroupId.maxConcurrentTasks")) {
+        jobGroupId = "default-group-" + jobStart.jobId.hashCode
```
--- End diff --

Okay, I will name the default job group `__default_job_group`. Let me know if you would prefer a different name.
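Below is a minimal sketch of the per-job resolution step in the `onJobStart` diff above. It assumes the default group is renamed to `__default_job_group`, as proposed in the reply. A plain `Map[String, String]` stands in for `SparkConf`, and `groupId` stands in for the `SPARK_JOB_GROUP_ID` property (null when unset). `JobGroupLimits.resolve` is a hypothetical helper, not part of the patch. It keeps the diff's validation: a configured limit of 0 or less is rejected.

```scala
// Hypothetical helper: a plain Map stands in for SparkConf, and `groupId` for the
// job's SPARK_JOB_GROUP_ID local property (null when no group is set).
object JobGroupLimits {
  // Default-group name as proposed in the review reply; treat it as provisional.
  val DefaultJobGroup = "__default_job_group"

  /** Returns (effective job group, max concurrent tasks) for a starting job. */
  def resolve(conf: Map[String, String], groupId: String): (String, Int) =
    Option(groupId).flatMap(g => conf.get(s"spark.job.$g.maxConcurrentTasks")) match {
      case Some(raw) =>
        val limit = raw.trim.toInt
        if (limit <= 0) {
          throw new IllegalArgumentException(
            "Maximum Concurrent Tasks should be set greater than 0 for the job to progress.")
        }
        (groupId, limit)
      case None =>
        // No group, or a group without a configured limit: unbounded default group.
        (DefaultJobGroup, Int.MaxValue)
    }
}
```

In this sketch, a job whose group has no limit configured falls into the shared unbounded default group, just like a job with no group at all. That mirrors the last `if` in the diff.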
[GitHub] spark pull request #18950: [SPARK-20589][Core][Scheduler] Allow limiting tas...
Github user dhruve commented on a diff in the pull request: https://github.com/apache/spark/pull/18950#discussion_r134608326

--- Diff: core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala ---
```diff
@@ -727,6 +780,68 @@ private[spark] class ExecutorAllocationManager(
     }
     /**
+     * Calculate the maximum no. of concurrent tasks that can run currently.
+     */
+    def getMaxConTasks(): Int = {
+      // We can limit the no. of concurrent tasks by a job group and multiple jobs can run with
```
--- End diff --

Okay. I will update the comment to make it clearer.
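Here is one plausible way to size the limit when several job groups run concurrently. It assumes each active group tracks its outstanding (pending + running) tasks and its configured limit. Each group is capped separately, and the capped counts are then summed. `ConcurrentGroups` and `GroupState` are hypothetical names, and this is not necessarily how the updated `getMaxConTasks` computes it.

```scala
// Hypothetical per-group bookkeeping; names are illustrative only.
object ConcurrentGroups {
  /** Outstanding (pending + running) tasks of one active job group and its configured cap. */
  final case class GroupState(outstandingTasks: Int, maxConcurrentTasks: Int)

  /** Cap each active job group independently, then sum across groups. */
  def maxConcurrentTasks(activeGroups: Map[String, GroupState]): Int =
    activeGroups.values.iterator
      .map(g => math.min(g.outstandingTasks, g.maxConcurrentTasks))
      .sum
}
```

Taking the per-group minimum before summing prevents one group's limit from throttling another group that is running at the same time. It also prevents one group from using up another group's allowance.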
[GitHub] spark issue #18950: [SPARK-20589][Core][Scheduler] Allow limiting task concu...
Github user dhruve commented on the issue: https://github.com/apache/spark/pull/18950 @squito I will pull the test from the latest master and update it with the changes we made.
[GitHub] spark issue #18950: [SPARK-20589][Core][Scheduler] Allow limiting task concu...
Github user dhruve commented on the issue: https://github.com/apache/spark/pull/18950 @squito @markhamstra I have addressed the comments and made changes to account for different job groups running concurrently.
[GitHub] spark pull request #18950: [SPARK-20589][Core][Scheduler] Allow limiting tas...
GitHub user dhruve reopened a pull request: https://github.com/apache/spark/pull/18950

[SPARK-20589][Core][Scheduler] Allow limiting task concurrency per job group

## What changes were proposed in this pull request?

This change lets the user specify the maximum number of tasks running in a given job group. (See the comments on the JIRA for more context on why this is implemented at the job-group level rather than the stage level.)

This is useful when the user wants to avoid a DoS on an external service accessed from multiple executors, without having to repartition or coalesce existing RDDs.

This change introduces a new user-level configuration, `spark.job.[userJobGroup].maxConcurrentTasks`. It sets the maximum number of tasks of that job group that can be active at any given point in time. To use the feature, set the appropriate job group and pass the conf:

```scala
conf.set("spark.job.group1.maxConcurrentTasks", "10")
...
sc.setJobGroup("group1", "", false)
sc.parallelize(1 to 10, 10).map(x => x + 1).count
sc.clearJobGroup()
```

When both the job group and the max-concurrency config are set, this limits the number of tasks that can run simultaneously in that job group and its subsequent jobs and stages. In turn, it also limits the number of executors to be acquired.

## How was this patch tested?

Ran unit tests and multiple manual tests with various combinations of:
- single/multiple/no job groups
- executors with single/multi cores
- dynamic allocation on/off

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/dhruve/spark impr/SPARK-20589

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/spark/pull/18950.patch

To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message:

    This closes #18950

commit 824396c82977171c38ab5d7f6c0f84bc19eccaba
Author: Dhruve Ashar <dhruveas...@gmail.com>
Date: 2017-08-15T14:18:21Z

    [SPARK-20589] Allow limiting task concurrency per stage

commit d3f8162dab4ca7065d7f296fd03528ce6ddfb923
Author: Dhruve Ashar <dhruveas...@gmail.com>
Date: 2017-08-15T14:45:18Z

    Merge branch 'master' of github.com:apache/spark into impr/SPARK-20589

commit 824621286ffb107010409c4d0d3442550628247d
Author: Dhruve Ashar <dhruveas...@gmail.com>
Date: 2017-08-21T16:51:41Z

    Allow limiting task concurrency per stage in concurrent job groups

commit 517acb490ae5938a22c4175347f6bbc24b47781f
Author: Dhruve Ashar <dhruveas...@gmail.com>
Date: 2017-08-21T19:30:17Z

    Remove comment
[GitHub] spark pull request #18950: [SPARK-20589][Core][Scheduler] Allow limiting tas...
Github user dhruve closed the pull request at: https://github.com/apache/spark/pull/18950
[GitHub] spark pull request #18950: [SPARK-20589][Core][Scheduler] Allow limiting tas...
Github user dhruve commented on a diff in the pull request: https://github.com/apache/spark/pull/18950#discussion_r133547976

--- Diff: core/src/test/scala/org/apache/spark/ExecutorAllocationManagerSuite.scala ---
```diff
@@ -188,6 +188,125 @@ class ExecutorAllocationManagerSuite
     assert(numExecutorsTarget(manager) === 10)
   }
+  test("add executors capped by max concurrent tasks for a job group with single core executors") {
+    val conf = new SparkConf()
+      .setMaster("myDummyLocalExternalClusterManager")
+      .setAppName("test-executor-allocation-manager")
+      .set("spark.dynamicAllocation.enabled", "true")
+      .set("spark.dynamicAllocation.testing", "true")
+      .set("spark.job.group1.maxConcurrentTasks", "2")
+      .set("spark.job.group2.maxConcurrentTasks", "5")
+    val sc = new SparkContext(conf)
+    contexts += sc
+    sc.setJobGroup("group1", "", false)
+
+    val manager = sc.executorAllocationManager.get
+    val stage0 = createStageInfo(0, 10)
+    // Submit the job and stage start/submit events
+    sc.listenerBus.postToAll(SparkListenerJobStart(0, 0, Seq{stage0}, sc.getLocalProperties))
+    sc.listenerBus.postToAll(SparkListenerStageSubmitted(stage0))
+
+    // Verify that we're capped at number of max concurrent tasks in the stage
+    assert(maxNumExecutorsNeeded(manager) === 2)
+
+    // Submit another stage in the same job
+    val stage1 = createStageInfo(1, 10)
+    sc.listenerBus.postToAll(SparkListenerStageSubmitted(stage1))
+    assert(maxNumExecutorsNeeded(manager) === 2)
+
+    sc.listenerBus.postToAll(SparkListenerStageCompleted(stage0))
+    sc.listenerBus.postToAll(SparkListenerStageCompleted(stage1))
+    sc.listenerBus.postToAll(SparkListenerJobEnd(0, 10, JobSucceeded))
+
+    // Submit a new job in the same job group
+    val stage2 = createStageInfo(2, 20)
+    sc.listenerBus.postToAll(SparkListenerJobStart(1, 0, Seq{stage2}, sc.getLocalProperties))
+    sc.listenerBus.postToAll(SparkListenerStageSubmitted(stage2))
+    assert(maxNumExecutorsNeeded(manager) === 2)
+
+    sc.listenerBus.postToAll(SparkListenerStageCompleted(stage2))
+    sc.listenerBus.postToAll(SparkListenerJobEnd(1, 10, JobSucceeded))
+
+    // Set another jobGroup
+    sc.setJobGroup("group2", "", false)
+
+    val stage3 = createStageInfo(3, 20)
+    sc.listenerBus.postToAll(SparkListenerJobStart(2, 0, Seq{stage3}, sc.getLocalProperties))
+    sc.listenerBus.postToAll(SparkListenerStageSubmitted(stage3))
+    assert(maxNumExecutorsNeeded(manager) === 5)
+
+    sc.listenerBus.postToAll(SparkListenerStageCompleted(stage3))
+    sc.listenerBus.postToAll(SparkListenerJobEnd(2, 10, JobSucceeded))
+
+    // Clear jobGroup
+    sc.clearJobGroup()
+
+    val stage4 = createStageInfo(4, 50)
+    sc.listenerBus.postToAll(SparkListenerJobStart(2, 0, Seq{stage4}, sc.getLocalProperties))
+    sc.listenerBus.postToAll(SparkListenerStageSubmitted(stage4))
+    assert(maxNumExecutorsNeeded(manager) === 50)
+  }
+
+  test("add executors capped by max concurrent tasks for a job group with multi cores executors") {
+    val conf = new SparkConf()
+      .setMaster("myDummyLocalExternalClusterManager")
+      .setAppName("test-executor-allocation-manager")
+      .set("spark.dynamicAllocation.enabled", "true")
+      .set("spark.dynamicAllocation.testing", "true")
+      .set("spark.job.group1.maxConcurrentTasks", "2")
+      .set("spark.job.group2.maxConcurrentTasks", "5")
+      .set("spark.executor.cores", "3")
+    val sc = new SparkContext(conf)
+    contexts += sc
+    sc.setJobGroup("group1", "", false)
+
+    val manager = sc.executorAllocationManager.get
+    val stage0 = createStageInfo(0, 10)
+    // Submit the job and stage start/submit events
+    sc.listenerBus.postToAll(SparkListenerJobStart(0, 0, Seq{stage0}, sc.getLocalProperties))
+    sc.listenerBus.postToAll(SparkListenerStageSubmitted(stage0))
+
+    // Verify that we're capped at number of max concurrent tasks in the stage
+    assert(maxNumExecutorsNeeded(manager) === 1)
+
+    // Submit another stage in the same job
+    val stage1 = createStageInfo(1, 10)
+    sc.listenerBus.postToAll(SparkListenerStageSubmitted(stage1))
+    assert(maxNumExecutorsNeeded(manager) === 1)
+
+    sc.listenerBus.postToAll(SparkListenerStageCompleted(stage0))
+    sc.listenerB
```
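The expected values in these tests are consistent with a simple rule: the executor target is the capped task count divided by the number of tasks each executor can run, rounded up. For example, a limit of 2 with `spark.executor.cores` set to 3 needs a single executor. The sketch below assumes one task per core, with `tasksPerExecutor` taken as executor cores divided by CPUs per task. `ExecutorTarget` is a hypothetical helper, not the suite's `maxNumExecutorsNeeded`.

```scala
// Hypothetical helper; assumes tasksPerExecutor = executor cores / CPUs per task.
object ExecutorTarget {
  def executorsNeeded(outstandingTasks: Int, maxConcurrentTasks: Int, tasksPerExecutor: Int): Int = {
    require(tasksPerExecutor > 0, "tasksPerExecutor must be positive")
    // Only tasks allowed to run concurrently need executor slots.
    val runnable = math.max(0, math.min(outstandingTasks, maxConcurrentTasks))
    // Ceiling division without overflow: a partially used executor still has to be acquired.
    if (runnable == 0) 0 else (runnable - 1) / tasksPerExecutor + 1
  }
}
```

With single-core executors this reproduces the first test's 2, 5 and 50. With three cores per executor, a limit of 2 gives 1 and a limit of 5 would give 2.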
[GitHub] spark pull request #18950: [SPARK-20589][Core][Scheduler] Allow limiting tas...
Github user dhruve commented on a diff in the pull request: https://github.com/apache/spark/pull/18950#discussion_r133547780

--- Diff: core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala ---
```diff
@@ -454,64 +477,68 @@ private[spark] class TaskSetManager(
         }
       }
-      dequeueTask(execId, host, allowedLocality).map { case ((index, taskLocality, speculative)) =>
-        // Found a task; do some bookkeeping and return a task description
-        val task = tasks(index)
-        val taskId = sched.newTaskId()
-        // Do various bookkeeping
-        copiesRunning(index) += 1
-        val attemptNum = taskAttempts(index).size
-        val info = new TaskInfo(taskId, index, attemptNum, curTime,
-          execId, host, taskLocality, speculative)
-        taskInfos(taskId) = info
-        taskAttempts(index) = info :: taskAttempts(index)
-        // Update our locality level for delay scheduling
-        // NO_PREF will not affect the variables related to delay scheduling
-        if (maxLocality != TaskLocality.NO_PREF) {
-          currentLocalityIndex = getLocalityIndex(taskLocality)
-          lastLaunchTime = curTime
-        }
-        // Serialize and return the task
-        val serializedTask: ByteBuffer = try {
-          ser.serialize(task)
-        } catch {
-          // If the task cannot be serialized, then there's no point to re-attempt the task,
-          // as it will always fail. So just abort the whole task-set.
-          case NonFatal(e) =>
-            val msg = s"Failed to serialize task $taskId, not attempting to retry it."
-            logError(msg, e)
-            abort(s"$msg Exception during serialization: $e")
-            throw new TaskNotSerializableException(e)
-        }
-        if (serializedTask.limit > TaskSetManager.TASK_SIZE_TO_WARN_KB * 1024 &&
-          !emittedTaskSizeWarning) {
-          emittedTaskSizeWarning = true
-          logWarning(s"Stage ${task.stageId} contains a task of very large size " +
-            s"(${serializedTask.limit / 1024} KB). The maximum recommended task size is " +
-            s"${TaskSetManager.TASK_SIZE_TO_WARN_KB} KB.")
-        }
-        addRunningTask(taskId)
-
-        // We used to log the time it takes to serialize the task, but task size is already
-        // a good proxy to task serialization time.
-        // val timeTaken = clock.getTime() - startTime
-        val taskName = s"task ${info.id} in stage ${taskSet.id}"
-        logInfo(s"Starting $taskName (TID $taskId, $host, executor ${info.executorId}, " +
-          s"partition ${task.partitionId}, $taskLocality, ${serializedTask.limit} bytes)")
-
-        sched.dagScheduler.taskStarted(task, info)
-        new TaskDescription(
-          taskId,
-          attemptNum,
-          execId,
-          taskName,
-          index,
-          addedFiles,
-          addedJars,
-          task.localProperties,
-          serializedTask)
+      dequeueTask(execId, host, allowedLocality).map {
```
--- End diff --

Yes, will fix it.
[GitHub] spark pull request #18950: [SPARK-20589][Core][Scheduler] Allow limiting tas...
Github user dhruve commented on a diff in the pull request: https://github.com/apache/spark/pull/18950#discussion_r133547673

--- Diff: core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala ---
```diff
@@ -602,6 +604,21 @@ private[spark] class ExecutorAllocationManager(
     // place the executors.
     private val stageIdToExecutorPlacementHints = new mutable.HashMap[Int, (Int, Map[String, Int])]
+    override def onJobStart(jobStart: SparkListenerJobStart): Unit = {
+      val jobGroupId = if (jobStart.properties != null) {
+        jobStart.properties.getProperty(SparkContext.SPARK_JOB_GROUP_ID)
+      } else {
+        ""
+      }
+      val maxConcurrentTasks = conf.getInt(s"spark.job.$jobGroupId.maxConcurrentTasks",
+        Int.MaxValue)
+
+      logInfo(s"Setting maximum concurrent tasks for group: ${jobGroupId} to $maxConcurrentTasks")
+      allocationManager.synchronized {
+        allocationManager.maxConcurrentTasks = maxConcurrentTasks
```
--- End diff --

@markhamstra Thanks for the feedback. I missed that the properties are serialized with each job, so multiple job groups can be running simultaneously. I am working on changes to address your comments and will update the PR.
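The issue raised here is that a single `maxConcurrentTasks` field is overwritten whenever a job from another job group starts. Below is a minimal sketch of the alternative: each stage's limit is looked up through the job that owns it. `JobGroupTracker` and its methods are hypothetical. The `jobIdToJobGroup` and `stageIdToJobId` maps in a later revision of this patch follow a similar idea.

```scala
import scala.collection.mutable

// Hypothetical tracker; names and signatures are illustrative, not the patch's code.
final class JobGroupTracker {
  private val jobIdToGroup = mutable.HashMap.empty[Int, String]
  private val stageIdToJobId = mutable.HashMap.empty[Int, Int]
  private val groupToLimit = mutable.HashMap.empty[String, Int]

  def onJobStart(jobId: Int, group: String, limit: Int, stageIds: Seq[Int]): Unit = {
    jobIdToGroup(jobId) = group
    groupToLimit(group) = limit
    stageIds.foreach(stageId => stageIdToJobId(stageId) = jobId)
  }

  def onJobEnd(jobId: Int): Unit = {
    jobIdToGroup.remove(jobId)
    // Forget the finished job's stages so their lookups fall back to "unbounded".
    val finishedStages = stageIdToJobId.collect { case (stageId, j) if j == jobId => stageId }.toList
    stageIdToJobId --= finishedStages
  }

  /** Limit for a stage, resolved via its own job and group rather than one shared field. */
  def limitForStage(stageId: Int): Int =
    stageIdToJobId.get(stageId)
      .flatMap(jobIdToGroup.get)
      .flatMap(groupToLimit.get)
      .getOrElse(Int.MaxValue)
}
```

Starting a `group2` job here leaves the limit seen by `group1`'s stages unchanged, which is the property the single shared field loses.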
[GitHub] spark pull request #18950: [SPARK-20589][Core][Scheduler] Allow limiting tas...
Github user dhruve commented on a diff in the pull request: https://github.com/apache/spark/pull/18950#discussion_r133321125

--- Diff: core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala ---
```diff
@@ -602,6 +604,21 @@ private[spark] class ExecutorAllocationManager(
     // place the executors.
     private val stageIdToExecutorPlacementHints = new mutable.HashMap[Int, (Int, Map[String, Int])]
+    override def onJobStart(jobStart: SparkListenerJobStart): Unit = {
+      val jobGroupId = if (jobStart.properties != null) {
+        jobStart.properties.getProperty(SparkContext.SPARK_JOB_GROUP_ID)
+      } else {
+        ""
+      }
+      val maxConcurrentTasks = conf.getInt(s"spark.job.$jobGroupId.maxConcurrentTasks",
+        Int.MaxValue)
+
+      logInfo(s"Setting maximum concurrent tasks for group: ${jobGroupId} to $maxConcurrentTasks")
+      allocationManager.synchronized {
+        allocationManager.maxConcurrentTasks = maxConcurrentTasks
```
--- End diff --

Sorry if the config name caused confusion. The limit is per job group, not per job, so we could really name it `spark.jobGroup.[userJobGroup].maxConcurrentTasks`. Also, Spark allows only a single job group to be set at any given point in time with a single SparkContext.
[GitHub] spark issue #18950: [SPARK-20589][Core][Scheduler] Allow limiting task concu...
Github user dhruve commented on the issue: https://github.com/apache/spark/pull/18950 @kayousterhout @squito Can you review this PR? Thanks.
[GitHub] spark pull request #18950: [SPARK-20589][Core][Scheduler] Allow limiting tas...
GitHub user dhruve opened a pull request: https://github.com/apache/spark/pull/18950

[SPARK-20589][Core][Scheduler] Allow limiting task concurrency per job group

## What changes were proposed in this pull request?

This change lets the user specify the maximum number of tasks running in a given job group. (See the comments on the JIRA for more context on why this is implemented at the job-group level rather than the stage level.)

This is useful when the user wants to avoid a DoS on an external service accessed from multiple executors, without having to repartition or coalesce existing RDDs.

This change introduces a new user-level configuration, `spark.job.[userJobGroup].maxConcurrentTasks`. It sets the maximum number of tasks of that job group that can be active at any given point in time. To use the feature, set the appropriate job group and pass the conf:

```scala
conf.set("spark.job.group1.maxConcurrentTasks", "10")
...
sc.setJobGroup("group1", "", false)
sc.parallelize(1 to 10, 10).map(x => x + 1).count
sc.clearJobGroup()
```

When both the job group and the max-concurrency config are set, this limits the number of tasks that can run simultaneously in that job group and its subsequent jobs and stages. In turn, it also limits the number of executors to be acquired.

## How was this patch tested?

Ran unit tests and multiple manual tests with various combinations of:
- single/multiple/no job groups
- executors with single/multi cores
- dynamic allocation on/off

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/dhruve/spark impr/SPARK-20589

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/spark/pull/18950.patch

To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message:

    This closes #18950

commit 824396c82977171c38ab5d7f6c0f84bc19eccaba
Author: Dhruve Ashar <dhruveas...@gmail.com>
Date: 2017-08-15T14:18:21Z

    [SPARK-20589] Allow limiting task concurrency per stage

commit d3f8162dab4ca7065d7f296fd03528ce6ddfb923
Author: Dhruve Ashar <dhruveas...@gmail.com>
Date: 2017-08-15T14:45:18Z

    Merge branch 'master' of github.com:apache/spark into impr/SPARK-20589
[GitHub] spark pull request #18691: [SPARK-21243][Core] Limit no. of map outputs in a...
Github user dhruve closed the pull request at: https://github.com/apache/spark/pull/18691
[GitHub] spark issue #18691: [SPARK-21243][Core] Limit no. of map outputs in a shuffl...
Github user dhruve commented on the issue: https://github.com/apache/spark/pull/18691 Thanks @tgravescs. Closing the PR.
[GitHub] spark pull request #18487: [SPARK-21243][Core] Limit no. of map outputs in a...
Github user dhruve commented on a diff in the pull request: https://github.com/apache/spark/pull/18487#discussion_r128793623 --- Diff: core/src/main/scala/org/apache/spark/storage/ShuffleBlockFetcherIterator.scala --- @@ -375,6 +390,7 @@ final class ShuffleBlockFetcherIterator( result match { case r @ SuccessFetchResult(blockId, address, size, buf, isNetworkReqDone) => if (address != blockManager.blockManagerId) { +numBlocksInFlightPerAddress(address) = numBlocksInFlightPerAddress(address) - 1 --- End diff -- @cloud-fan I have filed a JIRA for this: https://issues.apache.org/jira/browse/SPARK-21500
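The decrement in this diff only makes sense alongside a matching increment when a request is sent to a remote address. A minimal sketch of that pairing, assuming addresses are plain `host:port` strings and using a hypothetical `InFlightTracker` class rather than Spark's `BlockManagerId`-keyed map:

```scala
import scala.collection.mutable

// Hypothetical sketch of per-address in-flight bookkeeping.
// Addresses are modelled as "host:port" strings (an assumption).
class InFlightTracker(localAddress: String) {
  private val numBlocksInFlightPerAddress =
    mutable.HashMap.empty[String, Int].withDefaultValue(0)

  /** Count all blocks of a request when it is sent to a remote address. */
  def onRequestSent(address: String, numBlocks: Int): Unit =
    if (address != localAddress) {
      numBlocksInFlightPerAddress(address) += numBlocks
    }

  /** Each successful fetch result carries one block, so decrement by one. */
  def onSuccessResult(address: String): Unit =
    if (address != localAddress) {
      numBlocksInFlightPerAddress(address) -= 1
    }

  def inFlight(address: String): Int = numBlocksInFlightPerAddress(address)
}
```

Local blocks are skipped on both sides, mirroring the `address != blockManager.blockManagerId` guard in the diff, so the counter only reflects load placed on remote servers.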
[GitHub] spark issue #18691: [SPARK-21243][Core] Limit no. of map outputs in a shuffl...
Github user dhruve commented on the issue: https://github.com/apache/spark/pull/18691 @cloud-fan @tgravescs I have resolved the merge conflicts for 2.2. They were only related to removing extra configs.
[GitHub] spark issue #18487: [SPARK-21243][Core] Limit no. of map outputs in a shuffl...
Github user dhruve commented on the issue: https://github.com/apache/spark/pull/18487 @tgravescs Thanks for merging this. I have created a PR for 2.2: https://github.com/apache/spark/pull/18691 While resolving a merge conflict, I had to remove a couple of newer config entries that had landed in master.
[GitHub] spark pull request #18691: [SPARK-21243][Core] Limit no. of map outputs in a...
GitHub user dhruve opened a pull request: https://github.com/apache/spark/pull/18691 [SPARK-21243][Core] Limit no. of map outputs in a shuffle fetch For configurations with external shuffle enabled, we have observed that if a very large number of blocks are being fetched from a remote host, it puts the NM under extra pressure and can crash it. This change introduces a configuration, `spark.reducer.maxBlocksInFlightPerAddress`, to limit the number of map outputs being fetched from a given remote address. The changes apply to both scenarios: when external shuffle is enabled and when it is disabled. Ran the job with the default configuration, which does not change the existing behavior, and with a few lower values: 10, 20, 50, 100. The job ran fine and there is no change in the output. (I will update the metrics related to the NM in some time.) Author: Dhruve Ashar <dhruveas...@gmail.com> Closes #18487 from dhruve/impr/SPARK-21243. You can merge this pull request into a Git repository by running: $ git pull https://github.com/dhruve/spark branch-2.2 Alternatively you can review and apply these changes as the patch at: https://github.com/apache/spark/pull/18691.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #18691 commit 036eb17d32c1b9b0f76ab9c0c4a10f6248728476 Author: Dhruve Ashar <dhruveas...@gmail.com> Date: 2017-07-19T20:53:28Z [SPARK-21243][Core] Limit no. of map outputs in a shuffle fetch For configurations with external shuffle enabled, we have observed that if a very large no. of blocks are being fetched from a remote host, it puts the NM under extra pressure and can crash it. This change introduces a configuration `spark.reducer.maxBlocksInFlightPerAddress` , to limit the no. of map outputs being fetched from a given remote address. The changes applied here are applicable for both the scenarios - when external shuffle is enabled as well as disabled. Ran the job with the default configuration which does not change the existing behavior and ran it with few configurations of lower values -10,20,50,100. The job ran fine and there is no change in the output. (I will update the metrics related to NM in some time.) Author: Dhruve Ashar <dhruveas...@gmail.com> Closes #18487 from dhruve/impr/SPARK-21243.
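The core check this description relies on can be sketched as a pure function. This is a simplification: the name echoes `isRemoteAddressMaxedOut` from the later diff in this PR, but the signature below (plain counts instead of Spark's address and request types) is hypothetical.

```scala
// Hypothetical, simplified per-address throttle check.
object PerAddressLimit {
  /**
   * True when sending `requestBlocks` more blocks to an address that already has
   * `blocksInFlight` outstanding would exceed `maxBlocksInFlightPerAddress`.
   */
  def isRemoteAddressMaxedOut(
      blocksInFlight: Int,
      requestBlocks: Int,
      maxBlocksInFlightPerAddress: Int): Boolean =
    // Widen to Long so the sum cannot overflow when the limit is Int.MaxValue.
    blocksInFlight.toLong + requestBlocks > maxBlocksInFlightPerAddress
}
```

Assuming the default limit is `Int.MaxValue`, the check never fires for realistic counts, which is consistent with the statement that the default configuration leaves existing behavior unchanged. A request that is deferred by this check would be retried once earlier fetches to the same address complete.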
[GitHub] spark pull request #18487: [SPARK-21243][Core] Limit no. of map outputs in a...
Github user dhruve commented on a diff in the pull request: https://github.com/apache/spark/pull/18487#discussion_r128572125 --- Diff: core/src/main/scala/org/apache/spark/storage/ShuffleBlockFetcherIterator.scala --- @@ -375,6 +390,7 @@ final class ShuffleBlockFetcherIterator( result match { case r @ SuccessFetchResult(blockId, address, size, buf, isNetworkReqDone) => if (address != blockManager.blockManagerId) { +numBlocksInFlightPerAddress(address) = numBlocksInFlightPerAddress(address) - 1 --- End diff -- That is a good point. In fact, we could also move the other bookkeeping right after the fetch result is enqueued. I would also like to look at how the BlockFetchingListener is initialized to see the effects of this, as it would increase the size of the closure. Can we have a separate JIRA filed for this?
[GitHub] spark issue #18487: [SPARK-21243][Core] Limit no. of map outputs in a shuffl...
Github user dhruve commented on the issue: https://github.com/apache/spark/pull/18487 @cloud-fan I have replied to your comments.
[GitHub] spark pull request #18487: [SPARK-21243][Core] Limit no. of map outputs in a...
Github user dhruve commented on a diff in the pull request: https://github.com/apache/spark/pull/18487#discussion_r128573651 --- Diff: core/src/main/scala/org/apache/spark/internal/config/package.scala --- @@ -321,6 +321,17 @@ package object config { .intConf .createWithDefault(3) + private[spark] val REDUCER_MAX_BLOCKS_IN_FLIGHT_PER_ADDRESS = +ConfigBuilder("spark.reducer.maxBlocksInFlightPerAddress") + .doc("This configuration limits the number of remote blocks being fetched per reduce task" + +" from a given host port. When a large number of blocks are being requested from a given" + +" address in a single fetch or simultaneously, this could crash the serving executor or" + +" Node Manager. This is especially useful to reduce the load on the Node Manager when" + --- End diff -- If the shuffle service fails, it can take down the Node Manager, which is more severe; hence I have mentioned it. The following sentence mentions the external shuffle service. If it is not clear, I am okay to change it.
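A sketch of how such a value might be resolved and validated, assuming a default of `Int.MaxValue` (no per-address limit) and that non-positive values are rejected. `ReducerConf` and its plain `Map[String, String]` input are hypothetical stand-ins for Spark's internal `ConfigBuilder` machinery, which is `private[spark]`.

```scala
// Hypothetical resolver for the per-address block limit.
object ReducerConf {
  val Key = "spark.reducer.maxBlocksInFlightPerAddress"

  /** Reads the limit, defaulting to Int.MaxValue (assumed: no limit unless set). */
  def maxBlocksInFlightPerAddress(conf: Map[String, String]): Int = {
    val value = conf.get(Key).map(_.trim.toInt).getOrElse(Int.MaxValue)
    // Assumed validation: a zero or negative limit would block all remote fetches.
    require(value > 0, s"$Key must be positive, got $value")
    value
  }
}
```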
[GitHub] spark pull request #18487: [SPARK-21243][Core] Limit no. of map outputs in a...
Github user dhruve commented on a diff in the pull request: https://github.com/apache/spark/pull/18487#discussion_r128559318 --- Diff: core/src/main/scala/org/apache/spark/storage/ShuffleBlockFetcherIterator.scala --- @@ -443,12 +459,57 @@ final class ShuffleBlockFetcherIterator( } private def fetchUpToMaxBytes(): Unit = { -// Send fetch requests up to maxBytesInFlight -while (fetchRequests.nonEmpty && - (bytesInFlight == 0 || -(reqsInFlight + 1 <= maxReqsInFlight && - bytesInFlight + fetchRequests.front.size <= maxBytesInFlight))) { - sendRequest(fetchRequests.dequeue()) +// Send fetch requests up to maxBytesInFlight. If you cannot fetch from a remote host +// immediately, defer the request until the next time it can be processed. + +// Process any outstanding deferred fetch requests if possible. +if (deferredFetchRequests.nonEmpty) { + for ((remoteAddress, defReqQueue) <- deferredFetchRequests) { +while (isRemoteBlockFetchable(defReqQueue) && +!isRemoteAddressMaxedOut(remoteAddress, defReqQueue.front)) { + val request = defReqQueue.dequeue() + logDebug(s"Processing deferred fetch request for $remoteAddress with " ++ s"${request.blocks.length} blocks") + send(remoteAddress, request) + if (defReqQueue.isEmpty) { +deferredFetchRequests -= remoteAddress + } +} + } +} + +// Process any regular fetch requests if possible. 
+while (isRemoteBlockFetchable(fetchRequests)) { + val request = fetchRequests.dequeue() + val remoteAddress = request.address + if (isRemoteAddressMaxedOut(remoteAddress, request)) { +logDebug(s"Deferring fetch request for $remoteAddress with ${request.blocks.size} blocks") +val defReqQueue = deferredFetchRequests.getOrElse(remoteAddress, new Queue[FetchRequest]()) +defReqQueue.enqueue(request) +deferredFetchRequests(remoteAddress) = defReqQueue --- End diff -- If this is the first time we defer a request for an address, the newly created `defReqQueue` has to be associated with its corresponding `remoteAddress` in the map.
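The point in this reply can be shown in isolation: `getOrElse` returns a fresh queue without storing it, so the explicit assignment back into the map is what registers it. `getOrElseUpdate` on a mutable map does both in one call. The `FetchReq` and `DeferredRequests` types below are hypothetical simplifications, with addresses as `host:port` strings.

```scala
import scala.collection.mutable

// Hypothetical request type: target address plus the block ids it fetches.
final case class FetchReq(address: String, blocks: Seq[String])

class DeferredRequests {
  val byAddress = mutable.HashMap.empty[String, mutable.Queue[FetchReq]]

  // Equivalent to getOrElse(...) followed by byAddress(address) = queue:
  // the first deferral for an address creates the queue and stores it in the map.
  def defer(req: FetchReq): Unit =
    byAddress.getOrElseUpdate(req.address, mutable.Queue.empty[FetchReq]).enqueue(req)
}
```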
[GitHub] spark pull request #18487: [SPARK-21243][Core] Limit no. of map outputs in a...
Github user dhruve commented on a diff in the pull request: https://github.com/apache/spark/pull/18487#discussion_r128557269 --- Diff: core/src/main/scala/org/apache/spark/storage/ShuffleBlockFetcherIterator.scala --- @@ -443,12 +459,57 @@ final class ShuffleBlockFetcherIterator( } private def fetchUpToMaxBytes(): Unit = { -// Send fetch requests up to maxBytesInFlight -while (fetchRequests.nonEmpty && - (bytesInFlight == 0 || -(reqsInFlight + 1 <= maxReqsInFlight && - bytesInFlight + fetchRequests.front.size <= maxBytesInFlight))) { - sendRequest(fetchRequests.dequeue()) +// Send fetch requests up to maxBytesInFlight. If you cannot fetch from a remote host +// immediately, defer the request until the next time it can be processed. + +// Process any outstanding deferred fetch requests if possible. +if (deferredFetchRequests.nonEmpty) { + for ((remoteAddress, defReqQueue) <- deferredFetchRequests) { +while (isRemoteBlockFetchable(defReqQueue) && +!isRemoteAddressMaxedOut(remoteAddress, defReqQueue.front)) { + val request = defReqQueue.dequeue() + logDebug(s"Processing deferred fetch request for $remoteAddress with " ++ s"${request.blocks.length} blocks") + send(remoteAddress, request) + if (defReqQueue.isEmpty) { +deferredFetchRequests -= remoteAddress --- End diff -- Otherwise, on every call we would unnecessarily iterate over all the block manager ids for which we deferred fetch requests at some earlier point, only to find that they have no pending fetch requests.
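A minimal sketch of the drain-and-prune step discussed here, with hypothetical `PendingFetch` and `canSend` stand-ins for Spark's `FetchRequest` and the fetchable/maxed-out checks. One deliberate difference from the quoted diff: it iterates over a snapshot of the keys, because removing entries from a `mutable.HashMap` while iterating over that same map is not safe in general.

```scala
import scala.collection.mutable

object DeferredDrain {
  // Hypothetical deferred request: target address and number of blocks.
  final case class PendingFetch(address: String, numBlocks: Int)

  /**
   * Sends deferred requests while `canSend` allows it, in FIFO order per address.
   * Addresses whose queue becomes empty are removed so later passes skip them.
   */
  def drain(
      deferred: mutable.HashMap[String, mutable.Queue[PendingFetch]],
      canSend: PendingFetch => Boolean,
      send: PendingFetch => Unit): Unit = {
    // Snapshot the keys so that removing entries below does not disturb iteration.
    for (address <- deferred.keys.toList) {
      val queue = deferred(address)
      while (queue.nonEmpty && canSend(queue.head)) {
        send(queue.dequeue())
      }
      if (queue.isEmpty) deferred -= address
    }
  }
}
```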
[GitHub] spark pull request #18487: [SPARK-21243][Core] Limit no. of map outputs in a...
Github user dhruve commented on a diff in the pull request: https://github.com/apache/spark/pull/18487#discussion_r127998722 --- Diff: core/src/main/scala/org/apache/spark/storage/ShuffleBlockFetcherIterator.scala --- @@ -277,11 +290,13 @@ final class ShuffleBlockFetcherIterator( } else if (size < 0) { throw new BlockException(blockId, "Negative block size " + size) } - if (curRequestSize >= targetRequestSize) { + if (curRequestSize >= targetRequestSize || + curBlocks.size >= maxBlocksInFlightPerAddress) { --- End diff -- We are already doing it here => https://github.com/apache/spark/blob/master/core/src/main/scala/org/apache/spark/storage/ShuffleBlockFetcherIterator.scala#L330
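The condition in this diff closes the current request once it reaches either the byte target or the per-address block cap, so no single request carries more than `maxBlocksInFlightPerAddress` blocks. A standalone sketch of that splitting loop, with blocks as hypothetical `(blockId, size)` pairs and `require` standing in for Spark's `BlockException`:

```scala
import scala.collection.mutable

object RequestSplitter {
  /** Groups one address's blocks into requests bounded by bytes and block count. */
  def split(
      blocks: Seq[(String, Long)],
      targetRequestSize: Long,
      maxBlocksInFlightPerAddress: Int): Seq[Seq[(String, Long)]] = {
    val requests = mutable.ArrayBuffer.empty[Seq[(String, Long)]]
    val curBlocks = mutable.ArrayBuffer.empty[(String, Long)]
    var curRequestSize = 0L
    for ((blockId, size) <- blocks) {
      require(size >= 0, s"Negative block size $size for $blockId")
      // Zero-size blocks are skipped; there is nothing to fetch for them.
      if (size > 0) {
        curBlocks += ((blockId, size))
        curRequestSize += size
      }
      // Close the request on either bound: enough bytes or too many blocks.
      if (curBlocks.nonEmpty &&
          (curRequestSize >= targetRequestSize || curBlocks.size >= maxBlocksInFlightPerAddress)) {
        requests += curBlocks.toList
        curBlocks.clear()
        curRequestSize = 0L
      }
    }
    // Whatever is left forms a final, smaller request.
    if (curBlocks.nonEmpty) requests += curBlocks.toList
    requests.toList
  }
}
```

The `curBlocks.nonEmpty` guard is an addition in this sketch that avoids emitting an empty request; it is an assumption, not something shown in the quoted diff.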