[jira] [Created] (SPARK-27225) Implement join strategy hints
Maryann Xue created SPARK-27225: --- Summary: Implement join strategy hints Key: SPARK-27225 URL: https://issues.apache.org/jira/browse/SPARK-27225 Project: Spark Issue Type: New Feature Components: SQL Affects Versions: 2.4.0 Reporter: Maryann Xue Extend the existing BROADCAST join hint by implementing other join strategy hints corresponding to the rest of Spark's existing join strategies: shuffle-hash, sort-merge, cartesian-product. Broadcast-nested-loop will use BROADCAST hint as it does now. -- This message was sent by Atlassian JIRA (v7.6.3#76005) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
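A rough sketch of what hint-based strategy selection could look like once the new hints exist. The hint names below (SHUFFLE_HASH, MERGE, SHUFFLE_REPLICATE_NL) and the resolution logic are illustrative assumptions, not the ticket's final design:

```python
# Illustrative only: the hint names and mapping are assumptions,
# not the design adopted for SPARK-27225.
JOIN_STRATEGY_HINTS = {
    "BROADCAST": "broadcast",              # existing hint
    "SHUFFLE_HASH": "shuffle-hash",        # hypothetical new hint
    "MERGE": "sort-merge",                 # hypothetical new hint
    "SHUFFLE_REPLICATE_NL": "cartesian-product",  # hypothetical new hint
}

def pick_join_strategy(hint=None, default="sort-merge"):
    """Return the join strategy a planner might select for a given hint;
    absent or unknown hints fall back to the planner's default choice."""
    if hint is None:
        return default
    return JOIN_STRATEGY_HINTS.get(hint.upper(), default)
```

The point of the design is that a recognized hint overrides the planner's cost-based choice, while anything unrecognized leaves planning untouched.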
[jira] [Updated] (SPARK-27214) Upgrading locality level when lots of pending tasks have been waiting more than locality.wait
[ https://issues.apache.org/jira/browse/SPARK-27214?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] liupengcheng updated SPARK-27214: - Description: Currently, Spark's locality wait mechanism is not friendly to large jobs: when the number of tasks is large (e.g. 1+) and there is a large number of executors (e.g. 2000), executors may be launched on nodes where locality is not the best (not the nodes holding the HDFS blocks). There are cases where `TaskSetManager.lastLaunchTime` is refreshed by tasks finishing within `spark.locality.wait` but at a low rate (e.g. one task finishes every `spark.locality.wait` seconds), so the locality level is never upgraded and lots of pending tasks wait a long time. In this case, when `spark.dynamicAllocation.enabled=true`, many executors may be removed by the driver for becoming idle, which ultimately slows down the job. We encountered this issue in our production Spark cluster; it wasted a lot of resources and slowed down users' applications. We can optimize this with the following formula: Suppose numPendingTasks=1, localityExecutionGainFactor=0.1, probabilityOfNextLocalitySchedule=0.5 {code:java} maxStarvingTimeForTasks = numPendingTasks * medianOfTaskExecutionTime * localityExecutionGainFactor * probabilityOfNextLocalitySchedule totalStarvingTime = sum(starvingTimeByTasks) if (totalStarvingTime > maxStarvingTimeForTasks) { upgrading locality level... }{code} was: Currently, Spark locality wait mechanism is not friendly for large job, when number of tasks is large(e.g. 1+)and with a large number of executors(e.g. 2000), executors may be launched on some nodes where the locality is not the best(not the same nodes hold HDFS blocks). There are cases when `TaskSetManager.lastLaunchTime` is refreshed due to finished tasks within `spark.locality.wait` but coming at low rate(e.g. 
every `spark.locality.wait` seconds a task is finished), so locality level would not be upgraded and lots of pending tasks will wait a long time. In this case, when `spark.dynamicAllocation.enabled=true`, then lots of executors may be removed by Driver due to become idle and finally slow down the job. Actually, we can optimize this by following formula: Suppose numPendingTasks=1, localityExecutionGainFactor=0.1, probabilityOfNextLocalitySchedule=0.5 ``` maxStarvingTasks = numPendingTasks * medianOfTaskExecutionTime * localityExecutionGainFactor * probabilityOfNextLocalitySchedule / `spark.locality.wait` if (numStavingTasks > maxStarvingTasks) { upgrading locality level... } ``` > Upgrading locality level when lots of pending tasks have been waiting more > than locality.wait > - > > Key: SPARK-27214 > URL: https://issues.apache.org/jira/browse/SPARK-27214 > Project: Spark > Issue Type: Improvement > Components: Spark Core >Affects Versions: 2.1.0, 2.4.0 >Reporter: liupengcheng >Priority: Major > > Currently, Spark locality wait mechanism is not friendly for large job, when > number of tasks is large(e.g. 1+)and with a large number of > executors(e.g. 2000), executors may be launched on some nodes where the > locality is not the best(not the same nodes hold HDFS blocks). There are > cases when `TaskSetManager.lastLaunchTime` is refreshed due to finished tasks > within `spark.locality.wait` but coming at low rate(e.g. every > `spark.locality.wait` seconds a task is finished), so locality level would > not be upgraded and lots of pending tasks will wait a long time. > In this case, when `spark.dynamicAllocation.enabled=true`, then lots of > executors may be removed by Driver due to become idle and finally slow down > the job. > We encountered this issue in our production spark cluster, it caused lots of > resources wasting and slowed down user's application. 
> Actually, we can optimize this by following formula: > Suppose numPendingTasks=1, localityExecutionGainFactor=0.1, > probabilityOfNextLocalitySchedule=0.5 > {code:java} > maxStarvingTimeForTasks = numPendingTasks * medianOfTaskExecutionTime * > localityExecutionGainFactor * probabilityOfNextLocalitySchedule > totalStarvingTime = sum(starvingTimeByTasks) > if (totalStarvingTime > maxStarvingTimeForTasks) > { upgrading locality level... }{code} > >
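The proposed check translates almost directly into code. A minimal sketch, assuming the starving times and the median task execution time are tracked in the same time unit:

```python
def should_upgrade_locality(starving_times, num_pending_tasks,
                            median_task_time,
                            locality_gain_factor=0.1,
                            p_next_locality_schedule=0.5):
    """Upgrade the locality level once the total time pending tasks have
    been starving exceeds the estimated benefit of waiting for locality."""
    max_starving_time = (num_pending_tasks * median_task_time *
                         locality_gain_factor * p_next_locality_schedule)
    return sum(starving_times) > max_starving_time
```

With the default factors, 10 pending tasks and a 100-second median task time give a 50-second budget; once the accumulated starving time passes that, the scheduler would stop waiting for better locality.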
[jira] [Resolved] (SPARK-26894) Fix Alias handling in AggregateEstimation
[ https://issues.apache.org/jira/browse/SPARK-26894?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Takeshi Yamamuro resolved SPARK-26894. -- Resolution: Fixed Fix Version/s: 3.0.0 Resolved by https://github.com/apache/spark/pull/23803 > Fix Alias handling in AggregateEstimation > - > > Key: SPARK-26894 > URL: https://issues.apache.org/jira/browse/SPARK-26894 > Project: Spark > Issue Type: Bug > Components: SQL >Affects Versions: 2.4.0 >Reporter: Venkata krishnan Sowrirajan >Priority: Major > Fix For: 3.0.0 > > > Aliases are not handled separately in AggregateEstimation similar to > ProjectEstimation due to which stats are not getting propagated when CBO is > enabled.
[jira] [Resolved] (SPARK-27223) Remove private methods that skip conversion when passing user schemas for constructing a DataFrame
[ https://issues.apache.org/jira/browse/SPARK-27223?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Hyukjin Kwon resolved SPARK-27223. -- Resolution: Fixed Fix Version/s: 3.0.0 Issue resolved by pull request 24162 [https://github.com/apache/spark/pull/24162] > Remove private methods that skip conversion when passing user schemas for > constructing a DataFrame > -- > > Key: SPARK-27223 > URL: https://issues.apache.org/jira/browse/SPARK-27223 > Project: Spark > Issue Type: Bug > Components: SQL >Affects Versions: 2.4.0 >Reporter: Maryann Xue >Assignee: Maryann Xue >Priority: Minor > Fix For: 3.0.0 > > > When passing in a user schema to create a DataFrame, there might be > mismatched nullability between the user schema and the actual data. All > related public interfaces now perform catalyst conversion using the user > provided schema, which catches such mismatches to avoid runtime errors later > on. However, there are private methods which allow this conversion to be > skipped, so we need to remove these private methods, which may lead to > confusion and potential issues.
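A hypothetical illustration (not Spark's actual conversion code path) of the kind of nullability mismatch that catalyst conversion catches when a user-provided schema is applied:

```python
def verify_nullability(rows, schema):
    """schema: list of (field_name, nullable) pairs in column order.
    Raise if a field declared non-nullable actually contains a null --
    the mismatch that skipping conversion would let slip through."""
    for row in rows:
        for (name, nullable), value in zip(schema, row):
            if value is None and not nullable:
                raise ValueError(
                    f"field '{name}' is declared non-nullable "
                    f"but the data contains null")

schema = [("id", False), ("name", True)]
verify_nullability([(1, "a"), (2, None)], schema)  # fine: 'name' is nullable
```

Skipping this check does not fail at DataFrame construction time; the null surfaces later as a confusing runtime error, which is why the ticket removes the bypass.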
[jira] [Resolved] (SPARK-27221) Improve the assert error message in TreeNode.parseToJson
[ https://issues.apache.org/jira/browse/SPARK-27221?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Hyukjin Kwon resolved SPARK-27221. -- Resolution: Fixed Fix Version/s: 3.0.0 Issue resolved by pull request 24159 [https://github.com/apache/spark/pull/24159] > Improve the assert error message in TreeNode.parseToJson > > > Key: SPARK-27221 > URL: https://issues.apache.org/jira/browse/SPARK-27221 > Project: Spark > Issue Type: Improvement > Components: SQL >Affects Versions: 2.4.0 >Reporter: Shixiong Zhu >Assignee: Shixiong Zhu >Priority: Minor > Fix For: 3.0.0 > > > TreeNode.parseToJson may throw an assert error without any error message > when a TreeNode is not implemented properly, and it's hard to find the bad > TreeNode implementation. > It's better to improve the error message to indicate the type of TreeNode.
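A minimal illustration of the improvement, in Python for brevity (the class and function names are made up, not Spark's): a bare assert gives no clue which node type broke, while naming the type in the message makes the bad implementation easy to track down.

```python
class LeafNode:          # hypothetical well-behaved TreeNode, for illustration
    def json_value(self):
        return {"class": "LeafNode"}

class BrokenNode:        # a node "not implemented properly"
    pass

def parse_to_json(node):
    # Before: `assert hasattr(node, "json_value")` -- fails with no message.
    # After: include the offending node's type in the assertion message.
    assert hasattr(node, "json_value"), (
        f"unsupported TreeNode type for JSON conversion: "
        f"{type(node).__name__}")
    return node.json_value()
```

Now `parse_to_json(BrokenNode())` fails with a message naming `BrokenNode` rather than a blank `AssertionError`.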
[jira] [Assigned] (SPARK-27223) Remove private methods that skip conversion when passing user schemas for constructing a DataFrame
[ https://issues.apache.org/jira/browse/SPARK-27223?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Hyukjin Kwon reassigned SPARK-27223: Assignee: Maryann Xue > Remove private methods that skip conversion when passing user schemas for > constructing a DataFrame > -- > > Key: SPARK-27223 > URL: https://issues.apache.org/jira/browse/SPARK-27223 > Project: Spark > Issue Type: Bug > Components: SQL >Affects Versions: 2.4.0 >Reporter: Maryann Xue >Assignee: Maryann Xue >Priority: Minor > > When passing in a user schema to create a DataFrame, there might be > mismatched nullability between the user schema and the actual data. All > related public interfaces now perform catalyst conversion using the user > provided schema, which catches such mismatches to avoid runtime errors later > on. However, there are private methods which allow this conversion to be > skipped, so we need to remove these private methods, which may lead to > confusion and potential issues.
[jira] [Comment Edited] (SPARK-24105) Spark 2.3.0 on kubernetes
[ https://issues.apache.org/jira/browse/SPARK-24105?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16797719#comment-16797719 ] Kevin Hogeland edited comment on SPARK-24105 at 3/21/19 1:07 AM: - [~vanzin] Why was this marked "Won't Fix"? This is a major issue. * There is a limited amount of resources (constrained either by a ResourceQuota or by the size of the cluster) * Drivers are scheduled before executors due to the 2-layer scheduling design * Drivers consume from the same pool of resources as executors, making it possible to consume all available resources * If no driver can schedule an executor, all drivers are stalled indefinitely (even if they timeout and crash) Starting too many drivers at the same time _will_ cause a deadlock. Any spiky workload is very likely to trigger this eventually. For example, if a large amount of Spark jobs are scheduled daily/hourly. We've been able to reproduce this easily in testing. was (Author: hogeland): [~vanzin] Why was this marked "Won't Fix"? This is a _major_ issue. * There is a limited amount of resources (constrained either by a ResourceQuota or by the size of the cluster) * Drivers are scheduled before executors due to the 2-layer scheduling design * Drivers consume from the same pool of resources as executors, making it possible to consume all available resources * If no driver can schedule an executor, all drivers are stalled indefinitely (even if they timeout and crash) Starting too many drivers at the same time _will_ cause a deadlock. Any spiky workload is very likely to trigger this eventually. For example, if a large amount of Spark jobs are scheduled daily/hourly. We've been able to reproduce this easily in testing. 
> Spark 2.3.0 on kubernetes > - > > Key: SPARK-24105 > URL: https://issues.apache.org/jira/browse/SPARK-24105 > Project: Spark > Issue Type: Improvement > Components: Kubernetes >Affects Versions: 2.3.0 >Reporter: Lenin >Priority: Major > > Right now it's only possible to define node selector configurations > through spark.kubernetes.node.selector.[labelKey]. This gets used for both driver > & executor pods. Without the capability to isolate driver & executor pods, > the cluster can run into a livelock scenario where, if there are a lot of > spark submits, the driver pods can fill up the cluster capacity, > with no room for executor pods to do any work. > > To avoid this deadlock, it's required to support node selector (in future > affinity/anti-affinity) configuration by driver & executor. >
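The deadlock condition described above is easy to state numerically. A toy model (all numbers hypothetical), assuming drivers and executors draw CPU from one shared quota:

```python
def cluster_can_progress(total_cpu, driver_cpu, exec_cpu, n_drivers):
    """At least one executor must still fit after all drivers are
    scheduled; otherwise every driver sits waiting for executors
    that can never start."""
    remaining = total_cpu - n_drivers * driver_cpu
    return remaining >= exec_cpu

# 100 CPUs; each driver needs 1 CPU, each executor needs 4.
assert cluster_can_progress(100, 1, 4, 90) is True    # 10 CPUs left: an executor fits
assert cluster_can_progress(100, 1, 4, 97) is False   # 3 CPUs left: deadlock
```

Because drivers are admitted first and independently, nothing in the two-layer scheduling design prevents `n_drivers` from crossing the threshold where no executor fits, which is the livelock the reporter describes.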
[jira] [Commented] (SPARK-27224) Spark to_json parses UTC timestamp incorrectly
[ https://issues.apache.org/jira/browse/SPARK-27224?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16797711#comment-16797711 ] Hyukjin Kwon commented on SPARK-27224: -- This was an unexpected behaviour from SimpleDateFormat if I am not mistaken. It's fixed in the current master by explicitly setting {{timestampFormat}}. {code} Seq((s"""{"t":"${t}"}""")).toDF("json").select(from_json(col("json"), schema, Map("timestampFormat" -> "yyyy-MM-dd'T'HH:mm:ss.SSSXXX"))).show(false) {code} I don't think we should backport this to lower Spark versions since the changes are quite big and breaking. > Spark to_json parses UTC timestamp incorrectly > -- > > Key: SPARK-27224 > URL: https://issues.apache.org/jira/browse/SPARK-27224 > Project: Spark > Issue Type: Bug > Components: Spark Core >Affects Versions: 2.3.0 >Reporter: Jeff Xu >Priority: Major > > When parsing ISO-8601 timestamp, if there is UTC suffix symbol, and more than > 3 digits in the fraction part, from_json will give incorrect result. 
> > {noformat} > scala> val schema = new StructType().add("t", TimestampType) > # > # no "Z", no problem > # > scala> val t = "2019-03-20T09:01:03.1234567" > scala> Seq((s"""{"t":"${t}"}""")).toDF("json").select(from_json(col("json"), > schema)).show(false) > +-+ > |jsontostructs(json) | > +-+ > |[2019-03-20 09:01:03.123]| > +-+ > # > # Add "Z", incorrect > # > scala> val t = "2019-03-20T09:01:03.1234567Z" > scala> Seq((s"""{"t":"${t}"}""")).toDF("json").select(from_json(col("json"), > schema)).show(false) > +-+ > |jsontostructs(json) | > +-+ > |[2019-03-20 02:21:37.567]| > +-+ > # > # reduce the # of digits, the conversion is incorrect until only we reach 3 > digits > # > scala> val t = "2019-03-20T09:01:03.123456Z" > +-+ > |jsontostructs(json) | > +-+ > |[2019-03-20 02:03:06.456]| > +-+ > scala> val t = "2019-03-20T09:01:03.12345Z > +-+ > |jsontostructs(json) | > +-+ > |[2019-03-20 02:01:15.345]| > +-+ > scala> val t = "2019-03-20T09:01:03.1234Z" > +-+ > |jsontostructs(json) | > +-+ > |[2019-03-20 02:01:04.234]| > +-+ > # correct when there is <=3 digits in fraction > scala> val t = "2019-03-20T09:01:03.123Z" > +-+ > |jsontostructs(json) | > +-+ > |[2019-03-20 02:01:03.123]| > +-+ > scala> val t = "2019-03-20T09:01:03.999Z" > +-+ > |jsontostructs(json) | > +-+ > |[2019-03-20 02:01:03.999]| > +-+ > {noformat} > > This could be related to SPARK-17914.
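The wrong results are consistent with SimpleDateFormat's SSS pattern reading the entire fraction as a millisecond count. A quick check of the arithmetic in pure Python (the reported 02:21:37.567 appears to be the same instant rendered in the session's UTC-7 local time, which is an inference from the output, not stated in the ticket):

```python
from datetime import datetime, timedelta

base = datetime(2019, 3, 20, 9, 1, 3)

# "09:01:03.1234567": SSS takes 1234567 as milliseconds (~20 min 34.567 s)
# instead of 0.1234567 s, shifting the timestamp by over twenty minutes.
wrong = base + timedelta(milliseconds=1234567)
assert wrong == datetime(2019, 3, 20, 9, 21, 37, 567000)

# With three fraction digits, "milliseconds" and "fraction of a second"
# coincide, which is why ".123Z" parses correctly.
ok = base + timedelta(milliseconds=123)
assert ok == datetime(2019, 3, 20, 9, 1, 3, 123000)
```

The same arithmetic reproduces the other reported values (e.g. 123456 ms is about 2 minutes, giving the 02:03:06.456 row).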
[jira] [Commented] (SPARK-27177) Update jenkins locale to en_US.UTF-8
[ https://issues.apache.org/jira/browse/SPARK-27177?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16797709#comment-16797709 ] Hyukjin Kwon commented on SPARK-27177: -- Yes, I think we can unblock https://github.com/apache/spark/pull/23823 as well > Update jenkins locale to en_US.UTF-8 > > > Key: SPARK-27177 > URL: https://issues.apache.org/jira/browse/SPARK-27177 > Project: Spark > Issue Type: Sub-task > Components: Build, jenkins >Affects Versions: 3.0.0 >Reporter: Yuming Wang >Priority: Major > > Two test cases will fail on our jenkins since HADOOP-12045 (Hadoop 2.8.0). > I'd like to update our jenkins locale to en_US.UTF-8 to work around this issue. > How to reproduce: > {code:java} > export LANG= > git clone https://github.com/apache/spark.git && cd spark && git checkout > v2.4.0 > build/sbt "hive/testOnly *.HiveDDLSuite" -Phive -Phadoop-2.7 > -Dhadoop.version=2.8.0 > {code} > Stack trace: > {noformat} > Caused by: sbt.ForkMain$ForkError: java.nio.file.InvalidPathException: > Malformed input or input contains unmappable characters: > /home/jenkins/workspace/SparkPullRequestBuilder@2/target/tmp/warehouse-15474fdf-0808-40ab-946d-1309fb05bf26/DaTaBaSe_I.db/tab_ı > at sun.nio.fs.UnixPath.encode(UnixPath.java:147) > at sun.nio.fs.UnixPath.(UnixPath.java:71) > at sun.nio.fs.UnixFileSystem.getPath(UnixFileSystem.java:281) > at java.io.File.toPath(File.java:2234) > at > org.apache.hadoop.fs.RawLocalFileSystem$DeprecatedRawLocalFileStatus.getLastAccessTime(RawLocalFileSystem.java:683) > at > org.apache.hadoop.fs.RawLocalFileSystem$DeprecatedRawLocalFileStatus.(RawLocalFileSystem.java:694) > at > org.apache.hadoop.fs.RawLocalFileSystem.deprecatedGetFileStatus(RawLocalFileSystem.java:664) > at > org.apache.hadoop.fs.RawLocalFileSystem.getFileLinkStatusInternal(RawLocalFileSystem.java:987) > at > org.apache.hadoop.fs.RawLocalFileSystem.getFileStatus(RawLocalFileSystem.java:656) > at > 
org.apache.hadoop.fs.FilterFileSystem.getFileStatus(FilterFileSystem.java:454) > at org.apache.hadoop.hive.metastore.Warehouse.isDir(Warehouse.java:520) > at > org.apache.hadoop.hive.metastore.HiveMetaStore$HMSHandler.create_table_core(HiveMetaStore.java:1436) > at > org.apache.hadoop.hive.metastore.HiveMetaStore$HMSHandler.create_table_with_environment_context(HiveMetaStore.java:1503) > {noformat} > Workaround: > {code:java} > export LANG=en_US.UTF-8 > build/sbt "hive/testOnly *.HiveDDLSuite" -Phive -Phadoop-2.7 > -Dhadoop.version=2.8.0 > {code} > More details: > https://issues.apache.org/jira/browse/HADOOP-16180 > https://github.com/apache/spark/pull/24044/commits/4c1ec25d3bc64bf358edf1380a7c863596722362
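The failure is at heart an encoding problem: with LANG unset, file names fall back to an ASCII encoding, and the Turkish dotless ı (U+0131) in the table name tab_ı from the stack trace cannot be mapped to a path. A pure-Python analogue of the same failure:

```python
name = "tab_ı"  # table name from the stack trace; contains U+0131

# Under an ASCII locale the name cannot be encoded into a filesystem path...
try:
    name.encode("ascii")
    ascii_ok = True
except UnicodeEncodeError:
    ascii_ok = False
assert ascii_ok is False

# ...while under en_US.UTF-8 it encodes fine, which is why setting
# LANG=en_US.UTF-8 works around the test failure.
assert name.encode("utf-8") == b"tab_\xc4\xb1"
```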
[jira] [Resolved] (SPARK-27224) Spark to_json parses UTC timestamp incorrectly
[ https://issues.apache.org/jira/browse/SPARK-27224?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Hyukjin Kwon resolved SPARK-27224. -- Resolution: Cannot Reproduce > Spark to_json parses UTC timestamp incorrectly > -- > > Key: SPARK-27224 > URL: https://issues.apache.org/jira/browse/SPARK-27224 > Project: Spark > Issue Type: Bug > Components: Spark Core >Affects Versions: 2.3.0 >Reporter: Jeff Xu >Priority: Major > > When parsing ISO-8601 timestamp, if there is UTC suffix symbol, and more than > 3 digits in the fraction part, from_json will give incorrect result. > > {noformat} > scala> val schema = new StructType().add("t", TimestampType) > # > # no "Z", no problem > # > scala> val t = "2019-03-20T09:01:03.1234567" > scala> Seq((s"""{"t":"${t}"}""")).toDF("json").select(from_json(col("json"), > schema)).show(false) > +-+ > |jsontostructs(json) | > +-+ > |[2019-03-20 09:01:03.123]| > +-+ > # > # Add "Z", incorrect > # > scala> val t = "2019-03-20T09:01:03.1234567Z" > scala> Seq((s"""{"t":"${t}"}""")).toDF("json").select(from_json(col("json"), > schema)).show(false) > +-+ > |jsontostructs(json) | > +-+ > |[2019-03-20 02:21:37.567]| > +-+ > # > # reduce the # of digits, the conversion is incorrect until only we reach 3 > digits > # > scala> val t = "2019-03-20T09:01:03.123456Z" > +-+ > |jsontostructs(json) | > +-+ > |[2019-03-20 02:03:06.456]| > +-+ > scala> val t = "2019-03-20T09:01:03.12345Z > +-+ > |jsontostructs(json) | > +-+ > |[2019-03-20 02:01:15.345]| > +-+ > scala> val t = "2019-03-20T09:01:03.1234Z" > +-+ > |jsontostructs(json) | > +-+ > |[2019-03-20 02:01:04.234]| > +-+ > # correct when there is <=3 digits in fraction > scala> val t = "2019-03-20T09:01:03.123Z" > +-+ > |jsontostructs(json) | > +-+ > |[2019-03-20 02:01:03.123]| > +-+ > scala> val t = "2019-03-20T09:01:03.999Z" > +-+ > |jsontostructs(json) | > +-+ > |[2019-03-20 02:01:03.999]| > +-+ > {noformat} > > This could be related to SPARK-17914. 
[jira] [Commented] (SPARK-27006) SPIP: .NET bindings for Apache Spark
[ https://issues.apache.org/jira/browse/SPARK-27006?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16797688#comment-16797688 ] Markus Weimer commented on SPARK-27006: --- +1 on a .NET API for Spark. Over in [ML.NET|https://github.com/dotnet/machinelearning] we have a bunch of users who would really benefit from this. > SPIP: .NET bindings for Apache Spark > > > Key: SPARK-27006 > URL: https://issues.apache.org/jira/browse/SPARK-27006 > Project: Spark > Issue Type: Improvement > Components: Spark Core >Affects Versions: 3.0.0 >Reporter: Terry Kim >Priority: Major > Original Estimate: 4,032h > Remaining Estimate: 4,032h > > h4. Background and Motivation: > Apache Spark provides programming language support for Scala/Java (native), > and extensions for Python and R. While a variety of other language extensions > are possible to include in Apache Spark, .NET would bring one of the largest > developer communities to the table. Presently, no good Big Data solution exists > for .NET developers in open source. This SPIP aims at discussing how we can > bring Apache Spark goodness to the .NET development platform. > .NET is a free, cross-platform, open source developer platform for building > many different types of applications. With .NET, you can use multiple > languages, editors, and libraries to build for web, mobile, desktop, gaming, > and IoT types of applications. Even with .NET serving millions of developers, > there is no good Big Data solution that exists today, which this SPIP aims to > address. > The .NET developer community is one of the largest programming language > communities in the world. 
Its flagship programming language C# is listed as > one of the most popular programming languages in a variety of articles and > statistics: > * Most popular Technologies on Stack Overflow: > [https://insights.stackoverflow.com/survey/2018/#most-popular-technologies|https://insights.stackoverflow.com/survey/2018/] > > * Most popular languages on GitHub 2018: > [https://www.businessinsider.com/the-10-most-popular-programming-languages-according-to-github-2018-10#2-java-9|https://www.businessinsider.com/the-10-most-popular-programming-languages-according-to-github-2018-10] > > * 1M+ new developers last 1 year > * Second most demanded technology on LinkedIn > * Top 30 High velocity OSS projects on GitHub > Including a C# language extension in Apache Spark will enable millions of > .NET developers to author Big Data applications in their preferred > programming language, developer environment, and tooling support. We aim to > promote the .NET bindings for Spark through engagements with the Spark > community (e.g., we are scheduled to present an early prototype at the SF > Spark Summit 2019) and the .NET developer community (e.g., similar > presentations will be held at .NET developer conferences this year). As > such, we believe that our efforts will help grow the Spark community by > making it accessible to the millions of .NET developers. > Furthermore, our early discussions with some large .NET development teams got > an enthusiastic reception. > We recognize that earlier attempts at this goal (specifically Mobius > [https://github.com/Microsoft/Mobius]) were unsuccessful primarily due to the > lack of communication with the Spark community. Therefore, another goal of > this proposal is to not only develop .NET bindings for Spark in open source, > but also continuously seek feedback from the Spark community via posted > Jira’s (like this one) and the Spark developer mailing list. 
Our hope is that > through these engagements, we can build a community of developers that are > eager to contribute to this effort or want to leverage the resulting .NET > bindings for Spark in their respective Big Data applications. > h4. Target Personas: > .NET developers looking to build big data solutions. > h4. Goals: > Our primary goal is to help grow Apache Spark by making it accessible to the > large .NET developer base and ecosystem. We will also look for opportunities > to generalize the interop layers for Spark for adding other language > extensions in the future. [SPARK-26257]( > https://issues.apache.org/jira/browse/SPARK-26257) proposes such a > generalized interop layer, which we hope to address over the course of this > project. > Another important goal for us is to not only enable Spark as an application > solution for .NET developers, but also opening the door for .NET developers > to make contributions to Apache Spark itself. > Lastly, we aim to develop a .NET extension in the open, while continually > engaging with the Spark community for feedback on designs and code. We will > welcome PRs from the Spark community throughout this project and aim
[jira] [Created] (SPARK-27224) Spark to_json parses UTC timestamp incorrectly
Jeff Xu created SPARK-27224: --- Summary: Spark to_json parses UTC timestamp incorrectly Key: SPARK-27224 URL: https://issues.apache.org/jira/browse/SPARK-27224 Project: Spark Issue Type: Bug Components: Spark Core Affects Versions: 2.3.0 Reporter: Jeff Xu

When parsing an ISO-8601 timestamp, if there is a UTC suffix symbol ("Z") and more than 3 digits in the fraction part, from_json will give an incorrect result.

{noformat}
scala> val schema = new StructType().add("t", TimestampType)

#
# no "Z", no problem
#
scala> val t = "2019-03-20T09:01:03.1234567"
scala> Seq((s"""{"t":"${t}"}""")).toDF("json").select(from_json(col("json"), schema)).show(false)
+-------------------------+
|jsontostructs(json)      |
+-------------------------+
|[2019-03-20 09:01:03.123]|
+-------------------------+

#
# Add "Z", incorrect
#
scala> val t = "2019-03-20T09:01:03.1234567Z"
scala> Seq((s"""{"t":"${t}"}""")).toDF("json").select(from_json(col("json"), schema)).show(false)
+-------------------------+
|jsontostructs(json)      |
+-------------------------+
|[2019-03-20 02:21:37.567]|
+-------------------------+

#
# reduce the number of digits; the conversion is incorrect until we reach 3 digits
#
scala> val t = "2019-03-20T09:01:03.123456Z"
+-------------------------+
|jsontostructs(json)      |
+-------------------------+
|[2019-03-20 02:03:06.456]|
+-------------------------+

scala> val t = "2019-03-20T09:01:03.12345Z"
+-------------------------+
|jsontostructs(json)      |
+-------------------------+
|[2019-03-20 02:01:15.345]|
+-------------------------+

scala> val t = "2019-03-20T09:01:03.1234Z"
+-------------------------+
|jsontostructs(json)      |
+-------------------------+
|[2019-03-20 02:01:04.234]|
+-------------------------+

# correct when there are <= 3 digits in the fraction
scala> val t = "2019-03-20T09:01:03.123Z"
+-------------------------+
|jsontostructs(json)      |
+-------------------------+
|[2019-03-20 02:01:03.123]|
+-------------------------+

scala> val t = "2019-03-20T09:01:03.999Z"
+-------------------------+
|jsontostructs(json)      |
+-------------------------+
|[2019-03-20 02:01:03.999]|
+-------------------------+
{noformat}

This could be related to SPARK-17914.

-- This message was sent by Atlassian JIRA (v7.6.3#76005) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
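A side note on the ~20-minute shift in the second example above: it is consistent with the legacy java.text.SimpleDateFormat behavior of reading every fraction digit as milliseconds. The sketch below is ours, not from the report, and the class name is hypothetical:

```java
import java.text.SimpleDateFormat;
import java.util.TimeZone;

public class FractionDemo {
    public static void main(String[] args) throws Exception {
        SimpleDateFormat fmt = new SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss.SSSXXX");
        fmt.setTimeZone(TimeZone.getTimeZone("UTC"));
        // Lenient parsing consumes all seven fraction digits for the SSS
        // field, so ".1234567" is read as 1,234,567 milliseconds (~20.6
        // minutes) and added on top of 09:01:03 -- matching the 09:21:37.567
        // (shown as 02:21:37.567 local time) in the report.
        long millis = fmt.parse("2019-03-20T09:01:03.1234567Z").getTime();
        System.out.println(millis); // 1553072463000 + 1234567 = 1553073697567
    }
}
```

With three or fewer fraction digits the milliseconds value stays below 1000, which is why only those inputs round-trip correctly.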
[jira] [Updated] (SPARK-27194) Job failures when task attempts do not clean up spark-staging parquet files
[ https://issues.apache.org/jira/browse/SPARK-27194?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Dongjoon Hyun updated SPARK-27194: -- Affects Version/s: 2.3.3 > Job failures when task attempts do not clean up spark-staging parquet files > --- > > Key: SPARK-27194 > URL: https://issues.apache.org/jira/browse/SPARK-27194 > Project: Spark > Issue Type: Bug > Components: Spark Core, SQL >Affects Versions: 2.3.1, 2.3.2, 2.3.3 >Reporter: Reza Safi >Priority: Major > > When a container fails for some reason (for example when killed by yarn for > exceeding memory limits), the subsequent task attempts for the tasks that > were running on that container all fail with a FileAlreadyExistsException. > The original task attempt does not seem to successfully call abortTask (or at > least its "best effort" delete is unsuccessful) and clean up the parquet file > it was writing to, so when later task attempts try to write to the same > spark-staging directory using the same file name, the job fails. > Here is what transpires in the logs: > The container where task 200.0 is running is killed and the task is lost: > {code} > 19/02/20 09:33:25 ERROR cluster.YarnClusterScheduler: Lost executor y on > t.y.z.com: Container killed by YARN for exceeding memory limits. 8.1 GB of 8 > GB physical memory used. Consider boosting spark.yarn.executor.memoryOverhead. > 19/02/20 09:33:25 WARN scheduler.TaskSetManager: Lost task 200.0 in stage > 0.0 (TID xxx, t.y.z.com, executor 93): ExecutorLostFailure (executor 93 > exited caused by one of the running tasks) Reason: Container killed by YARN > for exceeding memory limits. 8.1 GB of 8 GB physical memory used. Consider > boosting spark.yarn.executor.memoryOverhead. 
> {code} > The task is re-attempted on a different executor and fails because the > part-00200-blah-blah.c000.snappy.parquet file from the first task attempt > already exists: > {code} > 19/02/20 09:35:01 WARN scheduler.TaskSetManager: Lost task 200.1 in stage 0.0 > (TID 594, tn.y.z.com, executor 70): org.apache.spark.SparkException: Task > failed while writing rows. > at > org.apache.spark.sql.execution.datasources.FileFormatWriter$.org$apache$spark$sql$execution$datasources$FileFormatWriter$$executeTask(FileFormatWriter.scala:285) > at > org.apache.spark.sql.execution.datasources.FileFormatWriter$$anonfun$write$1.apply(FileFormatWriter.scala:197) > at > org.apache.spark.sql.execution.datasources.FileFormatWriter$$anonfun$write$1.apply(FileFormatWriter.scala:196) > at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87) > at org.apache.spark.scheduler.Task.run(Task.scala:109) > at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:345) > at > java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) > at > java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) > at java.lang.Thread.run(Thread.java:745) > Caused by: org.apache.hadoop.fs.FileAlreadyExistsException: > /user/hive/warehouse/tmp_supply_feb1/.spark-staging-blah-blah-blah/dt=2019-02-17/part-00200-blah-blah.c000.snappy.parquet > for client a.b.c.d already exists > {code} > The job fails when the configured task attempts (spark.task.maxFailures) > have failed with the same error: > {code} > org.apache.spark.SparkException: Job aborted due to stage failure: Task 200 > in stage 0.0 failed 20 times, most recent failure: Lost task 284.19 in stage > 0.0 (TID yyy, tm.y.z.com, executor 16): org.apache.spark.SparkException: Task > failed while writing rows. > at > org.apache.spark.sql.execution.datasources.FileFormatWriter$.org$apache$spark$sql$execution$datasources$FileFormatWriter$$executeTask(FileFormatWriter.scala:285) > ... 
> Caused by: org.apache.hadoop.fs.FileAlreadyExistsException: > /user/hive/warehouse/tmp_supply_feb1/.spark-staging-blah-blah-blah/dt=2019-02-17/part-00200-blah-blah.c000.snappy.parquet > for client i.p.a.d already exists > {code} > SPARK-26682 wasn't the root cause here, since there wasn't any stage > reattempt. > This issue seems to happen when > spark.sql.sources.partitionOverwriteMode=dynamic. > -- This message was sent by Atlassian JIRA (v7.6.3#76005) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Commented] (SPARK-27194) Job failures when task attempts do not clean up spark-staging parquet files
[ https://issues.apache.org/jira/browse/SPARK-27194?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16797633#comment-16797633 ] Dongjoon Hyun commented on SPARK-27194: --- Thank you for checking that, [~ajithshetty]. > Job failures when task attempts do not clean up spark-staging parquet files > --- > > Key: SPARK-27194 > URL: https://issues.apache.org/jira/browse/SPARK-27194 > Project: Spark > Issue Type: Bug > Components: Spark Core, SQL >Affects Versions: 2.3.1, 2.3.2, 2.3.3 >Reporter: Reza Safi >Priority: Major > > When a container fails for some reason (for example when killed by yarn for > exceeding memory limits), the subsequent task attempts for the tasks that > were running on that container all fail with a FileAlreadyExistsException. > The original task attempt does not seem to successfully call abortTask (or at > least its "best effort" delete is unsuccessful) and clean up the parquet file > it was writing to, so when later task attempts try to write to the same > spark-staging directory using the same file name, the job fails. > Here is what transpires in the logs: > The container where task 200.0 is running is killed and the task is lost: > {code} > 19/02/20 09:33:25 ERROR cluster.YarnClusterScheduler: Lost executor y on > t.y.z.com: Container killed by YARN for exceeding memory limits. 8.1 GB of 8 > GB physical memory used. Consider boosting spark.yarn.executor.memoryOverhead. > 19/02/20 09:33:25 WARN scheduler.TaskSetManager: Lost task 200.0 in stage > 0.0 (TID xxx, t.y.z.com, executor 93): ExecutorLostFailure (executor 93 > exited caused by one of the running tasks) Reason: Container killed by YARN > for exceeding memory limits. 8.1 GB of 8 GB physical memory used. Consider > boosting spark.yarn.executor.memoryOverhead. 
> {code} > The task is re-attempted on a different executor and fails because the > part-00200-blah-blah.c000.snappy.parquet file from the first task attempt > already exists: > {code} > 19/02/20 09:35:01 WARN scheduler.TaskSetManager: Lost task 200.1 in stage 0.0 > (TID 594, tn.y.z.com, executor 70): org.apache.spark.SparkException: Task > failed while writing rows. > at > org.apache.spark.sql.execution.datasources.FileFormatWriter$.org$apache$spark$sql$execution$datasources$FileFormatWriter$$executeTask(FileFormatWriter.scala:285) > at > org.apache.spark.sql.execution.datasources.FileFormatWriter$$anonfun$write$1.apply(FileFormatWriter.scala:197) > at > org.apache.spark.sql.execution.datasources.FileFormatWriter$$anonfun$write$1.apply(FileFormatWriter.scala:196) > at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87) > at org.apache.spark.scheduler.Task.run(Task.scala:109) > at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:345) > at > java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) > at > java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) > at java.lang.Thread.run(Thread.java:745) > Caused by: org.apache.hadoop.fs.FileAlreadyExistsException: > /user/hive/warehouse/tmp_supply_feb1/.spark-staging-blah-blah-blah/dt=2019-02-17/part-00200-blah-blah.c000.snappy.parquet > for client a.b.c.d already exists > {code} > The job fails when the configured task attempts (spark.task.maxFailures) > have failed with the same error: > {code} > org.apache.spark.SparkException: Job aborted due to stage failure: Task 200 > in stage 0.0 failed 20 times, most recent failure: Lost task 284.19 in stage > 0.0 (TID yyy, tm.y.z.com, executor 16): org.apache.spark.SparkException: Task > failed while writing rows. > at > org.apache.spark.sql.execution.datasources.FileFormatWriter$.org$apache$spark$sql$execution$datasources$FileFormatWriter$$executeTask(FileFormatWriter.scala:285) > ... 
> Caused by: org.apache.hadoop.fs.FileAlreadyExistsException: > /user/hive/warehouse/tmp_supply_feb1/.spark-staging-blah-blah-blah/dt=2019-02-17/part-00200-blah-blah.c000.snappy.parquet > for client i.p.a.d already exists > {code} > SPARK-26682 wasn't the root cause here, since there wasn't any stage > reattempt. > This issue seems to happen when > spark.sql.sources.partitionOverwriteMode=dynamic. > -- This message was sent by Atlassian JIRA (v7.6.3#76005) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Assigned] (SPARK-27205) spark-shell with packages option fails to load transitive dependencies even ivy successfully pulls jars
[ https://issues.apache.org/jira/browse/SPARK-27205?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Sean Owen reassigned SPARK-27205: - Assignee: Jungtaek Lim > spark-shell with packages option fails to load transitive dependencies even > ivy successfully pulls jars > --- > > Key: SPARK-27205 > URL: https://issues.apache.org/jira/browse/SPARK-27205 > Project: Spark > Issue Type: Bug > Components: Spark Core >Affects Versions: 3.0.0 >Reporter: Jungtaek Lim >Assignee: Jungtaek Lim >Priority: Major > > I found this bug while testing my patch regarding the Spark SQL Kafka module - I > tend to open spark-shell and link the kafka module via `--packages`. > When we run > {code:java} > ./bin/spark-shell --packages > org.apache.spark:spark-sql-kafka-0-10_2.11:2.4.0{code} > we should be able to import "org.apache.kafka" in spark-shell, but it doesn't > work for the current master branch. > There's not enough evidence, and I had no idea what was happening here > even with the `--verbose` option, so I had to spend a couple of hours with > git bisect. > It turned out the commit introducing the bug was SPARK-26977 > ([81dd21fda99da48ed76adb739a07d1dabf1ffb51|https://github.com/apache/spark/commit/81dd21fda99da48ed76adb739a07d1dabf1ffb51]). > -- This message was sent by Atlassian JIRA (v7.6.3#76005) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Resolved] (SPARK-27205) spark-shell with packages option fails to load transitive dependencies even ivy successfully pulls jars
[ https://issues.apache.org/jira/browse/SPARK-27205?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Sean Owen resolved SPARK-27205. --- Resolution: Fixed Fix Version/s: 3.0.0 Issue resolved by pull request 24147 [https://github.com/apache/spark/pull/24147] > spark-shell with packages option fails to load transitive dependencies even > ivy successfully pulls jars > --- > > Key: SPARK-27205 > URL: https://issues.apache.org/jira/browse/SPARK-27205 > Project: Spark > Issue Type: Bug > Components: Spark Core >Affects Versions: 3.0.0 >Reporter: Jungtaek Lim >Assignee: Jungtaek Lim >Priority: Major > Fix For: 3.0.0 > > > I found this bug while testing my patch regarding the Spark SQL Kafka module - I > tend to open spark-shell and link the kafka module via `--packages`. > When we run > {code:java} > ./bin/spark-shell --packages > org.apache.spark:spark-sql-kafka-0-10_2.11:2.4.0{code} > we should be able to import "org.apache.kafka" in spark-shell, but it doesn't > work for the current master branch. > There's not enough evidence, and I had no idea what was happening here > even with the `--verbose` option, so I had to spend a couple of hours with > git bisect. > It turned out the commit introducing the bug was SPARK-26977 > ([81dd21fda99da48ed76adb739a07d1dabf1ffb51|https://github.com/apache/spark/commit/81dd21fda99da48ed76adb739a07d1dabf1ffb51]). > -- This message was sent by Atlassian JIRA (v7.6.3#76005) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Commented] (SPARK-27220) Remove Yarn specific leftover from CoarseGrainedSchedulerBackend
[ https://issues.apache.org/jira/browse/SPARK-27220?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16797630#comment-16797630 ] Dongjoon Hyun commented on SPARK-27220: --- +1 for [~srowen]'s advice~ > Remove Yarn specific leftover from CoarseGrainedSchedulerBackend > > > Key: SPARK-27220 > URL: https://issues.apache.org/jira/browse/SPARK-27220 > Project: Spark > Issue Type: Task > Components: Spark Core, YARN >Affects Versions: 2.0.2, 2.1.3, 2.2.3, 2.3.3, 2.4.0 >Reporter: Jacek Lewandowski >Priority: Minor > > {{CoarseGrainedSchedulerBackend}} has the following field: > {code:scala} > // The num of current max ExecutorId used to re-register appMaster > @volatile protected var currentExecutorIdCounter = 0 > {code} > which is then updated: > {code:scala} > case RegisterExecutor(executorId, executorRef, hostname, cores, > logUrls) => > ... > // This must be synchronized because variables mutated > // in this block are read when requesting executors > CoarseGrainedSchedulerBackend.this.synchronized { > executorDataMap.put(executorId, data) > if (currentExecutorIdCounter < executorId.toInt) { > currentExecutorIdCounter = executorId.toInt > } > ... > {code} > However it is never really used in {{CoarseGrainedSchedulerBackend}}. Its > only usage is in Yarn-specific code. It should be moved to Yarn then because > {{executorId}} is a {{String}} and there are really no guarantees that it is > always an integer. It was introduced in SPARK-12864 -- This message was sent by Atlassian JIRA (v7.6.3#76005) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Resolved] (SPARK-26729) Spark on Kubernetes tooling hardcodes default image names
[ https://issues.apache.org/jira/browse/SPARK-26729?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Marcelo Vanzin resolved SPARK-26729. Resolution: Fixed Fix Version/s: 3.0.0 Issue resolved by pull request 23846 [https://github.com/apache/spark/pull/23846] > Spark on Kubernetes tooling hardcodes default image names > - > > Key: SPARK-26729 > URL: https://issues.apache.org/jira/browse/SPARK-26729 > Project: Spark > Issue Type: Improvement > Components: Kubernetes >Affects Versions: 2.4.0 >Reporter: Rob Vesse >Assignee: Rob Vesse >Priority: Major > Fix For: 3.0.0 > > > Both when creating images with {{bin/docker-image-tool.sh}} and when running > the Kubernetes integration tests the image names are hardcoded to {{spark}}, > {{spark-py}} and {{spark-r}}. > If you are producing custom images in some other way (e.g. a CI/CD process > that doesn't use the script) or are required to use a different naming > convention due to company policy e.g. prefixing with vendor name (e.g. > {{apache-spark}}) then you can't directly create/test your images with the > desired names. > You can of course simply re-tag the images with the desired names but this > might not be possible in some CI/CD pipelines especially if naming > conventions are being enforced at the registry level. > It would be nice if the default image names were customisable -- This message was sent by Atlassian JIRA (v7.6.3#76005) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Assigned] (SPARK-26729) Spark on Kubernetes tooling hardcodes default image names
[ https://issues.apache.org/jira/browse/SPARK-26729?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Marcelo Vanzin reassigned SPARK-26729: -- Assignee: Rob Vesse > Spark on Kubernetes tooling hardcodes default image names > - > > Key: SPARK-26729 > URL: https://issues.apache.org/jira/browse/SPARK-26729 > Project: Spark > Issue Type: Improvement > Components: Kubernetes >Affects Versions: 2.4.0 >Reporter: Rob Vesse >Assignee: Rob Vesse >Priority: Major > > Both when creating images with {{bin/docker-image-tool.sh}} and when running > the Kubernetes integration tests the image names are hardcoded to {{spark}}, > {{spark-py}} and {{spark-r}}. > If you are producing custom images in some other way (e.g. a CI/CD process > that doesn't use the script) or are required to use a different naming > convention due to company policy e.g. prefixing with vendor name (e.g. > {{apache-spark}}) then you can't directly create/test your images with the > desired names. > You can of course simply re-tag the images with the desired names but this > might not be possible in some CI/CD pipelines especially if naming > conventions are being enforced at the registry level. > It would be nice if the default image names were customisable -- This message was sent by Atlassian JIRA (v7.6.3#76005) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Resolved] (SPARK-27215) Correct the kryo configurations
[ https://issues.apache.org/jira/browse/SPARK-27215?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Marcelo Vanzin resolved SPARK-27215. Resolution: Fixed Fix Version/s: 3.0.0 Issue resolved by pull request 24156 [https://github.com/apache/spark/pull/24156] > Correct the kryo configurations > --- > > Key: SPARK-27215 > URL: https://issues.apache.org/jira/browse/SPARK-27215 > Project: Spark > Issue Type: Bug > Components: Spark Core >Affects Versions: 3.0.0 >Reporter: Lantao Jin >Assignee: Lantao Jin >Priority: Major > Fix For: 3.0.0 > > > {code} > val KRYO_USE_UNSAFE = ConfigBuilder("spark.kyro.unsafe") > .booleanConf > .createWithDefault(false) > val KRYO_USE_POOL = ConfigBuilder("spark.kyro.pool") > .booleanConf > .createWithDefault(true) > {code} > kyro should be kryo -- This message was sent by Atlassian JIRA (v7.6.3#76005) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Assigned] (SPARK-27215) Correct the kryo configurations
[ https://issues.apache.org/jira/browse/SPARK-27215?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Marcelo Vanzin reassigned SPARK-27215: -- Assignee: Lantao Jin > Correct the kryo configurations > --- > > Key: SPARK-27215 > URL: https://issues.apache.org/jira/browse/SPARK-27215 > Project: Spark > Issue Type: Bug > Components: Spark Core >Affects Versions: 3.0.0 >Reporter: Lantao Jin >Assignee: Lantao Jin >Priority: Major > > {code} > val KRYO_USE_UNSAFE = ConfigBuilder("spark.kyro.unsafe") > .booleanConf > .createWithDefault(false) > val KRYO_USE_POOL = ConfigBuilder("spark.kyro.pool") > .booleanConf > .createWithDefault(true) > {code} > kyro should be kryo -- This message was sent by Atlassian JIRA (v7.6.3#76005) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Updated] (SPARK-27223) Remove private methods that skip conversion when passing user schemas for constructing a DataFrame
[ https://issues.apache.org/jira/browse/SPARK-27223?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Maryann Xue updated SPARK-27223: Summary: Remove private methods that skip conversion when passing user schemas for constructing a DataFrame (was: Remove private methods that allow no conversion when passing user schemas for constructing a DataFrame) > Remove private methods that skip conversion when passing user schemas for > constructing a DataFrame > -- > > Key: SPARK-27223 > URL: https://issues.apache.org/jira/browse/SPARK-27223 > Project: Spark > Issue Type: Bug > Components: SQL >Affects Versions: 2.4.0 >Reporter: Maryann Xue >Priority: Minor > > When passing in a user schema to create a DataFrame, there might be > mismatched nullability between the user schema and the actual data. All > related public interfaces now perform catalyst conversion using the user-provided > schema, which catches such mismatches to avoid runtime errors later > on. However, there are private methods which allow this conversion to be > skipped, so we need to remove these private methods, which may lead to > confusion and potential issues. -- This message was sent by Atlassian JIRA (v7.6.3#76005) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Created] (SPARK-27223) Remove private methods that allow no conversion when passing user schemas for constructing a DataFrame
Maryann Xue created SPARK-27223: --- Summary: Remove private methods that allow no conversion when passing user schemas for constructing a DataFrame Key: SPARK-27223 URL: https://issues.apache.org/jira/browse/SPARK-27223 Project: Spark Issue Type: Bug Components: SQL Affects Versions: 2.4.0 Reporter: Maryann Xue When passing in a user schema to create a DataFrame, there might be mismatched nullability between the user schema and the actual data. All related public interfaces now perform catalyst conversion using the user-provided schema, which catches such mismatches to avoid runtime errors later on. However, there are private methods which allow this conversion to be skipped, so we need to remove these private methods, which may lead to confusion and potential issues. -- This message was sent by Atlassian JIRA (v7.6.3#76005) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Created] (SPARK-27222) Support Instant and LocalDate in Literal.apply
Maxim Gekk created SPARK-27222: -- Summary: Support Instant and LocalDate in Literal.apply Key: SPARK-27222 URL: https://issues.apache.org/jira/browse/SPARK-27222 Project: Spark Issue Type: Improvement Components: SQL Affects Versions: 2.4.0 Reporter: Maxim Gekk SPARK-26902 and SPARK-27008 added support for java.time.Instant and java.time.LocalDate as external types for TimestampType and DateType. This ticket aims to support literals of such types. In particular, we need to extend Literal.apply by adding new cases for java.time.Instant/LocalDate. -- This message was sent by Atlassian JIRA (v7.6.3#76005) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
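A side note on what such literal support entails: Spark stores TimestampType values internally as microseconds since the epoch and DateType values as days since the epoch, so the new Literal.apply cases amount to conversions like the sketch below (our own illustration; the class and helper names are hypothetical, not Spark's):

```java
import java.time.Instant;
import java.time.LocalDate;

public class TimeLiterals {
    // TimestampType's internal value: microseconds since the epoch.
    static long instantToMicros(Instant i) {
        return Math.addExact(Math.multiplyExact(i.getEpochSecond(), 1_000_000L),
                             i.getNano() / 1_000L);
    }

    // DateType's internal value: days since the epoch.
    static int localDateToDays(LocalDate d) {
        return (int) d.toEpochDay();
    }

    public static void main(String[] args) {
        System.out.println(instantToMicros(Instant.parse("2019-03-20T09:01:03Z")));
        System.out.println(localDateToDays(LocalDate.of(2019, 3, 20)));
    }
}
```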
[jira] [Resolved] (SPARK-27160) Incorrect Literal Casting of DecimalType in OrcFilters
[ https://issues.apache.org/jira/browse/SPARK-27160?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Dongjoon Hyun resolved SPARK-27160. --- Resolution: Fixed Assignee: Darcy Shen Fix Version/s: 3.0.0 > Incorrect Literal Casting of DecimalType in OrcFilters > -- > > Key: SPARK-27160 > URL: https://issues.apache.org/jira/browse/SPARK-27160 > Project: Spark > Issue Type: Bug > Components: SQL >Affects Versions: 2.4.0 >Reporter: Darcy Shen >Assignee: Darcy Shen >Priority: Major > Labels: correctness > Fix For: 3.0.0 > > > A DecimalType Literal should not be cast to Long. > e.g. for `df.filter("x < 3.14")`, assuming df (x in DecimalType) reads from an > ORC table and uses the native ORC reader with predicate push-down enabled, we > will push down the `x < 3.14` predicate to the ORC reader via a > SearchArgument. > OrcFilters will construct the SearchArgument, but it does not handle the DecimalType > correctly. > The previous implementation would construct `x < 3` from `x < 3.14`. -- This message was sent by Atlassian JIRA (v7.6.3#76005) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
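To see why the cast described above is a correctness bug, note that truncating a decimal literal to a long silently moves the predicate's boundary. A minimal sketch of the truncation (ours, not from the issue):

```java
import java.math.BigDecimal;

public class DecimalLiteralCast {
    public static void main(String[] args) {
        BigDecimal literal = new BigDecimal("3.14");
        // longValue() truncates the fraction, so a pushed-down predicate
        // built from it becomes x < 3 instead of x < 3.14: rows with x in
        // [3, 3.14) satisfy the original filter but are dropped by the reader.
        System.out.println(literal.longValue()); // prints 3
    }
}
```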
[jira] [Commented] (SPARK-27220) Remove Yarn specific leftover from CoarseGrainedSchedulerBackend
[ https://issues.apache.org/jira/browse/SPARK-27220?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16797460#comment-16797460 ] Sean Owen commented on SPARK-27220: --- Probably [~irashid] would know better > Remove Yarn specific leftover from CoarseGrainedSchedulerBackend > > > Key: SPARK-27220 > URL: https://issues.apache.org/jira/browse/SPARK-27220 > Project: Spark > Issue Type: Task > Components: Spark Core, YARN >Affects Versions: 2.0.2, 2.1.3, 2.2.3, 2.3.3, 2.4.0 >Reporter: Jacek Lewandowski >Priority: Minor > > {{CoarseGrainedSchedulerBackend}} has the following field: > {code:scala} > // The num of current max ExecutorId used to re-register appMaster > @volatile protected var currentExecutorIdCounter = 0 > {code} > which is then updated: > {code:scala} > case RegisterExecutor(executorId, executorRef, hostname, cores, > logUrls) => > ... > // This must be synchronized because variables mutated > // in this block are read when requesting executors > CoarseGrainedSchedulerBackend.this.synchronized { > executorDataMap.put(executorId, data) > if (currentExecutorIdCounter < executorId.toInt) { > currentExecutorIdCounter = executorId.toInt > } > ... > {code} > However it is never really used in {{CoarseGrainedSchedulerBackend}}. Its > only usage is in Yarn-specific code. It should be moved to Yarn then because > {{executorId}} is a {{String}} and there are really no guarantees that it is > always an integer. It was introduced in SPARK-12864 -- This message was sent by Atlassian JIRA (v7.6.3#76005) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Resolved] (SPARK-27094) Thread interrupt being swallowed while launching executors in YarnAllocator
[ https://issues.apache.org/jira/browse/SPARK-27094?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Marcelo Vanzin resolved SPARK-27094. Resolution: Fixed Fix Version/s: 3.0.0 Issue resolved by pull request 24017 [https://github.com/apache/spark/pull/24017] > Thread interrupt being swallowed while launching executors in YarnAllocator > --- > > Key: SPARK-27094 > URL: https://issues.apache.org/jira/browse/SPARK-27094 > Project: Spark > Issue Type: Bug > Components: YARN >Affects Versions: 2.4.0 >Reporter: Marcelo Vanzin >Assignee: Marcelo Vanzin >Priority: Minor > Fix For: 3.0.0 > > > When shutting down a SparkContext, the YarnAllocator thread is interrupted. > If the interrupt happens just at the wrong time, you'll see something like > this: > {noformat} > 19/03/05 07:04:20 WARN ScriptBasedMapping: Exception running blah > java.io.IOException: java.lang.InterruptedException > at org.apache.hadoop.util.Shell.runCommand(Shell.java:578) > at org.apache.hadoop.util.Shell.run(Shell.java:478) > at > org.apache.hadoop.util.Shell$ShellCommandExecutor.execute(Shell.java:766) > at > org.apache.hadoop.net.ScriptBasedMapping$RawScriptBasedMapping.runResolveCommand(ScriptBasedMapping.java:251) > at > org.apache.hadoop.net.ScriptBasedMapping$RawScriptBasedMapping.resolve(ScriptBasedMapping.java:188) > at > org.apache.hadoop.net.CachedDNSToSwitchMapping.resolve(CachedDNSToSwitchMapping.java:119) > at > org.apache.hadoop.yarn.util.RackResolver.coreResolve(RackResolver.java:101) > at > org.apache.hadoop.yarn.util.RackResolver.resolve(RackResolver.java:81) > at > org.apache.spark.deploy.yarn.SparkRackResolver.resolve(SparkRackResolver.scala:37) > at > org.apache.spark.deploy.yarn.YarnAllocator$$anonfun$handleAllocatedContainers$2.apply(YarnAllocator.scala:431) > at > org.apache.spark.deploy.yarn.YarnAllocator$$anonfun$handleAllocatedContainers$2.apply(YarnAllocator.scala:430) > at > scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59) > 
at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48) > at > org.apache.spark.deploy.yarn.YarnAllocator.handleAllocatedContainers(YarnAllocator.scala:430) > at > org.apache.spark.deploy.yarn.YarnAllocator.allocateResources(YarnAllocator.scala:281) > at > org.apache.spark.deploy.yarn.ApplicationMaster$$anon$3.run(ApplicationMaster.scala:556) > {noformat} > That means the YARN code being called ({{RackResolver}}) is swallowing the > interrupt, so the Spark allocator thread never exits. In this particular > app, the allocator was in the middle of allocating a very large number of > executors, so it seemed like the application was hung, and there were a lot > of executors coming up even though the context was being shut down. -- This message was sent by Atlassian JIRA (v7.6.3#76005) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
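The swallowing described above follows from a general JVM rule: catching InterruptedException clears the thread's interrupt status, so a callee that wraps the exception in another type (as the Hadoop Shell call in the stack trace does, turning it into an IOException) hides the signal from its caller. A minimal sketch of the mechanism (class and variable names are ours):

```java
public class InterruptSwallowDemo {
    public static void main(String[] args) throws Exception {
        Thread worker = new Thread(() -> {
            try {
                Thread.sleep(10_000);
            } catch (InterruptedException e) {
                // The catch has already cleared the interrupt flag; wrapping
                // e in another exception here would leave a polling caller
                // with no way to notice the interrupt.
                System.out.println("flag after catch: "
                        + Thread.currentThread().isInterrupted()); // false
                // Conventional fix: restore the flag before wrapping.
                Thread.currentThread().interrupt();
            }
        });
        worker.start();
        worker.interrupt();
        worker.join();
    }
}
```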
[jira] [Assigned] (SPARK-27094) Thread interrupt being swallowed while launching executors in YarnAllocator
[ https://issues.apache.org/jira/browse/SPARK-27094?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Marcelo Vanzin reassigned SPARK-27094: -- Assignee: Marcelo Vanzin > Thread interrupt being swallowed while launching executors in YarnAllocator > --- > > Key: SPARK-27094 > URL: https://issues.apache.org/jira/browse/SPARK-27094 > Project: Spark > Issue Type: Bug > Components: YARN >Affects Versions: 2.4.0 >Reporter: Marcelo Vanzin >Assignee: Marcelo Vanzin >Priority: Minor > > When shutting down a SparkContext, the YarnAllocator thread is interrupted. > If the interrupt happens just at the wrong time, you'll see something like > this: > {noformat} > 19/03/05 07:04:20 WARN ScriptBasedMapping: Exception running blah > java.io.IOException: java.lang.InterruptedException > at org.apache.hadoop.util.Shell.runCommand(Shell.java:578) > at org.apache.hadoop.util.Shell.run(Shell.java:478) > at > org.apache.hadoop.util.Shell$ShellCommandExecutor.execute(Shell.java:766) > at > org.apache.hadoop.net.ScriptBasedMapping$RawScriptBasedMapping.runResolveCommand(ScriptBasedMapping.java:251) > at > org.apache.hadoop.net.ScriptBasedMapping$RawScriptBasedMapping.resolve(ScriptBasedMapping.java:188) > at > org.apache.hadoop.net.CachedDNSToSwitchMapping.resolve(CachedDNSToSwitchMapping.java:119) > at > org.apache.hadoop.yarn.util.RackResolver.coreResolve(RackResolver.java:101) > at > org.apache.hadoop.yarn.util.RackResolver.resolve(RackResolver.java:81) > at > org.apache.spark.deploy.yarn.SparkRackResolver.resolve(SparkRackResolver.scala:37) > at > org.apache.spark.deploy.yarn.YarnAllocator$$anonfun$handleAllocatedContainers$2.apply(YarnAllocator.scala:431) > at > org.apache.spark.deploy.yarn.YarnAllocator$$anonfun$handleAllocatedContainers$2.apply(YarnAllocator.scala:430) > at > scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59) > at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48) > at > 
org.apache.spark.deploy.yarn.YarnAllocator.handleAllocatedContainers(YarnAllocator.scala:430) > at > org.apache.spark.deploy.yarn.YarnAllocator.allocateResources(YarnAllocator.scala:281) > at > org.apache.spark.deploy.yarn.ApplicationMaster$$anon$3.run(ApplicationMaster.scala:556) > {noformat} > That means the YARN code being called ({{RackResolver}}) is swallowing the > interrupt, so the Spark allocator thread never exits. In this particular > app, the allocator was in the middle of allocating a very large number of > executors, so it seemed like the application was hung, and there were a lot > of executors coming up even though the context was being shut down. -- This message was sent by Atlassian JIRA (v7.6.3#76005) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
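The interrupt-swallowing described above can be illustrated with a small sketch. Everything here is hypothetical: `resolveRack` merely stands in for the Hadoop `RackResolver` call in the stack trace, which wraps `InterruptedException` in another exception. The point is the caller-side idiom of restoring the thread's interrupt status so a loop like the allocator's can still notice the shutdown:

```java
// Hedged sketch, not Spark's actual code: restoring a swallowed interrupt.
public class InterruptRestoreSketch {

    // Simulates library code that swallows the interrupt by wrapping
    // InterruptedException in an unchecked exception (similar to how
    // Hadoop's Shell wraps it in an IOException).
    static String resolveRack(String host) {
        if (Thread.interrupted()) {          // note: this CLEARS the flag
            throw new RuntimeException(new InterruptedException());
        }
        return "/default-rack";
    }

    // Caller-side defense: if the failure was really an interrupt,
    // restore the thread's interrupt status before continuing.
    static String resolveRackSafely(String host) {
        try {
            return resolveRack(host);
        } catch (RuntimeException e) {
            if (e.getCause() instanceof InterruptedException) {
                Thread.currentThread().interrupt();  // re-set the flag
            }
            return "/unknown-rack";
        }
    }

    public static void main(String[] args) {
        Thread.currentThread().interrupt();
        resolveRackSafely("host1");
        // The interrupt status survives the call, so a loop checking
        // Thread.currentThread().isInterrupted() can exit cleanly.
        System.out.println(Thread.currentThread().isInterrupted());
    }
}
```

With this idiom, even though the library cleared the flag, the allocating loop still observes the interrupt and can stop launching executors.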
[jira] [Commented] (SPARK-27219) Misleading exceptions in transport code's SASL fallback path
[ https://issues.apache.org/jira/browse/SPARK-27219?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16797435#comment-16797435 ] Ajith S commented on SPARK-27219: - So do we just log a simple one-line warning and print the stack trace at a finer (DEBUG, TRACE) log level? > Misleading exceptions in transport code's SASL fallback path > > > Key: SPARK-27219 > URL: https://issues.apache.org/jira/browse/SPARK-27219 > Project: Spark > Issue Type: Improvement > Components: Spark Core >Affects Versions: 2.4.0 >Reporter: Marcelo Vanzin >Priority: Minor > > There are a couple of code paths in the SASL fallback handling that result in > misleading exceptions printed to logs. One of them is if a timeout occurs > during authentication; for example: > {noformat} > 19/03/15 11:21:37 WARN crypto.AuthClientBootstrap: New auth protocol failed, > trying SASL. > java.lang.RuntimeException: java.util.concurrent.TimeoutException: Timeout > waiting for task. > at > org.spark_project.guava.base.Throwables.propagate(Throwables.java:160) > at > org.apache.spark.network.client.TransportClient.sendRpcSync(TransportClient.java:258) > at > org.apache.spark.network.crypto.AuthClientBootstrap.doSparkAuth(AuthClientBootstrap.java:105) > at > org.apache.spark.network.crypto.AuthClientBootstrap.doBootstrap(AuthClientBootstrap.java:79) > at > org.apache.spark.network.client.TransportClientFactory.createClient(TransportClientFactory.java:262) > at > org.apache.spark.network.client.TransportClientFactory.createClient(TransportClientFactory.java:192) > at > org.apache.spark.network.shuffle.ExternalShuffleClient.lambda$fetchBlocks$0(ExternalShuffleClient.java:100) > at > org.apache.spark.network.shuffle.RetryingBlockFetcher.fetchAllOutstanding(RetryingBlockFetcher.java:141) > ... > Caused by: java.util.concurrent.TimeoutException: Timeout waiting for task. 
> at > org.spark_project.guava.util.concurrent.AbstractFuture$Sync.get(AbstractFuture.java:276) > at > org.spark_project.guava.util.concurrent.AbstractFuture.get(AbstractFuture.java:96) > at > org.apache.spark.network.client.TransportClient.sendRpcSync(TransportClient.java:254) > ... 38 more > 19/03/15 11:21:38 WARN server.TransportChannelHandler: Exception in > connection from vc1033.halxg.cloudera.com/10.17.216.43:7337 > java.lang.IllegalArgumentException: Frame length should be positive: > -3702202170875367528 > at > org.spark_project.guava.base.Preconditions.checkArgument(Preconditions.java:119) > {noformat} > The IllegalArgumentException shouldn't happen, it only happens because the > code is ignoring the time out and retrying, at which point the remote side is > in a different state and thus doesn't expect the message. > The same line that prints that exception can result in a noisy log message > when the remote side (e.g. an old shuffle service) does not understand the > new auth protocol. Since it's a warning it seems like something is wrong, > when it's just doing what's expected. -- This message was sent by Atlassian JIRA (v7.6.3#76005) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
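The logging change suggested in the comment above — a single-line WARN for the expected fallback, with the full stack trace only at a finer level — can be sketched as follows. The tiny `Log` interface mirrors SLF4J-style loggers but is purely illustrative, not Spark's actual logging API:

```java
import java.util.ArrayList;
import java.util.List;

// Hedged sketch of the proposed logging pattern, not Spark's code.
public class FallbackLogSketch {
    interface Log {
        boolean isDebugEnabled();
        void warn(String msg);
        void debug(String msg, Throwable t);
    }

    static void logFallback(Log log, Throwable cause) {
        if (log.isDebugEnabled()) {
            // Finer level: keep the full exception for troubleshooting.
            log.debug("New auth protocol failed, trying SASL.", cause);
        } else {
            // One line, no stack trace: the fallback is expected behavior,
            // so a full trace at WARN only alarms operators.
            log.warn("New auth protocol failed, trying SASL: " + cause);
        }
    }

    public static void main(String[] args) {
        List<String> lines = new ArrayList<>();
        Log log = new Log() {
            public boolean isDebugEnabled() { return false; }
            public void warn(String msg) { lines.add("WARN " + msg); }
            public void debug(String msg, Throwable t) { lines.add("DEBUG " + msg); }
        };
        logFallback(log, new RuntimeException("Timeout waiting for task."));
        System.out.println(lines.get(0));
    }
}
```

At the default level the operator sees one WARN line naming the cause; enabling DEBUG restores the full trace.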
[jira] [Commented] (SPARK-27220) Remove Yarn specific leftover from CoarseGrainedSchedulerBackend
[ https://issues.apache.org/jira/browse/SPARK-27220?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16797428#comment-16797428 ] Ajith S commented on SPARK-27220: - # About making *currentExecutorIdCounter*'s datatype consistent: yes, *currentExecutorIdCounter* is an int in *CoarseGrainedSchedulerBackend*, but *RegisterExecutor* carries the executorId as a String, which makes it confusing. ** Also, *CoarseGrainedExecutorBackend* fires *RegisterExecutor* with executorId as a String in the case of YARN and Mesos. # About moving *currentExecutorIdCounter* out of *CoarseGrainedSchedulerBackend*: I am unsure about this, as *CoarseGrainedSchedulerBackend* just offers a mechanism to maintain executor ids which YARN reuses (but I see Mesos ignores it completely and instead uses mesosTaskId, so it makes sense to move *currentExecutorIdCounter* out to YARN). cc [~srowen] [~dongjoon] [~hyukjin.kwon] any thoughts? > Remove Yarn specific leftover from CoarseGrainedSchedulerBackend > > > Key: SPARK-27220 > URL: https://issues.apache.org/jira/browse/SPARK-27220 > Project: Spark > Issue Type: Task > Components: Spark Core, YARN >Affects Versions: 2.0.2, 2.1.3, 2.2.3, 2.3.3, 2.4.0 >Reporter: Jacek Lewandowski >Priority: Minor > > {{CoarseGrainedSchedulerBackend}} has the following field: > {code:scala} > // The num of current max ExecutorId used to re-register appMaster > @volatile protected var currentExecutorIdCounter = 0 > {code} > which is then updated: > {code:scala} > case RegisterExecutor(executorId, executorRef, hostname, cores, > logUrls) => > ... > // This must be synchronized because variables mutated > // in this block are read when requesting executors > CoarseGrainedSchedulerBackend.this.synchronized { > executorDataMap.put(executorId, data) > if (currentExecutorIdCounter < executorId.toInt) { > currentExecutorIdCounter = executorId.toInt > } > ... > {code} > However it is never really used in {{CoarseGrainedSchedulerBackend}}. 
Its > only usage is in Yarn-specific code. It should be moved to Yarn then because > {{executorId}} is a {{String}} and there are really no guarantees that it is > always an integer. It was introduced in SPARK-12864
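The String-vs-int mismatch discussed above can be sketched minimally. Since executorId carries no guarantee of being numeric (Mesos ids, for example, are not), any counter update should tolerate non-numeric ids instead of failing on `executorId.toInt`. The method name below is hypothetical, not the actual Spark API:

```java
// Hedged sketch: advance a numeric counter only when the id parses.
public class ExecutorIdCounterSketch {
    static int updateCounter(int current, String executorId) {
        try {
            int id = Integer.parseInt(executorId);
            return Math.max(current, id);
        } catch (NumberFormatException e) {
            // Non-numeric id (e.g. a Mesos task id): leave the counter
            // unchanged instead of throwing, as executorId.toInt would.
            return current;
        }
    }

    public static void main(String[] args) {
        System.out.println(updateCounter(3, "7"));        // numeric id advances the counter
        System.out.println(updateCounter(3, "mesos-42")); // non-numeric id is ignored
    }
}
```

Moving the counter into the YARN module, as the ticket proposes, would make this parsing concern local to the one backend that actually assumes integer ids.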
[jira] [Updated] (SPARK-27221) Improve the assert error message in TreeNode.parseToJson
[ https://issues.apache.org/jira/browse/SPARK-27221?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Shixiong Zhu updated SPARK-27221: - Summary: Improve the assert error message in TreeNode.parseToJson (was: Improve the assert error message in TreeNode) > Improve the assert error message in TreeNode.parseToJson > > > Key: SPARK-27221 > URL: https://issues.apache.org/jira/browse/SPARK-27221 > Project: Spark > Issue Type: Improvement > Components: SQL >Affects Versions: 2.4.0 >Reporter: Shixiong Zhu >Assignee: Shixiong Zhu >Priority: Minor > > TreeNode.parseToJson may throw an assert error without any error message > when a TreeNode is not implemented properly, and it's hard to find the bad > TreeNode implementation. > It's better to improve the error message to indicate the type of TreeNode.
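The proposed improvement — an assertion failure whose message names the offending node type — can be sketched as follows. `TreeNode` and `parseToJson` are only referenced by analogy; `checkSupported` is a hypothetical stand-in, not Catalyst's actual code:

```java
// Hedged sketch: fail with a message identifying the node's class,
// rather than a bare assertion error with no message.
public class AssertMessageSketch {
    static void checkSupported(Object node, boolean supported) {
        if (!supported) {
            throw new AssertionError(
                "parseToJson cannot handle node of type: " + node.getClass().getName());
        }
    }

    public static void main(String[] args) {
        try {
            checkSupported("notATreeNode", false);
        } catch (AssertionError e) {
            // The message now points straight at the bad implementation.
            System.out.println(e.getMessage());
        }
    }
}
```

With the class name in the message, a failing test immediately identifies which node implementation needs fixing instead of forcing a debugger session.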
[jira] [Updated] (SPARK-27221) Improve the assert error message in TreeNode
[ https://issues.apache.org/jira/browse/SPARK-27221?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Shixiong Zhu updated SPARK-27221: - Description: TreeNode.parseToJson may throw an assert error without any error message when a TreeNode is not implemented properly, and it's hard to find the bad TreeNode implementation. It's better to improve the error message to indicate the type of TreeNode. > Improve the assert error message in TreeNode > > > Key: SPARK-27221 > URL: https://issues.apache.org/jira/browse/SPARK-27221 > Project: Spark > Issue Type: Improvement > Components: SQL >Affects Versions: 2.4.0 >Reporter: Shixiong Zhu >Assignee: Shixiong Zhu >Priority: Minor > > TreeNode.parseToJson may throw an assert error without any error message > when a TreeNode is not implemented properly, and it's hard to find the bad > TreeNode implementation. > It's better to improve the error message to indicate the type of TreeNode.
[jira] [Created] (SPARK-27221) Improve the assert error message in TreeNode
Shixiong Zhu created SPARK-27221: Summary: Improve the assert error message in TreeNode Key: SPARK-27221 URL: https://issues.apache.org/jira/browse/SPARK-27221 Project: Spark Issue Type: Improvement Components: SQL Affects Versions: 2.4.0 Reporter: Shixiong Zhu Assignee: Shixiong Zhu
[jira] [Updated] (SPARK-27194) Job failures when task attempts do not clean up spark-staging parquet files
[ https://issues.apache.org/jira/browse/SPARK-27194?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Reza Safi updated SPARK-27194: -- Issue Type: Bug (was: Improvement) > Job failures when task attempts do not clean up spark-staging parquet files > --- > > Key: SPARK-27194 > URL: https://issues.apache.org/jira/browse/SPARK-27194 > Project: Spark > Issue Type: Bug > Components: Spark Core, SQL >Affects Versions: 2.3.1, 2.3.2 >Reporter: Reza Safi >Priority: Major > > When a container fails for some reason (for example when killed by yarn for > exceeding memory limits), the subsequent task attempts for the tasks that > were running on that container all fail with a FileAlreadyExistsException. > The original task attempt does not seem to successfully call abortTask (or at > least its "best effort" delete is unsuccessful) and clean up the parquet file > it was writing to, so when later task attempts try to write to the same > spark-staging directory using the same file name, the job fails. > Here is what transpires in the logs: > The container where task 200.0 is running is killed and the task is lost: > {code} > 19/02/20 09:33:25 ERROR cluster.YarnClusterScheduler: Lost executor y on > t.y.z.com: Container killed by YARN for exceeding memory limits. 8.1 GB of 8 > GB physical memory used. Consider boosting spark.yarn.executor.memoryOverhead. > 19/02/20 09:33:25 WARN scheduler.TaskSetManager: Lost task 200.0 in stage > 0.0 (TID xxx, t.y.z.com, executor 93): ExecutorLostFailure (executor 93 > exited caused by one of the running tasks) Reason: Container killed by YARN > for exceeding memory limits. 8.1 GB of 8 GB physical memory used. Consider > boosting spark.yarn.executor.memoryOverhead. 
> {code} > The task is re-attempted on a different executor and fails because the > part-00200-blah-blah.c000.snappy.parquet file from the first task attempt > already exists: > {code} > 19/02/20 09:35:01 WARN scheduler.TaskSetManager: Lost task 200.1 in stage 0.0 > (TID 594, tn.y.z.com, executor 70): org.apache.spark.SparkException: Task > failed while writing rows. > at > org.apache.spark.sql.execution.datasources.FileFormatWriter$.org$apache$spark$sql$execution$datasources$FileFormatWriter$$executeTask(FileFormatWriter.scala:285) > at > org.apache.spark.sql.execution.datasources.FileFormatWriter$$anonfun$write$1.apply(FileFormatWriter.scala:197) > at > org.apache.spark.sql.execution.datasources.FileFormatWriter$$anonfun$write$1.apply(FileFormatWriter.scala:196) > at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87) > at org.apache.spark.scheduler.Task.run(Task.scala:109) > at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:345) > at > java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) > at > java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) > at java.lang.Thread.run(Thread.java:745) > Caused by: org.apache.hadoop.fs.FileAlreadyExistsException: > /user/hive/warehouse/tmp_supply_feb1/.spark-staging-blah-blah-blah/dt=2019-02-17/part-00200-blah-blah.c000.snappy.parquet > for client a.b.c.d already exists > {code} > The job fails when the configured task attempts (spark.task.maxFailures) > have failed with the same error: > {code} > org.apache.spark.SparkException: Job aborted due to stage failure: Task 200 > in stage 0.0 failed 20 times, most recent failure: Lost task 284.19 in stage > 0.0 (TID yyy, tm.y.z.com, executor 16): org.apache.spark.SparkException: Task > failed while writing rows. > at > org.apache.spark.sql.execution.datasources.FileFormatWriter$.org$apache$spark$sql$execution$datasources$FileFormatWriter$$executeTask(FileFormatWriter.scala:285) > ... 
> Caused by: org.apache.hadoop.fs.FileAlreadyExistsException: > /user/hive/warehouse/tmp_supply_feb1/.spark-staging-blah-blah-blah/dt=2019-02-17/part-00200-blah-blah.c000.snappy.parquet > for client i.p.a.d already exists > {code} > SPARK-26682 wasn't the root cause here, since there wasn't any stage > reattempt. > This issue seems to happen when > spark.sql.sources.partitionOverwriteMode=dynamic. >
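One plausible mitigation for the collision described in this ticket, sketched under stated assumptions: make the staged file name unique per task attempt, so a retry can never race the leftovers of a killed attempt. The name format below is illustrative only; it is not what Spark actually writes:

```java
// Hedged sketch: attempt-unique staging file names avoid
// FileAlreadyExistsException when a task is retried after its
// container was killed before abortTask could clean up.
public class StagingNameSketch {
    static String stagingFile(int partition, int attempt, String jobId) {
        // Format (illustrative): part-<partition>-<jobId>-attempt-<n>.snappy.parquet
        return String.format("part-%05d-%s-attempt-%d.snappy.parquet",
                partition, jobId, attempt);
    }

    public static void main(String[] args) {
        String first = stagingFile(200, 0, "blah");
        String retry = stagingFile(200, 1, "blah");
        // Distinct names: the retry cannot collide with the dead
        // attempt's half-written file.
        System.out.println(!first.equals(retry));
    }
}
```

The trade-off is that the commit step must then rename or select the winning attempt's file, which is why a robust abortTask cleanup is the other half of any real fix.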
[jira] [Commented] (SPARK-27112) Spark Scheduler encounters two independent Deadlocks when trying to kill executors either due to dynamic allocation or blacklisting
[ https://issues.apache.org/jira/browse/SPARK-27112?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16797321#comment-16797321 ] Imran Rashid commented on SPARK-27112: -- [~Dhruve Ashar] -- rc8 is already defined, there is nothing I (or anybody else) can do to change that. I simply updated the jira to reflect that. However, you might request that rc8 does not become 2.4.1, and instead we roll an rc9 with this fix. You should respond to the VOTE thread for rc8 on the dev list with your concerns, that's the right forum for this (thanks for the reminder btw, I will mention it there as well). > Spark Scheduler encounters two independent Deadlocks when trying to kill > executors either due to dynamic allocation or blacklisting > > > Key: SPARK-27112 > URL: https://issues.apache.org/jira/browse/SPARK-27112 > Project: Spark > Issue Type: Bug > Components: Scheduler, Spark Core >Affects Versions: 2.4.0, 3.0.0 >Reporter: Parth Gandhi >Assignee: Parth Gandhi >Priority: Major > Fix For: 2.3.4, 2.4.2, 3.0.0 > > Attachments: Screen Shot 2019-02-26 at 4.10.26 PM.png, Screen Shot > 2019-02-26 at 4.10.48 PM.png, Screen Shot 2019-02-26 at 4.11.11 PM.png, > Screen Shot 2019-02-26 at 4.11.26 PM.png > > > Recently, a few spark users in the organization have reported that their jobs > were getting stuck. On further analysis, it was found out that there exist > two independent deadlocks and either of them occur under different > circumstances. The screenshots for these two deadlocks are attached here. 
> We were able to reproduce the deadlocks with the following piece of code: > > {code:java} > import org.apache.hadoop.conf.Configuration > import org.apache.hadoop.fs.{FileSystem, Path} > import org.apache.spark._ > import org.apache.spark.TaskContext > // Simple example of Word Count in Scala > object ScalaWordCount { > def main(args: Array[String]) { > if (args.length < 2) { > System.err.println("Usage: ScalaWordCount ") > System.exit(1) > } > val conf = new SparkConf().setAppName("Scala Word Count") > val sc = new SparkContext(conf) > // get the input file uri > val inputFilesUri = args(0) > // get the output file uri > val outputFilesUri = args(1) > while (true) { > val textFile = sc.textFile(inputFilesUri) > val counts = textFile.flatMap(line => line.split(" ")) > .map(word => {if (TaskContext.get.partitionId == 5 && > TaskContext.get.attemptNumber == 0) throw new Exception("Fail for > blacklisting") else (word, 1)}) > .reduceByKey(_ + _) > counts.saveAsTextFile(outputFilesUri) > val conf: Configuration = new Configuration() > val path: Path = new Path(outputFilesUri) > val hdfs: FileSystem = FileSystem.get(conf) > hdfs.delete(path, true) > } > sc.stop() > } > } > {code} > > Additionally, to ensure that the deadlock surfaces up soon enough, I also > added a small delay in the Spark code here: > [https://github.com/apache/spark/blob/master/core/src/main/scala/org/apache/spark/scheduler/BlacklistTracker.scala#L256] > > {code:java} > executorIdToFailureList.remove(exec) > updateNextExpiryTime() > Thread.sleep(2000) > killBlacklistedExecutor(exec) > {code} > > Also make sure that the following configs are set when launching the above > spark job: > *spark.blacklist.enabled=true* > *spark.blacklist.killBlacklistedExecutors=true* > *spark.blacklist.application.maxFailedTasksPerExecutor=1* -- This message was sent by Atlassian JIRA (v7.6.3#76005) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: 
issues-h...@spark.apache.org
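Deadlocks of the shape this ticket describes typically arise when a thread holding a scheduler lock makes a blocking call (killing an executor) that needs a second lock held by another thread. One common remedy, sketched here with entirely hypothetical names rather than Spark's actual fix, is to hand the blocking call to a separate thread so the scheduler lock is released before the kill runs:

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;

// Hedged sketch of the lock-release-before-blocking-call pattern.
public class AsyncKillSketch {
    private static final ExecutorService killPool = Executors.newSingleThreadExecutor();
    static final AtomicBoolean killed = new AtomicBoolean(false);
    static final Object schedulerLock = new Object();

    static void onExecutorBlacklisted(String execId) {
        synchronized (schedulerLock) {
            // ... update blacklist bookkeeping under the lock ...
            // Do NOT make the blocking kill call here; schedule it instead.
            killPool.submit(() -> killExecutor(execId));
        } // lock is released before the kill actually runs
    }

    static void killExecutor(String execId) {
        killed.set(true);  // placeholder for the real blocking RPC
    }

    public static void main(String[] args) throws InterruptedException {
        onExecutorBlacklisted("exec-1");
        killPool.shutdown();
        killPool.awaitTermination(5, TimeUnit.SECONDS);
        System.out.println(killed.get());
    }
}
```

Because the kill now executes with no scheduler lock held, the second thread can acquire whatever it needs and neither side waits on the other.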
[jira] [Updated] (SPARK-27220) Remove Yarn specific leftover from CoarseGrainedSchedulerBackend
[ https://issues.apache.org/jira/browse/SPARK-27220?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Jacek Lewandowski updated SPARK-27220: -- Description: {{CoarseGrainedSchedulerBackend}} has the following field: {code:scala} // The num of current max ExecutorId used to re-register appMaster @volatile protected var currentExecutorIdCounter = 0 {code} which is then updated: {code:scala} case RegisterExecutor(executorId, executorRef, hostname, cores, logUrls) => ... // This must be synchronized because variables mutated // in this block are read when requesting executors CoarseGrainedSchedulerBackend.this.synchronized { executorDataMap.put(executorId, data) if (currentExecutorIdCounter < executorId.toInt) { currentExecutorIdCounter = executorId.toInt } ... {code} However it is never really used in {{CoarseGrainedSchedulerBackend}}. Its only usage is in Yarn-specific code. It should be moved to Yarn then because {{executorId}} is a {{String}} and there are really no guarantees that it is always an integer. It was introduced in SPARK-12864 was: {{CoarseGrainedSchedulerBackend}} has the following field: {code:scala} // The num of current max ExecutorId used to re-register appMaster @volatile protected var currentExecutorIdCounter = 0 {code} which is then updated: {code:scala} case RegisterExecutor(executorId, executorRef, hostname, cores, logUrls) => ... // This must be synchronized because variables mutated // in this block are read when requesting executors CoarseGrainedSchedulerBackend.this.synchronized { executorDataMap.put(executorId, data) if (currentExecutorIdCounter < executorId.toInt) { currentExecutorIdCounter = executorId.toInt } ... {code} However it is never really used in {{CoarseGrainedSchedulerBackend}}. Its only usage is in Yarn-specific code. It should be moved to Yarn then because {{executorId}} is a {{String}} and there are really no guarantees that it is always an integer. 
> Remove Yarn specific leftover from CoarseGrainedSchedulerBackend > > > Key: SPARK-27220 > URL: https://issues.apache.org/jira/browse/SPARK-27220 > Project: Spark > Issue Type: Task > Components: Spark Core, YARN >Affects Versions: 2.0.2, 2.1.3, 2.2.3, 2.3.3, 2.4.0 >Reporter: Jacek Lewandowski >Priority: Minor > > {{CoarseGrainedSchedulerBackend}} has the following field: > {code:scala} > // The num of current max ExecutorId used to re-register appMaster > @volatile protected var currentExecutorIdCounter = 0 > {code} > which is then updated: > {code:scala} > case RegisterExecutor(executorId, executorRef, hostname, cores, > logUrls) => > ... > // This must be synchronized because variables mutated > // in this block are read when requesting executors > CoarseGrainedSchedulerBackend.this.synchronized { > executorDataMap.put(executorId, data) > if (currentExecutorIdCounter < executorId.toInt) { > currentExecutorIdCounter = executorId.toInt > } > ... > {code} > However it is never really used in {{CoarseGrainedSchedulerBackend}}. Its > only usage is in Yarn-specific code. It should be moved to Yarn then because > {{executorId}} is a {{String}} and there are really no guarantees that it is > always an integer. It was introduced in SPARK-12864
[jira] [Created] (SPARK-27220) Remove Yarn specific leftover from CoarseGrainedSchedulerBackend
Jacek Lewandowski created SPARK-27220: - Summary: Remove Yarn specific leftover from CoarseGrainedSchedulerBackend Key: SPARK-27220 URL: https://issues.apache.org/jira/browse/SPARK-27220 Project: Spark Issue Type: Task Components: Spark Core, YARN Affects Versions: 2.4.0, 2.3.3, 2.2.3, 2.1.3, 2.0.2 Reporter: Jacek Lewandowski {{CoarseGrainedSchedulerBackend}} has the following field: {code:scala} // The num of current max ExecutorId used to re-register appMaster @volatile protected var currentExecutorIdCounter = 0 {code} which is then updated: {code:scala} case RegisterExecutor(executorId, executorRef, hostname, cores, logUrls) => ... // This must be synchronized because variables mutated // in this block are read when requesting executors CoarseGrainedSchedulerBackend.this.synchronized { executorDataMap.put(executorId, data) if (currentExecutorIdCounter < executorId.toInt) { currentExecutorIdCounter = executorId.toInt } ... {code} However it is never really used in {{CoarseGrainedSchedulerBackend}}. Its only usage is in Yarn-specific code. It should be moved to Yarn then because {{executorId}} is a {{String}} and there are really no guarantees that it is always an integer.
[jira] [Created] (SPARK-27219) Misleading exceptions in transport code's SASL fallback path
Marcelo Vanzin created SPARK-27219: -- Summary: Misleading exceptions in transport code's SASL fallback path Key: SPARK-27219 URL: https://issues.apache.org/jira/browse/SPARK-27219 Project: Spark Issue Type: Improvement Components: Spark Core Affects Versions: 2.4.0 Reporter: Marcelo Vanzin There are a couple of code paths in the SASL fallback handling that result in misleading exceptions printed to logs. One of them is if a timeout occurs during authentication; for example: {noformat} 19/03/15 11:21:37 WARN crypto.AuthClientBootstrap: New auth protocol failed, trying SASL. java.lang.RuntimeException: java.util.concurrent.TimeoutException: Timeout waiting for task. at org.spark_project.guava.base.Throwables.propagate(Throwables.java:160) at org.apache.spark.network.client.TransportClient.sendRpcSync(TransportClient.java:258) at org.apache.spark.network.crypto.AuthClientBootstrap.doSparkAuth(AuthClientBootstrap.java:105) at org.apache.spark.network.crypto.AuthClientBootstrap.doBootstrap(AuthClientBootstrap.java:79) at org.apache.spark.network.client.TransportClientFactory.createClient(TransportClientFactory.java:262) at org.apache.spark.network.client.TransportClientFactory.createClient(TransportClientFactory.java:192) at org.apache.spark.network.shuffle.ExternalShuffleClient.lambda$fetchBlocks$0(ExternalShuffleClient.java:100) at org.apache.spark.network.shuffle.RetryingBlockFetcher.fetchAllOutstanding(RetryingBlockFetcher.java:141) ... Caused by: java.util.concurrent.TimeoutException: Timeout waiting for task. at org.spark_project.guava.util.concurrent.AbstractFuture$Sync.get(AbstractFuture.java:276) at org.spark_project.guava.util.concurrent.AbstractFuture.get(AbstractFuture.java:96) at org.apache.spark.network.client.TransportClient.sendRpcSync(TransportClient.java:254) ... 
38 more 19/03/15 11:21:38 WARN server.TransportChannelHandler: Exception in connection from vc1033.halxg.cloudera.com/10.17.216.43:7337 java.lang.IllegalArgumentException: Frame length should be positive: -3702202170875367528 at org.spark_project.guava.base.Preconditions.checkArgument(Preconditions.java:119) {noformat} The IllegalArgumentException shouldn't happen, it only happens because the code is ignoring the time out and retrying, at which point the remote side is in a different state and thus doesn't expect the message. The same line that prints that exception can result in a noisy log message when the remote side (e.g. an old shuffle service) does not understand the new auth protocol. Since it's a warning it seems like something is wrong, when it's just doing what's expected.
[jira] [Commented] (SPARK-27112) Spark Scheduler encounters two independent Deadlocks when trying to kill executors either due to dynamic allocation or blacklisting
[ https://issues.apache.org/jira/browse/SPARK-27112?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16797253#comment-16797253 ] Dhruve Ashar commented on SPARK-27112: -- [~irashid] - I think this is a critical bug and since it is resolved we should include it in the rc8. > Spark Scheduler encounters two independent Deadlocks when trying to kill > executors either due to dynamic allocation or blacklisting > > > Key: SPARK-27112 > URL: https://issues.apache.org/jira/browse/SPARK-27112 > Project: Spark > Issue Type: Bug > Components: Scheduler, Spark Core >Affects Versions: 2.4.0, 3.0.0 >Reporter: Parth Gandhi >Assignee: Parth Gandhi >Priority: Major > Fix For: 2.3.4, 2.4.2, 3.0.0 > > Attachments: Screen Shot 2019-02-26 at 4.10.26 PM.png, Screen Shot > 2019-02-26 at 4.10.48 PM.png, Screen Shot 2019-02-26 at 4.11.11 PM.png, > Screen Shot 2019-02-26 at 4.11.26 PM.png > > > Recently, a few spark users in the organization have reported that their jobs > were getting stuck. On further analysis, it was found out that there exist > two independent deadlocks and either of them occur under different > circumstances. The screenshots for these two deadlocks are attached here. 
> We were able to reproduce the deadlocks with the following piece of code: > > {code:java} > import org.apache.hadoop.conf.Configuration > import org.apache.hadoop.fs.{FileSystem, Path} > import org.apache.spark._ > import org.apache.spark.TaskContext > // Simple example of Word Count in Scala > object ScalaWordCount { > def main(args: Array[String]) { > if (args.length < 2) { > System.err.println("Usage: ScalaWordCount ") > System.exit(1) > } > val conf = new SparkConf().setAppName("Scala Word Count") > val sc = new SparkContext(conf) > // get the input file uri > val inputFilesUri = args(0) > // get the output file uri > val outputFilesUri = args(1) > while (true) { > val textFile = sc.textFile(inputFilesUri) > val counts = textFile.flatMap(line => line.split(" ")) > .map(word => {if (TaskContext.get.partitionId == 5 && > TaskContext.get.attemptNumber == 0) throw new Exception("Fail for > blacklisting") else (word, 1)}) > .reduceByKey(_ + _) > counts.saveAsTextFile(outputFilesUri) > val conf: Configuration = new Configuration() > val path: Path = new Path(outputFilesUri) > val hdfs: FileSystem = FileSystem.get(conf) > hdfs.delete(path, true) > } > sc.stop() > } > } > {code} > > Additionally, to ensure that the deadlock surfaces up soon enough, I also > added a small delay in the Spark code here: > [https://github.com/apache/spark/blob/master/core/src/main/scala/org/apache/spark/scheduler/BlacklistTracker.scala#L256] > > {code:java} > executorIdToFailureList.remove(exec) > updateNextExpiryTime() > Thread.sleep(2000) > killBlacklistedExecutor(exec) > {code} > > Also make sure that the following configs are set when launching the above > spark job: > *spark.blacklist.enabled=true* > *spark.blacklist.killBlacklistedExecutors=true* > *spark.blacklist.application.maxFailedTasksPerExecutor=1* -- This message was sent by Atlassian JIRA (v7.6.3#76005) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: 
issues-h...@spark.apache.org
[jira] [Commented] (SPARK-27194) Job failures when task attempts do not clean up spark-staging parquet files
[ https://issues.apache.org/jira/browse/SPARK-27194?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16797241#comment-16797241 ] Ajith S commented on SPARK-27194: - Hi [~dongjoon], I have some analysis here: [https://github.com/apache/spark/pull/24142#issuecomment-474866759] Please let me know your views > Job failures when task attempts do not clean up spark-staging parquet files > --- > > Key: SPARK-27194 > URL: https://issues.apache.org/jira/browse/SPARK-27194 > Project: Spark > Issue Type: Improvement > Components: Spark Core, SQL >Affects Versions: 2.3.1, 2.3.2 >Reporter: Reza Safi >Priority: Major > > When a container fails for some reason (for example when killed by yarn for > exceeding memory limits), the subsequent task attempts for the tasks that > were running on that container all fail with a FileAlreadyExistsException. > The original task attempt does not seem to successfully call abortTask (or at > least its "best effort" delete is unsuccessful) and clean up the parquet file > it was writing to, so when later task attempts try to write to the same > spark-staging directory using the same file name, the job fails. > Here is what transpires in the logs: > The container where task 200.0 is running is killed and the task is lost: > {code} > 19/02/20 09:33:25 ERROR cluster.YarnClusterScheduler: Lost executor y on > t.y.z.com: Container killed by YARN for exceeding memory limits. 8.1 GB of 8 > GB physical memory used. Consider boosting spark.yarn.executor.memoryOverhead. > 19/02/20 09:33:25 WARN scheduler.TaskSetManager: Lost task 200.0 in stage > 0.0 (TID xxx, t.y.z.com, executor 93): ExecutorLostFailure (executor 93 > exited caused by one of the running tasks) Reason: Container killed by YARN > for exceeding memory limits. 8.1 GB of 8 GB physical memory used. Consider > boosting spark.yarn.executor.memoryOverhead. 
> {code} > The task is re-attempted on a different executor and fails because the > part-00200-blah-blah.c000.snappy.parquet file from the first task attempt > already exists: > {code} > 19/02/20 09:35:01 WARN scheduler.TaskSetManager: Lost task 200.1 in stage 0.0 > (TID 594, tn.y.z.com, executor 70): org.apache.spark.SparkException: Task > failed while writing rows. > at > org.apache.spark.sql.execution.datasources.FileFormatWriter$.org$apache$spark$sql$execution$datasources$FileFormatWriter$$executeTask(FileFormatWriter.scala:285) > at > org.apache.spark.sql.execution.datasources.FileFormatWriter$$anonfun$write$1.apply(FileFormatWriter.scala:197) > at > org.apache.spark.sql.execution.datasources.FileFormatWriter$$anonfun$write$1.apply(FileFormatWriter.scala:196) > at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87) > at org.apache.spark.scheduler.Task.run(Task.scala:109) > at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:345) > at > java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) > at > java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) > at java.lang.Thread.run(Thread.java:745) > Caused by: org.apache.hadoop.fs.FileAlreadyExistsException: > /user/hive/warehouse/tmp_supply_feb1/.spark-staging-blah-blah-blah/dt=2019-02-17/part-00200-blah-blah.c000.snappy.parquet > for client a.b.c.d already exists > {code} > The job fails when the configured task attempts (spark.task.maxFailures) > have failed with the same error: > {code} > org.apache.spark.SparkException: Job aborted due to stage failure: Task 200 > in stage 0.0 failed 20 times, most recent failure: Lost task 284.19 in stage > 0.0 (TID yyy, tm.y.z.com, executor 16): org.apache.spark.SparkException: Task > failed while writing rows. > at > org.apache.spark.sql.execution.datasources.FileFormatWriter$.org$apache$spark$sql$execution$datasources$FileFormatWriter$$executeTask(FileFormatWriter.scala:285) > ... 
> Caused by: org.apache.hadoop.fs.FileAlreadyExistsException: > /user/hive/warehouse/tmp_supply_feb1/.spark-staging-blah-blah-blah/dt=2019-02-17/part-00200-blah-blah.c000.snappy.parquet > for client i.p.a.d already exists > {code} > SPARK-26682 wasn't the root cause here, since there wasn't any stage > reattempt. > This issue seems to happen when > spark.sql.sources.partitionOverwriteMode=dynamic. > -- This message was sent by Atlassian JIRA (v7.6.3#76005) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
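The failure mode described above (a retried task attempt colliding with the file left behind by a killed attempt) hinges on the output file name being identical across attempts, combined with HDFS's exclusive create. Below is a minimal sketch of that collision in plain Python; the helper names are hypothetical, and Spark's real FileFormatWriter naming scheme is only approximated here.

```python
import os
import tempfile

def staging_file_name(partition_id, attempt_number=None):
    # Hypothetical scheme: real Spark names look like
    # part-00200-<uuid>.c000.snappy.parquet and do NOT vary per attempt,
    # which is why a retried attempt can collide with a leftover file.
    base = f"part-{partition_id:05d}"
    if attempt_number is not None:
        base += f"-attempt{attempt_number}"
    return base + ".snappy.parquet"

def write_attempt(staging_dir, partition_id, attempt_number, include_attempt):
    """Simulate one task attempt creating its output file exclusively."""
    name = staging_file_name(
        partition_id, attempt_number if include_attempt else None)
    path = os.path.join(staging_dir, name)
    # os.O_EXCL mimics HDFS create(): fail if the file already exists.
    fd = os.open(path, os.O_CREAT | os.O_EXCL | os.O_WRONLY)
    os.close(fd)
    return path

with tempfile.TemporaryDirectory() as d:
    # Attempt 0 writes its file, then the executor dies before abortTask
    # can clean up, so the file stays behind.
    write_attempt(d, 200, 0, include_attempt=False)
    try:
        write_attempt(d, 200, 1, include_attempt=False)
        collided = False
    except FileExistsError:
        collided = True  # same name as attempt 0: FileAlreadyExists
    # An attempt-unique name would let the retry succeed.
    retry_path = write_attempt(d, 200, 1, include_attempt=True)
```

The sketch only illustrates why an exclusive create fails on retry; whether the fix belongs in abortTask cleanup or in attempt-unique naming is exactly what the linked PR discussion is about.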
[jira] [Comment Edited] (SPARK-27194) Job failures when task attempts do not clean up spark-staging parquet files
[ https://issues.apache.org/jira/browse/SPARK-27194?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16797216#comment-16797216 ] Ajith S edited comment on SPARK-27194 at 3/20/19 2:38 PM: -- [~dongjoon] Yes, I tried with Spark 2.3.3, and the issue persists. Here are the operations I performed: {code:java} spark.sql.sources.partitionOverwriteMode=DYNAMIC{code} {code:java} create table t1 (i int, part1 int, part2 int) using parquet partitioned by (part1, part2) insert into t1 partition(part1=1, part2=1) select 1 insert overwrite table t1 partition(part1=1, part2=1) select 2 insert overwrite table t1 partition(part1=2, part2) select 2, 2 // here the exec is killed and task respawns{code} and here is the full stack trace as per 2.3.3: {code:java} 2019-03-20 19:58:06 WARN TaskSetManager:66 - Lost task 0.1 in stage 2.0 (TID 3, QWERTY, executor 2): org.apache.spark.SparkException: Task failed while writing rows. at org.apache.spark.sql.execution.datasources.FileFormatWriter$.org$apache$spark$sql$execution$datasources$FileFormatWriter$$executeTask(FileFormatWriter.scala:285) at org.apache.spark.sql.execution.datasources.FileFormatWriter$$anonfun$write$1.apply(FileFormatWriter.scala:197) at org.apache.spark.sql.execution.datasources.FileFormatWriter$$anonfun$write$1.apply(FileFormatWriter.scala:196) at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87) at org.apache.spark.scheduler.Task.run(Task.scala:109) at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:345) at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) at java.lang.Thread.run(Thread.java:745) Caused by: org.apache.hadoop.fs.FileAlreadyExistsException: /user/hive/warehouse/t2/.spark-staging-1f1efbfd-7e20-4e0f-a49c-a7fa3eae4cb1/part1=2/part2=2/part-0-1f1efbfd-7e20-4e0f-a49c-a7fa3eae4cb1.c000.snappy.parquet for client 127.0.0.1 already exists at 
org.apache.hadoop.hdfs.server.namenode.FSNamesystem.startFileInternal(FSNamesystem.java:2578) at org.apache.hadoop.hdfs.server.namenode.FSNamesystem.startFileInt(FSNamesystem.java:2465) at org.apache.hadoop.hdfs.server.namenode.FSNamesystem.startFile(FSNamesystem.java:2349) at org.apache.hadoop.hdfs.server.namenode.NameNodeRpcServer.create(NameNodeRpcServer.java:624) at org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolServerSideTranslatorPB.create(ClientNamenodeProtocolServerSideTranslatorPB.java:398) at org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos$ClientNamenodeProtocol$2.callBlockingMethod(ClientNamenodeProtocolProtos.java) at org.apache.hadoop.ipc.ProtobufRpcEngine$Server$ProtoBufRpcInvoker.call(ProtobufRpcEngine.java:616) at org.apache.hadoop.ipc.RPC$Server.call(RPC.java:982) at org.apache.hadoop.ipc.Server$Handler$1.run(Server.java:2217) at org.apache.hadoop.ipc.Server$Handler$1.run(Server.java:2213) at java.security.AccessController.doPrivileged(Native Method) at javax.security.auth.Subject.doAs(Subject.java:422) at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1746) at org.apache.hadoop.ipc.Server$Handler.run(Server.java:2213) at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method) at sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62) at sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45) at java.lang.reflect.Constructor.newInstance(Constructor.java:423) at org.apache.hadoop.ipc.RemoteException.instantiateException(RemoteException.java:106) at org.apache.hadoop.ipc.RemoteException.unwrapRemoteException(RemoteException.java:73) at org.apache.hadoop.hdfs.DFSOutputStream.newStreamForCreate(DFSOutputStream.java:1653) at org.apache.hadoop.hdfs.DFSClient.create(DFSClient.java:1689) at org.apache.hadoop.hdfs.DFSClient.create(DFSClient.java:1624) at 
org.apache.hadoop.hdfs.DistributedFileSystem$7.doCall(DistributedFileSystem.java:448) at org.apache.hadoop.hdfs.DistributedFileSystem$7.doCall(DistributedFileSystem.java:444) at org.apache.hadoop.fs.FileSystemLinkResolver.resolve(FileSystemLinkResolver.java:81) at org.apache.hadoop.hdfs.DistributedFileSystem.create(DistributedFileSystem.java:459) at org.apache.hadoop.hdfs.DistributedFileSystem.create(DistributedFileSystem.java:387) at org.apache.hadoop.fs.FileSystem.create(FileSystem.java:911) at org.apache.hadoop.fs.FileSystem.create(FileSystem.java:892) at org.apache.parquet.hadoop.ParquetFileWriter.(ParquetFileWriter.java:236) at org.apache.parquet.hadoop.ParquetOutputFormat.getRecordWriter(ParquetOutputFormat.java:342) at org.apache.parquet.hadoop.ParquetOutputFormat.getRecordWriter(ParquetOutputFormat.java:302) at org.apache.spark.sql.execution.datasources.parquet.ParquetOutputWriter.(ParquetOutputWriter.scala:37) at
[jira] [Commented] (SPARK-27194) Job failures when task attempts do not clean up spark-staging parquet files
[ https://issues.apache.org/jira/browse/SPARK-27194?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16797216#comment-16797216 ] Ajith S commented on SPARK-27194: - [~dongjoon] Yes, I tried with Spark 2.3.3, and the issue persists. Here are the operations I performed: {code:java} create table t1 (i int, part1 int, part2 int) using parquet partitioned by (part1, part2) insert into t1 partition(part1=1, part2=1) select 1 insert overwrite table t1 partition(part1=1, part2=1) select 2 insert overwrite table t1 partition(part1=2, part2) select 2, 2 // here the exec is killed and task respawns{code} and here is the full stack trace as per 2.3.3: {code:java} 2019-03-20 19:58:06 WARN TaskSetManager:66 - Lost task 0.1 in stage 2.0 (TID 3, QWERTY, executor 2): org.apache.spark.SparkException: Task failed while writing rows. at org.apache.spark.sql.execution.datasources.FileFormatWriter$.org$apache$spark$sql$execution$datasources$FileFormatWriter$$executeTask(FileFormatWriter.scala:285) at org.apache.spark.sql.execution.datasources.FileFormatWriter$$anonfun$write$1.apply(FileFormatWriter.scala:197) at org.apache.spark.sql.execution.datasources.FileFormatWriter$$anonfun$write$1.apply(FileFormatWriter.scala:196) at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87) at org.apache.spark.scheduler.Task.run(Task.scala:109) at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:345) at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) at java.lang.Thread.run(Thread.java:745) Caused by: org.apache.hadoop.fs.FileAlreadyExistsException: /user/hive/warehouse/t2/.spark-staging-1f1efbfd-7e20-4e0f-a49c-a7fa3eae4cb1/part1=2/part2=2/part-0-1f1efbfd-7e20-4e0f-a49c-a7fa3eae4cb1.c000.snappy.parquet for client 127.0.0.1 already exists at org.apache.hadoop.hdfs.server.namenode.FSNamesystem.startFileInternal(FSNamesystem.java:2578) at 
org.apache.hadoop.hdfs.server.namenode.FSNamesystem.startFileInt(FSNamesystem.java:2465) at org.apache.hadoop.hdfs.server.namenode.FSNamesystem.startFile(FSNamesystem.java:2349) at org.apache.hadoop.hdfs.server.namenode.NameNodeRpcServer.create(NameNodeRpcServer.java:624) at org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolServerSideTranslatorPB.create(ClientNamenodeProtocolServerSideTranslatorPB.java:398) at org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos$ClientNamenodeProtocol$2.callBlockingMethod(ClientNamenodeProtocolProtos.java) at org.apache.hadoop.ipc.ProtobufRpcEngine$Server$ProtoBufRpcInvoker.call(ProtobufRpcEngine.java:616) at org.apache.hadoop.ipc.RPC$Server.call(RPC.java:982) at org.apache.hadoop.ipc.Server$Handler$1.run(Server.java:2217) at org.apache.hadoop.ipc.Server$Handler$1.run(Server.java:2213) at java.security.AccessController.doPrivileged(Native Method) at javax.security.auth.Subject.doAs(Subject.java:422) at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1746) at org.apache.hadoop.ipc.Server$Handler.run(Server.java:2213) at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method) at sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62) at sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45) at java.lang.reflect.Constructor.newInstance(Constructor.java:423) at org.apache.hadoop.ipc.RemoteException.instantiateException(RemoteException.java:106) at org.apache.hadoop.ipc.RemoteException.unwrapRemoteException(RemoteException.java:73) at org.apache.hadoop.hdfs.DFSOutputStream.newStreamForCreate(DFSOutputStream.java:1653) at org.apache.hadoop.hdfs.DFSClient.create(DFSClient.java:1689) at org.apache.hadoop.hdfs.DFSClient.create(DFSClient.java:1624) at org.apache.hadoop.hdfs.DistributedFileSystem$7.doCall(DistributedFileSystem.java:448) at 
org.apache.hadoop.hdfs.DistributedFileSystem$7.doCall(DistributedFileSystem.java:444) at org.apache.hadoop.fs.FileSystemLinkResolver.resolve(FileSystemLinkResolver.java:81) at org.apache.hadoop.hdfs.DistributedFileSystem.create(DistributedFileSystem.java:459) at org.apache.hadoop.hdfs.DistributedFileSystem.create(DistributedFileSystem.java:387) at org.apache.hadoop.fs.FileSystem.create(FileSystem.java:911) at org.apache.hadoop.fs.FileSystem.create(FileSystem.java:892) at org.apache.parquet.hadoop.ParquetFileWriter.(ParquetFileWriter.java:236) at org.apache.parquet.hadoop.ParquetOutputFormat.getRecordWriter(ParquetOutputFormat.java:342) at org.apache.parquet.hadoop.ParquetOutputFormat.getRecordWriter(ParquetOutputFormat.java:302) at org.apache.spark.sql.execution.datasources.parquet.ParquetOutputWriter.(ParquetOutputWriter.scala:37) at org.apache.spark.sql.execution.datasources.parquet.ParquetFileFormat$$anon$1.newInstance(ParquetFileFormat.scala:151) at
[jira] [Created] (SPARK-27218) spark-sql-kafka-0-10 startingOffset=earliest not working as expected with streaming
Emanuele Sabellico created SPARK-27218: -- Summary: spark-sql-kafka-0-10 startingOffset=earliest not working as expected with streaming Key: SPARK-27218 URL: https://issues.apache.org/jira/browse/SPARK-27218 Project: Spark Issue Type: Bug Components: Structured Streaming Affects Versions: 2.4.0 Environment: Windows 10, spark-2.4.0-bin-hadoop2.7 Reporter: Emanuele Sabellico Hi, I'm trying to stream a kafka topic with spark-sql-kafka-0-10_2.11:2.4.0 with a code like this: {noformat} spark.readStream .format("kafka") .option("subscribe", topics) .option("startingOffsets", "earliest") .load() .select(from_avro_with_schema_registry($"value", avroOptions) as "body"){noformat} I find that Spark doesn't start from the earliest offset but from the latest. Or better, initially it gets the earliest offsets but then it does a seek to end, skipping the messages in between. In the logs I find this: {noformat} 2019-03-20 11:59:50 INFO KafkaMicroBatchReader:54 - Initial offsets: {"test1":{"0":1740}} 2019-03-20 11:59:50 INFO Fetcher:583 - [Consumer clientId=consumer-1, groupId=spark-kafka-source-6c3a7acb-91fa-4b57-81f1-c8f7c6c5ab6d--880364893-driver-0] Resetting offset for partition test1-0 to offset 15922. {noformat} Looking into the code I find that in _KafkaMicroBatchReader.setOffsetRange_ the method _KafkaOffsetReader.fetchLatestOffsets_ is called and here there is a _consumer.seekToEnd(partitions)_ According to the documentation I was expecting that the streaming started from the earliest offset in this case. Is there something that I'm getting wrong or doing wrong? Thanks in advance! -- This message was sent by Atlassian JIRA (v7.6.3#76005) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Updated] (SPARK-27218) spark-sql-kafka-0-10 startingOffset=earliest not working as expected with streaming
[ https://issues.apache.org/jira/browse/SPARK-27218?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Emanuele Sabellico updated SPARK-27218: --- Description: Hi, I'm trying to stream a kafka topic with spark-sql-kafka-0-10_2.11:2.4.0 with a code like this: {noformat} spark.readStream .format("kafka") .option("subscribe", topics) .option("startingOffsets", "earliest") .load() .select(from_avro_with_schema_registry($"value", avroOptions) as "body"){noformat} I find that Spark doesn't start from the earliest offset but from the latest. Or better, initially it gets the earliest offsets but then it does a seek to end, skipping the messages in-between. In the logs I find this: {noformat} 2019-03-20 11:59:50 INFO KafkaMicroBatchReader:54 - Initial offsets: {"test1":{"0":1740}} 2019-03-20 11:59:50 INFO Fetcher:583 - [Consumer clientId=consumer-1, groupId=spark-kafka-source-6c3a7acb-91fa-4b57-81f1-c8f7c6c5ab6d--880364893-driver-0] Resetting offset for partition test1-0 to offset 15922. {noformat} Looking into the code I find that in _KafkaMicroBatchReader.setOffsetRange_ the method _KafkaOffsetReader.fetchLatestOffsets_ is called and here there is a _consumer.seekToEnd(partitions)_ According to the documentation I was expecting that the streaming would have started from the earliest offset in this case. Is there something that I'm getting wrong or doing wrong? Thanks in advance! was: Hi, I'm trying to stream a kafka topic with spark-sql-kafka-0-10_2.11:2.4.0 with a code like this: {noformat} spark.readStream .format("kafka") .option("subscribe", topics) .option("startingOffsets", "earliest") .load() .select(from_avro_with_schema_registry($"value", avroOptions) as "body"){noformat} I find that Spark doesn't start from the earliest offset but from the latest. Or better, initially it gets the earliest offsets but then it does a seek to end, skipping the messages in-between. 
In the logs I find this: {noformat} 2019-03-20 11:59:50 INFO KafkaMicroBatchReader:54 - Initial offsets: {"test1":{"0":1740}} 2019-03-20 11:59:50 INFO Fetcher:583 - [Consumer clientId=consumer-1, groupId=spark-kafka-source-6c3a7acb-91fa-4b57-81f1-c8f7c6c5ab6d--880364893-driver-0] Resetting offset for partition test1-0 to offset 15922. {noformat} Looking into the code I find that in _KafkaMicroBatchReader.setOffsetRange_ the method _KafkaOffsetReader.fetchLatestOffsets_ is called and here there is a _consumer.seekToEnd(partitions)_ According to the documentation I was expecting that the streaming started from the earliest offset in this case. Is there something that I'm getting wrong or doing wrong? Thanks in advance! > spark-sql-kafka-0-10 startingOffset=earliest not working as expected with > streaming > --- > > Key: SPARK-27218 > URL: https://issues.apache.org/jira/browse/SPARK-27218 > Project: Spark > Issue Type: Bug > Components: Structured Streaming >Affects Versions: 2.4.0 > Environment: Windows 10, spark-2.4.0-bin-hadoop2.7 >Reporter: Emanuele Sabellico >Priority: Minor > > Hi, I'm trying to stream a kafka topic with spark-sql-kafka-0-10_2.11:2.4.0 > with a code like this: > {noformat} > spark.readStream > .format("kafka") > .option("subscribe", topics) > .option("startingOffsets", "earliest") > .load() > .select(from_avro_with_schema_registry($"value", avroOptions) as > "body"){noformat} > I find that Spark doesn't start from the earliest offset but from the latest. > Or better, initially it gets the earliest offsets but then it does a seek to > end, skipping the messages in-between. > In the logs I find this: > {noformat} > 2019-03-20 11:59:50 INFO KafkaMicroBatchReader:54 - Initial offsets: > {"test1":{"0":1740}} > 2019-03-20 11:59:50 INFO Fetcher:583 - [Consumer clientId=consumer-1, > groupId=spark-kafka-source-6c3a7acb-91fa-4b57-81f1-c8f7c6c5ab6d--880364893-driver-0] > Resetting offset for partition test1-0 to offset 15922. 
> {noformat} > Looking into the code I find that in _KafkaMicroBatchReader.setOffsetRange_ > the method _KafkaOffsetReader.fetchLatestOffsets_ is called and here there is > a _consumer.seekToEnd(partitions)_ > According to the documentation I was expecting that the streaming would have > started from the earliest offset in this case. Is there something that I'm > getting wrong or doing wrong? > Thanks in advance! > -- This message was sent by Atlassian JIRA (v7.6.3#76005) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Updated] (SPARK-27218) spark-sql-kafka-0-10 startingOffset=earliest not working as expected with streaming
[ https://issues.apache.org/jira/browse/SPARK-27218?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Emanuele Sabellico updated SPARK-27218: --- Description: Hi, I'm trying to stream a kafka topic with spark-sql-kafka-0-10_2.11:2.4.0 with a code like this: {noformat} spark.readStream .format("kafka") .option("subscribe", topics) .option("startingOffsets", "earliest") .load() .select(from_avro_with_schema_registry($"value", avroOptions) as "body"){noformat} I find that Spark doesn't start from the earliest offset but from the latest. Or better, initially it gets the earliest offsets but then it does a seek to end, skipping the messages in-between. In the logs I find this: {noformat} 2019-03-20 11:59:50 INFO KafkaMicroBatchReader:54 - Initial offsets: {"test1":{"0":1740}} 2019-03-20 11:59:50 INFO Fetcher:583 - [Consumer clientId=consumer-1, groupId=spark-kafka-source-6c3a7acb-91fa-4b57-81f1-c8f7c6c5ab6d--880364893-driver-0] Resetting offset for partition test1-0 to offset 15922. {noformat} Looking into the code I find that in _KafkaMicroBatchReader.setOffsetRange_ the method _KafkaOffsetReader.fetchLatestOffsets_ is called and here there is a _consumer.seekToEnd(partitions)_ According to the documentation I was expecting that the streaming started from the earliest offset in this case. Is there something that I'm getting wrong or doing wrong? Thanks in advance! was: Hi, I'm trying to stream a kafka topic with spark-sql-kafka-0-10_2.11:2.4.0 with a code like this: {noformat} spark.readStream .format("kafka") .option("subscribe", topics) .option("startingOffsets", "earliest") .load() .select(from_avro_with_schema_registry($"value", avroOptions) as "body"){noformat} I find that Spark doesn't start from the earliest offset but from the latest. Or better, initially it gets the earliest offsets but then it does a seek to end, skipping the messages in between. 
In the logs I find this: {noformat} 2019-03-20 11:59:50 INFO KafkaMicroBatchReader:54 - Initial offsets: {"test1":{"0":1740}} 2019-03-20 11:59:50 INFO Fetcher:583 - [Consumer clientId=consumer-1, groupId=spark-kafka-source-6c3a7acb-91fa-4b57-81f1-c8f7c6c5ab6d--880364893-driver-0] Resetting offset for partition test1-0 to offset 15922. {noformat} Looking into the code I find that in _KafkaMicroBatchReader.setOffsetRange_ the method _KafkaOffsetReader.fetchLatestOffsets_ is called and here there is a _consumer.seekToEnd(partitions)_ According to the documentation I was expecting that the streaming started from the earliest offset in this case. Is there something that I'm getting wrong or doing wrong? Thanks in advance! > spark-sql-kafka-0-10 startingOffset=earliest not working as expected with > streaming > --- > > Key: SPARK-27218 > URL: https://issues.apache.org/jira/browse/SPARK-27218 > Project: Spark > Issue Type: Bug > Components: Structured Streaming >Affects Versions: 2.4.0 > Environment: Windows 10, spark-2.4.0-bin-hadoop2.7 >Reporter: Emanuele Sabellico >Priority: Minor > > Hi, I'm trying to stream a kafka topic with spark-sql-kafka-0-10_2.11:2.4.0 > with a code like this: > {noformat} > spark.readStream > .format("kafka") > .option("subscribe", topics) > .option("startingOffsets", "earliest") > .load() > .select(from_avro_with_schema_registry($"value", avroOptions) as > "body"){noformat} > I find that Spark doesn't start from the earliest offset but from the latest. > Or better, initially it gets the earliest offsets but then it does a seek to > end, skipping the messages in-between. > In the logs I find this: > {noformat} > 2019-03-20 11:59:50 INFO KafkaMicroBatchReader:54 - Initial offsets: > {"test1":{"0":1740}} > 2019-03-20 11:59:50 INFO Fetcher:583 - [Consumer clientId=consumer-1, > groupId=spark-kafka-source-6c3a7acb-91fa-4b57-81f1-c8f7c6c5ab6d--880364893-driver-0] > Resetting offset for partition test1-0 to offset 15922. 
> {noformat} > Looking into the code I find that in _KafkaMicroBatchReader.setOffsetRange_ > the method _KafkaOffsetReader.fetchLatestOffsets_ is called and here there is > a _consumer.seekToEnd(partitions)_ > According to the documentation I was expecting that the streaming started > from the earliest offset in this case. Is there something that I'm getting > wrong or doing wrong? > Thanks in advance! > -- This message was sent by Atlassian JIRA (v7.6.3#76005) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Created] (SPARK-27217) Nested schema pruning doesn't work for aggregation e.g. `sum`.
colin fang created SPARK-27217: -- Summary: Nested schema pruning doesn't work for aggregation e.g. `sum`. Key: SPARK-27217 URL: https://issues.apache.org/jira/browse/SPARK-27217 Project: Spark Issue Type: Improvement Components: SQL Affects Versions: 2.4.0 Reporter: colin fang Since SPARK-4502 is fixed, I would expect queries such as `select sum(b.x)` not to have to read other nested fields. {code:python} rdd = spark.range(1000).rdd.map(lambda x: [x.id+3, [x.id+1, x.id-1]]) df = spark.createDataFrame(rdd, schema='a:int,b:struct<x:int,y:int>') df.repartition(1).write.mode('overwrite').parquet('test.parquet') df = spark.read.parquet('test.parquet') spark.conf.set('spark.sql.optimizer.nestedSchemaPruning.enabled', 'true') df.select('b.x').explain() # ReadSchema: struct<b:struct<x:int>> spark.conf.set('spark.sql.optimizer.nestedSchemaPruning.enabled', 'false') df.select('b.x').explain() # ReadSchema: struct<b:struct<x:int,y:int>> spark.conf.set('spark.sql.optimizer.nestedSchemaPruning.enabled', 'true') df.selectExpr('sum(b.x)').explain() # ReadSchema: struct<b:struct<x:int,y:int>> {code} -- This message was sent by Atlassian JIRA (v7.6.3#76005) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
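For context, the pruning the reporter expects amounts to narrowing the read schema down to the leaf paths a query actually references. A small model of that idea over schemas represented as nested dicts (plain Python, not Spark's Catalyst rule):

```python
def prune_schema(schema, paths):
    """Keep only the fields of a nested schema reachable from `paths`.

    schema: dict mapping field -> type string, or -> nested dict (a struct).
    paths: iterable of dotted column references like "b.x".
    """
    pruned = {}
    for path in paths:
        head, _, rest = path.partition(".")
        if head not in schema:
            continue
        child = schema[head]
        if rest and isinstance(child, dict):
            sub = prune_schema(child, [rest])
            if head in pruned and isinstance(pruned[head], dict):
                pruned[head].update(sub)  # merge paths into the same struct
            else:
                pruned[head] = sub
        else:
            pruned[head] = child  # whole-field reference: keep it all
    return pruned

full = {"a": "bigint", "b": {"x": "bigint", "y": "bigint"}}
# select('b.x') references a single leaf ...
read_schema = prune_schema(full, ["b.x"])
# ... and sum(b.x) references exactly the same leaf, so ideally the
# aggregation would produce the same pruned schema; the ticket's point is
# that Spark 2.4 fails to prune in the aggregation case and reads b.y too.
```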
[jira] [Resolved] (SPARK-27206) Using slice method with streaming api's Interval on DStream
[ https://issues.apache.org/jira/browse/SPARK-27206?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Gabor Somogyi resolved SPARK-27206. --- Resolution: Invalid > Using slice method with streaming api's Interval on DStream > --- > > Key: SPARK-27206 > URL: https://issues.apache.org/jira/browse/SPARK-27206 > Project: Spark > Issue Type: Question > Components: DStreams >Affects Versions: 2.4.0 > Environment: Linux, standalone spark >Reporter: Aarthi >Priority: Major > > Hi. I need to slice a DStream that receives data from a custom receiver > (implemented with Spark's Receiver). There are two options to do this: > 1. slice(fromTime: > [Time|http://spark.apache.org/docs/2.3.1/api/scala/org/apache/spark/streaming/Time.html], > toTime: > [Time|http://spark.apache.org/docs/2.3.1/api/scala/org/apache/spark/streaming/Time.html]) > 2. slice(interval: Interval) > Although the second option is a public method, the Interval class is private. > Can you please help me understand how to use this api with > org.apache.spark.streaming.Interval ? > Thanks, Aarthi > -- This message was sent by Atlassian JIRA (v7.6.3#76005) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Assigned] (SPARK-27216) Kryo serialization with RoaringBitmap
[ https://issues.apache.org/jira/browse/SPARK-27216?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Apache Spark reassigned SPARK-27216: Assignee: (was: Apache Spark) > Kryo serialization with RoaringBitmap > - > > Key: SPARK-27216 > URL: https://issues.apache.org/jira/browse/SPARK-27216 > Project: Spark > Issue Type: Bug > Components: Spark Core >Affects Versions: 2.3.3, 2.4.0, 3.0.0 >Reporter: Lantao Jin >Priority: Major > > HighlyCompressedMapStatus uses RoaringBitmap to record the empty blocks. But > RoaringBitmap couldn't be ser/deser with unsafe KryoSerializer. > We can use below UT to reproduce: > {code} > test("kryo serialization with RoaringBitmap") { > val bitmap = new RoaringBitmap > bitmap.add(1787) > val safeSer = new KryoSerializer(conf).newInstance() > val bitmap2 : RoaringBitmap = > safeSer.deserialize(safeSer.serialize(bitmap)) > assert(bitmap2.equals(bitmap)) > conf.set("spark.kryo.unsafe", "true") > val unsafeSer = new KryoSerializer(conf).newInstance() > val bitmap3 : RoaringBitmap = > unsafeSer.deserialize(unsafeSer.serialize(bitmap)) > assert(bitmap3.equals(bitmap)) // this will fail > } > {code} -- This message was sent by Atlassian JIRA (v7.6.3#76005) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Assigned] (SPARK-27216) Kryo serialization with RoaringBitmap
[ https://issues.apache.org/jira/browse/SPARK-27216?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Apache Spark reassigned SPARK-27216: Assignee: Apache Spark > Kryo serialization with RoaringBitmap > - > > Key: SPARK-27216 > URL: https://issues.apache.org/jira/browse/SPARK-27216 > Project: Spark > Issue Type: Bug > Components: Spark Core >Affects Versions: 2.3.3, 2.4.0, 3.0.0 >Reporter: Lantao Jin >Assignee: Apache Spark >Priority: Major > > HighlyCompressedMapStatus uses RoaringBitmap to record the empty blocks. But > RoaringBitmap couldn't be ser/deser with unsafe KryoSerializer. > We can use below UT to reproduce: > {code} > test("kryo serialization with RoaringBitmap") { > val bitmap = new RoaringBitmap > bitmap.add(1787) > val safeSer = new KryoSerializer(conf).newInstance() > val bitmap2 : RoaringBitmap = > safeSer.deserialize(safeSer.serialize(bitmap)) > assert(bitmap2.equals(bitmap)) > conf.set("spark.kryo.unsafe", "true") > val unsafeSer = new KryoSerializer(conf).newInstance() > val bitmap3 : RoaringBitmap = > unsafeSer.deserialize(unsafeSer.serialize(bitmap)) > assert(bitmap3.equals(bitmap)) // this will fail > } > {code} -- This message was sent by Atlassian JIRA (v7.6.3#76005) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Commented] (SPARK-27206) Using slice method with streaming api's Interval on DStream
[ https://issues.apache.org/jira/browse/SPARK-27206?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16797153#comment-16797153 ] Gabor Somogyi commented on SPARK-27206: --- [~aarthipa] Can you please write a mail to the dev list? (d...@spark.apache.org) > Using slice method with streaming api's Interval on DStream > --- > > Key: SPARK-27206 > URL: https://issues.apache.org/jira/browse/SPARK-27206 > Project: Spark > Issue Type: Question > Components: DStreams >Affects Versions: 2.4.0 > Environment: Linux, standalone spark >Reporter: Aarthi >Priority: Major > > Hi. I need to slice a DStream that receives data from a custom receiver > (implemented with Spark's Receiver). There are two options to do this: > 1. slice(fromTime: > [Time|http://spark.apache.org/docs/2.3.1/api/scala/org/apache/spark/streaming/Time.html], > toTime: > [Time|http://spark.apache.org/docs/2.3.1/api/scala/org/apache/spark/streaming/Time.html]) > 2. slice(interval: Interval) > Although the second option is a public method, the Interval class is private. > Can you please help me understand how to use this api with > org.apache.spark.streaming.Interval ? > Thanks, Aarthi > -- This message was sent by Atlassian JIRA (v7.6.3#76005) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Updated] (SPARK-27216) Kryo serialization with RoaringBitmap
[ https://issues.apache.org/jira/browse/SPARK-27216?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Lantao Jin updated SPARK-27216: --- Description: HighlyCompressedMapStatus uses RoaringBitmap to record the empty blocks. But RoaringBitmap couldn't be ser/deser with unsafe KryoSerializer. We can use below UT to reproduce: {code} test("kryo serialization with RoaringBitmap") { val bitmap = new RoaringBitmap bitmap.add(1787) val safeSer = new KryoSerializer(conf).newInstance() val bitmap2 : RoaringBitmap = safeSer.deserialize(safeSer.serialize(bitmap)) assert(bitmap2.equals(bitmap)) conf.set("spark.kryo.unsafe", "true") val unsafeSer = new KryoSerializer(conf).newInstance() val bitmap3 : RoaringBitmap = unsafeSer.deserialize(unsafeSer.serialize(bitmap)) assert(bitmap3.equals(bitmap)) // this will fail } {code} was:HighlyCompressedMapStatus uses RoaringBitmap to record the empty blocks. But RoaringBitmap couldn't be ser/deser with unsafe KryoSerializer. > Kryo serialization with RoaringBitmap > - > > Key: SPARK-27216 > URL: https://issues.apache.org/jira/browse/SPARK-27216 > Project: Spark > Issue Type: Bug > Components: Spark Core >Affects Versions: 2.3.3, 2.4.0, 3.0.0 >Reporter: Lantao Jin >Priority: Major > > HighlyCompressedMapStatus uses RoaringBitmap to record the empty blocks. But > RoaringBitmap couldn't be ser/deser with unsafe KryoSerializer. 
> We can use below UT to reproduce: > {code} > test("kryo serialization with RoaringBitmap") { > val bitmap = new RoaringBitmap > bitmap.add(1787) > val safeSer = new KryoSerializer(conf).newInstance() > val bitmap2 : RoaringBitmap = > safeSer.deserialize(safeSer.serialize(bitmap)) > assert(bitmap2.equals(bitmap)) > conf.set("spark.kryo.unsafe", "true") > val unsafeSer = new KryoSerializer(conf).newInstance() > val bitmap3 : RoaringBitmap = > unsafeSer.deserialize(unsafeSer.serialize(bitmap)) > assert(bitmap3.equals(bitmap)) // this will fail > } > {code} -- This message was sent by Atlassian JIRA (v7.6.3#76005) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
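The failing assertion in the UT above follows a general pattern: a serializer/deserializer pair must agree byte-for-byte on the encoding, and the unsafe Kryo stream presumably differs from what RoaringBitmap's read path expects. The same failure shape can be shown with nothing but byte order in plain Python; this is an analogy for the mismatch, not the actual Kryo mechanics.

```python
import struct

def serialize(value, byte_order):
    # ">i" packs a big-endian 32-bit int, "<i" a little-endian one.
    return struct.pack(byte_order + "i", value)

def deserialize(data, byte_order):
    return struct.unpack(byte_order + "i", data)[0]

value = 1787  # the same value the UT adds to the bitmap
# A matched pair round-trips fine, like the safe KryoSerializer:
ok = deserialize(serialize(value, ">"), ">")
# A mismatched pair decodes garbage, like an unsafe write feeding a
# reader that expects the standard encoding:
garbled = deserialize(serialize(value, ">"), "<")
```

The round-trip-and-compare structure of the UT (serialize, deserialize, assert equality) is the right harness for catching any such encoding mismatch.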
[jira] [Created] (SPARK-27216) Kryo serialization with RoaringBitmap
Lantao Jin created SPARK-27216: -- Summary: Kryo serialization with RoaringBitmap Key: SPARK-27216 URL: https://issues.apache.org/jira/browse/SPARK-27216 Project: Spark Issue Type: Bug Components: Spark Core Affects Versions: 2.4.0, 2.3.3, 3.0.0 Reporter: Lantao Jin HighlyCompressedMapStatus uses RoaringBitmap to record the empty blocks. But RoaringBitmap couldn't be ser/deser with unsafe KryoSerializer. -- This message was sent by Atlassian JIRA (v7.6.3#76005) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Assigned] (SPARK-27215) Correct the kryo configurations
[ https://issues.apache.org/jira/browse/SPARK-27215?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Apache Spark reassigned SPARK-27215: Assignee: (was: Apache Spark) > Correct the kryo configurations > --- > > Key: SPARK-27215 > URL: https://issues.apache.org/jira/browse/SPARK-27215 > Project: Spark > Issue Type: Bug > Components: Spark Core >Affects Versions: 3.0.0 >Reporter: Lantao Jin >Priority: Major > > {code} > val KRYO_USE_UNSAFE = ConfigBuilder("spark.kyro.unsafe") > .booleanConf > .createWithDefault(false) > val KRYO_USE_POOL = ConfigBuilder("spark.kyro.pool") > .booleanConf > .createWithDefault(true) > {code} > kyro should be kryo -- This message was sent by Atlassian JIRA (v7.6.3#76005) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Assigned] (SPARK-27215) Correct the kryo configurations
[ https://issues.apache.org/jira/browse/SPARK-27215?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Apache Spark reassigned SPARK-27215: Assignee: Apache Spark > Correct the kryo configurations > --- > > Key: SPARK-27215 > URL: https://issues.apache.org/jira/browse/SPARK-27215 > Project: Spark > Issue Type: Bug > Components: Spark Core >Affects Versions: 3.0.0 >Reporter: Lantao Jin >Assignee: Apache Spark >Priority: Major > > {code} > val KRYO_USE_UNSAFE = ConfigBuilder("spark.kyro.unsafe") > .booleanConf > .createWithDefault(false) > val KRYO_USE_POOL = ConfigBuilder("spark.kyro.pool") > .booleanConf > .createWithDefault(true) > {code} > kyro should be kryo
[jira] [Created] (SPARK-27215) Correct the kryo configurations
Lantao Jin created SPARK-27215: -- Summary: Correct the kryo configurations Key: SPARK-27215 URL: https://issues.apache.org/jira/browse/SPARK-27215 Project: Spark Issue Type: Bug Components: Spark Core Affects Versions: 3.0.0 Reporter: Lantao Jin {code} val KRYO_USE_UNSAFE = ConfigBuilder("spark.kyro.unsafe") .booleanConf .createWithDefault(false) val KRYO_USE_POOL = ConfigBuilder("spark.kyro.pool") .booleanConf .createWithDefault(true) {code} kyro should be kryo
[jira] [Resolved] (SPARK-27199) Replace TimeZone by ZoneId in TimestampFormatter API
[ https://issues.apache.org/jira/browse/SPARK-27199?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Hyukjin Kwon resolved SPARK-27199. -- Resolution: Fixed Fix Version/s: 3.0.0 Issue resolved by pull request 24141 [https://github.com/apache/spark/pull/24141] > Replace TimeZone by ZoneId in TimestampFormatter API > > > Key: SPARK-27199 > URL: https://issues.apache.org/jira/browse/SPARK-27199 > Project: Spark > Issue Type: Improvement > Components: SQL >Affects Versions: 2.4.0 >Reporter: Maxim Gekk >Assignee: Maxim Gekk >Priority: Minor > Fix For: 3.0.0 > > > Internally, TimestampFormatter implementations use ZoneId, not the TimeZone > that comes via the API. Conversion from TimeZone to ZoneId is not free: > TimeZone is converted to String, and the String is parsed to > ZoneId. The conversion to String can be eliminated if TimestampFormatter > accepted ZoneId. Also, TimeZone is itself converted from String in some > cases (JSON options), so in the bad case: String -> TimeZone -> String -> ZoneId > -> ZoneOffset. This ticket aims to use ZoneId in the TimestampFormatter API. We > could require ZoneOffset, but it is not convenient in most cases because > converting ZoneId to ZoneOffset requires an Instant.
[jira] [Assigned] (SPARK-27201) Show full job description on click
[ https://issues.apache.org/jira/browse/SPARK-27201?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Hyukjin Kwon reassigned SPARK-27201: Assignee: Gengliang Wang > Show full job description on click > -- > > Key: SPARK-27201 > URL: https://issues.apache.org/jira/browse/SPARK-27201 > Project: Spark > Issue Type: Task > Components: Web UI >Affects Versions: 3.0.0 >Reporter: Gengliang Wang >Assignee: Gengliang Wang >Priority: Major > > Previously, in https://github.com/apache/spark/pull/6646 there was an > improvement to show the full job description after a double click. > I think this is a bit hard for users to notice. I suggest changing > the event to a single click. > Also, after the full description is shown, another click should > hide the overflow text again.
[jira] [Resolved] (SPARK-27201) Show full job description on click
[ https://issues.apache.org/jira/browse/SPARK-27201?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Hyukjin Kwon resolved SPARK-27201. -- Resolution: Fixed Fix Version/s: 3.0.0 Issue resolved by pull request 24145 [https://github.com/apache/spark/pull/24145] > Show full job description on click > -- > > Key: SPARK-27201 > URL: https://issues.apache.org/jira/browse/SPARK-27201 > Project: Spark > Issue Type: Task > Components: Web UI >Affects Versions: 3.0.0 >Reporter: Gengliang Wang >Assignee: Gengliang Wang >Priority: Major > Fix For: 3.0.0 > > > Previously, in https://github.com/apache/spark/pull/6646 there was an > improvement to show the full job description after a double click. > I think this is a bit hard for users to notice. I suggest changing > the event to a single click. > Also, after the full description is shown, another click should > hide the overflow text again.
[jira] [Commented] (SPARK-27213) Unexpected results when filter is used after distinct
[ https://issues.apache.org/jira/browse/SPARK-27213?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16797112#comment-16797112 ] Udbhav Agrawal commented on SPARK-27213: I will try to check this issue > Unexpected results when filter is used after distinct > - > > Key: SPARK-27213 > URL: https://issues.apache.org/jira/browse/SPARK-27213 > Project: Spark > Issue Type: Bug > Components: PySpark >Affects Versions: 2.3.2, 2.4.0 >Reporter: Rinaz Belhaj >Priority: Major > Labels: distinct, filter > > The following code gives unexpected output due to the filter not getting > pushed down in catalyst optimizer. > {code:java} > df = > spark.createDataFrame([['a','123','12.3','n'],['a','123','12.3','y'],['a','123','12.4','y']],['x','y','z','y_n']) > df.show(5) > df.filter("y_n='y'").select('x','y','z').distinct().show() > df.select('x','y','z').distinct().filter("y_n='y'").show() > {code} > {panel:title=Output} > |x|y|z|y_n| > |a|123|12.3|n| > |a|123|12.3|y| > |a|123|12.4|y| > > |x|y|z| > |a|123|12.3| > |a|123|12.4| > > |x|y|z| > |a|123|12.4| > {panel} > Ideally, the second statement should result in an error since the column used > in the filter is not present in the preceding select statement. But the > catalyst optimizer is using first() on column y_n and then applying the > filter. > Even if the filter was pushed down, the result would have been accurate. 
> {code:java} > df = spark.createDataFrame([['a','123','12.3','n'],['a','123','12.3','y'],['a','123','12.4','y']],['x','y','z','y_n']) > df.filter("y_n='y'").select('x','y','z').distinct().explain(True) > df.select('x','y','z').distinct().filter("y_n='y'").explain(True) > {code} > {panel:title=Output} > > == Parsed Logical Plan == > Deduplicate [x#74, y#75, z#76] > +- AnalysisBarrier > +- Project [x#74, y#75, z#76] > +- Filter (y_n#77 = y) > +- LogicalRDD [x#74, y#75, z#76, y_n#77], false > > == Analyzed Logical Plan == > x: string, y: string, z: string > Deduplicate [x#74, y#75, z#76] > +- Project [x#74, y#75, z#76] > +- Filter (y_n#77 = y) > +- LogicalRDD [x#74, y#75, z#76, y_n#77], false > > == Optimized Logical Plan == > Aggregate [x#74, y#75, z#76], [x#74, y#75, z#76] > +- Project [x#74, y#75, z#76] > +- Filter (isnotnull(y_n#77) && (y_n#77 = y)) > +- LogicalRDD [x#74, y#75, z#76, y_n#77], false > > == Physical Plan == > *(2) HashAggregate(keys=[x#74, y#75, z#76], functions=[], output=[x#74, y#75, z#76]) > +- Exchange hashpartitioning(x#74, y#75, z#76, 10) > +- *(1) HashAggregate(keys=[x#74, y#75, z#76], functions=[], output=[x#74, y#75, z#76]) > +- *(1) Project [x#74, y#75, z#76] > +- *(1) Filter (isnotnull(y_n#77) && (y_n#77 = y)) > +- Scan ExistingRDD[x#74,y#75,z#76,y_n#77] > > > --- > > > == Parsed Logical Plan == > 'Filter ('y_n = y) > +- AnalysisBarrier > +- Deduplicate [x#74, y#75, z#76] > +- Project [x#74, y#75, z#76] > +- LogicalRDD [x#74, y#75, z#76, y_n#77], false > > == Analyzed Logical Plan == > x: string, y: string, z: string > Project [x#74, y#75, z#76] > +- Filter (y_n#77 = y) > +- Deduplicate [x#74, y#75, z#76] > +- Project [x#74, y#75, z#76, y_n#77] > +- LogicalRDD [x#74, y#75, z#76, y_n#77], false > > == Optimized Logical Plan == > Project [x#74, y#75, z#76] > +- Filter (isnotnull(y_n#77) && (y_n#77 = y)) > +- Aggregate [x#74, y#75, z#76], [x#74, y#75, z#76, first(y_n#77, false) AS y_n#77] > +- LogicalRDD [x#74, y#75, z#76, y_n#77], false > > == Physical Plan == > *(3) Project [x#74, y#75, z#76] > +- *(3) Filter (isnotnull(y_n#77) && (y_n#77 = y)) > +- SortAggregate(key=[x#74, y#75, z#76], functions=[first(y_n#77, false)], output=[x#74, y#75, z#76, y_n#77]) > +- *(2) Sort [x#74 ASC NULLS FIRST, y#75 ASC NULLS FIRST, z#76 ASC NULLS FIRST], false, 0 > +- Exchange hashpartitioning(x#74, y#75, z#76, 10) > +- SortAggregate(key=[x#74, y#75, z#76], functions=[partial_first(y_n#77, false)], output=[x#74, y#75, z#76, first#95, valueSet#96])
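A plain-Python sketch (not Spark code) of why the optimized plan above produces the surprising result: deduplicating on (x, y, z) while keeping first(y_n), and only then filtering on y_n, makes the outcome depend on which row each key happens to see first. The data is taken from the report above.

```python
# Mimic the optimized plan: Aggregate on (x, y, z) keeping first(y_n),
# then Filter y_n = 'y'. Plain-Python stand-in, not Spark itself.
rows = [('a', '123', '12.3', 'n'),
        ('a', '123', '12.3', 'y'),
        ('a', '123', '12.4', 'y')]

# "distinct" over (x, y, z): keep the first y_n seen per key, like first(y_n)
first_y_n = {}
for x, y, z, y_n in rows:
    first_y_n.setdefault((x, y, z), y_n)

# the filter applied after the distinct, on the non-selected column y_n
result = [key for key, y_n in first_y_n.items() if y_n == 'y']
print(result)  # [('a', '123', '12.4')]
```

Key ('a', '123', '12.3') first appears with y_n = 'n', so it is dropped even though a later row for the same key has y_n = 'y'; that is exactly the row missing from the reported output.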
[jira] [Assigned] (SPARK-27199) Replace TimeZone by ZoneId in TimestampFormatter API
[ https://issues.apache.org/jira/browse/SPARK-27199?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Hyukjin Kwon reassigned SPARK-27199: Assignee: Maxim Gekk > Replace TimeZone by ZoneId in TimestampFormatter API > > > Key: SPARK-27199 > URL: https://issues.apache.org/jira/browse/SPARK-27199 > Project: Spark > Issue Type: Improvement > Components: SQL >Affects Versions: 2.4.0 >Reporter: Maxim Gekk >Assignee: Maxim Gekk >Priority: Minor > > Internally, TimestampFormatter implementations use ZoneId, not the TimeZone > that comes via the API. Conversion from TimeZone to ZoneId is not free: > TimeZone is converted to String, and the String is parsed to > ZoneId. The conversion to String can be eliminated if TimestampFormatter > accepted ZoneId. Also, TimeZone is itself converted from String in some > cases (JSON options), so in the bad case: String -> TimeZone -> String -> ZoneId > -> ZoneOffset. This ticket aims to use ZoneId in the TimestampFormatter API. We > could require ZoneOffset, but it is not convenient in most cases because > converting ZoneId to ZoneOffset requires an Instant.
[jira] [Commented] (SPARK-27089) Loss of precision during decimal division
[ https://issues.apache.org/jira/browse/SPARK-27089?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16797111#comment-16797111 ] Jared Leable commented on SPARK-27089: -- I validated that setting spark.sql.decimalOperations.allowPrecisionLoss=false returns the expected value. > Loss of precision during decimal division > - > > Key: SPARK-27089 > URL: https://issues.apache.org/jira/browse/SPARK-27089 > Project: Spark > Issue Type: Bug > Components: SQL >Affects Versions: 2.3.0, 2.4.0 >Reporter: ylo0ztlmtusq >Priority: Major > > Spark loses decimal places when dividing decimal numbers. > > Expected behavior (in Spark 2.2.3 or before): > > {code:java} > scala> val sql = """select cast(cast(3 as decimal(38,14)) / cast(9 as > decimal(38,14)) as decimal(38,14)) val""" > sql: String = select cast(cast(3 as decimal(38,14)) / cast(9 as > decimal(38,14)) as decimal(38,14)) val > scala> spark.sql(sql).show > 19/03/07 21:23:51 WARN ObjectStore: Failed to get database global_temp, > returning NoSuchObjectException > ++ > | val| > ++ > |0.33| > ++ > {code} > > Current behavior (in Spark 2.3.2 and later): > > {code:java} > scala> val sql = """select cast(cast(3 as decimal(38,14)) / cast(9 as > decimal(38,14)) as decimal(38,14)) val""" > sql: String = select cast(cast(3 as decimal(38,14)) / cast(9 as > decimal(38,14)) as decimal(38,14)) val > scala> spark.sql(sql).show > ++ > | val| > ++ > |0.33| > ++ > {code} > > Seems to be caused by {{promote_precision(38, 6)}} > > {code:java} > scala> spark.sql(sql).explain(true) > == Parsed Logical Plan == > Project [cast((cast(3 as decimal(38,14)) / cast(9 as decimal(38,14))) as > decimal(38,14)) AS val#20] > +- OneRowRelation > == Analyzed Logical Plan == > val: decimal(38,14) > Project [cast(CheckOverflow((promote_precision(cast(cast(3 as decimal(38,14)) > as decimal(38,14))) / promote_precision(cast(cast(9 as decimal(38,14)) as > decimal(38,14)))), DecimalType(38,6)) as decimal(38,14)) AS val#20] > +- OneRowRelation > == 
Optimized Logical Plan == > Project [0.33 AS val#20] > +- OneRowRelation > == Physical Plan == > *(1) Project [0.33 AS val#20] > +- Scan OneRowRelation[] > {code} > > Source https://stackoverflow.com/q/55046492
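The scale-6 intermediate (`DecimalType(38,6)`) in the analyzed plan above is where the digits disappear. Python's `decimal` module can sketch the arithmetic; this is illustrative only, not Spark code:

```python
from decimal import Decimal

# cast(3 as decimal(38,14)) / cast(9 as decimal(38,14)):
# - "exact" keeps the division at scale 14, as Spark <= 2.2.3 effectively did;
# - "lossy" rounds the quotient to the DecimalType(38,6) scale the optimizer
#   picked, then casts back to decimal(38,14).
exact = (Decimal(3) / Decimal(9)).quantize(Decimal('1e-14'))  # scale-14 result
lossy = (Decimal(3) / Decimal(9)).quantize(Decimal('1e-6'))   # scale-6 intermediate
lossy_cast = lossy.quantize(Decimal('1e-14'))                 # cast back to (38,14)

print(exact)       # 0.33333333333333
print(lossy_cast)  # 0.33333300000000
```

Once the quotient has been rounded to six fractional digits, casting back to scale 14 only pads with zeros; the lost digits cannot be recovered, which matches the behaviour the `allowPrecisionLoss=false` setting avoids.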
[jira] [Updated] (SPARK-27213) Unexpected results when filter is used after distinct
[ https://issues.apache.org/jira/browse/SPARK-27213?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Rinaz Belhaj updated SPARK-27213: - Priority: Major (was: Minor) > Unexpected results when filter is used after distinct > - > > Key: SPARK-27213 > URL: https://issues.apache.org/jira/browse/SPARK-27213 > Project: Spark > Issue Type: Bug > Components: PySpark >Affects Versions: 2.3.2, 2.4.0 >Reporter: Rinaz Belhaj >Priority: Major > Labels: distinct, filter > > The following code gives unexpected output due to the filter not getting > pushed down in catalyst optimizer. > {code:java} > df = > spark.createDataFrame([['a','123','12.3','n'],['a','123','12.3','y'],['a','123','12.4','y']],['x','y','z','y_n']) > df.show(5) > df.filter("y_n='y'").select('x','y','z').distinct().show() > df.select('x','y','z').distinct().filter("y_n='y'").show() > {code} > {panel:title=Output} > |x|y|z|y_n| > |a|123|12.3|n| > |a|123|12.3|y| > |a|123|12.4|y| > > |x|y|z| > |a|123|12.3| > |a|123|12.4| > > |x|y|z| > |a|123|12.4| > {panel} > Ideally, the second statement should result in an error since the column used > in the filter is not present in the preceding select statement. But the > catalyst optimizer is using first() on column y_n and then applying the > filter. > Even if the filter was pushed down, the result would have been accurate. 
> {code:java} > df = spark.createDataFrame([['a','123','12.3','n'],['a','123','12.3','y'],['a','123','12.4','y']],['x','y','z','y_n']) > df.filter("y_n='y'").select('x','y','z').distinct().explain(True) > df.select('x','y','z').distinct().filter("y_n='y'").explain(True) > {code} > {panel:title=Output} > > == Parsed Logical Plan == > Deduplicate [x#74, y#75, z#76] > +- AnalysisBarrier > +- Project [x#74, y#75, z#76] > +- Filter (y_n#77 = y) > +- LogicalRDD [x#74, y#75, z#76, y_n#77], false > > == Analyzed Logical Plan == > x: string, y: string, z: string > Deduplicate [x#74, y#75, z#76] > +- Project [x#74, y#75, z#76] > +- Filter (y_n#77 = y) > +- LogicalRDD [x#74, y#75, z#76, y_n#77], false > > == Optimized Logical Plan == > Aggregate [x#74, y#75, z#76], [x#74, y#75, z#76] > +- Project [x#74, y#75, z#76] > +- Filter (isnotnull(y_n#77) && (y_n#77 = y)) > +- LogicalRDD [x#74, y#75, z#76, y_n#77], false > > == Physical Plan == > *(2) HashAggregate(keys=[x#74, y#75, z#76], functions=[], output=[x#74, y#75, z#76]) > +- Exchange hashpartitioning(x#74, y#75, z#76, 10) > +- *(1) HashAggregate(keys=[x#74, y#75, z#76], functions=[], output=[x#74, y#75, z#76]) > +- *(1) Project [x#74, y#75, z#76] > +- *(1) Filter (isnotnull(y_n#77) && (y_n#77 = y)) > +- Scan ExistingRDD[x#74,y#75,z#76,y_n#77] > > > --- > > > == Parsed Logical Plan == > 'Filter ('y_n = y) > +- AnalysisBarrier > +- Deduplicate [x#74, y#75, z#76] > +- Project [x#74, y#75, z#76] > +- LogicalRDD [x#74, y#75, z#76, y_n#77], false > > == Analyzed Logical Plan == > x: string, y: string, z: string > Project [x#74, y#75, z#76] > +- Filter (y_n#77 = y) > +- Deduplicate [x#74, y#75, z#76] > +- Project [x#74, y#75, z#76, y_n#77] > +- LogicalRDD [x#74, y#75, z#76, y_n#77], false > > == Optimized Logical Plan == > Project [x#74, y#75, z#76] > +- Filter (isnotnull(y_n#77) && (y_n#77 = y)) > +- Aggregate [x#74, y#75, z#76], [x#74, y#75, z#76, first(y_n#77, false) AS y_n#77] > +- LogicalRDD [x#74, y#75, z#76, y_n#77], false > > == Physical Plan == > *(3) Project [x#74, y#75, z#76] > +- *(3) Filter (isnotnull(y_n#77) && (y_n#77 = y)) > +- SortAggregate(key=[x#74, y#75, z#76], functions=[first(y_n#77, false)], output=[x#74, y#75, z#76, y_n#77]) > +- *(2) Sort [x#74 ASC NULLS FIRST, y#75 ASC NULLS FIRST, z#76 ASC NULLS FIRST], false, 0 > +- Exchange hashpartitioning(x#74, y#75, z#76, 10) > +- SortAggregate(key=[x#74, y#75, z#76], functions=[partial_first(y_n#77, false)], output=[x#74, y#75, z#76, first#95, valueSet#96])
[jira] [Commented] (SPARK-27169) number of active tasks is negative on executors page
[ https://issues.apache.org/jira/browse/SPARK-27169?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16797070#comment-16797070 ] acupple commented on SPARK-27169: - Thanks for your suggestion; I will try increasing the queue size and reproducing the case. > number of active tasks is negative on executors page > > > Key: SPARK-27169 > URL: https://issues.apache.org/jira/browse/SPARK-27169 > Project: Spark > Issue Type: Bug > Components: Web UI >Affects Versions: 2.3.2 >Reporter: acupple >Priority: Minor > Attachments: QQ20190315-102215.png, QQ20190315-102235.png, > image-2019-03-19-15-17-25-522.png, image-2019-03-19-15-21-03-766.png, > job_1924.log, stage_3511.log > > > I use Spark to process some data in HDFS and HBase. I use one thread to consume > messages from a queue, and then submit them to a thread pool (16 fixed size) for Spark > processing. > But after running for some time, the active jobs grow into the thousands, and the number of > active tasks goes negative. > Actually, these jobs are already done when I check the driver logs. > 
[jira] [Commented] (SPARK-27100) dag-scheduler-event-loop" java.lang.StackOverflowError
[ https://issues.apache.org/jira/browse/SPARK-27100?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16797079#comment-16797079 ] Hyukjin Kwon commented on SPARK-27100: -- Would you be interested in narrowing down the problem, so that people can test it against the master branch? > dag-scheduler-event-loop" java.lang.StackOverflowError > -- > > Key: SPARK-27100 > URL: https://issues.apache.org/jira/browse/SPARK-27100 > Project: Spark > Issue Type: Bug > Components: MLlib >Affects Versions: 2.1.3, 2.3.3 >Reporter: KaiXu >Priority: Major > Attachments: stderr > > > ALS in Spark MLlib causes StackOverflow: > /opt/sparkml/spark213/bin/spark-submit --properties-file > /opt/HiBench/report/als/spark/conf/sparkbench/spark.conf --class > com.intel.hibench.sparkbench.ml.ALSExample --master yarn-client > --num-executors 3 --executor-memory 322g > /opt/HiBench/sparkbench/assembly/target/sparkbench-assembly-7.1-SNAPSHOT-dist.jar > --numUsers 4 --numProducts 6 --rank 100 --numRecommends 20 > --numIterations 100 --kryo false --implicitPrefs true --numProductBlocks -1 > --numUserBlocks -1 --lambda 1.0 hdfs://bdw-slave20:8020/HiBench/ALS/Input > > Exception in thread "dag-scheduler-event-loop" java.lang.StackOverflowError > at > java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1534) > at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1509) > at > java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432) > at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178) > at > java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548) > at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1509) > at > java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432) > at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178) > at java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:348) > at > 
scala.collection.immutable.List$SerializationProxy.writeObject(List.scala:468) > at sun.reflect.GeneratedMethodAccessor27.invoke(Unknown Source) > at > sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) > at java.lang.reflect.Method.invoke(Method.java:498) > at java.io.ObjectStreamClass.invokeWriteObject(ObjectStreamClass.java:1028) > at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1496) > at > java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432) > at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178) > at > java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548) > at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1509) > at > java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432) > at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178) > at > java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548) > at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1509) > at > java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432) > at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178) > at java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:348) > at > scala.collection.immutable.List$SerializationProxy.writeObject(List.scala:468) > at sun.reflect.GeneratedMethodAccessor27.invoke(Unknown Source) > at > sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) > at java.lang.reflect.Method.invoke(Method.java:498) > at java.io.ObjectStreamClass.invokeWriteObject(ObjectStreamClass.java:1028) > at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1496) > at > java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432) > at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178) > at > 
java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548) > at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1509) > at > java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432) > at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178) > at > java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548) > at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1509)
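The repeating ObjectOutputStream frames above are the signature of Java's recursive serialization walking a very long object graph, which is what an RDD lineage grown over 100 ALS iterations looks like; the usual mitigation in Spark is to cut the lineage by checkpointing (ALS exposes a checkpoint interval, and a checkpoint directory must be set on the SparkContext). The failure mode itself is easy to reproduce outside Spark; a plain-Python analogy, not Spark code:

```python
import pickle

# Build a structure nested ~2000 levels deep: a stand-in for the object
# graph behind a lineage that has grown over many iterations.
deep = []
node = deep
for _ in range(2000):
    node.append([])
    node = node[0]

# Recursive serialization walks the graph depth-first, so a long enough
# chain exhausts the stack: RecursionError here, StackOverflowError on
# the JVM's dag-scheduler-event-loop thread.
try:
    pickle.dumps(deep)
    overflowed = False
except RecursionError:
    overflowed = True
print(overflowed)
```

The depth of the graph, not its total size, is what matters, which is why the overflow appears only after many iterations.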
[jira] [Updated] (SPARK-27214) Upgrading locality level when lots of pending tasks have been waiting more than locality.wait
[ https://issues.apache.org/jira/browse/SPARK-27214?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] liupengcheng updated SPARK-27214: - Description: Currently, Spark's locality wait mechanism is not friendly for large jobs: when the number of tasks is large (e.g. 1+) and there is a large number of executors (e.g. 2000), executors may be launched on nodes where the locality is not the best (not the nodes holding the HDFS blocks). There are cases when `TaskSetManager.lastLaunchTime` is refreshed by tasks finishing within `spark.locality.wait` but at a low rate (e.g. one task finishing every `spark.locality.wait` seconds), so the locality level is never upgraded and lots of pending tasks wait a long time. In this case, when `spark.dynamicAllocation.enabled=true`, many executors may be removed by the driver for being idle, which finally slows down the job. Actually, we can optimize this with the following formula: Suppose numPendingTasks=1, localityExecutionGainFactor=0.1, probabilityOfNextLocalitySchedule=0.5 ``` maxStarvingTasks = numPendingTasks * medianOfTaskExecutionTime * localityExecutionGainFactor * probabilityOfNextLocalitySchedule / `spark.locality.wait` if (numStarvingTasks > maxStarvingTasks) { upgrading locality level... } ``` was: Currently, Spark's locality wait mechanism is not friendly for large jobs: when the number of tasks is large (e.g. 1+), there are cases when `TaskSetManager.lastLaunchTime` is refreshed by tasks finishing within `spark.locality.wait` but at a low rate (e.g. one task finishing every `spark.locality.wait` seconds), so the locality level is never upgraded and lots of pending tasks wait a long time. In this case, when `spark.dynamicAllocation.enabled=true`, many executors may be removed by the driver for being idle, which finally slows down the job. 
Actually, we can optimize this with the following formula: Suppose numPendingTasks=1, localityExecutionGainFactor=0.1, probabilityOfNextLocalitySchedule=0.5 ``` maxStarvingTasks = numPendingTasks * medianOfTaskExecutionTime * localityExecutionGainFactor * probabilityOfNextLocalitySchedule / `spark.locality.wait` if (numStarvingTasks > maxStarvingTasks) { upgrading locality level... } ``` > Upgrading locality level when lots of pending tasks have been waiting more > than locality.wait > - > > Key: SPARK-27214 > URL: https://issues.apache.org/jira/browse/SPARK-27214 > Project: Spark > Issue Type: Improvement > Components: Spark Core >Affects Versions: 2.1.0, 2.4.0 >Reporter: liupengcheng >Priority: Major > > Currently, Spark's locality wait mechanism is not friendly for large jobs: when > the number of tasks is large (e.g. 1+) and there is a large number of > executors (e.g. 2000), executors may be launched on nodes where the > locality is not the best (not the nodes holding the HDFS blocks). There are > cases when `TaskSetManager.lastLaunchTime` is refreshed by tasks finishing > within `spark.locality.wait` but at a low rate (e.g. one task finishing every > `spark.locality.wait` seconds), so the locality level is never > upgraded and lots of pending tasks wait a long time. > In this case, when `spark.dynamicAllocation.enabled=true`, many > executors may be removed by the driver for being idle, which finally slows down > the job. > Actually, we can optimize this with the following formula: > Suppose numPendingTasks=1, localityExecutionGainFactor=0.1, > probabilityOfNextLocalitySchedule=0.5 > ``` > maxStarvingTasks = numPendingTasks * medianOfTaskExecutionTime * > localityExecutionGainFactor * probabilityOfNextLocalitySchedule / > `spark.locality.wait` > if (numStarvingTasks > maxStarvingTasks) > { upgrading locality level... 
} > > ```
[jira] [Commented] (SPARK-27169) number of active tasks is negative on executors page
[ https://issues.apache.org/jira/browse/SPARK-27169?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16797059#comment-16797059 ] acupple commented on SPARK-27169: - I cannot find any "Dropping event" log, but there are some warnings about "Dropped events from appStatus". > number of active tasks is negative on executors page > > > Key: SPARK-27169 > URL: https://issues.apache.org/jira/browse/SPARK-27169 > Project: Spark > Issue Type: Bug > Components: Web UI >Affects Versions: 2.3.2 >Reporter: acupple >Priority: Minor > Attachments: QQ20190315-102215.png, QQ20190315-102235.png, > image-2019-03-19-15-17-25-522.png, image-2019-03-19-15-21-03-766.png, > job_1924.log, stage_3511.log > > > I use Spark to process some data in HDFS and HBase. I use one thread to consume > messages from a queue, and then submit them to a thread pool (16 fixed size) for Spark > processing. > But after running for some time, the active jobs grow into the thousands, and the number of > active tasks goes negative. > Actually, these jobs are already done when I check the driver logs. > 
[jira] [Created] (SPARK-27214) Upgrading locality level when lots of pending tasks have been waiting more than locality.wait
liupengcheng created SPARK-27214: Summary: Upgrading locality level when lots of pending tasks have been waiting more than locality.wait Key: SPARK-27214 URL: https://issues.apache.org/jira/browse/SPARK-27214 Project: Spark Issue Type: Improvement Components: Spark Core Affects Versions: 2.4.0, 2.1.0 Reporter: liupengcheng Currently, Spark's locality wait mechanism is not friendly for large jobs: when the number of tasks is large (e.g. 1+), there are cases when `TaskSetManager.lastLaunchTime` is refreshed by tasks finishing within `spark.locality.wait` but at a low rate (e.g. one task finishing every `spark.locality.wait` seconds), so the locality level is never upgraded and lots of pending tasks wait a long time. In this case, when `spark.dynamicAllocation.enabled=true`, many executors may be removed by the driver for being idle, which finally slows down the job. Actually, we can optimize this with the following formula: Suppose numPendingTasks=1, localityExecutionGainFactor=0.1, probabilityOfNextLocalitySchedule=0.5 ``` maxStarvingTasks = numPendingTasks * medianOfTaskExecutionTime * localityExecutionGainFactor * probabilityOfNextLocalitySchedule / `spark.locality.wait` if (numStarvingTasks > maxStarvingTasks) { upgrading locality level... } ```
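The proposed heuristic can be sketched directly. The identifiers below follow the ticket's formula; the concrete numbers are illustrative, not taken from Spark:

```python
# Sketch of the proposed locality-upgrade rule. Names mirror the ticket's
# formula; default factors are the example values from the ticket.
def should_upgrade_locality(num_pending_tasks,
                            median_task_time_s,
                            locality_wait_s,
                            num_starving_tasks,
                            locality_execution_gain_factor=0.1,
                            probability_of_next_locality_schedule=0.5):
    max_starving_tasks = (num_pending_tasks
                          * median_task_time_s
                          * locality_execution_gain_factor
                          * probability_of_next_locality_schedule
                          / locality_wait_s)
    return num_starving_tasks > max_starving_tasks

# e.g. 10000 pending tasks, 2 s median runtime, spark.locality.wait = 3 s:
# threshold = 10000 * 2 * 0.1 * 0.5 / 3 ~= 333 starving tasks
print(should_upgrade_locality(10000, 2, 3, 400))  # True
print(should_upgrade_locality(10000, 2, 3, 300))  # False
```

The rule trades a bounded amount of locality gain (the gain factor times the chance a local slot frees up) against the time already burned by starving tasks, so a large backlog forces the upgrade even when occasional task completions keep resetting `lastLaunchTime`.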
[jira] [Commented] (SPARK-27169) number of active tasks is negative on executors page
[ https://issues.apache.org/jira/browse/SPARK-27169?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16797066#comment-16797066 ] shahid commented on SPARK-27169: Yes, that means many event drops happen. Can you try increasing the queue size? "spark.scheduler.listenerbus.eventqueue.capacity" (default 1) might help. If event drops happen, then the UI displays weirdly only; I'm not sure we can do anything from the UI side. Do you have any reproducible steps for that, so that I can try? > number of active tasks is negative on executors page > > > Key: SPARK-27169 > URL: https://issues.apache.org/jira/browse/SPARK-27169 > Project: Spark > Issue Type: Bug > Components: Web UI >Affects Versions: 2.3.2 >Reporter: acupple >Priority: Minor > Attachments: QQ20190315-102215.png, QQ20190315-102235.png, > image-2019-03-19-15-17-25-522.png, image-2019-03-19-15-21-03-766.png, > job_1924.log, stage_3511.log > > > I use Spark to process some data in HDFS and HBase. I use one thread to consume > messages from a queue, and then submit them to a thread pool (16 fixed size) for Spark > processing. > But after running for some time, the active jobs grow into the thousands, and the number of > active tasks goes negative. > Actually, these jobs are already done when I check the driver logs. > 
[jira] [Comment Edited] (SPARK-27169) number of active tasks is negative on executors page
[ https://issues.apache.org/jira/browse/SPARK-27169?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16797066#comment-16797066 ] shahid edited comment on SPARK-27169 at 3/20/19 11:26 AM: -- Yes, that means many event drops happened. Can you try increasing the queue size, "spark.scheduler.listenerbus.eventqueue.capacity" (default 10000)? It might help. If event drops happen, the UI can display weirdly; I'm not sure there is anything we can do from the UI side. Do you have any reproducible steps for that, so that I can try? was (Author: shahid): Yes. that means many event drops happens. Can you try increasing the queue size, "spark.scheduler.listenerbus.eventqueue.capacity" (default 10000) might helps. If event drop happens, then UI display weirdly only, I'm not sure, from the UI side we can do anything. Do you have any reproducible steps for that, so that I can try? > number of active tasks is negative on executors page > > > Key: SPARK-27169 > URL: https://issues.apache.org/jira/browse/SPARK-27169 > Project: Spark > Issue Type: Bug > Components: Web UI >Affects Versions: 2.3.2 >Reporter: acupple >Priority: Minor > Attachments: QQ20190315-102215.png, QQ20190315-102235.png, > image-2019-03-19-15-17-25-522.png, image-2019-03-19-15-21-03-766.png, > job_1924.log, stage_3511.log > > > I use Spark to process some data in HDFS and HBase, with one thread consuming > messages from a queue and submitting them to a fixed-size thread pool (16 threads) for > Spark processing. > But after running for some time, the active jobs grow to thousands, and the number of > active tasks becomes negative. > Actually, these jobs are already done when I check the driver logs.
[jira] [Updated] (SPARK-27213) Unexpected results when filter is used after distinct
[ https://issues.apache.org/jira/browse/SPARK-27213?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Rinaz Belhaj updated SPARK-27213: - Description: The following code gives unexpected output due to the filter not getting pushed down in the catalyst optimizer. {code:java} df = spark.createDataFrame([['a','123','12.3','n'],['a','123','12.3','y'],['a','123','12.4','y']],['x','y','z','y_n']) df.show(5) df.filter("y_n='y'").select('x','y','z').distinct().show() df.select('x','y','z').distinct().filter("y_n='y'").show() {code} {panel:title=Output} |x|y|z|y_n| |a|123|12.3|n| |a|123|12.3|y| |a|123|12.4|y| |x|y|z| |a|123|12.3| |a|123|12.4| |x|y|z| |a|123|12.4| {panel} Ideally, the second statement should result in an error, since the column used in the filter is not present in the preceding select statement. Instead, the catalyst optimizer applies first() on column y_n and then applies the filter. Even if the filter had been pushed down, the result would at least have been accurate. {code:java} df = spark.createDataFrame([['a','123','12.3','n'],['a','123','12.3','y'],['a','123','12.4','y']],['x','y','z','y_n']) df.filter("y_n='y'").select('x','y','z').distinct().explain(True) df.select('x','y','z').distinct().filter("y_n='y'").explain(True) {code} {panel:title=Output} == Parsed Logical Plan == Deduplicate [x#74, y#75, z#76] +- AnalysisBarrier +- Project [x#74, y#75, z#76] +- Filter (y_n#77 = y) +- LogicalRDD [x#74, y#75, z#76, y_n#77], false == Analyzed Logical Plan == x: string, y: string, z: string Deduplicate [x#74, y#75, z#76] +- Project [x#74, y#75, z#76] +- Filter (y_n#77 = y) +- LogicalRDD [x#74, y#75, z#76, y_n#77], false == Optimized Logical Plan == Aggregate [x#74, y#75, z#76], [x#74, y#75, z#76] +- Project [x#74, y#75, z#76] +- Filter (isnotnull(y_n#77) && (y_n#77 = y)) +- LogicalRDD [x#74, y#75, z#76, y_n#77], false == Physical Plan == *(2) HashAggregate(keys=[x#74, y#75, z#76], functions=[], output=[x#74, y#75, z#76]) +- Exchange hashpartitioning(x#74, y#75, z#76, 10) +- *(1) HashAggregate(keys=[x#74, y#75, z#76], functions=[], output=[x#74, y#75, z#76]) +- *(1) Project [x#74, y#75, z#76] +- *(1) Filter (isnotnull(y_n#77) && (y_n#77 = y)) +- Scan ExistingRDD[x#74,y#75,z#76,y_n#77] --- == Parsed Logical Plan == 'Filter ('y_n = y) +- AnalysisBarrier +- Deduplicate [x#74, y#75, z#76] +- Project [x#74, y#75, z#76] +- LogicalRDD [x#74, y#75, z#76, y_n#77], false == Analyzed Logical Plan == x: string, y: string, z: string Project [x#74, y#75, z#76] +- Filter (y_n#77 = y) +- Deduplicate [x#74, y#75, z#76] +- Project [x#74, y#75, z#76, y_n#77] +- LogicalRDD [x#74, y#75, z#76, y_n#77], false == Optimized Logical Plan == Project [x#74, y#75, z#76] +- Filter (isnotnull(y_n#77) && (y_n#77 = y)) +- Aggregate [x#74, y#75, z#76], [x#74, y#75, z#76, first(y_n#77, false) AS y_n#77] +- LogicalRDD [x#74, y#75, z#76, y_n#77], false == Physical Plan == *(3) Project [x#74, y#75, z#76] +- *(3) Filter (isnotnull(y_n#77) && (y_n#77 = y)) +- SortAggregate(key=[x#74, y#75, z#76], functions=[first(y_n#77, false)], output=[x#74, y#75, z#76, y_n#77]) +- *(2) Sort [x#74 ASC NULLS FIRST, y#75 ASC NULLS FIRST, z#76 ASC NULLS FIRST], false, 0 +- Exchange hashpartitioning(x#74, y#75, z#76, 10) +- SortAggregate(key=[x#74, y#75, z#76], functions=[partial_first(y_n#77, false)], output=[x#74, y#75, z#76, first#95, valueSet#96]) +- *(1) Sort [x#74 ASC NULLS FIRST, y#75 ASC NULLS FIRST, z#76 ASC NULLS FIRST], false, 0 +- Scan ExistingRDD[x#74,y#75,z#76,y_n#77] {panel} The second query, i.e. *"df.select('x','y','z').distinct().filter("y_n='y'").explain(True)"*, should result in an error rather than giving wrong output. was: The following code gives unexpected output due to the filter not getting pushed down in catalyst optimizer. {code:java} df = spark.createDataFrame([['a','123','12.3','n'],['a','123','12.3','y'],['a','123','12.4','y']],['x','y','z','y_n']) df.show(5)
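What the optimized plan effectively computes can be mimicked in plain Python (a sketch of the semantics, not Spark code): deduplicate on (x, y, z) while keeping first(y_n) per group, then filter on that single surviving value:

```python
rows = [('a', '123', '12.3', 'n'),
        ('a', '123', '12.3', 'y'),
        ('a', '123', '12.4', 'y')]

# mimic Aggregate [x, y, z], [x, y, z, first(y_n) AS y_n], then the filter
first_y_n = {}
for x, y, z, y_n in rows:
    first_y_n.setdefault((x, y, z), y_n)  # keep the first y_n per distinct key

result = sorted(key for key, y_n in first_y_n.items() if y_n == 'y')
print(result)  # → [('a', '123', '12.4')]
```

The group ('a','123','12.3') is silently dropped because its first y_n happened to be 'n', which is exactly the surprising output the issue reports; which row is "first" is not deterministic in a distributed run.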
[jira] [Updated] (SPARK-27213) Unexpected results when filter is used after distinct
[ https://issues.apache.org/jira/browse/SPARK-27213?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Rinaz Belhaj updated SPARK-27213: - Description: The following code gives unexpected output due to the filter not getting pushed down in the catalyst optimizer. {code:java} df = spark.createDataFrame([['a','123','12.3','n'],['a','123','12.3','y'],['a','123','12.4','y']],['x','y','z','y_n']) df.show(5) df.filter("y_n='y'").select('x','y','z').distinct().show() df.select('x','y','z').distinct().filter("y_n='y'").show() {code} {panel:title=Output} |x|y|z|y_n| |a|123|12.3|n| |a|123|12.3|y| |a|123|12.4|y| |x|y|z| |a|123|12.3| |a|123|12.4| |x|y|z| |a|123|12.4| {panel} Ideally, the second statement should result in an error, since the column used in the filter is not present in the preceding select statement. Instead, the catalyst optimizer applies first() on column y_n and then applies the filter. Even if the filter had been pushed down, the result would at least have been accurate. {{df = spark.createDataFrame([['a','123','12.3','n'],['a','123','12.3','y'],['a','123','12.4','y']],['x','y','z','y_n'])}} {{df.filter("y_n='y'").select('x','y','z').distinct().explain(True)}} {{df.select('x','y','z').distinct().filter("y_n='y'").explain(True)}} {panel:title=Output} == Parsed Logical Plan == Deduplicate [x#74, y#75, z#76] +- AnalysisBarrier +- Project [x#74, y#75, z#76] +- Filter (y_n#77 = y) +- LogicalRDD [x#74, y#75, z#76, y_n#77], false == Analyzed Logical Plan == x: string, y: string, z: string Deduplicate [x#74, y#75, z#76] +- Project [x#74, y#75, z#76] +- Filter (y_n#77 = y) +- LogicalRDD [x#74, y#75, z#76, y_n#77], false == Optimized Logical Plan == Aggregate [x#74, y#75, z#76], [x#74, y#75, z#76] +- Project [x#74, y#75, z#76] +- Filter (isnotnull(y_n#77) && (y_n#77 = y)) +- LogicalRDD [x#74, y#75, z#76, y_n#77], false == Physical Plan == *(2) HashAggregate(keys=[x#74, y#75, z#76], functions=[], output=[x#74, y#75, z#76]) +- Exchange hashpartitioning(x#74, y#75, z#76, 10) +- *(1) HashAggregate(keys=[x#74, y#75, z#76], functions=[], output=[x#74, y#75, z#76]) +- *(1) Project [x#74, y#75, z#76] +- *(1) Filter (isnotnull(y_n#77) && (y_n#77 = y)) +- Scan ExistingRDD[x#74,y#75,z#76,y_n#77] --- == Parsed Logical Plan == 'Filter ('y_n = y) +- AnalysisBarrier +- Deduplicate [x#74, y#75, z#76] +- Project [x#74, y#75, z#76] +- LogicalRDD [x#74, y#75, z#76, y_n#77], false == Analyzed Logical Plan == x: string, y: string, z: string Project [x#74, y#75, z#76] +- Filter (y_n#77 = y) +- Deduplicate [x#74, y#75, z#76] +- Project [x#74, y#75, z#76, y_n#77] +- LogicalRDD [x#74, y#75, z#76, y_n#77], false == Optimized Logical Plan == Project [x#74, y#75, z#76] +- Filter (isnotnull(y_n#77) && (y_n#77 = y)) +- Aggregate [x#74, y#75, z#76], [x#74, y#75, z#76, first(y_n#77, false) AS y_n#77] +- LogicalRDD [x#74, y#75, z#76, y_n#77], false == Physical Plan == *(3) Project [x#74, y#75, z#76] +- *(3) Filter (isnotnull(y_n#77) && (y_n#77 = y)) +- SortAggregate(key=[x#74, y#75, z#76], functions=[first(y_n#77, false)], output=[x#74, y#75, z#76, y_n#77]) +- *(2) Sort [x#74 ASC NULLS FIRST, y#75 ASC NULLS FIRST, z#76 ASC NULLS FIRST], false, 0 +- Exchange hashpartitioning(x#74, y#75, z#76, 10) +- SortAggregate(key=[x#74, y#75, z#76], functions=[partial_first(y_n#77, false)], output=[x#74, y#75, z#76, first#95, valueSet#96]) +- *(1) Sort [x#74 ASC NULLS FIRST, y#75 ASC NULLS FIRST, z#76 ASC NULLS FIRST], false, 0 +- Scan ExistingRDD[x#74,y#75,z#76,y_n#77] {panel} The second query, i.e. *"df.select('x','y','z').distinct().filter("y_n='y'").explain(True)"*, should result in an error rather than giving wrong output. was: The following code gives unexpected output due to the filter not getting pushed down in catalyst optimizer. {code:java} df = spark.createDataFrame([['a','123','12.3','n'],['a','123','12.3','y'],['a','123','12.4','y']],['x','y','z','y_n'])
[jira] [Assigned] (SPARK-27200) History Environment tab must sort Configurations/Properties by default
[ https://issues.apache.org/jira/browse/SPARK-27200?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Hyukjin Kwon reassigned SPARK-27200: Assignee: Ajith S > History Environment tab must sort Configurations/Properties by default > -- > > Key: SPARK-27200 > URL: https://issues.apache.org/jira/browse/SPARK-27200 > Project: Spark > Issue Type: Bug > Components: Web UI >Affects Versions: 3.0.0 >Reporter: Ajith S >Assignee: Ajith S >Priority: Minor > > The Environment page in the Spark UI has all configurations sorted by key, but > this is not the case in the History Server; to keep the UX consistent, we can have them > sorted in the History Server too
[jira] [Resolved] (SPARK-27200) History Environment tab must sort Configurations/Properties by default
[ https://issues.apache.org/jira/browse/SPARK-27200?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Hyukjin Kwon resolved SPARK-27200. -- Resolution: Fixed Fix Version/s: 3.0.0 Issue resolved by pull request 24143 [https://github.com/apache/spark/pull/24143] > History Environment tab must sort Configurations/Properties by default > -- > > Key: SPARK-27200 > URL: https://issues.apache.org/jira/browse/SPARK-27200 > Project: Spark > Issue Type: Bug > Components: Web UI >Affects Versions: 3.0.0 >Reporter: Ajith S >Assignee: Ajith S >Priority: Minor > Fix For: 3.0.0 > > > The Environment page in the Spark UI has all configurations sorted by key, but > this is not the case in the History Server; to keep the UX consistent, we can have them > sorted in the History Server too
[jira] [Created] (SPARK-27213) Unexpected results due when filter is used after distinct
Rinaz Belhaj created SPARK-27213: Summary: Unexpected results due when filter is used after distinct Key: SPARK-27213 URL: https://issues.apache.org/jira/browse/SPARK-27213 Project: Spark Issue Type: Bug Components: PySpark Affects Versions: 2.4.0, 2.3.2 Reporter: Rinaz Belhaj The following code gives unexpected output due to the filter not getting pushed down in catalyst optimizer. {code:java} df = spark.createDataFrame([['a','123','12.3','n'],['a','123','12.3','y'],['a','123','12.4','y']],['x','y','z','y_n']) df.show(5) df.filter("y_n='y'").select('x','y','z').distinct().show() df.select('x','y','z').distinct().filter("y_n='y'").show() {code} {panel:title=Output} |x|y|z|y_n| |a|123|12.3|n| |a|123|12.3|y| |a|123|12.4|y| |x|y|z| |a|123|12.3| |a|123|12.4| |x|y|z| |a|123|12.4| {panel} Ideally, the second statement should result in an error since the column used in the filter is not present in the preceding select statement. But the catalyst optimizer is using first() on column y_n and then applying the filter. Even if the filter was pushed down, the result would have been accurate. 
{{df = spark.createDataFrame([['a','123','12.3','n'],['a','123','12.3','y'],['a','123','12.4','y']],['x','y','z','y_n'])}} {{df.filter("y_n='y'").select('x','y','z').distinct().explain(True)}} {{df.select('x','y','z').distinct().filter("y_n='y'").explain(True)}} {panel:title=Output} == Parsed Logical Plan == Deduplicate [x#74, y#75, z#76] +- AnalysisBarrier +- Project [x#74, y#75, z#76] +- Filter (y_n#77 = y) +- LogicalRDD [x#74, y#75, z#76, y_n#77], false == Analyzed Logical Plan == x: string, y: string, z: string Deduplicate [x#74, y#75, z#76] +- Project [x#74, y#75, z#76] +- Filter (y_n#77 = y) +- LogicalRDD [x#74, y#75, z#76, y_n#77], false == Optimized Logical Plan == Aggregate [x#74, y#75, z#76], [x#74, y#75, z#76] +- Project [x#74, y#75, z#76] +- Filter (isnotnull(y_n#77) && (y_n#77 = y)) +- LogicalRDD [x#74, y#75, z#76, y_n#77], false == Physical Plan == *(2) HashAggregate(keys=[x#74, y#75, z#76], functions=[], output=[x#74, y#75, z#76]) +- Exchange hashpartitioning(x#74, y#75, z#76, 10) +- *(1) HashAggregate(keys=[x#74, y#75, z#76], functions=[], output=[x#74, y#75, z#76]) +- *(1) Project [x#74, y#75, z#76] +- *(1) Filter (isnotnull(y_n#77) && (y_n#77 = y)) +- Scan ExistingRDD[x#74,y#75,z#76,y_n#77] --- == Parsed Logical Plan == 'Filter ('y_n = y) +- AnalysisBarrier +- Deduplicate [x#74, y#75, z#76] +- Project [x#74, y#75, z#76] +- LogicalRDD [x#74, y#75, z#76, y_n#77], false == Analyzed Logical Plan == x: string, y: string, z: string Project [x#74, y#75, z#76] +- Filter (y_n#77 = y) +- Deduplicate [x#74, y#75, z#76] +- Project [x#74, y#75, z#76, y_n#77] +- LogicalRDD [x#74, y#75, z#76, y_n#77], false == Optimized Logical Plan == Project [x#74, y#75, z#76] +- Filter (isnotnull(y_n#77) && (y_n#77 = y)) +- Aggregate [x#74, y#75, z#76], [x#74, y#75, z#76, first(y_n#77, false) AS y_n#77] +- LogicalRDD [x#74, y#75, z#76, y_n#77], false == Physical Plan == *(3) Project [x#74, y#75, z#76] +- *(3) Filter (isnotnull(y_n#77) && (y_n#77 = y)) +- 
SortAggregate(key=[x#74, y#75, z#76], functions=[first(y_n#77, false)], output=[x#74, y#75, z#76, y_n#77]) +- *(2) Sort [x#74 ASC NULLS FIRST, y#75 ASC NULLS FIRST, z#76 ASC NULLS FIRST], false, 0 +- Exchange hashpartitioning(x#74, y#75, z#76, 10) +- SortAggregate(key=[x#74, y#75, z#76], functions=[partial_first(y_n#77, false)], output=[x#74, y#75, z#76, first#95, valueSet#96]) +- *(1) Sort [x#74 ASC NULLS FIRST, y#75 ASC NULLS FIRST, z#76 ASC NULLS FIRST], false, 0 +- Scan ExistingRDD[x#74,y#75,z#76,y_n#77] {panel} The second query, i.e. *"df.select('x','y','z').distinct().filter("y_n='y'").explain(True)"*, should result in an error rather than giving wrong output.
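For comparison, the well-defined ordering reported as the first query (filter while y_n is still in scope, then project and deduplicate) can be sketched in plain Python; this is an illustration of the semantics, not Spark code:

```python
rows = [('a', '123', '12.3', 'n'),
        ('a', '123', '12.3', 'y'),
        ('a', '123', '12.4', 'y')]

# filter on y_n before projecting it away, then deduplicate (x, y, z)
kept = sorted({(x, y, z) for x, y, z, y_n in rows if y_n == 'y'})
print(kept)  # → [('a', '123', '12.3'), ('a', '123', '12.4')]
```

Both ('a','123','12.3') and ('a','123','12.4') survive, because every row is still carrying its own y_n when the predicate is evaluated; there is no dependence on which row a group's first() happens to pick.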
[jira] [Updated] (SPARK-27213) Unexpected results when filter is used after distinct
[ https://issues.apache.org/jira/browse/SPARK-27213?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Rinaz Belhaj updated SPARK-27213: - Summary: Unexpected results when filter is used after distinct (was: Unexpected results due when filter is used after distinct) > Unexpected results when filter is used after distinct > - > > Key: SPARK-27213 > URL: https://issues.apache.org/jira/browse/SPARK-27213 > Project: Spark > Issue Type: Bug > Components: PySpark >Affects Versions: 2.3.2, 2.4.0 >Reporter: Rinaz Belhaj >Priority: Minor > Labels: distinct, filter > > The following code gives unexpected output due to the filter not getting > pushed down in catalyst optimizer. > > {code:java} > df = > spark.createDataFrame([['a','123','12.3','n'],['a','123','12.3','y'],['a','123','12.4','y']],['x','y','z','y_n']) > df.show(5) > df.filter("y_n='y'").select('x','y','z').distinct().show() > df.select('x','y','z').distinct().filter("y_n='y'").show() > {code} > > > {panel:title=Output} > > |x|y|z|y_n| > |a|123|12.3|n| > |a|123|12.3|y| > |a|123|12.4|y| > |x|y|z| > |a|123|12.3| > |a|123|12.4| > |x|y|z| > |a|123|12.4| > > > {panel} > > Ideally, the second statement should result in an error since the column used > in the filter is not present in the preceding select statement. But the > catalyst optimizer is using first() on column y_n and then applying the > filter. > Even if the filter was pushed down, the result would have been accurate. 
> > {{df = > spark.createDataFrame([['a','123','12.3','n'],['a','123','12.3','y'],['a','123','12.4','y']],['x','y','z','y_n'])}} > {{df.filter("y_n='y'").select('x','y','z').distinct().explain(True)}} > {{df.select('x','y','z').distinct().filter("y_n='y'").explain(True)}} > > > > {panel:title=Output} > > == Parsed Logical Plan == > Deduplicate [x#74, y#75, z#76] > +- AnalysisBarrier > +- Project [x#74, y#75, z#76] > +- Filter (y_n#77 = y) > +- LogicalRDD [x#74, y#75, z#76, y_n#77], false > > == Analyzed Logical Plan == > x: string, y: string, z: string > Deduplicate [x#74, y#75, z#76] > +- Project [x#74, y#75, z#76] > +- Filter (y_n#77 = y) > +- LogicalRDD [x#74, y#75, z#76, y_n#77], false > > == Optimized Logical Plan == > Aggregate [x#74, y#75, z#76], [x#74, y#75, z#76] > +- Project [x#74, y#75, z#76] > +- Filter (isnotnull(y_n#77) && (y_n#77 = y)) > +- LogicalRDD [x#74, y#75, z#76, y_n#77], false > > == Physical Plan == > *(2) HashAggregate(keys=[x#74, y#75, z#76], functions=[], output=[x#74, y#75, > z#76]) > +- Exchange hashpartitioning(x#74, y#75, z#76, 10) > +- *(1) HashAggregate(keys=[x#74, y#75, z#76], functions=[], output=[x#74, > y#75, z#76]) > +- *(1) Project [x#74, y#75, z#76] > +- *(1) Filter (isnotnull(y_n#77) && (y_n#77 = y)) > +- Scan ExistingRDD[x#74,y#75,z#76,y_n#77] > > --- > > > == Parsed Logical Plan == > 'Filter ('y_n = y) > +- AnalysisBarrier > +- Deduplicate [x#74, y#75, z#76] > +- Project [x#74, y#75, z#76] > +- LogicalRDD [x#74, y#75, z#76, y_n#77], false > > == Analyzed Logical Plan == > x: string, y: string, z: string > Project [x#74, y#75, z#76] > +- Filter (y_n#77 = y) > +- Deduplicate [x#74, y#75, z#76] > +- Project [x#74, y#75, z#76, y_n#77] > +- LogicalRDD [x#74, y#75, z#76, y_n#77], false > > == Optimized Logical Plan == > Project [x#74, y#75, z#76] > +- Filter (isnotnull(y_n#77) && (y_n#77 = y)) > +- Aggregate [x#74, y#75, z#76], [x#74, y#75, z#76, first(y_n#77, false) AS > y_n#77] > +- LogicalRDD [x#74, y#75, z#76, y_n#77], false 
> > == Physical Plan == > *(3) Project [x#74, y#75, z#76] > +- *(3) Filter (isnotnull(y_n#77) && (y_n#77 = y)) > +- SortAggregate(key=[x#74, y#75, z#76], functions=[first(y_n#77, false)], > output=[x#74, y#75, z#76, y_n#77]) > +- *(2) Sort [x#74 ASC NULLS FIRST, y#75 ASC NULLS FIRST, z#76 ASC NULLS > FIRST], false, 0 > +- Exchange hashpartitioning(x#74, y#75, z#76, 10) > +- SortAggregate(key=[x#74, y#75, z#76], functions=[partial_first(y_n#77, > false)], output=[x#74, y#75, z#76, first#95, valueSet#96]) > +- *(1) Sort [x#74 ASC NULLS FIRST, y#75 ASC NULLS FIRST, z#76 ASC NULLS > FIRST], false, 0 > +- Scan ExistingRDD[x#74,y#75,z#76,y_n#77] > > {panel} > > > The second query, i.e. > *"df.select('x','y','z').distinct().filter("y_n='y'").explain(True)"*, should > result in an error rather than giving wrong output.
[jira] [Assigned] (SPARK-27212) Eliminate TimeZone to ZoneId conversion in stringToTimestamp
[ https://issues.apache.org/jira/browse/SPARK-27212?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Apache Spark reassigned SPARK-27212: Assignee: (was: Apache Spark) > Eliminate TimeZone to ZoneId conversion in stringToTimestamp > > > Key: SPARK-27212 > URL: https://issues.apache.org/jira/browse/SPARK-27212 > Project: Spark > Issue Type: Improvement > Components: SQL >Affects Versions: 2.4.0 >Reporter: Maxim Gekk >Priority: Minor > > The stringToTimestamp method of DateTimeUtils (and stringToDate as well) can > be called for each row, and the method converts TimeZone to ZoneId each time. > The operation is relatively expensive because it does an intermediate conversion > to a string: > http://hg.openjdk.java.net/jdk8u/jdk8u/jdk/file/f940e7a48b72/src/share/classes/java/util/TimeZone.java#l547 > The conversion is unnecessary and could be avoided. This ticket aims to change the > signature of stringToTimestamp to take a ZoneId parameter.
[jira] [Assigned] (SPARK-27212) Eliminate TimeZone to ZoneId conversion in stringToTimestamp
[ https://issues.apache.org/jira/browse/SPARK-27212?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Apache Spark reassigned SPARK-27212: Assignee: Apache Spark > Eliminate TimeZone to ZoneId conversion in stringToTimestamp > > > Key: SPARK-27212 > URL: https://issues.apache.org/jira/browse/SPARK-27212 > Project: Spark > Issue Type: Improvement > Components: SQL >Affects Versions: 2.4.0 >Reporter: Maxim Gekk >Assignee: Apache Spark >Priority: Minor > > The stringToTimestamp method of DateTimeUtils (and stringToDate as well) can > be called per each row. And the method converts TimeZone to ZoneId each time. > The operation is relatively expensive because it does intermediate conversion > to a string: > http://hg.openjdk.java.net/jdk8u/jdk8u/jdk/file/f940e7a48b72/src/share/classes/java/util/TimeZone.java#l547 > The conversion is unnecessary, and could be avoid. The ticket aims to replace > signature of stringToTimestamp to require ZoneId as a parameter. -- This message was sent by Atlassian JIRA (v7.6.3#76005) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Created] (SPARK-27212) Eliminate TimeZone to ZoneId conversion in stringToTimestamp
Maxim Gekk created SPARK-27212: -- Summary: Eliminate TimeZone to ZoneId conversion in stringToTimestamp Key: SPARK-27212 URL: https://issues.apache.org/jira/browse/SPARK-27212 Project: Spark Issue Type: Improvement Components: SQL Affects Versions: 2.4.0 Reporter: Maxim Gekk The stringToTimestamp method of DateTimeUtils (and stringToDate as well) can be called for each row, and the method converts TimeZone to ZoneId each time. The operation is relatively expensive because it does an intermediate conversion to a string: http://hg.openjdk.java.net/jdk8u/jdk8u/jdk/file/f940e7a48b72/src/share/classes/java/util/TimeZone.java#l547 The conversion is unnecessary and could be avoided. This ticket aims to change the signature of stringToTimestamp to take a ZoneId parameter. -- This message was sent by Atlassian JIRA (v7.6.3#76005) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
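[Editorial note] The optimization SPARK-27212 proposes can be sketched in plain Java: hoist the TimeZone-to-ZoneId conversion out of the per-row path and pass the ZoneId in directly. The helper name `toEpochSecond` below is hypothetical, standing in for Spark's `stringToTimestamp`; it is not Spark's actual code.

```java
import java.time.LocalDateTime;
import java.time.ZoneId;
import java.util.TimeZone;

public class ZoneIdHoisting {
    // Hypothetical per-row parse helper, mirroring the proposed signature
    // change: it takes a pre-computed ZoneId instead of a TimeZone.
    static long toEpochSecond(String isoLocalDateTime, ZoneId zone) {
        return LocalDateTime.parse(isoLocalDateTime).atZone(zone).toEpochSecond();
    }

    public static void main(String[] args) {
        TimeZone tz = TimeZone.getTimeZone("UTC");

        // Costly pattern: TimeZone -> ZoneId conversion repeated per row
        // (tz.toZoneId() goes through the time-zone ID string each call).
        long perRow = 0;
        for (int i = 0; i < 3; i++) {
            perRow = toEpochSecond("2019-03-20T00:00:00", tz.toZoneId());
        }

        // Cheap pattern: convert once, then reuse the ZoneId for every row.
        ZoneId zone = tz.toZoneId();
        long hoisted = toEpochSecond("2019-03-20T00:00:00", zone);

        System.out.println(perRow == hoisted); // same result, conversion done once
    }
}
```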
[jira] [Commented] (SPARK-27169) number of active tasks is negative on executors page
[ https://issues.apache.org/jira/browse/SPARK-27169?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16797040#comment-16797040 ] shahid commented on SPARK-27169: Hi, from the above log we can't tell whether an event drop has happened. Could you please check whether the phrase "Dropping event from queue" appears in the driver log? > number of active tasks is negative on executors page > > > Key: SPARK-27169 > URL: https://issues.apache.org/jira/browse/SPARK-27169 > Project: Spark > Issue Type: Bug > Components: Web UI >Affects Versions: 2.3.2 >Reporter: acupple >Priority: Minor > Attachments: QQ20190315-102215.png, QQ20190315-102235.png, > image-2019-03-19-15-17-25-522.png, image-2019-03-19-15-21-03-766.png, > job_1924.log, stage_3511.log > > > I use Spark to process some data in HDFS and HBase. I use one thread to consume > messages from a queue, and then submit them to a fixed-size thread pool (16 threads) for Spark > processing. > But after running for some time, the active jobs number in the thousands, and the number of > active tasks is negative. > Actually, these jobs are already done when I check the driver logs. > -- This message was sent by Atlassian JIRA (v7.6.3#76005) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Commented] (SPARK-27203) Spark Fails to read a view using CTE (WITH clause) and created via beeline
[ https://issues.apache.org/jira/browse/SPARK-27203?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16796958#comment-16796958 ] Igor Ngouagna commented on SPARK-27203: --- I thought "UNION ALL" was part of the issue here, but no.. if I remove it and just select from one table, I still got a compil error: {code:bash} Error: Error while compiling statement: FAILED: SemanticException line 1:593 missing ) at 'NULLS' near 'LAST' in subquery source line 1:599 missing ) at 'LAST' near 'LAST' in subquery source line 1:604 missing ) at 'ROWS' near 'LAST' in subquery source in definition of VIEW test_cte_view [ SELECT `gen_attr_0` AS `id`, `gen_attr_1` AS `status`, `gen_attr_2` AS `idate` FROM (SELECT `gen_attr_0`, `gen_attr_1`, `gen_attr_2` FROM (SELECT `gen_attr_0`, `gen_attr_1`, `gen_attr_2` FROM (SELECT `id` AS `gen_attr_0`, `status` AS `gen_attr_1`, `idate` AS `gen_attr_2` FROM `db`.`test_cte`) AS gen_subquery_0) AS cte INNER JOIN (SELECT `gen_attr_3`, `gen_attr_4`, `gen_attr_5` FROM (SELECT `gen_attr_3`, `gen_attr_4`, `gen_attr_5` FROM (SELECT gen_subquery_2.`gen_attr_3`, gen_subquery_2.`gen_attr_4`, row_number() OVER (PARTITION BY `gen_attr_3` ORDER BY `gen_attr_4` DESC NULLS LAST ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW) AS `gen_attr_5` FROM (SELECT `gen_attr_3`, `gen_attr_4` FROM (SELECT `gen_attr_3`, `gen_attr_6`, `gen_attr_4` FROM (SELECT `id` AS `gen_attr_3`, `status` AS `gen_attr_6`, `idate` AS `gen_attr_4` FROM `db`.`test_cte`) AS gen_subquery_1) AS cte) AS gen_subquery_2) AS gen_subquery_3) AS tmp WHERE (`gen_attr_5` = 1)) AS tmp_2 ON ((`gen_attr_0` = `gen_attr_3`) AND (`gen_attr_2` = `gen_attr_4`))) AS cte ] used as test_cte_view at Line 1:14 (state=42000,code=4) {code} > Spark Fails to read a view using CTE (WITH clause) and created via beeline > --- > > Key: SPARK-27203 > URL: https://issues.apache.org/jira/browse/SPARK-27203 > Project: Spark > Issue Type: Bug > Components: SQL >Affects Versions: 2.1.1 >Reporter: Igor Ngouagna 
>Priority: Major > > Spark fails when trying to read a view which code involve CTE, and which is > created via beeline. > For example, considering the following view, created via Beeline: > {code:sql} > create view db.test as > with q1 as (select 1 as n) > select n from q1 > {code} > When you do > {code:java} > spark.sql("select * from db.test").show() > {code} > The output is like > {code} > 'Table or view not found: q1; line 2 pos 14' > Traceback (most recent call last): > File > "/DATA/fs11/hadoop/yarn/local/usercache/ingouagn/appcache/application_1552973526615_3878/container_e380_1552973526615_3878_01_01/pyspark.zip/pyspark/sql/session.py", > line 545, in sql > return DataFrame(self._jsparkSession.sql(sqlQuery), self._wrapped) > File > "/DATA/fs11/hadoop/yarn/local/usercache/ingouagn/appcache/application_1552973526615_3878/container_e380_1552973526615_3878_01_01/py4j-0.10.4-src.zip/py4j/java_gateway.py", > line 1133, in __call__ > answer, self.gateway_client, self.target_id, self.name) > File > "/DATA/fs11/hadoop/yarn/local/usercache/ingouagn/appcache/application_1552973526615_3878/container_e380_1552973526615_3878_01_01/pyspark.zip/pyspark/sql/utils.py", > line 69, in deco > raise AnalysisException(s.split(': ', 1)[1], stackTrace) > pyspark.sql.utils.AnalysisException: 'Table or view not found: q1; line 2 pos > 14' > {code} > > *Spark: 2.1.1* > *Beeline: 1.2.1000* > -- This message was sent by Atlassian JIRA (v7.6.3#76005) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Commented] (SPARK-27203) Spark Fails to read a view using CTE (WITH clause) and created via beeline
[ https://issues.apache.org/jira/browse/SPARK-27203?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16796946#comment-16796946 ] Igor Ngouagna commented on SPARK-27203: --- Furthermore, I noticed something weird when I tried to create the view from Spark. If the view code is basic (like the one above), everything works well; that is, the view is readable from both Spark and Beeline. However, if the view code is a little more complex, and the view is *created via spark.sql*, reading it from spark.sql is OK, but *reading it from Beeline fails*! For example, consider the following view created via Spark SQL: View Creation {code:sql} spark.sql("CREATE VIEW IF NOT EXISTS db.test_cte_view AS\ with cte as (select * from db.test_cte union all select * from db.test_cte_2),\ tmp as (SELECT id, idate, ROW_NUMBER() over(PARTITION BY id ORDER BY idate desc ) AS row_num from cte)\ SELECT cte.* from cte\ join (SELECT * from tmp where tmp.row_num =1) tmp_2\ on cte.id = tmp_2.id\ and cte.idate = tmp_2.idate") {code} When you do {code:sql} beeline> select * from db.test_cte_view; {code} the output is like {code} Error: Error while compiling statement: FAILED: SemanticException line 1:330 Failed to recognize predicate 'UNION'. 
Failed rule: 'identifier' in subquery source in definition of VIEW test_cte_view [ SELECT `gen_attr_0` AS `id`, `gen_attr_1` AS `status`, `gen_attr_2` AS `idate` FROM (SELECT `gen_attr_0`, `gen_attr_1`, `gen_attr_2` FROM ((SELECT `gen_attr_0`, `gen_attr_1`, `gen_attr_2` FROM (SELECT `id` AS `gen_attr_0`, `status` AS `gen_attr_1`, `idate` AS `gen_attr_2` FROM `db`.`test_cte`) AS gen_subquery_0) UNION ALL (SELECT `gen_attr_5`, `gen_attr_6`, `gen_attr_7` FROM (SELECT `id` AS `gen_attr_5`, `status` AS `gen_attr_6`, `idate` AS `gen_attr_7` FROM `db`.`test_cte_2`) AS gen_subquery_1)) AS cte INNER JOIN (SELECT `gen_attr_3`, `gen_attr_4`, `gen_attr_8` FROM (SELECT `gen_attr_3`, `gen_attr_4`, `gen_attr_8` FROM (SELECT gen_subquery_4.`gen_attr_3`, gen_subquery_4.`gen_attr_4`, row_number() OVER (PARTITION BY `gen_attr_3` ORDER BY `gen_attr_4` DESC NULLS LAST ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW) AS `gen_attr_8` FROM (SELECT `gen_attr_3`, `gen_attr_4` FROM ((SELECT `gen_attr_3`, `gen_attr_9`, `gen_attr_4` FROM (SELECT `id` AS `gen_attr_3`, `status` AS `gen_attr_9`, `idate` AS `gen_attr_4` FROM `db`.`test_cte`) AS gen_subquery_2) UNION ALL (SELECT `gen_attr_5`, `gen_attr_6`, `gen_attr_7` FROM (SELECT `id` AS `gen_attr_5`, `status` AS `gen_attr_6`, `idate` AS `gen_attr_7` FROM `db`.`test_cte_2`) AS gen_subquery_3)) AS cte) AS gen_subquery_4) AS gen_subquery_5) AS tmp WHERE (`gen_attr_8` = 1)) AS tmp_2 ON ((`gen_attr_0` = `gen_attr_3`) AND (`gen_attr_2` = `gen_attr_4`))) AS cte ] used as test_cte_view at Line 1:14 (state=42000,code=4) {code} *Tables for test*: {code:sql} CREATE TABLE db.test_cte( id int, status string, idate date ) CREATE TABLE db.test_cte_2( id int, status string, idate date ) {code} > Spark Fails to read a view using CTE (WITH clause) and created via beeline > --- > > Key: SPARK-27203 > URL: https://issues.apache.org/jira/browse/SPARK-27203 > Project: Spark > Issue Type: Bug > Components: SQL >Affects Versions: 2.1.1 >Reporter: Igor Ngouagna 
>Priority: Major > > Spark fails when trying to read a view which code involve CTE, and which is > created via beeline. > For example, considering the following view, created via Beeline: > {code:sql} > create view db.test as > with q1 as (select 1 as n) > select n from q1 > {code} > When you do > {code:java} > spark.sql("select * from db.test").show() > {code} > The output is like > {code} > 'Table or view not found: q1; line 2 pos 14' > Traceback (most recent call last): > File > "/DATA/fs11/hadoop/yarn/local/usercache/ingouagn/appcache/application_1552973526615_3878/container_e380_1552973526615_3878_01_01/pyspark.zip/pyspark/sql/session.py", > line 545, in sql > return DataFrame(self._jsparkSession.sql(sqlQuery), self._wrapped) > File > "/DATA/fs11/hadoop/yarn/local/usercache/ingouagn/appcache/application_1552973526615_3878/container_e380_1552973526615_3878_01_01/py4j-0.10.4-src.zip/py4j/java_gateway.py", > line 1133, in __call__ > answer, self.gateway_client, self.target_id, self.name) > File > "/DATA/fs11/hadoop/yarn/local/usercache/ingouagn/appcache/application_1552973526615_3878/container_e380_1552973526615_3878_01_01/pyspark.zip/pyspark/sql/utils.py", > line 69, in deco > raise AnalysisException(s.split(': ', 1)[1], stackTrace) > pyspark.sql.utils.AnalysisException: 'Table or view not found: q1; line 2 pos > 14' > {code} > > *Spark: 2.1.1* > *Beeline: 1.2.1000* > -- This message was sent
[jira] [Commented] (SPARK-26606) parameters passed in extraJavaOptions are not being picked up
[ https://issues.apache.org/jira/browse/SPARK-26606?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16796924#comment-16796924 ] Mateusz Kaczor commented on SPARK-26606: I think I've come across a similar issue (at least with the driver; it works fine for executors in my case). I've even posted a question on Stack Overflow: [https://stackoverflow.com/questions/55244273/spark-2-4-0-submit-in-cluster-mode-why-is-rest-submission-server-required] To sum up the problem: Spark version 2.4.0, *standalone* cluster. I'm submitting the app using spark-submit; in all cases exactly the same script is used, just changing the master port and deploy mode. I want to pass some extraJavaOptions to the driver, hence I'm using the spark.driver.extraJavaOptions property (--conf "spark.driver.extraJavaOptions=-Dfoo=BAR"). I assume the variable was properly passed if it's listed in the System Properties table in the Environment tab of the app UI (the one running on port 4040). Here is what I've observed: ||Deploy mode||Deploy to 7077 (regular way)||Deploy to 6066 (via REST)|| |Client|Variables are passed correctly|N/A| |Cluster|*{color:#ff}Variables are not passed{color}*|Variables are passed correctly| All in all, it looks to me that if we want to pass system variables in cluster mode *we have to* deploy via REST. I consider it a bug; please correct me if I'm wrong. > parameters passed in extraJavaOptions are not being picked up > -- > > Key: SPARK-26606 > URL: https://issues.apache.org/jira/browse/SPARK-26606 > Project: Spark > Issue Type: Bug > Components: Spark Submit >Affects Versions: 2.3.1 >Reporter: Ravindra >Priority: Major > Labels: java, spark > > driver.extraJavaOptions and executor.extraJavaOptions are not being picked up > . Even though I see the parameters are being passed fine in the spark launch > command I do not see these parameters are being picked up for some unknown > reason. 
My source code throws an error stating the java params are empty > > This is my spark submit command: > output=`spark-submit \ > --class com.demo.myApp.App \ > --conf 'spark.executor.extraJavaOptions=-Dapp.env=dev -Dapp.country=US > -Dapp.banner=ABC -Doracle.net.tns_admin=/work/artifacts/oracle/current > -Djava.security.egd=[file:/dev/./urandom|file:///dev/urandom]' \ > --conf 'spark.driver.extraJavaOptions=-Dapp.env=dev -Dapp.country=US > -Dapp.banner=ABC -Doracle.net.tns_admin=/work/artifacts/oracle/current > -Djava.security.egd=[file:/dev/./urandom|file:///dev/urandom]' \ > --executor-memory "$EXECUTOR_MEMORY" \ > --executor-cores "$EXECUTOR_CORES" \ > --total-executor-cores "$TOTAL_CORES" \ > --driver-memory "$DRIVER_MEMORY" \ > --deploy-mode cluster \ > /home/spark/asm//current/myapp-*.jar 2>&1 &` > > > Is there any other way I can access the java params with out using > extraJavaOptions. -- This message was sent by Atlassian JIRA (v7.6.3#76005) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
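[Editorial note] The failure mode both reporters describe (the driver JVM never receiving the `-D` parameters) is easiest to diagnose on the application side by failing loudly when an expected property is missing, rather than proceeding with empty values. A minimal sketch, using the `app.env` property name from the report; the check itself is a generic pattern, not part of Spark:

```java
public class AppConfig {
    // Read a -D system property that is expected to arrive via
    // spark.driver.extraJavaOptions; fail loudly if the pass-through
    // silently dropped it (e.g. cluster mode submitted to port 7077).
    static String requireProperty(String name) {
        String value = System.getProperty(name);
        if (value == null) {
            throw new IllegalStateException(
                name + " not set: check that extraJavaOptions reached this JVM");
        }
        return value;
    }

    public static void main(String[] args) {
        System.out.println("app.env=" + requireProperty("app.env"));
    }
}
```

Run with `java -Dapp.env=dev AppConfig` to see the property arrive; without the `-D` flag the exception pinpoints the missing pass-through immediately.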
[jira] [Resolved] (SPARK-27099) Expose xxHash64 as a flexible 64-bit column hash like `hash`
[ https://issues.apache.org/jira/browse/SPARK-27099?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Wenchen Fan resolved SPARK-27099. - Resolution: Fixed Fix Version/s: 3.0.0 Issue resolved by pull request 24019 [https://github.com/apache/spark/pull/24019] > Expose xxHash64 as a flexible 64-bit column hash like `hash` > > > Key: SPARK-27099 > URL: https://issues.apache.org/jira/browse/SPARK-27099 > Project: Spark > Issue Type: Improvement > Components: SQL >Affects Versions: 2.3.3, 2.4.0 >Reporter: Huon Wilson >Assignee: Huon Wilson >Priority: Major > Fix For: 3.0.0 > > > I’m working on something that requires deterministic randomness, i.e. a row > gets the same “random” value no matter the order of the DataFrame. A seeded > hash seems to be the perfect way to do this, but the existing hashes have > various limitations: > - hash: 32-bit output (only 4 billion possibilities will result in a lot of > collisions for many tables: the birthday paradox implies >50% chance of at > least one for tables larger than 77000 rows, and likely ~1.6 billion > collisions in a table of size 4 billion) > - sha1/sha2/md5: single binary column input, string output > It seems there’s already support for a 64-bit hash function that can work > with an arbitrary number of arbitrary-typed columns (XxHash64), which could > be exposed as xxHash64 or xxhash64 (or similar). -- This message was sent by Atlassian JIRA (v7.6.3#76005) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Assigned] (SPARK-27099) Expose xxHash64 as a flexible 64-bit column hash like `hash`
[ https://issues.apache.org/jira/browse/SPARK-27099?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Wenchen Fan reassigned SPARK-27099: --- Assignee: Huon Wilson > Expose xxHash64 as a flexible 64-bit column hash like `hash` > > > Key: SPARK-27099 > URL: https://issues.apache.org/jira/browse/SPARK-27099 > Project: Spark > Issue Type: Improvement > Components: SQL >Affects Versions: 2.3.3, 2.4.0 >Reporter: Huon Wilson >Assignee: Huon Wilson >Priority: Major > > I’m working on something that requires deterministic randomness, i.e. a row > gets the same “random” value no matter the order of the DataFrame. A seeded > hash seems to be the perfect way to do this, but the existing hashes have > various limitations: > - hash: 32-bit output (only 4 billion possibilities will result in a lot of > collisions for many tables: the birthday paradox implies >50% chance of at > least one for tables larger than 77000 rows, and likely ~1.6 billion > collisions in a table of size 4 billion) > - sha1/sha2/md5: single binary column input, string output > It seems there’s already support for a 64-bit hash function that can work > with an arbitrary number of arbitrary-typed columns (XxHash64), which could > be exposed as xxHash64 or xxhash64 (or similar). -- This message was sent by Atlassian JIRA (v7.6.3#76005) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
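[Editorial note] The collision figures in the SPARK-27099 description follow from the standard birthday-paradox approximation p ≈ 1 − exp(−n²/2N). A short sketch checking the 32-bit vs 64-bit numbers (the approximation is mine, not code from the ticket):

```java
public class BirthdayBound {
    // Approximate probability of at least one collision among n uniform
    // hashes drawn from a space of 2^spaceBits values:
    // p ≈ 1 - exp(-n^2 / (2N)).
    static double collisionProbability(double n, int spaceBits) {
        double space = Math.pow(2.0, spaceBits);
        return 1.0 - Math.exp(-(n * n) / (2.0 * space));
    }

    public static void main(String[] args) {
        // 32-bit hash, 77,000 rows: roughly a coin flip that at least one
        // collision occurs, matching the ">50% ... 77000 rows" figure.
        double p32 = collisionProbability(77_000, 32);
        // 64-bit hash, same table: collisions are vanishingly unlikely.
        double p64 = collisionProbability(77_000, 64);
        System.out.printf("32-bit: %.3f, 64-bit: %.2e%n", p32, p64);
    }
}
```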
[jira] [Created] (SPARK-27211) cast error when select column from Row
Guiju Zhang created SPARK-27211: --- Summary: cast error when select column from Row Key: SPARK-27211 URL: https://issues.apache.org/jira/browse/SPARK-27211 Project: Spark Issue Type: Question Components: Java API Affects Versions: 2.3.1, 2.3.0 Reporter: Guiju Zhang (1) RawLogPlayload has an field: long timestamp (2) extractedRawTc.printSchema(); // output1 Dataset extractedRawW3cFilled = extractedRawW3c.alias("extractedRawW3c") .join(extractedRawTc.alias("extractedRawTc"), functions.col("extractedRawW3c.rawsessionid").equalTo(functions.col("extractedRawTc.rawsessionid")), "inner") .select(functions.col("extractedRawW3c.df_logdatetime"), functions.col("extractedRawW3c.rawsessionid"), functions.col("extractedRawTc.uid"), functions.col("extractedRawW3c.time"),functions.col("extractedRawW3c.T"),functions.col("extractedRawW3c.url"),functions.col("extractedRawW3c.wid"), functions.col("extractedRawW3c.tid"), functions.col("extractedRawW3c.fid"),functions.col("extractedRawW3c.string1"), functions.col("extractedRawW3c.curWindow"), *functions.col("extractedRawW3c.timestamp")*) .as(Encoders.bean(RawLogPayload.class)); extractedRawW3cFilled.printSchema(); // output2 (4) cast exception 2019-03-20 15:28:31 ERROR CodeGenerator:91 ## failed to compile: org.codehaus.commons.compiler.CompileException: File 'generated.java', Line 103, Column 32: No applicable constructor/method found for actual parameters "org.apache.spark.unsafe.types.UTF8String"; candidates are: "public void com.microsoft.datamining.spartan.api.core.RawLogPayload.setTimestamp(long)" org.codehaus.commons.compiler.CompileException: File 'generated.java', Line 103, Column 32: *No applicable constructor/method found for actual parameters "org.apache.spark.unsafe.types.UTF8String"; candidates are: "public void com.****.****.spartan.api.core.RawLogPayload.setTimestamp(long)"* at org.codehaus.janino.UnitCompiler.compileError(UnitCompiler.java:11821) at 
org.codehaus.janino.UnitCompiler.findMostSpecificIInvocable(UnitCompiler.java:8910) Output1 extractedRawTc schema root |-- curWindow: string (nullable = true) |-- df_logdatetime: string (nullable = true) |-- fid: string (nullable = true) |-- rawsessionid: string (nullable = true) |-- string1: string (nullable = true) |-- t: string (nullable = true) |-- tid: string (nullable = true) |-- time: string (nullable = true) |-- *timestamp: long (nullable = true)* |-- uid: string (nullable = true) |-- url: string (nullable = true) |-- wid: string (nullable = true) Output2 extractedRawW3cFilled schema root |-- df_logdatetime: string (nullable = true) |-- rawsessionid: string (nullable = true) |-- uid: string (nullable = true) |-- time: string (nullable = true) |-- T: string (nullable = true) |-- url: string (nullable = true) |-- wid: string (nullable = true) |-- tid: string (nullable = true) |-- fid: string (nullable = true) |-- string1: string (nullable = true) |-- curWindow: string (nullable = true) |-- *timestamp: long (nullable = true)* -- This message was sent by Atlassian JIRA (v7.6.3#76005) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Updated] (SPARK-27211) cast error when select column from Row
[ https://issues.apache.org/jira/browse/SPARK-27211?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Guiju Zhang updated SPARK-27211: Description: 1.First, I have an object RawLogPlayload which has an field: long timestamp 2.Then I try to join two Dataset and select some of the columns Following is the code Snippet extractedRawTc.printSchema(); // output1 Dataset extractedRawW3cFilled = extractedRawW3c.alias("extractedRawW3c") .join(extractedRawTc.alias("extractedRawTc"), functions.col("extractedRawW3c.rawsessionid").equalTo(functions.col("extractedRawTc.rawsessionid")), "inner") .select(functions.col("extractedRawW3c.df_logdatetime"), functions.col("extractedRawW3c.rawsessionid"), functions.col("extractedRawTc.uid"), functions.col("extractedRawW3c.time"),functions.col("extractedRawW3c.T"),functions.col("extractedRawW3c.url"),functions.col("extractedRawW3c.wid"), functions.col("extractedRawW3c.tid"), functions.col("extractedRawW3c.fid"),functions.col("extractedRawW3c.string1"), functions.col("extractedRawW3c.curWindow"), *functions.col("extractedRawW3c.timestamp")*) .as(Encoders.bean(RawLogPayload.class)); extractedRawW3cFilled.printSchema(); // output2 3. 
After run this, it will cast following exception 2019-03-20 15:28:31 ERROR CodeGenerator:91 ## failed to compile: org.codehaus.commons.compiler.CompileException: File 'generated.java', Line 103, Column 32: No applicable constructor/method found for actual parameters "org.apache.spark.unsafe.types.UTF8String"; candidates are: "public void com.microsoft.datamining.spartan.api.core.RawLogPayload.setTimestamp(long)" org.codehaus.commons.compiler.CompileException: File 'generated.java', Line 103, Column 32: *No applicable constructor/method found for actual parameters "org.apache.spark.unsafe.types.UTF8String"; candidates are: "public void com.****.****.spartan.api.core.RawLogPayload.setTimestamp(long)"* at org.codehaus.janino.UnitCompiler.compileError(UnitCompiler.java:11821) at org.codehaus.janino.UnitCompiler.findMostSpecificIInvocable(UnitCompiler.java:8910) Output1 extractedRawTc schema root |-- curWindow: string (nullable = true) |-- df_logdatetime: string (nullable = true) |-- fid: string (nullable = true) |-- rawsessionid: string (nullable = true) |-- string1: string (nullable = true) |-- t: string (nullable = true) |-- tid: string (nullable = true) |-- time: string (nullable = true) |-- *timestamp: long (nullable = true)* |-- uid: string (nullable = true) |-- url: string (nullable = true) |-- wid: string (nullable = true) Output2 extractedRawW3cFilled schema root |-- df_logdatetime: string (nullable = true) |-- rawsessionid: string (nullable = true) |-- uid: string (nullable = true) |-- time: string (nullable = true) |-- T: string (nullable = true) |-- url: string (nullable = true) |-- wid: string (nullable = true) |-- tid: string (nullable = true) |-- fid: string (nullable = true) |-- string1: string (nullable = true) |-- curWindow: string (nullable = true) |-- *timestamp: long (nullable = true)* My question: the schema of column timestamp is long, but from the exception log, it seems after selecting the datatype of timestamp becomes UTF8String, Why would this 
happen? Is it a bug? If not could you point how to use it correctly? Thanks was: First, I have an object RawLogPlayload which has an field: long timestamp Then I try to join two Dataset and select some of the columns Following is the code Snippet extractedRawTc.printSchema(); // output1 Dataset extractedRawW3cFilled = extractedRawW3c.alias("extractedRawW3c") .join(extractedRawTc.alias("extractedRawTc"), functions.col("extractedRawW3c.rawsessionid").equalTo(functions.col("extractedRawTc.rawsessionid")), "inner") .select(functions.col("extractedRawW3c.df_logdatetime"), functions.col("extractedRawW3c.rawsessionid"), functions.col("extractedRawTc.uid"), functions.col("extractedRawW3c.time"),functions.col("extractedRawW3c.T"),functions.col("extractedRawW3c.url"),functions.col("extractedRawW3c.wid"), functions.col("extractedRawW3c.tid"), functions.col("extractedRawW3c.fid"),functions.col("extractedRawW3c.string1"), functions.col("extractedRawW3c.curWindow"), *functions.col("extractedRawW3c.timestamp")*) .as(Encoders.bean(RawLogPayload.class)); extractedRawW3cFilled.printSchema(); // output2 After run this, it will cast following exception 2019-03-20 15:28:31 ERROR CodeGenerator:91 ## failed to compile: org.codehaus.commons.compiler.CompileException: File 'generated.java', Line 103, Column 32: No applicable constructor/method found for actual parameters "org.apache.spark.unsafe.types.UTF8String"; candidates are: "public void com.microsoft.datamining.spartan.api.core.RawLogPayload.setTimestamp(long)" org.codehaus.commons.compiler.CompileException: File 'generated.java', Line 103, Column 32:
[jira] [Updated] (SPARK-27211) cast error when select column from Row
[ https://issues.apache.org/jira/browse/SPARK-27211?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Guiju Zhang updated SPARK-27211: Description: First, I have an object RawLogPlayload which has an field: long timestamp Then I try to join two Dataset and select some of the columns Following is the code Snippet extractedRawTc.printSchema(); // output1 Dataset extractedRawW3cFilled = extractedRawW3c.alias("extractedRawW3c") .join(extractedRawTc.alias("extractedRawTc"), functions.col("extractedRawW3c.rawsessionid").equalTo(functions.col("extractedRawTc.rawsessionid")), "inner") .select(functions.col("extractedRawW3c.df_logdatetime"), functions.col("extractedRawW3c.rawsessionid"), functions.col("extractedRawTc.uid"), functions.col("extractedRawW3c.time"),functions.col("extractedRawW3c.T"),functions.col("extractedRawW3c.url"),functions.col("extractedRawW3c.wid"), functions.col("extractedRawW3c.tid"), functions.col("extractedRawW3c.fid"),functions.col("extractedRawW3c.string1"), functions.col("extractedRawW3c.curWindow"), *functions.col("extractedRawW3c.timestamp")*) .as(Encoders.bean(RawLogPayload.class)); extractedRawW3cFilled.printSchema(); // output2 After run this, it will cast following exception 2019-03-20 15:28:31 ERROR CodeGenerator:91 ## failed to compile: org.codehaus.commons.compiler.CompileException: File 'generated.java', Line 103, Column 32: No applicable constructor/method found for actual parameters "org.apache.spark.unsafe.types.UTF8String"; candidates are: "public void com.microsoft.datamining.spartan.api.core.RawLogPayload.setTimestamp(long)" org.codehaus.commons.compiler.CompileException: File 'generated.java', Line 103, Column 32: *No applicable constructor/method found for actual parameters "org.apache.spark.unsafe.types.UTF8String"; candidates are: "public void com.****.****.spartan.api.core.RawLogPayload.setTimestamp(long)"* at org.codehaus.janino.UnitCompiler.compileError(UnitCompiler.java:11821) at 
org.codehaus.janino.UnitCompiler.findMostSpecificIInvocable(UnitCompiler.java:8910) Output1 extractedRawTc schema root |-- curWindow: string (nullable = true) |-- df_logdatetime: string (nullable = true) |-- fid: string (nullable = true) |-- rawsessionid: string (nullable = true) |-- string1: string (nullable = true) |-- t: string (nullable = true) |-- tid: string (nullable = true) |-- time: string (nullable = true) |-- *timestamp: long (nullable = true)* |-- uid: string (nullable = true) |-- url: string (nullable = true) |-- wid: string (nullable = true) Output2 extractedRawW3cFilled schema root |-- df_logdatetime: string (nullable = true) |-- rawsessionid: string (nullable = true) |-- uid: string (nullable = true) |-- time: string (nullable = true) |-- T: string (nullable = true) |-- url: string (nullable = true) |-- wid: string (nullable = true) |-- tid: string (nullable = true) |-- fid: string (nullable = true) My question: the schema of column timestamp is long, but from the exception log, it seems after selecting the datatype of timestamp becomes UTF8String, Why would this happen? Is it a bug? If not could you point how to use it correctly? 
Thanks |-- string1: string (nullable = true) |-- curWindow: string (nullable = true) |-- *timestamp: long (nullable = true)* was: (1) RawLogPlayload has an field: long timestamp (2) extractedRawTc.printSchema(); // output1 Dataset extractedRawW3cFilled = extractedRawW3c.alias("extractedRawW3c") .join(extractedRawTc.alias("extractedRawTc"), functions.col("extractedRawW3c.rawsessionid").equalTo(functions.col("extractedRawTc.rawsessionid")), "inner") .select(functions.col("extractedRawW3c.df_logdatetime"), functions.col("extractedRawW3c.rawsessionid"), functions.col("extractedRawTc.uid"), functions.col("extractedRawW3c.time"),functions.col("extractedRawW3c.T"),functions.col("extractedRawW3c.url"),functions.col("extractedRawW3c.wid"), functions.col("extractedRawW3c.tid"), functions.col("extractedRawW3c.fid"),functions.col("extractedRawW3c.string1"), functions.col("extractedRawW3c.curWindow"), *functions.col("extractedRawW3c.timestamp")*) .as(Encoders.bean(RawLogPayload.class)); extractedRawW3cFilled.printSchema(); // output2 (4) cast exception 2019-03-20 15:28:31 ERROR CodeGenerator:91 ## failed to compile: org.codehaus.commons.compiler.CompileException: File 'generated.java', Line 103, Column 32: No applicable constructor/method found for actual parameters "org.apache.spark.unsafe.types.UTF8String"; candidates are: "public void com.microsoft.datamining.spartan.api.core.RawLogPayload.setTimestamp(long)" org.codehaus.commons.compiler.CompileException: File 'generated.java', Line 103, Column 32: *No applicable constructor/method found for actual parameters "org.apache.spark.unsafe.types.UTF8String"; candidates are: "public void
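[Editorial note] The compile error in SPARK-27211 comes from the bean contract that `Encoders.bean` relies on: generated code calls the setter whose name matches the column, with the column's JVM type as the argument. A guessed minimal shape of the reporter's `RawLogPayload` class (the real class is not shown in the report) illustrates why a string-typed column cannot be fed to `setTimestamp(long)`:

```java
public class RawLogPayloadSketch {
    private long timestamp;

    // Encoders.bean matches columns to getter/setter pairs by name.
    // The generated code passes the column's JVM value directly, so a
    // long column needs exactly this signature; a string column would
    // arrive as UTF8String, producing the "No applicable
    // constructor/method found" error in the log.
    public long getTimestamp() { return timestamp; }
    public void setTimestamp(long timestamp) { this.timestamp = timestamp; }
}
```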
[jira] [Assigned] (SPARK-27210) Cleanup incomplete output files in ManifestFileCommitProtocol if task is aborted
[ https://issues.apache.org/jira/browse/SPARK-27210?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Apache Spark reassigned SPARK-27210: Assignee: Apache Spark > Cleanup incomplete output files in ManifestFileCommitProtocol if task is > aborted > > > Key: SPARK-27210 > URL: https://issues.apache.org/jira/browse/SPARK-27210 > Project: Spark > Issue Type: Improvement > Components: Structured Streaming >Affects Versions: 3.0.0 >Reporter: Jungtaek Lim >Assignee: Apache Spark >Priority: Minor > > Unlike HadoopMapReduceCommitProtocol, ManifestFileCommitProtocol doesn't > clean up incomplete output files for both cases: task is aborted as well as > job is aborted. > In HadoopMapReduceCommitProtocol, it leverages stage directory to write > intermediate files so once job is aborted it can simply delete stage > directory to clean up everything. Even HadoopMapReduceCommitProtocol puts > more effort on cleaning up intermediate files on task side if task is aborted. > ManifestFileCommitProtocol doesn't do anything for cleaning up but just > maintains the metadata which list of complete output files are written. It > should be better if ManifestFileCommitProtocol can do the best effort to > clean up: not sure it can do job level cleanup since it doesn't leverage > stage directory, but it's clear that it can still put best effort to do task > level cleanup. -- This message was sent by Atlassian JIRA (v7.6.3#76005) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Assigned] (SPARK-27210) Cleanup incomplete output files in ManifestFileCommitProtocol if task is aborted
[ https://issues.apache.org/jira/browse/SPARK-27210?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Apache Spark reassigned SPARK-27210: Assignee: (was: Apache Spark)

> Cleanup incomplete output files in ManifestFileCommitProtocol if task is aborted
>
> Key: SPARK-27210
> URL: https://issues.apache.org/jira/browse/SPARK-27210
> Project: Spark
> Issue Type: Improvement
> Components: Structured Streaming
> Affects Versions: 3.0.0
> Reporter: Jungtaek Lim
> Priority: Minor
>
> Unlike HadoopMapReduceCommitProtocol, ManifestFileCommitProtocol doesn't clean up incomplete output files in either case: when a task is aborted or when the job is aborted.
>
> HadoopMapReduceCommitProtocol writes intermediate files to a staging directory, so once the job is aborted it can simply delete the staging directory to clean up everything. It also puts extra effort into cleaning up intermediate files on the task side when a task is aborted.
>
> ManifestFileCommitProtocol does no cleanup at all; it only maintains metadata listing which complete output files were written. It would be better if ManifestFileCommitProtocol made a best effort to clean up: job-level cleanup may not be possible since it doesn't use a staging directory, but it can clearly still make a best effort at task-level cleanup.
[jira] [Commented] (SPARK-27208) RestSubmissionClient only supports http
[ https://issues.apache.org/jira/browse/SPARK-27208?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16796871#comment-16796871 ] Jorge Machado commented on SPARK-27208: --- Furthermore, I'm getting the following error:

{noformat}
./bin/spark-submit --class org.apache.spark.examples.SparkPi --master mesos://host:5050/api --deploy-mode cluster --conf spark.master.rest.enabled=true --total-executor-cores 4 --jars /home/machjor/spark-2.4.0-bin-hadoop2.7/examples/jars/spark-examples_2.11-2.4.0.jar /home/machjor/spark-2.4.0-bin-hadoop2.7/examples/jars/spark-examples_2.11-2.4.0.jar 10
2019-03-18 20:39:31 WARN NativeCodeLoader:62 - Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
2019-03-18 20:39:31 INFO RestSubmissionClient:54 - Submitting a request to launch an application in mesos://host:5050/api.
2019-03-18 20:39:32 ERROR RestSubmissionClient:70 - Server responded with error: Some(Failed to validate master::Call: Expecting 'type' to be present)
2019-03-18 20:39:32 ERROR RestSubmissionClient:70 - Error: Server responded with message of unexpected type ErrorResponse.
2019-03-18 20:39:32 INFO ShutdownHookManager:54 - Shutdown hook called
2019-03-18 20:39:32 INFO ShutdownHookManager:54 - Deleting directory /tmp/spark-259c0e66-c2ab-43b4-90df-
{noformat}

> RestSubmissionClient only supports http
> ---
>
> Key: SPARK-27208
> URL: https://issues.apache.org/jira/browse/SPARK-27208
> Project: Spark
> Issue Type: Bug
> Components: Mesos
> Affects Versions: 2.4.0
> Reporter: Jorge Machado
> Priority: Minor
>
> As of now, the class RestSubmissionClient does not support https, which fails, for example, if we run the Mesos master with SSL and in cluster mode.
> The spark-submit command fails with: Mesos cluster mode is only supported through the REST submission API
>
> I created a PR for this which checks whether the given master endpoint can speak SSL before submitting the command.
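The fix sketched in the issue — probing whether the master endpoint speaks TLS before picking a scheme — could look roughly like the following. This is a hypothetical Python helper for illustration, not Spark's actual `RestSubmissionClient` code; the function names and the probe strategy are assumptions.

```python
import socket
import ssl

def endpoint_speaks_tls(host, port, timeout=2.0):
    """Attempt a TLS handshake against host:port; True if it succeeds."""
    ctx = ssl.create_default_context()
    ctx.check_hostname = False
    ctx.verify_mode = ssl.CERT_NONE  # we only care whether TLS works at all
    try:
        with socket.create_connection((host, port), timeout=timeout) as sock:
            with ctx.wrap_socket(sock, server_hostname=host):
                return True
    except (OSError, ssl.SSLError):
        return False

def rest_submission_url(host, port, probe=endpoint_speaks_tls):
    """Choose http or https for the REST submission endpoint via a TLS probe."""
    scheme = "https" if probe(host, port) else "http"
    return f"{scheme}://{host}:{port}"
```

Injecting the probe as a parameter keeps the scheme-selection logic testable without a live Mesos master.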
[jira] [Commented] (SPARK-27208) RestSubmissionClient only supports http
[ https://issues.apache.org/jira/browse/SPARK-27208?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16796860#comment-16796860 ] Jorge Machado commented on SPARK-27208: --- I would like to take this.

> RestSubmissionClient only supports http
> ---
>
> Key: SPARK-27208
> URL: https://issues.apache.org/jira/browse/SPARK-27208
> Project: Spark
> Issue Type: Bug
> Components: Mesos
> Affects Versions: 2.4.0
> Reporter: Jorge Machado
> Priority: Minor
>
> As of now, the class RestSubmissionClient does not support https, which fails, for example, if we run the Mesos master with SSL and in cluster mode.
> The spark-submit command fails with: Mesos cluster mode is only supported through the REST submission API
>
> I created a PR for this which checks whether the given master endpoint can speak SSL before submitting the command.
[jira] [Updated] (SPARK-27208) RestSubmissionClient only supports http
[ https://issues.apache.org/jira/browse/SPARK-27208?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Jorge Machado updated SPARK-27208: -- Shepherd: Sean Owen

> RestSubmissionClient only supports http
> ---
>
> Key: SPARK-27208
> URL: https://issues.apache.org/jira/browse/SPARK-27208
> Project: Spark
> Issue Type: Bug
> Components: Mesos
> Affects Versions: 2.4.0
> Reporter: Jorge Machado
> Priority: Minor
>
> As of now, the class RestSubmissionClient does not support https, which fails, for example, if we run the Mesos master with SSL and in cluster mode.
> The spark-submit command fails with: Mesos cluster mode is only supported through the REST submission API
>
> I created a PR for this which checks whether the given master endpoint can speak SSL before submitting the command.
[jira] [Created] (SPARK-27210) Cleanup incomplete output files in ManifestFileCommitProtocol if task is aborted
Jungtaek Lim created SPARK-27210: Summary: Cleanup incomplete output files in ManifestFileCommitProtocol if task is aborted Key: SPARK-27210 URL: https://issues.apache.org/jira/browse/SPARK-27210 Project: Spark Issue Type: Improvement Components: Structured Streaming Affects Versions: 3.0.0 Reporter: Jungtaek Lim

Unlike HadoopMapReduceCommitProtocol, ManifestFileCommitProtocol doesn't clean up incomplete output files in either case: when a task is aborted or when the job is aborted.

HadoopMapReduceCommitProtocol writes intermediate files to a staging directory, so once the job is aborted it can simply delete the staging directory to clean up everything. It also puts extra effort into cleaning up intermediate files on the task side when a task is aborted.

ManifestFileCommitProtocol does no cleanup at all; it only maintains metadata listing which complete output files were written. It would be better if ManifestFileCommitProtocol made a best effort to clean up: job-level cleanup may not be possible since it doesn't use a staging directory, but it can clearly still make a best effort at task-level cleanup.
[jira] [Updated] (SPARK-27160) Incorrect Literal Casting of DecimalType in OrcFilters
[ https://issues.apache.org/jira/browse/SPARK-27160?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Sean Owen updated SPARK-27160: -- Fix Version/s: (was: 3.0.0)

> Incorrect Literal Casting of DecimalType in OrcFilters
> --
>
> Key: SPARK-27160
> URL: https://issues.apache.org/jira/browse/SPARK-27160
> Project: Spark
> Issue Type: Bug
> Components: SQL
> Affects Versions: 2.4.0
> Reporter: Darcy Shen
> Priority: Blocker
> Labels: correctness
>
> A DecimalType Literal should not be cast to Long.
> E.g., for `df.filter("x < 3.14")`, assuming df (with x of DecimalType) reads from an ORC table and uses the native ORC reader with predicate push-down enabled, we will push the `x < 3.14` predicate down to the ORC reader via a SearchArgument.
> OrcFilters constructs the SearchArgument but does not handle the DecimalType correctly.
> The previous implementation would construct `x < 3` from `x < 3.14`.
[jira] [Updated] (SPARK-27160) Incorrect Literal Casting of DecimalType in OrcFilters
[ https://issues.apache.org/jira/browse/SPARK-27160?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Sean Owen updated SPARK-27160: -- Priority: Major (was: Blocker)

> Incorrect Literal Casting of DecimalType in OrcFilters
> --
>
> Key: SPARK-27160
> URL: https://issues.apache.org/jira/browse/SPARK-27160
> Project: Spark
> Issue Type: Bug
> Components: SQL
> Affects Versions: 2.4.0
> Reporter: Darcy Shen
> Priority: Major
> Labels: correctness
>
> A DecimalType Literal should not be cast to Long.
> E.g., for `df.filter("x < 3.14")`, assuming df (with x of DecimalType) reads from an ORC table and uses the native ORC reader with predicate push-down enabled, we will push the `x < 3.14` predicate down to the ORC reader via a SearchArgument.
> OrcFilters constructs the SearchArgument but does not handle the DecimalType correctly.
> The previous implementation would construct `x < 3` from `x < 3.14`.
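The correctness impact of turning `x < 3.14` into `x < 3` can be reproduced outside Spark. A minimal Python illustration (the sample values are made up; Spark's actual push-down goes through a SearchArgument, not Python comparisons):

```python
from decimal import Decimal

# Hypothetical column values around the literal 3.14.
rows = [Decimal("2.9"), Decimal("3.0"), Decimal("3.1"), Decimal("3.5")]

# Correct predicate: compare against the decimal literal itself.
correct = [x for x in rows if x < Decimal("3.14")]

# Buggy push-down: the literal 3.14 is truncated to the long 3.
buggy = [x for x in rows if x < 3]

# Rows 3.0 and 3.1 satisfy x < 3.14 but not x < 3, so the buggy
# predicate silently drops matching rows.
assert correct != buggy
```

This is why the issue carries the `correctness` label: a pushed-down filter that is narrower than the original predicate returns wrong results rather than merely losing performance.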