[jira] [Commented] (SPARK-24882) separate responsibilities of the data source v2 read API

2018-07-25 Thread Ryan Blue (JIRA)


[ 
https://issues.apache.org/jira/browse/SPARK-24882?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16556896#comment-16556896
 ] 

Ryan Blue commented on SPARK-24882:
---

Sounds fine, but it's getting close and I wouldn't be surprised if it didn't 
make it. I'll make sure I spend some time after you respond to my questions on 
the doc. Right now, I think it could probably use a couple of iterations to 
clean up the interfaces and remove duplication. If we can get that done in a 
week, then great.

> separate responsibilities of the data source v2 read API
> 
>
> Key: SPARK-24882
> URL: https://issues.apache.org/jira/browse/SPARK-24882
> Project: Spark
>  Issue Type: Improvement
>  Components: SQL
>Affects Versions: 2.4.0
>Reporter: Wenchen Fan
>Assignee: Wenchen Fan
>Priority: Major
>
> Data source V2 has been out for a while; see the SPIP 
> [here|https://docs.google.com/document/d/1n_vUVbF4KD3gxTmkNEon5qdQ-Z8qU5Frf6WMQZ6jJVM/edit?usp=sharing].
>  We have already migrated most of the built-in streaming data sources to the 
> V2 API, and the file source migration is in progress. During the migration, 
> we found several problems and want to address them before we stabilize the V2 
> API.
> To solve these problems, we need to separate responsibilities in the data 
> source v2 read API. For details, please see the attached Google doc: 
> https://docs.google.com/document/d/1DDXCTCrup4bKWByTalkXWgavcPdvur8a4eEu8x1BzPM/edit?usp=sharing



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-24882) separate responsibilities of the data source v2 read API

2018-07-25 Thread Wenchen Fan (JIRA)


[ 
https://issues.apache.org/jira/browse/SPARK-24882?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16556885#comment-16556885
 ] 

Wenchen Fan commented on SPARK-24882:
-

We don't need to rush this for 2.4, but it would be great if everything goes 
well and we make it into 2.4.

> separate responsibilities of the data source v2 read API
> 
>
> Key: SPARK-24882
> URL: https://issues.apache.org/jira/browse/SPARK-24882
> Project: Spark
>  Issue Type: Improvement
>  Components: SQL
>Affects Versions: 2.4.0
>Reporter: Wenchen Fan
>Assignee: Wenchen Fan
>Priority: Major
>
> Data source V2 has been out for a while; see the SPIP 
> [here|https://docs.google.com/document/d/1n_vUVbF4KD3gxTmkNEon5qdQ-Z8qU5Frf6WMQZ6jJVM/edit?usp=sharing].
>  We have already migrated most of the built-in streaming data sources to the 
> V2 API, and the file source migration is in progress. During the migration, 
> we found several problems and want to address them before we stabilize the V2 
> API.
> To solve these problems, we need to separate responsibilities in the data 
> source v2 read API. For details, please see the attached Google doc: 
> https://docs.google.com/document/d/1DDXCTCrup4bKWByTalkXWgavcPdvur8a4eEu8x1BzPM/edit?usp=sharing



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Updated] (SPARK-24374) SPIP: Support Barrier Execution Mode in Apache Spark

2018-07-25 Thread Xiao Li (JIRA)


 [ 
https://issues.apache.org/jira/browse/SPARK-24374?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Xiao Li updated SPARK-24374:

Affects Version/s: (was: 3.0.0)
   2.4.0

> SPIP: Support Barrier Execution Mode in Apache Spark
> 
>
> Key: SPARK-24374
> URL: https://issues.apache.org/jira/browse/SPARK-24374
> Project: Spark
>  Issue Type: Epic
>  Components: ML, Spark Core
>Affects Versions: 2.4.0
>Reporter: Xiangrui Meng
>Assignee: Xiangrui Meng
>Priority: Major
>  Labels: Hydrogen, SPIP
> Attachments: SPIP_ Support Barrier Scheduling in Apache Spark.pdf
>
>
> (See details in the linked/attached SPIP doc.)
> {quote}
> The proposal here is to add a new scheduling model to Apache Spark so users 
> can properly embed distributed DL training as a Spark stage to simplify the 
> distributed training workflow. For example, Horovod uses MPI to implement 
> all-reduce to accelerate distributed TensorFlow training. The computation 
> model is different from MapReduce used by Spark. In Spark, a task in a stage 
> doesn’t depend on any other tasks in the same stage, and hence it can be 
> scheduled independently. In MPI, all workers start at the same time and pass 
> messages around. To embed this workload in Spark, we need to introduce a new 
> scheduling model, tentatively named “barrier scheduling”, which launches 
> tasks at the same time and provides users enough information and tooling to 
> embed distributed DL training. Spark can also provide an extra layer of fault 
> tolerance in case some tasks fail in the middle, where Spark would abort 
> all tasks and restart the stage.
> {quote}
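
For illustration, a minimal sketch of the kind of user-facing barrier API the 
SPIP describes; the names rdd.barrier() and BarrierTaskContext come from the 
Spark 2.4 work and are shown only as an example, not as the SPIP's final design:

{code}
import org.apache.spark.BarrierTaskContext

// All tasks in this stage are launched together; each waits at a global barrier
// before proceeding, which is what an MPI-style step such as all-reduce needs.
val rdd = sc.parallelize(1 to 100, 4)
rdd.barrier().mapPartitions { iter =>
  val ctx = BarrierTaskContext.get()
  ctx.barrier() // wait until every task in the stage reaches this point
  iter          // the distributed training step would run here
}.collect()
{code}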



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Comment Edited] (SPARK-24630) SPIP: Support SQLStreaming in Spark

2018-07-25 Thread Genmao Yu (JIRA)


[ 
https://issues.apache.org/jira/browse/SPARK-24630?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16556837#comment-16556837
 ] 

Genmao Yu edited comment on SPARK-24630 at 7/26/18 3:24 AM:


[~zsxwing] Is there a plan to better support SQL on streaming? For example, 
providing stream table DDL, and window and watermark syntax on streams, etc.


was (Author: unclegen):
[~zsxwing] Is there plan to better support SQL on streaming?  like provide 
stream table ddl, window ** and watermark  ** syntax on stream etc.

> SPIP: Support SQLStreaming in Spark
> ---
>
> Key: SPARK-24630
> URL: https://issues.apache.org/jira/browse/SPARK-24630
> Project: Spark
>  Issue Type: Improvement
>  Components: Structured Streaming
>Affects Versions: 2.2.0, 2.2.1
>Reporter: Jackey Lee
>Priority: Minor
>  Labels: SQLStreaming
> Attachments: SQLStreaming SPIP.pdf
>
>
> At present, KafkaSQL, Flink SQL (which is actually based on Calcite), 
> SQLStream, and StormSQL all provide a streaming SQL interface, with which 
> users with little knowledge about streaming can easily develop a stream 
> processing model. In Spark, we can also support a SQL API based on 
> Structured Streaming.
> To support SQL streaming, there are two key points: 
> 1. The analysis phase should be able to parse streaming-type SQL. 
> 2. The Analyzer should be able to map metadata information to the 
> corresponding Relation. 
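
For reference, a minimal sketch of the DataFrame-API query that such streaming 
SQL syntax (stream table DDL, window and watermark clauses) would need to 
express; the Kafka connection options are placeholders:

{code}
import org.apache.spark.sql.functions.{col, window}

// Placeholder streaming source; any Structured Streaming source works the same way.
val events = spark.readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "host:9092")
  .option("subscribe", "events")
  .load()
  .selectExpr("CAST(value AS STRING) AS word", "timestamp")

// Windowed count with a watermark -- the two pieces of syntax asked about above.
val counts = events
  .withWatermark("timestamp", "10 minutes")
  .groupBy(window(col("timestamp"), "5 minutes"), col("word"))
  .count()

counts.writeStream.outputMode("update").format("console").start()
{code}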



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-24630) SPIP: Support SQLStreaming in Spark

2018-07-25 Thread Genmao Yu (JIRA)


[ 
https://issues.apache.org/jira/browse/SPARK-24630?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16556837#comment-16556837
 ] 

Genmao Yu commented on SPARK-24630:
---

[~zsxwing] Is there a plan to better support SQL on streaming? For example, 
providing stream table DDL, and window and watermark syntax on streams, etc.

> SPIP: Support SQLStreaming in Spark
> ---
>
> Key: SPARK-24630
> URL: https://issues.apache.org/jira/browse/SPARK-24630
> Project: Spark
>  Issue Type: Improvement
>  Components: Structured Streaming
>Affects Versions: 2.2.0, 2.2.1
>Reporter: Jackey Lee
>Priority: Minor
>  Labels: SQLStreaming
> Attachments: SQLStreaming SPIP.pdf
>
>
> At present, KafkaSQL, Flink SQL (which is actually based on Calcite), 
> SQLStream, and StormSQL all provide a streaming SQL interface, with which 
> users with little knowledge about streaming can easily develop a stream 
> processing model. In Spark, we can also support a SQL API based on 
> Structured Streaming.
> To support SQL streaming, there are two key points: 
> 1. The analysis phase should be able to parse streaming-type SQL. 
> 2. The Analyzer should be able to map metadata information to the 
> corresponding Relation. 



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-24921) SparkStreaming steadily increasing job generation delay due to apparent URLClassLoader contention

2018-07-25 Thread Hyukjin Kwon (JIRA)


[ 
https://issues.apache.org/jira/browse/SPARK-24921?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16556826#comment-16556826
 ] 

Hyukjin Kwon commented on SPARK-24921:
--

[~tommyshiou], is this rather a question? If so, it would be better asked on the 
mailing list (see https://spark.apache.org/community.html).

> SparkStreaming steadily increasing job generation delay due to apparent 
> URLClassLoader contention
> -
>
> Key: SPARK-24921
> URL: https://issues.apache.org/jira/browse/SPARK-24921
> Project: Spark
>  Issue Type: Bug
>  Components: Spark Core, Web UI
>Affects Versions: 2.3.1
>Reporter: Tommy S
>Priority: Major
>
> I'm seeing an issue where the job generation time of my Spark Streaming job 
> is steadily increasing after some time.
> Looking at the thread dumps, I see that the JobGenerator thread is BLOCKED, 
> waiting on the synchronized URLClassPath.getLoader method:
> {noformat}
> "JobGenerator" #153 daemon prio=5 os_prio=0 tid=0x02dad800 nid=0x253c 
> waiting for monitor entry [0x7f4b311c2000]
>    java.lang.Thread.State: BLOCKED (on object monitor)
>     at sun.misc.URLClassPath.getNextLoader(URLClassPath.java:469)
>     - waiting to lock <0x7f4be023f940> (a sun.misc.URLClassPath)
>     at sun.misc.URLClassPath.findResource(URLClassPath.java:214)
>     at java.net.URLClassLoader$2.run(URLClassLoader.java:569)
>     at java.net.URLClassLoader$2.run(URLClassLoader.java:567)
>     at java.security.AccessController.doPrivileged(Native Method)
>     at java.net.URLClassLoader.findResource(URLClassLoader.java:566)
>     at java.lang.ClassLoader.getResource(ClassLoader.java:1096)
>     at java.lang.ClassLoader.getResource(ClassLoader.java:1091)
>     at 
> java.net.URLClassLoader.getResourceAsStream(URLClassLoader.java:232)
>     at java.lang.Class.getResourceAsStream(Class.java:2223)
>     at 
> org.apache.spark.util.ClosureCleaner$.getClassReader(ClosureCleaner.scala:40)
>     at 
> org.apache.spark.util.ClosureCleaner$.getInnerClosureClasses(ClosureCleaner.scala:84)
>     at 
> org.apache.spark.util.ClosureCleaner$.org$apache$spark$util$ClosureCleaner$$clean(ClosureCleaner.scala:224)
>     at 
> org.apache.spark.util.ClosureCleaner$.clean(ClosureCleaner.scala:159)
>     at org.apache.spark.SparkContext.clean(SparkContext.scala:2299)
>     at 
> org.apache.spark.rdd.PairRDDFunctions$$anonfun$combineByKeyWithClassTag$1.apply(PairRDDFunctions.scala:89)
>     at 
> org.apache.spark.rdd.PairRDDFunctions$$anonfun$combineByKeyWithClassTag$1.apply(PairRDDFunctions.scala:77)
>     at 
> org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
>     at 
> org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
>     at org.apache.spark.rdd.RDD.withScope(RDD.scala:363)
>     at 
> org.apache.spark.rdd.PairRDDFunctions.combineByKeyWithClassTag(PairRDDFunctions.scala:77)
>     at 
> org.apache.spark.rdd.PairRDDFunctions$$anonfun$combineByKey$1.apply(PairRDDFunctions.scala:119)
>     at 
> org.apache.spark.rdd.PairRDDFunctions$$anonfun$combineByKey$1.apply(PairRDDFunctions.scala:119)
>     at 
> org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
>     at 
> org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
>     at org.apache.spark.rdd.RDD.withScope(RDD.scala:363)
>     at 
> org.apache.spark.rdd.PairRDDFunctions.combineByKey(PairRDDFunctions.scala:117)
>     at 
> org.apache.spark.streaming.dstream.ShuffledDStream.compute(ShuffledDStream.scala:42)
>     at 
> org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1$$anonfun$apply$7.apply(DStream.scala:342)
>     at 
> org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1$$anonfun$apply$7.apply(DStream.scala:342)
>     at scala.util.DynamicVariable.withValue(DynamicVariable.scala:58)
>     at 
> org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1.apply(DStream.scala:341)
>     at 
> org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1.apply(DStream.scala:341)
>     at 
> org.apache.spark.streaming.dstream.DStream.createRDDWithLocalProperties(DStream.scala:416)
>     at 
> org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1.apply(DStream.scala:336)
>     at 
> org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1.apply(DStream.scala:334)
>     at scala.Option.orElse(Option.scala:289)
>     at 
> org.apache.spark.streaming.dstream.DStream.getOrCompute(DStream.scala:331)
>     at 
> 

[jira] [Commented] (SPARK-24914) totalSize is not a good estimate for broadcast joins

2018-07-25 Thread Hyukjin Kwon (JIRA)


[ 
https://issues.apache.org/jira/browse/SPARK-24914?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16556824#comment-16556824
 ] 

Hyukjin Kwon commented on SPARK-24914:
--

cc [~ZenWzh]

> totalSize is not a good estimate for broadcast joins
> 
>
> Key: SPARK-24914
> URL: https://issues.apache.org/jira/browse/SPARK-24914
> Project: Spark
>  Issue Type: Improvement
>  Components: SQL
>Affects Versions: 2.4.0
>Reporter: Bruce Robbins
>Priority: Major
>
> When determining whether to do a broadcast join, Spark estimates the size of 
> the smaller table as follows:
>  - if totalSize is defined and greater than 0, use it.
>  - else, if rawDataSize is defined and greater than 0, use it
>  - else, use spark.sql.defaultSizeInBytes (default: Long.MaxValue)
> Therefore, Spark prefers totalSize over rawDataSize.
> Unfortunately, totalSize is often quite a bit smaller than the actual table 
> size, since it represents the size of the table's files on disk. Parquet and 
> Orc files, for example, are encoded and compressed. This can result in the 
> JVM throwing an OutOfMemoryError while Spark is loading the table into a 
> HashedRelation, or when Spark actually attempts to broadcast the data.
> On the other hand, rawDataSize represents the uncompressed size of the 
> dataset, according to Hive documentation. This seems like a pretty good 
> number to use in preference to totalSize. However, due to HIVE-20079, this 
> value is simply #columns * #rows. Once that bug is fixed, it may be a 
> superior statistic, at least for managed tables.
> In the meantime, we could apply a configurable "fudge factor" to totalSize, 
> at least for types of files that are encoded and compressed. Hive has the 
> setting hive.stats.deserialization.factor, which defaults to 1.0, and is 
> described as follows:
> {quote}in the absence of uncompressed/raw data size, total file size will be 
> used for statistics annotation. But the file may be compressed, encoded and 
> serialized which may be lesser in size than the actual uncompressed/raw data 
> size. This factor will be multiplied to file size to estimate the raw data 
> size.
> {quote}
> Also, I propose a configuration setting to allow the user to completely 
> ignore rawDataSize, since that value is broken (due to HIVE-20079). When that 
> configuration setting is set to true, Spark would instead estimate the table 
> as follows:
> - if totalSize is defined and greater than 0, use totalSize*fudgeFactor.
>  - else, use spark.sql.defaultSizeInBytes (default: Long.MaxValue)
> Caveat: This mitigates the issue only for Hive tables. It does not help much 
> when the user is reading files using {{spark.read.parquet}}, unless we apply 
> the same fudge factor there.
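
A minimal sketch of the estimation logic proposed above; the parameter names 
(fudgeFactor, ignoreRawDataSize) are placeholders for illustration, not actual 
Spark configuration keys:

{code}
// Sketch only: prefer totalSize scaled by a fudge factor, fall back to rawDataSize
// unless the user chose to ignore it (because of HIVE-20079), else use the default.
def estimateTableSize(totalSize: Option[Long],
                      rawDataSize: Option[Long],
                      defaultSizeInBytes: Long,
                      fudgeFactor: Double,
                      ignoreRawDataSize: Boolean): Long = {
  totalSize.filter(_ > 0).map(s => (s * fudgeFactor).toLong)
    .orElse(if (ignoreRawDataSize) None else rawDataSize.filter(_ > 0))
    .getOrElse(defaultSizeInBytes)
}
{code}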



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-24925) input bytesRead metrics fluctuate from time to time

2018-07-25 Thread yucai (JIRA)


[ 
https://issues.apache.org/jira/browse/SPARK-24925?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16556820#comment-16556820
 ] 

yucai commented on SPARK-24925:
---

[~cloud_fan], [~xiaoli], [~kiszk], any comments?

> input bytesRead metrics fluctuate from time to time
> ---
>
> Key: SPARK-24925
> URL: https://issues.apache.org/jira/browse/SPARK-24925
> Project: Spark
>  Issue Type: Bug
>  Components: SQL
>Affects Versions: 2.3.1
>Reporter: yucai
>Priority: Major
> Attachments: bytesRead.gif
>
>
> The input bytesRead metric fluctuates from time to time, and it is worse when 
> pushdown is enabled.
> Query
> {code:java}
> CREATE TABLE dev AS
> SELECT
> ...
> FROM lstg_item cold, lstg_item_vrtn v
> WHERE cold.auct_end_dt = CAST(SUBSTR('2018-03-18 00:00:00',1,10) AS DATE)
> AND v.auct_end_dt = CAST(SUBSTR('2018-03-18 00:00:00',1,10) AS DATE)
> ...
> {code}
> Issue
> See the attached bytesRead.gif: input bytesRead shows 48GB, 52GB, 51GB, 50GB, 
> 54GB, 53GB ... 



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Assigned] (SPARK-24925) input bytesRead metrics fluctuate from time to time

2018-07-25 Thread Apache Spark (JIRA)


 [ 
https://issues.apache.org/jira/browse/SPARK-24925?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Apache Spark reassigned SPARK-24925:


Assignee: Apache Spark

> input bytesRead metrics fluctuate from time to time
> ---
>
> Key: SPARK-24925
> URL: https://issues.apache.org/jira/browse/SPARK-24925
> Project: Spark
>  Issue Type: Bug
>  Components: SQL
>Affects Versions: 2.3.1
>Reporter: yucai
>Assignee: Apache Spark
>Priority: Major
> Attachments: bytesRead.gif
>
>
> The input bytesRead metric fluctuates from time to time, and it is worse when 
> pushdown is enabled.
> Query
> {code:java}
> CREATE TABLE dev AS
> SELECT
> ...
> FROM lstg_item cold, lstg_item_vrtn v
> WHERE cold.auct_end_dt = CAST(SUBSTR('2018-03-18 00:00:00',1,10) AS DATE)
> AND v.auct_end_dt = CAST(SUBSTR('2018-03-18 00:00:00',1,10) AS DATE)
> ...
> {code}
> Issue
> See the attached bytesRead.gif: input bytesRead shows 48GB, 52GB, 51GB, 50GB, 
> 54GB, 53GB ... 



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-24925) input bytesRead metrics fluctuate from time to time

2018-07-25 Thread Apache Spark (JIRA)


[ 
https://issues.apache.org/jira/browse/SPARK-24925?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16556819#comment-16556819
 ] 

Apache Spark commented on SPARK-24925:
--

User 'yucai' has created a pull request for this issue:
https://github.com/apache/spark/pull/21791

> input bytesRead metrics fluctuate from time to time
> ---
>
> Key: SPARK-24925
> URL: https://issues.apache.org/jira/browse/SPARK-24925
> Project: Spark
>  Issue Type: Bug
>  Components: SQL
>Affects Versions: 2.3.1
>Reporter: yucai
>Priority: Major
> Attachments: bytesRead.gif
>
>
> The input bytesRead metric fluctuates from time to time, and it is worse when 
> pushdown is enabled.
> Query
> {code:java}
> CREATE TABLE dev AS
> SELECT
> ...
> FROM lstg_item cold, lstg_item_vrtn v
> WHERE cold.auct_end_dt = CAST(SUBSTR('2018-03-18 00:00:00',1,10) AS DATE)
> AND v.auct_end_dt = CAST(SUBSTR('2018-03-18 00:00:00',1,10) AS DATE)
> ...
> {code}
> Issue
> See the attached bytesRead.gif: input bytesRead shows 48GB, 52GB, 51GB, 50GB, 
> 54GB, 53GB ... 



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Assigned] (SPARK-24925) input bytesRead metrics fluctuate from time to time

2018-07-25 Thread Apache Spark (JIRA)


 [ 
https://issues.apache.org/jira/browse/SPARK-24925?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Apache Spark reassigned SPARK-24925:


Assignee: (was: Apache Spark)

> input bytesRead metrics fluctuate from time to time
> ---
>
> Key: SPARK-24925
> URL: https://issues.apache.org/jira/browse/SPARK-24925
> Project: Spark
>  Issue Type: Bug
>  Components: SQL
>Affects Versions: 2.3.1
>Reporter: yucai
>Priority: Major
> Attachments: bytesRead.gif
>
>
> The input bytesRead metric fluctuates from time to time, and it is worse when 
> pushdown is enabled.
> Query
> {code:java}
> CREATE TABLE dev AS
> SELECT
> ...
> FROM lstg_item cold, lstg_item_vrtn v
> WHERE cold.auct_end_dt = CAST(SUBSTR('2018-03-18 00:00:00',1,10) AS DATE)
> AND v.auct_end_dt = CAST(SUBSTR('2018-03-18 00:00:00',1,10) AS DATE)
> ...
> {code}
> Issue
> See the attached bytesRead.gif: input bytesRead shows 48GB, 52GB, 51GB, 50GB, 
> 54GB, 53GB ... 



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Comment Edited] (SPARK-24288) Enable preventing predicate pushdown

2018-07-25 Thread Hyukjin Kwon (JIRA)


[ 
https://issues.apache.org/jira/browse/SPARK-24288?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16556801#comment-16556801
 ] 

Hyukjin Kwon edited comment on SPARK-24288 at 7/26/18 2:56 AM:
---

[~smilegator] should we resolve this {{Won't fix}} for now (and create another 
JIRA) or edit this JIRA?


was (Author: hyukjin.kwon):
[~smilegator] should we resolve this {{Won't fix}} for now?

> Enable preventing predicate pushdown
> 
>
> Key: SPARK-24288
> URL: https://issues.apache.org/jira/browse/SPARK-24288
> Project: Spark
>  Issue Type: New Feature
>  Components: SQL
>Affects Versions: 2.3.0
>Reporter: Tomasz Gawęda
>Priority: Major
> Attachments: SPARK-24288.simple.patch
>
>
> Issue discussed on Mailing List: 
> [http://apache-spark-developers-list.1001551.n3.nabble.com/Preventing-predicate-pushdown-td23976.html]
> While working with the JDBC data source I saw that many "or" clauses with 
> non-equality operators cause huge performance degradation of the SQL query 
> sent to the database (DB2). For example: 
> val df = spark.read.format("jdbc").(other options to parallelize 
> load).load() 
> df.where(s"(date1 > $param1 and (date1 < $param2 or date1 is null) or x 
>  > 100)").show() // in the real application the pushed predicates were 
> many lines long, with many ANDs and ORs 
> If I use cache() before the where, there is no predicate pushdown of this 
> "where" clause. However, in a production system caching many sources is a 
> waste of memory (especially if the pipeline is long and I must cache many 
> times). There are also a few more workarounds, but it would be great if Spark 
> supported preventing predicate pushdown at the user's request.
>  
> For example: df.withAnalysisBarrier().where(...) ?
>  
> Note that this should not be a global configuration option. If I read 2 
> DataFrames, df1 and df2, I would like to specify that df1 should not have 
> some predicates pushed down (but some may be), while df2 should have all 
> predicates pushed down, even if the target query joins df1 and df2. As far as 
> I understand the Spark optimizer, if we use functions like 
> `withAnalysisBarrier` and put an AnalysisBarrier explicitly in the logical 
> plan, then predicates won't be pushed down on those particular DataFrames and 
> pushdown will still be possible on the second one.
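
For completeness, one of the workarounds alluded to above (not the API this 
ticket proposes) is to cut the logical-plan lineage by round-tripping through 
an RDD, so that later filters are not pushed into the JDBC source; a hedged 
sketch with placeholder connection options:

{code}
// Rebuilding the DataFrame from its RDD gives a plan boundary that the optimizer
// will not push filters through, at the cost of always reading the full source.
val jdbcDf = spark.read.format("jdbc")
  .option("url", "jdbc:db2://host:50000/db") // placeholder
  .option("dbtable", "schema.table")         // placeholder
  .load()

val barrier = spark.createDataFrame(jdbcDf.rdd, jdbcDf.schema)
barrier.where("date1 > '2018-01-01' or x > 100").show()
{code}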



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-24925) input bytesRead metrics fluctuate from time to time

2018-07-25 Thread yucai (JIRA)


[ 
https://issues.apache.org/jira/browse/SPARK-24925?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16556818#comment-16556818
 ] 

yucai commented on SPARK-24925:
---

I think there could be two issues.

In FileScanRDD:
1. ColumnarBatch's bytesRead is updated only once every 4096 * 1000 rows, 
which makes the metric out of date.
2. When advancing to the next file, FileScanRDD always adds the whole file 
length to bytesRead, which is inaccurate (pushdown reads much less data).

For problem 1, in https://github.com/apache/spark/pull/21791 I tried to update 
the ColumnarBatch's bytesRead for each batch (see the sketch below).
For problem 2, updateBytesReadWithFileSize says, "If we can't get the bytes 
read from the FS stats, fall back to the file size"; can we fall back to the 
file size only when that situation actually happens?
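
A minimal sketch of the per-batch update idea for problem 1; the function and 
callback names are stand-ins, not Spark's actual FileScanRDD internals:

{code}
// Refresh bytesRead from the filesystem-statistics callback once per ColumnarBatch
// instead of once every 4096 * 1000 rows, so the reported metric stays current.
def reportPerBatch[B](batches: Iterator[B],
                      bytesReadFromFsStats: () => Long,
                      setBytesRead: Long => Unit): Iterator[B] =
  batches.map { batch =>
    setBytesRead(bytesReadFromFsStats())
    batch
  }
{code}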

> input bytesRead metrics fluctuate from time to time
> ---
>
> Key: SPARK-24925
> URL: https://issues.apache.org/jira/browse/SPARK-24925
> Project: Spark
>  Issue Type: Bug
>  Components: SQL
>Affects Versions: 2.3.1
>Reporter: yucai
>Priority: Major
> Attachments: bytesRead.gif
>
>
> The input bytesRead metric fluctuates from time to time, and it is worse when 
> pushdown is enabled.
> Query
> {code:java}
> CREATE TABLE dev AS
> SELECT
> ...
> FROM lstg_item cold, lstg_item_vrtn v
> WHERE cold.auct_end_dt = CAST(SUBSTR('2018-03-18 00:00:00',1,10) AS DATE)
> AND v.auct_end_dt = CAST(SUBSTR('2018-03-18 00:00:00',1,10) AS DATE)
> ...
> {code}
> Issue
> See the attached bytesRead.gif: input bytesRead shows 48GB, 52GB, 51GB, 50GB, 
> 54GB, 53GB ... 



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Updated] (SPARK-24905) Spark 2.3 Internal URL env variable

2018-07-25 Thread Hyukjin Kwon (JIRA)


 [ 
https://issues.apache.org/jira/browse/SPARK-24905?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Hyukjin Kwon updated SPARK-24905:
-
Priority: Major  (was: Critical)

> Spark 2.3 Internal URL env variable
> ---
>
> Key: SPARK-24905
> URL: https://issues.apache.org/jira/browse/SPARK-24905
> Project: Spark
>  Issue Type: Bug
>  Components: Kubernetes
>Affects Versions: 2.3.1
>Reporter: Björn Wenzel
>Priority: Major
>
> Currently the Kubernetes Master internal URL is hardcoded in the 
> Constants.scala file 
> ([https://github.com/apache/spark/blob/85fe1297e35bcff9cf86bd53fee615e140ee5bfb/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/Constants.scala#L75])
> In some cases this URL should be changed, e.g. if the certificate is valid 
> for another hostname.
> Is it possible to make this URL a property like: 
> spark.kubernetes.authenticate.driver.hostname?
> Kubernetes The Hard Way, maintained by Kelsey Hightower, for example uses 
> kubernetes.default as the hostname; this will again produce an 
> SSLPeerUnverifiedException.
>  
> Here is the use of the hardcoded host: 
> [https://github.com/apache/spark/blob/85fe1297e35bcff9cf86bd53fee615e140ee5bfb/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/KubernetesClusterManager.scala#L52]
>  Maybe this could be changed like the KUBERNETES_NAMESPACE property in line 
> 53.
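
If this is accepted, usage could look like the following sketch; the property 
name is the one suggested above and is hypothetical, not an existing Spark 
setting:

{code}
import org.apache.spark.SparkConf

// Hypothetical property (proposed in this ticket): point the driver at the
// hostname that the API server certificate is actually valid for.
val conf = new SparkConf()
  .set("spark.kubernetes.authenticate.driver.hostname", "kubernetes.default")
{code}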



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-24905) Spark 2.3 Internal URL env variable

2018-07-25 Thread Hyukjin Kwon (JIRA)


[ 
https://issues.apache.org/jira/browse/SPARK-24905?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16556817#comment-16556817
 ] 

Hyukjin Kwon commented on SPARK-24905:
--

(Please avoid setting Critical+, which is usually reserved for committers.)

> Spark 2.3 Internal URL env variable
> ---
>
> Key: SPARK-24905
> URL: https://issues.apache.org/jira/browse/SPARK-24905
> Project: Spark
>  Issue Type: Bug
>  Components: Kubernetes
>Affects Versions: 2.3.1
>Reporter: Björn Wenzel
>Priority: Major
>
> Currently the Kubernetes Master internal URL is hardcoded in the 
> Constants.scala file 
> ([https://github.com/apache/spark/blob/85fe1297e35bcff9cf86bd53fee615e140ee5bfb/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/Constants.scala#L75])
> In some cases this URL should be changed, e.g. if the certificate is valid 
> for another hostname.
> Is it possible to make this URL a property like: 
> spark.kubernetes.authenticate.driver.hostname?
> Kubernetes The Hard Way, maintained by Kelsey Hightower, for example uses 
> kubernetes.default as the hostname; this will again produce an 
> SSLPeerUnverifiedException.
>  
> Here is the use of the hardcoded host: 
> [https://github.com/apache/spark/blob/85fe1297e35bcff9cf86bd53fee615e140ee5bfb/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/KubernetesClusterManager.scala#L52]
>  Maybe this could be changed like the KUBERNETES_NAMESPACE property in line 
> 53.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-24897) DAGScheduler should not unregisterMapOutput and increaseEpoch repeatedly for stage fetchFailed

2018-07-25 Thread Hyukjin Kwon (JIRA)


[ 
https://issues.apache.org/jira/browse/SPARK-24897?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16556806#comment-16556806
 ] 

Hyukjin Kwon commented on SPARK-24897:
--

I couldn't follow it either.

> DAGScheduler should not unregisterMapOutput and increaseEpoch repeatedly for 
> stage fetchFailed
> --
>
> Key: SPARK-24897
> URL: https://issues.apache.org/jira/browse/SPARK-24897
> Project: Spark
>  Issue Type: Bug
>  Components: Scheduler, Spark Core
>Affects Versions: 2.1.0, 2.3.1
>Reporter: liupengcheng
>Priority: Major
>
> In Spark 2.1, when a stage hits a fetch failure, DAGScheduler will retry both 
> this stage and its parent stage. However, when the parent stage is resubmitted 
> and starts running, the map statuses can still be invalidated by the failed 
> stage's outstanding tasks due to further fetch failures.
> The stage's outstanding tasks might unregister the map statuses with a new 
> epoch, thus causing the parent stage to hit repeated MetadataFetchFailed 
> errors and finally failing the job.
>  
>  
> {code:java}
> 2018-07-23,01:52:33,012 WARN org.apache.spark.scheduler.TaskSetManager: Lost 
> task 174.0 in stage 71.0 (TID 154127, , executor 96): 
> FetchFailed(BlockManagerId(4945, , 22409), shuffleId=24, mapId=667, 
> reduceId=174, message= org.apache.spark.shuffle.FetchFailedException: Failed 
> to connect to /:22409
> 2018-07-23,01:52:33,013 INFO org.apache.spark.scheduler.DAGScheduler: Marking 
> ShuffleMapStage 71 (map at DeviceLocateMain.scala:238) as failed due to a 
> fetch failure from ShuffleMapStage 69 ($plus$plus at 
> DeviceLocateMain.scala:236) 
> 2018-07-23,01:52:33,014 INFO org.apache.spark.scheduler.DAGScheduler: 
> ShuffleMapStage 71 (map at DeviceLocateMain.scala:238) failed in 246.856 s 
> 2018-07-23,01:52:33,014 INFO org.apache.spark.scheduler.DAGScheduler: 
> Resubmitting ShuffleMapStage 69 ($plus$plus at DeviceLocateMain.scala:236) 
> and ShuffleMapStage 71 (map at DeviceLocateMain.scala:238) due to fetch 
> failure 
> 2018-07-23,01:52:36,004 WARN org.apache.spark.scheduler.TaskSetManager: Lost 
> task 120.0 in stage 71.0 (TID 154073, , executor 286): 
> FetchFailed(BlockManagerId(4208, , 22409), shuffleId=24, mapId=241, 
> reduceId=120, message= org.apache.spark.shuffle.FetchFailedException: Failed 
> to connect to /:22409 
> 2018-07-23,01:52:36,005 INFO org.apache.spark.scheduler.DAGScheduler: 
> Resubmitting ShuffleMapStage 69 ($plus$plus at DeviceLocateMain.scala:236) 
> and ShuffleMapStage 71 (map at DeviceLocateMain.scala:238) due to fetch 
> failure 
> 2018-07-23,01:52:36,017 INFO org.apache.spark.storage.MemoryStore: Block 
> broadcast_63_piece0 stored as bytes in memory (estimated size 4.0 MB, free 
> 26.7 MB) 
> 2018-07-23,01:52:36,025 INFO org.apache.spark.storage.BlockManagerInfo: 
> Removed broadcast_59_piece1 on :52349 in memory (size: 4.0 MB, free: 
> 3.0 GB) 
> 2018-07-23,01:52:36,029 INFO org.apache.spark.storage.BlockManagerInfo: 
> Removed broadcast_61_piece6 on :52349 in memory (size: 4.0 MB, free: 
> 3.0 GB) 
> 2018-07-23,01:52:36,079 INFO org.apache.spark.deploy.yarn.YarnAllocator: 
> Canceling requests for 0 executor containers 
> 2018-07-23,01:52:36,079 WARN org.apache.spark.deploy.yarn.YarnAllocator: 
> Expected to find pending requests, but found none.
>  2018-07-23,01:52:36,094 INFO org.apache.spark.storage.BlockManagerInfo: 
> Added broadcast_63_piece0 in memory on :56780 (size: 4.0 MB, free: 3.7 
> GB) 
> 2018-07-23,01:52:36,095 INFO org.apache.spark.storage.MemoryStore: Block 
> broadcast_63_piece1 stored as bytes in memory (estimated size 4.0 MB, free 
> 30.7 MB) 
> 2018-07-23,01:52:36,107 INFO org.apache.spark.storage.BlockManagerInfo: Added 
> broadcast_63_piece1 in memory on :56780 (size: 4.0 MB, free: 3.7 GB) 
> 2018-07-23,01:52:36,108 INFO org.apache.spark.storage.MemoryStore: Block 
> broadcast_63_piece2 stored as bytes in memory (estimated size 4.0 MB, free 
> 34.7 MB) 
> 2018-07-23,01:52:36,108 INFO org.apache.spark.storage.BlockManagerInfo: Added 
> broadcast_63_piece2 in memory on :56780 (size: 4.0 MB, free: 3.7 GB) 
> 2018-07-23,01:52:36,108 INFO org.apache.spark.storage.MemoryStore: Block 
> broadcast_63_piece3 stored as bytes in memory (estimated size 4.0 MB, free 
> 38.7 MB) 
> 2018-07-23,01:52:36,132 INFO org.apache.spark.storage.BlockManagerInfo: Added 
> broadcast_63_piece3 in memory on :56780 (size: 4.0 MB, free: 3.7 GB) 
> 2018-07-23,01:52:36,132 INFO org.apache.spark.storage.MemoryStore: Block 
> broadcast_63_piece4 stored as bytes in memory (estimated size 3.8 MB, free 
> 42.5 MB) 
> 2018-07-23,01:52:36,132 INFO org.apache.spark.storage.BlockManagerInfo: Added 
> broadcast_63_piece4 in memory on :56780 (size: 3.8 MB, free: 3.7 GB) 
> 2018-07-23,01:52:36,132 INFO org.apache.spark.MapOutputTracker: Broadcast 
> mapstatuses 

[jira] [Commented] (SPARK-24288) Enable preventing predicate pushdown

2018-07-25 Thread Hyukjin Kwon (JIRA)


[ 
https://issues.apache.org/jira/browse/SPARK-24288?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16556801#comment-16556801
 ] 

Hyukjin Kwon commented on SPARK-24288:
--

[~smilegator] should we resolve this {{Won't fix}} for now?

> Enable preventing predicate pushdown
> 
>
> Key: SPARK-24288
> URL: https://issues.apache.org/jira/browse/SPARK-24288
> Project: Spark
>  Issue Type: New Feature
>  Components: SQL
>Affects Versions: 2.3.0
>Reporter: Tomasz Gawęda
>Priority: Major
> Attachments: SPARK-24288.simple.patch
>
>
> Issue discussed on Mailing List: 
> [http://apache-spark-developers-list.1001551.n3.nabble.com/Preventing-predicate-pushdown-td23976.html]
> While working with the JDBC data source I saw that many "or" clauses with 
> non-equality operators cause huge performance degradation of the SQL query 
> sent to the database (DB2). For example: 
> val df = spark.read.format("jdbc").(other options to parallelize 
> load).load() 
> df.where(s"(date1 > $param1 and (date1 < $param2 or date1 is null) or x 
>  > 100)").show() // in the real application the pushed predicates were 
> many lines long, with many ANDs and ORs 
> If I use cache() before the where, there is no predicate pushdown of this 
> "where" clause. However, in a production system caching many sources is a 
> waste of memory (especially if the pipeline is long and I must cache many 
> times). There are also a few more workarounds, but it would be great if Spark 
> supported preventing predicate pushdown at the user's request.
>  
> For example: df.withAnalysisBarrier().where(...) ?
>  
> Note that this should not be a global configuration option. If I read 2 
> DataFrames, df1 and df2, I would like to specify that df1 should not have 
> some predicates pushed down (but some may be), while df2 should have all 
> predicates pushed down, even if the target query joins df1 and df2. As far as 
> I understand the Spark optimizer, if we use functions like 
> `withAnalysisBarrier` and put an AnalysisBarrier explicitly in the logical 
> plan, then predicates won't be pushed down on those particular DataFrames and 
> pushdown will still be possible on the second one.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Updated] (SPARK-24925) input bytesRead metrics fluctuate from time to time

2018-07-25 Thread yucai (JIRA)


 [ 
https://issues.apache.org/jira/browse/SPARK-24925?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

yucai updated SPARK-24925:
--
Attachment: bytesRead.gif

> input bytesRead metrics fluctuate from time to time
> ---
>
> Key: SPARK-24925
> URL: https://issues.apache.org/jira/browse/SPARK-24925
> Project: Spark
>  Issue Type: Bug
>  Components: SQL
>Affects Versions: 2.3.1
>Reporter: yucai
>Priority: Major
> Attachments: bytesRead.gif
>
>
> The input bytesRead metric fluctuates from time to time, and it is worse when 
> pushdown is enabled.
> Query
> {code:java}
> CREATE TABLE dev AS
> SELECT
> ...
> FROM lstg_item cold, lstg_item_vrtn v
> WHERE cold.auct_end_dt = CAST(SUBSTR('2018-03-18 00:00:00',1,10) AS DATE)
> AND v.auct_end_dt = CAST(SUBSTR('2018-03-18 00:00:00',1,10) AS DATE)
> ...
> {code}
> Issue
> See the attached gif: input bytesRead shows 48GB, 52GB, 51GB, 50GB, 54GB, 
> 53GB ... 



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Updated] (SPARK-24925) input bytesRead metrics fluctuate from time to time

2018-07-25 Thread yucai (JIRA)


 [ 
https://issues.apache.org/jira/browse/SPARK-24925?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

yucai updated SPARK-24925:
--
Description: 
The input bytesRead metric fluctuates from time to time, and it is worse when 
pushdown is enabled.

Query
{code:java}
CREATE TABLE dev AS
SELECT
...
FROM lstg_item cold, lstg_item_vrtn v
WHERE cold.auct_end_dt = CAST(SUBSTR('2018-03-18 00:00:00',1,10) AS DATE)
AND v.auct_end_dt = CAST(SUBSTR('2018-03-18 00:00:00',1,10) AS DATE)
...
{code}

Issue
See the attached bytesRead.gif: input bytesRead shows 48GB, 52GB, 51GB, 50GB, 
54GB, 53GB ... 

  was:
input bytesRead metrics fluctuate from time to time, it is worse when pushdown 
enabled.

Query
{code:java}
CREATE TABLE dev AS
SELECT
...
FROM lstg_item cold, lstg_item_vrtn v
WHERE cold.auct_end_dt = CAST(SUBSTR('2018-03-18 00:00:00',1,10) AS DATE)
AND v.auct_end_dt = CAST(SUBSTR('2018-03-18 00:00:00',1,10) AS DATE)
...
{code}

Issue
See attached gif, input bytesRead shows 48GB, 52GB, 51GB, 50GB, 54GB, 53GB ... 


> input bytesRead metrics fluctuate from time to time
> ---
>
> Key: SPARK-24925
> URL: https://issues.apache.org/jira/browse/SPARK-24925
> Project: Spark
>  Issue Type: Bug
>  Components: SQL
>Affects Versions: 2.3.1
>Reporter: yucai
>Priority: Major
> Attachments: bytesRead.gif
>
>
> The input bytesRead metric fluctuates from time to time, and it is worse when 
> pushdown is enabled.
> Query
> {code:java}
> CREATE TABLE dev AS
> SELECT
> ...
> FROM lstg_item cold, lstg_item_vrtn v
> WHERE cold.auct_end_dt = CAST(SUBSTR('2018-03-18 00:00:00',1,10) AS DATE)
> AND v.auct_end_dt = CAST(SUBSTR('2018-03-18 00:00:00',1,10) AS DATE)
> ...
> {code}
> Issue
> See the attached bytesRead.gif: input bytesRead shows 48GB, 52GB, 51GB, 50GB, 
> 54GB, 53GB ... 



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Created] (SPARK-24925) input bytesRead metrics fluctuate from time to time

2018-07-25 Thread yucai (JIRA)
yucai created SPARK-24925:
-

 Summary: input bytesRead metrics fluctuate from time to time
 Key: SPARK-24925
 URL: https://issues.apache.org/jira/browse/SPARK-24925
 Project: Spark
  Issue Type: Bug
  Components: SQL
Affects Versions: 2.3.1
Reporter: yucai


The input bytesRead metric fluctuates from time to time, and it is worse when 
pushdown is enabled.

Query
{code:java}
CREATE TABLE dev AS
SELECT
...
FROM lstg_item cold, lstg_item_vrtn v
WHERE cold.auct_end_dt = CAST(SUBSTR('2018-03-18 00:00:00',1,10) AS DATE)
AND v.auct_end_dt = CAST(SUBSTR('2018-03-18 00:00:00',1,10) AS DATE)
...
{code}

Issue
See the attached gif: input bytesRead shows 48GB, 52GB, 51GB, 50GB, 54GB, 53GB ... 



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Updated] (SPARK-24832) Improve inputMetrics's bytesRead update for ColumnarBatch

2018-07-25 Thread yucai (JIRA)


 [ 
https://issues.apache.org/jira/browse/SPARK-24832?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

yucai updated SPARK-24832:
--
Summary: Improve inputMetrics's bytesRead update for ColumnarBatch  (was: 
When pushdown enabled, input bytesRead metrics is easy to fluctuate from time 
to time)

> Improve inputMetrics's bytesRead update for ColumnarBatch
> -
>
> Key: SPARK-24832
> URL: https://issues.apache.org/jira/browse/SPARK-24832
> Project: Spark
>  Issue Type: Bug
>  Components: Spark Core, SQL
>Affects Versions: 2.3.1
>Reporter: yucai
>Priority: Major
>
> Improve inputMetrics's bytesRead update for ColumnarBatch



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Updated] (SPARK-24832) When pushdown enabled, input bytesRead metrics is easy to fluctuate from time to time

2018-07-25 Thread yucai (JIRA)


 [ 
https://issues.apache.org/jira/browse/SPARK-24832?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

yucai updated SPARK-24832:
--
Summary: When pushdown enabled, input bytesRead metrics is easy to 
fluctuate from time to time  (was: Improve inputMetrics's bytesRead update for 
ColumnarBatch)

> When pushdown enabled, input bytesRead metrics is easy to fluctuate from time 
> to time
> -
>
> Key: SPARK-24832
> URL: https://issues.apache.org/jira/browse/SPARK-24832
> Project: Spark
>  Issue Type: Bug
>  Components: Spark Core, SQL
>Affects Versions: 2.3.1
>Reporter: yucai
>Priority: Major
>
> Improve inputMetrics's bytesRead update for ColumnarBatch



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-24867) Add AnalysisBarrier to DataFrameWriter

2018-07-25 Thread Saisai Shao (JIRA)


[ 
https://issues.apache.org/jira/browse/SPARK-24867?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16556760#comment-16556760
 ] 

Saisai Shao commented on SPARK-24867:
-

I see, thanks! Please let me know when the JIRA is opened.

> Add AnalysisBarrier to DataFrameWriter 
> ---
>
> Key: SPARK-24867
> URL: https://issues.apache.org/jira/browse/SPARK-24867
> Project: Spark
>  Issue Type: Bug
>  Components: SQL
>Affects Versions: 2.3.0, 2.3.1
>Reporter: Xiao Li
>Assignee: Xiao Li
>Priority: Blocker
> Fix For: 2.3.2
>
>
> {code}
>   val udf1 = udf({(x: Int, y: Int) => x + y})
>   val df = spark.range(0, 3).toDF("a")
> .withColumn("b", udf1($"a", udf1($"a", lit(10
>   df.cache()
>   df.write.saveAsTable("t")
>   df.write.saveAsTable("t1")
> {code}
> The cache is not being used because the plans do not match the cached plan. 
> This is a regression caused by the changes we made in AnalysisBarrier, since 
> not all the Analyzer rules are idempotent. We need to fix it in Spark 2.3.2.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-12911) Cacheing a dataframe causes array comparisons to fail (in filter / where) after 1.6

2018-07-25 Thread David Vogelbacher (JIRA)


[ 
https://issues.apache.org/jira/browse/SPARK-12911?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16556470#comment-16556470
 ] 

David Vogelbacher commented on SPARK-12911:
---

Hey [~hyukjin.kwon] [~sdicocco] [~a1ray], I just reproduced this on master. I 
executed the following in the spark-shell:
{noformat}
scala> import org.apache.spark.sql.functions
import org.apache.spark.sql.functions

scala> val df = Seq(Array("a", "b"), Array("c", "d")).toDF("arrayCol")
df: org.apache.spark.sql.DataFrame = [arrayCol: array<string>]

scala> 
df.filter(df.col("arrayCol").eqNullSafe(functions.array(functions.lit("a"), 
functions.lit("b".show()
++
|arrayCol|
++
|  [a, b]|
++


scala> 
df.cache().filter(df.col("arrayCol").eqNullSafe(functions.array(functions.lit("a"),
 functions.lit("b".show()
++
|arrayCol|
++
++
{noformat}

This seems to be the same issue?

> Cacheing a dataframe causes array comparisons to fail (in filter / where) 
> after 1.6
> ---
>
> Key: SPARK-12911
> URL: https://issues.apache.org/jira/browse/SPARK-12911
> Project: Spark
>  Issue Type: Bug
>  Components: SQL
>Affects Versions: 1.6.0
> Environment: OSX 10.11.1, Scala 2.11.7, Spark 1.6.0
>Reporter: Jesse English
>Priority: Major
>
> When doing a *where* operation on a dataframe and testing for equality on an 
> array type, after 1.6 no valid comparisons are made if the dataframe has been 
> cached.  If it has not been cached, the results are as expected.
> This appears to be related to the underlying unsafe array data types.
> {code:title=test.scala|borderStyle=solid}
> test("test array comparison") {
> val vectors: Vector[Row] =  Vector(
>   Row.fromTuple("id_1" -> Array(0L, 2L)),
>   Row.fromTuple("id_2" -> Array(0L, 5L)),
>   Row.fromTuple("id_3" -> Array(0L, 9L)),
>   Row.fromTuple("id_4" -> Array(1L, 0L)),
>   Row.fromTuple("id_5" -> Array(1L, 8L)),
>   Row.fromTuple("id_6" -> Array(2L, 4L)),
>   Row.fromTuple("id_7" -> Array(5L, 6L)),
>   Row.fromTuple("id_8" -> Array(6L, 2L)),
>   Row.fromTuple("id_9" -> Array(7L, 0L))
> )
> val data: RDD[Row] = sc.parallelize(vectors, 3)
> val schema = StructType(
>   StructField("id", StringType, false) ::
> StructField("point", DataTypes.createArrayType(LongType, false), 
> false) ::
> Nil
> )
> val sqlContext = new SQLContext(sc)
> val dataframe = sqlContext.createDataFrame(data, schema)
> val targetPoint:Array[Long] = Array(0L,9L)
> //Caching is the trigger to cause the error (no caching causes no error)
> dataframe.cache()
> //This is the line where it fails
> //java.util.NoSuchElementException: next on empty iterator
> //However we know that there is a valid match
> val targetRow = dataframe.where(dataframe("point") === 
> array(targetPoint.map(value => lit(value)): _*)).first()
> assert(targetRow != null)
>   }
> {code}



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Resolved] (SPARK-24916) Fix type coercion for IN expression with subquery

2018-07-25 Thread Yuming Wang (JIRA)


 [ 
https://issues.apache.org/jira/browse/SPARK-24916?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Yuming Wang resolved SPARK-24916.
-
Resolution: Duplicate

> Fix type coercion for IN expression with subquery
> -
>
> Key: SPARK-24916
> URL: https://issues.apache.org/jira/browse/SPARK-24916
> Project: Spark
>  Issue Type: Bug
>  Components: SQL
>Affects Versions: 2.2.0, 2.3.0
>Reporter: Yuming Wang
>Priority: Major
>
> How to reproduce:
> {code:sql}
> CREATE TEMPORARY VIEW t4 AS SELECT * FROM VALUES
>   (CAST(1 AS DOUBLE), CAST(2 AS STRING), CAST(3 AS STRING))
> AS t1(t4a, t4b, t4c);
> CREATE TEMPORARY VIEW t5 AS SELECT * FROM VALUES
>   (CAST(1 AS DECIMAL(18, 0)), CAST(2 AS STRING), CAST(3 AS BIGINT))
> AS t1(t5a, t5b, t5c);
> SELECT * FROM t4
> WHERE
> (t4a, t4b, t4c) IN (SELECT t5a,
>t5b,
>t5c
> FROM t5);
> {code}
> Will throw exception:
> {noformat}
> org.apache.spark.sql.AnalysisException
> cannot resolve '(named_struct('t4a', t4.`t4a`, 't4b', t4.`t4b`, 't4c', 
> t4.`t4c`) IN (listquery()))' due to data type mismatch: 
> The data type of one or more elements in the left hand side of an IN subquery
> is not compatible with the data type of the output of the subquery
> Mismatched columns:
> [(t4.`t4a`:double, t5.`t5a`:decimal(18,0)), (t4.`t4c`:string, 
> t5.`t5c`:bigint)]
> Left side:
> [double, string, string].
> Right side:
> [decimal(18,0), string, bigint].;
> {noformat}
> But it succeeds on Spark 2.1.x.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-24867) Add AnalysisBarrier to DataFrameWriter

2018-07-25 Thread Xiao Li (JIRA)


[ 
https://issues.apache.org/jira/browse/SPARK-24867?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16556447#comment-16556447
 ] 

Xiao Li commented on SPARK-24867:
-

[~jerryshao] This ticket was just resolved. [~lian cheng] might have found 
another serious bug. He will open a JIRA soon. 

> Add AnalysisBarrier to DataFrameWriter 
> ---
>
> Key: SPARK-24867
> URL: https://issues.apache.org/jira/browse/SPARK-24867
> Project: Spark
>  Issue Type: Bug
>  Components: SQL
>Affects Versions: 2.3.0, 2.3.1
>Reporter: Xiao Li
>Assignee: Xiao Li
>Priority: Blocker
> Fix For: 2.3.2
>
>
> {code}
>   val udf1 = udf({(x: Int, y: Int) => x + y})
>   val df = spark.range(0, 3).toDF("a")
> .withColumn("b", udf1($"a", udf1($"a", lit(10
>   df.cache()
>   df.write.saveAsTable("t")
>   df.write.saveAsTable("t1")
> {code}
> The cache is not being used because the plans do not match the cached plan. 
> This is a regression caused by the changes we made in AnalysisBarrier, since 
> not all the Analyzer rules are idempotent. We need to fix it in Spark 2.3.2.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Resolved] (SPARK-24867) Add AnalysisBarrier to DataFrameWriter

2018-07-25 Thread Xiao Li (JIRA)


 [ 
https://issues.apache.org/jira/browse/SPARK-24867?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Xiao Li resolved SPARK-24867.
-
   Resolution: Fixed
Fix Version/s: 2.3.2

> Add AnalysisBarrier to DataFrameWriter 
> ---
>
> Key: SPARK-24867
> URL: https://issues.apache.org/jira/browse/SPARK-24867
> Project: Spark
>  Issue Type: Bug
>  Components: SQL
>Affects Versions: 2.3.0, 2.3.1
>Reporter: Xiao Li
>Assignee: Xiao Li
>Priority: Blocker
> Fix For: 2.3.2
>
>
> {code}
>   val udf1 = udf({(x: Int, y: Int) => x + y})
>   val df = spark.range(0, 3).toDF("a")
> .withColumn("b", udf1($"a", udf1($"a", lit(10
>   df.cache()
>   df.write.saveAsTable("t")
>   df.write.saveAsTable("t1")
> {code}
> The cache is not being used because the plans do not match the cached plan. 
> This is a regression caused by the changes we made in AnalysisBarrier, since 
> not all the Analyzer rules are idempotent. We need to fix it in Spark 2.3.2.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Assigned] (SPARK-24924) Add mapping for built-in Avro data source

2018-07-25 Thread Apache Spark (JIRA)


 [ 
https://issues.apache.org/jira/browse/SPARK-24924?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Apache Spark reassigned SPARK-24924:


Assignee: Apache Spark

> Add mapping for built-in Avro data source
> -
>
> Key: SPARK-24924
> URL: https://issues.apache.org/jira/browse/SPARK-24924
> Project: Spark
>  Issue Type: Sub-task
>  Components: SQL
>Affects Versions: 2.4.0
>Reporter: Dongjoon Hyun
>Assignee: Apache Spark
>Priority: Minor
>
> This issue aims at the following:
>  # Like the `com.databricks.spark.csv` mapping, we had better map 
> `com.databricks.spark.avro` to the built-in Avro data source.
>  # Remove the incorrect error message, `Please find an Avro package at ...`.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-24924) Add mapping for built-in Avro data source

2018-07-25 Thread Apache Spark (JIRA)


[ 
https://issues.apache.org/jira/browse/SPARK-24924?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16556427#comment-16556427
 ] 

Apache Spark commented on SPARK-24924:
--

User 'dongjoon-hyun' has created a pull request for this issue:
https://github.com/apache/spark/pull/21878

> Add mapping for built-in Avro data source
> -
>
> Key: SPARK-24924
> URL: https://issues.apache.org/jira/browse/SPARK-24924
> Project: Spark
>  Issue Type: Sub-task
>  Components: SQL
>Affects Versions: 2.4.0
>Reporter: Dongjoon Hyun
>Priority: Minor
>
> This issue aims at the following:
>  # Like the `com.databricks.spark.csv` mapping, we had better map 
> `com.databricks.spark.avro` to the built-in Avro data source.
>  # Remove the incorrect error message, `Please find an Avro package at ...`.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Assigned] (SPARK-24924) Add mapping for built-in Avro data source

2018-07-25 Thread Apache Spark (JIRA)


 [ 
https://issues.apache.org/jira/browse/SPARK-24924?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Apache Spark reassigned SPARK-24924:


Assignee: (was: Apache Spark)

> Add mapping for built-in Avro data source
> -
>
> Key: SPARK-24924
> URL: https://issues.apache.org/jira/browse/SPARK-24924
> Project: Spark
>  Issue Type: Sub-task
>  Components: SQL
>Affects Versions: 2.4.0
>Reporter: Dongjoon Hyun
>Priority: Minor
>
> This issue aims at the following:
>  # Like the `com.databricks.spark.csv` mapping, we had better map 
> `com.databricks.spark.avro` to the built-in Avro data source.
>  # Remove the incorrect error message, `Please find an Avro package at ...`.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Created] (SPARK-24924) Add mapping for built-in Avro data source

2018-07-25 Thread Dongjoon Hyun (JIRA)
Dongjoon Hyun created SPARK-24924:
-

 Summary: Add mapping for built-in Avro data source
 Key: SPARK-24924
 URL: https://issues.apache.org/jira/browse/SPARK-24924
 Project: Spark
  Issue Type: Sub-task
  Components: SQL
Affects Versions: 2.4.0
Reporter: Dongjoon Hyun


This issue aims at the following:
 # Like the `com.databricks.spark.csv` mapping, we had better map 
`com.databricks.spark.avro` to the built-in Avro data source.
 # Remove the incorrect error message, `Please find an Avro package at ...`.
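
A small sketch of what the mapping would mean for users (the path is a 
placeholder): after the mapping, the legacy format name resolves to the 
built-in Avro source, so existing code keeps working unchanged.

{code}
// Both of these would resolve to the built-in Avro data source once the mapping exists.
val legacy  = spark.read.format("com.databricks.spark.avro").load("/tmp/events.avro")
val builtin = spark.read.format("avro").load("/tmp/events.avro")
{code}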



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-24906) Enlarge split size for columnar file to ensure the task read enough data

2018-07-25 Thread Jason Guo (JIRA)


[ 
https://issues.apache.org/jira/browse/SPARK-24906?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16556413#comment-16556413
 ] 

Jason Guo commented on SPARK-24906:
---

[~maropu] [~viirya] What do you think about this idea?

> Enlarge split size for columnar file to ensure the task read enough data
> 
>
> Key: SPARK-24906
> URL: https://issues.apache.org/jira/browse/SPARK-24906
> Project: Spark
>  Issue Type: Improvement
>  Components: SQL
>Affects Versions: 2.3.1
>Reporter: Jason Guo
>Priority: Critical
> Attachments: image-2018-07-24-20-26-32-441.png, 
> image-2018-07-24-20-28-06-269.png, image-2018-07-24-20-29-24-797.png, 
> image-2018-07-24-20-30-24-552.png
>
>
> For columnar files, when Spark SQL reads a table, each split will be 
> 128 MB by default since spark.sql.files.maxPartitionBytes defaults to 
> 128 MB. Even when the user sets it to a large value, such as 512 MB, a task 
> may read only a few MB or even hundreds of KB, because the table (Parquet) 
> may consist of dozens of columns while the SQL query needs only a few, and 
> Spark will prune the unnecessary columns.
>  
> In this case, Spark's DataSourceScanExec can enlarge maxPartitionBytes 
> adaptively. 
> For example, say there are 40 columns: 20 integer and 20 long. When a query 
> reads one integer column and one long column, maxPartitionBytes should be 
> 20 times larger: (20*4 + 20*8) / (4 + 8) = 20. 
>  
> With this optimization, the number of tasks will be smaller and the job will 
> run faster. More importantly, for a very large cluster (more than 10 thousand 
> nodes), it will relieve the RM's scheduling pressure.
>  
> Here is the test
>  
> The table named test2 has more than 40 columns and there are more than 5 TB 
> data each hour.
> When we issue a very simple query 
>  
> {code:java}
> select count(device_id) from test2 where date=20180708 and hour='23'{code}
>  
> There are 72176 tasks and the duration of the job is 4.8 minutes
> !image-2018-07-24-20-26-32-441.png!
>  
> Most tasks last less than 1 second and read less than 1.5 MB data
> !image-2018-07-24-20-28-06-269.png!
>  
> After the optimization, there are only 1615 tasks and the job lasts only 30 
> seconds. It is almost 10 times faster.
> !image-2018-07-24-20-29-24-797.png!
>  
> The median of read data is 44.2MB. 
> !image-2018-07-24-20-30-24-552.png!
>  
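
The scaling rule described in the quoted description can be sketched in a few 
lines of Scala; this is only an illustration of the arithmetic, not the actual 
DataSourceScanExec change.

{code:scala}
object SplitSizeSketch {
  // Scale maxPartitionBytes by the ratio of the full row width to the width
  // of the columns the query actually reads.
  def scaledMaxPartitionBytes(defaultMaxPartitionBytes: Long,
                              allColumnBytes: Seq[Int],
                              selectedColumnBytes: Seq[Int]): Long = {
    val factor = allColumnBytes.sum.toDouble / selectedColumnBytes.sum
    (defaultMaxPartitionBytes * factor).toLong
  }

  def main(args: Array[String]): Unit = {
    // 20 int columns (4 bytes) and 20 long columns (8 bytes); the query reads
    // one of each, so (20*4 + 20*8) / (4 + 8) = 20 and 128 MB becomes 2560 MB.
    val all = Seq.fill(20)(4) ++ Seq.fill(20)(8)
    val selected = Seq(4, 8)
    println(scaledMaxPartitionBytes(128L * 1024 * 1024, all, selected))
  }
}
{code}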



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-24923) DataSourceV2: Add CTAS and RTAS logical operations

2018-07-25 Thread Apache Spark (JIRA)


[ 
https://issues.apache.org/jira/browse/SPARK-24923?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16556383#comment-16556383
 ] 

Apache Spark commented on SPARK-24923:
--

User 'rdblue' has created a pull request for this issue:
https://github.com/apache/spark/pull/21877

> DataSourceV2: Add CTAS and RTAS logical operations
> --
>
> Key: SPARK-24923
> URL: https://issues.apache.org/jira/browse/SPARK-24923
> Project: Spark
>  Issue Type: Sub-task
>  Components: SQL
>Affects Versions: 2.3.0, 2.3.1
>Reporter: Ryan Blue
>Priority: Major
>
> When SPARK-24252 and SPARK-24251 are in, next plans to implement from the 
> SPIP are CTAS and RTAS.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Assigned] (SPARK-24923) DataSourceV2: Add CTAS and RTAS logical operations

2018-07-25 Thread Apache Spark (JIRA)


 [ 
https://issues.apache.org/jira/browse/SPARK-24923?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Apache Spark reassigned SPARK-24923:


Assignee: (was: Apache Spark)

> DataSourceV2: Add CTAS and RTAS logical operations
> --
>
> Key: SPARK-24923
> URL: https://issues.apache.org/jira/browse/SPARK-24923
> Project: Spark
>  Issue Type: Sub-task
>  Components: SQL
>Affects Versions: 2.3.0, 2.3.1
>Reporter: Ryan Blue
>Priority: Major
>
> When SPARK-24252 and SPARK-24251 are in, next plans to implement from the 
> SPIP are CTAS and RTAS.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Assigned] (SPARK-24923) DataSourceV2: Add CTAS and RTAS logical operations

2018-07-25 Thread Apache Spark (JIRA)


 [ 
https://issues.apache.org/jira/browse/SPARK-24923?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Apache Spark reassigned SPARK-24923:


Assignee: Apache Spark

> DataSourceV2: Add CTAS and RTAS logical operations
> --
>
> Key: SPARK-24923
> URL: https://issues.apache.org/jira/browse/SPARK-24923
> Project: Spark
>  Issue Type: Sub-task
>  Components: SQL
>Affects Versions: 2.3.0, 2.3.1
>Reporter: Ryan Blue
>Assignee: Apache Spark
>Priority: Major
>
> When SPARK-24252 and SPARK-24251 are in, next plans to implement from the 
> SPIP are CTAS and RTAS.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Updated] (SPARK-24921) SparkStreaming steadily increasing job generation delay due to apparent URLClassLoader contention

2018-07-25 Thread Tommy S (JIRA)


 [ 
https://issues.apache.org/jira/browse/SPARK-24921?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Tommy S updated SPARK-24921:

Component/s: Web UI

> SparkStreaming steadily increasing job generation delay due to apparent 
> URLClassLoader contention
> -
>
> Key: SPARK-24921
> URL: https://issues.apache.org/jira/browse/SPARK-24921
> Project: Spark
>  Issue Type: Bug
>  Components: Spark Core, Web UI
>Affects Versions: 2.3.1
>Reporter: Tommy S
>Priority: Major
>
> I'm seeing an issue where the job generation time of my spark streaming job 
> is steadily increasing after some time.
> Looking at the thread dumps I see that the JobGenerator thread is BLOCKED 
> waiting for URLClassPath.getLoader synchronized method:
> {noformat}
> "JobGenerator" #153 daemon prio=5 os_prio=0 tid=0x02dad800 nid=0x253c 
> waiting for monitor entry [0x7f4b311c2000]
>    java.lang.Thread.State: BLOCKED (on object monitor)
>     at sun.misc.URLClassPath.getNextLoader(URLClassPath.java:469)
>     - waiting to lock <0x7f4be023f940> (a sun.misc.URLClassPath)
>     at sun.misc.URLClassPath.findResource(URLClassPath.java:214)
>     at java.net.URLClassLoader$2.run(URLClassLoader.java:569)
>     at java.net.URLClassLoader$2.run(URLClassLoader.java:567)
>     at java.security.AccessController.doPrivileged(Native Method)
>     at java.net.URLClassLoader.findResource(URLClassLoader.java:566)
>     at java.lang.ClassLoader.getResource(ClassLoader.java:1096)
>     at java.lang.ClassLoader.getResource(ClassLoader.java:1091)
>     at 
> java.net.URLClassLoader.getResourceAsStream(URLClassLoader.java:232)
>     at java.lang.Class.getResourceAsStream(Class.java:2223)
>     at 
> org.apache.spark.util.ClosureCleaner$.getClassReader(ClosureCleaner.scala:40)
>     at 
> org.apache.spark.util.ClosureCleaner$.getInnerClosureClasses(ClosureCleaner.scala:84)
>     at 
> org.apache.spark.util.ClosureCleaner$.org$apache$spark$util$ClosureCleaner$$clean(ClosureCleaner.scala:224)
>     at 
> org.apache.spark.util.ClosureCleaner$.clean(ClosureCleaner.scala:159)
>     at org.apache.spark.SparkContext.clean(SparkContext.scala:2299)
>     at 
> org.apache.spark.rdd.PairRDDFunctions$$anonfun$combineByKeyWithClassTag$1.apply(PairRDDFunctions.scala:89)
>     at 
> org.apache.spark.rdd.PairRDDFunctions$$anonfun$combineByKeyWithClassTag$1.apply(PairRDDFunctions.scala:77)
>     at 
> org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
>     at 
> org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
>     at org.apache.spark.rdd.RDD.withScope(RDD.scala:363)
>     at 
> org.apache.spark.rdd.PairRDDFunctions.combineByKeyWithClassTag(PairRDDFunctions.scala:77)
>     at 
> org.apache.spark.rdd.PairRDDFunctions$$anonfun$combineByKey$1.apply(PairRDDFunctions.scala:119)
>     at 
> org.apache.spark.rdd.PairRDDFunctions$$anonfun$combineByKey$1.apply(PairRDDFunctions.scala:119)
>     at 
> org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
>     at 
> org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
>     at org.apache.spark.rdd.RDD.withScope(RDD.scala:363)
>     at 
> org.apache.spark.rdd.PairRDDFunctions.combineByKey(PairRDDFunctions.scala:117)
>     at 
> org.apache.spark.streaming.dstream.ShuffledDStream.compute(ShuffledDStream.scala:42)
>     at 
> org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1$$anonfun$apply$7.apply(DStream.scala:342)
>     at 
> org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1$$anonfun$apply$7.apply(DStream.scala:342)
>     at scala.util.DynamicVariable.withValue(DynamicVariable.scala:58)
>     at 
> org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1.apply(DStream.scala:341)
>     at 
> org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1.apply(DStream.scala:341)
>     at 
> org.apache.spark.streaming.dstream.DStream.createRDDWithLocalProperties(DStream.scala:416)
>     at 
> org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1.apply(DStream.scala:336)
>     at 
> org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1.apply(DStream.scala:334)
>     at scala.Option.orElse(Option.scala:289)
>     at 
> org.apache.spark.streaming.dstream.DStream.getOrCompute(DStream.scala:331)
>     at 
> org.apache.spark.streaming.dstream.MappedDStream.compute(MappedDStream.scala:36)
>     at 
> 

[jira] [Created] (SPARK-24923) DataSourceV2: Add CTAS and RTAS logical operations

2018-07-25 Thread Ryan Blue (JIRA)
Ryan Blue created SPARK-24923:
-

 Summary: DataSourceV2: Add CTAS and RTAS logical operations
 Key: SPARK-24923
 URL: https://issues.apache.org/jira/browse/SPARK-24923
 Project: Spark
  Issue Type: Sub-task
  Components: SQL
Affects Versions: 2.3.1, 2.3.0
Reporter: Ryan Blue


When SPARK-24252 and SPARK-24251 are in, next plans to implement from the SPIP 
are CTAS and RTAS.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-24802) Optimization Rule Exclusion

2018-07-25 Thread Apache Spark (JIRA)


[ 
https://issues.apache.org/jira/browse/SPARK-24802?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16556310#comment-16556310
 ] 

Apache Spark commented on SPARK-24802:
--

User 'maryannxue' has created a pull request for this issue:
https://github.com/apache/spark/pull/21876

> Optimization Rule Exclusion
> ---
>
> Key: SPARK-24802
> URL: https://issues.apache.org/jira/browse/SPARK-24802
> Project: Spark
>  Issue Type: New Feature
>  Components: SQL
>Affects Versions: 2.3.0
>Reporter: Maryann Xue
>Assignee: Maryann Xue
>Priority: Major
> Fix For: 2.4.0
>
>
> Since Spark has provided fairly clear interfaces for adding user-defined 
> optimization rules, it would be nice to have an easy-to-use interface for 
> excluding an optimization rule from the Spark query optimizer as well.
> This would make customizing the Spark optimizer easier and could sometimes 
> help with debugging issues too.
>  # Add a new config {{spark.sql.optimizer.excludedRules}}, whose value is a 
> comma-separated list of rule names (see the usage sketch below).
>  # Modify the current {{batches}} method to remove the excluded rules from 
> the default batches. Log the rules that have been excluded.
>  # Split the existing default batches into "post-analysis batches" and 
> "optimization batches" so that only rules in the "optimization batches" can 
> be excluded.
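
A hedged usage sketch of the proposed configuration, assuming the conf name 
from the description; the rule names below are merely examples of fully 
qualified optimizer rule names.

{code:scala}
import org.apache.spark.sql.SparkSession

object ExcludedRulesSketch {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .appName("excluded-rules")
      // Comma-separated list of optimizer rules to skip for this session.
      .config("spark.sql.optimizer.excludedRules",
        "org.apache.spark.sql.catalyst.optimizer.PushDownPredicate," +
          "org.apache.spark.sql.catalyst.optimizer.ConvertToLocalRelation")
      .getOrCreate()

    // The excluded rules should be logged and skipped when the optimizer runs.
    spark.range(10).filter("id > 5").explain(true)
    spark.stop()
  }
}
{code}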



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-1137) ZK Persistence Engine crashes if stored data has wrong serialVersionUID

2018-07-25 Thread Apache Spark (JIRA)


[ 
https://issues.apache.org/jira/browse/SPARK-1137?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16556251#comment-16556251
 ] 

Apache Spark commented on SPARK-1137:
-

User 'aarondav' has created a pull request for this issue:
https://github.com/apache/spark/pull/4

> ZK Persistence Engine crashes if stored data has wrong serialVersionUID
> ---
>
> Key: SPARK-1137
> URL: https://issues.apache.org/jira/browse/SPARK-1137
> Project: Spark
>  Issue Type: Bug
>  Components: Deploy
>Affects Versions: 0.9.0
>Reporter: Aaron Davidson
>Assignee: Aaron Davidson
>Priority: Major
> Fix For: 1.0.0
>
>
> The ZooKeeperPersistenceEngine contains information about concurrently 
> existing Masters and Workers. This information, as the name suggests, is 
> persistent in the event of a Master failure/restart. If the Spark version is 
> upgraded, the Master will crash with a Java serialization exception when 
> trying to re-read the persisted data.
> Instead of crashing (indefinitely), the Master should probably just ignore 
> the prior data.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-24874) Allow hybrid of both barrier tasks and regular tasks in a stage

2018-07-25 Thread Reynold Xin (JIRA)


[ 
https://issues.apache.org/jira/browse/SPARK-24874?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16556241#comment-16556241
 ] 

Reynold Xin commented on SPARK-24874:
-

Do we really need this? Seems like an uncommon use case.

 

> Allow hybrid of both barrier tasks and regular tasks in a stage
> ---
>
> Key: SPARK-24874
> URL: https://issues.apache.org/jira/browse/SPARK-24874
> Project: Spark
>  Issue Type: Improvement
>  Components: Spark Core
>Affects Versions: 3.0.0
>Reporter: Jiang Xingbo
>Priority: Major
>
> Currently we only allow barrier tasks in a barrier stage, however, consider 
> the following query:
> {code}
> sc = new SparkContext(conf)
> val rdd1 = sc.parallelize(1 to 100, 10)
> val rdd2 = sc.parallelize(1 to 1000, 20).barrier().mapPartitions((it, ctx) => 
> it)
> val rdd = rdd1.union(rdd2).mapPartitions(t => t)
> {code}
> Now it requires 30 free slots to run `rdd.collect()`. Actually, we can launch 
> regular tasks to collect data from rdd1's partitions, since they are not 
> required to be launched together. If we can do that, we only need 20 free 
> slots to run `rdd.collect()`.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Resolved] (SPARK-24860) Expose dynamic partition overwrite per write operation

2018-07-25 Thread Xiao Li (JIRA)


 [ 
https://issues.apache.org/jira/browse/SPARK-24860?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Xiao Li resolved SPARK-24860.
-
   Resolution: Fixed
 Assignee: Koert Kuipers
Fix Version/s: 2.4.0

> Expose dynamic partition overwrite per write operation
> --
>
> Key: SPARK-24860
> URL: https://issues.apache.org/jira/browse/SPARK-24860
> Project: Spark
>  Issue Type: Improvement
>  Components: SQL
>Affects Versions: 2.3.1
>Reporter: koert kuipers
>Assignee: Koert Kuipers
>Priority: Minor
> Fix For: 2.4.0
>
>
> This is a follow up to issue SPARK-20236
> Also see the discussion in pullreq https://github.com/apache/spark/pull/18714
> SPARK-20236 added a global setting spark.sql.sources.partitionOverwriteMode 
> to switch between static and dynamic overwrite of partitioned tables. It 
> would be nice if we could choose, per partitioned overwrite operation, whether 
> its behavior is static or dynamic. The suggested syntax is:
> {noformat}
> df.write.option("partitionOverwriteMode", "dynamic").parquet...{noformat}
>  
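
A minimal Scala sketch of the per-write override suggested above: the 
session-level default stays untouched while this particular write requests 
dynamic partition overwrite. The table path and columns are assumptions.

{code:scala}
import org.apache.spark.sql.{SaveMode, SparkSession}

object DynamicOverwriteSketch {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().appName("dynamic-overwrite").getOrCreate()
    import spark.implicits._

    val df = Seq(("20180725", 1L), ("20180726", 2L)).toDF("date", "count")

    df.write
      .mode(SaveMode.Overwrite)
      // Per-write override of spark.sql.sources.partitionOverwriteMode:
      // only the partitions present in df are replaced.
      .option("partitionOverwriteMode", "dynamic")
      .partitionBy("date")
      .parquet("/tmp/events") // hypothetical output path

    spark.stop()
  }
}
{code}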



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Resolved] (SPARK-23146) Support client mode for Kubernetes cluster backend

2018-07-25 Thread Matt Cheah (JIRA)


 [ 
https://issues.apache.org/jira/browse/SPARK-23146?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Matt Cheah resolved SPARK-23146.

   Resolution: Fixed
Fix Version/s: 2.4.0

> Support client mode for Kubernetes cluster backend
> --
>
> Key: SPARK-23146
> URL: https://issues.apache.org/jira/browse/SPARK-23146
> Project: Spark
>  Issue Type: New Feature
>  Components: Kubernetes
>Affects Versions: 2.3.0
>Reporter: Anirudh Ramanathan
>Priority: Major
> Fix For: 2.4.0
>
>
> This issue tracks client mode support within Spark when running in the 
> Kubernetes cluster backend.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-24915) Calling SparkSession.createDataFrame with schema can throw exception

2018-07-25 Thread Bryan Cutler (JIRA)


[ 
https://issues.apache.org/jira/browse/SPARK-24915?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16556126#comment-16556126
 ] 

Bryan Cutler commented on SPARK-24915:
--

Hi [~stspencer], I've been trying to fix similar issues, but this is a little 
different since the StructType makes _needSerializeAnyField==True, as you 
pointed out.  I agree that the current behavior is very confusing and should be 
fixed, but the related issue had to be pushed back to Spark 3.0 because it 
causes a behavior change.  Hopefully we can improve both these issues.  Until 
then, in case you're not aware, the intended way to define Row data when you 
care about a specific field ordering is like this:

 
{code:java}
In [10]: MyRow = Row("field2", "field1")

In [11]: data = [
...: MyRow(Row(sub_field='world'), "hello")
...: ]

In [12]: df = spark.createDataFrame(data, schema=schema)

In [13]: df.show()
+---+--+
| field2|field1|
+---+--+
|[world]| hello|
+---+--+{code}

hope that helps

> Calling SparkSession.createDataFrame with schema can throw exception
> 
>
> Key: SPARK-24915
> URL: https://issues.apache.org/jira/browse/SPARK-24915
> Project: Spark
>  Issue Type: Bug
>  Components: PySpark
>Affects Versions: 2.3.1
> Environment: Python 3.6.3
> PySpark 2.3.1 (installed via pip)
> OSX 10.12.6
>Reporter: Stephen Spencer
>Priority: Major
>
> There seems to be a bug in PySpark when using the PySparkSQL session to 
> create a dataframe with a pre-defined schema.
> Code to reproduce the error:
> {code:java}
> from pyspark import SparkConf, SparkContext
> from pyspark.sql import SparkSession
> from pyspark.sql.types import StructType, StructField, StringType, Row
> conf = SparkConf().setMaster("local").setAppName("repro") 
> context = SparkContext(conf=conf) 
> session = SparkSession(context)
> # Construct schema (the order of fields is important)
> schema = StructType([
> StructField('field2', StructType([StructField('sub_field', StringType(), 
> False)]), False),
> StructField('field1', StringType(), False),
> ])
> # Create data to populate data frame
> data = [
> Row(field1="Hello", field2=Row(sub_field='world'))
> ]
> # Attempt to create the data frame supplying the schema
> # this will throw a ValueError
> df = session.createDataFrame(data, schema=schema)
> df.show(){code}
> Running this throws a ValueError
> {noformat}
> Traceback (most recent call last):
> File "schema_bug.py", line 18, in 
> df = session.createDataFrame(data, schema=schema)
> File 
> "/Users/stephenspencer/benevolent/ai/neat/rex/.env/lib/python3.6/site-packages/pyspark/sql/session.py",
>  line 691, in createDataFrame
> rdd, schema = self._createFromLocal(map(prepare, data), schema)
> File 
> "/Users/stephenspencer/benevolent/ai/neat/rex/.env/lib/python3.6/site-packages/pyspark/sql/session.py",
>  line 423, in _createFromLocal
> data = [schema.toInternal(row) for row in data]
> File 
> "/Users/stephenspencer/benevolent/ai/neat/rex/.env/lib/python3.6/site-packages/pyspark/sql/session.py",
>  line 423, in 
> data = [schema.toInternal(row) for row in data]
> File 
> "/Users/stephenspencer/benevolent/ai/neat/rex/.env/lib/python3.6/site-packages/pyspark/sql/types.py",
>  line 601, in toInternal
> for f, v, c in zip(self.fields, obj, self._needConversion))
> File 
> "/Users/stephenspencer/benevolent/ai/neat/rex/.env/lib/python3.6/site-packages/pyspark/sql/types.py",
>  line 601, in 
> for f, v, c in zip(self.fields, obj, self._needConversion))
> File 
> "/Users/stephenspencer/benevolent/ai/neat/rex/.env/lib/python3.6/site-packages/pyspark/sql/types.py",
>  line 439, in toInternal
> return self.dataType.toInternal(obj)
> File 
> "/Users/stephenspencer/benevolent/ai/neat/rex/.env/lib/python3.6/site-packages/pyspark/sql/types.py",
>  line 619, in toInternal
> raise ValueError("Unexpected tuple %r with StructType" % obj)
> ValueError: Unexpected tuple 'Hello' with StructType{noformat}
> The problem seems to be here:
> https://github.com/apache/spark/blob/3d5c61e5fd24f07302e39b5d61294da79aa0c2f9/python/pyspark/sql/types.py#L603
> specifically the bit
> {code:java}
> zip(self.fields, obj, self._needConversion)
> {code}
> This zip statement seems to assume that obj and self.fields are ordered in 
> the same way, so that the elements of obj will correspond to the right fields 
> in the schema. However, this is not true: a Row orders its elements 
> alphabetically, but the fields in the schema are in whatever order they are 
> specified. In this example, field2 is being initialised with the field1 
> element 'Hello'. If you re-order the fields in the schema to go (field1, 
> field2), the given example works without error.
> The schema in the repro is specifically designed to elicit the problem, the 
> fields are out of 

[jira] [Commented] (SPARK-24288) Enable preventing predicate pushdown

2018-07-25 Thread Apache Spark (JIRA)


[ 
https://issues.apache.org/jira/browse/SPARK-24288?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16556119#comment-16556119
 ] 

Apache Spark commented on SPARK-24288:
--

User 'maryannxue' has created a pull request for this issue:
https://github.com/apache/spark/pull/21875

> Enable preventing predicate pushdown
> 
>
> Key: SPARK-24288
> URL: https://issues.apache.org/jira/browse/SPARK-24288
> Project: Spark
>  Issue Type: New Feature
>  Components: SQL
>Affects Versions: 2.3.0
>Reporter: Tomasz Gawęda
>Priority: Major
> Attachments: SPARK-24288.simple.patch
>
>
> Issue discussed on Mailing List: 
> [http://apache-spark-developers-list.1001551.n3.nabble.com/Preventing-predicate-pushdown-td23976.html]
> While working with the JDBC datasource I saw that many "or" clauses with 
> non-equality operators cause huge performance degradation of the SQL query 
> sent to the database (DB2). For example: 
> val df = spark.read.format("jdbc").(other options to parallelize 
> load).load() 
> df.where(s"(date1 > $param1 and (date1 < $param2 or date1 is null) or x 
>  > 100)").show() // in real application whose predicates were pushed 
> many many lines below, many ANDs and ORs 
> If I use cache() before where, there is no predicate pushdown of this 
> "where" clause. However, in a production system caching many sources is a 
> waste of memory (especially if the pipeline is long and I must cache many 
> times). There are also a few more workarounds, but it would be great if Spark 
> supported letting the user prevent predicate pushdown.
>  
> For example: df.withAnalysisBarrier().where(...) ?
>  
> Note that this should not be a global configuration option. If I read 2 
> DataFrames, df1 and df2, I would like to specify that df1 should not have 
> some predicates pushed down (while others may be), but df2 should have all 
> predicates pushed down, even if the target query joins df1 and df2. As far as 
> I understand the Spark optimizer, if we use functions like 
> `withAnalysisBarrier` and put an AnalysisBarrier explicitly in the logical 
> plan, then predicates won't be pushed down on that particular DataFrame and 
> predicate pushdown will still be possible on the other one.
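
To make the trade-off concrete, here is a small Scala sketch of the cache() 
workaround mentioned above next to the proposed (not yet existing) 
per-DataFrame barrier; the JDBC connection options and column names are 
placeholders.

{code:scala}
import org.apache.spark.sql.SparkSession

object PreventPushdownSketch {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().appName("prevent-pushdown").getOrCreate()

    val df = spark.read.format("jdbc")
      .option("url", "jdbc:db2://dbhost:50000/SAMPLE") // placeholder connection
      .option("dbtable", "SCHEMA.EVENTS")              // placeholder table
      .load()

    // Works today: caching cuts the plan, so the predicate is evaluated in
    // Spark instead of being pushed into the generated DB2 query, at the cost
    // of materializing the whole table.
    val viaCache = df.cache().where("date1 > '2018-01-01' or x > 100")

    // Proposed per-DataFrame barrier from the description (does not exist):
    // val viaBarrier = df.withAnalysisBarrier().where("date1 > '2018-01-01' or x > 100")

    viaCache.show()
    spark.stop()
  }
}
{code}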



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-23146) Support client mode for Kubernetes cluster backend

2018-07-25 Thread Apache Spark (JIRA)


[ 
https://issues.apache.org/jira/browse/SPARK-23146?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16556098#comment-16556098
 ] 

Apache Spark commented on SPARK-23146:
--

User 'mccheah' has created a pull request for this issue:
https://github.com/apache/spark/pull/21874

> Support client mode for Kubernetes cluster backend
> --
>
> Key: SPARK-23146
> URL: https://issues.apache.org/jira/browse/SPARK-23146
> Project: Spark
>  Issue Type: New Feature
>  Components: Kubernetes
>Affects Versions: 2.3.0
>Reporter: Anirudh Ramanathan
>Priority: Major
>
> This issue tracks client mode support within Spark when running in the 
> Kubernetes cluster backend.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Resolved] (SPARK-24849) Convert StructType to DDL string

2018-07-25 Thread Xiao Li (JIRA)


 [ 
https://issues.apache.org/jira/browse/SPARK-24849?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Xiao Li resolved SPARK-24849.
-
   Resolution: Fixed
 Assignee: Maxim Gekk
Fix Version/s: 2.4.0

> Convert StructType to DDL string
> 
>
> Key: SPARK-24849
> URL: https://issues.apache.org/jira/browse/SPARK-24849
> Project: Spark
>  Issue Type: Improvement
>  Components: SQL
>Affects Versions: 2.3.1
>Reporter: Maxim Gekk
>Assignee: Maxim Gekk
>Priority: Minor
> Fix For: 2.4.0
>
>
> Need to add new methods that convert a StructType value to a schema in DDL 
> format. It should be possible to use the resulting string in new 
> table creation by just copy-pasting the new method's output. The existing 
> methods simpleString(), catalogString() and sql() put ':' between a top-level 
> field name and its type, and wrap the result in the *struct* word
> {code}
> ds.schema.catalogString
> struct {code}
> Output of new method should be
> {code}
> metaData struct {code}
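
A hedged Scala sketch of how such a conversion could be used, assuming the new 
method lands as something like StructType.toDDL; the schema below is an 
illustration only.

{code:scala}
import org.apache.spark.sql.types.{IntegerType, StringType, StructField, StructType}

object SchemaToDdlSketch {
  def main(args: Array[String]): Unit = {
    val schema = StructType(Seq(
      StructField("eventTime", StringType),
      StructField("metaData", StructType(Seq(
        StructField("eventId", IntegerType))))))

    // Existing form: ':' separators and a struct<...> wrapper, e.g.
    //   struct<eventTime:string,metaData:struct<eventId:int>>
    println(schema.catalogString)

    // The proposed DDL form could be pasted straight into CREATE TABLE, e.g.
    //   `eventTime` STRING, `metaData` STRUCT<`eventId`: INT>
    // println(schema.toDDL)  // assuming the new method is named toDDL
  }
}
{code}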



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Resolved] (SPARK-24911) SHOW CREATE TABLE drops escaping of nested column names

2018-07-25 Thread Xiao Li (JIRA)


 [ 
https://issues.apache.org/jira/browse/SPARK-24911?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Xiao Li resolved SPARK-24911.
-
   Resolution: Fixed
Fix Version/s: 2.4.0

> SHOW CREATE TABLE drops escaping of nested column names
> ---
>
> Key: SPARK-24911
> URL: https://issues.apache.org/jira/browse/SPARK-24911
> Project: Spark
>  Issue Type: Bug
>  Components: SQL
>Affects Versions: 2.3.1
>Reporter: Maxim Gekk
>Assignee: Maxim Gekk
>Priority: Major
> Fix For: 2.4.0
>
>
> Create a table with quoted nested column - *`b`*:
> {code:sql}
> create table `test` (`a` STRUCT<`b`:STRING>);
> {code}
> and show how the table was created:
> {code:sql}
> SHOW CREATE TABLE `test`
> {code}
> {code}
> CREATE TABLE `test`(`a` struct<b:string>)
> {code}
> The column *b* becomes unquoted.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Assigned] (SPARK-24911) SHOW CREATE TABLE drops escaping of nested column names

2018-07-25 Thread Xiao Li (JIRA)


 [ 
https://issues.apache.org/jira/browse/SPARK-24911?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Xiao Li reassigned SPARK-24911:
---

Assignee: Maxim Gekk

> SHOW CREATE TABLE drops escaping of nested column names
> ---
>
> Key: SPARK-24911
> URL: https://issues.apache.org/jira/browse/SPARK-24911
> Project: Spark
>  Issue Type: Bug
>  Components: SQL
>Affects Versions: 2.3.1
>Reporter: Maxim Gekk
>Assignee: Maxim Gekk
>Priority: Major
> Fix For: 2.4.0
>
>
> Create a table with quoted nested column - *`b`*:
> {code:sql}
> create table `test` (`a` STRUCT<`b`:STRING>);
> {code}
> and show how the table was created:
> {code:sql}
> SHOW CREATE TABLE `test`
> {code}
> {code}
> CREATE TABLE `test`(`a` struct<b:string>)
> {code}
> The column *b* becomes unquoted.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Updated] (SPARK-24922) Iterative rdd union + reduceByKey operations on small dataset leads to "No space left on device" error on account of lot of shuffle spill.

2018-07-25 Thread Dinesh Dharme (JIRA)


 [ 
https://issues.apache.org/jira/browse/SPARK-24922?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Dinesh Dharme updated SPARK-24922:
--
Description: 
I am trying to do a few (union + reduceByKey) operations on a hierarchical 
dataset in an iterative fashion with RDDs. The first few loops run fine, but on 
the subsequent loops the operations end up using the whole scratch space 
provided to them. 

I have set the scratch directory, i.e. SPARK_LOCAL_DIRS, to one having *100 
GB* of space.

The hierarchical dataset, whose size is small (< 400 kB), remains constant 
throughout the iterations.

 I have tried the worker cleanup flag but it has no effect i.e. 
"spark.worker.cleanup.enabled=true"

 

Error : 

 
{noformat}
Caused by: java.io.IOException: No space left on device
at java.io.FileOutputStream.writeBytes(Native Method)
at java.io.FileOutputStream.write(FileOutputStream.java:326)
at java.io.BufferedOutputStream.flushBuffer(BufferedOutputStream.java:82)
at java.io.BufferedOutputStream.write(BufferedOutputStream.java:126)
at java.io.DataOutputStream.writeLong(DataOutputStream.java:224)
at 
org.apache.spark.shuffle.IndexShuffleBlockResolver$$anonfun$writeIndexFileAndCommit$1$$anonfun$apply$mcV$sp$1.apply$mcVJ$sp(IndexShuffleBlockResolver.scala:151)
at 
org.apache.spark.shuffle.IndexShuffleBlockResolver$$anonfun$writeIndexFileAndCommit$1$$anonfun$apply$mcV$sp$1.apply(IndexShuffleBlockResolver.scala:149)
at 
org.apache.spark.shuffle.IndexShuffleBlockResolver$$anonfun$writeIndexFileAndCommit$1$$anonfun$apply$mcV$sp$1.apply(IndexShuffleBlockResolver.scala:149)
at 
scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
at scala.collection.mutable.ArrayOps$ofLong.foreach(ArrayOps.scala:246)
at 
org.apache.spark.shuffle.IndexShuffleBlockResolver$$anonfun$writeIndexFileAndCommit$1.apply$mcV$sp(IndexShuffleBlockResolver.scala:149)
at 
org.apache.spark.shuffle.IndexShuffleBlockResolver$$anonfun$writeIndexFileAndCommit$1.apply(IndexShuffleBlockResolver.scala:145)
at 
org.apache.spark.shuffle.IndexShuffleBlockResolver$$anonfun$writeIndexFileAndCommit$1.apply(IndexShuffleBlockResolver.scala:145)
at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1377)
at 
org.apache.spark.shuffle.IndexShuffleBlockResolver.writeIndexFileAndCommit(IndexShuffleBlockResolver.scala:153)
at 
org.apache.spark.shuffle.sort.SortShuffleWriter.write(SortShuffleWriter.scala:73)
at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:96)
at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:53)
at org.apache.spark.scheduler.Task.run(Task.scala:109)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:345)
at 
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at 
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
{noformat}
 

*What I am trying to do (High Level)*:

I have a dataset of 5 different CSVs (Parent, Child1, Child2, Child21, Child22) 
which are related in a hierarchical fashion as shown below. 

Parent-> Child1 -> Child2  -> Child21 

Parent-> Child1 -> Child2  -> Child22 

Each element in the tree has 14 columns (elementid, parentelement_id, cat1, 
cat2, num1, num2,., num10)

I am trying to aggregate the values of one column of Child21 into Child1 (i.e. 
2 levels up). I am doing the same for another column value of Child22 into 
Child1. Then I am merging these aggregated values at the same Child1 level.

This is present in the code at location : 

spark.rddexample.dummyrdd.tree.child1.events.Function1

 

 

*Code which replicates the issue*: 

1] [https://github.com/dineshdharme/SparkRddShuffleIssue]

 

*Steps to reproduce the issue :* 

1] Clone the above repository.

2] Put the csvs in the "issue-data" folder in the above repository at a hadoop 
location "hdfs:///tree/dummy/data/"

3] Set the spark scratch directory (SPARK_LOCAL_DIRS) to a folder which has 
large space. (> *100 GB*)

4] Run "sbt assembly"

5] Run the following command at the project location:

/path/to/spark-2.3.0-bin-hadoop2.7/bin/spark-submit \
 --class spark.rddexample.dummyrdd.FunctionExecutor \
 --master local[2] \
 --deploy-mode client \
 --executor-memory 2G \
 --driver-memory 2G \
 target/scala-2.11/rdd-shuffle-assembly-0.1.0.jar \
 20 \
 hdfs:///tree/dummy/data/ \
 hdfs:///tree/dummy/results/   

  was:
I am trying to do few (union + reduceByKey) operations on a hiearchical dataset 
in a iterative fashion in rdd. The first few loops run fine but on the 
subsequent loops, the operations ends up using the whole scratch space provided 
to it. 

I have set the scratch directory, i.e. SPARK_LOCAL_DIRS , to be one having *100 
GB* space.

The heirarchical dataset, whose size is (< 400kB), remains constant throughout 
the iterations.

 

 

Error : 

 
{noformat}
Caused by: java.io.IOException: No space left on 

[jira] [Created] (SPARK-24922) Iterative rdd union + reduceByKey operations on small dataset leads to "No space left on device" error on account of lot of shuffle spill.

2018-07-25 Thread Dinesh Dharme (JIRA)
Dinesh Dharme created SPARK-24922:
-

 Summary: Iterative rdd union + reduceByKey operations on small 
dataset leads to "No space left on device" error on account of lot of shuffle 
spill.
 Key: SPARK-24922
 URL: https://issues.apache.org/jira/browse/SPARK-24922
 Project: Spark
  Issue Type: Bug
  Components: Shuffle
Affects Versions: 2.3.0
 Environment: Java 8, Scala 2.11.8, Spark 2.3.0, sbt 0.13.16

 
Reporter: Dinesh Dharme


I am trying to do a few (union + reduceByKey) operations on a hierarchical 
dataset in an iterative fashion with RDDs. The first few loops run fine, but on 
the subsequent loops the operations end up using the whole scratch space 
provided to them. 

I have set the scratch directory, i.e. SPARK_LOCAL_DIRS, to one having *100 
GB* of space.

The hierarchical dataset, whose size is small (< 400 kB), remains constant 
throughout the iterations.

 

 

Error : 

 
{noformat}
Caused by: java.io.IOException: No space left on device
at java.io.FileOutputStream.writeBytes(Native Method)
at java.io.FileOutputStream.write(FileOutputStream.java:326)
at java.io.BufferedOutputStream.flushBuffer(BufferedOutputStream.java:82)
at java.io.BufferedOutputStream.write(BufferedOutputStream.java:126)
at java.io.DataOutputStream.writeLong(DataOutputStream.java:224)
at 
org.apache.spark.shuffle.IndexShuffleBlockResolver$$anonfun$writeIndexFileAndCommit$1$$anonfun$apply$mcV$sp$1.apply$mcVJ$sp(IndexShuffleBlockResolver.scala:151)
at 
org.apache.spark.shuffle.IndexShuffleBlockResolver$$anonfun$writeIndexFileAndCommit$1$$anonfun$apply$mcV$sp$1.apply(IndexShuffleBlockResolver.scala:149)
at 
org.apache.spark.shuffle.IndexShuffleBlockResolver$$anonfun$writeIndexFileAndCommit$1$$anonfun$apply$mcV$sp$1.apply(IndexShuffleBlockResolver.scala:149)
at 
scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
at scala.collection.mutable.ArrayOps$ofLong.foreach(ArrayOps.scala:246)
at 
org.apache.spark.shuffle.IndexShuffleBlockResolver$$anonfun$writeIndexFileAndCommit$1.apply$mcV$sp(IndexShuffleBlockResolver.scala:149)
at 
org.apache.spark.shuffle.IndexShuffleBlockResolver$$anonfun$writeIndexFileAndCommit$1.apply(IndexShuffleBlockResolver.scala:145)
at 
org.apache.spark.shuffle.IndexShuffleBlockResolver$$anonfun$writeIndexFileAndCommit$1.apply(IndexShuffleBlockResolver.scala:145)
at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1377)
at 
org.apache.spark.shuffle.IndexShuffleBlockResolver.writeIndexFileAndCommit(IndexShuffleBlockResolver.scala:153)
at 
org.apache.spark.shuffle.sort.SortShuffleWriter.write(SortShuffleWriter.scala:73)
at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:96)
at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:53)
at org.apache.spark.scheduler.Task.run(Task.scala:109)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:345)
at 
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at 
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
{noformat}
 

*What I am trying to do (High Level)*:

I have a dataset of 5 different CSVs (Parent, Child1, Child2, Child21, Child22) 
which are related in a hierarchical fashion as shown below. 

Parent-> Child1 -> Child2  -> Child21 

Parent-> Child1 -> Child2  -> Child22 

Each element in the tree has 14 columns (elementid, parentelement_id, cat1, 
cat2, num1, num2,., num10)

I am trying to aggregate the values of one column of Child21 into Child1 (i.e. 
2 levels up). I am doing the same for another column value of Child22 into 
Child1. Then I am merging these aggregated values at the same Child1 level.

This is present in the code at location : 

spark.rddexample.dummyrdd.tree.child1.events.Function1

 

 

*Code which replicates the issue*: 

1] [https://github.com/dineshdharme/SparkRddShuffleIssue]

 

*Steps to reproduce the issue :* 

1] Clone the above repository.

2] Put the csvs in the "issue-data" folder in the above repository at a hadoop 
location "hdfs:///tree/dummy/data/"

3] Set the spark scratch directory (SPARK_LOCAL_DIRS) to a folder which has 
large space. (> *100 GB*)

4] Run "sbt assembly"

5] Run the following command at the project location:

/path/to/spark-2.3.0-bin-hadoop2.7/bin/spark-submit \
--class spark.rddexample.dummyrdd.FunctionExecutor \
--master local[2] \
--deploy-mode client \
--executor-memory 2G \
--driver-memory 2G \
target/scala-2.11/rdd-shuffle-assembly-0.1.0.jar \
20 \
hdfs:///tree/dummy/data/ \
hdfs:///tree/dummy/results/   



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Created] (SPARK-24921) SparkStreaming steadily increasing job generation delay due to apparent URLClassLoader contention

2018-07-25 Thread Tommy S (JIRA)
Tommy S created SPARK-24921:
---

 Summary: SparkStreaming steadily increasing job generation delay 
due to apparent URLClassLoader contention
 Key: SPARK-24921
 URL: https://issues.apache.org/jira/browse/SPARK-24921
 Project: Spark
  Issue Type: Bug
  Components: Spark Core
Affects Versions: 2.3.1
Reporter: Tommy S


I'm seeing an issue where the job generation time of my spark streaming job is 
steadily increasing after some time.

Looking at the thread dumps I see that the JobGenerator thread is BLOCKED 
waiting for URLClassPath.getLoader synchronized method:
{noformat}
"JobGenerator" #153 daemon prio=5 os_prio=0 tid=0x02dad800 nid=0x253c 
waiting for monitor entry [0x7f4b311c2000]
   java.lang.Thread.State: BLOCKED (on object monitor)
    at sun.misc.URLClassPath.getNextLoader(URLClassPath.java:469)
    - waiting to lock <0x7f4be023f940> (a sun.misc.URLClassPath)
    at sun.misc.URLClassPath.findResource(URLClassPath.java:214)
    at java.net.URLClassLoader$2.run(URLClassLoader.java:569)
    at java.net.URLClassLoader$2.run(URLClassLoader.java:567)
    at java.security.AccessController.doPrivileged(Native Method)
    at java.net.URLClassLoader.findResource(URLClassLoader.java:566)
    at java.lang.ClassLoader.getResource(ClassLoader.java:1096)
    at java.lang.ClassLoader.getResource(ClassLoader.java:1091)
    at java.net.URLClassLoader.getResourceAsStream(URLClassLoader.java:232)
    at java.lang.Class.getResourceAsStream(Class.java:2223)
    at 
org.apache.spark.util.ClosureCleaner$.getClassReader(ClosureCleaner.scala:40)
    at 
org.apache.spark.util.ClosureCleaner$.getInnerClosureClasses(ClosureCleaner.scala:84)
    at 
org.apache.spark.util.ClosureCleaner$.org$apache$spark$util$ClosureCleaner$$clean(ClosureCleaner.scala:224)
    at org.apache.spark.util.ClosureCleaner$.clean(ClosureCleaner.scala:159)
    at org.apache.spark.SparkContext.clean(SparkContext.scala:2299)
    at 
org.apache.spark.rdd.PairRDDFunctions$$anonfun$combineByKeyWithClassTag$1.apply(PairRDDFunctions.scala:89)
    at 
org.apache.spark.rdd.PairRDDFunctions$$anonfun$combineByKeyWithClassTag$1.apply(PairRDDFunctions.scala:77)
    at 
org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
    at 
org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
    at org.apache.spark.rdd.RDD.withScope(RDD.scala:363)
    at 
org.apache.spark.rdd.PairRDDFunctions.combineByKeyWithClassTag(PairRDDFunctions.scala:77)
    at 
org.apache.spark.rdd.PairRDDFunctions$$anonfun$combineByKey$1.apply(PairRDDFunctions.scala:119)
    at 
org.apache.spark.rdd.PairRDDFunctions$$anonfun$combineByKey$1.apply(PairRDDFunctions.scala:119)
    at 
org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
    at 
org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
    at org.apache.spark.rdd.RDD.withScope(RDD.scala:363)
    at 
org.apache.spark.rdd.PairRDDFunctions.combineByKey(PairRDDFunctions.scala:117)
    at 
org.apache.spark.streaming.dstream.ShuffledDStream.compute(ShuffledDStream.scala:42)
    at 
org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1$$anonfun$apply$7.apply(DStream.scala:342)
    at 
org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1$$anonfun$apply$7.apply(DStream.scala:342)
    at scala.util.DynamicVariable.withValue(DynamicVariable.scala:58)
    at 
org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1.apply(DStream.scala:341)
    at 
org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1.apply(DStream.scala:341)
    at 
org.apache.spark.streaming.dstream.DStream.createRDDWithLocalProperties(DStream.scala:416)
    at 
org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1.apply(DStream.scala:336)
    at 
org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1.apply(DStream.scala:334)
    at scala.Option.orElse(Option.scala:289)
    at 
org.apache.spark.streaming.dstream.DStream.getOrCompute(DStream.scala:331)
    at 
org.apache.spark.streaming.dstream.MappedDStream.compute(MappedDStream.scala:36)
    at 
org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1$$anonfun$apply$7.apply(DStream.scala:342)
    at 
org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1$$anonfun$apply$7.apply(DStream.scala:342)
    at scala.util.DynamicVariable.withValue(DynamicVariable.scala:58)
    at 
org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1.apply(DStream.scala:341)
    at 

[jira] [Commented] (SPARK-24914) totalSize is not a good estimate for broadcast joins

2018-07-25 Thread Bruce Robbins (JIRA)


[ 
https://issues.apache.org/jira/browse/SPARK-24914?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16555998#comment-16555998
 ] 

Bruce Robbins commented on SPARK-24914:
---

[~irashid]
{quote}
given HIVE-20079, can we also have a conf to just ignore rawDataSize?
{quote}
That makes sense. I've updated the description.

> totalSize is not a good estimate for broadcast joins
> 
>
> Key: SPARK-24914
> URL: https://issues.apache.org/jira/browse/SPARK-24914
> Project: Spark
>  Issue Type: Improvement
>  Components: SQL
>Affects Versions: 2.4.0
>Reporter: Bruce Robbins
>Priority: Major
>
> When determining whether to do a broadcast join, Spark estimates the size of 
> the smaller table as follows:
>  - if totalSize is defined and greater than 0, use it.
>  - else, if rawDataSize is defined and greater than 0, use it
>  - else, use spark.sql.defaultSizeInBytes (default: Long.MaxValue)
> Therefore, Spark prefers totalSize over rawDataSize.
> Unfortunately, totalSize is often quite a bit smaller than the actual table 
> size, since it represents the size of the table's files on disk. Parquet and 
> Orc files, for example, are encoded and compressed. This can result in the 
> JVM throwing an OutOfMemoryError while Spark is loading the table into a 
> HashedRelation, or when Spark actually attempts to broadcast the data.
> On the other hand, rawDataSize represents the uncompressed size of the 
> dataset, according to Hive documentation. This seems like a pretty good 
> number to use in preference to totalSize. However, due to HIVE-20079, this 
> value is simply #columns * #rows. Once that bug is fixed, it may be a 
> superior statistic, at least for managed tables.
> In the meantime, we could apply a configurable "fudge factor" to totalSize, 
> at least for types of files that are encoded and compressed. Hive has the 
> setting hive.stats.deserialization.factor, which defaults to 1.0, and is 
> described as follows:
> {quote}in the absence of uncompressed/raw data size, total file size will be 
> used for statistics annotation. But the file may be compressed, encoded and 
> serialized which may be lesser in size than the actual uncompressed/raw data 
> size. This factor will be multiplied to file size to estimate the raw data 
> size.
> {quote}
> Also, I propose a configuration setting to allow the user to completely 
> ignore rawDataSize, since that value is broken (due to HIVE-20079). When that 
> configuration setting is set to true, Spark would instead estimate the table 
> as follows:
> - if totalSize is defined and greater than 0, use totalSize*fudgeFactor.
>  - else, use spark.sql.defaultSizeInBytes (default: Long.MaxValue)
> Caveat: This mitigates the issue only for Hive tables. It does not help much 
> when the user is reading files using {{spark.read.parquet}}, unless we apply 
> the same fudge factor there.
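
A small Scala sketch of the estimation order proposed above; apart from 
spark.sql.defaultSizeInBytes, the knobs (fudge factor, ignore-rawDataSize flag) 
are hypothetical and only mirror the description.

{code:scala}
object TableSizeEstimateSketch {
  // Order: fudged totalSize, then (optionally) rawDataSize, then the default.
  def estimate(totalSize: Option[Long],
               rawDataSize: Option[Long],
               fudgeFactor: Double,
               ignoreRawDataSize: Boolean,
               defaultSizeInBytes: Long): Long = {
    totalSize.filter(_ > 0).map(s => (s * fudgeFactor).toLong)
      .orElse(if (ignoreRawDataSize) None else rawDataSize.filter(_ > 0))
      .getOrElse(defaultSizeInBytes)
  }

  def main(args: Array[String]): Unit = {
    // A 100 MB compressed Parquet table with a fudge factor of 3.0 is treated
    // as roughly 300 MB, while the broken rawDataSize is ignored entirely.
    println(estimate(Some(100L << 20), rawDataSize = Some(400L),
      fudgeFactor = 3.0, ignoreRawDataSize = true,
      defaultSizeInBytes = Long.MaxValue))
  }
}
{code}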



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Updated] (SPARK-24914) totalSize is not a good estimate for broadcast joins

2018-07-25 Thread Bruce Robbins (JIRA)


 [ 
https://issues.apache.org/jira/browse/SPARK-24914?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Bruce Robbins updated SPARK-24914:
--
Description: 
When determining whether to do a broadcast join, Spark estimates the size of 
the smaller table as follows:
 - if totalSize is defined and greater than 0, use it.
 - else, if rawDataSize is defined and greater than 0, use it
 - else, use spark.sql.defaultSizeInBytes (default: Long.MaxValue)

Therefore, Spark prefers totalSize over rawDataSize.

Unfortunately, totalSize is often quite a bit smaller than the actual table 
size, since it represents the size of the table's files on disk. Parquet and 
Orc files, for example, are encoded and compressed. This can result in the JVM 
throwing an OutOfMemoryError while Spark is loading the table into a 
HashedRelation, or when Spark actually attempts to broadcast the data.

On the other hand, rawDataSize represents the uncompressed size of the dataset, 
according to Hive documentation. This seems like a pretty good number to use in 
preference to totalSize. However, due to HIVE-20079, this value is simply 
#columns * #rows. Once that bug is fixed, it may be a superior statistic, at 
least for managed tables.

In the meantime, we could apply a configurable "fudge factor" to totalSize, at 
least for types of files that are encoded and compressed. Hive has the setting 
hive.stats.deserialization.factor, which defaults to 1.0, and is described as 
follows:
{quote}in the absence of uncompressed/raw data size, total file size will be 
used for statistics annotation. But the file may be compressed, encoded and 
serialized which may be lesser in size than the actual uncompressed/raw data 
size. This factor will be multiplied to file size to estimate the raw data size.
{quote}
Also, I propose a configuration setting to allow the user to completely ignore 
rawDataSize, since that value is broken (due to HIVE-20079). When that 
configuration setting is set to true, Spark would instead estimate the table as 
follows:

- if totalSize is defined and greater than 0, use totalSize*fudgeFactor.
 - else, use spark.sql.defaultSizeInBytes (default: Long.MaxValue)

Caveat: This mitigates the issue only for Hive tables. It does not help much 
when the user is reading files using {{spark.read.parquet}}, unless we apply 
the same fudge factor there.

  was:
When determining whether to do a broadcast join, Spark estimates the size of 
the smaller table as follows:
 - if totalSize is defined and greater than 0, use it.
 - else, if rawDataSize is defined and greater than 0, use it
 - else, use spark.sql.defaultSizeInBytes (default: Long.MaxValue)

Therefore, Spark prefers totalSize over rawDataSize.

Unfortunately, totalSize is often quite a bit smaller than the actual table 
size, since it represents the size of the table's files on disk. Parquet and 
Orc files, for example, are encoded and compressed. This can result in the JVM 
throwing an OutOfMemoryError while Spark is loading the table into a 
HashedRelation, or when Spark actually attempts to broadcast the data.

On the other hand, rawDataSize represents the uncompressed size of the dataset, 
according to Hive documentation. This seems like a pretty good number to use in 
preference to totalSize. However, due to HIVE-20079, this value is simply 
#columns * #rows. Once that bug is fixed, it may be a superior statistic, at 
least for managed tables.

In the meantime, we could apply a configurable "fudge factor" to totalSize, at 
least for types of files that are encoded and compressed. Hive has the setting 
hive.stats.deserialization.factor, which defaults to 1.0, and is described as 
follows:
{quote}in the absence of uncompressed/raw data size, total file size will be 
used for statistics annotation. But the file may be compressed, encoded and 
serialized which may be lesser in size than the actual uncompressed/raw data 
size. This factor will be multiplied to file size to estimate the raw data size.
{quote}
In addition to the fudge factor, we could compare the adjusted totalSize to 
rawDataSize and use the bigger of the two.

Caveat: This mitigates the issue only for Hive tables. It does not help much 
when the user is reading files using {{spark.read.parquet}}, unless we apply 
the same fudge factor there.


> totalSize is not a good estimate for broadcast joins
> 
>
> Key: SPARK-24914
> URL: https://issues.apache.org/jira/browse/SPARK-24914
> Project: Spark
>  Issue Type: Improvement
>  Components: SQL
>Affects Versions: 2.4.0
>Reporter: Bruce Robbins
>Priority: Major
>
> When determining whether to do a broadcast join, Spark estimates the size of 
> the smaller table as follows:
>  - if totalSize is defined and greater than 0, use it.
>  - else, if rawDataSize is defined and 

[jira] [Created] (SPARK-24920) Spark should share netty's memory pools across all uses

2018-07-25 Thread Imran Rashid (JIRA)
Imran Rashid created SPARK-24920:


 Summary: Spark should share netty's memory pools across all uses
 Key: SPARK-24920
 URL: https://issues.apache.org/jira/browse/SPARK-24920
 Project: Spark
  Issue Type: Improvement
  Components: Spark Core
Affects Versions: 2.4.0
Reporter: Imran Rashid


Spark currently creates separate netty memory pools for each of the following 
"services":

1) RPC Client
2) RPC Server
3) BlockTransfer Client
4) BlockTransfer Server
5) ExternalShuffle Client

Depending on configuration and whether it's an executor or driver JVM, a 
different subset of these is active, but it's always either 3 or 4.
Having them independent somewhat defeats the purpose of using pools at all.  In 
my experiments I've found each pool will grow due to a burst of activity in the 
related service (e.g. task start / end msgs), followed by another burst in a 
different service (e.g. sending torrent broadcast blocks).  Because of the way 
these pools work, they allocate memory in large chunks (16 MB by default) for 
each netty thread, so there is often a surge of 128 MB of allocated memory, 
even for really tiny messages.  Also, a lot of this memory is off-heap by 
default, which makes it even tougher for users to manage.

I think it would make more sense to combine all of these into a single pool.  
In some experiments I tried, this noticeably decreased memory usage, both 
onheap and offheap (no significant performance effect in my small experiments).

As this is a pretty core change, as a first step I'd propose just exposing this 
as a conf, to let users experiment more broadly across a wider range of 
workloads.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Created] (SPARK-24919) Scala linter rule for sparkContext.hadoopConfiguration

2018-07-25 Thread Gengliang Wang (JIRA)
Gengliang Wang created SPARK-24919:
--

 Summary: Scala linter rule for sparkContext.hadoopConfiguration
 Key: SPARK-24919
 URL: https://issues.apache.org/jira/browse/SPARK-24919
 Project: Spark
  Issue Type: Bug
  Components: Build
Affects Versions: 2.4.0
Reporter: Gengliang Wang


In most cases, we should use spark.sessionState.newHadoopConf() instead of 
sparkContext.hadoopConfiguration, so that the Hadoop configurations specified 
in the Spark session configuration will come into effect.

Add a linter rule matching spark.sparkContext.hadoopConfiguration or 
spark.sqlContext.sparkContext.hadoopConfiguration to prevent that usage.
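
For illustration, a minimal sketch of the usage the rule is meant to steer 
Spark's own code towards. It is placed in the org.apache.spark.sql package only 
so the private[sql] sessionState is reachable, mirroring where the rule would 
apply inside Spark's sources; the fs.s3a key is an arbitrary example.

{code:scala}
// Placed in this package only so the private[sql] sessionState is reachable.
package org.apache.spark.sql

object HadoopConfSketch {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().appName("hadoop-conf").getOrCreate()

    // A runtime, per-session override (arbitrary example key).
    spark.conf.set("fs.s3a.connection.maximum", "200")

    // Preferred: picks up settings layered onto the session configuration.
    val conf = spark.sessionState.newHadoopConf()
    println(conf.get("fs.s3a.connection.maximum")) // prints 200

    // Discouraged, and what the proposed rule would flag:
    // val stale = spark.sparkContext.hadoopConfiguration

    spark.stop()
  }
}
{code}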



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Assigned] (SPARK-24919) Scala linter rule for sparkContext.hadoopConfiguration

2018-07-25 Thread Apache Spark (JIRA)


 [ 
https://issues.apache.org/jira/browse/SPARK-24919?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Apache Spark reassigned SPARK-24919:


Assignee: (was: Apache Spark)

> Scala linter rule for sparkContext.hadoopConfiguration
> --
>
> Key: SPARK-24919
> URL: https://issues.apache.org/jira/browse/SPARK-24919
> Project: Spark
>  Issue Type: Bug
>  Components: Build
>Affects Versions: 2.4.0
>Reporter: Gengliang Wang
>Priority: Major
>
> In most cases, we should use spark.sessionState.newHadoopConf() instead of 
> sparkContext.hadoopConfiguration, so that the Hadoop configurations specified 
> in the Spark session configuration will come into effect.
> Add a linter rule matching spark.sparkContext.hadoopConfiguration or 
> spark.sqlContext.sparkContext.hadoopConfiguration to prevent that usage.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-24918) Executor Plugin API

2018-07-25 Thread Imran Rashid (JIRA)


[ 
https://issues.apache.org/jira/browse/SPARK-24918?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16555939#comment-16555939
 ] 

Imran Rashid commented on SPARK-24918:
--

[~jerryshao] [~tgraves] you might be interested in this -- I feel like this has 
come up in past discussions (though I couldn't find any jiras about it).

> Executor Plugin API
> ---
>
> Key: SPARK-24918
> URL: https://issues.apache.org/jira/browse/SPARK-24918
> Project: Spark
>  Issue Type: New Feature
>  Components: Spark Core
>Affects Versions: 2.4.0
>Reporter: Imran Rashid
>Priority: Major
>  Labels: memory-analysis
>
> It would be nice if we could specify an arbitrary class to run within each 
> executor for debugging and instrumentation.  It's hard to do this currently 
> because:
> a) you have no idea when executors will come and go with DynamicAllocation, 
> so you don't have a chance to run custom code before the first task
> b) even with static allocation, you'd have to change the code of your spark 
> app itself to run a special task to "install" the plugin, which is often 
> tough in production cases when those maintaining regularly running 
> applications might not even know how to make changes to the application.
> For example, https://github.com/squito/spark-memory could be used in a 
> debugging context to understand memory use, just by re-running an application 
> with extra command line arguments (as opposed to rebuilding spark).
> I think one tricky part here is just deciding the API, and how it's versioned. 
>  Does it just get created when the executor starts, and that's it?  Or does it 
> get more specific events, like task start, task end, etc.?  Would we ever add 
> more events?  It should definitely be a {{DeveloperApi}}, so breaking 
> compatibility would be allowed ... but should still be avoided.  We could 
> create a base class that has no-op implementations, or explicitly version 
> everything.
> Note that this is not needed in the driver as we already have SparkListeners 
> (even if you don't care about the SparkListenerEvents and just want to 
> inspect objects in the JVM, its still good enough).



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Created] (SPARK-24918) Executor Plugin API

2018-07-25 Thread Imran Rashid (JIRA)
Imran Rashid created SPARK-24918:


 Summary: Executor Plugin API
 Key: SPARK-24918
 URL: https://issues.apache.org/jira/browse/SPARK-24918
 Project: Spark
  Issue Type: New Feature
  Components: Spark Core
Affects Versions: 2.4.0
Reporter: Imran Rashid


It would be nice if we could specify an arbitrary class to run within each 
executor for debugging and instrumentation.  It's hard to do this currently 
because:

a) you have no idea when executors will come and go with DynamicAllocation, so 
you don't have a chance to run custom code before the first task
b) even with static allocation, you'd have to change the code of your spark app 
itself to run a special task to "install" the plugin, which is often tough in 
production cases when those maintaining regularly running applications might 
not even know how to make changes to the application.

For example, https://github.com/squito/spark-memory could be used in a 
debugging context to understand memory use, just by re-running an application 
with extra command line arguments (as opposed to rebuilding spark).

I think one tricky part here is just deciding the API, and how it's versioned.  
Does it just get created when the executor starts, and that's it?  Or does it 
get more specific events, like task start, task end, etc?  Would we ever add 
more events?  It should definitely be a {{DeveloperApi}}, so breaking 
compatibility would be allowed ... but still should be avoided.  We could 
create a base class that has no-op implementations, or explicitly version 
everything.

Note that this is not needed in the driver as we already have SparkListeners 
(even if you don't care about the SparkListenerEvents and just want to inspect 
objects in the JVM, it's still good enough).
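
Purely as an illustration of the design space, one possible (hypothetical) shape 
for such a plugin; none of the names, methods, or the conf key below are decided:
{code:java}
// Hypothetical sketch only - not a decided API. The trait name, methods and the
// spark.executor.plugins conf key are made up for illustration.
import org.apache.spark.annotation.DeveloperApi

@DeveloperApi
trait ExecutorPlugin {
  /** Called once when the executor starts, before any task has run. */
  def init(): Unit = {}

  /** Called when the executor shuts down. */
  def shutdown(): Unit = {}
}

// A trivial plugin that logs heap usage at executor startup; it could be wired
// in with something like --conf spark.executor.plugins=com.example.HeapLogger.
class HeapLogger extends ExecutorPlugin {
  override def init(): Unit = {
    val rt = Runtime.getRuntime
    println(s"executor heap used: ${rt.totalMemory() - rt.freeMemory()} of ${rt.maxMemory()} bytes")
  }
}
{code}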



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-24914) totalSize is not a good estimate for broadcast joins

2018-07-25 Thread Imran Rashid (JIRA)


[ 
https://issues.apache.org/jira/browse/SPARK-24914?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16555877#comment-16555877
 ] 

Imran Rashid commented on SPARK-24914:
--

Given HIVE-20079, can we also have a conf to just ignore rawDataSize? That 
estimate seems so bad you'd never want to use it. Unfortunately I don't think 
Spark will be able to tell whether it's a "good" or "bad" estimate, so we would 
have to leave it to the user to control.

> totalSize is not a good estimate for broadcast joins
> 
>
> Key: SPARK-24914
> URL: https://issues.apache.org/jira/browse/SPARK-24914
> Project: Spark
>  Issue Type: Improvement
>  Components: SQL
>Affects Versions: 2.4.0
>Reporter: Bruce Robbins
>Priority: Major
>
> When determining whether to do a broadcast join, Spark estimates the size of 
> the smaller table as follows:
>  - if totalSize is defined and greater than 0, use it.
>  - else, if rawDataSize is defined and greater than 0, use it
>  - else, use spark.sql.defaultSizeInBytes (default: Long.MaxValue)
> Therefore, Spark prefers totalSize over rawDataSize.
> Unfortunately, totalSize is often quite a bit smaller than the actual table 
> size, since it represents the size of the table's files on disk. Parquet and 
> Orc files, for example, are encoded and compressed. This can result in the 
> JVM throwing an OutOfMemoryError while Spark is loading the table into a 
> HashedRelation, or when Spark actually attempts to broadcast the data.
> On the other hand, rawDataSize represents the uncompressed size of the 
> dataset, according to Hive documentation. This seems like a pretty good 
> number to use in preference to totalSize. However, due to HIVE-20079, this 
> value is simply #columns * #rows. Once that bug is fixed, it may be a 
> superior statistic, at least for managed tables.
> In the meantime, we could apply a configurable "fudge factor" to totalSize, 
> at least for types of files that are encoded and compressed. Hive has the 
> setting hive.stats.deserialization.factor, which defaults to 1.0, and is 
> described as follows:
> {quote}in the absence of uncompressed/raw data size, total file size will be 
> used for statistics annotation. But the file may be compressed, encoded and 
> serialized which may be lesser in size than the actual uncompressed/raw data 
> size. This factor will be multiplied to file size to estimate the raw data 
> size.
> {quote}
> In addition to the fudge factor, we could compare the adjusted totalSize to 
> rawDataSize and use the bigger of the two.
> Caveat: This mitigates the issue only for Hive tables. It does not help much 
> when the user is reading files using {{spark.read.parquet}}, unless we apply 
> the same fudge factor there.
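
As a rough illustration of the proposed heuristic, a sketch of the size estimate 
(the deserialization-factor parameter is hypothetical, not an existing Spark setting):
{code:java}
// Sketch of the estimation described above; the "fudge factor" parameter is a
// hypothetical conf, and the statistics are passed in as plain Options.
def estimateTableSizeInBytes(
    totalSize: Option[Long],       // size of the files on disk (encoded/compressed)
    rawDataSize: Option[Long],     // uncompressed size reported by Hive, if any
    defaultSizeInBytes: Long,      // spark.sql.defaultSizeInBytes fallback
    deserializationFactor: Double  // hypothetical fudge factor, e.g. 4.0
  ): Long = {
  val adjustedTotal = totalSize.filter(_ > 0).map(s => (s * deserializationFactor).toLong)
  val raw = rawDataSize.filter(_ > 0)
  // Use the bigger of the adjusted on-disk size and rawDataSize, falling back
  // to the configured default when neither statistic is available.
  Seq(adjustedTotal, raw).flatten.reduceOption(_ max _).getOrElse(defaultSizeInBytes)
}
{code}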



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Updated] (SPARK-24920) Spark should allow sharing netty's memory pools across all uses

2018-07-25 Thread Imran Rashid (JIRA)


 [ 
https://issues.apache.org/jira/browse/SPARK-24920?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Imran Rashid updated SPARK-24920:
-
Summary: Spark should allow sharing netty's memory pools across all uses  
(was: Spark should share netty's memory pools across all uses)

> Spark should allow sharing netty's memory pools across all uses
> ---
>
> Key: SPARK-24920
> URL: https://issues.apache.org/jira/browse/SPARK-24920
> Project: Spark
>  Issue Type: Improvement
>  Components: Spark Core
>Affects Versions: 2.4.0
>Reporter: Imran Rashid
>Priority: Major
>  Labels: memory-analysis
>
> Spark currently creates separate netty memory pools for each of the following 
> "services":
> 1) RPC Client
> 2) RPC Server
> 3) BlockTransfer Client
> 4) BlockTransfer Server
> 5) ExternalShuffle Client
> Depending on configuration and whether it's an executor or driver JVM, a 
> different subset of these is active, but it's always either 3 or 4.
> Having them independent somewhat defeats the purpose of using pools at all.  
> In my experiments I've found each pool will grow due to a burst of activity 
> in the related service (eg. task start / end msgs), followed by another burst in 
> a different service (eg. sending torrent broadcast blocks).  Because of the 
> way these pools work, they allocate memory in large chunks (16 MB by default) 
> for each netty thread, so there is often a surge of 128 MB of allocated 
> memory, even for really tiny messages.  Also a lot of this memory is offheap 
> by default, which makes it even tougher for users to manage.
> I think it would make more sense to combine all of these into a single pool.  
> In some experiments I tried, this noticeably decreased memory usage, both 
> onheap and offheap (no significant performance effect in my small 
> experiments).
> As this is a pretty core change, as a first step I'd propose just exposing 
> this as a conf, to let users experiment more broadly across a wider range of 
> workloads.
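
A rough sketch of the "single shared pool behind a conf" idea (the conf key and 
object below are hypothetical; real wiring would go through Spark's TransportConf 
and NettyUtils):
{code:java}
// Hypothetical sketch: one pooled allocator shared by all RPC / block-transfer /
// shuffle clients and servers, instead of a separate pool per service.
import io.netty.buffer.PooledByteBufAllocator

object SharedNettyAllocator {
  // Created lazily so arenas only materialize when first used.
  lazy val shared: PooledByteBufAllocator = new PooledByteBufAllocator(true /* preferDirect */)

  // shareEnabled would come from a conf such as
  // spark.network.sharedByteBufAllocators (hypothetical key).
  def allocatorFor(shareEnabled: Boolean): PooledByteBufAllocator =
    if (shareEnabled) shared else new PooledByteBufAllocator(true)
}
{code}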



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Assigned] (SPARK-24919) Scala linter rule for sparkContext.hadoopConfiguration

2018-07-25 Thread Apache Spark (JIRA)


 [ 
https://issues.apache.org/jira/browse/SPARK-24919?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Apache Spark reassigned SPARK-24919:


Assignee: Apache Spark

> Scala linter rule for sparkContext.hadoopConfiguration
> --
>
> Key: SPARK-24919
> URL: https://issues.apache.org/jira/browse/SPARK-24919
> Project: Spark
>  Issue Type: Bug
>  Components: Build
>Affects Versions: 2.4.0
>Reporter: Gengliang Wang
>Assignee: Apache Spark
>Priority: Major
>
> In most cases, we should use spark.sessionState.newHadoopConf() instead of 
> sparkContext.hadoopConfiguration, so that the hadoop configurations specified 
> in Spark session
> configuration will come into effect.
> Add a rule matching spark.sparkContext.hadoopConfiguration or 
> spark.sqlContext.sparkContext.hadoopConfiguration to prevent the usage.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-24919) Scala linter rule for sparkContext.hadoopConfiguration

2018-07-25 Thread Apache Spark (JIRA)


[ 
https://issues.apache.org/jira/browse/SPARK-24919?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16555972#comment-16555972
 ] 

Apache Spark commented on SPARK-24919:
--

User 'gengliangwang' has created a pull request for this issue:
https://github.com/apache/spark/pull/21873

> Scala linter rule for sparkContext.hadoopConfiguration
> --
>
> Key: SPARK-24919
> URL: https://issues.apache.org/jira/browse/SPARK-24919
> Project: Spark
>  Issue Type: Bug
>  Components: Build
>Affects Versions: 2.4.0
>Reporter: Gengliang Wang
>Priority: Major
>
> In most cases, we should use spark.sessionState.newHadoopConf() instead of 
> sparkContext.hadoopConfiguration, so that the hadoop configurations specified 
> in Spark session
> configuration will come into effect.
> Add a rule matching spark.sparkContext.hadoopConfiguration or 
> spark.sqlContext.sparkContext.hadoopConfiguration to prevent the usage.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Comment Edited] (SPARK-24904) Join with broadcasted dataframe causes shuffle of redundant data

2018-07-25 Thread Marco Gaido (JIRA)


[ 
https://issues.apache.org/jira/browse/SPARK-24904?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16555652#comment-16555652
 ] 

Marco Gaido edited comment on SPARK-24904 at 7/25/18 1:28 PM:
--

I see now what you mean, but yes, I think there is an assumption you are making 
which is not always true, i.e. "The output is (expected to be) very small 
compared to the big table". That is not true: if all the rows from the big 
table match the small one, this is not the case. We may try to do something 
like what you mentioned in the optimizer if CBO is enabled and we have good 
enough statistics about the output size of the inner join, but I am not sure.


was (Author: mgaido):
I see now what you mean, but yes, It think there is an assumption you are doing 
which is not always true, ie. "The output is (expected to be) very small 
compared to the big table". That is not true. If all the rows from the big 
table match the small one, this is not the case. We may trying to do something 
like what you mentioned in the optimizer if CBO is enabled and we have good 
enough statistics about the output size of the inner join, but i am not sure.

> Join with broadcasted dataframe causes shuffle of redundant data
> 
>
> Key: SPARK-24904
> URL: https://issues.apache.org/jira/browse/SPARK-24904
> Project: Spark
>  Issue Type: Improvement
>  Components: SQL
>Affects Versions: 2.1.2
>Reporter: Shay Elbaz
>Priority: Minor
>
> When joining a "large" dataframe with broadcasted small one, and join-type is 
> on the small DF side (see right-join below), the physical plan falls back to 
> sort merge join. But when the join is on the large DF side, the broadcast 
> does take place. Is there a good reason for this? In the below example it 
> sure doesn't make any sense to shuffle the entire large table:
>  
> {code:java}
> val small = spark.range(1, 10)
> val big = spark.range(1, 1 << 30)
>   .withColumnRenamed("id", "id2")
> big.join(broadcast(small), $"id" === $"id2", "right")
> .explain
> //OUTPUT:
> == Physical Plan == 
> SortMergeJoin [id2#16307L], [id#16310L], RightOuter 
> :- *Sort [id2#16307L ASC NULLS FIRST], false, 0
>  :  +- Exchange hashpartitioning(id2#16307L, 1000)
>  : +- *Project [id#16304L AS id2#16307L]
>  :    +- *Range (1, 1073741824, step=1, splits=Some(600))
>  +- *Sort [id#16310L ASC NULLS FIRST], false, 0
>     +- Exchange hashpartitioning(id#16310L, 1000)
>    +- *Range (1, 10, step=1, splits=Some(600))
> {code}
> As a workaround, users need to perform inner instead of right join, and then 
> join the result back with the small DF to fill the missing rows.
>  
>  
>  
>  



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-24904) Join with broadcasted dataframe causes shuffle of redundant data

2018-07-25 Thread Shay Elbaz (JIRA)


[ 
https://issues.apache.org/jira/browse/SPARK-24904?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=1678#comment-1678
 ] 

Shay Elbaz commented on SPARK-24904:


[~mgaido] Technically you *can* do that, you just need an additional shuffle 
(after the map side join) to fill in the missing rows as you mentioned. And 
since the current implementation already shuffles, I don't see how it makes 
sense to involve the entire big table in the shuffle. Instead, Spark could do 
the following:
 # Broadcast the small table.
 # Just like an inner join, load the big table and hash-join. 
The output is (expected to be) very small compared to the big table.
 # Keep the small table broadcasted, and shuffle the results from last stage 
(say, sort-merge).
 # Now on each task, fill in missing rows from the broadcasted table. This is 
trivial if using sort-merge and the broadcasted table is just another partition 
to merge. 

As I mentioned in the description, this can be achieved by the user with two 
joins (see the sketch below), but shouldn't Spark offer this by default? Needless 
to say how sub-optimal the current implementation is compared to the above plan. 
Am I missing something?
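
For reference, an untested sketch of the user-level two-join workaround being 
discussed (it assumes a SparkSession named spark with implicits imported):
{code:java}
// Sketch of the workaround: broadcast inner join first (no shuffle of `big`),
// then add back the small-table rows that found no match, which together give
// the same rows a right outer join would produce.
import org.apache.spark.sql.functions.{broadcast, lit}
import spark.implicits._

val small = spark.range(1, 10)                                // column: id
val big   = spark.range(1, 1L << 30).withColumnRenamed("id", "id2")

// Step 1: map-side (broadcast) inner join.
val matched = big.join(broadcast(small), $"id" === $"id2", "inner")

// Step 2: small-table rows with no match get a null id2, then union.
val matchedIds = matched.select($"id").distinct()
val unmatched = small.as("s")
  .join(matchedIds.as("m"), $"s.id" === $"m.id", "left_anti")
  .select(lit(null).cast("long").as("id2"), $"s.id".as("id"))

val rightOuterEquivalent = matched.select($"id2", $"id").union(unmatched)
{code}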

> Join with broadcasted dataframe causes shuffle of redundant data
> 
>
> Key: SPARK-24904
> URL: https://issues.apache.org/jira/browse/SPARK-24904
> Project: Spark
>  Issue Type: Question
>  Components: SQL
>Affects Versions: 2.1.2
>Reporter: Shay Elbaz
>Priority: Minor
>
> When joining a "large" dataframe with broadcasted small one, and join-type is 
> on the small DF side (see right-join below), the physical plan does not 
> include broadcasting the small table. But when the join is on the large DF 
> side, the broadcast does take place. Is there a good reason for this? In the 
> below example it sure doesn't make any sense to shuffle the entire large 
> table:
>  
> {code:java}
> val small = spark.range(1, 10)
> val big = spark.range(1, 1 << 30)
>   .withColumnRenamed("id", "id2")
> big.join(broadcast(small), $"id" === $"id2", "right")
> .explain
> //OUTPUT:
> == Physical Plan == 
> SortMergeJoin [id2#16307L], [id#16310L], RightOuter 
> :- *Sort [id2#16307L ASC NULLS FIRST], false, 0
>  :  +- Exchange hashpartitioning(id2#16307L, 1000)
>  : +- *Project [id#16304L AS id2#16307L]
>  :    +- *Range (1, 1073741824, step=1, splits=Some(600))
>  +- *Sort [id#16310L ASC NULLS FIRST], false, 0
>     +- Exchange hashpartitioning(id#16310L, 1000)
>    +- *Range (1, 10, step=1, splits=Some(600))
> {code}
> As a workaround, users need to perform inner instead of right join, and then 
> join the result back with the small DF to fill the missing rows.
>  
>  
>  
>  



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Updated] (SPARK-24904) Join with broadcasted dataframe causes shuffle of redundant data

2018-07-25 Thread Shay Elbaz (JIRA)


 [ 
https://issues.apache.org/jira/browse/SPARK-24904?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Shay Elbaz updated SPARK-24904:
---
Issue Type: Improvement  (was: Question)

> Join with broadcasted dataframe causes shuffle of redundant data
> 
>
> Key: SPARK-24904
> URL: https://issues.apache.org/jira/browse/SPARK-24904
> Project: Spark
>  Issue Type: Improvement
>  Components: SQL
>Affects Versions: 2.1.2
>Reporter: Shay Elbaz
>Priority: Minor
>
> When joining a "large" dataframe with broadcasted small one, and join-type is 
> on the small DF side (see right-join below), the physical plan does not 
> include broadcasting the small table. But when the join is on the large DF 
> side, the broadcast does take place. Is there a good reason for this? In the 
> below example it sure doesn't make any sense to shuffle the entire large 
> table:
>  
> {code:java}
> val small = spark.range(1, 10)
> val big = spark.range(1, 1 << 30)
>   .withColumnRenamed("id", "id2")
> big.join(broadcast(small), $"id" === $"id2", "right")
> .explain
> //OUTPUT:
> == Physical Plan == 
> SortMergeJoin [id2#16307L], [id#16310L], RightOuter 
> :- *Sort [id2#16307L ASC NULLS FIRST], false, 0
>  :  +- Exchange hashpartitioning(id2#16307L, 1000)
>  : +- *Project [id#16304L AS id2#16307L]
>  :    +- *Range (1, 1073741824, step=1, splits=Some(600))
>  +- *Sort [id#16310L ASC NULLS FIRST], false, 0
>     +- Exchange hashpartitioning(id#16310L, 1000)
>    +- *Range (1, 10, step=1, splits=Some(600))
> {code}
> As a workaround, users need to perform inner instead of right join, and then 
> join the result back with the small DF to fill the missing rows.
>  
>  
>  
>  



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Updated] (SPARK-19018) spark csv writer charset support

2018-07-25 Thread Xiao Li (JIRA)


 [ 
https://issues.apache.org/jira/browse/SPARK-19018?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Xiao Li updated SPARK-19018:

Issue Type: Improvement  (was: Bug)

> spark csv writer charset support
> 
>
> Key: SPARK-19018
> URL: https://issues.apache.org/jira/browse/SPARK-19018
> Project: Spark
>  Issue Type: Improvement
>  Components: SQL
>Reporter: todd.chen
>Assignee: Carlos Peña
>Priority: Major
> Fix For: 2.4.0
>
>
> if we write a DataFrame to csv, the default charset is utf-8 and we can't change it 
> the way we can when reading csv by giving `encoding` in params, so I think we should 
> support csv write by passing a param



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-24904) Join with broadcasted dataframe causes shuffle of redundant data

2018-07-25 Thread Marco Gaido (JIRA)


[ 
https://issues.apache.org/jira/browse/SPARK-24904?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16555842#comment-16555842
 ] 

Marco Gaido commented on SPARK-24904:
-

[~shay_elbaz] In the case I mentioned before the approach you proposed is not 
better, it is worse, as it requires an unneeded additional broadcast join.

> Join with broadcasted dataframe causes shuffle of redundant data
> 
>
> Key: SPARK-24904
> URL: https://issues.apache.org/jira/browse/SPARK-24904
> Project: Spark
>  Issue Type: Improvement
>  Components: SQL
>Affects Versions: 2.1.2
>Reporter: Shay Elbaz
>Priority: Minor
>
> When joining a "large" dataframe with broadcasted small one, and join-type is 
> on the small DF side (see right-join below), the physical plan falls back to 
> sort merge join. But when the join is on the large DF side, the broadcast 
> does take place. Is there a good reason for this? In the below example it 
> sure doesn't make any sense to shuffle the entire large table:
>  
> {code:java}
> val small = spark.range(1, 10)
> val big = spark.range(1, 1 << 30)
>   .withColumnRenamed("id", "id2")
> big.join(broadcast(small), $"id" === $"id2", "right")
> .explain
> //OUTPUT:
> == Physical Plan == 
> SortMergeJoin [id2#16307L], [id#16310L], RightOuter 
> :- *Sort [id2#16307L ASC NULLS FIRST], false, 0
>  :  +- Exchange hashpartitioning(id2#16307L, 1000)
>  : +- *Project [id#16304L AS id2#16307L]
>  :    +- *Range (1, 1073741824, step=1, splits=Some(600))
>  +- *Sort [id#16310L ASC NULLS FIRST], false, 0
>     +- Exchange hashpartitioning(id#16310L, 1000)
>    +- *Range (1, 10, step=1, splits=Some(600))
> {code}
> As a workaround, users need to perform inner instead of right join, and then 
> join the result back with the small DF to fill the missing rows.
>  
>  
>  
>  



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Updated] (SPARK-24917) Sending a partition over netty results in 2x memory usage

2018-07-25 Thread Vincent (JIRA)


 [ 
https://issues.apache.org/jira/browse/SPARK-24917?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Vincent updated SPARK-24917:

Description: 
Hello

while investigating some OOM errors in Spark 2.2 [(here's my call 
stack)|https://image.ibb.co/hHa2R8/sparkOOM.png], I found the following behavior, 
which I think is weird:
 * a request happens to send a partition over network
 * this partition is 1.9 GB and is persisted in memory
 * this partition is apparently stored in a ByteBufferBlockData, that is made 
of a ChunkedByteBuffer, which is a list of (lots of) ByteBuffer of 4 MB each.
 * the call to toNetty() is supposed to only wrap all the arrays and not 
allocate any memory
 * yet the call stack shows that netty is allocating memory and is trying to 
consolidate all the chunks into one big 1.9GB array
 * this means that at this point the memory footprint is 2x the size of the 
actual partition (which is huge when the partition is 1.9GB)

Is this transient allocation expected?

After digging, it turns out that the actual copy is due to [this 
method|https://github.com/netty/netty/blob/4.0/buffer/src/main/java/io/netty/buffer/Unpooled.java#L260]
 in netty. If my initial buffer is made of more than DEFAULT_MAX_COMPONENTS 
(16) components, it will trigger a re-allocation of the whole buffer. This netty 
issue was fixed in this recent change: 
[https://github.com/netty/netty/commit/9b95b8ee628983e3e4434da93fffb893edff4aa2]

 

As a result, is it possible to benefit from this change somehow in spark 2.2 
and above? I don't know how the netty dependencies are handled for spark

 

NB: it seems this ticket: [https://jira.apache.org/jira/browse/SPARK-24307] 
kind of changed the approach for Spark 2.4 by bypassing the netty buffer altogether. 
However, as written in that ticket, this approach *still* needs to have the 
*entire* block serialized in memory, so this would be a downgrade from fixing 
the netty issue when your buffer is < 2 GB.

 

Thanks!

 

 

  was:
Hello

while investigating some OOM errors in Spark 2.2 [(here's my call 
stack)|https://image.ibb.co/hHa2R8/sparkOOM.png], I find the following behavior 
happening, which I think is weird:
 * a request happens to send a partition over network
 * this partition is 1.9 GB and is persisted in memory
 * this partition is apparently stored in a ByteBufferBlockData, that is made 
of a ChunkedByteBuffer, which is a list of (lots of) ByteBuffer of 4 MB each.
 * the call to toNetty() is supposed to only wrap all the arrays and not 
allocate any memory
 * yet the call stack shows that netty is allocating memory and is trying to 
consolidate all the chunks into one big 1.9GB array
 * this means that at this point the memory footprint is 2x the size of the 
actual partition (which is huge when the partition is 1.9GB)

Is this transient allocation expected?

After digging, it turns out that the actual copy is due to [this 
method|https://github.com/netty/netty/blob/4.0/buffer/src/main/java/io/netty/buffer/Unpooled.java#L260]
 in netty. If my initial buffer is made of more than DEFAULT_MAX_COMPONENTS 
(16) components it will trigger a re-allocation of all the buffer. This netty 
issue was fixed in this recent change : 
[https://github.com/netty/netty/commit/9b95b8ee628983e3e4434da93fffb893edff4aa2]

 

As a result, is it possible to benefit from this change somehow in spark 2.2 
and above? I don't know how the netty dependencies are handled for spark

 

NB: it seems this ticket: [https://jira.apache.org/jira/browse/SPARK-24307] 
fixes the issue for spark 2.4 by bypassing netty buffer altogether

 

Thanks!

 

 


> Sending a partition over netty results in 2x memory usage
> -
>
> Key: SPARK-24917
> URL: https://issues.apache.org/jira/browse/SPARK-24917
> Project: Spark
>  Issue Type: Improvement
>  Components: Spark Core
>Affects Versions: 2.2.2
>Reporter: Vincent
>Priority: Major
>
> Hello
> while investigating some OOM errors in Spark 2.2 [(here's my call 
> stack)|https://image.ibb.co/hHa2R8/sparkOOM.png], I find the following 
> behavior happening, which I think is weird:
>  * a request happens to send a partition over network
>  * this partition is 1.9 GB and is persisted in memory
>  * this partition is apparently stored in a ByteBufferBlockData, that is made 
> of a ChunkedByteBuffer, which is a list of (lots of) ByteBuffer of 4 MB each.
>  * the call to toNetty() is supposed to only wrap all the arrays and not 
> allocate any memory
>  * yet the call stack shows that netty is allocating memory and is trying to 
> consolidate all the chunks into one big 1.9GB array
>  * this means that at this point the memory footprint is 2x the size of the 
> actual partition (which is huge when the partition is 1.9GB)
> Is this transient allocation expected?
> 

[jira] [Updated] (SPARK-24917) Sending a partition over netty results in 2x memory usage

2018-07-25 Thread Vincent (JIRA)


 [ 
https://issues.apache.org/jira/browse/SPARK-24917?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Vincent updated SPARK-24917:

Description: 
Hello

while investigating some OOM errors in Spark 2.2 [(here's my call 
stack)|https://image.ibb.co/hHa2R8/sparkOOM.png], I find the following behavior 
happening, which I think is weird:
 * a request happens to send a partition over network
 * this partition is 1.9 GB and is persisted in memory
 * this partition is apparently stored in a ByteBufferBlockData, that is made 
of a ChunkedByteBuffer, which is a list of (lots of) ByteBuffer of 4 MB each.
 * the call to toNetty() is supposed to only wrap all the arrays and not 
allocate any memory
 * yet the call stack shows that netty is allocating memory and is trying to 
consolidate all the chunks into one big 1.9GB array
 * this means that at this point the memory footprint is 2x the size of the 
actual partition (which is huge when the partition is 1.9GB)

Is this transient allocation expected?

After digging, it turns out that the actual copy is due to [this 
method|https://github.com/netty/netty/blob/4.0/buffer/src/main/java/io/netty/buffer/Unpooled.java#L260]
 in netty. If my initial buffer is made of more than DEFAULT_MAX_COMPONENTS 
(16) components it will trigger a re-allocation of all the buffer. This netty 
issue was fixed in this recent change : 
[https://github.com/netty/netty/commit/9b95b8ee628983e3e4434da93fffb893edff4aa2]

 

As a result, is it possible to benefit from this change somehow in spark 2.2 
and above? I don't know how the netty dependencies are handled for spark

 

NB: it seems this ticket: [https://jira.apache.org/jira/browse/SPARK-24307] 
fixes the issue for spark 2.4 by bypassing netty buffer altogether

 

Thanks!

 

 

  was:
Hello

while investigating some OOM errors in Spark 2.2 [(here's my call 
stack)|https://image.ibb.co/hHa2R8/sparkOOM.png], I find the following behavior 
happening, which I think is weird:
 * a request happens to send a partition over network
 * this partition is 1.9 GB and is persisted in memory
 * this partition is apparently stored in a ByteBufferBlockData, that is made 
of a ChunkedByteBuffer, which is a list of (lots of) ByteBuffer of 4 MB each.
 * the call to toNetty() is supposed to only wrap all the arrays and not 
allocate any memory
 * yet the call stack shows that netty is allocating memory and is trying to 
consolidate all the chunks into one big 1.9GB array
 * this means that at this point the memory footprint is 2x the size of the 
actual partition (which is huge when the partition is 1.9GB)

Is this transient allocation expected?

After digging, it turns out that the actual copy is due to [this 
method|https://github.com/netty/netty/blob/4.0/buffer/src/main/java/io/netty/buffer/Unpooled.java#L260]
 in netty. If my initial buffer is made of more than DEFAULT_MAX_COMPONENTS 
(16) components it will trigger a re-allocation of all the buffer. This netty 
issue was fixed in this recent change : 
[https://github.com/netty/netty/commit/9b95b8ee628983e3e4434da93fffb893edff4aa2]

 

As a result, is it possible to benefit from this change somehow in spark 2.2 
and above? I don't know how the netty dependencies are handled for spark

 

Thanks!

 

 


> Sending a partition over netty results in 2x memory usage
> -
>
> Key: SPARK-24917
> URL: https://issues.apache.org/jira/browse/SPARK-24917
> Project: Spark
>  Issue Type: Improvement
>  Components: Spark Core
>Affects Versions: 2.2.2
>Reporter: Vincent
>Priority: Major
>
> Hello
> while investigating some OOM errors in Spark 2.2 [(here's my call 
> stack)|https://image.ibb.co/hHa2R8/sparkOOM.png], I find the following 
> behavior happening, which I think is weird:
>  * a request happens to send a partition over network
>  * this partition is 1.9 GB and is persisted in memory
>  * this partition is apparently stored in a ByteBufferBlockData, that is made 
> of a ChunkedByteBuffer, which is a list of (lots of) ByteBuffer of 4 MB each.
>  * the call to toNetty() is supposed to only wrap all the arrays and not 
> allocate any memory
>  * yet the call stack shows that netty is allocating memory and is trying to 
> consolidate all the chunks into one big 1.9GB array
>  * this means that at this point the memory footprint is 2x the size of the 
> actual partition (which is huge when the partition is 1.9GB)
> Is this transient allocation expected?
> After digging, it turns out that the actual copy is due to [this 
> method|https://github.com/netty/netty/blob/4.0/buffer/src/main/java/io/netty/buffer/Unpooled.java#L260]
>  in netty. If my initial buffer is made of more than DEFAULT_MAX_COMPONENTS 
> (16) components it will trigger a re-allocation of all the buffer. This netty 
> issue was fixed in this recent 

[jira] [Created] (SPARK-24917) Sending a partition over netty results in 2x memory usage

2018-07-25 Thread Vincent (JIRA)
Vincent created SPARK-24917:
---

 Summary: Sending a partition over netty results in 2x memory usage
 Key: SPARK-24917
 URL: https://issues.apache.org/jira/browse/SPARK-24917
 Project: Spark
  Issue Type: Improvement
  Components: Spark Core
Affects Versions: 2.2.2
Reporter: Vincent


Hello

while investigating some OOM errors in Spark 2.2 [(here's my call 
stack)|https://image.ibb.co/hHa2R8/sparkOOM.png], I find the following behavior 
happening, which I think is weird:
 * a request happens to send a partition over network
 * this partition is 1.9 GB and is persisted in memory
 * this partition is apparently stored in a ByteBufferBlockData, that is made 
of a ChunkedByteBuffer, which is a list of (lots of) ByteBuffer of 4 MB each.
 * the call to toNetty() is supposed to only wrap all the arrays and not 
allocate any memory
 * yet the call stack shows that netty is allocating memory and is trying to 
consolidate all the chunks into one big 1.9GB array
 * this means that at this point the memory footprint is 2x the size of the 
actual partition (which is huge when the partition is 1.9GB)

Is this transient allocation expected?

After digging, it turns out that the actual copy is due to [this 
method|https://github.com/netty/netty/blob/4.0/buffer/src/main/java/io/netty/buffer/Unpooled.java#L260]
 in netty. If my initial buffer is made of more than DEFAULT_MAX_COMPONENTS 
(16) components it will trigger a re-allocation of all the buffer. This netty 
issue was fixed in this recent change : 
[https://github.com/netty/netty/commit/9b95b8ee628983e3e4434da93fffb893edff4aa2]

 

As a result, is it possible to benefit from this change somehow in spark 2.2 
and above? I don't know how the netty dependencies are handled for spark

 

Thanks!
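
For illustration, a standalone sketch of the consolidation behaviour described 
above and of how a larger maxNumComponents avoids the copy (this is plain netty 
code, not Spark's actual call site, and uses small 4 KB chunks just to keep it light):
{code:java}
// Standalone netty sketch: wrappedBuffer copies/consolidates once the number of
// chunks exceeds maxNumComponents (16 by default in the netty version discussed
// here), which is what doubles the memory footprint for a many-chunk block.
import java.nio.ByteBuffer
import io.netty.buffer.Unpooled

val chunks: Array[ByteBuffer] = Array.fill(32)(ByteBuffer.allocate(4 * 1024))

// Default path: 32 chunks > 16 components, so the contents get consolidated
// into one newly allocated buffer (the extra copy).
val consolidated = Unpooled.wrappedBuffer(chunks: _*)

// Passing a maxNumComponents at least as large as the chunk count keeps the
// result a composite wrapper and avoids the copy; the linked netty commit makes
// this unnecessary by no longer consolidating eagerly.
val wrappedOnly = Unpooled.wrappedBuffer(chunks.length, chunks: _*)
{code}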

 

 



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-24904) Join with broadcasted dataframe causes shuffle of redundant data

2018-07-25 Thread Shay Elbaz (JIRA)


[ 
https://issues.apache.org/jira/browse/SPARK-24904?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16555769#comment-16555769
 ] 

Shay Elbaz commented on SPARK-24904:


[~mgaido] indeed this assumption is not always true. However, the map-side 
result will always be smaller than or equal to the big table, which is why I think 
this approach is better. Kind of like filter push-down :)

> Join with broadcasted dataframe causes shuffle of redundant data
> 
>
> Key: SPARK-24904
> URL: https://issues.apache.org/jira/browse/SPARK-24904
> Project: Spark
>  Issue Type: Improvement
>  Components: SQL
>Affects Versions: 2.1.2
>Reporter: Shay Elbaz
>Priority: Minor
>
> When joining a "large" dataframe with broadcasted small one, and join-type is 
> on the small DF side (see right-join below), the physical plan falls back to 
> sort merge join. But when the join is on the large DF side, the broadcast 
> does take place. Is there a good reason for this? In the below example it 
> sure doesn't make any sense to shuffle the entire large table:
>  
> {code:java}
> val small = spark.range(1, 10)
> val big = spark.range(1, 1 << 30)
>   .withColumnRenamed("id", "id2")
> big.join(broadcast(small), $"id" === $"id2", "right")
> .explain
> //OUTPUT:
> == Physical Plan == 
> SortMergeJoin [id2#16307L], [id#16310L], RightOuter 
> :- *Sort [id2#16307L ASC NULLS FIRST], false, 0
>  :  +- Exchange hashpartitioning(id2#16307L, 1000)
>  : +- *Project [id#16304L AS id2#16307L]
>  :    +- *Range (1, 1073741824, step=1, splits=Some(600))
>  +- *Sort [id#16310L ASC NULLS FIRST], false, 0
>     +- Exchange hashpartitioning(id#16310L, 1000)
>    +- *Range (1, 10, step=1, splits=Some(600))
> {code}
> As a workaround, users need to perform inner instead of right join, and then 
> join the result back with the small DF to fill the missing rows.
>  
>  
>  
>  



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-24904) Join with broadcasted dataframe causes shuffle of redundant data

2018-07-25 Thread Marco Gaido (JIRA)


[ 
https://issues.apache.org/jira/browse/SPARK-24904?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16555652#comment-16555652
 ] 

Marco Gaido commented on SPARK-24904:
-

I see now what you mean, but yes, I think there is an assumption you are making 
which is not always true, i.e. "The output is (expected to be) very small 
compared to the big table". That is not true: if all the rows from the big 
table match the small one, this is not the case. We may try to do something 
like what you mentioned in the optimizer if CBO is enabled and we have good 
enough statistics about the output size of the inner join, but I am not sure.

> Join with broadcasted dataframe causes shuffle of redundant data
> 
>
> Key: SPARK-24904
> URL: https://issues.apache.org/jira/browse/SPARK-24904
> Project: Spark
>  Issue Type: Improvement
>  Components: SQL
>Affects Versions: 2.1.2
>Reporter: Shay Elbaz
>Priority: Minor
>
> When joining a "large" dataframe with broadcasted small one, and join-type is 
> on the small DF side (see right-join below), the physical plan falls back to 
> sort merge join. But when the join is on the large DF side, the broadcast 
> does take place. Is there a good reason for this? In the below example it 
> sure doesn't make any sense to shuffle the entire large table:
>  
> {code:java}
> val small = spark.range(1, 10)
> val big = spark.range(1, 1 << 30)
>   .withColumnRenamed("id", "id2")
> big.join(broadcast(small), $"id" === $"id2", "right")
> .explain
> //OUTPUT:
> == Physical Plan == 
> SortMergeJoin [id2#16307L], [id#16310L], RightOuter 
> :- *Sort [id2#16307L ASC NULLS FIRST], false, 0
>  :  +- Exchange hashpartitioning(id2#16307L, 1000)
>  : +- *Project [id#16304L AS id2#16307L]
>  :    +- *Range (1, 1073741824, step=1, splits=Some(600))
>  +- *Sort [id#16310L ASC NULLS FIRST], false, 0
>     +- Exchange hashpartitioning(id#16310L, 1000)
>    +- *Range (1, 10, step=1, splits=Some(600))
> {code}
> As a workaround, users need to perform inner instead of right join, and then 
> join the result back with the small DF to fill the missing rows.
>  
>  
>  
>  



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Updated] (SPARK-24904) Join with broadcasted dataframe causes shuffle of redundant data

2018-07-25 Thread Shay Elbaz (JIRA)


 [ 
https://issues.apache.org/jira/browse/SPARK-24904?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Shay Elbaz updated SPARK-24904:
---
Description: 
When joining a "large" dataframe with broadcasted small one, and join-type is 
on the small DF side (see right-join below), the physical plan falls back to 
sort merge join. But when the join is on the large DF side, the broadcast does 
take place. Is there a good reason for this? In the below example it sure 
doesn't make any sense to shuffle the entire large table:

 
{code:java}
val small = spark.range(1, 10)
val big = spark.range(1, 1 << 30)
  .withColumnRenamed("id", "id2")

big.join(broadcast(small), $"id" === $"id2", "right")
.explain

//OUTPUT:
== Physical Plan == 
SortMergeJoin [id2#16307L], [id#16310L], RightOuter 
:- *Sort [id2#16307L ASC NULLS FIRST], false, 0
 :  +- Exchange hashpartitioning(id2#16307L, 1000)
 : +- *Project [id#16304L AS id2#16307L]
 :    +- *Range (1, 1073741824, step=1, splits=Some(600))
 +- *Sort [id#16310L ASC NULLS FIRST], false, 0
    +- Exchange hashpartitioning(id#16310L, 1000)
   +- *Range (1, 10, step=1, splits=Some(600))
{code}
As a workaround, users need to perform inner instead of right join, and then 
join the result back with the small DF to fill the missing rows.

 

 

 

 

  was:
When joining a "large" dataframe with broadcasted small one, and join-type is 
on the small DF side (see right-join below), the physical plan does not include 
broadcasting the small table. But when the join is on the large DF side, the 
broadcast does take place. Is there a good reason for this? In the below 
example it sure doesn't make any sense to shuffle the entire large table:

 
{code:java}
val small = spark.range(1, 10)
val big = spark.range(1, 1 << 30)
  .withColumnRenamed("id", "id2")

big.join(broadcast(small), $"id" === $"id2", "right")
.explain

//OUTPUT:
== Physical Plan == 
SortMergeJoin [id2#16307L], [id#16310L], RightOuter 
:- *Sort [id2#16307L ASC NULLS FIRST], false, 0
 :  +- Exchange hashpartitioning(id2#16307L, 1000)
 : +- *Project [id#16304L AS id2#16307L]
 :    +- *Range (1, 1073741824, step=1, splits=Some(600))
 +- *Sort [id#16310L ASC NULLS FIRST], false, 0
    +- Exchange hashpartitioning(id#16310L, 1000)
   +- *Range (1, 10, step=1, splits=Some(600))
{code}
As a workaround, users need to perform inner instead of right join, and then 
join the result back with the small DF to fill the missing rows.

 

 

 

 


> Join with broadcasted dataframe causes shuffle of redundant data
> 
>
> Key: SPARK-24904
> URL: https://issues.apache.org/jira/browse/SPARK-24904
> Project: Spark
>  Issue Type: Improvement
>  Components: SQL
>Affects Versions: 2.1.2
>Reporter: Shay Elbaz
>Priority: Minor
>
> When joining a "large" dataframe with broadcasted small one, and join-type is 
> on the small DF side (see right-join below), the physical plan falls back to 
> sort merge join. But when the join is on the large DF side, the broadcast 
> does take place. Is there a good reason for this? In the below example it 
> sure doesn't make any sense to shuffle the entire large table:
>  
> {code:java}
> val small = spark.range(1, 10)
> val big = spark.range(1, 1 << 30)
>   .withColumnRenamed("id", "id2")
> big.join(broadcast(small), $"id" === $"id2", "right")
> .explain
> //OUTPUT:
> == Physical Plan == 
> SortMergeJoin [id2#16307L], [id#16310L], RightOuter 
> :- *Sort [id2#16307L ASC NULLS FIRST], false, 0
>  :  +- Exchange hashpartitioning(id2#16307L, 1000)
>  : +- *Project [id#16304L AS id2#16307L]
>  :    +- *Range (1, 1073741824, step=1, splits=Some(600))
>  +- *Sort [id#16310L ASC NULLS FIRST], false, 0
>     +- Exchange hashpartitioning(id#16310L, 1000)
>    +- *Range (1, 10, step=1, splits=Some(600))
> {code}
> As a workaround, users need to perform inner instead of right join, and then 
> join the result back with the small DF to fill the missing rows.
>  
>  
>  
>  



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-24916) Fix type coercion for IN expression with subquery

2018-07-25 Thread Apache Spark (JIRA)


[ 
https://issues.apache.org/jira/browse/SPARK-24916?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16555435#comment-16555435
 ] 

Apache Spark commented on SPARK-24916:
--

User 'wangyum' has created a pull request for this issue:
https://github.com/apache/spark/pull/21871

> Fix type coercion for IN expression with subquery
> -
>
> Key: SPARK-24916
> URL: https://issues.apache.org/jira/browse/SPARK-24916
> Project: Spark
>  Issue Type: Bug
>  Components: SQL
>Affects Versions: 2.2.0, 2.3.0
>Reporter: Yuming Wang
>Priority: Major
>
> How to reproduce:
> {code:sql}
> CREATE TEMPORARY VIEW t4 AS SELECT * FROM VALUES
>   (CAST(1 AS DOUBLE), CAST(2 AS STRING), CAST(3 AS STRING))
> AS t1(t4a, t4b, t4c);
> CREATE TEMPORARY VIEW t5 AS SELECT * FROM VALUES
>   (CAST(1 AS DECIMAL(18, 0)), CAST(2 AS STRING), CAST(3 AS BIGINT))
> AS t1(t5a, t5b, t5c);
> SELECT * FROM t4
> WHERE
> (t4a, t4b, t4c) IN (SELECT t5a,
>t5b,
>t5c
> FROM t5);
> {code}
> Will throw exception:
> {noformat}
> org.apache.spark.sql.AnalysisException
> cannot resolve '(named_struct('t4a', t4.`t4a`, 't4b', t4.`t4b`, 't4c', 
> t4.`t4c`) IN (listquery()))' due to data type mismatch: 
> The data type of one or more elements in the left hand side of an IN subquery
> is not compatible with the data type of the output of the subquery
> Mismatched columns:
> [(t4.`t4a`:double, t5.`t5a`:decimal(18,0)), (t4.`t4c`:string, 
> t5.`t5c`:bigint)]
> Left side:
> [double, string, string].
> Right side:
> [decimal(18,0), string, bigint].;
> {noformat}
> But it success on Spark 2.1.x.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Assigned] (SPARK-24916) Fix type coercion for IN expression with subquery

2018-07-25 Thread Apache Spark (JIRA)


 [ 
https://issues.apache.org/jira/browse/SPARK-24916?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Apache Spark reassigned SPARK-24916:


Assignee: (was: Apache Spark)

> Fix type coercion for IN expression with subquery
> -
>
> Key: SPARK-24916
> URL: https://issues.apache.org/jira/browse/SPARK-24916
> Project: Spark
>  Issue Type: Bug
>  Components: SQL
>Affects Versions: 2.2.0, 2.3.0
>Reporter: Yuming Wang
>Priority: Major
>
> How to reproduce:
> {code:sql}
> CREATE TEMPORARY VIEW t4 AS SELECT * FROM VALUES
>   (CAST(1 AS DOUBLE), CAST(2 AS STRING), CAST(3 AS STRING))
> AS t1(t4a, t4b, t4c);
> CREATE TEMPORARY VIEW t5 AS SELECT * FROM VALUES
>   (CAST(1 AS DECIMAL(18, 0)), CAST(2 AS STRING), CAST(3 AS BIGINT))
> AS t1(t5a, t5b, t5c);
> SELECT * FROM t4
> WHERE
> (t4a, t4b, t4c) IN (SELECT t5a,
>t5b,
>t5c
> FROM t5);
> {code}
> Will throw exception:
> {noformat}
> org.apache.spark.sql.AnalysisException
> cannot resolve '(named_struct('t4a', t4.`t4a`, 't4b', t4.`t4b`, 't4c', 
> t4.`t4c`) IN (listquery()))' due to data type mismatch: 
> The data type of one or more elements in the left hand side of an IN subquery
> is not compatible with the data type of the output of the subquery
> Mismatched columns:
> [(t4.`t4a`:double, t5.`t5a`:decimal(18,0)), (t4.`t4c`:string, 
> t5.`t5c`:bigint)]
> Left side:
> [double, string, string].
> Right side:
> [decimal(18,0), string, bigint].;
> {noformat}
> But it success on Spark 2.1.x.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Assigned] (SPARK-24916) Fix type coercion for IN expression with subquery

2018-07-25 Thread Apache Spark (JIRA)


 [ 
https://issues.apache.org/jira/browse/SPARK-24916?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Apache Spark reassigned SPARK-24916:


Assignee: Apache Spark

> Fix type coercion for IN expression with subquery
> -
>
> Key: SPARK-24916
> URL: https://issues.apache.org/jira/browse/SPARK-24916
> Project: Spark
>  Issue Type: Bug
>  Components: SQL
>Affects Versions: 2.2.0, 2.3.0
>Reporter: Yuming Wang
>Assignee: Apache Spark
>Priority: Major
>
> How to reproduce:
> {code:sql}
> CREATE TEMPORARY VIEW t4 AS SELECT * FROM VALUES
>   (CAST(1 AS DOUBLE), CAST(2 AS STRING), CAST(3 AS STRING))
> AS t1(t4a, t4b, t4c);
> CREATE TEMPORARY VIEW t5 AS SELECT * FROM VALUES
>   (CAST(1 AS DECIMAL(18, 0)), CAST(2 AS STRING), CAST(3 AS BIGINT))
> AS t1(t5a, t5b, t5c);
> SELECT * FROM t4
> WHERE
> (t4a, t4b, t4c) IN (SELECT t5a,
>t5b,
>t5c
> FROM t5);
> {code}
> Will throw exception:
> {noformat}
> org.apache.spark.sql.AnalysisException
> cannot resolve '(named_struct('t4a', t4.`t4a`, 't4b', t4.`t4b`, 't4c', 
> t4.`t4c`) IN (listquery()))' due to data type mismatch: 
> The data type of one or more elements in the left hand side of an IN subquery
> is not compatible with the data type of the output of the subquery
> Mismatched columns:
> [(t4.`t4a`:double, t5.`t5a`:decimal(18,0)), (t4.`t4c`:string, 
> t5.`t5c`:bigint)]
> Left side:
> [double, string, string].
> Right side:
> [decimal(18,0), string, bigint].;
> {noformat}
> But it success on Spark 2.1.x.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Created] (SPARK-24916) Fix type coercion for IN expression with subquery

2018-07-25 Thread Yuming Wang (JIRA)
Yuming Wang created SPARK-24916:
---

 Summary: Fix type coercion for IN expression with subquery
 Key: SPARK-24916
 URL: https://issues.apache.org/jira/browse/SPARK-24916
 Project: Spark
  Issue Type: Bug
  Components: SQL
Affects Versions: 2.3.0, 2.2.0
Reporter: Yuming Wang


How to reproduce:
{code:sql}
CREATE TEMPORARY VIEW t4 AS SELECT * FROM VALUES
  (CAST(1 AS DOUBLE), CAST(2 AS STRING), CAST(3 AS STRING))
AS t1(t4a, t4b, t4c);

CREATE TEMPORARY VIEW t5 AS SELECT * FROM VALUES
  (CAST(1 AS DECIMAL(18, 0)), CAST(2 AS STRING), CAST(3 AS BIGINT))
AS t1(t5a, t5b, t5c);

SELECT * FROM t4
WHERE
(t4a, t4b, t4c) IN (SELECT t5a,
   t5b,
   t5c
FROM t5);
{code}
Will throw exception:

{noformat}
org.apache.spark.sql.AnalysisException
cannot resolve '(named_struct('t4a', t4.`t4a`, 't4b', t4.`t4b`, 't4c', 
t4.`t4c`) IN (listquery()))' due to data type mismatch: 
The data type of one or more elements in the left hand side of an IN subquery
is not compatible with the data type of the output of the subquery
Mismatched columns:
[(t4.`t4a`:double, t5.`t5a`:decimal(18,0)), (t4.`t4c`:string, t5.`t5c`:bigint)]
Left side:
[double, string, string].
Right side:
[decimal(18,0), string, bigint].;
{noformat}
But it succeeds on Spark 2.1.x.






--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-21063) Spark return an empty result from remote hadoop cluster

2018-07-25 Thread nick (JIRA)


[ 
https://issues.apache.org/jira/browse/SPARK-21063?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16555403#comment-16555403
 ] 

nick commented on SPARK-21063:
--

[~paulstaab]

It does work when both registering the dialect and setting fetchsize, but how can 
we get all the data without setting "fetchsize"?
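
For readers hitting the same thing, a sketch of the dialect-plus-fetchsize 
workaround being discussed (the dialect below is illustrative, not a built-in 
Spark dialect; note that fetchsize only controls rows per round trip, it is not 
a limit on how many rows come back in total):
{code:java}
// Sketch of the workaround: register a dialect that quotes Hive identifiers with
// backticks (instead of double quotes, which Hive misreads) and set a JDBC
// fetchsize. Illustrative only.
import org.apache.spark.sql.jdbc.{JdbcDialect, JdbcDialects}

object HiveJdbcDialect extends JdbcDialect {
  override def canHandle(url: String): Boolean = url.startsWith("jdbc:hive2")
  override def quoteIdentifier(colName: String): String = s"`$colName`"
}

JdbcDialects.registerDialect(HiveJdbcDialect)

val df = spark.read
  .format("jdbc")
  .option("url", "jdbc:hive2://remote.hive.local:1/default")
  .option("dbtable", "test_table")
  .option("driver", "org.apache.hive.jdbc.HiveDriver")
  .option("fetchsize", "1000") // batch size per round trip, not a row limit
  .load()
{code}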

> Spark return an empty result from remote hadoop cluster
> ---
>
> Key: SPARK-21063
> URL: https://issues.apache.org/jira/browse/SPARK-21063
> Project: Spark
>  Issue Type: Bug
>  Components: Spark Core, SQL
>Affects Versions: 2.1.0, 2.1.1
>Reporter: Peter Bykov
>Priority: Major
>
> Spark returns an empty result when querying a remote Hadoop cluster.
> All firewall settings have been removed.
> Querying using JDBC works properly with the hive-jdbc driver from version 1.1.1
> Code snippet is:
> {code:java}
> val spark = SparkSession.builder
> .appName("RemoteSparkTest")
> .master("local")
> .getOrCreate()
> val df = spark.read
>   .option("url", "jdbc:hive2://remote.hive.local:1/default")
>   .option("user", "user")
>   .option("password", "pass")
>   .option("dbtable", "test_table")
>   .option("driver", "org.apache.hive.jdbc.HiveDriver")
>   .format("jdbc")
>   .load()
>  
> df.show()
> {code}
> Result:
> {noformat}
> +---+
> |test_table.test_col|
> +---+
> +---+
> {noformat}
> All manipulations like: 
> {code:java}
> df.select(*).show()
> {code}
> returns empty result too.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-24904) Join with broadcasted dataframe causes shuffle of redundant data

2018-07-25 Thread Marco Gaido (JIRA)


[ 
https://issues.apache.org/jira/browse/SPARK-24904?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16555477#comment-16555477
 ] 

Marco Gaido commented on SPARK-24904:
-

You cannot do a broadcast join when the outer side is the broadcasted small table, as 
the join requires comparing each row of the small table with the whole big 
table and outputting it into the result if it finds no match. Since the big table is 
available only in small pieces in each task, no task can determine whether the 
row matched at least once (as it doesn't know what other tasks did).

> Join with broadcasted dataframe causes shuffle of redundant data
> 
>
> Key: SPARK-24904
> URL: https://issues.apache.org/jira/browse/SPARK-24904
> Project: Spark
>  Issue Type: Question
>  Components: SQL
>Affects Versions: 2.1.2
>Reporter: Shay Elbaz
>Priority: Minor
>
> When joining a "large" dataframe with broadcasted small one, and join-type is 
> on the small DF side (see right-join below), the physical plan does not 
> include broadcasting the small table. But when the join is on the large DF 
> side, the broadcast does take place. Is there a good reason for this? In the 
> below example it sure doesn't make any sense to shuffle the entire large 
> table:
>  
> {code:java}
> val small = spark.range(1, 10)
> val big = spark.range(1, 1 << 30)
>   .withColumnRenamed("id", "id2")
> big.join(broadcast(small), $"id" === $"id2", "right")
> .explain
> //OUTPUT:
> == Physical Plan == 
> SortMergeJoin [id2#16307L], [id#16310L], RightOuter 
> :- *Sort [id2#16307L ASC NULLS FIRST], false, 0
>  :  +- Exchange hashpartitioning(id2#16307L, 1000)
>  : +- *Project [id#16304L AS id2#16307L]
>  :    +- *Range (1, 1073741824, step=1, splits=Some(600))
>  +- *Sort [id#16310L ASC NULLS FIRST], false, 0
>     +- Exchange hashpartitioning(id#16310L, 1000)
>    +- *Range (1, 10, step=1, splits=Some(600))
> {code}
> As a workaround, users need to perform inner instead of right join, and then 
> join the result back with the small DF to fill the missing rows.
>  
>  
>  
>  



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Assigned] (SPARK-19018) spark csv writer charset support

2018-07-25 Thread Hyukjin Kwon (JIRA)


 [ 
https://issues.apache.org/jira/browse/SPARK-19018?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Hyukjin Kwon reassigned SPARK-19018:


Assignee: Carlos Peña

> spark csv writer charset support
> 
>
> Key: SPARK-19018
> URL: https://issues.apache.org/jira/browse/SPARK-19018
> Project: Spark
>  Issue Type: Bug
>  Components: SQL
>Reporter: todd.chen
>Assignee: Carlos Peña
>Priority: Major
> Fix For: 2.4.0
>
>
> if we write a DataFrame to csv, the default charset is utf-8 and we can't change it 
> the way we can when reading csv by giving `encoding` in params, so I think we should 
> support csv write by passing a param



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Created] (SPARK-24915) Calling SparkSession.createDataFrame with schema can throw exception

2018-07-25 Thread Stephen Spencer (JIRA)
Stephen Spencer created SPARK-24915:
---

 Summary: Calling SparkSession.createDataFrame with schema can 
throw exception
 Key: SPARK-24915
 URL: https://issues.apache.org/jira/browse/SPARK-24915
 Project: Spark
  Issue Type: Bug
  Components: PySpark
Affects Versions: 2.3.1
 Environment: Python 3.6.3

PySpark 2.3.1 (installed via pip)

OSX 10.12.6
Reporter: Stephen Spencer


There seems to be a bug in PySpark when using the PySparkSQL session to create 
a dataframe with a pre-defined schema.

Code to reproduce the error:
{code:java}
from pyspark import SparkConf, SparkContext
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StructField, StringType, Row

conf = SparkConf().setMaster("local").setAppName("repro") 
context = SparkContext(conf=conf) 
session = SparkSession(context)

# Construct schema (the order of fields is important)
schema = StructType([
StructField('field2', StructType([StructField('sub_field', StringType(), 
False)]), False),
StructField('field1', StringType(), False),
])

# Create data to populate data frame
data = [
Row(field1="Hello", field2=Row(sub_field='world'))
]

# Attempt to create the data frame supplying the schema
# this will throw a ValueError
df = session.createDataFrame(data, schema=schema)
df.show(){code}
Running this throws a ValueError
{noformat}
Traceback (most recent call last):
File "schema_bug.py", line 18, in 
df = session.createDataFrame(data, schema=schema)
File 
"/Users/stephenspencer/benevolent/ai/neat/rex/.env/lib/python3.6/site-packages/pyspark/sql/session.py",
 line 691, in createDataFrame
rdd, schema = self._createFromLocal(map(prepare, data), schema)
File 
"/Users/stephenspencer/benevolent/ai/neat/rex/.env/lib/python3.6/site-packages/pyspark/sql/session.py",
 line 423, in _createFromLocal
data = [schema.toInternal(row) for row in data]
File 
"/Users/stephenspencer/benevolent/ai/neat/rex/.env/lib/python3.6/site-packages/pyspark/sql/session.py",
 line 423, in 
data = [schema.toInternal(row) for row in data]
File 
"/Users/stephenspencer/benevolent/ai/neat/rex/.env/lib/python3.6/site-packages/pyspark/sql/types.py",
 line 601, in toInternal
for f, v, c in zip(self.fields, obj, self._needConversion))
File 
"/Users/stephenspencer/benevolent/ai/neat/rex/.env/lib/python3.6/site-packages/pyspark/sql/types.py",
 line 601, in <genexpr>
for f, v, c in zip(self.fields, obj, self._needConversion))
File 
"/Users/stephenspencer/benevolent/ai/neat/rex/.env/lib/python3.6/site-packages/pyspark/sql/types.py",
 line 439, in toInternal
return self.dataType.toInternal(obj)
File 
"/Users/stephenspencer/benevolent/ai/neat/rex/.env/lib/python3.6/site-packages/pyspark/sql/types.py",
 line 619, in toInternal
raise ValueError("Unexpected tuple %r with StructType" % obj)
ValueError: Unexpected tuple 'Hello' with StructType{noformat}
The problem seems to be here:

https://github.com/apache/spark/blob/3d5c61e5fd24f07302e39b5d61294da79aa0c2f9/python/pyspark/sql/types.py#L603

specifically the bit
{code:java}
zip(self.fields, obj, self._needConversion)
{code}
This zip statement assumes that obj and self.fields are ordered in the same 
way, so that the elements of obj correspond to the right fields in the schema. 
However, this is not true: a Row built from keyword arguments orders its 
elements alphabetically, while the fields in the schema stay in whatever order 
they were specified. In this example, field2 is being initialised with the 
field1 element 'Hello'. If you re-order the fields in the schema to go (field1, 
field2), the given example works without error (see the sketch below).

The schema in the repro is specifically designed to elicit the problem: the 
fields are out of alphabetical order and one field is a StructType, making 
schema._needSerializeAnyField == True. However, we encountered this in real use.
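
A minimal workaround sketch, assuming the session, imports and data from the 
repro above (this is just the field re-ordering described above, not an 
official fix): declare the schema fields in the alphabetical order that a 
keyword-constructed Row uses.
{code:java}
# Workaround sketch only: keep the schema field order aligned with the
# alphabetical ordering that Row applies to keyword arguments.
schema_sorted = StructType([
    StructField('field1', StringType(), False),
    StructField('field2', StructType([
        StructField('sub_field', StringType(), False)]), False),
])

# `session` and `data` are the objects defined in the repro above.
df = session.createDataFrame(data, schema=schema_sorted)
df.show()
{code}
Alternatively, passing plain tuples whose element order matches the declared 
schema (instead of keyword-constructed Rows) should also avoid the alphabetical 
re-ordering.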



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Resolved] (SPARK-19018) spark csv writer charset support

2018-07-25 Thread Hyukjin Kwon (JIRA)


 [ 
https://issues.apache.org/jira/browse/SPARK-19018?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Hyukjin Kwon resolved SPARK-19018.
--
   Resolution: Fixed
Fix Version/s: 2.4.0

Fixed in https://github.com/apache/spark/pull/20949
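
For reference, a hedged sketch of how the write-side option is expected to be 
used (assuming the option is named `encoding`, mirroring the read side; see the 
linked PR for the authoritative option name and semantics, and `spark` is an 
existing SparkSession):
{code:java}
# Sketch only: writing a DataFrame to CSV with a non-UTF-8 charset, assuming
# the write-side option mirrors the read-side `encoding` option.
df = spark.createDataFrame([("café", 1)], ["name", "count"])

df.write \
    .option("encoding", "ISO-8859-1") \
    .mode("overwrite") \
    .csv("/tmp/csv_latin1")

# Reading it back requires the matching charset:
spark.read.option("encoding", "ISO-8859-1").csv("/tmp/csv_latin1").show()
{code}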

> spark csv writer charset support
> 
>
> Key: SPARK-19018
> URL: https://issues.apache.org/jira/browse/SPARK-19018
> Project: Spark
>  Issue Type: Bug
>  Components: SQL
>Reporter: todd.chen
>Priority: Major
> Fix For: 2.4.0
>
>
> If we write a DataFrame to CSV, the default charset is UTF-8 and we can't 
> change it, unlike reading CSV where we can pass an `encoding` param, so I 
> think we should support passing a charset param for CSV writes as well.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Comment Edited] (SPARK-24906) Enlarge split size for columnar file to ensure the task read enough data

2018-07-25 Thread Jason Guo (JIRA)


[ 
https://issues.apache.org/jira/browse/SPARK-24906?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16554972#comment-16554972
 ] 

Jason Guo edited comment on SPARK-24906 at 7/25/18 6:09 AM:


Thanks [~maropu] and [~viirya] for your comments. Here is my solution for our 
cluster  

1. When will this be enabled?

There is a configuration item named 
{code:java}
spark.sql.parquet.adaptiveFileSplit=false{code}
 

Only when this is enabled will DataSourceScanExec enlarge maxPartitionBytes. By 
default, this parameter is set to false (our ad-hoc queries set it to true).

 

With this configuration, users will know that Spark adjusts the partition / 
split size adaptively. Users who do not want this behavior can disable it.

 

2. How to calculate maxPartitionBytes and openCostInBytes

Different data types have different lengths (calculated with 
DataType.defaultSize). First we compute the total default size of all columns 
in the table (henceforth referred to as “T”). Then we compute the total default 
size of the requiredSchema, i.e. the columns the query actually reads 
(henceforth referred to as “R”). The multiplier is T / R, and maxPartitionBytes 
and openCostInBytes are enlarged by T / R times (see the sketch below).
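
As a rough illustration of that calculation (a sketch only; the per-type sizes 
below mirror Spark's DataType.defaultSize for integer and long, and the 
40-column layout is the example from the issue description):
{code:java}
# Illustrative sketch of the adaptive split-size multiplier described above.
# Sizes mirror DataType.defaultSize for the listed types (int = 4, long = 8).
default_size = {"int": 4, "long": 8}

table_columns = ["int"] * 20 + ["long"] * 20        # all columns in the table (T)
required_columns = ["int", "long"]                  # columns the query reads (R)

T = sum(default_size[t] for t in table_columns)     # 20*4 + 20*8 = 240
R = sum(default_size[t] for t in required_columns)  # 4 + 8 = 12
multiplier = T / R                                  # 20.0

default_max_partition_bytes = 128 * 1024 * 1024     # spark.sql.files.maxPartitionBytes
enlarged_split_size = int(default_max_partition_bytes * multiplier)
print(multiplier, enlarged_split_size)              # 20.0, ~2.5 GB splits
{code}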

 

 

 


was (Author: habren):
Thanks [~maropu] and [~viirya] for your comments. Here is my solution for our 
cluster (more than 40 thousand nodes in total and more than 10 thousand nodes 
in a single cluster).

1. When will this be enabled?

There is a configuration item named 
{code:java}
spark.sql.parquet.adaptiveFileSplit=false{code}
 

Only when this is enabled will DataSourceScanExec enlarge maxPartitionBytes. By 
default, this parameter is set to false (our ad-hoc queries set it to true).

 

With this configuration, users will know that Spark adjusts the partition / 
split size adaptively. Users who do not want this behavior can disable it.

 

2. How to calculate maxPartitionBytes and openCostInBytes

Different data types have different lengths (calculated with 
DataType.defaultSize). First we compute the total default size of all columns 
in the table (henceforth referred to as “T”). Then we compute the total default 
size of the requiredSchema, i.e. the columns the query actually reads 
(henceforth referred to as “R”). The multiplier is T / R, and maxPartitionBytes 
and openCostInBytes are enlarged by T / R times.

 

 

 

> Enlarge split size for columnar file to ensure the task read enough data
> 
>
> Key: SPARK-24906
> URL: https://issues.apache.org/jira/browse/SPARK-24906
> Project: Spark
>  Issue Type: Improvement
>  Components: SQL
>Affects Versions: 2.3.1
>Reporter: Jason Guo
>Priority: Critical
> Attachments: image-2018-07-24-20-26-32-441.png, 
> image-2018-07-24-20-28-06-269.png, image-2018-07-24-20-29-24-797.png, 
> image-2018-07-24-20-30-24-552.png
>
>
> For columnar files such as Parquet, when Spark SQL reads a table, each split 
> will be 128 MB by default since spark.sql.files.maxPartitionBytes defaults to 
> 128 MB. Even when users set it to a larger value, such as 512 MB, a task may 
> read only a few MB or even hundreds of KB, because the table may consist of 
> dozens of columns while the SQL query needs only a few of them, and Spark 
> prunes the unnecessary columns.
>  
> In this case, Spark's DataSourceScanExec can enlarge maxPartitionBytes 
> adaptively. 
> For example, suppose there are 40 columns: 20 are integer and the other 20 are 
> long. When a query reads one integer column and one long column, 
> maxPartitionBytes should be 20 times larger: (20*4 + 20*8) / (4 + 8) = 20. 
>  
> With this optimization, the number of tasks will be smaller and the job will 
> run faster. More importantly, for a very large cluster (more than 10 thousand 
> nodes), it will relieve the ResourceManager's scheduling pressure.
>  
> Here is the test
>  
> The table named test2 has more than 40 columns and there is more than 5 TB of 
> data each hour.
> When we issue a very simple query 
>  
> {code:java}
> select count(device_id) from test2 where date=20180708 and hour='23'{code}
>  
> There are 72176 tasks and the duration of the job is 4.8 minutes
> !image-2018-07-24-20-26-32-441.png!
>  
> Most tasks last less than 1 second and read less than 1.5 MB of data
> !image-2018-07-24-20-28-06-269.png!
>  
> After the optimization, there are only 1615 tasks and the job lasts only 30 
> seconds. It is almost 10 times faster.
> !image-2018-07-24-20-29-24-797.png!
>  
> The median amount of data read per task is 44.2 MB. 
> !image-2018-07-24-20-30-24-552.png!
>  



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org