[GitHub] spark issue #23223: [SPARK-26269][YARN]Yarnallocator should have same blackl...

2018-12-09 Thread Ngone51
Github user Ngone51 commented on the issue:

https://github.com/apache/spark/pull/23223
  
Hi @tgravescs , I tried it, but found it difficult to produce the
KILLED_BY_RESOURCEMANAGER exit status. I followed
[YARN-73](https://issues.apache.org/jira/browse/YARN-73) and
[YARN-495](https://issues.apache.org/jira/browse/YARN-495), but things didn't
go as I expected.


---




[GitHub] spark issue #23223: [SPARK-26269][YARN]Yarnallocator should have same blackl...

2018-12-06 Thread Ngone51
Github user Ngone51 commented on the issue:

https://github.com/apache/spark/pull/23223
  
> it would be interesting to test it further to see if it does.

@tgravescs  Yeah, I have the same thought. I'd like to try it, but I can't
guarantee that I'll succeed, because I have never done this kind of test
before. I'll try my best.


---




[GitHub] spark pull request #23223: [SPARK-26269][YARN]Yarnallocator should have same...

2018-12-05 Thread Ngone51
Github user Ngone51 commented on a diff in the pull request:

https://github.com/apache/spark/pull/23223#discussion_r239341126
  
--- Diff: 
resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocator.scala
 ---
@@ -612,11 +612,14 @@ private[yarn] class YarnAllocator(
 val message = "Container killed by YARN for exceeding physical 
memory limits. " +
   s"$diag Consider boosting ${EXECUTOR_MEMORY_OVERHEAD.key}."
 (true, message)
+  case exit_status if 
NOT_APP_AND_SYSTEM_FAULT_EXIT_STATUS.contains(exit_status) =>
--- End diff --

Updated. But I'm not sure about this part:

> That way values like ContainerExitStatus.SUCCESS from the set would be
really used.

@attilapiros 
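
For context, a rough sketch of how that set-based branch could look if
`ContainerExitStatus.SUCCESS` were kept in the set. The set's exact contents
here are an assumption for illustration, not the PR's actual definition:

```
import org.apache.hadoop.yarn.api.records.ContainerExitStatus

// Hypothetical sketch: exit statuses that are neither the application's nor
// the node's fault, so they should not count towards node blacklisting.
// SUCCESS is included only to illustrate the reviewer's point that the match
// arm would then really consult it.
val NOT_APP_AND_SYSTEM_FAULT_EXIT_STATUS = Set(
  ContainerExitStatus.SUCCESS,
  ContainerExitStatus.KILLED_BY_RESOURCEMANAGER,
  ContainerExitStatus.KILLED_BY_APPMASTER,
  ContainerExitStatus.KILLED_AFTER_APP_COMPLETION,
  ContainerExitStatus.ABORTED,
  ContainerExitStatus.DISKS_FAILED)

def isNotAppOrSystemFault(exitStatus: Int): Boolean =
  NOT_APP_AND_SYSTEM_FAULT_EXIT_STATUS.contains(exitStatus)
```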


---




[GitHub] spark pull request #23223: [SPARK-26269][YARN]Yarnallocator should have same...

2018-12-05 Thread Ngone51
Github user Ngone51 commented on a diff in the pull request:

https://github.com/apache/spark/pull/23223#discussion_r239316608
  
--- Diff: 
resource-managers/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnAllocatorSuite.scala
 ---
@@ -417,4 +426,59 @@ class YarnAllocatorSuite extends SparkFunSuite with 
Matchers with BeforeAndAfter
 clock.advance(50 * 1000L)
 handler.getNumExecutorsFailed should be (0)
   }
+
+  test("SPARK-26296: YarnAllocator should have same blacklist behaviour 
with YARN") {
+val rmClientSpy = spy(rmClient)
+val maxExecutors = 11
+
+val handler = createAllocator(
+  maxExecutors,
+  rmClientSpy,
+  Map(
+"spark.yarn.blacklist.executor.launch.blacklisting.enabled" -> 
"true",
+"spark.blacklist.application.maxFailedExecutorsPerNode" -> "0"))
+handler.updateResourceRequests()
+
+val hosts = (0 until maxExecutors).map(i => s"host$i")
+val ids = (0 to maxExecutors).map(i => 
ContainerId.newContainerId(appAttemptId, i))
+val containers = createContainers(hosts, ids)
+handler.handleAllocatedContainers(containers.slice(0, 9))
+val cs0 = ContainerStatus.newInstance(containers(0).getId, 
ContainerState.COMPLETE,
+  "success", ContainerExitStatus.SUCCESS)
+val cs1 = ContainerStatus.newInstance(containers(1).getId, 
ContainerState.COMPLETE,
+  "preempted", ContainerExitStatus.PREEMPTED)
+val cs2 = ContainerStatus.newInstance(containers(2).getId, 
ContainerState.COMPLETE,
+  "killed_exceeded_vmem", ContainerExitStatus.KILLED_EXCEEDED_VMEM)
+val cs3 = ContainerStatus.newInstance(containers(3).getId, 
ContainerState.COMPLETE,
+  "killed_exceeded_pmem", ContainerExitStatus.KILLED_EXCEEDED_PMEM)
+val cs4 = ContainerStatus.newInstance(containers(4).getId, 
ContainerState.COMPLETE,
+  "killed_by_resourcemanager", 
ContainerExitStatus.KILLED_BY_RESOURCEMANAGER)
+val cs5 = ContainerStatus.newInstance(containers(5).getId, 
ContainerState.COMPLETE,
+  "killed_by_appmaster", ContainerExitStatus.KILLED_BY_APPMASTER)
+val cs6 = ContainerStatus.newInstance(containers(6).getId, 
ContainerState.COMPLETE,
+  "killed_after_app_completion", 
ContainerExitStatus.KILLED_AFTER_APP_COMPLETION)
+val cs7 = ContainerStatus.newInstance(containers(7).getId, 
ContainerState.COMPLETE,
+  "aborted", ContainerExitStatus.ABORTED)
+val cs8 = ContainerStatus.newInstance(containers(8).getId, 
ContainerState.COMPLETE,
+  "disk_failed", ContainerExitStatus.DISKS_FAILED)
--- End diff --

Nice suggestion!
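
The suggestion itself isn't shown in this digest, but assuming it was about
building the nine completed `ContainerStatus` values more concisely, a sketch
could look like the following (it reuses `containers` from the test above; the
PR's actual code may differ):

```
import org.apache.hadoop.yarn.api.records.{ContainerExitStatus, ContainerState, ContainerStatus}

// Hypothetical sketch: derive the completed ContainerStatus values from a list
// of (diagnostics, exitStatus) pairs instead of spelling out cs0..cs8.
val exitStatuses = Seq(
  "success" -> ContainerExitStatus.SUCCESS,
  "preempted" -> ContainerExitStatus.PREEMPTED,
  "killed_exceeded_vmem" -> ContainerExitStatus.KILLED_EXCEEDED_VMEM,
  "killed_exceeded_pmem" -> ContainerExitStatus.KILLED_EXCEEDED_PMEM,
  "killed_by_resourcemanager" -> ContainerExitStatus.KILLED_BY_RESOURCEMANAGER,
  "killed_by_appmaster" -> ContainerExitStatus.KILLED_BY_APPMASTER,
  "killed_after_app_completion" -> ContainerExitStatus.KILLED_AFTER_APP_COMPLETION,
  "aborted" -> ContainerExitStatus.ABORTED,
  "disk_failed" -> ContainerExitStatus.DISKS_FAILED)

val statuses = exitStatuses.zipWithIndex.map { case ((diag, exit), i) =>
  ContainerStatus.newInstance(containers(i).getId, ContainerState.COMPLETE, diag, exit)
}
```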


---




[GitHub] spark pull request #23223: [SPARK-26269][YARN]Yarnallocator should have same...

2018-12-05 Thread Ngone51
Github user Ngone51 commented on a diff in the pull request:

https://github.com/apache/spark/pull/23223#discussion_r239316424
  
--- Diff: 
resource-managers/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnAllocatorSuite.scala
 ---
@@ -114,13 +116,20 @@ class YarnAllocatorSuite extends SparkFunSuite with 
Matchers with BeforeAndAfter
   clock)
   }
 
-  def createContainer(host: String, resource: Resource = 
containerResource): Container = {
-val containerId = ContainerId.newContainerId(appAttemptId, 
containerNum)
+  def createContainer(
+  host: String,
+  containerId: ContainerId = ContainerId.newContainerId(appAttemptId, 
containerNum),
--- End diff --

Good idea.
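
For context, a minimal sketch of what the `createContainers` helper used in the
test above might look like on top of this refactored `createContainer`
signature (the PR's actual helper may differ):

```
import org.apache.hadoop.yarn.api.records.{Container, ContainerId}

// Hypothetical sketch: build one container per (host, id) pair by delegating
// to the suite's createContainer(host, containerId, ...) shown in the diff.
def createContainers(hosts: Seq[String], ids: Seq[ContainerId]): Seq[Container] = {
  hosts.zip(ids).map { case (host, id) => createContainer(host, id) }
}
```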


---




[GitHub] spark issue #23223: [SPARK-26269][YARN]Yarnallocator should have same blackl...

2018-12-05 Thread Ngone51
Github user Ngone51 commented on the issue:

https://github.com/apache/spark/pull/23223
  
> it looks like its only going to blacklist the node for the AM, not other 
nodes for general containers.

@squito YARN has blacklisting for the AM when the config
`am-scheduling.node-blacklisting-enabled` is set to true, and has a
`ContainerFailureTracker` for general containers (I haven't found a config for
it).


---




[GitHub] spark issue #23223: [SPARK-26269][YARN]Yarnallocator should have same blackl...

2018-12-05 Thread Ngone51
Github user Ngone51 commented on the issue:

https://github.com/apache/spark/pull/23223
  
> Are you seeing actual issues with this blacklisting when it shouldn't?

Unfortunately, no. @tgravescs @squito 


---




[GitHub] spark issue #23223: [SPARK-26269][YARN]Yarnallocator should have same blackl...

2018-12-05 Thread Ngone51
Github user Ngone51 commented on the issue:

https://github.com/apache/spark/pull/23223
  
> I mean if node blacklisting in Spark would be perfectly aligned to YARN 
then it would be just redundant to have it in Spark in the first place. 

This change does seem to align Spark's node blacklisting *perfectly* with YARN,
but my original thought was that some exit statuses (e.g.
KILLED_BY_RESOURCEMANAGER) currently should not lead to node blacklisting. So
*perfectly* aligning with YARN is not actually the real target of this change,
and we could also build a custom strategy for Spark.

> Take for example disk failure.

For Spark task-level blacklisting, shouldn't that be delegated to the
**schedulerBlacklist** in YarnAllocatorBlacklistTracker?

And it seems ContainerExitStatus.DISKS_FAILED in YARN is not the same thing as
a disk failure in a Spark task.


---




[GitHub] spark issue #23223: Yarnallocator should have same blacklist behaviour with ...

2018-12-04 Thread Ngone51
Github user Ngone51 commented on the issue:

https://github.com/apache/spark/pull/23223
  
Ping @attilapiros @vanzin @jerryshao for review.


---




[GitHub] spark pull request #23223: Yarnallocator should have same blacklist behaviou...

2018-12-04 Thread Ngone51
GitHub user Ngone51 opened a pull request:

https://github.com/apache/spark/pull/23223

YarnAllocator should have the same blacklist behaviour as YARN to maximize use
of cluster resources

## What changes were proposed in this pull request?

As I mentioned in JIRA
[SPARK-26269](https://issues.apache.org/jira/browse/SPARK-26269), in order to
maximize the use of cluster resources, this PR tries to make `YarnAllocator`
have the same blacklist behaviour as YARN.

## How was this patch tested?

Added a unit test in `YarnAllocatorSuite`.

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/Ngone51/spark 
dev-YarnAllocator-should-have-same-blacklist-behaviour-with-YARN

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/spark/pull/23223.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #23223


commit 9f88e1c22876e4cdb1a0a6e952930e76f3206e96
Author: wuyi 
Date:   2018-12-04T16:17:35Z

YarnAllocator should have same blacklist behaviour with YARN

commit 65a70dcbb7993731104deab2592a5b969a31414e
Author: Ngone51 
Date:   2018-12-05T06:11:06Z

fix ut




---




[GitHub] spark pull request #20269: [SPARK-23029] [DOCS] Specifying default units of ...

2018-12-02 Thread Ngone51
Github user Ngone51 commented on a diff in the pull request:

https://github.com/apache/spark/pull/20269#discussion_r238135789
  
--- Diff: 
core/src/main/scala/org/apache/spark/internal/config/package.scala ---
@@ -38,10 +38,13 @@ package object config {
 
ConfigBuilder("spark.driver.userClassPathFirst").booleanConf.createWithDefault(false)
 
   private[spark] val DRIVER_MEMORY = ConfigBuilder("spark.driver.memory")
+.doc("Amount of memory to use for the driver process, in MiB unless 
otherwise specified.")
 .bytesConf(ByteUnit.MiB)
 .createWithDefaultString("1g")
 
   private[spark] val DRIVER_MEMORY_OVERHEAD = 
ConfigBuilder("spark.driver.memoryOverhead")
+.doc("The amount of off-heap memory to be allocated per driver in 
cluster mode, " +
--- End diff --

Hi @ferdonline, can you explain why this is **off-heap** memory?


---




[GitHub] spark issue #22288: [SPARK-22148][SPARK-15815][Scheduler] Acquire new execut...

2018-09-13 Thread Ngone51
Github user Ngone51 commented on the issue:

https://github.com/apache/spark/pull/22288
  
(I'm on a trip these days, so I have to type this on my mobile phone. Sorry for
the formatting.)

> Is this the same as the current pr, but just killing only if idle?

Yes, similar. This avoids a TaskSet waiting indefinitely to be scheduled. So,
in case 2, if we do not find an idle executor before the timeout, the TaskSet
would abort rather than hang.

> But in case 2, you'd probably end up requesting a whole bunch more 
executors very briefly, until there are enough failures on one specific task. 
or maybe we can ensure that even if there are a huge number of unschedulable 
tasks, we only ever request one extra executor?

I'm not sure I have understood this part completely. But I realized that, for
now, our DA strategy is basically based on tasks' status, e.g. pending,
speculative. However, whether an executor gets blacklisted depends on a
successful TaskSet's status (IIRC). So this may introduce a level mismatch when
we want to introduce DA into TaskSchedulerImpl. (Hope I understood your main
point.)




---




[GitHub] spark issue #22288: [SPARK-22148][SPARK-15815][Scheduler] Acquire new execut...

2018-09-11 Thread Ngone51
Github user Ngone51 commented on the issue:

https://github.com/apache/spark/pull/22288
  
As I mentioned at
https://github.com/apache/spark/pull/22288#discussion_r216874530, I'm quite
worried about this killing behaviour. I think we should kill an executor only
if it is idle.

Looking through the discussion above, my thoughts are below:

* with dynamic allocation

Maybe we can add an `onTaskCompletelyBlacklisted()` method to the DA manager's
`Listener` and pass e.g. a `TaskCompletelyBlacklistedEvent` to it. Thus, the DA
manager will allocate a new executor for us (see the sketch after this list).

* with static allocation

Set `spark.scheduler.unschedulableTaskSetTimeout` for a `TaskSet`. If a task is
completely blacklisted, kill some executors only if they're idle (maybe, taking
executors' allocation time into account here, we should increase the timeout
upper bound a little for this `TaskSet`). Then wait until the task is scheduled
or the timeout expires.
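
A minimal sketch of the dynamic-allocation idea (all names here are
hypothetical, not existing Spark API):

```
// Hypothetical sketch of the proposal above: a listener callback fired when a
// task is blacklisted on every currently allocated executor, so the dynamic
// allocation manager can request one extra executor.
case class TaskCompletelyBlacklistedEvent(stageId: Int, taskIndex: Int)

trait DAManagerListener {
  def onTaskCompletelyBlacklisted(event: TaskCompletelyBlacklistedEvent): Unit
}

class DynamicAllocationHook(requestExtraExecutor: () => Boolean)
    extends DAManagerListener {
  override def onTaskCompletelyBlacklisted(event: TaskCompletelyBlacklistedEvent): Unit = {
    // Ask the cluster manager for one more executor so the completely
    // blacklisted task has somewhere new to run.
    requestExtraExecutor()
  }
}
```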


---




[GitHub] spark pull request #22288: [SPARK-22148][SPARK-15815][Scheduler] Acquire new...

2018-09-11 Thread Ngone51
Github user Ngone51 commented on a diff in the pull request:

https://github.com/apache/spark/pull/22288#discussion_r216874530
  
--- Diff: 
core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala ---
@@ -414,9 +425,48 @@ private[spark] class TaskSchedulerImpl(
 launchedAnyTask |= launchedTaskAtCurrentMaxLocality
   } while (launchedTaskAtCurrentMaxLocality)
 }
+
 if (!launchedAnyTask) {
-  taskSet.abortIfCompletelyBlacklisted(hostToExecutors)
-}
+  taskSet.getCompletelyBlacklistedTaskIfAny(hostToExecutors) match 
{
+case taskIndex: Some[Int] => // Returns the taskIndex which 
was unschedulable
+
+  // If the taskSet is unschedulable we kill an existing 
blacklisted executor/s and
+  // kick off an abortTimer which after waiting will abort the 
taskSet if we were
+  // unable to schedule any task from the taskSet.
+  // Note: We keep a track of schedulability on a per taskSet 
basis rather than on a
+  // per task basis.
+  val executor = 
hostToExecutors.valuesIterator.next().iterator.next()
--- End diff --

I'm wondering whether it is worth killing an executor which has tasks running
on it? After all, a task blacklisted on all (currently allocated) executors
cannot be guaranteed to run on a newly allocated executor.


---




[GitHub] spark pull request #18492: [SPARK-19326] Speculated task attempts do not get...

2018-09-11 Thread Ngone51
Github user Ngone51 commented on a diff in the pull request:

https://github.com/apache/spark/pull/18492#discussion_r216597619
  
--- Diff: 
core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala ---
@@ -373,8 +373,14 @@ private[spark] class ExecutorAllocationManager(
 // If our target has not changed, do not send a message
 // to the cluster manager and reset our exponential growth
 if (delta == 0) {
-  numExecutorsToAdd = 1
-  return 0
+  // Check if there is any speculative jobs pending
+  if (listener.pendingTasks == 0 && listener.pendingSpeculativeTasks > 
0) {
+numExecutorsTarget =
+  math.max(math.min(maxNumExecutorsNeeded + 1, maxNumExecutors), 
minNumExecutors)
--- End diff --

Also confused by the `+1` here. And I think we have already taken
`pendingSpeculativeTasks` into account @advancedxy :
```
def totalPendingTasks(): Int = {
  pendingTasks + pendingSpeculativeTasks
}
```
So this check seems redundant.
And it doesn't sync to the cluster manager if `numExecutorsTarget` changes (after the `+1`).


---




[GitHub] spark pull request #22288: [SPARK-22148][Scheduler] Acquire new executors to...

2018-09-03 Thread Ngone51
Github user Ngone51 commented on a diff in the pull request:

https://github.com/apache/spark/pull/22288#discussion_r214720097
  
--- Diff: 
core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala ---
@@ -414,9 +425,54 @@ private[spark] class TaskSchedulerImpl(
 launchedAnyTask |= launchedTaskAtCurrentMaxLocality
   } while (launchedTaskAtCurrentMaxLocality)
 }
+
 if (!launchedAnyTask) {
-  taskSet.abortIfCompletelyBlacklisted(hostToExecutors)
-}
+  taskSet.getCompletelyBlacklistedTaskIfAny(hostToExecutors) match 
{
+case taskIndex: Some[Int] => // Returns the taskIndex which 
was unschedulable
+  if (conf.getBoolean("spark.dynamicAllocation.enabled", 
false)) {
+// If the taskSet is unschedulable we kill the existing 
blacklisted executor/s and
+// kick off an abortTimer which after waiting will abort 
the taskSet if we were
+// unable to get new executors and couldn't schedule a 
task from the taskSet.
+// Note: We keep a track of schedulability on a per 
taskSet basis rather than on a
+// per task basis.
+if (!unschedulableTaskSetToExpiryTime.contains(taskSet)) {
+  hostToExecutors.valuesIterator.foreach(executors => 
executors.foreach({
+executor =>
+  logDebug("Killing executor because of task 
unschedulability: " + executor)
+  blacklistTrackerOpt.foreach(blt => 
blt.killBlacklistedExecutor(executor))
+  })
+  )
+  unschedulableTaskSetToExpiryTime(taskSet) = 
clock.getTimeMillis()
+  abortTimer.schedule(new TimerTask() {
+override def run() {
+  if 
(unschedulableTaskSetToExpiryTime.contains(taskSet) &&
+(unschedulableTaskSetToExpiryTime(taskSet)
+  + UNSCHEDULABLE_TASKSET_TIMEOUT_MS)
+  <= clock.getTimeMillis()
+  ) {
+logInfo("Cannot schedule any task because of 
complete blacklisting. " +
+  "Wait time for scheduling expired. Aborting the 
application.")
+
taskSet.abortSinceCompletelyBlacklisted(taskIndex.get)
+  } else {
+this.cancel()
+  }
+}
+  }, UNSCHEDULABLE_TASKSET_TIMEOUT_MS)
+}
+  } else {
+// TODO: try acquiring new executors for static allocation 
before aborting.
--- End diff --

How? By waiting for other tasks to finish and release their resources?


---




[GitHub] spark pull request #22288: [SPARK-22148][Scheduler] Acquire new executors to...

2018-09-03 Thread Ngone51
Github user Ngone51 commented on a diff in the pull request:

https://github.com/apache/spark/pull/22288#discussion_r214719743
  
--- Diff: 
core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala ---
@@ -414,9 +425,54 @@ private[spark] class TaskSchedulerImpl(
 launchedAnyTask |= launchedTaskAtCurrentMaxLocality
   } while (launchedTaskAtCurrentMaxLocality)
 }
+
 if (!launchedAnyTask) {
-  taskSet.abortIfCompletelyBlacklisted(hostToExecutors)
-}
+  taskSet.getCompletelyBlacklistedTaskIfAny(hostToExecutors) match 
{
+case taskIndex: Some[Int] => // Returns the taskIndex which 
was unschedulable
+  if (conf.getBoolean("spark.dynamicAllocation.enabled", 
false)) {
+// If the taskSet is unschedulable we kill the existing 
blacklisted executor/s and
+// kick off an abortTimer which after waiting will abort 
the taskSet if we were
+// unable to get new executors and couldn't schedule a 
task from the taskSet.
+// Note: We keep a track of schedulability on a per 
taskSet basis rather than on a
+// per task basis.
+if (!unschedulableTaskSetToExpiryTime.contains(taskSet)) {
+  hostToExecutors.valuesIterator.foreach(executors => 
executors.foreach({
+executor =>
+  logDebug("Killing executor because of task 
unschedulability: " + executor)
+  blacklistTrackerOpt.foreach(blt => 
blt.killBlacklistedExecutor(executor))
--- End diff --

Seriously? You kill all the executors? What if other taskSets' tasks are
running on them?

BTW, if you want to refresh executors, you also have to enable
`spark.blacklist.killBlacklistedExecutors`.
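
A hedged sketch of the alternative being suggested here, i.e. only killing
executors that are currently idle. The helpers are passed in as parameters to
keep the sketch self-contained; they stand in for the scheduler and blacklist
tracker calls used in the diff above, and are not necessarily what this PR ends
up using:

```
// Hypothetical sketch: only kill executors that are idle, rather than every
// executor currently known to the scheduler.
def killIdleExecutors(
    hostToExecutors: Map[String, Set[String]],
    isExecutorBusy: String => Boolean,
    killBlacklistedExecutor: String => Unit): Unit = {
  hostToExecutors.valuesIterator.flatten
    .filterNot(isExecutorBusy)       // skip executors with running tasks
    .foreach(killBlacklistedExecutor)
}
```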


---




[GitHub] spark issue #22202: [SPARK-25211][Core] speculation and fetch failed result ...

2018-08-23 Thread Ngone51
Github user Ngone51 commented on the issue:

https://github.com/apache/spark/pull/22202
  
Since `stage 1` is only a `ShuffleMapStage`, why are there no other child
stages to be submitted?


---




[GitHub] spark pull request #22163: [SPARK-25166][CORE]Reduce the number of write ope...

2018-08-22 Thread Ngone51
Github user Ngone51 commented on a diff in the pull request:

https://github.com/apache/spark/pull/22163#discussion_r212167438
  
--- Diff: 
core/src/main/java/org/apache/spark/shuffle/sort/ShuffleExternalSorter.java ---
@@ -206,14 +211,21 @@ private void writeSortedFile(boolean isLastFile) {
   long recordReadPosition = recordOffsetInPage + uaoSize; // skip over 
record length
   while (dataRemaining > 0) {
 final int toTransfer = Math.min(diskWriteBufferSize, 
dataRemaining);
-Platform.copyMemory(
-  recordPage, recordReadPosition, writeBuffer, 
Platform.BYTE_ARRAY_OFFSET, toTransfer);
-writer.write(writeBuffer, 0, toTransfer);
+if (bufferOffset > 0 && bufferOffset + toTransfer > 
DISK_WRITE_BUFFER_SIZE) {
--- End diff --

> The numRecordsWritten in DiskBlockObjectWriter is still correct during 
the process after this PR

The number is correct, but it is not consistent with what really happens
compared to the current behaviour. But as you said, we will get the correct
result in the end. So it may not be a big deal.


---




[GitHub] spark pull request #22163: [SPARK-25166][CORE]Reduce the number of write ope...

2018-08-22 Thread Ngone51
Github user Ngone51 commented on a diff in the pull request:

https://github.com/apache/spark/pull/22163#discussion_r212163785
  
--- Diff: 
core/src/main/java/org/apache/spark/shuffle/sort/ShuffleExternalSorter.java ---
@@ -206,14 +211,21 @@ private void writeSortedFile(boolean isLastFile) {
   long recordReadPosition = recordOffsetInPage + uaoSize; // skip over 
record length
   while (dataRemaining > 0) {
 final int toTransfer = Math.min(diskWriteBufferSize, 
dataRemaining);
-Platform.copyMemory(
-  recordPage, recordReadPosition, writeBuffer, 
Platform.BYTE_ARRAY_OFFSET, toTransfer);
-writer.write(writeBuffer, 0, toTransfer);
+if (bufferOffset > 0 && bufferOffset + toTransfer > 
DISK_WRITE_BUFFER_SIZE) {
--- End diff --

Yeah, I agree there's no difference in the final result. But `writeMetrics` in
`DiskBlockObjectWriter` would be incorrect during the process: not only
`numRecordsWritten`, but also `_bytesWritten` (which can only be correctly
counted when `writer.write()` is called; see `recordWritten#updateBytesWritten`
for details).


---




[GitHub] spark pull request #22163: [SPARK-25166][CORE]Reduce the number of write ope...

2018-08-22 Thread Ngone51
Github user Ngone51 commented on a diff in the pull request:

https://github.com/apache/spark/pull/22163#discussion_r212160161
  
--- Diff: 
core/src/main/java/org/apache/spark/shuffle/sort/ShuffleExternalSorter.java ---
@@ -206,14 +211,21 @@ private void writeSortedFile(boolean isLastFile) {
   long recordReadPosition = recordOffsetInPage + uaoSize; // skip over 
record length
   while (dataRemaining > 0) {
 final int toTransfer = Math.min(diskWriteBufferSize, 
dataRemaining);
-Platform.copyMemory(
-  recordPage, recordReadPosition, writeBuffer, 
Platform.BYTE_ARRAY_OFFSET, toTransfer);
-writer.write(writeBuffer, 0, toTransfer);
+if (bufferOffset > 0 && bufferOffset + toTransfer > 
DISK_WRITE_BUFFER_SIZE) {
--- End diff --

Oh, I see. If so, I'm afraid you may have to change
`writer.recordWritten()`'s behaviour, which just counts records one by one
right now.


---




[GitHub] spark pull request #22163: [SPARK-25166][CORE]Reduce the number of write ope...

2018-08-22 Thread Ngone51
Github user Ngone51 commented on a diff in the pull request:

https://github.com/apache/spark/pull/22163#discussion_r211954019
  
--- Diff: 
core/src/main/java/org/apache/spark/shuffle/sort/ShuffleExternalSorter.java ---
@@ -206,14 +211,21 @@ private void writeSortedFile(boolean isLastFile) {
   long recordReadPosition = recordOffsetInPage + uaoSize; // skip over 
record length
   while (dataRemaining > 0) {
 final int toTransfer = Math.min(diskWriteBufferSize, 
dataRemaining);
-Platform.copyMemory(
-  recordPage, recordReadPosition, writeBuffer, 
Platform.BYTE_ARRAY_OFFSET, toTransfer);
-writer.write(writeBuffer, 0, toTransfer);
+if (bufferOffset > 0 && bufferOffset + toTransfer > 
DISK_WRITE_BUFFER_SIZE) {
--- End diff --

Not a bad idea, but the code here may not work as you expect. If we get a
record with size `X` < `diskWriteBufferSize` (same as `DISK_WRITE_BUFFER_SIZE`),
then we will only call `writer.write()` once. And if we get a record with size
`Y` >= `diskWriteBufferSize`, then we will call `writer.write()`
(`Y` + `diskWriteBufferSize` - 1) / `diskWriteBufferSize` times. And this does
not change with the new code.
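
A tiny self-contained check of that arithmetic (the buffer size here is just an
example value, not the actual default):

```
// Number of writer.write() calls needed for one record of `recordSize` bytes
// copied through a buffer of `bufferSize` bytes: ceil(recordSize / bufferSize).
def writeCalls(recordSize: Int, bufferSize: Int): Int =
  (recordSize + bufferSize - 1) / bufferSize

writeCalls(3000, 4096)   // 1 -> record smaller than the buffer, single write
writeCalls(10000, 4096)  // 3 -> ceil(10000 / 4096)
```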


---




[GitHub] spark pull request #22001: [SPARK-24819][CORE] Fail fast when no enough slot...

2018-08-13 Thread Ngone51
Github user Ngone51 commented on a diff in the pull request:

https://github.com/apache/spark/pull/22001#discussion_r209662081
  
--- Diff: core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala 
---
@@ -402,6 +422,19 @@ class DAGScheduler(
 }
   }
 
+  /**
+   * Check whether the barrier stage requires more slots (to be able to 
launch all tasks in the
+   * barrier stage together) than the total number of active slots 
currently. Fail current check
+   * if trying to submit a barrier stage that requires more slots than 
current total number. If
+   * the check fails consecutively for three times for a job, then fail 
current job submission.
--- End diff --

It seems I cannot find the code for `"consecutively for three times"`, only
`maxFailureNumTasksCheck`?


---




[GitHub] spark pull request #22001: [SPARK-24819][CORE] Fail fast when no enough slot...

2018-08-13 Thread Ngone51
Github user Ngone51 commented on a diff in the pull request:

https://github.com/apache/spark/pull/22001#discussion_r209658945
  
--- Diff: core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala 
---
@@ -929,6 +955,28 @@ class DAGScheduler(
   // HadoopRDD whose underlying HDFS files have been deleted.
   finalStage = createResultStage(finalRDD, func, partitions, jobId, 
callSite)
 } catch {
+  case e: Exception if e.getMessage ==
+  
DAGScheduler.ERROR_MESSAGE_BARRIER_REQUIRE_MORE_SLOTS_THAN_CURRENT_TOTAL_NUMBER 
=>
+logWarning("The job requires to run a barrier stage that requires 
more slots than the " +
+  "total number of slots in the cluster currently.")
+jobIdToNumTasksCheckFailures.putIfAbsent(jobId, 0)
+val numCheckFailures = jobIdToNumTasksCheckFailures.get(jobId) + 1
--- End diff --

@kiszk IIUC, there's exactly one thread in `eventLoop`, so the scenario
mentioned above will not happen. And I even feel there is no need to use a
`ConcurrentHashMap` for `jobIdToNumTasksCheckFailures` at all. @jiangxb1987  
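
A minimal sketch of the simpler bookkeeping that a single-threaded event loop
would allow (hypothetical, not the PR's actual code):

```
import scala.collection.mutable

// Hypothetical sketch: DAGScheduler events are handled by one event-loop
// thread, so a plain mutable map is enough for the per-job failure counter.
val jobIdToNumTasksCheckFailures = mutable.HashMap.empty[Int, Int].withDefaultValue(0)

def recordCheckFailure(jobId: Int): Int = {
  val failures = jobIdToNumTasksCheckFailures(jobId) + 1
  jobIdToNumTasksCheckFailures(jobId) = failures
  failures
}
```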


---




[GitHub] spark pull request #21565: [SPARK-24558][Core]wrong Idle Timeout value is us...

2018-07-04 Thread Ngone51
Github user Ngone51 commented on a diff in the pull request:

https://github.com/apache/spark/pull/21565#discussion_r200019612
  
--- Diff: 
core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala ---
@@ -488,9 +488,16 @@ private[spark] class ExecutorAllocationManager(
 newExecutorTotal = numExistingExecutors
 if (testing || executorsRemoved.nonEmpty) {
   executorsRemoved.foreach { removedExecutorId =>
+// If it is cachedBlcok timeout is configured using
+// spark.dynamicAllocation.cachedExecutorIdleTimeout
+val idleTimeout = if 
(blockManagerMaster.hasCachedBlocks(removedExecutorId)) {
--- End diff --

We do not maintain another HashMap, but alter the original structure. In this
way, we do not need to issue extra RPC calls to `BlockManagerMaster` here. As
for the 'API' you mentioned, that happens after an RPC call has already been
made.


---




[GitHub] spark pull request #21565: [SPARK-24558][Core]wrong Idle Timeout value is us...

2018-06-17 Thread Ngone51
Github user Ngone51 commented on a diff in the pull request:

https://github.com/apache/spark/pull/21565#discussion_r195927590
  
--- Diff: 
core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala ---
@@ -488,9 +488,16 @@ private[spark] class ExecutorAllocationManager(
 newExecutorTotal = numExistingExecutors
 if (testing || executorsRemoved.nonEmpty) {
   executorsRemoved.foreach { removedExecutorId =>
+// If it is cachedBlcok timeout is configured using
+// spark.dynamicAllocation.cachedExecutorIdleTimeout
+val idleTimeout = if 
(blockManagerMaster.hasCachedBlocks(removedExecutorId)) {
--- End diff --

How about changing `removeTimes` to a `HashMap[String, (Long, Boolean)]` (where
the `Boolean` field indicates whether the cached-executor idle timeout was
applied or not)? Thus, we would not need to ask `blockManagerMaster` again. A
minimal sketch is below.
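
The sketch (field and method names are illustrative only, not the PR's code):

```
import scala.collection.mutable

// Hypothetical sketch: store, per executor, both the expiry time and whether
// the cached-block timeout (spark.dynamicAllocation.cachedExecutorIdleTimeout)
// was the one applied, so the remove path does not need to ask the
// BlockManagerMaster again.
// executorId -> (expiryTimeMillis, usedCachedBlockTimeout)
val removeTimes = mutable.HashMap.empty[String, (Long, Boolean)]

def scheduleRemove(
    executorId: String,
    now: Long,
    hasCachedBlocks: Boolean,
    idleTimeoutMs: Long,
    cachedIdleTimeoutMs: Long): Unit = {
  val timeoutMs = if (hasCachedBlocks) cachedIdleTimeoutMs else idleTimeoutMs
  removeTimes(executorId) = (now + timeoutMs, hasCachedBlocks)
}
```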


---




[GitHub] spark pull request #21486: [SPARK-24387][Core] Heartbeat-timeout executor is...

2018-06-11 Thread Ngone51
Github user Ngone51 commented on a diff in the pull request:

https://github.com/apache/spark/pull/21486#discussion_r194606075
  
--- Diff: core/src/main/scala/org/apache/spark/HeartbeatReceiver.scala ---
@@ -197,14 +197,14 @@ private[spark] class HeartbeatReceiver(sc: 
SparkContext, clock: Clock)
   if (now - lastSeenMs > executorTimeoutMs) {
 logWarning(s"Removing executor $executorId with no recent 
heartbeats: " +
   s"${now - lastSeenMs} ms exceeds timeout $executorTimeoutMs ms")
-scheduler.executorLost(executorId, SlaveLost("Executor heartbeat " 
+
-  s"timed out after ${now - lastSeenMs} ms"))
   // Asynchronously kill the executor to avoid blocking the 
current thread
 killExecutorThread.submit(new Runnable {
   override def run(): Unit = Utils.tryLogNonFatalError {
 // Note: we want to get an executor back after expiring this 
one,
 // so do not simply call `sc.killExecutor` here (SPARK-8119)
 sc.killAndReplaceExecutor(executorId)
--- End diff --

To be more specific, `killAndReplaceExecutor#killExecutors` will block until we
get a response from the cluster manager, or time out after 120s (the default
RPC timeout config).


---




[GitHub] spark pull request #21096: [SPARK-24011][CORE][WIP] cache rdd's immediate pa...

2018-06-11 Thread Ngone51
Github user Ngone51 closed the pull request at:

https://github.com/apache/spark/pull/21096


---




[GitHub] spark pull request #20996: [SPARK-23884][CORE] hasLaunchedTask should be tru...

2018-06-11 Thread Ngone51
Github user Ngone51 closed the pull request at:

https://github.com/apache/spark/pull/20996


---




[GitHub] spark issue #21494: [WIP][SPARK-24375][Prototype] Support barrier scheduling

2018-06-06 Thread Ngone51
Github user Ngone51 commented on the issue:

https://github.com/apache/spark/pull/21494
  
Hi @jiangxb1987, can you explain more about what `barrier scheduling` is in
Spark, and elaborate an example which would only work with `barrier scheduling`
(but could not work under the current Spark scheduling mechanism), for better
understanding?


---




[GitHub] spark pull request #21486: [SPARK-24387][Core] Heartbeat-timeout executor is...

2018-06-03 Thread Ngone51
Github user Ngone51 commented on a diff in the pull request:

https://github.com/apache/spark/pull/21486#discussion_r192592142
  
--- Diff: core/src/test/scala/org/apache/spark/HeartbeatReceiverSuite.scala 
---
@@ -207,6 +210,55 @@ class HeartbeatReceiverSuite
 assert(fakeClusterManager.getExecutorIdsToKill === Set(executorId1, 
executorId2))
   }
 
+  test("expired host should not be offered again") {
+scheduler = spy(new TaskSchedulerImpl(sc))
+scheduler.setDAGScheduler(sc.dagScheduler)
+when(sc.taskScheduler).thenReturn(scheduler)
+doReturn(true).when(scheduler).executorHeartbeatReceived(any(), any(), 
any())
+
+// Set up a fake backend and cluster manager to simulate killing 
executors
+val rpcEnv = sc.env.rpcEnv
+val fakeClusterManager = new FakeClusterManager(rpcEnv)
+val fakeClusterManagerRef = rpcEnv.setupEndpoint("fake-cm", 
fakeClusterManager)
+val fakeSchedulerBackend = new FakeSchedulerBackend(scheduler, rpcEnv, 
fakeClusterManagerRef)
+when(sc.schedulerBackend).thenReturn(fakeSchedulerBackend)
+
+fakeSchedulerBackend.start()
+val dummyExecutorEndpoint1 = new FakeExecutorEndpoint(rpcEnv)
+val dummyExecutorEndpointRef1 = 
rpcEnv.setupEndpoint("fake-executor-1", dummyExecutorEndpoint1)
+fakeSchedulerBackend.driverEndpoint.askSync[Boolean](
+  RegisterExecutor(executorId1, dummyExecutorEndpointRef1, "1.2.3.4", 
2, Map.empty))
+heartbeatReceiverRef.askSync[Boolean](TaskSchedulerIsSet)
+addExecutorAndVerify(executorId1)
+triggerHeartbeat(executorId1, executorShouldReregister = false)
+
+scheduler.initialize(fakeSchedulerBackend)
+sc.requestTotalExecutors(0, 0, Map.empty)
--- End diff --

Why request 0 executors?


---




[GitHub] spark pull request #21486: [SPARK-24387][Core] Heartbeat-timeout executor is...

2018-06-03 Thread Ngone51
Github user Ngone51 commented on a diff in the pull request:

https://github.com/apache/spark/pull/21486#discussion_r192592170
  
--- Diff: core/src/test/scala/org/apache/spark/HeartbeatReceiverSuite.scala 
---
@@ -207,6 +210,55 @@ class HeartbeatReceiverSuite
 assert(fakeClusterManager.getExecutorIdsToKill === Set(executorId1, 
executorId2))
   }
 
+  test("expired host should not be offered again") {
--- End diff --

Also, it would be better to attach the JIRA number to the test name.


---




[GitHub] spark pull request #21486: [SPARK-24387][Core] Heartbeat-timeout executor is...

2018-06-03 Thread Ngone51
Github user Ngone51 commented on a diff in the pull request:

https://github.com/apache/spark/pull/21486#discussion_r192591845
  
--- Diff: core/src/test/scala/org/apache/spark/HeartbeatReceiverSuite.scala 
---
@@ -207,6 +210,55 @@ class HeartbeatReceiverSuite
 assert(fakeClusterManager.getExecutorIdsToKill === Set(executorId1, 
executorId2))
   }
 
+  test("expired host should not be offered again") {
--- End diff --

`host` or `executor` ?


---




[GitHub] spark pull request #21440: [SPARK-24307][CORE] Support reading remote cached...

2018-05-29 Thread Ngone51
Github user Ngone51 commented on a diff in the pull request:

https://github.com/apache/spark/pull/21440#discussion_r191623277
  
--- Diff: 
core/src/test/scala/org/apache/spark/io/ChunkedByteBufferFileRegionSuite.scala 
---
@@ -0,0 +1,154 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.io
+
+import java.nio.ByteBuffer
+import java.nio.channels.WritableByteChannel
+
+import scala.util.Random
+
+import org.mockito.Mockito.when
+import org.scalatest.BeforeAndAfterEach
+import org.scalatest.mockito.MockitoSugar
+
+import org.apache.spark.{SparkConf, SparkEnv, SparkFunSuite}
+import org.apache.spark.internal.config
+import org.apache.spark.util.io.ChunkedByteBuffer
+
+class ChunkedByteBufferFileRegionSuite extends SparkFunSuite with 
MockitoSugar
+with BeforeAndAfterEach {
+
+  override protected def beforeEach(): Unit = {
+super.beforeEach()
+val conf = new SparkConf()
+val env = mock[SparkEnv]
+SparkEnv.set(env)
+when(env.conf).thenReturn(conf)
+  }
+
+  override protected def afterEach(): Unit = {
+SparkEnv.set(null)
+  }
+
+  private def generateChunkByteBuffer(nChunks: Int, perChunk: Int): 
ChunkedByteBuffer = {
+val bytes = (0 until nChunks).map { chunkIdx =>
+  val bb = ByteBuffer.allocate(perChunk)
+  (0 until perChunk).foreach { idx =>
+bb.put((chunkIdx * perChunk + idx).toByte)
+  }
+  bb.position(0)
+  bb
+}.toArray
+new ChunkedByteBuffer(bytes)
+  }
+
+  test("transferTo can stop and resume correctly") {
+SparkEnv.get.conf.set(config.BUFFER_WRITE_CHUNK_SIZE, 9L)
+val cbb = generateChunkByteBuffer(4, 10)
+val fileRegion = cbb.toNetty
+
+val targetChannel = new LimitedWritableByteChannel(40)
+
+var pos = 0L
+// write the fileregion to the channel, but with the transfer limited 
at various spots along
+// the way.
+
+// limit to within the first chunk
+targetChannel.acceptNBytes = 5
+pos = fileRegion.transferTo(targetChannel, pos)
+assert(targetChannel.pos === 5)
+
+// a little bit further within the first chunk
+targetChannel.acceptNBytes = 2
+pos += fileRegion.transferTo(targetChannel, pos)
+assert(targetChannel.pos === 7)
+
+// past the first chunk, into the 2nd
+targetChannel.acceptNBytes = 6
+pos += fileRegion.transferTo(targetChannel, pos)
+assert(targetChannel.pos === 13)
+
+// right to the end of the 2nd chunk
+targetChannel.acceptNBytes = 7
+pos += fileRegion.transferTo(targetChannel, pos)
+assert(targetChannel.pos === 20)
+
+// rest of 2nd chunk, all of 3rd, some of 4th
+targetChannel.acceptNBytes = 15
+pos += fileRegion.transferTo(targetChannel, pos)
+assert(targetChannel.pos === 35)
+
+// now till the end
+targetChannel.acceptNBytes = 5
+pos += fileRegion.transferTo(targetChannel, pos)
+assert(targetChannel.pos === 40)
+
+// calling again at the end should be OK
+targetChannel.acceptNBytes = 20
+fileRegion.transferTo(targetChannel, pos)
+assert(targetChannel.pos === 40)
+  }
+
+  test(s"transfer to with random limits") {
+val rng = new Random()
+val seed = System.currentTimeMillis()
+logInfo(s"seed = $seed")
+rng.setSeed(seed)
+val chunkSize = 1e4.toInt
+SparkEnv.get.conf.set(config.BUFFER_WRITE_CHUNK_SIZE, 
rng.nextInt(chunkSize).toLong)
+
+val cbb = generateChunkByteBuffer(50, chunkSize)
+val fileRegion = cbb.toNetty
+val transferLimit = 1e5.toInt
+val targetChannel = new LimitedWritableByteChannel(transferLimit)
+while (targetChannel.pos < cbb.size) {
+  val nextTransferSize = rng.nextInt(transferLimi

[GitHub] spark pull request #21440: [SPARK-24307][CORE] Support reading remote cached...

2018-05-28 Thread Ngone51
Github user Ngone51 commented on a diff in the pull request:

https://github.com/apache/spark/pull/21440#discussion_r191178697
  
--- Diff: 
core/src/test/scala/org/apache/spark/io/ChunkedByteBufferFileRegionSuite.scala 
---
@@ -0,0 +1,154 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.io
+
+import java.nio.ByteBuffer
+import java.nio.channels.WritableByteChannel
+
+import scala.util.Random
+
+import org.mockito.Mockito.when
+import org.scalatest.BeforeAndAfterEach
+import org.scalatest.mockito.MockitoSugar
+
+import org.apache.spark.{SparkConf, SparkEnv, SparkFunSuite}
+import org.apache.spark.internal.config
+import org.apache.spark.util.io.ChunkedByteBuffer
+
+class ChunkedByteBufferFileRegionSuite extends SparkFunSuite with 
MockitoSugar
+with BeforeAndAfterEach {
+
+  override protected def beforeEach(): Unit = {
+super.beforeEach()
+val conf = new SparkConf()
+val env = mock[SparkEnv]
+SparkEnv.set(env)
+when(env.conf).thenReturn(conf)
+  }
+
+  override protected def afterEach(): Unit = {
+SparkEnv.set(null)
+  }
+
+  private def generateChunkByteBuffer(nChunks: Int, perChunk: Int): 
ChunkedByteBuffer = {
+val bytes = (0 until nChunks).map { chunkIdx =>
+  val bb = ByteBuffer.allocate(perChunk)
+  (0 until perChunk).foreach { idx =>
+bb.put((chunkIdx * perChunk + idx).toByte)
+  }
+  bb.position(0)
+  bb
+}.toArray
+new ChunkedByteBuffer(bytes)
+  }
+
+  test("transferTo can stop and resume correctly") {
+SparkEnv.get.conf.set(config.BUFFER_WRITE_CHUNK_SIZE, 9L)
+val cbb = generateChunkByteBuffer(4, 10)
+val fileRegion = cbb.toNetty
+
+val targetChannel = new LimitedWritableByteChannel(40)
+
+var pos = 0L
+// write the fileregion to the channel, but with the transfer limited 
at various spots along
+// the way.
+
+// limit to within the first chunk
+targetChannel.acceptNBytes = 5
+pos = fileRegion.transferTo(targetChannel, pos)
+assert(targetChannel.pos === 5)
+
+// a little bit further within the first chunk
+targetChannel.acceptNBytes = 2
+pos += fileRegion.transferTo(targetChannel, pos)
+assert(targetChannel.pos === 7)
+
+// past the first chunk, into the 2nd
+targetChannel.acceptNBytes = 6
+pos += fileRegion.transferTo(targetChannel, pos)
+assert(targetChannel.pos === 13)
+
+// right to the end of the 2nd chunk
+targetChannel.acceptNBytes = 7
+pos += fileRegion.transferTo(targetChannel, pos)
+assert(targetChannel.pos === 20)
+
+// rest of 2nd chunk, all of 3rd, some of 4th
+targetChannel.acceptNBytes = 15
+pos += fileRegion.transferTo(targetChannel, pos)
+assert(targetChannel.pos === 35)
+
+// now till the end
+targetChannel.acceptNBytes = 5
+pos += fileRegion.transferTo(targetChannel, pos)
+assert(targetChannel.pos === 40)
+
+// calling again at the end should be OK
+targetChannel.acceptNBytes = 20
+fileRegion.transferTo(targetChannel, pos)
+assert(targetChannel.pos === 40)
+  }
+
+  test(s"transfer to with random limits") {
+val rng = new Random()
+val seed = System.currentTimeMillis()
+logInfo(s"seed = $seed")
+rng.setSeed(seed)
+val chunkSize = 1e4.toInt
+SparkEnv.get.conf.set(config.BUFFER_WRITE_CHUNK_SIZE, 
rng.nextInt(chunkSize).toLong)
+
+val cbb = generateChunkByteBuffer(50, chunkSize)
+val fileRegion = cbb.toNetty
+val transferLimit = 1e5.toInt
+val targetChannel = new LimitedWritableByteChannel(transferLimit)
+while (targetChannel.pos < cbb.size) {
+  val nextTransferSize = rng.nextInt(transferLimi

[GitHub] spark pull request #21440: [SPARK-24307][CORE] Support reading remote cached...

2018-05-28 Thread Ngone51
Github user Ngone51 commented on a diff in the pull request:

https://github.com/apache/spark/pull/21440#discussion_r191182696
  
--- Diff: 
core/src/test/scala/org/apache/spark/io/ChunkedByteBufferFileRegionSuite.scala 
---
@@ -0,0 +1,154 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.io
+
+import java.nio.ByteBuffer
+import java.nio.channels.WritableByteChannel
+
+import scala.util.Random
+
+import org.mockito.Mockito.when
+import org.scalatest.BeforeAndAfterEach
+import org.scalatest.mockito.MockitoSugar
+
+import org.apache.spark.{SparkConf, SparkEnv, SparkFunSuite}
+import org.apache.spark.internal.config
+import org.apache.spark.util.io.ChunkedByteBuffer
+
+class ChunkedByteBufferFileRegionSuite extends SparkFunSuite with 
MockitoSugar
+with BeforeAndAfterEach {
+
+  override protected def beforeEach(): Unit = {
+super.beforeEach()
+val conf = new SparkConf()
+val env = mock[SparkEnv]
+SparkEnv.set(env)
+when(env.conf).thenReturn(conf)
+  }
+
+  override protected def afterEach(): Unit = {
+SparkEnv.set(null)
+  }
+
+  private def generateChunkByteBuffer(nChunks: Int, perChunk: Int): 
ChunkedByteBuffer = {
+val bytes = (0 until nChunks).map { chunkIdx =>
+  val bb = ByteBuffer.allocate(perChunk)
+  (0 until perChunk).foreach { idx =>
+bb.put((chunkIdx * perChunk + idx).toByte)
+  }
+  bb.position(0)
+  bb
+}.toArray
+new ChunkedByteBuffer(bytes)
+  }
+
+  test("transferTo can stop and resume correctly") {
+SparkEnv.get.conf.set(config.BUFFER_WRITE_CHUNK_SIZE, 9L)
+val cbb = generateChunkByteBuffer(4, 10)
+val fileRegion = cbb.toNetty
+
+val targetChannel = new LimitedWritableByteChannel(40)
+
+var pos = 0L
+// write the fileregion to the channel, but with the transfer limited 
at various spots along
+// the way.
+
+// limit to within the first chunk
+targetChannel.acceptNBytes = 5
+pos = fileRegion.transferTo(targetChannel, pos)
+assert(targetChannel.pos === 5)
+
+// a little bit further within the first chunk
+targetChannel.acceptNBytes = 2
+pos += fileRegion.transferTo(targetChannel, pos)
+assert(targetChannel.pos === 7)
+
+// past the first chunk, into the 2nd
+targetChannel.acceptNBytes = 6
+pos += fileRegion.transferTo(targetChannel, pos)
+assert(targetChannel.pos === 13)
+
+// right to the end of the 2nd chunk
+targetChannel.acceptNBytes = 7
+pos += fileRegion.transferTo(targetChannel, pos)
+assert(targetChannel.pos === 20)
+
+// rest of 2nd chunk, all of 3rd, some of 4th
+targetChannel.acceptNBytes = 15
+pos += fileRegion.transferTo(targetChannel, pos)
+assert(targetChannel.pos === 35)
+
+// now till the end
+targetChannel.acceptNBytes = 5
+pos += fileRegion.transferTo(targetChannel, pos)
+assert(targetChannel.pos === 40)
+
+// calling again at the end should be OK
+targetChannel.acceptNBytes = 20
+fileRegion.transferTo(targetChannel, pos)
+assert(targetChannel.pos === 40)
+  }
+
+  test(s"transfer to with random limits") {
+val rng = new Random()
+val seed = System.currentTimeMillis()
+logInfo(s"seed = $seed")
+rng.setSeed(seed)
+val chunkSize = 1e4.toInt
+SparkEnv.get.conf.set(config.BUFFER_WRITE_CHUNK_SIZE, 
rng.nextInt(chunkSize).toLong)
+
+val cbb = generateChunkByteBuffer(50, chunkSize)
+val fileRegion = cbb.toNetty
+val transferLimit = 1e5.toInt
+val targetChannel = new LimitedWritableByteChannel(transferLimit)
+while (targetChannel.pos < cbb.size) {
+  val nextTransferSize = rng.nextInt(transferLimi

[GitHub] spark pull request #21440: [SPARK-24307][CORE] Support reading remote cached...

2018-05-28 Thread Ngone51
Github user Ngone51 commented on a diff in the pull request:

https://github.com/apache/spark/pull/21440#discussion_r191176828
  
--- Diff: 
core/src/test/scala/org/apache/spark/io/ChunkedByteBufferFileRegionSuite.scala 
---
@@ -0,0 +1,154 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.io
+
+import java.nio.ByteBuffer
+import java.nio.channels.WritableByteChannel
+
+import scala.util.Random
+
+import org.mockito.Mockito.when
+import org.scalatest.BeforeAndAfterEach
+import org.scalatest.mockito.MockitoSugar
+
+import org.apache.spark.{SparkConf, SparkEnv, SparkFunSuite}
+import org.apache.spark.internal.config
+import org.apache.spark.util.io.ChunkedByteBuffer
+
+class ChunkedByteBufferFileRegionSuite extends SparkFunSuite with 
MockitoSugar
+with BeforeAndAfterEach {
+
+  override protected def beforeEach(): Unit = {
+super.beforeEach()
+val conf = new SparkConf()
+val env = mock[SparkEnv]
+SparkEnv.set(env)
+when(env.conf).thenReturn(conf)
+  }
+
+  override protected def afterEach(): Unit = {
+SparkEnv.set(null)
+  }
+
+  private def generateChunkByteBuffer(nChunks: Int, perChunk: Int): 
ChunkedByteBuffer = {
--- End diff --

nit: generateChunk**ed**ByteBuffer


---




[GitHub] spark pull request #21440: [SPARK-24307][CORE] Support reading remote cached...

2018-05-28 Thread Ngone51
Github user Ngone51 commented on a diff in the pull request:

https://github.com/apache/spark/pull/21440#discussion_r191175242
  
--- Diff: 
core/src/main/scala/org/apache/spark/util/io/ChunkedByteBufferFileRegion.scala 
---
@@ -0,0 +1,105 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.util.io
+
+import java.nio.channels.WritableByteChannel
+
+import io.netty.channel.FileRegion
+import io.netty.util.AbstractReferenceCounted
+
+import org.apache.spark.internal.Logging
+
+
+/**
+ * This exposes a ChunkedByteBuffer as a netty FileRegion, just to allow 
sending > 2gb in one netty
+ * message.   This is because netty cannot send a ByteBuf > 2g, but it can 
send a large FileRegion,
+ * even though the data is not backed by a file.
+ */
+private[io] class ChunkedByteBufferFileRegion(
+val chunkedByteBuffer: ChunkedByteBuffer,
+val ioChunkSize: Int) extends AbstractReferenceCounted with FileRegion 
with Logging {
+
+  private var _transferred: Long = 0
+  // this duplicates the original chunks, so we're free to modify the 
position, limit, etc.
+  private val chunks = chunkedByteBuffer.getChunks()
+  private val cumLength = chunks.scanLeft(0L) { _ + _.remaining()}
+  private val size = cumLength.last
+  // Chunk size in bytes
+
+  protected def deallocate: Unit = {}
+
+  override def count(): Long = chunkedByteBuffer.size
+
+  // this is the "start position" of the overall Data in the backing file, 
not our current position
+  override def position(): Long = 0
+
+  override def transferred(): Long = _transferred
+
+  override def transfered(): Long = _transferred
+
+  override def touch(): ChunkedByteBufferFileRegion = this
+
+  override def touch(hint: Object): ChunkedByteBufferFileRegion = this
+
+  override def retain(): FileRegion = {
+super.retain()
+this
+  }
+
+  override def retain(increment: Int): FileRegion = {
+super.retain(increment)
+this
+  }
+
+  private var currentChunkIdx = 0
+
+  def transferTo(target: WritableByteChannel, position: Long): Long = {
+assert(position == _transferred)
+if (position == size) return 0L
+var keepGoing = true
+var written = 0L
+var currentChunk = chunks(currentChunkIdx)
+var originalLimit = currentChunk.limit()
--- End diff --

Seems it is unused.


---




[GitHub] spark pull request #21440: [SPARK-24307][CORE] Support reading remote cached...

2018-05-28 Thread Ngone51
Github user Ngone51 commented on a diff in the pull request:

https://github.com/apache/spark/pull/21440#discussion_r191175890
  
--- Diff: 
core/src/test/scala/org/apache/spark/io/ChunkedByteBufferFileRegionSuite.scala 
---
@@ -0,0 +1,154 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.io
+
+import java.nio.ByteBuffer
+import java.nio.channels.WritableByteChannel
+
+import scala.util.Random
+
+import org.mockito.Mockito.when
+import org.scalatest.BeforeAndAfterEach
+import org.scalatest.mockito.MockitoSugar
+
+import org.apache.spark.{SparkConf, SparkEnv, SparkFunSuite}
+import org.apache.spark.internal.config
+import org.apache.spark.util.io.ChunkedByteBuffer
+
+class ChunkedByteBufferFileRegionSuite extends SparkFunSuite with 
MockitoSugar
+with BeforeAndAfterEach {
+
+  override protected def beforeEach(): Unit = {
+super.beforeEach()
+val conf = new SparkConf()
+val env = mock[SparkEnv]
+SparkEnv.set(env)
+when(env.conf).thenReturn(conf)
+  }
+
+  override protected def afterEach(): Unit = {
+SparkEnv.set(null)
+  }
+
+  private def generateChunkByteBuffer(nChunks: Int, perChunk: Int): 
ChunkedByteBuffer = {
+val bytes = (0 until nChunks).map { chunkIdx =>
+  val bb = ByteBuffer.allocate(perChunk)
+  (0 until perChunk).foreach { idx =>
+bb.put((chunkIdx * perChunk + idx).toByte)
+  }
+  bb.position(0)
+  bb
+}.toArray
+new ChunkedByteBuffer(bytes)
+  }
+
+  test("transferTo can stop and resume correctly") {
+SparkEnv.get.conf.set(config.BUFFER_WRITE_CHUNK_SIZE, 9L)
+val cbb = generateChunkByteBuffer(4, 10)
+val fileRegion = cbb.toNetty
+
+val targetChannel = new LimitedWritableByteChannel(40)
+
+var pos = 0L
+// write the fileregion to the channel, but with the transfer limited 
at various spots along
+// the way.
+
+// limit to within the first chunk
+targetChannel.acceptNBytes = 5
+pos = fileRegion.transferTo(targetChannel, pos)
+assert(targetChannel.pos === 5)
+
+// a little bit further within the first chunk
+targetChannel.acceptNBytes = 2
+pos += fileRegion.transferTo(targetChannel, pos)
+assert(targetChannel.pos === 7)
+
+// past the first chunk, into the 2nd
+targetChannel.acceptNBytes = 6
+pos += fileRegion.transferTo(targetChannel, pos)
+assert(targetChannel.pos === 13)
+
+// right to the end of the 2nd chunk
+targetChannel.acceptNBytes = 7
+pos += fileRegion.transferTo(targetChannel, pos)
+assert(targetChannel.pos === 20)
+
+// rest of 2nd chunk, all of 3rd, some of 4th
+targetChannel.acceptNBytes = 15
+pos += fileRegion.transferTo(targetChannel, pos)
+assert(targetChannel.pos === 35)
+
+// now till the end
+targetChannel.acceptNBytes = 5
+pos += fileRegion.transferTo(targetChannel, pos)
+assert(targetChannel.pos === 40)
+
+// calling again at the end should be OK
+targetChannel.acceptNBytes = 20
+fileRegion.transferTo(targetChannel, pos)
+assert(targetChannel.pos === 40)
+  }
+
+  test(s"transfer to with random limits") {
+val rng = new Random()
+val seed = System.currentTimeMillis()
+logInfo(s"seed = $seed")
+rng.setSeed(seed)
+val chunkSize = 1e4.toInt
+SparkEnv.get.conf.set(config.BUFFER_WRITE_CHUNK_SIZE, 
rng.nextInt(chunkSize).toLong)
+
+val cbb = generateChunkByteBuffer(50, chunkSize)
+val fileRegion = cbb.toNetty
+val transferLimit = 1e5.toInt
+val targetChannel = new LimitedWritableByteChannel(transferLimit)
+while (targetChannel.pos < cbb.size) {
+  val nextTransferSize = rng.nextInt(transferLimi

[GitHub] spark pull request #21440: [SPARK-24307][CORE] Support reading remote cached...

2018-05-28 Thread Ngone51
Github user Ngone51 commented on a diff in the pull request:

https://github.com/apache/spark/pull/21440#discussion_r191117686
  
--- Diff: 
core/src/main/scala/org/apache/spark/util/io/ChunkedByteBufferFileRegion.scala 
---
@@ -0,0 +1,105 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.util.io
+
+import java.nio.channels.WritableByteChannel
+
+import io.netty.channel.FileRegion
+import io.netty.util.AbstractReferenceCounted
+
+import org.apache.spark.internal.Logging
+
+
+/**
+ * This exposes a ChunkedByteBuffer as a netty FileRegion, just to allow 
sending > 2gb in one netty
+ * message.   This is because netty cannot send a ByteBuf > 2g, but it can 
send a large FileRegion,
+ * even though the data is not backed by a file.
+ */
+private[io] class ChunkedByteBufferFileRegion(
+val chunkedByteBuffer: ChunkedByteBuffer,
+val ioChunkSize: Int) extends AbstractReferenceCounted with FileRegion 
with Logging {
+
+  private var _transferred: Long = 0
+  // this duplicates the original chunks, so we're free to modify the 
position, limit, etc.
+  private val chunks = chunkedByteBuffer.getChunks()
+  private val cumLength = chunks.scanLeft(0L) { _ + _.remaining()}
+  private val size = cumLength.last
+  // Chunk size in bytes
+
+  protected def deallocate: Unit = {}
+
+  override def count(): Long = chunkedByteBuffer.size
--- End diff --

What's the difference between `size` and `count`? Should `count` indicate the size of the remaining data that can be transferred?
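
For what it's worth, a minimal sketch of how I read Netty's `FileRegion` contract (my own illustration, not code from this PR): `count` is the total number of bytes the region represents, `transferred` is how many have already been written, and the remainder is their difference.

```scala
import io.netty.channel.FileRegion

// Illustration only: works for any FileRegion, including the proposed
// ChunkedByteBufferFileRegion, assuming count()/transferred() keep that meaning.
def remainingBytes(region: FileRegion): Long =
  region.count() - region.transferred()
```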


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #21440: [SPARK-24307][CORE] Support reading remote cached...

2018-05-28 Thread Ngone51
Github user Ngone51 commented on a diff in the pull request:

https://github.com/apache/spark/pull/21440#discussion_r191175960
  
--- Diff: 
core/src/test/scala/org/apache/spark/io/ChunkedByteBufferFileRegionSuite.scala 
---
@@ -0,0 +1,154 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.io
+
+import java.nio.ByteBuffer
+import java.nio.channels.WritableByteChannel
+
+import scala.util.Random
+
+import org.mockito.Mockito.when
+import org.scalatest.BeforeAndAfterEach
+import org.scalatest.mockito.MockitoSugar
+
+import org.apache.spark.{SparkConf, SparkEnv, SparkFunSuite}
+import org.apache.spark.internal.config
+import org.apache.spark.util.io.ChunkedByteBuffer
+
+class ChunkedByteBufferFileRegionSuite extends SparkFunSuite with 
MockitoSugar
+with BeforeAndAfterEach {
+
+  override protected def beforeEach(): Unit = {
+super.beforeEach()
+val conf = new SparkConf()
+val env = mock[SparkEnv]
+SparkEnv.set(env)
+when(env.conf).thenReturn(conf)
+  }
+
+  override protected def afterEach(): Unit = {
+SparkEnv.set(null)
+  }
+
+  private def generateChunkByteBuffer(nChunks: Int, perChunk: Int): 
ChunkedByteBuffer = {
+val bytes = (0 until nChunks).map { chunkIdx =>
+  val bb = ByteBuffer.allocate(perChunk)
+  (0 until perChunk).foreach { idx =>
+bb.put((chunkIdx * perChunk + idx).toByte)
+  }
+  bb.position(0)
+  bb
+}.toArray
+new ChunkedByteBuffer(bytes)
+  }
+
+  test("transferTo can stop and resume correctly") {
+SparkEnv.get.conf.set(config.BUFFER_WRITE_CHUNK_SIZE, 9L)
+val cbb = generateChunkByteBuffer(4, 10)
+val fileRegion = cbb.toNetty
+
+val targetChannel = new LimitedWritableByteChannel(40)
+
+var pos = 0L
+// write the fileregion to the channel, but with the transfer limited 
at various spots along
+// the way.
+
+// limit to within the first chunk
+targetChannel.acceptNBytes = 5
+pos = fileRegion.transferTo(targetChannel, pos)
+assert(targetChannel.pos === 5)
+
+// a little bit further within the first chunk
+targetChannel.acceptNBytes = 2
+pos += fileRegion.transferTo(targetChannel, pos)
+assert(targetChannel.pos === 7)
+
+// past the first chunk, into the 2nd
+targetChannel.acceptNBytes = 6
+pos += fileRegion.transferTo(targetChannel, pos)
+assert(targetChannel.pos === 13)
+
+// right to the end of the 2nd chunk
+targetChannel.acceptNBytes = 7
+pos += fileRegion.transferTo(targetChannel, pos)
+assert(targetChannel.pos === 20)
+
+// rest of 2nd chunk, all of 3rd, some of 4th
+targetChannel.acceptNBytes = 15
+pos += fileRegion.transferTo(targetChannel, pos)
+assert(targetChannel.pos === 35)
+
+// now till the end
+targetChannel.acceptNBytes = 5
+pos += fileRegion.transferTo(targetChannel, pos)
+assert(targetChannel.pos === 40)
+
+// calling again at the end should be OK
+targetChannel.acceptNBytes = 20
+fileRegion.transferTo(targetChannel, pos)
+assert(targetChannel.pos === 40)
+  }
+
+  test(s"transfer to with random limits") {
+val rng = new Random()
+val seed = System.currentTimeMillis()
+logInfo(s"seed = $seed")
+rng.setSeed(seed)
+val chunkSize = 1e4.toInt
+SparkEnv.get.conf.set(config.BUFFER_WRITE_CHUNK_SIZE, 
rng.nextInt(chunkSize).toLong)
+
+val cbb = generateChunkByteBuffer(50, chunkSize)
+val fileRegion = cbb.toNetty
+val transferLimit = 1e5.toInt
+val targetChannel = new LimitedWritableByteChannel(transferLimit)
+while (targetChannel.pos < cbb.size) {
+  val nextTransferSize = rng.nextInt(transferLimi

[GitHub] spark pull request #21440: [SPARK-24307][CORE] Support reading remote cached...

2018-05-28 Thread Ngone51
Github user Ngone51 commented on a diff in the pull request:

https://github.com/apache/spark/pull/21440#discussion_r191104760
  
--- Diff: 
core/src/main/scala/org/apache/spark/util/io/ChunkedByteBufferFileRegion.scala 
---
@@ -0,0 +1,105 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.util.io
+
+import java.nio.channels.WritableByteChannel
+
+import io.netty.channel.FileRegion
+import io.netty.util.AbstractReferenceCounted
+
+import org.apache.spark.internal.Logging
+
+
+/**
+ * This exposes a ChunkedByteBuffer as a netty FileRegion, just to allow 
sending > 2gb in one netty
+ * message.   This is because netty cannot send a ByteBuf > 2g, but it can 
send a large FileRegion,
+ * even though the data is not backed by a file.
+ */
+private[io] class ChunkedByteBufferFileRegion(
+val chunkedByteBuffer: ChunkedByteBuffer,
+val ioChunkSize: Int) extends AbstractReferenceCounted with FileRegion 
with Logging {
+
+  private var _transferred: Long = 0
+  // this duplicates the original chunks, so we're free to modify the 
position, limit, etc.
+  private val chunks = chunkedByteBuffer.getChunks()
+  private val cumLength = chunks.scanLeft(0L) { _ + _.remaining()}
+  private val size = cumLength.last
+  // Chunk size in bytes
--- End diff --

Should this comment be moved above the last line?


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #21369: [SPARK-22713][CORE] ExternalAppendOnlyMap leaks w...

2018-05-22 Thread Ngone51
Github user Ngone51 commented on a diff in the pull request:

https://github.com/apache/spark/pull/21369#discussion_r189939603
  
--- Diff: 
core/src/main/scala/org/apache/spark/util/collection/ExternalAppendOnlyMap.scala
 ---
@@ -267,7 +273,7 @@ class ExternalAppendOnlyMap[K, V, C](
*/
   def destructiveIterator(inMemoryIterator: Iterator[(K, C)]): 
Iterator[(K, C)] = {
 readingIterator = new SpillableIterator(inMemoryIterator)
-readingIterator
+readingIterator.toCompletionIterator
--- End diff --

`destructiveIterator` should just return a destructive iterator (especially for the map buffer), as its function name implies; it is none of `CompletionIterator`'s business. Developers should also be free to define the completion function for the returned destructive iterator, in case we want a different one somewhere else in the future.

> Your suggested codes does exactly the same but is less streamlined

I don't think this small change has much impact on how streamlined the code is.

> and relies on an intermediate value (fortunately it's already a member variable)

The current fix leads to this, not my suggestion. And even if this variable were not a member variable, we could define a temporary local variable. It's not a big deal.






---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #21369: [SPARK-22713][CORE] ExternalAppendOnlyMap leaks w...

2018-05-22 Thread Ngone51
Github user Ngone51 commented on a diff in the pull request:

https://github.com/apache/spark/pull/21369#discussion_r189894423
  
--- Diff: 
core/src/test/scala/org/apache/spark/util/collection/ExternalAppendOnlyMapSuite.scala
 ---
@@ -414,6 +415,99 @@ class ExternalAppendOnlyMapSuite extends SparkFunSuite 
with LocalSparkContext {
 sc.stop()
   }
 
+  test("spill during iteration") {
--- End diff --

I understand what this test wants to do, but it seems code without this PR could also pass it when everything goes normally. I know it's a little hard to reflect the change with a unit test. Also, I'd prefer to leave some comments in the source code above to explain the potential memory leak.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #21369: [SPARK-22713][CORE] ExternalAppendOnlyMap leaks w...

2018-05-22 Thread Ngone51
Github user Ngone51 commented on a diff in the pull request:

https://github.com/apache/spark/pull/21369#discussion_r189892444
  
--- Diff: 
core/src/main/scala/org/apache/spark/util/collection/ExternalAppendOnlyMap.scala
 ---
@@ -267,7 +273,7 @@ class ExternalAppendOnlyMap[K, V, C](
*/
   def destructiveIterator(inMemoryIterator: Iterator[(K, C)]): 
Iterator[(K, C)] = {
 readingIterator = new SpillableIterator(inMemoryIterator)
-readingIterator
+readingIterator.toCompletionIterator
--- End diff --

This changes the original behavior of `destructiveIterator`. I'd prefer to do it like this:

```
CompletionIterator[(K, C), Iterator[(K, C)]](
  destructiveIterator(currentMap.iterator), readingIterator.destroy)
```
which keeps compatibility with the current code and does not introduce an unnecessary function.
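
For context, a tiny self-contained sketch of the completion-iterator pattern under discussion (a simplified stand-in, not Spark's actual `CompletionIterator`):

```scala
// Simplified illustration: run a cleanup callback exactly once when the wrapped
// iterator is exhausted, which is what the destructive iterator relies on here.
class SimpleCompletionIterator[A](sub: Iterator[A], completion: () => Unit) extends Iterator[A] {
  private var completed = false
  override def hasNext: Boolean = {
    val more = sub.hasNext
    if (!more && !completed) {
      completed = true
      completion()  // e.g. free the in-memory map backing `sub`
    }
    more
  }
  override def next(): A = sub.next()
}

// Usage: the callback fires after the last element has been consumed.
val it = new SimpleCompletionIterator(Iterator(1, 2, 3), () => println("freeing in-memory map"))
it.foreach(println)
```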


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #21369: [SPARK-22713][CORE] ExternalAppendOnlyMap leaks w...

2018-05-22 Thread Ngone51
Github user Ngone51 commented on a diff in the pull request:

https://github.com/apache/spark/pull/21369#discussion_r189892547
  
--- Diff: 
core/src/main/scala/org/apache/spark/util/collection/ExternalAppendOnlyMap.scala
 ---
@@ -585,17 +591,25 @@ class ExternalAppendOnlyMap[K, V, C](
   } else {
 logInfo(s"Task ${context.taskAttemptId} force spilling in-memory 
map to disk and " +
   s"it will release 
${org.apache.spark.util.Utils.bytesToString(getUsed())} memory")
-nextUpstream = spillMemoryIteratorToDisk(upstream)
+val nextUpstream = spillMemoryIteratorToDisk(upstream)
+assert(!upstream.hasNext)
 hasSpilled = true
+upstream = nextUpstream
 true
   }
 }
 
+private def destroy() : Unit = {
+  freeCurrentMap()
+  upstream = Iterator.empty
--- End diff --

Why `empty`, not `null`?
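
As an aside, a tiny illustration (my own, not from the PR) of why `Iterator.empty` is usually preferred over `null` here: the old iterator still becomes unreachable either way, but later calls stay well-defined.

```scala
// Illustration only: both assignments drop the reference to the old iterator,
// but Iterator.empty keeps subsequent calls safe instead of risking an NPE.
var upstream: Iterator[Int] = Iterator(1, 2, 3)

upstream = Iterator.empty   // old iterator becomes unreachable and can be GCed
assert(!upstream.hasNext)   // safe: simply reports that there is nothing left

upstream = null             // also drops the reference...
// upstream.hasNext         // ...but any later call would throw NullPointerException
```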


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #21369: [SPARK-22713][CORE] ExternalAppendOnlyMap leaks when spi...

2018-05-19 Thread Ngone51
Github user Ngone51 commented on the issue:

https://github.com/apache/spark/pull/21369
  
cc @JerryLead


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #21369: [SPARK-22713][CORE] ExternalAppendOnlyMap leaks w...

2018-05-19 Thread Ngone51
Github user Ngone51 commented on a diff in the pull request:

https://github.com/apache/spark/pull/21369#discussion_r189438190
  
--- Diff: 
core/src/main/scala/org/apache/spark/util/collection/ExternalAppendOnlyMap.scala
 ---
@@ -585,17 +592,15 @@ class ExternalAppendOnlyMap[K, V, C](
   } else {
 logInfo(s"Task ${context.taskAttemptId} force spilling in-memory 
map to disk and " +
   s"it will release 
${org.apache.spark.util.Utils.bytesToString(getUsed())} memory")
-nextUpstream = spillMemoryIteratorToDisk(upstream)
+val nextUpstream = spillMemoryIteratorToDisk(upstream)
+assert(!upstream.hasNext)
 hasSpilled = true
+upstream = nextUpstream
--- End diff --

Does the change mean we should reassign `upstream` (which eliminates the reference to `currentMap`) **immediately** after the spill? Otherwise we may hit OOM (e.g. if we never call `readNext()` after the spill; is this the real cause of the JIRA issue)?


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #21221: [SPARK-23429][CORE] Add executor memory metrics t...

2018-05-13 Thread Ngone51
Github user Ngone51 commented on a diff in the pull request:

https://github.com/apache/spark/pull/21221#discussion_r187825561
  
--- Diff: core/src/main/scala/org/apache/spark/Heartbeater.scala ---
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark
+
+import java.util.concurrent.TimeUnit
+
+import org.apache.spark.util.{ThreadUtils, Utils}
+
+/**
+ * Creates a heartbeat thread which will call the specified 
reportHeartbeat function at
+ * intervals of intervalMs.
+ *
+ * @param reportHeartbeat the heartbeat reporting function to call.
+ * @param intervalMs the interval between heartbeats.
+ */
+private[spark] class Heartbeater(reportHeartbeat: () => Unit, intervalMs: 
Long) {
+  // Executor for the heartbeat task
+  private val heartbeater = 
ThreadUtils.newDaemonSingleThreadScheduledExecutor("driver-heartbeater")
--- End diff --

"pass in the name to the constructor" is better(if we do need to do this 
for the driver)
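
A minimal sketch of that suggestion (illustrative only; the `name` parameter and the class name are my assumptions, not what the PR currently does):

```scala
import java.util.concurrent.{Executors, ScheduledExecutorService, ThreadFactory, TimeUnit}

// Let the caller choose the thread name instead of hard-coding "driver-heartbeater",
// so the same class can serve both the driver and the executors.
class NamedHeartbeater(name: String, reportHeartbeat: () => Unit, intervalMs: Long) {

  private val scheduler: ScheduledExecutorService =
    Executors.newSingleThreadScheduledExecutor(new ThreadFactory {
      override def newThread(r: Runnable): Thread = {
        val t = new Thread(r, name)  // e.g. "driver-heartbeater" or "executor-heartbeater"
        t.setDaemon(true)
        t
      }
    })

  def start(): Unit = {
    val task = new Runnable { override def run(): Unit = reportHeartbeat() }
    scheduler.scheduleAtFixedRate(task, intervalMs, intervalMs, TimeUnit.MILLISECONDS)
  }

  def stop(): Unit = scheduler.shutdownNow()
}
```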


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #21221: [SPARK-23429][CORE] Add executor memory metrics t...

2018-05-13 Thread Ngone51
Github user Ngone51 commented on a diff in the pull request:

https://github.com/apache/spark/pull/21221#discussion_r187824094
  
--- Diff: core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala 
---
@@ -1753,9 +1766,21 @@ class DAGScheduler(
 messageScheduler.shutdownNow()
 eventProcessLoop.stop()
 taskScheduler.stop()
+heartbeater.stop()
+  }
+
+  /** Reports heartbeat metrics for the driver. */
+  private def reportHeartBeat(): Unit = {
--- End diff --

> With cluster mode, including YARN, there isn't a local executor, so the 
metrics for the driver would not be collected.

Yes. But the problem is: can we use the `executor`'s `getCurrentExecutorMetrics()` method to collect memory metrics for the `driver`? IIRC, the `driver` does not acquire memory from the execution memory pool, at least.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #21221: [SPARK-23429][CORE] Add executor memory metrics t...

2018-05-13 Thread Ngone51
Github user Ngone51 commented on a diff in the pull request:

https://github.com/apache/spark/pull/21221#discussion_r187823298
  
--- Diff: 
core/src/main/scala/org/apache/spark/scheduler/EventLoggingListener.scala ---
@@ -169,6 +179,27 @@ private[spark] class EventLoggingListener(
 
   // Events that trigger a flush
   override def onStageCompleted(event: SparkListenerStageCompleted): Unit 
= {
+// log the peak executor metrics for the stage, for each executor
+val accumUpdates = new ArrayBuffer[(Long, Int, Int, 
Seq[AccumulableInfo])]()
+val executorMap = liveStageExecutorMetrics.remove(
+  (event.stageInfo.stageId, event.stageInfo.attemptNumber()))
+executorMap.foreach {
+  executorEntry => {
+for ((executorId, peakExecutorMetrics) <- executorEntry) {
--- End diff --

I revisited the code, I think you're right. My mistake, sorry.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #21221: [SPARK-23429][CORE] Add executor memory metrics t...

2018-05-10 Thread Ngone51
Github user Ngone51 commented on a diff in the pull request:

https://github.com/apache/spark/pull/21221#discussion_r187248156
  
--- Diff: 
core/src/main/scala/org/apache/spark/scheduler/EventLoggingListener.scala ---
@@ -93,6 +94,10 @@ private[spark] class EventLoggingListener(
   // Visible for tests only.
   private[scheduler] val logPath = getLogPath(logBaseDir, appId, 
appAttemptId, compressionCodecName)
 
+  // map of live stages, to peak executor metrics for the stage
+  private val liveStageExecutorMetrics = mutable.HashMap[(Int, Int),
--- End diff --

Why should we track the executor's memory metrics for each stage?


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #21221: [SPARK-23429][CORE] Add executor memory metrics t...

2018-05-10 Thread Ngone51
Github user Ngone51 commented on a diff in the pull request:

https://github.com/apache/spark/pull/21221#discussion_r187236701
  
--- Diff: core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala 
---
@@ -1753,9 +1766,21 @@ class DAGScheduler(
 messageScheduler.shutdownNow()
 eventProcessLoop.stop()
 taskScheduler.stop()
+heartbeater.stop()
+  }
+
+  /** Reports heartbeat metrics for the driver. */
+  private def reportHeartBeat(): Unit = {
--- End diff --

Why do we need this for the `driver`? If Spark runs in local mode, there's a local `executor`, which will report heartbeats.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #21221: [SPARK-23429][CORE] Add executor memory metrics t...

2018-05-10 Thread Ngone51
Github user Ngone51 commented on a diff in the pull request:

https://github.com/apache/spark/pull/21221#discussion_r187239219
  
--- Diff: 
core/src/main/scala/org/apache/spark/scheduler/EventLoggingListener.scala ---
@@ -169,6 +179,27 @@ private[spark] class EventLoggingListener(
 
   // Events that trigger a flush
   override def onStageCompleted(event: SparkListenerStageCompleted): Unit 
= {
+// log the peak executor metrics for the stage, for each executor
+val accumUpdates = new ArrayBuffer[(Long, Int, Int, 
Seq[AccumulableInfo])]()
+val executorMap = liveStageExecutorMetrics.remove(
+  (event.stageInfo.stageId, event.stageInfo.attemptNumber()))
+executorMap.foreach {
+  executorEntry => {
+for ((executorId, peakExecutorMetrics) <- executorEntry) {
--- End diff --

How about `case (executorId, peakExecutorMetrics) =>`? It would be more readable.
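
A small self-contained example of what I mean (my own illustration, not the PR's code): matching on the tuple gives both fields readable names.

```scala
// Illustration only: iterating over executorId -> peak metric pairs.
val peakMetrics = Map("exec-1" -> 512L, "exec-2" -> 1024L)

// Without the pattern match, the fields stay anonymous:
peakMetrics.foreach { entry => println(s"${entry._1}: ${entry._2}") }

// With `case`, both values get descriptive names:
peakMetrics.foreach { case (executorId, peakValue) =>
  println(s"$executorId: $peakValue")
}
```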


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #21221: [SPARK-23429][CORE] Add executor memory metrics t...

2018-05-10 Thread Ngone51
Github user Ngone51 commented on a diff in the pull request:

https://github.com/apache/spark/pull/21221#discussion_r187244792
  
--- Diff: 
core/src/main/scala/org/apache/spark/scheduler/EventLoggingListener.scala ---
@@ -169,6 +179,27 @@ private[spark] class EventLoggingListener(
 
   // Events that trigger a flush
   override def onStageCompleted(event: SparkListenerStageCompleted): Unit 
= {
+// log the peak executor metrics for the stage, for each executor
+val accumUpdates = new ArrayBuffer[(Long, Int, Int, 
Seq[AccumulableInfo])]()
+val executorMap = liveStageExecutorMetrics.remove(
--- End diff --

Do we always post a `SparkListenerStageCompleted` event for failed stages (I can't remember clearly)? If not, I think we should clean up the other attempts of the same stage here.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #21221: [SPARK-23429][CORE] Add executor memory metrics t...

2018-05-10 Thread Ngone51
Github user Ngone51 commented on a diff in the pull request:

https://github.com/apache/spark/pull/21221#discussion_r187238940
  
--- Diff: core/src/main/scala/org/apache/spark/Heartbeater.scala ---
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark
+
+import java.util.concurrent.TimeUnit
+
+import org.apache.spark.util.{ThreadUtils, Utils}
+
+/**
+ * Creates a heartbeat thread which will call the specified 
reportHeartbeat function at
+ * intervals of intervalMs.
+ *
+ * @param reportHeartbeat the heartbeat reporting function to call.
+ * @param intervalMs the interval between heartbeats.
+ */
+private[spark] class Heartbeater(reportHeartbeat: () => Unit, intervalMs: 
Long) {
+  // Executor for the heartbeat task
+  private val heartbeater = 
ThreadUtils.newDaemonSingleThreadScheduledExecutor("driver-heartbeater")
--- End diff --

I'm wondering whether the prefix name of the heartbeater thread should be `"executor-heartbeater"`?


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #21221: [SPARK-23429][CORE] Add executor memory metrics t...

2018-05-10 Thread Ngone51
Github user Ngone51 commented on a diff in the pull request:

https://github.com/apache/spark/pull/21221#discussion_r187247534
  
--- Diff: 
core/src/main/scala/org/apache/spark/scheduler/PeakExecutorMetrics.scala ---
@@ -0,0 +1,127 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.scheduler
+
+import org.apache.spark.executor.ExecutorMetrics
+import org.apache.spark.status.api.v1.PeakMemoryMetrics
+
+/**
+ * Records the peak values for executor level metrics. If 
jvmUsedHeapMemory is -1, then no
+ * values have been recorded yet.
+ */
+private[spark] class PeakExecutorMetrics {
--- End diff --

Do we really need this class? It seems `ExecutorMetrics` can already do the 
same work.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #21209: [SPARK-24141][CORE] Fix bug in CoarseGrainedSchedulerBac...

2018-05-09 Thread Ngone51
Github user Ngone51 commented on the issue:

https://github.com/apache/spark/pull/21209
  
Thanks!


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #20604: [SPARK-23365][CORE] Do not adjust num executors w...

2018-05-07 Thread Ngone51
Github user Ngone51 commented on a diff in the pull request:

https://github.com/apache/spark/pull/20604#discussion_r186425765
  
--- Diff: core/src/main/scala/org/apache/spark/SparkContext.scala ---
@@ -1643,7 +1646,10 @@ class SparkContext(config: SparkConf) extends 
Logging {
   def killExecutors(executorIds: Seq[String]): Boolean = {
 schedulerBackend match {
   case b: ExecutorAllocationClient =>
-b.killExecutors(executorIds, replace = false, force = 
true).nonEmpty
+require(executorAllocationManager.isEmpty,
--- End diff --

@squito any thoughts?


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #21096: [SPARK-24011][CORE][WIP] cache rdd's immediate parent Sh...

2018-05-07 Thread Ngone51
Github user Ngone51 commented on the issue:

https://github.com/apache/spark/pull/21096
  
Thanks for your opinions @squito @markhamstra.

Maybe I should leave it for now.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #21209: [SPARK-24141][CORE] Fix bug in CoarseGrainedSchedulerBac...

2018-05-07 Thread Ngone51
Github user Ngone51 commented on the issue:

https://github.com/apache/spark/pull/21209
  
ping @jiangxb1987 


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #21212: [SPARK-24143] filter empty blocks when convert ma...

2018-05-05 Thread Ngone51
Github user Ngone51 commented on a diff in the pull request:

https://github.com/apache/spark/pull/21212#discussion_r186261650
  
--- Diff: 
core/src/main/scala/org/apache/spark/storage/ShuffleBlockFetcherIterator.scala 
---
@@ -267,28 +269,30 @@ final class ShuffleBlockFetcherIterator(
 // at most maxBytesInFlight in order to limit the amount of data in 
flight.
 val remoteRequests = new ArrayBuffer[FetchRequest]
 
-// Tracks total number of blocks (including zero sized blocks)
-var totalBlocks = 0
 for ((address, blockInfos) <- blocksByAddress) {
-  totalBlocks += blockInfos.size
   if (address.executorId == blockManager.blockManagerId.executorId) {
-// Filter out zero-sized blocks
-localBlocks ++= blockInfos.filter(_._2 != 0).map(_._1)
+blockInfos.find(_._2 <= 0) match {
+  case Some((blockId, size)) if size < 0 =>
+throw new BlockException(blockId, "Negative block size " + 
size)
+  case Some((blockId, size)) if size == 0 =>
+throw new BlockException(blockId, "Zero-sized blocks should be 
excluded.")
--- End diff --

Is it necessary to throw an exception here? If so, shall we also throw an exception when we detect a 0-sized **remote** block, rather than skipping it silently?


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #21209: [SPARK-24141][CORE] Fix bug in CoarseGrainedSchedulerBac...

2018-05-01 Thread Ngone51
Github user Ngone51 commented on the issue:

https://github.com/apache/spark/pull/21209
  
ping @squito @vanzin 


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #21209: [SPARK-24141][CORE] Fix bug in CoarseGrainedSched...

2018-05-01 Thread Ngone51
GitHub user Ngone51 opened a pull request:

https://github.com/apache/spark/pull/21209

[SPARK-24141][CORE] Fix bug in CoarseGrainedSchedulerBackend.killExecutors

## What changes were proposed in this pull request?

In the method *CoarseGrainedSchedulerBackend.killExecutors()*, `numPendingExecutors` should be increased by `executorsToKill.size` rather than `knownExecutors.size` when we do not adjust the target number of executors.
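
A minimal sketch of the accounting being fixed (illustrative only, not the actual Spark code; the helper name is made up):

```scala
// When executors are killed without lowering the target, each executor that is
// *actually* killed should be counted as pending until its replacement registers.
// executorsToKill can be a strict subset of knownExecutors (busy executors may be
// filtered out when force = false), so its size is the correct increment.
def updatedPendingExecutors(
    numPendingExecutors: Int,
    knownExecutors: Seq[String],
    executorsToKill: Seq[String],
    adjustTargetNumExecutors: Boolean): Int = {
  if (adjustTargetNumExecutors) {
    numPendingExecutors                         // target was lowered, nothing becomes pending
  } else {
    numPendingExecutors + executorsToKill.size  // the fix: not knownExecutors.size
  }
}
```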

## How was this patch tested?

N/A


You can merge this pull request into a Git repository by running:

$ git pull https://github.com/Ngone51/spark SPARK-24141

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/spark/pull/21209.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #21209


commit 264f316c178ff32ea632cc3db7e20ab68d555b85
Author: wuyi <ngone_5451@...>
Date:   2018-05-02T01:50:01Z

fix a bug in killExecutors




---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #20604: [SPARK-23365][CORE] Do not adjust num executors w...

2018-04-30 Thread Ngone51
Github user Ngone51 commented on a diff in the pull request:

https://github.com/apache/spark/pull/20604#discussion_r185159109
  
--- Diff: core/src/main/scala/org/apache/spark/SparkContext.scala ---
@@ -1643,7 +1646,10 @@ class SparkContext(config: SparkConf) extends 
Logging {
   def killExecutors(executorIds: Seq[String]): Boolean = {
 schedulerBackend match {
   case b: ExecutorAllocationClient =>
-b.killExecutors(executorIds, replace = false, force = 
true).nonEmpty
+require(executorAllocationManager.isEmpty,
--- End diff --

Hi @squito, thanks for your reply.

> but only *when* pending tasks increase.

`ExecutorAllocationManager` checks pending (or backlogged) tasks periodically, so we do not actually have to wait for an *increase*.

And for the `Dynamic Allocation` & `User` case, yeah, that's hard to define.

I also checked `SchedulerBackendUtils.getInitialTargetExecutorNumber`: it sets `DEFAULT_NUMBER_EXECUTORS` = 2. But this is not consistent with `Master`, which sets `executorLimit` to `Int.MaxValue` when we are not under dynamic allocation mode. Maybe we can just initialize `requestedTotalExecutors` with `Int.MaxValue` (only when we are not under dynamic allocation mode). Or we could stop calling `doRequestTotalExecutors` from `requestExecutors` and `killExecutors`, and only call it from `requestTotalExecutors` (again, only when we are not under dynamic allocation mode).


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #20604: [SPARK-23365][CORE] Do not adjust num executors when kil...

2018-04-27 Thread Ngone51
Github user Ngone51 commented on the issue:

https://github.com/apache/spark/pull/20604
  
ping @squito 


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #21175: [SPARK-24107][CORE] ChunkedByteBuffer.writeFully method ...

2018-04-27 Thread Ngone51
Github user Ngone51 commented on the issue:

https://github.com/apache/spark/pull/21175
  
cc @kiszk @maropu @cloud-fan @jiangxb1987 


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #21175: [SPARK-24107][CORE] ChunkedByteBuffer.writeFully ...

2018-04-27 Thread Ngone51
Github user Ngone51 commented on a diff in the pull request:

https://github.com/apache/spark/pull/21175#discussion_r184607965
  
--- Diff: 
core/src/test/scala/org/apache/spark/io/ChunkedByteBufferSuite.scala ---
@@ -56,6 +56,12 @@ class ChunkedByteBufferSuite extends SparkFunSuite {
 assert(chunkedByteBuffer.getChunks().head.position() === 0)
   }
 
+  test("SPARK-24107: writeFully() write buffer which is larger than 
bufferWriteChunkSize") {
+val chunkedByteBuffer = new 
ChunkedByteBuffer(Array(ByteBuffer.allocate(80 * 1024 * 1024)))
+chunkedByteBuffer.writeFully(new 
ByteArrayWritableChannel(chunkedByteBuffer.size.toInt))
+assert(chunkedByteBuffer.size === (80L * 1024L * 1024L))
--- End diff --

`ByteArrayWritableChannel`'s size, not `chunkedByteBuffer`'s size.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #21175: [SPARK-24107][CORE] ChunkedByteBuffer.writeFully ...

2018-04-26 Thread Ngone51
Github user Ngone51 commented on a diff in the pull request:

https://github.com/apache/spark/pull/21175#discussion_r184597197
  
--- Diff: 
core/src/test/scala/org/apache/spark/io/ChunkedByteBufferSuite.scala ---
@@ -56,6 +56,12 @@ class ChunkedByteBufferSuite extends SparkFunSuite {
 assert(chunkedByteBuffer.getChunks().head.position() === 0)
   }
 
+  test("writeFully() can write buffer which is larger than 
bufferWriteChunkSize correctly") {
+val chunkedByteBuffer = new 
ChunkedByteBuffer(Array(ByteBuffer.allocate(80*1024*1024)))
--- End diff --

nit: add spaces around `*`, i.e. `80 * 1024 * 1024`.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #21175: [SPARK-24107][CORE] ChunkedByteBuffer.writeFully ...

2018-04-26 Thread Ngone51
Github user Ngone51 commented on a diff in the pull request:

https://github.com/apache/spark/pull/21175#discussion_r184596199
  
--- Diff: 
core/src/test/scala/org/apache/spark/io/ChunkedByteBufferSuite.scala ---
@@ -56,6 +56,12 @@ class ChunkedByteBufferSuite extends SparkFunSuite {
 assert(chunkedByteBuffer.getChunks().head.position() === 0)
   }
 
+  test("writeFully() can write buffer which is larger than 
bufferWriteChunkSize correctly") {
+val chunkedByteBuffer = new 
ChunkedByteBuffer(Array(ByteBuffer.allocate(80*1024*1024)))
+chunkedByteBuffer.writeFully(new 
ByteArrayWritableChannel(chunkedByteBuffer.size.toInt))
+assert(chunkedByteBuffer.getChunks().head.position() === 0)
--- End diff --

This assert is unnecessary for this PR's change. Please replace it with an assertion on the channel's length here.
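
For illustration, the suggested shape of the assertion (my own sketch; it reuses the test's `chunkedByteBuffer` and assumes `ByteArrayWritableChannel.length()` reports the number of bytes written):

```scala
// Assert on what was actually written to the target channel, rather than on the
// source buffer's internal position.
val channel = new ByteArrayWritableChannel(chunkedByteBuffer.size.toInt)
chunkedByteBuffer.writeFully(channel)
assert(channel.length === 80 * 1024 * 1024)
```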


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #21175: [SPARK-24107][CORE] ChunkedByteBuffer.writeFully ...

2018-04-26 Thread Ngone51
Github user Ngone51 commented on a diff in the pull request:

https://github.com/apache/spark/pull/21175#discussion_r184590989
  
--- Diff: 
core/src/test/scala/org/apache/spark/io/ChunkedByteBufferSuite.scala ---
@@ -56,6 +56,12 @@ class ChunkedByteBufferSuite extends SparkFunSuite {
 assert(chunkedByteBuffer.getChunks().head.position() === 0)
   }
 
+  test("writeFully() does not affect original buffer's position") {
--- End diff --

Hi @manbuyun. You should add a new unit test to support your own change, for example: "writeFully() can write a buffer which is larger than `bufferWriteChunkSize` correctly". And update the test code accordingly.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #21131: [SPARK-23433][CORE] Late zombie task completions update ...

2018-04-26 Thread Ngone51
Github user Ngone51 commented on the issue:

https://github.com/apache/spark/pull/21131
  
LGTM, and nice UT.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #21175: [SPARK-24107] ChunkedByteBuffer.writeFully method has no...

2018-04-26 Thread Ngone51
Github user Ngone51 commented on the issue:

https://github.com/apache/spark/pull/21175
  
@manbuyun you need to add the unit test into `ChunkedByteBufferSuite.scala` 
and push a new commit.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #20930: [SPARK-23811][Core] FetchFailed comes before Success of ...

2018-04-26 Thread Ngone51
Github user Ngone51 commented on the issue:

https://github.com/apache/spark/pull/20930
  
No wonder I couldn't understand the issue for a long time: I had thought it happened on Spark 2.3. Now it makes sense. Thanks @jiangxb1987


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #20604: [SPARK-23365][CORE] Do not adjust num executors when kil...

2018-04-25 Thread Ngone51
Github user Ngone51 commented on the issue:

https://github.com/apache/spark/pull/20604
  
ping @squito 


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #21131: [SPARK-23433][CORE] Late zombie task completions ...

2018-04-24 Thread Ngone51
Github user Ngone51 commented on a diff in the pull request:

https://github.com/apache/spark/pull/21131#discussion_r183797532
  
--- Diff: 
core/src/test/scala/org/apache/spark/scheduler/TaskSchedulerImplSuite.scala ---
@@ -917,4 +917,111 @@ class TaskSchedulerImplSuite extends SparkFunSuite 
with LocalSparkContext with B
   taskScheduler.initialize(new FakeSchedulerBackend)
 }
   }
+
+  test("Completions in zombie tasksets update status of non-zombie 
taskset") {
+val taskScheduler = setupSchedulerWithMockTaskSetBlacklist()
+val valueSer = SparkEnv.get.serializer.newInstance()
+
+def completeTaskSuccessfully(tsm: TaskSetManager, partition: Int): 
Unit = {
+  val indexInTsm = tsm.partitionToIndex(partition)
+  val matchingTaskInfo = tsm.taskAttempts.flatten.filter(_.index == 
indexInTsm).head
+  val result = new DirectTaskResult[Int](valueSer.serialize(1), Seq())
+  tsm.handleSuccessfulTask(matchingTaskInfo.taskId, result)
+}
+
+// Submit a task set, have it fail with a fetch failed, and then 
re-submit the task attempt,
+// two times, so we have three active task sets for one stage.  (For 
this to really happen,
+// you'd need the previous stage to also get restarted, and then 
succeed, in between each
+// attempt, but that happens outside what we're mocking here.)
+val zombieAttempts = (0 until 2).map { stageAttempt =>
+  val attempt = FakeTask.createTaskSet(10, stageAttemptId = 
stageAttempt)
+  taskScheduler.submitTasks(attempt)
+  val tsm = taskScheduler.taskSetManagerForAttempt(0, stageAttempt).get
+  val offers = (0 until 10).map{ idx => WorkerOffer(s"exec-$idx", 
s"host-$idx", 1) }
+  taskScheduler.resourceOffers(offers)
+  assert(tsm.runningTasks === 10)
+  if (stageAttempt < 2) {
+// fail attempt
+tsm.handleFailedTask(tsm.taskAttempts.head.head.taskId, 
TaskState.FAILED,
+  FetchFailed(null, 0, 0, 0, "fetch failed"))
+// the attempt is a zombie, but the tasks are still running (this 
could be true even if
+// we actively killed those tasks, as killing is best-effort)
+assert(tsm.isZombie)
+assert(tsm.runningTasks === 9)
+  }
+  tsm
+}
+
+// we've now got 2 zombie attempts, each with 9 tasks still active.  
Submit the 3rd attempt for
+// the stage, but this time with insufficient resources so not all 
tasks are active.
+
+val finalAttempt = FakeTask.createTaskSet(10, stageAttemptId = 2)
+taskScheduler.submitTasks(finalAttempt)
+val finalTsm = taskScheduler.taskSetManagerForAttempt(0, 2).get
+val offers = (0 until 5).map{ idx => WorkerOffer(s"exec-$idx", 
s"host-$idx", 1) }
+val finalAttemptLaunchedPartitions = 
taskScheduler.resourceOffers(offers).flatten.map { task =>
--- End diff --

The explanation is quite clear and I understand it now. Thank you very much! @squito


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #21131: [SPARK-23433][CORE] Late zombie task completions ...

2018-04-24 Thread Ngone51
Github user Ngone51 commented on a diff in the pull request:

https://github.com/apache/spark/pull/21131#discussion_r183790463
  
--- Diff: 
core/src/test/scala/org/apache/spark/scheduler/TaskSchedulerImplSuite.scala ---
@@ -917,4 +917,111 @@ class TaskSchedulerImplSuite extends SparkFunSuite 
with LocalSparkContext with B
   taskScheduler.initialize(new FakeSchedulerBackend)
 }
   }
+
+  test("Completions in zombie tasksets update status of non-zombie 
taskset") {
+val taskScheduler = setupSchedulerWithMockTaskSetBlacklist()
+val valueSer = SparkEnv.get.serializer.newInstance()
+
+def completeTaskSuccessfully(tsm: TaskSetManager, partition: Int): 
Unit = {
+  val indexInTsm = tsm.partitionToIndex(partition)
+  val matchingTaskInfo = tsm.taskAttempts.flatten.filter(_.index == 
indexInTsm).head
+  val result = new DirectTaskResult[Int](valueSer.serialize(1), Seq())
+  tsm.handleSuccessfulTask(matchingTaskInfo.taskId, result)
+}
+
+// Submit a task set, have it fail with a fetch failed, and then 
re-submit the task attempt,
+// two times, so we have three active task sets for one stage.  (For 
this to really happen,
+// you'd need the previous stage to also get restarted, and then 
succeed, in between each
+// attempt, but that happens outside what we're mocking here.)
+val zombieAttempts = (0 until 2).map { stageAttempt =>
+  val attempt = FakeTask.createTaskSet(10, stageAttemptId = 
stageAttempt)
+  taskScheduler.submitTasks(attempt)
+  val tsm = taskScheduler.taskSetManagerForAttempt(0, stageAttempt).get
+  val offers = (0 until 10).map{ idx => WorkerOffer(s"exec-$idx", 
s"host-$idx", 1) }
+  taskScheduler.resourceOffers(offers)
+  assert(tsm.runningTasks === 10)
+  if (stageAttempt < 2) {
+// fail attempt
+tsm.handleFailedTask(tsm.taskAttempts.head.head.taskId, 
TaskState.FAILED,
+  FetchFailed(null, 0, 0, 0, "fetch failed"))
+// the attempt is a zombie, but the tasks are still running (this 
could be true even if
+// we actively killed those tasks, as killing is best-effort)
+assert(tsm.isZombie)
+assert(tsm.runningTasks === 9)
+  }
+  tsm
+}
+
+// we've now got 2 zombie attempts, each with 9 tasks still active.  
Submit the 3rd attempt for
+// the stage, but this time with insufficient resources so not all 
tasks are active.
+
+val finalAttempt = FakeTask.createTaskSet(10, stageAttemptId = 2)
+taskScheduler.submitTasks(finalAttempt)
+val finalTsm = taskScheduler.taskSetManagerForAttempt(0, 2).get
+val offers = (0 until 5).map{ idx => WorkerOffer(s"exec-$idx", 
s"host-$idx", 1) }
+val finalAttemptLaunchedPartitions = 
taskScheduler.resourceOffers(offers).flatten.map { task =>
+  finalAttempt.tasks(task.index).partitionId
+}.toSet
+assert(finalTsm.runningTasks === 5)
+assert(!finalTsm.isZombie)
+
+// We simulate late completions from our zombie tasksets, 
corresponding to all the pending
+// partitions in our final attempt.  This means we're only waiting on 
the tasks we've already
+// launched.
+val finalAttemptPendingPartitions = (0 until 
10).toSet.diff(finalAttemptLaunchedPartitions)
+finalAttemptPendingPartitions.foreach { partition =>
+  completeTaskSuccessfully(zombieAttempts(0), partition)
+}
+
+// If there is another resource offer, we shouldn't run anything.  
Though our final attempt
+// used to have pending tasks, now those tasks have been completed by 
zombie attempts.  The
+// remaining tasks to compute are already active in the non-zombie 
attempt.
+assert(
+  taskScheduler.resourceOffers(IndexedSeq(WorkerOffer("exec-1", 
"host-1", 1))).flatten.isEmpty)
+
+val allTaskSets = zombieAttempts ++ Seq(finalTsm)
+val remainingTasks = (0 until 
10).toSet.diff(finalAttemptPendingPartitions)
+
+// finally, if we finish the remaining partitions from a mix of 
tasksets, all attempts should be
+// marked as zombie.
+// for each of the remaining tasks, find the tasksets with an active 
copy of the task, and
+// finish the task.
+remainingTasks.foreach { partition =>
+  val tsm = if (partition == 0) {
+// we failed this task on both zombie attempts, this one is only 
present in the latest
+// taskset
+finalTsm
+  } else {
+// should be active in every taskset.  We choose a zombie taskset 
just to make sure that
+// we transition the active taskset correctly eve

[GitHub] spark pull request #21131: [SPARK-23433][CORE] Late zombie task completions ...

2018-04-24 Thread Ngone51
Github user Ngone51 commented on a diff in the pull request:

https://github.com/apache/spark/pull/21131#discussion_r183789814
  
--- Diff: 
core/src/test/scala/org/apache/spark/scheduler/TaskSchedulerImplSuite.scala ---
@@ -917,4 +917,111 @@ class TaskSchedulerImplSuite extends SparkFunSuite 
with LocalSparkContext with B
   taskScheduler.initialize(new FakeSchedulerBackend)
 }
   }
+
+  test("Completions in zombie tasksets update status of non-zombie 
taskset") {
+val taskScheduler = setupSchedulerWithMockTaskSetBlacklist()
+val valueSer = SparkEnv.get.serializer.newInstance()
+
+def completeTaskSuccessfully(tsm: TaskSetManager, partition: Int): 
Unit = {
+  val indexInTsm = tsm.partitionToIndex(partition)
+  val matchingTaskInfo = tsm.taskAttempts.flatten.filter(_.index == 
indexInTsm).head
+  val result = new DirectTaskResult[Int](valueSer.serialize(1), Seq())
+  tsm.handleSuccessfulTask(matchingTaskInfo.taskId, result)
+}
+
+// Submit a task set, have it fail with a fetch failed, and then 
re-submit the task attempt,
+// two times, so we have three active task sets for one stage.  (For 
this to really happen,
+// you'd need the previous stage to also get restarted, and then 
succeed, in between each
+// attempt, but that happens outside what we're mocking here.)
+val zombieAttempts = (0 until 2).map { stageAttempt =>
+  val attempt = FakeTask.createTaskSet(10, stageAttemptId = 
stageAttempt)
+  taskScheduler.submitTasks(attempt)
+  val tsm = taskScheduler.taskSetManagerForAttempt(0, stageAttempt).get
+  val offers = (0 until 10).map{ idx => WorkerOffer(s"exec-$idx", 
s"host-$idx", 1) }
+  taskScheduler.resourceOffers(offers)
+  assert(tsm.runningTasks === 10)
+  if (stageAttempt < 2) {
+// fail attempt
+tsm.handleFailedTask(tsm.taskAttempts.head.head.taskId, 
TaskState.FAILED,
+  FetchFailed(null, 0, 0, 0, "fetch failed"))
+// the attempt is a zombie, but the tasks are still running (this 
could be true even if
+// we actively killed those tasks, as killing is best-effort)
+assert(tsm.isZombie)
+assert(tsm.runningTasks === 9)
+  }
+  tsm
+}
+
+// we've now got 2 zombie attempts, each with 9 tasks still active.  
Submit the 3rd attempt for
+// the stage, but this time with insufficient resources so not all 
tasks are active.
+
+val finalAttempt = FakeTask.createTaskSet(10, stageAttemptId = 2)
+taskScheduler.submitTasks(finalAttempt)
+val finalTsm = taskScheduler.taskSetManagerForAttempt(0, 2).get
+val offers = (0 until 5).map{ idx => WorkerOffer(s"exec-$idx", 
s"host-$idx", 1) }
+val finalAttemptLaunchedPartitions = 
taskScheduler.resourceOffers(offers).flatten.map { task =>
--- End diff --

> because they won't be able to get their shuffle input, same as the 
original fetch failure

Why? In `DAGScheduler`, we only unregister one MapStatus of the parent stage, so the other running tasks within the failed (child) stage (the stage failed by a fetch-failed task) may still get MapOutputs from `MapOutputTrackerMaster` and fetch data from other `Executor`s. So they can succeed normally. Am I missing something?


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #21131: [SPARK-23433][CORE] Late zombie task completions ...

2018-04-24 Thread Ngone51
Github user Ngone51 commented on a diff in the pull request:

https://github.com/apache/spark/pull/21131#discussion_r183701269
  
--- Diff: 
core/src/test/scala/org/apache/spark/scheduler/TaskSchedulerImplSuite.scala ---
@@ -917,4 +917,111 @@ class TaskSchedulerImplSuite extends SparkFunSuite 
with LocalSparkContext with B
   taskScheduler.initialize(new FakeSchedulerBackend)
 }
   }
+
+  test("Completions in zombie tasksets update status of non-zombie 
taskset") {
+val taskScheduler = setupSchedulerWithMockTaskSetBlacklist()
+val valueSer = SparkEnv.get.serializer.newInstance()
+
+def completeTaskSuccessfully(tsm: TaskSetManager, partition: Int): 
Unit = {
+  val indexInTsm = tsm.partitionToIndex(partition)
+  val matchingTaskInfo = tsm.taskAttempts.flatten.filter(_.index == 
indexInTsm).head
+  val result = new DirectTaskResult[Int](valueSer.serialize(1), Seq())
+  tsm.handleSuccessfulTask(matchingTaskInfo.taskId, result)
+}
+
+// Submit a task set, have it fail with a fetch failed, and then 
re-submit the task attempt,
+// two times, so we have three active task sets for one stage.  (For 
this to really happen,
+// you'd need the previous stage to also get restarted, and then 
succeed, in between each
+// attempt, but that happens outside what we're mocking here.)
+val zombieAttempts = (0 until 2).map { stageAttempt =>
+  val attempt = FakeTask.createTaskSet(10, stageAttemptId = 
stageAttempt)
+  taskScheduler.submitTasks(attempt)
+  val tsm = taskScheduler.taskSetManagerForAttempt(0, stageAttempt).get
+  val offers = (0 until 10).map{ idx => WorkerOffer(s"exec-$idx", 
s"host-$idx", 1) }
+  taskScheduler.resourceOffers(offers)
+  assert(tsm.runningTasks === 10)
+  if (stageAttempt < 2) {
+// fail attempt
+tsm.handleFailedTask(tsm.taskAttempts.head.head.taskId, 
TaskState.FAILED,
+  FetchFailed(null, 0, 0, 0, "fetch failed"))
+// the attempt is a zombie, but the tasks are still running (this 
could be true even if
+// we actively killed those tasks, as killing is best-effort)
+assert(tsm.isZombie)
+assert(tsm.runningTasks === 9)
+  }
+  tsm
+}
+
+// we've now got 2 zombie attempts, each with 9 tasks still active.  
Submit the 3rd attempt for
+// the stage, but this time with insufficient resources so not all 
tasks are active.
+
+val finalAttempt = FakeTask.createTaskSet(10, stageAttemptId = 2)
+taskScheduler.submitTasks(finalAttempt)
+val finalTsm = taskScheduler.taskSetManagerForAttempt(0, 2).get
+val offers = (0 until 5).map{ idx => WorkerOffer(s"exec-$idx", 
s"host-$idx", 1) }
+val finalAttemptLaunchedPartitions = 
taskScheduler.resourceOffers(offers).flatten.map { task =>
--- End diff --

Yet, launched tasks have nothing to do with other running tasks in other `TaskSet`s. But is it possible to take those running tasks into consideration when launching a new task (in the source code)? For example, launching the FetchFailed task, or tasks that do not have a running copy across `TaskSet`s, first?

(But it seems we will always have running copies in other `TaskSet`s for our final `TaskSet`, except for the FetchFailed task, right? It's more like we are not talking about resubmitting a stage, but resubmitting the tasks that do not have running copies across the previous `TaskSet`s.)


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #21131: [SPARK-23433][CORE] Late zombie task completions ...

2018-04-24 Thread Ngone51
Github user Ngone51 commented on a diff in the pull request:

https://github.com/apache/spark/pull/21131#discussion_r183690133
  
--- Diff: 
core/src/test/scala/org/apache/spark/scheduler/TaskSchedulerImplSuite.scala ---
@@ -917,4 +917,111 @@ class TaskSchedulerImplSuite extends SparkFunSuite 
with LocalSparkContext with B
   taskScheduler.initialize(new FakeSchedulerBackend)
 }
   }
+
+  test("Completions in zombie tasksets update status of non-zombie 
taskset") {
+val taskScheduler = setupSchedulerWithMockTaskSetBlacklist()
+val valueSer = SparkEnv.get.serializer.newInstance()
+
+def completeTaskSuccessfully(tsm: TaskSetManager, partition: Int): 
Unit = {
+  val indexInTsm = tsm.partitionToIndex(partition)
+  val matchingTaskInfo = tsm.taskAttempts.flatten.filter(_.index == 
indexInTsm).head
+  val result = new DirectTaskResult[Int](valueSer.serialize(1), Seq())
+  tsm.handleSuccessfulTask(matchingTaskInfo.taskId, result)
+}
+
+// Submit a task set, have it fail with a fetch failed, and then 
re-submit the task attempt,
+// two times, so we have three active task sets for one stage.  (For 
this to really happen,
+// you'd need the previous stage to also get restarted, and then 
succeed, in between each
+// attempt, but that happens outside what we're mocking here.)
+val zombieAttempts = (0 until 2).map { stageAttempt =>
+  val attempt = FakeTask.createTaskSet(10, stageAttemptId = 
stageAttempt)
+  taskScheduler.submitTasks(attempt)
+  val tsm = taskScheduler.taskSetManagerForAttempt(0, stageAttempt).get
+  val offers = (0 until 10).map{ idx => WorkerOffer(s"exec-$idx", 
s"host-$idx", 1) }
+  taskScheduler.resourceOffers(offers)
+  assert(tsm.runningTasks === 10)
+  if (stageAttempt < 2) {
+// fail attempt
+tsm.handleFailedTask(tsm.taskAttempts.head.head.taskId, 
TaskState.FAILED,
+  FetchFailed(null, 0, 0, 0, "fetch failed"))
+// the attempt is a zombie, but the tasks are still running (this 
could be true even if
+// we actively killed those tasks, as killing is best-effort)
+assert(tsm.isZombie)
+assert(tsm.runningTasks === 9)
+  }
+  tsm
+}
+
+// we've now got 2 zombie attempts, each with 9 tasks still active.  
Submit the 3rd attempt for
+// the stage, but this time with insufficient resources so not all 
tasks are active.
+
+val finalAttempt = FakeTask.createTaskSet(10, stageAttemptId = 2)
+taskScheduler.submitTasks(finalAttempt)
+val finalTsm = taskScheduler.taskSetManagerForAttempt(0, 2).get
+val offers = (0 until 5).map{ idx => WorkerOffer(s"exec-$idx", 
s"host-$idx", 1) }
+val finalAttemptLaunchedPartitions = 
taskScheduler.resourceOffers(offers).flatten.map { task =>
+  finalAttempt.tasks(task.index).partitionId
+}.toSet
+assert(finalTsm.runningTasks === 5)
+assert(!finalTsm.isZombie)
+
+// We simulate late completions from our zombie tasksets, 
corresponding to all the pending
+// partitions in our final attempt.  This means we're only waiting on 
the tasks we've already
+// launched.
+val finalAttemptPendingPartitions = (0 until 
10).toSet.diff(finalAttemptLaunchedPartitions)
+finalAttemptPendingPartitions.foreach { partition =>
+  completeTaskSuccessfully(zombieAttempts(0), partition)
+}
+
+// If there is another resource offer, we shouldn't run anything.  
Though our final attempt
+// used to have pending tasks, now those tasks have been completed by 
zombie attempts.  The
+// remaining tasks to compute are already active in the non-zombie 
attempt.
+assert(
+  taskScheduler.resourceOffers(IndexedSeq(WorkerOffer("exec-1", 
"host-1", 1))).flatten.isEmpty)
+
+val allTaskSets = zombieAttempts ++ Seq(finalTsm)
+val remainingTasks = (0 until 
10).toSet.diff(finalAttemptPendingPartitions)
+
+// finally, if we finish the remaining partitions from a mix of 
tasksets, all attempts should be
+// marked as zombie.
+// for each of the remaining tasks, find the tasksets with an active 
copy of the task, and
+// finish the task.
+remainingTasks.foreach { partition =>
+  val tsm = if (partition == 0) {
+// we failed this task on both zombie attempts, this one is only 
present in the latest
+// taskset
+finalTsm
+  } else {
+// should be active in every taskset.  We choose a zombie taskset 
just to make sure that
+// we transition the active taskset correctly eve

[GitHub] spark pull request #21131: [SPARK-23433][CORE] Late zombie task completions ...

2018-04-24 Thread Ngone51
Github user Ngone51 commented on a diff in the pull request:

https://github.com/apache/spark/pull/21131#discussion_r183619646
  
--- Diff: 
core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala ---
@@ -689,6 +689,20 @@ private[spark] class TaskSchedulerImpl(
 }
   }
 
+  /**
+   * Marks the task has completed in all TaskSetManagers for the given 
stage.
+   *
+   * After stage failure and retry, there may be multiple active 
TaskSetManagers for the stage.
--- End diff --

IIRC, there's only one active `TaskSetManager` for a given stage, possibly along
with some zombie `TaskSetManager`s. Though, there may still be running tasks in
those zombie `TaskSetManager`s.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #21131: [SPARK-23433][CORE] Late zombie task completions ...

2018-04-24 Thread Ngone51
Github user Ngone51 commented on a diff in the pull request:

https://github.com/apache/spark/pull/21131#discussion_r183619704
  
--- Diff: 
core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala ---
@@ -689,6 +689,20 @@ private[spark] class TaskSchedulerImpl(
 }
   }
 
+  /**
+   * Marks the task has completed in all TaskSetManagers for the given 
stage.
+   *
+   * After stage failure and retry, there may be multiple active 
TaskSetManagers for the stage.
+   * If an earlier attempt of a stage completes a task, we should ensure 
that the later attempts
+   * do not also submit those same tasks.  That also means that a task 
completion from an  earlier
+   * attempt can lead to the entire stage getting marked as successful.
+   */
+  private[scheduler] def markPartitionCompletedInAllTaskSets(stageId: Int, 
partitionId: Int) = {
+taskSetsByStageIdAndAttempt.getOrElse(stageId, Map()).values.foreach { 
tsm =>
--- End diff --

Generally, it seems impossible for an unfinished `TaskSet` to get an empty
`Map()` from `taskSetsByStageIdAndAttempt`. But if it does, maybe we can tell
the caller that the stage has already finished.
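
As a rough sketch of what I mean (assuming a `markPartitionCompleted` helper on `TaskSetManager`, which is not shown in the quoted diff):

```scala
// Hypothetical variant: make the "no task sets left" case explicit instead of
// silently iterating over an empty Map.
private[scheduler] def markPartitionCompletedInAllTaskSets(
    stageId: Int,
    partitionId: Int): Unit = {
  taskSetsByStageIdAndAttempt.get(stageId) match {
    case Some(attempts) =>
      attempts.values.foreach(_.markPartitionCompleted(partitionId))
    case None =>
      // All attempts are gone, so the stage has presumably finished already.
      logInfo(s"No task sets for stage $stageId; ignoring completion of " +
        s"partition $partitionId")
  }
}
```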


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #20998: [SPARK-23888][CORE] correct the comment of hasAttemptOnH...

2018-04-23 Thread Ngone51
Github user Ngone51 commented on the issue:

https://github.com/apache/spark/pull/20998
  
Agreed, and thank you @squito .

And thanks to all of you: @felixcheung @mridulm @jiangxb1987 @srowen 


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #20998: [SPARK-23888][CORE] correct the comment of hasAtt...

2018-04-23 Thread Ngone51
Github user Ngone51 commented on a diff in the pull request:

https://github.com/apache/spark/pull/20998#discussion_r183408481
  
--- Diff: 
core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala ---
@@ -287,7 +287,7 @@ private[spark] class TaskSetManager(
 None
   }
 
-  /** Check whether a task is currently running an attempt on a given host 
*/
+  /** Check whether a task once run an attempt on a given host */
--- End diff --

Yes. Thank you.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #20930: [SPARK-23811][Core] FetchFailed comes before Success of ...

2018-04-21 Thread Ngone51
Github user Ngone51 commented on the issue:

https://github.com/apache/spark/pull/20930
  
> because we can get the MapStatus, but get a 'null'. If I'm not mistaken, 
this also because the ExecutorLost trigger removeOutputsOnExecutor

If there's a `null` MapStatus for stage 2, how can it retry 4 times without any
tasks? IIUC, a `null` MapStatus leads to a missing partition, which means there
would be some tasks to submit.

As for stage 3's shuffle Id, that's really weird. Hope you can fix it! 
@xuanyuanking 


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #20930: [SPARK-23811][Core] FetchFailed comes before Success of ...

2018-04-21 Thread Ngone51
Github user Ngone51 commented on the issue:

https://github.com/apache/spark/pull/20930
  
Hi @xuanyuanking, thanks for your patient explanation, sincerely.

With regard to your latest explanation:
 
> stage 2's shuffleID is 1, but stage 3 failed by missing an output for 
shuffle '0'! So here the stage 2's skip cause stage 3 got an error shuffleId.

However, I don't think skipping stage 2 can lead to stage 3 getting a wrong
shuffleId, as we've already created all the `ShuffleDependency`s (constructed
with fixed ids) for the `ShuffleMapStage`s before any stage of a job is submitted.

After struggling to understand this issue for a while, I finally came up with my
own inference:

(Assume the two ShuffleMapTasks below belong to stage 2, which has two partitions
on the map side. Stage 2 has a parent stage, stage 1, and a child stage, stage 3.)

1. ShuffleMapTask 0.0 runs on ExecutorB, writes its map output on ExecutorB, and
succeeds normally. Now there is only 1 available map output registered on
`MapOutputTrackerMaster`.

2. ShuffleMapTask 1.0 runs on ExecutorA, fetches its data from ExecutorA, and
writes its map output on ExecutorA, too.

3. ExecutorA is lost for an unknown reason after sending the `StatusUpdate`
message to the driver, which reports ShuffleMapTask 1.0's success. All map
outputs on ExecutorA are lost, including ShuffleMapTask 1.0's.

4. The driver launches a speculative ShuffleMapTask 1.1 before it receives the
`StatusUpdate` message, and ShuffleMapTask 1.1 gets a FetchFailed immediately.

5. `DAGScheduler` handles the FetchFailed ShuffleMapTask 1.1 first, marking
stage 2 and its parent stage 1 as failed. Stage 1 & stage 2 are now waiting to
be resubmitted.

6. `DAGScheduler` handles the successful ShuffleMapTask 1.0 before stage 1 &
stage 2 are resubmitted, which triggers `MapOutputTrackerMaster.registerMapOutput`.
Now there are 2 available map outputs registered on `MapOutputTrackerMaster`
(even though ShuffleMapTask 1.0's map output on ExecutorA has been lost).

7. Stage 1 is resubmitted and succeeds normally.

8. Stage 2 is resubmitted. Since stage 2 has 2 available map outputs registered
on `MapOutputTrackerMaster`, there are no missing partitions for stage 2, and
thus no missing tasks to submit either (see the sketch after this list).

9. Then we submit stage 3. Since stage 2's map output file on ExecutorA is lost,
stage 3 must end up with a FetchFailed. We then resubmit stage 2 & stage 3, and
we get into a loop until stage 3 aborts.
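
A simplified sketch of the bookkeeping I'm assuming in step 8 (not the real `ShuffleMapStage`/`MapOutputTrackerMaster` code): a partition only counts as missing when it has no registered output at all, even if the executor holding that output is already gone.

```scala
// Stand-in for MapStatus, just to show the shape of the check I have in mind.
case class RegisteredOutput(host: String)

// After step 6, both partitions have a registered output, even though the one
// written by ShuffleMapTask 1.0 on the lost ExecutorA is no longer readable.
val outputs: Array[RegisteredOutput] =
  Array(RegisteredOutput("hostB"), RegisteredOutput("hostA"))

// A partition is "missing" only if it has no registered output.
val missingPartitions = outputs.zipWithIndex.collect { case (s, p) if s == null => p }
// missingPartitions is empty, so the resubmitted stage 2 launches no tasks, and
// stage 3 hits FetchFailed again as soon as it reads from the lost executor.
```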

But if the issue were what I described above, we should get a
`FetchFailedException` instead of the `MetadataFetchFailedException` shown in
the screenshot. So this point does not add up.

Please feel free to point out where I went wrong.

Anyway, thanks again.



---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #21096: [SPARK-24011][CORE][WIP] cache rdd's immediate parent Sh...

2018-04-20 Thread Ngone51
Github user Ngone51 commented on the issue:

https://github.com/apache/spark/pull/21096
  
ping @jiangxb1987 @squito 

Would you please have a look at this PR? What are your opinions on the cache
strategy?


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #21104: [SPARK-24021][CORE] fix bug in BlacklistTracker's update...

2018-04-18 Thread Ngone51
Github user Ngone51 commented on the issue:

https://github.com/apache/spark/pull/21104
  
ping @jerryshao 


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #21104: [SPARK-24021][CORE] fix bug in BlacklistTracker's...

2018-04-18 Thread Ngone51
GitHub user Ngone51 opened a pull request:

https://github.com/apache/spark/pull/21104

[SPARK-24021][CORE] fix bug in BlacklistTracker's 
updateBlacklistForFetchFailure

## What changes were proposed in this pull request?

There's a typo in BlacklistTracker's updateBlacklistForFetchFailure:
```
val blacklistedExecsOnNode =
nodeToBlacklistedExecs.getOrElseUpdate(exec, HashSet[String]())
blacklistedExecsOnNode += exec
```
where the first **exec** should be **host**.
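
For clarity, the corrected lines look like this (keying the lookup by the node):
```scala
val blacklistedExecsOnNode =
  nodeToBlacklistedExecs.getOrElseUpdate(host, HashSet[String]())
blacklistedExecsOnNode += exec
```
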
## How was this patch tested?

Adjusted the existing test.


You can merge this pull request into a Git repository by running:

$ git pull https://github.com/Ngone51/spark SPARK-24021

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/spark/pull/21104.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #21104


commit d2257213ecd4b0e8ec91bfa52f7caf725c267b16
Author: wuyi <ngone_5451@...>
Date:   2018-04-19T02:42:51Z

fix bug in BlacklistTracker's updateBlacklistForFetchFailure




---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #21096: [SPARK-24011][CORE] cache rdd's immediate parent Shuffle...

2018-04-18 Thread Ngone51
Github user Ngone51 commented on the issue:

https://github.com/apache/spark/pull/21096
  
Thank you for your comments @markhamstra .
Yeah, I'm considering adding a UT to support this change. And thanks for
reminding me of the DAGScheduler's basic principles.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #21096: cache rdd's immediate parent ShuffleDependencies ...

2018-04-18 Thread Ngone51
GitHub user Ngone51 opened a pull request:

https://github.com/apache/spark/pull/21096

cache rdd's immediate parent ShuffleDependencies to accelerate 
getShuffleDependencies

## What changes were proposed in this pull request?

When creating stages for a job, we need to find an rdd's (except the final
rdd's) immediate parent ShuffleDependencies via `getShuffleDependencies()` at
least 2 times (first in `getMissingAncestorShuffleDependencies()`, and second in
`getOrCreateParentStages()`).

So we can cache the result the first time we call `getShuffleDependencies()`.
This helps cut down the time spent when there are many `NarrowDependency`s
between the rdd and its immediate parent `ShuffleDependency`s, or when the rdd
has a large number of immediate parent `ShuffleDependency`s.
 
There's an exception for checkpointed rdds: if an rdd is checkpointed, its
immediate parent `ShuffleDependency`s should be adjusted to empty (see the
sketch below).
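
A rough sketch of the caching idea (identifiers like `shuffleDepsCache` and `cachedShuffleDependencies` are hypothetical, not the exact patch; the cache would live in `DAGScheduler` and be keyed by RDD id):

```scala
import scala.collection.mutable

import org.apache.spark.ShuffleDependency
import org.apache.spark.rdd.RDD

// Hypothetical cache keyed by RDD id; it would be a DAGScheduler field.
val shuffleDepsCache = mutable.HashMap.empty[Int, mutable.HashSet[ShuffleDependency[_, _, _]]]

def cachedShuffleDependencies(rdd: RDD[_]): mutable.HashSet[ShuffleDependency[_, _, _]] = {
  if (rdd.isCheckpointed) {
    // A checkpointed rdd no longer reports its original parent shuffle dependencies.
    mutable.HashSet.empty
  } else {
    shuffleDepsCache.getOrElseUpdate(rdd.id, {
      // Same traversal as getShuffleDependencies(): walk narrow dependencies
      // until the first shuffle boundary on each path.
      val parents = mutable.HashSet.empty[ShuffleDependency[_, _, _]]
      val visited = mutable.HashSet.empty[RDD[_]]
      var waiting: List[RDD[_]] = List(rdd)
      while (waiting.nonEmpty) {
        val toVisit = waiting.head
        waiting = waiting.tail
        if (!visited(toVisit)) {
          visited += toVisit
          toVisit.dependencies.foreach {
            case shuffleDep: ShuffleDependency[_, _, _] => parents += shuffleDep
            case narrowDep => waiting = narrowDep.rdd :: waiting
          }
        }
      }
      parents
    })
  }
}
```
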
## How was this patch tested?

Existing tests.

Please review http://spark.apache.org/contributing.html before opening a 
pull request.

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/Ngone51/spark SPARK-24011

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/spark/pull/21096.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #21096


commit 59fb931135b7bc8fc1f516c39015f7412ae25208
Author: wuyi <ngone_5451@...>
Date:   2018-04-18T10:35:22Z

cache rdd's immediate ShuffleDependencies




---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #20930: [SPARK-23811][Core] FetchFailed comes before Success of ...

2018-04-17 Thread Ngone51
Github user Ngone51 commented on the issue:

https://github.com/apache/spark/pull/20930
  
Hi @xuanyuanking, I'm still confused (smile & cry). 
> Stage 2 retry 4 times triggered by Stage 3's fetch failed event. Actually 
in this scenario, stage 3 will always failed by fetch fail.

Stage 2 has no missing tasks, right? So there are no missing partitions for
Stage 2 (which means Stage 3 can always get Stage 2's map outputs from
`MapOutputTrackerMaster`), right? So why will Stage 3 always fail with a
FetchFailed?
 
Hope you can explain more. Thank you very much!



---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #20930: [SPARK-23811][Core] FetchFailed comes before Succ...

2018-04-17 Thread Ngone51
Github user Ngone51 commented on a diff in the pull request:

https://github.com/apache/spark/pull/20930#discussion_r182308871
  
--- Diff: 
core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala ---
@@ -2399,6 +2399,50 @@ class DAGSchedulerSuite extends SparkFunSuite with 
LocalSparkContext with TimeLi
 }
   }
 
+  /**
+   * This tests the case where origin task success after speculative task 
got FetchFailed
+   * before.
+   */
+  test("[SPARK-23811] FetchFailed comes before Success of same task will 
cause child stage" +
+" never succeed") {
+// Create 3 RDDs with shuffle dependencies on each other: rddA <--- 
rddB <--- rddC
+val rddA = new MyRDD(sc, 2, Nil)
+val shuffleDepA = new ShuffleDependency(rddA, new HashPartitioner(2))
+val shuffleIdA = shuffleDepA.shuffleId
+
+val rddB = new MyRDD(sc, 2, List(shuffleDepA), tracker = 
mapOutputTracker)
+val shuffleDepB = new ShuffleDependency(rddB, new HashPartitioner(2))
+
+val rddC = new MyRDD(sc, 2, List(shuffleDepB), tracker = 
mapOutputTracker)
+
+submit(rddC, Array(0, 1))
+
+// Complete both tasks in rddA.
+assert(taskSets(0).stageId === 0 && taskSets(0).stageAttemptId === 0)
+complete(taskSets(0), Seq(
+  (Success, makeMapStatus("hostA", 2)),
+  (Success, makeMapStatus("hostB", 2
+
+// The first task success
+runEvent(makeCompletionEvent(
+  taskSets(1).tasks(0), Success, makeMapStatus("hostB", 2)))
+
+// The second task's speculative attempt fails first, but task self 
still running.
+// This may caused by ExecutorLost.
+runEvent(makeCompletionEvent(
+  taskSets(1).tasks(1),
--- End diff --

Maybe you can `runEvent(SpeculativeTaskSubmitted(...))` first to simulate a
speculative task being submitted, before you `runEvent(makeCompletionEvent(...))`.
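
Something along these lines, assuming a `SpeculativeTaskSubmitted(task)` event shape and reusing the test's own helpers (rough sketch only, not checked against the suite):

```scala
// First tell the DAGScheduler that a speculative copy of task 1 was submitted...
runEvent(SpeculativeTaskSubmitted(taskSets(1).tasks(1)))
// ...then let that speculative attempt fail with FetchFailed, as in the test.
runEvent(makeCompletionEvent(
  taskSets(1).tasks(1),
  FetchFailed(null, shuffleIdA, 0, 0, "fetch failed"),
  null))
```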


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #20998: [SPARK-23888][CORE] speculative task should not run on a...

2018-04-17 Thread Ngone51
Github user Ngone51 commented on the issue:

https://github.com/apache/spark/pull/20998
  
Will do, and it's okay. 
My view is still limited to the source code, but you guys have more practical
experience, so I learned from your points. It's beneficial.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #20604: [SPARK-23365][CORE] Do not adjust num executors when kil...

2018-04-17 Thread Ngone51
Github user Ngone51 commented on the issue:

https://github.com/apache/spark/pull/20604
  
> I'd go even further and suggest that with this fix in, we can actually 
remove SPARK-21834, as its no longer necessary. 

Yes; otherwise this PR's work would be meaningless.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #20604: [SPARK-23365][CORE] Do not adjust num executors w...

2018-04-17 Thread Ngone51
Github user Ngone51 commented on a diff in the pull request:

https://github.com/apache/spark/pull/20604#discussion_r182086337
  
--- Diff: core/src/main/scala/org/apache/spark/SparkContext.scala ---
@@ -1643,7 +1646,10 @@ class SparkContext(config: SparkConf) extends 
Logging {
   def killExecutors(executorIds: Seq[String]): Boolean = {
 schedulerBackend match {
   case b: ExecutorAllocationClient =>
-b.killExecutors(executorIds, replace = false, force = 
true).nonEmpty
+require(executorAllocationManager.isEmpty,
--- End diff --

Hi @squito, I have some questions about these cases:
>  If you've got just one executor, and then you kill it, should your app 
sit with 0 executors?

If the app sits with 0 executors, then pending tasks accumulate, which leads the
`ExecutorAllocationManager` to increase the target number of executors. So the
app will not sit with 0 executors forever.

> Or even if you've got 10 executors, and you kill one -- when is dynamic 
allocation allowed to bump the total back up?

For this case, to be honest, I really do not get your point. Blame my poor
English.

And what happens if we use this method without an `ExecutorAllocationManager`?
Do we really need to adjust the target number of executors (set
`adjustTargetNumExecutors = true` below) if we are not using an
`ExecutorAllocationManager`?

See these lines in `killExecutors()`:
```
if (adjustTargetNumExecutors) {
  requestedTotalExecutors = math.max(requestedTotalExecutors - 
executorsToKill.size, 0)
  ...
  doRequestTotalExecutors(requestedTotalExecutors)
}
```
Setting `adjustTargetNumExecutors = true` will change `requestedTotalExecutors`.
And IIUC, `requestedTotalExecutors` is only used in dynamic allocation mode. So,
if we are not using an `ExecutorAllocationManager`, the allocation client will
ask the cluster manager for `requestedTotalExecutors = 0` executors (which is
really terrible). But an app without an `ExecutorAllocationManager` actually has
no limit on requesting executors (by default).

Actually, I think this series of methods, including `killAndReplaceExecutor`,
`requestExecutors`, etc., was designed with dynamic allocation mode in mind. If
we still want to use these methods when the app does not use an
`ExecutorAllocationManager`, we should not change `requestedTotalExecutors`, or
perhaps not request a specific number of executors from the cluster manager at all.

WDYT?




---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #21079: [SPARK-23992][CORE] ShuffleDependency does not ne...

2018-04-16 Thread Ngone51
Github user Ngone51 commented on a diff in the pull request:

https://github.com/apache/spark/pull/21079#discussion_r181938225
  
--- Diff: 
core/src/main/scala/org/apache/spark/scheduler/ShuffleMapTask.scala ---
@@ -113,3 +118,24 @@ private[spark] class ShuffleMapTask(
 
   override def toString: String = "ShuffleMapTask(%d, %d)".format(stageId, 
partitionId)
 }
+
+object ShuffleMapTask extends Logging {
+  private val cache = CacheBuilder.newBuilder()
--- End diff --

Do we need to clear this `cache` at the end of an app?
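
For instance (hypothetical helper; `invalidateAll()` is the standard Guava `Cache` API), something like this could be called from whatever end-of-app cleanup path is chosen:

```scala
// Hypothetical addition to the ShuffleMapTask companion object.
def clearCache(): Unit = cache.invalidateAll()
```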


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #20930: [SPARK-23811][Core] FetchFailed comes before Succ...

2018-04-16 Thread Ngone51
Github user Ngone51 commented on a diff in the pull request:

https://github.com/apache/spark/pull/20930#discussion_r181729645
  
--- Diff: 
core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala ---
@@ -750,6 +752,10 @@ private[spark] class TaskSetManager(
   if (tasksSuccessful == numTasks) {
 isZombie = true
   }
+} else if (fetchFailedTaskIndexSet.contains(index)) {
+  logInfo("Ignoring task-finished event for " + info.id + " in stage " 
+ taskSet.id +
+" because task " + index + " has already failed by FetchFailed")
+  return
--- End diff --

We cannot simply `return` here. We should always send a task `CompletionEvent`
to the DAGScheduler, in case there are any listeners waiting for it. 


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #20930: [SPARK-23811][Core] FetchFailed comes before Succ...

2018-04-16 Thread Ngone51
Github user Ngone51 commented on a diff in the pull request:

https://github.com/apache/spark/pull/20930#discussion_r181732788
  
--- Diff: 
core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala ---
@@ -750,6 +752,10 @@ private[spark] class TaskSetManager(
   if (tasksSuccessful == numTasks) {
 isZombie = true
   }
+} else if (fetchFailedTaskIndexSet.contains(index)) {
+  logInfo("Ignoring task-finished event for " + info.id + " in stage " 
+ taskSet.id +
+" because task " + index + " has already failed by FetchFailed")
+  return
--- End diff --

Maybe we can mark the task as `FAILED` with `UnknownReason` here. Then the
DAGScheduler will treat this task as a no-op, and `registerMapOutput` will not
be triggered. Though it is not an elegant way (rough sketch below).
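
Roughly like this (the exact `taskEnded` argument list here is an assumption on my side, not checked against the code):

```scala
// Hypothetical sketch: report the late "success" as a failure the DAGScheduler
// will ignore, so registerMapOutput is never reached for the already-lost output.
sched.dagScheduler.taskEnded(
  tasks(index), UnknownReason, null, Seq.empty, info)
```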


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #20998: [SPARK-23888][CORE] speculative task should not r...

2018-04-11 Thread Ngone51
Github user Ngone51 commented on a diff in the pull request:

https://github.com/apache/spark/pull/20998#discussion_r180937626
  
--- Diff: 
core/src/test/scala/org/apache/spark/scheduler/TaskSetManagerSuite.scala ---
@@ -880,8 +880,8 @@ class TaskSetManagerSuite extends SparkFunSuite with 
LocalSparkContext with Logg
 assert(manager.resourceOffer("execB", "host2", ANY).get.index === 3)
   }
 
-  test("speculative task should not run on a given host where another 
attempt " +
-"is already running on") {
+  test("SPARK-23888: speculative task should not run on a given host " +
+"where another attempt is already running on") {
--- End diff --

Sure. Also, do we need to reword the PR and JIRA titles? @squito 


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org


