[GitHub] spark issue #22771: [SPARK-25773][Core]Cancel zombie tasks in a result stage...
Github user markhamstra commented on the issue: https://github.com/apache/spark/pull/22771 LGTM --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #22771: [SPARK-25773][Core]Cancel zombie tasks in a resul...
Github user markhamstra commented on a diff in the pull request: https://github.com/apache/spark/pull/22771#discussion_r228371144 --- Diff: core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala --- @@ -1364,6 +1385,21 @@ private[spark] class DAGScheduler( if (job.numFinished == job.numPartitions) { markStageAsFinished(resultStage) cleanupStateForJobAndIndependentStages(job) +try { + // killAllTaskAttempts will fail if a SchedulerBackend does not implement + // killTask. + logInfo(s"Job ${job.jobId} is finished. Killing potential speculative or " + +s"zombie tasks for this job") --- End diff -- Yes, other log messages probably also aren't very good. Maybe what we need is some additional explanation in the docs somewhere. The issue that I am having is that if the log messages say that Tasks are being killed or canceled or whatever, many users will assume that that means that the Tasks will no longer be running on the Executors. In fact, what it means is that the DAGScheduler isn't going to try to run them anymore, and that any previously started Tasks may or may not still be running or continue to run on the Executors -- it depends on whether the Tasks are interruptible and on whether the interrupt on cancel configuration is set to true. The log messages make sense if you understand that subtlety, so we should probably try to explain it more fully in the docs.
[GitHub] spark pull request #22771: [SPARK-25773][Core]Cancel zombie tasks in a resul...
Github user markhamstra commented on a diff in the pull request: https://github.com/apache/spark/pull/22771#discussion_r228354020 --- Diff: core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala --- @@ -1364,6 +1385,21 @@ private[spark] class DAGScheduler( if (job.numFinished == job.numPartitions) { markStageAsFinished(resultStage) cleanupStateForJobAndIndependentStages(job) +try { + // killAllTaskAttempts will fail if a SchedulerBackend does not implement + // killTask. + logInfo(s"Job ${job.jobId} is finished. Killing potential speculative or " + +s"zombie tasks for this job") --- End diff -- Isn't this misleading/confusing if !shouldInterruptTaskThread? You can attempt to kill speculative or zombie Tasks in that case, but nothing will actually happen if SparkContext.SPARK_JOB_INTERRUPT_ON_CANCEL is false.
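The gating this comment describes can be illustrated with a minimal, self-contained model (all names here are hypothetical, not Spark's actual scheduler code): a kill request only interrupts the task thread when interrupt-on-cancel is enabled; otherwise the scheduler merely stops tracking the task.

```scala
// Hypothetical model of the gating described above; not Spark code.
final case class TaskAttempt(id: Long, var interruptRequested: Boolean = false)

def killAttempt(task: TaskAttempt, shouldInterruptTaskThread: Boolean): Unit = {
  if (shouldInterruptTaskThread) {
    // Only with interrupt-on-cancel enabled does the executor thread get interrupted.
    task.interruptRequested = true
  }
  // Otherwise the scheduler just stops tracking the task; it may run to completion.
}

val t = TaskAttempt(1L)
killAttempt(t, shouldInterruptTaskThread = false)
assert(!t.interruptRequested) // "killed", but nothing actually happened to the thread
killAttempt(t, shouldInterruptTaskThread = true)
assert(t.interruptRequested)
```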
[GitHub] spark issue #22144: [SPARK-24935][SQL] : Problem with Executing Hive UDF's f...
Github user markhamstra commented on the issue: https://github.com/apache/spark/pull/22144 Thanks @tgravescs for your latest posts -- they've saved me from posting something similar in many respects but more strongly worded. What is bothering me (not just in the discussion of this PR, but more broadly) is that we have individuals making declarative statements about whether something can or can't block a release, or that something "is not that important to Spark at this point", etc. -- things for which there is no supporting PMC vote or declared policy. It may be your opinion, @cloud-fan , that Hive compatibility should no longer be important to the Apache Spark project, and I have no problem with you expressing your opinion on the matter. That may even be the internal policy at your employer, I don't know. But you are just not in a position on your own to make this declaration for the Apache Spark project. I don't mean to single you out, @cloud-fan , as the only offender, since this isn't a unique instance. For example, heading into a recent release we also saw individual declarations that the data correctness issue caused by the shuffle replay partitioning issue was not a blocker because it was not a regression or that it was not significant enough to alter the release schedule. Rather, my point is that things like release schedules, the declaration of release candidates, labeling JIRA tickets with "blocker", and de facto or even declared policy on regressions and release blockers are just tools in the service of the PMC. If, as was the case with the shuffle data correctness issue, PMC members think that the issue must be fixed before the next release, then release schedules, RC-status, other individuals' perceptions of importance to the project or of policy ultimately don't matter -- only the vote of the PMC does. 
What is concerning me is that, instead of efforts to persuade the PMC members that something should not block the next release or should not be important to the project, I am seeing flat declarations that an issue is not a blocker or not important. That may serve to stifle work to immediately fix a bug, or to discourage other contributions, but I can assure you that trying to make the PMC serve the tools instead of the other way around won't serve to persuade at least some PMC members on how they should vote. Sorry, I guess I can't avoid wording things strongly after all.
[GitHub] spark issue #22144: [SPARK-24935][SQL] : Problem with Executing Hive UDF's f...
Github user markhamstra commented on the issue: https://github.com/apache/spark/pull/22144 Yes, @rxin I know that I was a little unfair to you in order to make my point sharper. Apologies. My concern is real, though.
[GitHub] spark issue #22144: [SPARK-24935][SQL] : Problem with Executing Hive UDF's f...
Github user markhamstra commented on the issue: https://github.com/apache/spark/pull/22144 @srowen I understand and agree. What bothers me is that the block/no-block decision now often seems to be "not a regression; automatic no block" -- and that doesn't seem right to me.
[GitHub] spark issue #22144: [SPARK-24935][SQL] : Problem with Executing Hive UDF's f...
Github user markhamstra commented on the issue: https://github.com/apache/spark/pull/22144 > It's certainly not a blocker since it's not a new regression I really hate this line of argument. Somehow we seem to have slipped from "if it is a regression, then we must block" to "if it is not a regression, then we don't necessarily have to block, but we might still choose to" to "if it is not a regression, then we necessarily should not block." I'm not at all comfortable with what now appears to be our de facto policy on regressions and blocking releases.
[GitHub] spark issue #22771: [SPARK-25773][Core]Cancel zombie tasks in a result stage...
Github user markhamstra commented on the issue: https://github.com/apache/spark/pull/22771 > We can at least try to abort the tasks and still honors the interrupt on cancel flag. It seems like best case is things actually get killed and we free up resources, worst case seems to be that the task ignores the interrupt and continues just like now. Right: Unless we can get to the point where we can confidently switch the interrupt on cancel default to true, efforts to more aggressively clean up executing but no longer needed Tasks won't accomplish anything unless that flag has been explicitly changed by the user. That is the most significant reason why the coding of such clean up efforts has not been attempted in the past. > Do you have specific concerns where this would actually cause problems Not really, and that was kind of the point of SPARK-17064: For a long time, we've had significant questions about what can happen when trying to interrupt Tasks, and while some of them were at least once connected to HDFS client or other libraries not handling interrupt requests correctly, we've never (to my knowledge) had concrete knowledge of specific effects resulting from one or more code paths. Mostly the concerns fall into two categories: 1) Not enough happening when we try to interrupt a Task; 2) Too much happening when we try to interrupt a Task. The first is what we already touched on: That requesting interruption of a Task may not stop the Task. That means that we can't write code that depends on resources being freed, etc. just because we tried to interrupt a Task. The follow-on question from that is whether we should try to detect this and do something more aggressive (such as killing the JVM) in order to guarantee resource availability. The second category of concern is essentially that interrupting a Task may cause side-effects that produce corrupt data or state, or that cause a Task to report a failure back to the DAGScheduler. 
Enough such Task failures can lead not only to Stage and Job failure, but also to blacklisting of an Executor. The reporting of failures from an interrupted Task can probably be worked around with careful accounting in the DAGScheduler (i.e. to ignore any such reports from a Task that we already tried to stop), but the other unintended side-effects are more nebulous and insidious, since they would as likely as not be coming from library dependencies, rather than core Spark code. The end result of that is that we probably can't do away with the interrupt on cancel flag entirely even if we can get to the point where we can change the default to true -- some user Tasks may need to depend on a broken library. And that gets us back to the issue of not being able to depend on resources being freed, etc. just because we tried to interrupt a Task. It's great to see more people looking at the scheduler code, since many of us who once spent more time on it can't do so as easily now; but it does concern me that sometimes old TODO items are picked up and reviewed rather naively. If it seems like there is a simple way to resolve a TODO, I'd encourage contributors to at least try to wake up those of us who wrote some of the older code. If the change really could be done easily and safely, we likely already would have done it.
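The "not enough happening" category is ultimately just JVM interrupt semantics: interrupting a thread only sets a flag, and the work stops only if the running code cooperates by checking that flag. A plain-threads sketch of this (no Spark involved):

```scala
import java.util.concurrent.atomic.AtomicBoolean

// Interrupting a thread is cooperative: the loop below only notices the
// interrupt if it checks the flag. This mirrors why killing a Spark task
// does not guarantee that the task stops running on the executor.
def runTask(checksInterrupt: Boolean): Boolean = {
  val ranToCompletion = new AtomicBoolean(false)
  val iterations = 50000000L
  val worker = new Thread(() => {
    var i = 0L
    while (i < iterations && !(checksInterrupt && Thread.currentThread().isInterrupted)) {
      i += 1
    }
    ranToCompletion.set(i == iterations)
  })
  worker.start()
  worker.interrupt() // analogous to killTask with interruptThread = true
  worker.join()
  ranToCompletion.get()
}

// A task that never checks the flag always runs to completion, interrupt or not.
assert(runTask(checksInterrupt = false))
```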
[GitHub] spark issue #22771: [SPARK-25773][Core]Cancel zombie tasks in a result stage...
Github user markhamstra commented on the issue: https://github.com/apache/spark/pull/22771 There are long-standing questions here that I don't think have yet been adequately answered -- cf. https://issues.apache.org/jira/browse/SPARK-17064
[GitHub] spark issue #22463: remove annotation @Experimental
Github user markhamstra commented on the issue: https://github.com/apache/spark/pull/22463 Where is the discussion on these utility methods no longer being Experimental? I'm not saying that they are not stable, but the Kafka 0.10 API in general being considered to be stable doesn't preclude some Kafka-related methods from still being other than stable, and promoting API to stable (i.e. cannot be changed without a major release) is a pretty big deal.
[GitHub] spark pull request #22112: [SPARK-23243][Core] Fix RDD.repartition() data co...
Github user markhamstra commented on a diff in the pull request: https://github.com/apache/spark/pull/22112#discussion_r213061324 --- Diff: core/src/main/scala/org/apache/spark/rdd/RDD.scala --- @@ -1918,3 +1980,19 @@ object RDD { new DoubleRDDFunctions(rdd.map(x => num.toDouble(x))) } } + +/** + * The random level of RDD's output (i.e. what `RDD#compute` returns), which indicates how the + * output will diff when Spark reruns the tasks for the RDD. There are 3 random levels, ordered + * by the randomness from low to high: --- End diff -- Again, please remove "random" and "randomness". The issue is not randomness, but rather determinism. For example, the output of `RDD#compute` could be completely non-random but still dependent on state not contained in the RDD. That would still make it problematic in terms of recomputing only some partitions and aggregating the results.
[GitHub] spark pull request #22112: [SPARK-23243][Core] Fix RDD.repartition() data co...
Github user markhamstra commented on a diff in the pull request: https://github.com/apache/spark/pull/22112#discussion_r212395101 --- Diff: core/src/main/scala/org/apache/spark/rdd/RDD.scala --- @@ -1876,6 +1920,22 @@ abstract class RDD[T: ClassTag]( */ object RDD { + /** + * The random level of RDD's computing function, which indicates the behavior when rerun the + * computing function. There are 3 random levels, ordered by the randomness from low to high: + * 1. IDEMPOTENT: The computing function always return the same result with same order when rerun. + * 2. UNORDERED: The computing function returns same data set in potentially a different order + * when rerun. + * 3. INDETERMINATE. The computing function may return totally different result when rerun. + * + * Note that, the output of the computing function usually relies on parent RDDs. When a + * parent RDD's computing function is random, it's very likely this computing function is also + * random. + */ + object RandomLevel extends Enumeration { --- End diff -- I'm not completely wedded to the IDEMPOTENT, UNORDERED, INDETERMINATE naming, so if somebody has something better or less likely to lead to confusion, I'm fine with that. I'd like to not use "random" in these names, though, since that implies actual randomness at some level, entropy guarantees, etc. What is key is not whether output values or ordering are truly random, but simply that we can't easily determine what they are or that they are fixed and repeatable. That's why I'd prefer that things like `RDD.RandomLevel.INDETERMINATE` be, I would suggest, `RDD.Determinism.INDETERMINATE`, and `computingRandomLevel` should be `computeDeterminism` (unless we want the slightly cheeky `determineDeterminism` :) ).
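A sketch of the suggested renaming (hypothetical; the enumeration eventually merged into Spark may use different names):

```scala
// Hypothetical sketch of the suggested naming: determinism, not randomness,
// is the property being classified. Ordered from most to least deterministic.
object Determinism extends Enumeration {
  val IDEMPOTENT,    // rerun returns the same values in the same order
      UNORDERED,     // rerun returns the same values, possibly reordered
      INDETERMINATE  // rerun may return entirely different values
      = Value
}

// Enumeration values are ordered by declaration, so levels can be compared.
assert(Determinism.IDEMPOTENT < Determinism.UNORDERED)
assert(Determinism.UNORDERED < Determinism.INDETERMINATE)
```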
[GitHub] spark issue #22176: [SPARK-25181][CORE] Limit Thread Pool size in BlockManag...
Github user markhamstra commented on the issue: https://github.com/apache/spark/pull/22176 Yes, this is better than what we had, but maybe it can be better still.
[GitHub] spark issue #22176: [SPARK-25181][CORE] Limit Thread Pool size in BlockManag...
Github user markhamstra commented on the issue: https://github.com/apache/spark/pull/22176 @zsxwing we really should have considered whether this should be a configuration variable instead of a fixed number of threads in any environment.
[GitHub] spark issue #22112: [SPARK-23243][Core] Fix RDD.repartition() data correctne...
Github user markhamstra commented on the issue: https://github.com/apache/spark/pull/22112 I'm not a fan of the IDEMPOTENT, RANDOM_ORDER, COMPLETE_RANDOM naming. IDEMPOTENT is fine, but I'd prefer UNORDERED and INDETERMINATE to cover the cases of "same values in potentially a different order" and "potentially different values and different order" respectively. RANDOM implies something too specific.
[GitHub] spark issue #21698: [SPARK-23243][Core] Fix RDD.repartition() data correctne...
Github user markhamstra commented on the issue: https://github.com/apache/spark/pull/21698 > I really disagree with this. I really agree with Tom. At this point, I think the working assumption should be that any 2.4.0 release candidate that doesn't deliver some fix for this issue will receive multiple -1 votes from PMC members.
[GitHub] spark issue #22039: [SPARK-25036][SQL][FOLLOW-UP] Avoid match may not be exh...
Github user markhamstra commented on the issue: https://github.com/apache/spark/pull/22039 > Yes, "quick hack", but, as opposed to what in these specific cases? Yes, that is the key question. I'll admit, I haven't looked at all deeply to try to figure out whether something better is possible, so this is just my knee-jerk reaction: If you don't have to, then don't use a catch-all just to silence the compiler warning. If you're satisfied that there isn't a better option, then I'll accept that second-best is the best we can do here.
[GitHub] spark issue #22039: [SPARK-25036][SQL][FOLLOW-UP] Avoid match may not be exh...
Github user markhamstra commented on the issue: https://github.com/apache/spark/pull/22039 Hmmm... sorry to be late to this, but making pattern matches exhaustive by adding a catch-all case that then throws an exception, while easy, should be considered a less than optimal fix. Ideally, the type handling should be tightened up so that the match can be exhaustive without a catch-all. The reason for this is that if in the future a type is added such that the pattern match should be extended to handle that type, then the presence of a catch-all will give the false impression that the new type is being handled, no compiler warning will be thrown, etc. If the pattern match is made exhaustive without a catch-all and the compiler option to convert warnings to errors is used, then it becomes pretty much impossible that future type additions/additional necessary cases will be silently mishandled. Now I realize that it is not always feasible to achieve that level of type safety in Scala code, but has adequate consideration been given to making the effort here, or was this just a quick hack to make the compiler shut up?
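The point about catch-alls can be made concrete with a small sealed hierarchy (illustrative types, not from the PR under review): with no `case _`, the compiler warns on any non-exhaustive match, and `-Xfatal-warnings` turns that warning into a build failure when a new subtype appears.

```scala
// Illustrative example (types not from the PR under review).
sealed trait Expr
case class Lit(n: Int)           extends Expr
case class Add(a: Expr, b: Expr) extends Expr

// Exhaustive without a catch-all: if someone later adds
// `case class Mul(a: Expr, b: Expr) extends Expr`, this match produces a
// non-exhaustivity warning instead of silently falling into a
// `case _ => throw ...` at runtime.
def eval(e: Expr): Int = e match {
  case Lit(n)    => n
  case Add(a, b) => eval(a) + eval(b)
}

assert(eval(Add(Lit(1), Lit(2))) == 3)
```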
[GitHub] spark issue #21589: [SPARK-24591][CORE] Number of cores and executors in the...
Github user markhamstra commented on the issue: https://github.com/apache/spark/pull/21589 Thank you, @HyukjinKwon There are a significant number of Spark users who use the Job Scheduler model with a SparkContext shared across many users and many Jobs. Promoting tools and patterns based upon the number of cores or executors that a SparkContext has access to, encouraging users to create Jobs that try to use all of the available cores, very much leads those users in the wrong direction. As much as possible, the public API should target policy that addresses real user problems (all users, not just a subset), and avoid targeting the particulars of Spark's internal implementation. A `repartition` that is extended to support policy or goal declarations (things along the lines of `repartition(availableCores)`, `repartition(availableDataNodes)`, `repartition(availableExecutors)`, `repartition(unreservedCores)`, etc.), relying upon Spark's internals (with its complete knowledge of the total number of cores and executors, scheduling pool shares, number of reserved Task nodes sought in barrier scheduling, number of active Jobs, Stages, Tasks and Sessions, etc.) may be something that I can get behind. Exposing a couple of current Spark scheduler implementation details in the expectation that some subset of users in some subset of use cases will be able to make correct use of them is not.
[GitHub] spark issue #21589: [SPARK-24591][CORE] Number of cores and executors in the...
Github user markhamstra commented on the issue: https://github.com/apache/spark/pull/21589 I don't accept your assertions of what constitutes the majority and minority of Spark users or use cases or their relative importance. As a long-time maintainer of the Spark scheduler, it is also not my concern to define which Spark users are important or not, but rather to foster system internals and a public API that benefit all users. I already have pointed out with some specificity how exposing the scheduler's low-level accounting of the number of cores or executors that are available at some point can encourage anti-patterns and sub-optimal Job execution. Repartitioning based upon a snapshot of the number of cores available cluster-wide is clearly not the correct thing to do in many instances and use cases. Beyond concern for users, as a developer of Spark internals, I don't appreciate being pinned to particular implementation details by having them directly exposed to users. And I'll repeat, this JIRA and PR look to be defining the problem to fit a preconception of the solution. Even for the particular users and use cases targeted by this PR, I wouldn't expect that those users would embrace "I can't repartition based upon the scheduler's notion of the number of cores in the cluster at some point" as a more accurate statement of their problem than "My Spark Jobs don't use all of the CPU resources that I am entitled to use." Even if we were to stipulate that a `repartition` call is inherently the only or best place to try to address that real user problem (and I am far from convinced that this is the only or best approach), then I'd be far happier with extending the `repartition` API to include declarative goals than exposing to users only part of what is needed from Spark's internals to figure out what is the best repartitioning -- perhaps something along the lines of `repartition(MaximizeCPUs)` or other appropriate policy/goal enumerations.
And spark packages are not irrelevant here. In fact, a large part of their motivation was to handle extensions that are not appropriate for all users or to prove out ideas and APIs that are not yet clearly appropriate for inclusion in Spark itself.
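To make the shape of that `repartition(MaximizeCPUs)` suggestion concrete, here is a purely hypothetical sketch; none of these names exist in Spark, and a real design would need an SPIP:

```scala
// Purely hypothetical: a declarative goal that the scheduler could resolve
// to a partition count using its own complete accounting, instead of the
// user reading raw core counts out of the scheduler.
sealed trait RepartitionGoal
case object MaximizeCPUs    extends RepartitionGoal
case object UnreservedCores extends RepartitionGoal

def resolvePartitions(goal: RepartitionGoal, totalCores: Int, reservedCores: Int): Int =
  goal match {
    case MaximizeCPUs    => math.max(1, totalCores)
    case UnreservedCores => math.max(1, totalCores - reservedCores)
  }

assert(resolvePartitions(UnreservedCores, totalCores = 16, reservedCores = 4) == 12)
```

The point of the sketch is only the direction of the dependency: the user states a goal, and the scheduler, which alone has the full picture (pools, barrier reservations, active Jobs), turns it into a number.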
[GitHub] spark issue #21589: [SPARK-24591][CORE] Number of cores and executors in the...
Github user markhamstra commented on the issue: https://github.com/apache/spark/pull/21589 It is precisely because the audience that I am concerned with is not limited to just data scientists or notebook users and their particular needs that I am far from convinced that exposing internals of the Spark scheduler in the public API is a good idea. There are many ways that a higher-level declaration could be made. I'm not committed to any particular model at this point. The way that it is done for scheduling pools via `sc.setLocalProperty` is one way that Job execution can be put into a particular declarative context. That's not necessarily the best way to do it, but it isn't necessarily more difficult than figuring out correct imperative code after fetching a snapshot of the number of available cores at some point. Doing this the right way likely requires an appropriate SPIP, not just a quick hack PR. A spark-package would be another way to expose additional functionality without it needing to be bound into the Spark public API.
[GitHub] spark issue #21589: [SPARK-24591][CORE] Number of cores and executors in the...
Github user markhamstra commented on the issue: https://github.com/apache/spark/pull/21589 @ssimeonov the purpose of a public API is not to offer hack solutions to a subset of problems. What is needed is a high-level, declarative abstraction that can be used to specify requested Job resource-usage policy. Exposing low-level Spark scheduling internals is not the way to achieve that.
[GitHub] spark issue #21589: [SPARK-24591][CORE] Number of cores and executors in the...
Github user markhamstra commented on the issue: https://github.com/apache/spark/pull/21589 No, defaultParallelism isn't more useful in that case, but that just starts getting to my overall assessment of this JIRA and PR: It smells of defining the problem to align with a preconception of the solution. Exposing the driver's current accounting of the number of cores active in the cluster is not something that we couldn't do or didn't know how to do a long time ago. Rather, it is something that those of us working on the scheduler chose not to do because of the expectation that putting this in the public API (and thereby implicitly encouraging its use) was likely to produce as many problems as it solves. This was primarily because of two factors: 1) The number of cores and executors is not static; 2) Closely tailoring a Job to some expectation of the number of available cores or executors is not obviously a correct thing to encourage in general. Whether from node failures, dynamic executor allocation, backend scheduler elasticity/preemption, or just other Jobs running under the same SparkContext, the number of cores and executors available to any particular Job when it is created can easily be different from what is available when any of its Stages actually runs. Even if you could get reliable numbers for the cores and executors that will be available through the lifecycle of a Job, tailoring a Job to use all of those cores and executors is only the right thing to do in a subset of Spark use cases. For example, using many more executors than there are DFS partitions holding the data, or trying to use all of the cores when there are other Jobs pending, or trying to use all of the cores when another Job needs to acquire a minimum number for barrier scheduled execution, or trying to use more cores than a scheduling pool permits would all be examples of anti-patterns that would be more enabled by easy, context-free access to low-level numCores.
There definitely are use cases where users need to be able to set policy for whether particular jobs should be encouraged to use more or less of the cluster's resources, but I believe that that needs to be done at a much higher level of abstraction in a declarative form, and that policy likely needs to be enforced dynamically/adaptively at Stage boundaries. The under-developed and under-used dynamic shuffle partitioning code in Spark SQL starts to go in that direction.
[GitHub] spark issue #21589: [SPARK-24591][CORE] Number of cores and executors in the...
Github user markhamstra commented on the issue: https://github.com/apache/spark/pull/21589 @mridulm scheduler pools could also make the cluster-wide resource numbers not very meaningful. I don't think the maxShare work has been merged yet (kind of a stalled TODO on an open PR, IIRC), but once that is in, it's not terribly useful to know, e.g., that there are 5 million cores in the cluster if your Job is running in a scheduler pool that is restricted to using far fewer CPUs via the pool's maxShares.
[GitHub] spark pull request #21754: [SPARK-24705][SQL] Cannot reuse an exchange opera...
Github user markhamstra commented on a diff in the pull request: https://github.com/apache/spark/pull/21754#discussion_r203416454 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/Exchange.scala --- @@ -85,14 +85,20 @@ case class ReusedExchangeExec(override val output: Seq[Attribute], child: Exchan */ case class ReuseExchange(conf: SQLConf) extends Rule[SparkPlan] { + private def supportReuseExchange(exchange: Exchange): Boolean = exchange match { +// If a coordinator defined in an exchange operator, the exchange cannot be reused --- End diff -- This seems overstated if this comment in the JIRA description is correct: "When the cache tabel device_loc is executed before this query is executed, everything is fine". In fact, if Xiao Li is correct in that statement, then this PR is eliminating a useful optimization in cases where it doesn't need to -- i.e. it is preventing Exchange reuse any time adaptive execution is used instead of only preventing reuse when it will actually cause a problem.
[GitHub] spark issue #21598: [SPARK-24605][SQL] size(null) returns null instead of -1
Github user markhamstra commented on the issue: https://github.com/apache/spark/pull/21598 > case by case Yes, but... this by itself makes the decision appear far too discretionary. Instead, in any PR where you are changing the published interface or behavior of part of Spark's public API, you should be highlighting the change for additional review and providing a really strong argument for why we cannot retain the prior interface and/or default behavior. It is simply not up to an individual committer to decide on their own discretion that the public API should be different than what it, in fact, is. Changing the public API is a big deal -- which is why most additions to the public API should, in my opinion, come in with an InterfaceStability annotation that will allow us to change them before a new major-number release. This doesn't apply to changes to internal APIs. Neither does it apply to bug fixes where Spark isn't actually doing what the public API says it is supposed to do -- although in cases where we expect that users have come to safely rely upon certain buggy behavior, we may choose to retain that buggy behavior under a new configuration setting.
[GitHub] spark issue #21598: [SPARK-24605][SQL] size(null) returns null instead of -1
Github user markhamstra commented on the issue: https://github.com/apache/spark/pull/21598 @HyukjinKwon this is not new policy. It is what Apache Spark has guaranteed in its version numbering and public API since 1.0.0. It is not a matter of "from now on", but rather of whether committers have started allowing our standards to slip. It may well be time for a discussion of that and of better tools to help guarantee that additions and changes to the public API are adequately discussed and reviewed, appropriate InterfaceStability annotations are applied, etc.
[GitHub] spark issue #21598: [SPARK-24605][SQL] size(null) returns null instead of -1
Github user markhamstra commented on the issue: https://github.com/apache/spark/pull/21598 > so we can't just change the default value in a feature release Agreed. Once a particular interface and behavior is in our released public API, then we effectively have a contract not to change that behavior. If we are going to provide another behavior before making a new major-number release (e.g. spark-3.0.0), then we have to provide a user configuration option to select that new behavior, and the default behavior if a user doesn't change configuration must be the same as before the optional new behavior. If there is a clear, severe bug (such as data loss or corruption), only then can we consider changing the public API before making a new major-number release -- but even then we are likely to either go immediately to a new major-number or to at least preserve the old, buggy behavior with a configuration option.
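The pattern described here -- new behavior behind a flag whose default preserves the released contract -- can be sketched with the `size(null)` case from this PR (the flag handling is simplified; the real configuration name and plumbing may differ):

```scala
// Simplified model of a config-guarded behavior change: the legacy setting
// keeps the released contract (size(null) == -1); opting out of it yields
// the new behavior (size(null) is null, modeled here as None).
def collectionSize(xs: Option[Seq[Int]], legacySizeOfNull: Boolean): Option[Int] =
  xs match {
    case Some(s)                  => Some(s.size)
    case None if legacySizeOfNull => Some(-1) // pre-change public behavior
    case None                     => None     // new, opt-in behavior
  }

assert(collectionSize(None, legacySizeOfNull = true) == Some(-1))
assert(collectionSize(None, legacySizeOfNull = false).isEmpty)
assert(collectionSize(Some(Seq(1, 2, 3)), legacySizeOfNull = false) == Some(3))
```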
[GitHub] spark issue #21527: [SPARK-24519] MapStatus has 2000 hardcoded
Github user markhamstra commented on the issue: https://github.com/apache/spark/pull/21527 Sure, as long as we are not telling users that this is something that they can or should use, that's fine.
[GitHub] spark issue #21527: [SPARK-24519] MapStatus has 2000 hardcoded
Github user markhamstra commented on the issue: https://github.com/apache/spark/pull/21527 > we can definitely update the description with more details. Eventually, some of the motivation and advice/suggestions need to get into the main user docs, as well.
[GitHub] spark issue #21527: [SPARK-24519] MapStatus has 2000 hardcoded
Github user markhamstra commented on the issue: https://github.com/apache/spark/pull/21527 @tgravescs If there is value in making it configurable, that is all fine and good. My argument is against making it configurable just for the sake of making it configurable. If there is more going on than that, then at a minimum we need documentation explaining why and when this number should be changed, and suggestions of appropriate values.
[GitHub] spark issue #21527: [SPARK-24519] MapStatus has 2000 hardcoded
Github user markhamstra commented on the issue: https://github.com/apache/spark/pull/21527 > We should make it configurable. That's a debatable assertion all by itself -- and quite unfortunately, there is no more justification for this claim in the JIRA ticket. Without proof that there is a better configuration under certain circumstances, and without guidance as to why and how this value should be reconfigured, I'd argue that adding another knob for users to twiddle just adds confusion, potential for misconfiguration and more configuration space that is unlikely to be adequately tested.
[GitHub] spark pull request #21440: [SPARK-24307][CORE] Support reading remote cached...
Github user markhamstra commented on a diff in the pull request: https://github.com/apache/spark/pull/21440#discussion_r191063339 --- Diff: core/src/main/scala/org/apache/spark/storage/BlockManager.scala --- @@ -659,6 +659,11 @@ private[spark] class BlockManager( * Get block from remote block managers as serialized bytes. */ def getRemoteBytes(blockId: BlockId): Option[ChunkedByteBuffer] = { +// TODO if we change this method to return the ManagedBuffer, then getRemoteValues +// could just use the inputStream on the temp file, rather than memory-mapping the file. +// Until then, replication can go cause the process to use too much memory and get killed --- End diff -- grammar --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #20930: [SPARK-23811][Core] FetchFailed comes before Succ...
Github user markhamstra commented on a diff in the pull request: https://github.com/apache/spark/pull/20930#discussion_r184462542 --- Diff: core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala --- @@ -2399,6 +2399,84 @@ class DAGSchedulerSuite extends SparkFunSuite with LocalSparkContext with TimeLi } }

  /**
   * This tests the case where the original task succeeds after its speculative
   * attempt got a FetchFailed earlier.
   */
  test("SPARK-23811: ShuffleMapStage failed by FetchFailed should ignore following " +
      "successful tasks") {
    // Create 3 RDDs with shuffle dependencies on each other: rddA <--- rddB <--- rddC
    val rddA = new MyRDD(sc, 2, Nil)
    val shuffleDepA = new ShuffleDependency(rddA, new HashPartitioner(2))
    val shuffleIdA = shuffleDepA.shuffleId

    val rddB = new MyRDD(sc, 2, List(shuffleDepA), tracker = mapOutputTracker)
    val shuffleDepB = new ShuffleDependency(rddB, new HashPartitioner(2))

    val rddC = new MyRDD(sc, 2, List(shuffleDepB), tracker = mapOutputTracker)

    submit(rddC, Array(0, 1))

    // Complete both tasks in rddA.
    assert(taskSets(0).stageId === 0 && taskSets(0).stageAttemptId === 0)
    complete(taskSets(0), Seq(
      (Success, makeMapStatus("hostA", 2)),
      (Success, makeMapStatus("hostB", 2))))

    // The first task succeeds.
    runEvent(makeCompletionEvent(
      taskSets(1).tasks(0), Success, makeMapStatus("hostB", 2)))

    // The second task's speculative attempt fails first, but the task itself is
    // still running. This may be caused by ExecutorLost.
    runEvent(makeCompletionEvent(
      taskSets(1).tasks(1),
      FetchFailed(makeBlockManagerId("hostA"), shuffleIdA, 0, 0, "ignored"),
      null))
    // Check the currently missing partition.
    assert(mapOutputTracker.findMissingPartitions(shuffleDepB.shuffleId).get.size === 1)
    // The second task itself then succeeds.
    runEvent(makeCompletionEvent(
      taskSets(1).tasks(1), Success, makeMapStatus("hostB", 2)))
    // The number of missing partitions should not change; otherwise the child
    // stage can never succeed.
    assert(mapOutputTracker.findMissingPartitions(shuffleDepB.shuffleId).get.size === 1)
  }

  test("SPARK-23811: check ResultStage failed by FetchFailed can ignore following " +
      "successful tasks") {
    val rddA = new MyRDD(sc, 2, Nil)
    val shuffleDepA = new ShuffleDependency(rddA, new HashPartitioner(2))
    val shuffleIdA = shuffleDepA.shuffleId
    val rddB = new MyRDD(sc, 2, List(shuffleDepA), tracker = mapOutputTracker)
    submit(rddB, Array(0, 1))

    // Complete both tasks in rddA.
    assert(taskSets(0).stageId === 0 && taskSets(0).stageAttemptId === 0)
    complete(taskSets(0), Seq(
      (Success, makeMapStatus("hostA", 2)),
      (Success, makeMapStatus("hostB", 2))))

    // The first task of rddB succeeds.
    assert(taskSets(1).tasks(0).isInstanceOf[ResultTask[_, _]])
    runEvent(makeCompletionEvent(
      taskSets(1).tasks(0), Success, makeMapStatus("hostB", 2)))

    // The second task's speculative attempt fails first, but the task itself is
    // still running. This may be caused by ExecutorLost.
    runEvent(makeCompletionEvent(
      taskSets(1).tasks(1),
      FetchFailed(makeBlockManagerId("hostA"), shuffleIdA, 0, 0, "ignored"),
      null))
    // Make sure failedStages is not empty now.
    assert(scheduler.failedStages.nonEmpty)
    // The second task itself then succeeds.
    assert(taskSets(1).tasks(1).isInstanceOf[ResultTask[_, _]])
    runEvent(makeCompletionEvent(
      taskSets(1).tasks(1), Success, makeMapStatus("hostB", 2)))
    assertDataStructuresEmpty()

--- End diff -- Right. It is a check that we are cleaning up the contents of the DAGScheduler's data structures so that they do not grow without bound over time.
[GitHub] spark issue #21096: [SPARK-24011][CORE] cache rdd's immediate parent Shuffle...
Github user markhamstra commented on the issue: https://github.com/apache/spark/pull/21096 As is, this PR isn't acceptable on multiple levels. Even if I were convinced (which I am not presently) that the sequence of `getShuffleDependencies` calls covered in this PR is the only one possible and therefore can be cached/memoized as this PR does, a complete lack of test coverage for the new code is not something we want in something as critical as the DAGScheduler. And I can tell you right now that there is at least one missing test that this PR will fail. You are adding a new data structure to the DAGScheduler, so at a bare minimum you also need to add that HashMap to `assertDataStructuresEmpty` in the DAGSchedulerSuite. Never deleting unneeded elements from a DAGScheduler data structure is an unacceptable resource leak.
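To make the leak check concrete, here is a hedged sketch of the kind of assertion the comment is asking for. `assertDataStructuresEmpty` and the first four maps are real names in DAGSchedulerSuite/DAGScheduler; `shuffleDepsCache` is a hypothetical name standing in for whatever HashMap the PR adds:

```scala
// Illustrative sketch only. DAGSchedulerSuite's assertDataStructuresEmpty
// verifies that the scheduler's bookkeeping maps are drained once jobs finish.
// Any new map added to DAGScheduler needs a matching assertion here, or leaked
// entries go undetected. `shuffleDepsCache` is a hypothetical name for the
// cache this PR introduces.
private def assertDataStructuresEmpty(): Unit = {
  assert(scheduler.activeJobs.isEmpty)
  assert(scheduler.jobIdToActiveJob.isEmpty)
  assert(scheduler.jobIdToStageIds.isEmpty)
  assert(scheduler.stageIdToStage.isEmpty)
  assert(scheduler.shuffleDepsCache.isEmpty) // the assertion this PR is missing
}
```

Without the added assertion, a map that is populated per-job but never cleaned up would pass every existing test while growing without bound in a long-lived driver.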
[GitHub] spark issue #21071: [SPARK-21962][CORE] Distributed Tracing in Spark
Github user markhamstra commented on the issue: https://github.com/apache/spark/pull/21071 @rxin +1 for each of your sentences.
[GitHub] spark pull request #20881: Add a note about jobs running in FIFO order in th...
Github user markhamstra commented on a diff in the pull request: https://github.com/apache/spark/pull/20881#discussion_r178928945 --- Diff: docs/job-scheduling.md --- @@ -215,6 +215,9 @@ pool), but inside each pool, jobs run in FIFO order. For example, if you create means that each user will get an equal share of the cluster, and that each user's queries will run in order instead of later queries taking resources from that user's earlier ones. +If jobs are not explicitly set to use a given pool, they end up in the default pool. This means that even if +`spark.scheduler.mode` is set to `FAIR` those jobs will be run in `FIFO` order (within the default pool). + --- End diff -- You seem to be missing a few somethings: 1) You can define your own default pool that does FAIR scheduling within that pool, so blanket statements about "the" default pool are dangerous; 2) `spark.scheduler.mode` controls the setup of the rootPool, not the scheduling within any pool; 3) If you don't define your own pool with a name corresponding to the DEFAULT_POOL_NAME (i.e. "default"), then you are going to get a default construction of "default", which does use FIFO scheduling within that pool. So, item 2) effectively means that `spark.scheduler.mode` controls whether fair scheduling is possible at all, and it also defines the kind of scheduling that is used among the schedulable entities contained in the root pool -- i.e. among the scheduling pools nested within rootPool. One of those nested pools will be DEFAULT_POOL_NAME/"default", which will use FIFO scheduling for schedulable entities within that pool if you haven't defined it to use fair scheduling. If you just want one scheduling pool that does fair scheduling among its schedulable entities, then you need to set `spark.scheduler.mode` to "FAIR" in your SparkConf and _also_ define in the pool configuration file a "default" pool to use schedulingMode FAIR.
You could alternatively define such a fair-scheduling-inside pool named something other than "default" and then make sure that all of your jobs get assigned to that pool.
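The recipe in the comment above can be sketched in code. The pool name "analytics" and the file path are illustrative; the configuration keys, the `setLocalProperty` call, and the pool-file mechanism are Spark's documented API:

```scala
import org.apache.spark.{SparkConf, SparkContext}

// spark.scheduler.mode=FAIR sets up fair scheduling *among* the pools nested
// under rootPool; scheduling *within* each pool is governed by that pool's own
// schedulingMode, declared in the allocation file.
val conf = new SparkConf()
  .setAppName("fair-pools-sketch")
  .set("spark.scheduler.mode", "FAIR")
  // Points at a pool file that defines pools (and their schedulingMode).
  // If no pool named "default" is defined there, Spark constructs one that
  // uses FIFO scheduling internally.
  .set("spark.scheduler.allocation.file", "conf/fairscheduler.xml")

val sc = new SparkContext(conf)

// Jobs submitted from this thread go to the named pool instead of "default".
// Alternatively, define "default" itself with <schedulingMode>FAIR</schedulingMode>
// in the allocation file so unassigned jobs also get fair scheduling.
sc.setLocalProperty("spark.scheduler.pool", "analytics")
```

The design consequence the reviewer describes falls out of this two-level structure: `spark.scheduler.mode` alone never changes intra-pool scheduling, so FAIR-among-pools with FIFO-inside-"default" is the default combination unless the pool file says otherwise.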
[GitHub] spark pull request #20881: Add a note about jobs running in FIFO order in th...
Github user markhamstra commented on a diff in the pull request: https://github.com/apache/spark/pull/20881#discussion_r178886364 --- Diff: docs/job-scheduling.md --- @@ -215,6 +215,9 @@ pool), but inside each pool, jobs run in FIFO order. For example, if you create means that each user will get an equal share of the cluster, and that each user's queries will run in order instead of later queries taking resources from that user's earlier ones. +If jobs are not explicitly set to use a given pool, they end up in the default pool. This means that even if +`spark.scheduler.mode` is set to `FAIR` those jobs will be run in `FIFO` order (within the default pool). + --- End diff -- This is not actually correct. There is no reason why you can't define a default pool that uses FAIR scheduling. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #20770: [SPARK-23626][CORE] DAGScheduler blocked due to JobSubmi...
Github user markhamstra commented on the issue: https://github.com/apache/spark/pull/20770 @squito is the master of DAGSchedulerSuite, and can provide you the best advice on changing or adding to the existing DAGSchedulerSuite. I'll be back from skiing next week and try to find some time to look at this. Hopefully @kayousterhout can find some time too.
[GitHub] spark pull request #20016: SPARK-22830 Scala Coding style has been improved ...
Github user markhamstra commented on a diff in the pull request: https://github.com/apache/spark/pull/20016#discussion_r157790330 --- Diff: examples/src/main/scala/org/apache/spark/examples/BroadcastTest.scala --- @@ -42,7 +42,7 @@ object BroadcastTest { val arr1 = (0 until num).toArray for (i <- 0 until 3) { - println("Iteration " + i) + println(s"Iteration ${i}") --- End diff -- Beyond the unnecessary { } that @srowen has already mentioned, this isn't really a style improvement. `"a string " + anotherString` is arguably at least as good stylistically as using string interpolation for such simple concatenations of a string reference to the end of a string literal. It's only when there are multiple concatenations and/or multiple string references that interpolation is clearly the better way.
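The style trade-off being described can be shown side by side in a small sketch:

```scala
val i = 3
// For a single value appended to a literal, both forms read equally well:
val a = "Iteration " + i
val b = s"Iteration $i"   // braces around a bare identifier are unnecessary
// Interpolation only clearly wins once several values/expressions are mixed in:
val c = s"Iteration $i of ${a.length} chars"
```

Both `a` and `b` produce the string "Iteration 3", which is the reviewer's point: the rewrite changes form, not substance.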
[GitHub] spark pull request #19468: [SPARK-18278] [Scheduler] Spark on Kubernetes - B...
Github user markhamstra commented on a diff in the pull request: https://github.com/apache/spark/pull/19468#discussion_r151259662 --- Diff: resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/KubernetesClusterSchedulerBackend.scala --- @@ -0,0 +1,427 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.spark.scheduler.cluster.k8s + +import java.io.Closeable +import java.net.InetAddress +import java.util.concurrent.{ConcurrentHashMap, ExecutorService, ScheduledExecutorService, TimeUnit} +import java.util.concurrent.atomic.{AtomicInteger, AtomicLong, AtomicReference} +import javax.annotation.concurrent.GuardedBy + +import io.fabric8.kubernetes.api.model._ +import io.fabric8.kubernetes.client.{KubernetesClient, KubernetesClientException, Watcher} +import io.fabric8.kubernetes.client.Watcher.Action +import scala.collection.JavaConverters._ +import scala.collection.mutable +import scala.concurrent.{ExecutionContext, Future} + +import org.apache.spark.SparkException +import org.apache.spark.deploy.k8s.config._ +import org.apache.spark.deploy.k8s.constants._ +import org.apache.spark.rpc.{RpcAddress, RpcEndpointAddress, RpcEnv} +import org.apache.spark.scheduler.{ExecutorExited, SlaveLost, TaskSchedulerImpl} +import org.apache.spark.scheduler.cluster.{CoarseGrainedSchedulerBackend, SchedulerBackendUtils} +import org.apache.spark.util.Utils + +private[spark] class KubernetesClusterSchedulerBackend( +scheduler: TaskSchedulerImpl, +rpcEnv: RpcEnv, +executorPodFactory: ExecutorPodFactory, +kubernetesClient: KubernetesClient, +allocatorExecutor: ScheduledExecutorService, +requestExecutorsService: ExecutorService) + extends CoarseGrainedSchedulerBackend(scheduler, rpcEnv) { + + import KubernetesClusterSchedulerBackend._ + + private val EXECUTOR_ID_COUNTER = new AtomicLong(0L) + private val RUNNING_EXECUTOR_PODS_LOCK = new Object + @GuardedBy("RUNNING_EXECUTOR_PODS_LOCK") + private val runningExecutorsToPods = new mutable.HashMap[String, Pod] + private val executorPodsByIPs = new ConcurrentHashMap[String, Pod]() + private val podsWithKnownExitReasons = new ConcurrentHashMap[String, ExecutorExited]() + private val disconnectedPodsByExecutorIdPendingRemoval = new ConcurrentHashMap[String, Pod]() + + private val kubernetesNamespace = 
conf.get(KUBERNETES_NAMESPACE) + + private val kubernetesDriverPodName = conf +.get(KUBERNETES_DRIVER_POD_NAME) +.getOrElse(throw new SparkException("Must specify the driver pod name")) + private implicit val requestExecutorContext = ExecutionContext.fromExecutorService( +requestExecutorsService) + + private val driverPod = kubernetesClient.pods() +.inNamespace(kubernetesNamespace) +.withName(kubernetesDriverPodName) +.get() + + override val minRegisteredRatio = +if (conf.getOption("spark.scheduler.minRegisteredResourcesRatio").isEmpty) { + 0.8 +} else { + super.minRegisteredRatio +} + + private val executorWatchResource = new AtomicReference[Closeable] + protected val totalExpectedExecutors = new AtomicInteger(0) + + private val driverUrl = RpcEndpointAddress( +conf.get("spark.driver.host"), +conf.getInt("spark.driver.port", DEFAULT_DRIVER_PORT), +CoarseGrainedSchedulerBackend.ENDPOINT_NAME).toString + + private val initialExecutors = SchedulerBackendUtils.getInitialTargetExecutorNumber(conf) + + private val podAllocationInterval = conf.get(KUBERNETES_ALLOCATION_BATCH_DELAY) + + private val podAllocationSize = conf.get(KUBERNETES_ALLOCATION_BATCH_SIZE) + + private val allocatorRunnable = new Runnable { + +// Maintains a map of executor id to count of checks performed to learn the loss reason +//
[GitHub] spark pull request #19468: [SPARK-18278] [Scheduler] Spark on Kubernetes - B...
Github user markhamstra commented on a diff in the pull request: https://github.com/apache/spark/pull/19468#discussion_r146072523 --- Diff: resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/KubernetesClusterSchedulerBackend.scala --- @@ -0,0 +1,456 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.spark.scheduler.cluster.k8s + +import java.io.Closeable +import java.net.InetAddress +import java.util.concurrent.{ConcurrentHashMap, ExecutorService, ScheduledExecutorService, TimeUnit} +import java.util.concurrent.atomic.{AtomicInteger, AtomicLong, AtomicReference} + +import scala.collection.{concurrent, mutable} +import scala.collection.JavaConverters._ +import scala.concurrent.{ExecutionContext, Future} + +import io.fabric8.kubernetes.api.model._ +import io.fabric8.kubernetes.client.{KubernetesClient, KubernetesClientException, Watcher} +import io.fabric8.kubernetes.client.Watcher.Action + +import org.apache.spark.SparkException +import org.apache.spark.deploy.k8s.config._ +import org.apache.spark.deploy.k8s.constants._ +import org.apache.spark.rpc.{RpcAddress, RpcEndpointAddress, RpcEnv} +import org.apache.spark.scheduler.{ExecutorExited, SlaveLost, TaskSchedulerImpl} +import org.apache.spark.scheduler.cluster.CoarseGrainedSchedulerBackend +import org.apache.spark.util.Utils + +private[spark] class KubernetesClusterSchedulerBackend( +scheduler: TaskSchedulerImpl, +rpcEnv: RpcEnv, +executorPodFactory: ExecutorPodFactory, +kubernetesClient: KubernetesClient, +allocatorExecutor: ScheduledExecutorService, +requestExecutorsService: ExecutorService) + extends CoarseGrainedSchedulerBackend(scheduler, rpcEnv) { + + import KubernetesClusterSchedulerBackend._ + + private val EXECUTOR_ID_COUNTER = new AtomicLong(0L) + private val RUNNING_EXECUTOR_PODS_LOCK = new Object + // Indexed by executor IDs and guarded by RUNNING_EXECUTOR_PODS_LOCK. + private val runningExecutorsToPods = new mutable.HashMap[String, Pod] + // Indexed by executor pod names and guarded by RUNNING_EXECUTOR_PODS_LOCK. 
+ private val runningPodsToExecutors = new mutable.HashMap[String, String] + private val executorPodsByIPs = new ConcurrentHashMap[String, Pod]() + private val podsWithKnownExitReasons = new ConcurrentHashMap[String, ExecutorExited]() + private val disconnectedPodsByExecutorIdPendingRemoval = new ConcurrentHashMap[String, Pod]() + + private val kubernetesNamespace = conf.get(KUBERNETES_NAMESPACE) + + private val kubernetesDriverPodName = conf +.get(KUBERNETES_DRIVER_POD_NAME) +.getOrElse( + throw new SparkException("Must specify the driver pod name")) + private implicit val requestExecutorContext = ExecutionContext.fromExecutorService( + requestExecutorsService) + + private val driverPod = try { +kubernetesClient.pods() + .inNamespace(kubernetesNamespace) + .withName(kubernetesDriverPodName) + .get() + } catch { +case throwable: Throwable => + logError(s"Executor cannot find driver pod.", throwable) + throw new SparkException(s"Executor cannot find driver pod", throwable) + } + + override val minRegisteredRatio = +if (conf.getOption("spark.scheduler.minRegisteredResourcesRatio").isEmpty) { + 0.8 +} else { + super.minRegisteredRatio +} + + private val executorWatchResource = new AtomicReference[Closeable] + protected var totalExpectedExecutors = new AtomicInteger(0) + + private val driverUrl = RpcEndpointAddress( + conf.get("spark.driver.host"), + conf.getInt("spark.driver.port", DEFAULT_DRIVER_PORT), + CoarseGrainedSchedulerBackend.ENDPOINT_NAME).toString + + private val initialExecutors = getIniti
[GitHub] spark issue #19468: [SPARK-18278] [Scheduler] Spark on Kubernetes - Basic Sc...
Github user markhamstra commented on the issue: https://github.com/apache/spark/pull/19468 > iron out the kinks A large chunk of the difficulty in identifying and ironing out kinks in such a project is the difficulty of writing adequate tests of the scheduler code. I'd expect test coverage to take roughly the same amount of effort as all of the rest of the scheduler plug-in effort.
[GitHub] spark pull request #19287: [SPARK-22074][Core] Task killed by other attempt ...
Github user markhamstra commented on a diff in the pull request: https://github.com/apache/spark/pull/19287#discussion_r140788231 --- Diff: core/src/main/scala/org/apache/spark/scheduler/TaskInfo.scala --- @@ -93,6 +104,8 @@ class TaskInfo( def running: Boolean = !finished + def needResubmit: Boolean = !killedAttempt --- End diff -- Since `killedAttempt`/`killedByAttempt` is never used except in this negated sense, it probably makes more sense to get rid of it and this `needResubmit` method entirely, and just have a `needResubmit` var with the opposite sense of `killedAttempt` -- but I'm also not all that happy with the `needResubmit` name. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #19194: [SPARK-20589] Allow limiting task concurrency per...
Github user markhamstra commented on a diff in the pull request: https://github.com/apache/spark/pull/19194#discussion_r140340018 --- Diff: core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala --- @@ -619,6 +625,47 @@ private[spark] class ExecutorAllocationManager( // place the executors. private val stageIdToExecutorPlacementHints = new mutable.HashMap[Int, (Int, Map[String, Int])] +override def onJobStart(jobStart: SparkListenerJobStart): Unit = { + jobStart.stageInfos.foreach(stageInfo => stageIdToJobId(stageInfo.stageId) = jobStart.jobId) + + var jobGroupId = if (jobStart.properties != null) { +jobStart.properties.getProperty(SparkContext.SPARK_JOB_GROUP_ID) + } else { +null + } + + val maxConTasks = if (jobGroupId != null && +conf.contains(s"spark.job.$jobGroupId.maxConcurrentTasks")) { +conf.get(s"spark.job.$jobGroupId.maxConcurrentTasks").toInt + } else { +Int.MaxValue + } + + if (maxConTasks <= 0) { +throw new IllegalArgumentException( + "Maximum Concurrent Tasks should be set greater than 0 for the job to progress.") + } + + if (jobGroupId == null || !conf.contains(s"spark.job.$jobGroupId.maxConcurrentTasks")) { +jobGroupId = DEFAULT_JOB_GROUP + } + + jobIdToJobGroup(jobStart.jobId) = jobGroupId + if (!jobGroupToMaxConTasks.contains(jobGroupId)) { --- End diff -- Do we even need mutable state associated with a job group? Some things would be a lot simpler if maxConTasks could only be set when the job group is created; and if you need a different number of maxConTasks, then you have to use a different job group. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #19287: [SPARK-22074][Core] Task killed by other attempt ...
Github user markhamstra commented on a diff in the pull request: https://github.com/apache/spark/pull/19287#discussion_r140047573 --- Diff: core/src/main/scala/org/apache/spark/scheduler/TaskInfo.scala --- @@ -66,6 +66,12 @@ class TaskInfo( */ var finishTime: Long = 0 + /** + * Set this tag when this task killed by other attempt. This kind of task should not resubmit + * while executor lost. + */ --- End diff -- nit: calling it a "tag" doesn't aid understanding, but rather leaves me wondering why it was called a "tag" instead of a "var" or something more descriptive. more consequential: Please explain what defines "this kind" of task. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #19115: [SPARK-21882][CORE] OutputMetrics doesn't count written ...
Github user markhamstra commented on the issue: https://github.com/apache/spark/pull/19115 And now I see that the title was changed to something more useful. Pardon any offense; the end result of the title changes looks good.
[GitHub] spark issue #19115: [SPARK-21882][CORE] OutputMetrics doesn't count written ...
Github user markhamstra commented on the issue: https://github.com/apache/spark/pull/19115 I realize this PR is now closed, but to follow up on Saisai's request concerning PR titles, I'll also note that the title of this PR isn't very useful even after the JIRA id and component tag are added. Titles like "fixed foo" or "updated bar" don't really tell reviewers or those looking at the commit logs in the future what the PR is about. The JIRA should tell us _why_ a change or addition is needed, the description in the PR should tell us _what_ was changed or added, and the PR title should give us enough of an idea of what is going on that we don't necessarily have to open the PR or look at the code changes just to see whether it is something that we are even at all interested in.
[GitHub] spark issue #18805: [SPARK-19112][CORE] Support for ZStandard codec
Github user markhamstra commented on the issue: https://github.com/apache/spark/pull/18805 In addition to LICENSE, there is also COPYING in the v1.3.1 release: https://github.com/facebook/zstd/blob/v1.3.1/LICENSE https://github.com/facebook/zstd/blob/v1.3.1/COPYING I'm not going to pretend to have a fully informed opinion on this issue.
[GitHub] spark pull request #18950: [SPARK-20589][Core][Scheduler] Allow limiting tas...
Github user markhamstra commented on a diff in the pull request: https://github.com/apache/spark/pull/18950#discussion_r133344532 --- Diff: core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala --- @@ -602,6 +604,21 @@ private[spark] class ExecutorAllocationManager( // place the executors. private val stageIdToExecutorPlacementHints = new mutable.HashMap[Int, (Int, Map[String, Int])] +override def onJobStart(jobStart: SparkListenerJobStart): Unit = { + val jobGroupId = if (jobStart.properties != null) { +jobStart.properties.getProperty(SparkContext.SPARK_JOB_GROUP_ID) + } else { +"" + } + val maxConcurrentTasks = conf.getInt(s"spark.job.$jobGroupId.maxConcurrentTasks", +Int.MaxValue) + + logInfo(s"Setting maximum concurrent tasks for group: ${jobGroupId} to $maxConcurrentTasks") + allocationManager.synchronized { +allocationManager.maxConcurrentTasks = maxConcurrentTasks --- End diff -- Ummm... what? It is entirely possible to set a job group, spawn a bunch of threads that will eventually create jobs in that job group, then set another job group and spawn more threads that will be creating jobs in this new group simultaneously with jobs being created in the prior group. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
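The scenario the reviewer describes relies on job-group assignment being a per-thread property of SparkContext, not global state. A hedged sketch (assumes a running SparkContext; the group names are illustrative):

```scala
import org.apache.spark.SparkContext

// Because setJobGroup writes a thread-local property, jobs from several groups
// can be in flight at the same time -- which is why a single manager-wide
// `maxConcurrentTasks` var, overwritten on each onJobStart, cannot be correct.
def launchInGroup(sc: SparkContext, group: String): Thread = {
  val t = new Thread(() => {
    // Each worker thread sets its own group; the groups do not clobber
    // one another because the property is thread-local.
    sc.setJobGroup(group, s"jobs for group $group")
    sc.parallelize(1 to 100).count()
  })
  t.start()
  t
}

// launchInGroup(sc, "groupA")
// launchInGroup(sc, "groupB")  // both groups now have jobs running concurrently
```

Under this model, any per-group concurrency limit has to be tracked per group (e.g. keyed by group id), not held in a single mutable field that the most recent job start happens to win.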
[GitHub] spark issue #18807: [SPARK-21601][BUILD] Modify the pom.xml file, increase t...
Github user markhamstra commented on the issue: https://github.com/apache/spark/pull/18807 Yes, it doesn't really hurt anything except to be confusing cruft that has a tendency to accumulate in POMs. If we're going to put those lines back, I suggest that they be accompanied by a comment that they are known to be a useful assist for certain versions of IntelliJ and are not needed for the correctness of the build itself.
[GitHub] spark issue #18807: [SPARK-21601][BUILD] Modify the pom.xml file, increase t...
Github user markhamstra commented on the issue: https://github.com/apache/spark/pull/18807 Maven is the build of reference, true; but maven itself doesn't need the JDK version to be specified both in the scala plugin configuration and in the compiler plugin configuration. While I understand that IntelliJ is used by many Spark developers, it isn't some sort of officially preferred IDE of reference. That's why I have some qualms about embedding workarounds into our POM just to satisfy the idiosyncrasies of a particular IDE.
[GitHub] spark issue #18807: [SPARK-21601][BUILD] Modify the pom.xml file, increase t...
Github user markhamstra commented on the issue: https://github.com/apache/spark/pull/18807 Hmmm... that's arguably broken behavior on the part of IntelliJ or something to be worked around in IntelliJ configuration, not by hacking our POM. Without the POM hack, though, it is definitely something that deserves explanation in the Building Spark docs.
[GitHub] spark issue #18807: [SPARK-21601][BUILD] Modify the pom.xml file, increase t...
Github user markhamstra commented on the issue: https://github.com/apache/spark/pull/18807 These are maven-compiler-plugin configurations. We don't use maven-compiler-plugin to compile Java code: https://github.com/apache/spark/commit/74cda94c5e496e29f42f1044aab90cab7dbe9d38
[GitHub] spark pull request #18093: [WIP][SPARK-20774][SQL] Cancel all jobs when Quer...
Github user markhamstra commented on a diff in the pull request: https://github.com/apache/spark/pull/18093#discussion_r129382985

--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/QueryExecution.scala ---
```
@@ -89,8 +91,22 @@ class QueryExecution(val sparkSession: SparkSession, val logical: LogicalPlan) {

   lazy val executedPlan: SparkPlan = prepareForExecution(sparkPlan)

   /** Internal version of the RDD. Avoids copies and has no schema */
-  lazy val toRdd: RDD[InternalRow] = executedPlan.execute()
+  lazy val toRdd: RDD[InternalRow] = executePlan()

+  private def executePlan(): RDD[InternalRow] = {
+    sparkSession.sparkContext.setJobGroup(runId.toString, getDescriptionString,
```
--- End diff --

Wait... aren't you clobbering any JobGroup that is already set?
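To make the clobbering concern concrete: `SparkContext.setJobGroup` overwrites the `spark.jobGroup.id` and `spark.job.description` local properties, so any group the caller had already set is silently lost. A save-and-restore sketch of one possible fix -- this is a hypothetical illustration, not the PR's code; `runId` and `getDescriptionString` are taken from the diff above:

```scala
// Hypothetical sketch: preserve any caller-set job group instead of clobbering it.
// "spark.jobGroup.id" and "spark.job.description" are the local properties that
// SparkContext.setJobGroup writes under the hood.
private def executePlan(): RDD[InternalRow] = {
  val sc = sparkSession.sparkContext
  val previousGroup = sc.getLocalProperty("spark.jobGroup.id")
  val previousDescription = sc.getLocalProperty("spark.job.description")
  sc.setJobGroup(runId.toString, getDescriptionString)
  try {
    executedPlan.execute()
  } finally {
    // Restore whatever the caller had set, rather than leaving our group behind.
    sc.setLocalProperty("spark.jobGroup.id", previousGroup)
    sc.setLocalProperty("spark.job.description", previousDescription)
  }
}
```

The restore step matters because local properties are inherited by jobs submitted later on the same thread.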
[GitHub] spark issue #17955: [SPARK-20715] Store MapStatuses only in MapOutputTracker...
Github user markhamstra commented on the issue: https://github.com/apache/spark/pull/17955 @JoshRosen Yes, I agree that it is orthogonal -- at least for now. I'm mostly just offering a heads up that if we get around to addressing `interruptThread`, then there may also need to be some changes related to mapOutput tracking.
[GitHub] spark issue #17955: [SPARK-20715] Store MapStatuses only in MapOutputTracker...
Github user markhamstra commented on the issue: https://github.com/apache/spark/pull/17955 I've looked at only the DAGScheduler changes so far. They LGTM.
[GitHub] spark issue #17955: [SPARK-20715] Store MapStatuses only in MapOutputTracker...
Github user markhamstra commented on the issue: https://github.com/apache/spark/pull/17955 @JoshRosen The hard coding of interruptThread = true within TaskSetManager's handleSuccessfulTask to effect the killing of duplicate, speculative attempts of a task is potentially an issue -- not a new issue with this PR, but one that hasn't been fully analyzed and addressed AFAIK. https://issues.apache.org/jira/browse/SPARK-17064
[GitHub] spark issue #16165: [SPARK-8617] [WEBUI] HistoryServer: Include in-progress ...
Github user markhamstra commented on the issue: https://github.com/apache/spark/pull/16165 @merlintang Marcelo's point remains the same for 2.1.1. We don't typically backport changes to maintenance branches unless they are fixes for regression errors or severe bugs.
[GitHub] spark pull request #17522: [SPARK-18278] [Scheduler] Documentation to point ...
Github user markhamstra commented on a diff in the pull request: https://github.com/apache/spark/pull/17522#discussion_r109523328

--- Diff: docs/cluster-overview.md ---
```
@@ -52,7 +52,11 @@ The system currently supports three cluster managers:
 * [Apache Mesos](running-on-mesos.html) -- a general cluster manager that can also run Hadoop MapReduce and service applications.
 * [Hadoop YARN](running-on-yarn.html) -- the resource manager in Hadoop 2.
-
+* [Kubernetes (experimental)](https://github.com/apache-spark-on-k8s/spark) -- In addition to the above,
+there is experimental support for Kubernetes. Kubernetes is an open-source platform
+for providing container-centric infrastructure. Kubernetes support is being actively
+developed in an [apache-spark-on-k8s](https://github.com/apache-spark-on-k8s/) Github organization
+and will eventually merge into the Apache Spark project. For documentation, refer to that project's README.
```
--- End diff --

I'd change "will" to "may" or "is intended to" or something similar since there is not yet a formal, hard commitment to definitely merge this work into Apache Spark.
[GitHub] spark pull request #17485: [SPARK-20163] Kill all running tasks in a stage i...
Github user markhamstra commented on a diff in the pull request: https://github.com/apache/spark/pull/17485#discussion_r109174578

--- Diff: core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala ---
```
@@ -768,6 +767,19 @@ private[spark] class TaskSetManager(
       s" executor ${info.executorId}): ${reason.toErrorString}"
     val failureException: Option[Throwable] = reason match {
       case fetchFailed: FetchFailed =>
+        if (!isZombie) {
+          for (i <- 0 until numTasks if i != index) {
+            // Only for the first occurance of the fetch failure, kill all running
+            // tasks in the task set
+            for (attemptInfo <- taskAttempts(i) if attemptInfo.running) {
+              sched.backend.killTask(
+                attemptInfo.taskId,
+                attemptInfo.executorId,
+                interruptThread = true,
```
--- End diff --

We can do it then, but there is still the question of whether we should do it. That discussion belongs in SPARK-20178.
[GitHub] spark pull request #17485: [SPARK-20163] Kill all running tasks in a stage i...
Github user markhamstra commented on a diff in the pull request: https://github.com/apache/spark/pull/17485#discussion_r109038854

--- Diff: core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala ---
```
@@ -768,6 +767,19 @@ private[spark] class TaskSetManager(
       s" executor ${info.executorId}): ${reason.toErrorString}"
     val failureException: Option[Throwable] = reason match {
       case fetchFailed: FetchFailed =>
+        if (!isZombie) {
+          for (i <- 0 until numTasks if i != index) {
+            // Only for the first occurance of the fetch failure, kill all running
+            // tasks in the task set
+            for (attemptInfo <- taskAttempts(i) if attemptInfo.running) {
+              sched.backend.killTask(
+                attemptInfo.taskId,
+                attemptInfo.executorId,
+                interruptThread = true,
```
--- End diff --

That's not valid. We don't know that this can be done safely, which is why spark.job.interruptOnCancel defaults to false. SPARK-17064
[GitHub] spark issue #17297: [SPARK-14649][CORE] DagScheduler should not run duplicat...
Github user markhamstra commented on the issue: https://github.com/apache/spark/pull/17297 Agreed. Let's establish what we want to do before trying to discuss the details of how we are going to do it.

On Tue, Mar 28, 2017 at 8:17 AM, Imran Rashid <notificati...@github.com> wrote:
> @sitalkedia <https://github.com/sitalkedia> This change is pretty contentious, there are lot of questions about whether or not this is a good change. I don't think discussing this here in github comments on a PR is the best form. I think of PR comments as being more about code details -- clarity, tests, whether the implementation is correct, etc. But here we're discussing whether the behavior is even desirable, as well as trying to discuss this in relation to other changes. I think a better format would be for you to open a jira and submit a design document (maybe a shared google doc at first), where we can focus more on the desired behavior and consider all the changes, even if the PRs are smaller to make them easier to review.
>
> I'm explicitly *not* making a judgement on whether or not this is a good change. Also I do appreciate you having the code changes ready, as a POC, as that can help folks consider the complexity of the change. But it seems clear to me that first we need to come to a decision about the end goal.
>
> Also, assuming we do decide this is desirable behavior, there is also a question about how we can get changes like this in without risking breaking things -- I have started a thread on dev@ related to that topic in general, but we should figure that for these changes in particular as well.
>
> @kayousterhout <https://github.com/kayousterhout> @tgravescs <https://github.com/tgravescs> @markhamstra <https://github.com/markhamstra> makes sense?
[GitHub] spark issue #17447: [SPARK-20117][Scheduler]TaskSetManager checkSpeculatable...
Github user markhamstra commented on the issue: https://github.com/apache/spark/pull/17447 I wouldn't bother with the string interpolation change (there is a good argument to be made that string interpolation doesn't gain you anything in patterns like those in this PR where a single string is being concatenated to the end of another); and the var -> val change in a local variable isn't significant enough to justify a PR. Thanks for contributing, but please close this PR.
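For comparison, the two styles in question look like this (illustrative values only, not lines from the PR under review):

```scala
// A single value concatenated at the end of a string: interpolation buys
// essentially nothing over plain concatenation in this pattern.
val taskId = 7
val byConcat = "Ignoring speculative attempt for task " + taskId
val byInterp = s"Ignoring speculative attempt for task $taskId"

// Both produce the same string; the choice is purely stylistic, which is why
// a PR that only swaps one form for the other is not worth the review churn.
assert(byConcat == byInterp)
```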
[GitHub] spark pull request #17088: [SPARK-19753][CORE] Un-register all shuffle outpu...
Github user markhamstra commented on a diff in the pull request: https://github.com/apache/spark/pull/17088#discussion_r107286200

--- Diff: core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala ---
```
@@ -1365,19 +1375,43 @@ class DAGScheduler(
    */
   private[scheduler] def handleExecutorLost(
       execId: String,
-      filesLost: Boolean,
-      maybeEpoch: Option[Long] = None) {
+      workerLost: Boolean): Unit = {
+    // if the cluster manager explicitly tells us that the entire worker was lost, then
+    // we know to unregister shuffle output. (Note that "worker" specifically refers to the process
+    // from a Standalone cluster, where the shuffle service lives in the Worker.)
+    val filesLost = workerLost || !env.blockManager.externalShuffleServiceEnabled
+    removeExecutorAndUnregisterOutputs(
+      execId = execId,
+      fileLost = filesLost,
```
--- End diff --

The `fileLost` vs. `filesLost` naming difference is a little confusing -- is the distinction even conveying a difference worth paying attention to?
[GitHub] spark pull request #17088: [SPARK-19753][CORE] Un-register all shuffle outpu...
Github user markhamstra commented on a diff in the pull request: https://github.com/apache/spark/pull/17088#discussion_r107284202

--- Diff: core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala ---
```
@@ -1683,11 +1716,12 @@ private[scheduler] class DAGSchedulerEventProcessLoop(dagScheduler: DAGScheduler
       dagScheduler.handleExecutorAdded(execId, host)

     case ExecutorLost(execId, reason) =>
-      val filesLost = reason match {
-        case SlaveLost(_, true) => true
+      val workerLost = reason match {
+        case SlaveLost(_, true) =>
+          true
```
--- End diff --

nit: prefer it without the line break for something this simple
[GitHub] spark pull request #17088: [SPARK-19753][CORE] Un-register all shuffle outpu...
Github user markhamstra commented on a diff in the pull request: https://github.com/apache/spark/pull/17088#discussion_r107284085

--- Diff: core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala ---
```
@@ -1389,8 +1423,7 @@ class DAGScheduler(
             clearCacheLocs()
           }
         } else {
-          logDebug("Additional executor lost message for " + execId +
-            "(epoch " + currentEpoch + ")")
+          logDebug("Additional executor lost message for %s (epoch %d)".format(execId, currentEpoch))
```
--- End diff --

nit: prefer string interpolation over `format`.
[GitHub] spark pull request #17088: [SPARK-19753][CORE] Un-register all shuffle outpu...
Github user markhamstra commented on a diff in the pull request: https://github.com/apache/spark/pull/17088#discussion_r107283229

--- Diff: core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala ---
```
@@ -1331,7 +1328,20 @@ class DAGScheduler(
         // TODO: mark the executor as failed only if there were lots of fetch failures on it
         if (bmAddress != null) {
-          handleExecutorLost(bmAddress.executorId, filesLost = true, Some(task.epoch))
+          val hostToUnregisterOutputs = if (env.blockManager.externalShuffleServiceEnabled) {
+            // We had a fetch failure with the external shuffle service, so we
+            // assume all shuffle data on the node is bad.
+            Some(bmAddress.host)
+          } else {
+            // Deregister shuffle data just for one executor (we don't have any
```
--- End diff --

nit: "Unregister" is used elsewhere (function names, etc.), not "deregister".
[GitHub] spark issue #17357: [SPARK-20025][CORE] Ignore SPARK_LOCAL* env, while deplo...
Github user markhamstra commented on the issue: https://github.com/apache/spark/pull/17357 It's a lot better. Thanks.
[GitHub] spark pull request #17297: [SPARK-14649][CORE] DagScheduler should not run d...
Github user markhamstra commented on a diff in the pull request: https://github.com/apache/spark/pull/17297#discussion_r107044660

--- Diff: core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala ---
```
@@ -929,12 +946,22 @@ class DAGScheduler(
     }
   }

-  /** Called when stage's parents are available and we can now do its task. */
+  /**
+   * Called when stage's parents are available and we can now run its task.
+   * This only submits the partitions which are missing and have not been
+   * submitted to the lower-level scheduler for execution.
+   */
   private def submitMissingTasks(stage: Stage, jobId: Int) {
     logDebug("submitMissingTasks(" + stage + ")")
-    // First figure out the indexes of partition ids to compute.
-    val partitionsToCompute: Seq[Int] = stage.findMissingPartitions()
+    val missingPartitions = stage.findMissingPartitions()
+    val partitionsToCompute =
+      missingPartitions.filter(id => !stage.pendingPartitions.contains(id))
```
--- End diff --

```scala
missingPartitions.filterNot(stage.pendingPartitions)
```
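The suggested shorthand works because a Scala `Set[Int]` extends `Int => Boolean`, so the set itself can serve as the membership predicate. A small self-contained illustration with hypothetical partition ids:

```scala
// Hypothetical values standing in for stage.findMissingPartitions()
// and stage.pendingPartitions from the diff above.
val missingPartitions = Seq(0, 1, 2, 3, 4)
val pendingPartitions = Set(1, 3)

// A Set is a function from element to Boolean, so it can be passed
// directly to filterNot as the predicate.
val partitionsToCompute = missingPartitions.filterNot(pendingPartitions)
assert(partitionsToCompute == Seq(0, 2, 4))
```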
[GitHub] spark pull request #17297: [SPARK-14649][CORE] DagScheduler should not run d...
Github user markhamstra commented on a diff in the pull request: https://github.com/apache/spark/pull/17297#discussion_r107044272

--- Diff: core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala ---
```
@@ -803,6 +810,16 @@ class DAGScheduler(
     stageIdToStage.get(taskSet.stageId).foreach { abortStage(_, reason, exception) }
   }

+  private[scheduler] def handleTasksAborted(
+      stageId: Int,
+      tasks: Seq[Task[_]]): Unit = {
+    for (stage <- stageIdToStage.get(stageId)) {
+      for (task <- tasks) {
+        stage.pendingPartitions -= task.partitionId
+      }
+    }
```
--- End diff --

```scala
for {
  stage <- stageIdToStage.get(stageId)
  task <- tasks
} stage.pendingPartitions -= task.partitionId
```
[GitHub] spark pull request #17297: [SPARK-14649][CORE] DagScheduler should not run d...
Github user markhamstra commented on a diff in the pull request: https://github.com/apache/spark/pull/17297#discussion_r107040190

--- Diff: core/src/main/scala/org/apache/spark/MapOutputTracker.scala ---
```
@@ -418,6 +424,15 @@ private[spark] class MapOutputTrackerMaster(conf: SparkConf,
     cachedSerializedStatuses.contains(shuffleId) || mapStatuses.contains(shuffleId)
   }

+  /** Get the epoch for map output for a shuffle, if it is available */
+  def getEpochForMapOutput(shuffleId: Int, mapId: Int): Option[Long] = {
+    val arrayOpt = mapStatuses.get(shuffleId)
+    if (arrayOpt.isDefined && arrayOpt.get != null && arrayOpt.get(mapId) != null) {
+      return Some(epochForMapStatus.get(shuffleId).get(mapId))
+    }
+    None
+  }
```
--- End diff --

First, `arrayOpt.get != null` isn't necessary since we don't put `null` values into `mapStatuses`. Second, `epochForMapStatus.get(shuffleId).get` is the same as `epochForMapStatus(shuffleId)`. Third, I don't like all the explicit `get`s, `null` checks and the unnecessary non-local `return`. To my mind, this is better:

```scala
def getEpochForMapOutput(shuffleId: Int, mapId: Int): Option[Long] = {
  for {
    mapStatus <- mapStatuses.get(shuffleId).flatMap { mapStatusArray => Option(mapStatusArray(mapId)) }
  } yield epochForMapStatus(shuffleId)(mapId)
}
```
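The `Option(...)` factory is what makes the explicit null check disappear in the suggested rewrite: `Option(x)` yields `None` when `x` is `null` and `Some(x)` otherwise, so a missing map status short-circuits the for-comprehension. A minimal standalone illustration (hypothetical array, not the tracker's real data):

```scala
// An array where one slot is null, like a MapStatus array with a
// missing map output.
val statuses: Array[String] = Array("status-0", null)

// Option(null) collapses to None, so no explicit null test is needed.
assert(Option(statuses(0)) == Some("status-0"))
assert(Option(statuses(1)) == None)
```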
[GitHub] spark pull request #17297: [SPARK-14649][CORE] DagScheduler should not run d...
Github user markhamstra commented on a diff in the pull request: https://github.com/apache/spark/pull/17297#discussion_r107018874

--- Diff: core/src/main/scala/org/apache/spark/MapOutputTracker.scala ---
```
@@ -378,15 +382,17 @@ private[spark] class MapOutputTrackerMaster(conf: SparkConf,
     val array = mapStatuses(shuffleId)
     array.synchronized {
       array(mapId) = status
+      val epochs = epochForMapStatus.get(shuffleId).get
```
--- End diff --

```scala
val epochs = epochForMapStatus(shuffleId)
```
[GitHub] spark pull request #17297: [SPARK-14649][CORE] DagScheduler should not run d...
Github user markhamstra commented on a diff in the pull request: https://github.com/apache/spark/pull/17297#discussion_r107018555

--- Diff: core/src/main/scala/org/apache/spark/MapOutputTracker.scala ---
```
@@ -378,15 +382,17 @@ private[spark] class MapOutputTrackerMaster(conf: SparkConf,
     val array = mapStatuses(shuffleId)
     array.synchronized {
       array(mapId) = status
+      val epochs = epochForMapStatus.get(shuffleId).get
+      epochs(mapId) = epoch
     }
   }

   /** Register multiple map output information for the given shuffle */
   def registerMapOutputs(shuffleId: Int, statuses: Array[MapStatus], changeEpoch: Boolean = false) {
-    mapStatuses.put(shuffleId, statuses.clone())
     if (changeEpoch) {
       incrementEpoch()
     }
+    mapStatuses.put(shuffleId, statuses.clone())
```
--- End diff --

What was the point of moving this?
[GitHub] spark pull request #17297: [SPARK-14649][CORE] DagScheduler should not run d...
Github user markhamstra commented on a diff in the pull request: https://github.com/apache/spark/pull/17297#discussion_r107017201

--- Diff: core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala ---
```
@@ -1265,64 +1280,11 @@ class DAGScheduler(
         val failedStage = stageIdToStage(task.stageId)
         val mapStage = shuffleIdToMapStage(shuffleId)

-        if (failedStage.latestInfo.attemptId != task.stageAttemptId) {
-          logInfo(s"Ignoring fetch failure from $task as it's from $failedStage attempt" +
-            s" ${task.stageAttemptId} and there is a more recent attempt for that stage " +
-            s"(attempt ID ${failedStage.latestInfo.attemptId}) running")
-        } else {
-          // It is likely that we receive multiple FetchFailed for a single stage (because we have
-          // multiple tasks running concurrently on different executors). In that case, it is
-          // possible the fetch failure has already been handled by the scheduler.
-          if (runningStages.contains(failedStage)) {
-            logInfo(s"Marking $failedStage (${failedStage.name}) as failed " +
-              s"due to a fetch failure from $mapStage (${mapStage.name})")
-            markStageAsFinished(failedStage, Some(failureMessage))
-          } else {
-            logDebug(s"Received fetch failure from $task, but its from $failedStage which is no " +
-              s"longer running")
-          }
-
-          val shouldAbortStage =
-            failedStage.failedOnFetchAndShouldAbort(task.stageAttemptId) ||
-            disallowStageRetryForTest
-
-          if (shouldAbortStage) {
-            val abortMessage = if (disallowStageRetryForTest) {
-              "Fetch failure will not retry stage due to testing config"
-            } else {
-              s"""$failedStage (${failedStage.name})
-                 |has failed the maximum allowable number of
-                 |times: ${Stage.MAX_CONSECUTIVE_FETCH_FAILURES}.
-                 |Most recent failure reason: $failureMessage""".stripMargin.replaceAll("\n", " ")
-            }
-            abortStage(failedStage, abortMessage, None)
-          } else { // update failedStages and make sure a ResubmitFailedStages event is enqueued
-            // TODO: Cancel running tasks in the failed stage -- cf. SPARK-17064
-            val noResubmitEnqueued = !failedStages.contains(failedStage)
-            failedStages += failedStage
-            failedStages += mapStage
-            if (noResubmitEnqueued) {
-              // We expect one executor failure to trigger many FetchFailures in rapid succession,
-              // but all of those task failures can typically be handled by a single resubmission of
-              // the failed stage. We avoid flooding the scheduler's event queue with resubmit
-              // messages by checking whether a resubmit is already in the event queue for the
-              // failed stage. If there is already a resubmit enqueued for a different failed
-              // stage, that event would also be sufficient to handle the current failed stage, but
-              // producing a resubmit for each failed stage makes debugging and logging a little
-              // simpler while not producing an overwhelming number of scheduler events.
-              logInfo(
-                s"Resubmitting $mapStage (${mapStage.name}) and " +
-                s"$failedStage (${failedStage.name}) due to fetch failure"
-              )
-              messageScheduler.schedule(
-                new Runnable {
-                  override def run(): Unit = eventProcessLoop.post(ResubmitFailedStages)
-                },
-                DAGScheduler.RESUBMIT_TIMEOUT,
-                TimeUnit.MILLISECONDS
-              )
-            }
-          }
+        val epochForMapOutput = mapOutputTracker.getEpochForMapOutput(shuffleId, mapId)
+        // It is possible that the map output was regenerated by rerun of the stage and the
+        // fetch failure is being reported for stale map output. In that case, we should just
+        // ignore the fetch failure and relaunch the task with latest map output info.
+        if (epochForMapOutput.nonEmpty && epochForMapOutput.get <= task.epoch) {
```
--- End diff --

I'd be inclined to do this without the extra binding and `get`:

```scala
for (epochForMapOutput <- mapOutputTracker.getEpochForMapOutput(shuffleId, mapId)
     if epochForMapOutput <= task.epoch) {
```
[GitHub] spark issue #17357: [SPARK-20025][CORE] Fix spark's driver failover mechanis...
Github user markhamstra commented on the issue: https://github.com/apache/spark/pull/17357 Please change the title of this PR. "Fixed foo" is nearly useless when scanning the commit log in the future since it doesn't tell us anything about either the nature of the problem or the actual code change.
[GitHub] spark issue #17113: [SPARK-13669][Core] Improve the blacklist mechanism to h...
Github user markhamstra commented on the issue: https://github.com/apache/spark/pull/17113 @squito Correct, we really only try to kill running tasks currently on job failure (and if the config setting allows it); but there is the long-standing "TODO: Cancel running tasks in the stage" in `case FetchFailed` of `DAGScheduler#handleTaskCompletion`, which has languished as a TODO because resolving it would require us both to make Spark itself handle task interruption in the fetch failure case and to deal with the same issues preventing us from making task interruption the default even for job cancelation. All I'm really saying is that we shouldn't design in a hard requirement that tasks cannot be interrupted (on job cancelation, fetch failure or some other event), because we'd often really like to be able to kill running tasks -- even though we don't know quite how to do that safely right now.
[GitHub] spark issue #17113: [SPARK-13669][Core] Improve the blacklist mechanism to h...
Github user markhamstra commented on the issue: https://github.com/apache/spark/pull/17113 @tgravescs At the config level, it is spark.job.interruptOnCancel or SparkContext.SPARK_JOB_INTERRUPT_ON_CANCEL, which then gets passed around as a boolean -- e.g. shouldInterruptThread.
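To make the flow concrete, here is a standalone sketch — not Spark's actual implementation. The property key mirrors the real `spark.job.interruptOnCancel`, but the `shouldInterruptThread` helper and the `jobProperties` map are hypothetical stand-ins for how the boolean gets resolved and passed around:

```scala
// Standalone sketch (assumed names; not Spark's code) of resolving the
// interrupt-on-cancel property into the boolean that gets passed around.
object InterruptOnCancelSketch {
  // Mirrors the real property key spark.job.interruptOnCancel.
  val SparkJobInterruptOnCancel = "spark.job.interruptOnCancel"

  // Absent or unparseable values default to false, i.e. do not interrupt.
  def shouldInterruptThread(jobProperties: Map[String, String]): Boolean =
    jobProperties.get(SparkJobInterruptOnCancel)
      .flatMap(v => scala.util.Try(v.trim.toBoolean).toOption)
      .getOrElse(false)
}
```

The important property is the conservative default: unless the flag is explicitly set to true, running tasks are left alone.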
[GitHub] spark issue #17113: [SPARK-13669][Core] Improve the blacklist mechanism to h...
Github user markhamstra commented on the issue: https://github.com/apache/spark/pull/17113 @mridulm Correct, turning task interruption on by default is not so much a matter of Spark itself handling it well as it is a possible (though not fully understood) issue with lower layer libraries not handling interruption well. The original concern with HDFS is likely fixed now, but there are similar concerns with Cassandra and other libraries. Logically, we'd like to interrupt Tasks when associated Jobs or Stages are killed in the DAGScheduler. In practice, nobody knows right now how to do that safely in all circumstances, so the default is to not attempt to interrupt the tasks.
[GitHub] spark issue #17113: [SPARK-13669][Core] Improve the blacklist mechanism to h...
Github user markhamstra commented on the issue: https://github.com/apache/spark/pull/17113 > Spark does immediately abort the stage but it doesn't kill the running tasks Whether running tasks are interrupted on stage abort or not depends on the state of a config boolean -- and ideally we'd like to get to the point where we can confidently set that config so that running tasks are interrupted when the associated job or stage dies.
[GitHub] spark issue #17113: [SPARK-13669][Core] Improve the blacklist mechanism to h...
Github user markhamstra commented on the issue: https://github.com/apache/spark/pull/17113 "Current Spark's blacklist mechanism": please be more precise. The most recent released version of Spark, 2.1.0, does not include a lot of recent changes to blacklisting (mostly https://github.com/apache/spark/commit/93cdb8a7d0f124b4db069fd8242207c82e263c52). Are the problems you are describing fully explored with the master branch of Spark?
[GitHub] spark issue #17088: [SPARK-19753][CORE] All shuffle files on a host should b...
Github user markhamstra commented on the issue: https://github.com/apache/spark/pull/17088 Even if I completely agreed that removing all of the shuffle files on a host was the correct design choice, I'd still be hesitant to merge this right now. That is simply because we have recently merged other changes related to fetch failure handling, and I'd really like to see some more time pass with just those changes in the code before we introduce more fetch failure changes. I don't want to get in the situation of merging this PR then getting reports of fetch failure bugs in master a week later, and not knowing whether to place the blame on this PR or the other recent fetch failure changes. That needn't preclude more discussion of this PR or possibly merging it after we have a little more experience and confidence with the code already in master. /cc @kayousterhout @squito
[GitHub] spark issue #17045: [SPARK-19373][MESOS] Base spark.scheduler.minRegisteredR...
Github user markhamstra commented on the issue: https://github.com/apache/spark/pull/17045 thanks
[GitHub] spark issue #17045: [SPARK-19373][MESOS] fix spark.scheduler.minRegisteredRe...
Github user markhamstra commented on the issue: https://github.com/apache/spark/pull/17045 Please avoid using "fix" as the description in a PR -- it doesn't tell us anything substantive about the nature of the problem or its resolution, so any future reviewing of commit messages will require digging into the actual committed code to get even a minimal idea of what was done.
[GitHub] spark issue #16905: [SPARK-19567][CORE][SCHEDULER] Support some Schedulable ...
Github user markhamstra commented on the issue: https://github.com/apache/spark/pull/16905 Thanks, Shane & Kay!
[GitHub] spark issue #16905: [SPARK-19567][CORE][SCHEDULER] Support some Schedulable ...
Github user markhamstra commented on the issue: https://github.com/apache/spark/pull/16905 If Jenkins is listening to me, that should have allowed you to trigger tests for this PR. test this please
[GitHub] spark issue #16905: [SPARK-19567][CORE][SCHEDULER] Support some Schedulable ...
Github user markhamstra commented on the issue: https://github.com/apache/spark/pull/16905 ok to test
[GitHub] spark issue #16620: [SPARK-19263] DAGScheduler should avoid sending conflict...
Github user markhamstra commented on the issue: https://github.com/apache/spark/pull/16620 LGTM
[GitHub] spark issue #12524: [SPARK-12524][Core]DagScheduler may submit a task set fo...
Github user markhamstra commented on the issue: https://github.com/apache/spark/pull/12524 @JoshRosen I haven't tried to walk through the logs in your JIRA comment, but it wouldn't surprise me at all if this is the same issue that we've been working through in https://github.com/apache/spark/pull/16620
[GitHub] spark pull request #15009: [SPARK-17443][SPARK-11035] Stop Spark Application...
Github user markhamstra commented on a diff in the pull request: https://github.com/apache/spark/pull/15009#discussion_r100950587

--- Diff: core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala ---
@@ -719,7 +719,23 @@ object SparkSubmit extends CommandLineUtils {
       printWarning("Subclasses of scala.App may not work correctly. Use a main() method instead.")
     }
-    val mainMethod = mainClass.getMethod("main", new Array[String](0).getClass)
+    val sparkAppMainMethodArr = mainClass.getMethods().filter{_.getName() == "sparkMain"}
--- End diff --

And `getName` shouldn't be a mutator, so no parens.
[GitHub] spark pull request #16905: [SPARK-19567][CORE][SCHEDULER] Support some Sched...
Github user markhamstra commented on a diff in the pull request: https://github.com/apache/spark/pull/16905#discussion_r100836419

--- Diff: core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala ---
@@ -130,15 +130,17 @@ private[spark] class TaskSchedulerImpl private[scheduler](
   val mapOutputTracker = SparkEnv.get.mapOutputTracker

-  var schedulableBuilder: SchedulableBuilder = null
+  private val SCHEDULER_MODE_PROPERTY = "spark.scheduler.mode"
+  private var schedulableBuilder: SchedulableBuilder = null
   var rootPool: Pool = null
   // default scheduler is FIFO
-  private val schedulingModeConf = conf.get("spark.scheduler.mode", "FIFO")
+  private val schedulingModeConf = conf.get(SCHEDULER_MODE_PROPERTY, SchedulingMode.FIFO.toString)
   val schedulingMode: SchedulingMode = try {
     SchedulingMode.withName(schedulingModeConf.toUpperCase)
   } catch {
     case e: java.util.NoSuchElementException =>
-      throw new SparkException(s"Unrecognized spark.scheduler.mode: $schedulingModeConf")
+      throw new SparkException(s"Unrecognized $SCHEDULER_MODE_PROPERTY: $schedulingModeConf. " +
+        s"Supported modes: ${SchedulingMode.FAIR} or ${SchedulingMode.FIFO}.")
--- End diff --

We're not likely to add or remove SchedulingModes with any frequency, if at all, so this isn't likely to cause much opportunity for error -- but I agree with the principle that extracting from `.values` is a better approach.
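To make the `.values` suggestion concrete, here is a minimal self-contained sketch. The enumeration below is a stand-in for Spark's `SchedulingMode`, and `unsupportedMessage` is a hypothetical helper; the point is that deriving the supported-modes text from `values` keeps the error message correct even if modes are ever added or removed:

```scala
// Minimal stand-in for SchedulingMode illustrating the ".values" approach:
// the supported-modes list in the error text comes from the enumeration
// itself rather than being hardcoded.
object SchedulingModeSketch extends Enumeration {
  val FAIR, FIFO, NONE = Value

  // Hypothetical helper building the error text from .values.
  def unsupportedMessage(conf: String): String =
    s"Unrecognized spark.scheduler.mode: $conf. " +
      s"Supported modes: ${values.mkString(", ")}."
}
```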
[GitHub] spark pull request #16905: [SPARK-19567][CORE][SCHEDULER] Support some Sched...
Github user markhamstra commented on a diff in the pull request: https://github.com/apache/spark/pull/16905#discussion_r100834970

--- Diff: core/src/main/scala/org/apache/spark/scheduler/Pool.scala ---
@@ -37,25 +37,22 @@ private[spark] class Pool(
   val schedulableQueue = new ConcurrentLinkedQueue[Schedulable]
   val schedulableNameToSchedulable = new ConcurrentHashMap[String, Schedulable]

-  var weight = initWeight
-  var minShare = initMinShare
+  val weight = initWeight
+  val minShare = initMinShare
   var runningTasks = 0
-  var priority = 0
+  val priority = 0
   // A pool's stage id is used to break the tie in scheduling.
   var stageId = -1
-  var name = poolName
+  val name = poolName
   var parent: Pool = null

-  var taskSetSchedulingAlgorithm: SchedulingAlgorithm = {
+  private val taskSetSchedulingAlgorithm: SchedulingAlgorithm = {
     schedulingMode match {
-      case SchedulingMode.FAIR =>
-        new FairSchedulingAlgorithm()
-      case SchedulingMode.FIFO =>
-        new FIFOSchedulingAlgorithm()
-      case _ =>
-        val msg = "Unsupported scheduling mode: $schedulingMode. Use FAIR or FIFO instead."
-        throw new IllegalArgumentException(msg)
+      case SchedulingMode.FAIR => new FairSchedulingAlgorithm()
+      case SchedulingMode.FIFO => new FIFOSchedulingAlgorithm()
+      case _ => throw new IllegalArgumentException("Unsupported scheduling mode: " +
+        s"$schedulingMode. Supported modes: ${SchedulingMode.FAIR} or ${SchedulingMode.FIFO}.")
--- End diff --

I'd really rather not see this kind of change. Other than the missing string-interpolation `s` in `msg`, the prior code was at least as good (and arguably better) than the new style, and making such an inconsequential style change just adds complication to future investigations of the git history.
[GitHub] spark issue #16905: [SPARK-19567][CORE][SCHEDULER] Support some Schedulable ...
Github user markhamstra commented on the issue: https://github.com/apache/spark/pull/16905 @srowen These refactorings of unnecessary vars to vals are something that we've noted in the discussions of a few other PRs as work that could and probably should be done in a separate PR (i.e. this one). @erenavsarogullari There is another such refactoring that can be rolled into this PR. See the changes that I've made to `rootPool` in `TaskSchedulerImpl`, `DAGSchedulerSuite` and `ExternalClusterManagerSuite` here: https://github.com/markhamstra/spark/commit/e11fe2a9817559492daee03c8c025879dc44d346
[GitHub] spark issue #16620: [SPARK-19263] DAGScheduler should avoid sending conflict...
Github user markhamstra commented on the issue: https://github.com/apache/spark/pull/16620 Thanks for all the investigation and the write-up, @kayousterhout. This makes good sense to me, and should take us a long way toward both fixing the immediate bug and improving the code. We should also make sure that our intentions and understanding get preserved in documentation that is more obvious and accessible in the future than PR discussion threads. Probably more comments in the source code that cover the essence of your "very long write up", but maybe we should consider creating an external documentation page (wiki or something) that covers in long form what we know and intend; then we can scale down the in-code comments to a shorter form that includes pointers to the long form.
[GitHub] spark issue #16876: [SPARK-19537] Move pendingPartitions to ShuffleMapStage.
Github user markhamstra commented on the issue: https://github.com/apache/spark/pull/16876 You're welcome -- but do be aware that I'm going to be extremely busy with non-Spark stuff for at least the next week, so for a while my Spark code reviews are likely to be more cursory than they should be.
[GitHub] spark issue #16876: [SPARK-19537] Move pendingPartitions to ShuffleMapStage.
Github user markhamstra commented on the issue: https://github.com/apache/spark/pull/16876 Makes good sense to me.
[GitHub] spark issue #16620: [SPARK-19263] DAGScheduler should avoid sending conflict...
Github user markhamstra commented on the issue: https://github.com/apache/spark/pull/16620 @kayousterhout yes, I also looked at duplicating `stage.pendingPartitions -= task.partitionId`. I could live with that.
[GitHub] spark pull request #16813: [SPARK-19466][CORE][SCHEDULER] Improve Fair Sched...
Github user markhamstra commented on a diff in the pull request: https://github.com/apache/spark/pull/16813#discussion_r99964246

--- Diff: core/src/main/scala/org/apache/spark/scheduler/SchedulableBuilder.scala ---
@@ -69,60 +72,81 @@ private[spark] class FairSchedulableBuilder(val rootPool: Pool, conf: SparkConf)
   val DEFAULT_WEIGHT = 1

   override def buildPools() {
-    var is: Option[InputStream] = None
+    var fileData: Option[FileData] = None
     try {
-      is = Option {
-        schedulerAllocFile.map { f =>
-          new FileInputStream(f)
-        }.getOrElse {
-          Utils.getSparkClassLoader.getResourceAsStream(DEFAULT_SCHEDULER_FILE)
-        }
-      }
-
-      is.foreach { i => buildFairSchedulerPool(i) }
+      fileData = getFileData()
+      fileData.foreach { data => buildFairSchedulerPool(data) }
+    } catch {
+      case NonFatal(t) =>
+        logError("Error while building the fair scheduler pools: ", t)
+        throw t
     } finally {
-      is.foreach(_.close())
+      fileData.foreach(_.inputStream.close())
     }
     // finally create "default" pool
     buildDefaultPool()
   }

+  private def getFileData(): Option[FileData] = {
+    schedulerAllocFile.map { f =>
+      val file = new File(f)
+      val fis = new FileInputStream(file)
--- End diff --

Yes, and if they do have multiple files with the same name on different paths and are expecting one of them to be used when Spark is actually trying to use one on a different path, then having the full path in the log message will be crucial to short-circuiting the debugging confusion.
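The logging point can be sketched in isolation (the helper name and message text are hypothetical, not the PR's actual code): resolving the configured name through `java.io.File` and reporting `getAbsolutePath` disambiguates same-named files on different paths.

```scala
import java.io.File

// Hypothetical helper: when an allocation file is resolved by name, logging
// its absolute path disambiguates same-named files on different paths.
object AllocFileLogSketch {
  def describeAllocFile(path: String): String = {
    val file = new File(path)
    // getAbsolutePath resolves relative names against the working directory,
    // so the log message shows exactly which file was picked up.
    s"Using fair scheduler allocation file: ${file.getAbsolutePath}"
  }
}
```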
[GitHub] spark issue #16620: [SPARK-19263] DAGScheduler should avoid sending conflict...
Github user markhamstra commented on the issue: https://github.com/apache/spark/pull/16620 @kayousterhout don't overestimate my enthusiasm for my own suggestion. I'm really just thinking aloud in search of a solution, and I agree with you that the TaskSetManager and DAGScheduler being in disagreement is not good.
[GitHub] spark issue #16620: [SPARK-19263] DAGScheduler should avoid sending conflict...
Github user markhamstra commented on the issue: https://github.com/apache/spark/pull/16620 The way that I am thinking about this right now is that @kayousterhout is on the right track with the early return at https://github.com/apache/spark/blob/master/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala#L1141 , but that her proposed `stage...attemptId != task.stageAttemptId` is broader than it needs to be. My idea is that we want to be throwing away task results from earlier attempts that were run on executors that failed (on the presumption that one fetch failure means that other fetches from there are also going to fail), but that if the executor didn't fail, then the outputs from earlier attempts of tasks that complete late but successfully on still-good executors should still be valid and available, so we should accept them as though they were successful task completions for the current attempt. What you end up with is that if-statement now looking like:

```scala
val stageHasBeenCancelled = !stageIdToStage.contains(task.stageId)
val shuffleMapTaskIsFromFailedExecutor = task match {
  case smt: ShuffleMapTask =>
    val status = event.result.asInstanceOf[MapStatus]
    val execId = status.location.executorId
    failedEpoch.contains(execId) && smt.epoch <= failedEpoch(execId)
  case _ => false
}
if (stageHasBeenCancelled || shuffleMapTaskIsFromFailedExecutor) {
  return
}
```

...and then the `failedEpoch.contains(execId) && smt.epoch <= failedEpoch(execId)` check can be removed from `case smt: ShuffleMapTask =>`. If we can do it cleanly, I think we should be avoiding re-running Tasks that complete successfully and should still be available. This is a bit different from the intent of SPARK-14649, which I am reading as an effort not to ignore the results of long-running tasks that start and eventually complete on an executor on which some other tasks actually run into fetch failures.
I'm really only trying to preserve the results of successful tasks run on executors that haven't failed. Unfortunately, the DAGSchedulerSuite doesn't agree with my intentions, because the above change actually leads to multiple test failures.
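The epoch comparison at the heart of the snippet above can be isolated into a standalone model (names are hypothetical; `failedEpoch` maps executor ids to the epoch at which each executor was last marked failed):

```scala
// Standalone model of the failed-epoch test: a shuffle map task's output is
// suspect when its epoch is at or below the last failure epoch recorded for
// the executor it ran on.
object FailedEpochSketch {
  def isFromFailedExecutor(
      failedEpoch: Map[String, Long],
      execId: String,
      taskEpoch: Long): Boolean =
    // None.exists(...) is false, so executors never marked failed pass through.
    failedEpoch.get(execId).exists(taskEpoch <= _)
}
```

Tasks launched after the failure was recorded carry a higher epoch and so are accepted; only results from at-or-before the failure epoch are thrown away.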