[jira] [Commented] (SPARK-24882) separate responsibilities of the data source v2 read API
[ https://issues.apache.org/jira/browse/SPARK-24882?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16556896#comment-16556896 ] Ryan Blue commented on SPARK-24882:

Sounds fine, but it's getting close and I wouldn't be surprised if it didn't make it. I'll make sure I spend some time on it after you respond to my questions on the doc. Right now, I think it could probably use a couple of iterations to clean up the interfaces and remove duplication. If we can get that done in a week, then great.

> separate responsibilities of the data source v2 read API
>
> Key: SPARK-24882
> URL: https://issues.apache.org/jira/browse/SPARK-24882
> Project: Spark
> Issue Type: Improvement
> Components: SQL
> Affects Versions: 2.4.0
> Reporter: Wenchen Fan
> Assignee: Wenchen Fan
> Priority: Major
>
> Data source V2 has been out for a while; see the SPIP [here|https://docs.google.com/document/d/1n_vUVbF4KD3gxTmkNEon5qdQ-Z8qU5Frf6WMQZ6jJVM/edit?usp=sharing].
> We have already migrated most of the built-in streaming data sources to the V2 API, and the file source migration is in progress. During the migration, we found several problems and want to address them before we stabilize the V2 API.
> To solve these problems, we need to separate responsibilities in the data source v2 read API. For details, please see the attached Google doc: https://docs.google.com/document/d/1DDXCTCrup4bKWByTalkXWgavcPdvur8a4eEu8x1BzPM/edit?usp=sharing

--
This message was sent by Atlassian JIRA (v7.6.3#76005)
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org
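The kind of responsibility split under discussion (a builder that configures a scan, an immutable scan that plans input partitions, and a per-partition reader) can be sketched abstractly. This is a toy Python analogy with hypothetical names, not the actual Spark interfaces; the real design lives in the Google doc linked above.

```python
from abc import ABC, abstractmethod

# Illustrative roles only; names are invented for this sketch.
class ScanBuilder(ABC):
    """Mutable configuration step: collects options, then builds a Scan."""
    @abstractmethod
    def build(self): ...

class Scan(ABC):
    """Immutable, fully-configured scan: its only job is planning partitions."""
    @abstractmethod
    def plan_partitions(self): ...

class PartitionReader(ABC):
    """Reads exactly one partition's rows, independently of the others."""
    @abstractmethod
    def read(self): ...

# A trivial in-memory source wired through the three separated roles:
class ListScanBuilder(ScanBuilder):
    def __init__(self, data, num_partitions=2):
        self.data, self.n = data, num_partitions
    def build(self):
        # Round-robin split into partitions at build time.
        return ListScan([self.data[i::self.n] for i in range(self.n)])

class ListScan(Scan):
    def __init__(self, partitions):
        self.partitions = partitions
    def plan_partitions(self):
        return [ListPartitionReader(p) for p in self.partitions]

class ListPartitionReader(PartitionReader):
    def __init__(self, rows):
        self.rows = rows
    def read(self):
        return list(self.rows)

readers = ListScanBuilder([1, 2, 3, 4, 5]).build().plan_partitions()
print(sorted(sum((r.read() for r in readers), [])))  # [1, 2, 3, 4, 5]
```

The point of the separation is that each object has a single job: nothing downstream of `build()` can mutate the scan configuration, and each reader touches only its own partition.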
[jira] [Commented] (SPARK-24882) separate responsibilities of the data source v2 read API
[ https://issues.apache.org/jira/browse/SPARK-24882?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16556885#comment-16556885 ] Wenchen Fan commented on SPARK-24882:

We don't need to rush for 2.4, but it would be great if everything goes well and we make it into 2.4.
[jira] [Updated] (SPARK-24374) SPIP: Support Barrier Execution Mode in Apache Spark
[ https://issues.apache.org/jira/browse/SPARK-24374?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Xiao Li updated SPARK-24374:

Affects Version/s: (was: 3.0.0) 2.4.0

> SPIP: Support Barrier Execution Mode in Apache Spark
>
> Key: SPARK-24374
> URL: https://issues.apache.org/jira/browse/SPARK-24374
> Project: Spark
> Issue Type: Epic
> Components: ML, Spark Core
> Affects Versions: 2.4.0
> Reporter: Xiangrui Meng
> Assignee: Xiangrui Meng
> Priority: Major
> Labels: Hydrogen, SPIP
> Attachments: SPIP_ Support Barrier Scheduling in Apache Spark.pdf
>
> (See details in the linked/attached SPIP doc.)
> {quote}
> The proposal here is to add a new scheduling model to Apache Spark so users can properly embed distributed DL training as a Spark stage to simplify the distributed training workflow. For example, Horovod uses MPI to implement all-reduce to accelerate distributed TensorFlow training. The computation model is different from MapReduce used by Spark. In Spark, a task in a stage doesn't depend on any other tasks in the same stage, and hence it can be scheduled independently. In MPI, all workers start at the same time and pass messages around. To embed this workload in Spark, we need to introduce a new scheduling model, tentatively named "barrier scheduling", which launches tasks at the same time and provides users enough information and tooling to embed distributed DL training. Spark can also provide an extra layer of fault tolerance in case some tasks fail in the middle, where Spark would abort all tasks and restart the stage.
> {quote}
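The scheduling model described in the SPIP quote (all tasks in a stage start together and proceed in lockstep, unlike independently scheduled MapReduce tasks) can be illustrated outside Spark with Python's standard-library `threading.Barrier`. This is a toy analogy of the semantics, not Spark's barrier API.

```python
import threading

NUM_TASKS = 4
# No thread passes wait() until all NUM_TASKS have arrived, mimicking
# gang scheduling: the "stage" effectively starts its work together.
barrier = threading.Barrier(NUM_TASKS)
results = []
lock = threading.Lock()

def task(task_id):
    # Per-task local setup would happen here (e.g. opening an MPI-style
    # channel to peers), then every task synchronizes at the barrier.
    barrier.wait()
    # Past the barrier, all tasks are known to be running concurrently.
    with lock:
        results.append(task_id)

threads = [threading.Thread(target=task, args=(i,)) for i in range(NUM_TASKS)]
for t in threads:
    t.start()
for t in threads:
    t.join()

print(sorted(results))  # [0, 1, 2, 3]
```

The fault-tolerance point in the quote maps onto this picture too: if one "task" never reaches the barrier, none can proceed, so the whole stage must be aborted and retried as a unit rather than task-by-task.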
[jira] [Comment Edited] (SPARK-24630) SPIP: Support SQLStreaming in Spark
[ https://issues.apache.org/jira/browse/SPARK-24630?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16556837#comment-16556837 ] Genmao Yu edited comment on SPARK-24630 at 7/26/18 3:24 AM:

[~zsxwing] Is there a plan to better support SQL on streaming? E.g., providing stream table DDL, and window and watermark syntax on streams.

was (Author: unclegen):
[~zsxwing] Is there plan to better support SQL on streaming? like provide stream table ddl, window ** and watermark ** syntax on stream etc.

> SPIP: Support SQLStreaming in Spark
>
> Key: SPARK-24630
> URL: https://issues.apache.org/jira/browse/SPARK-24630
> Project: Spark
> Issue Type: Improvement
> Components: Structured Streaming
> Affects Versions: 2.2.0, 2.2.1
> Reporter: Jackey Lee
> Priority: Minor
> Labels: SQLStreaming
> Attachments: SQLStreaming SPIP.pdf
>
> At present, KafkaSQL, Flink SQL (which is actually based on Calcite), SQLStream, and StormSQL all provide a stream-type SQL interface, with which users with little knowledge about streaming can easily develop a stream processing model. In Spark, we can also support a SQL API based on Structured Streaming.
> To support SQL streaming, there are two key points:
> 1. The analysis phase should be able to parse streaming-type SQL.
> 2. The analyzer should be able to map metadata information to the corresponding relation.
[jira] [Commented] (SPARK-24630) SPIP: Support SQLStreaming in Spark
[ https://issues.apache.org/jira/browse/SPARK-24630?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16556837#comment-16556837 ] Genmao Yu commented on SPARK-24630:

[~zsxwing] Is there plan to better support SQL on streaming? like provide stream table ddl, window ** and watermark ** syntax on stream etc.
[jira] [Commented] (SPARK-24921) SparkStreaming steadily increasing job generation delay due to apparent URLClassLoader contention
[ https://issues.apache.org/jira/browse/SPARK-24921?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16556826#comment-16556826 ] Hyukjin Kwon commented on SPARK-24921:

[~tommyshiou], is this rather a question? If so, it would be better asked on the mailing list (see https://spark.apache.org/community.html).

> SparkStreaming steadily increasing job generation delay due to apparent URLClassLoader contention
>
> Key: SPARK-24921
> URL: https://issues.apache.org/jira/browse/SPARK-24921
> Project: Spark
> Issue Type: Bug
> Components: Spark Core, Web UI
> Affects Versions: 2.3.1
> Reporter: Tommy S
> Priority: Major
>
> I'm seeing an issue where the job generation time of my Spark Streaming job is steadily increasing after some time.
> Looking at the thread dumps, I see that the JobGenerator thread is BLOCKED waiting for the synchronized URLClassPath.getLoader method:
> {noformat}
> "JobGenerator" #153 daemon prio=5 os_prio=0 tid=0x02dad800 nid=0x253c waiting for monitor entry [0x7f4b311c2000]
>    java.lang.Thread.State: BLOCKED (on object monitor)
>    at sun.misc.URLClassPath.getNextLoader(URLClassPath.java:469)
>    - waiting to lock <0x7f4be023f940> (a sun.misc.URLClassPath)
>    at sun.misc.URLClassPath.findResource(URLClassPath.java:214)
>    at java.net.URLClassLoader$2.run(URLClassLoader.java:569)
>    at java.net.URLClassLoader$2.run(URLClassLoader.java:567)
>    at java.security.AccessController.doPrivileged(Native Method)
>    at java.net.URLClassLoader.findResource(URLClassLoader.java:566)
>    at java.lang.ClassLoader.getResource(ClassLoader.java:1096)
>    at java.lang.ClassLoader.getResource(ClassLoader.java:1091)
>    at java.net.URLClassLoader.getResourceAsStream(URLClassLoader.java:232)
>    at java.lang.Class.getResourceAsStream(Class.java:2223)
>    at org.apache.spark.util.ClosureCleaner$.getClassReader(ClosureCleaner.scala:40)
>    at org.apache.spark.util.ClosureCleaner$.getInnerClosureClasses(ClosureCleaner.scala:84)
>    at org.apache.spark.util.ClosureCleaner$.org$apache$spark$util$ClosureCleaner$$clean(ClosureCleaner.scala:224)
>    at org.apache.spark.util.ClosureCleaner$.clean(ClosureCleaner.scala:159)
>    at org.apache.spark.SparkContext.clean(SparkContext.scala:2299)
>    at org.apache.spark.rdd.PairRDDFunctions$$anonfun$combineByKeyWithClassTag$1.apply(PairRDDFunctions.scala:89)
>    at org.apache.spark.rdd.PairRDDFunctions$$anonfun$combineByKeyWithClassTag$1.apply(PairRDDFunctions.scala:77)
>    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
>    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
>    at org.apache.spark.rdd.RDD.withScope(RDD.scala:363)
>    at org.apache.spark.rdd.PairRDDFunctions.combineByKeyWithClassTag(PairRDDFunctions.scala:77)
>    at org.apache.spark.rdd.PairRDDFunctions$$anonfun$combineByKey$1.apply(PairRDDFunctions.scala:119)
>    at org.apache.spark.rdd.PairRDDFunctions$$anonfun$combineByKey$1.apply(PairRDDFunctions.scala:119)
>    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
>    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
>    at org.apache.spark.rdd.RDD.withScope(RDD.scala:363)
>    at org.apache.spark.rdd.PairRDDFunctions.combineByKey(PairRDDFunctions.scala:117)
>    at org.apache.spark.streaming.dstream.ShuffledDStream.compute(ShuffledDStream.scala:42)
>    at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1$$anonfun$apply$7.apply(DStream.scala:342)
>    at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1$$anonfun$apply$7.apply(DStream.scala:342)
>    at scala.util.DynamicVariable.withValue(DynamicVariable.scala:58)
>    at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1.apply(DStream.scala:341)
>    at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1.apply(DStream.scala:341)
>    at org.apache.spark.streaming.dstream.DStream.createRDDWithLocalProperties(DStream.scala:416)
>    at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1.apply(DStream.scala:336)
>    at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1.apply(DStream.scala:334)
>    at scala.Option.orElse(Option.scala:289)
>    at org.apache.spark.streaming.dstream.DStream.getOrCompute(DStream.scala:331)
[jira] [Commented] (SPARK-24914) totalSize is not a good estimate for broadcast joins
[ https://issues.apache.org/jira/browse/SPARK-24914?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16556824#comment-16556824 ] Hyukjin Kwon commented on SPARK-24914:

cc [~ZenWzh]

> totalSize is not a good estimate for broadcast joins
>
> Key: SPARK-24914
> URL: https://issues.apache.org/jira/browse/SPARK-24914
> Project: Spark
> Issue Type: Improvement
> Components: SQL
> Affects Versions: 2.4.0
> Reporter: Bruce Robbins
> Priority: Major
>
> When determining whether to do a broadcast join, Spark estimates the size of the smaller table as follows:
> - if totalSize is defined and greater than 0, use it
> - else, if rawDataSize is defined and greater than 0, use it
> - else, use spark.sql.defaultSizeInBytes (default: Long.MaxValue)
> Therefore, Spark prefers totalSize over rawDataSize.
> Unfortunately, totalSize is often quite a bit smaller than the actual table size, since it represents the size of the table's files on disk. Parquet and Orc files, for example, are encoded and compressed. This can result in the JVM throwing an OutOfMemoryError while Spark is loading the table into a HashedRelation, or when Spark actually attempts to broadcast the data.
> On the other hand, rawDataSize represents the uncompressed size of the dataset, according to Hive documentation. This seems like a pretty good number to use in preference to totalSize. However, due to HIVE-20079, this value is simply #columns * #rows. Once that bug is fixed, it may be a superior statistic, at least for managed tables.
> In the meantime, we could apply a configurable "fudge factor" to totalSize, at least for types of files that are encoded and compressed. Hive has the setting hive.stats.deserialization.factor, which defaults to 1.0, and is described as follows:
> {quote}In the absence of uncompressed/raw data size, total file size will be used for statistics annotation. But the file may be compressed, encoded and serialized, which may be lesser in size than the actual uncompressed/raw data size. This factor will be multiplied to file size to estimate the raw data size.
> {quote}
> Also, I propose a configuration setting to allow the user to completely ignore rawDataSize, since that value is broken (due to HIVE-20079). When that configuration setting is set to true, Spark would instead estimate the table as follows:
> - if totalSize is defined and greater than 0, use totalSize*fudgeFactor
> - else, use spark.sql.defaultSizeInBytes (default: Long.MaxValue)
> Caveat: This mitigates the issue only for Hive tables. It does not help much when the user is reading files using {{spark.read.parquet}}, unless we apply the same fudge factor there.
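The estimation order the ticket describes, together with the proposed fudge factor and raw-data-size opt-out, can be sketched as a small pure function. The parameter names (`fudge_factor`, `ignore_raw_data_size`) are illustrative stand-ins, not actual Spark configuration keys.

```python
LONG_MAX = 2**63 - 1  # stand-in for the spark.sql.defaultSizeInBytes default

def estimate_table_size(total_size=None, raw_data_size=None,
                        fudge_factor=1.0, ignore_raw_data_size=False):
    """Estimate a table's size for the broadcast-join decision.

    Mirrors the order described above: prefer totalSize, scaled by a
    configurable fudge factor to compensate for on-disk encoding and
    compression; fall back to rawDataSize unless the user opts out
    because of HIVE-20079; finally fall back to a huge default that
    effectively disables broadcasting.
    """
    if total_size is not None and total_size > 0:
        return int(total_size * fudge_factor)
    if not ignore_raw_data_size and raw_data_size is not None and raw_data_size > 0:
        return raw_data_size
    return LONG_MAX

# A compressed 100-byte-per-unit example: a 100 MB Parquet file with a
# 3x fudge factor is treated as ~300 MB of in-memory data.
print(estimate_table_size(total_size=100 * 1024**2, fudge_factor=3.0))
```

With `ignore_raw_data_size=True` and no usable `total_size`, the function skips straight to the huge default, which is exactly the conservative "don't broadcast" behavior the proposal wants when both statistics are untrustworthy.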
[jira] [Commented] (SPARK-24925) input bytesRead metrics fluctuate from time to time
[ https://issues.apache.org/jira/browse/SPARK-24925?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16556820#comment-16556820 ] yucai commented on SPARK-24925:

[~cloud_fan], [~xiaoli], [~kiszk], any comments?

> input bytesRead metrics fluctuate from time to time
>
> Key: SPARK-24925
> URL: https://issues.apache.org/jira/browse/SPARK-24925
> Project: Spark
> Issue Type: Bug
> Components: SQL
> Affects Versions: 2.3.1
> Reporter: yucai
> Priority: Major
> Attachments: bytesRead.gif
>
> The input bytesRead metric fluctuates from time to time; it is worse when pushdown is enabled.
> Query:
> {code:java}
> CREATE TABLE dev AS
> SELECT
> ...
> FROM lstg_item cold, lstg_item_vrtn v
> WHERE cold.auct_end_dt = CAST(SUBSTR('2018-03-18 00:00:00',1,10) AS DATE)
> AND v.auct_end_dt = CAST(SUBSTR('2018-03-18 00:00:00',1,10) AS DATE)
> ...
> {code}
> Issue:
> See the attached bytesRead.gif; input bytesRead shows 48GB, 52GB, 51GB, 50GB, 54GB, 53GB ...
[jira] [Assigned] (SPARK-24925) input bytesRead metrics fluctuate from time to time
[ https://issues.apache.org/jira/browse/SPARK-24925?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Apache Spark reassigned SPARK-24925:

Assignee: Apache Spark
[jira] [Commented] (SPARK-24925) input bytesRead metrics fluctuate from time to time
[ https://issues.apache.org/jira/browse/SPARK-24925?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16556819#comment-16556819 ] Apache Spark commented on SPARK-24925:

User 'yucai' has created a pull request for this issue: https://github.com/apache/spark/pull/21791
[jira] [Assigned] (SPARK-24925) input bytesRead metrics fluctuate from time to time
[ https://issues.apache.org/jira/browse/SPARK-24925?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Apache Spark reassigned SPARK-24925:

Assignee: (was: Apache Spark)
[jira] [Comment Edited] (SPARK-24288) Enable preventing predicate pushdown
[ https://issues.apache.org/jira/browse/SPARK-24288?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16556801#comment-16556801 ] Hyukjin Kwon edited comment on SPARK-24288 at 7/26/18 2:56 AM:

[~smilegator] should we resolve this {{Won't fix}} for now (and create another JIRA), or edit this JIRA?

was (Author: hyukjin.kwon):
[~smilegator] should we resolve this {{Won't fix}} for now?

> Enable preventing predicate pushdown
>
> Key: SPARK-24288
> URL: https://issues.apache.org/jira/browse/SPARK-24288
> Project: Spark
> Issue Type: New Feature
> Components: SQL
> Affects Versions: 2.3.0
> Reporter: Tomasz Gawęda
> Priority: Major
> Attachments: SPARK-24288.simple.patch
>
> Issue discussed on the mailing list: [http://apache-spark-developers-list.1001551.n3.nabble.com/Preventing-predicate-pushdown-td23976.html]
> While working with the JDBC data source, I saw that many "or" clauses with non-equality operators cause huge performance degradation of the SQL query sent to the database (DB2). For example:
> val df = spark.read.format("jdbc").(other options to parallelize load).load()
> df.where(s"(date1 > $param1 and (date1 < $param2 or date1 is null) or x > 100)").show() // in the real application, these predicates were pushed down from many lines below, with many ANDs and ORs
> If I use cache() before the where, there is no predicate pushdown of this "where" clause. However, in a production system caching many sources is a waste of memory (especially if the pipeline is long and I must cache many times). There are also a few more workarounds, but it would be great if Spark supported letting the user prevent predicate pushdown.
> For example: df.withAnalysisBarrier().where(...) ?
> Note that this should not be a global configuration option. If I read two DataFrames, df1 and df2, I would like to specify that df1 should not have some predicates pushed down (though some may be), while df2 should have all predicates pushed down, even if the target query joins df1 and df2. As far as I understand the Spark optimizer, if we use functions like `withAnalysisBarrier` and put AnalysisBarrier explicitly in the logical plan, then predicates won't be pushed down on this particular DataFrame, and pushdown will still be possible on the second one.
[jira] [Commented] (SPARK-24925) input bytesRead metrics fluctuate from time to time
[ https://issues.apache.org/jira/browse/SPARK-24925?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16556818#comment-16556818 ] yucai commented on SPARK-24925:

I think there could be two issues in FileScanRDD:
1. ColumnarBatch's bytesRead is only updated once every 4096 * 1000 rows, which makes the metric go stale.
2. When advancing to the next file, FileScanRDD always adds the whole file length to bytesRead, which is inaccurate (with pushdown, much less data is actually read).

For problem 1, in https://github.com/apache/spark/pull/21791, I tried to update the ColumnarBatch's bytesRead for each batch. For problem 2, updateBytesReadWithFileSize says, "If we can't get the bytes read from the FS stats, fall back to the file size"; can we update only when this situation happens?
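The two reporting strategies contrasted in the comment above can be sketched with a toy metric accumulator. This is illustrative Python only, not Spark's FileScanRDD code: adding a whole file's length on advance produces large jumps, while per-batch updates track actual progress at a finer granularity.

```python
def bytes_read_per_file(file_sizes):
    """Whole-file strategy: add each file's full length when advancing to it."""
    snapshots, total = [], 0
    for size in file_sizes:
        total += size          # entire file counted up front, one big jump
        snapshots.append(total)
    return snapshots

def bytes_read_per_batch(file_sizes, batch_bytes):
    """Per-batch strategy: add bytes as each columnar batch is consumed."""
    snapshots, total = [], 0
    for size in file_sizes:
        read = 0
        while read < size:
            step = min(batch_bytes, size - read)
            read += step
            total += step      # metric reflects bytes consumed so far
            snapshots.append(total)
    return snapshots

# Two 100-byte "files": the per-file metric jumps 0 -> 100 -> 200, while
# the per-batch metric advances smoothly in 40-byte increments.
print(bytes_read_per_file([100, 100]))              # [100, 200]
print(bytes_read_per_batch([100, 100], batch_bytes=40)[:3])  # [40, 80, 100]
```

Under the per-file strategy a UI polling the metric mid-file sees either nothing or the whole file, which matches the jumpy 48GB/52GB/51GB readings reported in the ticket; per-batch snapshots stay monotonic and fine-grained.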
[jira] [Updated] (SPARK-24905) Spark 2.3 Internal URL env variable
[ https://issues.apache.org/jira/browse/SPARK-24905?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Hyukjin Kwon updated SPARK-24905:

Priority: Major (was: Critical)

> Spark 2.3 Internal URL env variable
>
> Key: SPARK-24905
> URL: https://issues.apache.org/jira/browse/SPARK-24905
> Project: Spark
> Issue Type: Bug
> Components: Kubernetes
> Affects Versions: 2.3.1
> Reporter: Björn Wenzel
> Priority: Major
>
> Currently the Kubernetes master's internal URL is hardcoded in the Constants.scala file ([https://github.com/apache/spark/blob/85fe1297e35bcff9cf86bd53fee615e140ee5bfb/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/Constants.scala#L75]).
> In some cases this URL should be changeable, e.g. if the certificate is valid for another hostname. Is it possible to make this URL a property, like spark.kubernetes.authenticate.driver.hostname?
> Kubernetes The Hard Way, maintained by Kelsey Hightower, for example uses kubernetes.default as the hostname; this will again produce an SSLPeerUnverifiedException.
> Here is the use of the hardcoded host: [https://github.com/apache/spark/blob/85fe1297e35bcff9cf86bd53fee615e140ee5bfb/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/KubernetesClusterManager.scala#L52]. Maybe this could be changed like the KUBERNETES_NAMESPACE property in line 53.
[jira] [Commented] (SPARK-24905) Spark 2.3 Internal URL env variable
[ https://issues.apache.org/jira/browse/SPARK-24905?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16556817#comment-16556817 ] Hyukjin Kwon commented on SPARK-24905:

(Please avoid setting the priority to Critical or above; that is usually reserved for committers.)
[jira] [Commented] (SPARK-24897) DAGScheduler should not unregisterMapOutput and increaseEpoch repeatedly for stage fetchFailed
[ https://issues.apache.org/jira/browse/SPARK-24897?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16556806#comment-16556806 ] Hyukjin Kwon commented on SPARK-24897:

I couldn't follow it either.

> DAGScheduler should not unregisterMapOutput and increaseEpoch repeatedly for stage fetchFailed
>
> Key: SPARK-24897
> URL: https://issues.apache.org/jira/browse/SPARK-24897
> Project: Spark
> Issue Type: Bug
> Components: Scheduler, Spark Core
> Affects Versions: 2.1.0, 2.3.1
> Reporter: liupengcheng
> Priority: Major
>
> In Spark 2.1, when a stage hits a fetch failure, the DAGScheduler will retry both that stage and its parent stage. However, when the parent stage is resubmitted and starts running, the map statuses can still be invalidated by the failed stage's outstanding tasks due to further fetch failures. The stage's outstanding tasks might unregister the map statuses with a new epoch, causing the parent stage to hit repeated MetadataFetchFailed errors and finally failing the job.
> {code:java}
> 2018-07-23,01:52:33,012 WARN org.apache.spark.scheduler.TaskSetManager: Lost task 174.0 in stage 71.0 (TID 154127, , executor 96): FetchFailed(BlockManagerId(4945, , 22409), shuffleId=24, mapId=667, reduceId=174, message= org.apache.spark.shuffle.FetchFailedException: Failed to connect to /:22409
> 2018-07-23,01:52:33,013 INFO org.apache.spark.scheduler.DAGScheduler: Marking ShuffleMapStage 71 (map at DeviceLocateMain.scala:238) as failed due to a fetch failure from ShuffleMapStage 69 ($plus$plus at DeviceLocateMain.scala:236)
> 2018-07-23,01:52:33,014 INFO org.apache.spark.scheduler.DAGScheduler: ShuffleMapStage 71 (map at DeviceLocateMain.scala:238) failed in 246.856 s
> 2018-07-23,01:52:33,014 INFO org.apache.spark.scheduler.DAGScheduler: Resubmitting ShuffleMapStage 69 ($plus$plus at DeviceLocateMain.scala:236) and ShuffleMapStage 71 (map at DeviceLocateMain.scala:238) due to fetch failure
> 2018-07-23,01:52:36,004 WARN org.apache.spark.scheduler.TaskSetManager: Lost task 120.0 in stage 71.0 (TID 154073, , executor 286): FetchFailed(BlockManagerId(4208, , 22409), shuffleId=24, mapId=241, reduceId=120, message= org.apache.spark.shuffle.FetchFailedException: Failed to connect to /:22409
> 2018-07-23,01:52:36,005 INFO org.apache.spark.scheduler.DAGScheduler: Resubmitting ShuffleMapStage 69 ($plus$plus at DeviceLocateMain.scala:236) and ShuffleMapStage 71 (map at DeviceLocateMain.scala:238) due to fetch failure
> 2018-07-23,01:52:36,017 INFO org.apache.spark.storage.MemoryStore: Block broadcast_63_piece0 stored as bytes in memory (estimated size 4.0 MB, free 26.7 MB)
> 2018-07-23,01:52:36,025 INFO org.apache.spark.storage.BlockManagerInfo: Removed broadcast_59_piece1 on :52349 in memory (size: 4.0 MB, free: 3.0 GB)
> 2018-07-23,01:52:36,029 INFO org.apache.spark.storage.BlockManagerInfo: Removed broadcast_61_piece6 on :52349 in memory (size: 4.0 MB, free: 3.0 GB)
> 2018-07-23,01:52:36,079 INFO org.apache.spark.deploy.yarn.YarnAllocator: Canceling requests for 0 executor containers
> 2018-07-23,01:52:36,079 WARN org.apache.spark.deploy.yarn.YarnAllocator: Expected to find pending requests, but found none.
> 2018-07-23,01:52:36,094 INFO org.apache.spark.storage.BlockManagerInfo: Added broadcast_63_piece0 in memory on :56780 (size: 4.0 MB, free: 3.7 GB)
> 2018-07-23,01:52:36,095 INFO org.apache.spark.storage.MemoryStore: Block broadcast_63_piece1 stored as bytes in memory (estimated size 4.0 MB, free 30.7 MB)
> 2018-07-23,01:52:36,107 INFO org.apache.spark.storage.BlockManagerInfo: Added broadcast_63_piece1 in memory on :56780 (size: 4.0 MB, free: 3.7 GB)
> 2018-07-23,01:52:36,108 INFO org.apache.spark.storage.MemoryStore: Block broadcast_63_piece2 stored as bytes in memory (estimated size 4.0 MB, free 34.7 MB)
> 2018-07-23,01:52:36,108 INFO org.apache.spark.storage.BlockManagerInfo: Added broadcast_63_piece2 in memory on :56780 (size: 4.0 MB, free: 3.7 GB)
> 2018-07-23,01:52:36,108 INFO org.apache.spark.storage.MemoryStore: Block broadcast_63_piece3 stored as bytes in memory (estimated size 4.0 MB, free 38.7 MB)
> 2018-07-23,01:52:36,132 INFO org.apache.spark.storage.BlockManagerInfo: Added broadcast_63_piece3 in memory on :56780 (size: 4.0 MB, free: 3.7 GB)
> 2018-07-23,01:52:36,132 INFO org.apache.spark.storage.MemoryStore: Block broadcast_63_piece4 stored as bytes in memory (estimated size 3.8 MB, free 42.5 MB)
> 2018-07-23,01:52:36,132 INFO org.apache.spark.storage.BlockManagerInfo: Added broadcast_63_piece4 in memory on :56780 (size: 3.8 MB, free: 3.7 GB)
> 2018-07-23,01:52:36,132 INFO org.apache.spark.MapOutputTracker: Broadcast mapstatuses
[jira] [Commented] (SPARK-24288) Enable preventing predicate pushdown
[ https://issues.apache.org/jira/browse/SPARK-24288?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16556801#comment-16556801 ] Hyukjin Kwon commented on SPARK-24288: -- [~smilegator] should we resolve this {{Won't fix}} for now? > Enable preventing predicate pushdown > > > Key: SPARK-24288 > URL: https://issues.apache.org/jira/browse/SPARK-24288 > Project: Spark > Issue Type: New Feature > Components: SQL >Affects Versions: 2.3.0 >Reporter: Tomasz Gawęda >Priority: Major > Attachments: SPARK-24288.simple.patch > > > Issue discussed on Mailing List: > [http://apache-spark-developers-list.1001551.n3.nabble.com/Preventing-predicate-pushdown-td23976.html] > While working with the JDBC datasource I saw that many "or" clauses with > non-equality operators cause huge performance degradation of the SQL query > to the database (DB2). For example: > val df = spark.read.format("jdbc").(other options to parallelize > load).load() > df.where(s"(date1 > $param1 and (date1 < $param2 or date1 is null) or x > > 100)").show() // in the real application the predicates were pushed > many many lines below, many ANDs and ORs > If I use cache() before where, there is no predicate pushdown of this > "where" clause. However, in a production system caching many sources is a > waste of memory (especially if the pipeline is long and I must cache many > times). There are also a few more workarounds, but it would be great if Spark > supported preventing predicate pushdown by the user. > > For example: df.withAnalysisBarrier().where(...) ? > > Note that this should not be a global configuration option. If I read 2 > DataFrames, df1 and df2, I would like to specify that df1 should not have > some predicates pushed down (but some may be), while df2 should have all > predicates pushed down, even if the target query joins df1 and df2. 
As far as I > understand Spark optimizer, if we use functions like `withAnalysisBarrier` > and put AnalysisBarrier explicitly in logical plan, then predicates won't be > pushed down on this particular DataFrames and PP will be still possible on > the second one. -- This message was sent by Atlassian JIRA (v7.6.3#76005) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
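The barrier mechanism discussed in this thread can be sketched outside Spark. Below is a toy optimizer in plain Python, not Spark code: it pushes a filter predicate into a scan unless an explicit barrier node blocks it. `withAnalysisBarrier` is only a name proposed in the thread, not a released API, and the plan representation here is invented for illustration.

```python
# Toy illustration of a predicate-pushdown barrier (not Spark internals).
# Plans are nested tuples: ("scan", pushed_filters), ("filter", pred, child),
# ("barrier", child).

def push_down(plan):
    """Push filter predicates into the scan, stopping at a barrier node."""
    kind = plan[0]
    if kind == "filter":
        pred, child = plan[1], plan[2]
        optimized_child = push_down(child)
        if optimized_child[0] == "scan":
            # Safe to push: merge the predicate into the scan's filter list.
            return ("scan", optimized_child[1] + [pred])
        # Child is a barrier (or something else): keep the filter above it.
        return ("filter", pred, optimized_child)
    # Barriers (and bare scans) are returned unchanged; the optimizer does
    # not look through a barrier, so predicates above it stay above it.
    return plan

# Without a barrier the predicate reaches the scan (pushed to the source).
plan = ("filter", "x > 100", ("scan", []))
assert push_down(plan) == ("scan", ["x > 100"])

# With a barrier the predicate stays in Spark, as the thread requests.
blocked = ("filter", "x > 100", ("barrier", ("scan", [])))
assert push_down(blocked) == ("filter", "x > 100", ("barrier", ("scan", [])))
```

This also shows why the request is naturally per-DataFrame rather than a global flag: the barrier is a node in one plan subtree, so a join of df1 (with barrier) and df2 (without) keeps pushdown for df2 only.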
[jira] [Updated] (SPARK-24925) input bytesRead metrics fluctuate from time to time
[ https://issues.apache.org/jira/browse/SPARK-24925?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] yucai updated SPARK-24925: -- Attachment: bytesRead.gif > input bytesRead metrics fluctuate from time to time > --- > > Key: SPARK-24925 > URL: https://issues.apache.org/jira/browse/SPARK-24925 > Project: Spark > Issue Type: Bug > Components: SQL >Affects Versions: 2.3.1 >Reporter: yucai >Priority: Major > Attachments: bytesRead.gif > > > input bytesRead metrics fluctuate from time to time, it is worse when > pushdown enabled. > Query > {code:java} > CREATE TABLE dev AS > SELECT > ... > FROM lstg_item cold, lstg_item_vrtn v > WHERE cold.auct_end_dt = CAST(SUBSTR('2018-03-18 00:00:00',1,10) AS DATE) > AND v.auct_end_dt = CAST(SUBSTR('2018-03-18 00:00:00',1,10) AS DATE) > ... > {code} > Issue > See attached gif, input bytesRead shows 48GB, 52GB, 51GB, 50GB, 54GB, 53GB > ... -- This message was sent by Atlassian JIRA (v7.6.3#76005) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Updated] (SPARK-24925) input bytesRead metrics fluctuate from time to time
[ https://issues.apache.org/jira/browse/SPARK-24925?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] yucai updated SPARK-24925: -- Description: input bytesRead metrics fluctuate from time to time, it is worse when pushdown enabled. Query {code:java} CREATE TABLE dev AS SELECT ... FROM lstg_item cold, lstg_item_vrtn v WHERE cold.auct_end_dt = CAST(SUBSTR('2018-03-18 00:00:00',1,10) AS DATE) AND v.auct_end_dt = CAST(SUBSTR('2018-03-18 00:00:00',1,10) AS DATE) ... {code} Issue See attached bytesRead.gif, input bytesRead shows 48GB, 52GB, 51GB, 50GB, 54GB, 53GB ... was: input bytesRead metrics fluctuate from time to time, it is worse when pushdown enabled. Query {code:java} CREATE TABLE dev AS SELECT ... FROM lstg_item cold, lstg_item_vrtn v WHERE cold.auct_end_dt = CAST(SUBSTR('2018-03-18 00:00:00',1,10) AS DATE) AND v.auct_end_dt = CAST(SUBSTR('2018-03-18 00:00:00',1,10) AS DATE) ... {code} Issue See attached gif, input bytesRead shows 48GB, 52GB, 51GB, 50GB, 54GB, 53GB ... > input bytesRead metrics fluctuate from time to time > --- > > Key: SPARK-24925 > URL: https://issues.apache.org/jira/browse/SPARK-24925 > Project: Spark > Issue Type: Bug > Components: SQL >Affects Versions: 2.3.1 >Reporter: yucai >Priority: Major > Attachments: bytesRead.gif > > > input bytesRead metrics fluctuate from time to time, it is worse when > pushdown enabled. > Query > {code:java} > CREATE TABLE dev AS > SELECT > ... > FROM lstg_item cold, lstg_item_vrtn v > WHERE cold.auct_end_dt = CAST(SUBSTR('2018-03-18 00:00:00',1,10) AS DATE) > AND v.auct_end_dt = CAST(SUBSTR('2018-03-18 00:00:00',1,10) AS DATE) > ... > {code} > Issue > See attached bytesRead.gif, input bytesRead shows 48GB, 52GB, 51GB, 50GB, > 54GB, 53GB ... -- This message was sent by Atlassian JIRA (v7.6.3#76005) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Created] (SPARK-24925) input bytesRead metrics fluctuate from time to time
yucai created SPARK-24925: - Summary: input bytesRead metrics fluctuate from time to time Key: SPARK-24925 URL: https://issues.apache.org/jira/browse/SPARK-24925 Project: Spark Issue Type: Bug Components: SQL Affects Versions: 2.3.1 Reporter: yucai input bytesRead metrics fluctuate from time to time, it is worse when pushdown enabled. Query {code:java} CREATE TABLE dev AS SELECT ... FROM lstg_item cold, lstg_item_vrtn v WHERE cold.auct_end_dt = CAST(SUBSTR('2018-03-18 00:00:00',1,10) AS DATE) AND v.auct_end_dt = CAST(SUBSTR('2018-03-18 00:00:00',1,10) AS DATE) ... {code} Issue See attached gif, input bytesRead shows 48GB, 52GB, 51GB, 50GB, 54GB, 53GB ... -- This message was sent by Atlassian JIRA (v7.6.3#76005) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Updated] (SPARK-24832) Improve inputMetrics's bytesRead update for ColumnarBatch
[ https://issues.apache.org/jira/browse/SPARK-24832?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] yucai updated SPARK-24832: -- Summary: Improve inputMetrics's bytesRead update for ColumnarBatch (was: When pushdown enabled, input bytesRead metrics is easy to fluctuate from time to time) > Improve inputMetrics's bytesRead update for ColumnarBatch > - > > Key: SPARK-24832 > URL: https://issues.apache.org/jira/browse/SPARK-24832 > Project: Spark > Issue Type: Bug > Components: Spark Core, SQL >Affects Versions: 2.3.1 >Reporter: yucai >Priority: Major > > Improve inputMetrics's bytesRead update for ColumnarBatch -- This message was sent by Atlassian JIRA (v7.6.3#76005) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Updated] (SPARK-24832) When pushdown enabled, input bytesRead metrics is easy to fluctuate from time to time
[ https://issues.apache.org/jira/browse/SPARK-24832?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] yucai updated SPARK-24832: -- Summary: When pushdown enabled, input bytesRead metrics is easy to fluctuate from time to time (was: Improve inputMetrics's bytesRead update for ColumnarBatch) > When pushdown enabled, input bytesRead metrics is easy to fluctuate from time > to time > - > > Key: SPARK-24832 > URL: https://issues.apache.org/jira/browse/SPARK-24832 > Project: Spark > Issue Type: Bug > Components: Spark Core, SQL >Affects Versions: 2.3.1 >Reporter: yucai >Priority: Major > > Improve inputMetrics's bytesRead update for ColumnarBatch -- This message was sent by Atlassian JIRA (v7.6.3#76005) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Commented] (SPARK-24867) Add AnalysisBarrier to DataFrameWriter
[ https://issues.apache.org/jira/browse/SPARK-24867?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16556760#comment-16556760 ] Saisai Shao commented on SPARK-24867: - I see, thanks! Please let me know when the JIRA is opened. > Add AnalysisBarrier to DataFrameWriter > --- > > Key: SPARK-24867 > URL: https://issues.apache.org/jira/browse/SPARK-24867 > Project: Spark > Issue Type: Bug > Components: SQL >Affects Versions: 2.3.0, 2.3.1 >Reporter: Xiao Li >Assignee: Xiao Li >Priority: Blocker > Fix For: 2.3.2 > > > {code} > val udf1 = udf({(x: Int, y: Int) => x + y}) > val df = spark.range(0, 3).toDF("a") > .withColumn("b", udf1($"a", udf1($"a", lit(10)))) > df.cache() > df.write.saveAsTable("t") > df.write.saveAsTable("t1") > {code} > Cache is not being used because the plans do not match with the cached plan. > This is a regression caused by the changes we made in AnalysisBarrier, since > not all the Analyzer rules are idempotent. We need to fix it in Spark 2.3.2 -- This message was sent by Atlassian JIRA (v7.6.3#76005) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Commented] (SPARK-12911) Cacheing a dataframe causes array comparisons to fail (in filter / where) after 1.6
[ https://issues.apache.org/jira/browse/SPARK-12911?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16556470#comment-16556470 ] David Vogelbacher commented on SPARK-12911: --- Hey [~hyukjin.kwon] [~sdicocco][~a1ray], I just reproduced this on master. I executed the following in the spark-shell: {noformat} scala> import org.apache.spark.sql.functions import org.apache.spark.sql.functions scala> val df = Seq(Array("a", "b"), Array("c", "d")).toDF("arrayCol") df: org.apache.spark.sql.DataFrame = [arrayCol: array<string>] scala> df.filter(df.col("arrayCol").eqNullSafe(functions.array(functions.lit("a"), functions.lit("b")))).show() +--------+ |arrayCol| +--------+ | [a, b]| +--------+ scala> df.cache().filter(df.col("arrayCol").eqNullSafe(functions.array(functions.lit("a"), functions.lit("b")))).show() +--------+ |arrayCol| +--------+ +--------+ {noformat} This seems to be the same issue? > Cacheing a dataframe causes array comparisons to fail (in filter / where) > after 1.6 > --- > > Key: SPARK-12911 > URL: https://issues.apache.org/jira/browse/SPARK-12911 > Project: Spark > Issue Type: Bug > Components: SQL >Affects Versions: 1.6.0 > Environment: OSX 10.11.1, Scala 2.11.7, Spark 1.6.0 >Reporter: Jesse English >Priority: Major > > When doing a *where* operation on a dataframe and testing for equality on an > array type, after 1.6 no valid comparisons are made if the dataframe has been > cached. If it has not been cached, the results are as expected. > This appears to be related to the underlying unsafe array data types. 
> {code:title=test.scala|borderStyle=solid} > test("test array comparison") { > val vectors: Vector[Row] = Vector( > Row.fromTuple("id_1" -> Array(0L, 2L)), > Row.fromTuple("id_2" -> Array(0L, 5L)), > Row.fromTuple("id_3" -> Array(0L, 9L)), > Row.fromTuple("id_4" -> Array(1L, 0L)), > Row.fromTuple("id_5" -> Array(1L, 8L)), > Row.fromTuple("id_6" -> Array(2L, 4L)), > Row.fromTuple("id_7" -> Array(5L, 6L)), > Row.fromTuple("id_8" -> Array(6L, 2L)), > Row.fromTuple("id_9" -> Array(7L, 0L)) > ) > val data: RDD[Row] = sc.parallelize(vectors, 3) > val schema = StructType( > StructField("id", StringType, false) :: > StructField("point", DataTypes.createArrayType(LongType, false), > false) :: > Nil > ) > val sqlContext = new SQLContext(sc) > val dataframe = sqlContext.createDataFrame(data, schema) > val targetPoint:Array[Long] = Array(0L,9L) > //Cacheing is the trigger to cause the error (no cacheing causes no error) > dataframe.cache() > //This is the line where it fails > //java.util.NoSuchElementException: next on empty iterator > //However we know that there is a valid match > val targetRow = dataframe.where(dataframe("point") === > array(targetPoint.map(value => lit(value)): _*)).first() > assert(targetRow != null) > } > {code} -- This message was sent by Atlassian JIRA (v7.6.3#76005) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Resolved] (SPARK-24916) Fix type coercion for IN expression with subquery
[ https://issues.apache.org/jira/browse/SPARK-24916?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Yuming Wang resolved SPARK-24916. - Resolution: Duplicate > Fix type coercion for IN expression with subquery > - > > Key: SPARK-24916 > URL: https://issues.apache.org/jira/browse/SPARK-24916 > Project: Spark > Issue Type: Bug > Components: SQL >Affects Versions: 2.2.0, 2.3.0 >Reporter: Yuming Wang >Priority: Major > > How to reproduce: > {code:sql} > CREATE TEMPORARY VIEW t4 AS SELECT * FROM VALUES > (CAST(1 AS DOUBLE), CAST(2 AS STRING), CAST(3 AS STRING)) > AS t1(t4a, t4b, t4c); > CREATE TEMPORARY VIEW t5 AS SELECT * FROM VALUES > (CAST(1 AS DECIMAL(18, 0)), CAST(2 AS STRING), CAST(3 AS BIGINT)) > AS t1(t5a, t5b, t5c); > SELECT * FROM t4 > WHERE > (t4a, t4b, t4c) IN (SELECT t5a, >t5b, >t5c > FROM t5); > {code} > Will throw exception: > {noformat} > org.apache.spark.sql.AnalysisException > cannot resolve '(named_struct('t4a', t4.`t4a`, 't4b', t4.`t4b`, 't4c', > t4.`t4c`) IN (listquery()))' due to data type mismatch: > The data type of one or more elements in the left hand side of an IN subquery > is not compatible with the data type of the output of the subquery > Mismatched columns: > [(t4.`t4a`:double, t5.`t5a`:decimal(18,0)), (t4.`t4c`:string, > t5.`t5c`:bigint)] > Left side: > [double, string, string]. > Right side: > [decimal(18,0), string, bigint].; > {noformat} > But it success on Spark 2.1.x. -- This message was sent by Atlassian JIRA (v7.6.3#76005) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Commented] (SPARK-24867) Add AnalysisBarrier to DataFrameWriter
[ https://issues.apache.org/jira/browse/SPARK-24867?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16556447#comment-16556447 ] Xiao Li commented on SPARK-24867: - [~jerryshao] This ticket was just resolved. [~lian cheng] might find another serious bug. He will open a JIRA soon. > Add AnalysisBarrier to DataFrameWriter > --- > > Key: SPARK-24867 > URL: https://issues.apache.org/jira/browse/SPARK-24867 > Project: Spark > Issue Type: Bug > Components: SQL >Affects Versions: 2.3.0, 2.3.1 >Reporter: Xiao Li >Assignee: Xiao Li >Priority: Blocker > Fix For: 2.3.2 > > > {code} > val udf1 = udf({(x: Int, y: Int) => x + y}) > val df = spark.range(0, 3).toDF("a") > .withColumn("b", udf1($"a", udf1($"a", lit(10)))) > df.cache() > df.write.saveAsTable("t") > df.write.saveAsTable("t1") > {code} > Cache is not being used because the plans do not match with the cached plan. > This is a regression caused by the changes we made in AnalysisBarrier, since > not all the Analyzer rules are idempotent. We need to fix it in Spark 2.3.2 -- This message was sent by Atlassian JIRA (v7.6.3#76005) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Resolved] (SPARK-24867) Add AnalysisBarrier to DataFrameWriter
[ https://issues.apache.org/jira/browse/SPARK-24867?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Xiao Li resolved SPARK-24867. - Resolution: Fixed Fix Version/s: 2.3.2 > Add AnalysisBarrier to DataFrameWriter > --- > > Key: SPARK-24867 > URL: https://issues.apache.org/jira/browse/SPARK-24867 > Project: Spark > Issue Type: Bug > Components: SQL >Affects Versions: 2.3.0, 2.3.1 >Reporter: Xiao Li >Assignee: Xiao Li >Priority: Blocker > Fix For: 2.3.2 > > > {code} > val udf1 = udf({(x: Int, y: Int) => x + y}) > val df = spark.range(0, 3).toDF("a") > .withColumn("b", udf1($"a", udf1($"a", lit(10)))) > df.cache() > df.write.saveAsTable("t") > df.write.saveAsTable("t1") > {code} > Cache is not being used because the plans do not match with the cached plan. > This is a regression caused by the changes we made in AnalysisBarrier, since > not all the Analyzer rules are idempotent. We need to fix it in Spark 2.3.2 -- This message was sent by Atlassian JIRA (v7.6.3#76005) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
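The regression above hinges on one point: a cached result is found by matching the query's plan against the plan stored at cache time, so an analyzer rule that is not idempotent makes an already-analyzed plan drift on re-analysis and the lookup misses. A few lines of plain Python can model this; it is a toy model only, not Spark internals (the real cache matches canonicalized logical plans).

```python
# Toy model of the cache miss: the cache is keyed by the analyzed plan, so a
# non-idempotent "analyzer" produces a different key on re-analysis.

def analyze(plan, idempotent=True):
    """Pretend analyzer: marks a plan as resolved by wrapping it in a prefix."""
    if idempotent and plan.startswith("resolved:"):
        return plan                  # idempotent: re-analysis is a no-op
    return "resolved:" + plan        # non-idempotent: wraps again every time

cache = {}

def cache_plan(plan):
    """Store data under the analyzed form of the plan, as cache() would."""
    cache[analyze(plan)] = "cached-data"

def lookup(plan, idempotent=True):
    """Look up a (possibly already-analyzed) plan in the cache."""
    return cache.get(analyze(plan, idempotent))

cache_plan("scan t")
# With an idempotent analyzer, re-analyzing the analyzed plan still hits.
assert lookup(analyze("scan t")) == "cached-data"
# With a non-idempotent rule, the key keeps changing and the cache misses —
# the query silently recomputes instead of reusing the cache.
assert lookup(analyze("scan t"), idempotent=False) is None
```

In the JIRA's reproduction, `df.write.saveAsTable(...)` re-analyzed a plan that had already been analyzed at `df.cache()` time, which is exactly the second branch.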
[jira] [Assigned] (SPARK-24924) Add mapping for built-in Avro data source
[ https://issues.apache.org/jira/browse/SPARK-24924?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Apache Spark reassigned SPARK-24924: Assignee: Apache Spark > Add mapping for built-in Avro data source > - > > Key: SPARK-24924 > URL: https://issues.apache.org/jira/browse/SPARK-24924 > Project: Spark > Issue Type: Sub-task > Components: SQL >Affects Versions: 2.4.0 >Reporter: Dongjoon Hyun >Assignee: Apache Spark >Priority: Minor > > This issue aims to the followings. > # Like `com.databricks.spark.csv` mapping, we had better map > `com.databricks.spark.avro` to built-in Avro data source. > # Remove incorrect error message, `Please find an Avro package at ...`. -- This message was sent by Atlassian JIRA (v7.6.3#76005) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Commented] (SPARK-24924) Add mapping for built-in Avro data source
[ https://issues.apache.org/jira/browse/SPARK-24924?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16556427#comment-16556427 ] Apache Spark commented on SPARK-24924: -- User 'dongjoon-hyun' has created a pull request for this issue: https://github.com/apache/spark/pull/21878 > Add mapping for built-in Avro data source > - > > Key: SPARK-24924 > URL: https://issues.apache.org/jira/browse/SPARK-24924 > Project: Spark > Issue Type: Sub-task > Components: SQL >Affects Versions: 2.4.0 >Reporter: Dongjoon Hyun >Priority: Minor > > This issue aims to the followings. > # Like `com.databricks.spark.csv` mapping, we had better map > `com.databricks.spark.avro` to built-in Avro data source. > # Remove incorrect error message, `Please find an Avro package at ...`. -- This message was sent by Atlassian JIRA (v7.6.3#76005) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Assigned] (SPARK-24924) Add mapping for built-in Avro data source
[ https://issues.apache.org/jira/browse/SPARK-24924?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Apache Spark reassigned SPARK-24924: Assignee: (was: Apache Spark) > Add mapping for built-in Avro data source > - > > Key: SPARK-24924 > URL: https://issues.apache.org/jira/browse/SPARK-24924 > Project: Spark > Issue Type: Sub-task > Components: SQL >Affects Versions: 2.4.0 >Reporter: Dongjoon Hyun >Priority: Minor > > This issue aims to the followings. > # Like `com.databricks.spark.csv` mapping, we had better map > `com.databricks.spark.avro` to built-in Avro data source. > # Remove incorrect error message, `Please find an Avro package at ...`. -- This message was sent by Atlassian JIRA (v7.6.3#76005) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Created] (SPARK-24924) Add mapping for built-in Avro data source
Dongjoon Hyun created SPARK-24924: - Summary: Add mapping for built-in Avro data source Key: SPARK-24924 URL: https://issues.apache.org/jira/browse/SPARK-24924 Project: Spark Issue Type: Sub-task Components: SQL Affects Versions: 2.4.0 Reporter: Dongjoon Hyun This issue aims to the followings. # Like `com.databricks.spark.csv` mapping, we had better map `com.databricks.spark.avro` to built-in Avro data source. # Remove incorrect error message, `Please find an Avro package at ...`. -- This message was sent by Atlassian JIRA (v7.6.3#76005) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Commented] (SPARK-24906) Enlarge split size for columnar file to ensure the task read enough data
[ https://issues.apache.org/jira/browse/SPARK-24906?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16556413#comment-16556413 ] Jason Guo commented on SPARK-24906: --- [~maropu] [~viirya] What do you think about this idea ? > Enlarge split size for columnar file to ensure the task read enough data > > > Key: SPARK-24906 > URL: https://issues.apache.org/jira/browse/SPARK-24906 > Project: Spark > Issue Type: Improvement > Components: SQL >Affects Versions: 2.3.1 >Reporter: Jason Guo >Priority: Critical > Attachments: image-2018-07-24-20-26-32-441.png, > image-2018-07-24-20-28-06-269.png, image-2018-07-24-20-29-24-797.png, > image-2018-07-24-20-30-24-552.png > > > For columnar files such as Parquet, when Spark SQL reads a table, each split > will be 128 MB by default since spark.sql.files.maxPartitionBytes defaults to > 128MB. Even when the user sets it to a large value, such as 512MB, a task may > read only a few MB or even hundreds of KB, because the table (Parquet) may > consist of dozens of columns while the SQL only needs a few columns, and > Spark will prune the unnecessary columns. > > In this case, Spark's DataSourceScanExec can enlarge maxPartitionBytes > adaptively. > For example, if there are 40 columns, 20 integer and another 20 long, and a > query uses one integer column and one long column, then maxPartitionBytes > should be 20 times larger: (20*4+20*8) / (4+8) = 20. > > With this optimization, the number of tasks will be smaller and the job will > run faster. More importantly, for a very large cluster (more than 10 thousand > nodes), it will relieve the RM's scheduling pressure. > > Here is the test. > > The table named test2 has more than 40 columns and there is more than 5 TB of > data each hour. > When we issue a very simple query: > > {code:java} > select count(device_id) from test2 where date=20180708 and hour='23'{code} > > There are 72176 tasks and the duration of the job is 4.8 minutes > !image-2018-07-24-20-26-32-441.png! 
> > Most tasks last less than 1 second and read less than 1.5 MB of data > !image-2018-07-24-20-28-06-269.png! > > After the optimization, there are only 1615 tasks and the job lasts only 30 > seconds. It is almost 10 times faster. > !image-2018-07-24-20-29-24-797.png! > > The median of data read is 44.2MB. > !image-2018-07-24-20-30-24-552.png! > -- This message was sent by Atlassian JIRA (v7.6.3#76005) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
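The scaling arithmetic in the ticket above can be checked with a short sketch. The per-column widths (4 bytes for int, 8 bytes for long) are the ticket's own assumptions; the scaling rule models the proposal, not shipped Spark behavior.

```python
# Sketch of the adaptive split-size arithmetic from SPARK-24906.

def enlargement_factor(all_widths, selected_widths):
    """Ratio of the full row width to the width of the columns actually read."""
    return sum(all_widths) / sum(selected_widths)

all_cols = [4] * 20 + [8] * 20           # 20 int + 20 long columns
selected = [4, 8]                        # query touches one int and one long

factor = enlargement_factor(all_cols, selected)
assert factor == 20.0                    # (20*4 + 20*8) / (4 + 8) = 20

max_partition_bytes = 128 * 1024 * 1024  # spark.sql.files.maxPartitionBytes
adaptive_split = max_partition_bytes * factor
assert adaptive_split == 20 * 128 * 1024 * 1024   # 2560 MB of file per split
```

Each task then scans roughly the same number of bytes after column pruning as it would have scanned from an unpruned row, which is why the task count in the test dropped from 72176 to 1615.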
[jira] [Commented] (SPARK-24923) DataSourceV2: Add CTAS and RTAS logical operations
[ https://issues.apache.org/jira/browse/SPARK-24923?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16556383#comment-16556383 ] Apache Spark commented on SPARK-24923: -- User 'rdblue' has created a pull request for this issue: https://github.com/apache/spark/pull/21877 > DataSourceV2: Add CTAS and RTAS logical operations > -- > > Key: SPARK-24923 > URL: https://issues.apache.org/jira/browse/SPARK-24923 > Project: Spark > Issue Type: Sub-task > Components: SQL >Affects Versions: 2.3.0, 2.3.1 >Reporter: Ryan Blue >Priority: Major > > When SPARK-24252 and SPARK-24251 are in, next plans to implement from the > SPIP are CTAS and RTAS. -- This message was sent by Atlassian JIRA (v7.6.3#76005) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Assigned] (SPARK-24923) DataSourceV2: Add CTAS and RTAS logical operations
[ https://issues.apache.org/jira/browse/SPARK-24923?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Apache Spark reassigned SPARK-24923: Assignee: (was: Apache Spark) > DataSourceV2: Add CTAS and RTAS logical operations > -- > > Key: SPARK-24923 > URL: https://issues.apache.org/jira/browse/SPARK-24923 > Project: Spark > Issue Type: Sub-task > Components: SQL >Affects Versions: 2.3.0, 2.3.1 >Reporter: Ryan Blue >Priority: Major > > When SPARK-24252 and SPARK-24251 are in, next plans to implement from the > SPIP are CTAS and RTAS. -- This message was sent by Atlassian JIRA (v7.6.3#76005) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Assigned] (SPARK-24923) DataSourceV2: Add CTAS and RTAS logical operations
[ https://issues.apache.org/jira/browse/SPARK-24923?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Apache Spark reassigned SPARK-24923: Assignee: Apache Spark > DataSourceV2: Add CTAS and RTAS logical operations > -- > > Key: SPARK-24923 > URL: https://issues.apache.org/jira/browse/SPARK-24923 > Project: Spark > Issue Type: Sub-task > Components: SQL >Affects Versions: 2.3.0, 2.3.1 >Reporter: Ryan Blue >Assignee: Apache Spark >Priority: Major > > When SPARK-24252 and SPARK-24251 are in, next plans to implement from the > SPIP are CTAS and RTAS. -- This message was sent by Atlassian JIRA (v7.6.3#76005) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Updated] (SPARK-24921) SparkStreaming steadily increasing job generation delay due to apparent URLClassLoader contention
[ https://issues.apache.org/jira/browse/SPARK-24921?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Tommy S updated SPARK-24921: Component/s: Web UI > SparkStreaming steadily increasing job generation delay due to apparent > URLClassLoader contention > - > > Key: SPARK-24921 > URL: https://issues.apache.org/jira/browse/SPARK-24921 > Project: Spark > Issue Type: Bug > Components: Spark Core, Web UI >Affects Versions: 2.3.1 >Reporter: Tommy S >Priority: Major > > I'm seeing an issue where the job generation time of my spark streaming job > is steadily increasing after some time. > Looking at the thread dumps I see that the JobGenerator thread is BLOCKED > waiting for URLClassPath.getLoader synchronized method: > {noformat} > "JobGenerator" #153 daemon prio=5 os_prio=0 tid=0x02dad800 nid=0x253c > waiting for monitor entry [0x7f4b311c2000] > java.lang.Thread.State: BLOCKED (on object monitor) > at sun.misc.URLClassPath.getNextLoader(URLClassPath.java:469) > - waiting to lock <0x7f4be023f940> (a sun.misc.URLClassPath) > at sun.misc.URLClassPath.findResource(URLClassPath.java:214) > at java.net.URLClassLoader$2.run(URLClassLoader.java:569) > at java.net.URLClassLoader$2.run(URLClassLoader.java:567) > at java.security.AccessController.doPrivileged(Native Method) > at java.net.URLClassLoader.findResource(URLClassLoader.java:566) > at java.lang.ClassLoader.getResource(ClassLoader.java:1096) > at java.lang.ClassLoader.getResource(ClassLoader.java:1091) > at > java.net.URLClassLoader.getResourceAsStream(URLClassLoader.java:232) > at java.lang.Class.getResourceAsStream(Class.java:2223) > at > org.apache.spark.util.ClosureCleaner$.getClassReader(ClosureCleaner.scala:40) > at > org.apache.spark.util.ClosureCleaner$.getInnerClosureClasses(ClosureCleaner.scala:84) > at > org.apache.spark.util.ClosureCleaner$.org$apache$spark$util$ClosureCleaner$$clean(ClosureCleaner.scala:224) > at > org.apache.spark.util.ClosureCleaner$.clean(ClosureCleaner.scala:159) > at 
org.apache.spark.SparkContext.clean(SparkContext.scala:2299) > at > org.apache.spark.rdd.PairRDDFunctions$$anonfun$combineByKeyWithClassTag$1.apply(PairRDDFunctions.scala:89) > at > org.apache.spark.rdd.PairRDDFunctions$$anonfun$combineByKeyWithClassTag$1.apply(PairRDDFunctions.scala:77) > at > org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151) > at > org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112) > at org.apache.spark.rdd.RDD.withScope(RDD.scala:363) > at > org.apache.spark.rdd.PairRDDFunctions.combineByKeyWithClassTag(PairRDDFunctions.scala:77) > at > org.apache.spark.rdd.PairRDDFunctions$$anonfun$combineByKey$1.apply(PairRDDFunctions.scala:119) > at > org.apache.spark.rdd.PairRDDFunctions$$anonfun$combineByKey$1.apply(PairRDDFunctions.scala:119) > at > org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151) > at > org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112) > at org.apache.spark.rdd.RDD.withScope(RDD.scala:363) > at > org.apache.spark.rdd.PairRDDFunctions.combineByKey(PairRDDFunctions.scala:117) > at > org.apache.spark.streaming.dstream.ShuffledDStream.compute(ShuffledDStream.scala:42) > at > org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1$$anonfun$apply$7.apply(DStream.scala:342) > at > org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1$$anonfun$apply$7.apply(DStream.scala:342) > at scala.util.DynamicVariable.withValue(DynamicVariable.scala:58) > at > org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1.apply(DStream.scala:341) > at > org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1.apply(DStream.scala:341) > at > org.apache.spark.streaming.dstream.DStream.createRDDWithLocalProperties(DStream.scala:416) > at > org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1.apply(DStream.scala:336) > at > 
org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1.apply(DStream.scala:334) > at scala.Option.orElse(Option.scala:289) > at > org.apache.spark.streaming.dstream.DStream.getOrCompute(DStream.scala:331) > at > org.apache.spark.streaming.dstream.MappedDStream.compute(MappedDStream.scala:36) > at >
[jira] [Created] (SPARK-24923) DataSourceV2: Add CTAS and RTAS logical operations
Ryan Blue created SPARK-24923: - Summary: DataSourceV2: Add CTAS and RTAS logical operations Key: SPARK-24923 URL: https://issues.apache.org/jira/browse/SPARK-24923 Project: Spark Issue Type: Sub-task Components: SQL Affects Versions: 2.3.1, 2.3.0 Reporter: Ryan Blue When SPARK-24252 and SPARK-24251 are in, next plans to implement from the SPIP are CTAS and RTAS. -- This message was sent by Atlassian JIRA (v7.6.3#76005) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Commented] (SPARK-24802) Optimization Rule Exclusion
[ https://issues.apache.org/jira/browse/SPARK-24802?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16556310#comment-16556310 ] Apache Spark commented on SPARK-24802: -- User 'maryannxue' has created a pull request for this issue: https://github.com/apache/spark/pull/21876 > Optimization Rule Exclusion > --- > > Key: SPARK-24802 > URL: https://issues.apache.org/jira/browse/SPARK-24802 > Project: Spark > Issue Type: New Feature > Components: SQL >Affects Versions: 2.3.0 >Reporter: Maryann Xue >Assignee: Maryann Xue >Priority: Major > Fix For: 2.4.0 > > > Since Spark has provided fairly clear interfaces for adding user-defined > optimization rules, it would be nice to have an easy-to-use interface for > excluding an optimization rule from the Spark query optimizer as well. > This would make customizing the Spark optimizer easier and could sometimes > help with debugging issues too. > # Add a new config {{spark.sql.optimizer.excludedRules}}, with the value > being a list of rule names separated by comma. > # Modify the current {{batches}} method to remove the excluded rules from > the default batches. Log the rules that have been excluded. > # Split the existing default batches into "post-analysis batches" and > "optimization batches" so that only rules in the "optimization batches" can > be excluded. -- This message was sent by Atlassian JIRA (v7.6.3#76005) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
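The three steps in the ticket can be sketched as a small filter over the optimizer's batches. This is a toy in plain Python, not the real implementation inside Spark's Optimizer; the batch names are illustrative, and only the "optimization" batches are passed in, mirroring the split the ticket proposes.

```python
# Toy version of spark.sql.optimizer.excludedRules: drop rules named in a
# comma-separated config value from the optimization batches, logging what
# was excluded, and leave post-analysis batches out of scope entirely.

def apply_exclusions(optimization_batches, excluded_rules_conf):
    """optimization_batches: list of (batch_name, [rule_name, ...])."""
    excluded = {n.strip() for n in excluded_rules_conf.split(",") if n.strip()}
    result = []
    for batch_name, rules in optimization_batches:
        kept = [r for r in rules if r not in excluded]
        dropped = [r for r in rules if r in excluded]
        if dropped:
            # Step 2 of the ticket: log the rules that have been excluded.
            print(f"Excluded from batch '{batch_name}': {dropped}")
        if kept:                     # a fully-emptied batch is dropped
            result.append((batch_name, kept))
    return result

batches = [
    ("Operator Optimization", ["PushDownPredicate", "ColumnPruning"]),
    ("Join Reorder", ["CostBasedJoinReorder"]),
]
trimmed = apply_exclusions(batches, "PushDownPredicate")
assert trimmed[0] == ("Operator Optimization", ["ColumnPruning"])
assert trimmed[1] == ("Join Reorder", ["CostBasedJoinReorder"])
```

Keeping post-analysis batches out of reach of the config is what makes the feature safe: a user can disable an optimization but cannot break a rule the plan needs for correctness.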
[jira] [Commented] (SPARK-1137) ZK Persistence Engine crashes if stored data has wrong serialVersionUID
[ https://issues.apache.org/jira/browse/SPARK-1137?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16556251#comment-16556251 ] Apache Spark commented on SPARK-1137: - User 'aarondav' has created a pull request for this issue: https://github.com/apache/spark/pull/4 > ZK Persistence Engine crashes if stored data has wrong serialVersionUID > --- > > Key: SPARK-1137 > URL: https://issues.apache.org/jira/browse/SPARK-1137 > Project: Spark > Issue Type: Bug > Components: Deploy >Affects Versions: 0.9.0 >Reporter: Aaron Davidson >Assignee: Aaron Davidson >Priority: Major > Fix For: 1.0.0 > > > The ZooKeeperPersistenceEngine contains information about concurrently > existing Masters and Workers. This information, as the name suggests, is > persistent in the event of a Master failure/restart. If the Spark version is > upgraded, the Master will crash with a Java serialization exception when > trying to re-read the persisted data. > Instead of crashing (indefinitely), the Master should probably just ignore > the prior data. -- This message was sent by Atlassian JIRA (v7.6.3#76005) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Commented] (SPARK-24874) Allow hybrid of both barrier tasks and regular tasks in a stage
[ https://issues.apache.org/jira/browse/SPARK-24874?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16556241#comment-16556241 ] Reynold Xin commented on SPARK-24874: - Do we really need this? Seems like an uncommon use case. > Allow hybrid of both barrier tasks and regular tasks in a stage > --- > > Key: SPARK-24874 > URL: https://issues.apache.org/jira/browse/SPARK-24874 > Project: Spark > Issue Type: Improvement > Components: Spark Core >Affects Versions: 3.0.0 >Reporter: Jiang Xingbo >Priority: Major > > Currently we only allow barrier tasks in a barrier stage, however, consider > the following query: > {code} > sc = new SparkContext(conf) > val rdd1 = sc.parallelize(1 to 100, 10) > val rdd2 = sc.parallelize(1 to 1000, 20).barrier().mapPartitions((it, ctx) => > it) > val rdd = rdd1.union(rdd2).mapPartitions(t => t) > {code} > Now it requires 30 free slots to run `rdd.collect()`. Actually, we can launch > regular tasks to collect data from rdd1's partitions, they are not required > to be launched together. If we can do that, we only need 20 free slots to run > `rdd.collect()`. -- This message was sent by Atlassian JIRA (v7.6.3#76005) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
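The slot arithmetic in the example above can be sketched as follows. These helper names are illustrative only; they mimic the accounting described in the issue, not any actual Spark scheduler API.

```python
def slots_needed_today(num_regular, num_barrier):
    # Current behavior: the whole union stage becomes a barrier stage,
    # so every task must launch simultaneously.
    return num_regular + num_barrier

def slots_needed_hybrid(num_regular, num_barrier):
    # Proposed: only barrier tasks must launch together; regular tasks
    # can be scheduled one at a time as slots free up.
    if num_barrier > 0:
        return num_barrier
    return min(num_regular, 1)

# rdd1: 10 regular partitions, rdd2: 20 barrier partitions
print(slots_needed_today(10, 20), slots_needed_hybrid(10, 20))  # 30 20
```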
[jira] [Resolved] (SPARK-24860) Expose dynamic partition overwrite per write operation
[ https://issues.apache.org/jira/browse/SPARK-24860?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Xiao Li resolved SPARK-24860. - Resolution: Fixed Assignee: Koert Kuipers Fix Version/s: 2.4.0 > Expose dynamic partition overwrite per write operation > -- > > Key: SPARK-24860 > URL: https://issues.apache.org/jira/browse/SPARK-24860 > Project: Spark > Issue Type: Improvement > Components: SQL >Affects Versions: 2.3.1 >Reporter: koert kuipers >Assignee: Koert Kuipers >Priority: Minor > Fix For: 2.4.0 > > > This is a follow-up to issue SPARK-20236 > Also see the discussion in pull request https://github.com/apache/spark/pull/18714 > SPARK-20236 added a global setting spark.sql.sources.partitionOverwriteMode > to switch between static and dynamic overwrite of partitioned tables. It > would be nice if we could choose, per partitioned overwrite operation, whether > its behavior is static or dynamic. The suggested syntax is: > {noformat} > df.write.option("partitionOverwriteMode", "dynamic").parquet...{noformat}
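The precedence implied by the suggested syntax can be sketched as follows. This is a hypothetical stand-in for the resolution that happens inside Spark's file-source write path, not the actual implementation.

```python
def overwrite_mode(session_conf, write_options):
    """The per-write option wins; otherwise fall back to the session-wide
    spark.sql.sources.partitionOverwriteMode setting, whose default is
    'static'."""
    mode = write_options.get(
        "partitionOverwriteMode",
        session_conf.get("spark.sql.sources.partitionOverwriteMode", "static"),
    ).lower()
    if mode not in ("static", "dynamic"):
        raise ValueError("invalid partitionOverwriteMode: " + mode)
    return mode

# session default is static, but this one write requests dynamic overwrite:
print(overwrite_mode({}, {"partitionOverwriteMode": "dynamic"}))  # dynamic
```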
[jira] [Resolved] (SPARK-23146) Support client mode for Kubernetes cluster backend
[ https://issues.apache.org/jira/browse/SPARK-23146?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Matt Cheah resolved SPARK-23146. Resolution: Fixed Fix Version/s: 2.4.0 > Support client mode for Kubernetes cluster backend > -- > > Key: SPARK-23146 > URL: https://issues.apache.org/jira/browse/SPARK-23146 > Project: Spark > Issue Type: New Feature > Components: Kubernetes >Affects Versions: 2.3.0 >Reporter: Anirudh Ramanathan >Priority: Major > Fix For: 2.4.0 > > > This issue tracks client mode support within Spark when running in the > Kubernetes cluster backend. -- This message was sent by Atlassian JIRA (v7.6.3#76005) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Commented] (SPARK-24915) Calling SparkSession.createDataFrame with schema can throw exception
[ https://issues.apache.org/jira/browse/SPARK-24915?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16556126#comment-16556126 ] Bryan Cutler commented on SPARK-24915: -- Hi [~stspencer], I've been trying to fix similar issues, but this is a little different since the StructType makes _needSerializeAnyField==True, as you pointed out. I agree that the current behavior is very confusing and should be fixed, but the related issue had to be pushed back to Spark 3.0 because it causes a behavior change. Hopefully we can improve both these issues. Until then, if you're not aware, the intended way to define Row data when you care about specific positioning is like this: {code:java} In [10]: MyRow = Row("field2", "field1") In [11]: data = [ ...: MyRow(Row(sub_field='world'), "hello") ...: ] In [12]: df = spark.createDataFrame(data, schema=schema) In [13]: df.show() +-------+------+ | field2|field1| +-------+------+ |[world]| hello| +-------+------+{code} Hope that helps. > Calling SparkSession.createDataFrame with schema can throw exception > > > Key: SPARK-24915 > URL: https://issues.apache.org/jira/browse/SPARK-24915 > Project: Spark > Issue Type: Bug > Components: PySpark >Affects Versions: 2.3.1 > Environment: Python 3.6.3 > PySpark 2.3.1 (installed via pip) > OSX 10.12.6 >Reporter: Stephen Spencer >Priority: Major > > There seems to be a bug in PySpark when using the PySparkSQL session to > create a dataframe with a pre-defined schema.
> Code to reproduce the error: > {code:java} > from pyspark import SparkConf, SparkContext > from pyspark.sql import SparkSession > from pyspark.sql.types import StructType, StructField, StringType, Row > conf = SparkConf().setMaster("local").setAppName("repro") > context = SparkContext(conf=conf) > session = SparkSession(context) > # Construct schema (the order of fields is important) > schema = StructType([ > StructField('field2', StructType([StructField('sub_field', StringType(), > False)]), False), > StructField('field1', StringType(), False), > ]) > # Create data to populate data frame > data = [ > Row(field1="Hello", field2=Row(sub_field='world')) > ] > # Attempt to create the data frame supplying the schema > # this will throw a ValueError > df = session.createDataFrame(data, schema=schema) > df.show(){code} > Running this throws a ValueError > {noformat} > Traceback (most recent call last): > File "schema_bug.py", line 18, in > df = session.createDataFrame(data, schema=schema) > File > "/Users/stephenspencer/benevolent/ai/neat/rex/.env/lib/python3.6/site-packages/pyspark/sql/session.py", > line 691, in createDataFrame > rdd, schema = self._createFromLocal(map(prepare, data), schema) > File > "/Users/stephenspencer/benevolent/ai/neat/rex/.env/lib/python3.6/site-packages/pyspark/sql/session.py", > line 423, in _createFromLocal > data = [schema.toInternal(row) for row in data] > File > "/Users/stephenspencer/benevolent/ai/neat/rex/.env/lib/python3.6/site-packages/pyspark/sql/session.py", > line 423, in > data = [schema.toInternal(row) for row in data] > File > "/Users/stephenspencer/benevolent/ai/neat/rex/.env/lib/python3.6/site-packages/pyspark/sql/types.py", > line 601, in toInternal > for f, v, c in zip(self.fields, obj, self._needConversion)) > File > "/Users/stephenspencer/benevolent/ai/neat/rex/.env/lib/python3.6/site-packages/pyspark/sql/types.py", > line 601, in > for f, v, c in zip(self.fields, obj, self._needConversion)) > File > 
"/Users/stephenspencer/benevolent/ai/neat/rex/.env/lib/python3.6/site-packages/pyspark/sql/types.py", > line 439, in toInternal > return self.dataType.toInternal(obj) > File > "/Users/stephenspencer/benevolent/ai/neat/rex/.env/lib/python3.6/site-packages/pyspark/sql/types.py", > line 619, in toInternal > raise ValueError("Unexpected tuple %r with StructType" % obj) > ValueError: Unexpected tuple 'Hello' with StructType{noformat} > The problem seems to be here: > https://github.com/apache/spark/blob/3d5c61e5fd24f07302e39b5d61294da79aa0c2f9/python/pyspark/sql/types.py#L603 > specifically the bit > {code:java} > zip(self.fields, obj, self._needConversion) > {code} > This zip statement seems to assume that obj and self.fields are ordered in > the same way, so that the elements of obj will correspond to the right fields > in the schema. However this is not true, a Row orders its elements > alphabetically but the fields in the schema are in whatever order they are > specified. In this example field2 is being initialised with the field1 > element 'Hello'. If you re-order the fields in the schema to go (field1, > field2), the given example works without error. > The schema in the repro is specifically designed to elicit the problem, the > fields are out of
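The alphabetical reordering that breaks the zip can be reproduced without Spark. In Spark 2.x, a `pyspark.sql.Row` created from keyword arguments sorts its fields by name, while `StructType` keeps fields in declaration order. A minimal pure-Python stand-in (the `make_row` helper is hypothetical, mimicking the Row behavior):

```python
def make_row(**kwargs):
    # Mimics Spark 2.x pyspark.sql.Row(**kwargs): values stored in
    # alphabetical order of the field names, regardless of call order.
    return tuple(kwargs[k] for k in sorted(kwargs))

schema_fields = ["field2", "field1"]            # schema declaration order
row = make_row(field1="hello", field2="world")  # stored as ('hello', 'world')

# StructType.toInternal pairs schema fields with row values positionally,
# essentially this zip -- so field2 receives 'hello', the wrong value:
paired = list(zip(schema_fields, row))
print(paired)
```

Declaring `MyRow = Row("field2", "field1")` and constructing rows positionally, as in the comment above, sidesteps the alphabetical sort and keeps the positions aligned with the schema.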
[jira] [Commented] (SPARK-24288) Enable preventing predicate pushdown
[ https://issues.apache.org/jira/browse/SPARK-24288?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16556119#comment-16556119 ] Apache Spark commented on SPARK-24288: -- User 'maryannxue' has created a pull request for this issue: https://github.com/apache/spark/pull/21875 > Enable preventing predicate pushdown > > > Key: SPARK-24288 > URL: https://issues.apache.org/jira/browse/SPARK-24288 > Project: Spark > Issue Type: New Feature > Components: SQL >Affects Versions: 2.3.0 >Reporter: Tomasz Gawęda >Priority: Major > Attachments: SPARK-24288.simple.patch > > > Issue discussed on Mailing List: > [http://apache-spark-developers-list.1001551.n3.nabble.com/Preventing-predicate-pushdown-td23976.html] > While working with the JDBC datasource I saw that many "or" clauses with > non-equality operators cause huge performance degradation of SQL queries > to the database (DB2). For example: > val df = spark.read.format("jdbc").(other options to parallelize > load).load() > df.where(s"(date1 > $param1 and (date1 < $param2 or date1 is null) or x > 100)").show() // in a real application these predicates were pushed > many many lines below, with many ANDs and ORs > If I use cache() before where, there is no predicate pushdown of this > "where" clause. However, in a production system caching many sources is a > waste of memory (especially if the pipeline is long and I must cache many > times). There are also a few more workarounds, but it would be great if Spark > supported preventing predicate pushdown by the user. > > For example: df.withAnalysisBarrier().where(...) ? > > Note that this should not be a global configuration option. If I read 2 > DataFrames, df1 and df2, I would like to specify that df1 should not have > some predicates pushed down, but some may be, while df2 should have all > predicates pushed down, even if the target query joins df1 and df2.
As far as I > understand Spark optimizer, if we use functions like `withAnalysisBarrier` > and put AnalysisBarrier explicitly in logical plan, then predicates won't be > pushed down on this particular DataFrames and PP will be still possible on > the second one. -- This message was sent by Atlassian JIRA (v7.6.3#76005) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Commented] (SPARK-23146) Support client mode for Kubernetes cluster backend
[ https://issues.apache.org/jira/browse/SPARK-23146?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16556098#comment-16556098 ] Apache Spark commented on SPARK-23146: -- User 'mccheah' has created a pull request for this issue: https://github.com/apache/spark/pull/21874 > Support client mode for Kubernetes cluster backend > -- > > Key: SPARK-23146 > URL: https://issues.apache.org/jira/browse/SPARK-23146 > Project: Spark > Issue Type: New Feature > Components: Kubernetes >Affects Versions: 2.3.0 >Reporter: Anirudh Ramanathan >Priority: Major > > This issue tracks client mode support within Spark when running in the > Kubernetes cluster backend. -- This message was sent by Atlassian JIRA (v7.6.3#76005) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Resolved] (SPARK-24849) Convert StructType to DDL string
[ https://issues.apache.org/jira/browse/SPARK-24849?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Xiao Li resolved SPARK-24849. - Resolution: Fixed Assignee: Maxim Gekk Fix Version/s: 2.4.0 > Convert StructType to DDL string > > > Key: SPARK-24849 > URL: https://issues.apache.org/jira/browse/SPARK-24849 > Project: Spark > Issue Type: Improvement > Components: SQL >Affects Versions: 2.3.1 >Reporter: Maxim Gekk >Assignee: Maxim Gekk >Priority: Minor > Fix For: 2.4.0 > > > Need to add new methods which should convert a value of StructType to a > schema in DDL format . It should be possible to use the former string in new > table creation by just copy-pasting of new method results. The existing > methods simpleString(), catalogString() and sql() put ':' between top level > field name and its type, and wrap by the *struct* word > {code} > ds.schema.catalogString > struct {code} > Output of new method should be > {code} > metaData struct {code} -- This message was sent by Atlassian JIRA (v7.6.3#76005) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
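A rough illustration of the requested output format follows. The helper and the nested field names are hypothetical; the real change would be a method on StructType. The point is only the shape: top-level fields rendered as "name type" pairs, without the outer `struct<...>` wrapper or the `name:type` colon form.

```python
def to_ddl(fields):
    """Render top-level (name, type_string) pairs as a DDL column list,
    suitable for pasting into a CREATE TABLE statement."""
    return ", ".join(name + " " + dtype for name, dtype in fields)

# a schema with one top-level struct column, as in the example above
# (the inner fields here are made up for illustration):
print(to_ddl([("metaData", "struct<eventId:string,session:string>")]))
```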
[jira] [Resolved] (SPARK-24911) SHOW CREATE TABLE drops escaping of nested column names
[ https://issues.apache.org/jira/browse/SPARK-24911?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Xiao Li resolved SPARK-24911. - Resolution: Fixed Fix Version/s: 2.4.0 > SHOW CREATE TABLE drops escaping of nested column names > --- > > Key: SPARK-24911 > URL: https://issues.apache.org/jira/browse/SPARK-24911 > Project: Spark > Issue Type: Bug > Components: SQL >Affects Versions: 2.3.1 >Reporter: Maxim Gekk >Assignee: Maxim Gekk >Priority: Major > Fix For: 2.4.0 > > > Create a table with quoted nested column - *`b`*: > {code:sql} > create table `test` (`a` STRUCT<`b`:STRING>); > {code} > and show how the table was created: > {code:sql} > SHOW CREATE TABLE `test` > {code} > {code} > CREATE TABLE `test`(`a` struct<b:string>) > {code} > The column *b* becomes unquoted.
[jira] [Assigned] (SPARK-24911) SHOW CREATE TABLE drops escaping of nested column names
[ https://issues.apache.org/jira/browse/SPARK-24911?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Xiao Li reassigned SPARK-24911: --- Assignee: Maxim Gekk > SHOW CREATE TABLE drops escaping of nested column names > --- > > Key: SPARK-24911 > URL: https://issues.apache.org/jira/browse/SPARK-24911 > Project: Spark > Issue Type: Bug > Components: SQL >Affects Versions: 2.3.1 >Reporter: Maxim Gekk >Assignee: Maxim Gekk >Priority: Major > Fix For: 2.4.0 > > > Create a table with quoted nested column - *`b`*: > {code:sql} > create table `test` (`a` STRUCT<`b`:STRING>); > {code} > and show how the table was created: > {code:sql} > SHOW CREATE TABLE `test` > {code} > {code} > CREATE TABLE `test`(`a` struct<b:string>) > {code} > The column *b* becomes unquoted.
[jira] [Updated] (SPARK-24922) Iterative rdd union + reduceByKey operations on small dataset leads to "No space left on device" error on account of lot of shuffle spill.
[ https://issues.apache.org/jira/browse/SPARK-24922?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Dinesh Dharme updated SPARK-24922: -- Description: I am trying to do a few (union + reduceByKey) operations on a hierarchical dataset in an iterative fashion in rdd. The first few loops run fine but on the subsequent loops, the operations end up using the whole scratch space provided to them. I have set the scratch directory, i.e. SPARK_LOCAL_DIRS, to be one having *100 GB* space. The hierarchical dataset, whose size is (< 400kB), remains constant throughout the iterations. I have tried the worker cleanup flag but it has no effect, i.e. "spark.worker.cleanup.enabled=true" Error: {noformat} Caused by: java.io.IOException: No space left on device at java.io.FileOutputStream.writeBytes(Native Method) at java.io.FileOutputStream.write(FileOutputStream.java:326) at java.io.BufferedOutputStream.flushBuffer(BufferedOutputStream.java:82) at java.io.BufferedOutputStream.write(BufferedOutputStream.java:126) at java.io.DataOutputStream.writeLong(DataOutputStream.java:224) at org.apache.spark.shuffle.IndexShuffleBlockResolver$$anonfun$writeIndexFileAndCommit$1$$anonfun$apply$mcV$sp$1.apply$mcVJ$sp(IndexShuffleBlockResolver.scala:151) at org.apache.spark.shuffle.IndexShuffleBlockResolver$$anonfun$writeIndexFileAndCommit$1$$anonfun$apply$mcV$sp$1.apply(IndexShuffleBlockResolver.scala:149) at org.apache.spark.shuffle.IndexShuffleBlockResolver$$anonfun$writeIndexFileAndCommit$1$$anonfun$apply$mcV$sp$1.apply(IndexShuffleBlockResolver.scala:149) at scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33) at scala.collection.mutable.ArrayOps$ofLong.foreach(ArrayOps.scala:246) at org.apache.spark.shuffle.IndexShuffleBlockResolver$$anonfun$writeIndexFileAndCommit$1.apply$mcV$sp(IndexShuffleBlockResolver.scala:149) at org.apache.spark.shuffle.IndexShuffleBlockResolver$$anonfun$writeIndexFileAndCommit$1.apply(IndexShuffleBlockResolver.scala:145) at
org.apache.spark.shuffle.IndexShuffleBlockResolver$$anonfun$writeIndexFileAndCommit$1.apply(IndexShuffleBlockResolver.scala:145) at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1377) at org.apache.spark.shuffle.IndexShuffleBlockResolver.writeIndexFileAndCommit(IndexShuffleBlockResolver.scala:153) at org.apache.spark.shuffle.sort.SortShuffleWriter.write(SortShuffleWriter.scala:73) at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:96) at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:53) at org.apache.spark.scheduler.Task.run(Task.scala:109) at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:345) at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) at java.lang.Thread.run(Thread.java:748) {noformat} *What I am trying to do (High Level)*: I have a dataset of 5 different csv ( Parent, Child1, Child2, Child21, Child22 ) which are related in a hierarchical fashion as shown below. Parent-> Child1 -> Child2 -> Child21 Parent-> Child1 -> Child2 -> Child22 Each element in the tree has 14 columns (elementid, parentelement_id, cat1, cat2, num1, num2,., num10) I am trying to aggregate the values of one column of Child21 into Child1 (i.e. 2 levels up). I am doing the same for another column value of Child22 into Child1. Then I am merging these aggregated values at the same Child1 level. This is present in the code at location : spark.rddexample.dummyrdd.tree.child1.events.Function1 *Code which replicates the issue*: 1] [https://github.com/dineshdharme/SparkRddShuffleIssue] *Steps to reproduce the issue :* 1] Clone the above repository. 2] Put the csvs in the "issue-data" folder in the above repository at a hadoop location "hdfs:///tree/dummy/data/" 3] Set the spark scratch directory (SPARK_LOCAL_DIRS) to a folder which has large space. 
(> *100 GB*) 4] Run "sbt assembly" 5] Run the following command at the project location: /path/to/spark-2.3.0-bin-hadoop2.7/bin/spark-submit \ --class spark.rddexample.dummyrdd.FunctionExecutor \ --master local[2] \ --deploy-mode client \ --executor-memory 2G \ --driver-memory 2G \ target/scala-2.11/rdd-shuffle-assembly-0.1.0.jar \ 20 \ hdfs:///tree/dummy/data/ \ hdfs:///tree/dummy/results/
[jira] [Created] (SPARK-24922) Iterative rdd union + reduceByKey operations on small dataset leads to "No space left on device" error on account of lot of shuffle spill.
Dinesh Dharme created SPARK-24922: - Summary: Iterative rdd union + reduceByKey operations on small dataset leads to "No space left on device" error on account of lot of shuffle spill. Key: SPARK-24922 URL: https://issues.apache.org/jira/browse/SPARK-24922 Project: Spark Issue Type: Bug Components: Shuffle Affects Versions: 2.3.0 Environment: Java 8, Scala 2.11.8, Spark 2.3.0, sbt 0.13.16 Reporter: Dinesh Dharme I am trying to do a few (union + reduceByKey) operations on a hierarchical dataset in an iterative fashion in rdd. The first few loops run fine but on the subsequent loops, the operations end up using the whole scratch space provided to them. I have set the scratch directory, i.e. SPARK_LOCAL_DIRS, to be one having *100 GB* space. The hierarchical dataset, whose size is (< 400kB), remains constant throughout the iterations. Error: {noformat} Caused by: java.io.IOException: No space left on device at java.io.FileOutputStream.writeBytes(Native Method) at java.io.FileOutputStream.write(FileOutputStream.java:326) at java.io.BufferedOutputStream.flushBuffer(BufferedOutputStream.java:82) at java.io.BufferedOutputStream.write(BufferedOutputStream.java:126) at java.io.DataOutputStream.writeLong(DataOutputStream.java:224) at org.apache.spark.shuffle.IndexShuffleBlockResolver$$anonfun$writeIndexFileAndCommit$1$$anonfun$apply$mcV$sp$1.apply$mcVJ$sp(IndexShuffleBlockResolver.scala:151) at org.apache.spark.shuffle.IndexShuffleBlockResolver$$anonfun$writeIndexFileAndCommit$1$$anonfun$apply$mcV$sp$1.apply(IndexShuffleBlockResolver.scala:149) at org.apache.spark.shuffle.IndexShuffleBlockResolver$$anonfun$writeIndexFileAndCommit$1$$anonfun$apply$mcV$sp$1.apply(IndexShuffleBlockResolver.scala:149) at scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33) at scala.collection.mutable.ArrayOps$ofLong.foreach(ArrayOps.scala:246) at
org.apache.spark.shuffle.IndexShuffleBlockResolver$$anonfun$writeIndexFileAndCommit$1.apply$mcV$sp(IndexShuffleBlockResolver.scala:149) at org.apache.spark.shuffle.IndexShuffleBlockResolver$$anonfun$writeIndexFileAndCommit$1.apply(IndexShuffleBlockResolver.scala:145) at org.apache.spark.shuffle.IndexShuffleBlockResolver$$anonfun$writeIndexFileAndCommit$1.apply(IndexShuffleBlockResolver.scala:145) at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1377) at org.apache.spark.shuffle.IndexShuffleBlockResolver.writeIndexFileAndCommit(IndexShuffleBlockResolver.scala:153) at org.apache.spark.shuffle.sort.SortShuffleWriter.write(SortShuffleWriter.scala:73) at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:96) at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:53) at org.apache.spark.scheduler.Task.run(Task.scala:109) at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:345) at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) at java.lang.Thread.run(Thread.java:748) {noformat} *What I am trying to do (High Level)*: I have a dataset of 5 different csv ( Parent, Child1, Child2, Child21, Child22 ) which are related in a hierarchical fashion as shown below. Parent-> Child1 -> Child2 -> Child21 Parent-> Child1 -> Child2 -> Child22 Each element in the tree has 14 columns (elementid, parentelement_id, cat1, cat2, num1, num2,., num10) I am trying to aggregate the values of one column of Child21 into Child1 (i.e. 2 levels up). I am doing the same for another column value of Child22 into Child1. Then I am merging these aggregated values at the same Child1 level. 
This is present in the code at location : spark.rddexample.dummyrdd.tree.child1.events.Function1 *Code which replicates the issue*: 1] [https://github.com/dineshdharme/SparkRddShuffleIssue] *Steps to reproduce the issue :* 1] Clone the above repository. 2] Put the csvs in the "issue-data" folder in the above repository at a hadoop location "hdfs:///tree/dummy/data/" 3] Set the spark scratch directory (SPARK_LOCAL_DIRS) to a folder which has large space. (> *100 GB*) 4] Run "sbt assembly" 5] Run the following command at the project location :( /path/to/spark-2.3.0-bin-hadoop2.7/bin/spark-submit \ --class spark.rddexample.dummyrdd.FunctionExecutor \ --master local[2] \ --deploy-mode client \ --executor-memory 2G \ --driver-memory 2G \ target/scala-2.11/rdd-shuffle-assembly-0.1.0.jar \ 20 \ hdfs:///tree/dummy/data/ \ hdfs:///tree/dummy/results/ -- This message was sent by Atlassian JIRA (v7.6.3#76005) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Created] (SPARK-24921) SparkStreaming steadily increasing job generation delay due to apparent URLClassLoader contention
Tommy S created SPARK-24921: --- Summary: SparkStreaming steadily increasing job generation delay due to apparent URLClassLoader contention Key: SPARK-24921 URL: https://issues.apache.org/jira/browse/SPARK-24921 Project: Spark Issue Type: Bug Components: Spark Core Affects Versions: 2.3.1 Reporter: Tommy S I'm seeing an issue where the job generation time of my spark streaming job is steadily increasing after some time. Looking at the thread dumps I see that the JobGenerator thread is BLOCKED waiting for URLClassPath.getLoader synchronized method: {noformat} "JobGenerator" #153 daemon prio=5 os_prio=0 tid=0x02dad800 nid=0x253c waiting for monitor entry [0x7f4b311c2000] java.lang.Thread.State: BLOCKED (on object monitor) at sun.misc.URLClassPath.getNextLoader(URLClassPath.java:469) - waiting to lock <0x7f4be023f940> (a sun.misc.URLClassPath) at sun.misc.URLClassPath.findResource(URLClassPath.java:214) at java.net.URLClassLoader$2.run(URLClassLoader.java:569) at java.net.URLClassLoader$2.run(URLClassLoader.java:567) at java.security.AccessController.doPrivileged(Native Method) at java.net.URLClassLoader.findResource(URLClassLoader.java:566) at java.lang.ClassLoader.getResource(ClassLoader.java:1096) at java.lang.ClassLoader.getResource(ClassLoader.java:1091) at java.net.URLClassLoader.getResourceAsStream(URLClassLoader.java:232) at java.lang.Class.getResourceAsStream(Class.java:2223) at org.apache.spark.util.ClosureCleaner$.getClassReader(ClosureCleaner.scala:40) at org.apache.spark.util.ClosureCleaner$.getInnerClosureClasses(ClosureCleaner.scala:84) at org.apache.spark.util.ClosureCleaner$.org$apache$spark$util$ClosureCleaner$$clean(ClosureCleaner.scala:224) at org.apache.spark.util.ClosureCleaner$.clean(ClosureCleaner.scala:159) at org.apache.spark.SparkContext.clean(SparkContext.scala:2299) at org.apache.spark.rdd.PairRDDFunctions$$anonfun$combineByKeyWithClassTag$1.apply(PairRDDFunctions.scala:89) at 
org.apache.spark.rdd.PairRDDFunctions$$anonfun$combineByKeyWithClassTag$1.apply(PairRDDFunctions.scala:77) at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151) at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112) at org.apache.spark.rdd.RDD.withScope(RDD.scala:363) at org.apache.spark.rdd.PairRDDFunctions.combineByKeyWithClassTag(PairRDDFunctions.scala:77) at org.apache.spark.rdd.PairRDDFunctions$$anonfun$combineByKey$1.apply(PairRDDFunctions.scala:119) at org.apache.spark.rdd.PairRDDFunctions$$anonfun$combineByKey$1.apply(PairRDDFunctions.scala:119) at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151) at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112) at org.apache.spark.rdd.RDD.withScope(RDD.scala:363) at org.apache.spark.rdd.PairRDDFunctions.combineByKey(PairRDDFunctions.scala:117) at org.apache.spark.streaming.dstream.ShuffledDStream.compute(ShuffledDStream.scala:42) at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1$$anonfun$apply$7.apply(DStream.scala:342) at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1$$anonfun$apply$7.apply(DStream.scala:342) at scala.util.DynamicVariable.withValue(DynamicVariable.scala:58) at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1.apply(DStream.scala:341) at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1.apply(DStream.scala:341) at org.apache.spark.streaming.dstream.DStream.createRDDWithLocalProperties(DStream.scala:416) at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1.apply(DStream.scala:336) at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1.apply(DStream.scala:334) at scala.Option.orElse(Option.scala:289) at org.apache.spark.streaming.dstream.DStream.getOrCompute(DStream.scala:331) at 
org.apache.spark.streaming.dstream.MappedDStream.compute(MappedDStream.scala:36) at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1$$anonfun$apply$7.apply(DStream.scala:342) at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1$$anonfun$apply$7.apply(DStream.scala:342) at scala.util.DynamicVariable.withValue(DynamicVariable.scala:58) at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1.apply(DStream.scala:341) at
[jira] [Commented] (SPARK-24914) totalSize is not a good estimate for broadcast joins
[ https://issues.apache.org/jira/browse/SPARK-24914?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16555998#comment-16555998 ] Bruce Robbins commented on SPARK-24914: --- [~irashid] {quote} given HIVE-20079, can we also have a conf to just ignore rawDataSize? {quote} That makes sense. I've updated the description. > totalSize is not a good estimate for broadcast joins > > > Key: SPARK-24914 > URL: https://issues.apache.org/jira/browse/SPARK-24914 > Project: Spark > Issue Type: Improvement > Components: SQL >Affects Versions: 2.4.0 >Reporter: Bruce Robbins >Priority: Major > > When determining whether to do a broadcast join, Spark estimates the size of > the smaller table as follows: > - if totalSize is defined and greater than 0, use it. > - else, if rawDataSize is defined and greater than 0, use it > - else, use spark.sql.defaultSizeInBytes (default: Long.MaxValue) > Therefore, Spark prefers totalSize over rawDataSize. > Unfortunately, totalSize is often quite a bit smaller than the actual table > size, since it represents the size of the table's files on disk. Parquet and > Orc files, for example, are encoded and compressed. This can result in the > JVM throwing an OutOfMemoryError while Spark is loading the table into a > HashedRelation, or when Spark actually attempts to broadcast the data. > On the other hand, rawDataSize represents the uncompressed size of the > dataset, according to Hive documentation. This seems like a pretty good > number to use in preference to totalSize. However, due to HIVE-20079, this > value is simply #columns * #rows. Once that bug is fixed, it may be a > superior statistic, at least for managed tables. > In the meantime, we could apply a configurable "fudge factor" to totalSize, > at least for types of files that are encoded and compressed. 
Hive has the > setting hive.stats.deserialization.factor, which defaults to 1.0, and is > described as follows: > {quote}in the absence of uncompressed/raw data size, total file size will be > used for statistics annotation. But the file may be compressed, encoded and > serialized which may be lesser in size than the actual uncompressed/raw data > size. This factor will be multiplied to file size to estimate the raw data > size. > {quote} > Also, I propose a configuration setting to allow the user to completely > ignore rawDataSize, since that value is broken (due to HIVE-20079). When that > configuration setting is set to true, Spark would instead estimate the table > as follows: > - if totalSize is defined and greater than 0, use totalSize*fudgeFactor. > - else, use spark.sql.defaultSizeInBytes (default: Long.MaxValue) > Caveat: This mitigates the issue only for Hive tables. It does not help much > when the user is reading files using {{spark.read.parquet}}, unless we apply > the same fudge factor there. -- This message was sent by Atlassian JIRA (v7.6.3#76005) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
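The selection order and the two proposed knobs can be sketched in a few lines. This is an illustrative model only, not Spark's actual code; the parameter names `fudge_factor` and `ignore_raw_data_size` are hypothetical stand-ins for the proposed configuration settings.

```python
# Hypothetical sketch of the size-estimation order described above, with the
# proposed fudge factor and rawDataSize opt-out bolted on. Not Spark source.

LONG_MAX = 2**63 - 1  # stand-in for the spark.sql.defaultSizeInBytes default

def estimate_table_size(total_size, raw_data_size,
                        fudge_factor=1.0, ignore_raw_data_size=False,
                        default_size_in_bytes=LONG_MAX):
    """Pick the size Spark would compare against the broadcast threshold."""
    if total_size is not None and total_size > 0:
        # proposal: inflate on-disk size to approximate uncompressed size
        return int(total_size * fudge_factor)
    if not ignore_raw_data_size and raw_data_size is not None and raw_data_size > 0:
        # today's fallback; unreliable until HIVE-20079 (#columns * #rows)
        return raw_data_size
    return default_size_in_bytes
```

For example, a 100-byte totalSize with a factor of 3.0 would be treated as 300 bytes for the broadcast decision, while setting `ignore_raw_data_size=True` with no usable totalSize falls through to the (huge) default, disabling the broadcast.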
[jira] [Updated] (SPARK-24914) totalSize is not a good estimate for broadcast joins
[ https://issues.apache.org/jira/browse/SPARK-24914?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Bruce Robbins updated SPARK-24914: -- Description: When determining whether to do a broadcast join, Spark estimates the size of the smaller table as follows: - if totalSize is defined and greater than 0, use it. - else, if rawDataSize is defined and greater than 0, use it - else, use spark.sql.defaultSizeInBytes (default: Long.MaxValue) Therefore, Spark prefers totalSize over rawDataSize. Unfortunately, totalSize is often quite a bit smaller than the actual table size, since it represents the size of the table's files on disk. Parquet and Orc files, for example, are encoded and compressed. This can result in the JVM throwing an OutOfMemoryError while Spark is loading the table into a HashedRelation, or when Spark actually attempts to broadcast the data. On the other hand, rawDataSize represents the uncompressed size of the dataset, according to Hive documentation. This seems like a pretty good number to use in preference to totalSize. However, due to HIVE-20079, this value is simply #columns * #rows. Once that bug is fixed, it may be a superior statistic, at least for managed tables. In the meantime, we could apply a configurable "fudge factor" to totalSize, at least for types of files that are encoded and compressed. Hive has the setting hive.stats.deserialization.factor, which defaults to 1.0, and is described as follows: {quote}in the absence of uncompressed/raw data size, total file size will be used for statistics annotation. But the file may be compressed, encoded and serialized which may be lesser in size than the actual uncompressed/raw data size. This factor will be multiplied to file size to estimate the raw data size. {quote} Also, I propose a configuration setting to allow the user to completely ignore rawDataSize, since that value is broken (due to HIVE-20079). 
When that configuration setting is set to true, Spark would instead estimate the table as follows: - if totalSize is defined and greater than 0, use totalSize*fudgeFactor. - else, use spark.sql.defaultSizeInBytes (default: Long.MaxValue) Caveat: This mitigates the issue only for Hive tables. It does not help much when the user is reading files using {{spark.read.parquet}}, unless we apply the same fudge factor there. was: When determining whether to do a broadcast join, Spark estimates the size of the smaller table as follows: - if totalSize is defined and greater than 0, use it. - else, if rawDataSize is defined and greater than 0, use it - else, use spark.sql.defaultSizeInBytes (default: Long.MaxValue) Therefore, Spark prefers totalSize over rawDataSize. Unfortunately, totalSize is often quite a bit smaller than the actual table size, since it represents the size of the table's files on disk. Parquet and Orc files, for example, are encoded and compressed. This can result in the JVM throwing an OutOfMemoryError while Spark is loading the table into a HashedRelation, or when Spark actually attempts to broadcast the data. On the other hand, rawDataSize represents the uncompressed size of the dataset, according to Hive documentation. This seems like a pretty good number to use in preference to totalSize. However, due to HIVE-20079, this value is simply #columns * #rows. Once that bug is fixed, it may be a superior statistic, at least for managed tables. In the meantime, we could apply a configurable "fudge factor" to totalSize, at least for types of files that are encoded and compressed. Hive has the setting hive.stats.deserialization.factor, which defaults to 1.0, and is described as follows: {quote}in the absence of uncompressed/raw data size, total file size will be used for statistics annotation. But the file may be compressed, encoded and serialized which may be lesser in size than the actual uncompressed/raw data size. 
This factor will be multiplied to file size to estimate the raw data size. {quote} In addition to the fudge factor, we could compare the adjusted totalSize to rawDataSize and use the bigger of the two. Caveat: This mitigates the issue only for Hive tables. It does not help much when the user is reading files using {{spark.read.parquet}}, unless we apply the same fudge factor there. > totalSize is not a good estimate for broadcast joins > > > Key: SPARK-24914 > URL: https://issues.apache.org/jira/browse/SPARK-24914 > Project: Spark > Issue Type: Improvement > Components: SQL >Affects Versions: 2.4.0 >Reporter: Bruce Robbins >Priority: Major > > When determining whether to do a broadcast join, Spark estimates the size of > the smaller table as follows: > - if totalSize is defined and greater than 0, use it. > - else, if rawDataSize is defined and
[jira] [Created] (SPARK-24920) Spark should share netty's memory pools across all uses
Imran Rashid created SPARK-24920: Summary: Spark should share netty's memory pools across all uses Key: SPARK-24920 URL: https://issues.apache.org/jira/browse/SPARK-24920 Project: Spark Issue Type: Improvement Components: Spark Core Affects Versions: 2.4.0 Reporter: Imran Rashid Spark currently creates separate netty memory pools for each of the following "services": 1) RPC Client 2) RPC Server 3) BlockTransfer Client 4) BlockTransfer Server 5) ExternalShuffle Client Depending on configuration and whether it's an executor or driver JVM, a different subset of these is active, but it's always either 3 or 4. Having them independent somewhat defeats the purpose of using pools at all. In my experiments I've found each pool will grow due to a burst of activity in the related service (eg. task start / end msgs), followed by another burst in a different service (eg. sending torrent broadcast blocks). Because of the way these pools work, they allocate memory in large chunks (16 MB by default) for each netty thread, so there is often a surge of 128 MB of allocated memory, even for really tiny messages. Also a lot of this memory is offheap by default, which makes it even tougher for users to manage. I think it would make more sense to combine all of these into a single pool. In some experiments I tried, this noticeably decreased memory usage, both onheap and offheap (no significant performance effect in my small experiments). As this is a pretty core change, as a first step I'd propose just exposing this as a conf, to let users experiment more broadly across a wider range of workloads. -- This message was sent by Atlassian JIRA (v7.6.3#76005) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
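The memory effect described above can be modeled in a few lines. This is an illustrative toy, not netty code; the service list is from the ticket, and the chunk counts are simplified (netty additionally allocates one chunk per netty thread per pool, which is where the reported 128 MB surge comes from).

```python
# Toy model of the claim above: with separate pools, a burst of activity in
# each service grows that service's own pool by at least one 16 MB chunk and
# none of it is shared; with one combined pool, later bursts reuse the chunk
# the first burst allocated.

CHUNK_MB = 16  # netty's default pool chunk size

services = ["rpc-client", "rpc-server",
            "blocktransfer-client", "blocktransfer-server"]

# Separate pools: four sequential bursts -> four pools each pinning a chunk.
separate = {s: CHUNK_MB for s in services}
separate_total = sum(separate.values())  # 64 MB pinned, mostly idle

# Shared pool: the same chunk serves all four bursts in turn.
shared_total = CHUNK_MB  # 16 MB pinned

print(separate_total, shared_total)
```

Multiply either figure by the number of netty event-loop threads to see how quickly the per-thread chunking inflates this in a real JVM.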
[jira] [Created] (SPARK-24919) Scala linter rule for sparkContext.hadoopConfiguration
Gengliang Wang created SPARK-24919: -- Summary: Scala linter rule for sparkContext.hadoopConfiguration Key: SPARK-24919 URL: https://issues.apache.org/jira/browse/SPARK-24919 Project: Spark Issue Type: Bug Components: Build Affects Versions: 2.4.0 Reporter: Gengliang Wang In most cases, we should use spark.sessionState.newHadoopConf() instead of sparkContext.hadoopConfiguration, so that the hadoop configurations specified in Spark session configuration will come into effect. Add a rule matching spark.sparkContext.hadoopConfiguration or spark.sqlContext.sparkContext.hadoopConfiguration to prevent the usage. -- This message was sent by Atlassian JIRA (v7.6.3#76005) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
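A sketch of the matching the rule would perform: the real check would be a Scalastyle regex rule in Spark's build, so this standalone Python regex is only an assumed illustration of the two patterns named above.

```python
# Illustrative lint check: flag the two banned access paths to
# hadoopConfiguration described in the ticket. Not the actual Scalastyle rule.
import re

BANNED = re.compile(r"spark(\.sqlContext)?\.sparkContext\.hadoopConfiguration")

def violations(source: str):
    """Return every banned occurrence found in a source string."""
    return [m.group(0) for m in BANNED.finditer(source)]
```

Code that uses the preferred `spark.sessionState.newHadoopConf()` produces no matches, while either banned form is reported.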
[jira] [Assigned] (SPARK-24919) Scala linter rule for sparkContext.hadoopConfiguration
[ https://issues.apache.org/jira/browse/SPARK-24919?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Apache Spark reassigned SPARK-24919: Assignee: (was: Apache Spark) > Scala linter rule for sparkContext.hadoopConfiguration > -- > > Key: SPARK-24919 > URL: https://issues.apache.org/jira/browse/SPARK-24919 > Project: Spark > Issue Type: Bug > Components: Build >Affects Versions: 2.4.0 >Reporter: Gengliang Wang >Priority: Major > > In most cases, we should use spark.sessionState.newHadoopConf() instead of > sparkContext.hadoopConfiguration, so that the hadoop configurations specified > in Spark session > configuration will come into effect. > Add a rule matching spark.sparkContext.hadoopConfiguration or > spark.sqlContext.sparkContext.hadoopConfiguration to prevent the usage. -- This message was sent by Atlassian JIRA (v7.6.3#76005) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Commented] (SPARK-24918) Executor Plugin API
[ https://issues.apache.org/jira/browse/SPARK-24918?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16555939#comment-16555939 ] Imran Rashid commented on SPARK-24918: -- [~jerryshao] [~tgraves] you might be interested in this -- I feel like this has come up in past discussions (though I couldn't find any jiras about it). > Executor Plugin API > --- > > Key: SPARK-24918 > URL: https://issues.apache.org/jira/browse/SPARK-24918 > Project: Spark > Issue Type: New Feature > Components: Spark Core >Affects Versions: 2.4.0 >Reporter: Imran Rashid >Priority: Major > Labels: memory-analysis > > It would be nice if we could specify an arbitrary class to run within each > executor for debugging and instrumentation. It's hard to do this currently > because: > a) you have no idea when executors will come and go with DynamicAllocation, > so you don't have a chance to run custom code before the first task > b) even with static allocation, you'd have to change the code of your spark > app itself to run a special task to "install" the plugin, which is often > tough in production cases when those maintaining regularly running > applications might not even know how to make changes to the application. > For example, https://github.com/squito/spark-memory could be used in a > debugging context to understand memory use, just by re-running an application > with extra command line arguments (as opposed to rebuilding spark). > I think one tricky part here is just deciding the api, and how it's versioned. > Does it just get created when the executor starts, and that's it? Or does it > get more specific events, like task start, task end, etc? Would we ever add > more events? It should definitely be a {{DeveloperApi}}, so breaking > compatibility would be allowed ... but still should be avoided. We could > create a base class that has no-op implementations, or explicitly version > everything. 
> Note that this is not needed in the driver as we already have SparkListeners > (even if you don't care about the SparkListenerEvents and just want to > inspect objects in the JVM, it's still good enough). -- This message was sent by Atlassian JIRA (v7.6.3#76005) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Created] (SPARK-24918) Executor Plugin API
Imran Rashid created SPARK-24918: Summary: Executor Plugin API Key: SPARK-24918 URL: https://issues.apache.org/jira/browse/SPARK-24918 Project: Spark Issue Type: New Feature Components: Spark Core Affects Versions: 2.4.0 Reporter: Imran Rashid It would be nice if we could specify an arbitrary class to run within each executor for debugging and instrumentation. It's hard to do this currently because: a) you have no idea when executors will come and go with DynamicAllocation, so you don't have a chance to run custom code before the first task b) even with static allocation, you'd have to change the code of your spark app itself to run a special task to "install" the plugin, which is often tough in production cases when those maintaining regularly running applications might not even know how to make changes to the application. For example, https://github.com/squito/spark-memory could be used in a debugging context to understand memory use, just by re-running an application with extra command line arguments (as opposed to rebuilding spark). I think one tricky part here is just deciding the api, and how it's versioned. Does it just get created when the executor starts, and that's it? Or does it get more specific events, like task start, task end, etc? Would we ever add more events? It should definitely be a {{DeveloperApi}}, so breaking compatibility would be allowed ... but still should be avoided. We could create a base class that has no-op implementations, or explicitly version everything. Note that this is not needed in the driver as we already have SparkListeners (even if you don't care about the SparkListenerEvents and just want to inspect objects in the JVM, it's still good enough). -- This message was sent by Atlassian JIRA (v7.6.3#76005) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
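One possible shape for such an API, following the "base class with no-op implementations" idea floated above. Every name here (`ExecutorPlugin`, `init`, `on_task_start`, ...) is hypothetical, since the ticket explicitly leaves the API undecided; a no-op base class lets new events be added later without breaking existing plugins.

```python
# Hypothetical sketch of an executor plugin interface. All names invented for
# illustration; the actual API was still being designed in SPARK-24918.

class ExecutorPlugin:
    """Base class: every hook is a no-op, so subclasses override only what they need."""
    def init(self):          pass  # called once when the executor starts
    def on_task_start(self): pass  # possible finer-grained events...
    def on_task_end(self):   pass
    def shutdown(self):      pass  # called when the executor exits

class MemoryMonitorPlugin(ExecutorPlugin):
    """Example user plugin: count tasks (a real one might snapshot memory here)."""
    def __init__(self):
        self.tasks_seen = 0
    def on_task_start(self):
        self.tasks_seen += 1
```

Because unimplemented hooks default to no-ops, adding a new event to the base class later does not break `MemoryMonitorPlugin`, which is the versioning property the comment above is after.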
[jira] [Commented] (SPARK-24914) totalSize is not a good estimate for broadcast joins
[ https://issues.apache.org/jira/browse/SPARK-24914?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16555877#comment-16555877 ] Imran Rashid commented on SPARK-24914: -- given HIVE-20079, can we also have a conf to just ignore rawDataSize? That estimate seems so bad you'd never want to use it. Unfortunately I don't think spark will be able to tell whether its a "good" or "bad" estimate, so we would have to leave it to the user to control. > totalSize is not a good estimate for broadcast joins > > > Key: SPARK-24914 > URL: https://issues.apache.org/jira/browse/SPARK-24914 > Project: Spark > Issue Type: Improvement > Components: SQL >Affects Versions: 2.4.0 >Reporter: Bruce Robbins >Priority: Major > > When determining whether to do a broadcast join, Spark estimates the size of > the smaller table as follows: > - if totalSize is defined and greater than 0, use it. > - else, if rawDataSize is defined and greater than 0, use it > - else, use spark.sql.defaultSizeInBytes (default: Long.MaxValue) > Therefore, Spark prefers totalSize over rawDataSize. > Unfortunately, totalSize is often quite a bit smaller than the actual table > size, since it represents the size of the table's files on disk. Parquet and > Orc files, for example, are encoded and compressed. This can result in the > JVM throwing an OutOfMemoryError while Spark is loading the table into a > HashedRelation, or when Spark actually attempts to broadcast the data. > On the other hand, rawDataSize represents the uncompressed size of the > dataset, according to Hive documentation. This seems like a pretty good > number to use in preference to totalSize. However, due to HIVE-20079, this > value is simply #columns * #rows. Once that bug is fixed, it may be a > superior statistic, at least for managed tables. > In the meantime, we could apply a configurable "fudge factor" to totalSize, > at least for types of files that are encoded and compressed. 
Hive has the > setting hive.stats.deserialization.factor, which defaults to 1.0, and is > described as follows: > {quote}in the absence of uncompressed/raw data size, total file size will be > used for statistics annotation. But the file may be compressed, encoded and > serialized which may be lesser in size than the actual uncompressed/raw data > size. This factor will be multiplied to file size to estimate the raw data > size. > {quote} > In addition to the fudge factor, we could compare the adjusted totalSize to > rawDataSize and use the bigger of the two. > Caveat: This mitigates the issue only for Hive tables. It does not help much > when the user is reading files using {{spark.read.parquet}}, unless we apply > the same fudge factor there. -- This message was sent by Atlassian JIRA (v7.6.3#76005) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Updated] (SPARK-24920) Spark should allow sharing netty's memory pools across all uses
[ https://issues.apache.org/jira/browse/SPARK-24920?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Imran Rashid updated SPARK-24920: - Summary: Spark should allow sharing netty's memory pools across all uses (was: Spark should share netty's memory pools across all uses) > Spark should allow sharing netty's memory pools across all uses > --- > > Key: SPARK-24920 > URL: https://issues.apache.org/jira/browse/SPARK-24920 > Project: Spark > Issue Type: Improvement > Components: Spark Core >Affects Versions: 2.4.0 >Reporter: Imran Rashid >Priority: Major > Labels: memory-analysis > > Spark currently creates separate netty memory pools for each of the following > "services": > 1) RPC Client > 2) RPC Server > 3) BlockTransfer Client > 4) BlockTransfer Server > 5) ExternalShuffle Client > Depending on configuration and whether it's an executor or driver JVM, > a different subset of these is active, but it's always either 3 or 4. > Having them independent somewhat defeats the purpose of using pools at all. > In my experiments I've found each pool will grow due to a burst of activity > in the related service (eg. task start / end msgs), followed by another burst in > a different service (eg. sending torrent broadcast blocks). Because of the > way these pools work, they allocate memory in large chunks (16 MB by default) > for each netty thread, so there is often a surge of 128 MB of allocated > memory, even for really tiny messages. Also a lot of this memory is offheap > by default, which makes it even tougher for users to manage. > I think it would make more sense to combine all of these into a single pool. > In some experiments I tried, this noticeably decreased memory usage, both > onheap and offheap (no significant performance effect in my small > experiments). 
> > As this is a pretty core change, as a first step I'd propose just exposing > this as a conf, to let users experiment more broadly across a wider range of > workloads. -- This message was sent by Atlassian JIRA (v7.6.3#76005) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Assigned] (SPARK-24919) Scala linter rule for sparkContext.hadoopConfiguration
[ https://issues.apache.org/jira/browse/SPARK-24919?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Apache Spark reassigned SPARK-24919: Assignee: Apache Spark > Scala linter rule for sparkContext.hadoopConfiguration > -- > > Key: SPARK-24919 > URL: https://issues.apache.org/jira/browse/SPARK-24919 > Project: Spark > Issue Type: Bug > Components: Build >Affects Versions: 2.4.0 >Reporter: Gengliang Wang >Assignee: Apache Spark >Priority: Major > > In most cases, we should use spark.sessionState.newHadoopConf() instead of > sparkContext.hadoopConfiguration, so that the hadoop configurations specified > in Spark session > configuration will come into effect. > Add a rule matching spark.sparkContext.hadoopConfiguration or > spark.sqlContext.sparkContext.hadoopConfiguration to prevent the usage. -- This message was sent by Atlassian JIRA (v7.6.3#76005) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Commented] (SPARK-24919) Scala linter rule for sparkContext.hadoopConfiguration
[ https://issues.apache.org/jira/browse/SPARK-24919?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16555972#comment-16555972 ] Apache Spark commented on SPARK-24919: -- User 'gengliangwang' has created a pull request for this issue: https://github.com/apache/spark/pull/21873 > Scala linter rule for sparkContext.hadoopConfiguration > -- > > Key: SPARK-24919 > URL: https://issues.apache.org/jira/browse/SPARK-24919 > Project: Spark > Issue Type: Bug > Components: Build >Affects Versions: 2.4.0 >Reporter: Gengliang Wang >Priority: Major > > In most cases, we should use spark.sessionState.newHadoopConf() instead of > sparkContext.hadoopConfiguration, so that the hadoop configurations specified > in Spark session > configuration will come into effect. > Add a rule matching spark.sparkContext.hadoopConfiguration or > spark.sqlContext.sparkContext.hadoopConfiguration to prevent the usage. -- This message was sent by Atlassian JIRA (v7.6.3#76005) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Comment Edited] (SPARK-24904) Join with broadcasted dataframe causes shuffle of redundant data
[ https://issues.apache.org/jira/browse/SPARK-24904?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16555652#comment-16555652 ] Marco Gaido edited comment on SPARK-24904 at 7/25/18 1:28 PM: -- I see now what you mean, but yes, I think there is an assumption you are making which is not always true, i.e. "The output is (expected to be) very small compared to the big table". That is not true. If all the rows from the big table match the small one, this is not the case. We may try to do something like what you mentioned in the optimizer if CBO is enabled and we have good enough statistics about the output size of the inner join, but I am not sure. was (Author: mgaido): I see now what you mean, but yes, It think there is an assumption you are doing which is not always true, ie. "The output is (expected to be) very small compared to the big table". That is not true. If all the rows from the big table match the small one, this is not the case. We may trying to do something like what you mentioned in the optimizer if CBO is enabled and we have good enough statistics about the output size of the inner join, but i am not sure. > Join with broadcasted dataframe causes shuffle of redundant data > > > Key: SPARK-24904 > URL: https://issues.apache.org/jira/browse/SPARK-24904 > Project: Spark > Issue Type: Improvement > Components: SQL >Affects Versions: 2.1.2 >Reporter: Shay Elbaz >Priority: Minor > > When joining a "large" dataframe with broadcasted small one, and join-type is > on the small DF side (see right-join below), the physical plan falls back to > sort merge join. But when the join is on the large DF side, the broadcast > does take place. Is there a good reason for this? 
In the below example it > sure doesn't make any sense to shuffle the entire large table: > > {code:java} > val small = spark.range(1, 10) > val big = spark.range(1, 1 << 30) > .withColumnRenamed("id", "id2") > big.join(broadcast(small), $"id" === $"id2", "right") > .explain > //OUTPUT: > == Physical Plan == > SortMergeJoin [id2#16307L], [id#16310L], RightOuter > :- *Sort [id2#16307L ASC NULLS FIRST], false, 0 > : +- Exchange hashpartitioning(id2#16307L, 1000) > : +- *Project [id#16304L AS id2#16307L] > : +- *Range (1, 1073741824, step=1, splits=Some(600)) > +- *Sort [id#16310L ASC NULLS FIRST], false, 0 > +- Exchange hashpartitioning(id#16310L, 1000) > +- *Range (1, 10, step=1, splits=Some(600)) > {code} > As a workaround, users need to perform inner instead of right join, and then > join the result back with the small DF to fill the missing rows. > > > > -- This message was sent by Atlassian JIRA (v7.6.3#76005) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Commented] (SPARK-24904) Join with broadcasted dataframe causes shuffle of redundant data
[ https://issues.apache.org/jira/browse/SPARK-24904?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=1678#comment-1678 ] Shay Elbaz commented on SPARK-24904: [~mgaido] Technically you *can* do that, you just need an additional shuffle (after the map side join) to fill in the missing rows as you mentioned. And since the current implementation already shuffles, I don't see how it makes sense to involve the entire big table in the shuffle. Instead, Spark could do the following: # Broadcast the small table. # Just like an inner join, load the big table and hash-join. The output is (expected to be) very small compared to the big table. # Keep the small table broadcasted, and shuffle the results from the last stage (say, sort-merge). # Now on each task, fill in missing rows from the broadcasted table. This is trivial if using sort-merge and the broadcasted table is just another partition to merge. As I mentioned in the description, this can be achieved by the user using 2 joins, but shouldn't Spark offer this by default? Needless to say how suboptimal the current implementation is compared to the above plan. Am I missing something? > Join with broadcasted dataframe causes shuffle of redundant data > > > Key: SPARK-24904 > URL: https://issues.apache.org/jira/browse/SPARK-24904 > Project: Spark > Issue Type: Question > Components: SQL >Affects Versions: 2.1.2 >Reporter: Shay Elbaz >Priority: Minor > > When joining a "large" dataframe with broadcasted small one, and join-type is > on the small DF side (see right-join below), the physical plan does not > include broadcasting the small table. But when the join is on the large DF > side, the broadcast does take place. Is there a good reason for this? 
In the > below example it sure doesn't make any sense to shuffle the entire large > table: > > {code:java} > val small = spark.range(1, 10) > val big = spark.range(1, 1 << 30) > .withColumnRenamed("id", "id2") > big.join(broadcast(small), $"id" === $"id2", "right") > .explain > //OUTPUT: > == Physical Plan == > SortMergeJoin [id2#16307L], [id#16310L], RightOuter > :- *Sort [id2#16307L ASC NULLS FIRST], false, 0 > : +- Exchange hashpartitioning(id2#16307L, 1000) > : +- *Project [id#16304L AS id2#16307L] > : +- *Range (1, 1073741824, step=1, splits=Some(600)) > +- *Sort [id#16310L ASC NULLS FIRST], false, 0 > +- Exchange hashpartitioning(id#16310L, 1000) > +- *Range (1, 10, step=1, splits=Some(600)) > {code} > As a workaround, users need to perform inner instead of right join, and then > join the result back with the small DF to fill the missing rows. > > > > -- This message was sent by Atlassian JIRA (v7.6.3#76005) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
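The four-step plan in the comment above can be sketched with plain Python collections standing in for the big table, the broadcast, and the shuffle. This is only a model of the proposed plan over join keys, not Spark code, and the function name is invented.

```python
# Toy model of the proposed broadcast right join: only the (small) inner-join
# result is "shuffled", and unmatched small-side keys are filled in at the end,
# so the big table never moves.

def broadcast_right_join(big_keys, small_keys):
    small_set = set(small_keys)                        # 1. broadcast the small table
    matched = [k for k in big_keys if k in small_set]  # 2. map-side inner join
    matched_set = set(matched)                         # 3. shuffle only this small result
    filled = [k for k in small_keys if k not in matched_set]  # 4. fill missing rows
    return matched, filled
```

Note the caveat raised in the discussion: step 2's output is only small when few big-table rows match, which Spark cannot know without good statistics.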
[jira] [Updated] (SPARK-24904) Join with broadcasted dataframe causes shuffle of redundant data
[ https://issues.apache.org/jira/browse/SPARK-24904?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Shay Elbaz updated SPARK-24904: --- Issue Type: Improvement (was: Question) > Join with broadcasted dataframe causes shuffle of redundant data > > > Key: SPARK-24904 > URL: https://issues.apache.org/jira/browse/SPARK-24904 > Project: Spark > Issue Type: Improvement > Components: SQL >Affects Versions: 2.1.2 >Reporter: Shay Elbaz >Priority: Minor > > When joining a "large" dataframe with broadcasted small one, and join-type is > on the small DF side (see right-join below), the physical plan does not > include broadcasting the small table. But when the join is on the large DF > side, the broadcast does take place. Is there a good reason for this? In the > below example it sure doesn't make any sense to shuffle the entire large > table: > > {code:java} > val small = spark.range(1, 10) > val big = spark.range(1, 1 << 30) > .withColumnRenamed("id", "id2") > big.join(broadcast(small), $"id" === $"id2", "right") > .explain > //OUTPUT: > == Physical Plan == > SortMergeJoin [id2#16307L], [id#16310L], RightOuter > :- *Sort [id2#16307L ASC NULLS FIRST], false, 0 > : +- Exchange hashpartitioning(id2#16307L, 1000) > : +- *Project [id#16304L AS id2#16307L] > : +- *Range (1, 1073741824, step=1, splits=Some(600)) > +- *Sort [id#16310L ASC NULLS FIRST], false, 0 > +- Exchange hashpartitioning(id#16310L, 1000) > +- *Range (1, 10, step=1, splits=Some(600)) > {code} > As a workaround, users need to perform inner instead of right join, and then > join the result back with the small DF to fill the missing rows. > > > > -- This message was sent by Atlassian JIRA (v7.6.3#76005) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Updated] (SPARK-19018) spark csv writer charset support
[ https://issues.apache.org/jira/browse/SPARK-19018?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Xiao Li updated SPARK-19018: Issue Type: Improvement (was: Bug) > spark csv writer charset support > > > Key: SPARK-19018 > URL: https://issues.apache.org/jira/browse/SPARK-19018 > Project: Spark > Issue Type: Improvement > Components: SQL >Reporter: todd.chen >Assignee: Carlos Peña >Priority: Major > Fix For: 2.4.0 > > > If we write a DataFrame to CSV, the default charset is UTF-8 and we can't change it, > unlike reading CSV, where we can pass `encoding` in the params, so I think we should > support CSV write with a charset param as well -- This message was sent by Atlassian JIRA (v7.6.3#76005) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Commented] (SPARK-24904) Join with broadcasted dataframe causes shuffle of redundant data
[ https://issues.apache.org/jira/browse/SPARK-24904?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16555842#comment-16555842 ] Marco Gaido commented on SPARK-24904: - [~shay_elbaz] In the case I mentioned before the approach you proposed is not better, it is worse, as it requires an unneeded additional broadcast join. > Join with broadcasted dataframe causes shuffle of redundant data > > > Key: SPARK-24904 > URL: https://issues.apache.org/jira/browse/SPARK-24904 > Project: Spark > Issue Type: Improvement > Components: SQL >Affects Versions: 2.1.2 >Reporter: Shay Elbaz >Priority: Minor > > When joining a "large" dataframe with broadcasted small one, and join-type is > on the small DF side (see right-join below), the physical plan falls back to > sort merge join. But when the join is on the large DF side, the broadcast > does take place. Is there a good reason for this? In the below example it > sure doesn't make any sense to shuffle the entire large table: > > {code:java} > val small = spark.range(1, 10) > val big = spark.range(1, 1 << 30) > .withColumnRenamed("id", "id2") > big.join(broadcast(small), $"id" === $"id2", "right") > .explain > //OUTPUT: > == Physical Plan == > SortMergeJoin [id2#16307L], [id#16310L], RightOuter > :- *Sort [id2#16307L ASC NULLS FIRST], false, 0 > : +- Exchange hashpartitioning(id2#16307L, 1000) > : +- *Project [id#16304L AS id2#16307L] > : +- *Range (1, 1073741824, step=1, splits=Some(600)) > +- *Sort [id#16310L ASC NULLS FIRST], false, 0 > +- Exchange hashpartitioning(id#16310L, 1000) > +- *Range (1, 10, step=1, splits=Some(600)) > {code} > As a workaround, users need to perform inner instead of right join, and then > join the result back with the small DF to fill the missing rows. > > > > -- This message was sent by Atlassian JIRA (v7.6.3#76005) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Updated] (SPARK-24917) Sending a partition over netty results in 2x memory usage
[ https://issues.apache.org/jira/browse/SPARK-24917?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Vincent updated SPARK-24917: Description: Hello, while investigating some OOM errors in Spark 2.2 [(here's my call stack)|https://image.ibb.co/hHa2R8/sparkOOM.png], I found the following behavior, which I think is weird: * a request happens to send a partition over the network * this partition is 1.9 GB and is persisted in memory * this partition is stored in a ByteBufferBlockData, which is made of a ChunkedByteBuffer, i.e. a list of (lots of) ByteBuffers of 4 MB each * the call to toNetty() is supposed to only wrap the arrays and not allocate any memory * yet the call stack shows that netty is allocating memory and trying to consolidate all the chunks into one big 1.9 GB array * this means that at this point the memory footprint is 2x the size of the actual partition (which is huge when the partition is 1.9 GB) Is this transient allocation expected? After digging, it turns out that the actual copy is due to [this method|https://github.com/netty/netty/blob/4.0/buffer/src/main/java/io/netty/buffer/Unpooled.java#L260] in netty: if my initial buffer is made of more than DEFAULT_MAX_COMPONENTS (16) components, it triggers a re-allocation of the whole buffer. This netty issue was fixed in this recent change: [https://github.com/netty/netty/commit/9b95b8ee628983e3e4434da93fffb893edff4aa2] As a result, is it possible to benefit from this change somehow in Spark 2.2 and above? I don't know how the netty dependencies are handled for Spark. NB: it seems this ticket: [https://jira.apache.org/jira/browse/SPARK-24307] changed the approach for Spark 2.4 by bypassing the netty buffer altogether. However, as written in that ticket, this approach *still* needs the *entire* block serialized in memory, so it would be a downgrade from fixing the netty issue when your buffer is < 2 GB. Thanks! 
was: Hello while investigating some OOM errors in Spark 2.2 [(here's my call stack)|https://image.ibb.co/hHa2R8/sparkOOM.png], I find the following behavior happening, which I think is weird: * a request happens to send a partition over network * this partition is 1.9 GB and is persisted in memory * this partition is apparently stored in a ByteBufferBlockData, that is made of a ChunkedByteBuffer, which is a list of (lots of) ByteBuffer of 4 MB each. * the call to toNetty() is supposed to only wrap all the arrays and not allocate any memory * yet the call stack shows that netty is allocating memory and is trying to consolidate all the chunks into one big 1.9GB array * this means that at this point the memory footprint is 2x the size of the actual partition (which is huge when the partition is 1.9GB) Is this transient allocation expected? After digging, it turns out that the actual copy is due to [this method|https://github.com/netty/netty/blob/4.0/buffer/src/main/java/io/netty/buffer/Unpooled.java#L260] in netty. If my initial buffer is made of more than DEFAULT_MAX_COMPONENTS (16) components it will trigger a re-allocation of all the buffer. This netty issue was fixed in this recent change : [https://github.com/netty/netty/commit/9b95b8ee628983e3e4434da93fffb893edff4aa2] As a result, is it possible to benefit from this change somehow in spark 2.2 and above? I don't know how the netty dependencies are handled for spark NB: it seems this ticket: [https://jira.apache.org/jira/browse/SPARK-24307] fixes the issue for spark 2.4 by bypassing netty buffer altogether Thanks! 
> Sending a partition over netty results in 2x memory usage > - > > Key: SPARK-24917 > URL: https://issues.apache.org/jira/browse/SPARK-24917 > Project: Spark > Issue Type: Improvement > Components: Spark Core >Affects Versions: 2.2.2 >Reporter: Vincent >Priority: Major > > Hello > while investigating some OOM errors in Spark 2.2 [(here's my call > stack)|https://image.ibb.co/hHa2R8/sparkOOM.png], I find the following > behavior happening, which I think is weird: > * a request happens to send a partition over network > * this partition is 1.9 GB and is persisted in memory > * this partition is apparently stored in a ByteBufferBlockData, that is made > of a ChunkedByteBuffer, which is a list of (lots of) ByteBuffer of 4 MB each. > * the call to toNetty() is supposed to only wrap all the arrays and not > allocate any memory > * yet the call stack shows that netty is allocating memory and is trying to > consolidate all the chunks into one big 1.9GB array > * this means that at this point the memory footprint is 2x the size of the > actual partition (which is huge when the partition is 1.9GB) > Is this transient allocation expected? >
[jira] [Updated] (SPARK-24917) Sending a partition over netty results in 2x memory usage
[ https://issues.apache.org/jira/browse/SPARK-24917?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Vincent updated SPARK-24917: Description: Hello while investigating some OOM errors in Spark 2.2 [(here's my call stack)|https://image.ibb.co/hHa2R8/sparkOOM.png], I find the following behavior happening, which I think is weird: * a request happens to send a partition over network * this partition is 1.9 GB and is persisted in memory * this partition is apparently stored in a ByteBufferBlockData, that is made of a ChunkedByteBuffer, which is a list of (lots of) ByteBuffer of 4 MB each. * the call to toNetty() is supposed to only wrap all the arrays and not allocate any memory * yet the call stack shows that netty is allocating memory and is trying to consolidate all the chunks into one big 1.9GB array * this means that at this point the memory footprint is 2x the size of the actual partition (which is huge when the partition is 1.9GB) Is this transient allocation expected? After digging, it turns out that the actual copy is due to [this method|https://github.com/netty/netty/blob/4.0/buffer/src/main/java/io/netty/buffer/Unpooled.java#L260] in netty. If my initial buffer is made of more than DEFAULT_MAX_COMPONENTS (16) components it will trigger a re-allocation of all the buffer. This netty issue was fixed in this recent change : [https://github.com/netty/netty/commit/9b95b8ee628983e3e4434da93fffb893edff4aa2] As a result, is it possible to benefit from this change somehow in spark 2.2 and above? I don't know how the netty dependencies are handled for spark NB: it seems this ticket: [https://jira.apache.org/jira/browse/SPARK-24307] fixes the issue for spark 2.4 by bypassing netty buffer altogether Thanks! 
was: Hello while investigating some OOM errors in Spark 2.2 [(here's my call stack)|https://image.ibb.co/hHa2R8/sparkOOM.png], I find the following behavior happening, which I think is weird: * a request happens to send a partition over network * this partition is 1.9 GB and is persisted in memory * this partition is apparently stored in a ByteBufferBlockData, that is made of a ChunkedByteBuffer, which is a list of (lots of) ByteBuffer of 4 MB each. * the call to toNetty() is supposed to only wrap all the arrays and not allocate any memory * yet the call stack shows that netty is allocating memory and is trying to consolidate all the chunks into one big 1.9GB array * this means that at this point the memory footprint is 2x the size of the actual partition (which is huge when the partition is 1.9GB) Is this transient allocation expected? After digging, it turns out that the actual copy is due to [this method|https://github.com/netty/netty/blob/4.0/buffer/src/main/java/io/netty/buffer/Unpooled.java#L260] in netty. If my initial buffer is made of more than DEFAULT_MAX_COMPONENTS (16) components it will trigger a re-allocation of all the buffer. This netty issue was fixed in this recent change : [https://github.com/netty/netty/commit/9b95b8ee628983e3e4434da93fffb893edff4aa2] As a result, is it possible to benefit from this change somehow in spark 2.2 and above? I don't know how the netty dependencies are handled for spark Thanks! 
> Sending a partition over netty results in 2x memory usage > - > > Key: SPARK-24917 > URL: https://issues.apache.org/jira/browse/SPARK-24917 > Project: Spark > Issue Type: Improvement > Components: Spark Core >Affects Versions: 2.2.2 >Reporter: Vincent >Priority: Major > > Hello > while investigating some OOM errors in Spark 2.2 [(here's my call > stack)|https://image.ibb.co/hHa2R8/sparkOOM.png], I find the following > behavior happening, which I think is weird: > * a request happens to send a partition over network > * this partition is 1.9 GB and is persisted in memory > * this partition is apparently stored in a ByteBufferBlockData, that is made > of a ChunkedByteBuffer, which is a list of (lots of) ByteBuffer of 4 MB each. > * the call to toNetty() is supposed to only wrap all the arrays and not > allocate any memory > * yet the call stack shows that netty is allocating memory and is trying to > consolidate all the chunks into one big 1.9GB array > * this means that at this point the memory footprint is 2x the size of the > actual partition (which is huge when the partition is 1.9GB) > Is this transient allocation expected? > After digging, it turns out that the actual copy is due to [this > method|https://github.com/netty/netty/blob/4.0/buffer/src/main/java/io/netty/buffer/Unpooled.java#L260] > in netty. If my initial buffer is made of more than DEFAULT_MAX_COMPONENTS > (16) components it will trigger a re-allocation of all the buffer. This netty > issue was fixed in this recent
[jira] [Created] (SPARK-24917) Sending a partition over netty results in 2x memory usage
Vincent created SPARK-24917: --- Summary: Sending a partition over netty results in 2x memory usage Key: SPARK-24917 URL: https://issues.apache.org/jira/browse/SPARK-24917 Project: Spark Issue Type: Improvement Components: Spark Core Affects Versions: 2.2.2 Reporter: Vincent Hello while investigating some OOM errors in Spark 2.2 [(here's my call stack)|https://image.ibb.co/hHa2R8/sparkOOM.png], I find the following behavior happening, which I think is weird: * a request happens to send a partition over network * this partition is 1.9 GB and is persisted in memory * this partition is apparently stored in a ByteBufferBlockData, that is made of a ChunkedByteBuffer, which is a list of (lots of) ByteBuffer of 4 MB each. * the call to toNetty() is supposed to only wrap all the arrays and not allocate any memory * yet the call stack shows that netty is allocating memory and is trying to consolidate all the chunks into one big 1.9GB array * this means that at this point the memory footprint is 2x the size of the actual partition (which is huge when the partition is 1.9GB) Is this transient allocation expected? After digging, it turns out that the actual copy is due to [this method|https://github.com/netty/netty/blob/4.0/buffer/src/main/java/io/netty/buffer/Unpooled.java#L260] in netty. If my initial buffer is made of more than DEFAULT_MAX_COMPONENTS (16) components it will trigger a re-allocation of all the buffer. This netty issue was fixed in this recent change : [https://github.com/netty/netty/commit/9b95b8ee628983e3e4434da93fffb893edff4aa2] As a result, is it possible to benefit from this change somehow in spark 2.2 and above? I don't know how the netty dependencies are handled for spark Thanks! -- This message was sent by Atlassian JIRA (v7.6.3#76005) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
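The allocation behavior the report describes can be sketched with a toy model. This is illustrative Python, not netty code; the threshold and sizes mirror the ticket's numbers (DEFAULT_MAX_COMPONENTS = 16 in netty 4.0's Unpooled, 4 MB chunks, a 1.9 GB partition):

```python
# Illustrative model (not netty code) of the consolidation described above:
# wrapping a list of chunks should allocate ~nothing, but past a
# max-components threshold the chunks get copied into one contiguous
# array, transiently doubling the memory footprint.

DEFAULT_MAX_COMPONENTS = 16

def extra_bytes_on_wrap(chunk_sizes):
    """Bytes newly allocated when 'wrapping' the given chunks."""
    if len(chunk_sizes) <= DEFAULT_MAX_COMPONENTS:
        return 0               # pure wrapping: existing arrays are reused
    return sum(chunk_sizes)    # consolidation: everything is copied once

# A 1.9 GB partition chunked into 4 MB pieces is ~486 components, far more
# than 16, so the "wrap" silently costs roughly another 1.9 GB.
FOUR_MB = 4 * 1024 * 1024
n_chunks = int(1.9 * 1024**3) // FOUR_MB
overhead = extra_bytes_on_wrap([FOUR_MB] * n_chunks)
```

The netty fix linked above avoids the copy by raising/removing the consolidation; any ChunkedByteBuffer bigger than 16 × 4 MB = 64 MB would otherwise hit this path.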
[jira] [Commented] (SPARK-24904) Join with broadcasted dataframe causes shuffle of redundant data
[ https://issues.apache.org/jira/browse/SPARK-24904?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16555769#comment-16555769 ] Shay Elbaz commented on SPARK-24904: [~mgaido] Indeed, this assumption is not always true. However, the map-side result will always be smaller than or equal to the big table, which is why I think this approach is better. Kind of a filter pushdown :) > Join with broadcasted dataframe causes shuffle of redundant data > > > Key: SPARK-24904 > URL: https://issues.apache.org/jira/browse/SPARK-24904 > Project: Spark > Issue Type: Improvement > Components: SQL >Affects Versions: 2.1.2 >Reporter: Shay Elbaz >Priority: Minor > > When joining a "large" dataframe with broadcasted small one, and join-type is > on the small DF side (see right-join below), the physical plan falls back to > sort merge join. But when the join is on the large DF side, the broadcast > does take place. Is there a good reason for this? In the below example it > sure doesn't make any sense to shuffle the entire large table: > > {code:java} > val small = spark.range(1, 10) > val big = spark.range(1, 1 << 30) > .withColumnRenamed("id", "id2") > big.join(broadcast(small), $"id" === $"id2", "right") > .explain > //OUTPUT: > == Physical Plan == > SortMergeJoin [id2#16307L], [id#16310L], RightOuter > :- *Sort [id2#16307L ASC NULLS FIRST], false, 0 > : +- Exchange hashpartitioning(id2#16307L, 1000) > : +- *Project [id#16304L AS id2#16307L] > : +- *Range (1, 1073741824, step=1, splits=Some(600)) > +- *Sort [id#16310L ASC NULLS FIRST], false, 0 > +- Exchange hashpartitioning(id#16310L, 1000) > +- *Range (1, 10, step=1, splits=Some(600)) > {code} > As a workaround, users need to perform inner instead of right join, and then > join the result back with the small DF to fill the missing rows. > > > > -- This message was sent by Atlassian JIRA (v7.6.3#76005) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Commented] (SPARK-24904) Join with broadcasted dataframe causes shuffle of redundant data
[ https://issues.apache.org/jira/browse/SPARK-24904?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16555652#comment-16555652 ] Marco Gaido commented on SPARK-24904: - I see now what you mean, but I think there is an assumption you are making which is not always true, i.e. "The output is (expected to be) very small compared to the big table". If all the rows from the big table match the small one, this is not the case. We may try to do something like what you mentioned in the optimizer if CBO is enabled and we have good enough statistics about the output size of the inner join, but I am not sure. > Join with broadcasted dataframe causes shuffle of redundant data > > > Key: SPARK-24904 > URL: https://issues.apache.org/jira/browse/SPARK-24904 > Project: Spark > Issue Type: Improvement > Components: SQL >Affects Versions: 2.1.2 >Reporter: Shay Elbaz >Priority: Minor > > When joining a "large" dataframe with broadcasted small one, and join-type is > on the small DF side (see right-join below), the physical plan falls back to > sort merge join. But when the join is on the large DF side, the broadcast > does take place. Is there a good reason for this? 
In the below example it > sure doesn't make any sense to shuffle the entire large table: > > {code:java} > val small = spark.range(1, 10) > val big = spark.range(1, 1 << 30) > .withColumnRenamed("id", "id2") > big.join(broadcast(small), $"id" === $"id2", "right") > .explain > //OUTPUT: > == Physical Plan == > SortMergeJoin [id2#16307L], [id#16310L], RightOuter > :- *Sort [id2#16307L ASC NULLS FIRST], false, 0 > : +- Exchange hashpartitioning(id2#16307L, 1000) > : +- *Project [id#16304L AS id2#16307L] > : +- *Range (1, 1073741824, step=1, splits=Some(600)) > +- *Sort [id#16310L ASC NULLS FIRST], false, 0 > +- Exchange hashpartitioning(id#16310L, 1000) > +- *Range (1, 10, step=1, splits=Some(600)) > {code} > As a workaround, users need to perform inner instead of right join, and then > join the result back with the small DF to fill the missing rows. > > > > -- This message was sent by Atlassian JIRA (v7.6.3#76005) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Updated] (SPARK-24904) Join with broadcasted dataframe causes shuffle of redundant data
[ https://issues.apache.org/jira/browse/SPARK-24904?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Shay Elbaz updated SPARK-24904: --- Description: When joining a "large" dataframe with broadcasted small one, and join-type is on the small DF side (see right-join below), the physical plan falls back to sort merge join. But when the join is on the large DF side, the broadcast does take place. Is there a good reason for this? In the below example it sure doesn't make any sense to shuffle the entire large table: {code:java} val small = spark.range(1, 10) val big = spark.range(1, 1 << 30) .withColumnRenamed("id", "id2") big.join(broadcast(small), $"id" === $"id2", "right") .explain //OUTPUT: == Physical Plan == SortMergeJoin [id2#16307L], [id#16310L], RightOuter :- *Sort [id2#16307L ASC NULLS FIRST], false, 0 : +- Exchange hashpartitioning(id2#16307L, 1000) : +- *Project [id#16304L AS id2#16307L] : +- *Range (1, 1073741824, step=1, splits=Some(600)) +- *Sort [id#16310L ASC NULLS FIRST], false, 0 +- Exchange hashpartitioning(id#16310L, 1000) +- *Range (1, 10, step=1, splits=Some(600)) {code} As a workaround, users need to perform inner instead of right join, and then join the result back with the small DF to fill the missing rows. was: When joining a "large" dataframe with broadcasted small one, and join-type is on the small DF side (see right-join below), the physical plan does not include broadcasting the small table. But when the join is on the large DF side, the broadcast does take place. Is there a good reason for this? 
In the below example it sure doesn't make any sense to shuffle the entire large table: {code:java} val small = spark.range(1, 10) val big = spark.range(1, 1 << 30) .withColumnRenamed("id", "id2") big.join(broadcast(small), $"id" === $"id2", "right") .explain //OUTPUT: == Physical Plan == SortMergeJoin [id2#16307L], [id#16310L], RightOuter :- *Sort [id2#16307L ASC NULLS FIRST], false, 0 : +- Exchange hashpartitioning(id2#16307L, 1000) : +- *Project [id#16304L AS id2#16307L] : +- *Range (1, 1073741824, step=1, splits=Some(600)) +- *Sort [id#16310L ASC NULLS FIRST], false, 0 +- Exchange hashpartitioning(id#16310L, 1000) +- *Range (1, 10, step=1, splits=Some(600)) {code} As a workaround, users need to perform inner instead of right join, and then join the result back with the small DF to fill the missing rows. > Join with broadcasted dataframe causes shuffle of redundant data > > > Key: SPARK-24904 > URL: https://issues.apache.org/jira/browse/SPARK-24904 > Project: Spark > Issue Type: Improvement > Components: SQL >Affects Versions: 2.1.2 >Reporter: Shay Elbaz >Priority: Minor > > When joining a "large" dataframe with broadcasted small one, and join-type is > on the small DF side (see right-join below), the physical plan falls back to > sort merge join. But when the join is on the large DF side, the broadcast > does take place. Is there a good reason for this? 
In the below example it > sure doesn't make any sense to shuffle the entire large table: > > {code:java} > val small = spark.range(1, 10) > val big = spark.range(1, 1 << 30) > .withColumnRenamed("id", "id2") > big.join(broadcast(small), $"id" === $"id2", "right") > .explain > //OUTPUT: > == Physical Plan == > SortMergeJoin [id2#16307L], [id#16310L], RightOuter > :- *Sort [id2#16307L ASC NULLS FIRST], false, 0 > : +- Exchange hashpartitioning(id2#16307L, 1000) > : +- *Project [id#16304L AS id2#16307L] > : +- *Range (1, 1073741824, step=1, splits=Some(600)) > +- *Sort [id#16310L ASC NULLS FIRST], false, 0 > +- Exchange hashpartitioning(id#16310L, 1000) > +- *Range (1, 10, step=1, splits=Some(600)) > {code} > As a workaround, users need to perform inner instead of right join, and then > join the result back with the small DF to fill the missing rows. > > > > -- This message was sent by Atlassian JIRA (v7.6.3#76005) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Commented] (SPARK-24916) Fix type coercion for IN expression with subquery
[ https://issues.apache.org/jira/browse/SPARK-24916?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16555435#comment-16555435 ] Apache Spark commented on SPARK-24916: -- User 'wangyum' has created a pull request for this issue: https://github.com/apache/spark/pull/21871 > Fix type coercion for IN expression with subquery > - > > Key: SPARK-24916 > URL: https://issues.apache.org/jira/browse/SPARK-24916 > Project: Spark > Issue Type: Bug > Components: SQL >Affects Versions: 2.2.0, 2.3.0 >Reporter: Yuming Wang >Priority: Major > > How to reproduce: > {code:sql} > CREATE TEMPORARY VIEW t4 AS SELECT * FROM VALUES > (CAST(1 AS DOUBLE), CAST(2 AS STRING), CAST(3 AS STRING)) > AS t1(t4a, t4b, t4c); > CREATE TEMPORARY VIEW t5 AS SELECT * FROM VALUES > (CAST(1 AS DECIMAL(18, 0)), CAST(2 AS STRING), CAST(3 AS BIGINT)) > AS t1(t5a, t5b, t5c); > SELECT * FROM t4 > WHERE > (t4a, t4b, t4c) IN (SELECT t5a, >t5b, >t5c > FROM t5); > {code} > Will throw exception: > {noformat} > org.apache.spark.sql.AnalysisException > cannot resolve '(named_struct('t4a', t4.`t4a`, 't4b', t4.`t4b`, 't4c', > t4.`t4c`) IN (listquery()))' due to data type mismatch: > The data type of one or more elements in the left hand side of an IN subquery > is not compatible with the data type of the output of the subquery > Mismatched columns: > [(t4.`t4a`:double, t5.`t5a`:decimal(18,0)), (t4.`t4c`:string, > t5.`t5c`:bigint)] > Left side: > [double, string, string]. > Right side: > [decimal(18,0), string, bigint].; > {noformat} > But it success on Spark 2.1.x. -- This message was sent by Atlassian JIRA (v7.6.3#76005) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Assigned] (SPARK-24916) Fix type coercion for IN expression with subquery
[ https://issues.apache.org/jira/browse/SPARK-24916?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Apache Spark reassigned SPARK-24916: Assignee: (was: Apache Spark) > Fix type coercion for IN expression with subquery > - > > Key: SPARK-24916 > URL: https://issues.apache.org/jira/browse/SPARK-24916 > Project: Spark > Issue Type: Bug > Components: SQL >Affects Versions: 2.2.0, 2.3.0 >Reporter: Yuming Wang >Priority: Major > > How to reproduce: > {code:sql} > CREATE TEMPORARY VIEW t4 AS SELECT * FROM VALUES > (CAST(1 AS DOUBLE), CAST(2 AS STRING), CAST(3 AS STRING)) > AS t1(t4a, t4b, t4c); > CREATE TEMPORARY VIEW t5 AS SELECT * FROM VALUES > (CAST(1 AS DECIMAL(18, 0)), CAST(2 AS STRING), CAST(3 AS BIGINT)) > AS t1(t5a, t5b, t5c); > SELECT * FROM t4 > WHERE > (t4a, t4b, t4c) IN (SELECT t5a, >t5b, >t5c > FROM t5); > {code} > Will throw exception: > {noformat} > org.apache.spark.sql.AnalysisException > cannot resolve '(named_struct('t4a', t4.`t4a`, 't4b', t4.`t4b`, 't4c', > t4.`t4c`) IN (listquery()))' due to data type mismatch: > The data type of one or more elements in the left hand side of an IN subquery > is not compatible with the data type of the output of the subquery > Mismatched columns: > [(t4.`t4a`:double, t5.`t5a`:decimal(18,0)), (t4.`t4c`:string, > t5.`t5c`:bigint)] > Left side: > [double, string, string]. > Right side: > [decimal(18,0), string, bigint].; > {noformat} > But it success on Spark 2.1.x. -- This message was sent by Atlassian JIRA (v7.6.3#76005) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Assigned] (SPARK-24916) Fix type coercion for IN expression with subquery
[ https://issues.apache.org/jira/browse/SPARK-24916?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Apache Spark reassigned SPARK-24916: Assignee: Apache Spark > Fix type coercion for IN expression with subquery > - > > Key: SPARK-24916 > URL: https://issues.apache.org/jira/browse/SPARK-24916 > Project: Spark > Issue Type: Bug > Components: SQL >Affects Versions: 2.2.0, 2.3.0 >Reporter: Yuming Wang >Assignee: Apache Spark >Priority: Major > > How to reproduce: > {code:sql} > CREATE TEMPORARY VIEW t4 AS SELECT * FROM VALUES > (CAST(1 AS DOUBLE), CAST(2 AS STRING), CAST(3 AS STRING)) > AS t1(t4a, t4b, t4c); > CREATE TEMPORARY VIEW t5 AS SELECT * FROM VALUES > (CAST(1 AS DECIMAL(18, 0)), CAST(2 AS STRING), CAST(3 AS BIGINT)) > AS t1(t5a, t5b, t5c); > SELECT * FROM t4 > WHERE > (t4a, t4b, t4c) IN (SELECT t5a, >t5b, >t5c > FROM t5); > {code} > Will throw exception: > {noformat} > org.apache.spark.sql.AnalysisException > cannot resolve '(named_struct('t4a', t4.`t4a`, 't4b', t4.`t4b`, 't4c', > t4.`t4c`) IN (listquery()))' due to data type mismatch: > The data type of one or more elements in the left hand side of an IN subquery > is not compatible with the data type of the output of the subquery > Mismatched columns: > [(t4.`t4a`:double, t5.`t5a`:decimal(18,0)), (t4.`t4c`:string, > t5.`t5c`:bigint)] > Left side: > [double, string, string]. > Right side: > [decimal(18,0), string, bigint].; > {noformat} > But it success on Spark 2.1.x. -- This message was sent by Atlassian JIRA (v7.6.3#76005) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Created] (SPARK-24916) Fix type coercion for IN expression with subquery
Yuming Wang created SPARK-24916: --- Summary: Fix type coercion for IN expression with subquery Key: SPARK-24916 URL: https://issues.apache.org/jira/browse/SPARK-24916 Project: Spark Issue Type: Bug Components: SQL Affects Versions: 2.3.0, 2.2.0 Reporter: Yuming Wang How to reproduce: {code:sql} CREATE TEMPORARY VIEW t4 AS SELECT * FROM VALUES (CAST(1 AS DOUBLE), CAST(2 AS STRING), CAST(3 AS STRING)) AS t1(t4a, t4b, t4c); CREATE TEMPORARY VIEW t5 AS SELECT * FROM VALUES (CAST(1 AS DECIMAL(18, 0)), CAST(2 AS STRING), CAST(3 AS BIGINT)) AS t1(t5a, t5b, t5c); SELECT * FROM t4 WHERE (t4a, t4b, t4c) IN (SELECT t5a, t5b, t5c FROM t5); {code} Will throw exception: {noformat} org.apache.spark.sql.AnalysisException cannot resolve '(named_struct('t4a', t4.`t4a`, 't4b', t4.`t4b`, 't4c', t4.`t4c`) IN (listquery()))' due to data type mismatch: The data type of one or more elements in the left hand side of an IN subquery is not compatible with the data type of the output of the subquery Mismatched columns: [(t4.`t4a`:double, t5.`t5a`:decimal(18,0)), (t4.`t4c`:string, t5.`t5c`:bigint)] Left side: [double, string, string]. Right side: [decimal(18,0), string, bigint].; {noformat} But it success on Spark 2.1.x. -- This message was sent by Atlassian JIRA (v7.6.3#76005) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
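The "Mismatched columns" part of the exception can be reproduced as a simple pairwise type comparison. This is a toy re-implementation for illustration, not the actual Catalyst analyzer code; the column names and type strings are taken from the error message above:

```python
# Toy model (not the Catalyst analyzer) of the mismatch report: zip the
# left-hand columns of the IN expression with the subquery's output
# columns and flag any pair whose types differ exactly. Spark 2.1.x
# instead coerced compatible pairs (double vs decimal, string vs bigint),
# which is why the query used to succeed.
left = [("t4a", "double"), ("t4b", "string"), ("t4c", "string")]
right = [("t5a", "decimal(18,0)"), ("t5b", "string"), ("t5c", "bigint")]
mismatched = [(ln, rn) for (ln, lt), (rn, rt) in zip(left, right) if lt != rt]
# Reproduces the reported pairs: (t4a, t5a) and (t4c, t5c).
```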
[jira] [Commented] (SPARK-21063) Spark return an empty result from remote hadoop cluster
[ https://issues.apache.org/jira/browse/SPARK-21063?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16555403#comment-16555403 ] nick commented on SPARK-21063: -- [~paulstaab] It does work when both registering the dialect and setting "fetchsize", but how can we get all the data without setting "fetchsize"? > Spark return an empty result from remote hadoop cluster > --- > > Key: SPARK-21063 > URL: https://issues.apache.org/jira/browse/SPARK-21063 > Project: Spark > Issue Type: Bug > Components: Spark Core, SQL >Affects Versions: 2.1.0, 2.1.1 >Reporter: Peter Bykov >Priority: Major > > Spark returning empty result from when querying remote hadoop cluster. > All firewall settings removed. > Querying using JDBC working properly using hive-jdbc driver from version 1.1.1 > Code snippet is: > {code:java} > val spark = SparkSession.builder > .appName("RemoteSparkTest") > .master("local") > .getOrCreate() > val df = spark.read > .option("url", "jdbc:hive2://remote.hive.local:1/default") > .option("user", "user") > .option("password", "pass") > .option("dbtable", "test_table") > .option("driver", "org.apache.hive.jdbc.HiveDriver") > .format("jdbc") > .load() > > df.show() > {code} > Result: > {noformat} > +---+ > |test_table.test_col| > +---+ > +---+ > {noformat} > All manipulations like: > {code:java} > df.select(*).show() > {code} > returns empty result too. -- This message was sent by Atlassian JIRA (v7.6.3#76005) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Commented] (SPARK-24904) Join with broadcasted dataframe causes shuffle of redundant data
[ https://issues.apache.org/jira/browse/SPARK-24904?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16555477#comment-16555477 ] Marco Gaido commented on SPARK-24904: - You cannot do a broadcast join when the outer side is the small table, as the join requires comparing each row of the small table with the whole big table and outputting it into the result if it is never matched. Since the big table is available only in small pieces in each task, no task can determine whether the row matched at least once (as it doesn't know what other tasks did). > Join with broadcasted dataframe causes shuffle of redundant data > > > Key: SPARK-24904 > URL: https://issues.apache.org/jira/browse/SPARK-24904 > Project: Spark > Issue Type: Question > Components: SQL >Affects Versions: 2.1.2 >Reporter: Shay Elbaz >Priority: Minor > > When joining a "large" dataframe with broadcasted small one, and join-type is > on the small DF side (see right-join below), the physical plan does not > include broadcasting the small table. But when the join is on the large DF > side, the broadcast does take place. Is there a good reason for this? 
In the > below example it sure doesn't make any sense to shuffle the entire large > table: > > {code:java} > val small = spark.range(1, 10) > val big = spark.range(1, 1 << 30) > .withColumnRenamed("id", "id2") > big.join(broadcast(small), $"id" === $"id2", "right") > .explain > //OUTPUT: > == Physical Plan == > SortMergeJoin [id2#16307L], [id#16310L], RightOuter > :- *Sort [id2#16307L ASC NULLS FIRST], false, 0 > : +- Exchange hashpartitioning(id2#16307L, 1000) > : +- *Project [id#16304L AS id2#16307L] > : +- *Range (1, 1073741824, step=1, splits=Some(600)) > +- *Sort [id#16310L ASC NULLS FIRST], false, 0 > +- Exchange hashpartitioning(id#16310L, 1000) > +- *Range (1, 10, step=1, splits=Some(600)) > {code} > As a workaround, users need to perform inner instead of right join, and then > join the result back with the small DF to fill the missing rows. > > > > -- This message was sent by Atlassian JIRA (v7.6.3#76005) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
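The argument in the comment above can be made concrete with a toy, non-Spark sketch: broadcast the small side to every "task", have each task right-join only its own slice of the big table, and observe that the union of the per-task results is wrong (the example tables and partitioning are hypothetical):

```python
# Toy illustration (not Spark internals) of why a broadcast right outer
# join with the small table on the outer side is unsound: each task sees
# only its own slice of the big table, so it cannot decide locally
# whether a small-table row is globally unmatched.

def local_right_join(big_partition, small):
    out = [(b, s) for b in big_partition for s in small if b == s]
    # Null-pad small rows this slice happens to miss -- wrong globally!
    out += [(None, s) for s in small if s not in big_partition]
    return out

big_partitions = [[1, 2], [3, 4]]   # big table split across two tasks
small = [2, 3]                      # "broadcast" to every task

naive = [row for part in big_partitions for row in local_right_join(part, small)]
# The union contains spurious (None, 3) and (None, 2) rows alongside the
# real matches (2, 2) and (3, 3); a correct right join has no null rows here.
```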
[jira] [Assigned] (SPARK-19018) spark csv writer charset support
[ https://issues.apache.org/jira/browse/SPARK-19018?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Hyukjin Kwon reassigned SPARK-19018: Assignee: Carlos Peña > spark csv writer charset support > > > Key: SPARK-19018 > URL: https://issues.apache.org/jira/browse/SPARK-19018 > Project: Spark > Issue Type: Bug > Components: SQL >Reporter: todd.chen >Assignee: Carlos Peña >Priority: Major > Fix For: 2.4.0 > > > if we write dataFrame to csv ,default charset is utf-8,and we can't change it > like we read csv by giving `encoding ` in params ,so I think we should > support csv write by pass a param -- This message was sent by Atlassian JIRA (v7.6.3#76005) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Created] (SPARK-24915) Calling SparkSession.createDataFrame with schema can throw exception
Stephen Spencer created SPARK-24915: --- Summary: Calling SparkSession.createDataFrame with schema can throw exception Key: SPARK-24915 URL: https://issues.apache.org/jira/browse/SPARK-24915 Project: Spark Issue Type: Bug Components: PySpark Affects Versions: 2.3.1 Environment: Python 3.6.3 PySpark 2.3.1 (installed via pip) OSX 10.12.6 Reporter: Stephen Spencer There seems to be a bug in PySpark when using the PySparkSQL session to create a dataframe with a pre-defined schema. Code to reproduce the error: {code:java} from pyspark import SparkConf, SparkContext from pyspark.sql import SparkSession from pyspark.sql.types import StructType, StructField, StringType, Row conf = SparkConf().setMaster("local").setAppName("repro") context = SparkContext(conf=conf) session = SparkSession(context) # Construct schema (the order of fields is important) schema = StructType([ StructField('field2', StructType([StructField('sub_field', StringType(), False)]), False), StructField('field1', StringType(), False), ]) # Create data to populate data frame data = [ Row(field1="Hello", field2=Row(sub_field='world')) ] # Attempt to create the data frame supplying the schema # this will throw a ValueError df = session.createDataFrame(data, schema=schema) df.show(){code} Running this throws a ValueError {noformat} Traceback (most recent call last): File "schema_bug.py", line 18, in df = session.createDataFrame(data, schema=schema) File "/Users/stephenspencer/benevolent/ai/neat/rex/.env/lib/python3.6/site-packages/pyspark/sql/session.py", line 691, in createDataFrame rdd, schema = self._createFromLocal(map(prepare, data), schema) File "/Users/stephenspencer/benevolent/ai/neat/rex/.env/lib/python3.6/site-packages/pyspark/sql/session.py", line 423, in _createFromLocal data = [schema.toInternal(row) for row in data] File "/Users/stephenspencer/benevolent/ai/neat/rex/.env/lib/python3.6/site-packages/pyspark/sql/session.py", line 423, in data = [schema.toInternal(row) for row in data] File 
"/Users/stephenspencer/benevolent/ai/neat/rex/.env/lib/python3.6/site-packages/pyspark/sql/types.py", line 601, in toInternal for f, v, c in zip(self.fields, obj, self._needConversion)) File "/Users/stephenspencer/benevolent/ai/neat/rex/.env/lib/python3.6/site-packages/pyspark/sql/types.py", line 601, in for f, v, c in zip(self.fields, obj, self._needConversion)) File "/Users/stephenspencer/benevolent/ai/neat/rex/.env/lib/python3.6/site-packages/pyspark/sql/types.py", line 439, in toInternal return self.dataType.toInternal(obj) File "/Users/stephenspencer/benevolent/ai/neat/rex/.env/lib/python3.6/site-packages/pyspark/sql/types.py", line 619, in toInternal raise ValueError("Unexpected tuple %r with StructType" % obj) ValueError: Unexpected tuple 'Hello' with StructType{noformat} The problem seems to be here: https://github.com/apache/spark/blob/3d5c61e5fd24f07302e39b5d61294da79aa0c2f9/python/pyspark/sql/types.py#L603 specifically the bit {code:java} zip(self.fields, obj, self._needConversion) {code} This zip statement assumes that obj and self.fields are ordered the same way, so that the elements of obj correspond to the right fields in the schema. However, this is not true: a Row orders its elements alphabetically, while the fields in the schema stay in whatever order they were specified. In this example, field2 is being initialised with the field1 element 'Hello'. If you re-order the fields in the schema to (field1, field2), the given example works without error. The schema in the repro is specifically designed to elicit the problem: the fields are out of alphabetical order and one field is a StructType, making schema._needSerializeAnyField == True. However, we encountered this in real use. -- This message was sent by Atlassian JIRA (v7.6.3#76005) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
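The field-order mismatch can be reproduced without Spark at all. The sketch below is a minimal pure-Python stand-in, assuming (as described above) that Row stores keyword fields alphabetically while StructType.toInternal zips row values against schema fields in declared order; make_row is an illustrative helper, not PySpark API.

```python
# Stand-in for PySpark 2.x Row: keyword fields are stored in
# alphabetical order, regardless of the order they were passed in.
def make_row(**kwargs):
    return tuple(kwargs[k] for k in sorted(kwargs))

# Schema declared with field2 (a struct) before field1 (a string)
schema_fields = ["field2", "field1"]

# Row stores its values as (field1, field2) after alphabetical sorting
row = make_row(field1="Hello", field2=("world",))

# StructType.toInternal effectively does zip(self.fields, obj, ...):
pairs = list(zip(schema_fields, row))
# 'field2' (the struct field) gets paired with the string "Hello",
# which is what triggers the "Unexpected tuple 'Hello' with StructType"
print(pairs)
```

Declaring the schema fields in alphabetical order (field1, field2) makes the two orderings coincide, which is why the re-ordered example in the report works.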
[jira] [Resolved] (SPARK-19018) spark csv writer charset support
[ https://issues.apache.org/jira/browse/SPARK-19018?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Hyukjin Kwon resolved SPARK-19018. -- Resolution: Fixed Fix Version/s: 2.4.0 Fixed in https://github.com/apache/spark/pull/20949 > spark csv writer charset support > > > Key: SPARK-19018 > URL: https://issues.apache.org/jira/browse/SPARK-19018 > Project: Spark > Issue Type: Bug > Components: SQL >Reporter: todd.chen >Priority: Major > Fix For: 2.4.0 > > > If we write a DataFrame to CSV, the default charset is UTF-8 and we can't > change it, whereas when reading CSV we can pass `encoding` in the options. > So I think the CSV writer should also support a charset parameter.
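To illustrate why a writer-side charset option matters, here is a plain-Python sketch (no Spark required): the same rows serialized to CSV under UTF-8 versus GBK produce different byte payloads, so a writer hard-wired to UTF-8 cannot serve consumers that expect another charset.

```python
import csv
import io

rows = [["id", "name"], ["1", "\u4e2d\u6587"]]  # second row contains CJK text

def write_csv_bytes(rows, encoding):
    # Encode CSV output with an explicit charset instead of the default UTF-8
    buf = io.BytesIO()
    wrapper = io.TextIOWrapper(buf, encoding=encoding, newline="")
    csv.writer(wrapper).writerows(rows)
    wrapper.flush()
    return buf.getvalue()

utf8_bytes = write_csv_bytes(rows, "utf-8")
gbk_bytes = write_csv_bytes(rows, "gbk")
# Same logical content, different bytes on disk
assert utf8_bytes != gbk_bytes
assert utf8_bytes.decode("utf-8") == gbk_bytes.decode("gbk")
```

With the fix above shipped in Spark 2.4, the DataFrame CSV writer accepts an encoding option, e.g. df.write.option("encoding", "gbk").csv(path); check the Spark documentation for your version for the exact option name.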
[jira] [Comment Edited] (SPARK-24906) Enlarge split size for columnar file to ensure the task read enough data
[ https://issues.apache.org/jira/browse/SPARK-24906?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16554972#comment-16554972 ] Jason Guo edited comment on SPARK-24906 at 7/25/18 6:09 AM: Thanks [~maropu] and [~viirya] for your comments. Here is my solution for our cluster (more than 40 thousand nodes in total and more than 10 thousand nodes in a single cluster). 1. When will this be enabled? There is a configuration item named {code:java} spark.sql.parquet.adaptiveFileSplit=false{code} Only when this is enabled will DataSourceScanExec enlarge maxPartitionBytes. By default, this parameter is set to false (our ad-hoc queries set it to true). With this configuration, users know that Spark will adjust the partition / split size adaptively; users who do not want this behaviour can disable it. 2. How are maxPartitionBytes and openCostInBytes calculated? Different data types have different lengths (calculated with DataType.defaultSize). First we get the total row size of the whole table (henceforth "T"). Then we get the total size of the requiredSchema columns (henceforth "R"). The multiplier is T / R; maxPartitionBytes and openCostInBytes are then enlarged by a factor of T / R. > Enlarge split size for columnar file to ensure the task read enough data > > > Key: SPARK-24906 > URL: https://issues.apache.org/jira/browse/SPARK-24906 > Project: Spark > Issue Type: Improvement > Components: SQL >Affects Versions: 2.3.1 >Reporter: Jason Guo >Priority: Critical > Attachments: image-2018-07-24-20-26-32-441.png, > image-2018-07-24-20-28-06-269.png, image-2018-07-24-20-29-24-797.png, > image-2018-07-24-20-30-24-552.png > > > For columnar files such as Parquet, when Spark SQL reads a table, each split > will be 128 MB by default since spark.sql.files.maxPartitionBytes defaults to > 128 MB. Even when users set it to a large value, such as 512 MB, a task may > read only a few MB or even hundreds of KB, because the table may consist of > dozens of columns while the query needs only a few of them, and Spark prunes > the unnecessary columns. > > In this case, Spark's DataSourceScanExec can enlarge maxPartitionBytes > adaptively. > For example, a table has 40 columns: 20 integer and 20 long. When a query > reads one integer column and one long column, maxPartitionBytes should be > 20 times larger: (20*4+20*8) / (4+8) = 20. > > With this optimization, the number of tasks will be smaller and the job will > run faster. More importantly, for a very large cluster (more than 10 thousand > nodes), it relieves the RM's scheduling pressure. > > Here is the test: > > The table named test2 has more than 40 columns and there are more than 5 TB > of data each hour. 
> When we issue a very simple query: > > {code:java} > select count(device_id) from test2 where date=20180708 and hour='23'{code} > > There are 72176 tasks and the job takes 4.8 minutes. > !image-2018-07-24-20-26-32-441.png! > > Most tasks last less than 1 second and read less than 1.5 MB of data. > !image-2018-07-24-20-28-06-269.png! > > After the optimization, there are only 1615 tasks and the job takes only 30 > seconds, almost 10 times faster. > !image-2018-07-24-20-29-24-797.png! > > The median of data read per task is 44.2 MB. > !image-2018-07-24-20-30-24-552.png! >
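The T / R multiplier from the comment above can be sketched in a few lines. This is an illustrative calculation, not Spark code: split_multiplier is a hypothetical name, and the column widths follow Spark's DataType.defaultSize convention (IntegerType = 4 bytes, LongType = 8 bytes).

```python
def split_multiplier(all_column_sizes, required_column_sizes):
    # T: default row width of the full table; R: width of the columns read
    t = sum(all_column_sizes)
    r = sum(required_column_sizes)
    return t / r

# The 40-column example from the issue: 20 ints + 20 longs,
# and a query that touches one integer column and one long column
table_columns = [4] * 20 + [8] * 20
required_columns = [4, 8]

factor = split_multiplier(table_columns, required_columns)
# (20*4 + 20*8) / (4 + 8) = 240 / 12 = 20.0

# maxPartitionBytes would then be scaled up from its 128 MB default
enlarged_max_partition_bytes = int(128 * 1024 * 1024 * factor)
```

With a factor of 20, each split targets roughly 2.5 GB of on-disk columnar data so that a task actually reads about 128 MB of the pruned columns, which matches the drop from 72176 tasks to 1615 reported above.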