[jira] [Commented] (SPARK-10816) EventTime based sessionization

2018-10-11 Thread Li Yuanjian (JIRA)


[ 
https://issues.apache.org/jira/browse/SPARK-10816?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16646177#comment-16646177
 ] 

Li Yuanjian commented on SPARK-10816:
-

Thanks [~zsxwing] for your comments and discussion, and many thanks [~kabhwan] for 
the comparison work. Sorry for the inactivity; the team is back at work this 
week. We are also comparing our approach and trying HWX's patch internally, and 
we hope we can solve this problem together.
{quote}If I read the codes correctly, 
[https://github.com/apache/spark/pull/22583] is [1]. 
[https://github.com/apache/spark/pull/22482] is a combination of [2] and [3] 
but still need to load all values of a key into the memory at the same time.
{quote}
Yes, our approach is [1]; I will add the comparison to the design doc. We 
initially chose this approach mainly for performance and simpler code as well.
{quote}Since Baidu’s patch supports Complete mode and Append mode, I ran 
benchmark with Append mode while comparing HWX’s patch with Baidu’s patch.

While Baidu’s patch cannot keep up input rate 200 (it showed max processed rows 
per second as around 130), HWX’s patch (APPEND mode) can keep the input rate 
around 23000.

(Initial input rate was 1000 but Baidu’s patch got very slowed with 
consistently showing "Reached spill threshold of 4096 rows, switching to 
org.apache.spark.util.collection.unsafe.sort.UnsafeExternalSorter".)
{quote}
As noted in the 
[discussion|https://docs.google.com/document/d/1hdh6GNLzprzlSJDDa4UKMyNQ9-u_H-CDpEtvtggxCL0/edit?disco=COOWOas],
 the bug of the useless shuffle has been fixed. We'll run the benchmark and 
share the results in your doc.

> EventTime based sessionization
> --
>
> Key: SPARK-10816
> URL: https://issues.apache.org/jira/browse/SPARK-10816
> Project: Spark
>  Issue Type: New Feature
>  Components: Structured Streaming
>Reporter: Reynold Xin
>Priority: Major
> Attachments: SPARK-10816 Support session window natively.pdf, Session 
> Window Support For Structure Streaming.pdf
>
>







[jira] [Commented] (SPARK-10816) EventTime based sessionization

2018-09-28 Thread Li Yuanjian (JIRA)


[ 
https://issues.apache.org/jira/browse/SPARK-10816?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16631751#comment-16631751
 ] 

Li Yuanjian commented on SPARK-10816:
-

Design doc: 
[https://docs.google.com/document/d/1zeAc7QKSO7J4-Yk06kc76kvldl-QHLCDJuu04d7k2bg/edit?usp=sharing]

PR: [https://github.com/apache/spark/pull/22583]
After a rough check against the doc and PR that [~kabhwan] posted, we share several 
common points in design and implementation; hope we can resolve this problem together!

> EventTime based sessionization
> --
>
> Key: SPARK-10816
> URL: https://issues.apache.org/jira/browse/SPARK-10816
> Project: Spark
>  Issue Type: New Feature
>  Components: Structured Streaming
>Reporter: Reynold Xin
>Priority: Major
> Attachments: SPARK-10816 Support session window natively.pdf, Session 
> Window Support For Structure Streaming.pdf
>
>







[jira] [Updated] (SPARK-10816) EventTime based sessionization

2018-09-28 Thread Li Yuanjian (JIRA)


 [ 
https://issues.apache.org/jira/browse/SPARK-10816?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Li Yuanjian updated SPARK-10816:

Attachment: Session Window Support For Structure Streaming.pdf

> EventTime based sessionization
> --
>
> Key: SPARK-10816
> URL: https://issues.apache.org/jira/browse/SPARK-10816
> Project: Spark
>  Issue Type: New Feature
>  Components: Structured Streaming
>Reporter: Reynold Xin
>Priority: Major
> Attachments: SPARK-10816 Support session window natively.pdf, Session 
> Window Support For Structure Streaming.pdf
>
>







[jira] [Commented] (SPARK-10816) EventTime based sessionization

2018-09-28 Thread Li Yuanjian (JIRA)


[ 
https://issues.apache.org/jira/browse/SPARK-10816?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16631398#comment-16631398
 ] 

Li Yuanjian commented on SPARK-10816:
-

Many thanks to [~kabhwan] for notifying me. I just linked SPARK-22565 as a 
duplicate of this issue; sorry, I had only searched for "session window" before 
and missed it. I will keep looking for other duplicate JIRAs.

As discussed in SPARK-22565, we also hit this problem while migrating streaming 
apps running on other systems to Structured Streaming. We solved it by 
implementing the session window as a built-in function and shipped an internal 
beta version based on Apache Spark 2.3.0 just a week ago. After it ran steadily 
in a real production environment, we are now cleaning up the code and 
translating the docs.

As discussed with Jungtaek, we would also like to join the discussion here and 
will post a PR and design doc today.

The preview PR I'll submit contains others' patches as well. cc [~liulinhong] [~ivoson] 
[~yanlin-Lynn] [~LiangchangZ], please watch this issue.

> EventTime based sessionization
> --
>
> Key: SPARK-10816
> URL: https://issues.apache.org/jira/browse/SPARK-10816
> Project: Spark
>  Issue Type: New Feature
>  Components: Structured Streaming
>Reporter: Reynold Xin
>Priority: Major
> Attachments: SPARK-10816 Support session window natively.pdf
>
>







[jira] [Commented] (SPARK-22565) Session-based windowing

2018-09-27 Thread Li Yuanjian (JIRA)


[ 
https://issues.apache.org/jira/browse/SPARK-22565?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16631370#comment-16631370
 ] 

Li Yuanjian commented on SPARK-22565:
-

[~kabhwan]

Many thanks for notifying me; sorry, I had only searched for "session window" and 
missed SPARK-10816.

{quote}

It would be nice if you could also share the SPIP, as well as some PR or design 
doc, so that we could see spots on making co-work and get better product.

{quote}

No problem. I'll cherry-pick all the related patches from our internal fork; we 
have actually been translating the internal doc for a few days and will also post 
a design doc today. Let's discuss in SPARK-10816.

Thanks again for your reply.

 

> Session-based windowing
> ---
>
> Key: SPARK-22565
> URL: https://issues.apache.org/jira/browse/SPARK-22565
> Project: Spark
>  Issue Type: Improvement
>  Components: Structured Streaming
>Affects Versions: 2.2.0
>Reporter: Richard Xin
>Priority: Major
> Attachments: screenshot-1.png
>
>
> I came across a requirement to support session-based windowing. for example, 
> user activity comes in from kafka, we want to create window per user session 
> (if the time gap of activity from the same user exceeds the predefined value, 
> a new window will be created).
> I noticed that Flink does support this kind of support, any plan/schedule for 
> spark for this? 






[jira] [Comment Edited] (SPARK-22565) Session-based windowing

2018-09-27 Thread Li Yuanjian (JIRA)


[ 
https://issues.apache.org/jira/browse/SPARK-22565?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16631298#comment-16631298
 ] 

Li Yuanjian edited comment on SPARK-22565 at 9/28/18 3:23 AM:
--

Thanks for reporting this. We also hit this problem in our own usage and have an 
implementation of session windows in our internal fork to resolve it. After 
running it steadily online in a real production environment, we want to 
contribute it to the community within the next few days. We implemented it as a 
built-in function named session_window, plus the corresponding support for 
window merging in Structured Streaming. The DataFrame API and SQL usage can be 
seen at a glance in the test:
 !screenshot-1.png!
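
For readers without the screenshot, here is a rough sketch of how such a session_window call could look (hedged: the exact signature is defined by the patch, not by this sketch; `events` is an assumed streaming DataFrame with userId and eventTime columns):
{code:scala}
import org.apache.spark.sql.functions.{col, session_window}

// Group each user's events into sessions that close after a 10-minute gap in event time;
// session_window stands for the proposed built-in function discussed above.
val sessions = events
  .withWatermark("eventTime", "30 minutes")
  .groupBy(col("userId"), session_window(col("eventTime"), "10 minutes"))
  .count()
{code}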


was (Author: xuanyuan):
Thanks for reporting this. Actually we also met this problem in our usage, we 
have an implement about session window in internal folk to resolve this. After 
steady running online for real product env, we want to contribute to community 
within the next few days. We implemented this by a build-in function named 
session_window. The usage of dataframe api and SQL can be quickly browsing by 
the test:
 !screenshot-1.png!

> Session-based windowing
> ---
>
> Key: SPARK-22565
> URL: https://issues.apache.org/jira/browse/SPARK-22565
> Project: Spark
>  Issue Type: Improvement
>  Components: Structured Streaming
>Affects Versions: 2.2.0
>Reporter: Richard Xin
>Priority: Major
> Attachments: screenshot-1.png
>
>
> I came across a requirement to support session-based windowing. for example, 
> user activity comes in from kafka, we want to create window per user session 
> (if the time gap of activity from the same user exceeds the predefined value, 
> a new window will be created).
> I noticed that Flink does support this kind of support, any plan/schedule for 
> spark for this? 






[jira] [Commented] (SPARK-22565) Session-based windowing

2018-09-27 Thread Li Yuanjian (JIRA)


[ 
https://issues.apache.org/jira/browse/SPARK-22565?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16631300#comment-16631300
 ] 

Li Yuanjian commented on SPARK-22565:
-

Also cc [~zsxwing] [~tdas]: we are translating the design doc and will post a 
SPIP in the next few days. Hope you can take a look when you have time, thanks :)

> Session-based windowing
> ---
>
> Key: SPARK-22565
> URL: https://issues.apache.org/jira/browse/SPARK-22565
> Project: Spark
>  Issue Type: Improvement
>  Components: Structured Streaming
>Affects Versions: 2.2.0
>Reporter: Richard Xin
>Priority: Major
> Attachments: screenshot-1.png
>
>
> I came across a requirement to support session-based windowing. for example, 
> user activity comes in from kafka, we want to create window per user session 
> (if the time gap of activity from the same user exceeds the predefined value, 
> a new window will be created).
> I noticed that Flink does support this kind of support, any plan/schedule for 
> spark for this? 






[jira] [Commented] (SPARK-22565) Session-based windowing

2018-09-27 Thread Li Yuanjian (JIRA)


[ 
https://issues.apache.org/jira/browse/SPARK-22565?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16631298#comment-16631298
 ] 

Li Yuanjian commented on SPARK-22565:
-

Thanks for reporting this. We also hit this problem in our own usage and have an 
implementation of session windows in our internal fork to resolve it. After 
running it steadily online in a real production environment, we want to 
contribute it to the community within the next few days. We implemented it as a 
built-in function named session_window. The DataFrame API and SQL usage can be 
seen at a glance in the test:
 !screenshot-1.png!

> Session-based windowing
> ---
>
> Key: SPARK-22565
> URL: https://issues.apache.org/jira/browse/SPARK-22565
> Project: Spark
>  Issue Type: Improvement
>  Components: Structured Streaming
>Affects Versions: 2.2.0
>Reporter: Richard Xin
>Priority: Major
> Attachments: screenshot-1.png
>
>
> I came across a requirement to support session-based windowing. for example, 
> user activity comes in from kafka, we want to create window per user session 
> (if the time gap of activity from the same user exceeds the predefined value, 
> a new window will be created).
> I noticed that Flink does support this kind of support, any plan/schedule for 
> spark for this? 






[jira] [Updated] (SPARK-22565) Session-based windowing

2018-09-27 Thread Li Yuanjian (JIRA)


 [ 
https://issues.apache.org/jira/browse/SPARK-22565?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Li Yuanjian updated SPARK-22565:

Attachment: screenshot-1.png

> Session-based windowing
> ---
>
> Key: SPARK-22565
> URL: https://issues.apache.org/jira/browse/SPARK-22565
> Project: Spark
>  Issue Type: Improvement
>  Components: Structured Streaming
>Affects Versions: 2.2.0
>Reporter: Richard Xin
>Priority: Major
> Attachments: screenshot-1.png
>
>
> I came across a requirement to support session-based windowing. for example, 
> user activity comes in from kafka, we want to create window per user session 
> (if the time gap of activity from the same user exceeds the predefined value, 
> a new window will be created).
> I noticed that Flink does support this kind of support, any plan/schedule for 
> spark for this? 






[jira] [Commented] (SPARK-25527) Job stuck waiting for last stage to start

2018-09-25 Thread Li Yuanjian (JIRA)


[ 
https://issues.apache.org/jira/browse/SPARK-25527?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16627593#comment-16627593
 ] 

Li Yuanjian commented on SPARK-25527:
-

{quote}
There are no Tasks waiting for completion, and the job just hangs.
{quote}
You can check the driver thread dump when this happens; generally speaking, the 
driver is doing some heavy work such as a commit in this scenario. We need more 
clues to confirm this is a bug.
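
If it helps, the usual routes are `jstack <driver pid>` or the thread dump page in the Spark UI; the snippet below is only an illustrative alternative that can be run inside the driver JVM:
{code:scala}
import scala.collection.JavaConverters._

// Print every live thread's stack so a stuck or busy driver thread
// (e.g. one doing a long commit) becomes visible.
Thread.getAllStackTraces.asScala.foreach { case (thread, frames) =>
  println(s"Thread ${thread.getName} (state: ${thread.getState})")
  frames.foreach(frame => println(s"    at $frame"))
}
{code}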

> Job stuck waiting for last stage to start
> -
>
> Key: SPARK-25527
> URL: https://issues.apache.org/jira/browse/SPARK-25527
> Project: Spark
>  Issue Type: Bug
>  Components: Spark Core
>Affects Versions: 2.1.0
>Reporter: Ran Haim
>Priority: Major
>
> Sometimes it can somehow happen that a job is stuck waiting for the last 
> stage to start.
> There are no Tasks waiting for completion, and the job just hangs.
> There are available Executors for the job to run.
> I do not know how to reproduce this, all I know is that it happens randomly 
> after couple days of hard load.
> Another thing that might help is that it seems to happen when some tasks fail 
> because one or more executors killed (due to memory issues or something).
> Those tasks eventually do get finished by other executors because of retries, 
> but the next stage hangs.  






[jira] [Commented] (SPARK-24499) Documentation improvement of Spark core and SQL

2018-09-23 Thread Li Yuanjian (JIRA)


[ 
https://issues.apache.org/jira/browse/SPARK-24499?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16625061#comment-16625061
 ] 

Li Yuanjian commented on SPARK-24499:
-

[~smilegator] Sorry for the late reply; we'll submit the first PR, covering the 
page split, within 2 weeks. We also looked into the current docs; the second PR 
will probably cover the usage of piped RDDs (RDD.pipe), which was widely used 
during our migration from Hadoop to Spark but is barely mentioned in the current 
docs.
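
For context, RDD.pipe streams each partition through an external command, which is the pattern many of those migrated MR jobs rely on. A minimal sketch (the command and data are made up for illustration, and `sc` is the usual SparkContext):
{code:scala}
// Each element is written to the command's stdin, one per line;
// the command's stdout lines become the resulting RDD[String].
val piped = sc.parallelize(Seq("apple", "banana", "cherry"), numSlices = 2)
  .pipe("grep a")

piped.collect()   // Array(apple, banana)
{code}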

> Documentation improvement of Spark core and SQL
> ---
>
> Key: SPARK-24499
> URL: https://issues.apache.org/jira/browse/SPARK-24499
> Project: Spark
>  Issue Type: New Feature
>  Components: Documentation, Spark Core, SQL
>Affects Versions: 2.3.0
>Reporter: Xiao Li
>Priority: Major
>
> The current documentation in Apache Spark lacks enough code examples and 
> tips. If needed, we should also split the page of 
> https://spark.apache.org/docs/latest/sql-programming-guide.html to multiple 
> separate pages like what we did for 
> https://spark.apache.org/docs/latest/ml-guide.html






[jira] [Commented] (SPARK-25426) Remove the duplicate fallback logic in UnsafeProjection

2018-09-17 Thread Li Yuanjian (JIRA)


[ 
https://issues.apache.org/jira/browse/SPARK-25426?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16617587#comment-16617587
 ] 

Li Yuanjian commented on SPARK-25426:
-

Resolved by https://github.com/apache/spark/pull/22417.

> Remove the duplicate fallback logic in UnsafeProjection
> ---
>
> Key: SPARK-25426
> URL: https://issues.apache.org/jira/browse/SPARK-25426
> Project: Spark
>  Issue Type: Improvement
>  Components: SQL
>Affects Versions: 2.3.1
>Reporter: Takeshi Yamamuro
>Assignee: Takeshi Yamamuro
>Priority: Minor
> Fix For: 2.5.0
>
>







[jira] [Commented] (SPARK-23128) A new approach to do adaptive execution in Spark SQL

2018-08-21 Thread Li Yuanjian (JIRA)


[ 
https://issues.apache.org/jira/browse/SPARK-23128?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16587348#comment-16587348
 ] 

Li Yuanjian commented on SPARK-23128:
-

Thanks for your comment [~xinyao].

{quote}
If I understand it correctly, one of the goals of this patch is to solve the 
data skew by splitting large partition into small ones. Have seen any 
significant improvement in this area?
{quote}
Yes, you're right. The strategy of handling data skew in joins does bring a 
significant improvement, but it is not covered in my report; other companies have 
more practice with this strategy.

> A new approach to do adaptive execution in Spark SQL
> 
>
> Key: SPARK-23128
> URL: https://issues.apache.org/jira/browse/SPARK-23128
> Project: Spark
>  Issue Type: Improvement
>  Components: SQL
>Affects Versions: 2.3.0
>Reporter: Carson Wang
>Priority: Major
> Attachments: AdaptiveExecutioninBaidu.pdf
>
>
> SPARK-9850 proposed the basic idea of adaptive execution in Spark. In 
> DAGScheduler, a new API is added to support submitting a single map stage.  
> The current implementation of adaptive execution in Spark SQL supports 
> changing the reducer number at runtime. An Exchange coordinator is used to 
> determine the number of post-shuffle partitions for a stage that needs to 
> fetch shuffle data from one or multiple stages. The current implementation 
> adds ExchangeCoordinator while we are adding Exchanges. However there are 
> some limitations. First, it may cause additional shuffles that may decrease 
> the performance. We can see this from EnsureRequirements rule when it adds 
> ExchangeCoordinator.  Secondly, it is not a good idea to add 
> ExchangeCoordinators while we are adding Exchanges because we don’t have a 
> global picture of all shuffle dependencies of a post-shuffle stage. I.e. for 
> 3 tables’ join in a single stage, the same ExchangeCoordinator should be used 
> in three Exchanges but currently two separated ExchangeCoordinator will be 
> added. Thirdly, with the current framework it is not easy to implement other 
> features in adaptive execution flexibly like changing the execution plan and 
> handling skewed join at runtime.
> We'd like to introduce a new way to do adaptive execution in Spark SQL and 
> address the limitations. The idea is described at 
> [https://docs.google.com/document/d/1mpVjvQZRAkD-Ggy6-hcjXtBPiQoVbZGe3dLnAKgtJ4k/edit?usp=sharing]






[jira] [Commented] (SPARK-25050) Handle more than two types in avro union types

2018-08-21 Thread Li Yuanjian (JIRA)


[ 
https://issues.apache.org/jira/browse/SPARK-25050?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16587326#comment-16587326
 ] 

Li Yuanjian commented on SPARK-25050:
-

Hasn't support for more than two types in Avro union types already been added in 
SchemaConverters here: 
[SchemaConverters.scala+98|https://github.com/apache/spark/blob/4fb96e5105cec4a3eb19a2b7997600b086bac32f/external/avro/src/main/scala/org/apache/spark/sql/avro/SchemaConverters.scala#L98-L116]?
 If I'm wrong please let me know, thanks for checking :)
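
For anyone following along: the linked SchemaConverters logic maps a complex (non-nullable, multi-branch) union to a struct with one nullable field per branch. A hedged sketch of what that looks like when reading such a file (path and field name are made up):
{code:scala}
// An Avro union like ["int", "string", "boolean"] is expected to surface roughly as:
//   |-- value: struct (nullable = true)
//   |    |-- member0: integer (nullable = true)
//   |    |-- member1: string (nullable = true)
//   |    |-- member2: boolean (nullable = true)
val df = spark.read.format("avro").load("/path/to/records_with_union.avro")
df.printSchema()
{code}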

> Handle more than two types in avro union types
> --
>
> Key: SPARK-25050
> URL: https://issues.apache.org/jira/browse/SPARK-25050
> Project: Spark
>  Issue Type: Improvement
>  Components: SQL
>Affects Versions: 2.4.0
>Reporter: DB Tsai
>Priority: Major
>







[jira] [Commented] (SPARK-25072) PySpark custom Row class can be given extra parameters

2018-08-18 Thread Li Yuanjian (JIRA)


[ 
https://issues.apache.org/jira/browse/SPARK-25072?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16584705#comment-16584705
 ] 

Li Yuanjian commented on SPARK-25072:
-

Interesting issue, but this is probably PySpark-only; a Scala Row will not have this 
problem. I'll submit a quick fix for this.
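
A quick sketch of why the Scala side does not exhibit this (illustrative only): a Scala Row is just an ordered value container with no schema-bound constructor, so every value it carries shows up in its length and string form.
{code:scala}
import org.apache.spark.sql.Row

val r = Row(1, 2, 3)
r.length     // 3 -- no hidden values
r.toString   // [1,2,3] -- everything appears in the representation
{code}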

> PySpark custom Row class can be given extra parameters
> --
>
> Key: SPARK-25072
> URL: https://issues.apache.org/jira/browse/SPARK-25072
> Project: Spark
>  Issue Type: Bug
>  Components: PySpark
>Affects Versions: 2.2.0
> Environment: {noformat}
> SPARK_MAJOR_VERSION is set to 2, using Spark2
> Python 3.4.5 (default, Dec 11 2017, 16:57:19)
> Type 'copyright', 'credits' or 'license' for more information
> IPython 6.2.1 -- An enhanced Interactive Python. Type '?' for help.
> Using Spark's default log4j profile: 
> org/apache/spark/log4j-defaults.properties
> Setting default log level to "WARN".
> To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use 
> setLogLevel(newLevel).
> 18/08/01 04:49:16 WARN NativeCodeLoader: Unable to load native-hadoop library 
> for your platform... using builtin-java classes where applicable
> 18/08/01 04:49:17 WARN Utils: Service 'SparkUI' could not bind on port 4040. 
> Attempting port 4041.
> 18/08/01 04:49:27 WARN ObjectStore: Failed to get database global_temp, 
> returning NoSuchObjectException
> Welcome to
>     __
>  / __/__  ___ _/ /__
> _\ \/ _ \/ _ `/ __/  '_/
>/__ / .__/\_,_/_/ /_/\_\   version 2.2.0
>   /_/
> Using Python version 3.4.5 (default, Dec 11 2017 16:57:19)
> SparkSession available as 'spark'.
> {noformat}
> {{CentOS release 6.9 (Final)}}
> {{Linux sandbox-hdp.hortonworks.com 4.14.0-1.el7.elrepo.x86_64 #1 SMP Sun Nov 
> 12 20:21:04 EST 2017 x86_64 x86_64 x86_64 GNU/Linux}}
> {noformat}openjdk version "1.8.0_161"
> OpenJDK Runtime Environment (build 1.8.0_161-b14)
> OpenJDK 64-Bit Server VM (build 25.161-b14, mixed mode){noformat}
>Reporter: Jan-Willem van der Sijp
>Priority: Minor
>
> When a custom Row class is made in PySpark, it is possible to provide the 
> constructor of this class with more parameters than there are columns. These 
> extra parameters affect the value of the Row, but are not part of the 
> {{repr}} or {{str}} output, making it hard to debug errors due to these 
> "invisible" values. The hidden values can be accessed through integer-based 
> indexing though.
> Some examples:
> {code:python}
> In [69]: RowClass = Row("column1", "column2")
> In [70]: RowClass(1, 2) == RowClass(1, 2)
> Out[70]: True
> In [71]: RowClass(1, 2) == RowClass(1, 2, 3)
> Out[71]: False
> In [75]: RowClass(1, 2, 3)
> Out[75]: Row(column1=1, column2=2)
> In [76]: RowClass(1, 2)
> Out[76]: Row(column1=1, column2=2)
> In [77]: RowClass(1, 2, 3).asDict()
> Out[77]: {'column1': 1, 'column2': 2}
> In [78]: RowClass(1, 2, 3)[2]
> Out[78]: 3
> In [79]: repr(RowClass(1, 2, 3))
> Out[79]: 'Row(column1=1, column2=2)'
> In [80]: str(RowClass(1, 2, 3))
> Out[80]: 'Row(column1=1, column2=2)'
> {code}






[jira] [Updated] (SPARK-25104) Validate user specified output schema

2018-08-15 Thread Li Yuanjian (JIRA)


 [ 
https://issues.apache.org/jira/browse/SPARK-25104?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Li Yuanjian updated SPARK-25104:

Description: 
With the code changes in 
[https://github.com/apache/spark/pull/21847|https://github.com/apache/spark/pull/21847]
 , Spark can write out data as per the user-provided output schema.

To make it more robust and user-friendly, we should validate the Avro schema 
before tasks are launched.

Also, we should support outputting the logical decimal type as BYTES (by default we 
output it as FIXED)
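
To illustrate the user-specified output schema path, a sketch assuming a DataFrame `df` with a single non-nullable DecimalType(10, 2) column named price (the bytes-backed variant is exactly what the requested support enables):
{code:scala}
// User-provided Avro schema declaring the decimal as logical type "decimal"
// backed by "bytes" rather than the default "fixed".
val avroSchema =
  """{
    |  "type": "record", "name": "topLevelRecord",
    |  "fields": [
    |    {"name": "price",
    |     "type": {"type": "bytes", "logicalType": "decimal", "precision": 10, "scale": 2}}
    |  ]
    |}""".stripMargin

df.write.format("avro").option("avroSchema", avroSchema).save("/tmp/avro_out")
{code}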

  was:
With code changes in 
[https://github.com/apache/spark/pull/21847|https://github.com/apache/spark/pull/21847,]
 , Spark can write out data as per user provided output schema.

To make it more robust and user friendly, we should validate the Avro schema 
before tasks launched.

Also we should support output logical decimal type as BYTES (By default we 
output as FIXED)


> Validate user specified output schema
> -
>
> Key: SPARK-25104
> URL: https://issues.apache.org/jira/browse/SPARK-25104
> Project: Spark
>  Issue Type: Sub-task
>  Components: SQL
>Affects Versions: 2.4.0
>Reporter: Gengliang Wang
>Assignee: Gengliang Wang
>Priority: Major
> Fix For: 2.4.0
>
>
> With code changes in 
> [https://github.com/apache/spark/pull/21847|https://github.com/apache/spark/pull/21847]
>  , Spark can write out data as per user provided output schema.
> To make it more robust and user friendly, we should validate the Avro schema 
> before tasks launched.
> Also we should support output logical decimal type as BYTES (By default we 
> output as FIXED)






[jira] [Updated] (SPARK-25100) Using KryoSerializer and setting registrationRequired true can lead job failed

2018-08-15 Thread Li Yuanjian (JIRA)


 [ 
https://issues.apache.org/jira/browse/SPARK-25100?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Li Yuanjian updated SPARK-25100:

Description: 
When spark.serializer is `org.apache.spark.serializer.KryoSerializer` and  
`spark.kryo.registrationRequired` is true in SparkConf. I invoked  
saveAsNewAPIHadoopDataset to store data in hdfs. The job will fail because the 
class TaskCommitMessage hasn't be registered.

 
{code:java}
java.lang.IllegalArgumentException: Class is not registered: 
org.apache.spark.internal.io.FileCommitProtocol$TaskCommitMessage
Note: To register this class use: 
kryo.register(org.apache.spark.internal.io.FileCommitProtocol$TaskCommitMessage.class);
at com.esotericsoftware.kryo.Kryo.getRegistration(Kryo.java:488)
at com.twitter.chill.KryoBase.getRegistration(KryoBase.scala:52)
at 
com.esotericsoftware.kryo.util.DefaultClassResolver.writeClass(DefaultClassResolver.java:97)
at com.esotericsoftware.kryo.Kryo.writeClass(Kryo.java:517)
at com.esotericsoftware.kryo.Kryo.writeClassAndObject(Kryo.java:622)
at 
org.apache.spark.serializer.KryoSerializerInstance.serialize(KryoSerializer.scala:347)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:393)
at 
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at 
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:745)
{code}
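
A possible workaround sketch while keeping registrationRequired on (this is not the fix itself, just explicit registration of the class Kryo complains about):
{code:scala}
import org.apache.spark.SparkConf

val conf = new SparkConf()
  .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
  .set("spark.kryo.registrationRequired", "true")
  // Register the internal class so serializing TaskCommitMessage succeeds.
  .registerKryoClasses(Array(
    Class.forName("org.apache.spark.internal.io.FileCommitProtocol$TaskCommitMessage")))
{code}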
 

  was:
When spark.serializer is org.apache.spark.serializer.KryoSerializer and  
spark.kryo.registrationRequired is true in SparkCOnf. I invoked  
saveAsNewAPIHadoopDataset to store data in hdfs. The job will fail because the 
class TaskCommitMessage hasn't be registered.

 
{code:java}
java.lang.IllegalArgumentException: Class is not registered: 
org.apache.spark.internal.io.FileCommitProtocol$TaskCommitMessage
Note: To register this class use: 
kryo.register(org.apache.spark.internal.io.FileCommitProtocol$TaskCommitMessage.class);
at com.esotericsoftware.kryo.Kryo.getRegistration(Kryo.java:488)
at com.twitter.chill.KryoBase.getRegistration(KryoBase.scala:52)
at 
com.esotericsoftware.kryo.util.DefaultClassResolver.writeClass(DefaultClassResolver.java:97)
at com.esotericsoftware.kryo.Kryo.writeClass(Kryo.java:517)
at com.esotericsoftware.kryo.Kryo.writeClassAndObject(Kryo.java:622)
at 
org.apache.spark.serializer.KryoSerializerInstance.serialize(KryoSerializer.scala:347)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:393)
at 
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at 
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:745)
{code}
 


> Using KryoSerializer and setting registrationRequired true can lead job failed
> --
>
> Key: SPARK-25100
> URL: https://issues.apache.org/jira/browse/SPARK-25100
> Project: Spark
>  Issue Type: Bug
>  Components: Spark Core
>Affects Versions: 2.3.1
>Reporter: deshanxiao
>Priority: Major
>
> When spark.serializer is `org.apache.spark.serializer.KryoSerializer` and  
> `spark.kryo.registrationRequired` is true in SparkConf. I invoked  
> saveAsNewAPIHadoopDataset to store data in hdfs. The job will fail because 
> the class TaskCommitMessage hasn't be registered.
>  
> {code:java}
> java.lang.IllegalArgumentException: Class is not registered: 
> org.apache.spark.internal.io.FileCommitProtocol$TaskCommitMessage
> Note: To register this class use: 
> kryo.register(org.apache.spark.internal.io.FileCommitProtocol$TaskCommitMessage.class);
> at com.esotericsoftware.kryo.Kryo.getRegistration(Kryo.java:488)
> at com.twitter.chill.KryoBase.getRegistration(KryoBase.scala:52)
> at 
> com.esotericsoftware.kryo.util.DefaultClassResolver.writeClass(DefaultClassResolver.java:97)
> at com.esotericsoftware.kryo.Kryo.writeClass(Kryo.java:517)
> at com.esotericsoftware.kryo.Kryo.writeClassAndObject(Kryo.java:622)
> at 
> org.apache.spark.serializer.KryoSerializerInstance.serialize(KryoSerializer.scala:347)
> at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:393)
> at 
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
> at 
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
> at java.lang.Thread.run(Thread.java:745)
> {code}
>  






[jira] [Created] (SPARK-25077) Delete unused variable in WindowExec

2018-08-09 Thread Li Yuanjian (JIRA)
Li Yuanjian created SPARK-25077:
---

 Summary: Delete unused variable in WindowExec
 Key: SPARK-25077
 URL: https://issues.apache.org/jira/browse/SPARK-25077
 Project: Spark
  Issue Type: Improvement
  Components: SQL
Affects Versions: 2.3.0
Reporter: Li Yuanjian


Delete the unused variable `inputFields` in WindowExec to avoid confusing others 
reading the code.






[jira] [Resolved] (SPARK-24989) BlockFetcher should retry while getting OutOfDirectMemoryError

2018-08-03 Thread Li Yuanjian (JIRA)


 [ 
https://issues.apache.org/jira/browse/SPARK-24989?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Li Yuanjian resolved SPARK-24989.
-
Resolution: Not A Problem

The param `spark.reducer.maxBlocksInFlightPerAddress` added in SPARK-21243 solves 
this problem, so I'm closing this JIRA. Thanks [~Dhruve Ashar]!
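
For reference, a sketch of applying that setting (the value is illustrative, not a tuned recommendation):
{code:scala}
import org.apache.spark.SparkConf

// Cap concurrent block fetches per remote address so reducers stop queuing enough
// 16 MB netty buffers to exhaust direct memory, as described in this issue.
val conf = new SparkConf()
  .set("spark.reducer.maxBlocksInFlightPerAddress", "64")
{code}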

> BlockFetcher should retry while getting OutOfDirectMemoryError
> --
>
> Key: SPARK-24989
> URL: https://issues.apache.org/jira/browse/SPARK-24989
> Project: Spark
>  Issue Type: Improvement
>  Components: Shuffle
>Affects Versions: 2.2.0
>Reporter: Li Yuanjian
>Priority: Major
> Attachments: FailedStage.png
>
>
> h3. Description
> This problem can be reproduced stably by a large parallelism job migrate from 
> map reduce to Spark in our practice, some metrics list below:
> ||Item||Value||
> |spark.executor.instances|1000|
> |spark.executor.cores|5|
> |task number of shuffle writer stage|18038|
> |task number of shuffle reader stage|8|
> While the shuffle writer stage successful ended, the shuffle reader stage 
> starting and keep failing by FetchFail. Each fetch request need the netty 
> sever allocate a buffer in 16MB(detailed stack attached below), the huge 
> amount of fetch request will use up default maxDirectMemory rapidly, even 
> though we bump up io.netty.maxDirectMemory to 50GB!
> {code:java}
> org.apache.spark.shuffle.FetchFailedException: failed to allocate 16777216 
> byte(s) of direct memory (used: 21474836480, max: 21474836480)
>   at 
> org.apache.spark.storage.ShuffleBlockFetcherIterator.throwFetchFailedException(ShuffleBlockFetcherIterator.scala:514)
>   at 
> org.apache.spark.storage.ShuffleBlockFetcherIterator.next(ShuffleBlockFetcherIterator.scala:445)
>   at 
> org.apache.spark.storage.ShuffleBlockFetcherIterator.next(ShuffleBlockFetcherIterator.scala:61)
>   at scala.collection.Iterator$$anon$12.nextCur(Iterator.scala:434)
>   at scala.collection.Iterator$$anon$12.hasNext(Iterator.scala:440)
>   at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:408)
>   at 
> org.apache.spark.util.CompletionIterator.hasNext(CompletionIterator.scala:32)
>   at 
> org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:37)
>   at 
> org.apache.spark.util.collection.ExternalSorter.insertAll(ExternalSorter.scala:199)
>   at 
> org.apache.spark.shuffle.BlockStoreShuffleReader.read(BlockStoreShuffleReader.scala:119)
>   at org.apache.spark.rdd.ShuffledRDD.compute(ShuffledRDD.scala:105)
>   at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
>   at org.apache.spark.rdd.RDD.iterator(RDD.scala:287)
>   at 
> org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
>   at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
>   at org.apache.spark.rdd.RDD.iterator(RDD.scala:287)
>   at 
> org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
>   at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
>   at org.apache.spark.rdd.RDD.iterator(RDD.scala:287)
>   at 
> org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
>   at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
>   at org.apache.spark.rdd.RDD.iterator(RDD.scala:287)
>   at 
> org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
>   at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
>   at org.apache.spark.rdd.RDD.iterator(RDD.scala:287)
>   at 
> org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
>   at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
>   at org.apache.spark.rdd.RDD.iterator(RDD.scala:287)
>   at 
> org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
>   at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
>   at org.apache.spark.rdd.RDD.iterator(RDD.scala:287)
>   at 
> org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
>   at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
>   at org.apache.spark.rdd.RDD.iterator(RDD.scala:287)
>   at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87)
>   at org.apache.spark.scheduler.Task.run(Task.scala:108)
>   at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:338)
>   at 
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
>   at 
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
>   at java.lang.Thread.run(Thread.java:748)
> Caused by: io.netty.util.internal.OutOfDirectMemoryError: failed to allocate 
> 16777216 byte(s) of direct memory 

[jira] [Updated] (SPARK-24989) BlockFetcher should retry while getting OutOfDirectMemoryError

2018-08-01 Thread Li Yuanjian (JIRA)


 [ 
https://issues.apache.org/jira/browse/SPARK-24989?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Li Yuanjian updated SPARK-24989:

Attachment: FailedStage.png

> BlockFetcher should retry while getting OutOfDirectMemoryError
> --
>
> Key: SPARK-24989
> URL: https://issues.apache.org/jira/browse/SPARK-24989
> Project: Spark
>  Issue Type: Improvement
>  Components: Shuffle
>Affects Versions: 2.2.0
>Reporter: Li Yuanjian
>Priority: Major
> Attachments: FailedStage.png
>
>
> h3. Description
> This problem can be reproduced stably by a large parallelism job migrate from 
> map reduce to Spark in our practice, some metrics list below:
> ||Item||Value||
> |spark.executor.instances|1000|
> |spark.executor.cores|5|
> |task number of shuffle writer stage|18038|
> |task number of shuffle reader stage|8|
> While the shuffle writer stage successful ended, the shuffle reader stage 
> starting and keep failing by FetchFail. Each fetch request need the netty 
> sever allocate a buffer in 16MB(detailed stack attached below), the huge 
> amount of fetch request will use up default maxDirectMemory rapidly, even 
> though we bump up io.netty.maxDirectMemory to 50GB!
> {code:java}
> org.apache.spark.shuffle.FetchFailedException: failed to allocate 16777216 
> byte(s) of direct memory (used: 21474836480, max: 21474836480)
>   at 
> org.apache.spark.storage.ShuffleBlockFetcherIterator.throwFetchFailedException(ShuffleBlockFetcherIterator.scala:514)
>   at 
> org.apache.spark.storage.ShuffleBlockFetcherIterator.next(ShuffleBlockFetcherIterator.scala:445)
>   at 
> org.apache.spark.storage.ShuffleBlockFetcherIterator.next(ShuffleBlockFetcherIterator.scala:61)
>   at scala.collection.Iterator$$anon$12.nextCur(Iterator.scala:434)
>   at scala.collection.Iterator$$anon$12.hasNext(Iterator.scala:440)
>   at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:408)
>   at 
> org.apache.spark.util.CompletionIterator.hasNext(CompletionIterator.scala:32)
>   at 
> org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:37)
>   at 
> org.apache.spark.util.collection.ExternalSorter.insertAll(ExternalSorter.scala:199)
>   at 
> org.apache.spark.shuffle.BlockStoreShuffleReader.read(BlockStoreShuffleReader.scala:119)
>   at org.apache.spark.rdd.ShuffledRDD.compute(ShuffledRDD.scala:105)
>   at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
>   at org.apache.spark.rdd.RDD.iterator(RDD.scala:287)
>   at 
> org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
>   at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
>   at org.apache.spark.rdd.RDD.iterator(RDD.scala:287)
>   at 
> org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
>   at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
>   at org.apache.spark.rdd.RDD.iterator(RDD.scala:287)
>   at 
> org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
>   at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
>   at org.apache.spark.rdd.RDD.iterator(RDD.scala:287)
>   at 
> org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
>   at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
>   at org.apache.spark.rdd.RDD.iterator(RDD.scala:287)
>   at 
> org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
>   at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
>   at org.apache.spark.rdd.RDD.iterator(RDD.scala:287)
>   at 
> org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
>   at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
>   at org.apache.spark.rdd.RDD.iterator(RDD.scala:287)
>   at 
> org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
>   at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
>   at org.apache.spark.rdd.RDD.iterator(RDD.scala:287)
>   at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87)
>   at org.apache.spark.scheduler.Task.run(Task.scala:108)
>   at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:338)
>   at 
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
>   at 
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
>   at java.lang.Thread.run(Thread.java:748)
> Caused by: io.netty.util.internal.OutOfDirectMemoryError: failed to allocate 
> 16777216 byte(s) of direct memory (used: 21474836480, max: 21474836480)
>   at 
> io.netty.util.internal.PlatformDependent.incrementMemoryCounter(PlatformDependent.java:530)
>  

[jira] [Created] (SPARK-24989) BlockFetcher should retry while getting OutOfDirectMemoryError

2018-08-01 Thread Li Yuanjian (JIRA)
Li Yuanjian created SPARK-24989:
---

 Summary: BlockFetcher should retry while getting 
OutOfDirectMemoryError
 Key: SPARK-24989
 URL: https://issues.apache.org/jira/browse/SPARK-24989
 Project: Spark
  Issue Type: Improvement
  Components: Shuffle
Affects Versions: 2.2.0
Reporter: Li Yuanjian


h3. Description

In our practice, this problem can be reproduced stably by a high-parallelism job 
migrated from MapReduce to Spark; some metrics are listed below:
||Item||Value||
|spark.executor.instances|1000|
|spark.executor.cores|5|
|task number of shuffle writer stage|18038|
|task number of shuffle reader stage|8|

While the shuffle writer stage ends successfully, the shuffle reader stage starts 
and keeps failing with FetchFailed. Each fetch request needs the netty server to 
allocate a 16 MB buffer (detailed stack attached below), and the huge number of 
fetch requests uses up the default maxDirectMemory rapidly, even though we bumped 
io.netty.maxDirectMemory up to 50 GB!
{code:java}
org.apache.spark.shuffle.FetchFailedException: failed to allocate 16777216 
byte(s) of direct memory (used: 21474836480, max: 21474836480)
at 
org.apache.spark.storage.ShuffleBlockFetcherIterator.throwFetchFailedException(ShuffleBlockFetcherIterator.scala:514)
at 
org.apache.spark.storage.ShuffleBlockFetcherIterator.next(ShuffleBlockFetcherIterator.scala:445)
at 
org.apache.spark.storage.ShuffleBlockFetcherIterator.next(ShuffleBlockFetcherIterator.scala:61)
at scala.collection.Iterator$$anon$12.nextCur(Iterator.scala:434)
at scala.collection.Iterator$$anon$12.hasNext(Iterator.scala:440)
at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:408)
at 
org.apache.spark.util.CompletionIterator.hasNext(CompletionIterator.scala:32)
at 
org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:37)
at 
org.apache.spark.util.collection.ExternalSorter.insertAll(ExternalSorter.scala:199)
at 
org.apache.spark.shuffle.BlockStoreShuffleReader.read(BlockStoreShuffleReader.scala:119)
at org.apache.spark.rdd.ShuffledRDD.compute(ShuffledRDD.scala:105)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:287)
at 
org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:287)
at 
org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:287)
at 
org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:287)
at 
org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:287)
at 
org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:287)
at 
org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:287)
at 
org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:287)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87)
at org.apache.spark.scheduler.Task.run(Task.scala:108)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:338)
at 
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at 
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
Caused by: io.netty.util.internal.OutOfDirectMemoryError: failed to allocate 
16777216 byte(s) of direct memory (used: 21474836480, max: 21474836480)
at 
io.netty.util.internal.PlatformDependent.incrementMemoryCounter(PlatformDependent.java:530)
at 
io.netty.util.internal.PlatformDependent.allocateDirectNoCleaner(PlatformDependent.java:484)
at 
io.netty.buffer.PoolArena$DirectArena.allocateDirect(PoolArena.java:711)
at io.netty.buffer.PoolArena$DirectArena.newChunk(PoolArena.java:700)
at io.netty.buffer.PoolArena.allocateNormal(PoolArena.java:237)
at 

[jira] [Commented] (SPARK-24340) Clean up non-shuffle disk block manager files following executor death

2018-07-21 Thread Li Yuanjian (JIRA)


[ 
https://issues.apache.org/jira/browse/SPARK-24340?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16551687#comment-16551687
 ] 

Li Yuanjian commented on SPARK-24340:
-

cc [~jiangxb1987] I think this was resolved by your PR 21390; should we change 
the status?

> Clean up non-shuffle disk block manager files following executor death
> --
>
> Key: SPARK-24340
> URL: https://issues.apache.org/jira/browse/SPARK-24340
> Project: Spark
>  Issue Type: Improvement
>  Components: Spark Core
>Affects Versions: 2.4.0
>Reporter: Jiang Xingbo
>Priority: Major
>
> Currently we only clean up local folders on application removed, and we don't 
> clean up non-shuffle files, such as temp. shuffle blocks, cached 
> RDD/broadcast blocks, spill files, etc. and this can cause disk space leaks 
> when executors periodically die and are replaced.
> To avoid this source of disk space leak, we can clean up executor disk store 
> files except for shuffle index and data files on executor finished.
>  






[jira] [Commented] (SPARK-23128) A new approach to do adaptive execution in Spark SQL

2018-07-21 Thread Li Yuanjian (JIRA)


[ 
https://issues.apache.org/jira/browse/SPARK-23128?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16551638#comment-16551638
 ] 

Li Yuanjian commented on SPARK-23128:
-

[~tgraves] Thanks for your comment. As far as I know, [~carsonwang] is still 
working on this and porting the patch to Spark 2.3. This patch has also been used 
by several teams in their internal production environments; I hope it can be 
reviewed soon.

> A new approach to do adaptive execution in Spark SQL
> 
>
> Key: SPARK-23128
> URL: https://issues.apache.org/jira/browse/SPARK-23128
> Project: Spark
>  Issue Type: Improvement
>  Components: SQL
>Affects Versions: 2.3.0
>Reporter: Carson Wang
>Priority: Major
> Attachments: AdaptiveExecutioninBaidu.pdf
>
>
> SPARK-9850 proposed the basic idea of adaptive execution in Spark. In 
> DAGScheduler, a new API is added to support submitting a single map stage.  
> The current implementation of adaptive execution in Spark SQL supports 
> changing the reducer number at runtime. An Exchange coordinator is used to 
> determine the number of post-shuffle partitions for a stage that needs to 
> fetch shuffle data from one or multiple stages. The current implementation 
> adds ExchangeCoordinator while we are adding Exchanges. However there are 
> some limitations. First, it may cause additional shuffles that may decrease 
> the performance. We can see this from EnsureRequirements rule when it adds 
> ExchangeCoordinator.  Secondly, it is not a good idea to add 
> ExchangeCoordinators while we are adding Exchanges because we don’t have a 
> global picture of all shuffle dependencies of a post-shuffle stage. I.e. for 
> 3 tables’ join in a single stage, the same ExchangeCoordinator should be used 
> in three Exchanges but currently two separated ExchangeCoordinator will be 
> added. Thirdly, with the current framework it is not easy to implement other 
> features in adaptive execution flexibly like changing the execution plan and 
> handling skewed join at runtime.
> We'd like to introduce a new way to do adaptive execution in Spark SQL and 
> address the limitations. The idea is described at 
> [https://docs.google.com/document/d/1mpVjvQZRAkD-Ggy6-hcjXtBPiQoVbZGe3dLnAKgtJ4k/edit?usp=sharing]






[jira] [Commented] (SPARK-24295) Purge Structured streaming FileStreamSinkLog metadata compact file data.

2018-07-18 Thread Li Yuanjian (JIRA)


[ 
https://issues.apache.org/jira/browse/SPARK-24295?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16547873#comment-16547873
 ] 

Li Yuanjian commented on SPARK-24295:
-

Thanks for your detailed explanation. 
You can check SPARK-17604; it seems to cover the same requirement of purging aged 
compact files. The small difference is that we need the purge logic in 
FileStreamSinkLog while that JIRA handles the source-side FileStreamSourceLog, 
but I think the strategy can be reused. Also cc the original author of 
SPARK-17604, [~jerryshao2015]. 


> Purge Structured streaming FileStreamSinkLog metadata compact file data.
> 
>
> Key: SPARK-24295
> URL: https://issues.apache.org/jira/browse/SPARK-24295
> Project: Spark
>  Issue Type: Bug
>  Components: Structured Streaming
>Affects Versions: 2.3.0
>Reporter: Iqbal Singh
>Priority: Major
>
> FileStreamSinkLog metadata logs are concatenated to a single compact file 
> after defined compact interval.
> For long running jobs, compact file size can grow up to 10's of GB's, Causing 
> slowness  while reading the data from FileStreamSinkLog dir as spark is 
> defaulting to the "__spark__metadata" dir for the read.
> We need a functionality to purge the compact file size.
>  






[jira] [Commented] (SPARK-24295) Purge Structured streaming FileStreamSinkLog metadata compact file data.

2018-07-15 Thread Li Yuanjian (JIRA)


[ 
https://issues.apache.org/jira/browse/SPARK-24295?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16544453#comment-16544453
 ] 

Li Yuanjian commented on SPARK-24295:
-

Could you give more detailed information about how the compact file grows up to 
10 GB in your scenario? In the FileStreamSinkLog implementation, the batches 
within compactInterval (default value 10) are merged into a single file whose 
content is serialized SinkFileStatus entries, so it seems unlikely to grow to 
10 GB.
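
For reference, the interval mentioned above is controlled by an internal SQL conf (name as in the 2.x codebase, shown only to make the mechanism concrete):
{code:scala}
// How many batches the file sink log merges into one compact metadata file.
spark.conf.set("spark.sql.streaming.fileSink.log.compactInterval", "10")
{code}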

> Purge Structured streaming FileStreamSinkLog metadata compact file data.
> 
>
> Key: SPARK-24295
> URL: https://issues.apache.org/jira/browse/SPARK-24295
> Project: Spark
>  Issue Type: Bug
>  Components: Structured Streaming
>Affects Versions: 2.3.0
>Reporter: Iqbal Singh
>Priority: Major
>
> FileStreamSinkLog metadata logs are concatenated to a single compact file 
> after defined compact interval.
> For long running jobs, compact file size can grow up to 10's of GB's, Causing 
> slowness  while reading the data from FileStreamSinkLog dir as spark is 
> defaulting to the "__spark__metadata" dir for the read.
> We need a functionality to purge the compact file size.
>  






[jira] [Commented] (SPARK-24755) Executor loss can cause task to not be resubmitted

2018-07-08 Thread Li Yuanjian (JIRA)


[ 
https://issues.apache.org/jira/browse/SPARK-24755?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16536107#comment-16536107
 ] 

Li Yuanjian commented on SPARK-24755:
-

No problem, thanks [~hthuynh2].
Thanks [~mridulm80] for letting me know, I'll also watch this jira.

> Executor loss can cause task to not be resubmitted
> --
>
> Key: SPARK-24755
> URL: https://issues.apache.org/jira/browse/SPARK-24755
> Project: Spark
>  Issue Type: Bug
>  Components: Spark Core
>Affects Versions: 2.3.0
>Reporter: Mridul Muralidharan
>Priority: Major
>
> As part of SPARK-22074, when an executor is lost, TSM.executorLost currently 
> checks for "if (successful(index) && !killedByOtherAttempt(index))" to decide 
> if task needs to be resubmitted for partition.
> Consider following:
> For partition P1, tasks T1 and T2 are running on exec-1 and exec-2 
> respectively (one of them being speculative task)
> T1 finishes successfully first.
> This results in setting "killedByOtherAttempt(P1) = true" due to running T2.
> We also end up killing task T2.
> Now, exec-1 if/when goes MIA.
> executorLost will no longer schedule task for P1 - since 
> killedByOtherAttempt(P1) == true; even though P1 was hosted on T1 and there 
> is no other copy of P1 around (T2 was killed when T1 succeeded).
> I noticed this bug as part of reviewing PR# 21653 for SPARK-13343
> Essentially, SPARK-22074 causes a regression (which I dont usually observe 
> due to shuffle service, sigh) - and as such the fix is broken IMO.
> I dont have a PR handy for this, so if anyone wants to pick it up, please do 
> feel free !
> +CC [~XuanYuan] who fixed SPARK-22074 initially.






[jira] [Created] (SPARK-24665) Add SQLConf in PySpark to manage all sql configs

2018-06-26 Thread Li Yuanjian (JIRA)
Li Yuanjian created SPARK-24665:
---

 Summary: Add SQLConf in PySpark to manage all sql configs
 Key: SPARK-24665
 URL: https://issues.apache.org/jira/browse/SPARK-24665
 Project: Spark
  Issue Type: Improvement
  Components: PySpark
Affects Versions: 2.3.0
Reporter: Li Yuanjian


As new configs are added in PySpark, we currently read them by hard-coding the 
config name and default value. We should move all the configs into a class, like 
what we did with SQLConf in Spark SQL.






[jira] [Commented] (SPARK-24630) SPIP: Support SQLStreaming in Spark

2018-06-25 Thread Li Yuanjian (JIRA)


[ 
https://issues.apache.org/jira/browse/SPARK-24630?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16523064#comment-16523064
 ] 

Li Yuanjian commented on SPARK-24630:
-

cc [~zsxwing] and [~tdas]

We have some practical experience with SQLStreaming as [~Jackey Lee] described in 
the SPIP doc, but there are perhaps two points that need more discussion:
1. The "STREAM" keyword in the SQL grammar
2. Combining with Hive tables is enough for internal usage, but maybe we should 
integrate with Data Source V2 for broader generality

> SPIP: Support SQLStreaming in Spark
> ---
>
> Key: SPARK-24630
> URL: https://issues.apache.org/jira/browse/SPARK-24630
> Project: Spark
>  Issue Type: Improvement
>  Components: SQL
>Affects Versions: 2.2.0, 2.2.1
>Reporter: Jackey Lee
>Priority: Minor
>  Labels: SQLStreaming
> Attachments: SQLStreaming SPIP.pdf
>
>
> At present, KafkaSQL, Flink SQL (which is actually based on Calcite), 
> SQLStream, and StormSQL all provide a stream-type SQL interface, with which users 
> with little knowledge about streaming can easily develop a stream processing 
> model. In Spark, we can also support a SQL API based on Structured Streaming.
> To support SQL Streaming, there are two key points: 
> 1. The analysis phase should be able to parse streaming-type SQL. 
> 2. The Analyzer should be able to map metadata information to the corresponding 
> Relation. 



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-24499) Documentation improvement of Spark core and SQL

2018-06-08 Thread Li Yuanjian (JIRA)


[ 
https://issues.apache.org/jira/browse/SPARK-24499?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16506285#comment-16506285
 ] 

Li Yuanjian commented on SPARK-24499:
-

No problem, thanks for pinging me, our pleasure.
We'll collect and translate some internal user demos, along with some tips gathered 
while migrating MR jobs to Spark.

> Documentation improvement of Spark core and SQL
> ---
>
> Key: SPARK-24499
> URL: https://issues.apache.org/jira/browse/SPARK-24499
> Project: Spark
>  Issue Type: New Feature
>  Components: Documentation, Spark Core, SQL
>Affects Versions: 2.3.0
>Reporter: Xiao Li
>Priority: Major
>
> The current documentation in Apache Spark lacks enough code examples and 
> tips. If needed, we should also split the page of 
> https://spark.apache.org/docs/latest/sql-programming-guide.html to multiple 
> separate pages like what we did for 
> https://spark.apache.org/docs/latest/ml-guide.html



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-24183) add unit tests for ContinuousDataReader hook

2018-06-07 Thread Li Yuanjian (JIRA)


[ 
https://issues.apache.org/jira/browse/SPARK-24183?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16504614#comment-16504614
 ] 

Li Yuanjian commented on SPARK-24183:
-

Hi [~joseph.torres], I notice we already have 
`ContinuousQueuedDataReaderSuite` after SPARK-20439; is this class enough for 
"add unit tests for ContinuousDataReader hook"? 

> add unit tests for ContinuousDataReader hook
> 
>
> Key: SPARK-24183
> URL: https://issues.apache.org/jira/browse/SPARK-24183
> Project: Spark
>  Issue Type: Sub-task
>  Components: Structured Streaming
>Affects Versions: 2.4.0
>Reporter: Jose Torres
>Priority: Major
>
> Currently this is the class named ContinuousQueuedDataReader, but I don't 
> know if this will change as we deal with stateful operators.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-24375) Design sketch: support barrier scheduling in Apache Spark

2018-06-06 Thread Li Yuanjian (JIRA)


[ 
https://issues.apache.org/jira/browse/SPARK-24375?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16504171#comment-16504171
 ] 

Li Yuanjian commented on SPARK-24375:
-

Got it, many thanks for your detailed explanation.

> Design sketch: support barrier scheduling in Apache Spark
> -
>
> Key: SPARK-24375
> URL: https://issues.apache.org/jira/browse/SPARK-24375
> Project: Spark
>  Issue Type: Story
>  Components: Spark Core
>Affects Versions: 3.0.0
>Reporter: Xiangrui Meng
>Assignee: Jiang Xingbo
>Priority: Major
>
> This task is to outline a design sketch for the barrier scheduling SPIP 
> discussion. It doesn't need to be a complete design before the vote. But it 
> should at least cover both Scala/Java and PySpark.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Comment Edited] (SPARK-24375) Design sketch: support barrier scheduling in Apache Spark

2018-06-06 Thread Li Yuanjian (JIRA)


[ 
https://issues.apache.org/jira/browse/SPARK-24375?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16503486#comment-16503486
 ] 

Li Yuanjian edited comment on SPARK-24375 at 6/6/18 3:55 PM:
-

Hi [~cloud_fan] and [~jiangxb1987], just a tiny question here: I notice the 
discussion in SPARK-20928 mentioned the barrier scheduling which Continuous 
Processing will depend on.
{quote}
A barrier stage doesn’t launch any of its tasks until the available slots (free 
CPU cores can be used to launch pending tasks) satisfies the target to launch 
all the tasks at the same time, and always retry the whole stage when any 
task(s) fail.
{quote}
Why is task-level retry forbidden here, and is there any possibility of 
achieving it? Thanks.
 

 


was (Author: xuanyuan):
Hi [~cloud_fan] and [~jiangxb1987], just a tiny question here: I notice the 
discussion in SPARK-20928 mentioned the barrier scheduling.
{quote}
A barrier stage doesn’t launch any of its tasks until the available slots (free 
CPU cores can be used to launch pending tasks) satisfies the target to launch 
all the tasks at the same time, and always retry the whole stage when any 
task(s) fail.
{quote}
Why is task-level retry forbidden here, and is there any possibility of 
achieving it? Thanks.
 

 

> Design sketch: support barrier scheduling in Apache Spark
> -
>
> Key: SPARK-24375
> URL: https://issues.apache.org/jira/browse/SPARK-24375
> Project: Spark
>  Issue Type: Story
>  Components: Spark Core
>Affects Versions: 3.0.0
>Reporter: Xiangrui Meng
>Assignee: Jiang Xingbo
>Priority: Major
>
> This task is to outline a design sketch for the barrier scheduling SPIP 
> discussion. It doesn't need to be a complete design before the vote. But it 
> should at least cover both Scala/Java and PySpark.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-24375) Design sketch: support barrier scheduling in Apache Spark

2018-06-06 Thread Li Yuanjian (JIRA)


[ 
https://issues.apache.org/jira/browse/SPARK-24375?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16503486#comment-16503486
 ] 

Li Yuanjian commented on SPARK-24375:
-

Hi [~cloud_fan] and [~jiangxb1987], just a tiny question here: I notice the 
discussion in SPARK-20928 mentioned the barrier scheduling.
{quote}
A barrier stage doesn’t launch any of its tasks until the available slots (free 
CPU cores can be used to launch pending tasks) satisfies the target to launch 
all the tasks at the same time, and always retry the whole stage when any 
task(s) fail.
{quote}
Why is task-level retry forbidden here, and is there any possibility of 
achieving it? Thanks.
 

 

> Design sketch: support barrier scheduling in Apache Spark
> -
>
> Key: SPARK-24375
> URL: https://issues.apache.org/jira/browse/SPARK-24375
> Project: Spark
>  Issue Type: Story
>  Components: Spark Core
>Affects Versions: 3.0.0
>Reporter: Xiangrui Meng
>Assignee: Jiang Xingbo
>Priority: Major
>
> This task is to outline a design sketch for the barrier scheduling SPIP 
> discussion. It doesn't need to be a complete design before the vote. But it 
> should at least cover both Scala/Java and PySpark.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Comment Edited] (SPARK-24210) incorrect handling of boolean expressions when using column in expressions in pyspark.sql.DataFrame filter function

2018-06-04 Thread Li Yuanjian (JIRA)


[ 
https://issues.apache.org/jira/browse/SPARK-24210?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16500383#comment-16500383
 ] 

Li Yuanjian edited comment on SPARK-24210 at 6/4/18 3:34 PM:
-

I think it may not be a bug.
{code:python}
#KO: returns r1 and r3
ex.filter(('c1 = 1') and ('c2 = 1')).show()
{code}
This is caused by Python's own `and` semantics on plain strings: the expression 
evaluates to 'c2 = 1', so that is all that gets passed to df.filter.
{code:python}
#KO: returns r0 and r3
ex.filter('c1 = 1 & c2 = 1').show()
#KO: returns r0 and r3
ex.filter('c1 == 1 & c2 == 1').show()
{code}
As you mentioned, [https://github.com/apache/spark/pull/6961] actually fixed 
'&' between Columns, but not string expressions like 'c1 = 1 & c2 = 1'. Here, in 
ex.filter('c1 = 1 & c2 = 1'), Spark parses it into a valueExpression like 'Filter 
(('a = (1 & 'b)) = 1), which I think makes sense. 


was (Author: xuanyuan):
I think it may not be a bug.
#KO: returns r1 and r3
ex.filter(('c1 = 1') and ('c2 = 1')).show()
This is caused by Python's own `and` semantics on plain strings: the expression 
evaluates to 'c2 = 1', so that is all that gets passed to df.filter.
#KO: returns r0 and r3
ex.filter('c1 = 1 & c2 = 1').show()
#KO: returns r0 and r3
ex.filter('c1 == 1 & c2 == 1').show()
As you mentioned, [https://github.com/apache/spark/pull/6961] actually fixed 
'&' between Columns, but not string expressions like 'c1 = 1 & c2 = 1'. Here, in 
ex.filter('c1 = 1 & c2 = 1'), Spark parses it into a valueExpression like 'Filter 
(('a = (1 & 'b)) = 1), which I think makes sense. 

> incorrect handling of boolean expressions when using column in expressions in 
> pyspark.sql.DataFrame filter function
> ---
>
> Key: SPARK-24210
> URL: https://issues.apache.org/jira/browse/SPARK-24210
> Project: Spark
>  Issue Type: Bug
>  Components: PySpark
>Affects Versions: 2.1.2
>Reporter: Michael H
>Priority: Major
>
> {code:python}
> ex = spark.createDataFrame([
> ('r0', 0, 0),
> ('r1', 0, 1),
> ('r2', 1, 0),
> ('r3', 1, 1)]\
>   , "row: string, c1: int, c2: int")
> #KO: returns r1 and r3
> ex.filter(('c1 = 1') and ('c2 = 1')).show()
> #OK, raises an exception
> ex.filter(('c1 == 1') & ('c2 == 1')).show()
> #KO: returns r0 and r3
> ex.filter('c1 = 1 & c2 = 1').show()
> #KO: returns r0 and r3
> ex.filter('c1 == 1 & c2 == 1').show()
> #OK: returns r3 only
> ex.filter('c1 = 1 and c2 = 1').show()
> #OK: returns r3 only
> ex.filter('c1 == 1 and c2 == 1').show()
> {code}
> When building the expressions using {code}ex.c1{code} or {code}ex['c1']{code} we 
> don't have this issue.
> Issue seems related with
> https://github.com/apache/spark/pull/6961



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-24210) incorrect handling of boolean expressions when using column in expressions in pyspark.sql.DataFrame filter function

2018-06-04 Thread Li Yuanjian (JIRA)


[ 
https://issues.apache.org/jira/browse/SPARK-24210?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16500383#comment-16500383
 ] 

Li Yuanjian commented on SPARK-24210:
-

I think it may not be a bug.
#KO: returns r1 and r3
ex.filter(('c1 = 1') and ('c2 = 1')).show()
This is caused by Python's own `and` semantics on plain strings: the expression 
evaluates to 'c2 = 1', so that is all that gets passed to df.filter.
#KO: returns r0 and r3
ex.filter('c1 = 1 & c2 = 1').show()
#KO: returns r0 and r3
ex.filter('c1 == 1 & c2 == 1').show()
As you mentioned, [https://github.com/apache/spark/pull/6961] actually fixed 
'&' between Columns, but not string expressions like 'c1 = 1 & c2 = 1'. Here, in 
ex.filter('c1 = 1 & c2 = 1'), Spark parses it into a valueExpression like 'Filter 
(('a = (1 & 'b)) = 1), which I think makes sense. 
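
For completeness, a minimal sketch of the Column-based form that avoids both pitfalls 
above; it assumes an active SparkSession named `spark` and reuses the sample data from 
the issue description.
{code:python}
ex = spark.createDataFrame(
    [('r0', 0, 0), ('r1', 0, 1), ('r2', 1, 0), ('r3', 1, 1)],
    "row: string, c1: int, c2: int")

# Column objects overload '&' correctly, so this returns r3 only.
ex.filter((ex.c1 == 1) & (ex.c2 == 1)).show()

# In the string form, spell the conjunction with SQL's 'and'; this also returns r3 only.
ex.filter('c1 = 1 and c2 = 1').show()
{code}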

> incorrect handling of boolean expressions when using column in expressions in 
> pyspark.sql.DataFrame filter function
> ---
>
> Key: SPARK-24210
> URL: https://issues.apache.org/jira/browse/SPARK-24210
> Project: Spark
>  Issue Type: Bug
>  Components: PySpark
>Affects Versions: 2.1.2
>Reporter: Michael H
>Priority: Major
>
> {code:python}
> ex = spark.createDataFrame([
> ('r0', 0, 0),
> ('r1', 0, 1),
> ('r2', 1, 0),
> ('r3', 1, 1)]\
>   , "row: string, c1: int, c2: int")
> #KO: returns r1 and r3
> ex.filter(('c1 = 1') and ('c2 = 1')).show()
> #OK, raises an exception
> ex.filter(('c1 == 1') & ('c2 == 1')).show()
> #KO: returns r0 and r3
> ex.filter('c1 = 1 & c2 = 1').show()
> #KO: returns r0 and r3
> ex.filter('c1 == 1 & c2 == 1').show()
> #OK: returns r3 only
> ex.filter('c1 = 1 and c2 = 1').show()
> #OK: returns r3 only
> ex.filter('c1 == 1 and c2 == 1').show()
> {code}
> When building the expressions using {code}ex.c1{code} or {code}ex['c1']{code} we 
> don't have this issue.
> Issue seems related with
> https://github.com/apache/spark/pull/6961



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Created] (SPARK-24304) Scheduler changes for continuous processing shuffle support

2018-05-17 Thread Li Yuanjian (JIRA)
Li Yuanjian created SPARK-24304:
---

 Summary: Scheduler changes for continuous processing shuffle 
support
 Key: SPARK-24304
 URL: https://issues.apache.org/jira/browse/SPARK-24304
 Project: Spark
  Issue Type: Sub-task
  Components: Structured Streaming
Affects Versions: 2.4.0
Reporter: Li Yuanjian


Both docs including the change requirements of scheduler, add a new jira and 
collect our comments here.  
https://docs.google.com/document/d/14cGJ75v9myznywtB35ytEqL9wHy9xfZRv06B6g2tUgI/edit#bookmark=id.tjcvevpyuv4x
https://docs.google.com/document/d/1IL4kJoKrZWeyIhklKUJqsW-yEN7V7aL05MmM65AYOfE/edit?disco=B4z058g



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-24293) Serialized shuffle supports mapSideCombine

2018-05-16 Thread Li Yuanjian (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-24293?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16478406#comment-16478406
 ] 

Li Yuanjian commented on SPARK-24293:
-

{quote}
doing the map side combine manually with SQL operators, to get SQL 
optimizations like whole-stage-codegen.
{quote}
Thanks [~cloud_fan] for the detailed explanation; this resolved our confusion about 
why map-side combine is not supported in SerializedShuffle.


> Serialized shuffle supports mapSideCombine
> --
>
> Key: SPARK-24293
> URL: https://issues.apache.org/jira/browse/SPARK-24293
> Project: Spark
>  Issue Type: Brainstorming
>  Components: Shuffle
>Affects Versions: 2.3.0
>Reporter: Xianjin YE
>Priority: Major
>
> While doing research on integrating my company's internal Shuffle Service 
> with Spark, I found it is possible to support mapSideCombine with serialized 
> shuffle.
> The simple idea is that the `UnsafeShuffleWriter` uses a `Combiner` to 
> accumulate records when mapSideCombine is required before inserting into 
> `ShuffleExternalSorter`. The `Combiner` tracks its memory usage or the number of 
> elements accumulated and is never spilled. When the `Combiner` accumulates 
> enough records (varied by different strategies), the accumulated (K, C) pairs 
> are then inserted into the `ShuffleExternalSorter`. After that, the 
> `Combiner` is reset to an empty state.
> After this change, combined values are sent to the sorter segment by segment, and 
> the `BlockStoreShuffleReader` already handles this case.
> I did a local POC, and it looks like I can get the same result as normal 
> SortShuffle. The performance is not optimized yet. The most significant part 
> of the code is shown below: 
> {code:java}
> // code placeholder
> while (records.hasNext()) {
>   Product2<K, V> record = records.next();
>   if (this.mapSideCombine) {
>     this.aggregator.accumulateRecord(record);
>     if (this.aggregator.accumulatedKeyNum() >= 160_000) { // for poc
>       scala.collection.Iterator<Product2<K, C>> combinedIterator =
>           this.aggregator.accumulatedIterator();
>       while (combinedIterator.hasNext()) {
>         insertRecordIntoSorter(combinedIterator.next());
>       }
>       this.aggregator.resetAccumulation();
>     }
>   } else {
>     insertRecordIntoSorter(record);
>   }
> }
> if (this.mapSideCombine && this.aggregator.accumulatedKeyNum() > 0) {
>   scala.collection.Iterator<Product2<K, C>> combinedIterator =
>       this.aggregator.accumulatedIterator();
>   while (combinedIterator.hasNext()) {
>     insertRecordIntoSorter(combinedIterator.next());
>   }
>   this.aggregator.resetAccumulation(1);
> }
> {code}
>  
>  Is there something I am missing? cc [~joshrosen] [~cloud_fan] [~XuanYuan]



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-23128) A new approach to do adaptive execution in Spark SQL

2018-05-11 Thread Li Yuanjian (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-23128?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16471812#comment-16471812
 ] 

Li Yuanjian commented on SPARK-23128:
-

I collected some user cases and the performance improvements observed during Baidu's 
internal usage of this patch, summarized in the following 3 scenarios:
1. SortMergeJoin to BroadcastJoin
Transforming SortMergeJoin into BroadcastJoin deep in the plan tree can bring us a 
{color:red}50% to 200%{color} boost in query performance, and this strategy is 
frequently hit in BI scenarios, such as joining several tables with filters in a 
subquery.
2. Long-running applications, or Spark used as a service
Here, a long-running application is one whose duration approaches 1 hour. Using Spark 
as a service means keeping spark-shell open and continuously submitting SQL, or using 
services on top of Spark such as Zeppelin, Livy, or our internal SQL service Baidu 
BigSQL. In such scenarios all Spark jobs share the same partition number, so enabling 
AE and adding configs about the expected task info (data size, row count, min/max 
partition number, etc.) brings us a {color:red}50%-100%{color} improvement in 
performance.
3. GraphFrame jobs
The last scenario is an application using GraphFrame: the user has a 2-dimensional 
graph with 1 billion edges and runs the connected components algorithm in GraphFrame. 
With AE enabled, the application duration drops from 58 min to 32 min, almost a 
{color:red}100%{color} improvement in performance.

The detailed screenshots and configs are in the attached PDF. 
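
As a point of reference, a minimal sketch of turning on the adaptive-execution knobs 
that already exist in stock Spark 2.3 (it assumes an active SparkSession named 
`spark`); the patch-specific settings mentioned above, such as expected row count and 
min/max partition number, are not part of upstream Spark and are omitted here.
{code:python}
# Enable adaptive execution and set the target post-shuffle partition size to 64 MB.
spark.conf.set("spark.sql.adaptive.enabled", "true")
spark.conf.set("spark.sql.adaptive.shuffle.targetPostShuffleInputSize",
               str(64 * 1024 * 1024))
{code}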

> A new approach to do adaptive execution in Spark SQL
> 
>
> Key: SPARK-23128
> URL: https://issues.apache.org/jira/browse/SPARK-23128
> Project: Spark
>  Issue Type: Improvement
>  Components: SQL
>Affects Versions: 2.3.0
>Reporter: Carson Wang
>Priority: Major
> Attachments: AdaptiveExecutioninBaidu.pdf
>
>
> SPARK-9850 proposed the basic idea of adaptive execution in Spark. In 
> DAGScheduler, a new API is added to support submitting a single map stage.  
> The current implementation of adaptive execution in Spark SQL supports 
> changing the reducer number at runtime. An Exchange coordinator is used to 
> determine the number of post-shuffle partitions for a stage that needs to 
> fetch shuffle data from one or multiple stages. The current implementation 
> adds ExchangeCoordinator while we are adding Exchanges. However there are 
> some limitations. First, it may cause additional shuffles that may decrease 
> the performance. We can see this from EnsureRequirements rule when it adds 
> ExchangeCoordinator.  Secondly, it is not a good idea to add 
> ExchangeCoordinators while we are adding Exchanges because we don’t have a 
> global picture of all shuffle dependencies of a post-shuffle stage. I.e. for 
> 3 tables’ join in a single stage, the same ExchangeCoordinator should be used 
> in three Exchanges but currently two separated ExchangeCoordinator will be 
> added. Thirdly, with the current framework it is not easy to implement other 
> features in adaptive execution flexibly like changing the execution plan and 
> handling skewed join at runtime.
> We'd like to introduce a new way to do adaptive execution in Spark SQL and 
> address the limitations. The idea is described at 
> [https://docs.google.com/document/d/1mpVjvQZRAkD-Ggy6-hcjXtBPiQoVbZGe3dLnAKgtJ4k/edit?usp=sharing]



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Updated] (SPARK-23128) A new approach to do adaptive execution in Spark SQL

2018-05-11 Thread Li Yuanjian (JIRA)

 [ 
https://issues.apache.org/jira/browse/SPARK-23128?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Li Yuanjian updated SPARK-23128:

Attachment: AdaptiveExecutioninBaidu.pdf

> A new approach to do adaptive execution in Spark SQL
> 
>
> Key: SPARK-23128
> URL: https://issues.apache.org/jira/browse/SPARK-23128
> Project: Spark
>  Issue Type: Improvement
>  Components: SQL
>Affects Versions: 2.3.0
>Reporter: Carson Wang
>Priority: Major
> Attachments: AdaptiveExecutioninBaidu.pdf
>
>
> SPARK-9850 proposed the basic idea of adaptive execution in Spark. In 
> DAGScheduler, a new API is added to support submitting a single map stage.  
> The current implementation of adaptive execution in Spark SQL supports 
> changing the reducer number at runtime. An Exchange coordinator is used to 
> determine the number of post-shuffle partitions for a stage that needs to 
> fetch shuffle data from one or multiple stages. The current implementation 
> adds ExchangeCoordinator while we are adding Exchanges. However there are 
> some limitations. First, it may cause additional shuffles that may decrease 
> the performance. We can see this from EnsureRequirements rule when it adds 
> ExchangeCoordinator.  Secondly, it is not a good idea to add 
> ExchangeCoordinators while we are adding Exchanges because we don’t have a 
> global picture of all shuffle dependencies of a post-shuffle stage. I.e. for 
> 3 tables’ join in a single stage, the same ExchangeCoordinator should be used 
> in three Exchanges but currently two separated ExchangeCoordinator will be 
> added. Thirdly, with the current framework it is not easy to implement other 
> features in adaptive execution flexibly like changing the execution plan and 
> handling skewed join at runtime.
> We'd like to introduce a new way to do adaptive execution in Spark SQL and 
> address the limitations. The idea is described at 
> [https://docs.google.com/document/d/1mpVjvQZRAkD-Ggy6-hcjXtBPiQoVbZGe3dLnAKgtJ4k/edit?usp=sharing]



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Issue Comment Deleted] (SPARK-23128) A new approach to do adaptive execution in Spark SQL

2018-05-11 Thread Li Yuanjian (JIRA)

 [ 
https://issues.apache.org/jira/browse/SPARK-23128?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Li Yuanjian updated SPARK-23128:

Comment: was deleted

(was: I collected some user cases and the performance improvements observed during 
Baidu's internal usage of this patch, summarized in the following 3 scenarios:
1. SortMergeJoin to BroadcastJoin
Transforming SortMergeJoin into BroadcastJoin deep in the plan tree can bring us a 
{color:red}50% to 200%{color} boost in query performance, and this strategy is 
frequently hit in BI scenarios, such as joining several tables with filters in a 
subquery.
2. Long-running applications, or Spark used as a service
Here, a long-running application is one whose duration approaches 1 hour. Using Spark 
as a service means keeping spark-shell open and continuously submitting SQL, or using 
services on top of Spark such as Zeppelin, Livy, or our internal SQL service Baidu 
BigSQL. In such scenarios all Spark jobs share the same partition number, so enabling 
AE and adding configs about the expected task info (data size, row count, min/max 
partition number, etc.) brings us a {color:red}50%-100%{color} improvement in 
performance.
3. GraphFrame jobs
The last scenario is an application using GraphFrame: the user has a 2-dimensional 
graph with 1 billion edges and runs the connected components algorithm in GraphFrame. 
With AE enabled, the application duration drops from 58 min to 32 min, almost a 
{color:red}100%{color} improvement in performance.

The detailed screenshots and configs are in the attached PDF. )

> A new approach to do adaptive execution in Spark SQL
> 
>
> Key: SPARK-23128
> URL: https://issues.apache.org/jira/browse/SPARK-23128
> Project: Spark
>  Issue Type: Improvement
>  Components: SQL
>Affects Versions: 2.3.0
>Reporter: Carson Wang
>Priority: Major
> Attachments: AdaptiveExecutioninBaidu.pdf
>
>
> SPARK-9850 proposed the basic idea of adaptive execution in Spark. In 
> DAGScheduler, a new API is added to support submitting a single map stage.  
> The current implementation of adaptive execution in Spark SQL supports 
> changing the reducer number at runtime. An Exchange coordinator is used to 
> determine the number of post-shuffle partitions for a stage that needs to 
> fetch shuffle data from one or multiple stages. The current implementation 
> adds ExchangeCoordinator while we are adding Exchanges. However there are 
> some limitations. First, it may cause additional shuffles that may decrease 
> the performance. We can see this from EnsureRequirements rule when it adds 
> ExchangeCoordinator.  Secondly, it is not a good idea to add 
> ExchangeCoordinators while we are adding Exchanges because we don’t have a 
> global picture of all shuffle dependencies of a post-shuffle stage. I.e. for 
> 3 tables’ join in a single stage, the same ExchangeCoordinator should be used 
> in three Exchanges but currently two separated ExchangeCoordinator will be 
> added. Thirdly, with the current framework it is not easy to implement other 
> features in adaptive execution flexibly like changing the execution plan and 
> handling skewed join at runtime.
> We'd like to introduce a new way to do adaptive execution in Spark SQL and 
> address the limitations. The idea is described at 
> [https://docs.google.com/document/d/1mpVjvQZRAkD-Ggy6-hcjXtBPiQoVbZGe3dLnAKgtJ4k/edit?usp=sharing]



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-23128) A new approach to do adaptive execution in Spark SQL

2018-05-11 Thread Li Yuanjian (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-23128?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16471811#comment-16471811
 ] 

Li Yuanjian commented on SPARK-23128:
-

I collected some user cases and the performance improvements observed during Baidu's 
internal usage of this patch, summarized in the following 3 scenarios:
1. SortMergeJoin to BroadcastJoin
Transforming SortMergeJoin into BroadcastJoin deep in the plan tree can bring us a 
{color:red}50% to 200%{color} boost in query performance, and this strategy is 
frequently hit in BI scenarios, such as joining several tables with filters in a 
subquery.
2. Long-running applications, or Spark used as a service
Here, a long-running application is one whose duration approaches 1 hour. Using Spark 
as a service means keeping spark-shell open and continuously submitting SQL, or using 
services on top of Spark such as Zeppelin, Livy, or our internal SQL service Baidu 
BigSQL. In such scenarios all Spark jobs share the same partition number, so enabling 
AE and adding configs about the expected task info (data size, row count, min/max 
partition number, etc.) brings us a {color:red}50%-100%{color} improvement in 
performance.
3. GraphFrame jobs
The last scenario is an application using GraphFrame: the user has a 2-dimensional 
graph with 1 billion edges and runs the connected components algorithm in GraphFrame. 
With AE enabled, the application duration drops from 58 min to 32 min, almost a 
{color:red}100%{color} improvement in performance.

The detailed screenshots and configs are in the attached PDF. 

> A new approach to do adaptive execution in Spark SQL
> 
>
> Key: SPARK-23128
> URL: https://issues.apache.org/jira/browse/SPARK-23128
> Project: Spark
>  Issue Type: Improvement
>  Components: SQL
>Affects Versions: 2.3.0
>Reporter: Carson Wang
>Priority: Major
> Attachments: AdaptiveExecutioninBaidu.pdf
>
>
> SPARK-9850 proposed the basic idea of adaptive execution in Spark. In 
> DAGScheduler, a new API is added to support submitting a single map stage.  
> The current implementation of adaptive execution in Spark SQL supports 
> changing the reducer number at runtime. An Exchange coordinator is used to 
> determine the number of post-shuffle partitions for a stage that needs to 
> fetch shuffle data from one or multiple stages. The current implementation 
> adds ExchangeCoordinator while we are adding Exchanges. However there are 
> some limitations. First, it may cause additional shuffles that may decrease 
> the performance. We can see this from EnsureRequirements rule when it adds 
> ExchangeCoordinator.  Secondly, it is not a good idea to add 
> ExchangeCoordinators while we are adding Exchanges because we don’t have a 
> global picture of all shuffle dependencies of a post-shuffle stage. I.e. for 
> 3 tables’ join in a single stage, the same ExchangeCoordinator should be used 
> in three Exchanges but currently two separated ExchangeCoordinator will be 
> added. Thirdly, with the current framework it is not easy to implement other 
> features in adaptive execution flexibly like changing the execution plan and 
> handling skewed join at runtime.
> We'd like to introduce a new way to do adaptive execution in Spark SQL and 
> address the limitations. The idea is described at 
> [https://docs.google.com/document/d/1mpVjvQZRAkD-Ggy6-hcjXtBPiQoVbZGe3dLnAKgtJ4k/edit?usp=sharing]



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Updated] (SPARK-24235) create the top-of-task RDD sending rows to the remote buffer

2018-05-10 Thread Li Yuanjian (JIRA)

 [ 
https://issues.apache.org/jira/browse/SPARK-24235?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Li Yuanjian updated SPARK-24235:

Description: 
[https://docs.google.com/document/d/1IL4kJoKrZWeyIhklKUJqsW-yEN7V7aL05MmM65AYOfE/edit#heading=h.8t3ci57f7uii]

 

Note that after 
[https://github.com/apache/spark/pull/21239|https://github.com/apache/spark/pull/21239],this
 will need to be responsible for incrementing its task's EpochTracker.

  was:
[https://docs.google.com/document/d/1IL4kJoKrZWeyIhklKUJqsW-yEN7V7aL05MmM65AYOfE/edit#heading=h.8t3ci57f7uii]

 

Note that after 
[https://github.com/apache/spark/pull/21239|https://github.com/apache/spark/pull/21239,]
 ,this will need to be responsible for incrementing its task's EpochTracker.


> create the top-of-task RDD sending rows to the remote buffer
> 
>
> Key: SPARK-24235
> URL: https://issues.apache.org/jira/browse/SPARK-24235
> Project: Spark
>  Issue Type: Sub-task
>  Components: Structured Streaming
>Affects Versions: 2.4.0
>Reporter: Jose Torres
>Priority: Major
>
> [https://docs.google.com/document/d/1IL4kJoKrZWeyIhklKUJqsW-yEN7V7aL05MmM65AYOfE/edit#heading=h.8t3ci57f7uii]
>  
> Note that after 
> [https://github.com/apache/spark/pull/21239|https://github.com/apache/spark/pull/21239],this
>  will need to be responsible for incrementing its task's EpochTracker.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Updated] (SPARK-24235) create the top-of-task RDD sending rows to the remote buffer

2018-05-10 Thread Li Yuanjian (JIRA)

 [ 
https://issues.apache.org/jira/browse/SPARK-24235?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Li Yuanjian updated SPARK-24235:

Description: 
[https://docs.google.com/document/d/1IL4kJoKrZWeyIhklKUJqsW-yEN7V7aL05MmM65AYOfE/edit#heading=h.8t3ci57f7uii]

 

Note that after 
[https://github.com/apache/spark/pull/21239|https://github.com/apache/spark/pull/21239,],
 this will need to be responsible for incrementing its task's EpochTracker.

  was:
[https://docs.google.com/document/d/1IL4kJoKrZWeyIhklKUJqsW-yEN7V7aL05MmM65AYOfE/edit#heading=h.8t3ci57f7uii]

 

Note that after [https://github.com/apache/spark/pull/21239,] this will need to 
be responsible for incrementing its task's EpochTracker.


> create the top-of-task RDD sending rows to the remote buffer
> 
>
> Key: SPARK-24235
> URL: https://issues.apache.org/jira/browse/SPARK-24235
> Project: Spark
>  Issue Type: Sub-task
>  Components: Structured Streaming
>Affects Versions: 2.4.0
>Reporter: Jose Torres
>Priority: Major
>
> [https://docs.google.com/document/d/1IL4kJoKrZWeyIhklKUJqsW-yEN7V7aL05MmM65AYOfE/edit#heading=h.8t3ci57f7uii]
>  
> Note that after 
> [https://github.com/apache/spark/pull/21239|https://github.com/apache/spark/pull/21239,],
>  this will need to be responsible for incrementing its task's EpochTracker.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Updated] (SPARK-24235) create the top-of-task RDD sending rows to the remote buffer

2018-05-10 Thread Li Yuanjian (JIRA)

 [ 
https://issues.apache.org/jira/browse/SPARK-24235?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Li Yuanjian updated SPARK-24235:

Description: 
[https://docs.google.com/document/d/1IL4kJoKrZWeyIhklKUJqsW-yEN7V7aL05MmM65AYOfE/edit#heading=h.8t3ci57f7uii]

 

Note that after 
[https://github.com/apache/spark/pull/21239|https://github.com/apache/spark/pull/21239,]
 ,this will need to be responsible for incrementing its task's EpochTracker.

  was:
[https://docs.google.com/document/d/1IL4kJoKrZWeyIhklKUJqsW-yEN7V7aL05MmM65AYOfE/edit#heading=h.8t3ci57f7uii]

 

Note that after 
[https://github.com/apache/spark/pull/21239|https://github.com/apache/spark/pull/21239,],
 this will need to be responsible for incrementing its task's EpochTracker.


> create the top-of-task RDD sending rows to the remote buffer
> 
>
> Key: SPARK-24235
> URL: https://issues.apache.org/jira/browse/SPARK-24235
> Project: Spark
>  Issue Type: Sub-task
>  Components: Structured Streaming
>Affects Versions: 2.4.0
>Reporter: Jose Torres
>Priority: Major
>
> [https://docs.google.com/document/d/1IL4kJoKrZWeyIhklKUJqsW-yEN7V7aL05MmM65AYOfE/edit#heading=h.8t3ci57f7uii]
>  
> Note that after 
> [https://github.com/apache/spark/pull/21239|https://github.com/apache/spark/pull/21239,]
>  ,this will need to be responsible for incrementing its task's EpochTracker.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-24036) Stateful operators in continuous processing

2018-05-10 Thread Li Yuanjian (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-24036?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16470067#comment-16470067
 ] 

Li Yuanjian commented on SPARK-24036:
-

I agree with the division of the kinds of tasks, that's quite clear, but maybe all of 
this can be made maximally transparent to the scheduler by reusing the ResultTask and 
ShuffleMapTask design: could the DAGScheduler use a ContinuousShuffleMapTask to 
replace the original ShuffleMapTask?
{quote}Changing DAGScheduler to accommodate continuous processing would create 
significant additional complexity I don't think we can really justify.
{quote}
So here, in my opinion, it may not be as complex as we think? If I'm wrong please 
let me know. :)
{quote}Whether we need to write an explicit shuffle RDD class or not would I 
think come down to an implementation detail of SPARK-24236. It depends on 
what's the cleanest way to unfold the SparkPlan tree.
{quote}
 Yep, couldn't agree more. I'll organize this part of our internal code and open a 
preview PR. We'd very much appreciate any of your opinions!

> Stateful operators in continuous processing
> ---
>
> Key: SPARK-24036
> URL: https://issues.apache.org/jira/browse/SPARK-24036
> Project: Spark
>  Issue Type: Improvement
>  Components: Structured Streaming
>Affects Versions: 2.4.0
>Reporter: Jose Torres
>Priority: Major
>
> The first iteration of continuous processing in Spark 2.3 does not work with 
> stateful operators.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Comment Edited] (SPARK-24036) Stateful operators in continuous processing

2018-05-09 Thread Li Yuanjian (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-24036?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16469830#comment-16469830
 ] 

Li Yuanjian edited comment on SPARK-24036 at 5/10/18 2:32 AM:
--

Hi [~joseph.torres]

Thanks for cc'ing me, it looks great! 

My doc may already cover sub-tasks SPARK-24237 and SPARK-24236; could you have a 
look at the design: [design 
link|https://docs.google.com/document/d/14cGJ75v9myznywtB35ytEqL9wHy9xfZRv06B6g2tUgI/edit#bookmark=id.2lfv2glj7ny0]?
 I'll take these two JIRAs and discuss the details with you.

Also, in our practice, a new kind of continuous shuffle map task (I mentioned 
this in your doc comments: [comment 
link|https://docs.google.com/document/d/1IL4kJoKrZWeyIhklKUJqsW-yEN7V7aL05MmM65AYOfE/edit?disco=B4X1H_E])
 and a shuffle RDD should be added; do you agree with adding another two JIRAs for 
these?

 


was (Author: xuanyuan):
Hi [~joseph.torres]

Thanks for cc'ing me, it looks great! 

My doc may already cover sub-tasks SPARK-24237 and SPARK-23236; could you have a 
look at the design: [design 
link|https://docs.google.com/document/d/14cGJ75v9myznywtB35ytEqL9wHy9xfZRv06B6g2tUgI/edit#bookmark=id.2lfv2glj7ny0]?
 I'll take these two JIRAs and discuss the details with you.

Also, in our practice, a new kind of continuous shuffle map task (I mentioned 
this in your doc comments: [comment 
link|https://docs.google.com/document/d/1IL4kJoKrZWeyIhklKUJqsW-yEN7V7aL05MmM65AYOfE/edit?disco=B4X1H_E])
 and a shuffle RDD should be added; do you agree with adding another two JIRAs for 
these?

 

> Stateful operators in continuous processing
> ---
>
> Key: SPARK-24036
> URL: https://issues.apache.org/jira/browse/SPARK-24036
> Project: Spark
>  Issue Type: Improvement
>  Components: Structured Streaming
>Affects Versions: 2.4.0
>Reporter: Jose Torres
>Priority: Major
>
> The first iteration of continuous processing in Spark 2.3 does not work with 
> stateful operators.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-24036) Stateful operators in continuous processing

2018-05-09 Thread Li Yuanjian (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-24036?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16469830#comment-16469830
 ] 

Li Yuanjian commented on SPARK-24036:
-

Hi [~joseph.torres]

Thanks for cc'ing me, it looks great! 

My doc may already cover sub-tasks SPARK-24237 and SPARK-23236; could you have a 
look at the design: [design 
link|https://docs.google.com/document/d/14cGJ75v9myznywtB35ytEqL9wHy9xfZRv06B6g2tUgI/edit#bookmark=id.2lfv2glj7ny0]?
 I'll take these two JIRAs and discuss the details with you.

Also, in our practice, a new kind of continuous shuffle map task (I mentioned 
this in your doc comments: [comment 
link|https://docs.google.com/document/d/1IL4kJoKrZWeyIhklKUJqsW-yEN7V7aL05MmM65AYOfE/edit?disco=B4X1H_E])
 and a shuffle RDD should be added; do you agree with adding another two JIRAs for 
these?

 

> Stateful operators in continuous processing
> ---
>
> Key: SPARK-24036
> URL: https://issues.apache.org/jira/browse/SPARK-24036
> Project: Spark
>  Issue Type: Improvement
>  Components: Structured Streaming
>Affects Versions: 2.4.0
>Reporter: Jose Torres
>Priority: Major
>
> The first iteration of continuous processing in Spark 2.3 does not work with 
> stateful operators.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Closed] (SPARK-24108) ChunkedByteBuffer.writeFully method has not reset the limit value

2018-04-29 Thread Li Yuanjian (JIRA)

 [ 
https://issues.apache.org/jira/browse/SPARK-24108?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Li Yuanjian closed SPARK-24108.
---

Duplicate of 
[SPARK-24107|https://issues.apache.org/jira/browse/SPARK-24107], closing this one.

> ChunkedByteBuffer.writeFully method has not reset the limit value
> -
>
> Key: SPARK-24108
> URL: https://issues.apache.org/jira/browse/SPARK-24108
> Project: Spark
>  Issue Type: Bug
>  Components: Block Manager, Input/Output
>Affects Versions: 2.2.0, 2.2.1, 2.3.0
>Reporter: wangjinhai
>Priority: Major
> Fix For: 2.4.0
>
>
> The ChunkedByteBuffer.writeFully method does not reset the limit value. When a 
> chunk is larger than bufferWriteChunkSize, e.g. 80*1024*1024 bytes versus 
> config.BUFFER_WRITE_CHUNK_SIZE (64 * 1024 * 1024), the write loop runs only once 
> and the remaining 16*1024*1024 bytes are lost.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-21661) SparkSQL can't merge load table from Hadoop

2018-04-26 Thread Li Yuanjian (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-21661?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16454070#comment-16454070
 ] 

Li Yuanjian commented on SPARK-21661:
-

Got it.

> SparkSQL can't merge load table from Hadoop
> ---
>
> Key: SPARK-21661
> URL: https://issues.apache.org/jira/browse/SPARK-21661
> Project: Spark
>  Issue Type: Improvement
>  Components: SQL
>Affects Versions: 2.2.0
>Reporter: Dapeng Sun
>Assignee: Li Yuanjian
>Priority: Major
> Fix For: 2.3.0
>
>
> Here is the original text of external table on HDFS:
> {noformat}
> PermissionOwner   Group   SizeLast Modified   Replication Block 
> Size  Name
> -rw-r--r--rootsupergroup  0 B 8/6/2017, 11:43:03 PM   3   
> 256 MB  income_band_001.dat
> -rw-r--r--rootsupergroup  0 B 8/6/2017, 11:39:31 PM   3   
> 256 MB  income_band_002.dat
> ...
> -rw-r--r--rootsupergroup  327 B   8/6/2017, 11:44:47 PM   3   
> 256 MB  income_band_530.dat
> {noformat}
> After the SparkSQL load, every input file produces an output file, even when the 
> files are 0 B. For a load in Hive, the data files would be merged according to the 
> data size of the original files.
> Reproduce:
> {noformat}
> CREATE EXTERNAL TABLE t1 (a int,b string)  STORED AS TEXTFILE LOCATION 
> "hdfs://xxx:9000/data/t1"
> CREATE TABLE t2 STORED AS PARQUET AS SELECT * FROM t1;
> {noformat}
> The table t2 has many small files without data.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-21661) SparkSQL can't merge load table from Hadoop

2018-04-26 Thread Li Yuanjian (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-21661?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16453591#comment-16453591
 ] 

Li Yuanjian commented on SPARK-21661:
-

[~hyukjin.kwon] Hi Hyukjin, do you think we should backport 
[https://github.com/apache/spark/pull/18654] to branch 2.1/2.2? I found there are 
some JIRAs reporting this problem.

> SparkSQL can't merge load table from Hadoop
> ---
>
> Key: SPARK-21661
> URL: https://issues.apache.org/jira/browse/SPARK-21661
> Project: Spark
>  Issue Type: Improvement
>  Components: SQL
>Affects Versions: 2.2.0
>Reporter: Dapeng Sun
>Assignee: Li Yuanjian
>Priority: Major
>
> Here is the original text of external table on HDFS:
> {noformat}
> PermissionOwner   Group   SizeLast Modified   Replication Block 
> Size  Name
> -rw-r--r--rootsupergroup  0 B 8/6/2017, 11:43:03 PM   3   
> 256 MB  income_band_001.dat
> -rw-r--r--rootsupergroup  0 B 8/6/2017, 11:39:31 PM   3   
> 256 MB  income_band_002.dat
> ...
> -rw-r--r--rootsupergroup  327 B   8/6/2017, 11:44:47 PM   3   
> 256 MB  income_band_530.dat
> {noformat}
> After the SparkSQL load, every input file produces an output file, even when the 
> files are 0 B. For a load in Hive, the data files would be merged according to the 
> data size of the original files.
> Reproduce:
> {noformat}
> CREATE EXTERNAL TABLE t1 (a int,b string)  STORED AS TEXTFILE LOCATION 
> "hdfs://xxx:9000/data/t1"
> CREATE TABLE t2 STORED AS PARQUET AS SELECT * FROM t1;
> {noformat}
> The table t2 has many small files without data.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Updated] (SPARK-23811) FetchFailed comes before Success of same task will cause child stage never succeed

2018-03-30 Thread Li Yuanjian (JIRA)

 [ 
https://issues.apache.org/jira/browse/SPARK-23811?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Li Yuanjian updated SPARK-23811:

Description: 
This is a bug caused by the abnormal scenario described below:
 # ShuffleMapTask 1.0 is running; this task will fetch data from ExecutorA.
 # ExecutorA is lost, which triggers `mapOutputTracker.removeOutputsOnExecutor(execId)`, 
so the shuffleStatus changes.
 # Speculative ShuffleMapTask 1.1 starts and gets a FetchFailed immediately.
 # ShuffleMapTask 1.0 finally succeeds, but because of 1.1's FetchFailed the stage 
is still marked as failed.
 # ShuffleMapTask 1 is the last task of its stage, so this stage will never 
succeed because there is no missing task the DAGScheduler can find.

  was:
This is a bug caused by the abnormal scenario described below:
 # ShuffleMapTask 1.0 is running; this task will fetch data from ExecutorA.
 # ExecutorA is lost, which triggers `mapOutputTracker.removeOutputsOnExecutor(execId)`, 
so the shuffleStatus changes.
 # Speculative ShuffleMapTask 1.1 starts and gets a FetchFailed immediately.
 # ShuffleMapTask 1 is the last task of its stage, so this stage will never 
succeed because there is no missing task the DAGScheduler can find.


> FetchFailed comes before Success of same task will cause child stage never 
> succeed
> --
>
> Key: SPARK-23811
> URL: https://issues.apache.org/jira/browse/SPARK-23811
> Project: Spark
>  Issue Type: Bug
>  Components: Spark Core
>Affects Versions: 2.2.0, 2.3.0
>Reporter: Li Yuanjian
>Priority: Major
> Attachments: 1.png, 2.png
>
>
> This is a bug caused by the abnormal scenario described below:
>  # ShuffleMapTask 1.0 is running; this task will fetch data from ExecutorA.
>  # ExecutorA is lost, which triggers `mapOutputTracker.removeOutputsOnExecutor(execId)`, 
> so the shuffleStatus changes.
>  # Speculative ShuffleMapTask 1.1 starts and gets a FetchFailed immediately.
>  # ShuffleMapTask 1.0 finally succeeds, but because of 1.1's FetchFailed, the 
> stage is still marked as failed.
>  # ShuffleMapTask 1 is the last task of its stage, so this stage will never 
> succeed because there is no missing task the DAGScheduler can find.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Updated] (SPARK-23811) FetchFailed comes before Success of same task will cause child stage never succeed

2018-03-29 Thread Li Yuanjian (JIRA)

 [ 
https://issues.apache.org/jira/browse/SPARK-23811?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Li Yuanjian updated SPARK-23811:

Summary: FetchFailed comes before Success of same task will cause child 
stage never succeed  (was: Same tasks' FetchFailed event comes before Success 
will cause child stage never succeed)

> FetchFailed comes before Success of same task will cause child stage never 
> succeed
> --
>
> Key: SPARK-23811
> URL: https://issues.apache.org/jira/browse/SPARK-23811
> Project: Spark
>  Issue Type: Bug
>  Components: Spark Core
>Affects Versions: 2.2.0, 2.3.0
>Reporter: Li Yuanjian
>Priority: Major
> Attachments: 1.png, 2.png
>
>
> This is a bug caused by the abnormal scenario described below:
>  # ShuffleMapTask 1.0 is running; this task will fetch data from ExecutorA.
>  # ExecutorA is lost, which triggers `mapOutputTracker.removeOutputsOnExecutor(execId)`, 
> so the shuffleStatus changes.
>  # Speculative ShuffleMapTask 1.1 starts and gets a FetchFailed immediately.
>  # ShuffleMapTask 1 is the last task of its stage, so this stage will never 
> succeed because there is no missing task the DAGScheduler can find.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-23811) Same tasks' FetchFailed event comes before Success will cause child stage never succeed

2018-03-28 Thread Li Yuanjian (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-23811?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16418434#comment-16418434
 ] 

Li Yuanjian commented on SPARK-23811:
-

 

The scenario can be reproduced by the test case below, added to `DAGSchedulerSuite`:
{code:java}
/**
 * This tests the case where the original task succeeds after its speculative attempt
 * got a FetchFailed earlier.
 */
test("[SPARK-23811] Fetch failed task should kill other attempt") {
  // Create 3 RDDs with shuffle dependencies on each other: rddA <--- rddB <--- rddC
  val rddA = new MyRDD(sc, 2, Nil)
  val shuffleDepA = new ShuffleDependency(rddA, new HashPartitioner(2))
  val shuffleIdA = shuffleDepA.shuffleId

  val rddB = new MyRDD(sc, 2, List(shuffleDepA), tracker = mapOutputTracker)
  val shuffleDepB = new ShuffleDependency(rddB, new HashPartitioner(2))

  val rddC = new MyRDD(sc, 2, List(shuffleDepB), tracker = mapOutputTracker)

  submit(rddC, Array(0, 1))

  // Complete both tasks in rddA.
  assert(taskSets(0).stageId === 0 && taskSets(0).stageAttemptId === 0)
  complete(taskSets(0), Seq(
    (Success, makeMapStatus("hostA", 2)),
    (Success, makeMapStatus("hostB", 2))))

  // The first task succeeds.
  runEvent(makeCompletionEvent(
    taskSets(1).tasks(0), Success, makeMapStatus("hostB", 2)))

  // The second task's speculative attempt fails first, but the task itself is still
  // running. This may be caused by ExecutorLost.
  runEvent(makeCompletionEvent(
    taskSets(1).tasks(1),
    FetchFailed(makeBlockManagerId("hostA"), shuffleIdA, 0, 0, "ignored"),
    null))
  // Check the currently missing partition.
  assert(mapOutputTracker.findMissingPartitions(shuffleDepB.shuffleId).get.size === 1)
  val missingPartition = mapOutputTracker.findMissingPartitions(shuffleDepB.shuffleId).get(0)

  // The second task itself succeeds soon after.
  runEvent(makeCompletionEvent(
    taskSets(1).tasks(1), Success, makeMapStatus("hostB", 2)))
  // No missing partitions here; this will cause the child stage to never succeed.
  assert(mapOutputTracker.findMissingPartitions(shuffleDepB.shuffleId).get.size === 0)
}
{code}
 

> Same tasks' FetchFailed event comes before Success will cause child stage 
> never succeed
> ---
>
> Key: SPARK-23811
> URL: https://issues.apache.org/jira/browse/SPARK-23811
> Project: Spark
>  Issue Type: Bug
>  Components: Spark Core
>Affects Versions: 2.2.0, 2.3.0
>Reporter: Li Yuanjian
>Priority: Major
> Attachments: 1.png, 2.png
>
>
> This is a bug caused by the abnormal scenario described below:
>  # ShuffleMapTask 1.0 is running; this task will fetch data from ExecutorA.
>  # ExecutorA is lost, which triggers `mapOutputTracker.removeOutputsOnExecutor(execId)`, 
> so the shuffleStatus changes.
>  # Speculative ShuffleMapTask 1.1 starts and gets a FetchFailed immediately.
>  # ShuffleMapTask 1 is the last task of its stage, so this stage will never 
> succeed because there is no missing task the DAGScheduler can find.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Updated] (SPARK-23811) Same tasks' FetchFailed event comes before Success will cause child stage never succeed

2018-03-28 Thread Li Yuanjian (JIRA)

 [ 
https://issues.apache.org/jira/browse/SPARK-23811?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Li Yuanjian updated SPARK-23811:

Attachment: 2.png

> Same tasks' FetchFailed event comes before Success will cause child stage 
> never succeed
> ---
>
> Key: SPARK-23811
> URL: https://issues.apache.org/jira/browse/SPARK-23811
> Project: Spark
>  Issue Type: Bug
>  Components: Spark Core
>Affects Versions: 2.2.0, 2.3.0
>Reporter: Li Yuanjian
>Priority: Major
> Attachments: 1.png, 2.png
>
>
> This is a bug caused by the abnormal scenario described below:
>  # ShuffleMapTask 1.0 is running; this task will fetch data from ExecutorA.
>  # ExecutorA is lost, which triggers `mapOutputTracker.removeOutputsOnExecutor(execId)`, 
> so the shuffleStatus changes.
>  # Speculative ShuffleMapTask 1.1 starts and gets a FetchFailed immediately.
>  # ShuffleMapTask 1 is the last task of its stage, so this stage will never 
> succeed because there is no missing task the DAGScheduler can find.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Updated] (SPARK-23811) Same tasks' FetchFailed event comes before Success will cause child stage never succeed

2018-03-28 Thread Li Yuanjian (JIRA)

 [ 
https://issues.apache.org/jira/browse/SPARK-23811?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Li Yuanjian updated SPARK-23811:

Attachment: 1.png

> Same tasks' FetchFailed event comes before Success will cause child stage 
> never succeed
> ---
>
> Key: SPARK-23811
> URL: https://issues.apache.org/jira/browse/SPARK-23811
> Project: Spark
>  Issue Type: Bug
>  Components: Spark Core
>Affects Versions: 2.2.0, 2.3.0
>Reporter: Li Yuanjian
>Priority: Major
> Attachments: 1.png
>
>
> This is a bug caused by the abnormal scenario described below:
>  # ShuffleMapTask 1.0 is running; this task will fetch data from ExecutorA.
>  # ExecutorA is lost, which triggers `mapOutputTracker.removeOutputsOnExecutor(execId)`, 
> so the shuffleStatus changes.
>  # Speculative ShuffleMapTask 1.1 starts and gets a FetchFailed immediately.
>  # ShuffleMapTask 1 is the last task of its stage, so this stage will never 
> succeed because there is no missing task the DAGScheduler can find.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Created] (SPARK-23811) Same tasks' FetchFailed event comes before Success will cause child stage never succeed

2018-03-28 Thread Li Yuanjian (JIRA)
Li Yuanjian created SPARK-23811:
---

 Summary: Same tasks' FetchFailed event comes before Success will 
cause child stage never succeed
 Key: SPARK-23811
 URL: https://issues.apache.org/jira/browse/SPARK-23811
 Project: Spark
  Issue Type: Bug
  Components: Spark Core
Affects Versions: 2.3.0, 2.2.0
Reporter: Li Yuanjian


This is a bug caused by the abnormal scenario described below:
 # ShuffleMapTask 1.0 is running; this task will fetch data from ExecutorA.
 # ExecutorA is lost, which triggers `mapOutputTracker.removeOutputsOnExecutor(execId)`, 
so the shuffleStatus changes.
 # Speculative ShuffleMapTask 1.1 starts and gets a FetchFailed immediately.
 # ShuffleMapTask 1 is the last task of its stage, so this stage will never 
succeed because there is no missing task the DAGScheduler can find.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Updated] (SPARK-23533) Add support for changing ContinuousDataReader's startOffset

2018-02-27 Thread Li Yuanjian (JIRA)

 [ 
https://issues.apache.org/jira/browse/SPARK-23533?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Li Yuanjian updated SPARK-23533:

Summary: Add support for changing ContinuousDataReader's startOffset  (was: 
Add support for changing ContinousDataReader's startOffset)

> Add support for changing ContinuousDataReader's startOffset
> ---
>
> Key: SPARK-23533
> URL: https://issues.apache.org/jira/browse/SPARK-23533
> Project: Spark
>  Issue Type: Task
>  Components: Structured Streaming
>Affects Versions: 2.3.0
>Reporter: Li Yuanjian
>Priority: Major
>
> As discussed in [https://github.com/apache/spark/pull/20675], we need to add a 
> new interface `ContinuousDataReaderFactory` to support setting the start 
> offset in Continuous Processing.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Created] (SPARK-23533) Add support for changing ContinousDataReader's startOffset

2018-02-27 Thread Li Yuanjian (JIRA)
Li Yuanjian created SPARK-23533:
---

 Summary: Add support for changing ContinousDataReader's startOffset
 Key: SPARK-23533
 URL: https://issues.apache.org/jira/browse/SPARK-23533
 Project: Spark
  Issue Type: Task
  Components: Structured Streaming
Affects Versions: 2.3.0
Reporter: Li Yuanjian


As discussed in [https://github.com/apache/spark/pull/20675], we need to add a 
new interface `ContinuousDataReaderFactory` to support setting the start offset 
in Continuous Processing.
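
For illustration only, a minimal sketch of the shape such a factory could take. Every name below (the placeholder `PartitionOffset` and `DataReader` traits and the `createDataReaderWithOffset` method) is an assumption made for this sketch, not the interface actually discussed in the PR:
{code}
// Placeholder types so the sketch is self-contained; the real ones live in
// the data source v2 API.
trait PartitionOffset extends Serializable

trait DataReader[T] extends AutoCloseable {
  def next(): Boolean
  def get(): T
}

// Hypothetical factory: besides the default entry point, it exposes a way to
// (re)start a partition's reader from an explicit offset, which is the
// requirement described above for Continuous Processing.
trait ContinuousDataReaderFactory[T] extends Serializable {
  def createDataReader(): DataReader[T]
  def createDataReaderWithOffset(offset: PartitionOffset): DataReader[T]
}
{code}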



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Created] (SPARK-22956) Union Stream Failover Cause `IllegalStateException`

2018-01-04 Thread Li Yuanjian (JIRA)
Li Yuanjian created SPARK-22956:
---

 Summary: Union Stream Failover Cause `IllegalStateException`
 Key: SPARK-22956
 URL: https://issues.apache.org/jira/browse/SPARK-22956
 Project: Spark
  Issue Type: Bug
  Components: Structured Streaming
Affects Versions: 2.1.0
Reporter: Li Yuanjian


When we union 2 streams from Kafka or other sources, and one of them has no 
continuous data coming in while a task restarts at the same time, an 
`IllegalStateException` is thrown. The main cause is the code in 
[MicroBatchExecution|https://github.com/apache/spark/blob/master/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/MicroBatchExecution.scala#L190]:
 when one stream has no continuous data, its committedOffset equals its 
availableOffset during `populateStartOffsets`, and `currentPartitionOffsets` is 
not properly handled in KafkaSource. Maybe we should also consider this 
scenario in other Sources.
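
For reference, a minimal sketch of the setup that can hit this. The topic names, broker address and checkpoint path are placeholders, not taken from the original report:
{code}
import org.apache.spark.sql.SparkSession

object UnionStreamRepro {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().appName("union-stream-repro").getOrCreate()

    def kafkaStream(topic: String) = spark.readStream
      .format("kafka")
      .option("kafka.bootstrap.servers", "broker:9092")
      .option("subscribe", topic)
      .load()

    // One topic keeps producing while the other stays idle; restarting the
    // query from the checkpoint is what exposes the offset mismatch above.
    val busy = kafkaStream("busy-topic")
    val idle = kafkaStream("idle-topic")

    busy.union(idle).writeStream
      .format("console")
      .option("checkpointLocation", "/tmp/union-stream-checkpoint")
      .start()
      .awaitTermination()
  }
}
{code}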



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Updated] (SPARK-2926) Add MR-style (merge-sort) SortShuffleReader for sort-based shuffle

2017-12-13 Thread Li Yuanjian (JIRA)

 [ 
https://issues.apache.org/jira/browse/SPARK-2926?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Li Yuanjian updated SPARK-2926:
---
Attachment: Spark Shuffle Test Report on Spark2.x.pdf

[~jerryshao] Hi Saisai, thanks for your advice. I added a test report according 
to your suggestion. As described in the report, I only compare the two shuffle 
modes on the 'sort-by-key' workload, because the other test workloads share the 
same code paths in the POC implementation (SortShuffleWriter with 
BlockStoreShuffleReader).
I also added a config ([code 
link|https://github.com/apache/spark/pull/19745/commits/fe9394eadf8ea51af2b2cb41b5b42981fa600752])
 just to force SerializedShuffle off in the 'sort-by-key' workload; otherwise 
both master and the POC use SerializedShuffle.
For the sort-by-key workload with SerializedShuffle disabled, the POC version is 
1.44x faster than the current master overall: the map-side stage is 1.16x 
slower, but the reducer stage gets a 9.4x boost.
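
For context, a rough sketch of a 'sort-by-key' style workload in the spirit of the spark-perf micro benchmark. The record count, key range and partition numbers are made-up illustrative values, not the ones used in the attached report:
{code}
import scala.util.Random
import org.apache.spark.sql.SparkSession

object SortByKeyWorkload {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().appName("sort-by-key-workload").getOrCreate()
    val sc = spark.sparkContext

    // Random (key, value) pairs; sortByKey forces a shuffle whose read side is
    // exactly what the SortShuffleReader POC changes.
    val pairs = sc.parallelize(1 to 10000000, 200)
      .map(i => (Random.nextInt(1000000), i.toLong))

    val sorted = pairs.sortByKey(ascending = true, numPartitions = 100)
    println(s"records sorted: ${sorted.count()}")
    spark.stop()
  }
}
{code}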

> Add MR-style (merge-sort) SortShuffleReader for sort-based shuffle
> --
>
> Key: SPARK-2926
> URL: https://issues.apache.org/jira/browse/SPARK-2926
> Project: Spark
>  Issue Type: Improvement
>  Components: Shuffle
>Affects Versions: 1.1.0
>Reporter: Saisai Shao
>Assignee: Saisai Shao
> Attachments: SortBasedShuffleRead.pdf, SortBasedShuffleReader on 
> Spark 2.x.pdf, Spark Shuffle Test Report on Spark2.x.pdf, Spark Shuffle Test 
> Report(contd).pdf, Spark Shuffle Test Report.pdf
>
>
> Currently Spark has already integrated sort-based shuffle write, which 
> greatly improves the IO performance and reduces the memory consumption when 
> the reducer number is very large. But the reducer side still adopts the 
> hash-based shuffle reader implementation, which neglects the ordering 
> attributes of the map output data in some situations.
> Here we propose an MR-style, sort-merge-like shuffle reader for sort-based 
> shuffle to further improve the performance of sort-based shuffle.
> Work-in-progress code and a performance test report will be posted later 
> when some unit test bugs are fixed.
> Any comments would be greatly appreciated.
> Thanks a lot.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Created] (SPARK-22753) Get rid of dataSource.writeAndRead

2017-12-11 Thread Li Yuanjian (JIRA)
Li Yuanjian created SPARK-22753:
---

 Summary: Get rid of dataSource.writeAndRead
 Key: SPARK-22753
 URL: https://issues.apache.org/jira/browse/SPARK-22753
 Project: Spark
  Issue Type: Improvement
  Components: SQL
Affects Versions: 2.1.0
Reporter: Li Yuanjian
Priority: Minor


Code cleanup work for getting rid of dataSource.writeAndRead, as discussed in 
https://github.com/apache/spark/pull/16481 and 
https://github.com/apache/spark/pull/18975#discussion_r155454606.
Currently the BaseRelation returned by `dataSource.writeAndRead` is only used in 
`CreateDataSourceTableAsSelect`, and planForWriting and writeAndRead share some 
common code paths.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-2926) Add MR-style (merge-sort) SortShuffleReader for sort-based shuffle

2017-11-24 Thread Li Yuanjian (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-2926?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16265109#comment-16265109
 ] 

Li Yuanjian commented on SPARK-2926:


Yes, only the reduce stage. You're right, I shouldn't pay attention only to the 
final stage. I rearranged all the numbers from the screenshots below:

||test name||map stage shuffle write||reduce stage shuffle read||map stage duration||reduce stage duration||total job time||code base||
|Test Round 1|654.4 MB|802.6 MB|3.6 min|2.0 min|5.5 min|master|
|Test Round 1|654.4 MB|714.0 MB|3.4 min|9 s|3.5 min|SPARK-2926|
|Test Round 2: add more pressure on SortShuffleReader by coalesce|654.4 MB|654.4 MB|2.6 min|20 min|22 min|master|
|Test Round 2: add more pressure on SortShuffleReader by coalesce|654.4 MB|654.4 MB|3.7 min|1.4 min|5.1 min|SPARK-2926|
|Test Round 3: test the file-spill scenario of the sort shuffle reader|142.6 MB|142.6 MB|26 s|16 min|16 min|master|
|Test Round 3: test the file-spill scenario of the sort shuffle reader|142.6 MB|142.6 MB|21 s|13 s|34 s|SPARK-2926|
|Test Round 3: test the file-spill scenario of the sort shuffle reader|142.6 MB|142.6 MB|22 s|25 s|47 s|SPARK-2926 (code change to force spill to disk)|
|Test Round 3: test the file-spill scenario of the sort shuffle reader|142.6 MB|142.6 MB|22 s|29 s|51 s|SPARK-2926 (code change to force spill to disk)|


> Add MR-style (merge-sort) SortShuffleReader for sort-based shuffle
> --
>
> Key: SPARK-2926
> URL: https://issues.apache.org/jira/browse/SPARK-2926
> Project: Spark
>  Issue Type: Improvement
>  Components: Shuffle
>Affects Versions: 1.1.0
>Reporter: Saisai Shao
>Assignee: Saisai Shao
> Attachments: SortBasedShuffleRead.pdf, SortBasedShuffleReader on 
> Spark 2.x.pdf, Spark Shuffle Test Report(contd).pdf, Spark Shuffle Test 
> Report.pdf
>
>
> Currently Spark has already integrated sort-based shuffle write, which 
> greatly improves the IO performance and reduces the memory consumption when 
> the reducer number is very large. But the reducer side still adopts the 
> hash-based shuffle reader implementation, which neglects the ordering 
> attributes of the map output data in some situations.
> Here we propose an MR-style, sort-merge-like shuffle reader for sort-based 
> shuffle to further improve the performance of sort-based shuffle.
> Work-in-progress code and a performance test report will be posted later 
> when some unit test bugs are fixed.
> Any comments would be greatly appreciated.
> Thanks a lot.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-2926) Add MR-style (merge-sort) SortShuffleReader for sort-based shuffle

2017-11-24 Thread Li Yuanjian (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-2926?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16265030#comment-16265030
 ] 

Li Yuanjian commented on SPARK-2926:


[~jerryshao], thanks a lot for your advice and reply.
{quote}
would you please use spark-perf's micro benchmark 
(https://github.com/databricks/spark-perf) to verify again with same workload 
as mentioned in original test report?
{quote}
Sure, I'll verify this again ASAP.
{quote}
Theoretically this solution cannot get 12x-30x boosting according to my test
{quote}
At first I also had questions about this, so I attached all the screenshots in 
the pdf. The 12x boost happened with the reducer task number at both 1 and 100: 
the stage duration drops from 2 min to 9 s (13x) when the reducer task number 
is 1, and from 20 min to 1.4 min when it is 100. The 30x boost happened after I 
added more data pressure on the reducer tasks.
{quote}
Can you please explain the key difference and the reason of such boosting?
{quote}
I think the key difference mainly comes from these 2 points:
1. Like Saisai said, BlockStoreShuffleReader uses `ExternalSorter` for the 
reduce work, so every record goes through comparison work, while 
SortShuffleReader is more CPU friendly: it collects all shuffle map results 
(both data in memory and data spilled to disk) and combines them with a merge 
sort, since each partition has already been sorted on the map side (see the 
sketch below).
2. The obvious cut in peak memory used by the reduce task, which saves GC time 
during sorting.
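
To make point 1 concrete, a minimal sketch of the k-way merge idea, assuming nothing about the actual SortShuffleReader code: each map output block arrives already sorted by key, so the reduce side only needs to repeatedly pick the smallest head among the block iterators instead of re-sorting every record:
{code}
import scala.collection.mutable

object MergeSortedBlocks {
  // Merge pre-sorted (key, value) iterators into one sorted iterator.
  def mergeSorted[K, V](blocks: Seq[Iterator[(K, V)]])
                       (implicit ord: Ordering[K]): Iterator[(K, V)] = {
    // PriorityQueue is a max-heap, so reverse the key ordering to pop the
    // iterator whose head key is the smallest.
    val heap = mutable.PriorityQueue.empty[BufferedIterator[(K, V)]](
      Ordering.by((it: BufferedIterator[(K, V)]) => it.head._1)(ord.reverse))
    blocks.map(_.buffered).filter(_.hasNext).foreach(heap.enqueue(_))

    new Iterator[(K, V)] {
      def hasNext: Boolean = heap.nonEmpty
      def next(): (K, V) = {
        val it = heap.dequeue()
        val kv = it.next()
        if (it.hasNext) heap.enqueue(it) // re-insert with its new head key
        kv
      }
    }
  }

  def main(args: Array[String]): Unit = {
    // Three already-sorted blocks merge into one sorted stream.
    val merged = mergeSorted(Seq(
      Iterator(1 -> "a", 4 -> "d"), Iterator(2 -> "b"), Iterator(3 -> "c", 5 -> "e")))
    merged.foreach(println)
  }
}
{code}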

> Add MR-style (merge-sort) SortShuffleReader for sort-based shuffle
> --
>
> Key: SPARK-2926
> URL: https://issues.apache.org/jira/browse/SPARK-2926
> Project: Spark
>  Issue Type: Improvement
>  Components: Shuffle
>Affects Versions: 1.1.0
>Reporter: Saisai Shao
>Assignee: Saisai Shao
> Attachments: SortBasedShuffleRead.pdf, SortBasedShuffleReader on 
> Spark 2.x.pdf, Spark Shuffle Test Report(contd).pdf, Spark Shuffle Test 
> Report.pdf
>
>
> Currently Spark has already integrated sort-based shuffle write, which 
> greatly improves the IO performance and reduces the memory consumption when 
> the reducer number is very large. But the reducer side still adopts the 
> hash-based shuffle reader implementation, which neglects the ordering 
> attributes of the map output data in some situations.
> Here we propose an MR-style, sort-merge-like shuffle reader for sort-based 
> shuffle to further improve the performance of sort-based shuffle.
> Work-in-progress code and a performance test report will be posted later 
> when some unit test bugs are fixed.
> Any comments would be greatly appreciated.
> Thanks a lot.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Created] (SPARK-22546) Allow users to update the dataType of a column

2017-11-17 Thread Li Yuanjian (JIRA)
Li Yuanjian created SPARK-22546:
---

 Summary: Allow users to update the dataType of a column
 Key: SPARK-22546
 URL: https://issues.apache.org/jira/browse/SPARK-22546
 Project: Spark
  Issue Type: Bug
  Components: SQL
Affects Versions: 2.1.0
Reporter: Li Yuanjian


[SPARK-17910|https://issues.apache.org/jira/browse/SPARK-17910] supported 
changing the comment of a column, and that patch also left a TODO for other 
metadata. Here we just implement the dataType-changing requirement; others, 
like renaming a column or changing its position, may have different 
considerations for datasource tables and Hive tables and may need more 
discussion.
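
As a usage sketch only: the Hive-style `CHANGE COLUMN` DDL below is one plausible entry point for this; whether Spark accepts the type change is precisely what this issue proposes, so treat the syntax support, the table name and the column here as assumptions:
{code}
import org.apache.spark.sql.SparkSession

object AlterColumnTypeSketch {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().appName("alter-column-type-sketch").getOrCreate()

    spark.sql("CREATE TABLE IF NOT EXISTS demo_t (c INT) USING parquet")
    // Intended behavior for this issue: widen column c from INT to BIGINT.
    spark.sql("ALTER TABLE demo_t CHANGE COLUMN c c BIGINT")
    spark.sql("DESCRIBE demo_t").show()
  }
}
{code}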



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-2926) Add MR-style (merge-sort) SortShuffleReader for sort-based shuffle

2017-11-14 Thread Li Yuanjian (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-2926?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16251457#comment-16251457
 ] 

Li Yuanjian commented on SPARK-2926:


I'm just giving a preview PR above; I'll collect more suggestions about this and 
maybe raise a SPIP vote later.

> Add MR-style (merge-sort) SortShuffleReader for sort-based shuffle
> --
>
> Key: SPARK-2926
> URL: https://issues.apache.org/jira/browse/SPARK-2926
> Project: Spark
>  Issue Type: Improvement
>  Components: Shuffle
>Affects Versions: 1.1.0
>Reporter: Saisai Shao
>Assignee: Saisai Shao
> Attachments: SortBasedShuffleRead.pdf, SortBasedShuffleReader on 
> Spark 2.x.pdf, Spark Shuffle Test Report(contd).pdf, Spark Shuffle Test 
> Report.pdf
>
>
> Currently Spark has already integrated sort-based shuffle write, which 
> greatly improves the IO performance and reduces the memory consumption when 
> the reducer number is very large. But the reducer side still adopts the 
> hash-based shuffle reader implementation, which neglects the ordering 
> attributes of the map output data in some situations.
> Here we propose an MR-style, sort-merge-like shuffle reader for sort-based 
> shuffle to further improve the performance of sort-based shuffle.
> Work-in-progress code and a performance test report will be posted later 
> when some unit test bugs are fixed.
> Any comments would be greatly appreciated.
> Thanks a lot.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Issue Comment Deleted] (SPARK-2926) Add MR-style (merge-sort) SortShuffleReader for sort-based shuffle

2017-11-14 Thread Li Yuanjian (JIRA)

 [ 
https://issues.apache.org/jira/browse/SPARK-2926?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Li Yuanjian updated SPARK-2926:
---
Comment: was deleted

(was: The follow up work for SortShuffleReader in current master branch, detail 
test cases and benchmark description.)

> Add MR-style (merge-sort) SortShuffleReader for sort-based shuffle
> --
>
> Key: SPARK-2926
> URL: https://issues.apache.org/jira/browse/SPARK-2926
> Project: Spark
>  Issue Type: Improvement
>  Components: Shuffle
>Affects Versions: 1.1.0
>Reporter: Saisai Shao
>Assignee: Saisai Shao
> Attachments: SortBasedShuffleRead.pdf, SortBasedShuffleReader on 
> Spark 2.x.pdf, Spark Shuffle Test Report(contd).pdf, Spark Shuffle Test 
> Report.pdf
>
>
> Currently Spark has already integrated sort-based shuffle write, which 
> greatly improves the IO performance and reduces the memory consumption when 
> the reducer number is very large. But the reducer side still adopts the 
> hash-based shuffle reader implementation, which neglects the ordering 
> attributes of the map output data in some situations.
> Here we propose an MR-style, sort-merge-like shuffle reader for sort-based 
> shuffle to further improve the performance of sort-based shuffle.
> Work-in-progress code and a performance test report will be posted later 
> when some unit test bugs are fixed.
> Any comments would be greatly appreciated.
> Thanks a lot.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Updated] (SPARK-2926) Add MR-style (merge-sort) SortShuffleReader for sort-based shuffle

2017-11-14 Thread Li Yuanjian (JIRA)

 [ 
https://issues.apache.org/jira/browse/SPARK-2926?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Li Yuanjian updated SPARK-2926:
---
Attachment: SortBasedShuffleReader on Spark 2.x.pdf

The follow-up work for SortShuffleReader on the current master branch, with 
detailed test cases and a benchmark description.

> Add MR-style (merge-sort) SortShuffleReader for sort-based shuffle
> --
>
> Key: SPARK-2926
> URL: https://issues.apache.org/jira/browse/SPARK-2926
> Project: Spark
>  Issue Type: Improvement
>  Components: Shuffle
>Affects Versions: 1.1.0
>Reporter: Saisai Shao
>Assignee: Saisai Shao
> Attachments: SortBasedShuffleRead.pdf, SortBasedShuffleReader on 
> Spark 2.x.pdf, Spark Shuffle Test Report(contd).pdf, Spark Shuffle Test 
> Report.pdf
>
>
> Currently Spark has already integrated sort-based shuffle write, which 
> greatly improves the IO performance and reduces the memory consumption when 
> the reducer number is very large. But the reducer side still adopts the 
> hash-based shuffle reader implementation, which neglects the ordering 
> attributes of the map output data in some situations.
> Here we propose an MR-style, sort-merge-like shuffle reader for sort-based 
> shuffle to further improve the performance of sort-based shuffle.
> Work-in-progress code and a performance test report will be posted later 
> when some unit test bugs are fixed.
> Any comments would be greatly appreciated.
> Thanks a lot.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Comment Edited] (SPARK-2926) Add MR-style (merge-sort) SortShuffleReader for sort-based shuffle

2017-11-14 Thread Li Yuanjian (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-2926?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16251398#comment-16251398
 ] 

Li Yuanjian edited comment on SPARK-2926 at 11/14/17 1:54 PM:
--

During our work migrating some old Hadoop jobs to Spark, I noticed this JIRA 
and the code based on Spark 1.x.

I re-implemented the old PR on Spark 2.1 and the current master branch. After 
reproducing some scenarios and running some benchmark tests, I found that this 
shuffle mode can bring {color:red}a 12x~30x boost in task duration and reduce 
peak execution memory to 1/12 ~ 1/50{color} vs the current master version (see 
detailed screenshots and test data in the attached pdf). The memory reduction 
is especially notable: in this shuffle mode Spark can handle more data with 
less memory. The detailed doc attached to this jira is named 
"SortShuffleReader on Spark 2.x.pdf".

I know the DataSet API will have better optimization and performance, but the 
RDD API may still be useful for flexible control and for old Spark/Hadoop jobs. 
For the better performance in ordering cases and the more cost-effective memory 
usage, maybe this PR is still worth merging into master.

I'll sort out the current code base and give a PR soon. Any comments and trying 
it out would be greatly appreciated.


was (Author: xuanyuan):
During our work of migrating some old Hadoop job to Spark, I noticed this JIRA 
and the code based on spark 1.x.

I re-implemented the old PR based on Spark 2.1 and current master branch. After 
produced some scenario and ran some benchmark tests, I found that this shuffle 
mode can bring {color:red}12x~30x boosting in task duration and reduce peak 
execution memory to 1/12 ~ 1/50{color}(see detail screenshot and test data in 
attatched pdf) vs current master version, especially the memory reducing, in 
this shuffle mode Spark can support more data size in less memory usage. The 
detail doc attached in this jira named "SortShuffleReader on Spark 2.x.pdf".

I know that DataSet API will have better optimization and performance, but RDD 
API may still useful for flexible control and old Spark/Hadoop jobs. For the 
better performance in ordering cases and more cost-effective memory usage, 
maybe this PR is still worth to merge in to master.

I'll sort out current code base and give a PR soon. Any comments and trying out 
would be greatly appreciated.

> Add MR-style (merge-sort) SortShuffleReader for sort-based shuffle
> --
>
> Key: SPARK-2926
> URL: https://issues.apache.org/jira/browse/SPARK-2926
> Project: Spark
>  Issue Type: Improvement
>  Components: Shuffle
>Affects Versions: 1.1.0
>Reporter: Saisai Shao
>Assignee: Saisai Shao
> Attachments: SortBasedShuffleRead.pdf, Spark Shuffle Test 
> Report(contd).pdf, Spark Shuffle Test Report.pdf
>
>
> Currently Spark has already integrated sort-based shuffle write, which 
> greatly improves the IO performance and reduces the memory consumption when 
> the reducer number is very large. But the reducer side still adopts the 
> hash-based shuffle reader implementation, which neglects the ordering 
> attributes of the map output data in some situations.
> Here we propose an MR-style, sort-merge-like shuffle reader for sort-based 
> shuffle to further improve the performance of sort-based shuffle.
> Work-in-progress code and a performance test report will be posted later 
> when some unit test bugs are fixed.
> Any comments would be greatly appreciated.
> Thanks a lot.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Comment Edited] (SPARK-2926) Add MR-style (merge-sort) SortShuffleReader for sort-based shuffle

2017-11-14 Thread Li Yuanjian (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-2926?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16251398#comment-16251398
 ] 

Li Yuanjian edited comment on SPARK-2926 at 11/14/17 1:53 PM:
--

During our work migrating some old Hadoop jobs to Spark, I noticed this JIRA 
and the code based on Spark 1.x.

I re-implemented the old PR on Spark 2.1 and the current master branch. After 
reproducing some scenarios and running some benchmark tests, I found that this 
shuffle mode can bring {color:red}a 12x~30x boost in task duration and reduce 
peak execution memory to 1/12 ~ 1/50{color} (see detailed screenshots and test 
data in the attached pdf) vs the current master version. The memory reduction 
is especially notable: in this shuffle mode Spark can handle more data with 
less memory. The detailed doc attached to this jira is named 
"SortShuffleReader on Spark 2.x.pdf".

I know the DataSet API will have better optimization and performance, but the 
RDD API may still be useful for flexible control and for old Spark/Hadoop jobs. 
For the better performance in ordering cases and the more cost-effective memory 
usage, maybe this PR is still worth merging into master.

I'll sort out the current code base and give a PR soon. Any comments and trying 
it out would be greatly appreciated.


was (Author: xuanyuan):
During our work of migrating some old Hadoop job to Spark, I noticed this JIRA 
and the code based on spark 1.x.

I re-implemented the old PR based on Spark 2.1 and current master branch. After 
produced some scenario and ran some benchmark tests, I found that this shuffle 
mode can bring {color:red}12x~30x boosting in task duration and reduce peak 
execution memory to 1/12 ~ 1/50{color} vs current master version, especially 
the memory reducing, in this shuffle mode Spark can support more data size in 
less memory usage. The detail doc attached in this jira named 
"SortShuffleReader on Spark 2.x".

I know that DataSet API will have better optimization and performance, but RDD 
API may still useful for flexible control and old Spark/Hadoop jobs. For the 
better performance in ordering cases and more cost-effective memory usage, 
maybe this PR is still worth to merge in to master.

I'll sort out current code base and give a PR soon. Any comments and trying out 
would be greatly appreciated.

> Add MR-style (merge-sort) SortShuffleReader for sort-based shuffle
> --
>
> Key: SPARK-2926
> URL: https://issues.apache.org/jira/browse/SPARK-2926
> Project: Spark
>  Issue Type: Improvement
>  Components: Shuffle
>Affects Versions: 1.1.0
>Reporter: Saisai Shao
>Assignee: Saisai Shao
> Attachments: SortBasedShuffleRead.pdf, Spark Shuffle Test 
> Report(contd).pdf, Spark Shuffle Test Report.pdf
>
>
> Currently Spark has already integrated sort-based shuffle write, which 
> greatly improves the IO performance and reduces the memory consumption when 
> the reducer number is very large. But the reducer side still adopts the 
> hash-based shuffle reader implementation, which neglects the ordering 
> attributes of the map output data in some situations.
> Here we propose an MR-style, sort-merge-like shuffle reader for sort-based 
> shuffle to further improve the performance of sort-based shuffle.
> Work-in-progress code and a performance test report will be posted later 
> when some unit test bugs are fixed.
> Any comments would be greatly appreciated.
> Thanks a lot.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-2926) Add MR-style (merge-sort) SortShuffleReader for sort-based shuffle

2017-11-14 Thread Li Yuanjian (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-2926?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16251398#comment-16251398
 ] 

Li Yuanjian commented on SPARK-2926:


During our work migrating some old Hadoop jobs to Spark, I noticed this JIRA 
and the code based on Spark 1.x.

I re-implemented the old PR on Spark 2.1 and the current master branch. After 
reproducing some scenarios and running some benchmark tests, I found that this 
shuffle mode can bring {color:red}a 12x~30x boost in task duration and reduce 
peak execution memory to 1/12 ~ 1/50{color} vs the current master version. The 
memory reduction is especially notable: in this shuffle mode Spark can handle 
more data with less memory. The detailed doc attached to this jira is named 
"SortShuffleReader on Spark 2.x".

I know the DataSet API will have better optimization and performance, but the 
RDD API may still be useful for flexible control and for old Spark/Hadoop jobs. 
For the better performance in ordering cases and the more cost-effective memory 
usage, maybe this PR is still worth merging into master.

I'll sort out the current code base and give a PR soon. Any comments and trying 
it out would be greatly appreciated.

> Add MR-style (merge-sort) SortShuffleReader for sort-based shuffle
> --
>
> Key: SPARK-2926
> URL: https://issues.apache.org/jira/browse/SPARK-2926
> Project: Spark
>  Issue Type: Improvement
>  Components: Shuffle
>Affects Versions: 1.1.0
>Reporter: Saisai Shao
>Assignee: Saisai Shao
> Attachments: SortBasedShuffleRead.pdf, Spark Shuffle Test 
> Report(contd).pdf, Spark Shuffle Test Report.pdf
>
>
> Currently Spark has already integrated sort-based shuffle write, which 
> greatly improves the IO performance and reduces the memory consumption when 
> the reducer number is very large. But the reducer side still adopts the 
> hash-based shuffle reader implementation, which neglects the ordering 
> attributes of the map output data in some situations.
> Here we propose an MR-style, sort-merge-like shuffle reader for sort-based 
> shuffle to further improve the performance of sort-based shuffle.
> Work-in-progress code and a performance test report will be posted later 
> when some unit test bugs are fixed.
> Any comments would be greatly appreciated.
> Thanks a lot.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-20928) SPIP: Continuous Processing Mode for Structured Streaming

2017-11-09 Thread Li Yuanjian (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-20928?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16245556#comment-16245556
 ] 

Li Yuanjian commented on SPARK-20928:
-

Our team discussed the design sketch in detail; we have some ideas and 
questions, written down below.
1. Will the window operation be supported in Continuous Processing Mode?
Even if we only consider narrow dependencies for now, as the design sketch 
describes, the exactly-once assurance may not be achievable based on the 
current implementation of window and watermark.
2. Should the EpochIDs be aligned in scenarios that are not map-only?
{quote}
The design can also work with blocking operators, although it’d require the 
blocking operators to ensure epoch markers from all the partitions have been 
received by the operator before moving forward to commit.
{quote}
Does `blocking operators` mean 'operators that need a shuffle'? We think that 
only operators with an ordering relation (like window\mapState\sortByKey) need 
the EpochIDs aligned; others (like groupBy) don't.
3. Also, in many-to-one scenarios (like shuffle and window), should we use a 
new EpochID in the shuffle read stage and on the window slide-out trigger, or 
use the original EpochID batch?

> SPIP: Continuous Processing Mode for Structured Streaming
> -
>
> Key: SPARK-20928
> URL: https://issues.apache.org/jira/browse/SPARK-20928
> Project: Spark
>  Issue Type: Improvement
>  Components: Structured Streaming
>Affects Versions: 2.2.0
>Reporter: Michael Armbrust
>  Labels: SPIP
> Attachments: Continuous Processing in Structured Streaming Design 
> Sketch.pdf
>
>
> Given the current Source API, the minimum possible latency for any record is 
> bounded by the amount of time that it takes to launch a task.  This 
> limitation is a result of the fact that {{getBatch}} requires us to know both 
> the starting and the ending offset, before any tasks are launched.  In the 
> worst case, the end-to-end latency is actually closer to the average batch 
> time + task launching time.
> For applications where latency is more important than exactly-once output 
> however, it would be useful if processing could happen continuously.  This 
> would allow us to achieve fully pipelined reading and writing from sources 
> such as Kafka.  This kind of architecture would make it possible to process 
> records with end-to-end latencies on the order of 1 ms, rather than the 
> 10-100ms that is possible today.
> One possible architecture here would be to change the Source API to look like 
> the following rough sketch:
> {code}
>   trait Epoch {
> def data: DataFrame
> /** The exclusive starting position for `data`. */
> def startOffset: Offset
> /** The inclusive ending position for `data`.  Incrementally updated 
> during processing, but not complete until execution of the query plan in 
> `data` is finished. */
> def endOffset: Offset
>   }
>   def getBatch(startOffset: Option[Offset], endOffset: Option[Offset], 
> limits: Limits): Epoch
> {code}
> The above would allow us to build an alternative implementation of 
> {{StreamExecution}} that processes continuously with much lower latency and 
> only stops processing when needing to reconfigure the stream (either due to a 
> failure or a user-requested change in parallelism).



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-22074) Task killed by other attempt task should not be resubmitted

2017-09-27 Thread Li Yuanjian (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-22074?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16182095#comment-16182095
 ] 

Li Yuanjian commented on SPARK-22074:
-

Yes, that's right.

> Task killed by other attempt task should not be resubmitted
> ---
>
> Key: SPARK-22074
> URL: https://issues.apache.org/jira/browse/SPARK-22074
> Project: Spark
>  Issue Type: Bug
>  Components: Spark Core
>Affects Versions: 2.1.0, 2.2.0
>Reporter: Li Yuanjian
>
> When a task is killed by another task attempt, the task is still resubmitted 
> when its executor is lost. With a certain probability this causes the stage 
> to hang forever because of the unnecessary resubmit (see the scenario 
> description below). Although the patch 
> https://issues.apache.org/jira/browse/SPARK-13931 can resolve the hanging 
> problem (thx [~GavinGavinNo1] :) ), the unnecessary resubmit should be 
> abandoned.
> Detail scenario description:
> 1. A ShuffleMapStage has many tasks, and some of them finish successfully.
> 2. An executor loss happens; this triggers a new TaskSet being resubmitted 
> that includes all missing partitions.
> 3. Before the resubmitted TaskSet completes, another executor, which only 
> holds the task that was killed by the other attempt, is lost; this triggers 
> the Resubmitted event, and the current stage's pendingPartitions is not empty.
> 4. The resubmitted TaskSet ends with shuffleMapStage.isAvailable == true, but 
> pendingPartitions is not empty, so we never step into submitWaitingChildStages.
> The key logs of this scenario are below:
> {noformat}
> 393332:17/09/11 13:45:24 [dag-scheduler-event-loop] INFO DAGScheduler: 
> Submitting 120 missing tasks from ShuffleMapStage 1046 
> (MapPartitionsRDD[5321] at rdd at AFDEntry.scala:116)
> 39:17/09/11 13:45:24 [dag-scheduler-event-loop] INFO 
> YarnClusterScheduler: Adding task set 1046.0 with 120 tasks
> 408766:17/09/11 13:46:25 [dispatcher-event-loop-5] INFO TaskSetManager: 
> Starting task 66.0 in stage 1046.0 (TID 110761, hidden-baidu-host.baidu.com, 
> executor 15, partition 66, PROCESS_LOCAL, 6237 bytes)
> [1] Executor 15 lost, task 66.0 and 90.0 on it
> 410532:17/09/11 13:46:32 [dispatcher-event-loop-47] INFO 
> YarnSchedulerBackend$YarnDriverEndpoint: Disabling executor 15.
> 410900:17/09/11 13:46:33 [dispatcher-event-loop-34] INFO TaskSetManager: 
> Starting task 66.1 in stage 1046.0 (TID 111400, hidden-baidu-host.baidu.com, 
> executor 70, partition 66, PROCESS_LOCAL, 6237 bytes)
> [2] Task 66.0 killed by 66.1
> 411315:17/09/11 13:46:37 [task-result-getter-2] INFO TaskSetManager: Killing 
> attempt 0 for task 66.0 in stage 1046.0 (TID 110761) on 
> hidden-baidu-host.baidu.com as the attempt 1 succeeded on 
> hidden-baidu-host.baidu.com
> 411316:17/09/11 13:46:37 [task-result-getter-2] INFO TaskSetManager: Finished 
> task 66.1 in stage 1046.0 (TID 111400) in 3545 ms on 
> hidden-baidu-host.baidu.com (executor 70) (115/120)
> [3] Executor 7 lost, task 0.0 72.0 7.0 on it
> 411390:17/09/11 13:46:37 [dispatcher-event-loop-24] INFO 
> YarnSchedulerBackend$YarnDriverEndpoint: Disabling executor 7.
> 416014:17/09/11 13:46:59 [dag-scheduler-event-loop] INFO DAGScheduler: 
> ShuffleMapStage 1046 (rdd at AFDEntry.scala:116) finished in 94.577 s
> [4] ShuffleMapStage 1046.0 finished, missing partition trigger resubmitted 
> 1046.1
> 416019:17/09/1 13:46:59 [dag-scheduler-event- oop] INFO DAGScheduler: 
> Resubmitting ShuffleMapStage 1046 (rdd at AFDEntry.scala:116) because some of 
> its tasks had failed: 0, 72, 79
> 416020:17/09/11 13:46:59 [dag-scheduler-event-loop] INFO DAGScheduler: 
> Submitting ShuffleMapStage 1046 (MapPartitionsRDD[5321] at rdd at 
> AFDEntry.scala:116), which has no missing parents
> 416030:17/09/11 13:46:59 [dag-scheduler-event-loop] INFO DAGScheduler: 
> Submitting 3 missing tasks from ShuffleMapStage 1046 (MapPartitionsRDD[5321] 
> at rdd at AFDEntry.scala:116)
> 416032:17/09/11 13:46:59 [dag-scheduler-event-loop] INFO 
> YarnClusterScheduler: Adding task set 1046.1 with 3 tasks
> 416034:17/09/11 13:46:59 [dispatcher-event-loop-21] INFO TaskSetManager: 
> Starting task 0.0 in stage 1046.1 (TID 112788, hidden-baidu-host.baidu.com, 
> executor 37, partition 0, PROCESS_LOCAL, 6237 bytes)
> 416037:17/09/11 13:46:59 [dispatcher-event-loop-23] INFO TaskSetManager: 
> Starting task 1.0 in stage 1046.1 (TID 112789, 
> yq01-inf-nmg01-spark03-20160817113538.yq01.baidu.com, executor 69, partition 
> 72, PROCESS_LOCAL, 6237 bytes)
> 416039:17/09/11 13:46:59 [dispatcher-event-loop-23] INFO TaskSetManager: 
> Starting task 2.0 in stage 1046.1 (TID 112790, hidden-baidu-host.baidu.com, 
> executor 26, partition 79, PROCESS_LOCAL, 6237 bytes)
> [5] ShuffleMapStage 1046.1 still running, the attempted task killed by other 
> trigger the Resubmitted event
> 416646:17/09/11 13:47:01 [dispatcher-event-loop-26] WARN 

[jira] [Comment Edited] (SPARK-22074) Task killed by other attempt task should not be resubmitted

2017-09-26 Thread Li Yuanjian (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-22074?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16181943#comment-16181943
 ] 

Li Yuanjian edited comment on SPARK-22074 at 9/27/17 4:46 AM:
--

Hi [~jerryshao] Saisai, task 66.0 was resubmitted because its executor was lost 
while 1046.1 was running. I also reproduced this in the 
UT (https://github.com/apache/spark/pull/19287/files#diff-8425e96a6c100b5f368b8e520ad80068R748)
 added in my patch, with a detailed scenario description in the comment; it 
will fail without the changes in this PR and pass with them. Could you help me 
check whether the UT recreates the scenario correctly? Thanks a lot. :)


was (Author: xuanyuan):
Hi [~jerryshao] saisai, the 66.0 resubmitted because of its executor lost 
during 1046.1 running. I also reproduce this in the UT added in my patch and 
add detailed scenario description in comment, it will fail without the changes 
in this PR and will pass conversely. Could you help me check the UT recreate 
the scenario right? Thanks a lot. :)

> Task killed by other attempt task should not be resubmitted
> ---
>
> Key: SPARK-22074
> URL: https://issues.apache.org/jira/browse/SPARK-22074
> Project: Spark
>  Issue Type: Bug
>  Components: Spark Core
>Affects Versions: 2.1.0, 2.2.0
>Reporter: Li Yuanjian
>
> When a task is killed by another task attempt, the task is still resubmitted 
> when its executor is lost. With a certain probability this causes the stage 
> to hang forever because of the unnecessary resubmit (see the scenario 
> description below). Although the patch 
> https://issues.apache.org/jira/browse/SPARK-13931 can resolve the hanging 
> problem (thx [~GavinGavinNo1] :) ), the unnecessary resubmit should be 
> abandoned.
> Detail scenario description:
> 1. A ShuffleMapStage has many tasks, and some of them finish successfully.
> 2. An executor loss happens; this triggers a new TaskSet being resubmitted 
> that includes all missing partitions.
> 3. Before the resubmitted TaskSet completes, another executor, which only 
> holds the task that was killed by the other attempt, is lost; this triggers 
> the Resubmitted event, and the current stage's pendingPartitions is not empty.
> 4. The resubmitted TaskSet ends with shuffleMapStage.isAvailable == true, but 
> pendingPartitions is not empty, so we never step into submitWaitingChildStages.
> The key logs of this scenario are below:
> {noformat}
> 393332:17/09/11 13:45:24 [dag-scheduler-event-loop] INFO DAGScheduler: 
> Submitting 120 missing tasks from ShuffleMapStage 1046 
> (MapPartitionsRDD[5321] at rdd at AFDEntry.scala:116)
> 39:17/09/11 13:45:24 [dag-scheduler-event-loop] INFO 
> YarnClusterScheduler: Adding task set 1046.0 with 120 tasks
> 408766:17/09/11 13:46:25 [dispatcher-event-loop-5] INFO TaskSetManager: 
> Starting task 66.0 in stage 1046.0 (TID 110761, hidden-baidu-host.baidu.com, 
> executor 15, partition 66, PROCESS_LOCAL, 6237 bytes)
> [1] Executor 15 lost, task 66.0 and 90.0 on it
> 410532:17/09/11 13:46:32 [dispatcher-event-loop-47] INFO 
> YarnSchedulerBackend$YarnDriverEndpoint: Disabling executor 15.
> 410900:17/09/11 13:46:33 [dispatcher-event-loop-34] INFO TaskSetManager: 
> Starting task 66.1 in stage 1046.0 (TID 111400, hidden-baidu-host.baidu.com, 
> executor 70, partition 66, PROCESS_LOCAL, 6237 bytes)
> [2] Task 66.0 killed by 66.1
> 411315:17/09/11 13:46:37 [task-result-getter-2] INFO TaskSetManager: Killing 
> attempt 0 for task 66.0 in stage 1046.0 (TID 110761) on 
> hidden-baidu-host.baidu.com as the attempt 1 succeeded on 
> hidden-baidu-host.baidu.com
> 411316:17/09/11 13:46:37 [task-result-getter-2] INFO TaskSetManager: Finished 
> task 66.1 in stage 1046.0 (TID 111400) in 3545 ms on 
> hidden-baidu-host.baidu.com (executor 70) (115/120)
> [3] Executor 7 lost, task 0.0 72.0 7.0 on it
> 411390:17/09/11 13:46:37 [dispatcher-event-loop-24] INFO 
> YarnSchedulerBackend$YarnDriverEndpoint: Disabling executor 7.
> 416014:17/09/11 13:46:59 [dag-scheduler-event-loop] INFO DAGScheduler: 
> ShuffleMapStage 1046 (rdd at AFDEntry.scala:116) finished in 94.577 s
> [4] ShuffleMapStage 1046.0 finished, missing partition trigger resubmitted 
> 1046.1
> 416019:17/09/1 13:46:59 [dag-scheduler-event- oop] INFO DAGScheduler: 
> Resubmitting ShuffleMapStage 1046 (rdd at AFDEntry.scala:116) because some of 
> its tasks had failed: 0, 72, 79
> 416020:17/09/11 13:46:59 [dag-scheduler-event-loop] INFO DAGScheduler: 
> Submitting ShuffleMapStage 1046 (MapPartitionsRDD[5321] at rdd at 
> AFDEntry.scala:116), which has no missing parents
> 416030:17/09/11 13:46:59 [dag-scheduler-event-loop] INFO DAGScheduler: 
> Submitting 3 missing tasks from ShuffleMapStage 1046 (MapPartitionsRDD[5321] 
> at rdd at AFDEntry.scala:116)
> 416032:17/09/11 13:46:59 [dag-scheduler-event-loop] INFO 
> YarnClusterScheduler: Adding task set 1046.1 with 3 

[jira] [Commented] (SPARK-22074) Task killed by other attempt task should not be resubmitted

2017-09-26 Thread Li Yuanjian (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-22074?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16181943#comment-16181943
 ] 

Li Yuanjian commented on SPARK-22074:
-

Hi [~jerryshao] Saisai, task 66.0 was resubmitted because its executor was lost 
while 1046.1 was running. I also reproduced this in the UT added in my patch, 
with a detailed scenario description in the comment; it will fail without the 
changes in this PR and pass with them. Could you help me check whether the UT 
recreates the scenario correctly? Thanks a lot. :)

> Task killed by other attempt task should not be resubmitted
> ---
>
> Key: SPARK-22074
> URL: https://issues.apache.org/jira/browse/SPARK-22074
> Project: Spark
>  Issue Type: Bug
>  Components: Spark Core
>Affects Versions: 2.1.0, 2.2.0
>Reporter: Li Yuanjian
>
> When a task is killed by another task attempt, the task is still resubmitted 
> when its executor is lost. With a certain probability this causes the stage 
> to hang forever because of the unnecessary resubmit (see the scenario 
> description below). Although the patch 
> https://issues.apache.org/jira/browse/SPARK-13931 can resolve the hanging 
> problem (thx [~GavinGavinNo1] :) ), the unnecessary resubmit should be 
> abandoned.
> Detail scenario description:
> 1. A ShuffleMapStage has many tasks, and some of them finish successfully.
> 2. An executor loss happens; this triggers a new TaskSet being resubmitted 
> that includes all missing partitions.
> 3. Before the resubmitted TaskSet completes, another executor, which only 
> holds the task that was killed by the other attempt, is lost; this triggers 
> the Resubmitted event, and the current stage's pendingPartitions is not empty.
> 4. The resubmitted TaskSet ends with shuffleMapStage.isAvailable == true, but 
> pendingPartitions is not empty, so we never step into submitWaitingChildStages.
> The key logs of this scenario are below:
> {noformat}
> 393332:17/09/11 13:45:24 [dag-scheduler-event-loop] INFO DAGScheduler: 
> Submitting 120 missing tasks from ShuffleMapStage 1046 
> (MapPartitionsRDD[5321] at rdd at AFDEntry.scala:116)
> 39:17/09/11 13:45:24 [dag-scheduler-event-loop] INFO 
> YarnClusterScheduler: Adding task set 1046.0 with 120 tasks
> 408766:17/09/11 13:46:25 [dispatcher-event-loop-5] INFO TaskSetManager: 
> Starting task 66.0 in stage 1046.0 (TID 110761, hidden-baidu-host.baidu.com, 
> executor 15, partition 66, PROCESS_LOCAL, 6237 bytes)
> [1] Executor 15 lost, task 66.0 and 90.0 on it
> 410532:17/09/11 13:46:32 [dispatcher-event-loop-47] INFO 
> YarnSchedulerBackend$YarnDriverEndpoint: Disabling executor 15.
> 410900:17/09/11 13:46:33 [dispatcher-event-loop-34] INFO TaskSetManager: 
> Starting task 66.1 in stage 1046.0 (TID 111400, hidden-baidu-host.baidu.com, 
> executor 70, partition 66, PROCESS_LOCAL, 6237 bytes)
> [2] Task 66.0 killed by 66.1
> 411315:17/09/11 13:46:37 [task-result-getter-2] INFO TaskSetManager: Killing 
> attempt 0 for task 66.0 in stage 1046.0 (TID 110761) on 
> hidden-baidu-host.baidu.com as the attempt 1 succeeded on 
> hidden-baidu-host.baidu.com
> 411316:17/09/11 13:46:37 [task-result-getter-2] INFO TaskSetManager: Finished 
> task 66.1 in stage 1046.0 (TID 111400) in 3545 ms on 
> hidden-baidu-host.baidu.com (executor 70) (115/120)
> [3] Executor 7 lost, task 0.0 72.0 7.0 on it
> 411390:17/09/11 13:46:37 [dispatcher-event-loop-24] INFO 
> YarnSchedulerBackend$YarnDriverEndpoint: Disabling executor 7.
> 416014:17/09/11 13:46:59 [dag-scheduler-event-loop] INFO DAGScheduler: 
> ShuffleMapStage 1046 (rdd at AFDEntry.scala:116) finished in 94.577 s
> [4] ShuffleMapStage 1046.0 finished, missing partition trigger resubmitted 
> 1046.1
> 416019:17/09/1 13:46:59 [dag-scheduler-event- oop] INFO DAGScheduler: 
> Resubmitting ShuffleMapStage 1046 (rdd at AFDEntry.scala:116) because some of 
> its tasks had failed: 0, 72, 79
> 416020:17/09/11 13:46:59 [dag-scheduler-event-loop] INFO DAGScheduler: 
> Submitting ShuffleMapStage 1046 (MapPartitionsRDD[5321] at rdd at 
> AFDEntry.scala:116), which has no missing parents
> 416030:17/09/11 13:46:59 [dag-scheduler-event-loop] INFO DAGScheduler: 
> Submitting 3 missing tasks from ShuffleMapStage 1046 (MapPartitionsRDD[5321] 
> at rdd at AFDEntry.scala:116)
> 416032:17/09/11 13:46:59 [dag-scheduler-event-loop] INFO 
> YarnClusterScheduler: Adding task set 1046.1 with 3 tasks
> 416034:17/09/11 13:46:59 [dispatcher-event-loop-21] INFO TaskSetManager: 
> Starting task 0.0 in stage 1046.1 (TID 112788, hidden-baidu-host.baidu.com, 
> executor 37, partition 0, PROCESS_LOCAL, 6237 bytes)
> 416037:17/09/11 13:46:59 [dispatcher-event-loop-23] INFO TaskSetManager: 
> Starting task 1.0 in stage 1046.1 (TID 112789, 
> yq01-inf-nmg01-spark03-20160817113538.yq01.baidu.com, executor 69, partition 
> 72, PROCESS_LOCAL, 6237 bytes)
> 416039:17/09/11 13:46:59 [dispatcher-event-loop-23] INFO 

[jira] [Commented] (SPARK-22074) Task killed by other attempt task should not be resubmitted

2017-09-26 Thread Li Yuanjian (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-22074?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16180652#comment-16180652
 ] 

Li Yuanjian commented on SPARK-22074:
-

Hi [~jerryshao], thanks for your comment.
In my scenario, 66.0 is indeed killed by 66.1; the root cause of 1046.0 failing 
to finish is that the resubmitted event for task 66.0 (killed by 66.1 before) 
arrived while 1046.1 was running.

> Task killed by other attempt task should not be resubmitted
> ---
>
> Key: SPARK-22074
> URL: https://issues.apache.org/jira/browse/SPARK-22074
> Project: Spark
>  Issue Type: Bug
>  Components: Spark Core
>Affects Versions: 2.1.0, 2.2.0
>Reporter: Li Yuanjian
>
> When a task is killed by another task attempt, the task is still resubmitted 
> when its executor is lost. With a certain probability this causes the stage 
> to hang forever because of the unnecessary resubmit (see the scenario 
> description below). Although the patch 
> https://issues.apache.org/jira/browse/SPARK-13931 can resolve the hanging 
> problem (thx [~GavinGavinNo1] :) ), the unnecessary resubmit should be 
> abandoned.
> Detail scenario description:
> 1. A ShuffleMapStage has many tasks, and some of them finish successfully.
> 2. An executor loss happens; this triggers a new TaskSet being resubmitted 
> that includes all missing partitions.
> 3. Before the resubmitted TaskSet completes, another executor, which only 
> holds the task that was killed by the other attempt, is lost; this triggers 
> the Resubmitted event, and the current stage's pendingPartitions is not empty.
> 4. The resubmitted TaskSet ends with shuffleMapStage.isAvailable == true, but 
> pendingPartitions is not empty, so we never step into submitWaitingChildStages.
> The key logs of this scenario are below:
> {noformat}
> 393332:17/09/11 13:45:24 [dag-scheduler-event-loop] INFO DAGScheduler: 
> Submitting 120 missing tasks from ShuffleMapStage 1046 
> (MapPartitionsRDD[5321] at rdd at AFDEntry.scala:116)
> 39:17/09/11 13:45:24 [dag-scheduler-event-loop] INFO 
> YarnClusterScheduler: Adding task set 1046.0 with 120 tasks
> 408766:17/09/11 13:46:25 [dispatcher-event-loop-5] INFO TaskSetManager: 
> Starting task 66.0 in stage 1046.0 (TID 110761, hidden-baidu-host.baidu.com, 
> executor 15, partition 66, PROCESS_LOCAL, 6237 bytes)
> [1] Executor 15 lost, task 66.0 and 90.0 on it
> 410532:17/09/11 13:46:32 [dispatcher-event-loop-47] INFO 
> YarnSchedulerBackend$YarnDriverEndpoint: Disabling executor 15.
> 410900:17/09/11 13:46:33 [dispatcher-event-loop-34] INFO TaskSetManager: 
> Starting task 66.1 in stage 1046.0 (TID 111400, hidden-baidu-host.baidu.com, 
> executor 70, partition 66, PROCESS_LOCAL, 6237 bytes)
> [2] Task 66.0 killed by 66.1
> 411315:17/09/11 13:46:37 [task-result-getter-2] INFO TaskSetManager: Killing 
> attempt 0 for task 66.0 in stage 1046.0 (TID 110761) on 
> hidden-baidu-host.baidu.com as the attempt 1 succeeded on 
> hidden-baidu-host.baidu.com
> 411316:17/09/11 13:46:37 [task-result-getter-2] INFO TaskSetManager: Finished 
> task 66.1 in stage 1046.0 (TID 111400) in 3545 ms on 
> hidden-baidu-host.baidu.com (executor 70) (115/120)
> [3] Executor 7 lost, task 0.0 72.0 7.0 on it
> 411390:17/09/11 13:46:37 [dispatcher-event-loop-24] INFO 
> YarnSchedulerBackend$YarnDriverEndpoint: Disabling executor 7.
> 416014:17/09/11 13:46:59 [dag-scheduler-event-loop] INFO DAGScheduler: 
> ShuffleMapStage 1046 (rdd at AFDEntry.scala:116) finished in 94.577 s
> [4] ShuffleMapStage 1046.0 finished, missing partition trigger resubmitted 
> 1046.1
> 416019:17/09/1 13:46:59 [dag-scheduler-event- oop] INFO DAGScheduler: 
> Resubmitting ShuffleMapStage 1046 (rdd at AFDEntry.scala:116) because some of 
> its tasks had failed: 0, 72, 79
> 416020:17/09/11 13:46:59 [dag-scheduler-event-loop] INFO DAGScheduler: 
> Submitting ShuffleMapStage 1046 (MapPartitionsRDD[5321] at rdd at 
> AFDEntry.scala:116), which has no missing parents
> 416030:17/09/11 13:46:59 [dag-scheduler-event-loop] INFO DAGScheduler: 
> Submitting 3 missing tasks from ShuffleMapStage 1046 (MapPartitionsRDD[5321] 
> at rdd at AFDEntry.scala:116)
> 416032:17/09/11 13:46:59 [dag-scheduler-event-loop] INFO 
> YarnClusterScheduler: Adding task set 1046.1 with 3 tasks
> 416034:17/09/11 13:46:59 [dispatcher-event-loop-21] INFO TaskSetManager: 
> Starting task 0.0 in stage 1046.1 (TID 112788, hidden-baidu-host.baidu.com, 
> executor 37, partition 0, PROCESS_LOCAL, 6237 bytes)
> 416037:17/09/11 13:46:59 [dispatcher-event-loop-23] INFO TaskSetManager: 
> Starting task 1.0 in stage 1046.1 (TID 112789, 
> yq01-inf-nmg01-spark03-20160817113538.yq01.baidu.com, executor 69, partition 
> 72, PROCESS_LOCAL, 6237 bytes)
> 416039:17/09/11 13:46:59 [dispatcher-event-loop-23] INFO TaskSetManager: 
> Starting task 2.0 in stage 1046.1 (TID 112790, hidden-baidu-host.baidu.com, 
> executor 26, 

[jira] [Updated] (SPARK-22074) Task killed by other attempt task should not be resubmitted

2017-09-19 Thread Li Yuanjian (JIRA)

 [ 
https://issues.apache.org/jira/browse/SPARK-22074?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Li Yuanjian updated SPARK-22074:

Description: 
When a task is killed by another task attempt, the task is still resubmitted 
when its executor is lost. With a certain probability this causes the stage to 
hang forever because of the unnecessary resubmit (see the scenario description 
below). Although the patch https://issues.apache.org/jira/browse/SPARK-13931 
can resolve the hanging problem (thx [~GavinGavinNo1] :) ), the unnecessary 
resubmit should be abandoned.


Detail scenario description:
1. A ShuffleMapStage has many tasks, and some of them finish successfully.
2. An executor loss happens; this triggers a new TaskSet being resubmitted that 
includes all missing partitions.
3. Before the resubmitted TaskSet completes, another executor, which only holds 
the task that was killed by the other attempt, is lost; this triggers the 
Resubmitted event, and the current stage's pendingPartitions is not empty.
4. The resubmitted TaskSet ends with shuffleMapStage.isAvailable == true, but 
pendingPartitions is not empty, so we never step into submitWaitingChildStages.

The key logs of this scenario are below:
{noformat}
393332:17/09/11 13:45:24 [dag-scheduler-event-loop] INFO DAGScheduler: 
Submitting 120 missing tasks from ShuffleMapStage 1046 (MapPartitionsRDD[5321] 
at rdd at AFDEntry.scala:116)
39:17/09/11 13:45:24 [dag-scheduler-event-loop] INFO YarnClusterScheduler: 
Adding task set 1046.0 with 120 tasks
408766:17/09/11 13:46:25 [dispatcher-event-loop-5] INFO TaskSetManager: 
Starting task 66.0 in stage 1046.0 (TID 110761, hidden-baidu-host.baidu.com, 
executor 15, partition 66, PROCESS_LOCAL, 6237 bytes)

[1] Executor 15 lost, task 66.0 and 90.0 on it

410532:17/09/11 13:46:32 [dispatcher-event-loop-47] INFO 
YarnSchedulerBackend$YarnDriverEndpoint: Disabling executor 15.
410900:17/09/11 13:46:33 [dispatcher-event-loop-34] INFO TaskSetManager: 
Starting task 66.1 in stage 1046.0 (TID 111400, hidden-baidu-host.baidu.com, 
executor 70, partition 66, PROCESS_LOCAL, 6237 bytes)

[2] Task 66.0 killed by 66.1

411315:17/09/11 13:46:37 [task-result-getter-2] INFO TaskSetManager: Killing 
attempt 0 for task 66.0 in stage 1046.0 (TID 110761) on 
hidden-baidu-host.baidu.com as the attempt 1 succeeded on 
hidden-baidu-host.baidu.com
411316:17/09/11 13:46:37 [task-result-getter-2] INFO TaskSetManager: Finished 
task 66.1 in stage 1046.0 (TID 111400) in 3545 ms on 
hidden-baidu-host.baidu.com (executor 70) (115/120)

[3] Executor 7 lost, task 0.0 72.0 7.0 on it

411390:17/09/11 13:46:37 [dispatcher-event-loop-24] INFO 
YarnSchedulerBackend$YarnDriverEndpoint: Disabling executor 7.
416014:17/09/11 13:46:59 [dag-scheduler-event-loop] INFO DAGScheduler: 
ShuffleMapStage 1046 (rdd at AFDEntry.scala:116) finished in 94.577 s

[4] ShuffleMapStage 1046.0 finished, missing partition trigger resubmitted 
1046.1

416019:17/09/1 13:46:59 [dag-scheduler-event- oop] INFO DAGScheduler: 
Resubmitting ShuffleMapStage 1046 (rdd at AFDEntry.scala:116) because some of 
its tasks had failed: 0, 72, 79
416020:17/09/11 13:46:59 [dag-scheduler-event-loop] INFO DAGScheduler: 
Submitting ShuffleMapStage 1046 (MapPartitionsRDD[5321] at rdd at 
AFDEntry.scala:116), which has no missing parents
416030:17/09/11 13:46:59 [dag-scheduler-event-loop] INFO DAGScheduler: 
Submitting 3 missing tasks from ShuffleMapStage 1046 (MapPartitionsRDD[5321] at 
rdd at AFDEntry.scala:116)
416032:17/09/11 13:46:59 [dag-scheduler-event-loop] INFO YarnClusterScheduler: 
Adding task set 1046.1 with 3 tasks
416034:17/09/11 13:46:59 [dispatcher-event-loop-21] INFO TaskSetManager: 
Starting task 0.0 in stage 1046.1 (TID 112788, hidden-baidu-host.baidu.com, 
executor 37, partition 0, PROCESS_LOCAL, 6237 bytes)
416037:17/09/11 13:46:59 [dispatcher-event-loop-23] INFO TaskSetManager: 
Starting task 1.0 in stage 1046.1 (TID 112789, 
yq01-inf-nmg01-spark03-20160817113538.yq01.baidu.com, executor 69, partition 
72, PROCESS_LOCAL, 6237 bytes)
416039:17/09/11 13:46:59 [dispatcher-event-loop-23] INFO TaskSetManager: 
Starting task 2.0 in stage 1046.1 (TID 112790, hidden-baidu-host.baidu.com, 
executor 26, partition 79, PROCESS_LOCAL, 6237 bytes)

[5] ShuffleMapStage 1046.1 still running, the attempted task killed by other 
trigger the Resubmitted event

416646:17/09/11 13:47:01 [dispatcher-event-loop-26] WARN TaskSetManager: Lost 
task 66.0 in stage 1046.0 (TID 110761, hidden-baidu-host.baidu.com, executor 
15): ExecutorLostFailure (executor 15 exited caused by one of the running 
tasks) Reason: Container marked as failed: 
container_1502719603300_158941_01_104857616 on host: 
hidden-baidu-host.baidu.com. Exit status: -100. Diagnostics: Container released 
on a *lost* node
416647:17/09/11 13:47:01 [dag-scheduler-event-loop] INFO DAGScheduler: 
Resubmitted ShuffleMapTask(1046, 66), so marking it as still running
416648:17/09/11 13:47:01 [dispatcher-event-loop-26] WARN 

[jira] [Created] (SPARK-22074) Task killed by other attempt task should not be resubmitted

2017-09-19 Thread Li Yuanjian (JIRA)
Li Yuanjian created SPARK-22074:
---

 Summary: Task killed by other attempt task should not be 
resubmitted
 Key: SPARK-22074
 URL: https://issues.apache.org/jira/browse/SPARK-22074
 Project: Spark
  Issue Type: Bug
  Components: Spark Core
Affects Versions: 2.2.0, 2.1.0
Reporter: Li Yuanjian


When a task has been killed because another attempt succeeded, the task is still 
resubmitted if its executor is later lost. With a certain probability this 
unnecessary resubmit causes the stage to hang forever (see the scenario 
description below). Although the patch 
https://issues.apache.org/jira/browse/SPARK-13931 can resolve the hanging 
problem, the unnecessary resubmit should still be abandoned.


Detailed scenario description:
1. A ShuffleMapStage has many tasks; some of them finish successfully.
2. An executor is lost, which triggers the resubmission of a new TaskSet that 
includes all missing partitions.
3. Before the resubmitted TaskSet completes, another executor is lost; it only 
hosted a task that had been killed by another attempt, yet the loss triggers a 
Resubmitted event, so the current stage's pendingPartitions is not empty.
4. The resubmitted TaskSet ends with shuffleMapStage.isAvailable == true, but 
pendingPartitions is still not empty, so the scheduler never steps into 
submitWaitingChildStages.
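
To make the intended behaviour concrete, here is a minimal, self-contained Scala 
sketch of the fix idea (illustration only; the names are made up and this is not 
the actual Spark patch): remember which task copies were killed only because 
another attempt succeeded, and skip them when an executor loss would otherwise 
resubmit them.
{code:scala}
// Sketch only: not the real TaskSetManager, just the bookkeeping the fix needs.
class SimpleTaskSetManager(numTasks: Int) {
  // partition index -> some attempt of this task finished successfully
  private val successful = new Array[Boolean](numTasks)
  // partition index -> the copy on this executor was killed only because
  // another attempt already succeeded elsewhere
  private val killedByOtherAttempt = new Array[Boolean](numTasks)

  def markSuccessful(index: Int): Unit = successful(index) = true
  def markKilledByOtherAttempt(index: Int): Unit = killedByOtherAttempt(index) = true

  /** Shuffle-map partitions that really need re-running after losing an executor. */
  def partitionsToResubmit(partitionsOnLostExecutor: Seq[Int]): Seq[Int] =
    partitionsOnLostExecutor.filter { index =>
      // A map task that succeeded on the lost executor normally must be re-run,
      // because its shuffle output lived on that executor. If that copy was only
      // killed because another attempt succeeded on a different executor, the
      // real output is still reachable, so it must NOT be resubmitted.
      successful(index) && !killedByOtherAttempt(index)
    }
}
{code}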

The key logs of this scenario are left below:
{noformat}
393332:17/09/11 13:45:24 [dag-scheduler-event-loop] INFO DAGScheduler: 
Submitting 120 missing tasks from ShuffleMapStage 1046 (MapPartitionsRDD[5321] 
at rdd at AFDEntry.scala:116)
39:17/09/11 13:45:24 [dag-scheduler-event-loop] INFO YarnClusterScheduler: 
Adding task set 1046.0 with 120 tasks
408766:17/09/11 13:46:25 [dispatcher-event-loop-5] INFO TaskSetManager: 
Starting task 66.0 in stage 1046.0 (TID 110761, hidden-baidu-host.baidu.com, 
executor 15, partition 66, PROCESS_LOCAL, 6237 bytes)

[1] Executor 15 lost; tasks 66.0 and 90.0 were running on it

410532:17/09/11 13:46:32 [dispatcher-event-loop-47] INFO 
YarnSchedulerBackend$YarnDriverEndpoint: Disabling executor 15.
410900:17/09/11 13:46:33 [dispatcher-event-loop-34] INFO TaskSetManager: 
Starting task 66.1 in stage 1046.0 (TID 111400, hidden-baidu-host.baidu.com, 
executor 70, partition 66, PROCESS_LOCAL, 6237 bytes)

[2] Task 66.0 killed by 66.1

411315:17/09/11 13:46:37 [task-result-getter-2] INFO TaskSetManager: Killing 
attempt 0 for task 66.0 in stage 1046.0 (TID 110761) on 
hidden-baidu-host.baidu.com as the attempt 1 succeeded on 
hidden-baidu-host.baidu.com
411316:17/09/11 13:46:37 [task-result-getter-2] INFO TaskSetManager: Finished 
task 66.1 in stage 1046.0 (TID 111400) in 3545 ms on 
hidden-baidu-host.baidu.com (executor 70) (115/120)

[3] Executor 7 lost; tasks 0.0, 72.0, and 79.0 were running on it

411390:17/09/11 13:46:37 [dispatcher-event-loop-24] INFO 
YarnSchedulerBackend$YarnDriverEndpoint: Disabling executor 7.
416014:17/09/11 13:46:59 [dag-scheduler-event-loop] INFO DAGScheduler: 
ShuffleMapStage 1046 (rdd at AFDEntry.scala:116) finished in 94.577 s

[4] Task set 1046.0 finished, but the missing partitions triggered a resubmit as 
task set 1046.1

416019:17/09/11 13:46:59 [dag-scheduler-event-loop] INFO DAGScheduler: 
Resubmitting ShuffleMapStage 1046 (rdd at AFDEntry.scala:116) because some of 
its tasks had failed: 0, 72, 79
416020:17/09/11 13:46:59 [dag-scheduler-event-loop] INFO DAGScheduler: 
Submitting ShuffleMapStage 1046 (MapPartitionsRDD[5321] at rdd at 
AFDEntry.scala:116), which has no missing parents
416030:17/09/11 13:46:59 [dag-scheduler-event-loop] INFO DAGScheduler: 
Submitting 3 missing tasks from ShuffleMapStage 1046 (MapPartitionsRDD[5321] at 
rdd at AFDEntry.scala:116)
416032:17/09/11 13:46:59 [dag-scheduler-event-loop] INFO YarnClusterScheduler: 
Adding task set 1046.1 with 3 tasks
416034:17/09/11 13:46:59 [dispatcher-event-loop-21] INFO TaskSetManager: 
Starting task 0.0 in stage 1046.1 (TID 112788, hidden-baidu-host.baidu.com, 
executor 37, partition 0, PROCESS_LOCAL, 6237 bytes)
416037:17/09/11 13:46:59 [dispatcher-event-loop-23] INFO TaskSetManager: 
Starting task 1.0 in stage 1046.1 (TID 112789, 
yq01-inf-nmg01-spark03-20160817113538.yq01.baidu.com, executor 69, partition 
72, PROCESS_LOCAL, 6237 bytes)
416039:17/09/11 13:46:59 [dispatcher-event-loop-23] INFO TaskSetManager: 
Starting task 2.0 in stage 1046.1 (TID 112790, hidden-baidu-host.baidu.com, 
executor 26, partition 79, PROCESS_LOCAL, 6237 bytes)

[5] While task set 1046.1 is still running, the task attempt that was killed by 
another attempt triggers the Resubmitted event

416646:17/09/11 13:47:01 [dispatcher-event-loop-26] WARN TaskSetManager: Lost 
task 66.0 in stage 1046.0 (TID 110761, hidden-baidu-host.baidu.com, executor 
15): ExecutorLostFailure (executor 15 exited caused by one of the running 
tasks) Reason: Container marked as failed: 
container_1502719603300_158941_01_104857616 on host: 
hidden-baidu-host.baidu.com. Exit status: -100. Diagnostics: Container released 
on a *lost* node
416647:17/09/11 13:47:01 [dag-scheduler-event-loop] INFO DAGScheduler: 
Resubmitted ShuffleMapTask(1046, 66), so marking it as still running
{noformat}

[jira] [Commented] (SPARK-18838) High latency of event processing for large jobs

2017-08-08 Thread Li Yuanjian (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-18838?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16119265#comment-16119265
 ] 

Li Yuanjian commented on SPARK-18838:
-

I'm facing the same problem as [~milesc]:
{quote}
We do not use dynamic allocation, and our applications frequently hang 
completely after seeing this log message, not just the UI.
{quote}
Should we solve these problems by dividing them into two parts? The first is to 
ensure we do not drop events, and the second, as discussed above, is to improve 
performance in several ways. We are trying the first part in SPARK-21560, and 
the corresponding PR is being tested in our pre-online env. Do you have any 
suggestions on that? [~vanzin] Thanks.

> High latency of event processing for large jobs
> ---
>
> Key: SPARK-18838
> URL: https://issues.apache.org/jira/browse/SPARK-18838
> Project: Spark
>  Issue Type: Improvement
>Affects Versions: 2.0.0
>Reporter: Sital Kedia
> Attachments: perfResults.pdf, SparkListernerComputeTime.xlsx
>
>
> Currently we are observing the issue of very high event processing delay in 
> driver's `ListenerBus` for large jobs with many tasks. Many critical 
> components of the scheduler, like `ExecutorAllocationManager` and 
> `HeartbeatReceiver`, depend on `ListenerBus` events, and this delay might 
> hurt the job performance significantly or even fail the job.  For example, a 
> significant delay in receiving the `SparkListenerTaskStart` might cause 
> `ExecutorAllocationManager` manager to mistakenly remove an executor which is 
> not idle.  
> The problem is that the event processor in `ListenerBus` is a single thread 
> which loops through all the Listeners for each event and processes each event 
> synchronously 
> https://github.com/apache/spark/blob/master/core/src/main/scala/org/apache/spark/scheduler/LiveListenerBus.scala#L94.
>  This single threaded processor often becomes the bottleneck for large jobs.  
> Also, if one of the Listeners is very slow, all the listeners will pay the 
> price of the delay incurred by the slow listener. In addition, a slow 
> listener can cause events to be dropped from the event queue, which might be 
> fatal to the job.
> To solve the above problems, we propose to get rid of the event queue and the 
> single threaded event processor. Instead, each listener will have its own 
> dedicated single-threaded executor service. Whenever an event is posted, it 
> will be submitted to the executor service of every listener. The 
> single-threaded executor service will guarantee in-order processing of the 
> events per listener. The queue used for the executor service will be bounded 
> to guarantee we do not grow the memory indefinitely. The downside of this 
> approach is that a separate event queue per listener will increase the 
> driver memory footprint. 
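
For illustration, here is a minimal Scala sketch of that per-listener design, 
under the stated assumptions of a bounded queue per listener; the names 
(Listener, PerListenerBus) are hypothetical and this is not an actual Spark API.
{code:scala}
import java.util.concurrent.{ExecutorService, LinkedBlockingQueue, ThreadPoolExecutor, TimeUnit}

trait Listener[E] { def onEvent(event: E): Unit }

// Sketch only: each listener gets its own single-threaded executor with a
// bounded queue, so a slow listener delays (or drops) only its own events
// instead of stalling one shared processing thread.
class PerListenerBus[E](listeners: Seq[Listener[E]], queueCapacity: Int = 10000) {
  private val executors: Map[Listener[E], ExecutorService] =
    listeners.map { l =>
      l -> new ThreadPoolExecutor(
        1, 1, 0L, TimeUnit.MILLISECONDS,
        new LinkedBlockingQueue[Runnable](queueCapacity),   // bounded per-listener queue
        new ThreadPoolExecutor.DiscardPolicy())             // drop when this queue is full
    }.toMap

  // hand every event to each listener's own executor; ordering per listener is
  // preserved because each executor has exactly one worker thread
  def post(event: E): Unit =
    executors.foreach { case (listener, exec) =>
      exec.execute(new Runnable { def run(): Unit = listener.onEvent(event) })
    }

  def stop(): Unit = executors.values.foreach(_.shutdown())
}
{code}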



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-21560) Add hold mode for the LiveListenerBus

2017-07-28 Thread Li Yuanjian (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-21560?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16106030#comment-16106030
 ] 

Li Yuanjian commented on SPARK-21560:
-

Something went wrong with the sync between the PR and JIRA; the PR corresponding 
to this issue is: https://github.com/apache/spark/pull/18760

> Add hold mode for the LiveListenerBus
> -
>
> Key: SPARK-21560
> URL: https://issues.apache.org/jira/browse/SPARK-21560
> Project: Spark
>  Issue Type: Improvement
>  Components: Spark Core
>Affects Versions: 2.1.0, 2.2.0
>Reporter: Li Yuanjian
>
> As noted in the comments of SPARK-18838, we also face the problem of critical 
> events being dropped while the event queue is full. 
> There's no doubt that improving the performance of the processing thread is 
> important, whether the solution is multithreading or others like 
> SPARK-20776, but we may still need a hold strategy when the event queue 
> is full, resuming after some room is released. Whether the hold strategy is 
> enabled, and the empty-rate threshold, should both be configurable.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Created] (SPARK-21560) Add hold mode for the LiveListenerBus

2017-07-28 Thread Li Yuanjian (JIRA)
Li Yuanjian created SPARK-21560:
---

 Summary: Add hold mode for the LiveListenerBus
 Key: SPARK-21560
 URL: https://issues.apache.org/jira/browse/SPARK-21560
 Project: Spark
  Issue Type: Improvement
  Components: Spark Core
Affects Versions: 2.2.0, 2.1.0
Reporter: Li Yuanjian


As noted in the comments of SPARK-18838, we also face the problem of critical 
events being dropped while the event queue is full. 
There's no doubt that improving the performance of the processing thread is 
important, whether the solution is multithreading or others like SPARK-20776, 
but we may still need a hold strategy when the event queue is full, resuming 
after some room is released. Whether the hold strategy is enabled, and the 
empty-rate threshold, should both be configurable.
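
As an illustration of the proposal, a minimal sketch of such a hold strategy 
follows, assuming a bounded queue and a configurable resume threshold; the class 
and parameter names are made up, and this is not Spark's actual LiveListenerBus.
{code:scala}
import java.util.concurrent.ArrayBlockingQueue

// Sketch only: when holdMode is on, post() blocks instead of dropping events;
// the single consumer thread wakes producers up once the queue has drained
// below emptyRate * capacity.
class HoldableEventQueue[E](capacity: Int, holdMode: Boolean, emptyRate: Double) {
  private val queue = new ArrayBlockingQueue[E](capacity)
  private val resumeThreshold = (capacity * emptyRate).toInt
  private val lock = new Object

  /** Returns false only when the event was dropped (hold mode disabled). */
  def post(event: E): Boolean = {
    if (queue.offer(event)) return true          // fast path: room available
    if (!holdMode) return false                  // legacy behaviour: drop the event
    lock.synchronized {                          // hold: wait until room is released
      while (!queue.offer(event)) lock.wait()
    }
    true
  }

  /** Called by the single consumer thread. */
  def take(): E = {
    val event = queue.take()
    if (queue.size <= resumeThreshold) lock.synchronized(lock.notifyAll())
    event
  }
}
{code}
Whether blocking the posting threads is acceptable depends on who posts the 
events, which is exactly the trade-off the configuration flag would control.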



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-21435) Empty files should be skipped while write to file

2017-07-17 Thread Li Yuanjian (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-21435?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16089442#comment-16089442
 ] 

Li Yuanjian commented on SPARK-21435:
-

[~sowen] I tested the patch in our scenario and added a UT; I think it can skip 
empty files for Parquet.

> Empty files should be skipped while write to file
> -
>
> Key: SPARK-21435
> URL: https://issues.apache.org/jira/browse/SPARK-21435
> Project: Spark
>  Issue Type: Improvement
>  Components: SQL
>Affects Versions: 2.2.0
>Reporter: Li Yuanjian
>Priority: Minor
>
> Consider this scenario: the source table has many partitions and data files, 
> and after the query filter only a little data is written to the destination dir.
> In this case the destination dir or table will contain many empty files, or 
> files that only hold schema metadata (Parquet format). We could use coalesce, 
> but skipping the empty files may be better in this case.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Created] (SPARK-21435) Empty files should be skipped while write to file

2017-07-17 Thread Li Yuanjian (JIRA)
Li Yuanjian created SPARK-21435:
---

 Summary: Empty files should be skipped while write to file
 Key: SPARK-21435
 URL: https://issues.apache.org/jira/browse/SPARK-21435
 Project: Spark
  Issue Type: Improvement
  Components: SQL
Affects Versions: 2.2.0
Reporter: Li Yuanjian


Consider this scenario: the source table has many partitions and data files, 
and after the query filter only a little data is written to the destination dir.
In this case the destination dir or table will contain many empty files, or 
files that only hold schema metadata (Parquet format). We could use coalesce, 
but skipping the empty files may be better in this case.
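
As an illustration of the idea, here is a minimal sketch that creates the output 
file lazily, only when the partition produces its first row, so empty partitions 
never leave a file behind. The helper below is hypothetical and is not the 
actual Spark writer code.
{code:scala}
import java.io.{File, PrintWriter}

// Sketch only: write a partition's rows to a file, but open the file lazily on
// the first row so that an empty partition produces no output file at all.
def writePartition(rows: Iterator[String], outputPath: String): Unit = {
  var writer: PrintWriter = null
  try {
    rows.foreach { row =>
      if (writer == null) writer = new PrintWriter(new File(outputPath))
      writer.println(row)
    }
  } finally {
    if (writer != null) writer.close()
  }
}
{code}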



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Updated] (SPARK-20408) Get glob path in parallel to reduce resolve relation time

2017-04-20 Thread Li Yuanjian (JIRA)

 [ 
https://issues.apache.org/jira/browse/SPARK-20408?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Li Yuanjian updated SPARK-20408:

Summary: Get glob path in parallel to reduce resolve relation time  (was: 
Get glob path in parallel to boost resolve relation time)

> Get glob path in parallel to reduce resolve relation time
> -
>
> Key: SPARK-20408
> URL: https://issues.apache.org/jira/browse/SPARK-20408
> Project: Spark
>  Issue Type: Improvement
>  Components: SQL
>Affects Versions: 2.1.0
>Reporter: Li Yuanjian
> Fix For: 2.2.0
>
>
> A datasource read from a path with wildcards like the one below will cause a 
> long wait (each star may represent 100~1000 files or dirs), especially in a 
> cross-region env where the driver and HDFS are in different regions, which 
> enlarges the drawback.
> bq. spark.read.text("/log/product/201704/\*/\*/\*/\*")
> The optimization strategy is the same as bulkListLeafFiles in 
> InMemoryFileIndex: resolve the wildcard paths in parallel.



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Created] (SPARK-20408) Get glob path in parallel to boost resolve relation time

2017-04-20 Thread Li Yuanjian (JIRA)
Li Yuanjian created SPARK-20408:
---

 Summary: Get glob path in parallel to boost resolve relation time
 Key: SPARK-20408
 URL: https://issues.apache.org/jira/browse/SPARK-20408
 Project: Spark
  Issue Type: Improvement
  Components: SQL
Affects Versions: 2.1.0
Reporter: Li Yuanjian
 Fix For: 2.2.0


A datasource read from a path with wildcards like the one below will cause a 
long wait (each star may represent 100~1000 files or dirs), especially in a 
cross-region env where the driver and HDFS are in different regions, which 
enlarges the drawback.
bq. spark.read.text("/log/product/201704/\*/\*/\*/\*")
The optimization strategy is the same as bulkListLeafFiles in 
InMemoryFileIndex: resolve the wildcard paths in parallel.
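
A minimal sketch of that strategy, assuming Hadoop's FileSystem.globStatus and 
an illustrative thread-pool size; this is a sketch of the approach, not the 
actual Spark change.
{code:scala}
import java.util.concurrent.Executors

import scala.concurrent.{Await, ExecutionContext, Future}
import scala.concurrent.duration.Duration

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileStatus, Path}

// Sketch only: resolve several glob patterns against HDFS in parallel instead
// of sequentially, so slow cross-region namenode round-trips overlap.
def parallelGlob(patterns: Seq[String], hadoopConf: Configuration): Seq[FileStatus] = {
  val pool = Executors.newFixedThreadPool(math.max(1, math.min(patterns.size, 8)))
  implicit val ec: ExecutionContext = ExecutionContext.fromExecutorService(pool)
  try {
    val futures = patterns.map { pattern =>
      Future {
        val path = new Path(pattern)
        val fs = path.getFileSystem(hadoopConf)
        // globStatus returns null when nothing matches
        Option(fs.globStatus(path)).map(_.toSeq).getOrElse(Seq.empty)
      }
    }
    Await.result(Future.sequence(futures), Duration.Inf).flatten
  } finally {
    pool.shutdown()
  }
}
{code}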



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Updated] (SPARK-18700) getCached in HiveMetastoreCatalog not thread safe cause driver OOM

2016-12-19 Thread Li Yuanjian (JIRA)

 [ 
https://issues.apache.org/jira/browse/SPARK-18700?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Li Yuanjian updated SPARK-18700:

Affects Version/s: 2.1.1

> getCached in HiveMetastoreCatalog not thread safe cause driver OOM
> --
>
> Key: SPARK-18700
> URL: https://issues.apache.org/jira/browse/SPARK-18700
> Project: Spark
>  Issue Type: Bug
>  Components: SQL
>Affects Versions: 1.6.1, 2.0.0, 2.1.1
>Reporter: Li Yuanjian
>
> In our Spark SQL platform, each query uses the same HiveContext in an 
> independent thread, and new data is appended to tables as new partitions 
> every 30min. After a new partition is added to table T, we must call 
> refreshTable to clear T’s cache in cachedDataSourceTables to make the new 
> partition searchable. 
> For tables with many partitions and files (far more than 
> spark.sql.sources.parallelPartitionDiscovery.threshold), a new query of 
> table T starts a job to fetch all FileStatus objects in the listLeafFiles 
> function. Because of the huge number of files, the job runs for several 
> seconds; during that time, new queries of table T also start new jobs to 
> fetch FileStatus, because getCached is not thread safe. This finally causes 
> a driver OOM.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Comment Edited] (SPARK-18700) getCached in HiveMetastoreCatalog not thread safe cause driver OOM

2016-12-08 Thread Li Yuanjian (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-18700?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15718580#comment-15718580
 ] 

Li Yuanjian edited comment on SPARK-18700 at 12/9/16 4:47 AM:
--

Opened a PR for this: add a StripedLock for each table's relation in the cache, 
rather than locking the whole cachedDataSourceTables.
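
A rough sketch of that per-table striped-lock idea, assuming Guava's Striped 
locks and a simple map-based cache; the names are illustrative and this is not 
the actual HiveMetastoreCatalog code.
{code:scala}
import java.util.concurrent.ConcurrentHashMap

import com.google.common.util.concurrent.Striped

// Sketch only: guard the expensive per-table load with a lock striped by table
// name, so concurrent queries of the same table compute the relation once while
// queries of different tables stay unblocked.
class StripedRelationCache[V <: AnyRef](loadRelation: String => V) {
  private val cache = new ConcurrentHashMap[String, V]()
  private val locks = Striped.lazyWeakLock(64)

  def getCached(tableName: String): V = {
    val cached = cache.get(tableName)
    if (cached != null) return cached
    val lock = locks.get(tableName)
    lock.lock()
    try {
      // re-check under the lock: another thread may have loaded it already
      val again = cache.get(tableName)
      if (again != null) again
      else {
        val relation = loadRelation(tableName)   // e.g. the expensive listLeafFiles job
        cache.put(tableName, relation)
        relation
      }
    } finally {
      lock.unlock()
    }
  }
}
{code}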



was (Author: xuanyuan):
I'll add a PR for this soon: add a ReadWriteLock for each table's relation in 
the cache, not for the whole cachedDataSourceTables.


> getCached in HiveMetastoreCatalog not thread safe cause driver OOM
> --
>
> Key: SPARK-18700
> URL: https://issues.apache.org/jira/browse/SPARK-18700
> Project: Spark
>  Issue Type: Bug
>  Components: SQL
>Affects Versions: 1.6.1, 2.0.0
>Reporter: Li Yuanjian
>
> In our Spark SQL platform, each query uses the same HiveContext in an 
> independent thread, and new data is appended to tables as new partitions 
> every 30min. After a new partition is added to table T, we must call 
> refreshTable to clear T’s cache in cachedDataSourceTables to make the new 
> partition searchable. 
> For tables with many partitions and files (far more than 
> spark.sql.sources.parallelPartitionDiscovery.threshold), a new query of 
> table T starts a job to fetch all FileStatus objects in the listLeafFiles 
> function. Because of the huge number of files, the job runs for several 
> seconds; during that time, new queries of table T also start new jobs to 
> fetch FileStatus, because getCached is not thread safe. This finally causes 
> a driver OOM.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-18700) getCached in HiveMetastoreCatalog not thread safe cause driver OOM

2016-12-03 Thread Li Yuanjian (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-18700?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15718580#comment-15718580
 ] 

Li Yuanjian commented on SPARK-18700:
-

I'll add a PR for this soon: add a ReadWriteLock for each table's relation in 
the cache, not for the whole cachedDataSourceTables.


> getCached in HiveMetastoreCatalog not thread safe cause driver OOM
> --
>
> Key: SPARK-18700
> URL: https://issues.apache.org/jira/browse/SPARK-18700
> Project: Spark
>  Issue Type: Bug
>  Components: SQL
>Affects Versions: 1.6.1, 2.0.0
>Reporter: Li Yuanjian
>
> In our Spark SQL platform, each query uses the same HiveContext in an 
> independent thread, and new data is appended to tables as new partitions 
> every 30min. After a new partition is added to table T, we must call 
> refreshTable to clear T’s cache in cachedDataSourceTables to make the new 
> partition searchable. 
> For tables with many partitions and files (far more than 
> spark.sql.sources.parallelPartitionDiscovery.threshold), a new query of 
> table T starts a job to fetch all FileStatus objects in the listLeafFiles 
> function. Because of the huge number of files, the job runs for several 
> seconds; during that time, new queries of table T also start new jobs to 
> fetch FileStatus, because getCached is not thread safe. This finally causes 
> a driver OOM.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Updated] (SPARK-18700) getCached in HiveMetastoreCatalog not thread safe cause driver OOM

2016-12-03 Thread Li Yuanjian (JIRA)

 [ 
https://issues.apache.org/jira/browse/SPARK-18700?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Li Yuanjian updated SPARK-18700:

Description: 
In our Spark SQL platform, each query uses the same HiveContext in an 
independent thread, and new data is appended to tables as new partitions every 
30min. After a new partition is added to table T, we must call refreshTable to 
clear T’s cache in cachedDataSourceTables to make the new partition searchable. 
For tables with many partitions and files (far more than 
spark.sql.sources.parallelPartitionDiscovery.threshold), a new query of table T 
starts a job to fetch all FileStatus objects in the listLeafFiles function. 
Because of the huge number of files, the job runs for several seconds; during 
that time, new queries of table T also start new jobs to fetch FileStatus, 
because getCached is not thread safe. This finally causes a driver OOM.

  was:
In our Spark SQL platform, each query uses the same HiveContext in an 
independent thread, and new data is appended to tables as new partitions every 
30min. After a new partition is added to table T, we must call refreshTable to 
clear T’s cache in cachedDataSourceTables
to make the new partition searchable. 
For tables with many partitions and files (far more than 
spark.sql.sources.parallelPartitionDiscovery.threshold), a new query of table T 
starts a job to fetch all FileStatus objects in the listLeafFiles function. 
Because of the huge number of files, the job runs for several seconds; during 
that time, new queries of table T also start new jobs to fetch FileStatus, 
because getCached is not thread safe. This finally causes a driver OOM.


> getCached in HiveMetastoreCatalog not thread safe cause driver OOM
> --
>
> Key: SPARK-18700
> URL: https://issues.apache.org/jira/browse/SPARK-18700
> Project: Spark
>  Issue Type: Bug
>  Components: SQL
>Affects Versions: 1.6.1, 2.0.0
>Reporter: Li Yuanjian
>
> In our Spark SQL platform, each query uses the same HiveContext in an 
> independent thread, and new data is appended to tables as new partitions 
> every 30min. After a new partition is added to table T, we must call 
> refreshTable to clear T’s cache in cachedDataSourceTables to make the new 
> partition searchable. 
> For tables with many partitions and files (far more than 
> spark.sql.sources.parallelPartitionDiscovery.threshold), a new query of 
> table T starts a job to fetch all FileStatus objects in the listLeafFiles 
> function. Because of the huge number of files, the job runs for several 
> seconds; during that time, new queries of table T also start new jobs to 
> fetch FileStatus, because getCached is not thread safe. This finally causes 
> a driver OOM.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Created] (SPARK-18700) getCached in HiveMetastoreCatalog not thread safe cause driver OOM

2016-12-03 Thread Li Yuanjian (JIRA)
Li Yuanjian created SPARK-18700:
---

 Summary: getCached in HiveMetastoreCatalog not thread safe cause 
driver OOM
 Key: SPARK-18700
 URL: https://issues.apache.org/jira/browse/SPARK-18700
 Project: Spark
  Issue Type: Bug
  Components: SQL
Affects Versions: 2.0.0, 1.6.1
Reporter: Li Yuanjian


In our Spark SQL platform, each query uses the same HiveContext in an 
independent thread, and new data is appended to tables as new partitions every 
30min. After a new partition is added to table T, we must call refreshTable to 
clear T’s cache in cachedDataSourceTables to make the new partition searchable. 
For tables with many partitions and files (far more than 
spark.sql.sources.parallelPartitionDiscovery.threshold), a new query of table T 
starts a job to fetch all FileStatus objects in the listLeafFiles function. 
Because of the huge number of files, the job runs for several seconds; during 
that time, new queries of table T also start new jobs to fetch FileStatus, 
because getCached is not thread safe. This finally causes a driver OOM.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org