[jira] [Commented] (SPARK-27816) make TreeNode tag type safe
[ https://issues.apache.org/jira/browse/SPARK-27816?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16847624#comment-16847624 ] Mark Hamstra commented on SPARK-27816: -- Jira issues with no description are really irritating – even worse when there is little to no discussion in any associated PR. > make TreeNode tag type safe > --- > > Key: SPARK-27816 > URL: https://issues.apache.org/jira/browse/SPARK-27816 > Project: Spark > Issue Type: Improvement > Components: SQL >Affects Versions: 3.0.0 >Reporter: Wenchen Fan >Assignee: Wenchen Fan >Priority: Major > Fix For: 3.0.0 > > -- This message was sent by Atlassian JIRA (v7.6.3#76005) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Commented] (SPARK-27726) Performance of InMemoryStore suffers under load
[ https://issues.apache.org/jira/browse/SPARK-27726?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16840636#comment-16840636 ] Mark Hamstra commented on SPARK-27726: -- [~vanzin] > Performance of InMemoryStore suffers under load > --- > > Key: SPARK-27726 > URL: https://issues.apache.org/jira/browse/SPARK-27726 > Project: Spark > Issue Type: Umbrella > Components: Spark Core >Affects Versions: 2.3.3 >Reporter: David C Navas >Priority: Major > Attachments: GCRateIssues.pdf, PerformanceBeforeAndAfter.pdf > > > When our Spark system has been under load for an extended period of time, GC > remains highly active and the jobs page becomes unresponsive even when load > eases. Please see the attached GCRateIssues for more details regarding the > problem definition. > We found a number of separate issues which are detailed in the subtasks. I > anticipate committing a single PR for all subtasks whose commits roughly > align with the descriptions in the subtasks. > The performance of the code is measured before and after the change and is > attached in the document PerformanceBeforeAndAfter. tl;dr in our use case, > we saw about five (computed) orders of magnitude improvement.
[jira] [Commented] (SPARK-26502) Get rid of hiveResultString() in QueryExecution
[ https://issues.apache.org/jira/browse/SPARK-26502?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16732435#comment-16732435 ] Mark Hamstra commented on SPARK-26502: -- Don't lose track of this comment: [https://github.com/apache/spark/blob/948414afe706e0b526d7f83f598cbd204d2fc687/sql/core/src/main/scala/org/apache/spark/sql/execution/QueryExecution.scala#L41] Any significant change to QueryExecution needs to be documented carefully and included in the release notes, since you will be forcing 3rd-party changes. > Get rid of hiveResultString() in QueryExecution > --- > > Key: SPARK-26502 > URL: https://issues.apache.org/jira/browse/SPARK-26502 > Project: Spark > Issue Type: Improvement > Components: SQL >Affects Versions: 2.4.0 >Reporter: Maxim Gekk >Priority: Minor > > The method hiveResultString() of QueryExecution is used in tests and > SparkSQLDriver. It should be moved from QueryExecution to a more specific class.
[jira] [Updated] (SPARK-21084) Improvements to dynamic allocation for notebook use cases
[ https://issues.apache.org/jira/browse/SPARK-21084?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Mark Hamstra updated SPARK-21084: - Description: One important application of Spark is to support many notebook users with a single YARN or Spark Standalone cluster. We at IBM have seen this requirement across multiple deployments of Spark: on-premises and private cloud deployments at our clients, as well as on the IBM cloud. The scenario goes something like this: "Every morning at 9am, 500 analysts log into their computers and start running Spark notebooks intermittently for the next 8 hours." I'm sure that many other members of the community are interested in making similar scenarios work. Dynamic allocation is supposed to support these kinds of use cases by shifting cluster resources towards users who are currently executing scalable code. In our own testing, we have encountered a number of issues with using the current implementation of dynamic allocation for this purpose: *Issue #1: Starvation.* A Spark job acquires all available containers, preventing other jobs or applications from starting. *Issue #2: Request latency.* Jobs that would normally finish in less than 30 seconds take 2-4x longer than normal with dynamic allocation. *Issue #3: Unfair resource allocation due to cached data.* Applications that have cached RDD partitions hold onto executors indefinitely, denying those resources to other applications. *Issue #4: Loss of cached data leads to thrashing.* Applications repeatedly lose partitions of cached RDDs because the underlying executors are removed; the applications then need to rerun expensive computations. This umbrella JIRA covers efforts to address these issues by making enhancements to Spark. Here's a high-level summary of the current planned work: * [SPARK-21097]: Preserve an executor's cached data when shutting down the executor. 
* [SPARK-21122]: Make Spark give up executors in a controlled fashion when the RM indicates it is running low on capacity. * (JIRA TBD): Reduce the delay for dynamic allocation to spin up new executors. Note that this overall plan is subject to change, and other members of the community should feel free to suggest changes and to help out. was: One important application of Spark is to support many notebook users with a single YARN or Spark Standalone cluster. We at IBM have seen this requirement across multiple deployments of Spark: on-premises and private cloud deployments at our clients, as well as on the IBM cloud. The scenario goes something like this: "Every morning at 9am, 500 analysts log into their computers and start running Spark notebooks intermittently for the next 8 hours." I'm sure that many other members of the community are interested in making similar scenarios work. Dynamic allocation is supposed to support these kinds of use cases by shifting cluster resources towards users who are currently executing scalable code. In our own testing, we have encountered a number of issues with using the current implementation of dynamic allocation for this purpose: *Issue #1: Starvation.* A Spark job acquires all available containers, preventing other jobs or applications from starting. *Issue #2: Request latency.* Jobs that would normally finish in less than 30 seconds take 2-4x longer than normal with dynamic allocation. *Issue #3: Unfair resource allocation due to cached data.* Applications that have cached RDD partitions hold onto executors indefinitely, denying those resources to other applications. *Issue #4: Loss of cached data leads to thrashing.* Applications repeatedly lose partitions of cached RDDs because the underlying executors are removed; the applications then need to rerun expensive computations. This umbrella JIRA covers efforts to address these issues by making enhancements to Spark. 
Here's a high-level summary of the current planned work: * [SPARK-21097]: Preserve an executor's cached data when shutting down the executor. * (JIRA TBD): Make Spark give up executors in a controlled fashion when the RM indicates it is running low on capacity. * (JIRA TBD): Reduce the delay for dynamic allocation to spin up new executors. Note that this overall plan is subject to change, and other members of the community should feel free to suggest changes and to help out. > Improvements to dynamic allocation for notebook use cases > - > > Key: SPARK-21084 > URL: https://issues.apache.org/jira/browse/SPARK-21084 > Project: Spark > Issue Type: Umbrella > Components: Block Manager, Scheduler, Spark Core, YARN >Affects Versions: 2.2.0, 2.3.0 >Reporter: Frederick Reiss >Priority: Major
[jira] [Commented] (SPARK-22683) DynamicAllocation wastes resources by allocating containers that will barely be used
[ https://issues.apache.org/jira/browse/SPARK-22683?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16355943#comment-16355943 ] Mark Hamstra commented on SPARK-22683: -- I agree that setting the config to 1 should be sufficient to retain current behaviour; however, there seemed to be at least some urging in the discussion toward a default of 2. I'm not sure that we want to change Spark's default behavior in that way. > DynamicAllocation wastes resources by allocating containers that will barely > be used > > > Key: SPARK-22683 > URL: https://issues.apache.org/jira/browse/SPARK-22683 > Project: Spark > Issue Type: Improvement > Components: Spark Core >Affects Versions: 2.1.0, 2.2.0 >Reporter: Julien Cuquemelle >Priority: Major > Labels: pull-request-available > > While migrating a series of jobs from MR to Spark using dynamicAllocation, > I've noticed almost a doubling (+114% exactly) of resource consumption of > Spark w.r.t MR, for a wall clock time gain of 43% > About the context: > - resource usage stands for vcore-hours allocation for the whole job, as seen > by YARN > - I'm talking about a series of jobs because we provide our users with a way > to define experiments (via UI / DSL) that automatically get translated to > Spark / MR jobs and submitted on the cluster > - we submit around 500 of such jobs each day > - these jobs are usually one shot, and the amount of processing can vary a > lot between jobs, and as such finding an efficient number of executors for > each job is difficult to get right, which is the reason I took the path of > dynamic allocation. > - Some of the tests have been scheduled on an idle queue, some on a full > queue. 
> - experiments have been conducted with spark.executor.cores = 5 and 10; only > results for 5 cores have been reported because efficiency was overall better > than with 10 cores > - the figures I give are averaged over a representative sample of those jobs > (about 600 jobs) ranging from tens to thousands of splits in the data > partitioning and between 400 to 9000 seconds of wall clock time. > - executor idle timeout is set to 30s; > > Definition: > - let's say an executor has spark.executor.cores / spark.task.cpus taskSlots, > which represent the max number of tasks an executor will process in parallel. > - the current behaviour of the dynamic allocation is to allocate enough > containers to have one taskSlot per task, which minimizes latency, but wastes > resources when tasks are small regarding executor allocation and idling > overhead. > The results using the proposal (described below) over the job sample (600 > jobs): > - by using 2 tasks per taskSlot, we get a 5% (against -114%) reduction in > resource usage, for a 37% (against 43%) reduction in wall clock time for > Spark w.r.t MR > - by trying to minimize the average resource consumption, I ended up with 6 > tasks per core, with a 30% resource usage reduction, for a similar wall clock > time w.r.t. MR > What did I try to solve the issue with existing parameters (summing up a few > points mentioned in the comments)? > - change dynamicAllocation.maxExecutors: this would need to be adapted for > each job (tens to thousands of splits can occur), and essentially remove the > interest of using the dynamic allocation. > - use dynamicAllocation.backlogTimeout: > - setting this parameter right to avoid creating unused executors is very > dependent on wall clock time. One basically needs to solve the exponential > ramp up for the target time. So this is not an option for my use case, where I > don't want per-job tuning. > - I've still done a series of experiments, details in the comments.
> Result is that after manual tuning, the best I could get was a similar > resource consumption at the expense of 20% more wall clock time, or a similar > wall clock time at the expense of 60% more resource consumption than what I > got using my proposal @ 6 tasks per slot (this value being optimized over a > much larger range of jobs as already stated) > - as mentioned in another comment, tampering with the exponential ramp up > might yield task imbalance and such old executors could become contention > points for other exes trying to remotely access blocks in the old exes (not > witnessed in the jobs I'm talking about, but we did see this behavior in > other jobs) > Proposal: > Simply add a tasksPerExecutorSlot parameter, which makes it possible to > specify how many tasks a single taskSlot should ideally execute to mitigate > the overhead of executor allocation. > PR: https://github.com/apache/spark/pull/19881
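The arithmetic behind the tasksPerExecutorSlot proposal can be illustrated with a rough sketch (plain Python, not Spark's actual ExecutorAllocationManager code; the function and parameter names here are invented for illustration). The idea is that dynamic allocation today requests one task slot per pending task, and the proposal divides that target by a configurable number of tasks per slot:

```python
import math

def target_executors(pending_tasks, executor_cores=5, task_cpus=1, tasks_per_slot=1):
    """Sketch of the executor count dynamic allocation would request.

    tasks_per_slot=1 reproduces the current one-slot-per-task behaviour;
    larger values trade some latency for fewer, better-utilized containers.
    """
    # An executor has executor_cores / task_cpus task slots (the "Definition"
    # above); each slot is asked to absorb tasks_per_slot tasks.
    slots_per_executor = executor_cores // task_cpus
    return math.ceil(pending_tasks / (slots_per_executor * tasks_per_slot))
```

For example, with 600 pending tasks and 5-core executors, the current behaviour requests 120 executors, while 6 tasks per slot (the value the reporter settled on) requests only 20, which is where the vcore-hour savings come from.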
[jira] [Commented] (SPARK-22683) DynamicAllocation wastes resources by allocating containers that will barely be used
[ https://issues.apache.org/jira/browse/SPARK-22683?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16355895#comment-16355895 ] Mark Hamstra commented on SPARK-22683: -- A concern that I have is that the discussion seems to be very focused on Spark "jobs" that are actually Spark Applications that run only a single Job – i.e. a workload where Applications and Jobs are submitted 1:1. Tuning for just this kind of workload has the potential to negatively impact other Spark usage patterns where several Jobs run in a single Application or the common Job-server pattern where a single, long-running Application serves many Jobs over an extended time. I'm not saying that the proposals made here and in the associated PR will necessarily affect those other usage patterns negatively. What I am saying is that there must be strong demonstration that those patterns will not be negatively affected before I will be comfortable approving the requested changes.
[jira] [Commented] (SPARK-21619) Fail the execution of canonicalized plans explicitly
[ https://issues.apache.org/jira/browse/SPARK-21619?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16113605#comment-16113605 ] Mark Hamstra commented on SPARK-21619: -- But part of the point of the split in my half-baked example is to fork the query execution pipeline before physical plan generation, allowing the cost of that generation to be parallelized with an instance per execution engine. Yes, maybe doing dispatch of physical plans via the CBO or other means is all that I should realistically hope for, but it doesn't mean that it isn't worth thinking about alternatives. > Fail the execution of canonicalized plans explicitly > > > Key: SPARK-21619 > URL: https://issues.apache.org/jira/browse/SPARK-21619 > Project: Spark > Issue Type: Improvement > Components: SQL >Affects Versions: 2.2.0 >Reporter: Reynold Xin >Assignee: Reynold Xin > > Canonicalized plans are not supposed to be executed. I ran into a case in > which there's some code that accidentally calls execute on a canonicalized > plan. This patch throws a more explicit exception when that happens.
[jira] [Commented] (SPARK-21619) Fail the execution of canonicalized plans explicitly
[ https://issues.apache.org/jira/browse/SPARK-21619?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16113586#comment-16113586 ] Mark Hamstra commented on SPARK-21619: -- Or you can just enlighten me on how one should design a dispatch function for multiple expressions of semantically equivalent query plans under the current architecture. :) Dispatching based on a canonical form of a plan seems like an obvious solution to me, but maybe I'm missing something.
[jira] [Comment Edited] (SPARK-21619) Fail the execution of canonicalized plans explicitly
[ https://issues.apache.org/jira/browse/SPARK-21619?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16113555#comment-16113555 ] Mark Hamstra edited comment on SPARK-21619 at 8/3/17 10:01 PM: --- Yes, I absolutely understand that this issue and PR are meant to address an immediate need, and that a deeper redesign would be one or likely more separate issues. I'm more trying to raise awareness or improve my understanding than to delay or block progress on addressing the immediate need. I do have concerns, though, that making canonical plans unexecutable just because they are in canonical form does make certain evolutions of Spark more difficult. As one half-baked example, you could want to decouple query plans from a single execution engine, so that certain kinds of logical plans could be sent toward execution on one engine (or cluster configuration) while other plans could be directed to a separate engine (presumably more suitable to those plans in some way.) Splitting and forking Spark's query execution pipeline in that kind of way isn't really that difficult (I've done it in at least a proof-of-concept), and has some perhaps significant potential benefits. To do that, though, you'd really like to have a single, canonical form for any semantically equivalent queries by the time they reach your dispatch function for determining the destination execution engine for a query (and where results will be cached locally, etc.) Making the canonical form unexecutable throws a wrench into that. was (Author: markhamstra): Yes, I absolutely understand that this issue and PR are meant to address an immediate need, and that a deeper redesign would be one or likely more separate issues. I more trying to raise awareness or improve my understanding than to delay or block progress on addressing the immediate need. 
I do have concerns, though, that making canonical plans unexecutable just because they are in canonical form does make certain evolutions of Spark more difficult. As one half-baked example, you could want to decouple query plans from a single execution engine, so that certain kinds of logical plans could be sent toward execution on one engine (or cluster configuration) while other plans could be directed to a separate engine (presumably more suitable to those plans in some way.) Splitting and forking Spark's query execution pipeline in that kind of way isn't really that difficult (I've done it in at least a proof-of-concept), and has some perhaps significant potential benefits. To do that, though, you'd really like to have a single, canonical form for any semantically equivalent queries by the time they reach your dispatch function for determining the destination execution engine for a query (and where results will be cached locally, etc.) Making the canonical form unexecutable throws a wrench into that.
[jira] [Commented] (SPARK-21619) Fail the execution of canonicalized plans explicitly
[ https://issues.apache.org/jira/browse/SPARK-21619?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16113571#comment-16113571 ] Mark Hamstra commented on SPARK-21619: -- _"Why would you want to execute multiple semantically equivalent plans in different forms?" -> Because they can be executed in different times, using different aliases, etc?_ Right, so for separate executions of semantically equivalent plans you need to maintain a mapping between the aliases of a particular plan and their canonical form, but after doing that you can more easily recover data and metadata associated with a prior execution of an equivalent plan.
[jira] [Commented] (SPARK-21619) Fail the execution of canonicalized plans explicitly
[ https://issues.apache.org/jira/browse/SPARK-21619?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16113555#comment-16113555 ] Mark Hamstra commented on SPARK-21619: -- Yes, I absolutely understand that this issue and PR are meant to address an immediate need, and that a deeper redesign would be one or likely more separate issues. I'm more trying to raise awareness or improve my understanding than to delay or block progress on addressing the immediate need. I do have concerns, though, that making canonical plans unexecutable just because they are in canonical form does make certain evolutions of Spark more difficult. As one half-baked example, you could want to decouple query plans from a single execution engine, so that certain kinds of logical plans could be sent toward execution on one engine (or cluster configuration) while other plans could be directed to a separate engine (presumably more suitable to those plans in some way.) Splitting and forking Spark's query execution pipeline in that kind of way isn't really that difficult (I've done it in at least a proof-of-concept), and has some perhaps significant potential benefits. To do that, though, you'd really like to have a single, canonical form for any semantically equivalent queries by the time they reach your dispatch function for determining the destination execution engine for a query (and where results will be cached locally, etc.) Making the canonical form unexecutable throws a wrench into that.
[jira] [Commented] (SPARK-21619) Fail the execution of canonicalized plans explicitly
[ https://issues.apache.org/jira/browse/SPARK-21619?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16113526#comment-16113526 ] Mark Hamstra commented on SPARK-21619: -- Two reasons, mostly: 1) To provide better guarantees that plans that are deemed to be semantically equivalent actually end up being expressed the same way before execution and thus go down the same code paths; 2) To simplify some downstream logic; so instead of needing to maintain a mapping between multiple, semantically equivalent plans and a single canonical form, after a certain canonicalization point the plans really are the same. To perhaps clear up my confusion, maybe you can answer the question going the other way: Why would you want to execute multiple semantically equivalent plans in different forms?
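The kind of dispatch being discussed in this thread can be sketched as follows (a toy in plain Python, not Spark code; the `canonicalize` function here is a made-up stand-in that only normalizes aliases, standing in for Spark's real plan canonicalization). Semantically equivalent plans collapse to a single canonical key, so a dispatcher can route them to the same engine and reuse cached metadata:

```python
def canonicalize(plan):
    # Toy canonicalization: replace user-chosen aliases with positional ids
    # so that semantically equivalent plans collapse to one key.
    alias_ids = {}
    def norm(token):
        if token.startswith("alias:"):
            return "a%d" % alias_ids.setdefault(token, len(alias_ids))
        return token
    return tuple(norm(t) for t in plan)

class Dispatcher:
    """Routes semantically equivalent plans to the same execution engine."""
    def __init__(self):
        self.assignments = {}

    def engine_for(self, plan, pick):
        key = canonicalize(plan)
        if key not in self.assignments:
            # First plan with this canonical form picks an engine;
            # later equivalent plans reuse that assignment.
            self.assignments[key] = pick(key)
        return self.assignments[key]
```

Under this model, two plans differing only in aliases (`("scan:t", "alias:x")` vs. `("scan:t", "alias:y")`) share one key, which is why a canonical form that cannot itself be executed complicates the design: the dispatch function sees only the canonical plan.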
[jira] [Commented] (SPARK-21619) Fail the execution of canonicalized plans explicitly
[ https://issues.apache.org/jira/browse/SPARK-21619?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16113510#comment-16113510 ] Mark Hamstra commented on SPARK-21619: -- Ok, but my point is that if plans are to be canonicalized for some reason, maybe they should also be canonicalized before execution. It seems odd both to execute plans that are not in a canonical form and to not be able to execute plans that are in a canonical form. That view makes failing the execution of canonical plans look more like a workaround/hack (maybe needed in the short term) than a solution to a deeper issue.
[jira] [Commented] (SPARK-21619) Fail the execution of canonicalized plans explicitly
[ https://issues.apache.org/jira/browse/SPARK-21619?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16113494#comment-16113494 ] Mark Hamstra commented on SPARK-21619: -- Can you provide a little more context, Reynold? On its face, it would seem that if plans are to be blocked from executing based on their form, then non-canonical plans would be the ones that should be blocked.
[jira] [Commented] (SPARK-19276) FetchFailures can be hidden by user (or sql) exception handling
[ https://issues.apache.org/jira/browse/SPARK-19276?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15830519#comment-15830519 ] Mark Hamstra commented on SPARK-19276: -- Ok, I haven't read your PR closely yet, so I missed that. This question looks like something that could use more eyes and insights. [~kayousterhout][~matei][~r...@databricks.com] > FetchFailures can be hidden by user (or sql) exception handling > --- > > Key: SPARK-19276 > URL: https://issues.apache.org/jira/browse/SPARK-19276 > Project: Spark > Issue Type: Bug > Components: Scheduler, Spark Core, SQL >Affects Versions: 2.1.0 >Reporter: Imran Rashid >Priority: Critical > > The scheduler handles node failures by looking for a special > {{FetchFailedException}} thrown by the shuffle block fetcher. This is > handled in {{Executor}} and then passed as a special msg back to the driver: > https://github.com/apache/spark/blob/278fa1eb305220a85c816c948932d6af8fa619aa/core/src/main/scala/org/apache/spark/executor/Executor.scala#L403 > However, user code exists in between the shuffle block fetcher and that catch > block -- it could intercept the exception, wrap it with something else, and > throw a different exception. If that happens, spark treats it as an ordinary > task failure, and retries the task, rather than regenerating the missing > shuffle data. The task eventually is retried 4 times, its doomed to fail > each time, and the job is failed. > You might think that no user code should do that -- but even sparksql does it: > https://github.com/apache/spark/blob/278fa1eb305220a85c816c948932d6af8fa619aa/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileFormatWriter.scala#L214 > Here's an example stack trace. 
This is from Spark 1.6, so the sql code is > not the same, but the problem is still there: > {noformat} > 17/01/13 19:18:02 WARN scheduler.TaskSetManager: Lost task 0.0 in stage > 1983.0 (TID 304851, xxx): org.apache.spark.SparkException: Task failed while > writing rows. > at > org.apache.spark.sql.execution.datasources.DynamicPartitionWriterContainer.writeRows(WriterContainer.scala:414) > at > org.apache.spark.sql.execution.datasources.InsertIntoHadoopFsRelation$$anonfun$run$1$$anonfun$apply$mcV$sp$3.apply(InsertIntoHadoopFsRelation.scala:150) > at > org.apache.spark.sql.execution.datasources.InsertIntoHadoopFsRelation$$anonfun$run$1$$anonfun$apply$mcV$sp$3.apply(InsertIntoHadoopFsRelation.scala:150) > at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:66) > at org.apache.spark.scheduler.Task.run(Task.scala:89) > at > org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:214) > at > java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) > at > java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) > at java.lang.Thread.run(Thread.java:745) > Caused by: org.apache.spark.shuffle.FetchFailedException: Failed to connect > to xxx/yyy:zzz > at > org.apache.spark.storage.ShuffleBlockFetcherIterator.throwFetchFailedException(ShuffleBlockFetcherIterator.scala:323) > ... > 17/01/13 19:19:29 ERROR scheduler.TaskSetManager: Task 0 in stage 1983.0 > failed 4 times; aborting job > {noformat} > I think the right fix here is to also set a fetch failure status in the > {{TaskContextImpl}}, so the executor can check that instead of just one > exception. -- This message was sent by Atlassian JIRA (v6.3.4#6332) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
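The failure mode described in SPARK-19276 — user code catching the exception and re-wrapping it, so the Executor's catch block never sees a FetchFailedException — comes down to a type-dispatch check being defeated by wrapping. The sketch below illustrates just that mechanism with a hypothetical stand-in class (not Spark's actual FetchFailedException or Executor code):

```java
// Hypothetical stand-in for FetchFailedException; not Spark's real class.
public class WrappedFailureDemo {
    static class FetchFailedLikeException extends RuntimeException {
        FetchFailedLikeException(String msg) { super(msg); }
    }

    // What the Executor effectively does: dispatch on the exception's type.
    static boolean looksLikeFetchFailure(Throwable t) {
        return t instanceof FetchFailedLikeException;
    }

    public static void main(String[] args) {
        Throwable raw = new FetchFailedLikeException("Failed to connect to host");
        // "User code" (like the FileFormatWriter catch block) wraps it in a
        // generic exception, hiding the special type from the handler:
        Throwable wrapped =
            new RuntimeException("Task failed while writing rows.", raw);

        System.out.println(looksLikeFetchFailure(raw));      // true
        System.out.println(looksLikeFetchFailure(wrapped));  // false: now treated
                                                             // as an ordinary
                                                             // task failure
    }
}
```

Walking the getCause() chain would recover the original type, but the fix proposed in the issue — recording a fetch-failure status on the TaskContext so the executor does not depend on the exception surviving user code at all — is more robust.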
[jira] [Commented] (SPARK-19276) FetchFailures can be hidden by user (or sql) exception handling
[ https://issues.apache.org/jira/browse/SPARK-19276?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15830387#comment-15830387 ] Mark Hamstra commented on SPARK-19276: -- This all makes sense, and the PR is a good effort to fix this kind of "accidental" swallowing of FetchFailedException. I guess my only real question is if we should allow for the possibility of a FetchFailedException not only being caught, but also the failure being remedied by some means other than the usual handling in the driver. I'm not sure exactly how or why that kind of "fix the fetch failure before Spark tries to handle it" would be done; and it would seem that something like that would be prone to subtle errors, so maybe we should just set it in stone that nobody but the driver should try to fix a fetch failure -- which would make the approach of your "guarantee that the FetchFailedException is seen by the driver" PR completely correct. > FetchFailures can be hidden by user (or sql) exception handling > --- > > Key: SPARK-19276 > URL: https://issues.apache.org/jira/browse/SPARK-19276 > Project: Spark > Issue Type: Bug > Components: Scheduler, Spark Core, SQL >Affects Versions: 2.1.0 >Reporter: Imran Rashid >Priority: Critical > > The scheduler handles node failures by looking for a special > {{FetchFailedException}} thrown by the shuffle block fetcher. This is > handled in {{Executor}} and then passed as a special msg back to the driver: > https://github.com/apache/spark/blob/278fa1eb305220a85c816c948932d6af8fa619aa/core/src/main/scala/org/apache/spark/executor/Executor.scala#L403 > However, user code exists in between the shuffle block fetcher and that catch > block -- it could intercept the exception, wrap it with something else, and > throw a different exception. If that happens, spark treats it as an ordinary > task failure, and retries the task, rather than regenerating the missing > shuffle data. 
The task eventually is retried 4 times, its doomed to fail > each time, and the job is failed. > You might think that no user code should do that -- but even sparksql does it: > https://github.com/apache/spark/blob/278fa1eb305220a85c816c948932d6af8fa619aa/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileFormatWriter.scala#L214 > Here's an example stack trace. This is from Spark 1.6, so the sql code is > not the same, but the problem is still there: > {noformat} > 17/01/13 19:18:02 WARN scheduler.TaskSetManager: Lost task 0.0 in stage > 1983.0 (TID 304851, xxx): org.apache.spark.SparkException: Task failed while > writing rows. > at > org.apache.spark.sql.execution.datasources.DynamicPartitionWriterContainer.writeRows(WriterContainer.scala:414) > at > org.apache.spark.sql.execution.datasources.InsertIntoHadoopFsRelation$$anonfun$run$1$$anonfun$apply$mcV$sp$3.apply(InsertIntoHadoopFsRelation.scala:150) > at > org.apache.spark.sql.execution.datasources.InsertIntoHadoopFsRelation$$anonfun$run$1$$anonfun$apply$mcV$sp$3.apply(InsertIntoHadoopFsRelation.scala:150) > at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:66) > at org.apache.spark.scheduler.Task.run(Task.scala:89) > at > org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:214) > at > java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) > at > java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) > at java.lang.Thread.run(Thread.java:745) > Caused by: org.apache.spark.shuffle.FetchFailedException: Failed to connect > to xxx/yyy:zzz > at > org.apache.spark.storage.ShuffleBlockFetcherIterator.throwFetchFailedException(ShuffleBlockFetcherIterator.scala:323) > ... 
> 17/01/13 19:19:29 ERROR scheduler.TaskSetManager: Task 0 in stage 1983.0 > failed 4 times; aborting job > {noformat} > I think the right fix here is to also set a fetch failure status in the > {{TaskContextImpl}}, so the executor can check that instead of just one > exception. -- This message was sent by Atlassian JIRA (v6.3.4#6332) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Commented] (SPARK-18886) Delay scheduling should not delay some executors indefinitely if one task is scheduled before delay timeout
[ https://issues.apache.org/jira/browse/SPARK-18886?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15752098#comment-15752098 ] Mark Hamstra commented on SPARK-18886: -- That's a great explanation of the issue, and nice example code, [~imranr]. I'm sure that I have seen this kind of excessive Task stickiness with many quick-to-execute Tasks, but I never got to the level of diagnosing the problem that you have. Your listed workarounds, while interesting, aren't a complete long-term solution, of course. Have you thought at all yet about possible paths to a solution? One idea that comes to my mind is that speculative execution has at least the potential to get the delayed Tasks executed more quickly elsewhere -- but our prior concerns or lack of confidence with speculative execution remain. > Delay scheduling should not delay some executors indefinitely if one task is > scheduled before delay timeout > --- > > Key: SPARK-18886 > URL: https://issues.apache.org/jira/browse/SPARK-18886 > Project: Spark > Issue Type: Bug > Components: Scheduler >Affects Versions: 2.1.0 >Reporter: Imran Rashid > > Delay scheduling can introduce an unbounded delay and underutilization of > cluster resources under the following circumstances: > 1. Tasks have locality preferences for a subset of available resources > 2. Tasks finish in less time than the delay scheduling. > Instead of having *one* delay to wait for resources with better locality, > spark waits indefinitely. > As an example, consider a cluster with 100 executors, and a taskset with 500 > tasks. Say all tasks have a preference for one executor, which is by itself > on one host. Given the default locality wait of 3s per level, we end up with > a 6s delay till we schedule on other hosts (process wait + host wait). > If each task takes 5 seconds (under the 6 second delay), then _all 500_ tasks > get scheduled on _only one_ executor. 
This means you're only using 1% of > your cluster, and you get a ~100x slowdown. You'd actually be better off if > tasks took 7 seconds. > *WORKAROUNDS*: > (1) You can change the locality wait times so that they are shorter than the > task execution time. You need to take into account the sum of all wait times > to use all the resources on your cluster. For example, if you have resources > on different racks, this will include the sum of > "spark.locality.wait.process" + "spark.locality.wait.node" + > "spark.locality.wait.rack". Those each default to "3s". The simplest way would > be to set "spark.locality.wait.process" to your desired wait interval, and set > both "spark.locality.wait.node" and "spark.locality.wait.rack" to "0". For > example, if your tasks take ~3 seconds on average, you might set > "spark.locality.wait.process" to "1s". > Note that this workaround isn't perfect -- with less delay scheduling, you may > not get as good resource locality. After this issue is fixed, you'd most > likely want to undo these configuration changes. > (2) The worst case here will only happen if your tasks have extreme skew in > their locality preferences. Users may be able to modify their job to > control the distribution of the original input data. -- This message was sent by Atlassian JIRA (v6.3.4#6332) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
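The arithmetic in the SPARK-18886 example can be captured in a back-of-envelope model. This is a deliberate simplification (a hypothetical helper, not the real TaskSetManager logic): it assumes that whenever a task finishes within the combined locality wait, the wait timer restarts, so all tasks serialize onto the single preferred executor; otherwise tasks spread across the cluster.

```java
public class DelaySchedulingModel {
    // Toy model: if every task finishes before the combined locality wait
    // expires, the wait restarts and tasks run one-by-one on the single
    // preferred executor; otherwise they spread over the whole cluster.
    static double runtimeSeconds(int tasks, int executors,
                                 double taskSec, double waitSec) {
        if (taskSec < waitSec) {
            return tasks * taskSec;                          // stuck on one executor
        }
        return Math.ceil((double) tasks / executors) * taskSec;  // spread out
    }

    public static void main(String[] args) {
        // 500 tasks, 100 executors, 3s process wait + 3s node wait = 6s total.
        System.out.println(runtimeSeconds(500, 100, 5.0, 6.0)); // 2500.0 (serialized)
        System.out.println(runtimeSeconds(500, 100, 7.0, 6.0)); // 35.0 (spread)
    }
}
```

With 5-second tasks the ideal spread runtime would be 500/100 * 5 = 25s, so 2500s is the ~100x slowdown the issue cites; the perverse result is that 7-second tasks finish the whole stage in 35s, far sooner than the "faster" 5-second tasks.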
[jira] [Commented] (SPARK-17064) Reconsider spark.job.interruptOnCancel
[ https://issues.apache.org/jira/browse/SPARK-17064?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15729602#comment-15729602 ] Mark Hamstra commented on SPARK-17064: -- Related JIRA and PR: https://issues.apache.org/jira/browse/SPARK-18761 https://github.com/apache/spark/pull/16189 > Reconsider spark.job.interruptOnCancel > -- > > Key: SPARK-17064 > URL: https://issues.apache.org/jira/browse/SPARK-17064 > Project: Spark > Issue Type: Improvement > Components: Scheduler, Spark Core >Reporter: Mark Hamstra > > There is a frequent need or desire in Spark to cancel already running Tasks. > This has been recognized for a very long time (see, e.g., the ancient TODO > comment in the DAGScheduler: "Cancel running tasks in the stage"), but we've > never had more than an incomplete solution. Killing running Tasks at the > Executor level has been implemented by interrupting the threads running the > Tasks (taskThread.interrupt in o.a.s.scheduler.Task#kill.) Since > https://github.com/apache/spark/commit/432201c7ee9e1ea1d70a6418cbad1c5ad2653ed3 > addressing https://issues.apache.org/jira/browse/SPARK-1582, interrupting > Task threads in this way has only been possible if interruptThread is true, > and that typically comes from the setting of the interruptOnCancel property > in the JobGroup, which in turn typically comes from the setting of > spark.job.interruptOnCancel. Because of concerns over > https://issues.apache.org/jira/browse/HDFS-1208 and the possibility of nodes > being marked dead when a Task thread is interrupted, the default value of the > boolean has been "false" -- i.e. by default we do not interrupt Tasks already > running on Executor even when the Task has been canceled in the DAGScheduler, > or the Stage has been abort, or the Job has been killed, etc. > There are several issues resulting from this current state of affairs, and > they each probably need to spawn their own JIRA issue and PR once we decide > on an overall strategy here. 
Among those issues: > * Is HDFS-1208 still an issue, or has it been resolved adequately in the HDFS > versions that Spark now supports so that we can set the default value of > spark.job.interruptOnCancel to "true" or eliminate this boolean flag entirely? > * Even if interrupting Task threads is no longer an issue for HDFS, is it > still enough of an issue for non-HDFS usage (e.g. Cassandra) so that we still > need protection similar to what the current default value of > spark.job.interruptOnCancel provides? > * If interrupting Task threads isn't safe enough, what should we do instead? > * Once we have a safe mechanism to stop and clean up after already executing > Tasks, there is still the question of whether we _should_ end executing > Tasks. While that is likely a good thing to do in cases where individual > Tasks are lightweight in terms of resource usage, at least in some cases not > all running Tasks should be ended: https://github.com/apache/spark/pull/12436 > That means that we probably need to continue to make allowing Task > interruption configurable at the Job or JobGroup level (and we need better > documentation explaining how and when to allow interruption or not.) > * There is one place in the current code > (TaskSetManager#handleSuccessfulTask) that hard codes interruptThread to > "true". This should be fixed, and similar misuses of killTask be denied in > pull requests until this issue is adequately resolved. -- This message was sent by Atlassian JIRA (v6.3.4#6332) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
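The interruptThread=true path discussed in SPARK-17064 ultimately amounts to a plain Thread.interrupt on the task's thread, which is only effective when the running code cooperates by observing the interrupt flag. A minimal sketch (a hypothetical worker loop, not Spark's Task#kill):

```java
public class InterruptDemo {
    public static void main(String[] args) throws InterruptedException {
        Thread worker = new Thread(() -> {
            // A cooperative task loop: exits promptly once interrupted.
            while (!Thread.currentThread().isInterrupted()) {
                // simulate a unit of task work
            }
        });
        worker.start();
        worker.interrupt();      // what killTask amounts to when interruption
                                 // is enabled for the job group
        worker.join(2000);
        System.out.println(worker.isAlive());  // false: the flag was observed
    }
}
```

A task blocked in non-interruptible I/O never observes the flag, and the HDFS-1208 concern was worse: interruption at the wrong moment could corrupt client state and get the node marked dead, which is why the default has stayed "false".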
[jira] [Commented] (SPARK-17064) Reconsider spark.job.interruptOnCancel
[ https://issues.apache.org/jira/browse/SPARK-17064?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15706502#comment-15706502 ] Mark Hamstra commented on SPARK-17064: -- Ignore above PR comment -- wrong JIRA ticket at first > Reconsider spark.job.interruptOnCancel > -- > > Key: SPARK-17064 > URL: https://issues.apache.org/jira/browse/SPARK-17064 > Project: Spark > Issue Type: Improvement > Components: Scheduler, Spark Core >Reporter: Mark Hamstra > > There is a frequent need or desire in Spark to cancel already running Tasks. > This has been recognized for a very long time (see, e.g., the ancient TODO > comment in the DAGScheduler: "Cancel running tasks in the stage"), but we've > never had more than an incomplete solution. Killing running Tasks at the > Executor level has been implemented by interrupting the threads running the > Tasks (taskThread.interrupt in o.a.s.scheduler.Task#kill.) Since > https://github.com/apache/spark/commit/432201c7ee9e1ea1d70a6418cbad1c5ad2653ed3 > addressing https://issues.apache.org/jira/browse/SPARK-1582, interrupting > Task threads in this way has only been possible if interruptThread is true, > and that typically comes from the setting of the interruptOnCancel property > in the JobGroup, which in turn typically comes from the setting of > spark.job.interruptOnCancel. Because of concerns over > https://issues.apache.org/jira/browse/HDFS-1208 and the possibility of nodes > being marked dead when a Task thread is interrupted, the default value of the > boolean has been "false" -- i.e. by default we do not interrupt Tasks already > running on Executor even when the Task has been canceled in the DAGScheduler, > or the Stage has been abort, or the Job has been killed, etc. > There are several issues resulting from this current state of affairs, and > they each probably need to spawn their own JIRA issue and PR once we decide > on an overall strategy here. 
Among those issues: > * Is HDFS-1208 still an issue, or has it been resolved adequately in the HDFS > versions that Spark now supports so that we can set the default value of > spark.job.interruptOnCancel to "true" or eliminate this boolean flag entirely? > * Even if interrupting Task threads is no longer an issue for HDFS, is it > still enough of an issue for non-HDFS usage (e.g. Cassandra) so that we still > need protection similar to what the current default value of > spark.job.interruptOnCancel provides? > * If interrupting Task threads isn't safe enough, what should we do instead? > * Once we have a safe mechanism to stop and clean up after already executing > Tasks, there is still the question of whether we _should_ end executing > Tasks. While that is likely a good thing to do in cases where individual > Tasks are lightweight in terms of resource usage, at least in some cases not > all running Tasks should be ended: https://github.com/apache/spark/pull/12436 > That means that we probably need to continue to make allowing Task > interruption configurable at the Job or JobGroup level (and we need better > documentation explaining how and when to allow interruption or not.) > * There is one place in the current code > (TaskSetManager#handleSuccessfulTask) that hard codes interruptThread to > "true". This should be fixed, and similar misuses of killTask be denied in > pull requests until this issue is adequately resolved. -- This message was sent by Atlassian JIRA (v6.3.4#6332) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Created] (SPARK-18631) Avoid making data skew worse in ExchangeCoordinator
Mark Hamstra created SPARK-18631: Summary: Avoid making data skew worse in ExchangeCoordinator Key: SPARK-18631 URL: https://issues.apache.org/jira/browse/SPARK-18631 Project: Spark Issue Type: Bug Components: SQL Affects Versions: 2.0.2, 1.6.3, 2.1.0 Reporter: Mark Hamstra Assignee: Mark Hamstra The logic to resize partitions in the ExchangeCoordinator is to not start a new partition until the targetPostShuffleInputSize is equalled or exceeded. This can make data skew problems worse since a number of small partitions can first be combined as long as the combined size remains smaller than the targetPostShuffleInputSize, and then a large, data-skewed partition can be further combined, making it even bigger than it already was. It's fairly simple to change the logic to create a new partition if adding a new piece would exceed the targetPostShuffleInputSize instead of only creating a new partition after the targetPostShuffleInputSize has already been exceeded. This results in a few more partitions being created by the ExchangeCoordinator, but data skew problems are at least not made worse even though they are not made any better. -- This message was sent by Atlassian JIRA (v6.3.4#6332) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
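The two coalescing rules contrasted in SPARK-18631 can be sketched side by side. These are hypothetical helpers, not the actual ExchangeCoordinator code; the inputs stand for post-shuffle partition sizes in bytes.

```java
import java.util.ArrayList;
import java.util.List;

public class CoalesceDemo {
    // Old rule: close a bucket only after it reaches/exceeds the target, so a
    // large skewed partition can still be appended to an almost-full bucket.
    static List<Long> coalesceAfterExceeding(long[] sizes, long target) {
        List<Long> buckets = new ArrayList<>();
        long current = 0;
        for (long s : sizes) {
            current += s;
            if (current >= target) { buckets.add(current); current = 0; }
        }
        if (current > 0) buckets.add(current);
        return buckets;
    }

    // New rule: start a new bucket if adding the next piece would exceed the
    // target, so the skewed partition is never made bigger than it already is.
    static List<Long> coalesceBeforeExceeding(long[] sizes, long target) {
        List<Long> buckets = new ArrayList<>();
        long current = 0;
        for (long s : sizes) {
            if (current > 0 && current + s > target) {
                buckets.add(current);
                current = 0;
            }
            current += s;
        }
        if (current > 0) buckets.add(current);
        return buckets;
    }

    public static void main(String[] args) {
        long[] sizes = {10, 10, 10, 200};   // three small partitions, one skewed
        System.out.println(coalesceAfterExceeding(sizes, 100));   // [230]
        System.out.println(coalesceBeforeExceeding(sizes, 100));  // [30, 200]
    }
}
```

With the old rule the skewed 200-byte partition is glued onto the 30 bytes already accumulated, producing a 230-byte bucket; with the new rule it stays at 200, at the cost of one extra (30-byte) partition. A single piece larger than the target forms its own bucket under both rules.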
[jira] [Commented] (SPARK-17911) Scheduler does not need messageScheduler for ResubmitFailedStages
[ https://issues.apache.org/jira/browse/SPARK-17911?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15583334#comment-15583334 ] Mark Hamstra commented on SPARK-17911: -- I think we're pretty much on the same page when it comes to the net effects of just eliminating the RESUBMIT_TIMEOUT delay. I need to find some time to think about what something better than the current delayed-resubmit-event approach would look like. > Scheduler does not need messageScheduler for ResubmitFailedStages > - > > Key: SPARK-17911 > URL: https://issues.apache.org/jira/browse/SPARK-17911 > Project: Spark > Issue Type: Improvement > Components: Scheduler >Affects Versions: 2.0.0 >Reporter: Imran Rashid > > It's not totally clear what the purpose of the {{messageScheduler}} is in > {{DAGScheduler}}. It can perhaps be eliminated completely; or perhaps we > should just clearly document its purpose. > This comes from a long discussion w/ [~markhamstra] on an unrelated PR here: > https://github.com/apache/spark/pull/15335/files/c80ad22a242255cac91cce2c7c537f9b21100f70#diff-6a9ff7fb74fd490a50462d45db2d5e11 > But it's tricky, so breaking it out here for archiving the discussion. > Note: this issue requires a decision on what to do before a code change, so > let's just discuss it on JIRA first. -- This message was sent by Atlassian JIRA (v6.3.4#6332) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Created] (SPARK-17769) Some FetchFailure refactoring in the DAGScheduler
Mark Hamstra created SPARK-17769: Summary: Some FetchFailure refactoring in the DAGScheduler Key: SPARK-17769 URL: https://issues.apache.org/jira/browse/SPARK-17769 Project: Spark Issue Type: Improvement Components: Scheduler Reporter: Mark Hamstra Assignee: Mark Hamstra Priority: Minor SPARK-17644 opened up a discussion about further refactoring of the DAGScheduler's handling of FetchFailure events. These include: * rewriting code and comments to improve readability * doing fetchFailedAttemptIds.add(stageAttemptId) even when disallowStageRetryForTest is true * issuing a ResubmitFailedStages event based on whether one is already enqueued for the current failed stage, not any prior failed stage * logging the resubmission of all failed stages -- This message was sent by Atlassian JIRA (v6.3.4#6332) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Updated] (SPARK-17529) On highly skewed data, outer join merges are slow
[ https://issues.apache.org/jira/browse/SPARK-17529?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Mark Hamstra updated SPARK-17529: - Priority: Major (was: Trivial) > On highly skewed data, outer join merges are slow > - > > Key: SPARK-17529 > URL: https://issues.apache.org/jira/browse/SPARK-17529 > Project: Spark > Issue Type: Improvement > Components: Spark Core >Affects Versions: 1.6.2, 2.0.0 >Reporter: David C Navas > > All timings were taken from 1.6.2, but it appears that 2.0.0 suffers from the > same performance problem. > My co-worker Yewei Zhang was investigating a performance problem with a > highly skewed dataset. > "Part of this query performs a full outer join over [an ID] on highly skewed > data. On the left view, there is one record for id = 0 out of 2,272,486 > records; On the right view there are 8,353,097 records for id = 0 out of > 12,685,073 records" > The sub-query was taking 5.2 minutes. We discovered that snappy was > responsible for some measure of this problem and incorporated the new snappy > release. This brought the sub-query down to 2.4 minutes. A large percentage > of the remaining time was spent in the merge code which I tracked down to a > BitSet clearing issue. We have noted that you already have the snappy fix, > this issue describes the problem with the BitSet. > The BitSet grows to handle the largest matching set of keys and is used to > track joins. The BitSet is re-used in all subsequent joins (unless it is too > small) > The skewing of our data caused a very large BitSet to be allocated on the > very first row joined. Unfortunately, the entire BitSet is cleared on each > re-use. For each of the remaining rows which likely match only a few rows on > the other side, the entire 1MB of the BitSet is cleared. If unpartitioned, > this would happen roughly 6 million times. The fix I developed for this is > to clear only the portion of the BitSet that is needed. 
After applying it, > the sub-query dropped from 2.4 minutes to 29 seconds. > Small (0 or negative) IDs are often used as place-holders for null, empty, or > unknown data, so I expect this fix to be generally useful, if rarely > encountered to this particular degree. -- This message was sent by Atlassian JIRA (v6.3.4#6332) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
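The fix described in SPARK-17529 — clearing only the used portion of the reusable BitSet instead of its full capacity — can be illustrated with java.util.BitSet's range clear. This is only a sketch of the idea: the actual patch targets the bitset used in Spark's outer-join merge path, and the "track the highest used bit per key" bookkeeping here is an assumption for illustration.

```java
import java.util.BitSet;

public class PartialClearDemo {
    public static void main(String[] args) {
        // Sized once by the skewed key's ~8M matching rows, then reused for
        // every subsequent key in the merge.
        BitSet matched = new BitSet(8 * 1024 * 1024);

        // A typical later key matches only a handful of rows:
        matched.set(3);
        matched.set(17);
        int usedBits = 18;   // highest set bit + 1, tracked while joining

        // Problematic behavior: clearing the full ~1 MB backing array on
        // every row. The fix: clear only the range that was actually used.
        matched.clear(0, usedBits);   // O(usedBits), not O(capacity)

        System.out.println(matched.isEmpty());   // true
    }
}
```

Clearing 18 bits instead of 8M per joined row turns millions of 1 MB sweeps into near-no-ops, consistent with the reported drop from 2.4 minutes to 29 seconds.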
[jira] [Updated] (SPARK-17064) Reconsider spark.job.interruptOnCancel
[ https://issues.apache.org/jira/browse/SPARK-17064?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Mark Hamstra updated SPARK-17064: - Description: There is a frequent need or desire in Spark to cancel already running Tasks. This has been recognized for a very long time (see, e.g., the ancient TODO comment in the DAGScheduler: "Cancel running tasks in the stage"), but we've never had more than an incomplete solution. Killing running Tasks at the Executor level has been implemented by interrupting the threads running the Tasks (taskThread.interrupt in o.a.s.scheduler.Task#kill.) Since https://github.com/apache/spark/commit/432201c7ee9e1ea1d70a6418cbad1c5ad2653ed3 addressing https://issues.apache.org/jira/browse/SPARK-1582, interrupting Task threads in this way has only been possible if interruptThread is true, and that typically comes from the setting of the interruptOnCancel property in the JobGroup, which in turn typically comes from the setting of spark.job.interruptOnCancel. Because of concerns over https://issues.apache.org/jira/browse/HDFS-1208 and the possibility of nodes being marked dead when a Task thread is interrupted, the default value of the boolean has been "false" -- i.e. by default we do not interrupt Tasks already running on Executor even when the Task has been canceled in the DAGScheduler, or the Stage has been abort, or the Job has been killed, etc. There are several issues resulting from this current state of affairs, and they each probably need to spawn their own JIRA issue and PR once we decide on an overall strategy here. Among those issues: * Is HDFS-1208 still an issue, or has it been resolved adequately in the HDFS versions that Spark now supports so that we can set the default value of spark.job.interruptOnCancel to "true" or eliminate this boolean flag entirely? * Even if interrupting Task threads is no longer an issue for HDFS, is it still enough of an issue for non-HDFS usage (e.g. 
Cassandra) so that we still need protection similar to what the current default value of spark.job.interruptOnCancel provides? * If interrupting Task threads isn't safe enough, what should we do instead? * Once we have a safe mechanism to stop and clean up after already executing Tasks, there is still the question of whether we _should_ end executing Tasks. While that is likely a good thing to do in cases where individual Tasks are lightweight in terms of resource usage, at least in some cases not all running Tasks should be ended: https://github.com/apache/spark/pull/12436 That means that we probably need to continue to make allowing Task interruption configurable at the Job or JobGroup level (and we need better documentation explaining how and when to allow interruption or not.) * There is one place in the current code (TaskSetManager#handleSuccessfulTask) that hard codes interruptThread to "true". This should be fixed, and similar misuses of killTask be denied in pull requests until this issue is adequately resolved. was: There is a frequent need or desire in Spark to cancel already running Tasks. This has been recognized for a very long time (see, e.g., the ancient TODO comment in the DAGScheduler: "Cancel running tasks in the stage"), but we've never had more than an incomplete solution. Killing running Tasks at the Executor level has been implemented by interrupting the threads running the Tasks (taskThread.interrupt in o.a.s.scheduler.Task#kill.) Since https://github.com/apache/spark/commit/432201c7ee9e1ea1d70a6418cbad1c5ad2653ed3 addressing https://issues.apache.org/jira/browse/SPARK-1582, interrupting Task threads in this way has only been possible if interruptThread is true, and that typically comes from the setting of the interruptOnCancel property in the JobGroup, which in turn typically comes from the setting of spark.job.interruptOnCancel. 
Because of concerns over https://issues.apache.org/jira/browse/HDFS-1208 and the possibility of nodes being marked dead when a Task thread is interrupted, the default value of the boolean has been "false" -- i.e. by default we do not interrupt Tasks already running on Executor even when the Task has been canceled in the DAGScheduler, or the Stage has been abort, or the Job has been killed, etc. There are several issues resulting from this current state of affairs, and they each probably need to spawn their own JIRA issue and PR once we decide on an overall strategy here. Among those issues: * Is HDFS-1208 still an issue, or has it been resolved adequately in the HDFS versions that Spark now supports so that we set the default value of spark.job.interruptOnCancel to "true" or eliminate this boolean flag entirely? * Even if interrupting Task threads is no longer an issue for HDFS, is it still enough of an issue for non-HDFS usage (e.g. Cassandra) so that we still need protection similar to what the current
[jira] [Commented] (SPARK-17064) Reconsider spark.job.interruptOnCancel
[ https://issues.apache.org/jira/browse/SPARK-17064?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15421503#comment-15421503 ] Mark Hamstra commented on SPARK-17064: -- [~kayousterhout] [~r...@databricks.com] [~imranr] > Reconsider spark.job.interruptOnCancel > -- > > Key: SPARK-17064 > URL: https://issues.apache.org/jira/browse/SPARK-17064 > Project: Spark > Issue Type: Improvement > Components: Scheduler, Spark Core >Reporter: Mark Hamstra > > There is a frequent need or desire in Spark to cancel already running Tasks. > This has been recognized for a very long time (see, e.g., the ancient TODO > comment in the DAGScheduler: "Cancel running tasks in the stage"), but we've > never had more than an incomplete solution. Killing running Tasks at the > Executor level has been implemented by interrupting the threads running the > Tasks (taskThread.interrupt in o.a.s.scheduler.Task#kill.) Since > https://github.com/apache/spark/commit/432201c7ee9e1ea1d70a6418cbad1c5ad2653ed3 > addressing https://issues.apache.org/jira/browse/SPARK-1582, interrupting > Task threads in this way has only been possible if interruptThread is true, > and that typically comes from the setting of the interruptOnCancel property > in the JobGroup, which in turn typically comes from the setting of > spark.job.interruptOnCancel. Because of concerns over > https://issues.apache.org/jira/browse/HDFS-1208 and the possibility of nodes > being marked dead when a Task thread is interrupted, the default value of the > boolean has been "false" -- i.e. by default we do not interrupt Tasks already > running on Executor even when the Task has been canceled in the DAGScheduler, > or the Stage has been abort, or the Job has been killed, etc. > There are several issues resulting from this current state of affairs, and > they each probably need to spawn their own JIRA issue and PR once we decide > on an overall strategy here. 
Among those issues: > * Is HDFS-1208 still an issue, or has it been resolved adequately in the HDFS > versions that Spark now supports so that we can set the default value of > spark.job.interruptOnCancel to "true" or eliminate this boolean flag entirely? > * Even if interrupting Task threads is no longer an issue for HDFS, is it > still enough of an issue for non-HDFS usage (e.g. Cassandra) so that we still > need protection similar to what the current default value of > spark.job.interruptOnCancel provides? > * If interrupting Task threads isn't safe enough, what should we do instead? > * Once we have a safe mechanism to stop and clean up after already executing > Tasks, there is still the question of whether we _should_ end executing > Tasks. While that is likely a good thing to do in cases where individual > Tasks are lightweight in terms of resource usage, at least in some cases not > all running Tasks should be ended: https://github.com/apache/spark/pull/12436 > That means that we probably need to continue to make allowing Task > interruption configurable at the Job or JobGroup level (and we need better > documentation explaining how and when to allow interruption or not.) > * There is one place in the current code > (TaskSetManager#handleSuccessfulTask) that hard codes interruptThread to > "true". This should be fixed, and similar misuses of killTask should be denied in > pull requests until this issue is adequately resolved. -- This message was sent by Atlassian JIRA (v6.3.4#6332) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
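The kill path described above can be sketched generically. This is a hedged illustration of interrupt-gated cancellation in plain Java, loosely modeled on Task#kill(interruptThread); it is not Spark's actual Task/TaskRunner code, and all class and method names here are made up for the example:

```java
// Hedged sketch of interrupt-gated task cancellation. The kill flag gives
// a cooperative stop; the interrupt additionally unblocks waiting calls.
public class InterruptibleTask {
    private volatile boolean killed = false;
    private volatile Thread taskThread;

    public void run() {
        taskThread = Thread.currentThread();
        // Cooperative loop: checks the kill flag between units of work and
        // also notices an interrupt delivered while the thread is blocked.
        while (!killed && !Thread.currentThread().isInterrupted()) {
            Thread.yield(); // placeholder for one unit of real work
        }
    }

    // With interruptThread == false (the default, per the HDFS-1208
    // concern), the task stops only at its next check of the kill flag;
    // with true, blocking I/O waits and sleeps are also unblocked.
    public void kill(boolean interruptThread) {
        killed = true;
        Thread t = taskThread;
        if (interruptThread && t != null) {
            t.interrupt();
        }
    }
}
```

The point of the gate is visible in kill(): setting the flag is always safe, while interrupt() is the part that HDFS-1208-era clients could not tolerate.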
[jira] [Created] (SPARK-17064) Reconsider spark.job.interruptOnCancel
Mark Hamstra created SPARK-17064: Summary: Reconsider spark.job.interruptOnCancel Key: SPARK-17064 URL: https://issues.apache.org/jira/browse/SPARK-17064 Project: Spark Issue Type: Improvement Components: Scheduler, Spark Core Reporter: Mark Hamstra There is a frequent need or desire in Spark to cancel already running Tasks. This has been recognized for a very long time (see, e.g., the ancient TODO comment in the DAGScheduler: "Cancel running tasks in the stage"), but we've never had more than an incomplete solution. Killing running Tasks at the Executor level has been implemented by interrupting the threads running the Tasks (taskThread.interrupt in o.a.s.scheduler.Task#kill.) Since https://github.com/apache/spark/commit/432201c7ee9e1ea1d70a6418cbad1c5ad2653ed3 addressing https://issues.apache.org/jira/browse/SPARK-1582, interrupting Task threads in this way has only been possible if interruptThread is true, and that typically comes from the setting of the interruptOnCancel property in the JobGroup, which in turn typically comes from the setting of spark.job.interruptOnCancel. Because of concerns over https://issues.apache.org/jira/browse/HDFS-1208 and the possibility of nodes being marked dead when a Task thread is interrupted, the default value of the boolean has been "false" -- i.e. by default we do not interrupt Tasks already running on Executors even when the Task has been canceled in the DAGScheduler, or the Stage has been aborted, or the Job has been killed, etc. There are several issues resulting from this current state of affairs, and they each probably need to spawn their own JIRA issue and PR once we decide on an overall strategy here. Among those issues: * Is HDFS-1208 still an issue, or has it been resolved adequately in the HDFS versions that Spark now supports so that we can set the default value of spark.job.interruptOnCancel to "true" or eliminate this boolean flag entirely? 
* Even if interrupting Task threads is no longer an issue for HDFS, is it still enough of an issue for non-HDFS usage (e.g. Cassandra) so that we still need protection similar to what the current default value of spark.job.interruptOnCancel provides? * If interrupting Task threads isn't safe enough, what should we do instead? * Once we have a safe mechanism to stop and clean up after already executing Tasks, there is still the question of whether we _should_ end executing Tasks. While that is likely a good thing to do in cases where individual Tasks are lightweight in terms of resource usage, at least in some cases not all running Tasks should be ended: https://github.com/apache/spark/pull/12436 That means that we probably need to continue to make allowing Task interruption configurable at the Job or JobGroup level (and we need better documentation explaining how and when to allow interruption or not.) * There is one place in the current code (TaskSetManager#handleSuccessfulTask) that hard codes interruptThread to "true". This should be fixed, and similar misuses of killTask should be denied in pull requests until this issue is adequately resolved. -- This message was sent by Atlassian JIRA (v6.3.4#6332) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Commented] (SPARK-16693) Remove R deprecated methods
[ https://issues.apache.org/jira/browse/SPARK-16693?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15392517#comment-15392517 ] Mark Hamstra commented on SPARK-16693: -- As much as makes sense and is possible, we should also strive to coordinate removal of stuff deprecated in 2.0.0 of SparkR with removal of stuff deprecated elsewhere in Spark 2.0.0. It's just easier for all parties concerned to be able to say that all the things were removed as of release x.y.z than to have Spark developers and users need to keep track that SparkR removed stuff in release x.y.a, Spark SQL removed stuff in x.y.b, etc. > Remove R deprecated methods > --- > > Key: SPARK-16693 > URL: https://issues.apache.org/jira/browse/SPARK-16693 > Project: Spark > Issue Type: Bug > Components: SparkR >Affects Versions: 2.0.0 >Reporter: Felix Cheung > > For methods deprecated in Spark 2.0.0, we should remove them in 2.1.0 -- This message was sent by Atlassian JIRA (v6.3.4#6332) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Commented] (SPARK-16693) Remove R deprecated methods
[ https://issues.apache.org/jira/browse/SPARK-16693?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15390865#comment-15390865 ] Mark Hamstra commented on SPARK-16693: -- The 2.1.0 release is the very earliest that we can allow removal of things first deprecated in 2.0.0. That is different from saying that we *should* remove those things in 2.1.0. The decision as to exactly when we should and will remove things first deprecated in 2.0.0 needs more discussion. > Remove R deprecated methods > --- > > Key: SPARK-16693 > URL: https://issues.apache.org/jira/browse/SPARK-16693 > Project: Spark > Issue Type: Bug > Components: SparkR >Affects Versions: 2.0.0 >Reporter: Felix Cheung > > For methods deprecated in Spark 2.0.0, we should remove them in 2.1.0 -- This message was sent by Atlassian JIRA (v6.3.4#6332) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Commented] (SPARK-11153) Turns off Parquet filter push-down for string and binary columns
[ https://issues.apache.org/jira/browse/SPARK-11153?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15311252#comment-15311252 ] Mark Hamstra commented on SPARK-11153: -- If I am not mistaken, Parquet 1.8.1 and filter push-down for string and binary remain in the master branch; the reversion only affects branch 2.0. There is an additional bug with filter push-down for timestamps that Ian has tracked down, and that should now be addressable in master. > Turns off Parquet filter push-down for string and binary columns > > > Key: SPARK-11153 > URL: https://issues.apache.org/jira/browse/SPARK-11153 > Project: Spark > Issue Type: Bug > Components: SQL >Affects Versions: 1.5.0, 1.5.1 >Reporter: Cheng Lian >Assignee: Cheng Lian >Priority: Blocker > Fix For: 1.5.2, 1.6.0 > > > Due to PARQUET-251, {{BINARY}} columns in existing Parquet files may be > written with corrupted statistics information. This information is used by > filter push-down optimization. Since Spark 1.5 turns on Parquet filter > push-down by default, we may end up with wrong query results. PARQUET-251 has > been fixed in parquet-mr 1.8.1, but Spark 1.5 is still using 1.7.0. > Note that this kind of corrupted Parquet files could be produced by any > Parquet data models. > This affects all Spark SQL data types that can be mapped to Parquet > {{BINARY}}, namely: > - {{StringType}} > - {{BinaryType}} > - {{DecimalType}} (but Spark SQL doesn't support pushing down {{DecimalType}} > columns for now.) > To avoid wrong query results, we should disable filter push-down for columns > of {{StringType}} and {{BinaryType}} until we upgrade to parquet-mr 1.8. -- This message was sent by Atlassian JIRA (v6.3.4#6332) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Commented] (SPARK-15176) Job Scheduling Within Application Suffers from Priority Inversion
[ https://issues.apache.org/jira/browse/SPARK-15176?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15311073#comment-15311073 ] Mark Hamstra commented on SPARK-15176: -- I'm not strongly committed to any API (other than the fact that most of it is already public), but I think we should strive to have as much symmetry as makes sense between the-thing-that-enforces-a-lower-resource-bound and the-thing-that-enforces-an-upper-resource-bound on a pool. That includes how the documentation talks about these things. To encumber the discussion with something that likely needs to go into another JIRA and additional PRs, what I think we want longer term is not just a static upper bound on the number of cores that a pool can use, but rather to allow the pool to acquire as many cores over its minShare as are available, but for running Tasks to be preemptible until no more than the maximum number of cores for that pool are used. With static upper bounds, we're likely to leave cores unused and for Jobs to take longer than they need to in many instances. > Job Scheduling Within Application Suffers from Priority Inversion > - > > Key: SPARK-15176 > URL: https://issues.apache.org/jira/browse/SPARK-15176 > Project: Spark > Issue Type: Bug > Components: Scheduler >Affects Versions: 1.6.1 >Reporter: Nick White > > Say I have two pools, and N cores in my cluster: > * I submit a job to one, which has M >> N tasks > * N of the M tasks are scheduled > * I submit a job to the second pool - but none of its tasks get scheduled > until a task from the other pool finishes! > This can lead to unbounded denial-of-service for the second pool - regardless > of `minShare` or `weight` settings. Ideally Spark would support a pre-emption > mechanism, or an upper bound on a pool's resource usage. -- This message was sent by Atlassian JIRA (v6.3.4#6332) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Commented] (SPARK-15176) Job Scheduling Within Application Suffers from Priority Inversion
[ https://issues.apache.org/jira/browse/SPARK-15176?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15304589#comment-15304589 ] Mark Hamstra commented on SPARK-15176: -- It's not an unreasonable use case, and is similar in many respects to the issues that arise when trying to do multi-tenancy within a single Spark Application -- i.e. you don't want a single query or ingestion Job to consume all the cluster resources, blocking other Jobs from making any progress. There currently isn't a good way to do this within a single Application since even the fair-scheduling pools don't have a way to cap the resources that a particular Job can grab and hold onto indefinitely as long as no other Job is asking for those resources at the point where the greedy Job makes its resource grab. As a consequence, the only way to address these kinds of maximum resource usage requirements at present is to run multiple Applications while capping the maximum resource usage for each Application. That, of course, makes sharing RDDs much less convenient. I'm not convinced that limiting the number of tasks a given pool can run is the correct approach. There is, however, an interesting asymmetry in the current fair-scheduling. The minShare property can set the minimum number of cores that a pool will require to be able to schedule anything, but there is no corresponding property to cap the maximum number of cores that a pool will use. That kind of maxShare cap is really what this JIRA and https://github.com/apache/spark/pull/12951 are, or should be, driving at -- the discussion is currently a little muddled with its emphasis on limiting Tasks when the focus of the discussion should really be on limiting the cores available to a given pool. If we want to be able to guarantee that one Job within an Application cannot block all other Jobs from making any progress, then we need this maxShare change; else we are forcing users to run multiple Applications. 
> Job Scheduling Within Application Suffers from Priority Inversion > - > > Key: SPARK-15176 > URL: https://issues.apache.org/jira/browse/SPARK-15176 > Project: Spark > Issue Type: Bug > Components: Scheduler >Affects Versions: 1.6.1 >Reporter: Nick White > > Say I have two pools, and N cores in my cluster: > * I submit a job to one, which has M >> N tasks > * N of the M tasks are scheduled > * I submit a job to the second pool - but none of its tasks get scheduled > until a task from the other pool finishes! > This can lead to unbounded denial-of-service for the second pool - regardless > of `minShare` or `weight` settings. Ideally Spark would support a pre-emption > mechanism, or an upper bound on a pool's resource usage. -- This message was sent by Atlassian JIRA (v6.3.4#6332) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
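To make the asymmetry discussed above concrete, here is a hypothetical sketch of pool accounting with both a minShare floor (which Spark's fair scheduler has) and a maxShare cap (which it does not -- the cap is exactly what is being argued for). All names are illustrative and none of this is Spark's actual Pool code:

```java
// Hypothetical pool accounting: minShare exists in Spark's fair scheduler;
// maxShare is the proposed, currently nonexistent upper bound.
public class PoolShare {
    public final int minShare;   // cores guaranteed to the pool before fair sharing
    public final int maxShare;   // proposed cap on cores the pool may hold
    public int runningCores = 0;

    public PoolShare(int minShare, int maxShare) {
        this.minShare = minShare;
        this.maxShare = maxShare;
    }

    // Needy pools (below minShare) schedule first: the "floor" half that
    // the fair scheduler already implements.
    public boolean belowMinShare() {
        return runningCores < minShare;
    }

    // Under the proposed cap, a pool takes another free core only while it
    // is below maxShare. Today the only bound is total cluster capacity,
    // which is how one greedy Job can starve every other pool.
    public boolean mayAcquireCore(int freeCores) {
        return freeCores > 0 && runningCores < maxShare;
    }
}
```

As the comment thread notes, a static cap like this can strand idle cores; the longer-term idea floated above is preemption down to maxShare rather than a hard refusal to schedule.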
[jira] [Commented] (SPARK-14582) Increase the parallelism for small tables
[ https://issues.apache.org/jira/browse/SPARK-14582?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15240389#comment-15240389 ] Mark Hamstra commented on SPARK-14582: -- The total absence of any description in both this JIRA and the accompanying PR is really annoying -- especially because I find that queries involving small tables frequently suffer from using too much parallelism, not too little. > Increase the parallelism for small tables > - > > Key: SPARK-14582 > URL: https://issues.apache.org/jira/browse/SPARK-14582 > Project: Spark > Issue Type: Improvement > Components: SQL >Reporter: Davies Liu >Assignee: Davies Liu > -- This message was sent by Atlassian JIRA (v6.3.4#6332) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Comment Edited] (SPARK-9882) Priority-based scheduling for Spark applications
[ https://issues.apache.org/jira/browse/SPARK-9882?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15234024#comment-15234024 ] Mark Hamstra edited comment on SPARK-9882 at 4/10/16 9:49 AM: -- This isn't a very well written JIRA. You are just duplicating some of the description you gave in the PR. That description is good for a PR -- it explains what is being done. That is not good for a JIRA ticket, which should explain why something should be changed or added. In other words, the "Motivation" section of your PR description really belongs here in the JIRA description. was (Author: markhamstra): This isn't a very well written JIRA. You are just duplicating the description you gave in the PR. That description is good for a PR -- it explains what is being done. That is not good for a JIRA ticket, which should explain why something should be changed or added. > Priority-based scheduling for Spark applications > > > Key: SPARK-9882 > URL: https://issues.apache.org/jira/browse/SPARK-9882 > Project: Spark > Issue Type: New Feature > Components: Spark Core >Reporter: Liang-Chi Hsieh > > This is a priority-based scheduling based on the interface proposed in #7958. > It uses a XML configuration file to specify the submission pools for > applications. This patch adds a new parameter `--pool` to SparkSubmit under > standalone mode to specify which pool the submitted application should be > assigned to. The priority of the submitted application is defined by the > assigned pool. It also defines the cores it can acquire. -- This message was sent by Atlassian JIRA (v6.3.4#6332) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Commented] (SPARK-9882) Priority-based scheduling for Spark applications
[ https://issues.apache.org/jira/browse/SPARK-9882?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15234024#comment-15234024 ] Mark Hamstra commented on SPARK-9882: - This isn't a very well written JIRA. You are just duplicating the description you gave in the PR. That description is good for a PR -- it explains what is being done. That is not good for a JIRA ticket, which should explain why something should be changed or added. > Priority-based scheduling for Spark applications > > > Key: SPARK-9882 > URL: https://issues.apache.org/jira/browse/SPARK-9882 > Project: Spark > Issue Type: New Feature > Components: Spark Core >Reporter: Liang-Chi Hsieh > > This is a priority-based scheduling based on the interface proposed in #7958. > It uses a XML configuration file to specify the submission pools for > applications. This patch adds a new parameter `--pool` to SparkSubmit under > standalone mode to specify which pool the submitted application should be > assigned to. The priority of the submitted application is defined by the > assigned pool. It also defines the cores it can acquire. -- This message was sent by Atlassian JIRA (v6.3.4#6332) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Commented] (SPARK-13806) SQL round() produces incorrect results for negative values
[ https://issues.apache.org/jira/browse/SPARK-13806?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15205134#comment-15205134 ] Mark Hamstra commented on SPARK-13806: -- Yes, there is the mostly orthogonal question about which rounding strategy should be used -- see the comments in SPARK-8279. But, assuming that we are adopting the ROUND_HALF_UP strategy, there is the problem with negative values that this JIRA points out: When using ROUND_HALF_UP and scale == 0, -x.5 must round to -(x+1), but Math.round will round it to -x. In addition to this, the code gen for rounding of negative floating point values with negative scales is broken. All of this stems from Spark SQL's implementation of round() being untested with negative values. > SQL round() produces incorrect results for negative values > -- > > Key: SPARK-13806 > URL: https://issues.apache.org/jira/browse/SPARK-13806 > Project: Spark > Issue Type: Bug > Components: SQL >Affects Versions: 1.6.0, 1.6.1, 2.0.0 >Reporter: Mark Hamstra > > Round in catalyst/expressions/mathExpressions.scala appears to be untested > with negative values, and it doesn't handle them correctly. > There are at least two issues here: > First, in the genCode for FloatType and DoubleType with _scale == 0, round() > will not produce the same results as for the BigDecimal.ROUND_HALF_UP > strategy used in all other cases. This is because Math.round is used for > these _scale == 0 cases. For example, Math.round(-3.5) is -3, while > BigDecimal.ROUND_HALF_UP at scale 0 for -3.5 is -4. > Even after this bug is fixed with something like... 
> {code} > if (${ce.value} < 0) { > ${ev.value} = -1 * Math.round(-1 * ${ce.value}); > } else { > ${ev.value} = Math.round(${ce.value}); > } > {code} > ...which will allow an additional test like this to succeed in > MathFunctionsSuite.scala: > {code} > checkEvaluation(Round(-3.5D, 0), -4.0D, EmptyRow) > {code} > ...there still appears to be a problem on at least the > checkEvalutionWithUnsafeProjection path, where failures like this are > produced: > {code} > Incorrect evaluation in unsafe mode: round(-3.141592653589793, -6), actual: > [0,0], expected: [0,8000] (ExpressionEvalHelper.scala:145) > {code} -- This message was sent by Atlassian JIRA (v6.3.4#6332) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
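The half-rounding discrepancy described in the report can be checked directly in plain Java. This snippet is standalone and only mirrors the quoted genCode workaround; it is not Spark's actual codegen:

```java
import java.math.BigDecimal;
import java.math.RoundingMode;

public class RoundNegatives {
    public static void main(String[] args) {
        // Math.round computes floor(x + 0.5), so halves round toward
        // positive infinity: -3.5 becomes -3, not -4.
        long viaMathRound = Math.round(-3.5d);                  // -3
        // HALF_UP rounds halves away from zero: -3.5 becomes -4.
        long viaHalfUp = new BigDecimal("-3.5")
                .setScale(0, RoundingMode.HALF_UP).longValue(); // -4
        System.out.println(viaMathRound + " vs " + viaHalfUp);  // prints "-3 vs -4"

        // The workaround quoted above: reflect negatives, round, reflect back.
        double x = -3.5d;
        long fixed = (x < 0) ? -1 * Math.round(-1 * x) : Math.round(x);
        System.out.println(fixed);                              // prints "-4"
    }
}
```

This is why the scale == 0 fast path that calls Math.round disagrees with the BigDecimal.ROUND_HALF_UP strategy used everywhere else.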
[jira] [Commented] (SPARK-13872) Memory leak in SortMergeOuterJoin
[ https://issues.apache.org/jira/browse/SPARK-13872?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15196561#comment-15196561 ] Mark Hamstra commented on SPARK-13872: -- [~joshrosen] > Memory leak in SortMergeOuterJoin > - > > Key: SPARK-13872 > URL: https://issues.apache.org/jira/browse/SPARK-13872 > Project: Spark > Issue Type: Bug > Components: SQL >Affects Versions: 1.6.1 >Reporter: Ian > Attachments: Screen Shot 2016-03-11 at 5.42.32 PM.png > > > SortMergeJoin composes its partition/iterator from > org.apache.spark.sql.execution.Sort, which in turn delegates the sorting to > UnsafeExternalRowSorter. > UnsafeExternalRowSorter's implementation cleans up the resources when: > 1. org.apache.spark.sql.catalyst.util.AbstractScalaRowIterator is fully > iterated. > 2. the task has finished execution. > In the outer join case of SortMergeJoin, when the left or right iterator is not > fully iterated, the only chance for the resources to be cleaned up is at the > end of the spark task run. > This is probably ok most of the time, however when a SortMergeOuterJoin is > nested within a CartesianProduct, the "deferred" resource cleanup allows a > non-ignorable memory leak amplified/accumulated by the loop driven by the > CartesianRdd's looping iteration. -- This message was sent by Atlassian JIRA (v6.3.4#6332) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
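The cleanup pattern at issue can be sketched with a toy iterator that, like the sorter-backed iterator described in the report, releases its buffers only once fully consumed or when the task ends. The class below is illustrative, not Spark's:

```java
import java.util.Iterator;

// Toy model of an iterator whose backing buffers are freed only on full
// iteration (or by a task-completion callback). Partially consumed by an
// outer join, the buffers linger; nested inside a cartesian-product loop,
// many such partially consumed iterators linger at once.
public class SortedRowIterator implements Iterator<Integer> {
    private final int[] rows;   // stands in for sorted, buffered pages
    private int pos = 0;
    private boolean freed = false;

    public SortedRowIterator(int[] rows) { this.rows = rows; }

    @Override public boolean hasNext() {
        if (pos >= rows.length) {
            free();             // cleanup path 1: fully iterated
            return false;
        }
        return true;
    }

    @Override public Integer next() { return rows[pos++]; }

    // cleanup path 2: would be invoked by a task-completion listener
    public void free() { freed = true; }

    public boolean isFreed() { return freed; }
}
```

An outer join that stops consuming one side early never reaches the hasNext() exhaustion branch, so only the end-of-task callback ever frees the memory -- which is exactly the "deferred" cleanup the report blames for the leak.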
[jira] [Updated] (SPARK-13806) SQL round() produces incorrect results for negative values
[ https://issues.apache.org/jira/browse/SPARK-13806?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Mark Hamstra updated SPARK-13806: - Description: Round in catalyst/expressions/mathExpressions.scala appears to be untested with negative values, and it doesn't handle them correctly. There are at least two issues here: First, in the genCode for FloatType and DoubleType with _scale == 0, round() will not produce the same results as for the BigDecimal.ROUND_HALF_UP strategy used in all other cases. This is because Math.round is used for these _scale == 0 cases. For example, Math.round(-3.5) is -3, while BigDecimal.ROUND_HALF_UP at scale 0 for -3.5 is -4. Even after this bug is fixed with something like... {code} if (${ce.value} < 0) { ${ev.value} = -1 * Math.round(-1 * ${ce.value}); } else { ${ev.value} = Math.round(${ce.value}); } {code} ...which will allow an additional test like this to succeed in MathFunctionsSuite.scala: {code} checkEvaluation(Round(-3.5D, 0), -4.0D, EmptyRow) {code} ...there still appears to be a problem on at least the checkEvalutionWithUnsafeProjection path, where failures like this are produced: {code} Incorrect evaluation in unsafe mode: round(-3.141592653589793, -6), actual: [0,0], expected: [0,8000] (ExpressionEvalHelper.scala:145) {code} was: Round in catalyst/expressions/mathExpressions.scala appears to be untested with negative values, and it doesn't handle them correctly. There are at least two issues here: First, in the genCode for FloatType and DoubleType with _scale == 0, round() will not produce the same results as for the BigDecimal.ROUND_HALF_UP strategy used in all other cases. This is because Math.round is used for these _scale == 0 cases. For example, Math.round(-3.5) is -3, while BigDecimal.ROUND_HALF_UP at scale 0 for -3.5 is -4. Even after this bug is fixed with something like... 
{code} if (${ce.value} < 0) { ${ev.value} = -1 * Math.round(-1 * ${ce.value}); } else { ${ev.value} = Math.round(${ce.value}); } {code} ...which will allow an additional test like this to succeed in MathFunctionsSuite.scala: {code} checkEvaluation(Round(-3.5D, 0), -4.0D) {code} ...there still appears to be a problem on at least the checkEvalutionWithUnsafeProjection path, where failures like this are produced: {code} Incorrect evaluation in unsafe mode: round(-3.141592653589793, -6), actual: [0,0], expected: [0,8000] (ExpressionEvalHelper.scala:145) {code} > SQL round() produces incorrect results for negative values > -- > > Key: SPARK-13806 > URL: https://issues.apache.org/jira/browse/SPARK-13806 > Project: Spark > Issue Type: Bug > Components: SQL >Affects Versions: 1.6.0, 1.6.1, 2.0.0 >Reporter: Mark Hamstra > > Round in catalyst/expressions/mathExpressions.scala appears to be untested > with negative values, and it doesn't handle them correctly. > There are at least two issues here: > First, in the genCode for FloatType and DoubleType with _scale == 0, round() > will not produce the same results as for the BigDecimal.ROUND_HALF_UP > strategy used in all other cases. This is because Math.round is used for > these _scale == 0 cases. For example, Math.round(-3.5) is -3, while > BigDecimal.ROUND_HALF_UP at scale 0 for -3.5 is -4. > Even after this bug is fixed with something like... 
> {code} > if (${ce.value} < 0) { > ${ev.value} = -1 * Math.round(-1 * ${ce.value}); > } else { > ${ev.value} = Math.round(${ce.value}); > } > {code} > ...which will allow an additional test like this to succeed in > MathFunctionsSuite.scala: > {code} > checkEvaluation(Round(-3.5D, 0), -4.0D, EmptyRow) > {code} > ...there still appears to be a problem on at least the > checkEvalutionWithUnsafeProjection path, where failures like this are > produced: > {code} > Incorrect evaluation in unsafe mode: round(-3.141592653589793, -6), actual: > [0,0], expected: [0,8000] (ExpressionEvalHelper.scala:145) > {code} -- This message was sent by Atlassian JIRA (v6.3.4#6332) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Updated] (SPARK-13806) SQL round() produces incorrect results for negative values
[ https://issues.apache.org/jira/browse/SPARK-13806?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Mark Hamstra updated SPARK-13806: - Description: Round in catalyst/expressions/mathExpressions.scala appears to be untested with negative values, and it doesn't handle them correctly. There are at least two issues here: First, in the genCode for FloatType and DoubleType with _scale == 0, round() will not produce the same results as for the BigDecimal.ROUND_HALF_UP strategy used in all other cases. This is because Math.round is used for these _scale == 0 cases. For example, Math.round(-3.5) is -3, while BigDecimal.ROUND_HALF_UP at scale 0 for -3.5 is -4. Even after this bug is fixed with something like... {code} if (${ce.value} < 0) { ${ev.value} = -1 * Math.round(-1 * ${ce.value}); } else { ${ev.value} = Math.round(${ce.value}); } {code} ...which will allow an additional test like this to succeed in MathFunctionsSuite.scala: {code} checkEvaluation(Round(-3.5D, 0), -4.0D) {code} ...there still appears to be a problem on at least the checkEvalutionWithUnsafeProjection path, where failures like this are produced: {code} Incorrect evaluation in unsafe mode: round(-3.141592653589793, -6), actual: [0,0], expected: [0,8000] (ExpressionEvalHelper.scala:145) {code} was: Round in catalyst/expressions/mathExpressions.scala appears to be untested with negative values, and it doesn't handle them correctly. There are at least two issues here: 1) In the genCode for FloatType and DoubleType with _scale == 0, round() will not produce the same results as for the BigDecimal.ROUND_HALF_UP strategy used in all other cases. This is because Math.round is used for these _scale == 0 cases. For example, Math.round(-3.5) is -3, while BigDecimal.ROUND_HALF_UP at scale 0 for -3.5 is -4. Even after this bug is fixed with something like... 
{code} if (${ce.value} < 0) { ${ev.value} = -1 * Math.round(-1 * ${ce.value}); } else { ${ev.value} = Math.round(${ce.value}); } {code} ...which will allow an additional test like this to succeed in MathFunctionsSuite.scala: {code} checkEvaluation(Round(-3.5D, 0), -4.0D) {code} ...there still appears to be a problem on at least the checkEvalutionWithUnsafeProjection path, where failures like this are produced: {code} Incorrect evaluation in unsafe mode: round(-3.141592653589793, -6), actual: [0,0], expected: [0,8000] (ExpressionEvalHelper.scala:145) {code} > SQL round() produces incorrect results for negative values > -- > > Key: SPARK-13806 > URL: https://issues.apache.org/jira/browse/SPARK-13806 > Project: Spark > Issue Type: Bug > Components: SQL >Affects Versions: 1.6.0, 1.6.1, 2.0.0 >Reporter: Mark Hamstra > > Round in catalyst/expressions/mathExpressions.scala appears to be untested > with negative values, and it doesn't handle them correctly. > There are at least two issues here: > First, in the genCode for FloatType and DoubleType with _scale == 0, round() > will not produce the same results as for the BigDecimal.ROUND_HALF_UP > strategy used in all other cases. This is because Math.round is used for > these _scale == 0 cases. For example, Math.round(-3.5) is -3, while > BigDecimal.ROUND_HALF_UP at scale 0 for -3.5 is -4. > Even after this bug is fixed with something like... 
> {code} > if (${ce.value} < 0) { > ${ev.value} = -1 * Math.round(-1 * ${ce.value}); > } else { > ${ev.value} = Math.round(${ce.value}); > } > {code} > ...which will allow an additional test like this to succeed in > MathFunctionsSuite.scala: > {code} > checkEvaluation(Round(-3.5D, 0), -4.0D) > {code} > ...there still appears to be a problem on at least the > checkEvalutionWithUnsafeProjection path, where failures like this are > produced: > {code} > Incorrect evaluation in unsafe mode: round(-3.141592653589793, -6), actual: > [0,0], expected: [0,8000] (ExpressionEvalHelper.scala:145) > {code} -- This message was sent by Atlassian JIRA (v6.3.4#6332) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Created] (SPARK-13806) SQL round() produces incorrect results for negative values
Mark Hamstra created SPARK-13806: Summary: SQL round() produces incorrect results for negative values Key: SPARK-13806 URL: https://issues.apache.org/jira/browse/SPARK-13806 Project: Spark Issue Type: Bug Components: SQL Affects Versions: 1.6.1, 1.6.0, 2.0.0 Reporter: Mark Hamstra Round in catalyst/expressions/mathExpressions.scala appears to be untested with negative values, and it doesn't handle them correctly. There are at least two issues here: 1) In the genCode for FloatType and DoubleType with _scale == 0, round() will not produce the same results as for the BigDecimal.ROUND_HALF_UP strategy used in all other cases. This is because Math.round is used for these _scale == 0 cases. For example, Math.round(-3.5) is -3, while BigDecimal.ROUND_HALF_UP at scale 0 for -3.5 is -4. Even after this bug is fixed with something like... {code} if (${ce.value} < 0) { ${ev.value} = -1 * Math.round(-1 * ${ce.value}); } else { ${ev.value} = Math.round(${ce.value}); } {code} ...which will allow an additional test like this to succeed in MathFunctionsSuite.scala: {code} checkEvaluation(Round(-3.5D, 0), -4.0D) {code} ...there still appears to be a problem on at least the checkEvalutionWithUnsafeProjection path, where failures like this are produced: {code} Incorrect evaluation in unsafe mode: round(-3.141592653589793, -6), actual: [0,0], expected: [0,8000] (ExpressionEvalHelper.scala:145) {code} -- This message was sent by Atlassian JIRA (v6.3.4#6332) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Updated] (SPARK-11838) Spark SQL query fragment RDD reuse across queries
[ https://issues.apache.org/jira/browse/SPARK-11838?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Mark Hamstra updated SPARK-11838: - Summary: Spark SQL query fragment RDD reuse across queries (was: Spark SQL query fragment RDD reuse) > Spark SQL query fragment RDD reuse across queries > - > > Key: SPARK-11838 > URL: https://issues.apache.org/jira/browse/SPARK-11838 > Project: Spark > Issue Type: Improvement > Components: SQL >Reporter: Mikhail Bautin > > With many analytical Spark SQL workloads against slowly changing tables, > successive queries frequently share fragments that produce the same result. > Instead of re-computing those fragments for every query, it makes sense to > detect similar fragments and substitute RDDs previously created for matching > SparkPlan fragments into every new SparkPlan at the execution time whenever > possible. Even if no RDDs are persist()-ed to memory/disk/off-heap memory, > many stages can still be skipped due to map output files being present on > executor nodes. > The implementation involves the following steps: > (1) Logical plan "canonicalization". > Logical plans mapping to the same "canonical" logical plan should always > produce the same results (except for possible output column reordering), > although the inverse statement won't always be true. > - Re-mapping expression ids to "canonical expression ids" (successively > increasing numbers always starting with 1). > - Eliminating alias names that are unimportant after analysis completion. > Only the names that are necessary to determine the Hive table columns to be > scanned are retained. > - Reordering columns in projections, grouping/aggregation expressions, etc. > This can be done e.g. by using the string representation as a sort key. Union > inputs always have to be reordered the same way. 
> - Tree traversal has to happen starting from leaves and progressing towards > the root, because we need to already have identified canonical expression ids > for children of a node before we can come up with sort keys that would allow > us to reorder expressions in a node deterministically. This is a bit more > complicated for Union nodes. > - Special handling for MetastoreRelations. We replace MetastoreRelation > with a special class CanonicalMetastoreRelation that uses attributes and > partitionKeys as part of its equals() and hashCode() implementation, but the > visible attributes and partitionKeys are restricted to expression ids that > the rest of the query actually needs from that MetastoreRelation. > An example of logical plans and corresponding canonical logical plans: > https://gist.githubusercontent.com/mbautin/ef1317b341211d9606cf/raw > (2) Tracking LogicalPlan fragments corresponding to SparkPlan fragments. When > generating a SparkPlan, we keep an optional reference to a LogicalPlan > instance in every node. This allows us to populate the cache with mappings > from canonical logical plans of query fragments to the corresponding RDDs > generated as part of query execution. Note that there is no new work > necessary to generate the RDDs, we are merely utilizing the RDDs that would > have been produced as part of SparkPlan execution anyway. > (3) SparkPlan fragment substitution. After generating a SparkPlan and before > calling prepare() or execute() on it, we check if any of its nodes have an > associated LogicalPlan that maps to a canonical logical plan matching a cache > entry. If so, we substitute a PhysicalRDD (or a new class UnsafePhysicalRDD > wrapping an RDD of UnsafeRow) scanning the previously created RDD instead of > the current query fragment. If the expected column order differs from what > the current SparkPlan fragment produces, we add a projection to reorder the > columns. 
We also add safe/unsafe row conversions as necessary to match the > row type that is expected by the parent of the current SparkPlan fragment. > (4) The execute() method of SparkPlan also needs to perform the cache lookup > and substitution described above before producing a new RDD for the current > SparkPlan node. The "loading cache" pattern (e.g. as implemented in Guava) > allows us to reuse query fragments between simultaneously submitted queries: > whichever query runs execute() for a particular fragment's canonical logical > plan starts producing an RDD first, and if another query has a fragment with > the same canonical logical plan, it waits for the RDD to be produced by the > first query and inserts it in its SparkPlan instead. > This kind of query fragment caching will mostly be useful for slowly-changing > or static tables. Even with slowly-changing tables, the cache needs to be > invalidated when those data sets change. One of the following > approaches could be
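The expression-id remapping from step (1) of the proposal above can be sketched in a few lines. This is a hypothetical illustration with stand-in classes, not Catalyst's actual `Attribute`/`ExprId` types: canonical ids are assigned in first-seen traversal order, so two plan fragments that differ only in their raw expression ids produce identical canonical forms and can hit the same cache entry.

```python
from dataclasses import dataclass

@dataclass(frozen=True)
class Attr:
    # illustrative stand-in for a Catalyst attribute carrying an expression id
    name: str
    expr_id: int

def canonicalize_ids(attrs):
    """Remap expression ids to canonical ids (1, 2, ...) in first-seen order."""
    mapping = {}
    result = []
    for a in attrs:
        if a.expr_id not in mapping:
            mapping[a.expr_id] = len(mapping) + 1
        result.append(Attr(a.name, mapping[a.expr_id]))
    return result

# two fragments that differ only in raw expression ids canonicalize identically
p1 = canonicalize_ids([Attr("c_date", 63011), Attr("c_rain", 63017), Attr("c_date", 63011)])
p2 = canonicalize_ids([Attr("c_date", 7), Attr("c_rain", 42), Attr("c_date", 7)])
print(p1 == p2)  # True
```

A cache keyed on the canonical form (here, the list `p1`) would then match equivalent fragments across queries, which is the substitution described in step (3).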
[jira] [Comment Edited] (SPARK-13756) Reuse Query Fragments
[ https://issues.apache.org/jira/browse/SPARK-13756?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15185991#comment-15185991 ] Mark Hamstra edited comment on SPARK-13756 at 3/8/16 10:42 PM: --- Collecting the prior issues under an umbrella. was (Author: markhamstra): Fragment reuse across queries > Reuse Query Fragments > - > > Key: SPARK-13756 > URL: https://issues.apache.org/jira/browse/SPARK-13756 > Project: Spark > Issue Type: Umbrella > Components: SQL >Reporter: Mark Hamstra > > Query fragments that have been materialized in RDDs can and should be reused > either within the same query or in subsequent queries.
[jira] [Updated] (SPARK-13756) Reuse Query Fragments
[ https://issues.apache.org/jira/browse/SPARK-13756?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Mark Hamstra updated SPARK-13756: - Description: Query fragments that have been materialized in RDDs can and should be reused either within the same query or in subsequent queries. (was: Query fragments that have been materialized in RDDs can and should be reused in subsequent queries.) > Reuse Query Fragments > - > > Key: SPARK-13756 > URL: https://issues.apache.org/jira/browse/SPARK-13756 > Project: Spark > Issue Type: Umbrella > Components: SQL >Reporter: Mark Hamstra > > Query fragments that have been materialized in RDDs can and should be reused > either within the same query or in subsequent queries.
[jira] [Commented] (SPARK-13756) Reuse Query Fragments
[ https://issues.apache.org/jira/browse/SPARK-13756?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15185991#comment-15185991 ] Mark Hamstra commented on SPARK-13756: -- Fragment reuse across queries > Reuse Query Fragments > - > > Key: SPARK-13756 > URL: https://issues.apache.org/jira/browse/SPARK-13756 > Project: Spark > Issue Type: Umbrella > Components: SQL >Reporter: Mark Hamstra > > Query fragments that have been materialized in RDDs can and should be reused > in subsequent queries.
[jira] [Comment Edited] (SPARK-13523) Reuse the exchanges in a query
[ https://issues.apache.org/jira/browse/SPARK-13523?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15185967#comment-15185967 ] Mark Hamstra edited comment on SPARK-13523 at 3/8/16 10:36 PM: --- Yes that is a good point. But they are closely enough linked (e.g. in the need for plan canonicalization) that I'm going to create an ubmrella to cover both issues. was (Author: markhamstra): Yes that is a good point. But they are closely enough linked (e.g. in the need for plan canonicalization) that I'm going to create an epic to cover both issues. > Reuse the exchanges in a query > -- > > Key: SPARK-13523 > URL: https://issues.apache.org/jira/browse/SPARK-13523 > Project: Spark > Issue Type: New Feature > Components: SQL >Reporter: Davies Liu >Assignee: Davies Liu > > In exchange, the RDD will be materialized (shuffled or collected), it's a > good point to eliminate common part of a query. > In some TPCDS queries (for example, Q64), the same exchange (ShuffleExchange > or BroadcastExchange) could be used multiple times, we should re-use them to > avoid the duplicated work and reduce the memory for broadcast.
[jira] [Comment Edited] (SPARK-13523) Reuse the exchanges in a query
[ https://issues.apache.org/jira/browse/SPARK-13523?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15185967#comment-15185967 ] Mark Hamstra edited comment on SPARK-13523 at 3/8/16 10:36 PM: --- Yes that is a good point. But they are closely enough linked (e.g. in the need for plan canonicalization) that I'm going to create an umbrella to cover both issues. was (Author: markhamstra): Yes that is a good point. But they are closely enough linked (e.g. in the need for plan canonicalization) that I'm going to create an ubmrella to cover both issues. > Reuse the exchanges in a query > -- > > Key: SPARK-13523 > URL: https://issues.apache.org/jira/browse/SPARK-13523 > Project: Spark > Issue Type: New Feature > Components: SQL >Reporter: Davies Liu >Assignee: Davies Liu > > In exchange, the RDD will be materialized (shuffled or collected), it's a > good point to eliminate common part of a query. > In some TPCDS queries (for example, Q64), the same exchange (ShuffleExchange > or BroadcastExchange) could be used multiple times, we should re-use them to > avoid the duplicated work and reduce the memory for broadcast.
[jira] [Created] (SPARK-13756) Reuse Query Fragments
Mark Hamstra created SPARK-13756: Summary: Reuse Query Fragments Key: SPARK-13756 URL: https://issues.apache.org/jira/browse/SPARK-13756 Project: Spark Issue Type: Umbrella Components: SQL Reporter: Mark Hamstra Query fragments that have been materialized in RDDs can and should be reused in subsequent queries.
[jira] [Commented] (SPARK-13523) Reuse the exchanges in a query
[ https://issues.apache.org/jira/browse/SPARK-13523?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15185967#comment-15185967 ] Mark Hamstra commented on SPARK-13523: -- Yes that is a good point. But they are closely enough linked (e.g. in the need for plan canonicalization) that I'm going to create an epic to cover both issues. > Reuse the exchanges in a query > -- > > Key: SPARK-13523 > URL: https://issues.apache.org/jira/browse/SPARK-13523 > Project: Spark > Issue Type: New Feature > Components: SQL >Reporter: Davies Liu >Assignee: Davies Liu > > In exchange, the RDD will be materialized (shuffled or collected), it's a > good point to eliminate common part of a query. > In some TPCDS queries (for example, Q64), the same exchange (ShuffleExchange > or BroadcastExchange) could be used multiple times, we should re-use them to > avoid the duplicated work and reduce the memory for broadcast.
[jira] [Resolved] (SPARK-13523) Reuse the exchanges in a query
[ https://issues.apache.org/jira/browse/SPARK-13523?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Mark Hamstra resolved SPARK-13523. -- Resolution: Duplicate > Reuse the exchanges in a query > -- > > Key: SPARK-13523 > URL: https://issues.apache.org/jira/browse/SPARK-13523 > Project: Spark > Issue Type: New Feature > Components: SQL >Reporter: Davies Liu > > In exchange, the RDD will be materialized (shuffled or collected), it's a > good point to eliminate common part of a query. > In some TPCDS queries (for example, Q64), the same exchange (ShuffleExchange > or BroadcastExchange) could be used multiple times, we should re-use them to > avoid the duplicated work and reduce the memory for broadcast.
[jira] [Created] (SPARK-13087) Grouping by a complex expression may lead to incorrect AttributeReferences in aggregations
Mark Hamstra created SPARK-13087: Summary: Grouping by a complex expression may lead to incorrect AttributeReferences in aggregations Key: SPARK-13087 URL: https://issues.apache.org/jira/browse/SPARK-13087 Project: Spark Issue Type: Bug Components: SQL Affects Versions: 1.6.0 Reporter: Mark Hamstra This is a regression from 1.5. An example of the failure: Working with this table...
{code}
0: jdbc:hive2://10.1.3.203:1> DESCRIBE csd_0ae1abc1_a3af_4c63_95b0_9599faca6c3d;
+-----------------------+------------+----------+
|       col_name        | data_type  | comment  |
+-----------------------+------------+----------+
| c_date                | timestamp  | NULL     |
| c_count               | int        | NULL     |
| c_location_fips_code  | string     | NULL     |
| c_airtemp             | float      | NULL     |
| c_dewtemp             | float      | NULL     |
| c_pressure            | int        | NULL     |
| c_rain                | float      | NULL     |
| c_snow                | float      | NULL     |
+-----------------------+------------+----------+
{code}
...and this query (which isn't necessarily all that sensical or useful, but has been adapted from a similarly failing query that uses a custom UDF where the Spark SQL built-in `day` function has been substituted into this query)...
{code}
SELECT day ( c_date ) AS c_date, percentile_approx(c_rain, 0.5) AS c_expr_1256887735
FROM csd_0ae1abc1_a3af_4c63_95b0_9599faca6c3d
GROUP BY day ( c_date )
ORDER BY c_date;
{code}
Spark 1.5 produces the expected results without error. In Spark 1.6, this plan is produced...
{code}
Exchange rangepartitioning(c_date#63009 ASC,16), None
+- SortBasedAggregate(key=[dayofmonth(cast(c_date#63011 as date))#63020], functions=[(hiveudaffunction(HiveFunctionWrapper(org.apache.hadoop.hive.ql.udf.generic.GenericUDAFPercentileApprox,org.apache.hadoop.hive.ql.udf.generic.GenericUDAFPercentileApprox@6f211801),c_rain#63017,0.5,false,0,0),mode=Complete,isDistinct=false)], output=[c_date#63009,c_expr_1256887735#63010])
   +- ConvertToSafe
      +- !Sort [dayofmonth(cast(c_date#63011 as date))#63020 ASC], false, 0
         +- !TungstenExchange hashpartitioning(dayofmonth(cast(c_date#63011 as date))#63020,16), None
            +- ConvertToUnsafe
               +- HiveTableScan [c_date#63011,c_rain#63017], MetastoreRelation default, csd_0ae1abc1_a3af_4c63_95b0_9599faca6c3d, None
{code}
...which fails with a TreeNodeException and stack traces that include this...
{code}
Caused by: ! org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 2842.0 failed 4 times, most recent failure: Lost task 0.3 in stage 2842.0 (TID 15007, ip-10-1-1-59.dev.clearstory.com): org.apache.spark.sql.catalyst.errors.package$TreeNodeException: Binding attribute, tree: dayofmonth(cast(c_date#63011 as date))#63020
at org.apache.spark.sql.catalyst.errors.package$.attachTree(package.scala:49)
at org.apache.spark.sql.catalyst.expressions.BindReferences$$anonfun$bindReference$1.applyOrElse(BoundAttribute.scala:86)
at org.apache.spark.sql.catalyst.expressions.BindReferences$$anonfun$bindReference$1.applyOrElse(BoundAttribute.scala:85)
at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$3.apply(TreeNode.scala:259)
at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$3.apply(TreeNode.scala:259)
at org.apache.spark.sql.catalyst.trees.CurrentOrigin$.withOrigin(TreeNode.scala:69)
at org.apache.spark.sql.catalyst.trees.TreeNode.transformDown(TreeNode.scala:258)
at org.apache.spark.sql.catalyst.trees.TreeNode.transform(TreeNode.scala:249)
at org.apache.spark.sql.catalyst.expressions.BindReferences$.bindReference(BoundAttribute.scala:85)
at org.apache.spark.sql.catalyst.expressions.InterpretedMutableProjection$$anonfun$$init$$2.apply(Projection.scala:62)
at org.apache.spark.sql.catalyst.expressions.InterpretedMutableProjection$$anonfun$$init$$2.apply(Projection.scala:62)
at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
at scala.collection.immutable.List.foreach(List.scala:318)
at scala.collection.TraversableLike$class.map(TraversableLike.scala:244)
at scala.collection.AbstractTraversable.map(Traversable.scala:105)
at org.apache.spark.sql.catalyst.expressions.InterpretedMutableProjection.<init>(Projection.scala:62)
at org.apache.spark.sql.execution.SparkPlan$$anonfun$newMutableProjection$1.apply(SparkPlan.scala:254)
at org.apache.spark.sql.execution.SparkPlan$$anonfun$newMutableProjection$1.apply(SparkPlan.scala:254)
at
[jira] [Commented] (SPARK-8279) udf_round_3 test fails
[ https://issues.apache.org/jira/browse/SPARK-8279?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15117924#comment-15117924 ] Mark Hamstra commented on SPARK-8279: - It's a bit of an under-defined mess, actually. I don't have a SQL Standard document in front of me to be able to say whether any of the standards specifies a particular rounding scheme. W3 suggests: "Many database systems have adopted the IEEE 754 standard for arithmetic operations, according to which the default rounding behavior is 'round half to even.'" PostgreSQL and Oracle both use round half away from zero for NUMBER values but banker's rounding/half-even for BINARY_FLOAT and BINARY_DOUBLE (cf. IEEE754 Conformance for BINARY_FLOAT and BINARY_DOUBLE in https://docs.oracle.com/database/121/SQLRF/sql_elements001.htm#SQLRF0021 -- where for these types "The default rounding mode [i.e. round half to even] is supported."). Other data-related tools also don't rigorously adhere to a standard rounding scheme, and Microsoft products are not consistent across, e.g., versions of VB, .NET, SQL Server, and EXCEL. Much of the time, the precise rounding scheme being used doesn't matter much. When it does matter, though, we should at least have the ability to require that banker's rounding (and perhaps other rounding schemes) be enforced. 
> udf_round_3 test fails > -- > > Key: SPARK-8279 > URL: https://issues.apache.org/jira/browse/SPARK-8279 > Project: Spark > Issue Type: Sub-task > Components: SQL >Reporter: Reynold Xin >Assignee: Yijie Shen >Priority: Blocker > Fix For: 1.5.0 > > > query > {code} > select round(cast(negative(pow(2, 31)) as INT)), round(cast((pow(2, 31) - 1) > as INT)), round(-32769), round(32768) from src tablesample (1 rows); > {code} > {code} > [info] - udf_round_3 *** FAILED *** (4 seconds, 803 milliseconds) > [info] Failed to execute query using catalyst: > [info] Error: java.lang.Integer cannot be cast to java.lang.Double > [info] java.lang.ClassCastException: java.lang.Integer cannot be cast to > java.lang.Double > [info]at > scala.runtime.BoxesRunTime.unboxToDouble(BoxesRunTime.java:119) > [info]at > org.apache.spark.sql.catalyst.expressions.BinaryMathExpression.eval(math.scala:86) > [info]at > org.apache.spark.sql.hive.HiveInspectors$class.toInspector(HiveInspectors.scala:628) > [info]at > org.apache.spark.sql.hive.HiveGenericUdf.toInspector(hiveUdfs.scala:148) > [info]at > org.apache.spark.sql.hive.HiveGenericUdf$$anonfun$argumentInspectors$1.apply(hiveUdfs.scala:160) > [info]at > org.apache.spark.sql.hive.HiveGenericUdf$$anonfun$argumentInspectors$1.apply(hiveUdfs.scala:160) > [info]at > scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244) > [info]at > scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244) > [info]at scala.collection.immutable.List.foreach(List.scala:318) > [info]at > scala.collection.TraversableLike$class.map(TraversableLike.scala:244) > [info]at > scala.collection.AbstractTraversable.map(Traversable.scala:105) > [info]at > org.apache.spark.sql.hive.HiveGenericUdf.argumentInspectors$lzycompute(hiveUdfs.scala:160) > [info]at > org.apache.spark.sql.hive.HiveGenericUdf.argumentInspectors(hiveUdfs.scala:160) > [info]at > 
org.apache.spark.sql.hive.HiveGenericUdf.returnInspector$lzycompute(hiveUdfs.scala:164) > [info]at > org.apache.spark.sql.hive.HiveGenericUdf.returnInspector(hiveUdfs.scala:163) > [info]at > org.apache.spark.sql.hive.HiveGenericUdf.dataType$lzycompute(hiveUdfs.scala:180) > [info]at > org.apache.spark.sql.hive.HiveGenericUdf.dataType(hiveUdfs.scala:180) > [info]at > org.apache.spark.sql.catalyst.expressions.Cast.resolved$lzycompute(Cast.scala:31) > [info]at > org.apache.spark.sql.catalyst.expressions.Cast.resolved(Cast.scala:31) > [info]at > org.apache.spark.sql.catalyst.expressions.Expression$$anonfun$childrenResolved$1.apply(Expression.scala:121) > [info]at > org.apache.spark.sql.catalyst.expressions.Expression$$anonfun$childrenResolved$1.apply(Expression.scala:121) > [info]at > scala.collection.LinearSeqOptimized$class.forall(LinearSeqOptimized.scala:70) > [info]at scala.collection.immutable.List.forall(List.scala:84) > [info]at > org.apache.spark.sql.catalyst.expressions.Expression.childrenResolved(Expression.scala:121) > [info]at > org.apache.spark.sql.catalyst.expressions.Expression.resolved$lzycompute(Expression.scala:109) > [info]at > org.apache.spark.sql.catalyst.expressions.Expression.resolved(Expression.scala:109) > [info]at >
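The two rounding schemes contrasted in the comment on SPARK-8279 above can be checked directly with Python's `decimal` module, which implements both. This is a quick illustration of the semantics, not Spark code:

```python
from decimal import Decimal, ROUND_HALF_EVEN, ROUND_HALF_UP

def rnd(value, mode):
    # round a decimal string to an integer under the given rounding mode
    return int(Decimal(value).quantize(Decimal("1"), rounding=mode))

# banker's rounding (IEEE 754 round-half-to-even) breaks ties toward the even
# neighbor; HALF_UP (round half away from zero) always moves ties away from zero
print(rnd("2.5", ROUND_HALF_EVEN), rnd("2.5", ROUND_HALF_UP))    # 2 3
print(rnd("-2.5", ROUND_HALF_EVEN), rnd("-2.5", ROUND_HALF_UP))  # -2 -3
print(rnd("1.5", ROUND_HALF_EVEN), rnd("1.5", ROUND_HALF_UP))    # 2 2
```

The two schemes only disagree on exact halves whose truncated digit is even, which is why most values round identically and the choice often goes unnoticed until aggregate results drift.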
[jira] [Commented] (SPARK-12485) Rename "dynamic allocation" to "elastic scaling"
[ https://issues.apache.org/jira/browse/SPARK-12485?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15106820#comment-15106820 ] Mark Hamstra commented on SPARK-12485: -- Actually, Sean, I'd argue that they are not the same and that something like "elastic scaling" be retained for another concept. What Spark is doing with dynamic allocation is to re-allocate resources from an essentially fixed pool of cluster resources -- a particular Application may be re-assigned a different number of Executors, for example, but the total number of Worker nodes remains the same. Elastic scaling, on the other hand, I would argue should be applied to dynamically changing the number of Worker nodes or other cluster resources -- changing the size of the pool, not just re-allocating from a constant size pool. > Rename "dynamic allocation" to "elastic scaling" > > > Key: SPARK-12485 > URL: https://issues.apache.org/jira/browse/SPARK-12485 > Project: Spark > Issue Type: Sub-task > Components: Spark Core >Reporter: Andrew Or >Assignee: Andrew Or > > Fewer syllables, sounds more natural.
[jira] [Commented] (SPARK-12485) Rename "dynamic allocation" to "elastic scaling"
[ https://issues.apache.org/jira/browse/SPARK-12485?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15106966#comment-15106966 ] Mark Hamstra commented on SPARK-12485: -- Right, I am saying that "dynamic allocation" is the right name for what Spark currently does, and that "elastic scaling" would be a good name for changing the number of cluster nodes (or otherwise modifying the size of the cluster resource pool), which Spark currently doesn't do or effectively support (but which would be a very nice feature to have.) > Rename "dynamic allocation" to "elastic scaling" > > > Key: SPARK-12485 > URL: https://issues.apache.org/jira/browse/SPARK-12485 > Project: Spark > Issue Type: Sub-task > Components: Spark Core >Reporter: Andrew Or >Assignee: Andrew Or > > Fewer syllables, sounds more natural.
[jira] [Commented] (SPARK-8279) udf_round_3 test fails
[ https://issues.apache.org/jira/browse/SPARK-8279?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15090098#comment-15090098 ] Mark Hamstra commented on SPARK-8279: - Why is `round` in Spark SQL using HALF_UP instead of the typical database practice of using the IEEE 754 standard for arithmetic operations -- i.e. HALF_EVEN, a.k.a. "Banker's rounding"? > udf_round_3 test fails > -- > > Key: SPARK-8279 > URL: https://issues.apache.org/jira/browse/SPARK-8279 > Project: Spark > Issue Type: Sub-task > Components: SQL >Reporter: Reynold Xin >Assignee: Yijie Shen >Priority: Blocker > Fix For: 1.5.0 > > > query > {code} > select round(cast(negative(pow(2, 31)) as INT)), round(cast((pow(2, 31) - 1) > as INT)), round(-32769), round(32768) from src tablesample (1 rows); > {code} > {code} > [info] - udf_round_3 *** FAILED *** (4 seconds, 803 milliseconds) > [info] Failed to execute query using catalyst: > [info] Error: java.lang.Integer cannot be cast to java.lang.Double > [info] java.lang.ClassCastException: java.lang.Integer cannot be cast to > java.lang.Double > [info]at > scala.runtime.BoxesRunTime.unboxToDouble(BoxesRunTime.java:119) > [info]at > org.apache.spark.sql.catalyst.expressions.BinaryMathExpression.eval(math.scala:86) > [info]at > org.apache.spark.sql.hive.HiveInspectors$class.toInspector(HiveInspectors.scala:628) > [info]at > org.apache.spark.sql.hive.HiveGenericUdf.toInspector(hiveUdfs.scala:148) > [info]at > org.apache.spark.sql.hive.HiveGenericUdf$$anonfun$argumentInspectors$1.apply(hiveUdfs.scala:160) > [info]at > org.apache.spark.sql.hive.HiveGenericUdf$$anonfun$argumentInspectors$1.apply(hiveUdfs.scala:160) > [info]at > scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244) > [info]at > scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244) > [info]at scala.collection.immutable.List.foreach(List.scala:318) > [info]at > 
scala.collection.TraversableLike$class.map(TraversableLike.scala:244) > [info]at > scala.collection.AbstractTraversable.map(Traversable.scala:105) > [info]at > org.apache.spark.sql.hive.HiveGenericUdf.argumentInspectors$lzycompute(hiveUdfs.scala:160) > [info]at > org.apache.spark.sql.hive.HiveGenericUdf.argumentInspectors(hiveUdfs.scala:160) > [info]at > org.apache.spark.sql.hive.HiveGenericUdf.returnInspector$lzycompute(hiveUdfs.scala:164) > [info]at > org.apache.spark.sql.hive.HiveGenericUdf.returnInspector(hiveUdfs.scala:163) > [info]at > org.apache.spark.sql.hive.HiveGenericUdf.dataType$lzycompute(hiveUdfs.scala:180) > [info]at > org.apache.spark.sql.hive.HiveGenericUdf.dataType(hiveUdfs.scala:180) > [info]at > org.apache.spark.sql.catalyst.expressions.Cast.resolved$lzycompute(Cast.scala:31) > [info]at > org.apache.spark.sql.catalyst.expressions.Cast.resolved(Cast.scala:31) > [info]at > org.apache.spark.sql.catalyst.expressions.Expression$$anonfun$childrenResolved$1.apply(Expression.scala:121) > [info]at > org.apache.spark.sql.catalyst.expressions.Expression$$anonfun$childrenResolved$1.apply(Expression.scala:121) > [info]at > scala.collection.LinearSeqOptimized$class.forall(LinearSeqOptimized.scala:70) > [info]at scala.collection.immutable.List.forall(List.scala:84) > [info]at > org.apache.spark.sql.catalyst.expressions.Expression.childrenResolved(Expression.scala:121) > [info]at > org.apache.spark.sql.catalyst.expressions.Expression.resolved$lzycompute(Expression.scala:109) > [info]at > org.apache.spark.sql.catalyst.expressions.Expression.resolved(Expression.scala:109) > [info]at > org.apache.spark.sql.catalyst.expressions.Expression$$anonfun$childrenResolved$1.apply(Expression.scala:121) > [info]at > org.apache.spark.sql.catalyst.expressions.Expression$$anonfun$childrenResolved$1.apply(Expression.scala:121) > [info]at > scala.collection.LinearSeqOptimized$class.forall(LinearSeqOptimized.scala:70) > [info]at 
scala.collection.immutable.List.forall(List.scala:84) > [info]at > org.apache.spark.sql.catalyst.expressions.Expression.childrenResolved(Expression.scala:121) > [info]at > org.apache.spark.sql.catalyst.analysis.HiveTypeCoercion$ConvertNaNs$$anonfun$apply$2$$anonfun$applyOrElse$2.applyOrElse(HiveTypeCoercion.scala:138) > [info]at > org.apache.spark.sql.catalyst.analysis.HiveTypeCoercion$ConvertNaNs$$anonfun$apply$2$$anonfun$applyOrElse$2.applyOrElse(HiveTypeCoercion.scala:136) > [info]at > org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$3.apply(TreeNode.scala:222) > [info]at >
[jira] [Commented] (SPARK-6416) RDD.fold() requires the operator to be commutative
[ https://issues.apache.org/jira/browse/SPARK-6416?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15076704#comment-15076704 ] Mark Hamstra commented on SPARK-6416: - I still don't see RDD#fold as being out of bounds with what should be expected from the Scala parallel collections model -- there, too, you can get confusing results if you don't pay attention to the partitioned nature of the operation: {code} scala> val list1 = (1 to 1).toList scala> val list2 = (1 to 100).toList scala> list1.fold(0){ case (a, b) => a + 1 } res0: Int = 1 scala> list1.par.fold(0){ case (a, b) => a + 1 } res1: Int = 162 scala> list2.fold(0){ case (a, b) => a + 1 } res2: Int = 100 scala> list2.par.fold(0){ case (a, b) => a + 1 } res3: Int = 7 {code} > RDD.fold() requires the operator to be commutative > -- > > Key: SPARK-6416 > URL: https://issues.apache.org/jira/browse/SPARK-6416 > Project: Spark > Issue Type: Bug > Components: Documentation, Spark Core >Reporter: Josh Rosen >Priority: Critical > > Spark's {{RDD.fold}} operation has some confusing behaviors when a > non-commutative reduce function is used. > Here's an example, which was originally reported on StackOverflow > (https://stackoverflow.com/questions/29150202/pyspark-fold-method-output): > {code} > sc.parallelize([1,25,8,4,2]).fold(0,lambda a,b:a+1 ) > 8 > {code} > To understand what's going on here, let's look at the definition of Spark's > `fold` operation. > I'm going to show the Python version of the code, but the Scala version > exhibits the exact same behavior (you can also [browse the source on > GitHub|https://github.com/apache/spark/blob/8cb23a1f9a3ed08e57865bcb6cc1cc7902881073/python/pyspark/rdd.py#L780]: > {code} > def fold(self, zeroValue, op): > """ > Aggregate the elements of each partition, and then the results for all > the partitions, using a given associative function and a neutral "zero > value." 
> The function C{op(t1, t2)} is allowed to modify C{t1} and return it > as its result value to avoid object allocation; however, it should not > modify C{t2}. > >>> from operator import add > >>> sc.parallelize([1, 2, 3, 4, 5]).fold(0, add) > 15 > """ > def func(iterator): > acc = zeroValue > for obj in iterator: > acc = op(obj, acc) > yield acc > vals = self.mapPartitions(func).collect() > return reduce(op, vals, zeroValue) > {code} > (For comparison, see the [Scala implementation of > `RDD.fold`|https://github.com/apache/spark/blob/8cb23a1f9a3ed08e57865bcb6cc1cc7902881073/core/src/main/scala/org/apache/spark/rdd/RDD.scala#L943]). > Spark's `fold` operates by first folding each partition and then folding the > results. The problem is that an empty partition gets folded down to the zero > element, so the final driver-side fold ends up folding one value for _every_ > partition rather than one value for each _non-empty_ partition. This means > that the result of `fold` is sensitive to the number of partitions: > {code} > >>> sc.parallelize([1,25,8,4,2], 100).fold(0,lambda a,b:a+1 ) > 100 > >>> sc.parallelize([1,25,8,4,2], 50).fold(0,lambda a,b:a+1 ) > 50 > >>> sc.parallelize([1,25,8,4,2], 1).fold(0,lambda a,b:a+1 ) > 1 > {code} > In this last case, what's happening is that the single partition is being > folded down to the correct value, then that value is folded with the > zero-value at the driver to yield 1. > I think the underlying problem here is that our fold() operation implicitly > requires the operator to be commutative in addition to associative, but this > isn't documented anywhere. Due to ordering non-determinism elsewhere in > Spark, such as SPARK-5750, I don't think there's an easy way to fix this. 
> Therefore, I think we should update the documentation and examples to clarify > this requirement and explain that our fold acts more like a reduce with a > default value than the type of ordering-sensitive fold() that users may > expect in functional languages. -- This message was sent by Atlassian JIRA (v6.3.4#6332) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
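The partition-count sensitivity described above can be reproduced without a cluster. Below is a minimal Python sketch of the two-level fold -- each partition folded with the zero value, then the per-partition results folded once more with the zero value on the driver. It is an illustrative simulation (the round-robin partitioning scheme is an assumption of the sketch), not Spark's actual implementation:

```python
from functools import reduce

def spark_fold(elems, num_partitions, zero, op):
    """Simulate RDD.fold: fold each partition seeded with `zero`, then
    fold the per-partition results, again seeded with `zero`, on the
    driver.  The partitioning scheme is an assumption of this sketch."""
    # Round-robin split; with more partitions than elements, some
    # partitions are empty and fold down to `zero` alone.
    partitions = [elems[i::num_partitions] for i in range(num_partitions)]
    per_partition = [reduce(op, part, zero) for part in partitions]
    return reduce(op, per_partition, zero)

# The operator from the StackOverflow report: it ignores its right
# argument entirely, so it is neither associative nor commutative.
count_op = lambda a, b: a + 1

print(spark_fold([1, 25, 8, 4, 2], 100, 0, count_op))  # -> 100
print(spark_fold([1, 25, 8, 4, 2], 50, 0, count_op))   # -> 50
print(spark_fold([1, 25, 8, 4, 2], 1, 0, count_op))    # -> 1
```

Because the operator just increments its accumulator, the answer tracks the number of folds performed on the driver -- one per partition -- rather than the number of elements, which is exactly the 100/50/1 behavior quoted in the issue description.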
[jira] [Commented] (SPARK-6416) RDD.fold() requires the operator to be commutative
[ https://issues.apache.org/jira/browse/SPARK-6416?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15076367#comment-15076367 ] Mark Hamstra commented on SPARK-6416: - I don't see any reason to change the API wrt `fold`. With operations on RDDs, we generally try to achieve the same semantics as for Scala parallel collections, and that does hold true for `fold`:
{code}
scala> val list = (1 to 10000).toList

scala> list.fold(0)(_ + _)
res0: Int = 50005000

scala> list.par.fold(0)(_ + _)
res1: Int = 50005000

scala> list.fold(1)(_ + _)
res2: Int = 50005001

scala> list.par.fold(1)(_ + _)
res3: Int = 50005039
{code}
If we need to change anything, it would simply be to change our API documentation to more closely match that of the Scala Standard Library, where the first argument to `fold` is described as: "a neutral element for the fold operation, it may be added to the result an arbitrary number of times, not changing the result (e.g. Nil for list concatenation, 0 for addition, or 1 for multiplication)".
-- This message was sent by Atlassian JIRA (v6.3.4#6332) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
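The 50005039 result in the comment above falls out of the same mechanics: a non-neutral zero element is folded in once per partition plus once more on the driver. The Python sketch below simulates this (plain Python, not Spark; the partition count of 38 is an assumption chosen to reproduce that figure, since 50005039 = 50005000 + 38 + 1):

```python
from functools import reduce

def spark_fold(elems, num_partitions, zero, op):
    # Fold each partition seeded with `zero`, then fold the per-partition
    # results on the driver, seeded with `zero` once more.
    partitions = [elems[i::num_partitions] for i in range(num_partitions)]
    per_partition = [reduce(op, part, zero) for part in partitions]
    return reduce(op, per_partition, zero)

data = list(range(1, 10001))   # 1..10000, whose sum is 50005000
add = lambda a, b: a + b

# A true neutral element gives the same answer for any partitioning:
print(spark_fold(data, 38, 0, add))  # -> 50005000
# A non-neutral "zero" is added once per partition plus once on the
# driver: with 38 partitions, 50005000 + 38 + 1 = 50005039.
print(spark_fold(data, 38, 1, add))  # -> 50005039
```

This is why the Scala Standard Library's wording quoted above matters: the zero value "may be added to the result an arbitrary number of times, not changing the result" only if it really is a neutral element.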
[jira] [Updated] (SPARK-12258) Hive Timestamp UDF is binded with '1969-12-31 15:59:59.999999' for null value
[ https://issues.apache.org/jira/browse/SPARK-12258?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Mark Hamstra updated SPARK-12258: - Component/s: SQL > Hive Timestamp UDF is binded with '1969-12-31 15:59:59.99' for null value > - > > Key: SPARK-12258 > URL: https://issues.apache.org/jira/browse/SPARK-12258 > Project: Spark > Issue Type: Bug > Components: SQL >Affects Versions: 1.6.0 >Reporter: Ian > > {code} > test("Timestamp UDF and Null value") { > hiveContext.runSqlHive("CREATE TABLE ts_test (ts TIMESTAMP) STORED AS > TEXTFILE") > hiveContext.runSqlHive("INSERT INTO TABLE ts_test VALUES(Null)") > hiveContext.udf.register("dummy", > (ts: Timestamp) => ts > ) > val result = hiveContext.sql("SELECT dummy(ts) FROM > ts_test").collect().mkString("\n") > assertResult("[null]")(result) > } > {code} -- This message was sent by Atlassian JIRA (v6.3.4#6332) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
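As a hedged aside on where the sentinel in the issue title likely comes from: '1969-12-31 15:59:59.999999' is one microsecond before the Unix epoch rendered in a UTC-8 zone (e.g. Pacific Standard Time), which suggests the null is being mapped to an "epoch minus 1 microsecond" timestamp somewhere in the UDF bridge instead of surviving as SQL NULL. A small Python illustration (not Spark code):

```python
from datetime import datetime, timedelta, timezone

# One microsecond before the Unix epoch, rendered in a fixed UTC-8 zone,
# reproduces the sentinel value from the issue title.
pst = timezone(timedelta(hours=-8))
epoch = datetime(1970, 1, 1, tzinfo=timezone.utc)
sentinel = (epoch - timedelta(microseconds=1)).astimezone(pst)
print(sentinel.strftime("%Y-%m-%d %H:%M:%S.%f"))  # -> 1969-12-31 15:59:59.999999
```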
[jira] [Commented] (SPARK-11838) Spark SQL query fragment RDD reuse
[ https://issues.apache.org/jira/browse/SPARK-11838?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15012724#comment-15012724 ] Mark Hamstra commented on SPARK-11838: -- One significant difference between this and CacheManager is that what is proposed here is caching and reuse of the RDD itself, not the blocks of data computed by that RDD. As Mikhail noted, that can avoid significant amounts of duplicate computation even when nothing is explicitly persisted/cached. > Spark SQL query fragment RDD reuse > -- > > Key: SPARK-11838 > URL: https://issues.apache.org/jira/browse/SPARK-11838 > Project: Spark > Issue Type: Improvement >Reporter: Mikhail Bautin > > With many analytical Spark SQL workloads against slowly changing tables, > successive queries frequently share fragments that produce the same result. > Instead of re-computing those fragments for every query, it makes sense to > detect similar fragments and substitute RDDs previously created for matching > SparkPlan fragments into every new SparkPlan at the execution time whenever > possible. Even if no RDDs are persist()-ed to memory/disk/off-heap memory, > many stages can still be skipped due to map output files being present on > executor nodes. > The implementation involves the following steps: > (1) Logical plan "canonicalization". > Logical plans mapping to the same "canonical" logical plan should always > produce the same results (except for possible output column reordering), > although the inverse statement won't always be true. > - Re-mapping expression ids to "canonical expression ids" (successively > increasing numbers always starting with 1). > - Eliminating alias names that are unimportant after analysis completion. > Only the names that are necessary to determine the Hive table columns to be > scanned are retained. > - Reordering columns in projections, grouping/aggregation expressions, etc. > This can be done e.g. by using the string representation as a sort key. 
Union > inputs always have to be reordered the same way. > - Tree traversal has to happen starting from leaves and progressing towards > the root, because we need to already have identified canonical expression ids > for children of a node before we can come up with sort keys that would allow > to reorder expressions in a node deterministically. This is a bit more > complicated for Union nodes. > - Special handling for MetastoreRelations. We replace MetastoreRelation > with a special class CanonicalMetastoreRelation that uses attributes and > partitionKeys as part of its equals() and hashCode() implementation, but the > visible attributes and partitionKeys are restricted to expression ids that > the rest of the query actually needs from that MetastoreRelation. > An example of logical plans and corresponding canonical logical plans: > https://gist.githubusercontent.com/mbautin/ef1317b341211d9606cf/raw > (2) Tracking LogicalPlan fragments corresponding to SparkPlan fragments. When > generating a SparkPlan, we keep an optional reference to a LogicalPlan > instance in every node. This allows us to populate the cache with mappings > from canonical logical plans of query fragments to the corresponding RDDs > generated as part of query execution. Note that there is no new work > necessary to generate the RDDs, we are merely utilizing the RDDs that would > have been produced as part of SparkPlan execution anyway. > (3) SparkPlan fragment substitution. After generating a SparkPlan and before > calling prepare() or execute() on it, we check if any of its nodes have an > associated LogicalPlan that maps to a canonical logical plan matching a cache > entry. If so, we substitute a PhysicalRDD (or a new class UnsafePhysicalRDD > wrapping an RDD of UnsafeRow) scanning the previously created RDD instead of > the current query fragment. If the expected column order differs from what > the current SparkPlan fragment produces, we add a projection to reorder the > columns. 
We also add safe/unsafe row conversions as necessary to match the > row type that is expected by the parent of the current SparkPlan fragment. > (4) The execute() method of SparkPlan also needs to perform the cache lookup > and substitution described above before producing a new RDD for the current > SparkPlan node. The "loading cache" pattern (e.g. as implemented in Guava) > allows to reuse query fragments between simultaneously submitted queries: > whichever query runs execute() for a particular fragment's canonical logical > plan starts producing an RDD first, and if another query has a fragment with > the same canonical logical plan, it waits for the RDD to be produced by the > first query and inserts it in its SparkPlan instead. > This kind of query fragment caching will mostly be useful for
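The core of step (1), re-mapping expression ids to canonical ids, can be sketched in a few lines. This is a hypothetical illustration (the flat attribute-list representation and all names are invented for the example), not the proposed implementation:

```python
def canonicalize_expr_ids(plan_attrs):
    """Renumber expression ids in first-seen order, so that two plans
    differing only in the ids the analyzer happened to allocate map to
    the same canonical form.  `plan_attrs` is a hypothetical flat list
    of (column_name, expr_id) attribute references."""
    mapping = {}
    out = []
    for name, expr_id in plan_attrs:
        if expr_id not in mapping:
            mapping[expr_id] = len(mapping) + 1  # canonical ids start at 1
        out.append((name, mapping[expr_id]))
    return out

# Two analysis runs of the same query allocate different expression ids,
# but both canonicalize to the same form, so they can share a cached RDD:
plan_a = [("v", 101), ("k", 102), ("v", 101)]
plan_b = [("v", 57), ("k", 58), ("v", 57)]
print(canonicalize_expr_ids(plan_a))  # -> [('v', 1), ('k', 2), ('v', 1)]
print(canonicalize_expr_ids(plan_a) == canonicalize_expr_ids(plan_b))  # -> True
```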
[jira] [Commented] (SPARK-11153) Turns off Parquet filter push-down for string and binary columns
[ https://issues.apache.org/jira/browse/SPARK-11153?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15005542#comment-15005542 ] Mark Hamstra commented on SPARK-11153: -- Thanks. > Turns off Parquet filter push-down for string and binary columns > > > Key: SPARK-11153 > URL: https://issues.apache.org/jira/browse/SPARK-11153 > Project: Spark > Issue Type: Bug > Components: SQL >Affects Versions: 1.5.0, 1.5.1 >Reporter: Cheng Lian >Assignee: Cheng Lian >Priority: Blocker > Fix For: 1.5.2, 1.6.0 > > > Due to PARQUET-251, {{BINARY}} columns in existing Parquet files may be > written with corrupted statistics information. This information is used by > filter push-down optimization. Since Spark 1.5 turns on Parquet filter > push-down by default, we may end up with wrong query results. PARQUET-251 has > been fixed in parquet-mr 1.8.1, but Spark 1.5 is still using 1.7.0. > Note that this kind of corrupted Parquet files could be produced by any > Parquet data models. > This affects all Spark SQL data types that can be mapped to Parquet > {{BINARY}}, namely: > - {{StringType}} > - {{BinaryType}} > - {{DecimalType}} (but Spark SQL doesn't support pushing down {{DecimalType}} > columns for now.) > To avoid wrong query results, we should disable filter push-down for columns > of {{StringType}} and {{BinaryType}} until we upgrade to parquet-mr 1.8. -- This message was sent by Atlassian JIRA (v6.3.4#6332) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Commented] (SPARK-11153) Turns off Parquet filter push-down for string and binary columns
[ https://issues.apache.org/jira/browse/SPARK-11153?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15004944#comment-15004944 ] Mark Hamstra commented on SPARK-11153: -- Is there a reason why parquet.version hasn't been pushed up in Spark 1.6 and this issue actually fixed instead of just disabling filter push-down for strings and binaries?
[jira] [Commented] (SPARK-11326) Split networking in standalone mode
[ https://issues.apache.org/jira/browse/SPARK-11326?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=14992282#comment-14992282 ] Mark Hamstra commented on SPARK-11326: -- Can someone clarify for me just what is motivating this JIRA and PR? My meta-point is that even though the standalone mode has drifted a fair amount from its originally intended use as a simple, limited cluster deployment that could be quickly and easily set up when all the functionality of Mesos (and later YARN) was not needed, I think that it is still important that limits on its functionality be retained. We don't really want to get into a pattern of continually adding more and more functionality to standalone mode until it becomes a functional rival of Mesos and YARN -- two such full-featured resource managers are already plenty, and I really don't see the advantage of sliding into the maintenance burden of developing a third within Spark itself. So, my questions really boil down to: 1) Where do we intend to draw the line limiting the functionality of standalone mode? and 2) Why does adding stronger security clearly fall on the "include it" side of that line? or Why can't we just tell Spark users who need stronger security to use YARN or Mesos? > Split networking in standalone mode > --- > > Key: SPARK-11326 > URL: https://issues.apache.org/jira/browse/SPARK-11326 > Project: Spark > Issue Type: Improvement > Components: Spark Core >Reporter: Jacek Lewandowski > > h3.The idea > Currently, in standalone mode, all components, for all network connections > need to use the same secure token if they want to have any security ensured. > This ticket is intended to split the communication in standalone mode to make > it more like in Yarn mode - application internal communication, scheduler > internal communication and communication between the client and scheduler. 
> Such refactoring will allow for the scheduler (master, workers) to use a > distinct secret, which will remain unknown for the users. Similarly, it will > allow for better security in applications, because each application will be > able to use a distinct secret as well. > By providing Kerberos based SASL authentication/encryption for connections > between a client (Client or AppClient) and Spark Master, it will be possible > to introduce authentication and automatic generation of digest tokens and > safe sharing them among the application processes. > h3.User facing changes when running application > h4.General principles: > - conf: {{spark.authenticate.secret}} is *never sent* over the wire > - env: {{SPARK_AUTH_SECRET}} is *never sent* over the wire > - In all situations env variable will overwrite conf variable if present. > - In all situations when a user has to pass secret, it is better (safer) to > do this through env variable > - In work modes with multiple secrets we assume encrypted communication > between client and master, between driver and master, between master and > workers > > h4.Work modes and descriptions > h5.Client mode, single secret > h6.Configuration > - env: {{SPARK_AUTH_SECRET=secret}} or conf: > {{spark.authenticate.secret=secret}} > h6.Description > - The driver is running locally > - The driver will neither send env: {{SPARK_AUTH_SECRET}} nor conf: > {{spark.authenticate.secret}} > - The driver will use either env: {{SPARK_AUTH_SECRET}} or conf: > {{spark.authenticate.secret}} for connection to the master > - _ExecutorRunner_ will not find any secret in _ApplicationDescription_ so it > will look for it in the worker configuration and it will find it there (its > presence is implied). 
> > h5.Client mode, multiple secrets > h6.Configuration > - env: {{SPARK_APP_AUTH_SECRET=app_secret}} or conf: > {{spark.app.authenticate.secret=secret}} > - env: {{SPARK_SUBMISSION_AUTH_SECRET=scheduler_secret}} or conf: > {{spark.submission.authenticate.secret=scheduler_secret}} > h6.Description > - The driver is running locally > - The driver will use either env: {{SPARK_SUBMISSION_AUTH_SECRET}} or conf: > {{spark.submission.authenticate.secret}} to connect to the master > - The driver will neither send env: {{SPARK_SUBMISSION_AUTH_SECRET}} nor > conf: {{spark.submission.authenticate.secret}} > - The driver will use either {{SPARK_APP_AUTH_SECRET}} or conf: > {{spark.app.authenticate.secret}} for communication with the executors > - The driver will send {{spark.executorEnv.SPARK_AUTH_SECRET=app_secret}} so > that the executors can use it to communicate with the driver > - _ExecutorRunner_ will find that secret in _ApplicationDescription_ and it > will set it in env: {{SPARK_AUTH_SECRET}} which will be read by > _ExecutorBackend_ afterwards and used for all the connections (with driver, > other executors and external shuffle service). > >
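The precedence rule stated in the general principles above (the env variable, when present, overrides the conf entry, and neither is ever sent over the wire) can be sketched as follows. The `resolve_secret` function and its signature are illustrative assumptions, not a proposed API; only the key names come from the proposal:

```python
import os

def resolve_secret(conf, env=None,
                   env_key="SPARK_AUTH_SECRET",
                   conf_key="spark.authenticate.secret"):
    """Apply the stated precedence: the environment variable, when
    present, overrides the conf entry.  Key names are the ones from
    the proposal; the function itself is only an illustration."""
    env = os.environ if env is None else env
    if env_key in env:
        return env[env_key]
    return conf.get(conf_key)  # None when no secret is configured at all

# The env variable wins over the conf entry:
print(resolve_secret({"spark.authenticate.secret": "conf-secret"},
                     env={"SPARK_AUTH_SECRET": "env-secret"}))  # -> env-secret
# The conf entry is the fallback:
print(resolve_secret({"spark.authenticate.secret": "conf-secret"},
                     env={}))                                   # -> conf-secret
```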
[jira] [Commented] (SPARK-11326) Split networking in standalone mode
[ https://issues.apache.org/jira/browse/SPARK-11326?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=14992478#comment-14992478 ] Mark Hamstra commented on SPARK-11326: -- [~rspitzer] A focus on reaching feature parity is definitely not where standalone mode started. Both it and Mesos were really begun by [~matei], so he clearly intended standalone mode to be something different from the already existing Mesos cluster manager. Equally clearly, the distinction in purpose between standalone mode and each of the other cluster managers has become blurry.
> h5.Cluster mode, single secret > h6.Configuration > - env: {{SPARK_AUTH_SECRET=secret}} or conf: > {{spark.authenticate.secret=secret}} > h6.Description > - The driver is run by _DriverRunner_ which is a part of the worker > - The client will neither send env: {{SPARK_AUTH_SECRET}} nor conf: > {{spark.authenticate.secret}} > - The client will use either env: {{SPARK_AUTH_SECRET}} or conf: > {{spark.authenticate.secret}} for connection to the master and submit the > driver > - _DriverRunner_ will not find any secret in _DriverDescription_ so it will > look for it in the worker configuration and it will find it there (its > presence is implied) > - _DriverRunner_ will
[jira] [Commented] (SPARK-11539) Debian packaging
[ https://issues.apache.org/jira/browse/SPARK-11539?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=14992601#comment-14992601 ] Mark Hamstra commented on SPARK-11539: -- I agree with [~srowen]. See https://issues.apache.org/jira/browse/SPARK-5727 -- I'm not seeing anything that would lead us to reverse that decision. > Debian packaging > > > Key: SPARK-11539 > URL: https://issues.apache.org/jira/browse/SPARK-11539 > Project: Spark > Issue Type: New Feature > Components: Build >Reporter: Simon Hafner > > This patch adds debian packaging with systemd. -- This message was sent by Atlassian JIRA (v6.3.4#6332) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Commented] (SPARK-11326) Split networking in standalone mode
[ https://issues.apache.org/jira/browse/SPARK-11326?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=14992468#comment-14992468 ] Mark Hamstra commented on SPARK-11326: -- bq. I do believe that there are quite a lot of people who don't want to setup the whole Yarn or Mesos clusters to achieve basic security And I remain unconvinced, or at least not persuaded that there aren't at least as many who would rather not have the standalone mode be any more complex or any further encumbered with configuration requirements or options. We really need to have a clear architectural direction for standalone mode in order to decide whether these kinds of new features should be added. [~rxin] or [~pwendell], care to take a pass at the larger question?
[jira] [Commented] (SPARK-10723) Add RDD.reduceOption method
[ https://issues.apache.org/jira/browse/SPARK-10723?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=14900763#comment-14900763 ] Mark Hamstra commented on SPARK-10723: -- Either check `isEmpty` or use `fold` to handle empty RDDs. I'll agree with Sean that we don't need to be adding another method for this. > Add RDD.reduceOption method > --- > > Key: SPARK-10723 > URL: https://issues.apache.org/jira/browse/SPARK-10723 > Project: Spark > Issue Type: Improvement > Components: PySpark, Spark Core >Reporter: Tatsuya Atsumi >Priority: Minor > > h2. Problem > RDD.reduce throws exception if the RDD is empty. > It is appropriate behavior if RDD is expected to be not empty, but if it is > not sure until runtime that the RDD is empty or not, it needs to wrap with > try-catch to call reduce safely. > Example Code > {code} > // This RDD may be empty or not > val rdd: RDD[Int] = originalRdd.filter(_ > 10) > val reduced: Option[Int] = try { > Some(rdd.reduce(_ + _)) > } catch { > // if rdd is empty return None. > case e:UnsupportedOperationException => None > } > {code} > h2. Improvement idea > Scala’s List has reduceOption method, which returns None if List is empty. > If RDD has reduceOption API like Scala’s List, it will become easy to handle > above case. > Example Code > {code} > val reduced: Option[Int] = originalRdd.filter(_ > 10).reduceOption(_ + _) > {code} -- This message was sent by Atlassian JIRA (v6.3.4#6332) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
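For comparison, the proposed semantics -- None for an empty collection instead of an exception, as in Scala's List.reduceOption -- can be sketched in plain Python. This is illustrative only, not PySpark API:

```python
from functools import reduce

def reduce_option(elems, op):
    """Return None for an empty collection instead of raising, mirroring
    Scala's List.reduceOption (an illustration, not PySpark API)."""
    it = iter(elems)
    try:
        first = next(it)
    except StopIteration:
        return None  # empty input: no exception, no sentinel arithmetic
    return reduce(op, it, first)

original = [1, 25, 8, 4, 2]
print(reduce_option([x for x in original if x > 10], lambda a, b: a + b))   # -> 25
print(reduce_option([x for x in original if x > 100], lambda a, b: a + b))  # -> None
```

The try-catch wrapper from the issue description collapses to a single call, which is the ergonomic win being requested; the counter-argument above is that `isEmpty` or `fold` already covers the same ground.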
[jira] [Assigned] (SPARK-10707) Set operation output columns may have incorrect nullability
[ https://issues.apache.org/jira/browse/SPARK-10707?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Mark Hamstra reassigned SPARK-10707: Assignee: Mark Hamstra > Set operation output columns may have incorrect nullability > --- > > Key: SPARK-10707 > URL: https://issues.apache.org/jira/browse/SPARK-10707 > Project: Spark > Issue Type: Bug >Reporter: Mikhail Bautin >Assignee: Mark Hamstra > > If "dual" is a one-row table, the query > SELECT count(v) FROM ( > SELECT v FROM ( > SELECT NULL AS v FROM dual > UNION ALL > SELECT 'foo' AS v FROM dual > ) my_union WHERE isnull(v) > ) my_subview; > returns 0 and the same query with union'ed sub-queries switched returns 1: > SELECT count(v) FROM ( > SELECT v FROM ( > SELECT 'foo' AS v FROM dual > UNION ALL > SELECT NULL AS v FROM dual > ) my_union WHERE isnull(v) > ) my_subview; > Example output (with Catalyst tracing turned on): > https://gist.githubusercontent.com/mbautin/c916a2a7ce733d039137/raw > This is caused by the behavior in set operation implementation where only the > nullability of the first child is taken into account: > SetOperation in org.apache.spark.sql.catalyst.plans.logical: > https://github.com/apache/spark/blob/82268f07abfa658869df2354ae72f8d6ddd119e8/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicOperators.scala#L93 > Union in org.apache.spark.sql.execution: > https://github.com/apache/spark/blob/e626ac5f5c27dcc74113070f2fec03682bcd12bd/sql/core/src/main/scala/org/apache/spark/sql/execution/basicOperators.scala#L178 > Intersect in org.apache.spark.sql.execution: > https://github.com/apache/spark/blob/e626ac5f5c27dcc74113070f2fec03682bcd12bd/sql/core/src/main/scala/org/apache/spark/sql/execution/basicOperators.scala#L320 > Except in org.apache.spark.sql.execution: > https://github.com/apache/spark/blob/e626ac5f5c27dcc74113070f2fec03682bcd12bd/sql/core/src/main/scala/org/apache/spark/sql/execution/basicOperators.scala#L307 -- This message was sent by 
Atlassian JIRA (v6.3.4#6332) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
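The root cause above (only the first child's nullability is consulted) can be illustrated outside of Spark. The following is a minimal sketch with hypothetical names ({{Column}}, {{buggy_union_output}}, {{fixed_union_output}} are illustrations, not Catalyst classes):

```python
# Hypothetical sketch of the reported bug: a set operation that copies
# nullability from its first child only, versus a fix that ORs nullability
# across all children, column by column.
from dataclasses import dataclass
from typing import List

@dataclass
class Column:
    name: str
    nullable: bool

def buggy_union_output(children: List[List[Column]]) -> List[Column]:
    # Mirrors the reported behavior: only the first child is consulted.
    return list(children[0])

def fixed_union_output(children: List[List[Column]]) -> List[Column]:
    # A column of the union is nullable if it is nullable in ANY child.
    first = children[0]
    return [
        Column(col.name, any(child[i].nullable for child in children))
        for i, col in enumerate(first)
    ]

# 'foo' branch first, NULL branch second -- the ordering that returned 1.
foo_branch = [Column("v", False)]
null_branch = [Column("v", True)]

print(buggy_union_output([foo_branch, null_branch])[0].nullable)  # False: wrong
print(fixed_union_output([foo_branch, null_branch])[0].nullable)  # True
```

With the fix, swapping the order of the union's children no longer changes the output schema, which is the symptom reported above.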
[jira] [Created] (SPARK-10666) Use properties from ActiveJob associated with a Stage
Mark Hamstra created SPARK-10666: Summary: Use properties from ActiveJob associated with a Stage Key: SPARK-10666 URL: https://issues.apache.org/jira/browse/SPARK-10666 Project: Spark Issue Type: Bug Components: Scheduler, Spark Core Affects Versions: 1.5.0, 1.4.1 Reporter: Mark Hamstra Assignee: Mark Hamstra This issue was addressed in #5494, but the fix in that PR, while safe in the sense that it will prevent the SparkContext from shutting down, misses the actual bug. The intent of submitMissingTasks should be understood as "submit the Tasks that are missing for the Stage, and run them as part of the ActiveJob identified by jobId". Because of a long-standing bug, the jobId parameter was never being used. Instead, we were trying to use the jobId with which the Stage was created -- which may no longer exist as an ActiveJob, hence the crash reported in SPARK-6880. The correct fix is to use the ActiveJob specified by the supplied jobId parameter, which is guaranteed to exist at the call sites of submitMissingTasks. This fix should be applied to all maintenance branches, since the bug has existed since 1.0. Tasks for a Stage that was previously part of a Job that is no longer active would be re-submitted as though they were part of the prior Job and with no properties set. Since properties are what are used to set an other-than-default scheduling pool, this would affect FAIR scheduler usage, but it would also affect anything else that depends on the settings of the properties (which would be just user code at this point, since Spark itself doesn't really use the properties for anything other than Job Group and Description, which end up in the WebUI and can be used to kill by JobGroup, etc.) Even the default, FIFO scheduling would be affected, however, since the resubmission of the Tasks under the earlier jobId would effectively give them a higher priority/greater urgency than the ActiveJob that now actually needs them.
In any event, the Tasks would generate correct results.
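The stale-jobId lookup described above can be modeled with plain dictionaries. All names here are hypothetical stand-ins for the DAGScheduler internals, not the actual Spark code:

```python
# Illustrative sketch: looking up the ActiveJob by the stage's creation-time
# job id can fail once that job has finished, while the jobId passed into
# submitMissingTasks is guaranteed to be live at every call site.
job_id_to_active_job = {7: {"properties": {"pool": "production"}}}

class Stage:
    def __init__(self, creation_job_id):
        self.creation_job_id = creation_job_id

stage = Stage(creation_job_id=3)  # job 3 has already completed

def submit_missing_tasks_buggy(stage, job_id):
    # Ignores the supplied job_id, as the pre-fix code effectively did;
    # raises KeyError, the Python analogue of NoSuchElementException.
    return job_id_to_active_job[stage.creation_job_id]

def submit_missing_tasks_fixed(stage, job_id):
    # Uses the caller-supplied job_id, which is guaranteed to exist.
    return job_id_to_active_job[job_id]

try:
    submit_missing_tasks_buggy(stage, job_id=7)
except KeyError as e:
    print("buggy lookup failed:", e)

print(submit_missing_tasks_fixed(stage, job_id=7)["properties"]["pool"])
```

The fixed lookup also restores the correct scheduling-pool properties, which is the behavioral point of the issue beyond merely avoiding the crash.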
[jira] [Commented] (SPARK-6880) Spark Shutdowns with NoSuchElementException when running parallel collect on cachedRDD
[ https://issues.apache.org/jira/browse/SPARK-6880?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=14803261#comment-14803261 ] Mark Hamstra commented on SPARK-6880: - see SPARK-10666 > Spark Shutdowns with NoSuchElementException when running parallel collect on > cachedRDD > -- > > Key: SPARK-6880 > URL: https://issues.apache.org/jira/browse/SPARK-6880 > Project: Spark > Issue Type: Bug > Components: Spark Core >Affects Versions: 1.2.1 > Environment: CentOs6.0, java7 >Reporter: pankaj arora >Assignee: pankaj arora > Fix For: 1.4.0 > > > Spark Shutdowns with NoSuchElementException when running parallel collect on > cachedRDDs > Below is the stack trace > 15/03/27 11:12:43 ERROR DAGSchedulerActorSupervisor: eventProcesserActor > failed; shutting down SparkContext > java.util.NoSuchElementException: key not found: 28 > at scala.collection.MapLike$class.default(MapLike.scala:228) > at scala.collection.AbstractMap.default(Map.scala:58) > at scala.collection.mutable.HashMap.apply(HashMap.scala:64) > at > org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$submitMissingTasks(DAGScheduler.scala:808) > at > org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$submitStage(DAGScheduler.scala:778) > at > org.apache.spark.scheduler.DAGScheduler$$anonfun$org$apache$spark$scheduler$DAGScheduler$$submitStage$4.apply(DAGScheduler.scala:781) > at > org.apache.spark.scheduler.DAGScheduler$$anonfun$org$apache$spark$scheduler$DAGScheduler$$submitStage$4.apply(DAGScheduler.scala:780) > at scala.collection.immutable.List.foreach(List.scala:318) > at > org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$submitStage(DAGScheduler.scala:780) > at > org.apache.spark.scheduler.DAGScheduler$$anonfun$org$apache$spark$scheduler$DAGScheduler$$submitStage$4.apply(DAGScheduler.scala:781) > at > 
org.apache.spark.scheduler.DAGScheduler$$anonfun$org$apache$spark$scheduler$DAGScheduler$$submitStage$4.apply(DAGScheduler.scala:780) > at scala.collection.immutable.List.foreach(List.scala:318) > at > org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$submitStage(DAGScheduler.scala:780) > at > org.apache.spark.scheduler.DAGScheduler$$anonfun$org$apache$spark$scheduler$DAGScheduler$$submitStage$4.apply(DAGScheduler.scala:781) > at > org.apache.spark.scheduler.DAGScheduler$$anonfun$org$apache$spark$scheduler$DAGScheduler$$submitStage$4.apply(DAGScheduler.scala:780) > at scala.collection.immutable.List.foreach(List.scala:318) > at > org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$submitStage(DAGScheduler.scala:780) > at > org.apache.spark.scheduler.DAGScheduler.handleJobSubmitted(DAGScheduler.scala:762) > at > org.apache.spark.scheduler.DAGSchedulerEventProcessActor$$anonfun$receive$2.applyOrElse(DAGScheduler.scala:1389) > at akka.actor.Actor$class.aroundReceive(Actor.scala:465) > at > org.apache.spark.scheduler.DAGSchedulerEventProcessActor.aroundReceive(DAGScheduler.scala:1375) > at akka.actor.ActorCell.receiveMessage(ActorCell.scala:516) > at akka.actor.ActorCell.invoke(ActorCell.scala:487) -- This message was sent by Atlassian JIRA (v6.3.4#6332) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Commented] (SPARK-6416) RDD.fold() requires the operator to be commutative
[ https://issues.apache.org/jira/browse/SPARK-6416?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14554871#comment-14554871 ] Mark Hamstra commented on SPARK-6416: - Why remove it? It's very useful when used correctly. RDD.fold() requires the operator to be commutative -- Key: SPARK-6416 URL: https://issues.apache.org/jira/browse/SPARK-6416 Project: Spark Issue Type: Bug Components: Documentation, Spark Core Reporter: Josh Rosen Priority: Critical Spark's {{RDD.fold}} operation has some confusing behaviors when a non-commutative reduce function is used. Here's an example, which was originally reported on StackOverflow (https://stackoverflow.com/questions/29150202/pyspark-fold-method-output):
{code}
>>> sc.parallelize([1, 25, 8, 4, 2]).fold(0, lambda a, b: a + 1)
8
{code}
To understand what's going on here, let's look at the definition of Spark's `fold` operation. I'm going to show the Python version of the code, but the Scala version exhibits the exact same behavior (you can also [browse the source on GitHub|https://github.com/apache/spark/blob/8cb23a1f9a3ed08e57865bcb6cc1cc7902881073/python/pyspark/rdd.py#L780]):
{code}
def fold(self, zeroValue, op):
    """
    Aggregate the elements of each partition, and then the results for all
    the partitions, using a given associative function and a neutral zero
    value. The function C{op(t1, t2)} is allowed to modify C{t1} and return
    it as its result value to avoid object allocation; however, it should
    not modify C{t2}.

    >>> from operator import add
    >>> sc.parallelize([1, 2, 3, 4, 5]).fold(0, add)
    15
    """
    def func(iterator):
        acc = zeroValue
        for obj in iterator:
            acc = op(obj, acc)
        yield acc
    vals = self.mapPartitions(func).collect()
    return reduce(op, vals, zeroValue)
{code}
(For comparison, see the [Scala implementation of `RDD.fold`|https://github.com/apache/spark/blob/8cb23a1f9a3ed08e57865bcb6cc1cc7902881073/core/src/main/scala/org/apache/spark/rdd/RDD.scala#L943]).
Spark's `fold` operates by first folding each partition and then folding the results. The problem is that an empty partition gets folded down to the zero element, so the final driver-side fold ends up folding one value for _every_ partition rather than one value for each _non-empty_ partition. This means that the result of `fold` is sensitive to the number of partitions:
{code}
>>> sc.parallelize([1, 25, 8, 4, 2], 100).fold(0, lambda a, b: a + 1)
100
>>> sc.parallelize([1, 25, 8, 4, 2], 50).fold(0, lambda a, b: a + 1)
50
>>> sc.parallelize([1, 25, 8, 4, 2], 1).fold(0, lambda a, b: a + 1)
1
{code}
In this last case, what's happening is that the single partition is being folded down to the correct value, then that value is folded with the zero-value at the driver to yield 1. I think the underlying problem here is that our fold() operation implicitly requires the operator to be commutative in addition to associative, but this isn't documented anywhere. Due to ordering non-determinism elsewhere in Spark, such as SPARK-5750, I don't think there's an easy way to fix this. Therefore, I think we should update the documentation and examples to clarify this requirement and explain that our fold acts more like a reduce with a default value than the type of ordering-sensitive fold() that users may expect in functional languages.
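The partition-count sensitivity described above is easy to reproduce without a cluster. Below is a pure-Python model of the two-level fold; it is an illustrative sketch (assuming round-robin partitioning), not Spark's actual code:

```python
# Model of Spark's two-level fold: each partition is folded with the zero
# value, then the per-partition results are folded again with the zero value
# on the "driver" side. Empty partitions still contribute their folded zero
# value, which is the crux of the issue.
from functools import reduce

def spark_like_fold(data, num_partitions, zero, op):
    partitions = [data[i::num_partitions] for i in range(num_partitions)]
    per_partition = [reduce(op, part, zero) for part in partitions]
    return reduce(op, per_partition, zero)

count_op = lambda a, b: a + 1  # the non-commutative "count" op from the report

# The result tracks the partition count, matching the numbers in the report.
print(spark_like_fold([1, 25, 8, 4, 2], 100, 0, count_op))  # 100
print(spark_like_fold([1, 25, 8, 4, 2], 1, 0, count_op))    # 1

# A commutative, associative op with an identity zero is partition-safe:
add = lambda a, b: a + b
print(spark_like_fold([1, 2, 3, 4, 5], 100, 0, add))  # 15
```

With a commutative and associative operator whose zero value is an identity, the extra per-partition folds are harmless, which is exactly the requirement the documentation change is meant to spell out.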
[jira] [Commented] (SPARK-7750) Rename json endpoints to api endpoints
[ https://issues.apache.org/jira/browse/SPARK-7750?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14551718#comment-14551718 ] Mark Hamstra commented on SPARK-7750: - Including `@Produces` annotations is also probably a good idea, especially if we are anticipating producing more than one kind of output. Rename json endpoints to api endpoints -- Key: SPARK-7750 URL: https://issues.apache.org/jira/browse/SPARK-7750 Project: Spark Issue Type: Bug Components: Web UI Affects Versions: 1.3.0 Reporter: Hari Shreedharan Priority: Critical Sorry for bringing this in at the last moment, but we should rename the endpoints from address/json/v1... to address/api/v1.. since it is likely we will add API that can do more than return JSON, like being able to download event logs. In that case the current api can be misleading. This is a compatibility issue, so it has to go into 1.4
[jira] [Commented] (SPARK-6880) Spark Shutdowns with NoSuchElementException when running parallel collect on cachedRDD
[ https://issues.apache.org/jira/browse/SPARK-6880?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14551647#comment-14551647 ] Mark Hamstra commented on SPARK-6880: - This fix should also be applied as far back as we care to go in the maintenance branches, since the bug also exists in branch-1.0, branch-1.1, branch-1.2 and branch-1.3. Spark Shutdowns with NoSuchElementException when running parallel collect on cachedRDD -- Key: SPARK-6880 URL: https://issues.apache.org/jira/browse/SPARK-6880 Project: Spark Issue Type: Bug Components: Spark Core Affects Versions: 1.2.1 Environment: CentOs6.0, java7 Reporter: pankaj arora Assignee: pankaj arora Fix For: 1.4.0 Spark Shutdowns with NoSuchElementException when running parallel collect on cachedRDDs Below is the stack trace 15/03/27 11:12:43 ERROR DAGSchedulerActorSupervisor: eventProcesserActor failed; shutting down SparkContext java.util.NoSuchElementException: key not found: 28 at scala.collection.MapLike$class.default(MapLike.scala:228) at scala.collection.AbstractMap.default(Map.scala:58) at scala.collection.mutable.HashMap.apply(HashMap.scala:64) at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$submitMissingTasks(DAGScheduler.scala:808) at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$submitStage(DAGScheduler.scala:778) at org.apache.spark.scheduler.DAGScheduler$$anonfun$org$apache$spark$scheduler$DAGScheduler$$submitStage$4.apply(DAGScheduler.scala:781) at org.apache.spark.scheduler.DAGScheduler$$anonfun$org$apache$spark$scheduler$DAGScheduler$$submitStage$4.apply(DAGScheduler.scala:780) at scala.collection.immutable.List.foreach(List.scala:318) at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$submitStage(DAGScheduler.scala:780) at org.apache.spark.scheduler.DAGScheduler$$anonfun$org$apache$spark$scheduler$DAGScheduler$$submitStage$4.apply(DAGScheduler.scala:781) at 
org.apache.spark.scheduler.DAGScheduler$$anonfun$org$apache$spark$scheduler$DAGScheduler$$submitStage$4.apply(DAGScheduler.scala:780) at scala.collection.immutable.List.foreach(List.scala:318) at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$submitStage(DAGScheduler.scala:780) at org.apache.spark.scheduler.DAGScheduler$$anonfun$org$apache$spark$scheduler$DAGScheduler$$submitStage$4.apply(DAGScheduler.scala:781) at org.apache.spark.scheduler.DAGScheduler$$anonfun$org$apache$spark$scheduler$DAGScheduler$$submitStage$4.apply(DAGScheduler.scala:780) at scala.collection.immutable.List.foreach(List.scala:318) at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$submitStage(DAGScheduler.scala:780) at org.apache.spark.scheduler.DAGScheduler.handleJobSubmitted(DAGScheduler.scala:762) at org.apache.spark.scheduler.DAGSchedulerEventProcessActor$$anonfun$receive$2.applyOrElse(DAGScheduler.scala:1389) at akka.actor.Actor$class.aroundReceive(Actor.scala:465) at org.apache.spark.scheduler.DAGSchedulerEventProcessActor.aroundReceive(DAGScheduler.scala:1375) at akka.actor.ActorCell.receiveMessage(ActorCell.scala:516) at akka.actor.ActorCell.invoke(ActorCell.scala:487) -- This message was sent by Atlassian JIRA (v6.3.4#6332) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Commented] (SPARK-839) Bug in how failed executors are removed by ID from standalone cluster
[ https://issues.apache.org/jira/browse/SPARK-839?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14311734#comment-14311734 ] Mark Hamstra commented on SPARK-839: Fixed long ago. Bug in how failed executors are removed by ID from standalone cluster - Key: SPARK-839 URL: https://issues.apache.org/jira/browse/SPARK-839 Project: Spark Issue Type: Bug Components: Spark Core Affects Versions: 0.8.0, 0.7.3 Reporter: Mark Hamstra Priority: Critical ClearStory data reported the following issue, where some hashmaps are indexed by executorId and some by appId/executorId, and we use the wrong string to search for an executor: https://github.com/clearstorydata/spark/pull/9. This affects FT on the standalone mode.
[jira] [Closed] (SPARK-839) Bug in how failed executors are removed by ID from standalone cluster
[ https://issues.apache.org/jira/browse/SPARK-839?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Mark Hamstra closed SPARK-839. -- Resolution: Fixed Bug in how failed executors are removed by ID from standalone cluster - Key: SPARK-839 URL: https://issues.apache.org/jira/browse/SPARK-839 Project: Spark Issue Type: Bug Components: Spark Core Affects Versions: 0.8.0, 0.7.3 Reporter: Mark Hamstra Priority: Critical ClearStory data reported the following issue, where some hashmaps are indexed by executorId and some by appId/executorId, and we use the wrong string to search for an executor: https://github.com/clearstorydata/spark/pull/9. This affects FT on the standalone mode.
[jira] [Commented] (SPARK-4498) Standalone Master can fail to recognize completed/failed applications
[ https://issues.apache.org/jira/browse/SPARK-4498?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14230910#comment-14230910 ] Mark Hamstra commented on SPARK-4498: - I'd argue against reverting 2425 on the grounds that a long-running application being killed when it is still able to make progress is a worse bug than Executors repeatedly trying to run an application that no longer exists. Either way, it seems to me that the existing logic may not be too far from being right. The flaw simply seems to be that an Executor process starting and successfully connecting to stderr and stdout are necessary but not sufficient conditions for that Executor to be transitioned to RUNNING. If the Executor doesn't become RUNNING until it succeeds in connecting to its Application, then I think the problem is almost entirely and perhaps completely solved. (Although I think SPARK-2424 should also be implemented.) Standalone Master can fail to recognize completed/failed applications - Key: SPARK-4498 URL: https://issues.apache.org/jira/browse/SPARK-4498 Project: Spark Issue Type: Bug Components: Deploy, Spark Core Affects Versions: 1.1.1, 1.2.0 Environment: - Linux dn11.chi.shopify.com 3.2.0-57-generic #87-Ubuntu SMP 3 x86_64 x86_64 x86_64 GNU/Linux - Standalone Spark built from apache/spark#c6e0c2ab1c29c184a9302d23ad75e4ccd8060242 - Python 2.7.3 java version 1.7.0_71 Java(TM) SE Runtime Environment (build 1.7.0_71-b14) Java HotSpot(TM) 64-Bit Server VM (build 24.71-b01, mixed mode) - 1 Spark master, 40 Spark workers with 32 cores a piece and 60-90 GB of memory a piece - All client code is PySpark Reporter: Harry Brundage Priority: Blocker Attachments: all-master-logs-around-blip.txt, one-applications-master-logs.txt We observe the spark standalone master not detecting that a driver application has completed after the driver process has shut down indefinitely, leaving that driver's resources consumed indefinitely. 
The master reports applications as Running, but the driver process has long since terminated. The master continually spawns one executor for the application. It boots, times out trying to connect to the driver application, and then dies with the exception below. The master then spawns another executor on a different worker, which does the same thing. The application lives until the master (and workers) are restarted. This happens to many jobs at once, all right around the same time, two or three times a day, where they all get stuck. Before and after this blip applications start, get resources, finish, and are marked as finished properly. The blip is mostly conjecture on my part, I have no hard evidence that it exists other than my identification of the pattern in the Running Applications table. See http://cl.ly/image/2L383s0e2b3t/Screen%20Shot%202014-11-19%20at%203.43.09%20PM.png : the applications started before the blip at 1.9 hours ago still have active drivers. All the applications started 1.9 hours ago do not, and the applications started less than 1.9 hours ago (at the top of the table) do in fact have active drivers. Deploy mode:
- PySpark drivers running on one node outside the cluster, scheduled by a cron-like application, not master supervised
Other factoids:
- In most places, we call sc.stop() explicitly before shutting down our driver process
- Here's the sum total of spark configuration options we don't set to the default:
{code}
spark.cores.max: 30
spark.eventLog.dir: hdfs://nn.shopify.com:8020/var/spark/event-logs
spark.eventLog.enabled: true
spark.executor.memory: 7g
spark.hadoop.fs.defaultFS: hdfs://nn.shopify.com:8020/
spark.io.compression.codec: lzf
spark.ui.killEnabled: true
{code}
- The exception the executors die with is this:
{code}
14/11/19 19:42:37 INFO CoarseGrainedExecutorBackend: Registered signal handlers for [TERM, HUP, INT]
14/11/19 19:42:37 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform...
using builtin-java classes where applicable 14/11/19 19:42:37 INFO SecurityManager: Changing view acls to: spark,azkaban 14/11/19 19:42:37 INFO SecurityManager: Changing modify acls to: spark,azkaban 14/11/19 19:42:37 INFO SecurityManager: SecurityManager: authentication disabled; ui acls disabled; users with view permissions: Set(spark, azkaban); users with modify permissions: Set(spark, azkaban) 14/11/19 19:42:37 INFO Slf4jLogger: Slf4jLogger started 14/11/19 19:42:37 INFO Remoting: Starting remoting 14/11/19 19:42:38 INFO Remoting: Remoting started; listening on addresses :[akka.tcp://driverpropsfetc...@dn13.chi.shopify.com:37682] 14/11/19 19:42:38 INFO
[jira] [Commented] (SPARK-4498) Standalone Master can fail to recognize completed/failed applications
[ https://issues.apache.org/jira/browse/SPARK-4498?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14228947#comment-14228947 ] Mark Hamstra commented on SPARK-4498: - On a quick look-through, your analysis looks likely to be correct, [~joshrosen]. Making sure that failed applications are always accompanied by a DisassociatedEvent would be a good thing. The belt-and-suspenders fix would be to also change the executor state-change semantics so that either RUNNING means not just that the executor process is running, but also that it has successfully connected to the application, or else introduce an additional executor state (perhaps REGISTERED) along with state transitions and finer-grained state logic controlling executor restart and application removal. Standalone Master can fail to recognize completed/failed applications - Key: SPARK-4498 URL: https://issues.apache.org/jira/browse/SPARK-4498 Project: Spark Issue Type: Bug Components: Deploy, Spark Core Affects Versions: 1.1.1, 1.2.0 Environment: - Linux dn11.chi.shopify.com 3.2.0-57-generic #87-Ubuntu SMP 3 x86_64 x86_64 x86_64 GNU/Linux - Standalone Spark built from apache/spark#c6e0c2ab1c29c184a9302d23ad75e4ccd8060242 - Python 2.7.3 java version 1.7.0_71 Java(TM) SE Runtime Environment (build 1.7.0_71-b14) Java HotSpot(TM) 64-Bit Server VM (build 24.71-b01, mixed mode) - 1 Spark master, 40 Spark workers with 32 cores a piece and 60-90 GB of memory a piece - All client code is PySpark Reporter: Harry Brundage Priority: Blocker Attachments: all-master-logs-around-blip.txt, one-applications-master-logs.txt We observe the spark standalone master not detecting that a driver application has completed after the driver process has shut down indefinitely, leaving that driver's resources consumed indefinitely. The master reports applications as Running, but the driver process has long since terminated. The master continually spawns one executor for the application. 
It boots, times out trying to connect to the driver application, and then dies with the exception below. The master then spawns another executor on a different worker, which does the same thing. The application lives until the master (and workers) are restarted. This happens to many jobs at once, all right around the same time, two or three times a day, where they all get stuck. Before and after this blip applications start, get resources, finish, and are marked as finished properly. The blip is mostly conjecture on my part, I have no hard evidence that it exists other than my identification of the pattern in the Running Applications table. See http://cl.ly/image/2L383s0e2b3t/Screen%20Shot%202014-11-19%20at%203.43.09%20PM.png : the applications started before the blip at 1.9 hours ago still have active drivers. All the applications started 1.9 hours ago do not, and the applications started less than 1.9 hours ago (at the top of the table) do in fact have active drivers. Deploy mode:
- PySpark drivers running on one node outside the cluster, scheduled by a cron-like application, not master supervised
Other factoids:
- In most places, we call sc.stop() explicitly before shutting down our driver process
- Here's the sum total of spark configuration options we don't set to the default:
{code}
spark.cores.max: 30
spark.eventLog.dir: hdfs://nn.shopify.com:8020/var/spark/event-logs
spark.eventLog.enabled: true
spark.executor.memory: 7g
spark.hadoop.fs.defaultFS: hdfs://nn.shopify.com:8020/
spark.io.compression.codec: lzf
spark.ui.killEnabled: true
{code}
- The exception the executors die with is this:
{code}
14/11/19 19:42:37 INFO CoarseGrainedExecutorBackend: Registered signal handlers for [TERM, HUP, INT]
14/11/19 19:42:37 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform...
using builtin-java classes where applicable 14/11/19 19:42:37 INFO SecurityManager: Changing view acls to: spark,azkaban 14/11/19 19:42:37 INFO SecurityManager: Changing modify acls to: spark,azkaban 14/11/19 19:42:37 INFO SecurityManager: SecurityManager: authentication disabled; ui acls disabled; users with view permissions: Set(spark, azkaban); users with modify permissions: Set(spark, azkaban) 14/11/19 19:42:37 INFO Slf4jLogger: Slf4jLogger started 14/11/19 19:42:37 INFO Remoting: Starting remoting 14/11/19 19:42:38 INFO Remoting: Remoting started; listening on addresses :[akka.tcp://driverpropsfetc...@dn13.chi.shopify.com:37682] 14/11/19 19:42:38 INFO Utils: Successfully started service 'driverPropsFetcher' on port 37682. 14/11/19 19:42:38 WARN Remoting: Tried to associate with unreachable
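The "necessary but not sufficient" point about executor startup in the comments above can be sketched as a toy state machine. This is a hypothetical illustration of the proposed intermediate state, not Spark's actual deploy code:

```python
# Toy executor lifecycle: process start + stdout/stderr attach only moves the
# executor to an intermediate REGISTERED state; RUNNING requires a successful
# connection to the driver application. A failed connection becomes FAILED,
# giving the master a signal it can use to remove the dead application
# instead of respawning executors forever.
from enum import Enum, auto

class ExecutorState(Enum):
    LAUNCHING = auto()
    REGISTERED = auto()  # proposed intermediate state
    RUNNING = auto()
    FAILED = auto()

class Executor:
    def __init__(self):
        self.state = ExecutorState.LAUNCHING

    def process_started(self):
        # Necessary but NOT sufficient for RUNNING.
        self.state = ExecutorState.REGISTERED

    def connected_to_application(self, ok: bool):
        # Only a successful connection to the driver promotes to RUNNING.
        self.state = ExecutorState.RUNNING if ok else ExecutorState.FAILED

e = Executor()
e.process_started()
e.connected_to_application(ok=False)  # driver is already gone
print(e.state)  # ExecutorState.FAILED
```

Under the pre-fix semantics, the equivalent of `process_started` alone was enough to report RUNNING, which is why the master kept treating the application as alive.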
[jira] [Closed] (SPARK-4473) [Core] StageInfo should have ActiveJob's group ID as a field
[ https://issues.apache.org/jira/browse/SPARK-4473?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Mark Hamstra closed SPARK-4473. --- Resolution: Duplicate [Core] StageInfo should have ActiveJob's group ID as a field Key: SPARK-4473 URL: https://issues.apache.org/jira/browse/SPARK-4473 Project: Spark Issue Type: Bug Components: Spark Core Reporter: Aniket Bhatnagar Priority: Minor It would be convenient to have active job's group ID in StageInfo so that JobProgressListener can be used to track specific job/jobs in a group. Perhaps, stage's group ID can also be shown in default Spark's UI.
[jira] [Commented] (SPARK-4473) [Core] StageInfo should have ActiveJob's group ID as a field
[ https://issues.apache.org/jira/browse/SPARK-4473?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14218057#comment-14218057 ] Mark Hamstra commented on SPARK-4473: - This is already covered by https://issues.apache.org/jira/browse/SPARK-2321 and https://issues.apache.org/jira/browse/SPARK-4145. Please add any additional commentary to those issues or their associated pull requests. [Core] StageInfo should have ActiveJob's group ID as a field Key: SPARK-4473 URL: https://issues.apache.org/jira/browse/SPARK-4473 Project: Spark Issue Type: Bug Components: Spark Core Reporter: Aniket Bhatnagar Priority: Minor It would be convenient to have active job's group ID in StageInfo so that JobProgressListener can be used to track specific job/jobs in a group. Perhaps, stage's group ID can also be shown in default Spark's UI.
[jira] [Created] (SPARK-4436) Debian packaging misses datanucleus jars
Mark Hamstra created SPARK-4436: --- Summary: Debian packaging misses datanucleus jars Key: SPARK-4436 URL: https://issues.apache.org/jira/browse/SPARK-4436 Project: Spark Issue Type: Bug Components: Build Affects Versions: 1.1.0, 1.0.1, 1.0.0 Reporter: Mark Hamstra Assignee: Mark Hamstra Priority: Minor If Spark is built with HIve support (i.e. -Phive), then the necessary datanucleus jars end up in lib_managed, not as part of the uber-jar. The debian packaging isn't including anything from lib_managed. As a consequence, HiveContext et al. will fail with the packaged Spark even though it was built with -Phive. see comment in bin/compute-classpath.sh Packaging everything from lib_managed/jars into spark/lib is an adequate solution.
[jira] [Updated] (SPARK-4436) Debian packaging misses datanucleus jars
[ https://issues.apache.org/jira/browse/SPARK-4436?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Mark Hamstra updated SPARK-4436: Description: If Spark is built with Hive support (i.e. -Phive), then the necessary datanucleus jars end up in lib_managed, not as part of the uber-jar. The debian packaging isn't including anything from lib_managed. As a consequence, HiveContext et al. will fail with the packaged Spark even though it was built with -Phive. see comment in bin/compute-classpath.sh Packaging everything from lib_managed/jars into spark/lib is an adequate solution. was: If Spark is built with HIve support (i.e. -Phive), then the necessary datanucleus jars end up in lib_managed, not as part of the uber-jar. The debian packaging isn't including anything from lib_managed. As a consequence, HiveContext et al. will fail with the packaged Spark even though it was built with -Phive. see comment in bin/compute-classpath.sh Packaging everything from lib_managed/jars into spark/lib is an adequate solution. Debian packaging misses datanucleus jars Key: SPARK-4436 URL: https://issues.apache.org/jira/browse/SPARK-4436 Project: Spark Issue Type: Bug Components: Build Affects Versions: 1.0.0, 1.0.1, 1.1.0 Reporter: Mark Hamstra Assignee: Mark Hamstra Priority: Minor If Spark is built with Hive support (i.e. -Phive), then the necessary datanucleus jars end up in lib_managed, not as part of the uber-jar. The debian packaging isn't including anything from lib_managed. As a consequence, HiveContext et al. will fail with the packaged Spark even though it was built with -Phive. see comment in bin/compute-classpath.sh Packaging everything from lib_managed/jars into spark/lib is an adequate solution.
[jira] [Commented] (SPARK-4428) Use ${scala.binary.version} property for artifactId.
[ https://issues.apache.org/jira/browse/SPARK-4428?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14213664#comment-14213664 ] Mark Hamstra commented on SPARK-4428: - This is not a bug, nor is it a major issue, nor is parameterizing artifactId's in this way permissible. This is a Won't Fix. Use ${scala.binary.version} property for artifactId. Key: SPARK-4428 URL: https://issues.apache.org/jira/browse/SPARK-4428 Project: Spark Issue Type: Bug Components: Build Reporter: Takuya Ueshin
[jira] [Resolved] (SPARK-4428) Use ${scala.binary.version} property for artifactId.
[ https://issues.apache.org/jira/browse/SPARK-4428?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Mark Hamstra resolved SPARK-4428. - Resolution: Won't Fix Use ${scala.binary.version} property for artifactId. Key: SPARK-4428 URL: https://issues.apache.org/jira/browse/SPARK-4428 Project: Spark Issue Type: Bug Components: Build Reporter: Takuya Ueshin
[jira] [Commented] (SPARK-2321) Design a proper progress reporting event listener API
[ https://issues.apache.org/jira/browse/SPARK-2321?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14143814#comment-14143814 ] Mark Hamstra commented on SPARK-2321: - Which would be kind of the opposite half of the SparkListenerJobStart event, which includes an array of the StageIds in a Job. I included that way back when as a suggestion of at least some of what might be needed to implement better job-based progress reporting. I'd have to look, but I don't believe anything is actually using that stage-reporting on JobStart right now. In any event, any proper progress reporting should rationalize, extend or eliminate that part of SparkListenerJobStart. Design a proper progress reporting event listener API --- Key: SPARK-2321 URL: https://issues.apache.org/jira/browse/SPARK-2321 Project: Spark Issue Type: Improvement Components: Java API, Spark Core Affects Versions: 1.0.0 Reporter: Reynold Xin Assignee: Josh Rosen Priority: Critical This is a ticket to track progress on redesigning the SparkListener and JobProgressListener API. There are multiple problems with the current design, including: 0. I'm not sure if the API is usable in Java (there are at least some enums we used in Scala and a bunch of case classes that might complicate things). 1. The whole API is marked as DeveloperApi, because we haven't paid a lot of attention to it yet. Something as important as progress reporting deserves a more stable API. 2. There is no easy way to connect jobs with stages. Similarly, there is no easy way to connect job groups with jobs / stages. 3. JobProgressListener itself has no encapsulation at all. States can be arbitrarily mutated by external programs. Variable names are sort of randomly decided and inconsistent. We should just revisit these and propose a new, concrete design. 
[jira] [Updated] (SPARK-1021) sortByKey() launches a cluster job when it shouldn't
[ https://issues.apache.org/jira/browse/SPARK-1021?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Mark Hamstra updated SPARK-1021: Assignee: Erik Erlandson (was: Mark Hamstra) sortByKey() launches a cluster job when it shouldn't Key: SPARK-1021 URL: https://issues.apache.org/jira/browse/SPARK-1021 Project: Spark Issue Type: Sub-task Components: Spark Core Affects Versions: 0.8.0, 0.9.0, 1.0.0, 1.1.0 Reporter: Andrew Ash Assignee: Erik Erlandson Labels: starter The sortByKey() method is listed as a transformation, not an action, in the documentation. But it launches a cluster job regardless. http://spark.incubator.apache.org/docs/latest/scala-programming-guide.html Some discussion on the mailing list suggested that this is a problem with the rdd.count() call inside Partitioner.scala's rangeBounds method. https://github.com/apache/incubator-spark/blob/master/core/src/main/scala/org/apache/spark/Partitioner.scala#L102 Josh Rosen suggests that rangeBounds should be made into a lazy variable: {quote} I wonder whether making RangePartitioner.rangeBounds into a lazy val would fix this (https://github.com/apache/incubator-spark/blob/6169fe14a140146602fb07cfcd13eee6efad98f9/core/src/main/scala/org/apache/spark/Partitioner.scala#L95). We'd need to make sure that rangeBounds() is never called before an action is performed. This could be tricky because it's called in the RangePartitioner.equals() method. Maybe it's sufficient to just compare the number of partitions, the ids of the RDDs used to create the RangePartitioner, and the sort ordering. This still supports the case where I range-partition one RDD and pass the same partitioner to a different RDD. It breaks support for the case where two range partitioners created on different RDDs happened to have the same rangeBounds(), but it seems unlikely that this would really harm performance since it's probably unlikely that the range partitioners are equal by chance. {quote} Can we please make this happen?
I'll send a PR on GitHub to start the discussion and testing.
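Josh Rosen's lazy-val suggestion can be sketched in plain Scala. This is a simplified illustration of the idea, not the actual Partitioner.scala code; the class and parameter names here are invented for the sketch.

```scala
// Simplified model of the fix: rangeBounds becomes a lazy val, so merely
// constructing the partitioner (as sortByKey does) no longer evaluates the
// sampling pass; the pass only runs when the partitioner is first used.
class LazyRangePartitioner(val partitions: Int, sample: => Seq[Int]) {
  // 'sample' stands in for the rdd.sample()/count() work that rangeBounds
  // performs; as a by-name parameter it is evaluated only when forced.
  lazy val rangeBounds: Seq[Int] = {
    val sorted = sample.sorted
    (1 until partitions).map(i => sorted(i * sorted.length / partitions))
  }
  def getPartition(key: Int): Int = rangeBounds.count(_ <= key)
  // equals must not force rangeBounds; compare cheap identity instead.
  // (Per the comment above, the real fix would also compare RDD ids and
  // the sort ordering, not just the partition count.)
  override def equals(other: Any): Boolean = other match {
    case o: LazyRangePartitioner => o.partitions == partitions
    case _ => false
  }
}

var sampled = false
val p = new LazyRangePartitioner(2, { sampled = true; Seq(5, 1, 9, 3) })
assert(!sampled)               // construction launches no sampling "job"
assert(p.getPartition(4) == 0) // first real use forces the lazy val
assert(sampled)
```

The tradeoff the quoted comment describes shows up in equals: two partitioners with coincidentally identical bounds but different identities now compare unequal, which costs at most a redundant shuffle.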
[jira] [Commented] (SPARK-3289) Avoid job failures due to rescheduling of failing tasks on buggy machines
[ https://issues.apache.org/jira/browse/SPARK-3289?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14114611#comment-14114611 ] Mark Hamstra commented on SPARK-3289: - https://github.com/apache/spark/pull/1360 Avoid job failures due to rescheduling of failing tasks on buggy machines - Key: SPARK-3289 URL: https://issues.apache.org/jira/browse/SPARK-3289 Project: Spark Issue Type: Bug Components: Spark Core Reporter: Josh Rosen Some users have reported issues where a task fails due to an environment / configuration issue on some machine, then the task is reattempted _on that same buggy machine_ until the entire job fails because that single task has failed too many times. To guard against this, maybe we should add some randomization in how we reschedule failed tasks.
[jira] [Updated] (SPARK-2714) DAGScheduler logs jobid when runJob finishes
[ https://issues.apache.org/jira/browse/SPARK-2714?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Mark Hamstra updated SPARK-2714: Issue Type: Improvement (was: Documentation) DAGScheduler logs jobid when runJob finishes Key: SPARK-2714 URL: https://issues.apache.org/jira/browse/SPARK-2714 Project: Spark Issue Type: Improvement Components: Spark Core Reporter: YanTang Zhai Priority: Minor DAGScheduler logs jobid when runJob finishes
[jira] [Commented] (SPARK-1860) Standalone Worker cleanup should not clean up running executors
[ https://issues.apache.org/jira/browse/SPARK-1860?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14076778#comment-14076778 ] Mark Hamstra commented on SPARK-1860: - I don't think that there is much in the way of conflict, but something to be aware of is that the proposed fix to SPARK-2425 does modify Executor state transitions and cleanup: https://github.com/apache/spark/pull/1360 Standalone Worker cleanup should not clean up running executors --- Key: SPARK-1860 URL: https://issues.apache.org/jira/browse/SPARK-1860 Project: Spark Issue Type: Bug Components: Deploy Affects Versions: 1.0.0 Reporter: Aaron Davidson Priority: Critical Fix For: 1.1.0 The default values of the standalone worker cleanup code cleanup all application data every 7 days. This includes jars that were added to any executors that happen to be running for longer than 7 days, hitting streaming jobs especially hard. Executor's log/data folders should not be cleaned up if they're still running. Until then, this behavior should not be enabled by default. -- This message was sent by Atlassian JIRA (v6.2#6252)
[jira] [Commented] (SPARK-1812) Support cross-building with Scala 2.11
[ https://issues.apache.org/jira/browse/SPARK-1812?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14075176#comment-14075176 ] Mark Hamstra commented on SPARK-1812: - FWIW scalatest can be pushed to 2.2.0 without any pain, and 2.2 does add some nice new functionality. Support cross-building with Scala 2.11 -- Key: SPARK-1812 URL: https://issues.apache.org/jira/browse/SPARK-1812 Project: Spark Issue Type: New Feature Components: Build, Spark Core Reporter: Matei Zaharia Assignee: Prashant Sharma Since Scala 2.10/2.11 are source compatible, we should be able to cross build for both versions. From what I understand there are basically three things we need to figure out: 1. Have a two versions of our dependency graph, one that uses 2.11 dependencies and the other that uses 2.10 dependencies. 2. Figure out how to publish different poms for 2.10 and 2.11. I think (1) can be accomplished by having a scala 2.11 profile. (2) isn't really well supported by Maven since published pom's aren't generated dynamically. But we can probably script around it to make it work. I've done some initial sanity checks with a simple build here: https://github.com/pwendell/scala-maven-crossbuild -- This message was sent by Atlassian JIRA (v6.2#6252)
[jira] [Commented] (SPARK-2614) Add the spark-examples-xxx-.jar to the Debian package created by assembly/pom.xml (e.g. -Pdeb)
[ https://issues.apache.org/jira/browse/SPARK-2614?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14070908#comment-14070908 ] Mark Hamstra commented on SPARK-2614: - It's also common for installers/admins to not want all of the examples installed on their production machines. This issue is really touching on the fact that Spark's Debian packaging isn't presently everything that it could be. It's really just a hack to allow deployment tools like Chef to be able to manage Spark. To really do Debian packaging right, we should be creating multiple packages -- perhaps spark-core, spark-sql, spark-mllib, spark-graphx, spark-examples, etc. The only thing that is really preventing us from doing this is that proper Debian packaging would require a package maintainer willing and committed to do all of the maintenance work. Add the spark-examples-xxx-.jar to the Debian package created by assembly/pom.xml (e.g. -Pdeb) -- Key: SPARK-2614 URL: https://issues.apache.org/jira/browse/SPARK-2614 Project: Spark Issue Type: Improvement Components: Build, Deploy Reporter: Christian Tzolov The tar.gz distribution includes already the spark-examples.jar in the bundle. It is a common practice for installers to run SparkPi as a smoke test to verify that the installation is OK /usr/share/spark/bin/spark-submit \ --num-executors 10 --master yarn-cluster \ --class org.apache.spark.examples.SparkPi \ /usr/share/spark/jars/spark-examples-1.0.1-hadoop2.2.0.jar 10 -- This message was sent by Atlassian JIRA (v6.2#6252)
[jira] [Commented] (SPARK-2568) RangePartitioner should go through the data only once
[ https://issues.apache.org/jira/browse/SPARK-2568?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14066553#comment-14066553 ] Mark Hamstra commented on SPARK-2568: - Sure, if they can be cleanly separated -- but there's also interaction with the ShuffleManager refactoring. Do you have some strategy in mind for addressing just SPARK-2568 in isolation? RangePartitioner should go through the data only once - Key: SPARK-2568 URL: https://issues.apache.org/jira/browse/SPARK-2568 Project: Spark Issue Type: Bug Affects Versions: 1.0.0 Reporter: Reynold Xin Assignee: Xiangrui Meng As of Spark 1.0, RangePartitioner goes through data twice: once to compute the count and once to do sampling. As a result, to do sortByKey, Spark goes through data 3 times (once to count, once to sample, and once to sort). RangePartitioner should go through data only once (remove the count step). -- This message was sent by Atlassian JIRA (v6.2#6252)
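One plausible way to collapse the count and sample steps into a single scan, sketched here as an assumption about the approach rather than the actual patch, is reservoir sampling: it yields a fixed-size sample and the total element count from the same pass over the data.

```scala
// Reservoir sampling: one pass produces both a uniform k-element sample
// and the exact count n, so the separate rdd.count() step is unnecessary.
def reservoirSample(data: Iterator[Int], k: Int,
                    rng: scala.util.Random): (Array[Int], Long) = {
  val reservoir = new Array[Int](k)
  var n = 0L
  data.foreach { x =>
    if (n < k) reservoir(n.toInt) = x
    else {
      // Keep element x with probability k / (n + 1).
      val j = (rng.nextDouble() * (n + 1)).toLong
      if (j < k) reservoir(j.toInt) = x
    }
    n += 1
  }
  (if (n < k) reservoir.take(n.toInt) else reservoir, n)
}

val (sample, count) = reservoirSample(Iterator.range(0, 1000), 10,
                                      new scala.util.Random(42))
assert(count == 1000L)
assert(sample.length == 10)
assert(sample.forall(x => x >= 0 && x < 1000))
```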
[jira] [Resolved] (SPARK-2158) FileAppenderSuite is not cleaning up after itself
[ https://issues.apache.org/jira/browse/SPARK-2158?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Mark Hamstra resolved SPARK-2158. - Resolution: Fixed FileAppenderSuite is not cleaning up after itself - Key: SPARK-2158 URL: https://issues.apache.org/jira/browse/SPARK-2158 Project: Spark Issue Type: Bug Components: Spark Core Affects Versions: 1.1.0 Reporter: Mark Hamstra Assignee: Mark Hamstra Priority: Trivial Fix For: 1.1.0 FileAppenderSuite is leaving behind the file core/stdout -- This message was sent by Atlassian JIRA (v6.2#6252)
[jira] [Commented] (SPARK-2158) FileAppenderSuite is not cleaning up after itself
[ https://issues.apache.org/jira/browse/SPARK-2158?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14060201#comment-14060201 ] Mark Hamstra commented on SPARK-2158: - This is fixed at 4cb33a83e0 from https://github.com/apache/spark/pull/1100 FileAppenderSuite is not cleaning up after itself - Key: SPARK-2158 URL: https://issues.apache.org/jira/browse/SPARK-2158 Project: Spark Issue Type: Bug Components: Spark Core Affects Versions: 1.1.0 Reporter: Mark Hamstra Assignee: Mark Hamstra Priority: Trivial Fix For: 1.1.0 FileAppenderSuite is leaving behind the file core/stdout -- This message was sent by Atlassian JIRA (v6.2#6252)
[jira] [Created] (SPARK-2424) ApplicationState.MAX_NUM_RETRY should be configurable
Mark Hamstra created SPARK-2424: --- Summary: ApplicationState.MAX_NUM_RETRY should be configurable Key: SPARK-2424 URL: https://issues.apache.org/jira/browse/SPARK-2424 Project: Spark Issue Type: Improvement Components: Spark Core Affects Versions: 1.0.0 Reporter: Mark Hamstra ApplicationState.MAX_NUM_RETRY, controlling the number of times standalone Executors can exit unsuccessfully before Master will remove the Application that the Executors are trying to run, is currently hard-coded to 10. There's no reason why this should be a single, fixed value for all standalone clusters (e.g., it should probably scale with the number of Executors), so it should be SparkConf-able. -- This message was sent by Atlassian JIRA (v6.2#6252)
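Making the limit SparkConf-able amounts to a lookup with the old hard-coded value as the default. A minimal sketch follows; the key name "spark.deploy.maxExecutorRetries" and the SimpleConf stand-in are assumptions for illustration, not necessarily what the eventual patch used.

```scala
// Minimal stand-in for SparkConf's string-keyed settings with typed getters.
class SimpleConf(settings: Map[String, String]) {
  def getInt(key: String, default: Int): Int =
    settings.get(key).map(_.toInt).getOrElse(default)
}

// A cluster that sets the (hypothetical) key gets its own limit...
val conf = new SimpleConf(Map("spark.deploy.maxExecutorRetries" -> "40"))
assert(conf.getInt("spark.deploy.maxExecutorRetries", 10) == 40)

// ...while an unconfigured cluster keeps the old hard-coded value of 10.
val unset = new SimpleConf(Map.empty)
assert(unset.getInt("spark.deploy.maxExecutorRetries", 10) == 10)
```

Scaling the default with the number of Executors, as the description suggests, would just replace the constant 10 with a value computed from the cluster state.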
[jira] [Created] (SPARK-2425) Standalone Master is too aggressive in removing Applications
Mark Hamstra created SPARK-2425: --- Summary: Standalone Master is too aggressive in removing Applications Key: SPARK-2425 URL: https://issues.apache.org/jira/browse/SPARK-2425 Project: Spark Issue Type: Bug Components: Spark Core Affects Versions: 1.0.0 Reporter: Mark Hamstra Assignee: Mark Hamstra When standalone Executors trying to run a particular Application fail a cumulative ApplicationState.MAX_NUM_RETRY times, Master will remove the Application. This will be true even if there actually are a number of Executors that are successfully running the Application. This makes long-running standalone-mode Applications in particular unnecessarily vulnerable to limited failures in the cluster -- e.g., a single bad node on which Executors repeatedly fail for any reason can prevent an Application from starting or can result in a running Application being removed even though it could continue to run successfully (just not making use of all potential Workers and Executors).
[jira] [Commented] (SPARK-2198) Partition the scala build file so that it is easier to maintain
[ https://issues.apache.org/jira/browse/SPARK-2198?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14037431#comment-14037431 ] Mark Hamstra commented on SPARK-2198: - While this is an admirable goal, I'm afraid that hand editing the SBT build files won't be a very durable solution. That is because it is currently our goal to consolidate the Maven and SBT builds by deriving the SBT build configuration from the Maven POMs: https://issues.apache.org/jira/browse/SPARK-1776. As such, any partitioning of the SBT build file will really need to be incorporated into the code that is generating that file from the Maven input. Partition the scala build file so that it is easier to maintain --- Key: SPARK-2198 URL: https://issues.apache.org/jira/browse/SPARK-2198 Project: Spark Issue Type: Task Components: Build Reporter: Helena Edelson Priority: Minor Original Estimate: 3h Remaining Estimate: 3h Partition to standard Dependencies, Version, Settings, Publish.scala. keeping the SparkBuild clean to describe the modules and their deps so that changes in versions, for example, need only be made in Version.scala, settings changes such as in scalac in Settings.scala, etc. I'd be happy to do this ([~helena_e]) -- This message was sent by Atlassian JIRA (v6.2#6252)
[jira] [Updated] (SPARK-2126) Move MapOutputTracker behind ShuffleManager interface
[ https://issues.apache.org/jira/browse/SPARK-2126?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Mark Hamstra updated SPARK-2126: Assignee: Nan Zhu Move MapOutputTracker behind ShuffleManager interface - Key: SPARK-2126 URL: https://issues.apache.org/jira/browse/SPARK-2126 Project: Spark Issue Type: Sub-task Components: Shuffle, Spark Core Reporter: Matei Zaharia Assignee: Nan Zhu This will require changing the interface between the DAGScheduler and MapOutputTracker to be method calls on the ShuffleManager instead. However, it will make it easier to do push-based shuffle and other ideas requiring changes to map output tracking. -- This message was sent by Atlassian JIRA (v6.2#6252)
[jira] [Commented] (SPARK-1201) Do not materialize partitions whenever possible in BlockManager
[ https://issues.apache.org/jira/browse/SPARK-1201?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14031257#comment-14031257 ] Mark Hamstra commented on SPARK-1201: - What causes this to not be fixable within the scope of 1.0.1? Do not materialize partitions whenever possible in BlockManager --- Key: SPARK-1201 URL: https://issues.apache.org/jira/browse/SPARK-1201 Project: Spark Issue Type: New Feature Components: Block Manager, Spark Core Reporter: Patrick Wendell Assignee: Andrew Or Fix For: 1.1.0 This is a slightly more complex version of SPARK-942 where we try to avoid unrolling iterators in other situations where it is possible. SPARK-942 focused on the case where the DISK_ONLY storage level was used. There are other cases though, such as if data is stored serialized and in memory but there is not enough memory left to store the RDD.
[jira] [Commented] (SPARK-1201) Do not materialize partitions whenever possible in BlockManager
[ https://issues.apache.org/jira/browse/SPARK-1201?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14031426#comment-14031426 ] Mark Hamstra commented on SPARK-1201: - Okay, but my question is really whether resolution of this issue will require new API that will exclude it from consideration for 1.0.x or whether this will be just implementation details that can be considered a bug fix and included in the maintenance branch. Do not materialize partitions whenever possible in BlockManager --- Key: SPARK-1201 URL: https://issues.apache.org/jira/browse/SPARK-1201 Project: Spark Issue Type: New Feature Components: Block Manager, Spark Core Reporter: Patrick Wendell Assignee: Andrew Or Fix For: 1.1.0 This is a slightly more complex version of SPARK-942 where we try to avoid unrolling iterators in other situations where it is possible. SPARK-942 focused on the case where the DISK_ONLY storage level was used. There are other cases though, such as if data is stored serialized and in memory but there is not enough memory left to store the RDD.
[jira] [Commented] (SPARK-2019) Spark workers die/disappear when job fails for nearly any reason
[ https://issues.apache.org/jira/browse/SPARK-2019?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14017776#comment-14017776 ] Mark Hamstra commented on SPARK-2019: - Please don't leave the Affects Version/s selector on None. As with the SO question, is this an issue that you are seeing with Spark 0.9.0? If so, then the version of Spark that you are using is significantly out of date even on the 0.9 branch. Several bug fixes are present in the 0.9.1 release of Spark, which has been available for almost two months. There are a few more in the current 0.9.2-SNAPSHOT code, and many more in the recent 1.0.0 release. Spark workers die/disappear when job fails for nearly any reason Key: SPARK-2019 URL: https://issues.apache.org/jira/browse/SPARK-2019 Project: Spark Issue Type: Bug Reporter: sam We either have to reboot all the nodes, or run 'sudo service spark-worker restart' across our cluster. I don't think this should happen - the job failures are often not even that bad. There is a 5 upvoted SO question here: http://stackoverflow.com/questions/22031006/spark-0-9-0-worker-keeps-dying-in-standalone-mode-when-job-fails We shouldn't be giving restart privileges to our devs, and therefore our sysadm has to frequently restart the workers. When the sysadm is not around, there is nothing our devs can do. Many thanks -- This message was sent by Atlassian JIRA (v6.2#6252)
[jira] [Updated] (SPARK-2019) Spark workers die/disappear when job fails for nearly any reason
[ https://issues.apache.org/jira/browse/SPARK-2019?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Mark Hamstra updated SPARK-2019: Affects Version/s: 0.9.1 Spark workers die/disappear when job fails for nearly any reason Key: SPARK-2019 URL: https://issues.apache.org/jira/browse/SPARK-2019 Project: Spark Issue Type: Bug Affects Versions: 0.9.1 Reporter: sam We either have to reboot all the nodes, or run 'sudo service spark-worker restart' across our cluster. I don't think this should happen - the job failures are often not even that bad. There is a 5 upvoted SO question here: http://stackoverflow.com/questions/22031006/spark-0-9-0-worker-keeps-dying-in-standalone-mode-when-job-fails We shouldn't be giving restart privileges to our devs, and therefore our sysadm has to frequently restart the workers. When the sysadm is not around, there is nothing our devs can do. Many thanks -- This message was sent by Atlassian JIRA (v6.2#6252)
[jira] [Commented] (SPARK-983) Support external sorting for RDD#sortByKey()
[ https://issues.apache.org/jira/browse/SPARK-983?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14015671#comment-14015671 ] Mark Hamstra commented on SPARK-983: Is that code visible someplace? In broad outline, it seems similar to the approach I'm anticipating taking to address SPARK-1021 -- and maybe I'll get a chance to actually do some work on that later this week. Support external sorting for RDD#sortByKey() Key: SPARK-983 URL: https://issues.apache.org/jira/browse/SPARK-983 Project: Spark Issue Type: New Feature Affects Versions: 0.9.0 Reporter: Reynold Xin Assignee: Madhu Siddalingaiah Currently, RDD#sortByKey() is implemented by a mapPartitions which creates a buffer to hold the entire partition, then sorts it. This will cause an OOM if an entire partition cannot fit in memory, which is especially problematic for skewed data. Rather than OOMing, the behavior should be similar to the [ExternalAppendOnlyMap|https://github.com/apache/spark/blob/master/core/src/main/scala/org/apache/spark/util/collection/ExternalAppendOnlyMap.scala], where we fallback to disk if we detect memory pressure. -- This message was sent by Atlassian JIRA (v6.2#6252)
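The external-sort behavior the issue asks for can be outlined in a few lines: sort bounded-size chunks independently (the "spills"), then merge the sorted runs. This is a rough in-memory sketch of the idea only; a real implementation, like ExternalAppendOnlyMap, would write the runs to disk under memory pressure and do a streaming k-way merge.

```scala
// Sort chunks of at most maxInMemory elements, then merge the runs.
// Runs are merged pairwise here for brevity; disk spilling is elided.
def externalSort(data: Iterator[Int], maxInMemory: Int): Seq[Int] = {
  val runs = data.grouped(maxInMemory).map(_.sorted).toSeq // each run fits in memory
  runs.foldLeft(Seq.empty[Int]) { (merged, run) =>
    val out = scala.collection.mutable.ArrayBuffer.empty[Int]
    var (i, j) = (0, 0)
    while (i < merged.length && j < run.length) {
      if (merged(i) <= run(j)) { out += merged(i); i += 1 }
      else { out += run(j); j += 1 }
    }
    out ++= merged.drop(i) // append whichever side has leftovers
    out ++= run.drop(j)
    out.toSeq
  }
}

assert(externalSort(Iterator(9, 4, 7, 1, 8, 2, 5), 3) == Seq(1, 2, 4, 5, 7, 8, 9))
```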
[jira] [Updated] (SPARK-1973) Add randomSplit to JavaRDD (with tests, and tidy Java tests)
[ https://issues.apache.org/jira/browse/SPARK-1973?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Mark Hamstra updated SPARK-1973: Fix Version/s: 1.1.0 Add randomSplit to JavaRDD (with tests, and tidy Java tests) Key: SPARK-1973 URL: https://issues.apache.org/jira/browse/SPARK-1973 Project: Spark Issue Type: Improvement Components: Spark Core Affects Versions: 1.0.0 Reporter: Sean Owen Priority: Minor Fix For: 1.1.0 I'd like to use randomSplit through the Java API, and would like to add a convenience wrapper for this method to JavaRDD. This is fairly trivial. (In fact, is the intent that JavaRDD not wrap every RDD method? and that sometimes users should just use JavaRDD.wrapRDD()?) Along the way, I added tests for it, and also touched up the Java API test style and behavior. This is maybe the more useful part of this small change. -- This message was sent by Atlassian JIRA (v6.2#6252)