[jira] [Created] (SPARK-41637) ORDER BY ALL
Reynold Xin created SPARK-41637:
-----------------------------------
Summary: ORDER BY ALL
Key: SPARK-41637
URL: https://issues.apache.org/jira/browse/SPARK-41637
Project: Spark
Issue Type: New Feature
Components: SQL
Affects Versions: 3.3.0
Reporter: Reynold Xin
Assignee: Reynold Xin

This patch adds ORDER BY ALL support to SQL. ORDER BY ALL is syntactic sugar for sorting the output by all the fields, from left to right. It also allows specifying asc/desc as well as null ordering. The feature was initially introduced by DuckDB.

--
This message was sent by Atlassian Jira
(v8.20.10#820010)

---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org
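Since ORDER BY ALL is just sugar for sorting by every output column from left to right, its effect can be sketched in plain Python (an illustrative model of the semantics, not Spark's implementation; the sample rows are made up):

```python
# Illustrative model of ORDER BY ALL: sort rows by all columns, left to right.
# Python's tuple comparison is already lexicographic, so a plain sort suffices.
rows = [
    ("b", 2, 10.0),
    ("a", 3, 5.0),
    ("a", 1, 7.5),
]

# ORDER BY ALL       ==  ORDER BY col1, col2, col3
order_by_all = sorted(rows)

# ORDER BY ALL DESC  ==  every column descending
order_by_all_desc = sorted(rows, reverse=True)

print(order_by_all)  # rows ordered by col1, ties broken by col2, then col3
```

Modeling asc/desc per the whole clause is straightforward as above; null ordering would need an explicit sort key, which is omitted here for brevity.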
[jira] [Updated] (SPARK-41635) GROUP BY ALL
[ https://issues.apache.org/jira/browse/SPARK-41635?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Reynold Xin updated SPARK-41635:
--------------------------------
Description:
This patch implements GROUP BY ALL, similar to the feature initially implemented in DuckDB. When specified, the analyzer automatically infers the grouping columns based on the expressions in the select clause: all expressions that don't include any aggregate expressions are implicitly pulled into the grouping columns. This saves users from having to individually specify the list of grouping columns in most cases. Example:
{noformat}
select key, count(*), sum(score) from table group by all
-- rewritten to
select key, count(*), sum(score) from table group by key{noformat}

was: This patch implements GROUP BY ALL, similar to the one initially implemented in DuckDB. When specified, the analyzer automatically infers the grouping columns based on the expressions specified in the select clause: all expressions that don't include any aggregate expressions are pulled implicitly into the grouping columns. This avoids users having to specify individually the list of grouping columns in most cases. Examples:
{{select key, count(*), sum(score) from table group by all}}
{{-- rewritten to}}
{{select key, count(*), sum(score) from table group by key}}

> GROUP BY ALL
> ------------
>
> Key: SPARK-41635
> URL: https://issues.apache.org/jira/browse/SPARK-41635
> Project: Spark
> Issue Type: New Feature
> Components: SQL
> Affects Versions: 3.3.0
> Reporter: Reynold Xin
> Assignee: Reynold Xin
> Priority: Major
>
> This patch implements GROUP BY ALL, similar to the feature initially implemented in DuckDB. When specified, the analyzer automatically infers the grouping columns based on the expressions in the select clause: all expressions that don't include any aggregate expressions are implicitly pulled into the grouping columns. This saves users from having to individually specify the list of grouping columns in most cases.
> Example:
> {noformat}
> select key, count(*), sum(score) from table group by all
> -- rewritten to
> select key, count(*), sum(score) from table group by key{noformat}
[jira] [Created] (SPARK-41635) GROUP BY ALL
Reynold Xin created SPARK-41635:
-----------------------------------
Summary: GROUP BY ALL
Key: SPARK-41635
URL: https://issues.apache.org/jira/browse/SPARK-41635
Project: Spark
Issue Type: New Feature
Components: SQL
Affects Versions: 3.3.0
Reporter: Reynold Xin
Assignee: Reynold Xin

This patch implements GROUP BY ALL, similar to the feature initially implemented in DuckDB. When specified, the analyzer automatically infers the grouping columns based on the expressions in the select clause: all expressions that don't include any aggregate expressions are implicitly pulled into the grouping columns. This saves users from having to individually specify the list of grouping columns in most cases. Example:
{{select key, count(*), sum(score) from table group by all}}
{{-- rewritten to}}
{{select key, count(*), sum(score) from table group by key}}
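The inference rule described above can be sketched in a few lines of Python (a toy model of what the analyzer does, with a deliberately naive substring-based aggregate check; not Spark's actual code):

```python
# Toy model of GROUP BY ALL inference: every select expression that does not
# contain an aggregate function becomes an implicit grouping column.
AGGREGATES = ("count(", "sum(", "avg(", "min(", "max(")

def infer_grouping(select_exprs):
    # Keep only expressions with no aggregate call anywhere inside them.
    return [e for e in select_exprs
            if not any(agg in e.lower() for agg in AGGREGATES)]

# select key, count(*), sum(score) from table group by all
exprs = ["key", "count(*)", "sum(score)"]
print(infer_grouping(exprs))  # -- rewritten to: group by key
```

A real analyzer would of course walk the resolved expression tree rather than match substrings, but the shape of the rewrite is the same.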
[jira] [Created] (SPARK-41588) Make "Rule id not found" error message more actionable
Reynold Xin created SPARK-41588:
-----------------------------------
Summary: Make "Rule id not found" error message more actionable
Key: SPARK-41588
URL: https://issues.apache.org/jira/browse/SPARK-41588
Project: Spark
Issue Type: Improvement
Components: SQL
Affects Versions: 3.3.0
Reporter: Reynold Xin
Assignee: Reynold Xin

When adding a new rule, I found it super confusing to bump into the rule id error. We should update the error message to make it more actionable, i.e. explain to developers which file to modify.
[jira] [Commented] (SPARK-36913) Implement createIndex and IndexExists in JDBC (MySQL dialect)
[ https://issues.apache.org/jira/browse/SPARK-36913?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17430780#comment-17430780 ]

Reynold Xin commented on SPARK-36913:
-------------------------------------

My concern is not about JDBC (I should've commented on the parent ticket). My concern is that there are *a lot* of RDBMS features and we can't possibly support all of them. It seems like we'd be much better off just having a generic fallback API to execute a command that Spark passes through to the underlying data source, which can then decide what to do with it. Otherwise we will have to add create index, define foreign key, define sequence objects, and 50 other DDL commands to Spark.

> Implement createIndex and IndexExists in JDBC (MySQL dialect)
> -------------------------------------------------------------
>
> Key: SPARK-36913
> URL: https://issues.apache.org/jira/browse/SPARK-36913
> Project: Spark
> Issue Type: Sub-task
> Components: SQL
> Affects Versions: 3.3.0
> Reporter: Huaxin Gao
> Assignee: Huaxin Gao
> Priority: Major
> Fix For: 3.3.0
[jira] [Commented] (SPARK-36913) Implement createIndex and IndexExists in JDBC (MySQL dialect)
[ https://issues.apache.org/jira/browse/SPARK-36913?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17430324#comment-17430324 ]

Reynold Xin commented on SPARK-36913:
-------------------------------------

I'm actually a bit worried about adding stuff like this to Spark. There are so many different types of indexes out there (and on top of that a lot of other common database features, e.g. primary keys, foreign keys, sequence objects). Spark shouldn't and can't become a layer for doing database management across all database features. Are there concrete use cases for create index that we have seen?

> Implement createIndex and IndexExists in JDBC (MySQL dialect)
> -------------------------------------------------------------
>
> Key: SPARK-36913
> URL: https://issues.apache.org/jira/browse/SPARK-36913
> Project: Spark
> Issue Type: Sub-task
> Components: SQL
> Affects Versions: 3.3.0
> Reporter: Huaxin Gao
> Assignee: Huaxin Gao
> Priority: Major
> Fix For: 3.3.0
[jira] [Comment Edited] (SPARK-34198) Add RocksDB StateStore as external module
[ https://issues.apache.org/jira/browse/SPARK-34198?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17284479#comment-17284479 ]

Reynold Xin edited comment on SPARK-34198 at 2/14/21, 6:59 PM:
---------------------------------------------------------------

I don't know the intricate details of it, but I suspect it's a different one with many more features, because it existed long before those two.

was (Author: rxin): I don't know the intricate details of it but I suspect it's a different one because it existed long before those two.

> Add RocksDB StateStore as external module
> -----------------------------------------
>
> Key: SPARK-34198
> URL: https://issues.apache.org/jira/browse/SPARK-34198
> Project: Spark
> Issue Type: New Feature
> Components: Structured Streaming
> Affects Versions: 3.2.0
> Reporter: L. C. Hsieh
> Assignee: L. C. Hsieh
> Priority: Major
>
> Currently Spark SS has only one built-in StateStore implementation, HDFSBackedStateStore, which actually uses an in-memory map to store state rows. As there are more and more streaming applications, some of them require large state in stateful operations such as streaming aggregation and join.
> Several other major streaming frameworks already use RocksDB for state management, so it is a proven choice for large-state usage. But Spark SS still lacks a built-in state store for this requirement.
> We would like to explore the possibility of adding a RocksDB-based StateStore to Spark SS. Given the concern about adding RocksDB as a direct dependency, our plan is to add this StateStore as an external module first.
[jira] [Commented] (SPARK-34198) Add RocksDB StateStore as external module
[ https://issues.apache.org/jira/browse/SPARK-34198?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17284479#comment-17284479 ]

Reynold Xin commented on SPARK-34198:
-------------------------------------

I don't know the intricate details of it, but I suspect it's a different one because it existed long before those two.

> Add RocksDB StateStore as external module
> -----------------------------------------
>
> Key: SPARK-34198
> URL: https://issues.apache.org/jira/browse/SPARK-34198
> Project: Spark
> Issue Type: New Feature
> Components: Structured Streaming
> Affects Versions: 3.2.0
> Reporter: L. C. Hsieh
> Assignee: L. C. Hsieh
> Priority: Major
>
> Currently Spark SS has only one built-in StateStore implementation, HDFSBackedStateStore, which actually uses an in-memory map to store state rows. As there are more and more streaming applications, some of them require large state in stateful operations such as streaming aggregation and join.
> Several other major streaming frameworks already use RocksDB for state management, so it is a proven choice for large-state usage. But Spark SS still lacks a built-in state store for this requirement.
> We would like to explore the possibility of adding a RocksDB-based StateStore to Spark SS. Given the concern about adding RocksDB as a direct dependency, our plan is to add this StateStore as an external module first.
[jira] [Commented] (SPARK-34198) Add RocksDB StateStore as external module
[ https://issues.apache.org/jira/browse/SPARK-34198?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17284474#comment-17284474 ]

Reynold Xin commented on SPARK-34198:
-------------------------------------

[~kabhwan] let me talk to the team that built our internal version of that about whether it'd make sense.

> Add RocksDB StateStore as external module
> -----------------------------------------
>
> Key: SPARK-34198
> URL: https://issues.apache.org/jira/browse/SPARK-34198
> Project: Spark
> Issue Type: New Feature
> Components: Structured Streaming
> Affects Versions: 3.2.0
> Reporter: L. C. Hsieh
> Assignee: L. C. Hsieh
> Priority: Major
>
> Currently Spark SS has only one built-in StateStore implementation, HDFSBackedStateStore, which actually uses an in-memory map to store state rows. As there are more and more streaming applications, some of them require large state in stateful operations such as streaming aggregation and join.
> Several other major streaming frameworks already use RocksDB for state management, so it is a proven choice for large-state usage. But Spark SS still lacks a built-in state store for this requirement.
> We would like to explore the possibility of adding a RocksDB-based StateStore to Spark SS. Given the concern about adding RocksDB as a direct dependency, our plan is to add this StateStore as an external module first.
[jira] [Commented] (SPARK-31399) closure cleaner is broken in Spark 3.0
[ https://issues.apache.org/jira/browse/SPARK-31399?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17080265#comment-17080265 ]

Reynold Xin commented on SPARK-31399:
-------------------------------------

This is bad... [~sowen] and [~joshrosen] did you look into this in the past?

> closure cleaner is broken in Spark 3.0
> --------------------------------------
>
> Key: SPARK-31399
> URL: https://issues.apache.org/jira/browse/SPARK-31399
> Project: Spark
> Issue Type: Bug
> Components: Spark Core
> Affects Versions: 3.0.0
> Reporter: Wenchen Fan
> Priority: Blocker
>
> The `ClosureCleaner` only supports Scala functions, and it uses the following check to catch closures:
> {code}
> // Check whether a class represents a Scala closure
> private def isClosure(cls: Class[_]): Boolean = {
>   cls.getName.contains("$anonfun$")
> }
> {code}
> This doesn't work in 3.0 any more, as we upgraded to Scala 2.12 and most Scala functions became Java lambdas.
> As an example, the following code works well in the Spark 2.4 shell:
> {code}
> scala> :pa
> // Entering paste mode (ctrl-D to finish)
> import org.apache.spark.sql.functions.lit
> case class Foo(id: String)
> val col = lit("123")
> val df = sc.range(0,10,1,1).map { _ => Foo("") }
> // Exiting paste mode, now interpreting.
> import org.apache.spark.sql.functions.lit
> defined class Foo
> col: org.apache.spark.sql.Column = 123
> df: org.apache.spark.rdd.RDD[Foo] = MapPartitionsRDD[5] at map at :20
> {code}
> But fails in 3.0:
> {code}
> scala> :pa
> // Entering paste mode (ctrl-D to finish)
> import org.apache.spark.sql.functions.lit
> case class Foo(id: String)
> val col = lit("123")
> val df = sc.range(0,10,1,1).map { _ => Foo("") }
> // Exiting paste mode, now interpreting.
> org.apache.spark.SparkException: Task not serializable
>   at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:396)
>   at org.apache.spark.util.ClosureCleaner$.clean(ClosureCleaner.scala:386)
>   at org.apache.spark.util.ClosureCleaner$.clean(ClosureCleaner.scala:159)
>   at org.apache.spark.SparkContext.clean(SparkContext.scala:2371)
>   at org.apache.spark.rdd.RDD.$anonfun$map$1(RDD.scala:422)
>   at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
>   at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
>   at org.apache.spark.rdd.RDD.withScope(RDD.scala:414)
>   at org.apache.spark.rdd.RDD.map(RDD.scala:421)
>   ... 39 elided
> Caused by: java.io.NotSerializableException: org.apache.spark.sql.Column
> Serialization stack:
>   - object not serializable (class: org.apache.spark.sql.Column, value: 123)
>   - field (class: $iw, name: col, type: class org.apache.spark.sql.Column)
>   - object (class $iw, $iw@2d87ac2b)
>   - element of array (index: 0)
>   - array (class [Ljava.lang.Object;, size 1)
>   - field (class: java.lang.invoke.SerializedLambda, name: capturedArgs, type: class [Ljava.lang.Object;)
>   - object (class java.lang.invoke.SerializedLambda, SerializedLambda[capturingClass=class $iw, functionalInterfaceMethod=scala/Function1.apply:(Ljava/lang/Object;)Ljava/lang/Object;, implementation=invokeStatic $anonfun$df$1$adapted:(L$iw;Ljava/lang/Object;)LFoo;, instantiatedMethodType=(Ljava/lang/Object;)LFoo;, numCaptured=1])
>   - writeReplace data (class: java.lang.invoke.SerializedLambda)
>   - object (class $Lambda$2438/170049100, $Lambda$2438/170049100@d6b8c43)
>   at org.apache.spark.serializer.SerializationDebugger$.improveException(SerializationDebugger.scala:41)
>   at org.apache.spark.serializer.JavaSerializationStream.writeObject(JavaSerializer.scala:47)
>   at org.apache.spark.serializer.JavaSerializerInstance.serialize(JavaSerializer.scala:101)
>   at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:393)
>   ... 47 more
> {code}
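The failure mode is visible from the class names alone. A short Python sketch of the name-based check (the class names below are illustrative of typical Scala 2.11 vs 2.12 compiler output, not taken from a real heap dump):

```python
# The ClosureCleaner test, transliterated: a class is treated as a closure
# iff its name contains "$anonfun$".
def is_closure(class_name: str) -> bool:
    return "$anonfun$" in class_name

# Scala 2.11 compiled each closure to an anonymous function class
# (illustrative name):
scala_2_11 = "org.example.Main$$anonfun$1"

# Scala 2.12 emits Java lambdas instead (illustrative name):
scala_2_12 = "org.example.Main$$Lambda$2438/170049100"

print(is_closure(scala_2_11))  # True  -- cleaner runs
print(is_closure(scala_2_12))  # False -- cleaner silently skipped
```

When the cleaner is skipped, the lambda keeps its reference to the enclosing REPL object (the `$iw` in the stack trace above), which drags the non-serializable `Column` into the task.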
[jira] [Commented] (SPARK-22231) Support of map, filter, withColumn, dropColumn in nested list of structures
[ https://issues.apache.org/jira/browse/SPARK-22231?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17070688#comment-17070688 ]

Reynold Xin commented on SPARK-22231:
-------------------------------------

[~fqaiser94] thanks for your persistence and my apologies for the delay. You have my buy-in. This is a great idea.

> Support of map, filter, withColumn, dropColumn in nested list of structures
> ---------------------------------------------------------------------------
>
> Key: SPARK-22231
> URL: https://issues.apache.org/jira/browse/SPARK-22231
> Project: Spark
> Issue Type: New Feature
> Components: SQL
> Affects Versions: 2.2.0
> Reporter: DB Tsai
> Assignee: Jeremy Smith
> Priority: Major
>
> At Netflix's algorithm team, we work on ranking problems to find the great content to fulfill the unique tastes of our members. Before building recommendation algorithms, we need to prepare the training, testing, and validation datasets in Apache Spark. Due to the nature of ranking problems, we have a nested list of items to be ranked in one column, and the top level is the context describing the setting where a model is to be used (e.g. profiles, country, time, device, etc.). Here is a blog post describing the details: [Distributed Time Travel for Feature Generation|https://medium.com/netflix-techblog/distributed-time-travel-for-feature-generation-389cccdd3907].
> To be more concrete, for the ranks of videos for a given profile_id in a given country, our data schema can look like this:
> {code:java}
> root
> |-- profile_id: long (nullable = true)
> |-- country_iso_code: string (nullable = true)
> |-- items: array (nullable = false)
> |    |-- element: struct (containsNull = false)
> |    |    |-- title_id: integer (nullable = true)
> |    |    |-- scores: double (nullable = true)
> ...
> {code}
> We oftentimes need to work on the nested list of structs by applying some functions on them. Sometimes, we're dropping or adding new columns in the nested list of structs. Currently, there is no easy solution in open source Apache Spark to perform those operations using SQL primitives; many people just convert the data into an RDD to work on the nested level of data, and then reconstruct the new dataframe as a workaround. This is extremely inefficient because optimizations like predicate pushdown in SQL cannot be performed, we cannot leverage the columnar format, and the serialization and deserialization cost becomes huge even when we just want to add a new column at the nested level.
> We built a solution internally at Netflix which we're very happy with. We plan to make it open source in Spark upstream. We would like to socialize the API design to see if we missed any use-case.
> The first API we added is *mapItems* on dataframe, which takes a function from *Column* to *Column* and applies the function on the nested dataframe. Here is an example:
> {code:java}
> case class Data(foo: Int, bar: Double, items: Seq[Double])
> val df: Dataset[Data] = spark.createDataset(Seq(
>   Data(10, 10.0, Seq(10.1, 10.2, 10.3, 10.4)),
>   Data(20, 20.0, Seq(20.1, 20.2, 20.3, 20.4))
> ))
> val result = df.mapItems("items") {
>   item => item * 2.0
> }
> result.printSchema()
> // root
> // |-- foo: integer (nullable = false)
> // |-- bar: double (nullable = false)
> // |-- items: array (nullable = true)
> // |    |-- element: double (containsNull = true)
> result.show()
> // +---+----+--------------------+
> // |foo| bar|               items|
> // +---+----+--------------------+
> // | 10|10.0|[20.2, 20.4, 20.6...|
> // | 20|20.0|[40.2, 40.4, 40.6...|
> // +---+----+--------------------+
> {code}
> Now, with the ability to apply a function on the nested dataframe, we can add a new function, *withColumn*, on *Column* to add or replace the existing column that has the same name in the nested list of structs. Here are two examples demonstrating the API together with *mapItems*; the first one replaces the existing column:
> {code:java}
> case class Item(a: Int, b: Double)
> case class Data(foo: Int, bar: Double, items: Seq[Item])
> val df: Dataset[Data] = spark.createDataset(Seq(
>   Data(10, 10.0, Seq(Item(10, 10.0), Item(11, 11.0))),
>   Data(20, 20.0, Seq(Item(20, 20.0), Item(21, 21.0)))
> ))
> val result = df.mapItems("items") {
>   item => item.withColumn(item("b") + 1 as "b")
> }
> result.printSchema
> // root
> // |-- foo: integer (nullable = false)
> // |-- bar: double (nullable = false)
> // |-- items: array (nullable = true)
> // |    |-- element: struct (containsNull = true)
> // |    |    |-- a: integer (nullable = true)
> // |    |    |-- b: double (nullable = true)
> result.show(false)
> // +---+----+------+
> // |foo|bar |items |
> // +---+-
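Outside Spark, the operation the proposal describes is easy to state with plain Python dicts, which may help clarify the intended mapItems/withColumn semantics (a toy model only; the field names mirror the Scala example above):

```python
# Toy model of mapItems + withColumn on a nested list of structs:
# replace field "b" with b + 1 inside every element of "items",
# leaving the top-level fields untouched.
def map_items(row, col, fn):
    # Apply fn to each element of the nested array column `col`.
    return {**row, col: [fn(item) for item in row[col]]}

row = {"foo": 10, "bar": 10.0,
       "items": [{"a": 10, "b": 10.0}, {"a": 11, "b": 11.0}]}

result = map_items(row, "items", lambda it: {**it, "b": it["b"] + 1})
print(result["items"])  # [{'a': 10, 'b': 11.0}, {'a': 11, 'b': 12.0}]
```

The point of the proposed API is to express exactly this per-element transformation in Catalyst, so it stays columnar and optimizable instead of round-tripping through an RDD.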
[jira] [Commented] (SPARK-25728) SPIP: Structured Intermediate Representation (Tungsten IR) for generating Java code
[ https://issues.apache.org/jira/browse/SPARK-25728?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17061916#comment-17061916 ]

Reynold Xin commented on SPARK-25728:
-------------------------------------

It's too big of a change, and realistically speaking there are probably only a few people in the world who can do this well. I'm going to close it.

> SPIP: Structured Intermediate Representation (Tungsten IR) for generating Java code
> -----------------------------------------------------------------------------------
>
> Key: SPARK-25728
> URL: https://issues.apache.org/jira/browse/SPARK-25728
> Project: Spark
> Issue Type: New Feature
> Components: SQL
> Affects Versions: 3.1.0
> Reporter: Kazuaki Ishizaki
> Priority: Major
>
> This JIRA entry is to start a discussion about adding a structured intermediate representation for generating Java code from a program using the DataFrame or Dataset API, in addition to the current String-based representation.
> This addition is based on the discussions in [a thread|https://github.com/apache/spark/pull/21537#issuecomment-413268196]. Please feel free to comment on this JIRA entry or the [Google Doc|https://docs.google.com/document/d/1Jzf56bxpMpSwsGV_hSzl9wQG22hyI731McQcjognqxY/edit?usp=sharing], too.
[jira] [Resolved] (SPARK-25728) SPIP: Structured Intermediate Representation (Tungsten IR) for generating Java code
[ https://issues.apache.org/jira/browse/SPARK-25728?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Reynold Xin resolved SPARK-25728.
---------------------------------
Resolution: Won't Fix

> SPIP: Structured Intermediate Representation (Tungsten IR) for generating Java code
> -----------------------------------------------------------------------------------
>
> Key: SPARK-25728
> URL: https://issues.apache.org/jira/browse/SPARK-25728
> Project: Spark
> Issue Type: New Feature
> Components: SQL
> Affects Versions: 3.1.0
> Reporter: Kazuaki Ishizaki
> Priority: Major
>
> This JIRA entry is to start a discussion about adding a structured intermediate representation for generating Java code from a program using the DataFrame or Dataset API, in addition to the current String-based representation.
> This addition is based on the discussions in [a thread|https://github.com/apache/spark/pull/21537#issuecomment-413268196]. Please feel free to comment on this JIRA entry or the [Google Doc|https://docs.google.com/document/d/1Jzf56bxpMpSwsGV_hSzl9wQG22hyI731McQcjognqxY/edit?usp=sharing], too.
[jira] [Commented] (SPARK-29189) Add an option to ignore block locations when listing file
[ https://issues.apache.org/jira/browse/SPARK-29189?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17023340#comment-17023340 ]

Reynold Xin commented on SPARK-29189:
-------------------------------------

This is great, but how would users know when to set this? Shouldn't we make a slight incremental improvement and just automatically detect the common object stores and disable the locality check?

> Add an option to ignore block locations when listing file
> ---------------------------------------------------------
>
> Key: SPARK-29189
> URL: https://issues.apache.org/jira/browse/SPARK-29189
> Project: Spark
> Issue Type: Improvement
> Components: SQL
> Affects Versions: 3.0.0
> Reporter: Wang, Gang
> Assignee: Wang, Gang
> Priority: Major
> Fix For: 3.0.0
>
> In our PROD env, we have a pure Spark cluster. I think this is also pretty common, where computation is separated from the storage layer. In such a deploy mode, data locality is never reachable.
> There are some configurations in the Spark scheduler to reduce waiting time for data locality (e.g. "spark.locality.wait"). The problem is that, in the file listing phase, the location information of all the files, with all the blocks inside each file, is fetched from the distributed file system. In a PROD environment, a table can be so huge that even fetching all this location information can take tens of seconds.
> To improve this scenario, Spark needs to provide an option where data locality can be totally ignored; all we need in the file listing phase are the file locations, without any block location information.
> We made a benchmark in our PROD env; after ignoring the block locations, we got a pretty huge improvement.
> ||Table Size||Total File Number||Total Block Number||List File Duration (With Block Location)||List File Duration (Without Block Location)||
> |22.6 T|3|12|16.841s|1.730s|
> |28.8 T|42001|148964|10.099s|2.858s|
> |3.4 T|2|2|5.833s|4.881s|
[jira] [Commented] (SPARK-27117) current_date/current_timestamp should be reserved keywords in ansi parser mode
[ https://issues.apache.org/jira/browse/SPARK-27117?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17023317#comment-17023317 ]

Reynold Xin commented on SPARK-27117:
-------------------------------------

I changed the title to make it clearer to end users what's happening.

> current_date/current_timestamp should be reserved keywords in ansi parser mode
> ------------------------------------------------------------------------------
>
> Key: SPARK-27117
> URL: https://issues.apache.org/jira/browse/SPARK-27117
> Project: Spark
> Issue Type: Bug
> Components: SQL
> Affects Versions: 3.0.0
> Reporter: Wenchen Fan
> Assignee: Wenchen Fan
> Priority: Major
> Fix For: 3.0.0
[jira] [Updated] (SPARK-27117) current_date/current_timestamp should be reserved keywords in ansi parser mode
[ https://issues.apache.org/jira/browse/SPARK-27117?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Reynold Xin updated SPARK-27117:
--------------------------------
Summary: current_date/current_timestamp should be reserved keywords in ansi parser mode (was: current_date/current_timestamp should not refer to columns with ansi parser mode)

> current_date/current_timestamp should be reserved keywords in ansi parser mode
> ------------------------------------------------------------------------------
>
> Key: SPARK-27117
> URL: https://issues.apache.org/jira/browse/SPARK-27117
> Project: Spark
> Issue Type: Bug
> Components: SQL
> Affects Versions: 3.0.0
> Reporter: Wenchen Fan
> Assignee: Wenchen Fan
> Priority: Major
> Fix For: 3.0.0
[jira] [Commented] (SPARK-29175) Make maven central repository in IsolatedClientLoader configurable
[ https://issues.apache.org/jira/browse/SPARK-29175?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17021586#comment-17021586 ]

Reynold Xin commented on SPARK-29175:
-------------------------------------

I think the config name should be clearer, e.g. "spark.sql.maven.additionalRemoteRepositories".

> Make maven central repository in IsolatedClientLoader configurable
> ------------------------------------------------------------------
>
> Key: SPARK-29175
> URL: https://issues.apache.org/jira/browse/SPARK-29175
> Project: Spark
> Issue Type: Improvement
> Components: SQL
> Affects Versions: 3.0.0
> Reporter: Yuanjian Li
> Assignee: Yuanjian Li
> Priority: Major
> Fix For: 3.0.0
>
> We need to connect to a central repository in IsolatedClientLoader for downloading Hive jars. Here we added a new config, `spark.sql.additionalRemoteRepositories`, a comma-delimited string config of optional additional remote maven mirror repositories; it can be used to supply additional remote repositories beyond the default maven central repo.
[jira] [Commented] (SPARK-27871) LambdaVariable should use per-query unique IDs instead of globally unique IDs
[ https://issues.apache.org/jira/browse/SPARK-27871?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17021583#comment-17021583 ]

Reynold Xin commented on SPARK-27871:
-------------------------------------

Why would anybody want to turn this off? I feel like this should be an internal config.

> LambdaVariable should use per-query unique IDs instead of globally unique IDs
> -----------------------------------------------------------------------------
>
> Key: SPARK-27871
> URL: https://issues.apache.org/jira/browse/SPARK-27871
> Project: Spark
> Issue Type: Improvement
> Components: SQL
> Affects Versions: 3.0.0
> Reporter: Wenchen Fan
> Assignee: Wenchen Fan
> Priority: Major
> Fix For: 3.0.0
[jira] [Commented] (SPARK-22231) Support of map, filter, withColumn, dropColumn in nested list of structures
[ https://issues.apache.org/jira/browse/SPARK-22231?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17015705#comment-17015705 ] Reynold Xin commented on SPARK-22231:

Hey, sorry. Been pretty busy. I will take a look this week.

> Support of map, filter, withColumn, dropColumn in nested list of structures
> ---------------------------------------------------------------------------
>
> Key: SPARK-22231
> URL: https://issues.apache.org/jira/browse/SPARK-22231
> Project: Spark
> Issue Type: New Feature
> Components: SQL
> Affects Versions: 2.2.0
> Reporter: DB Tsai
> Assignee: Jeremy Smith
> Priority: Major
>
> At Netflix's algorithm team, we work on ranking problems to find great content that fulfills the unique tastes of our members. Before building a recommendation algorithm, we need to prepare the training, testing, and validation datasets in Apache Spark. Due to the nature of ranking problems, we have a nested list of items to be ranked in one column, and the top level contains the context describing the setting in which a model is to be used (e.g. profile, country, time, device, etc.). Here is a blog post describing the details: [Distributed Time Travel for Feature Generation|https://medium.com/netflix-techblog/distributed-time-travel-for-feature-generation-389cccdd3907].
>
> To be more concrete, for the ranking of videos for a given profile_id in a given country, our data schema looks like this:
> {code:java}
> root
>  |-- profile_id: long (nullable = true)
>  |-- country_iso_code: string (nullable = true)
>  |-- items: array (nullable = false)
>  |    |-- element: struct (containsNull = false)
>  |    |    |-- title_id: integer (nullable = true)
>  |    |    |-- scores: double (nullable = true)
> ...
> {code}
> We often need to work on the nested list of structs by applying functions to them. Sometimes we are dropping or adding columns in the nested list of structs. Currently, there is no easy way in open-source Apache Spark to perform those operations using SQL primitives; many people just convert the data into an RDD to work on the nested level, and then reconstruct the new dataframe as a workaround. This is extremely inefficient: optimizations like predicate pushdown cannot be performed, we cannot leverage the columnar format, and the serialization and deserialization cost becomes huge even when we just want to add a single column at the nested level.
> We built a solution internally at Netflix which we are very happy with. We plan to open-source it in Spark upstream, and we would like to socialize the API design to see if we missed any use cases.
> The first API we added is *mapItems* on dataframe, which takes a function from *Column* to *Column* and applies it to the nested dataframe. Here is an example:
> {code:java}
> case class Data(foo: Int, bar: Double, items: Seq[Double])
> val df: Dataset[Data] = spark.createDataset(Seq(
>   Data(10, 10.0, Seq(10.1, 10.2, 10.3, 10.4)),
>   Data(20, 20.0, Seq(20.1, 20.2, 20.3, 20.4))
> ))
> val result = df.mapItems("items") {
>   item => item * 2.0
> }
> result.printSchema()
> // root
> //  |-- foo: integer (nullable = false)
> //  |-- bar: double (nullable = false)
> //  |-- items: array (nullable = true)
> //  |    |-- element: double (containsNull = true)
> result.show()
> // +---+----+--------------------+
> // |foo| bar|               items|
> // +---+----+--------------------+
> // | 10|10.0|[20.2, 20.4, 20.6...|
> // | 20|20.0|[40.2, 40.4, 40.6...|
> // +---+----+--------------------+
> {code}
> Now, with the ability to apply a function to the nested dataframe, we can add a new function, *withColumn*, on *Column* to add or replace an existing column of the same name in the nested list of structs. Here are two examples demonstrating the API together with *mapItems*; the first one replaces the existing column:
> {code:java}
> case class Item(a: Int, b: Double)
> case class Data(foo: Int, bar: Double, items: Seq[Item])
> val df: Dataset[Data] = spark.createDataset(Seq(
>   Data(10, 10.0, Seq(Item(10, 10.0), Item(11, 11.0))),
>   Data(20, 20.0, Seq(Item(20, 20.0), Item(21, 21.0)))
> ))
> val result = df.mapItems("items") {
>   item => item.withColumn(item("b") + 1 as "b")
> }
> result.printSchema
> // root
> //  |-- foo: integer (nullable = false)
> //  |-- bar: double (nullable = false)
> //  |-- items: array (nullable = true)
> //  |    |-- element: struct (containsNull = true)
> //  |    |    |-- a: integer (nullable = true)
> //  |    |    |-- b: double (nullable = true)
> result.show(false)
> // +---+----+----------------------+
> // |foo|bar |items                 |
> // +---+----+----------------------+
> // |10 |10.0|[[10,11.0], [11,12.0]]|
> // |20 |20.0|[[20,21.0], [21,22.0]]|
> // +---+----+----------------------+
> {code}
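The *mapItems* semantics above can be sketched with plain Python data (a hypothetical stand-in for the proposed Spark API, with rows as dicts and the nested column as a list):

```python
def map_items(rows, items_col, f):
    """Apply f to every element of the nested list column,
    leaving all other columns intact."""
    return [{**row, items_col: [f(x) for x in row[items_col]]} for row in rows]

rows = [
    {"foo": 10, "bar": 10.0, "items": [10.1, 10.2]},
    {"foo": 20, "bar": 20.0, "items": [20.1, 20.2]},
]
doubled = map_items(rows, "items", lambda x: x * 2.0)
# doubled[0]["items"] == [20.2, 20.4]; foo and bar are unchanged
```

The real proposal operates on Catalyst expressions rather than materialized rows, which is what makes it efficient; this sketch only illustrates the shape of the transformation.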
[jira] [Comment Edited] (SPARK-22231) Support of map, filter, withColumn, dropColumn in nested list of structures
[ https://issues.apache.org/jira/browse/SPARK-22231?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17007215#comment-17007215 ] Reynold Xin edited comment on SPARK-22231 at 1/3/20 4:18 AM:

[~fqaiser94] you convinced me with #2. It'd be very verbose if we only allow DataFrame.withColumnRenamed to modify nested fields and no new methods in Column. #1 isn't really a problem because DataFrame.withColumnRenamed should be able to handle both top level field and struct fields as well. Another question: can withField modify a nested field itself?

was (Author: rxin): [~fqaiser94] you convinced me with #2. It'd be very verbose if we only allow DataFrame.withColumnRenamed to modify nested fields and no new methods in Column. #1 isn't really a problem because DataFrame.withColumnRenamed should be able to handle both top level field and struct fields as well.

> Support of map, filter, withColumn, dropColumn in nested list of structures
> ---------------------------------------------------------------------------
>
> Key: SPARK-22231
> URL: https://issues.apache.org/jira/browse/SPARK-22231
> Project: Spark
> Issue Type: New Feature
> Components: SQL
> Affects Versions: 2.2.0
> Reporter: DB Tsai
> Assignee: Jeremy Smith
> Priority: Major
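The follow-up question — can withField modify a nested field itself? — can be answered affirmatively at the semantic level. A plain-Python sketch of the intended behavior (hypothetical helper, not the Spark API): a dot-separated path recurses into the struct, replacing only the addressed field.

```python
def with_field(struct: dict, path: str, value):
    """Return a copy of struct with the field at the dot-separated
    path added or replaced; intermediate structs are copied, not mutated."""
    head, _, rest = path.partition(".")
    out = dict(struct)
    out[head] = with_field(out.get(head, {}), rest, value) if rest else value
    return out

row = {"a": {"b": {"c": 1}, "d": 2}}
# Replace a field two levels down:
updated = with_field(row, "a.b.c", 42)
# updated == {"a": {"b": {"c": 42}, "d": 2}}; the original row is untouched
```

Copy-on-write along the path mirrors how an expression-based implementation would rebuild only the enclosing structs, leaving sibling fields shared.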
[jira] [Commented] (SPARK-22231) Support of map, filter, withColumn, dropColumn in nested list of structures
[ https://issues.apache.org/jira/browse/SPARK-22231?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17007215#comment-17007215 ] Reynold Xin commented on SPARK-22231:

[~fqaiser94] you convinced me with #2. It'd be very verbose if we only allow DataFrame.withColumnRenamed to modify nested fields and no new methods in Column. #1 isn't really a problem because DataFrame.withColumnRenamed should be able to handle both top level field and struct fields as well.

> Support of map, filter, withColumn, dropColumn in nested list of structures
> ---------------------------------------------------------------------------
>
> Key: SPARK-22231
> URL: https://issues.apache.org/jira/browse/SPARK-22231
> Project: Spark
> Issue Type: New Feature
> Components: SQL
> Affects Versions: 2.2.0
> Reporter: DB Tsai
> Assignee: Jeremy Smith
> Priority: Major
[jira] [Commented] (SPARK-22231) Support of map, filter, withColumn, dropColumn in nested list of structures
[ https://issues.apache.org/jira/browse/SPARK-22231?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17007117#comment-17007117 ] Reynold Xin commented on SPARK-22231:

Makes sense. One question (I've asked about this before): should the 3 functions be on DataFrame, or on Column?

> Support of map, filter, withColumn, dropColumn in nested list of structures
> ---------------------------------------------------------------------------
>
> Key: SPARK-22231
> URL: https://issues.apache.org/jira/browse/SPARK-22231
> Project: Spark
> Issue Type: New Feature
> Components: SQL
> Affects Versions: 2.2.0
> Reporter: DB Tsai
> Assignee: Jeremy Smith
> Priority: Major
[jira] [Commented] (SPARK-28264) Revisiting Python / pandas UDF
[ https://issues.apache.org/jira/browse/SPARK-28264?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16990958#comment-16990958 ] Reynold Xin commented on SPARK-28264:

Sounds good. Thanks for doing this [~hyukjin.kwon]!

> Revisiting Python / pandas UDF
> ------------------------------
>
> Key: SPARK-28264
> URL: https://issues.apache.org/jira/browse/SPARK-28264
> Project: Spark
> Issue Type: Improvement
> Components: PySpark, SQL
> Affects Versions: 3.0.0
> Reporter: Reynold Xin
> Assignee: Reynold Xin
> Priority: Critical
>
> In the past two years, pandas UDFs have perhaps been the most important change to Spark for Python data science. However, these functionalities have evolved organically, leading to some inconsistencies and confusion among users. This document revisits UDF definition and naming, as a result of discussions among Xiangrui, Li Jin, Hyukjin, and Reynold.
>
> See document here: [https://docs.google.com/document/d/10Pkl-rqygGao2xQf6sddt0b-4FYK4g8qr_bXLKTL65A/edit#|https://docs.google.com/document/d/10Pkl-rqygGao2xQf6sddt0b-4FYK4g8qr_bXLKTL65A/edit]
[jira] [Commented] (SPARK-29931) Declare all SQL legacy configs as will be removed in Spark 4.0
[ https://issues.apache.org/jira/browse/SPARK-29931?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16976134#comment-16976134 ] Reynold Xin commented on SPARK-29931:

You can say "This config will be removed in Spark 4.0 or a later release."

> Declare all SQL legacy configs as will be removed in Spark 4.0
> --------------------------------------------------------------
>
> Key: SPARK-29931
> URL: https://issues.apache.org/jira/browse/SPARK-29931
> Project: Spark
> Issue Type: Improvement
> Components: SQL
> Affects Versions: 3.0.0
> Reporter: Maxim Gekk
> Priority: Minor
>
> Add the sentence to the descriptions of all legacy SQL configs that existed before Spark 3.0: "This config will be removed in Spark 4.0.". Here is the list of such configs:
> * spark.sql.legacy.execution.pandas.groupedMap.assignColumnsByName
> * spark.sql.legacy.literal.pickMinimumPrecision
> * spark.sql.legacy.allowCreatingManagedTableUsingNonemptyLocation
> * spark.sql.legacy.sizeOfNull
> * spark.sql.legacy.replaceDatabricksSparkAvro.enabled
> * spark.sql.legacy.setopsPrecedence.enabled
> * spark.sql.legacy.integralDivide.returnBigint
> * spark.sql.legacy.bucketedTableScan.outputOrdering
> * spark.sql.legacy.parser.havingWithoutGroupByAsWhere
> * spark.sql.legacy.dataset.nameNonStructGroupingKeyAsValue
> * spark.sql.legacy.setCommandRejectsSparkCoreConfs
> * spark.sql.legacy.utcTimestampFunc.enabled
> * spark.sql.legacy.typeCoercion.datetimeToString
> * spark.sql.legacy.looseUpcast
> * spark.sql.legacy.ctePrecedence.enabled
> * spark.sql.legacy.arrayExistsFollowsThreeValuedLogic
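The suggested wording can be appended to each config description mechanically; a minimal sketch (hypothetical helper name, not Spark code):

```python
LEGACY_NOTE = "This config will be removed in Spark 4.0 or a later release."

def with_deprecation_note(description: str) -> str:
    """Append the standard removal notice to a legacy config description,
    normalizing trailing whitespace and punctuation first."""
    return description.rstrip().rstrip(".") + ". " + LEGACY_NOTE

note = with_deprecation_note("If it is set to true, size of null returns -1.")
# -> "If it is set to true, size of null returns -1. This config will be
#     removed in Spark 4.0 or a later release."
```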
[jira] [Created] (SPARK-28264) Revisiting Python / pandas UDF
Reynold Xin created SPARK-28264:

Summary: Revisiting Python / pandas UDF
Key: SPARK-28264
URL: https://issues.apache.org/jira/browse/SPARK-28264
Project: Spark
Issue Type: Improvement
Components: PySpark, SQL
Affects Versions: 3.0.0
Reporter: Reynold Xin
Assignee: Reynold Xin

In the past two years, pandas UDFs have perhaps been the most important change to Spark for Python data science. However, these functionalities have evolved organically, leading to some inconsistencies and confusion among users. This document revisits UDF definition and naming, as a result of discussions among Xiangrui, Li Jin, Hyukjin, and Reynold.

See document here: [https://docs.google.com/document/d/10Pkl-rqygGao2xQf6sddt0b-4FYK4g8qr_bXLKTL65A/edit#|https://docs.google.com/document/d/10Pkl-rqygGao2xQf6sddt0b-4FYK4g8qr_bXLKTL65A/edit]
[jira] [Commented] (SPARK-27386) Improve partition transform parsing
[ https://issues.apache.org/jira/browse/SPARK-27386?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16818457#comment-16818457 ] Reynold Xin commented on SPARK-27386:

[~rdblue] when will you fix this?

> Improve partition transform parsing
> -----------------------------------
>
> Key: SPARK-27386
> URL: https://issues.apache.org/jira/browse/SPARK-27386
> Project: Spark
> Issue Type: Improvement
> Components: SQL
> Affects Versions: 3.0.0
> Reporter: Ryan Blue
> Priority: Major
>
> SPARK-27181 adds support to the SQL parser for transformation functions in the {{PARTITION BY}} clause. The rules to match this are specific to transforms and can match only literals or qualified names (field references). This should be improved to match a broader set of expressions so that Spark can produce better error messages than an expected symbol list.
> For example, {{PARTITION BY (2 + 3)}} should produce "invalid transformation expression: 2 + 3" instead of "expecting qualified name".
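The improvement described — accept a broader grammar up front, then validate, so the error can name the offending expression — can be sketched as follows (a hypothetical regex-based validator, not Spark's ANTLR parser):

```python
import re

# Accept any argument list syntactically, then reject with an error that
# names the expression instead of listing expected symbols.
_NAME = r"[A-Za-z_]\w*(?:\.[A-Za-z_]\w*)*"          # qualified column name
_ARG = r"\w+(?:\.\w+)*"                              # literal or name argument
_CALL = rf"[A-Za-z_]\w*\(\s*{_ARG}(?:\s*,\s*{_ARG})*\s*\)"  # transform(args)

def parse_partition_transform(expr: str) -> str:
    """Validate a PARTITION BY transform: a column reference or a transform call."""
    if re.fullmatch(_NAME, expr) or re.fullmatch(_CALL, expr):
        return expr
    # Targeted message instead of a generic "expecting qualified name" error:
    raise ValueError(f"invalid transformation expression: {expr}")

parse_partition_transform("bucket(16, id)")   # accepted
parse_partition_transform("ts")               # accepted
# parse_partition_transform("2 + 3")  raises:
#   ValueError: invalid transformation expression: 2 + 3
```

The point is the error-reporting strategy: parse permissively, diagnose precisely.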
[jira] [Commented] (SPARK-26218) Throw exception on overflow for integers
[ https://issues.apache.org/jira/browse/SPARK-26218?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16814709#comment-16814709 ] Reynold Xin commented on SPARK-26218:

The no-exception behavior is by design. Imagine you have an ETL job that runs for hours, and then it suddenly throws an exception because one row overflows ...

> Throw exception on overflow for integers
> ----------------------------------------
>
> Key: SPARK-26218
> URL: https://issues.apache.org/jira/browse/SPARK-26218
> Project: Spark
> Issue Type: Sub-task
> Components: SQL
> Affects Versions: 3.0.0
> Reporter: Marco Gaido
> Priority: Major
>
> SPARK-24598 just updated the documentation to state that our addition is Java-style and not SQL-style. But in order to follow the SQL standard we should instead throw an exception if an overflow occurs.
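The "Java-style" default being debated is two's-complement wrap-around: integer addition silently wraps instead of raising. A small Python sketch of that semantics, using a fixed-width 32-bit integer:

```python
import ctypes

def java_int_add(a: int, b: int) -> int:
    """Java-style 32-bit addition: silently wraps on overflow
    instead of raising, like Spark's default integer arithmetic."""
    return ctypes.c_int32(a + b).value

# Int.MaxValue + 1 wraps to Int.MinValue rather than throwing:
print(java_int_add(2**31 - 1, 1))  # -2147483648
```

Spark later made the SQL-standard behavior opt-in via `spark.sql.ansi.enabled`, which throws on overflow, so both positions in this thread ended up supported.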
[jira] [Issue Comment Deleted] (SPARK-26366) Except with transform regression
[ https://issues.apache.org/jira/browse/SPARK-26366?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Reynold Xin updated SPARK-26366:

Comment: was deleted

(was: mgaido91 opened a new pull request #23372: [SPARK-26366][SQL][BACKPORT-2.3] ReplaceExceptWithFilter should consider NULL as False
URL: https://github.com/apache/spark/pull/23372

## What changes were proposed in this pull request?

In `ReplaceExceptWithFilter` we do not properly consider the case in which the condition returns NULL. In that case, since negating NULL still returns NULL, the assumption that negating the condition returns all the rows which didn't satisfy it does not hold: rows for which the condition evaluates to NULL may not be returned. This happens when the constraints inferred by `InferFiltersFromConstraints` are not enough, as with `OR` conditions.

The rule also had problems with non-deterministic conditions: in such a scenario, this rule would change the probability of the output.

The PR fixes these problems by:
- returning False for the condition when it is NULL (in this way we do return all the rows which didn't satisfy it);
- avoiding any transformation when the condition is non-deterministic.

## How was this patch tested?

added UTs

This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org )

> Except with transform regression
> --------------------------------
>
> Key: SPARK-26366
> URL: https://issues.apache.org/jira/browse/SPARK-26366
> Project: Spark
> Issue Type: Bug
> Components: Spark Core, SQL
> Affects Versions: 2.3.2
> Reporter: Dan Osipov
> Assignee: Marco Gaido
> Priority: Major
> Labels: correctness
> Fix For: 2.3.3, 2.4.1, 3.0.0
>
> There appears to be a regression between Spark 2.2 and 2.3. Below is the code to reproduce it:
> {code:java}
> import org.apache.spark.sql.functions.col
> import org.apache.spark.sql.Row
> import org.apache.spark.sql.types._
> val inputDF = spark.sqlContext.createDataFrame(
>   spark.sparkContext.parallelize(Seq(
>     Row("0", "john", "smith", "j...@smith.com"),
>     Row("1", "jane", "doe", "j...@doe.com"),
>     Row("2", "apache", "spark", "sp...@apache.org"),
>     Row("3", "foo", "bar", null)
>   )),
>   StructType(List(
>     StructField("id", StringType, nullable=true),
>     StructField("first_name", StringType, nullable=true),
>     StructField("last_name", StringType, nullable=true),
>     StructField("email", StringType, nullable=true)
>   ))
> )
> val exceptDF = inputDF.transform( toProcessDF =>
>   toProcessDF.filter(
>     (
>       col("first_name").isin(Seq("john", "jane"): _*)
>       and col("last_name").isin(Seq("smith", "doe"): _*)
>     )
>     or col("email").isin(List(): _*)
>   )
> )
> inputDF.except(exceptDF).show()
> {code}
> Output with Spark 2.2:
> {noformat}
> +---+----------+---------+----------------+
> | id|first_name|last_name|           email|
> +---+----------+---------+----------------+
> |  2|    apache|    spark|sp...@apache.org|
> |  3|       foo|      bar|            null|
> +---+----------+---------+----------------+
> {noformat}
> Output with Spark 2.3:
> {noformat}
> +---+----------+---------+----------------+
> | id|first_name|last_name|           email|
> +---+----------+---------+----------------+
> |  2|    apache|    spark|sp...@apache.org|
> +---+----------+---------+----------------+
> {noformat}
> Note, changing the last line to
> {code:java}
> inputDF.except(exceptDF.cache()).show()
> {code}
> produces identical output for both Spark 2.3 and 2.2.
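The fix described in the deleted PR comment — treat a NULL condition as False when negating — can be modeled with SQL's three-valued logic in Python (a hypothetical model, with `None` standing in for SQL NULL):

```python
def not3(x):
    """SQL three-valued NOT: NOT NULL is NULL."""
    return None if x is None else (not x)

rows = [True, False, None]  # the filter condition's value per row

# Buggy rewrite: EXCEPT turned into filter(NOT cond), which keeps only rows
# where NOT cond is True; the NULL row (which did NOT satisfy the condition,
# so EXCEPT should keep it) is silently dropped.
buggy = [c for c in rows if not3(c) is True]

# Fixed rewrite: coalesce NULL to False before negating, so the NULL row
# is correctly retained.
fixed = [c for c in rows if not3(False if c is None else c) is True]

assert buggy == [False]        # NULL row lost - the regression above
assert fixed == [False, None]  # NULL row kept
```

This is exactly why the repro's row 3 (with a null email) disappears in Spark 2.3: the `or col("email").isin(...)` condition evaluates to NULL for it.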
[jira] [Updated] (SPARK-26362) Remove 'spark.driver.allowMultipleContexts' to disallow multiple Spark contexts
[ https://issues.apache.org/jira/browse/SPARK-26362?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Reynold Xin updated SPARK-26362:

Labels: releasenotes (was: )

> Remove 'spark.driver.allowMultipleContexts' to disallow multiple Spark contexts
> -------------------------------------------------------------------------------
>
> Key: SPARK-26362
> URL: https://issues.apache.org/jira/browse/SPARK-26362
> Project: Spark
> Issue Type: Improvement
> Components: Spark Core
> Affects Versions: 3.0.0
> Reporter: Hyukjin Kwon
> Assignee: Hyukjin Kwon
> Priority: Major
> Labels: releasenotes
> Fix For: 3.0.0
>
> Multiple Spark contexts are discouraged, and this has been warned about for 4 years (see SPARK-4180).
> It can cause arbitrary and mysterious error cases. (Honestly, I didn't even know Spark allows it.)
[jira] [Issue Comment Deleted] (SPARK-26362) Remove 'spark.driver.allowMultipleContexts' to disallow multiple Spark contexts
[ https://issues.apache.org/jira/browse/SPARK-26362?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Reynold Xin updated SPARK-26362: Comment: was deleted (was: asfgit closed pull request #23311: [SPARK-26362][CORE] Remove 'spark.driver.allowMultipleContexts' to disallow multiple creation of SparkContexts URL: https://github.com/apache/spark/pull/23311 This is a PR merged from a forked repository. As GitHub hides the original diff on merge, it is displayed below for the sake of provenance: As this is a foreign pull request (from a fork), the diff is supplied below (as it won't show otherwise due to GitHub magic): diff --git a/core/src/main/scala/org/apache/spark/SparkContext.scala b/core/src/main/scala/org/apache/spark/SparkContext.scala index 696dafda6d1ec..09cc346db0ed2 100644 --- a/core/src/main/scala/org/apache/spark/SparkContext.scala +++ b/core/src/main/scala/org/apache/spark/SparkContext.scala @@ -64,9 +64,8 @@ import org.apache.spark.util.logging.DriverLogger * Main entry point for Spark functionality. A SparkContext represents the connection to a Spark * cluster, and can be used to create RDDs, accumulators and broadcast variables on that cluster. * - * Only one SparkContext may be active per JVM. You must `stop()` the active SparkContext before - * creating a new one. This limitation may eventually be removed; see SPARK-2243 for more details. - * + * @note Only one `SparkContext` should be active per JVM. You must `stop()` the + * active `SparkContext` before creating a new one. * @param config a Spark Config object describing the application configuration. Any settings in * this config overrides the default configs as well as system properties. */ @@ -75,14 +74,10 @@ class SparkContext(config: SparkConf) extends Logging { // The call site where this SparkContext was constructed. 
private val creationSite: CallSite = Utils.getCallSite() - // If true, log warnings instead of throwing exceptions when multiple SparkContexts are active - private val allowMultipleContexts: Boolean = -config.getBoolean("spark.driver.allowMultipleContexts", false) - // In order to prevent multiple SparkContexts from being active at the same time, mark this // context as having started construction. // NOTE: this must be placed at the beginning of the SparkContext constructor. - SparkContext.markPartiallyConstructed(this, allowMultipleContexts) + SparkContext.markPartiallyConstructed(this) val startTime = System.currentTimeMillis() @@ -2392,7 +2387,7 @@ class SparkContext(config: SparkConf) extends Logging { // In order to prevent multiple SparkContexts from being active at the same time, mark this // context as having finished construction. // NOTE: this must be placed at the end of the SparkContext constructor. - SparkContext.setActiveContext(this, allowMultipleContexts) + SparkContext.setActiveContext(this) } /** @@ -2409,18 +2404,18 @@ object SparkContext extends Logging { private val SPARK_CONTEXT_CONSTRUCTOR_LOCK = new Object() /** - * The active, fully-constructed SparkContext. If no SparkContext is active, then this is `null`. + * The active, fully-constructed SparkContext. If no SparkContext is active, then this is `null`. * - * Access to this field is guarded by SPARK_CONTEXT_CONSTRUCTOR_LOCK. + * Access to this field is guarded by `SPARK_CONTEXT_CONSTRUCTOR_LOCK`. */ private val activeContext: AtomicReference[SparkContext] = new AtomicReference[SparkContext](null) /** - * Points to a partially-constructed SparkContext if some thread is in the SparkContext + * Points to a partially-constructed SparkContext if another thread is in the SparkContext * constructor, or `None` if no SparkContext is being constructed. * - * Access to this field is guarded by SPARK_CONTEXT_CONSTRUCTOR_LOCK + * Access to this field is guarded by `SPARK_CONTEXT_CONSTRUCTOR_LOCK`. 
*/ private var contextBeingConstructed: Option[SparkContext] = None @@ -2428,24 +2423,16 @@ object SparkContext extends Logging { * Called to ensure that no other SparkContext is running in this JVM. * * Throws an exception if a running context is detected and logs a warning if another thread is - * constructing a SparkContext. This warning is necessary because the current locking scheme + * constructing a SparkContext. This warning is necessary because the current locking scheme * prevents us from reliably distinguishing between cases where another context is being * constructed and cases where another constructor threw an exception. */ - private def assertNoOtherContextIsRunning( - sc: SparkContext, - allowMultipleContexts: Boolean): Unit = { + private def assertNoOtherContextIsRunning(sc: SparkContext): Unit = { SPARK_CONTEXT_CONSTRUCTOR_LOCK.synchronized { Option(activeContext.get()).
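The invariant the patch above hardens — at most one active context per JVM, enforced under a constructor lock — can be sketched in Python (a hypothetical stand-in, not Spark's code):

```python
import threading

class Context:
    """Allows at most one active instance per process,
    analogous to one SparkContext per JVM."""
    _lock = threading.Lock()
    _active = None

    def __init__(self):
        with Context._lock:
            if Context._active is not None:
                raise RuntimeError(
                    "Only one Context may be active per process; "
                    "stop() the active Context before creating a new one.")
            Context._active = self

    def stop(self):
        with Context._lock:
            if Context._active is self:
                Context._active = None

c1 = Context()
try:
    Context()          # second construction fails while c1 is active
except RuntimeError:
    pass
c1.stop()
c2 = Context()         # allowed again after stop()
```

Removing `allowMultipleContexts` turns what used to be a suppressible warning into this unconditional check.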
[jira] [Issue Comment Deleted] (SPARK-26366) Except with transform regression
[ https://issues.apache.org/jira/browse/SPARK-26366?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Reynold Xin updated SPARK-26366: Comment: was deleted (was: mgaido91 opened a new pull request #23350: [SPARK-26366][SQL][BACKPORT-2.3] ReplaceExceptWithFilter should consider NULL as False URL: https://github.com/apache/spark/pull/23350 ## What changes were proposed in this pull request? In `ReplaceExceptWithFilter` we do not consider properly the case in which the condition returns NULL. Indeed, in that case, since negating NULL still returns NULL, so it is not true the assumption that negating the condition returns all the rows which didn't satisfy it, rows returning NULL may not be returned. This happens when constraints inferred by `InferFiltersFromConstraints` are not enough, as it happens with `OR` conditions. The rule had also problems with non-deterministic conditions: in such a scenario, this rule would change the probability of the output. The PR fixes these problem by: - returning False for the condition when it is Null (in this way we do return all the rows which didn't satisfy it); - avoiding any transformation when the condition is non-deterministic. ## How was this patch tested? added UTs This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org ) > Except with transform regression > > > Key: SPARK-26366 > URL: https://issues.apache.org/jira/browse/SPARK-26366 > Project: Spark > Issue Type: Bug > Components: Spark Core, SQL >Affects Versions: 2.3.2 >Reporter: Dan Osipov >Assignee: Marco Gaido >Priority: Major > Labels: correctness > Fix For: 2.3.3, 2.4.1, 3.0.0 > > > There appears to be a regression between Spark 2.2 and 2.3. 
> Below is the code to reproduce it:
>
> {code:java}
> import org.apache.spark.sql.functions.col
> import org.apache.spark.sql.Row
> import org.apache.spark.sql.types._
> val inputDF = spark.sqlContext.createDataFrame(
>   spark.sparkContext.parallelize(Seq(
>     Row("0", "john", "smith", "j...@smith.com"),
>     Row("1", "jane", "doe", "j...@doe.com"),
>     Row("2", "apache", "spark", "sp...@apache.org"),
>     Row("3", "foo", "bar", null)
>   )),
>   StructType(List(
>     StructField("id", StringType, nullable=true),
>     StructField("first_name", StringType, nullable=true),
>     StructField("last_name", StringType, nullable=true),
>     StructField("email", StringType, nullable=true)
>   ))
> )
> val exceptDF = inputDF.transform( toProcessDF =>
>   toProcessDF.filter(
>     (
>       col("first_name").isin(Seq("john", "jane"): _*)
>       and col("last_name").isin(Seq("smith", "doe"): _*)
>     )
>     or col("email").isin(List(): _*)
>   )
> )
> inputDF.except(exceptDF).show()
> {code}
> Output with Spark 2.2:
> {noformat}
> +---+----------+---------+----------------+
> | id|first_name|last_name|           email|
> +---+----------+---------+----------------+
> |  2|    apache|    spark|sp...@apache.org|
> |  3|       foo|      bar|            null|
> +---+----------+---------+----------------+{noformat}
> Output with Spark 2.3:
> {noformat}
> +---+----------+---------+----------------+
> | id|first_name|last_name|           email|
> +---+----------+---------+----------------+
> |  2|    apache|    spark|sp...@apache.org|
> +---+----------+---------+----------------+{noformat}
> Note, changing the last line to
> {code:java}
> inputDF.except(exceptDF.cache()).show()
> {code}
> produces identical output for both Spark 2.3 and 2.2

-- This message was sent by Atlassian JIRA (v7.6.3#76005) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
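The root cause described in the PR above is SQL three-valued logic: `NOT NULL` is still `NULL`, and a filter keeps only rows whose predicate is `True`, so rewriting EXCEPT as `Filter(NOT cond)` silently drops rows where the condition evaluates to `NULL` (here, the row with a NULL email). A small sketch using `None` for SQL NULL (illustrative only, not Spark code):

```python
# Simulate SQL three-valued NOT: NOT NULL stays NULL.
def sql_not(v):
    return None if v is None else (not v)

# (name, value of the EXCEPT right-side filter condition for that row)
rows = [("apache", False), ("foo", None)]  # "foo" has a NULL email

# Buggy rewrite: keep rows where NOT(cond) is True. The NULL row vanishes,
# reproducing the Spark 2.3 output above.
kept_buggy = [n for n, cond in rows if sql_not(cond) is True]
assert kept_buggy == ["apache"]

# Fixed rewrite: Coalesce(cond, False) treats NULL as False before negating,
# so both rows survive, matching the Spark 2.2 output.
kept_fixed = [n for n, cond in rows
              if sql_not(cond if cond is not None else False) is True]
assert kept_fixed == ["apache", "foo"]
```

This is exactly why the fix wraps the flipped condition in `Coalesce(condition, FalseLiteral)`.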
[jira] [Issue Comment Deleted] (SPARK-26366) Except with transform regression
[ https://issues.apache.org/jira/browse/SPARK-26366?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Reynold Xin updated SPARK-26366: Comment: was deleted (was: mgaido91 opened a new pull request #23315: [SPARK-26366][SQL] ReplaceExceptWithFilter should consider NULL as False URL: https://github.com/apache/spark/pull/23315 ## What changes were proposed in this pull request? In `ReplaceExceptWithFilter` we do not consider the case in which the condition returns NULL. Indeed, in that case, negating NULL still returns NULL, so it is not true the assumption that negating the condition returns all the rows which didn't satisfy it: rows returning NULL are not returned. The PR fixes this problem by returning False for the condition when it is Null. In this way we do return all the rows which didn't satisfy it. ## How was this patch tested? added UTs This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org ) > Except with transform regression > > > Key: SPARK-26366 > URL: https://issues.apache.org/jira/browse/SPARK-26366 > Project: Spark > Issue Type: Bug > Components: Spark Core, SQL >Affects Versions: 2.3.2 >Reporter: Dan Osipov >Assignee: Marco Gaido >Priority: Major > Labels: correctness > Fix For: 2.3.3, 2.4.1, 3.0.0 > > > There appears to be a regression between Spark 2.2 and 2.3. 
Below is the code > to reproduce it: > > {code:java} > import org.apache.spark.sql.functions.col > import org.apache.spark.sql.Row > import org.apache.spark.sql.types._ > val inputDF = spark.sqlContext.createDataFrame( > spark.sparkContext.parallelize(Seq( > Row("0", "john", "smith", "j...@smith.com"), > Row("1", "jane", "doe", "j...@doe.com"), > Row("2", "apache", "spark", "sp...@apache.org"), > Row("3", "foo", "bar", null) > )), > StructType(List( > StructField("id", StringType, nullable=true), > StructField("first_name", StringType, nullable=true), > StructField("last_name", StringType, nullable=true), > StructField("email", StringType, nullable=true) > )) > ) > val exceptDF = inputDF.transform( toProcessDF => > toProcessDF.filter( > ( > col("first_name").isin(Seq("john", "jane"): _*) > and col("last_name").isin(Seq("smith", "doe"): _*) > ) > or col("email").isin(List(): _*) > ) > ) > inputDF.except(exceptDF).show() > {code} > Output with Spark 2.2: > {noformat} > +---+--+-++ > | id|first_name|last_name| email| > +---+--+-++ > | 2| apache| spark|sp...@apache.org| > | 3| foo| bar| null| > +---+--+-++{noformat} > Output with Spark 2.3: > {noformat} > +---+--+-++ > | id|first_name|last_name| email| > +---+--+-++ > | 2| apache| spark|sp...@apache.org| > +---+--+-++{noformat} > Note, changing the last line to > {code:java} > inputDF.except(exceptDF.cache()).show() > {code} > produces identical output for both Spark 2.3 and 2.2 > -- This message was sent by Atlassian JIRA (v7.6.3#76005) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Issue Comment Deleted] (SPARK-26366) Except with transform regression
[ https://issues.apache.org/jira/browse/SPARK-26366?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Reynold Xin updated SPARK-26366: Comment: was deleted (was: gatorsmile closed pull request #23350: [SPARK-26366][SQL][BACKPORT-2.3] ReplaceExceptWithFilter should consider NULL as False URL: https://github.com/apache/spark/pull/23350 This is a PR merged from a forked repository. As GitHub hides the original diff on merge, it is displayed below for the sake of provenance: As this is a foreign pull request (from a fork), the diff is supplied below (as it won't show otherwise due to GitHub magic): diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/ReplaceExceptWithFilter.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/ReplaceExceptWithFilter.scala index 45edf266bbce4..08cf16038a654 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/ReplaceExceptWithFilter.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/ReplaceExceptWithFilter.scala @@ -36,7 +36,8 @@ import org.apache.spark.sql.catalyst.rules.Rule * Note: * Before flipping the filter condition of the right node, we should: * 1. Combine all it's [[Filter]]. - * 2. Apply InferFiltersFromConstraints rule (to take into account of NULL values in the condition). + * 2. Update the attribute references to the left node; + * 3. Add a Coalesce(condition, False) (to take into account of NULL values in the condition). 
*/ object ReplaceExceptWithFilter extends Rule[LogicalPlan] { @@ -47,23 +48,28 @@ object ReplaceExceptWithFilter extends Rule[LogicalPlan] { plan.transform { case e @ Except(left, right) if isEligible(left, right) => -val newCondition = transformCondition(left, skipProject(right)) -newCondition.map { c => - Distinct(Filter(Not(c), left)) -}.getOrElse { +val filterCondition = combineFilters(skipProject(right)).asInstanceOf[Filter].condition +if (filterCondition.deterministic) { + transformCondition(left, filterCondition).map { c => +Distinct(Filter(Not(c), left)) + }.getOrElse { +e + } +} else { e } } } - private def transformCondition(left: LogicalPlan, right: LogicalPlan): Option[Expression] = { -val filterCondition = - InferFiltersFromConstraints(combineFilters(right)).asInstanceOf[Filter].condition - -val attributeNameMap: Map[String, Attribute] = left.output.map(x => (x.name, x)).toMap - -if (filterCondition.references.forall(r => attributeNameMap.contains(r.name))) { - Some(filterCondition.transform { case a: AttributeReference => attributeNameMap(a.name) }) + private def transformCondition(plan: LogicalPlan, condition: Expression): Option[Expression] = { +val attributeNameMap: Map[String, Attribute] = plan.output.map(x => (x.name, x)).toMap +if (condition.references.forall(r => attributeNameMap.contains(r.name))) { + val rewrittenCondition = condition.transform { +case a: AttributeReference => attributeNameMap(a.name) + } + // We need to consider as False when the condition is NULL, otherwise we do not return those + // rows containing NULL which are instead filtered in the Except right plan + Some(Coalesce(Seq(rewrittenCondition, Literal.FalseLiteral))) } else { None } diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/ReplaceOperatorSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/ReplaceOperatorSuite.scala index 52dc2e9fb076c..78d3969906e99 100644 --- 
a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/ReplaceOperatorSuite.scala +++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/ReplaceOperatorSuite.scala @@ -20,11 +20,12 @@ package org.apache.spark.sql.catalyst.optimizer import org.apache.spark.sql.Row import org.apache.spark.sql.catalyst.dsl.expressions._ import org.apache.spark.sql.catalyst.dsl.plans._ -import org.apache.spark.sql.catalyst.expressions.{Alias, Literal, Not} +import org.apache.spark.sql.catalyst.expressions.{Alias, Coalesce, If, Literal, Not} import org.apache.spark.sql.catalyst.expressions.aggregate.First import org.apache.spark.sql.catalyst.plans.{LeftAnti, LeftSemi, PlanTest} import org.apache.spark.sql.catalyst.plans.logical._ import org.apache.spark.sql.catalyst.rules.RuleExecutor +import org.apache.spark.sql.types.BooleanType class ReplaceOperatorSuite extends PlanTest { @@ -65,8 +66,7 @@ class ReplaceOperatorSuite extends PlanTest { val correctAnswer = Aggregate(table1.output, table1.output, -Filter(Not((attributeA.isNotNull && attributeB.isNotNull) && - (attributeA >= 2 && attributeB < 1)), +Filter(Not(Coalesce(Seq
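The optimizer change quoted in the diff above has two parts: wrap the flipped condition in `Coalesce(cond, False)`, and skip the rewrite entirely when the condition is non-deterministic (re-evaluating e.g. `rand()` would change the output distribution). A hypothetical miniature of the rule's shape, not the real Catalyst implementation:

```python
# Miniature of the ReplaceExceptWithFilter rewrite:
#   Except(left, Filter(cond, left))  =>  Distinct(Filter(Not(Coalesce(cond, False)), left))
# and no rewrite at all when cond is non-deterministic.
def rewrite_except(left_rows, cond, deterministic=True):
    if not deterministic:
        return None  # leave the Except operator un-rewritten

    def coalesced(row):
        v = cond(row)                       # may be True/False/None (NULL)
        return v if v is not None else False  # Coalesce(cond, False)

    # Distinct(Filter(Not(...), left)) — a set models Distinct.
    return sorted({row for row in left_rows if not coalesced(row)})

rows = [("apache",), ("foo",)]
cond = lambda r: None if r[0] == "foo" else False  # NULL for the "foo" row
assert rewrite_except(rows, cond) == [("apache",), ("foo",)]
assert rewrite_except(rows, cond, deterministic=False) is None
```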
[jira] [Commented] (SPARK-26366) Except with transform regression
[ https://issues.apache.org/jira/browse/SPARK-26366?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16734363#comment-16734363 ] Reynold Xin commented on SPARK-26366: - Please make sure you guys tag these tickets with correctness label! > Except with transform regression > > > Key: SPARK-26366 > URL: https://issues.apache.org/jira/browse/SPARK-26366 > Project: Spark > Issue Type: Bug > Components: Spark Core, SQL >Affects Versions: 2.3.2 >Reporter: Dan Osipov >Assignee: Marco Gaido >Priority: Major > Labels: correctness > Fix For: 2.3.3, 2.4.1, 3.0.0 > > > There appears to be a regression between Spark 2.2 and 2.3. Below is the code > to reproduce it: > > {code:java} > import org.apache.spark.sql.functions.col > import org.apache.spark.sql.Row > import org.apache.spark.sql.types._ > val inputDF = spark.sqlContext.createDataFrame( > spark.sparkContext.parallelize(Seq( > Row("0", "john", "smith", "j...@smith.com"), > Row("1", "jane", "doe", "j...@doe.com"), > Row("2", "apache", "spark", "sp...@apache.org"), > Row("3", "foo", "bar", null) > )), > StructType(List( > StructField("id", StringType, nullable=true), > StructField("first_name", StringType, nullable=true), > StructField("last_name", StringType, nullable=true), > StructField("email", StringType, nullable=true) > )) > ) > val exceptDF = inputDF.transform( toProcessDF => > toProcessDF.filter( > ( > col("first_name").isin(Seq("john", "jane"): _*) > and col("last_name").isin(Seq("smith", "doe"): _*) > ) > or col("email").isin(List(): _*) > ) > ) > inputDF.except(exceptDF).show() > {code} > Output with Spark 2.2: > {noformat} > +---+--+-++ > | id|first_name|last_name| email| > +---+--+-++ > | 2| apache| spark|sp...@apache.org| > | 3| foo| bar| null| > +---+--+-++{noformat} > Output with Spark 2.3: > {noformat} > +---+--+-++ > | id|first_name|last_name| email| > +---+--+-++ > | 2| apache| spark|sp...@apache.org| > +---+--+-++{noformat} > Note, changing the last line to > 
{code:java} > inputDF.except(exceptDF.cache()).show() > {code} > produces identical output for both Spark 2.3 and 2.2 > -- This message was sent by Atlassian JIRA (v7.6.3#76005) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Updated] (SPARK-26366) Except with transform regression
[ https://issues.apache.org/jira/browse/SPARK-26366?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Reynold Xin updated SPARK-26366: Labels: correctness (was: ) > Except with transform regression > > > Key: SPARK-26366 > URL: https://issues.apache.org/jira/browse/SPARK-26366 > Project: Spark > Issue Type: Bug > Components: Spark Core, SQL >Affects Versions: 2.3.2 >Reporter: Dan Osipov >Assignee: Marco Gaido >Priority: Major > Labels: correctness > Fix For: 2.3.3, 2.4.1, 3.0.0 > > > There appears to be a regression between Spark 2.2 and 2.3. Below is the code > to reproduce it: > > {code:java} > import org.apache.spark.sql.functions.col > import org.apache.spark.sql.Row > import org.apache.spark.sql.types._ > val inputDF = spark.sqlContext.createDataFrame( > spark.sparkContext.parallelize(Seq( > Row("0", "john", "smith", "j...@smith.com"), > Row("1", "jane", "doe", "j...@doe.com"), > Row("2", "apache", "spark", "sp...@apache.org"), > Row("3", "foo", "bar", null) > )), > StructType(List( > StructField("id", StringType, nullable=true), > StructField("first_name", StringType, nullable=true), > StructField("last_name", StringType, nullable=true), > StructField("email", StringType, nullable=true) > )) > ) > val exceptDF = inputDF.transform( toProcessDF => > toProcessDF.filter( > ( > col("first_name").isin(Seq("john", "jane"): _*) > and col("last_name").isin(Seq("smith", "doe"): _*) > ) > or col("email").isin(List(): _*) > ) > ) > inputDF.except(exceptDF).show() > {code} > Output with Spark 2.2: > {noformat} > +---+--+-++ > | id|first_name|last_name| email| > +---+--+-++ > | 2| apache| spark|sp...@apache.org| > | 3| foo| bar| null| > +---+--+-++{noformat} > Output with Spark 2.3: > {noformat} > +---+--+-++ > | id|first_name|last_name| email| > +---+--+-++ > | 2| apache| spark|sp...@apache.org| > +---+--+-++{noformat} > Note, changing the last line to > {code:java} > inputDF.except(exceptDF.cache()).show() > {code} > produces identical output for both 
Spark 2.3 and 2.2 > -- This message was sent by Atlassian JIRA (v7.6.3#76005) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Issue Comment Deleted] (SPARK-26366) Except with transform regression
[ https://issues.apache.org/jira/browse/SPARK-26366?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Reynold Xin updated SPARK-26366: Comment: was deleted (was: asfgit closed pull request #23315: [SPARK-26366][SQL] ReplaceExceptWithFilter should consider NULL as False URL: https://github.com/apache/spark/pull/23315 This is a PR merged from a forked repository. As GitHub hides the original diff on merge, it is displayed below for the sake of provenance: As this is a foreign pull request (from a fork), the diff is supplied below (as it won't show otherwise due to GitHub magic): diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/ReplaceExceptWithFilter.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/ReplaceExceptWithFilter.scala index efd3944eba7f5..4996d24dfd298 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/ReplaceExceptWithFilter.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/ReplaceExceptWithFilter.scala @@ -36,7 +36,8 @@ import org.apache.spark.sql.catalyst.rules.Rule * Note: * Before flipping the filter condition of the right node, we should: * 1. Combine all it's [[Filter]]. - * 2. Apply InferFiltersFromConstraints rule (to take into account of NULL values in the condition). + * 2. Update the attribute references to the left node; + * 3. Add a Coalesce(condition, False) (to take into account of NULL values in the condition). 
*/ object ReplaceExceptWithFilter extends Rule[LogicalPlan] { @@ -47,23 +48,28 @@ object ReplaceExceptWithFilter extends Rule[LogicalPlan] { plan.transform { case e @ Except(left, right, false) if isEligible(left, right) => -val newCondition = transformCondition(left, skipProject(right)) -newCondition.map { c => - Distinct(Filter(Not(c), left)) -}.getOrElse { +val filterCondition = combineFilters(skipProject(right)).asInstanceOf[Filter].condition +if (filterCondition.deterministic) { + transformCondition(left, filterCondition).map { c => +Distinct(Filter(Not(c), left)) + }.getOrElse { +e + } +} else { e } } } - private def transformCondition(left: LogicalPlan, right: LogicalPlan): Option[Expression] = { -val filterCondition = - InferFiltersFromConstraints(combineFilters(right)).asInstanceOf[Filter].condition - -val attributeNameMap: Map[String, Attribute] = left.output.map(x => (x.name, x)).toMap - -if (filterCondition.references.forall(r => attributeNameMap.contains(r.name))) { - Some(filterCondition.transform { case a: AttributeReference => attributeNameMap(a.name) }) + private def transformCondition(plan: LogicalPlan, condition: Expression): Option[Expression] = { +val attributeNameMap: Map[String, Attribute] = plan.output.map(x => (x.name, x)).toMap +if (condition.references.forall(r => attributeNameMap.contains(r.name))) { + val rewrittenCondition = condition.transform { +case a: AttributeReference => attributeNameMap(a.name) + } + // We need to consider as False when the condition is NULL, otherwise we do not return those + // rows containing NULL which are instead filtered in the Except right plan + Some(Coalesce(Seq(rewrittenCondition, Literal.FalseLiteral))) } else { None } diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/ReplaceOperatorSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/ReplaceOperatorSuite.scala index 3b1b2d588ef67..c8e15c7da763e 100644 --- 
a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/ReplaceOperatorSuite.scala +++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/ReplaceOperatorSuite.scala @@ -20,11 +20,12 @@ package org.apache.spark.sql.catalyst.optimizer import org.apache.spark.sql.Row import org.apache.spark.sql.catalyst.dsl.expressions._ import org.apache.spark.sql.catalyst.dsl.plans._ -import org.apache.spark.sql.catalyst.expressions.{Alias, Literal, Not} +import org.apache.spark.sql.catalyst.expressions.{Alias, Coalesce, If, Literal, Not} import org.apache.spark.sql.catalyst.expressions.aggregate.First import org.apache.spark.sql.catalyst.plans.{LeftAnti, LeftSemi, PlanTest} import org.apache.spark.sql.catalyst.plans.logical._ import org.apache.spark.sql.catalyst.rules.RuleExecutor +import org.apache.spark.sql.types.BooleanType class ReplaceOperatorSuite extends PlanTest { @@ -65,8 +66,7 @@ class ReplaceOperatorSuite extends PlanTest { val correctAnswer = Aggregate(table1.output, table1.output, -Filter(Not((attributeA.isNotNull && attributeB.isNotNull) && - (attributeA >= 2 && attributeB < 1)), +Filter(Not(Coalesce(Seq(attributeA
[jira] [Issue Comment Deleted] (SPARK-26246) Infer timestamp types from JSON
[ https://issues.apache.org/jira/browse/SPARK-26246?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Reynold Xin updated SPARK-26246: Comment: was deleted (was: asfgit closed pull request #23201: [SPARK-26246][SQL] Inferring TimestampType from JSON URL: https://github.com/apache/spark/pull/23201 This is a PR merged from a forked repository. As GitHub hides the original diff on merge, it is displayed below for the sake of provenance: As this is a foreign pull request (from a fork), the diff is supplied below (as it won't show otherwise due to GitHub magic): diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JsonInferSchema.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JsonInferSchema.scala index 263e05de32075..d1bc00c08c1c6 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JsonInferSchema.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JsonInferSchema.scala @@ -28,7 +28,7 @@ import org.apache.spark.rdd.RDD import org.apache.spark.sql.catalyst.analysis.TypeCoercion import org.apache.spark.sql.catalyst.expressions.ExprUtils import org.apache.spark.sql.catalyst.json.JacksonUtils.nextUntil -import org.apache.spark.sql.catalyst.util.{DropMalformedMode, FailFastMode, ParseMode, PermissiveMode} +import org.apache.spark.sql.catalyst.util._ import org.apache.spark.sql.internal.SQLConf import org.apache.spark.sql.types._ import org.apache.spark.util.Utils @@ -37,6 +37,12 @@ private[sql] class JsonInferSchema(options: JSONOptions) extends Serializable { private val decimalParser = ExprUtils.getDecimalParser(options.locale) + @transient + private lazy val timestampFormatter = TimestampFormatter( +options.timestampFormat, +options.timeZone, +options.locale) + /** * Infer the type of a collection of json records in three stages: * 1. 
Infer the type of each record @@ -115,13 +121,19 @@ private[sql] class JsonInferSchema(options: JSONOptions) extends Serializable { // record fields' types have been combined. NullType - case VALUE_STRING if options.prefersDecimal => + case VALUE_STRING => +val field = parser.getText val decimalTry = allCatch opt { - val bigDecimal = decimalParser(parser.getText) + val bigDecimal = decimalParser(field) DecimalType(bigDecimal.precision, bigDecimal.scale) } -decimalTry.getOrElse(StringType) - case VALUE_STRING => StringType +if (options.prefersDecimal && decimalTry.isDefined) { + decimalTry.get +} else if ((allCatch opt timestampFormatter.parse(field)).isDefined) { + TimestampType +} else { + StringType +} case START_OBJECT => val builder = Array.newBuilder[StructField] diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/json/JsonInferSchemaSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/json/JsonInferSchemaSuite.scala new file mode 100644 index 0..9307f9b47b807 --- /dev/null +++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/json/JsonInferSchemaSuite.scala @@ -0,0 +1,102 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.spark.sql.catalyst.json + +import com.fasterxml.jackson.core.JsonFactory + +import org.apache.spark.SparkFunSuite +import org.apache.spark.sql.catalyst.plans.SQLHelper +import org.apache.spark.sql.internal.SQLConf +import org.apache.spark.sql.types._ + +class JsonInferSchemaSuite extends SparkFunSuite with SQLHelper { + + def checkType(options: Map[String, String], json: String, dt: DataType): Unit = { +val jsonOptions = new JSONOptions(options, "UTC", "") +val inferSchema = new JsonInferSchema(jsonOptions) +val factory = new JsonFactory() +jsonOptions.setJacksonOptions(factory) +val parser = CreateJacksonParser.string(factory, json) +parser.nextToken() +val expectedType = StructType(Seq(StructField("a", dt, true))) + +assert(inferSchema.inferField(parser) === expectedType) + } + + def checkTime
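The inference order the patch gives a string-valued JSON field can be sketched in a few lines: try decimal (only when `prefersDecimal` is set), then try the timestamp format, and fall back to string. The helper name and format below are illustrative, not Spark's actual configuration:

```python
from datetime import datetime
from decimal import Decimal, InvalidOperation

# Hypothetical stand-in for the configured timestampFormat option.
TIMESTAMP_FORMAT = "%Y-%m-%dT%H:%M:%S"

def infer_string_field(text, prefers_decimal=False):
    # 1. Decimal, only when prefersDecimal is enabled (as before the patch).
    if prefers_decimal:
        try:
            Decimal(text)
            return "decimal"
        except InvalidOperation:
            pass
    # 2. New in the patch: try parsing with the timestamp formatter.
    try:
        datetime.strptime(text, TIMESTAMP_FORMAT)
        return "timestamp"
    except ValueError:
        pass
    # 3. Fall back to plain string.
    return "string"

assert infer_string_field("2018-12-02T11:22:33") == "timestamp"
assert infer_string_field("3.14", prefers_decimal=True) == "decimal"
assert infer_string_field("hello") == "string"
```

This ordering is also why Reynold asks (below) for an option flag: strings that happen to match the timestamp format now infer to a different type than before.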
[jira] [Comment Edited] (SPARK-26246) Infer timestamp types from JSON
[ https://issues.apache.org/jira/browse/SPARK-26246?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16734361#comment-16734361 ] Reynold Xin edited comment on SPARK-26246 at 1/4/19 5:22 PM: - Is there an option flag for this? This is a breaking change for people, and we need a way to fallback. was (Author: rxin): |Is there an option flag for this? This is a breaking change for people, and we need a way to fallback.| > Infer timestamp types from JSON > --- > > Key: SPARK-26246 > URL: https://issues.apache.org/jira/browse/SPARK-26246 > Project: Spark > Issue Type: Improvement > Components: SQL >Affects Versions: 2.4.0 >Reporter: Maxim Gekk >Assignee: Maxim Gekk >Priority: Minor > Fix For: 3.0.0 > > > Currently, TimestampType cannot be inferred from JSON. To parse JSON string, > you have to specify schema explicitly if JSON input contains timestamps. This > ticket aims to extend JsonInferSchema to support such inferring. -- This message was sent by Atlassian JIRA (v7.6.3#76005) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Commented] (SPARK-26246) Infer timestamp types from JSON
[ https://issues.apache.org/jira/browse/SPARK-26246?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16734361#comment-16734361 ] Reynold Xin commented on SPARK-26246: - |Is there an option flag for this? This is a breaking change for people, and we need a way to fallback.| > Infer timestamp types from JSON > --- > > Key: SPARK-26246 > URL: https://issues.apache.org/jira/browse/SPARK-26246 > Project: Spark > Issue Type: Improvement > Components: SQL >Affects Versions: 2.4.0 >Reporter: Maxim Gekk >Assignee: Maxim Gekk >Priority: Minor > Fix For: 3.0.0 > > > Currently, TimestampType cannot be inferred from JSON. To parse JSON string, > you have to specify schema explicitly if JSON input contains timestamps. This > ticket aims to extend JsonInferSchema to support such inferring. -- This message was sent by Atlassian JIRA (v7.6.3#76005) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Commented] (SPARK-26362) Remove 'spark.driver.allowMultipleContexts' to disallow multiple Spark contexts
[ https://issues.apache.org/jira/browse/SPARK-26362?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16734355#comment-16734355 ] Reynold Xin commented on SPARK-26362: - [~hyukjin.kwon] please make sure we add releasenotes label to tickets like this. > Remove 'spark.driver.allowMultipleContexts' to disallow multiple Spark > contexts > --- > > Key: SPARK-26362 > URL: https://issues.apache.org/jira/browse/SPARK-26362 > Project: Spark > Issue Type: Improvement > Components: Spark Core >Affects Versions: 3.0.0 >Reporter: Hyukjin Kwon >Assignee: Hyukjin Kwon >Priority: Major > Labels: releasenotes > Fix For: 3.0.0 > > > Multiple Spark contexts are discouraged and it has been warning from 4 years > ago (see SPARK-4180). > It could cause arbitrary and mysterious error cases. (Honestly, I didn't even > know Spark allows it). -- This message was sent by Atlassian JIRA (v7.6.3#76005) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Commented] (SPARK-23693) SQL function uuid()
[ https://issues.apache.org/jira/browse/SPARK-23693?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16728078#comment-16728078 ] Reynold Xin commented on SPARK-23693: - [~tashoyan] the issue with calling uuid directly is that it is non-deterministic, and when recompute happens due to fault, the ids are not stable. We'd need a different way to generate uuid that can be deterministic based on some seed. > SQL function uuid() > --- > > Key: SPARK-23693 > URL: https://issues.apache.org/jira/browse/SPARK-23693 > Project: Spark > Issue Type: Improvement > Components: SQL >Affects Versions: 2.2.1, 2.3.0 >Reporter: Arseniy Tashoyan >Priority: Minor > > Add function uuid() to org.apache.spark.sql.functions that returns > [Universally Unique > ID|https://en.wikipedia.org/wiki/Universally_unique_identifier]. > Sometimes it is necessary to uniquely identify each row in a DataFrame. > Currently the following ways are available: > * monotonically_increasing_id() function > * row_number() function over some window > * convert the DataFrame to RDD and zipWithIndex() > All these approaches do not work when appending this DataFrame to another > DataFrame (union). Collisions may occur - two rows in different DataFrames > may have the same ID. Re-generating IDs on the resulting DataFrame is not an > option, because some data in some other system may already refer to old IDs. > The proposed solution is to add new function: > {code:scala} > def uuid(): Column > {code} > that returns String representation of UUID. > UUID is represented as a 128-bit number (two long numbers). Such numbers are > not supported in Scala or Java. In addition, some storage systems do not > support 128-bit numbers (Parquet's largest numeric type is INT96). This is > the reason for the uuid() function to return String. > I already have a simple implementation based on > [java-uuid-generator|https://github.com/cowtowncoder/java-uuid-generator]. I > can share it as a PR. 
-- This message was sent by Atlassian JIRA (v7.6.3#76005) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
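Reynold's determinism concern above can be made concrete: a value-based `uuid()` regenerates different ids when a lost partition is recomputed, while a name-based UUID derived from a fixed seed plus a stable per-row key reproduces identical ids. An illustrative sketch (one possible approach, not a Spark implementation; the seed and key format are hypothetical):

```python
import uuid

# A fixed seed namespace: same seed => same ids across recomputation.
SEED_NAMESPACE = uuid.uuid5(uuid.NAMESPACE_OID, "job-seed-42")

def deterministic_uuid(row_key: str) -> str:
    # uuid5 is a name-based (SHA-1) UUID: a pure function of namespace + key.
    return str(uuid.uuid5(SEED_NAMESPACE, row_key))

a = deterministic_uuid("partition-0/row-17")
b = deterministic_uuid("partition-0/row-17")  # "recomputed" after a fault
assert a == b                                 # stable across recomputation
assert a != deterministic_uuid("partition-0/row-18")
```

The hard part in Spark is choosing a stable `row_key` (partition id plus offset only works if the input order is itself deterministic), which is why the ticket remains non-trivial.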
[jira] [Commented] (SPARK-26222) Scan: track file listing time
[ https://issues.apache.org/jira/browse/SPARK-26222?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16720957#comment-16720957 ] Reynold Xin commented on SPARK-26222: - So I spent some time looking at the code base to understand what's going on, and how we should report this. In short, I think we have two types of tables: (1) tables that require full file listing in order to resolve the schema (including partition columns) (2) tables that don't. This means there are 3 scenarios to think about: (1) spark.read.parquet("/path/to/table").count() -> in this case an InMemoryFileIndex containing all of the leaf files is created. (2a) spark.read.table("abcd").count() -> when partitions are not tracked in the catalog, which is basically the same as (1) (2b) spark.read.table("abcd").count() -> when partitions are tracked in the catalog. In this case a CatalogFileIndex is created. We should measure the listing time in CatalogFileIndex.filterPartitions. Also instead of tracking them as phases, I'd associate the timing with the scan operator in SQL metrics. I'd report the start and end time, rather than just a single duration. > Scan: track file listing time > - > > Key: SPARK-26222 > URL: https://issues.apache.org/jira/browse/SPARK-26222 > Project: Spark > Issue Type: Sub-task > Components: SQL >Affects Versions: 2.4.0 >Reporter: Reynold Xin >Priority: Major > > We should track file listing time and add it to the scan node's SQL metric, > so we have visibility how much is spent in file listing. It'd be useful to > track not just duration, but also start and end time so we can construct a > timeline. > This requires a little bit design to define what file listing time means, > when we are reading from cache, vs not cache. > -- This message was sent by Atlassian JIRA (v7.6.3#76005) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
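The metric shape proposed in the comment above (start and end timestamps, not just a duration, so listings from several scan nodes can be laid out on a timeline) can be sketched with an ordinary directory walk standing in for the leaf-file listing. Names are illustrative, not Spark's:

```python
import os
import time

def list_leaf_files(root):
    # Record wall-clock start/end around the listing, as proposed for the
    # scan node's SQL metrics, plus a derived duration.
    start = time.time()
    files = [os.path.join(d, f)
             for d, _, names in os.walk(root)
             for f in names]
    end = time.time()
    metrics = {"startTime": start,
               "endTime": end,
               "durationMs": (end - start) * 1000.0}
    return files, metrics

files, m = list_leaf_files(".")
assert m["endTime"] >= m["startTime"]
assert m["durationMs"] >= 0.0
```

With start/end pairs from each scan (cases 1, 2a, 2b above), overlapping listings are visible on a timeline, which a single aggregated duration cannot show.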
[jira] [Issue Comment Deleted] (SPARK-26222) Scan: track file listing time
[ https://issues.apache.org/jira/browse/SPARK-26222?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Reynold Xin updated SPARK-26222: Comment: was deleted (was: xuanyuanking opened a new pull request #23298: [SPARK-26222][SQL] Track file listing time URL: https://github.com/apache/spark/pull/23298 ## What changes were proposed in this pull request? File listing time in scan node's SQL metrics has done and improved in spark-20136/SPARK-26327. In this pr we use QueryPlanningTracker to track start and end time of file listing. ## How was this patch tested? Add test for DataFrameWriter and Non-physical phase below: - DataFrameReader.load, file listing will be triggered by DataSource.resolveRelation. - Analyze rule like FindDataSourceTable. - Optimization rule like PruneFileSourcePartitions, OptimizeMetadataOnlyQuery. This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org ) > Scan: track file listing time > - > > Key: SPARK-26222 > URL: https://issues.apache.org/jira/browse/SPARK-26222 > Project: Spark > Issue Type: Sub-task > Components: SQL >Affects Versions: 2.4.0 >Reporter: Reynold Xin >Priority: Major > > We should track file listing time and add it to the scan node's SQL metric, > so we have visibility how much is spent in file listing. It'd be useful to > track not just duration, but also start and end time so we can construct a > timeline. > This requires a little bit design to define what file listing time means, > when we are reading from cache, vs not cache. > -- This message was sent by Atlassian JIRA (v7.6.3#76005) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Created] (SPARK-26368) Make it clear that getOrInferFileFormatSchema doesn't create InMemoryFileIndex
Reynold Xin created SPARK-26368: --- Summary: Make it clear that getOrInferFileFormatSchema doesn't create InMemoryFileIndex Key: SPARK-26368 URL: https://issues.apache.org/jira/browse/SPARK-26368 Project: Spark Issue Type: Improvement Components: SQL Affects Versions: 2.4.0 Reporter: Reynold Xin Assignee: Reynold Xin I was looking at the code and it was a bit difficult to see the life cycle of the InMemoryFileIndex passed into getOrInferFileFormatSchema, because sometimes it is passed in and other times it is created inside getOrInferFileFormatSchema itself. It'd be easier to understand the life cycle if we move its creation out. -- This message was sent by Atlassian JIRA (v7.6.3#76005) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Commented] (SPARK-26222) Scan: track file listing time
[ https://issues.apache.org/jira/browse/SPARK-26222?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16715663#comment-16715663 ] Reynold Xin commented on SPARK-26222: - Do we do any file listing in non-physical phase? E.g. if somebody does spark.read.parquet("..."), will we do any file listing there? > Scan: track file listing time > - > > Key: SPARK-26222 > URL: https://issues.apache.org/jira/browse/SPARK-26222 > Project: Spark > Issue Type: Sub-task > Components: SQL >Affects Versions: 2.4.0 >Reporter: Reynold Xin >Priority: Major > > We should track file listing time and add it to the scan node's SQL metric, > so we have visibility how much is spent in file listing. It'd be useful to > track not just duration, but also start and end time so we can construct a > timeline. > This requires a little bit design to define what file listing time means, > when we are reading from cache, vs not cache. > -- This message was sent by Atlassian JIRA (v7.6.3#76005) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Issue Comment Deleted] (SPARK-26221) Improve Spark SQL instrumentation and metrics
[ https://issues.apache.org/jira/browse/SPARK-26221?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Reynold Xin updated SPARK-26221: Comment: was deleted (was: User 'rxin' has created a pull request for this issue: https://github.com/apache/spark/pull/23192) > Improve Spark SQL instrumentation and metrics > - > > Key: SPARK-26221 > URL: https://issues.apache.org/jira/browse/SPARK-26221 > Project: Spark > Issue Type: Umbrella > Components: SQL >Affects Versions: 2.4.0 >Reporter: Reynold Xin >Assignee: Reynold Xin >Priority: Major > > This is an umbrella ticket for various small improvements for better metrics > and instrumentation. Some thoughts: > > Differentiate query plan that’s writing data out, vs returning data to the > driver > * I.e. ETL & report generation vs interactive analysis > * This is related to the data sink item below. We need to make sure from the > query plan we can tell what a query is doing > Data sink: Have an operator for data sink, with metrics that can tell us: > * Write time > * Number of records written > * Size of output written > * Number of partitions modified > * Metastore update time > * Also track number of records for collect / limit > Scan > * Track file listing time (start and end so we can construct timeline, not > just duration) > * Track metastore operation time > * Track IO decoding time for row-based input sources; Need to make sure > overhead is low > Shuffle > * Track read time and write time > * Decide if we can measure serialization and deserialization > Client fetch time > * Sometimes a query take long to run because it is blocked on the client > fetching result (e.g. using a result iterator). Record the time blocked on > client so we can remove it in measuring query execution time. > Make it easy to correlate queries with jobs, stages, and tasks belonging to a > single query, e.g. dump execution id in task logs? 
> Better logging: > * Enable logging the query execution id and TID in executor logs, and query > execution id in driver logs. -- This message was sent by Atlassian JIRA (v7.6.3#76005) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Resolved] (SPARK-26139) Support passing shuffle metrics to exchange operator
[ https://issues.apache.org/jira/browse/SPARK-26139?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Reynold Xin resolved SPARK-26139. - Resolution: Fixed Fix Version/s: 3.0.0 > Support passing shuffle metrics to exchange operator > > > Key: SPARK-26139 > URL: https://issues.apache.org/jira/browse/SPARK-26139 > Project: Spark > Issue Type: New Feature > Components: SQL >Affects Versions: 2.4.0 >Reporter: Reynold Xin >Assignee: Reynold Xin >Priority: Major > Fix For: 3.0.0 > > > Due to the way Spark is architected (SQL is defined on top of the RDD API), > there are two separate metrics systems used in core vs SQL. Ideally, we'd want > to be able to get the shuffle metrics for each exchange operator > independently, e.g. blocks read, number of records. > -- This message was sent by Atlassian JIRA (v7.6.3#76005) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
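The fix for the two-metrics-systems problem described above is to let each exchange operator own its counters and hand a reporter down to the shuffle machinery. A hedged sketch of that shape (a simplified stand-in for Spark's real ShuffleReadMetricsReporter and SQLMetric types):

```scala
// Sketch: per-operator shuffle read metrics. Each exchange operator keeps its
// own counters and exposes a reporter that forwards updates into them, so two
// exchanges in one query report independent values. Simplified interface, not
// Spark's actual signatures.
import java.util.concurrent.atomic.AtomicLong

trait ShuffleReadMetricsReporter {
  def incRecordsRead(v: Long): Unit
  def incRemoteBlocksFetched(v: Long): Unit
}

final class ExchangeMetrics {
  val recordsRead = new AtomicLong(0)
  val remoteBlocksFetched = new AtomicLong(0)

  // Each exchange hands out its own reporter instance to the shuffle reader.
  def reporter: ShuffleReadMetricsReporter = new ShuffleReadMetricsReporter {
    def incRecordsRead(v: Long): Unit = { recordsRead.addAndGet(v); () }
    def incRemoteBlocksFetched(v: Long): Unit = { remoteBlocksFetched.addAndGet(v); () }
  }
}
```

Because the reporter is invoked per record on the hot path, the counters are simple atomics; anything heavier would show up as measurable overhead.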
[jira] [Issue Comment Deleted] (SPARK-26193) Implement shuffle write metrics in SQL
[ https://issues.apache.org/jira/browse/SPARK-26193?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Reynold Xin updated SPARK-26193: Comment: was deleted (was: rxin commented on a change in pull request #23207: [SPARK-26193][SQL] Implement shuffle write metrics in SQL URL: https://github.com/apache/spark/pull/23207#discussion_r240394461 ## File path: core/src/main/scala/org/apache/spark/shuffle/ShuffleWriteProcessor.scala ## @@ -0,0 +1,74 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.shuffle + +import org.apache.spark.{Partition, ShuffleDependency, SparkEnv, TaskContext} +import org.apache.spark.internal.Logging +import org.apache.spark.rdd.RDD +import org.apache.spark.scheduler.MapStatus + +/** + * The interface for customizing shuffle write process. The driver create a ShuffleWriteProcessor + * and put it into [[ShuffleDependency]], and executors use it in each ShuffleMapTask. + */ +private[spark] class ShuffleWriteProcessor extends Serializable with Logging { + + /** + * Create a [[ShuffleWriteMetricsReporter]] from the task context. As the reporter is a + * per-row operator, here need a careful consideration on performance. 
+ */ + protected def createMetricsReporter(context: TaskContext): ShuffleWriteMetricsReporter = { +context.taskMetrics().shuffleWriteMetrics + } + + /** + * The write process for particular partition, it controls the life circle of [[ShuffleWriter]] + * get from [[ShuffleManager]] and triggers rdd compute, finally return the [[MapStatus]] for + * this task. + */ + def writeProcess( Review comment: a nit: it's weird to call this "writeProcess". Maybe just "write", or just "process". This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org ) > Implement shuffle write metrics in SQL > -- > > Key: SPARK-26193 > URL: https://issues.apache.org/jira/browse/SPARK-26193 > Project: Spark > Issue Type: Sub-task > Components: SQL >Affects Versions: 3.0.0 >Reporter: Xiao Li >Assignee: Yuanjian Li >Priority: Major > Fix For: 3.0.0 > > -- This message was sent by Atlassian JIRA (v7.6.3#76005) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Issue Comment Deleted] (SPARK-26139) Support passing shuffle metrics to exchange operator
[ https://issues.apache.org/jira/browse/SPARK-26139?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Reynold Xin updated SPARK-26139: Comment: was deleted (was: User 'xuanyuanking' has created a pull request for this issue: https://github.com/apache/spark/pull/23128) > Support passing shuffle metrics to exchange operator > > > Key: SPARK-26139 > URL: https://issues.apache.org/jira/browse/SPARK-26139 > Project: Spark > Issue Type: New Feature > Components: SQL >Affects Versions: 2.4.0 >Reporter: Reynold Xin >Assignee: Reynold Xin >Priority: Major > > Due to the way Spark is architected (SQL is defined on top of the RDD API), > there are two separate metrics systems used in core vs SQL. Ideally, we'd want > to be able to get the shuffle metrics for each exchange operator > independently, e.g. blocks read, number of records. > -- This message was sent by Atlassian JIRA (v7.6.3#76005) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Commented] (SPARK-26193) Implement shuffle write metrics in SQL
[ https://issues.apache.org/jira/browse/SPARK-26193?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16706368#comment-16706368 ] Reynold Xin commented on SPARK-26193: - Can we simplify it and add those metrics only to the same exchange operator as the read side? > Implement shuffle write metrics in SQL > -- > > Key: SPARK-26193 > URL: https://issues.apache.org/jira/browse/SPARK-26193 > Project: Spark > Issue Type: Sub-task > Components: SQL >Affects Versions: 3.0.0 >Reporter: Xiao Li >Assignee: Yuanjian Li >Priority: Major > -- This message was sent by Atlassian JIRA (v7.6.3#76005) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Resolved] (SPARK-26226) Update query tracker to report timeline for phases, rather than duration
[ https://issues.apache.org/jira/browse/SPARK-26226?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Reynold Xin resolved SPARK-26226. - Resolution: Fixed Fix Version/s: 3.0.0 > Update query tracker to report timeline for phases, rather than duration > > > Key: SPARK-26226 > URL: https://issues.apache.org/jira/browse/SPARK-26226 > Project: Spark > Issue Type: Sub-task > Components: SQL >Affects Versions: 2.4.0 >Reporter: Reynold Xin >Assignee: Reynold Xin >Priority: Major > Fix For: 3.0.0 > > > It'd be more useful to report start and end time for each phase, rather than > only a single duration. This way we can look at timelines and figure out if > there is any unaccounted time. > -- This message was sent by Atlassian JIRA (v7.6.3#76005) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
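Keeping both endpoints per phase, rather than a single duration, is what makes the "unaccounted time" mentioned in the ticket computable. A sketch of that idea (hypothetical names; Spark's actual implementation of this ticket is QueryPlanningTracker):

```scala
// Sketch: a phase tracker that stores (start, end) per phase so the gaps
// between consecutive phases, i.e. unaccounted time, can be derived.
// PhaseTracker is an illustrative name, not Spark's class.
import scala.collection.mutable

final class PhaseTracker {
  // Insertion-ordered so phases come back in execution order.
  private val phases = mutable.LinkedHashMap[String, (Long, Long)]()

  def measurePhase[T](name: String)(body: => T): T = {
    val start = System.currentTimeMillis()
    try body
    finally phases(name) = (start, System.currentTimeMillis())
  }

  def phaseNames: Seq[String] = phases.keys.toSeq

  // Time between the end of one phase and the start of the next: only
  // visible because we kept endpoints, not just durations.
  def unaccountedMs: Long =
    phases.values.toSeq.sliding(2).collect {
      case Seq((_, prevEnd), (nextStart, _)) => math.max(0L, nextStart - prevEnd)
    }.sum
}
```

With durations alone, `unaccountedMs` could not be computed at all; with a timeline it is a simple fold over adjacent phases.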
[jira] [Created] (SPARK-26241) Add queryId to IncrementalExecution
Reynold Xin created SPARK-26241: --- Summary: Add queryId to IncrementalExecution Key: SPARK-26241 URL: https://issues.apache.org/jira/browse/SPARK-26241 Project: Spark Issue Type: Bug Components: SQL Affects Versions: 2.4.0 Reporter: Reynold Xin Assignee: Reynold Xin It'd be useful to have the streaming query UUID in IncrementalExecution so that, when we look at the QueryExecution in isolation, we can trace it back to the query. -- This message was sent by Atlassian JIRA (v7.6.3#76005) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
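The point of carrying the query UUID is log correlation: any printed representation of a per-batch execution should be self-identifying. A minimal sketch under that assumption (illustrative type, not Spark's IncrementalExecution API):

```scala
// Sketch: thread a streaming query's UUID into its per-batch execution object
// so a QueryExecution seen in isolation (e.g. in a log line) can be traced
// back to the owning query. Hypothetical class, not Spark's.
import java.util.UUID

final case class IncrementalExecutionSketch(queryId: UUID, batchId: Long) {
  // Embedding the id in toString makes every log line that prints the
  // execution carry the correlation key for free.
  override def toString: String =
    s"IncrementalExecution(queryId=$queryId, batch=$batchId)"
}
```

The same idea extends to the umbrella ticket's logging items: once the id rides along with the execution object, it can be stamped into driver and executor logs without extra plumbing.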
[jira] [Updated] (SPARK-26241) Add queryId to IncrementalExecution
[ https://issues.apache.org/jira/browse/SPARK-26241?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Reynold Xin updated SPARK-26241: Issue Type: Sub-task (was: Bug) Parent: SPARK-26221 > Add queryId to IncrementalExecution > --- > > Key: SPARK-26241 > URL: https://issues.apache.org/jira/browse/SPARK-26241 > Project: Spark > Issue Type: Sub-task > Components: SQL >Affects Versions: 2.4.0 >Reporter: Reynold Xin >Assignee: Reynold Xin >Priority: Major > > It'd be useful to have the streaming query uuid in IncrementalExecution, when > we look at the QueryExecution in isolation to trace back the query. -- This message was sent by Atlassian JIRA (v7.6.3#76005) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Assigned] (SPARK-26226) Update query tracker to report timeline for phases, rather than duration
[ https://issues.apache.org/jira/browse/SPARK-26226?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Reynold Xin reassigned SPARK-26226: --- Assignee: Reynold Xin > Update query tracker to report timeline for phases, rather than duration > > > Key: SPARK-26226 > URL: https://issues.apache.org/jira/browse/SPARK-26226 > Project: Spark > Issue Type: Sub-task > Components: SQL >Affects Versions: 2.4.0 >Reporter: Reynold Xin >Assignee: Reynold Xin >Priority: Major > > It'd be more useful to report start and end time for each phase, rather than > only a single duration. This way we can look at timelines and figure out if > there is any unaccounted time. > -- This message was sent by Atlassian JIRA (v7.6.3#76005) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Created] (SPARK-26226) Update query tracker to report timeline for phases, rather than duration
Reynold Xin created SPARK-26226: --- Summary: Update query tracker to report timeline for phases, rather than duration Key: SPARK-26226 URL: https://issues.apache.org/jira/browse/SPARK-26226 Project: Spark Issue Type: Sub-task Components: SQL Affects Versions: 2.4.0 Reporter: Reynold Xin It'd be more useful to report start and end time for each phase, rather than only a single duration. This way we can look at timelines and figure out if there is any unaccounted time. -- This message was sent by Atlassian JIRA (v7.6.3#76005) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Updated] (SPARK-26221) Improve Spark SQL instrumentation and metrics
[ https://issues.apache.org/jira/browse/SPARK-26221?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Reynold Xin updated SPARK-26221: Description: This is an umbrella ticket for various small improvements for better metrics and instrumentation. Some thoughts: Differentiate query plan that’s writing data out, vs returning data to the driver * I.e. ETL & report generation vs interactive analysis * This is related to the data sink item below. We need to make sure from the query plan we can tell what a query is doing Data sink: Have an operator for data sink, with metrics that can tell us: * Write time * Number of records written * Size of output written * Number of partitions modified * Metastore update time * Also track number of records for collect / limit Scan * Track file listing time (start and end so we can construct timeline, not just duration) * Track metastore operation time * Track IO decoding time for row-based input sources; Need to make sure overhead is low Shuffle * Track read time and write time * Decide if we can measure serialization and deserialization Client fetch time * Sometimes a query take long to run because it is blocked on the client fetching result (e.g. using a result iterator). Record the time blocked on client so we can remove it in measuring query execution time. Make it easy to correlate queries with jobs, stages, and tasks belonging to a single query Better logging: * Enable logging the query execution id and TID in executor logs, and query execution id in driver logs. was:This is an umbrella ticket for various small improvements for better metrics and instrumentation. 
> Improve Spark SQL instrumentation and metrics > - > > Key: SPARK-26221 > URL: https://issues.apache.org/jira/browse/SPARK-26221 > Project: Spark > Issue Type: Umbrella > Components: SQL >Affects Versions: 2.4.0 >Reporter: Reynold Xin >Assignee: Reynold Xin >Priority: Major > > This is an umbrella ticket for various small improvements for better metrics > and instrumentation. Some thoughts: > > Differentiate query plan that’s writing data out, vs returning data to the > driver > * I.e. ETL & report generation vs interactive analysis > * This is related to the data sink item below. We need to make sure from the > query plan we can tell what a query is doing > Data sink: Have an operator for data sink, with metrics that can tell us: > * Write time > * Number of records written > * Size of output written > * Number of partitions modified > * Metastore update time > * Also track number of records for collect / limit > Scan > * Track file listing time (start and end so we can construct timeline, not > just duration) > * Track metastore operation time > * Track IO decoding time for row-based input sources; Need to make sure > overhead is low > Shuffle > * Track read time and write time > * Decide if we can measure serialization and deserialization > Client fetch time > * Sometimes a query take long to run because it is blocked on the client > fetching result (e.g. using a result iterator). Record the time blocked on > client so we can remove it in measuring query execution time. > Make it easy to correlate queries with jobs, stages, and tasks belonging to a > single query > Better logging: > * Enable logging the query execution id and TID in executor logs, and query > execution id in driver logs. -- This message was sent by Atlassian JIRA (v7.6.3#76005) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Updated] (SPARK-26221) Improve Spark SQL instrumentation and metrics
[ https://issues.apache.org/jira/browse/SPARK-26221?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Reynold Xin updated SPARK-26221: Description: This is an umbrella ticket for various small improvements for better metrics and instrumentation. Some thoughts: Differentiate query plan that’s writing data out, vs returning data to the driver * I.e. ETL & report generation vs interactive analysis * This is related to the data sink item below. We need to make sure from the query plan we can tell what a query is doing Data sink: Have an operator for data sink, with metrics that can tell us: * Write time * Number of records written * Size of output written * Number of partitions modified * Metastore update time * Also track number of records for collect / limit Scan * Track file listing time (start and end so we can construct timeline, not just duration) * Track metastore operation time * Track IO decoding time for row-based input sources; Need to make sure overhead is low Shuffle * Track read time and write time * Decide if we can measure serialization and deserialization Client fetch time * Sometimes a query take long to run because it is blocked on the client fetching result (e.g. using a result iterator). Record the time blocked on client so we can remove it in measuring query execution time. Make it easy to correlate queries with jobs, stages, and tasks belonging to a single query Better logging: * Enable logging the query execution id and TID in executor logs, and query execution id in driver logs. was: This is an umbrella ticket for various small improvements for better metrics and instrumentation. Some thoughts: Differentiate query plan that’s writing data out, vs returning data to the driver * I.e. ETL & report generation vs interactive analysis * This is related to the data sink item below. 
We need to make sure from the query plan we can tell what a query is doing Data sink: Have an operator for data sink, with metrics that can tell us: * Write time * Number of records written * Size of output written * Number of partitions modified * Metastore update time * Also track number of records for collect / limit Scan * Track file listing time (start and end so we can construct timeline, not just duration) * Track metastore operation time * Track IO decoding time for row-based input sources; Need to make sure overhead is low Shuffle * Track read time and write time * Decide if we can measure serialization and deserialization Client fetch time * Sometimes a query take long to run because it is blocked on the client fetching result (e.g. using a result iterator). Record the time blocked on client so we can remove it in measuring query execution time. Make it easy to correlate queries with jobs, stages, and tasks belonging to a single query Better logging: * Enable logging the query execution id and TID in executor logs, and query execution id in driver logs. > Improve Spark SQL instrumentation and metrics > - > > Key: SPARK-26221 > URL: https://issues.apache.org/jira/browse/SPARK-26221 > Project: Spark > Issue Type: Umbrella > Components: SQL >Affects Versions: 2.4.0 >Reporter: Reynold Xin >Assignee: Reynold Xin >Priority: Major > > This is an umbrella ticket for various small improvements for better metrics > and instrumentation. Some thoughts: > > Differentiate query plan that’s writing data out, vs returning data to the > driver > * I.e. ETL & report generation vs interactive analysis > * This is related to the data sink item below. 
We need to make sure from the > query plan we can tell what a query is doing > Data sink: Have an operator for data sink, with metrics that can tell us: > * Write time > * Number of records written > * Size of output written > * Number of partitions modified > * Metastore update time > * Also track number of records for collect / limit > Scan > * Track file listing time (start and end so we can construct timeline, not > just duration) > * Track metastore operation time > * Track IO decoding time for row-based input sources; Need to make sure > overhead is low > Shuffle > * Track read time and write time > * Decide if we can measure serialization and deserialization > Client fetch time > * Sometimes a query take long to run because it is blocked on the client > fetching result (e.g. using a result iterator). Record the time blocked on > client so we can remove it in measuring query execution time. > Make it easy to correlate queries with jobs, stages, and tasks belonging to a > single query > Better logging: > * Enable logging the query execution id and TID in executor logs, and query > execution id in driver logs. -- This message was sent by At
[jira] [Updated] (SPARK-26129) Instrumentation for query planning time
[ https://issues.apache.org/jira/browse/SPARK-26129?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Reynold Xin updated SPARK-26129: Issue Type: Sub-task (was: New Feature) Parent: SPARK-26221 > Instrumentation for query planning time > --- > > Key: SPARK-26129 > URL: https://issues.apache.org/jira/browse/SPARK-26129 > Project: Spark > Issue Type: Sub-task > Components: SQL >Affects Versions: 2.4.0 >Reporter: Reynold Xin >Assignee: Reynold Xin >Priority: Major > Fix For: 3.0.0 > > > We currently don't have good visibility into query planning time (analysis vs > optimization vs physical planning). This patch adds a simple utility to track > the runtime of various rules and various planning phases. -- This message was sent by Atlassian JIRA (v7.6.3#76005) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Updated] (SPARK-26221) Improve Spark SQL instrumentation and metrics
[ https://issues.apache.org/jira/browse/SPARK-26221?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Reynold Xin updated SPARK-26221:

Description:

This is an umbrella ticket for various small improvements for better metrics and instrumentation. Some thoughts:

Differentiate query plans that write data out vs. those that return data to the driver
* I.e. ETL & report generation vs. interactive analysis
* This is related to the data sink item below. We need to make sure we can tell from the query plan what a query is doing.

Data sink: have an operator for the data sink, with metrics that can tell us:
* Write time
* Number of records written
* Size of output written
* Number of partitions modified
* Metastore update time
* Also track the number of records for collect / limit

Scan
* Track file listing time (start and end so we can construct a timeline, not just a duration)
* Track metastore operation time
* Track IO decoding time for row-based input sources; need to make sure the overhead is low

Shuffle
* Track read time and write time
* Decide if we can measure serialization and deserialization

Client fetch time
* Sometimes a query takes a long time to run because it is blocked on the client fetching results (e.g. using a result iterator). Record the time blocked on the client so we can exclude it when measuring query execution time.

Make it easy to correlate queries with the jobs, stages, and tasks belonging to a single query, e.g. dump the execution id in task logs?

Better logging:
* Enable logging the query execution id and TID in executor logs, and the query execution id in driver logs.

was:

This is an umbrella ticket for various small improvements for better metrics and instrumentation. Some thoughts:

Differentiate query plans that write data out vs. those that return data to the driver
* I.e. ETL & report generation vs. interactive analysis
* This is related to the data sink item below. We need to make sure we can tell from the query plan what a query is doing.

Data sink: have an operator for the data sink, with metrics that can tell us:
* Write time
* Number of records written
* Size of output written
* Number of partitions modified
* Metastore update time
* Also track the number of records for collect / limit

Scan
* Track file listing time (start and end so we can construct a timeline, not just a duration)
* Track metastore operation time
* Track IO decoding time for row-based input sources; need to make sure the overhead is low

Shuffle
* Track read time and write time
* Decide if we can measure serialization and deserialization

Client fetch time
* Sometimes a query takes a long time to run because it is blocked on the client fetching results (e.g. using a result iterator). Record the time blocked on the client so we can exclude it when measuring query execution time.

Make it easy to correlate queries with the jobs, stages, and tasks belonging to a single query

Better logging:
* Enable logging the query execution id and TID in executor logs, and the query execution id in driver logs.

> Improve Spark SQL instrumentation and metrics
> -
>
> Key: SPARK-26221
> URL: https://issues.apache.org/jira/browse/SPARK-26221
> Project: Spark
> Issue Type: Umbrella
> Components: SQL
> Affects Versions: 2.4.0
> Reporter: Reynold Xin
> Assignee: Reynold Xin
> Priority: Major
>
> This is an umbrella ticket for various small improvements for better metrics and instrumentation. Some thoughts:
>
> Differentiate query plans that write data out vs. those that return data to the driver
> * I.e. ETL & report generation vs. interactive analysis
> * This is related to the data sink item below. We need to make sure we can tell from the query plan what a query is doing.
>
> Data sink: have an operator for the data sink, with metrics that can tell us:
> * Write time
> * Number of records written
> * Size of output written
> * Number of partitions modified
> * Metastore update time
> * Also track the number of records for collect / limit
>
> Scan
> * Track file listing time (start and end so we can construct a timeline, not just a duration)
> * Track metastore operation time
> * Track IO decoding time for row-based input sources; need to make sure the overhead is low
>
> Shuffle
> * Track read time and write time
> * Decide if we can measure serialization and deserialization
>
> Client fetch time
> * Sometimes a query takes a long time to run because it is blocked on the client fetching results (e.g. using a result iterator). Record the time blocked on the client so we can exclude it when measuring query execution time.
>
> Make it easy to correlate queries with the jobs, stages, and tasks belonging to a single query, e.g. dump the execution id in task logs?
>
> Better logging:
> * Enable logging the query execution id and TID in executor logs, and the query execution id in driver logs.
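A recurring theme in this umbrella is recording start and end timestamps, not just durations, so a timeline can be reconstructed afterwards. A minimal sketch of that idea, assuming nothing about Spark's internals (`PhaseTimeline` and its methods are invented names for illustration):

```python
import time

class PhaseTimeline:
    """Records (name, start, end) wall-clock triples per phase so a
    timeline, not just a duration, can be reconstructed afterwards."""

    def __init__(self):
        self.phases = []  # list of (name, start, end)

    def record(self, name):
        # Returns a context manager that records start/end for the phase.
        timeline = self

        class _Phase:
            def __enter__(self):
                self.start = time.monotonic()
                return self

            def __exit__(self, *exc):
                timeline.phases.append((name, self.start, time.monotonic()))
                return False  # never swallow exceptions

        return _Phase()

    def duration(self, name):
        # Total time spent in all occurrences of the named phase.
        return sum(end - start for n, start, end in self.phases if n == name)


timeline = PhaseTimeline()
with timeline.record("file_listing"):
    pass  # list files here
with timeline.record("metastore"):
    pass  # metastore calls here

assert [name for name, _, _ in timeline.phases] == ["file_listing", "metastore"]
```

Because each phase keeps its raw start and end, overlapping phases from concurrent queries could later be laid out on a shared timeline rather than merely summed.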
[jira] [Updated] (SPARK-26223) Scan: track metastore operation time
[ https://issues.apache.org/jira/browse/SPARK-26223?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Reynold Xin updated SPARK-26223: Description: The Scan node should report how much time it spent in metastore operations. Similar to file listing, it would be great to also report start and end times so we can construct a timeline. was:The Scan node should report how much time it spent in metastore operations. > Scan: track metastore operation time > > > Key: SPARK-26223 > URL: https://issues.apache.org/jira/browse/SPARK-26223 > Project: Spark > Issue Type: Sub-task > Components: SQL >Affects Versions: 2.4.0 >Reporter: Reynold Xin >Priority: Major > > The Scan node should report how much time it spent in metastore operations. > Similar to file listing, it would be great to also report start and end times > so we can construct a timeline. > -- This message was sent by Atlassian JIRA (v7.6.3#76005) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Created] (SPARK-26225) Scan: track decoding time for row-based data sources
Reynold Xin created SPARK-26225: --- Summary: Scan: track decoding time for row-based data sources Key: SPARK-26225 URL: https://issues.apache.org/jira/browse/SPARK-26225 Project: Spark Issue Type: Sub-task Components: SQL Affects Versions: 2.4.0 Reporter: Reynold Xin The Scan node should report decoding time for each record, if the overhead of measuring it is not too high. -- This message was sent by Atlassian JIRA (v7.6.3#76005) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Created] (SPARK-26223) Scan: track metastore operation time
Reynold Xin created SPARK-26223: --- Summary: Scan: track metastore operation time Key: SPARK-26223 URL: https://issues.apache.org/jira/browse/SPARK-26223 Project: Spark Issue Type: Sub-task Components: SQL Affects Versions: 2.4.0 Reporter: Reynold Xin The Scan node should report how much time it spent in metastore operations. -- This message was sent by Atlassian JIRA (v7.6.3#76005) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Created] (SPARK-26222) Scan: track file listing time
Reynold Xin created SPARK-26222: --- Summary: Scan: track file listing time Key: SPARK-26222 URL: https://issues.apache.org/jira/browse/SPARK-26222 Project: Spark Issue Type: Sub-task Components: SQL Affects Versions: 2.4.0 Reporter: Reynold Xin We should track file listing time and add it to the scan node's SQL metrics, so we have visibility into how much time is spent in file listing. It'd be useful to track not just the duration, but also the start and end times so we can construct a timeline. This requires a bit of design to define what file listing time means when we are reading from the cache vs. not. -- This message was sent by Atlassian JIRA (v7.6.3#76005) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
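The cached-vs-uncached distinction the ticket raises can be sketched as follows; `list_files`, `_listing_cache`, and `listing_events` are hypothetical names invented for illustration, not Spark APIs:

```python
import os
import time

_listing_cache = {}   # path -> cached listing (hypothetical)
listing_events = []   # (path, start, end, from_cache) for timeline building

def list_files(path):
    """List a directory, recording start and end time and whether the
    result came from the cache. The ticket notes that 'listing time'
    needs defining separately for the cached and uncached cases."""
    start = time.monotonic()
    from_cache = path in _listing_cache
    if not from_cache:
        _listing_cache[path] = sorted(os.listdir(path))
    result = _listing_cache[path]
    listing_events.append((path, start, time.monotonic(), from_cache))
    return result

files = list_files(".")        # uncached: pays the real listing cost
files_again = list_files(".")  # cached: near-zero cost, flagged as such
assert files == files_again
assert listing_events[0][3] is False and listing_events[1][3] is True
```

Keeping the `from_cache` flag on each event lets a metrics consumer report cached and uncached listing time as separate numbers instead of one ambiguous total.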
[jira] [Updated] (SPARK-26221) Improve Spark SQL instrumentation and metrics
[ https://issues.apache.org/jira/browse/SPARK-26221?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Reynold Xin updated SPARK-26221: Description: This is an umbrella ticket for various small improvements for better metrics and instrumentation. (was: This creates an umbrella ticket for various small improvements for better metrics and instrumentation. ) > Improve Spark SQL instrumentation and metrics > - > > Key: SPARK-26221 > URL: https://issues.apache.org/jira/browse/SPARK-26221 > Project: Spark > Issue Type: Umbrella > Components: SQL >Affects Versions: 2.4.0 >Reporter: Reynold Xin >Assignee: Reynold Xin >Priority: Major > > This is an umbrella ticket for various small improvements for better metrics > and instrumentation. -- This message was sent by Atlassian JIRA (v7.6.3#76005) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Created] (SPARK-26221) Improve Spark SQL instrumentation and metrics
Reynold Xin created SPARK-26221: --- Summary: Improve Spark SQL instrumentation and metrics Key: SPARK-26221 URL: https://issues.apache.org/jira/browse/SPARK-26221 Project: Spark Issue Type: Umbrella Components: SQL Affects Versions: 2.4.0 Reporter: Reynold Xin Assignee: Reynold Xin This creates an umbrella ticket for various small improvements for better metrics and instrumentation. -- This message was sent by Atlassian JIRA (v7.6.3#76005) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Commented] (SPARK-24498) Add JDK compiler for runtime codegen
[ https://issues.apache.org/jira/browse/SPARK-24498?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16702515#comment-16702515 ] Reynold Xin commented on SPARK-24498: - Why don't we close the ticket? I heard we would get mixed performance, and it doesn't seem worth it to add 500 lines of code for something that has unclear value. > Add JDK compiler for runtime codegen > > > Key: SPARK-24498 > URL: https://issues.apache.org/jira/browse/SPARK-24498 > Project: Spark > Issue Type: New Feature > Components: SQL >Affects Versions: 2.3.0 >Reporter: Xiao Li >Priority: Major > > In some cases, the JDK compiler can generate smaller bytecode and take less time > in compilation compared to Janino. However, in some cases, Janino is better. > We should support both for our runtime codegen. Janino will still be our > default runtime codegen compiler. > See the related JIRAs in DRILL: > - https://issues.apache.org/jira/browse/DRILL-1155 > - https://issues.apache.org/jira/browse/DRILL-4778 > - https://issues.apache.org/jira/browse/DRILL-5696 -- This message was sent by Atlassian JIRA (v7.6.3#76005) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Resolved] (SPARK-26141) Enable custom shuffle metrics implementation in shuffle write
[ https://issues.apache.org/jira/browse/SPARK-26141?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Reynold Xin resolved SPARK-26141. - Resolution: Fixed Fix Version/s: 3.0.0 > Enable custom shuffle metrics implementation in shuffle write > - > > Key: SPARK-26141 > URL: https://issues.apache.org/jira/browse/SPARK-26141 > Project: Spark > Issue Type: Sub-task > Components: Spark Core >Affects Versions: 2.4.0 >Reporter: Reynold Xin >Assignee: Reynold Xin >Priority: Major > Fix For: 3.0.0 > > -- This message was sent by Atlassian JIRA (v7.6.3#76005) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Assigned] (SPARK-26142) Implement shuffle read metrics in SQL
[ https://issues.apache.org/jira/browse/SPARK-26142?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Reynold Xin reassigned SPARK-26142: --- Assignee: Yuanjian Li > Implement shuffle read metrics in SQL > - > > Key: SPARK-26142 > URL: https://issues.apache.org/jira/browse/SPARK-26142 > Project: Spark > Issue Type: Sub-task > Components: SQL >Affects Versions: 2.4.0 >Reporter: Reynold Xin >Assignee: Yuanjian Li >Priority: Major > -- This message was sent by Atlassian JIRA (v7.6.3#76005) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Updated] (SPARK-26141) Enable custom shuffle metrics implementation in shuffle write
[ https://issues.apache.org/jira/browse/SPARK-26141?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Reynold Xin updated SPARK-26141: Summary: Enable custom shuffle metrics implementation in shuffle write (was: Enable passing in custom shuffle metrics implementation in shuffle write) > Enable custom shuffle metrics implementation in shuffle write > - > > Key: SPARK-26141 > URL: https://issues.apache.org/jira/browse/SPARK-26141 > Project: Spark > Issue Type: Sub-task > Components: Spark Core >Affects Versions: 2.4.0 >Reporter: Reynold Xin >Assignee: Reynold Xin >Priority: Major > -- This message was sent by Atlassian JIRA (v7.6.3#76005) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Created] (SPARK-26142) Implement shuffle read metrics in SQL
Reynold Xin created SPARK-26142: --- Summary: Implement shuffle read metrics in SQL Key: SPARK-26142 URL: https://issues.apache.org/jira/browse/SPARK-26142 Project: Spark Issue Type: Sub-task Components: SQL Affects Versions: 2.4.0 Reporter: Reynold Xin -- This message was sent by Atlassian JIRA (v7.6.3#76005) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Updated] (SPARK-26140) Enable custom shuffle metrics reporter in shuffle reader
[ https://issues.apache.org/jira/browse/SPARK-26140?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Reynold Xin updated SPARK-26140: Summary: Enable custom shuffle metrics reporter in shuffle reader (was: Enable custom shuffle metrics reporter into shuffle reader) > Enable custom shuffle metrics reporter in shuffle reader > > > Key: SPARK-26140 > URL: https://issues.apache.org/jira/browse/SPARK-26140 > Project: Spark > Issue Type: Sub-task > Components: Spark Core >Affects Versions: 2.4.0 >Reporter: Reynold Xin >Assignee: Reynold Xin >Priority: Major > > The first step is to pull the creation of TempShuffleReadMetrics out of the > shuffle layer, so it can be driven by an external caller. Then, in SQL > execution, we can pass in a special metrics reporter that allows updating > ShuffleExchangeExec's metrics. -- This message was sent by Atlassian JIRA (v7.6.3#76005) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Updated] (SPARK-26140) Enable custom shuffle metrics implementation in shuffle reader
[ https://issues.apache.org/jira/browse/SPARK-26140?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Reynold Xin updated SPARK-26140: Summary: Enable custom shuffle metrics implementation in shuffle reader (was: Enable custom shuffle metrics reporter in shuffle reader) > Enable custom shuffle metrics implementation in shuffle reader > -- > > Key: SPARK-26140 > URL: https://issues.apache.org/jira/browse/SPARK-26140 > Project: Spark > Issue Type: Sub-task > Components: Spark Core >Affects Versions: 2.4.0 >Reporter: Reynold Xin >Assignee: Reynold Xin >Priority: Major > > The first step is to pull the creation of TempShuffleReadMetrics out of the > shuffle layer, so it can be driven by an external caller. Then, in SQL > execution, we can pass in a special metrics reporter that allows updating > ShuffleExchangeExec's metrics. -- This message was sent by Atlassian JIRA (v7.6.3#76005) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Updated] (SPARK-26140) Enable custom shuffle metrics reporter into shuffle reader
[ https://issues.apache.org/jira/browse/SPARK-26140?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Reynold Xin updated SPARK-26140: Summary: Enable custom shuffle metrics reporter into shuffle reader (was: Enable passing in a custom shuffle metrics reporter into shuffle reader) > Enable custom shuffle metrics reporter into shuffle reader > -- > > Key: SPARK-26140 > URL: https://issues.apache.org/jira/browse/SPARK-26140 > Project: Spark > Issue Type: Sub-task > Components: Spark Core >Affects Versions: 2.4.0 >Reporter: Reynold Xin >Assignee: Reynold Xin >Priority: Major > > The first step is to pull the creation of TempShuffleReadMetrics out of the > shuffle layer, so it can be driven by an external caller. Then, in SQL > execution, we can pass in a special metrics reporter that allows updating > ShuffleExchangeExec's metrics. -- This message was sent by Atlassian JIRA (v7.6.3#76005) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Created] (SPARK-26141) Enable passing in custom shuffle metrics implementation in shuffle write
Reynold Xin created SPARK-26141: --- Summary: Enable passing in custom shuffle metrics implementation in shuffle write Key: SPARK-26141 URL: https://issues.apache.org/jira/browse/SPARK-26141 Project: Spark Issue Type: Sub-task Components: Spark Core Affects Versions: 2.4.0 Reporter: Reynold Xin Assignee: Reynold Xin -- This message was sent by Atlassian JIRA (v7.6.3#76005) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Resolved] (SPARK-26129) Instrumentation for query planning time
[ https://issues.apache.org/jira/browse/SPARK-26129?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Reynold Xin resolved SPARK-26129. - Resolution: Fixed Fix Version/s: 3.0.0 > Instrumentation for query planning time > --- > > Key: SPARK-26129 > URL: https://issues.apache.org/jira/browse/SPARK-26129 > Project: Spark > Issue Type: New Feature > Components: SQL >Affects Versions: 2.4.0 >Reporter: Reynold Xin >Assignee: Reynold Xin >Priority: Major > Fix For: 3.0.0 > > > We currently don't have good visibility into query planning time (analysis vs > optimization vs physical planning). This patch adds a simple utility to track > the runtime of various rules and various planning phases. -- This message was sent by Atlassian JIRA (v7.6.3#76005) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Created] (SPARK-26140) Pull TempShuffleReadMetrics creation out of shuffle layer
Reynold Xin created SPARK-26140: --- Summary: Pull TempShuffleReadMetrics creation out of shuffle layer Key: SPARK-26140 URL: https://issues.apache.org/jira/browse/SPARK-26140 Project: Spark Issue Type: Sub-task Components: Spark Core Affects Versions: 2.4.0 Reporter: Reynold Xin Assignee: Reynold Xin The first step is to pull the creation of TempShuffleReadMetrics out of the shuffle layer, so it can be driven by an external caller. Then, in SQL execution, we can pass in a special metrics reporter that allows updating ShuffleExchangeExec's metrics. -- This message was sent by Atlassian JIRA (v7.6.3#76005) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
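The pattern described here, where the reader uses a caller-supplied metrics reporter instead of creating its own, can be sketched as follows; every class and method name below is an illustrative stand-in, not the actual Spark interface:

```python
class ShuffleReadMetricsReporter:
    """Minimal stand-in for a shuffle-read metrics reporter interface;
    the real Spark classes (TempShuffleReadMetrics, ShuffleExchangeExec)
    are much richer."""
    def inc_records_read(self, n): ...
    def inc_bytes_read(self, n): ...

class SqlMetricsReporter(ShuffleReadMetricsReporter):
    """A caller-supplied reporter that accumulates updates into SQL-side
    metrics, which is the pattern the ticket describes."""
    def __init__(self):
        self.records = 0
        self.bytes = 0
    def inc_records_read(self, n):
        self.records += n
    def inc_bytes_read(self, n):
        self.bytes += n

def shuffle_read(blocks, reporter):
    # The reader no longer constructs its own metrics object; it reports
    # through whatever the external caller passed in.
    for block in blocks:
        reporter.inc_records_read(len(block))
        reporter.inc_bytes_read(sum(len(r) for r in block))
    return [r for block in blocks for r in block]

reporter = SqlMetricsReporter()
rows = shuffle_read([["a", "bb"], ["ccc"]], reporter)
assert reporter.records == 3 and reporter.bytes == 6
```

Inverting the dependency this way is what lets the same reader feed either the core task-metrics system or a SQL operator's metrics, depending on who constructed the reporter.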
[jira] [Updated] (SPARK-26140) Enable passing in a custom shuffle metrics reporter into shuffle reader
[ https://issues.apache.org/jira/browse/SPARK-26140?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Reynold Xin updated SPARK-26140: Summary: Enable passing in a custom shuffle metrics reporter into shuffle reader (was: Allow passing in a custom shuffle metrics reporter into shuffle reader) > Enable passing in a custom shuffle metrics reporter into shuffle reader > --- > > Key: SPARK-26140 > URL: https://issues.apache.org/jira/browse/SPARK-26140 > Project: Spark > Issue Type: Sub-task > Components: Spark Core >Affects Versions: 2.4.0 >Reporter: Reynold Xin >Assignee: Reynold Xin >Priority: Major > > The first step is to pull the creation of TempShuffleReadMetrics out of the > shuffle layer, so it can be driven by an external caller. Then, in SQL > execution, we can pass in a special metrics reporter that allows updating > ShuffleExchangeExec's metrics. -- This message was sent by Atlassian JIRA (v7.6.3#76005) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Updated] (SPARK-26140) Allow passing in a custom shuffle metrics reporter into shuffle reader
[ https://issues.apache.org/jira/browse/SPARK-26140?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Reynold Xin updated SPARK-26140: Summary: Allow passing in a custom shuffle metrics reporter into shuffle reader (was: Pull TempShuffleReadMetrics creation out of shuffle layer) > Allow passing in a custom shuffle metrics reporter into shuffle reader > -- > > Key: SPARK-26140 > URL: https://issues.apache.org/jira/browse/SPARK-26140 > Project: Spark > Issue Type: Sub-task > Components: Spark Core >Affects Versions: 2.4.0 >Reporter: Reynold Xin >Assignee: Reynold Xin >Priority: Major > > The first step is to pull the creation of TempShuffleReadMetrics out of the > shuffle layer, so it can be driven by an external caller. Then, in SQL > execution, we can pass in a special metrics reporter that allows updating > ShuffleExchangeExec's metrics. -- This message was sent by Atlassian JIRA (v7.6.3#76005) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Created] (SPARK-26139) Support passing shuffle metrics to exchange operator
Reynold Xin created SPARK-26139: --- Summary: Support passing shuffle metrics to exchange operator Key: SPARK-26139 URL: https://issues.apache.org/jira/browse/SPARK-26139 Project: Spark Issue Type: New Feature Components: SQL Affects Versions: 2.4.0 Reporter: Reynold Xin Assignee: Reynold Xin Due to the way Spark is architected (SQL is defined on top of the RDD API), there are two separate metrics systems used in core vs. SQL. Ideally, we'd want to be able to get the shuffle metrics for each exchange operator independently, e.g. blocks read, number of records. -- This message was sent by Atlassian JIRA (v7.6.3#76005) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Created] (SPARK-26129) Instrumentation for query planning time
Reynold Xin created SPARK-26129: --- Summary: Instrumentation for query planning time Key: SPARK-26129 URL: https://issues.apache.org/jira/browse/SPARK-26129 Project: Spark Issue Type: New Feature Components: SQL Affects Versions: 2.4.0 Reporter: Reynold Xin Assignee: Reynold Xin We currently don't have good visibility into query planning time (analysis vs optimization vs physical planning). This patch adds a simple utility to track the runtime of various rules and various planning phases. -- This message was sent by Atlassian JIRA (v7.6.3#76005) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
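A utility of the kind this ticket describes, one that tracks the runtime of individual rules and planning phases, might look roughly like this; `PlanningTracker` and the toy `constant_folding` rule are invented for illustration and do not reflect Spark's actual implementation:

```python
import time
from collections import defaultdict

class PlanningTracker:
    """Accumulates wall-clock time and invocation counts per rule, so
    planning hot spots can be reported per phase afterwards."""

    def __init__(self):
        self.rule_time = defaultdict(float)   # rule name -> seconds
        self.rule_calls = defaultdict(int)    # rule name -> invocations

    def measure(self, rule_name, rule_fn, plan):
        # Wrap a single rule application, charging its runtime to the rule
        # even if the rule raises.
        start = time.monotonic()
        try:
            return rule_fn(plan)
        finally:
            self.rule_time[rule_name] += time.monotonic() - start
            self.rule_calls[rule_name] += 1

tracker = PlanningTracker()

def constant_folding(plan):
    # Toy "rule": folds a known constant expression in a plan string.
    return plan.replace("1 + 1", "2")

plan = tracker.measure("ConstantFolding", constant_folding, "SELECT 1 + 1")
assert plan == "SELECT 2"
assert tracker.rule_calls["ConstantFolding"] == 1
```

Keeping counts alongside times makes it possible to distinguish one slow rule invocation from many cheap ones.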
[jira] [Updated] (SPARK-21559) Remove Mesos fine-grained mode
[ https://issues.apache.org/jira/browse/SPARK-21559?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Reynold Xin updated SPARK-21559: Target Version/s: 3.0.0 Labels: release-notes (was: ) > Remove Mesos fine-grained mode > -- > > Key: SPARK-21559 > URL: https://issues.apache.org/jira/browse/SPARK-21559 > Project: Spark > Issue Type: Task > Components: Mesos >Affects Versions: 2.2.0 >Reporter: Stavros Kontopoulos >Priority: Major > Labels: release-notes > > After discussing this with people from Mesosphere, we agreed that it is time > to remove fine-grained mode. Plans are to improve cluster mode to cover any > benefits that may have existed when using fine-grained mode. > [~susanxhuynh] > Previous status of this can be found here: > https://issues.apache.org/jira/browse/SPARK-11857 -- This message was sent by Atlassian JIRA (v7.6.3#76005) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Commented] (SPARK-17967) Support for list or other types as an option for datasources
[ https://issues.apache.org/jira/browse/SPARK-17967?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16677418#comment-16677418 ] Reynold Xin commented on SPARK-17967: - BTW how important is this? Seems like for CSV people can just replace the null values with null themselves using the programmatic API. > Support for list or other types as an option for datasources > > > Key: SPARK-17967 > URL: https://issues.apache.org/jira/browse/SPARK-17967 > Project: Spark > Issue Type: Improvement > Components: SQL >Affects Versions: 2.0.0, 2.0.1 >Reporter: Hyukjin Kwon >Priority: Major > > This was discussed in SPARK-17878 > For other datasources, it seems okay with string/long/boolean/double values as > options, but it seems that is not enough for datasources such as CSV. As it > is an interface for other external datasources, I guess it'd affect several > ones out there. > I took a first look, but it seems it'd be difficult to support this (it would > need a lot of changes). > One suggestion is to support this as a JSON array. -- This message was sent by Atlassian JIRA (v7.6.3#76005) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Commented] (SPARK-25841) Redesign window function rangeBetween API
[ https://issues.apache.org/jira/browse/SPARK-25841?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=1559#comment-1559 ] Reynold Xin commented on SPARK-25841: - I posted API proposal sketches in https://issues.apache.org/jira/browse/SPARK-25843 > Redesign window function rangeBetween API > - > > Key: SPARK-25841 > URL: https://issues.apache.org/jira/browse/SPARK-25841 > Project: Spark > Issue Type: Umbrella > Components: SQL >Affects Versions: 2.3.2, 2.4.0 >Reporter: Reynold Xin >Assignee: Reynold Xin >Priority: Major > > As I was reviewing the Spark API changes for 2.4, I found that through > organic, ad-hoc evolution the current API for window functions in Scala is > pretty bad. > > To illustrate the problem, we have two rangeBetween functions in the Window > class: > > {code:java} > class Window { > def unboundedPreceding: Long > ... > def rangeBetween(start: Long, end: Long): WindowSpec > def rangeBetween(start: Column, end: Column): WindowSpec > }{code} > > The Column version of rangeBetween was added in Spark 2.3 because the > previous version (Long) could only support integral values and not time > intervals. Now in order to support specifying unboundedPreceding in the > rangeBetween(Column, Column) API, we added an unboundedPreceding that returns > a Column in functions.scala. > > There are a few issues I have with the API: > > 1. To the end user, this can be just super confusing. Why are there two > unboundedPreceding functions, in different classes, that are named the same > but return different types? > > 2. Using Column as the parameter signature implies this can be an actual > Column, but in practice rangeBetween can only accept literal values. > > 3. 
We added the new APIs to support intervals, but they don't actually work, > because in the implementation we try to validate the start is less than the > end, but calendar interval types are not comparable, and as a result we throw > a type mismatch exception at runtime: scala.MatchError: CalendarIntervalType > (of class org.apache.spark.sql.types.CalendarIntervalType$) > > 4. In order to make interval work, users need to create an interval using > CalendarInterval, which is an internal class that has no documentation and no > stable API. > > -- This message was sent by Atlassian JIRA (v7.6.3#76005) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Updated] (SPARK-25843) Redesign rangeBetween API
[ https://issues.apache.org/jira/browse/SPARK-25843?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Reynold Xin updated SPARK-25843: Description: See parent ticket for more information. Two proposals with sketches: Proposal 1. create a version of rangeBetween that accepts Strings, i.e. rangeBetween(String, String). This is obviously very flexible, but less type-safe. Proposal 2. create a new type called WindowFrameBoundary: {code:java} trait WindowFrameBoundary object WindowFrameBoundary { def unboundedPreceding: WindowFrameBoundary def unboundedFollowing: WindowFrameBoundary def currentRow: WindowFrameBoundary def at(value: Long): WindowFrameBoundary def interval(interval: String): WindowFrameBoundary }{code} And create a new rangeBetween that accepts WindowFrameBoundary arguments, i.e. {code:java} def rangeBetween(start: WindowFrameBoundary, end: WindowFrameBoundary) {code} This is very flexible and type-safe at the same time. Note the two are not mutually exclusive, and we can also deprecate the existing confusing APIs. was: See parent ticket for more information. I have a rough design that I will post later. > Redesign rangeBetween API > - > > Key: SPARK-25843 > URL: https://issues.apache.org/jira/browse/SPARK-25843 > Project: Spark > Issue Type: Sub-task > Components: SQL >Affects Versions: 2.4.0 >Reporter: Reynold Xin >Assignee: Reynold Xin >Priority: Major > > See parent ticket for more information. Two proposals with sketches: > > Proposal 1. create a version of rangeBetween that accepts Strings, i.e. > rangeBetween(String, String). This is obviously very flexible, but less > type-safe. > > Proposal 2. create a new type called WindowFrameBoundary: > > > {code:java} > trait WindowFrameBoundary > > object WindowFrameBoundary { > def unboundedPreceding: WindowFrameBoundary > def unboundedFollowing: WindowFrameBoundary > def currentRow: WindowFrameBoundary > def at(value: Long): WindowFrameBoundary > def interval(interval: String): WindowFrameBoundary > }{code} > > And create a new rangeBetween that accepts WindowFrameBoundary arguments, i.e. > > > {code:java} > def rangeBetween(start: WindowFrameBoundary, end: WindowFrameBoundary) {code} > > This is very flexible and type-safe at the same time. > > > Note the two are not mutually exclusive, and we can also deprecate the > existing confusing APIs. > > -- This message was sent by Atlassian JIRA (v7.6.3#76005) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
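For comparison, proposal 2's boundary type can be mocked up outside Scala as well. The following Python rendering mirrors the sketch above and is purely hypothetical, not a Spark API:

```python
from dataclasses import dataclass

@dataclass(frozen=True)
class WindowFrameBoundary:
    """Type-safe frame boundary, mirroring proposal 2's Scala trait."""
    kind: str
    value: object = None

    @staticmethod
    def unbounded_preceding():
        return WindowFrameBoundary("unboundedPreceding")

    @staticmethod
    def unbounded_following():
        return WindowFrameBoundary("unboundedFollowing")

    @staticmethod
    def current_row():
        return WindowFrameBoundary("currentRow")

    @staticmethod
    def at(value):
        return WindowFrameBoundary("at", value)

    @staticmethod
    def interval(interval):
        return WindowFrameBoundary("interval", interval)

def range_between(start, end):
    # Both endpoints are boundaries by construction, so the two confusing
    # unboundedPreceding variants (Long vs. Column) collapse into one.
    assert isinstance(start, WindowFrameBoundary)
    assert isinstance(end, WindowFrameBoundary)
    return (start, end)

frame = range_between(WindowFrameBoundary.unbounded_preceding(),
                      WindowFrameBoundary.current_row())
assert frame[0].kind == "unboundedPreceding"
```

Making the boundary an explicit type also sidesteps issue 2 from the parent ticket: there is no Column parameter left to mislead callers into passing a non-literal expression.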
[jira] [Created] (SPARK-25862) Remove rangeBetween APIs introduced in SPARK-21608
Reynold Xin created SPARK-25862: --- Summary: Remove rangeBetween APIs introduced in SPARK-21608 Key: SPARK-25862 URL: https://issues.apache.org/jira/browse/SPARK-25862 Project: Spark Issue Type: Sub-task Components: SQL Affects Versions: 2.4.0 Reporter: Reynold Xin Assignee: Reynold Xin As a follow-up to https://issues.apache.org/jira/browse/SPARK-25842, this removes the API so we can introduce a new one. -- This message was sent by Atlassian JIRA (v7.6.3#76005) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Resolved] (SPARK-23084) Add unboundedPreceding(), unboundedFollowing() and currentRow() to PySpark
[ https://issues.apache.org/jira/browse/SPARK-23084?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Reynold Xin resolved SPARK-23084. - Resolution: Won't Fix Fix Version/s: (was: 2.4.0) This was merged but then reverted due to https://issues.apache.org/jira/browse/SPARK-25842 > Add unboundedPreceding(), unboundedFollowing() and currentRow() to PySpark > --- > > Key: SPARK-23084 > URL: https://issues.apache.org/jira/browse/SPARK-23084 > Project: Spark > Issue Type: Sub-task > Components: PySpark >Affects Versions: 2.3.0 >Reporter: Xiao Li >Assignee: Huaxin Gao >Priority: Major > > Add the new APIs (introduced by https://github.com/apache/spark/pull/18814) > to PySpark. Also update the rangeBetween API > {noformat} > /** > * Window function: returns the special frame boundary that represents the > first row in the > * window partition. > * > * @group window_funcs > * @since 2.3.0 > */ > def unboundedPreceding(): Column = Column(UnboundedPreceding) > /** > * Window function: returns the special frame boundary that represents the > last row in the > * window partition. > * > * @group window_funcs > * @since 2.3.0 > */ > def unboundedFollowing(): Column = Column(UnboundedFollowing) > /** > * Window function: returns the special frame boundary that represents the > current row in the > * window partition. > * > * @group window_funcs > * @since 2.3.0 > */ > def currentRow(): Column = Column(CurrentRow) > {noformat} -- This message was sent by Atlassian JIRA (v7.6.3#76005) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Reopened] (SPARK-23084) Add unboundedPreceding(), unboundedFollowing() and currentRow() to PySpark
[ https://issues.apache.org/jira/browse/SPARK-23084?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Reynold Xin reopened SPARK-23084: - > Add unboundedPreceding(), unboundedFollowing() and currentRow() to PySpark > --- > > Key: SPARK-23084 > URL: https://issues.apache.org/jira/browse/SPARK-23084 > Project: Spark > Issue Type: Sub-task > Components: PySpark >Affects Versions: 2.3.0 >Reporter: Xiao Li >Assignee: Huaxin Gao >Priority: Major > > Add the new APIs (introduced by https://github.com/apache/spark/pull/18814) > to PySpark. Also update the rangeBetween API > {noformat} > /** > * Window function: returns the special frame boundary that represents the > first row in the > * window partition. > * > * @group window_funcs > * @since 2.3.0 > */ > def unboundedPreceding(): Column = Column(UnboundedPreceding) > /** > * Window function: returns the special frame boundary that represents the > last row in the > * window partition. > * > * @group window_funcs > * @since 2.3.0 > */ > def unboundedFollowing(): Column = Column(UnboundedFollowing) > /** > * Window function: returns the special frame boundary that represents the > current row in the > * window partition. > * > * @group window_funcs > * @since 2.3.0 > */ > def currentRow(): Column = Column(CurrentRow) > {noformat} -- This message was sent by Atlassian JIRA (v7.6.3#76005) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Created] (SPARK-25845) Fix MatchError for calendar interval type in rangeBetween
Reynold Xin created SPARK-25845: --- Summary: Fix MatchError for calendar interval type in rangeBetween Key: SPARK-25845 URL: https://issues.apache.org/jira/browse/SPARK-25845 Project: Spark Issue Type: Sub-task Components: SQL Affects Versions: 2.4.0 Reporter: Reynold Xin WindowSpecDefinition checks that start < end, but CalendarIntervalType is not comparable, so it would throw the following exception at runtime: {noformat} scala.MatchError: CalendarIntervalType (of class org.apache.spark.sql.types.CalendarIntervalType$) at org.apache.spark.sql.catalyst.util.TypeUtils$.getInterpretedOrdering(TypeUtils.scala:58) at org.apache.spark.sql.catalyst.expressions.BinaryComparison.ordering$lzycompute(predicates.scala:592) at org.apache.spark.sql.catalyst.expressions.BinaryComparison.ordering(predicates.scala:592) at org.apache.spark.sql.catalyst.expressions.GreaterThan.nullSafeEval(predicates.scala:797) at org.apache.spark.sql.catalyst.expressions.BinaryExpression.eval(Expression.scala:496) at org.apache.spark.sql.catalyst.expressions.SpecifiedWindowFrame.isGreaterThan(windowExpressions.scala:245) at org.apache.spark.sql.catalyst.expressions.SpecifiedWindowFrame.checkInputDataTypes(windowExpressions.scala:216) at org.apache.spark.sql.catalyst.expressions.Expression.resolved$lzycompute(Expression.scala:171) at org.apache.spark.sql.catalyst.expressions.Expression.resolved(Expression.scala:171) at org.apache.spark.sql.catalyst.expressions.Expression$$anonfun$childrenResolved$1.apply(Expression.scala:183) at org.apache.spark.sql.catalyst.expressions.Expression$$anonfun$childrenResolved$1.apply(Expression.scala:183) at scala.collection.IndexedSeqOptimized$class.prefixLengthImpl(IndexedSeqOptimized.scala:38) at scala.collection.IndexedSeqOptimized$class.forall(IndexedSeqOptimized.scala:43) at scala.collection.mutable.ArrayBuffer.forall(ArrayBuffer.scala:48) at org.apache.spark.sql.catalyst.expressions.Expression.childrenResolved(Expression.scala:183) at 
org.apache.spark.sql.catalyst.expressions.WindowSpecDefinition.resolved$lzycompute(windowExpressions.scala:48) at org.apache.spark.sql.catalyst.expressions.WindowSpecDefinition.resolved(windowExpressions.scala:48) at org.apache.spark.sql.catalyst.expressions.Expression$$anonfun$childrenResolved$1.apply(Expression.scala:183) at org.apache.spark.sql.catalyst.expressions.Expression$$anonfun$childrenResolved$1.apply(Expression.scala:183) at scala.collection.LinearSeqOptimized$class.forall(LinearSeqOptimized.scala:83) {noformat} -- This message was sent by Atlassian JIRA (v7.6.3#76005) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Created] (SPARK-25844) Implement Python API once we have a new API
Reynold Xin created SPARK-25844: --- Summary: Implement Python API once we have a new API Key: SPARK-25844 URL: https://issues.apache.org/jira/browse/SPARK-25844 Project: Spark Issue Type: Sub-task Components: SQL Affects Versions: 3.0.0 Reporter: Reynold Xin
[jira] [Updated] (SPARK-25842) Deprecate APIs introduced in SPARK-21608
[ https://issues.apache.org/jira/browse/SPARK-25842?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Reynold Xin updated SPARK-25842: Target Version/s: 2.4.0 > Deprecate APIs introduced in SPARK-21608 > > > Key: SPARK-25842 > URL: https://issues.apache.org/jira/browse/SPARK-25842 > Project: Spark > Issue Type: Sub-task > Components: SQL >Affects Versions: 2.3.0 >Reporter: Reynold Xin >Assignee: Reynold Xin >Priority: Major > > See the parent ticket for more information. The newly introduced API is not > only confusing, but doesn't work. We should deprecate it in 2.4, and > introduce a new version in 3.0. >
[jira] [Created] (SPARK-25843) Redesign rangeBetween API
Reynold Xin created SPARK-25843: --- Summary: Redesign rangeBetween API Key: SPARK-25843 URL: https://issues.apache.org/jira/browse/SPARK-25843 Project: Spark Issue Type: Sub-task Components: SQL Affects Versions: 2.4.0 Reporter: Reynold Xin Assignee: Reynold Xin See parent ticket for more information. I have a rough design that I will post later.
[jira] [Created] (SPARK-25842) Deprecate APIs introduced in SPARK-21608
Reynold Xin created SPARK-25842: --- Summary: Deprecate APIs introduced in SPARK-21608 Key: SPARK-25842 URL: https://issues.apache.org/jira/browse/SPARK-25842 Project: Spark Issue Type: Sub-task Components: SQL Affects Versions: 2.3.0 Reporter: Reynold Xin Assignee: Reynold Xin See the parent ticket for more information. The newly introduced API is not only confusing, but doesn't work. We should deprecate it in 2.4, and introduce a new version in 3.0.
[jira] [Updated] (SPARK-25841) Redesign window function rangeBetween API
[ https://issues.apache.org/jira/browse/SPARK-25841?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Reynold Xin updated SPARK-25841: Description: As I was reviewing the Spark API changes for 2.4, I found that through organic, ad-hoc evolution the current API for window functions in Scala is pretty bad. To illustrate the problem, we have two rangeBetween functions in Window class:
{code:java}
class Window {
  def unboundedPreceding: Long
  ...
  def rangeBetween(start: Long, end: Long): WindowSpec
  def rangeBetween(start: Column, end: Column): WindowSpec
}{code}
The Column version of rangeBetween was added in Spark 2.3 because the previous version (Long) could only support integral values and not time intervals. Now in order to support specifying unboundedPreceding in the rangeBetween(Column, Column) API, we added an unboundedPreceding that returns a Column in functions.scala. There are a few issues I have with the API:
1. To the end user, this can be just super confusing. Why are there two unboundedPreceding functions, in different classes, that are named the same but return different types?
2. Using Column as the parameter signature implies this can be an actual Column, but in practice rangeBetween can only accept literal values.
3. We added the new APIs to support intervals, but they don't actually work, because in the implementation we try to validate the start is less than the end, but calendar interval types are not comparable, and as a result we throw a type mismatch exception at runtime: scala.MatchError: CalendarIntervalType (of class org.apache.spark.sql.types.CalendarIntervalType$)
4. In order to make interval work, users need to create an interval using CalendarInterval, which is an internal class that has no documentation and no stable API.
was: As I was reviewing the Spark API changes for 2.4, I found that through organic, ad-hoc evolution the current API for window functions in Scala is pretty bad. 
To illustrate the problem, we have two rangeBetween functions in Window class: class Window { def unboundedPreceding: Long ... def rangeBetween(start: Long, end: Long): WindowSpec def rangeBetween(start: Column, end: Column): WindowSpec } The Column version of rangeBetween was added in Spark 2.3 because the previous version (Long) could only support integral values and not time intervals. Now in order to support specifying unboundedPreceding in the rangeBetween(Column, Column) API, we added an unboundedPreceding that returns a Column in functions.scala. There are a few issues I have with the API: 1. To the end user, this can be just super confusing. Why are there two unboundedPreceding functions, in different classes, that are named the same but return different types? 2. Using Column as the parameter signature implies this can be an actual Column, but in practice rangeBetween can only accept literal values. 3. We added the new APIs to support intervals, but they don't actually work, because in the implementation we try to validate the start is less than the end, but calendar interval types are not comparable, and as a result we throw a type mismatch exception at runtime: scala.MatchError: CalendarIntervalType (of class org.apache.spark.sql.types.CalendarIntervalType$) 4. In order to make interval work, users need to create an interval using CalendarInterval, which is an internal class that has no documentation and no stable API. > Redesign window function rangeBetween API > - > > Key: SPARK-25841 > URL: https://issues.apache.org/jira/browse/SPARK-25841 > Project: Spark > Issue Type: Umbrella > Components: SQL >Affects Versions: 2.3.2, 2.4.0 >Reporter: Reynold Xin >Assignee: Reynold Xin >Priority: Major > > As I was reviewing the Spark API changes for 2.4, I found that through > organic, ad-hoc evolution the current API for window functions in Scala is > pretty bad. 
> > To illustrate the problem, we have two rangeBetween functions in Window > class: > > {code:java} > class Window { > def unboundedPreceding: Long > ... > def rangeBetween(start: Long, end: Long): WindowSpec > def rangeBetween(start: Column, end: Column): WindowSpec > }{code} > > The Column version of rangeBetween was added in Spark 2.3 because the > previous version (Long) could only support integral values and not time > intervals. Now in order to support specifying unboundedPreceding in the > rangeBetween(Column, Column) API, we added an unboundedPreceding that returns > a Column in functions.scala. > > There are a few issues I have with the API: > > 1. To the end user, this can be just super confusing. Why are there two > unboundedPreceding functions, in different classes, that are name
[jira] [Created] (SPARK-25841) Redesign window function rangeBetween API
Reynold Xin created SPARK-25841: --- Summary: Redesign window function rangeBetween API Key: SPARK-25841 URL: https://issues.apache.org/jira/browse/SPARK-25841 Project: Spark Issue Type: Umbrella Components: SQL Affects Versions: 2.3.2, 2.4.0 Reporter: Reynold Xin Assignee: Reynold Xin As I was reviewing the Spark API changes for 2.4, I found that through organic, ad-hoc evolution the current API for window functions in Scala is pretty bad. To illustrate the problem, we have two rangeBetween functions in Window class:
{code:java}
class Window {
  def unboundedPreceding: Long
  ...
  def rangeBetween(start: Long, end: Long): WindowSpec
  def rangeBetween(start: Column, end: Column): WindowSpec
}{code}
The Column version of rangeBetween was added in Spark 2.3 because the previous version (Long) could only support integral values and not time intervals. Now in order to support specifying unboundedPreceding in the rangeBetween(Column, Column) API, we added an unboundedPreceding that returns a Column in functions.scala. There are a few issues I have with the API:
1. To the end user, this can be just super confusing. Why are there two unboundedPreceding functions, in different classes, that are named the same but return different types?
2. Using Column as the parameter signature implies this can be an actual Column, but in practice rangeBetween can only accept literal values.
3. We added the new APIs to support intervals, but they don't actually work, because in the implementation we try to validate the start is less than the end, but calendar interval types are not comparable, and as a result we throw a type mismatch exception at runtime: scala.MatchError: CalendarIntervalType (of class org.apache.spark.sql.types.CalendarIntervalType$)
4. In order to make interval work, users need to create an interval using CalendarInterval, which is an internal class that has no documentation and no stable API. 
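Point 3 is worth illustrating: the frame validation compares start against end, but a boundary type that defines no ordering makes that comparison itself fail at runtime rather than at compile time. A minimal Python sketch of the failure mode (hypothetical toy types standing in for Spark internals, not Spark code):

```python
# Toy stand-in for Spark's CalendarIntervalType: a value type that
# deliberately defines no ordering, so "<" is not supported on it.
class CalendarInterval:
    def __init__(self, months, microseconds):
        self.months = months
        self.microseconds = microseconds

def check_frame(start, end):
    # Mirrors the validation in SpecifiedWindowFrame.checkInputDataTypes:
    # verify that the frame start is less than the frame end.
    return start < end

print(check_frame(0, 10))  # integral boundaries compare fine: True

try:
    check_frame(CalendarInterval(0, 1_000_000), CalendarInterval(0, 2_000_000))
except TypeError as e:
    # Analogous to the scala.MatchError thrown for CalendarIntervalType
    # when getInterpretedOrdering finds no ordering for the type.
    print("frame validation failed:", e)
```

In Spark the analogous comparison surfaces as the scala.MatchError shown in SPARK-25845.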
[jira] [Updated] (SPARK-25841) Redesign window function rangeBetween API
[ https://issues.apache.org/jira/browse/SPARK-25841?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Reynold Xin updated SPARK-25841: Description: As I was reviewing the Spark API changes for 2.4, I found that through organic, ad-hoc evolution the current API for window functions in Scala is pretty bad. To illustrate the problem, we have two rangeBetween functions in Window class: class Window { def unboundedPreceding: Long ... def rangeBetween(start: Long, end: Long): WindowSpec def rangeBetween(start: Column, end: Column): WindowSpec } The Column version of rangeBetween was added in Spark 2.3 because the previous version (Long) could only support integral values and not time intervals. Now in order to support specifying unboundedPreceding in the rangeBetween(Column, Column) API, we added an unboundedPreceding that returns a Column in functions.scala. There are a few issues I have with the API: 1. To the end user, this can be just super confusing. Why are there two unboundedPreceding functions, in different classes, that are named the same but return different types? 2. Using Column as the parameter signature implies this can be an actual Column, but in practice rangeBetween can only accept literal values. 3. We added the new APIs to support intervals, but they don't actually work, because in the implementation we try to validate the start is less than the end, but calendar interval types are not comparable, and as a result we throw a type mismatch exception at runtime: scala.MatchError: CalendarIntervalType (of class org.apache.spark.sql.types.CalendarIntervalType$) 4. In order to make interval work, users need to create an interval using CalendarInterval, which is an internal class that has no documentation and no stable API. was: As I was reviewing the Spark API changes for 2.4, I found that through organic, ad-hoc evolution the current API for window functions in Scala is pretty bad. 
To illustrate the problem, we have two rangeBetween functions in Window class: class Window { def unboundedPreceding: Long ... def rangeBetween(start: Long, end: Long): WindowSpec def rangeBetween(start: Column, end: Column): WindowSpec } The Column version of rangeBetween was added in Spark 2.3 because the previous version (Long) could only support integral values and not time intervals. Now in order to support specifying unboundedPreceding in the rangeBetween(Column, Column) API, we added an unboundedPreceding that returns a Column in functions.scala. There are a few issues I have with the API: 1. To the end user, this can be just super confusing. Why are there two unboundedPreceding functions, in different classes, that are named the same but return different types? 2. Using Column as the parameter signature implies this can be an actual Column, but in practice rangeBetween can only accept literal values. 3. We added the new APIs to support intervals, but they don't actually work, because in the implementation we try to validate the start is less than the end, but calendar interval types are not comparable, and as a result we throw a type mismatch exception at runtime: scala.MatchError: CalendarIntervalType (of class org.apache.spark.sql.types.CalendarIntervalType$) 4. In order to make interval work, users need to create an interval using CalendarInterval, which is an internal class that has no documentation and no stable API. > Redesign window function rangeBetween API > - > > Key: SPARK-25841 > URL: https://issues.apache.org/jira/browse/SPARK-25841 > Project: Spark > Issue Type: Umbrella > Components: SQL >Affects Versions: 2.3.2, 2.4.0 >Reporter: Reynold Xin >Assignee: Reynold Xin >Priority: Major > > As I was reviewing the Spark API changes for 2.4, I found that through > organic, ad-hoc evolution the current API for window functions in Scala is > pretty bad. 
> > To illustrate the problem, we have two rangeBetween functions in Window > class: > > class Window { > def unboundedPreceding: Long > ... > def rangeBetween(start: Long, end: Long): WindowSpec > def rangeBetween(start: Column, end: Column): WindowSpec > } > > The Column version of rangeBetween was added in Spark 2.3 because the > previous version (Long) could only support integral values and not time > intervals. Now in order to support specifying unboundedPreceding in the > rangeBetween(Column, Column) API, we added an unboundedPreceding that returns > a Column in functions.scala. > > There are a few issues I have with the API: > > 1. To the end user, this can be just super confusing. Why are there two > unboundedPreceding functions, in different classes, that are named the same > but return different types? >
[jira] [Created] (SPARK-25496) Deprecate from_utc_timestamp and to_utc_timestamp
Reynold Xin created SPARK-25496: --- Summary: Deprecate from_utc_timestamp and to_utc_timestamp Key: SPARK-25496 URL: https://issues.apache.org/jira/browse/SPARK-25496 Project: Spark Issue Type: Bug Components: SQL Affects Versions: 2.4.0 Reporter: Reynold Xin See discussions in https://issues.apache.org/jira/browse/SPARK-23715 These two functions don't really make sense given how Spark implements timestamps.
[jira] [Resolved] (SPARK-23715) from_utc_timestamp returns incorrect results for some UTC date/time values
[ https://issues.apache.org/jira/browse/SPARK-23715?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Reynold Xin resolved SPARK-23715. - Resolution: Won't Fix Fix Version/s: (was: 2.4.0) > from_utc_timestamp returns incorrect results for some UTC date/time values > -- > > Key: SPARK-23715 > URL: https://issues.apache.org/jira/browse/SPARK-23715 > Project: Spark > Issue Type: Bug > Components: SQL >Affects Versions: 2.3.0 >Reporter: Bruce Robbins >Priority: Major > > This produces the expected answer: > {noformat} > df.select(from_utc_timestamp(lit("2018-03-13T06:18:23"), "GMT+1" > ).as("dt")).show > +---+ > | dt| > +---+ > |2018-03-13 07:18:23| > +---+ > {noformat} > However, the equivalent UTC input (but with an explicit timezone) produces a > wrong answer: > {noformat} > df.select(from_utc_timestamp(lit("2018-03-13T06:18:23+00:00"), "GMT+1" > ).as("dt")).show > +---+ > | dt| > +---+ > |2018-03-13 00:18:23| > +---+ > {noformat} > Additionally, the equivalent Unix time (1520921903, which is also > "2018-03-13T06:18:23" in the UTC time zone) produces the same wrong answer: > {noformat} > df.select(from_utc_timestamp(to_timestamp(lit(1520921903)), "GMT+1" > ).as("dt")).show > +---+ > | dt| > +---+ > |2018-03-13 00:18:23| > +---+ > {noformat} > These issues stem from the fact that the FromUTCTimestamp expression, despite > its name, expects the input to be in the user's local timezone. There is some > magic under the covers to make things work (mostly) as the user expects. 
> As an example, let's say a user in Los Angeles issues the following: > {noformat} > df.select(from_utc_timestamp(lit("2018-03-13T06:18:23"), "GMT+1" > ).as("dt")).show > {noformat} > FromUTCTimestamp gets as input a Timestamp (long) value representing > {noformat} > 2018-03-13T06:18:23-07:00 (long value 1520947103000000) > {noformat} > What FromUTCTimestamp needs instead is > {noformat} > 2018-03-13T06:18:23+00:00 (long value 1520921903000000) > {noformat} > So, it applies the local timezone's offset to the input timestamp to get the > correct value (1520947103000000 minus 7 hours is 1520921903000000). Then it > can process the value and produce the expected output. > When the user explicitly specifies a time zone, FromUTCTimestamp's > assumptions break down. The input is no longer in the local time zone. > Because of the way input data is implicitly cast, FromUTCTimestamp never > knows whether the input data had an explicit timezone. > Here are some gory details: > There is sometimes a mismatch in expectations between the (string => > timestamp) cast and FromUTCTimestamp. Also, since the FromUTCTimestamp > expression never sees the actual input string (the cast "intercepts" the > input and converts it to a long timestamp before FromUTCTimestamp uses the > value), FromUTCTimestamp cannot reject any input value that would exercise > this mismatch in expectations. > There is a similar mismatch in expectations in the (integer => timestamp) > cast and FromUTCTimestamp. As a result, Unix time input almost always > produces incorrect output. > h3. When things work as expected for String input: > When from_utc_timestamp is passed a string time value with no time zone, > DateTimeUtils.stringToTimestamp (called from a Cast expression) treats the > datetime string as though it's in the user's local time zone. Because > DateTimeUtils.stringToTimestamp is a general function, this is reasonable. 
> As a result, FromUTCTimestamp's input is a timestamp shifted by the local > time zone's offset. FromUTCTimestamp assumes this (or more accurately, a > utility function called by FromUTCTimestamp assumes this), so the first thing > it does is reverse-shift to get back the correct value. Now that the long > value has been shifted back to the correct timestamp value, it can then > process it (by shifting it again based on the specified time zone). > h3. When things go wrong with String input: > When from_utc_timestamp is passed a string datetime value with an explicit > time zone, stringToTimestamp honors that timezone and ignores the local time > zone. stringToTimestamp does not shift the timestamp by the local timezone's > offset, but by the timezone specified on the datetime string. > Unfortunately, FromUTCTimestamp, which has no insight into the actual input > or the conversion, still assumes the timestamp is shifted by the local time > zone. So it reverse-shifts the long value by the local time zone's offset, > which produces an incorrect timestamp (except in the case where the input > datetime s
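The shift arithmetic described above can be checked directly with epoch math. A short Python sketch (using the epoch values quoted in the ticket; a plain fixed offset stands in for real America/Los_Angeles timezone rules):

```python
from datetime import datetime, timezone, timedelta

# "2018-03-13T06:18:23" parsed as UTC: the ticket's Unix time 1520921903.
utc_parsed = datetime(2018, 3, 13, 6, 18, 23, tzinfo=timezone.utc)
print(int(utc_parsed.timestamp()))  # 1520921903

# The same wall-clock string parsed in Los Angeles local time
# (UTC-7 under daylight saving on that date; fixed offset for simplicity).
la = timezone(timedelta(hours=-7))
locally_parsed = datetime(2018, 3, 13, 6, 18, 23, tzinfo=la)
print(int(locally_parsed.timestamp()))  # 1520947103

# FromUTCTimestamp's reverse shift: subtract the local offset (7 hours)
# to recover the UTC-parsed value from the locally parsed one.
recovered = int(locally_parsed.timestamp()) - 7 * 3600
assert recovered == int(utc_parsed.timestamp())
```

When the input string already carried an explicit timezone, this reverse shift is applied anyway, which is exactly the wrong-answer mechanism the ticket describes.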
[jira] [Reopened] (SPARK-23715) from_utc_timestamp returns incorrect results for some UTC date/time values
[ https://issues.apache.org/jira/browse/SPARK-23715?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Reynold Xin reopened SPARK-23715: - Assignee: (was: Wenchen Fan) > from_utc_timestamp returns incorrect results for some UTC date/time values > -- > > Key: SPARK-23715 > URL: https://issues.apache.org/jira/browse/SPARK-23715 > Project: Spark > Issue Type: Bug > Components: SQL >Affects Versions: 2.3.0 >Reporter: Bruce Robbins >Priority: Major > > This produces the expected answer: > {noformat} > df.select(from_utc_timestamp(lit("2018-03-13T06:18:23"), "GMT+1" > ).as("dt")).show > +---+ > | dt| > +---+ > |2018-03-13 07:18:23| > +---+ > {noformat} > However, the equivalent UTC input (but with an explicit timezone) produces a > wrong answer: > {noformat} > df.select(from_utc_timestamp(lit("2018-03-13T06:18:23+00:00"), "GMT+1" > ).as("dt")).show > +---+ > | dt| > +---+ > |2018-03-13 00:18:23| > +---+ > {noformat} > Additionally, the equivalent Unix time (1520921903, which is also > "2018-03-13T06:18:23" in the UTC time zone) produces the same wrong answer: > {noformat} > df.select(from_utc_timestamp(to_timestamp(lit(1520921903)), "GMT+1" > ).as("dt")).show > +---+ > | dt| > +---+ > |2018-03-13 00:18:23| > +---+ > {noformat} > These issues stem from the fact that the FromUTCTimestamp expression, despite > its name, expects the input to be in the user's local timezone. There is some > magic under the covers to make things work (mostly) as the user expects. 
> As an example, let's say a user in Los Angeles issues the following: > {noformat} > df.select(from_utc_timestamp(lit("2018-03-13T06:18:23"), "GMT+1" > ).as("dt")).show > {noformat} > FromUTCTimestamp gets as input a Timestamp (long) value representing > {noformat} > 2018-03-13T06:18:23-07:00 (long value 1520947103000000) > {noformat} > What FromUTCTimestamp needs instead is > {noformat} > 2018-03-13T06:18:23+00:00 (long value 1520921903000000) > {noformat} > So, it applies the local timezone's offset to the input timestamp to get the > correct value (1520947103000000 minus 7 hours is 1520921903000000). Then it > can process the value and produce the expected output. > When the user explicitly specifies a time zone, FromUTCTimestamp's > assumptions break down. The input is no longer in the local time zone. > Because of the way input data is implicitly cast, FromUTCTimestamp never > knows whether the input data had an explicit timezone. > Here are some gory details: > There is sometimes a mismatch in expectations between the (string => > timestamp) cast and FromUTCTimestamp. Also, since the FromUTCTimestamp > expression never sees the actual input string (the cast "intercepts" the > input and converts it to a long timestamp before FromUTCTimestamp uses the > value), FromUTCTimestamp cannot reject any input value that would exercise > this mismatch in expectations. > There is a similar mismatch in expectations in the (integer => timestamp) > cast and FromUTCTimestamp. As a result, Unix time input almost always > produces incorrect output. > h3. When things work as expected for String input: > When from_utc_timestamp is passed a string time value with no time zone, > DateTimeUtils.stringToTimestamp (called from a Cast expression) treats the > datetime string as though it's in the user's local time zone. Because > DateTimeUtils.stringToTimestamp is a general function, this is reasonable. 
> As a result, FromUTCTimestamp's input is a timestamp shifted by the local > time zone's offset. FromUTCTimestamp assumes this (or more accurately, a > utility function called by FromUTCTimestamp assumes this), so the first thing > it does is reverse-shift to get back the correct value. Now that the long > value has been shifted back to the correct timestamp value, it can then > process it (by shifting it again based on the specified time zone). > h3. When things go wrong with String input: > When from_utc_timestamp is passed a string datetime value with an explicit > time zone, stringToTimestamp honors that timezone and ignores the local time > zone. stringToTimestamp does not shift the timestamp by the local timezone's > offset, but by the timezone specified on the datetime string. > Unfortunately, FromUTCTimestamp, which has no insight into the actual input > or the conversion, still assumes the timestamp is shifted by the local time > zone. So it reverse-shifts the long value by the local time zone's offset, > which produces an incorrect timestamp (except in the case where the input > datetime string just happened to hav
[jira] [Commented] (SPARK-23715) from_utc_timestamp returns incorrect results for some UTC date/time values
[ https://issues.apache.org/jira/browse/SPARK-23715?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16622953#comment-16622953 ] Reynold Xin commented on SPARK-23715: - the current behavior is that it only takes a timestamp type data right? if it is a string one, it gets cast to timestamp following cast's semantics. > from_utc_timestamp returns incorrect results for some UTC date/time values > -- > > Key: SPARK-23715 > URL: https://issues.apache.org/jira/browse/SPARK-23715 > Project: Spark > Issue Type: Bug > Components: SQL >Affects Versions: 2.3.0 >Reporter: Bruce Robbins >Assignee: Wenchen Fan >Priority: Major > Fix For: 2.4.0 > > > This produces the expected answer: > {noformat} > df.select(from_utc_timestamp(lit("2018-03-13T06:18:23"), "GMT+1" > ).as("dt")).show > +---+ > | dt| > +---+ > |2018-03-13 07:18:23| > +---+ > {noformat} > However, the equivalent UTC input (but with an explicit timezone) produces a > wrong answer: > {noformat} > df.select(from_utc_timestamp(lit("2018-03-13T06:18:23+00:00"), "GMT+1" > ).as("dt")).show > +---+ > | dt| > +---+ > |2018-03-13 00:18:23| > +---+ > {noformat} > Additionally, the equivalent Unix time (1520921903, which is also > "2018-03-13T06:18:23" in the UTC time zone) produces the same wrong answer: > {noformat} > df.select(from_utc_timestamp(to_timestamp(lit(1520921903)), "GMT+1" > ).as("dt")).show > +---+ > | dt| > +---+ > |2018-03-13 00:18:23| > +---+ > {noformat} > These issues stem from the fact that the FromUTCTimestamp expression, despite > its name, expects the input to be in the user's local timezone. There is some > magic under the covers to make things work (mostly) as the user expects. 
> As an example, let's say a user in Los Angeles issues the following: > {noformat} > df.select(from_utc_timestamp(lit("2018-03-13T06:18:23"), "GMT+1" > ).as("dt")).show > {noformat} > FromUTCTimestamp gets as input a Timestamp (long) value representing > {noformat} > 2018-03-13T06:18:23-07:00 (long value 1520947103000000) > {noformat} > What FromUTCTimestamp needs instead is > {noformat} > 2018-03-13T06:18:23+00:00 (long value 1520921903000000) > {noformat} > So, it applies the local timezone's offset to the input timestamp to get the > correct value (1520947103000000 minus 7 hours is 1520921903000000). Then it > can process the value and produce the expected output. > When the user explicitly specifies a time zone, FromUTCTimestamp's > assumptions break down. The input is no longer in the local time zone. > Because of the way input data is implicitly cast, FromUTCTimestamp never > knows whether the input data had an explicit timezone. > Here are some gory details: > There is sometimes a mismatch in expectations between the (string => > timestamp) cast and FromUTCTimestamp. Also, since the FromUTCTimestamp > expression never sees the actual input string (the cast "intercepts" the > input and converts it to a long timestamp before FromUTCTimestamp uses the > value), FromUTCTimestamp cannot reject any input value that would exercise > this mismatch in expectations. > There is a similar mismatch in expectations in the (integer => timestamp) > cast and FromUTCTimestamp. As a result, Unix time input almost always > produces incorrect output. > h3. When things work as expected for String input: > When from_utc_timestamp is passed a string time value with no time zone, > DateTimeUtils.stringToTimestamp (called from a Cast expression) treats the > datetime string as though it's in the user's local time zone. Because > DateTimeUtils.stringToTimestamp is a general function, this is reasonable. 
> As a result, FromUTCTimestamp's input is a timestamp shifted by the local > time zone's offset. FromUTCTimestamp assumes this (or more accurately, a > utility function called by FromUTCTimestamp assumes this), so the first thing > it does is reverse-shift to get back the correct value. Now that the long > value has been shifted back to the correct timestamp value, it can then > process it (by shifting it again based on the specified time zone). > h3. When things go wrong with String input: > When from_utc_timestamp is passed a string datetime value with an explicit > time zone, stringToTimestamp honors that timezone and ignores the local time > zone. stringToTimestamp does not shift the timestamp by the local timezone's > offset, but by the timezone specified on the datetime string. > Unfortunately, FromUTCTimestamp, which has no insight into the actual input > or the conversion, still assumes the