[GitHub] spark issue #22288: [SPARK-22148][SPARK-15815][Scheduler] Acquire new execut...

2018-11-06 Thread dhruve
Github user dhruve commented on the issue:

https://github.com/apache/spark/pull/22288
  
Thanks for the reviews and feedback @tgravescs, @squito!


---




[GitHub] spark issue #22288: [SPARK-22148][SPARK-15815][Scheduler] Acquire new execut...

2018-11-01 Thread dhruve
Github user dhruve commented on the issue:

https://github.com/apache/spark/pull/22288
  
@squito I have tested it again with both scenarios and I was able to verify
the expected behavior. For the cases that are not covered in the PR, I will
mention them in the JIRA.


---




[GitHub] spark issue #22288: [SPARK-22148][SPARK-15815][Scheduler] Acquire new execut...

2018-10-31 Thread dhruve
Github user dhruve commented on the issue:

https://github.com/apache/spark/pull/22288
  
@tgravescs I have fixed a nit and it's ready to be reviewed. @squito I have
updated the comment, let me know if it's okay.

Thanks for the reviews.


---




[GitHub] spark pull request #22288: [SPARK-22148][SPARK-15815][Scheduler] Acquire new...

2018-10-26 Thread dhruve
Github user dhruve commented on a diff in the pull request:

https://github.com/apache/spark/pull/22288#discussion_r228637254
  
--- Diff: 
core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala ---
@@ -415,9 +420,54 @@ private[spark] class TaskSchedulerImpl(
 launchedAnyTask |= launchedTaskAtCurrentMaxLocality
   } while (launchedTaskAtCurrentMaxLocality)
 }
+
 if (!launchedAnyTask) {
-  taskSet.abortIfCompletelyBlacklisted(hostToExecutors)
+  taskSet.getCompletelyBlacklistedTaskIfAny(hostToExecutors) match {
+case Some(taskIndex) => // Returns the taskIndex which was unschedulable
+
+  // If the taskSet is unschedulable we try to find an existing idle blacklisted
+  // executor. If we cannot find one, we abort immediately. Else we kill the idle
+  // executor and kick off an abortTimer which if it doesn't schedule a task within the
+  // the timeout will abort the taskSet if we were unable to schedule any task from the
+  // taskSet.
+  // Note 1: We keep track of schedulability on a per taskSet basis rather than on a per
+  // task basis.
+  // Note 2: The taskSet can still be aborted when there are more than one idle
+  // blacklisted executors and dynamic allocation is on. This can happen when a killed
+  // idle executor isn't replaced in time by ExecutorAllocationManager as it relies on
+  // pending tasks and doesn't kill executors on idle timeouts, resulting in the abort
+  // timer to expire and abort the taskSet.
+  executorIdToRunningTaskIds.find(x => !isExecutorBusy(x._1)) match {
+case Some ((executorId, _)) =>
+  if (!unschedulableTaskSetToExpiryTime.contains(taskSet)) {
+blacklistTrackerOpt.foreach(blt => blt.killBlacklistedIdleExecutor(executorId))
+
+val timeout = conf.get(config.UNSCHEDULABLE_TASKSET_TIMEOUT) * 1000
+unschedulableTaskSetToExpiryTime(taskSet) = clock.getTimeMillis() + timeout
+logInfo(s"Waiting for $timeout ms for completely "
+  + s"blacklisted task to be schedulable again before aborting $taskSet.")
+abortTimer.schedule(
+  createUnschedulableTaskSetAbortTimer(taskSet, taskIndex), timeout)
+  }
+case _ => // Abort Immediately
+  logInfo("Cannot schedule any task because of complete blacklisting. No idle" +
+s" executors can be found to kill. Aborting $taskSet.")
+  taskSet.abortSinceCompletelyBlacklisted(taskIndex)
+  }
+case _ => // Do nothing if no tasks completely blacklisted.
+  }
+} else {
+  // We want to defer killing any taskSets as long as we have a non blacklisted executor
+  // which can be used to schedule a task from any active taskSets. This ensures that the
+  // job can make progress and if we encounter a flawed taskSet it will eventually either
+  // fail or abort due to being completely blacklisted.
--- End diff --

Your understanding is correct. I will update the comment.


---




[GitHub] spark pull request #22288: [SPARK-22148][SPARK-15815][Scheduler] Acquire new...

2018-10-26 Thread dhruve
Github user dhruve commented on a diff in the pull request:

https://github.com/apache/spark/pull/22288#discussion_r228636999
  
--- Diff: 
core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala ---
@@ -415,9 +420,54 @@ private[spark] class TaskSchedulerImpl(
 launchedAnyTask |= launchedTaskAtCurrentMaxLocality
   } while (launchedTaskAtCurrentMaxLocality)
 }
+
 if (!launchedAnyTask) {
-  taskSet.abortIfCompletelyBlacklisted(hostToExecutors)
+  taskSet.getCompletelyBlacklistedTaskIfAny(hostToExecutors) match {
+case Some(taskIndex) => // Returns the taskIndex which was unschedulable
+
+  // If the taskSet is unschedulable we try to find an existing idle blacklisted
+  // executor. If we cannot find one, we abort immediately. Else we kill the idle
+  // executor and kick off an abortTimer which if it doesn't schedule a task within the
+  // the timeout will abort the taskSet if we were unable to schedule any task from the
+  // taskSet.
+  // Note 1: We keep track of schedulability on a per taskSet basis rather than on a per
+  // task basis.
+  // Note 2: The taskSet can still be aborted when there are more than one idle
+  // blacklisted executors and dynamic allocation is on. This can happen when a killed
+  // idle executor isn't replaced in time by ExecutorAllocationManager as it relies on
+  // pending tasks and doesn't kill executors on idle timeouts, resulting in the abort
+  // timer to expire and abort the taskSet.
+  executorIdToRunningTaskIds.find(x => !isExecutorBusy(x._1)) match {
+case Some ((executorId, _)) =>
+  if (!unschedulableTaskSetToExpiryTime.contains(taskSet)) {
+blacklistTrackerOpt.foreach(blt => blt.killBlacklistedIdleExecutor(executorId))
+
+val timeout = conf.get(config.UNSCHEDULABLE_TASKSET_TIMEOUT) * 1000
+unschedulableTaskSetToExpiryTime(taskSet) = clock.getTimeMillis() + timeout
+logInfo(s"Waiting for $timeout ms for completely "
+  + s"blacklisted task to be schedulable again before aborting $taskSet.")
+abortTimer.schedule(
+  createUnschedulableTaskSetAbortTimer(taskSet, taskIndex), timeout)
+  }
+case _ => // Abort Immediately
--- End diff --

Makes sense. Will update it.


---




[GitHub] spark pull request #22288: [SPARK-22148][SPARK-15815][Scheduler] Acquire new...

2018-10-26 Thread dhruve
Github user dhruve commented on a diff in the pull request:

https://github.com/apache/spark/pull/22288#discussion_r228636880
  
--- Diff: 
core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala ---
@@ -415,9 +420,54 @@ private[spark] class TaskSchedulerImpl(
 launchedAnyTask |= launchedTaskAtCurrentMaxLocality
   } while (launchedTaskAtCurrentMaxLocality)
 }
+
 if (!launchedAnyTask) {
-  taskSet.abortIfCompletelyBlacklisted(hostToExecutors)
+  taskSet.getCompletelyBlacklistedTaskIfAny(hostToExecutors) match {
+case Some(taskIndex) => // Returns the taskIndex which was unschedulable
+
+  // If the taskSet is unschedulable we try to find an existing idle blacklisted
+  // executor. If we cannot find one, we abort immediately. Else we kill the idle
+  // executor and kick off an abortTimer which if it doesn't schedule a task within the
+  // the timeout will abort the taskSet if we were unable to schedule any task from the
+  // taskSet.
+  // Note 1: We keep track of schedulability on a per taskSet basis rather than on a per
+  // task basis.
+  // Note 2: The taskSet can still be aborted when there are more than one idle
+  // blacklisted executors and dynamic allocation is on. This can happen when a killed
+  // idle executor isn't replaced in time by ExecutorAllocationManager as it relies on
+  // pending tasks and doesn't kill executors on idle timeouts, resulting in the abort
+  // timer to expire and abort the taskSet.
+  executorIdToRunningTaskIds.find(x => !isExecutorBusy(x._1)) match {
+case Some ((executorId, _)) =>
+  if (!unschedulableTaskSetToExpiryTime.contains(taskSet)) {
+blacklistTrackerOpt.foreach(blt => blt.killBlacklistedIdleExecutor(executorId))
+
+val timeout = conf.get(config.UNSCHEDULABLE_TASKSET_TIMEOUT) * 1000
+unschedulableTaskSetToExpiryTime(taskSet) = clock.getTimeMillis() + timeout
+logInfo(s"Waiting for $timeout ms for completely "
+  + s"blacklisted task to be schedulable again before aborting $taskSet.")
+abortTimer.schedule(
+  createUnschedulableTaskSetAbortTimer(taskSet, taskIndex), timeout)
+  }
+case _ => // Abort Immediately
+  logInfo("Cannot schedule any task because of complete blacklisting. No idle" +
+s" executors can be found to kill. Aborting $taskSet.")
+  taskSet.abortSinceCompletelyBlacklisted(taskIndex)
+  }
+case _ => // Do nothing if no tasks completely blacklisted.
--- End diff --

I have seen this style earlier in the code base. Is this the norm (just
curious)? I read about a few scenarios where this would be better. However,
personally, every time I read a foreach it's instinctive to think of the entity
it's being invoked on as an iterable rather than an Option, so it feels a bit
odd.
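For illustration, a small self-contained Scala sketch (not from the PR) of the two styles being discussed; both behave the same, so the choice is purely about readability:

```scala
object OptionStyleExample {
  def main(args: Array[String]): Unit = {
    val idleExecutor: Option[String] = Some("executor0")

    // Pattern match: the Some and None branches are both spelled out explicitly.
    idleExecutor match {
      case Some(id) => println(s"would kill idle blacklisted executor $id")
      case None => // do nothing
    }

    // foreach: runs the body only when the Option is non-empty; the empty case is implicit.
    idleExecutor.foreach(id => println(s"would kill idle blacklisted executor $id"))
  }
}
```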


---




[GitHub] spark issue #22288: [SPARK-22148][SPARK-15815][Scheduler] Acquire new execut...

2018-10-23 Thread dhruve
Github user dhruve commented on the issue:

https://github.com/apache/spark/pull/22288
  
@squito For the locality wait, it would be the same as the condition where
it is not completely blacklisted. I have added a test for this. If we want to
ensure the sequence of the timeout expiring and the task being scheduled, we
will have to add some more delay. Let me know if we want to do that, or whether
the test seems to suffice.


---




[GitHub] spark pull request #22288: [SPARK-22148][SPARK-15815][Scheduler] Acquire new...

2018-10-22 Thread dhruve
Github user dhruve commented on a diff in the pull request:

https://github.com/apache/spark/pull/22288#discussion_r227080534
  
--- Diff: 
core/src/test/scala/org/apache/spark/scheduler/TaskSchedulerImplSuite.scala ---
@@ -503,6 +507,145 @@ class TaskSchedulerImplSuite extends SparkFunSuite with LocalSparkContext with B
 verify(tsm).abort(anyString(), anyObject())
   }
 
+  test("SPARK-22148 abort timer should kick in when task is completely 
blacklisted & no new " +
+"executor can be acquired") {
+// set the abort timer to fail immediately
+taskScheduler = setupSchedulerWithMockTaskSetBlacklist(
+  config.UNSCHEDULABLE_TASKSET_TIMEOUT.key -> "0")
+
+// We have only 1 task remaining with 1 executor
+val taskSet = FakeTask.createTaskSet(numTasks = 1, stageAttemptId = 0)
+taskScheduler.submitTasks(taskSet)
+val tsm = stageToMockTaskSetManager(0)
+
+// submit an offer with one executor
+val firstTaskAttempts = taskScheduler.resourceOffers(IndexedSeq(
+  WorkerOffer("executor0", "host0", 1)
+)).flatten
+
+// Fail the running task
+val failedTask = firstTaskAttempts.find(_.executorId == "executor0").get
+taskScheduler.statusUpdate(failedTask.taskId, TaskState.FAILED, ByteBuffer.allocate(0))
+// we explicitly call the handleFailedTask method here to avoid adding a sleep in the test suite
+// Reason being - handleFailedTask is run by an executor service and there is a momentary delay
+// before it is launched and this fails the assertion check.
+tsm.handleFailedTask(failedTask.taskId, TaskState.FAILED, UnknownReason)
+when(tsm.taskSetBlacklistHelperOpt.get.isExecutorBlacklistedForTask(
+  "executor0", failedTask.index)).thenReturn(true)
+
+// make an offer on the blacklisted executor.  We won't schedule anything, and set the abort
+// timer to kick in immediately
+assert(taskScheduler.resourceOffers(IndexedSeq(
+  WorkerOffer("executor0", "host0", 1)
+)).flatten.size === 0)
+// Wait for the abort timer to kick in. Without sleep the test exits before the timer is
+// triggered.
+eventually(timeout(500.milliseconds)) {
+  assert(tsm.isZombie)
+}
+  }
+
+  test("SPARK-22148 try to acquire a new executor when task is 
unschedulable with 1 executor") {
+taskScheduler = setupSchedulerWithMockTaskSetBlacklist(
+  config.UNSCHEDULABLE_TASKSET_TIMEOUT.key -> "10")
+
+// We have only 1 task remaining with 1 executor
+val taskSet = FakeTask.createTaskSet(numTasks = 1, stageAttemptId = 0)
+taskScheduler.submitTasks(taskSet)
+val tsm = stageToMockTaskSetManager(0)
+
+// submit an offer with one executor
+val firstTaskAttempts = taskScheduler.resourceOffers(IndexedSeq(
+  WorkerOffer("executor0", "host0", 1)
+)).flatten
+
+// Fail the running task
+val failedTask = firstTaskAttempts.find(_.executorId == "executor0").get
+taskScheduler.statusUpdate(failedTask.taskId, TaskState.FAILED, ByteBuffer.allocate(0))
+// we explicitly call the handleFailedTask method here to avoid adding a sleep in the test suite
+// Reason being - handleFailedTask is run by an executor service and there is a momentary delay
+// before it is launched and this fails the assertion check.
+tsm.handleFailedTask(failedTask.taskId, TaskState.FAILED, UnknownReason)
+when(tsm.taskSetBlacklistHelperOpt.get.isExecutorBlacklistedForTask(
+  "executor0", failedTask.index)).thenReturn(true)
+
+// make an offer on the blacklisted executor.  We won't schedule anything, and set the abort
+// timer to expire if no new executors could be acquired. We kill the existing idle blacklisted
+// executor and try to acquire a new one.
+assert(taskScheduler.resourceOffers(IndexedSeq(
+  WorkerOffer("executor0", "host0", 1)
+)).flatten.size === 0)
+assert(!tsm.isZombie)
+
+// Offer a new executor which should be accepted
+assert(taskScheduler.resourceOffers(IndexedSeq(
+  WorkerOffer("executor1", "host0", 1)
+)).flatten.size === 1)
--- End diff --

We specify the config in `seconds`. The expectation here is that the timer
should expire in `10 seconds`, which I think is sufficient to account for GC
time. However, we could just remove this as the default is 120s.
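As a rough sketch of the unit handling being discussed (assuming the seconds-to-milliseconds conversion the PR applies before arming the timer):

```scala
object AbortTimeoutUnits {
  def main(args: Array[String]): Unit = {
    // The test overrides the config with the string "10", interpreted as seconds.
    val configuredSeconds = 10L
    // The scheduler multiplies by 1000 before passing the value to abortTimer.schedule(...).
    val timeoutMs = configuredSeconds * 1000
    println(s"abort timer would fire after $timeoutMs ms")
  }
}
```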


---




[GitHub] spark pull request #22288: [SPARK-22148][SPARK-15815][Scheduler] Acquire new...

2018-10-22 Thread dhruve
Github user dhruve commented on a diff in the pull request:

https://github.com/apache/spark/pull/22288#discussion_r227082382
  
--- Diff: 
core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala ---
@@ -415,9 +420,55 @@ private[spark] class TaskSchedulerImpl(
 launchedAnyTask |= launchedTaskAtCurrentMaxLocality
   } while (launchedTaskAtCurrentMaxLocality)
 }
+
 if (!launchedAnyTask) {
-  taskSet.abortIfCompletelyBlacklisted(hostToExecutors)
+  taskSet.getCompletelyBlacklistedTaskIfAny(hostToExecutors) match {
+case Some(taskIndex) => // Returns the taskIndex which was unschedulable
+
+  // If the taskSet is unschedulable we try to find an existing idle blacklisted
+  // executor. If we cannot find one, we abort immediately. Else we kill the idle
+  // executor and kick off an abortTimer which if it doesn't schedule a task within the
+  // the timeout will abort the taskSet if we were unable to schedule any task from the
+  // taskSet.
+  // Note 1: We keep track of schedulability on a per taskSet basis rather than on a per
+  // task basis.
+  // Note 2: The taskSet can still be aborted when there are more than one idle
+  // blacklisted executors and dynamic allocation is on. This can happen when a killed
+  // idle executor isn't replaced in time by ExecutorAllocationManager as it relies on
+  // pending tasks and doesn't kill executors on idle timeouts, resulting in the abort
+  // timer to expire and abort the taskSet.
+  executorIdToRunningTaskIds.find(x => !isExecutorBusy(x._1)) match {
--- End diff --

I was preferring the code to be more readable. As this isn't a frequently
running scenario, maybe we could keep it. Thoughts?


---




[GitHub] spark pull request #22288: [SPARK-22148][SPARK-15815][Scheduler] Acquire new...

2018-10-22 Thread dhruve
Github user dhruve commented on a diff in the pull request:

https://github.com/apache/spark/pull/22288#discussion_r227095389
  
--- Diff: 
core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala ---
@@ -415,9 +420,55 @@ private[spark] class TaskSchedulerImpl(
 launchedAnyTask |= launchedTaskAtCurrentMaxLocality
   } while (launchedTaskAtCurrentMaxLocality)
 }
+
 if (!launchedAnyTask) {
-  taskSet.abortIfCompletelyBlacklisted(hostToExecutors)
+  taskSet.getCompletelyBlacklistedTaskIfAny(hostToExecutors) match {
+case Some(taskIndex) => // Returns the taskIndex which was unschedulable
+
+  // If the taskSet is unschedulable we try to find an existing idle blacklisted
+  // executor. If we cannot find one, we abort immediately. Else we kill the idle
--- End diff --

By clearing the abort timer as soon as a task is launched we are relaxing
this situation.
If there is a large backlog of tasks:
- If we acquire new executors or launch new tasks, we will defer the check.
- If we cannot acquire new executors and we are running with long-running
tasks such that no new tasks can be launched, and we have fewer executors
than max failures, then this will end up being harsh. This can happen, but it
seems like a very specific edge case.


---




[GitHub] spark pull request #22288: [SPARK-22148][SPARK-15815][Scheduler] Acquire new...

2018-10-22 Thread dhruve
Github user dhruve commented on a diff in the pull request:

https://github.com/apache/spark/pull/22288#discussion_r227080844
  
--- Diff: 
core/src/test/scala/org/apache/spark/scheduler/TaskSchedulerImplSuite.scala ---
@@ -503,6 +507,145 @@ class TaskSchedulerImplSuite extends SparkFunSuite with LocalSparkContext with B
 verify(tsm).abort(anyString(), anyObject())
   }
 
+  test("SPARK-22148 abort timer should kick in when task is completely 
blacklisted & no new " +
+"executor can be acquired") {
+// set the abort timer to fail immediately
+taskScheduler = setupSchedulerWithMockTaskSetBlacklist(
+  config.UNSCHEDULABLE_TASKSET_TIMEOUT.key -> "0")
+
+// We have only 1 task remaining with 1 executor
+val taskSet = FakeTask.createTaskSet(numTasks = 1, stageAttemptId = 0)
+taskScheduler.submitTasks(taskSet)
+val tsm = stageToMockTaskSetManager(0)
+
+// submit an offer with one executor
+val firstTaskAttempts = taskScheduler.resourceOffers(IndexedSeq(
+  WorkerOffer("executor0", "host0", 1)
+)).flatten
+
+// Fail the running task
+val failedTask = firstTaskAttempts.find(_.executorId == "executor0").get
+taskScheduler.statusUpdate(failedTask.taskId, TaskState.FAILED, ByteBuffer.allocate(0))
+// we explicitly call the handleFailedTask method here to avoid adding a sleep in the test suite
+// Reason being - handleFailedTask is run by an executor service and there is a momentary delay
+// before it is launched and this fails the assertion check.
+tsm.handleFailedTask(failedTask.taskId, TaskState.FAILED, UnknownReason)
+when(tsm.taskSetBlacklistHelperOpt.get.isExecutorBlacklistedForTask(
+  "executor0", failedTask.index)).thenReturn(true)
+
+// make an offer on the blacklisted executor.  We won't schedule anything, and set the abort
+// timer to kick in immediately
+assert(taskScheduler.resourceOffers(IndexedSeq(
+  WorkerOffer("executor0", "host0", 1)
+)).flatten.size === 0)
+// Wait for the abort timer to kick in. Without sleep the test exits before the timer is
+// triggered.
+eventually(timeout(500.milliseconds)) {
+  assert(tsm.isZombie)
+}
+  }
+
+  test("SPARK-22148 try to acquire a new executor when task is 
unschedulable with 1 executor") {
+taskScheduler = setupSchedulerWithMockTaskSetBlacklist(
+  config.UNSCHEDULABLE_TASKSET_TIMEOUT.key -> "10")
+
+// We have only 1 task remaining with 1 executor
+val taskSet = FakeTask.createTaskSet(numTasks = 1, stageAttemptId = 0)
+taskScheduler.submitTasks(taskSet)
+val tsm = stageToMockTaskSetManager(0)
+
+// submit an offer with one executor
+val firstTaskAttempts = taskScheduler.resourceOffers(IndexedSeq(
+  WorkerOffer("executor0", "host0", 1)
+)).flatten
+
+// Fail the running task
+val failedTask = firstTaskAttempts.find(_.executorId == "executor0").get
+taskScheduler.statusUpdate(failedTask.taskId, TaskState.FAILED, ByteBuffer.allocate(0))
+// we explicitly call the handleFailedTask method here to avoid adding a sleep in the test suite
+// Reason being - handleFailedTask is run by an executor service and there is a momentary delay
+// before it is launched and this fails the assertion check.
+tsm.handleFailedTask(failedTask.taskId, TaskState.FAILED, UnknownReason)
+when(tsm.taskSetBlacklistHelperOpt.get.isExecutorBlacklistedForTask(
+  "executor0", failedTask.index)).thenReturn(true)
+
+// make an offer on the blacklisted executor.  We won't schedule anything, and set the abort
+// timer to expire if no new executors could be acquired. We kill the existing idle blacklisted
+// executor and try to acquire a new one.
+assert(taskScheduler.resourceOffers(IndexedSeq(
+  WorkerOffer("executor0", "host0", 1)
+)).flatten.size === 0)
+assert(!tsm.isZombie)
+
+// Offer a new executor which should be accepted
+assert(taskScheduler.resourceOffers(IndexedSeq(
+  WorkerOffer("executor1", "host0", 1)
+)).flatten.size === 1)
+
+assert(!tsm.isZombie)
+  }
+
+  // This is to test a scenario where we have two taskSets completely blacklisted and on acquiring
+  // a new executor we don't want the abort timer for the second taskSet to expire and abort the job
+  test("SPARK-22148 abort timer should clear unschedulableTaskSetToExpiryTime for all TaskSets")

[GitHub] spark pull request #22288: [SPARK-22148][SPARK-15815][Scheduler] Acquire new...

2018-10-22 Thread dhruve
Github user dhruve commented on a diff in the pull request:

https://github.com/apache/spark/pull/22288#discussion_r227077071
  
--- Diff: 
core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala ---
@@ -453,6 +504,25 @@ private[spark] class TaskSchedulerImpl(
 return tasks
   }
 
+  private def createUnschedulableTaskSetAbortTimer(
+  taskSet: TaskSetManager,
+  taskIndex: Int,
+  timeout: Long): TimerTask = {
--- End diff --

good catch.


---




[GitHub] spark issue #22288: [SPARK-22148][SPARK-15815][Scheduler] Acquire new execut...

2018-10-22 Thread dhruve
Github user dhruve commented on the issue:

https://github.com/apache/spark/pull/22288
  
It applies to both dynamic allocation (DA) and static allocation (SA). I have updated the description.


---




[GitHub] spark issue #22288: [SPARK-22148][SPARK-15815][Scheduler] Acquire new execut...

2018-10-22 Thread dhruve
Github user dhruve commented on the issue:

https://github.com/apache/spark/pull/22288
  
@squito I have made the changes and updated the description. 


---




[GitHub] spark issue #22288: [SPARK-22148][SPARK-15815][Scheduler] Acquire new execut...

2018-10-19 Thread dhruve
Github user dhruve commented on the issue:

https://github.com/apache/spark/pull/22288
  
test this please


---




[GitHub] spark pull request #22288: [SPARK-22148][SPARK-15815][Scheduler] Acquire new...

2018-10-19 Thread dhruve
Github user dhruve commented on a diff in the pull request:

https://github.com/apache/spark/pull/22288#discussion_r226754849
  
--- Diff: 
core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala ---
@@ -415,9 +420,65 @@ private[spark] class TaskSchedulerImpl(
 launchedAnyTask |= launchedTaskAtCurrentMaxLocality
   } while (launchedTaskAtCurrentMaxLocality)
 }
+
 if (!launchedAnyTask) {
-  taskSet.abortIfCompletelyBlacklisted(hostToExecutors)
+  taskSet.getCompletelyBlacklistedTaskIfAny(hostToExecutors) match {
+case taskIndex: Some[Int] => // Returns the taskIndex which was unschedulable
+
+  // If the taskSet is unschedulable we try to find an existing idle blacklisted
+  // executor. If we cannot find one, we abort immediately. Else we kill the idle
+  // executor and kick off an abortTimer which if it doesn't schedule a task within the
+  // the timeout will abort the taskSet if we were unable to schedule any task from the
+  // taskSet.
+  // Note 1: We keep track of schedulability on a per taskSet basis rather than on a per
+  // task basis.
+  // Note 2: The taskSet can still be aborted when there are more than one idle
+  // blacklisted executors and dynamic allocation is on. This can happen when a killed
+  // idle executor isn't replaced in time by ExecutorAllocationManager as it relies on
+  // pending tasks and doesn't kill executors on idle timeouts, resulting in the abort
+  // timer to expire and abort the taskSet.
+  executorIdToRunningTaskIds.find(x => !isExecutorBusy(x._1)) match {
+case Some (x) =>
+  val executorId = x._1
+  if (!unschedulableTaskSetToExpiryTime.contains(taskSet)) {
+blacklistTrackerOpt.foreach(blt => blt.killBlacklistedIdleExecutor(executorId))
+
+unschedulableTaskSetToExpiryTime(taskSet) = clock.getTimeMillis()
+val timeout = conf.get(config.UNSCHEDULABLE_TASKSET_TIMEOUT) * 1000
+logInfo(s"Waiting for $timeout ms for completely "
+  + s"blacklisted task to be schedulable again before aborting $taskSet.")
+abortTimer.schedule(new TimerTask() {
+  override def run() {
+if (unschedulableTaskSetToExpiryTime.contains(taskSet) &&
+  (unschedulableTaskSetToExpiryTime(taskSet) + timeout)
+<= clock.getTimeMillis()
+) {
+  logInfo("Cannot schedule any task because of complete blacklisting. " +
+s"Wait time for scheduling expired. Aborting $taskSet.")
+  taskSet.abortSinceCompletelyBlacklisted(taskIndex.get)
+} else {
+  this.cancel()
+}
+  }
+}, timeout)
+  }
+case _ => // Abort Immediately
+  logInfo("Cannot schedule any task because of complete blacklisting. No idle" +
+s" executors can be found to kill. Aborting $taskSet.")
+  taskSet.abortSinceCompletelyBlacklisted(taskIndex.get)
+  }
+case _ => // Do nothing if no tasks completely blacklisted.
+  }
+} else {
+  // If a task was scheduled, we clear the expiry time for all the taskSets. This ensures
+  // that we have got atleast a non blacklisted executor and the job can progress. The
+  // abort timer checks this entry to decide if we want to abort the taskSet.
--- End diff --

That is correct. It also covers the other scenario that @tgravescs originally
pointed out.

Let's say you have multiple taskSets running which are completely
blacklisted. If you were able to get an executor, you would just clear the
timer for that specific taskSet. Now, due to resource constraints, if you weren't
able to obtain another executor within the timeout for the other taskSet, you
would abort the other taskSet when you could actually wait for it to be
scheduled on the newly obtained executor.

So clearing the timer for all the taskSets ensures that we currently aren't
in a completely blacklisted state and should try to run to completion. However,
if the taskSet itself is flawed, we would eventually fail. This coul
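A minimal sketch of that bookkeeping, with hypothetical stand-ins for the scheduler's types (not the actual TaskSchedulerImpl code), just to illustrate why the whole expiry map is cleared rather than a single entry:

```scala
import scala.collection.mutable

object ExpiryClearingSketch {
  case class TaskSet(name: String)

  val unschedulableTaskSetToExpiryTime = mutable.HashMap[TaskSet, Long]()

  def afterResourceOffers(taskSet: TaskSet, launchedAnyTask: Boolean, now: Long, timeoutMs: Long): Unit = {
    if (!launchedAnyTask) {
      // Arm the abort timer for this task set only (if it is not already armed).
      unschedulableTaskSetToExpiryTime.getOrElseUpdate(taskSet, now + timeoutMs)
    } else {
      // Some task launched somewhere, so a usable (non-blacklisted) executor exists:
      // clear the timers for every task set, not just this one, so another blacklisted
      // task set waiting on the same new executor is not aborted prematurely.
      unschedulableTaskSetToExpiryTime.clear()
    }
  }
}
```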

[GitHub] spark issue #22288: [SPARK-22148][SPARK-15815][Scheduler] Acquire new execut...

2018-10-18 Thread dhruve
Github user dhruve commented on the issue:

https://github.com/apache/spark/pull/22288
  
Failure is unrelated.


---




[GitHub] spark issue #22288: [SPARK-22148][SPARK-15815][Scheduler] Acquire new execut...

2018-10-18 Thread dhruve
Github user dhruve commented on the issue:

https://github.com/apache/spark/pull/22288
  
retest this please


---




[GitHub] spark pull request #22288: [SPARK-22148][SPARK-15815][Scheduler] Acquire new...

2018-10-12 Thread dhruve
Github user dhruve commented on a diff in the pull request:

https://github.com/apache/spark/pull/22288#discussion_r224892495
  
--- Diff: 
core/src/main/scala/org/apache/spark/scheduler/BlacklistTracker.scala ---
@@ -146,21 +146,31 @@ private[scheduler] class BlacklistTracker (
 nextExpiryTime = math.min(execMinExpiry, nodeMinExpiry)
   }
 
+  private def killExecutor(exec: String, msg: String): Unit = {
+allocationClient match {
+  case Some(a) =>
+logInfo(msg)
+a.killExecutors(Seq(exec), adjustTargetNumExecutors = false, countFailures = false,
+  force = true)
+  case None =>
+logInfo(s"Not attempting to kill blacklisted executor id $exec " +
+  s"since allocation client is not defined.")
+}
+  }
+
   private def killBlacklistedExecutor(exec: String): Unit = {
 if (conf.get(config.BLACKLIST_KILL_ENABLED)) {
-  allocationClient match {
-case Some(a) =>
-  logInfo(s"Killing blacklisted executor id $exec " +
-s"since ${config.BLACKLIST_KILL_ENABLED.key} is set.")
-  a.killExecutors(Seq(exec), adjustTargetNumExecutors = false, countFailures = false,
-force = true)
-case None =>
-  logWarning(s"Not attempting to kill blacklisted executor id $exec " +
-s"since allocation client is not defined.")
-  }
+  killExecutor(exec,
+s"Killing blacklisted executor id $exec since ${config.BLACKLIST_KILL_ENABLED.key} is set.")
 }
   }
 
+  private[scheduler] def killBlacklistedIdleExecutor(exec: String): Unit = {
+killExecutor(exec,
--- End diff --

It doesn't make sense to have a flag for it, because if you have it `off`,
your job would always fail whenever all the executors are blacklisted and you
can't schedule any task.


---




[GitHub] spark pull request #22288: [SPARK-22148][SPARK-15815][Scheduler] Acquire new...

2018-10-12 Thread dhruve
Github user dhruve commented on a diff in the pull request:

https://github.com/apache/spark/pull/22288#discussion_r224873268
  
--- Diff: 
core/src/main/scala/org/apache/spark/scheduler/BlacklistTracker.scala ---
@@ -146,21 +146,31 @@ private[scheduler] class BlacklistTracker (
 nextExpiryTime = math.min(execMinExpiry, nodeMinExpiry)
   }
 
+  private def killExecutor(exec: String, msg: String): Unit = {
+allocationClient match {
+  case Some(a) =>
+logInfo(msg)
+a.killExecutors(Seq(exec), adjustTargetNumExecutors = false, countFailures = false,
+  force = true)
+  case None =>
+logInfo(s"Not attempting to kill blacklisted executor id $exec " +
+  s"since allocation client is not defined.")
+}
+  }
+
   private def killBlacklistedExecutor(exec: String): Unit = {
 if (conf.get(config.BLACKLIST_KILL_ENABLED)) {
-  allocationClient match {
-case Some(a) =>
-  logInfo(s"Killing blacklisted executor id $exec " +
-s"since ${config.BLACKLIST_KILL_ENABLED.key} is set.")
-  a.killExecutors(Seq(exec), adjustTargetNumExecutors = false, countFailures = false,
-force = true)
-case None =>
-  logWarning(s"Not attempting to kill blacklisted executor id $exec " +
-s"since allocation client is not defined.")
-  }
+  killExecutor(exec,
+s"Killing blacklisted executor id $exec since ${config.BLACKLIST_KILL_ENABLED.key} is set.")
 }
   }
 
+  private[scheduler] def killBlacklistedIdleExecutor(exec: String): Unit = {
+killExecutor(exec,
--- End diff --

We want to kill an idle executor which is completely blacklisted without
having to enable killing for all the blacklisted executors, so we made the
change; otherwise we would have kept it as is.


---




[GitHub] spark pull request #22288: [SPARK-22148][SPARK-15815][Scheduler] Acquire new...

2018-10-10 Thread dhruve
Github user dhruve commented on a diff in the pull request:

https://github.com/apache/spark/pull/22288#discussion_r224167756
  
--- Diff: 
core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala ---
@@ -415,9 +419,61 @@ private[spark] class TaskSchedulerImpl(
 launchedAnyTask |= launchedTaskAtCurrentMaxLocality
   } while (launchedTaskAtCurrentMaxLocality)
 }
+
 if (!launchedAnyTask) {
-  taskSet.abortIfCompletelyBlacklisted(hostToExecutors)
+  taskSet.getCompletelyBlacklistedTaskIfAny(hostToExecutors) match {
+case taskIndex: Some[Int] => // Returns the taskIndex which was unschedulable
+
+  // If the taskSet is unschedulable we try to find an existing idle blacklisted
+  // executor. If we cannot find one, we abort immediately. Else we kill the idle
+  // executor and kick off an abortTimer which if it doesn't schedule a task within the
+  // the timeout will abort the taskSet if we were unable to schedule any task from the
+  // taskSet.
+  // Note 1: We keep track of schedulability on a per taskSet basis rather than on a per
+  // task basis.
+  // Note 2: The taskSet can still be aborted when there are more than one idle
+  // blacklisted executors and dynamic allocation is on. This can happen when a killed
+  // idle executor isn't replaced in time by ExecutorAllocationManager as it relies on
+  // pending tasks and doesn't kill executors on idle timeouts, resulting in the abort
+  // timer to expire and abort the taskSet.
+  executorIdToRunningTaskIds.find(x => !isExecutorBusy(x._1)) match {
+case Some (x) =>
+  val executorId = x._1
+  if (!unschedulableTaskSetToExpiryTime.contains(taskSet)) {
+blacklistTrackerOpt.foreach(blt => blt.killBlacklistedIdleExecutor(executorId))
+
+unschedulableTaskSetToExpiryTime(taskSet) = clock.getTimeMillis()
+val timeout = conf.get(config.UNSCHEDULABLE_TASKSET_TIMEOUT) * 1000
+logInfo(s"Waiting for $timeout ms for completely "
+  + s"blacklisted task to be schedulable again before aborting $taskSet.")
+abortTimer.schedule(new TimerTask() {
+  override def run() {
+if (unschedulableTaskSetToExpiryTime.contains(taskSet) &&
+  (unschedulableTaskSetToExpiryTime(taskSet) + timeout)
+<= clock.getTimeMillis()
--- End diff --

It doesn't fit within the 100-character limit.


---




[GitHub] spark issue #22288: [SPARK-22148][SPARK-15815][Scheduler] Acquire new execut...

2018-10-09 Thread dhruve
Github user dhruve commented on the issue:

https://github.com/apache/spark/pull/22288
  
@tgravescs I have addressed the review comments. 


---




[GitHub] spark pull request #22612: [SPARK-24958] Add executors' process tree total m...

2018-10-09 Thread dhruve
Github user dhruve commented on a diff in the pull request:

https://github.com/apache/spark/pull/22612#discussion_r223741374
  
--- Diff: 
core/src/main/scala/org/apache/spark/executor/ProcfsBasedSystems.scala ---
@@ -0,0 +1,272 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.executor
+
+import java.io._
+import java.nio.charset.Charset
+import java.nio.file.{Files, Paths}
+import java.util.Locale
+
+import scala.collection.mutable
+import scala.collection.mutable.ArrayBuffer
+
+import org.apache.spark.{SparkEnv, SparkException}
+import org.apache.spark.internal.{config, Logging}
+import org.apache.spark.util.Utils
+
+private[spark] case class ProcfsBasedSystemsMetrics(
+jvmVmemTotal: Long,
+jvmRSSTotal: Long,
+pythonVmemTotal: Long,
+pythonRSSTotal: Long,
+otherVmemTotal: Long,
+otherRSSTotal: Long)
+
+// Some of the ideas here are taken from the ProcfsBasedProcessTree class in hadoop
+// project.
+private[spark] class ProcfsBasedSystems(val procfsDir: String = "/proc/") extends Logging {
+  val procfsStatFile = "stat"
+  val testing = sys.env.contains("SPARK_TESTING") || sys.props.contains("spark.testing")
+  var pageSize = computePageSize()
+  var isAvailable: Boolean = isProcfsAvailable
+  private val pid = computePid()
+  private val ptree = mutable.Map[ Int, Set[Int]]()
+
+  var allMetrics: ProcfsBasedSystemsMetrics = ProcfsBasedSystemsMetrics(0, 0, 0, 0, 0, 0)
+  private var latestJVMVmemTotal = 0L
+  private var latestJVMRSSTotal = 0L
+  private var latestPythonVmemTotal = 0L
+  private var latestPythonRSSTotal = 0L
+  private var latestOtherVmemTotal = 0L
+  private var latestOtherRSSTotal = 0L
+
+  computeProcessTree()
+
+  private def isProcfsAvailable: Boolean = {
+if (testing) {
+  return true
+}
+try {
+  if (!Files.exists(Paths.get(procfsDir))) {
+return false
+  }
+}
+catch {
+  case f: FileNotFoundException => return false
+}
+val shouldLogStageExecutorMetrics =
+  SparkEnv.get.conf.get(config.EVENT_LOG_STAGE_EXECUTOR_METRICS)
+val shouldLogStageExecutorProcessTreeMetrics =
+  SparkEnv.get.conf.get(config.EVENT_LOG_PROCESS_TREE_METRICS)
+shouldLogStageExecutorProcessTreeMetrics && 
shouldLogStageExecutorMetrics
+  }
+
+  private def computePid(): Int = {
+if (!isAvailable || testing) {
+  return -1;
+}
+try {
+  // This can be simplified in java9:
+  // 
https://docs.oracle.com/javase/9/docs/api/java/lang/ProcessHandle.html
+  val cmd = Array("bash", "-c", "echo $PPID")
+  val length = 10
+  val out2 = Utils.executeAndGetOutput(cmd)
+  val pid = Integer.parseInt(out2.split("\n")(0))
+  return pid;
+}
+catch {
+  case e: SparkException => logDebug("IO Exception when trying to 
compute process tree." +
+" As a result reporting of ProcessTree metrics is stopped", e)
+isAvailable = false
+return -1
+}
+  }
+
+  private def computePageSize(): Long = {
+if (testing) {
+  return 0;
+}
+val cmd = Array("getconf", "PAGESIZE")
+val out2 = Utils.executeAndGetOutput(cmd)
+return Integer.parseInt(out2.split("\n")(0))
+  }
+
+  private def computeProcessTree(): Unit = {
+if (!isAvailable || testing) {
+  return
+}
+val queue = mutable.Queue.empty[Int]
+queue += pid
+while( !queue.isEmpty ) {
+  val p = queue.dequeue()
+  val c = getChildPids(p)
+  if(!c.isEmpty) {
+queue ++= c
+ptree += (p -> c.toSet)
 

[GitHub] spark pull request #22612: [SPARK-24958] Add executors' process tree total m...

2018-10-04 Thread dhruve
Github user dhruve commented on a diff in the pull request:

https://github.com/apache/spark/pull/22612#discussion_r222783753
  
--- Diff: 
core/src/main/scala/org/apache/spark/executor/ProcfsBasedSystems.scala ---
@@ -0,0 +1,266 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.executor
+
+import java.io._
+import java.nio.charset.Charset
+import java.nio.file.{Files, Paths}
+import java.util.Locale
+
+import scala.collection.mutable
+import scala.collection.mutable.ArrayBuffer
+import scala.collection.mutable.Queue
+
+import org.apache.spark.SparkEnv
+import org.apache.spark.internal.{config, Logging}
+
+private[spark] case class ProcfsBasedSystemsMetrics(
+jvmVmemTotal: Long,
+jvmRSSTotal: Long,
+pythonVmemTotal: Long,
+pythonRSSTotal: Long,
+otherVmemTotal: Long,
+otherRSSTotal: Long)
+
+// Some of the ideas here are taken from the ProcfsBasedProcessTree class in hadoop
+// project.
+private[spark] class ProcfsBasedSystems(val procfsDir: String = "/proc/") extends Logging {
+  val procfsStatFile = "stat"
+  var pageSize = computePageSize()
+  var isAvailable: Boolean = isProcfsAvailable
+  private val pid = computePid()
+  private val ptree = mutable.Map[ Int, Set[Int]]()
+
+  var allMetrics: ProcfsBasedSystemsMetrics = ProcfsBasedSystemsMetrics(0, 0, 0, 0, 0, 0)
+  private var latestJVMVmemTotal = 0L
+  private var latestJVMRSSTotal = 0L
+  private var latestPythonVmemTotal = 0L
+  private var latestPythonRSSTotal = 0L
+  private var latestOtherVmemTotal = 0L
+  private var latestOtherRSSTotal = 0L
+
+  computeProcessTree()
+
+  private def isProcfsAvailable: Boolean = {
+val testing = sys.env.contains("SPARK_TESTING") || 
sys.props.contains("spark.testing")
+if (testing) {
+  return true
+}
+try {
+  if (!Files.exists(Paths.get(procfsDir))) {
+return false
+  }
+}
+catch {
+  case f: FileNotFoundException => return false
+}
+val shouldLogStageExecutorMetrics =
+  SparkEnv.get.conf.get(config.EVENT_LOG_STAGE_EXECUTOR_METRICS)
+val shouldLogStageExecutorProcessTreeMetrics =
+  SparkEnv.get.conf.get(config.EVENT_LOG_PROCESS_TREE_METRICS)
+shouldLogStageExecutorProcessTreeMetrics && 
shouldLogStageExecutorMetrics
+  }
+
+  private def computePid(): Int = {
+if (!isAvailable) {
+  return -1;
+}
+try {
+  // This can be simplified in java9:
+  // 
https://docs.oracle.com/javase/9/docs/api/java/lang/ProcessHandle.html
+  val cmd = Array("bash", "-c", "echo $PPID")
+  val length = 10
+  val out = Array.fill[Byte](length)(0)
+  Runtime.getRuntime.exec(cmd).getInputStream.read(out)
+  val pid = Integer.parseInt(new String(out, "UTF-8").trim)
+  return pid;
+}
+catch {
+  case e: IOException => logDebug("IO Exception when trying to compute 
process tree." +
+" As a result reporting of ProcessTree metrics is stopped", e)
+isAvailable = false
+return -1
+}
+  }
+
+  private def computePageSize(): Long = {
+val testing = sys.env.contains("SPARK_TESTING") || 
sys.props.contains("spark.testing")
+if (testing) {
+  return 0;
+}
+val cmd = Array("getconf", "PAGESIZE")
+val out = Array.fill[Byte](10)(0)
+Runtime.getRuntime.exec(cmd).getInputStream.read(out)
+return Integer.parseInt(new String(out, "UTF-8").trim)
+  }
+
+  private def computeProcessTree(): Unit = {
+if (!isAvailable) {
+  return
+}
+val queue = mutable.Queue.empty[Int]
+queue += pi

[GitHub] spark pull request #22612: [SPARK-24958] Add executors' process tree total m...

2018-10-04 Thread dhruve
Github user dhruve commented on a diff in the pull request:

https://github.com/apache/spark/pull/22612#discussion_r222739514
  
--- Diff: 
core/src/main/scala/org/apache/spark/executor/ProcfsBasedSystems.scala ---
@@ -0,0 +1,274 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.executor
+
+import java.io._
+import java.nio.charset.Charset
+import java.nio.file.{Files, Paths}
+
+import scala.collection.mutable
+import scala.collection.mutable.ArrayBuffer
+import scala.collection.mutable.Queue
+
+import org.apache.spark.SparkEnv
+import org.apache.spark.internal.{config, Logging}
+
+private[spark] case class ProcfsBasedSystemsMetrics(
+jvmVmemTotal: Long,
+jvmRSSTotal: Long,
+pythonVmemTotal: Long,
+pythonRSSTotal: Long,
+otherVmemTotal: Long,
+otherRSSTotal: Long)
+
+// Some of the ideas here are taken from the ProcfsBasedProcessTree class in hadoop
+// project.
+private[spark] class ProcfsBasedSystems(procfsDir: String = "/proc/") extends Logging {
+  val procfsStatFile = "stat"
+  var pageSize: Long = computePageSize()
+  var isAvailable: Boolean = isProcfsAvailable
+  private val pid: Int = computePid()
+  private val ptree: scala.collection.mutable.Map[ Int, Set[Int]] =
+scala.collection.mutable.Map[ Int, Set[Int]]()
+
+  var allMetrics: ProcfsBasedSystemsMetrics = ProcfsBasedSystemsMetrics(0, 0, 0, 0, 0, 0)
+  private var latestJVMVmemTotal: Long = 0
+  private var latestJVMRSSTotal: Long = 0
+  private var latestPythonVmemTotal: Long = 0
+  private var latestPythonRSSTotal: Long = 0
+  private var latestOtherVmemTotal: Long = 0
+  private var latestOtherRSSTotal: Long = 0
+
+  computeProcessTree()
+
+  private def isProcfsAvailable: Boolean = {
+val testing = sys.env.contains("SPARK_TESTING") || 
sys.props.contains("spark.testing")
+if (testing) {
+  return true
+}
+try {
+  if (!Files.exists(Paths.get(procfsDir))) {
+return false
+  }
+}
+catch {
+  case f: FileNotFoundException => return false
+}
+val shouldLogStageExecutorMetrics =
+  SparkEnv.get.conf.get(config.EVENT_LOG_STAGE_EXECUTOR_METRICS)
+val shouldLogStageExecutorProcessTreeMetrics =
+  SparkEnv.get.conf.get(config.EVENT_LOG_PROCESS_TREE_METRICS)
+shouldLogStageExecutorProcessTreeMetrics && 
shouldLogStageExecutorMetrics
+  }
+
+  private def computePid(): Int = {
+if (!isAvailable) {
+  return -1;
+}
+try {
+  // This can be simplified in java9:
+  // 
https://docs.oracle.com/javase/9/docs/api/java/lang/ProcessHandle.html
+  val cmd = Array("bash", "-c", "echo $PPID")
+  val length = 10
+  val out: Array[Byte] = Array.fill[Byte](length)(0)
+  Runtime.getRuntime.exec(cmd).getInputStream.read(out)
--- End diff --

Agree with @mccheah here. We should use the Utils class. In fact, I was
thinking it would be better to move the logic to get the child PIDs into the
Utils class itself.


---




[GitHub] spark pull request #22612: [SPARK-24958] Add executors' process tree total m...

2018-10-04 Thread dhruve
Github user dhruve commented on a diff in the pull request:

https://github.com/apache/spark/pull/22612#discussion_r222784195
  
--- Diff: 
core/src/main/scala/org/apache/spark/executor/ProcfsBasedSystems.scala ---
@@ -0,0 +1,266 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.executor
+
+import java.io._
+import java.nio.charset.Charset
+import java.nio.file.{Files, Paths}
+import java.util.Locale
+
+import scala.collection.mutable
+import scala.collection.mutable.ArrayBuffer
+import scala.collection.mutable.Queue
+
+import org.apache.spark.SparkEnv
+import org.apache.spark.internal.{config, Logging}
+
+private[spark] case class ProcfsBasedSystemsMetrics(
+jvmVmemTotal: Long,
+jvmRSSTotal: Long,
+pythonVmemTotal: Long,
+pythonRSSTotal: Long,
+otherVmemTotal: Long,
+otherRSSTotal: Long)
+
+// Some of the ideas here are taken from the ProcfsBasedProcessTree class in hadoop
+// project.
+private[spark] class ProcfsBasedSystems(val procfsDir: String = "/proc/") extends Logging {
+  val procfsStatFile = "stat"
+  var pageSize = computePageSize()
+  var isAvailable: Boolean = isProcfsAvailable
+  private val pid = computePid()
+  private val ptree = mutable.Map[ Int, Set[Int]]()
+
+  var allMetrics: ProcfsBasedSystemsMetrics = ProcfsBasedSystemsMetrics(0, 0, 0, 0, 0, 0)
+  private var latestJVMVmemTotal = 0L
+  private var latestJVMRSSTotal = 0L
+  private var latestPythonVmemTotal = 0L
+  private var latestPythonRSSTotal = 0L
+  private var latestOtherVmemTotal = 0L
+  private var latestOtherRSSTotal = 0L
+
+  computeProcessTree()
+
+  private def isProcfsAvailable: Boolean = {
+val testing = sys.env.contains("SPARK_TESTING") || 
sys.props.contains("spark.testing")
+if (testing) {
+  return true
+}
+try {
+  if (!Files.exists(Paths.get(procfsDir))) {
+return false
+  }
+}
+catch {
+  case f: FileNotFoundException => return false
+}
+val shouldLogStageExecutorMetrics =
+  SparkEnv.get.conf.get(config.EVENT_LOG_STAGE_EXECUTOR_METRICS)
+val shouldLogStageExecutorProcessTreeMetrics =
+  SparkEnv.get.conf.get(config.EVENT_LOG_PROCESS_TREE_METRICS)
+shouldLogStageExecutorProcessTreeMetrics && 
shouldLogStageExecutorMetrics
+  }
+
+  private def computePid(): Int = {
+if (!isAvailable) {
+  return -1;
+}
+try {
+  // This can be simplified in java9:
+  // 
https://docs.oracle.com/javase/9/docs/api/java/lang/ProcessHandle.html
+  val cmd = Array("bash", "-c", "echo $PPID")
+  val length = 10
+  val out = Array.fill[Byte](length)(0)
+  Runtime.getRuntime.exec(cmd).getInputStream.read(out)
+  val pid = Integer.parseInt(new String(out, "UTF-8").trim)
+  return pid;
+}
+catch {
+  case e: IOException => logDebug("IO Exception when trying to compute 
process tree." +
+" As a result reporting of ProcessTree metrics is stopped", e)
+isAvailable = false
+return -1
+}
+  }
+
+  private def computePageSize(): Long = {
+val testing = sys.env.contains("SPARK_TESTING") || 
sys.props.contains("spark.testing")
+if (testing) {
+  return 0;
+}
+val cmd = Array("getconf", "PAGESIZE")
+val out = Array.fill[Byte](10)(0)
+Runtime.getRuntime.exec(cmd).getInputStream.read(out)
+return Integer.parseInt(new String(out, "UTF-8").trim)
+  }
+
+  private def computeProcessTree(): Unit = {
+if (!isAvailable) {
+  return
+}
+val queue = mutable.Queue.empty[Int]
+queue += pi

[GitHub] spark pull request #22612: [SPARK-24958] Add executors' process tree total m...

2018-10-04 Thread dhruve
Github user dhruve commented on a diff in the pull request:

https://github.com/apache/spark/pull/22612#discussion_r222785629
  
--- Diff: 
core/src/main/scala/org/apache/spark/metrics/ExecutorMetricType.scala ---
@@ -59,6 +60,43 @@ case object JVMOffHeapMemory extends ExecutorMetricType {
   }
 }
 
+case object ProcessTreeJVMRSSMemory extends ExecutorMetricType {
+  override private[spark] def getMetricValue(memoryManager: MemoryManager): Long = {
+ExecutorMetricType.pTreeInfo.updateAllMetrics()
--- End diff --

We should make ProcessTreeMemory extend ExecutorMetricType so that the
individual metrics can be returned from it. The current code also assumes the
metrics are computed only in ProcessTreeJVMRSSMemory and that subsequent calls
reuse them; we shouldn't depend on that ordering here.
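A rough sketch of that suggestion (names are illustrative, not the actual Spark ExecutorMetricType API): compute the whole process-tree snapshot once per poll and have every exposed metric read from it, so correctness does not depend on which metric happens to be queried first.

```scala
// Hypothetical sketch, not the Spark ExecutorMetricType API.
case class ProcessTreeSnapshot(jvmVmem: Long, jvmRss: Long, pythonVmem: Long, pythonRss: Long)

class ProcessTreeMetrics(readSnapshot: () => ProcessTreeSnapshot) {
  private var snapshot = ProcessTreeSnapshot(0L, 0L, 0L, 0L)

  // Called once per metrics poll; every getter below then reads the same snapshot.
  def refresh(): Unit = { snapshot = readSnapshot() }

  def jvmVmem: Long = snapshot.jvmVmem
  def jvmRss: Long = snapshot.jvmRss
  def pythonVmem: Long = snapshot.pythonVmem
  def pythonRss: Long = snapshot.pythonRss
}
```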


---




[GitHub] spark pull request #22612: [SPARK-24958] Add executors' process tree total m...

2018-10-04 Thread dhruve
Github user dhruve commented on a diff in the pull request:

https://github.com/apache/spark/pull/22612#discussion_r222783588
  
--- Diff: 
core/src/main/scala/org/apache/spark/executor/ProcfsBasedSystems.scala ---
@@ -0,0 +1,266 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.executor
+
+import java.io._
+import java.nio.charset.Charset
+import java.nio.file.{Files, Paths}
+import java.util.Locale
+
+import scala.collection.mutable
+import scala.collection.mutable.ArrayBuffer
+import scala.collection.mutable.Queue
+
+import org.apache.spark.SparkEnv
+import org.apache.spark.internal.{config, Logging}
+
+private[spark] case class ProcfsBasedSystemsMetrics(
+jvmVmemTotal: Long,
+jvmRSSTotal: Long,
+pythonVmemTotal: Long,
+pythonRSSTotal: Long,
+otherVmemTotal: Long,
+otherRSSTotal: Long)
+
+// Some of the ideas here are taken from the ProcfsBasedProcessTree class in hadoop
+// project.
+private[spark] class ProcfsBasedSystems(val procfsDir: String = "/proc/") extends Logging {
+  val procfsStatFile = "stat"
+  var pageSize = computePageSize()
+  var isAvailable: Boolean = isProcfsAvailable
+  private val pid = computePid()
+  private val ptree = mutable.Map[ Int, Set[Int]]()
+
+  var allMetrics: ProcfsBasedSystemsMetrics = ProcfsBasedSystemsMetrics(0, 0, 0, 0, 0, 0)
+  private var latestJVMVmemTotal = 0L
+  private var latestJVMRSSTotal = 0L
+  private var latestPythonVmemTotal = 0L
+  private var latestPythonRSSTotal = 0L
+  private var latestOtherVmemTotal = 0L
+  private var latestOtherRSSTotal = 0L
+
+  computeProcessTree()
+
+  private def isProcfsAvailable: Boolean = {
+val testing = sys.env.contains("SPARK_TESTING") || 
sys.props.contains("spark.testing")
+if (testing) {
+  return true
+}
+try {
+  if (!Files.exists(Paths.get(procfsDir))) {
+return false
+  }
+}
+catch {
+  case f: FileNotFoundException => return false
+}
+val shouldLogStageExecutorMetrics =
+  SparkEnv.get.conf.get(config.EVENT_LOG_STAGE_EXECUTOR_METRICS)
+val shouldLogStageExecutorProcessTreeMetrics =
+  SparkEnv.get.conf.get(config.EVENT_LOG_PROCESS_TREE_METRICS)
+shouldLogStageExecutorProcessTreeMetrics && 
shouldLogStageExecutorMetrics
+  }
+
+  private def computePid(): Int = {
+if (!isAvailable) {
+  return -1;
+}
+try {
+  // This can be simplified in java9:
+  // 
https://docs.oracle.com/javase/9/docs/api/java/lang/ProcessHandle.html
+  val cmd = Array("bash", "-c", "echo $PPID")
+  val length = 10
+  val out = Array.fill[Byte](length)(0)
+  Runtime.getRuntime.exec(cmd).getInputStream.read(out)
+  val pid = Integer.parseInt(new String(out, "UTF-8").trim)
+  return pid;
+}
+catch {
+  case e: IOException => logDebug("IO Exception when trying to compute 
process tree." +
+" As a result reporting of ProcessTree metrics is stopped", e)
+isAvailable = false
+return -1
+}
+  }
+
+  private def computePageSize(): Long = {
+val testing = sys.env.contains("SPARK_TESTING") || 
sys.props.contains("spark.testing")
+if (testing) {
+  return 0;
+}
+val cmd = Array("getconf", "PAGESIZE")
+val out = Array.fill[Byte](10)(0)
+Runtime.getRuntime.exec(cmd).getInputStream.read(out)
+return Integer.parseInt(new String(out, "UTF-8").trim)
+  }
+
+  private def computeProcessTree(): Unit = {
+if (!isAvailable) {
+  return
+}
+val queue = mutable.Queue.empty[Int]
+queue += pi

[GitHub] spark pull request #22612: [SPARK-24958] Add executors' process tree total m...

2018-10-04 Thread dhruve
Github user dhruve commented on a diff in the pull request:

https://github.com/apache/spark/pull/22612#discussion_r222781707
  
--- Diff: 
core/src/main/scala/org/apache/spark/executor/ProcfsBasedSystems.scala ---
@@ -0,0 +1,266 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.executor
+
+import java.io._
+import java.nio.charset.Charset
+import java.nio.file.{Files, Paths}
+import java.util.Locale
+
+import scala.collection.mutable
+import scala.collection.mutable.ArrayBuffer
+import scala.collection.mutable.Queue
+
+import org.apache.spark.SparkEnv
+import org.apache.spark.internal.{config, Logging}
+
+private[spark] case class ProcfsBasedSystemsMetrics(
+jvmVmemTotal: Long,
+jvmRSSTotal: Long,
+pythonVmemTotal: Long,
+pythonRSSTotal: Long,
+otherVmemTotal: Long,
+otherRSSTotal: Long)
+
+// Some of the ideas here are taken from the ProcfsBasedProcessTree class 
in hadoop
+// project.
+private[spark] class ProcfsBasedSystems(val procfsDir: String = "/proc/") 
extends Logging {
+  val procfsStatFile = "stat"
+  var pageSize = computePageSize()
+  var isAvailable: Boolean = isProcfsAvailable
+  private val pid = computePid()
+  private val ptree = mutable.Map[ Int, Set[Int]]()
+
+  var allMetrics: ProcfsBasedSystemsMetrics = ProcfsBasedSystemsMetrics(0, 
0, 0, 0, 0, 0)
+  private var latestJVMVmemTotal = 0L
+  private var latestJVMRSSTotal = 0L
+  private var latestPythonVmemTotal = 0L
+  private var latestPythonRSSTotal = 0L
+  private var latestOtherVmemTotal = 0L
+  private var latestOtherRSSTotal = 0L
+
+  computeProcessTree()
+
+  private def isProcfsAvailable: Boolean = {
+val testing = sys.env.contains("SPARK_TESTING") || 
sys.props.contains("spark.testing")
+if (testing) {
+  return true
+}
+try {
+  if (!Files.exists(Paths.get(procfsDir))) {
+return false
+  }
+}
+catch {
+  case f: FileNotFoundException => return false
+}
+val shouldLogStageExecutorMetrics =
+  SparkEnv.get.conf.get(config.EVENT_LOG_STAGE_EXECUTOR_METRICS)
+val shouldLogStageExecutorProcessTreeMetrics =
+  SparkEnv.get.conf.get(config.EVENT_LOG_PROCESS_TREE_METRICS)
+shouldLogStageExecutorProcessTreeMetrics && 
shouldLogStageExecutorMetrics
+  }
+
+  private def computePid(): Int = {
+if (!isAvailable) {
+  return -1;
+}
+try {
+  // This can be simplified in java9:
+  // 
https://docs.oracle.com/javase/9/docs/api/java/lang/ProcessHandle.html
+  val cmd = Array("bash", "-c", "echo $PPID")
+  val length = 10
+  val out = Array.fill[Byte](length)(0)
+  Runtime.getRuntime.exec(cmd).getInputStream.read(out)
+  val pid = Integer.parseInt(new String(out, "UTF-8").trim)
+  return pid;
+}
+catch {
+  case e: IOException => logDebug("IO Exception when trying to compute 
process tree." +
+" As a result reporting of ProcessTree metrics is stopped", e)
+isAvailable = false
+return -1
+}
+  }
+
+  private def computePageSize(): Long = {
+val testing = sys.env.contains("SPARK_TESTING") || 
sys.props.contains("spark.testing")
+if (testing) {
+  return 0;
+}
+val cmd = Array("getconf", "PAGESIZE")
+val out = Array.fill[Byte](10)(0)
+Runtime.getRuntime.exec(cmd).getInputStream.read(out)
+return Integer.parseInt(new String(out, "UTF-8").trim)
+  }
+
+  private def computeProcessTree(): Unit = {
+if (!isAvailable) {
+  return
+}
+val queue = mutable.Queue.empty[Int]
+queue += pi

[GitHub] spark pull request #22612: [SPARK-24958] Add executors' process tree total m...

2018-10-04 Thread dhruve
Github user dhruve commented on a diff in the pull request:

https://github.com/apache/spark/pull/22612#discussion_r222739822
  
--- Diff: 
core/src/main/scala/org/apache/spark/executor/ProcfsBasedSystems.scala ---
@@ -0,0 +1,266 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.executor
+
+import java.io._
+import java.nio.charset.Charset
+import java.nio.file.{Files, Paths}
+import java.util.Locale
+
+import scala.collection.mutable
+import scala.collection.mutable.ArrayBuffer
+import scala.collection.mutable.Queue
+
+import org.apache.spark.SparkEnv
+import org.apache.spark.internal.{config, Logging}
+
+private[spark] case class ProcfsBasedSystemsMetrics(
+jvmVmemTotal: Long,
+jvmRSSTotal: Long,
+pythonVmemTotal: Long,
+pythonRSSTotal: Long,
+otherVmemTotal: Long,
+otherRSSTotal: Long)
+
+// Some of the ideas here are taken from the ProcfsBasedProcessTree class 
in hadoop
+// project.
+private[spark] class ProcfsBasedSystems(val procfsDir: String = "/proc/") 
extends Logging {
+  val procfsStatFile = "stat"
+  var pageSize = computePageSize()
+  var isAvailable: Boolean = isProcfsAvailable
+  private val pid = computePid()
+  private val ptree = mutable.Map[ Int, Set[Int]]()
+
+  var allMetrics: ProcfsBasedSystemsMetrics = ProcfsBasedSystemsMetrics(0, 
0, 0, 0, 0, 0)
+  private var latestJVMVmemTotal = 0L
+  private var latestJVMRSSTotal = 0L
+  private var latestPythonVmemTotal = 0L
+  private var latestPythonRSSTotal = 0L
+  private var latestOtherVmemTotal = 0L
+  private var latestOtherRSSTotal = 0L
+
+  computeProcessTree()
+
+  private def isProcfsAvailable: Boolean = {
+val testing = sys.env.contains("SPARK_TESTING") || 
sys.props.contains("spark.testing")
+if (testing) {
+  return true
+}
+try {
+  if (!Files.exists(Paths.get(procfsDir))) {
+return false
+  }
+}
+catch {
+  case f: FileNotFoundException => return false
+}
+val shouldLogStageExecutorMetrics =
+  SparkEnv.get.conf.get(config.EVENT_LOG_STAGE_EXECUTOR_METRICS)
+val shouldLogStageExecutorProcessTreeMetrics =
+  SparkEnv.get.conf.get(config.EVENT_LOG_PROCESS_TREE_METRICS)
+shouldLogStageExecutorProcessTreeMetrics && 
shouldLogStageExecutorMetrics
+  }
+
+  private def computePid(): Int = {
+if (!isAvailable) {
+  return -1;
+}
+try {
+  // This can be simplified in java9:
+  // 
https://docs.oracle.com/javase/9/docs/api/java/lang/ProcessHandle.html
+  val cmd = Array("bash", "-c", "echo $PPID")
+  val length = 10
+  val out = Array.fill[Byte](length)(0)
+  Runtime.getRuntime.exec(cmd).getInputStream.read(out)
+  val pid = Integer.parseInt(new String(out, "UTF-8").trim)
+  return pid;
+}
+catch {
+  case e: IOException => logDebug("IO Exception when trying to compute 
process tree." +
+" As a result reporting of ProcessTree metrics is stopped", e)
+isAvailable = false
+return -1
+}
+  }
+
+  private def computePageSize(): Long = {
+val testing = sys.env.contains("SPARK_TESTING") || 
sys.props.contains("spark.testing")
+if (testing) {
+  return 0;
+}
+val cmd = Array("getconf", "PAGESIZE")
+val out = Array.fill[Byte](10)(0)
+Runtime.getRuntime.exec(cmd).getInputStream.read(out)
+return Integer.parseInt(new String(out, "UTF-8").trim)
+  }
+
+  private def computeProcessTree(): Unit = {
+if (!isAvailable) {
+  return
+}
+val queue = mutable.Queue.empty[Int]
+queue += pi

[GitHub] spark pull request #22612: [SPARK-24958] Add executors' process tree total m...

2018-10-04 Thread dhruve
Github user dhruve commented on a diff in the pull request:

https://github.com/apache/spark/pull/22612#discussion_r222779045
  
--- Diff: 
core/src/main/scala/org/apache/spark/executor/ProcfsBasedSystems.scala ---
@@ -0,0 +1,266 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.executor
+
+import java.io._
+import java.nio.charset.Charset
+import java.nio.file.{Files, Paths}
+import java.util.Locale
+
+import scala.collection.mutable
+import scala.collection.mutable.ArrayBuffer
+import scala.collection.mutable.Queue
+
+import org.apache.spark.SparkEnv
+import org.apache.spark.internal.{config, Logging}
+
+private[spark] case class ProcfsBasedSystemsMetrics(
+jvmVmemTotal: Long,
+jvmRSSTotal: Long,
+pythonVmemTotal: Long,
+pythonRSSTotal: Long,
+otherVmemTotal: Long,
+otherRSSTotal: Long)
+
+// Some of the ideas here are taken from the ProcfsBasedProcessTree class 
in hadoop
+// project.
+private[spark] class ProcfsBasedSystems(val procfsDir: String = "/proc/") 
extends Logging {
+  val procfsStatFile = "stat"
+  var pageSize = computePageSize()
+  var isAvailable: Boolean = isProcfsAvailable
+  private val pid = computePid()
+  private val ptree = mutable.Map[ Int, Set[Int]]()
+
+  var allMetrics: ProcfsBasedSystemsMetrics = ProcfsBasedSystemsMetrics(0, 
0, 0, 0, 0, 0)
+  private var latestJVMVmemTotal = 0L
--- End diff --

These don't seem to be needed as long as we have ProcfsBasedSystemsMetrics. 
As others have pointed out, it makes more sense to have a Map of named 
metrics (see the sketch below).
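
For illustration, a minimal sketch of that shape, using the metric names from the diff above (hypothetical, not the code that was eventually merged):

```scala
import scala.collection.mutable

// One mutable map of named metrics instead of a dedicated var per metric.
val allMetrics = mutable.Map[String, Long](
  "jvmVmemTotal"    -> 0L,
  "jvmRSSTotal"     -> 0L,
  "pythonVmemTotal" -> 0L,
  "pythonRSSTotal"  -> 0L,
  "otherVmemTotal"  -> 0L,
  "otherRSSTotal"   -> 0L
)

// Updating a metric then becomes a keyed write rather than touching a dedicated field.
allMetrics("jvmRSSTotal") += 4096L
```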


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #22288: [SPARK-22148][SPARK-15815][Scheduler] Acquire new execut...

2018-09-28 Thread dhruve
Github user dhruve commented on the issue:

https://github.com/apache/spark/pull/22288
  
The failures seem to be unrelated. I wasn't able to reproduce them.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #22288: [SPARK-22148][SPARK-15815][Scheduler] Acquire new execut...

2018-09-28 Thread dhruve
Github user dhruve commented on the issue:

https://github.com/apache/spark/pull/22288
  
retest this please


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #22288: [SPARK-22148][SPARK-15815][Scheduler] Acquire new...

2018-09-11 Thread dhruve
Github user dhruve commented on a diff in the pull request:

https://github.com/apache/spark/pull/22288#discussion_r216795021
  
--- Diff: 
core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala ---
@@ -414,9 +425,48 @@ private[spark] class TaskSchedulerImpl(
 launchedAnyTask |= launchedTaskAtCurrentMaxLocality
   } while (launchedTaskAtCurrentMaxLocality)
 }
+
 if (!launchedAnyTask) {
-  taskSet.abortIfCompletelyBlacklisted(hostToExecutors)
-}
+  taskSet.getCompletelyBlacklistedTaskIfAny(hostToExecutors) match 
{
+case taskIndex: Some[Int] => // Returns the taskIndex which 
was unschedulable
+
+  // If the taskSet is unschedulable we kill an existing 
blacklisted executor/s and
+  // kick off an abortTimer which after waiting will abort the 
taskSet if we were
+  // unable to schedule any task from the taskSet.
+  // Note: We keep a track of schedulability on a per taskSet 
basis rather than on a
+  // per task basis.
+  val executor = 
hostToExecutors.valuesIterator.next().iterator.next()
--- End diff --

That's a nice suggestion. 

There was a case where you could have only a few executors running, say just 3, 
all of them blacklisted but still running some tasks. To handle this, I had 
started modifying this to take down the executor with the least no. of tasks 
running on it (a sketch of that idea is below). I'll check some more on this.
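
A minimal sketch of that idea, assuming only a map from executor id to the ids of its currently running tasks (the input shape and names here are assumptions, not the PR's final code):

```scala
object LeastBusyExecutor {
  // Pick the executor with the fewest running tasks so that killing it
  // loses the least amount of in-flight work.
  def pick(executorIdToRunningTaskIds: Map[String, Set[Long]]): Option[String] = {
    if (executorIdToRunningTaskIds.isEmpty) {
      None
    } else {
      Some(executorIdToRunningTaskIds.minBy { case (_, tasks) => tasks.size }._1)
    }
  }

  def main(args: Array[String]): Unit = {
    val running = Map("exec1" -> Set(1L, 2L), "exec2" -> Set(3L))
    println(pick(running)) // prints Some(exec2)
  }
}
```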


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #22288: [SPARK-22148][Scheduler] Acquire new executors to...

2018-09-11 Thread dhruve
Github user dhruve commented on a diff in the pull request:

https://github.com/apache/spark/pull/22288#discussion_r216788096
  
--- Diff: 
core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala ---
@@ -414,9 +425,48 @@ private[spark] class TaskSchedulerImpl(
 launchedAnyTask |= launchedTaskAtCurrentMaxLocality
   } while (launchedTaskAtCurrentMaxLocality)
 }
+
 if (!launchedAnyTask) {
-  taskSet.abortIfCompletelyBlacklisted(hostToExecutors)
-}
+  taskSet.getCompletelyBlacklistedTaskIfAny(hostToExecutors) match 
{
+case taskIndex: Some[Int] => // Returns the taskIndex which 
was unschedulable
+
+  // If the taskSet is unschedulable we kill an existing 
blacklisted executor/s and
+  // kick off an abortTimer which after waiting will abort the 
taskSet if we were
+  // unable to schedule any task from the taskSet.
+  // Note: We keep a track of schedulability on a per taskSet 
basis rather than on a
+  // per task basis.
+  val executor = 
hostToExecutors.valuesIterator.next().iterator.next()
+  logDebug("Killing executor because of task unschedulability: 
" + executor)
--- End diff --

noted.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #22288: [SPARK-22148][Scheduler] Acquire new executors to...

2018-09-11 Thread dhruve
Github user dhruve commented on a diff in the pull request:

https://github.com/apache/spark/pull/22288#discussion_r216788079
  
--- Diff: 
core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala ---
@@ -414,9 +425,48 @@ private[spark] class TaskSchedulerImpl(
 launchedAnyTask |= launchedTaskAtCurrentMaxLocality
   } while (launchedTaskAtCurrentMaxLocality)
 }
+
 if (!launchedAnyTask) {
-  taskSet.abortIfCompletelyBlacklisted(hostToExecutors)
-}
+  taskSet.getCompletelyBlacklistedTaskIfAny(hostToExecutors) match 
{
+case taskIndex: Some[Int] => // Returns the taskIndex which 
was unschedulable
+
+  // If the taskSet is unschedulable we kill an existing 
blacklisted executor/s and
+  // kick off an abortTimer which after waiting will abort the 
taskSet if we were
+  // unable to schedule any task from the taskSet.
+  // Note: We keep a track of schedulability on a per taskSet 
basis rather than on a
+  // per task basis.
+  val executor = 
hostToExecutors.valuesIterator.next().iterator.next()
+  logDebug("Killing executor because of task unschedulability: 
" + executor)
+  blacklistTrackerOpt.foreach(blt => 
blt.killBlacklistedExecutor(executor))
+
+  if (!unschedulableTaskSetToExpiryTime.contains(taskSet)) {
+  unschedulableTaskSetToExpiryTime(taskSet) = 
clock.getTimeMillis()
--- End diff --

noted.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #22288: [SPARK-22148][Scheduler] Acquire new executors to...

2018-09-11 Thread dhruve
Github user dhruve commented on a diff in the pull request:

https://github.com/apache/spark/pull/22288#discussion_r216788016
  
--- Diff: 
core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala ---
@@ -623,8 +623,9 @@ private[spark] class TaskSetManager(
*
* It is possible that this taskset has become impossible to schedule 
*anywhere* due to the
* blacklist.  The most common scenario would be if there are fewer 
executors than
-   * spark.task.maxFailures. We need to detect this so we can fail the 
task set, otherwise the job
-   * will hang.
+   * spark.task.maxFailures. We need to detect this so we can avoid the 
job from being hung.
+   * If dynamic allocation is enabled we try to acquire new executor/s by 
killing the existing one.
+   * In case of static allocation we abort the taskSet immediately to fail 
the job.
--- End diff --

Yes. The change of removing a single executor takes care of static 
allocation as well. I will update the comments. 


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #22288: [SPARK-22148][Scheduler] Acquire new executors to...

2018-09-04 Thread dhruve
Github user dhruve commented on a diff in the pull request:

https://github.com/apache/spark/pull/22288#discussion_r215036162
  
--- Diff: 
core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala ---
@@ -414,9 +425,54 @@ private[spark] class TaskSchedulerImpl(
 launchedAnyTask |= launchedTaskAtCurrentMaxLocality
   } while (launchedTaskAtCurrentMaxLocality)
 }
+
 if (!launchedAnyTask) {
-  taskSet.abortIfCompletelyBlacklisted(hostToExecutors)
-}
+  taskSet.getCompletelyBlacklistedTaskIfAny(hostToExecutors) match 
{
+case taskIndex: Some[Int] => // Returns the taskIndex which 
was unschedulable
+  if (conf.getBoolean("spark.dynamicAllocation.enabled", 
false)) {
+// If the taskSet is unschedulable we kill the existing 
blacklisted executor/s and
+// kick off an abortTimer which after waiting will abort 
the taskSet if we were
+// unable to get new executors and couldn't schedule a 
task from the taskSet.
+// Note: We keep a track of schedulability on a per 
taskSet basis rather than on a
+// per task basis.
+if (!unschedulableTaskSetToExpiryTime.contains(taskSet)) {
+  hostToExecutors.valuesIterator.foreach(executors => 
executors.foreach({
+executor =>
+  logDebug("Killing executor because of task 
unschedulability: " + executor)
+  blacklistTrackerOpt.foreach(blt => 
blt.killBlacklistedExecutor(executor))
--- End diff --

- To refresh executors, you need to enable 
`spark.blacklist.killBlacklistedExecutors` (see the config sketch after this list).
- I was thinking about it; killing all the executors is a little too harsh. 
Killing only a single executor would help mitigate this, although it would 
also fail the tasks currently running on that executor.
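
For reference, a minimal configuration sketch of the settings this path depends on (the values are illustrative, not a recommendation):

```scala
import org.apache.spark.SparkConf

// The executor-refresh path discussed above only applies when blacklisting,
// killing of blacklisted executors, and dynamic allocation are all enabled.
val conf = new SparkConf()
  .set("spark.blacklist.enabled", "true")
  .set("spark.blacklist.killBlacklistedExecutors", "true")
  .set("spark.dynamicAllocation.enabled", "true")
```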


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #22288: [SPARK-22148][Scheduler] Acquire new executors to avoid ...

2018-08-31 Thread dhruve
Github user dhruve commented on the issue:

https://github.com/apache/spark/pull/22288
  
@squito @tgravescs  Can you review this PR? Thanks.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #22288: [SPARK-22148] Acquire new executors to avoid hang...

2018-08-30 Thread dhruve
GitHub user dhruve opened a pull request:

https://github.com/apache/spark/pull/22288

[SPARK-22148] Acquire new executors to avoid hang because of blacklisting

## What changes were proposed in this pull request?
Every time a task is unschedulable because of the condition where no. of 
task failures < no. of executors available, we currently abort the taskSet, 
failing the job. This change tries to acquire new executors if 
dynamicAllocation is turned on, so that the job can complete successfully.

## How was this patch tested?

I performed some manual tests to check and validate the behavior. 

```scala
val rdd = sc.parallelize(Seq(1 to 10), 3)

import org.apache.spark.TaskContext

val mapped = rdd.mapPartitionsWithIndex { (index, iterator) =>
  if (index == 2) {
    Thread.sleep(30 * 1000)
    val attemptNum = TaskContext.get.attemptNumber
    if (attemptNum < 3) {
      throw new Exception("Fail for blacklisting")
    }
  }
  iterator.toList.map(x => x + " -> " + index).iterator
}

mapped.collect
```

Note: I am putting up this PR as an initial draft to review the approach.

Todo List:
- Add unit tests
- Agree upon the conf name & value and update the docs 

We can build on this approach further by:
- Taking into account static allocation
- Querying the RM to figure out if it's a small cluster, and then either waiting 
some more or aborting immediately.
- Trying to distinguish between the time spent waiting to acquire an executor 
and the time spent being unable to schedule a task.

Open to suggestions.


You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/dhruve/spark bug/SPARK-22148

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/spark/pull/22288.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #22288


commit 5253b3134119b2a28cdaa1406d7bafb55f62cbc1
Author: Dhruve Ashar 
Date:   2018-08-30T18:08:58Z

[SPARK-22148] Acquire new executors to avoid hang because of blacklisting




---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #22121: [SPARK-25133][SQL][Doc]Avro data source guide

2018-08-22 Thread dhruve
Github user dhruve commented on a diff in the pull request:

https://github.com/apache/spark/pull/22121#discussion_r212075677
  
--- Diff: docs/avro-data-source-guide.md ---
@@ -0,0 +1,377 @@
+---
+layout: global
+title: Apache Avro Data Source Guide
+---
+
+* This will become a table of contents (this text will be scraped).
+{:toc}
+
+Since Spark 2.4 release, [Spark 
SQL](https://spark.apache.org/docs/latest/sql-programming-guide.html) provides 
built-in support for reading and writing Apache Avro data.
+
+## Deploying
+The `spark-avro` module is external and not included in `spark-submit` or 
`spark-shell` by default.
+
+As with any Spark applications, `spark-submit` is used to launch your 
application. `spark-avro_{{site.SCALA_BINARY_VERSION}}`
+and its dependencies can be directly added to `spark-submit` using 
`--packages`, such as,
+
+./bin/spark-submit --packages 
org.apache.spark:spark-avro_{{site.SCALA_BINARY_VERSION}}:{{site.SPARK_VERSION_SHORT}}
 ...
+
+For experimenting on `spark-shell`, you can also use `--packages` to add 
`org.apache.spark:spark-avro_{{site.SCALA_BINARY_VERSION}}` and its 
dependencies directly,
+
+./bin/spark-shell --packages 
org.apache.spark:spark-avro_{{site.SCALA_BINARY_VERSION}}:{{site.SPARK_VERSION_SHORT}}
 ...
+
+See [Application Submission Guide](submitting-applications.html) for more 
details about submitting applications with external dependencies.
+
+## Load and Save Functions
+
+Since `spark-avro` module is external, there is no `.avro` API in 
+`DataFrameReader` or `DataFrameWriter`.
+
+To load/save data in Avro format, you need to specify the data source 
option `format` as `avro`(or `org.apache.spark.sql.avro`).
+
+
+{% highlight scala %}
+
+val usersDF = 
spark.read.format("avro").load("examples/src/main/resources/users.avro")
+usersDF.select("name", 
"favorite_color").write.format("avro").save("namesAndFavColors.avro")
+
+{% endhighlight %}
+
+
+{% highlight java %}
+
+Dataset usersDF = 
spark.read().format("avro").load("examples/src/main/resources/users.avro");
+usersDF.select("name", 
"favorite_color").write().format("avro").save("namesAndFavColors.avro");
+
+{% endhighlight %}
+
+
+{% highlight python %}
+
+df = 
spark.read.format("avro").load("examples/src/main/resources/users.avro")
+df.select("name", 
"favorite_color").write.format("avro").save("namesAndFavColors.avro")
+
+{% endhighlight %}
+
+
+{% highlight r %}
+
+df <- read.df("examples/src/main/resources/users.avro", "avro")
+write.df(select(df, "name", "favorite_color"), "namesAndFavColors.avro", 
"avro")
+
+{% endhighlight %}
+
+
+
+## to_avro() and from_avro()
+Spark SQL provides function `to_avro` to encode a struct as a string and 
`from_avro()` to retrieve the struct as a complex type.
+
+Using Avro record as columns are useful when reading from or writing to a 
streaming source like Kafka. Each 
+Kafka key-value record will be augmented with some metadata, such as the 
ingestion timestamp into Kafka, the offset in Kafka, etc.
+* If the "value" field that contains your data is in Avro, you could use 
`from_avro()` to extract your data, enrich it, clean it, and then push it 
downstream to Kafka again or write it out to a file.
+* `to_avro()` can be used to turn structs into Avro records. This method 
is particularly useful when you would like to re-encode multiple columns into a 
single one when writing data out to Kafka.
+
+Both methods are presently only available in Scala and Java.
+
+
+
+{% highlight scala %}
+import org.apache.spark.sql.avro._
+
+// `from_avro` requires Avro schema in JSON string format.
+val jsonFormatSchema = new 
String(Files.readAllBytes(Paths.get("./examples/src/main/resources/user.avsc")))
+
+val df = spark
+  .readStream
+  .format("kafka")
+  .option("kafka.bootstrap.servers", "host1:port1,host2:port2")
+  .option("subscribe", "topic1")
+  .load()
+
+// 1. Decode the Avro data into a struct;
+// 2. Filter by column `favorite_color`;
+// 3. Encode the column `name` in Avro format.
+val output = df
+  .select(from_avro('value, jsonFormatSchema) as 'user)
+  .where("user.favorite_color == \"red\"")
+  .select(to_avro($"user.name") as 'value)
+
+val ds = output
+  .writeStream
+  .format("kafka")
+

[GitHub] spark pull request #22121: [SPARK-25133][SQL][Doc]Avro data source guide

2018-08-22 Thread dhruve
Github user dhruve commented on a diff in the pull request:

https://github.com/apache/spark/pull/22121#discussion_r212032223
  
--- Diff: docs/avro-data-source-guide.md ---
@@ -0,0 +1,377 @@
+---
+layout: global
+title: Apache Avro Data Source Guide
+---
+
+* This will become a table of contents (this text will be scraped).
+{:toc}
+
+Since Spark 2.4 release, [Spark 
SQL](https://spark.apache.org/docs/latest/sql-programming-guide.html) provides 
built-in support for reading and writing Apache Avro data.
+
+## Deploying
+The `spark-avro` module is external and not included in `spark-submit` or 
`spark-shell` by default.
+
+As with any Spark applications, `spark-submit` is used to launch your 
application. `spark-avro_{{site.SCALA_BINARY_VERSION}}`
+and its dependencies can be directly added to `spark-submit` using 
`--packages`, such as,
+
+./bin/spark-submit --packages 
org.apache.spark:spark-avro_{{site.SCALA_BINARY_VERSION}}:{{site.SPARK_VERSION_SHORT}}
 ...
+
+For experimenting on `spark-shell`, you can also use `--packages` to add 
`org.apache.spark:spark-avro_{{site.SCALA_BINARY_VERSION}}` and its 
dependencies directly,
+
+./bin/spark-shell --packages 
org.apache.spark:spark-avro_{{site.SCALA_BINARY_VERSION}}:{{site.SPARK_VERSION_SHORT}}
 ...
+
+See [Application Submission Guide](submitting-applications.html) for more 
details about submitting applications with external dependencies.
+
+## Load and Save Functions
+
+Since `spark-avro` module is external, there is no `.avro` API in 
+`DataFrameReader` or `DataFrameWriter`.
+
+To load/save data in Avro format, you need to specify the data source 
option `format` as `avro`(or `org.apache.spark.sql.avro`).
+
+
+{% highlight scala %}
+
+val usersDF = 
spark.read.format("avro").load("examples/src/main/resources/users.avro")
+usersDF.select("name", 
"favorite_color").write.format("avro").save("namesAndFavColors.avro")
+
+{% endhighlight %}
+
+
+{% highlight java %}
+
+Dataset usersDF = 
spark.read().format("avro").load("examples/src/main/resources/users.avro");
+usersDF.select("name", 
"favorite_color").write().format("avro").save("namesAndFavColors.avro");
+
+{% endhighlight %}
+
+
+{% highlight python %}
+
+df = 
spark.read.format("avro").load("examples/src/main/resources/users.avro")
+df.select("name", 
"favorite_color").write.format("avro").save("namesAndFavColors.avro")
+
+{% endhighlight %}
+
+
+{% highlight r %}
+
+df <- read.df("examples/src/main/resources/users.avro", "avro")
+write.df(select(df, "name", "favorite_color"), "namesAndFavColors.avro", 
"avro")
+
+{% endhighlight %}
+
+
+
+## to_avro() and from_avro()
+Spark SQL provides function `to_avro` to encode a struct as a string and 
`from_avro()` to retrieve the struct as a complex type.
+
+Using Avro record as columns are useful when reading from or writing to a 
streaming source like Kafka. Each 
+Kafka key-value record will be augmented with some metadata, such as the 
ingestion timestamp into Kafka, the offset in Kafka, etc.
+* If the "value" field that contains your data is in Avro, you could use 
`from_avro()` to extract your data, enrich it, clean it, and then push it 
downstream to Kafka again or write it out to a file.
+* `to_avro()` can be used to turn structs into Avro records. This method 
is particularly useful when you would like to re-encode multiple columns into a 
single one when writing data out to Kafka.
+
+Both methods are presently only available in Scala and Java.
+
+
+
+{% highlight scala %}
+import org.apache.spark.sql.avro._
+
+// `from_avro` requires Avro schema in JSON string format.
+val jsonFormatSchema = new 
String(Files.readAllBytes(Paths.get("./examples/src/main/resources/user.avsc")))
+
+val df = spark
+  .readStream
+  .format("kafka")
+  .option("kafka.bootstrap.servers", "host1:port1,host2:port2")
+  .option("subscribe", "topic1")
+  .load()
+
+// 1. Decode the Avro data into a struct;
+// 2. Filter by column `favorite_color`;
+// 3. Encode the column `name` in Avro format.
+val output = df
+  .select(from_avro('value, jsonFormatSchema) as 'user)
+  .where("user.favorite_color == \"red\"")
+  .select(to_avro($"user.name") as 'value)
+
+val ds = output
+  .writeStream
+  .format("kafka")
+

[GitHub] spark issue #22015: [SPARK-20286][SPARK-24786][Core][DynamicAllocation] Rele...

2018-08-07 Thread dhruve
Github user dhruve commented on the issue:

https://github.com/apache/spark/pull/22015
  
This modifies the SparkListenerUnpersistRDD case class, which fails the 
MiMa tests. If this is something heavily used by developers, I can add either 
another ListenerBus message or modify the PR to make a direct RPC call to the 
ExecutorAllocationManager. 


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #22015: [SPARK-20286] Release executors on unpersisting R...

2018-08-06 Thread dhruve
GitHub user dhruve opened a pull request:

https://github.com/apache/spark/pull/22015

[SPARK-20286] Release executors on unpersisting RDD

## What changes were proposed in this pull request?
Currently, the executors acquired using dynamic allocation are not released 
when a cached RDD is unpersisted. This wastes grid resources unnecessarily. 
With this change, once the cached RDD is unpersisted, we check whether each 
executor has any running tasks. If not, we do the following (a sketch of this 
decision follows the list):
1 - If the executor has cached RDD blocks from other RDDs, we don't make 
any change.
2 - If the executor has no more cached RDD blocks and no running tasks, we 
update the removal time based on the conf 
`spark.dynamicAllocation.cachedExecutorIdleTimeout` so the idle executor can be 
released.
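
A minimal sketch of that decision (the types and names here are hypothetical, not the actual ExecutorAllocationManager code):

```scala
case class ExecutorState(runningTasks: Int, cachedRddBlocks: Int)

// For each executor with no running tasks and no remaining cached blocks,
// compute the time at which it becomes eligible for removal.
def removalTimesOnUnpersist(
    executors: Map[String, ExecutorState],
    cachedIdleTimeoutMs: Long,
    nowMs: Long): Map[String, Long] = {
  executors.collect {
    // Case 2 above: idle and nothing cached any more.
    case (id, ExecutorState(0, 0)) => id -> (nowMs + cachedIdleTimeoutMs)
    // Case 1 (blocks from other RDDs still cached) intentionally yields nothing.
  }
}
```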

## How was this patch tested?
Manually using a code snippet. 
```scala
val rdd = sc.textFile("smallFile")
rdd.cache

val rdd2 = sc.textFile("largeFile")
rdd2.cache

rdd2.count // Cached data on around 500+ executors
Thread.sleep(30 * 1000) // sleep for 30s
rdd.count // Cached data on around 20 executors

// Verify only 20 executors remain; the rest will time out based on the idleTimeout, which I set to 60s
rdd2.unpersist

// Eventually all executors will be released as there are no tasks running on any executor.
```


You can merge this pull request into a Git repository by running:

$ git pull https://github.com/dhruve/spark bug/SPARK-20286

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/spark/pull/22015.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #22015


commit 7e229dc120d0f1542cc8d7dbac1027baac36665e
Author: Dhruve Ashar 
Date:   2018-08-06T20:32:47Z

[SPARK-20286] Release executors on unpersisting RDD




---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #19194: [SPARK-20589] Allow limiting task concurrency per stage

2018-07-20 Thread dhruve
Github user dhruve commented on the issue:

https://github.com/apache/spark/pull/19194
  
@squito I have made the changes as requested. Can you have a look at this 
again? Thanks.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #21601: [SPARK-24610] fix reading small files via wholeTextFiles

2018-07-02 Thread dhruve
Github user dhruve commented on the issue:

https://github.com/apache/spark/pull/21601
  
@attilapiros I will modify the test to add a check/assert which makes it 
easier to follow and validates what we are trying to achieve in the test. For the 
rest of the cases, since these are Hadoop-related configs and not directly 
related to Spark, I didn't add additional test cases, as they relate more 
to `CombineFileInputFormat` than to `WholeTextFileInputFormat`. 


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #21601: [SPARK-24610] fix reading small files via wholeTe...

2018-07-02 Thread dhruve
Github user dhruve commented on a diff in the pull request:

https://github.com/apache/spark/pull/21601#discussion_r199597945
  
--- Diff: 
core/src/main/scala/org/apache/spark/input/WholeTextFileInputFormat.scala ---
@@ -53,6 +53,19 @@ private[spark] class WholeTextFileInputFormat
 val totalLen = files.map(file => if (file.isDirectory) 0L else 
file.getLen).sum
 val maxSplitSize = Math.ceil(totalLen * 1.0 /
   (if (minPartitions == 0) 1 else minPartitions)).toLong
+
+// For small files we need to ensure the min split size per node & 
rack <= maxSplitSize
+val config = context.getConfiguration
+val minSplitSizePerNode = 
config.getLong(CombineFileInputFormat.SPLIT_MINSIZE_PERNODE, 0L)
+val minSplitSizePerRack = 
config.getLong(CombineFileInputFormat.SPLIT_MINSIZE_PERRACK, 0L)
+
+if (maxSplitSize < minSplitSizePerNode) {
+  super.setMinSplitSizeNode(maxSplitSize)
--- End diff --

AFAIU, if we set these to `0L` unconditionally, then any leftover data which 
wasn't combined into a split would end up in its own split, because 
minSplitSizePerNode is `0L`. 
This shouldn't be an issue for a small no. of files. But if we have a large 
no. of small files which result in a similar situation, we will end up with 
more splits rather than combining the leftovers together to form a smaller no. of splits.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #21601: [SPARK-24610] fix reading small files via wholeTe...

2018-07-02 Thread dhruve
Github user dhruve commented on a diff in the pull request:

https://github.com/apache/spark/pull/21601#discussion_r199602993
  
--- Diff: 
core/src/main/scala/org/apache/spark/input/WholeTextFileInputFormat.scala ---
@@ -53,6 +53,19 @@ private[spark] class WholeTextFileInputFormat
 val totalLen = files.map(file => if (file.isDirectory) 0L else 
file.getLen).sum
 val maxSplitSize = Math.ceil(totalLen * 1.0 /
   (if (minPartitions == 0) 1 else minPartitions)).toLong
+
+// For small files we need to ensure the min split size per node & 
rack <= maxSplitSize
+val config = context.getConfiguration
+val minSplitSizePerNode = 
config.getLong(CombineFileInputFormat.SPLIT_MINSIZE_PERNODE, 0L)
+val minSplitSizePerRack = 
config.getLong(CombineFileInputFormat.SPLIT_MINSIZE_PERRACK, 0L)
+
+if (maxSplitSize < minSplitSizePerNode) {
+  super.setMinSplitSizeNode(maxSplitSize)
--- End diff --

Also, if a user specifies them via configs, we are ensuring that they don't 
break the code. If we set them to `0L` where a user specifies them, we would 
end up breaking the code anyway: `CombineFileInputFormat` checks whether the 
setting is `0L`, and if it is 0 it picks up the value from the config instead.  
https://github.com/apache/hadoop/blob/release-2.8.2-RC0/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/lib/input/CombineFileInputFormat.java#L182
So we would have to at least set the config to avoid hitting the error. 


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #21601: [SPARK-24610] fix reading small files via wholeTextFiles

2018-06-21 Thread dhruve
Github user dhruve commented on the issue:

https://github.com/apache/spark/pull/21601
  
@vanzin Can you review this PR?


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #21601: [SPARK-24610] fix reading small files via wholeTe...

2018-06-20 Thread dhruve
GitHub user dhruve opened a pull request:

https://github.com/apache/spark/pull/21601

[SPARK-24610] fix reading small files via wholeTextFiles

## What changes were proposed in this pull request?
The `WholeTextFileInputFormat` determines the `maxSplitSize` for the file(s) 
being read via the `wholeTextFiles` method. While this works well for large 
files, for smaller files where the computed maxSplitSize is smaller than the 
minimum split sizes coming from configs like hive-site.xml, or passed explicitly as 
`mapreduce.input.fileinputformat.split.minsize.per.node` or 
`mapreduce.input.fileinputformat.split.minsize.per.rack`, it simply throws an 
exception.


```java
java.io.IOException: Minimum split size pernode 123456 cannot be larger 
than maximum split size 9962
at 
org.apache.hadoop.mapreduce.lib.input.CombineFileInputFormat.getSplits(CombineFileInputFormat.java:200)
at 
org.apache.spark.rdd.WholeTextFileRDD.getPartitions(WholeTextFileRDD.scala:50)
at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:252)
at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:250)
at scala.Option.getOrElse(Option.scala:121)
at org.apache.spark.rdd.RDD.partitions(RDD.scala:250)
at 
org.apache.spark.rdd.MapPartitionsRDD.getPartitions(MapPartitionsRDD.scala:35)
at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:252)
at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:250)
at scala.Option.getOrElse(Option.scala:121)
at org.apache.spark.rdd.RDD.partitions(RDD.scala:250)
at org.apache.spark.SparkContext.runJob(SparkContext.scala:2096)
at org.apache.spark.rdd.RDD.count(RDD.scala:1158)
... 48 elided
```

This change checks the maxSplitSize against minSplitSizePerNode and 
minSplitSizePerRack, and sets them if `maxSplitSize < minSplitSizePerNode/Rack`. 
A reproduction sketch is shown below.
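
A minimal reproduction sketch of the failure mode (the path and sizes are illustrative):

```scala
// A per-node/per-rack minimum split size larger than the maxSplitSize computed
// by WholeTextFileInputFormat used to trigger the IOException shown above.
sc.hadoopConfiguration.setLong(
  "mapreduce.input.fileinputformat.split.minsize.per.node", 123456L)
sc.hadoopConfiguration.setLong(
  "mapreduce.input.fileinputformat.split.minsize.per.rack", 123456L)

val rdd = sc.wholeTextFiles("hdfs:///path/to/many-small-files")
rdd.count() // failed with "Minimum split size pernode ... cannot be larger than maximum split size ..." before this fix
```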

## How was this patch tested?
Tested manually by setting the conf while launching the job, and added a unit test.


You can merge this pull request into a Git repository by running:

$ git pull https://github.com/dhruve/spark bug/SPARK-24610

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/spark/pull/21601.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #21601


commit 2369e3acee730b7d4e45175870de0ecac601069b
Author: Dhruve Ashar 
Date:   2018-06-20T16:34:36Z

[SPARK-24610] fix reading small files via wholeTextFiles




---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #19194: [SPARK-20589] Allow limiting task concurrency per stage

2017-10-26 Thread dhruve
Github user dhruve commented on the issue:

https://github.com/apache/spark/pull/19194
  
Hi @squito. Sorry for the late response. I want to get back on this. As 
soon as I get a chance I will work on it and update the PR.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #19194: [SPARK-20589] Allow limiting task concurrency per...

2017-09-27 Thread dhruve
Github user dhruve commented on a diff in the pull request:

https://github.com/apache/spark/pull/19194#discussion_r141482741
  
--- Diff: 
core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala ---
@@ -512,6 +535,9 @@ private[spark] class TaskSetManager(
   serializedTask)
   }
 } else {
+  if (runningTasks >= maxConcurrentTasks) {
+logDebug("Already running max. no. of concurrent tasks.")
--- End diff --

I'll make the change for this and also update the comments to explain the 
behavior so far. Also, I am not clear from the earlier reply what the 
resolution was for accounting for the activeJobId. Do you still have any inputs?


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #19194: [SPARK-20589] Allow limiting task concurrency per stage

2017-09-27 Thread dhruve
Github user dhruve commented on the issue:

https://github.com/apache/spark/pull/19194
  
Configuring at the stage level seems to be the appropriate and more 
deterministic choice. If we agree on changing the API, we can start another 
effort looking in that direction. Till then we can mark this feature as 
experimental or have an undocumented config.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #19194: [SPARK-20589] Allow limiting task concurrency per stage

2017-09-27 Thread dhruve
Github user dhruve commented on the issue:

https://github.com/apache/spark/pull/19194
  
@squito Thanks for pointing that out. What you mentioned makes sense, and I 
dug some more into the `DAGScheduler` and `activeJobForStage` to gather more 
context. We could take into account the properties of the active job when the 
stage is submitted, however this behavior is non-deterministic. 

Let's say we have two jobs from two different job groups with different task 
concurrency thresholds; the one that is submitted first wins, as the stage won't 
be recomputed for the second job. In this case, there is no control over 
which job gets submitted first (unless the user 
explicitly serializes them). The problem gets worse when the difference between 
the two jobs' task concurrency thresholds is large. In such a case, 
having the wrong value can completely take down your remote service.

For deterministic behavior, I believe the best way to tackle this would 
be to handle it in the stage properties, as was the ask. However, since that involves 
an API change, I didn't go that route, as the scope for it could be much 
broader. If we have more fundamental use cases which require adding something 
like this at the stage level, we should continue in that direction, provided the 
community is open to and welcomes an API change.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #19194: [SPARK-20589] Allow limiting task concurrency per...

2017-09-21 Thread dhruve
Github user dhruve commented on a diff in the pull request:

https://github.com/apache/spark/pull/19194#discussion_r140361162
  
--- Diff: 
core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala ---
@@ -619,6 +625,47 @@ private[spark] class ExecutorAllocationManager(
 // place the executors.
 private val stageIdToExecutorPlacementHints = new mutable.HashMap[Int, 
(Int, Map[String, Int])]
 
+override def onJobStart(jobStart: SparkListenerJobStart): Unit = {
+  jobStart.stageInfos.foreach(stageInfo => 
stageIdToJobId(stageInfo.stageId) = jobStart.jobId)
+
+  var jobGroupId = if (jobStart.properties != null) {
+jobStart.properties.getProperty(SparkContext.SPARK_JOB_GROUP_ID)
+  } else {
+null
+  }
+
+  val maxConTasks = if (jobGroupId != null &&
+conf.contains(s"spark.job.$jobGroupId.maxConcurrentTasks")) {
+conf.get(s"spark.job.$jobGroupId.maxConcurrentTasks").toInt
+  } else {
+Int.MaxValue
+  }
+
+  if (maxConTasks <= 0) {
+throw new IllegalArgumentException(
+  "Maximum Concurrent Tasks should be set greater than 0 for the 
job to progress.")
+  }
+
+  if (jobGroupId == null || 
!conf.contains(s"spark.job.$jobGroupId.maxConcurrentTasks")) {
+jobGroupId = DEFAULT_JOB_GROUP
+  }
+
+  jobIdToJobGroup(jobStart.jobId) = jobGroupId
+  if (!jobGroupToMaxConTasks.contains(jobGroupId)) {
--- End diff --

@squito Thanks for elaborating on this. I believe the last paragraph of your 
explanation summarized your viewpoint. In a way that makes some sense: 
if you want to increase your max concurrent tasks, you know what you are doing, 
so if you see weird things with other threads that you created, that's fine. 
However, when you don't control the thread creation, I feel it's best to set it 
just once to avoid the weirdness. It's much easier to use a different job group 
than to explain one more weird behavior.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #19194: [SPARK-20589] Allow limiting task concurrency per...

2017-09-21 Thread dhruve
Github user dhruve commented on a diff in the pull request:

https://github.com/apache/spark/pull/19194#discussion_r140321640
  
--- Diff: 
core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala ---
@@ -619,6 +625,47 @@ private[spark] class ExecutorAllocationManager(
 // place the executors.
 private val stageIdToExecutorPlacementHints = new mutable.HashMap[Int, 
(Int, Map[String, Int])]
 
+override def onJobStart(jobStart: SparkListenerJobStart): Unit = {
+  jobStart.stageInfos.foreach(stageInfo => 
stageIdToJobId(stageInfo.stageId) = jobStart.jobId)
+
+  var jobGroupId = if (jobStart.properties != null) {
+jobStart.properties.getProperty(SparkContext.SPARK_JOB_GROUP_ID)
+  } else {
+null
+  }
+
+  val maxConTasks = if (jobGroupId != null &&
+conf.contains(s"spark.job.$jobGroupId.maxConcurrentTasks")) {
+conf.get(s"spark.job.$jobGroupId.maxConcurrentTasks").toInt
+  } else {
+Int.MaxValue
+  }
+
+  if (maxConTasks <= 0) {
+throw new IllegalArgumentException(
+  "Maximum Concurrent Tasks should be set greater than 0 for the 
job to progress.")
+  }
+
+  if (jobGroupId == null || 
!conf.contains(s"spark.job.$jobGroupId.maxConcurrentTasks")) {
+jobGroupId = DEFAULT_JOB_GROUP
+  }
+
+  jobIdToJobGroup(jobStart.jobId) = jobGroupId
+  if (!jobGroupToMaxConTasks.contains(jobGroupId)) {
--- End diff --

Okay. We did consider job-server style deployments; however, I am not 
able to follow the example/scenario you mentioned.

When you say that different sets of users are assigned different groups, 
they have to be served by "different threads", because we can only set the job group 
on a per-thread basis (see the sketch below). 

While it is possible and valid for different threads to belong to the same 
job group, you cannot have multiple job groups active simultaneously for the same 
thread. 

So in the scenario where you change maxConcTasks for a 
given jobGroup in one thread, the change would be visible in the other thread 
as well, because both of them belong to the same job group. In that case, it 
seems that it simply wasn't configured correctly. I am still not able to follow how 
setting the value every single time will help here. 
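
A small sketch of that per-thread behavior (illustrative only; assumes an existing SparkContext `sc`):

```scala
// Each thread sets its own job group. Two threads may reuse the same group id,
// in which case their jobs share whatever per-group limit is configured for it.
val t1 = new Thread(new Runnable {
  override def run(): Unit = {
    sc.setJobGroup("groupA", "thread 1 work")
    sc.parallelize(1 to 10).count()
  }
})
val t2 = new Thread(new Runnable {
  override def run(): Unit = {
    sc.setJobGroup("groupA", "thread 2 work")
    sc.parallelize(1 to 10).count()
  }
})
t1.start(); t2.start()
t1.join(); t2.join()
```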


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #19194: [SPARK-20589] Allow limiting task concurrency per...

2017-09-21 Thread dhruve
Github user dhruve commented on a diff in the pull request:

https://github.com/apache/spark/pull/19194#discussion_r140293577
  
--- Diff: 
core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala ---
@@ -619,6 +625,47 @@ private[spark] class ExecutorAllocationManager(
 // place the executors.
 private val stageIdToExecutorPlacementHints = new mutable.HashMap[Int, 
(Int, Map[String, Int])]
 
+override def onJobStart(jobStart: SparkListenerJobStart): Unit = {
+  jobStart.stageInfos.foreach(stageInfo => 
stageIdToJobId(stageInfo.stageId) = jobStart.jobId)
+
+  var jobGroupId = if (jobStart.properties != null) {
+jobStart.properties.getProperty(SparkContext.SPARK_JOB_GROUP_ID)
+  } else {
+null
+  }
+
+  val maxConTasks = if (jobGroupId != null &&
+conf.contains(s"spark.job.$jobGroupId.maxConcurrentTasks")) {
+conf.get(s"spark.job.$jobGroupId.maxConcurrentTasks").toInt
+  } else {
+Int.MaxValue
+  }
+
+  if (maxConTasks <= 0) {
+throw new IllegalArgumentException(
+  "Maximum Concurrent Tasks should be set greater than 0 for the 
job to progress.")
+  }
+
+  if (jobGroupId == null || 
!conf.contains(s"spark.job.$jobGroupId.maxConcurrentTasks")) {
+jobGroupId = DEFAULT_JOB_GROUP
+  }
+
+  jobIdToJobGroup(jobStart.jobId) = jobGroupId
+  if (!jobGroupToMaxConTasks.contains(jobGroupId)) {
--- End diff --

I understand your reasoning about setting the maxConTasks every time a 
job is submitted. However, I am not able to understand the scenario you 
described. If a job completes and a new one kicks off immediately, how does the 
new job partially overlap? It's only when all the stages and underlying tasks of 
the previous job have finished that we mark it as complete. So a new job won't 
overlap with a completed one. Am I missing something here?


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #19194: [SPARK-20589] Allow limiting task concurrency per...

2017-09-20 Thread dhruve
Github user dhruve commented on a diff in the pull request:

https://github.com/apache/spark/pull/19194#discussion_r140122744
  
--- Diff: 
core/src/test/scala/org/apache/spark/scheduler/TaskSetManagerSuite.scala ---
@@ -1255,6 +1255,97 @@ class TaskSetManagerSuite extends SparkFunSuite with 
LocalSparkContext with Logg
 assert(taskOption4.get.addedJars === addedJarsMidTaskSet)
   }
 
+  test("limit max concurrent running tasks in a job group when configured 
") {
+val conf = new SparkConf().
+  set(config.BLACKLIST_ENABLED, true).
+  set("spark.job.testJobGroup.maxConcurrentTasks", "2") // Limit max 
concurrent tasks to 2
+
+sc = new SparkContext("local", "test", conf)
+sched = new FakeTaskScheduler(sc, ("exec1", "host1"), ("exec2", 
"host2"))
+val props = new Properties();
+props.setProperty(SparkContext.SPARK_JOB_GROUP_ID, "testJobGroup") // 
set the job group
+
+val tasks = Array.tabulate[Task[_]](10) { i =>
+  new FakeTask(0, i, Nil)
+}
+val tsm = new TaskSetManager(sched, new TaskSet(tasks, 0, 0, 0, 
props), 2)
+
+// make some offers to our taskset
+var taskDescs = Seq(
+  "exec1" -> "host1",
+  "exec2" -> "host1"
+).flatMap { case (exec, host) =>
+  // offer each executor twice (simulating 2 cores per executor)
+  (0 until 2).flatMap{ _ => tsm.resourceOffer(exec, host, 
TaskLocality.ANY)}
+}
+assert(taskDescs.size === 2) // Out of the 4 offers, tsm can accept up 
to maxConcurrentTasks.
+
+// make 4 more offers
+val taskDescs2 = Seq(
+  "exec1" -> "host1",
+  "exec2" -> "host1"
+).flatMap { case (exec, host) =>
+  (0 until 2).flatMap{ _ => tsm.resourceOffer(exec, host, 
TaskLocality.ANY)}
+}
+assert(taskDescs2.size === 0) // tsm doesn't accept any as it is 
already running at max tasks
+
+// inform tsm that one task has completed
+val directTaskResult = createTaskResult(0)
+tsm.handleSuccessfulTask(taskDescs(0).taskId, directTaskResult)
+
+// make 4 more offers after previous task completed
+taskDescs = Seq(
+  "exec1" -> "host1",
+  "exec2" -> "host1"
+).flatMap { case (exec, host) =>
+  (0 until 2).flatMap{ _ => tsm.resourceOffer(exec, host, 
TaskLocality.ANY)}
+}
+assert(taskDescs.size === 1) // tsm accepts one as it can run one more 
task
+  }
+
+  test("do not limit max concurrent running tasks in a job group by 
default") {
--- End diff --

The previous test covers all the necessary checks introduced by the change. 
I added this one to cover the default scenario when no job group is specified. We 
can do away with it.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #19194: [SPARK-20589] Allow limiting task concurrency per...

2017-09-20 Thread dhruve
Github user dhruve commented on a diff in the pull request:

https://github.com/apache/spark/pull/19194#discussion_r140124886
  
--- Diff: 
core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala ---
@@ -619,6 +625,47 @@ private[spark] class ExecutorAllocationManager(
 // place the executors.
 private val stageIdToExecutorPlacementHints = new mutable.HashMap[Int, 
(Int, Map[String, Int])]
 
+override def onJobStart(jobStart: SparkListenerJobStart): Unit = {
+  jobStart.stageInfos.foreach(stageInfo => 
stageIdToJobId(stageInfo.stageId) = jobStart.jobId)
+
+  var jobGroupId = if (jobStart.properties != null) {
+jobStart.properties.getProperty(SparkContext.SPARK_JOB_GROUP_ID)
+  } else {
+null
+  }
+
+  val maxConTasks = if (jobGroupId != null &&
+conf.contains(s"spark.job.$jobGroupId.maxConcurrentTasks")) {
+conf.get(s"spark.job.$jobGroupId.maxConcurrentTasks").toInt
+  } else {
+Int.MaxValue
+  }
+
+  if (maxConTasks <= 0) {
+throw new IllegalArgumentException(
+  "Maximum Concurrent Tasks should be set greater than 0 for the 
job to progress.")
+  }
+
+  if (jobGroupId == null || 
!conf.contains(s"spark.job.$jobGroupId.maxConcurrentTasks")) {
+jobGroupId = DEFAULT_JOB_GROUP
+  }
+
+  jobIdToJobGroup(jobStart.jobId) = jobGroupId
+  if (!jobGroupToMaxConTasks.contains(jobGroupId)) {
--- End diff --

From my understanding, and in the context of the current code, you would group 
jobs together when you want their concurrency to be limited. If you want 
different concurrency limits for different jobs, you would put them in 
different job groups altogether. 

If there are multiple jobs in the same job group running concurrently and 
one of them sets a different value, which one wins for the existing jobs 
and the new job? If we wanted a different value for every job, the 
user would need a way to identify a Spark job in their application code, 
probably by job id. Only by identifying the job could the user 
set the config for that job. This cannot be known a priori, and I don't 
know of an easy way for the user to know which underlying Spark 
job corresponds to an action. Hence we apply the setting at the job group level, 
which lets the user control the concurrency without having to know 
the underlying Spark-specific job details (a usage sketch follows).

Let me know if anything is unclear here or if you have more questions. 
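
For illustration (not part of the PR text itself), a minimal sketch of how two 
job groups would carry independent limits under the proposed 
`spark.job.<group>.maxConcurrentTasks` config; the group names, limits, and 
jobs below are hypothetical:

```
import org.apache.spark.{SparkConf, SparkContext}

// Hypothetical group names and limits; only the config key shape comes from this PR.
val conf = new SparkConf()
  .setMaster("local[*]")
  .setAppName("per-job-group-concurrency")
  .set("spark.job.ingest.maxConcurrentTasks", "4")
  .set("spark.job.reporting.maxConcurrentTasks", "32")
val sc = new SparkContext(conf)

sc.setJobGroup("ingest", "talks to a rate-limited external service", false)
sc.parallelize(1 to 1000, 100).map(x => x + 1).count()   // at most 4 concurrent tasks

sc.setJobGroup("reporting", "internal aggregation only", false)
sc.parallelize(1 to 1000, 100).map(x => x + 1).count()   // at most 32 concurrent tasks

sc.clearJobGroup()
```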


---




[GitHub] spark pull request #19194: [SPARK-20589] Allow limiting task concurrency per...

2017-09-20 Thread dhruve
Github user dhruve commented on a diff in the pull request:

https://github.com/apache/spark/pull/19194#discussion_r140122769
  
--- Diff: 
core/src/test/scala/org/apache/spark/scheduler/TaskSetManagerSuite.scala ---
@@ -1255,6 +1255,97 @@ class TaskSetManagerSuite extends SparkFunSuite with 
LocalSparkContext with Logg
 assert(taskOption4.get.addedJars === addedJarsMidTaskSet)
   }
 
+  test("limit max concurrent running tasks in a job group when configured 
") {
+val conf = new SparkConf().
+  set(config.BLACKLIST_ENABLED, true).
+  set("spark.job.testJobGroup.maxConcurrentTasks", "2") // Limit max 
concurrent tasks to 2
+
+sc = new SparkContext("local", "test", conf)
+sched = new FakeTaskScheduler(sc, ("exec1", "host1"), ("exec2", 
"host2"))
+val props = new Properties();
+props.setProperty(SparkContext.SPARK_JOB_GROUP_ID, "testJobGroup") // 
set the job group
+
+val tasks = Array.tabulate[Task[_]](10) { i =>
+  new FakeTask(0, i, Nil)
+}
+val tsm = new TaskSetManager(sched, new TaskSet(tasks, 0, 0, 0, 
props), 2)
+
+// make some offers to our taskset
+var taskDescs = Seq(
+  "exec1" -> "host1",
+  "exec2" -> "host1"
+).flatMap { case (exec, host) =>
+  // offer each executor twice (simulating 2 cores per executor)
+  (0 until 2).flatMap{ _ => tsm.resourceOffer(exec, host, 
TaskLocality.ANY)}
+}
+assert(taskDescs.size === 2) // Out of the 4 offers, tsm can accept up 
to maxConcurrentTasks.
+
+// make 4 more offers
+val taskDescs2 = Seq(
+  "exec1" -> "host1",
+  "exec2" -> "host1"
+).flatMap { case (exec, host) =>
+  (0 until 2).flatMap{ _ => tsm.resourceOffer(exec, host, 
TaskLocality.ANY)}
+}
+assert(taskDescs2.size === 0) // tsm doesn't accept any as it is 
already running at max tasks
+
+// inform tsm that one task has completed
+val directTaskResult = createTaskResult(0)
+tsm.handleSuccessfulTask(taskDescs(0).taskId, directTaskResult)
+
+// make 4 more offers after previous task completed
+taskDescs = Seq(
+  "exec1" -> "host1",
+  "exec2" -> "host1"
+).flatMap { case (exec, host) =>
+  (0 until 2).flatMap{ _ => tsm.resourceOffer(exec, host, 
TaskLocality.ANY)}
+}
+assert(taskDescs.size === 1) // tsm accepts one as it can run one more 
task
+  }
+
+  test("do not limit max concurrent running tasks in a job group by 
default") {
+val conf = new SparkConf().
+  set(config.BLACKLIST_ENABLED, true)
+
+sc = new SparkContext("local", "test", conf)
+sched = new FakeTaskScheduler(sc, ("exec1", "host1"), ("exec2", 
"host2"))
+
+val tasks = Array.tabulate[Task[_]](10) { i =>
+  new FakeTask(0, i, Nil)
+}
+val tsm = new TaskSetManager(sched, new TaskSet(tasks, 0, 0, 0, null), 
2)
+
+// make 5 offers to our taskset
+var taskDescs = Seq(
+  "exec1" -> "host1",
+  "exec2" -> "host2"
+).flatMap { case (exec, host) =>
+  // offer each executor twice (simulating 3 cores per executor)
--- End diff --

Okay. It won't be needed, as I will be removing the test case.


---




[GitHub] spark pull request #19194: [SPARK-20589] Allow limiting task concurrency per...

2017-09-20 Thread dhruve
Github user dhruve commented on a diff in the pull request:

https://github.com/apache/spark/pull/19194#discussion_r140123047
  
--- Diff: 
core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala ---
@@ -758,11 +825,52 @@ private[spark] class ExecutorAllocationManager(
   allocationManager.synchronized {
 stageIdToNumSpeculativeTasks(stageId) =
   stageIdToNumSpeculativeTasks.getOrElse(stageId, 0) + 1
+maxConcurrentTasks = getMaxConTasks
+logDebug(s"Setting max concurrent tasks to $maxConcurrentTasks on 
spec. task submitted.")
 allocationManager.onSchedulerBacklogged()
   }
 }
 
 /**
+ * Calculate the maximum no. of concurrent tasks that can run 
currently.
+ */
+def getMaxConTasks(): Int = {
+  // We can limit the no. of concurrent tasks by a job group. A job 
group can have multiple jobs
+  // with multiple stages. We need to get all the active stages 
belonging to a job group to
+  // calculate the total no. of pending + running tasks to decide the 
maximum no. of executors
+  // we need at that time to serve the outstanding tasks. This is 
capped by the minimum no. of
+  // outstanding tasks and the max concurrent limit specified for the 
job group if any.
+
+  def getIncompleteTasksForStage(stageId: Int, numTasks: Int): Int = {
+totalPendingTasks(stageId) + totalRunningTasks(stageId)
+  }
+
+  def sumIncompleteTasksForStages: (Int, (Int, Int)) => Int = 
(totalTasks, stageToNumTasks) => {
+val activeTasks = getIncompleteTasksForStage(stageToNumTasks._1, 
stageToNumTasks._2)
+sumOrMax(totalTasks, activeTasks)
+  }
+  // Get the total running & pending tasks for all stages in a job 
group.
+  def getIncompleteTasksForJobGroup(stagesItr: mutable.HashMap[Int, 
Int]): Int = {
+stagesItr.foldLeft(0)(sumIncompleteTasksForStages)
+  }
+
+  def sumIncompleteTasksForJobGroup: (Int, (String, 
mutable.HashMap[Int, Int])) => Int = {
+(maxConTasks, x) => {
+  val totalIncompleteTasksForJobGroup = 
getIncompleteTasksForJobGroup(x._2)
+  val maxTasks = Math.min(jobGroupToMaxConTasks(x._1), 
totalIncompleteTasksForJobGroup)
+  sumOrMax(maxConTasks, maxTasks)
+}
+  }
+
+  def sumOrMax(a: Int, b: Int): Int = if (doesSumOverflow(a, b)) 
Int.MaxValue else (a + b)
+
+  def doesSumOverflow(a: Int, b: Int): Boolean = b > (Int.MaxValue - a)
+
+  val stagesByJobGroup = stageIdToNumTasks.groupBy(x => 
jobIdToJobGroup(stageIdToJobId(x._1)))
--- End diff --

I like the idea. I think this can be done. Will update the PR.
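
As a quick standalone illustration, the saturating-sum helpers quoted in the 
diff above can be exercised on their own to confirm the overflow handling 
(same logic, just extracted here for clarity):

```
// Same logic as the helpers in the quoted diff, shown in isolation so the
// saturation near Int.MaxValue is easy to verify.
def doesSumOverflow(a: Int, b: Int): Boolean = b > (Int.MaxValue - a)
def sumOrMax(a: Int, b: Int): Int = if (doesSumOverflow(a, b)) Int.MaxValue else a + b

assert(sumOrMax(3, 4) == 7)
assert(sumOrMax(Int.MaxValue, 1) == Int.MaxValue)        // saturates instead of wrapping
assert(sumOrMax(Int.MaxValue - 5, 10) == Int.MaxValue)
```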


---




[GitHub] spark issue #19194: [SPARK-20589] Allow limiting task concurrency per stage

2017-09-18 Thread dhruve
Github user dhruve commented on the issue:

https://github.com/apache/spark/pull/19194
  
@tgraves I have addressed the comments and tried to cover the possible 
cases in the existing test for job groups and speculation. Kindly let me know 
if we need to add or address more use cases.


---




[GitHub] spark pull request #19194: [SPARK-20589] Allow limiting task concurrency per...

2017-09-15 Thread dhruve
Github user dhruve commented on a diff in the pull request:

https://github.com/apache/spark/pull/19194#discussion_r139163830
  
--- Diff: 
core/src/test/scala/org/apache/spark/scheduler/TaskSetManagerSuite.scala ---
@@ -1255,6 +1255,97 @@ class TaskSetManagerSuite extends SparkFunSuite with 
LocalSparkContext with Logg
 assert(taskOption4.get.addedJars === addedJarsMidTaskSet)
   }
 
+  test("limit max concurrent running tasks in a job group when configured 
") {
+val conf = new SparkConf().
+  set(config.BLACKLIST_ENABLED, true).
+  set("spark.job.testJobGroup.maxConcurrentTasks", "2") // Limit max 
concurrent tasks to 2
+
+sc = new SparkContext("local", "test", conf)
+sched = new FakeTaskScheduler(sc, ("exec1", "host1"), ("exec2", 
"host2"))
+val props = new Properties();
+props.setProperty(SparkContext.SPARK_JOB_GROUP_ID, "testJobGroup") // 
set the job group
+
+val tasks = Array.tabulate[Task[_]](10) { i =>
+  new FakeTask(0, i, Nil)
+}
+val tsm = new TaskSetManager(sched, new TaskSet(tasks, 0, 0, 0, 
props), 2)
+
+// make some offers to our taskset
+var taskDescs = Seq(
+  "exec1" -> "host1",
+  "exec2" -> "host1"
+).flatMap { case (exec, host) =>
+  // offer each executor twice (simulating 2 cores per executor)
+  (0 until 2).flatMap{ _ => tsm.resourceOffer(exec, host, 
TaskLocality.ANY)}
+}
+assert(taskDescs.size === 2) // Out of the 4 offers, tsm can accept up 
to maxConcurrentTasks.
+
+// make 4 more offers
+val taskDescs2 = Seq(
+  "exec1" -> "host1",
+  "exec2" -> "host1"
+).flatMap { case (exec, host) =>
+  (0 until 2).flatMap{ _ => tsm.resourceOffer(exec, host, 
TaskLocality.ANY)}
+}
+assert(taskDescs2.size === 0) // tsm doesn't accept any as it is 
already running at max tasks
+
+// inform tsm that one task has completed
+val directTaskResult = createTaskResult(0)
+tsm.handleSuccessfulTask(taskDescs(0).taskId, directTaskResult)
+
+// make 4 more offers after previous task completed
+taskDescs = Seq(
+  "exec1" -> "host1",
+  "exec2" -> "host1"
+).flatMap { case (exec, host) =>
+  (0 until 2).flatMap{ _ => tsm.resourceOffer(exec, host, 
TaskLocality.ANY)}
+}
+assert(taskDescs.size === 1) // tsm accepts one as it can run one more 
task
+  }
+
+  test("do not limit max concurrent running tasks in a job group by 
default") {
+val conf = new SparkConf().
+  set(config.BLACKLIST_ENABLED, true)
+
+sc = new SparkContext("local", "test", conf)
+sched = new FakeTaskScheduler(sc, ("exec1", "host1"), ("exec2", 
"host2"))
+
+val tasks = Array.tabulate[Task[_]](10) { i =>
+  new FakeTask(0, i, Nil)
+}
+val tsm = new TaskSetManager(sched, new TaskSet(tasks, 0, 0, 0, null), 
2)
+
+// make 5 offers to our taskset
+var taskDescs = Seq(
+  "exec1" -> "host1",
+  "exec2" -> "host2"
+).flatMap { case (exec, host) =>
+  // offer each executor twice (simulating 2 cores per executor)
--- End diff --

Yes. It should be 2.


---




[GitHub] spark pull request #19194: [SPARK-20589] Allow limiting task concurrency per...

2017-09-15 Thread dhruve
Github user dhruve commented on a diff in the pull request:

https://github.com/apache/spark/pull/19194#discussion_r139163463
  
--- Diff: 
core/src/test/scala/org/apache/spark/ExecutorAllocationManagerSuite.scala ---
@@ -210,22 +216,282 @@ class ExecutorAllocationManagerSuite
 assert(numExecutorsToAdd(manager) === 1)
 
 // Verify that running a task doesn't affect the target
-sc.listenerBus.postToAll(SparkListenerTaskStart(1, 0, 
createTaskInfo(0, 0, "executor-1")))
+sc.listenerBus.postToAll(SparkListenerTaskStart(0, 0, 
createTaskInfo(0, 0, "executor-1")))
 assert(numExecutorsTarget(manager) === 5)
 assert(addExecutors(manager) === 0)
 assert(numExecutorsToAdd(manager) === 1)
 
 // Verify that running a speculative task doesn't affect the target
-sc.listenerBus.postToAll(SparkListenerTaskStart(1, 0, 
createTaskInfo(0, 0, "executor-2", true)))
+sc.listenerBus.postToAll(SparkListenerTaskStart(0, 0, 
createTaskInfo(0, 0, "executor-2", true)))
 assert(numExecutorsTarget(manager) === 5)
 assert(addExecutors(manager) === 0)
 assert(numExecutorsToAdd(manager) === 1)
   }
 
+  test("add executors capped by max concurrent tasks for a job group with 
single core executors") {
+val conf = new SparkConf()
+  .setMaster("myDummyLocalExternalClusterManager")
+  .setAppName("test-executor-allocation-manager")
+  .set("spark.dynamicAllocation.enabled", "true")
+  .set("spark.dynamicAllocation.testing", "true")
+  .set("spark.job.group1.maxConcurrentTasks", "2")
+  .set("spark.job.group2.maxConcurrentTasks", "5")
+val sc = new SparkContext(conf)
+contexts += sc
+sc.setJobGroup("group1", "", false)
+
+val manager = sc.executorAllocationManager.get
+val stages = Seq(createStageInfo(0, 10), createStageInfo(1, 10))
+// Submit the job and stage start/submit events
+sc.listenerBus.postToAll(SparkListenerJobStart(0, 0, stages, 
sc.getLocalProperties))
+sc.listenerBus.postToAll(SparkListenerStageSubmitted(stages(0)))
+
+// Verify that we're capped at number of max concurrent tasks in the 
stage
+assert(maxNumExecutorsNeeded(manager) === 2)
+
+// Submit another stage in the same job
+sc.listenerBus.postToAll(SparkListenerStageSubmitted(stages(1)))
+assert(maxNumExecutorsNeeded(manager) === 2)
+
+sc.listenerBus.postToAll(SparkListenerStageCompleted(stages(0)))
+sc.listenerBus.postToAll(SparkListenerStageCompleted(stages(1)))
+sc.listenerBus.postToAll(SparkListenerJobEnd(0, 10, JobSucceeded))
+
+// Submit a new job in the same job group
+val stage2 = createStageInfo(2, 20)
+sc.listenerBus.postToAll(SparkListenerJobStart(1, 0, Seq(stage2), 
sc.getLocalProperties))
+sc.listenerBus.postToAll(SparkListenerStageSubmitted(stage2))
+assert(maxNumExecutorsNeeded(manager) === 2)
+
+sc.listenerBus.postToAll(SparkListenerStageCompleted(stage2))
+sc.listenerBus.postToAll(SparkListenerJobEnd(1, 10, JobSucceeded))
+
+// Set another jobGroup
+sc.setJobGroup("group2", "", false)
+
+val stage3 = createStageInfo(3, 20)
+sc.listenerBus.postToAll(SparkListenerJobStart(2, 0, Seq(stage3), 
sc.getLocalProperties))
+sc.listenerBus.postToAll(SparkListenerStageSubmitted(stage3))
+assert(maxNumExecutorsNeeded(manager) === 5)
+
+sc.listenerBus.postToAll(SparkListenerStageCompleted(stage3))
+sc.listenerBus.postToAll(SparkListenerJobEnd(2, 10, JobSucceeded))
+
+// Clear jobGroup
+sc.clearJobGroup()
+
+val stage4 = createStageInfo(4, 50)
+sc.listenerBus.postToAll(SparkListenerJobStart(2, 0, Seq(stage4), 
sc.getLocalProperties))
+sc.listenerBus.postToAll(SparkListenerStageSubmitted(stage4))
+assert(maxNumExecutorsNeeded(manager) === 50)
+  }
+
+  test("add executors capped by max concurrent tasks for a job group with 
multi cores executors") {
+val conf = new SparkConf()
+  .setMaster("myDummyLocalExternalClusterManager")
+  .setAppName("test-executor-allocation-manager")
+  .set("spark.dynamicAllocation.enabled", "true")
+  .set("spark.dynamicAllocation.testing", "true")
+  .set("spark.job.group1.maxConcurrentTasks", "2")
+  .set("spark.job.group2.maxConcurrentTasks", "5")
+  .set("spark.executor.cores", "3")
+val sc = new SparkContext(conf)

[GitHub] spark pull request #19194: [SPARK-20589] Allow limiting task concurrency per...

2017-09-15 Thread dhruve
Github user dhruve commented on a diff in the pull request:

https://github.com/apache/spark/pull/19194#discussion_r139163529
  
--- Diff: 
core/src/test/scala/org/apache/spark/ExecutorAllocationManagerSuite.scala ---
@@ -210,22 +216,282 @@ class ExecutorAllocationManagerSuite
 assert(numExecutorsToAdd(manager) === 1)
 
 // Verify that running a task doesn't affect the target
-sc.listenerBus.postToAll(SparkListenerTaskStart(1, 0, 
createTaskInfo(0, 0, "executor-1")))
+sc.listenerBus.postToAll(SparkListenerTaskStart(0, 0, 
createTaskInfo(0, 0, "executor-1")))
 assert(numExecutorsTarget(manager) === 5)
 assert(addExecutors(manager) === 0)
 assert(numExecutorsToAdd(manager) === 1)
 
 // Verify that running a speculative task doesn't affect the target
-sc.listenerBus.postToAll(SparkListenerTaskStart(1, 0, 
createTaskInfo(0, 0, "executor-2", true)))
+sc.listenerBus.postToAll(SparkListenerTaskStart(0, 0, 
createTaskInfo(0, 0, "executor-2", true)))
 assert(numExecutorsTarget(manager) === 5)
 assert(addExecutors(manager) === 0)
 assert(numExecutorsToAdd(manager) === 1)
   }
 
+  test("add executors capped by max concurrent tasks for a job group with 
single core executors") {
+val conf = new SparkConf()
+  .setMaster("myDummyLocalExternalClusterManager")
+  .setAppName("test-executor-allocation-manager")
+  .set("spark.dynamicAllocation.enabled", "true")
+  .set("spark.dynamicAllocation.testing", "true")
+  .set("spark.job.group1.maxConcurrentTasks", "2")
+  .set("spark.job.group2.maxConcurrentTasks", "5")
+val sc = new SparkContext(conf)
+contexts += sc
+sc.setJobGroup("group1", "", false)
+
+val manager = sc.executorAllocationManager.get
+val stages = Seq(createStageInfo(0, 10), createStageInfo(1, 10))
+// Submit the job and stage start/submit events
+sc.listenerBus.postToAll(SparkListenerJobStart(0, 0, stages, 
sc.getLocalProperties))
+sc.listenerBus.postToAll(SparkListenerStageSubmitted(stages(0)))
+
+// Verify that we're capped at number of max concurrent tasks in the 
stage
+assert(maxNumExecutorsNeeded(manager) === 2)
+
+// Submit another stage in the same job
+sc.listenerBus.postToAll(SparkListenerStageSubmitted(stages(1)))
+assert(maxNumExecutorsNeeded(manager) === 2)
+
+sc.listenerBus.postToAll(SparkListenerStageCompleted(stages(0)))
+sc.listenerBus.postToAll(SparkListenerStageCompleted(stages(1)))
+sc.listenerBus.postToAll(SparkListenerJobEnd(0, 10, JobSucceeded))
+
+// Submit a new job in the same job group
+val stage2 = createStageInfo(2, 20)
+sc.listenerBus.postToAll(SparkListenerJobStart(1, 0, Seq(stage2), 
sc.getLocalProperties))
+sc.listenerBus.postToAll(SparkListenerStageSubmitted(stage2))
+assert(maxNumExecutorsNeeded(manager) === 2)
+
+sc.listenerBus.postToAll(SparkListenerStageCompleted(stage2))
+sc.listenerBus.postToAll(SparkListenerJobEnd(1, 10, JobSucceeded))
+
+// Set another jobGroup
+sc.setJobGroup("group2", "", false)
+
+val stage3 = createStageInfo(3, 20)
+sc.listenerBus.postToAll(SparkListenerJobStart(2, 0, Seq(stage3), 
sc.getLocalProperties))
+sc.listenerBus.postToAll(SparkListenerStageSubmitted(stage3))
+assert(maxNumExecutorsNeeded(manager) === 5)
+
+sc.listenerBus.postToAll(SparkListenerStageCompleted(stage3))
+sc.listenerBus.postToAll(SparkListenerJobEnd(2, 10, JobSucceeded))
+
+// Clear jobGroup
+sc.clearJobGroup()
+
+val stage4 = createStageInfo(4, 50)
+sc.listenerBus.postToAll(SparkListenerJobStart(2, 0, Seq(stage4), 
sc.getLocalProperties))
+sc.listenerBus.postToAll(SparkListenerStageSubmitted(stage4))
+assert(maxNumExecutorsNeeded(manager) === 50)
+  }
+
+  test("add executors capped by max concurrent tasks for a job group with 
multi cores executors") {
+val conf = new SparkConf()
+  .setMaster("myDummyLocalExternalClusterManager")
+  .setAppName("test-executor-allocation-manager")
+  .set("spark.dynamicAllocation.enabled", "true")
+  .set("spark.dynamicAllocation.testing", "true")
+  .set("spark.job.group1.maxConcurrentTasks", "2")
+  .set("spark.job.group2.maxConcurrentTasks", "5")
+  .set("spark.executor.cores", "3")
+val sc = new SparkContext(conf)

[GitHub] spark pull request #19194: [SPARK-20589] Allow limiting task concurrency per...

2017-09-15 Thread dhruve
Github user dhruve commented on a diff in the pull request:

https://github.com/apache/spark/pull/19194#discussion_r139163109
  
--- Diff: 
core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala ---
@@ -758,11 +812,58 @@ private[spark] class ExecutorAllocationManager(
   allocationManager.synchronized {
 stageIdToNumSpeculativeTasks(stageId) =
   stageIdToNumSpeculativeTasks.getOrElse(stageId, 0) + 1
+maxConcurrentTasks = getMaxConTasks
+logDebug(s"Setting max concurrent tasks to $maxConcurrentTasks on 
spec. task submitted.")
 allocationManager.onSchedulerBacklogged()
   }
 }
 
 /**
+ * Calculate the maximum no. of concurrent tasks that can run 
currently.
+ */
+def getMaxConTasks(): Int = {
+  // We can limit the no. of concurrent tasks by a job group. A job 
group can have multiple jobs
+  // with multiple stages. We need to get all the active stages 
belonging to a job group to
+  // calculate the total no. of pending + running tasks to decide the 
maximum no. of executors
+  // we need at that time to serve the outstanding tasks. This is 
capped by the minimum of no.
+  // of outstanding tasks and the max concurrent limit specified for 
the job group if any.
+
+  def getIncompleteTasksForStage(stageId: Int, numTasks: Int): Int = {
+var runningTasks = 0
+if (stageIdToTaskIndices.contains(stageId)) {
+  runningTasks =
+stageIdToTaskIndices(stageId).size - 
stageIdToCompleteTaskCount.getOrElse(stageId, 0)
--- End diff --

Yes. Nice catch. We do need to account for all the tasks for a stage. And 
this should include speculative ones as well.
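
As a hedged sketch only (not the merged implementation), one way the per-stage 
count could fold in speculative tasks; the three maps mirror the ones used in 
this PR's diffs but are passed in explicitly here so the sketch stands alone:

```
// Incomplete work for a stage = not-yet-started + running + speculative tasks.
def incompleteTasksForStage(
    stageId: Int,
    numTasks: Int,
    startedTaskIndices: Map[Int, Set[Int]],       // stageId -> indices of started tasks
    completedTaskCount: Map[Int, Int],            // stageId -> count of completed tasks
    speculativeTaskCount: Map[Int, Int]): Int = { // stageId -> count of speculative tasks
  val started     = startedTaskIndices.get(stageId).map(_.size).getOrElse(0)
  val completed   = completedTaskCount.getOrElse(stageId, 0)
  val pending     = numTasks - started
  val running     = started - completed
  val speculative = speculativeTaskCount.getOrElse(stageId, 0)
  pending + running + speculative
}
```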


---




[GitHub] spark pull request #19194: [SPARK-20589] Allow limiting task concurrency per...

2017-09-15 Thread dhruve
Github user dhruve commented on a diff in the pull request:

https://github.com/apache/spark/pull/19194#discussion_r139162871
  
--- Diff: 
core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala ---
@@ -758,11 +812,58 @@ private[spark] class ExecutorAllocationManager(
   allocationManager.synchronized {
 stageIdToNumSpeculativeTasks(stageId) =
   stageIdToNumSpeculativeTasks.getOrElse(stageId, 0) + 1
+maxConcurrentTasks = getMaxConTasks
+logDebug(s"Setting max concurrent tasks to $maxConcurrentTasks on 
spec. task submitted.")
 allocationManager.onSchedulerBacklogged()
   }
 }
 
 /**
+ * Calculate the maximum no. of concurrent tasks that can run 
currently.
+ */
+def getMaxConTasks(): Int = {
+  // We can limit the no. of concurrent tasks by a job group. A job 
group can have multiple jobs
+  // with multiple stages. We need to get all the active stages 
belonging to a job group to
+  // calculate the total no. of pending + running tasks to decide the 
maximum no. of executors
+  // we need at that time to serve the outstanding tasks. This is 
capped by the minimum of no.
--- End diff --

okay.


---




[GitHub] spark issue #19194: [SPARK-20589] Allow limiting task concurrency per stage

2017-09-11 Thread dhruve
Github user dhruve commented on the issue:

https://github.com/apache/spark/pull/19194
  
Rebased this PR with current master and have squashed the earlier commits. 


---




[GitHub] spark pull request #19194: [SPARK-20589] Allow limiting task concurrency per...

2017-09-11 Thread dhruve
GitHub user dhruve opened a pull request:

https://github.com/apache/spark/pull/19194

[SPARK-20589] Allow limiting task concurrency per stage

## What changes were proposed in this pull request?
This change allows the user to specify the maximum no. of tasks running in 
a given job group. (Kindly see the jira comments section for more context on 
why this is implemented at a job group level rather than a stage level). This 
change is beneficial where the user wants to avoid having a DoS while trying to 
access an external service from multiple executors without having the need to 
repartition or coalesce existing RDDs.

This code change introduces a new user level configuration: 
`spark.job.[userJobGroup].maxConcurrentTasks` which is used to set the active 
no. of tasks executing at a given point in time.

The user can use the feature by setting the appropriate jobGroup and 
passing the conf:

```
conf.set("spark.job.group1.maxConcurrentTasks", "10")
...
sc.setJobGroup("group1", "", false)
sc.parallelize(1 to 10, 10).map(x => x + 1).count
sc.clearJobGroup
```

 changes proposed in this fix 
This change limits the no. of tasks (in turn also the no. of executors to 
be acquired) that can run simultaneously in a given job group and its 
subsequent job/s and stage/s if the appropriate job group and max concurrency 
configs are set.

## How was this patch tested?
Ran unit tests and multiple manual tests with various combinations of:
- single/multiple/no job groups
- executors with single/multi cores
- dynamic allocation on/off


You can merge this pull request into a Git repository by running:

$ git pull https://github.com/dhruve/spark impr/SPARK-20589

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/spark/pull/19194.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #19194


commit 4281151df9010b4e9fe91e588c07e872b8e0dd69
Author: Dhruve Ashar <dhruveas...@gmail.com>
Date:   2017-09-11T16:45:49Z

[SPARK-20589] Allow limiting task concurrency per stage




---




[GitHub] spark pull request #19157: [SPARK-20589][Core][Scheduler] Allow limiting tas...

2017-09-11 Thread dhruve
Github user dhruve closed the pull request at:

https://github.com/apache/spark/pull/19157


---




[GitHub] spark issue #19157: [SPARK-20589][Core][Scheduler] Allow limiting task concu...

2017-09-08 Thread dhruve
Github user dhruve commented on the issue:

https://github.com/apache/spark/pull/19157
  
@HyukjinKwon Thanks for pointing this out. I will do a rebase and then push. 
The message from AppVeyor wasn't very obvious, so I didn't realize it.


---




[GitHub] spark issue #19157: [SPARK-20589][Core][Scheduler] Allow limiting task concu...

2017-09-07 Thread dhruve
Github user dhruve commented on the issue:

https://github.com/apache/spark/pull/19157
  
Reopened this because CI was having issues with the previous PR. 
[18950](https://github.com/apache/spark/pull/18950)


---




[GitHub] spark issue #19157: [SPARK-20589][Core][Scheduler] Allow limiting task concu...

2017-09-07 Thread dhruve
Github user dhruve commented on the issue:

https://github.com/apache/spark/pull/19157
  
@squito @markhamstra @tgravescs Can you review this PR? Thanks.


---




[GitHub] spark pull request #19157: [SPARK-20589][Core][Scheduler] Allow limiting tas...

2017-09-07 Thread dhruve
GitHub user dhruve opened a pull request:

https://github.com/apache/spark/pull/19157

[SPARK-20589][Core][Scheduler] Allow limiting task concurrency per job group

## What changes were proposed in this pull request?
This change allows the user to specify the maximum no. of tasks running in 
a given job group. (Kindly see the jira comments section for more context on 
why this is implemented at a job group level rather than a stage level). This 
change is beneficial where the user wants to avoid having a DoS while trying to 
access an external service from multiple executors without having the need to 
repartition or coalesce existing RDDs.

This code change introduces a new user level configuration: 
`spark.job.[userJobGroup].maxConcurrentTasks` which is used to set the active 
no. of tasks executing at a given point in time.

The user can use the feature by setting the appropriate jobGroup and 
passing the conf:

```
conf.set("spark.job.group1.maxConcurrentTasks", "10")
...
sc.setJobGroup("group1", "", false)
sc.parallelize(1 to 10, 10).map(x => x + 1).count
sc.clearJobGroup
```

 changes proposed in this fix 
This change limits the no. of tasks (in turn also the no. of executors to 
be acquired) that can run simultaneously in a given job group and its 
subsequent job/s and stage/s if the appropriate job group and max concurrency 
configs are set.

## How was this patch tested?
Ran unit tests and multiple manual tests with various combinations of:
- single/multiple/no job groups
- executors with single/multi cores
- dynamic allocation on/off


You can merge this pull request into a Git repository by running:

$ git pull https://github.com/dhruve/spark impr/SPARK-20589

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/spark/pull/19157.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #19157


commit 824396c82977171c38ab5d7f6c0f84bc19eccaba
Author: Dhruve Ashar <dhruveas...@gmail.com>
Date:   2017-08-15T14:18:21Z

[SPARK-20589] Allow limiting task concurrency per stage

commit d3f8162dab4ca7065d7f296fd03528ce6ddfb923
Author: Dhruve Ashar <dhruveas...@gmail.com>
Date:   2017-08-15T14:45:18Z

Merge branch 'master' of github.com:apache/spark into impr/SPARK-20589

commit 824621286ffb107010409c4d0d3442550628247d
Author: Dhruve Ashar <dhruveas...@gmail.com>
Date:   2017-08-21T16:51:41Z

Allow limiting task concurrency per stage in concurrent job groups

commit 517acb490ae5938a22c4175347f6bbc24b47781f
Author: Dhruve Ashar <dhruveas...@gmail.com>
Date:   2017-08-21T19:30:17Z

Remove comment

commit 65941f7884551e84a13a6cc2e7488a01e7d8beec
Author: Dhruve Ashar <dhruveas...@gmail.com>
Date:   2017-08-21T19:42:05Z

Fix comment style

commit 7aba73a31808f6b1017b85dfd4dd19e28365bd97
Author: Dhruve Ashar <dhruveas...@gmail.com>
Date:   2017-08-22T14:54:10Z

Merge branch 'master' of github.com:apache/spark into impr/SPARK-20589

commit 0e518f00ce97fd5d17fe89792c2503d2514b0473
Author: Dhruve Ashar <dhruveas...@gmail.com>
Date:   2017-08-22T15:38:01Z

Fix new unit test and add comments

commit 8b3830004d69bd5f109fd9846f59583c23a910c7
Author: Dhruve Ashar <dhruveas...@gmail.com>
Date:   2017-09-05T20:14:02Z

Resolve merge conflict and add test for speculative task




---




[GitHub] spark issue #18950: [SPARK-20589][Core][Scheduler] Allow limiting task concu...

2017-09-07 Thread dhruve
Github user dhruve commented on the issue:

https://github.com/apache/spark/pull/18950
  
CI is having issues downloading my repo. Closing this PR and opening a new 
one.


---




[GitHub] spark pull request #18950: [SPARK-20589][Core][Scheduler] Allow limiting tas...

2017-09-07 Thread dhruve
Github user dhruve closed the pull request at:

https://github.com/apache/spark/pull/18950


---




[GitHub] spark pull request #18950: [SPARK-20589][Core][Scheduler] Allow limiting tas...

2017-08-22 Thread dhruve
Github user dhruve commented on a diff in the pull request:

https://github.com/apache/spark/pull/18950#discussion_r134608269
  
--- Diff: 
core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala ---
@@ -598,13 +600,58 @@ private[spark] class ExecutorAllocationManager(
 private val executorIdToTaskIds = new mutable.HashMap[String, 
mutable.HashSet[Long]]
 // Number of tasks currently running on the cluster.  Should be 0 when 
no stages are active.
 private var numRunningTasks: Int = _
+private val jobGroupToMaxConTasks = new mutable.HashMap[String, Int]
+private val jobIdToJobGroup = new mutable.HashMap[Int, String]
+private val stageIdToJobId = new mutable.HashMap[Int, Int]
+private val stageIdToCompleteTaskCount = new mutable.HashMap[Int, Int]
 
 // stageId to tuple (the number of task with locality preferences, a 
map where each pair is a
 // node and the number of tasks that would like to be scheduled on 
that node) map,
 // maintain the executor placement hints for each stage Id used by 
resource framework to better
 // place the executors.
 private val stageIdToExecutorPlacementHints = new mutable.HashMap[Int, 
(Int, Map[String, Int])]
 
+override def onJobStart(jobStart: SparkListenerJobStart): Unit = {
+  jobStart.stageInfos.foreach(stageInfo => 
stageIdToJobId(stageInfo.stageId) = jobStart.jobId)
+
+  var jobGroupId = if (jobStart.properties != null) {
+jobStart.properties.getProperty(SparkContext.SPARK_JOB_GROUP_ID)
+  } else {
+null
+  }
+
+  val maxConTasks = if (jobGroupId != null &&
+conf.contains(s"spark.job.$jobGroupId.maxConcurrentTasks")) {
+conf.get(s"spark.job.$jobGroupId.maxConcurrentTasks").toInt
+  } else {
+Int.MaxValue
+  }
+
+  if (maxConTasks <= 0) {
+throw new IllegalArgumentException(
+  "Maximum Concurrent Tasks should be set greater than 0 for the 
job to progress.")
+  }
+
+  if (jobGroupId == null || 
!conf.contains(s"spark.job.$jobGroupId.maxConcurrentTasks")) {
+jobGroupId = "default-group-" + jobStart.jobId.hashCode
--- End diff --

Okay I will have the default job group named as `__default_job_group`. Let 
me know if we want to change it to a different one.
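
For context, a hedged sketch of the fallback described here; the name 
`__default_job_group` comes from this comment, everything else is illustrative 
and not the merged code:

```
import org.apache.spark.SparkConf

// Jobs without a configured group-level limit land in one shared default
// group with no cap (Int.MaxValue stands in for "unlimited").
val DefaultJobGroup = "__default_job_group"

def resolveGroupAndLimit(jobGroupId: Option[String], conf: SparkConf): (String, Int) = {
  val configured = jobGroupId.flatMap { id =>
    conf.getOption(s"spark.job.$id.maxConcurrentTasks").map(limit => (id, limit.toInt))
  }
  configured.getOrElse((DefaultJobGroup, Int.MaxValue))
}
```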


---



[GitHub] spark pull request #18950: [SPARK-20589][Core][Scheduler] Allow limiting tas...

2017-08-22 Thread dhruve
Github user dhruve commented on a diff in the pull request:

https://github.com/apache/spark/pull/18950#discussion_r134608326
  
--- Diff: 
core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala ---
@@ -727,6 +780,68 @@ private[spark] class ExecutorAllocationManager(
 }
 
 /**
+ * Calculate the maximum no. of concurrent tasks that can run 
currently.
+ */
+def getMaxConTasks(): Int = {
+  // We can limit the no. of concurrent tasks by a job group and 
multiple jobs can run with
--- End diff --

Okay. I will update the comment to make it clearer.


---



[GitHub] spark issue #18950: [SPARK-20589][Core][Scheduler] Allow limiting task concu...

2017-08-22 Thread dhruve
Github user dhruve commented on the issue:

https://github.com/apache/spark/pull/18950
  
@squito I will pull the test from the latest master and update it with the 
changes we made.


---



[GitHub] spark issue #18950: [SPARK-20589][Core][Scheduler] Allow limiting task concu...

2017-08-21 Thread dhruve
Github user dhruve commented on the issue:

https://github.com/apache/spark/pull/18950
  
@squito @markhamstra I addressed the comments and have made the changes to 
account for running different job groups concurrently.


---



[GitHub] spark pull request #18950: [SPARK-20589][Core][Scheduler] Allow limiting tas...

2017-08-21 Thread dhruve
GitHub user dhruve reopened a pull request:

https://github.com/apache/spark/pull/18950

[SPARK-20589][Core][Scheduler] Allow limiting task concurrency per job group

## What changes were proposed in this pull request?
This change allows the user to specify the maximum no. of tasks running in 
a given job group. (Kindly see the jira comments section for more context on 
why this is implemented at a job group level rather than a stage level). This 
change is beneficial where the user wants to avoid having a DoS while trying to 
access an external service from multiple executors without having the need to 
repartition or coalesce existing RDDs.

This code change introduces a new user level configuration: 
`spark.job.[userJobGroup].maxConcurrentTasks` which is used to set the active 
no. of tasks executing at a given point in time.

The user can use the feature by setting the appropriate jobGroup and 
passing the conf:

```
conf.set("spark.job.group1.maxConcurrentTasks", "10")
...
sc.setJobGroup("group1", "", false)
sc.parallelize(1 to 10, 10).map(x => x + 1).count
sc.clearJobGroup
```

 changes proposed in this fix 
This change limits the no. of tasks (in turn also the no. of executors to 
be acquired) that can run simultaneously in a given job group and its 
subsequent job/s and stage/s if the appropriate job group and max concurrency 
configs are set.

## How was this patch tested?
Ran unit tests and multiple manual tests with various combinations of:
- single/multiple/no job groups
- executors with single/multi cores
- dynamic allocation on/off


You can merge this pull request into a Git repository by running:

$ git pull https://github.com/dhruve/spark impr/SPARK-20589

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/spark/pull/18950.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #18950


commit 824396c82977171c38ab5d7f6c0f84bc19eccaba
Author: Dhruve Ashar <dhruveas...@gmail.com>
Date:   2017-08-15T14:18:21Z

[SPARK-20589] Allow limiting task concurrency per stage

commit d3f8162dab4ca7065d7f296fd03528ce6ddfb923
Author: Dhruve Ashar <dhruveas...@gmail.com>
Date:   2017-08-15T14:45:18Z

Merge branch 'master' of github.com:apache/spark into impr/SPARK-20589

commit 824621286ffb107010409c4d0d3442550628247d
Author: Dhruve Ashar <dhruveas...@gmail.com>
Date:   2017-08-21T16:51:41Z

Allow limiting task concurrency per stage in concurrent job groups

commit 517acb490ae5938a22c4175347f6bbc24b47781f
Author: Dhruve Ashar <dhruveas...@gmail.com>
Date:   2017-08-21T19:30:17Z

Remove comment




---



[GitHub] spark pull request #18950: [SPARK-20589][Core][Scheduler] Allow limiting tas...

2017-08-21 Thread dhruve
Github user dhruve closed the pull request at:

https://github.com/apache/spark/pull/18950


---



[GitHub] spark pull request #18950: [SPARK-20589][Core][Scheduler] Allow limiting tas...

2017-08-16 Thread dhruve
Github user dhruve commented on a diff in the pull request:

https://github.com/apache/spark/pull/18950#discussion_r133547976
  
--- Diff: 
core/src/test/scala/org/apache/spark/ExecutorAllocationManagerSuite.scala ---
@@ -188,6 +188,125 @@ class ExecutorAllocationManagerSuite
 assert(numExecutorsTarget(manager) === 10)
   }
 
+  test("add executors capped by max concurrent tasks for a job group with 
single core executors") {
+val conf = new SparkConf()
+  .setMaster("myDummyLocalExternalClusterManager")
+  .setAppName("test-executor-allocation-manager")
+  .set("spark.dynamicAllocation.enabled", "true")
+  .set("spark.dynamicAllocation.testing", "true")
+  .set("spark.job.group1.maxConcurrentTasks", "2")
+  .set("spark.job.group2.maxConcurrentTasks", "5")
+val sc = new SparkContext(conf)
+contexts += sc
+sc.setJobGroup("group1", "", false)
+
+val manager = sc.executorAllocationManager.get
+val stage0 = createStageInfo(0, 10)
+// Submit the job and stage start/submit events
+sc.listenerBus.postToAll(SparkListenerJobStart(0, 0, Seq{stage0}, 
sc.getLocalProperties))
+sc.listenerBus.postToAll(SparkListenerStageSubmitted(stage0))
+
+// Verify that we're capped at number of max concurrent tasks in the 
stage
+assert(maxNumExecutorsNeeded(manager) === 2)
+
+// Submit another stage in the same job
+val stage1 = createStageInfo(1, 10)
+sc.listenerBus.postToAll(SparkListenerStageSubmitted(stage1))
+assert(maxNumExecutorsNeeded(manager) === 2)
+
+sc.listenerBus.postToAll(SparkListenerStageCompleted(stage0))
+sc.listenerBus.postToAll(SparkListenerStageCompleted(stage1))
+sc.listenerBus.postToAll(SparkListenerJobEnd(0, 10, JobSucceeded))
+
+// Submit a new job in the same job group
+val stage2 = createStageInfo(2, 20)
+sc.listenerBus.postToAll(SparkListenerJobStart(1, 0, Seq{stage2}, 
sc.getLocalProperties))
+sc.listenerBus.postToAll(SparkListenerStageSubmitted(stage2))
+assert(maxNumExecutorsNeeded(manager) === 2)
+
+sc.listenerBus.postToAll(SparkListenerStageCompleted(stage2))
+sc.listenerBus.postToAll(SparkListenerJobEnd(1, 10, JobSucceeded))
+
+// Set another jobGroup
+sc.setJobGroup("group2", "", false)
+
+val stage3 = createStageInfo(3, 20)
+sc.listenerBus.postToAll(SparkListenerJobStart(2, 0, Seq{stage3}, 
sc.getLocalProperties))
+sc.listenerBus.postToAll(SparkListenerStageSubmitted(stage3))
+assert(maxNumExecutorsNeeded(manager) === 5)
+
+sc.listenerBus.postToAll(SparkListenerStageCompleted(stage3))
+sc.listenerBus.postToAll(SparkListenerJobEnd(2, 10, JobSucceeded))
+
+// Clear jobGroup
+sc.clearJobGroup()
+
+val stage4 = createStageInfo(4, 50)
+sc.listenerBus.postToAll(SparkListenerJobStart(2, 0, Seq{stage4}, 
sc.getLocalProperties))
+sc.listenerBus.postToAll(SparkListenerStageSubmitted(stage4))
+assert(maxNumExecutorsNeeded(manager) === 50)
+  }
+
+  test("add executors capped by max concurrent tasks for a job group with 
multi cores executors") {
+val conf = new SparkConf()
+  .setMaster("myDummyLocalExternalClusterManager")
+  .setAppName("test-executor-allocation-manager")
+  .set("spark.dynamicAllocation.enabled", "true")
+  .set("spark.dynamicAllocation.testing", "true")
+  .set("spark.job.group1.maxConcurrentTasks", "2")
+  .set("spark.job.group2.maxConcurrentTasks", "5")
+  .set("spark.executor.cores", "3")
+val sc = new SparkContext(conf)
+contexts += sc
+sc.setJobGroup("group1", "", false)
+
+val manager = sc.executorAllocationManager.get
+val stage0 = createStageInfo(0, 10)
+// Submit the job and stage start/submit events
+sc.listenerBus.postToAll(SparkListenerJobStart(0, 0, Seq{stage0}, 
sc.getLocalProperties))
+sc.listenerBus.postToAll(SparkListenerStageSubmitted(stage0))
+
+// Verify that we're capped at number of max concurrent tasks in the 
stage
+assert(maxNumExecutorsNeeded(manager) === 1)
+
+// Submit another stage in the same job
+val stage1 = createStageInfo(1, 10)
+sc.listenerBus.postToAll(SparkListenerStageSubmitted(stage1))
+assert(maxNumExecutorsNeeded(manager) === 1)
+
+sc.listenerBus.postToAll(SparkListenerStageCompleted(stage0))
+sc.listenerB

[GitHub] spark pull request #18950: [SPARK-20589][Core][Scheduler] Allow limiting tas...

2017-08-16 Thread dhruve
Github user dhruve commented on a diff in the pull request:

https://github.com/apache/spark/pull/18950#discussion_r133547780
  
--- Diff: 
core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala ---
@@ -454,64 +477,68 @@ private[spark] class TaskSetManager(
 }
   }
 
-  dequeueTask(execId, host, allowedLocality).map { case ((index, 
taskLocality, speculative)) =>
-// Found a task; do some bookkeeping and return a task description
-val task = tasks(index)
-val taskId = sched.newTaskId()
-// Do various bookkeeping
-copiesRunning(index) += 1
-val attemptNum = taskAttempts(index).size
-val info = new TaskInfo(taskId, index, attemptNum, curTime,
-  execId, host, taskLocality, speculative)
-taskInfos(taskId) = info
-taskAttempts(index) = info :: taskAttempts(index)
-// Update our locality level for delay scheduling
-// NO_PREF will not affect the variables related to delay 
scheduling
-if (maxLocality != TaskLocality.NO_PREF) {
-  currentLocalityIndex = getLocalityIndex(taskLocality)
-  lastLaunchTime = curTime
-}
-// Serialize and return the task
-val serializedTask: ByteBuffer = try {
-  ser.serialize(task)
-} catch {
-  // If the task cannot be serialized, then there's no point to 
re-attempt the task,
-  // as it will always fail. So just abort the whole task-set.
-  case NonFatal(e) =>
-val msg = s"Failed to serialize task $taskId, not attempting 
to retry it."
-logError(msg, e)
-abort(s"$msg Exception during serialization: $e")
-throw new TaskNotSerializableException(e)
-}
-if (serializedTask.limit > TaskSetManager.TASK_SIZE_TO_WARN_KB * 
1024 &&
-  !emittedTaskSizeWarning) {
-  emittedTaskSizeWarning = true
-  logWarning(s"Stage ${task.stageId} contains a task of very large 
size " +
-s"(${serializedTask.limit / 1024} KB). The maximum recommended 
task size is " +
-s"${TaskSetManager.TASK_SIZE_TO_WARN_KB} KB.")
-}
-addRunningTask(taskId)
-
-// We used to log the time it takes to serialize the task, but 
task size is already
-// a good proxy to task serialization time.
-// val timeTaken = clock.getTime() - startTime
-val taskName = s"task ${info.id} in stage ${taskSet.id}"
-logInfo(s"Starting $taskName (TID $taskId, $host, executor 
${info.executorId}, " +
-  s"partition ${task.partitionId}, $taskLocality, 
${serializedTask.limit} bytes)")
-
-sched.dagScheduler.taskStarted(task, info)
-new TaskDescription(
-  taskId,
-  attemptNum,
-  execId,
-  taskName,
-  index,
-  addedFiles,
-  addedJars,
-  task.localProperties,
-  serializedTask)
+  dequeueTask(execId, host, allowedLocality).map {
--- End diff --

Yes. Will fix it.


---



[GitHub] spark pull request #18950: [SPARK-20589][Core][Scheduler] Allow limiting tas...

2017-08-16 Thread dhruve
Github user dhruve commented on a diff in the pull request:

https://github.com/apache/spark/pull/18950#discussion_r133547673
  
--- Diff: 
core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala ---
@@ -602,6 +604,21 @@ private[spark] class ExecutorAllocationManager(
 // place the executors.
 private val stageIdToExecutorPlacementHints = new mutable.HashMap[Int, 
(Int, Map[String, Int])]
 
+override def onJobStart(jobStart: SparkListenerJobStart): Unit = {
+  val jobGroupId = if (jobStart.properties != null) {
+jobStart.properties.getProperty(SparkContext.SPARK_JOB_GROUP_ID)
+  } else {
+""
+  }
+  val maxConcurrentTasks = 
conf.getInt(s"spark.job.$jobGroupId.maxConcurrentTasks",
+Int.MaxValue)
+
+  logInfo(s"Setting maximum concurrent tasks for group: ${jobGroupId} 
to $maxConcurrentTasks")
+  allocationManager.synchronized {
+allocationManager.maxConcurrentTasks = maxConcurrentTasks
--- End diff --

@markhamstra Thanks for the feedback. I missed that the properties are 
serialized with each job, so we can have multiple job groups running simultaneously. I am 
working on changes to address your comments and will update the PR.
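
For reference, a small sketch of that scenario (thread and group names are 
illustrative, and `sc` is an already-created SparkContext): the job group is a 
thread-local property captured with each submitted job, so two threads can 
keep jobs from different groups in flight on the same context at the same time.

```
import org.apache.spark.SparkContext

// Illustration only: each thread sets its own job group before submitting
// work, so per-group limits would apply independently to the concurrent jobs.
def runInGroup(sc: SparkContext, group: String): Thread = new Thread(new Runnable {
  override def run(): Unit = {
    sc.setJobGroup(group, s"jobs for $group", false)
    sc.parallelize(1 to 100, 10).count()
  }
})

val threads = Seq(runInGroup(sc, "group1"), runInGroup(sc, "group2"))
threads.foreach(_.start())
threads.foreach(_.join())
```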


---



[GitHub] spark pull request #18950: [SPARK-20589][Core][Scheduler] Allow limiting tas...

2017-08-15 Thread dhruve
Github user dhruve commented on a diff in the pull request:

https://github.com/apache/spark/pull/18950#discussion_r133321125
  
--- Diff: 
core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala ---
@@ -602,6 +604,21 @@ private[spark] class ExecutorAllocationManager(
 // place the executors.
 private val stageIdToExecutorPlacementHints = new mutable.HashMap[Int, 
(Int, Map[String, Int])]
 
+override def onJobStart(jobStart: SparkListenerJobStart): Unit = {
+  val jobGroupId = if (jobStart.properties != null) {
+jobStart.properties.getProperty(SparkContext.SPARK_JOB_GROUP_ID)
+  } else {
+""
+  }
+  val maxConcurrentTasks = 
conf.getInt(s"spark.job.$jobGroupId.maxConcurrentTasks",
+Int.MaxValue)
+
+  logInfo(s"Setting maximum concurrent tasks for group: ${jobGroupId} 
to $maxConcurrentTasks")
+  allocationManager.synchronized {
+allocationManager.maxConcurrentTasks = maxConcurrentTasks
--- End diff --

I am sorry if the config name caused the confusion. The limit is per job 
group and not per job, so we could really name it 
`spark.jobGroup.[userJobGroup].maxConcurrentTasks`.

Also, Spark allows us to set only a single job group at any given point in 
time with a single SparkContext.


---



[GitHub] spark issue #18950: [SPARK-20589][Core][Scheduler] Allow limiting task concu...

2017-08-15 Thread dhruve
Github user dhruve commented on the issue:

https://github.com/apache/spark/pull/18950
  
@kayousterhout @squito Can you review this PR? Thanks.


---



[GitHub] spark pull request #18950: [SPARK-20589][Core][Scheduler] Allow limiting tas...

2017-08-15 Thread dhruve
GitHub user dhruve opened a pull request:

https://github.com/apache/spark/pull/18950

[SPARK-20589][Core][Scheduler] Allow limiting task concurrency per job group

## What changes were proposed in this pull request?
This change allows the user to specify the maximum no. of tasks running in 
a given job group. (Kindly see the jira comments section for more context on 
why this is implemented at a job group level rather than a stage level). This 
change is beneficial where the user wants to avoid having a DoS while trying to 
access an external service from multiple executors without having the need to 
repartition or coalesce existing RDDs.

This code change introduces a new user level configuration: 
`spark.job.[userJobGroup].maxConcurrentTasks` which is used to set the active 
no. of tasks executing at a given point in time.

The user can use the feature by setting the appropriate jobGroup and 
passing the conf:

`conf.set("spark.job.group1.maxConcurrentTasks", "10")`
`...`
`sc.setJobGroup("group1", "", false)`
`sc.parallelize(1 to 10, 10).map(x => x + 1).count`
`sc.clearJobGroup`


 changes proposed in this fix 
This change limits the no. of tasks (in turn also the no. of executors to 
be acquired) that can run simultaneously in a given job group and its 
subsequent job/s and stage/s if the appropriate job group and max concurrency 
configs are set.

## How was this patch tested?
Ran unit tests and multiple manual tests with various combinations of:
- single/multiple/no job groups
- executors with single/multi cores
- dynamic allocation on/off


You can merge this pull request into a Git repository by running:

$ git pull https://github.com/dhruve/spark impr/SPARK-20589

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/spark/pull/18950.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #18950


commit 824396c82977171c38ab5d7f6c0f84bc19eccaba
Author: Dhruve Ashar <dhruveas...@gmail.com>
Date:   2017-08-15T14:18:21Z

[SPARK-20589] Allow limiting task concurrency per stage

commit d3f8162dab4ca7065d7f296fd03528ce6ddfb923
Author: Dhruve Ashar <dhruveas...@gmail.com>
Date:   2017-08-15T14:45:18Z

Merge branch 'master' of github.com:apache/spark into impr/SPARK-20589




---



[GitHub] spark pull request #18691: [SPARK-21243][Core] Limit no. of map outputs in a...

2017-07-22 Thread dhruve
Github user dhruve closed the pull request at:

https://github.com/apache/spark/pull/18691


---



[GitHub] spark issue #18691: [SPARK-21243][Core] Limit no. of map outputs in a shuffl...

2017-07-21 Thread dhruve
Github user dhruve commented on the issue:

https://github.com/apache/spark/pull/18691
  
Thanks @tgravescs. Closing the PR.


---



[GitHub] spark pull request #18487: [SPARK-21243][Core] Limit no. of map outputs in a...

2017-07-21 Thread dhruve
Github user dhruve commented on a diff in the pull request:

https://github.com/apache/spark/pull/18487#discussion_r128793623
  
--- Diff: 
core/src/main/scala/org/apache/spark/storage/ShuffleBlockFetcherIterator.scala 
---
@@ -375,6 +390,7 @@ final class ShuffleBlockFetcherIterator(
   result match {
 case r @ SuccessFetchResult(blockId, address, size, buf, 
isNetworkReqDone) =>
   if (address != blockManager.blockManagerId) {
+numBlocksInFlightPerAddress(address) = 
numBlocksInFlightPerAddress(address) - 1
--- End diff --

@cloud-fan filed a JIRA for this => 
https://issues.apache.org/jira/browse/SPARK-21500


---



[GitHub] spark issue #18691: [SPARK-21243][Core] Limit no. of map outputs in a shuffl...

2017-07-21 Thread dhruve
Github user dhruve commented on the issue:

https://github.com/apache/spark/pull/18691
  
@cloud-fan @tgravescs I have resolved the merge conflicts for 2.2. They were just related to removing the extra configs.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #18487: [SPARK-21243][Core] Limit no. of map outputs in a shuffl...

2017-07-20 Thread dhruve
Github user dhruve commented on the issue:

https://github.com/apache/spark/pull/18487
  
@tgravescs Thanks for merging this. I have created a PR for 2.2: 
https://github.com/apache/spark/pull/18691

While resolving a merge conflict, I had to remove a couple of newer config 
entries that had landed in the meantime.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #18691: [SPARK-21243][Core] Limit no. of map outputs in a...

2017-07-20 Thread dhruve
GitHub user dhruve opened a pull request:

https://github.com/apache/spark/pull/18691

[SPARK-21243][Core] Limit no. of map outputs in a shuffle fetch

For configurations with external shuffle enabled, we have observed that if a 
very large number of blocks is fetched from a single remote host, it puts the 
NodeManager (NM) under extra pressure and can crash it. This change introduces 
a configuration, `spark.reducer.maxBlocksInFlightPerAddress`, to limit the 
number of map outputs being fetched from any given remote address. The change 
applies to both scenarios: external shuffle enabled as well as disabled.

I ran the job with the default configuration, which does not change the 
existing behavior, and with a few lower values (10, 20, 50, 100). The job ran 
fine and there was no change in the output. (I will update the NM-related 
metrics in some time.)

Author: Dhruve Ashar <dhruveas...@gmail.com>

Closes #18487 from dhruve/impr/SPARK-21243.
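
For reference, below is a minimal sketch of how the new limit can be set from 
application code. Only the config key comes from this change; the app name, 
master, and toy job are illustrative placeholders. The same key can also be 
passed with `--conf spark.reducer.maxBlocksInFlightPerAddress=20` on 
spark-submit.

```scala
// Minimal, self-contained example; names other than the config key are placeholders.
import org.apache.spark.{SparkConf, SparkContext}

object MaxBlocksInFlightExample {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf()
      .setAppName("max-blocks-in-flight-per-address-example")
      .setMaster("local[2]")
      // Cap the number of map output blocks fetched from any single remote address.
      .set("spark.reducer.maxBlocksInFlightPerAddress", "20")

    val sc = new SparkContext(conf)
    // A small shuffle so the reducer-side fetch path is exercised.
    val counts = sc.parallelize(1 to 1000, 100).map(x => (x % 10, 1)).reduceByKey(_ + _)
    println(counts.collect().toSeq)
    sc.stop()
  }
}
```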



You can merge this pull request into a Git repository by running:

$ git pull https://github.com/dhruve/spark branch-2.2

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/spark/pull/18691.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #18691


commit 036eb17d32c1b9b0f76ab9c0c4a10f6248728476
Author: Dhruve Ashar <dhruveas...@gmail.com>
Date:   2017-07-19T20:53:28Z

[SPARK-21243][Core] Limit no. of map outputs in a shuffle fetch

For configurations with external shuffle enabled, we have observed that if a 
very large number of blocks is fetched from a single remote host, it puts the 
NodeManager (NM) under extra pressure and can crash it. This change introduces 
a configuration, `spark.reducer.maxBlocksInFlightPerAddress`, to limit the 
number of map outputs being fetched from any given remote address. The change 
applies to both scenarios: external shuffle enabled as well as disabled.

I ran the job with the default configuration, which does not change the 
existing behavior, and with a few lower values (10, 20, 50, 100). The job ran 
fine and there was no change in the output. (I will update the NM-related 
metrics in some time.)

Author: Dhruve Ashar <dhruveas...@gmail.com>

Closes #18487 from dhruve/impr/SPARK-21243.




---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #18487: [SPARK-21243][Core] Limit no. of map outputs in a...

2017-07-20 Thread dhruve
Github user dhruve commented on a diff in the pull request:

https://github.com/apache/spark/pull/18487#discussion_r128572125
  
--- Diff: core/src/main/scala/org/apache/spark/storage/ShuffleBlockFetcherIterator.scala ---
@@ -375,6 +390,7 @@ final class ShuffleBlockFetcherIterator(
       result match {
         case r @ SuccessFetchResult(blockId, address, size, buf, isNetworkReqDone) =>
           if (address != blockManager.blockManagerId) {
+            numBlocksInFlightPerAddress(address) = numBlocksInFlightPerAddress(address) - 1
--- End diff --

That is a good point. In fact, we could also move the other bookkeeping right 
after the fetch result is enqueued.

I would also want to look at the initialization of the BlockFetchingListener 
to see the effect of this, as it would increase the size of the closure. Can 
we have a separate JIRA filed for this?
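
To make the closure concern concrete, here is a standalone sketch with made-up 
names (not the actual BlockFetchingListener API): decrementing where the 
result is consumed keeps the fetch callback small, while decrementing inside 
the callback means the closure also has to capture the bookkeeping map.

```scala
import scala.collection.mutable

// Standalone sketch only: simplified stand-ins, not the real ShuffleBlockFetcherIterator types.
object ClosureSizeSketch {
  case class Address(hostPort: String)

  val numBlocksInFlightPerAddress: mutable.Map[Address, Int] =
    mutable.Map[Address, Int]().withDefaultValue(0)

  // Option A (current shape): decrement when the fetch result is consumed by the iterator,
  // so the fetch callback does not need to know about the bookkeeping map.
  def onResultConsumed(address: Address): Unit = {
    numBlocksInFlightPerAddress(address) -= 1
  }

  // Option B (the suggestion above): decrement inside the per-request success callback;
  // the returned closure now captures numBlocksInFlightPerAddress as well as the address.
  def mkOnBlockFetchSuccess(address: Address): () => Unit =
    () => numBlocksInFlightPerAddress(address) -= 1
}
```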


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #18487: [SPARK-21243][Core] Limit no. of map outputs in a shuffl...

2017-07-20 Thread dhruve
Github user dhruve commented on the issue:

https://github.com/apache/spark/pull/18487
  
@cloud-fan I have replied to your comments.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #18487: [SPARK-21243][Core] Limit no. of map outputs in a...

2017-07-20 Thread dhruve
Github user dhruve commented on a diff in the pull request:

https://github.com/apache/spark/pull/18487#discussion_r128573651
  
--- Diff: core/src/main/scala/org/apache/spark/internal/config/package.scala ---
@@ -321,6 +321,17 @@ package object config {
       .intConf
       .createWithDefault(3)
 
+  private[spark] val REDUCER_MAX_BLOCKS_IN_FLIGHT_PER_ADDRESS =
+    ConfigBuilder("spark.reducer.maxBlocksInFlightPerAddress")
+      .doc("This configuration limits the number of remote blocks being fetched per reduce task" +
+        " from a given host port. When a large number of blocks are being requested from a given" +
+        " address in a single fetch or simultaneously, this could crash the serving executor or" +
+        " Node Manager. This is especially useful to reduce the load on the Node Manager when" +
--- End diff --

If the shuffle service fails, it can take down the Node Manager, which is more 
severe, hence I used that wording. And I have mentioned the external shuffle 
service in the following sentence. If it is not clear, I am okay with changing 
it.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #18487: [SPARK-21243][Core] Limit no. of map outputs in a...

2017-07-20 Thread dhruve
Github user dhruve commented on a diff in the pull request:

https://github.com/apache/spark/pull/18487#discussion_r128559318
  
--- Diff: core/src/main/scala/org/apache/spark/storage/ShuffleBlockFetcherIterator.scala ---
@@ -443,12 +459,57 @@ final class ShuffleBlockFetcherIterator(
   }
 
   private def fetchUpToMaxBytes(): Unit = {
-    // Send fetch requests up to maxBytesInFlight
-    while (fetchRequests.nonEmpty &&
-      (bytesInFlight == 0 ||
-        (reqsInFlight + 1 <= maxReqsInFlight &&
-          bytesInFlight + fetchRequests.front.size <= maxBytesInFlight))) {
-      sendRequest(fetchRequests.dequeue())
+    // Send fetch requests up to maxBytesInFlight. If you cannot fetch from a remote host
+    // immediately, defer the request until the next time it can be processed.
+
+    // Process any outstanding deferred fetch requests if possible.
+    if (deferredFetchRequests.nonEmpty) {
+      for ((remoteAddress, defReqQueue) <- deferredFetchRequests) {
+        while (isRemoteBlockFetchable(defReqQueue) &&
+            !isRemoteAddressMaxedOut(remoteAddress, defReqQueue.front)) {
+          val request = defReqQueue.dequeue()
+          logDebug(s"Processing deferred fetch request for $remoteAddress with "
+            + s"${request.blocks.length} blocks")
+          send(remoteAddress, request)
+          if (defReqQueue.isEmpty) {
+            deferredFetchRequests -= remoteAddress
+          }
+        }
+      }
+    }
+
+    // Process any regular fetch requests if possible.
+    while (isRemoteBlockFetchable(fetchRequests)) {
+      val request = fetchRequests.dequeue()
+      val remoteAddress = request.address
+      if (isRemoteAddressMaxedOut(remoteAddress, request)) {
+        logDebug(s"Deferring fetch request for $remoteAddress with ${request.blocks.size} blocks")
+        val defReqQueue = deferredFetchRequests.getOrElse(remoteAddress, new Queue[FetchRequest]())
+        defReqQueue.enqueue(request)
+        deferredFetchRequests(remoteAddress) = defReqQueue
--- End diff --

If this is the first time we are deferring a request for this address, 
`defReqQueue` has to be associated with its corresponding `remoteAddress`.
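
A tiny standalone sketch of that point, with simplified types rather than the 
Spark classes — `getOrElse` on a `mutable.Map` returns the default without 
inserting it, so the write-back is what ties a newly created queue to its 
address:

```scala
import scala.collection.mutable

// Simplified stand-ins (String address, Int request) rather than the real Spark types.
object DeferredQueueSketch {
  val deferredFetchRequests = mutable.Map[String, mutable.Queue[Int]]()

  def defer(remoteAddress: String, request: Int): Unit = {
    // getOrElse returns a fresh queue on the first deferral but does not store it,
    // so the explicit write-back below is what associates the queue with remoteAddress.
    val defReqQueue = deferredFetchRequests.getOrElse(remoteAddress, new mutable.Queue[Int]())
    defReqQueue.enqueue(request)
    deferredFetchRequests(remoteAddress) = defReqQueue
  }
}
```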


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #18487: [SPARK-21243][Core] Limit no. of map outputs in a...

2017-07-20 Thread dhruve
Github user dhruve commented on a diff in the pull request:

https://github.com/apache/spark/pull/18487#discussion_r128557269
  
--- Diff: core/src/main/scala/org/apache/spark/storage/ShuffleBlockFetcherIterator.scala ---
@@ -443,12 +459,57 @@ final class ShuffleBlockFetcherIterator(
   }
 
   private def fetchUpToMaxBytes(): Unit = {
-    // Send fetch requests up to maxBytesInFlight
-    while (fetchRequests.nonEmpty &&
-      (bytesInFlight == 0 ||
-        (reqsInFlight + 1 <= maxReqsInFlight &&
-          bytesInFlight + fetchRequests.front.size <= maxBytesInFlight))) {
-      sendRequest(fetchRequests.dequeue())
+    // Send fetch requests up to maxBytesInFlight. If you cannot fetch from a remote host
+    // immediately, defer the request until the next time it can be processed.
+
+    // Process any outstanding deferred fetch requests if possible.
+    if (deferredFetchRequests.nonEmpty) {
+      for ((remoteAddress, defReqQueue) <- deferredFetchRequests) {
+        while (isRemoteBlockFetchable(defReqQueue) &&
+            !isRemoteAddressMaxedOut(remoteAddress, defReqQueue.front)) {
+          val request = defReqQueue.dequeue()
+          logDebug(s"Processing deferred fetch request for $remoteAddress with "
+            + s"${request.blocks.length} blocks")
+          send(remoteAddress, request)
+          if (defReqQueue.isEmpty) {
+            deferredFetchRequests -= remoteAddress
--- End diff --

Otherwise we would unnecessarily iterate over all the block manager ids for 
which we deferred fetch requests at an earlier point, only to find that they 
have no pending fetch requests left.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #18487: [SPARK-21243][Core] Limit no. of map outputs in a...

2017-07-18 Thread dhruve
Github user dhruve commented on a diff in the pull request:

https://github.com/apache/spark/pull/18487#discussion_r127998722
  
--- Diff: core/src/main/scala/org/apache/spark/storage/ShuffleBlockFetcherIterator.scala ---
@@ -277,11 +290,13 @@ final class ShuffleBlockFetcherIterator(
           } else if (size < 0) {
             throw new BlockException(blockId, "Negative block size " + size)
           }
-          if (curRequestSize >= targetRequestSize) {
+          if (curRequestSize >= targetRequestSize ||
+              curBlocks.size >= maxBlocksInFlightPerAddress) {
--- End diff --

We are already doing it here => 
https://github.com/apache/spark/blob/master/core/src/main/scala/org/apache/spark/storage/ShuffleBlockFetcherIterator.scala#L330


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org


