[GitHub] spark pull request #15090: [SPARK-17073] [SQL] generate column-level statist...

2016-10-04 Thread wzhfy
Github user wzhfy commented on a diff in the pull request:

https://github.com/apache/spark/pull/15090#discussion_r81768164
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala ---
@@ -581,6 +581,13 @@ object SQLConf {
   .timeConf(TimeUnit.MILLISECONDS)
   .createWithDefault(10L)
 
+  val NDV_MAX_ERROR =
+SQLConfigBuilder("spark.sql.statistics.ndv.maxError")
+  .internal()
+  .doc("The maximum estimation error allowed in HyperLogLog++ 
algorithm.")
--- End diff --

@viirya Thanks, I'll update the config description in the follow-up PR.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #15090: [SPARK-17073] [SQL] generate column-level statist...

2016-10-03 Thread rxin
Github user rxin commented on a diff in the pull request:

https://github.com/apache/spark/pull/15090#discussion_r81594675
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala ---
@@ -581,6 +581,13 @@ object SQLConf {
   .timeConf(TimeUnit.MILLISECONDS)
   .createWithDefault(10L)
 
+  val NDV_MAX_ERROR =
+SQLConfigBuilder("spark.sql.statistics.ndv.maxError")
+  .internal()
+  .doc("The maximum estimation error allowed in HyperLogLog++ 
algorithm.")
--- End diff --

I'm slightly confused: what's the difference between what you said and 
what's in the doc?





[GitHub] spark pull request #15090: [SPARK-17073] [SQL] generate column-level statist...

2016-10-03 Thread viirya
Github user viirya commented on a diff in the pull request:

https://github.com/apache/spark/pull/15090#discussion_r81507325
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala ---
@@ -581,6 +581,13 @@ object SQLConf {
   .timeConf(TimeUnit.MILLISECONDS)
   .createWithDefault(10L)
 
+  val NDV_MAX_ERROR =
+SQLConfigBuilder("spark.sql.statistics.ndv.maxError")
+  .internal()
+  .doc("The maximum estimation error allowed in HyperLogLog++ 
algorithm.")
--- End diff --

This config comment is not correct. It reads as if it sets the 
maximum estimation error for the HyperLogLog++ algorithm.
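
For context on what an error setting for HyperLogLog++ usually controls: in the textbook HyperLogLog analysis, the relative standard error is roughly 1.04 / sqrt(m) for m = 2^p registers, which makes it a standard deviation rather than a hard upper bound. The sketch below only illustrates that textbook relation. `standardError` and `precisionFor` are hypothetical helper names, not Spark APIs, and the constant Spark uses internally may differ.

```scala
object HllErrorSketch {
  /** Approximate relative standard error of HyperLogLog with 2^p registers. */
  def standardError(p: Int): Double = 1.04 / math.sqrt(math.pow(2.0, p))

  /** Smallest precision p whose standard error does not exceed `relativeSD`. */
  def precisionFor(relativeSD: Double): Int = {
    require(relativeSD > 0.0 && relativeSD < 1.0, "relativeSD must be in (0, 1)")
    // Solve 1.04 / sqrt(2^p) <= relativeSD for p.
    val registersNeeded = math.pow(1.04 / relativeSD, 2)
    math.ceil(math.log(registersNeeded) / math.log(2.0)).toInt
  }
}
```

Under this reading, a tighter error setting raises the register count and therefore the memory used per sketch, which is the trade-off a config description would need to convey.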





[GitHub] spark pull request #15090: [SPARK-17073] [SQL] generate column-level statist...

2016-10-01 Thread gatorsmile
Github user gatorsmile commented on a diff in the pull request:

https://github.com/apache/spark/pull/15090#discussion_r81454093
  
--- Diff: 
sql/core/src/test/scala/org/apache/spark/sql/StatisticsColumnSuite.scala ---
@@ -0,0 +1,341 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql
+
+import java.sql.{Date, Timestamp}
+
+import org.apache.spark.sql.catalyst.{InternalRow, TableIdentifier}
+import org.apache.spark.sql.catalyst.parser.ParseException
+import org.apache.spark.sql.catalyst.plans.logical.ColumnStat
+import org.apache.spark.sql.catalyst.util.DateTimeUtils
+import org.apache.spark.sql.execution.command.AnalyzeColumnCommand
+import org.apache.spark.sql.test.SQLTestData.ArrayData
+import org.apache.spark.sql.types._
+
+class StatisticsColumnSuite extends StatisticsTest {
+  import testImplicits._
+
+  test("parse analyze column commands") {
+val tableName = "tbl"
+
+// we need to specify column names
+intercept[ParseException] {
+  sql(s"ANALYZE TABLE $tableName COMPUTE STATISTICS FOR COLUMNS")
+}
+
+val analyzeSql = s"ANALYZE TABLE $tableName COMPUTE STATISTICS FOR 
COLUMNS key, value"
+val parsed = spark.sessionState.sqlParser.parsePlan(analyzeSql)
+val expected = AnalyzeColumnCommand(TableIdentifier(tableName), 
Seq("key", "value"))
+comparePlans(parsed, expected)
+  }
+
+  test("analyzing columns of non-atomic types is not supported") {
+val tableName = "tbl"
+withTable(tableName) {
+  Seq(ArrayData(Seq(1, 2, 3), Seq(Seq(1, 2, 3)))).toDF().write.saveAsTable(tableName)
+  val err = intercept[AnalysisException] {
+sql(s"ANALYZE TABLE $tableName COMPUTE STATISTICS FOR COLUMNS 
data")
+  }
+  assert(err.message.contains("Analyzing columns is not supported"))
+}
+  }
+
+  test("check correctness of columns") {
+val table = "tbl"
+val colName1 = "abc"
+val colName2 = "x.yz"
+withTable(table) {
+  sql(s"CREATE TABLE $table ($colName1 int, `$colName2` string) USING 
PARQUET")
+
+  val invalidColError = intercept[AnalysisException] {
+sql(s"ANALYZE TABLE $table COMPUTE STATISTICS FOR COLUMNS key")
+  }
+  assert(invalidColError.message == "Invalid column name: key.")
+
+  withSQLConf("spark.sql.caseSensitive" -> "true") {
+val invalidErr = intercept[AnalysisException] {
+  sql(s"ANALYZE TABLE $table COMPUTE STATISTICS FOR COLUMNS 
${colName1.toUpperCase}")
+}
+assert(invalidErr.message == s"Invalid column name: 
${colName1.toUpperCase}.")
+  }
+
+  withSQLConf("spark.sql.caseSensitive" -> "false") {
+val columnsToAnalyze = Seq(colName2.toUpperCase, colName1, 
colName2)
+val tableIdent = TableIdentifier(table, Some("default"))
+val relation = 
spark.sessionState.catalog.lookupRelation(tableIdent)
+val columnStats =
+  AnalyzeColumnCommand(tableIdent, 
columnsToAnalyze).computeColStats(spark, relation)._2
--- End diff --

The same here. 





[GitHub] spark pull request #15090: [SPARK-17073] [SQL] generate column-level statist...

2016-10-01 Thread wzhfy
Github user wzhfy commented on a diff in the pull request:

https://github.com/apache/spark/pull/15090#discussion_r81453676
  
--- Diff: sql/core/src/test/scala/org/apache/spark/sql/StatisticsTest.scala 
---
@@ -0,0 +1,129 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql
+
+import org.apache.spark.sql.catalyst.TableIdentifier
+import org.apache.spark.sql.catalyst.plans.logical.{ColumnStat, Statistics}
+import org.apache.spark.sql.execution.command.{AnalyzeColumnCommand, 
ColumnStatStruct}
+import org.apache.spark.sql.execution.datasources.LogicalRelation
+import org.apache.spark.sql.test.SharedSQLContext
+import org.apache.spark.sql.types._
+
+trait StatisticsTest extends QueryTest with SharedSQLContext {
+
+  def checkColStats(
+  df: DataFrame,
+  expectedColStatsSeq: Seq[(StructField, ColumnStat)]): Unit = {
+val table = "tbl"
+withTable(table) {
+  df.write.format("json").saveAsTable(table)
+  val columns = expectedColStatsSeq.map(_._1)
+  val tableIdent = TableIdentifier(table, Some("default"))
+  val relation = spark.sessionState.catalog.lookupRelation(tableIdent)
+  val columnStats =
+AnalyzeColumnCommand(tableIdent, 
columns.map(_.name)).computeColStats(spark, relation)._2
+  expectedColStatsSeq.foreach { expected =>
--- End diff --

Yeah, that's better.





[GitHub] spark pull request #15090: [SPARK-17073] [SQL] generate column-level statist...

2016-10-01 Thread wzhfy
Github user wzhfy commented on a diff in the pull request:

https://github.com/apache/spark/pull/15090#discussion_r81453655
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/Statistics.scala
 ---
@@ -32,19 +38,80 @@ package org.apache.spark.sql.catalyst.plans.logical
  * @param sizeInBytes Physical size in bytes. For leaf operators this 
defaults to 1, otherwise it
  *defaults to the product of children's `sizeInBytes`.
  * @param rowCount Estimated number of rows.
+ * @param colStats Column-level statistics.
  * @param isBroadcastable If true, output is small enough to be used in a 
broadcast join.
  */
 case class Statistics(
 sizeInBytes: BigInt,
 rowCount: Option[BigInt] = None,
+colStats: Map[String, ColumnStat] = Map.empty,
 isBroadcastable: Boolean = false) {
+
   override def toString: String = "Statistics(" + simpleString + ")"
 
   /** Readable string representation for the Statistics. */
   def simpleString: String = {
 Seq(s"sizeInBytes=$sizeInBytes",
   if (rowCount.isDefined) s"rowCount=${rowCount.get}" else "",
   s"isBroadcastable=$isBroadcastable"
-).filter(_.nonEmpty).mkString("", ", ", "")
+).filter(_.nonEmpty).mkString(", ")
+  }
+}
+
+/**
+ * Statistics for a column.
+ */
+case class ColumnStat(statRow: InternalRow) {
+
+  def forNumeric[T <: AtomicType](dataType: T): NumericColumnStat[T] = {
--- End diff --

But we need to include DateType and TimestampType.





[GitHub] spark pull request #15090: [SPARK-17073] [SQL] generate column-level statist...

2016-10-01 Thread cloud-fan
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/15090#discussion_r81450056
  
--- Diff: 
sql/core/src/test/scala/org/apache/spark/sql/StatisticsColumnSuite.scala ---
@@ -0,0 +1,341 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql
+
+import java.sql.{Date, Timestamp}
+
+import org.apache.spark.sql.catalyst.{InternalRow, TableIdentifier}
+import org.apache.spark.sql.catalyst.parser.ParseException
+import org.apache.spark.sql.catalyst.plans.logical.ColumnStat
+import org.apache.spark.sql.catalyst.util.DateTimeUtils
+import org.apache.spark.sql.execution.command.AnalyzeColumnCommand
+import org.apache.spark.sql.test.SQLTestData.ArrayData
+import org.apache.spark.sql.types._
+
+class StatisticsColumnSuite extends StatisticsTest {
+  import testImplicits._
+
+  test("parse analyze column commands") {
+val tableName = "tbl"
+
+// we need to specify column names
+intercept[ParseException] {
+  sql(s"ANALYZE TABLE $tableName COMPUTE STATISTICS FOR COLUMNS")
+}
+
+val analyzeSql = s"ANALYZE TABLE $tableName COMPUTE STATISTICS FOR 
COLUMNS key, value"
+val parsed = spark.sessionState.sqlParser.parsePlan(analyzeSql)
+val expected = AnalyzeColumnCommand(TableIdentifier(tableName), 
Seq("key", "value"))
+comparePlans(parsed, expected)
+  }
+
+  test("analyzing columns of non-atomic types is not supported") {
+val tableName = "tbl"
+withTable(tableName) {
+  Seq(ArrayData(Seq(1, 2, 3), Seq(Seq(1, 2, 3)))).toDF().write.saveAsTable(tableName)
+  val err = intercept[AnalysisException] {
+sql(s"ANALYZE TABLE $tableName COMPUTE STATISTICS FOR COLUMNS 
data")
+  }
+  assert(err.message.contains("Analyzing columns is not supported"))
+}
+  }
+
+  test("check correctness of columns") {
+val table = "tbl"
+val colName1 = "abc"
+val colName2 = "x.yz"
+withTable(table) {
+  sql(s"CREATE TABLE $table ($colName1 int, `$colName2` string) USING 
PARQUET")
+
+  val invalidColError = intercept[AnalysisException] {
+sql(s"ANALYZE TABLE $table COMPUTE STATISTICS FOR COLUMNS key")
+  }
+  assert(invalidColError.message == "Invalid column name: key.")
+
+  withSQLConf("spark.sql.caseSensitive" -> "true") {
+val invalidErr = intercept[AnalysisException] {
+  sql(s"ANALYZE TABLE $table COMPUTE STATISTICS FOR COLUMNS 
${colName1.toUpperCase}")
+}
+assert(invalidErr.message == s"Invalid column name: 
${colName1.toUpperCase}.")
+  }
+
+  withSQLConf("spark.sql.caseSensitive" -> "false") {
+val columnsToAnalyze = Seq(colName2.toUpperCase, colName1, 
colName2)
+val tableIdent = TableIdentifier(table, Some("default"))
+val relation = 
spark.sessionState.catalog.lookupRelation(tableIdent)
+val columnStats =
+  AnalyzeColumnCommand(tableIdent, 
columnsToAnalyze).computeColStats(spark, relation)._2
+assert(columnStats.contains(colName1))
+assert(columnStats.contains(colName2))
+// check deduplication
+assert(columnStats.size == 2)
+assert(!columnStats.contains(colName2.toUpperCase))
+  }
+}
+  }
+
+  private def getNonNullValues[T](values: Seq[Option[T]]): Seq[T] = {
+values.filter(_.isDefined).map(_.get)
+  }
+
+  test("column-level statistics for integral type columns") {
+val values = (0 to 5).map { i =>
+  if (i % 2 == 0) None else Some(i)
+}
+val data = values.map { i =>
+  (i.map(_.toByte), i.map(_.toShort), i.map(_.toInt), i.map(_.toLong))
+}
+
+val df = data.toDF("c1", "c2", "c3", "c4")
+val nonNullValues = getNonNullValues[Int](values)
+val expectedColStatsSeq = df.schema.map { f =>
+  

[GitHub] spark pull request #15090: [SPARK-17073] [SQL] generate column-level statist...

2016-10-01 Thread cloud-fan
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/15090#discussion_r81449665
  
--- Diff: sql/core/src/test/scala/org/apache/spark/sql/StatisticsTest.scala 
---
@@ -0,0 +1,129 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql
+
+import org.apache.spark.sql.catalyst.TableIdentifier
+import org.apache.spark.sql.catalyst.plans.logical.{ColumnStat, Statistics}
+import org.apache.spark.sql.execution.command.{AnalyzeColumnCommand, 
ColumnStatStruct}
+import org.apache.spark.sql.execution.datasources.LogicalRelation
+import org.apache.spark.sql.test.SharedSQLContext
+import org.apache.spark.sql.types._
+
+trait StatisticsTest extends QueryTest with SharedSQLContext {
+
+  def checkColStats(
+  df: DataFrame,
+  expectedColStatsSeq: Seq[(StructField, ColumnStat)]): Unit = {
+val table = "tbl"
+withTable(table) {
+  df.write.format("json").saveAsTable(table)
+  val columns = expectedColStatsSeq.map(_._1)
+  val tableIdent = TableIdentifier(table, Some("default"))
+  val relation = spark.sessionState.catalog.lookupRelation(tableIdent)
+  val columnStats =
+AnalyzeColumnCommand(tableIdent, 
columns.map(_.name)).computeColStats(spark, relation)._2
+  expectedColStatsSeq.foreach { expected =>
--- End diff --

How about `...foreach { case (field, expectedStat) =>`? Then we can use 
`field.name` instead of `expected._1.name`.
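
A minimal sketch of the two styles side by side, to show why the destructuring form reads better. `Field` and `Stat` are hypothetical stand-ins for `StructField` and `ColumnStat`, and the sample data is made up for illustration.

```scala
object TuplePatternSketch {
  // Hypothetical stand-ins for StructField and ColumnStat.
  final case class Field(name: String)
  final case class Stat(distinctCount: Long)

  val expectedColStatsSeq: Seq[(Field, Stat)] =
    Seq(Field("c1") -> Stat(3L), Field("c2") -> Stat(5L))

  // Positional accessors: the reader has to remember what _1 and _2 hold.
  def describePositional(): Seq[String] =
    expectedColStatsSeq.map { expected =>
      s"${expected._1.name}=${expected._2.distinctCount}"
    }

  // Pattern matching on the tuple names both components at the binding site.
  def describeDestructured(): Seq[String] =
    expectedColStatsSeq.map { case (field, expectedStat) =>
      s"${field.name}=${expectedStat.distinctCount}"
    }
}
```

Both methods produce the same strings; only the second makes the role of each tuple element visible in the code.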





[GitHub] spark pull request #15090: [SPARK-17073] [SQL] generate column-level statist...

2016-09-30 Thread wzhfy
Github user wzhfy commented on a diff in the pull request:

https://github.com/apache/spark/pull/15090#discussion_r81379672
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/command/AnalyzeColumnCommand.scala
 ---
@@ -0,0 +1,175 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.command
+
+import scala.collection.mutable
+
+import org.apache.spark.sql._
+import org.apache.spark.sql.catalyst.{InternalRow, TableIdentifier}
+import org.apache.spark.sql.catalyst.analysis.EliminateSubqueryAliases
+import org.apache.spark.sql.catalyst.catalog.{CatalogRelation, 
CatalogTable}
+import org.apache.spark.sql.catalyst.expressions._
+import org.apache.spark.sql.catalyst.expressions.aggregate._
+import org.apache.spark.sql.catalyst.plans.logical.{Aggregate, ColumnStat, 
LogicalPlan, Statistics}
+import org.apache.spark.sql.execution.datasources.LogicalRelation
+import org.apache.spark.sql.types._
+
+
+/**
+ * Analyzes the given columns of the given table to generate statistics, 
which will be used in
+ * query optimizations.
+ */
+case class AnalyzeColumnCommand(
+tableIdent: TableIdentifier,
+columnNames: Seq[String]) extends RunnableCommand {
+
+  override def run(sparkSession: SparkSession): Seq[Row] = {
+val sessionState = sparkSession.sessionState
+val db = 
tableIdent.database.getOrElse(sessionState.catalog.getCurrentDatabase)
+val tableIdentWithDB = TableIdentifier(tableIdent.table, Some(db))
+val relation = 
EliminateSubqueryAliases(sessionState.catalog.lookupRelation(tableIdentWithDB))
+
+relation match {
+  case catalogRel: CatalogRelation =>
+updateStats(catalogRel.catalogTable,
+  AnalyzeTableCommand.calculateTotalSize(sessionState, 
catalogRel.catalogTable))
+
+  case logicalRel: LogicalRelation if 
logicalRel.catalogTable.isDefined =>
+updateStats(logicalRel.catalogTable.get, 
logicalRel.relation.sizeInBytes)
+
+  case otherRelation =>
+throw new AnalysisException("ANALYZE TABLE is not supported for " +
+  s"${otherRelation.nodeName}.")
+}
+
+def updateStats(catalogTable: CatalogTable, newTotalSize: Long): Unit 
= {
+  val (rowCount, columnStats) = computeColStats(sparkSession, relation)
+  val statistics = Statistics(
+sizeInBytes = newTotalSize,
+rowCount = Some(rowCount),
+colStats = columnStats ++ 
catalogTable.stats.map(_.colStats).getOrElse(Map()))
+  sessionState.catalog.alterTable(catalogTable.copy(stats = 
Some(statistics)))
+  // Refresh the cached data source table in the catalog.
+  sessionState.catalog.refreshTable(tableIdentWithDB)
+}
+
+Seq.empty[Row]
+  }
+
+  def computeColStats(
+  sparkSession: SparkSession,
+  relation: LogicalPlan): (Long, Map[String, ColumnStat]) = {
+
+// check correctness of column names
+val attributesToAnalyze = mutable.MutableList[Attribute]()
+val resolver = sparkSession.sessionState.conf.resolver
+columnNames.foreach { col =>
+  val exprOption = relation.output.find(attr => resolver(attr.name, 
col))
+  val expr = exprOption.getOrElse(throw new 
AnalysisException(s"Invalid column name: $col."))
+  // do deduplication
--- End diff --

@gatorsmile OK, let me add a log for it.





[GitHub] spark pull request #15090: [SPARK-17073] [SQL] generate column-level statist...

2016-09-29 Thread gatorsmile
Github user gatorsmile commented on a diff in the pull request:

https://github.com/apache/spark/pull/15090#discussion_r81275778
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/command/AnalyzeColumnCommand.scala
 ---
@@ -0,0 +1,175 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.command
+
+import scala.collection.mutable
+
+import org.apache.spark.sql._
+import org.apache.spark.sql.catalyst.{InternalRow, TableIdentifier}
+import org.apache.spark.sql.catalyst.analysis.EliminateSubqueryAliases
+import org.apache.spark.sql.catalyst.catalog.{CatalogRelation, 
CatalogTable}
+import org.apache.spark.sql.catalyst.expressions._
+import org.apache.spark.sql.catalyst.expressions.aggregate._
+import org.apache.spark.sql.catalyst.plans.logical.{Aggregate, ColumnStat, 
LogicalPlan, Statistics}
+import org.apache.spark.sql.execution.datasources.LogicalRelation
+import org.apache.spark.sql.types._
+
+
+/**
+ * Analyzes the given columns of the given table to generate statistics, 
which will be used in
+ * query optimizations.
+ */
+case class AnalyzeColumnCommand(
+tableIdent: TableIdentifier,
+columnNames: Seq[String]) extends RunnableCommand {
+
+  override def run(sparkSession: SparkSession): Seq[Row] = {
+val sessionState = sparkSession.sessionState
+val db = 
tableIdent.database.getOrElse(sessionState.catalog.getCurrentDatabase)
+val tableIdentWithDB = TableIdentifier(tableIdent.table, Some(db))
+val relation = 
EliminateSubqueryAliases(sessionState.catalog.lookupRelation(tableIdentWithDB))
+
+relation match {
+  case catalogRel: CatalogRelation =>
+updateStats(catalogRel.catalogTable,
+  AnalyzeTableCommand.calculateTotalSize(sessionState, 
catalogRel.catalogTable))
+
+  case logicalRel: LogicalRelation if 
logicalRel.catalogTable.isDefined =>
+updateStats(logicalRel.catalogTable.get, 
logicalRel.relation.sizeInBytes)
+
+  case otherRelation =>
+throw new AnalysisException("ANALYZE TABLE is not supported for " +
+  s"${otherRelation.nodeName}.")
+}
+
+def updateStats(catalogTable: CatalogTable, newTotalSize: Long): Unit 
= {
+  val (rowCount, columnStats) = computeColStats(sparkSession, relation)
+  val statistics = Statistics(
+sizeInBytes = newTotalSize,
+rowCount = Some(rowCount),
+colStats = columnStats ++ 
catalogTable.stats.map(_.colStats).getOrElse(Map()))
+  sessionState.catalog.alterTable(catalogTable.copy(stats = 
Some(statistics)))
+  // Refresh the cached data source table in the catalog.
+  sessionState.catalog.refreshTable(tableIdentWithDB)
+}
+
+Seq.empty[Row]
+  }
+
+  def computeColStats(
+  sparkSession: SparkSession,
+  relation: LogicalPlan): (Long, Map[String, ColumnStat]) = {
+
+// check correctness of column names
+val attributesToAnalyze = mutable.MutableList[Attribute]()
+val resolver = sparkSession.sessionState.conf.resolver
+columnNames.foreach { col =>
+  val exprOption = relation.output.find(attr => resolver(attr.name, 
col))
+  val expr = exprOption.getOrElse(throw new 
AnalysisException(s"Invalid column name: $col."))
+  // do deduplication
--- End diff --

How about issuing a `logWarning`? I still think a silent drop is a concern.
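
A rough sketch of what deduplication with a warning could look like, kept independent of Spark. The `dedup` helper, its `warn` callback (standing in for `logWarning`), and the warning text are all assumptions for illustration. Unlike the command in the diff, this simplified version keeps the first name as typed rather than the resolved attribute name.

```scala
import scala.collection.mutable

object DedupWithWarningSketch {
  /**
   * Keeps the first occurrence of each name under `resolver` and reports
   * every dropped duplicate through `warn` instead of discarding it silently.
   */
  def dedup(
      names: Seq[String],
      resolver: (String, String) => Boolean,
      warn: String => Unit): Seq[String] = {
    val kept = mutable.ArrayBuffer.empty[String]
    names.foreach { name =>
      kept.find(resolver(_, name)) match {
        case Some(existing) =>
          warn(s"Duplicate column name $name ignored; already analyzing $existing.")
        case None =>
          kept += name
      }
    }
    kept.toSeq
  }

  // Case-insensitive comparison, similar in spirit to a case-insensitive resolver.
  val caseInsensitive: (String, String) => Boolean = _.equalsIgnoreCase(_)
}
```

With a case-insensitive resolver, passing `Seq("X.YZ", "abc", "x.yz")` keeps two names and emits one warning, so the caller can see that a column was dropped.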





[GitHub] spark pull request #15090: [SPARK-17073] [SQL] generate column-level statist...

2016-09-29 Thread wzhfy
Github user wzhfy commented on a diff in the pull request:

https://github.com/apache/spark/pull/15090#discussion_r81259988
  
--- Diff: 
sql/core/src/test/scala/org/apache/spark/sql/StatisticsColumnSuite.scala ---
@@ -0,0 +1,354 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql
+
+import java.sql.{Date, Timestamp}
+
+import org.apache.spark.sql.catalyst.{InternalRow, TableIdentifier}
+import org.apache.spark.sql.catalyst.parser.ParseException
+import org.apache.spark.sql.catalyst.plans.logical.ColumnStat
+import org.apache.spark.sql.catalyst.util.DateTimeUtils
+import org.apache.spark.sql.execution.command.AnalyzeColumnCommand
+import org.apache.spark.sql.test.SQLTestData.ArrayData
+import org.apache.spark.sql.types._
+
+class StatisticsColumnSuite extends StatisticsTest {
+  import testImplicits._
+
+  test("parse analyze column commands") {
+def assertAnalyzeColumnCommand(analyzeCommand: String, c: Class[_]) {
+  val parsed = spark.sessionState.sqlParser.parsePlan(analyzeCommand)
+  val operators = parsed.collect {
+case a: AnalyzeColumnCommand => a
+case o => o
+  }
+  assert(operators.size == 1)
+  if (operators.head.getClass != c) {
--- End diff --

OK, then I'll fix it.





[GitHub] spark pull request #15090: [SPARK-17073] [SQL] generate column-level statist...

2016-09-29 Thread cloud-fan
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/15090#discussion_r81259329
  
--- Diff: 
sql/core/src/test/scala/org/apache/spark/sql/StatisticsColumnSuite.scala ---
@@ -0,0 +1,354 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql
+
+import java.sql.{Date, Timestamp}
+
+import org.apache.spark.sql.catalyst.{InternalRow, TableIdentifier}
+import org.apache.spark.sql.catalyst.parser.ParseException
+import org.apache.spark.sql.catalyst.plans.logical.ColumnStat
+import org.apache.spark.sql.catalyst.util.DateTimeUtils
+import org.apache.spark.sql.execution.command.AnalyzeColumnCommand
+import org.apache.spark.sql.test.SQLTestData.ArrayData
+import org.apache.spark.sql.types._
+
+class StatisticsColumnSuite extends StatisticsTest {
+  import testImplicits._
+
+  test("parse analyze column commands") {
+def assertAnalyzeColumnCommand(analyzeCommand: String, c: Class[_]) {
+  val parsed = spark.sessionState.sqlParser.parsePlan(analyzeCommand)
+  val operators = parsed.collect {
+case a: AnalyzeColumnCommand => a
+case o => o
+  }
+  assert(operators.size == 1)
+  if (operators.head.getClass != c) {
--- End diff --

I don't think this is good style. Parsing a SQL string, constructing the 
expected logical plan, and then comparing the two looks much clearer.
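
A minimal sketch of that style, using hypothetical stand-ins (`Plan`, `AnalyzeColumns`, `ToyParser`) rather than Spark's real parser and plan classes, so that it runs on its own. The point is only that one structural comparison against an expected plan replaces the `collect` and `getClass` checks:

```scala
// Hypothetical stand-ins for a logical plan and a parser; not Spark's real API.
sealed trait Plan
final case class AnalyzeColumns(table: String, columns: Seq[String]) extends Plan

object ToyParser {
  private val Analyze =
    """(?i)ANALYZE\s+TABLE\s+(\w+)\s+COMPUTE\s+STATISTICS\s+FOR\s+COLUMNS\s+(.+)""".r

  // Parses the single statement this sketch cares about; anything else is an error.
  def parsePlan(sql: String): Plan = sql.trim match {
    case Analyze(table, cols) => AnalyzeColumns(table, cols.split(",").map(_.trim).toSeq)
    case other => throw new IllegalArgumentException(s"Cannot parse: $other")
  }
}

object ParserStyleExample {
  def main(args: Array[String]): Unit = {
    val parsed = ToyParser.parsePlan("ANALYZE TABLE t COMPUTE STATISTICS FOR COLUMNS key, value")
    val expected = AnalyzeColumns("t", Seq("key", "value"))
    // Case-class equality compares the whole plan, including table and column names.
    assert(parsed == expected)
  }
}
```

In the real suite the comparison would go through whatever plan-comparison helper the test base class provides; the toy `==` above only stands in for that.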





[GitHub] spark pull request #15090: [SPARK-17073] [SQL] generate column-level statist...

2016-09-29 Thread cloud-fan
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/15090#discussion_r81259200
  
--- Diff: 
sql/core/src/test/scala/org/apache/spark/sql/StatisticsColumnSuite.scala ---
@@ -0,0 +1,354 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql
+
+import java.sql.{Date, Timestamp}
+
+import org.apache.spark.sql.catalyst.{InternalRow, TableIdentifier}
+import org.apache.spark.sql.catalyst.parser.ParseException
+import org.apache.spark.sql.catalyst.plans.logical.ColumnStat
+import org.apache.spark.sql.catalyst.util.DateTimeUtils
+import org.apache.spark.sql.execution.command.AnalyzeColumnCommand
+import org.apache.spark.sql.test.SQLTestData.ArrayData
+import org.apache.spark.sql.types._
+
+class StatisticsColumnSuite extends StatisticsTest {
+  import testImplicits._
+
+  test("parse analyze column commands") {
+def assertAnalyzeColumnCommand(analyzeCommand: String, c: Class[_]) {
+  val parsed = spark.sessionState.sqlParser.parsePlan(analyzeCommand)
+  val operators = parsed.collect {
--- End diff --

What does this `collect` do?
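
To illustrate the concern, here is a small sketch with a toy tree (the `Node`, `Command` and `Wrapper` types are hypothetical, and this `collect` only mimics a pre-order traversal that applies a partial function where it is defined). Because the `case o => o` branch matches every node, nothing is filtered out:

```scala
// Toy tree whose `collect` applies a partial function to every node it is defined on.
sealed trait Node {
  def children: Seq[Node]
  def collect[B](pf: PartialFunction[Node, B]): Seq[B] =
    pf.lift(this).toList ++ children.flatMap(_.collect(pf))
}
final case class Command(name: String) extends Node { val children: Seq[Node] = Nil }
final case class Wrapper(child: Node) extends Node { val children: Seq[Node] = Seq(child) }

object CollectExample {
  val plan: Node = Wrapper(Command("analyze"))

  // The catch-all case is defined for every node, so this returns the whole tree.
  val everything: Seq[Node] = plan.collect {
    case c: Command => c
    case o => o
  }

  // Without the catch-all, only the nodes of interest remain.
  val commandsOnly: Seq[Command] = plan.collect { case c: Command => c }
}
```

Under these assumptions `everything` has two elements and `commandsOnly` has one, so a check like `operators.size == 1` only passes when the parsed plan is a single node.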





[GitHub] spark pull request #15090: [SPARK-17073] [SQL] generate column-level statist...

2016-09-29 Thread cloud-fan
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/15090#discussion_r81259064
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/Statistics.scala
 ---
@@ -32,19 +38,84 @@ package org.apache.spark.sql.catalyst.plans.logical
  * @param sizeInBytes Physical size in bytes. For leaf operators this 
defaults to 1, otherwise it
  *defaults to the product of children's `sizeInBytes`.
  * @param rowCount Estimated number of rows.
+ * @param colStats Column-level statistics.
  * @param isBroadcastable If true, output is small enough to be used in a 
broadcast join.
  */
 case class Statistics(
 sizeInBytes: BigInt,
 rowCount: Option[BigInt] = None,
+colStats: Map[String, ColumnStat] = Map.empty,
 isBroadcastable: Boolean = false) {
+
   override def toString: String = "Statistics(" + simpleString + ")"
 
   /** Readable string representation for the Statistics. */
   def simpleString: String = {
 Seq(s"sizeInBytes=$sizeInBytes",
   if (rowCount.isDefined) s"rowCount=${rowCount.get}" else "",
   s"isBroadcastable=$isBroadcastable"
-).filter(_.nonEmpty).mkString("", ", ", "")
+).filter(_.nonEmpty).mkString(", ")
+  }
+}
+
+/**
+ * Statistics for a column.
+ */
+case class ColumnStat(statRow: InternalRow) {
+
+  def forNumeric[T <: AtomicType](dataType: T): NumericColumnStat[T] = {
+NumericColumnStat(statRow, dataType)
+  }
+  def forString: StringColumnStat = StringColumnStat(statRow)
+  def forBinary: BinaryColumnStat = BinaryColumnStat(statRow)
+  def forBoolean: BooleanColumnStat = BooleanColumnStat(statRow)
+
+  override def toString: String = {
+// use Base64 for encoding
+Base64.encodeBase64String(statRow.asInstanceOf[UnsafeRow].getBytes)
   }
 }
+
+object ColumnStat {
+  def apply(dataType: DataType, str: String): ColumnStat = {
+// use Base64 for decoding
+val bytes = Base64.decodeBase64(str)
+val numFields = dataType match {
+  case BinaryType | BooleanType => 3
+  case _ => 4
+}
+val unsafeRow = new UnsafeRow(numFields)
--- End diff --

Ah, I see. Sorry, I misread the code.





[GitHub] spark pull request #15090: [SPARK-17073] [SQL] generate column-level statist...

2016-09-29 Thread wzhfy
Github user wzhfy commented on a diff in the pull request:

https://github.com/apache/spark/pull/15090#discussion_r81243645
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/command/AnalyzeColumnCommand.scala
 ---
@@ -0,0 +1,175 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.command
+
+import scala.collection.mutable
+
+import org.apache.spark.sql._
+import org.apache.spark.sql.catalyst.{InternalRow, TableIdentifier}
+import org.apache.spark.sql.catalyst.analysis.EliminateSubqueryAliases
+import org.apache.spark.sql.catalyst.catalog.{CatalogRelation, 
CatalogTable}
+import org.apache.spark.sql.catalyst.expressions._
+import org.apache.spark.sql.catalyst.expressions.aggregate._
+import org.apache.spark.sql.catalyst.plans.logical.{Aggregate, ColumnStat, 
LogicalPlan, Statistics}
+import org.apache.spark.sql.execution.datasources.LogicalRelation
+import org.apache.spark.sql.types._
+
+
+/**
+ * Analyzes the given columns of the given table to generate statistics, 
which will be used in
+ * query optimizations.
+ */
+case class AnalyzeColumnCommand(
+tableIdent: TableIdentifier,
+columnNames: Seq[String]) extends RunnableCommand {
+
+  override def run(sparkSession: SparkSession): Seq[Row] = {
+val sessionState = sparkSession.sessionState
+val db = 
tableIdent.database.getOrElse(sessionState.catalog.getCurrentDatabase)
+val tableIdentWithDB = TableIdentifier(tableIdent.table, Some(db))
+val relation = 
EliminateSubqueryAliases(sessionState.catalog.lookupRelation(tableIdentWithDB))
+
+relation match {
+  case catalogRel: CatalogRelation =>
+updateStats(catalogRel.catalogTable,
+  AnalyzeTableCommand.calculateTotalSize(sessionState, 
catalogRel.catalogTable))
+
+  case logicalRel: LogicalRelation if 
logicalRel.catalogTable.isDefined =>
+updateStats(logicalRel.catalogTable.get, 
logicalRel.relation.sizeInBytes)
+
+  case otherRelation =>
+throw new AnalysisException("ANALYZE TABLE is not supported for " +
+  s"${otherRelation.nodeName}.")
+}
+
+def updateStats(catalogTable: CatalogTable, newTotalSize: Long): Unit 
= {
+  val (rowCount, columnStats) = computeColStats(sparkSession, relation)
+  val statistics = Statistics(
+sizeInBytes = newTotalSize,
+rowCount = Some(rowCount),
+colStats = columnStats ++ 
catalogTable.stats.map(_.colStats).getOrElse(Map()))
+  sessionState.catalog.alterTable(catalogTable.copy(stats = 
Some(statistics)))
+  // Refresh the cached data source table in the catalog.
+  sessionState.catalog.refreshTable(tableIdentWithDB)
+}
+
+Seq.empty[Row]
+  }
+
+  def computeColStats(
+  sparkSession: SparkSession,
+  relation: LogicalPlan): (Long, Map[String, ColumnStat]) = {
+
+// check correctness of column names
+val attributesToAnalyze = mutable.MutableList[Attribute]()
+val resolver = sparkSession.sessionState.conf.resolver
+columnNames.foreach { col =>
+  val exprOption = relation.output.find(attr => resolver(attr.name, 
col))
+  val expr = exprOption.getOrElse(throw new 
AnalysisException(s"Invalid column name: $col."))
+  // do deduplication
+  if (!attributesToAnalyze.contains(expr)) {
+attributesToAnalyze += expr
+  }
+}
+
+// Collect statistics per column.
+// The first element in the result will be the overall row count, the 
following elements
+// will be structs containing all column stats.
+// The layout of each struct follows the layout of the ColumnStats.
+val ndvMaxErr = sparkSession.sessionState.conf.ndvMaxError
+val expressions = Count(Literal(1)).toAggregateExpression() +:
+  attributesToAnalyze.map(ColumnStatStruct(_, ndvMaxErr))
+val 

[GitHub] spark pull request #15090: [SPARK-17073] [SQL] generate column-level statist...

2016-09-29 Thread wzhfy
Github user wzhfy commented on a diff in the pull request:

https://github.com/apache/spark/pull/15090#discussion_r81192204
  
--- Diff: 
sql/core/src/test/scala/org/apache/spark/sql/StatisticsColumnSuite.scala ---
@@ -0,0 +1,355 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql
+
+import java.sql.{Date, Timestamp}
+
+import org.apache.spark.sql.catalyst.{InternalRow, TableIdentifier}
+import org.apache.spark.sql.catalyst.parser.ParseException
+import org.apache.spark.sql.catalyst.plans.logical.ColumnStat
+import org.apache.spark.sql.catalyst.util.DateTimeUtils
+import org.apache.spark.sql.execution.command.AnalyzeColumnCommand
+import org.apache.spark.sql.test.SQLTestData.ArrayData
+import org.apache.spark.sql.types._
+
+class StatisticsColumnSuite extends StatisticsTest {
+  import testImplicits._
+
+  test("parse analyze column commands") {
--- End diff --

I followed the style in `StatisticsSuite`. I think this style is OK.





[GitHub] spark pull request #15090: [SPARK-17073] [SQL] generate column-level statist...

2016-09-29 Thread wzhfy
Github user wzhfy commented on a diff in the pull request:

https://github.com/apache/spark/pull/15090#discussion_r81186610
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/internal/SessionState.scala ---
@@ -186,13 +187,18 @@ private[sql] class SessionState(sparkSession: 
SparkSession) {
   }
 
   /**
-   * Analyzes the given table in the current database to generate 
statistics, which will be
+   * Analyzes the given table in the current database to generate 
table-level statistics, which
+   * will be used in query optimizations.
+   */
+  def analyzeTable(tableIdent: TableIdentifier, noscan: Boolean = true): 
Unit = {
+AnalyzeTableCommand(tableIdent, noscan).run(sparkSession)
+  }
+
+  /**
+   * Analyzes the given columns in the table to generate column-level 
statistics, which will be
* used in query optimizations.
-   *
-   * Right now, it only supports catalog tables and it only updates the 
size of a catalog table
-   * in the external catalog.
*/
-  def analyze(tableName: String, noscan: Boolean = true): Unit = {
--- End diff --

`AnalyzeTableCommand` now takes a `TableIdentifier` instead of a `String` 
table name as its parameter, as @hvanhovell suggested in this 
[comment](https://github.com/apache/spark/pull/15090#r78717769).





[GitHub] spark pull request #15090: [SPARK-17073] [SQL] generate column-level statist...

2016-09-29 Thread wzhfy
Github user wzhfy commented on a diff in the pull request:

https://github.com/apache/spark/pull/15090#discussion_r81183331
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/command/AnalyzeColumnCommand.scala
 ---
@@ -0,0 +1,175 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.command
+
+import scala.collection.mutable
+
+import org.apache.spark.sql._
+import org.apache.spark.sql.catalyst.{InternalRow, TableIdentifier}
+import org.apache.spark.sql.catalyst.analysis.EliminateSubqueryAliases
+import org.apache.spark.sql.catalyst.catalog.{CatalogRelation, 
CatalogTable}
+import org.apache.spark.sql.catalyst.expressions._
+import org.apache.spark.sql.catalyst.expressions.aggregate._
+import org.apache.spark.sql.catalyst.plans.logical.{Aggregate, ColumnStat, 
LogicalPlan, Statistics}
+import org.apache.spark.sql.execution.datasources.LogicalRelation
+import org.apache.spark.sql.types._
+
+
+/**
+ * Analyzes the given columns of the given table to generate statistics, 
which will be used in
+ * query optimizations.
+ */
+case class AnalyzeColumnCommand(
+tableIdent: TableIdentifier,
+columnNames: Seq[String]) extends RunnableCommand {
+
+  override def run(sparkSession: SparkSession): Seq[Row] = {
+val sessionState = sparkSession.sessionState
+val db = 
tableIdent.database.getOrElse(sessionState.catalog.getCurrentDatabase)
+val tableIdentWithDB = TableIdentifier(tableIdent.table, Some(db))
+val relation = 
EliminateSubqueryAliases(sessionState.catalog.lookupRelation(tableIdentWithDB))
+
+relation match {
+  case catalogRel: CatalogRelation =>
+updateStats(catalogRel.catalogTable,
+  AnalyzeTableCommand.calculateTotalSize(sessionState, 
catalogRel.catalogTable))
+
+  case logicalRel: LogicalRelation if 
logicalRel.catalogTable.isDefined =>
+updateStats(logicalRel.catalogTable.get, 
logicalRel.relation.sizeInBytes)
+
+  case otherRelation =>
+throw new AnalysisException("ANALYZE TABLE is not supported for " +
+  s"${otherRelation.nodeName}.")
+}
+
+def updateStats(catalogTable: CatalogTable, newTotalSize: Long): Unit 
= {
+  val (rowCount, columnStats) = computeColStats(sparkSession, relation)
+  val statistics = Statistics(
+sizeInBytes = newTotalSize,
+rowCount = Some(rowCount),
+colStats = columnStats ++ 
catalogTable.stats.map(_.colStats).getOrElse(Map()))
+  sessionState.catalog.alterTable(catalogTable.copy(stats = 
Some(statistics)))
+  // Refresh the cached data source table in the catalog.
+  sessionState.catalog.refreshTable(tableIdentWithDB)
+}
+
+Seq.empty[Row]
+  }
+
+  def computeColStats(
+  sparkSession: SparkSession,
+  relation: LogicalPlan): (Long, Map[String, ColumnStat]) = {
+
+// check correctness of column names
+val attributesToAnalyze = mutable.MutableList[Attribute]()
+val resolver = sparkSession.sessionState.conf.resolver
+columnNames.foreach { col =>
+  val exprOption = relation.output.find(attr => resolver(attr.name, 
col))
+  val expr = exprOption.getOrElse(throw new 
AnalysisException(s"Invalid column name: $col."))
+  // do deduplication
--- End diff --

Yes, Hive does so. @hvanhovell and @gatorsmile also agreed on this.
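
For reference, the resolve-then-deduplicate step in the diff can be sketched without Spark as follows. The `Resolver` alias and `DedupColumns` object are illustrative stand-ins, and plain strings take the place of attributes:

```scala
object DedupColumns {
  // Stand-in for a resolver: decides whether two names refer to the same column.
  type Resolver = (String, String) => Boolean
  val caseInsensitive: Resolver = (a, b) => a.equalsIgnoreCase(b)

  // Resolves each requested name against the table's columns, fails on unknown names,
  // and keeps only the first occurrence of each resolved column.
  def resolve(tableColumns: Seq[String], requested: Seq[String], resolver: Resolver): Seq[String] =
    requested.map { name =>
      tableColumns.find(resolver(_, name)).getOrElse(
        throw new IllegalArgumentException(s"Invalid column name: $name."))
    }.distinct
}
```

With a case-insensitive resolver, requesting `KEY, key, value` on a table with columns `key` and `value` yields `key, value`, so each column is analyzed once.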





[GitHub] spark pull request #15090: [SPARK-17073] [SQL] generate column-level statist...

2016-09-29 Thread wzhfy
Github user wzhfy commented on a diff in the pull request:

https://github.com/apache/spark/pull/15090#discussion_r81183194
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/Statistics.scala
 ---
@@ -32,19 +38,84 @@ package org.apache.spark.sql.catalyst.plans.logical
  * @param sizeInBytes Physical size in bytes. For leaf operators this 
defaults to 1, otherwise it
  *defaults to the product of children's `sizeInBytes`.
  * @param rowCount Estimated number of rows.
+ * @param colStats Column-level statistics.
  * @param isBroadcastable If true, output is small enough to be used in a 
broadcast join.
  */
 case class Statistics(
 sizeInBytes: BigInt,
 rowCount: Option[BigInt] = None,
+colStats: Map[String, ColumnStat] = Map.empty,
 isBroadcastable: Boolean = false) {
+
   override def toString: String = "Statistics(" + simpleString + ")"
 
   /** Readable string representation for the Statistics. */
   def simpleString: String = {
 Seq(s"sizeInBytes=$sizeInBytes",
   if (rowCount.isDefined) s"rowCount=${rowCount.get}" else "",
   s"isBroadcastable=$isBroadcastable"
-).filter(_.nonEmpty).mkString("", ", ", "")
+).filter(_.nonEmpty).mkString(", ")
+  }
+}
+
+/**
+ * Statistics for a column.
+ */
+case class ColumnStat(statRow: InternalRow) {
+
+  def forNumeric[T <: AtomicType](dataType: T): NumericColumnStat[T] = {
+NumericColumnStat(statRow, dataType)
+  }
+  def forString: StringColumnStat = StringColumnStat(statRow)
+  def forBinary: BinaryColumnStat = BinaryColumnStat(statRow)
+  def forBoolean: BooleanColumnStat = BooleanColumnStat(statRow)
+
+  override def toString: String = {
+// use Base64 for encoding
+Base64.encodeBase64String(statRow.asInstanceOf[UnsafeRow].getBytes)
   }
 }
+
+object ColumnStat {
+  def apply(dataType: DataType, str: String): ColumnStat = {
+// use Base64 for decoding
+val bytes = Base64.decodeBase64(str)
+val numFields = dataType match {
+  case BinaryType | BooleanType => 3
+  case _ => 4
+}
+val unsafeRow = new UnsafeRow(numFields)
--- End diff --

It only returns an empty `UnsafeRow`:
```java
row.pointTo(new byte[numBytes], numBytes);
```
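
A small sketch of why that matters for decoding, reading the snippet above as "the row is pointed at a newly allocated buffer". It uses `java.util.Base64` as a stand-in for the commons-codec calls in the diff, and the `Base64RoundTrip` object is only illustrative:

```scala
import java.util.Base64

object Base64RoundTrip {
  // Encodes raw bytes as a Base64 string and decodes them back.
  def encode(bytes: Array[Byte]): String = Base64.getEncoder.encodeToString(bytes)
  def decode(str: String): Array[Byte] = Base64.getDecoder.decode(str)

  def main(args: Array[String]): Unit = {
    val original = Array[Byte](1, 2, 3, 4)
    val decoded = decode(encode(original))
    // A buffer allocated only by size is all zeros and carries none of the decoded content.
    val fresh = new Array[Byte](decoded.length)
    assert(decoded.sameElements(original))
    assert(!fresh.sameElements(original))
  }
}
```

So the decoded bytes themselves have to back the row; a row created over a fresh buffer of the same size would hold no statistics.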





[GitHub] spark pull request #15090: [SPARK-17073] [SQL] generate column-level statist...

2016-09-29 Thread cloud-fan
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/15090#discussion_r81103501
  
--- Diff: 
sql/core/src/test/scala/org/apache/spark/sql/StatisticsColumnSuite.scala ---
@@ -0,0 +1,355 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql
+
+import java.sql.{Date, Timestamp}
+
+import org.apache.spark.sql.catalyst.{InternalRow, TableIdentifier}
+import org.apache.spark.sql.catalyst.parser.ParseException
+import org.apache.spark.sql.catalyst.plans.logical.ColumnStat
+import org.apache.spark.sql.catalyst.util.DateTimeUtils
+import org.apache.spark.sql.execution.command.AnalyzeColumnCommand
+import org.apache.spark.sql.test.SQLTestData.ArrayData
+import org.apache.spark.sql.types._
+
+class StatisticsColumnSuite extends StatisticsTest {
+  import testImplicits._
+
+  test("parse analyze column commands") {
+def assertAnalyzeColumnCommand(analyzeCommand: String, c: Class[_]) {
+  val parsed = spark.sessionState.sqlParser.parsePlan(analyzeCommand)
+  val operators = parsed.collect {
+case a: AnalyzeColumnCommand => a
+case o => o
+  }
+  assert(operators.size == 1)
+  if (operators.head.getClass != c) {
+fail(
+  s"""$analyzeCommand expected command: $c, but got 
${operators.head}
+ |parsed command:
+ |$parsed
+   """.stripMargin)
+  }
+}
+
+val table = "table"
+assertAnalyzeColumnCommand(
+  s"ANALYZE TABLE $table COMPUTE STATISTICS FOR COLUMNS key, value",
+  classOf[AnalyzeColumnCommand])
+
+intercept[ParseException] {
+  sql(s"ANALYZE TABLE $table COMPUTE STATISTICS FOR COLUMNS")
+}
+  }
+
+  test("analyzing columns of non-atomic types is not supported") {
+val tableName = "tbl"
+withTable(tableName) {
+  Seq(ArrayData(Seq(1, 2, 3), Seq(Seq(1, 2, 3)))).toDF().write.saveAsTable(tableName)
+  val err = intercept[AnalysisException] {
+sql(s"ANALYZE TABLE $tableName COMPUTE STATISTICS FOR COLUMNS 
data")
+  }
+  assert(err.message.contains("Analyzing columns is not supported"))
+}
+  }
+
+  test("check correctness of columns") {
+val table = "tbl"
+val colName1 = "abc"
+val colName2 = "x.yz"
+val quotedColName2 = s"`$colName2`"
--- End diff --

You can inline this variable, i.e. ``CREATE TABLE $table ($colName1 int, 
`$colName2` string)``






[GitHub] spark pull request #15090: [SPARK-17073] [SQL] generate column-level statist...

2016-09-29 Thread cloud-fan
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/15090#discussion_r81103116
  
--- Diff: 
sql/core/src/test/scala/org/apache/spark/sql/StatisticsColumnSuite.scala ---
@@ -0,0 +1,355 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql
+
+import java.sql.{Date, Timestamp}
+
+import org.apache.spark.sql.catalyst.{InternalRow, TableIdentifier}
+import org.apache.spark.sql.catalyst.parser.ParseException
+import org.apache.spark.sql.catalyst.plans.logical.ColumnStat
+import org.apache.spark.sql.catalyst.util.DateTimeUtils
+import org.apache.spark.sql.execution.command.AnalyzeColumnCommand
+import org.apache.spark.sql.test.SQLTestData.ArrayData
+import org.apache.spark.sql.types._
+
+class StatisticsColumnSuite extends StatisticsTest {
+  import testImplicits._
+
+  test("parse analyze column commands") {
--- End diff --

Please follow the style in `DDLCommandSuite` when writing tests for parser rules.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #15090: [SPARK-17073] [SQL] generate column-level statist...

2016-09-29 Thread cloud-fan
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/15090#discussion_r81102672
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala ---
@@ -581,6 +581,13 @@ object SQLConf {
   .timeConf(TimeUnit.MILLISECONDS)
   .createWithDefault(10L)
 
+  val NDV_MAX_ERROR =
+SQLConfigBuilder("spark.sql.statistics.ndv.maxError")
+  .internal()
+  .doc("The maximum estimation error allowed in HyperLogLog++ 
algorithm.")
--- End diff --

Should we mention that this conf is only used to generate column-level 
statistics?





[GitHub] spark pull request #15090: [SPARK-17073] [SQL] generate column-level statist...

2016-09-29 Thread cloud-fan
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/15090#discussion_r81102487
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/command/AnalyzeColumnCommand.scala
 ---
@@ -0,0 +1,175 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.command
+
+import scala.collection.mutable
+
+import org.apache.spark.sql._
+import org.apache.spark.sql.catalyst.{InternalRow, TableIdentifier}
+import org.apache.spark.sql.catalyst.analysis.EliminateSubqueryAliases
+import org.apache.spark.sql.catalyst.catalog.{CatalogRelation, 
CatalogTable}
+import org.apache.spark.sql.catalyst.expressions._
+import org.apache.spark.sql.catalyst.expressions.aggregate._
+import org.apache.spark.sql.catalyst.plans.logical.{Aggregate, ColumnStat, 
LogicalPlan, Statistics}
+import org.apache.spark.sql.execution.datasources.LogicalRelation
+import org.apache.spark.sql.types._
+
+
+/**
+ * Analyzes the given columns of the given table to generate statistics, 
which will be used in
+ * query optimizations.
+ */
+case class AnalyzeColumnCommand(
+tableIdent: TableIdentifier,
+columnNames: Seq[String]) extends RunnableCommand {
+
+  override def run(sparkSession: SparkSession): Seq[Row] = {
+val sessionState = sparkSession.sessionState
+val db = 
tableIdent.database.getOrElse(sessionState.catalog.getCurrentDatabase)
+val tableIdentWithDB = TableIdentifier(tableIdent.table, Some(db))
+val relation = 
EliminateSubqueryAliases(sessionState.catalog.lookupRelation(tableIdentWithDB))
+
+relation match {
+  case catalogRel: CatalogRelation =>
+updateStats(catalogRel.catalogTable,
+  AnalyzeTableCommand.calculateTotalSize(sessionState, 
catalogRel.catalogTable))
+
+  case logicalRel: LogicalRelation if 
logicalRel.catalogTable.isDefined =>
+updateStats(logicalRel.catalogTable.get, 
logicalRel.relation.sizeInBytes)
+
+  case otherRelation =>
+throw new AnalysisException("ANALYZE TABLE is not supported for " +
+  s"${otherRelation.nodeName}.")
+}
+
+def updateStats(catalogTable: CatalogTable, newTotalSize: Long): Unit 
= {
+  val (rowCount, columnStats) = computeColStats(sparkSession, relation)
+  val statistics = Statistics(
+sizeInBytes = newTotalSize,
+rowCount = Some(rowCount),
+colStats = columnStats ++ 
catalogTable.stats.map(_.colStats).getOrElse(Map()))
+  sessionState.catalog.alterTable(catalogTable.copy(stats = 
Some(statistics)))
+  // Refresh the cached data source table in the catalog.
+  sessionState.catalog.refreshTable(tableIdentWithDB)
+}
+
+Seq.empty[Row]
+  }
+
+  def computeColStats(
+  sparkSession: SparkSession,
+  relation: LogicalPlan): (Long, Map[String, ColumnStat]) = {
+
+// check correctness of column names
+val attributesToAnalyze = mutable.MutableList[Attribute]()
+val resolver = sparkSession.sessionState.conf.resolver
+columnNames.foreach { col =>
+  val exprOption = relation.output.find(attr => resolver(attr.name, 
col))
+  val expr = exprOption.getOrElse(throw new 
AnalysisException(s"Invalid column name: $col."))
+  // do deduplication
+  if (!attributesToAnalyze.contains(expr)) {
+attributesToAnalyze += expr
+  }
+}
+
+// Collect statistics per column.
+// The first element in the result will be the overall row count, the 
following elements
+// will be structs containing all column stats.
+// The layout of each struct follows the layout of the ColumnStats.
+val ndvMaxErr = sparkSession.sessionState.conf.ndvMaxError
+val expressions = Count(Literal(1)).toAggregateExpression() +:
+  attributesToAnalyze.map(ColumnStatStruct(_, ndvMaxErr))
+val 

[GitHub] spark pull request #15090: [SPARK-17073] [SQL] generate column-level statist...

2016-09-29 Thread cloud-fan
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/15090#discussion_r81102018
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/command/AnalyzeColumnCommand.scala
 ---
@@ -0,0 +1,175 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.command
+
+import scala.collection.mutable
+
+import org.apache.spark.sql._
+import org.apache.spark.sql.catalyst.{InternalRow, TableIdentifier}
+import org.apache.spark.sql.catalyst.analysis.EliminateSubqueryAliases
+import org.apache.spark.sql.catalyst.catalog.{CatalogRelation, 
CatalogTable}
+import org.apache.spark.sql.catalyst.expressions._
+import org.apache.spark.sql.catalyst.expressions.aggregate._
+import org.apache.spark.sql.catalyst.plans.logical.{Aggregate, ColumnStat, 
LogicalPlan, Statistics}
+import org.apache.spark.sql.execution.datasources.LogicalRelation
+import org.apache.spark.sql.types._
+
+
+/**
+ * Analyzes the given columns of the given table to generate statistics, 
which will be used in
+ * query optimizations.
+ */
+case class AnalyzeColumnCommand(
+tableIdent: TableIdentifier,
+columnNames: Seq[String]) extends RunnableCommand {
+
+  override def run(sparkSession: SparkSession): Seq[Row] = {
+val sessionState = sparkSession.sessionState
+val db = 
tableIdent.database.getOrElse(sessionState.catalog.getCurrentDatabase)
+val tableIdentWithDB = TableIdentifier(tableIdent.table, Some(db))
+val relation = 
EliminateSubqueryAliases(sessionState.catalog.lookupRelation(tableIdentWithDB))
+
+relation match {
+  case catalogRel: CatalogRelation =>
+updateStats(catalogRel.catalogTable,
+  AnalyzeTableCommand.calculateTotalSize(sessionState, 
catalogRel.catalogTable))
+
+  case logicalRel: LogicalRelation if 
logicalRel.catalogTable.isDefined =>
+updateStats(logicalRel.catalogTable.get, 
logicalRel.relation.sizeInBytes)
+
+  case otherRelation =>
+throw new AnalysisException("ANALYZE TABLE is not supported for " +
+  s"${otherRelation.nodeName}.")
+}
+
+def updateStats(catalogTable: CatalogTable, newTotalSize: Long): Unit 
= {
+  val (rowCount, columnStats) = computeColStats(sparkSession, relation)
+  val statistics = Statistics(
+sizeInBytes = newTotalSize,
+rowCount = Some(rowCount),
+colStats = columnStats ++ 
catalogTable.stats.map(_.colStats).getOrElse(Map()))
+  sessionState.catalog.alterTable(catalogTable.copy(stats = 
Some(statistics)))
+  // Refresh the cached data source table in the catalog.
+  sessionState.catalog.refreshTable(tableIdentWithDB)
+}
+
+Seq.empty[Row]
+  }
+
+  def computeColStats(
+  sparkSession: SparkSession,
+  relation: LogicalPlan): (Long, Map[String, ColumnStat]) = {
+
+// check correctness of column names
+val attributesToAnalyze = mutable.MutableList[Attribute]()
+val resolver = sparkSession.sessionState.conf.resolver
+columnNames.foreach { col =>
+  val exprOption = relation.output.find(attr => resolver(attr.name, 
col))
+  val expr = exprOption.getOrElse(throw new 
AnalysisException(s"Invalid column name: $col."))
+  // do deduplication
+  if (!attributesToAnalyze.contains(expr)) {
+attributesToAnalyze += expr
+  }
+}
+
+// Collect statistics per column.
+// The first element in the result will be the overall row count, the 
following elements
+// will be structs containing all column stats.
+// The layout of each struct follows the layout of the ColumnStats.
+val ndvMaxErr = sparkSession.sessionState.conf.ndvMaxError
+val expressions = Count(Literal(1)).toAggregateExpression() +:
+  attributesToAnalyze.map(ColumnStatStruct(_, ndvMaxErr))
+val 

[GitHub] spark pull request #15090: [SPARK-17073] [SQL] generate column-level statist...

2016-09-29 Thread cloud-fan
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/15090#discussion_r81100678
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/command/AnalyzeColumnCommand.scala
 ---
@@ -0,0 +1,175 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.command
+
+import scala.collection.mutable
+
+import org.apache.spark.sql._
+import org.apache.spark.sql.catalyst.{InternalRow, TableIdentifier}
+import org.apache.spark.sql.catalyst.analysis.EliminateSubqueryAliases
+import org.apache.spark.sql.catalyst.catalog.{CatalogRelation, 
CatalogTable}
+import org.apache.spark.sql.catalyst.expressions._
+import org.apache.spark.sql.catalyst.expressions.aggregate._
+import org.apache.spark.sql.catalyst.plans.logical.{Aggregate, ColumnStat, 
LogicalPlan, Statistics}
+import org.apache.spark.sql.execution.datasources.LogicalRelation
+import org.apache.spark.sql.types._
+
+
+/**
+ * Analyzes the given columns of the given table to generate statistics, 
which will be used in
+ * query optimizations.
+ */
+case class AnalyzeColumnCommand(
+tableIdent: TableIdentifier,
+columnNames: Seq[String]) extends RunnableCommand {
+
+  override def run(sparkSession: SparkSession): Seq[Row] = {
+val sessionState = sparkSession.sessionState
+val db = 
tableIdent.database.getOrElse(sessionState.catalog.getCurrentDatabase)
+val tableIdentWithDB = TableIdentifier(tableIdent.table, Some(db))
+val relation = 
EliminateSubqueryAliases(sessionState.catalog.lookupRelation(tableIdentWithDB))
+
+relation match {
+  case catalogRel: CatalogRelation =>
+updateStats(catalogRel.catalogTable,
+  AnalyzeTableCommand.calculateTotalSize(sessionState, 
catalogRel.catalogTable))
+
+  case logicalRel: LogicalRelation if 
logicalRel.catalogTable.isDefined =>
+updateStats(logicalRel.catalogTable.get, 
logicalRel.relation.sizeInBytes)
+
+  case otherRelation =>
+throw new AnalysisException("ANALYZE TABLE is not supported for " +
+  s"${otherRelation.nodeName}.")
+}
+
+def updateStats(catalogTable: CatalogTable, newTotalSize: Long): Unit 
= {
+  val (rowCount, columnStats) = computeColStats(sparkSession, relation)
+  val statistics = Statistics(
+sizeInBytes = newTotalSize,
+rowCount = Some(rowCount),
+colStats = columnStats ++ 
catalogTable.stats.map(_.colStats).getOrElse(Map()))
+  sessionState.catalog.alterTable(catalogTable.copy(stats = 
Some(statistics)))
+  // Refresh the cached data source table in the catalog.
+  sessionState.catalog.refreshTable(tableIdentWithDB)
+}
+
+Seq.empty[Row]
+  }
+
+  def computeColStats(
+  sparkSession: SparkSession,
+  relation: LogicalPlan): (Long, Map[String, ColumnStat]) = {
+
+// check correctness of column names
+val attributesToAnalyze = mutable.MutableList[Attribute]()
+val resolver = sparkSession.sessionState.conf.resolver
+columnNames.foreach { col =>
+  val exprOption = relation.output.find(attr => resolver(attr.name, col))
+  val expr = exprOption.getOrElse(throw new AnalysisException(s"Invalid column name: $col."))
+  // do deduplication
--- End diff --

Is this common behaviour? That is, if users specify duplicate columns, do we
just deduplicate silently instead of reporting an error?
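
To make the two options concrete, here is a minimal sketch in plain Scala with no Spark dependency. The object name `DuplicateColumnCheck`, the `Resolver` alias, and both helper methods are hypothetical and only stand in for the session resolver used in the diff. The diff deduplicates resolved attributes, while this sketch compares names directly, and it throws `IllegalArgumentException` where the real command would presumably raise an analysis error.

```scala
object DuplicateColumnCheck {
  // Hypothetical stand-in for the session resolver: decides whether two
  // names refer to the same column.
  type Resolver = (String, String) => Boolean

  val caseInsensitive: Resolver = (a, b) => a.equalsIgnoreCase(b)

  // Silent variant: keep the first occurrence of each name, drop repeats.
  def dedupSilently(names: Seq[String], resolver: Resolver): Seq[String] =
    names.foldLeft(Vector.empty[String]) { (kept, name) =>
      if (kept.exists(resolver(_, name))) kept else kept :+ name
    }

  // Strict variant: fail as soon as a name repeats under the resolver.
  def rejectDuplicates(names: Seq[String], resolver: Resolver): Seq[String] =
    names.foldLeft(Vector.empty[String]) { (seen, name) =>
      if (seen.exists(resolver(_, name))) {
        throw new IllegalArgumentException(s"Duplicate column name: $name.")
      }
      seen :+ name
    }
}
```

With a case-insensitive resolver, `dedupSilently(Seq("a", "A", "b"), caseInsensitive)` keeps `a` and `b`, while `rejectDuplicates(Seq("a", "A"), caseInsensitive)` throws. Which behaviour is preferable is exactly the open question above.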



[GitHub] spark pull request #15090: [SPARK-17073] [SQL] generate column-level statist...

2016-09-29 Thread cloud-fan
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/15090#discussion_r81100194
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/Statistics.scala
 ---
@@ -32,19 +38,84 @@ package org.apache.spark.sql.catalyst.plans.logical
  * @param sizeInBytes Physical size in bytes. For leaf operators this defaults to 1, otherwise it
  *defaults to the product of children's `sizeInBytes`.
  * @param rowCount Estimated number of rows.
+ * @param colStats Column-level statistics.
  * @param isBroadcastable If true, output is small enough to be used in a broadcast join.
  */
 case class Statistics(
 sizeInBytes: BigInt,
 rowCount: Option[BigInt] = None,
+colStats: Map[String, ColumnStat] = Map.empty,
 isBroadcastable: Boolean = false) {
+
   override def toString: String = "Statistics(" + simpleString + ")"
 
   /** Readable string representation for the Statistics. */
   def simpleString: String = {
 Seq(s"sizeInBytes=$sizeInBytes",
   if (rowCount.isDefined) s"rowCount=${rowCount.get}" else "",
   s"isBroadcastable=$isBroadcastable"
-).filter(_.nonEmpty).mkString("", ", ", "")
+).filter(_.nonEmpty).mkString(", ")
+  }
+}
+
+/**
+ * Statistics for a column.
+ */
+case class ColumnStat(statRow: InternalRow) {
+
+  def forNumeric[T <: AtomicType](dataType: T): NumericColumnStat[T] = {
+NumericColumnStat(statRow, dataType)
+  }
+  def forString: StringColumnStat = StringColumnStat(statRow)
+  def forBinary: BinaryColumnStat = BinaryColumnStat(statRow)
+  def forBoolean: BooleanColumnStat = BooleanColumnStat(statRow)
+
+  override def toString: String = {
+// use Base64 for encoding
+Base64.encodeBase64String(statRow.asInstanceOf[UnsafeRow].getBytes)
   }
 }
+
+object ColumnStat {
+  def apply(dataType: DataType, str: String): ColumnStat = {
+// use Base64 for decoding
+val bytes = Base64.decodeBase64(str)
+val numFields = dataType match {
+  case BinaryType | BooleanType => 3
+  case _ => 4
+}
+val unsafeRow = new UnsafeRow(numFields)
--- End diff --

We can just write `UnsafeRow.createFromBytesArray(bytes, numFields)`
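
For context, the string round trip that the quoted `toString` / `apply` pair relies on can be sketched without Spark. This is only an illustration: it uses `java.util.Base64` from the JDK as a stand-in for the `Base64` helper in the diff (whose import is not visible in the quoted hunk), the object name `StatBytesCodec` is hypothetical, and it does not show how the decoded bytes are wrapped in an `UnsafeRow`.

```scala
import java.util.Base64

object StatBytesCodec {
  // Encode the raw bytes of a serialized stats row as a printable string.
  def encode(bytes: Array[Byte]): String =
    Base64.getEncoder.encodeToString(bytes)

  // Decode the string back into the original bytes.
  def decode(str: String): Array[Byte] =
    Base64.getDecoder.decode(str)
}
```

Decoding the output of `encode` yields the same bytes that were passed in, and that array is what the row would then be built from.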





[GitHub] spark pull request #15090: [SPARK-17073] [SQL] generate column-level statist...

2016-09-27 Thread wzhfy
Github user wzhfy commented on a diff in the pull request:

https://github.com/apache/spark/pull/15090#discussion_r80763362
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/command/AnalyzeColumnCommand.scala
 ---
@@ -0,0 +1,180 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.command
+
+import scala.collection.mutable
+
+import org.apache.spark.sql._
+import org.apache.spark.sql.catalyst.{InternalRow, TableIdentifier}
+import org.apache.spark.sql.catalyst.analysis.EliminateSubqueryAliases
+import org.apache.spark.sql.catalyst.catalog.{CatalogRelation, 
CatalogTable}
+import org.apache.spark.sql.catalyst.expressions._
+import org.apache.spark.sql.catalyst.expressions.aggregate._
+import org.apache.spark.sql.catalyst.plans.logical.{Aggregate, ColumnStat, 
LogicalPlan, Statistics}
+import org.apache.spark.sql.execution.datasources.LogicalRelation
+import org.apache.spark.sql.types._
+
+
+/**
+ * Analyzes the given columns of the given table in the current database 
to generate statistics,
+ * which will be used in query optimizations.
+ */
+case class AnalyzeColumnCommand(
+tableIdent: TableIdentifier,
+columnNames: Seq[String]) extends RunnableCommand {
+
+  override def run(sparkSession: SparkSession): Seq[Row] = {
+val sessionState = sparkSession.sessionState
+val db = 
tableIdent.database.getOrElse(sessionState.catalog.getCurrentDatabase)
+val tableIdentWithDB = TableIdentifier(tableIdent.table, Some(db))
+val relation = 
EliminateSubqueryAliases(sessionState.catalog.lookupRelation(tableIdentWithDB))
+
+relation match {
+  case catalogRel: CatalogRelation =>
+updateStats(catalogRel.catalogTable,
+  AnalyzeTableCommand.calculateTotalSize(sessionState, 
catalogRel.catalogTable))
+
+  case logicalRel: LogicalRelation if 
logicalRel.catalogTable.isDefined =>
+updateStats(logicalRel.catalogTable.get, 
logicalRel.relation.sizeInBytes)
+
+  case otherRelation =>
+throw new AnalysisException("ANALYZE TABLE is not supported for " +
+  s"${otherRelation.nodeName}.")
+}
+
+def updateStats(catalogTable: CatalogTable, newTotalSize: Long): Unit 
= {
+  val (rowCount, columnStats) = computeColStats(sparkSession, relation)
+  val statistics = Statistics(
+sizeInBytes = newTotalSize,
+rowCount = Some(rowCount),
+colStats = columnStats ++ 
catalogTable.stats.map(_.colStats).getOrElse(Map()))
+  sessionState.catalog.alterTable(catalogTable.copy(stats = 
Some(statistics)))
+  // Refresh the cached data source table in the catalog.
+  sessionState.catalog.refreshTable(tableIdentWithDB)
+}
+
+Seq.empty[Row]
+  }
+
+  def computeColStats(
+  sparkSession: SparkSession,
+  relation: LogicalPlan): (Long, Map[String, ColumnStat]) = {
+
+// check correctness of column names
+val attributesToAnalyze = mutable.MutableList[Attribute]()
+val caseSensitive = 
sparkSession.sessionState.conf.caseSensitiveAnalysis
+columnNames.foreach { col =>
+  val exprOption = relation.output.find { attr =>
+if (caseSensitive) attr.name == col else 
attr.name.equalsIgnoreCase(col)
+  }
+  val expr = exprOption.getOrElse(throw new 
AnalysisException(s"Invalid column name: $col."))
+  // do deduplication
+  if (!attributesToAnalyze.contains(expr)) {
+attributesToAnalyze += expr
+  }
+}
+
+// Collect statistics per column.
+// The first element in the result will be the overall row count, the 
following elements
+// will be structs containing all column stats.
+// The layout of each struct follows the layout of the ColumnStats.
+val ndvMaxErr = sparkSession.sessionState.conf.ndvMaxError
+val expressions = 

[GitHub] spark pull request #15090: [SPARK-17073] [SQL] generate column-level statist...

2016-09-27 Thread hvanhovell
Github user hvanhovell commented on a diff in the pull request:

https://github.com/apache/spark/pull/15090#discussion_r80761163
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/command/AnalyzeColumnCommand.scala
 ---
@@ -0,0 +1,180 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.command
+
+import scala.collection.mutable
+
+import org.apache.spark.sql._
+import org.apache.spark.sql.catalyst.{InternalRow, TableIdentifier}
+import org.apache.spark.sql.catalyst.analysis.EliminateSubqueryAliases
+import org.apache.spark.sql.catalyst.catalog.{CatalogRelation, 
CatalogTable}
+import org.apache.spark.sql.catalyst.expressions._
+import org.apache.spark.sql.catalyst.expressions.aggregate._
+import org.apache.spark.sql.catalyst.plans.logical.{Aggregate, ColumnStat, 
LogicalPlan, Statistics}
+import org.apache.spark.sql.execution.datasources.LogicalRelation
+import org.apache.spark.sql.types._
+
+
+/**
+ * Analyzes the given columns of the given table in the current database 
to generate statistics,
+ * which will be used in query optimizations.
+ */
+case class AnalyzeColumnCommand(
+tableIdent: TableIdentifier,
+columnNames: Seq[String]) extends RunnableCommand {
+
+  override def run(sparkSession: SparkSession): Seq[Row] = {
+val sessionState = sparkSession.sessionState
+val db = 
tableIdent.database.getOrElse(sessionState.catalog.getCurrentDatabase)
+val tableIdentWithDB = TableIdentifier(tableIdent.table, Some(db))
+val relation = 
EliminateSubqueryAliases(sessionState.catalog.lookupRelation(tableIdentWithDB))
+
+relation match {
+  case catalogRel: CatalogRelation =>
+updateStats(catalogRel.catalogTable,
+  AnalyzeTableCommand.calculateTotalSize(sessionState, 
catalogRel.catalogTable))
+
+  case logicalRel: LogicalRelation if 
logicalRel.catalogTable.isDefined =>
+updateStats(logicalRel.catalogTable.get, 
logicalRel.relation.sizeInBytes)
+
+  case otherRelation =>
+throw new AnalysisException("ANALYZE TABLE is not supported for " +
+  s"${otherRelation.nodeName}.")
+}
+
+def updateStats(catalogTable: CatalogTable, newTotalSize: Long): Unit 
= {
+  val (rowCount, columnStats) = computeColStats(sparkSession, relation)
+  val statistics = Statistics(
+sizeInBytes = newTotalSize,
+rowCount = Some(rowCount),
+colStats = columnStats ++ 
catalogTable.stats.map(_.colStats).getOrElse(Map()))
+  sessionState.catalog.alterTable(catalogTable.copy(stats = 
Some(statistics)))
+  // Refresh the cached data source table in the catalog.
+  sessionState.catalog.refreshTable(tableIdentWithDB)
+}
+
+Seq.empty[Row]
+  }
+
+  def computeColStats(
+  sparkSession: SparkSession,
+  relation: LogicalPlan): (Long, Map[String, ColumnStat]) = {
+
+// check correctness of column names
+val attributesToAnalyze = mutable.MutableList[Attribute]()
+val caseSensitive = 
sparkSession.sessionState.conf.caseSensitiveAnalysis
+columnNames.foreach { col =>
+  val exprOption = relation.output.find { attr =>
+if (caseSensitive) attr.name == col else 
attr.name.equalsIgnoreCase(col)
+  }
+  val expr = exprOption.getOrElse(throw new 
AnalysisException(s"Invalid column name: $col."))
+  // do deduplication
+  if (!attributesToAnalyze.contains(expr)) {
+attributesToAnalyze += expr
+  }
+}
+
+// Collect statistics per column.
+// The first element in the result will be the overall row count, the 
following elements
+// will be structs containing all column stats.
+// The layout of each struct follows the layout of the ColumnStats.
+val ndvMaxErr = sparkSession.sessionState.conf.ndvMaxError
+val expressions = 

[GitHub] spark pull request #15090: [SPARK-17073] [SQL] generate column-level statist...

2016-09-27 Thread wzhfy
Github user wzhfy commented on a diff in the pull request:

https://github.com/apache/spark/pull/15090#discussion_r80755706
  
--- Diff: 
sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveExternalCatalog.scala ---
@@ -693,6 +701,7 @@ object HiveExternalCatalog {
   val STATISTICS_PREFIX = "spark.sql.statistics."
   val STATISTICS_TOTAL_SIZE = STATISTICS_PREFIX + "totalSize"
   val STATISTICS_NUM_ROWS = STATISTICS_PREFIX + "numRows"
+  val STATISTICS_BASIC_COL_STATS_PREFIX = STATISTICS_PREFIX + "colStats."
--- End diff --

Oh, sorry, I forgot to change the name here.





[GitHub] spark pull request #15090: [SPARK-17073] [SQL] generate column-level statist...

2016-09-27 Thread wzhfy
Github user wzhfy commented on a diff in the pull request:

https://github.com/apache/spark/pull/15090#discussion_r80753145
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/command/AnalyzeColumnCommand.scala
 ---
@@ -0,0 +1,180 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.command
+
+import scala.collection.mutable
+
+import org.apache.spark.sql._
+import org.apache.spark.sql.catalyst.{InternalRow, TableIdentifier}
+import org.apache.spark.sql.catalyst.analysis.EliminateSubqueryAliases
+import org.apache.spark.sql.catalyst.catalog.{CatalogRelation, 
CatalogTable}
+import org.apache.spark.sql.catalyst.expressions._
+import org.apache.spark.sql.catalyst.expressions.aggregate._
+import org.apache.spark.sql.catalyst.plans.logical.{Aggregate, ColumnStat, 
LogicalPlan, Statistics}
+import org.apache.spark.sql.execution.datasources.LogicalRelation
+import org.apache.spark.sql.types._
+
+
+/**
+ * Analyzes the given columns of the given table in the current database 
to generate statistics,
+ * which will be used in query optimizations.
+ */
+case class AnalyzeColumnCommand(
+tableIdent: TableIdentifier,
+columnNames: Seq[String]) extends RunnableCommand {
+
+  override def run(sparkSession: SparkSession): Seq[Row] = {
+val sessionState = sparkSession.sessionState
+val db = 
tableIdent.database.getOrElse(sessionState.catalog.getCurrentDatabase)
+val tableIdentWithDB = TableIdentifier(tableIdent.table, Some(db))
+val relation = 
EliminateSubqueryAliases(sessionState.catalog.lookupRelation(tableIdentWithDB))
+
+relation match {
+  case catalogRel: CatalogRelation =>
+updateStats(catalogRel.catalogTable,
+  AnalyzeTableCommand.calculateTotalSize(sessionState, 
catalogRel.catalogTable))
+
+  case logicalRel: LogicalRelation if 
logicalRel.catalogTable.isDefined =>
+updateStats(logicalRel.catalogTable.get, 
logicalRel.relation.sizeInBytes)
+
+  case otherRelation =>
+throw new AnalysisException("ANALYZE TABLE is not supported for " +
+  s"${otherRelation.nodeName}.")
+}
+
+def updateStats(catalogTable: CatalogTable, newTotalSize: Long): Unit 
= {
+  val (rowCount, columnStats) = computeColStats(sparkSession, relation)
+  val statistics = Statistics(
+sizeInBytes = newTotalSize,
+rowCount = Some(rowCount),
+colStats = columnStats ++ 
catalogTable.stats.map(_.colStats).getOrElse(Map()))
+  sessionState.catalog.alterTable(catalogTable.copy(stats = 
Some(statistics)))
+  // Refresh the cached data source table in the catalog.
+  sessionState.catalog.refreshTable(tableIdentWithDB)
+}
+
+Seq.empty[Row]
+  }
+
+  def computeColStats(
+  sparkSession: SparkSession,
+  relation: LogicalPlan): (Long, Map[String, ColumnStat]) = {
+
+// check correctness of column names
+val attributesToAnalyze = mutable.MutableList[Attribute]()
+val caseSensitive = 
sparkSession.sessionState.conf.caseSensitiveAnalysis
+columnNames.foreach { col =>
+  val exprOption = relation.output.find { attr =>
+if (caseSensitive) attr.name == col else 
attr.name.equalsIgnoreCase(col)
+  }
+  val expr = exprOption.getOrElse(throw new 
AnalysisException(s"Invalid column name: $col."))
+  // do deduplication
+  if (!attributesToAnalyze.contains(expr)) {
+attributesToAnalyze += expr
+  }
+}
+
+// Collect statistics per column.
+// The first element in the result will be the overall row count, the 
following elements
+// will be structs containing all column stats.
+// The layout of each struct follows the layout of the ColumnStats.
+val ndvMaxErr = sparkSession.sessionState.conf.ndvMaxError
+val expressions = 

[GitHub] spark pull request #15090: [SPARK-17073] [SQL] generate column-level statist...

2016-09-27 Thread cloud-fan
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/15090#discussion_r80639732
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/command/AnalyzeColumnCommand.scala
 ---
@@ -0,0 +1,180 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.command
+
+import scala.collection.mutable
+
+import org.apache.spark.sql._
+import org.apache.spark.sql.catalyst.{InternalRow, TableIdentifier}
+import org.apache.spark.sql.catalyst.analysis.EliminateSubqueryAliases
+import org.apache.spark.sql.catalyst.catalog.{CatalogRelation, 
CatalogTable}
+import org.apache.spark.sql.catalyst.expressions._
+import org.apache.spark.sql.catalyst.expressions.aggregate._
+import org.apache.spark.sql.catalyst.plans.logical.{Aggregate, ColumnStat, 
LogicalPlan, Statistics}
+import org.apache.spark.sql.execution.datasources.LogicalRelation
+import org.apache.spark.sql.types._
+
+
+/**
+ * Analyzes the given columns of the given table in the current database 
to generate statistics,
+ * which will be used in query optimizations.
+ */
+case class AnalyzeColumnCommand(
+tableIdent: TableIdentifier,
+columnNames: Seq[String]) extends RunnableCommand {
+
+  override def run(sparkSession: SparkSession): Seq[Row] = {
+val sessionState = sparkSession.sessionState
+val db = 
tableIdent.database.getOrElse(sessionState.catalog.getCurrentDatabase)
+val tableIdentWithDB = TableIdentifier(tableIdent.table, Some(db))
+val relation = 
EliminateSubqueryAliases(sessionState.catalog.lookupRelation(tableIdentWithDB))
+
+relation match {
+  case catalogRel: CatalogRelation =>
+updateStats(catalogRel.catalogTable,
+  AnalyzeTableCommand.calculateTotalSize(sessionState, 
catalogRel.catalogTable))
+
+  case logicalRel: LogicalRelation if 
logicalRel.catalogTable.isDefined =>
+updateStats(logicalRel.catalogTable.get, 
logicalRel.relation.sizeInBytes)
+
+  case otherRelation =>
+throw new AnalysisException("ANALYZE TABLE is not supported for " +
+  s"${otherRelation.nodeName}.")
+}
+
+def updateStats(catalogTable: CatalogTable, newTotalSize: Long): Unit 
= {
+  val (rowCount, columnStats) = computeColStats(sparkSession, relation)
+  val statistics = Statistics(
+sizeInBytes = newTotalSize,
+rowCount = Some(rowCount),
+colStats = columnStats ++ 
catalogTable.stats.map(_.colStats).getOrElse(Map()))
+  sessionState.catalog.alterTable(catalogTable.copy(stats = 
Some(statistics)))
+  // Refresh the cached data source table in the catalog.
+  sessionState.catalog.refreshTable(tableIdentWithDB)
--- End diff --

Ah, I see.





[GitHub] spark pull request #15090: [SPARK-17073] [SQL] generate column-level statist...

2016-09-27 Thread cloud-fan
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/15090#discussion_r80639243
  
--- Diff: 
sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveExternalCatalog.scala ---
@@ -473,15 +476,20 @@ private[spark] class HiveExternalCatalog(conf: SparkConf, hadoopConf: Configurat
   }
 }
 // construct Spark's statistics from information in Hive metastore
-if (catalogTable.properties.contains(STATISTICS_TOTAL_SIZE)) {
-  val totalSize = BigInt(catalogTable.properties.get(STATISTICS_TOTAL_SIZE).get)
-  // TODO: we will compute "estimatedSize" when we have column stats:
-  // average size of row * number of rows
+if (catalogTable.properties.filterKeys(_.startsWith(STATISTICS_PREFIX)).nonEmpty) {
+  val colStatsProps = catalogTable.properties
+.filterKeys(_.startsWith(STATISTICS_BASIC_COL_STATS_PREFIX))
+.map { case (k, v) => (k.replace(STATISTICS_BASIC_COL_STATS_PREFIX, ""), v) }
--- End diff --

I think it's safer to do `k.drop(STATISTICS_BASIC_COL_STATS_PREFIX.length)`
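
One likely reason `drop` is safer, shown as a minimal sketch: `String.replace` removes every occurrence of the prefix, so a key whose column name happens to contain the prefix text gets mangled, while `drop(prefix.length)` only removes the leading characters. The prefix value and the key below are made up for illustration; they are not the constants used in `HiveExternalCatalog`.

```scala
object PrefixStripping {
  // Hypothetical prefix, chosen only for this illustration.
  val Prefix = "stats."

  // Removes every occurrence of the prefix, including ones inside the column name.
  def stripWithReplace(key: String): String = key.replace(Prefix, "")

  // Removes only the first Prefix.length characters. Callers are assumed to
  // have filtered keys with startsWith(Prefix) beforehand, as the diff does.
  def stripWithDrop(key: String): String = key.drop(Prefix.length)
}
```

With the key `stats.my.stats.col`, the `replace` variant also strips the second `stats.` from the column name, whereas the `drop` variant leaves the column name intact.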





[GitHub] spark pull request #15090: [SPARK-17073] [SQL] generate column-level statist...

2016-09-27 Thread cloud-fan
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/15090#discussion_r80638681
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/internal/SessionState.scala ---
@@ -186,13 +187,18 @@ private[sql] class SessionState(sparkSession: SparkSession) {
   }
 
   /**
-   * Analyzes the given table in the current database to generate statistics, which will be
+   * Analyzes the given table in the current database to generate table-level statistics, which
+   * will be used in query optimizations.
+   */
+  def analyzeTable(tableIdent: TableIdentifier, noscan: Boolean = true): Unit = {
+AnalyzeTableCommand(tableIdent, noscan).run(sparkSession)
+  }
+
+  /**
+   * Analyzes the given columns in the table to generate column-level statistics, which will be
* used in query optimizations.
-   *
-   * Right now, it only supports catalog tables and it only updates the size of a catalog table
-   * in the external catalog.
*/
-  def analyze(tableName: String, noscan: Boolean = true): Unit = {
--- End diff --

Actually, this method is never used.

We should keep it unchanged in this PR and remove it in a follow-up.





[GitHub] spark pull request #15090: [SPARK-17073] [SQL] generate column-level statist...

2016-09-27 Thread cloud-fan
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/15090#discussion_r80638078
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/command/AnalyzeColumnCommand.scala
 ---
@@ -0,0 +1,180 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.command
+
+import scala.collection.mutable
+
+import org.apache.spark.sql._
+import org.apache.spark.sql.catalyst.{InternalRow, TableIdentifier}
+import org.apache.spark.sql.catalyst.analysis.EliminateSubqueryAliases
+import org.apache.spark.sql.catalyst.catalog.{CatalogRelation, CatalogTable}
+import org.apache.spark.sql.catalyst.expressions._
+import org.apache.spark.sql.catalyst.expressions.aggregate._
+import org.apache.spark.sql.catalyst.plans.logical.{Aggregate, ColumnStat, LogicalPlan, Statistics}
+import org.apache.spark.sql.execution.datasources.LogicalRelation
+import org.apache.spark.sql.types._
+
+
+/**
+ * Analyzes the given columns of the given table in the current database to generate statistics,
+ * which will be used in query optimizations.
+ */
+case class AnalyzeColumnCommand(
+tableIdent: TableIdentifier,
+columnNames: Seq[String]) extends RunnableCommand {
+
+  override def run(sparkSession: SparkSession): Seq[Row] = {
+val sessionState = sparkSession.sessionState
+val db = tableIdent.database.getOrElse(sessionState.catalog.getCurrentDatabase)
+val tableIdentWithDB = TableIdentifier(tableIdent.table, Some(db))
+val relation = EliminateSubqueryAliases(sessionState.catalog.lookupRelation(tableIdentWithDB))
+
+relation match {
+  case catalogRel: CatalogRelation =>
+updateStats(catalogRel.catalogTable,
+  AnalyzeTableCommand.calculateTotalSize(sessionState, catalogRel.catalogTable))
+
+  case logicalRel: LogicalRelation if logicalRel.catalogTable.isDefined =>
+updateStats(logicalRel.catalogTable.get, logicalRel.relation.sizeInBytes)
+
+  case otherRelation =>
+throw new AnalysisException("ANALYZE TABLE is not supported for " +
+  s"${otherRelation.nodeName}.")
+}
+
+def updateStats(catalogTable: CatalogTable, newTotalSize: Long): Unit = {
+  val (rowCount, columnStats) = computeColStats(sparkSession, relation)
+  val statistics = Statistics(
+sizeInBytes = newTotalSize,
+rowCount = Some(rowCount),
+colStats = columnStats ++ catalogTable.stats.map(_.colStats).getOrElse(Map()))
+  sessionState.catalog.alterTable(catalogTable.copy(stats = Some(statistics)))
+  // Refresh the cached data source table in the catalog.
+  sessionState.catalog.refreshTable(tableIdentWithDB)
+}
+
+Seq.empty[Row]
+  }
+
+  def computeColStats(
+  sparkSession: SparkSession,
+  relation: LogicalPlan): (Long, Map[String, ColumnStat]) = {
+
+// check correctness of column names
+val attributesToAnalyze = mutable.MutableList[Attribute]()
+val caseSensitive = sparkSession.sessionState.conf.caseSensitiveAnalysis
+columnNames.foreach { col =>
+  val exprOption = relation.output.find { attr =>
+if (caseSensitive) attr.name == col else attr.name.equalsIgnoreCase(col)
+  }
+  val expr = exprOption.getOrElse(throw new AnalysisException(s"Invalid column name: $col."))
+  // do deduplication
+  if (!attributesToAnalyze.contains(expr)) {
+attributesToAnalyze += expr
+  }
+}
+
+// Collect statistics per column.
+// The first element in the result will be the overall row count, the following elements
+// will be structs containing all column stats.
+// The layout of each struct follows the layout of the ColumnStats.
+val ndvMaxErr = sparkSession.sessionState.conf.ndvMaxError
+val expressions = 

[GitHub] spark pull request #15090: [SPARK-17073] [SQL] generate column-level statist...

2016-09-27 Thread cloud-fan
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/15090#discussion_r80637650
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/command/AnalyzeColumnCommand.scala
 ---
@@ -0,0 +1,180 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.command
+
+import scala.collection.mutable
+
+import org.apache.spark.sql._
+import org.apache.spark.sql.catalyst.{InternalRow, TableIdentifier}
+import org.apache.spark.sql.catalyst.analysis.EliminateSubqueryAliases
+import org.apache.spark.sql.catalyst.catalog.{CatalogRelation, CatalogTable}
+import org.apache.spark.sql.catalyst.expressions._
+import org.apache.spark.sql.catalyst.expressions.aggregate._
+import org.apache.spark.sql.catalyst.plans.logical.{Aggregate, ColumnStat, LogicalPlan, Statistics}
+import org.apache.spark.sql.execution.datasources.LogicalRelation
+import org.apache.spark.sql.types._
+
+
+/**
+ * Analyzes the given columns of the given table in the current database to generate statistics,
+ * which will be used in query optimizations.
+ */
+case class AnalyzeColumnCommand(
+tableIdent: TableIdentifier,
+columnNames: Seq[String]) extends RunnableCommand {
+
+  override def run(sparkSession: SparkSession): Seq[Row] = {
+val sessionState = sparkSession.sessionState
+val db = tableIdent.database.getOrElse(sessionState.catalog.getCurrentDatabase)
+val tableIdentWithDB = TableIdentifier(tableIdent.table, Some(db))
+val relation = EliminateSubqueryAliases(sessionState.catalog.lookupRelation(tableIdentWithDB))
+
+relation match {
+  case catalogRel: CatalogRelation =>
+updateStats(catalogRel.catalogTable,
+  AnalyzeTableCommand.calculateTotalSize(sessionState, catalogRel.catalogTable))
+
+  case logicalRel: LogicalRelation if logicalRel.catalogTable.isDefined =>
+updateStats(logicalRel.catalogTable.get, logicalRel.relation.sizeInBytes)
+
+  case otherRelation =>
+throw new AnalysisException("ANALYZE TABLE is not supported for " +
+  s"${otherRelation.nodeName}.")
+}
+
+def updateStats(catalogTable: CatalogTable, newTotalSize: Long): Unit = {
+  val (rowCount, columnStats) = computeColStats(sparkSession, relation)
+  val statistics = Statistics(
+sizeInBytes = newTotalSize,
+rowCount = Some(rowCount),
+colStats = columnStats ++ catalogTable.stats.map(_.colStats).getOrElse(Map()))
+  sessionState.catalog.alterTable(catalogTable.copy(stats = Some(statistics)))
+  // Refresh the cached data source table in the catalog.
+  sessionState.catalog.refreshTable(tableIdentWithDB)
+}
+
+Seq.empty[Row]
+  }
+
+  def computeColStats(
+  sparkSession: SparkSession,
+  relation: LogicalPlan): (Long, Map[String, ColumnStat]) = {
+
+// check correctness of column names
+val attributesToAnalyze = mutable.MutableList[Attribute]()
+val caseSensitive = sparkSession.sessionState.conf.caseSensitiveAnalysis
+columnNames.foreach { col =>
+  val exprOption = relation.output.find { attr =>
+if (caseSensitive) attr.name == col else attr.name.equalsIgnoreCase(col)
+  }
+  val expr = exprOption.getOrElse(throw new AnalysisException(s"Invalid column name: $col."))
+  // do deduplication
+  if (!attributesToAnalyze.contains(expr)) {
+attributesToAnalyze += expr
+  }
+}
+
+// Collect statistics per column.
+// The first element in the result will be the overall row count, the following elements
+// will be structs containing all column stats.
+// The layout of each struct follows the layout of the ColumnStats.
+val ndvMaxErr = sparkSession.sessionState.conf.ndvMaxError
+val expressions = 

[GitHub] spark pull request #15090: [SPARK-17073] [SQL] generate column-level statist...

2016-09-27 Thread wzhfy
Github user wzhfy commented on a diff in the pull request:

https://github.com/apache/spark/pull/15090#discussion_r80633172
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/command/AnalyzeColumnCommand.scala
 ---
@@ -0,0 +1,180 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.command
+
+import scala.collection.mutable
+
+import org.apache.spark.sql._
+import org.apache.spark.sql.catalyst.{InternalRow, TableIdentifier}
+import org.apache.spark.sql.catalyst.analysis.EliminateSubqueryAliases
+import org.apache.spark.sql.catalyst.catalog.{CatalogRelation, CatalogTable}
+import org.apache.spark.sql.catalyst.expressions._
+import org.apache.spark.sql.catalyst.expressions.aggregate._
+import org.apache.spark.sql.catalyst.plans.logical.{Aggregate, ColumnStat, LogicalPlan, Statistics}
+import org.apache.spark.sql.execution.datasources.LogicalRelation
+import org.apache.spark.sql.types._
+
+
+/**
+ * Analyzes the given columns of the given table in the current database to generate statistics,
+ * which will be used in query optimizations.
+ */
+case class AnalyzeColumnCommand(
+tableIdent: TableIdentifier,
+columnNames: Seq[String]) extends RunnableCommand {
+
+  override def run(sparkSession: SparkSession): Seq[Row] = {
+val sessionState = sparkSession.sessionState
+val db = tableIdent.database.getOrElse(sessionState.catalog.getCurrentDatabase)
+val tableIdentWithDB = TableIdentifier(tableIdent.table, Some(db))
+val relation = EliminateSubqueryAliases(sessionState.catalog.lookupRelation(tableIdentWithDB))
+
+relation match {
+  case catalogRel: CatalogRelation =>
+updateStats(catalogRel.catalogTable,
+  AnalyzeTableCommand.calculateTotalSize(sessionState, catalogRel.catalogTable))
+
+  case logicalRel: LogicalRelation if logicalRel.catalogTable.isDefined =>
+updateStats(logicalRel.catalogTable.get, logicalRel.relation.sizeInBytes)
+
+  case otherRelation =>
+throw new AnalysisException("ANALYZE TABLE is not supported for " +
+  s"${otherRelation.nodeName}.")
+}
+
+def updateStats(catalogTable: CatalogTable, newTotalSize: Long): Unit = {
+  val (rowCount, columnStats) = computeColStats(sparkSession, relation)
+  val statistics = Statistics(
+sizeInBytes = newTotalSize,
+rowCount = Some(rowCount),
+colStats = columnStats ++ catalogTable.stats.map(_.colStats).getOrElse(Map()))
+  sessionState.catalog.alterTable(catalogTable.copy(stats = Some(statistics)))
+  // Refresh the cached data source table in the catalog.
+  sessionState.catalog.refreshTable(tableIdentWithDB)
+}
+
+Seq.empty[Row]
+  }
+
+  def computeColStats(
+  sparkSession: SparkSession,
+  relation: LogicalPlan): (Long, Map[String, ColumnStat]) = {
+
+// check correctness of column names
+val attributesToAnalyze = mutable.MutableList[Attribute]()
+val caseSensitive = sparkSession.sessionState.conf.caseSensitiveAnalysis
--- End diff --

nice!





[GitHub] spark pull request #15090: [SPARK-17073] [SQL] generate column-level statist...

2016-09-27 Thread wzhfy
Github user wzhfy commented on a diff in the pull request:

https://github.com/apache/spark/pull/15090#discussion_r80632476
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/command/AnalyzeColumnCommand.scala
 ---
@@ -0,0 +1,180 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.command
+
+import scala.collection.mutable
+
+import org.apache.spark.sql._
+import org.apache.spark.sql.catalyst.{InternalRow, TableIdentifier}
+import org.apache.spark.sql.catalyst.analysis.EliminateSubqueryAliases
+import org.apache.spark.sql.catalyst.catalog.{CatalogRelation, CatalogTable}
+import org.apache.spark.sql.catalyst.expressions._
+import org.apache.spark.sql.catalyst.expressions.aggregate._
+import org.apache.spark.sql.catalyst.plans.logical.{Aggregate, ColumnStat, LogicalPlan, Statistics}
+import org.apache.spark.sql.execution.datasources.LogicalRelation
+import org.apache.spark.sql.types._
+
+
+/**
+ * Analyzes the given columns of the given table in the current database to generate statistics,
+ * which will be used in query optimizations.
+ */
+case class AnalyzeColumnCommand(
+tableIdent: TableIdentifier,
+columnNames: Seq[String]) extends RunnableCommand {
+
+  override def run(sparkSession: SparkSession): Seq[Row] = {
+val sessionState = sparkSession.sessionState
+val db = tableIdent.database.getOrElse(sessionState.catalog.getCurrentDatabase)
+val tableIdentWithDB = TableIdentifier(tableIdent.table, Some(db))
+val relation = EliminateSubqueryAliases(sessionState.catalog.lookupRelation(tableIdentWithDB))
+
+relation match {
+  case catalogRel: CatalogRelation =>
+updateStats(catalogRel.catalogTable,
+  AnalyzeTableCommand.calculateTotalSize(sessionState, catalogRel.catalogTable))
+
+  case logicalRel: LogicalRelation if logicalRel.catalogTable.isDefined =>
+updateStats(logicalRel.catalogTable.get, logicalRel.relation.sizeInBytes)
+
+  case otherRelation =>
+throw new AnalysisException("ANALYZE TABLE is not supported for " +
+  s"${otherRelation.nodeName}.")
+}
+
+def updateStats(catalogTable: CatalogTable, newTotalSize: Long): Unit = {
+  val (rowCount, columnStats) = computeColStats(sparkSession, relation)
+  val statistics = Statistics(
+sizeInBytes = newTotalSize,
+rowCount = Some(rowCount),
+colStats = columnStats ++ catalogTable.stats.map(_.colStats).getOrElse(Map()))
+  sessionState.catalog.alterTable(catalogTable.copy(stats = Some(statistics)))
+  // Refresh the cached data source table in the catalog.
+  sessionState.catalog.refreshTable(tableIdentWithDB)
--- End diff --

We need to refresh the `stats` property of the `catalogTable` held by cached data source tables.
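
As a toy model of the concern (an assumption about the mechanism, not Spark's actual cache): if lookups are served from a cache that holds a copy of the table metadata, altering the table in the external catalog leaves the cached copy with the old `stats` until the entry is invalidated. `TableMeta`, `ToyCatalog`, and their methods are hypothetical names used only here.

```scala
import scala.collection.mutable

object StaleStatsSketch {
  // Hypothetical, minimal stand-in for a catalog table with optional stats.
  final case class TableMeta(name: String, stats: Option[Long])

  final class ToyCatalog {
    private val external = mutable.Map.empty[String, TableMeta]
    private val cache = mutable.Map.empty[String, TableMeta]

    def create(meta: TableMeta): Unit = external.update(meta.name, meta)

    // Serves the cached copy if present, otherwise loads it and caches it.
    def lookup(name: String): TableMeta = cache.getOrElseUpdate(name, external(name))

    // Updates the external catalog only; the cache is not touched.
    def alterTable(meta: TableMeta): Unit = external.update(meta.name, meta)

    // Drops the cached copy so that the next lookup reloads it.
    def refreshTable(name: String): Unit = { cache.remove(name); () }
  }
}
```

In this model a lookup after `alterTable` still returns the old statistics, and only a lookup after `refreshTable` sees the new ones.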





[GitHub] spark pull request #15090: [SPARK-17073] [SQL] generate column-level statist...

2016-09-27 Thread cloud-fan
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/15090#discussion_r80630427
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/command/AnalyzeColumnCommand.scala
 ---
@@ -0,0 +1,180 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.command
+
+import scala.collection.mutable
+
+import org.apache.spark.sql._
+import org.apache.spark.sql.catalyst.{InternalRow, TableIdentifier}
+import org.apache.spark.sql.catalyst.analysis.EliminateSubqueryAliases
+import org.apache.spark.sql.catalyst.catalog.{CatalogRelation, CatalogTable}
+import org.apache.spark.sql.catalyst.expressions._
+import org.apache.spark.sql.catalyst.expressions.aggregate._
+import org.apache.spark.sql.catalyst.plans.logical.{Aggregate, ColumnStat, LogicalPlan, Statistics}
+import org.apache.spark.sql.execution.datasources.LogicalRelation
+import org.apache.spark.sql.types._
+
+
+/**
+ * Analyzes the given columns of the given table in the current database to generate statistics,
+ * which will be used in query optimizations.
+ */
+case class AnalyzeColumnCommand(
+tableIdent: TableIdentifier,
+columnNames: Seq[String]) extends RunnableCommand {
+
+  override def run(sparkSession: SparkSession): Seq[Row] = {
+val sessionState = sparkSession.sessionState
+val db = tableIdent.database.getOrElse(sessionState.catalog.getCurrentDatabase)
+val tableIdentWithDB = TableIdentifier(tableIdent.table, Some(db))
+val relation = EliminateSubqueryAliases(sessionState.catalog.lookupRelation(tableIdentWithDB))
+
+relation match {
+  case catalogRel: CatalogRelation =>
+updateStats(catalogRel.catalogTable,
+  AnalyzeTableCommand.calculateTotalSize(sessionState, catalogRel.catalogTable))
+
+  case logicalRel: LogicalRelation if logicalRel.catalogTable.isDefined =>
+updateStats(logicalRel.catalogTable.get, logicalRel.relation.sizeInBytes)
+
+  case otherRelation =>
+throw new AnalysisException("ANALYZE TABLE is not supported for " +
+  s"${otherRelation.nodeName}.")
+}
+
+def updateStats(catalogTable: CatalogTable, newTotalSize: Long): Unit = {
+  val (rowCount, columnStats) = computeColStats(sparkSession, relation)
+  val statistics = Statistics(
+sizeInBytes = newTotalSize,
+rowCount = Some(rowCount),
+colStats = columnStats ++ catalogTable.stats.map(_.colStats).getOrElse(Map()))
+  sessionState.catalog.alterTable(catalogTable.copy(stats = Some(statistics)))
+  // Refresh the cached data source table in the catalog.
+  sessionState.catalog.refreshTable(tableIdentWithDB)
+}
+
+Seq.empty[Row]
+  }
+
+  def computeColStats(
+  sparkSession: SparkSession,
+  relation: LogicalPlan): (Long, Map[String, ColumnStat]) = {
+
+// check correctness of column names
+val attributesToAnalyze = mutable.MutableList[Attribute]()
+val caseSensitive = sparkSession.sessionState.conf.caseSensitiveAnalysis
--- End diff --

```
val resolver = sparkSession.sessionState.conf.resolver
...
relation.output.find(attr => resolver(attr.name, col))
```
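
A self-contained sketch of what this suggestion amounts to, without depending on Spark. It assumes a resolver is simply a `(String, String) => Boolean` function selected once from the case-sensitivity setting; `Attr`, `resolverFor`, and `findColumn` are hypothetical names for this illustration, not Spark APIs.

```scala
object ResolverSketch {
  // Assumption: a resolver is a name-equality function picked from the config.
  type Resolver = (String, String) => Boolean

  val caseSensitive: Resolver = (a, b) => a == b
  val caseInsensitive: Resolver = (a, b) => a.equalsIgnoreCase(b)

  // Choose the comparison once instead of branching at every call site.
  def resolverFor(caseSensitiveAnalysis: Boolean): Resolver =
    if (caseSensitiveAnalysis) caseSensitive else caseInsensitive

  // Hypothetical stand-in for an attribute in relation.output.
  final case class Attr(name: String)

  def findColumn(output: Seq[Attr], col: String, resolver: Resolver): Option[Attr] =
    output.find(attr => resolver(attr.name, col))
}
```

The point of the design is that the `if (caseSensitive) ... else ...` branch disappears from the lookup itself; the caller only ever sees one comparison function.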





[GitHub] spark pull request #15090: [SPARK-17073] [SQL] generate column-level statist...

2016-09-27 Thread cloud-fan
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/15090#discussion_r80629937
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/command/AnalyzeColumnCommand.scala
 ---
@@ -0,0 +1,180 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.command
+
+import scala.collection.mutable
+
+import org.apache.spark.sql._
+import org.apache.spark.sql.catalyst.{InternalRow, TableIdentifier}
+import org.apache.spark.sql.catalyst.analysis.EliminateSubqueryAliases
+import org.apache.spark.sql.catalyst.catalog.{CatalogRelation, CatalogTable}
+import org.apache.spark.sql.catalyst.expressions._
+import org.apache.spark.sql.catalyst.expressions.aggregate._
+import org.apache.spark.sql.catalyst.plans.logical.{Aggregate, ColumnStat, LogicalPlan, Statistics}
+import org.apache.spark.sql.execution.datasources.LogicalRelation
+import org.apache.spark.sql.types._
+
+
+/**
+ * Analyzes the given columns of the given table in the current database to generate statistics,
+ * which will be used in query optimizations.
+ */
+case class AnalyzeColumnCommand(
+tableIdent: TableIdentifier,
+columnNames: Seq[String]) extends RunnableCommand {
+
+  override def run(sparkSession: SparkSession): Seq[Row] = {
+val sessionState = sparkSession.sessionState
+val db = tableIdent.database.getOrElse(sessionState.catalog.getCurrentDatabase)
+val tableIdentWithDB = TableIdentifier(tableIdent.table, Some(db))
+val relation = EliminateSubqueryAliases(sessionState.catalog.lookupRelation(tableIdentWithDB))
+
+relation match {
+  case catalogRel: CatalogRelation =>
+updateStats(catalogRel.catalogTable,
+  AnalyzeTableCommand.calculateTotalSize(sessionState, catalogRel.catalogTable))
+
+  case logicalRel: LogicalRelation if logicalRel.catalogTable.isDefined =>
+updateStats(logicalRel.catalogTable.get, logicalRel.relation.sizeInBytes)
+
+  case otherRelation =>
+throw new AnalysisException("ANALYZE TABLE is not supported for " +
+  s"${otherRelation.nodeName}.")
+}
+
+def updateStats(catalogTable: CatalogTable, newTotalSize: Long): Unit = {
+  val (rowCount, columnStats) = computeColStats(sparkSession, relation)
+  val statistics = Statistics(
+sizeInBytes = newTotalSize,
+rowCount = Some(rowCount),
+colStats = columnStats ++ catalogTable.stats.map(_.colStats).getOrElse(Map()))
+  sessionState.catalog.alterTable(catalogTable.copy(stats = Some(statistics)))
+  // Refresh the cached data source table in the catalog.
+  sessionState.catalog.refreshTable(tableIdentWithDB)
--- End diff --

Why do we need to refresh the table when updating statistics? AFAIK, refresh
table is used when the table's data files have changed and we need to list the
files again and invalidate the table cache.





[GitHub] spark pull request #15090: [SPARK-17073] [SQL] generate column-level statist...

2016-09-27 Thread wzhfy
Github user wzhfy commented on a diff in the pull request:

https://github.com/apache/spark/pull/15090#discussion_r80629684
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala ---
@@ -87,19 +87,27 @@ class SparkSqlAstBuilder(conf: SQLConf) extends AstBuilder {
   }
 
   /**
-   * Create an [[AnalyzeTableCommand]] command. This currently only implements the NOSCAN
-   * option (other options are passed on to Hive) e.g.:
+   * Create an [[AnalyzeTableCommand]] command or an [[AnalyzeColumnCommand]] command.
+   * Example SQL for analyzing table :
* {{{
-   *   ANALYZE TABLE table COMPUTE STATISTICS NOSCAN;
+   *   ANALYZE TABLE table COMPUTE STATISTICS [NOSCAN];
+   * }}}
+   * Example SQL for analyzing columns :
+   * {{{
+   *   ANALYZE TABLE table COMPUTE STATISTICS FOR COLUMNS column1, column2;
* }}}
*/
   override def visitAnalyze(ctx: AnalyzeContext): LogicalPlan = withOrigin(ctx) {
 if (ctx.partitionSpec == null &&
   ctx.identifier != null &&
   ctx.identifier.getText.toLowerCase == "noscan") {
-  AnalyzeTableCommand(visitTableIdentifier(ctx.tableIdentifier).toString)
+  AnalyzeTableCommand(visitTableIdentifier(ctx.tableIdentifier))
+} else if (ctx.identifierSeq() == null) {
+  AnalyzeTableCommand(visitTableIdentifier(ctx.tableIdentifier), noscan = false)
--- End diff --

This branch analyzes the table without `noscan`.
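
The three-way choice in `visitAnalyze` can be sketched as below. This is a simplified model, not the parser code: `AnalyzePlan`, `AnalyzeTable`, `AnalyzeColumns`, and `choose` are hypothetical names, the optional trailing identifier and column list stand in for `ctx.identifier` and `ctx.identifierSeq()`, the partition-spec check is left out, and the final column branch is assumed from the doc comment since the quoted hunk stops before it.

```scala
object AnalyzeDispatch {
  sealed trait AnalyzePlan
  final case class AnalyzeTable(table: String, noscan: Boolean) extends AnalyzePlan
  final case class AnalyzeColumns(table: String, columns: Seq[String]) extends AnalyzePlan

  def choose(
      table: String,
      identifier: Option[String],
      columns: Option[Seq[String]]): AnalyzePlan =
    (identifier, columns) match {
      // ANALYZE TABLE t COMPUTE STATISTICS NOSCAN
      case (Some(id), _) if id.toLowerCase == "noscan" => AnalyzeTable(table, noscan = true)
      // ANALYZE TABLE t COMPUTE STATISTICS
      case (_, None) => AnalyzeTable(table, noscan = false)
      // ANALYZE TABLE t COMPUTE STATISTICS FOR COLUMNS c1, c2
      case (_, Some(cols)) => AnalyzeColumns(table, cols)
    }
}
```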





[GitHub] spark pull request #15090: [SPARK-17073] [SQL] generate column-level statist...

2016-09-27 Thread cloud-fan
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/15090#discussion_r80629335
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/command/AnalyzeColumnCommand.scala
 ---
@@ -0,0 +1,180 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.command
+
+import scala.collection.mutable
+
+import org.apache.spark.sql._
+import org.apache.spark.sql.catalyst.{InternalRow, TableIdentifier}
+import org.apache.spark.sql.catalyst.analysis.EliminateSubqueryAliases
+import org.apache.spark.sql.catalyst.catalog.{CatalogRelation, 
CatalogTable}
+import org.apache.spark.sql.catalyst.expressions._
+import org.apache.spark.sql.catalyst.expressions.aggregate._
+import org.apache.spark.sql.catalyst.plans.logical.{Aggregate, ColumnStat, 
LogicalPlan, Statistics}
+import org.apache.spark.sql.execution.datasources.LogicalRelation
+import org.apache.spark.sql.types._
+
+
+/**
+ * Analyzes the given columns of the given table in the current database 
to generate statistics,
--- End diff --

`... in the current database ...`: users can specify the database in the table 
name, right? I think we can just say `given table`.
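
To illustrate the point, here is a minimal sketch of how `run` picks the database. `TableId` and `qualify` are hypothetical stand-ins (not Spark's `TableIdentifier` API); the only thing taken from the diff is the `database.getOrElse(currentDatabase)` fallback.

```Scala
object TableNameSketch {
  // Hypothetical stand-in for Spark's TableIdentifier.
  final case class TableId(table: String, database: Option[String] = None)

  // Use the database given in the identifier if there is one;
  // fall back to the session's current database otherwise.
  def qualify(id: TableId, currentDb: String): TableId =
    TableId(id.table, Some(id.database.getOrElse(currentDb)))
}
```

With this, `qualify(TableId("t", Some("db1")), "default")` keeps `db1`, so the command is not limited to the current database.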





[GitHub] spark pull request #15090: [SPARK-17073] [SQL] generate column-level statist...

2016-09-27 Thread cloud-fan
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/15090#discussion_r80629224
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala ---
@@ -87,19 +87,27 @@ class SparkSqlAstBuilder(conf: SQLConf) extends 
AstBuilder {
   }
 
   /**
-   * Create an [[AnalyzeTableCommand]] command. This currently only 
implements the NOSCAN
-   * option (other options are passed on to Hive) e.g.:
+   * Create an [[AnalyzeTableCommand]] command or an 
[[AnalyzeColumnCommand]] command.
+   * Example SQL for analyzing table :
* {{{
-   *   ANALYZE TABLE table COMPUTE STATISTICS NOSCAN;
+   *   ANALYZE TABLE table COMPUTE STATISTICS [NOSCAN];
+   * }}}
+   * Example SQL for analyzing columns :
+   * {{{
+   *   ANALYZE TABLE table COMPUTE STATISTICS FOR COLUMNS column1, column2;
* }}}
*/
   override def visitAnalyze(ctx: AnalyzeContext): LogicalPlan = withOrigin(ctx) {
     if (ctx.partitionSpec == null &&
       ctx.identifier != null &&
       ctx.identifier.getText.toLowerCase == "noscan") {
-      AnalyzeTableCommand(visitTableIdentifier(ctx.tableIdentifier).toString)
+      AnalyzeTableCommand(visitTableIdentifier(ctx.tableIdentifier))
+    } else if (ctx.identifierSeq() == null) {
+      AnalyzeTableCommand(visitTableIdentifier(ctx.tableIdentifier), noscan = false)
--- End diff --

when will we hit this branch?
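
For reference, a rough model of the dispatch as I read the diff. `AnalyzeCtx`, `AnalyzeTable` and `AnalyzeColumn` are hypothetical simplifications of the parser context and the two commands, and the final `FOR COLUMNS` case is an assumption, since the diff is cut off before it.

```Scala
object AnalyzeDispatchSketch {
  // Hypothetical, simplified view of what the parser context exposes.
  final case class AnalyzeCtx(
      table: String,
      hasPartitionSpec: Boolean,
      identifier: Option[String],    // trailing keyword such as NOSCAN, if any
      columns: Option[Seq[String]])  // FOR COLUMNS list, if any

  sealed trait Command
  final case class AnalyzeTable(table: String, noscan: Boolean = true) extends Command
  final case class AnalyzeColumn(table: String, columns: Seq[String]) extends Command

  def visitAnalyze(ctx: AnalyzeCtx): Command = {
    val isNoscan =
      !ctx.hasPartitionSpec && ctx.identifier.exists(_.toLowerCase == "noscan")
    if (isNoscan) {
      AnalyzeTable(ctx.table)                  // ... COMPUTE STATISTICS NOSCAN
    } else ctx.columns match {
      case None => AnalyzeTable(ctx.table, noscan = false) // no NOSCAN, no column list
      case Some(cols) => AnalyzeColumn(ctx.table, cols)     // ... FOR COLUMNS c1, c2
    }
  }
}
```

Under this reading, the second branch is hit by `ANALYZE TABLE t COMPUTE STATISTICS` with neither `NOSCAN` nor a column list.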





[GitHub] spark pull request #15090: [SPARK-17073] [SQL] generate column-level statist...

2016-09-27 Thread cloud-fan
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/15090#discussion_r80628790
  
--- Diff: 
sql/catalyst/src/main/java/org/apache/spark/sql/catalyst/expressions/UnsafeRow.java
 ---
@@ -31,6 +31,7 @@
 import com.esotericsoftware.kryo.io.Input;
 import com.esotericsoftware.kryo.io.Output;
 
+import org.apache.spark.sql.catalyst.plans.logical.Except;
--- End diff --

unnecessary change





[GitHub] spark pull request #15090: [SPARK-17073] [SQL] generate column-level statist...

2016-09-27 Thread wzhfy
Github user wzhfy commented on a diff in the pull request:

https://github.com/apache/spark/pull/15090#discussion_r80626981
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/command/AnalyzeColumnCommand.scala
 ---
@@ -0,0 +1,179 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.command
+
+import scala.collection.mutable
+
+import org.apache.spark.sql._
+import org.apache.spark.sql.catalyst.{InternalRow, TableIdentifier}
+import org.apache.spark.sql.catalyst.analysis.EliminateSubqueryAliases
+import org.apache.spark.sql.catalyst.catalog.{CatalogRelation, 
CatalogTable}
+import org.apache.spark.sql.catalyst.expressions._
+import org.apache.spark.sql.catalyst.expressions.aggregate._
+import org.apache.spark.sql.catalyst.plans.logical.{Aggregate, ColumnStat, 
LogicalPlan, Statistics}
+import org.apache.spark.sql.execution.datasources.LogicalRelation
+import org.apache.spark.sql.types._
+
+
+/**
+ * Analyzes the given columns of the given table in the current database 
to generate statistics,
+ * which will be used in query optimizations.
+ */
+case class AnalyzeColumnCommand(
+tableIdent: TableIdentifier,
+columnNames: Seq[String]) extends RunnableCommand {
+
+  override def run(sparkSession: SparkSession): Seq[Row] = {
+val sessionState = sparkSession.sessionState
+val db = 
tableIdent.database.getOrElse(sessionState.catalog.getCurrentDatabase)
+val tableIdentWithDB = TableIdentifier(tableIdent.table, Some(db))
+val relation = 
EliminateSubqueryAliases(sessionState.catalog.lookupRelation(tableIdentWithDB))
+
+relation match {
+  case catalogRel: CatalogRelation =>
+updateStats(catalogRel.catalogTable,
+  AnalyzeTableCommand.calculateTotalSize(sessionState, 
catalogRel.catalogTable))
+
+  case logicalRel: LogicalRelation if 
logicalRel.catalogTable.isDefined =>
+updateStats(logicalRel.catalogTable.get, 
logicalRel.relation.sizeInBytes)
+
+  case otherRelation =>
+throw new AnalysisException("ANALYZE TABLE is not supported for " +
+  s"${otherRelation.nodeName}.")
+}
+
+def updateStats(catalogTable: CatalogTable, newTotalSize: Long): Unit 
= {
+  val (rowCount, columnStats) = computeColStats(sparkSession, relation)
+  val statistics = Statistics(
+sizeInBytes = newTotalSize,
+rowCount = Some(rowCount),
+colStats = columnStats ++ 
catalogTable.stats.map(_.colStats).getOrElse(Map()))
+  sessionState.catalog.alterTable(catalogTable.copy(stats = 
Some(statistics)))
+  // Refresh the cached data source table in the catalog.
+  sessionState.catalog.refreshTable(tableIdentWithDB)
+}
+
+Seq.empty[Row]
+  }
+
+  def computeColStats(
+  sparkSession: SparkSession,
+  relation: LogicalPlan): (Long, Map[String, ColumnStat]) = {
+
+// check correctness of column names
+val attributesToAnalyze = mutable.MutableList[Attribute]()
+val caseSensitive = 
sparkSession.sessionState.conf.caseSensitiveAnalysis
+columnNames.foreach { col =>
+  val exprOption = relation.output.find { attr =>
+if (caseSensitive) attr.name == col else 
attr.name.equalsIgnoreCase(col)
+  }
+  val expr = exprOption.getOrElse(throw new 
AnalysisException(s"Invalid column name: $col."))
+  // do deduplication
+  if (!attributesToAnalyze.contains(expr)) {
+attributesToAnalyze += expr
+  }
+}
+
+// Collect statistics per column.
+// The first element in the result will be the overall row count, the 
following elements
+// will be structs containing all column stats.
+// The layout of each struct follows the layout of the ColumnStats.
+val ndvMaxErr = sparkSession.sessionState.conf.ndvMaxError
+val expressions = 

[GitHub] spark pull request #15090: [SPARK-17073] [SQL] generate column-level statist...

2016-09-26 Thread hvanhovell
Github user hvanhovell commented on a diff in the pull request:

https://github.com/apache/spark/pull/15090#discussion_r80615261
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/command/AnalyzeColumnCommand.scala
 ---
@@ -0,0 +1,179 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.command
+
+import scala.collection.mutable
+
+import org.apache.spark.sql._
+import org.apache.spark.sql.catalyst.{InternalRow, TableIdentifier}
+import org.apache.spark.sql.catalyst.analysis.EliminateSubqueryAliases
+import org.apache.spark.sql.catalyst.catalog.{CatalogRelation, 
CatalogTable}
+import org.apache.spark.sql.catalyst.expressions._
+import org.apache.spark.sql.catalyst.expressions.aggregate._
+import org.apache.spark.sql.catalyst.plans.logical.{Aggregate, ColumnStat, 
LogicalPlan, Statistics}
+import org.apache.spark.sql.execution.datasources.LogicalRelation
+import org.apache.spark.sql.types._
+
+
+/**
+ * Analyzes the given columns of the given table in the current database 
to generate statistics,
+ * which will be used in query optimizations.
+ */
+case class AnalyzeColumnCommand(
+tableIdent: TableIdentifier,
+columnNames: Seq[String]) extends RunnableCommand {
+
+  override def run(sparkSession: SparkSession): Seq[Row] = {
+val sessionState = sparkSession.sessionState
+val db = 
tableIdent.database.getOrElse(sessionState.catalog.getCurrentDatabase)
+val tableIdentWithDB = TableIdentifier(tableIdent.table, Some(db))
+val relation = 
EliminateSubqueryAliases(sessionState.catalog.lookupRelation(tableIdentWithDB))
+
+relation match {
+  case catalogRel: CatalogRelation =>
+updateStats(catalogRel.catalogTable,
+  AnalyzeTableCommand.calculateTotalSize(sessionState, 
catalogRel.catalogTable))
+
+  case logicalRel: LogicalRelation if 
logicalRel.catalogTable.isDefined =>
+updateStats(logicalRel.catalogTable.get, 
logicalRel.relation.sizeInBytes)
+
+  case otherRelation =>
+throw new AnalysisException("ANALYZE TABLE is not supported for " +
+  s"${otherRelation.nodeName}.")
+}
+
+def updateStats(catalogTable: CatalogTable, newTotalSize: Long): Unit 
= {
+  val (rowCount, columnStats) = computeColStats(sparkSession, relation)
+  val statistics = Statistics(
+sizeInBytes = newTotalSize,
+rowCount = Some(rowCount),
+colStats = columnStats ++ 
catalogTable.stats.map(_.colStats).getOrElse(Map()))
+  sessionState.catalog.alterTable(catalogTable.copy(stats = 
Some(statistics)))
+  // Refresh the cached data source table in the catalog.
+  sessionState.catalog.refreshTable(tableIdentWithDB)
+}
+
+Seq.empty[Row]
+  }
+
+  def computeColStats(
+  sparkSession: SparkSession,
+  relation: LogicalPlan): (Long, Map[String, ColumnStat]) = {
+
+// check correctness of column names
+val attributesToAnalyze = mutable.MutableList[Attribute]()
+val caseSensitive = 
sparkSession.sessionState.conf.caseSensitiveAnalysis
+columnNames.foreach { col =>
+  val exprOption = relation.output.find { attr =>
+if (caseSensitive) attr.name == col else 
attr.name.equalsIgnoreCase(col)
+  }
+  val expr = exprOption.getOrElse(throw new 
AnalysisException(s"Invalid column name: $col."))
+  // do deduplication
+  if (!attributesToAnalyze.contains(expr)) {
+attributesToAnalyze += expr
+  }
+}
+
+// Collect statistics per column.
+// The first element in the result will be the overall row count, the 
following elements
+// will be structs containing all column stats.
+// The layout of each struct follows the layout of the ColumnStats.
+val ndvMaxErr = sparkSession.sessionState.conf.ndvMaxError
+val expressions = 

[GitHub] spark pull request #15090: [SPARK-17073] [SQL] generate column-level statist...

2016-09-22 Thread wzhfy
Github user wzhfy commented on a diff in the pull request:

https://github.com/apache/spark/pull/15090#discussion_r80165466
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/Statistics.scala
 ---
@@ -32,19 +34,74 @@ package org.apache.spark.sql.catalyst.plans.logical
  * @param sizeInBytes Physical size in bytes. For leaf operators this 
defaults to 1, otherwise it
  *defaults to the product of children's `sizeInBytes`.
  * @param rowCount Estimated number of rows.
+ * @param colStats Column-level statistics.
  * @param isBroadcastable If true, output is small enough to be used in a 
broadcast join.
  */
 case class Statistics(
 sizeInBytes: BigInt,
 rowCount: Option[BigInt] = None,
+colStats: Map[String, ColumnStats] = Map.empty,
 isBroadcastable: Boolean = false) {
+
   override def toString: String = "Statistics(" + simpleString + ")"
 
   /** Readable string representation for the Statistics. */
   def simpleString: String = {
 Seq(s"sizeInBytes=$sizeInBytes",
   if (rowCount.isDefined) s"rowCount=${rowCount.get}" else "",
+  if (colStats.nonEmpty) s"colStats=$colStats" else "",
--- End diff --

BTW, I've created a JIRA for this. We can fix it when column-level 
statistics, including histograms, are supported.





[GitHub] spark pull request #15090: [SPARK-17073] [SQL] generate column-level statist...

2016-09-22 Thread wzhfy
Github user wzhfy commented on a diff in the pull request:

https://github.com/apache/spark/pull/15090#discussion_r80161814
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/Statistics.scala
 ---
@@ -32,19 +34,74 @@ package org.apache.spark.sql.catalyst.plans.logical
  * @param sizeInBytes Physical size in bytes. For leaf operators this 
defaults to 1, otherwise it
  *defaults to the product of children's `sizeInBytes`.
  * @param rowCount Estimated number of rows.
+ * @param colStats Column-level statistics.
  * @param isBroadcastable If true, output is small enough to be used in a 
broadcast join.
  */
 case class Statistics(
 sizeInBytes: BigInt,
 rowCount: Option[BigInt] = None,
+colStats: Map[String, ColumnStats] = Map.empty,
 isBroadcastable: Boolean = false) {
+
   override def toString: String = "Statistics(" + simpleString + ")"
 
   /** Readable string representation for the Statistics. */
   def simpleString: String = {
 Seq(s"sizeInBytes=$sizeInBytes",
   if (rowCount.isDefined) s"rowCount=${rowCount.get}" else "",
+  if (colStats.nonEmpty) s"colStats=$colStats" else "",
--- End diff --

I think it's ok for now, because we don't have other ways to see the column 
stats. We can delete this when we support `DESC FORMATTED TABLE COLUMN` 
command, which shows column-level stats (including histograms?) of the given 
column. 





[GitHub] spark pull request #15090: [SPARK-17073] [SQL] generate column-level statist...

2016-09-22 Thread gatorsmile
Github user gatorsmile commented on a diff in the pull request:

https://github.com/apache/spark/pull/15090#discussion_r80156525
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/Statistics.scala
 ---
@@ -32,19 +34,74 @@ package org.apache.spark.sql.catalyst.plans.logical
  * @param sizeInBytes Physical size in bytes. For leaf operators this 
defaults to 1, otherwise it
  *defaults to the product of children's `sizeInBytes`.
  * @param rowCount Estimated number of rows.
+ * @param colStats Column-level statistics.
  * @param isBroadcastable If true, output is small enough to be used in a 
broadcast join.
  */
 case class Statistics(
 sizeInBytes: BigInt,
 rowCount: Option[BigInt] = None,
+colStats: Map[String, ColumnStats] = Map.empty,
 isBroadcastable: Boolean = false) {
+
   override def toString: String = "Statistics(" + simpleString + ")"
 
   /** Readable string representation for the Statistics. */
   def simpleString: String = {
 Seq(s"sizeInBytes=$sizeInBytes",
   if (rowCount.isDefined) s"rowCount=${rowCount.get}" else "",
+  if (colStats.nonEmpty) s"colStats=$colStats" else "",
--- End diff --

I am not sure whether we should do this. Hive does not show it in the `DESC 
FORMATTED/EXTENDED`, right? The string could be super long when the table has 
many columns.
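
To make the length concern concrete, a small sketch with hypothetical simplified types (`Stats` is not the real `Statistics` class, and the `filter`/`mkString` tail is an assumption because the diff is cut off before it). `verboseString` mirrors the shape of the new `simpleString`; `compactString` is just one possible bounded alternative, not something this PR does.

```Scala
object StatsStringSketch {
  // Hypothetical stand-in: column stats are plain strings here.
  final case class Stats(
      sizeInBytes: BigInt,
      rowCount: Option[BigInt] = None,
      colStats: Map[String, String] = Map.empty) {

    // Prints the whole map, so the output grows with the number of analyzed columns.
    def verboseString: String =
      Seq(
        s"sizeInBytes=$sizeInBytes",
        rowCount.map(c => s"rowCount=$c").getOrElse(""),
        if (colStats.nonEmpty) s"colStats=$colStats" else ""
      ).filter(_.nonEmpty).mkString(", ")

    // Bounded variant: only reports how many columns have stats.
    def compactString: String =
      Seq(
        s"sizeInBytes=$sizeInBytes",
        rowCount.map(c => s"rowCount=$c").getOrElse(""),
        if (colStats.nonEmpty) s"colStats=${colStats.size} columns" else ""
      ).filter(_.nonEmpty).mkString(", ")
  }
}
```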





[GitHub] spark pull request #15090: [SPARK-17073] [SQL] generate column-level statist...

2016-09-22 Thread wzhfy
Github user wzhfy commented on a diff in the pull request:

https://github.com/apache/spark/pull/15090#discussion_r80097979
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/command/AnalyzeColumnCommand.scala
 ---
@@ -0,0 +1,189 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.command
+
+import scala.collection.mutable
+
+import org.apache.spark.sql._
+import org.apache.spark.sql.catalyst.{InternalRow, TableIdentifier}
+import org.apache.spark.sql.catalyst.analysis.EliminateSubqueryAliases
+import org.apache.spark.sql.catalyst.catalog.{CatalogRelation, 
CatalogTable}
+import org.apache.spark.sql.catalyst.expressions._
+import org.apache.spark.sql.catalyst.expressions.aggregate._
+import org.apache.spark.sql.catalyst.plans.logical.{Aggregate, 
ColumnStats, LogicalPlan, Statistics}
+import org.apache.spark.sql.execution.datasources.LogicalRelation
+import org.apache.spark.sql.types._
+
+
+/**
+ * Analyzes the given columns of the given table in the current database 
to generate statistics,
+ * which will be used in query optimizations.
+ */
+case class AnalyzeColumnCommand(
+tableIdent: TableIdentifier,
+columnNames: Seq[String]) extends RunnableCommand {
+
+  override def run(sparkSession: SparkSession): Seq[Row] = {
+val sessionState = sparkSession.sessionState
+val db = 
tableIdent.database.getOrElse(sessionState.catalog.getCurrentDatabase)
+val tableIdentWithDB = TableIdentifier(tableIdent.table, Some(db))
+val relation = 
EliminateSubqueryAliases(sessionState.catalog.lookupRelation(tableIdentWithDB))
+
+relation match {
+  case catalogRel: CatalogRelation =>
+updateStats(catalogRel.catalogTable,
+  AnalyzeTableCommand.calculateTotalSize(sessionState, 
catalogRel.catalogTable))
+
+  case logicalRel: LogicalRelation if 
logicalRel.catalogTable.isDefined =>
+updateStats(logicalRel.catalogTable.get, 
logicalRel.relation.sizeInBytes)
+
+  case otherRelation =>
+throw new AnalysisException("ANALYZE TABLE is not supported for " +
+  s"${otherRelation.nodeName}.")
+}
+
+def updateStats(catalogTable: CatalogTable, newTotalSize: Long): Unit 
= {
+  val (rowCount, columnStats) = computeColStats(sparkSession, relation)
+  val statistics = Statistics(
+sizeInBytes = newTotalSize,
+rowCount = Some(rowCount),
+colStats = columnStats ++ 
catalogTable.stats.map(_.colStats).getOrElse(Map()))
+  sessionState.catalog.alterTable(catalogTable.copy(stats = 
Some(statistics)))
+  // Refresh the cached data source table in the catalog.
+  sessionState.catalog.refreshTable(tableIdentWithDB)
+}
+
+Seq.empty[Row]
+  }
+
+  def computeColStats(
+  sparkSession: SparkSession,
+  relation: LogicalPlan): (Long, Map[String, ColumnStats]) = {
+
+// check correctness of column names
+val attributesToAnalyze = mutable.MutableList[Attribute]()
+val caseSensitive = 
sparkSession.sessionState.conf.caseSensitiveAnalysis
+columnNames.foreach { col =>
+  val exprOption = relation.output.find { attr =>
+if (caseSensitive) attr.name == col else 
attr.name.equalsIgnoreCase(col)
+  }
+  val expr = exprOption.getOrElse(throw new 
AnalysisException(s"Invalid column name: $col."))
+  // do deduplication
+  if (!attributesToAnalyze.contains(expr)) {
--- End diff --

@gatorsmile There's no need to handle case sensitivity here, because 
`attributesToAnalyze` holds the names as they appear in the schema. For example, 
if the column name in the schema is "abc", then for the columns "abc" and "ABC" 
the name in expr is "abc" in both cases.
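
A minimal sketch of that argument, using plain strings instead of `Attribute` and `IllegalArgumentException` instead of `AnalysisException` so it runs without Spark (both substitutions, and the `resolve` helper itself, are assumptions for illustration):

```Scala
object ColumnDedupSketch {
  // Resolve each requested name to the name stored in the schema,
  // then keep every schema column at most once.
  def resolve(
      schema: Seq[String],
      requested: Seq[String],
      caseSensitive: Boolean): Seq[String] = {
    val resolved = requested.map { col =>
      schema
        .find(name => if (caseSensitive) name == col else name.equalsIgnoreCase(col))
        .getOrElse(throw new IllegalArgumentException(s"Invalid column name: $col."))
    }
    // Deduplication compares schema names, so the user's casing no longer matters.
    resolved.distinct
  }
}
```

In case-insensitive mode, `resolve(Seq("abc"), Seq("abc", "ABC"), caseSensitive = false)` yields a single "abc". In case-sensitive mode "ABC" fails the lookup before deduplication is reached.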



[GitHub] spark pull request #15090: [SPARK-17073] [SQL] generate column-level statist...

2016-09-21 Thread gatorsmile
Github user gatorsmile commented on a diff in the pull request:

https://github.com/apache/spark/pull/15090#discussion_r79975372
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/command/AnalyzeColumnCommand.scala
 ---
@@ -0,0 +1,189 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.command
+
+import scala.collection.mutable
+
+import org.apache.spark.sql._
+import org.apache.spark.sql.catalyst.{InternalRow, TableIdentifier}
+import org.apache.spark.sql.catalyst.analysis.EliminateSubqueryAliases
+import org.apache.spark.sql.catalyst.catalog.{CatalogRelation, 
CatalogTable}
+import org.apache.spark.sql.catalyst.expressions._
+import org.apache.spark.sql.catalyst.expressions.aggregate._
+import org.apache.spark.sql.catalyst.plans.logical.{Aggregate, 
ColumnStats, LogicalPlan, Statistics}
+import org.apache.spark.sql.execution.datasources.LogicalRelation
+import org.apache.spark.sql.types._
+
+
+/**
+ * Analyzes the given columns of the given table in the current database 
to generate statistics,
+ * which will be used in query optimizations.
+ */
+case class AnalyzeColumnCommand(
+tableIdent: TableIdentifier,
+columnNames: Seq[String]) extends RunnableCommand {
+
+  override def run(sparkSession: SparkSession): Seq[Row] = {
+val sessionState = sparkSession.sessionState
+val db = 
tableIdent.database.getOrElse(sessionState.catalog.getCurrentDatabase)
+val tableIdentWithDB = TableIdentifier(tableIdent.table, Some(db))
+val relation = 
EliminateSubqueryAliases(sessionState.catalog.lookupRelation(tableIdentWithDB))
+
+relation match {
+  case catalogRel: CatalogRelation =>
+updateStats(catalogRel.catalogTable,
+  AnalyzeTableCommand.calculateTotalSize(sessionState, 
catalogRel.catalogTable))
+
+  case logicalRel: LogicalRelation if 
logicalRel.catalogTable.isDefined =>
+updateStats(logicalRel.catalogTable.get, 
logicalRel.relation.sizeInBytes)
+
+  case otherRelation =>
+throw new AnalysisException("ANALYZE TABLE is not supported for " +
+  s"${otherRelation.nodeName}.")
+}
+
+def updateStats(catalogTable: CatalogTable, newTotalSize: Long): Unit 
= {
+  val (rowCount, columnStats) = computeColStats(sparkSession, relation)
+  val statistics = Statistics(
+sizeInBytes = newTotalSize,
+rowCount = Some(rowCount),
+colStats = columnStats ++ 
catalogTable.stats.map(_.colStats).getOrElse(Map()))
+  sessionState.catalog.alterTable(catalogTable.copy(stats = 
Some(statistics)))
+  // Refresh the cached data source table in the catalog.
+  sessionState.catalog.refreshTable(tableIdentWithDB)
+}
+
+Seq.empty[Row]
+  }
+
+  def computeColStats(
+  sparkSession: SparkSession,
+  relation: LogicalPlan): (Long, Map[String, ColumnStats]) = {
+
+// check correctness of column names
+val attributesToAnalyze = mutable.MutableList[Attribute]()
+val caseSensitive = 
sparkSession.sessionState.conf.caseSensitiveAnalysis
+columnNames.foreach { col =>
+  val exprOption = relation.output.find { attr =>
+if (caseSensitive) attr.name == col else 
attr.name.equalsIgnoreCase(col)
+  }
+  val expr = exprOption.getOrElse(throw new 
AnalysisException(s"Invalid column name: $col."))
+  // do deduplication
+  if (!attributesToAnalyze.contains(expr)) {
--- End diff --

Deduplication lacks case sensitivity handling. 




[GitHub] spark pull request #15090: [SPARK-17073] [SQL] generate column-level statist...

2016-09-21 Thread gatorsmile
Github user gatorsmile commented on a diff in the pull request:

https://github.com/apache/spark/pull/15090#discussion_r79975005
  
--- Diff: sql/core/src/test/scala/org/apache/spark/sql/StatisticsTest.scala 
---
@@ -0,0 +1,93 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql
+
+import org.apache.spark.sql.catalyst.TableIdentifier
+import org.apache.spark.sql.catalyst.plans.logical.{ColumnStats, 
Statistics}
+import org.apache.spark.sql.execution.command.AnalyzeColumnCommand
+import org.apache.spark.sql.execution.datasources.LogicalRelation
+import org.apache.spark.sql.test.SharedSQLContext
+import org.apache.spark.sql.types._
+
+trait StatisticsTest extends QueryTest with SharedSQLContext {
+
+  def checkColStats(
+  df: DataFrame,
+  expectedColStatsSeq: Seq[(String, ColumnStats)]): Unit = {
+val table = "tbl"
+withTable(table) {
+  df.write.format("json").saveAsTable(table)
+  val columns = expectedColStatsSeq.map(_._1)
+  val tableIdent = TableIdentifier(table, Some("default"))
+  val relation = spark.sessionState.catalog.lookupRelation(tableIdent)
+  val columnStats =
+AnalyzeColumnCommand(tableIdent, columns).computeColStats(spark, 
relation)._2
+  expectedColStatsSeq.foreach { expected =>
+assert(columnStats.contains(expected._1))
+checkColStats(colStats = columnStats(expected._1), 
expectedColStats = expected._2)
+  }
+}
+  }
+
+  def checkColStats(colStats: ColumnStats, expectedColStats: ColumnStats): 
Unit = {
+assert(colStats.dataType == expectedColStats.dataType)
+assert(colStats.numNulls == expectedColStats.numNulls)
+colStats.dataType match {
+  case _: IntegralType | DateType | TimestampType =>
+assert(colStats.max.map(_.toString.toLong) == 
expectedColStats.max.map(_.toString.toLong))
+assert(colStats.min.map(_.toString.toLong) == 
expectedColStats.min.map(_.toString.toLong))
+  case _: FractionalType =>
+assert(colStats.max.map(_.toString.toDouble) == expectedColStats
+  .max.map(_.toString.toDouble))
+assert(colStats.min.map(_.toString.toDouble) == expectedColStats
+  .min.map(_.toString.toDouble))
+  case _ =>
+// other types don't have max and min stats
+assert(colStats.max.isEmpty)
+assert(colStats.min.isEmpty)
+}
+colStats.dataType match {
+  case BinaryType | BooleanType => assert(colStats.ndv.isEmpty)
+  case _ =>
+// ndv is an approximate value, so we make sure we have the value, 
and it should be
+// within 3*SD's of the given rsd.
+assert(colStats.ndv.get >= 0)
+if (expectedColStats.ndv.get == 0) {
+  assert(colStats.ndv.get == 0)
+} else if (expectedColStats.ndv.get > 0) {
+  val rsd = spark.sessionState.conf.ndvMaxError
+  val error = math.abs((colStats.ndv.get / 
expectedColStats.ndv.get.toDouble) - 1.0d)
+  assert(error <= rsd * 3.0d, "Error should be within 3 std. 
errors.")
+}
+}
+assert(colStats.avgColLen == expectedColStats.avgColLen)
+assert(colStats.maxColLen == expectedColStats.maxColLen)
+assert(colStats.numTrues == expectedColStats.numTrues)
+assert(colStats.numFalses == expectedColStats.numFalses)
+  }
+
+  def checkTableStats(tableName: String, expectedRowCount: Option[Int]): 
Option[Statistics] = {
+val df = sql(s"SELECT * FROM $tableName")
--- End diff --

```Scala
val df = spark.table(tableName)
```
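
Separately from the `spark.table` suggestion, the ndv tolerance check in the quoted `checkColStats` can be read on its own. The following is a minimal sketch of that check in plain Scala, assuming the configured max error is treated as a relative standard deviation; the object and method names are hypothetical and not part of the PR.

```scala
object NdvCheck {
  /**
   * Returns true when the estimated distinct count is within three relative
   * standard deviations (rsd) of the expected count. An expected count of
   * zero must be matched exactly, since a relative error is undefined there.
   * Hypothetical helper for illustration only.
   */
  def withinTolerance(estimated: Long, expected: Long, rsd: Double): Boolean =
    if (expected == 0L) estimated == 0L
    else math.abs(estimated / expected.toDouble - 1.0d) <= rsd * 3.0d
}
```

With an rsd of 0.05, an estimate of 104 against an expected 100 passes (relative error 0.04), while an estimate of 120 does not (relative error 0.2 exceeds 0.15).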




[GitHub] spark pull request #15090: [SPARK-17073] [SQL] generate column-level statist...

2016-09-21 Thread gatorsmile
Github user gatorsmile commented on a diff in the pull request:

https://github.com/apache/spark/pull/15090#discussion_r79974658
  
--- Diff: 
sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveExternalCatalog.scala ---
@@ -473,15 +476,20 @@ private[spark] class HiveExternalCatalog(conf: 
SparkConf, hadoopConf: Configurat
   }
 }
 // construct Spark's statistics from information in Hive metastore
-if (catalogTable.properties.contains(STATISTICS_TOTAL_SIZE)) {
-  val totalSize = 
BigInt(catalogTable.properties.get(STATISTICS_TOTAL_SIZE).get)
-  // TODO: we will compute "estimatedSize" when we have column stats:
-  // average size of row * number of rows
+if 
(catalogTable.properties.filterKeys(_.startsWith(STATISTICS_PREFIX)).nonEmpty) {
+  val colStatsProps = catalogTable.properties
+.filterKeys(_.startsWith(STATISTICS_BASIC_COL_STATS_PREFIX))
+.map { case (k, v) => 
(k.replace(STATISTICS_BASIC_COL_STATS_PREFIX, ""), v)}
--- End diff --

Add a space between `)` and `}`
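
For readers following the quoted hunk, here is a minimal standalone sketch of the prefix-filtering step, using plain Scala collections rather than the catalog classes. The prefix value and the property keys are made up for illustration; the real constants live in `HiveExternalCatalog`. The sketch uses `stripPrefix` instead of `replace`, which is a swapped-in choice: `replace` removes every occurrence of the prefix string in the key, not only the leading one.

```scala
object ColStatsProps {
  // Hypothetical prefix, chosen only for this sketch.
  val ColStatsPrefix = "spark.sql.statistics.colStats."

  /** Keeps only column-stats properties and strips the prefix from each key. */
  def extract(props: Map[String, String]): Map[String, String] =
    props.collect {
      case (k, v) if k.startsWith(ColStatsPrefix) => (k.stripPrefix(ColStatsPrefix), v)
    }
}
```

Using `collect` does the filter and the key rewrite in one pass and returns a strict `Map`, whereas `filterKeys` returns a lazy view in some Scala versions.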





[GitHub] spark pull request #15090: [SPARK-17073] [SQL] generate column-level statist...

2016-09-21 Thread gatorsmile
Github user gatorsmile commented on a diff in the pull request:

https://github.com/apache/spark/pull/15090#discussion_r79974623
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/command/AnalyzeColumnCommand.scala
 ---
@@ -0,0 +1,189 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.command
+
+import scala.collection.mutable
+
+import org.apache.spark.sql._
+import org.apache.spark.sql.catalyst.{InternalRow, TableIdentifier}
+import org.apache.spark.sql.catalyst.analysis.EliminateSubqueryAliases
+import org.apache.spark.sql.catalyst.catalog.{CatalogRelation, 
CatalogTable}
+import org.apache.spark.sql.catalyst.expressions._
+import org.apache.spark.sql.catalyst.expressions.aggregate._
+import org.apache.spark.sql.catalyst.plans.logical.{Aggregate, 
ColumnStats, LogicalPlan, Statistics}
+import org.apache.spark.sql.execution.datasources.LogicalRelation
+import org.apache.spark.sql.types._
+
+
+/**
+ * Analyzes the given columns of the given table in the current database 
to generate statistics,
+ * which will be used in query optimizations.
+ */
+case class AnalyzeColumnCommand(
+tableIdent: TableIdentifier,
+columnNames: Seq[String]) extends RunnableCommand {
+
+  override def run(sparkSession: SparkSession): Seq[Row] = {
+val sessionState = sparkSession.sessionState
+val db = 
tableIdent.database.getOrElse(sessionState.catalog.getCurrentDatabase)
+val tableIdentWithDB = TableIdentifier(tableIdent.table, Some(db))
+val relation = 
EliminateSubqueryAliases(sessionState.catalog.lookupRelation(tableIdentWithDB))
+
+relation match {
+  case catalogRel: CatalogRelation =>
+updateStats(catalogRel.catalogTable,
+  AnalyzeTableCommand.calculateTotalSize(sessionState, 
catalogRel.catalogTable))
+
+  case logicalRel: LogicalRelation if 
logicalRel.catalogTable.isDefined =>
+updateStats(logicalRel.catalogTable.get, 
logicalRel.relation.sizeInBytes)
+
+  case otherRelation =>
+throw new AnalysisException("ANALYZE TABLE is not supported for " +
+  s"${otherRelation.nodeName}.")
+}
+
+def updateStats(catalogTable: CatalogTable, newTotalSize: Long): Unit 
= {
+  val (rowCount, columnStats) = computeColStats(sparkSession, relation)
+  val statistics = Statistics(
+sizeInBytes = newTotalSize,
+rowCount = Some(rowCount),
+colStats = columnStats ++ 
catalogTable.stats.map(_.colStats).getOrElse(Map()))
+  sessionState.catalog.alterTable(catalogTable.copy(stats = 
Some(statistics)))
+  // Refresh the cached data source table in the catalog.
+  sessionState.catalog.refreshTable(tableIdentWithDB)
+}
+
+Seq.empty[Row]
+  }
+
+  def computeColStats(
+  sparkSession: SparkSession,
+  relation: LogicalPlan): (Long, Map[String, ColumnStats]) = {
+
+// check correctness of column names
+val attributesToAnalyze = mutable.MutableList[Attribute]()
+val caseSensitive = 
sparkSession.sessionState.conf.caseSensitiveAnalysis
+columnNames.foreach { col =>
+  val exprOption = relation.output.find { attr =>
+if (caseSensitive) attr.name == col else 
attr.name.equalsIgnoreCase(col)
+  }
+  val expr = exprOption.getOrElse(throw new 
AnalysisException(s"Invalid column name: $col."))
+  // do deduplication
+  if (!attributesToAnalyze.contains(expr)) {
+attributesToAnalyze += expr
+  }
+}
+
+// Collect statistics per column.
+// The first element in the result will be the overall row count, the 
following elements
+// will be structs containing all column stats.
+// The layout of each struct follows the layout of the ColumnStats.
+val ndvMaxErr = sparkSession.sessionState.conf.ndvMaxError
+val expressions = 

[GitHub] spark pull request #15090: [SPARK-17073] [SQL] generate column-level statist...

2016-09-21 Thread wzhfy
Github user wzhfy commented on a diff in the pull request:

https://github.com/apache/spark/pull/15090#discussion_r79968621
  
--- Diff: 
sql/core/src/test/scala/org/apache/spark/sql/StatisticsColumnSuite.scala ---
@@ -0,0 +1,352 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql
+
+import java.sql.{Date, Timestamp}
+
+import org.apache.spark.sql.catalyst.parser.ParseException
+import org.apache.spark.sql.catalyst.plans.logical.ColumnStats
+import org.apache.spark.sql.catalyst.util.DateTimeUtils
+import org.apache.spark.sql.execution.command.AnalyzeColumnCommand
+import org.apache.spark.sql.types._
+
+class StatisticsColumnSuite extends StatisticsTest {
+  import testImplicits._
+
+  test("parse analyze column commands") {
+def assertAnalyzeColumnCommand(analyzeCommand: String, c: Class[_]) {
+  val parsed = spark.sessionState.sqlParser.parsePlan(analyzeCommand)
+  val operators = parsed.collect {
+case a: AnalyzeColumnCommand => a
+case o => o
+  }
+  assert(operators.size == 1)
+  if (operators.head.getClass != c) {
+fail(
+  s"""$analyzeCommand expected command: $c, but got 
${operators.head}
+ |parsed command:
+ |$parsed
+   """.stripMargin)
+  }
+}
+
+val table = "table"
+assertAnalyzeColumnCommand(
+  s"ANALYZE TABLE $table COMPUTE STATISTICS FOR COLUMNS key, value",
+  classOf[AnalyzeColumnCommand])
+
+intercept[ParseException] {
+  sql(s"ANALYZE TABLE $table COMPUTE STATISTICS FOR COLUMNS")
+}
+  }
+
+  test("check correctness of columns") {
+val table = "tbl"
+val colName1 = "abc"
+val colName2 = "x.yz"
+val quotedColName2 = s"`$colName2`"
+withTable(table) {
+  sql(s"CREATE TABLE $table ($colName1 int, $quotedColName2 string) 
USING PARQUET")
+
+  val invalidColError = intercept[AnalysisException] {
+sql(s"ANALYZE TABLE $table COMPUTE STATISTICS FOR COLUMNS key")
+  }
+  assert(invalidColError.message == "Invalid column name: key.")
+
+  withSQLConf("spark.sql.caseSensitive" -> "true") {
+val invalidErr = intercept[AnalysisException] {
+  sql(s"ANALYZE TABLE $table COMPUTE STATISTICS FOR COLUMNS 
${colName1.toUpperCase}")
+}
+assert(invalidErr.message == s"Invalid column name: 
${colName1.toUpperCase}.")
+  }
+
+  withSQLConf("spark.sql.caseSensitive" -> "false") {
+val columnsToAnalyze = Seq(colName2.toUpperCase, colName1, 
colName2)
+val columnStats = spark.sessionState.computeColumnStats(table, 
columnsToAnalyze)
--- End diff --

Thanks!





[GitHub] spark pull request #15090: [SPARK-17073] [SQL] generate column-level statist...

2016-09-21 Thread gatorsmile
Github user gatorsmile commented on a diff in the pull request:

https://github.com/apache/spark/pull/15090#discussion_r79965497
  
--- Diff: sql/core/src/test/scala/org/apache/spark/sql/StatisticsTest.scala 
---
@@ -0,0 +1,88 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql
+
+import org.apache.spark.sql.catalyst.plans.logical.{ColumnStats, 
Statistics}
+import org.apache.spark.sql.execution.datasources.LogicalRelation
+import org.apache.spark.sql.test.SharedSQLContext
+import org.apache.spark.sql.types._
+
+trait StatisticsTest extends QueryTest with SharedSQLContext {
+
+  def checkColStats(
+  df: DataFrame,
+  expectedColStatsSeq: Seq[(String, ColumnStats)]): Unit = {
+val table = "tbl"
+withTable(table) {
+  df.write.format("json").saveAsTable(table)
+  val columns = expectedColStatsSeq.map(_._1)
+  val columnStats = spark.sessionState.computeColumnStats(table, 
columns)
--- End diff --

Change this too.





[GitHub] spark pull request #15090: [SPARK-17073] [SQL] generate column-level statist...

2016-09-21 Thread gatorsmile
Github user gatorsmile commented on a diff in the pull request:

https://github.com/apache/spark/pull/15090#discussion_r79965425
  
--- Diff: 
sql/core/src/test/scala/org/apache/spark/sql/StatisticsColumnSuite.scala ---
@@ -0,0 +1,352 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql
+
+import java.sql.{Date, Timestamp}
+
+import org.apache.spark.sql.catalyst.parser.ParseException
+import org.apache.spark.sql.catalyst.plans.logical.ColumnStats
+import org.apache.spark.sql.catalyst.util.DateTimeUtils
+import org.apache.spark.sql.execution.command.AnalyzeColumnCommand
+import org.apache.spark.sql.types._
+
+class StatisticsColumnSuite extends StatisticsTest {
+  import testImplicits._
+
+  test("parse analyze column commands") {
+def assertAnalyzeColumnCommand(analyzeCommand: String, c: Class[_]) {
+  val parsed = spark.sessionState.sqlParser.parsePlan(analyzeCommand)
+  val operators = parsed.collect {
+case a: AnalyzeColumnCommand => a
+case o => o
+  }
+  assert(operators.size == 1)
+  if (operators.head.getClass != c) {
+fail(
+  s"""$analyzeCommand expected command: $c, but got 
${operators.head}
+ |parsed command:
+ |$parsed
+   """.stripMargin)
+  }
+}
+
+val table = "table"
+assertAnalyzeColumnCommand(
+  s"ANALYZE TABLE $table COMPUTE STATISTICS FOR COLUMNS key, value",
+  classOf[AnalyzeColumnCommand])
+
+intercept[ParseException] {
+  sql(s"ANALYZE TABLE $table COMPUTE STATISTICS FOR COLUMNS")
+}
+  }
+
+  test("check correctness of columns") {
+val table = "tbl"
+val colName1 = "abc"
+val colName2 = "x.yz"
+val quotedColName2 = s"`$colName2`"
+withTable(table) {
+  sql(s"CREATE TABLE $table ($colName1 int, $quotedColName2 string) 
USING PARQUET")
+
+  val invalidColError = intercept[AnalysisException] {
+sql(s"ANALYZE TABLE $table COMPUTE STATISTICS FOR COLUMNS key")
+  }
+  assert(invalidColError.message == "Invalid column name: key.")
+
+  withSQLConf("spark.sql.caseSensitive" -> "true") {
+val invalidErr = intercept[AnalysisException] {
+  sql(s"ANALYZE TABLE $table COMPUTE STATISTICS FOR COLUMNS 
${colName1.toUpperCase}")
+}
+assert(invalidErr.message == s"Invalid column name: 
${colName1.toUpperCase}.")
+  }
+
+  withSQLConf("spark.sql.caseSensitive" -> "false") {
+val columnsToAnalyze = Seq(colName2.toUpperCase, colName1, 
colName2)
+val columnStats = spark.sessionState.computeColumnStats(table, 
columnsToAnalyze)
--- End diff --

Here, you can just replace it with:
```Scala
val tableIdent = TableIdentifier(table, Option("default"))
val relation = spark.sessionState.catalog.lookupRelation(tableIdent)
val columnStats =
  AnalyzeColumnCommand(tableIdent, columnsToAnalyze).computeColStats(spark, relation)._2
```





[GitHub] spark pull request #15090: [SPARK-17073] [SQL] generate column-level statist...

2016-09-21 Thread gatorsmile
Github user gatorsmile commented on a diff in the pull request:

https://github.com/apache/spark/pull/15090#discussion_r79965370
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/internal/SessionState.scala ---
@@ -186,13 +187,27 @@ private[sql] class SessionState(sparkSession: 
SparkSession) {
   }
 
   /**
-   * Analyzes the given table in the current database to generate 
statistics, which will be
+   * Analyzes the given table in the current database to generate 
table-level statistics, which
+   * will be used in query optimizations.
+   */
+  def analyzeTable(tableIdent: TableIdentifier, noscan: Boolean = true): 
Unit = {
+AnalyzeTableCommand(tableIdent, noscan).run(sparkSession)
+  }
+
+  /**
+   * Analyzes the given columns in the table to generate column-level 
statistics, which will be
* used in query optimizations.
-   *
-   * Right now, it only supports catalog tables and it only updates the 
size of a catalog table
-   * in the external catalog.
*/
-  def analyze(tableName: String, noscan: Boolean = true): Unit = {
-AnalyzeTableCommand(tableName, noscan).run(sparkSession)
+  def analyzeTableColumns(tableIdent: TableIdentifier, columnNames: 
Seq[String]): Unit = {
+AnalyzeColumnCommand(tableIdent, columnNames).run(sparkSession)
+  }
+
+  // This api is used for testing.
+  def computeColumnStats(tableName: String, columnNames: Seq[String]): 
Map[String, ColumnStats] = {
--- End diff --

Avoid adding any testing-only API, if possible.





[GitHub] spark pull request #15090: [SPARK-17073] [SQL] generate column-level statist...

2016-09-21 Thread wzhfy
Github user wzhfy commented on a diff in the pull request:

https://github.com/apache/spark/pull/15090#discussion_r79926080
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/command/AnalyzeColumnCommand.scala
 ---
@@ -0,0 +1,182 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.command
+
+import scala.collection.mutable
+
+import org.apache.spark.sql._
+import org.apache.spark.sql.catalyst.{InternalRow, TableIdentifier}
+import org.apache.spark.sql.catalyst.analysis.EliminateSubqueryAliases
+import org.apache.spark.sql.catalyst.catalog.{CatalogRelation, 
CatalogTable}
+import org.apache.spark.sql.catalyst.expressions._
+import org.apache.spark.sql.catalyst.expressions.aggregate._
+import org.apache.spark.sql.catalyst.plans.logical.{Aggregate, 
ColumnStats, Statistics}
+import org.apache.spark.sql.execution.datasources.LogicalRelation
+import org.apache.spark.sql.types._
+
+
+/**
+ * Analyzes the given columns of the given table in the current database 
to generate statistics,
+ * which will be used in query optimizations.
+ */
+case class AnalyzeColumnCommand(
+tableIdent: TableIdentifier,
+columnNames: Seq[String]) extends RunnableCommand {
+
+  override def run(sparkSession: SparkSession): Seq[Row] = {
+val sessionState = sparkSession.sessionState
+val db = 
tableIdent.database.getOrElse(sessionState.catalog.getCurrentDatabase)
+val tableIdentWithDB = TableIdentifier(tableIdent.table, Some(db))
+val relation = 
EliminateSubqueryAliases(sessionState.catalog.lookupRelation(tableIdentWithDB))
+
+// check correctness of column names
+val attributesToAnalyze = mutable.MutableList[Attribute]()
+val caseSensitive = sessionState.conf.caseSensitiveAnalysis
+columnNames.foreach { col =>
+  val exprOption = relation.output.find { attr =>
+if (caseSensitive) attr.name == col else 
attr.name.equalsIgnoreCase(col)
+  }
+  val expr = exprOption.getOrElse(throw new 
AnalysisException(s"Invalid column name: $col."))
+  // do deduplication
+  if (!attributesToAnalyze.contains(expr)) {
+attributesToAnalyze += expr
+  }
+}
+
+relation match {
+  case catalogRel: CatalogRelation =>
+updateStats(catalogRel.catalogTable,
+  AnalyzeTableCommand.calculateTotalSize(sessionState, 
catalogRel.catalogTable))
+
+  case logicalRel: LogicalRelation if 
logicalRel.catalogTable.isDefined =>
+updateStats(logicalRel.catalogTable.get, 
logicalRel.relation.sizeInBytes)
+
+  case otherRelation =>
+throw new AnalysisException("ANALYZE TABLE is not supported for " +
+  s"${otherRelation.nodeName}.")
+}
+
+def updateStats(catalogTable: CatalogTable, newTotalSize: Long): Unit 
= {
+  // Collect statistics per column.
+  // The first element in the result will be the overall row count, 
the following elements
+  // will be structs containing all column stats.
+  // The layout of each struct follows the layout of the ColumnStats.
+  val ndvMaxErr = sessionState.conf.ndvMaxError
+  val expressions = Count(Literal(1)).toAggregateExpression() +:
+attributesToAnalyze.map(ColumnStatsStruct(_, ndvMaxErr))
+  val namedExpressions = expressions.map(e => Alias(e, e.toString)())
+  val statsRow = Dataset.ofRows(sparkSession, Aggregate(Nil, 
namedExpressions, relation))
+.queryExecution.toRdd.collect().head
+
+  // unwrap the result
+  val rowCount = statsRow.getLong(0)
+  val columnStats = attributesToAnalyze.zipWithIndex.map { case (expr, 
i) =>
+(expr.name, ColumnStatsStruct.unwrapStruct(statsRow, i + 1, expr))
+  }.toMap
+
+  val statistics = Statistics(
+sizeInBytes = newTotalSize,
+rowCount = Some(rowCount),
+

[GitHub] spark pull request #15090: [SPARK-17073] [SQL] generate column-level statist...

2016-09-21 Thread wzhfy
Github user wzhfy commented on a diff in the pull request:

https://github.com/apache/spark/pull/15090#discussion_r79925839
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/command/AnalyzeColumnCommand.scala
 ---
@@ -0,0 +1,182 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.command
+
+import scala.collection.mutable
+
+import org.apache.spark.sql._
+import org.apache.spark.sql.catalyst.{InternalRow, TableIdentifier}
+import org.apache.spark.sql.catalyst.analysis.EliminateSubqueryAliases
+import org.apache.spark.sql.catalyst.catalog.{CatalogRelation, 
CatalogTable}
+import org.apache.spark.sql.catalyst.expressions._
+import org.apache.spark.sql.catalyst.expressions.aggregate._
+import org.apache.spark.sql.catalyst.plans.logical.{Aggregate, 
ColumnStats, Statistics}
+import org.apache.spark.sql.execution.datasources.LogicalRelation
+import org.apache.spark.sql.types._
+
+
+/**
+ * Analyzes the given columns of the given table in the current database 
to generate statistics,
+ * which will be used in query optimizations.
+ */
+case class AnalyzeColumnCommand(
+tableIdent: TableIdentifier,
+columnNames: Seq[String]) extends RunnableCommand {
+
+  override def run(sparkSession: SparkSession): Seq[Row] = {
+val sessionState = sparkSession.sessionState
+val db = 
tableIdent.database.getOrElse(sessionState.catalog.getCurrentDatabase)
+val tableIdentWithDB = TableIdentifier(tableIdent.table, Some(db))
+val relation = 
EliminateSubqueryAliases(sessionState.catalog.lookupRelation(tableIdentWithDB))
+
+// check correctness of column names
+val attributesToAnalyze = mutable.MutableList[Attribute]()
+val caseSensitive = sessionState.conf.caseSensitiveAnalysis
--- End diff --

See whether [the comment here](https://github.com/apache/spark/pull/15090#discussion_r79329382) answers your concern.
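
The column-name check in the quoted hunk can be sketched without Spark as follows. This is only an illustration under stated assumptions: the schema is modeled as a `Seq[String]` instead of `relation.output`, the names `ColumnResolver` and `resolve` are hypothetical, and `IllegalArgumentException` stands in for `AnalysisException`, which is not available outside Spark.

```scala
object ColumnResolver {
  /**
   * Resolves requested column names against a schema, honoring the
   * case-sensitivity flag, and drops duplicates while keeping first-seen order.
   * Throws IllegalArgumentException for a name that matches no column.
   */
  def resolve(
      schema: Seq[String],
      requested: Seq[String],
      caseSensitive: Boolean): Seq[String] = {
    val resolved = requested.map { col =>
      schema.find { name =>
        if (caseSensitive) name == col else name.equalsIgnoreCase(col)
      }.getOrElse(throw new IllegalArgumentException(s"Invalid column name: $col."))
    }
    // Two spellings of the same column resolve to the same schema entry,
    // so distinct performs the deduplication.
    resolved.distinct
  }
}
```

With case-insensitive matching, requesting `X.YZ`, `abc`, and `x.yz` against a schema of `abc` and `x.yz` yields two columns, which mirrors what the "check correctness of columns" test exercises.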






[GitHub] spark pull request #15090: [SPARK-17073] [SQL] generate column-level statist...

2016-09-21 Thread wzhfy
Github user wzhfy commented on a diff in the pull request:

https://github.com/apache/spark/pull/15090#discussion_r79925464
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/Statistics.scala
 ---
@@ -32,19 +34,74 @@ package org.apache.spark.sql.catalyst.plans.logical
  * @param sizeInBytes Physical size in bytes. For leaf operators this 
defaults to 1, otherwise it
  *defaults to the product of children's `sizeInBytes`.
  * @param rowCount Estimated number of rows.
+ * @param colStats Column-level statistics.
  * @param isBroadcastable If true, output is small enough to be used in a 
broadcast join.
  */
 case class Statistics(
 sizeInBytes: BigInt,
 rowCount: Option[BigInt] = None,
+colStats: Map[String, ColumnStats] = Map.empty,
 isBroadcastable: Boolean = false) {
+
   override def toString: String = "Statistics(" + simpleString + ")"
 
   /** Readable string representation for the Statistics. */
   def simpleString: String = {
 Seq(s"sizeInBytes=$sizeInBytes",
   if (rowCount.isDefined) s"rowCount=${rowCount.get}" else "",
+  if (colStats.nonEmpty) s"colStats=$colStats" else "",
   s"isBroadcastable=$isBroadcastable"
-).filter(_.nonEmpty).mkString("", ", ", "")
+).filter(_.nonEmpty).mkString(", ")
+  }
+}
+
+/**
+ * Statistics for a column.
+ * @param ndv Number of distinct values of the column.
+ */
+case class ColumnStats(
+dataType: DataType,
+numNulls: Long,
+max: Option[Any] = None,
+min: Option[Any] = None,
+ndv: Option[Long] = None,
+avgColLen: Option[Double] = None,
+maxColLen: Option[Long] = None,
+numTrues: Option[Long] = None,
+numFalses: Option[Long] = None) {
+
+  override def toString: String = "ColumnStats(" + simpleString + ")"
+
+  def simpleString: String = {
+Seq(s"numNulls=$numNulls",
+  if (max.isDefined) s"max=${max.get}" else "",
+  if (min.isDefined) s"min=${min.get}" else "",
+  if (ndv.isDefined) s"ndv=${ndv.get}" else "",
+  if (avgColLen.isDefined) s"avgColLen=${avgColLen.get}" else "",
+  if (maxColLen.isDefined) s"maxColLen=${maxColLen.get}" else "",
+  if (numTrues.isDefined) s"numTrues=${numTrues.get}" else "",
+  if (numFalses.isDefined) s"numFalses=${numFalses.get}" else ""
+).filter(_.nonEmpty).mkString(", ")
+  }
+}
+
+object ColumnStats {
+  def fromString(str: String, dataType: DataType): ColumnStats = {
--- End diff --

ok





[GitHub] spark pull request #15090: [SPARK-17073] [SQL] generate column-level statist...

2016-09-21 Thread wzhfy
Github user wzhfy commented on a diff in the pull request:

https://github.com/apache/spark/pull/15090#discussion_r79925435
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/Statistics.scala
 ---
@@ -32,19 +34,74 @@ package org.apache.spark.sql.catalyst.plans.logical
  * @param sizeInBytes Physical size in bytes. For leaf operators this 
defaults to 1, otherwise it
  *defaults to the product of children's `sizeInBytes`.
  * @param rowCount Estimated number of rows.
+ * @param colStats Column-level statistics.
  * @param isBroadcastable If true, output is small enough to be used in a 
broadcast join.
  */
 case class Statistics(
 sizeInBytes: BigInt,
 rowCount: Option[BigInt] = None,
+colStats: Map[String, ColumnStats] = Map.empty,
 isBroadcastable: Boolean = false) {
+
   override def toString: String = "Statistics(" + simpleString + ")"
 
   /** Readable string representation for the Statistics. */
   def simpleString: String = {
 Seq(s"sizeInBytes=$sizeInBytes",
   if (rowCount.isDefined) s"rowCount=${rowCount.get}" else "",
+  if (colStats.nonEmpty) s"colStats=$colStats" else "",
   s"isBroadcastable=$isBroadcastable"
-).filter(_.nonEmpty).mkString("", ", ", "")
+).filter(_.nonEmpty).mkString(", ")
+  }
+}
+
+/**
+ * Statistics for a column.
+ * @param ndv Number of distinct values of the column.
+ */
+case class ColumnStats(
+dataType: DataType,
+numNulls: Long,
+max: Option[Any] = None,
+min: Option[Any] = None,
+ndv: Option[Long] = None,
+avgColLen: Option[Double] = None,
+maxColLen: Option[Long] = None,
+numTrues: Option[Long] = None,
--- End diff --

For the boolean type we don't need histograms; these two values (`numTrues` and `numFalses`) are enough.
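
To make the point concrete, here is a small sketch in plain Scala. It assumes a nullable boolean column modeled as `Seq[Option[Boolean]]`; `BooleanColumnSummary` and `Counts` are hypothetical names, not classes from the PR. Three counters fully describe the value distribution of such a column, so a histogram would add nothing.

```scala
object BooleanColumnSummary {
  final case class Counts(numNulls: Long, numTrues: Long, numFalses: Long)

  /** Single pass over a nullable boolean column, tallying each possible value. */
  def summarize(values: Seq[Option[Boolean]]): Counts =
    values.foldLeft(Counts(0L, 0L, 0L)) {
      case (c, None)        => c.copy(numNulls = c.numNulls + 1)
      case (c, Some(true))  => c.copy(numTrues = c.numTrues + 1)
      case (c, Some(false)) => c.copy(numFalses = c.numFalses + 1)
    }
}
```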





[GitHub] spark pull request #15090: [SPARK-17073] [SQL] generate column-level statist...

2016-09-21 Thread wzhfy
Github user wzhfy commented on a diff in the pull request:

https://github.com/apache/spark/pull/15090#discussion_r79924979
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/Statistics.scala
 ---
@@ -32,19 +34,74 @@ package org.apache.spark.sql.catalyst.plans.logical
  * @param sizeInBytes Physical size in bytes. For leaf operators this 
defaults to 1, otherwise it
  *defaults to the product of children's `sizeInBytes`.
  * @param rowCount Estimated number of rows.
+ * @param colStats Column-level statistics.
  * @param isBroadcastable If true, output is small enough to be used in a 
broadcast join.
  */
 case class Statistics(
 sizeInBytes: BigInt,
 rowCount: Option[BigInt] = None,
+colStats: Map[String, ColumnStats] = Map.empty,
 isBroadcastable: Boolean = false) {
+
   override def toString: String = "Statistics(" + simpleString + ")"
 
   /** Readable string representation for the Statistics. */
   def simpleString: String = {
 Seq(s"sizeInBytes=$sizeInBytes",
   if (rowCount.isDefined) s"rowCount=${rowCount.get}" else "",
+  if (colStats.nonEmpty) s"colStats=$colStats" else "",
   s"isBroadcastable=$isBroadcastable"
-).filter(_.nonEmpty).mkString("", ", ", "")
+).filter(_.nonEmpty).mkString(", ")
+  }
+}
+
+/**
+ * Statistics for a column.
+ * @param ndv Number of distinct values of the column.
+ */
+case class ColumnStats(
+dataType: DataType,
+numNulls: Long,
+max: Option[Any] = None,
--- End diff --

Yes, but do we have a primitive numeric type in Scala?
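
For context on this question: Scala's numeric value types (`Int`, `Long`, `Double`, ...) share no common numeric supertype below `AnyVal`, but the standard library provides the `Ordering[T]` and `Numeric[T]` type classes. The sketch below shows one possible way to keep min/max typed with them. `MinMax`, `rangeOf`, and `width` are hypothetical names for illustration and are not what the PR does.

```scala
// Hypothetical typed alternative to `max: Option[Any]` / `min: Option[Any]`.
final case class MinMax[T](min: T, max: T)

object NumericRangeExample {
  // Ordering[T] is the standard-library type class for comparable values;
  // instances exist for Int, Long, Double, BigDecimal, String, and others.
  def rangeOf[T](values: Seq[T])(implicit ord: Ordering[T]): Option[MinMax[T]] =
    if (values.isEmpty) None else Some(MinMax(values.min, values.max))

  // Numeric[T] additionally allows converting any numeric value to Double.
  def width[T](r: MinMax[T])(implicit num: Numeric[T]): Double =
    num.toDouble(r.max) - num.toDouble(r.min)
}
```

The trade-off is that a type parameter on the stats class has to be carried through every place that stores or deserializes it, which is one reason an untyped field can be tempting.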





[GitHub] spark pull request #15090: [SPARK-17073] [SQL] generate column-level statist...

2016-09-21 Thread srinathshankar
Github user srinathshankar commented on a diff in the pull request:

https://github.com/apache/spark/pull/15090#discussion_r79921689
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/command/AnalyzeColumnCommand.scala
 ---
@@ -0,0 +1,182 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.command
+
+import scala.collection.mutable
+
+import org.apache.spark.sql._
+import org.apache.spark.sql.catalyst.{InternalRow, TableIdentifier}
+import org.apache.spark.sql.catalyst.analysis.EliminateSubqueryAliases
+import org.apache.spark.sql.catalyst.catalog.{CatalogRelation, 
CatalogTable}
+import org.apache.spark.sql.catalyst.expressions._
+import org.apache.spark.sql.catalyst.expressions.aggregate._
+import org.apache.spark.sql.catalyst.plans.logical.{Aggregate, 
ColumnStats, Statistics}
+import org.apache.spark.sql.execution.datasources.LogicalRelation
+import org.apache.spark.sql.types._
+
+
+/**
+ * Analyzes the given columns of the given table in the current database 
to generate statistics,
+ * which will be used in query optimizations.
+ */
+case class AnalyzeColumnCommand(
+tableIdent: TableIdentifier,
+columnNames: Seq[String]) extends RunnableCommand {
+
+  override def run(sparkSession: SparkSession): Seq[Row] = {
+val sessionState = sparkSession.sessionState
+val db = 
tableIdent.database.getOrElse(sessionState.catalog.getCurrentDatabase)
+val tableIdentWithDB = TableIdentifier(tableIdent.table, Some(db))
+val relation = 
EliminateSubqueryAliases(sessionState.catalog.lookupRelation(tableIdentWithDB))
+
+// check correctness of column names
+val attributesToAnalyze = mutable.MutableList[Attribute]()
+val caseSensitive = sessionState.conf.caseSensitiveAnalysis
+columnNames.foreach { col =>
+  val exprOption = relation.output.find { attr =>
+if (caseSensitive) attr.name == col else 
attr.name.equalsIgnoreCase(col)
+  }
+  val expr = exprOption.getOrElse(throw new 
AnalysisException(s"Invalid column name: $col."))
+  // do deduplication
+  if (!attributesToAnalyze.contains(expr)) {
+attributesToAnalyze += expr
+  }
+}
+
+relation match {
+  case catalogRel: CatalogRelation =>
+updateStats(catalogRel.catalogTable,
+  AnalyzeTableCommand.calculateTotalSize(sessionState, 
catalogRel.catalogTable))
+
+  case logicalRel: LogicalRelation if 
logicalRel.catalogTable.isDefined =>
+updateStats(logicalRel.catalogTable.get, 
logicalRel.relation.sizeInBytes)
+
+  case otherRelation =>
+throw new AnalysisException("ANALYZE TABLE is not supported for " +
+  s"${otherRelation.nodeName}.")
+}
+
+def updateStats(catalogTable: CatalogTable, newTotalSize: Long): Unit 
= {
+  // Collect statistics per column.
+  // The first element in the result will be the overall row count, 
the following elements
+  // will be structs containing all column stats.
+  // The layout of each struct follows the layout of the ColumnStats.
+  val ndvMaxErr = sessionState.conf.ndvMaxError
+  val expressions = Count(Literal(1)).toAggregateExpression() +:
+attributesToAnalyze.map(ColumnStatsStruct(_, ndvMaxErr))
+  val namedExpressions = expressions.map(e => Alias(e, e.toString)())
+  val statsRow = Dataset.ofRows(sparkSession, Aggregate(Nil, 
namedExpressions, relation))
+.queryExecution.toRdd.collect().head
+
+  // unwrap the result
+  val rowCount = statsRow.getLong(0)
+  val columnStats = attributesToAnalyze.zipWithIndex.map { case (expr, 
i) =>
+(expr.name, ColumnStatsStruct.unwrapStruct(statsRow, i + 1, expr))
+  }.toMap
+
+  val statistics = Statistics(
+sizeInBytes = newTotalSize,
+rowCount = Some(rowCount),

[GitHub] spark pull request #15090: [SPARK-17073] [SQL] generate column-level statist...

2016-09-21 Thread srinathshankar
Github user srinathshankar commented on a diff in the pull request:

https://github.com/apache/spark/pull/15090#discussion_r79893014
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/Statistics.scala
 ---
@@ -32,19 +34,74 @@ package org.apache.spark.sql.catalyst.plans.logical
  * @param sizeInBytes Physical size in bytes. For leaf operators this 
defaults to 1, otherwise it
  *defaults to the product of children's `sizeInBytes`.
  * @param rowCount Estimated number of rows.
+ * @param colStats Column-level statistics.
  * @param isBroadcastable If true, output is small enough to be used in a 
broadcast join.
  */
 case class Statistics(
 sizeInBytes: BigInt,
 rowCount: Option[BigInt] = None,
+colStats: Map[String, ColumnStats] = Map.empty,
 isBroadcastable: Boolean = false) {
+
   override def toString: String = "Statistics(" + simpleString + ")"
 
   /** Readable string representation for the Statistics. */
   def simpleString: String = {
 Seq(s"sizeInBytes=$sizeInBytes",
   if (rowCount.isDefined) s"rowCount=${rowCount.get}" else "",
+  if (colStats.nonEmpty) s"colStats=$colStats" else "",
   s"isBroadcastable=$isBroadcastable"
-).filter(_.nonEmpty).mkString("", ", ", "")
+).filter(_.nonEmpty).mkString(", ")
+  }
+}
+
+/**
+ * Statistics for a column.
+ * @param ndv Number of distinct values of the column.
+ */
+case class ColumnStats(
+dataType: DataType,
+numNulls: Long,
+max: Option[Any] = None,
+min: Option[Any] = None,
+ndv: Option[Long] = None,
+avgColLen: Option[Double] = None,
+maxColLen: Option[Long] = None,
+numTrues: Option[Long] = None,
+numFalses: Option[Long] = None) {
+
+  override def toString: String = "ColumnStats(" + simpleString + ")"
+
+  def simpleString: String = {
+Seq(s"numNulls=$numNulls",
+  if (max.isDefined) s"max=${max.get}" else "",
+  if (min.isDefined) s"min=${min.get}" else "",
+  if (ndv.isDefined) s"ndv=${ndv.get}" else "",
+  if (avgColLen.isDefined) s"avgColLen=${avgColLen.get}" else "",
+  if (maxColLen.isDefined) s"maxColLen=${maxColLen.get}" else "",
+  if (numTrues.isDefined) s"numTrues=${numTrues.get}" else "",
+  if (numFalses.isDefined) s"numFalses=${numFalses.get}" else ""
+).filter(_.nonEmpty).mkString(", ")
+  }
+}
+
+object ColumnStats {
+  def fromString(str: String, dataType: DataType): ColumnStats = {
--- End diff --

Suggestion: Based on fromString, it looks like simpleString above is 
practically used as a serializer to stuff stats in the catalog? If so, could 
you change the name to catalogRepresentation or something like that so it's 
clear?
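
To make the serializer concern concrete, here is a minimal round-trip sketch. `SimpleColumnStats` is a hypothetical, reduced stand-in with only two fields, and the parsing logic is an assumption for illustration rather than the PR's actual `fromString`; the `key=value, key=value` layout mirrors `simpleString` in the diff.

```scala
// Hypothetical, reduced version of ColumnStats with only two fields.
final case class SimpleColumnStats(numNulls: Long, ndv: Option[Long] = None) {
  // Named after the suggestion above; the output doubles as the stored format.
  def catalogRepresentation: String =
    Seq(s"numNulls=$numNulls", ndv.map(v => s"ndv=$v").getOrElse(""))
      .filter(_.nonEmpty)
      .mkString(", ")
}

object SimpleColumnStats {
  // Parses the output of catalogRepresentation back into a SimpleColumnStats.
  def fromString(str: String): SimpleColumnStats = {
    val kv = str.split(",").map(_.trim).filter(_.nonEmpty).map { entry =>
      val Array(key, value) = entry.split("=", 2)
      key -> value
    }.toMap
    SimpleColumnStats(numNulls = kv("numNulls").toLong, ndv = kv.get("ndv").map(_.toLong))
  }
}
```

Once a string is parsed back like this, any change to its layout becomes a compatibility concern, which is why a name that signals "storage format" rather than "display string" helps.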





[GitHub] spark pull request #15090: [SPARK-17073] [SQL] generate column-level statist...

2016-09-21 Thread srinathshankar
Github user srinathshankar commented on a diff in the pull request:

https://github.com/apache/spark/pull/15090#discussion_r79891286
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/Statistics.scala
 ---
@@ -32,19 +34,74 @@ package org.apache.spark.sql.catalyst.plans.logical
  * @param sizeInBytes Physical size in bytes. For leaf operators this 
defaults to 1, otherwise it
  *defaults to the product of children's `sizeInBytes`.
  * @param rowCount Estimated number of rows.
+ * @param colStats Column-level statistics.
  * @param isBroadcastable If true, output is small enough to be used in a 
broadcast join.
  */
 case class Statistics(
 sizeInBytes: BigInt,
 rowCount: Option[BigInt] = None,
+colStats: Map[String, ColumnStats] = Map.empty,
 isBroadcastable: Boolean = false) {
+
   override def toString: String = "Statistics(" + simpleString + ")"
 
   /** Readable string representation for the Statistics. */
   def simpleString: String = {
 Seq(s"sizeInBytes=$sizeInBytes",
   if (rowCount.isDefined) s"rowCount=${rowCount.get}" else "",
+  if (colStats.nonEmpty) s"colStats=$colStats" else "",
   s"isBroadcastable=$isBroadcastable"
-).filter(_.nonEmpty).mkString("", ", ", "")
+).filter(_.nonEmpty).mkString(", ")
+  }
+}
+
+/**
+ * Statistics for a column.
+ * @param ndv Number of distinct values of the column.
+ */
+case class ColumnStats(
+dataType: DataType,
+numNulls: Long,
+max: Option[Any] = None,
+min: Option[Any] = None,
+ndv: Option[Long] = None,
+avgColLen: Option[Double] = None,
+maxColLen: Option[Long] = None,
+numTrues: Option[Long] = None,
--- End diff --

This seems special-cased for booleans. If we're planning to maintain 
histograms or frequent values, it would seem that this is unnecessary.





[GitHub] spark pull request #15090: [SPARK-17073] [SQL] generate column-level statist...

2016-09-21 Thread srinathshankar
Github user srinathshankar commented on a diff in the pull request:

https://github.com/apache/spark/pull/15090#discussion_r79897549
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/command/AnalyzeColumnCommand.scala
 ---
@@ -0,0 +1,182 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.command
+
+import scala.collection.mutable
+
+import org.apache.spark.sql._
+import org.apache.spark.sql.catalyst.{InternalRow, TableIdentifier}
+import org.apache.spark.sql.catalyst.analysis.EliminateSubqueryAliases
+import org.apache.spark.sql.catalyst.catalog.{CatalogRelation, 
CatalogTable}
+import org.apache.spark.sql.catalyst.expressions._
+import org.apache.spark.sql.catalyst.expressions.aggregate._
+import org.apache.spark.sql.catalyst.plans.logical.{Aggregate, 
ColumnStats, Statistics}
+import org.apache.spark.sql.execution.datasources.LogicalRelation
+import org.apache.spark.sql.types._
+
+
+/**
+ * Analyzes the given columns of the given table in the current database 
to generate statistics,
+ * which will be used in query optimizations.
+ */
+case class AnalyzeColumnCommand(
+tableIdent: TableIdentifier,
+columnNames: Seq[String]) extends RunnableCommand {
+
+  override def run(sparkSession: SparkSession): Seq[Row] = {
+val sessionState = sparkSession.sessionState
+val db = 
tableIdent.database.getOrElse(sessionState.catalog.getCurrentDatabase)
+val tableIdentWithDB = TableIdentifier(tableIdent.table, Some(db))
+val relation = 
EliminateSubqueryAliases(sessionState.catalog.lookupRelation(tableIdentWithDB))
+
+// check correctness of column names
+val attributesToAnalyze = mutable.MutableList[Attribute]()
+val caseSensitive = sessionState.conf.caseSensitiveAnalysis
--- End diff --

I think catalyst.resolver gives you a comparator based on the conf. It might 
be cleaner to just use that.
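
A standalone sketch of the idea, without calling Spark: pick a name comparator once from the configuration flag, then use it for every column lookup instead of branching on `caseSensitive` inside the loop. The `Resolver` alias below is assumed to have the shape `(String, String) => Boolean`; `ResolverSketch`, `resolverFor`, and `resolveColumns` are hypothetical names, and `IllegalArgumentException` stands in for `AnalysisException`.

```scala
object ResolverSketch {
  // Assumed shape of a name comparator: true when the two names match.
  type Resolver = (String, String) => Boolean

  val caseSensitive: Resolver = (a, b) => a == b
  val caseInsensitive: Resolver = (a, b) => a.equalsIgnoreCase(b)

  // Chooses the comparator once from the configuration flag.
  def resolverFor(caseSensitiveAnalysis: Boolean): Resolver =
    if (caseSensitiveAnalysis) caseSensitive else caseInsensitive

  // Looks up each requested column, failing on unknown names and dropping duplicates.
  def resolveColumns(
      available: Seq[String],
      requested: Seq[String],
      resolver: Resolver): Seq[String] =
    requested.map { col =>
      available.find(resolver(_, col))
        .getOrElse(throw new IllegalArgumentException(s"Invalid column name: $col."))
    }.distinct
}
```

Using `distinct` on the resolved names also replaces the manual "do deduplication" step on the mutable list in the diff.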





[GitHub] spark pull request #15090: [SPARK-17073] [SQL] generate column-level statist...

2016-09-21 Thread srinathshankar
Github user srinathshankar commented on a diff in the pull request:

https://github.com/apache/spark/pull/15090#discussion_r79893449
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/Statistics.scala
 ---
@@ -32,19 +34,74 @@ package org.apache.spark.sql.catalyst.plans.logical
  * @param sizeInBytes Physical size in bytes. For leaf operators this 
defaults to 1, otherwise it
  *defaults to the product of children's `sizeInBytes`.
  * @param rowCount Estimated number of rows.
+ * @param colStats Column-level statistics.
  * @param isBroadcastable If true, output is small enough to be used in a 
broadcast join.
  */
 case class Statistics(
 sizeInBytes: BigInt,
 rowCount: Option[BigInt] = None,
+colStats: Map[String, ColumnStats] = Map.empty,
 isBroadcastable: Boolean = false) {
+
   override def toString: String = "Statistics(" + simpleString + ")"
 
   /** Readable string representation for the Statistics. */
   def simpleString: String = {
 Seq(s"sizeInBytes=$sizeInBytes",
   if (rowCount.isDefined) s"rowCount=${rowCount.get}" else "",
+  if (colStats.nonEmpty) s"colStats=$colStats" else "",
   s"isBroadcastable=$isBroadcastable"
-).filter(_.nonEmpty).mkString("", ", ", "")
+).filter(_.nonEmpty).mkString(", ")
+  }
+}
+
+/**
+ * Statistics for a column.
+ * @param ndv Number of distinct values of the column.
+ */
+case class ColumnStats(
+dataType: DataType,
+numNulls: Long,
+max: Option[Any] = None,
--- End diff --

Shouldn't max/min be numeric? (You're not planning on putting strings in 
here, right?)





[GitHub] spark pull request #15090: [SPARK-17073] [SQL] generate column-level statist...

2016-09-21 Thread wzhfy
Github user wzhfy commented on a diff in the pull request:

https://github.com/apache/spark/pull/15090#discussion_r79767198
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala
 ---
@@ -209,16 +212,17 @@ class FindDataSourceTable(sparkSession: SparkSession) 
extends Rule[LogicalPlan]
 
 LogicalRelation(
   dataSource.resolveRelation(),
+  expectedOutputAttributes = Some(simpleCatalogRelation.output),
--- End diff --

Yeah, I'm working on this; I'll submit it tomorrow :)





[GitHub] spark pull request #15090: [SPARK-17073] [SQL] generate column-level statist...

2016-09-21 Thread gatorsmile
Github user gatorsmile commented on a diff in the pull request:

https://github.com/apache/spark/pull/15090#discussion_r79766998
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala
 ---
@@ -209,16 +212,17 @@ class FindDataSourceTable(sparkSession: SparkSession) 
extends Rule[LogicalPlan]
 
 LogicalRelation(
   dataSource.resolveRelation(),
+  expectedOutputAttributes = Some(simpleCatalogRelation.output),
--- End diff --

Could you create a separate PR for this bug fix?





[GitHub] spark pull request #15090: [SPARK-17073] [SQL] generate column-level statist...

2016-09-21 Thread gatorsmile
Github user gatorsmile commented on a diff in the pull request:

https://github.com/apache/spark/pull/15090#discussion_r79766674
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala ---
@@ -87,19 +87,28 @@ class SparkSqlAstBuilder(conf: SQLConf) extends 
AstBuilder {
   }
 
   /**
-   * Create an [[AnalyzeTableCommand]] command. This currently only 
implements the NOSCAN
-   * option (other options are passed on to Hive) e.g.:
+   * Create an [[AnalyzeTableCommand]] command or an 
[[AnalyzeColumnCommand]] command.
+   * Example SQL for analyzing table :
* {{{
*   ANALYZE TABLE table COMPUTE STATISTICS NOSCAN;
+   *   ANALYZE TABLE table COMPUTE STATISTICS;
--- End diff --

You can combine them as `ANALYZE TABLE table COMPUTE STATISTICS [NOSCAN];`.
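
To show what the bracket notation means in practice, here is a small sketch that accepts the statement with or without the optional `NOSCAN` keyword. It uses a regular expression purely for illustration; Spark's real parser is grammar-based, and `AnalyzeSyntaxSketch` and `parse` are hypothetical names.

```scala
object AnalyzeSyntaxSketch {
  // Matches: ANALYZE TABLE <name> COMPUTE STATISTICS [NOSCAN][;]
  // (?i) makes the keywords case-insensitive; the NOSCAN group is optional.
  private val AnalyzePattern =
    """(?i)\s*ANALYZE\s+TABLE\s+(\S+)\s+COMPUTE\s+STATISTICS(\s+NOSCAN)?\s*;?\s*""".r

  // Returns (tableName, noscan) when the whole statement matches, None otherwise.
  def parse(sql: String): Option[(String, Boolean)] = sql match {
    // An optional group that did not participate in the match is bound to null.
    case AnalyzePattern(table, noscan) => Some((table, noscan != null))
    case _ => None
  }
}
```

A single documented form with `[NOSCAN]` covers both statements, so the Scaladoc does not need to list them separately.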





[GitHub] spark pull request #15090: [SPARK-17073] [SQL] generate column-level statist...

2016-09-20 Thread gatorsmile
Github user gatorsmile commented on a diff in the pull request:

https://github.com/apache/spark/pull/15090#discussion_r79707997
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala ---
@@ -87,19 +87,19 @@ class SparkSqlAstBuilder(conf: SQLConf) extends 
AstBuilder {
   }
 
   /**
-   * Create an [[AnalyzeTableCommand]] command. This currently only 
implements the NOSCAN
-   * option (other options are passed on to Hive) e.g.:
-   * {{{
-   *   ANALYZE TABLE table COMPUTE STATISTICS NOSCAN;
-   * }}}
--- End diff --

This is to show the syntax we support. If you check almost all the DDL 
commands, you will see that we put the syntax before the function. It helps 
others understand your implementation.





[GitHub] spark pull request #15090: [SPARK-17073] [SQL] generate column-level statist...

2016-09-20 Thread wzhfy
Github user wzhfy commented on a diff in the pull request:

https://github.com/apache/spark/pull/15090#discussion_r79705637
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala ---
@@ -87,19 +87,19 @@ class SparkSqlAstBuilder(conf: SQLConf) extends 
AstBuilder {
   }
 
   /**
-   * Create an [[AnalyzeTableCommand]] command. This currently only 
implements the NOSCAN
-   * option (other options are passed on to Hive) e.g.:
-   * {{{
-   *   ANALYZE TABLE table COMPUTE STATISTICS NOSCAN;
-   * }}}
--- End diff --

@gatorsmile This comment described a limitation: we only supported ANALYZE 
TABLE with NOSCAN. Now we don't have this limitation.





[GitHub] spark pull request #15090: [SPARK-17073] [SQL] generate column-level statist...

2016-09-20 Thread HyukjinKwon
Github user HyukjinKwon commented on a diff in the pull request:

https://github.com/apache/spark/pull/15090#discussion_r79571121
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/command/AnalyzeTableCommand.scala
 ---
@@ -129,3 +89,51 @@ case class AnalyzeTableCommand(tableName: String, 
noscan: Boolean = true) extend
 Seq.empty[Row]
   }
 }
+
+object AnalyzeTableCommand extends Logging {
+
+  def calculateTotalSize(sparkSession: SparkSession, catalogTable: 
CatalogTable): Long = {
+// This method is mainly based on
+// 
org.apache.hadoop.hive.ql.stats.StatsUtils.getFileSizeForTable(HiveConf, Table)
+// in Hive 0.13 (except that we do not use fs.getContentSummary).
+// TODO: Generalize statistics collection.
+// TODO: Why fs.getContentSummary returns wrong size on Jenkins?
+// Can we use fs.getContentSummary in future?
+// Seems fs.getContentSummary returns wrong table size on Jenkins. So 
we use
+// countFileSize to count the table size.
+val stagingDir =
+sparkSession.sessionState.conf.getConfString("hive.exec.stagingdir", 
".hive-staging")
--- End diff --

FYI - there is an indentation mistake here.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #15090: [SPARK-17073] [SQL] generate column-level statist...

2016-09-20 Thread gatorsmile
Github user gatorsmile commented on a diff in the pull request:

https://github.com/apache/spark/pull/15090#discussion_r79550951
  
--- Diff: 
sql/hive/src/test/scala/org/apache/spark/sql/hive/StatisticsTest.scala ---
@@ -0,0 +1,139 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.hive
+
+import org.apache.spark.sql.{DataFrame, QueryTest}
+import org.apache.spark.sql.catalyst.plans.logical.{ColumnStats, 
Statistics}
+import org.apache.spark.sql.execution.command.{AnalyzeColumnCommand, 
AnalyzeTableCommand}
+import org.apache.spark.sql.execution.datasources.LogicalRelation
+import org.apache.spark.sql.hive.test.TestHiveSingleton
+import org.apache.spark.sql.test.SQLTestUtils
+import org.apache.spark.sql.types._
+
+trait StatisticsTest extends QueryTest with TestHiveSingleton with 
SQLTestUtils {
+
+  def assertAnalyzeCommand(analyzeCommand: String, c: Class[_]) {
+val parsed = spark.sessionState.sqlParser.parsePlan(analyzeCommand)
+val operators = parsed.collect {
+  case a: AnalyzeTableCommand => a
+  case b: AnalyzeColumnCommand => b
+  case o => o
--- End diff --

? 





[GitHub] spark pull request #15090: [SPARK-17073] [SQL] generate column-level statist...

2016-09-20 Thread gatorsmile
Github user gatorsmile commented on a diff in the pull request:

https://github.com/apache/spark/pull/15090#discussion_r79550718
  
--- Diff: 
sql/hive/src/test/scala/org/apache/spark/sql/hive/StatisticsTest.scala ---
@@ -0,0 +1,139 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.hive
+
+import org.apache.spark.sql.{DataFrame, QueryTest}
+import org.apache.spark.sql.catalyst.plans.logical.{ColumnStats, 
Statistics}
+import org.apache.spark.sql.execution.command.{AnalyzeColumnCommand, 
AnalyzeTableCommand}
+import org.apache.spark.sql.execution.datasources.LogicalRelation
+import org.apache.spark.sql.hive.test.TestHiveSingleton
+import org.apache.spark.sql.test.SQLTestUtils
+import org.apache.spark.sql.types._
+
+trait StatisticsTest extends QueryTest with TestHiveSingleton with 
SQLTestUtils {
+
+  def assertAnalyzeCommand(analyzeCommand: String, c: Class[_]) {
+val parsed = spark.sessionState.sqlParser.parsePlan(analyzeCommand)
+val operators = parsed.collect {
+  case a: AnalyzeTableCommand => a
+  case b: AnalyzeColumnCommand => b
+  case o => o
+}
+
+assert(operators.size === 1)
+if (operators(0).getClass() != c) {
+  fail(
+s"""$analyzeCommand expected command: $c, but got ${operators(0)}
+   |parsed command:
+   |$parsed
+   """.stripMargin)
--- End diff --

Please update the syntax.





[GitHub] spark pull request #15090: [SPARK-17073] [SQL] generate column-level statist...

2016-09-20 Thread gatorsmile
Github user gatorsmile commented on a diff in the pull request:

https://github.com/apache/spark/pull/15090#discussion_r79547634
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/command/AnalyzeColumnCommand.scala
 ---
@@ -0,0 +1,168 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.command
+
+import scala.collection.mutable
+
+import org.apache.spark.sql._
+import org.apache.spark.sql.catalyst.{InternalRow, TableIdentifier}
+import org.apache.spark.sql.catalyst.analysis.EliminateSubqueryAliases
+import org.apache.spark.sql.catalyst.catalog.{CatalogRelation, CatalogTable}
+import org.apache.spark.sql.catalyst.expressions._
+import org.apache.spark.sql.catalyst.expressions.aggregate._
+import org.apache.spark.sql.catalyst.plans.logical.{Aggregate, ColumnStats, Statistics}
+import org.apache.spark.sql.execution.datasources.LogicalRelation
+import org.apache.spark.sql.types._
+
+
+/**
+ * Analyzes the given columns of the given table in the current database to generate statistics,
+ * which will be used in query optimizations.
+ */
+case class AnalyzeColumnCommand(
+    tableIdent: TableIdentifier,
+    columnNames: Seq[String]) extends RunnableCommand {
+
+  override def run(sparkSession: SparkSession): Seq[Row] = {
+    val sessionState = sparkSession.sessionState
+    val relation = EliminateSubqueryAliases(sessionState.catalog.lookupRelation(tableIdent))
+
+    // check correctness of column names
+    val attributesToAnalyze = mutable.MutableList[Attribute]()
+    val caseSensitive = sessionState.conf.caseSensitiveAnalysis
+    columnNames.foreach { col =>
+      val exprOption = relation.output.find { attr =>
+        if (caseSensitive) attr.name == col else attr.name.equalsIgnoreCase(col)
+      }
+      val expr = exprOption.getOrElse(throw new AnalysisException(s"Invalid column name: $col."))
+      // do deduplication
+      if (!attributesToAnalyze.contains(expr)) {
+        attributesToAnalyze += expr
+      }
+    }
+
+    relation match {
+      case catalogRel: CatalogRelation =>
+        updateStats(catalogRel.catalogTable,
+          AnalyzeTableCommand.calculateTotalSize(sparkSession, catalogRel.catalogTable))
+
+      case logicalRel: LogicalRelation if logicalRel.catalogTable.isDefined =>
+        updateStats(logicalRel.catalogTable.get, logicalRel.relation.sizeInBytes)
+
+      case otherRelation =>
+        throw new AnalysisException("ANALYZE TABLE is not supported for " +
+          s"${otherRelation.nodeName}.")
+    }
+
+    def updateStats(catalogTable: CatalogTable, newTotalSize: Long): Unit = {
+      // Collect statistics per column.
+      // The first element in the result will be the overall row count, the following elements
+      // will be structs containing all column stats.
+      // The layout of each struct follows the layout of the ColumnStats.
+      val ndvMaxErr = sessionState.conf.ndvMaxError
+      val expressions = Count(Literal(1)).toAggregateExpression() +:
+        attributesToAnalyze.map(ColumnStatsStruct(_, ndvMaxErr))
+      val namedExpressions = expressions.map(e => Alias(e, e.toString)())
+      val statsRow = Dataset.ofRows(sparkSession, Aggregate(Nil, namedExpressions, relation))
+        .queryExecution.toRdd.collect().head
+
+      // unwrap the result
+      val rowCount = statsRow.getLong(0)
+      val columnStats = attributesToAnalyze.zipWithIndex.map { case (expr, i) =>
+        (expr.name, ColumnStatsStruct.unwrapStruct(statsRow, i + 1, expr))
+      }.toMap
+
+      val statistics = Statistics(
+        sizeInBytes = newTotalSize,
+        rowCount = Some(rowCount),
+        colStats = columnStats ++ catalogTable.stats.map(_.colStats).getOrElse(Map()))
+      sessionState.catalog.alterTable(catalogTable.copy(stats = 

[GitHub] spark pull request #15090: [SPARK-17073] [SQL] generate column-level statist...

2016-09-20 Thread gatorsmile
Github user gatorsmile commented on a diff in the pull request:

https://github.com/apache/spark/pull/15090#discussion_r79547296
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/command/AnalyzeColumnCommand.scala
 ---

[GitHub] spark pull request #15090: [SPARK-17073] [SQL] generate column-level statist...

2016-09-20 Thread gatorsmile
Github user gatorsmile commented on a diff in the pull request:

https://github.com/apache/spark/pull/15090#discussion_r79546838
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/command/AnalyzeColumnCommand.scala
 ---

[GitHub] spark pull request #15090: [SPARK-17073] [SQL] generate column-level statist...

2016-09-20 Thread gatorsmile
Github user gatorsmile commented on a diff in the pull request:

https://github.com/apache/spark/pull/15090#discussion_r79546259
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/command/AnalyzeColumnCommand.scala
 ---

[GitHub] spark pull request #15090: [SPARK-17073] [SQL] generate column-level statist...

2016-09-20 Thread gatorsmile
Github user gatorsmile commented on a diff in the pull request:

https://github.com/apache/spark/pull/15090#discussion_r79544029
  
--- Diff: 
sql/hive/src/test/scala/org/apache/spark/sql/hive/StatisticsColumnSuite.scala 
---
@@ -0,0 +1,343 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.hive
+
+import java.sql.{Date, Timestamp}
+
+import org.apache.spark.sql.AnalysisException
+import org.apache.spark.sql.catalyst.plans.logical.ColumnStats
+import org.apache.spark.sql.catalyst.util.DateTimeUtils
+import org.apache.spark.sql.execution.command.AnalyzeColumnCommand
+import org.apache.spark.sql.types._
+
+class StatisticsColumnSuite extends StatisticsTest {
+  import testImplicits._
+
+  test("parse analyze column commands") {
+val table = "table"
+assertAnalyzeCommand(
+  s"ANALYZE TABLE $table COMPUTE STATISTICS FOR COLUMNS key, value",
+  classOf[AnalyzeColumnCommand])
+
+intercept[AnalysisException] {
+  sql(s"ANALYZE TABLE $table COMPUTE STATISTICS FOR COLUMNS")
+}
+  }
+
+  test("check correctness of columns") {
+val table = "tbl"
+val quotedColumn = "x.yz"
+val quotedName = s"`$quotedColumn`"
+withTable(table) {
+  sql(s"CREATE TABLE $table (abc int, $quotedName string)")
+
+  val invalidColError = intercept[AnalysisException] {
+sql(s"ANALYZE TABLE $table COMPUTE STATISTICS FOR COLUMNS key")
+  }
+  assert(invalidColError.message == s"Invalid column name: key.")
+
+  withSQLConf("spark.sql.caseSensitive" -> "true") {
+val invalidErr = intercept[AnalysisException] {
+  sql(s"ANALYZE TABLE $table COMPUTE STATISTICS FOR COLUMNS ABC")
+}
+assert(invalidErr.message == s"Invalid column name: ABC.")
+  }
+
+  withSQLConf("spark.sql.caseSensitive" -> "false") {
+sql(s"ANALYZE TABLE $table COMPUTE STATISTICS FOR COLUMNS ${quotedName.toUpperCase}, " +
+  s"ABC, $quotedName")
+val df = sql(s"SELECT * FROM $table")
+val stats = df.queryExecution.analyzed.collect {
+  case rel: MetastoreRelation =>
+val colStats = rel.catalogTable.stats.get.colStats
+// check deduplication
+assert(colStats.size == 2)
+assert(colStats.contains(quotedColumn))
+assert(colStats.contains("abc"))
+}
+assert(stats.size == 1)
+  }
+}
+  }
+
+  private def getNonNullValues[T](values: Seq[Option[T]]): Seq[T] = {
+values.filter(_.isDefined).map(_.get)
+  }
+
+  test("column-level statistics for integral type columns") {
+val values = (0 to 5).map { i =>
+  if (i % 2 == 0) None else Some(i)
+}
+val data = values.map { i =>
+  (i.map(_.toByte), i.map(_.toShort), i.map(_.toInt), i.map(_.toLong))
+}
+
+val df = data.toDF("c1", "c2", "c3", "c4")
+val nonNullValues = getNonNullValues[Int](values)
+val statsSeq = df.schema.map { f =>
+  val colStats = ColumnStats(
+dataType = f.dataType,
+numNulls = values.count(_.isEmpty),
+max = Some(nonNullValues.max),
+min = Some(nonNullValues.min),
+ndv = Some(nonNullValues.distinct.length.toLong))
+  (f.name, colStats)
+}
+checkColStats(df, statsSeq)
+  }
+
+  test("column-level statistics for fractional type columns") {
+val values = (0 to 5).map { i =>
+  if (i == 0) None else Some(i + i * 0.01d)
+}
+val data = values.map { i =>
+  (i.map(_.toFloat), i.map(_.toDouble), i.map(Decimal(_)))
+}
+
+val df = data.toDF("c1", "c2", "c3")
+val nonNullValues = getNonNullValues[Double](values)
+val statsSeq = df.schema.map { f =>
+  val colStats = ColumnStats(
+dataType = f.dataType,
+ 

[GitHub] spark pull request #15090: [SPARK-17073] [SQL] generate column-level statist...

2016-09-20 Thread gatorsmile
Github user gatorsmile commented on a diff in the pull request:

https://github.com/apache/spark/pull/15090#discussion_r79543613
  
--- Diff: 
sql/hive/src/test/scala/org/apache/spark/sql/hive/StatisticsColumnSuite.scala 
---
@@ -0,0 +1,343 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.hive
+
+import java.sql.{Date, Timestamp}
+
+import org.apache.spark.sql.AnalysisException
+import org.apache.spark.sql.catalyst.plans.logical.ColumnStats
+import org.apache.spark.sql.catalyst.util.DateTimeUtils
+import org.apache.spark.sql.execution.command.AnalyzeColumnCommand
+import org.apache.spark.sql.types._
+
+class StatisticsColumnSuite extends StatisticsTest {
+  import testImplicits._
+
+  test("parse analyze column commands") {
+val table = "table"
+assertAnalyzeCommand(
+  s"ANALYZE TABLE $table COMPUTE STATISTICS FOR COLUMNS key, value",
+  classOf[AnalyzeColumnCommand])
+
+intercept[AnalysisException] {
+  sql(s"ANALYZE TABLE $table COMPUTE STATISTICS FOR COLUMNS")
+}
+  }
+
+  test("check correctness of columns") {
+val table = "tbl"
+val quotedColumn = "x.yz"
+val quotedName = s"`$quotedColumn`"
+withTable(table) {
+  sql(s"CREATE TABLE $table (abc int, $quotedName string)")
+
+  val invalidColError = intercept[AnalysisException] {
+sql(s"ANALYZE TABLE $table COMPUTE STATISTICS FOR COLUMNS key")
+  }
+  assert(invalidColError.message == s"Invalid column name: key.")
+
+  withSQLConf("spark.sql.caseSensitive" -> "true") {
+val invalidErr = intercept[AnalysisException] {
+  sql(s"ANALYZE TABLE $table COMPUTE STATISTICS FOR COLUMNS ABC")
+}
+assert(invalidErr.message == s"Invalid column name: ABC.")
+  }
+
+  withSQLConf("spark.sql.caseSensitive" -> "false") {
+sql(s"ANALYZE TABLE $table COMPUTE STATISTICS FOR COLUMNS ${quotedName.toUpperCase}, " +
+  s"ABC, $quotedName")
+val df = sql(s"SELECT * FROM $table")
+val stats = df.queryExecution.analyzed.collect {
+  case rel: MetastoreRelation =>
+val colStats = rel.catalogTable.stats.get.colStats
+// check deduplication
+assert(colStats.size == 2)
+assert(colStats.contains(quotedColumn))
+assert(colStats.contains("abc"))
+}
+assert(stats.size == 1)
+  }
+}
+  }
+
+  private def getNonNullValues[T](values: Seq[Option[T]]): Seq[T] = {
+values.filter(_.isDefined).map(_.get)
+  }
+
+  test("column-level statistics for integral type columns") {
+val values = (0 to 5).map { i =>
+  if (i % 2 == 0) None else Some(i)
+}
+val data = values.map { i =>
+  (i.map(_.toByte), i.map(_.toShort), i.map(_.toInt), i.map(_.toLong))
+}
+
+val df = data.toDF("c1", "c2", "c3", "c4")
+val nonNullValues = getNonNullValues[Int](values)
+val statsSeq = df.schema.map { f =>
--- End diff --

Please rename all occurrences of `statsSeq` to `expectedColStats`.
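
For illustration only, here is a minimal plain-Scala sketch of what that value holds in the integral-type test, which is why a name like `expectedColStats` describes it better. `ExpectedStats` and `ExpectedColStatsSketch` are hypothetical stand-ins introduced for this sketch; they are not Spark's `ColumnStats` or anything in the PR.

```scala
// Hypothetical stand-in for the ColumnStats fields used in the test; not Spark's class.
final case class ExpectedStats(
    numNulls: Int,
    max: Option[Int],
    min: Option[Int],
    ndv: Option[Long])

object ExpectedColStatsSketch {
  // Same input shape as the integral-type test: even indices become nulls.
  val values: Seq[Option[Int]] = (0 to 5).map(i => if (i % 2 == 0) None else Some(i))

  // Equivalent to values.filter(_.isDefined).map(_.get).
  val nonNullValues: Seq[Int] = values.flatten

  // Named for what it holds: the statistics the test expects to read back.
  val expectedColStats: ExpectedStats = ExpectedStats(
    numNulls = values.count(_.isEmpty),
    max = Some(nonNullValues.max),
    min = Some(nonNullValues.min),
    ndv = Some(nonNullValues.distinct.length.toLong))
}
```

With this input the non-null values are 1, 3 and 5, so the expected statistics are three nulls, a maximum of 5, a minimum of 1 and three distinct values.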





[GitHub] spark pull request #15090: [SPARK-17073] [SQL] generate column-level statist...

2016-09-20 Thread gatorsmile
Github user gatorsmile commented on a diff in the pull request:

https://github.com/apache/spark/pull/15090#discussion_r79543159
  
--- Diff: 
sql/hive/src/test/scala/org/apache/spark/sql/hive/StatisticsColumnSuite.scala 
---
@@ -0,0 +1,343 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.hive
+
+import java.sql.{Date, Timestamp}
+
+import org.apache.spark.sql.AnalysisException
+import org.apache.spark.sql.catalyst.plans.logical.ColumnStats
+import org.apache.spark.sql.catalyst.util.DateTimeUtils
+import org.apache.spark.sql.execution.command.AnalyzeColumnCommand
+import org.apache.spark.sql.types._
+
+class StatisticsColumnSuite extends StatisticsTest {
+  import testImplicits._
+
+  test("parse analyze column commands") {
+val table = "table"
+assertAnalyzeCommand(
+  s"ANALYZE TABLE $table COMPUTE STATISTICS FOR COLUMNS key, value",
+  classOf[AnalyzeColumnCommand])
+
+intercept[AnalysisException] {
+  sql(s"ANALYZE TABLE $table COMPUTE STATISTICS FOR COLUMNS")
+}
+  }
+
+  test("check correctness of columns") {
+val table = "tbl"
+val quotedColumn = "x.yz"
+val quotedName = s"`$quotedColumn`"
+withTable(table) {
+  sql(s"CREATE TABLE $table (abc int, $quotedName string)")
+
+  val invalidColError = intercept[AnalysisException] {
+sql(s"ANALYZE TABLE $table COMPUTE STATISTICS FOR COLUMNS key")
+  }
+  assert(invalidColError.message == s"Invalid column name: key.")
+
+  withSQLConf("spark.sql.caseSensitive" -> "true") {
+val invalidErr = intercept[AnalysisException] {
+  sql(s"ANALYZE TABLE $table COMPUTE STATISTICS FOR COLUMNS ABC")
+}
+assert(invalidErr.message == s"Invalid column name: ABC.")
--- End diff --

The `s` interpolator is unnecessary here: the string contains no interpolated values.
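
A small plain-Scala sketch of the point, independent of Spark. The object and value names are made up for illustration.

```scala
object InterpolatorSketch {
  val col = "ABC"

  // No placeholders in the literal: the `s` prefix changes nothing.
  val withPrefix: String = s"Invalid column name: ABC."
  val plain: String = "Invalid column name: ABC."

  // The prefix only matters when the literal contains a `$` placeholder.
  val interpolated: String = s"Invalid column name: $col."
}
```

All three values are the same string, so the assertion in the test can compare against a plain literal.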





[GitHub] spark pull request #15090: [SPARK-17073] [SQL] generate column-level statist...

2016-09-20 Thread gatorsmile
Github user gatorsmile commented on a diff in the pull request:

https://github.com/apache/spark/pull/15090#discussion_r79543164
  
--- Diff: 
sql/hive/src/test/scala/org/apache/spark/sql/hive/StatisticsColumnSuite.scala 
---
@@ -0,0 +1,343 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.hive
+
+import java.sql.{Date, Timestamp}
+
+import org.apache.spark.sql.AnalysisException
+import org.apache.spark.sql.catalyst.plans.logical.ColumnStats
+import org.apache.spark.sql.catalyst.util.DateTimeUtils
+import org.apache.spark.sql.execution.command.AnalyzeColumnCommand
+import org.apache.spark.sql.types._
+
+class StatisticsColumnSuite extends StatisticsTest {
+  import testImplicits._
+
+  test("parse analyze column commands") {
+val table = "table"
+assertAnalyzeCommand(
+  s"ANALYZE TABLE $table COMPUTE STATISTICS FOR COLUMNS key, value",
+  classOf[AnalyzeColumnCommand])
+
+intercept[AnalysisException] {
+  sql(s"ANALYZE TABLE $table COMPUTE STATISTICS FOR COLUMNS")
+}
+  }
+
+  test("check correctness of columns") {
+val table = "tbl"
+val quotedColumn = "x.yz"
+val quotedName = s"`$quotedColumn`"
+withTable(table) {
+  sql(s"CREATE TABLE $table (abc int, $quotedName string)")
+
+  val invalidColError = intercept[AnalysisException] {
+sql(s"ANALYZE TABLE $table COMPUTE STATISTICS FOR COLUMNS key")
+  }
+  assert(invalidColError.message == s"Invalid column name: key.")
--- End diff --

The `s` interpolator is unnecessary here: the string contains no interpolated values.





[GitHub] spark pull request #15090: [SPARK-17073] [SQL] generate column-level statist...

2016-09-20 Thread gatorsmile
Github user gatorsmile commented on a diff in the pull request:

https://github.com/apache/spark/pull/15090#discussion_r79543008
  
--- Diff: 
sql/hive/src/test/scala/org/apache/spark/sql/hive/StatisticsColumnSuite.scala 
---
@@ -0,0 +1,343 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.hive
+
+import java.sql.{Date, Timestamp}
+
+import org.apache.spark.sql.AnalysisException
+import org.apache.spark.sql.catalyst.plans.logical.ColumnStats
+import org.apache.spark.sql.catalyst.util.DateTimeUtils
+import org.apache.spark.sql.execution.command.AnalyzeColumnCommand
+import org.apache.spark.sql.types._
+
+class StatisticsColumnSuite extends StatisticsTest {
+  import testImplicits._
+
+  test("parse analyze column commands") {
+val table = "table"
+assertAnalyzeCommand(
+  s"ANALYZE TABLE $table COMPUTE STATISTICS FOR COLUMNS key, value",
+  classOf[AnalyzeColumnCommand])
+
+intercept[AnalysisException] {
--- End diff --

This is a `ParseException`, right?
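
A rough sketch of why the existing assertion can pass and still be imprecise, assuming (as in Spark's parser) that `ParseException` is a subclass of `AnalysisException`. Everything below is a hypothetical stand-in written for this sketch: `AnalysisError`, `ParseError`, the simplified `intercept` helper and the `parse` function are not Spark or ScalaTest code.

```scala
import scala.reflect.ClassTag

// Hypothetical stand-ins, not Spark's classes; the subclass relationship is the assumption.
class AnalysisError(msg: String) extends Exception(msg)
class ParseError(msg: String) extends AnalysisError(msg)

object InterceptSketch {
  // Simplified version of a test-framework `intercept`: returns the thrown exception
  // if it is a T (or a subclass of T), and fails otherwise.
  def intercept[T <: Throwable](body: => Any)(implicit ct: ClassTag[T]): T = {
    val caught: Option[Throwable] =
      try { body; None } catch { case t: Throwable => Some(t) }
    caught match {
      case Some(t) if ct.runtimeClass.isInstance(t) => t.asInstanceOf[T]
      case Some(t) =>
        throw new AssertionError(s"Expected ${ct.runtimeClass.getName}, got $t")
      case None =>
        throw new AssertionError(s"Expected ${ct.runtimeClass.getName}, nothing thrown")
    }
  }

  // Stand-in for a parser rejecting a statement with an empty column list.
  def parse(sql: String): Unit =
    if (sql.trim.endsWith("FOR COLUMNS")) throw new ParseError(s"no columns given: $sql")
}
```

Intercepting the parent type accepts the subclass too, so the test cannot tell a parse failure from a later analysis failure. Intercepting the more specific type documents that the statement is rejected at parse time.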





[GitHub] spark pull request #15090: [SPARK-17073] [SQL] generate column-level statist...

2016-09-20 Thread gatorsmile
Github user gatorsmile commented on a diff in the pull request:

https://github.com/apache/spark/pull/15090#discussion_r79542607
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala ---
@@ -87,19 +87,19 @@ class SparkSqlAstBuilder(conf: SQLConf) extends 
AstBuilder {
   }
 
   /**
-   * Create an [[AnalyzeTableCommand]] command. This currently only 
implements the NOSCAN
-   * option (other options are passed on to Hive) e.g.:
-   * {{{
-   *   ANALYZE TABLE table COMPUTE STATISTICS NOSCAN;
-   * }}}
--- End diff --

Any reason why you removed this comment? Since it is outdated, you just 
need to update it rather than delete it.





[GitHub] spark pull request #15090: [SPARK-17073] [SQL] generate column-level statist...

2016-09-19 Thread HyukjinKwon
Github user HyukjinKwon commented on a diff in the pull request:

https://github.com/apache/spark/pull/15090#discussion_r79521372
  
--- Diff: 
sql/hive/src/test/scala/org/apache/spark/sql/hive/StatisticsColumnSuite.scala 
---
@@ -0,0 +1,228 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.hive
+
+import java.sql.{Date, Timestamp}
+
+import org.apache.spark.sql.{AnalysisException, Row}
+import org.apache.spark.sql.catalyst.plans.logical.BasicColStats
+import org.apache.spark.sql.execution.command.AnalyzeColumnCommand
+import org.apache.spark.sql.types._
+
+class StatisticsColumnSuite extends StatisticsTest {
+
+  test("parse analyze column commands") {
+val table = "table"
+assertAnalyzeCommand(
+  s"ANALYZE TABLE $table COMPUTE STATISTICS FOR COLUMNS key, value",
+  classOf[AnalyzeColumnCommand])
+
+val noColumnError = intercept[AnalysisException] {
+  sql(s"ANALYZE TABLE $table COMPUTE STATISTICS FOR COLUMNS")
+}
+assert(noColumnError.message == "Need to specify the columns to 
analyze. Usage: " +
+  "ANALYZE TABLE tbl COMPUTE STATISTICS FOR COLUMNS key, value")
+
+withTable(table) {
+  sql(s"CREATE TABLE $table (key INT, value STRING)")
+  val invalidColError = intercept[AnalysisException] {
+sql(s"ANALYZE TABLE $table COMPUTE STATISTICS FOR COLUMNS k")
+  }
+  assert(invalidColError.message == s"Invalid column name: k")
+
+  val duplicateColError = intercept[AnalysisException] {
+sql(s"ANALYZE TABLE $table COMPUTE STATISTICS FOR COLUMNS key, 
value, key")
+  }
+  assert(duplicateColError.message == s"Duplicate column name: key")
+
+  withSQLConf("spark.sql.caseSensitive" -> "true") {
+val invalidErr = intercept[AnalysisException] {
+  sql(s"ANALYZE TABLE $table COMPUTE STATISTICS FOR COLUMNS keY")
+}
+assert(invalidErr.message == s"Invalid column name: keY")
+  }
+
+  withSQLConf("spark.sql.caseSensitive" -> "false") {
+val duplicateErr = intercept[AnalysisException] {
+  sql(s"ANALYZE TABLE $table COMPUTE STATISTICS FOR COLUMNS key, 
value, vaLue")
+}
+assert(duplicateErr.message == s"Duplicate column name: vaLue")
+  }
+}
+  }
+
+  test("basic statistics for integral type columns") {
+val rdd = sparkContext.parallelize(Seq("1", null, "2", "3", null)).map 
{ i =>
+  if (i != null) Row(i.toByte, i.toShort, i.toInt, i.toLong) else 
Row(i, i, i, i)
--- End diff --

Cool, please take my suggestion with a grain of salt when you fix this (I 
don't think mine is perfect anyway :)).





[GitHub] spark pull request #15090: [SPARK-17073] [SQL] generate column-level statist...

2016-09-19 Thread wzhfy
Github user wzhfy commented on a diff in the pull request:

https://github.com/apache/spark/pull/15090#discussion_r79520564
  
--- Diff: 
sql/hive/src/test/scala/org/apache/spark/sql/hive/StatisticsColumnSuite.scala 
---
@@ -0,0 +1,228 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.hive
+
+import java.sql.{Date, Timestamp}
+
+import org.apache.spark.sql.{AnalysisException, Row}
+import org.apache.spark.sql.catalyst.plans.logical.BasicColStats
+import org.apache.spark.sql.execution.command.AnalyzeColumnCommand
+import org.apache.spark.sql.types._
+
+class StatisticsColumnSuite extends StatisticsTest {
+
+  test("parse analyze column commands") {
+val table = "table"
+assertAnalyzeCommand(
+  s"ANALYZE TABLE $table COMPUTE STATISTICS FOR COLUMNS key, value",
+  classOf[AnalyzeColumnCommand])
+
+val noColumnError = intercept[AnalysisException] {
+  sql(s"ANALYZE TABLE $table COMPUTE STATISTICS FOR COLUMNS")
+}
+assert(noColumnError.message == "Need to specify the columns to 
analyze. Usage: " +
+  "ANALYZE TABLE tbl COMPUTE STATISTICS FOR COLUMNS key, value")
+
+withTable(table) {
+  sql(s"CREATE TABLE $table (key INT, value STRING)")
+  val invalidColError = intercept[AnalysisException] {
+sql(s"ANALYZE TABLE $table COMPUTE STATISTICS FOR COLUMNS k")
+  }
+  assert(invalidColError.message == s"Invalid column name: k")
+
+  val duplicateColError = intercept[AnalysisException] {
+sql(s"ANALYZE TABLE $table COMPUTE STATISTICS FOR COLUMNS key, 
value, key")
+  }
+  assert(duplicateColError.message == s"Duplicate column name: key")
+
+  withSQLConf("spark.sql.caseSensitive" -> "true") {
+val invalidErr = intercept[AnalysisException] {
+  sql(s"ANALYZE TABLE $table COMPUTE STATISTICS FOR COLUMNS keY")
+}
+assert(invalidErr.message == s"Invalid column name: keY")
+  }
+
+  withSQLConf("spark.sql.caseSensitive" -> "false") {
+val duplicateErr = intercept[AnalysisException] {
+  sql(s"ANALYZE TABLE $table COMPUTE STATISTICS FOR COLUMNS key, 
value, vaLue")
+}
+assert(duplicateErr.message == s"Duplicate column name: vaLue")
+  }
+}
+  }
+
+  test("basic statistics for integral type columns") {
+val rdd = sparkContext.parallelize(Seq("1", null, "2", "3", null)).map 
{ i =>
+  if (i != null) Row(i.toByte, i.toShort, i.toInt, i.toLong) else 
Row(i, i, i, i)
--- End diff --

@HyukjinKwon Seems better. Let me change the code based on this. Thanks.





[GitHub] spark pull request #15090: [SPARK-17073] [SQL] generate column-level statist...

2016-09-19 Thread ron8hu
Github user ron8hu commented on a diff in the pull request:

https://github.com/apache/spark/pull/15090#discussion_r79508043
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/command/AnalyzeColumnCommand.scala
 ---
@@ -0,0 +1,159 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.command
+
+import scala.collection.mutable
+
+import org.apache.spark.sql._
+import org.apache.spark.sql.catalyst.{InternalRow, TableIdentifier}
+import org.apache.spark.sql.catalyst.analysis.EliminateSubqueryAliases
+import org.apache.spark.sql.catalyst.catalog.{CatalogRelation, 
CatalogTable}
+import org.apache.spark.sql.catalyst.expressions._
+import org.apache.spark.sql.catalyst.expressions.aggregate._
+import org.apache.spark.sql.catalyst.plans.logical.{Aggregate, 
BasicColStats, Statistics}
+import org.apache.spark.sql.execution.datasources.LogicalRelation
+import org.apache.spark.sql.types._
+
+
+/**
+ * Analyzes the given columns of the given table in the current database 
to generate statistics,
+ * which will be used in query optimizations.
+ */
+case class AnalyzeColumnCommand(
+tableIdent: TableIdentifier,
+columnNames: Seq[String]) extends RunnableCommand {
+
+  override def run(sparkSession: SparkSession): Seq[Row] = {
+val sessionState = sparkSession.sessionState
+val relation = 
EliminateSubqueryAliases(sessionState.catalog.lookupRelation(tableIdent))
+
+// check correctness of column names
+val validColumns = mutable.MutableList[NamedExpression]()
+val resolver = sessionState.conf.resolver
+columnNames.foreach { col =>
+  val exprOption = relation.resolve(col.split("\\."), resolver)
+  if (exprOption.isEmpty) {
+throw new AnalysisException(s"Invalid column name: $col")
+  }
+  if (validColumns.map(_.exprId).contains(exprOption.get.exprId)) {
+throw new AnalysisException(s"Duplicate column name: $col")
+  }
+  validColumns += exprOption.get
+}
+
+relation match {
+  case catalogRel: CatalogRelation =>
+updateStats(catalogRel.catalogTable,
+  AnalyzeTableCommand.calculateTotalSize(sparkSession, 
catalogRel.catalogTable))
+
+  case logicalRel: LogicalRelation if 
logicalRel.catalogTable.isDefined =>
+updateStats(logicalRel.catalogTable.get, 
logicalRel.relation.sizeInBytes)
+
+  case otherRelation =>
+throw new AnalysisException("ANALYZE TABLE is not supported for " +
+  s"${otherRelation.nodeName}.")
+}
+
+def updateStats(catalogTable: CatalogTable, newTotalSize: Long): Unit 
= {
+  // Collect statistics per column.
+  // The first element in the result will be the overall row count, 
the following elements
+  // will be structs containing all column stats.
+  // The layout of each struct follows the layout of the BasicColStats.
+  val ndvMaxErr = sessionState.conf.ndvMaxError
+  val expressions = Count(Literal(1)).toAggregateExpression() +:
+validColumns.map(ColumnStatsStruct(_, ndvMaxErr))
+  val namedExpressions = expressions.map(e => Alias(e, e.toString)())
+  val statsRow = Dataset.ofRows(sparkSession, Aggregate(Nil, 
namedExpressions, relation))
+.queryExecution.toRdd.collect().head
+
+  // unwrap the result
+  val rowCount = statsRow.getLong(0)
+  val colStats = validColumns.zipWithIndex.map { case (expr, i) =>
+val colInfo = statsRow.getStruct(i + 1, 
ColumnStatsStruct.statsNumber)
+val colStats = ColumnStatsStruct.unwrapRow(expr, colInfo)
+(expr.name, colStats)
+  }.toMap
+
+  val statistics =
+Statistics(sizeInBytes = newTotalSize, rowCount = Some(rowCount), 
basicColStats = colStats)
--- End diff --

For the "all or nothing" statistics approach, we can better 

[GitHub] spark pull request #15090: [SPARK-17073] [SQL] generate column-level statist...

2016-09-19 Thread wzhfy
Github user wzhfy commented on a diff in the pull request:

https://github.com/apache/spark/pull/15090#discussion_r79502791
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/command/AnalyzeColumnCommand.scala
 ---
@@ -0,0 +1,159 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.command
+
+import scala.collection.mutable
+
+import org.apache.spark.sql._
+import org.apache.spark.sql.catalyst.{InternalRow, TableIdentifier}
+import org.apache.spark.sql.catalyst.analysis.EliminateSubqueryAliases
+import org.apache.spark.sql.catalyst.catalog.{CatalogRelation, 
CatalogTable}
+import org.apache.spark.sql.catalyst.expressions._
+import org.apache.spark.sql.catalyst.expressions.aggregate._
+import org.apache.spark.sql.catalyst.plans.logical.{Aggregate, 
BasicColStats, Statistics}
+import org.apache.spark.sql.execution.datasources.LogicalRelation
+import org.apache.spark.sql.types._
+
+
+/**
+ * Analyzes the given columns of the given table in the current database 
to generate statistics,
+ * which will be used in query optimizations.
+ */
+case class AnalyzeColumnCommand(
+tableIdent: TableIdentifier,
+columnNames: Seq[String]) extends RunnableCommand {
+
+  override def run(sparkSession: SparkSession): Seq[Row] = {
+val sessionState = sparkSession.sessionState
+val relation = 
EliminateSubqueryAliases(sessionState.catalog.lookupRelation(tableIdent))
+
+// check correctness of column names
+val validColumns = mutable.MutableList[NamedExpression]()
+val resolver = sessionState.conf.resolver
+columnNames.foreach { col =>
+  val exprOption = relation.resolve(col.split("\\."), resolver)
+  if (exprOption.isEmpty) {
+throw new AnalysisException(s"Invalid column name: $col")
+  }
+  if (validColumns.map(_.exprId).contains(exprOption.get.exprId)) {
+throw new AnalysisException(s"Duplicate column name: $col")
+  }
+  validColumns += exprOption.get
+}
+
+relation match {
+  case catalogRel: CatalogRelation =>
+updateStats(catalogRel.catalogTable,
+  AnalyzeTableCommand.calculateTotalSize(sparkSession, 
catalogRel.catalogTable))
+
+  case logicalRel: LogicalRelation if 
logicalRel.catalogTable.isDefined =>
+updateStats(logicalRel.catalogTable.get, 
logicalRel.relation.sizeInBytes)
+
+  case otherRelation =>
+throw new AnalysisException("ANALYZE TABLE is not supported for " +
+  s"${otherRelation.nodeName}.")
+}
+
+def updateStats(catalogTable: CatalogTable, newTotalSize: Long): Unit 
= {
+  // Collect statistics per column.
+  // The first element in the result will be the overall row count, 
the following elements
+  // will be structs containing all column stats.
+  // The layout of each struct follows the layout of the BasicColStats.
+  val ndvMaxErr = sessionState.conf.ndvMaxError
+  val expressions = Count(Literal(1)).toAggregateExpression() +:
+validColumns.map(ColumnStatsStruct(_, ndvMaxErr))
+  val namedExpressions = expressions.map(e => Alias(e, e.toString)())
+  val statsRow = Dataset.ofRows(sparkSession, Aggregate(Nil, 
namedExpressions, relation))
+.queryExecution.toRdd.collect().head
+
+  // unwrap the result
+  val rowCount = statsRow.getLong(0)
+  val colStats = validColumns.zipWithIndex.map { case (expr, i) =>
+val colInfo = statsRow.getStruct(i + 1, 
ColumnStatsStruct.statsNumber)
+val colStats = ColumnStatsStruct.unwrapRow(expr, colInfo)
+(expr.name, colStats)
+  }.toMap
+
+  val statistics =
+Statistics(sizeInBytes = newTotalSize, rowCount = Some(rowCount), 
basicColStats = colStats)
--- End diff --

@hvanhovell Agree. If we want to support refresh column stats 

[GitHub] spark pull request #15090: [SPARK-17073] [SQL] generate column-level statist...

2016-09-19 Thread gatorsmile
Github user gatorsmile commented on a diff in the pull request:

https://github.com/apache/spark/pull/15090#discussion_r79496929
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/command/AnalyzeColumnCommand.scala
 ---
@@ -0,0 +1,159 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.command
+
+import scala.collection.mutable
+
+import org.apache.spark.sql._
+import org.apache.spark.sql.catalyst.{InternalRow, TableIdentifier}
+import org.apache.spark.sql.catalyst.analysis.EliminateSubqueryAliases
+import org.apache.spark.sql.catalyst.catalog.{CatalogRelation, 
CatalogTable}
+import org.apache.spark.sql.catalyst.expressions._
+import org.apache.spark.sql.catalyst.expressions.aggregate._
+import org.apache.spark.sql.catalyst.plans.logical.{Aggregate, 
BasicColStats, Statistics}
+import org.apache.spark.sql.execution.datasources.LogicalRelation
+import org.apache.spark.sql.types._
+
+
+/**
+ * Analyzes the given columns of the given table in the current database 
to generate statistics,
+ * which will be used in query optimizations.
+ */
+case class AnalyzeColumnCommand(
+tableIdent: TableIdentifier,
+columnNames: Seq[String]) extends RunnableCommand {
+
+  override def run(sparkSession: SparkSession): Seq[Row] = {
+val sessionState = sparkSession.sessionState
+val relation = 
EliminateSubqueryAliases(sessionState.catalog.lookupRelation(tableIdent))
+
+// check correctness of column names
+val validColumns = mutable.MutableList[NamedExpression]()
+val resolver = sessionState.conf.resolver
+columnNames.foreach { col =>
+  val exprOption = relation.resolve(col.split("\\."), resolver)
+  if (exprOption.isEmpty) {
+throw new AnalysisException(s"Invalid column name: $col")
+  }
+  if (validColumns.map(_.exprId).contains(exprOption.get.exprId)) {
+throw new AnalysisException(s"Duplicate column name: $col")
--- End diff --

Agree. Just do not forget to add a test case for it.





[GitHub] spark pull request #15090: [SPARK-17073] [SQL] generate column-level statist...

2016-09-19 Thread gatorsmile
Github user gatorsmile commented on a diff in the pull request:

https://github.com/apache/spark/pull/15090#discussion_r79496528
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/command/AnalyzeColumnCommand.scala
 ---
@@ -0,0 +1,159 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.command
+
+import scala.collection.mutable
+
+import org.apache.spark.sql._
+import org.apache.spark.sql.catalyst.{InternalRow, TableIdentifier}
+import org.apache.spark.sql.catalyst.analysis.EliminateSubqueryAliases
+import org.apache.spark.sql.catalyst.catalog.{CatalogRelation, 
CatalogTable}
+import org.apache.spark.sql.catalyst.expressions._
+import org.apache.spark.sql.catalyst.expressions.aggregate._
+import org.apache.spark.sql.catalyst.plans.logical.{Aggregate, 
BasicColStats, Statistics}
+import org.apache.spark.sql.execution.datasources.LogicalRelation
+import org.apache.spark.sql.types._
+
+
+/**
+ * Analyzes the given columns of the given table in the current database 
to generate statistics,
+ * which will be used in query optimizations.
+ */
+case class AnalyzeColumnCommand(
+tableIdent: TableIdentifier,
+columnNames: Seq[String]) extends RunnableCommand {
+
+  override def run(sparkSession: SparkSession): Seq[Row] = {
+val sessionState = sparkSession.sessionState
+val relation = 
EliminateSubqueryAliases(sessionState.catalog.lookupRelation(tableIdent))
+
+// check correctness of column names
+val validColumns = mutable.MutableList[NamedExpression]()
+val resolver = sessionState.conf.resolver
+columnNames.foreach { col =>
+  val exprOption = relation.resolve(col.split("\\."), resolver)
--- End diff --

Hive does not store them in a case-sensitive way. However, we want to 
support it. See the PR: https://github.com/apache/spark/pull/14750 : )





[GitHub] spark pull request #15090: [SPARK-17073] [SQL] generate column-level statist...

2016-09-19 Thread hvanhovell
Github user hvanhovell commented on a diff in the pull request:

https://github.com/apache/spark/pull/15090#discussion_r79492362
  
--- Diff: 
sql/core/src/test/scala/org/apache/spark/sql/StatisticsSuite.scala ---
@@ -101,4 +101,47 @@ class StatisticsSuite extends QueryTest with 
SharedSQLContext {
   checkTableStats(tableName, expectedRowCount = Some(2))
 }
   }
+
+  test("test column-level statistics for data source table created in 
InMemoryCatalog") {
+def checkColStats(colStats: BasicColStats, expectedColStats: 
BasicColStats): Unit = {
+  assert(colStats.dataType == expectedColStats.dataType)
+  assert(colStats.numNulls == expectedColStats.numNulls)
+  assert(colStats.max == expectedColStats.max)
+  assert(colStats.min == expectedColStats.min)
+  if (expectedColStats.ndv.isDefined) {
+// ndv is an approximate value, so we just make sure we have the 
value
+assert(colStats.ndv.get >= 0)
--- End diff --

BTW: sorry for the very cryptic comment.





[GitHub] spark pull request #15090: [SPARK-17073] [SQL] generate column-level statist...

2016-09-19 Thread hvanhovell
Github user hvanhovell commented on a diff in the pull request:

https://github.com/apache/spark/pull/15090#discussion_r79492145
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/command/AnalyzeColumnCommand.scala
 ---
@@ -0,0 +1,159 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.command
+
+import scala.collection.mutable
+
+import org.apache.spark.sql._
+import org.apache.spark.sql.catalyst.{InternalRow, TableIdentifier}
+import org.apache.spark.sql.catalyst.analysis.EliminateSubqueryAliases
+import org.apache.spark.sql.catalyst.catalog.{CatalogRelation, 
CatalogTable}
+import org.apache.spark.sql.catalyst.expressions._
+import org.apache.spark.sql.catalyst.expressions.aggregate._
+import org.apache.spark.sql.catalyst.plans.logical.{Aggregate, 
BasicColStats, Statistics}
+import org.apache.spark.sql.execution.datasources.LogicalRelation
+import org.apache.spark.sql.types._
+
+
+/**
+ * Analyzes the given columns of the given table in the current database 
to generate statistics,
+ * which will be used in query optimizations.
+ */
+case class AnalyzeColumnCommand(
+tableIdent: TableIdentifier,
+columnNames: Seq[String]) extends RunnableCommand {
+
+  override def run(sparkSession: SparkSession): Seq[Row] = {
+val sessionState = sparkSession.sessionState
+val relation = 
EliminateSubqueryAliases(sessionState.catalog.lookupRelation(tableIdent))
+
+// check correctness of column names
+val validColumns = mutable.MutableList[NamedExpression]()
+val resolver = sessionState.conf.resolver
+columnNames.foreach { col =>
+  val exprOption = relation.resolve(col.split("\\."), resolver)
--- End diff --

OK, that makes sense. You could also use the resolved attribute's name as a 
key to store the stats. @gatorsmile does Hive store column names in a 
case-sensitive way? I thought that we always convert them to lower case?
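
A minimal sketch of that suggestion, assuming a resolver is simply a name-equality function, which is how `sessionState.conf.resolver` is used in the diff. `Attribute`, `schema`, `resolve`, and `validate` are hypothetical stand-ins for this illustration, not Spark's classes. The idea shown is that keying by the resolved attribute's name yields one canonical key however the user spelled the column.

```scala
object ResolvedNameSketch {
  // A resolver decides whether two names refer to the same column.
  type Resolver = (String, String) => Boolean
  val caseSensitive: Resolver = (a, b) => a == b
  val caseInsensitive: Resolver = (a, b) => a.equalsIgnoreCase(b)

  // Hypothetical stand-in for a resolved column of the table schema.
  final case class Attribute(name: String, exprId: Int)

  val schema: Seq[Attribute] = Seq(Attribute("key", 1), Attribute("value", 2))

  def resolve(col: String, resolver: Resolver): Option[Attribute] =
    schema.find(attr => resolver(attr.name, col))

  // Returns the resolved attributes, or an error message that mirrors the
  // invalid/duplicate checks in the diff above.
  def validate(
      columnNames: Seq[String],
      resolver: Resolver): Either[String, Seq[Attribute]] =
    columnNames.foldLeft[Either[String, Seq[Attribute]]](Right(Seq.empty[Attribute])) {
      case (Right(seen), col) =>
        resolve(col, resolver) match {
          case None => Left(s"Invalid column name: $col")
          case Some(attr) if seen.exists(_.exprId == attr.exprId) =>
            Left(s"Duplicate column name: $col")
          case Some(attr) => Right(seen :+ attr)
        }
      case (error, _) => error
    }

  def main(args: Array[String]): Unit = {
    println(validate(Seq("keY"), caseSensitive))
    println(validate(Seq("key", "value", "vaLue"), caseInsensitive))
    // Key the stats by the resolved name, not by the user-typed spelling.
    println(validate(Seq("KEY", "Value"), caseInsensitive).map(_.map(_.name)))
  }
}
```

Comparing by `exprId` rather than by the typed string is what lets `value` and `vaLue` be reported as duplicates under a case-insensitive resolver.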





[GitHub] spark pull request #15090: [SPARK-17073] [SQL] generate column-level statist...

2016-09-19 Thread hvanhovell
Github user hvanhovell commented on a diff in the pull request:

https://github.com/apache/spark/pull/15090#discussion_r79491476
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/command/AnalyzeColumnCommand.scala
 ---
@@ -0,0 +1,159 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.command
+
+import scala.collection.mutable
+
+import org.apache.spark.sql._
+import org.apache.spark.sql.catalyst.{InternalRow, TableIdentifier}
+import org.apache.spark.sql.catalyst.analysis.EliminateSubqueryAliases
+import org.apache.spark.sql.catalyst.catalog.{CatalogRelation, 
CatalogTable}
+import org.apache.spark.sql.catalyst.expressions._
+import org.apache.spark.sql.catalyst.expressions.aggregate._
+import org.apache.spark.sql.catalyst.plans.logical.{Aggregate, 
BasicColStats, Statistics}
+import org.apache.spark.sql.execution.datasources.LogicalRelation
+import org.apache.spark.sql.types._
+
+
+/**
+ * Analyzes the given columns of the given table in the current database 
to generate statistics,
+ * which will be used in query optimizations.
+ */
+case class AnalyzeColumnCommand(
+tableIdent: TableIdentifier,
+columnNames: Seq[String]) extends RunnableCommand {
+
+  override def run(sparkSession: SparkSession): Seq[Row] = {
+val sessionState = sparkSession.sessionState
+val relation = 
EliminateSubqueryAliases(sessionState.catalog.lookupRelation(tableIdent))
+
+// check correctness of column names
+val validColumns = mutable.MutableList[NamedExpression]()
+val resolver = sessionState.conf.resolver
+columnNames.foreach { col =>
+  val exprOption = relation.resolve(col.split("\\."), resolver)
+  if (exprOption.isEmpty) {
+throw new AnalysisException(s"Invalid column name: $col")
+  }
+  if (validColumns.map(_.exprId).contains(exprOption.get.exprId)) {
+throw new AnalysisException(s"Duplicate column name: $col")
+  }
+  validColumns += exprOption.get
+}
+
+relation match {
+  case catalogRel: CatalogRelation =>
+updateStats(catalogRel.catalogTable,
+  AnalyzeTableCommand.calculateTotalSize(sparkSession, 
catalogRel.catalogTable))
+
+  case logicalRel: LogicalRelation if 
logicalRel.catalogTable.isDefined =>
+updateStats(logicalRel.catalogTable.get, 
logicalRel.relation.sizeInBytes)
+
+  case otherRelation =>
+throw new AnalysisException("ANALYZE TABLE is not supported for " +
+  s"${otherRelation.nodeName}.")
+}
+
+def updateStats(catalogTable: CatalogTable, newTotalSize: Long): Unit 
= {
+  // Collect statistics per column.
+  // The first element in the result will be the overall row count, 
the following elements
+  // will be structs containing all column stats.
+  // The layout of each struct follows the layout of the BasicColStats.
+  val ndvMaxErr = sessionState.conf.ndvMaxError
+  val expressions = Count(Literal(1)).toAggregateExpression() +:
+validColumns.map(ColumnStatsStruct(_, ndvMaxErr))
+  val namedExpressions = expressions.map(e => Alias(e, e.toString)())
+  val statsRow = Dataset.ofRows(sparkSession, Aggregate(Nil, 
namedExpressions, relation))
+.queryExecution.toRdd.collect().head
+
+  // unwrap the result
+  val rowCount = statsRow.getLong(0)
+  val colStats = validColumns.zipWithIndex.map { case (expr, i) =>
+val colInfo = statsRow.getStruct(i + 1, 
ColumnStatsStruct.statsNumber)
+val colStats = ColumnStatsStruct.unwrapRow(expr, colInfo)
+(expr.name, colStats)
+  }.toMap
+
+  val statistics =
+Statistics(sizeInBytes = newTotalSize, rowCount = Some(rowCount), 
basicColStats = colStats)
--- End diff --

I am just not too sure that we should follow an all or nothing 

[GitHub] spark pull request #15090: [SPARK-17073] [SQL] generate column-level statist...

2016-09-19 Thread hvanhovell
Github user hvanhovell commented on a diff in the pull request:

https://github.com/apache/spark/pull/15090#discussion_r79489925
  
--- Diff: 
sql/core/src/test/scala/org/apache/spark/sql/StatisticsSuite.scala ---
@@ -101,4 +101,47 @@ class StatisticsSuite extends QueryTest with 
SharedSQLContext {
   checkTableStats(tableName, expectedRowCount = Some(2))
 }
   }
+
+  test("test column-level statistics for data source table created in 
InMemoryCatalog") {
+def checkColStats(colStats: BasicColStats, expectedColStats: 
BasicColStats): Unit = {
+  assert(colStats.dataType == expectedColStats.dataType)
+  assert(colStats.numNulls == expectedColStats.numNulls)
+  assert(colStats.max == expectedColStats.max)
+  assert(colStats.min == expectedColStats.min)
+  if (expectedColStats.ndv.isDefined) {
+// ndv is an approximate value, so we just make sure we have the 
value
+assert(colStats.ndv.get >= 0)
--- End diff --

I just think it is better to test this a bit more thoroughly. The HLL++ 
tests use a similar test.





[GitHub] spark pull request #15090: [SPARK-17073] [SQL] generate column-level statist...

2016-09-19 Thread wzhfy
Github user wzhfy commented on a diff in the pull request:

https://github.com/apache/spark/pull/15090#discussion_r79441079
  
--- Diff: 
sql/core/src/test/scala/org/apache/spark/sql/StatisticsSuite.scala ---
@@ -101,4 +101,47 @@ class StatisticsSuite extends QueryTest with 
SharedSQLContext {
   checkTableStats(tableName, expectedRowCount = Some(2))
 }
   }
+
+  test("test column-level statistics for data source table created in 
InMemoryCatalog") {
+def checkColStats(colStats: BasicColStats, expectedColStats: 
BasicColStats): Unit = {
+  assert(colStats.dataType == expectedColStats.dataType)
+  assert(colStats.numNulls == expectedColStats.numNulls)
+  assert(colStats.max == expectedColStats.max)
+  assert(colStats.min == expectedColStats.min)
+  if (expectedColStats.ndv.isDefined) {
+// ndv is an approximate value, so we just make sure we have the 
value
+assert(colStats.ndv.get >= 0)
--- End diff --

I think I may know what @hvanhovell meant. The relative standard deviation 
(rsd) is the parameter we pass to HyperLogLogPlusPlus, so the code may look 
like this:
```
// ndv is an approximate value, so we make sure we have the value,
// and that its relative error is within 3 times the given rsd.
assert(colStats.ndv.get >= 0)
val rsd = spark.sessionState.conf.ndvMaxError
val error = math.abs((colStats.ndv.get / expectedColStats.ndv.get.toDouble) - 1.0d)
assert(error <= rsd * 3.0d, "Error should be within 3 std. errors.")
```
What do you think?
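
To make the proposed check concrete, here is a minimal, self-contained sketch of the same relative-error test outside the test suite. The object name `NdvCheck`, the helper `withinRelativeError`, and the multiplier parameter `k` are hypothetical and not part of the PR; only the formula mirrors the snippet above.

```scala
// Hypothetical helper, not part of the PR: compares an approximate
// distinct count with the exact one using a relative-error bound.
object NdvCheck {
  /** True when `estimate` is within `k` times `rsd` of `exact`, in relative terms. */
  def withinRelativeError(
      estimate: Long,
      exact: Long,
      rsd: Double,
      k: Double = 3.0d): Boolean = {
    require(exact > 0, "exact distinct count must be positive")
    // Relative error of the estimate with respect to the exact value.
    val error = math.abs(estimate / exact.toDouble - 1.0d)
    error <= rsd * k
  }
}
```

With `rsd = 0.05`, an estimate of 103 against an exact count of 100 has a relative error of about 0.03 and passes, whereas an estimate of 0 has a relative error of 1.0 and fails.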





[GitHub] spark pull request #15090: [SPARK-17073] [SQL] generate column-level statist...

2016-09-19 Thread HyukjinKwon
Github user HyukjinKwon commented on a diff in the pull request:

https://github.com/apache/spark/pull/15090#discussion_r79335325
  
--- Diff: 
sql/hive/src/test/scala/org/apache/spark/sql/hive/StatisticsColumnSuite.scala 
---
@@ -0,0 +1,228 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.hive
+
+import java.sql.{Date, Timestamp}
+
+import org.apache.spark.sql.{AnalysisException, Row}
+import org.apache.spark.sql.catalyst.plans.logical.BasicColStats
+import org.apache.spark.sql.execution.command.AnalyzeColumnCommand
+import org.apache.spark.sql.types._
+
+class StatisticsColumnSuite extends StatisticsTest {
+
+  test("parse analyze column commands") {
+val table = "table"
+assertAnalyzeCommand(
+  s"ANALYZE TABLE $table COMPUTE STATISTICS FOR COLUMNS key, value",
+  classOf[AnalyzeColumnCommand])
+
+val noColumnError = intercept[AnalysisException] {
+  sql(s"ANALYZE TABLE $table COMPUTE STATISTICS FOR COLUMNS")
+}
+assert(noColumnError.message == "Need to specify the columns to 
analyze. Usage: " +
+  "ANALYZE TABLE tbl COMPUTE STATISTICS FOR COLUMNS key, value")
+
+withTable(table) {
+  sql(s"CREATE TABLE $table (key INT, value STRING)")
+  val invalidColError = intercept[AnalysisException] {
+sql(s"ANALYZE TABLE $table COMPUTE STATISTICS FOR COLUMNS k")
+  }
+  assert(invalidColError.message == s"Invalid column name: k")
+
+  val duplicateColError = intercept[AnalysisException] {
+sql(s"ANALYZE TABLE $table COMPUTE STATISTICS FOR COLUMNS key, 
value, key")
+  }
+  assert(duplicateColError.message == s"Duplicate column name: key")
+
+  withSQLConf("spark.sql.caseSensitive" -> "true") {
+val invalidErr = intercept[AnalysisException] {
+  sql(s"ANALYZE TABLE $table COMPUTE STATISTICS FOR COLUMNS keY")
+}
+assert(invalidErr.message == s"Invalid column name: keY")
+  }
+
+  withSQLConf("spark.sql.caseSensitive" -> "false") {
+val duplicateErr = intercept[AnalysisException] {
+  sql(s"ANALYZE TABLE $table COMPUTE STATISTICS FOR COLUMNS key, 
value, vaLue")
+}
+assert(duplicateErr.message == s"Duplicate column name: vaLue")
+  }
+}
+  }
+
+  test("basic statistics for integral type columns") {
+val rdd = sparkContext.parallelize(Seq("1", null, "2", "3", null)).map 
{ i =>
+  if (i != null) Row(i.toByte, i.toShort, i.toInt, i.toLong) else 
Row(i, i, i, i)
--- End diff --

@wzhfy I guess he understood `"1", null, "2", "3", null` to be the actual 
values for the rows. Could we maybe make this easier to read? How about the 
code below?

```scala
val values = (0 to 5).map { i =>
  if (i % 2 == 0) None else Some(i)
}
val data = values.map { i =>
  (i.map(_.toByte), i.map(_.toShort), i.map(_.toInt), i.map(_.toLong))
}

val df = data.toDF("c1", "c2", "c3", "c4")
val statsSeq = df.schema.map { f =>
  val basicStats = BasicColStats(
    dataType = f.dataType,
    // Nulls are the empty options, not the defined ones.
    numNulls = values.count(_.isEmpty),
    max = values.filter(_.isDefined).max,
    min = values.filter(_.isDefined).min,
    // Count distinct non-null values only.
    ndv = Some(values.flatten.distinct.length.toLong))
  (f.name, basicStats)
}

checkColStats(df, statsSeq)
```

This needs `import testImplicits._` right below the declaration of 
`StatisticsColumnSuite`, with `checkColStats` changed as below:

```scala
def checkColStats(
    df: DataFrame,
    expectedColStatsSeq: Seq[(String, BasicColStats)]): Unit = {
  val table = "tbl"
  withTable(table) {
    df.write.format("json").saveAsTable(table)
    val columns = expectedColStatsSeq.map(_._1).mkString(", ")
    sql(s"ANALYZE TABLE $table COMPUTE STATISTICS FOR COLUMNS $columns")
    val readback = sql(s"SELECT * FROM $table")
    val stats = 

[GitHub] spark pull request #15090: [SPARK-17073] [SQL] generate column-level statist...

2016-09-19 Thread viirya
Github user viirya commented on a diff in the pull request:

https://github.com/apache/spark/pull/15090#discussion_r79332105
  
--- Diff: 
sql/core/src/test/scala/org/apache/spark/sql/StatisticsSuite.scala ---
@@ -101,4 +101,47 @@ class StatisticsSuite extends QueryTest with 
SharedSQLContext {
   checkTableStats(tableName, expectedRowCount = Some(2))
 }
   }
+
+  test("test column-level statistics for data source table created in 
InMemoryCatalog") {
+def checkColStats(colStats: BasicColStats, expectedColStats: 
BasicColStats): Unit = {
+  assert(colStats.dataType == expectedColStats.dataType)
+  assert(colStats.numNulls == expectedColStats.numNulls)
+  assert(colStats.max == expectedColStats.max)
+  assert(colStats.min == expectedColStats.min)
+  if (expectedColStats.ndv.isDefined) {
+// ndv is an approximate value, so we just make sure we have the 
value
+assert(colStats.ndv.get >= 0)
--- End diff --

Of course, my previous suggestion `0 <= ndv <= count(col)` also fails by 
this standard. Checking against the standard deviation is more reliable. I am 
not strongly against the current check, but the stricter one would be better. 
Let's see what @hvanhovell and @cloud-fan think.





[GitHub] spark pull request #15090: [SPARK-17073] [SQL] generate column-level statist...

2016-09-19 Thread viirya
Github user viirya commented on a diff in the pull request:

https://github.com/apache/spark/pull/15090#discussion_r79331978
  
--- Diff: 
sql/core/src/test/scala/org/apache/spark/sql/StatisticsSuite.scala ---
@@ -101,4 +101,47 @@ class StatisticsSuite extends QueryTest with 
SharedSQLContext {
   checkTableStats(tableName, expectedRowCount = Some(2))
 }
   }
+
+  test("test column-level statistics for data source table created in 
InMemoryCatalog") {
+def checkColStats(colStats: BasicColStats, expectedColStats: 
BasicColStats): Unit = {
+  assert(colStats.dataType == expectedColStats.dataType)
+  assert(colStats.numNulls == expectedColStats.numNulls)
+  assert(colStats.max == expectedColStats.max)
+  assert(colStats.min == expectedColStats.min)
+  if (expectedColStats.ndv.isDefined) {
+// ndv is an approximate value, so we just make sure we have the 
value
+assert(colStats.ndv.get >= 0)
--- End diff --

The check here on the estimated ndv value is not really related to the 
actual ndv value, so we can't be sure that we really get the estimated ndv back, 
especially since we only use `ndv >= 0` to check it. In an extreme case, if we 
replaced the ndv expression with a literal(0) in the aggregation, the test would 
not catch it.
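
The point above can be illustrated with a small, self-contained sketch that applies the three checks discussed in this thread to a deliberately broken estimate of 0. All names here (`NdvCheckComparison`, `column`, `brokenNdv`, and the three predicates) are hypothetical and only serve the illustration; they are not taken from the PR.

```scala
// Hypothetical illustration, not part of the PR: which checks would
// notice an ndv "estimate" that is always 0?
object NdvCheckComparison {
  // Values of an imaginary column; None stands for NULL.
  val column: Seq[Option[Int]] = Seq(Some(1), None, Some(2), Some(3), None)

  val nonNullCount: Long = column.count(_.isDefined).toLong
  val exactNdv: Long = column.flatten.distinct.size.toLong

  // As if the ndv expression had been replaced by a literal 0.
  val brokenNdv: Long = 0L

  // The check currently in the test.
  def nonNegative(ndv: Long): Boolean = ndv >= 0

  // The earlier suggestion: 0 <= ndv <= count(col).
  def boundedByCount(ndv: Long): Boolean = ndv >= 0 && ndv <= nonNullCount

  // Relative-error check against the exact ndv, within 3 times rsd.
  def withinRsd(ndv: Long, rsd: Double): Boolean =
    math.abs(ndv / exactNdv.toDouble - 1.0d) <= rsd * 3.0d
}
```

Under these assumptions, `nonNegative(brokenNdv)` and `boundedByCount(brokenNdv)` both accept the broken value, while `withinRsd(brokenNdv, 0.05)` rejects it, which is why only the last check ties the test to the actual number of distinct values.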





[GitHub] spark pull request #15090: [SPARK-17073] [SQL] generate column-level statist...

2016-09-19 Thread viirya
Github user viirya commented on a diff in the pull request:

https://github.com/apache/spark/pull/15090#discussion_r79331719
  
--- Diff: 
sql/core/src/test/scala/org/apache/spark/sql/StatisticsSuite.scala ---
@@ -101,4 +101,47 @@ class StatisticsSuite extends QueryTest with 
SharedSQLContext {
   checkTableStats(tableName, expectedRowCount = Some(2))
 }
   }
+
+  test("test column-level statistics for data source table created in 
InMemoryCatalog") {
+def checkColStats(colStats: BasicColStats, expectedColStats: 
BasicColStats): Unit = {
+  assert(colStats.dataType == expectedColStats.dataType)
+  assert(colStats.numNulls == expectedColStats.numNulls)
+  assert(colStats.max == expectedColStats.max)
+  assert(colStats.min == expectedColStats.min)
+  if (expectedColStats.ndv.isDefined) {
+// ndv is an approximate value, so we just make sure we have the 
value
+assert(colStats.ndv.get >= 0)
--- End diff --

I mean `0 <= ndv <= count(col)`.




