[GitHub] spark pull request #13494: [SPARK-15752] [SQL] support optimization for meta...

2016-07-06 Thread rxin
Github user rxin commented on a diff in the pull request:

https://github.com/apache/spark/pull/13494#discussion_r69681850
  
--- Diff: 
sql/core/src/test/scala/org/apache/spark/sql/execution/OptimizeMetadataOnlySuite.scala
 ---
@@ -0,0 +1,95 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution
+
+import org.apache.spark.sql._
+import org.apache.spark.sql.catalyst.plans.logical.LocalRelation
+import org.apache.spark.sql.internal.SQLConf
+import org.apache.spark.sql.test.SharedSQLContext
+
+class OptimizeMetadataOnlySuite extends QueryTest with SharedSQLContext {
+  import testImplicits._
+
+  override def beforeAll(): Unit = {
+super.beforeAll()
+val data = (1 to 10).map(i => (i, s"data-$i", i % 2, if ((i % 2) == 0) 
"even" else "odd"))
+  .toDF("id", "data", "partId", "part")
--- End diff --

I'd name these ...

"col1", "col2", "partcol1", "partcol2".

to make it very obvious which is partition column and which is not.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #13494: [SPARK-15752] [SQL] support optimization for meta...

2016-07-06 Thread cloud-fan
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/13494#discussion_r69681789
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/OptimizeMetadataOnly.scala
 ---
@@ -0,0 +1,133 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution
+
+import org.apache.spark.sql.AnalysisException
+import org.apache.spark.sql.catalyst.{CatalystConf, InternalRow}
+import org.apache.spark.sql.catalyst.catalog.{CatalogRelation, 
SessionCatalog}
+import org.apache.spark.sql.catalyst.expressions._
+import org.apache.spark.sql.catalyst.expressions.aggregate._
+import org.apache.spark.sql.catalyst.plans.logical._
+import org.apache.spark.sql.catalyst.rules.Rule
+import org.apache.spark.sql.execution.datasources.{HadoopFsRelation, 
LogicalRelation}
+
+/**
+ * When scanning only partition columns, get results based on partition 
data without scanning files.
+ * It's used for operators that only need distinct values. Currently only 
[[Aggregate]] operator
+ * which satisfy the following conditions are supported:
+ * 1. aggregate expression is partition columns.
+ *  e.g. SELECT col FROM tbl GROUP BY col.
+ * 2. aggregate function on partition columns with DISTINCT.
+ *  e.g. SELECT count(DISTINCT col1) FROM tbl GROUP BY col2.
+ * 3. aggregate function on partition columns which have same result w or 
w/o DISTINCT keyword.
+ *  e.g. SELECT Max(col2) FROM tbl GROUP BY col1.
+ */
+case class OptimizeMetadataOnly(
+catalog: SessionCatalog,
+conf: CatalystConf) extends Rule[LogicalPlan] {
+
+  def apply(plan: LogicalPlan): LogicalPlan = {
+if (!conf.optimizerMetadataOnly) {
+  return plan
+}
+
+plan.transform {
+  case a @ Aggregate(_, aggExprs, child @ 
PartitionedRelation(partAttrs, relation)) =>
+if (a.references.subsetOf(partAttrs)) {
+  val aggFunctions = aggExprs.flatMap(_.collect {
+case agg: AggregateExpression => agg
+  })
+  val isPartitionDataOnly = aggFunctions.isEmpty || 
aggFunctions.forall { agg =>
+agg.isDistinct || (agg.aggregateFunction match {
--- End diff --

yea, as long as `col1` and `col2` are both partition columns


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #13494: [SPARK-15752] [SQL] support optimization for meta...

2016-07-06 Thread cloud-fan
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/13494#discussion_r69680337
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/CatalystConf.scala ---
@@ -51,6 +52,7 @@ case class SimpleCatalystConf(
 caseSensitiveAnalysis: Boolean,
 orderByOrdinal: Boolean = true,
 groupByOrdinal: Boolean = true,
+optimizerMetadataOnly: Boolean = true,
--- End diff --

that's a good point. The new rule is in sql core module and can access the 
`SQLConf`, we don't need to add the conf in `CatalystConf`


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #13494: [SPARK-15752] [SQL] support optimization for meta...

2016-07-06 Thread yhuai
Github user yhuai commented on a diff in the pull request:

https://github.com/apache/spark/pull/13494#discussion_r69679529
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/CatalystConf.scala ---
@@ -51,6 +52,7 @@ case class SimpleCatalystConf(
 caseSensitiveAnalysis: Boolean,
 orderByOrdinal: Boolean = true,
 groupByOrdinal: Boolean = true,
+optimizerMetadataOnly: Boolean = true,
--- End diff --

Do we need to change this file?


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #13494: [SPARK-15752] [SQL] support optimization for meta...

2016-07-06 Thread yhuai
Github user yhuai commented on a diff in the pull request:

https://github.com/apache/spark/pull/13494#discussion_r69679372
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala ---
@@ -258,6 +258,11 @@ object SQLConf {
   .booleanConf
   .createWithDefault(false)
 
+  val OPTIMIZER_METADATA_ONLY = 
SQLConfigBuilder("spark.sql.optimizer.metadataOnly")
+.doc("When true, enable the metadata-only query optimization.")
--- End diff --

Please update the doc to explain what `metadata-only query` means.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #13494: [SPARK-15752] [SQL] support optimization for meta...

2016-07-05 Thread rxin
Github user rxin commented on a diff in the pull request:

https://github.com/apache/spark/pull/13494#discussion_r69677532
  
--- Diff: 
sql/core/src/test/scala/org/apache/spark/sql/execution/OptimizeMetadataOnlySuite.scala
 ---
@@ -0,0 +1,95 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution
+
+import org.apache.spark.sql._
+import org.apache.spark.sql.catalyst.plans.logical.LocalRelation
+import org.apache.spark.sql.internal.SQLConf
+import org.apache.spark.sql.test.SharedSQLContext
+
+class OptimizeMetadataOnlySuite extends QueryTest with SharedSQLContext {
+  import testImplicits._
+
+  override def beforeAll(): Unit = {
+super.beforeAll()
+val data = (1 to 10).map(i => (i, s"data-$i", i % 2, if ((i % 2) == 0) 
"even" else "odd"))
+  .toDF("id", "data", "partId", "part")
+data.write.partitionBy("partId", 
"part").mode("append").saveAsTable("srcpart_15752")
+  }
+
+  override protected def afterAll(): Unit = {
+try {
+  sql("DROP TABLE IF EXISTS srcpart_15752")
+} finally {
+  super.afterAll()
+}
+  }
+
+  private def checkWithMetadataOnly(df: DataFrame): Unit = {
+val localRelations = df.queryExecution.optimizedPlan.collect {
+  case l @ LocalRelation(_, _) => l
+}
+assert(localRelations.size == 1)
+  }
+
+  private def checkWithoutMetadataOnly(df: DataFrame): Unit = {
+val localRelations = df.queryExecution.optimizedPlan.collect{
+  case l @ LocalRelation(_, _) => l
+}
+assert(localRelations.size == 0)
+  }
+
+  test("spark-15752 metadata only optimizer for partition table") {
+withSQLConf(SQLConf.OPTIMIZER_METADATA_ONLY.key -> "true") {
+  checkWithMetadataOnly(sql("select part from srcpart_15752 where part 
= 0 group by part"))
+  checkWithMetadataOnly(sql("select max(part) from srcpart_15752"))
+  checkWithMetadataOnly(sql("select max(part) from srcpart_15752 where 
part = 0"))
+  checkWithMetadataOnly(
+sql("select part, min(partId) from srcpart_15752 where part = 0 
group by part"))
+  checkWithMetadataOnly(
+sql("select max(x) from (select part + 1 as x from srcpart_15752 
where part = 1) t"))
+  checkWithMetadataOnly(sql("select distinct part from srcpart_15752"))
+  checkWithMetadataOnly(sql("select distinct part, partId from 
srcpart_15752"))
+  checkWithMetadataOnly(
+sql("select distinct x from (select part + 1 as x from 
srcpart_15752 where part = 0) t"))
+
+  // Now donot support metadata only optimizer
+  checkWithoutMetadataOnly(sql("select part, max(id) from 
srcpart_15752 group by part"))
+  checkWithoutMetadataOnly(sql("select distinct part, id from 
srcpart_15752"))
+  checkWithoutMetadataOnly(sql("select part, sum(partId) from 
srcpart_15752 group by part"))
+  checkWithoutMetadataOnly(
+sql("select part from srcpart_15752 where part = 1 group by 
rollup(part)"))
+  checkWithoutMetadataOnly(
+sql("select part from (select part from srcpart_15752 where part = 
0 union all " +
+  "select part from srcpart_15752 where part= 1)t group by part"))
+}
+  }
+
+  test("spark-15752 without metadata only optimizer for partition table") {
+withSQLConf(SQLConf.OPTIMIZER_METADATA_ONLY.key -> "false") {
+  checkWithoutMetadataOnly(sql("select part from srcpart_15752 where 
part = 0 group by part"))
+  checkWithoutMetadataOnly(sql("select max(part) from srcpart_15752"))
--- End diff --

ah ok - this is getting confusing. we should just have 
checkWithMetadataOnly test the behavior when the flag is off.



---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact 

[GitHub] spark pull request #13494: [SPARK-15752] [SQL] support optimization for meta...

2016-07-05 Thread cloud-fan
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/13494#discussion_r69676774
  
--- Diff: 
sql/core/src/test/scala/org/apache/spark/sql/execution/OptimizeMetadataOnlySuite.scala
 ---
@@ -0,0 +1,95 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution
+
+import org.apache.spark.sql._
+import org.apache.spark.sql.catalyst.plans.logical.LocalRelation
+import org.apache.spark.sql.internal.SQLConf
+import org.apache.spark.sql.test.SharedSQLContext
+
+class OptimizeMetadataOnlySuite extends QueryTest with SharedSQLContext {
+  import testImplicits._
+
+  override def beforeAll(): Unit = {
+super.beforeAll()
+val data = (1 to 10).map(i => (i, s"data-$i", i % 2, if ((i % 2) == 0) 
"even" else "odd"))
+  .toDF("id", "data", "partId", "part")
+data.write.partitionBy("partId", 
"part").mode("append").saveAsTable("srcpart_15752")
+  }
+
+  override protected def afterAll(): Unit = {
+try {
+  sql("DROP TABLE IF EXISTS srcpart_15752")
+} finally {
+  super.afterAll()
+}
+  }
+
+  private def checkWithMetadataOnly(df: DataFrame): Unit = {
+val localRelations = df.queryExecution.optimizedPlan.collect {
+  case l @ LocalRelation(_, _) => l
+}
+assert(localRelations.size == 1)
+  }
+
+  private def checkWithoutMetadataOnly(df: DataFrame): Unit = {
+val localRelations = df.queryExecution.optimizedPlan.collect{
+  case l @ LocalRelation(_, _) => l
+}
+assert(localRelations.size == 0)
+  }
+
+  test("spark-15752 metadata only optimizer for partition table") {
+withSQLConf(SQLConf.OPTIMIZER_METADATA_ONLY.key -> "true") {
+  checkWithMetadataOnly(sql("select part from srcpart_15752 where part 
= 0 group by part"))
+  checkWithMetadataOnly(sql("select max(part) from srcpart_15752"))
+  checkWithMetadataOnly(sql("select max(part) from srcpart_15752 where 
part = 0"))
+  checkWithMetadataOnly(
+sql("select part, min(partId) from srcpart_15752 where part = 0 
group by part"))
+  checkWithMetadataOnly(
+sql("select max(x) from (select part + 1 as x from srcpart_15752 
where part = 1) t"))
+  checkWithMetadataOnly(sql("select distinct part from srcpart_15752"))
+  checkWithMetadataOnly(sql("select distinct part, partId from 
srcpart_15752"))
+  checkWithMetadataOnly(
+sql("select distinct x from (select part + 1 as x from 
srcpart_15752 where part = 0) t"))
+
+  // Now donot support metadata only optimizer
+  checkWithoutMetadataOnly(sql("select part, max(id) from 
srcpart_15752 group by part"))
+  checkWithoutMetadataOnly(sql("select distinct part, id from 
srcpart_15752"))
+  checkWithoutMetadataOnly(sql("select part, sum(partId) from 
srcpart_15752 group by part"))
+  checkWithoutMetadataOnly(
+sql("select part from srcpart_15752 where part = 1 group by 
rollup(part)"))
+  checkWithoutMetadataOnly(
+sql("select part from (select part from srcpart_15752 where part = 
0 union all " +
+  "select part from srcpart_15752 where part= 1)t group by part"))
+}
+  }
+
+  test("spark-15752 without metadata only optimizer for partition table") {
+withSQLConf(SQLConf.OPTIMIZER_METADATA_ONLY.key -> "false") {
+  checkWithoutMetadataOnly(sql("select part from srcpart_15752 where 
part = 0 group by part"))
+  checkWithoutMetadataOnly(sql("select max(part) from srcpart_15752"))
--- End diff --

because we turn off the flag for this test


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] spark pull request #13494: [SPARK-15752] [SQL] support optimization for meta...

2016-07-05 Thread rxin
Github user rxin commented on a diff in the pull request:

https://github.com/apache/spark/pull/13494#discussion_r69676173
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/OptimizeMetadataOnly.scala
 ---
@@ -0,0 +1,133 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution
+
+import org.apache.spark.sql.AnalysisException
+import org.apache.spark.sql.catalyst.{CatalystConf, InternalRow}
+import org.apache.spark.sql.catalyst.catalog.{CatalogRelation, 
SessionCatalog}
+import org.apache.spark.sql.catalyst.expressions._
+import org.apache.spark.sql.catalyst.expressions.aggregate._
+import org.apache.spark.sql.catalyst.plans.logical._
+import org.apache.spark.sql.catalyst.rules.Rule
+import org.apache.spark.sql.execution.datasources.{HadoopFsRelation, 
LogicalRelation}
+
+/**
+ * When scanning only partition columns, get results based on partition 
data without scanning files.
+ * It's used for operators that only need distinct values. Currently only 
[[Aggregate]] operator
+ * which satisfy the following conditions are supported:
+ * 1. aggregate expression is partition columns.
+ *  e.g. SELECT col FROM tbl GROUP BY col.
+ * 2. aggregate function on partition columns with DISTINCT.
+ *  e.g. SELECT count(DISTINCT col1) FROM tbl GROUP BY col2.
+ * 3. aggregate function on partition columns which have same result w or 
w/o DISTINCT keyword.
+ *  e.g. SELECT Max(col2) FROM tbl GROUP BY col1.
+ */
+case class OptimizeMetadataOnly(
+catalog: SessionCatalog,
+conf: CatalystConf) extends Rule[LogicalPlan] {
+
+  def apply(plan: LogicalPlan): LogicalPlan = {
+if (!conf.optimizerMetadataOnly) {
+  return plan
+}
+
+plan.transform {
+  case a @ Aggregate(_, aggExprs, child @ 
PartitionedRelation(partAttrs, relation)) =>
+if (a.references.subsetOf(partAttrs)) {
+  val aggFunctions = aggExprs.flatMap(_.collect {
+case agg: AggregateExpression => agg
+  })
+  val isPartitionDataOnly = aggFunctions.isEmpty || 
aggFunctions.forall { agg =>
+agg.isDistinct || (agg.aggregateFunction match {
--- End diff --

```
select col1, max(col2) from table group by col1
```


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #13494: [SPARK-15752] [SQL] support optimization for meta...

2016-07-05 Thread rxin
Github user rxin commented on a diff in the pull request:

https://github.com/apache/spark/pull/13494#discussion_r69675862
  
--- Diff: 
sql/core/src/test/scala/org/apache/spark/sql/execution/OptimizeMetadataOnlySuite.scala
 ---
@@ -0,0 +1,95 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution
+
+import org.apache.spark.sql._
+import org.apache.spark.sql.catalyst.plans.logical.LocalRelation
+import org.apache.spark.sql.internal.SQLConf
+import org.apache.spark.sql.test.SharedSQLContext
+
+class OptimizeMetadataOnlySuite extends QueryTest with SharedSQLContext {
+  import testImplicits._
+
+  override def beforeAll(): Unit = {
+super.beforeAll()
+val data = (1 to 10).map(i => (i, s"data-$i", i % 2, if ((i % 2) == 0) 
"even" else "odd"))
+  .toDF("id", "data", "partId", "part")
+data.write.partitionBy("partId", 
"part").mode("append").saveAsTable("srcpart_15752")
+  }
+
+  override protected def afterAll(): Unit = {
+try {
+  sql("DROP TABLE IF EXISTS srcpart_15752")
+} finally {
+  super.afterAll()
+}
+  }
+
+  private def checkWithMetadataOnly(df: DataFrame): Unit = {
+val localRelations = df.queryExecution.optimizedPlan.collect {
+  case l @ LocalRelation(_, _) => l
+}
+assert(localRelations.size == 1)
+  }
+
+  private def checkWithoutMetadataOnly(df: DataFrame): Unit = {
+val localRelations = df.queryExecution.optimizedPlan.collect{
+  case l @ LocalRelation(_, _) => l
+}
+assert(localRelations.size == 0)
+  }
+
+  test("spark-15752 metadata only optimizer for partition table") {
+withSQLConf(SQLConf.OPTIMIZER_METADATA_ONLY.key -> "true") {
+  checkWithMetadataOnly(sql("select part from srcpart_15752 where part 
= 0 group by part"))
+  checkWithMetadataOnly(sql("select max(part) from srcpart_15752"))
+  checkWithMetadataOnly(sql("select max(part) from srcpart_15752 where 
part = 0"))
+  checkWithMetadataOnly(
+sql("select part, min(partId) from srcpart_15752 where part = 0 
group by part"))
+  checkWithMetadataOnly(
+sql("select max(x) from (select part + 1 as x from srcpart_15752 
where part = 1) t"))
+  checkWithMetadataOnly(sql("select distinct part from srcpart_15752"))
+  checkWithMetadataOnly(sql("select distinct part, partId from 
srcpart_15752"))
+  checkWithMetadataOnly(
+sql("select distinct x from (select part + 1 as x from 
srcpart_15752 where part = 0) t"))
+
+  // Now donot support metadata only optimizer
+  checkWithoutMetadataOnly(sql("select part, max(id) from 
srcpart_15752 group by part"))
+  checkWithoutMetadataOnly(sql("select distinct part, id from 
srcpart_15752"))
+  checkWithoutMetadataOnly(sql("select part, sum(partId) from 
srcpart_15752 group by part"))
+  checkWithoutMetadataOnly(
+sql("select part from srcpart_15752 where part = 1 group by 
rollup(part)"))
+  checkWithoutMetadataOnly(
+sql("select part from (select part from srcpart_15752 where part = 
0 union all " +
+  "select part from srcpart_15752 where part= 1)t group by part"))
+}
+  }
+
+  test("spark-15752 without metadata only optimizer for partition table") {
+withSQLConf(SQLConf.OPTIMIZER_METADATA_ONLY.key -> "false") {
+  checkWithoutMetadataOnly(sql("select part from srcpart_15752 where 
part = 0 group by part"))
+  checkWithoutMetadataOnly(sql("select max(part) from srcpart_15752"))
--- End diff --

why isn't this one supported?



---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] spark pull request #13494: [SPARK-15752] [SQL] support optimization for meta...

2016-07-05 Thread rxin
Github user rxin commented on a diff in the pull request:

https://github.com/apache/spark/pull/13494#discussion_r69675796
  
--- Diff: 
sql/core/src/test/scala/org/apache/spark/sql/execution/OptimizeMetadataOnlySuite.scala
 ---
@@ -0,0 +1,95 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution
+
+import org.apache.spark.sql._
+import org.apache.spark.sql.catalyst.plans.logical.LocalRelation
+import org.apache.spark.sql.internal.SQLConf
+import org.apache.spark.sql.test.SharedSQLContext
+
+class OptimizeMetadataOnlySuite extends QueryTest with SharedSQLContext {
+  import testImplicits._
+
+  override def beforeAll(): Unit = {
+super.beforeAll()
+val data = (1 to 10).map(i => (i, s"data-$i", i % 2, if ((i % 2) == 0) 
"even" else "odd"))
+  .toDF("id", "data", "partId", "part")
+data.write.partitionBy("partId", 
"part").mode("append").saveAsTable("srcpart_15752")
+  }
+
+  override protected def afterAll(): Unit = {
+try {
+  sql("DROP TABLE IF EXISTS srcpart_15752")
+} finally {
+  super.afterAll()
+}
+  }
+
+  private def checkWithMetadataOnly(df: DataFrame): Unit = {
+val localRelations = df.queryExecution.optimizedPlan.collect {
+  case l @ LocalRelation(_, _) => l
+}
+assert(localRelations.size == 1)
+  }
+
+  private def checkWithoutMetadataOnly(df: DataFrame): Unit = {
+val localRelations = df.queryExecution.optimizedPlan.collect{
+  case l @ LocalRelation(_, _) => l
+}
+assert(localRelations.size == 0)
+  }
+
+  test("spark-15752 metadata only optimizer for partition table") {
--- End diff --

for example, you can have a test case for each of the category you 
documented in the optimizer classdoc comment, and also have separate test cases 
for filter, project, etc.



---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #13494: [SPARK-15752] [SQL] support optimization for meta...

2016-07-05 Thread rxin
Github user rxin commented on a diff in the pull request:

https://github.com/apache/spark/pull/13494#discussion_r69675767
  
--- Diff: 
sql/core/src/test/scala/org/apache/spark/sql/execution/OptimizeMetadataOnlySuite.scala
 ---
@@ -0,0 +1,95 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution
+
+import org.apache.spark.sql._
+import org.apache.spark.sql.catalyst.plans.logical.LocalRelation
+import org.apache.spark.sql.internal.SQLConf
+import org.apache.spark.sql.test.SharedSQLContext
+
+class OptimizeMetadataOnlySuite extends QueryTest with SharedSQLContext {
+  import testImplicits._
+
+  override def beforeAll(): Unit = {
+super.beforeAll()
+val data = (1 to 10).map(i => (i, s"data-$i", i % 2, if ((i % 2) == 0) 
"even" else "odd"))
+  .toDF("id", "data", "partId", "part")
+data.write.partitionBy("partId", 
"part").mode("append").saveAsTable("srcpart_15752")
+  }
+
+  override protected def afterAll(): Unit = {
+try {
+  sql("DROP TABLE IF EXISTS srcpart_15752")
+} finally {
+  super.afterAll()
+}
+  }
+
+  private def checkWithMetadataOnly(df: DataFrame): Unit = {
+val localRelations = df.queryExecution.optimizedPlan.collect {
+  case l @ LocalRelation(_, _) => l
+}
+assert(localRelations.size == 1)
+  }
+
+  private def checkWithoutMetadataOnly(df: DataFrame): Unit = {
+val localRelations = df.queryExecution.optimizedPlan.collect{
+  case l @ LocalRelation(_, _) => l
+}
+assert(localRelations.size == 0)
+  }
+
+  test("spark-15752 metadata only optimizer for partition table") {
--- End diff --

it'd be great to break this into multiple cases, rather than simply "yes" 
vs "no".



---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #13494: [SPARK-15752] [SQL] support optimization for meta...

2016-07-05 Thread rxin
Github user rxin commented on a diff in the pull request:

https://github.com/apache/spark/pull/13494#discussion_r69675675
  
--- Diff: 
sql/core/src/test/scala/org/apache/spark/sql/execution/OptimizeMetadataOnlySuite.scala
 ---
@@ -0,0 +1,95 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution
+
+import org.apache.spark.sql._
+import org.apache.spark.sql.catalyst.plans.logical.LocalRelation
+import org.apache.spark.sql.internal.SQLConf
+import org.apache.spark.sql.test.SharedSQLContext
+
+class OptimizeMetadataOnlySuite extends QueryTest with SharedSQLContext {
+  import testImplicits._
+
+  override def beforeAll(): Unit = {
+super.beforeAll()
+val data = (1 to 10).map(i => (i, s"data-$i", i % 2, if ((i % 2) == 0) 
"even" else "odd"))
+  .toDF("id", "data", "partId", "part")
+data.write.partitionBy("partId", 
"part").mode("append").saveAsTable("srcpart_15752")
+  }
+
+  override protected def afterAll(): Unit = {
+try {
+  sql("DROP TABLE IF EXISTS srcpart_15752")
+} finally {
+  super.afterAll()
+}
+  }
+
+  private def checkWithMetadataOnly(df: DataFrame): Unit = {
+val localRelations = df.queryExecution.optimizedPlan.collect {
+  case l @ LocalRelation(_, _) => l
+}
+assert(localRelations.size == 1)
+  }
+
+  private def checkWithoutMetadataOnly(df: DataFrame): Unit = {
+val localRelations = df.queryExecution.optimizedPlan.collect{
--- End diff --

add a space after collect


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #13494: [SPARK-15752] [SQL] support optimization for meta...

2016-07-05 Thread rxin
Github user rxin commented on a diff in the pull request:

https://github.com/apache/spark/pull/13494#discussion_r69675663
  
--- Diff: 
sql/core/src/test/scala/org/apache/spark/sql/execution/OptimizeMetadataOnlySuite.scala
 ---
@@ -0,0 +1,95 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution
+
+import org.apache.spark.sql._
+import org.apache.spark.sql.catalyst.plans.logical.LocalRelation
+import org.apache.spark.sql.internal.SQLConf
+import org.apache.spark.sql.test.SharedSQLContext
+
+class OptimizeMetadataOnlySuite extends QueryTest with SharedSQLContext {
+  import testImplicits._
+
+  override def beforeAll(): Unit = {
+super.beforeAll()
+val data = (1 to 10).map(i => (i, s"data-$i", i % 2, if ((i % 2) == 0) 
"even" else "odd"))
+  .toDF("id", "data", "partId", "part")
+data.write.partitionBy("partId", 
"part").mode("append").saveAsTable("srcpart_15752")
+  }
+
+  override protected def afterAll(): Unit = {
+try {
+  sql("DROP TABLE IF EXISTS srcpart_15752")
+} finally {
+  super.afterAll()
+}
+  }
+
+  private def checkWithMetadataOnly(df: DataFrame): Unit = {
+val localRelations = df.queryExecution.optimizedPlan.collect {
+  case l @ LocalRelation(_, _) => l
+}
+assert(localRelations.size == 1)
+  }
+
+  private def checkWithoutMetadataOnly(df: DataFrame): Unit = {
--- End diff --

assertNotMetadataOnlyQuery


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #13494: [SPARK-15752] [SQL] support optimization for meta...

2016-07-05 Thread rxin
Github user rxin commented on a diff in the pull request:

https://github.com/apache/spark/pull/13494#discussion_r69675655
  
--- Diff: 
sql/core/src/test/scala/org/apache/spark/sql/execution/OptimizeMetadataOnlySuite.scala
 ---
@@ -0,0 +1,95 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution
+
+import org.apache.spark.sql._
+import org.apache.spark.sql.catalyst.plans.logical.LocalRelation
+import org.apache.spark.sql.internal.SQLConf
+import org.apache.spark.sql.test.SharedSQLContext
+
+class OptimizeMetadataOnlySuite extends QueryTest with SharedSQLContext {
+  import testImplicits._
+
+  override def beforeAll(): Unit = {
+super.beforeAll()
+val data = (1 to 10).map(i => (i, s"data-$i", i % 2, if ((i % 2) == 0) 
"even" else "odd"))
+  .toDF("id", "data", "partId", "part")
+data.write.partitionBy("partId", 
"part").mode("append").saveAsTable("srcpart_15752")
+  }
+
+  override protected def afterAll(): Unit = {
+try {
+  sql("DROP TABLE IF EXISTS srcpart_15752")
+} finally {
+  super.afterAll()
+}
+  }
+
+  private def checkWithMetadataOnly(df: DataFrame): Unit = {
--- End diff --

assertMetadataOnlyQuery


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #13494: [SPARK-15752] [SQL] support optimization for meta...

2016-07-05 Thread rxin
Github user rxin commented on a diff in the pull request:

https://github.com/apache/spark/pull/13494#discussion_r69675558
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/OptimizeMetadataOnly.scala
 ---
@@ -0,0 +1,133 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution
+
+import org.apache.spark.sql.AnalysisException
+import org.apache.spark.sql.catalyst.{CatalystConf, InternalRow}
+import org.apache.spark.sql.catalyst.catalog.{CatalogRelation, 
SessionCatalog}
+import org.apache.spark.sql.catalyst.expressions._
+import org.apache.spark.sql.catalyst.expressions.aggregate._
+import org.apache.spark.sql.catalyst.plans.logical._
+import org.apache.spark.sql.catalyst.rules.Rule
+import org.apache.spark.sql.execution.datasources.{HadoopFsRelation, 
LogicalRelation}
+
+/**
+ * When scanning only partition columns, get results based on partition 
data without scanning files.
+ * It's used for operators that only need distinct values. Currently only 
[[Aggregate]] operator
+ * which satisfy the following conditions are supported:
+ * 1. aggregate expression is partition columns.
+ *  e.g. SELECT col FROM tbl GROUP BY col.
+ * 2. aggregate function on partition columns with DISTINCT.
+ *  e.g. SELECT count(DISTINCT col1) FROM tbl GROUP BY col2.
+ * 3. aggregate function on partition columns which have same result w or 
w/o DISTINCT keyword.
+ *  e.g. SELECT Max(col2) FROM tbl GROUP BY col1.
+ */
+case class OptimizeMetadataOnly(
+catalog: SessionCatalog,
+conf: CatalystConf) extends Rule[LogicalPlan] {
+
+  def apply(plan: LogicalPlan): LogicalPlan = {
+if (!conf.optimizerMetadataOnly) {
+  return plan
+}
+
+plan.transform {
+  case a @ Aggregate(_, aggExprs, child @ 
PartitionedRelation(partAttrs, relation)) =>
+if (a.references.subsetOf(partAttrs)) {
+  val aggFunctions = aggExprs.flatMap(_.collect {
+case agg: AggregateExpression => agg
+  })
+  val isPartitionDataOnly = aggFunctions.isEmpty || 
aggFunctions.forall { agg =>
+agg.isDistinct || (agg.aggregateFunction match {
+  case _: Max => true
+  case _: Min => true
+  case _ => false
+})
+  }
+  if (isPartitionDataOnly) {
+a.withNewChildren(Seq(usePartitionData(child, relation)))
+  } else {
+a
+  }
+} else {
+  a
+}
+}
+  }
+
+  private def usePartitionData(child: LogicalPlan, relation: LogicalPlan): 
LogicalPlan = {
+child transform {
+  case plan if plan eq relation =>
+relation match {
+  case l @ LogicalRelation(fsRelation: HadoopFsRelation, _, _) =>
+val partColumns = 
fsRelation.partitionSchema.map(_.name.toLowerCase).toSet
+val partAttrs = l.output.filter(a => 
partColumns.contains(a.name.toLowerCase))
+val partitionData = fsRelation.location.listFiles(Nil)
+LocalRelation(partAttrs, partitionData.map(_.values))
+
+  case relation: CatalogRelation =>
+val partColumns = 
relation.catalogTable.partitionColumnNames.map(_.toLowerCase).toSet
+val partAttrs = relation.output.filter(a => 
partColumns.contains(a.name.toLowerCase))
+val partitionData = 
catalog.listPartitions(relation.catalogTable.identifier).map { p =>
+  InternalRow.fromSeq(partAttrs.map { attr =>
+Cast(Literal(p.spec(attr.name)), attr.dataType).eval()
+  })
+}
+LocalRelation(partAttrs, partitionData)
+
+  case _ => throw new IllegalStateException()
+}
+}
+  }
+
+  object PartitionedRelation {
+def unapply(plan: LogicalPlan): Option[(AttributeSet, LogicalPlan)] = 

[GitHub] spark pull request #13494: [SPARK-15752] [SQL] support optimization for meta...

2016-07-05 Thread rxin
Github user rxin commented on a diff in the pull request:

https://github.com/apache/spark/pull/13494#discussion_r69675543
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/OptimizeMetadataOnly.scala
 ---
@@ -0,0 +1,133 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution
+
+import org.apache.spark.sql.AnalysisException
+import org.apache.spark.sql.catalyst.{CatalystConf, InternalRow}
+import org.apache.spark.sql.catalyst.catalog.{CatalogRelation, 
SessionCatalog}
+import org.apache.spark.sql.catalyst.expressions._
+import org.apache.spark.sql.catalyst.expressions.aggregate._
+import org.apache.spark.sql.catalyst.plans.logical._
+import org.apache.spark.sql.catalyst.rules.Rule
+import org.apache.spark.sql.execution.datasources.{HadoopFsRelation, 
LogicalRelation}
+
+/**
+ * When scanning only partition columns, get results based on partition 
data without scanning files.
+ * It's used for operators that only need distinct values. Currently only 
[[Aggregate]] operator
+ * which satisfy the following conditions are supported:
+ * 1. aggregate expression is partition columns.
+ *  e.g. SELECT col FROM tbl GROUP BY col.
+ * 2. aggregate function on partition columns with DISTINCT.
+ *  e.g. SELECT count(DISTINCT col1) FROM tbl GROUP BY col2.
+ * 3. aggregate function on partition columns which have same result w or 
w/o DISTINCT keyword.
+ *  e.g. SELECT Max(col2) FROM tbl GROUP BY col1.
+ */
+case class OptimizeMetadataOnly(
+catalog: SessionCatalog,
+conf: CatalystConf) extends Rule[LogicalPlan] {
+
+  def apply(plan: LogicalPlan): LogicalPlan = {
+if (!conf.optimizerMetadataOnly) {
+  return plan
+}
+
+plan.transform {
+  case a @ Aggregate(_, aggExprs, child @ 
PartitionedRelation(partAttrs, relation)) =>
+if (a.references.subsetOf(partAttrs)) {
+  val aggFunctions = aggExprs.flatMap(_.collect {
+case agg: AggregateExpression => agg
+  })
+  val isPartitionDataOnly = aggFunctions.isEmpty || 
aggFunctions.forall { agg =>
+agg.isDistinct || (agg.aggregateFunction match {
+  case _: Max => true
+  case _: Min => true
+  case _ => false
+})
+  }
+  if (isPartitionDataOnly) {
+a.withNewChildren(Seq(usePartitionData(child, relation)))
+  } else {
+a
+  }
+} else {
+  a
+}
+}
+  }
+
+  private def usePartitionData(child: LogicalPlan, relation: LogicalPlan): 
LogicalPlan = {
+child transform {
+  case plan if plan eq relation =>
+relation match {
+  case l @ LogicalRelation(fsRelation: HadoopFsRelation, _, _) =>
+val partColumns = 
fsRelation.partitionSchema.map(_.name.toLowerCase).toSet
+val partAttrs = l.output.filter(a => 
partColumns.contains(a.name.toLowerCase))
+val partitionData = fsRelation.location.listFiles(Nil)
+LocalRelation(partAttrs, partitionData.map(_.values))
+
+  case relation: CatalogRelation =>
+val partColumns = 
relation.catalogTable.partitionColumnNames.map(_.toLowerCase).toSet
+val partAttrs = relation.output.filter(a => 
partColumns.contains(a.name.toLowerCase))
+val partitionData = 
catalog.listPartitions(relation.catalogTable.identifier).map { p =>
+  InternalRow.fromSeq(partAttrs.map { attr =>
+Cast(Literal(p.spec(attr.name)), attr.dataType).eval()
+  })
+}
+LocalRelation(partAttrs, partitionData)
+
+  case _ => throw new IllegalStateException()
+}
+}
+  }
+
+  object PartitionedRelation {
+def unapply(plan: LogicalPlan): Option[(AttributeSet, LogicalPlan)] = 

[GitHub] spark pull request #13494: [SPARK-15752] [SQL] support optimization for meta...

2016-07-05 Thread cloud-fan
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/13494#discussion_r69675141
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/OptimizeMetadataOnly.scala
 ---
@@ -0,0 +1,133 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution
+
+import org.apache.spark.sql.AnalysisException
+import org.apache.spark.sql.catalyst.{CatalystConf, InternalRow}
+import org.apache.spark.sql.catalyst.catalog.{CatalogRelation, 
SessionCatalog}
+import org.apache.spark.sql.catalyst.expressions._
+import org.apache.spark.sql.catalyst.expressions.aggregate._
+import org.apache.spark.sql.catalyst.plans.logical._
+import org.apache.spark.sql.catalyst.rules.Rule
+import org.apache.spark.sql.execution.datasources.{HadoopFsRelation, 
LogicalRelation}
+
+/**
+ * When scanning only partition columns, get results based on partition 
data without scanning files.
+ * It's used for operators that only need distinct values. Currently only 
[[Aggregate]] operator
+ * which satisfy the following conditions are supported:
+ * 1. aggregate expression is partition columns.
+ *  e.g. SELECT col FROM tbl GROUP BY col.
+ * 2. aggregate function on partition columns with DISTINCT.
+ *  e.g. SELECT count(DISTINCT col1) FROM tbl GROUP BY col2.
+ * 3. aggregate function on partition columns which have same result w or 
w/o DISTINCT keyword.
+ *  e.g. SELECT Max(col2) FROM tbl GROUP BY col1.
+ */
+case class OptimizeMetadataOnly(
+catalog: SessionCatalog,
+conf: CatalystConf) extends Rule[LogicalPlan] {
+
+  def apply(plan: LogicalPlan): LogicalPlan = {
+if (!conf.optimizerMetadataOnly) {
+  return plan
+}
+
+plan.transform {
+  case a @ Aggregate(_, aggExprs, child @ 
PartitionedRelation(partAttrs, relation)) =>
+if (a.references.subsetOf(partAttrs)) {
+  val aggFunctions = aggExprs.flatMap(_.collect {
+case agg: AggregateExpression => agg
+  })
+  val isPartitionDataOnly = aggFunctions.isEmpty || 
aggFunctions.forall { agg =>
+agg.isDistinct || (agg.aggregateFunction match {
--- End diff --

it can't pass analysis... `col2` is not grouping column and must be put 
inside agg functins.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #13494: [SPARK-15752] [SQL] support optimization for meta...

2016-07-05 Thread rxin
Github user rxin commented on a diff in the pull request:

https://github.com/apache/spark/pull/13494#discussion_r69675086
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/OptimizeMetadataOnly.scala
 ---
@@ -0,0 +1,133 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution
+
+import org.apache.spark.sql.AnalysisException
+import org.apache.spark.sql.catalyst.{CatalystConf, InternalRow}
+import org.apache.spark.sql.catalyst.catalog.{CatalogRelation, 
SessionCatalog}
+import org.apache.spark.sql.catalyst.expressions._
+import org.apache.spark.sql.catalyst.expressions.aggregate._
+import org.apache.spark.sql.catalyst.plans.logical._
+import org.apache.spark.sql.catalyst.rules.Rule
+import org.apache.spark.sql.execution.datasources.{HadoopFsRelation, 
LogicalRelation}
+
+/**
+ * When scanning only partition columns, get results based on partition 
data without scanning files.
+ * It's used for operators that only need distinct values. Currently only 
[[Aggregate]] operator
+ * which satisfy the following conditions are supported:
+ * 1. aggregate expression is partition columns.
+ *  e.g. SELECT col FROM tbl GROUP BY col.
+ * 2. aggregate function on partition columns with DISTINCT.
+ *  e.g. SELECT count(DISTINCT col1) FROM tbl GROUP BY col2.
+ * 3. aggregate function on partition columns which have same result w or 
w/o DISTINCT keyword.
+ *  e.g. SELECT Max(col2) FROM tbl GROUP BY col1.
+ */
+case class OptimizeMetadataOnly(
+catalog: SessionCatalog,
+conf: CatalystConf) extends Rule[LogicalPlan] {
+
+  def apply(plan: LogicalPlan): LogicalPlan = {
+if (!conf.optimizerMetadataOnly) {
+  return plan
+}
+
+plan.transform {
+  case a @ Aggregate(_, aggExprs, child @ 
PartitionedRelation(partAttrs, relation)) =>
+if (a.references.subsetOf(partAttrs)) {
+  val aggFunctions = aggExprs.flatMap(_.collect {
+case agg: AggregateExpression => agg
+  })
+  val isPartitionDataOnly = aggFunctions.isEmpty || 
aggFunctions.forall { agg =>
+agg.isDistinct || (agg.aggregateFunction match {
+  case _: Max => true
+  case _: Min => true
+  case _ => false
+})
+  }
+  if (isPartitionDataOnly) {
+a.withNewChildren(Seq(usePartitionData(child, relation)))
+  } else {
+a
+  }
+} else {
+  a
+}
+}
+  }
+
+  private def usePartitionData(child: LogicalPlan, relation: LogicalPlan): 
LogicalPlan = {
+child transform {
+  case plan if plan eq relation =>
+relation match {
+  case l @ LogicalRelation(fsRelation: HadoopFsRelation, _, _) =>
+val partColumns = 
fsRelation.partitionSchema.map(_.name.toLowerCase).toSet
+val partAttrs = l.output.filter(a => 
partColumns.contains(a.name.toLowerCase))
+val partitionData = fsRelation.location.listFiles(Nil)
+LocalRelation(partAttrs, partitionData.map(_.values))
+
+  case relation: CatalogRelation =>
+val partColumns = 
relation.catalogTable.partitionColumnNames.map(_.toLowerCase).toSet
+val partAttrs = relation.output.filter(a => 
partColumns.contains(a.name.toLowerCase))
+val partitionData = 
catalog.listPartitions(relation.catalogTable.identifier).map { p =>
+  InternalRow.fromSeq(partAttrs.map { attr =>
+Cast(Literal(p.spec(attr.name)), attr.dataType).eval()
+  })
+}
+LocalRelation(partAttrs, partitionData)
+
+  case _ => throw new IllegalStateException()
+}
+}
+  }
+
+  object PartitionedRelation {
--- End diff --

we should also explain what patterns are 

[GitHub] spark pull request #13494: [SPARK-15752] [SQL] support optimization for meta...

2016-07-05 Thread rxin
Github user rxin commented on a diff in the pull request:

https://github.com/apache/spark/pull/13494#discussion_r69674974
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/OptimizeMetadataOnly.scala
 ---
@@ -0,0 +1,133 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution
+
+import org.apache.spark.sql.AnalysisException
+import org.apache.spark.sql.catalyst.{CatalystConf, InternalRow}
+import org.apache.spark.sql.catalyst.catalog.{CatalogRelation, 
SessionCatalog}
+import org.apache.spark.sql.catalyst.expressions._
+import org.apache.spark.sql.catalyst.expressions.aggregate._
+import org.apache.spark.sql.catalyst.plans.logical._
+import org.apache.spark.sql.catalyst.rules.Rule
+import org.apache.spark.sql.execution.datasources.{HadoopFsRelation, 
LogicalRelation}
+
+/**
+ * When scanning only partition columns, get results based on partition 
data without scanning files.
+ * It's used for operators that only need distinct values. Currently only 
[[Aggregate]] operator
+ * which satisfy the following conditions are supported:
+ * 1. aggregate expression is partition columns.
+ *  e.g. SELECT col FROM tbl GROUP BY col.
+ * 2. aggregate function on partition columns with DISTINCT.
+ *  e.g. SELECT count(DISTINCT col1) FROM tbl GROUP BY col2.
+ * 3. aggregate function on partition columns which have same result w or 
w/o DISTINCT keyword.
+ *  e.g. SELECT Max(col2) FROM tbl GROUP BY col1.
+ */
+case class OptimizeMetadataOnly(
+catalog: SessionCatalog,
+conf: CatalystConf) extends Rule[LogicalPlan] {
+
+  def apply(plan: LogicalPlan): LogicalPlan = {
+if (!conf.optimizerMetadataOnly) {
+  return plan
+}
+
+plan.transform {
+  case a @ Aggregate(_, aggExprs, child @ 
PartitionedRelation(partAttrs, relation)) =>
+if (a.references.subsetOf(partAttrs)) {
+  val aggFunctions = aggExprs.flatMap(_.collect {
+case agg: AggregateExpression => agg
+  })
+  val isPartitionDataOnly = aggFunctions.isEmpty || 
aggFunctions.forall { agg =>
+agg.isDistinct || (agg.aggregateFunction match {
+  case _: Max => true
+  case _: Min => true
+  case _ => false
+})
+  }
+  if (isPartitionDataOnly) {
+a.withNewChildren(Seq(usePartitionData(child, relation)))
+  } else {
+a
+  }
+} else {
+  a
+}
+}
+  }
+
+  private def usePartitionData(child: LogicalPlan, relation: LogicalPlan): 
LogicalPlan = {
+child transform {
+  case plan if plan eq relation =>
+relation match {
+  case l @ LogicalRelation(fsRelation: HadoopFsRelation, _, _) =>
+val partColumns = 
fsRelation.partitionSchema.map(_.name.toLowerCase).toSet
+val partAttrs = l.output.filter(a => 
partColumns.contains(a.name.toLowerCase))
+val partitionData = fsRelation.location.listFiles(Nil)
+LocalRelation(partAttrs, partitionData.map(_.values))
+
+  case relation: CatalogRelation =>
+val partColumns = 
relation.catalogTable.partitionColumnNames.map(_.toLowerCase).toSet
+val partAttrs = relation.output.filter(a => 
partColumns.contains(a.name.toLowerCase))
+val partitionData = 
catalog.listPartitions(relation.catalogTable.identifier).map { p =>
+  InternalRow.fromSeq(partAttrs.map { attr =>
+Cast(Literal(p.spec(attr.name)), attr.dataType).eval()
+  })
+}
+LocalRelation(partAttrs, partitionData)
+
+  case _ => throw new IllegalStateException()
+}
+}
+  }
+
+  object PartitionedRelation {
--- End diff --

we need comment explaining what this does


---
If 

[GitHub] spark pull request #13494: [SPARK-15752] [SQL] support optimization for meta...

2016-07-05 Thread rxin
Github user rxin commented on a diff in the pull request:

https://github.com/apache/spark/pull/13494#discussion_r69674988
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/OptimizeMetadataOnly.scala
 ---
@@ -0,0 +1,133 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution
+
+import org.apache.spark.sql.AnalysisException
+import org.apache.spark.sql.catalyst.{CatalystConf, InternalRow}
+import org.apache.spark.sql.catalyst.catalog.{CatalogRelation, 
SessionCatalog}
+import org.apache.spark.sql.catalyst.expressions._
+import org.apache.spark.sql.catalyst.expressions.aggregate._
+import org.apache.spark.sql.catalyst.plans.logical._
+import org.apache.spark.sql.catalyst.rules.Rule
+import org.apache.spark.sql.execution.datasources.{HadoopFsRelation, 
LogicalRelation}
+
+/**
+ * When scanning only partition columns, get results based on partition 
data without scanning files.
+ * It's used for operators that only need distinct values. Currently only 
[[Aggregate]] operator
+ * which satisfy the following conditions are supported:
+ * 1. aggregate expression is partition columns.
+ *  e.g. SELECT col FROM tbl GROUP BY col.
+ * 2. aggregate function on partition columns with DISTINCT.
+ *  e.g. SELECT count(DISTINCT col1) FROM tbl GROUP BY col2.
+ * 3. aggregate function on partition columns which have same result w or 
w/o DISTINCT keyword.
+ *  e.g. SELECT Max(col2) FROM tbl GROUP BY col1.
+ */
+case class OptimizeMetadataOnly(
+catalog: SessionCatalog,
+conf: CatalystConf) extends Rule[LogicalPlan] {
+
+  def apply(plan: LogicalPlan): LogicalPlan = {
+if (!conf.optimizerMetadataOnly) {
+  return plan
+}
+
+plan.transform {
+  case a @ Aggregate(_, aggExprs, child @ 
PartitionedRelation(partAttrs, relation)) =>
+if (a.references.subsetOf(partAttrs)) {
+  val aggFunctions = aggExprs.flatMap(_.collect {
+case agg: AggregateExpression => agg
+  })
+  val isPartitionDataOnly = aggFunctions.isEmpty || 
aggFunctions.forall { agg =>
+agg.isDistinct || (agg.aggregateFunction match {
+  case _: Max => true
+  case _: Min => true
+  case _ => false
+})
+  }
+  if (isPartitionDataOnly) {
+a.withNewChildren(Seq(usePartitionData(child, relation)))
+  } else {
+a
+  }
+} else {
+  a
+}
+}
+  }
+
+  private def usePartitionData(child: LogicalPlan, relation: LogicalPlan): 
LogicalPlan = {
+child transform {
+  case plan if plan eq relation =>
+relation match {
+  case l @ LogicalRelation(fsRelation: HadoopFsRelation, _, _) =>
+val partColumns = 
fsRelation.partitionSchema.map(_.name.toLowerCase).toSet
+val partAttrs = l.output.filter(a => 
partColumns.contains(a.name.toLowerCase))
+val partitionData = fsRelation.location.listFiles(Nil)
+LocalRelation(partAttrs, partitionData.map(_.values))
+
+  case relation: CatalogRelation =>
+val partColumns = 
relation.catalogTable.partitionColumnNames.map(_.toLowerCase).toSet
+val partAttrs = relation.output.filter(a => 
partColumns.contains(a.name.toLowerCase))
+val partitionData = 
catalog.listPartitions(relation.catalogTable.identifier).map { p =>
+  InternalRow.fromSeq(partAttrs.map { attr =>
+Cast(Literal(p.spec(attr.name)), attr.dataType).eval()
+  })
+}
+LocalRelation(partAttrs, partitionData)
+
+  case _ => throw new IllegalStateException()
+}
+}
+  }
+
+  object PartitionedRelation {
--- End diff --

also need to document what the returned tuple means

[GitHub] spark pull request #13494: [SPARK-15752] [SQL] support optimization for meta...

2016-07-05 Thread rxin
Github user rxin commented on a diff in the pull request:

https://github.com/apache/spark/pull/13494#discussion_r69674955
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/OptimizeMetadataOnly.scala
 ---
@@ -0,0 +1,133 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution
+
+import org.apache.spark.sql.AnalysisException
+import org.apache.spark.sql.catalyst.{CatalystConf, InternalRow}
+import org.apache.spark.sql.catalyst.catalog.{CatalogRelation, 
SessionCatalog}
+import org.apache.spark.sql.catalyst.expressions._
+import org.apache.spark.sql.catalyst.expressions.aggregate._
+import org.apache.spark.sql.catalyst.plans.logical._
+import org.apache.spark.sql.catalyst.rules.Rule
+import org.apache.spark.sql.execution.datasources.{HadoopFsRelation, 
LogicalRelation}
+
+/**
+ * When scanning only partition columns, get results based on partition 
data without scanning files.
+ * It's used for operators that only need distinct values. Currently only 
[[Aggregate]] operator
+ * which satisfy the following conditions are supported:
+ * 1. aggregate expression is partition columns.
+ *  e.g. SELECT col FROM tbl GROUP BY col.
+ * 2. aggregate function on partition columns with DISTINCT.
+ *  e.g. SELECT count(DISTINCT col1) FROM tbl GROUP BY col2.
+ * 3. aggregate function on partition columns which have same result w or 
w/o DISTINCT keyword.
+ *  e.g. SELECT Max(col2) FROM tbl GROUP BY col1.
+ */
+case class OptimizeMetadataOnly(
+catalog: SessionCatalog,
+conf: CatalystConf) extends Rule[LogicalPlan] {
+
+  def apply(plan: LogicalPlan): LogicalPlan = {
+if (!conf.optimizerMetadataOnly) {
+  return plan
+}
+
+plan.transform {
+  case a @ Aggregate(_, aggExprs, child @ 
PartitionedRelation(partAttrs, relation)) =>
+if (a.references.subsetOf(partAttrs)) {
+  val aggFunctions = aggExprs.flatMap(_.collect {
+case agg: AggregateExpression => agg
+  })
+  val isPartitionDataOnly = aggFunctions.isEmpty || 
aggFunctions.forall { agg =>
+agg.isDistinct || (agg.aggregateFunction match {
+  case _: Max => true
+  case _: Min => true
+  case _ => false
+})
+  }
+  if (isPartitionDataOnly) {
+a.withNewChildren(Seq(usePartitionData(child, relation)))
+  } else {
+a
+  }
+} else {
+  a
+}
+}
+  }
+
+  private def usePartitionData(child: LogicalPlan, relation: LogicalPlan): 
LogicalPlan = {
--- End diff --

actually we should probably just rename this function to make it more self 
evident, e.g. replaceTableScanWithPartitionMetadata



---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #13494: [SPARK-15752] [SQL] support optimization for meta...

2016-07-05 Thread rxin
Github user rxin commented on a diff in the pull request:

https://github.com/apache/spark/pull/13494#discussion_r69674923
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/OptimizeMetadataOnly.scala
 ---
@@ -0,0 +1,133 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution
+
+import org.apache.spark.sql.AnalysisException
+import org.apache.spark.sql.catalyst.{CatalystConf, InternalRow}
+import org.apache.spark.sql.catalyst.catalog.{CatalogRelation, 
SessionCatalog}
+import org.apache.spark.sql.catalyst.expressions._
+import org.apache.spark.sql.catalyst.expressions.aggregate._
+import org.apache.spark.sql.catalyst.plans.logical._
+import org.apache.spark.sql.catalyst.rules.Rule
+import org.apache.spark.sql.execution.datasources.{HadoopFsRelation, 
LogicalRelation}
+
+/**
+ * When scanning only partition columns, get results based on partition 
data without scanning files.
+ * It's used for operators that only need distinct values. Currently only 
[[Aggregate]] operator
+ * which satisfy the following conditions are supported:
+ * 1. aggregate expression is partition columns.
+ *  e.g. SELECT col FROM tbl GROUP BY col.
+ * 2. aggregate function on partition columns with DISTINCT.
+ *  e.g. SELECT count(DISTINCT col1) FROM tbl GROUP BY col2.
+ * 3. aggregate function on partition columns which have same result w or 
w/o DISTINCT keyword.
+ *  e.g. SELECT Max(col2) FROM tbl GROUP BY col1.
+ */
+case class OptimizeMetadataOnly(
+catalog: SessionCatalog,
+conf: CatalystConf) extends Rule[LogicalPlan] {
+
+  def apply(plan: LogicalPlan): LogicalPlan = {
+if (!conf.optimizerMetadataOnly) {
+  return plan
+}
+
+plan.transform {
+  case a @ Aggregate(_, aggExprs, child @ 
PartitionedRelation(partAttrs, relation)) =>
+if (a.references.subsetOf(partAttrs)) {
+  val aggFunctions = aggExprs.flatMap(_.collect {
+case agg: AggregateExpression => agg
+  })
+  val isPartitionDataOnly = aggFunctions.isEmpty || 
aggFunctions.forall { agg =>
+agg.isDistinct || (agg.aggregateFunction match {
--- End diff --

does this work for
```
select col1, col2 from table group by col1
```
?

It'd be good to handle that too.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #13494: [SPARK-15752] [SQL] support optimization for meta...

2016-07-05 Thread rxin
Github user rxin commented on a diff in the pull request:

https://github.com/apache/spark/pull/13494#discussion_r69674624
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/OptimizeMetadataOnly.scala
 ---
@@ -0,0 +1,133 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution
+
+import org.apache.spark.sql.AnalysisException
+import org.apache.spark.sql.catalyst.{CatalystConf, InternalRow}
+import org.apache.spark.sql.catalyst.catalog.{CatalogRelation, 
SessionCatalog}
+import org.apache.spark.sql.catalyst.expressions._
+import org.apache.spark.sql.catalyst.expressions.aggregate._
+import org.apache.spark.sql.catalyst.plans.logical._
+import org.apache.spark.sql.catalyst.rules.Rule
+import org.apache.spark.sql.execution.datasources.{HadoopFsRelation, 
LogicalRelation}
+
+/**
+ * When scanning only partition columns, get results based on partition 
data without scanning files.
+ * It's used for operators that only need distinct values. Currently only 
[[Aggregate]] operator
+ * which satisfy the following conditions are supported:
+ * 1. aggregate expression is partition columns.
+ *  e.g. SELECT col FROM tbl GROUP BY col.
+ * 2. aggregate function on partition columns with DISTINCT.
+ *  e.g. SELECT count(DISTINCT col1) FROM tbl GROUP BY col2.
+ * 3. aggregate function on partition columns which have same result w or 
w/o DISTINCT keyword.
+ *  e.g. SELECT Max(col2) FROM tbl GROUP BY col1.
+ */
+case class OptimizeMetadataOnly(
+catalog: SessionCatalog,
+conf: CatalystConf) extends Rule[LogicalPlan] {
+
+  def apply(plan: LogicalPlan): LogicalPlan = {
+if (!conf.optimizerMetadataOnly) {
+  return plan
+}
+
+plan.transform {
+  case a @ Aggregate(_, aggExprs, child @ 
PartitionedRelation(partAttrs, relation)) =>
+if (a.references.subsetOf(partAttrs)) {
+  val aggFunctions = aggExprs.flatMap(_.collect {
+case agg: AggregateExpression => agg
+  })
+  val isPartitionDataOnly = aggFunctions.isEmpty || 
aggFunctions.forall { agg =>
--- End diff --

i think `aggFunctions.isEmpty` is not necessary, since forall returns true 
if aggFunctions is empty?


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #13494: [SPARK-15752] [SQL] support optimization for meta...

2016-07-05 Thread rxin
Github user rxin commented on a diff in the pull request:

https://github.com/apache/spark/pull/13494#discussion_r69674487
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/OptimizeMetadataOnly.scala
 ---
@@ -0,0 +1,133 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution
+
+import org.apache.spark.sql.AnalysisException
+import org.apache.spark.sql.catalyst.{CatalystConf, InternalRow}
+import org.apache.spark.sql.catalyst.catalog.{CatalogRelation, 
SessionCatalog}
+import org.apache.spark.sql.catalyst.expressions._
+import org.apache.spark.sql.catalyst.expressions.aggregate._
+import org.apache.spark.sql.catalyst.plans.logical._
+import org.apache.spark.sql.catalyst.rules.Rule
+import org.apache.spark.sql.execution.datasources.{HadoopFsRelation, 
LogicalRelation}
+
+/**
+ * When scanning only partition columns, get results based on partition 
data without scanning files.
+ * It's used for operators that only need distinct values. Currently only 
[[Aggregate]] operator
+ * which satisfy the following conditions are supported:
+ * 1. aggregate expression is partition columns.
+ *  e.g. SELECT col FROM tbl GROUP BY col.
+ * 2. aggregate function on partition columns with DISTINCT.
+ *  e.g. SELECT count(DISTINCT col1) FROM tbl GROUP BY col2.
+ * 3. aggregate function on partition columns which have same result w or 
w/o DISTINCT keyword.
+ *  e.g. SELECT Max(col2) FROM tbl GROUP BY col1.
+ */
+case class OptimizeMetadataOnly(
+catalog: SessionCatalog,
+conf: CatalystConf) extends Rule[LogicalPlan] {
+
+  def apply(plan: LogicalPlan): LogicalPlan = {
+if (!conf.optimizerMetadataOnly) {
+  return plan
+}
+
+plan.transform {
+  case a @ Aggregate(_, aggExprs, child @ 
PartitionedRelation(partAttrs, relation)) =>
+if (a.references.subsetOf(partAttrs)) {
+  val aggFunctions = aggExprs.flatMap(_.collect {
+case agg: AggregateExpression => agg
+  })
+  val isPartitionDataOnly = aggFunctions.isEmpty || 
aggFunctions.forall { agg =>
+agg.isDistinct || (agg.aggregateFunction match {
+  case _: Max => true
+  case _: Min => true
+  case _ => false
+})
+  }
+  if (isPartitionDataOnly) {
+a.withNewChildren(Seq(usePartitionData(child, relation)))
+  } else {
+a
+  }
+} else {
+  a
+}
+}
+  }
+
+  private def usePartitionData(child: LogicalPlan, relation: LogicalPlan): 
LogicalPlan = {
+child transform {
+  case plan if plan eq relation =>
+relation match {
+  case l @ LogicalRelation(fsRelation: HadoopFsRelation, _, _) =>
+val partColumns = 
fsRelation.partitionSchema.map(_.name.toLowerCase).toSet
+val partAttrs = l.output.filter(a => 
partColumns.contains(a.name.toLowerCase))
+val partitionData = fsRelation.location.listFiles(Nil)
+LocalRelation(partAttrs, partitionData.map(_.values))
+
+  case relation: CatalogRelation =>
+val partColumns = 
relation.catalogTable.partitionColumnNames.map(_.toLowerCase).toSet
+val partAttrs = relation.output.filter(a => 
partColumns.contains(a.name.toLowerCase))
+val partitionData = 
catalog.listPartitions(relation.catalogTable.identifier).map { p =>
+  InternalRow.fromSeq(partAttrs.map { attr =>
+Cast(Literal(p.spec(attr.name)), attr.dataType).eval()
+  })
+}
+LocalRelation(partAttrs, partitionData)
+
+  case _ => throw new IllegalStateException()
--- End diff --

this cannot be hit unless there is a bug in Spark right? if that's the 
case, add a comment saying that.


---
If your project 

[GitHub] spark pull request #13494: [SPARK-15752] [SQL] support optimization for meta...

2016-07-05 Thread rxin
Github user rxin commented on a diff in the pull request:

https://github.com/apache/spark/pull/13494#discussion_r69674388
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/OptimizeMetadataOnly.scala
 ---
@@ -0,0 +1,133 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution
+
+import org.apache.spark.sql.AnalysisException
+import org.apache.spark.sql.catalyst.{CatalystConf, InternalRow}
+import org.apache.spark.sql.catalyst.catalog.{CatalogRelation, 
SessionCatalog}
+import org.apache.spark.sql.catalyst.expressions._
+import org.apache.spark.sql.catalyst.expressions.aggregate._
+import org.apache.spark.sql.catalyst.plans.logical._
+import org.apache.spark.sql.catalyst.rules.Rule
+import org.apache.spark.sql.execution.datasources.{HadoopFsRelation, 
LogicalRelation}
+
+/**
+ * When scanning only partition columns, get results based on partition 
data without scanning files.
+ * It's used for operators that only need distinct values. Currently only 
[[Aggregate]] operator
+ * which satisfy the following conditions are supported:
+ * 1. aggregate expression is partition columns.
+ *  e.g. SELECT col FROM tbl GROUP BY col.
+ * 2. aggregate function on partition columns with DISTINCT.
+ *  e.g. SELECT count(DISTINCT col1) FROM tbl GROUP BY col2.
+ * 3. aggregate function on partition columns which have same result w or 
w/o DISTINCT keyword.
+ *  e.g. SELECT Max(col2) FROM tbl GROUP BY col1.
+ */
+case class OptimizeMetadataOnly(
+catalog: SessionCatalog,
+conf: CatalystConf) extends Rule[LogicalPlan] {
+
+  def apply(plan: LogicalPlan): LogicalPlan = {
+if (!conf.optimizerMetadataOnly) {
+  return plan
+}
+
+plan.transform {
+  case a @ Aggregate(_, aggExprs, child @ 
PartitionedRelation(partAttrs, relation)) =>
+if (a.references.subsetOf(partAttrs)) {
+  val aggFunctions = aggExprs.flatMap(_.collect {
+case agg: AggregateExpression => agg
+  })
+  val isPartitionDataOnly = aggFunctions.isEmpty || 
aggFunctions.forall { agg =>
+agg.isDistinct || (agg.aggregateFunction match {
+  case _: Max => true
+  case _: Min => true
+  case _ => false
+})
+  }
+  if (isPartitionDataOnly) {
+a.withNewChildren(Seq(usePartitionData(child, relation)))
+  } else {
+a
+  }
+} else {
+  a
+}
+}
+  }
+
+  private def usePartitionData(child: LogicalPlan, relation: LogicalPlan): 
LogicalPlan = {
+child transform {
+  case plan if plan eq relation =>
+relation match {
+  case l @ LogicalRelation(fsRelation: HadoopFsRelation, _, _) =>
+val partColumns = 
fsRelation.partitionSchema.map(_.name.toLowerCase).toSet
+val partAttrs = l.output.filter(a => 
partColumns.contains(a.name.toLowerCase))
+val partitionData = fsRelation.location.listFiles(Nil)
--- End diff --

use named argument, e.g. 
```
val partitionData = fsRelation.location.listFiles(filters = Nil)
```

for a while I was wondering what that argument does.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #13494: [SPARK-15752] [SQL] support optimization for meta...

2016-07-05 Thread rxin
Github user rxin commented on a diff in the pull request:

https://github.com/apache/spark/pull/13494#discussion_r69674280
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/OptimizeMetadataOnly.scala
 ---
@@ -0,0 +1,133 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution
+
+import org.apache.spark.sql.AnalysisException
+import org.apache.spark.sql.catalyst.{CatalystConf, InternalRow}
+import org.apache.spark.sql.catalyst.catalog.{CatalogRelation, 
SessionCatalog}
+import org.apache.spark.sql.catalyst.expressions._
+import org.apache.spark.sql.catalyst.expressions.aggregate._
+import org.apache.spark.sql.catalyst.plans.logical._
+import org.apache.spark.sql.catalyst.rules.Rule
+import org.apache.spark.sql.execution.datasources.{HadoopFsRelation, 
LogicalRelation}
+
+/**
+ * When scanning only partition columns, get results based on partition 
data without scanning files.
+ * It's used for operators that only need distinct values. Currently only 
[[Aggregate]] operator
+ * which satisfy the following conditions are supported:
+ * 1. aggregate expression is partition columns.
+ *  e.g. SELECT col FROM tbl GROUP BY col.
+ * 2. aggregate function on partition columns with DISTINCT.
+ *  e.g. SELECT count(DISTINCT col1) FROM tbl GROUP BY col2.
+ * 3. aggregate function on partition columns which have same result w or 
w/o DISTINCT keyword.
+ *  e.g. SELECT Max(col2) FROM tbl GROUP BY col1.
+ */
+case class OptimizeMetadataOnly(
+catalog: SessionCatalog,
+conf: CatalystConf) extends Rule[LogicalPlan] {
+
+  def apply(plan: LogicalPlan): LogicalPlan = {
+if (!conf.optimizerMetadataOnly) {
+  return plan
+}
+
+plan.transform {
+  case a @ Aggregate(_, aggExprs, child @ 
PartitionedRelation(partAttrs, relation)) =>
+if (a.references.subsetOf(partAttrs)) {
+  val aggFunctions = aggExprs.flatMap(_.collect {
+case agg: AggregateExpression => agg
+  })
+  val isPartitionDataOnly = aggFunctions.isEmpty || 
aggFunctions.forall { agg =>
+agg.isDistinct || (agg.aggregateFunction match {
+  case _: Max => true
+  case _: Min => true
+  case _ => false
+})
+  }
+  if (isPartitionDataOnly) {
+a.withNewChildren(Seq(usePartitionData(child, relation)))
+  } else {
+a
+  }
+} else {
+  a
+}
+}
+  }
+
+  private def usePartitionData(child: LogicalPlan, relation: LogicalPlan): 
LogicalPlan = {
--- End diff --

we need comment explaining what this function does


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #13494: [SPARK-15752] [SQL] support optimization for meta...

2016-07-05 Thread rxin
Github user rxin commented on a diff in the pull request:

https://github.com/apache/spark/pull/13494#discussion_r69674209
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/OptimizeMetadataOnly.scala
 ---
@@ -0,0 +1,133 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution
+
+import org.apache.spark.sql.AnalysisException
+import org.apache.spark.sql.catalyst.{CatalystConf, InternalRow}
+import org.apache.spark.sql.catalyst.catalog.{CatalogRelation, 
SessionCatalog}
+import org.apache.spark.sql.catalyst.expressions._
+import org.apache.spark.sql.catalyst.expressions.aggregate._
+import org.apache.spark.sql.catalyst.plans.logical._
+import org.apache.spark.sql.catalyst.rules.Rule
+import org.apache.spark.sql.execution.datasources.{HadoopFsRelation, 
LogicalRelation}
+
+/**
+ * When scanning only partition columns, get results based on partition 
data without scanning files.
+ * It's used for operators that only need distinct values. Currently only 
[[Aggregate]] operator
+ * which satisfy the following conditions are supported:
+ * 1. aggregate expression is partition columns.
+ *  e.g. SELECT col FROM tbl GROUP BY col.
+ * 2. aggregate function on partition columns with DISTINCT.
+ *  e.g. SELECT count(DISTINCT col1) FROM tbl GROUP BY col2.
--- End diff --

OK I would say something like the following to make it very explicit.

```
This rule optimizes the execution of queries that can be answered by 
looking only at partition-level metadata.
This applies when all the columns scanned are partition columns, and the 
query has an aggregate operator that satisfies the following conditions:
1.
2.
3.
```


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #13494: [SPARK-15752] [SQL] support optimization for meta...

2016-07-05 Thread cloud-fan
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/13494#discussion_r69673088
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/OptimizeMetadataOnly.scala
 ---
@@ -0,0 +1,133 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution
+
+import org.apache.spark.sql.AnalysisException
+import org.apache.spark.sql.catalyst.{CatalystConf, InternalRow}
+import org.apache.spark.sql.catalyst.catalog.{CatalogRelation, 
SessionCatalog}
+import org.apache.spark.sql.catalyst.expressions._
+import org.apache.spark.sql.catalyst.expressions.aggregate._
+import org.apache.spark.sql.catalyst.plans.logical._
+import org.apache.spark.sql.catalyst.rules.Rule
+import org.apache.spark.sql.execution.datasources.{HadoopFsRelation, 
LogicalRelation}
+
+/**
+ * When scanning only partition columns, get results based on partition 
data without scanning files.
+ * It's used for operators that only need distinct values. Currently only 
[[Aggregate]] operator
+ * which satisfy the following conditions are supported:
+ * 1. aggregate expression is partition columns.
+ *  e.g. SELECT col FROM tbl GROUP BY col.
+ * 2. aggregate function on partition columns with DISTINCT.
+ *  e.g. SELECT count(DISTINCT col1) FROM tbl GROUP BY col2.
--- End diff --

Yea, it's indicated by the first sentence of the class doc: `When scanning 
only partition columns`


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #13494: [SPARK-15752] [SQL] support optimization for meta...

2016-07-05 Thread rxin
Github user rxin commented on a diff in the pull request:

https://github.com/apache/spark/pull/13494#discussion_r69673047
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/OptimizeMetadataOnly.scala
 ---
@@ -0,0 +1,133 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution
+
+import org.apache.spark.sql.AnalysisException
+import org.apache.spark.sql.catalyst.{CatalystConf, InternalRow}
+import org.apache.spark.sql.catalyst.catalog.{CatalogRelation, 
SessionCatalog}
+import org.apache.spark.sql.catalyst.expressions._
+import org.apache.spark.sql.catalyst.expressions.aggregate._
+import org.apache.spark.sql.catalyst.plans.logical._
+import org.apache.spark.sql.catalyst.rules.Rule
+import org.apache.spark.sql.execution.datasources.{HadoopFsRelation, 
LogicalRelation}
+
+/**
+ * When scanning only partition columns, get results based on partition 
data without scanning files.
+ * It's used for operators that only need distinct values. Currently only 
[[Aggregate]] operator
+ * which satisfy the following conditions are supported:
+ * 1. aggregate expression is partition columns.
+ *  e.g. SELECT col FROM tbl GROUP BY col.
+ * 2. aggregate function on partition columns with DISTINCT.
+ *  e.g. SELECT count(DISTINCT col1) FROM tbl GROUP BY col2.
+ * 3. aggregate function on partition columns which have same result w or 
w/o DISTINCT keyword.
+ *  e.g. SELECT Max(col2) FROM tbl GROUP BY col1.
+ */
+case class OptimizeMetadataOnly(
--- End diff --

OptimizeMetadataOnlyQuery

if you don't add "query" it is not clear to me this actually optimizing a 
query or optimizing some metadata operation.



---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #13494: [SPARK-15752] [SQL] support optimization for meta...

2016-07-05 Thread rxin
Github user rxin commented on a diff in the pull request:

https://github.com/apache/spark/pull/13494#discussion_r69672989
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/OptimizeMetadataOnly.scala
 ---
@@ -0,0 +1,133 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution
+
+import org.apache.spark.sql.AnalysisException
+import org.apache.spark.sql.catalyst.{CatalystConf, InternalRow}
+import org.apache.spark.sql.catalyst.catalog.{CatalogRelation, 
SessionCatalog}
+import org.apache.spark.sql.catalyst.expressions._
+import org.apache.spark.sql.catalyst.expressions.aggregate._
+import org.apache.spark.sql.catalyst.plans.logical._
+import org.apache.spark.sql.catalyst.rules.Rule
+import org.apache.spark.sql.execution.datasources.{HadoopFsRelation, 
LogicalRelation}
+
+/**
+ * When scanning only partition columns, get results based on partition 
data without scanning files.
+ * It's used for operators that only need distinct values. Currently only 
[[Aggregate]] operator
+ * which satisfy the following conditions are supported:
+ * 1. aggregate expression is partition columns.
+ *  e.g. SELECT col FROM tbl GROUP BY col.
+ * 2. aggregate function on partition columns with DISTINCT.
+ *  e.g. SELECT count(DISTINCT col1) FROM tbl GROUP BY col2.
--- End diff --

does col2 need to be a partition column?


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #13494: [SPARK-15752] [SQL] support optimization for meta...

2016-07-05 Thread lianhuiwang
Github user lianhuiwang commented on a diff in the pull request:

https://github.com/apache/spark/pull/13494#discussion_r69670547
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/SparkOptimizer.scala ---
@@ -30,6 +30,7 @@ class SparkOptimizer(
   extends Optimizer(catalog, conf) {
 
   override def batches: Seq[Batch] = super.batches :+
+Batch("Metadata Only Optimization", Once, 
MetadataOnlyOptimizer(catalog, conf)) :+
--- End diff --

ok, Thanks. I will update it.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #13494: [SPARK-15752] [SQL] support optimization for meta...

2016-07-05 Thread rxin
Github user rxin commented on a diff in the pull request:

https://github.com/apache/spark/pull/13494#discussion_r69657897
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/SparkOptimizer.scala ---
@@ -30,6 +30,7 @@ class SparkOptimizer(
   extends Optimizer(catalog, conf) {
 
   override def batches: Seq[Batch] = super.batches :+
+Batch("Metadata Only Optimization", Once, 
MetadataOnlyOptimizer(catalog, conf)) :+
--- End diff --

"Optimize Metadata Only Query"


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #13494: [SPARK-15752] [SQL] support optimization for meta...

2016-07-04 Thread lianhuiwang
Github user lianhuiwang commented on a diff in the pull request:

https://github.com/apache/spark/pull/13494#discussion_r69440435
  
--- Diff: 
sql/core/src/test/scala/org/apache/spark/sql/execution/MetadataOnlyOptimizerSuite.scala
 ---
@@ -0,0 +1,87 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution
+
+import org.apache.spark.sql._
+import org.apache.spark.sql.catalyst.plans.logical.LocalRelation
+import org.apache.spark.sql.internal.SQLConf
+import org.apache.spark.sql.test.SharedSQLContext
+
+class MetadataOnlyOptimizerSuite extends QueryTest with SharedSQLContext {
+  import testImplicits._
+
+  override def beforeAll(): Unit = {
+super.beforeAll()
+val data = (1 to 10).map(i => (i, s"data-$i", i % 2, if ((i % 2) == 0) 
"even" else "odd"))
+  .toDF("id", "data", "partId", "part")
+data.write.partitionBy("partId", 
"part").mode("append").saveAsTable("srcpart_15752")
+  }
+
+  private def checkWithMetadataOnly(df: DataFrame): Unit = {
+val localRelations = df.queryExecution.optimizedPlan.collect {
+  case l @ LocalRelation(_, _) => l
+}
+assert(localRelations.size == 1)
+  }
+
+  private def checkWithoutMetadataOnly(df: DataFrame): Unit = {
+val localRelations = df.queryExecution.optimizedPlan.collect{
+  case l @ LocalRelation(_, _) => l
+}
+assert(localRelations.size == 0)
+  }
+
+  test("spark-15752 metadata only optimizer for partition table") {
+withSQLConf(SQLConf.OPTIMIZER_METADATA_ONLY.key -> "true") {
+  checkWithMetadataOnly(sql("select part from srcpart_15752 where part 
= 0 group by part"))
+  checkWithMetadataOnly(sql("select max(part) from srcpart_15752"))
+  checkWithMetadataOnly(sql("select max(part) from srcpart_15752 where 
part = 0"))
+  checkWithMetadataOnly(
+sql("select part, min(partId) from srcpart_15752 where part = 0 
group by part"))
+  checkWithMetadataOnly(
+sql("select max(x) from (select part + 1 as x from srcpart_15752 
where part = 1) t"))
+  checkWithMetadataOnly(sql("select distinct part from srcpart_15752"))
+  checkWithMetadataOnly(sql("select distinct part, partId from 
srcpart_15752"))
+  checkWithMetadataOnly(
+sql("select distinct x from (select part + 1 as x from 
srcpart_15752 where part = 0) t"))
+
+  // Now donot support metadata only optimizer
+  checkWithoutMetadataOnly(sql("select part, max(id) from 
srcpart_15752 group by part"))
+  checkWithoutMetadataOnly(sql("select distinct part, id from 
srcpart_15752"))
+  checkWithoutMetadataOnly(sql("select part, sum(partId) from 
srcpart_15752 group by part"))
+  checkWithoutMetadataOnly(
+sql("select part from srcpart_15752 where part = 1 group by 
rollup(part)"))
+  checkWithoutMetadataOnly(
+sql("select part from (select part from srcpart_15752 where part = 
0 union all " +
--- End diff --

Yes, I think it is not difficult.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #13494: [SPARK-15752] [SQL] support optimization for meta...

2016-07-04 Thread lianhuiwang
Github user lianhuiwang commented on a diff in the pull request:

https://github.com/apache/spark/pull/13494#discussion_r69440261
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/SparkOptimizer.scala ---
@@ -30,6 +30,7 @@ class SparkOptimizer(
   extends Optimizer(catalog, conf) {
 
   override def batches: Seq[Batch] = super.batches :+
+Batch("Metadata Only Optimization", Once, 
MetadataOnlyOptimizer(catalog, conf)) :+
--- End diff --

BTW: I find that currently Hive/Presto/Impala have used Metadata Only.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #13494: [SPARK-15752] [SQL] support optimization for meta...

2016-07-04 Thread lianhuiwang
Github user lianhuiwang commented on a diff in the pull request:

https://github.com/apache/spark/pull/13494#discussion_r69439940
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/MetadataOnlyOptimizer.scala
 ---
@@ -0,0 +1,133 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution
+
+import org.apache.spark.sql.AnalysisException
+import org.apache.spark.sql.catalyst.{CatalystConf, InternalRow}
+import org.apache.spark.sql.catalyst.catalog.{CatalogRelation, 
SessionCatalog}
+import org.apache.spark.sql.catalyst.expressions._
+import org.apache.spark.sql.catalyst.expressions.aggregate._
+import org.apache.spark.sql.catalyst.plans.logical._
+import org.apache.spark.sql.catalyst.rules.Rule
+import org.apache.spark.sql.execution.datasources.{HadoopFsRelation, 
LogicalRelation}
+
+/**
+ * When scanning only partition columns, get results based on partition 
data without scanning files.
+ * It's used for operators that only need distinct values. Currently only 
[[Aggregate]] operator
+ * which satisfy the following conditions are supported:
+ * 1. aggregate expression is partition columns.
+ *  e.g. SELECT col FROM tbl GROUP BY col.
+ * 2. aggregate function on partition columns with DISTINCT.
+ *  e.g. SELECT count(DISTINCT col) FROM tbl GROUP BY col.
+ * 3. aggregate function on partition columns which have same result w or 
w/o DISTINCT keyword.
+ *  e.g. SELECT Max(col2) FROM tbl GROUP BY col1.
+ */
+case class MetadataOnlyOptimizer(
+catalog: SessionCatalog,
+conf: CatalystConf) extends Rule[LogicalPlan] {
+
+  def apply(plan: LogicalPlan): LogicalPlan = {
+if (!conf.optimizerMetadataOnly) {
+  return plan
+}
+
+plan.transform {
+  case a @ Aggregate(_, aggExprs, child @ 
PartitionedRelation(partAttrs, relation)) =>
+if (a.references.subsetOf(partAttrs)) {
+  val aggFunctions = aggExprs.flatMap(_.collect {
+case agg: AggregateExpression => agg
+  })
+  val isPartitionDataOnly = aggFunctions.isEmpty || 
aggFunctions.forall { agg =>
+agg.isDistinct || (agg.aggregateFunction match {
+  case _: Max => true
+  case _: Min => true
+  case _ => false
+})
+  }
+  if (isPartitionDataOnly) {
+a.withNewChildren(Seq(usePartitionData(child, relation)))
+  } else {
+a
+  }
+} else {
+  a
+}
+}
+  }
+
+  private def usePartitionData(child: LogicalPlan, relation: LogicalPlan): 
LogicalPlan = {
+child transform {
+  case plan if plan eq relation =>
+relation match {
+  case l @ LogicalRelation(fsRelation: HadoopFsRelation, _, _) =>
+val partColumns = 
fsRelation.partitionSchema.map(_.name.toLowerCase).toSet
+val partAttrs = l.output.filter(a => 
partColumns.contains(a.name.toLowerCase))
+val partitionData = fsRelation.location.listFiles(Nil)
+LocalRelation(partAttrs, partitionData.map(_.values))
--- End diff --

How about sizeInBytes of LocalRelation's statistics is used to determine 
the parallelism? Initial code is 
https://github.com/apache/spark/pull/13979/commits/2ca01f26df7572251136d2c059299f846cf8a3f1.
 


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #13494: [SPARK-15752] [SQL] support optimization for meta...

2016-07-03 Thread cloud-fan
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/13494#discussion_r69395021
  
--- Diff: sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala 
---
@@ -2865,4 +2865,15 @@ class SQLQuerySuite extends QueryTest with 
SharedSQLContext {
   sql(s"SELECT '$literal' AS DUMMY"),
   Row(s"$expected") :: Nil)
   }
+
+  test("spark-15752 metadata only optimizer for datasource table") {
+val data = (1 to 10).map(i => (i, s"data-$i", i % 2, if ((i % 2) == 0) 
"even" else "odd"))
--- End diff --

also use `withSQLConf` here to make sure the optimization is enabled.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #13494: [SPARK-15752] [SQL] support optimization for meta...

2016-07-03 Thread cloud-fan
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/13494#discussion_r69395014
  
--- Diff: 
sql/core/src/test/scala/org/apache/spark/sql/execution/MetadataOnlyOptimizerSuite.scala
 ---
@@ -0,0 +1,87 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution
+
+import org.apache.spark.sql._
+import org.apache.spark.sql.catalyst.plans.logical.LocalRelation
+import org.apache.spark.sql.internal.SQLConf
+import org.apache.spark.sql.test.SharedSQLContext
+
+class MetadataOnlyOptimizerSuite extends QueryTest with SharedSQLContext {
+  import testImplicits._
+
+  override def beforeAll(): Unit = {
+super.beforeAll()
+val data = (1 to 10).map(i => (i, s"data-$i", i % 2, if ((i % 2) == 0) 
"even" else "odd"))
+  .toDF("id", "data", "partId", "part")
+data.write.partitionBy("partId", 
"part").mode("append").saveAsTable("srcpart_15752")
+  }
+
+  private def checkWithMetadataOnly(df: DataFrame): Unit = {
+val localRelations = df.queryExecution.optimizedPlan.collect {
+  case l @ LocalRelation(_, _) => l
+}
+assert(localRelations.size == 1)
+  }
+
+  private def checkWithoutMetadataOnly(df: DataFrame): Unit = {
+val localRelations = df.queryExecution.optimizedPlan.collect{
+  case l @ LocalRelation(_, _) => l
+}
+assert(localRelations.size == 0)
+  }
+
+  test("spark-15752 metadata only optimizer for partition table") {
+withSQLConf(SQLConf.OPTIMIZER_METADATA_ONLY.key -> "true") {
+  checkWithMetadataOnly(sql("select part from srcpart_15752 where part 
= 0 group by part"))
+  checkWithMetadataOnly(sql("select max(part) from srcpart_15752"))
+  checkWithMetadataOnly(sql("select max(part) from srcpart_15752 where 
part = 0"))
+  checkWithMetadataOnly(
+sql("select part, min(partId) from srcpart_15752 where part = 0 
group by part"))
+  checkWithMetadataOnly(
+sql("select max(x) from (select part + 1 as x from srcpart_15752 
where part = 1) t"))
+  checkWithMetadataOnly(sql("select distinct part from srcpart_15752"))
+  checkWithMetadataOnly(sql("select distinct part, partId from 
srcpart_15752"))
+  checkWithMetadataOnly(
+sql("select distinct x from (select part + 1 as x from 
srcpart_15752 where part = 0) t"))
+
+  // Now donot support metadata only optimizer
+  checkWithoutMetadataOnly(sql("select part, max(id) from 
srcpart_15752 group by part"))
+  checkWithoutMetadataOnly(sql("select distinct part, id from 
srcpart_15752"))
+  checkWithoutMetadataOnly(sql("select part, sum(partId) from 
srcpart_15752 group by part"))
+  checkWithoutMetadataOnly(
+sql("select part from srcpart_15752 where part = 1 group by 
rollup(part)"))
+  checkWithoutMetadataOnly(
+sql("select part from (select part from srcpart_15752 where part = 
0 union all " +
--- End diff --

the last 2 cases can be added in follow-up PRs :)


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #13494: [SPARK-15752] [SQL] support optimization for meta...

2016-07-03 Thread cloud-fan
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/13494#discussion_r69394970
  
--- Diff: 
sql/core/src/test/scala/org/apache/spark/sql/execution/MetadataOnlyOptimizerSuite.scala
 ---
@@ -0,0 +1,87 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution
+
+import org.apache.spark.sql._
+import org.apache.spark.sql.catalyst.plans.logical.LocalRelation
+import org.apache.spark.sql.internal.SQLConf
+import org.apache.spark.sql.test.SharedSQLContext
+
+class MetadataOnlyOptimizerSuite extends QueryTest with SharedSQLContext {
+  import testImplicits._
+
+  override def beforeAll(): Unit = {
+super.beforeAll()
+val data = (1 to 10).map(i => (i, s"data-$i", i % 2, if ((i % 2) == 0) 
"even" else "odd"))
+  .toDF("id", "data", "partId", "part")
+data.write.partitionBy("partId", 
"part").mode("append").saveAsTable("srcpart_15752")
--- End diff --

The session is shared among all test suites, so we should drop the table 
after all tests here, or we may pollute other test suites.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #13494: [SPARK-15752] [SQL] support optimization for meta...

2016-07-03 Thread cloud-fan
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/13494#discussion_r69394921
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/SparkOptimizer.scala ---
@@ -30,6 +30,7 @@ class SparkOptimizer(
   extends Optimizer(catalog, conf) {
 
   override def batches: Seq[Batch] = super.batches :+
+Batch("Metadata Only Optimization", Once, 
MetadataOnlyOptimizer(catalog, conf)) :+
--- End diff --

`Metadata Only` seems not so intuitive, any ideas here? cc @yhuai 
@liancheng 


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #13494: [SPARK-15752] [SQL] support optimization for meta...

2016-07-03 Thread cloud-fan
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/13494#discussion_r69394890
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/MetadataOnlyOptimizer.scala
 ---
@@ -0,0 +1,133 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution
+
+import org.apache.spark.sql.AnalysisException
+import org.apache.spark.sql.catalyst.{CatalystConf, InternalRow}
+import org.apache.spark.sql.catalyst.catalog.{CatalogRelation, 
SessionCatalog}
+import org.apache.spark.sql.catalyst.expressions._
+import org.apache.spark.sql.catalyst.expressions.aggregate._
+import org.apache.spark.sql.catalyst.plans.logical._
+import org.apache.spark.sql.catalyst.rules.Rule
+import org.apache.spark.sql.execution.datasources.{HadoopFsRelation, 
LogicalRelation}
+
+/**
+ * When scanning only partition columns, get results based on partition 
data without scanning files.
+ * It's used for operators that only need distinct values. Currently only 
[[Aggregate]] operator
+ * which satisfy the following conditions are supported:
+ * 1. aggregate expression is partition columns.
+ *  e.g. SELECT col FROM tbl GROUP BY col.
+ * 2. aggregate function on partition columns with DISTINCT.
+ *  e.g. SELECT count(DISTINCT col) FROM tbl GROUP BY col.
+ * 3. aggregate function on partition columns which have same result w or 
w/o DISTINCT keyword.
+ *  e.g. SELECT Max(col2) FROM tbl GROUP BY col1.
+ */
+case class MetadataOnlyOptimizer(
+catalog: SessionCatalog,
+conf: CatalystConf) extends Rule[LogicalPlan] {
+
+  def apply(plan: LogicalPlan): LogicalPlan = {
+if (!conf.optimizerMetadataOnly) {
+  return plan
+}
+
+plan.transform {
+  case a @ Aggregate(_, aggExprs, child @ 
PartitionedRelation(partAttrs, relation)) =>
+if (a.references.subsetOf(partAttrs)) {
+  val aggFunctions = aggExprs.flatMap(_.collect {
+case agg: AggregateExpression => agg
+  })
+  val isPartitionDataOnly = aggFunctions.isEmpty || 
aggFunctions.forall { agg =>
+agg.isDistinct || (agg.aggregateFunction match {
+  case _: Max => true
+  case _: Min => true
+  case _ => false
+})
+  }
+  if (isPartitionDataOnly) {
+a.withNewChildren(Seq(usePartitionData(child, relation)))
+  } else {
+a
+  }
+} else {
+  a
+}
+}
+  }
+
+  private def usePartitionData(child: LogicalPlan, relation: LogicalPlan): 
LogicalPlan = {
+child transform {
+  case plan if plan eq relation =>
+relation match {
+  case l @ LogicalRelation(fsRelation: HadoopFsRelation, _, _) =>
+val partColumns = 
fsRelation.partitionSchema.map(_.name.toLowerCase).toSet
+val partAttrs = l.output.filter(a => 
partColumns.contains(a.name.toLowerCase))
+val partitionData = fsRelation.location.listFiles(Nil)
+LocalRelation(partAttrs, partitionData.map(_.values))
--- End diff --

This is something we need to discuss, there may be a lot of partition 
values and using `LocalRelation` may not give enough parallelism here.
cc @yhuai @liancheng 


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #13494: [SPARK-15752] [SQL] support optimization for meta...

2016-07-03 Thread cloud-fan
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/13494#discussion_r69394848
  
--- Diff: sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala 
---
@@ -2865,4 +2865,15 @@ class SQLQuerySuite extends QueryTest with 
SharedSQLContext {
   sql(s"SELECT '$literal' AS DUMMY"),
   Row(s"$expected") :: Nil)
   }
+
+  test("spark-15752 metadata only optimizer for datasource table") {
+val data = (1 to 10).map(i => (i, s"data-$i", i % 2, if ((i % 2) == 0) 
"even" else "odd"))
--- End diff --

put these codes inside `withTable` when we create a table in the tests, 
then our framework will clean up the table automatically.
```
withTable("xxx") {
  ...
}
```


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #13494: [SPARK-15752] [SQL] support optimization for meta...

2016-07-03 Thread cloud-fan
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/13494#discussion_r69394772
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/MetadataOnlyOptimizer.scala
 ---
@@ -0,0 +1,133 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution
+
+import org.apache.spark.sql.AnalysisException
+import org.apache.spark.sql.catalyst.{CatalystConf, InternalRow}
+import org.apache.spark.sql.catalyst.catalog.{CatalogRelation, 
SessionCatalog}
+import org.apache.spark.sql.catalyst.expressions._
+import org.apache.spark.sql.catalyst.expressions.aggregate._
+import org.apache.spark.sql.catalyst.plans.logical._
+import org.apache.spark.sql.catalyst.rules.Rule
+import org.apache.spark.sql.execution.datasources.{HadoopFsRelation, 
LogicalRelation}
+
+/**
+ * When scanning only partition columns, get results based on partition 
data without scanning files.
+ * It's used for operators that only need distinct values. Currently only 
[[Aggregate]] operator
+ * which satisfy the following conditions are supported:
+ * 1. aggregate expression is partition columns.
+ *  e.g. SELECT col FROM tbl GROUP BY col.
+ * 2. aggregate function on partition columns with DISTINCT.
+ *  e.g. SELECT count(DISTINCT col) FROM tbl GROUP BY col.
--- End diff --

this example is wrong, we can not aggregate on grouping columns, it should 
be `SELECT count(DISTINCT col1) FROM tbl GROUP BY col2`


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #13494: [SPARK-15752] [SQL] support optimization for meta...

2016-06-28 Thread cloud-fan
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/13494#discussion_r68882374
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/MetadataOnlyOptimizer.scala
 ---
@@ -0,0 +1,171 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution
+
+import org.apache.spark.sql.{AnalysisException, SparkSession}
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.catalyst.catalog.{CatalogRelation, 
SessionCatalog}
+import org.apache.spark.sql.catalyst.expressions._
+import org.apache.spark.sql.catalyst.expressions.aggregate._
+import org.apache.spark.sql.catalyst.plans.logical._
+import org.apache.spark.sql.catalyst.rules.Rule
+import org.apache.spark.sql.execution.datasources.{HadoopFsRelation, 
LogicalRelation}
+
+/**
+ * When scanning only partition columns, get results based on metadata 
without scanning files.
+ * It is used for distinct, distinct aggregations or distinct-like 
aggregations(example: Max/Min).
+ * First of all, scanning only partition columns are required, then the 
rule does the following
+ * things here:
+ * 1. aggregate expression is partition columns,
+ *  e.g. SELECT col FROM tbl GROUP BY col or SELECT col FROM tbl GROUP BY 
cube(col).
+ * 2. aggregate function on partition columns with DISTINCT,
+ *  e.g. SELECT count(DISTINCT col) FROM tbl GROUP BY col.
+ * 3. aggregate function on partition columns which have same result with 
DISTINCT keyword.
+ *  e.g. SELECT Max(col2) FROM tbl GROUP BY col1.
+ */
+case class MetadataOnlyOptimizer(
+sparkSession: SparkSession,
+catalog: SessionCatalog) extends Rule[LogicalPlan] {
+
+  private def canSupportMetadataOnly(a: Aggregate): Boolean = {
+val aggregateExpressions = a.aggregateExpressions.flatMap { expr =>
+  expr.collect {
+case agg: AggregateExpression => agg
+  }
+}.distinct
+if (aggregateExpressions.isEmpty) {
+  // Support for aggregate that has no aggregateFunction when 
expressions are partition columns
+  // example: select partitionCol from table group by partitionCol.
+  // Moreover, multiple-distinct has been rewritted into it by 
RewriteDistinctAggregates.
+  true
+} else {
+  aggregateExpressions.forall { agg =>
+if (agg.isDistinct) {
+  true
+} else {
+  // If function can be evaluated on just the distinct values of a 
column, it can be used
+  // by metadata-only optimizer.
+  agg.aggregateFunction match {
+case max: Max => true
+case min: Min => true
+case hyperLog: HyperLogLogPlusPlus => true
+case _ => false
+  }
+}
+  }
+}
+  }
+
+  private def convertLogicalToMetadataOnly(
+  project: LogicalPlan,
+  filter: Option[Expression],
+  logical: LogicalRelation,
+  files: HadoopFsRelation): LogicalPlan = {
+val attributeMap = logical.output.map(attr => (attr.name, attr)).toMap
+val partitionColumns = files.partitionSchema.map { field =>
+  attributeMap.getOrElse(field.name, throw new AnalysisException(
+s"Unable to resolve ${field.name} given 
[${logical.output.map(_.name).mkString(", ")}]"))
+}
+val projectSet = filter.map(project.references ++ 
_.references).getOrElse(project.references)
+if (projectSet.subsetOf(AttributeSet(partitionColumns))) {
+  val selectedPartitions = 
files.location.listFiles(filter.map(Seq(_)).getOrElse(Seq.empty))
+  val valuesRdd = 
sparkSession.sparkContext.parallelize(selectedPartitions.map(_.values), 1)
+  val valuesPlan = LogicalRDD(partitionColumns, 
valuesRdd)(sparkSession)
+  valuesPlan
+} else {
+  logical
+}
+  }
+
+  private def convertCatalogToMetadataOnly(
+  project: 

[GitHub] spark pull request #13494: [SPARK-15752] [SQL] support optimization for meta...

2016-06-28 Thread cloud-fan
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/13494#discussion_r68881745
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/MetadataOnlyOptimizer.scala
 ---
@@ -0,0 +1,171 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution
+
+import org.apache.spark.sql.{AnalysisException, SparkSession}
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.catalyst.catalog.{CatalogRelation, 
SessionCatalog}
+import org.apache.spark.sql.catalyst.expressions._
+import org.apache.spark.sql.catalyst.expressions.aggregate._
+import org.apache.spark.sql.catalyst.plans.logical._
+import org.apache.spark.sql.catalyst.rules.Rule
+import org.apache.spark.sql.execution.datasources.{HadoopFsRelation, 
LogicalRelation}
+
+/**
+ * When scanning only partition columns, get results based on metadata 
without scanning files.
+ * It is used for distinct, distinct aggregations or distinct-like 
aggregations(example: Max/Min).
+ * First of all, scanning only partition columns are required, then the 
rule does the following
+ * things here:
+ * 1. aggregate expression is partition columns,
+ *  e.g. SELECT col FROM tbl GROUP BY col or SELECT col FROM tbl GROUP BY 
cube(col).
+ * 2. aggregate function on partition columns with DISTINCT,
+ *  e.g. SELECT count(DISTINCT col) FROM tbl GROUP BY col.
+ * 3. aggregate function on partition columns which have same result with 
DISTINCT keyword.
+ *  e.g. SELECT Max(col2) FROM tbl GROUP BY col1.
+ */
+case class MetadataOnlyOptimizer(
+sparkSession: SparkSession,
+catalog: SessionCatalog) extends Rule[LogicalPlan] {
+
+  private def canSupportMetadataOnly(a: Aggregate): Boolean = {
+val aggregateExpressions = a.aggregateExpressions.flatMap { expr =>
--- End diff --

where do we check it only requires partition columns?


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #13494: [SPARK-15752] [SQL] support optimization for meta...

2016-06-28 Thread cloud-fan
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/13494#discussion_r68881475
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/MetadataOnlyOptimizer.scala
 ---
@@ -0,0 +1,171 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution
+
+import org.apache.spark.sql.{AnalysisException, SparkSession}
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.catalyst.catalog.{CatalogRelation, 
SessionCatalog}
+import org.apache.spark.sql.catalyst.expressions._
+import org.apache.spark.sql.catalyst.expressions.aggregate._
+import org.apache.spark.sql.catalyst.plans.logical._
+import org.apache.spark.sql.catalyst.rules.Rule
+import org.apache.spark.sql.execution.datasources.{HadoopFsRelation, 
LogicalRelation}
+
+/**
+ * When scanning only partition columns, get results based on metadata 
without scanning files.
+ * It is used for distinct, distinct aggregations or distinct-like 
aggregations(example: Max/Min).
--- End diff --

how about
```
It's used for operators that only need distinct values. Currently only 
[[Aggregate]] operator which satisfy the following conditions are supported:
1. .
2. .
```


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #13494: [SPARK-15752] [SQL] support optimization for meta...

2016-06-27 Thread cloud-fan
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/13494#discussion_r68580134
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/MetadataOnlyOptimizer.scala
 ---
@@ -0,0 +1,190 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution
+
+import org.apache.spark.sql.{AnalysisException, SparkSession}
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.catalyst.catalog.{CatalogRelation, 
SessionCatalog}
+import org.apache.spark.sql.catalyst.expressions._
+import org.apache.spark.sql.catalyst.expressions.aggregate._
+import org.apache.spark.sql.catalyst.plans.logical._
+import org.apache.spark.sql.catalyst.rules.Rule
+import org.apache.spark.sql.execution.datasources.{HadoopFsRelation, 
LogicalRelation}
+
+/**
+ * When scanning only partition columns, get results based on metadata 
without scanning files.
+ * It is used for distinct, distinct aggregations or distinct-like 
aggregations(example: Max/Min).
+ * Example: select Max(partition) from table.
+ */
+case class MetadataOnlyOptimizer(
+sparkSession: SparkSession,
+catalog: SessionCatalog) extends Rule[LogicalPlan] {
+
+  private def canSupportMetadataOnly(a: Aggregate): Boolean = {
+val aggregateExpressions = a.aggregateExpressions.flatMap { expr =>
+  expr.collect {
+case agg: AggregateExpression => agg
+  }
+}.distinct
+if (aggregateExpressions.isEmpty) {
+  // Cannot support for aggregate that has no aggregateFunction.
+  // example: select col1 from table group by col1.
+  false
+} else {
+  aggregateExpressions.forall { agg =>
+if (agg.isDistinct) {
+  true
+} else {
+  // If function can be evaluated on just the distinct values of a 
column, it can be used
+  // by metadata-only optimizer.
+  agg.aggregateFunction match {
+case max: Max => true
+case min: Min => true
+case hyperLog: HyperLogLogPlusPlus => true
+case _ => false
+  }
+}
+  }
+}
+  }
+
+  private def collectAliases(fields: Seq[Expression]): Map[ExprId, 
Expression] = fields.collect {
+case a @ Alias(child, _) => a.toAttribute.exprId -> child
+  }.toMap
+
+  private def substitute(aliases: Map[ExprId, Expression])(expr: 
Expression): Expression = {
+expr.transform {
+  case a @ Alias(ref: AttributeReference, name) =>
+aliases.get(ref.exprId)
+  .map(Alias(_, name)(a.exprId, a.qualifier, isGenerated = 
a.isGenerated))
+  .getOrElse(a)
+
+  case a: AttributeReference =>
+aliases.get(a.exprId)
+  .map(Alias(_, a.name)(a.exprId, a.qualifier, isGenerated = 
a.isGenerated)).getOrElse(a)
+}
+  }
+
+  private def findRelation(plan: LogicalPlan)
+  : (Option[LogicalPlan], Seq[NamedExpression], Seq[Expression], 
Map[ExprId, Expression]) = {
+plan match {
+  case relation @ LogicalRelation(files: HadoopFsRelation, _, table)
+if files.partitionSchema.nonEmpty =>
+(Some(relation), Seq.empty[NamedExpression], 
Seq.empty[Expression], Map.empty)
+
+  case relation: CatalogRelation if 
relation.catalogTable.partitionColumnNames.nonEmpty =>
+(Some(relation), Seq.empty[NamedExpression], 
Seq.empty[Expression], Map.empty)
+
+  case p @ Project(fields, child) if fields.forall(_.deterministic) =>
+val (plan, _, filters, aliases) = findRelation(child)
+val substitutedFields = 
fields.map(substitute(aliases)).asInstanceOf[Seq[NamedExpression]]
+(plan, substitutedFields, filters, 
collectAliases(substitutedFields))
+
+  case f @ Filter(condition, child) if condition.deterministic =>
+val (plan, fields, filters, 

[GitHub] spark pull request #13494: [SPARK-15752] [SQL] support optimization for meta...

2016-06-27 Thread cloud-fan
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/13494#discussion_r68536931
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/MetadataOnlyOptimizer.scala
 ---
@@ -0,0 +1,190 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution
+
+import org.apache.spark.sql.{AnalysisException, SparkSession}
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.catalyst.catalog.{CatalogRelation, 
SessionCatalog}
+import org.apache.spark.sql.catalyst.expressions._
+import org.apache.spark.sql.catalyst.expressions.aggregate._
+import org.apache.spark.sql.catalyst.plans.logical._
+import org.apache.spark.sql.catalyst.rules.Rule
+import org.apache.spark.sql.execution.datasources.{HadoopFsRelation, 
LogicalRelation}
+
+/**
+ * When scanning only partition columns, get results based on metadata 
without scanning files.
+ * It is used for distinct, distinct aggregations or distinct-like 
aggregations(example: Max/Min).
+ * Example: select Max(partition) from table.
+ */
+case class MetadataOnlyOptimizer(
+sparkSession: SparkSession,
+catalog: SessionCatalog) extends Rule[LogicalPlan] {
+
+  private def canSupportMetadataOnly(a: Aggregate): Boolean = {
+val aggregateExpressions = a.aggregateExpressions.flatMap { expr =>
+  expr.collect {
+case agg: AggregateExpression => agg
+  }
+}.distinct
+if (aggregateExpressions.isEmpty) {
+  // Cannot support for aggregate that has no aggregateFunction.
+  // example: select col1 from table group by col1.
+  false
+} else {
+  aggregateExpressions.forall { agg =>
+if (agg.isDistinct) {
+  true
+} else {
+  // If function can be evaluated on just the distinct values of a 
column, it can be used
+  // by metadata-only optimizer.
+  agg.aggregateFunction match {
+case max: Max => true
+case min: Min => true
+case hyperLog: HyperLogLogPlusPlus => true
+case _ => false
+  }
+}
+  }
+}
+  }
+
+  private def collectAliases(fields: Seq[Expression]): Map[ExprId, 
Expression] = fields.collect {
+case a @ Alias(child, _) => a.toAttribute.exprId -> child
+  }.toMap
+
+  private def substitute(aliases: Map[ExprId, Expression])(expr: 
Expression): Expression = {
+expr.transform {
+  case a @ Alias(ref: AttributeReference, name) =>
+aliases.get(ref.exprId)
+  .map(Alias(_, name)(a.exprId, a.qualifier, isGenerated = 
a.isGenerated))
+  .getOrElse(a)
+
+  case a: AttributeReference =>
+aliases.get(a.exprId)
+  .map(Alias(_, a.name)(a.exprId, a.qualifier, isGenerated = 
a.isGenerated)).getOrElse(a)
+}
+  }
+
+  private def findRelation(plan: LogicalPlan)
+  : (Option[LogicalPlan], Seq[NamedExpression], Seq[Expression], 
Map[ExprId, Expression]) = {
+plan match {
+  case relation @ LogicalRelation(files: HadoopFsRelation, _, table)
+if files.partitionSchema.nonEmpty =>
+(Some(relation), Seq.empty[NamedExpression], 
Seq.empty[Expression], Map.empty)
+
+  case relation: CatalogRelation if 
relation.catalogTable.partitionColumnNames.nonEmpty =>
+(Some(relation), Seq.empty[NamedExpression], 
Seq.empty[Expression], Map.empty)
+
+  case p @ Project(fields, child) if fields.forall(_.deterministic) =>
+val (plan, _, filters, aliases) = findRelation(child)
+val substitutedFields = 
fields.map(substitute(aliases)).asInstanceOf[Seq[NamedExpression]]
+(plan, substitutedFields, filters, 
collectAliases(substitutedFields))
+
+  case f @ Filter(condition, child) if condition.deterministic =>
+val (plan, fields, filters, 

[GitHub] spark pull request #13494: [SPARK-15752] [SQL] support optimization for meta...

2016-06-27 Thread cloud-fan
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/13494#discussion_r68536827
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/MetadataOnlyOptimizer.scala
 ---
@@ -0,0 +1,190 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution
+
+import org.apache.spark.sql.{AnalysisException, SparkSession}
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.catalyst.catalog.{CatalogRelation, 
SessionCatalog}
+import org.apache.spark.sql.catalyst.expressions._
+import org.apache.spark.sql.catalyst.expressions.aggregate._
+import org.apache.spark.sql.catalyst.plans.logical._
+import org.apache.spark.sql.catalyst.rules.Rule
+import org.apache.spark.sql.execution.datasources.{HadoopFsRelation, 
LogicalRelation}
+
+/**
+ * When scanning only partition columns, get results based on metadata 
without scanning files.
+ * It is used for distinct, distinct aggregations or distinct-like 
aggregations(example: Max/Min).
+ * Example: select Max(partition) from table.
+ */
+case class MetadataOnlyOptimizer(
+sparkSession: SparkSession,
+catalog: SessionCatalog) extends Rule[LogicalPlan] {
+
+  private def canSupportMetadataOnly(a: Aggregate): Boolean = {
+val aggregateExpressions = a.aggregateExpressions.flatMap { expr =>
+  expr.collect {
+case agg: AggregateExpression => agg
+  }
+}.distinct
+if (aggregateExpressions.isEmpty) {
+  // Cannot support for aggregate that has no aggregateFunction.
+  // example: select col1 from table group by col1.
+  false
+} else {
+  aggregateExpressions.forall { agg =>
+if (agg.isDistinct) {
+  true
+} else {
+  // If function can be evaluated on just the distinct values of a 
column, it can be used
+  // by metadata-only optimizer.
+  agg.aggregateFunction match {
+case max: Max => true
+case min: Min => true
+case hyperLog: HyperLogLogPlusPlus => true
+case _ => false
+  }
+}
+  }
+}
+  }
+
+  private def collectAliases(fields: Seq[Expression]): Map[ExprId, 
Expression] = fields.collect {
+case a @ Alias(child, _) => a.toAttribute.exprId -> child
+  }.toMap
+
+  private def substitute(aliases: Map[ExprId, Expression])(expr: 
Expression): Expression = {
+expr.transform {
+  case a @ Alias(ref: AttributeReference, name) =>
+aliases.get(ref.exprId)
+  .map(Alias(_, name)(a.exprId, a.qualifier, isGenerated = 
a.isGenerated))
+  .getOrElse(a)
+
+  case a: AttributeReference =>
+aliases.get(a.exprId)
+  .map(Alias(_, a.name)(a.exprId, a.qualifier, isGenerated = 
a.isGenerated)).getOrElse(a)
+}
+  }
+
+  private def findRelation(plan: LogicalPlan)
+  : (Option[LogicalPlan], Seq[NamedExpression], Seq[Expression], 
Map[ExprId, Expression]) = {
+plan match {
+  case relation @ LogicalRelation(files: HadoopFsRelation, _, table)
+if files.partitionSchema.nonEmpty =>
+(Some(relation), Seq.empty[NamedExpression], 
Seq.empty[Expression], Map.empty)
+
+  case relation: CatalogRelation if 
relation.catalogTable.partitionColumnNames.nonEmpty =>
+(Some(relation), Seq.empty[NamedExpression], 
Seq.empty[Expression], Map.empty)
+
+  case p @ Project(fields, child) if fields.forall(_.deterministic) =>
+val (plan, _, filters, aliases) = findRelation(child)
+val substitutedFields = 
fields.map(substitute(aliases)).asInstanceOf[Seq[NamedExpression]]
+(plan, substitutedFields, filters, 
collectAliases(substitutedFields))
+
+  case f @ Filter(condition, child) if condition.deterministic =>
+val (plan, fields, filters, 

[GitHub] spark pull request #13494: [SPARK-15752] [SQL] support optimization for meta...

2016-06-27 Thread cloud-fan
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/13494#discussion_r68536426
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/MetadataOnlyOptimizer.scala
 ---
@@ -0,0 +1,190 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution
+
+import org.apache.spark.sql.{AnalysisException, SparkSession}
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.catalyst.catalog.{CatalogRelation, 
SessionCatalog}
+import org.apache.spark.sql.catalyst.expressions._
+import org.apache.spark.sql.catalyst.expressions.aggregate._
+import org.apache.spark.sql.catalyst.plans.logical._
+import org.apache.spark.sql.catalyst.rules.Rule
+import org.apache.spark.sql.execution.datasources.{HadoopFsRelation, 
LogicalRelation}
+
+/**
+ * When scanning only partition columns, get results based on metadata 
without scanning files.
+ * It is used for distinct, distinct aggregations or distinct-like 
aggregations(example: Max/Min).
+ * Example: select Max(partition) from table.
+ */
+case class MetadataOnlyOptimizer(
+sparkSession: SparkSession,
+catalog: SessionCatalog) extends Rule[LogicalPlan] {
+
+  private def canSupportMetadataOnly(a: Aggregate): Boolean = {
+val aggregateExpressions = a.aggregateExpressions.flatMap { expr =>
+  expr.collect {
+case agg: AggregateExpression => agg
+  }
+}.distinct
+if (aggregateExpressions.isEmpty) {
+  // Cannot support for aggregate that has no aggregateFunction.
+  // example: select col1 from table group by col1.
+  false
+} else {
+  aggregateExpressions.forall { agg =>
+if (agg.isDistinct) {
+  true
+} else {
+  // If function can be evaluated on just the distinct values of a 
column, it can be used
+  // by metadata-only optimizer.
+  agg.aggregateFunction match {
+case max: Max => true
+case min: Min => true
+case hyperLog: HyperLogLogPlusPlus => true
+case _ => false
+  }
+}
+  }
+}
+  }
+
+  private def collectAliases(fields: Seq[Expression]): Map[ExprId, 
Expression] = fields.collect {
+case a @ Alias(child, _) => a.toAttribute.exprId -> child
+  }.toMap
+
+  private def substitute(aliases: Map[ExprId, Expression])(expr: 
Expression): Expression = {
+expr.transform {
+  case a @ Alias(ref: AttributeReference, name) =>
+aliases.get(ref.exprId)
+  .map(Alias(_, name)(a.exprId, a.qualifier, isGenerated = 
a.isGenerated))
+  .getOrElse(a)
+
+  case a: AttributeReference =>
+aliases.get(a.exprId)
+  .map(Alias(_, a.name)(a.exprId, a.qualifier, isGenerated = 
a.isGenerated)).getOrElse(a)
+}
+  }
+
+  private def findRelation(plan: LogicalPlan)
+  : (Option[LogicalPlan], Seq[NamedExpression], Seq[Expression], 
Map[ExprId, Expression]) = {
+plan match {
+  case relation @ LogicalRelation(files: HadoopFsRelation, _, table)
+if files.partitionSchema.nonEmpty =>
+(Some(relation), Seq.empty[NamedExpression], 
Seq.empty[Expression], Map.empty)
+
+  case relation: CatalogRelation if 
relation.catalogTable.partitionColumnNames.nonEmpty =>
+(Some(relation), Seq.empty[NamedExpression], 
Seq.empty[Expression], Map.empty)
+
+  case p @ Project(fields, child) if fields.forall(_.deterministic) =>
+val (plan, _, filters, aliases) = findRelation(child)
+val substitutedFields = 
fields.map(substitute(aliases)).asInstanceOf[Seq[NamedExpression]]
+(plan, substitutedFields, filters, 
collectAliases(substitutedFields))
+
+  case f @ Filter(condition, child) if condition.deterministic =>
+val (plan, fields, filters, 

[GitHub] spark pull request #13494: [SPARK-15752] [SQL] support optimization for meta...

2016-06-27 Thread cloud-fan
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/13494#discussion_r68536010
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/MetadataOnlyOptimizer.scala
 ---
@@ -0,0 +1,190 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution
+
+import org.apache.spark.sql.{AnalysisException, SparkSession}
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.catalyst.catalog.{CatalogRelation, 
SessionCatalog}
+import org.apache.spark.sql.catalyst.expressions._
+import org.apache.spark.sql.catalyst.expressions.aggregate._
+import org.apache.spark.sql.catalyst.plans.logical._
+import org.apache.spark.sql.catalyst.rules.Rule
+import org.apache.spark.sql.execution.datasources.{HadoopFsRelation, 
LogicalRelation}
+
+/**
+ * When scanning only partition columns, get results based on metadata 
without scanning files.
+ * It is used for distinct, distinct aggregations or distinct-like 
aggregations(example: Max/Min).
+ * Example: select Max(partition) from table.
+ */
+case class MetadataOnlyOptimizer(
+sparkSession: SparkSession,
+catalog: SessionCatalog) extends Rule[LogicalPlan] {
+
+  private def canSupportMetadataOnly(a: Aggregate): Boolean = {
--- End diff --

here is my thoughts about the optimizable cases:

First of all, only parition colums are required(which means we need to 
traverse down the plan tree and find table relation here)

1. aggregate expression is partition columns, e.g. `SELECT col FROM tbl 
GROUP BY col`
2. aggregate function on partition columns with DISTINCT, e.g. `SELECT 
count(DISTINCT a) FROM tbl GROUP BY b`
3. aggregate function on partition columns which have same result with or 
without DISTINCT keyword, e.g. `SELECT sum(a) FROM tbl GROUP BY b`


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #13494: [SPARK-15752] [SQL] support optimization for meta...

2016-06-24 Thread cloud-fan
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/13494#discussion_r68377541
  
--- Diff: 
sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala ---
@@ -457,6 +458,125 @@ private[hive] class 
HiveMetastoreCatalog(sparkSession: SparkSession) extends Log
   allowExisting)
 }
   }
+
+  /**
+   * When scanning only partition columns, get results based on metadata 
without scanning files.
+   * It is used for distinct or distinct/Max/Min aggregations, example: 
max(partition).
+   */
+  object MetadataOnlyOptimizer extends Rule[LogicalPlan] {
+
+private def canSupportMetadataOnly(a: Aggregate): Boolean = {
+  val aggregateExpressions = a.aggregateExpressions.flatMap { expr =>
+expr.collect {
+  case agg: AggregateExpression => agg
+}
+  }.distinct
+  aggregateExpressions.forall { agg =>
+if (agg.isDistinct) {
+  true
+} else {
+  agg.aggregateFunction match {
+case max: Max => true
+case min: Min => true
+case _ => false
+  }
+}
+  }
+}
+
+private def findRelation(plan: LogicalPlan): (Option[LogicalPlan], 
Seq[Expression]) = {
+  plan match {
+case relation @ LogicalRelation(files: HadoopFsRelation, _, table)
+  if files.partitionSchema.nonEmpty =>
+  (Some(relation), Seq.empty[Expression])
+
+case relation: MetastoreRelation if 
relation.partitionKeys.nonEmpty =>
+  (Some(relation), Seq.empty[Expression])
+
+case p @ Project(_, child) =>
+  findRelation(child)
+
+case f @ Filter(filterCondition, child) =>
+  val (plan, conditions) = findRelation(child)
+  (plan, conditions ++ Seq(filterCondition))
+
+case SubqueryAlias(_, child) =>
+  findRelation(child)
+
+case _ => (None, Seq.empty[Expression])
+  }
+}
+
+private def convertToMetadataOnlyPlan(
+parent: LogicalPlan,
+project: Option[LogicalPlan],
+filters: Seq[Expression],
+relation: LogicalPlan): LogicalPlan = relation match {
+  case l @ LogicalRelation(files: HadoopFsRelation, _, _) =>
+val attributeMap = l.output.map(attr => (attr.name, attr)).toMap
+val partitionColumns = files.partitionSchema.map { field =>
+  attributeMap.getOrElse(field.name, throw new AnalysisException(
+s"Unable to resolve ${field.name} given 
[${l.output.map(_.name).mkString(", ")}]"))
+}
+val filterColumns = filters.flatMap(_.references)
+val projectSet = parent.references ++ AttributeSet(filterColumns)
+if (projectSet.subsetOf(AttributeSet(partitionColumns))) {
+  val selectedPartitions = files.location.listFiles(filters)
+  val partitionValues = selectedPartitions.map(_.values)
+  val valuesRdd = 
sparkSession.sparkContext.parallelize(partitionValues, 1)
+  val valuesPlan = LogicalRDD(partitionColumns, 
valuesRdd)(sparkSession)
+  val scanPlan = project.map(_.withNewChildren(valuesPlan :: 
Nil)).getOrElse(valuesPlan)
+  parent.withNewChildren(scanPlan :: Nil)
+} else {
+  parent
+}
+
+  case relation: MetastoreRelation =>
+if 
(parent.references.subsetOf(AttributeSet(relation.partitionKeys))) {
+  val partitionColumnDataTypes = 
relation.partitionKeys.map(_.dataType)
+  val partitionValues = relation.getHiveQlPartitions(filters).map 
{ p =>
+
InternalRow.fromSeq(p.getValues.asScala.zip(partitionColumnDataTypes).map {
+  case (rawValue, dataType) => Cast(Literal(rawValue), 
dataType).eval(null)
+})
+  }
+  val valuesRdd = 
sparkSession.sparkContext.parallelize(partitionValues, 1)
+  val valuesPlan = LogicalRDD(relation.partitionKeys, 
valuesRdd)(sparkSession)
+  val filterPlan =
+filters.reduceLeftOption(And).map(Filter(_, 
valuesPlan)).getOrElse(valuesPlan)
+  val scanPlan = project.map(_.withNewChildren(filterPlan :: 
Nil)).getOrElse(filterPlan)
+  parent.withNewChildren(scanPlan :: Nil)
+} else {
+  parent
+}
+
+  case _ =>
+parent
+}
+
+def apply(plan: LogicalPlan): LogicalPlan = {
+  if (!sparkSession.sessionState.conf.optimizerMetadataOnly) {
+return plan
+  }
+  plan.transform {
+case a @ Aggregate(_, _, child) if canSupportMetadataOnly(a) =>
+ 

[GitHub] spark pull request #13494: [SPARK-15752] [SQL] support optimization for meta...

2016-06-24 Thread cloud-fan
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/13494#discussion_r68377009
  
--- Diff: 
sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala ---
@@ -457,6 +458,125 @@ private[hive] class 
HiveMetastoreCatalog(sparkSession: SparkSession) extends Log
   allowExisting)
 }
   }
+
+  /**
+   * When scanning only partition columns, get results based on metadata 
without scanning files.
+   * It is used for distinct or distinct/Max/Min aggregations, example: 
max(partition).
+   */
+  object MetadataOnlyOptimizer extends Rule[LogicalPlan] {
+
+private def canSupportMetadataOnly(a: Aggregate): Boolean = {
+  val aggregateExpressions = a.aggregateExpressions.flatMap { expr =>
+expr.collect {
+  case agg: AggregateExpression => agg
+}
+  }.distinct
+  aggregateExpressions.forall { agg =>
+if (agg.isDistinct) {
+  true
+} else {
+  agg.aggregateFunction match {
+case max: Max => true
+case min: Min => true
+case _ => false
+  }
+}
+  }
+}
+
+private def findRelation(plan: LogicalPlan): (Option[LogicalPlan], 
Seq[Expression]) = {
+  plan match {
+case relation @ LogicalRelation(files: HadoopFsRelation, _, table)
+  if files.partitionSchema.nonEmpty =>
+  (Some(relation), Seq.empty[Expression])
+
+case relation: MetastoreRelation if 
relation.partitionKeys.nonEmpty =>
+  (Some(relation), Seq.empty[Expression])
+
+case p @ Project(_, child) =>
--- End diff --

is it safe to keep traversing the plan tree through `Project`?


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #13494: [SPARK-15752] [SQL] support optimization for meta...

2016-06-24 Thread cloud-fan
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/13494#discussion_r68375799
  
--- Diff: 
sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala ---
@@ -457,6 +458,125 @@ private[hive] class 
HiveMetastoreCatalog(sparkSession: SparkSession) extends Log
   allowExisting)
 }
   }
+
+  /**
+   * When scanning only partition columns, get results based on metadata 
without scanning files.
+   * It is used for distinct or distinct/Max/Min aggregations, example: 
max(partition).
+   */
+  object MetadataOnlyOptimizer extends Rule[LogicalPlan] {
+
+private def canSupportMetadataOnly(a: Aggregate): Boolean = {
+  val aggregateExpressions = a.aggregateExpressions.flatMap { expr =>
+expr.collect {
+  case agg: AggregateExpression => agg
+}
+  }.distinct
+  aggregateExpressions.forall { agg =>
+if (agg.isDistinct) {
+  true
+} else {
+  agg.aggregateFunction match {
+case max: Max => true
+case min: Min => true
+case _ => false
+  }
+}
+  }
+}
+
+private def findRelation(plan: LogicalPlan): (Option[LogicalPlan], 
Seq[Expression]) = {
+  plan match {
+case relation @ LogicalRelation(files: HadoopFsRelation, _, table)
+  if files.partitionSchema.nonEmpty =>
+  (Some(relation), Seq.empty[Expression])
+
+case relation: MetastoreRelation if 
relation.partitionKeys.nonEmpty =>
--- End diff --

`MetastoreRelation` extends `CatalogRelation`, I think we can put this rule 
in sql core instead of hive module.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #13494: [SPARK-15752] [SQL] support optimization for meta...

2016-06-24 Thread cloud-fan
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/13494#discussion_r68375446
  
--- Diff: 
sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala ---
@@ -457,6 +458,125 @@ private[hive] class 
HiveMetastoreCatalog(sparkSession: SparkSession) extends Log
   allowExisting)
 }
   }
+
+  /**
+   * When scanning only partition columns, get results based on metadata 
without scanning files.
+   * It is used for distinct or distinct/Max/Min aggregations, example: 
max(partition).
+   */
+  object MetadataOnlyOptimizer extends Rule[LogicalPlan] {
+
+private def canSupportMetadataOnly(a: Aggregate): Boolean = {
+  val aggregateExpressions = a.aggregateExpressions.flatMap { expr =>
+expr.collect {
+  case agg: AggregateExpression => agg
+}
+  }.distinct
+  aggregateExpressions.forall { agg =>
--- End diff --

can we add more comments to explain the condition to enable metadata 
optimization? e.g. the agg expression must only reference to partition columns, 
all distinct agg functions, `Max` and `Min`, etc.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #13494: [SPARK-15752] [SQL] support optimization for meta...

2016-06-24 Thread cloud-fan
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/13494#discussion_r68374833
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala ---
@@ -258,6 +258,11 @@ object SQLConf {
   .booleanConf
   .createWithDefault(false)
 
+  val OPTIMIZER_METADATA_ONLY = 
SQLConfigBuilder("spark.sql.optimizer.metadataOnly")
+.doc("When true, enable the metadata-only query optimization.")
+.booleanConf
+.createWithDefault(false)
--- End diff --

can we turn it on by default?


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #13494: [SPARK-15752] [SQL] support optimization for meta...

2016-06-23 Thread lianhuiwang
Github user lianhuiwang commented on a diff in the pull request:

https://github.com/apache/spark/pull/13494#discussion_r68204608
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileSourceStrategy.scala
 ---
@@ -109,108 +111,45 @@ private[sql] object FileSourceStrategy extends 
Strategy with Logging {
   val pushedDownFilters = 
dataFilters.flatMap(DataSourceStrategy.translateFilter)
   logInfo(s"Pushed Filters: ${pushedDownFilters.mkString(",")}")
 
-  val readFile = files.fileFormat.buildReaderWithPartitionValues(
-sparkSession = files.sparkSession,
-dataSchema = files.dataSchema,
-partitionSchema = files.partitionSchema,
-requiredSchema = prunedDataSchema,
-filters = pushedDownFilters,
-options = files.options,
-hadoopConf = 
files.sparkSession.sessionState.newHadoopConfWithOptions(files.options))
-
-  val plannedPartitions = files.bucketSpec match {
-case Some(bucketing) if 
files.sparkSession.sessionState.conf.bucketingEnabled =>
-  logInfo(s"Planning with ${bucketing.numBuckets} buckets")
-  val bucketed =
-selectedPartitions.flatMap { p =>
-  p.files.map { f =>
-val hosts = getBlockHosts(getBlockLocations(f), 0, 
f.getLen)
-PartitionedFile(p.values, f.getPath.toUri.toString, 0, 
f.getLen, hosts)
-  }
-}.groupBy { f =>
-  BucketingUtils
-.getBucketId(new Path(f.filePath).getName)
-.getOrElse(sys.error(s"Invalid bucket file ${f.filePath}"))
-}
-
-  (0 until bucketing.numBuckets).map { bucketId =>
-FilePartition(bucketId, bucketed.getOrElse(bucketId, Nil))
-  }
-
-case _ =>
-  val defaultMaxSplitBytes = 
files.sparkSession.sessionState.conf.filesMaxPartitionBytes
-  val openCostInBytes = 
files.sparkSession.sessionState.conf.filesOpenCostInBytes
-  val defaultParallelism = 
files.sparkSession.sparkContext.defaultParallelism
-  val totalBytes = selectedPartitions.flatMap(_.files.map(_.getLen 
+ openCostInBytes)).sum
-  val bytesPerCore = totalBytes / defaultParallelism
-  val maxSplitBytes = Math.min(defaultMaxSplitBytes,
-Math.max(openCostInBytes, bytesPerCore))
-  logInfo(s"Planning scan with bin packing, max size: 
$maxSplitBytes bytes, " +
-s"open cost is considered as scanning $openCostInBytes bytes.")
-
-  val splitFiles = selectedPartitions.flatMap { partition =>
-partition.files.flatMap { file =>
-  val blockLocations = getBlockLocations(file)
-  (0L until file.getLen by maxSplitBytes).map { offset =>
-val remaining = file.getLen - offset
-val size = if (remaining > maxSplitBytes) maxSplitBytes 
else remaining
-val hosts = getBlockHosts(blockLocations, offset, size)
-PartitionedFile(partition.values, 
file.getPath.toUri.toString, offset, size, hosts)
-  }
-}
-  }.toArray.sortBy(_.length)(implicitly[Ordering[Long]].reverse)
-
-  val partitions = new ArrayBuffer[FilePartition]
-  val currentFiles = new ArrayBuffer[PartitionedFile]
-  var currentSize = 0L
-
-  /** Add the given file to the current partition. */
-  def addFile(file: PartitionedFile): Unit = {
-currentSize += file.length + openCostInBytes
-currentFiles.append(file)
-  }
-
-  /** Close the current partition and move to the next. */
-  def closePartition(): Unit = {
-if (currentFiles.nonEmpty) {
-  val newPartition =
-FilePartition(
-  partitions.size,
-  currentFiles.toArray.toSeq) // Copy to a new Array.
-  partitions.append(newPartition)
-}
-currentFiles.clear()
-currentSize = 0
-  }
-
-  // Assign files to partitions using "First Fit Decreasing" (FFD)
-  // TODO: consider adding a slop factor here?
-  splitFiles.foreach { file =>
-if (currentSize + file.length > maxSplitBytes) {
-  closePartition()
-}
-addFile(file)
-  }
-  closePartition()
-  partitions
+  val optimizerMetadataOnly =
+readDataColumns.isEmpty && 
files.sparkSession.sessionState.conf.optimizerMetadataOnly
+  val scanRdd: RDD[InternalRow] = if (optimizerMetadataOnly) {

[GitHub] spark pull request #13494: [SPARK-15752] [SQL] support optimization for meta...

2016-06-23 Thread cloud-fan
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/13494#discussion_r68193963
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileSourceStrategy.scala
 ---
@@ -109,108 +111,45 @@ private[sql] object FileSourceStrategy extends 
Strategy with Logging {
   val pushedDownFilters = 
dataFilters.flatMap(DataSourceStrategy.translateFilter)
   logInfo(s"Pushed Filters: ${pushedDownFilters.mkString(",")}")
 
-  val readFile = files.fileFormat.buildReaderWithPartitionValues(
-sparkSession = files.sparkSession,
-dataSchema = files.dataSchema,
-partitionSchema = files.partitionSchema,
-requiredSchema = prunedDataSchema,
-filters = pushedDownFilters,
-options = files.options,
-hadoopConf = 
files.sparkSession.sessionState.newHadoopConfWithOptions(files.options))
-
-  val plannedPartitions = files.bucketSpec match {
-case Some(bucketing) if 
files.sparkSession.sessionState.conf.bucketingEnabled =>
-  logInfo(s"Planning with ${bucketing.numBuckets} buckets")
-  val bucketed =
-selectedPartitions.flatMap { p =>
-  p.files.map { f =>
-val hosts = getBlockHosts(getBlockLocations(f), 0, 
f.getLen)
-PartitionedFile(p.values, f.getPath.toUri.toString, 0, 
f.getLen, hosts)
-  }
-}.groupBy { f =>
-  BucketingUtils
-.getBucketId(new Path(f.filePath).getName)
-.getOrElse(sys.error(s"Invalid bucket file ${f.filePath}"))
-}
-
-  (0 until bucketing.numBuckets).map { bucketId =>
-FilePartition(bucketId, bucketed.getOrElse(bucketId, Nil))
-  }
-
-case _ =>
-  val defaultMaxSplitBytes = 
files.sparkSession.sessionState.conf.filesMaxPartitionBytes
-  val openCostInBytes = 
files.sparkSession.sessionState.conf.filesOpenCostInBytes
-  val defaultParallelism = 
files.sparkSession.sparkContext.defaultParallelism
-  val totalBytes = selectedPartitions.flatMap(_.files.map(_.getLen 
+ openCostInBytes)).sum
-  val bytesPerCore = totalBytes / defaultParallelism
-  val maxSplitBytes = Math.min(defaultMaxSplitBytes,
-Math.max(openCostInBytes, bytesPerCore))
-  logInfo(s"Planning scan with bin packing, max size: 
$maxSplitBytes bytes, " +
-s"open cost is considered as scanning $openCostInBytes bytes.")
-
-  val splitFiles = selectedPartitions.flatMap { partition =>
-partition.files.flatMap { file =>
-  val blockLocations = getBlockLocations(file)
-  (0L until file.getLen by maxSplitBytes).map { offset =>
-val remaining = file.getLen - offset
-val size = if (remaining > maxSplitBytes) maxSplitBytes 
else remaining
-val hosts = getBlockHosts(blockLocations, offset, size)
-PartitionedFile(partition.values, 
file.getPath.toUri.toString, offset, size, hosts)
-  }
-}
-  }.toArray.sortBy(_.length)(implicitly[Ordering[Long]].reverse)
-
-  val partitions = new ArrayBuffer[FilePartition]
-  val currentFiles = new ArrayBuffer[PartitionedFile]
-  var currentSize = 0L
-
-  /** Add the given file to the current partition. */
-  def addFile(file: PartitionedFile): Unit = {
-currentSize += file.length + openCostInBytes
-currentFiles.append(file)
-  }
-
-  /** Close the current partition and move to the next. */
-  def closePartition(): Unit = {
-if (currentFiles.nonEmpty) {
-  val newPartition =
-FilePartition(
-  partitions.size,
-  currentFiles.toArray.toSeq) // Copy to a new Array.
-  partitions.append(newPartition)
-}
-currentFiles.clear()
-currentSize = 0
-  }
-
-  // Assign files to partitions using "First Fit Decreasing" (FFD)
-  // TODO: consider adding a slop factor here?
-  splitFiles.foreach { file =>
-if (currentSize + file.length > maxSplitBytes) {
-  closePartition()
-}
-addFile(file)
-  }
-  closePartition()
-  partitions
+  val optimizerMetadataOnly =
+readDataColumns.isEmpty && 
files.sparkSession.sessionState.conf.optimizerMetadataOnly
+  val scanRdd: RDD[InternalRow] = if (optimizerMetadataOnly) {
  

[GitHub] spark pull request #13494: [SPARK-15752] [SQL] support optimization for meta...

2016-06-23 Thread lianhuiwang
Github user lianhuiwang commented on a diff in the pull request:

https://github.com/apache/spark/pull/13494#discussion_r68190320
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileSourceStrategy.scala
 ---
@@ -109,108 +111,45 @@ private[sql] object FileSourceStrategy extends 
Strategy with Logging {
   val pushedDownFilters = 
dataFilters.flatMap(DataSourceStrategy.translateFilter)
   logInfo(s"Pushed Filters: ${pushedDownFilters.mkString(",")}")
 
-  val readFile = files.fileFormat.buildReaderWithPartitionValues(
-sparkSession = files.sparkSession,
-dataSchema = files.dataSchema,
-partitionSchema = files.partitionSchema,
-requiredSchema = prunedDataSchema,
-filters = pushedDownFilters,
-options = files.options,
-hadoopConf = 
files.sparkSession.sessionState.newHadoopConfWithOptions(files.options))
-
-  val plannedPartitions = files.bucketSpec match {
-case Some(bucketing) if 
files.sparkSession.sessionState.conf.bucketingEnabled =>
-  logInfo(s"Planning with ${bucketing.numBuckets} buckets")
-  val bucketed =
-selectedPartitions.flatMap { p =>
-  p.files.map { f =>
-val hosts = getBlockHosts(getBlockLocations(f), 0, 
f.getLen)
-PartitionedFile(p.values, f.getPath.toUri.toString, 0, 
f.getLen, hosts)
-  }
-}.groupBy { f =>
-  BucketingUtils
-.getBucketId(new Path(f.filePath).getName)
-.getOrElse(sys.error(s"Invalid bucket file ${f.filePath}"))
-}
-
-  (0 until bucketing.numBuckets).map { bucketId =>
-FilePartition(bucketId, bucketed.getOrElse(bucketId, Nil))
-  }
-
-case _ =>
-  val defaultMaxSplitBytes = 
files.sparkSession.sessionState.conf.filesMaxPartitionBytes
-  val openCostInBytes = 
files.sparkSession.sessionState.conf.filesOpenCostInBytes
-  val defaultParallelism = 
files.sparkSession.sparkContext.defaultParallelism
-  val totalBytes = selectedPartitions.flatMap(_.files.map(_.getLen 
+ openCostInBytes)).sum
-  val bytesPerCore = totalBytes / defaultParallelism
-  val maxSplitBytes = Math.min(defaultMaxSplitBytes,
-Math.max(openCostInBytes, bytesPerCore))
-  logInfo(s"Planning scan with bin packing, max size: 
$maxSplitBytes bytes, " +
-s"open cost is considered as scanning $openCostInBytes bytes.")
-
-  val splitFiles = selectedPartitions.flatMap { partition =>
-partition.files.flatMap { file =>
-  val blockLocations = getBlockLocations(file)
-  (0L until file.getLen by maxSplitBytes).map { offset =>
-val remaining = file.getLen - offset
-val size = if (remaining > maxSplitBytes) maxSplitBytes 
else remaining
-val hosts = getBlockHosts(blockLocations, offset, size)
-PartitionedFile(partition.values, 
file.getPath.toUri.toString, offset, size, hosts)
-  }
-}
-  }.toArray.sortBy(_.length)(implicitly[Ordering[Long]].reverse)
-
-  val partitions = new ArrayBuffer[FilePartition]
-  val currentFiles = new ArrayBuffer[PartitionedFile]
-  var currentSize = 0L
-
-  /** Add the given file to the current partition. */
-  def addFile(file: PartitionedFile): Unit = {
-currentSize += file.length + openCostInBytes
-currentFiles.append(file)
-  }
-
-  /** Close the current partition and move to the next. */
-  def closePartition(): Unit = {
-if (currentFiles.nonEmpty) {
-  val newPartition =
-FilePartition(
-  partitions.size,
-  currentFiles.toArray.toSeq) // Copy to a new Array.
-  partitions.append(newPartition)
-}
-currentFiles.clear()
-currentSize = 0
-  }
-
-  // Assign files to partitions using "First Fit Decreasing" (FFD)
-  // TODO: consider adding a slop factor here?
-  splitFiles.foreach { file =>
-if (currentSize + file.length > maxSplitBytes) {
-  closePartition()
-}
-addFile(file)
-  }
-  closePartition()
-  partitions
+  val optimizerMetadataOnly =
+readDataColumns.isEmpty && 
files.sparkSession.sessionState.conf.optimizerMetadataOnly
+  val scanRdd: RDD[InternalRow] = if (optimizerMetadataOnly) {

[GitHub] spark pull request #13494: [SPARK-15752] [SQL] support optimization for meta...

2016-06-23 Thread cloud-fan
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/13494#discussion_r68186202
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileSourceStrategy.scala
 ---
@@ -109,108 +111,45 @@ private[sql] object FileSourceStrategy extends 
Strategy with Logging {
   val pushedDownFilters = 
dataFilters.flatMap(DataSourceStrategy.translateFilter)
   logInfo(s"Pushed Filters: ${pushedDownFilters.mkString(",")}")
 
-  val readFile = files.fileFormat.buildReaderWithPartitionValues(
-sparkSession = files.sparkSession,
-dataSchema = files.dataSchema,
-partitionSchema = files.partitionSchema,
-requiredSchema = prunedDataSchema,
-filters = pushedDownFilters,
-options = files.options,
-hadoopConf = 
files.sparkSession.sessionState.newHadoopConfWithOptions(files.options))
-
-  val plannedPartitions = files.bucketSpec match {
-case Some(bucketing) if 
files.sparkSession.sessionState.conf.bucketingEnabled =>
-  logInfo(s"Planning with ${bucketing.numBuckets} buckets")
-  val bucketed =
-selectedPartitions.flatMap { p =>
-  p.files.map { f =>
-val hosts = getBlockHosts(getBlockLocations(f), 0, 
f.getLen)
-PartitionedFile(p.values, f.getPath.toUri.toString, 0, 
f.getLen, hosts)
-  }
-}.groupBy { f =>
-  BucketingUtils
-.getBucketId(new Path(f.filePath).getName)
-.getOrElse(sys.error(s"Invalid bucket file ${f.filePath}"))
-}
-
-  (0 until bucketing.numBuckets).map { bucketId =>
-FilePartition(bucketId, bucketed.getOrElse(bucketId, Nil))
-  }
-
-case _ =>
-  val defaultMaxSplitBytes = 
files.sparkSession.sessionState.conf.filesMaxPartitionBytes
-  val openCostInBytes = 
files.sparkSession.sessionState.conf.filesOpenCostInBytes
-  val defaultParallelism = 
files.sparkSession.sparkContext.defaultParallelism
-  val totalBytes = selectedPartitions.flatMap(_.files.map(_.getLen 
+ openCostInBytes)).sum
-  val bytesPerCore = totalBytes / defaultParallelism
-  val maxSplitBytes = Math.min(defaultMaxSplitBytes,
-Math.max(openCostInBytes, bytesPerCore))
-  logInfo(s"Planning scan with bin packing, max size: 
$maxSplitBytes bytes, " +
-s"open cost is considered as scanning $openCostInBytes bytes.")
-
-  val splitFiles = selectedPartitions.flatMap { partition =>
-partition.files.flatMap { file =>
-  val blockLocations = getBlockLocations(file)
-  (0L until file.getLen by maxSplitBytes).map { offset =>
-val remaining = file.getLen - offset
-val size = if (remaining > maxSplitBytes) maxSplitBytes 
else remaining
-val hosts = getBlockHosts(blockLocations, offset, size)
-PartitionedFile(partition.values, 
file.getPath.toUri.toString, offset, size, hosts)
-  }
-}
-  }.toArray.sortBy(_.length)(implicitly[Ordering[Long]].reverse)
-
-  val partitions = new ArrayBuffer[FilePartition]
-  val currentFiles = new ArrayBuffer[PartitionedFile]
-  var currentSize = 0L
-
-  /** Add the given file to the current partition. */
-  def addFile(file: PartitionedFile): Unit = {
-currentSize += file.length + openCostInBytes
-currentFiles.append(file)
-  }
-
-  /** Close the current partition and move to the next. */
-  def closePartition(): Unit = {
-if (currentFiles.nonEmpty) {
-  val newPartition =
-FilePartition(
-  partitions.size,
-  currentFiles.toArray.toSeq) // Copy to a new Array.
-  partitions.append(newPartition)
-}
-currentFiles.clear()
-currentSize = 0
-  }
-
-  // Assign files to partitions using "First Fit Decreasing" (FFD)
-  // TODO: consider adding a slop factor here?
-  splitFiles.foreach { file =>
-if (currentSize + file.length > maxSplitBytes) {
-  closePartition()
-}
-addFile(file)
-  }
-  closePartition()
-  partitions
+  val optimizerMetadataOnly =
+readDataColumns.isEmpty && 
files.sparkSession.sessionState.conf.optimizerMetadataOnly
+  val scanRdd: RDD[InternalRow] = if (optimizerMetadataOnly) {
  

[GitHub] spark pull request #13494: [SPARK-15752] [SQL] support optimization for meta...

2016-06-03 Thread lianhuiwang
GitHub user lianhuiwang opened a pull request:

https://github.com/apache/spark/pull/13494

[SPARK-15752] [SQL] support optimization for metadata only queries

## What changes were proposed in this pull request?
when query only use metadata (example: partition key), it can return 
results based on metadata without scanning files. Hive did it in HIVE-1003.

## How was this patch tested?
add unit tests


You can merge this pull request into a Git repository by running:

$ git pull https://github.com/lianhuiwang/spark metadata-only

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/spark/pull/13494.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #13494


commit 2ca2c38643d648f48638eb90b2a17b099047ce70
Author: Lianhui Wang 
Date:   2016-06-03T07:34:02Z

init commit




---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org