[jira] [Created] (SPARK-21661) SparkSQL can't merge load table from Hadoop
Dapeng Sun created SPARK-21661: -- Summary: SparkSQL can't merge load table from Hadoop Key: SPARK-21661 URL: https://issues.apache.org/jira/browse/SPARK-21661 Project: Spark Issue Type: Improvement Components: SQL Affects Versions: 2.2.0 Reporter: Dapeng Sun Here is the original listing of the external table on HDFS: {noformat} Permission Owner Group Size Last Modified Replication Block Size Name -rw-r--r-- root supergroup 0 B 8/6/2017, 11:43:03 PM 3 256 MB income_band_001.dat -rw-r--r-- root supergroup 0 B 8/6/2017, 11:39:31 PM 3 256 MB income_band_002.dat ... -rw-r--r-- root supergroup 327 B 8/6/2017, 11:44:47 PM 3 256 MB income_band_530.dat {noformat} After a SparkSQL load, each input file produces an output file, even when the file is 0 B. For the same load on Hive, the data files would be merged according to the data size of the original files. -- This message was sent by Atlassian JIRA (v6.4.14#64029) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
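The merge behavior the reporter expects from Hive can be approximated by bin-packing the input files into groups of roughly one block size before writing. The sketch below is a hypothetical, Spark-independent model of that grouping logic (the function name and threshold are assumptions, not Spark or Hive APIs):

```python
def merge_plan(file_sizes, target_bytes=256 * 1024 * 1024):
    """Greedily pack input files into output groups of at most target_bytes.

    Empty (0 B) files are folded into an existing group instead of each
    producing an output file of their own.
    """
    groups, current, current_size = [], [], 0
    for size in file_sizes:
        if current and current_size + size > target_bytes:
            groups.append(current)
            current, current_size = [], 0
        current.append(size)
        current_size += size
    if current:
        groups.append(current)
    return groups

# 530 tiny files (most 0 B, as in the HDFS listing above) collapse
# into a single output group instead of 530 separate outputs.
plan = merge_plan([0] * 529 + [327])
print(len(plan))  # → 1
```

With a 256 MB target matching the block size in the listing, all 530 small files fit in one output group, which is the merged behavior the issue asks for.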
[jira] [Resolved] (SPARK-20894) Error while checkpointing to HDFS
[ https://issues.apache.org/jira/browse/SPARK-20894?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Mark Grover resolved SPARK-20894. - Resolution: Fixed Fix Version/s: 2.3.0 > Error while checkpointing to HDFS > - > > Key: SPARK-20894 > URL: https://issues.apache.org/jira/browse/SPARK-20894 > Project: Spark > Issue Type: Improvement > Components: Structured Streaming >Affects Versions: 2.1.1 > Environment: Ubuntu, Spark 2.1.1, Hadoop 2.7 >Reporter: kant kodali >Assignee: Shixiong Zhu > Fix For: 2.3.0 > > Attachments: driver_info_log, executor1_log, executor2_log > > > Dataset df2 = df1.groupBy(functions.window(df1.col("Timestamp5"), "24 > hours", "24 hours"), df1.col("AppName")).count(); > StreamingQuery query = df2.writeStream().foreach(new > KafkaSink()).option("checkpointLocation","/usr/local/hadoop/checkpoint").outputMode("update").start(); > query.awaitTermination(); > For some reason this fails with the error: > ERROR Executor: Exception in task 0.0 in stage 1.0 (TID 1) > java.lang.IllegalStateException: Error reading delta file > /usr/local/hadoop/checkpoint/state/0/0/1.delta of HDFSStateStoreProvider[id = > (op=0, part=0), dir = /usr/local/hadoop/checkpoint/state/0/0]: > /usr/local/hadoop/checkpoint/state/0/0/1.delta does not exist > I cleared all the checkpoint data in /usr/local/hadoop/checkpoint/ and all > consumer offsets in Kafka from all brokers prior to running, and yet this > error still persists.
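The exception comes from the state store expecting a delta file for a version that was never written. A hypothetical, heavily simplified model of that recovery path (file layout borrowed from the error message; this is not the actual HDFSStateStoreProvider code) makes the failure mode concrete:

```python
import os
import tempfile

def load_state(checkpoint_dir, version):
    """Replay delta files 1..version; raise if an expected delta is missing,
    mirroring the IllegalStateException reported above."""
    state = {}
    for v in range(1, version + 1):
        delta = os.path.join(checkpoint_dir, f"{v}.delta")
        if not os.path.exists(delta):
            raise RuntimeError(f"Error reading delta file {delta}: does not exist")
        with open(delta) as f:
            for line in f:
                key, value = line.split("=", 1)
                state[key] = value.strip()
    return state

with tempfile.TemporaryDirectory() as d:
    with open(os.path.join(d, "1.delta"), "w") as f:
        f.write("appA=3\n")
    print(load_state(d, 1))  # version 1 replays fine
    try:
        load_state(d, 2)     # 2.delta was never written
    except RuntimeError as e:
        print("recovery failed:", e)
```

The point of the model: clearing the checkpoint directory does not help if the provider still asks for a version whose delta was never committed, which is why the fix landed in the state store rather than in user code.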
[jira] [Updated] (SPARK-21661) SparkSQL can't merge load table from Hadoop
[ https://issues.apache.org/jira/browse/SPARK-21661?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Dapeng Sun updated SPARK-21661: --- Description: Here is the original listing of the external table on HDFS: {noformat} Permission Owner Group Size Last Modified Replication Block Size Name -rw-r--r-- root supergroup 0 B 8/6/2017, 11:43:03 PM 3 256 MB income_band_001.dat -rw-r--r-- root supergroup 0 B 8/6/2017, 11:39:31 PM 3 256 MB income_band_002.dat ... -rw-r--r-- root supergroup 327 B 8/6/2017, 11:44:47 PM 3 256 MB income_band_530.dat {noformat} After a SparkSQL load, every file has an output file, even when the file is 0 B. For the same load on Hive, the data files would be merged according to the data size of the original files. was: Here is the original text of external table on HDFS: {noformat} Permission Owner Group SizeLast Modified Replication Block Size Name -rw-r--r-- rootsupergroup 0 B 8/6/2017, 11:43:03 PM 3 256 MB income_band_001.dat -rw-r--r-- rootsupergroup 0 B 8/6/2017, 11:39:31 PM 3 256 MB income_band_002.dat ... -rw-r--r-- rootsupergroup 327 B 8/6/2017, 11:44:47 PM 3 256 MB income_band_530.dat {noformat} After SparkSQL load, each files have a output, even the files are 0B. For the load on Hive, the data files would be merged according the data size of original files. > SparkSQL can't merge load table from Hadoop > --- > > Key: SPARK-21661 > URL: https://issues.apache.org/jira/browse/SPARK-21661 > Project: Spark > Issue Type: Improvement > Components: SQL >Affects Versions: 2.2.0 >Reporter: Dapeng Sun > > Here is the original listing of the external table on HDFS: > {noformat} > Permission Owner Group Size Last Modified Replication Block Size Name > -rw-r--r-- root supergroup 0 B 8/6/2017, 11:43:03 PM 3 256 MB income_band_001.dat > -rw-r--r-- root supergroup 0 B 8/6/2017, 11:39:31 PM 3 256 MB income_band_002.dat > ... > -rw-r--r-- root supergroup 327 B 8/6/2017, 11:44:47 PM 3 256 MB income_band_530.dat > {noformat} > After a SparkSQL load, every file has an output file, even when the file is 0 B. > For the same load on Hive, the data files would be merged according to the data size > of the original files.
[jira] [Commented] (SPARK-21658) Adds the default None for value in na.replace in PySpark to match
[ https://issues.apache.org/jira/browse/SPARK-21658?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16117718#comment-16117718 ] Liang-Chi Hsieh commented on SPARK-21658: - I will mentor a beginner to work on this. Thanks [~hyukjin.kwon]! > Adds the default None for value in na.replace in PySpark to match > - > > Key: SPARK-21658 > URL: https://issues.apache.org/jira/browse/SPARK-21658 > Project: Spark > Issue Type: Bug > Components: PySpark >Affects Versions: 2.2.0 >Reporter: Hyukjin Kwon >Priority: Minor > Labels: Starter > > It looks like {{na.replace}} is missing the default value {{None}}. > Both docs say they are aliases: > http://spark.apache.org/docs/2.2.0/api/python/pyspark.sql.html#pyspark.sql.DataFrame.replace > http://spark.apache.org/docs/2.2.0/api/python/pyspark.sql.html#pyspark.sql.DataFrameNaFunctions.replace > but the default values look different, which ends up with: > {code} > >>> df = spark.createDataFrame([('Alice', 10, 80.0)]) > >>> df.replace({"Alice": "a"}).first() > Row(_1=u'a', _2=10, _3=80.0) > >>> df.na.replace({"Alice": "a"}).first() > Traceback (most recent call last): > File "", line 1, in > TypeError: replace() takes at least 3 arguments (2 given) > {code} > To take advantage of SPARK-19454, it sounds like we should match them.
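The bug is a plain default-argument inconsistency between two methods that are documented as aliases. A minimal pure-Python illustration (hypothetical class names, not the PySpark implementation) reproduces exactly the TypeError shown in the snippet above:

```python
class Frame:
    def replace(self, to_replace, value=None, subset=None):
        # Tolerates a dict-only call: `value` defaults to None.
        return ("replace", to_replace, value, subset)

class NaFunctions:
    """Stand-in for df.na: documented as an alias of Frame.replace."""
    def __init__(self, frame):
        self.frame = frame

    def replace(self, to_replace, value, subset=None):
        # No default for `value`: a dict-only call raises TypeError,
        # even though the docs call this method an alias.
        return self.frame.replace(to_replace, value, subset)

df = Frame()
df.replace({"Alice": "a"})            # works: value defaults to None
try:
    NaFunctions(df).replace({"Alice": "a"})
except TypeError as e:
    print("alias mismatch:", e)
```

The fix the ticket proposes is simply giving the alias the same `value=None` default so both call shapes behave identically.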
[jira] [Comment Edited] (SPARK-21631) Building Spark with SBT unsuccessful when source code in Mllib is modified, But with MVN is ok
[ https://issues.apache.org/jira/browse/SPARK-21631?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16117780#comment-16117780 ] Liang-Chi Hsieh edited comment on SPARK-21631 at 8/8/17 3:10 AM: - [~ibingoogle] I saw you did {{export NOLINT_ON_COMPILE}}, I don't think it will work. Please do something like {{export NOLINT_ON_COMPILE=1}}. was (Author: viirya): [~ibingoogle] I saw you did {{export NOLINT_ON_COMPILE}}, I don't thin it will work. Please do something like {{export NOLINT_ON_COMPILE=1}}. > Building Spark with SBT unsuccessful when source code in Mllib is modified, > But with MVN is ok > -- > > Key: SPARK-21631 > URL: https://issues.apache.org/jira/browse/SPARK-21631 > Project: Spark > Issue Type: Bug > Components: Build, MLlib >Affects Versions: 2.1.1 > Environment: ubuntu 14.04 > Spark 2.1.1 > MVN 3.3.9 > scala 2.11.8 >Reporter: Sean Wong > > I added > import org.apache.spark.internal.Logging > at the head of LinearRegression.scala file > Then, I try to build Spark using SBT. > However, here is the error: > *[info] Done packaging. 
> java.lang.RuntimeException: errors exist > at scala.sys.package$.error(package.scala:27) > at org.scalastyle.sbt.Tasks$.onHasErrors$1(Plugin.scala:132) > at > org.scalastyle.sbt.Tasks$.doScalastyleWithConfig$1(Plugin.scala:187) > at org.scalastyle.sbt.Tasks$.doScalastyle(Plugin.scala:195) > at > SparkBuild$$anonfun$cachedScalaStyle$1$$anonfun$17.apply(SparkBuild.scala:205) > at > SparkBuild$$anonfun$cachedScalaStyle$1$$anonfun$17.apply(SparkBuild.scala:192) > at sbt.FileFunction$$anonfun$cached$1.apply(Tracked.scala:235) > at sbt.FileFunction$$anonfun$cached$1.apply(Tracked.scala:235) > at > sbt.FileFunction$$anonfun$cached$2$$anonfun$apply$3$$anonfun$apply$4.apply(Tracked.scala:249) > at > sbt.FileFunction$$anonfun$cached$2$$anonfun$apply$3$$anonfun$apply$4.apply(Tracked.scala:245) > at sbt.Difference.apply(Tracked.scala:224) > at sbt.Difference.apply(Tracked.scala:206) > at > sbt.FileFunction$$anonfun$cached$2$$anonfun$apply$3.apply(Tracked.scala:245) > at > sbt.FileFunction$$anonfun$cached$2$$anonfun$apply$3.apply(Tracked.scala:244) > at sbt.Difference.apply(Tracked.scala:224) > at sbt.Difference.apply(Tracked.scala:200) > at sbt.FileFunction$$anonfun$cached$2.apply(Tracked.scala:244) > at sbt.FileFunction$$anonfun$cached$2.apply(Tracked.scala:242) > at SparkBuild$$anonfun$cachedScalaStyle$1.apply(SparkBuild.scala:212) > at SparkBuild$$anonfun$cachedScalaStyle$1.apply(SparkBuild.scala:187) > at scala.Function1$$anonfun$compose$1.apply(Function1.scala:47) > at sbt.$tilde$greater$$anonfun$$u2219$1.apply(TypeFunctions.scala:40) > at sbt.std.Transform$$anon$4.work(System.scala:63) > at > sbt.Execute$$anonfun$submit$1$$anonfun$apply$1.apply(Execute.scala:228) > at > sbt.Execute$$anonfun$submit$1$$anonfun$apply$1.apply(Execute.scala:228) > at sbt.ErrorHandling$.wideConvert(ErrorHandling.scala:17) > at sbt.Execute.work(Execute.scala:237) > at sbt.Execute$$anonfun$submit$1.apply(Execute.scala:228) > at sbt.Execute$$anonfun$submit$1.apply(Execute.scala:228) > at > 
sbt.ConcurrentRestrictions$$anon$4$$anonfun$1.apply(ConcurrentRestrictions.scala:159) > at sbt.CompletionService$$anon$2.call(CompletionService.scala:28) > at java.util.concurrent.FutureTask.run(FutureTask.java:266) > at > java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) > at java.util.concurrent.FutureTask.run(FutureTask.java:266) > at > java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) > at > java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) > at java.lang.Thread.run(Thread.java:745) > [error] (mllib/*:scalaStyleOnCompile) errors exist* > After this, I switched to MVN to build Spark; everything is OK and the > build is successful. > So is this a bug in the SBT build?
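The distinction the comment draws can be checked outside of SBT: in a POSIX shell, `export VAR` on an unset variable does not place it in a child process's environment, while `export VAR=1` does. A small Python check of the same principle (the variable name is kept from the comment; the harness itself is hypothetical):

```python
import os
import subprocess
import sys

probe = "import os; print(os.environ.get('NOLINT_ON_COMPILE'))"

# Without an assignment, the child process never sees the variable,
# so a build-script lookup of NOLINT_ON_COMPILE comes back empty.
env = {k: v for k, v in os.environ.items() if k != "NOLINT_ON_COMPILE"}
unset = subprocess.run([sys.executable, "-c", probe],
                       capture_output=True, text=True, env=env)

# With NOLINT_ON_COMPILE=1 actually assigned, the lookup succeeds.
env["NOLINT_ON_COMPILE"] = "1"
is_set = subprocess.run([sys.executable, "-c", probe],
                        capture_output=True, text=True, env=env)

print(unset.stdout.strip(), is_set.stdout.strip())  # → None 1
```

This is why `export NOLINT_ON_COMPILE` alone has no effect on the scalastyle check, while `export NOLINT_ON_COMPILE=1` does.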
[jira] [Updated] (SPARK-21661) SparkSQL can't merge load table from Hadoop
[ https://issues.apache.org/jira/browse/SPARK-21661?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Dapeng Sun updated SPARK-21661: --- Description: Here is the original listing of the external table on HDFS: {noformat} Permission Owner Group Size Last Modified Replication Block Size Name -rw-r--r-- root supergroup 0 B 8/6/2017, 11:43:03 PM 3 256 MB income_band_001.dat -rw-r--r-- root supergroup 0 B 8/6/2017, 11:39:31 PM 3 256 MB income_band_002.dat ... -rw-r--r-- root supergroup 327 B 8/6/2017, 11:44:47 PM 3 256 MB income_band_530.dat {noformat} After a SparkSQL load, every file has an output file, even when the file is 0 B. For the same load on Hive, the data files would be merged according to the data size of the original files. Reproduce: {noformat} CREATE EXTERNAL TABLE t1 (a int,b string) STORED AS TEXTFILE LOCATION "hdfs://xxx:9000/data/t1" CREATE TABLE t2 STORED AS PARQUET AS SELECT * FROM t1; {noformat} The table t2 has many small files without data. was: Here is the original text of external table on HDFS: {noformat} Permission Owner Group SizeLast Modified Replication Block Size Name -rw-r--r-- rootsupergroup 0 B 8/6/2017, 11:43:03 PM 3 256 MB income_band_001.dat -rw-r--r-- rootsupergroup 0 B 8/6/2017, 11:39:31 PM 3 256 MB income_band_002.dat ... -rw-r--r-- rootsupergroup 327 B 8/6/2017, 11:44:47 PM 3 256 MB income_band_530.dat {noformat} After SparkSQL load, every files have a output file, even the files are 0B. For the load on Hive, the data files would be merged according the data size of original files. CREATE EXTERNAL TABLE t1 (a int,b string) > SparkSQL can't merge load table from Hadoop > --- > > Key: SPARK-21661 > URL: https://issues.apache.org/jira/browse/SPARK-21661 > Project: Spark > Issue Type: Improvement > Components: SQL >Affects Versions: 2.2.0 >Reporter: Dapeng Sun > > Here is the original listing of the external table on HDFS: > {noformat} > Permission Owner Group Size Last Modified Replication Block Size Name > -rw-r--r-- root supergroup 0 B 8/6/2017, 11:43:03 PM 3 256 MB income_band_001.dat > -rw-r--r-- root supergroup 0 B 8/6/2017, 11:39:31 PM 3 256 MB income_band_002.dat > ... > -rw-r--r-- root supergroup 327 B 8/6/2017, 11:44:47 PM 3 256 MB income_band_530.dat > {noformat} > After a SparkSQL load, every file has an output file, even when the file is 0 B. > For the same load on Hive, the data files would be merged according to the data size > of the original files. > Reproduce: > {noformat} > CREATE EXTERNAL TABLE t1 (a int,b string) STORED AS TEXTFILE LOCATION > "hdfs://xxx:9000/data/t1" > CREATE TABLE t2 STORED AS PARQUET AS SELECT * FROM t1; > {noformat} > The table t2 has many small files without data.
[jira] [Created] (SPARK-21660) Yarn ShuffleService failed to start when the chosen directory become read-only
lishuming created SPARK-21660: - Summary: Yarn ShuffleService failed to start when the chosen directory become read-only Key: SPARK-21660 URL: https://issues.apache.org/jira/browse/SPARK-21660 Project: Spark Issue Type: Bug Components: Shuffle, YARN Affects Versions: 2.1.1 Reporter: lishuming h3. Background In our production environment, disks become `read-only` almost once a month. The current strategy of the Yarn ShuffleService for choosing an available directory (disk) to store the shuffle info DB is as follows (https://github.com/apache/spark/blob/master/common/network-yarn/src/main/java/org/apache/spark/network/yarn/YarnShuffleService.java#L340): 1. If the NodeManager's recoveryPath is not empty and the shuffle DB exists in the recoveryPath, return the recoveryPath; 2. If the recoveryPath is empty and the shuffle DB exists in `yarn.nodemanager.local-dirs`, set the recoveryPath to the existing DB path and return that path; 3. If the recoveryPath is not empty (but the shuffle DB does not exist in it) and the shuffle DB exists in `yarn.nodemanager.local-dirs`, move the existing shuffle DB to the recoveryPath and return that path; 4. If none of the above hits, choose the first disk of `yarn.nodemanager.local-dirs` as the recoveryPath; None of these steps checks whether the chosen disk (directory) is writable, so in our environment we hit this exception: {code:java} 2017-06-25 07:15:43,512 ERROR org.apache.spark.network.util.LevelDBProvider: error opening leveldb file /mnt/dfs/12/yarn/local/registeredExecutors.ldb. Creating new file, will not be able to recover state for existing applications at org.apache.spark.network.util.LevelDBProvider.initLevelDB(LevelDBProvider.java:48) at org.apache.spark.network.shuffle.ExternalShuffleBlockResolver.<init>(ExternalShuffleBlockResolver.java:116) at org.apache.spark.network.shuffle.ExternalShuffleBlockResolver.<init>(ExternalShuffleBlockResolver.java:94) at org.apache.spark.network.shuffle.ExternalShuffleBlockHandler.<init>(ExternalShuffleBlockHandler.java:66) at org.apache.spark.network.yarn.YarnShuffleService.serviceInit(YarnShuffleService.java:167) 2017-06-25 07:15:43,514 WARN org.apache.spark.network.util.LevelDBProvider: error deleting /mnt/dfs/12/yarn/local/registeredExecutors.ldb 2017-06-25 07:15:43,515 INFO org.apache.hadoop.service.AbstractService: Service spark_shuffle failed in state INITED; cause: java.io.IOException: Unable to create state store at org.apache.spark.network.util.LevelDBProvider.initLevelDB(LevelDBProvider.java:77) at org.apache.spark.network.shuffle.ExternalShuffleBlockResolver.<init>(ExternalShuffleBlockResolver.java:116) at org.apache.spark.network.shuffle.ExternalShuffleBlockResolver.<init>(ExternalShuffleBlockResolver.java:94) at org.apache.spark.network.shuffle.ExternalShuffleBlockHandler.<init>(ExternalShuffleBlockHandler.java:66) at org.apache.spark.network.yarn.YarnShuffleService.serviceInit(YarnShuffleService.java:167) at org.apache.spark.network.util.LevelDBProvider.initLevelDB(LevelDBProvider.java:75) {code} h3. Consideration 1. In many production environments, `yarn.nodemanager.local-dirs` has more than one disk, so a better selection strategy could avoid the problem above; 2. Can we add a check that the chosen DB directory is writable, to avoid the problem above?
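The second consideration amounts to probing writability before committing to a directory. A hedged sketch of such a selection strategy (standalone Python with hypothetical names, not the YarnShuffleService code):

```python
import os
import tempfile

def choose_recovery_dir(local_dirs):
    """Return the first directory that exists and is actually writable,
    instead of blindly taking local_dirs[0]."""
    for d in local_dirs:
        if not os.path.isdir(d):
            continue
        try:
            # A real write probe is the reliable check for a read-only
            # mount; permission-style metadata can be stale.
            probe = tempfile.TemporaryFile(dir=d)
            probe.close()
            return d
        except OSError:
            continue
    raise OSError("no writable directory available for the shuffle state DB")

with tempfile.TemporaryDirectory() as good:
    # The nonexistent path stands in for a corrupt disk in this demo.
    print(choose_recovery_dir(["/does/not/exist", good]) == good)  # → True
```

Falling through to the next entry of `yarn.nodemanager.local-dirs` on probe failure is exactly the behavior the reporter asks for; raising only when every disk is unusable preserves the current failure mode as a last resort.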
[jira] [Commented] (SPARK-21631) Building Spark with SBT unsuccessful when source code in Mllib is modified, But with MVN is ok
[ https://issues.apache.org/jira/browse/SPARK-21631?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16117780#comment-16117780 ] Liang-Chi Hsieh commented on SPARK-21631: - [~ibingoogle] I saw you did {{export NOLINT_ON_COMPILE}}, I don't think it will work. Please do something like {{export NOLINT_ON_COMPILE=1}}. > Building Spark with SBT unsuccessful when source code in Mllib is modified, > But with MVN is ok > -- > > Key: SPARK-21631 > URL: https://issues.apache.org/jira/browse/SPARK-21631 > Project: Spark > Issue Type: Bug > Components: Build, MLlib >Affects Versions: 2.1.1 > Environment: ubuntu 14.04 > Spark 2.1.1 > MVN 3.3.9 > scala 2.11.8 >Reporter: Sean Wong > > I added > import org.apache.spark.internal.Logging > at the head of LinearRegression.scala file > Then, I try to build Spark using SBT. > However, here is the error: > *[info] Done packaging. > java.lang.RuntimeException: errors exist > at scala.sys.package$.error(package.scala:27) > at org.scalastyle.sbt.Tasks$.onHasErrors$1(Plugin.scala:132) > at > org.scalastyle.sbt.Tasks$.doScalastyleWithConfig$1(Plugin.scala:187) > at org.scalastyle.sbt.Tasks$.doScalastyle(Plugin.scala:195) > at > SparkBuild$$anonfun$cachedScalaStyle$1$$anonfun$17.apply(SparkBuild.scala:205) > at > SparkBuild$$anonfun$cachedScalaStyle$1$$anonfun$17.apply(SparkBuild.scala:192) > at sbt.FileFunction$$anonfun$cached$1.apply(Tracked.scala:235) > at sbt.FileFunction$$anonfun$cached$1.apply(Tracked.scala:235) > at > sbt.FileFunction$$anonfun$cached$2$$anonfun$apply$3$$anonfun$apply$4.apply(Tracked.scala:249) > at > sbt.FileFunction$$anonfun$cached$2$$anonfun$apply$3$$anonfun$apply$4.apply(Tracked.scala:245) > at sbt.Difference.apply(Tracked.scala:224) > at sbt.Difference.apply(Tracked.scala:206) > at > sbt.FileFunction$$anonfun$cached$2$$anonfun$apply$3.apply(Tracked.scala:245) > at > sbt.FileFunction$$anonfun$cached$2$$anonfun$apply$3.apply(Tracked.scala:244) > at sbt.Difference.apply(Tracked.scala:224) > at 
sbt.Difference.apply(Tracked.scala:200) > at sbt.FileFunction$$anonfun$cached$2.apply(Tracked.scala:244) > at sbt.FileFunction$$anonfun$cached$2.apply(Tracked.scala:242) > at SparkBuild$$anonfun$cachedScalaStyle$1.apply(SparkBuild.scala:212) > at SparkBuild$$anonfun$cachedScalaStyle$1.apply(SparkBuild.scala:187) > at scala.Function1$$anonfun$compose$1.apply(Function1.scala:47) > at sbt.$tilde$greater$$anonfun$$u2219$1.apply(TypeFunctions.scala:40) > at sbt.std.Transform$$anon$4.work(System.scala:63) > at > sbt.Execute$$anonfun$submit$1$$anonfun$apply$1.apply(Execute.scala:228) > at > sbt.Execute$$anonfun$submit$1$$anonfun$apply$1.apply(Execute.scala:228) > at sbt.ErrorHandling$.wideConvert(ErrorHandling.scala:17) > at sbt.Execute.work(Execute.scala:237) > at sbt.Execute$$anonfun$submit$1.apply(Execute.scala:228) > at sbt.Execute$$anonfun$submit$1.apply(Execute.scala:228) > at > sbt.ConcurrentRestrictions$$anon$4$$anonfun$1.apply(ConcurrentRestrictions.scala:159) > at sbt.CompletionService$$anon$2.call(CompletionService.scala:28) > at java.util.concurrent.FutureTask.run(FutureTask.java:266) > at > java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) > at java.util.concurrent.FutureTask.run(FutureTask.java:266) > at > java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) > at > java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) > at java.lang.Thread.run(Thread.java:745) > [error] (mllib/*:scalaStyleOnCompile) errors exist* > After this, I switched to MVN to build Spark; everything is OK and the > build is successful. > So is this a bug in the SBT build? -- This message was sent by Atlassian JIRA (v6.4.14#64029) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Updated] (SPARK-21306) OneVsRest Conceals Columns That May Be Relevant To Underlying Classifier
[ https://issues.apache.org/jira/browse/SPARK-21306?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Yanbo Liang updated SPARK-21306: Fix Version/s: 2.1.2 2.0.3 > OneVsRest Conceals Columns That May Be Relevant To Underlying Classifier > > > Key: SPARK-21306 > URL: https://issues.apache.org/jira/browse/SPARK-21306 > Project: Spark > Issue Type: Bug > Components: ML >Affects Versions: 2.1.1 >Reporter: Cathal Garvey >Assignee: Yan Facai (颜发才) >Priority: Critical > Labels: classification, ml > Fix For: 2.0.3, 2.1.2, 2.2.1, 2.3.0 > > > Hi folks, thanks for Spark! :) > I've been learning to use `ml` and `mllib`, and I've encountered a blocker > while trying to use `ml.classification.OneVsRest` with > `ml.classification.LogisticRegression`. Basically, [here in the > code|https://github.com/apache/spark/blob/master/mllib/src/main/scala/org/apache/spark/ml/classification/OneVsRest.scala#L320], > only two columns are being extracted and fed to the underlying classifiers; > however, with some configurations, more than two columns are required. > Specifically: I want to do multiclass learning with Logistic Regression on a > very imbalanced dataset, so I was planning to use weights. I set a column, > `"weight"`, as the inverse frequency of each field, and I configured my > `LogisticRegression` class to use this column, then put it in a `OneVsRest` > wrapper. > However, `OneVsRest` strips all but two columns out of a dataset before > training, so I get an error from within `LogisticRegression` that it can't > find the `"weight"` column. > It would be nice to have this fixed! I can see a few ways, but a very > conservative fix would be to include a parameter in `OneVsRest.fit` for > additional columns to `select` before passing to the underlying model. > Thanks!
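The conservative fix the reporter proposes (an extra-columns parameter on the fit path) can be sketched in miniature. This is a hypothetical toy over row-dicts, not the Spark ML OneVsRest implementation; the function and parameter names are assumptions:

```python
def one_vs_rest_fit(dataset, classifier, label_col="label", extra_cols=()):
    """Train one binary model per class. `extra_cols` lists columns
    (e.g. "weight") that must survive the per-class projection instead
    of being stripped along with everything but label and features."""
    classes = sorted({row[label_col] for row in dataset})
    models = {}
    for cls in classes:
        keep = {label_col, "features", *extra_cols}
        projected = [
            {**{k: v for k, v in row.items() if k in keep},
             label_col: 1 if row[label_col] == cls else 0}  # one-vs-rest relabel
            for row in dataset
        ]
        models[cls] = classifier(projected)
    return models

# A stub "classifier" that just records which columns it was given.
seen = []
stub = lambda rows: seen.append(sorted(rows[0])) or len(rows)

data = [{"label": 0, "features": [1.0], "weight": 2.0},
        {"label": 1, "features": [0.5], "weight": 0.5}]
one_vs_rest_fit(data, stub, extra_cols=("weight",))
print(seen[0])  # the weight column survives the projection
```

Without `extra_cols`, the projection drops `"weight"` and a weight-aware base classifier would fail exactly as described in the report.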
[jira] [Updated] (SPARK-21661) SparkSQL can't merge load table from Hadoop
[ https://issues.apache.org/jira/browse/SPARK-21661?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Dapeng Sun updated SPARK-21661: --- Description: Here is the original listing of the external table on HDFS: {noformat} Permission Owner Group Size Last Modified Replication Block Size Name -rw-r--r-- root supergroup 0 B 8/6/2017, 11:43:03 PM 3 256 MB income_band_001.dat -rw-r--r-- root supergroup 0 B 8/6/2017, 11:39:31 PM 3 256 MB income_band_002.dat ... -rw-r--r-- root supergroup 327 B 8/6/2017, 11:44:47 PM 3 256 MB income_band_530.dat {noformat} After a SparkSQL load, every file has an output file, even when the file is 0 B. For the same load on Hive, the data files would be merged according to the data size of the original files. CREATE EXTERNAL TABLE t1 (a int,b string) was: Here is the original text of external table on HDFS: {noformat} Permission Owner Group SizeLast Modified Replication Block Size Name -rw-r--r-- rootsupergroup 0 B 8/6/2017, 11:43:03 PM 3 256 MB income_band_001.dat -rw-r--r-- rootsupergroup 0 B 8/6/2017, 11:39:31 PM 3 256 MB income_band_002.dat ... -rw-r--r-- rootsupergroup 327 B 8/6/2017, 11:44:47 PM 3 256 MB income_band_530.dat {noformat} After SparkSQL load, every files have a output file, even the files are 0B. For the load on Hive, the data files would be merged according the data size of original files. > SparkSQL can't merge load table from Hadoop > --- > > Key: SPARK-21661 > URL: https://issues.apache.org/jira/browse/SPARK-21661 > Project: Spark > Issue Type: Improvement > Components: SQL >Affects Versions: 2.2.0 >Reporter: Dapeng Sun > > Here is the original listing of the external table on HDFS: > {noformat} > Permission Owner Group Size Last Modified Replication Block Size Name > -rw-r--r-- root supergroup 0 B 8/6/2017, 11:43:03 PM 3 256 MB income_band_001.dat > -rw-r--r-- root supergroup 0 B 8/6/2017, 11:39:31 PM 3 256 MB income_band_002.dat > ... > -rw-r--r-- root supergroup 327 B 8/6/2017, 11:44:47 PM 3 256 MB income_band_530.dat > {noformat} > After a SparkSQL load, every file has an output file, even when the file is 0 B. > For the same load on Hive, the data files would be merged according to the data size > of the original files. > CREATE EXTERNAL TABLE t1 (a int,b string)
[jira] [Comment Edited] (SPARK-21652) Optimizer cannot reach a fixed point on certain queries
[ https://issues.apache.org/jira/browse/SPARK-21652?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16116936#comment-16116936 ] Anton Okolnychyi edited comment on SPARK-21652 at 8/7/17 8:06 PM: -- Yes, disabling the constraint propagation helps because `InferFiltersFromConstraints` will not apply. I found several known issues regarding the performance of `InferFiltersFromConstraints`, but what about the logic of `ConstantPropagation` in the above example? Should it replace such predicates as `(a = b)` with `(1 = 1)` even if it is semantically correct? was (Author: aokolnychyi): Yes, disabling the constraint propagation helps because `InferFiltersFromConstraints` will not apply. I found several issues regarding the performance of InferFiltersFromConstraints but what about the logic of `ConstantPropagation` in the above example? Should it replace such predicates as `(a = b)` with `(1 = 1)` even if it is semantically correct? > Optimizer cannot reach a fixed point on certain queries > --- > > Key: SPARK-21652 > URL: https://issues.apache.org/jira/browse/SPARK-21652 > Project: Spark > Issue Type: Bug > Components: Optimizer, SQL >Affects Versions: 2.2.0 >Reporter: Anton Okolnychyi > > The optimizer cannot reach a fixed point on the following query: > {code} > Seq((1, 2)).toDF("col1", "col2").write.saveAsTable("t1") > Seq(1, 2).toDF("col").write.saveAsTable("t2") > spark.sql("SELECT * FROM t1, t2 WHERE t1.col1 = 1 AND 1 = t1.col2 AND t1.col1 > = t2.col AND t1.col2 = t2.col").explain(true) > {code} > At some point during the optimization, InferFiltersFromConstraints infers a > new constraint '(col2#33 = col1#32)' that is appended to the join condition, > then PushPredicateThroughJoin pushes it down, ConstantPropagation replaces > '(col2#33 = col1#32)' with '1 = 1' based on other propagated constraints, > ConstantFolding replaces '1 = 1' with 'true' and BooleanSimplification finally > removes this predicate.
However, InferFiltersFromConstraints will again infer > '(col2#33 = col1#32)' on the next iteration and the process will continue > until the limit of iterations is reached. > See below for more details > {noformat} > === Applying Rule > org.apache.spark.sql.catalyst.optimizer.InferFiltersFromConstraints === > !Join Inner, ((col1#32 = col#34) && (col2#33 = col#34)) > Join Inner, ((col2#33 = col1#32) && ((col1#32 = col#34) && > (col2#33 = col#34))) > :- Filter ((isnotnull(col1#32) && isnotnull(col2#33)) && ((col1#32 = 1) && > (1 = col2#33))) :- Filter ((isnotnull(col1#32) && isnotnull(col2#33)) && > ((col1#32 = 1) && (1 = col2#33))) > : +- Relation[col1#32,col2#33] parquet > : +- Relation[col1#32,col2#33] parquet > +- Filter ((1 = col#34) && isnotnull(col#34)) > +- Filter ((1 = col#34) && isnotnull(col#34)) > +- Relation[col#34] parquet >+- Relation[col#34] parquet > > === Applying Rule > org.apache.spark.sql.catalyst.optimizer.PushPredicateThroughJoin === > !Join Inner, ((col2#33 = col1#32) && ((col1#32 = col#34) && (col2#33 = > col#34))) Join Inner, ((col1#32 = col#34) && (col2#33 = col#34)) > !:- Filter ((isnotnull(col1#32) && isnotnull(col2#33)) && ((col1#32 = 1) && > (1 = col2#33))) :- Filter (col2#33 = col1#32) > !: +- Relation[col1#32,col2#33] parquet > : +- Filter ((isnotnull(col1#32) && isnotnull(col2#33)) && > ((col1#32 = 1) && (1 = col2#33))) > !+- Filter ((1 = col#34) && isnotnull(col#34)) > : +- Relation[col1#32,col2#33] parquet > ! +- Relation[col#34] parquet > +- Filter ((1 = col#34) && isnotnull(col#34)) > ! 
>+- Relation[col#34] parquet > > === Applying Rule org.apache.spark.sql.catalyst.optimizer.CombineFilters === > Join Inner, ((col1#32 = col#34) && (col2#33 = col#34)) >Join Inner, ((col1#32 = col#34) && (col2#33 = col#34)) > !:- Filter (col2#33 = col1#32) >:- Filter (((isnotnull(col1#32) && isnotnull(col2#33)) && > ((col1#32 = 1) && (1 = col2#33))) && (col2#33 = col1#32)) > !: +- Filter ((isnotnull(col1#32) && isnotnull(col2#33)) && ((col1#32 = 1) > && (1 = col2#33))) : +- Relation[col1#32,col2#33] parquet > !: +- Relation[col1#32,col2#33] parquet >
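The non-termination pattern described above (one rule re-adding what later rules just erased, so the end-of-batch plan never stabilizes) can be reproduced with a toy fixed-point executor. This is a hypothetical model of the rule loop, not Catalyst code; the plan is just a set of predicate strings:

```python
def fixed_point(plan, rules, max_iterations=100):
    """Apply a batch of rules until the plan stops changing or the
    iteration limit is reached (modeled on Catalyst's FixedPoint batches)."""
    for i in range(max_iterations):
        new_plan = plan
        for rule in rules:
            new_plan = rule(new_plan)
        if new_plan == plan:
            return plan, i  # converged: a full pass changed nothing
        plan = new_plan
    return plan, max_iterations  # never stabilized

# Stand-in for the InferFiltersFromConstraints / ConstantPropagation pair:
# each pass re-infers '(col2 = col1)' if absent and erases it if present,
# so the end-of-batch plan alternates between two states forever.
def infer_then_erase(plan):
    return plan ^ frozenset({"col2 = col1"})

plan, iters = fixed_point(frozenset(), [infer_then_erase])
print(iters)  # → 100: the loop hits the iteration limit instead of converging
```

A convergent batch (one whose net effect is eventually the identity) returns after a small number of iterations; the alternating batch burns the full budget, which is exactly the "Max iterations reached" symptom in the ticket.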
[jira] [Commented] (SPARK-21656) spark dynamic allocation should not idle timeout executors when tasks still to run
[ https://issues.apache.org/jira/browse/SPARK-21656?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16117217#comment-16117217 ] Sean Owen commented on SPARK-21656: --- I do not understand what the bug is. Configuration says an executor should go away if idle for X seconds. Configuration leads tasks to schedule on other executors for X seconds. It is correct that it is removed. You are claiming that it would help the application, but the application is not scheduling anything on the executor. It does not help the app to keep it alive. Right? This seems obvious, so we must be talking about something different. You're talking about a bunch of other logic, but what would it be based on? All of the data it has says the executor will be unused, indefinitely. > spark dynamic allocation should not idle timeout executors when tasks still > to run > -- > > Key: SPARK-21656 > URL: https://issues.apache.org/jira/browse/SPARK-21656 > Project: Spark > Issue Type: Bug > Components: Spark Core >Affects Versions: 2.1.1 >Reporter: Jong Yoon Lee > Fix For: 2.1.1 > > Original Estimate: 24h > Remaining Estimate: 24h > > Right now Spark lets go of executors when they have been idle for 60s (or a > configurable time). I have seen Spark let them go while idle even though they > were really needed. I have seen this issue when the scheduler was waiting to > get node locality, but that takes longer than the default idle timeout. In > these jobs the number of executors goes down to a really small number (less > than 10) while there are still around 80,000 tasks to run. > We should consider not allowing executors to idle timeout if they are still > needed according to the number of tasks to be run.
[jira] [Created] (SPARK-21656) spark dynamic allocation should not idle timeout executors when tasks still to run
Jong Yoon Lee created SPARK-21656: - Summary: spark dynamic allocation should not idle timeout executors when tasks still to run Key: SPARK-21656 URL: https://issues.apache.org/jira/browse/SPARK-21656 Project: Spark Issue Type: Improvement Components: Spark Core Affects Versions: 2.1.1 Reporter: Jong Yoon Lee Priority: Minor Fix For: 2.1.1 Right now Spark lets go of executors once they have been idle for 60s (or a configurable time). I have seen Spark release them while they were still really needed. I have seen this issue when the scheduler was waiting for node locality, which can take longer than the default idle timeout. In these jobs the number of executors drops very low (fewer than 10) while there are still on the order of 80,000 tasks left to run. We should consider not letting executors idle timeout if they are still needed according to the number of tasks remaining. -- This message was sent by Atlassian JIRA (v6.4.14#64029) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
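The proposal above — keep an idle executor alive while the pending-task backlog still needs it — can be sketched as a standalone policy function. This is an illustrative model, not Spark's actual ExecutorAllocationManager code; all names and the `tasks_per_executor` heuristic are hypothetical:

```python
def should_remove_idle_executor(idle_seconds, idle_timeout_s,
                                pending_tasks, tasks_per_executor,
                                live_executors):
    """Illustrative policy: release an idle executor only when the
    remaining executors can plausibly absorb the pending backlog."""
    if idle_seconds < idle_timeout_s:
        return False  # not idle long enough to consider removal
    # Executors still needed to cover the backlog (ceiling division).
    needed = -(-pending_tasks // tasks_per_executor)
    # Proposed change: keep the executor if removing it would leave
    # fewer executors than the backlog requires.
    return live_executors - 1 >= needed
```

With 80,000 tasks pending and only 10 executors, this model would refuse to release an executor no matter how long it has been idle, which is the behavior the issue asks for.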
[jira] [Commented] (SPARK-21656) spark dynamic allocation should not idle timeout executors when tasks still to run
[ https://issues.apache.org/jira/browse/SPARK-21656?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16116975#comment-16116975 ] Sean Owen commented on SPARK-21656: --- I don't see how an executor would be idle if there is a task to run, unless of course you changed the locality settings a lot. There's no real detail here that would establish a problem in Spark. > spark dynamic allocation should not idle timeout executors when tasks still > to run > -- > > Key: SPARK-21656 > URL: https://issues.apache.org/jira/browse/SPARK-21656 > Project: Spark > Issue Type: Improvement > Components: Spark Core >Affects Versions: 2.1.1 >Reporter: Jong Yoon Lee >Priority: Minor > Fix For: 2.1.1 > > Original Estimate: 24h > Remaining Estimate: 24h > > Right now spark lets go of executors when they are idle for the 60s (or > configurable time). I have seen spark let them go when they are idle but they > were really needed. I have seen this issue when the scheduler was waiting to > get node locality but that takes longer then the default idle timeout. In > these jobs the number of executors goes down really small (less than 10) but > there are still like 80,000 tasks to run. > We should consider not allowing executors to idle timeout if they are still > needed according to the number of tasks to be run. -- This message was sent by Atlassian JIRA (v6.4.14#64029) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Created] (SPARK-21657) Spark has exponential time complexity to explode(array of structs)
Ruslan Dautkhanov created SPARK-21657: - Summary: Spark has exponential time complexity to explode(array of structs) Key: SPARK-21657 URL: https://issues.apache.org/jira/browse/SPARK-21657 Project: Spark Issue Type: Bug Components: Spark Core, SQL Affects Versions: 2.2.0, 2.1.1, 2.1.0, 2.0.0 Reporter: Ruslan Dautkhanov Priority: Critical It can take up to half a day to explode a modest-sized nested collection (0.5M records) on recent Xeon processors. See the attached pyspark script that reproduces this problem. {code} cached_df = sqlc.sql('select individ, hholdid, explode(amft) from ' + table_name).cache() print sqlc.count() {code} This script generates a number of tables with the same total number of records across all nested collections (see the `scaling` variable in the loops). `scaling` scales up how many nested elements each record has, and by the same factor scales down the number of records in the table, so the total number of records stays the same. Time grows exponentially (notice the log-10 vertical axis scale). -- This message was sent by Atlassian JIRA (v6.4.14#64029) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
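The benchmark design described — scale the nested-collection size up while scaling the row count down by the same factor, so the total number of exploded records stays constant — can be sketched in plain Python. This is a hypothetical stand-in for the attached pyspark script, not the script itself; `explode` here is just a flattening loop, which scales linearly (the contrast is that Spark's explode reportedly does not):

```python
import time

def make_table(num_rows, nested_size):
    # Each row: (individ, hholdid, amft) where amft is the nested collection.
    return [(i, i % 100, list(range(nested_size))) for i in range(num_rows)]

def explode(table):
    # Plain-Python analogue of SQL explode(): one output row per nested element.
    return [(individ, hholdid, elem)
            for individ, hholdid, amft in table
            for elem in amft]

total = 400_000  # total exploded records, held constant across runs
for scaling in (1, 10, 100):
    table = make_table(total // scaling, scaling)
    start = time.perf_counter()
    n = len(explode(table))
    elapsed = time.perf_counter() - start
    assert n == total  # output size is invariant to the nesting factor
```

In this linear model, `elapsed` stays roughly flat across `scaling` values; the issue reports that in Spark the equivalent runs grow exponentially instead.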
[jira] [Updated] (SPARK-21374) Reading globbed paths from S3 into DF doesn't work if filesystem caching is disabled
[ https://issues.apache.org/jira/browse/SPARK-21374?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Shixiong Zhu updated SPARK-21374: - Fix Version/s: 2.2.1 > Reading globbed paths from S3 into DF doesn't work if filesystem caching is > disabled > > > Key: SPARK-21374 > URL: https://issues.apache.org/jira/browse/SPARK-21374 > Project: Spark > Issue Type: Bug > Components: Spark Core >Affects Versions: 2.0.2, 2.1.1 >Reporter: Andrey Taptunov >Assignee: Andrey Taptunov > Fix For: 2.2.1, 2.3.0 > > > *Motivation:* > In my case I want to disable the filesystem cache to be able to change S3's access key and secret key on the fly, to read from buckets with different permissions. This works perfectly fine for RDDs but doesn't work for DataFrames.
> *Example (works for RDD but fails for DataFrame):*
> {code:java}
> import org.apache.spark.SparkContext
> import org.apache.spark.SparkConf
> import org.apache.spark.sql.SparkSession
>
> object SimpleApp {
>   def main(args: Array[String]) {
>     val awsAccessKeyId = "something"
>     val awsSecretKey = "something else"
>     val conf = new SparkConf().setAppName("Simple Application").setMaster("local[*]")
>     val sc = new SparkContext(conf)
>     sc.hadoopConfiguration.set("fs.s3.awsAccessKeyId", awsAccessKeyId)
>     sc.hadoopConfiguration.set("fs.s3.awsSecretAccessKey", awsSecretKey)
>     sc.hadoopConfiguration.setBoolean("fs.s3.impl.disable.cache", true)
>     sc.hadoopConfiguration.set("fs.s3.impl", "org.apache.hadoop.fs.s3native.NativeS3FileSystem")
>     sc.hadoopConfiguration.set("fs.s3.buffer.dir", "/tmp")
>
>     val spark = SparkSession.builder().config(conf).getOrCreate()
>
>     val rddFile = sc.textFile("s3://bucket/file.csv").count // ok
>     val rddGlob = sc.textFile("s3://bucket/*").count // ok
>     val dfFile = spark.read.format("csv").load("s3://bucket/file.csv").count // ok
>
>     val dfGlob = spark.read.format("csv").load("s3://bucket/*").count
>     // IllegalArgumentException: AWS Access Key ID and Secret Access Key must be specified
>     // as the username or password (respectively) of a s3 URL, or by setting the
>     // fs.s3.awsAccessKeyId or fs.s3.awsSecretAccessKey properties (respectively).
>
>     sc.stop()
>   }
> }
> {code}
-- This message was sent by Atlassian JIRA (v6.4.14#64029) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Commented] (SPARK-21655) Kill CLI for Yarn mode
[ https://issues.apache.org/jira/browse/SPARK-21655?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16116989#comment-16116989 ] Thomas Graves commented on SPARK-21655: --- The UI kill requests are acl protected. You do need to have auth for your UI for that to work properly but I don't see any security issue there as you can enforce it if you want to. Unless there was something else you were referring to? > Kill CLI for Yarn mode > -- > > Key: SPARK-21655 > URL: https://issues.apache.org/jira/browse/SPARK-21655 > Project: Spark > Issue Type: Improvement > Components: YARN >Affects Versions: 2.1.1 >Reporter: Jong Yoon Lee >Priority: Minor > Fix For: 2.1.1 > > Original Estimate: 168h > Remaining Estimate: 168h > > Similar to how standalone and Mesos have the capability to safely shut down > the spark application, there should be a way to safely shut down spark on > Yarn mode. This will ensure a clean shutdown and unregistration from yarn. > This is the design doc: > https://docs.google.com/document/d/1QG8hITjLNi1D9dVR3b_hZkyrGm5FFm0u9M1KGM4y1Ak/edit?usp=sharing > and I will upload the patch soon -- This message was sent by Atlassian JIRA (v6.4.14#64029) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Updated] (SPARK-21657) Spark has exponential time complexity to explode(array of structs)
[ https://issues.apache.org/jira/browse/SPARK-21657?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Ruslan Dautkhanov updated SPARK-21657: -- Description: It can take up to half a day to explode a modest-sizes nested collection (0.5m). On a recent Xeon processors. See attached pyspark script that reproduces this problem. {code} cached_df = sqlc.sql('select individ, hholdid, explode(amft) from ' + table_name).cache() print sqlc.count() {code} This script generate a number of tables, with the same total number of records across all nested collection (see `scaling` variable in loops). `scaling` variable scales up how many nested elements in each record, but by the same factor scales down number of records in the table. So total number of records stays the same. Time grows exponentially (notice log-10 vertical axis scale): !ExponentialTimeGrowth.PNG! At scaling 50,000 it took 7 hours to explode the nested collections (\!) of 8k records. After 1000 elements in nested collection, time grows exponentially. was: It can take up to half a day to explode a modest-sizes nested collection (0.5m). On a recent Xeon processors. See attached pyspark script that reproduces this problem. {code} cached_df = sqlc.sql('select individ, hholdid, explode(amft) from ' + table_name).cache() print sqlc.count() {code} This script generate a number of tables, with the same total number of records across all nested collection (see `scaling` variable in loops). `scaling` variable scales up how many nested elements in each record, but by the same factor scales down number of records in the table. So total number of records stays the same. Time grows exponentially (notice log-10 vertical axis scale): !ExponentialTimeGrowth.PNG! At scaling 50,000 it took 7 hours to explode the nested collections (\!) of 8k records. 
> Spark has exponential time complexity to explode(array of structs) > -- > > Key: SPARK-21657 > URL: https://issues.apache.org/jira/browse/SPARK-21657 > Project: Spark > Issue Type: Bug > Components: Spark Core, SQL >Affects Versions: 2.0.0, 2.1.0, 2.1.1, 2.2.0 >Reporter: Ruslan Dautkhanov >Priority: Critical > Attachments: ExponentialTimeGrowth.PNG, > nested-data-generator-and-test.py > > > It can take up to half a day to explode a modest-sizes nested collection > (0.5m). > On a recent Xeon processors. > See attached pyspark script that reproduces this problem. > {code} > cached_df = sqlc.sql('select individ, hholdid, explode(amft) from ' + > table_name).cache() > print sqlc.count() > {code} > This script generate a number of tables, with the same total number of > records across all nested collection (see `scaling` variable in loops). > `scaling` variable scales up how many nested elements in each record, but by > the same factor scales down number of records in the table. So total number > of records stays the same. > Time grows exponentially (notice log-10 vertical axis scale): > !ExponentialTimeGrowth.PNG! > At scaling 50,000 it took 7 hours to explode the nested collections (\!) of > 8k records. > After 1000 elements in nested collection, time grows exponentially. -- This message was sent by Atlassian JIRA (v6.4.14#64029) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Updated] (SPARK-21657) Spark has exponential time complexity to explode(array of structs)
[ https://issues.apache.org/jira/browse/SPARK-21657?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Ruslan Dautkhanov updated SPARK-21657: -- Attachment: ExponentialTimeGrowth.PNG nested-data-generator-and-test.py > Spark has exponential time complexity to explode(array of structs) > -- > > Key: SPARK-21657 > URL: https://issues.apache.org/jira/browse/SPARK-21657 > Project: Spark > Issue Type: Bug > Components: Spark Core, SQL >Affects Versions: 2.0.0, 2.1.0, 2.1.1, 2.2.0 >Reporter: Ruslan Dautkhanov >Priority: Critical > Attachments: ExponentialTimeGrowth.PNG, > nested-data-generator-and-test.py > > > It can take up to half a day to explode a modest-sizes nested collection > (0.5m). > On a recent Xeon processors. > See attached pyspark script that reproduces this problem. > {code} > cached_df = sqlc.sql('select individ, hholdid, explode(amft) from ' + > table_name).cache() > print sqlc.count() > {code} > This script generate a number of tables, with the same total number of > records across all nested collection (see `scaling` variable in loops). > `scaling` variable scales up how many nested elements in each record, but by > the same factor scales down number of records in the table. So total number > of records stays the same. > Time grows exponentially (notice log-10 vertical axis scale). -- This message was sent by Atlassian JIRA (v6.4.14#64029) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Updated] (SPARK-21657) Spark has exponential time complexity to explode(array of structs)
[ https://issues.apache.org/jira/browse/SPARK-21657?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Ruslan Dautkhanov updated SPARK-21657: -- Description: It can take up to half a day to explode a modest-sizes nested collection (0.5m). On a recent Xeon processors. See attached pyspark script that reproduces this problem. {code} cached_df = sqlc.sql('select individ, hholdid, explode(amft) from ' + table_name).cache() print sqlc.count() {code} This script generate a number of tables, with the same total number of records across all nested collection (see `scaling` variable in loops). `scaling` variable scales up how many nested elements in each record, but by the same factor scales down number of records in the table. So total number of records stays the same. Time grows exponentially (notice log-10 vertical axis scale): !ExponentialTimeGrowth.PNG! At scaling 50,000 it took 7 hours to explode the nested collections (\!) of 8k records. was: It can take up to half a day to explode a modest-sizes nested collection (0.5m). On a recent Xeon processors. See attached pyspark script that reproduces this problem. {code} cached_df = sqlc.sql('select individ, hholdid, explode(amft) from ' + table_name).cache() print sqlc.count() {code} This script generate a number of tables, with the same total number of records across all nested collection (see `scaling` variable in loops). `scaling` variable scales up how many nested elements in each record, but by the same factor scales down number of records in the table. So total number of records stays the same. Time grows exponentially (notice log-10 vertical axis scale). 
> Spark has exponential time complexity to explode(array of structs) > -- > > Key: SPARK-21657 > URL: https://issues.apache.org/jira/browse/SPARK-21657 > Project: Spark > Issue Type: Bug > Components: Spark Core, SQL >Affects Versions: 2.0.0, 2.1.0, 2.1.1, 2.2.0 >Reporter: Ruslan Dautkhanov >Priority: Critical > Attachments: ExponentialTimeGrowth.PNG, > nested-data-generator-and-test.py > > > It can take up to half a day to explode a modest-sizes nested collection > (0.5m). > On a recent Xeon processors. > See attached pyspark script that reproduces this problem. > {code} > cached_df = sqlc.sql('select individ, hholdid, explode(amft) from ' + > table_name).cache() > print sqlc.count() > {code} > This script generate a number of tables, with the same total number of > records across all nested collection (see `scaling` variable in loops). > `scaling` variable scales up how many nested elements in each record, but by > the same factor scales down number of records in the table. So total number > of records stays the same. > Time grows exponentially (notice log-10 vertical axis scale): > !ExponentialTimeGrowth.PNG! > At scaling 50,000 it took 7 hours to explode the nested collections (\!) of > 8k records. -- This message was sent by Atlassian JIRA (v6.4.14#64029) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Updated] (SPARK-21657) Spark has exponential time complexity to explode(array of structs)
[ https://issues.apache.org/jira/browse/SPARK-21657?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Sean Owen updated SPARK-21657: -- Priority: Major (was: Critical) Issue Type: Improvement (was: Bug) (Not a bug) I doubt this is meant to be efficient at the scale you're using it. Is this a real use case? What change are you proposing? > Spark has exponential time complexity to explode(array of structs) > -- > > Key: SPARK-21657 > URL: https://issues.apache.org/jira/browse/SPARK-21657 > Project: Spark > Issue Type: Improvement > Components: Spark Core, SQL >Affects Versions: 2.0.0, 2.1.0, 2.1.1, 2.2.0 >Reporter: Ruslan Dautkhanov > Attachments: ExponentialTimeGrowth.PNG, > nested-data-generator-and-test.py > > > It can take up to half a day to explode a modest-sizes nested collection > (0.5m). > On a recent Xeon processors. > See attached pyspark script that reproduces this problem. > {code} > cached_df = sqlc.sql('select individ, hholdid, explode(amft) from ' + > table_name).cache() > print sqlc.count() > {code} > This script generate a number of tables, with the same total number of > records across all nested collection (see `scaling` variable in loops). > `scaling` variable scales up how many nested elements in each record, but by > the same factor scales down number of records in the table. So total number > of records stays the same. > Time grows exponentially (notice log-10 vertical axis scale): > !ExponentialTimeGrowth.PNG! > At scaling 50,000 it took 7 hours to explode the nested collections (\!) of > 8k records. > After 1000 elements in nested collection, time grows exponentially. -- This message was sent by Atlassian JIRA (v6.4.14#64029) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Commented] (SPARK-21657) Spark has exponential time complexity to explode(array of structs)
[ https://issues.apache.org/jira/browse/SPARK-21657?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16117051#comment-16117051 ] Ruslan Dautkhanov commented on SPARK-21657: --- Absolutely, this is a real use case. We have a lot of production data that relies on that kind of schema for BI reporting. Other Hadoop SQL engines, including Hive and Impala, scale their time to explode nested collections linearly; Spark's time to explode a nested collection grows exponentially. There is definitely room for improvement: after ~40k+ records in a nested collection, most of the job's time is spent exploding; after ~200k+ records in a nested collection, Spark is not usable. > Spark has exponential time complexity to explode(array of structs) > -- > > Key: SPARK-21657 > URL: https://issues.apache.org/jira/browse/SPARK-21657 > Project: Spark > Issue Type: Improvement > Components: Spark Core, SQL >Affects Versions: 2.0.0, 2.1.0, 2.1.1, 2.2.0 >Reporter: Ruslan Dautkhanov > Attachments: ExponentialTimeGrowth.PNG, > nested-data-generator-and-test.py > > > It can take up to half a day to explode a modest-sizes nested collection > (0.5m). > On a recent Xeon processors. > See attached pyspark script that reproduces this problem. > {code} > cached_df = sqlc.sql('select individ, hholdid, explode(amft) from ' + > table_name).cache() > print sqlc.count() > {code} > This script generate a number of tables, with the same total number of > records across all nested collection (see `scaling` variable in loops). > `scaling` variable scales up how many nested elements in each record, but by > the same factor scales down number of records in the table. So total number > of records stays the same. > Time grows exponentially (notice log-10 vertical axis scale): > !ExponentialTimeGrowth.PNG! > At scaling 50,000 it took 7 hours to explode the nested collections (\!) of > 8k records. > After 1000 elements in nested collection, time grows exponentially. 
-- This message was sent by Atlassian JIRA (v6.4.14#64029) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Updated] (SPARK-21656) spark dynamic allocation should not idle timeout executors when tasks still to run
[ https://issues.apache.org/jira/browse/SPARK-21656?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Thomas Graves updated SPARK-21656: -- Priority: Major (was: Minor) > spark dynamic allocation should not idle timeout executors when tasks still > to run > -- > > Key: SPARK-21656 > URL: https://issues.apache.org/jira/browse/SPARK-21656 > Project: Spark > Issue Type: Bug > Components: Spark Core >Affects Versions: 2.1.1 >Reporter: Jong Yoon Lee > Fix For: 2.1.1 > > Original Estimate: 24h > Remaining Estimate: 24h > > Right now spark lets go of executors when they are idle for the 60s (or > configurable time). I have seen spark let them go when they are idle but they > were really needed. I have seen this issue when the scheduler was waiting to > get node locality but that takes longer then the default idle timeout. In > these jobs the number of executors goes down really small (less than 10) but > there are still like 80,000 tasks to run. > We should consider not allowing executors to idle timeout if they are still > needed according to the number of tasks to be run. -- This message was sent by Atlassian JIRA (v6.4.14#64029) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Updated] (SPARK-21656) spark dynamic allocation should not idle timeout executors when tasks still to run
[ https://issues.apache.org/jira/browse/SPARK-21656?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Thomas Graves updated SPARK-21656: -- Issue Type: Bug (was: Improvement) > spark dynamic allocation should not idle timeout executors when tasks still > to run > -- > > Key: SPARK-21656 > URL: https://issues.apache.org/jira/browse/SPARK-21656 > Project: Spark > Issue Type: Bug > Components: Spark Core >Affects Versions: 2.1.1 >Reporter: Jong Yoon Lee >Priority: Minor > Fix For: 2.1.1 > > Original Estimate: 24h > Remaining Estimate: 24h > > Right now spark lets go of executors when they are idle for the 60s (or > configurable time). I have seen spark let them go when they are idle but they > were really needed. I have seen this issue when the scheduler was waiting to > get node locality but that takes longer then the default idle timeout. In > these jobs the number of executors goes down really small (less than 10) but > there are still like 80,000 tasks to run. > We should consider not allowing executors to idle timeout if they are still > needed according to the number of tasks to be run. -- This message was sent by Atlassian JIRA (v6.4.14#64029) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Updated] (SPARK-21657) Spark has exponential time complexity to explode(array of structs)
[ https://issues.apache.org/jira/browse/SPARK-21657?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Ruslan Dautkhanov updated SPARK-21657: -- Labels: cache caching collections nested_types performance pyspark sparksql sql (was: ) > Spark has exponential time complexity to explode(array of structs) > -- > > Key: SPARK-21657 > URL: https://issues.apache.org/jira/browse/SPARK-21657 > Project: Spark > Issue Type: Improvement > Components: Spark Core, SQL >Affects Versions: 2.0.0, 2.1.0, 2.1.1, 2.2.0 >Reporter: Ruslan Dautkhanov > Labels: cache, caching, collections, nested_types, performance, > pyspark, sparksql, sql > Attachments: ExponentialTimeGrowth.PNG, > nested-data-generator-and-test.py > > > It can take up to half a day to explode a modest-sizes nested collection > (0.5m). > On a recent Xeon processors. > See attached pyspark script that reproduces this problem. > {code} > cached_df = sqlc.sql('select individ, hholdid, explode(amft) from ' + > table_name).cache() > print sqlc.count() > {code} > This script generate a number of tables, with the same total number of > records across all nested collection (see `scaling` variable in loops). > `scaling` variable scales up how many nested elements in each record, but by > the same factor scales down number of records in the table. So total number > of records stays the same. > Time grows exponentially (notice log-10 vertical axis scale): > !ExponentialTimeGrowth.PNG! > At scaling 50,000 it took 7 hours to explode the nested collections (\!) of > 8k records. > After 1000 elements in nested collection, time grows exponentially. -- This message was sent by Atlassian JIRA (v6.4.14#64029) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Commented] (SPARK-21656) spark dynamic allocation should not idle timeout executors when tasks still to run
[ https://issues.apache.org/jira/browse/SPARK-21656?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16117086#comment-16117086 ] Thomas Graves commented on SPARK-21656: --- The executor can be idle if the scheduler doesn't put any tasks on it. The scheduler can skip executors due to the locality settings (spark.locality.wait.node). We have seen this many times now, where it gets into a harmonic where some executors get node locality and others don't. The scheduler skips many of the executors that don't get locality, and eventually they idle-timeout while there are tens of thousands of tasks left. We generally see this with very large jobs that have like 1000 executors, 15 map tasks. We shouldn't allow them to idle timeout if we still need them. > spark dynamic allocation should not idle timeout executors when tasks still > to run > -- > > Key: SPARK-21656 > URL: https://issues.apache.org/jira/browse/SPARK-21656 > Project: Spark > Issue Type: Bug > Components: Spark Core >Affects Versions: 2.1.1 >Reporter: Jong Yoon Lee > Fix For: 2.1.1 > > Original Estimate: 24h > Remaining Estimate: 24h > > Right now spark lets go of executors when they are idle for the 60s (or > configurable time). I have seen spark let them go when they are idle but they > were really needed. I have seen this issue when the scheduler was waiting to > get node locality but that takes longer then the default idle timeout. In > these jobs the number of executors goes down really small (less than 10) but > there are still like 80,000 tasks to run. > We should consider not allowing executors to idle timeout if they are still > needed according to the number of tasks to be run. -- This message was sent by Atlassian JIRA (v6.4.14#64029) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
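The interaction Thomas describes — delay scheduling leaving an executor idle even with a large backlog — can be modeled in a few lines. This is an illustrative simplification, not Spark's actual TaskSetManager logic; `spark.locality.wait` defaults to 3s per locality level:

```python
def schedule_offer(executor_host, pending_tasks, waited_s, locality_wait_s=3.0):
    """Toy delay-scheduling model: prefer node-local tasks; fall back to
    non-local tasks only after the locality wait has elapsed."""
    for task in pending_tasks:
        if task["preferred_host"] == executor_host:
            return task  # node-local match: schedule immediately
    if waited_s >= locality_wait_s and pending_tasks:
        return pending_tasks[0]  # locality wait expired: run anything
    return None  # executor stays idle despite a pending backlog
```

When successful node-local launches keep resetting the wait, an executor on a less-preferred host can keep returning `None` and eventually hit the 60s idle timeout while tens of thousands of tasks are still pending — the harmonic described above.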
[jira] [Commented] (SPARK-650) Add a "setup hook" API for running initialization code on each executor
[ https://issues.apache.org/jira/browse/SPARK-650?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16117090#comment-16117090 ] Louis Bergelson commented on SPARK-650: --- [~srowen] Thanks for the reply and the example. Unfortunately, I still believe that the singleton approach doesn't work well for our use case. We don't have a single resource which needs initialization and can always be wrapped in a singleton. We have a sprawl of legacy dependencies that need to be initialized in certain ways before use, and then can be called into from literally hundreds of entry points. One of the things that needs initializing is the set of FileSystemProviders that [~rdub] mentioned above. This has to be done before potentially any file access in our dependencies. It's implausible to wrap all of our library code into singleton objects and it's difficult to always call initResources() before every library call. It requires a lot of discipline on the part of the developers. Since we develop a framework for biologists to use to write tools, anything that has to be enforced by convention isn't ideal and is likely to cause problems. People will forget to start their work by calling initResources() or worse, they'll remember to call initResources(), but only at the start of the first stage. Then they'll run into issues when executors die and are replaced during a later stage and the initialization doesn't run on the new executor. For something that could be cleanly wrapped in a singleton I agree that the semantics are obvious, but for the case where you're calling init() before running your code, the semantics are confusing and error-prone. I'm sure there are complications from introducing a setup hook, but the one you mention seems simple enough to me. If a setup fails, that executor is killed and can't schedule tasks. 
There would probably have to be a mechanism for timing out after a certain number of failed executor starts, but I suspect that that exists already in some fashion for other sorts of failures. > Add a "setup hook" API for running initialization code on each executor > --- > > Key: SPARK-650 > URL: https://issues.apache.org/jira/browse/SPARK-650 > Project: Spark > Issue Type: New Feature > Components: Spark Core >Reporter: Matei Zaharia >Priority: Minor > > Would be useful to configure things like reporting libraries -- This message was sent by Atlassian JIRA (v6.4.14#64029) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Commented] (SPARK-650) Add a "setup hook" API for running initialization code on each executor
[ https://issues.apache.org/jira/browse/SPARK-650?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16117114#comment-16117114 ] Sean Owen commented on SPARK-650: - I can also imagine cases involving legacy code that make this approach hard to implement. Still, it's possible with enough 'discipline', but this is true of wrangling any legacy code. I don't think the question of semantics is fully appreciated here. Is killing the app's other tasks on the same executor reasonable behavior? how many failures are allowed by default by this new mechanism? what do you do if init never returns? for how long? Are you willing to reschedule the task on another executor? how does it interact with locality? I know, any change raises questions, but this one raises a lot. It's a conceptual change in Spark and I'm just sure it's not going to happen 3 years in. Tasks have never had special status or lifecycle w.r.t. executors and that's a positive thing, really. > Add a "setup hook" API for running initialization code on each executor > --- > > Key: SPARK-650 > URL: https://issues.apache.org/jira/browse/SPARK-650 > Project: Spark > Issue Type: New Feature > Components: Spark Core >Reporter: Matei Zaharia >Priority: Minor > > Would be useful to configure things like reporting libraries -- This message was sent by Atlassian JIRA (v6.4.14#64029) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Commented] (SPARK-21656) spark dynamic allocation should not idle timeout executors when tasks still to run
[ https://issues.apache.org/jira/browse/SPARK-21656?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16117122#comment-16117122 ] Sean Owen commented on SPARK-21656: --- Good point. In that case, what's wrong with killing the executor? if the scheduler is consistently preferring locality enough to let those executors go idle -- either those settings are wrong or those executors aren't needed. What's the argument that the app needs them if no tasks are scheduling? > spark dynamic allocation should not idle timeout executors when tasks still > to run > -- > > Key: SPARK-21656 > URL: https://issues.apache.org/jira/browse/SPARK-21656 > Project: Spark > Issue Type: Bug > Components: Spark Core >Affects Versions: 2.1.1 >Reporter: Jong Yoon Lee > Fix For: 2.1.1 > > Original Estimate: 24h > Remaining Estimate: 24h > > Right now spark lets go of executors when they are idle for the 60s (or > configurable time). I have seen spark let them go when they are idle but they > were really needed. I have seen this issue when the scheduler was waiting to > get node locality but that takes longer then the default idle timeout. In > these jobs the number of executors goes down really small (less than 10) but > there are still like 80,000 tasks to run. > We should consider not allowing executors to idle timeout if they are still > needed according to the number of tasks to be run. -- This message was sent by Atlassian JIRA (v6.4.14#64029) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Commented] (SPARK-21656) spark dynamic allocation should not idle timeout executors when tasks still to run
[ https://issues.apache.org/jira/browse/SPARK-21656?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16117159#comment-16117159 ] Thomas Graves commented on SPARK-21656: --- If given more time, the scheduler would have fallen back to use those executors for rack-local or any locality. Yes, you can get around this by changing the locality settings (which is what the workaround is), but I don't think that is what should happen. It's two features conflicting via timeouts, and it is the defaults we ship with that cause bad things to happen. I do think we should look at the locality logic in the scheduler more to see if there is anything to improve there, but I haven't had time to do that. The thing is that dynamic allocation never gets more executors for the same stage once it has acquired them and let them idle timeout. So if you hit some weird situation, you end up with very few executors to run thousands of tasks. In my opinion it's better to hold those executors and let the normal scheduler logic work. We could add a config flag for this if needed, if people would like this behavior, but I think that conflicts with the scheduler logic. > spark dynamic allocation should not idle timeout executors when tasks still > to run > -- > > Key: SPARK-21656 > URL: https://issues.apache.org/jira/browse/SPARK-21656 > Project: Spark > Issue Type: Bug > Components: Spark Core >Affects Versions: 2.1.1 >Reporter: Jong Yoon Lee > Fix For: 2.1.1 > > Original Estimate: 24h > Remaining Estimate: 24h > > Right now spark lets go of executors when they are idle for the 60s (or > configurable time). I have seen spark let them go when they are idle but they > were really needed. I have seen this issue when the scheduler was waiting to > get node locality but that takes longer then the default idle timeout. In > these jobs the number of executors goes down really small (less than 10) but > there are still like 80,000 tasks to run. 
> We should consider not allowing executors to idle timeout if they are still > needed according to the number of tasks to be run. -- This message was sent by Atlassian JIRA (v6.4.14#64029) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Commented] (SPARK-21656) spark dynamic allocation should not idle timeout executors when tasks still to run
[ https://issues.apache.org/jira/browse/SPARK-21656?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16117167#comment-16117167 ] Sean Owen commented on SPARK-21656: --- If the issue is "given more time" then increase the idle timeout? or indeed the locality settings. Why does this need another configuration? It sounds like it's at best a change to defaults, but, how about start by having the app care less about locality? It doesn't make sense to say that executors that are by definition not needed according to a user's config should not be reclaimed because the config is wrong. > spark dynamic allocation should not idle timeout executors when tasks still > to run > -- > > Key: SPARK-21656 > URL: https://issues.apache.org/jira/browse/SPARK-21656 > Project: Spark > Issue Type: Bug > Components: Spark Core >Affects Versions: 2.1.1 >Reporter: Jong Yoon Lee > Fix For: 2.1.1 > > Original Estimate: 24h > Remaining Estimate: 24h > > Right now spark lets go of executors when they are idle for the 60s (or > configurable time). I have seen spark let them go when they are idle but they > were really needed. I have seen this issue when the scheduler was waiting to > get node locality but that takes longer then the default idle timeout. In > these jobs the number of executors goes down really small (less than 10) but > there are still like 80,000 tasks to run. > We should consider not allowing executors to idle timeout if they are still > needed according to the number of tasks to be run. -- This message was sent by Atlassian JIRA (v6.4.14#64029) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
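The two knobs referred to above already exist as standard Spark configuration. A sketch of tuning them at submit time; the values are illustrative examples, not recommendations:

```shell
# Either give idle executors longer to live, or make the scheduler less
# willing to wait for node locality (the workaround discussed in this thread).
spark-submit \
  --conf spark.dynamicAllocation.enabled=true \
  --conf spark.dynamicAllocation.executorIdleTimeout=300s \
  --conf spark.locality.wait=0s \
  my_job.py
```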
[jira] [Resolved] (SPARK-21565) aggregate query fails with watermark on eventTime but works with watermark on timestamp column generated by current_timestamp
[ https://issues.apache.org/jira/browse/SPARK-21565?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Shixiong Zhu resolved SPARK-21565. -- Resolution: Fixed Fix Version/s: 2.3.0 2.2.1 > aggregate query fails with watermark on eventTime but works with watermark on > timestamp column generated by current_timestamp > - > > Key: SPARK-21565 > URL: https://issues.apache.org/jira/browse/SPARK-21565 > Project: Spark > Issue Type: Bug > Components: Structured Streaming >Affects Versions: 2.2.0 >Reporter: Amit Assudani > Fix For: 2.2.1, 2.3.0 > > > *Short Description: * > Aggregation query fails with eventTime as watermark column while works with > newTimeStamp column generated by running SQL with current_timestamp, > *Exception:* > {code} > Caused by: java.util.NoSuchElementException: None.get > at scala.None$.get(Option.scala:347) > at scala.None$.get(Option.scala:345) > at > org.apache.spark.sql.execution.streaming.StateStoreSaveExec$$anonfun$doExecute$3.apply(statefulOperators.scala:204) > at > org.apache.spark.sql.execution.streaming.StateStoreSaveExec$$anonfun$doExecute$3.apply(statefulOperators.scala:172) > at > org.apache.spark.sql.execution.streaming.state.package$StateStoreOps$$anonfun$1.apply(package.scala:70) > at > org.apache.spark.sql.execution.streaming.state.package$StateStoreOps$$anonfun$1.apply(package.scala:65) > at > org.apache.spark.sql.execution.streaming.state.StateStoreRDD.compute(StateStoreRDD.scala:64) > at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323) > at org.apache.spark.rdd.RDD.iterator(RDD.scala:287) > at > org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38) > at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323) > at org.apache.spark.rdd.RDD.iterator(RDD.scala:287) > at > org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38) > at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323) > at org.apache.spark.rdd.RDD.iterator(RDD.scala:287) > {code} 
> *Code to replicate:* > {code} > package test > import java.nio.file.{Files, Path, Paths} > import java.text.SimpleDateFormat > import org.apache.spark.sql.types._ > import org.apache.spark.sql.{SparkSession} > import scala.collection.JavaConverters._ > object Test1 { > def main(args: Array[String]) { > val sparkSession = SparkSession > .builder() > .master("local[*]") > .appName("Spark SQL basic example") > .config("spark.some.config.option", "some-value") > .getOrCreate() > val sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss") > val checkpointPath = "target/cp1" > val newEventsPath = Paths.get("target/newEvents/").toAbsolutePath > delete(newEventsPath) > delete(Paths.get(checkpointPath).toAbsolutePath) > Files.createDirectories(newEventsPath) > val dfNewEvents= newEvents(sparkSession) > dfNewEvents.createOrReplaceTempView("dfNewEvents") > //The below works - Start > //val dfNewEvents2 = sparkSession.sql("select *,current_timestamp as > newTimeStamp from dfNewEvents ").withWatermark("newTimeStamp","2 seconds") > //dfNewEvents2.createOrReplaceTempView("dfNewEvents2") > //val groupEvents = sparkSession.sql("select symbol,newTimeStamp, > count(price) as count1 from dfNewEvents2 group by symbol,newTimeStamp") > // End > > > //The below doesn't work - Start > val dfNewEvents2 = sparkSession.sql("select * from dfNewEvents > ").withWatermark("eventTime","2 seconds") > dfNewEvents2.createOrReplaceTempView("dfNewEvents2") > val groupEvents = sparkSession.sql("select symbol,eventTime, > count(price) as count1 from dfNewEvents2 group by symbol,eventTime") > // - End > > > val query1 = groupEvents.writeStream > .outputMode("append") > .format("console") > .option("checkpointLocation", checkpointPath) > .start("./myop") > val newEventFile1=newEventsPath.resolve("eventNew1.json") > Files.write(newEventFile1, List( > """{"symbol": > "GOOG","price":100,"eventTime":"2017-07-25T16:00:00.000-04:00"}""", > """{"symbol": > 
"GOOG","price":200,"eventTime":"2017-07-25T16:00:00.000-04:00"}""" > ).toIterable.asJava) > query1.processAllAvailable() > sparkSession.streams.awaitAnyTermination(1) > } > private def newEvents(sparkSession: SparkSession) = { > val newEvents = Paths.get("target/newEvents/").toAbsolutePath > delete(newEvents) > Files.createDirectories(newEvents) > val dfNewEvents = >
[jira] [Resolved] (SPARK-21648) Confusing assert failure in JDBC source when users misspell the option `partitionColumn`
[ https://issues.apache.org/jira/browse/SPARK-21648?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Xiao Li resolved SPARK-21648. - Resolution: Fixed Fix Version/s: 2.3.0 2.2.1 > Confusing assert failure in JDBC source when users misspell the option > `partitionColumn` > > > Key: SPARK-21648 > URL: https://issues.apache.org/jira/browse/SPARK-21648 > Project: Spark > Issue Type: Bug > Components: SQL >Affects Versions: 2.2.0 >Reporter: Xiao Li >Assignee: Xiao Li > Fix For: 2.2.1, 2.3.0 > > > {noformat} > CREATE TABLE mytesttable1 > USING org.apache.spark.sql.jdbc > OPTIONS ( > url > 'jdbc:mysql://${jdbcHostname}:${jdbcPort}/${jdbcDatabase}?user=${jdbcUsername}&password=${jdbcPassword}', > > dbtable 'mytesttable1', > paritionColumn 'state_id', > lowerBound '0', > upperBound '52', > numPartitions '53', > fetchSize '1' > ) > {noformat} > The above option name `paritionColumn` is wrong. That means users did not > provide a value for `partitionColumn`. In such a case, users hit a confusing > error: > {noformat} > AssertionError: assertion failed > java.lang.AssertionError: assertion failed > at scala.Predef$.assert(Predef.scala:156) > at > org.apache.spark.sql.execution.datasources.jdbc.JdbcRelationProvider.createRelation(JdbcRelationProvider.scala:39) > at > org.apache.spark.sql.execution.datasources.DataSource.resolveRelation(DataSource.scala:312) > {noformat} -- This message was sent by Atlassian JIRA (v6.4.14#64029) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
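The direction of the fix is to replace the bare assert with validation that names the missing option. A hypothetical Python sketch of such a check (Spark's actual implementation is Scala; all names here are illustrative):

```python
# Validate JDBC partitioned-read options up front and report near-miss
# spellings such as 'paritionColumn' instead of failing with a bare assert.
import difflib

REQUIRED_TOGETHER = ["partitionColumn", "lowerBound", "upperBound", "numPartitions"]

def check_partitioning_options(options):
    provided = [k for k in REQUIRED_TOGETHER if k in options]
    if not provided:
        return  # no partitioned read requested at all
    for key in (k for k in REQUIRED_TOGETHER if k not in options):
        # Suggest a close match among the supplied option names, if any.
        close = difflib.get_close_matches(key, list(options), n=1)
        hint = f" (did you mean {close[0]!r}?)" if close else ""
        raise ValueError(f"JDBC partitioned read requires option {key!r}{hint}")
```

Run against the misspelled options above, this raises a `ValueError` that names `partitionColumn` and points at `paritionColumn`, rather than an opaque `AssertionError`.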
[jira] [Assigned] (SPARK-21565) aggregate query fails with watermark on eventTime but works with watermark on timestamp column generated by current_timestamp
[ https://issues.apache.org/jira/browse/SPARK-21565?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Shixiong Zhu reassigned SPARK-21565: Assignee: Jose Torres > aggregate query fails with watermark on eventTime but works with watermark on > timestamp column generated by current_timestamp > - > > Key: SPARK-21565 > URL: https://issues.apache.org/jira/browse/SPARK-21565 > Project: Spark > Issue Type: Bug > Components: Structured Streaming >Affects Versions: 2.2.0 >Reporter: Amit Assudani >Assignee: Jose Torres > Fix For: 2.2.1, 2.3.0 > > > *Short Description: * > Aggregation query fails with eventTime as watermark column while works with > newTimeStamp column generated by running SQL with current_timestamp, > *Exception:* > {code} > Caused by: java.util.NoSuchElementException: None.get > at scala.None$.get(Option.scala:347) > at scala.None$.get(Option.scala:345) > at > org.apache.spark.sql.execution.streaming.StateStoreSaveExec$$anonfun$doExecute$3.apply(statefulOperators.scala:204) > at > org.apache.spark.sql.execution.streaming.StateStoreSaveExec$$anonfun$doExecute$3.apply(statefulOperators.scala:172) > at > org.apache.spark.sql.execution.streaming.state.package$StateStoreOps$$anonfun$1.apply(package.scala:70) > at > org.apache.spark.sql.execution.streaming.state.package$StateStoreOps$$anonfun$1.apply(package.scala:65) > at > org.apache.spark.sql.execution.streaming.state.StateStoreRDD.compute(StateStoreRDD.scala:64) > at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323) > at org.apache.spark.rdd.RDD.iterator(RDD.scala:287) > at > org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38) > at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323) > at org.apache.spark.rdd.RDD.iterator(RDD.scala:287) > at > org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38) > at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323) > at org.apache.spark.rdd.RDD.iterator(RDD.scala:287) > {code} 
> *Code to replicate:* > {code} > package test > import java.nio.file.{Files, Path, Paths} > import java.text.SimpleDateFormat > import org.apache.spark.sql.types._ > import org.apache.spark.sql.{SparkSession} > import scala.collection.JavaConverters._ > object Test1 { > def main(args: Array[String]) { > val sparkSession = SparkSession > .builder() > .master("local[*]") > .appName("Spark SQL basic example") > .config("spark.some.config.option", "some-value") > .getOrCreate() > val sdf = new SimpleDateFormat("-MM-dd HH:mm:ss") > val checkpointPath = "target/cp1" > val newEventsPath = Paths.get("target/newEvents/").toAbsolutePath > delete(newEventsPath) > delete(Paths.get(checkpointPath).toAbsolutePath) > Files.createDirectories(newEventsPath) > val dfNewEvents= newEvents(sparkSession) > dfNewEvents.createOrReplaceTempView("dfNewEvents") > //The below works - Start > //val dfNewEvents2 = sparkSession.sql("select *,current_timestamp as > newTimeStamp from dfNewEvents ").withWatermark("newTimeStamp","2 seconds") > //dfNewEvents2.createOrReplaceTempView("dfNewEvents2") > //val groupEvents = sparkSession.sql("select symbol,newTimeStamp, > count(price) as count1 from dfNewEvents2 group by symbol,newTimeStamp") > // End > > > //The below doesn't work - Start > val dfNewEvents2 = sparkSession.sql("select * from dfNewEvents > ").withWatermark("eventTime","2 seconds") > dfNewEvents2.createOrReplaceTempView("dfNewEvents2") > val groupEvents = sparkSession.sql("select symbol,eventTime, > count(price) as count1 from dfNewEvents2 group by symbol,eventTime") > // - End > > > val query1 = groupEvents.writeStream > .outputMode("append") > .format("console") > .option("checkpointLocation", checkpointPath) > .start("./myop") > val newEventFile1=newEventsPath.resolve("eventNew1.json") > Files.write(newEventFile1, List( > """{"symbol": > "GOOG","price":100,"eventTime":"2017-07-25T16:00:00.000-04:00"}""", > """{"symbol": > 
"GOOG","price":200,"eventTime":"2017-07-25T16:00:00.000-04:00"}""" > ).toIterable.asJava) > query1.processAllAvailable() > sparkSession.streams.awaitAnyTermination(1) > } > private def newEvents(sparkSession: SparkSession) = { > val newEvents = Paths.get("target/newEvents/").toAbsolutePath > delete(newEvents) > Files.createDirectories(newEvents) > val dfNewEvents = >
[jira] [Commented] (SPARK-21656) spark dynamic allocation should not idle timeout executors when tasks still to run
[ https://issues.apache.org/jira/browse/SPARK-21656?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16117204#comment-16117204 ] Thomas Graves commented on SPARK-21656: --- Another option would be to add logic for Spark to check at some point whether it should try reacquiring some executors. All of that, though, seems like more logic than just not letting them go. To me, Spark needs to be more resilient about this and should handle various possible conditions; a user shouldn't have to tune every single job to account for weird things happening. Note that if dynamic allocation is off, this doesn't happen. So why should the user get a worse experience in this case? > spark dynamic allocation should not idle timeout executors when tasks still > to run > -- > > Key: SPARK-21656 > URL: https://issues.apache.org/jira/browse/SPARK-21656 > Project: Spark > Issue Type: Bug > Components: Spark Core >Affects Versions: 2.1.1 >Reporter: Jong Yoon Lee > Fix For: 2.1.1 > > Original Estimate: 24h > Remaining Estimate: 24h > > Right now spark lets go of executors when they are idle for the 60s (or > configurable time). I have seen spark let them go when they are idle but they > were really needed. I have seen this issue when the scheduler was waiting to > get node locality but that takes longer then the default idle timeout. In > these jobs the number of executors goes down really small (less than 10) but > there are still like 80,000 tasks to run. > We should consider not allowing executors to idle timeout if they are still > needed according to the number of tasks to be run. -- This message was sent by Atlassian JIRA (v6.4.14#64029) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Commented] (SPARK-21656) spark dynamic allocation should not idle timeout executors when tasks still to run
[ https://issues.apache.org/jira/browse/SPARK-21656?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16117241#comment-16117241 ] Thomas Graves commented on SPARK-21656: --- As I said above, it DOES help the application to keep them alive: the scheduler logic will fall back to them at some point, when it goes to rack/any locality or when it finishes the tasks that are getting locality on those few nodes. That's why I'm saying it's a conflict within Spark. Spark should be resilient to weird things happening. In the cases I have described we could actually release all of our executors and never ask for more within a stage; that is a BUG. We can change the configs so that doesn't normally happen, but a user could change them back, and when they do it shouldn't result in a deadlock. > spark dynamic allocation should not idle timeout executors when tasks still > to run > -- > > Key: SPARK-21656 > URL: https://issues.apache.org/jira/browse/SPARK-21656 > Project: Spark > Issue Type: Bug > Components: Spark Core >Affects Versions: 2.1.1 >Reporter: Jong Yoon Lee > Fix For: 2.1.1 > > Original Estimate: 24h > Remaining Estimate: 24h > > Right now spark lets go of executors when they are idle for the 60s (or > configurable time). I have seen spark let them go when they are idle but they > were really needed. I have seen this issue when the scheduler was waiting to > get node locality but that takes longer then the default idle timeout. In > these jobs the number of executors goes down really small (less than 10) but > there are still like 80,000 tasks to run. > We should consider not allowing executors to idle timeout if they are still > needed according to the number of tasks to be run. -- This message was sent by Atlassian JIRA (v6.4.14#64029) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Resolved] (SPARK-21542) Helper functions for custom Python Persistence
[ https://issues.apache.org/jira/browse/SPARK-21542?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Joseph K. Bradley resolved SPARK-21542. --- Resolution: Fixed Fix Version/s: 2.3.0 Issue resolved by pull request 18742 [https://github.com/apache/spark/pull/18742] > Helper functions for custom Python Persistence > -- > > Key: SPARK-21542 > URL: https://issues.apache.org/jira/browse/SPARK-21542 > Project: Spark > Issue Type: New Feature > Components: ML, PySpark >Affects Versions: 2.2.0 >Reporter: Ajay Saini >Assignee: Ajay Saini > Fix For: 2.3.0 > > > Currently, there is no way to easily persist Json-serializable parameters in > Python only. All parameters in Python are persisted by converting them to > Java objects and using the Java persistence implementation. In order to > facilitate the creation of custom Python-only pipeline stages, it would be > good to have a Python-only persistence framework so that these stages do not > need to be implemented in Scala for persistence. > This task involves: > - Adding implementations for DefaultParamsReadable, DefaultParamsWriteable, > DefaultParamsReader, and DefaultParamsWriter in pyspark. -- This message was sent by Atlassian JIRA (v6.4.14#64029) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Closed] (SPARK-21362) Add JDBCDialect for Apache Drill
[ https://issues.apache.org/jira/browse/SPARK-21362?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Reynold Xin closed SPARK-21362. --- Resolution: Won't Fix See my comment on github ... > Add JDBCDialect for Apache Drill > > > Key: SPARK-21362 > URL: https://issues.apache.org/jira/browse/SPARK-21362 > Project: Spark > Issue Type: Improvement > Components: SQL >Affects Versions: 2.1.1 >Reporter: David Radford >Priority: Minor > > Apache Drill does not allow quotation marks (") so a custom jdbc dialect is > needed to return the field names surround in tick marks (`) similar to how > MySQL dialect works. This requires an override to the method: quoteIdentifier -- This message was sent by Atlassian JIRA (v6.4.14#64029) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Created] (SPARK-21659) FileStreamSink checks for _spark_metadata even if path has globs
peay created SPARK-21659: Summary: FileStreamSink checks for _spark_metadata even if path has globs Key: SPARK-21659 URL: https://issues.apache.org/jira/browse/SPARK-21659 Project: Spark Issue Type: Bug Components: Input/Output, SQL Affects Versions: 2.2.0 Reporter: peay Priority: Minor I am using the GCS connector for Hadoop, and reading a DataFrame using {{context.read.format("parquet").load("...")}}. When my URI has glob patterns of the form {code} gs://uri/{a,b,c} {code} or as below, Spark incorrectly assumes that it is a single file path, and produces this rather verbose exception: {code} java.net.URISyntaxException: Illegal character in path at index xx: gs://bucket-name/path/to/date=2017-0{1-29,1-30,1-31,2-01,2-02,2-03,2-04}*/_spark_metadata at java.net.URI$Parser.fail(URI.java:2848) at java.net.URI$Parser.checkChars(URI.java:3021) at java.net.URI$Parser.parseHierarchical(URI.java:3105) at java.net.URI$Parser.parse(URI.java:3053) at java.net.URI.<init>(URI.java:588) at com.google.cloud.hadoop.gcsio.LegacyPathCodec.getPath(LegacyPathCodec.java:93) at com.google.cloud.hadoop.fs.gcs.GoogleHadoopFileSystem.getGcsPath(GoogleHadoopFileSystem.java:171) at com.google.cloud.hadoop.fs.gcs.GoogleHadoopFileSystemBase.getFileStatus(GoogleHadoopFileSystemBase.java:1421) at org.apache.hadoop.fs.FileSystem.exists(FileSystem.java:1436) at org.apache.spark.sql.execution.streaming.FileStreamSink$.hasMetadata(FileStreamSink.scala:46) at org.apache.spark.sql.execution.datasources.DataSource.resolveRelation(DataSource.scala:320) at org.apache.spark.sql.DataFrameReader.load(DataFrameReader.scala:178) at org.apache.spark.sql.DataFrameReader.load(DataFrameReader.scala:156) at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) at java.lang.reflect.Method.invoke(Method.java:498) at 
py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244) at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357) at py4j.Gateway.invoke(Gateway.java:280) at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132) at py4j.commands.CallCommand.execute(CallCommand.java:79) at py4j.GatewayConnection.run(GatewayConnection.java:214) at java.lang.Thread.run(Thread.java:748) {code} I am not quite sure if the GCS connector deviates from the HCFS standard here in terms of behavior, but this makes logs really hard to read for jobs that load a bunch of files like this. https://github.com/apache/spark/blob/3ac60930865209bf804ec6506d9d8b0ddd613157/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSink.scala#L39 already has an explicit {{case Seq(singlePath) =>}}, except that it is misleading because {{singlePath}} can have wildcards. In addition, it could check for non-escaped glob characters, like {code} {, }, ?, * {code} and go to the multiple-paths case when those are present, where looking for metadata is skipped. -- This message was sent by Atlassian JIRA (v6.4.14#64029) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
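The check the reporter suggests, skipping the metadata lookup when a path contains non-escaped glob characters, can be sketched as follows (hypothetical helper names; Spark's actual `FileStreamSink.hasMetadata` is Scala):

```python
# Treat a "single path" containing unescaped glob characters as a glob
# pattern, so the _spark_metadata existence check is skipped for it.
import re

# An unescaped {, }, [, ], ? or * means the path is really a glob pattern.
_GLOB_CHARS = re.compile(r"(?<!\\)[{}\[\]?*]")

def has_unescaped_glob(path: str) -> bool:
    return _GLOB_CHARS.search(path) is not None

def should_check_metadata(paths) -> bool:
    # Only a single, literal path can carry a _spark_metadata directory.
    return len(paths) == 1 and not has_unescaped_glob(paths[0])
```

With this guard, the `case Seq(singlePath)` branch would only fire for genuinely literal paths, avoiding the `URISyntaxException` noise above.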
[jira] [Commented] (SPARK-21565) aggregate query fails with watermark on eventTime but works with watermark on timestamp column generated by current_timestamp
[ https://issues.apache.org/jira/browse/SPARK-21565?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16117173#comment-16117173 ] Shixiong Zhu commented on SPARK-21565: -- Resolved by https://github.com/apache/spark/pull/18840 > aggregate query fails with watermark on eventTime but works with watermark on > timestamp column generated by current_timestamp > - > > Key: SPARK-21565 > URL: https://issues.apache.org/jira/browse/SPARK-21565 > Project: Spark > Issue Type: Bug > Components: Structured Streaming >Affects Versions: 2.2.0 >Reporter: Amit Assudani >Assignee: Jose Torres > Fix For: 2.2.1, 2.3.0 > > > *Short Description: * > Aggregation query fails with eventTime as watermark column while works with > newTimeStamp column generated by running SQL with current_timestamp, > *Exception:* > {code} > Caused by: java.util.NoSuchElementException: None.get > at scala.None$.get(Option.scala:347) > at scala.None$.get(Option.scala:345) > at > org.apache.spark.sql.execution.streaming.StateStoreSaveExec$$anonfun$doExecute$3.apply(statefulOperators.scala:204) > at > org.apache.spark.sql.execution.streaming.StateStoreSaveExec$$anonfun$doExecute$3.apply(statefulOperators.scala:172) > at > org.apache.spark.sql.execution.streaming.state.package$StateStoreOps$$anonfun$1.apply(package.scala:70) > at > org.apache.spark.sql.execution.streaming.state.package$StateStoreOps$$anonfun$1.apply(package.scala:65) > at > org.apache.spark.sql.execution.streaming.state.StateStoreRDD.compute(StateStoreRDD.scala:64) > at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323) > at org.apache.spark.rdd.RDD.iterator(RDD.scala:287) > at > org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38) > at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323) > at org.apache.spark.rdd.RDD.iterator(RDD.scala:287) > at > org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38) > at 
org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323) > at org.apache.spark.rdd.RDD.iterator(RDD.scala:287) > {code} > *Code to replicate:* > {code} > package test > import java.nio.file.{Files, Path, Paths} > import java.text.SimpleDateFormat > import org.apache.spark.sql.types._ > import org.apache.spark.sql.{SparkSession} > import scala.collection.JavaConverters._ > object Test1 { > def main(args: Array[String]) { > val sparkSession = SparkSession > .builder() > .master("local[*]") > .appName("Spark SQL basic example") > .config("spark.some.config.option", "some-value") > .getOrCreate() > val sdf = new SimpleDateFormat("-MM-dd HH:mm:ss") > val checkpointPath = "target/cp1" > val newEventsPath = Paths.get("target/newEvents/").toAbsolutePath > delete(newEventsPath) > delete(Paths.get(checkpointPath).toAbsolutePath) > Files.createDirectories(newEventsPath) > val dfNewEvents= newEvents(sparkSession) > dfNewEvents.createOrReplaceTempView("dfNewEvents") > //The below works - Start > //val dfNewEvents2 = sparkSession.sql("select *,current_timestamp as > newTimeStamp from dfNewEvents ").withWatermark("newTimeStamp","2 seconds") > //dfNewEvents2.createOrReplaceTempView("dfNewEvents2") > //val groupEvents = sparkSession.sql("select symbol,newTimeStamp, > count(price) as count1 from dfNewEvents2 group by symbol,newTimeStamp") > // End > > > //The below doesn't work - Start > val dfNewEvents2 = sparkSession.sql("select * from dfNewEvents > ").withWatermark("eventTime","2 seconds") > dfNewEvents2.createOrReplaceTempView("dfNewEvents2") > val groupEvents = sparkSession.sql("select symbol,eventTime, > count(price) as count1 from dfNewEvents2 group by symbol,eventTime") > // - End > > > val query1 = groupEvents.writeStream > .outputMode("append") > .format("console") > .option("checkpointLocation", checkpointPath) > .start("./myop") > val newEventFile1=newEventsPath.resolve("eventNew1.json") > Files.write(newEventFile1, List( > """{"symbol": > 
"GOOG","price":100,"eventTime":"2017-07-25T16:00:00.000-04:00"}""", > """{"symbol": > "GOOG","price":200,"eventTime":"2017-07-25T16:00:00.000-04:00"}""" > ).toIterable.asJava) > query1.processAllAvailable() > sparkSession.streams.awaitAnyTermination(1) > } > private def newEvents(sparkSession: SparkSession) = { > val newEvents = Paths.get("target/newEvents/").toAbsolutePath > delete(newEvents) > Files.createDirectories(newEvents) > val dfNewEvents = >
[jira] [Created] (SPARK-21658) Adds the default None for value in na.replace in PySpark to match
Hyukjin Kwon created SPARK-21658: Summary: Adds the default None for value in na.replace in PySpark to match Key: SPARK-21658 URL: https://issues.apache.org/jira/browse/SPARK-21658 Project: Spark Issue Type: Bug Components: PySpark Affects Versions: 2.2.0 Reporter: Hyukjin Kwon Priority: Minor Looks like {{na.replace}} is missing the default value {{None}}. Both docs say they are aliases http://spark.apache.org/docs/2.2.0/api/python/pyspark.sql.html#pyspark.sql.DataFrame.replace http://spark.apache.org/docs/2.2.0/api/python/pyspark.sql.html#pyspark.sql.DataFrameNaFunctions.replace but the default values look different, which ends up with: {code} >>> df = spark.createDataFrame([('Alice', 10, 80.0)]) >>> df.replace({"Alice": "a"}).first() Row(_1=u'a', _2=10, _3=80.0) >>> df.na.replace({"Alice": "a"}).first() Traceback (most recent call last): File "<stdin>", line 1, in <module> TypeError: replace() takes at least 3 arguments (2 given) {code} To take advantage of SPARK-19454, it sounds like we should match them. -- This message was sent by Atlassian JIRA (v6.4.14#64029) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
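The mismatch can be illustrated without pyspark by two plain-Python stand-ins whose signatures drifted apart in the same way (simplified, hypothetical functions, not the actual pyspark code):

```python
# Two supposed aliases: one gives `value` a default, the other does not,
# so calling the second with only a dict fails exactly as in the report.
def replace(to_replace, value=None, subset=None):
    """Stand-in for DataFrame.replace: dict-only calls work."""
    return ("replaced", to_replace, value)

def na_replace(to_replace, value, subset=None):
    """Stand-in for DataFrameNaFunctions.replace before the fix."""
    return ("replaced", to_replace, value)

# replace({"Alice": "a"}) succeeds; na_replace({"Alice": "a"}) raises
# TypeError because `value` has no default.
```

The fix is simply to give `value` the same `None` default in both places so the dict form of SPARK-19454 works through either entry point.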
[jira] [Updated] (SPARK-18535) Redact sensitive information from Spark logs and UI
[ https://issues.apache.org/jira/browse/SPARK-18535?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Marcelo Vanzin updated SPARK-18535: --- Fix Version/s: 2.1.2 > Redact sensitive information from Spark logs and UI > --- > > Key: SPARK-18535 > URL: https://issues.apache.org/jira/browse/SPARK-18535 > Project: Spark > Issue Type: Bug > Components: Web UI, YARN >Affects Versions: 2.1.0 >Reporter: Mark Grover >Assignee: Mark Grover > Fix For: 2.1.2, 2.2.0 > > Attachments: redacted.png > > > A Spark user may have to provide sensitive information for a Spark > configuration property, or source an environment variable in the > executor or driver environment that contains sensitive information. A good > example of this would be when reading/writing data from/to S3 using Spark. > The S3 secret and S3 access key can be placed in a [hadoop credential > provider|https://hadoop.apache.org/docs/current/hadoop-project-dist/hadoop-common/CredentialProviderAPI.html]. > However, one still needs to provide the password for the credential provider > to Spark, which is typically supplied as an environment variable to the > driver and executor environments. This environment variable shows up in logs, > and may also show up in the UI. > 1. For logs, it shows up in a few places: > 1A. Event logs under {{SparkListenerEnvironmentUpdate}} event. > 1B. YARN logs, when printing the executor launch context. > 2. For UI, it would show up in the _Environment_ tab, but it is redacted if > it contains the words "password" or "secret" in it. And, these magic words > are > [hardcoded|https://github.com/apache/spark/blob/a2d464770cd183daa7d727bf377bde9c21e29e6a/core/src/main/scala/org/apache/spark/ui/env/EnvironmentPage.scala#L30] > and hence not customizable. > This JIRA is to track the work to make sure sensitive information is redacted > from all logs and UIs in Spark, while still being passed on to all relevant > places it needs to get passed on to. 
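To make the redaction idea above concrete, here is a small illustrative sketch in plain Python (not Spark's actual implementation; the pattern mirrors the "password"/"secret" magic words mentioned in the ticket, and the function and constant names are made up for illustration):

```python
import re

# Illustrative only: any config key matching the pattern gets its value
# masked before it is logged or rendered, which is what the ticket proposes.
REDACTION_PATTERN = re.compile(r"(?i)secret|password")
REDACTION_REPLACEMENT = "*********(redacted)"

def redact(conf):
    """Return a copy of the key/value pairs with sensitive values masked."""
    return {
        key: (REDACTION_REPLACEMENT if REDACTION_PATTERN.search(key) else value)
        for key, value in conf.items()
    }

conf = {
    "spark.executorEnv.HADOOP_CREDSTORE_PASSWORD": "hunter2",  # masked
    "spark.master": "yarn",                                    # left alone
}
print(redact(conf))
```

A single configurable pattern, applied everywhere the environment is logged or displayed, is the general shape of the fix rather than a per-page hardcoded word list.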
[jira] [Assigned] (SPARK-21542) Helper functions for custom Python Persistence
[ https://issues.apache.org/jira/browse/SPARK-21542?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Joseph K. Bradley reassigned SPARK-21542: - Assignee: Ajay Saini > Helper functions for custom Python Persistence > -- > > Key: SPARK-21542 > URL: https://issues.apache.org/jira/browse/SPARK-21542 > Project: Spark > Issue Type: New Feature > Components: ML, PySpark >Affects Versions: 2.2.0 >Reporter: Ajay Saini >Assignee: Ajay Saini > > Currently, there is no way to easily persist Json-serializable parameters in > Python only. All parameters in Python are persisted by converting them to > Java objects and using the Java persistence implementation. In order to > facilitate the creation of custom Python-only pipeline stages, it would be > good to have a Python-only persistence framework so that these stages do not > need to be implemented in Scala for persistence. > This task involves: > - Adding implementations for DefaultParamsReadable, DefaultParamsWriteable, > DefaultParamsReader, and DefaultParamsWriter in pyspark. -- This message was sent by Atlassian JIRA (v6.4.14#64029) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
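The Python-only persistence being requested can be sketched, without any Spark dependency, as a JSON round-trip of a stage's param map (the class and method names below are illustrative, not the eventual pyspark API):

```python
import json
import os
import tempfile

# Illustrative sketch: persist a stage's JSON-serializable params directly
# from Python, with no conversion to Java objects.
class JsonParamsStage:
    def __init__(self, **params):
        self.params = params

    def save(self, path):
        # Write class name + param map as JSON metadata.
        with open(path, "w") as f:
            json.dump({"class": type(self).__name__, "paramMap": self.params}, f)

    @classmethod
    def load(cls, path):
        # Rebuild the stage from the stored param map.
        with open(path) as f:
            metadata = json.load(f)
        return cls(**metadata["paramMap"])

stage = JsonParamsStage(inputCol="features", threshold=0.5)
path = os.path.join(tempfile.mkdtemp(), "stage.json")
stage.save(path)
print(JsonParamsStage.load(path).params)
```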
[jira] [Commented] (SPARK-21656) spark dynamic allocation should not idle timeout executors when tasks still to run
[ https://issues.apache.org/jira/browse/SPARK-21656?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16117200#comment-16117200 ] Thomas Graves commented on SPARK-21656: --- Why not fix the bug in dynamic allocation? Changing configs is a workaround, and like everything else, what are the best configs for everyone's job? Dynamic allocation is supposed to get you enough executors to run all your tasks in parallel (up to your config limits). This is not allowing that, and it's code within SPARK that is doing it, not user code. Thus a bug in my opinion. The documentation even hints at it; the problem is we just didn't catch this issue in the initial code. From: http://spark.apache.org/docs/2.2.0/job-scheduling.html#remove-policy "in that an executor should not be idle if there are still pending tasks to be scheduled" One other option here would be to actually let the executors go and get new ones. This may or may not help, depending on whether it can get ones with better locality; it might also just waste time releasing and reacquiring. I would personally also be OK with changing the locality wait for node to 0, which generally works around the problem, but I think this could happen in other cases and we should fix this bug too. For instance, say your driver does a full GC and can't schedule things within 60 seconds: you lose those executors and never get them back. Or say you hit temporary network congestion that your network timeout is plenty big enough to absorb; you could still idle-timeout. Yes, we could increase the idle timeout, but in the normal working case the idle timeout is meant for executors that have no tasks left to run because your stage has completed enough that you can release some. This is not that case. 
> spark dynamic allocation should not idle timeout executors when tasks still > to run > -- > > Key: SPARK-21656 > URL: https://issues.apache.org/jira/browse/SPARK-21656 > Project: Spark > Issue Type: Bug > Components: Spark Core >Affects Versions: 2.1.1 >Reporter: Jong Yoon Lee > Fix For: 2.1.1 > > Original Estimate: 24h > Remaining Estimate: 24h > > Right now Spark lets go of executors when they have been idle for 60s (or a > configurable time). I have seen Spark let them go when they were idle but > still really needed. I have seen this issue when the scheduler was waiting > for node locality, which takes longer than the default idle timeout. In > these jobs the number of executors drops really low (less than 10) while > there are still around 80,000 tasks to run. > We should consider not allowing executors to idle-timeout if they are still > needed according to the number of tasks remaining to run.
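For anyone hitting this before a proper fix, the workaround discussed above amounts to something like the following in spark-defaults.conf (the property names are standard Spark configs; the values are illustrative, not recommendations):

```
# Workaround only, not a fix for the underlying bug: don't hold tasks
# waiting for node locality longer than executors stay alive.
spark.locality.wait.node                        0s
spark.dynamicAllocation.executorIdleTimeout     120s
```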
[jira] [Comment Edited] (SPARK-21653) Complement SQL expression document
[ https://issues.apache.org/jira/browse/SPARK-21653?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16116324#comment-16116324 ] Hyukjin Kwon edited comment on SPARK-21653 at 8/7/17 11:37 AM: --- Yes, there were some discussion for adding arguments in my PR, IIRC, about correctness and adding tests accordingly. I am still fond of describing arguments as long as they look mostly correct in general and the examples produce the expected results because these are some information that did not exist before and we now generate documentation for SQL builtin functions. I am willing to push this if there are no strong objections now. was (Author: hyukjin.kwon): Yes, there was some discussion for adding arguments in my PR. I am still fond of describing arguments as long as they look mostly correct in general and the examples produce the expected results because these are some information that did not exist before and we now generate documentation for SQL builtin functions. I am willing to push this if there are no strong objections now. > Complement SQL expression document > -- > > Key: SPARK-21653 > URL: https://issues.apache.org/jira/browse/SPARK-21653 > Project: Spark > Issue Type: Umbrella > Components: SQL >Affects Versions: 2.2.0 >Reporter: Liang-Chi Hsieh > > We have {{ExpressionDescription}} for SQL expressions. The expression > description tells what an expression's usage, arguments, and examples. Users > can understand how to use those expressions by {{DESCRIBE}} command in SQL: > {code} > spark-sql> DESCRIBE FUNCTION EXTENDED In; > Function: in > Class: org.apache.spark.sql.catalyst.expressions.In > Usage: expr1 in(expr2, expr3, ...) - Returns true if `expr` equals to any > valN. > Extended Usage: > No example/argument for in. > {code} > Not all SQL expressions have complete description now. For example, in the > above case, there is no example for function {{in}}. This task is going to > complement the expression description. 
[jira] [Commented] (SPARK-19552) Upgrade Netty version to 4.1.8 final
[ https://issues.apache.org/jira/browse/SPARK-19552?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16116513#comment-16116513 ] Pawel Szulc commented on SPARK-19552: - What I see is netty upgrade, not shading https://github.com/apache/spark/pull/16888/files#diff-c0db0846e805ed986c3fd2f1ceca4fe1L141 Or are we talking about some other PR that I'm not aware of? > Upgrade Netty version to 4.1.8 final > > > Key: SPARK-19552 > URL: https://issues.apache.org/jira/browse/SPARK-19552 > Project: Spark > Issue Type: Improvement > Components: Build >Affects Versions: 2.1.0 >Reporter: Adam Roberts >Priority: Minor > > Netty 4.1.8 was recently released but isn't API compatible with previous > major versions (like Netty 4.0.x), see > http://netty.io/news/2017/01/30/4-0-44-Final-4-1-8-Final.html for details. > This version does include a fix for a security concern but not one we'd be > exposed to with Spark "out of the box". Let's upgrade the version we use to > be on the safe side as the security fix I'm especially interested in is not > available in the 4.0.x release line. > We should move up anyway to take on a bunch of other big fixes cited in the > release notes (and if anyone were to use Spark with netty and tcnative, they > shouldn't be exposed to the security problem) - we should be good citizens > and make this change. > As this 4.1 version involves API changes we'll need to implement a few > methods and possibly adjust the Sasl tests. This JIRA and associated pull > request starts the process which I'll work on - and any help would be much > appreciated! 
Currently I know: > {code} > @Override > public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise > promise) > throws Exception { > if (!foundEncryptionHandler) { > foundEncryptionHandler = > ctx.channel().pipeline().get(encryptHandlerName) != null; <-- this > returns false and causes test failures > } > ctx.write(msg, promise); > } > {code} > Here's what changes will be required (at least): > {code} > common/network-common/src/main/java/org/apache/spark/network/crypto/TransportCipher.java{code} > requires touch, retain and transferred methods > {code} > common/network-common/src/main/java/org/apache/spark/network/sasl/SaslEncryption.java{code} > requires the above methods too > {code}common/network-common/src/test/java/org/apache/spark/network/protocol/MessageWithHeaderSuite.java{code} > With "dummy" implementations so we can at least compile and test, we'll see > five new test failures to address. > These are > {code} > org.apache.spark.network.sasl.SparkSaslSuite.testFileRegionEncryption > org.apache.spark.network.sasl.SparkSaslSuite.testSaslEncryption > org.apache.spark.network.shuffle.ExternalShuffleSecuritySuite.testEncryption > org.apache.spark.rpc.netty.NettyRpcEnvSuite.send with SASL encryption > org.apache.spark.rpc.netty.NettyRpcEnvSuite.ask with SASL encryption > {code} -- This message was sent by Atlassian JIRA (v6.4.14#64029) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Commented] (SPARK-21460) Spark dynamic allocation breaks when ListenerBus event queue runs full
[ https://issues.apache.org/jira/browse/SPARK-21460?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16116525#comment-16116525 ] Yuming Wang commented on SPARK-21460: - [~Tagar] After [SPARK-19146|https://github.com/apache/spark/pull/16527], I found event log processing was too slow, so I disabled it ({{spark.eventLog.enabled=false}}). It works well right now. > Spark dynamic allocation breaks when ListenerBus event queue runs full > -- > > Key: SPARK-21460 > URL: https://issues.apache.org/jira/browse/SPARK-21460 > Project: Spark > Issue Type: Bug > Components: Scheduler, YARN >Affects Versions: 2.0.0, 2.0.2, 2.1.0, 2.1.1, 2.2.0 > Environment: Spark 2.1 > Hadoop 2.6 >Reporter: Ruslan Dautkhanov >Priority: Critical > Labels: dynamic_allocation, performance, scheduler, yarn > > When the ListenerBus event queue runs full, Spark dynamic allocation stops > working - Spark fails to shrink the number of executors when there are no active > jobs (the Spark driver "thinks" there are active jobs since it didn't capture > when they finished). > P.S. What's worse, it also makes Spark flood the YARN RM with reservation requests, > so YARN preemption doesn't function properly either (we're on Spark 2.1 / Hadoop > 2.6).
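For reference, the workaround in the comment above is just the following in spark-defaults.conf (the queue-size property is the Spark 2.x knob for the ListenerBus queue; treat the values as illustrative):

```
# Workaround: stop feeding the event-logging listener entirely...
spark.eventLog.enabled                         false
# ...or give the listener bus queue more headroom (Spark 2.x property):
spark.scheduler.listenerbus.eventqueue.size    100000
```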
[jira] [Resolved] (SPARK-21544) Test jar of some module should not install or deploy twice
[ https://issues.apache.org/jira/browse/SPARK-21544?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Sean Owen resolved SPARK-21544. --- Resolution: Fixed Fix Version/s: 2.3.0 Issue resolved by pull request 18745 [https://github.com/apache/spark/pull/18745] > Test jar of some module should not install or deploy twice > -- > > Key: SPARK-21544 > URL: https://issues.apache.org/jira/browse/SPARK-21544 > Project: Spark > Issue Type: Improvement > Components: Deploy >Affects Versions: 1.6.1, 2.1.0 >Reporter: zhoukang >Assignee: zhoukang >Priority: Minor > Fix For: 2.3.0 > > > For moudle below: > common/network-common > streaming > sql/core > sql/catalyst > tests.jar will install or deploy twice.Like: > {code:java} > [INFO] Installing > /home/mi/Work/Spark/scala2.11/spark/streaming/target/spark-streaming_2.11-2.1.0-mdh2.1.0.1-SNAPSHOT-tests.jar > to > /home/mi/.m2/repository/org/apache/spark/spark-streaming_2.11/2.1.0-mdh2.1.0.1-SNAPSHOT/spark-streaming_2.11-2.1.0-mdh2.1.0.1-SNAPSHOT-tests.jar > [DEBUG] Writing tracking file > /home/mi/.m2/repository/org/apache/spark/spark-streaming_2.11/2.1.0-mdh2.1.0.1-SNAPSHOT/_remote.repositories > [DEBUG] Installing > org.apache.spark:spark-streaming_2.11:2.1.0-mdh2.1.0.1-SNAPSHOT/maven-metadata.xml > to > /home/mi/.m2/repository/org/apache/spark/spark-streaming_2.11/2.1.0-mdh2.1.0.1-SNAPSHOT/maven-metadata-local.xml > [DEBUG] Installing org.apache.spark:spark-streaming_2.11/maven-metadata.xml > to > /home/mi/.m2/repository/org/apache/spark/spark-streaming_2.11/maven-metadata-local.xml > [INFO] Installing > /home/mi/Work/Spark/scala2.11/spark/streaming/target/spark-streaming_2.11-2.1.0-mdh2.1.0.1-SNAPSHOT-tests.jar > to > /home/mi/.m2/repository/org/apache/spark/spark-streaming_2.11/2.1.0-mdh2.1.0.1-SNAPSHOT/spark-streaming_2.11-2.1.0-mdh2.1.0.1-SNAPSHOT-tests.jar > [DEBUG] Skipped re-installing > /home/mi/Work/Spark/scala2.11/spark/streaming/target/spark-streaming_2.11-2.1.0-mdh2.1.0.1-SNAPSHOT-tests.jar > to > 
/home/mi/.m2/repository/org/apache/spark/spark-streaming_2.11/2.1.0-mdh2.1.0.1-SNAPSHOT/spark-streaming_2.11-2.1.0-mdh2.1.0.1-SNAPSHOT-tests.jar, > seems unchanged > {code} > The reason is below: > {code:java} > [DEBUG] (f) artifact = > org.apache.spark:spark-streaming_2.11:jar:2.1.0-mdh2.1.0.1-SNAPSHOT > [DEBUG] (f) attachedArtifacts = > [org.apache.spark:spark-streaming_2.11:test-jar:tests:2.1.0-mdh2.1.0.1-SNAPSHOT, > org.apache.spark:spark-streaming_2.11:jar:tests:2.1.0-mdh2.1.0.1-SNAPSHOT, > org.apache.spark:spark > -streaming_2.11:java-source:sources:2.1.0-mdh2.1.0.1-SNAPSHOT, > org.apache.spark:spark-streaming_2.11:java-source:test-sources:2.1.0-mdh2.1.0.1-SNAPSHOT, > org.apache.spark:spark-streaming_2.11:javadoc:javadoc:2.1.0 > -mdh2.1.0.1-SNAPSHOT] > {code} > When executing 'mvn deploy' to Nexus during a release, it will fail since the > release Nexus cannot be overridden.
[jira] [Commented] (SPARK-19552) Upgrade Netty version to 4.1.8 final
[ https://issues.apache.org/jira/browse/SPARK-19552?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16116507#comment-16116507 ] Pawel Szulc commented on SPARK-19552: - [~srowen] can u elaborate why u think that shading will still bring issues with netty compatibility? if I PR a shade of netty and all tests will pass, will that be a good indicator that shading works? > Upgrade Netty version to 4.1.8 final > > > Key: SPARK-19552 > URL: https://issues.apache.org/jira/browse/SPARK-19552 > Project: Spark > Issue Type: Improvement > Components: Build >Affects Versions: 2.1.0 >Reporter: Adam Roberts >Priority: Minor > > Netty 4.1.8 was recently released but isn't API compatible with previous > major versions (like Netty 4.0.x), see > http://netty.io/news/2017/01/30/4-0-44-Final-4-1-8-Final.html for details. > This version does include a fix for a security concern but not one we'd be > exposed to with Spark "out of the box". Let's upgrade the version we use to > be on the safe side as the security fix I'm especially interested in is not > available in the 4.0.x release line. > We should move up anyway to take on a bunch of other big fixes cited in the > release notes (and if anyone were to use Spark with netty and tcnative, they > shouldn't be exposed to the security problem) - we should be good citizens > and make this change. > As this 4.1 version involves API changes we'll need to implement a few > methods and possibly adjust the Sasl tests. This JIRA and associated pull > request starts the process which I'll work on - and any help would be much > appreciated! 
[jira] [Commented] (SPARK-19552) Upgrade Netty version to 4.1.8 final
[ https://issues.apache.org/jira/browse/SPARK-19552?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16116510#comment-16116510 ] Sean Owen commented on SPARK-19552: --- Because it's no longer on the classpath, which is rather the point, but, still a user-visible change. Yes getting the change working and tested is great, and the existing PR did get that far. It might need an update. > Upgrade Netty version to 4.1.8 final > > > Key: SPARK-19552 > URL: https://issues.apache.org/jira/browse/SPARK-19552 > Project: Spark > Issue Type: Improvement > Components: Build >Affects Versions: 2.1.0 >Reporter: Adam Roberts >Priority: Minor > > Netty 4.1.8 was recently released but isn't API compatible with previous > major versions (like Netty 4.0.x), see > http://netty.io/news/2017/01/30/4-0-44-Final-4-1-8-Final.html for details. > This version does include a fix for a security concern but not one we'd be > exposed to with Spark "out of the box". Let's upgrade the version we use to > be on the safe side as the security fix I'm especially interested in is not > available in the 4.0.x release line. > We should move up anyway to take on a bunch of other big fixes cited in the > release notes (and if anyone were to use Spark with netty and tcnative, they > shouldn't be exposed to the security problem) - we should be good citizens > and make this change. > As this 4.1 version involves API changes we'll need to implement a few > methods and possibly adjust the Sasl tests. This JIRA and associated pull > request starts the process which I'll work on - and any help would be much > appreciated! 
[jira] [Commented] (SPARK-19552) Upgrade Netty version to 4.1.8 final
[ https://issues.apache.org/jira/browse/SPARK-19552?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16116517#comment-16116517 ] Sean Owen commented on SPARK-19552: --- There are two steps here: get the update working (because it's not compatible with 4.0), then get it shaded. > Upgrade Netty version to 4.1.8 final > > > Key: SPARK-19552 > URL: https://issues.apache.org/jira/browse/SPARK-19552 > Project: Spark > Issue Type: Improvement > Components: Build >Affects Versions: 2.1.0 >Reporter: Adam Roberts >Priority: Minor > > Netty 4.1.8 was recently released but isn't API compatible with previous > major versions (like Netty 4.0.x), see > http://netty.io/news/2017/01/30/4-0-44-Final-4-1-8-Final.html for details. > This version does include a fix for a security concern but not one we'd be > exposed to with Spark "out of the box". Let's upgrade the version we use to > be on the safe side as the security fix I'm especially interested in is not > available in the 4.0.x release line. > We should move up anyway to take on a bunch of other big fixes cited in the > release notes (and if anyone were to use Spark with netty and tcnative, they > shouldn't be exposed to the security problem) - we should be good citizens > and make this change. > As this 4.1 version involves API changes we'll need to implement a few > methods and possibly adjust the Sasl tests. This JIRA and associated pull > request starts the process which I'll work on - and any help would be much > appreciated! 
[jira] [Commented] (SPARK-21653) Complement SQL expression document
[ https://issues.apache.org/jira/browse/SPARK-21653?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16116282#comment-16116282 ] Liang-Chi Hsieh commented on SPARK-21653: - [~hyukjin.kwon] oh yeah, looks like it is. As SPARK-17963 is titled "Add examples (extend) in each expression", I'm not sure whether we intentionally left the expressions without detailed descriptions? > Complement SQL expression document > -- > > Key: SPARK-21653 > URL: https://issues.apache.org/jira/browse/SPARK-21653 > Project: Spark > Issue Type: Umbrella > Components: SQL >Affects Versions: 2.2.0 >Reporter: Liang-Chi Hsieh > > We have {{ExpressionDescription}} for SQL expressions. The expression > description gives an expression's usage, arguments, and examples. Users > can understand how to use those expressions via the {{DESCRIBE}} command in SQL: > {code} > spark-sql> DESCRIBE FUNCTION EXTENDED In; > Function: in > Class: org.apache.spark.sql.catalyst.expressions.In > Usage: expr1 in(expr2, expr3, ...) - Returns true if `expr` equals to any > valN. > Extended Usage: > No example/argument for in. > {code} > Not all SQL expressions have a complete description now. For example, in the > above case, there is no example for function {{in}}. This task is going to > complement the expression descriptions.
[jira] [Assigned] (SPARK-13041) Add a driver history ui link and a mesos sandbox link on the dispatcher's ui page for each driver
[ https://issues.apache.org/jira/browse/SPARK-13041?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Sean Owen reassigned SPARK-13041: - Assignee: Stavros Kontopoulos > Add a driver history ui link and a mesos sandbox link on the dispatcher's ui > page for each driver > - > > Key: SPARK-13041 > URL: https://issues.apache.org/jira/browse/SPARK-13041 > Project: Spark > Issue Type: Improvement > Components: Mesos >Reporter: Stavros Kontopoulos >Assignee: Stavros Kontopoulos >Priority: Minor > Fix For: 2.3.0 > > > It would be convenient to have the driver's history uri from the history > server and the driver's mesos sandbox uri on the dispatcher's ui. -- This message was sent by Atlassian JIRA (v6.4.14#64029) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Commented] (SPARK-21655) Kill CLI for Yarn mode
[ https://issues.apache.org/jira/browse/SPARK-21655?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16116616#comment-16116616 ] Sean Owen commented on SPARK-21655: --- Why not just kill the driver via YARN? > Kill CLI for Yarn mode > -- > > Key: SPARK-21655 > URL: https://issues.apache.org/jira/browse/SPARK-21655 > Project: Spark > Issue Type: Improvement > Components: YARN >Affects Versions: 2.1.1 >Reporter: Jong Yoon Lee >Priority: Minor > Fix For: 2.1.1 > > Original Estimate: 168h > Remaining Estimate: 168h > > Similar to how standalone and Mesos have the capability to safely shut down > the Spark application, there should be a way to safely shut down Spark in > YARN mode. This will ensure a clean shutdown and unregistration from YARN. > This is the design doc: > https://docs.google.com/document/d/1QG8hITjLNi1D9dVR3b_hZkyrGm5FFm0u9M1KGM4y1Ak/edit?usp=sharing > and I will upload the patch soon.
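For context, the YARN-side kill Sean suggests already exists as a CLI; it looks like this (the application id below is made up for illustration):

```
# Look up the running Spark application, then ask YARN to kill it:
yarn application -list
yarn application -kill application_1502000000000_0001
```

What the ticket adds on top of this is a *graceful* shutdown (clean unregistration from YARN) rather than a hard kill.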
[jira] [Comment Edited] (SPARK-21610) Corrupt records are not handled properly when creating a dataframe from a file
[ https://issues.apache.org/jira/browse/SPARK-21610?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16116126#comment-16116126 ] Jen-Ming Chung edited comment on SPARK-21610 at 8/7/17 2:07 PM: I have created a pull request for this issue: [https://github.com/apache/spark/pull/18865] was (Author: cjm): User 'jmchung' has created a pull request for this issue: [https://github.com/apache/spark/pull/18865] > Corrupt records are not handled properly when creating a dataframe from a file > -- > > Key: SPARK-21610 > URL: https://issues.apache.org/jira/browse/SPARK-21610 > Project: Spark > Issue Type: Bug > Components: SQL >Affects Versions: 2.0.2, 2.2.0 > Environment: macOs Sierra 10.12.5 >Reporter: dmtran > > Consider a jsonl file with 3 records. The third record has a value of type > string, instead of int. > {code} > echo '{"field": 1} > {"field": 2} > {"field": "3"}' >/tmp/sample.json > {code} > Create a dataframe from this file, with a schema that contains > "_corrupt_record" so that corrupt records are kept. > {code} > import org.apache.spark.sql.types._ > val schema = new StructType() > .add("field", ByteType) > .add("_corrupt_record", StringType) > val file = "/tmp/sample.json" > val dfFromFile = spark.read.schema(schema).json(file) > {code} > Run the following lines from a spark-shell: > {code} > scala> dfFromFile.show(false) > +-+---+ > |field|_corrupt_record| > +-+---+ > |1|null | > |2|null | > |null |{"field": "3"} | > +-+---+ > scala> dfFromFile.filter($"_corrupt_record".isNotNull).count() > res1: Long = 0 > scala> dfFromFile.filter($"_corrupt_record".isNull).count() > res2: Long = 3 > {code} > The expected result is 1 corrupt record and 2 valid records, but the actual > one is 0 corrupt record and 3 valid records. 
> The bug is not reproduced if we create a dataframe from a RDD: > {code} > scala> val rdd = sc.textFile(file) > rdd: org.apache.spark.rdd.RDD[String] = /tmp/sample.json MapPartitionsRDD[92] > at textFile at :28 > scala> val dfFromRdd = spark.read.schema(schema).json(rdd) > dfFromRdd: org.apache.spark.sql.DataFrame = [field: tinyint, _corrupt_record: > string] > scala> dfFromRdd.show(false) > +-+---+ > |field|_corrupt_record| > +-+---+ > |1|null | > |2|null | > |null |{"field": "3"} | > +-+---+ > scala> dfFromRdd.filter($"_corrupt_record".isNotNull).count() > res5: Long = 1 > scala> dfFromRdd.filter($"_corrupt_record".isNull).count() > res6: Long = 2 > {code} -- This message was sent by Atlassian JIRA (v6.4.14#64029) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Created] (SPARK-21655) Kill CLI for Yarn mode
Jong Yoon Lee created SPARK-21655: - Summary: Kill CLI for Yarn mode Key: SPARK-21655 URL: https://issues.apache.org/jira/browse/SPARK-21655 Project: Spark Issue Type: Improvement Components: YARN Affects Versions: 2.1.1 Reporter: Jong Yoon Lee Priority: Minor Fix For: 2.1.1 Similar to how standalone and Mesos have the capability to safely shut down the Spark application, there should be a way to safely shut down Spark in YARN mode. This will ensure a clean shutdown and unregistration from YARN. This is the design doc: https://docs.google.com/document/d/1QG8hITjLNi1D9dVR3b_hZkyrGm5FFm0u9M1KGM4y1Ak/edit?usp=sharing and I will upload the patch soon.
[jira] [Commented] (SPARK-21650) Insert into hive partitioned table from spark-sql taking hours to complete
[ https://issues.apache.org/jira/browse/SPARK-21650?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16116187#comment-16116187 ] Madhavi Vaddepalli commented on SPARK-21650: Thank you Sean Owen. -Madhavi. > Insert into hive partitioned table from spark-sql taking hours to complete > -- > > Key: SPARK-21650 > URL: https://issues.apache.org/jira/browse/SPARK-21650 > Project: Spark > Issue Type: Bug > Components: SQL >Affects Versions: 1.6.0 > Environment: Linux machines > Spark version - 1.6.0 > Hive Version - 1.1 > 200 - number of executors. > 3 - number of executor cores. > 10g - executor and driver memory. > dynamic allocation enabled. >Reporter: Madhavi Vaddepalli > > We are trying to execute some logic using spark sql: > Input to program: 7 billion records (60 GB gzip compressed, text format) > Output: 7 billion records (260 GB gzip compressed and partitioned on a few > columns) > output has 1 partitions (it has 1 different combinations > of partition columns) > We are trying to insert this output into a hive table (text format, gzip > compressed). > All the tasks spawned finished completely in 33 minutes and all the executors > were decommissioned; only the driver was active. *It remained in this state without > showing any active stage or task in the Spark UI for about 2.5 hrs* and then > completed successfully. > Please let us know what can be done to improve the performance here (is it > fixed in later versions?).
[jira] [Commented] (SPARK-21652) Optimizer cannot reach a fixed point on certain queries
[ https://issues.apache.org/jira/browse/SPARK-21652?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16116236#comment-16116236 ] Takeshi Yamamuro commented on SPARK-21652: -- This seems to be a known issue; have you tried `spark.sql.constraintPropagation.enabled`? > Optimizer cannot reach a fixed point on certain queries > --- > > Key: SPARK-21652 > URL: https://issues.apache.org/jira/browse/SPARK-21652 > Project: Spark > Issue Type: Bug > Components: Optimizer, SQL >Affects Versions: 2.2.0 >Reporter: Anton Okolnychyi > > The optimizer cannot reach a fixed point on the following query: > {code} > Seq((1, 2)).toDF("col1", "col2").write.saveAsTable("t1") > Seq(1, 2).toDF("col").write.saveAsTable("t2") > spark.sql("SELECT * FROM t1, t2 WHERE t1.col1 = 1 AND 1 = t1.col2 AND t1.col1 > = t2.col AND t1.col2 = t2.col").explain(true) > {code} > At some point during the optimization, InferFiltersFromConstraints infers a > new constraint '(col2#33 = col1#32)' that is appended to the join condition, > then PushPredicateThroughJoin pushes it down, ConstantPropagation replaces > '(col2#33 = col1#32)' with '1 = 1' based on other propagated constraints, > ConstantFolding replaces '1 = 1' with 'true' and BooleanSimplification finally > removes this predicate. However, InferFiltersFromConstraints will again infer > '(col2#33 = col1#32)' on the next iteration and the process will continue > until the limit of iterations is reached. 
> See below for more details > {noformat} > === Applying Rule > org.apache.spark.sql.catalyst.optimizer.InferFiltersFromConstraints === > !Join Inner, ((col1#32 = col#34) && (col2#33 = col#34)) > Join Inner, ((col2#33 = col1#32) && ((col1#32 = col#34) && > (col2#33 = col#34))) > :- Filter ((isnotnull(col1#32) && isnotnull(col2#33)) && ((col1#32 = 1) && > (1 = col2#33))) :- Filter ((isnotnull(col1#32) && isnotnull(col2#33)) && > ((col1#32 = 1) && (1 = col2#33))) > : +- Relation[col1#32,col2#33] parquet > : +- Relation[col1#32,col2#33] parquet > +- Filter ((1 = col#34) && isnotnull(col#34)) > +- Filter ((1 = col#34) && isnotnull(col#34)) > +- Relation[col#34] parquet >+- Relation[col#34] parquet > > === Applying Rule > org.apache.spark.sql.catalyst.optimizer.PushPredicateThroughJoin === > !Join Inner, ((col2#33 = col1#32) && ((col1#32 = col#34) && (col2#33 = > col#34))) Join Inner, ((col1#32 = col#34) && (col2#33 = col#34)) > !:- Filter ((isnotnull(col1#32) && isnotnull(col2#33)) && ((col1#32 = 1) && > (1 = col2#33))) :- Filter (col2#33 = col1#32) > !: +- Relation[col1#32,col2#33] parquet > : +- Filter ((isnotnull(col1#32) && isnotnull(col2#33)) && > ((col1#32 = 1) && (1 = col2#33))) > !+- Filter ((1 = col#34) && isnotnull(col#34)) > : +- Relation[col1#32,col2#33] parquet > ! +- Relation[col#34] parquet > +- Filter ((1 = col#34) && isnotnull(col#34)) > ! 
>+- Relation[col#34] parquet > > === Applying Rule org.apache.spark.sql.catalyst.optimizer.CombineFilters === > Join Inner, ((col1#32 = col#34) && (col2#33 = col#34)) >Join Inner, ((col1#32 = col#34) && (col2#33 = col#34)) > !:- Filter (col2#33 = col1#32) >:- Filter (((isnotnull(col1#32) && isnotnull(col2#33)) && > ((col1#32 = 1) && (1 = col2#33))) && (col2#33 = col1#32)) > !: +- Filter ((isnotnull(col1#32) && isnotnull(col2#33)) && ((col1#32 = 1) > && (1 = col2#33))) : +- Relation[col1#32,col2#33] parquet > !: +- Relation[col1#32,col2#33] parquet >+- Filter ((1 = col#34) && isnotnull(col#34)) > !+- Filter ((1 = col#34) && isnotnull(col#34)) > +- Relation[col#34] parquet > ! +- Relation[col#34] parquet > > > === Applying Rule org.apache.spark.sql.catalyst.optimizer.ConstantPropagation > === > Join Inner, ((col1#32 = col#34) && (col2#33 = col#34)) > Join Inner, ((col1#32 = col#34) && > (col2#33 = col#34)) > !:- Filter (((isnotnull(col1#32) && isnotnull(col2#33)) && ((col1#32 = 1) && > (1 = col2#33))) && (col2#33 = col1#32)) :- Filter
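The workaround suggested in the comment above can be sketched as follows. {{spark.sql.constraintPropagation.enabled}} is a real Spark SQL conf as of 2.2, but whether disabling it fully suppresses the rule ping-pong for this exact query is an assumption here, not something verified in this thread:

{code}
// Hedged workaround sketch: with constraint propagation disabled,
// InferFiltersFromConstraints should stop re-inferring the predicate
// that ConstantPropagation/ConstantFolding keep folding away.
spark.conf.set("spark.sql.constraintPropagation.enabled", "false")

spark.sql(
  """SELECT * FROM t1, t2
    |WHERE t1.col1 = 1 AND 1 = t1.col2
    |  AND t1.col1 = t2.col AND t1.col2 = t2.col""".stripMargin
).explain(true)
{code}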
[jira] [Commented] (SPARK-21649) Support writing data into hive bucket table.
[ https://issues.apache.org/jira/browse/SPARK-21649?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16116132#comment-16116132 ] jin xing commented on SPARK-21649: -- made a pr: https://github.com/apache/spark/pull/18866 > Support writing data into hive bucket table. > > > Key: SPARK-21649 > URL: https://issues.apache.org/jira/browse/SPARK-21649 > Project: Spark > Issue Type: New Feature > Components: SQL >Affects Versions: 2.2.0 >Reporter: jin xing > > Currently it is not supported to write hive bucket table. Spark internally > uses Murmur3Hash for partitioning. We can use hive hash for compatibility > when write to bucket table. -- This message was sent by Atlassian JIRA (v6.4.14#64029) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Created] (SPARK-21650) Insert into hive partitioned table from spark-sql taking hours to complete
Madhavi Vaddepalli created SPARK-21650: -- Summary: Insert into hive partitioned table from spark-sql taking hours to complete Key: SPARK-21650 URL: https://issues.apache.org/jira/browse/SPARK-21650 Project: Spark Issue Type: Bug Components: SQL Affects Versions: 1.6.0 Environment: Linux machines Spark version - 1.6.0 Hive Version - 1.1 200 - number of executors. 3 - number of executor cores. 10g - executor and driver memory. dynamic allocation enabled. Reporter: Madhavi Vaddepalli We are trying to execute some logic using Spark SQL: Input to program: 7 billion records. (60 GB gzip compressed, text format) Output: 7 billion records. (260 GB gzip compressed and partitioned on a few columns) The output has 1 partitions (it has 1 different combinations of partition columns). We are trying to insert this output into a Hive table. (text format, gzip compressed) All the tasks spawned finished completely in 33 minutes and all the executors were de-commissioned; only the driver remained active. *It remained in this state without showing any active stage or task in the Spark UI for about 2.5 hrs* and then completed successfully. Please let us know what can be done to improve the performance here. (Is it fixed in later versions?) -- This message was sent by Atlassian JIRA (v6.4.14#64029) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Created] (SPARK-21652) Optimizer cannot reach a fixed point on certain queries
Anton Okolnychyi created SPARK-21652: Summary: Optimizer cannot reach a fixed point on certain queries Key: SPARK-21652 URL: https://issues.apache.org/jira/browse/SPARK-21652 Project: Spark Issue Type: Bug Components: Optimizer, SQL Affects Versions: 2.2.0 Reporter: Anton Okolnychyi The optimizer cannot reach a fixed point on the following query: {code} Seq((1, 2)).toDF("col1", "col2").write.saveAsTable("t1") Seq(1, 2).toDF("col").write.saveAsTable("t2") spark.sql("SELECT * FROM t1, t2 WHERE t1.col1 = 1 AND 1 = t1.col2 AND t1.col1 = t2.col AND t1.col2 = t2.col").explain(true) {code} At some point during the optimization, InferFiltersFromConstraints infers a new constraint '(col2#33 = col1#32)' that is appended to the join condition, then PushPredicateThroughJoin pushes it down, ConstantPropagation replaces '(col2#33 = col1#32)' with '1 = 1' based on other propagated constraints, ConstantFolding replaces '1 = 1' with 'true' and BooleanSimplification finally removes this predicate. However, InferFiltersFromConstraints will again infer '(col2#33 = col1#32)' on the next iteration and the process will continue until the limit of iterations is reached. 
See below for more details {noformat} === Applying Rule org.apache.spark.sql.catalyst.optimizer.InferFiltersFromConstraints === !Join Inner, ((col1#32 = col#34) && (col2#33 = col#34)) Join Inner, ((col2#33 = col1#32) && ((col1#32 = col#34) && (col2#33 = col#34))) :- Filter ((isnotnull(col1#32) && isnotnull(col2#33)) && ((col1#32 = 1) && (1 = col2#33))) :- Filter ((isnotnull(col1#32) && isnotnull(col2#33)) && ((col1#32 = 1) && (1 = col2#33))) : +- Relation[col1#32,col2#33] parquet : +- Relation[col1#32,col2#33] parquet +- Filter ((1 = col#34) && isnotnull(col#34)) +- Filter ((1 = col#34) && isnotnull(col#34)) +- Relation[col#34] parquet +- Relation[col#34] parquet === Applying Rule org.apache.spark.sql.catalyst.optimizer.PushPredicateThroughJoin === !Join Inner, ((col2#33 = col1#32) && ((col1#32 = col#34) && (col2#33 = col#34))) Join Inner, ((col1#32 = col#34) && (col2#33 = col#34)) !:- Filter ((isnotnull(col1#32) && isnotnull(col2#33)) && ((col1#32 = 1) && (1 = col2#33))) :- Filter (col2#33 = col1#32) !: +- Relation[col1#32,col2#33] parquet : +- Filter ((isnotnull(col1#32) && isnotnull(col2#33)) && ((col1#32 = 1) && (1 = col2#33))) !+- Filter ((1 = col#34) && isnotnull(col#34)) : +- Relation[col1#32,col2#33] parquet ! +- Relation[col#34] parquet +- Filter ((1 = col#34) && isnotnull(col#34)) ! +- Relation[col#34] parquet === Applying Rule org.apache.spark.sql.catalyst.optimizer.CombineFilters === Join Inner, ((col1#32 = col#34) && (col2#33 = col#34)) Join Inner, ((col1#32 = col#34) && (col2#33 = col#34)) !:- Filter (col2#33 = col1#32) :- Filter (((isnotnull(col1#32) && isnotnull(col2#33)) && ((col1#32 = 1) && (1 = col2#33))) && (col2#33 = col1#32)) !: +- Filter ((isnotnull(col1#32) && isnotnull(col2#33)) && ((col1#32 = 1) && (1 = col2#33))) : +- Relation[col1#32,col2#33] parquet !: +- Relation[col1#32,col2#33] parquet +- Filter ((1 = col#34) && isnotnull(col#34)) !+- Filter ((1 = col#34) && isnotnull(col#34)) +- Relation[col#34] parquet ! 
+- Relation[col#34] parquet === Applying Rule org.apache.spark.sql.catalyst.optimizer.ConstantPropagation === Join Inner, ((col1#32 = col#34) && (col2#33 = col#34)) Join Inner, ((col1#32 = col#34) && (col2#33 = col#34)) !:- Filter (((isnotnull(col1#32) && isnotnull(col2#33)) && ((col1#32 = 1) && (1 = col2#33))) && (col2#33 = col1#32)) :- Filter (((isnotnull(col1#32) && isnotnull(col2#33)) && ((col1#32 = 1) && (1 = col2#33))) && (1 = 1)) : +- Relation[col1#32,col2#33] parquet : +- Relation[col1#32,col2#33] parquet +- Filter ((1 = col#34) && isnotnull(col#34)) +- Filter ((1 = col#34) && isnotnull(col#34)) +-
[jira] [Updated] (SPARK-21653) Complement SQL expression document
[ https://issues.apache.org/jira/browse/SPARK-21653?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Liang-Chi Hsieh updated SPARK-21653: Issue Type: Umbrella (was: Improvement) > Complement SQL expression document > -- > > Key: SPARK-21653 > URL: https://issues.apache.org/jira/browse/SPARK-21653 > Project: Spark > Issue Type: Umbrella > Components: SQL >Affects Versions: 2.2.0 >Reporter: Liang-Chi Hsieh > > The SQL expression description is not complete now. We should complement the > expression doc. -- This message was sent by Atlassian JIRA (v6.4.14#64029) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Created] (SPARK-21653) Complement SQL expression document
Liang-Chi Hsieh created SPARK-21653: --- Summary: Complement SQL expression document Key: SPARK-21653 URL: https://issues.apache.org/jira/browse/SPARK-21653 Project: Spark Issue Type: Improvement Components: SQL Affects Versions: 2.2.0 Reporter: Liang-Chi Hsieh The SQL expression description is not complete now. We should complement the expression doc. -- This message was sent by Atlassian JIRA (v6.4.14#64029) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Commented] (SPARK-21653) Complement SQL expression document
[ https://issues.apache.org/jira/browse/SPARK-21653?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16116232#comment-16116232 ] Liang-Chi Hsieh commented on SPARK-21653: - We have {{ExpressionDescription}} for SQL expressions. Not all SQL expressions have complete description now. This task is going to complement the expression description. > Complement SQL expression document > -- > > Key: SPARK-21653 > URL: https://issues.apache.org/jira/browse/SPARK-21653 > Project: Spark > Issue Type: Umbrella > Components: SQL >Affects Versions: 2.2.0 >Reporter: Liang-Chi Hsieh > > The SQL expression description is not complete now. We should complement the > expression doc. -- This message was sent by Atlassian JIRA (v6.4.14#64029) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Commented] (SPARK-21653) Complement SQL expression document
[ https://issues.apache.org/jira/browse/SPARK-21653?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16116241#comment-16116241 ] Liang-Chi Hsieh commented on SPARK-21653: - [~sowen] I have now added a detailed description for this. Is it clear to you now? > Complement SQL expression document > -- > > Key: SPARK-21653 > URL: https://issues.apache.org/jira/browse/SPARK-21653 > Project: Spark > Issue Type: Umbrella > Components: SQL >Affects Versions: 2.2.0 >Reporter: Liang-Chi Hsieh > > We have {{ExpressionDescription}} for SQL expressions. The expression > description tells an expression's usage, arguments, and examples. Users > can understand how to use those expressions by the {{DESCRIBE}} command in SQL: > {code} > spark-sql> DESCRIBE FUNCTION EXTENDED In; > Function: in > Class: org.apache.spark.sql.catalyst.expressions.In > Usage: expr1 in(expr2, expr3, ...) - Returns true if `expr` equals to any > valN. > Extended Usage: > No example/argument for in. > {code} > Not all SQL expressions have a complete description now. For example, in the > above case, there is no example for function {{in}}. This task is going to > complement the expression description. -- This message was sent by Atlassian JIRA (v6.4.14#64029) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Updated] (SPARK-21653) Complement SQL expression document
[ https://issues.apache.org/jira/browse/SPARK-21653?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Liang-Chi Hsieh updated SPARK-21653: Description: We have {{ExpressionDescription}} for SQL expressions. The expression description tells an expression's usage, arguments, and examples. Users can understand how to use those expressions by the {{DESCRIBE}} command in SQL: {code} spark-sql> DESCRIBE FUNCTION EXTENDED In; Function: in Class: org.apache.spark.sql.catalyst.expressions.In Usage: expr1 in(expr2, expr3, ...) - Returns true if `expr` equals to any valN. Extended Usage: No example/argument for in. {code} Not all SQL expressions have a complete description now. For example, in the above case, there is no example for function {{in}}. This task is going to complement the expression description. was: The SQL expression description is not complete now. We should complement the expression doc. > Complement SQL expression document > -- > > Key: SPARK-21653 > URL: https://issues.apache.org/jira/browse/SPARK-21653 > Project: Spark > Issue Type: Umbrella > Components: SQL >Affects Versions: 2.2.0 >Reporter: Liang-Chi Hsieh > > We have {{ExpressionDescription}} for SQL expressions. The expression > description tells an expression's usage, arguments, and examples. Users > can understand how to use those expressions by the {{DESCRIBE}} command in SQL: > {code} > spark-sql> DESCRIBE FUNCTION EXTENDED In; > Function: in > Class: org.apache.spark.sql.catalyst.expressions.In > Usage: expr1 in(expr2, expr3, ...) - Returns true if `expr` equals to any > valN. > Extended Usage: > No example/argument for in. > {code} > Not all SQL expressions have a complete description now. For example, in the > above case, there is no example for function {{in}}. This task is going to > complement the expression description. 
-- This message was sent by Atlassian JIRA (v6.4.14#64029) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Commented] (SPARK-21652) Optimizer cannot reach a fixed point on certain queries
[ https://issues.apache.org/jira/browse/SPARK-21652?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16116192#comment-16116192 ] Anton Okolnychyi commented on SPARK-21652: -- One option to fix this is NOT to apply ConstantPropagation to predicates such as '(col1 = col2)' if both sides can be replaced with a constant value. > Optimizer cannot reach a fixed point on certain queries > --- > > Key: SPARK-21652 > URL: https://issues.apache.org/jira/browse/SPARK-21652 > Project: Spark > Issue Type: Bug > Components: Optimizer, SQL >Affects Versions: 2.2.0 >Reporter: Anton Okolnychyi > > The optimizer cannot reach a fixed point on the following query: > {code} > Seq((1, 2)).toDF("col1", "col2").write.saveAsTable("t1") > Seq(1, 2).toDF("col").write.saveAsTable("t2") > spark.sql("SELECT * FROM t1, t2 WHERE t1.col1 = 1 AND 1 = t1.col2 AND t1.col1 > = t2.col AND t1.col2 = t2.col").explain(true) > {code} > At some point during the optimization, InferFiltersFromConstraints infers a > new constraint '(col2#33 = col1#32)' that is appended to the join condition, > then PushPredicateThroughJoin pushes it down, ConstantPropagation replaces > '(col2#33 = col1#32)' with '1 = 1' based on other propagated constraints, > ConstantFolding replaces '1 = 1' with 'true' and BooleanSimplification finally > removes this predicate. However, InferFiltersFromConstraints will again infer > '(col2#33 = col1#32)' on the next iteration and the process will continue > until the limit of iterations is reached. 
> See below for more details > {noformat} > === Applying Rule > org.apache.spark.sql.catalyst.optimizer.InferFiltersFromConstraints === > !Join Inner, ((col1#32 = col#34) && (col2#33 = col#34)) > Join Inner, ((col2#33 = col1#32) && ((col1#32 = col#34) && > (col2#33 = col#34))) > :- Filter ((isnotnull(col1#32) && isnotnull(col2#33)) && ((col1#32 = 1) && > (1 = col2#33))) :- Filter ((isnotnull(col1#32) && isnotnull(col2#33)) && > ((col1#32 = 1) && (1 = col2#33))) > : +- Relation[col1#32,col2#33] parquet > : +- Relation[col1#32,col2#33] parquet > +- Filter ((1 = col#34) && isnotnull(col#34)) > +- Filter ((1 = col#34) && isnotnull(col#34)) > +- Relation[col#34] parquet >+- Relation[col#34] parquet > > === Applying Rule > org.apache.spark.sql.catalyst.optimizer.PushPredicateThroughJoin === > !Join Inner, ((col2#33 = col1#32) && ((col1#32 = col#34) && (col2#33 = > col#34))) Join Inner, ((col1#32 = col#34) && (col2#33 = col#34)) > !:- Filter ((isnotnull(col1#32) && isnotnull(col2#33)) && ((col1#32 = 1) && > (1 = col2#33))) :- Filter (col2#33 = col1#32) > !: +- Relation[col1#32,col2#33] parquet > : +- Filter ((isnotnull(col1#32) && isnotnull(col2#33)) && > ((col1#32 = 1) && (1 = col2#33))) > !+- Filter ((1 = col#34) && isnotnull(col#34)) > : +- Relation[col1#32,col2#33] parquet > ! +- Relation[col#34] parquet > +- Filter ((1 = col#34) && isnotnull(col#34)) > ! 
>+- Relation[col#34] parquet > > === Applying Rule org.apache.spark.sql.catalyst.optimizer.CombineFilters === > Join Inner, ((col1#32 = col#34) && (col2#33 = col#34)) >Join Inner, ((col1#32 = col#34) && (col2#33 = col#34)) > !:- Filter (col2#33 = col1#32) >:- Filter (((isnotnull(col1#32) && isnotnull(col2#33)) && > ((col1#32 = 1) && (1 = col2#33))) && (col2#33 = col1#32)) > !: +- Filter ((isnotnull(col1#32) && isnotnull(col2#33)) && ((col1#32 = 1) > && (1 = col2#33))) : +- Relation[col1#32,col2#33] parquet > !: +- Relation[col1#32,col2#33] parquet >+- Filter ((1 = col#34) && isnotnull(col#34)) > !+- Filter ((1 = col#34) && isnotnull(col#34)) > +- Relation[col#34] parquet > ! +- Relation[col#34] parquet > > > === Applying Rule org.apache.spark.sql.catalyst.optimizer.ConstantPropagation > === > Join Inner, ((col1#32 = col#34) && (col2#33 = col#34)) > Join Inner, ((col1#32 = col#34) && > (col2#33 = col#34)) > !:- Filter (((isnotnull(col1#32) && isnotnull(col2#33)) &&
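The guard proposed in the comment above can be illustrated with a small, self-contained sketch. The helper name and the plain-Scala types are hypothetical and do not come from Catalyst; the idea is only that an attribute-to-attribute equality is left alone when both sides are already pinned to constants, so the rewrite cannot oscillate with InferFiltersFromConstraints:

{code}
// Hypothetical sketch (not actual Catalyst code) of the proposed guard:
// do not constant-propagate (left = right) when both attributes already
// have known constant values from other predicates.
def shouldPropagate(
    left: String, right: String,
    constants: Map[String, Int]): Boolean =
  !(constants.contains(left) && constants.contains(right))

// With col1 = 1 and col2 = 1 already known, (col2 = col1) is left alone:
// shouldPropagate("col2", "col1", Map("col1" -> 1, "col2" -> 1)) == false
{code}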
[jira] [Commented] (SPARK-21653) Complement SQL expression document
[ https://issues.apache.org/jira/browse/SPARK-21653?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16116224#comment-16116224 ] Sean Owen commented on SPARK-21653: --- Before continuing, can you please describe what this means? > Complement SQL expression document > -- > > Key: SPARK-21653 > URL: https://issues.apache.org/jira/browse/SPARK-21653 > Project: Spark > Issue Type: Umbrella > Components: SQL >Affects Versions: 2.2.0 >Reporter: Liang-Chi Hsieh > > The SQL expression description is not complete now. We should complement the > expression doc. -- This message was sent by Atlassian JIRA (v6.4.14#64029) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Comment Edited] (SPARK-21610) Corrupt records are not handled properly when creating a dataframe from a file
[ https://issues.apache.org/jira/browse/SPARK-21610?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16116126#comment-16116126 ] Jen-Ming Chung edited comment on SPARK-21610 at 8/7/17 6:33 AM: User 'jmchung' has created a pull request for this issue: [https://github.com/apache/spark/pull/18865] was (Author: cjm): User 'jmchung' has created a pull request for this issue: https://github.com/apache/spark/pull/18865[https://github.com/apache/spark/pull/18865] > Corrupt records are not handled properly when creating a dataframe from a file > -- > > Key: SPARK-21610 > URL: https://issues.apache.org/jira/browse/SPARK-21610 > Project: Spark > Issue Type: Bug > Components: SQL >Affects Versions: 2.0.2, 2.2.0 > Environment: macOs Sierra 10.12.5 >Reporter: dmtran > > Consider a jsonl file with 3 records. The third record has a value of type > string, instead of int. > {code} > echo '{"field": 1} > {"field": 2} > {"field": "3"}' >/tmp/sample.json > {code} > Create a dataframe from this file, with a schema that contains > "_corrupt_record" so that corrupt records are kept. > {code} > import org.apache.spark.sql.types._ > val schema = new StructType() > .add("field", ByteType) > .add("_corrupt_record", StringType) > val file = "/tmp/sample.json" > val dfFromFile = spark.read.schema(schema).json(file) > {code} > Run the following lines from a spark-shell: > {code} > scala> dfFromFile.show(false) > +-+---+ > |field|_corrupt_record| > +-+---+ > |1|null | > |2|null | > |null |{"field": "3"} | > +-+---+ > scala> dfFromFile.filter($"_corrupt_record".isNotNull).count() > res1: Long = 0 > scala> dfFromFile.filter($"_corrupt_record".isNull).count() > res2: Long = 3 > {code} > The expected result is 1 corrupt record and 2 valid records, but the actual > one is 0 corrupt record and 3 valid records. 
> The bug is not reproduced if we create a dataframe from a RDD: > {code} > scala> val rdd = sc.textFile(file) > rdd: org.apache.spark.rdd.RDD[String] = /tmp/sample.json MapPartitionsRDD[92] > at textFile at :28 > scala> val dfFromRdd = spark.read.schema(schema).json(rdd) > dfFromRdd: org.apache.spark.sql.DataFrame = [field: tinyint, _corrupt_record: > string] > scala> dfFromRdd.show(false) > +-+---+ > |field|_corrupt_record| > +-+---+ > |1|null | > |2|null | > |null |{"field": "3"} | > +-+---+ > scala> dfFromRdd.filter($"_corrupt_record".isNotNull).count() > res5: Long = 1 > scala> dfFromRdd.filter($"_corrupt_record".isNull).count() > res6: Long = 2 > {code} -- This message was sent by Atlassian JIRA (v6.4.14#64029) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Created] (SPARK-21651) Detect MapType in Json InferSchema
Jochen Niebuhr created SPARK-21651: -- Summary: Detect MapType in Json InferSchema Key: SPARK-21651 URL: https://issues.apache.org/jira/browse/SPARK-21651 Project: Spark Issue Type: Improvement Components: SQL Affects Versions: 2.2.0, 2.1.1, 2.1.0 Reporter: Jochen Niebuhr Priority: Minor When loading JSON files that include a map with highly variable keys, the current schema-inference logic might create a very large schema. This will lead to long load times and possibly out-of-memory errors. I've already submitted a pull request to the mongo-spark driver, which had a similar problem. Should I port this logic over to the JSON schema-inference class? The MongoDB Spark pull request mentioned is: https://github.com/mongodb/mongo-spark/pull/24 -- This message was sent by Atlassian JIRA (v6.4.14#64029) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Updated] (SPARK-21651) Detect MapType in Json InferSchema
[ https://issues.apache.org/jira/browse/SPARK-21651?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Jochen Niebuhr updated SPARK-21651: --- Description: When loading Json Files which include a map with very variable keys, the current schema infer logic might create a very large schema. This will lead to long load times and possibly out of memory errors. I've already submitted a pull request to the mongo spark driver which had the same problem. Should I port this logic over to the json schema infer class? The MongoDB Spark pull request mentioned is: https://github.com/mongodb/mongo-spark/pull/24 was: When loading Json Files which include a map with very variable keys, the current schema infer logic might create a very large schema. This will lead to long load times and possibly out of memory errors. I've already submitted a pull request to the mongo spark driver which had a similar problem. Should I port this logic over to the json schema infer class? The MongoDB Spark pull request mentioned is: https://github.com/mongodb/mongo-spark/pull/24 > Detect MapType in Json InferSchema > -- > > Key: SPARK-21651 > URL: https://issues.apache.org/jira/browse/SPARK-21651 > Project: Spark > Issue Type: Improvement > Components: SQL >Affects Versions: 2.1.0, 2.1.1, 2.2.0 >Reporter: Jochen Niebuhr >Priority: Minor > > When loading Json Files which include a map with very variable keys, the > current schema infer logic might create a very large schema. This will lead > to long load times and possibly out of memory errors. > I've already submitted a pull request to the mongo spark driver which had the > same problem. Should I port this logic over to the json schema infer class? > The MongoDB Spark pull request mentioned is: > https://github.com/mongodb/mongo-spark/pull/24 -- This message was sent by Atlassian JIRA (v6.4.14#64029) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
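The heuristic being proposed for porting could look roughly like the following sketch. The function name and the threshold of 250 keys are illustrative assumptions, not values taken from the mongo-spark pull request: when an inferred struct has an unusually large number of fields that all share one value type, it is collapsed to a map instead of materializing every key in the schema.

{code}
import org.apache.spark.sql.types._

// Hypothetical heuristic sketch (not the actual Spark inference code):
// a struct with many fields sharing a single value type is likely a map
// with variable keys, so infer MapType instead of a huge StructType.
def collapseToMap(st: StructType, keyThreshold: Int = 250): DataType = {
  val valueTypes = st.fields.map(_.dataType).distinct
  if (st.fields.length > keyThreshold && valueTypes.length == 1)
    MapType(StringType, valueTypes.head, valueContainsNull = true)
  else
    st
}
{code}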
[jira] [Created] (SPARK-21654) Complement predicates expression description
Liang-Chi Hsieh created SPARK-21654: --- Summary: Complement predicates expression description Key: SPARK-21654 URL: https://issues.apache.org/jira/browse/SPARK-21654 Project: Spark Issue Type: Improvement Components: SQL Affects Versions: 2.2.0 Reporter: Liang-Chi Hsieh Complement expression description for SQL predicates. -- This message was sent by Atlassian JIRA (v6.4.14#64029) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Assigned] (SPARK-21621) Reset numRecordsWritten after DiskBlockObjectWriter.commitAndGet called
[ https://issues.apache.org/jira/browse/SPARK-21621?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Wenchen Fan reassigned SPARK-21621: --- Assignee: Xianyang Liu > Reset numRecordsWritten after DiskBlockObjectWriter.commitAndGet called > --- > > Key: SPARK-21621 > URL: https://issues.apache.org/jira/browse/SPARK-21621 > Project: Spark > Issue Type: Bug > Components: Spark Core >Affects Versions: 2.2.0 >Reporter: Xianyang Liu >Assignee: Xianyang Liu > Fix For: 2.2.1, 2.3.0 > > > We should reset numRecordsWritten to zero after > DiskBlockObjectWriter.commitAndGet is called. > When `revertPartialWritesAndClose` is called, we decrease the > written-records count in `ShuffleWriteMetrics`. However, we currently decrease it > all the way to zero, which is wrong; we should only subtract the records > written after the last `commitAndGet` call. -- This message was sent by Atlassian JIRA (v6.4.14#64029) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Resolved] (SPARK-21621) Reset numRecordsWritten after DiskBlockObjectWriter.commitAndGet called
[ https://issues.apache.org/jira/browse/SPARK-21621?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Wenchen Fan resolved SPARK-21621. - Resolution: Fixed Fix Version/s: 2.3.0 2.2.1 Issue resolved by pull request 18830 [https://github.com/apache/spark/pull/18830] > Reset numRecordsWritten after DiskBlockObjectWriter.commitAndGet called > --- > > Key: SPARK-21621 > URL: https://issues.apache.org/jira/browse/SPARK-21621 > Project: Spark > Issue Type: Bug > Components: Spark Core >Affects Versions: 2.2.0 >Reporter: Xianyang Liu > Fix For: 2.2.1, 2.3.0 > > > We should reset numRecordsWritten to zero after > DiskBlockObjectWriter.commitAndGet is called. > When `revertPartialWritesAndClose` is called, we decrease the > written-records count in `ShuffleWriteMetrics`. However, we currently decrease it > all the way to zero, which is wrong; we should only subtract the records > written after the last `commitAndGet` call. -- This message was sent by Atlassian JIRA (v6.4.14#64029) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
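The fix described in SPARK-21621 can be sketched with a toy counter; the class and method names below are illustrative, not the actual {{DiskBlockObjectWriter}} API. The point is that only the records written since the last commit are tracked separately, so a later revert subtracts just that delta instead of zeroing the whole metric.

{code}
// Hypothetical sketch of the fix (not the real DiskBlockObjectWriter):
class WriteMetricsSketch {
  private var totalRecords = 0L        // value reported to ShuffleWriteMetrics
  private var recordsSinceCommit = 0L  // reset on every commitAndGet

  def recordWritten(): Unit = { totalRecords += 1; recordsSinceCommit += 1 }

  // The reset SPARK-21621 adds: committed records are no longer revertible.
  def commitAndGet(): Unit = recordsSinceCommit = 0L

  // Undo only the records written after the last commitAndGet.
  def revertPartialWrites(): Unit = {
    totalRecords -= recordsSinceCommit
    recordsSinceCommit = 0L
  }

  def records: Long = totalRecords
}
{code}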
[jira] [Resolved] (SPARK-13041) Add a driver history ui link and a mesos sandbox link on the dispatcher's ui page for each driver
[ https://issues.apache.org/jira/browse/SPARK-13041?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Sean Owen resolved SPARK-13041. --- Resolution: Fixed Fix Version/s: 2.3.0 Issue resolved by pull request 18528 [https://github.com/apache/spark/pull/18528] > Add a driver history ui link and a mesos sandbox link on the dispatcher's ui > page for each driver > - > > Key: SPARK-13041 > URL: https://issues.apache.org/jira/browse/SPARK-13041 > Project: Spark > Issue Type: Improvement > Components: Mesos >Reporter: Stavros Kontopoulos >Priority: Minor > Fix For: 2.3.0 > > > It would be convenient to have the driver's history uri from the history > server and the driver's mesos sandbox uri on the dispatcher's ui. -- This message was sent by Atlassian JIRA (v6.4.14#64029) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Commented] (SPARK-21653) Complement SQL expression document
[ https://issues.apache.org/jira/browse/SPARK-21653?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16116324#comment-16116324 ] Hyukjin Kwon commented on SPARK-21653: -- Yes, there was some discussion about adding arguments in my PR. I am still in favor of describing arguments, as long as they look mostly correct in general and the examples produce the expected results, because this is information that did not exist before and we now generate documentation for SQL builtin functions. I am willing to push this forward if there are no strong objections now. > Complement SQL expression document > -- > > Key: SPARK-21653 > URL: https://issues.apache.org/jira/browse/SPARK-21653 > Project: Spark > Issue Type: Umbrella > Components: SQL >Affects Versions: 2.2.0 >Reporter: Liang-Chi Hsieh > > We have {{ExpressionDescription}} for SQL expressions. The expression > description tells an expression's usage, arguments, and examples. Users > can understand how to use those expressions by the {{DESCRIBE}} command in SQL: > {code} > spark-sql> DESCRIBE FUNCTION EXTENDED In; > Function: in > Class: org.apache.spark.sql.catalyst.expressions.In > Usage: expr1 in(expr2, expr3, ...) - Returns true if `expr` equals to any > valN. > Extended Usage: > No example/argument for in. > {code} > Not all SQL expressions have a complete description now. For example, in the > above case, there is no example for function {{in}}. This task is going to > complement the expression description. -- This message was sent by Atlassian JIRA (v6.4.14#64029) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Commented] (SPARK-21610) Corrupt records are not handled properly when creating a dataframe from a file
[ https://issues.apache.org/jira/browse/SPARK-21610?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16116126#comment-16116126 ] Jen-Ming Chung commented on SPARK-21610: User 'jmchung' has created a pull request for this issue: https://github.com/apache/spark/pull/18865 [https://github.com/apache/spark/pull/18865] > Corrupt records are not handled properly when creating a dataframe from a file > -- > > Key: SPARK-21610 > URL: https://issues.apache.org/jira/browse/SPARK-21610 > Project: Spark > Issue Type: Bug > Components: SQL >Affects Versions: 2.0.2, 2.2.0 > Environment: macOs Sierra 10.12.5 >Reporter: dmtran > > Consider a jsonl file with 3 records. The third record has a value of type > string, instead of int. > {code} > echo '{"field": 1} > {"field": 2} > {"field": "3"}' >/tmp/sample.json > {code} > Create a dataframe from this file, with a schema that contains > "_corrupt_record" so that corrupt records are kept. > {code} > import org.apache.spark.sql.types._ > val schema = new StructType() > .add("field", ByteType) > .add("_corrupt_record", StringType) > val file = "/tmp/sample.json" > val dfFromFile = spark.read.schema(schema).json(file) > {code} > Run the following lines from a spark-shell: > {code} > scala> dfFromFile.show(false) > +-+---+ > |field|_corrupt_record| > +-+---+ > |1|null | > |2|null | > |null |{"field": "3"} | > +-+---+ > scala> dfFromFile.filter($"_corrupt_record".isNotNull).count() > res1: Long = 0 > scala> dfFromFile.filter($"_corrupt_record".isNull).count() > res2: Long = 3 > {code} > The expected result is 1 corrupt record and 2 valid records, but the actual > one is 0 corrupt record and 3 valid records. 
> The bug is not reproduced if we create a dataframe from a RDD: > {code} > scala> val rdd = sc.textFile(file) > rdd: org.apache.spark.rdd.RDD[String] = /tmp/sample.json MapPartitionsRDD[92] > at textFile at :28 > scala> val dfFromRdd = spark.read.schema(schema).json(rdd) > dfFromRdd: org.apache.spark.sql.DataFrame = [field: tinyint, _corrupt_record: > string] > scala> dfFromRdd.show(false) > +-+---+ > |field|_corrupt_record| > +-+---+ > |1|null | > |2|null | > |null |{"field": "3"} | > +-+---+ > scala> dfFromRdd.filter($"_corrupt_record".isNotNull).count() > res5: Long = 1 > scala> dfFromRdd.filter($"_corrupt_record".isNull).count() > res6: Long = 2 > {code} -- This message was sent by Atlassian JIRA (v6.4.14#64029) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
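The per-record behavior the reporter expects (and which the RDD path above actually exhibits) can be sketched outside Spark. The following is an illustrative Python model of the corrupt-record semantics, not Spark's implementation:

```python
import json

def parse_record(line):
    """Parse one JSON line against a one-column integer schema; on failure,
    null the typed column and keep the raw text in _corrupt_record."""
    try:
        value = json.loads(line).get("field")
    except ValueError:  # malformed JSON
        return {"field": None, "_corrupt_record": line}
    if isinstance(value, int) and not isinstance(value, bool):
        return {"field": value, "_corrupt_record": None}
    return {"field": None, "_corrupt_record": line}  # type mismatch

lines = ['{"field": 1}', '{"field": 2}', '{"field": "3"}']
rows = [parse_record(l) for l in lines]
corrupt = [r for r in rows if r["_corrupt_record"] is not None]
valid = [r for r in rows if r["_corrupt_record"] is None]
# expected: 1 corrupt record and 2 valid records
```

Under these semantics, filtering on `_corrupt_record IS NOT NULL` yields 1, matching the RDD-based result below; the file-based reader in the report returns 0 instead.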
[jira] [Comment Edited] (SPARK-21610) Corrupt records are not handled properly when creating a dataframe from a file
[ https://issues.apache.org/jira/browse/SPARK-21610?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16116126#comment-16116126 ] Jen-Ming Chung edited comment on SPARK-21610 at 8/7/17 6:33 AM: User 'jmchung' has created a pull request for this issue: https://github.com/apache/spark/pull/18865[https://github.com/apache/spark/pull/18865] was (Author: cjm): User 'jmchung' has created a pull request for this issue: https://github.com/apache/spark/pull/18865 [https://github.com/apache/spark/pull/18865] > Corrupt records are not handled properly when creating a dataframe from a file > -- > > Key: SPARK-21610 > URL: https://issues.apache.org/jira/browse/SPARK-21610 > Project: Spark > Issue Type: Bug > Components: SQL >Affects Versions: 2.0.2, 2.2.0 > Environment: macOs Sierra 10.12.5 >Reporter: dmtran > > Consider a jsonl file with 3 records. The third record has a value of type > string, instead of int. > {code} > echo '{"field": 1} > {"field": 2} > {"field": "3"}' >/tmp/sample.json > {code} > Create a dataframe from this file, with a schema that contains > "_corrupt_record" so that corrupt records are kept. > {code} > import org.apache.spark.sql.types._ > val schema = new StructType() > .add("field", ByteType) > .add("_corrupt_record", StringType) > val file = "/tmp/sample.json" > val dfFromFile = spark.read.schema(schema).json(file) > {code} > Run the following lines from a spark-shell: > {code} > scala> dfFromFile.show(false) > +-+---+ > |field|_corrupt_record| > +-+---+ > |1|null | > |2|null | > |null |{"field": "3"} | > +-+---+ > scala> dfFromFile.filter($"_corrupt_record".isNotNull).count() > res1: Long = 0 > scala> dfFromFile.filter($"_corrupt_record".isNull).count() > res2: Long = 3 > {code} > The expected result is 1 corrupt record and 2 valid records, but the actual > one is 0 corrupt record and 3 valid records. 
> The bug is not reproduced if we create a dataframe from a RDD: > {code} > scala> val rdd = sc.textFile(file) > rdd: org.apache.spark.rdd.RDD[String] = /tmp/sample.json MapPartitionsRDD[92] > at textFile at :28 > scala> val dfFromRdd = spark.read.schema(schema).json(rdd) > dfFromRdd: org.apache.spark.sql.DataFrame = [field: tinyint, _corrupt_record: > string] > scala> dfFromRdd.show(false) > +-+---+ > |field|_corrupt_record| > +-+---+ > |1|null | > |2|null | > |null |{"field": "3"} | > +-+---+ > scala> dfFromRdd.filter($"_corrupt_record".isNotNull).count() > res5: Long = 1 > scala> dfFromRdd.filter($"_corrupt_record".isNull).count() > res6: Long = 2 > {code} -- This message was sent by Atlassian JIRA (v6.4.14#64029) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Resolved] (SPARK-21650) Insert into hive partitioned table from spark-sql taking hours to complete
[ https://issues.apache.org/jira/browse/SPARK-21650?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Sean Owen resolved SPARK-21650. --- Resolution: Invalid This isn't the place to ask questions about troubleshooting your app, and there's no suggestion that this runtime is excessively slow. > Insert into hive partitioned table from spark-sql taking hours to complete > -- > > Key: SPARK-21650 > URL: https://issues.apache.org/jira/browse/SPARK-21650 > Project: Spark > Issue Type: Bug > Components: SQL >Affects Versions: 1.6.0 > Environment: Linux machines > Spark version - 1.6.0 > Hive Version - 1.1 > 200 - number of executors. > 3 - number of executor cores. > 10g - executor and driver memory. > dynamic allocation enabled. >Reporter: Madhavi Vaddepalli > > We are trying to execute some logic using Spark SQL: > Input to program: 7 billion records (60 GB gzip-compressed, text format) > Output: 7 billion records (260 GB gzip-compressed and partitioned on a few > columns) > output has 1 partitions (it has 1 different combinations > of partition columns) > We are trying to insert this output into a Hive table (text format, gzip > compressed). > All the spawned tasks finished in 33 minutes and all the executors > were decommissioned; only the driver remained active. It stayed in this state, without > showing any active stage or task in the Spark UI, for about 2.5 hrs, and then > completed successfully. > Please let us know what can be done to improve the performance here. (Is it > fixed in later versions?) -- This message was sent by Atlassian JIRA (v6.4.14#64029) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Updated] (SPARK-21638) Warning message of RF is not accurate
[ https://issues.apache.org/jira/browse/SPARK-21638?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Peng Meng updated SPARK-21638: -- Description: When train RF model, there is many warning message like this: {quote}WARN RandomForest: Tree learning is using approximately 268492800 bytes per iteration, which exceeds requested limit maxMemoryUsage=268435456. This allows splitting 2622 nodes in this iteration.{quote} This warning message is unnecessary and the data is not accurate. Actually, if all the nodes cannot split in one iteration, it will show this warning. For most of the case, all the nodes cannot split just in one iteration, so for most of the case, it will show this warning for each iteration. This is because: {code:java} while (nodeStack.nonEmpty && (memUsage < maxMemoryUsage || memUsage == 0)) { val (treeIndex, node) = nodeStack.top // Choose subset of features for node (if subsampling). val featureSubset: Option[Array[Int]] = if (metadata.subsamplingFeatures) { Some(SamplingUtils.reservoirSampleAndCount(Range(0, metadata.numFeatures).iterator, metadata.numFeaturesPerNode, rng.nextLong())._1) } else { None } // Check if enough memory remains to add this node to the group. val nodeMemUsage = RandomForest.aggregateSizeForNode(metadata, featureSubset) * 8L if (memUsage + nodeMemUsage <= maxMemoryUsage || memUsage == 0) { nodeStack.pop() mutableNodesForGroup.getOrElseUpdate(treeIndex, new mutable.ArrayBuffer[LearningNode]()) += node mutableTreeToNodeToIndexInfo .getOrElseUpdate(treeIndex, new mutable.HashMap[Int, NodeIndexInfo]())(node.id) = new NodeIndexInfo(numNodesInGroup, featureSubset) } numNodesInGroup += 1 //we not add the node to mutableNodesForGroup, but we add memUsage here. memUsage += nodeMemUsage } if (memUsage > maxMemoryUsage) { // If maxMemoryUsage is 0, we should still allow splitting 1 node. 
logWarning(s"Tree learning is using approximately $memUsage bytes per iteration, which" + s" exceeds requested limit maxMemoryUsage=$maxMemoryUsage. This allows splitting" + s" $numNodesInGroup nodes in this iteration.") } {code} was: When train RF model, there is many warning message like this: {quote}WARN RandomForest: Tree learning is using approximately 268492800 bytes per iteration, which exceeds requested limit maxMemoryUsage=268435456. This allows splitting 2622 nodes in this iteration.{quote} This warning message is unnecessary and the data is not accurate. This is because {code:java} while (nodeStack.nonEmpty && (memUsage < maxMemoryUsage || memUsage == 0)) { val (treeIndex, node) = nodeStack.top // Choose subset of features for node (if subsampling). val featureSubset: Option[Array[Int]] = if (metadata.subsamplingFeatures) { Some(SamplingUtils.reservoirSampleAndCount(Range(0, metadata.numFeatures).iterator, metadata.numFeaturesPerNode, rng.nextLong())._1) } else { None } // Check if enough memory remains to add this node to the group. val nodeMemUsage = RandomForest.aggregateSizeForNode(metadata, featureSubset) * 8L if (memUsage + nodeMemUsage <= maxMemoryUsage || memUsage == 0) { nodeStack.pop() mutableNodesForGroup.getOrElseUpdate(treeIndex, new mutable.ArrayBuffer[LearningNode]()) += node mutableTreeToNodeToIndexInfo .getOrElseUpdate(treeIndex, new mutable.HashMap[Int, NodeIndexInfo]())(node.id) = new NodeIndexInfo(numNodesInGroup, featureSubset) } numNodesInGroup += 1 *//we not add the node to mutableNodesForGroup, but we add memUsage here.* memUsage += nodeMemUsage } if (memUsage > maxMemoryUsage) { // If maxMemoryUsage is 0, we should still allow splitting 1 node. logWarning(s"Tree learning is using approximately $memUsage bytes per iteration, which" + s" exceeds requested limit maxMemoryUsage=$maxMemoryUsage. 
This allows splitting" + s" $numNodesInGroup nodes in this iteration.") } {code} To avoid this unnecessary warning, we should change the code like this: {code:java} while (nodeStack.nonEmpty) { val (treeIndex, node) = nodeStack.top // Choose subset of features for node (if subsampling). val featureSubset: Option[Array[Int]] = if (metadata.subsamplingFeatures) { Some(SamplingUtils.reservoirSampleAndCount(Range(0, metadata.numFeatures).iterator, metadata.numFeaturesPerNode, rng.nextLong())._1) } else { None } // Check if enough memory remains to add this node to the group. val nodeMemUsage = RandomForest.aggregateSizeForNode(metadata, featureSubset) * 8L if (memUsage + nodeMemUsage <= maxMemoryUsage || memUsage == 0) {
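The bookkeeping problem described above can be reduced to a few lines: `memUsage` is also incremented for the node that did not fit, so whenever the stack is not exhausted the final total exceeds the limit and the warning fires, even though every node actually selected fit the budget. Here is a simplified Python sketch of the loop (function and variable names are invented for illustration):

```python
def select_group(node_sizes, max_memory):
    """Greedily pick nodes to split this iteration under a memory budget.
    Mirrors the Spark loop: mem_usage is incremented even for the node
    that is rejected, which is what trips the overflow warning."""
    selected = []
    mem_usage = 0
    for i, size in enumerate(node_sizes):
        if mem_usage >= max_memory and mem_usage != 0:
            break
        if mem_usage + size <= max_memory or mem_usage == 0:
            selected.append(i)
        # counted regardless of whether the node was added (the bug at issue);
        # in Spark the rejected node simply stays queued for the next group
        mem_usage += size
    warns = mem_usage > max_memory
    return selected, mem_usage, warns

selected, mem_usage, warns = select_group([100, 100, 100], max_memory=250)
# the third node is rejected, yet its size is still added (300 > 250),
# so the warning fires although the selected nodes fit within the limit
```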
[jira] [Updated] (SPARK-21654) Complement predicates expression description
[ https://issues.apache.org/jira/browse/SPARK-21654?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Liang-Chi Hsieh updated SPARK-21654: Issue Type: Sub-task (was: Improvement) Parent: SPARK-21653 > Complement predicates expression description > > > Key: SPARK-21654 > URL: https://issues.apache.org/jira/browse/SPARK-21654 > Project: Spark > Issue Type: Sub-task > Components: SQL >Affects Versions: 2.2.0 >Reporter: Liang-Chi Hsieh > > Complement expression description for SQL predicates. -- This message was sent by Atlassian JIRA (v6.4.14#64029) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Commented] (SPARK-21653) Complement SQL expression document
[ https://issues.apache.org/jira/browse/SPARK-21653?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16116275#comment-16116275 ] Hyukjin Kwon commented on SPARK-21653: -- BTW, sounds including SPARK-18411 and looks related with SPARK-17963. > Complement SQL expression document > -- > > Key: SPARK-21653 > URL: https://issues.apache.org/jira/browse/SPARK-21653 > Project: Spark > Issue Type: Umbrella > Components: SQL >Affects Versions: 2.2.0 >Reporter: Liang-Chi Hsieh > > We have {{ExpressionDescription}} for SQL expressions. The expression > description tells what an expression's usage, arguments, and examples. Users > can understand how to use those expressions by {{DESCRIBE}} command in SQL: > {code} > spark-sql> DESCRIBE FUNCTION EXTENDED In; > Function: in > Class: org.apache.spark.sql.catalyst.expressions.In > Usage: expr1 in(expr2, expr3, ...) - Returns true if `expr` equals to any > valN. > Extended Usage: > No example/argument for in. > {code} > Not all SQL expressions have complete description now. For example, in the > above case, there is no example for function {{in}}. This task is going to > complement the expression description. -- This message was sent by Atlassian JIRA (v6.4.14#64029) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Resolved] (SPARK-21623) Comments of parentStats on ml/tree/impl/DTStatsAggregator.scala is wrong
[ https://issues.apache.org/jira/browse/SPARK-21623?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Sean Owen resolved SPARK-21623. --- Resolution: Fixed Fix Version/s: 2.3.0 Issue resolved by pull request 18832 [https://github.com/apache/spark/pull/18832] > Comments of parentStats on ml/tree/impl/DTStatsAggregator.scala is wrong > > > Key: SPARK-21623 > URL: https://issues.apache.org/jira/browse/SPARK-21623 > Project: Spark > Issue Type: Improvement > Components: ML >Affects Versions: 2.3.0 >Reporter: Peng Meng >Priority: Trivial > Fix For: 2.3.0 > > > {code:java} >* Note: this is necessary because stats for the parent node are not > available >* on the first iteration of tree learning. >*/ > private val parentStats: Array[Double] = new Array[Double](statsSize) > {code} > This comment is not right. Actually, parentStats is not only used for the > first iteration. It is used with all the iteration for unordered featrues. -- This message was sent by Atlassian JIRA (v6.4.14#64029) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Assigned] (SPARK-21623) Comments of parentStats on ml/tree/impl/DTStatsAggregator.scala is wrong
[ https://issues.apache.org/jira/browse/SPARK-21623?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Sean Owen reassigned SPARK-21623: - Assignee: Peng Meng > Comments of parentStats on ml/tree/impl/DTStatsAggregator.scala is wrong > > > Key: SPARK-21623 > URL: https://issues.apache.org/jira/browse/SPARK-21623 > Project: Spark > Issue Type: Improvement > Components: ML >Affects Versions: 2.3.0 >Reporter: Peng Meng >Assignee: Peng Meng >Priority: Trivial > Fix For: 2.3.0 > > > {code:java} >* Note: this is necessary because stats for the parent node are not > available >* on the first iteration of tree learning. >*/ > private val parentStats: Array[Double] = new Array[Double](statsSize) > {code} > This comment is not right. Actually, parentStats is not only used for the > first iteration. It is used with all the iteration for unordered featrues. -- This message was sent by Atlassian JIRA (v6.4.14#64029) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Resolved] (SPARK-21647) SortMergeJoin failed when using CROSS
[ https://issues.apache.org/jira/browse/SPARK-21647?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Wenchen Fan resolved SPARK-21647. - Resolution: Fixed Assignee: Xiao Li Fix Version/s: 2.3.0 2.2.1 > SortMergeJoin failed when using CROSS > - > > Key: SPARK-21647 > URL: https://issues.apache.org/jira/browse/SPARK-21647 > Project: Spark > Issue Type: Bug > Components: SQL >Affects Versions: 2.2.0 >Reporter: Xiao Li >Assignee: Xiao Li > Fix For: 2.2.1, 2.3.0 > > > {noformat} > val df = Seq((1, 1)).toDF("i", "j") > df.createOrReplaceTempView("T") > withSQLConf(SQLConf.AUTO_BROADCASTJOIN_THRESHOLD.key -> "-1") { > sql("select * from (select a.i from T a cross join T t where t.i = a.i) as > t1 " + > "cross join T t2 where t2.i = t1.i").explain(true) > } > {noformat} > The above code could cause the following exception: > {noformat} > SortMergeJoinExec should not take Cross as the JoinType > java.lang.IllegalArgumentException: SortMergeJoinExec should not take Cross > as the JoinType > at > org.apache.spark.sql.execution.joins.SortMergeJoinExec.outputOrdering(SortMergeJoinExec.scala:100) > {noformat} -- This message was sent by Atlassian JIRA (v6.4.14#64029) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Commented] (SPARK-21655) Kill CLI for Yarn mode
[ https://issues.apache.org/jira/browse/SPARK-21655?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16116859#comment-16116859 ] Thomas Graves commented on SPARK-21655: --- The yarn kill does work, but it sends a kill signal to the container. Generally the shutdown hooks allow things to unregister and clean up OK, but we have seen cases where this doesn't happen. A Spark kill command allows things to shut down and clean up nicely. The event queue can try to drain, we can make sure the history server URL is correct, and we can do a stop like in the normal success case. It also adds a nice API that users can use to kill. I have gotten the question many times in the past from new users about how to kill their application. Having a Spark CLI to do this would be more convenient. It is already supported in Mesos and standalone mode, so adding it here just makes things more consistent. The underlying RPC connection can be used for other things in the future, like adding a better get-status command, getting history info from the CLI, pushing new tokens, etc. > Kill CLI for Yarn mode > -- > > Key: SPARK-21655 > URL: https://issues.apache.org/jira/browse/SPARK-21655 > Project: Spark > Issue Type: Improvement > Components: YARN >Affects Versions: 2.1.1 >Reporter: Jong Yoon Lee >Priority: Minor > Fix For: 2.1.1 > > Original Estimate: 168h > Remaining Estimate: 168h > > Similar to how standalone and Mesos have the capability to safely shut down > the Spark application, there should be a way to safely shut down Spark in > YARN mode. This will ensure a clean shutdown and unregistration from YARN. > This is the design doc: > https://docs.google.com/document/d/1QG8hITjLNi1D9dVR3b_hZkyrGm5FFm0u9M1KGM4y1Ak/edit?usp=sharing > and I will upload the patch soon -- This message was sent by Atlassian JIRA (v6.4.14#64029) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Commented] (SPARK-18838) High latency of event processing for large jobs
[ https://issues.apache.org/jira/browse/SPARK-18838?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16116886#comment-16116886 ] Miles Crawford commented on SPARK-18838: This seems to be the core issue for a large number of bugs. It's causing regular stability issues for us as well. Is a more thorough look in progress? What is the recourse for people getting bit by this bug? > High latency of event processing for large jobs > --- > > Key: SPARK-18838 > URL: https://issues.apache.org/jira/browse/SPARK-18838 > Project: Spark > Issue Type: Improvement >Affects Versions: 2.0.0 >Reporter: Sital Kedia > Attachments: perfResults.pdf, SparkListernerComputeTime.xlsx > > > Currently we are observing the issue of very high event processing delay in > driver's `ListenerBus` for large jobs with many tasks. Many critical > component of the scheduler like `ExecutorAllocationManager`, > `HeartbeatReceiver` depend on the `ListenerBus` events and this delay might > hurt the job performance significantly or even fail the job. For example, a > significant delay in receiving the `SparkListenerTaskStart` might cause > `ExecutorAllocationManager` manager to mistakenly remove an executor which is > not idle. > The problem is that the event processor in `ListenerBus` is a single thread > which loops through all the Listeners for each event and processes each event > synchronously > https://github.com/apache/spark/blob/master/core/src/main/scala/org/apache/spark/scheduler/LiveListenerBus.scala#L94. > This single threaded processor often becomes the bottleneck for large jobs. > Also, if one of the Listener is very slow, all the listeners will pay the > price of delay incurred by the slow listener. In addition to that a slow > listener can cause events to be dropped from the event queue which might be > fatal to the job. > To solve the above problems, we propose to get rid of the event queue and the > single threaded event processor. 
Instead, each listener will have its own > dedicated single-threaded executor service. Whenever an event is posted, it > will be submitted to the executor service of each listener. The single- > threaded executor service will guarantee in-order processing of the events > per listener. The queue used for the executor service will be bounded to > guarantee we do not grow the memory indefinitely. The downside of this > approach is that a separate event queue per listener will increase the driver memory > footprint. -- This message was sent by Atlassian JIRA (v6.4.14#64029) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
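The proposal above — one dedicated single-threaded worker with a bounded queue per listener — can be sketched as follows. This is an illustrative Python model, not the actual Spark patch; class and method names are invented:

```python
import queue
import threading

class PerListenerBus:
    """Each listener gets its own bounded queue and single worker thread,
    so a slow listener delays only itself and memory stays bounded."""
    def __init__(self, listeners, capacity=1000):
        self.queues = {l: queue.Queue(maxsize=capacity) for l in listeners}
        self.dropped = {l: 0 for l in listeners}
        self.workers = [
            threading.Thread(target=self._drain, args=(l, q), daemon=True)
            for l, q in self.queues.items()
        ]
        for t in self.workers:
            t.start()

    def post(self, event):
        for l, q in self.queues.items():
            try:
                q.put_nowait(event)   # never block the posting thread
            except queue.Full:
                self.dropped[l] += 1  # bounded queue: drop instead of growing

    def _drain(self, listener, q):
        while True:
            event = q.get()
            if event is None:         # shutdown sentinel
                break
            listener(event)           # one thread per listener => in order

    def stop(self):
        for q in self.queues.values():
            q.put(None)
        for t in self.workers:
            t.join()

received = []
bus = PerListenerBus([received.append])
for i in range(5):
    bus.post(i)
bus.stop()
# received holds the events in posting order: per-listener ordering is kept
```

A slow listener here only backs up its own queue; once that queue is full its events are dropped and counted, rather than stalling every other listener, which matches the bounded-memory trade-off described in the proposal.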
[jira] [Commented] (SPARK-18838) High latency of event processing for large jobs
[ https://issues.apache.org/jira/browse/SPARK-18838?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16116896#comment-16116896 ] Miles Crawford commented on SPARK-18838: We do not use dynamic allocation, and our applications frequently hang completely after seeing this log message, not just the UI. Can I gather any information from an application in this state that would inform the priority of this issue, or the possible fixes? > High latency of event processing for large jobs > --- > > Key: SPARK-18838 > URL: https://issues.apache.org/jira/browse/SPARK-18838 > Project: Spark > Issue Type: Improvement >Affects Versions: 2.0.0 >Reporter: Sital Kedia > Attachments: perfResults.pdf, SparkListernerComputeTime.xlsx > > > Currently we are observing the issue of very high event processing delay in > driver's `ListenerBus` for large jobs with many tasks. Many critical > component of the scheduler like `ExecutorAllocationManager`, > `HeartbeatReceiver` depend on the `ListenerBus` events and this delay might > hurt the job performance significantly or even fail the job. For example, a > significant delay in receiving the `SparkListenerTaskStart` might cause > `ExecutorAllocationManager` manager to mistakenly remove an executor which is > not idle. > The problem is that the event processor in `ListenerBus` is a single thread > which loops through all the Listeners for each event and processes each event > synchronously > https://github.com/apache/spark/blob/master/core/src/main/scala/org/apache/spark/scheduler/LiveListenerBus.scala#L94. > This single threaded processor often becomes the bottleneck for large jobs. > Also, if one of the Listener is very slow, all the listeners will pay the > price of delay incurred by the slow listener. In addition to that a slow > listener can cause events to be dropped from the event queue which might be > fatal to the job. 
> To solve the above problems, we propose to get rid of the event queue and the > single threaded event processor. Instead each listener will have its own > dedicate single threaded executor service . When ever an event is posted, it > will be submitted to executor service of all the listeners. The Single > threaded executor service will guarantee in order processing of the events > per listener. The queue used for the executor service will be bounded to > guarantee we do not grow the memory indefinitely. The downside of this > approach is separate event queue per listener will increase the driver memory > footprint. -- This message was sent by Atlassian JIRA (v6.4.14#64029) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Comment Edited] (SPARK-18838) High latency of event processing for large jobs
[ https://issues.apache.org/jira/browse/SPARK-18838?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16116916#comment-16116916 ] Miles Crawford edited comment on SPARK-18838 at 8/7/17 5:40 PM: Can I get specific direction on the logs you'd like to see? I see bursts of lines like this: {{ 2017-08-07 17:33:45,911 WARN org.apache.spark.scheduler.LiveListenerBus: Dropped 48996 SparkListenerEvents since Mon Aug 07 17:32:45 UTC 2017 2017-08-07 17:34:45,911 WARN org.apache.spark.scheduler.LiveListenerBus: Dropped 86057 SparkListenerEvents since Mon Aug 07 17:33:45 UTC 2017 }} And after a while we get in a state where every executor is idle, and the driver appears to still be waiting for results from executors, right in the middle of a stage. was (Author: milesc): Can I get specific direction on the logs you'd like to see? I see bursts of lines like this: {{2017-08-07 17:33:45,911 WARN org.apache.spark.scheduler.LiveListenerBus: Dropped 48996 SparkListenerEvents since Mon Aug 07 17:32:45 UTC 2017 2017-08-07 17:34:45,911 WARN org.apache.spark.scheduler.LiveListenerBus: Dropped 86057 SparkListenerEvents since Mon Aug 07 17:33:45 UTC 2017}} And after a while we get in a state where every executor is idle, and the driver appears to still be waiting for results from executors, right in the middle of a stage. > High latency of event processing for large jobs > --- > > Key: SPARK-18838 > URL: https://issues.apache.org/jira/browse/SPARK-18838 > Project: Spark > Issue Type: Improvement >Affects Versions: 2.0.0 >Reporter: Sital Kedia > Attachments: perfResults.pdf, SparkListernerComputeTime.xlsx > > > Currently we are observing the issue of very high event processing delay in > driver's `ListenerBus` for large jobs with many tasks. Many critical > component of the scheduler like `ExecutorAllocationManager`, > `HeartbeatReceiver` depend on the `ListenerBus` events and this delay might > hurt the job performance significantly or even fail the job. 
For example, a > significant delay in receiving the `SparkListenerTaskStart` might cause > `ExecutorAllocationManager` manager to mistakenly remove an executor which is > not idle. > The problem is that the event processor in `ListenerBus` is a single thread > which loops through all the Listeners for each event and processes each event > synchronously > https://github.com/apache/spark/blob/master/core/src/main/scala/org/apache/spark/scheduler/LiveListenerBus.scala#L94. > This single threaded processor often becomes the bottleneck for large jobs. > Also, if one of the Listener is very slow, all the listeners will pay the > price of delay incurred by the slow listener. In addition to that a slow > listener can cause events to be dropped from the event queue which might be > fatal to the job. > To solve the above problems, we propose to get rid of the event queue and the > single threaded event processor. Instead each listener will have its own > dedicate single threaded executor service . When ever an event is posted, it > will be submitted to executor service of all the listeners. The Single > threaded executor service will guarantee in order processing of the events > per listener. The queue used for the executor service will be bounded to > guarantee we do not grow the memory indefinitely. The downside of this > approach is separate event queue per listener will increase the driver memory > footprint. -- This message was sent by Atlassian JIRA (v6.4.14#64029) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Commented] (SPARK-18838) High latency of event processing for large jobs
[ https://issues.apache.org/jira/browse/SPARK-18838?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16116916#comment-16116916 ] Miles Crawford commented on SPARK-18838: Can I get specific direction on the logs you'd like to see? I see bursts of lines like this: {{2017-08-07 17:33:45,911 WARN org.apache.spark.scheduler.LiveListenerBus: Dropped 48996 SparkListenerEvents since Mon Aug 07 17:32:45 UTC 2017 2017-08-07 17:34:45,911 WARN org.apache.spark.scheduler.LiveListenerBus: Dropped 86057 SparkListenerEvents since Mon Aug 07 17:33:45 UTC 2017}} And after a while we get in a state where every executor is idle, and the driver appears to still be waiting for results from executors, right in the middle of a stage. > High latency of event processing for large jobs > --- > > Key: SPARK-18838 > URL: https://issues.apache.org/jira/browse/SPARK-18838 > Project: Spark > Issue Type: Improvement >Affects Versions: 2.0.0 >Reporter: Sital Kedia > Attachments: perfResults.pdf, SparkListernerComputeTime.xlsx > > > Currently we are observing the issue of very high event processing delay in > driver's `ListenerBus` for large jobs with many tasks. Many critical > component of the scheduler like `ExecutorAllocationManager`, > `HeartbeatReceiver` depend on the `ListenerBus` events and this delay might > hurt the job performance significantly or even fail the job. For example, a > significant delay in receiving the `SparkListenerTaskStart` might cause > `ExecutorAllocationManager` manager to mistakenly remove an executor which is > not idle. > The problem is that the event processor in `ListenerBus` is a single thread > which loops through all the Listeners for each event and processes each event > synchronously > https://github.com/apache/spark/blob/master/core/src/main/scala/org/apache/spark/scheduler/LiveListenerBus.scala#L94. > This single threaded processor often becomes the bottleneck for large jobs. 
> Also, if one of the Listener is very slow, all the listeners will pay the > price of delay incurred by the slow listener. In addition to that a slow > listener can cause events to be dropped from the event queue which might be > fatal to the job. > To solve the above problems, we propose to get rid of the event queue and the > single threaded event processor. Instead each listener will have its own > dedicate single threaded executor service . When ever an event is posted, it > will be submitted to executor service of all the listeners. The Single > threaded executor service will guarantee in order processing of the events > per listener. The queue used for the executor service will be bounded to > guarantee we do not grow the memory indefinitely. The downside of this > approach is separate event queue per listener will increase the driver memory > footprint. -- This message was sent by Atlassian JIRA (v6.4.14#64029) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Commented] (SPARK-21652) Optimizer cannot reach a fixed point on certain queries
[ https://issues.apache.org/jira/browse/SPARK-21652?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16116936#comment-16116936 ] Anton Okolnychyi commented on SPARK-21652: -- Yes, disabling constraint propagation helps because `InferFiltersFromConstraints` will not apply. I found several issues regarding the performance of `InferFiltersFromConstraints`, but what about the logic of `ConstantPropagation` in the above example? Should it replace predicates such as `(a = b)` with `(1 = 1)` at all, even though doing so is semantically correct? > Optimizer cannot reach a fixed point on certain queries > --- > > Key: SPARK-21652 > URL: https://issues.apache.org/jira/browse/SPARK-21652 > Project: Spark > Issue Type: Bug > Components: Optimizer, SQL >Affects Versions: 2.2.0 >Reporter: Anton Okolnychyi > > The optimizer cannot reach a fixed point on the following query: > {code} > Seq((1, 2)).toDF("col1", "col2").write.saveAsTable("t1") > Seq(1, 2).toDF("col").write.saveAsTable("t2") > spark.sql("SELECT * FROM t1, t2 WHERE t1.col1 = 1 AND 1 = t1.col2 AND t1.col1 > = t2.col AND t1.col2 = t2.col").explain(true) > {code} > At some point during the optimization, InferFiltersFromConstraints infers a > new constraint '(col2#33 = col1#32)' that is appended to the join condition, > then PushPredicateThroughJoin pushes it down, ConstantPropagation replaces > '(col2#33 = col1#32)' with '1 = 1' based on other propagated constraints, > ConstantFolding replaces '1 = 1' with 'true', and BooleanSimplification finally > removes this predicate. However, InferFiltersFromConstraints will again infer > '(col2#33 = col1#32)' on the next iteration, and the process will continue > until the limit of iterations is reached. 
> See below for more details > {noformat} > === Applying Rule > org.apache.spark.sql.catalyst.optimizer.InferFiltersFromConstraints === > !Join Inner, ((col1#32 = col#34) && (col2#33 = col#34)) > Join Inner, ((col2#33 = col1#32) && ((col1#32 = col#34) && > (col2#33 = col#34))) > :- Filter ((isnotnull(col1#32) && isnotnull(col2#33)) && ((col1#32 = 1) && > (1 = col2#33))) :- Filter ((isnotnull(col1#32) && isnotnull(col2#33)) && > ((col1#32 = 1) && (1 = col2#33))) > : +- Relation[col1#32,col2#33] parquet > : +- Relation[col1#32,col2#33] parquet > +- Filter ((1 = col#34) && isnotnull(col#34)) > +- Filter ((1 = col#34) && isnotnull(col#34)) > +- Relation[col#34] parquet >+- Relation[col#34] parquet > > === Applying Rule > org.apache.spark.sql.catalyst.optimizer.PushPredicateThroughJoin === > !Join Inner, ((col2#33 = col1#32) && ((col1#32 = col#34) && (col2#33 = > col#34))) Join Inner, ((col1#32 = col#34) && (col2#33 = col#34)) > !:- Filter ((isnotnull(col1#32) && isnotnull(col2#33)) && ((col1#32 = 1) && > (1 = col2#33))) :- Filter (col2#33 = col1#32) > !: +- Relation[col1#32,col2#33] parquet > : +- Filter ((isnotnull(col1#32) && isnotnull(col2#33)) && > ((col1#32 = 1) && (1 = col2#33))) > !+- Filter ((1 = col#34) && isnotnull(col#34)) > : +- Relation[col1#32,col2#33] parquet > ! +- Relation[col#34] parquet > +- Filter ((1 = col#34) && isnotnull(col#34)) > ! 
>+- Relation[col#34] parquet > > === Applying Rule org.apache.spark.sql.catalyst.optimizer.CombineFilters === > Join Inner, ((col1#32 = col#34) && (col2#33 = col#34)) >Join Inner, ((col1#32 = col#34) && (col2#33 = col#34)) > !:- Filter (col2#33 = col1#32) >:- Filter (((isnotnull(col1#32) && isnotnull(col2#33)) && > ((col1#32 = 1) && (1 = col2#33))) && (col2#33 = col1#32)) > !: +- Filter ((isnotnull(col1#32) && isnotnull(col2#33)) && ((col1#32 = 1) > && (1 = col2#33))) : +- Relation[col1#32,col2#33] parquet > !: +- Relation[col1#32,col2#33] parquet >+- Filter ((1 = col#34) && isnotnull(col#34)) > !+- Filter ((1 = col#34) && isnotnull(col#34)) > +- Relation[col#34] parquet > ! +- Relation[col#34] parquet > > > === Applying Rule org.apache.spark.sql.catalyst.optimizer.ConstantPropagation > === > Join Inner, ((col1#32 = col#34) &&
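The rule ping-pong shown in the trace can be modeled with a toy fixed-point loop. This is a simplified sketch, not Spark's Optimizer: predicates are plain strings, one "rule" infers `col2 = col1` whenever it is absent, and the propagate/fold/simplify chain strips it whenever it is present, so every batch pass changes the plan and the iteration cap is always hit (the cap of 100 mirrors `spark.sql.optimizer.maxIterations`, which defaults to 100 in Spark 2.x).

```java
import java.util.*;

// Toy model of an optimizer batch that never reaches a fixed point because two
// antagonistic rules flip the plan back and forth on every pass.
public class FixedPointDemo {
    static final int MAX_ITERATIONS = 100; // stand-in for spark.sql.optimizer.maxIterations

    // One batch pass: "InferFiltersFromConstraints" adds the inferred predicate
    // when it is missing (given the constant constraints); the
    // ConstantPropagation -> ConstantFolding -> BooleanSimplification chain
    // removes it when it is present.
    static Set<String> runBatch(Set<String> plan) {
        Set<String> next = new HashSet<>(plan);
        if (!next.contains("col2 = col1")
                && next.contains("col1 = 1") && next.contains("col2 = 1")) {
            next.add("col2 = col1");    // inferred from col1 = 1 and col2 = 1
        } else if (next.contains("col2 = col1")) {
            next.remove("col2 = col1"); // propagated to 1 = 1, folded to true, dropped
        }
        return next;
    }

    public static int optimize(Set<String> plan) {
        int iterations = 0;
        while (iterations < MAX_ITERATIONS) {
            Set<String> next = runBatch(plan);
            iterations++;
            if (next.equals(plan)) return iterations; // fixed point reached
            plan = next;
        }
        return iterations; // cap hit: no fixed point
    }

    public static void main(String[] args) {
        Set<String> plan = new HashSet<>(Arrays.asList("col1 = 1", "col2 = 1"));
        System.out.println(optimize(plan)); // prints 100: the iteration cap, not a fixed point
    }
}
```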
[jira] [Commented] (SPARK-21655) Kill CLI for Yarn mode
[ https://issues.apache.org/jira/browse/SPARK-21655?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16116872#comment-16116872 ] Sean Owen commented on SPARK-21655: --- I haven't thought this through, but does this open up a security problem if the UI is proxying kill requests? I suppose that ship has sailed. > Kill CLI for Yarn mode > -- > > Key: SPARK-21655 > URL: https://issues.apache.org/jira/browse/SPARK-21655 > Project: Spark > Issue Type: Improvement > Components: YARN >Affects Versions: 2.1.1 >Reporter: Jong Yoon Lee >Priority: Minor > Fix For: 2.1.1 > > Original Estimate: 168h > Remaining Estimate: 168h > > Similar to how standalone and Mesos modes have the capability to safely shut down > the Spark application, there should be a way to safely shut down Spark in > YARN mode. This will ensure a clean shutdown and unregistration from YARN. > This is the design doc: > https://docs.google.com/document/d/1QG8hITjLNi1D9dVR3b_hZkyrGm5FFm0u9M1KGM4y1Ak/edit?usp=sharing > and I will upload the patch soon
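For reference, the workaround today is YARN's own CLI, which stops the application but, as the issue notes, without a graceful Spark-side shutdown; the application id below is illustrative.

```shell
# Kill a Spark-on-YARN application via the YARN CLI. This tears the app down
# but does not trigger a clean Spark-side shutdown or unregistration.
yarn application -kill application_1502000000000_0001
```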
[jira] [Commented] (SPARK-18838) High latency of event processing for large jobs
[ https://issues.apache.org/jira/browse/SPARK-18838?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16116892#comment-16116892 ] Marcelo Vanzin commented on SPARK-18838: bq. although that is guaranteed to happen when the wrong events are dropped
I think there are, mainly, two different things that go on when the listener bus gets backed up:
- dropped events mess up the UI
- dropped events mess up dynamic allocation
The UI being messed up doesn't mean the application is not making progress. The scheduler does not use the event bus or UI information to decide what to do next. Dynamic allocation getting messed up is a real problem; currently the best course is probably to disable it, or to play with settings to increase the queue size and disable other expensive listeners. A blocking strategy is not out of the picture, but it needs to be studied properly to understand its effects. At the very least, it will increase memory usage and slow down the scheduler, even if it does not cause actual errors. It's rather sub-optimal to slow down the whole Spark app because some listener in the driver is doing I/O.
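The mitigations mentioned in this thread can be expressed as submit-time configuration. A hedged sketch: the property names are the Spark 2.x ones (`spark.scheduler.listenerbus.eventqueue.size` defaults to 10000, and the event log listener is one of the heavier built-in listeners), but exact names can vary by version, and `my-app.jar` is a placeholder.

```shell
# Sketch: mitigate a backed-up listener bus by enlarging the event queue,
# disabling dynamic allocation, and turning off the event log listener.
spark-submit \
  --conf spark.scheduler.listenerbus.eventqueue.size=100000 \
  --conf spark.dynamicAllocation.enabled=false \
  --conf spark.eventLog.enabled=false \
  my-app.jar
```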
[jira] [Commented] (SPARK-18838) High latency of event processing for large jobs
[ https://issues.apache.org/jira/browse/SPARK-18838?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16116903#comment-16116903 ] Marcelo Vanzin commented on SPARK-18838: I'd be interested in seeing logs from an application that hangs because of this issue while not using dynamic allocation. I was under the impression that the scheduler is pretty isolated from the listener bus, and that things should keep working even if the UI can no longer tell you that they are. I think there's consensus about what the solution is (implementing queues so that slow listeners can be isolated), and perhaps once that's done we can even think about adding blocking (since we can then control which listeners may block the listener bus - e.g. the dynamic allocation listener could block, while the event log listener is allowed to drop events). There are even a couple of PRs open, but since this is a very core part of the Spark code, there's going to be more scrutiny of the code being written.
[jira] [Commented] (SPARK-20863) Add metrics/instrumentation to LiveListenerBus
[ https://issues.apache.org/jira/browse/SPARK-20863?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16116905#comment-16116905 ] Miles Crawford commented on SPARK-20863: How can I enable and view these metrics in my applications? I am having lots of trouble with dropped events, and I'd like to diagnose the issue. > Add metrics/instrumentation to LiveListenerBus > -- > > Key: SPARK-20863 > URL: https://issues.apache.org/jira/browse/SPARK-20863 > Project: Spark > Issue Type: New Feature > Components: Spark Core >Affects Versions: 2.2.0 >Reporter: Josh Rosen >Assignee: Josh Rosen > Fix For: 2.3.0 > > > I think that we should add Coda Hale metrics to the LiveListenerBus in order > to count the number of queued, processed, and dropped events, as well as a > timer tracking per-event processing times.
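With a build that includes this change (Fix Version 2.3.0), the counters should surface through Spark's regular Coda Hale metrics system, configured via a metrics properties file. A hedged sketch follows: the `ConsoleSink` class and the `spark.metrics.conf` property are standard Spark, but the exact metric names exported for the listener bus may differ from what you expect.

```
# metrics.properties sketch: dump all driver-side metrics (including any
# listener-bus counters/timers) to the console every 10 seconds.
driver.sink.console.class=org.apache.spark.metrics.sink.ConsoleSink
driver.sink.console.period=10
driver.sink.console.unit=seconds

# Then point Spark at the file:
#   spark-submit --conf spark.metrics.conf=metrics.properties ...
```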
[jira] [Commented] (SPARK-18838) High latency of event processing for large jobs
[ https://issues.apache.org/jira/browse/SPARK-18838?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16116930#comment-16116930 ] Jason Dunkelberger commented on SPARK-18838: Thanks for your quick outline, [~vanzin]. It gives a much better idea of the intention, which might help me see something significant.
[jira] [Commented] (SPARK-18838) High latency of event processing for large jobs
[ https://issues.apache.org/jira/browse/SPARK-18838?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16116922#comment-16116922 ] Marcelo Vanzin commented on SPARK-18838: I'd like to see something that backs your assertion that the app is not making progress; that will probably just mean looking at all the logs. Those messages are expected when your listener bus is backed up; what is not expected is for the app to stop making progress.