[jira] [Commented] (SPARK-10816) EventTime based sessionization
[ https://issues.apache.org/jira/browse/SPARK-10816?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16646177#comment-16646177 ] Li Yuanjian commented on SPARK-10816: - Thanks [~zsxwing] for your comments and discussion, and many thanks [~kabhwan] for the comparison work. Sorry for the inactivity; the team is back at work this week. We are also comparing our approach and trying HWX's patch internally, and hope we can solve this problem together. {quote}If I read the codes correctly, [https://github.com/apache/spark/pull/22583] is [1]. [https://github.com/apache/spark/pull/22482] is a combination of [2] and [3] but still needs to load all values of a key into memory at the same time. {quote} Yes, our approach is [1]; we will cover the comparison in the design doc. We initially chose this approach mainly for performance and code simplicity. {quote}Since Baidu’s patch supports Complete mode and Append mode, I ran the benchmark with Append mode while comparing HWX’s patch with Baidu’s patch. While Baidu’s patch cannot keep up with input rate 200 (it showed max processed rows per second of around 130), HWX’s patch (APPEND mode) can keep the input rate around 23000. (The initial input rate was 1000, but Baidu’s patch got very slow, consistently showing "Reached spill threshold of 4096 rows, switching to org.apache.spark.util.collection.unsafe.sort.UnsafeExternalSorter".) {quote} As noted in the [discussion|https://docs.google.com/document/d/1hdh6GNLzprzlSJDDa4UKMyNQ9-u_H-CDpEtvtggxCL0/edit?disco=COOWOas], the bug causing a useless shuffle has been fixed. We'll rerun the benchmark and share the results in your doc. > EventTime based sessionization > -- > > Key: SPARK-10816 > URL: https://issues.apache.org/jira/browse/SPARK-10816 > Project: Spark > Issue Type: New Feature > Components: Structured Streaming >Reporter: Reynold Xin >Priority: Major > Attachments: SPARK-10816 Support session window natively.pdf, Session > Window Support For Structure Streaming.pdf > > -- This message was sent by Atlassian JIRA (v7.6.3#76005) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
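For context on what both patches aim to make native: without a built-in session window, sessionization in Structured Streaming is usually hand-rolled with flatMapGroupsWithState. A minimal sketch of that existing workaround follows; all names are illustrative, and it simplifies real event-time handling by using processing-time timeouts.

{code:scala}
import java.sql.Timestamp
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.streaming.{GroupState, GroupStateTimeout, OutputMode}

case class Event(user: String, eventTime: Timestamp)
case class SessionState(start: Long, end: Long)
case class ClosedSession(user: String, start: Long, end: Long)

val spark = SparkSession.builder.appName("session-sketch").getOrCreate()
import spark.implicits._

// Toy unbounded input: the built-in rate source, mapped onto 5 fake users.
val events = spark.readStream.format("rate").option("rowsPerSecond", "10").load()
  .selectExpr("concat('user', CAST(value % 5 AS STRING)) AS user", "timestamp AS eventTime")
  .as[Event]

val gapMs = 10 * 1000L  // close a session after 10s of inactivity (illustrative)

val sessions = events
  .groupByKey(_.user)
  .flatMapGroupsWithState[SessionState, ClosedSession](
      OutputMode.Append(), GroupStateTimeout.ProcessingTimeTimeout) {
    (user, rows, state) =>
      if (state.hasTimedOut) {
        // No activity within the gap: emit the finished session, drop the state.
        val s = state.get
        state.remove()
        Iterator(ClosedSession(user, s.start, s.end))
      } else {
        // Extend the current session with this batch of events.
        val ts = rows.map(_.eventTime.getTime).toSeq
        val s = state.getOption.getOrElse(SessionState(ts.min, ts.max))
        state.update(SessionState(math.min(s.start, ts.min), math.max(s.end, ts.max)))
        state.setTimeoutDuration(gapMs)
        Iterator.empty
      }
  }

sessions.writeStream.format("console").outputMode("append").start()
{code}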
[jira] [Commented] (SPARK-10816) EventTime based sessionization
[ https://issues.apache.org/jira/browse/SPARK-10816?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16631751#comment-16631751 ] Li Yuanjian commented on SPARK-10816: - Design doc: [https://docs.google.com/document/d/1zeAc7QKSO7J4-Yk06kc76kvldl-QHLCDJuu04d7k2bg/edit?usp=sharing] PR: [https://github.com/apache/spark/pull/22583] After a rough comparison with [~kabhwan]'s posted doc and PR, we share several common points in design and implementation; hope we can resolve this problem together! > EventTime based sessionization > -- > > Key: SPARK-10816 > URL: https://issues.apache.org/jira/browse/SPARK-10816 > Project: Spark > Issue Type: New Feature > Components: Structured Streaming >Reporter: Reynold Xin >Priority: Major > Attachments: SPARK-10816 Support session window natively.pdf, Session > Window Support For Structure Streaming.pdf > > -- This message was sent by Atlassian JIRA (v7.6.3#76005) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Updated] (SPARK-10816) EventTime based sessionization
[ https://issues.apache.org/jira/browse/SPARK-10816?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Li Yuanjian updated SPARK-10816: Attachment: Session Window Support For Structure Streaming.pdf > EventTime based sessionization > -- > > Key: SPARK-10816 > URL: https://issues.apache.org/jira/browse/SPARK-10816 > Project: Spark > Issue Type: New Feature > Components: Structured Streaming >Reporter: Reynold Xin >Priority: Major > Attachments: SPARK-10816 Support session window natively.pdf, Session > Window Support For Structure Streaming.pdf > > -- This message was sent by Atlassian JIRA (v7.6.3#76005) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Commented] (SPARK-10816) EventTime based sessionization
[ https://issues.apache.org/jira/browse/SPARK-10816?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16631398#comment-16631398 ] Li Yuanjian commented on SPARK-10816: - Many thanks to [~kabhwan] for notifying me; I just linked SPARK-22565 as a duplicate of this. Sorry for only searching "session window" before and missing this one; I will keep looking for other duplicate JIRAs. As discussed in SPARK-22565, we also met this problem while migrating streaming apps running on other systems to Structured Streaming. We solved it by implementing the session window as a built-in function, and shipped an internal beta version based on Apache Spark 2.3.0 just a week ago. After steady online running in a real production environment, we are now cleaning up the code and translating the docs. As discussed with Jungtaek, we would also like to join the discussion here and will post a PR and design doc today. The preview PR I'll submit contains others' patches. cc [~liulinhong] [~ivoson] [~yanlin-Lynn] [~LiangchangZ], please watch this issue. > EventTime based sessionization > -- > > Key: SPARK-10816 > URL: https://issues.apache.org/jira/browse/SPARK-10816 > Project: Spark > Issue Type: New Feature > Components: Structured Streaming >Reporter: Reynold Xin >Priority: Major > Attachments: SPARK-10816 Support session window natively.pdf > > -- This message was sent by Atlassian JIRA (v7.6.3#76005) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Commented] (SPARK-22565) Session-based windowing
[ https://issues.apache.org/jira/browse/SPARK-22565?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16631370#comment-16631370 ] Li Yuanjian commented on SPARK-22565: - [~kabhwan] Many thanks for notifying me; sorry for only searching "session window" and missing SPARK-10816. {quote} It would be nice if you could also share the SPIP, as well as some PR or design doc, so that we could find spots for collaboration and build a better product. {quote} No problem, I'll cherry-pick all related patches from our internal fork; we have actually been translating the internal doc for a few days and will also post a design doc today. Let's discuss in SPARK-10816. Thanks again for your reply. > Session-based windowing > --- > > Key: SPARK-22565 > URL: https://issues.apache.org/jira/browse/SPARK-22565 > Project: Spark > Issue Type: Improvement > Components: Structured Streaming >Affects Versions: 2.2.0 >Reporter: Richard Xin >Priority: Major > Attachments: screenshot-1.png > > > I came across a requirement to support session-based windowing. For example, > user activity comes in from Kafka, and we want to create a window per user session > (if the time gap between activities from the same user exceeds the predefined value, > a new window will be created). > I noticed that Flink supports this; is there any plan/schedule for Spark to support it? -- This message was sent by Atlassian JIRA (v7.6.3#76005) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Comment Edited] (SPARK-22565) Session-based windowing
[ https://issues.apache.org/jira/browse/SPARK-22565?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16631298#comment-16631298 ] Li Yuanjian edited comment on SPARK-22565 at 9/28/18 3:23 AM: -- Thanks for reporting this. Actually, we also met this problem in our usage, and we have an implementation of session windows in our internal fork to resolve it. After steady online running in a real production environment, we want to contribute it to the community within the next few days. We implemented it as a built-in function named session_window, with corresponding support for window merging in Structured Streaming. The usage from the DataFrame API and SQL can be quickly seen in the test: !screenshot-1.png! was (Author: xuanyuan): Thanks for reporting this. Actually, we also met this problem in our usage, and we have an implementation of session windows in our internal fork to resolve it. After steady online running in a real production environment, we want to contribute it to the community within the next few days. We implemented it as a built-in function named session_window. The usage from the DataFrame API and SQL can be quickly seen in the test: !screenshot-1.png! > Session-based windowing > --- > > Key: SPARK-22565 > URL: https://issues.apache.org/jira/browse/SPARK-22565 > Project: Spark > Issue Type: Improvement > Components: Structured Streaming >Affects Versions: 2.2.0 >Reporter: Richard Xin >Priority: Major > Attachments: screenshot-1.png > > > I came across a requirement to support session-based windowing. For example, > user activity comes in from Kafka, and we want to create a window per user session > (if the time gap between activities from the same user exceeds the predefined value, > a new window will be created). > I noticed that Flink supports this; is there any plan/schedule for Spark to support it? -- This message was sent by Atlassian JIRA (v7.6.3#76005) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
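Since the screenshot with the actual tests is not rendered in this archive, the following is only a guess at what usage of such a built-in could look like, modeled on the existing window() function; the real session_window signature in the internal patch may differ. It assumes an events DataFrame with user and eventTime columns.

{code:scala}
import org.apache.spark.sql.functions.{col, expr}

// Hypothetical: session_window(timeColumn, gapDuration) groups rows into
// per-key sessions that close after 10 seconds of inactivity.
val sessionCounts = events
  .groupBy(expr("session_window(eventTime, '10 seconds')"), col("user"))
  .count()

// Hypothetical SQL equivalent on a registered `events` view.
spark.sql(
  """SELECT user, session_window(eventTime, '10 seconds') AS session, count(*) AS cnt
    |FROM events
    |GROUP BY user, session_window(eventTime, '10 seconds')""".stripMargin)
{code}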
[jira] [Commented] (SPARK-22565) Session-based windowing
[ https://issues.apache.org/jira/browse/SPARK-22565?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16631300#comment-16631300 ] Li Yuanjian commented on SPARK-22565: - Also cc [~zsxwing] [~tdas]; we are translating the design doc and will post a SPIP in the coming days. Hope you can take a look when you have time, thanks :) > Session-based windowing > --- > > Key: SPARK-22565 > URL: https://issues.apache.org/jira/browse/SPARK-22565 > Project: Spark > Issue Type: Improvement > Components: Structured Streaming >Affects Versions: 2.2.0 >Reporter: Richard Xin >Priority: Major > Attachments: screenshot-1.png > > > I came across a requirement to support session-based windowing. For example, > user activity comes in from Kafka, and we want to create a window per user session > (if the time gap between activities from the same user exceeds the predefined value, > a new window will be created). > I noticed that Flink supports this; is there any plan/schedule for Spark to support it? -- This message was sent by Atlassian JIRA (v7.6.3#76005) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Commented] (SPARK-22565) Session-based windowing
[ https://issues.apache.org/jira/browse/SPARK-22565?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16631298#comment-16631298 ] Li Yuanjian commented on SPARK-22565: - Thanks for reporting this. Actually, we also met this problem in our usage, and we have an implementation of session windows in our internal fork to resolve it. After steady online running in a real production environment, we want to contribute it to the community within the next few days. We implemented it as a built-in function named session_window. The usage from the DataFrame API and SQL can be quickly seen in the test: !screenshot-1.png! > Session-based windowing > --- > > Key: SPARK-22565 > URL: https://issues.apache.org/jira/browse/SPARK-22565 > Project: Spark > Issue Type: Improvement > Components: Structured Streaming >Affects Versions: 2.2.0 >Reporter: Richard Xin >Priority: Major > Attachments: screenshot-1.png > > > I came across a requirement to support session-based windowing. For example, > user activity comes in from Kafka, and we want to create a window per user session > (if the time gap between activities from the same user exceeds the predefined value, > a new window will be created). > I noticed that Flink supports this; is there any plan/schedule for Spark to support it? -- This message was sent by Atlassian JIRA (v7.6.3#76005) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Updated] (SPARK-22565) Session-based windowing
[ https://issues.apache.org/jira/browse/SPARK-22565?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Li Yuanjian updated SPARK-22565: Attachment: screenshot-1.png > Session-based windowing > --- > > Key: SPARK-22565 > URL: https://issues.apache.org/jira/browse/SPARK-22565 > Project: Spark > Issue Type: Improvement > Components: Structured Streaming >Affects Versions: 2.2.0 >Reporter: Richard Xin >Priority: Major > Attachments: screenshot-1.png > > > I came across a requirement to support session-based windowing. For example, > user activity comes in from Kafka, and we want to create a window per user session > (if the time gap between activities from the same user exceeds the predefined value, > a new window will be created). > I noticed that Flink supports this; is there any plan/schedule for Spark to support it? -- This message was sent by Atlassian JIRA (v7.6.3#76005) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Commented] (SPARK-25527) Job stuck waiting for last stage to start
[ https://issues.apache.org/jira/browse/SPARK-25527?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16627593#comment-16627593 ] Li Yuanjian commented on SPARK-25527: - {quote} There are no Tasks waiting for completion, and the job just hangs. {quote} You can check the driver thread dump when this happens; generally speaking, the driver is doing some heavy work like committing in this scenario. We need more clues to confirm this is a bug. > Job stuck waiting for last stage to start > - > > Key: SPARK-25527 > URL: https://issues.apache.org/jira/browse/SPARK-25527 > Project: Spark > Issue Type: Bug > Components: Spark Core >Affects Versions: 2.1.0 >Reporter: Ran Haim >Priority: Major > > Sometimes it can somehow happen that a job is stuck waiting for the last > stage to start. > There are no Tasks waiting for completion, and the job just hangs. > There are available Executors for the job to run. > I do not know how to reproduce this; all I know is that it happens randomly > after a couple of days of hard load. > Another thing that might help: it seems to happen when some tasks fail > because one or more executors were killed (due to memory issues or something). > Those tasks eventually do get finished by other executors because of retries, > but the next stage hangs. -- This message was sent by Atlassian JIRA (v7.6.3#76005) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Commented] (SPARK-24499) Documentation improvement of Spark core and SQL
[ https://issues.apache.org/jira/browse/SPARK-24499?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16625061#comment-16625061 ] Li Yuanjian commented on SPARK-24499: - [~smilegator] Sorry for the late reply on this; we'll post the first PR for the page split within 2 weeks. We also looked into the current docs; the second PR will probably cover the usage of piped RDDs (RDD.pipe), which was widely used during our migration work from Hadoop to Spark but is barely mentioned in the current docs. > Documentation improvement of Spark core and SQL > --- > > Key: SPARK-24499 > URL: https://issues.apache.org/jira/browse/SPARK-24499 > Project: Spark > Issue Type: New Feature > Components: Documentation, Spark Core, SQL >Affects Versions: 2.3.0 >Reporter: Xiao Li >Priority: Major > > The current documentation in Apache Spark lacks enough code examples and > tips. If needed, we should also split the page of > https://spark.apache.org/docs/latest/sql-programming-guide.html into multiple > separate pages like what we did for > https://spark.apache.org/docs/latest/ml-guide.html -- This message was sent by Atlassian JIRA (v7.6.3#76005) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
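For reference, the piped-RDD feature mentioned above is RDD.pipe, which streams each partition through an external command over stdin/stdout. A minimal sketch; the external command is just illustrative:

{code:scala}
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder.appName("pipe-sketch").getOrCreate()
val sc = spark.sparkContext

// pipe() feeds each partition to one external process: records go to the
// process's stdin, one per line, and its stdout lines come back as records.
val lines = sc.parallelize(Seq("a", "b", "c"), numSlices = 2)
val counts = lines.pipe("wc -l").collect()
// One output per partition, e.g. Array("1", "2") (exact formatting depends on wc).
{code}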
[jira] [Commented] (SPARK-25426) Remove the duplicate fallback logic in UnsafeProjection
[ https://issues.apache.org/jira/browse/SPARK-25426?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16617587#comment-16617587 ] Li Yuanjian commented on SPARK-25426: - Resolved by https://github.com/apache/spark/pull/22417. > Remove the duplicate fallback logic in UnsafeProjection > --- > > Key: SPARK-25426 > URL: https://issues.apache.org/jira/browse/SPARK-25426 > Project: Spark > Issue Type: Improvement > Components: SQL >Affects Versions: 2.3.1 >Reporter: Takeshi Yamamuro >Assignee: Takeshi Yamamuro >Priority: Minor > Fix For: 2.5.0 > > -- This message was sent by Atlassian JIRA (v7.6.3#76005) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Commented] (SPARK-23128) A new approach to do adaptive execution in Spark SQL
[ https://issues.apache.org/jira/browse/SPARK-23128?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16587348#comment-16587348 ] Li Yuanjian commented on SPARK-23128: - Thanks for your comment, [~xinyao]. {quote} If I understand it correctly, one of the goals of this patch is to solve data skew by splitting large partitions into small ones. Have you seen any significant improvement in this area? {quote} Yes, you're right; the strategy of handling data skew in joins does show significant improvement, though not in my report. Other companies have more practice with this strategy. > A new approach to do adaptive execution in Spark SQL > > > Key: SPARK-23128 > URL: https://issues.apache.org/jira/browse/SPARK-23128 > Project: Spark > Issue Type: Improvement > Components: SQL >Affects Versions: 2.3.0 >Reporter: Carson Wang >Priority: Major > Attachments: AdaptiveExecutioninBaidu.pdf > > > SPARK-9850 proposed the basic idea of adaptive execution in Spark. In > DAGScheduler, a new API is added to support submitting a single map stage. > The current implementation of adaptive execution in Spark SQL supports > changing the reducer number at runtime. An Exchange coordinator is used to > determine the number of post-shuffle partitions for a stage that needs to > fetch shuffle data from one or multiple stages. The current implementation > adds ExchangeCoordinator while we are adding Exchanges. However there are > some limitations. First, it may cause additional shuffles that may decrease > the performance. We can see this from EnsureRequirements rule when it adds > ExchangeCoordinator. Secondly, it is not a good idea to add > ExchangeCoordinators while we are adding Exchanges because we don’t have a > global picture of all shuffle dependencies of a post-shuffle stage. I.e. for > 3 tables’ join in a single stage, the same ExchangeCoordinator should be used > in three Exchanges but currently two separate ExchangeCoordinators will be > added. Thirdly, with the current framework it is not easy to implement other > features in adaptive execution flexibly like changing the execution plan and > handling skewed joins at runtime. > We'd like to introduce a new way to do adaptive execution in Spark SQL and > address the limitations. The idea is described at > [https://docs.google.com/document/d/1mpVjvQZRAkD-Ggy6-hcjXtBPiQoVbZGe3dLnAKgtJ4k/edit?usp=sharing] -- This message was sent by Atlassian JIRA (v7.6.3#76005) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
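For readers who want to try the existing (pre-rework) adaptive behavior this thread discusses, the relevant Spark 2.x knobs are, to my understanding, the following; verify the exact keys against your version:

{code:scala}
// Enable the current adaptive execution and tune the target post-shuffle
// partition size that drives the reducer-number changes described above.
spark.conf.set("spark.sql.adaptive.enabled", "true")
spark.conf.set("spark.sql.adaptive.shuffle.targetPostShuffleInputSize", "67108864") // 64MB
{code}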
[jira] [Commented] (SPARK-25050) Handle more than two types in avro union types
[ https://issues.apache.org/jira/browse/SPARK-25050?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16587326#comment-16587326 ] Li Yuanjian commented on SPARK-25050: - Hasn't support for more than two types in Avro union types already been added in SchemaConverters, here: [SchemaConverters.scala+98|https://github.com/apache/spark/blob/4fb96e5105cec4a3eb19a2b7997600b086bac32f/external/avro/src/main/scala/org/apache/spark/sql/avro/SchemaConverters.scala#L98-L116]? If I'm wrong, please let me know; thanks for checking :) > Handle more than two types in avro union types > -- > > Key: SPARK-25050 > URL: https://issues.apache.org/jira/browse/SPARK-25050 > Project: Spark > Issue Type: Improvement > Components: SQL >Affects Versions: 2.4.0 >Reporter: DB Tsai >Priority: Major > -- This message was sent by Atlassian JIRA (v7.6.3#76005) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
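A quick way to check the linked behavior: my reading of the linked code is that non-trivial Avro unions are converted to a struct with one field per union member, so unions of more than two types should already survive schema conversion. A sketch; the expected output shape is an assumption from reading the code, not a verified result:

{code:scala}
import org.apache.avro.Schema
import org.apache.spark.sql.avro.SchemaConverters

val avroSchema = new Schema.Parser().parse(
  """{"type": "record", "name": "rec", "fields": [
    |  {"name": "f", "type": ["int", "string", "boolean"]}
    |]}""".stripMargin)

// Expected (assumption): struct<f:struct<member0:int,member1:string,member2:boolean>>
println(SchemaConverters.toSqlType(avroSchema).dataType)
{code}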
[jira] [Commented] (SPARK-25072) PySpark custom Row class can be given extra parameters
[ https://issues.apache.org/jira/browse/SPARK-25072?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16584705#comment-16584705 ] Li Yuanjian commented on SPARK-25072: - Interesting issue, but this may exist only in PySpark; the Scala Row will not have this problem. I'll post a quick fix for this. > PySpark custom Row class can be given extra parameters > -- > > Key: SPARK-25072 > URL: https://issues.apache.org/jira/browse/SPARK-25072 > Project: Spark > Issue Type: Bug > Components: PySpark >Affects Versions: 2.2.0 > Environment: {noformat} > SPARK_MAJOR_VERSION is set to 2, using Spark2 > Python 3.4.5 (default, Dec 11 2017, 16:57:19) > Type 'copyright', 'credits' or 'license' for more information > IPython 6.2.1 -- An enhanced Interactive Python. Type '?' for help. > Using Spark's default log4j profile: > org/apache/spark/log4j-defaults.properties > Setting default log level to "WARN". > To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use > setLogLevel(newLevel). > 18/08/01 04:49:16 WARN NativeCodeLoader: Unable to load native-hadoop library > for your platform... using builtin-java classes where applicable > 18/08/01 04:49:17 WARN Utils: Service 'SparkUI' could not bind on port 4040. > Attempting port 4041. > 18/08/01 04:49:27 WARN ObjectStore: Failed to get database global_temp, > returning NoSuchObjectException > Welcome to > __ > / __/__ ___ _/ /__ > _\ \/ _ \/ _ `/ __/ '_/ >/__ / .__/\_,_/_/ /_/\_\ version 2.2.0 > /_/ > Using Python version 3.4.5 (default, Dec 11 2017 16:57:19) > SparkSession available as 'spark'. > {noformat} {{CentOS release 6.9 (Final)}} {{Linux sandbox-hdp.hortonworks.com 4.14.0-1.el7.elrepo.x86_64 #1 SMP Sun Nov > 12 20:21:04 EST 2017 x86_64 x86_64 x86_64 GNU/Linux}} {noformat}openjdk version "1.8.0_161" > OpenJDK Runtime Environment (build 1.8.0_161-b14) > OpenJDK 64-Bit Server VM (build 25.161-b14, mixed mode){noformat} >Reporter: Jan-Willem van der Sijp >Priority: Minor > > When a custom Row class is made in PySpark, it is possible to provide the > constructor of this class with more parameters than there are columns. These > extra parameters affect the value of the Row, but are not part of the > {{repr}} or {{str}} output, making it hard to debug errors due to these > "invisible" values. The hidden values can be accessed through integer-based > indexing though. > Some examples: > {code:python} > In [69]: RowClass = Row("column1", "column2") > In [70]: RowClass(1, 2) == RowClass(1, 2) > Out[70]: True > In [71]: RowClass(1, 2) == RowClass(1, 2, 3) > Out[71]: False > In [75]: RowClass(1, 2, 3) > Out[75]: Row(column1=1, column2=2) > In [76]: RowClass(1, 2) > Out[76]: Row(column1=1, column2=2) > In [77]: RowClass(1, 2, 3).asDict() > Out[77]: {'column1': 1, 'column2': 2} > In [78]: RowClass(1, 2, 3)[2] > Out[78]: 3 > In [79]: repr(RowClass(1, 2, 3)) > Out[79]: 'Row(column1=1, column2=2)' > In [80]: str(RowClass(1, 2, 3)) > Out[80]: 'Row(column1=1, column2=2)' > {code} -- This message was sent by Atlassian JIRA (v7.6.3#76005) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Updated] (SPARK-25104) Validate user specified output schema
[ https://issues.apache.org/jira/browse/SPARK-25104?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Li Yuanjian updated SPARK-25104: Description: With code changes in [https://github.com/apache/spark/pull/21847|https://github.com/apache/spark/pull/21847] , Spark can write out data as per the user-provided output schema. To make it more robust and user friendly, we should validate the Avro schema before tasks are launched. Also, we should support outputting the logical decimal type as BYTES (by default we output it as FIXED) was: With code changes in [https://github.com/apache/spark/pull/21847|https://github.com/apache/spark/pull/21847,] , Spark can write out data as per user provided output schema. To make it more robust and user friendly, we should validate the Avro schema before tasks launched. Also we should support output logical decimal type as BYTES (By default we output as FIXED) > Validate user specified output schema > - > > Key: SPARK-25104 > URL: https://issues.apache.org/jira/browse/SPARK-25104 > Project: Spark > Issue Type: Sub-task > Components: SQL >Affects Versions: 2.4.0 >Reporter: Gengliang Wang >Assignee: Gengliang Wang >Priority: Major > Fix For: 2.4.0 > > > With code changes in > [https://github.com/apache/spark/pull/21847|https://github.com/apache/spark/pull/21847] > , Spark can write out data as per the user-provided output schema. > To make it more robust and user friendly, we should validate the Avro schema > before tasks are launched. > Also, we should support outputting the logical decimal type as BYTES (by default we > output it as FIXED) -- This message was sent by Atlassian JIRA (v7.6.3#76005) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Updated] (SPARK-25100) Using KryoSerializer and setting registrationRequired true can lead to job failure
[ https://issues.apache.org/jira/browse/SPARK-25100?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Li Yuanjian updated SPARK-25100: Description: When spark.serializer is `org.apache.spark.serializer.KryoSerializer` and `spark.kryo.registrationRequired` is true in SparkConf, invoking saveAsNewAPIHadoopDataset to store data in HDFS fails because the class TaskCommitMessage hasn't been registered. {code:java} java.lang.IllegalArgumentException: Class is not registered: org.apache.spark.internal.io.FileCommitProtocol$TaskCommitMessage Note: To register this class use: kryo.register(org.apache.spark.internal.io.FileCommitProtocol$TaskCommitMessage.class); at com.esotericsoftware.kryo.Kryo.getRegistration(Kryo.java:488) at com.twitter.chill.KryoBase.getRegistration(KryoBase.scala:52) at com.esotericsoftware.kryo.util.DefaultClassResolver.writeClass(DefaultClassResolver.java:97) at com.esotericsoftware.kryo.Kryo.writeClass(Kryo.java:517) at com.esotericsoftware.kryo.Kryo.writeClassAndObject(Kryo.java:622) at org.apache.spark.serializer.KryoSerializerInstance.serialize(KryoSerializer.scala:347) at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:393) at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) at java.lang.Thread.run(Thread.java:745) {code} was: When spark.serializer is org.apache.spark.serializer.KryoSerializer and spark.kryo.registrationRequired is true in SparkCOnf. I invoked saveAsNewAPIHadoopDataset to store data in hdfs. The job will fail because the class TaskCommitMessage hasn't be registered. {code:java} java.lang.IllegalArgumentException: Class is not registered: org.apache.spark.internal.io.FileCommitProtocol$TaskCommitMessage Note: To register this class use: kryo.register(org.apache.spark.internal.io.FileCommitProtocol$TaskCommitMessage.class); at com.esotericsoftware.kryo.Kryo.getRegistration(Kryo.java:488) at com.twitter.chill.KryoBase.getRegistration(KryoBase.scala:52) at com.esotericsoftware.kryo.util.DefaultClassResolver.writeClass(DefaultClassResolver.java:97) at com.esotericsoftware.kryo.Kryo.writeClass(Kryo.java:517) at com.esotericsoftware.kryo.Kryo.writeClassAndObject(Kryo.java:622) at org.apache.spark.serializer.KryoSerializerInstance.serialize(KryoSerializer.scala:347) at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:393) at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) at java.lang.Thread.run(Thread.java:745) {code} > Using KryoSerializer and setting registrationRequired true can lead to job failure > -- > > Key: SPARK-25100 > URL: https://issues.apache.org/jira/browse/SPARK-25100 > Project: Spark > Issue Type: Bug > Components: Spark Core >Affects Versions: 2.3.1 >Reporter: deshanxiao >Priority: Major > > When spark.serializer is `org.apache.spark.serializer.KryoSerializer` and > `spark.kryo.registrationRequired` is true in SparkConf, invoking > saveAsNewAPIHadoopDataset to store data in HDFS fails because > the class TaskCommitMessage hasn't been registered.
> > {code:java} > java.lang.IllegalArgumentException: Class is not registered: > org.apache.spark.internal.io.FileCommitProtocol$TaskCommitMessage > Note: To register this class use: > kryo.register(org.apache.spark.internal.io.FileCommitProtocol$TaskCommitMessage.class); > at com.esotericsoftware.kryo.Kryo.getRegistration(Kryo.java:488) > at com.twitter.chill.KryoBase.getRegistration(KryoBase.scala:52) > at > com.esotericsoftware.kryo.util.DefaultClassResolver.writeClass(DefaultClassResolver.java:97) > at com.esotericsoftware.kryo.Kryo.writeClass(Kryo.java:517) > at com.esotericsoftware.kryo.Kryo.writeClassAndObject(Kryo.java:622) > at > org.apache.spark.serializer.KryoSerializerInstance.serialize(KryoSerializer.scala:347) > at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:393) > at > java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) > at > java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) > at java.lang.Thread.run(Thread.java:745) > {code} > -- This message was sent by Atlassian JIRA (v7.6.3#76005) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
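Until Spark registers this class itself, a workaround sketch is to register it from user code. The Class.forName indirection is needed because TaskCommitMessage is Spark-internal; this reflects my understanding rather than an officially documented setup.

{code:scala}
import org.apache.spark.SparkConf

val conf = new SparkConf()
  .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
  .set("spark.kryo.registrationRequired", "true")
  // Register the internal class named in the error message by hand.
  .registerKryoClasses(Array(
    Class.forName("org.apache.spark.internal.io.FileCommitProtocol$TaskCommitMessage")))
{code}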
[jira] [Created] (SPARK-25077) Delete unused variable in WindowExec
Li Yuanjian created SPARK-25077: --- Summary: Delete unused variable in WindowExec Key: SPARK-25077 URL: https://issues.apache.org/jira/browse/SPARK-25077 Project: Spark Issue Type: Improvement Components: SQL Affects Versions: 2.3.0 Reporter: Li Yuanjian Delete the unused variable `inputFields` in WindowExec to avoid confusing others when reading the code. -- This message was sent by Atlassian JIRA (v7.6.3#76005) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Resolved] (SPARK-24989) BlockFetcher should retry while getting OutOfDirectMemoryError
[ https://issues.apache.org/jira/browse/SPARK-24989?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Li Yuanjian resolved SPARK-24989. - Resolution: Not A Problem The param `spark.reducer.maxBlocksInFlightPerAddress` added in SPARK-21243 can solve this problem; closing this JIRA. Thanks [~Dhruve Ashar]! > BlockFetcher should retry while getting OutOfDirectMemoryError > -- > > Key: SPARK-24989 > URL: https://issues.apache.org/jira/browse/SPARK-24989 > Project: Spark > Issue Type: Improvement > Components: Shuffle >Affects Versions: 2.2.0 >Reporter: Li Yuanjian >Priority: Major > Attachments: FailedStage.png > > > h3. Description > This problem can be reproduced stably by a high-parallelism job migrated from > MapReduce to Spark in our practice; some metrics are listed below: > ||Item||Value|| > |spark.executor.instances|1000| > |spark.executor.cores|5| > |task number of shuffle writer stage|18038| > |task number of shuffle reader stage|8| > While the shuffle writer stage ended successfully, the shuffle reader stage > keeps failing with FetchFailed. Each fetch request needs the Netty server to > allocate a 16MB buffer (detailed stack attached below); the huge > number of fetch requests uses up the default maxDirectMemory rapidly, even > though we bumped io.netty.maxDirectMemory up to 50GB! > {code:java} > org.apache.spark.shuffle.FetchFailedException: failed to allocate 16777216 > byte(s) of direct memory (used: 21474836480, max: 21474836480) > at > org.apache.spark.storage.ShuffleBlockFetcherIterator.throwFetchFailedException(ShuffleBlockFetcherIterator.scala:514) > at > org.apache.spark.storage.ShuffleBlockFetcherIterator.next(ShuffleBlockFetcherIterator.scala:445) > at > org.apache.spark.storage.ShuffleBlockFetcherIterator.next(ShuffleBlockFetcherIterator.scala:61) > at scala.collection.Iterator$$anon$12.nextCur(Iterator.scala:434) > at scala.collection.Iterator$$anon$12.hasNext(Iterator.scala:440) > at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:408) > at > org.apache.spark.util.CompletionIterator.hasNext(CompletionIterator.scala:32) > at > org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:37) > at > org.apache.spark.util.collection.ExternalSorter.insertAll(ExternalSorter.scala:199) > at > org.apache.spark.shuffle.BlockStoreShuffleReader.read(BlockStoreShuffleReader.scala:119) > at org.apache.spark.rdd.ShuffledRDD.compute(ShuffledRDD.scala:105) > at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323) > at org.apache.spark.rdd.RDD.iterator(RDD.scala:287) > at > org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38) > at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323) > at org.apache.spark.rdd.RDD.iterator(RDD.scala:287) > at > org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38) > at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323) > at org.apache.spark.rdd.RDD.iterator(RDD.scala:287) > at > org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38) > at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323) > at org.apache.spark.rdd.RDD.iterator(RDD.scala:287) > at > org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38) > at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323) > at org.apache.spark.rdd.RDD.iterator(RDD.scala:287) > at > org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38) > at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323) > at
org.apache.spark.rdd.RDD.iterator(RDD.scala:287) > at > org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38) > at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323) > at org.apache.spark.rdd.RDD.iterator(RDD.scala:287) > at > org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38) > at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323) > at org.apache.spark.rdd.RDD.iterator(RDD.scala:287) > at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87) > at org.apache.spark.scheduler.Task.run(Task.scala:108) > at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:338) > at > java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) > at > java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) > at java.lang.Thread.run(Thread.java:748) > Caused by: io.netty.util.internal.OutOfDirectMemoryError: failed to allocate > 16777216 byte(s) of direct memory (use
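The resolution above points at throttling in-flight fetches per host; a configuration sketch (the values are illustrative, not tuned recommendations):

{code:scala}
import org.apache.spark.SparkConf

val conf = new SparkConf()
  // Cap how many shuffle blocks one reducer fetches from a single address at once.
  .set("spark.reducer.maxBlocksInFlightPerAddress", "100")
  // Cap concurrent fetch requests overall (complementary knob, SPARK-6166).
  .set("spark.reducer.maxReqsInFlight", "10")
{code}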
[jira] [Updated] (SPARK-24989) BlockFetcher should retry while getting OutOfDirectMemoryError
[ https://issues.apache.org/jira/browse/SPARK-24989?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Li Yuanjian updated SPARK-24989: Attachment: FailedStage.png > BlockFetcher should retry while getting OutOfDirectMemoryError > -- > > Key: SPARK-24989 > URL: https://issues.apache.org/jira/browse/SPARK-24989 > Project: Spark > Issue Type: Improvement > Components: Shuffle >Affects Versions: 2.2.0 >Reporter: Li Yuanjian >Priority: Major > Attachments: FailedStage.png > > > h3. Description > This problem can be reproduced stably by a high-parallelism job migrated from > MapReduce to Spark in our practice; some metrics are listed below: > ||Item||Value|| > |spark.executor.instances|1000| > |spark.executor.cores|5| > |task number of shuffle writer stage|18038| > |task number of shuffle reader stage|8| > While the shuffle writer stage ended successfully, the shuffle reader stage > keeps failing with FetchFailed. Each fetch request needs the Netty server to > allocate a 16MB buffer (detailed stack attached below); the huge > number of fetch requests uses up the default maxDirectMemory rapidly, even > though we bumped io.netty.maxDirectMemory up to 50GB! > {code:java} > org.apache.spark.shuffle.FetchFailedException: failed to allocate 16777216 > byte(s) of direct memory (used: 21474836480, max: 21474836480) > at > org.apache.spark.storage.ShuffleBlockFetcherIterator.throwFetchFailedException(ShuffleBlockFetcherIterator.scala:514) > at > org.apache.spark.storage.ShuffleBlockFetcherIterator.next(ShuffleBlockFetcherIterator.scala:445) > at > org.apache.spark.storage.ShuffleBlockFetcherIterator.next(ShuffleBlockFetcherIterator.scala:61) > at scala.collection.Iterator$$anon$12.nextCur(Iterator.scala:434) > at scala.collection.Iterator$$anon$12.hasNext(Iterator.scala:440) > at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:408) > at > org.apache.spark.util.CompletionIterator.hasNext(CompletionIterator.scala:32) > at > org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:37) > at > org.apache.spark.util.collection.ExternalSorter.insertAll(ExternalSorter.scala:199) > at > org.apache.spark.shuffle.BlockStoreShuffleReader.read(BlockStoreShuffleReader.scala:119) > at org.apache.spark.rdd.ShuffledRDD.compute(ShuffledRDD.scala:105) > at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323) > at org.apache.spark.rdd.RDD.iterator(RDD.scala:287) > at > org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38) > at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323) > at org.apache.spark.rdd.RDD.iterator(RDD.scala:287) > at > org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38) > at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323) > at org.apache.spark.rdd.RDD.iterator(RDD.scala:287) > at > org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38) > at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323) > at org.apache.spark.rdd.RDD.iterator(RDD.scala:287) > at > org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38) > at
org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323) > at org.apache.spark.rdd.RDD.iterator(RDD.scala:287) > at > org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38) > at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323) > at org.apache.spark.rdd.RDD.iterator(RDD.scala:287) > at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87) > at org.apache.spark.scheduler.Task.run(Task.scala:108) > at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:338) > at > java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) > at > java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) > at java.lang.Thread.run(Thread.java:748) > Caused by: io.netty.util.internal.OutOfDirectMemoryError: failed to allocate > 16777216 byte(s) of direct memory (used: 21474836480, max: 21474836480) > at > io.netty.util.internal.PlatformDependent.incrementMemoryCounter(PlatformDependent.java:530) >
[jira] [Created] (SPARK-24989) BlockFetcher should retry while getting OutOfDirectMemoryError
Li Yuanjian created SPARK-24989: --- Summary: BlockFetcher should retry while getting OutOfDirectMemoryError Key: SPARK-24989 URL: https://issues.apache.org/jira/browse/SPARK-24989 Project: Spark Issue Type: Improvement Components: Shuffle Affects Versions: 2.2.0 Reporter: Li Yuanjian h3. Description This problem can be reproduced stably by a high-parallelism job migrated from MapReduce to Spark in our practice; some metrics are listed below: ||Item||Value|| |spark.executor.instances|1000| |spark.executor.cores|5| |task number of shuffle writer stage|18038| |task number of shuffle reader stage|8| While the shuffle writer stage ended successfully, the shuffle reader stage keeps failing with FetchFailed. Each fetch request needs the Netty server to allocate a 16MB buffer (detailed stack attached below); the huge number of fetch requests uses up the default maxDirectMemory rapidly, even though we bumped io.netty.maxDirectMemory up to 50GB! {code:java} org.apache.spark.shuffle.FetchFailedException: failed to allocate 16777216 byte(s) of direct memory (used: 21474836480, max: 21474836480) at org.apache.spark.storage.ShuffleBlockFetcherIterator.throwFetchFailedException(ShuffleBlockFetcherIterator.scala:514) at org.apache.spark.storage.ShuffleBlockFetcherIterator.next(ShuffleBlockFetcherIterator.scala:445) at org.apache.spark.storage.ShuffleBlockFetcherIterator.next(ShuffleBlockFetcherIterator.scala:61) at scala.collection.Iterator$$anon$12.nextCur(Iterator.scala:434) at scala.collection.Iterator$$anon$12.hasNext(Iterator.scala:440) at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:408) at org.apache.spark.util.CompletionIterator.hasNext(CompletionIterator.scala:32) at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:37) at org.apache.spark.util.collection.ExternalSorter.insertAll(ExternalSorter.scala:199) at org.apache.spark.shuffle.BlockStoreShuffleReader.read(BlockStoreShuffleReader.scala:119) at org.apache.spark.rdd.ShuffledRDD.compute(ShuffledRDD.scala:105) at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323) at org.apache.spark.rdd.RDD.iterator(RDD.scala:287) at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38) at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323) at org.apache.spark.rdd.RDD.iterator(RDD.scala:287) at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38) at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323) at org.apache.spark.rdd.RDD.iterator(RDD.scala:287) at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38) at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323) at org.apache.spark.rdd.RDD.iterator(RDD.scala:287) at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38) at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323) at org.apache.spark.rdd.RDD.iterator(RDD.scala:287) at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38) at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323) at org.apache.spark.rdd.RDD.iterator(RDD.scala:287) at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38) at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323) at org.apache.spark.rdd.RDD.iterator(RDD.scala:287) at
org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87) at org.apache.spark.scheduler.Task.run(Task.scala:108) at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:338) at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) at java.lang.Thread.run(Thread.java:748) Caused by: io.netty.util.internal.OutOfDirectMemoryError: failed to allocate 16777216 byte(s) of direct memory (used: 21474836480, max: 21474836480) at io.netty.util.internal.PlatformDependent.incrementMemoryCounter(PlatformDependent.java:530) at io.netty.util.internal.PlatformDependent.allocateDirectNoCleaner(PlatformDependent.java:484) at io.netty.buffer.PoolArena$DirectArena.allocateDirect(PoolArena.java:711) at io.netty.buffer.PoolArena$DirectArena.newChunk(PoolArena.java:700) at io.netty.buffer.PoolArena.allocateNormal(PoolArena.java:237) at io
[jira] [Commented] (SPARK-24340) Clean up non-shuffle disk block manager files following executor death
[ https://issues.apache.org/jira/browse/SPARK-24340?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16551687#comment-16551687 ] Li Yuanjian commented on SPARK-24340: - cc [~jiangxb1987] I think this was resolved by your PR 21390; should we change the status? > Clean up non-shuffle disk block manager files following executor death > -- > > Key: SPARK-24340 > URL: https://issues.apache.org/jira/browse/SPARK-24340 > Project: Spark > Issue Type: Improvement > Components: Spark Core >Affects Versions: 2.4.0 >Reporter: Jiang Xingbo >Priority: Major > > Currently we only clean up local folders when an application is removed, and we don't > clean up non-shuffle files, such as temp. shuffle blocks, cached > RDD/broadcast blocks, spill files, etc., and this can cause disk space leaks > when executors periodically die and are replaced. > To avoid this source of disk space leak, we can clean up executor disk store > files except for shuffle index and data files when an executor finishes. > -- This message was sent by Atlassian JIRA (v7.6.3#76005) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Commented] (SPARK-23128) A new approach to do adaptive execution in Spark SQL
[ https://issues.apache.org/jira/browse/SPARK-23128?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16551638#comment-16551638 ] Li Yuanjian commented on SPARK-23128: - [~tgraves] Thanks for your comment. As far as I know, [~carsonwang] is still working on this and porting the patch to Spark 2.3. This patch has also been used by several teams in their internal production environments; hope it can be reviewed soon. > A new approach to do adaptive execution in Spark SQL > > > Key: SPARK-23128 > URL: https://issues.apache.org/jira/browse/SPARK-23128 > Project: Spark > Issue Type: Improvement > Components: SQL >Affects Versions: 2.3.0 >Reporter: Carson Wang >Priority: Major > Attachments: AdaptiveExecutioninBaidu.pdf > > > SPARK-9850 proposed the basic idea of adaptive execution in Spark. In > DAGScheduler, a new API is added to support submitting a single map stage. > The current implementation of adaptive execution in Spark SQL supports > changing the reducer number at runtime. An Exchange coordinator is used to > determine the number of post-shuffle partitions for a stage that needs to > fetch shuffle data from one or multiple stages. The current implementation > adds ExchangeCoordinator while we are adding Exchanges. However there are > some limitations. First, it may cause additional shuffles that may decrease > the performance. We can see this from EnsureRequirements rule when it adds > ExchangeCoordinator. Secondly, it is not a good idea to add > ExchangeCoordinators while we are adding Exchanges because we don’t have a > global picture of all shuffle dependencies of a post-shuffle stage. I.e. for > 3 tables’ join in a single stage, the same ExchangeCoordinator should be used > in three Exchanges but currently two separate ExchangeCoordinators will be > added. Thirdly, with the current framework it is not easy to implement other > features in adaptive execution flexibly like changing the execution plan and > handling skewed joins at runtime. > We'd like to introduce a new way to do adaptive execution in Spark SQL and > address the limitations. The idea is described at > [https://docs.google.com/document/d/1mpVjvQZRAkD-Ggy6-hcjXtBPiQoVbZGe3dLnAKgtJ4k/edit?usp=sharing] -- This message was sent by Atlassian JIRA (v7.6.3#76005) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Commented] (SPARK-24295) Purge Structured streaming FileStreamSinkLog metadata compact file data.
[ https://issues.apache.org/jira/browse/SPARK-24295?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16547873#comment-16547873 ] Li Yuanjian commented on SPARK-24295: - Thanks for your detailed explanation. You can check SPARK-17604; it seems to cover the same requirement of purging aged compact files. The small difference is that we need the purge logic in FileStreamSinkLog while that JIRA adds it for FileStreamSourceLog, but I think the strategy can be reused. Also cc the original author [~jerryshao2015] of SPARK-17604. > Purge Structured streaming FileStreamSinkLog metadata compact file data. > > > Key: SPARK-24295 > URL: https://issues.apache.org/jira/browse/SPARK-24295 > Project: Spark > Issue Type: Bug > Components: Structured Streaming >Affects Versions: 2.3.0 >Reporter: Iqbal Singh >Priority: Major > > FileStreamSinkLog metadata logs are concatenated into a single compact file > after the defined compact interval. > For long running jobs, the compact file can grow to tens of GBs, causing > slowness when reading data from the FileStreamSinkLog dir, as Spark defaults > to the "__spark__metadata" dir for the read. > We need functionality to purge aged compact file data. > -- This message was sent by Atlassian JIRA (v7.6.3#76005) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Commented] (SPARK-24295) Purge Structured streaming FileStreamSinkLog metadata compact file data.
[ https://issues.apache.org/jira/browse/SPARK-24295?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16544453#comment-16544453 ] Li Yuanjian commented on SPARK-24295: - Could you give more detail on how the compact file grows to tens of GBs in your scenario? In the FileStreamSinkLog implementation, batches within a compactInterval (default value 10) are merged into a single file whose content is serialized SinkFileStatus entries; it seems unlikely to grow to 10GB. > Purge Structured streaming FileStreamSinkLog metadata compact file data. > > > Key: SPARK-24295 > URL: https://issues.apache.org/jira/browse/SPARK-24295 > Project: Spark > Issue Type: Bug > Components: Structured Streaming >Affects Versions: 2.3.0 >Reporter: Iqbal Singh >Priority: Major > > FileStreamSinkLog metadata logs are concatenated into a single compact file > after the defined compact interval. > For long running jobs, the compact file can grow to tens of GBs, causing > slowness when reading data from the FileStreamSinkLog dir, as Spark defaults > to the "__spark__metadata" dir for the read. > We need functionality to purge aged compact file data. > -- This message was sent by Atlassian JIRA (v7.6.3#76005) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
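For reference, the sink-metadata-log knobs this thread refers to are, to my knowledge, the following internal configs; the names are from Spark 2.3-era SQLConf and the values shown are only illustrative, so verify both against your version:

{code:scala}
// How many batches get merged into one compact file.
spark.conf.set("spark.sql.streaming.fileSink.log.compactInterval", "10")
// Whether superseded log files are deleted, and after what delay.
spark.conf.set("spark.sql.streaming.fileSink.log.deletion", "true")
spark.conf.set("spark.sql.streaming.fileSink.log.cleanupDelay", "10m")
{code}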
[jira] [Commented] (SPARK-24755) Executor loss can cause task to not be resubmitted
[ https://issues.apache.org/jira/browse/SPARK-24755?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16536107#comment-16536107 ] Li Yuanjian commented on SPARK-24755: - No problem, thanks [~hthuynh2]. Thanks [~mridulm80] for letting me know, I'll also watch this jira. > Executor loss can cause task to not be resubmitted > -- > > Key: SPARK-24755 > URL: https://issues.apache.org/jira/browse/SPARK-24755 > Project: Spark > Issue Type: Bug > Components: Spark Core >Affects Versions: 2.3.0 >Reporter: Mridul Muralidharan >Priority: Major > > As part of SPARK-22074, when an executor is lost, TSM.executorLost currently > checks for "if (successful(index) && !killedByOtherAttempt(index))" to decide > if task needs to be resubmitted for partition. > Consider following: > For partition P1, tasks T1 and T2 are running on exec-1 and exec-2 > respectively (one of them being speculative task) > T1 finishes successfully first. > This results in setting "killedByOtherAttempt(P1) = true" due to running T2. > We also end up killing task T2. > Now, exec-1 if/when goes MIA. > executorLost will no longer schedule task for P1 - since > killedByOtherAttempt(P1) == true; even though P1 was hosted on T1 and there > is no other copy of P1 around (T2 was killed when T1 succeeded). > I noticed this bug as part of reviewing PR# 21653 for SPARK-13343 > Essentially, SPARK-22074 causes a regression (which I dont usually observe > due to shuffle service, sigh) - and as such the fix is broken IMO. > I dont have a PR handy for this, so if anyone wants to pick it up, please do > feel free ! > +CC [~XuanYuan] who fixed SPARK-22074 initially. -- This message was sent by Atlassian JIRA (v7.6.3#76005) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Created] (SPARK-24665) Add SQLConf in PySpark to manage all sql configs
Li Yuanjian created SPARK-24665: --- Summary: Add SQLConf in PySpark to manage all sql configs Key: SPARK-24665 URL: https://issues.apache.org/jira/browse/SPARK-24665 Project: Spark Issue Type: Improvement Components: PySpark Affects Versions: 2.3.0 Reporter: Li Yuanjian As new configs are added in PySpark, we currently read them by hard-coding the config name and default value. We should move all the configs into a class, like what we did with SQLConf in Spark SQL. -- This message was sent by Atlassian JIRA (v7.6.3#76005) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Commented] (SPARK-24630) SPIP: Support SQLStreaming in Spark
[ https://issues.apache.org/jira/browse/SPARK-24630?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16523064#comment-16523064 ] Li Yuanjian commented on SPARK-24630: - cc [~zsxwing] and [~tdas] We have some practical experience with SQLStreaming, as [~Jackey Lee] described in the SPIP doc, but there are maybe two points that need more discussion: 1. The "STREAM" keyword in the SQL grammar. 2. Combining with Hive tables is enough for internal usage, but maybe we should integrate with Data Source V2 for stronger generality. > SPIP: Support SQLStreaming in Spark > --- > > Key: SPARK-24630 > URL: https://issues.apache.org/jira/browse/SPARK-24630 > Project: Spark > Issue Type: Improvement > Components: SQL >Affects Versions: 2.2.0, 2.2.1 >Reporter: Jackey Lee >Priority: Minor > Labels: SQLStreaming > Attachments: SQLStreaming SPIP.pdf > > > At present, KafkaSQL, Flink SQL (which is actually based on Calcite), > SQLStream, and StormSQL all provide a stream-type SQL interface, with which users > with little knowledge about streaming can easily develop a stream > processing model. In Spark, we can also support a SQL API based on > Structured Streaming. > To support SQL Streaming, there are two key points: > 1. Analysis should be able to parse streaming-type SQL. > 2. The Analyzer should be able to map metadata information to the corresponding > Relation. -- This message was sent by Atlassian JIRA (v7.6.3#76005) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
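To make point 1 concrete, the kind of syntax under discussion looks roughly like the snippet below. This is hypothetical SPIP syntax, not valid in any released Spark; the exact grammar is what the JIRA is debating.

{code:scala}
// Hypothetical: STREAM marks kafka_sql_in as an unbounded source, so the
// query would be planned as a streaming job instead of a batch scan.
spark.sql("INSERT INTO kafka_sql_out SELECT STREAM * FROM kafka_sql_in")
{code}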
[jira] [Commented] (SPARK-24499) Documentation improvement of Spark core and SQL
[ https://issues.apache.org/jira/browse/SPARK-24499?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16506285#comment-16506285 ] Li Yuanjian commented on SPARK-24499: - No problem, thanks for pinging me; our pleasure. We'll collect and translate some internal user demos, as well as some tips from migrating MR jobs to Spark. > Documentation improvement of Spark core and SQL > --- > > Key: SPARK-24499 > URL: https://issues.apache.org/jira/browse/SPARK-24499 > Project: Spark > Issue Type: New Feature > Components: Documentation, Spark Core, SQL >Affects Versions: 2.3.0 >Reporter: Xiao Li >Priority: Major > > The current documentation in Apache Spark lacks enough code examples and > tips. If needed, we should also split the page of > https://spark.apache.org/docs/latest/sql-programming-guide.html into multiple > separate pages like what we did for > https://spark.apache.org/docs/latest/ml-guide.html -- This message was sent by Atlassian JIRA (v7.6.3#76005) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Commented] (SPARK-24183) add unit tests for ContinuousDataReader hook
[ https://issues.apache.org/jira/browse/SPARK-24183?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16504614#comment-16504614 ] Li Yuanjian commented on SPARK-24183: - Hi [~joseph.torres], I noticed we already have `ContinuousQueuedDataReaderSuite` after SPARK-20439; is this class enough for "add unit tests for ContinuousDataReader hook"? > add unit tests for ContinuousDataReader hook > > > Key: SPARK-24183 > URL: https://issues.apache.org/jira/browse/SPARK-24183 > Project: Spark > Issue Type: Sub-task > Components: Structured Streaming >Affects Versions: 2.4.0 >Reporter: Jose Torres >Priority: Major > > Currently this is the class named ContinuousQueuedDataReader, but I don't > know if this will change as we deal with stateful operators. -- This message was sent by Atlassian JIRA (v7.6.3#76005) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Commented] (SPARK-24375) Design sketch: support barrier scheduling in Apache Spark
[ https://issues.apache.org/jira/browse/SPARK-24375?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16504171#comment-16504171 ] Li Yuanjian commented on SPARK-24375: - Got it, many thanks for your detailed explanation. > Design sketch: support barrier scheduling in Apache Spark > - > > Key: SPARK-24375 > URL: https://issues.apache.org/jira/browse/SPARK-24375 > Project: Spark > Issue Type: Story > Components: Spark Core >Affects Versions: 3.0.0 >Reporter: Xiangrui Meng >Assignee: Jiang Xingbo >Priority: Major > > This task is to outline a design sketch for the barrier scheduling SPIP > discussion. It doesn't need to be a complete design before the vote. But it > should at least cover both Scala/Java and PySpark. -- This message was sent by Atlassian JIRA (v7.6.3#76005) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Comment Edited] (SPARK-24375) Design sketch: support barrier scheduling in Apache Spark
[ https://issues.apache.org/jira/browse/SPARK-24375?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16503486#comment-16503486 ] Li Yuanjian edited comment on SPARK-24375 at 6/6/18 3:55 PM: - Hi [~cloud_fan] and [~jiangxb1987], just a tiny question here: I noticed the discussion in SPARK-20928 mentioned the barrier scheduling which Continuous Processing will depend on. {quote} A barrier stage doesn’t launch any of its tasks until the available slots(free CPU cores can be used to launch pending tasks) satisfies the target to launch all the tasks at the same time, and always retry the whole stage when any task(s) fail. {quote} Why is task-level retry forbidden here, and is it possible to achieve it? Thanks. was (Author: xuanyuan): Hi [~cloud_fan] and [~jiangxb1987], just I tiny question here, I notice the discussion in SPARK-20928 mentioned the barrier scheduling. {quote} A barrier stage doesn’t launch any of its tasks until the available slots(free CPU cores can be used to launch pending tasks) satisfies the target to launch all the tasks at the same time, and always retry the whole stage when any task(s) fail. {quote} Why the task level retrying was forbidden here, is there any possible to achieve this? Thanks. > Design sketch: support barrier scheduling in Apache Spark > - > > Key: SPARK-24375 > URL: https://issues.apache.org/jira/browse/SPARK-24375 > Project: Spark > Issue Type: Story > Components: Spark Core >Affects Versions: 3.0.0 >Reporter: Xiangrui Meng >Assignee: Jiang Xingbo >Priority: Major > > This task is to outline a design sketch for the barrier scheduling SPIP > discussion. It doesn't need to be a complete design before the vote. But it > should at least cover both Scala/Java and PySpark. -- This message was sent by Atlassian JIRA (v7.6.3#76005) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Commented] (SPARK-24375) Design sketch: support barrier scheduling in Apache Spark
[ https://issues.apache.org/jira/browse/SPARK-24375?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16503486#comment-16503486 ] Li Yuanjian commented on SPARK-24375: - Hi [~cloud_fan] and [~jiangxb1987], just I tiny question here, I notice the discussion in SPARK-20928 mentioned the barrier scheduling. {quote} A barrier stage doesn’t launch any of its tasks until the available slots(free CPU cores can be used to launch pending tasks) satisfies the target to launch all the tasks at the same time, and always retry the whole stage when any task(s) fail. {quote} Why the task level retrying was forbidden here, is there any possible to achieve this? Thanks. > Design sketch: support barrier scheduling in Apache Spark > - > > Key: SPARK-24375 > URL: https://issues.apache.org/jira/browse/SPARK-24375 > Project: Spark > Issue Type: Story > Components: Spark Core >Affects Versions: 3.0.0 >Reporter: Xiangrui Meng >Assignee: Jiang Xingbo >Priority: Major > > This task is to outline a design sketch for the barrier scheduling SPIP > discussion. It doesn't need to be a complete design before the vote. But it > should at least cover both Scala/Java and PySpark. -- This message was sent by Atlassian JIRA (v7.6.3#76005) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Comment Edited] (SPARK-24210) incorrect handling of boolean expressions when using column in expressions in pyspark.sql.DataFrame filter function
[ https://issues.apache.org/jira/browse/SPARK-24210?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16500383#comment-16500383 ] Li Yuanjian edited comment on SPARK-24210 at 6/4/18 3:34 PM: - I think it may not be a bug.
{code:python}
#KO: returns r1 and r3
ex.filter(('c1 = 1') and ('c2 = 1')).show()
{code}
This is caused by Python's own `and` semantics on strings: `and` short-circuits on truthiness, a non-empty string is truthy, so the expression evaluates to its second operand. By the time it is passed to df.filter, there's only 'c2 = 1' left.
{code:python}
#KO: returns r0 and r3
ex.filter('c1 = 1 & c2 = 1').show()
#KO: returns r0 and r3
ex.filter('c1 == 1 & c2 == 1').show()
{code}
As you mentioned, [https://github.com/apache/spark/pull/6961] actually fixes '&' between Columns, but not string expressions like 'c1 = 1 & c2 = 1'. Here in ex.filter('c1 = 1 & c2 = 1'), Spark parses it into a valueExpression like 'Filter (('a = (1 & 'b)) = 1), since & binds tighter than = in the grammar; I think this makes sense here. was (Author: xuanyuan): I think it maybe not a bug. #KO: returns r1 and r3ex.filter(('c1 = 1') and ('c2 = 1')).show() This cause by python self base string __and__ implementation. After passing to df.filter, there's only 'c2 = 1'. #KO: returns r0 and r3ex.filter('c1 = 1 & c2 = 1').show()#KO: returns r0 and r3ex.filter('c1 == 1 & c2 == 1').show() As you mentioned, [https://github.com/apache/spark/pull/6961] actually fix the '&' between column, but not string expression like 'c1 = 1 & c2 = 1', here in ex.filter('c1 = 1 & c2 = 1'), Spark parse it to valueExpression like: 'Filter (('a = (1 & 'b)) = 1), I think this make sense here. > incorrect handling of boolean expressions when using column in expressions in > pyspark.sql.DataFrame filter function > --- > > Key: SPARK-24210 > URL: https://issues.apache.org/jira/browse/SPARK-24210 > Project: Spark > Issue Type: Bug > Components: PySpark >Affects Versions: 2.1.2 >Reporter: Michael H >Priority: Major > > {code:python} > ex = spark.createDataFrame([ > ('r0', 0, 0), > ('r1', 0, 1), > ('r2', 1, 0), > ('r3', 1, 1)]\ > , "row: string, c1: int, c2: int") > #KO: returns r1 and r3 > ex.filter(('c1 = 1') and ('c2 = 1')).show() > #OK, raises an exception > ex.filter(('c1 == 1') & ('c2 == 1')).show() > #KO: returns r0 and r3 > ex.filter('c1 = 1 & c2 = 1').show() > #KO: returns r0 and r3 > ex.filter('c1 == 1 & c2 == 1').show() > #OK: returns r3 only > ex.filter('c1 = 1 and c2 = 1').show() > #OK: returns r3 only > ex.filter('c1 == 1 and c2 == 1').show() > {code} > building the expressions using {code}ex.c1{code} or {code}ex['c1']{code} we > don't have this. > Issue seems related with > https://github.com/apache/spark/pull/6961 -- This message was sent by Atlassian JIRA (v7.6.3#76005) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
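The short-circuit behaviour is easy to verify in plain Python, no Spark required:
{code:python}
# `and` returns its second operand when the first is truthy, and any
# non-empty string is truthy, so df.filter never sees 'c1 = 1'.
expr = ('c1 = 1') and ('c2 = 1')
print(expr)            # c2 = 1
print(bool('c1 = 1'))  # True
{code}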
[jira] [Commented] (SPARK-24210) incorrect handling of boolean expressions when using column in expressions in pyspark.sql.DataFrame filter function
[ https://issues.apache.org/jira/browse/SPARK-24210?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16500383#comment-16500383 ] Li Yuanjian commented on SPARK-24210: - I think it maybe not a bug. #KO: returns r1 and r3ex.filter(('c1 = 1') and ('c2 = 1')).show() This cause by python self base string __and__ implementation. After passing to df.filter, there's only 'c2 = 1'. #KO: returns r0 and r3ex.filter('c1 = 1 & c2 = 1').show()#KO: returns r0 and r3ex.filter('c1 == 1 & c2 == 1').show() As you mentioned, [https://github.com/apache/spark/pull/6961] actually fix the '&' between column, but not string expression like 'c1 = 1 & c2 = 1', here in ex.filter('c1 = 1 & c2 = 1'), Spark parse it to valueExpression like: 'Filter (('a = (1 & 'b)) = 1), I think this make sense here. > incorrect handling of boolean expressions when using column in expressions in > pyspark.sql.DataFrame filter function > --- > > Key: SPARK-24210 > URL: https://issues.apache.org/jira/browse/SPARK-24210 > Project: Spark > Issue Type: Bug > Components: PySpark >Affects Versions: 2.1.2 >Reporter: Michael H >Priority: Major > > {code:python} > ex = spark.createDataFrame([ > ('r0', 0, 0), > ('r1', 0, 1), > ('r2', 1, 0), > ('r3', 1, 1)]\ > , "row: string, c1: int, c2: int") > #KO: returns r1 and r3 > ex.filter(('c1 = 1') and ('c2 = 1')).show() > #OK, raises an exception > ex.filter(('c1 == 1') & ('c2 == 1')).show() > #KO: returns r0 and r3 > ex.filter('c1 = 1 & c2 = 1').show() > #KO: returns r0 and r3 > ex.filter('c1 == 1 & c2 == 1').show() > #OK: returns r3 only > ex.filter('c1 = 1 and c2 = 1').show() > #OK: returns r3 only > ex.filter('c1 == 1 and c2 == 1').show() > {code} > building the expressions using {code}ex.c1{code} or {code}ex['c1']{code} we > don't have this. > Issue seems related with > https://github.com/apache/spark/pull/6961 -- This message was sent by Atlassian JIRA (v7.6.3#76005) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Created] (SPARK-24304) Scheduler changes for continuous processing shuffle support
Li Yuanjian created SPARK-24304: --- Summary: Scheduler changes for continuous processing shuffle support Key: SPARK-24304 URL: https://issues.apache.org/jira/browse/SPARK-24304 Project: Spark Issue Type: Sub-task Components: Structured Streaming Affects Versions: 2.4.0 Reporter: Li Yuanjian Both docs below include the change requirements for the scheduler; adding a new JIRA to collect our comments here. https://docs.google.com/document/d/14cGJ75v9myznywtB35ytEqL9wHy9xfZRv06B6g2tUgI/edit#bookmark=id.tjcvevpyuv4x https://docs.google.com/document/d/1IL4kJoKrZWeyIhklKUJqsW-yEN7V7aL05MmM65AYOfE/edit?disco=B4z058g -- This message was sent by Atlassian JIRA (v7.6.3#76005) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Commented] (SPARK-24293) Serialized shuffle supports mapSideCombine
[ https://issues.apache.org/jira/browse/SPARK-24293?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16478406#comment-16478406 ] Li Yuanjian commented on SPARK-24293: - {quote} doing the map side combine manually with SQL operators, to get SQL optimizations like whole-stage-codegen. {quote} Thanks to [~cloud_fan] for the detailed explanation; this resolved our confusion about why map-side combine is not supported in SerializedShuffle. > Serialized shuffle supports mapSideCombine > -- > > Key: SPARK-24293 > URL: https://issues.apache.org/jira/browse/SPARK-24293 > Project: Spark > Issue Type: Brainstorming > Components: Shuffle >Affects Versions: 2.3.0 >Reporter: Xianjin YE >Priority: Major > > While doing research on integrating my company's internal Shuffle Service > with Spark, I found it is possible to support mapSideCombine with serialized > shuffle. > The simple idea is that the `UnsafeShuffleWriter` uses a `Combiner` to > accumulate records when mapSideCombine is required before inserting into > `ShuffleExternalSorter`. The `Combiner` tracks its memory usage or the number of elements > accumulated and is never spilled. When the `Combiner` accumulates > enough records (varied by different strategies), the accumulated (K, C) pairs > are then inserted into the `ShuffleExternalSorter`. After that, the > `Combiner` is reset to an empty state. > After this change, combined values are sent to the sorter segment by segment, and > the `BlockStoreShuffleReader` already handles this case. > I did a local POC, and it looks like I can get the same result as normal > SortShuffle. The performance is not optimized yet. The most significant part > of the code is shown below:
> {code:java}
> // code placeholder
> while (records.hasNext()) {
>   Product2<K, V> record = records.next();
>   if (this.mapSideCombine) {
>     this.aggregator.accumulateRecord(record);
>     if (this.aggregator.accumulatedKeyNum() >= 160_000) { // for poc
>       scala.collection.Iterator<Product2<K, C>> combinedIterator =
>         this.aggregator.accumulatedIterator();
>       while (combinedIterator.hasNext()) {
>         insertRecordIntoSorter(combinedIterator.next());
>       }
>       this.aggregator.resetAccumulation();
>     }
>   } else {
>     insertRecordIntoSorter(record);
>   }
> }
> if (this.mapSideCombine && this.aggregator.accumulatedKeyNum() > 0) {
>   scala.collection.Iterator<Product2<K, C>> combinedIterator =
>     this.aggregator.accumulatedIterator();
>   while (combinedIterator.hasNext()) {
>     insertRecordIntoSorter(combinedIterator.next());
>   }
>   this.aggregator.resetAccumulation(1);
> }
> {code}
> > Is there something I am missing? cc [~joshrosen] [~cloud_fan] [~XuanYuan] -- This message was sent by Atlassian JIRA (v7.6.3#76005) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
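The "map side combine with SQL operators" point can be seen directly in a physical plan. A PySpark sketch (the exact plan text varies by Spark version):
{code:python}
from pyspark.sql.functions import col

# A groupBy aggregation plans a partial (map-side) aggregate before the
# shuffle and a final aggregate after it: SQL's own map-side combine.
df = spark.range(1000).withColumn("k", col("id") % 10)
df.groupBy("k").count().explain()
# Typical Spark 2.x plan (abridged):
#   HashAggregate(keys=[k], functions=[count(1)])                 <- final, post-shuffle
#   +- Exchange hashpartitioning(k, 200)
#      +- HashAggregate(keys=[k], functions=[partial_count(1)])   <- map side
{code}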
[jira] [Commented] (SPARK-23128) A new approach to do adaptive execution in Spark SQL
[ https://issues.apache.org/jira/browse/SPARK-23128?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16471812#comment-16471812 ] Li Yuanjian commented on SPARK-23128: - I collected some user cases and the performance improvements observed during Baidu's internal usage of this patch, summarized in the following 3 scenarios: 1. SortMergeJoin to BroadcastJoin Transforming a SortMergeJoin into a BroadcastJoin deep in the plan tree can bring us a {color:red}50% to 200%{color} boost in query performance, and this strategy always hits BI scenarios such as joining several tables with filters in subqueries. 2. Long-running applications, or using Spark as a service Here, a long-running application refers to an application with a duration near 1 hour. Using Spark as a service refers to using spark-shell and continually submitting SQL, or using a Spark service like Zeppelin, Livy or our internal SQL service Baidu BigSQL. In such scenarios, all Spark jobs share the same partition number, so enabling AE and adding configs about expected task info, including data size, row number, min/max partition number, etc., brings us a {color:red}50%-100%{color} performance improvement. 3. GraphFrame jobs The last scenario is applications that use GraphFrame; in this case the user has a 2-dimension graph with 1 billion edges and uses the connected components algorithm in GraphFrame. With AE enabled, the duration of the app dropped from 58 min to 32 min, almost a {color:red}100%{color} performance improvement. The detailed screenshots and configs are in the attached pdf. > A new approach to do adaptive execution in Spark SQL > > > Key: SPARK-23128 > URL: https://issues.apache.org/jira/browse/SPARK-23128 > Project: Spark > Issue Type: Improvement > Components: SQL >Affects Versions: 2.3.0 >Reporter: Carson Wang >Priority: Major > Attachments: AdaptiveExecutioninBaidu.pdf > > > SPARK-9850 proposed the basic idea of adaptive execution in Spark. In > DAGScheduler, a new API is added to support submitting a single map stage. > The current implementation of adaptive execution in Spark SQL supports > changing the reducer number at runtime. An Exchange coordinator is used to > determine the number of post-shuffle partitions for a stage that needs to > fetch shuffle data from one or multiple stages. The current implementation > adds ExchangeCoordinator while we are adding Exchanges. However there are > some limitations. First, it may cause additional shuffles that may decrease > the performance. We can see this from EnsureRequirements rule when it adds > ExchangeCoordinator. Secondly, it is not a good idea to add > ExchangeCoordinators while we are adding Exchanges because we don’t have a > global picture of all shuffle dependencies of a post-shuffle stage. I.e. for > 3 tables’ join in a single stage, the same ExchangeCoordinator should be used > in three Exchanges but currently two separated ExchangeCoordinator will be > added. Thirdly, with the current framework it is not easy to implement other > features in adaptive execution flexibly like changing the execution plan and > handling skewed join at runtime. > We'd like to introduce a new way to do adaptive execution in Spark SQL and > address the limitations. The idea is described at > [https://docs.google.com/document/d/1mpVjvQZRAkD-Ggy6-hcjXtBPiQoVbZGe3dLnAKgtJ4k/edit?usp=sharing] -- This message was sent by Atlassian JIRA (v7.6.3#76005) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
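For reference, enabling AE in scenario 2 amounts to settings of roughly the following shape. Only {{spark.sql.adaptive.enabled}} is a stock Spark 2.x key; the post-shuffle expectation keys below follow the adaptive-execution patch and may differ between versions:
{code:python}
# Illustrative AE settings; keys other than spark.sql.adaptive.enabled are
# taken from the AE patch and are not guaranteed in stock Spark 2.x.
spark.conf.set("spark.sql.adaptive.enabled", "true")
spark.conf.set("spark.sql.adaptive.shuffle.targetPostShuffleInputSize",
               str(64 * 1024 * 1024))  # expected per-task data size
spark.conf.set("spark.sql.adaptive.minNumPostShufflePartitions", "20")
spark.conf.set("spark.sql.adaptive.maxNumPostShufflePartitions", "500")
{code}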
[jira] [Updated] (SPARK-23128) A new approach to do adaptive execution in Spark SQL
[ https://issues.apache.org/jira/browse/SPARK-23128?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Li Yuanjian updated SPARK-23128: Attachment: AdaptiveExecutioninBaidu.pdf > A new approach to do adaptive execution in Spark SQL > > > Key: SPARK-23128 > URL: https://issues.apache.org/jira/browse/SPARK-23128 > Project: Spark > Issue Type: Improvement > Components: SQL >Affects Versions: 2.3.0 >Reporter: Carson Wang >Priority: Major > Attachments: AdaptiveExecutioninBaidu.pdf > > > SPARK-9850 proposed the basic idea of adaptive execution in Spark. In > DAGScheduler, a new API is added to support submitting a single map stage. > The current implementation of adaptive execution in Spark SQL supports > changing the reducer number at runtime. An Exchange coordinator is used to > determine the number of post-shuffle partitions for a stage that needs to > fetch shuffle data from one or multiple stages. The current implementation > adds ExchangeCoordinator while we are adding Exchanges. However there are > some limitations. First, it may cause additional shuffles that may decrease > the performance. We can see this from EnsureRequirements rule when it adds > ExchangeCoordinator. Secondly, it is not a good idea to add > ExchangeCoordinators while we are adding Exchanges because we don’t have a > global picture of all shuffle dependencies of a post-shuffle stage. I.e. for > 3 tables’ join in a single stage, the same ExchangeCoordinator should be used > in three Exchanges but currently two separated ExchangeCoordinator will be > added. Thirdly, with the current framework it is not easy to implement other > features in adaptive execution flexibly like changing the execution plan and > handling skewed join at runtime. > We'd like to introduce a new way to do adaptive execution in Spark SQL and > address the limitations. The idea is described at > [https://docs.google.com/document/d/1mpVjvQZRAkD-Ggy6-hcjXtBPiQoVbZGe3dLnAKgtJ4k/edit?usp=sharing] -- This message was sent by Atlassian JIRA (v7.6.3#76005) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Issue Comment Deleted] (SPARK-23128) A new approach to do adaptive execution in Spark SQL
[ https://issues.apache.org/jira/browse/SPARK-23128?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Li Yuanjian updated SPARK-23128: Comment: was deleted (was: I collected some user cases and performance improve effect during Baidu internal usage of this patch, summarize as following 3 scenario: 1. SortMergeJoin to BroadcastJoin The SortMergeJoin transform to BroadcastJoin over deeply tree node can bring us {color:red}50% to 200%{color} boosting on query performance, and this strategy alway hit the BI scenario like join several tables with filter strategy in subquery 2. Long running application or use Spark as a service In this case, long running application refers to the duration of application near 1 hour. Using Spark as a service refers to use spark-shell and keep submit sql or use the service of Spark like Zeppelin, Livy or our internal sql service Baidu BigSQL. In such scenario, all spark jobs share same partition number, so enable AE and add configs about expected task info including data size, row number, min\max partition number and etc, will bring us {color:red}50%-100%{color} boosting on performance improvement. 3. GraphFrame jobs The last scenario is the application use GraphFrame, in this case, user has a 2-dimension graph with 1 billion edges, use the connected componentsalgorithm in GraphFrame. With enabling AE, the duration of app reduce from 58min to 32min, almost {color:red}100%{color} boosting on performance improvement. The detailed screenshot and config in the attached pdf. ) > A new approach to do adaptive execution in Spark SQL > > > Key: SPARK-23128 > URL: https://issues.apache.org/jira/browse/SPARK-23128 > Project: Spark > Issue Type: Improvement > Components: SQL >Affects Versions: 2.3.0 >Reporter: Carson Wang >Priority: Major > Attachments: AdaptiveExecutioninBaidu.pdf > > > SPARK-9850 proposed the basic idea of adaptive execution in Spark. In > DAGScheduler, a new API is added to support submitting a single map stage. > The current implementation of adaptive execution in Spark SQL supports > changing the reducer number at runtime. An Exchange coordinator is used to > determine the number of post-shuffle partitions for a stage that needs to > fetch shuffle data from one or multiple stages. The current implementation > adds ExchangeCoordinator while we are adding Exchanges. However there are > some limitations. First, it may cause additional shuffles that may decrease > the performance. We can see this from EnsureRequirements rule when it adds > ExchangeCoordinator. Secondly, it is not a good idea to add > ExchangeCoordinators while we are adding Exchanges because we don’t have a > global picture of all shuffle dependencies of a post-shuffle stage. I.e. for > 3 tables’ join in a single stage, the same ExchangeCoordinator should be used > in three Exchanges but currently two separated ExchangeCoordinator will be > added. Thirdly, with the current framework it is not easy to implement other > features in adaptive execution flexibly like changing the execution plan and > handling skewed join at runtime. > We'd like to introduce a new way to do adaptive execution in Spark SQL and > address the limitations. The idea is described at > [https://docs.google.com/document/d/1mpVjvQZRAkD-Ggy6-hcjXtBPiQoVbZGe3dLnAKgtJ4k/edit?usp=sharing] -- This message was sent by Atlassian JIRA (v7.6.3#76005) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Commented] (SPARK-23128) A new approach to do adaptive execution in Spark SQL
[ https://issues.apache.org/jira/browse/SPARK-23128?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16471811#comment-16471811 ] Li Yuanjian commented on SPARK-23128: - I collected some user cases and performance improve effect during Baidu internal usage of this patch, summarize as following 3 scenario: 1. SortMergeJoin to BroadcastJoin The SortMergeJoin transform to BroadcastJoin over deeply tree node can bring us {color:red}50% to 200%{color} boosting on query performance, and this strategy alway hit the BI scenario like join several tables with filter strategy in subquery 2. Long running application or use Spark as a service In this case, long running application refers to the duration of application near 1 hour. Using Spark as a service refers to use spark-shell and keep submit sql or use the service of Spark like Zeppelin, Livy or our internal sql service Baidu BigSQL. In such scenario, all spark jobs share same partition number, so enable AE and add configs about expected task info including data size, row number, min\max partition number and etc, will bring us {color:red}50%-100%{color} boosting on performance improvement. 3. GraphFrame jobs The last scenario is the application use GraphFrame, in this case, user has a 2-dimension graph with 1 billion edges, use the connected componentsalgorithm in GraphFrame. With enabling AE, the duration of app reduce from 58min to 32min, almost {color:red}100%{color} boosting on performance improvement. The detailed screenshot and config in the attached pdf. > A new approach to do adaptive execution in Spark SQL > > > Key: SPARK-23128 > URL: https://issues.apache.org/jira/browse/SPARK-23128 > Project: Spark > Issue Type: Improvement > Components: SQL >Affects Versions: 2.3.0 >Reporter: Carson Wang >Priority: Major > Attachments: AdaptiveExecutioninBaidu.pdf > > > SPARK-9850 proposed the basic idea of adaptive execution in Spark. In > DAGScheduler, a new API is added to support submitting a single map stage. > The current implementation of adaptive execution in Spark SQL supports > changing the reducer number at runtime. An Exchange coordinator is used to > determine the number of post-shuffle partitions for a stage that needs to > fetch shuffle data from one or multiple stages. The current implementation > adds ExchangeCoordinator while we are adding Exchanges. However there are > some limitations. First, it may cause additional shuffles that may decrease > the performance. We can see this from EnsureRequirements rule when it adds > ExchangeCoordinator. Secondly, it is not a good idea to add > ExchangeCoordinators while we are adding Exchanges because we don’t have a > global picture of all shuffle dependencies of a post-shuffle stage. I.e. for > 3 tables’ join in a single stage, the same ExchangeCoordinator should be used > in three Exchanges but currently two separated ExchangeCoordinator will be > added. Thirdly, with the current framework it is not easy to implement other > features in adaptive execution flexibly like changing the execution plan and > handling skewed join at runtime. > We'd like to introduce a new way to do adaptive execution in Spark SQL and > address the limitations. The idea is described at > [https://docs.google.com/document/d/1mpVjvQZRAkD-Ggy6-hcjXtBPiQoVbZGe3dLnAKgtJ4k/edit?usp=sharing] -- This message was sent by Atlassian JIRA (v7.6.3#76005) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Updated] (SPARK-24235) create the top-of-task RDD sending rows to the remote buffer
[ https://issues.apache.org/jira/browse/SPARK-24235?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Li Yuanjian updated SPARK-24235: Description: [https://docs.google.com/document/d/1IL4kJoKrZWeyIhklKUJqsW-yEN7V7aL05MmM65AYOfE/edit#heading=h.8t3ci57f7uii] Note that after [https://github.com/apache/spark/pull/21239|https://github.com/apache/spark/pull/21239], this will need to be responsible for incrementing its task's EpochTracker. was: [https://docs.google.com/document/d/1IL4kJoKrZWeyIhklKUJqsW-yEN7V7aL05MmM65AYOfE/edit#heading=h.8t3ci57f7uii] Note that after [https://github.com/apache/spark/pull/21239|https://github.com/apache/spark/pull/21239,] ,this will need to be responsible for incrementing its task's EpochTracker. > create the top-of-task RDD sending rows to the remote buffer > > > Key: SPARK-24235 > URL: https://issues.apache.org/jira/browse/SPARK-24235 > Project: Spark > Issue Type: Sub-task > Components: Structured Streaming >Affects Versions: 2.4.0 >Reporter: Jose Torres >Priority: Major > > [https://docs.google.com/document/d/1IL4kJoKrZWeyIhklKUJqsW-yEN7V7aL05MmM65AYOfE/edit#heading=h.8t3ci57f7uii] > > Note that after > [https://github.com/apache/spark/pull/21239|https://github.com/apache/spark/pull/21239], this > will need to be responsible for incrementing its task's EpochTracker. -- This message was sent by Atlassian JIRA (v7.6.3#76005) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Updated] (SPARK-24235) create the top-of-task RDD sending rows to the remote buffer
[ https://issues.apache.org/jira/browse/SPARK-24235?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Li Yuanjian updated SPARK-24235: Description: [https://docs.google.com/document/d/1IL4kJoKrZWeyIhklKUJqsW-yEN7V7aL05MmM65AYOfE/edit#heading=h.8t3ci57f7uii] Note that after [https://github.com/apache/spark/pull/21239|https://github.com/apache/spark/pull/21239,], this will need to be responsible for incrementing its task's EpochTracker. was: [https://docs.google.com/document/d/1IL4kJoKrZWeyIhklKUJqsW-yEN7V7aL05MmM65AYOfE/edit#heading=h.8t3ci57f7uii] Note that after [https://github.com/apache/spark/pull/21239,] this will need to be responsible for incrementing its task's EpochTracker. > create the top-of-task RDD sending rows to the remote buffer > > > Key: SPARK-24235 > URL: https://issues.apache.org/jira/browse/SPARK-24235 > Project: Spark > Issue Type: Sub-task > Components: Structured Streaming >Affects Versions: 2.4.0 >Reporter: Jose Torres >Priority: Major > > [https://docs.google.com/document/d/1IL4kJoKrZWeyIhklKUJqsW-yEN7V7aL05MmM65AYOfE/edit#heading=h.8t3ci57f7uii] > > Note that after > [https://github.com/apache/spark/pull/21239|https://github.com/apache/spark/pull/21239,], > this will need to be responsible for incrementing its task's EpochTracker. -- This message was sent by Atlassian JIRA (v7.6.3#76005) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Updated] (SPARK-24235) create the top-of-task RDD sending rows to the remote buffer
[ https://issues.apache.org/jira/browse/SPARK-24235?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Li Yuanjian updated SPARK-24235: Description: [https://docs.google.com/document/d/1IL4kJoKrZWeyIhklKUJqsW-yEN7V7aL05MmM65AYOfE/edit#heading=h.8t3ci57f7uii] Note that after [https://github.com/apache/spark/pull/21239|https://github.com/apache/spark/pull/21239,] ,this will need to be responsible for incrementing its task's EpochTracker. was: [https://docs.google.com/document/d/1IL4kJoKrZWeyIhklKUJqsW-yEN7V7aL05MmM65AYOfE/edit#heading=h.8t3ci57f7uii] Note that after [https://github.com/apache/spark/pull/21239|https://github.com/apache/spark/pull/21239,], this will need to be responsible for incrementing its task's EpochTracker. > create the top-of-task RDD sending rows to the remote buffer > > > Key: SPARK-24235 > URL: https://issues.apache.org/jira/browse/SPARK-24235 > Project: Spark > Issue Type: Sub-task > Components: Structured Streaming >Affects Versions: 2.4.0 >Reporter: Jose Torres >Priority: Major > > [https://docs.google.com/document/d/1IL4kJoKrZWeyIhklKUJqsW-yEN7V7aL05MmM65AYOfE/edit#heading=h.8t3ci57f7uii] > > Note that after > [https://github.com/apache/spark/pull/21239|https://github.com/apache/spark/pull/21239,] > ,this will need to be responsible for incrementing its task's EpochTracker. -- This message was sent by Atlassian JIRA (v7.6.3#76005) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Commented] (SPARK-24036) Stateful operators in continuous processing
[ https://issues.apache.org/jira/browse/SPARK-24036?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16470067#comment-16470067 ] Li Yuanjian commented on SPARK-24036: - I agree with the division of the kinds of tasks, that's quite clear, but maybe all of this can be made maximally transparent to the scheduler by reusing the ResultTask and ShuffleMapTask design: could the DAGScheduler use a ContinuousShuffleMapTask to replace the original ShuffleMapTask? {quote}Changing DAGScheduler to accommodate continuous processing would create significant additional complexity I don't think we can really justify. {quote} So here, in my opinion, it is maybe not as complex as we think? If I'm wrong please let me know. :) {quote}Whether we need to write an explicit shuffle RDD class or not would I think come down to an implementation detail of SPARK-24236. It depends on what's the cleanest way to unfold the SparkPlan tree. {quote} Yep, couldn't agree more. I'll organize this part of our internal code and open a preview PR. We'd very much appreciate any opinions! > Stateful operators in continuous processing > --- > > Key: SPARK-24036 > URL: https://issues.apache.org/jira/browse/SPARK-24036 > Project: Spark > Issue Type: Improvement > Components: Structured Streaming >Affects Versions: 2.4.0 >Reporter: Jose Torres >Priority: Major > > The first iteration of continuous processing in Spark 2.3 does not work with > stateful operators. -- This message was sent by Atlassian JIRA (v7.6.3#76005) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Comment Edited] (SPARK-24036) Stateful operators in continuous processing
[ https://issues.apache.org/jira/browse/SPARK-24036?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16469830#comment-16469830 ] Li Yuanjian edited comment on SPARK-24036 at 5/10/18 2:32 AM: -- Hi [~joseph.torres], thanks for cc'ing me, looks great! My doc may have covered sub-tasks SPARK-24237 and SPARK-24236; could you have a look at the design: [design link|https://docs.google.com/document/d/14cGJ75v9myznywtB35ytEqL9wHy9xfZRv06B6g2tUgI/edit#bookmark=id.2lfv2glj7ny0]? I'll take these two JIRAs and discuss the details with you. Also, in our practice, a new kind of continuous shuffle map task (I mentioned this in your doc comments: [comment link|https://docs.google.com/document/d/1IL4kJoKrZWeyIhklKUJqsW-yEN7V7aL05MmM65AYOfE/edit?disco=B4X1H_E]) and a shuffle RDD should be added; do you agree with adding another two JIRAs for these? was (Author: xuanyuan): Hi [~joseph.torres] Thanks for cc me, looks great! My doc maybe included sub-task SPARK-24237 and SPARK-23236, could you have a look about the design: [design link|https://docs.google.com/document/d/14cGJ75v9myznywtB35ytEqL9wHy9xfZRv06B6g2tUgI/edit#bookmark=id.2lfv2glj7ny0], I'll take this two Jira and discuss with you in detail. Also in our practice, a new kind of continuous shuffle map task(I mentioned this in your doc comments: [comment link|https://docs.google.com/document/d/1IL4kJoKrZWeyIhklKUJqsW-yEN7V7aL05MmM65AYOfE/edit?disco=B4X1H_E]) and shuffle rdd should be added, do you agree to add another two Jira about these? > Stateful operators in continuous processing > --- > > Key: SPARK-24036 > URL: https://issues.apache.org/jira/browse/SPARK-24036 > Project: Spark > Issue Type: Improvement > Components: Structured Streaming >Affects Versions: 2.4.0 >Reporter: Jose Torres >Priority: Major > > The first iteration of continuous processing in Spark 2.3 does not work with > stateful operators. -- This message was sent by Atlassian JIRA (v7.6.3#76005) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Commented] (SPARK-24036) Stateful operators in continuous processing
[ https://issues.apache.org/jira/browse/SPARK-24036?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16469830#comment-16469830 ] Li Yuanjian commented on SPARK-24036: - Hi [~joseph.torres] Thanks for cc me, looks great! My doc maybe included sub-task SPARK-24237 and SPARK-23236, could you have a look about the design: [design link|https://docs.google.com/document/d/14cGJ75v9myznywtB35ytEqL9wHy9xfZRv06B6g2tUgI/edit#bookmark=id.2lfv2glj7ny0], I'll take this two Jira and discuss with you in detail. Also in our practice, a new kind of continuous shuffle map task(I mentioned this in your doc comments: [comment link|https://docs.google.com/document/d/1IL4kJoKrZWeyIhklKUJqsW-yEN7V7aL05MmM65AYOfE/edit?disco=B4X1H_E]) and shuffle rdd should be added, do you agree to add another two Jira about these? > Stateful operators in continuous processing > --- > > Key: SPARK-24036 > URL: https://issues.apache.org/jira/browse/SPARK-24036 > Project: Spark > Issue Type: Improvement > Components: Structured Streaming >Affects Versions: 2.4.0 >Reporter: Jose Torres >Priority: Major > > The first iteration of continuous processing in Spark 2.3 does not work with > stateful operators. -- This message was sent by Atlassian JIRA (v7.6.3#76005) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Closed] (SPARK-24108) ChunkedByteBuffer.writeFully method has not reset the limit value
[ https://issues.apache.org/jira/browse/SPARK-24108?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Li Yuanjian closed SPARK-24108. --- Duplicate submission of [SPARK-24107|https://issues.apache.org/jira/browse/SPARK-24107]; closing it. > ChunkedByteBuffer.writeFully method has not reset the limit value > - > > Key: SPARK-24108 > URL: https://issues.apache.org/jira/browse/SPARK-24108 > Project: Spark > Issue Type: Bug > Components: Block Manager, Input/Output >Affects Versions: 2.2.0, 2.2.1, 2.3.0 >Reporter: wangjinhai >Priority: Major > Fix For: 2.4.0 > > > The ChunkedByteBuffer.writeFully method does not reset the limit value. When > a chunk is larger than bufferWriteChunkSize, such as an 80*1024*1024 chunk against > config.BUFFER_WRITE_CHUNK_SIZE (64*1024*1024), the while loop runs only once and > 16*1024*1024 bytes are lost. -- This message was sent by Atlassian JIRA (v7.6.3#76005) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
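The reported loss follows directly from the sizes in the description:
{code:python}
# Arithmetic behind the report: with an 80 MB chunk and a 64 MB write limit
# that is set once and never reset, the while loop writes a single slice.
chunk_size = 80 * 1024 * 1024
buffer_write_chunk_size = 64 * 1024 * 1024  # config.BUFFER_WRITE_CHUNK_SIZE
lost = chunk_size - buffer_write_chunk_size
print(lost == 16 * 1024 * 1024)  # True
{code}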
[jira] [Commented] (SPARK-21661) SparkSQL can't merge load table from Hadoop
[ https://issues.apache.org/jira/browse/SPARK-21661?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16454070#comment-16454070 ] Li Yuanjian commented on SPARK-21661: - Got it. > SparkSQL can't merge load table from Hadoop > --- > > Key: SPARK-21661 > URL: https://issues.apache.org/jira/browse/SPARK-21661 > Project: Spark > Issue Type: Improvement > Components: SQL >Affects Versions: 2.2.0 >Reporter: Dapeng Sun >Assignee: Li Yuanjian >Priority: Major > Fix For: 2.3.0 > > > Here is the original listing of the external table on HDFS:
> {noformat}
> Permission  Owner  Group       Size   Last Modified          Replication  Block Size  Name
> -rw-r--r--  root   supergroup  0 B    8/6/2017, 11:43:03 PM  3            256 MB      income_band_001.dat
> -rw-r--r--  root   supergroup  0 B    8/6/2017, 11:39:31 PM  3            256 MB      income_band_002.dat
> ...
> -rw-r--r--  root   supergroup  327 B  8/6/2017, 11:44:47 PM  3            256 MB      income_band_530.dat
> {noformat}
> After the Spark SQL load, every input file has an output file, even the 0 B files. > For the load on Hive, the data files would be merged according to the data size > of the original files. > Reproduce:
> {noformat}
> CREATE EXTERNAL TABLE t1 (a int, b string) STORED AS TEXTFILE LOCATION "hdfs://xxx:9000/data/t1";
> CREATE TABLE t2 STORED AS PARQUET AS SELECT * FROM t1;
> {noformat}
> The table t2 has many small files without data. -- This message was sent by Atlassian JIRA (v7.6.3#76005) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Commented] (SPARK-21661) SparkSQL can't merge load table from Hadoop
[ https://issues.apache.org/jira/browse/SPARK-21661?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16453591#comment-16453591 ] Li Yuanjian commented on SPARK-21661: - [~hyukjin.kwon] Hi Hyukjin, do you think we should backport [https://github.com/apache/spark/pull/18654] to branches 2.1/2.2? I found several JIRAs reporting this problem. > SparkSQL can't merge load table from Hadoop > --- > > Key: SPARK-21661 > URL: https://issues.apache.org/jira/browse/SPARK-21661 > Project: Spark > Issue Type: Improvement > Components: SQL >Affects Versions: 2.2.0 >Reporter: Dapeng Sun >Assignee: Li Yuanjian >Priority: Major > > Here is the original listing of the external table on HDFS:
> {noformat}
> Permission  Owner  Group       Size   Last Modified          Replication  Block Size  Name
> -rw-r--r--  root   supergroup  0 B    8/6/2017, 11:43:03 PM  3            256 MB      income_band_001.dat
> -rw-r--r--  root   supergroup  0 B    8/6/2017, 11:39:31 PM  3            256 MB      income_band_002.dat
> ...
> -rw-r--r--  root   supergroup  327 B  8/6/2017, 11:44:47 PM  3            256 MB      income_band_530.dat
> {noformat}
> After the Spark SQL load, every input file has an output file, even the 0 B files. > For the load on Hive, the data files would be merged according to the data size > of the original files. > Reproduce:
> {noformat}
> CREATE EXTERNAL TABLE t1 (a int, b string) STORED AS TEXTFILE LOCATION "hdfs://xxx:9000/data/t1";
> CREATE TABLE t2 STORED AS PARQUET AS SELECT * FROM t1;
> {noformat}
> The table t2 has many small files without data. -- This message was sent by Atlassian JIRA (v7.6.3#76005) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Updated] (SPARK-23811) FetchFailed comes before Success of same task will cause child stage never succeed
[ https://issues.apache.org/jira/browse/SPARK-23811?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Li Yuanjian updated SPARK-23811: Description: This is a bug caused by the abnormal scenario described below: # ShuffleMapTask 1.0 is running; this task will fetch data from ExecutorA. # ExecutorA is lost, triggering `mapOutputTracker.removeOutputsOnExecutor(execId)`; the shuffleStatus changes. # Speculative ShuffleMapTask 1.1 starts and gets a FetchFailed immediately. # ShuffleMapTask 1.0 finally succeeds, but because of 1.1's FetchFailed, the stage is still marked as a failed stage. # ShuffleMapTask 1 is the last task of its stage, so this stage will never succeed because there is no missing task the DAGScheduler can get. was: This is a bug caused by abnormal scenario describe below: # ShuffleMapTask 1.0 running, this task will fetch data from ExecutorA # ExecutorA Lost, trigger `mapOutputTracker.removeOutputsOnExecutor(execId)` , shuffleStatus changed. # Speculative ShuffleMapTask 1.1 start, got a FetchFailed immediately. # ShuffleMapTask 1 is the last task of its stage, so this stage will never succeed because of there's no missing task DagScheduler can get. > FetchFailed comes before Success of same task will cause child stage never > succeed > -- > > Key: SPARK-23811 > URL: https://issues.apache.org/jira/browse/SPARK-23811 > Project: Spark > Issue Type: Bug > Components: Spark Core >Affects Versions: 2.2.0, 2.3.0 >Reporter: Li Yuanjian >Priority: Major > Attachments: 1.png, 2.png > > > This is a bug caused by the abnormal scenario described below: > # ShuffleMapTask 1.0 is running; this task will fetch data from ExecutorA. > # ExecutorA is lost, triggering `mapOutputTracker.removeOutputsOnExecutor(execId)`; > the shuffleStatus changes. > # Speculative ShuffleMapTask 1.1 starts and gets a FetchFailed immediately. > # ShuffleMapTask 1.0 finally succeeds, but because of 1.1's FetchFailed, > the stage is still marked as a failed stage. > # ShuffleMapTask 1 is the last task of its stage, so this stage will never > succeed because there is no missing task the DAGScheduler can get. -- This message was sent by Atlassian JIRA (v7.6.3#76005) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Updated] (SPARK-23811) FetchFailed comes before Success of same task will cause child stage never succeed
[ https://issues.apache.org/jira/browse/SPARK-23811?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Li Yuanjian updated SPARK-23811: Summary: FetchFailed comes before Success of same task will cause child stage never succeed (was: Same tasks' FetchFailed event comes before Success will cause child stage never succeed) > FetchFailed comes before Success of same task will cause child stage never > succeed > -- > > Key: SPARK-23811 > URL: https://issues.apache.org/jira/browse/SPARK-23811 > Project: Spark > Issue Type: Bug > Components: Spark Core >Affects Versions: 2.2.0, 2.3.0 >Reporter: Li Yuanjian >Priority: Major > Attachments: 1.png, 2.png > > > This is a bug caused by abnormal scenario describe below: > # ShuffleMapTask 1.0 running, this task will fetch data from ExecutorA > # ExecutorA Lost, trigger `mapOutputTracker.removeOutputsOnExecutor(execId)` > , shuffleStatus changed. > # Speculative ShuffleMapTask 1.1 start, got a FetchFailed immediately. > # ShuffleMapTask 1 is the last task of its stage, so this stage will never > succeed because of there's no missing task DagScheduler can get. -- This message was sent by Atlassian JIRA (v7.6.3#76005) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Commented] (SPARK-23811) Same tasks' FetchFailed event comes before Success will cause child stage never succeed
[ https://issues.apache.org/jira/browse/SPARK-23811?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16418434#comment-16418434 ] Li Yuanjian commented on SPARK-23811: - The scenario can be reproduced by the test case below, added in {{DAGSchedulerSuite}}:
{code:java}
/**
 * This tests the case where the origin task succeeds after its speculative
 * attempt got a FetchFailed earlier.
 */
test("[SPARK-23811] Fetch failed task should kill other attempt") {
  // Create 3 RDDs with shuffle dependencies on each other: rddA <--- rddB <--- rddC
  val rddA = new MyRDD(sc, 2, Nil)
  val shuffleDepA = new ShuffleDependency(rddA, new HashPartitioner(2))
  val shuffleIdA = shuffleDepA.shuffleId

  val rddB = new MyRDD(sc, 2, List(shuffleDepA), tracker = mapOutputTracker)
  val shuffleDepB = new ShuffleDependency(rddB, new HashPartitioner(2))

  val rddC = new MyRDD(sc, 2, List(shuffleDepB), tracker = mapOutputTracker)
  submit(rddC, Array(0, 1))

  // Complete both tasks in rddA.
  assert(taskSets(0).stageId === 0 && taskSets(0).stageAttemptId === 0)
  complete(taskSets(0), Seq(
    (Success, makeMapStatus("hostA", 2)),
    (Success, makeMapStatus("hostB", 2))))

  // The first task succeeds.
  runEvent(makeCompletionEvent(
    taskSets(1).tasks(0), Success, makeMapStatus("hostB", 2)))

  // The second task's speculative attempt fails first, but the task itself is
  // still running. This may be caused by ExecutorLost.
  runEvent(makeCompletionEvent(
    taskSets(1).tasks(1),
    FetchFailed(makeBlockManagerId("hostA"), shuffleIdA, 0, 0, "ignored"),
    null))

  // Check the currently missing partition.
  assert(mapOutputTracker.findMissingPartitions(shuffleDepB.shuffleId).get.size === 1)
  val missingPartition = mapOutputTracker.findMissingPartitions(shuffleDepB.shuffleId).get(0)

  // The second result task itself succeeds soon after.
  runEvent(makeCompletionEvent(
    taskSets(1).tasks(1), Success, makeMapStatus("hostB", 2)))

  // No missing partitions here, which will cause the child stage to never succeed.
  assert(mapOutputTracker.findMissingPartitions(shuffleDepB.shuffleId).get.size === 0)
}
{code}
> Same tasks' FetchFailed event comes before Success will cause child stage > never succeed > --- > > Key: SPARK-23811 > URL: https://issues.apache.org/jira/browse/SPARK-23811 > Project: Spark > Issue Type: Bug > Components: Spark Core >Affects Versions: 2.2.0, 2.3.0 >Reporter: Li Yuanjian >Priority: Major > Attachments: 1.png, 2.png > > > This is a bug caused by the abnormal scenario described below: > # ShuffleMapTask 1.0 is running; this task will fetch data from ExecutorA. > # ExecutorA is lost, triggering `mapOutputTracker.removeOutputsOnExecutor(execId)`; > the shuffleStatus changes. > # Speculative ShuffleMapTask 1.1 starts and gets a FetchFailed immediately. > # ShuffleMapTask 1 is the last task of its stage, so this stage will never > succeed because there is no missing task the DAGScheduler can get. -- This message was sent by Atlassian JIRA (v7.6.3#76005) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Updated] (SPARK-23811) Same tasks' FetchFailed event comes before Success will cause child stage never succeed
[ https://issues.apache.org/jira/browse/SPARK-23811?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Li Yuanjian updated SPARK-23811: Attachment: 2.png > Same tasks' FetchFailed event comes before Success will cause child stage > never succeed > --- > > Key: SPARK-23811 > URL: https://issues.apache.org/jira/browse/SPARK-23811 > Project: Spark > Issue Type: Bug > Components: Spark Core >Affects Versions: 2.2.0, 2.3.0 >Reporter: Li Yuanjian >Priority: Major > Attachments: 1.png, 2.png > > > This is a bug caused by abnormal scenario describe below: > # ShuffleMapTask 1.0 running, this task will fetch data from ExecutorA > # ExecutorA Lost, trigger `mapOutputTracker.removeOutputsOnExecutor(execId)` > , shuffleStatus changed. > # Speculative ShuffleMapTask 1.1 start, got a FetchFailed immediately. > # ShuffleMapTask 1 is the last task of its stage, so this stage will never > succeed because of there's no missing task DagScheduler can get. -- This message was sent by Atlassian JIRA (v7.6.3#76005) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Updated] (SPARK-23811) Same tasks' FetchFailed event comes before Success will cause child stage never succeed
[ https://issues.apache.org/jira/browse/SPARK-23811?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Li Yuanjian updated SPARK-23811: Attachment: 1.png > Same tasks' FetchFailed event comes before Success will cause child stage > never succeed > --- > > Key: SPARK-23811 > URL: https://issues.apache.org/jira/browse/SPARK-23811 > Project: Spark > Issue Type: Bug > Components: Spark Core >Affects Versions: 2.2.0, 2.3.0 >Reporter: Li Yuanjian >Priority: Major > Attachments: 1.png > > > This is a bug caused by abnormal scenario describe below: > # ShuffleMapTask 1.0 running, this task will fetch data from ExecutorA > # ExecutorA Lost, trigger `mapOutputTracker.removeOutputsOnExecutor(execId)` > , shuffleStatus changed. > # Speculative ShuffleMapTask 1.1 start, got a FetchFailed immediately. > # ShuffleMapTask 1 is the last task of its stage, so this stage will never > succeed because of there's no missing task DagScheduler can get. -- This message was sent by Atlassian JIRA (v7.6.3#76005) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Created] (SPARK-23811) Same tasks' FetchFailed event comes before Success will cause child stage never succeed
Li Yuanjian created SPARK-23811: --- Summary: Same tasks' FetchFailed event comes before Success will cause child stage never succeed Key: SPARK-23811 URL: https://issues.apache.org/jira/browse/SPARK-23811 Project: Spark Issue Type: Bug Components: Spark Core Affects Versions: 2.3.0, 2.2.0 Reporter: Li Yuanjian This is a bug caused by abnormal scenario describe below: # ShuffleMapTask 1.0 running, this task will fetch data from ExecutorA # ExecutorA Lost, trigger `mapOutputTracker.removeOutputsOnExecutor(execId)` , shuffleStatus changed. # Speculative ShuffleMapTask 1.1 start, got a FetchFailed immediately. # ShuffleMapTask 1 is the last task of its stage, so this stage will never succeed because of there's no missing task DagScheduler can get. -- This message was sent by Atlassian JIRA (v7.6.3#76005) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Updated] (SPARK-23533) Add support for changing ContinuousDataReader's startOffset
[ https://issues.apache.org/jira/browse/SPARK-23533?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Li Yuanjian updated SPARK-23533: Summary: Add support for changing ContinuousDataReader's startOffset (was: Add support for changing ContinousDataReader's startOffset) > Add support for changing ContinuousDataReader's startOffset > --- > > Key: SPARK-23533 > URL: https://issues.apache.org/jira/browse/SPARK-23533 > Project: Spark > Issue Type: Task > Components: Structured Streaming >Affects Versions: 2.3.0 >Reporter: Li Yuanjian >Priority: Major > > As discussed in [https://github.com/apache/spark/pull/20675], we need to add a > new interface `ContinuousDataReaderFactory` to support the requirement of > setting the start offset in Continuous Processing. -- This message was sent by Atlassian JIRA (v7.6.3#76005) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Created] (SPARK-23533) Add support for changing ContinousDataReader's startOffset
Li Yuanjian created SPARK-23533: --- Summary: Add support for changing ContinousDataReader's startOffset Key: SPARK-23533 URL: https://issues.apache.org/jira/browse/SPARK-23533 Project: Spark Issue Type: Task Components: Structured Streaming Affects Versions: 2.3.0 Reporter: Li Yuanjian As discussion in [https://github.com/apache/spark/pull/20675], we need add a new interface `ContinuousDataReaderFactory` to support the requirements of setting start offset in Continuous Processing. -- This message was sent by Atlassian JIRA (v7.6.3#76005) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Created] (SPARK-22956) Union Stream Failover Cause `IllegalStateException`
Li Yuanjian created SPARK-22956: --- Summary: Union Stream Failover Cause `IllegalStateException` Key: SPARK-22956 URL: https://issues.apache.org/jira/browse/SPARK-22956 Project: Spark Issue Type: Bug Components: Structured Streaming Affects Versions: 2.1.0 Reporter: Li Yuanjian When we union 2 streams from Kafka or other sources, and one of them has no continuous data coming in while the task restarts at the same time, this will cause an `IllegalStateException`. This is mainly caused by the code in [MicroBatchExecution|https://github.com/apache/spark/blob/master/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/MicroBatchExecution.scala#L190]: when one stream has no continuous data, its committedOffset is the same as its availableOffset during `populateStartOffsets`, and `currentPartitionOffsets` is not properly handled in KafkaSource. Also, maybe we should consider this scenario in other Sources. -- This message was sent by Atlassian JIRA (v6.4.14#64029) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
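A minimal sketch of the failing shape (hypothetical topic names and paths; requires the spark-sql-kafka package), where one source receives no data across a restart:
{code:python}
# Two Kafka sources unioned; if topic_idle gets no new data and the query is
# restarted from its checkpoint, the scenario above can be hit.
busy = (spark.readStream.format("kafka")
        .option("kafka.bootstrap.servers", "host:9092")
        .option("subscribe", "topic_busy")
        .load())
idle = (spark.readStream.format("kafka")
        .option("kafka.bootstrap.servers", "host:9092")
        .option("subscribe", "topic_idle")
        .load())
query = (busy.union(idle).writeStream
         .format("parquet")
         .option("checkpointLocation", "/tmp/ckpt")
         .option("path", "/tmp/out")
         .start())
{code}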
[jira] [Updated] (SPARK-2926) Add MR-style (merge-sort) SortShuffleReader for sort-based shuffle
[ https://issues.apache.org/jira/browse/SPARK-2926?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Li Yuanjian updated SPARK-2926: --- Attachment: Spark Shuffle Test Report on Spark2.x.pdf [~jerryshao] Hi Saisai, thanks for your advice; I added a test report according to your suggestion. As described in the report, I only compared the two shuffle modes on the 'sort-by-key' workload, because the other test workloads share the same code paths in the POC implementation (SortShuffleWriter with BlockStoreShuffleReader). I also added a config ([code link|https://github.com/apache/spark/pull/19745/commits/fe9394eadf8ea51af2b2cb41b5b42981fa600752]) just to force disabling SerializedShuffle in the 'sort-by-key' workload; otherwise both master and the POC use SerializedShuffle. For the sort-by-key workload with SerializedShuffle disabled, the POC version runs 1.44x faster than current master: the map-side stage is 1.16x slower, but the reduce stage gets a 9.4x boost. > Add MR-style (merge-sort) SortShuffleReader for sort-based shuffle > -- > > Key: SPARK-2926 > URL: https://issues.apache.org/jira/browse/SPARK-2926 > Project: Spark > Issue Type: Improvement > Components: Shuffle >Affects Versions: 1.1.0 >Reporter: Saisai Shao >Assignee: Saisai Shao > Attachments: SortBasedShuffleRead.pdf, SortBasedShuffleReader on > Spark 2.x.pdf, Spark Shuffle Test Report on Spark2.x.pdf, Spark Shuffle Test > Report(contd).pdf, Spark Shuffle Test Report.pdf > > > Currently Spark has already integrated sort-based shuffle write, which > greatly improves the IO performance and reduces the memory consumption when > the reducer number is very large. But for the reducer side, it still adopts the > implementation of the hash-based shuffle reader, which neglects the ordering > attributes of map output data in some situations. > Here we propose an MR-style sort-merge-like shuffle reader for sort-based > shuffle to better improve the performance of sort-based shuffle. > Work-in-progress code and a performance test report will be posted later > when some unit test bugs are fixed. > Any comments would be greatly appreciated. > Thanks a lot. -- This message was sent by Atlassian JIRA (v6.4.14#64029) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
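For context, the 'sort-by-key' workload has roughly this shape; a hypothetical PySpark sketch, not the exact benchmark code:
{code:python}
import random

# Rough shape of a sort-by-key benchmark: shuffle write in the map stage,
# merge/sort work in the reduce stage, which is where the POC reader helps.
pairs = sc.parallelize(range(10 ** 7)) \
          .map(lambda i: (random.randint(0, 10 ** 6), i))
pairs.sortByKey(numPartitions=200).count()
{code}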
[jira] [Created] (SPARK-22753) Get rid of dataSource.writeAndRead
Li Yuanjian created SPARK-22753: --- Summary: Get rid of dataSource.writeAndRead Key: SPARK-22753 URL: https://issues.apache.org/jira/browse/SPARK-22753 Project: Spark Issue Type: Improvement Components: SQL Affects Versions: 2.1.0 Reporter: Li Yuanjian Priority: Minor Code cleanup work for getting rid of dataSource.writeAndRead, following the discussion in https://github.com/apache/spark/pull/16481 and https://github.com/apache/spark/pull/18975#discussion_r155454606. Currently the BaseRelation returned by `dataSource.writeAndRead` is only used in `CreateDataSourceTableAsSelect`, and planForWriting and writeAndRead share some common code paths. -- This message was sent by Atlassian JIRA (v6.4.14#64029) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Commented] (SPARK-2926) Add MR-style (merge-sort) SortShuffleReader for sort-based shuffle
[ https://issues.apache.org/jira/browse/SPARK-2926?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16265109#comment-16265109 ] Li Yuanjian commented on SPARK-2926: Yes, only the reduce stage. You're right, I shouldn't only pay attention to the final stage. I have rearranged all the figures from the screenshots below: ||test name||map stage shuffle write||reduce stage shuffle read||map stage duration||reduce stage duration||total job time||code base|| |Test Round1|654.4 MB|802.6MB|3.6min|2.0min|5.5min|master| |Test Round1|654.4 MB|714.0MB|3.4min|9s|3.5min|SPARK-2926| |Test Round2: Add more pressure for SortShuffleReader by coalesce|654.4MB|654.4MB|2.6min|20min|22min|master| |Test Round2: Add more pressure for SortShuffleReader by coalesce|654.4MB|654.4MB|3.7min|1.4min|5.1min|SPARK-2926| |Test Round3: Test file spill scenario of sort shuffle reader|142.6MB|142.6MB|26s|16min|16min|master| |Test Round3: Test file spill scenario of sort shuffle reader|142.6MB|142.6MB|21s|13s|34s|SPARK-2926| |Test Round3: Test file spill scenario of sort shuffle reader|142.6MB|142.6MB|22s|25s|47s|SPARK-2926(code change for force spill to disk)| |Test Round3: Test file spill scenario of sort shuffle reader|142.6MB|142.6MB|22s|29s|51s|SPARK-2926(code change for force spill to disk)| > Add MR-style (merge-sort) SortShuffleReader for sort-based shuffle > -- > > Key: SPARK-2926 > URL: https://issues.apache.org/jira/browse/SPARK-2926 > Project: Spark > Issue Type: Improvement > Components: Shuffle >Affects Versions: 1.1.0 >Reporter: Saisai Shao >Assignee: Saisai Shao > Attachments: SortBasedShuffleRead.pdf, SortBasedShuffleReader on > Spark 2.x.pdf, Spark Shuffle Test Report(contd).pdf, Spark Shuffle Test > Report.pdf > > > Currently Spark has already integrated sort-based shuffle write, which > greatly improve the IO performance and reduce the memory consumption when > reducer number is very large. But for the reducer side, it still adopts the > implementation of hash-based shuffle reader, which neglects the ordering > attributes of map output data in some situations. > Here we propose a MR style sort-merge like shuffle reader for sort-based > shuffle to better improve the performance of sort-based shuffle. > Working in progress code and performance test report will be posted later > when some unit test bugs are fixed. > Any comments would be greatly appreciated. > Thanks a lot. -- This message was sent by Atlassian JIRA (v6.4.14#64029) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Commented] (SPARK-2926) Add MR-style (merge-sort) SortShuffleReader for sort-based shuffle
[ https://issues.apache.org/jira/browse/SPARK-2926?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16265030#comment-16265030 ] Li Yuanjian commented on SPARK-2926: [~jerryshao], thanks a lot for your advice and reply. {quote} would you please use spark-perf's micro benchmark (https://github.com/databricks/spark-perf) to verify again with same workload as mentioned in original test report? {quote} Sure, I'll verify this again ASAP. {quote} Theoretically this solution cannot get 12x-30x boosting according to my test {quote} At first I also had questions about this, so I attached all the screenshots in the pdf. The 12x boost happened in both scenarios, with a reducer task number of 1 and of 100. The duration of this stage is reduced from 2min to 9s (13x) when the reducer task number is 1, and from 20min to 1.4min when the number is 100. The 30x boost happened after I added more data pressure on the reducer tasks. {quote} Can you please explain the key difference and the reason of such boosting? {quote} I think the key difference mainly comes from these 2 points: 1. As Saisai said, BlockStoreShuffleReader uses `ExternalSorter` to do the reduce work, so every record goes through comparisons, while SortShuffleReader is more CPU friendly: it collects all shuffle map results (both data in memory and data spilled to disk) and combines them with a merge sort (each partition has already been sorted on the map side). 2. The peak memory used in a reduce task is cut down significantly, which saves GC time during sorting. > Add MR-style (merge-sort) SortShuffleReader for sort-based shuffle > -- > > Key: SPARK-2926 > URL: https://issues.apache.org/jira/browse/SPARK-2926 > Project: Spark > Issue Type: Improvement > Components: Shuffle >Affects Versions: 1.1.0 >Reporter: Saisai Shao >Assignee: Saisai Shao > Attachments: SortBasedShuffleRead.pdf, SortBasedShuffleReader on > Spark 2.x.pdf, Spark Shuffle Test Report(contd).pdf, Spark Shuffle Test > Report.pdf > > > Currently Spark has already integrated sort-based shuffle write, which > greatly improve the IO performance and reduce the memory consumption when > reducer number is very large. But for the reducer side, it still adopts the > implementation of hash-based shuffle reader, which neglects the ordering > attributes of map output data in some situations. > Here we propose a MR style sort-merge like shuffle reader for sort-based > shuffle to better improve the performance of sort-based shuffle. > Working in progress code and performance test report will be posted later > when some unit test bugs are fixed. > Any comments would be greatly appreciated. > Thanks a lot. -- This message was sent by Atlassian JIRA (v6.4.14#64029) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
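To illustrate point 1 (illustration only, not Spark's actual internal code): merging map outputs that are already sorted costs only O(log n) comparisons per record across n streams, instead of feeding every record through a full sort.
{code}
import scala.collection.mutable

// Merge already-sorted iterators (one per map output) into one sorted stream.
def mergeSorted[K, V](streams: Seq[Iterator[(K, V)]])
                     (implicit ord: Ordering[K]): Iterator[(K, V)] = {
  // Min-heap keyed on each stream's head element; the key ordering is
  // reversed because PriorityQueue is a max-heap.
  val byHead: Ordering[BufferedIterator[(K, V)]] =
    Ordering.by((it: BufferedIterator[(K, V)]) => it.head._1)(ord.reverse)
  val heap = mutable.PriorityQueue.empty[BufferedIterator[(K, V)]](byHead)
  streams.map(_.buffered).filter(_.hasNext).foreach(heap.enqueue(_))

  new Iterator[(K, V)] {
    def hasNext: Boolean = heap.nonEmpty
    def next(): (K, V) = {
      val it = heap.dequeue()
      val kv = it.next()
      if (it.hasNext) heap.enqueue(it) // re-insert with its new head element
      kv
    }
  }
}
{code}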
[jira] [Created] (SPARK-22546) Allow users to update the dataType of a column
Li Yuanjian created SPARK-22546: --- Summary: Allow users to update the dataType of a column Key: SPARK-22546 URL: https://issues.apache.org/jira/browse/SPARK-22546 Project: Spark Issue Type: Bug Components: SQL Affects Versions: 2.1.0 Reporter: Li Yuanjian [SPARK-17910|https://issues.apache.org/jira/browse/SPARK-17910] allowed users to change the comment of a column, and that patch also left a TODO for other metadata. This issue only implements the dataType-changing requirement; others, like renaming a column or changing its position, may have different considerations for datasource tables and Hive tables and may need more discussion. -- This message was sent by Atlassian JIRA (v6.4.14#64029) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
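A hedged sketch of the intended usage (the final DDL shape depends on the patch; table and column names are hypothetical). Spark already parses this `CHANGE COLUMN` form, but today it only accepts a comment change:
{code}
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().appName("alter-column-type").getOrCreate()
spark.sql("CREATE TABLE logs (user_id INT, msg STRING) USING parquet")
// The proposal would let the same statement widen the column's dataType:
spark.sql("ALTER TABLE logs CHANGE COLUMN user_id user_id BIGINT")
{code}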
[jira] [Commented] (SPARK-2926) Add MR-style (merge-sort) SortShuffleReader for sort-based shuffle
[ https://issues.apache.org/jira/browse/SPARK-2926?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16251457#comment-16251457 ] Li Yuanjian commented on SPARK-2926: I just gave a preview PR above; I'll collect more suggestions about this and maybe raise an SPIP vote later. > Add MR-style (merge-sort) SortShuffleReader for sort-based shuffle > -- > > Key: SPARK-2926 > URL: https://issues.apache.org/jira/browse/SPARK-2926 > Project: Spark > Issue Type: Improvement > Components: Shuffle >Affects Versions: 1.1.0 >Reporter: Saisai Shao >Assignee: Saisai Shao > Attachments: SortBasedShuffleRead.pdf, SortBasedShuffleReader on > Spark 2.x.pdf, Spark Shuffle Test Report(contd).pdf, Spark Shuffle Test > Report.pdf > > > Currently Spark has already integrated sort-based shuffle write, which > greatly improve the IO performance and reduce the memory consumption when > reducer number is very large. But for the reducer side, it still adopts the > implementation of hash-based shuffle reader, which neglects the ordering > attributes of map output data in some situations. > Here we propose a MR style sort-merge like shuffle reader for sort-based > shuffle to better improve the performance of sort-based shuffle. > Working in progress code and performance test report will be posted later > when some unit test bugs are fixed. > Any comments would be greatly appreciated. > Thanks a lot. -- This message was sent by Atlassian JIRA (v6.4.14#64029) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Issue Comment Deleted] (SPARK-2926) Add MR-style (merge-sort) SortShuffleReader for sort-based shuffle
[ https://issues.apache.org/jira/browse/SPARK-2926?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Li Yuanjian updated SPARK-2926: --- Comment: was deleted (was: The follow up work for SortShuffleReader in current master branch, detail test cases and benchmark description.) > Add MR-style (merge-sort) SortShuffleReader for sort-based shuffle > -- > > Key: SPARK-2926 > URL: https://issues.apache.org/jira/browse/SPARK-2926 > Project: Spark > Issue Type: Improvement > Components: Shuffle >Affects Versions: 1.1.0 >Reporter: Saisai Shao >Assignee: Saisai Shao > Attachments: SortBasedShuffleRead.pdf, SortBasedShuffleReader on > Spark 2.x.pdf, Spark Shuffle Test Report(contd).pdf, Spark Shuffle Test > Report.pdf > > > Currently Spark has already integrated sort-based shuffle write, which > greatly improve the IO performance and reduce the memory consumption when > reducer number is very large. But for the reducer side, it still adopts the > implementation of hash-based shuffle reader, which neglects the ordering > attributes of map output data in some situations. > Here we propose a MR style sort-merge like shuffle reader for sort-based > shuffle to better improve the performance of sort-based shuffle. > Working in progress code and performance test report will be posted later > when some unit test bugs are fixed. > Any comments would be greatly appreciated. > Thanks a lot. -- This message was sent by Atlassian JIRA (v6.4.14#64029) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Updated] (SPARK-2926) Add MR-style (merge-sort) SortShuffleReader for sort-based shuffle
[ https://issues.apache.org/jira/browse/SPARK-2926?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Li Yuanjian updated SPARK-2926: --- Attachment: SortBasedShuffleReader on Spark 2.x.pdf The follow-up work for SortShuffleReader on the current master branch, with detailed test cases and a benchmark description. > Add MR-style (merge-sort) SortShuffleReader for sort-based shuffle > -- > > Key: SPARK-2926 > URL: https://issues.apache.org/jira/browse/SPARK-2926 > Project: Spark > Issue Type: Improvement > Components: Shuffle >Affects Versions: 1.1.0 >Reporter: Saisai Shao >Assignee: Saisai Shao > Attachments: SortBasedShuffleRead.pdf, SortBasedShuffleReader on > Spark 2.x.pdf, Spark Shuffle Test Report(contd).pdf, Spark Shuffle Test > Report.pdf > > > Currently Spark has already integrated sort-based shuffle write, which > greatly improve the IO performance and reduce the memory consumption when > reducer number is very large. But for the reducer side, it still adopts the > implementation of hash-based shuffle reader, which neglects the ordering > attributes of map output data in some situations. > Here we propose a MR style sort-merge like shuffle reader for sort-based > shuffle to better improve the performance of sort-based shuffle. > Working in progress code and performance test report will be posted later > when some unit test bugs are fixed. > Any comments would be greatly appreciated. > Thanks a lot. -- This message was sent by Atlassian JIRA (v6.4.14#64029) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Comment Edited] (SPARK-2926) Add MR-style (merge-sort) SortShuffleReader for sort-based shuffle
[ https://issues.apache.org/jira/browse/SPARK-2926?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16251398#comment-16251398 ] Li Yuanjian edited comment on SPARK-2926 at 11/14/17 1:54 PM: -- During our work migrating some old Hadoop jobs to Spark, I noticed this JIRA and the code based on Spark 1.x. I re-implemented the old PR on top of Spark 2.1 and the current master branch. After producing some scenarios and running some benchmark tests, I found that this shuffle mode can bring a {color:red}12x~30x boost in task duration and reduce peak execution memory to 1/12 ~ 1/50{color} vs the current master version (see detailed screenshots and test data in the attached pdf). The memory reduction is especially notable: in this shuffle mode Spark can handle more data with less memory. The detailed doc attached to this JIRA is named "SortShuffleReader on Spark 2.x.pdf". I know the Dataset API has better optimization and performance, but the RDD API may still be useful for flexible control and for old Spark/Hadoop jobs. Given the better performance in ordering cases and the more cost-effective memory usage, this PR may still be worth merging into master. I'll sort out the current code base and give a PR soon. Any comments and trying it out would be greatly appreciated. was (Author: xuanyuan): During our work of migrating some old Hadoop job to Spark, I noticed this JIRA and the code based on spark 1.x. I re-implemented the old PR based on Spark 2.1 and current master branch. After produced some scenario and ran some benchmark tests, I found that this shuffle mode can bring {color:red}12x~30x boosting in task duration and reduce peak execution memory to 1/12 ~ 1/50{color}(see detail screenshot and test data in attatched pdf) vs current master version, especially the memory reducing, in this shuffle mode Spark can support more data size in less memory usage. The detail doc attached in this jira named "SortShuffleReader on Spark 2.x.pdf". I know that DataSet API will have better optimization and performance, but RDD API may still useful for flexible control and old Spark/Hadoop jobs. For the better performance in ordering cases and more cost-effective memory usage, maybe this PR is still worth to merge in to master. I'll sort out current code base and give a PR soon. Any comments and trying out would be greatly appreciated. > Add MR-style (merge-sort) SortShuffleReader for sort-based shuffle > -- > > Key: SPARK-2926 > URL: https://issues.apache.org/jira/browse/SPARK-2926 > Project: Spark > Issue Type: Improvement > Components: Shuffle >Affects Versions: 1.1.0 >Reporter: Saisai Shao >Assignee: Saisai Shao > Attachments: SortBasedShuffleRead.pdf, Spark Shuffle Test > Report(contd).pdf, Spark Shuffle Test Report.pdf > > > Currently Spark has already integrated sort-based shuffle write, which > greatly improve the IO performance and reduce the memory consumption when > reducer number is very large. But for the reducer side, it still adopts the > implementation of hash-based shuffle reader, which neglects the ordering > attributes of map output data in some situations. > Here we propose a MR style sort-merge like shuffle reader for sort-based > shuffle to better improve the performance of sort-based shuffle. > Working in progress code and performance test report will be posted later > when some unit test bugs are fixed. > Any comments would be greatly appreciated. > Thanks a lot. 
-- This message was sent by Atlassian JIRA (v6.4.14#64029) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Comment Edited] (SPARK-2926) Add MR-style (merge-sort) SortShuffleReader for sort-based shuffle
[ https://issues.apache.org/jira/browse/SPARK-2926?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16251398#comment-16251398 ] Li Yuanjian edited comment on SPARK-2926 at 11/14/17 1:53 PM: -- During our work migrating some old Hadoop jobs to Spark, I noticed this JIRA and the code based on Spark 1.x. I re-implemented the old PR on top of Spark 2.1 and the current master branch. After producing some scenarios and running some benchmark tests, I found that this shuffle mode can bring a {color:red}12x~30x boost in task duration and reduce peak execution memory to 1/12 ~ 1/50{color} (see detailed screenshots and test data in the attached pdf) vs the current master version. The memory reduction is especially notable: in this shuffle mode Spark can handle more data with less memory. The detailed doc attached to this JIRA is named "SortShuffleReader on Spark 2.x.pdf". I know the Dataset API has better optimization and performance, but the RDD API may still be useful for flexible control and for old Spark/Hadoop jobs. Given the better performance in ordering cases and the more cost-effective memory usage, this PR may still be worth merging into master. I'll sort out the current code base and give a PR soon. Any comments and trying it out would be greatly appreciated. was (Author: xuanyuan): During our work of migrating some old Hadoop job to Spark, I noticed this JIRA and the code based on spark 1.x. I re-implemented the old PR based on Spark 2.1 and current master branch. After produced some scenario and ran some benchmark tests, I found that this shuffle mode can bring {color:red}12x~30x boosting in task duration and reduce peak execution memory to 1/12 ~ 1/50{color} vs current master version, especially the memory reducing, in this shuffle mode Spark can support more data size in less memory usage. The detail doc attached in this jira named "SortShuffleReader on Spark 2.x". I know that DataSet API will have better optimization and performance, but RDD API may still useful for flexible control and old Spark/Hadoop jobs. For the better performance in ordering cases and more cost-effective memory usage, maybe this PR is still worth to merge in to master. I'll sort out current code base and give a PR soon. Any comments and trying out would be greatly appreciated. > Add MR-style (merge-sort) SortShuffleReader for sort-based shuffle > -- > > Key: SPARK-2926 > URL: https://issues.apache.org/jira/browse/SPARK-2926 > Project: Spark > Issue Type: Improvement > Components: Shuffle >Affects Versions: 1.1.0 >Reporter: Saisai Shao >Assignee: Saisai Shao > Attachments: SortBasedShuffleRead.pdf, Spark Shuffle Test > Report(contd).pdf, Spark Shuffle Test Report.pdf > > > Currently Spark has already integrated sort-based shuffle write, which > greatly improve the IO performance and reduce the memory consumption when > reducer number is very large. But for the reducer side, it still adopts the > implementation of hash-based shuffle reader, which neglects the ordering > attributes of map output data in some situations. > Here we propose a MR style sort-merge like shuffle reader for sort-based > shuffle to better improve the performance of sort-based shuffle. > Working in progress code and performance test report will be posted later > when some unit test bugs are fixed. > Any comments would be greatly appreciated. > Thanks a lot. -- This message was sent by Atlassian JIRA (v6.4.14#64029) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Commented] (SPARK-2926) Add MR-style (merge-sort) SortShuffleReader for sort-based shuffle
[ https://issues.apache.org/jira/browse/SPARK-2926?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16251398#comment-16251398 ] Li Yuanjian commented on SPARK-2926: During our work migrating some old Hadoop jobs to Spark, I noticed this JIRA and the code based on Spark 1.x. I re-implemented the old PR on top of Spark 2.1 and the current master branch. After producing some scenarios and running some benchmark tests, I found that this shuffle mode can bring a {color:red}12x~30x boost in task duration and reduce peak execution memory to 1/12 ~ 1/50{color} vs the current master version. The memory reduction is especially notable: in this shuffle mode Spark can handle more data with less memory. The detailed doc attached to this JIRA is named "SortShuffleReader on Spark 2.x". I know the Dataset API has better optimization and performance, but the RDD API may still be useful for flexible control and for old Spark/Hadoop jobs. Given the better performance in ordering cases and the more cost-effective memory usage, this PR may still be worth merging into master. I'll sort out the current code base and give a PR soon. Any comments and trying it out would be greatly appreciated. > Add MR-style (merge-sort) SortShuffleReader for sort-based shuffle > -- > > Key: SPARK-2926 > URL: https://issues.apache.org/jira/browse/SPARK-2926 > Project: Spark > Issue Type: Improvement > Components: Shuffle >Affects Versions: 1.1.0 >Reporter: Saisai Shao >Assignee: Saisai Shao > Attachments: SortBasedShuffleRead.pdf, Spark Shuffle Test > Report(contd).pdf, Spark Shuffle Test Report.pdf > > > Currently Spark has already integrated sort-based shuffle write, which > greatly improve the IO performance and reduce the memory consumption when > reducer number is very large. But for the reducer side, it still adopts the > implementation of hash-based shuffle reader, which neglects the ordering > attributes of map output data in some situations. > Here we propose a MR style sort-merge like shuffle reader for sort-based > shuffle to better improve the performance of sort-based shuffle. > Working in progress code and performance test report will be posted later > when some unit test bugs are fixed. > Any comments would be greatly appreciated. > Thanks a lot. -- This message was sent by Atlassian JIRA (v6.4.14#64029) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Commented] (SPARK-20928) SPIP: Continuous Processing Mode for Structured Streaming
[ https://issues.apache.org/jira/browse/SPARK-20928?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16245556#comment-16245556 ] Li Yuanjian commented on SPARK-20928: - Our team discussed the design sketch in detail; we have some ideas and questions, written down below. 1. Will the Window operation be supported in Continuous Processing Mode? Even if we currently only consider narrow dependencies, as the design sketch describes, the exactly-once guarantee may not be achievable with the current implementation of window and watermark. 2. Should the EpochIDs be aligned in scenarios that are not map-only? {quote} The design can also work with blocking operators, although it’d require the blocking operators to ensure epoch markers from all the partitions have been received by the operator before moving forward to commit. {quote} Does `blocking operators` mean 'operators that need a shuffle'? We think that only operators with an ordering relation (like window\mapState\sortByKey) need the EpochIDs aligned; others (like groupBy) don't. 3. Also, in many-to-one scenarios (like shuffle and window), should we use a new EpochID in the shuffle read stage and the window slide-out trigger, or keep the original EpochID batch? > SPIP: Continuous Processing Mode for Structured Streaming > - > > Key: SPARK-20928 > URL: https://issues.apache.org/jira/browse/SPARK-20928 > Project: Spark > Issue Type: Improvement > Components: Structured Streaming >Affects Versions: 2.2.0 >Reporter: Michael Armbrust > Labels: SPIP > Attachments: Continuous Processing in Structured Streaming Design > Sketch.pdf > > > Given the current Source API, the minimum possible latency for any record is > bounded by the amount of time that it takes to launch a task. This > limitation is a result of the fact that {{getBatch}} requires us to know both > the starting and the ending offset, before any tasks are launched. In the > worst case, the end-to-end latency is actually closer to the average batch > time + task launching time. > For applications where latency is more important than exactly-once output > however, it would be useful if processing could happen continuously. This > would allow us to achieve fully pipelined reading and writing from sources > such as Kafka. This kind of architecture would make it possible to process > records with end-to-end latencies on the order of 1 ms, rather than the > 10-100ms that is possible today. > One possible architecture here would be to change the Source API to look like > the following rough sketch: > {code} > trait Epoch { > def data: DataFrame > /** The exclusive starting position for `data`. */ > def startOffset: Offset > /** The inclusive ending position for `data`. Incrementally updated > during processing, but not complete until execution of the query plan in > `data` is finished. */ > def endOffset: Offset > } > def getBatch(startOffset: Option[Offset], endOffset: Option[Offset], > limits: Limits): Epoch > {code} > The above would allow us to build an alternative implementation of > {{StreamExecution}} that processes continuously with much lower latency and > only stops processing when needing to reconfigure the stream (either due to a > failure or a user requested change in parallelism. -- This message was sent by Atlassian JIRA (v6.4.14#64029) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Commented] (SPARK-22074) Task killed by other attempt task should not be resubmitted
[ https://issues.apache.org/jira/browse/SPARK-22074?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16182095#comment-16182095 ] Li Yuanjian commented on SPARK-22074: - Yes, that's right. > Task killed by other attempt task should not be resubmitted > --- > > Key: SPARK-22074 > URL: https://issues.apache.org/jira/browse/SPARK-22074 > Project: Spark > Issue Type: Bug > Components: Spark Core >Affects Versions: 2.1.0, 2.2.0 >Reporter: Li Yuanjian > > When a task killed by other task attempt, the task still resubmitted while > its executor lost. There is a certain probability caused the stage hanging > forever because of the unnecessary resubmit(see the scenario description > below). Although the patch https://issues.apache.org/jira/browse/SPARK-13931 > can resolve the hanging problem(thx [~GavinGavinNo1] :) ), but the > unnecessary resubmit should abandon. > Detail scenario description: > 1. A ShuffleMapStage has many tasks, some of them finished successfully > 2. An Executor Lost happened, this will trigger a new TaskSet resubmitted, > includes all missing partitions. > 3. Before the resubmitted TaskSet completed, another executor which only > include the task killed by other attempt lost, trigger the Resubmitted Event, > current stage's pendingPartitions is not empty. > 4. Resubmitted TaskSet end, shuffleMapStage.isAvailable == true, but > pendingPartitions is not empty, never step into submitWaitingChildStages. > Leave the key logs of this scenario below: > {noformat} > 393332:17/09/11 13:45:24 [dag-scheduler-event-loop] INFO DAGScheduler: > Submitting 120 missing tasks from ShuffleMapStage 1046 > (MapPartitionsRDD[5321] at rdd at AFDEntry.scala:116) > 39:17/09/11 13:45:24 [dag-scheduler-event-loop] INFO > YarnClusterScheduler: Adding task set 1046.0 with 120 tasks > 408766:17/09/11 13:46:25 [dispatcher-event-loop-5] INFO TaskSetManager: > Starting task 66.0 in stage 1046.0 (TID 110761, hidden-baidu-host.baidu.com, > executor 15, partition 66, PROCESS_LOCAL, 6237 bytes) > [1] Executor 15 lost, task 66.0 and 90.0 on it > 410532:17/09/11 13:46:32 [dispatcher-event-loop-47] INFO > YarnSchedulerBackend$YarnDriverEndpoint: Disabling executor 15. > 410900:17/09/11 13:46:33 [dispatcher-event-loop-34] INFO TaskSetManager: > Starting task 66.1 in stage 1046.0 (TID 111400, hidden-baidu-host.baidu.com, > executor 70, partition 66, PROCESS_LOCAL, 6237 bytes) > [2] Task 66.0 killed by 66.1 > 411315:17/09/11 13:46:37 [task-result-getter-2] INFO TaskSetManager: Killing > attempt 0 for task 66.0 in stage 1046.0 (TID 110761) on > hidden-baidu-host.baidu.com as the attempt 1 succeeded on > hidden-baidu-host.baidu.com > 411316:17/09/11 13:46:37 [task-result-getter-2] INFO TaskSetManager: Finished > task 66.1 in stage 1046.0 (TID 111400) in 3545 ms on > hidden-baidu-host.baidu.com (executor 70) (115/120) > [3] Executor 7 lost, task 0.0 72.0 7.0 on it > 411390:17/09/11 13:46:37 [dispatcher-event-loop-24] INFO > YarnSchedulerBackend$YarnDriverEndpoint: Disabling executor 7. 
> 416014:17/09/11 13:46:59 [dag-scheduler-event-loop] INFO DAGScheduler: > ShuffleMapStage 1046 (rdd at AFDEntry.scala:116) finished in 94.577 s > [4] ShuffleMapStage 1046.0 finished, missing partition trigger resubmitted > 1046.1 > 416019:17/09/1 13:46:59 [dag-scheduler-event- oop] INFO DAGScheduler: > Resubmitting ShuffleMapStage 1046 (rdd at AFDEntry.scala:116) because some of > its tasks had failed: 0, 72, 79 > 416020:17/09/11 13:46:59 [dag-scheduler-event-loop] INFO DAGScheduler: > Submitting ShuffleMapStage 1046 (MapPartitionsRDD[5321] at rdd at > AFDEntry.scala:116), which has no missing parents > 416030:17/09/11 13:46:59 [dag-scheduler-event-loop] INFO DAGScheduler: > Submitting 3 missing tasks from ShuffleMapStage 1046 (MapPartitionsRDD[5321] > at rdd at AFDEntry.scala:116) > 416032:17/09/11 13:46:59 [dag-scheduler-event-loop] INFO > YarnClusterScheduler: Adding task set 1046.1 with 3 tasks > 416034:17/09/11 13:46:59 [dispatcher-event-loop-21] INFO TaskSetManager: > Starting task 0.0 in stage 1046.1 (TID 112788, hidden-baidu-host.baidu.com, > executor 37, partition 0, PROCESS_LOCAL, 6237 bytes) > 416037:17/09/11 13:46:59 [dispatcher-event-loop-23] INFO TaskSetManager: > Starting task 1.0 in stage 1046.1 (TID 112789, > yq01-inf-nmg01-spark03-20160817113538.yq01.baidu.com, executor 69, partition > 72, PROCESS_LOCAL, 6237 bytes) > 416039:17/09/11 13:46:59 [dispatcher-event-loop-23] INFO TaskSetManager: > Starting task 2.0 in stage 1046.1 (TID 112790, hidden-baidu-host.baidu.com, > executor 26, partition 79, PROCESS_LOCAL, 6237 bytes) > [5] ShuffleMapStage 1046.1 still running, the attempted task killed by other > trigger the Resubmitted event > 416646:17/09/11 13:47:01 [dispatcher-event-loop-2
[jira] [Comment Edited] (SPARK-22074) Task killed by other attempt task should not be resubmitted
[ https://issues.apache.org/jira/browse/SPARK-22074?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16181943#comment-16181943 ] Li Yuanjian edited comment on SPARK-22074 at 9/27/17 4:46 AM: -- Hi [~jerryshao] Saisai, 66.0 was resubmitted because its executor was lost while 1046.1 was running. I also reproduced this in the UT (https://github.com/apache/spark/pull/19287/files#diff-8425e96a6c100b5f368b8e520ad80068R748) added in my patch, with a detailed scenario description in a comment; it fails without the changes in this PR and passes with them. Could you help me check that the UT recreates the scenario correctly? Thanks a lot. :) was (Author: xuanyuan): Hi [~jerryshao] saisai, the 66.0 resubmitted because of its executor lost during 1046.1 running. I also reproduce this in the UT added in my patch and add detailed scenario description in comment, it will fail without the changes in this PR and will pass conversely. Could you help me check the UT recreate the scenario right? Thanks a lot. :) > Task killed by other attempt task should not be resubmitted > --- > > Key: SPARK-22074 > URL: https://issues.apache.org/jira/browse/SPARK-22074 > Project: Spark > Issue Type: Bug > Components: Spark Core >Affects Versions: 2.1.0, 2.2.0 >Reporter: Li Yuanjian > > When a task killed by other task attempt, the task still resubmitted while > its executor lost. There is a certain probability caused the stage hanging > forever because of the unnecessary resubmit(see the scenario description > below). Although the patch https://issues.apache.org/jira/browse/SPARK-13931 > can resolve the hanging problem(thx [~GavinGavinNo1] :) ), but the > unnecessary resubmit should abandon. > Detail scenario description: > 1. A ShuffleMapStage has many tasks, some of them finished successfully > 2. An Executor Lost happened, this will trigger a new TaskSet resubmitted, > includes all missing partitions. > 3. Before the resubmitted TaskSet completed, another executor which only > include the task killed by other attempt lost, trigger the Resubmitted Event, > current stage's pendingPartitions is not empty. > 4. Resubmitted TaskSet end, shuffleMapStage.isAvailable == true, but > pendingPartitions is not empty, never step into submitWaitingChildStages. > Leave the key logs of this scenario below: > {noformat} > 393332:17/09/11 13:45:24 [dag-scheduler-event-loop] INFO DAGScheduler: > Submitting 120 missing tasks from ShuffleMapStage 1046 > (MapPartitionsRDD[5321] at rdd at AFDEntry.scala:116) > 39:17/09/11 13:45:24 [dag-scheduler-event-loop] INFO > YarnClusterScheduler: Adding task set 1046.0 with 120 tasks > 408766:17/09/11 13:46:25 [dispatcher-event-loop-5] INFO TaskSetManager: > Starting task 66.0 in stage 1046.0 (TID 110761, hidden-baidu-host.baidu.com, > executor 15, partition 66, PROCESS_LOCAL, 6237 bytes) > [1] Executor 15 lost, task 66.0 and 90.0 on it > 410532:17/09/11 13:46:32 [dispatcher-event-loop-47] INFO > YarnSchedulerBackend$YarnDriverEndpoint: Disabling executor 15. 
> 410900:17/09/11 13:46:33 [dispatcher-event-loop-34] INFO TaskSetManager: > Starting task 66.1 in stage 1046.0 (TID 111400, hidden-baidu-host.baidu.com, > executor 70, partition 66, PROCESS_LOCAL, 6237 bytes) > [2] Task 66.0 killed by 66.1 > 411315:17/09/11 13:46:37 [task-result-getter-2] INFO TaskSetManager: Killing > attempt 0 for task 66.0 in stage 1046.0 (TID 110761) on > hidden-baidu-host.baidu.com as the attempt 1 succeeded on > hidden-baidu-host.baidu.com > 411316:17/09/11 13:46:37 [task-result-getter-2] INFO TaskSetManager: Finished > task 66.1 in stage 1046.0 (TID 111400) in 3545 ms on > hidden-baidu-host.baidu.com (executor 70) (115/120) > [3] Executor 7 lost, task 0.0 72.0 7.0 on it > 411390:17/09/11 13:46:37 [dispatcher-event-loop-24] INFO > YarnSchedulerBackend$YarnDriverEndpoint: Disabling executor 7. > 416014:17/09/11 13:46:59 [dag-scheduler-event-loop] INFO DAGScheduler: > ShuffleMapStage 1046 (rdd at AFDEntry.scala:116) finished in 94.577 s > [4] ShuffleMapStage 1046.0 finished, missing partition trigger resubmitted > 1046.1 > 416019:17/09/1 13:46:59 [dag-scheduler-event- oop] INFO DAGScheduler: > Resubmitting ShuffleMapStage 1046 (rdd at AFDEntry.scala:116) because some of > its tasks had failed: 0, 72, 79 > 416020:17/09/11 13:46:59 [dag-scheduler-event-loop] INFO DAGScheduler: > Submitting ShuffleMapStage 1046 (MapPartitionsRDD[5321] at rdd at > AFDEntry.scala:116), which has no missing parents > 416030:17/09/11 13:46:59 [dag-scheduler-event-loop] INFO DAGScheduler: > Submitting 3 missing tasks from ShuffleMapStage 1046 (MapPartitionsRDD[5321] > at rdd at AFDEntry.scala:116) > 416032:17/09/11 13:46:59 [dag-scheduler-event-loop] INFO > YarnClusterScheduler: Adding task se
[jira] [Commented] (SPARK-22074) Task killed by other attempt task should not be resubmitted
[ https://issues.apache.org/jira/browse/SPARK-22074?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16181943#comment-16181943 ] Li Yuanjian commented on SPARK-22074: - Hi [~jerryshao] Saisai, 66.0 was resubmitted because its executor was lost while 1046.1 was running. I also reproduced this in the UT added in my patch, with a detailed scenario description in a comment; it fails without the changes in this PR and passes with them. Could you help me check that the UT recreates the scenario correctly? Thanks a lot. :) > Task killed by other attempt task should not be resubmitted > --- > > Key: SPARK-22074 > URL: https://issues.apache.org/jira/browse/SPARK-22074 > Project: Spark > Issue Type: Bug > Components: Spark Core >Affects Versions: 2.1.0, 2.2.0 >Reporter: Li Yuanjian > > When a task killed by other task attempt, the task still resubmitted while > its executor lost. There is a certain probability caused the stage hanging > forever because of the unnecessary resubmit(see the scenario description > below). Although the patch https://issues.apache.org/jira/browse/SPARK-13931 > can resolve the hanging problem(thx [~GavinGavinNo1] :) ), but the > unnecessary resubmit should abandon. > Detail scenario description: > 1. A ShuffleMapStage has many tasks, some of them finished successfully > 2. An Executor Lost happened, this will trigger a new TaskSet resubmitted, > includes all missing partitions. > 3. Before the resubmitted TaskSet completed, another executor which only > include the task killed by other attempt lost, trigger the Resubmitted Event, > current stage's pendingPartitions is not empty. > 4. Resubmitted TaskSet end, shuffleMapStage.isAvailable == true, but > pendingPartitions is not empty, never step into submitWaitingChildStages. > Leave the key logs of this scenario below: > {noformat} > 393332:17/09/11 13:45:24 [dag-scheduler-event-loop] INFO DAGScheduler: > Submitting 120 missing tasks from ShuffleMapStage 1046 > (MapPartitionsRDD[5321] at rdd at AFDEntry.scala:116) > 39:17/09/11 13:45:24 [dag-scheduler-event-loop] INFO > YarnClusterScheduler: Adding task set 1046.0 with 120 tasks > 408766:17/09/11 13:46:25 [dispatcher-event-loop-5] INFO TaskSetManager: > Starting task 66.0 in stage 1046.0 (TID 110761, hidden-baidu-host.baidu.com, > executor 15, partition 66, PROCESS_LOCAL, 6237 bytes) > [1] Executor 15 lost, task 66.0 and 90.0 on it > 410532:17/09/11 13:46:32 [dispatcher-event-loop-47] INFO > YarnSchedulerBackend$YarnDriverEndpoint: Disabling executor 15. > 410900:17/09/11 13:46:33 [dispatcher-event-loop-34] INFO TaskSetManager: > Starting task 66.1 in stage 1046.0 (TID 111400, hidden-baidu-host.baidu.com, > executor 70, partition 66, PROCESS_LOCAL, 6237 bytes) > [2] Task 66.0 killed by 66.1 > 411315:17/09/11 13:46:37 [task-result-getter-2] INFO TaskSetManager: Killing > attempt 0 for task 66.0 in stage 1046.0 (TID 110761) on > hidden-baidu-host.baidu.com as the attempt 1 succeeded on > hidden-baidu-host.baidu.com > 411316:17/09/11 13:46:37 [task-result-getter-2] INFO TaskSetManager: Finished > task 66.1 in stage 1046.0 (TID 111400) in 3545 ms on > hidden-baidu-host.baidu.com (executor 70) (115/120) > [3] Executor 7 lost, task 0.0 72.0 7.0 on it > 411390:17/09/11 13:46:37 [dispatcher-event-loop-24] INFO > YarnSchedulerBackend$YarnDriverEndpoint: Disabling executor 7. 
> 416014:17/09/11 13:46:59 [dag-scheduler-event-loop] INFO DAGScheduler: > ShuffleMapStage 1046 (rdd at AFDEntry.scala:116) finished in 94.577 s > [4] ShuffleMapStage 1046.0 finished, missing partition trigger resubmitted > 1046.1 > 416019:17/09/1 13:46:59 [dag-scheduler-event- oop] INFO DAGScheduler: > Resubmitting ShuffleMapStage 1046 (rdd at AFDEntry.scala:116) because some of > its tasks had failed: 0, 72, 79 > 416020:17/09/11 13:46:59 [dag-scheduler-event-loop] INFO DAGScheduler: > Submitting ShuffleMapStage 1046 (MapPartitionsRDD[5321] at rdd at > AFDEntry.scala:116), which has no missing parents > 416030:17/09/11 13:46:59 [dag-scheduler-event-loop] INFO DAGScheduler: > Submitting 3 missing tasks from ShuffleMapStage 1046 (MapPartitionsRDD[5321] > at rdd at AFDEntry.scala:116) > 416032:17/09/11 13:46:59 [dag-scheduler-event-loop] INFO > YarnClusterScheduler: Adding task set 1046.1 with 3 tasks > 416034:17/09/11 13:46:59 [dispatcher-event-loop-21] INFO TaskSetManager: > Starting task 0.0 in stage 1046.1 (TID 112788, hidden-baidu-host.baidu.com, > executor 37, partition 0, PROCESS_LOCAL, 6237 bytes) > 416037:17/09/11 13:46:59 [dispatcher-event-loop-23] INFO TaskSetManager: > Starting task 1.0 in stage 1046.1 (TID 112789, > yq01-inf-nmg01-spark03-20160817113538.yq01.baidu.com, executor 69, partition > 72, PROCESS_LOCAL, 6237 bytes) > 416039:17/09/11 13:46:59 [dispatcher-event-
[jira] [Commented] (SPARK-22074) Task killed by other attempt task should not be resubmitted
[ https://issues.apache.org/jira/browse/SPARK-22074?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16180652#comment-16180652 ] Li Yuanjian commented on SPARK-22074: - Hi [~jerryshao], thanks for your comment. In my scenario, 66.0 is indeed killed by 66.1; the root cause of 1046.0 failing to finish is that the resubmitted event for task 66.0 (killed by 66.1 before) arrived while 1046.1 was running. > Task killed by other attempt task should not be resubmitted > --- > > Key: SPARK-22074 > URL: https://issues.apache.org/jira/browse/SPARK-22074 > Project: Spark > Issue Type: Bug > Components: Spark Core >Affects Versions: 2.1.0, 2.2.0 >Reporter: Li Yuanjian > > When a task killed by other task attempt, the task still resubmitted while > its executor lost. There is a certain probability caused the stage hanging > forever because of the unnecessary resubmit(see the scenario description > below). Although the patch https://issues.apache.org/jira/browse/SPARK-13931 > can resolve the hanging problem(thx [~GavinGavinNo1] :) ), but the > unnecessary resubmit should abandon. > Detail scenario description: > 1. A ShuffleMapStage has many tasks, some of them finished successfully > 2. An Executor Lost happened, this will trigger a new TaskSet resubmitted, > includes all missing partitions. > 3. Before the resubmitted TaskSet completed, another executor which only > include the task killed by other attempt lost, trigger the Resubmitted Event, > current stage's pendingPartitions is not empty. > 4. Resubmitted TaskSet end, shuffleMapStage.isAvailable == true, but > pendingPartitions is not empty, never step into submitWaitingChildStages. > Leave the key logs of this scenario below: > {noformat} > 393332:17/09/11 13:45:24 [dag-scheduler-event-loop] INFO DAGScheduler: > Submitting 120 missing tasks from ShuffleMapStage 1046 > (MapPartitionsRDD[5321] at rdd at AFDEntry.scala:116) > 39:17/09/11 13:45:24 [dag-scheduler-event-loop] INFO > YarnClusterScheduler: Adding task set 1046.0 with 120 tasks > 408766:17/09/11 13:46:25 [dispatcher-event-loop-5] INFO TaskSetManager: > Starting task 66.0 in stage 1046.0 (TID 110761, hidden-baidu-host.baidu.com, > executor 15, partition 66, PROCESS_LOCAL, 6237 bytes) > [1] Executor 15 lost, task 66.0 and 90.0 on it > 410532:17/09/11 13:46:32 [dispatcher-event-loop-47] INFO > YarnSchedulerBackend$YarnDriverEndpoint: Disabling executor 15. > 410900:17/09/11 13:46:33 [dispatcher-event-loop-34] INFO TaskSetManager: > Starting task 66.1 in stage 1046.0 (TID 111400, hidden-baidu-host.baidu.com, > executor 70, partition 66, PROCESS_LOCAL, 6237 bytes) > [2] Task 66.0 killed by 66.1 > 411315:17/09/11 13:46:37 [task-result-getter-2] INFO TaskSetManager: Killing > attempt 0 for task 66.0 in stage 1046.0 (TID 110761) on > hidden-baidu-host.baidu.com as the attempt 1 succeeded on > hidden-baidu-host.baidu.com > 411316:17/09/11 13:46:37 [task-result-getter-2] INFO TaskSetManager: Finished > task 66.1 in stage 1046.0 (TID 111400) in 3545 ms on > hidden-baidu-host.baidu.com (executor 70) (115/120) > [3] Executor 7 lost, task 0.0 72.0 7.0 on it > 411390:17/09/11 13:46:37 [dispatcher-event-loop-24] INFO > YarnSchedulerBackend$YarnDriverEndpoint: Disabling executor 7. 
> 416014:17/09/11 13:46:59 [dag-scheduler-event-loop] INFO DAGScheduler: > ShuffleMapStage 1046 (rdd at AFDEntry.scala:116) finished in 94.577 s > [4] ShuffleMapStage 1046.0 finished, missing partition trigger resubmitted > 1046.1 > 416019:17/09/1 13:46:59 [dag-scheduler-event- oop] INFO DAGScheduler: > Resubmitting ShuffleMapStage 1046 (rdd at AFDEntry.scala:116) because some of > its tasks had failed: 0, 72, 79 > 416020:17/09/11 13:46:59 [dag-scheduler-event-loop] INFO DAGScheduler: > Submitting ShuffleMapStage 1046 (MapPartitionsRDD[5321] at rdd at > AFDEntry.scala:116), which has no missing parents > 416030:17/09/11 13:46:59 [dag-scheduler-event-loop] INFO DAGScheduler: > Submitting 3 missing tasks from ShuffleMapStage 1046 (MapPartitionsRDD[5321] > at rdd at AFDEntry.scala:116) > 416032:17/09/11 13:46:59 [dag-scheduler-event-loop] INFO > YarnClusterScheduler: Adding task set 1046.1 with 3 tasks > 416034:17/09/11 13:46:59 [dispatcher-event-loop-21] INFO TaskSetManager: > Starting task 0.0 in stage 1046.1 (TID 112788, hidden-baidu-host.baidu.com, > executor 37, partition 0, PROCESS_LOCAL, 6237 bytes) > 416037:17/09/11 13:46:59 [dispatcher-event-loop-23] INFO TaskSetManager: > Starting task 1.0 in stage 1046.1 (TID 112789, > yq01-inf-nmg01-spark03-20160817113538.yq01.baidu.com, executor 69, partition > 72, PROCESS_LOCAL, 6237 bytes) > 416039:17/09/11 13:46:59 [dispatcher-event-loop-23] INFO TaskSetManager: > Starting task 2.0 in stage 1046.1 (TID 112790, hidden-baidu-host.baidu.com, > execu
[jira] [Updated] (SPARK-22074) Task killed by other attempt task should not be resubmitted
[ https://issues.apache.org/jira/browse/SPARK-22074?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Li Yuanjian updated SPARK-22074: Description: When a task is killed by another task attempt, the task is still resubmitted when its executor is lost. There is a certain probability that this hangs the stage forever because of the unnecessary resubmit (see the scenario description below). Although the patch https://issues.apache.org/jira/browse/SPARK-13931 can resolve the hanging problem (thx [~GavinGavinNo1] :) ), the unnecessary resubmit should be abandoned. Detailed scenario description: 1. A ShuffleMapStage has many tasks, some of which finished successfully. 2. An executor loss happens; this triggers a new TaskSet to be resubmitted that includes all missing partitions. 3. Before the resubmitted TaskSet completes, another executor, which only contains the task killed by the other attempt, is lost; this triggers the Resubmitted event, so the current stage's pendingPartitions is not empty. 4. The resubmitted TaskSet ends with shuffleMapStage.isAvailable == true, but pendingPartitions is not empty, so we never step into submitWaitingChildStages. The key logs of this scenario are below: {noformat} 393332:17/09/11 13:45:24 [dag-scheduler-event-loop] INFO DAGScheduler: Submitting 120 missing tasks from ShuffleMapStage 1046 (MapPartitionsRDD[5321] at rdd at AFDEntry.scala:116) 39:17/09/11 13:45:24 [dag-scheduler-event-loop] INFO YarnClusterScheduler: Adding task set 1046.0 with 120 tasks 408766:17/09/11 13:46:25 [dispatcher-event-loop-5] INFO TaskSetManager: Starting task 66.0 in stage 1046.0 (TID 110761, hidden-baidu-host.baidu.com, executor 15, partition 66, PROCESS_LOCAL, 6237 bytes) [1] Executor 15 lost, task 66.0 and 90.0 on it 410532:17/09/11 13:46:32 [dispatcher-event-loop-47] INFO YarnSchedulerBackend$YarnDriverEndpoint: Disabling executor 15. 410900:17/09/11 13:46:33 [dispatcher-event-loop-34] INFO TaskSetManager: Starting task 66.1 in stage 1046.0 (TID 111400, hidden-baidu-host.baidu.com, executor 70, partition 66, PROCESS_LOCAL, 6237 bytes) [2] Task 66.0 killed by 66.1 411315:17/09/11 13:46:37 [task-result-getter-2] INFO TaskSetManager: Killing attempt 0 for task 66.0 in stage 1046.0 (TID 110761) on hidden-baidu-host.baidu.com as the attempt 1 succeeded on hidden-baidu-host.baidu.com 411316:17/09/11 13:46:37 [task-result-getter-2] INFO TaskSetManager: Finished task 66.1 in stage 1046.0 (TID 111400) in 3545 ms on hidden-baidu-host.baidu.com (executor 70) (115/120) [3] Executor 7 lost, task 0.0 72.0 7.0 on it 411390:17/09/11 13:46:37 [dispatcher-event-loop-24] INFO YarnSchedulerBackend$YarnDriverEndpoint: Disabling executor 7. 
416014:17/09/11 13:46:59 [dag-scheduler-event-loop] INFO DAGScheduler: ShuffleMapStage 1046 (rdd at AFDEntry.scala:116) finished in 94.577 s [4] ShuffleMapStage 1046.0 finished, missing partition trigger resubmitted 1046.1 416019:17/09/1 13:46:59 [dag-scheduler-event- oop] INFO DAGScheduler: Resubmitting ShuffleMapStage 1046 (rdd at AFDEntry.scala:116) because some of its tasks had failed: 0, 72, 79 416020:17/09/11 13:46:59 [dag-scheduler-event-loop] INFO DAGScheduler: Submitting ShuffleMapStage 1046 (MapPartitionsRDD[5321] at rdd at AFDEntry.scala:116), which has no missing parents 416030:17/09/11 13:46:59 [dag-scheduler-event-loop] INFO DAGScheduler: Submitting 3 missing tasks from ShuffleMapStage 1046 (MapPartitionsRDD[5321] at rdd at AFDEntry.scala:116) 416032:17/09/11 13:46:59 [dag-scheduler-event-loop] INFO YarnClusterScheduler: Adding task set 1046.1 with 3 tasks 416034:17/09/11 13:46:59 [dispatcher-event-loop-21] INFO TaskSetManager: Starting task 0.0 in stage 1046.1 (TID 112788, hidden-baidu-host.baidu.com, executor 37, partition 0, PROCESS_LOCAL, 6237 bytes) 416037:17/09/11 13:46:59 [dispatcher-event-loop-23] INFO TaskSetManager: Starting task 1.0 in stage 1046.1 (TID 112789, yq01-inf-nmg01-spark03-20160817113538.yq01.baidu.com, executor 69, partition 72, PROCESS_LOCAL, 6237 bytes) 416039:17/09/11 13:46:59 [dispatcher-event-loop-23] INFO TaskSetManager: Starting task 2.0 in stage 1046.1 (TID 112790, hidden-baidu-host.baidu.com, executor 26, partition 79, PROCESS_LOCAL, 6237 bytes) [5] ShuffleMapStage 1046.1 still running, the attempted task killed by other trigger the Resubmitted event 416646:17/09/11 13:47:01 [dispatcher-event-loop-26] WARN TaskSetManager: Lost task 66.0 in stage 1046.0 (TID 110761, hidden-baidu-host.baidu.com, executor 15): ExecutorLostFailure (executor 15 exited caused by one of the running tasks) Reason: Container marked as failed: container_1502719603300_158941_01_104857616 on host: hidden-baidu-host.baidu.com. Exit status: -100. Diagnostics: Container released on a *lost* node 416647:17/09/11 13:47:01 [dag-scheduler-event-loop] INFO DAGScheduler: Resubmitted ShuffleMapTask(1046, 66), so marking it as still running 416648:17/09/11 13:47:01 [dispatcher-event-loop-26] WARN TaskSet
[jira] [Created] (SPARK-22074) Task killed by other attempt task should not be resubmitted
Li Yuanjian created SPARK-22074: --- Summary: Task killed by other attempt task should not be resubmitted Key: SPARK-22074 URL: https://issues.apache.org/jira/browse/SPARK-22074 Project: Spark Issue Type: Bug Components: Spark Core Affects Versions: 2.2.0, 2.1.0 Reporter: Li Yuanjian When a task is killed by another task attempt, the task is still resubmitted when its executor is lost. There is a certain probability that this hangs the stage forever because of the unnecessary resubmit (see the scenario description below). Although the patch https://issues.apache.org/jira/browse/SPARK-13931 can resolve the hanging problem, the unnecessary resubmit should be abandoned. Detailed scenario description: 1. A ShuffleMapStage has many tasks, some of which finished successfully. 2. An executor loss happens; this triggers a new TaskSet to be resubmitted that includes all missing partitions. 3. Before the resubmitted TaskSet completes, another executor, which only contains the task killed by the other attempt, is lost; this triggers the Resubmitted event, so the current stage's pendingPartitions is not empty. 4. The resubmitted TaskSet ends with shuffleMapStage.isAvailable == true, but pendingPartitions is not empty, so we never step into submitWaitingChildStages. The key logs of this scenario are below: {noformat} 393332:17/09/11 13:45:24 [dag-scheduler-event-loop] INFO DAGScheduler: Submitting 120 missing tasks from ShuffleMapStage 1046 (MapPartitionsRDD[5321] at rdd at AFDEntry.scala:116) 39:17/09/11 13:45:24 [dag-scheduler-event-loop] INFO YarnClusterScheduler: Adding task set 1046.0 with 120 tasks 408766:17/09/11 13:46:25 [dispatcher-event-loop-5] INFO TaskSetManager: Starting task 66.0 in stage 1046.0 (TID 110761, hidden-baidu-host.baidu.com, executor 15, partition 66, PROCESS_LOCAL, 6237 bytes) [1] Executor 15 lost, task 66.0 and 90.0 on it 410532:17/09/11 13:46:32 [dispatcher-event-loop-47] INFO YarnSchedulerBackend$YarnDriverEndpoint: Disabling executor 15. 410900:17/09/11 13:46:33 [dispatcher-event-loop-34] INFO TaskSetManager: Starting task 66.1 in stage 1046.0 (TID 111400, hidden-baidu-host.baidu.com, executor 70, partition 66, PROCESS_LOCAL, 6237 bytes) [2] Task 66.0 killed by 66.1 411315:17/09/11 13:46:37 [task-result-getter-2] INFO TaskSetManager: Killing attempt 0 for task 66.0 in stage 1046.0 (TID 110761) on hidden-baidu-host.baidu.com as the attempt 1 succeeded on hidden-baidu-host.baidu.com 411316:17/09/11 13:46:37 [task-result-getter-2] INFO TaskSetManager: Finished task 66.1 in stage 1046.0 (TID 111400) in 3545 ms on hidden-baidu-host.baidu.com (executor 70) (115/120) [3] Executor 7 lost, task 0.0 72.0 7.0 on it 411390:17/09/11 13:46:37 [dispatcher-event-loop-24] INFO YarnSchedulerBackend$YarnDriverEndpoint: Disabling executor 7. 
416014:17/09/11 13:46:59 [dag-scheduler-event-loop] INFO DAGScheduler: ShuffleMapStage 1046 (rdd at AFDEntry.scala:116) finished in 94.577 s [4] ShuffleMapStage 1046.0 finished, missing partition trigger resubmitted 1046.1 416019:17/09/1 13:46:59 [dag-scheduler-event- oop] INFO DAGScheduler: Resubmitting ShuffleMapStage 1046 (rdd at AFDEntry.scala:116) because some of its tasks had failed: 0, 72, 79 416020:17/09/11 13:46:59 [dag-scheduler-event-loop] INFO DAGScheduler: Submitting ShuffleMapStage 1046 (MapPartitionsRDD[5321] at rdd at AFDEntry.scala:116), which has no missing parents 416030:17/09/11 13:46:59 [dag-scheduler-event-loop] INFO DAGScheduler: Submitting 3 missing tasks from ShuffleMapStage 1046 (MapPartitionsRDD[5321] at rdd at AFDEntry.scala:116) 416032:17/09/11 13:46:59 [dag-scheduler-event-loop] INFO YarnClusterScheduler: Adding task set 1046.1 with 3 tasks 416034:17/09/11 13:46:59 [dispatcher-event-loop-21] INFO TaskSetManager: Starting task 0.0 in stage 1046.1 (TID 112788, hidden-baidu-host.baidu.com, executor 37, partition 0, PROCESS_LOCAL, 6237 bytes) 416037:17/09/11 13:46:59 [dispatcher-event-loop-23] INFO TaskSetManager: Starting task 1.0 in stage 1046.1 (TID 112789, yq01-inf-nmg01-spark03-20160817113538.yq01.baidu.com, executor 69, partition 72, PROCESS_LOCAL, 6237 bytes) 416039:17/09/11 13:46:59 [dispatcher-event-loop-23] INFO TaskSetManager: Starting task 2.0 in stage 1046.1 (TID 112790, hidden-baidu-host.baidu.com, executor 26, partition 79, PROCESS_LOCAL, 6237 bytes) [5] ShuffleMapStage 1046.1 still running, the attempted task killed by other trigger the Resubmitted event 416646:17/09/11 13:47:01 [dispatcher-event-loop-26] WARN TaskSetManager: Lost task 66.0 in stage 1046.0 (TID 110761, hidden-baidu-host.baidu.com, executor 15): ExecutorLostFailure (executor 15 exited caused by one of the running tasks) Reason: Container marked as failed: container_1502719603300_158941_01_104857616 on host: hidden-baidu-host.baidu.com. Exit status: -100. Diagnostics: Container released on a *lost* node 416647:17/09/11 13:47:01 [da
[jira] [Commented] (SPARK-18838) High latency of event processing for large jobs
[ https://issues.apache.org/jira/browse/SPARK-18838?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16119265#comment-16119265 ] Li Yuanjian commented on SPARK-18838: - I'm facing the same problem as [~milesc]: {quote} We do not use dynamic allocation, and our applications frequently hang completely after seeing this log message, not just the UI. {quote} Should we solve these problems by dividing them into 2 parts? The first is that we should ensure events are not dropped, and the second, as discussed above, is to improve the performance in several ways. We are trying to do the first in SPARK-21560, and the corresponding PR is being tested in our pre-production environment; do you have any suggestions for that? [~vanzin] Thanks. > High latency of event processing for large jobs > --- > > Key: SPARK-18838 > URL: https://issues.apache.org/jira/browse/SPARK-18838 > Project: Spark > Issue Type: Improvement >Affects Versions: 2.0.0 >Reporter: Sital Kedia > Attachments: perfResults.pdf, SparkListernerComputeTime.xlsx > > > Currently we are observing the issue of very high event processing delay in > driver's `ListenerBus` for large jobs with many tasks. Many critical > component of the scheduler like `ExecutorAllocationManager`, > `HeartbeatReceiver` depend on the `ListenerBus` events and this delay might > hurt the job performance significantly or even fail the job. For example, a > significant delay in receiving the `SparkListenerTaskStart` might cause > `ExecutorAllocationManager` manager to mistakenly remove an executor which is > not idle. > The problem is that the event processor in `ListenerBus` is a single thread > which loops through all the Listeners for each event and processes each event > synchronously > https://github.com/apache/spark/blob/master/core/src/main/scala/org/apache/spark/scheduler/LiveListenerBus.scala#L94. > This single threaded processor often becomes the bottleneck for large jobs. > Also, if one of the Listener is very slow, all the listeners will pay the > price of delay incurred by the slow listener. In addition to that a slow > listener can cause events to be dropped from the event queue which might be > fatal to the job. > To solve the above problems, we propose to get rid of the event queue and the > single threaded event processor. Instead each listener will have its own > dedicate single threaded executor service . When ever an event is posted, it > will be submitted to executor service of all the listeners. The Single > threaded executor service will guarantee in order processing of the events > per listener. The queue used for the executor service will be bounded to > guarantee we do not grow the memory indefinitely. The downside of this > approach is separate event queue per listener will increase the driver memory > footprint. -- This message was sent by Atlassian JIRA (v6.4.14#64029) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
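A sketch of the per-listener executor idea quoted in the description above (illustration only; the class and parameter names are mine, not Spark's): each listener gets its own single-threaded executor with a bounded queue, so a slow listener delays, and drops, only its own events.
{code}
import java.util.concurrent._

// One isolated, single-threaded, bounded event pipeline per listener.
class IsolatedListener[E](handle: E => Unit, queueSize: Int) {
  private val executor = new ThreadPoolExecutor(
    1, 1, 0L, TimeUnit.MILLISECONDS,
    new ArrayBlockingQueue[Runnable](queueSize)) // bounded memory footprint

  def post(event: E): Unit =
    try executor.execute(new Runnable { def run(): Unit = handle(event) })
    catch {
      // Default AbortPolicy rejects when the queue is full: only this
      // listener's event is dropped, others are unaffected.
      case _: RejectedExecutionException => ()
    }

  def stop(): Unit = executor.shutdown()
}
{code}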
[jira] [Commented] (SPARK-21560) Add hold mode for the LiveListenerBus
[ https://issues.apache.org/jira/browse/SPARK-21560?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16106030#comment-16106030 ] Li Yuanjian commented on SPARK-21560: - Something went wrong with the sync between the PR and JIRA; the PR corresponding to this issue is: https://github.com/apache/spark/pull/18760 > Add hold mode for the LiveListenerBus > - > > Key: SPARK-21560 > URL: https://issues.apache.org/jira/browse/SPARK-21560 > Project: Spark > Issue Type: Improvement > Components: Spark Core >Affects Versions: 2.1.0, 2.2.0 >Reporter: Li Yuanjian > > As noted in the comments of SPARK-18838, we also face the same problem of critical > events being dropped while the event queue is full. > There's no doubt that improving the performance of the processing thread is > important, whether the solution is multithreading or another approach like > SPARK-20776, but we may still need a hold strategy for when the event queue > is full, restarting after some room is released. Whether the hold strategy is > enabled and the empty rate should both be configurable. -- This message was sent by Atlassian JIRA (v6.4.14#64029) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Created] (SPARK-21560) Add hold mode for the LiveListenerBus
Li Yuanjian created SPARK-21560: --- Summary: Add hold mode for the LiveListenerBus Key: SPARK-21560 URL: https://issues.apache.org/jira/browse/SPARK-21560 Project: Spark Issue Type: Improvement Components: Spark Core Affects Versions: 2.2.0, 2.1.0 Reporter: Li Yuanjian As noted in the comments of SPARK-18838, we also face the same problem of critical events being dropped while the event queue is full. There's no doubt that improving the performance of the processing thread is important, whether the solution is multithreading or another approach like SPARK-20776, but we may still need a hold strategy for when the event queue is full, restarting after some room is released. Whether the hold strategy is enabled and the empty rate should both be configurable. -- This message was sent by Atlassian JIRA (v6.4.14#64029) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
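A sketch of the hold strategy the ticket describes, under assumed semantics (HoldModeQueue, holdEnabled, and emptyRate are hypothetical names; Spark exposes no such configs): when the queue is full, posting threads block instead of dropping the event, and they are woken once the consumer drains the queue below the configured empty rate.

{code:scala}
import java.util.concurrent.ArrayBlockingQueue
import java.util.concurrent.locks.ReentrantLock

// Hypothetical hold-mode queue: when full, post() blocks the caller instead
// of dropping the event, and posters are released once the consumer drains
// the queue below `emptyRate` of its capacity.
class HoldModeQueue[E](capacity: Int, holdEnabled: Boolean, emptyRate: Double) {
  private val queue = new ArrayBlockingQueue[E](capacity)
  private val lock = new ReentrantLock()
  private val notFull = lock.newCondition()

  def post(event: E): Boolean = {
    if (queue.offer(event)) return true   // fast path: room available
    if (!holdEnabled) return false        // original behavior: drop the event
    lock.lock()
    try {
      while (!queue.offer(event)) notFull.await()  // hold until room released
      true
    } finally lock.unlock()
  }

  def take(): E = {
    val event = queue.take()
    // Wake held posters only once enough room has opened up.
    if (queue.size <= (capacity * emptyRate).toInt) {
      lock.lock()
      try notFull.signalAll() finally lock.unlock()
    }
    event
  }
}
{code}

The flip side is that post() can now block the posting (scheduler) thread, which is presumably why the ticket wants both the hold switch and the empty rate to be configurable.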
[jira] [Commented] (SPARK-21435) Empty files should be skipped while write to file
[ https://issues.apache.org/jira/browse/SPARK-21435?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16089442#comment-16089442 ] Li Yuanjian commented on SPARK-21435: - [~sowen] I tested the patch in our scenario and added a UT; I think it can skip empty files in Parquet. > Empty files should be skipped while write to file > - > > Key: SPARK-21435 > URL: https://issues.apache.org/jira/browse/SPARK-21435 > Project: Spark > Issue Type: Improvement > Components: SQL >Affects Versions: 2.2.0 >Reporter: Li Yuanjian >Priority: Minor > > Consider this scenario: the source table has many partitions and data files, and > after the query filter only a little data is written to the destination dir. > In this case the destination dir or table will have many empty files, or files > that contain only schema metadata (Parquet format). I know we can use coalesce, > but skipping the empty files may be better in this case. -- This message was sent by Atlassian JIRA (v6.4.14#64029) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Created] (SPARK-21435) Empty files should be skipped while write to file
Li Yuanjian created SPARK-21435: --- Summary: Empty files should be skipped while write to file Key: SPARK-21435 URL: https://issues.apache.org/jira/browse/SPARK-21435 Project: Spark Issue Type: Improvement Components: SQL Affects Versions: 2.2.0 Reporter: Li Yuanjian Consider this scenario: the source table has many partitions and data files, and after the query filter only a little data is written to the destination dir. In this case the destination dir or table will have many empty files, or files that contain only schema metadata (Parquet format). I know we can use coalesce, but skipping the empty files may be better in this case. -- This message was sent by Atlassian JIRA (v6.4.14#64029) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
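A minimal sketch of the idea, not Spark's actual FileFormatWriter code path (writePartition and the plain-text output are hypothetical simplifications): create the output file lazily on the first row, so a partition whose iterator is empty never produces a file at all.

{code:scala}
import java.io.{BufferedWriter, File, FileWriter}

// Hypothetical writer: the output file is created lazily on the first row,
// so an empty partition leaves nothing behind (not even a schema-only file).
def writePartition(rows: Iterator[String], path: String): Unit = {
  var writer: BufferedWriter = null
  try {
    while (rows.hasNext) {
      if (writer == null) {                 // first row arrived: create the file
        writer = new BufferedWriter(new FileWriter(new File(path)))
      }
      writer.write(rows.next())
      writer.newLine()
    }
  } finally {
    if (writer != null) writer.close()      // empty iterator => no file was created
  }
}
{code}

Parquet makes this slightly trickier than plain text, since even an "empty" Parquet file carries schema metadata in its footer, but the lazy-creation principle is the same.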
[jira] [Updated] (SPARK-20408) Get glob path in parallel to reduce resolve relation time
[ https://issues.apache.org/jira/browse/SPARK-20408?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Li Yuanjian updated SPARK-20408: Summary: Get glob path in parallel to reduce resolve relation time (was: Get glob path in parallel to boost resolve relation time) > Get glob path in parallel to reduce resolve relation time > - > > Key: SPARK-20408 > URL: https://issues.apache.org/jira/browse/SPARK-20408 > Project: Spark > Issue Type: Improvement > Components: SQL >Affects Versions: 2.1.0 >Reporter: Li Yuanjian > Fix For: 2.2.0 > > > A datasource read from a path with wildcards like the one below causes a long > wait (each star may represent 100~1000 files or dirs), especially in a cross-region > env where the driver and HDFS are in different regions, which enlarges the drawback. > bq. spark.read.text("/log/product/201704/\*/\*/\*/\*") > The optimization strategy is the same as bulkListLeafFiles in InMemoryFileIndex: > resolve the wildcard paths in parallel. -- This message was sent by Atlassian JIRA (v6.3.15#6346) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Created] (SPARK-20408) Get glob path in parallel to boost resolve relation time
Li Yuanjian created SPARK-20408: --- Summary: Get glob path in parallel to boost resolve relation time Key: SPARK-20408 URL: https://issues.apache.org/jira/browse/SPARK-20408 Project: Spark Issue Type: Improvement Components: SQL Affects Versions: 2.1.0 Reporter: Li Yuanjian Fix For: 2.2.0 A datasource read from a path with wildcards like the one below causes a long wait (each star may represent 100~1000 files or dirs), especially in a cross-region env where the driver and HDFS are in different regions, which enlarges the drawback. bq. spark.read.text("/log/product/201704/\*/\*/\*/\*") The optimization strategy is the same as bulkListLeafFiles in InMemoryFileIndex: resolve the wildcard paths in parallel. -- This message was sent by Atlassian JIRA (v6.3.15#6346) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
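A sketch of the strategy with Hadoop's FileSystem API, assuming a plain fixed-size thread pool rather than Spark's internal utilities (parallelGlob and its parameters are hypothetical names, not the patch itself): expand each glob pattern on the pool so the cross-region namenode round trips run in parallel instead of sequentially.

{code:scala}
import java.util.concurrent.Executors

import scala.concurrent.{Await, ExecutionContext, Future}
import scala.concurrent.duration._

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileStatus, Path}

// Hypothetical helper: resolve many glob patterns in parallel so the driver
// pays the listing latency once per pool slot, not once per pattern.
def parallelGlob(patterns: Seq[Path], conf: Configuration,
                 parallelism: Int = 32): Seq[FileStatus] = {
  val pool = Executors.newFixedThreadPool(parallelism)
  implicit val ec: ExecutionContext = ExecutionContext.fromExecutor(pool)
  try {
    val futures = patterns.map { pattern =>
      Future {
        val fs = pattern.getFileSystem(conf)
        // globStatus returns null when nothing matches the pattern.
        Option(fs.globStatus(pattern)).map(_.toSeq).getOrElse(Seq.empty)
      }
    }
    Await.result(Future.sequence(futures), 10.minutes).flatten
  } finally pool.shutdown()
}
{code}

A caller would first split a pattern like the bq. example above into per-directory sub-patterns, mirroring how bulkListLeafFiles fans file listing out across directories.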
[jira] [Updated] (SPARK-18700) getCached in HiveMetastoreCatalog not thread safe cause driver OOM
[ https://issues.apache.org/jira/browse/SPARK-18700?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Li Yuanjian updated SPARK-18700: Affects Version/s: 2.1.1 > getCached in HiveMetastoreCatalog not thread safe cause driver OOM > -- > > Key: SPARK-18700 > URL: https://issues.apache.org/jira/browse/SPARK-18700 > Project: Spark > Issue Type: Bug > Components: SQL >Affects Versions: 1.6.1, 2.0.0, 2.1.1 >Reporter: Li Yuanjian > > In our Spark SQL platform, each query uses the same HiveContext in an > independent thread, and new data is appended to tables as new partitions every > 30min. After a new partition is added to table T, we should call refreshTable to > clear T’s cache in cachedDataSourceTables to make the new partition > searchable. > For a table with many partitions and files (much more than > spark.sql.sources.parallelPartitionDiscovery.threshold), a new query of table > T will start a job to fetch all FileStatus in the listLeafFiles function. Because > of the huge number of files, the job will run for several seconds; during that > time, new queries of table T will also start new jobs to fetch FileStatus > because getCached is not thread safe. This finally causes a driver > OOM. -- This message was sent by Atlassian JIRA (v6.3.4#6332) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Comment Edited] (SPARK-18700) getCached in HiveMetastoreCatalog not thread safe cause driver OOM
[ https://issues.apache.org/jira/browse/SPARK-18700?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15718580#comment-15718580 ] Li Yuanjian edited comment on SPARK-18700 at 12/9/16 4:47 AM: -- Gave a PR for this: it adds a StripedLock for each table's relation in the cache, not for the whole cachedDataSourceTables. was (Author: xuanyuan): I'll add a PR for this soon: add a ReadWriteLock for each table's relation in the cache, not for the whole cachedDataSourceTables. > getCached in HiveMetastoreCatalog not thread safe cause driver OOM > -- > > Key: SPARK-18700 > URL: https://issues.apache.org/jira/browse/SPARK-18700 > Project: Spark > Issue Type: Bug > Components: SQL >Affects Versions: 1.6.1, 2.0.0 >Reporter: Li Yuanjian > > In our Spark SQL platform, each query uses the same HiveContext in an > independent thread, and new data is appended to tables as new partitions every > 30min. After a new partition is added to table T, we should call refreshTable to > clear T’s cache in cachedDataSourceTables to make the new partition > searchable. > For a table with many partitions and files (much more than > spark.sql.sources.parallelPartitionDiscovery.threshold), a new query of table > T will start a job to fetch all FileStatus in the listLeafFiles function. Because > of the huge number of files, the job will run for several seconds; during that > time, new queries of table T will also start new jobs to fetch FileStatus > because getCached is not thread safe. This finally causes a driver > OOM. -- This message was sent by Atlassian JIRA (v6.3.4#6332) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
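A sketch of the striped-lock approach described in the edited comment, using Guava's Striped utility (StripedRelationCache and buildRelation are hypothetical stand-ins; the real fix lives in HiveMetastoreCatalog's getCached): concurrent lookups of different tables proceed in parallel, while concurrent lookups of the same table serialize so the expensive file-listing job runs only once.

{code:scala}
import com.google.common.util.concurrent.Striped

import scala.collection.concurrent.TrieMap

// Hypothetical stand-in for the relation cache in HiveMetastoreCatalog:
// `buildRelation` plays the role of the expensive listLeafFiles-backed build.
class StripedRelationCache[K, V](buildRelation: K => V) {
  private val cache = TrieMap.empty[K, V]
  // A fixed number of lock stripes: the same table always maps to the same
  // lock, while different tables almost never contend with each other.
  private val locks = Striped.lock(100)

  def getCached(table: K): V = {
    val lock = locks.get(table)
    lock.lock()
    try {
      // Under the per-table lock, only the first thread pays the expensive
      // file-listing job; concurrent queries of the same table wait briefly
      // and then hit the cache instead of launching duplicate jobs.
      cache.getOrElseUpdate(table, buildRelation(table))
    } finally lock.unlock()
  }
}
{code}

Striping is the middle ground between the two options the comments weigh: one global lock would serialize unrelated tables, while one lock object per table would grow without bound.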
[jira] [Commented] (SPARK-18700) getCached in HiveMetastoreCatalog not thread safe cause driver OOM
[ https://issues.apache.org/jira/browse/SPARK-18700?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15718580#comment-15718580 ] Li Yuanjian commented on SPARK-18700: - I'll add a PR for this soon: add a ReadWriteLock for each table's relation in the cache, not for the whole cachedDataSourceTables. > getCached in HiveMetastoreCatalog not thread safe cause driver OOM > -- > > Key: SPARK-18700 > URL: https://issues.apache.org/jira/browse/SPARK-18700 > Project: Spark > Issue Type: Bug > Components: SQL >Affects Versions: 1.6.1, 2.0.0 >Reporter: Li Yuanjian > > In our Spark SQL platform, each query uses the same HiveContext in an > independent thread, and new data is appended to tables as new partitions every > 30min. After a new partition is added to table T, we should call refreshTable to > clear T’s cache in cachedDataSourceTables to make the new partition > searchable. > For a table with many partitions and files (much more than > spark.sql.sources.parallelPartitionDiscovery.threshold), a new query of table > T will start a job to fetch all FileStatus in the listLeafFiles function. Because > of the huge number of files, the job will run for several seconds; during that > time, new queries of table T will also start new jobs to fetch FileStatus > because getCached is not thread safe. This finally causes a driver > OOM. -- This message was sent by Atlassian JIRA (v6.3.4#6332) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Updated] (SPARK-18700) getCached in HiveMetastoreCatalog not thread safe cause driver OOM
[ https://issues.apache.org/jira/browse/SPARK-18700?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Li Yuanjian updated SPARK-18700: Description: In our Spark SQL platform, each query uses the same HiveContext in an independent thread, and new data is appended to tables as new partitions every 30min. After a new partition is added to table T, we should call refreshTable to clear T’s cache in cachedDataSourceTables to make the new partition searchable. For a table with many partitions and files (much more than spark.sql.sources.parallelPartitionDiscovery.threshold), a new query of table T will start a job to fetch all FileStatus in the listLeafFiles function. Because of the huge number of files, the job will run for several seconds; during that time, new queries of table T will also start new jobs to fetch FileStatus because getCached is not thread safe. This finally causes a driver OOM. was: In our Spark SQL platform, each query uses the same HiveContext in an independent thread, and new data is appended to tables as new partitions every 30min. After a new partition is added to table T, we should call refreshTable to clear T’s cache in cachedDataSourceTables to make the new partition searchable. For a table with many partitions and files (much more than spark.sql.sources.parallelPartitionDiscovery.threshold), a new query of table T will start a job to fetch all FileStatus in the listLeafFiles function. Because of the huge number of files, the job will run for several seconds; during that time, new queries of table T will also start new jobs to fetch FileStatus because getCached is not thread safe. This finally causes a driver OOM. > getCached in HiveMetastoreCatalog not thread safe cause driver OOM > -- > > Key: SPARK-18700 > URL: https://issues.apache.org/jira/browse/SPARK-18700 > Project: Spark > Issue Type: Bug > Components: SQL >Affects Versions: 1.6.1, 2.0.0 >Reporter: Li Yuanjian > > In our Spark SQL platform, each query uses the same HiveContext in an > independent thread, and new data is appended to tables as new partitions every > 30min. After a new partition is added to table T, we should call refreshTable to > clear T’s cache in cachedDataSourceTables to make the new partition > searchable. > For a table with many partitions and files (much more than > spark.sql.sources.parallelPartitionDiscovery.threshold), a new query of table > T will start a job to fetch all FileStatus in the listLeafFiles function. Because > of the huge number of files, the job will run for several seconds; during that > time, new queries of table T will also start new jobs to fetch FileStatus > because getCached is not thread safe. This finally causes a driver > OOM. -- This message was sent by Atlassian JIRA (v6.3.4#6332) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Created] (SPARK-18700) getCached in HiveMetastoreCatalog not thread safe cause driver OOM
Li Yuanjian created SPARK-18700: --- Summary: getCached in HiveMetastoreCatalog not thread safe cause driver OOM Key: SPARK-18700 URL: https://issues.apache.org/jira/browse/SPARK-18700 Project: Spark Issue Type: Bug Components: SQL Affects Versions: 2.0.0, 1.6.1 Reporter: Li Yuanjian In our Spark SQL platform, each query uses the same HiveContext in an independent thread, and new data is appended to tables as new partitions every 30min. After a new partition is added to table T, we should call refreshTable to clear T’s cache in cachedDataSourceTables to make the new partition searchable. For a table with many partitions and files (much more than spark.sql.sources.parallelPartitionDiscovery.threshold), a new query of table T will start a job to fetch all FileStatus in the listLeafFiles function. Because of the huge number of files, the job will run for several seconds; during that time, new queries of table T will also start new jobs to fetch FileStatus because getCached is not thread safe. This finally causes a driver OOM. -- This message was sent by Atlassian JIRA (v6.3.4#6332) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org