[GitHub] spark issue #22771: [SPARK-25773][Core]Cancel zombie tasks in a result stage...

2018-10-30 Thread markhamstra
Github user markhamstra commented on the issue:

https://github.com/apache/spark/pull/22771
  
LGTM


---




[GitHub] spark pull request #22771: [SPARK-25773][Core]Cancel zombie tasks in a resul...

2018-10-25 Thread markhamstra
Github user markhamstra commented on a diff in the pull request:

https://github.com/apache/spark/pull/22771#discussion_r228371144
  
--- Diff: core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala 
---
@@ -1364,6 +1385,21 @@ private[spark] class DAGScheduler(
   if (job.numFinished == job.numPartitions) {
 markStageAsFinished(resultStage)
 cleanupStateForJobAndIndependentStages(job)
+try {
+  // killAllTaskAttempts will fail if a SchedulerBackend does not implement
+  // killTask.
+  logInfo(s"Job ${job.jobId} is finished. Killing potential speculative or " +
+s"zombie tasks for this job")
--- End diff --

Yes, other log messages probably also aren't very good. Maybe what we need 
is some additional explanation in the docs somewhere.

The issue that I am having is that if the log messages say that Tasks are 
being killed or canceled or whatever, many users will assume that means the 
Tasks will no longer be running on the Executors. In fact, what it means is 
that the DAGScheduler isn't going to try to run them anymore, and that any 
previously started Tasks may or may not still be running or continue to run 
on the Executors -- it depends on whether the Tasks are interruptible and on 
whether the interrupt on cancel configuration is set to true. The log messages 
make sense if you understand that subtlety, so we should probably try to 
explain it more fully in the docs.
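
For reference, a minimal sketch of how that configuration gets set today (the 
group name and job body are illustrative; `setJobGroup`, `cancelJobGroup`, and the 
`spark.job.interruptOnCancel` local property are the existing mechanisms being 
discussed):

```scala
// Interrupt-on-cancel is opted into per job group: setJobGroup's third argument
// stores the spark.job.interruptOnCancel thread-local property. A later
// cancelJobGroup only interrupts task threads on the Executors if that property
// is true and the SchedulerBackend implements killTask.
sc.setJobGroup("nightly-etl", "nightly ETL run", interruptOnCancel = true)
sc.parallelize(1 to 1000000).map(_ + 1).count()  // jobs from this thread belong to the group

// Issued from some other thread (e.g. a UI or timeout handler) while the job runs:
sc.cancelJobGroup("nightly-etl")
```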


---




[GitHub] spark pull request #22771: [SPARK-25773][Core]Cancel zombie tasks in a resul...

2018-10-25 Thread markhamstra
Github user markhamstra commented on a diff in the pull request:

https://github.com/apache/spark/pull/22771#discussion_r228354020
  
--- Diff: core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala 
---
@@ -1364,6 +1385,21 @@ private[spark] class DAGScheduler(
   if (job.numFinished == job.numPartitions) {
 markStageAsFinished(resultStage)
 cleanupStateForJobAndIndependentStages(job)
+try {
+  // killAllTaskAttempts will fail if a SchedulerBackend does not implement
+  // killTask.
+  logInfo(s"Job ${job.jobId} is finished. Killing potential speculative or " +
+s"zombie tasks for this job")
--- End diff --

Isn't this misleading/confusing if !shouldInterruptTaskThread? You can 
attempt to kill speculative or zombie Tasks in that case, but nothing will 
actually happen if SparkContext.SPARK_JOB_INTERRUPT_ON_CANCEL is false.


---




[GitHub] spark issue #22144: [SPARK-24935][SQL] : Problem with Executing Hive UDF's f...

2018-10-24 Thread markhamstra
Github user markhamstra commented on the issue:

https://github.com/apache/spark/pull/22144
  
Thanks @tgravescs for your latest posts -- they've saved me from posting 
something similar in many respects but more strongly worded.

What is bothering me (not just in the discussion of this PR, but more 
broadly) is that we have individuals making declarative statements about 
whether something can or can't block a release, or that something "is not that 
important to Spark at this point", etc. -- things for which there is no 
supporting PMC vote or declared policy. It may be your opinion, @cloud-fan , 
that Hive compatibility should no longer be important to the Apache Spark 
project, and I have no problem with you expressing your opinion on the matter. 
That may even be the internal policy at your employer, I don't know. But you 
are just not in a position on your own to make this declaration for the Apache 
Spark project.

I don't mean to single you out, @cloud-fan , as the only offender, since 
this isn't a unique instance. For example, heading into a recent release we 
also saw individual declarations that the data correctness issue caused by the 
shuffle replay partitioning issue was not a blocker because it was not a 
regression or that it was not significant enough to alter the release schedule. 
Rather, my point is that things like release schedules, the declaration of 
release candidates, labeling JIRA tickets with "blocker", and de facto or even 
declared policy on regressions and release blockers are just tools in the 
service of the PMC. If, as was the case with the shuffle data correctness 
issue, PMC members think that the issue must be fixed before the next release, 
then release schedules, RC-status, other individuals' perceptions of importance 
to the project or of policy ultimately don't matter -- only the vote of the PMC 
does. What is concerning me is that, instead of efforts to persuade the 
PMC members that something should not block the next release or should not be 
important to the project, I am seeing flat declarations that an issue is not a 
blocker or not important. That may serve to stifle work to immediately fix a 
bug, or to discourage other contributions, but I can assure you that trying to make 
the PMC serve the tools instead of the other way around won't serve to persuade 
at least some PMC members on how they should vote.

Sorry, I guess I can't avoid wording things strongly after all.


---




[GitHub] spark issue #22144: [SPARK-24935][SQL] : Problem with Executing Hive UDF's f...

2018-10-23 Thread markhamstra
Github user markhamstra commented on the issue:

https://github.com/apache/spark/pull/22144
  
Yes, @rxin I know that I was a little unfair to you in order to make my 
point sharper. Apologies. My concern is real, though.


---




[GitHub] spark issue #22144: [SPARK-24935][SQL] : Problem with Executing Hive UDF's f...

2018-10-23 Thread markhamstra
Github user markhamstra commented on the issue:

https://github.com/apache/spark/pull/22144
  
@srowen I understand and agree. What bothers me is that the block/no-block 
decision now often seems to be "not a regression; automatic no block" -- and 
that doesn't seem right to me.


---




[GitHub] spark issue #22144: [SPARK-24935][SQL] : Problem with Executing Hive UDF's f...

2018-10-23 Thread markhamstra
Github user markhamstra commented on the issue:

https://github.com/apache/spark/pull/22144
  
> It’s certainly not a blocker since it’s not a new regression

I really hate this line of argument. Somehow we seem to have slipped from 
"if it is a regression, then we must block" to "if it is not a regression, then 
we don't necessarily have to block, but we might still choose to" to "if it is 
not a regression, then we necessarily should not block." I'm not at all 
comfortable with what now appears to be our de facto policy on regressions and 
blocking releases.


---




[GitHub] spark issue #22771: [SPARK-25773][Core]Cancel zombie tasks in a result stage...

2018-10-23 Thread markhamstra
Github user markhamstra commented on the issue:

https://github.com/apache/spark/pull/22771
  
> We can at least try to abort the tasks and still honors the interrupt on 
cancel flag. It seems like best case is things actually get killed and we free 
up resources, worst case seems to be that the task ignores the interrupt and 
continues just like now.

Right: Unless we can get to the point where we can confidently switch the 
interrupt on cancel default to true, efforts to more aggressively clean up 
executing but no longer needed Tasks won't accomplish anything unless that flag 
has been explicitly changed by the user. That is the most significant reason 
why the coding of such clean up efforts has not been attempted in the past.

> Do you have specific concerns where this would actually cause problems

Not really, and that was kind of the point of SPARK-17064: For a long time, 
we've had significant questions about what can happen when trying to interrupt 
Tasks, and while some of them were at least once connected to HDFS client or 
other libraries not handling interrupt requests correctly, we've never (to my 
knowledge) had concrete knowledge of specific effects resulting from one or 
more code paths. Mostly the concerns fall into two categories: 1) Not enough 
happening when we try to interrupt a Task; 2) Too much happening when we try to 
interrupt a Task. The first is what we already touched on: That requesting 
interruption of a Task may not stop the Task. That means that we can't write 
code that depends on resources being freed, etc. just because we tried to 
interrupt a Task. The follow-on question from that is whether we should try to 
detect this and do something more aggressive (such as killing the JVM) in order 
to guarantee resource availability. The second category of concern is essentially 
that interrupting a Task may cause side-effects that produce corrupt data or state, 
or that cause a Task to report a failure back to the DAGScheduler. Enough such Task 
failures can not only lead to Stage and Job failure, but also to blacklisting of an 
Executor. The reporting of failures from 
an interrupted Task can probably be worked around with careful accounting in 
the DAGScheduler (i.e. to ignore any such reports from a Task that we already 
tried to stop), but the other unintended side-effects are more nebulous and 
insidious, since they would as likely as not be coming from library 
dependencies, rather than core Spark code. The end result of that is that we 
probably can't do away with the interrupt on cancel flag entirely even if we 
can get to the point where we can change the default to true -- some user Tasks 
may need to depend on a broken library. And that gets us back to the issue of 
not being able to depend on resources being freed, etc. just because we tried to interrupt a Task.

It's great to see more people looking at the scheduler code, since many of 
us who once spent more time on it can't do so as easily now; but it does 
concern me that sometimes old TODO items are picked up and reviewed rather 
naively. If it seems like there is a simple way to resolve a TODO, I'd 
encourage contributors to at least try to wake up those of us who wrote some of 
the older code. If the change really could be done easily and safely, we likely 
already would have done it. 
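
To make the first category of concern concrete, here is a minimal sketch of what a 
"cooperative" task body looks like -- hypothetical user code, not part of this PR -- 
that checks the interruption flag itself rather than relying on libraries to honor 
thread interrupts:

```scala
import org.apache.spark.TaskContext

// Hypothetical user code: poll TaskContext.isInterrupted() so a kill request can
// take effect even when the work below never touches the thread's interrupt status.
// Without such checks (or interruptible I/O), a "killed" Task can keep running on
// its Executor, which is exactly the situation described above.
rdd.mapPartitions { iter =>
  val ctx = TaskContext.get()
  iter.map { record =>
    if (ctx.isInterrupted()) {
      throw new InterruptedException("task was asked to stop; abandoning this partition")
    }
    expensiveTransform(record)  // hypothetical, possibly long-running user function
  }
}
```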


---




[GitHub] spark issue #22771: [SPARK-25773][Core]Cancel zombie tasks in a result stage...

2018-10-22 Thread markhamstra
Github user markhamstra commented on the issue:

https://github.com/apache/spark/pull/22771
  
There are long-standing questions here that I don't think have yet been 
adequately answered -- cf. https://issues.apache.org/jira/browse/SPARK-17064


---




[GitHub] spark issue #22463: remove annotation @Experimental

2018-09-19 Thread markhamstra
Github user markhamstra commented on the issue:

https://github.com/apache/spark/pull/22463
  
Where is the discussion on these utility methods no longer being 
Experimental? I'm not saying that they are not stable, but the Kafka 0.10 API 
in general being considered stable doesn't preclude some Kafka-related 
methods from still being less than stable, and promoting an API to stable (i.e. 
it cannot be changed without a major release) is a pretty big deal.


---




[GitHub] spark pull request #22112: [SPARK-23243][Core] Fix RDD.repartition() data co...

2018-08-27 Thread markhamstra
Github user markhamstra commented on a diff in the pull request:

https://github.com/apache/spark/pull/22112#discussion_r213061324
  
--- Diff: core/src/main/scala/org/apache/spark/rdd/RDD.scala ---
@@ -1918,3 +1980,19 @@ object RDD {
 new DoubleRDDFunctions(rdd.map(x => num.toDouble(x)))
   }
 }
+
+/**
+ * The random level of RDD's output (i.e. what `RDD#compute` returns), which indicates how the
+ * output will diff when Spark reruns the tasks for the RDD. There are 3 random levels, ordered
+ * by the randomness from low to high:
--- End diff --

Again, please remove "random" and "randomness". The issue is not 
randomness, but rather determinism. For example, the output of `RDD#compute` 
could be completely non-random but still dependent on state not contained in 
the RDD. That would still make it problematic in terms of recomputing only some 
partitions and aggregating the results.


---




[GitHub] spark pull request #22112: [SPARK-23243][Core] Fix RDD.repartition() data co...

2018-08-23 Thread markhamstra
Github user markhamstra commented on a diff in the pull request:

https://github.com/apache/spark/pull/22112#discussion_r212395101
  
--- Diff: core/src/main/scala/org/apache/spark/rdd/RDD.scala ---
@@ -1876,6 +1920,22 @@ abstract class RDD[T: ClassTag](
  */
 object RDD {
 
+  /**
+   * The random level of RDD's computing function, which indicates the behavior when rerun the
+   * computing function. There are 3 random levels, ordered by the randomness from low to high:
+   * 1. IDEMPOTENT: The computing function always return the same result with same order when rerun.
+   * 2. UNORDERED: The computing function returns same data set in potentially a different order
+   *   when rerun.
+   * 3. INDETERMINATE. The computing function may return totally different result when rerun.
+   *
+   * Note that, the output of the computing function usually relies on parent RDDs. When a
+   * parent RDD's computing function is random, it's very likely this computing function is also
+   * random.
+   */
+  object RandomLevel extends Enumeration {
--- End diff --

I'm not completely wedded to the IDEMPOTENT, UNORDERED, INDETERMINATE 
naming, so if somebody has something better or less likely to lead to 
confusion, I'm fine with that.

I'd like to not use "random" in these names, though, since that implies 
actual randomness at some level, entropy guarantees, etc. What is key is not 
whether output values or ordering are truly random, but simply that we can't 
easily determine what they are or that they are fixed and repeatable. That's 
why I'd prefer that things like `RDD.RandomLevel.INDETERMINATE` be, I would 
suggest, `RDD.Determinism.INDETERMINATE`, and `computingRandomLevel` should be 
`computeDeterminism` (unless we want the slightly cheeky `determineDeterminism` 
:) ).
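
A sketch of what the suggested renaming would look like (names follow the 
suggestion in this comment, not necessarily what gets merged):

```scala
// Same three-valued classification as in the diff above, with "random" dropped
// from the names.
object Determinism extends Enumeration {
  val IDEMPOTENT, UNORDERED, INDETERMINATE = Value
}
// and `computingRandomLevel` would become something like `computeDeterminism`,
// returning a Determinism.Value.
```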


---




[GitHub] spark issue #22176: [SPARK-25181][CORE] Limit Thread Pool size in BlockManag...

2018-08-22 Thread markhamstra
Github user markhamstra commented on the issue:

https://github.com/apache/spark/pull/22176
  
Yes, this is better than what we had, but maybe it can be better still.


---




[GitHub] spark issue #22176: [SPARK-25181][CORE] Limit Thread Pool size in BlockManag...

2018-08-22 Thread markhamstra
Github user markhamstra commented on the issue:

https://github.com/apache/spark/pull/22176
  
@zsxwing we really should have considered whether this should be a 
configuration variable instead of a fixed number of threads in any environment.


---




[GitHub] spark issue #22112: [SPARK-23243][Core] Fix RDD.repartition() data correctne...

2018-08-22 Thread markhamstra
Github user markhamstra commented on the issue:

https://github.com/apache/spark/pull/22112
  
I'm not a fan of the IDEMPOTENT, RANDOM_ORDER, COMPLETE_RANDOM naming. 
IDEMPOTENT is fine, but I'd prefer UNORDERED and INDETERMINATE to cover the 
cases of "same values in potentially a different order" and "potentially 
different values and different order" respectively. RANDOM implies something 
too specific.


---




[GitHub] spark issue #21698: [SPARK-23243][Core] Fix RDD.repartition() data correctne...

2018-08-13 Thread markhamstra
Github user markhamstra commented on the issue:

https://github.com/apache/spark/pull/21698
  
> I really disagree with this.

I really agree with Tom. At this point, I think the working assumption 
should be that any 2.4.0 release candidate that doesn't deliver some fix for 
this issue will receive multiple -1 votes from PMC members.


---




[GitHub] spark issue #22039: [SPARK-25036][SQL][FOLLOW-UP] Avoid match may not be exh...

2018-08-08 Thread markhamstra
Github user markhamstra commented on the issue:

https://github.com/apache/spark/pull/22039
  
> Yes, "quick hack", but, as opposed to what in these specific cases?

Yes, that is the key question. I'll admit, I haven't looked all that deeply 
to try to figure out whether something better is possible, so this is just my 
knee-jerk reaction: If you don't have to, then don't use a catch-all just to 
silence the compiler warning. If you're satisfied that there isn't a better 
option, then I'll accept that second-best is the best we can do here. 


---




[GitHub] spark issue #22039: [SPARK-25036][SQL][FOLLOW-UP] Avoid match may not be exh...

2018-08-08 Thread markhamstra
Github user markhamstra commented on the issue:

https://github.com/apache/spark/pull/22039
  
Hmmm... sorry to be late to this, but making pattern matches exhaustive by 
adding a catch-all case that then throws an exception, while easy, should be 
considered a less-than-optimal fix. Ideally, the type handling should be 
tightened up so that the match can be exhaustive without a catch-all. The 
reason for this is that if in the future a type is added such that the pattern 
match should be extended to handle that type, then the presence of a catch-all 
will give the false impression that the new type is being handled, no compiler 
warning will be thrown, etc. If the pattern match is made exhaustive without a 
catch-all and the compiler option to convert warnings to errors is used, then 
it becomes pretty much impossible that future type additions/additional 
necessary cases will be silently mishandled.

Now I realize that it is not always feasible to achieve that level of type 
safety in Scala code, but has adequate consideration been given to making the 
effort here, or was this just a quick hack to make the compiler shut up?
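
For illustration, a self-contained sketch (the types are invented for the example) 
of the difference between an exhaustive match over a sealed hierarchy and a catch-all:

```scala
// Invented types, purely to illustrate the exhaustiveness point.
sealed trait ColumnKind
case object IntKind extends ColumnKind
case object StringKind extends ColumnKind

def zeroValue(kind: ColumnKind): Any = kind match {
  case IntKind    => 0
  case StringKind => ""
  // No catch-all: if someone later adds `case object DateKind extends ColumnKind`,
  // the compiler emits a "match may not be exhaustive" warning here (an error under
  // -Xfatal-warnings) instead of the new type silently hitting a default case at runtime.
}
```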


---




[GitHub] spark issue #21589: [SPARK-24591][CORE] Number of cores and executors in the...

2018-07-18 Thread markhamstra
Github user markhamstra commented on the issue:

https://github.com/apache/spark/pull/21589
  
Thank you, @HyukjinKwon 

There are a significant number of Spark users who use the Job Scheduler 
model with a SparkContext shared across many users and many Jobs. Promoting 
tools and patterns based upon the number of core or executors that a 
SparkContext has access to, encouraging users to create Jobs that try to use 
all of the available cores, very much leads those users in the wrong direction.

As much as possible, the public API should target policy that addresses 
real user problems (all users, not just a subset), and avoid targeting the 
particulars of Spark's internal implementation. A `repartition` that is 
extended to support policy or goal declarations (things along the lines of 
`repartition(availableCores)`, `repartition(availableDataNodes)`, 
`repartition(availableExecutors)`, `repartition(unreservedCores)`, etc.), 
relying upon Spark's internals (with its complete knowledge of the total number 
of cores and executors, scheduling pool shares, number of reserved Task nodes 
sought in barrier scheduling, number of active Jobs, Stages, Tasks and 
Sessions, etc.) may be something that I can get behind. Exposing a couple of 
current Spark scheduler implementation details in the expectation that some 
subset of users in some subset of use cases will be able to make correct use of 
them is not. 
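
Purely as a sketch of the shape such a declarative API might take -- none of these 
names exist in Spark; they just mirror the hypothetical `repartition(availableCores)`-style 
calls above:

```scala
// Hypothetical, goal-based repartitioning: the scheduler, which knows about pools,
// barrier requirements, and other active Jobs, picks the partition count rather than
// the user snapshotting a core or executor count.
sealed trait RepartitionGoal
case object MaximizeAvailableCores extends RepartitionGoal
case object MatchDataNodes extends RepartitionGoal
case object RespectPoolShare extends RepartitionGoal

// e.g. df.repartition(MaximizeAvailableCores)
// instead of df.repartition(someSnapshotOfNumCores * k)
```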


---




[GitHub] spark issue #21589: [SPARK-24591][CORE] Number of cores and executors in the...

2018-07-18 Thread markhamstra
Github user markhamstra commented on the issue:

https://github.com/apache/spark/pull/21589
  
I don't accept your assertions of what constitutes the majority and minority 
of Spark users or use cases or their relative importance. As a long-time 
maintainer of the Spark scheduler, it is also not my concern to define which 
Spark users are important or not, but rather to foster system internals and a 
public API that benefit all users.

I already have pointed out with some specificity how exposing the 
scheduler's low-level accounting of the number of cores or executors that are 
available at some point can encourage anti-patterns and sub-optimal Job 
execution. Repartitioning based upon a snapshot of the number of cores 
available cluster-wide is clearly not the correct thing to do in many instances 
and use cases. Beyond concern for users, as a developer of Spark internals, I 
don't appreciate being pinned to particular implementation details by having 
them directly exposed to users.

And I'll repeat, this JIRA and PR look to be defining the problem to fit a 
preconception of the solution. Even for the particular users and use cases 
targeted by this PR, I wouldn't expect that those users would embrace "I can't 
repartition based upon the scheduler's notion of the number of cores in the 
cluster at some point" as a more accurate statement of their problem than "My 
Spark Jobs don't use all of the CPU resources that I am entitled to use." Even 
if we were to stipulate that a `repartition` call is inherently the only or 
best place to try to address that real user problem (and I am far from convinced 
that this is the only or best approach), then I'd be far happier with extending 
the `repartition` API to include declarative goals than exposing to users only 
part of what is needed from Spark's internal to figure out what is the best 
repartitioning -- perhaps something along the lines of 
`repartition(MaximizeCPUs)` or other appropriate policy/goal enumerations.

And spark packages are not irrelevant here. In fact, a large part of their 
motivation was to handle extensions that are not appropriate for all users or 
to prove out ideas and APIs that are not yet clearly appropriate for inclusion 
in Spark itself. 


---




[GitHub] spark issue #21589: [SPARK-24591][CORE] Number of cores and executors in the...

2018-07-18 Thread markhamstra
Github user markhamstra commented on the issue:

https://github.com/apache/spark/pull/21589
  
It is precisely because the audience that I am concerned with is not 
limited to just data scientists or notebook users and their particular needs 
that I am far from convinced that exposing internals of the Spark scheduler in 
the public API is a good idea.

There are many ways that a higher-level declaration could be made. I'm not 
committed to any particular model at this point. The way that it is done for 
scheduling pools via `sc.setLocalProperty` is one way that Job execution can be 
put into a particular declarative context. That's not necessarily the best way 
to do it, but it isn't necessarily more difficult than figuring out correct 
imperative code after fetching a snapshot of the number of available cores at 
some point.
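
For concreteness, this is the existing declarative mechanism referred to above (the 
pool name is illustrative; the property key is the one actually used for scheduler 
pools):

```scala
// Jobs submitted from this thread run in the named scheduler pool.
sc.setLocalProperty("spark.scheduler.pool", "reporting")
try {
  sc.parallelize(1 to 1000000).map(_ % 7).countByValue()  // runs in the "reporting" pool
} finally {
  // Clear the property so later jobs on this thread fall back to the default pool.
  sc.setLocalProperty("spark.scheduler.pool", null)
}
```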

Doing this the right way likely requires an appropriate SPIP, not just a 
quick hack PR.

A spark-package would be another way to expose additional functionality 
without it needing to be bound into the Spark public API.  


---




[GitHub] spark issue #21589: [SPARK-24591][CORE] Number of cores and executors in the...

2018-07-18 Thread markhamstra
Github user markhamstra commented on the issue:

https://github.com/apache/spark/pull/21589
  
@ssimeonov the purpose of a public API is not to offer hack solutions to a 
subset of problems. What is needed is a high-level, declarative abstraction 
that can be used to specify requested Job resource-usage policy. Exposing 
low-level Spark scheduling internals is not the way to achieve that.


---




[GitHub] spark issue #21589: [SPARK-24591][CORE] Number of cores and executors in the...

2018-07-18 Thread markhamstra
Github user markhamstra commented on the issue:

https://github.com/apache/spark/pull/21589
  
No, defaultParallelism isn't more useful in that case, but that just starts 
getting to my overall assessment of this JIRA and PR: It smells of defining the 
problem to align with a preconception of the solution.

Exposing the driver's current accounting of the number of cores active in 
the cluster is not something that we couldn't do or didn't know how to do a long 
time ago. Rather, it is something that those of us working on the scheduler 
chose not to do because of the expectation that putting this in the public API 
(and thereby implicitly encouraging its use) was likely to produce as many 
problems as it solves. This was primarily because of two factors: 1) The number 
of cores and executors is not static; 2) Closely tailoring a Job to some 
expectation of the number of available cores or executors is not obviously a 
correct thing to encourage in general. 

Whether from node failures, dynamic executor allocation, backend scheduler 
elasticity/preemption, or just other Jobs running under the same SparkContext, 
the number of cores and executors available to any particular Job when it is 
created can easily be different from what is available when any of its Stages 
actually runs.

Even if you could get reliable numbers for the cores and executors that 
will be available through the lifecycle of a Job, tailoring a Job to use all of 
those cores and executors is only the right thing to do in a subset of Spark 
use cases. For example, using many more executors than there are DFS partitions 
holding the data, or trying to use all of the cores when there are other Jobs 
pending, or trying to use all of the cores when another Job needs to acquire a 
minimum number for barrier scheduled execution, or trying to use more cores 
than a scheduling pool permits would all be examples of anti-patterns that 
would be more enabled by easy, context-free access to low-level numCores.

There definitely are use cases where users need to be able to set policy 
for whether particular jobs should be encouraged to use more or less of the 
cluster's resources, but I believe that that needs to be done at a much higher 
level of abstraction in a declarative form, and that policy likely needs to be 
enforced dynamically/adaptively at Stage boundaries. The under-developed and 
under-used dynamic shuffle partitioning code in Spark SQL starts to go in that 
direction. 


---




[GitHub] spark issue #21589: [SPARK-24591][CORE] Number of cores and executors in the...

2018-07-18 Thread markhamstra
Github user markhamstra commented on the issue:

https://github.com/apache/spark/pull/21589
  
@mridulm scheduler pools could also make the cluster-wide resource numbers 
not very meaningful. I don't think the maxShare work has been merged yet (kind 
of a stalled TODO on an open PR, IIRC), but once that is in, it's not terribly 
useful to know, e.g., that there are 5 million cores in the cluster if your Job 
is running in a scheduler pool that is restricted to using far fewer CPUs via 
the pool's maxShares.   


---




[GitHub] spark pull request #21754: [SPARK-24705][SQL] Cannot reuse an exchange opera...

2018-07-18 Thread markhamstra
Github user markhamstra commented on a diff in the pull request:

https://github.com/apache/spark/pull/21754#discussion_r203416454
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/Exchange.scala 
---
@@ -85,14 +85,20 @@ case class ReusedExchangeExec(override val output: 
Seq[Attribute], child: Exchan
  */
 case class ReuseExchange(conf: SQLConf) extends Rule[SparkPlan] {
 
+  private def supportReuseExchange(exchange: Exchange): Boolean = exchange 
match {
+// If a coordinator defined in an exchange operator, the exchange 
cannot be reused
--- End diff --

This seems overstated if this comment in the JIRA description is correct: 
"When the cache tabel device_loc is executed before this query is executed, 
everything is fine". In fact, if Xiao Li is correct in that statement, then 
this PR is eliminating a useful optimization in cases where it doesn't need to 
-- i.e. it is preventing Exchange reuse any time adaptive execution is used 
instead of only preventing reuse when it will actually cause a problem.


---




[GitHub] spark issue #21598: [SPARK-24605][SQL] size(null) returns null instead of -1

2018-06-25 Thread markhamstra
Github user markhamstra commented on the issue:

https://github.com/apache/spark/pull/21598
  
> case by case

Yes, but... this by itself makes the decision appear far too discretionary. 
Instead, in any PR where you are changing the published interface or behavior 
of part of Spark's public API, you should be highlighting the change for 
additional review and providing a really strong argument for why we cannot 
retain the prior interface and/or default behavior. It is simply not up to an 
individual committer to decide at their own discretion that the public API 
should be different than what it, in fact, is. Changing the public API is a big 
deal -- which is why most additions to the public API should, in my opinion, 
come in with an InterfaceStability annotation that will allow us to change them 
before a new major-number release.

This doesn't apply to changes to internal APIs. Neither does it apply to 
bug fixes where Spark isn't actually doing what the public API says it is 
supposed to do -- although in cases where we expect that users have come to 
safely rely upon certain buggy behavior, we may choose to retain that buggy 
behavior under a new configuration setting.


---




[GitHub] spark issue #21598: [SPARK-24605][SQL] size(null) returns null instead of -1

2018-06-23 Thread markhamstra
Github user markhamstra commented on the issue:

https://github.com/apache/spark/pull/21598
  
@HyukjinKwon this is not new policy. It is what Apache Spark has guaranteed 
in its version numbering and public API since 1.0.0. It is not a matter of 
"from now on", but rather of whether committers have started allowing our 
standards to slip. It may well be time for a discussion of that and of better 
tools to help guarantee that additions and changes to the public API are 
adequately discussed and reviewed, appropriate InterfaceStability annotations 
are applied, etc.


---




[GitHub] spark issue #21598: [SPARK-24605][SQL] size(null) returns null instead of -1

2018-06-21 Thread markhamstra
Github user markhamstra commented on the issue:

https://github.com/apache/spark/pull/21598
  
> so we can't just change the default value in a feature release

Agreed. Once a particular interface and behavior is in our released public 
API, then we effectively have a contract not to change that behavior. If we are 
going to provide another behavior before making a new major-number release 
(e.g. spark-3.0.0), then we have to provide a user configuration option to 
select that new behavior, and the default behavior if a user doesn't change 
configuration must be the same as before the optional new behavior.

If there is a clear, severe bug (such as data loss or corruption), only 
then can we consider changing the public API before making a new major-number 
release -- but even then we are likely to either go immediately to a new 
major-number or to at least preserve the old, buggy behavior with a 
configuration option. 
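
As a sketch of that contract in code (the flag name and function are hypothetical, 
just to show the shape of a config-gated behavior change, assuming an active 
SparkSession `spark`):

```scala
// Hypothetical flag: the released default stays false, so behavior is unchanged
// unless a user explicitly opts in to the new semantics.
val returnNullForNullInput =
  spark.conf.get("spark.sql.hypothetical.sizeOfNullReturnsNull", "false").toBoolean

def sizeOf(arr: Seq[_]): Any =
  if (arr == null) { if (returnNullForNullInput) null else -1 }  // released default: -1
  else arr.size
```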


---




[GitHub] spark issue #21527: [SPARK-24519] MapStatus has 2000 hardcoded

2018-06-18 Thread markhamstra
Github user markhamstra commented on the issue:

https://github.com/apache/spark/pull/21527
  
Sure, as long as we are not telling users that this is something that they 
can or should use, that's fine.


---




[GitHub] spark issue #21527: [SPARK-24519] MapStatus has 2000 hardcoded

2018-06-12 Thread markhamstra
Github user markhamstra commented on the issue:

https://github.com/apache/spark/pull/21527
  
> we can definitely update the description with more details.

Eventually, some of the motivation and advice/suggestions need to get into 
the main user docs, as well.


---




[GitHub] spark issue #21527: [SPARK-24519] MapStatus has 2000 hardcoded

2018-06-11 Thread markhamstra
Github user markhamstra commented on the issue:

https://github.com/apache/spark/pull/21527
  
@tgravescs If there is value in making it configurable, that is all fine 
and good. My argument is against making it configurable just for the sake of 
making it configurable. If there is more going on than that, then at a minimum 
we need documentation explaining why and when this number should be changed, 
and suggestions of appropriate values.


---




[GitHub] spark issue #21527: [SPARK-24519] MapStatus has 2000 hardcoded

2018-06-11 Thread markhamstra
Github user markhamstra commented on the issue:

https://github.com/apache/spark/pull/21527
  
> We should make it configurable.

That's a debatable assertion all by itself -- and quite unfortunately, 
there is no more justification for this claim in the JIRA ticket. Without proof 
that there is a better configuration under certain circumstances, and without 
guidance as to why and how this value should be reconfigured, I'd argue that 
adding another knob for users to twiddle just adds confusion, potential for 
misconfiguration and more configuration space that is unlikely to be adequately 
tested.  


---




[GitHub] spark pull request #21440: [SPARK-24307][CORE] Support reading remote cached...

2018-05-26 Thread markhamstra
Github user markhamstra commented on a diff in the pull request:

https://github.com/apache/spark/pull/21440#discussion_r191063339
  
--- Diff: core/src/main/scala/org/apache/spark/storage/BlockManager.scala 
---
@@ -659,6 +659,11 @@ private[spark] class BlockManager(
* Get block from remote block managers as serialized bytes.
*/
   def getRemoteBytes(blockId: BlockId): Option[ChunkedByteBuffer] = {
+// TODO if we change this method to return the ManagedBuffer, then getRemoteValues
+// could just use the inputStream on the temp file, rather than memory-mapping the file.
+// Until then, replication can go cause the process to use too much memory and get killed
--- End diff --

grammar


---




[GitHub] spark pull request #20930: [SPARK-23811][Core] FetchFailed comes before Succ...

2018-04-26 Thread markhamstra
Github user markhamstra commented on a diff in the pull request:

https://github.com/apache/spark/pull/20930#discussion_r184462542
  
--- Diff: 
core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala ---
@@ -2399,6 +2399,84 @@ class DAGSchedulerSuite extends SparkFunSuite with 
LocalSparkContext with TimeLi
 }
   }
 
+  /**
+   * This tests the case where origin task success after speculative task 
got FetchFailed
+   * before.
+   */
+  test("SPARK-23811: ShuffleMapStage failed by FetchFailed should ignore 
following " +
+"successful tasks") {
+// Create 3 RDDs with shuffle dependencies on each other: rddA <--- 
rddB <--- rddC
+val rddA = new MyRDD(sc, 2, Nil)
+val shuffleDepA = new ShuffleDependency(rddA, new HashPartitioner(2))
+val shuffleIdA = shuffleDepA.shuffleId
+
+val rddB = new MyRDD(sc, 2, List(shuffleDepA), tracker = 
mapOutputTracker)
+val shuffleDepB = new ShuffleDependency(rddB, new HashPartitioner(2))
+
+val rddC = new MyRDD(sc, 2, List(shuffleDepB), tracker = 
mapOutputTracker)
+
+submit(rddC, Array(0, 1))
+
+// Complete both tasks in rddA.
+assert(taskSets(0).stageId === 0 && taskSets(0).stageAttemptId === 0)
+complete(taskSets(0), Seq(
+  (Success, makeMapStatus("hostA", 2)),
+  (Success, makeMapStatus("hostB", 2
+
+// The first task success
+runEvent(makeCompletionEvent(
+  taskSets(1).tasks(0), Success, makeMapStatus("hostB", 2)))
+
+// The second task's speculative attempt fails first, but task self 
still running.
+// This may caused by ExecutorLost.
+runEvent(makeCompletionEvent(
+  taskSets(1).tasks(1),
+  FetchFailed(makeBlockManagerId("hostA"), shuffleIdA, 0, 0, 
"ignored"),
+  null))
+// Check currently missing partition.
+
assert(mapOutputTracker.findMissingPartitions(shuffleDepB.shuffleId).get.size 
=== 1)
+// The second result task self success soon.
+runEvent(makeCompletionEvent(
+  taskSets(1).tasks(1), Success, makeMapStatus("hostB", 2)))
+// Missing partition number should not change, otherwise it will cause 
child stage
+// never succeed.
+
assert(mapOutputTracker.findMissingPartitions(shuffleDepB.shuffleId).get.size 
=== 1)
+  }
+
+  test("SPARK-23811: check ResultStage failed by FetchFailed can ignore 
following " +
+"successful tasks") {
+val rddA = new MyRDD(sc, 2, Nil)
+val shuffleDepA = new ShuffleDependency(rddA, new HashPartitioner(2))
+val shuffleIdA = shuffleDepA.shuffleId
+val rddB = new MyRDD(sc, 2, List(shuffleDepA), tracker = 
mapOutputTracker)
+submit(rddB, Array(0, 1))
+
+// Complete both tasks in rddA.
+assert(taskSets(0).stageId === 0 && taskSets(0).stageAttemptId === 0)
+complete(taskSets(0), Seq(
+  (Success, makeMapStatus("hostA", 2)),
+  (Success, makeMapStatus("hostB", 2
+
+// The first task of rddB success
+assert(taskSets(1).tasks(0).isInstanceOf[ResultTask[_, _]])
+runEvent(makeCompletionEvent(
+  taskSets(1).tasks(0), Success, makeMapStatus("hostB", 2)))
+
+// The second task's speculative attempt fails first, but task self 
still running.
+// This may caused by ExecutorLost.
+runEvent(makeCompletionEvent(
+  taskSets(1).tasks(1),
+  FetchFailed(makeBlockManagerId("hostA"), shuffleIdA, 0, 0, 
"ignored"),
+  null))
+// Make sure failedStage is not empty now
+assert(scheduler.failedStages.nonEmpty)
+// The second result task self success soon.
+assert(taskSets(1).tasks(1).isInstanceOf[ResultTask[_, _]])
+runEvent(makeCompletionEvent(
+  taskSets(1).tasks(1), Success, makeMapStatus("hostB", 2)))
+assertDataStructuresEmpty()
--- End diff --

Right. It is a check that we are cleaning up the contents of the 
DAGScheduler's data structures so that they do not grow without bound over time.


---




[GitHub] spark issue #21096: [SPARK-24011][CORE] cache rdd's immediate parent Shuffle...

2018-04-18 Thread markhamstra
Github user markhamstra commented on the issue:

https://github.com/apache/spark/pull/21096
  
As is, this PR isn't acceptable on multiple levels. Even if I were 
convinced (which I am not presently) that the sequence of 
`getShuffleDependencies` calls covered in this PR is the only one possible and 
therefore can be cached/memoized as this PR does, a complete lack of test 
coverage for the new code is not something we want in something as critical as 
the DAGScheduler. And I can tell you right now that there is at least one 
missing test that this PR will fail. You are adding a new data structure to the 
DAGScheduler, so at a bare minimum you also need to add that HashMap to 
`assertDataStructuresEmpty` in the DAGSchedulerSuite. Never deleting unneeded 
elements from a DAGScheduler data structure is an unacceptable resource leak.   
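
To make that last point concrete, a sketch of the kind of check being asked for -- 
`shuffleDepsCache` is a stand-in name for whatever HashMap the PR adds, not the 
actual field:

```scala
// Inside DAGSchedulerSuite.assertDataStructuresEmpty (sketch):
assert(scheduler.activeJobs.isEmpty)
assert(scheduler.shuffleDepsCache.isEmpty,   // hypothetical new structure added by the PR
  "entries must be removed when their job/stage completes, or the map leaks over time")
// ... plus the existing assertions on the DAGScheduler's other maps ...
```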


---




[GitHub] spark issue #21071: [SPARK-21962][CORE] Distributed Tracing in Spark

2018-04-16 Thread markhamstra
Github user markhamstra commented on the issue:

https://github.com/apache/spark/pull/21071
  
@rxin +1 for each of your sentences.


---




[GitHub] spark pull request #20881: Add a note about jobs running in FIFO order in th...

2018-04-03 Thread markhamstra
Github user markhamstra commented on a diff in the pull request:

https://github.com/apache/spark/pull/20881#discussion_r178928945
  
--- Diff: docs/job-scheduling.md ---
@@ -215,6 +215,9 @@ pool), but inside each pool, jobs run in FIFO order. 
For example, if you create
 means that each user will get an equal share of the cluster, and that each 
user's queries will run in
 order instead of later queries taking resources from that user's earlier 
ones.
 
+If jobs are not explicitly set to use a given pool, they end up in the 
default pool. This means that even if
+`spark.scheduler.mode` is set to `FAIR` those jobs will be run in `FIFO` 
order (within the default pool).
+
--- End diff --

You seem to be missing a few somethings: 1) You can define your own default 
pool that does FAIR scheduling within that pool, so blanket statements about 
"the" default pool are dangerous; 2) `spark.scheduler.mode` controls the setup 
of the rootPool, not the scheduling within any pool; 3) If you don't define 
your own pool with a name corresponding to the DEFAULT_POOL_NAME (i.e. 
"default"), then you are going to get a default construction of "default", 
which does use FIFO scheduling within that pool.

So, item 2) effectively means that `spark.scheduler.mode` controls whether 
fair scheduling is possible at all, and it also defines the kind of scheduling 
that is used among the schedulable entities contained in the root pool -- i.e. 
among the scheduling pools nested within rootPool. One of those nested pools 
will be DEFAULT_POOL_NAME/"default", which will use FIFO scheduling for 
schedulable entities within that pool if you haven't defined it to use fair 
scheduling.

If you just want one scheduling pool that does fair scheduling among its 
schedulable entities, then you need to set `spark.scheduler.mode` to "FAIR" in 
your SparkConf and _also_ define in the pool configuration file a "default" 
pool to use schedulingMode FAIR. You could alternatively define such a 
pool (one that uses fair scheduling internally) with a name other than "default" and then make 
sure that all of your jobs get assigned to that pool.
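
For reference, a minimal sketch of that last configuration (the file path is 
illustrative; the property keys and the fairscheduler.xml pool schema are the 
existing ones):

```scala
import org.apache.spark.SparkConf

// Enable FAIR scheduling at the root pool and point Spark at an allocations file
// that redefines the "default" pool to use FAIR scheduling internally as well.
val conf = new SparkConf()
  .set("spark.scheduler.mode", "FAIR")
  .set("spark.scheduler.allocation.file", "/path/to/fairscheduler.xml")

// fairscheduler.xml would contain something like:
//   <allocations>
//     <pool name="default">
//       <schedulingMode>FAIR</schedulingMode>
//       <weight>1</weight>
//       <minShare>0</minShare>
//     </pool>
//   </allocations>
```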


---




[GitHub] spark pull request #20881: Add a note about jobs running in FIFO order in th...

2018-04-03 Thread markhamstra
Github user markhamstra commented on a diff in the pull request:

https://github.com/apache/spark/pull/20881#discussion_r178886364
  
--- Diff: docs/job-scheduling.md ---
@@ -215,6 +215,9 @@ pool), but inside each pool, jobs run in FIFO order. 
For example, if you create
 means that each user will get an equal share of the cluster, and that each 
user's queries will run in
 order instead of later queries taking resources from that user's earlier 
ones.
 
+If jobs are not explicitly set to use a given pool, they end up in the 
default pool. This means that even if
+`spark.scheduler.mode` is set to `FAIR` those jobs will be run in `FIFO` 
order (within the default pool).
+
--- End diff --

This is not actually correct. There is no reason why you can't define a 
default pool that uses FAIR scheduling.


---




[GitHub] spark issue #20770: [SPARK-23626][CORE] DAGScheduler blocked due to JobSubmi...

2018-03-08 Thread markhamstra
Github user markhamstra commented on the issue:

https://github.com/apache/spark/pull/20770
  
@squito is the master of DAGSchedulerSuite, and can provide you the best 
advice on changing or adding to the existing DAGSchedulerSuite. I'll be back 
from skiing next week and try to find some time to look at this. Hopefully 
@kayousterhout can find some time too. 


---




[GitHub] spark pull request #20016: SPARK-22830 Scala Coding style has been improved ...

2017-12-19 Thread markhamstra
Github user markhamstra commented on a diff in the pull request:

https://github.com/apache/spark/pull/20016#discussion_r157790330
  
--- Diff: 
examples/src/main/scala/org/apache/spark/examples/BroadcastTest.scala ---
@@ -42,7 +42,7 @@ object BroadcastTest {
 val arr1 = (0 until num).toArray
 
 for (i <- 0 until 3) {
-  println("Iteration " + i)
+  println(s"Iteration ${i}")
--- End diff --

Beyond the unnecessary { } that @srowen has already mentioned, this isn't 
really a style improvement. `"a string " + anotherString` is arguably at least 
as good stylistically as using string interpolation for such simple 
concatenations of a string reference to the end of a string literal. It's only 
when there are multiple concatenations and/or multiple string references that 
interpolation is clearly the better way.


---




[GitHub] spark pull request #19468: [SPARK-18278] [Scheduler] Spark on Kubernetes - B...

2017-11-15 Thread markhamstra
Github user markhamstra commented on a diff in the pull request:

https://github.com/apache/spark/pull/19468#discussion_r151259662
  
--- Diff: 
resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/KubernetesClusterSchedulerBackend.scala
 ---
@@ -0,0 +1,427 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.scheduler.cluster.k8s
+
+import java.io.Closeable
+import java.net.InetAddress
+import java.util.concurrent.{ConcurrentHashMap, ExecutorService, 
ScheduledExecutorService, TimeUnit}
+import java.util.concurrent.atomic.{AtomicInteger, AtomicLong, 
AtomicReference}
+import javax.annotation.concurrent.GuardedBy
+
+import io.fabric8.kubernetes.api.model._
+import io.fabric8.kubernetes.client.{KubernetesClient, 
KubernetesClientException, Watcher}
+import io.fabric8.kubernetes.client.Watcher.Action
+import scala.collection.JavaConverters._
+import scala.collection.mutable
+import scala.concurrent.{ExecutionContext, Future}
+
+import org.apache.spark.SparkException
+import org.apache.spark.deploy.k8s.config._
+import org.apache.spark.deploy.k8s.constants._
+import org.apache.spark.rpc.{RpcAddress, RpcEndpointAddress, RpcEnv}
+import org.apache.spark.scheduler.{ExecutorExited, SlaveLost, 
TaskSchedulerImpl}
+import org.apache.spark.scheduler.cluster.{CoarseGrainedSchedulerBackend, 
SchedulerBackendUtils}
+import org.apache.spark.util.Utils
+
+private[spark] class KubernetesClusterSchedulerBackend(
+scheduler: TaskSchedulerImpl,
+rpcEnv: RpcEnv,
+executorPodFactory: ExecutorPodFactory,
+kubernetesClient: KubernetesClient,
+allocatorExecutor: ScheduledExecutorService,
+requestExecutorsService: ExecutorService)
+  extends CoarseGrainedSchedulerBackend(scheduler, rpcEnv) {
+
+  import KubernetesClusterSchedulerBackend._
+
+  private val EXECUTOR_ID_COUNTER = new AtomicLong(0L)
+  private val RUNNING_EXECUTOR_PODS_LOCK = new Object
+  @GuardedBy("RUNNING_EXECUTOR_PODS_LOCK")
+  private val runningExecutorsToPods = new mutable.HashMap[String, Pod]
+  private val executorPodsByIPs = new ConcurrentHashMap[String, Pod]()
+  private val podsWithKnownExitReasons = new ConcurrentHashMap[String, 
ExecutorExited]()
+  private val disconnectedPodsByExecutorIdPendingRemoval = new 
ConcurrentHashMap[String, Pod]()
+
+  private val kubernetesNamespace = conf.get(KUBERNETES_NAMESPACE)
+
+  private val kubernetesDriverPodName = conf
+.get(KUBERNETES_DRIVER_POD_NAME)
+.getOrElse(throw new SparkException("Must specify the driver pod 
name"))
+  private implicit val requestExecutorContext = 
ExecutionContext.fromExecutorService(
+requestExecutorsService)
+
+  private val driverPod = kubernetesClient.pods()
+.inNamespace(kubernetesNamespace)
+.withName(kubernetesDriverPodName)
+.get()
+
+  override val minRegisteredRatio =
+if 
(conf.getOption("spark.scheduler.minRegisteredResourcesRatio").isEmpty) {
+  0.8
+} else {
+  super.minRegisteredRatio
+}
+
+  private val executorWatchResource = new AtomicReference[Closeable]
+  protected val totalExpectedExecutors = new AtomicInteger(0)
+
+  private val driverUrl = RpcEndpointAddress(
+conf.get("spark.driver.host"),
+conf.getInt("spark.driver.port", DEFAULT_DRIVER_PORT),
+CoarseGrainedSchedulerBackend.ENDPOINT_NAME).toString
+
+  private val initialExecutors = 
SchedulerBackendUtils.getInitialTargetExecutorNumber(conf)
+
+  private val podAllocationInterval = 
conf.get(KUBERNETES_ALLOCATION_BATCH_DELAY)
+
+  private val podAllocationSize = 
conf.get(KUBERNETES_ALLOCATION_BATCH_SIZE)
+
+  private val allocatorRunnable = new Runnable {
+
+// Maintains a map of executor id to count of checks performed to 
learn the loss reason
+// 

[GitHub] spark pull request #19468: [SPARK-18278] [Scheduler] Spark on Kubernetes - B...

2017-10-20 Thread markhamstra
Github user markhamstra commented on a diff in the pull request:

https://github.com/apache/spark/pull/19468#discussion_r146072523
  
--- Diff: 
resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/KubernetesClusterSchedulerBackend.scala
 ---
@@ -0,0 +1,456 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.scheduler.cluster.k8s
+
+import java.io.Closeable
+import java.net.InetAddress
+import java.util.concurrent.{ConcurrentHashMap, ExecutorService, 
ScheduledExecutorService, TimeUnit}
+import java.util.concurrent.atomic.{AtomicInteger, AtomicLong, 
AtomicReference}
+
+import scala.collection.{concurrent, mutable}
+import scala.collection.JavaConverters._
+import scala.concurrent.{ExecutionContext, Future}
+
+import io.fabric8.kubernetes.api.model._
+import io.fabric8.kubernetes.client.{KubernetesClient, 
KubernetesClientException, Watcher}
+import io.fabric8.kubernetes.client.Watcher.Action
+
+import org.apache.spark.SparkException
+import org.apache.spark.deploy.k8s.config._
+import org.apache.spark.deploy.k8s.constants._
+import org.apache.spark.rpc.{RpcAddress, RpcEndpointAddress, RpcEnv}
+import org.apache.spark.scheduler.{ExecutorExited, SlaveLost, 
TaskSchedulerImpl}
+import org.apache.spark.scheduler.cluster.CoarseGrainedSchedulerBackend
+import org.apache.spark.util.Utils
+
+private[spark] class KubernetesClusterSchedulerBackend(
+scheduler: TaskSchedulerImpl,
+rpcEnv: RpcEnv,
+executorPodFactory: ExecutorPodFactory,
+kubernetesClient: KubernetesClient,
+allocatorExecutor: ScheduledExecutorService,
+requestExecutorsService: ExecutorService)
+  extends CoarseGrainedSchedulerBackend(scheduler, rpcEnv) {
+
+  import KubernetesClusterSchedulerBackend._
+
+  private val EXECUTOR_ID_COUNTER = new AtomicLong(0L)
+  private val RUNNING_EXECUTOR_PODS_LOCK = new Object
+  // Indexed by executor IDs and guarded by RUNNING_EXECUTOR_PODS_LOCK.
+  private val runningExecutorsToPods = new mutable.HashMap[String, Pod]
+  // Indexed by executor pod names and guarded by 
RUNNING_EXECUTOR_PODS_LOCK.
+  private val runningPodsToExecutors = new mutable.HashMap[String, String]
+  private val executorPodsByIPs = new ConcurrentHashMap[String, Pod]()
+  private val podsWithKnownExitReasons = new ConcurrentHashMap[String, 
ExecutorExited]()
+  private val disconnectedPodsByExecutorIdPendingRemoval = new 
ConcurrentHashMap[String, Pod]()
+
+  private val kubernetesNamespace = conf.get(KUBERNETES_NAMESPACE)
+
+  private val kubernetesDriverPodName = conf
+.get(KUBERNETES_DRIVER_POD_NAME)
+.getOrElse(
+  throw new SparkException("Must specify the driver pod name"))
+  private implicit val requestExecutorContext = 
ExecutionContext.fromExecutorService(
+  requestExecutorsService)
+
+  private val driverPod = try {
+kubernetesClient.pods()
+  .inNamespace(kubernetesNamespace)
+  .withName(kubernetesDriverPodName)
+  .get()
+  } catch {
+case throwable: Throwable =>
+  logError(s"Executor cannot find driver pod.", throwable)
+  throw new SparkException(s"Executor cannot find driver pod", 
throwable)
+  }
+
+  override val minRegisteredRatio =
+if (conf.getOption("spark.scheduler.minRegisteredResourcesRatio").isEmpty) {
+  0.8
+} else {
+  super.minRegisteredRatio
+}
+
+  private val executorWatchResource = new AtomicReference[Closeable]
+  protected var totalExpectedExecutors = new AtomicInteger(0)
+
+  private val driverUrl = RpcEndpointAddress(
+  conf.get("spark.driver.host"),
+  conf.getInt("spark.driver.port", DEFAULT_DRIVER_PORT),
+  CoarseGrainedSchedulerBackend.ENDPOINT_NAME).toString
+
+  private val initialExecutors = getIniti

[GitHub] spark issue #19468: [SPARK-18278] [Scheduler] Spark on Kubernetes - Basic Sc...

2017-10-11 Thread markhamstra
Github user markhamstra commented on the issue:

https://github.com/apache/spark/pull/19468
  
> iron out the kinks

A large chunk of the difficulty in identifying and ironing out kinks in
such a project is writing adequate tests of the scheduler code. I'd expect
test coverage to take roughly as much effort as all of the rest of the
scheduler plug-in work.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #19287: [SPARK-22074][Core] Task killed by other attempt ...

2017-09-25 Thread markhamstra
Github user markhamstra commented on a diff in the pull request:

https://github.com/apache/spark/pull/19287#discussion_r140788231
  
--- Diff: core/src/main/scala/org/apache/spark/scheduler/TaskInfo.scala ---
@@ -93,6 +104,8 @@ class TaskInfo(
 
   def running: Boolean = !finished
 
+  def needResubmit: Boolean = !killedAttempt
--- End diff --

Since `killedAttempt`/`killedByAttempt` is never used except in this 
negated sense, it probably makes more sense to get rid of it and this 
`needResubmit` method entirely, and just have a `needResubmit` var with the 
opposite sense of `killedAttempt` -- but I'm also not all that happy with the 
`needResubmit` name.
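
In other words, roughly this -- just a sketch, and the name is still open to
debate:

```scala
// Sketch: a single flag, defaulting to true, flipped to false only when this
// attempt is killed because another attempt of the same task has succeeded.
var needResubmit: Boolean = true
```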


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #19194: [SPARK-20589] Allow limiting task concurrency per...

2017-09-21 Thread markhamstra
Github user markhamstra commented on a diff in the pull request:

https://github.com/apache/spark/pull/19194#discussion_r140340018
  
--- Diff: 
core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala ---
@@ -619,6 +625,47 @@ private[spark] class ExecutorAllocationManager(
 // place the executors.
 private val stageIdToExecutorPlacementHints = new mutable.HashMap[Int, 
(Int, Map[String, Int])]
 
+override def onJobStart(jobStart: SparkListenerJobStart): Unit = {
+  jobStart.stageInfos.foreach(stageInfo => 
stageIdToJobId(stageInfo.stageId) = jobStart.jobId)
+
+  var jobGroupId = if (jobStart.properties != null) {
+jobStart.properties.getProperty(SparkContext.SPARK_JOB_GROUP_ID)
+  } else {
+null
+  }
+
+  val maxConTasks = if (jobGroupId != null &&
+conf.contains(s"spark.job.$jobGroupId.maxConcurrentTasks")) {
+conf.get(s"spark.job.$jobGroupId.maxConcurrentTasks").toInt
+  } else {
+Int.MaxValue
+  }
+
+  if (maxConTasks <= 0) {
+throw new IllegalArgumentException(
+  "Maximum Concurrent Tasks should be set greater than 0 for the 
job to progress.")
+  }
+
+  if (jobGroupId == null || 
!conf.contains(s"spark.job.$jobGroupId.maxConcurrentTasks")) {
+jobGroupId = DEFAULT_JOB_GROUP
+  }
+
+  jobIdToJobGroup(jobStart.jobId) = jobGroupId
+  if (!jobGroupToMaxConTasks.contains(jobGroupId)) {
--- End diff --

Do we even need mutable state associated with a job group? Some things 
would be a lot simpler if maxConTasks could only be set when the job group is 
created; and if you need a different number of maxConTasks, then you have to 
use a different job group.
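
To sketch what I mean (hypothetical names, not a concrete API proposal):

```scala
import java.util.concurrent.ConcurrentHashMap

// The limit is bound exactly once, when the job group is first registered, and
// never mutated afterwards; a different limit means using a different group id.
val maxConTasksByGroup = new ConcurrentHashMap[String, Int]()

def registerJobGroup(groupId: String, maxConTasks: Int = Int.MaxValue): Unit = {
  require(maxConTasks > 0, "Maximum concurrent tasks must be greater than 0")
  maxConTasksByGroup.putIfAbsent(groupId, maxConTasks)
}
```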


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #19287: [SPARK-22074][Core] Task killed by other attempt ...

2017-09-20 Thread markhamstra
Github user markhamstra commented on a diff in the pull request:

https://github.com/apache/spark/pull/19287#discussion_r140047573
  
--- Diff: core/src/main/scala/org/apache/spark/scheduler/TaskInfo.scala ---
@@ -66,6 +66,12 @@ class TaskInfo(
*/
   var finishTime: Long = 0
 
+  /**
+   * Set this tag when this task killed by other attempt. This kind of 
task should not resubmit
+   * while executor lost.
+   */
--- End diff --

nit: calling it a "tag" doesn't aid understanding, but rather leaves me 
wondering why it was called a "tag" instead of a "var" or something more 
descriptive.

more consequential: Please explain what defines "this kind" of task. 


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #19115: [SPARK-21882][CORE] OutputMetrics doesn't count written ...

2017-09-04 Thread markhamstra
Github user markhamstra commented on the issue:

https://github.com/apache/spark/pull/19115
  
And now I see that the title was changed to something more useful. Pardon
any offense; the end result of the title changes looks good.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #19115: [SPARK-21882][CORE] OutputMetrics doesn't count written ...

2017-09-04 Thread markhamstra
Github user markhamstra commented on the issue:

https://github.com/apache/spark/pull/19115
  
I realize this PR is now closed, but to follow up on Saisai's request 
concerning PR titles, I'll also note that the title of this PR isn't very 
useful even after the JIRA id and component tag are added. Titles like "fixed 
foo" or "updated bar" don't really tell reviewers, or those looking at the 
commit logs in the future, what the PR is about. The JIRA should tell us _why_ a 
change or addition is needed, the description in the PR should tell us _what_ 
was changed or added, and the PR title should give us enough of an idea of what 
is going on that we don't have to open the PR or look at the code changes just 
to see whether it is something we are interested in at all.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #18805: [SPARK-19112][CORE] Support for ZStandard codec

2017-08-21 Thread markhamstra
Github user markhamstra commented on the issue:

https://github.com/apache/spark/pull/18805
  
In addition to LICENSE, there is also COPYING in the v1.3.1 release:

https://github.com/facebook/zstd/blob/v1.3.1/LICENSE
https://github.com/facebook/zstd/blob/v1.3.1/COPYING

I'm not going to pretend to have a fully informed opinion on this issue.

On Mon, Aug 21, 2017 at 11:18 AM, Dongjoon Hyun <notificati...@github.com>
wrote:

> Thank you for confirming, @vanzin <https://github.com/vanzin> ! I see.
>
> —
> You are receiving this because you were mentioned.
> Reply to this email directly, view it on GitHub
> <https://github.com/apache/spark/pull/18805#issuecomment-323815495>, or 
mute
> the thread
> 
<https://github.com/notifications/unsubscribe-auth/AAZ4-pqXm-R2schCRbHQawxygQGS9-q2ks5sacnxgaJpZM4Oqd2u>
> .
>



---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #18950: [SPARK-20589][Core][Scheduler] Allow limiting tas...

2017-08-15 Thread markhamstra
Github user markhamstra commented on a diff in the pull request:

https://github.com/apache/spark/pull/18950#discussion_r133344532
  
--- Diff: 
core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala ---
@@ -602,6 +604,21 @@ private[spark] class ExecutorAllocationManager(
 // place the executors.
 private val stageIdToExecutorPlacementHints = new mutable.HashMap[Int, 
(Int, Map[String, Int])]
 
+override def onJobStart(jobStart: SparkListenerJobStart): Unit = {
+  val jobGroupId = if (jobStart.properties != null) {
+jobStart.properties.getProperty(SparkContext.SPARK_JOB_GROUP_ID)
+  } else {
+""
+  }
+  val maxConcurrentTasks = 
conf.getInt(s"spark.job.$jobGroupId.maxConcurrentTasks",
+Int.MaxValue)
+
+  logInfo(s"Setting maximum concurrent tasks for group: ${jobGroupId} 
to $maxConcurrentTasks")
+  allocationManager.synchronized {
+allocationManager.maxConcurrentTasks = maxConcurrentTasks
--- End diff --

Ummm... what? It is entirely possible to set a job group, spawn a bunch of 
threads that will eventually create jobs in that job group, then set another 
job group and spawn more threads that will be creating jobs in this new group 
simultaneously with jobs being created in the prior group.  
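
For concreteness, the kind of interleaving I mean (assuming an existing
SparkContext `sc`; the group names are made up):

```scala
sc.setJobGroup("groupA", "first batch of jobs")
val t1 = new Thread(new Runnable {
  override def run(): Unit = sc.parallelize(1 to 100).count()  // submitted under groupA
})
t1.start()

sc.setJobGroup("groupB", "second batch of jobs")
val t2 = new Thread(new Runnable {
  override def run(): Unit = sc.parallelize(1 to 100).count()  // submitted under groupB
})
t2.start()
// Jobs from both groups can now be running at the same time, so a single
// allocationManager-wide maxConcurrentTasks value is ambiguous.
```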


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #18807: [SPARK-21601][BUILD] Modify the pom.xml file, increase t...

2017-08-02 Thread markhamstra
Github user markhamstra commented on the issue:

https://github.com/apache/spark/pull/18807
  
Yes, it doesn't really hurt anything except to be confusing cruft that has 
a tendency to accumulate in POMs. If we're going to put those lines back, I 
suggest that they be accompanied by a comment that they are known to be a 
useful assist for certain versions of IntelliJ and are not needed for the 
correctness of the build itself.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #18807: [SPARK-21601][BUILD] Modify the pom.xml file, increase t...

2017-08-02 Thread markhamstra
Github user markhamstra commented on the issue:

https://github.com/apache/spark/pull/18807
  
Maven is the build of reference, true; but maven itself doesn't need the 
JDK version to be specified both in the scala plugin configuration and in the 
compiler plugin configuration. While I understand that IntelliJ is used by many 
Spark developers, it isn't some sort of officially preferred IDE of reference. 
That's why I have some qualms about embedding workarounds into our POM just to 
satisfy the idiosyncrasies of a particular IDE. 


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #18807: [SPARK-21601][BUILD] Modify the pom.xml file, increase t...

2017-08-02 Thread markhamstra
Github user markhamstra commented on the issue:

https://github.com/apache/spark/pull/18807
  
Hmmm... that's arguably broken behavior on the part of IntelliJ or 
something to be worked around in IntelliJ configuration, not by hacking our 
POM. Without the POM hack, though, it is definitely something that deserves 
explanation in the Building Spark docs.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #18807: [SPARK-21601][BUILD] Modify the pom.xml file, increase t...

2017-08-01 Thread markhamstra
Github user markhamstra commented on the issue:

https://github.com/apache/spark/pull/18807
  
These are maven-compiler-plugin configurations. We don't use 
maven-compiler-plugin to compile Java code: 
https://github.com/apache/spark/commit/74cda94c5e496e29f42f1044aab90cab7dbe9d38


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #18093: [WIP][SPARK-20774][SQL] Cancel all jobs when Quer...

2017-07-25 Thread markhamstra
Github user markhamstra commented on a diff in the pull request:

https://github.com/apache/spark/pull/18093#discussion_r129382985
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/QueryExecution.scala ---
@@ -89,8 +91,22 @@ class QueryExecution(val sparkSession: SparkSession, val 
logical: LogicalPlan) {
   lazy val executedPlan: SparkPlan = prepareForExecution(sparkPlan)
 
   /** Internal version of the RDD. Avoids copies and has no schema */
-  lazy val toRdd: RDD[InternalRow] = executedPlan.execute()
+  lazy val toRdd: RDD[InternalRow] = executePlan()
 
+  private def executePlan(): RDD[InternalRow] = {
+sparkSession.sparkContext.setJobGroup(runId.toString, 
getDescriptionString,
--- End diff --

Wait... aren't you clobbering any JobGroup that is already set?
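
If the intent is just to tag this execution, a save-and-restore sketch along
these lines would at least not lose the caller's group (illustrative only,
reusing the names from this diff):

```scala
val sc = sparkSession.sparkContext
val oldGroupId = sc.getLocalProperty(SparkContext.SPARK_JOB_GROUP_ID)
val oldDescription = sc.getLocalProperty(SparkContext.SPARK_JOB_DESCRIPTION)
sc.setJobGroup(runId.toString, getDescriptionString)
try {
  executedPlan.execute()
} finally {
  // Restore whatever the caller had set; passing null clears the property.
  sc.setLocalProperty(SparkContext.SPARK_JOB_GROUP_ID, oldGroupId)
  sc.setLocalProperty(SparkContext.SPARK_JOB_DESCRIPTION, oldDescription)
}
```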


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #17955: [SPARK-20715] Store MapStatuses only in MapOutputTracker...

2017-05-15 Thread markhamstra
Github user markhamstra commented on the issue:

https://github.com/apache/spark/pull/17955
  
@JoshRosen Yes, I agree that it is orthogonal -- at least for now. I'm 
mostly just offering a heads up that if we get around to addressing 
`interruptThread`, then there may also need to be some changes related to 
mapOutput tracking.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #17955: [SPARK-20715] Store MapStatuses only in MapOutputTracker...

2017-05-15 Thread markhamstra
Github user markhamstra commented on the issue:

https://github.com/apache/spark/pull/17955
  
I've looked at only the DAGScheduler changes so far. They LGTM.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #17955: [SPARK-20715] Store MapStatuses only in MapOutputTracker...

2017-05-15 Thread markhamstra
Github user markhamstra commented on the issue:

https://github.com/apache/spark/pull/17955
  
@JoshRosen The hard coding of interruptThread = true within 
TaskSetManager's handleSuccessfulTask to effect the killing of duplicate, 
speculative attempts of a task is potentially an issue -- not a new issue with 
this PR, but one that hasn't been fully analyzed and addressed AFAIK. 
https://issues.apache.org/jira/browse/SPARK-17064


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #16165: [SPARK-8617] [WEBUI] HistoryServer: Include in-progress ...

2017-04-05 Thread markhamstra
Github user markhamstra commented on the issue:

https://github.com/apache/spark/pull/16165
  
@merlintang Marcelo's point remains the same for 2.1.1. We don't typically 
backport changes to maintenance branches unless they are fixes for regression 
errors or severe bugs.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #17522: [SPARK-18278] [Scheduler] Documentation to point ...

2017-04-03 Thread markhamstra
Github user markhamstra commented on a diff in the pull request:

https://github.com/apache/spark/pull/17522#discussion_r109523328
  
--- Diff: docs/cluster-overview.md ---
@@ -52,7 +52,11 @@ The system currently supports three cluster managers:
 * [Apache Mesos](running-on-mesos.html) -- a general cluster manager that 
can also run Hadoop MapReduce
   and service applications.
 * [Hadoop YARN](running-on-yarn.html) -- the resource manager in Hadoop 2.
-
+* [Kubernetes 
(experimental)](https://github.com/apache-spark-on-k8s/spark) -- In addition to 
the above,
+there is experimental support for Kubernetes. Kubernetes is an open-source 
platform
+for providing container-centric infrastructure. Kubernetes support is 
being actively
+developed in an 
[apache-spark-on-k8s](https://github.com/apache-spark-on-k8s/) Github 
organization
+and will eventually merge into the Apache Spark project. For 
documentation, refer to that project's README.
--- End diff --

I'd change "will" to "may" or "is intended to" or something similar since 
there is not yet a formal, hard commitment to definitely merge this work into 
Apache Spark. 


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #17485: [SPARK-20163] Kill all running tasks in a stage i...

2017-03-31 Thread markhamstra
Github user markhamstra commented on a diff in the pull request:

https://github.com/apache/spark/pull/17485#discussion_r109174578
  
--- Diff: 
core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala ---
@@ -768,6 +767,19 @@ private[spark] class TaskSetManager(
   s" executor ${info.executorId}): ${reason.toErrorString}"
 val failureException: Option[Throwable] = reason match {
   case fetchFailed: FetchFailed =>
+if (!isZombie) {
+  for (i <- 0 until numTasks if i != index) {
+// Only for the first occurance of the fetch failure, kill all 
running
+// tasks in the task set
+for (attemptInfo <- taskAttempts(i) if attemptInfo.running) {
+  sched.backend.killTask(
+attemptInfo.taskId,
+attemptInfo.executorId,
+interruptThread = true,
--- End diff --

We can do it then, but there is still the question of whether we should do 
it. That discussion belongs in SPARK-20178.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #17485: [SPARK-20163] Kill all running tasks in a stage i...

2017-03-30 Thread markhamstra
Github user markhamstra commented on a diff in the pull request:

https://github.com/apache/spark/pull/17485#discussion_r109038854
  
--- Diff: 
core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala ---
@@ -768,6 +767,19 @@ private[spark] class TaskSetManager(
   s" executor ${info.executorId}): ${reason.toErrorString}"
 val failureException: Option[Throwable] = reason match {
   case fetchFailed: FetchFailed =>
+if (!isZombie) {
+  for (i <- 0 until numTasks if i != index) {
+// Only for the first occurance of the fetch failure, kill all 
running
+// tasks in the task set
+for (attemptInfo <- taskAttempts(i) if attemptInfo.running) {
+  sched.backend.killTask(
+attemptInfo.taskId,
+attemptInfo.executorId,
+interruptThread = true,
--- End diff --

That's not valid. We don't know that this can be done safely, which is why 
spark.job.interruptOnCancel defaults to false. SPARK-17064
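
If we do end up killing tasks here, the interrupt flag should at least come
from the job-level opt-in rather than being hard-coded. A rough sketch,
assuming the task set's properties are reachable at this point:

```scala
// Only interrupt the task's thread if the job explicitly opted in via
// spark.job.interruptOnCancel; otherwise leave the running task alone.
val shouldInterrupt = Option(taskSet.properties).exists(
  _.getProperty(SparkContext.SPARK_JOB_INTERRUPT_ON_CANCEL, "false").toBoolean)
// ...and then pass interruptThread = shouldInterrupt instead of the hard-coded true.
```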


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #17297: [SPARK-14649][CORE] DagScheduler should not run duplicat...

2017-03-28 Thread markhamstra
Github user markhamstra commented on the issue:

https://github.com/apache/spark/pull/17297
  
Agreed. Let's establish what we want to do before trying to discuss the
details of how we are going to do it.

On Tue, Mar 28, 2017 at 8:17 AM, Imran Rashid <notificati...@github.com>
wrote:

> @sitalkedia <https://github.com/sitalkedia> This change is pretty
> contentious, there are lot of questions about whether or not this is a 
good
> change. I don't think discussing this here in github comments on a PR is
> the best form. I think of PR comments as being more about code details --
> clarity, tests, whether the implementation is correct, etc. But here we're
> discussing whether the behavior is even desirable, as well as trying to
> discuss this in relation to other changes. I think a better format would 
be
> for you to open a jira and submit a design document (maybe a shared google
> doc at first), where we can focus more on the desired behavior and 
consider
> all the changes, even if the PRs are smaller to make them easier to 
review.
>
> I'm explicitly *not* making a judgement on whether or not this is a good
> change. Also I do appreciate you having the code changes ready, as a POC,
> as that can help folks consider the complexity of the change. But it seems
> clear to me that first we need to come to a decision about the end goal.
>
> Also, assuming we do decide this is desirable behavior, there is also a
> question about how we can get changes like this in without risking 
breaking
> things -- I have started a thread on dev@ related to that topic in
> general, but we should figure that for these changes in particular as 
well.
>
> @kayousterhout <https://github.com/kayousterhout> @tgravescs
    > <https://github.com/tgravescs> @markhamstra
> <https://github.com/markhamstra> makes sense?
>
> —
> You are receiving this because you were mentioned.
> Reply to this email directly, view it on GitHub
> <https://github.com/apache/spark/pull/17297#issuecomment-289803690>, or 
mute
> the thread
> 
<https://github.com/notifications/unsubscribe-auth/AAZ4-pbaJWHOMCLOB2JZFReBYx0E1xOHks5rqSSTgaJpZM4MdN08>
> .
>



---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #17447: [SPARK-20117][Scheduler]TaskSetManager checkSpeculatable...

2017-03-27 Thread markhamstra
Github user markhamstra commented on the issue:

https://github.com/apache/spark/pull/17447
  
I wouldn't bother with the string interpolation change (there is a good 
argument to be made that string interpolation doesn't gain you anything in 
patterns like those in this PR where a single string is being concatenated to 
the end of another); and the var -> val change in a local variable isn't 
significant enough to justify a PR. Thanks for contributing, but please close 
this PR.
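
For what it's worth, the two forms in question look roughly like this (the
message text is made up, not the actual one in this PR):

```scala
logInfo("Marking task " + index + " as speculatable")   // concatenation at the end
logInfo(s"Marking task $index as speculatable")         // the interpolated equivalent
```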


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #17088: [SPARK-19753][CORE] Un-register all shuffle outpu...

2017-03-21 Thread markhamstra
Github user markhamstra commented on a diff in the pull request:

https://github.com/apache/spark/pull/17088#discussion_r107286200
  
--- Diff: core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala 
---
@@ -1365,19 +1375,43 @@ class DAGScheduler(
*/
   private[scheduler] def handleExecutorLost(
   execId: String,
-  filesLost: Boolean,
-  maybeEpoch: Option[Long] = None) {
+  workerLost: Boolean): Unit = {
+// if the cluster manager explicitly tells us that the entire worker 
was lost, then
+// we know to unregister shuffle output.  (Note that "worker" 
specifically refers to the process
+// from a Standalone cluster, where the shuffle service lives in the 
Worker.)
+val filesLost = workerLost || 
!env.blockManager.externalShuffleServiceEnabled
+removeExecutorAndUnregisterOutputs(
+  execId = execId,
+  fileLost = filesLost,
--- End diff --

The `fileLost` vs. `filesLost` naming difference is a little confusing -- 
is the distinction even conveying a difference worth paying attention to?


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #17088: [SPARK-19753][CORE] Un-register all shuffle outpu...

2017-03-21 Thread markhamstra
Github user markhamstra commented on a diff in the pull request:

https://github.com/apache/spark/pull/17088#discussion_r107284202
  
--- Diff: core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala 
---
@@ -1683,11 +1716,12 @@ private[scheduler] class 
DAGSchedulerEventProcessLoop(dagScheduler: DAGScheduler
   dagScheduler.handleExecutorAdded(execId, host)
 
 case ExecutorLost(execId, reason) =>
-  val filesLost = reason match {
-case SlaveLost(_, true) => true
+  val workerLost = reason match {
+case SlaveLost(_, true) =>
+  true
--- End diff --

nit: prefer it without the line break for something this simple


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #17088: [SPARK-19753][CORE] Un-register all shuffle outpu...

2017-03-21 Thread markhamstra
Github user markhamstra commented on a diff in the pull request:

https://github.com/apache/spark/pull/17088#discussion_r107284085
  
--- Diff: core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala 
---
@@ -1389,8 +1423,7 @@ class DAGScheduler(
 clearCacheLocs()
   }
 } else {
-  logDebug("Additional executor lost message for " + execId +
-   "(epoch " + currentEpoch + ")")
+  logDebug("Additional executor lost message for %s (epoch 
%d)".format(execId, currentEpoch))
--- End diff --

nit: prefer string interpolation over `format`.
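
e.g., the interpolated form of the line above would be:

```scala
logDebug(s"Additional executor lost message for $execId (epoch $currentEpoch)")
```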


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #17088: [SPARK-19753][CORE] Un-register all shuffle outpu...

2017-03-21 Thread markhamstra
Github user markhamstra commented on a diff in the pull request:

https://github.com/apache/spark/pull/17088#discussion_r107283229
  
--- Diff: core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala 
---
@@ -1331,7 +1328,20 @@ class DAGScheduler(
 
   // TODO: mark the executor as failed only if there were lots of 
fetch failures on it
   if (bmAddress != null) {
-handleExecutorLost(bmAddress.executorId, filesLost = true, 
Some(task.epoch))
+val hostToUnregisterOutputs = if 
(env.blockManager.externalShuffleServiceEnabled) {
+  // We had a fetch failure with the external shuffle service, 
so we
+  // assume all shuffle data on the node is bad.
+  Some(bmAddress.host)
+} else {
+  // Deregister shuffle data just for one executor (we don't 
have any
--- End diff --

nit: "Unregister" is used elsewhere (function names, etc.), not 
"deregister".


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #17357: [SPARK-20025][CORE] Ignore SPARK_LOCAL* env, while deplo...

2017-03-20 Thread markhamstra
Github user markhamstra commented on the issue:

https://github.com/apache/spark/pull/17357
  
It's a lot better.  Thanks.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #17297: [SPARK-14649][CORE] DagScheduler should not run d...

2017-03-20 Thread markhamstra
Github user markhamstra commented on a diff in the pull request:

https://github.com/apache/spark/pull/17297#discussion_r107044660
  
--- Diff: core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala 
---
@@ -929,12 +946,22 @@ class DAGScheduler(
 }
   }
 
-  /** Called when stage's parents are available and we can now do its 
task. */
+  /**
+   * Called when stage's parents are available and we can now run its task.
+   * This only submits the partitions which are missing and have not been
+   * submitted to the lower-level scheduler for execution.
+   */
   private def submitMissingTasks(stage: Stage, jobId: Int) {
 logDebug("submitMissingTasks(" + stage + ")")
 
-// First figure out the indexes of partition ids to compute.
-val partitionsToCompute: Seq[Int] = stage.findMissingPartitions()
+val missingPartitions = stage.findMissingPartitions()
+val partitionsToCompute =
+  missingPartitions.filter(id => !stage.pendingPartitions.contains(id))
--- End diff --

```scala
missingPartitions.filterNot(stage.pendingPartitions)
```


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #17297: [SPARK-14649][CORE] DagScheduler should not run d...

2017-03-20 Thread markhamstra
Github user markhamstra commented on a diff in the pull request:

https://github.com/apache/spark/pull/17297#discussion_r107044272
  
--- Diff: core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala 
---
@@ -803,6 +810,16 @@ class DAGScheduler(
 stageIdToStage.get(taskSet.stageId).foreach { abortStage(_, reason, 
exception) }
   }
 
+  private[scheduler] def handleTasksAborted(
+  stageId: Int,
+  tasks: Seq[Task[_]]): Unit = {
+for (stage <- stageIdToStage.get(stageId)) {
+  for (task <- tasks) {
+stage.pendingPartitions -= task.partitionId
+  }
+}
--- End diff --

```scala
for {
  stage <- stageIdToStage.get(stageId)
  task <- tasks
} stage.pendingPartitions -= task.partitionId
```


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #17297: [SPARK-14649][CORE] DagScheduler should not run d...

2017-03-20 Thread markhamstra
Github user markhamstra commented on a diff in the pull request:

https://github.com/apache/spark/pull/17297#discussion_r107040190
  
--- Diff: core/src/main/scala/org/apache/spark/MapOutputTracker.scala ---
@@ -418,6 +424,15 @@ private[spark] class MapOutputTrackerMaster(conf: 
SparkConf,
 cachedSerializedStatuses.contains(shuffleId) || 
mapStatuses.contains(shuffleId)
   }
 
+  /** Get the epoch for map output for a shuffle, if it is available */
+  def getEpochForMapOutput(shuffleId: Int, mapId: Int): Option[Long] = {
+val arrayOpt = mapStatuses.get(shuffleId)
+if (arrayOpt.isDefined && arrayOpt.get != null && arrayOpt.get(mapId) 
!= null) {
+   return Some(epochForMapStatus.get(shuffleId).get(mapId))
+}
+None
+  }
--- End diff --

First, `arrayOpt.get != null` isn't necessary since we don't put `null` 
values into `mapStatuses`. Second, `epochForMapStatus.get(shuffleId).get` is 
the same as `epochForMapStatus(shuffleId)`. Third, I don't like all the 
explicit `get`s, `null` checks and the unnecessary non-local `return`. To my 
mind, this is better:
``` scala
  def getEpochForMapOutput(shuffleId: Int, mapId: Int): Option[Long] = {
for {
  mapStatus <- mapStatuses.get(shuffleId).flatMap { mapStatusArray =>
Option(mapStatusArray(mapId))
  }
} yield epochForMapStatus(shuffleId)(mapId)
  }
``` 


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #17297: [SPARK-14649][CORE] DagScheduler should not run d...

2017-03-20 Thread markhamstra
Github user markhamstra commented on a diff in the pull request:

https://github.com/apache/spark/pull/17297#discussion_r107018874
  
--- Diff: core/src/main/scala/org/apache/spark/MapOutputTracker.scala ---
@@ -378,15 +382,17 @@ private[spark] class MapOutputTrackerMaster(conf: 
SparkConf,
 val array = mapStatuses(shuffleId)
 array.synchronized {
   array(mapId) = status
+  val epochs = epochForMapStatus.get(shuffleId).get
--- End diff --

```scala
val epochs = epochForMapStatus(shuffleId)
```


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #17297: [SPARK-14649][CORE] DagScheduler should not run d...

2017-03-20 Thread markhamstra
Github user markhamstra commented on a diff in the pull request:

https://github.com/apache/spark/pull/17297#discussion_r107018555
  
--- Diff: core/src/main/scala/org/apache/spark/MapOutputTracker.scala ---
@@ -378,15 +382,17 @@ private[spark] class MapOutputTrackerMaster(conf: 
SparkConf,
 val array = mapStatuses(shuffleId)
 array.synchronized {
   array(mapId) = status
+  val epochs = epochForMapStatus.get(shuffleId).get
+  epochs(mapId) = epoch
 }
   }
 
   /** Register multiple map output information for the given shuffle */
   def registerMapOutputs(shuffleId: Int, statuses: Array[MapStatus], 
changeEpoch: Boolean = false) {
-mapStatuses.put(shuffleId, statuses.clone())
 if (changeEpoch) {
   incrementEpoch()
 }
+mapStatuses.put(shuffleId, statuses.clone())
--- End diff --

What was the point of moving this?


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #17297: [SPARK-14649][CORE] DagScheduler should not run d...

2017-03-20 Thread markhamstra
Github user markhamstra commented on a diff in the pull request:

https://github.com/apache/spark/pull/17297#discussion_r107017201
  
--- Diff: core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala 
---
@@ -1265,64 +1280,11 @@ class DAGScheduler(
 val failedStage = stageIdToStage(task.stageId)
 val mapStage = shuffleIdToMapStage(shuffleId)
 
-if (failedStage.latestInfo.attemptId != task.stageAttemptId) {
-  logInfo(s"Ignoring fetch failure from $task as it's from 
$failedStage attempt" +
-s" ${task.stageAttemptId} and there is a more recent attempt 
for that stage " +
-s"(attempt ID ${failedStage.latestInfo.attemptId}) running")
-} else {
-  // It is likely that we receive multiple FetchFailed for a 
single stage (because we have
-  // multiple tasks running concurrently on different executors). 
In that case, it is
-  // possible the fetch failure has already been handled by the 
scheduler.
-  if (runningStages.contains(failedStage)) {
-logInfo(s"Marking $failedStage (${failedStage.name}) as failed 
" +
-  s"due to a fetch failure from $mapStage (${mapStage.name})")
-markStageAsFinished(failedStage, Some(failureMessage))
-  } else {
-logDebug(s"Received fetch failure from $task, but its from 
$failedStage which is no " +
-  s"longer running")
-  }
-
-  val shouldAbortStage =
-failedStage.failedOnFetchAndShouldAbort(task.stageAttemptId) ||
-disallowStageRetryForTest
-
-  if (shouldAbortStage) {
-val abortMessage = if (disallowStageRetryForTest) {
-  "Fetch failure will not retry stage due to testing config"
-} else {
-  s"""$failedStage (${failedStage.name})
- |has failed the maximum allowable number of
- |times: ${Stage.MAX_CONSECUTIVE_FETCH_FAILURES}.
- |Most recent failure reason: 
$failureMessage""".stripMargin.replaceAll("\n", " ")
-}
-abortStage(failedStage, abortMessage, None)
-  } else { // update failedStages and make sure a 
ResubmitFailedStages event is enqueued
-// TODO: Cancel running tasks in the failed stage -- cf. 
SPARK-17064
-val noResubmitEnqueued = !failedStages.contains(failedStage)
-failedStages += failedStage
-failedStages += mapStage
-if (noResubmitEnqueued) {
-  // We expect one executor failure to trigger many 
FetchFailures in rapid succession,
-  // but all of those task failures can typically be handled 
by a single resubmission of
-  // the failed stage.  We avoid flooding the scheduler's 
event queue with resubmit
-  // messages by checking whether a resubmit is already in the 
event queue for the
-  // failed stage.  If there is already a resubmit enqueued 
for a different failed
-  // stage, that event would also be sufficient to handle the 
current failed stage, but
-  // producing a resubmit for each failed stage makes 
debugging and logging a little
-  // simpler while not producing an overwhelming number of 
scheduler events.
-  logInfo(
-s"Resubmitting $mapStage (${mapStage.name}) and " +
-s"$failedStage (${failedStage.name}) due to fetch failure"
-  )
-  messageScheduler.schedule(
-new Runnable {
-  override def run(): Unit = 
eventProcessLoop.post(ResubmitFailedStages)
-},
-DAGScheduler.RESUBMIT_TIMEOUT,
-TimeUnit.MILLISECONDS
-  )
-}
-  }
+val epochForMapOutput = 
mapOutputTracker.getEpochForMapOutput(shuffleId, mapId)
+// It is possible that the map output was regenerated by rerun of 
the stage and the
+// fetch failure is being reported for stale map output. In that 
case, we should just
+// ignore the fetch failure and relaunch the task with latest map 
output info.
+if (epochForMapOutput.nonEmpty && epochForMapOutput.get <= 
task.epoch) {
--- End diff --

I'd be inclined to do this without the extra binding and `get`:
```scala
for(epochForMapOutput <- 
mapOutputTracker.getEpochForMapOutput(shuffleId, mapId) if
epochForMapOutput <= task.epoch) {

[GitHub] spark issue #17357: [SPARK-20025][CORE] Fix spark's driver failover mechanis...

2017-03-20 Thread markhamstra
Github user markhamstra commented on the issue:

https://github.com/apache/spark/pull/17357
  
Please change the title of this PR. "Fixed foo" is nearly useless when 
scanning the commit log in the future since it doesn't tell us anything about 
either the nature of the problem or the actual code change.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #17113: [SPARK-13669][Core] Improve the blacklist mechanism to h...

2017-03-06 Thread markhamstra
Github user markhamstra commented on the issue:

https://github.com/apache/spark/pull/17113
  
@squito Correct, we really only try to kill running tasks currently on job 
failure (and if the config setting allows it); but there is the long-standing 
"TODO: Cancel running tasks in the stage" in `case FetchFailed` of 
`DAGScheduler#handleTaskCompletion`, which has languished as a TODO because 
resolving it would require us both to make Spark itself handle task 
interruption in the fetch failure case and to deal with the same issues 
preventing us from making task interruption the default even for job 
cancelation.

All I'm really saying is that we shouldn't design in a hard requirement 
that tasks cannot be interrupted (on job cancelation, fetch failure or some 
other event), because we'd often really like to be able to kill running tasks 
-- even though we don't know quite how to do that safely right now. 


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #17113: [SPARK-13669][Core] Improve the blacklist mechanism to h...

2017-03-06 Thread markhamstra
Github user markhamstra commented on the issue:

https://github.com/apache/spark/pull/17113
  
@tgravescs At the config level, it is spark.job.interruptOnCancel or 
SparkContext.SPARK_JOB_INTERRUPT_ON_CANCEL, which then gets passed around as a 
boolean -- e.g. shouldInterruptThread.
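
For reference, the usual way to opt in is per job group (the group id and
description here are just placeholders):

```scala
// The third argument maps to spark.job.interruptOnCancel for all jobs
// submitted in this group; it defaults to false.
sc.setJobGroup("my-group", "my description", interruptOnCancel = true)
```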


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #17113: [SPARK-13669][Core] Improve the blacklist mechanism to h...

2017-03-06 Thread markhamstra
Github user markhamstra commented on the issue:

https://github.com/apache/spark/pull/17113
  
@mridulm Correct, turning task interruption on by default is not so much a 
matter of Spark itself handling it well as it is a possible (though not fully 
understood) issue with lower-layer libraries not handling interruption well. 
The original concern with HDFS is likely fixed now, but there are similar 
concerns with Cassandra and other libraries. Logically, we'd like to interrupt 
Tasks when associated Jobs or Stages are killed in the DAGScheduler. In 
practice, nobody knows right now how to do that safely in all circumstances, so 
the default is to not attempt to interrupt the tasks.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #17113: [SPARK-13669][Core] Improve the blacklist mechanism to h...

2017-03-06 Thread markhamstra
Github user markhamstra commented on the issue:

https://github.com/apache/spark/pull/17113
  
> Spark does immediately abort the stage but it doesn't kill the running 
tasks

Whether running tasks are interrupted on stage abort or not depends on the 
state of a config boolean -- and ideally we'd like to get to the point where we 
can confidently set that config so that running tasks are interrupted when the 
associated job or stage dies.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #17113: [SPARK-13669][Core] Improve the blacklist mechanism to h...

2017-03-01 Thread markhamstra
Github user markhamstra commented on the issue:

https://github.com/apache/spark/pull/17113
  
"Current Spark's blacklist mechanism": please be more precise. The most 
recent released version of Spark, 2.1.0, does not include a lot of recent 
changes to blacklisting (mostly 
https://github.com/apache/spark/commit/93cdb8a7d0f124b4db069fd8242207c82e263c52).
 Are the problems you are describing fully explored with the master branch of 
Spark?


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #17088: [SPARK-19753][CORE] All shuffle files on a host should b...

2017-02-27 Thread markhamstra
Github user markhamstra commented on the issue:

https://github.com/apache/spark/pull/17088
  
Even if I completely agreed that removing all of the shuffle files on a 
host was the correct design choice, I'd still be hesitant to merge this right 
now. That is simply because we have recently merged other changes related to 
fetch failure handling, and I'd really like to see some more time pass with 
just those changes in the code before we introduce more fetch failure changes. 
I don't want to get in the situation of merging this PR then getting reports of 
fetch failure bugs in master a week later, and not knowing whether to place the 
blame on this PR or the other recent fetch failure changes.

That needn't preclude more discussion of this PR or possibly merging it 
after we have a little more experience and confidence with the code already in 
master.

/cc @kayousterhout  @squito 


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #17045: [SPARK-19373][MESOS] Base spark.scheduler.minRegisteredR...

2017-02-23 Thread markhamstra
Github user markhamstra commented on the issue:

https://github.com/apache/spark/pull/17045
  
thanks


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #17045: [SPARK-19373][MESOS] fix spark.scheduler.minRegisteredRe...

2017-02-23 Thread markhamstra
Github user markhamstra commented on the issue:

https://github.com/apache/spark/pull/17045
  
Please avoid using "fix" as the description in a PR -- it doesn't tell us 
anything substantive about the nature of the problem or its resolution, so any 
future reviewing of commit messages will require digging into the actual 
committed code to get even a minimal idea of what was done.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #16905: [SPARK-19567][CORE][SCHEDULER] Support some Schedulable ...

2017-02-21 Thread markhamstra
Github user markhamstra commented on the issue:

https://github.com/apache/spark/pull/16905
  
Thanks, Shane & Kay!


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #16905: [SPARK-19567][CORE][SCHEDULER] Support some Schedulable ...

2017-02-21 Thread markhamstra
Github user markhamstra commented on the issue:

https://github.com/apache/spark/pull/16905
  
If Jenkins is listening to me, that should have allowed you to trigger tests 
for this PR.

test this please


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #16905: [SPARK-19567][CORE][SCHEDULER] Support some Schedulable ...

2017-02-21 Thread markhamstra
Github user markhamstra commented on the issue:

https://github.com/apache/spark/pull/16905
  
ok to test


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #16620: [SPARK-19263] DAGScheduler should avoid sending conflict...

2017-02-16 Thread markhamstra
Github user markhamstra commented on the issue:

https://github.com/apache/spark/pull/16620
  
LGTM


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #12524: [SPARK-12524][Core]DagScheduler may submit a task set fo...

2017-02-16 Thread markhamstra
Github user markhamstra commented on the issue:

https://github.com/apache/spark/pull/12524
  
@JoshRosen  I haven't tried to walk through the logs in your JIRA comment, 
but it wouldn't surprise me at all if this is the same issue that we've been 
working through in https://github.com/apache/spark/pull/16620


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #15009: [SPARK-17443][SPARK-11035] Stop Spark Application...

2017-02-13 Thread markhamstra
Github user markhamstra commented on a diff in the pull request:

https://github.com/apache/spark/pull/15009#discussion_r100950587
  
--- Diff: core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala ---
@@ -719,7 +719,23 @@ object SparkSubmit extends CommandLineUtils {
   printWarning("Subclasses of scala.App may not work correctly. Use a 
main() method instead.")
 }
 
-val mainMethod = mainClass.getMethod("main", new 
Array[String](0).getClass)
+val sparkAppMainMethodArr = mainClass.getMethods().filter{_.getName() 
== "sparkMain"}
--- End diff --

And `getName` shouldn't be a mutator, so no parens.
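
A minimal, self-contained sketch of the convention in question (the class 
name `java.lang.String` and the object name are stand-ins for illustration, 
not SparkSubmit's actual code): side-effect-free accessors such as 
`getMethods` and `getName` are written and called without parens, while 
side-effecting calls keep them.

```scala
// Sketch only: the no-parens style for side-effect-free accessors.
object ParensConventionSketch {
  def main(args: Array[String]): Unit = {
    val mainClass = Class.forName("java.lang.String")  // stand-in for the submitted app class
    // getMethods and getName have no side effects, so Scala style drops the parens.
    val sparkMainMethods = mainClass.getMethods.filter(_.getName == "sparkMain")
    println(s"Found ${sparkMainMethods.length} method(s) named sparkMain")
  }
}
```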


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #16905: [SPARK-19567][CORE][SCHEDULER] Support some Sched...

2017-02-13 Thread markhamstra
Github user markhamstra commented on a diff in the pull request:

https://github.com/apache/spark/pull/16905#discussion_r100836419
  
--- Diff: 
core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala ---
@@ -130,15 +130,17 @@ private[spark] class TaskSchedulerImpl 
private[scheduler](
 
   val mapOutputTracker = SparkEnv.get.mapOutputTracker
 
-  var schedulableBuilder: SchedulableBuilder = null
+  private val SCHEDULER_MODE_PROPERTY = "spark.scheduler.mode"
+  private var schedulableBuilder: SchedulableBuilder = null
   var rootPool: Pool = null
   // default scheduler is FIFO
-  private val schedulingModeConf = conf.get("spark.scheduler.mode", "FIFO")
+  private val schedulingModeConf = conf.get(SCHEDULER_MODE_PROPERTY, 
SchedulingMode.FIFO.toString)
   val schedulingMode: SchedulingMode = try {
 SchedulingMode.withName(schedulingModeConf.toUpperCase)
   } catch {
 case e: java.util.NoSuchElementException =>
-  throw new SparkException(s"Unrecognized spark.scheduler.mode: 
$schedulingModeConf")
+  throw new SparkException(s"Unrecognized $SCHEDULER_MODE_PROPERTY: 
$schedulingModeConf. " +
+s"Supported modes: ${SchedulingMode.FAIR} or 
${SchedulingMode.FIFO}.")
--- End diff --

We're not likely to add or remove SchedulingModes with any frequency, if at 
all, so this isn't likely to cause much opportunity for error -- but I agree 
with the principle that extracting from `.values` is a better approach. 
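
For illustration, a minimal self-contained sketch of the `.values` idea. The 
local `SchedulingMode` enumeration below is just a stand-in for 
`org.apache.spark.scheduler.SchedulingMode`, and the demo throws 
`IllegalArgumentException` rather than Spark's own exception type.

```scala
// Sketch only: derive the "supported modes" list from the enumeration itself,
// so the error message can never drift out of sync with the actual values.
object SchedulingModeSketch {
  object SchedulingMode extends Enumeration {  // stand-in for o.a.s.scheduler.SchedulingMode
    val FAIR, FIFO, NONE = Value
  }

  def parseMode(configured: String): SchedulingMode.Value =
    try {
      SchedulingMode.withName(configured.toUpperCase)
    } catch {
      case _: NoSuchElementException =>
        throw new IllegalArgumentException(
          s"Unrecognized spark.scheduler.mode: $configured. " +
            s"Supported modes: ${SchedulingMode.values.mkString(", ")}.")
    }

  def main(args: Array[String]): Unit = {
    println(parseMode("fifo"))  // FIFO
    println(parseMode("fare"))  // throws, listing every supported mode
  }
}
```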


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #16905: [SPARK-19567][CORE][SCHEDULER] Support some Sched...

2017-02-13 Thread markhamstra
Github user markhamstra commented on a diff in the pull request:

https://github.com/apache/spark/pull/16905#discussion_r100834970
  
--- Diff: core/src/main/scala/org/apache/spark/scheduler/Pool.scala ---
@@ -37,25 +37,22 @@ private[spark] class Pool(
 
   val schedulableQueue = new ConcurrentLinkedQueue[Schedulable]
   val schedulableNameToSchedulable = new ConcurrentHashMap[String, 
Schedulable]
-  var weight = initWeight
-  var minShare = initMinShare
+  val weight = initWeight
+  val minShare = initMinShare
   var runningTasks = 0
-  var priority = 0
+  val priority = 0
 
   // A pool's stage id is used to break the tie in scheduling.
   var stageId = -1
-  var name = poolName
+  val name = poolName
   var parent: Pool = null
 
-  var taskSetSchedulingAlgorithm: SchedulingAlgorithm = {
+  private val taskSetSchedulingAlgorithm: SchedulingAlgorithm = {
 schedulingMode match {
-  case SchedulingMode.FAIR =>
-new FairSchedulingAlgorithm()
-  case SchedulingMode.FIFO =>
-new FIFOSchedulingAlgorithm()
-  case _ =>
-val msg = "Unsupported scheduling mode: $schedulingMode. Use FAIR 
or FIFO instead."
-throw new IllegalArgumentException(msg)
+  case SchedulingMode.FAIR => new FairSchedulingAlgorithm()
+  case SchedulingMode.FIFO => new FIFOSchedulingAlgorithm()
+  case _ => throw new IllegalArgumentException("Unsupported scheduling 
mode: " +
+s"$schedulingMode. Supported modes: ${SchedulingMode.FAIR} or 
${SchedulingMode.FIFO}.")
--- End diff --

I'd really rather not see this kind of change. Other than the missing 
string-interpolation `s` in `msg`, the prior code was at least as good as 
(and arguably better than) the new style, and making such an inconsequential 
style change just complicates future investigations of the git history.
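
A tiny self-contained illustration of the interpolator bug mentioned in 
passing above (the names and value are made up for the demo): without the 
leading `s`, the `$schedulingMode` placeholder is emitted literally.

```scala
// Sketch only: plain string literal vs. s-interpolated string.
object InterpolatorSketch {
  def main(args: Array[String]): Unit = {
    val schedulingMode = "FAIR"                                    // arbitrary demo value
    val broken = "Unsupported scheduling mode: $schedulingMode."   // prints the $ placeholder as-is
    val fixed  = s"Unsupported scheduling mode: $schedulingMode."  // prints FAIR
    println(broken)  // Unsupported scheduling mode: $schedulingMode.
    println(fixed)   // Unsupported scheduling mode: FAIR.
  }
}
```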


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #16905: [SPARK-19567][CORE][SCHEDULER] Support some Schedulable ...

2017-02-13 Thread markhamstra
Github user markhamstra commented on the issue:

https://github.com/apache/spark/pull/16905
  
@srowen This refactoring of unnecessary vars to vals is something that 
we've noted in the discussions of a few other PRs as work that could and 
probably should be done in a separate PR (i.e. this one).

@erenavsarogullari There is another such refactoring that can be rolled 
into this PR.  See the changes that I've made to `rootPool` in 
`TaskSchedulerImpl`, `DAGSchedulerSuite` and `ExternalClusterManagerSuite` 
here: 
https://github.com/markhamstra/spark/commit/e11fe2a9817559492daee03c8c025879dc44d346
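
For readers skimming the archive, a hypothetical sketch of the general shape 
of that kind of change (simplified stand-in types, not the linked commit): a 
`var` that is assigned once after construction becomes a `val` initialized 
at its declaration.

```scala
// Simplified stand-ins, not Spark's real Pool/TaskSchedulerImpl definitions.
class Pool(val name: String, val schedulingMode: String)

class SchedulerSketch(schedulingMode: String) {
  // Before: `var rootPool: Pool = null`, assigned later in an initialize() method.
  // After: a val built when the scheduler is constructed, so it can never be seen as null.
  val rootPool: Pool = new Pool("", schedulingMode)
}

object SchedulerSketch {
  def main(args: Array[String]): Unit = {
    val scheduler = new SchedulerSketch("FIFO")
    println(s"root pool scheduling mode: ${scheduler.rootPool.schedulingMode}")
  }
}
```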


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #16620: [SPARK-19263] DAGScheduler should avoid sending conflict...

2017-02-11 Thread markhamstra
Github user markhamstra commented on the issue:

https://github.com/apache/spark/pull/16620
  
Thanks for all the investigation and the write-up, @kayousterhout. This 
makes good sense to me, and it should take us a long way toward both fixing 
the immediate bug and improving the code. We should also make sure that our 
intentions and understanding get preserved in documentation that is more 
obvious and accessible in the future than PR discussion threads. That 
probably means more comments in the source code covering the essence of your 
"very long write up", but maybe we should also consider creating an external 
documentation page (a wiki or something) that covers in long form what we 
know and intend; then we can scale the in-code comments down to a shorter 
form that points to the long form.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #16876: [SPARK-19537] Move pendingPartitions to ShuffleMapStage.

2017-02-09 Thread markhamstra
Github user markhamstra commented on the issue:

https://github.com/apache/spark/pull/16876
  
You're welcome -- but do be aware that I'm going to be extremely busy with 
non-Spark stuff for at least the next week, so for a while my Spark code 
reviews are likely to be more cursory than they should be.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #16876: [SPARK-19537] Move pendingPartitions to ShuffleMapStage.

2017-02-09 Thread markhamstra
Github user markhamstra commented on the issue:

https://github.com/apache/spark/pull/16876
  
Makes good sense to me.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #16620: [SPARK-19263] DAGScheduler should avoid sending conflict...

2017-02-07 Thread markhamstra
Github user markhamstra commented on the issue:

https://github.com/apache/spark/pull/16620
  
@kayousterhout yes, I also looked at duplicating 
`stage.pendingPartitions -= task.partitionId`.  I could live with that.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #16813: [SPARK-19466][CORE][SCHEDULER] Improve Fair Sched...

2017-02-07 Thread markhamstra
Github user markhamstra commented on a diff in the pull request:

https://github.com/apache/spark/pull/16813#discussion_r99964246
  
--- Diff: 
core/src/main/scala/org/apache/spark/scheduler/SchedulableBuilder.scala ---
@@ -69,60 +72,81 @@ private[spark] class FairSchedulableBuilder(val 
rootPool: Pool, conf: SparkConf)
   val DEFAULT_WEIGHT = 1
 
   override def buildPools() {
-var is: Option[InputStream] = None
+var fileData: Option[FileData] = None
 try {
-  is = Option {
-schedulerAllocFile.map { f =>
-  new FileInputStream(f)
-}.getOrElse {
-  
Utils.getSparkClassLoader.getResourceAsStream(DEFAULT_SCHEDULER_FILE)
-}
-  }
-
-  is.foreach { i => buildFairSchedulerPool(i) }
+  fileData = getFileData()
+  fileData.foreach { data => buildFairSchedulerPool(data) }
+} catch {
+  case NonFatal(t) =>
+logError("Error while building the fair scheduler pools: ", t)
+throw t
 } finally {
-  is.foreach(_.close())
+  fileData.foreach(_.inputStream.close())
 }
 
 // finally create "default" pool
 buildDefaultPool()
   }
 
+  private def getFileData(): Option[FileData] = {
+schedulerAllocFile.map { f =>
+  val file = new File(f)
+  val fis = new FileInputStream(file)
--- End diff --

Yes, and if a user does have multiple files with the same name on different 
paths and expects one of them to be used while Spark is actually reading the 
one on a different path, then having the full path in the log message will 
be crucial to short-circuiting the debugging confusion.
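
To make the point concrete, a minimal sketch of logging the resolved 
location rather than only the configured file name. The helper name is made 
up, `println` stands in for Spark's `logInfo`, and the `main` demo assumes a 
`conf/fairscheduler.xml` file exists; this is not the PR's code.

```scala
import java.io.{File, FileInputStream, InputStream}

object AllocationFileSketch {
  // Hypothetical helper: open the fair-scheduler allocation file and log the
  // canonical path actually being read, so two same-named files on different
  // paths can't be silently confused.
  def openAllocationFile(path: String): InputStream = {
    val file = new File(path)
    println(s"Creating fair scheduler pools from ${file.getAbsolutePath}")  // stand-in for logInfo
    new FileInputStream(file)
  }

  def main(args: Array[String]): Unit = {
    val in = openAllocationFile("conf/fairscheduler.xml")
    try {
      println(s"${in.available()} bytes available")
    } finally {
      in.close()
    }
  }
}
```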


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #16620: [SPARK-19263] DAGScheduler should avoid sending conflict...

2017-02-07 Thread markhamstra
Github user markhamstra commented on the issue:

https://github.com/apache/spark/pull/16620
  
@kayousterhout don't overestimate my enthusiasm for my own suggestion.  I'm 
really just thinking aloud in search of a solution, and I agree with you that 
the TaskSetManager and DAGScheduler being in disagreement is not good.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #16620: [SPARK-19263] DAGScheduler should avoid sending conflict...

2017-02-07 Thread markhamstra
Github user markhamstra commented on the issue:

https://github.com/apache/spark/pull/16620
  
The way that I am thinking about this right now is that @kayousterhout is 
on the right track with the early return at 
https://github.com/apache/spark/blob/master/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala#L1141
 , but that her proposed `stage...attemptId != task.stageAttemptId` check is 
broader than it needs to be.  My idea is that we want to throw away task 
results from earlier attempts that ran on executors that failed (on the 
presumption that one fetch failure means that other fetches from there are 
also going to fail). If the executor didn't fail, however, then the outputs 
of earlier attempts of tasks that complete late but successfully on 
still-good executors should still be valid and available, so we should 
accept them as though they were successful task completions for the current 
attempt.

What you end up with is that if-statement now looking like:
```scala
val stageHasBeenCancelled = !stageIdToStage.contains(task.stageId)
val shuffleMapTaskIsFromFailedExecutor = task match {
  case smt: ShuffleMapTask =>
    val status = event.result.asInstanceOf[MapStatus]
    val execId = status.location.executorId
    failedEpoch.contains(execId) && smt.epoch <= failedEpoch(execId)
  case _ => false
}
if (stageHasBeenCancelled || shuffleMapTaskIsFromFailedExecutor) {
  return
}
```
...and then the `failedEpoch.contains(execId) && smt.epoch <= 
failedEpoch(execId)` check can be removed from `case smt: ShuffleMapTask =>`.

If we can do it cleanly, I think we should avoid re-running Tasks that 
completed successfully and whose outputs should still be available.  This is 
a bit different from the intent of SPARK-14649, which I read as an effort 
not to ignore the results of long-running tasks that start and eventually 
complete on an executor on which some other tasks actually run into fetch 
failures.  I'm really only trying to preserve the results of successful 
tasks run on executors that haven't failed.

Unfortunately, the DAGSchedulerSuite doesn't agree with my intentions, 
because the above change actually leads to multiple test failures.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org


