spark git commit: [SPARK-19723][SQL] create datasource table with a non-existent location should work
Repository: spark Updated Branches: refs/heads/master fb9beda54 -> f6fdf92d0 [SPARK-19723][SQL] create datasource table with an non-existent location should work ## What changes were proposed in this pull request? This JIRA is a follow up work after [SPARK-19583](https://issues.apache.org/jira/browse/SPARK-19583) As we discussed in that [PR](https://github.com/apache/spark/pull/16938) The following DDL for datasource table with an non-existent location should work: ``` CREATE TABLE ... (PARTITIONED BY ...) LOCATION path ``` Currently it will throw exception that path not exists for datasource table for datasource table ## How was this patch tested? unit test added Author: windpiger Closes #17055 from windpiger/CTDataSourcePathNotExists. Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/f6fdf92d Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/f6fdf92d Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/f6fdf92d Branch: refs/heads/master Commit: f6fdf92d0dce2cb3340f3e2ff768e09ef69176cd Parents: fb9beda Author: windpiger Authored: Fri Mar 10 20:59:32 2017 -0800 Committer: Wenchen Fan Committed: Fri Mar 10 20:59:32 2017 -0800 -- .../command/createDataSourceTables.scala| 3 +- .../spark/sql/execution/command/DDLSuite.scala | 106 +++--- .../spark/sql/hive/execution/HiveDDLSuite.scala | 111 --- 3 files changed, 115 insertions(+), 105 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/f6fdf92d/sql/core/src/main/scala/org/apache/spark/sql/execution/command/createDataSourceTables.scala -- diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/createDataSourceTables.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/createDataSourceTables.scala index 3da66af..2d89011 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/createDataSourceTables.scala +++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/createDataSourceTables.scala @@ -73,7 +73,8 @@ case class CreateDataSourceTableCommand(table: CatalogTable, ignoreIfExists: Boo className = table.provider.get, bucketSpec = table.bucketSpec, options = table.storage.properties ++ pathOption, -catalogTable = Some(tableWithDefaultOptions)).resolveRelation() +// As discussed in SPARK-19583, we don't check if the location is existed +catalogTable = Some(tableWithDefaultOptions)).resolveRelation(checkFilesExist = false) val partitionColumnNames = if (table.schema.nonEmpty) { table.partitionColumnNames http://git-wip-us.apache.org/repos/asf/spark/blob/f6fdf92d/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLSuite.scala -- diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLSuite.scala index 5f70a8c..0666f44 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLSuite.scala @@ -230,7 +230,7 @@ abstract class DDLSuite extends QueryTest with SQLTestUtils { } private def getDBPath(dbName: String): URI = { -val warehousePath = makeQualifiedPath(s"${spark.sessionState.conf.warehousePath}") +val warehousePath = makeQualifiedPath(spark.sessionState.conf.warehousePath) new Path(CatalogUtils.URIToString(warehousePath), s"$dbName.db").toUri } @@ -1899,7 +1899,7 @@ abstract class DDLSuite extends QueryTest with SQLTestUtils { } } - test("insert data to a data source table which has a not existed location should succeed") { + test("insert data to a data source table which has a non-existing location should succeed") { withTable("t") { withTempDir { dir => spark.sql( @@ -1939,7 +1939,7 @@ abstract class DDLSuite extends QueryTest with SQLTestUtils { } } - test("insert into a data source table with no existed partition location should succeed") 
{ + test("insert into a data source table with a non-existing partition location should succeed") { withTable("t") { withTempDir { dir => spark.sql( @@ -1966,7 +1966,7 @@ abstract class DDLSuite extends QueryTest with SQLTestUtils { } } - test("read data from a data source table which has a not existed location should succeed") { + test("read data from a data source table which has a non-existing location should succeed") { withTable("t") { withTempDir { dir => spark.sql( @@ -1994,7 +1994,7 @@ abstract class D
spark git commit: [SPARK-19611][SQL] Introduce configurable table schema inference
Repository: spark Updated Branches: refs/heads/branch-2.1 5a2ad4312 -> e481a7381 [SPARK-19611][SQL] Introduce configurable table schema inference Add a new configuration option that allows Spark SQL to infer a case-sensitive schema from a Hive Metastore table's data files when a case-sensitive schema can't be read from the table properties. - Add spark.sql.hive.caseSensitiveInferenceMode param to SQLConf - Add schemaPreservesCase field to CatalogTable (set to false when schema can't successfully be read from Hive table props) - Perform schema inference in HiveMetastoreCatalog if schemaPreservesCase is false, depending on spark.sql.hive.caseSensitiveInferenceMode - Add alterTableSchema() method to the ExternalCatalog interface - Add HiveSchemaInferenceSuite tests - Refactor and move ParquetFileForamt.meregeMetastoreParquetSchema() as HiveMetastoreCatalog.mergeWithMetastoreSchema - Move schema merging tests from ParquetSchemaSuite to HiveSchemaInferenceSuite [JIRA for this change](https://issues.apache.org/jira/browse/SPARK-19611) The tests in ```HiveSchemaInferenceSuite``` should verify that schema inference is working as expected. ```ExternalCatalogSuite``` has also been extended to cover the new ```alterTableSchema()``` API. Author: Budde Closes #17229 from budde/SPARK-19611-2.1. 
Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/e481a738 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/e481a738 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/e481a738 Branch: refs/heads/branch-2.1 Commit: e481a73819213e4a7919e14e979b79a65098224f Parents: 5a2ad43 Author: Budde Authored: Fri Mar 10 16:38:16 2017 -0800 Committer: Wenchen Fan Committed: Fri Mar 10 16:38:16 2017 -0800 -- .../sql/catalyst/catalog/ExternalCatalog.scala | 15 +- .../sql/catalyst/catalog/InMemoryCatalog.scala | 10 + .../spark/sql/catalyst/catalog/interface.scala | 8 +- .../catalyst/catalog/ExternalCatalogSuite.scala | 15 +- .../sql/catalyst/trees/TreeNodeSuite.scala | 3 +- .../datasources/parquet/ParquetFileFormat.scala | 65 .../org/apache/spark/sql/internal/SQLConf.scala | 22 ++ .../parquet/ParquetSchemaSuite.scala| 82 - .../spark/sql/hive/HiveExternalCatalog.scala| 23 +- .../spark/sql/hive/HiveMetastoreCatalog.scala | 97 +- .../sql/hive/HiveSchemaInferenceSuite.scala | 333 +++ 11 files changed, 513 insertions(+), 160 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/e481a738/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/ExternalCatalog.scala -- diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/ExternalCatalog.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/ExternalCatalog.scala index 78897da..5e83163 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/ExternalCatalog.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/ExternalCatalog.scala @@ -19,7 +19,7 @@ package org.apache.spark.sql.catalyst.catalog import org.apache.spark.sql.catalyst.analysis.{FunctionAlreadyExistsException, NoSuchDatabaseException, NoSuchFunctionException, NoSuchTableException} import org.apache.spark.sql.catalyst.expressions.Expression - +import org.apache.spark.sql.types.StructType 
/** * Interface for the system catalog (of functions, partitions, tables, and databases). @@ -104,6 +104,19 @@ abstract class ExternalCatalog { */ def alterTable(tableDefinition: CatalogTable): Unit + /** + * Alter the schema of a table identified by the provided database and table name. The new schema + * should still contain the existing bucket columns and partition columns used by the table. This + * method will also update any Spark SQL-related parameters stored as Hive table properties (such + * as the schema itself). + * + * @param db Database that table to alter schema for exists in + * @param table Name of table to alter schema for + * @param schema Updated schema to be used for the table (must contain existing partition and + * bucket columns) + */ + def alterTableSchema(db: String, table: String, schema: StructType): Unit + def getTable(db: String, table: String): CatalogTable def getTableOption(db: String, table: String): Option[CatalogTable] http://git-wip-us.apache.org/repos/asf/spark/blob/e481a738/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/InMemoryCatalog.scala -- diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/InMemoryCatalog.scala b/sql/catalyst/src/m
spark git commit: [SPARK-19893][SQL] should not run DataFrame set operations with map type
Repository: spark Updated Branches: refs/heads/branch-2.0 c561e6cfa -> e8426cb5a [SPARK-19893][SQL] should not run DataFrame set oprations with map type In spark SQL, map type can't be used in equality test/comparison, and `Intersect`/`Except`/`Distinct` do need equality test for all columns, we should not allow map type in `Intersect`/`Except`/`Distinct`. new regression test Author: Wenchen Fan Closes #17236 from cloud-fan/map. (cherry picked from commit fb9beda54622e0c3190c6504fc468fa4e50eeb45) Signed-off-by: Wenchen Fan Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/e8426cb5 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/e8426cb5 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/e8426cb5 Branch: refs/heads/branch-2.0 Commit: e8426cb5a11b9779d97bd552817b1314d88c7e81 Parents: c561e6c Author: Wenchen Fan Authored: Fri Mar 10 16:14:22 2017 -0800 Committer: Wenchen Fan Committed: Fri Mar 10 16:33:26 2017 -0800 -- .../sql/catalyst/analysis/CheckAnalysis.scala | 21 ++-- .../org/apache/spark/sql/DataFrameSuite.scala | 15 ++ .../columnar/InMemoryColumnarQuerySuite.scala | 14 ++--- 3 files changed, 40 insertions(+), 10 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/e8426cb5/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/CheckAnalysis.scala -- diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/CheckAnalysis.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/CheckAnalysis.scala index 2fe34e5..3ae81d6 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/CheckAnalysis.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/CheckAnalysis.scala @@ -21,7 +21,6 @@ import org.apache.spark.sql.AnalysisException import org.apache.spark.sql.catalyst.catalog.SimpleCatalogRelation import org.apache.spark.sql.catalyst.expressions._ import 
org.apache.spark.sql.catalyst.expressions.aggregate.AggregateExpression -import org.apache.spark.sql.catalyst.plans.UsingJoin import org.apache.spark.sql.catalyst.plans.logical._ import org.apache.spark.sql.types._ @@ -46,6 +45,16 @@ trait CheckAnalysis extends PredicateHelper { }).length > 1 } + protected def hasMapType(dt: DataType): Boolean = { +dt.existsRecursively(_.isInstanceOf[MapType]) + } + + protected def mapColumnInSetOperation(plan: LogicalPlan): Option[Attribute] = plan match { +case _: Intersect | _: Except | _: Distinct => + plan.output.find(a => hasMapType(a.dataType)) +case _ => None + } + private def checkLimitClause(limitExpr: Expression): Unit = { limitExpr match { case e if !e.foldable => failAnalysis( @@ -171,7 +180,7 @@ trait CheckAnalysis extends PredicateHelper { s"filter expression '${f.condition.sql}' " + s"of type ${f.condition.dataType.simpleString} is not a boolean.") - case f @ Filter(condition, child) => + case Filter(condition, _) => splitConjunctivePredicates(condition).foreach { case _: PredicateSubquery | Not(_: PredicateSubquery) => case e if PredicateSubquery.hasNullAwarePredicateWithinNot(e) => @@ -364,6 +373,14 @@ trait CheckAnalysis extends PredicateHelper { failAnalysis( s"unresolved operator ${operator.simpleString}") + // TODO: although map type is not orderable, technically map type should be able to be + // used in equality comparison, remove this type check once we support it. 
+ case o if mapColumnInSetOperation(o).isDefined => +val mapCol = mapColumnInSetOperation(o).get +failAnalysis("Cannot have map type columns in DataFrame which calls " + + s"set operations(intersect, except, etc.), but the type of column ${mapCol.name} " + + "is " + mapCol.dataType.simpleString) + case o if o.expressions.exists(!_.deterministic) && !o.isInstanceOf[Project] && !o.isInstanceOf[Filter] && !o.isInstanceOf[Aggregate] && !o.isInstanceOf[Window] => http://git-wip-us.apache.org/repos/asf/spark/blob/e8426cb5/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala -- diff --git a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala index 623f5eb..6a9279f 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala @@ -1686,4 +1686,19
spark git commit: [SPARK-19893][SQL] should not run DataFrame set operations with map type
Repository: spark Updated Branches: refs/heads/branch-2.1 f0d50fd54 -> 5a2ad4312 [SPARK-19893][SQL] should not run DataFrame set oprations with map type In spark SQL, map type can't be used in equality test/comparison, and `Intersect`/`Except`/`Distinct` do need equality test for all columns, we should not allow map type in `Intersect`/`Except`/`Distinct`. new regression test Author: Wenchen Fan Closes #17236 from cloud-fan/map. (cherry picked from commit fb9beda54622e0c3190c6504fc468fa4e50eeb45) Signed-off-by: Wenchen Fan Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/5a2ad431 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/5a2ad431 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/5a2ad431 Branch: refs/heads/branch-2.1 Commit: 5a2ad4312dd00a450eac49ce53d70d9541e9e4cb Parents: f0d50fd Author: Wenchen Fan Authored: Fri Mar 10 16:14:22 2017 -0800 Committer: Wenchen Fan Committed: Fri Mar 10 16:30:42 2017 -0800 -- .../sql/catalyst/analysis/CheckAnalysis.scala | 24 .../org/apache/spark/sql/DataFrameSuite.scala | 16 + .../columnar/InMemoryColumnarQuerySuite.scala | 14 +--- 3 files changed, 42 insertions(+), 12 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/5a2ad431/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/CheckAnalysis.scala -- diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/CheckAnalysis.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/CheckAnalysis.scala index 65a2a7b..f7109f4 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/CheckAnalysis.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/CheckAnalysis.scala @@ -21,7 +21,6 @@ import org.apache.spark.sql.AnalysisException import org.apache.spark.sql.catalyst.catalog.SimpleCatalogRelation import org.apache.spark.sql.catalyst.expressions._ import 
org.apache.spark.sql.catalyst.expressions.aggregate.AggregateExpression -import org.apache.spark.sql.catalyst.plans.UsingJoin import org.apache.spark.sql.catalyst.plans.logical._ import org.apache.spark.sql.types._ @@ -46,6 +45,16 @@ trait CheckAnalysis extends PredicateHelper { }).length > 1 } + protected def hasMapType(dt: DataType): Boolean = { +dt.existsRecursively(_.isInstanceOf[MapType]) + } + + protected def mapColumnInSetOperation(plan: LogicalPlan): Option[Attribute] = plan match { +case _: Intersect | _: Except | _: Distinct => + plan.output.find(a => hasMapType(a.dataType)) +case _ => None + } + private def checkLimitClause(limitExpr: Expression): Unit = { limitExpr match { case e if !e.foldable => failAnalysis( @@ -123,8 +132,7 @@ trait CheckAnalysis extends PredicateHelper { if (conditions.isEmpty && query.output.size != 1) { failAnalysis( s"Scalar subquery must return only one column, but got ${query.output.size}") -} -else if (conditions.nonEmpty) { +} else if (conditions.nonEmpty) { // Collect the columns from the subquery for further checking. var subqueryColumns = conditions.flatMap(_.references).filter(query.output.contains) @@ -202,7 +210,7 @@ trait CheckAnalysis extends PredicateHelper { s"filter expression '${f.condition.sql}' " + s"of type ${f.condition.dataType.simpleString} is not a boolean.") - case f @ Filter(condition, child) => + case Filter(condition, _) => splitConjunctivePredicates(condition).foreach { case _: PredicateSubquery | Not(_: PredicateSubquery) => case e if PredicateSubquery.hasNullAwarePredicateWithinNot(e) => @@ -376,6 +384,14 @@ trait CheckAnalysis extends PredicateHelper { |Conflicting attributes: ${conflictingAttributes.mkString(",")} """.stripMargin) + // TODO: although map type is not orderable, technically map type should be able to be + // used in equality comparison, remove this type check once we support it. 
+ case o if mapColumnInSetOperation(o).isDefined => +val mapCol = mapColumnInSetOperation(o).get +failAnalysis("Cannot have map type columns in DataFrame which calls " + + s"set operations(intersect, except, etc.), but the type of column ${mapCol.name} " + + "is " + mapCol.dataType.simpleString) + case s: SimpleCatalogRelation => failAnalysis( s""" http://git-wip-us.apache.org/repos/asf/spark/blob/5a2ad431/sql/core/src/test/scal
spark git commit: [SPARK-19893][SQL] should not run DataFrame set operations with map type
Repository: spark Updated Branches: refs/heads/master ffee4f1ce -> fb9beda54 [SPARK-19893][SQL] should not run DataFrame set oprations with map type ## What changes were proposed in this pull request? In spark SQL, map type can't be used in equality test/comparison, and `Intersect`/`Except`/`Distinct` do need equality test for all columns, we should not allow map type in `Intersect`/`Except`/`Distinct`. ## How was this patch tested? new regression test Author: Wenchen Fan Closes #17236 from cloud-fan/map. Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/fb9beda5 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/fb9beda5 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/fb9beda5 Branch: refs/heads/master Commit: fb9beda54622e0c3190c6504fc468fa4e50eeb45 Parents: ffee4f1 Author: Wenchen Fan Authored: Fri Mar 10 16:14:22 2017 -0800 Committer: Wenchen Fan Committed: Fri Mar 10 16:14:22 2017 -0800 -- .../sql/catalyst/analysis/CheckAnalysis.scala | 25 +--- .../org/apache/spark/sql/DataFrameSuite.scala | 19 +++ .../columnar/InMemoryColumnarQuerySuite.scala | 14 +-- 3 files changed, 47 insertions(+), 11 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/fb9beda5/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/CheckAnalysis.scala -- diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/CheckAnalysis.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/CheckAnalysis.scala index 7529f90..d32fbeb 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/CheckAnalysis.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/CheckAnalysis.scala @@ -44,6 +44,18 @@ trait CheckAnalysis extends PredicateHelper { }).length > 1 } + protected def hasMapType(dt: DataType): Boolean = { +dt.existsRecursively(_.isInstanceOf[MapType]) + } + + protected def mapColumnInSetOperation(plan: 
LogicalPlan): Option[Attribute] = plan match { +case _: Intersect | _: Except | _: Distinct => + plan.output.find(a => hasMapType(a.dataType)) +case d: Deduplicate => + d.keys.find(a => hasMapType(a.dataType)) +case _ => None + } + private def checkLimitClause(limitExpr: Expression): Unit = { limitExpr match { case e if !e.foldable => failAnalysis( @@ -121,8 +133,7 @@ trait CheckAnalysis extends PredicateHelper { if (conditions.isEmpty && query.output.size != 1) { failAnalysis( s"Scalar subquery must return only one column, but got ${query.output.size}") -} -else if (conditions.nonEmpty) { +} else if (conditions.nonEmpty) { // Collect the columns from the subquery for further checking. var subqueryColumns = conditions.flatMap(_.references).filter(query.output.contains) @@ -200,7 +211,7 @@ trait CheckAnalysis extends PredicateHelper { s"filter expression '${f.condition.sql}' " + s"of type ${f.condition.dataType.simpleString} is not a boolean.") - case f @ Filter(condition, child) => + case Filter(condition, _) => splitConjunctivePredicates(condition).foreach { case _: PredicateSubquery | Not(_: PredicateSubquery) => case e if PredicateSubquery.hasNullAwarePredicateWithinNot(e) => @@ -374,6 +385,14 @@ trait CheckAnalysis extends PredicateHelper { |Conflicting attributes: ${conflictingAttributes.mkString(",")} """.stripMargin) + // TODO: although map type is not orderable, technically map type should be able to be + // used in equality comparison, remove this type check once we support it. 
+ case o if mapColumnInSetOperation(o).isDefined => +val mapCol = mapColumnInSetOperation(o).get +failAnalysis("Cannot have map type columns in DataFrame which calls " + + s"set operations(intersect, except, etc.), but the type of column ${mapCol.name} " + + "is " + mapCol.dataType.simpleString) + case o if o.expressions.exists(!_.deterministic) && !o.isInstanceOf[Project] && !o.isInstanceOf[Filter] && !o.isInstanceOf[Aggregate] && !o.isInstanceOf[Window] => http://git-wip-us.apache.org/repos/asf/spark/blob/fb9beda5/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala -- diff --git a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/Data
spark git commit: [SPARK-19905][SQL] Bring back Dataset.inputFiles for Hive SerDe tables
Repository: spark Updated Branches: refs/heads/master bc3035140 -> ffee4f1ce [SPARK-19905][SQL] Bring back Dataset.inputFiles for Hive SerDe tables ## What changes were proposed in this pull request? `Dataset.inputFiles` works by matching `FileRelation`s in the query plan. In Spark 2.1, Hive SerDe tables are represented by `MetastoreRelation`, which inherits from `FileRelation`. However, in Spark 2.2, Hive SerDe tables are now represented by `CatalogRelation`, which doesn't inherit from `FileRelation` anymore, due to the unification of Hive SerDe tables and data source tables. This change breaks `Dataset.inputFiles` for Hive SerDe tables. This PR tries to fix this issue by explicitly matching `CatalogRelation`s that are Hive SerDe tables in `Dataset.inputFiles`. Note that we can't make `CatalogRelation` inherit from `FileRelation` since not all `CatalogRelation`s are file based (e.g., JDBC data source tables). ## How was this patch tested? New test case added in `HiveDDLSuite`. Author: Cheng Lian Closes #17247 from liancheng/spark-19905-hive-table-input-files. 
Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/ffee4f1c Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/ffee4f1c Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/ffee4f1c Branch: refs/heads/master Commit: ffee4f1cefb0dfd8d9145ee3be82c6f7b799870b Parents: bc30351 Author: Cheng Lian Authored: Fri Mar 10 15:19:32 2017 -0800 Committer: Wenchen Fan Committed: Fri Mar 10 15:19:32 2017 -0800 -- .../src/main/scala/org/apache/spark/sql/Dataset.scala| 3 +++ .../apache/spark/sql/hive/execution/HiveDDLSuite.scala | 11 +++ 2 files changed, 14 insertions(+) -- http://git-wip-us.apache.org/repos/asf/spark/blob/ffee4f1c/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala -- diff --git a/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala b/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala index 0a4d3a9..520663f 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala @@ -36,6 +36,7 @@ import org.apache.spark.broadcast.Broadcast import org.apache.spark.rdd.RDD import org.apache.spark.sql.catalyst._ import org.apache.spark.sql.catalyst.analysis._ +import org.apache.spark.sql.catalyst.catalog.CatalogRelation import org.apache.spark.sql.catalyst.encoders._ import org.apache.spark.sql.catalyst.expressions._ import org.apache.spark.sql.catalyst.expressions.aggregate._ @@ -2734,6 +2735,8 @@ class Dataset[T] private[sql]( fsBasedRelation.inputFiles case fr: FileRelation => fr.inputFiles + case r: CatalogRelation if DDLUtils.isHiveTable(r.tableMeta) => +r.tableMeta.storage.locationUri.map(_.toString).toArray }.flatten files.toSet.toArray } http://git-wip-us.apache.org/repos/asf/spark/blob/ffee4f1c/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveDDLSuite.scala -- diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveDDLSuite.scala 
b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveDDLSuite.scala index 23aea24..79ad156 100644 --- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveDDLSuite.scala +++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveDDLSuite.scala @@ -1865,4 +1865,15 @@ class HiveDDLSuite } } } + + test("SPARK-19905: Hive SerDe table input paths") { +withTable("spark_19905") { + withTempView("spark_19905_view") { +spark.range(10).createOrReplaceTempView("spark_19905_view") +sql("CREATE TABLE spark_19905 STORED AS RCFILE AS SELECT * FROM spark_19905_view") +assert(spark.table("spark_19905").inputFiles.nonEmpty) +assert(sql("SELECT input_file_name() FROM spark_19905").count() > 0) + } +} + } } - To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org
spark git commit: [SPARK-19611][SQL] Preserve metastore field order when merging inferred schema
Repository: spark Updated Branches: refs/heads/master 8f0490e22 -> bc3035140 [SPARK-19611][SQL] Preserve metastore field order when merging inferred schema ## What changes were proposed in this pull request? The ```HiveMetastoreCatalog.mergeWithMetastoreSchema()``` method added in #16944 may not preserve the same field order as the metastore schema in some cases, which can cause queries to fail. This change ensures that the metastore field order is preserved. ## How was this patch tested? A test for ensuring that metastore order is preserved was added to ```HiveSchemaInferenceSuite.``` The particular failure usecase from #16944 was tested manually as well. Author: Budde Closes #17249 from budde/PreserveMetastoreFieldOrder. Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/bc303514 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/bc303514 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/bc303514 Branch: refs/heads/master Commit: bc30351404d8bc610cbae65fdc12ca613e7735c6 Parents: 8f0490e Author: Budde Authored: Fri Mar 10 15:18:37 2017 -0800 Committer: Wenchen Fan Committed: Fri Mar 10 15:18:37 2017 -0800 -- .../spark/sql/hive/HiveMetastoreCatalog.scala | 5 + .../sql/hive/HiveSchemaInferenceSuite.scala | 21 2 files changed, 22 insertions(+), 4 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/bc303514/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala -- diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala index 056af49..9f0d1ce 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala @@ -356,13 +356,10 @@ private[hive] object HiveMetastoreCatalog { .filterKeys(!inferredSchema.map(_.name.toLowerCase).contains(_)) 
.values .filter(_.nullable) - // Merge missing nullable fields to inferred schema and build a case-insensitive field map. val inferredFields = StructType(inferredSchema ++ missingNullables) .map(f => f.name.toLowerCase -> f).toMap -StructType(metastoreFields.map { case(name, field) => - field.copy(name = inferredFields(name).name) -}.toSeq) +StructType(metastoreSchema.map(f => f.copy(name = inferredFields(f.name).name))) } catch { case NonFatal(_) => val msg = s"""Detected conflicting schemas when merging the schema obtained from the Hive http://git-wip-us.apache.org/repos/asf/spark/blob/bc303514/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveSchemaInferenceSuite.scala -- diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveSchemaInferenceSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveSchemaInferenceSuite.scala index 7895580..e48ce23 100644 --- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveSchemaInferenceSuite.scala +++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveSchemaInferenceSuite.scala @@ -293,6 +293,27 @@ class HiveSchemaInferenceSuite StructField("firstField", StringType, nullable = true), StructField("secondField", StringType, nullable = true }.getMessage.contains("Detected conflicting schemas")) + +// Schema merge should maintain metastore order. 
+assertResult( + StructType(Seq( +StructField("first_field", StringType, nullable = true), +StructField("second_field", StringType, nullable = true), +StructField("third_field", StringType, nullable = true), +StructField("fourth_field", StringType, nullable = true), +StructField("fifth_field", StringType, nullable = true { + HiveMetastoreCatalog.mergeWithMetastoreSchema( +StructType(Seq( + StructField("first_field", StringType, nullable = true), + StructField("second_field", StringType, nullable = true), + StructField("third_field", StringType, nullable = true), + StructField("fourth_field", StringType, nullable = true), + StructField("fifth_field", StringType, nullable = true))), +StructType(Seq( + StructField("fifth_field", StringType, nullable = true), + StructField("third_field", StringType, nullable = true), + StructField("second_field", StringType, nullable = true +} } } - To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: c
spark git commit: [SPARK-17979][SPARK-14453] Remove deprecated SPARK_YARN_USER_ENV and SPARK_JAVA_OPTS
Repository: spark Updated Branches: refs/heads/master dd9049e04 -> 8f0490e22 [SPARK-17979][SPARK-14453] Remove deprecated SPARK_YARN_USER_ENV and SPARK_JAVA_OPTS This fix removes deprecated support for config `SPARK_YARN_USER_ENV`, as is mentioned in SPARK-17979. This fix also removes deprecated support for the following: ``` SPARK_YARN_USER_ENV SPARK_JAVA_OPTS SPARK_CLASSPATH SPARK_WORKER_INSTANCES ``` Related JIRA: [SPARK-14453]: https://issues.apache.org/jira/browse/SPARK-14453 [SPARK-12344]: https://issues.apache.org/jira/browse/SPARK-12344 [SPARK-15781]: https://issues.apache.org/jira/browse/SPARK-15781 Existing tests should pass. Author: Yong Tang Closes #17212 from yongtang/SPARK-17979. Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/8f0490e2 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/8f0490e2 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/8f0490e2 Branch: refs/heads/master Commit: 8f0490e22b4c7f1fdf381c70c5894d46b7f7e6fb Parents: dd9049e Author: Yong Tang Authored: Fri Mar 10 13:33:58 2017 -0800 Committer: Marcelo Vanzin Committed: Fri Mar 10 13:34:01 2017 -0800 -- conf/spark-env.sh.template | 3 - .../main/scala/org/apache/spark/SparkConf.scala | 65 .../spark/deploy/FaultToleranceTest.scala | 3 +- .../spark/launcher/WorkerCommandBuilder.scala | 1 - docs/rdd-programming-guide.md | 2 +- .../spark/launcher/AbstractCommandBuilder.java | 1 - .../launcher/SparkClassCommandBuilder.java | 2 - .../launcher/SparkSubmitCommandBuilder.java | 1 - .../MesosCoarseGrainedSchedulerBackend.scala| 5 -- .../MesosFineGrainedSchedulerBackend.scala | 4 -- .../org/apache/spark/deploy/yarn/Client.scala | 39 +--- .../spark/deploy/yarn/ExecutorRunnable.scala| 8 --- 12 files changed, 3 insertions(+), 131 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/8f0490e2/conf/spark-env.sh.template -- diff --git a/conf/spark-env.sh.template b/conf/spark-env.sh.template index 
5c1e876..94bd2c4 100755 --- a/conf/spark-env.sh.template +++ b/conf/spark-env.sh.template @@ -25,12 +25,10 @@ # - HADOOP_CONF_DIR, to point Spark towards Hadoop configuration files # - SPARK_LOCAL_IP, to set the IP address Spark binds to on this node # - SPARK_PUBLIC_DNS, to set the public dns name of the driver program -# - SPARK_CLASSPATH, default classpath entries to append # Options read by executors and drivers running inside the cluster # - SPARK_LOCAL_IP, to set the IP address Spark binds to on this node # - SPARK_PUBLIC_DNS, to set the public DNS name of the driver program -# - SPARK_CLASSPATH, default classpath entries to append # - SPARK_LOCAL_DIRS, storage directories to use on this node for shuffle and RDD data # - MESOS_NATIVE_JAVA_LIBRARY, to point to your libmesos.so if you use Mesos @@ -48,7 +46,6 @@ # - SPARK_WORKER_CORES, to set the number of cores to use on this machine # - SPARK_WORKER_MEMORY, to set how much total memory workers have to give executors (e.g. 1000m, 2g) # - SPARK_WORKER_PORT / SPARK_WORKER_WEBUI_PORT, to use non-default ports for the worker -# - SPARK_WORKER_INSTANCES, to set the number of worker processes per node # - SPARK_WORKER_DIR, to set the working directory of worker processes # - SPARK_WORKER_OPTS, to set config properties only for the worker (e.g. "-Dx=y") # - SPARK_DAEMON_MEMORY, to allocate to the master, worker and history server themselves (default: 1g). 
http://git-wip-us.apache.org/repos/asf/spark/blob/8f0490e2/core/src/main/scala/org/apache/spark/SparkConf.scala -- diff --git a/core/src/main/scala/org/apache/spark/SparkConf.scala b/core/src/main/scala/org/apache/spark/SparkConf.scala index fe912e6..2a2ce05 100644 --- a/core/src/main/scala/org/apache/spark/SparkConf.scala +++ b/core/src/main/scala/org/apache/spark/SparkConf.scala @@ -518,71 +518,6 @@ class SparkConf(loadDefaults: Boolean) extends Cloneable with Logging with Seria } } -// Check for legacy configs -sys.env.get("SPARK_JAVA_OPTS").foreach { value => - val warning = -s""" - |SPARK_JAVA_OPTS was detected (set to '$value'). - |This is deprecated in Spark 1.0+. - | - |Please instead use: - | - ./spark-submit with conf/spark-defaults.conf to set defaults for an application - | - ./spark-submit with --driver-java-options to set -X options for a driver - | - spark.executor.extraJavaOptions to set -X options for executors - | - SPARK_DAEMON_JAVA_OPTS to set java options for standalone daemons (master or worker)
spark git commit: [SPARK-19620][SQL] Fix incorrect exchange coordinator id in the physical plan
Repository: spark Updated Branches: refs/heads/master fcb68e0f5 -> dd9049e04 [SPARK-19620][SQL] Fix incorrect exchange coordinator id in the physical plan ## What changes were proposed in this pull request? When adaptive execution is enabled, an exchange coordinator is used in the Exchange operators. For Join, the same exchange coordinator is used for its two Exchanges. But the physical plan shows two different coordinator Ids which is confusing. This PR is to fix the incorrect exchange coordinator id in the physical plan. The coordinator object instead of the `Option[ExchangeCoordinator]` should be used to generate the identity hash code of the same coordinator. ## How was this patch tested? Before the patch, the physical plan shows two different exchange coordinator id for Join. ``` == Physical Plan == *Project [key1#3L, value2#12L] +- *SortMergeJoin [key1#3L], [key2#11L], Inner :- *Sort [key1#3L ASC NULLS FIRST], false, 0 : +- Exchange(coordinator id: 1804587700) hashpartitioning(key1#3L, 10), coordinator[target post-shuffle partition size: 67108864] : +- *Project [(id#0L % 500) AS key1#3L] :+- *Filter isnotnull((id#0L % 500)) : +- *Range (0, 1000, step=1, splits=Some(10)) +- *Sort [key2#11L ASC NULLS FIRST], false, 0 +- Exchange(coordinator id: 793927319) hashpartitioning(key2#11L, 10), coordinator[target post-shuffle partition size: 67108864] +- *Project [(id#8L % 500) AS key2#11L, id#8L AS value2#12L] +- *Filter isnotnull((id#8L % 500)) +- *Range (0, 1000, step=1, splits=Some(10)) ``` After the patch, two exchange coordinator id are the same. Author: Carson Wang Closes #16952 from carsonwang/FixCoordinatorId. 
Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/dd9049e0 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/dd9049e0 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/dd9049e0 Branch: refs/heads/master Commit: dd9049e0492cc70b629518fee9b3d1632374c612 Parents: fcb68e0 Author: Carson Wang Authored: Fri Mar 10 11:13:26 2017 -0800 Committer: Yin Huai Committed: Fri Mar 10 11:13:26 2017 -0800 -- .../org/apache/spark/sql/execution/exchange/ShuffleExchange.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/dd9049e0/sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/ShuffleExchange.scala -- diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/ShuffleExchange.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/ShuffleExchange.scala index 125a493..f06544e 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/ShuffleExchange.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/ShuffleExchange.scala @@ -46,7 +46,7 @@ case class ShuffleExchange( override def nodeName: String = { val extraInfo = coordinator match { case Some(exchangeCoordinator) => -s"(coordinator id: ${System.identityHashCode(coordinator)})" +s"(coordinator id: ${System.identityHashCode(exchangeCoordinator)})" case None => "" } - To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org For additional commands, e-mail: commits-help@spark.apache.org
spark git commit: [SPARK-19786][SQL] Facilitate loop optimizations in a JIT compiler regarding range()
Repository: spark Updated Branches: refs/heads/master 501b71119 -> fcb68e0f5 [SPARK-19786][SQL] Facilitate loop optimizations in a JIT compiler regarding range() ## What changes were proposed in this pull request? This PR improves performance of operations with `range()` by changing Java code generated by Catalyst. This PR is inspired by the [blog article](https://databricks.com/blog/2017/02/16/processing-trillion-rows-per-second-single-machine-can-nested-loop-joins-fast.html). This PR changes generated code in the following two points. 1. Replace a while-loop with long instance variables with a for-loop with int local variables 2. Suppress generation of `shouldStop()` method if this method is unnecessary (e.g. `append()` is not generated). These points facilitate compiler optimizations in a JIT compiler by feeding the simplified Java code into the JIT compiler. The performance is improved by 7.6x. Benchmark program: ```java val N = 1 << 29 val iters = 2 val benchmark = new Benchmark("range.count", N * iters) benchmark.addCase(s"with this PR") { i => var n = 0 var len = 0 while (n < iters) { len += sparkSession.range(N).selectExpr("count(id)").collect.length n += 1 } } benchmark.run ``` Performance result without this PR ``` OpenJDK 64-Bit Server VM 1.8.0_111-8u111-b14-2ubuntu0.16.04.2-b14 on Linux 4.4.0-47-generic Intel(R) Xeon(R) CPU E5-2667 v3 3.20GHz range.count: Best/Avg Time(ms)Rate(M/s) Per Row(ns) Relative w/o this PR 1349 / 1356796.2 1.3 1.0X ``` Performance result with this PR ``` OpenJDK 64-Bit Server VM 1.8.0_111-8u111-b14-2ubuntu0.16.04.2-b14 on Linux 4.4.0-47-generic Intel(R) Xeon(R) CPU E5-2667 v3 3.20GHz range.count: Best/Avg Time(ms)Rate(M/s) Per Row(ns) Relative with this PR 177 / 271 6065.3 0.2 1.0X ``` Here is a comparison between generated code w/o and with this PR. Only the method ```agg_doAggregateWithoutKey``` is changed. 
Generated code without this PR ```java /* 005 */ final class GeneratedIterator extends org.apache.spark.sql.execution.BufferedRowIterator { /* 006 */ private Object[] references; /* 007 */ private scala.collection.Iterator[] inputs; /* 008 */ private boolean agg_initAgg; /* 009 */ private boolean agg_bufIsNull; /* 010 */ private long agg_bufValue; /* 011 */ private org.apache.spark.sql.execution.metric.SQLMetric range_numOutputRows; /* 012 */ private org.apache.spark.sql.execution.metric.SQLMetric range_numGeneratedRows; /* 013 */ private boolean range_initRange; /* 014 */ private long range_number; /* 015 */ private TaskContext range_taskContext; /* 016 */ private InputMetrics range_inputMetrics; /* 017 */ private long range_batchEnd; /* 018 */ private long range_numElementsTodo; /* 019 */ private scala.collection.Iterator range_input; /* 020 */ private UnsafeRow range_result; /* 021 */ private org.apache.spark.sql.catalyst.expressions.codegen.BufferHolder range_holder; /* 022 */ private org.apache.spark.sql.catalyst.expressions.codegen.UnsafeRowWriter range_rowWriter; /* 023 */ private org.apache.spark.sql.execution.metric.SQLMetric agg_numOutputRows; /* 024 */ private org.apache.spark.sql.execution.metric.SQLMetric agg_aggTime; /* 025 */ private UnsafeRow agg_result; /* 026 */ private org.apache.spark.sql.catalyst.expressions.codegen.BufferHolder agg_holder; /* 027 */ private org.apache.spark.sql.catalyst.expressions.codegen.UnsafeRowWriter agg_rowWriter; /* 028 */ /* 029 */ public GeneratedIterator(Object[] references) { /* 030 */ this.references = references; /* 031 */ } /* 032 */ /* 033 */ public void init(int index, scala.collection.Iterator[] inputs) { /* 034 */ partitionIndex = index; /* 035 */ this.inputs = inputs; /* 036 */ agg_initAgg = false; /* 037 */ /* 038 */ this.range_numOutputRows = (org.apache.spark.sql.execution.metric.SQLMetric) references[0]; /* 039 */ this.range_numGeneratedRows = (org.apache.spark.sql.execution.metric.SQLMetric) 
references[1]; /* 040 */ range_initRange = false; /* 041 */ range_number = 0L; /* 042 */ range_taskContext = TaskContext.get(); /* 043 */ range_inputMetrics = range_taskContext.taskMetrics().inputMetrics(); /* 044 */ range_batchEnd = 0; /* 045 */ range_numElementsTodo = 0L; /* 046 */ range_input = inputs[0]; /* 047 */ range_result = new UnsafeRow(1); /* 048 */ this.range_holder = new org.apache.spark.sql.catalyst.expressions.codegen.BufferHolder(range_result, 0); /* 049 */ this.range_rowWriter = new org.apache.spark.sql.catalyst.expressions.codegen.UnsafeRowWriter(range_holder