[GitHub] spark issue #23132: [SPARK-26163][SQL] Parsing decimals from JSON using loca...
Github user gatorsmile commented on the issue: https://github.com/apache/spark/pull/23132

`spark.sql.legacy.decimalParsing.enabled` is still shown in the PR description and commit messages.
[GitHub] spark pull request #23124: [SPARK-25829][SQL] remove duplicated map keys wit...
Github user gatorsmile commented on a diff in the pull request: https://github.com/apache/spark/pull/23124#discussion_r240104815

--- Diff: docs/sql-migration-guide-upgrade.md ---

```diff
@@ -27,6 +27,8 @@ displayTitle: Spark SQL Upgrading Guide
 
   - In Spark version 2.4 and earlier, float/double -0.0 is semantically equal to 0.0, but users can still distinguish them via `Dataset.show`, `Dataset.collect` etc. Since Spark 3.0, float/double -0.0 is replaced by 0.0 internally, and users can't distinguish them any more.
 
+  - In Spark version 2.4 and earlier, users can create a map with duplicated keys via built-in functions like `CreateMap`, `StringToMap`, etc. The behavior of map with duplicated keys is undefined, e.g. map look up respects the duplicated key appears first, `Dataset.collect` only keeps the duplicated key appears last, `MapKeys` returns duplicated keys, etc. Since Spark 3.0, these built-in functions will remove duplicated map keys with last wins policy. Users may still read map values with duplicated keys from data sources which do not enforce it (e.g. Parquet), the behavior will be udefined.
```

--- End diff --

A few typos. How about?

```
In Spark version 2.4 and earlier, users can create a map with duplicate keys via built-in functions like `CreateMap` and `StringToMap`. The behavior of map with duplicate keys is undefined. For example, the map lookup respects the duplicate key that appears first, `Dataset.collect` only keeps the duplicate key that appears last, and `MapKeys` returns duplicate keys. Since Spark 3.0, these built-in functions will remove duplicate map keys using the last-one-wins policy. Users may still read map values with duplicate keys from the data sources that do not enforce it (e.g. Parquet), but the behavior will be undefined.
```
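For illustration, a minimal spark-shell sketch of the behavior change this migration note describes, assuming the last-one-wins policy proposed in this PR (in 2.4 the result was undefined):

```scala
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().master("local[*]").appName("map-dedup").getOrCreate()

// map(1, 2, 1, 3) contains the duplicate key 1. Under the last-one-wins policy
// proposed here, the result would be Map(1 -> 3); in Spark 2.4 it is undefined.
spark.sql("SELECT map(1, 2, 1, 3) AS m").show(false)
```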
[GitHub] spark issue #23268: [Hive][Minor] Refactor on HiveShim and Add Unit Tests
Github user gatorsmile commented on the issue: https://github.com/apache/spark/pull/23268

The existing way is too Java-like.
[GitHub] spark issue #23268: [Hive][Minor] Refactor on HiveShim and Add Unit Tests
Github user gatorsmile commented on the issue: https://github.com/apache/spark/pull/23268

@sadhen What is the motivation for this PR?
[GitHub] spark issue #23248: [SPARK-26293][SQL] Cast exception when having python udf...
Github user gatorsmile commented on the issue: https://github.com/apache/spark/pull/23248

LGTM for the surgical fix for backporting. We still need to fix this rule, along with the other rules, to avoid making such a strong and hidden assumption.
[GitHub] spark pull request #23248: [SPARK-26293][SQL] Cast exception when having pyt...
Github user gatorsmile commented on a diff in the pull request: https://github.com/apache/spark/pull/23248#discussion_r240079120

--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/python/ExtractPythonUDFs.scala ---

```diff
@@ -131,8 +131,20 @@ object ExtractPythonUDFs extends Rule[LogicalPlan] with PredicateHelper {
     expressions.flatMap(collectEvaluableUDFs)
   }
 
-  def apply(plan: LogicalPlan): LogicalPlan = plan transformUp {
-    case plan: LogicalPlan => extract(plan)
+  def apply(plan: LogicalPlan): LogicalPlan = plan match {
+    // SPARK-26293: A subquery will be rewritten into join later, and will go through this rule
+    // eventually. Here we skip subquery, as Python UDF only needs to be extracted once.
+    case _: Subquery => plan
```

--- End diff --

Basically, we want to ensure this rule runs once and only once. In the future, if we add another rule/function that calls `Optimizer.this.execute(plan)`, this rule will need to be fixed again... We have a very strong hidden assumption in the implementation, which looks risky in the long term. The current fix is fine for backporting to 2.4.
[GitHub] spark issue #23255: [SPARK-26307] [SQL] Fix CTAS when INSERT a partitioned t...
Github user gatorsmile commented on the issue: https://github.com/apache/spark/pull/23255

For the reviewers, we need to merge this to 2.3, 2.4 and master. Thanks!
[GitHub] spark pull request #22514: [SPARK-25271][SQL] Hive ctas commands should use ...
Github user gatorsmile commented on a diff in the pull request: https://github.com/apache/spark/pull/22514#discussion_r239895028

--- Diff: sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/CreateHiveTableAsSelectCommand.scala ---

```diff
@@ -95,9 +77,116 @@ case class CreateHiveTableAsSelectCommand(
     Seq.empty[Row]
   }
 
+  // Returns `DataWritingCommand` used to write data when the table exists.
+  def writingCommandForExistingTable(
+      catalog: SessionCatalog,
+      tableDesc: CatalogTable): DataWritingCommand
+
+  // Returns `DataWritingCommand` used to write data when the table doesn't exist.
+  def writingCommandForNewTable(
+      catalog: SessionCatalog,
+      tableDesc: CatalogTable): DataWritingCommand
+
   override def argString: String = {
     s"[Database:${tableDesc.database}, " +
     s"TableName: ${tableDesc.identifier.table}, " +
     s"InsertIntoHiveTable]"
   }
 }
+
+/**
+ * Create table and insert the query result into it.
+ *
+ * @param tableDesc the table description, which may contain serde, storage handler etc.
+ * @param query the query whose result will be insert into the new relation
+ * @param mode SaveMode
+ */
+case class CreateHiveTableAsSelectCommand(
+    tableDesc: CatalogTable,
+    query: LogicalPlan,
+    outputColumnNames: Seq[String],
+    mode: SaveMode)
+  extends CreateHiveTableAsSelectBase {
+
+  override def writingCommandForExistingTable(
+      catalog: SessionCatalog,
+      tableDesc: CatalogTable): DataWritingCommand = {
+    InsertIntoHiveTable(
+      tableDesc,
+      Map.empty,
```

--- End diff --

https://github.com/apache/spark/pull/23255
[GitHub] spark pull request #23255: [SPARK-26307] [SQL] Fix CTAS when INSERT a partit...
GitHub user gatorsmile opened a pull request: https://github.com/apache/spark/pull/23255

[SPARK-26307] [SQL] Fix CTAS when INSERT a partitioned table using Hive serde

## What changes were proposed in this pull request?

This was a Spark 2.3 regression introduced in https://github.com/apache/spark/pull/20521. We should add the partition info for `InsertIntoHiveTable` in `CreateHiveTableAsSelectCommand`. Otherwise, we will hit the following error by running the newly added test case:

```
[info] - CTAS: INSERT a partitioned table using Hive serde *** FAILED *** (829 milliseconds)
[info]   org.apache.spark.SparkException: Requested partitioning does not match the tab1 table:
[info]   Requested partitions:
[info]   Table partitions: part
[info]   at org.apache.spark.sql.hive.execution.InsertIntoHiveTable.processInsert(InsertIntoHiveTable.scala:179)
[info]   at org.apache.spark.sql.hive.execution.InsertIntoHiveTable.run(InsertIntoHiveTable.scala:107)
```

## How was this patch tested?

Added a test case.

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/gatorsmile/spark fixCTAS

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/spark/pull/23255.patch

To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message:

    This closes #23255

commit 921967b369a8e48269da0825278a7e1095b9173e
Author: gatorsmile
Date: 2018-12-07T17:59:36Z

    fix.
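A hedged sketch of the failing scenario, along the lines of the newly added test (the exact test in the PR may differ; `tab1` and `part` mirror the error message above):

```scala
import spark.implicits._

// saveAsTable with the Hive format plus partitionBy goes through
// CreateHiveTableAsSelectCommand; before this fix it raised
// "Requested partitioning does not match the tab1 table".
spark.conf.set("hive.exec.dynamic.partition.mode", "nonstrict")
val df = Seq(("a", 100)).toDF("part", "id")
df.write.format("hive").partitionBy("part").mode("overwrite").saveAsTable("tab1")
```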
[GitHub] spark pull request #22514: [SPARK-25271][SQL] Hive ctas commands should use ...
Github user gatorsmile commented on a diff in the pull request: https://github.com/apache/spark/pull/22514#discussion_r239887300

--- Diff: sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/CreateHiveTableAsSelectCommand.scala ---

```diff
@@ -95,9 +77,116 @@ case class CreateHiveTableAsSelectCommand(
+    InsertIntoHiveTable(
+      tableDesc,
+      Map.empty,
```

--- End diff --

Please hold this PR. We need to fix the regression first.
[GitHub] spark pull request #22514: [SPARK-25271][SQL] Hive ctas commands should use ...
Github user gatorsmile commented on a diff in the pull request: https://github.com/apache/spark/pull/22514#discussion_r239725957

--- Diff: sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/CreateHiveTableAsSelectCommand.scala ---

```diff
@@ -95,9 +77,116 @@ case class CreateHiveTableAsSelectCommand(
+    InsertIntoHiveTable(
+      tableDesc,
+      Map.empty,
```

--- End diff --

Can we fix it in this PR?
[GitHub] spark pull request #22104: [SPARK-24721][SQL] Extract Python UDFs at the end...
Github user gatorsmile commented on a diff in the pull request: https://github.com/apache/spark/pull/22104#discussion_r239722680

--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/SparkOptimizer.scala ---

```diff
@@ -31,7 +31,8 @@ class SparkOptimizer(
   override def defaultBatches: Seq[Batch] = (preOptimizationBatches ++ super.defaultBatches :+
     Batch("Optimize Metadata Only Query", Once, OptimizeMetadataOnlyQuery(catalog)) :+
-    Batch("Extract Python UDF from Aggregate", Once, ExtractPythonUDFFromAggregate) :+
+    Batch("Extract Python UDFs", Once,
+      Seq(ExtractPythonUDFFromAggregate, ExtractPythonUDFs): _*) :+
```

--- End diff --

It looks weird to add this rule to our optimizer batch. We need at least some comments in the code to explain the reason.
[GitHub] spark pull request #23245: [SPARK-26060][SQL][FOLLOW-UP] Rename the config n...
Github user gatorsmile commented on a diff in the pull request: https://github.com/apache/spark/pull/23245#discussion_r239574982

--- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala ---

```diff
@@ -1612,7 +1612,7 @@ object SQLConf {
       .createWithDefault(25)
 
   val SET_COMMAND_REJECTS_SPARK_CONFS =
-    buildConf("spark.sql.legacy.execution.setCommandRejectsSparkConfs")
+    buildConf("spark.sql.legacy.setCommandRejectsSparkConfs")
```

--- End diff --

`spark.sql.legacy.setCommandRejectsSparkCoreConfs`
[GitHub] spark pull request #22514: [SPARK-25271][SQL] Hive ctas commands should use ...
Github user gatorsmile commented on a diff in the pull request: https://github.com/apache/spark/pull/22514#discussion_r239539992

--- Diff: sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/CreateHiveTableAsSelectCommand.scala ---

```diff
@@ -95,9 +77,116 @@ case class CreateHiveTableAsSelectCommand(
+    InsertIntoHiveTable(
+      tableDesc,
+      Map.empty,
```

--- End diff --

Does this work?

```scala
withTable("hive_test") {
  withSQLConf("hive.exec.dynamic.partition.mode" -> "nonstrict") {
    val df = Seq(("a", 100)).toDF("part", "id")
    df.write.format("hive").partitionBy("part")
      .mode("overwrite").saveAsTable("hive_test")
    df.write.format("hive").partitionBy("part")
      .mode("append").saveAsTable("hive_test")
  }
}
```
[GitHub] spark issue #23243: [SPARK-26288][ExternalShuffleService]add initRegisteredE...
Github user gatorsmile commented on the issue: https://github.com/apache/spark/pull/23243

cc @jiangxb1987
[GitHub] spark pull request #22514: [SPARK-25271][SQL] Hive ctas commands should use ...
Github user gatorsmile commented on a diff in the pull request: https://github.com/apache/spark/pull/22514#discussion_r239319889

--- Diff: sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/CreateHiveTableAsSelectCommand.scala ---

```diff
@@ -95,9 +77,116 @@ case class CreateHiveTableAsSelectCommand(
+    InsertIntoHiveTable(
+      tableDesc,
+      Map.empty,
```

--- End diff --

Could you point it out? I want to ensure it is covered.
[GitHub] spark pull request #22514: [SPARK-25271][SQL] Hive ctas commands should use ...
Github user gatorsmile commented on a diff in the pull request: https://github.com/apache/spark/pull/22514#discussion_r239264673

--- Diff: sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveStrategies.scala ---

```diff
@@ -181,62 +180,39 @@ case class RelationConversions(
     conf: SQLConf,
     sessionCatalog: HiveSessionCatalog) extends Rule[LogicalPlan] {
   private def isConvertible(relation: HiveTableRelation): Boolean = {
-    val serde = relation.tableMeta.storage.serde.getOrElse("").toLowerCase(Locale.ROOT)
-    serde.contains("parquet") && conf.getConf(HiveUtils.CONVERT_METASTORE_PARQUET) ||
-      serde.contains("orc") && conf.getConf(HiveUtils.CONVERT_METASTORE_ORC)
+    isConvertible(relation.tableMeta)
   }
 
-  // Return true for Apache ORC and Hive ORC-related configuration names.
-  // Note that Spark doesn't support configurations like `hive.merge.orcfile.stripe.level`.
-  private def isOrcProperty(key: String) =
-    key.startsWith("orc.") || key.contains(".orc.")
-
-  private def isParquetProperty(key: String) =
-    key.startsWith("parquet.") || key.contains(".parquet.")
-
-  private def convert(relation: HiveTableRelation): LogicalRelation = {
-    val serde = relation.tableMeta.storage.serde.getOrElse("").toLowerCase(Locale.ROOT)
-
-    // Consider table and storage properties. For properties existing in both sides, storage
-    // properties will supersede table properties.
-    if (serde.contains("parquet")) {
-      val options = relation.tableMeta.properties.filterKeys(isParquetProperty) ++
-        relation.tableMeta.storage.properties + (ParquetOptions.MERGE_SCHEMA ->
-        conf.getConf(HiveUtils.CONVERT_METASTORE_PARQUET_WITH_SCHEMA_MERGING).toString)
-      sessionCatalog.metastoreCatalog
-        .convertToLogicalRelation(relation, options, classOf[ParquetFileFormat], "parquet")
-    } else {
-      val options = relation.tableMeta.properties.filterKeys(isOrcProperty) ++
-        relation.tableMeta.storage.properties
-      if (conf.getConf(SQLConf.ORC_IMPLEMENTATION) == "native") {
-        sessionCatalog.metastoreCatalog.convertToLogicalRelation(
-          relation,
-          options,
-          classOf[org.apache.spark.sql.execution.datasources.orc.OrcFileFormat],
-          "orc")
-      } else {
-        sessionCatalog.metastoreCatalog.convertToLogicalRelation(
-          relation,
-          options,
-          classOf[org.apache.spark.sql.hive.orc.OrcFileFormat],
-          "orc")
-      }
-    }
+  private def isConvertible(tableMeta: CatalogTable): Boolean = {
+    val serde = tableMeta.storage.serde.getOrElse("").toLowerCase(Locale.ROOT)
+    serde.contains("parquet") && SQLConf.get.getConf(HiveUtils.CONVERT_METASTORE_PARQUET) ||
+      serde.contains("orc") && SQLConf.get.getConf(HiveUtils.CONVERT_METASTORE_ORC)
   }
 
+  private val metastoreCatalog = sessionCatalog.metastoreCatalog
+
   override def apply(plan: LogicalPlan): LogicalPlan = {
     plan resolveOperators {
       // Write path
       case InsertIntoTable(r: HiveTableRelation, partition, query, overwrite, ifPartitionNotExists)
         // Inserting into partitioned table is not supported in Parquet/Orc data source (yet).
         if query.resolved && DDLUtils.isHiveTable(r.tableMeta) &&
           !r.isPartitioned && isConvertible(r) =>
-        InsertIntoTable(convert(r), partition, query, overwrite, ifPartitionNotExists)
+        InsertIntoTable(metastoreCatalog.convert(r), partition,
+          query, overwrite, ifPartitionNotExists)
 
       // Read path
       case relation: HiveTableRelation
           if DDLUtils.isHiveTable(relation.tableMeta) && isConvertible(relation) =>
-        convert(relation)
+        metastoreCatalog.convert(relation)
+
+      // CTAS
+      case CreateTable(tableDesc, mode, Some(query))
+          if DDLUtils.isHiveTable(tableDesc) && tableDesc.partitionColumnNames.isEmpty &&
+            isConvertible(tableDesc) =>
```

--- End diff --

This is not only a perf optimization. It switches to a different write path, so different limits/bugs might be exposed after this code change. We just want to make the community aware of this change.
[GitHub] spark pull request #22514: [SPARK-25271][SQL] Hive ctas commands should use ...
Github user gatorsmile commented on a diff in the pull request: https://github.com/apache/spark/pull/22514#discussion_r239264207

--- Diff: sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/CreateHiveTableAsSelectCommand.scala ---

```diff
@@ -95,9 +77,116 @@ case class CreateHiveTableAsSelectCommand(
+    InsertIntoHiveTable(
+      tableDesc,
+      Map.empty,
```

--- End diff --

Why is this empty? Do we have a test case for partitioned tables?
[GitHub] spark issue #23222: [SPARK-20636] Add the rule TransposeWindow to the optimi...
Github user gatorsmile commented on the issue: https://github.com/apache/spark/pull/23222

We can compare the plans and see whether the rule takes effect.
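To compare the plans, one can run a query with two compatible window specs and inspect the optimized plan; a hedged spark-shell sketch (column names are illustrative):

```scala
import spark.implicits._
import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions.{avg, sum}

val df = Seq(("a", "p1", 1), ("a", "p2", 2), ("b", "p1", 3)).toDF("key", "part", "value")

// w2 partitions by a subset of w1's keys -- the pattern TransposeWindow can
// reorder to avoid an extra shuffle. explain(true) shows whether the two
// Window operators were swapped in the optimized plan.
val w1 = Window.partitionBy("key", "part")
val w2 = Window.partitionBy("key")
df.select(sum("value").over(w1).as("s"), avg("value").over(w2).as("a")).explain(true)
```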
[GitHub] spark pull request #22514: [SPARK-25271][SQL] Hive ctas commands should use ...
Github user gatorsmile commented on a diff in the pull request: https://github.com/apache/spark/pull/22514#discussion_r238933039

--- Diff: sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveStrategies.scala ---

```diff
@@ -181,62 +180,39 @@ case class RelationConversions(
+      // CTAS
+      case CreateTable(tableDesc, mode, Some(query))
+          if DDLUtils.isHiveTable(tableDesc) && tableDesc.partitionColumnNames.isEmpty &&
+            isConvertible(tableDesc) =>
```

--- End diff --

Since the regression was already introduced, we need to add a conf and a migration guide note.
[GitHub] spark issue #23222: [SPARK-20636] Add the rule TransposeWindow to the optimi...
Github user gatorsmile commented on the issue: https://github.com/apache/spark/pull/23222

cc @ptkool @jiangxb1987 @cloud-fan
[GitHub] spark pull request #23222: [SPARK-20636] Add the rule TransposeWindow to the...
GitHub user gatorsmile opened a pull request: https://github.com/apache/spark/pull/23222

[SPARK-20636] Add the rule TransposeWindow to the optimization batch

## What changes were proposed in this pull request?

This PR is a follow-up of the PR https://github.com/apache/spark/pull/17899. It adds the rule to the optimizer batch.

## How was this patch tested?

The existing tests.

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/gatorsmile/spark followupSPARK-20636

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/spark/pull/23222.patch

To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message:

    This closes #23222

commit 1270e89026d80c862137c03edbeee53e56f3ed6d
Author: gatorsmile
Date: 2018-12-05T05:07:00Z

    add the rule TransposeWindow to the batch
[GitHub] spark pull request #22514: [SPARK-25271][SQL] Hive ctas commands should use ...
Github user gatorsmile commented on a diff in the pull request: https://github.com/apache/spark/pull/22514#discussion_r238871523

--- Diff: sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveStrategies.scala ---

```diff
@@ -181,62 +180,39 @@ case class RelationConversions(
+      // CTAS
+      case CreateTable(tableDesc, mode, Some(query))
+          if DDLUtils.isHiveTable(tableDesc) && tableDesc.partitionColumnNames.isEmpty &&
+            isConvertible(tableDesc) =>
```

--- End diff --

Add an internal SQL conf here? The perf impact is huge; it could be better or worse. Also add it to the migration guide and explain the behavior changes.
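A hedged sketch of what the suggested internal conf could look like, in the style of other `HiveUtils`/`SQLConf` entries (the name `spark.sql.hive.convertMetastoreCtas` is illustrative here, not necessarily what was merged):

```scala
// Hypothetical internal conf guarding the new CTAS conversion:
val CONVERT_METASTORE_CTAS = buildConf("spark.sql.hive.convertMetastoreCtas")
  .internal()
  .doc("When true, convert CTAS on Parquet/ORC Hive-serde tables to use the " +
    "built-in data source writer instead of the Hive serde writer.")
  .booleanConf
  .createWithDefault(true)
```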
[GitHub] spark pull request #22721: [SPARK-19784][SPARK-25403][SQL] Refresh the table...
Github user gatorsmile commented on a diff in the pull request: https://github.com/apache/spark/pull/22721#discussion_r238864486

--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/command/CommandUtils.scala ---

```diff
@@ -45,6 +45,8 @@ object CommandUtils extends Logging {
       } else {
         catalog.alterTableStats(table.identifier, None)
       }
+    } else {
+      catalog.refreshTable(table.identifier)
```

--- End diff --

Could we move this into the DDL commands that need to refresh the table?
[GitHub] spark issue #22899: [SPARK-25573] Combine resolveExpression and resolve in t...
Github user gatorsmile commented on the issue: https://github.com/apache/spark/pull/22899

LGTM. Merged to master.
[GitHub] spark issue #22899: [SPARK-25573] Combine resolveExpression and resolve in t...
Github user gatorsmile commented on the issue: https://github.com/apache/spark/pull/22899

To be honest, we might still need to revisit this, since it is still very confusing to developers which one they should use: top-down or bottom-up? The current use case for top-down is mainly resolving higher-order functions. This PR at least improves the description. We might need to combine the two in the future.
[GitHub] spark issue #23037: [SPARK-26083][k8s] Add Copy pyspark into corresponding d...
Github user gatorsmile commented on the issue: https://github.com/apache/spark/pull/23037

@vanzin Could you create a JIRA for this flaky test?
[GitHub] spark pull request #22899: [SPARK-25573] Combine resolveExpression and resol...
Github user gatorsmile commented on a diff in the pull request: https://github.com/apache/spark/pull/22899#discussion_r238483571

--- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala ---

```diff
@@ -880,21 +880,38 @@ class Analyzer(
       }
     }
 
-    private def resolve(e: Expression, q: LogicalPlan): Expression = e match {
-      case f: LambdaFunction if !f.bound => f
-      case u @ UnresolvedAttribute(nameParts) =>
-        // Leave unchanged if resolution fails. Hopefully will be resolved next round.
-        val result =
-          withPosition(u) {
-            q.resolveChildren(nameParts, resolver)
-              .orElse(resolveLiteralFunction(nameParts, u, q))
-              .getOrElse(u)
-          }
-        logDebug(s"Resolving $u to $result")
-        result
-      case UnresolvedExtractValue(child, fieldExpr) if child.resolved =>
-        ExtractValue(child, fieldExpr, resolver)
-      case _ => e.mapChildren(resolve(_, q))
+    /**
+     * Resolves the attribute and extract value expressions(s) by traversing the
+     * input expression in top down manner. The traversal is done in top-down manner as
+     * we need to skip over unbound lamda function expression. The lamda expressions are
+     * resolved in a different rule [[ResolveLambdaVariables]]
+     *
+     * Example :
+     * SELECT transform(array(1, 2, 3), (x, i) -> x + i)"
+     *
+     * In the case above, x and i are resolved as lamda variables in [[ResolveLambdaVariables]]
+     *
+     * Note : In this routine, the unresolved attributes are resolved from the input plan's
+     * children attributes.
+     */
+    private def resolveExpressionTopDown(e: Expression, q: LogicalPlan): Expression = {
+      if (e.resolved) return e
```

--- End diff --

A good catch!
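The scaladoc's example, runnable in spark-shell, showing the lambda variables that top-down attribute resolution must skip:

```scala
// x and i are lambda variables: they cannot be resolved from the child plan's
// output and are bound later by the ResolveLambdaVariables rule.
spark.sql("SELECT transform(array(1, 2, 3), (x, i) -> x + i)").show(false)
// expected result: [1, 3, 5]
```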
[GitHub] spark issue #22899: [SPARK-25573] Combine resolveExpression and resolve in t...
Github user gatorsmile commented on the issue: https://github.com/apache/spark/pull/22899

retest this please
[GitHub] spark pull request #22857: [SPARK-25860][SQL] Replace Literal(null, _) with ...
Github user gatorsmile commented on a diff in the pull request: https://github.com/apache/spark/pull/22857#discussion_r238450750

--- Diff: sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala ---

```diff
@@ -31,14 +31,14 @@ import org.apache.spark.scheduler.{SparkListener, SparkListenerJobEnd}
 import org.apache.spark.sql.catalyst.TableIdentifier
 import org.apache.spark.sql.catalyst.expressions.Uuid
 import org.apache.spark.sql.catalyst.optimizer.ConvertToLocalRelation
-import org.apache.spark.sql.catalyst.plans.logical.{Filter, OneRowRelation, Union}
+import org.apache.spark.sql.catalyst.plans.logical.{OneRowRelation, Union}
```

--- End diff --

BTW, please do not remove these in a huge feature PR.
[GitHub] spark issue #23163: [SPARK-26164][SQL] Allow FileFormatWriter to write multi...
Github user gatorsmile commented on the issue: https://github.com/apache/spark/pull/23163

retest this please
[GitHub] spark issue #23197: [SPARK-26165][Optimizer] Filter Query Date and Timestamp...
Github user gatorsmile commented on the issue: https://github.com/apache/spark/pull/23197

Thank you for your contributions. Could you please close the PR?
[GitHub] spark pull request #23197: [SPARK-26165][Optimizer] Filter Query Date and Ti...
Github user gatorsmile commented on a diff in the pull request: https://github.com/apache/spark/pull/23197#discussion_r238110847

--- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/TypeCoercion.scala ---

```diff
@@ -119,14 +121,26 @@ object TypeCoercion {
    * other is a Timestamp by making the target type to be String.
    */
   private def findCommonTypeForBinaryComparison(
-      dt1: DataType, dt2: DataType, conf: SQLConf): Option[DataType] = (dt1, dt2) match {
-    // We should cast all relative timestamp/date/string comparison into string comparisons
-    // This behaves as a user would expect because timestamp strings sort lexicographically.
-    // i.e. TimeStamp(2013-01-01 00:00 ...) < "2014" = true
+      left: Expression,
+      right: Expression,
+      conf: SQLConf): Option[DataType] = (left.dataType, right.dataType) match {
+    // We should cast all relative timestamp/date/string comparison into string comparisons only if
+    // the particular literal value cannot been converted into a valid Timestamp/Date. If the value
+    // can be converted into a valid TimeStamp/Date then we cast the right side literal value to
+    // 'Timestamp'/'Date', for more details refer the description provided in stringToTimestamp()
+    // method.
```

--- End diff --

We will not change the existing type coercion rules at the current stage. We plan to revisit the whole set of type coercion rules later, following some other systems, e.g., PostgreSQL.
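For context, a minimal example of the comparison under discussion; which side gets cast depends on the coercion rule in question:

```scala
// Under the existing rule, both sides are compared as strings, which sorts
// lexicographically, so timestamp'2013-01-01 00:00:00' < '2014' evaluates to
// true -- the behavior the original code comment describes.
spark.sql("SELECT timestamp'2013-01-01 00:00:00' < '2014' AS cmp").show()
```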
[GitHub] spark issue #23010: [SPARK-26012][SQL]Null and '' values should not cause dy...
Github user gatorsmile commented on the issue: https://github.com/apache/spark/pull/23010

cc @cloud-fan
[GitHub] spark pull request #23196: [SPARK-26243][SQL] Use java.time API for parsing ...
Github user gatorsmile commented on a diff in the pull request: https://github.com/apache/spark/pull/23196#discussion_r238110375

--- Diff: docs/sql-migration-guide-upgrade.md ---

```diff
@@ -33,6 +33,8 @@ displayTitle: Spark SQL Upgrading Guide
 
   - Spark applications which are built with Spark version 2.4 and prior, and call methods of `UserDefinedFunction`, need to be re-compiled with Spark 3.0, as they are not binary compatible with Spark 3.0.
 
+  - Since Spark 3.0, JSON datasource uses java.time API for parsing and generating JSON content. New formatting implementation supports date/timestamp patterns conformed to ISO 8601. To switch back to the implementation used in Spark 2.4 and earlier, set `spark.sql.legacy.timeParser.enabled` to `true`.
```

--- End diff --

The impact is not clearly documented.
[GitHub] spark pull request #23196: [SPARK-26243][SQL] Use java.time API for parsing ...
Github user gatorsmile commented on a diff in the pull request: https://github.com/apache/spark/pull/23196#discussion_r238110416

--- Diff: docs/sql-migration-guide-upgrade.md ---

```diff
@@ -33,6 +33,8 @@ displayTitle: Spark SQL Upgrading Guide
+  - Since Spark 3.0, JSON datasource uses java.time API for parsing and generating JSON content. New formatting implementation supports date/timestamp patterns conformed to ISO 8601. To switch back to the implementation used in Spark 2.4 and earlier, set `spark.sql.legacy.timeParser.enabled` to `true`.
```

--- End diff --

What are the behavior changes?
[GitHub] spark pull request #23199: [SPARK-26245][SQL] Add Float literal
Github user gatorsmile commented on a diff in the pull request: https://github.com/apache/spark/pull/23199#discussion_r238110252

--- Diff: sql/catalyst/src/main/antlr4/org/apache/spark/sql/catalyst/parser/SqlBase.g4 ---

```diff
@@ -1045,6 +1046,11 @@ DOUBLE_LITERAL
     | DECIMAL_DIGITS EXPONENT? 'D' {isValidDecimal()}?
     ;
 
+FLOAT_LITERAL
+    : DIGIT+ EXPONENT? 'F'
+    | DECIMAL_DIGITS EXPONENT? 'F' {isValidDecimal()}?
```

--- End diff --

Could you check which systems support these float literals, and what their restrictions are? The ANSI SQL standard does not have such a literal.
[GitHub] spark pull request #23199: [SPARK-26245][SQL] Add Float literal
Github user gatorsmile commented on a diff in the pull request: https://github.com/apache/spark/pull/23199#discussion_r238109668

--- Diff: sql/core/src/test/resources/sql-tests/inputs/literals.sql ---

```diff
@@ -53,6 +53,10 @@ select .e3;
 
 -- very large decimals (overflowing double).
 select 1E309, -1E309;
 
+-- float
+select 0F, 1F, 1.2F, -1F, -1.2F, 3.4028235E30F;
+select 3.4028235E39F;
```

--- End diff --

Could you move the test cases to the end of the file, and clearly mark the positive and negative test cases in the comments? More test cases are needed.
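For reference, a sketch of the expected behavior under this proposal; the `F` suffix only parses with the `FLOAT_LITERAL` grammar rule from this PR and is rejected by stock Spark:

```scala
// Hypothetical, assuming the proposed grammar change is applied:
spark.sql("SELECT 1.2F AS f").printSchema()   // expected: f of FloatType
spark.sql("SELECT 3.4028235E39F").show()      // negative case: exceeds the float range
```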
[GitHub] spark pull request #22957: [SPARK-25951][SQL] Ignore aliases for distributio...
Github user gatorsmile commented on a diff in the pull request: https://github.com/apache/spark/pull/22957#discussion_r238028577

--- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/Expression.scala ---

```diff
@@ -223,14 +223,35 @@ abstract class Expression extends TreeNode[Expression] {
   }
 
   /**
-   * Returns true when two expressions will always compute the same result, even if they differ
+   * Returns true when two expressions will always compute the same output, even if they differ
    * cosmetically (i.e. capitalization of names in attributes may be different).
    *
    * See [[Canonicalize]] for more details.
+   *
+   * This method should be used (instead of `sameResult`) when comparing if 2 expressions are the
+   * same and one can replace the other (eg. in Optimizer/Analyzer rules where we want to replace
+   * equivalent expressions). It should not be used (and `sameResult` should be used instead) when
+   * comparing if 2 expressions produce the same results (in this case `semanticEquals` can be too
+   * strict).
    */
   def semanticEquals(other: Expression): Boolean =
     deterministic && other.deterministic && canonicalized == other.canonicalized
 
+  /**
+   * Returns true when two expressions will always compute the same result, even if the output from
+   * plan perspective may be different, because of different names or similar differences.
+   * Usually this means that their canonicalized form equals, but it may also not be the case, as
+   * different output expressions can evaluate to the same result as well (eg. when an expression
+   * is aliased).
+   *
+   * This method should be used (instead of `semanticEquals`) when checking if 2 expressions
+   * produce the same results (eg. as in the case we are interested to check if the ordering is the
+   * same). It should not be used (and `semanticEquals` should be used instead) when comparing if 2
+   * expressions are the same and one can replace the other.
+   */
+  final def sameResult(other: Expression): Boolean =
```

--- End diff --

Add unit test cases to `SubexpressionEliminationSuite`?
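A hedged sketch of the distinction the new scaladoc draws, using catalyst internals (expected outcomes per the scaladoc, kept as comments rather than assertions):

```scala
import org.apache.spark.sql.catalyst.expressions.{Alias, AttributeReference}
import org.apache.spark.sql.types.IntegerType

val a = AttributeReference("a", IntegerType)()
val aliased = Alias(a, "b")()

// The alias changes the output (name), so the two are not semanticEquals,
// but they always evaluate to the same value, so sameResult should hold:
// a.semanticEquals(aliased)  // expected: false
// aliased.sameResult(a)      // expected: true (with this PR)
```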
[GitHub] spark pull request #22957: [SPARK-25951][SQL] Ignore aliases for distributio...
Github user gatorsmile commented on a diff in the pull request: https://github.com/apache/spark/pull/22957#discussion_r238027941

--- Diff: sql/core/src/test/scala/org/apache/spark/sql/execution/PlannerSuite.scala ---

```diff
@@ -780,6 +780,23 @@ class PlannerSuite extends SharedSQLContext {
         classOf[PartitioningCollection])
     }
   }
+
+  test("SPARK-25951: avoid redundant shuffle on rename") {
```

--- End diff --

I think the test case coverage is not enough for such massive changes.
[GitHub] spark issue #23183: [SPARK-26226][SQL] Update query tracker to report timeli...
Github user gatorsmile commented on the issue: https://github.com/apache/spark/pull/23183

LGTM. Thanks! Merged to master.
[GitHub] spark pull request #23183: [SPARK-26226][SQL] Update query tracker to report...
Github user gatorsmile commented on a diff in the pull request: https://github.com/apache/spark/pull/23183#discussion_r238014562

--- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/QueryPlanningTracker.scala ---

```diff
@@ -51,6 +58,18 @@ object QueryPlanningTracker {
     }
   }
 
+  /**
+   * Summary of a phase, with start time and end time so we can construct a timeline.
+   */
+  class PhaseSummary(val startTimeMs: Long, val endTimeMs: Long) {
+
+    def durationMs: Long = endTimeMs - startTimeMs
+
+    override def toString: String = {
+      s"PhaseSummary($startTimeMs, $endTimeMs)"
```

--- End diff --

Also include the duration in `toString`?
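One way to implement the suggestion, as a small self-contained sketch:

```scala
class PhaseSummary(val startTimeMs: Long, val endTimeMs: Long) {
  def durationMs: Long = endTimeMs - startTimeMs

  // Surface the duration alongside the raw timestamps, per the review comment:
  override def toString: String =
    s"PhaseSummary(start=$startTimeMs, end=$endTimeMs, duration=${durationMs}ms)"
}
```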
[GitHub] spark pull request #23153: [SPARK-26147][SQL] only pull out unevaluable pyth...
Github user gatorsmile commented on a diff in the pull request: https://github.com/apache/spark/pull/23153#discussion_r237944306

--- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/joins.scala ---

```diff
@@ -155,19 +155,20 @@ object EliminateOuterJoin extends Rule[LogicalPlan] with PredicateHelper {
 }
 
 /**
- * PythonUDF in join condition can not be evaluated, this rule will detect the PythonUDF
- * and pull them out from join condition. For python udf accessing attributes from only one side,
- * they are pushed down by operation push down rules. If not (e.g. user disables filter push
- * down rules), we need to pull them out in this rule too.
+ * PythonUDF in join condition can't be evaluated if it refers to attributes from both join sides.
+ * See `ExtractPythonUDFs` for details. This rule will detect un-evaluable PythonUDF and pull them
+ * out from join condition.
  */
 object PullOutPythonUDFInJoinCondition extends Rule[LogicalPlan] with PredicateHelper {
-  def hasPythonUDF(expression: Expression): Boolean = {
-    expression.collectFirst { case udf: PythonUDF => udf }.isDefined
+
+  private def hasUnevaluablePythonUDF(expr: Expression, j: Join): Boolean = {
+    expr.find { e =>
+      PythonUDF.isScalarPythonUDF(e) && !canEvaluate(e, j.left) && !canEvaluate(e, j.right)
```

--- End diff --

We might need a comment to explain why we only pull out the scalar `PythonUDF`.
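For context, a hedged sketch of the "refers to both sides" condition that makes a UDF un-evaluable in a join condition (the helper name is illustrative; the real code expresses this via `canEvaluate`):

```scala
import org.apache.spark.sql.catalyst.expressions.Expression
import org.apache.spark.sql.catalyst.plans.logical.Join

// An expression cannot be evaluated on either child alone when its attribute
// references span both join children; that is what forces the UDF to be
// pulled out of the join condition.
def referencesBothSides(e: Expression, j: Join): Boolean =
  e.references.intersect(j.left.outputSet).nonEmpty &&
    e.references.intersect(j.right.outputSet).nonEmpty
```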
[GitHub] spark pull request #23152: [SPARK-26181][SQL] the `hasMinMaxStats` method of...
Github user gatorsmile commented on a diff in the pull request: https://github.com/apache/spark/pull/23152#discussion_r237776897

--- Diff: sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala ---

```diff
@@ -2276,4 +2276,16 @@ class SQLQuerySuite extends QueryTest with SQLTestUtils with TestHiveSingleton {
     }
   }
+
+  test("SPARK-26181 hasMinMaxStats method of ColumnStatsMap is not correct") {
+    withSQLConf(SQLConf.CBO_ENABLED.key -> "true") {
+      withTable("all_null") {
+        sql("create table all_null (attrInt int)")
+        sql("insert into all_null values (null)")
+        sql("analyze table all_null compute statistics for columns attrInt")
+        checkAnswer(sql("select * from all_null where attrInt < 1"), Nil)
```

--- End diff --

This will be triggered by `sql("select * from all_null where attrInt < 1").queryExecution.stringWithStats`.
[GitHub] spark issue #23086: [SPARK-25528][SQL] data source v2 API refactor (batch re...
Github user gatorsmile commented on the issue: https://github.com/apache/spark/pull/23086

I still do not think we should mix the catalog support with the data source APIs. Catalog is a well-defined concept, and the so-called "table catalog" is not a catalog to me. The data source APIs in this PR look good to me.

Merged to master.
[GitHub] spark pull request #23176: [SPARK-26211][SQL] Fix InSet for binary, and stru...
Github user gatorsmile commented on a diff in the pull request: https://github.com/apache/spark/pull/23176#discussion_r237766198

--- Diff: sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/PredicateSuite.scala ---

```diff
@@ -293,6 +293,54 @@ class PredicateSuite extends SparkFunSuite with ExpressionEvalHelper {
     }
   }
 
+  test("INSET: binary") {
```

--- End diff --

Regarding semantics, `InSet` is equivalent to `In`. Could we combine the test cases and test both?
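For example, one of the equivalences such combined tests would cover, sketched via spark-shell:

```scala
// IN over binary values must compare by value, not by Java array reference,
// and In and InSet (the optimized form used for long value lists) must agree:
spark.sql("SELECT X'0A0B' IN (X'0A0B', X'0C') AS found").show()  // expected: true
```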
[GitHub] spark issue #23152: [SPARK-26181][SQL] the `hasMinMaxStats` method of `Colum...
Github user gatorsmile commented on the issue: https://github.com/apache/spark/pull/23152

CC @liancheng @juliuszsompolski @cloud-fan
[GitHub] spark pull request #23152: [SPARK-26181][SQL] the `hasMinMaxStats` method of...
Github user gatorsmile commented on a diff in the pull request: https://github.com/apache/spark/pull/23152#discussion_r237717671

--- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/statsEstimation/FilterEstimation.scala ---

```diff
@@ -879,13 +879,13 @@ case class ColumnStatsMap(originalMap: AttributeMap[ColumnStat]) {
   }
 
   def hasCountStats(a: Attribute): Boolean =
-    get(a).map(_.hasCountStats).getOrElse(false)
+    get(a).exists(_.hasCountStats)
 
   def hasDistinctCount(a: Attribute): Boolean =
-    get(a).map(_.distinctCount.isDefined).getOrElse(false)
+    get(a).exists(_.distinctCount.isDefined)
 
   def hasMinMaxStats(a: Attribute): Boolean =
-    get(a).map(_.hasCountStats).getOrElse(false)
+    get(a).exists(_.hasMinMaxStats)
```

--- End diff --

This is a copy-and-paste bug. You can reproduce it by:

```scala
spark.sql("create table Foo1(a int)")
spark.sql("insert into Foo1 values (null)")
spark.sql("analyze table Foo1 compute statistics for columns a")
spark.sql("select * from Foo1 where a < 1").queryExecution.stringWithStats
```
[GitHub] spark issue #21306: [SPARK-24252][SQL] Add catalog registration and table ca...
Github user gatorsmile commented on the issue: https://github.com/apache/spark/pull/21306

A general question: how would we use this catalog API to implement the Hive metastore? Is it doable?
[GitHub] spark pull request #21306: [SPARK-24252][SQL] Add catalog registration and t...
Github user gatorsmile commented on a diff in the pull request: https://github.com/apache/spark/pull/21306#discussion_r237699163

--- Diff: sql/catalyst/src/main/java/org/apache/spark/sql/catalog/v2/Table.java ---

```diff
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0 ...
+ */
+
+package org.apache.spark.sql.catalog.v2;
+
+import org.apache.spark.sql.types.StructType;
+
+import java.util.List;
+import java.util.Map;
+
+/**
+ * Represents table metadata from a {@link TableCatalog} or other table sources.
```

--- End diff --

Does it include a view, as we do in `CatalogTable`?
[GitHub] spark issue #23068: [SPARK-26098][WebUI] Show associated SQL query in Job pa...
Github user gatorsmile commented on the issue: https://github.com/apache/spark/pull/23068

Could you post the UI for a job that does not use Spark SQL?
[GitHub] spark issue #23068: [SPARK-26098][WebUI] Show associated SQL query in Job pa...
Github user gatorsmile commented on the issue: https://github.com/apache/spark/pull/23068

This is a pretty useful enhancement.
[GitHub] spark issue #23160: [SPARK-26196]Total tasks title in the stage page is inco...
Github user gatorsmile commented on the issue: https://github.com/apache/spark/pull/23160 cc @gengliangwang --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #23163: [SPARK-26164][SQL] Allow FileFormatWriter to write multi...
Github user gatorsmile commented on the issue: https://github.com/apache/spark/pull/23163 @c21 Any perf numbers? --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #23163: [SPARK-26164][SQL] Allow FileFormatWriter to write multi...
Github user gatorsmile commented on the issue: https://github.com/apache/spark/pull/23163 add to whitelist --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #23166: [SPARK-26201] Fix python broadcast with encryption
Github user gatorsmile commented on the issue: https://github.com/apache/spark/pull/23166 cc @HyukjinKwon @ueshin --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #23168: [SPARK-26207][doc]add PowerIterationClustering (PIC) doc...
Github user gatorsmile commented on the issue: https://github.com/apache/spark/pull/23168 cc @srowen --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #23169: [SPARK-26103][SQL] Limit the length of debug strings for...
Github user gatorsmile commented on the issue: https://github.com/apache/spark/pull/23169 ok to test --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #23170: [SPARK-24423][FOLLOW-UP][SQL] Fix error example
Github user gatorsmile commented on the issue: https://github.com/apache/spark/pull/23170 @wangyum Could you check whether the example worked before? Is this a regression? --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #23171: [SPARK-26205][SQL] Optimize In for bytes, shorts, ints
Github user gatorsmile commented on the issue: https://github.com/apache/spark/pull/23171 Also cc @ueshin --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #23131: [SPARK-25908][SQL][FOLLOW-UP] Add back unionAll
Github user gatorsmile commented on a diff in the pull request: https://github.com/apache/spark/pull/23131#discussion_r236803943 --- Diff: R/pkg/R/DataFrame.R --- @@ -2732,6 +2732,20 @@ setMethod("union", dataFrame(unioned) }) +#' Return a new SparkDataFrame containing the union of rows +#' +#' This is an alias for `union`. --- End diff -- I see. Instead of directly copying the comments back, we should follow intersectAll. Opened a ticket: https://issues.apache.org/jira/browse/SPARK-26189 --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #23139: [SPARK-25860][SPARK-26107] [FOLLOW-UP] Rule Repla...
Github user gatorsmile commented on a diff in the pull request: https://github.com/apache/spark/pull/23139#discussion_r236468395 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/ReplaceNullWithFalseInPredicate.scala --- @@ -0,0 +1,110 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.catalyst.optimizer + +import org.apache.spark.sql.catalyst.expressions.{And, ArrayExists, ArrayFilter, CaseWhen, Expression, If} +import org.apache.spark.sql.catalyst.expressions.{LambdaFunction, Literal, MapFilter, Or} +import org.apache.spark.sql.catalyst.expressions.Literal.FalseLiteral +import org.apache.spark.sql.catalyst.plans.logical.{Filter, Join, LogicalPlan} +import org.apache.spark.sql.catalyst.rules.Rule +import org.apache.spark.sql.types.BooleanType +import org.apache.spark.util.Utils + + +/** + * A rule that replaces `Literal(null, BooleanType)` with `FalseLiteral`, if possible, in the search + * condition of the WHERE/HAVING/ON(JOIN) clauses, which contain an implicit Boolean operator + * "(search condition) = TRUE". The replacement is only valid when `Literal(null, BooleanType)` is --- End diff -- This is based on the ANSI SQL. All these clauses have the implicit Boolean operator "(search condition) = TRUE". That is why NULL and FALSE do not satisfy the condition in these clauses --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
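The implicit "(search condition) = TRUE" semantics can be observed directly: a predicate that evaluates to NULL rejects a row exactly as FALSE does, which is the property the rule relies on. A minimal spark-shell sketch, assuming a running `spark` session:
```
import org.apache.spark.sql.functions._

val nullCond = lit(null).cast("boolean")
spark.range(5).where(nullCond).count()    // 0: a NULL condition rejects rows like FALSE
spark.range(5).where(!nullCond).count()   // 0: NOT NULL is still NULL, also rejected
spark.range(5).where(lit(false)).count()  // 0
```
This is also why the replacement is only safe at the top of a search condition: under a NOT, or inside a null-tolerant expression, NULL and FALSE are not interchangeable.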
[GitHub] spark pull request #23139: [SPARK-25860][SPARK-26107] [FOLLOW-UP] Rule Repla...
Github user gatorsmile commented on a diff in the pull request: https://github.com/apache/spark/pull/23139#discussion_r236468109 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/ReplaceNullWithFalseInPredicate.scala --- @@ -0,0 +1,110 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.catalyst.optimizer + +import org.apache.spark.sql.catalyst.expressions.{And, ArrayExists, ArrayFilter, CaseWhen, Expression, If} +import org.apache.spark.sql.catalyst.expressions.{LambdaFunction, Literal, MapFilter, Or} +import org.apache.spark.sql.catalyst.expressions.Literal.FalseLiteral +import org.apache.spark.sql.catalyst.plans.logical.{Filter, Join, LogicalPlan} +import org.apache.spark.sql.catalyst.rules.Rule +import org.apache.spark.sql.types.BooleanType +import org.apache.spark.util.Utils + + +/** + * A rule that replaces `Literal(null, BooleanType)` with `FalseLiteral`, if possible, in the search + * condition of the WHERE/HAVING/ON(JOIN) clauses, which contain an implicit Boolean operator --- End diff -- The extra scope is covered by "Moreover, ..." --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #23139: [SPARK-25860][SPARK-26107] [FOLLOW-UP] Rule Repla...
Github user gatorsmile commented on a diff in the pull request: https://github.com/apache/spark/pull/23139#discussion_r236456694 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/ReplaceNullWithFalseInPredicate.scala --- @@ -79,29 +80,31 @@ object ReplaceNullWithFalseInPredicate extends Rule[LogicalPlan] { * an expression that is not [[CaseWhen]], [[If]], [[And]], [[Or]] or * `Literal(null, BooleanType)`. */ - private def replaceNullWithFalse(e: Expression): Expression = { -if (e.dataType != BooleanType) { + private def replaceNullWithFalse(e: Expression): Expression = e match { +case Literal(null, BooleanType) => + FalseLiteral +case And(left, right) => + And(replaceNullWithFalse(left), replaceNullWithFalse(right)) +case Or(left, right) => + Or(replaceNullWithFalse(left), replaceNullWithFalse(right)) +case cw: CaseWhen if cw.dataType == BooleanType => + val newBranches = cw.branches.map { case (cond, value) => +replaceNullWithFalse(cond) -> replaceNullWithFalse(value) + } + val newElseValue = cw.elseValue.map(replaceNullWithFalse) + CaseWhen(newBranches, newElseValue) +case i @ If(pred, trueVal, falseVal) if i.dataType == BooleanType => + If(replaceNullWithFalse(pred), replaceNullWithFalse(trueVal), replaceNullWithFalse(falseVal)) +case e if e.dataType == BooleanType => e -} else { - e match { -case Literal(null, BooleanType) => - FalseLiteral -case And(left, right) => - And(replaceNullWithFalse(left), replaceNullWithFalse(right)) -case Or(left, right) => - Or(replaceNullWithFalse(left), replaceNullWithFalse(right)) -case cw: CaseWhen => - val newBranches = cw.branches.map { case (cond, value) => -replaceNullWithFalse(cond) -> replaceNullWithFalse(value) - } - val newElseValue = cw.elseValue.map(replaceNullWithFalse) - CaseWhen(newBranches, newElseValue) -case If(pred, trueVal, falseVal) => - If(replaceNullWithFalse(pred), -replaceNullWithFalse(trueVal), -replaceNullWithFalse(falseVal)) -case _ => e +case e => + val message = "Expected a Boolean type expression in replaceNullWithFalse, " + +s"but got the type `${e.dataType.catalogString}` in `${e.sql}`." + if (Utils.isTesting) { +throw new IllegalArgumentException(message) --- End diff -- Added a test case. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #23139: [SPARK-25860][SPARK-26107] [FOLLOW-UP] Rule Repla...
Github user gatorsmile commented on a diff in the pull request: https://github.com/apache/spark/pull/23139#discussion_r236450239 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/ReplaceNullWithFalseInPredicate.scala --- @@ -79,29 +80,31 @@ object ReplaceNullWithFalseInPredicate extends Rule[LogicalPlan] { * an expression that is not [[CaseWhen]], [[If]], [[And]], [[Or]] or * `Literal(null, BooleanType)`. */ - private def replaceNullWithFalse(e: Expression): Expression = { -if (e.dataType != BooleanType) { + private def replaceNullWithFalse(e: Expression): Expression = e match { +case Literal(null, BooleanType) => + FalseLiteral +case And(left, right) => + And(replaceNullWithFalse(left), replaceNullWithFalse(right)) +case Or(left, right) => + Or(replaceNullWithFalse(left), replaceNullWithFalse(right)) +case cw: CaseWhen if cw.dataType == BooleanType => + val newBranches = cw.branches.map { case (cond, value) => +replaceNullWithFalse(cond) -> replaceNullWithFalse(value) + } + val newElseValue = cw.elseValue.map(replaceNullWithFalse) + CaseWhen(newBranches, newElseValue) +case i @ If(pred, trueVal, falseVal) if i.dataType == BooleanType => + If(replaceNullWithFalse(pred), replaceNullWithFalse(trueVal), replaceNullWithFalse(falseVal)) +case e if e.dataType == BooleanType => e -} else { - e match { -case Literal(null, BooleanType) => - FalseLiteral -case And(left, right) => - And(replaceNullWithFalse(left), replaceNullWithFalse(right)) -case Or(left, right) => - Or(replaceNullWithFalse(left), replaceNullWithFalse(right)) -case cw: CaseWhen => - val newBranches = cw.branches.map { case (cond, value) => -replaceNullWithFalse(cond) -> replaceNullWithFalse(value) - } - val newElseValue = cw.elseValue.map(replaceNullWithFalse) - CaseWhen(newBranches, newElseValue) -case If(pred, trueVal, falseVal) => - If(replaceNullWithFalse(pred), -replaceNullWithFalse(trueVal), -replaceNullWithFalse(falseVal)) -case _ => e +case e => + val message = "Expected a Boolean type expression in replaceNullWithFalse, " + +s"but got the type `${e.dataType.catalogString}` in `${e.sql}`." + if (Utils.isTesting) { +throw new IllegalArgumentException(message) --- End diff -- The tests might not catch all the cases if the test coverage is not complete. Such an exception should not block the query execution. Thus, we just throw an exception in our testing mode instead of the production mode. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
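The testing-vs-production split described above is a common guard pattern: fail fast under the test harness so coverage gaps surface in CI, but never block a user's query in production. A standalone sketch, assuming `spark.testing` / `SPARK_TESTING` as the markers (the real check lives in `Utils.isTesting`):
```
object Guard {
  // Assumed markers; Spark's Utils.isTesting checks roughly these.
  def isTesting: Boolean =
    sys.props.contains("spark.testing") || sys.env.contains("SPARK_TESTING")

  def reportUnexpectedType(message: String): Unit =
    if (isTesting) {
      throw new IllegalArgumentException(message) // fail fast in CI to expose gaps
    } else {
      Console.err.println(s"WARN: $message") // degrade gracefully for user queries
    }
}
```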
[GitHub] spark pull request #23139: [SPARK-25860][SPARK-26107] [FOLLOW-UP] Rule Repla...
Github user gatorsmile commented on a diff in the pull request: https://github.com/apache/spark/pull/23139#discussion_r236377208 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/ReplaceNullWithFalseInPredicate.scala --- @@ -0,0 +1,107 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.catalyst.optimizer + +import org.apache.spark.sql.catalyst.expressions.{And, ArrayExists, ArrayFilter, CaseWhen, Expression, If} +import org.apache.spark.sql.catalyst.expressions.{LambdaFunction, Literal, MapFilter, Or} +import org.apache.spark.sql.catalyst.expressions.Literal.FalseLiteral +import org.apache.spark.sql.catalyst.plans.logical.{Filter, Join, LogicalPlan} +import org.apache.spark.sql.catalyst.rules.Rule +import org.apache.spark.sql.types.BooleanType + + +/** + * A rule that replaces `Literal(null, BooleanType)` with `FalseLiteral`, if possible, in the search + * condition of the WHERE/HAVING/ON(JOIN) clauses, which contain an implicit Boolean operator + * "(search condition) = TRUE". The replacement is only valid when `Literal(null, BooleanType)` is + * semantically equivalent to `FalseLiteral` when evaluating the whole search condition. + * + * Please note that FALSE and NULL are not exchangeable in most cases, when the search condition + * contains NOT and NULL-tolerant expressions. Thus, the rule is very conservative and applicable + * in very limited cases. + * + * For example, `Filter(Literal(null, BooleanType))` is equal to `Filter(FalseLiteral)`. + * + * Another example containing branches is `Filter(If(cond, FalseLiteral, Literal(null, _)))`; + * this can be optimized to `Filter(If(cond, FalseLiteral, FalseLiteral))`, and eventually + * `Filter(FalseLiteral)`. + * + * Moreover, this rule also transforms predicates in all [[If]] expressions as well as branch + * conditions in all [[CaseWhen]] expressions, even if they are not part of the search conditions. + * + * For example, `Project(If(And(cond, Literal(null)), Literal(1), Literal(2)))` can be simplified + * into `Project(Literal(2))`. 
+ */ +object ReplaceNullWithFalseInPredicate extends Rule[LogicalPlan] { + + def apply(plan: LogicalPlan): LogicalPlan = plan transform { +case f @ Filter(cond, _) => f.copy(condition = replaceNullWithFalse(cond)) +case j @ Join(_, _, _, Some(cond)) => j.copy(condition = Some(replaceNullWithFalse(cond))) +case p: LogicalPlan => p transformExpressions { + case i @ If(pred, _, _) => i.copy(predicate = replaceNullWithFalse(pred)) + case cw @ CaseWhen(branches, _) => +val newBranches = branches.map { case (cond, value) => + replaceNullWithFalse(cond) -> value +} +cw.copy(branches = newBranches) + case af @ ArrayFilter(_, lf @ LambdaFunction(func, _, _)) => +val newLambda = lf.copy(function = replaceNullWithFalse(func)) +af.copy(function = newLambda) + case ae @ ArrayExists(_, lf @ LambdaFunction(func, _, _)) => +val newLambda = lf.copy(function = replaceNullWithFalse(func)) +ae.copy(function = newLambda) + case mf @ MapFilter(_, lf @ LambdaFunction(func, _, _)) => +val newLambda = lf.copy(function = replaceNullWithFalse(func)) +mf.copy(function = newLambda) +} + } + + /** + * Recursively traverse the Boolean-type expression to replace + * `Literal(null, BooleanType)` with `FalseLiteral`, if possible. + * + * Note that `transformExpressionsDown` can not be used here as we must stop as soon as we hit + * an expression that is not [[CaseWhen]], [[If]], [[And]], [[Or]] or + * `Literal(null, BooleanType)`. + */ + private def replaceNullWithFalse(e: Expression): Expression = { +if (e.dataType != BooleanType) { --- End diff -- Had an offl
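To see the end-to-end effect the scaladoc above describes, one can inspect the optimized plan of a filter whose condition can only evaluate to NULL or FALSE. A spark-shell sketch (column name hypothetical); once the NULL branch is replaced by FALSE and the condition folds to FALSE, the optimized plan should collapse to an empty LocalRelation:
```
import org.apache.spark.sql.functions._
import spark.implicits._

val df = spark.range(10).toDF("id")
// CASE WHEN id > 5 THEN NULL ELSE FALSE END: never TRUE, so never satisfied.
val cond = when($"id" > 5, lit(null).cast("boolean")).otherwise(lit(false))
df.where(cond).explain(true)
```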
[GitHub] spark pull request #23130: [SPARK-26161][SQL] Ignore empty files in load
Github user gatorsmile commented on a diff in the pull request: https://github.com/apache/spark/pull/23130#discussion_r236361995 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/DataSourceScanExec.scala --- @@ -388,7 +388,7 @@ case class FileSourceScanExec( logInfo(s"Planning with ${bucketSpec.numBuckets} buckets") val filesGroupedToBuckets = selectedPartitions.flatMap { p => -p.files.map { f => +p.files.filter(_.getLen > 0).map { f => --- End diff -- This `createBucketedReadRDD` is for bucketed tables, right? --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #23139: [SPARK-25860][SPARK-26107] [FOLLOW-UP] Rule Repla...
Github user gatorsmile commented on a diff in the pull request: https://github.com/apache/spark/pull/23139#discussion_r236157802 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/ReplaceNullWithFalseInPredicate.scala --- @@ -0,0 +1,107 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.catalyst.optimizer + +import org.apache.spark.sql.catalyst.expressions.{And, ArrayExists, ArrayFilter, CaseWhen, Expression, If} +import org.apache.spark.sql.catalyst.expressions.{LambdaFunction, Literal, MapFilter, Or} +import org.apache.spark.sql.catalyst.expressions.Literal.FalseLiteral +import org.apache.spark.sql.catalyst.plans.logical.{Filter, Join, LogicalPlan} +import org.apache.spark.sql.catalyst.rules.Rule +import org.apache.spark.sql.types.BooleanType + + +/** + * A rule that replaces `Literal(null, BooleanType)` with `FalseLiteral`, if possible, in the search + * condition of the WHERE/HAVING/ON(JOIN) clauses, which contain an implicit Boolean operator + * "(search condition) = TRUE". The replacement is only valid when `Literal(null, BooleanType)` is + * semantically equivalent to `FalseLiteral` when evaluating the whole search condition. + * + * Please note that FALSE and NULL are not exchangeable in most cases, when the search condition + * contains NOT and NULL-tolerant expressions. Thus, the rule is very conservative and applicable + * in very limited cases. + * + * For example, `Filter(Literal(null, BooleanType))` is equal to `Filter(FalseLiteral)`. + * + * Another example containing branches is `Filter(If(cond, FalseLiteral, Literal(null, _)))`; + * this can be optimized to `Filter(If(cond, FalseLiteral, FalseLiteral))`, and eventually + * `Filter(FalseLiteral)`. + * + * Moreover, this rule also transforms predicates in all [[If]] expressions as well as branch + * conditions in all [[CaseWhen]] expressions, even if they are not part of the search conditions. + * + * For example, `Project(If(And(cond, Literal(null)), Literal(1), Literal(2)))` can be simplified + * into `Project(Literal(2))`. 
+ */ +object ReplaceNullWithFalseInPredicate extends Rule[LogicalPlan] { + + def apply(plan: LogicalPlan): LogicalPlan = plan transform { +case f @ Filter(cond, _) => f.copy(condition = replaceNullWithFalse(cond)) +case j @ Join(_, _, _, Some(cond)) => j.copy(condition = Some(replaceNullWithFalse(cond))) +case p: LogicalPlan => p transformExpressions { + case i @ If(pred, _, _) => i.copy(predicate = replaceNullWithFalse(pred)) + case cw @ CaseWhen(branches, _) => +val newBranches = branches.map { case (cond, value) => + replaceNullWithFalse(cond) -> value +} +cw.copy(branches = newBranches) + case af @ ArrayFilter(_, lf @ LambdaFunction(func, _, _)) => +val newLambda = lf.copy(function = replaceNullWithFalse(func)) +af.copy(function = newLambda) + case ae @ ArrayExists(_, lf @ LambdaFunction(func, _, _)) => +val newLambda = lf.copy(function = replaceNullWithFalse(func)) +ae.copy(function = newLambda) + case mf @ MapFilter(_, lf @ LambdaFunction(func, _, _)) => +val newLambda = lf.copy(function = replaceNullWithFalse(func)) +mf.copy(function = newLambda) +} + } + + /** + * Recursively traverse the Boolean-type expression to replace + * `Literal(null, BooleanType)` with `FalseLiteral`, if possible. + * + * Note that `transformExpressionsDown` can not be used here as we must stop as soon as we hit + * an expression that is not [[CaseWhen]], [[If]], [[And]], [[Or]] or + * `Literal(null, BooleanType)`. + */ + private def replaceNullWithFalse(e: Expression): Expression = { +if (e.dataType != BooleanType) { --- End diff -- See
[GitHub] spark issue #23139: [SPARK-25860][SPARK-26107] [FOLLOW-UP] Rule ReplaceNullW...
Github user gatorsmile commented on the issue: https://github.com/apache/spark/pull/23139 retest this please --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #23139: [SPARK-25860][SPARK-26107] [FOLLOW-UP] Rule Repla...
Github user gatorsmile commented on a diff in the pull request: https://github.com/apache/spark/pull/23139#discussion_r236157275 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/expressions.scala --- @@ -736,69 +736,3 @@ object CombineConcats extends Rule[LogicalPlan] { flattenConcats(concat) } } - -/** - * A rule that replaces `Literal(null, _)` with `FalseLiteral` for further optimizations. - * - * This rule applies to conditions in [[Filter]] and [[Join]]. Moreover, it transforms predicates - * in all [[If]] expressions as well as branch conditions in all [[CaseWhen]] expressions. - * - * For example, `Filter(Literal(null, _))` is equal to `Filter(FalseLiteral)`. - * - * Another example containing branches is `Filter(If(cond, FalseLiteral, Literal(null, _)))`; - * this can be optimized to `Filter(If(cond, FalseLiteral, FalseLiteral))`, and eventually - * `Filter(FalseLiteral)`. - * - * As this rule is not limited to conditions in [[Filter]] and [[Join]], arbitrary plans can - * benefit from it. For example, `Project(If(And(cond, Literal(null)), Literal(1), Literal(2)))` - * can be simplified into `Project(Literal(2))`. - * - * As a result, many unnecessary computations can be removed in the query optimization phase. - */ -object ReplaceNullWithFalseInPredicate extends Rule[LogicalPlan] { - - def apply(plan: LogicalPlan): LogicalPlan = plan transform { -case f @ Filter(cond, _) => f.copy(condition = replaceNullWithFalse(cond)) -case j @ Join(_, _, _, Some(cond)) => j.copy(condition = Some(replaceNullWithFalse(cond))) -case p: LogicalPlan => p transformExpressions { - case i @ If(pred, _, _) => i.copy(predicate = replaceNullWithFalse(pred)) - case cw @ CaseWhen(branches, _) => -val newBranches = branches.map { case (cond, value) => - replaceNullWithFalse(cond) -> value -} -cw.copy(branches = newBranches) - case af @ ArrayFilter(_, lf @ LambdaFunction(func, _, _)) => -val newLambda = lf.copy(function = replaceNullWithFalse(func)) -af.copy(function = newLambda) - case ae @ ArrayExists(_, lf @ LambdaFunction(func, _, _)) => -val newLambda = lf.copy(function = replaceNullWithFalse(func)) -ae.copy(function = newLambda) - case mf @ MapFilter(_, lf @ LambdaFunction(func, _, _)) => -val newLambda = lf.copy(function = replaceNullWithFalse(func)) -mf.copy(function = newLambda) -} - } - - /** - * Recursively replaces `Literal(null, _)` with `FalseLiteral`. - * - * Note that `transformExpressionsDown` can not be used here as we must stop as soon as we hit - * an expression that is not [[CaseWhen]], [[If]], [[And]], [[Or]] or `Literal(null, _)`. - */ - private def replaceNullWithFalse(e: Expression): Expression = e match { -case cw: CaseWhen if cw.dataType == BooleanType => - val newBranches = cw.branches.map { case (cond, value) => -replaceNullWithFalse(cond) -> replaceNullWithFalse(value) - } - val newElseValue = cw.elseValue.map(replaceNullWithFalse) - CaseWhen(newBranches, newElseValue) -case i @ If(pred, trueVal, falseVal) if i.dataType == BooleanType => - If(replaceNullWithFalse(pred), replaceNullWithFalse(trueVal), replaceNullWithFalse(falseVal)) -case And(left, right) => - And(replaceNullWithFalse(left), replaceNullWithFalse(right)) -case Or(left, right) => - Or(replaceNullWithFalse(left), replaceNullWithFalse(right)) -case Literal(null, _) => FalseLiteral --- End diff -- How about this line? What happened if the input data type of `e` is not Boolean? 
--- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #23104: [SPARK-26138][SQL] LimitPushDown cross join requi...
Github user gatorsmile commented on a diff in the pull request: https://github.com/apache/spark/pull/23104#discussion_r236137768 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala --- @@ -459,6 +459,7 @@ object LimitPushDown extends Rule[LogicalPlan] { val newJoin = joinType match { case RightOuter => join.copy(right = maybePushLocalLimit(exp, right)) case LeftOuter => join.copy(left = maybePushLocalLimit(exp, left)) +case Cross => join.copy(left = maybePushLocalLimit(exp, left), right = maybePushLocalLimit(exp, right)) --- End diff -- @guoxiaolongzte nope. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #23104: [SPARK-26138][SQL] LimitPushDown cross join requires may...
Github user gatorsmile commented on the issue: https://github.com/apache/spark/pull/23104 The title has a typo. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #23104: [SPARK-26138][SQL] LimitPushDown cross join requi...
Github user gatorsmile commented on a diff in the pull request: https://github.com/apache/spark/pull/23104#discussion_r236137426 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala --- @@ -459,6 +459,7 @@ object LimitPushDown extends Rule[LogicalPlan] { val newJoin = joinType match { case RightOuter => join.copy(right = maybePushLocalLimit(exp, right)) case LeftOuter => join.copy(left = maybePushLocalLimit(exp, left)) +case Cross => join.copy(left = maybePushLocalLimit(exp, left), right = maybePushLocalLimit(exp, right)) --- End diff -- +1 --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
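Why pushing a `LocalLimit` into both sides of a cross join is safe: keeping any n rows per side still produces at least n (in fact n*n) joined rows, so the global `Limit` on top can always fill its quota. A plain-Scala sketch of the cardinality argument:
```
val left = (1 to 100).toList
val right = (1 to 100).toList
val n = 5

// Cross product after pushing take(n) into both sides.
val pushed = for (l <- left.take(n); r <- right.take(n)) yield (l, r)
assert(pushed.size == n * n) // n*n rows survive...
assert(pushed.size >= n)     // ...which is always enough for LIMIT n on top
```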
[GitHub] spark pull request #23138: [SPARK-23356][SQL][TEST] add new test cases for a...
Github user gatorsmile commented on a diff in the pull request: https://github.com/apache/spark/pull/23138#discussion_r236136228 --- Diff: sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/SetOperationSuite.scala --- @@ -196,4 +196,31 @@ class SetOperationSuite extends PlanTest { )) comparePlans(expectedPlan, rewrittenPlan) } + + test("SPARK-23356 union: expressions in project list are addition to each side") { +val unionQuery = testUnion.select(('a + 1).as("aa")) +val unionOptimized = Optimize.execute(unionQuery.analyze) +val unionCorrectAnswer = + Union(testRelation.select(('a + 1).as("aa")) :: +testRelation2.select(('d + 1).as("aa")) :: +testRelation3.select(('g + 1).as("aa")) :: Nil).analyze +comparePlans(unionOptimized, unionCorrectAnswer) + } + + test("SPARK-23356 union: expressions in project list are attribute addition to each side") { --- End diff -- the same here --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #23138: [SPARK-23356][SQL][TEST] add new test cases for a...
Github user gatorsmile commented on a diff in the pull request: https://github.com/apache/spark/pull/23138#discussion_r236136273 --- Diff: sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/SetOperationSuite.scala --- @@ -196,4 +196,31 @@ class SetOperationSuite extends PlanTest { )) comparePlans(expectedPlan, rewrittenPlan) } + + test("SPARK-23356 union: expressions in project list are addition to each side") { +val unionQuery = testUnion.select(('a + 1).as("aa")) +val unionOptimized = Optimize.execute(unionQuery.analyze) +val unionCorrectAnswer = + Union(testRelation.select(('a + 1).as("aa")) :: +testRelation2.select(('d + 1).as("aa")) :: +testRelation3.select(('g + 1).as("aa")) :: Nil).analyze +comparePlans(unionOptimized, unionCorrectAnswer) + } + + test("SPARK-23356 union: expressions in project list are attribute addition to each side") { +val unionQuery = testUnion.select(('a + 'b).as("ab")) +val unionOptimized = Optimize.execute(unionQuery.analyze) +val unionCorrectAnswer = + Union(testRelation.select(('a + 'b).as("ab")) :: +testRelation2.select(('d + 'e).as("ab")) :: +testRelation3.select(('g + 'h).as("ab")) :: Nil).analyze +comparePlans(unionOptimized, unionCorrectAnswer) + } + + test("SPARK-23356 union: project don't each side with non-deterministic expression") { --- End diff -- no pushdown for non-deterministic expression --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #23138: [SPARK-23356][SQL][TEST] add new test cases for a...
Github user gatorsmile commented on a diff in the pull request: https://github.com/apache/spark/pull/23138#discussion_r236136178 --- Diff: sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/SetOperationSuite.scala --- @@ -196,4 +196,31 @@ class SetOperationSuite extends PlanTest { )) comparePlans(expectedPlan, rewrittenPlan) } + + test("SPARK-23356 union: expressions in project list are addition to each side") { --- End diff -- `are addition to each side` -> `are pushed down` --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #23130: [SPARK-26161][SQL] Ignore empty files in load
Github user gatorsmile commented on a diff in the pull request: https://github.com/apache/spark/pull/23130#discussion_r236135787 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/DataSourceScanExec.scala --- @@ -388,7 +388,7 @@ case class FileSourceScanExec( logInfo(s"Planning with ${bucketSpec.numBuckets} buckets") val filesGroupedToBuckets = selectedPartitions.flatMap { p => -p.files.map { f => +p.files.filter(_.getLen > 0).map { f => --- End diff -- Do we have a test case for this line? --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #23130: [SPARK-26161][SQL] Ignore empty files in load
Github user gatorsmile commented on a diff in the pull request: https://github.com/apache/spark/pull/23130#discussion_r236135647 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/DataSourceScanExec.scala --- @@ -388,7 +388,7 @@ case class FileSourceScanExec( logInfo(s"Planning with ${bucketSpec.numBuckets} buckets") val filesGroupedToBuckets = selectedPartitions.flatMap { p => -p.files.map { f => +p.files.filter(_.getLen > 0).map { f => --- End diff -- Can we do the filtering inside the `map`? --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
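One way to read the suggestion is to fuse the predicate and the transformation into a single pass, for example with `collect`. A standalone sketch with a hypothetical `FileInfo` record standing in for `FileStatus`:
```
case class FileInfo(path: String, len: Long)
val files = Seq(FileInfo("part-0", 0L), FileInfo("part-1", 128L))

// Two passes: filter, then map.
val twoPass = files.filter(_.len > 0).map(f => (f.path, f.len))
// One pass: collect applies the guard and the transformation together.
val onePass = files.collect { case f if f.len > 0 => (f.path, f.len) }
assert(twoPass == onePass)
```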
[GitHub] spark issue #23131: [SPARK-25908][SQL][FOLLOW-UP] Add back unionAll
Github user gatorsmile commented on the issue: https://github.com/apache/spark/pull/23131 Thanks! Merged to master. Yes, adding a Distinct over a Union is super expensive, especially when the underlying data set is huge. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
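The cost difference shows up in the two SQL flavors: `UNION` implies a Distinct (a full shuffle plus aggregation), while `UNION ALL`, like `Dataset.union`, just concatenates its inputs. A spark-shell sketch, assuming `unionAll` is a plain alias of `union` as the PR describes:
```
val df1 = spark.range(3).toDF("v")
val df2 = spark.range(3).toDF("v")

df1.union(df2).count()            // 6: bag semantics, no dedup shuffle
df1.union(df2).distinct().count() // 3: Distinct adds the expensive shuffle

df1.createOrReplaceTempView("t1")
df2.createOrReplaceTempView("t2")
spark.sql("SELECT * FROM t1 UNION SELECT * FROM t2").count() // 3: implicit Distinct
```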
[GitHub] spark pull request #23139: [SPARK-25860][SPARK-26107] [FOLLOW-UP] Rule Repla...
Github user gatorsmile commented on a diff in the pull request: https://github.com/apache/spark/pull/23139#discussion_r236120731 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/ReplaceNullWithFalseInPredicate.scala --- @@ -0,0 +1,107 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.catalyst.optimizer + +import org.apache.spark.sql.catalyst.expressions.{And, ArrayExists, ArrayFilter, CaseWhen, Expression, If} +import org.apache.spark.sql.catalyst.expressions.{LambdaFunction, Literal, MapFilter, Or} +import org.apache.spark.sql.catalyst.expressions.Literal.FalseLiteral +import org.apache.spark.sql.catalyst.plans.logical.{Filter, Join, LogicalPlan} +import org.apache.spark.sql.catalyst.rules.Rule +import org.apache.spark.sql.types.BooleanType + + +/** + * A rule that replaces `Literal(null, BooleanType)` with `FalseLiteral`, if possible, in the search + * condition of the WHERE/HAVING/ON(JOIN) clauses, which contain an implicit Boolean operator + * "(search condition) = TRUE". The replacement is only valid when `Literal(null, BooleanType)` is + * semantically equivalent to `FalseLiteral` when evaluating the whole search condition. + * + * Please note that FALSE and NULL are not exchangeable in most cases, when the search condition + * contains NOT and NULL-tolerant expressions. Thus, the rule is very conservative and applicable + * in very limited cases. + * + * For example, `Filter(Literal(null, BooleanType))` is equal to `Filter(FalseLiteral)`. + * + * Another example containing branches is `Filter(If(cond, FalseLiteral, Literal(null, _)))`; + * this can be optimized to `Filter(If(cond, FalseLiteral, FalseLiteral))`, and eventually + * `Filter(FalseLiteral)`. + * + * Moreover, this rule also transforms predicates in all [[If]] expressions as well as branch + * conditions in all [[CaseWhen]] expressions, even if they are not part of the search conditions. + * + * For example, `Project(If(And(cond, Literal(null)), Literal(1), Literal(2)))` can be simplified + * into `Project(Literal(2))`. 
+ */ +object ReplaceNullWithFalseInPredicate extends Rule[LogicalPlan] { + + def apply(plan: LogicalPlan): LogicalPlan = plan transform { +case f @ Filter(cond, _) => f.copy(condition = replaceNullWithFalse(cond)) +case j @ Join(_, _, _, Some(cond)) => j.copy(condition = Some(replaceNullWithFalse(cond))) +case p: LogicalPlan => p transformExpressions { + case i @ If(pred, _, _) => i.copy(predicate = replaceNullWithFalse(pred)) + case cw @ CaseWhen(branches, _) => +val newBranches = branches.map { case (cond, value) => + replaceNullWithFalse(cond) -> value +} +cw.copy(branches = newBranches) + case af @ ArrayFilter(_, lf @ LambdaFunction(func, _, _)) => +val newLambda = lf.copy(function = replaceNullWithFalse(func)) +af.copy(function = newLambda) + case ae @ ArrayExists(_, lf @ LambdaFunction(func, _, _)) => +val newLambda = lf.copy(function = replaceNullWithFalse(func)) +ae.copy(function = newLambda) + case mf @ MapFilter(_, lf @ LambdaFunction(func, _, _)) => +val newLambda = lf.copy(function = replaceNullWithFalse(func)) +mf.copy(function = newLambda) +} + } + + /** + * Recursively traverse the Boolean-type expression to replace + * `Literal(null, BooleanType)` with `FalseLiteral`, if possible. + * + * Note that `transformExpressionsDown` can not be used here as we must stop as soon as we hit + * an expression that is not [[CaseWhen]], [[If]], [[And]], [[Or]] or + * `Literal(null, BooleanType)`. + */ + private def replaceNullWithFalse(e: Expression): Expression = { +if (e.dataType != BooleanType) { --- End diff -- How about the
[GitHub] spark pull request #23135: [SPARK-26168][SQL] Update the code comments in Ex...
Github user gatorsmile commented on a diff in the pull request: https://github.com/apache/spark/pull/23135#discussion_r236117773 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/Expression.scala --- @@ -43,9 +43,24 @@ import org.apache.spark.sql.types._ * There are a few important traits: * * - [[Nondeterministic]]: an expression that is not deterministic. + * - [[Stateful]]: an expression that contains mutable state. For example, MonotonicallyIncreasingID + * and Rand. A stateful expression is always non-deterministic. * - [[Unevaluable]]: an expression that is not supposed to be evaluated. * - [[CodegenFallback]]: an expression that does not have code gen implemented and falls back to *interpreted mode. + * - [[NullIntolerant]]: an expression that is null intolerant (i.e. any null input will result in + * null output). + * - [[NonSQLExpression]]: a common base trait for the expressions that doesn't have SQL + * expressions like representation. For example, `ScalaUDF`, `ScalaUDAF`, + * and object `MapObjects` and `Invoke`. + * - [[UserDefinedExpression]]: a common base trait for user-defined functions, including + * UDF/UDAF/UDTF. + * - [[HigherOrderFunction]]: a common base trait for higher order functions that take one or more + *(lambda) functions and applies these to some objects. The function + *produces a number of variables which can be consumed by some lambda + *function. + * - [[NamedExpression]]: An [[Expression]] that is named. + * - [[TimeZoneAwareExpression]]: A common base trait for time zone aware expressions. --- End diff -- Added. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
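Of the traits listed in the diff, "null intolerant" is the easiest to misread. A toy sketch of the contract only, where any null input short-circuits to a null output; Catalyst's actual `NullIntolerant` is a marker trait on `Expression`, not this hypothetical interface:
```
trait NullIntolerantLike {
  final def eval(inputs: Seq[Any]): Any =
    if (inputs.contains(null)) null else nullSafeEval(inputs) // short-circuit on null
  protected def nullSafeEval(inputs: Seq[Any]): Any
}

object Plus extends NullIntolerantLike {
  protected def nullSafeEval(inputs: Seq[Any]): Any =
    inputs.map(_.asInstanceOf[Int]).sum
}

// Plus.eval(Seq(1, 2))    == 3
// Plus.eval(Seq(1, null)) == null
```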
[GitHub] spark issue #23139: [SPARK-25860][SPARK-26107] [FOLLOW-UP] Rule ReplaceNullW...
Github user gatorsmile commented on the issue: https://github.com/apache/spark/pull/23139 cc @dbtsai @aokolnychyi @rednaxelafx @cloud-fan --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #23139: [SPARK-25860][SPARK-26107] [FOLLOW-UP] Rule Repla...
GitHub user gatorsmile opened a pull request: https://github.com/apache/spark/pull/23139 [SPARK-25860][SPARK-26107] [FOLLOW-UP] Rule ReplaceNullWithFalseInPredicate ## What changes were proposed in this pull request? Based on https://github.com/apache/spark/pull/22857 and https://github.com/apache/spark/pull/23079, this PR did a few updates - Limit the data types of NULL to Boolean. - Limit the input data type of replaceNullWithFalse to Boolean - Create a new file for the rule ReplaceNullWithFalseInPredicate - Update the description of this rule. ## How was this patch tested? N/A You can merge this pull request into a Git repository by running: $ git pull https://github.com/gatorsmile/spark followupSpark-25860 Alternatively you can review and apply these changes as the patch at: https://github.com/apache/spark/pull/23139.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #23139 commit d2c5e814439a049b4cb7a28ae4802b49d164 Author: gatorsmile Date: 2018-11-26T03:00:27Z fix commit 6b6997d6c5eedb9a75af61345ae808c9d98e6f4d Author: gatorsmile Date: 2018-11-26T03:18:05Z fix --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #22857: [SPARK-25860][SQL] Replace Literal(null, _) with ...
Github user gatorsmile commented on a diff in the pull request: https://github.com/apache/spark/pull/22857#discussion_r236098905 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/expressions.scala --- @@ -736,3 +736,60 @@ object CombineConcats extends Rule[LogicalPlan] { flattenConcats(concat) } } + +/** + * A rule that replaces `Literal(null, _)` with `FalseLiteral` for further optimizations. + * + * This rule applies to conditions in [[Filter]] and [[Join]]. Moreover, it transforms predicates + * in all [[If]] expressions as well as branch conditions in all [[CaseWhen]] expressions. + * + * For example, `Filter(Literal(null, _))` is equal to `Filter(FalseLiteral)`. + * + * Another example containing branches is `Filter(If(cond, FalseLiteral, Literal(null, _)))`; + * this can be optimized to `Filter(If(cond, FalseLiteral, FalseLiteral))`, and eventually + * `Filter(FalseLiteral)`. + * + * As this rule is not limited to conditions in [[Filter]] and [[Join]], arbitrary plans can + * benefit from it. For example, `Project(If(And(cond, Literal(null)), Literal(1), Literal(2)))` + * can be simplified into `Project(Literal(2))`. + * + * As a result, many unnecessary computations can be removed in the query optimization phase. + */ +object ReplaceNullWithFalse extends Rule[LogicalPlan] { --- End diff -- Let us move it to a new file. The file is growing too big. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #22857: [SPARK-25860][SQL] Replace Literal(null, _) with ...
Github user gatorsmile commented on a diff in the pull request: https://github.com/apache/spark/pull/22857#discussion_r236098841 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/expressions.scala --- @@ -736,3 +736,60 @@ object CombineConcats extends Rule[LogicalPlan] { flattenConcats(concat) } } + +/** + * A rule that replaces `Literal(null, _)` with `FalseLiteral` for further optimizations. + * + * This rule applies to conditions in [[Filter]] and [[Join]]. Moreover, it transforms predicates + * in all [[If]] expressions as well as branch conditions in all [[CaseWhen]] expressions. + * + * For example, `Filter(Literal(null, _))` is equal to `Filter(FalseLiteral)`. + * + * Another example containing branches is `Filter(If(cond, FalseLiteral, Literal(null, _)))`; + * this can be optimized to `Filter(If(cond, FalseLiteral, FalseLiteral))`, and eventually + * `Filter(FalseLiteral)`. + * + * As this rule is not limited to conditions in [[Filter]] and [[Join]], arbitrary plans can + * benefit from it. For example, `Project(If(And(cond, Literal(null)), Literal(1), Literal(2)))` + * can be simplified into `Project(Literal(2))`. + * + * As a result, many unnecessary computations can be removed in the query optimization phase. + */ +object ReplaceNullWithFalse extends Rule[LogicalPlan] { + + def apply(plan: LogicalPlan): LogicalPlan = plan transform { +case f @ Filter(cond, _) => f.copy(condition = replaceNullWithFalse(cond)) +case j @ Join(_, _, _, Some(cond)) => j.copy(condition = Some(replaceNullWithFalse(cond))) +case p: LogicalPlan => p transformExpressions { + case i @ If(pred, _, _) => i.copy(predicate = replaceNullWithFalse(pred)) + case cw @ CaseWhen(branches, _) => +val newBranches = branches.map { case (cond, value) => + replaceNullWithFalse(cond) -> value +} +cw.copy(branches = newBranches) +} + } + + /** + * Recursively replaces `Literal(null, _)` with `FalseLiteral`. + * + * Note that `transformExpressionsDown` can not be used here as we must stop as soon as we hit + * an expression that is not [[CaseWhen]], [[If]], [[And]], [[Or]] or `Literal(null, _)`. + */ + private def replaceNullWithFalse(e: Expression): Expression = e match { +case cw: CaseWhen if cw.dataType == BooleanType => + val newBranches = cw.branches.map { case (cond, value) => +replaceNullWithFalse(cond) -> replaceNullWithFalse(value) + } + val newElseValue = cw.elseValue.map(replaceNullWithFalse) + CaseWhen(newBranches, newElseValue) +case i @ If(pred, trueVal, falseVal) if i.dataType == BooleanType => + If(replaceNullWithFalse(pred), replaceNullWithFalse(trueVal), replaceNullWithFalse(falseVal)) +case And(left, right) => + And(replaceNullWithFalse(left), replaceNullWithFalse(right)) +case Or(left, right) => + Or(replaceNullWithFalse(left), replaceNullWithFalse(right)) +case Literal(null, _) => FalseLiteral --- End diff -- Here, for safety, we should check the data types. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
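The danger behind "check the data types": an unguarded `Literal(null, _)` pattern also matches non-boolean NULLs, so the rewrite could silently change the value, and the type, of an integer-valued CASE branch. A toy AST sketch (hypothetical, not Catalyst) of exactly that failure mode:
```
sealed trait Expr
case class Lit(value: Any) extends Expr
case class CaseWhen(cond: Expr, whenTrue: Expr, whenFalse: Expr) extends Expr

def replaceNullUnguarded(e: Expr): Expr = e match {
  case Lit(null) => Lit(false) // BUG: also fires for non-boolean NULLs
  case CaseWhen(c, t, f) =>
    CaseWhen(replaceNullUnguarded(c), replaceNullUnguarded(t), replaceNullUnguarded(f))
  case other => other
}

// CASE WHEN p THEN 1 ELSE NULL END  ==>  CASE WHEN p THEN 1 ELSE false END
// An integer-typed NULL just became the boolean FALSE; guarding on BooleanType
// prevents exactly this.
```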
[GitHub] spark pull request #23135: [SPARK-26168][SQL] Update the code comments in Ex...
Github user gatorsmile commented on a diff in the pull request: https://github.com/apache/spark/pull/23135#discussion_r236097033 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicLogicalOperators.scala --- @@ -575,6 +575,19 @@ case class Range( } } +/** + * This is a Group by operator with the aggregate functions and projections. + * + * @param groupingExpressions expressions for grouping keys + * @param aggregateExpressions expressions for a project list, which could contain + * [[org.apache.spark.sql.catalyst.expressions.aggregate.AggregateFunction]]s. + * + * Note: Currently, aggregateExpressions correspond to both [[AggregateExpression]] and the output --- End diff -- Removed it from the description. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #23137: [SPARK-26169] Create DataFrameSetOperationsSuite
Github user gatorsmile commented on the issue: https://github.com/apache/spark/pull/23137 CC @cloud-fan --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #23137: [SPARK-26169] Create DataFrameSetOperationsSuite
GitHub user gatorsmile opened a pull request: https://github.com/apache/spark/pull/23137 [SPARK-26169] Create DataFrameSetOperationsSuite ## What changes were proposed in this pull request? Create a new suite DataFrameSetOperationsSuite for the test cases of DataFrame/Dataset's set operations. Also, add test cases of NULL handling for Array Except and Array Intersect. ## How was this patch tested? N/A You can merge this pull request into a Git repository by running: $ git pull https://github.com/gatorsmile/spark setOpsTest Alternatively you can review and apply these changes as the patch at: https://github.com/apache/spark/pull/23137.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #23137 commit 4544d7a8cfa5e645d6c4b944e1bb8d8b02eb0206 Author: gatorsmile Date: 2018-10-15T17:12:14Z fix commit fa0b5be373ad3f3351847bcc4e9d32b9d14d67a1 Author: gatorsmile Date: 2018-10-16T17:46:04Z fix commit 14a927d2ae6bc68ae3c2b8e65706c0c67ad4342d Author: gatorsmile Date: 2018-11-25T23:30:34Z Merge remote-tracking branch 'upstream/master' into setOpsTest --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
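The NULL-handling corner cases the new suite targets can be checked from SQL directly. A spark-shell sketch; the expected outputs are assumptions based on the documented Spark 2.4 semantics (result order follows the first array, and NULL counts as an element for these set operations):
```
spark.sql("SELECT array_except(array(1, NULL, 3), array(3, NULL))").show()
// expected: [1]        (both NULL and 3 occur in the second array)
spark.sql("SELECT array_intersect(array(1, NULL, 3), array(3, NULL))").show()
// expected: [NULL, 3]  (NULL matches NULL here, unlike in equality predicates)
```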
[GitHub] spark pull request #23135: [SPARK-26168] Update the code comments in Express...
GitHub user gatorsmile opened a pull request: https://github.com/apache/spark/pull/23135 [SPARK-26168] Update the code comments in Expression and Aggregate ## What changes were proposed in this pull request? This PR is to improve the code comments to document some common traits and traps about the expression. ## How was this patch tested? N/A You can merge this pull request into a Git repository by running: $ git pull https://github.com/gatorsmile/spark addcomments Alternatively you can review and apply these changes as the patch at: https://github.com/apache/spark/pull/23135.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #23135 commit a6e725641f86309638dc1daf82d8e5e592df1fed Author: gatorsmile Date: 2018-11-25T20:44:23Z fix --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #23135: [SPARK-26168] Update the code comments in Expression and...
Github user gatorsmile commented on the issue: https://github.com/apache/spark/pull/23135 cc @rxin @rednaxelafx @cloud-fan --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #23131: [SPARK-25908][SQL][FOLLOW-UP] Add back unionAll
Github user gatorsmile commented on the issue: https://github.com/apache/spark/pull/23131 retest this please --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #23131: [SPARK-25908][SQL][FOLLOW-UP] Add back unionAll
Github user gatorsmile commented on a diff in the pull request: https://github.com/apache/spark/pull/23131#discussion_r236054278 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala --- @@ -1852,6 +1852,19 @@ class Dataset[T] private[sql]( CombineUnions(Union(logicalPlan, other.logicalPlan)) } + /** + * Returns a new Dataset containing union of rows in this Dataset and another Dataset. --- End diff -- Done. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #23131: [SPARK-25908][SQL][FOLLOW-UP] Add back unionAll
Github user gatorsmile commented on the issue: https://github.com/apache/spark/pull/23131 cc @rxin @srowen @cloud-fan --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #23131: [SPARK-25908][SQL][FOLLOW-UP] Add back unionAll
GitHub user gatorsmile opened a pull request: https://github.com/apache/spark/pull/23131 [SPARK-25908][SQL][FOLLOW-UP] Add back unionAll ## What changes were proposed in this pull request? This PR is to add back `unionAll`, which is widely used. The name is also consistent with ANSI SQL's UNION ALL. We also have the corresponding `intersectAll` and `exceptAll`, which were introduced in Spark 2.4. ## How was this patch tested? Added a test case in DataFrameSuite You can merge this pull request into a Git repository by running: $ git pull https://github.com/gatorsmile/spark addBackUnionAll Alternatively you can review and apply these changes as the patch at: https://github.com/apache/spark/pull/23131.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #23131 commit 133246d973eb516ebc12ba5bb49cd30ba4f108f9 Author: gatorsmile Date: 2018-11-24T20:04:52Z Add back unionAll --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #23128: [SPARK-26142][SQL] Support passing shuffle metrics to ex...
Github user gatorsmile commented on the issue: https://github.com/apache/spark/pull/23128 @xuanyuanking Could you address the conflicts? Thanks for your fast work! --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #22466: [SPARK-25464][SQL] Create Database to the location,only ...
Github user gatorsmile commented on the issue: https://github.com/apache/spark/pull/22466 Regarding the potentially high cost of file listing: `CREATE DATABASE` is not a frequent operation, and the cost is high only if the target directory is non-empty and contains many files. Since we are blocking users from creating such a database anyway, I think the cost is not a big deal. We need to list this behavior change in the SQL migration guide. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
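A sketch of the behavior change under discussion, assuming the check lands as proposed; the paths are hypothetical and the exact error type is an assumption:
```
// Empty or missing location: creating the database should succeed.
spark.sql("CREATE DATABASE db_ok LOCATION '/tmp/empty_or_missing_dir'")
// Non-empty location: the new check should fail fast instead of silently
// adopting whatever files already live there.
spark.sql("CREATE DATABASE db_bad LOCATION '/tmp/dir_with_existing_files'")
```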
[GitHub] spark pull request #22466: [SPARK-25464][SQL] Create Database to the locatio...
Github user gatorsmile commented on a diff in the pull request: https://github.com/apache/spark/pull/22466#discussion_r236005686 --- Diff: sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveDDLSuite.scala --- @@ -2370,4 +2370,17 @@ class HiveDDLSuite )) } } + + test("SPARK-25464 create a database with a non empty location") { --- End diff -- Do we have a test case to check "create a database with an empty location"? --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #23096: [SPARK-26129][SQL] Instrumentation for per-query plannin...
Github user gatorsmile commented on the issue: https://github.com/apache/spark/pull/23096 retest this please --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #22237: [SPARK-25243][SQL] Use FailureSafeParser in from_...
Github user gatorsmile commented on a diff in the pull request: https://github.com/apache/spark/pull/22237#discussion_r234789275 --- Diff: R/pkg/tests/fulltests/test_sparkSQL.R --- @@ -1694,7 +1694,7 @@ test_that("column functions", { df <- as.DataFrame(list(list("col" = "{\"date\":\"21/10/2014\"}"))) schema2 <- structType(structField("date", "date")) s <- collect(select(df, from_json(df$col, schema2))) - expect_equal(s[[1]][[1]], NA) + expect_equal(s[[1]][[1]]$date, NA) --- End diff -- What is the reason we made this change? --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #22721: [SPARK-19784][SPARK-25403][SQL] Refresh the table even t...
Github user gatorsmile commented on the issue: https://github.com/apache/spark/pull/22721 cc @jiangxb1987 Could you take a look at this? --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org