[GitHub] spark pull request #18193: [SPARK-15616] [SQL] CatalogRelation should fallba...
Github user lianhuiwang commented on a diff in the pull request:

    https://github.com/apache/spark/pull/18193#discussion_r139879632

    --- Diff: sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveStrategies.scala ---
    @@ -140,6 +141,62 @@ class DetermineTableStats(session: SparkSession) extends Rule[LogicalPlan] {
     }
     /**
    + *
    + * TODO: merge this with PruneFileSourcePartitions after we completely make hive as a data source.
    + */
    +case class PruneHiveTablePartitions(
    +    session: SparkSession) extends Rule[LogicalPlan] with PredicateHelper {
    +  override def apply(plan: LogicalPlan): LogicalPlan = plan transformDown {
    +    case filter @ Filter(condition, relation: HiveTableRelation) if relation.isPartitioned =>
    +      val predicates = splitConjunctivePredicates(condition)
    +      val normalizedFilters = predicates.map { e =>
    +        e transform {
    +          case a: AttributeReference =>
    +            a.withName(relation.output.find(_.semanticEquals(a)).get.name)
    +        }
    +      }
    +      val partitionSet = AttributeSet(relation.partitionCols)
    +      val pruningPredicates = normalizedFilters.filter { predicate =>
    +        !predicate.references.isEmpty &&
    +          predicate.references.subsetOf(partitionSet)
    +      }
    +      if (pruningPredicates.nonEmpty && session.sessionState.conf.fallBackToHdfsForStatsEnabled &&
    +        session.sessionState.conf.metastorePartitionPruning) {
    +        val prunedPartitions = session.sharedState.externalCatalog.listPartitionsByFilter(
    +          relation.tableMeta.database,
    +          relation.tableMeta.identifier.table,
    +          pruningPredicates,
    +          session.sessionState.conf.sessionLocalTimeZone)
    +        val sizeInBytes = try {
    +          prunedPartitions.map { part =>
    +            val totalSize = part.parameters.get(StatsSetupConst.TOTAL_SIZE).map(_.toLong)
    +            val rawDataSize = part.parameters.get(StatsSetupConst.RAW_DATA_SIZE).map(_.toLong)
    +            if (totalSize.isDefined && totalSize.get > 0L) {
    --- End diff --

@cenyuhai Yes, I think what you said is right. Thanks.

---
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org
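The quoted diff stops right after the `totalSize` check, so the rest of the size computation is not visible here. As a rough, self-contained sketch of the idea (not the PR's actual code), the following sums a size per pruned partition, preferring `totalSize`, then `rawDataSize`, then a caller-supplied fallback. `PartitionInfo`, `PrunedPartitionSize`, and the `fallback` function are hypothetical names for illustration, and the two key strings are assumed to match the values of Hive's `StatsSetupConst.TOTAL_SIZE` and `StatsSetupConst.RAW_DATA_SIZE`.

```scala
import scala.util.Try

// Hypothetical stand-in for a catalog partition; only the parameter map matters here.
final case class PartitionInfo(parameters: Map[String, String])

object PrunedPartitionSize {
  // Assumed to equal StatsSetupConst.TOTAL_SIZE and StatsSetupConst.RAW_DATA_SIZE.
  val TotalSize = "totalSize"
  val RawDataSize = "rawDataSize"

  // Reads a parameter as a Long and keeps it only if it is strictly positive.
  private def positiveLong(params: Map[String, String], key: String): Option[Long] =
    params.get(key).flatMap(s => Try(s.toLong).toOption).filter(_ > 0L)

  // Size of one partition: totalSize, else rawDataSize, else the fallback
  // (for example, a file-system size lookup; that choice is an assumption).
  def partitionSize(part: PartitionInfo, fallback: PartitionInfo => Long): Long =
    positiveLong(part.parameters, TotalSize)
      .orElse(positiveLong(part.parameters, RawDataSize))
      .getOrElse(fallback(part))

  // Total size over the partitions that survived pruning.
  def prunedSizeInBytes(parts: Seq[PartitionInfo], fallback: PartitionInfo => Long): BigInt =
    parts.map(p => BigInt(partitionSize(p, fallback))).sum
}
```

Summing into `BigInt` avoids overflow when many large partitions are added up, which is also why Spark's own statistics use `BigInt` for `sizeInBytes`.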
[GitHub] spark pull request #18193: [SPARK-15616] [SQL] CatalogRelation should fallba...
Github user lianhuiwang commented on a diff in the pull request:

    https://github.com/apache/spark/pull/18193#discussion_r139740180

    --- Diff: sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveStrategies.scala ---
    @@ -139,6 +138,54 @@ class DetermineTableStats(session: SparkSession) extends Rule[LogicalPlan] {
     }
     /**
    + *
    + * TODO: merge this with PruneFileSourcePartitions after we completely make hive as a data source.
    + */
    +case class PruneHiveTablePartitions(
    +    session: SparkSession) extends Rule[LogicalPlan] with PredicateHelper {
    +  override def apply(plan: LogicalPlan): LogicalPlan = plan transformDown {
    +    case filter @ Filter(condition, relation: HiveTableRelation) if relation.isPartitioned =>
    +      val predicates = splitConjunctivePredicates(condition)
    +      val normalizedFilters = predicates.map { e =>
    +        e transform {
    +          case a: AttributeReference =>
    +            a.withName(relation.output.find(_.semanticEquals(a)).get.name)
    +        }
    +      }
    +      val partitionSet = AttributeSet(relation.partitionCols)
    +      val pruningPredicates = normalizedFilters.filter { predicate =>
    +        !predicate.references.isEmpty &&
    +          predicate.references.subsetOf(partitionSet)
    +      }
    +      if (pruningPredicates.nonEmpty && session.sessionState.conf.fallBackToHdfsForStatsEnabled &&
    +        session.sessionState.conf.metastorePartitionPruning) {
    +        val prunedPartitions = session.sharedState.externalCatalog.listPartitionsByFilter(
    +          relation.tableMeta.database,
    +          relation.tableMeta.identifier.table,
    +          pruningPredicates,
    +          session.sessionState.conf.sessionLocalTimeZone)
    +        val sizeInBytes = try {
    +          prunedPartitions.map { part =>
    +            CommandUtils.calculateLocationSize(
    --- End diff --

@cenyuhai Yes, good idea. I will add it. Thanks.
[GitHub] spark issue #18193: [SPARK-15616] [SQL] CatalogRelation should fallback to H...
Github user lianhuiwang commented on the issue:

    https://github.com/apache/spark/pull/18193

@cloud-fan I have addressed your comments. Thanks.

---
If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA.
---
[GitHub] spark issue #18193: [SPARK-15616] [SQL] CatalogRelation should fallback to H...
Github user lianhuiwang commented on the issue:

    https://github.com/apache/spark/pull/18193

retest it please.
[GitHub] spark issue #18193: [SPARK-15616] [SQL] CatalogRelation should fallback to H...
Github user lianhuiwang commented on the issue:

    https://github.com/apache/spark/pull/18193

@cloud-fan PruneFileSourcePartitions is a rule for data source tables, but for now we cannot treat Hive as a data source.
[GitHub] spark pull request #18193: [SPARK-15616] [SQL] CatalogRelation should fallba...
Github user lianhuiwang commented on a diff in the pull request:

    https://github.com/apache/spark/pull/18193#discussion_r134523133

    --- Diff: sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveStrategies.scala ---
    @@ -139,6 +138,54 @@ class DetermineTableStats(session: SparkSession) extends Rule[LogicalPlan] {
       }
     }
    +case class DeterminePartitionedTableStats(
    --- End diff --

Yes, I will rename it to PruneHiveTablePartitions.
[GitHub] spark issue #14285: [SPARK-16649][SQL] Push partition predicates down into m...
Github user lianhuiwang commented on the issue:

    https://github.com/apache/spark/pull/14285

@cloud-fan OK, I will close it. Thanks.
[GitHub] spark pull request #14285: [SPARK-16649][SQL] Push partition predicates down...
Github user lianhuiwang closed the pull request at:

    https://github.com/apache/spark/pull/14285
[GitHub] spark issue #14655: [SPARK-16669][SQL]Adding partition prunning to Metastore...
Github user lianhuiwang commented on the issue:

    https://github.com/apache/spark/pull/14655

@wzhfy Yes, I think this is the same as SPARK-15616.
[GitHub] spark pull request #18193: [SPARK-15616] [SQL] CatalogRelation should fallba...
Github user lianhuiwang commented on a diff in the pull request:

    https://github.com/apache/spark/pull/18193#discussion_r121985050

    --- Diff: sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveStrategies.scala ---
    @@ -139,6 +138,54 @@ class DetermineTableStats(session: SparkSession) extends Rule[LogicalPlan] {
       }
     }
    +case class DeterminePartitionedTableStats(
    +    session: SparkSession) extends Rule[LogicalPlan] with PredicateHelper {
    +  override def apply(plan: LogicalPlan): LogicalPlan = plan transformDown {
    +    case filter @ Filter(condition, relation: CatalogRelation)
    +        if DDLUtils.isHiveTable(relation.tableMeta) && relation.isPartitioned =>
    +      val predicates = splitConjunctivePredicates(condition)
    +      val normalizedFilters = predicates.map { e =>
    +        e transform {
    +          case a: AttributeReference =>
    +            a.withName(relation.output.find(_.semanticEquals(a)).get.name)
    +        }
    +      }
    +      val partitionSet = AttributeSet(relation.partitionCols)
    +      val pruningPredicates = normalizedFilters.filter { predicate =>
    +        !predicate.references.isEmpty &&
    +          predicate.references.subsetOf(partitionSet)
    +      }
    +      if (pruningPredicates.nonEmpty && session.sessionState.conf.fallBackToHdfsForStatsEnabled &&
    +        session.sessionState.conf.metastorePartitionPruning) {
    +        val prunedPartitions = session.sharedState.externalCatalog.listPartitionsByFilter(
    +          relation.tableMeta.database,
    +          relation.tableMeta.identifier.table,
    +          pruningPredicates,
    +          session.sessionState.conf.sessionLocalTimeZone)
    +        val hiveTable = HiveClientImpl.toHiveTable(relation.tableMeta)
    +        val partitions = prunedPartitions.map(HiveClientImpl.toHivePartition(_, hiveTable))
    +        val sizeInBytes = try {
    --- End diff --

Even if we already have partition-level statistics, we cannot know the total number of partitions, so we cannot compute the statistics for the pruned partitions.
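The rule in the quoted diff keeps only those conjuncts of the filter condition that reference at least one column and reference nothing but partition columns. A minimal sketch of that selection step, outside of Catalyst, might look like this. `Predicate` and `PartitionPredicateSketch` are hypothetical stand-ins: a real Catalyst `Expression` exposes `references` as an `AttributeSet`, whereas this sketch uses a plain `Set[String]` of column names.

```scala
// Hypothetical stand-in for a Catalyst expression: its SQL text plus the
// names of the columns it references.
final case class Predicate(sql: String, references: Set[String])

object PartitionPredicateSketch {
  // Keep conjuncts that mention at least one column and only partition columns,
  // mirroring `!references.isEmpty && references.subsetOf(partitionSet)`.
  def pruningPredicates(conjuncts: Seq[Predicate], partitionCols: Set[String]): Seq[Predicate] =
    conjuncts.filter(p => p.references.nonEmpty && p.references.subsetOf(partitionCols))
}
```

The non-empty check matters because a constant predicate such as `1 = 1` references no columns, and the empty set is a subset of every set, so without it such predicates would be handed to the metastore as if they were partition filters.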
[GitHub] spark pull request #18193: [SPARK-15616] [SQL] CatalogRelation should fallba...
Github user lianhuiwang commented on a diff in the pull request:

    https://github.com/apache/spark/pull/18193#discussion_r121982742

    --- Diff: sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveStrategies.scala ---
    @@ -139,6 +138,54 @@ class DetermineTableStats(session: SparkSession) extends Rule[LogicalPlan] {
       }
     }
    +case class DeterminePartitionedTableStats(
    +    session: SparkSession) extends Rule[LogicalPlan] with PredicateHelper {
    +  override def apply(plan: LogicalPlan): LogicalPlan = plan transformDown {
    +    case filter @ Filter(condition, relation: CatalogRelation)
    +        if DDLUtils.isHiveTable(relation.tableMeta) && relation.isPartitioned =>
    --- End diff --

I think that PruneFileSourcePartitions can handle data source tables now.
[GitHub] spark pull request #18193: [SPARK-15616] [SQL] CatalogRelation should fallba...
Github user lianhuiwang commented on a diff in the pull request:

    https://github.com/apache/spark/pull/18193#discussion_r121982605

    --- Diff: sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveStrategies.scala ---
    @@ -139,6 +138,54 @@ class DetermineTableStats(session: SparkSession) extends Rule[LogicalPlan] {
       }
     }
    +case class DeterminePartitionedTableStats(
    --- End diff --

Yes, DeterminePartitionedTableStats is a kind of PruneFileSourcePartitions rule for Hive tables.
[GitHub] spark issue #18205: [SPARK-20986] [SQL] Reset table's statistics after Prune...
Github user lianhuiwang commented on the issue:

    https://github.com/apache/spark/pull/18205

@cloud-fan Thanks.
[GitHub] spark issue #18205: [SPARK-20986] [SQL] Reset table's statistics after Prune...
Github user lianhuiwang commented on the issue:

    https://github.com/apache/spark/pull/18205

@cloud-fan I have addressed your comments. Thanks.
[GitHub] spark pull request #18205: [SPARK-20986] [SQL] Reset table's statistics afte...
Github user lianhuiwang commented on a diff in the pull request:

    https://github.com/apache/spark/pull/18205#discussion_r121648290

    --- Diff: sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/PruneFileSourcePartitionsSuite.scala ---
    @@ -66,4 +68,45 @@ class PruneFileSourcePartitionsSuite extends QueryTest with SQLTestUtils with Te
           }
         }
       }
    +
    +  test("SPARK-20986 Reset table's statistics after PruneFileSourcePartitions rule") {
    +    withTempView("tempTbl") {
    +      withTable("partTbl") {
    +        spark.range(10).selectExpr("id").createOrReplaceTempView("tempTbl")
    --- End diff --

Yes, great. Thanks.
[GitHub] spark pull request #18205: [SPARK-20986] [SQL] Reset table's statistics afte...
Github user lianhuiwang commented on a diff in the pull request:

    https://github.com/apache/spark/pull/18205#discussion_r121648258

    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PruneFileSourcePartitions.scala ---
    @@ -59,8 +60,11 @@ private[sql] object PruneFileSourcePartitions extends Rule[LogicalPlan] {
             val prunedFileIndex = catalogFileIndex.filterPartitions(partitionKeyFilters.toSeq)
             val prunedFsRelation = fsRelation.copy(location = prunedFileIndex)(sparkSession)
    -        val prunedLogicalRelation = logicalRelation.copy(relation = prunedFsRelation)
    -
    +        // Change table stats based on the sizeInBytes of pruned files
    +        val withStats = logicalRelation.catalogTable.map(_.copy(
    +          stats = Some(CatalogStatistics(sizeInBytes = BigInt(prunedFileIndex.sizeInBytes)
    --- End diff --

Yes. Now it replaces the stats of the CatalogTable with a new CatalogStatistics(), as DetermineTableStats does.
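The change discussed here swaps the table-level statistics for ones derived from the files that survive partition pruning. A small sketch of that step, using simplified stand-in types rather than Spark's real classes, could look as follows. `FileIndex`, `ResetStatsSketch`, and `withPrunedStats` are hypothetical names, and the `CatalogTable` and `CatalogStatistics` case classes below only borrow the names of their Spark counterparts.

```scala
// Simplified stand-ins; Spark's real CatalogTable and CatalogStatistics carry many more fields.
final case class CatalogStatistics(sizeInBytes: BigInt)
final case class CatalogTable(name: String, stats: Option[CatalogStatistics])

// Hypothetical file index: partition value -> bytes on disk for that partition.
final case class FileIndex(partitionSizes: Map[Int, Long]) {
  def filterPartitions(keep: Int => Boolean): FileIndex =
    FileIndex(partitionSizes.filter { case (part, _) => keep(part) })
  def sizeInBytes: Long = partitionSizes.values.sum
}

object ResetStatsSketch {
  // Replace whatever statistics the table had with the size of the pruned files.
  def withPrunedStats(table: CatalogTable, pruned: FileIndex): CatalogTable =
    table.copy(stats = Some(CatalogStatistics(sizeInBytes = BigInt(pruned.sizeInBytes))))
}
```

With two partitions of 100 and 200 bytes and a filter that keeps only `part = 1`, the table's statistics would drop from 300 to 100 bytes, which is the kind of change the optimizer needs in order to make size-based decisions on the pruned relation.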
[GitHub] spark pull request #18205: [SPARK-20986] [SQL] Reset table's statistics afte...
Github user lianhuiwang commented on a diff in the pull request:

    https://github.com/apache/spark/pull/18205#discussion_r121563368

    --- Diff: sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/PruneFileSourcePartitionsSuite.scala ---
    @@ -66,4 +68,41 @@ class PruneFileSourcePartitionsSuite extends QueryTest with SQLTestUtils with Te
           }
         }
       }
    +
    +  test("SPARK-20986 Reset table's statistics after PruneFileSourcePartitions rule") {
    +    withTempView("tempTbl") {
    +      withTable("partTbl") {
    +        spark.range(10).selectExpr("id").createOrReplaceTempView("tempTbl")
    +        sql("CREATE TABLE partTbl (id INT) PARTITIONED BY (part INT) STORED AS parquet")
    +        for (part <- Seq(1, 2)) {
    +          sql(
    +            s"""
    +              |INSERT OVERWRITE TABLE partTbl PARTITION (part='$part')
    +              |select id from tempTbl
    +            """.stripMargin)
    +        }
    +
    +        val tableName = "partTbl"
    +        sql(s"ANALYZE TABLE partTbl COMPUTE STATISTICS")
    +        val tableStats =
    +          spark.sessionState.catalog.getTableMetadata(TableIdentifier(tableName)).stats
    +        assert(tableStats.isDefined && tableStats.get.sizeInBytes > 0, "tableStats is lost")
    +
    +        withSQLConf(SQLConf.ENABLE_FALL_BACK_TO_HDFS_FOR_STATS.key -> "true") {
    +          val df = sql("SELECT * FROM partTbl where part = 1")
    +          val query = df.queryExecution.analyzed.analyze
    +          val sizes1 = query.collect {
    +            case relation: LogicalRelation => relation.computeStats(conf).sizeInBytes
    +          }
    +          assert(sizes1.size === 1, s"Size wrong for:\n ${df.queryExecution}")
    +          assert(sizes1(0) == tableStats.get.sizeInBytes)
    +          val sizes2 = Optimize.execute(query).collect {
    +            case relation: LogicalRelation => relation.catalogTable.get.stats.get.sizeInBytes
    --- End diff --

Yes, thanks. I have updated it accordingly.
[GitHub] spark issue #18205: [SPARK-20986] [SQL] Reset table's statistics after Prune...
Github user lianhuiwang commented on the issue:

    https://github.com/apache/spark/pull/18205

@wzhfy I have addressed your comments. Thanks.
[GitHub] spark pull request #18205: [SPARK-20986] [SQL] Reset table's statistics afte...
Github user lianhuiwang commented on a diff in the pull request:

    https://github.com/apache/spark/pull/18205#discussion_r121378281

    --- Diff: sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/PruneFileSourcePartitionsSuite.scala ---
    @@ -66,4 +68,42 @@ class PruneFileSourcePartitionsSuite extends QueryTest with SQLTestUtils with Te
           }
         }
       }
    +
    +  test("SPARK-20986 Reset table's statistics after PruneFileSourcePartitions rule") {
    +    withTempView("tempTbl") {
    +      withTable("partTbl") {
    +        spark.range(1000).selectExpr("id").createOrReplaceTempView("tempTbl")
    +        sql("CREATE TABLE partTbl (id INT) PARTITIONED BY (part INT) STORED AS parquet")
    +        for (part <- Seq(1, 2, 3)) {
    +          sql(
    +            s"""
    +              |INSERT OVERWRITE TABLE partTbl PARTITION (part='$part')
    +              |select id from tempTbl
    +            """.stripMargin)
    +        }
    +
    +        val tableName = "partTbl"
    +        sql(s"analyze table partTbl compute STATISTICS")
    +
    +        val tableStats =
    +          spark.sessionState.catalog.getTableMetadata(TableIdentifier(tableName)).stats
    +        assert(tableStats.isDefined && tableStats.get.sizeInBytes > 0, "tableStats is lost")
    +
    +        withSQLConf(SQLConf.ENABLE_FALL_BACK_TO_HDFS_FOR_STATS.key -> "true") {
    +          val df = sql("SELECT * FROM partTbl where part = 1")
    +          val query = df.queryExecution.analyzed.analyze
    --- End diff --

Because there is a SubqueryAlias node in the plan, I think we need analyze() to eliminate it.
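One plausible reading of this comment is that a rule which pattern-matches a filter sitting directly on a relation will not fire while an alias node is wedged between them. The toy plan tree below illustrates that reading only; it is not Catalyst, and `Plan`, `Relation`, `Filter`, `AliasEliminationSketch`, and `filterOverRelation` are hypothetical names invented for the sketch (only `SubqueryAlias` borrows its name from Spark).

```scala
// Toy logical plan with just enough node types for the illustration.
sealed trait Plan
final case class Relation(name: String) extends Plan
final case class SubqueryAlias(alias: String, child: Plan) extends Plan
final case class Filter(condition: String, child: Plan) extends Plan

object AliasEliminationSketch {
  // Strip every alias node, keeping the rest of the tree unchanged.
  def eliminateAliases(plan: Plan): Plan = plan match {
    case SubqueryAlias(_, child) => eliminateAliases(child)
    case Filter(cond, child)     => Filter(cond, eliminateAliases(child))
    case r: Relation             => r
  }

  // A toy rule that only recognises a Filter placed directly on a Relation.
  def filterOverRelation(plan: Plan): Option[(String, Relation)] = plan match {
    case Filter(cond, r: Relation) => Some((cond, r))
    case _                         => None
  }
}
```

For `Filter("part = 1", SubqueryAlias("parttbl", Relation("partTbl")))` the toy rule finds nothing, while after `eliminateAliases` it matches the filter and the relation.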
[GitHub] spark pull request #18205: [SPARK-20986] [SQL] Reset table's statistics afte...
Github user lianhuiwang commented on a diff in the pull request:

    https://github.com/apache/spark/pull/18205#discussion_r121377999

    --- Diff: sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/PruneFileSourcePartitionsSuite.scala ---
    @@ -66,4 +68,42 @@ class PruneFileSourcePartitionsSuite extends QueryTest with SQLTestUtils with Te
           }
         }
       }
    +
    +  test("SPARK-20986 Reset table's statistics after PruneFileSourcePartitions rule") {
    +    withTempView("tempTbl") {
    +      withTable("partTbl") {
    +        spark.range(1000).selectExpr("id").createOrReplaceTempView("tempTbl")
    +        sql("CREATE TABLE partTbl (id INT) PARTITIONED BY (part INT) STORED AS parquet")
    +        for (part <- Seq(1, 2, 3)) {
    +          sql(
    +            s"""
    +              |INSERT OVERWRITE TABLE partTbl PARTITION (part='$part')
    +              |select id from tempTbl
    +            """.stripMargin)
    +        }
    +
    +        val tableName = "partTbl"
    +        sql(s"analyze table partTbl compute STATISTICS")
    --- End diff --

Yes, thanks.
[GitHub] spark pull request #18205: [SPARK-20986] [SQL] Reset table's statistics afte...
Github user lianhuiwang commented on a diff in the pull request:

    https://github.com/apache/spark/pull/18205#discussion_r121378021

    --- Diff: sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/PruneFileSourcePartitionsSuite.scala ---
    @@ -66,4 +68,42 @@ class PruneFileSourcePartitionsSuite extends QueryTest with SQLTestUtils with Te
           }
         }
       }
    +
    +  test("SPARK-20986 Reset table's statistics after PruneFileSourcePartitions rule") {
    +    withTempView("tempTbl") {
    +      withTable("partTbl") {
    +        spark.range(1000).selectExpr("id").createOrReplaceTempView("tempTbl")
    +        sql("CREATE TABLE partTbl (id INT) PARTITIONED BY (part INT) STORED AS parquet")
    +        for (part <- Seq(1, 2, 3)) {
    +          sql(
    +            s"""
    +              |INSERT OVERWRITE TABLE partTbl PARTITION (part='$part')
    +              |select id from tempTbl
    +            """.stripMargin)
    +        }
    +
    +        val tableName = "partTbl"
    +        sql(s"analyze table partTbl compute STATISTICS")
    +
    +        val tableStats =
    +          spark.sessionState.catalog.getTableMetadata(TableIdentifier(tableName)).stats
    +        assert(tableStats.isDefined && tableStats.get.sizeInBytes > 0, "tableStats is lost")
    +
    +        withSQLConf(SQLConf.ENABLE_FALL_BACK_TO_HDFS_FOR_STATS.key -> "true") {
    +          val df = sql("SELECT * FROM partTbl where part = 1")
    +          val query = df.queryExecution.analyzed.analyze
    +          val sizes1 = query.collect {
    +            case relation: LogicalRelation => relation.computeStats(conf).sizeInBytes
    --- End diff --

Yes, thanks.
[GitHub] spark pull request #18205: [SPARK-20986] [SQL] Reset table's statistics afte...
Github user lianhuiwang commented on a diff in the pull request:

    https://github.com/apache/spark/pull/18205#discussion_r121378012

    --- Diff: sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/PruneFileSourcePartitionsSuite.scala ---
    @@ -66,4 +68,42 @@ class PruneFileSourcePartitionsSuite extends QueryTest with SQLTestUtils with Te
           }
         }
       }
    +
    +  test("SPARK-20986 Reset table's statistics after PruneFileSourcePartitions rule") {
    +    withTempView("tempTbl") {
    +      withTable("partTbl") {
    +        spark.range(1000).selectExpr("id").createOrReplaceTempView("tempTbl")
    --- End diff --

Yes, thanks.
[GitHub] spark pull request #18205: [SPARK-20986] [SQL] Reset table's statistics afte...
Github user lianhuiwang commented on a diff in the pull request:

    https://github.com/apache/spark/pull/18205#discussion_r121275701

    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PruneFileSourcePartitions.scala ---
    @@ -59,8 +60,10 @@ private[sql] object PruneFileSourcePartitions extends Rule[LogicalPlan] {
             val prunedFileIndex = catalogFileIndex.filterPartitions(partitionKeyFilters.toSeq)
             val prunedFsRelation = fsRelation.copy(location = prunedFileIndex)(sparkSession)
    -        val prunedLogicalRelation = logicalRelation.copy(relation = prunedFsRelation)
    -
    +        val withStats = logicalRelation.catalogTable.map(_.copy(
    +          stats = Some(CatalogStatistics(sizeInBytes = BigInt(prunedFileIndex.sizeInBytes)
    --- End diff --

Yes, thanks.
[GitHub] spark pull request #18205: [SPARK-20986] [SQL] Reset table's statistics afte...
Github user lianhuiwang commented on a diff in the pull request:

    https://github.com/apache/spark/pull/18205#discussion_r121275665

    --- Diff: sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/PruneFileSourcePartitionsSuite.scala ---
    @@ -66,4 +67,35 @@ class PruneFileSourcePartitionsSuite extends QueryTest with SQLTestUtils with Te
           }
         }
       }
    +
    +  test("SPARK-20986 Reset table's statistics after PruneFileSourcePartitions rule") {
    +    withTempView("tempTbl") {
    +      withTable("partTbl") {
    +        spark.range(1000).selectExpr("id").createOrReplaceTempView("tempTbl")
    +        sql("CREATE TABLE partTbl (id INT) PARTITIONED BY (part INT) STORED AS parquet")
    +        for (part <- Seq(1, 2, 3)) {
    +          sql(
    +            s"""
    +              |INSERT OVERWRITE TABLE partTbl PARTITION (part='$part')
    +              |select id from tempTbl
    +            """.stripMargin)
    +        }
    +
    +        withSQLConf(SQLConf.ENABLE_FALL_BACK_TO_HDFS_FOR_STATS.key -> "true") {
    +          val df = sql("SELECT * FROM partTbl where part = 1")
    +          val query = df.queryExecution.analyzed.analyze
    +          val sizes1 = query.collect {
    +            case relation: LogicalRelation => relation.computeStats(conf).sizeInBytes
    +          }
    +          assert(sizes1.size === 1, s"Size wrong for:\n ${df.queryExecution}")
    +          assert(sizes1(0) > 5000, s"expected > 5000 for test table 'src', got: ${sizes1(0)}")
    +          val sizes2 = Optimize.execute(query).collect {
    +            case relation: LogicalRelation => relation.computeStats(conf).sizeInBytes
    +          }
    +          assert(sizes2.size === 1, s"Size wrong for:\n ${df.queryExecution}")
    --- End diff --

I do not think it has changed the catalog stats. After the optimizer runs, the size in the catalog stats is larger than computeStats(conf).sizeInBytes because partitions have been pruned.
[GitHub] spark pull request #18205: [SPARK-20986] [SQL] Reset table's statistics afte...
Github user lianhuiwang commented on a diff in the pull request:

    https://github.com/apache/spark/pull/18205#discussion_r121275610

    --- Diff: sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/PruneFileSourcePartitionsSuite.scala ---
    @@ -66,4 +67,35 @@ class PruneFileSourcePartitionsSuite extends QueryTest with SQLTestUtils with Te
           }
         }
       }
    +
    +  test("SPARK-20986 Reset table's statistics after PruneFileSourcePartitions rule") {
    +    withTempView("tempTbl") {
    +      withTable("partTbl") {
    +        spark.range(1000).selectExpr("id").createOrReplaceTempView("tempTbl")
    +        sql("CREATE TABLE partTbl (id INT) PARTITIONED BY (part INT) STORED AS parquet")
    +        for (part <- Seq(1, 2, 3)) {
    +          sql(
    +            s"""
    +              |INSERT OVERWRITE TABLE partTbl PARTITION (part='$part')
    +              |select id from tempTbl
    +            """.stripMargin)
    +        }
    +
    +        withSQLConf(SQLConf.ENABLE_FALL_BACK_TO_HDFS_FOR_STATS.key -> "true") {
    +          val df = sql("SELECT * FROM partTbl where part = 1")
    +          val query = df.queryExecution.analyzed.analyze
    +          val sizes1 = query.collect {
    +            case relation: LogicalRelation => relation.computeStats(conf).sizeInBytes
    --- End diff --

Yes, thanks.
[GitHub] spark pull request #18205: [SPARK-20986] [SQL] Reset table's statistics afte...
Github user lianhuiwang commented on a diff in the pull request:

    https://github.com/apache/spark/pull/18205#discussion_r121275118

    --- Diff: sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/PruneFileSourcePartitionsSuite.scala ---
    @@ -66,4 +67,35 @@ class PruneFileSourcePartitionsSuite extends QueryTest with SQLTestUtils with Te
           }
         }
       }
    +
    +  test("SPARK-20986 Reset table's statistics after PruneFileSourcePartitions rule") {
    +    withTempView("tempTbl") {
    +      withTable("partTbl") {
    +        spark.range(1000).selectExpr("id").createOrReplaceTempView("tempTbl")
    +        sql("CREATE TABLE partTbl (id INT) PARTITIONED BY (part INT) STORED AS parquet")
    +        for (part <- Seq(1, 2, 3)) {
    +          sql(
    +            s"""
    +              |INSERT OVERWRITE TABLE partTbl PARTITION (part='$part')
    +              |select id from tempTbl
    +            """.stripMargin)
    +        }
    +
    +        withSQLConf(SQLConf.ENABLE_FALL_BACK_TO_HDFS_FOR_STATS.key -> "true") {
    +          val df = sql("SELECT * FROM partTbl where part = 1")
    +          val query = df.queryExecution.analyzed.analyze
    +          val sizes1 = query.collect {
    +            case relation: LogicalRelation => relation.computeStats(conf).sizeInBytes
    --- End diff --

I do not think so. CatalogTable's stats are not defined when the table has no statistics. According to https://github.com/apache/spark/blob/master/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/LogicalRelation.scala#L50, the statistics of a LogicalRelation then use the sizeInBytes of the BaseRelation.
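The fallback described in this comment can be sketched with `Option` chaining: use the catalog statistics when they are defined, and otherwise fall back to the size reported by the underlying relation. The types below are simplified stand-ins that only borrow the names of Spark's classes, and `estimatedSizeInBytes` is a hypothetical method name; the linked `LogicalRelation.scala` is the authoritative version.

```scala
// Simplified stand-ins for the Spark classes named in the comment.
final case class CatalogStatistics(sizeInBytes: BigInt)
final case class CatalogTable(stats: Option[CatalogStatistics])
final case class BaseRelation(sizeInBytes: Long)

final case class LogicalRelation(relation: BaseRelation, catalogTable: Option[CatalogTable]) {
  // Catalog stats win when present; otherwise use the relation's own size estimate.
  def estimatedSizeInBytes: BigInt =
    catalogTable
      .flatMap(_.stats)
      .map(_.sizeInBytes)
      .getOrElse(BigInt(relation.sizeInBytes))
}
```

A table that was never analyzed has `stats = None`, so the estimate comes from the relation; once statistics exist in the catalog, they take precedence even if the relation itself has since been pruned to a smaller set of files.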
[GitHub] spark pull request #18205: [SPARK-20986] [SQL] Reset table's statistics afte...
Github user lianhuiwang commented on a diff in the pull request: https://github.com/apache/spark/pull/18205#discussion_r120244237 --- Diff: sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/PruneFileSourcePartitionsSuite.scala --- @@ -66,4 +67,33 @@ class PruneFileSourcePartitionsSuite extends QueryTest with SQLTestUtils with Te } } } + + test("SPARK-20986 Reset table's statistics after PruneFileSourcePartitions rule") { +withTempView("tempTbl", "partTbl") { + spark.range(1000).selectExpr("id").createOrReplaceTempView("tempTbl") + sql("CREATE TABLE partTbl (id INT) PARTITIONED BY (part INT) STORED AS parquet") --- End diff -- Yes, thanks.
[GitHub] spark pull request #18205: [SPARK-20986] [SQL] Reset table's statistics afte...
GitHub user lianhuiwang opened a pull request: https://github.com/apache/spark/pull/18205 [SPARK-20986] [SQL] Reset table's statistics after PruneFileSourcePartitions rule. ## What changes were proposed in this pull request? The PruneFileSourcePartitions rule can filter out unnecessary partitions, so the table's statistics need to be reset after the rule is applied to reflect only the remaining partitions. ## How was this patch tested? Added a unit test. You can merge this pull request into a Git repository by running: $ git pull https://github.com/lianhuiwang/spark SPARK-20986 Alternatively you can review and apply these changes as the patch at: https://github.com/apache/spark/pull/18205.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #18205 commit 20a6043643c073a128c8fa70bf2a8602b1c1537e Author: lianhuiwang <lianhuiwan...@gmail.com> Date: 2017-06-05T15:09:38Z init commit
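The idea behind this change can be sketched in plain Scala. This is only an illustration under stated assumptions: `Partition`, `Relation`, `fromPartitions` and `prune` are hypothetical stand-ins invented here, not Spark's real classes or the code in the PR.

```scala
// Hypothetical, simplified model of a partitioned relation; NOT Spark's API.
object PrunedStatsSketch {
  final case class Partition(spec: Map[String, String], sizeInBytes: BigInt)

  // statsSizeInBytes plays the role of the table-level statistics.
  final case class Relation(partitions: Seq[Partition], statsSizeInBytes: BigInt)

  // Build a relation whose statistics cover all of its partitions.
  def fromPartitions(parts: Seq[Partition]): Relation =
    Relation(parts, parts.map(_.sizeInBytes).sum)

  // Prune partitions and reset the statistics so that they describe
  // only the partitions that survive the filter.
  def prune(rel: Relation)(keep: Partition => Boolean): Relation = {
    val remaining = rel.partitions.filter(keep)
    rel.copy(partitions = remaining, statsSizeInBytes = remaining.map(_.sizeInBytes).sum)
  }
}
```

Without the reset in `prune`, the pruned relation would still report the size of the whole table, which is the situation the PR description refers to.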
[GitHub] spark pull request #18193: [SPARK-15616] [SQL] CatalogRelation should fallba...
Github user lianhuiwang commented on a diff in the pull request: https://github.com/apache/spark/pull/18193#discussion_r120095474 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PruneFileSourcePartitions.scala --- @@ -59,7 +60,10 @@ private[sql] object PruneFileSourcePartitions extends Rule[LogicalPlan] { val prunedFileIndex = catalogFileIndex.filterPartitions(partitionKeyFilters.toSeq) val prunedFsRelation = fsRelation.copy(location = prunedFileIndex)(sparkSession) -val prunedLogicalRelation = logicalRelation.copy(relation = prunedFsRelation) +val withStats = logicalRelation.catalogTable.map(_.copy( + stats = Some(CatalogStatistics(sizeInBytes = BigInt(prunedFileIndex.sizeInBytes) --- End diff -- Yes, I have created SPARK-20986. Thanks.
[GitHub] spark pull request #18193: [SPARK-15616] [SQL] CatalogRelation should fallba...
Github user lianhuiwang commented on a diff in the pull request: https://github.com/apache/spark/pull/18193#discussion_r120092962 --- Diff: sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveSessionStateBuilder.scala --- @@ -88,6 +89,20 @@ class HiveSessionStateBuilder(session: SparkSession, parentState: Option[Session } /** + * Logical query plan optimizer that takes into account Hive. --- End diff -- Yes, thanks.
[GitHub] spark issue #13373: [SPARK-15616] [SQL] Metastore relation should fallback t...
Github user lianhuiwang commented on the issue: https://github.com/apache/spark/pull/13373 @HyukjinKwon @cloud-fan I will close this PR and create a new PR, #18193, for it. Thanks.
[GitHub] spark pull request #13373: [SPARK-15616] [SQL] Metastore relation should fal...
Github user lianhuiwang closed the pull request at: https://github.com/apache/spark/pull/13373
[GitHub] spark pull request #18193: [SPARK-15616] [SQL] Metastore relation should fal...
GitHub user lianhuiwang opened a pull request: https://github.com/apache/spark/pull/18193 [SPARK-15616] [SQL] Metastore relation should fallback to HDFS size of partitions that are involved in Query for JoinSelection. ## What changes were proposed in this pull request? Currently, if only some partitions of a partitioned table are used in a join operation, we still rely on the table size returned by the Metastore to decide whether the operation can be converted to a broadcast join. When a Filter can prune some partitions, Hive prunes them before deciding whether to use a broadcast join, based on the HDFS size of the partitions that are actually involved in the query. Spark SQL needs the same behavior, which can improve join performance for partitioned tables. ## How was this patch tested? Added unit tests. You can merge this pull request into a Git repository by running: $ git pull https://github.com/lianhuiwang/spark partition_broadcast_2 Alternatively you can review and apply these changes as the patch at: https://github.com/apache/spark/pull/18193.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #18193 commit 5591a1c7623e4c668b257c940b5b2663f67acf51 Author: lianhuiwang <lianhuiwan...@gmail.com> Date: 2017-06-04T07:02:23Z init commit
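To make the broadcast decision concrete, here is a minimal plain-Scala sketch. It assumes, as a guess based on the truncated diff quoted earlier in this thread, that each partition's size is taken from its `totalSize` parameter and otherwise from `rawDataSize`. `PartitionStats`, `partitionSize`, `prunedSize` and `canBroadcast` are hypothetical names, and the threshold is just a parameter here (in Spark the corresponding setting is `spark.sql.autoBroadcastJoinThreshold`).

```scala
// Hypothetical sketch; these types and functions are not part of Spark.
object BroadcastDecisionSketch {
  // Stand-in for the size-related parameters a metastore keeps per partition.
  final case class PartitionStats(totalSize: Option[Long], rawDataSize: Option[Long])

  // Prefer a positive totalSize, then a positive rawDataSize (assumed order).
  def partitionSize(p: PartitionStats): Option[Long] =
    p.totalSize.filter(_ > 0L).orElse(p.rawDataSize.filter(_ > 0L))

  // Total size of the pruned partitions, or None if any partition has no usable size.
  def prunedSize(parts: Seq[PartitionStats]): Option[BigInt] = {
    val sizes = parts.map(partitionSize)
    if (sizes.forall(_.isDefined)) Some(sizes.flatten.map(BigInt(_)).sum) else None
  }

  // Broadcast only when the size of the partitions actually read is known and small enough.
  def canBroadcast(parts: Seq[PartitionStats], thresholdBytes: Long): Boolean =
    prunedSize(parts).exists(_ <= BigInt(thresholdBytes))
}
```

The point of the sketch is that the comparison uses the size of the partitions that survive pruning, not the size of the whole table.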
[GitHub] spark issue #13373: [SPARK-15616] [SQL] Metastore relation should fallback t...
Github user lianhuiwang commented on the issue: https://github.com/apache/spark/pull/13373 @cloud-fan I do not think that the PruneFileSourcePartitions rule applies to Hive's CatalogRelation. The example in this PR does not produce the expected result on the master branch, so I will update this PR with the latest code.
[GitHub] spark issue #13706: [SPARK-15988] [SQL] Implement DDL commands: Create/Drop ...
Github user lianhuiwang commented on the issue: https://github.com/apache/spark/pull/13706 @gatorsmile OK. Thanks.
[GitHub] spark pull request #13706: [SPARK-15988] [SQL] Implement DDL commands: Creat...
Github user lianhuiwang commented on a diff in the pull request: https://github.com/apache/spark/pull/13706#discussion_r119124256 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/command/macros.scala --- @@ -0,0 +1,99 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.execution.command + +import org.apache.spark.sql.{AnalysisException, Row, SparkSession} +import org.apache.spark.sql.catalyst.analysis._ +import org.apache.spark.sql.catalyst.expressions._ +import org.apache.spark.sql.types.StructField + +/** + * This class provides arguments and body expression of the macro function. + */ +case class MacroFunctionWrapper(columns: Seq[StructField], macroFunction: Expression) + +/** + * The DDL command that creates a macro. 
+ * To create a temporary macro, the syntax of using this command in SQL is: + * {{{ + *CREATE TEMPORARY MACRO macro_name([col_name col_type, ...]) expression; + * }}} + */ +case class CreateMacroCommand( +macroName: String, +funcWrapper: MacroFunctionWrapper) + extends RunnableCommand { + + override def run(sparkSession: SparkSession): Seq[Row] = { +val catalog = sparkSession.sessionState.catalog +val columns = funcWrapper.columns.map { col => + AttributeReference(col.name, col.dataType, col.nullable, col.metadata)() } +val colToIndex: Map[String, Int] = columns.map(_.name).zipWithIndex.toMap +if (colToIndex.size != columns.size) { + throw new AnalysisException(s"Cannot support duplicate colNames " + +s"for CREATE TEMPORARY MACRO $macroName, actual columns: ${columns.mkString(",")}") +} +val macroFunction = funcWrapper.macroFunction.transform { + case u: UnresolvedAttribute => +val index = colToIndex.get(u.name).getOrElse( + throw new AnalysisException(s"Cannot find colName: ${u} " + +s"for CREATE TEMPORARY MACRO $macroName, actual columns: ${columns.mkString(",")}")) +BoundReference(index, columns(index).dataType, columns(index).nullable) + case u: UnresolvedFunction => +sparkSession.sessionState.catalog.lookupFunction(u.name, u.children) + case s: SubqueryExpression => +throw new AnalysisException(s"Cannot support Subquery: ${s} " + + s"for CREATE TEMPORARY MACRO $macroName") + case u: UnresolvedGenerator => +throw new AnalysisException(s"Cannot support Generator: ${u} " + + s"for CREATE TEMPORARY MACRO $macroName") +} + +val macroInfo = columns.mkString(",") + " -> " + funcWrapper.macroFunction.toString +val info = new ExpressionInfo(macroInfo, macroName, true) +val builder = (children: Seq[Expression]) => { + if (children.size != columns.size) { +throw new AnalysisException(s"Actual number of columns: ${children.size} != " + + s"expected number of columns: ${columns.size} for Macro $macroName") + } + macroFunction.transform { +// Skip to validate the input 
type because check it at runtime. --- End diff -- Yes, I have now updated it with your ideas. Thanks.
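The macro mechanism in the quoted diff (bind column names to positions at creation time, then substitute the call arguments at use time) can be sketched without any Spark dependency. Everything below is an assumption made for illustration: the tiny `Expr` type and the `bind`, `expand` and `eval` helpers are hypothetical and much simpler than Catalyst expressions.

```scala
// Hypothetical miniature of macro binding and expansion; not Spark code.
object MacroSketch {
  sealed trait Expr
  final case class Column(name: String) extends Expr        // unresolved column reference
  final case class Bound(index: Int) extends Expr           // reference to the i-th macro argument
  final case class Literal(value: Int) extends Expr
  final case class Add(left: Expr, right: Expr) extends Expr

  // Creation time: reject duplicate column names, then replace names by positions.
  def bind(body: Expr, columns: Seq[String]): Expr = {
    require(columns.distinct.size == columns.size,
      s"duplicate column names: ${columns.mkString(",")}")
    val colToIndex = columns.zipWithIndex.toMap
    def go(e: Expr): Expr = e match {
      case Column(n) =>
        Bound(colToIndex.getOrElse(n, throw new IllegalArgumentException(s"cannot find column: $n")))
      case Add(l, r) => Add(go(l), go(r))
      case other => other
    }
    go(body)
  }

  // Use time: check the argument count, then substitute arguments for bound references.
  def expand(bound: Expr, arity: Int, args: Seq[Expr]): Expr = {
    require(args.size == arity, s"actual ${args.size} != expected $arity arguments")
    def go(e: Expr): Expr = e match {
      case Bound(i) => args(i)
      case Add(l, r) => Add(go(l), go(r))
      case other => other
    }
    go(bound)
  }

  // Evaluate a fully expanded expression.
  def eval(e: Expr): Int = e match {
    case Literal(v) => v
    case Add(l, r) => eval(l) + eval(r)
    case other => throw new IllegalArgumentException(s"cannot evaluate: $other")
  }
}
```

Binding to positions once means the macro body no longer depends on column names when it is expanded.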
[GitHub] spark pull request #13706: [SPARK-15988] [SQL] Implement DDL commands: Creat...
Github user lianhuiwang commented on a diff in the pull request: https://github.com/apache/spark/pull/13706#discussion_r119124106 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/NoSuchItemException.scala --- @@ -52,3 +52,6 @@ class NoSuchPartitionsException(db: String, table: String, specs: Seq[TableParti class NoSuchTempFunctionException(func: String) extends AnalysisException(s"Temporary function '$func' not found") + +class NoSuchTempMacroException(func: String) --- End diff -- Yes, Thanks.
[GitHub] spark pull request #13706: [SPARK-15988] [SQL] Implement DDL commands: Creat...
Github user lianhuiwang commented on a diff in the pull request: https://github.com/apache/spark/pull/13706#discussion_r119124043 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala --- @@ -1090,6 +1090,24 @@ class SessionCatalog( } } + /** Create a temporary macro. */ + def createTempMacro( + name: String, + info: ExpressionInfo, + functionBuilder: FunctionBuilder): Unit = { +if (functionRegistry.functionExists(name)) { + throw new AnalysisException(s"Function $name already exists") +} +functionRegistry.registerFunction(name, info, functionBuilder) + } + + /** Drop a temporary macro. */ + def dropTempMacro(name: String, ignoreIfNotExists: Boolean): Unit = { +if (!functionRegistry.dropMacro(name) && !ignoreIfNotExists) { + throw new NoSuchTempMacroException(name) --- End diff -- Yes, I have updated it to handle this case.
[GitHub] spark pull request #13706: [SPARK-15988] [SQL] Implement DDL commands: Creat...
Github user lianhuiwang commented on a diff in the pull request: https://github.com/apache/spark/pull/13706#discussion_r119123747 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/FunctionRegistry.scala --- @@ -107,6 +110,14 @@ class SimpleFunctionRegistry extends FunctionRegistry { functionBuilders.remove(name).isDefined } + override def dropMacro(name: String): Boolean = synchronized { --- End diff -- In Hive, the command 'drop Macro' can drop a temporary function, and the command 'drop temporary function' can also drop a temporary macro.
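One way to get the Hive behavior described above is to keep temporary functions and temporary macros in the same name-to-builder map, so that either drop command removes the entry. The sketch below only illustrates that design; `Registry`, `Builder` and the method names are hypothetical, and it is not claimed to match the final code in this PR.

```scala
import scala.collection.mutable

// Hypothetical registry in which macros and temporary functions share one namespace.
object SharedRegistrySketch {
  type Builder = Seq[Int] => Int

  final class Registry {
    private val builders = mutable.HashMap.empty[String, Builder]

    def register(name: String, builder: Builder): Unit = synchronized {
      builders(name) = builder
    }

    def exists(name: String): Boolean = synchronized {
      builders.contains(name)
    }

    // Both drop operations remove from the same map and report whether an entry existed.
    def dropFunction(name: String): Boolean = synchronized {
      builders.remove(name).isDefined
    }

    def dropMacro(name: String): Boolean = synchronized {
      builders.remove(name).isDefined
    }
  }
}
```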
[GitHub] spark pull request #13706: [SPARK-15988] [SQL] Implement DDL commands: Creat...
Github user lianhuiwang commented on a diff in the pull request: https://github.com/apache/spark/pull/13706#discussion_r119122834 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/command/macros.scala --- @@ -0,0 +1,99 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.execution.command + +import org.apache.spark.sql.{AnalysisException, Row, SparkSession} +import org.apache.spark.sql.catalyst.analysis._ +import org.apache.spark.sql.catalyst.expressions._ +import org.apache.spark.sql.types.StructField + +/** + * This class provides arguments and body expression of the macro function. + */ +case class MacroFunctionWrapper(columns: Seq[StructField], macroFunction: Expression) + +/** + * The DDL command that creates a macro. 
+ * To create a temporary macro, the syntax of using this command in SQL is: + * {{{ + *CREATE TEMPORARY MACRO macro_name([col_name col_type, ...]) expression; + * }}} + */ +case class CreateMacroCommand( +macroName: String, +funcWrapper: MacroFunctionWrapper) + extends RunnableCommand { + + override def run(sparkSession: SparkSession): Seq[Row] = { +val catalog = sparkSession.sessionState.catalog +val columns = funcWrapper.columns.map { col => --- End diff -- Yes, I will do it, thanks.
[GitHub] spark pull request #13706: [SPARK-15988] [SQL] Implement DDL commands: Creat...
Github user lianhuiwang commented on a diff in the pull request: https://github.com/apache/spark/pull/13706#discussion_r119122863 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/command/macros.scala --- @@ -0,0 +1,99 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.execution.command + +import org.apache.spark.sql.{AnalysisException, Row, SparkSession} +import org.apache.spark.sql.catalyst.analysis._ +import org.apache.spark.sql.catalyst.expressions._ +import org.apache.spark.sql.types.StructField + +/** + * This class provides arguments and body expression of the macro function. + */ +case class MacroFunctionWrapper(columns: Seq[StructField], macroFunction: Expression) + +/** + * The DDL command that creates a macro. 
+ * To create a temporary macro, the syntax of using this command in SQL is: + * {{{ + *CREATE TEMPORARY MACRO macro_name([col_name col_type, ...]) expression; + * }}} + */ +case class CreateMacroCommand( +macroName: String, +funcWrapper: MacroFunctionWrapper) + extends RunnableCommand { + + override def run(sparkSession: SparkSession): Seq[Row] = { +val catalog = sparkSession.sessionState.catalog +val columns = funcWrapper.columns.map { col => + AttributeReference(col.name, col.dataType, col.nullable, col.metadata)() } +val colToIndex: Map[String, Int] = columns.map(_.name).zipWithIndex.toMap +if (colToIndex.size != columns.size) { + throw new AnalysisException(s"Cannot support duplicate colNames " + +s"for CREATE TEMPORARY MACRO $macroName, actual columns: ${columns.mkString(",")}") +} +val macroFunction = funcWrapper.macroFunction.transform { + case u: UnresolvedAttribute => +val index = colToIndex.get(u.name).getOrElse( --- End diff -- Yes, I will do it, thanks.
[GitHub] spark pull request #13706: [SPARK-15988] [SQL] Implement DDL commands: Creat...
Github user lianhuiwang commented on a diff in the pull request: https://github.com/apache/spark/pull/13706#discussion_r119122622 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/command/macros.scala --- @@ -0,0 +1,99 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.execution.command + +import org.apache.spark.sql.{AnalysisException, Row, SparkSession} +import org.apache.spark.sql.catalyst.analysis._ +import org.apache.spark.sql.catalyst.expressions._ +import org.apache.spark.sql.types.StructField + +/** + * This class provides arguments and body expression of the macro function. + */ +case class MacroFunctionWrapper(columns: Seq[StructField], macroFunction: Expression) --- End diff -- Because the Analyzer will check macroFunction and treat it as invalid if I do not use MacroFunctionWrapper.
[GitHub] spark issue #13706: [SPARK-15988] [SQL] Implement DDL commands: Create/Drop ...
Github user lianhuiwang commented on the issue: https://github.com/apache/spark/pull/13706 @gatorsmile Sorry for the late reply. I have now merged with master. Thanks.
[GitHub] spark issue #13706: [SPARK-15988] [SQL] Implement DDL commands: Create/Drop ...
Github user lianhuiwang commented on the issue: https://github.com/apache/spark/pull/13706 @hvanhovell I have updated this PR. Can you take a look? Thanks.
[GitHub] spark issue #13706: [SPARK-15988] [SQL] Implement DDL commands: Create/Drop ...
Github user lianhuiwang commented on the issue: https://github.com/apache/spark/pull/13706 @hvanhovell Yes, I will update later. Thanks.
[GitHub] spark issue #13979: [SPARK-SPARK-16302] [SQL] Set the right number of partit...
Github user lianhuiwang commented on the issue: https://github.com/apache/spark/pull/13979 @JoshRosen Thanks!
[GitHub] spark pull request #14111: [SPARK-16456][SQL] Reuse the uncorrelated scalar ...
Github user lianhuiwang closed the pull request at: https://github.com/apache/spark/pull/14111
[GitHub] spark issue #14111: [SPARK-16456][SQL] Reuse the uncorrelated scalar subquer...
Github user lianhuiwang commented on the issue: https://github.com/apache/spark/pull/14111 OK. Thanks.
[GitHub] spark issue #14363: [SPARK-16731][SQL] use StructType in CatalogTable and re...
Github user lianhuiwang commented on the issue: https://github.com/apache/spark/pull/14363 @cloud-fan There is a case that I ran into. The varchar(length)/char(length) types are not the same as a string type, but Spark SQL currently treats them as strings. So the following example produces different results:
TestHive.sessionState.metadataHive.runSqlHive("CREATE TABLE test (id varchar(50))")
TestHive.sessionState.metadataHive.runSqlHive("INSERT INTO TABLE test VALUES ('abcdef')")
TestHive.sessionState.metadataHive.runSqlHive("CREATE TABLE varchar_parquet1 (id varchar(2)) stored as parquet")
TestHive.sessionState.metadataHive.runSqlHive("insert overwrite table varchar_parquet1 select * from test")
With Hive, the result in varchar_parquet1 is 'ab'.
spark.sql("insert overwrite table varchar_parquet1 select * from test").show()
With Spark SQL, the result in varchar_parquet1 is 'abcdef'.
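The difference reported above comes down to whether the declared length is enforced on write. A toy sketch in plain Scala, assuming simple truncation to the declared length; `writeAsVarchar` and `writeAsString` are hypothetical helpers, not Hive or Spark functions, and `take` counts UTF-16 code units, which is only equivalent to characters for text like this example.

```scala
// Hypothetical illustration of length-enforcing vs. plain string writes.
object VarcharSketch {
  // Enforce the declared varchar length by truncating longer values.
  def writeAsVarchar(value: String, maxLength: Int): String = value.take(maxLength)

  // Treat the column as an unbounded string: the value is stored unchanged.
  def writeAsString(value: String): String = value
}
```

With the value from the example, `writeAsVarchar("abcdef", 2)` keeps only the first two characters, while `writeAsString("abcdef")` keeps all six.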
[GitHub] spark issue #14111: [SPARK-16456][SQL] Reuse the uncorrelated scalar subquer...
Github user lianhuiwang commented on the issue: https://github.com/apache/spark/pull/14111 @cloud-fan I don't think it is a bug in constraint propagation, because a filter with an uncorrelated scalar subquery needs to be pushed down since it can filter out many records. In addition, consider a query like:
```
with avg_table as (select avg(key) as avg_key from t3)
select 1 as col from t3 where key > (select avg_key from avg_table)
union all
select 1 as col
from t1 join (select key, value from t2 where key > (select avg_key from avg_table)) t
on (t1.key = t.key)
```
When the same subquery appears both under a BroadcastExchangeExec and elsewhere in the query, the first occurrence prepares the subquery first. The occurrence under BroadcastExchangeExec has to wait for the subquery, because BroadcastExchangeExec.doPrepare executes the child plan. So BroadcastExchangeExec's child plan needs to wait for the subquery result that the first occurrence has already submitted.
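The "submit once, let later consumers wait for the same result" pattern described here can be sketched with standard Scala futures. This is only an analogy under stated assumptions: `SharedSubquery` and `demo` are hypothetical, and the sketch says nothing about how Spark's physical operators are actually implemented.

```scala
import java.util.concurrent.atomic.AtomicInteger
import scala.concurrent.{Await, ExecutionContext, Future}
import scala.concurrent.duration._

// Hypothetical model of one subquery result shared by several consumers.
object SharedSubquerySketch {
  final class SharedSubquery(compute: () => Double)(implicit ec: ExecutionContext) {
    // lazy val: the computation is submitted at most once, by whichever consumer asks first.
    private lazy val result: Future[Double] = Future(compute())

    // Submit the computation without blocking.
    def prepare(): Future[Double] = result

    // Block until the shared result is available.
    def await(): Double = Await.result(result, 10.seconds)
  }

  // Returns the values seen by two consumers and how many times the subquery ran.
  def demo(): (Double, Double, Int) = {
    import ExecutionContext.Implicits.global
    val runs = new AtomicInteger(0)
    val sub = new SharedSubquery(() => {
      runs.incrementAndGet()
      Seq(1.0, 2.0, 3.0).sum / 3
    })
    sub.prepare()            // first occurrence submits the subquery
    val first = sub.await()  // first occurrence reads the result
    val second = sub.await() // second occurrence waits on the same result
    (first, second, runs.get())
  }
}
```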
[GitHub] spark issue #13373: [SPARK-15616] [SQL] Metastore relation should fallback t...
Github user lianhuiwang commented on the issue: https://github.com/apache/spark/pull/13373 cc @cloud-fan @rxin @hvanhovell
[GitHub] spark issue #14285: [SPARK-16649][SQL] Push partition predicates down into m...
Github user lianhuiwang commented on the issue: https://github.com/apache/spark/pull/14285 cc @cloud-fan @rxin @hvanhovell
[GitHub] spark pull request #14285: [SPARK-16649][SQL] Push partition predicates down...
GitHub user lianhuiwang opened a pull request: https://github.com/apache/spark/pull/14285 [SPARK-16649][SQL] Push partition predicates down into metastore for OptimizeMetadataOnlyQuery ## What changes were proposed in this pull request? SPARK-6910 added support for pushing partition predicates down into the metastore for table scans. OptimizeMetadataOnlyQuery should push partition predicates down into the metastore as well. ## How was this patch tested? Added unit tests. You can merge this pull request into a Git repository by running: $ git pull https://github.com/lianhuiwang/spark metadata-only-filter Alternatively you can review and apply these changes as the patch at: https://github.com/apache/spark/pull/14285.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #14285 commit 4b851056c6e04165683e8ea6370b3e404a688bf2 Author: Lianhui Wang <lianhuiwan...@gmail.com> Date: 2016-07-20T13:02:11Z init commit commit 0773b8219010d4c745b5adde79dedad41ba9a6aa Author: Lianhui Wang <lianhuiwan...@gmail.com> Date: 2016-07-20T13:13:08Z update
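To illustrate why pushing the partition predicate into the metastore matters for a metadata-only query, here is a small self-contained sketch. It is not the Spark or Hive API: `Partition`, `Metastore`, `listPartitionsByFilter` as defined here and `distinctValues` are hypothetical stand-ins, and the `returned` counter exists only to show how many partition entries would be shipped back to the client.

```scala
object PartitionPruningSketch {
  // Hypothetical stand-in for a metastore partition entry: column name -> value.
  final case class Partition(spec: Map[String, String])

  // Hypothetical stand-in for a metastore that can apply a filter on its side.
  final class Metastore(partitions: Seq[Partition]) {
    var returned: Int = 0 // number of partition entries sent back to the caller

    // Without pushdown: every partition is returned and filtered by the caller.
    def listPartitions(): Seq[Partition] = {
      returned += partitions.size
      partitions
    }

    // With pushdown: only the matching partitions are returned.
    def listPartitionsByFilter(pred: Partition => Boolean): Seq[Partition] = {
      val kept = partitions.filter(pred)
      returned += kept.size
      kept
    }
  }

  // Distinct values of one partition column, computed from metadata only.
  def distinctValues(parts: Seq[Partition], col: String): Set[String] =
    parts.flatMap(_.spec.get(col)).toSet
}
```

With ten partitions of which five match, the filtered call returns five entries instead of ten, and the distinct partition values can still be computed without reading any data files.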
[GitHub] spark issue #14154: [SPARK-16497][SQL] Don't throw an exception if drop non-...
Github user lianhuiwang commented on the issue: https://github.com/apache/spark/pull/14154 OK, I will close it. Thanks.
[GitHub] spark pull request #14154: [SPARK-16497][SQL] Don't throw an exception if dr...
Github user lianhuiwang closed the pull request at: https://github.com/apache/spark/pull/14154
[GitHub] spark pull request #14036: [SPARK-16323] [SQL] Add IntegerDivide to avoid un...
Github user lianhuiwang commented on a diff in the pull request: https://github.com/apache/spark/pull/14036#discussion_r70746279 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/FunctionRegistry.scala --- @@ -234,6 +234,7 @@ object FunctionRegistry { expression[Subtract]("-"), expression[Multiply]("*"), expression[Divide]("/"), +expression[IntegerDivide]("div"), --- End diff -- 'select 4 div 2' is the correct syntax.
[GitHub] spark pull request #14036: [SPARK-16323] [SQL] Add IntegerDivide to avoid un...
Github user lianhuiwang commented on a diff in the pull request: https://github.com/apache/spark/pull/14036#discussion_r70660136 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/arithmetic.scala --- @@ -207,20 +207,12 @@ case class Multiply(left: Expression, right: Expression) protected override def nullSafeEval(input1: Any, input2: Any): Any = numeric.times(input1, input2) } -@ExpressionDescription( - usage = "a _FUNC_ b - Divides a by b.", - extended = "> SELECT 3 _FUNC_ 2;\n 1.5") -case class Divide(left: Expression, right: Expression) -extends BinaryArithmetic with NullIntolerant { - - override def inputType: AbstractDataType = TypeCollection(DoubleType, DecimalType) - - override def symbol: String = "/" - override def decimalMethod: String = "$div" +abstract class DivideBase extends BinaryArithmetic with NullIntolerant { override def nullable: Boolean = true private lazy val div: (Any, Any) => Any = dataType match { case ft: FractionalType => ft.fractional.asInstanceOf[Fractional[Any]].div +case i: IntegralType => i.integral.asInstanceOf[Integral[Any]].quot --- End diff -- @cloud-fan How about moving this line into IntegralDivide? That would be more readable.
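The two branches in the diff correspond to two different operations in the Scala standard library: `Fractional.div` keeps the fractional part, while `Integral.quot` truncates toward zero. The sketch below shows only that distinction, outside of Catalyst. `DivideSketch`, `integralDivide` and `fractionalDivide` are hypothetical names, not the classes from the PR.

```scala
object DivideSketch {
  // Integral division via scala.math.Integral.quot: the result is truncated toward zero.
  def integralDivide[T](a: T, b: T)(implicit ev: Integral[T]): T = ev.quot(a, b)

  // Fractional division via scala.math.Fractional.div: the fractional part is kept.
  def fractionalDivide[T](a: T, b: T)(implicit ev: Fractional[T]): T = ev.div(a, b)

  def main(args: Array[String]): Unit = {
    println(integralDivide(3L, 2L))     // integral operands, truncated result
    println(fractionalDivide(3.0, 2.0)) // fractional operands, exact quotient
  }
}
```

Keeping the two operations in separate functions mirrors the suggestion in the comment: the integral case reads more clearly when it lives with the integral division expression rather than in a shared base class.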
[GitHub] spark pull request #14036: [SPARK-16323] [SQL] Add IntegerDivide to avoid un...
Github user lianhuiwang commented on a diff in the pull request: https://github.com/apache/spark/pull/14036#discussion_r70660445 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/arithmetic.scala --- @@ -207,20 +207,12 @@ case class Multiply(left: Expression, right: Expression) protected override def nullSafeEval(input1: Any, input2: Any): Any = numeric.times(input1, input2) } -@ExpressionDescription( - usage = "a _FUNC_ b - Divides a by b.", - extended = "> SELECT 3 _FUNC_ 2;\n 1.5") -case class Divide(left: Expression, right: Expression) -extends BinaryArithmetic with NullIntolerant { - - override def inputType: AbstractDataType = TypeCollection(DoubleType, DecimalType) - - override def symbol: String = "/" --- End diff -- We need to keep symbol because it is used by BinaryOperator's sqlOperator() and sql().
[GitHub] spark pull request #14036: [SPARK-16323] [SQL] Add IntegerDivide to avoid un...
Github user lianhuiwang commented on a diff in the pull request: https://github.com/apache/spark/pull/14036#discussion_r70659855 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala --- @@ -957,7 +957,7 @@ class AstBuilder extends SqlBaseBaseVisitor[AnyRef] with Logging { case SqlBaseParser.PERCENT => Remainder(left, right) case SqlBaseParser.DIV => -Cast(Divide(left, right), LongType) +IntegralDivide(left, right) --- End diff -- That's OK, because I found that Spark SQL already has SqlBaseParser.SLASH for '/'.
[GitHub] spark pull request #14036: [SPARK-16323] [SQL] Add IntegerDivide to avoid un...
Github user lianhuiwang commented on a diff in the pull request: https://github.com/apache/spark/pull/14036#discussion_r70656424 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala --- @@ -957,7 +957,7 @@ class AstBuilder extends SqlBaseBaseVisitor[AnyRef] with Logging { case SqlBaseParser.PERCENT => Remainder(left, right) case SqlBaseParser.DIV => -Cast(Divide(left, right), LongType) +IntegralDivide(left, right) --- End diff -- I think we need to add SqlBaseParser.DIVIDE for '/'.
[GitHub] spark pull request #14036: [SPARK-16323] [SQL] Add IntegerDivide to avoid un...
Github user lianhuiwang commented on a diff in the pull request: https://github.com/apache/spark/pull/14036#discussion_r70628892 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/FunctionRegistry.scala --- @@ -234,6 +234,7 @@ object FunctionRegistry { expression[Subtract]("-"), expression[Multiply]("*"), expression[Divide]("/"), +expression[IntegerDivide]("div"), --- End diff -- @cloud-fan Yes, Hive supports both div and /.
[GitHub] spark issue #14111: [SPARK-16456][SQL] Reuse the uncorrelated scalar subquer...
Github user lianhuiwang commented on the issue: https://github.com/apache/spark/pull/14111
@cloud-fan At first I implemented it the way you suggested. But a query that has a broadcast join then fails with the error 'ScalarSubquery has not finished'. Example (from SPARK-14791):
val df = (1 to 3).map(i => (i, i)).toDF("key", "value")
df.createOrReplaceTempView("t1")
df.createOrReplaceTempView("t2")
df.createOrReplaceTempView("t3")
val q = sql("select * from t1 join (select key, value from t2 " + " where key > (select avg (key) from t3))t on (t1.key = t.key)")
Before:
'''
*BroadcastHashJoin [key#5], [key#26], Inner, BuildRight
:- *Project [_1#2 AS key#5, _2#3 AS value#6]
:  +- *Filter (cast(_1#2 as double) > subquery#13)
:     :  +- Subquery subquery#13
:     :     +- *HashAggregate(keys=[], functions=[avg(cast(key#5 as bigint))], output=[avg(key)#25])
:     :        +- Exchange SinglePartition
:     :           +- *HashAggregate(keys=[], functions=[partial_avg(cast(key#5 as bigint))], output=[sum#30, count#31L])
:     :              +- LocalTableScan [key#5]
:     +- LocalTableScan [_1#2, _2#3]
+- BroadcastExchange HashedRelationBroadcastMode(List(cast(input[0, int, false] as bigint)))
   +- *Project [_1#2 AS key#26, _2#3 AS value#27]
      +- *Filter (cast(_1#2 as double) > subquery#13)
         :  +- Subquery subquery#13
         :     +- *HashAggregate(keys=[], functions=[avg(cast(key#5 as bigint))], output=[avg(key)#25])
         :        +- Exchange SinglePartition
         :           +- *HashAggregate(keys=[], functions=[partial_avg(cast(key#5 as bigint))], output=[sum#30, count#31L])
         :              +- LocalTableScan [key#5]
         +- LocalTableScan [_1#2, _2#3]
'''
The steps are as follows:
1. BroadcastHashJoin.prepare()
2. t1.Filter.prepareSubqueries, which prepares the subquery.
3. BroadcastExchange.prepare()
4. t2.Filter.prepareSubqueries, which prepares the subquery.
5. BroadcastExchange.doPrepare()
6. t2.Filter.execute()
7. t2.Filter.waitForSubqueries(), which waits for the subquery.
8. BroadcastHashJoin.doExecute()
9. BroadcastExchange.executeBroadcast()
10. t1.Filter.execute()
11. t1.Filter.waitForSubqueries(), which waits for the subquery.
Before this PR these are two different subqueries, so neither can wait for the other's result. After this PR they are the same subquery, and the steps are as follows:
1. t1.Filter.prepareSubqueries, which prepares the subquery.
2. t2.Filter.prepareSubqueries, which does not submit the subquery's execute() again.
3. t2.Filter.waitForSubqueries(), which waits for the subquery that step 1 submitted.
4. t1.Filter.waitForSubqueries(), which does not need to await the subquery's result because step 3 has already updated it.
So I added some logic to ScalarSubquery to handle this.
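The "prepare once, wait from several places" behaviour described in the second list of steps can be sketched with a shared `Future`. This is only an illustration under the assumption that a reused subquery behaves like a single asynchronous computation; it is not the ScalarSubquery code from the PR, and `SharedSubquery`, `prepare`, `waitForResult` and `submissions` are hypothetical names.

```scala
import java.util.concurrent.atomic.AtomicInteger
import scala.concurrent.{Await, ExecutionContext, Future}
import scala.concurrent.duration._

object SharedSubquerySketch {
  // Hypothetical stand-in for a reused scalar subquery: computed once, awaited by many.
  final class SharedSubquery(compute: () => Double)(implicit ec: ExecutionContext) {
    // Counts how many times the computation was actually submitted.
    val submissions = new AtomicInteger(0)
    private var result: Option[Future[Double]] = None

    // "prepare": only the first caller submits the work, later callers do nothing.
    def prepare(): Unit = synchronized {
      if (result.isEmpty) {
        submissions.incrementAndGet()
        result = Some(Future(compute()))
      }
    }

    // "wait": every caller blocks on the same future, whoever submitted it.
    def waitForResult(): Double = {
      val f = synchronized {
        result.getOrElse(throw new IllegalStateException("subquery has not been prepared"))
      }
      Await.result(f, 10.seconds)
    }
  }
}
```

In this model the second `prepare()` is a no-op, and either consumer may be the first to wait, which is the property the broadcast side needs when its child plan is executed during `doPrepare`.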
[GitHub] spark pull request #14111: [SPARK-16456][SQL] Reuse the uncorrelated scalar ...
Github user lianhuiwang commented on a diff in the pull request: https://github.com/apache/spark/pull/14111#discussion_r70563183 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/QueryPlan.scala --- @@ -263,7 +263,9 @@ abstract class QueryPlan[PlanType <: QueryPlan[PlanType]] extends TreeNode[PlanT * All the subqueries of current plan. */ def subqueries: Seq[PlanType] = { -expressions.flatMap(_.collect {case e: SubqueryExpression => e.plan.asInstanceOf[PlanType]}) +expressions.flatMap(_.collect { + case e: SubqueryExpression => e --- End diff -- Because we use ReusedScalarSubquery, not Alias, to mark the reused SubqueryExpression, I don't think we need ExpressionCanonicalizer here.
[GitHub] spark issue #14111: [SPARK-16456][SQL] Reuse the uncorrelated scalar subquer...
Github user lianhuiwang commented on the issue: https://github.com/apache/spark/pull/14111 cc @rxin @hvanhovell @cloud-fan
[GitHub] spark issue #13494: [SPARK-15752] [SQL] Optimize metadata only query that ha...
Github user lianhuiwang commented on the issue: https://github.com/apache/spark/pull/13494 Thank you for reviewing and merging. @rxin @hvanhovell @cloud-fan
[GitHub] spark pull request #11293: [SPARK-13080] [SQL] Implement new Catalog API usi...
Github user lianhuiwang commented on a diff in the pull request: https://github.com/apache/spark/pull/11293#discussion_r70475334 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/interface.scala --- @@ -127,33 +166,30 @@ abstract class Catalog { * @param name name of the function * @param className fully qualified class name, e.g. "org.apache.spark.util.MyFunc" */ -case class Function( - name: String, - className: String -) +case class CatalogFunction(name: String, className: String) /** * Storage format, used to describe how a partition or a table is stored. */ -case class StorageFormat( - locationUri: String, - inputFormat: String, - outputFormat: String, - serde: String, - serdeProperties: Map[String, String] -) +case class CatalogStorageFormat( +locationUri: Option[String], +inputFormat: Option[String], +outputFormat: Option[String], +serde: Option[String], +serdeProperties: Map[String, String]) /** * A column in a table. */ -case class Column( - name: String, - dataType: String, - nullable: Boolean, - comment: String -) +case class CatalogColumn( +name: String, +// This may be null when used to create views. TODO: make this type-safe; this is left +// as a string due to issues in converting Hive varchars to and from SparkSQL strings. --- End diff -- I found that the maximum length of a Hive varchar is 65535. MySQL also supports varchar. Maybe that is the issue?
[GitHub] spark issue #14154: [SPARK-16497][SQL] Don't throw an exception if drop non-...
Github user lianhuiwang commented on the issue: https://github.com/apache/spark/pull/14154 @hvanhovell I could not find out from https://issues.apache.org/jira/browse/HIVE-1856 why Hive supports this. But many Spark users used Hive before, so some of their existing queries now fail. How about setting spark.sql.drop.ignorenonexistent=false? If you think this PR is unnecessary, I will close it. Thanks. BTW: I found that Presto/Impala/MySQL throw an exception in this case.
[GitHub] spark pull request #14154: [SPARK-16497][SQL] Don't throw an exception if dr...
GitHub user lianhuiwang opened a pull request: https://github.com/apache/spark/pull/14154 [SPARK-16497][SQL] Don't throw an exception if drop non-existent TABLE/VIEW/Function/Partitions ## What changes were proposed in this pull request? According to https://cwiki.apache.org/confluence/display/Hive/Configuration+Properties#ConfigurationProperties-hive.exec.drop.ignorenonexistent, Hive uses 'hive.exec.drop.ignorenonexistent' (default=true) to suppress the error when DROP TABLE/VIEW/PARTITION/INDEX/TEMPORARY FUNCTION specifies a non-existent table/view. Spark SQL should support this as well. ## How was this patch tested? Added unit tests. You can merge this pull request into a Git repository by running: $ git pull https://github.com/lianhuiwang/spark drop-ignorenonexist Alternatively you can review and apply these changes as the patch at: https://github.com/apache/spark/pull/14154.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #14154 commit 25bd4b8fad66784aec7235ddfea5c4dee1450d11 Author: Lianhui Wang <lianhuiwan...@gmail.com> Date: 2016-07-11T14:43:37Z init commit commit 6eb9e8e916cea391aff0d07a85781e77121ce9b9 Author: Lianhui Wang <lianhuiwan...@gmail.com> Date: 2016-07-12T13:39:29Z update
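The behaviour proposed in this pull request amounts to a flag that decides whether dropping a missing object is an error. A minimal sketch with an in-memory catalog follows. `DropSketch`, `Catalog`, `ignoreNonExistent` and this `NoSuchTableException` are hypothetical and only mirror the idea behind the Hive setting; they are not the Spark catalog API.

```scala
import scala.collection.mutable

object DropSketch {
  // Hypothetical exception type for this sketch only.
  final class NoSuchTableException(name: String)
    extends RuntimeException(s"Table or view not found: $name")

  // Hypothetical in-memory catalog; `ignoreNonExistent` plays the role of the proposed flag.
  final class Catalog(ignoreNonExistent: Boolean) {
    private val tables = mutable.Set.empty[String]

    def createTable(name: String): Unit = {
      tables += name
      ()
    }

    // Returns true if a table was actually removed, false if the drop was silently ignored.
    def dropTable(name: String): Boolean = {
      if (tables.remove(name)) true
      else if (ignoreNonExistent) false
      else throw new NoSuchTableException(name)
    }
  }
}
```

With the flag set to true the drop of a missing table is a quiet no-op, as in Hive's default; with false it fails, which matches what the later comment reports for Presto, Impala and MySQL.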
[GitHub] spark issue #13494: [SPARK-15752] [SQL] Optimize metadata only query that ha...
Github user lianhuiwang commented on the issue: https://github.com/apache/spark/pull/13494 @cloud-fan I have addressed your latest comments. Thanks.
[GitHub] spark pull request #13494: [SPARK-15752] [SQL] Optimize metadata only query ...
Github user lianhuiwang commented on a diff in the pull request: https://github.com/apache/spark/pull/13494#discussion_r70404513 --- Diff: sql/core/src/test/scala/org/apache/spark/sql/execution/OptimizeMetadataOnlyQuerySuite.scala --- @@ -0,0 +1,122 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.spark.sql.execution + +import org.apache.spark.sql._ +import org.apache.spark.sql.catalyst.plans.logical.LocalRelation +import org.apache.spark.sql.internal.SQLConf +import org.apache.spark.sql.test.SharedSQLContext + +class OptimizeMetadataOnlyQuerySuite extends QueryTest with SharedSQLContext { + import testImplicits._ + + override def beforeAll(): Unit = { +super.beforeAll() +val data = (1 to 10).map(i => (i, s"data-$i", i % 2, if ((i % 2) == 0) "even" else "odd")) + .toDF("col1", "col2", "partcol1", "partcol2") +data.write.partitionBy("partcol1", "partcol2").mode("append").saveAsTable("srcpart") + } + + override protected def afterAll(): Unit = { +try { + sql("DROP TABLE IF EXISTS srcpart") +} finally { + super.afterAll() +} + } + + private def assertMetadataOnlyQuery(df: DataFrame): Unit = { +val localRelations = df.queryExecution.optimizedPlan.collect { + case l @ LocalRelation(_, _) => l +} +assert(localRelations.size == 1) + } + + private def assertNotMetadataOnlyQuery(df: DataFrame): Unit = { +val localRelations = df.queryExecution.optimizedPlan.collect { + case l @ LocalRelation(_, _) => l +} +assert(localRelations.size == 0) + } + + private def testMetadataOnly(name: String, sqls: String*): Unit = { +test(name) { + withSQLConf(SQLConf.OPTIMIZER_METADATA_ONLY.key -> "true") { +sqls.foreach { case q => assertMetadataOnlyQuery(sql(q)) } + } + withSQLConf(SQLConf.OPTIMIZER_METADATA_ONLY.key -> "false") { +sqls.foreach { case q => assertNotMetadataOnlyQuery(sql(q)) } + } +} + } + + private def testNotMetadataOnly(name: String, sqls: String*): Unit = { +test(name) { + withSQLConf(SQLConf.OPTIMIZER_METADATA_ONLY.key -> "true") { +sqls.foreach { case q => assertNotMetadataOnlyQuery(sql(q)) } + } + withSQLConf(SQLConf.OPTIMIZER_METADATA_ONLY.key -> "false") { +sqls.foreach { case q => assertNotMetadataOnlyQuery(sql(q)) } + } +} + } + + testMetadataOnly( +"OptimizeMetadataOnlyQuery test: aggregate expression is partition columns", --- 
End diff -- Sure, thanks.
[GitHub] spark pull request #13494: [SPARK-15752] [SQL] Optimize metadata only query ...
Github user lianhuiwang commented on a diff in the pull request: https://github.com/apache/spark/pull/13494#discussion_r70404475 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/OptimizeMetadataOnlyQuery.scala --- @@ -0,0 +1,162 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.execution + +import org.apache.spark.sql.catalyst.InternalRow +import org.apache.spark.sql.catalyst.catalog.{CatalogRelation, SessionCatalog} +import org.apache.spark.sql.catalyst.expressions._ +import org.apache.spark.sql.catalyst.expressions.aggregate._ +import org.apache.spark.sql.catalyst.plans.logical._ +import org.apache.spark.sql.catalyst.rules.Rule +import org.apache.spark.sql.execution.datasources.{HadoopFsRelation, LogicalRelation} +import org.apache.spark.sql.internal.SQLConf + +/** + * This rule optimizes the execution of queries that can be answered by looking only at + * partition-level metadata. This applies when all the columns scanned are partition columns, and + * the query has an aggregate operator that satisfies the following conditions: + * 1. aggregate expression is partition columns. + * e.g. SELECT col FROM tbl GROUP BY col. + * 2. 
aggregate function on partition columns with DISTINCT. + * e.g. SELECT col1, count(DISTINCT col2) FROM tbl GROUP BY col1. + * 3. aggregate function on partition columns which have same result w or w/o DISTINCT keyword. + * e.g. SELECT col1, Max(col2) FROM tbl GROUP BY col1. + */ +case class OptimizeMetadataOnlyQuery( +catalog: SessionCatalog, +conf: SQLConf) extends Rule[LogicalPlan] { + + def apply(plan: LogicalPlan): LogicalPlan = { +if (!conf.optimizerMetadataOnly) { + return plan +} + +plan.transform { + case a @ Aggregate(_, aggExprs, child @ PartitionedRelation(partAttrs, relation)) => +// We only apply this optimization when only partitioned attributes are scanned. +if (a.references.subsetOf(partAttrs)) { + val aggFunctions = aggExprs.flatMap(_.collect { +case agg: AggregateExpression => agg + }) + val isAllDistinctAgg = aggFunctions.forall { agg => +agg.isDistinct || (agg.aggregateFunction match { + // `Max`, `Min`, `First` and `Last` are always distinct aggregate functions no matter + // they have DISTINCT keyword or not, as the result will be same. + case _: Max => true + case _: Min => true + case _: First => true + case _: Last => true + case _ => false +}) + } + if (isAllDistinctAgg) { + a.withNewChildren(Seq(replaceTableScanWithPartitionMetadata(child, relation))) + } else { +a + } +} else { + a +} +} + } + + /** + * Returns the partition attributes of the table relation plan. + */ + private def getPartitionAttrs( + partitionColumnNames: Seq[String], + relation: LogicalPlan): Seq[Attribute] = { +val partColumns = partitionColumnNames.map(_.toLowerCase).toSet +relation.output.filter(a => partColumns.contains(a.name.toLowerCase)) + } + + /** + * Transform the given plan, find its table scan nodes that matches the given relation, and then + * replace the table scan node with its corresponding partition values. 
+ */ + private def replaceTableScanWithPartitionMetadata( + child: LogicalPlan, + relation: LogicalPlan): LogicalPlan = { +child transform { + case plan if plan eq relation => +relation match { + case l @ LogicalRelation(fsRelation: HadoopFsRelation, _, _) => +val partAttrs = getPartitionAttrs(fsRelation.partitionSche
[GitHub] spark issue #13494: [SPARK-15752] [SQL] Optimize metadata only query that ha...
Github user lianhuiwang commented on the issue: https://github.com/apache/spark/pull/13494 @cloud-fan @hvanhovell About getPartitionAttrs(): it could be improved by defining it on the relation node, but the relation node does not have such a function yet. How about adding it in a follow-up PR? Thanks.
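For reference, the helper under discussion selects the output attributes whose names match a partition column, ignoring case. The sketch below reproduces that logic outside of Catalyst, assuming a simplified `Attr` type in place of the real `Attribute`; `PartitionAttrsSketch` and `Attr` are hypothetical names.

```scala
object PartitionAttrsSketch {
  // Hypothetical stand-in for a Catalyst attribute; only the name matters here.
  final case class Attr(name: String)

  // Keep the output attributes whose names match a partition column, ignoring case.
  def getPartitionAttrs(partitionColumnNames: Seq[String], output: Seq[Attr]): Seq[Attr] = {
    val partColumns = partitionColumnNames.map(_.toLowerCase).toSet
    output.filter(a => partColumns.contains(a.name.toLowerCase))
  }
}
```

Because the function needs only the partition column names and the relation's output, it would fit naturally as a method on the relation node, which is the follow-up suggested in the comment.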
[GitHub] spark pull request #13494: [SPARK-15752] [SQL] Optimize metadata only query ...
Github user lianhuiwang commented on a diff in the pull request: https://github.com/apache/spark/pull/13494#discussion_r70370384 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/OptimizeMetadataOnlyQuery.scala --- @@ -0,0 +1,153 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.execution + +import org.apache.spark.sql.catalyst.InternalRow +import org.apache.spark.sql.catalyst.catalog.{CatalogRelation, SessionCatalog} +import org.apache.spark.sql.catalyst.expressions._ +import org.apache.spark.sql.catalyst.expressions.aggregate._ +import org.apache.spark.sql.catalyst.plans.logical._ +import org.apache.spark.sql.catalyst.rules.Rule +import org.apache.spark.sql.execution.datasources.{HadoopFsRelation, LogicalRelation} +import org.apache.spark.sql.internal.SQLConf + +/** + * This rule optimizes the execution of queries that can be answered by looking only at + * partition-level metadata. This applies when all the columns scanned are partition columns, and + * the query has an aggregate operator that satisfies the following conditions: + * 1. aggregate expression is partition columns. + * e.g. SELECT col FROM tbl GROUP BY col. + * 2. 
aggregate function on partition columns with DISTINCT. + * e.g. SELECT col1, count(DISTINCT col2) FROM tbl GROUP BY col1. + * 3. aggregate function on partition columns which have same result w or w/o DISTINCT keyword. + * e.g. SELECT col1, Max(col2) FROM tbl GROUP BY col1. + */ +case class OptimizeMetadataOnlyQuery( +catalog: SessionCatalog, +conf: SQLConf) extends Rule[LogicalPlan] { + + def apply(plan: LogicalPlan): LogicalPlan = { +if (!conf.optimizerMetadataOnly) { + return plan +} + +plan.transform { + case a @ Aggregate(_, aggExprs, child @ PartitionedRelation(partAttrs, relation)) => +// We only apply this optimization when only partitioned attributes are scanned. +if (a.references.subsetOf(partAttrs)) { + val aggFunctions = aggExprs.flatMap(_.collect { +case agg: AggregateExpression => agg + }) + val isAllDistinctAgg = aggFunctions.forall { agg => +agg.isDistinct || (agg.aggregateFunction match { + // `Max`, `Min`, `First` and `Last` are always distinct aggregate functions no matter + // they have DISTINCT keyword or not, as the result will be same. + case _: Max => true + case _: Min => true + case _: First => true + case _: Last => true + case _ => false +}) + } + if (isAllDistinctAgg) { + a.withNewChildren(Seq(replaceTableScanWithPartitionMetadata(child, relation))) + } else { +a + } +} else { + a +} +} + } + + /** + * Transform the given plan, find its table scan nodes that matches the given relation, and then + * replace the table scan node with its corresponding partition values. + */ + private def replaceTableScanWithPartitionMetadata( + child: LogicalPlan, + relation: LogicalPlan): LogicalPlan = { +child transform { + case plan if plan eq relation => +relation match { + case l @ LogicalRelation(fsRelation: HadoopFsRelation, _, _) => +val partAttrs = PartitionedRelation.getPartitionAttrs( --- End diff -- @cloud-fan I will define two functions for getPartitionAttrs(). In the future, I think we can put getPartitionAttrs() into relation plan. 
If this has any problems, please tell me. Thanks.
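The doc comment in the quoted diff says the rule only fires for aggregates whose result does not depend on how many rows each partition holds. The following is a minimal sketch of that idea using plain Scala collections rather than Spark; the object name `MetadataOnlyAggregates` and the sample data are hypothetical and only illustrate why `Max`, `Min` and `count(DISTINCT ...)` can be answered from partition values while a plain count cannot.

```scala
object MetadataOnlyAggregates {
  // Partition-column value of every row in a hypothetical table:
  // three rows live in partition 1, two rows live in partition 2.
  val allRows: Seq[Int] = Seq(1, 1, 1, 2, 2)

  // What partition metadata alone can provide: one value per partition.
  val partitionValues: Seq[Int] = allRows.distinct

  // Duplicate-insensitive aggregates give the same answer on both inputs.
  def maxAgrees: Boolean = allRows.max == partitionValues.max
  def minAgrees: Boolean = allRows.min == partitionValues.min
  def countDistinctAgrees: Boolean =
    allRows.distinct.size == partitionValues.distinct.size

  // A plain count depends on the number of rows per partition, so it differs.
  def countAgrees: Boolean = allRows.size == partitionValues.size

  def main(args: Array[String]): Unit = {
    println(s"max agrees: $maxAgrees")
    println(s"min agrees: $minAgrees")
    println(s"count(DISTINCT) agrees: $countDistinctAgrees")
    println(s"count agrees: $countAgrees")
  }
}
```

This mirrors the `isAllDistinctAgg` check in the diff, which accepts an aggregate only if it is marked DISTINCT or is one of the functions the rule lists explicitly.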
[GitHub] spark pull request #13494: [SPARK-15752] [SQL] Optimize metadata only query ...
Github user lianhuiwang commented on a diff in the pull request: https://github.com/apache/spark/pull/13494#discussion_r70369134 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/OptimizeMetadataOnlyQuery.scala --- @@ -0,0 +1,153 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.execution + +import org.apache.spark.sql.catalyst.InternalRow +import org.apache.spark.sql.catalyst.catalog.{CatalogRelation, SessionCatalog} +import org.apache.spark.sql.catalyst.expressions._ +import org.apache.spark.sql.catalyst.expressions.aggregate._ +import org.apache.spark.sql.catalyst.plans.logical._ +import org.apache.spark.sql.catalyst.rules.Rule +import org.apache.spark.sql.execution.datasources.{HadoopFsRelation, LogicalRelation} +import org.apache.spark.sql.internal.SQLConf + +/** + * This rule optimizes the execution of queries that can be answered by looking only at + * partition-level metadata. This applies when all the columns scanned are partition columns, and + * the query has an aggregate operator that satisfies the following conditions: + * 1. aggregate expression is partition columns. + * e.g. SELECT col FROM tbl GROUP BY col. + * 2. 
aggregate function on partition columns with DISTINCT. + * e.g. SELECT col1, count(DISTINCT col2) FROM tbl GROUP BY col1. + * 3. aggregate function on partition columns which have same result w or w/o DISTINCT keyword. + * e.g. SELECT col1, Max(col2) FROM tbl GROUP BY col1. + */ +case class OptimizeMetadataOnlyQuery( +catalog: SessionCatalog, +conf: SQLConf) extends Rule[LogicalPlan] { + + def apply(plan: LogicalPlan): LogicalPlan = { +if (!conf.optimizerMetadataOnly) { + return plan +} + +plan.transform { + case a @ Aggregate(_, aggExprs, child @ PartitionedRelation(partAttrs, relation)) => +// We only apply this optimization when only partitioned attributes are scanned. +if (a.references.subsetOf(partAttrs)) { + val aggFunctions = aggExprs.flatMap(_.collect { +case agg: AggregateExpression => agg + }) + val isAllDistinctAgg = aggFunctions.forall { agg => +agg.isDistinct || (agg.aggregateFunction match { + // `Max`, `Min`, `First` and `Last` are always distinct aggregate functions no matter + // they have DISTINCT keyword or not, as the result will be same. + case _: Max => true + case _: Min => true + case _: First => true + case _: Last => true + case _ => false +}) + } + if (isAllDistinctAgg) { + a.withNewChildren(Seq(replaceTableScanWithPartitionMetadata(child, relation))) + } else { +a + } +} else { + a +} +} + } + + /** + * Transform the given plan, find its table scan nodes that matches the given relation, and then + * replace the table scan node with its corresponding partition values. 
+ */ + private def replaceTableScanWithPartitionMetadata( + child: LogicalPlan, + relation: LogicalPlan): LogicalPlan = { +child transform { + case plan if plan eq relation => +relation match { + case l @ LogicalRelation(fsRelation: HadoopFsRelation, _, _) => +val partAttrs = PartitionedRelation.getPartitionAttrs( + fsRelation.partitionSchema.map(_.name), l) +val partitionData = fsRelation.location.listFiles(filters = Nil) +LocalRelation(partAttrs, partitionData.map(_.values)) + + case relation: CatalogRelation => +val partAttrs = PartitionedRelation.getPartitionAttrs( + relation.catalogTable.partitionColu
[GitHub] spark pull request #13494: [SPARK-15752] [SQL] Optimize metadata only query ...
Github user lianhuiwang commented on a diff in the pull request: https://github.com/apache/spark/pull/13494#discussion_r70368905 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/OptimizeMetadataOnlyQuery.scala --- @@ -0,0 +1,153 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.execution + +import org.apache.spark.sql.catalyst.InternalRow +import org.apache.spark.sql.catalyst.catalog.{CatalogRelation, SessionCatalog} +import org.apache.spark.sql.catalyst.expressions._ +import org.apache.spark.sql.catalyst.expressions.aggregate._ +import org.apache.spark.sql.catalyst.plans.logical._ +import org.apache.spark.sql.catalyst.rules.Rule +import org.apache.spark.sql.execution.datasources.{HadoopFsRelation, LogicalRelation} +import org.apache.spark.sql.internal.SQLConf + +/** + * This rule optimizes the execution of queries that can be answered by looking only at + * partition-level metadata. This applies when all the columns scanned are partition columns, and + * the query has an aggregate operator that satisfies the following conditions: + * 1. aggregate expression is partition columns. + * e.g. SELECT col FROM tbl GROUP BY col. + * 2. 
aggregate function on partition columns with DISTINCT. + * e.g. SELECT col1, count(DISTINCT col2) FROM tbl GROUP BY col1. + * 3. aggregate function on partition columns which have same result w or w/o DISTINCT keyword. + * e.g. SELECT col1, Max(col2) FROM tbl GROUP BY col1. + */ +case class OptimizeMetadataOnlyQuery( +catalog: SessionCatalog, +conf: SQLConf) extends Rule[LogicalPlan] { + + def apply(plan: LogicalPlan): LogicalPlan = { +if (!conf.optimizerMetadataOnly) { + return plan +} + +plan.transform { + case a @ Aggregate(_, aggExprs, child @ PartitionedRelation(partAttrs, relation)) => +// We only apply this optimization when only partitioned attributes are scanned. +if (a.references.subsetOf(partAttrs)) { + val aggFunctions = aggExprs.flatMap(_.collect { +case agg: AggregateExpression => agg + }) + val isAllDistinctAgg = aggFunctions.forall { agg => +agg.isDistinct || (agg.aggregateFunction match { + // `Max`, `Min`, `First` and `Last` are always distinct aggregate functions no matter + // they have DISTINCT keyword or not, as the result will be same. + case _: Max => true + case _: Min => true + case _: First => true + case _: Last => true + case _ => false +}) + } + if (isAllDistinctAgg) { + a.withNewChildren(Seq(replaceTableScanWithPartitionMetadata(child, relation))) + } else { +a + } +} else { + a +} +} + } + + /** + * Transform the given plan, find its table scan nodes that matches the given relation, and then + * replace the table scan node with its corresponding partition values. 
+ */ + private def replaceTableScanWithPartitionMetadata( + child: LogicalPlan, + relation: LogicalPlan): LogicalPlan = { +child transform { + case plan if plan eq relation => +relation match { + case l @ LogicalRelation(fsRelation: HadoopFsRelation, _, _) => +val partAttrs = PartitionedRelation.getPartitionAttrs( + fsRelation.partitionSchema.map(_.name), l) +val partitionData = fsRelation.location.listFiles(filters = Nil) +LocalRelation(partAttrs, partitionData.map(_.values)) + + case relation: CatalogRelation => +val partAttrs = PartitionedRelation.getPartitionAttrs( + relation.catalogTable.partitionColu
[GitHub] spark pull request #13494: [SPARK-15752] [SQL] Optimize metadata only query ...
Github user lianhuiwang commented on a diff in the pull request: https://github.com/apache/spark/pull/13494#discussion_r70368861 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/OptimizeMetadataOnlyQuery.scala --- @@ -0,0 +1,153 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.execution + +import org.apache.spark.sql.catalyst.InternalRow +import org.apache.spark.sql.catalyst.catalog.{CatalogRelation, SessionCatalog} +import org.apache.spark.sql.catalyst.expressions._ +import org.apache.spark.sql.catalyst.expressions.aggregate._ +import org.apache.spark.sql.catalyst.plans.logical._ +import org.apache.spark.sql.catalyst.rules.Rule +import org.apache.spark.sql.execution.datasources.{HadoopFsRelation, LogicalRelation} +import org.apache.spark.sql.internal.SQLConf + +/** + * This rule optimizes the execution of queries that can be answered by looking only at + * partition-level metadata. This applies when all the columns scanned are partition columns, and + * the query has an aggregate operator that satisfies the following conditions: + * 1. aggregate expression is partition columns. + * e.g. SELECT col FROM tbl GROUP BY col. + * 2. 
aggregate function on partition columns with DISTINCT. + * e.g. SELECT col1, count(DISTINCT col2) FROM tbl GROUP BY col1. + * 3. aggregate function on partition columns which have same result w or w/o DISTINCT keyword. + * e.g. SELECT col1, Max(col2) FROM tbl GROUP BY col1. + */ +case class OptimizeMetadataOnlyQuery( +catalog: SessionCatalog, +conf: SQLConf) extends Rule[LogicalPlan] { + + def apply(plan: LogicalPlan): LogicalPlan = { +if (!conf.optimizerMetadataOnly) { + return plan +} + +plan.transform { + case a @ Aggregate(_, aggExprs, child @ PartitionedRelation(partAttrs, relation)) => +// We only apply this optimization when only partitioned attributes are scanned. +if (a.references.subsetOf(partAttrs)) { + val aggFunctions = aggExprs.flatMap(_.collect { +case agg: AggregateExpression => agg + }) + val isAllDistinctAgg = aggFunctions.forall { agg => +agg.isDistinct || (agg.aggregateFunction match { + // `Max`, `Min`, `First` and `Last` are always distinct aggregate functions no matter + // they have DISTINCT keyword or not, as the result will be same. + case _: Max => true + case _: Min => true + case _: First => true + case _: Last => true + case _ => false +}) + } + if (isAllDistinctAgg) { + a.withNewChildren(Seq(replaceTableScanWithPartitionMetadata(child, relation))) + } else { +a + } +} else { + a +} +} + } + + /** + * Transform the given plan, find its table scan nodes that matches the given relation, and then + * replace the table scan node with its corresponding partition values. + */ + private def replaceTableScanWithPartitionMetadata( + child: LogicalPlan, + relation: LogicalPlan): LogicalPlan = { +child transform { + case plan if plan eq relation => +relation match { + case l @ LogicalRelation(fsRelation: HadoopFsRelation, _, _) => +val partAttrs = PartitionedRelation.getPartitionAttrs( --- End diff --
Because object PartitionedRelation also uses getPartitionAttrs, for now I just define it in PartitionedRelation. If I defined it as a private method in class OptimizeMetadataOnlyQuery, there would be two identical getPartitionAttrs() functions, one in PartitionedRelation and one in OptimizeMetadataOnlyQuery. For that reason, the code here uses PartitionedRelation.getPartitionAttrs.
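The shared helper under discussion can be sketched in plain Scala as follows. This is an illustrative assumption, not the PR's code: `Attr` is a hypothetical stand-in for a Catalyst attribute, and the helper takes the relation's output attributes directly instead of a logical plan. The case-insensitive matching follows the inline logic visible in the earlier revision of the same diff.

```scala
object PartitionAttrsSketch {
  // Hypothetical stand-in for a Catalyst attribute; only the name matters here.
  final case class Attr(name: String)

  // Keep the output attributes whose names match a partition column name,
  // ignoring case. Defining this once lets both call sites share it.
  def getPartitionAttrs(
      partitionColumnNames: Seq[String],
      output: Seq[Attr]): Seq[Attr] = {
    val partColumns = partitionColumnNames.map(_.toLowerCase).toSet
    output.filter(a => partColumns.contains(a.name.toLowerCase))
  }
}
```

Keeping a single definition on the companion-style object avoids the duplication the comment describes, at the cost of the rule class reaching into `PartitionedRelation` for it.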
[GitHub] spark issue #13494: [SPARK-15752] [SQL] Optimize metadata only query that ha...
Github user lianhuiwang commented on the issue: https://github.com/apache/spark/pull/13494 @hvanhovell I have addressed some of your comments. Thanks. Could you take another look? --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #13494: [SPARK-15752] [SQL] Optimize metadata only query ...
Github user lianhuiwang commented on a diff in the pull request: https://github.com/apache/spark/pull/13494#discussion_r70267348 --- Diff: sql/core/src/test/scala/org/apache/spark/sql/execution/OptimizeMetadataOnlyQuerySuite.scala --- @@ -0,0 +1,153 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.spark.sql.execution + +import org.apache.spark.sql._ +import org.apache.spark.sql.catalyst.plans.logical.LocalRelation +import org.apache.spark.sql.internal.SQLConf +import org.apache.spark.sql.test.SharedSQLContext + +class OptimizeMetadataOnlyQuerySuite extends QueryTest with SharedSQLContext { + import testImplicits._ + + override def beforeAll(): Unit = { +super.beforeAll() +val data = (1 to 10).map(i => (i, s"data-$i", i % 2, if ((i % 2) == 0) "even" else "odd")) + .toDF("col1", "col2", "partcol1", "partcol2") +data.write.partitionBy("partcol1", "partcol2").mode("append").saveAsTable("srcpart") + } + + override protected def afterAll(): Unit = { +try { + sql("DROP TABLE IF EXISTS srcpart") +} finally { + super.afterAll() +} + } + + private def assertMetadataOnlyQuery(df: DataFrame): Unit = { +val localRelations = df.queryExecution.optimizedPlan.collect { + case l @ LocalRelation(_, _) => l +} +assert(localRelations.size == 1) + } + + private def assertNotMetadataOnlyQuery(df: DataFrame): Unit = { +val localRelations = df.queryExecution.optimizedPlan.collect { + case l @ LocalRelation(_, _) => l +} +assert(localRelations.size == 0) + } + + test("OptimizeMetadataOnlyQuery test: aggregate expression is partition columns") { --- End diff -- Got it. I will update it. Thanks.
[GitHub] spark pull request #14132: [SPARK-16475][SQL][WIP] Broadcast Hint for SQL Qu...
Github user lianhuiwang commented on a diff in the pull request: https://github.com/apache/spark/pull/14132#discussion_r70254654 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala --- @@ -339,8 +339,24 @@ class AstBuilder extends SqlBaseBaseVisitor[AnyRef] with Logging { case SqlBaseParser.SELECT => // Regular select +// Broadcast hints +var withBroadcastedTable = relation +if (ctx.hint != null) { + val broadcastedTables = + hint.mapJoinHint.broadcastedTables.asScala.map(visitTableIdentifier) + for (table <- broadcastedTables) { +var stop = false +withBroadcastedTable = withBroadcastedTable.transformDown { + case r @ BroadcastHint(UnresolvedRelation(_, _)) => r + case r @ UnresolvedRelation(t, _) if !stop && t == table => --- End diff -- I have looked at the Hive code. I think the first occurrence of tbl_b will be the one that is broadcast.
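The "first occurrence only" behaviour described in this comment can be sketched on a toy plan tree. This is not Catalyst code: `Plan`, `Relation`, `Hinted`, `Join` and `hintFirst` are hypothetical names, and the sketch assumes that the `stop` flag in the quoted diff is set once a match is found, which the truncated diff does not show.

```scala
object BroadcastFirstOccurrence {
  // Toy logical plan; hypothetical stand-ins for the Catalyst node types.
  sealed trait Plan
  final case class Relation(table: String) extends Plan
  final case class Hinted(child: Plan) extends Plan
  final case class Join(left: Plan, right: Plan) extends Plan

  // Wrap only the first occurrence of `table` (top-down, left-to-right)
  // in a hint. Returns the rewritten plan and whether a match was consumed.
  def hintFirst(plan: Plan, table: String, done: Boolean = false): (Plan, Boolean) =
    plan match {
      case _ if done => (plan, true)   // a match was already consumed: stop rewriting
      case h: Hinted => (h, false)     // already hinted: leave the subtree untouched
      case r @ Relation(t) if t == table => (Hinted(r), true)
      case r: Relation => (r, false)
      case Join(l, r) =>
        val (newLeft, doneLeft) = hintFirst(l, table, done)
        val (newRight, doneRight) = hintFirst(r, table, doneLeft)
        (Join(newLeft, newRight), doneRight)
    }
}
```

With a plan that joins `tbl_a`, `tbl_b` and `tbl_b` again, only the first `tbl_b` ends up wrapped; the second stays a plain relation.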
[GitHub] spark issue #13494: [SPARK-15752] [SQL] Optimize metadata only query that ha...
Github user lianhuiwang commented on the issue: https://github.com/apache/spark/pull/13494 @hvanhovell I have addressed your comments. Thanks. If I missed something, please tell me.
[GitHub] spark pull request #13494: [SPARK-15752] [SQL] Optimize metadata only query ...
Github user lianhuiwang commented on a diff in the pull request: https://github.com/apache/spark/pull/13494#discussion_r70249015 --- Diff: sql/core/src/test/scala/org/apache/spark/sql/execution/OptimizeMetadataOnlyQuerySuite.scala --- @@ -0,0 +1,153 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.spark.sql.execution + +import org.apache.spark.sql._ +import org.apache.spark.sql.catalyst.plans.logical.LocalRelation +import org.apache.spark.sql.internal.SQLConf +import org.apache.spark.sql.test.SharedSQLContext + +class OptimizeMetadataOnlyQuerySuite extends QueryTest with SharedSQLContext { + import testImplicits._ + + override def beforeAll(): Unit = { +super.beforeAll() +val data = (1 to 10).map(i => (i, s"data-$i", i % 2, if ((i % 2) == 0) "even" else "odd")) + .toDF("col1", "col2", "partcol1", "partcol2") +data.write.partitionBy("partcol1", "partcol2").mode("append").saveAsTable("srcpart") + } + + override protected def afterAll(): Unit = { +try { + sql("DROP TABLE IF EXISTS srcpart") +} finally { + super.afterAll() +} + } + + private def assertMetadataOnlyQuery(df: DataFrame): Unit = { +val localRelations = df.queryExecution.optimizedPlan.collect { + case l @ LocalRelation(_, _) => l +} +assert(localRelations.size == 1) + } + + private def assertNotMetadataOnlyQuery(df: DataFrame): Unit = { +val localRelations = df.queryExecution.optimizedPlan.collect { + case l @ LocalRelation(_, _) => l +} +assert(localRelations.size == 0) + } + + test("OptimizeMetadataOnlyQuery test: aggregate expression is partition columns") { --- End diff -- As @rxin pointed out before, the point of dividing this into multiple functions is to have a separate test case for each of the categories documented in OptimizeMetadataOnlyQuery.
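The two assertion helpers in the quoted suite decide whether the rule fired by counting `LocalRelation` nodes in the optimized plan. A minimal sketch of that check on a toy plan tree follows; the node types and the `collect` helper are hypothetical stand-ins written in plain Scala, not Spark's classes.

```scala
object PlanAssertionSketch {
  // Toy plan nodes; hypothetical stand-ins for Catalyst's logical plans.
  sealed trait Plan { def children: Seq[Plan] }
  final case class LocalRelation(rows: Seq[Int]) extends Plan {
    def children: Seq[Plan] = Nil
  }
  final case class TableScan(table: String) extends Plan {
    def children: Seq[Plan] = Nil
  }
  final case class Aggregate(child: Plan) extends Plan {
    def children: Seq[Plan] = Seq(child)
  }

  // Pre-order traversal that gathers every node the partial function accepts.
  def collect[A](plan: Plan)(pf: PartialFunction[Plan, A]): Seq[A] =
    pf.lift(plan).toSeq ++ plan.children.flatMap(c => collect(c)(pf))

  // The rule is considered applied when exactly one LocalRelation is present.
  def isMetadataOnly(plan: Plan): Boolean =
    collect(plan) { case l: LocalRelation => l }.size == 1
}
```

Wrapping the check in one helper per outcome keeps each documented category in its own short test, which is the structure the comment argues for.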
[GitHub] spark pull request #13494: [SPARK-15752] [SQL] Optimize metadata only query ...
Github user lianhuiwang commented on a diff in the pull request: https://github.com/apache/spark/pull/13494#discussion_r70248199 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/OptimizeMetadataOnlyQuery.scala --- @@ -0,0 +1,143 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.execution + +import org.apache.spark.sql.catalyst.InternalRow +import org.apache.spark.sql.catalyst.catalog.{CatalogRelation, SessionCatalog} +import org.apache.spark.sql.catalyst.expressions._ +import org.apache.spark.sql.catalyst.expressions.aggregate._ +import org.apache.spark.sql.catalyst.plans.logical._ +import org.apache.spark.sql.catalyst.rules.Rule +import org.apache.spark.sql.execution.datasources.{HadoopFsRelation, LogicalRelation} +import org.apache.spark.sql.internal.SQLConf + +/** + * This rule optimizes the execution of queries that can be answered by looking only at + * partition-level metadata. This applies when all the columns scanned are partition columns, and + * the query has an aggregate operator that satisfies the following conditions: + * 1. aggregate expression is partition columns. + * e.g. SELECT col FROM tbl GROUP BY col. + * 2. 
aggregate function on partition columns with DISTINCT. + * e.g. SELECT col1, count(DISTINCT col2) FROM tbl GROUP BY col1. + * 3. aggregate function on partition columns which have same result w or w/o DISTINCT keyword. + * e.g. SELECT col1, Max(col2) FROM tbl GROUP BY col1. + */ +case class OptimizeMetadataOnlyQuery( +catalog: SessionCatalog, +conf: SQLConf) extends Rule[LogicalPlan] { + + def apply(plan: LogicalPlan): LogicalPlan = { +if (!conf.optimizerMetadataOnly) { + return plan +} + +plan.transform { + case a @ Aggregate(_, aggExprs, child @ PartitionedRelation(partAttrs, relation)) => +// We only apply this optimization when only partitioned attributes are scanned. +if (a.references.subsetOf(partAttrs)) { + val aggFunctions = aggExprs.flatMap(_.collect { +case agg: AggregateExpression => agg + }) + val isAllDistinctAgg = aggFunctions.forall { agg => +agg.isDistinct || (agg.aggregateFunction match { + // `Max` and `Min` are always distinct aggregate functions no matter they have + // DISTINCT keyword or not, as the result will be same. + case _: Max => true + case _: Min => true + case _ => false +}) + } + if (isAllDistinctAgg) { + a.withNewChildren(Seq(replaceTableScanWithPartitionMetadata(child, relation))) + } else { +a + } +} else { + a +} +} + } + + /** + * Transform the given plan, find its table scan nodes that matches the given relation, and then + * replace the table scan node with its corresponding partition values. 
+ */ + private def replaceTableScanWithPartitionMetadata( + child: LogicalPlan, + relation: LogicalPlan): LogicalPlan = { +child transform { + case plan if plan eq relation => +relation match { + case l @ LogicalRelation(fsRelation: HadoopFsRelation, _, _) => +val partColumns = fsRelation.partitionSchema.map(_.name.toLowerCase).toSet +val partAttrs = l.output.filter(a => partColumns.contains(a.name.toLowerCase)) +val partitionData = fsRelation.location.listFiles(filters = Nil) +LocalRelation(partAttrs, partitionData.map(_.values)) + + case relation: CatalogRelation => +val partColumns = relation.catalogTable.partitionColumnNames.map(_.toLowerCase).toSet +val partAttrs = relation.output.filter(a => partColum
[GitHub] spark pull request #13494: [SPARK-15752] [SQL] Optimize metadata only query ...
Github user lianhuiwang commented on a diff in the pull request: https://github.com/apache/spark/pull/13494#discussion_r70248137 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/OptimizeMetadataOnlyQuery.scala --- @@ -0,0 +1,143 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.execution + +import org.apache.spark.sql.catalyst.InternalRow +import org.apache.spark.sql.catalyst.catalog.{CatalogRelation, SessionCatalog} +import org.apache.spark.sql.catalyst.expressions._ +import org.apache.spark.sql.catalyst.expressions.aggregate._ +import org.apache.spark.sql.catalyst.plans.logical._ +import org.apache.spark.sql.catalyst.rules.Rule +import org.apache.spark.sql.execution.datasources.{HadoopFsRelation, LogicalRelation} +import org.apache.spark.sql.internal.SQLConf + +/** + * This rule optimizes the execution of queries that can be answered by looking only at + * partition-level metadata. This applies when all the columns scanned are partition columns, and + * the query has an aggregate operator that satisfies the following conditions: + * 1. aggregate expression is partition columns. + * e.g. SELECT col FROM tbl GROUP BY col. + * 2. 
aggregate function on partition columns with DISTINCT. + * e.g. SELECT col1, count(DISTINCT col2) FROM tbl GROUP BY col1. + * 3. aggregate function on partition columns which have same result w or w/o DISTINCT keyword. + * e.g. SELECT col1, Max(col2) FROM tbl GROUP BY col1. + */ +case class OptimizeMetadataOnlyQuery( +catalog: SessionCatalog, +conf: SQLConf) extends Rule[LogicalPlan] { + + def apply(plan: LogicalPlan): LogicalPlan = { +if (!conf.optimizerMetadataOnly) { + return plan +} + +plan.transform { + case a @ Aggregate(_, aggExprs, child @ PartitionedRelation(partAttrs, relation)) => +// We only apply this optimization when only partitioned attributes are scanned. +if (a.references.subsetOf(partAttrs)) { + val aggFunctions = aggExprs.flatMap(_.collect { +case agg: AggregateExpression => agg + }) + val isAllDistinctAgg = aggFunctions.forall { agg => +agg.isDistinct || (agg.aggregateFunction match { + // `Max` and `Min` are always distinct aggregate functions no matter they have + // DISTINCT keyword or not, as the result will be same. + case _: Max => true + case _: Min => true + case _ => false +}) + } + if (isAllDistinctAgg) { + a.withNewChildren(Seq(replaceTableScanWithPartitionMetadata(child, relation))) + } else { +a + } +} else { + a +} +} + } + + /** + * Transform the given plan, find its table scan nodes that matches the given relation, and then + * replace the table scan node with its corresponding partition values. 
+ */ + private def replaceTableScanWithPartitionMetadata( + child: LogicalPlan, + relation: LogicalPlan): LogicalPlan = { +child transform { + case plan if plan eq relation => +relation match { + case l @ LogicalRelation(fsRelation: HadoopFsRelation, _, _) => +val partColumns = fsRelation.partitionSchema.map(_.name.toLowerCase).toSet +val partAttrs = l.output.filter(a => partColumns.contains(a.name.toLowerCase)) +val partitionData = fsRelation.location.listFiles(filters = Nil) +LocalRelation(partAttrs, partitionData.map(_.values)) + + case relation: CatalogRelation => +val partColumns = relation.catalogTable.partitionColumnNames.map(_.toLowerCase).toSet +val partAttrs = relation.output.filter(a => partColum
[GitHub] spark pull request #13494: [SPARK-15752] [SQL] Optimize metadata only query ...
Github user lianhuiwang commented on a diff in the pull request: https://github.com/apache/spark/pull/13494#discussion_r70247911 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/OptimizeMetadataOnlyQuery.scala --- @@ -0,0 +1,143 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.execution + +import org.apache.spark.sql.catalyst.InternalRow +import org.apache.spark.sql.catalyst.catalog.{CatalogRelation, SessionCatalog} +import org.apache.spark.sql.catalyst.expressions._ +import org.apache.spark.sql.catalyst.expressions.aggregate._ +import org.apache.spark.sql.catalyst.plans.logical._ +import org.apache.spark.sql.catalyst.rules.Rule +import org.apache.spark.sql.execution.datasources.{HadoopFsRelation, LogicalRelation} +import org.apache.spark.sql.internal.SQLConf + +/** + * This rule optimizes the execution of queries that can be answered by looking only at + * partition-level metadata. This applies when all the columns scanned are partition columns, and + * the query has an aggregate operator that satisfies the following conditions: + * 1. aggregate expression is partition columns. + * e.g. SELECT col FROM tbl GROUP BY col. + * 2. 
aggregate function on partition columns with DISTINCT. + * e.g. SELECT col1, count(DISTINCT col2) FROM tbl GROUP BY col1. + * 3. aggregate function on partition columns which have same result w or w/o DISTINCT keyword. + * e.g. SELECT col1, Max(col2) FROM tbl GROUP BY col1. + */ +case class OptimizeMetadataOnlyQuery( +catalog: SessionCatalog, +conf: SQLConf) extends Rule[LogicalPlan] { + + def apply(plan: LogicalPlan): LogicalPlan = { +if (!conf.optimizerMetadataOnly) { + return plan +} + +plan.transform { + case a @ Aggregate(_, aggExprs, child @ PartitionedRelation(partAttrs, relation)) => +// We only apply this optimization when only partitioned attributes are scanned. +if (a.references.subsetOf(partAttrs)) { + val aggFunctions = aggExprs.flatMap(_.collect { +case agg: AggregateExpression => agg + }) + val isAllDistinctAgg = aggFunctions.forall { agg => +agg.isDistinct || (agg.aggregateFunction match { + // `Max` and `Min` are always distinct aggregate functions no matter they have + // DISTINCT keyword or not, as the result will be same. + case _: Max => true + case _: Min => true + case _ => false +}) + } + if (isAllDistinctAgg) { + a.withNewChildren(Seq(replaceTableScanWithPartitionMetadata(child, relation))) + } else { +a + } +} else { + a +} +} + } + + /** + * Transform the given plan, find its table scan nodes that matches the given relation, and then + * replace the table scan node with its corresponding partition values. + */ + private def replaceTableScanWithPartitionMetadata( + child: LogicalPlan, + relation: LogicalPlan): LogicalPlan = { +child transform { + case plan if plan eq relation => +relation match { + case l @ LogicalRelation(fsRelation: HadoopFsRelation, _, _) => +val partColumns = fsRelation.partitionSchema.map(_.name.toLowerCase).toSet +val partAttrs = l.output.filter(a => partColumns.contains(a.name.toLowerCase)) --- End diff -- Yes, there is no need to do it. thanks. 
--- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct
[GitHub] spark pull request #13494: [SPARK-15752] [SQL] Optimize metadata only query ...
Github user lianhuiwang commented on a diff in the pull request: https://github.com/apache/spark/pull/13494#discussion_r70247856 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/OptimizeMetadataOnlyQuery.scala --- @@ -0,0 +1,143 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.execution + +import org.apache.spark.sql.catalyst.InternalRow +import org.apache.spark.sql.catalyst.catalog.{CatalogRelation, SessionCatalog} +import org.apache.spark.sql.catalyst.expressions._ +import org.apache.spark.sql.catalyst.expressions.aggregate._ +import org.apache.spark.sql.catalyst.plans.logical._ +import org.apache.spark.sql.catalyst.rules.Rule +import org.apache.spark.sql.execution.datasources.{HadoopFsRelation, LogicalRelation} +import org.apache.spark.sql.internal.SQLConf + +/** + * This rule optimizes the execution of queries that can be answered by looking only at + * partition-level metadata. This applies when all the columns scanned are partition columns, and + * the query has an aggregate operator that satisfies the following conditions: + * 1. aggregate expression is partition columns. + * e.g. SELECT col FROM tbl GROUP BY col. + * 2. 
aggregate function on partition columns with DISTINCT. + * e.g. SELECT col1, count(DISTINCT col2) FROM tbl GROUP BY col1. + * 3. aggregate function on partition columns which have same result w or w/o DISTINCT keyword. + * e.g. SELECT col1, Max(col2) FROM tbl GROUP BY col1. + */ +case class OptimizeMetadataOnlyQuery( +catalog: SessionCatalog, +conf: SQLConf) extends Rule[LogicalPlan] { + + def apply(plan: LogicalPlan): LogicalPlan = { +if (!conf.optimizerMetadataOnly) { + return plan +} + +plan.transform { + case a @ Aggregate(_, aggExprs, child @ PartitionedRelation(partAttrs, relation)) => +// We only apply this optimization when only partitioned attributes are scanned. +if (a.references.subsetOf(partAttrs)) { + val aggFunctions = aggExprs.flatMap(_.collect { +case agg: AggregateExpression => agg + }) + val isAllDistinctAgg = aggFunctions.forall { agg => +agg.isDistinct || (agg.aggregateFunction match { + // `Max` and `Min` are always distinct aggregate functions no matter they have + // DISTINCT keyword or not, as the result will be same. + case _: Max => true --- End diff -- Yes, we need to handle them. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
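A note for readers following the rule's doc comment quoted above: the sketch below illustrates, under simplifying assumptions, why `Max`, `Min` and DISTINCT aggregates over a partition column can be answered from partition values alone, while `sum` or a plain `count` cannot. `PartitionFile`, `scanRows` and `partitionValues` are hypothetical names invented for this illustration and are not part of Spark; the sketch also assumes every partition holds at least one row.

```scala
// Hypothetical stand-in for one data file of a partitioned table:
// the partition value it belongs to and the number of rows it holds.
final case class PartitionFile(partValue: Int, rowCount: Int)

object MetadataOnlySketch {
  // What a full table scan would expose for the partition column:
  // one copy of the partition value per row.
  def scanRows(files: Seq[PartitionFile]): Seq[Int] =
    files.flatMap(f => Seq.fill(f.rowCount)(f.partValue))

  // What partition metadata alone exposes: each partition value once.
  def partitionValues(files: Seq[PartitionFile]): Seq[Int] =
    files.map(_.partValue).distinct

  def main(args: Array[String]): Unit = {
    val files = Seq(PartitionFile(11, 3), PartitionFile(12, 2), PartitionFile(11, 1))
    val rows = scanRows(files)
    val parts = partitionValues(files)

    // Duplicate-insensitive aggregates agree between the two inputs.
    println(s"max equal: ${rows.max == parts.max}")
    println(s"min equal: ${rows.min == parts.min}")
    println(s"count distinct equal: ${rows.distinct.size == parts.size}")

    // Duplicate-sensitive aggregates do not, so the rule must skip them.
    println(s"sum equal: ${rows.sum == parts.sum}")
    println(s"count equal: ${rows.size == parts.size}")
  }
}
```

This mirrors the `isAllDistinctAgg` check in the diff: an aggregate qualifies only if dropping duplicate input rows cannot change its result.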
[GitHub] spark pull request #14111: [SPARK-16456][SQL] Reuse the uncorrelated scalar ...
GitHub user lianhuiwang opened a pull request:

https://github.com/apache/spark/pull/14111

[SPARK-16456][SQL] Reuse the uncorrelated scalar subqueries with the same logical plan in a query

## What changes were proposed in this pull request?

In [TPCDS-Q14](https://github.com/apache/spark/blob/master/sql/core/src/test/resources/tpcds/q14a.sql), the same physical plan of an uncorrelated scalar subquery from a CTE can be executed multiple times. We should reuse the result to avoid the duplicated computation.

## How was this patch tested?

Pass the Jenkins tests (including a new test case).

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/lianhuiwang/spark subquery-reuse

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/spark/pull/14111.patch

To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message:

This closes #14111

commit 5290e42d416897bea4914c7ef2472c7b8190ea0f
Author: Lianhui Wang <lianhuiwan...@gmail.com>
Date: 2016-07-09T04:18:13Z

init commit
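The reuse idea in the description above can be sketched roughly as a cache keyed by the subquery's plan. This is only an illustration under stated assumptions, not the code in the pull request: `SubqueryCache`, `getOrExecute` and the string plan key are hypothetical names, and a real implementation would key on a canonical form of the plan rather than a string.

```scala
import scala.collection.mutable

object SubqueryReuseSketch {
  // Hypothetical cache: K stands in for a canonicalized subquery plan,
  // V for the scalar value that the subquery produces.
  final class SubqueryCache[K, V] {
    private val results = mutable.Map.empty[K, V]

    // Counts how many times a subquery body was actually evaluated.
    var executions: Int = 0

    // Evaluates `run` only the first time a given plan key is seen;
    // later calls with an equal key return the stored result.
    def getOrExecute(planKey: K)(run: => V): V =
      results.getOrElseUpdate(planKey, { executions += 1; run })
  }

  def main(args: Array[String]): Unit = {
    val cache = new SubqueryCache[String, Long]
    val first = cache.getOrExecute("avg_sales")(42L)
    val second = cache.getOrExecute("avg_sales")(sys.error("should not run again"))
    println(s"first=$first second=$second executions=${cache.executions}")
  }
}
```

The by-name parameter `run` is what keeps the second evaluation from happening at all, which is the saving the pull request is after for the repeated CTE subquery.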
[GitHub] spark issue #13494: [SPARK-15752] [SQL] Optimize metadata only query that ha...
Github user lianhuiwang commented on the issue: https://github.com/apache/spark/pull/13494 @cloud-fan thanks. I have rebased it to master.
[GitHub] spark pull request #13494: [SPARK-15752] [SQL] Optimize metadata only query ...
Github user lianhuiwang commented on a diff in the pull request:
https://github.com/apache/spark/pull/13494#discussion_r69860765
--- Diff: sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala ---
@@ -1689,4 +1689,76 @@ class SQLQuerySuite extends QueryTest with SQLTestUtils with TestHiveSingleton {
     )
   }
 }
+
+  test("spark-15752 optimize metadata only query for hive table") {
--- End diff --

I will make them have the same cases. Thanks.
[GitHub] spark pull request #13494: [SPARK-15752] [SQL] Optimize metadata only query ...
Github user lianhuiwang commented on a diff in the pull request:
https://github.com/apache/spark/pull/13494#discussion_r69843015
--- Diff: sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala ---
@@ -1689,4 +1689,86 @@ class SQLQuerySuite extends QueryTest with SQLTestUtils with TestHiveSingleton {
     )
   }
 }
+
+  test("spark-15752 optimize metadata only query for hive table") {
+    withSQLConf(SQLConf.OPTIMIZER_METADATA_ONLY.key -> "true") {
+      withTable("data_15752", "srcpart_15752", "srctext_15752") {
+        val df = Seq((1, "2"), (3, "4")).toDF("key", "value")
+        df.createOrReplaceTempView("data_15752")
+        sql(
+          """
+            |CREATE TABLE srcpart_15752 (col1 INT, col2 STRING)
+            |PARTITIONED BY (partcol1 INT, partcol2 STRING) STORED AS parquet
+          """.stripMargin)
+        for (partcol1 <- Seq(11, 12); partcol2 <- Seq("a", "b")) {
+          sql(
+            s"""
+              |INSERT OVERWRITE TABLE srcpart_15752
+              |PARTITION (partcol1='$partcol1', partcol2='$partcol2')
+              |select key, value from data_15752
+            """.stripMargin)
+        }
+        checkAnswer(
+          sql("select partcol1 from srcpart_15752 where partcol1 = 11 group by partcol1"),
+          Row(11))
+        checkAnswer(sql("select max(partcol1) from srcpart_15752"), Row(12))
+        checkAnswer(sql("select max(partcol1) from srcpart_15752 where partcol1 = 11"), Row(11))
+        checkAnswer(
+          sql("select max(partcol1) from (select partcol1 from srcpart_15752) t"),
+          Row(12))
+        checkAnswer(
+          sql("select max(col) from (select partcol1 + 1 as col from srcpart_15752 " +
+            "where partcol1 = 12) t"),
+          Row(13))
+        checkAnswer(sql("select distinct partcol1 from srcpart_15752"), Row(11) :: Row(12) :: Nil)
+        checkAnswer(sql("select distinct partcol1 from srcpart_15752 where partcol1 = 11"), Row(11))
+        checkAnswer(
+          sql("select distinct col from (select partcol1 + 1 as col from srcpart_15752 " +
+            "where partcol1 = 12) t"),
+          Row(13))
+
+        // Now do not support metadata only optimizer
--- End diff --

This PR does not handle them; they will be added in follow-up PRs.
[GitHub] spark issue #13494: [SPARK-15752] [SQL] Optimize metadata only query that ha...
Github user lianhuiwang commented on the issue: https://github.com/apache/spark/pull/13494 @cloud-fan Yes, Thanks, I have merged it.
[GitHub] spark issue #13494: [SPARK-15752] [SQL] Optimize metadata only query that ha...
Github user lianhuiwang commented on the issue: https://github.com/apache/spark/pull/13494 @rxin @yhuai @cloud-fan I have updated with your comments. Thanks.
[GitHub] spark pull request #13494: [SPARK-15752] [SQL] support optimization for meta...
Github user lianhuiwang commented on a diff in the pull request:
https://github.com/apache/spark/pull/13494#discussion_r69670547
--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/SparkOptimizer.scala ---
@@ -30,6 +30,7 @@ class SparkOptimizer(
     extends Optimizer(catalog, conf) {
   override def batches: Seq[Batch] = super.batches :+
+    Batch("Metadata Only Optimization", Once, MetadataOnlyOptimizer(catalog, conf)) :+
--- End diff --

OK, thanks. I will update it.
[GitHub] spark issue #13494: [SPARK-15752] [SQL] support optimization for metadata on...
Github user lianhuiwang commented on the issue: https://github.com/apache/spark/pull/13494 @cloud-fan Thanks. I have updated with some of your comments. Yes, it is not a small patch and it needs more reviewers. @yhuai @liancheng Could you take a look at this PR? Thanks.
[GitHub] spark pull request #13494: [SPARK-15752] [SQL] support optimization for meta...
Github user lianhuiwang commented on a diff in the pull request: https://github.com/apache/spark/pull/13494#discussion_r69440435 --- Diff: sql/core/src/test/scala/org/apache/spark/sql/execution/MetadataOnlyOptimizerSuite.scala --- @@ -0,0 +1,87 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.spark.sql.execution + +import org.apache.spark.sql._ +import org.apache.spark.sql.catalyst.plans.logical.LocalRelation +import org.apache.spark.sql.internal.SQLConf +import org.apache.spark.sql.test.SharedSQLContext + +class MetadataOnlyOptimizerSuite extends QueryTest with SharedSQLContext { + import testImplicits._ + + override def beforeAll(): Unit = { +super.beforeAll() +val data = (1 to 10).map(i => (i, s"data-$i", i % 2, if ((i % 2) == 0) "even" else "odd")) + .toDF("id", "data", "partId", "part") +data.write.partitionBy("partId", "part").mode("append").saveAsTable("srcpart_15752") + } + + private def checkWithMetadataOnly(df: DataFrame): Unit = { +val localRelations = df.queryExecution.optimizedPlan.collect { + case l @ LocalRelation(_, _) => l +} +assert(localRelations.size == 1) + } + + private def checkWithoutMetadataOnly(df: DataFrame): Unit = { +val localRelations = df.queryExecution.optimizedPlan.collect{ + case l @ LocalRelation(_, _) => l +} +assert(localRelations.size == 0) + } + + test("spark-15752 metadata only optimizer for partition table") { +withSQLConf(SQLConf.OPTIMIZER_METADATA_ONLY.key -> "true") { + checkWithMetadataOnly(sql("select part from srcpart_15752 where part = 0 group by part")) + checkWithMetadataOnly(sql("select max(part) from srcpart_15752")) + checkWithMetadataOnly(sql("select max(part) from srcpart_15752 where part = 0")) + checkWithMetadataOnly( +sql("select part, min(partId) from srcpart_15752 where part = 0 group by part")) + checkWithMetadataOnly( +sql("select max(x) from (select part + 1 as x from srcpart_15752 where part = 1) t")) + checkWithMetadataOnly(sql("select distinct part from srcpart_15752")) + checkWithMetadataOnly(sql("select distinct part, partId from srcpart_15752")) + checkWithMetadataOnly( +sql("select distinct x from (select part + 1 as x from srcpart_15752 where part = 0) t")) + + // Now donot support metadata only optimizer + checkWithoutMetadataOnly(sql("select part, max(id) 
from srcpart_15752 group by part"))
+      checkWithoutMetadataOnly(sql("select distinct part, id from srcpart_15752"))
+      checkWithoutMetadataOnly(sql("select part, sum(partId) from srcpart_15752 group by part"))
+      checkWithoutMetadataOnly(
+        sql("select part from srcpart_15752 where part = 1 group by rollup(part)"))
+      checkWithoutMetadataOnly(
+        sql("select part from (select part from srcpart_15752 where part = 0 union all " +
--- End diff --

Yes, I think it is not difficult.
[GitHub] spark pull request #13494: [SPARK-15752] [SQL] support optimization for meta...
Github user lianhuiwang commented on a diff in the pull request:
https://github.com/apache/spark/pull/13494#discussion_r69440261
--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/SparkOptimizer.scala ---
@@ -30,6 +30,7 @@ class SparkOptimizer(
     extends Optimizer(catalog, conf) {
   override def batches: Seq[Batch] = super.batches :+
+    Batch("Metadata Only Optimization", Once, MetadataOnlyOptimizer(catalog, conf)) :+
--- End diff --

BTW, I found that Hive, Presto and Impala currently use a metadata-only optimization.
[GitHub] spark pull request #13494: [SPARK-15752] [SQL] support optimization for meta...
Github user lianhuiwang commented on a diff in the pull request: https://github.com/apache/spark/pull/13494#discussion_r69439940 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/MetadataOnlyOptimizer.scala --- @@ -0,0 +1,133 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.execution + +import org.apache.spark.sql.AnalysisException +import org.apache.spark.sql.catalyst.{CatalystConf, InternalRow} +import org.apache.spark.sql.catalyst.catalog.{CatalogRelation, SessionCatalog} +import org.apache.spark.sql.catalyst.expressions._ +import org.apache.spark.sql.catalyst.expressions.aggregate._ +import org.apache.spark.sql.catalyst.plans.logical._ +import org.apache.spark.sql.catalyst.rules.Rule +import org.apache.spark.sql.execution.datasources.{HadoopFsRelation, LogicalRelation} + +/** + * When scanning only partition columns, get results based on partition data without scanning files. + * It's used for operators that only need distinct values. Currently only [[Aggregate]] operator + * which satisfy the following conditions are supported: + * 1. aggregate expression is partition columns. + * e.g. SELECT col FROM tbl GROUP BY col. + * 2. 
aggregate function on partition columns with DISTINCT.
+ *     e.g. SELECT count(DISTINCT col) FROM tbl GROUP BY col.
+ * 3. aggregate function on partition columns which have same result w or w/o DISTINCT keyword.
+ *     e.g. SELECT Max(col2) FROM tbl GROUP BY col1.
+ */
+case class MetadataOnlyOptimizer(
+    catalog: SessionCatalog,
+    conf: CatalystConf) extends Rule[LogicalPlan] {
+
+  def apply(plan: LogicalPlan): LogicalPlan = {
+    if (!conf.optimizerMetadataOnly) {
+      return plan
+    }
+
+    plan.transform {
+      case a @ Aggregate(_, aggExprs, child @ PartitionedRelation(partAttrs, relation)) =>
+        if (a.references.subsetOf(partAttrs)) {
+          val aggFunctions = aggExprs.flatMap(_.collect {
+            case agg: AggregateExpression => agg
+          })
+          val isPartitionDataOnly = aggFunctions.isEmpty || aggFunctions.forall { agg =>
+            agg.isDistinct || (agg.aggregateFunction match {
+              case _: Max => true
+              case _: Min => true
+              case _ => false
+            })
+          }
+          if (isPartitionDataOnly) {
+            a.withNewChildren(Seq(usePartitionData(child, relation)))
+          } else {
+            a
+          }
+        } else {
+          a
+        }
+    }
+  }
+
+  private def usePartitionData(child: LogicalPlan, relation: LogicalPlan): LogicalPlan = {
+    child transform {
+      case plan if plan eq relation =>
+        relation match {
+          case l @ LogicalRelation(fsRelation: HadoopFsRelation, _, _) =>
+            val partColumns = fsRelation.partitionSchema.map(_.name.toLowerCase).toSet
+            val partAttrs = l.output.filter(a => partColumns.contains(a.name.toLowerCase))
+            val partitionData = fsRelation.location.listFiles(Nil)
+            LocalRelation(partAttrs, partitionData.map(_.values))
--- End diff --

How about using sizeInBytes from LocalRelation's statistics to determine the parallelism? The initial code is https://github.com/apache/spark/pull/13979/commits/2ca01f26df7572251136d2c059299f846cf8a3f1.
[GitHub] spark issue #13494: [SPARK-15752] [SQL] support optimization for metadata on...
Github user lianhuiwang commented on the issue: https://github.com/apache/spark/pull/13494 @cloud-fan I have updated with your branch code. Thanks a lot.
[GitHub] spark issue #13494: [SPARK-15752] [SQL] support optimization for metadata on...
Github user lianhuiwang commented on the issue: https://github.com/apache/spark/pull/13494 @cloud-fan Thanks. I will look at it.
[GitHub] spark issue #13494: [SPARK-15752] [SQL] support optimization for metadata on...
Github user lianhuiwang commented on the issue: https://github.com/apache/spark/pull/13494 retest it please.
[GitHub] spark pull request #13979: [SPARK-SPARK-16302] [SQL] Set the right number of...
GitHub user lianhuiwang opened a pull request:

https://github.com/apache/spark/pull/13979

[SPARK-SPARK-16302] [SQL] Set the right number of partitions for reading data from a local collection.

## What changes were proposed in this pull request?

Follow-up to #13137. This PR sets the right number of partitions when reading data from a local collection. The query 'val df = Seq[(Int, Int)]().toDF("key", "value").count' always uses defaultParallelism tasks, so it can run empty or very small tasks. I think we can use conf.autoBroadcastJoinThreshold / 10 bytes (1 MB with the default threshold) for one partition.

## How was this patch tested?

Manually tested and checked.

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/lianhuiwang/spark localTable-Parallel

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/spark/pull/13979.patch

To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message:

This closes #13979
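The sizing proposal in this description can be sketched as a small calculation. This is an assumption-laden illustration, not the patch itself: `numPartitions`, its parameters, the lower bound of one partition and the cap at `defaultParallelism` are choices made for this example; only the "threshold / 10 bytes per partition" figure comes from the description.

```scala
object LocalPartitionSketch {
  // Default value of spark.sql.autoBroadcastJoinThreshold: 10 MB.
  val DefaultBroadcastThreshold: Long = 10L * 1024 * 1024

  // Hypothetical helper: picks a partition count for a local collection
  // from its estimated size. Assumes at least one partition, and never
  // more than defaultParallelism (both are assumptions of this sketch).
  def numPartitions(
      sizeInBytes: Long,
      defaultParallelism: Int,
      broadcastThreshold: Long = DefaultBroadcastThreshold): Int = {
    require(sizeInBytes >= 0, "sizeInBytes must be non-negative")
    require(defaultParallelism > 0, "defaultParallelism must be positive")
    // Target size of one partition, as proposed: threshold / 10.
    val bytesPerPartition = math.max(1L, broadcastThreshold / 10)
    // Ceiling division without going through floating point.
    val needed = (sizeInBytes + bytesPerPartition - 1) / bytesPerPartition
    math.max(1L, math.min(needed, defaultParallelism.toLong)).toInt
  }

  def main(args: Array[String]): Unit = {
    // An empty collection no longer fans out to defaultParallelism tasks.
    println(numPartitions(sizeInBytes = 0L, defaultParallelism = 8))
    // 5 MB of local data at 1 MB per partition.
    println(numPartitions(sizeInBytes = 5L * 1024 * 1024, defaultParallelism = 8))
  }
}
```

With these assumptions, an empty `Seq` maps to a single task instead of `defaultParallelism` tasks, which is the waste the description points out.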