[GitHub] spark pull request #16233: [SPARK-18801][SQL] Add `View` operator to help re...
Github user yhuai commented on a diff in the pull request: https://github.com/apache/spark/pull/16233#discussion_r93898801

--- Diff: sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/catalog/ExternalCatalogSuite.scala ---
@@ -860,6 +864,24 @@ abstract class CatalogTestUtils {
       bucketSpec = Some(BucketSpec(4, Seq("col1"), Nil)))
   }

+  def newView(name: String, database: Option[String] = None): CatalogTable = {
+    CatalogTable(
+      identifier = TableIdentifier(name, database),
+      tableType = CatalogTableType.VIEW,
+      storage = CatalogStorageFormat.empty,
+      schema = new StructType()
+        .add("col1", "int")
+        .add("col2", "string")
+        .add("a", "int")
+        .add("b", "string"),
+      provider = Some("hive"),
--- End diff --

Do we need a case that the provider is not set?
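A possible shape for such a case, sketched against the helper quoted above (the helper name `newViewWithoutProvider` and the reuse of `newView` are assumptions for illustration, not part of the PR):

```scala
// Hypothetical test helper: a view description whose provider is left unset,
// so the suite can also cover views stored without a provider.
def newViewWithoutProvider(name: String, database: Option[String] = None): CatalogTable = {
  newView(name, database).copy(provider = None)
}
```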
[GitHub] spark pull request #16233: [SPARK-18801][SQL] Add `View` operator to help re...
Github user yhuai commented on a diff in the pull request: https://github.com/apache/spark/pull/16233#discussion_r93898653

--- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala ---
@@ -551,17 +551,26 @@ class SessionCatalog(
    *
    * If a database is specified in `name`, this will return the table/view from that database.
    * If no database is specified, this will first attempt to return a temporary table/view with
-   * the same name, then, if that does not exist, return the table/view from the current database.
+   * the same name, then, if that does not exist, and defaultDatabase is defined, return the
+   * table/view from the defaultDatabase, else return the table/view from the catalog.currentDb.
    *
    * Note that, the global temp view database is also valid here, this will return the global temp
    * view matching the given name.
    *
-   * If the relation is a view, the relation will be wrapped in a [[SubqueryAlias]] which will
-   * track the name of the view.
+   * If the relation is a view, we generate a [[View]] operator from the view description, and
+   * wrap the logical plan in a [[SubqueryAlias]] which will track the name of the view.
+   *
+   * @param name The name of the table/view that we lookup.
+   * @param alias The alias name of the table/view that we lookup.
+   * @param defaultDatabase The database name we should use to lookup the table/view, if the
+   *                        database part of [[TableIdentifier]] is not defined.
--- End diff --

Let's also explain the precedence (db name in the table identifier -> default db -> current db).
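For reference, the precedence being asked for can be shown with a small standalone sketch (plain Scala; the function and its arguments stand in for the identifiers used in the quoted diffs and are not the actual SessionCatalog code):

```scala
// Resolution order: database in the table identifier, then the view's default
// database, then the session catalog's current database.
def lookupDb(identifierDb: Option[String],
             defaultDatabase: Option[String],
             currentDb: String): String =
  identifierDb                 // 1. database written in the table identifier
    .orElse(defaultDatabase)   // 2. the view's default database, if defined
    .getOrElse(currentDb)      // 3. the session catalog's current database

lookupDb(None, Some("db1"), "default")        // "db1"
lookupDb(Some("db2"), Some("db1"), "default") // "db2"
lookupDb(None, None, "default")               // "default"
```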
[GitHub] spark pull request #16233: [SPARK-18801][SQL] Add `View` operator to help re...
Github user yhuai commented on a diff in the pull request: https://github.com/apache/spark/pull/16233#discussion_r93898562

--- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala ---
@@ -658,6 +720,17 @@ class Analyzer(
           Generate(newG.asInstanceOf[Generator], join, outer, qualifier, output, child)
       }

+      // A special case for View, replace the output attributes with the attributes that have the
+      // same names from the child. If the corresponding attribute is not found, throw an
+      // AnalysisException.
+      // TODO: Also check the dataTypes and nullabilites of the output.
--- End diff --

Can we also explain why we need to replace output attributes in the code comment?
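To make the matching step in that comment concrete, here is a small standalone illustration in plain Scala; it is not the Analyzer rule itself, and all names are made up for the example:

```scala
// Pair each expected output column name with the same-named column of the child's
// output, failing if a name cannot be found -- the behaviour the comment describes.
def matchByName(expected: Seq[String], childOutput: Seq[String]): Seq[String] =
  expected.map { name =>
    childOutput.find(_ == name).getOrElse(
      sys.error(s"Attribute '$name' not found among: ${childOutput.mkString(", ")}"))
  }

matchByName(Seq("col1", "col2"), Seq("col2", "col1", "extra"))  // Seq("col1", "col2")
```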
[GitHub] spark pull request #16233: [SPARK-18801][SQL] Add `View` operator to help re...
Github user yhuai commented on a diff in the pull request: https://github.com/apache/spark/pull/16233#discussion_r93898349 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala --- @@ -510,32 +510,94 @@ class Analyzer( * Replaces [[UnresolvedRelation]]s with concrete relations from the catalog. */ object ResolveRelations extends Rule[LogicalPlan] { -private def lookupTableFromCatalog(u: UnresolvedRelation): LogicalPlan = { + +def apply(plan: LogicalPlan): LogicalPlan = plan resolveOperators { + case i @ InsertIntoTable(u: UnresolvedRelation, parts, child, _, _) if child.resolved => +i.copy(table = EliminateSubqueryAliases(lookupTableFromCatalog(u))) + case u: UnresolvedRelation => resolveRelation(u) +} + +// If the unresolved relation is running directly on files, we just return the original +// UnresolvedRelation, the plan will get resolved later. Else we look up the table from catalog +// and change the default database name if it is a view. +// +// Note this is compatible with the views defined by older versions of Spark(before 2.2), which +// have empty defaultDatabase and all the relations in viewText have database part defined. +def resolveRelation( +plan: LogicalPlan, +defaultDatabase: Option[String] = None): LogicalPlan = plan match { + case u @ UnresolvedRelation(table: TableIdentifier, _) if isRunningDirectlyOnFiles(table) => +u + case u: UnresolvedRelation => +resolveView(lookupTableFromCatalog(u, defaultDatabase)) +} + +// Look up the table with the given name from catalog. If `defaultDatabase` is set, we look up +// the table in the database `defaultDatabase`, else we follow the default way. +private def lookupTableFromCatalog( +u: UnresolvedRelation, +defaultDatabase: Option[String] = None): LogicalPlan = { try { -catalog.lookupRelation(u.tableIdentifier, u.alias) +catalog.lookupRelation(u.tableIdentifier, u.alias, defaultDatabase) } catch { case _: NoSuchTableException => u.failAnalysis(s"Table or view not found: ${u.tableName}") } } -def apply(plan: LogicalPlan): LogicalPlan = plan resolveOperators { - case i @ InsertIntoTable(u: UnresolvedRelation, parts, child, _, _) if child.resolved => -i.copy(table = EliminateSubqueryAliases(lookupTableFromCatalog(u))) - case u: UnresolvedRelation => -val table = u.tableIdentifier -if (table.database.isDefined && conf.runSQLonFile && !catalog.isTemporaryTable(table) && -(!catalog.databaseExists(table.database.get) || !catalog.tableExists(table))) { - // If the database part is specified, and we support running SQL directly on files, and - // it's not a temporary view, and the table does not exist, then let's just return the - // original UnresolvedRelation. It is possible we are matching a query like "select * - // from parquet.`/path/to/query`". The plan will get resolved later. - // Note that we are testing (!db_exists || !table_exists) because the catalog throws - // an exception from tableExists if the database does not exist. - u -} else { - lookupTableFromCatalog(u) +// If the database part is specified, and we support running SQL directly on files, and +// it's not a temporary view, and the table does not exist, then let's just return the +// original UnresolvedRelation. It is possible we are matching a query like "select * +// from parquet.`/path/to/query`". The plan will get resolved later. +// Note that we are testing (!db_exists || !table_exists) because the catalog throws +// an exception from tableExists if the database does not exist. 
+private def isRunningDirectlyOnFiles(table: TableIdentifier): Boolean = { + table.database.isDefined && conf.runSQLonFile && !catalog.isTemporaryTable(table) && +(!catalog.databaseExists(table.database.get) || !catalog.tableExists(table)) +} + +// Change the default database name if the plan is a view, and transformDown with the new +// database name to resolve all UnresolvedRelations and Views. +// If the view is defined in a DataSource other than Hive, and the view's child is empty, +// set the view's child to a SimpleCatalogRelation, else throw an AnalysisException. +def resolveView(plan: LogicalPlan): LogicalPlan = plan match
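For context, the `isRunningDirectlyOnFiles` predicate quoted above covers queries that address a file path as if it were a table, which is the pattern mentioned in the quoted comment. A usage example, assuming an active `SparkSession` named `spark` and a hypothetical path (the feature is gated by the `runSQLonFile` conf referenced in the diff):

```scala
// A "run SQL directly on files" query: the relation stays unresolved here so that a
// later rule can turn the backticked path into a file-based relation.
spark.sql("SELECT * FROM parquet.`/path/to/query`").show()
```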
[GitHub] spark pull request #16233: [SPARK-18801][SQL] Add `View` operator to help re...
Github user yhuai commented on a diff in the pull request: https://github.com/apache/spark/pull/16233#discussion_r93898300 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala --- @@ -510,32 +510,94 @@ class Analyzer( * Replaces [[UnresolvedRelation]]s with concrete relations from the catalog. */ object ResolveRelations extends Rule[LogicalPlan] { -private def lookupTableFromCatalog(u: UnresolvedRelation): LogicalPlan = { + +def apply(plan: LogicalPlan): LogicalPlan = plan resolveOperators { + case i @ InsertIntoTable(u: UnresolvedRelation, parts, child, _, _) if child.resolved => +i.copy(table = EliminateSubqueryAliases(lookupTableFromCatalog(u))) + case u: UnresolvedRelation => resolveRelation(u) +} + +// If the unresolved relation is running directly on files, we just return the original +// UnresolvedRelation, the plan will get resolved later. Else we look up the table from catalog +// and change the default database name if it is a view. +// +// Note this is compatible with the views defined by older versions of Spark(before 2.2), which +// have empty defaultDatabase and all the relations in viewText have database part defined. +def resolveRelation( +plan: LogicalPlan, +defaultDatabase: Option[String] = None): LogicalPlan = plan match { + case u @ UnresolvedRelation(table: TableIdentifier, _) if isRunningDirectlyOnFiles(table) => +u + case u: UnresolvedRelation => +resolveView(lookupTableFromCatalog(u, defaultDatabase)) +} + +// Look up the table with the given name from catalog. If `defaultDatabase` is set, we look up +// the table in the database `defaultDatabase`, else we follow the default way. +private def lookupTableFromCatalog( +u: UnresolvedRelation, +defaultDatabase: Option[String] = None): LogicalPlan = { try { -catalog.lookupRelation(u.tableIdentifier, u.alias) +catalog.lookupRelation(u.tableIdentifier, u.alias, defaultDatabase) } catch { case _: NoSuchTableException => u.failAnalysis(s"Table or view not found: ${u.tableName}") } } -def apply(plan: LogicalPlan): LogicalPlan = plan resolveOperators { - case i @ InsertIntoTable(u: UnresolvedRelation, parts, child, _, _) if child.resolved => -i.copy(table = EliminateSubqueryAliases(lookupTableFromCatalog(u))) - case u: UnresolvedRelation => -val table = u.tableIdentifier -if (table.database.isDefined && conf.runSQLonFile && !catalog.isTemporaryTable(table) && -(!catalog.databaseExists(table.database.get) || !catalog.tableExists(table))) { - // If the database part is specified, and we support running SQL directly on files, and - // it's not a temporary view, and the table does not exist, then let's just return the - // original UnresolvedRelation. It is possible we are matching a query like "select * - // from parquet.`/path/to/query`". The plan will get resolved later. - // Note that we are testing (!db_exists || !table_exists) because the catalog throws - // an exception from tableExists if the database does not exist. - u -} else { - lookupTableFromCatalog(u) +// If the database part is specified, and we support running SQL directly on files, and +// it's not a temporary view, and the table does not exist, then let's just return the +// original UnresolvedRelation. It is possible we are matching a query like "select * +// from parquet.`/path/to/query`". The plan will get resolved later. +// Note that we are testing (!db_exists || !table_exists) because the catalog throws +// an exception from tableExists if the database does not exist. 
+private def isRunningDirectlyOnFiles(table: TableIdentifier): Boolean = { + table.database.isDefined && conf.runSQLonFile && !catalog.isTemporaryTable(table) && +(!catalog.databaseExists(table.database.get) || !catalog.tableExists(table)) +} + +// Change the default database name if the plan is a view, and transformDown with the new +// database name to resolve all UnresolvedRelations and Views. +// If the view is defined in a DataSource other than Hive, and the view's child is empty, +// set the view's child to a SimpleCatalogRelation, else throw an AnalysisException. +def resolveView(plan: LogicalPlan): LogicalPlan = plan match
[GitHub] spark pull request #16233: [SPARK-18801][SQL] Add `View` operator to help re...
Github user yhuai commented on a diff in the pull request: https://github.com/apache/spark/pull/16233#discussion_r93898210 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala --- @@ -510,32 +510,94 @@ class Analyzer( * Replaces [[UnresolvedRelation]]s with concrete relations from the catalog. */ object ResolveRelations extends Rule[LogicalPlan] { -private def lookupTableFromCatalog(u: UnresolvedRelation): LogicalPlan = { + +def apply(plan: LogicalPlan): LogicalPlan = plan resolveOperators { + case i @ InsertIntoTable(u: UnresolvedRelation, parts, child, _, _) if child.resolved => +i.copy(table = EliminateSubqueryAliases(lookupTableFromCatalog(u))) + case u: UnresolvedRelation => resolveRelation(u) +} + +// If the unresolved relation is running directly on files, we just return the original +// UnresolvedRelation, the plan will get resolved later. Else we look up the table from catalog +// and change the default database name if it is a view. +// +// Note this is compatible with the views defined by older versions of Spark(before 2.2), which +// have empty defaultDatabase and all the relations in viewText have database part defined. +def resolveRelation( +plan: LogicalPlan, +defaultDatabase: Option[String] = None): LogicalPlan = plan match { + case u @ UnresolvedRelation(table: TableIdentifier, _) if isRunningDirectlyOnFiles(table) => +u + case u: UnresolvedRelation => +resolveView(lookupTableFromCatalog(u, defaultDatabase)) +} + +// Look up the table with the given name from catalog. If `defaultDatabase` is set, we look up +// the table in the database `defaultDatabase`, else we follow the default way. +private def lookupTableFromCatalog( +u: UnresolvedRelation, +defaultDatabase: Option[String] = None): LogicalPlan = { try { -catalog.lookupRelation(u.tableIdentifier, u.alias) +catalog.lookupRelation(u.tableIdentifier, u.alias, defaultDatabase) } catch { case _: NoSuchTableException => u.failAnalysis(s"Table or view not found: ${u.tableName}") } } -def apply(plan: LogicalPlan): LogicalPlan = plan resolveOperators { - case i @ InsertIntoTable(u: UnresolvedRelation, parts, child, _, _) if child.resolved => -i.copy(table = EliminateSubqueryAliases(lookupTableFromCatalog(u))) - case u: UnresolvedRelation => -val table = u.tableIdentifier -if (table.database.isDefined && conf.runSQLonFile && !catalog.isTemporaryTable(table) && -(!catalog.databaseExists(table.database.get) || !catalog.tableExists(table))) { - // If the database part is specified, and we support running SQL directly on files, and - // it's not a temporary view, and the table does not exist, then let's just return the - // original UnresolvedRelation. It is possible we are matching a query like "select * - // from parquet.`/path/to/query`". The plan will get resolved later. - // Note that we are testing (!db_exists || !table_exists) because the catalog throws - // an exception from tableExists if the database does not exist. - u -} else { - lookupTableFromCatalog(u) +// If the database part is specified, and we support running SQL directly on files, and +// it's not a temporary view, and the table does not exist, then let's just return the +// original UnresolvedRelation. It is possible we are matching a query like "select * +// from parquet.`/path/to/query`". The plan will get resolved later. +// Note that we are testing (!db_exists || !table_exists) because the catalog throws +// an exception from tableExists if the database does not exist. 
+private def isRunningDirectlyOnFiles(table: TableIdentifier): Boolean = { + table.database.isDefined && conf.runSQLonFile && !catalog.isTemporaryTable(table) && +(!catalog.databaseExists(table.database.get) || !catalog.tableExists(table)) +} + +// Change the default database name if the plan is a view, and transformDown with the new +// database name to resolve all UnresolvedRelations and Views. +// If the view is defined in a DataSource other than Hive, and the view's child is empty, +// set the view's child to a SimpleCatalogRelation, else throw an AnalysisException. --- End diff -- ``` // If the view is defined in a DataSource other than
[GitHub] spark pull request #16233: [SPARK-18801][SQL] Add `View` operator to help re...
Github user yhuai commented on a diff in the pull request: https://github.com/apache/spark/pull/16233#discussion_r93898130

--- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala ---
@@ -510,32 +510,94 @@ class Analyzer(
    * Replaces [[UnresolvedRelation]]s with concrete relations from the catalog.
    */
   object ResolveRelations extends Rule[LogicalPlan] {
-    private def lookupTableFromCatalog(u: UnresolvedRelation): LogicalPlan = {
+
+    def apply(plan: LogicalPlan): LogicalPlan = plan resolveOperators {
+      case i @ InsertIntoTable(u: UnresolvedRelation, parts, child, _, _) if child.resolved =>
+        i.copy(table = EliminateSubqueryAliases(lookupTableFromCatalog(u)))
+      case u: UnresolvedRelation => resolveRelation(u)
+    }
+
+    // If the unresolved relation is running directly on files, we just return the original
+    // UnresolvedRelation, the plan will get resolved later. Else we look up the table from catalog
+    // and change the default database name if it is a view.
+    //
+    // Note this is compatible with the views defined by older versions of Spark(before 2.2), which
+    // have empty defaultDatabase and all the relations in viewText have database part defined.
+    def resolveRelation(
+        plan: LogicalPlan,
+        defaultDatabase: Option[String] = None): LogicalPlan = plan match {
+      case u @ UnresolvedRelation(table: TableIdentifier, _) if isRunningDirectlyOnFiles(table) =>
+        u
+      case u: UnresolvedRelation =>
+        resolveView(lookupTableFromCatalog(u, defaultDatabase))
+    }
+
+    // Look up the table with the given name from catalog. If `defaultDatabase` is set, we look up
+    // the table in the database `defaultDatabase`, else we follow the default way.
--- End diff --

Let's be specific about what "else we follow the default way" means here.
[GitHub] spark pull request #16233: [SPARK-18801][SQL] Add `View` operator to help re...
Github user yhuai commented on a diff in the pull request: https://github.com/apache/spark/pull/16233#discussion_r93898018

--- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala ---
@@ -510,32 +510,94 @@ class Analyzer(
    * Replaces [[UnresolvedRelation]]s with concrete relations from the catalog.
    */
   object ResolveRelations extends Rule[LogicalPlan] {
-    private def lookupTableFromCatalog(u: UnresolvedRelation): LogicalPlan = {
+
+    def apply(plan: LogicalPlan): LogicalPlan = plan resolveOperators {
+      case i @ InsertIntoTable(u: UnresolvedRelation, parts, child, _, _) if child.resolved =>
+        i.copy(table = EliminateSubqueryAliases(lookupTableFromCatalog(u)))
+      case u: UnresolvedRelation => resolveRelation(u)
+    }
+
+    // If the unresolved relation is running directly on files, we just return the original
+    // UnresolvedRelation, the plan will get resolved later. Else we look up the table from catalog
+    // and change the default database name if it is a view.
+    //
+    // Note this is compatible with the views defined by older versions of Spark(before 2.2), which
+    // have empty defaultDatabase and all the relations in viewText have database part defined.
+    def resolveRelation(
+        plan: LogicalPlan,
+        defaultDatabase: Option[String] = None): LogicalPlan = plan match {
--- End diff --

Let's explain defaultDatabase more, like when we need it, when it is set, and how it is different from the current database in the session catalog.
[GitHub] spark pull request #16388: [SPARK-18989][SQL] DESC TABLE should not fail wit...
Github user yhuai commented on a diff in the pull request: https://github.com/apache/spark/pull/16388#discussion_r93897706

--- Diff: sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala ---
@@ -408,8 +408,15 @@ private[hive] class HiveClientImpl(
       lastAccessTime = h.getLastAccessTime.toLong * 1000,
       storage = CatalogStorageFormat(
         locationUri = shim.getDataLocation(h),
-        inputFormat = Option(h.getInputFormatClass).map(_.getName),
-        outputFormat = Option(h.getOutputFormatClass).map(_.getName),
+        // To avoid ClassNotFound exception, we try our best to not get the format class, but get
+        // the class name directly. However, for non-native tables, there is no interface to get
+        // the format class name, so we may still throw ClassNotFound in this case.
+        inputFormat = Option(h.getTTable.getSd.getInputFormat).orElse {
+          Option(h.getStorageHandler).map(_.getInputFormatClass.getName)
--- End diff --

Does this actually also fix a bug? Is it possible to have a test?
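The distinction the new comment relies on, shown as a tiny standalone illustration (the class name below is made up; this is not Spark or Hive code):

```scala
// Reading a class *name* stored in the metastore is plain string handling and can never
// throw ClassNotFoundException; resolving the Class object actually loads the class.
val inputFormatName = "com.example.hypothetical.MyInputFormat" // hypothetical name
println(inputFormatName)                                        // safe: no class loading
// Class.forName(inputFormatName)  // would throw ClassNotFoundException if the class is absent
```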
[GitHub] spark issue #16233: [SPARK-18801][SQL] Add `View` operator to help resolve a...
Github user yhuai commented on the issue: https://github.com/apache/spark/pull/16233

> We can make ResolveRelations View aware, and make it keep track of the default databases (plural - in case of nested views). The default database will be the one of the last seen parent view. This approach makes it trivial to limit the depth of nested views (which might be needed at some point), or we can make this only resolve one layer of nested views at a time and use the analyzer's maxIterations as an implicit limit.

Yea. This seems the easiest way to achieve what we need. I am good with this approach.
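A minimal, self-contained sketch of the idea being agreed on here; it models plans as plain case classes, and all names plus the depth limit are assumptions for illustration, not Spark code:

```scala
// Toy model: resolve nested views while tracking each parent view's default database
// and bounding the nesting depth, as discussed above.
sealed trait Plan
case class Unresolved(table: String, db: Option[String] = None) extends Plan
case class ViewPlan(name: String, defaultDb: String, child: Plan) extends Plan
case class Resolved(table: String, db: String) extends Plan

def resolve(plan: Plan, defaultDb: Option[String], depth: Int = 0): Plan = {
  require(depth <= 10, "view nesting too deep")  // assumed limit, for illustration only
  plan match {
    case Unresolved(t, db)       => Resolved(t, db.orElse(defaultDb).getOrElse("default"))
    case v @ ViewPlan(_, vdb, c) => v.copy(child = resolve(c, Some(vdb), depth + 1))
    case r: Resolved             => r
  }
}

// A view defined in db1 that references an unqualified table resolves against db1,
// not against the session's current database:
resolve(ViewPlan("v1", "db1", Unresolved("tbl")), defaultDb = None)
// => ViewPlan("v1", "db1", Resolved("tbl", "db1"))
```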
[GitHub] spark issue #15996: [SPARK-18567][SQL] Simplify CreateDataSourceTableAsSelec...
Github user yhuai commented on the issue: https://github.com/apache/spark/pull/15996

ah https://github.com/apache/spark/commit/9a1ad71db44558bb6eb380dc23a1a1abbc2f3e98 failed.
[GitHub] spark issue #15996: [SPARK-18567][SQL] Simplify CreateDataSourceTableAsSelec...
Github user yhuai commented on the issue: https://github.com/apache/spark/pull/15996

LGTM. Can you update the comment to address my last comment (https://github.com/apache/spark/pull/15996#discussion_r93730700)?
[GitHub] spark pull request #15996: [SPARK-18567][SQL] Simplify CreateDataSourceTable...
Github user yhuai commented on a diff in the pull request: https://github.com/apache/spark/pull/15996#discussion_r93730700

--- Diff: sql/core/src/test/scala/org/apache/spark/sql/test/DataFrameReaderWriterSuite.scala ---
@@ -643,6 +644,14 @@ class DataFrameReaderWriterSuite extends QueryTest with SharedSQLContext with Be
     withTable("t") {
       val provider = "org.apache.spark.sql.test.DefaultSource"
       sql(s"CREATE TABLE t USING $provider")
+
+      // make sure the data source doesn't provide `InsertableRelation`, so that we can only append
+      // data to it with `CreatableRelationProvider.createRelation`
--- End diff --

One last comment. Let's explicitly say that we want to test the case that a data source is a CreatableRelationProvider but its relation does not implement InsertableRelation.
[GitHub] spark pull request #15996: [SPARK-18567][SQL] Simplify CreateDataSourceTable...
Github user yhuai commented on a diff in the pull request: https://github.com/apache/spark/pull/15996#discussion_r93720714

--- Diff: sql/hive/src/test/scala/org/apache/spark/sql/hive/PartitionProviderCompatibilitySuite.scala ---
@@ -195,12 +195,25 @@ class PartitionProviderCompatibilitySuite
       withTempDir { dir =>
         setupPartitionedDatasourceTable("test", dir)
         if (enabled) {
-          spark.sql("msck repair table test")
+          assert(spark.table("test").count() == 0)
+        } else {
+          assert(spark.table("test").count() == 5)
         }
-        assert(spark.sql("select * from test").count() == 5)
-        spark.range(10).selectExpr("id as fieldOne", "id as partCol")
+
+        spark.range(3, 13).selectExpr("id as fieldOne", "id as partCol")
--- End diff --

It would be good to also explain in the comment the reason that we use (3, 13).
[GitHub] spark pull request #15996: [SPARK-18567][SQL] Simplify CreateDataSourceTable...
Github user yhuai commented on a diff in the pull request: https://github.com/apache/spark/pull/15996#discussion_r93720687

--- Diff: sql/hive/src/test/scala/org/apache/spark/sql/hive/PartitionProviderCompatibilitySuite.scala ---
@@ -195,12 +195,25 @@ class PartitionProviderCompatibilitySuite
       withTempDir { dir =>
         setupPartitionedDatasourceTable("test", dir)
         if (enabled) {
-          spark.sql("msck repair table test")
+          assert(spark.table("test").count() == 0)
+        } else {
+          assert(spark.table("test").count() == 5)
         }
-        assert(spark.sql("select * from test").count() == 5)
-        spark.range(10).selectExpr("id as fieldOne", "id as partCol")
+
+        spark.range(3, 13).selectExpr("id as fieldOne", "id as partCol")
           .write.partitionBy("partCol").mode("append").saveAsTable("test")
-        assert(spark.sql("select * from test").count() == 15)
+
+        if (enabled) {
+          // Only the newly written partitions are visible, which means the partitions
--- End diff --

Let's also explain why we only see newly written partitions.
[GitHub] spark pull request #15996: [SPARK-18567][SQL] Simplify CreateDataSourceTable...
Github user yhuai commented on a diff in the pull request: https://github.com/apache/spark/pull/15996#discussion_r93720630

--- Diff: sql/hive/src/test/scala/org/apache/spark/sql/hive/PartitionProviderCompatibilitySuite.scala ---
@@ -195,12 +195,25 @@ class PartitionProviderCompatibilitySuite
       withTempDir { dir =>
         setupPartitionedDatasourceTable("test", dir)
         if (enabled) {
-          spark.sql("msck repair table test")
--- End diff --

oh, we are checking the number of rows before the msck, right?
[GitHub] spark pull request #15996: [SPARK-18567][SQL] Simplify CreateDataSourceTable...
Github user yhuai commented on a diff in the pull request: https://github.com/apache/spark/pull/15996#discussion_r93720613

--- Diff: sql/hive/src/test/scala/org/apache/spark/sql/hive/PartitionProviderCompatibilitySuite.scala ---
@@ -195,12 +195,25 @@ class PartitionProviderCompatibilitySuite
       withTempDir { dir =>
         setupPartitionedDatasourceTable("test", dir)
         if (enabled) {
-          spark.sql("msck repair table test")
--- End diff --

Why don't we need this anymore?
[GitHub] spark pull request #15996: [SPARK-18567][SQL] Simplify CreateDataSourceTable...
Github user yhuai commented on a diff in the pull request: https://github.com/apache/spark/pull/15996#discussion_r93720521 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/command/createDataSourceTables.scala --- @@ -140,153 +140,55 @@ case class CreateDataSourceTableAsSelectCommand( val tableIdentWithDB = table.identifier.copy(database = Some(db)) val tableName = tableIdentWithDB.unquotedString -var createMetastoreTable = false -// We may need to reorder the columns of the query to match the existing table. -var reorderedColumns = Option.empty[Seq[NamedExpression]] if (sessionState.catalog.tableExists(tableIdentWithDB)) { - // Check if we need to throw an exception or just return. - mode match { -case SaveMode.ErrorIfExists => - throw new AnalysisException(s"Table $tableName already exists. " + -s"If you are using saveAsTable, you can set SaveMode to SaveMode.Append to " + -s"insert data into the table or set SaveMode to SaveMode.Overwrite to overwrite" + -s"the existing data. " + -s"Or, if you are using SQL CREATE TABLE, you need to drop $tableName first.") -case SaveMode.Ignore => - // Since the table already exists and the save mode is Ignore, we will just return. - return Seq.empty[Row] -case SaveMode.Append => - val existingTable = sessionState.catalog.getTableMetadata(tableIdentWithDB) - - if (existingTable.provider.get == DDLUtils.HIVE_PROVIDER) { -throw new AnalysisException(s"Saving data in the Hive serde table $tableName is " + - "not supported yet. Please use the insertInto() API as an alternative.") - } - - // Check if the specified data source match the data source of the existing table. - val existingProvider = DataSource.lookupDataSource(existingTable.provider.get) - val specifiedProvider = DataSource.lookupDataSource(table.provider.get) - // TODO: Check that options from the resolved relation match the relation that we are - // inserting into (i.e. using the same compression). - if (existingProvider != specifiedProvider) { -throw new AnalysisException(s"The format of the existing table $tableName is " + - s"`${existingProvider.getSimpleName}`. It doesn't match the specified format " + - s"`${specifiedProvider.getSimpleName}`.") - } - - if (query.schema.length != existingTable.schema.length) { -throw new AnalysisException( - s"The column number of the existing table $tableName" + -s"(${existingTable.schema.catalogString}) doesn't match the data schema" + -s"(${query.schema.catalogString})") - } - - val resolver = sessionState.conf.resolver - val tableCols = existingTable.schema.map(_.name) - - reorderedColumns = Some(existingTable.schema.map { f => -query.resolve(Seq(f.name), resolver).getOrElse { - val inputColumns = query.schema.map(_.name).mkString(", ") - throw new AnalysisException( -s"cannot resolve '${f.name}' given input columns: [$inputColumns]") -} - }) - - // In `AnalyzeCreateTable`, we verified the consistency between the user-specified table - // definition(partition columns, bucketing) and the SELECT query, here we also need to - // verify the the consistency between the user-specified table definition and the existing - // table definition. - - // Check if the specified partition columns match the existing table. - val specifiedPartCols = CatalogUtils.normalizePartCols( -tableName, tableCols, table.partitionColumnNames, resolver) - if (specifiedPartCols != existingTable.partitionColumnNames) { -throw new AnalysisException( - s""" -|Specified partitioning does not match that of the existing table $tableName. 
-|Specified partition columns: [${specifiedPartCols.mkString(", ")}] -|Existing partition columns: [${existingTable.partitionColumnNames.mkString(", ")}] - """.stripMargin) - } - - // Check if the specified bucketing match the existing table. - val specifiedBucketSpec = tab
[GitHub] spark pull request #15996: [SPARK-18567][SQL] Simplify CreateDataSourceTable...
Github user yhuai commented on a diff in the pull request: https://github.com/apache/spark/pull/15996#discussion_r93720313 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala --- @@ -363,48 +365,125 @@ final class DataFrameWriter[T] private[sql](ds: Dataset[T]) { throw new AnalysisException("Cannot create hive serde table with saveAsTable API") } -val tableExists = df.sparkSession.sessionState.catalog.tableExists(tableIdent) - -(tableExists, mode) match { - case (true, SaveMode.Ignore) => -// Do nothing - - case (true, SaveMode.ErrorIfExists) => -throw new AnalysisException(s"Table $tableIdent already exists.") - - case _ => -val existingTable = if (tableExists) { - Some(df.sparkSession.sessionState.catalog.getTableMetadata(tableIdent)) -} else { - None -} -val storage = if (tableExists) { - existingTable.get.storage -} else { - DataSource.buildStorageFormatFromOptions(extraOptions.toMap) -} -val tableType = if (tableExists) { - existingTable.get.tableType -} else if (storage.locationUri.isDefined) { - CatalogTableType.EXTERNAL -} else { - CatalogTableType.MANAGED +val catalog = df.sparkSession.sessionState.catalog +val db = tableIdent.database.getOrElse(catalog.getCurrentDatabase) +val tableIdentWithDB = tableIdent.copy(database = Some(db)) +val tableName = tableIdent.unquotedString + +catalog.getTableMetadataOption(tableIdent) match { + // If the table already exists... + case Some(tableMeta) => +mode match { + case SaveMode.Ignore => // Do nothing + + case SaveMode.ErrorIfExists => +throw new AnalysisException(s"Table $tableName already exists. You can set SaveMode " + + "to SaveMode.Append to insert data into the table or set SaveMode to " + + "SaveMode.Overwrite to overwrite the existing data.") + + case SaveMode.Append => +// Check if the specified data source match the data source of the existing table. +val specifiedProvider = DataSource.lookupDataSource(source) +// TODO: Check that options from the resolved relation match the relation that we are +// inserting into (i.e. using the same compression). + +// Pass a table identifier with database part, so that `lookupRelation` won't get temp +// views unexpectedly. + EliminateSubqueryAliases(catalog.lookupRelation(tableIdentWithDB)) match { + case l @ LogicalRelation(_: InsertableRelation | _: HadoopFsRelation, _, _) => +// check if the file formats match +l.relation match { + case r: HadoopFsRelation if r.fileFormat.getClass != specifiedProvider => +throw new AnalysisException( + s"The file format of the existing table $tableName is " + +s"`${r.fileFormat.getClass.getName}`. It doesn't match the specified " + +s"format `$source`") + case _ => +} + case s: SimpleCatalogRelation if DDLUtils.isDatasourceTable(s.metadata) => // OK. + case c: CatalogRelation if c.catalogTable.provider == Some(DDLUtils.HIVE_PROVIDER) => +throw new AnalysisException(s"Saving data in the Hive serde table $tableName " + + s"is not supported yet. 
Please use the insertInto() API as an alternative.") + case o => +throw new AnalysisException(s"Saving data in ${o.toString} is not supported.") +} + +val existingSchema = tableMeta.schema +if (df.logicalPlan.schema.size != existingSchema.size) { + throw new AnalysisException( +s"The column number of the existing table $tableName" + + s"(${existingSchema.catalogString}) doesn't match the data schema" + + s"(${df.logicalPlan.schema.catalogString})") +} + +if (partitioningColumns.isDefined) { + logWarning("append to an existing table, the specified partition columns " + +s"[${partitioningColumns.get.mkString(", ")}] will be ignored.") +
[GitHub] spark pull request #15996: [SPARK-18567][SQL] Simplify CreateDataSourceTable...
Github user yhuai commented on a diff in the pull request: https://github.com/apache/spark/pull/15996#discussion_r93720195 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala --- @@ -364,48 +366,162 @@ final class DataFrameWriter[T] private[sql](ds: Dataset[T]) { throw new AnalysisException("Cannot create hive serde table with saveAsTable API") } -val tableExists = df.sparkSession.sessionState.catalog.tableExists(tableIdent) - -(tableExists, mode) match { - case (true, SaveMode.Ignore) => -// Do nothing - - case (true, SaveMode.ErrorIfExists) => -throw new AnalysisException(s"Table $tableIdent already exists.") - - case _ => -val existingTable = if (tableExists) { - Some(df.sparkSession.sessionState.catalog.getTableMetadata(tableIdent)) -} else { - None -} -val storage = if (tableExists) { - existingTable.get.storage -} else { - DataSource.buildStorageFormatFromOptions(extraOptions.toMap) -} -val tableType = if (tableExists) { - existingTable.get.tableType -} else if (storage.locationUri.isDefined) { - CatalogTableType.EXTERNAL -} else { - CatalogTableType.MANAGED +val catalog = df.sparkSession.sessionState.catalog +val db = tableIdent.database.getOrElse(catalog.getCurrentDatabase) +val tableIdentWithDB = tableIdent.copy(database = Some(db)) +val tableName = tableIdentWithDB.unquotedString + +catalog.getTableMetadataOption(tableIdentWithDB) match { + // If the table already exists... + case Some(existingTable) => +mode match { + case SaveMode.Ignore => // Do nothing + + case SaveMode.ErrorIfExists => +throw new AnalysisException(s"Table $tableName already exists. You can set SaveMode " + + "to SaveMode.Append to insert data into the table or set SaveMode to " + + "SaveMode.Overwrite to overwrite the existing data.") + + case SaveMode.Append => +if (existingTable.tableType == CatalogTableType.VIEW) { + throw new AnalysisException("Saving data into a view is not allowed.") +} + +if (existingTable.provider.get == DDLUtils.HIVE_PROVIDER) { + throw new AnalysisException(s"Saving data in the Hive serde table $tableName is " + +"not supported yet. Please use the insertInto() API as an alternative.") +} + +// Check if the specified data source match the data source of the existing table. +val existingProvider = DataSource.lookupDataSource(existingTable.provider.get) +val specifiedProvider = DataSource.lookupDataSource(source) +// TODO: Check that options from the resolved relation match the relation that we are +// inserting into (i.e. using the same compression). +if (existingProvider != specifiedProvider) { + throw new AnalysisException(s"The format of the existing table $tableName is " + +s"`${existingProvider.getSimpleName}`. It doesn't match the specified format " + +s"`${specifiedProvider.getSimpleName}`.") +} + +if (df.schema.length != existingTable.schema.length) { + throw new AnalysisException( +s"The column number of the existing table $tableName" + + s"(${existingTable.schema.catalogString}) doesn't match the data schema" + + s"(${df.schema.catalogString})") +} + +val resolver = df.sparkSession.sessionState.conf.resolver +val tableCols = existingTable.schema.map(_.name) + +// As we are inserting into an existing table, we should respect the existing schema and +// adjust the column order of the given dataframe according to it, or throw exception +// if the column names do not match. 
+val adjustedColumns = tableCols.map { col => + df.queryExecution.analyzed.resolve(Seq(col), resolver).getOrElse { +val inputColumns = df.schema.map(_.name).mkString(", ") +throw new AnalysisException( + s"cannot resolve '$col' given input columns: [$inputColumns]") + } +} + +
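The column-adjustment step quoted above can be illustrated with a small standalone example (table and column names are made up; this uses the public DataFrame API rather than the analyzer-level `resolve` call in the diff):

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.col

val spark = SparkSession.builder().master("local[*]").appName("reorder-demo").getOrCreate()
import spark.implicits._

// Existing table schema order (assumed): col1, col2. The incoming frame has them swapped.
val df = Seq(("a", 1), ("b", 2)).toDF("col2", "col1")
val tableCols = Seq("col1", "col2")

// Select the incoming columns in the table's order; a missing name fails here, which is
// the situation the quoted code turns into an AnalysisException.
val adjusted = df.select(tableCols.map(col): _*)
adjusted.printSchema()  // col1, col2
```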
[GitHub] spark pull request #15996: [SPARK-18567][SQL] Simplify CreateDataSourceTable...
Github user yhuai commented on a diff in the pull request: https://github.com/apache/spark/pull/15996#discussion_r93719938 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala --- @@ -364,48 +366,162 @@ final class DataFrameWriter[T] private[sql](ds: Dataset[T]) { throw new AnalysisException("Cannot create hive serde table with saveAsTable API") } -val tableExists = df.sparkSession.sessionState.catalog.tableExists(tableIdent) - -(tableExists, mode) match { - case (true, SaveMode.Ignore) => -// Do nothing - - case (true, SaveMode.ErrorIfExists) => -throw new AnalysisException(s"Table $tableIdent already exists.") - - case _ => -val existingTable = if (tableExists) { - Some(df.sparkSession.sessionState.catalog.getTableMetadata(tableIdent)) -} else { - None -} -val storage = if (tableExists) { - existingTable.get.storage -} else { - DataSource.buildStorageFormatFromOptions(extraOptions.toMap) -} -val tableType = if (tableExists) { - existingTable.get.tableType -} else if (storage.locationUri.isDefined) { - CatalogTableType.EXTERNAL -} else { - CatalogTableType.MANAGED +val catalog = df.sparkSession.sessionState.catalog +val db = tableIdent.database.getOrElse(catalog.getCurrentDatabase) +val tableIdentWithDB = tableIdent.copy(database = Some(db)) +val tableName = tableIdentWithDB.unquotedString + +catalog.getTableMetadataOption(tableIdentWithDB) match { + // If the table already exists... + case Some(existingTable) => +mode match { + case SaveMode.Ignore => // Do nothing + + case SaveMode.ErrorIfExists => +throw new AnalysisException(s"Table $tableName already exists. You can set SaveMode " + + "to SaveMode.Append to insert data into the table or set SaveMode to " + + "SaveMode.Overwrite to overwrite the existing data.") + + case SaveMode.Append => +if (existingTable.tableType == CatalogTableType.VIEW) { + throw new AnalysisException("Saving data into a view is not allowed.") +} + +if (existingTable.provider.get == DDLUtils.HIVE_PROVIDER) { + throw new AnalysisException(s"Saving data in the Hive serde table $tableName is " + +"not supported yet. Please use the insertInto() API as an alternative.") +} + +// Check if the specified data source match the data source of the existing table. +val existingProvider = DataSource.lookupDataSource(existingTable.provider.get) +val specifiedProvider = DataSource.lookupDataSource(source) +// TODO: Check that options from the resolved relation match the relation that we are +// inserting into (i.e. using the same compression). +if (existingProvider != specifiedProvider) { + throw new AnalysisException(s"The format of the existing table $tableName is " + +s"`${existingProvider.getSimpleName}`. It doesn't match the specified format " + +s"`${specifiedProvider.getSimpleName}`.") +} + +if (df.schema.length != existingTable.schema.length) { + throw new AnalysisException( +s"The column number of the existing table $tableName" + + s"(${existingTable.schema.catalogString}) doesn't match the data schema" + + s"(${df.schema.catalogString})") +} + +val resolver = df.sparkSession.sessionState.conf.resolver +val tableCols = existingTable.schema.map(_.name) + +// As we are inserting into an existing table, we should respect the existing schema and +// adjust the column order of the given dataframe according to it, or throw exception +// if the column names do not match. 
+val adjustedColumns = tableCols.map { col => + df.queryExecution.analyzed.resolve(Seq(col), resolver).getOrElse { +val inputColumns = df.schema.map(_.name).mkString(", ") +throw new AnalysisException( + s"cannot resolve '$col' given input columns: [$inputColumns]") + } +} --- End diff --
[GitHub] spark pull request #15996: [SPARK-18567][SQL] Simplify CreateDataSourceTable...
Github user yhuai commented on a diff in the pull request: https://github.com/apache/spark/pull/15996#discussion_r93719624

--- Diff: sql/core/src/test/scala/org/apache/spark/sql/test/DataFrameReaderWriterSuite.scala ---
@@ -635,4 +638,13 @@ class DataFrameReaderWriterSuite extends QueryTest with SharedSQLContext with Be
       checkAnswer(spark.table("t"), Row(1, "a") :: Row(2, "b") :: Nil)
     }
   }
+
+  test("use saveAsTable to append to a data source table implementing CreatableRelationProvider") {
+    withTable("t") {
+      val provider = "org.apache.spark.sql.test.DefaultSource"
--- End diff --

I am thinking that we probably should create a test data source that explicitly mentions its relation is not an InsertableRelation.
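A hedged sketch of what such a test data source could look like (the class name and package are made up, the Spark 2.x source APIs are assumed, and the final shape is up to the PR):

```scala
import org.apache.spark.sql.{DataFrame, SQLContext, SaveMode}
import org.apache.spark.sql.sources.{BaseRelation, CreatableRelationProvider}
import org.apache.spark.sql.types.StructType

// A data source that can create a relation via CreatableRelationProvider but whose
// relation deliberately does NOT implement InsertableRelation, so appends have to go
// through createRelation rather than an insert.
class NonInsertableTestSource extends CreatableRelationProvider {
  override def createRelation(
      ctx: SQLContext,
      mode: SaveMode,
      parameters: Map[String, String],
      data: DataFrame): BaseRelation = new BaseRelation {
    override def sqlContext: SQLContext = ctx
    override def schema: StructType = data.schema
  }
}
```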
[GitHub] spark pull request #15996: [SPARK-18567][SQL] Simplify CreateDataSourceTable...
Github user yhuai commented on a diff in the pull request: https://github.com/apache/spark/pull/15996#discussion_r93718921

--- Diff: sql/core/src/test/scala/org/apache/spark/sql/test/DataFrameReaderWriterSuite.scala ---
@@ -635,4 +638,13 @@ class DataFrameReaderWriterSuite extends QueryTest with SharedSQLContext with Be
       checkAnswer(spark.table("t"), Row(1, "a") :: Row(2, "b") :: Nil)
     }
   }
+
+  test("use saveAsTable to append to a data source table implementing CreatableRelationProvider") {
+    withTable("t") {
+      val provider = "org.apache.spark.sql.test.DefaultSource"
--- End diff --

Is the relation created by this guy an InsertableRelation?
[GitHub] spark issue #16233: [SPARK-18801][SQL] Add `View` operator to help resolve a...
Github user yhuai commented on the issue: https://github.com/apache/spark/pull/16233

One general comment: let's explain how this patch maintains compatibility with views defined by previous versions of Spark. It would also be good to explain it in the corresponding part of the code.
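For illustration only (these are hypothetical view texts, not entries from any catalog), the compatibility note quoted elsewhere in this thread amounts to the following difference:

```scala
// Views created by Spark before 2.2 store fully qualified relations in viewText and no
// default database; views created with this patch can record a default database instead
// (the exact metadata layout is decided in the PR).
val legacyViewMeta  = ("SELECT * FROM db1.tbl", Option.empty[String]) // (viewText, defaultDatabase)
val currentViewMeta = ("SELECT * FROM tbl", Some("db1"))
```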
[GitHub] spark pull request #16233: [SPARK-18801][SQL] Add `View` operator to help re...
Github user yhuai commented on a diff in the pull request: https://github.com/apache/spark/pull/16233#discussion_r93551189

--- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala ---
@@ -549,17 +549,26 @@ class SessionCatalog(
    *
    * If a database is specified in `name`, this will return the table/view from that database.
    * If no database is specified, this will first attempt to return a temporary table/view with
-   * the same name, then, if that does not exist, return the table/view from the current database.
+   * the same name, then, if that does not exist, and currentDatabase is defined, return the
+   * table/view from the currentDatabase, else return the table/view from the catalog.currentDb.
    *
    * Note that, the global temp view database is also valid here, this will return the global temp
    * view matching the given name.
    *
-   * If the relation is a view, the relation will be wrapped in a [[SubqueryAlias]] which will
-   * track the name of the view.
+   * If the relation is a view, we add a [[View]] operator over the relation, and wrap the logical
+   * plan in a [[SubqueryAlias]] which will track the name of the view.
+   *
+   * @param name The name of the table/view that we lookup.
+   * @param alias The alias name of the table/view that we lookup.
+   * @param currentDatabase The database name we should use to lookup the table/view, if the
+   *                        database part of [[TableIdentifier]] is not defined.
    */
-  def lookupRelation(name: TableIdentifier, alias: Option[String] = None): LogicalPlan = {
+  def lookupRelation(
+      name: TableIdentifier,
+      alias: Option[String] = None,
+      currentDatabase: Option[String] = None): LogicalPlan = {
     synchronized {
-      val db = formatDatabaseName(name.database.getOrElse(currentDb))
+      val db = formatDatabaseName(name.database.getOrElse(currentDatabase.getOrElse(currentDb)))
--- End diff --

I feel that currentDatabase and currentDb will confuse readers.
[GitHub] spark pull request #16233: [SPARK-18801][SQL] Add `View` operator to help re...
Github user yhuai commented on a diff in the pull request: https://github.com/apache/spark/pull/16233#discussion_r93551078 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala --- @@ -510,32 +510,62 @@ class Analyzer( * Replaces [[UnresolvedRelation]]s with concrete relations from the catalog. */ object ResolveRelations extends Rule[LogicalPlan] { -private def lookupTableFromCatalog(u: UnresolvedRelation): LogicalPlan = { + +def apply(plan: LogicalPlan): LogicalPlan = plan resolveOperators { + case i @ InsertIntoTable(u: UnresolvedRelation, parts, child, _, _) if child.resolved => +i.copy(table = EliminateSubqueryAliases(lookupTableFromCatalog(u))) + case u: UnresolvedRelation => resolveRelation(u) +} + +// If the unresolved relation is running directly on files, we just return the original +// UnresolvedRelation, the plan will get resolved later. Else we lookup the table from catalog +// and change the current database name if it is a view. +def resolveRelation( +plan: LogicalPlan, +currentDatabase: Option[String] = None): LogicalPlan = plan match { + case u @ UnresolvedRelation(table: TableIdentifier, _) if isRunningDirectlyOnFiles(table) => +u + case u: UnresolvedRelation => +resolveView(lookupTableFromCatalog(u, currentDatabase)) +} + +// Lookup the table with the given name from catalog. If `currentDatabase` is set, we lookup +// the table in the database `currentDatabase`, else we follow the default way. +private def lookupTableFromCatalog( +u: UnresolvedRelation, +currentDatabase: Option[String] = None): LogicalPlan = { try { -catalog.lookupRelation(u.tableIdentifier, u.alias) +catalog.lookupRelation(u.tableIdentifier, u.alias, currentDatabase) } catch { case _: NoSuchTableException => u.failAnalysis(s"Table or view not found: ${u.tableName}") } } -def apply(plan: LogicalPlan): LogicalPlan = plan resolveOperators { - case i @ InsertIntoTable(u: UnresolvedRelation, parts, child, _, _) if child.resolved => -i.copy(table = EliminateSubqueryAliases(lookupTableFromCatalog(u))) - case u: UnresolvedRelation => -val table = u.tableIdentifier -if (table.database.isDefined && conf.runSQLonFile && !catalog.isTemporaryTable(table) && -(!catalog.databaseExists(table.database.get) || !catalog.tableExists(table))) { - // If the database part is specified, and we support running SQL directly on files, and - // it's not a temporary view, and the table does not exist, then let's just return the - // original UnresolvedRelation. It is possible we are matching a query like "select * - // from parquet.`/path/to/query`". The plan will get resolved later. - // Note that we are testing (!db_exists || !table_exists) because the catalog throws - // an exception from tableExists if the database does not exist. - u -} else { - lookupTableFromCatalog(u) +// If the database part is specified, and we support running SQL directly on files, and +// it's not a temporary view, and the table does not exist, then let's just return the +// original UnresolvedRelation. It is possible we are matching a query like "select * +// from parquet.`/path/to/query`". The plan will get resolved later. +// Note that we are testing (!db_exists || !table_exists) because the catalog throws +// an exception from tableExists if the database does not exist. 
+    private def isRunningDirectlyOnFiles(table: TableIdentifier): Boolean = {
+      table.database.isDefined && conf.runSQLonFile && !catalog.isTemporaryTable(table) &&
+        (!catalog.databaseExists(table.database.get) || !catalog.tableExists(table))
+    }
+
+    // Change the current database name if the plan is a view, and transformDown with the new
+    // database name to resolve all UnresolvedRelation.
+    def resolveView(plan: LogicalPlan): LogicalPlan = plan match {
+      case p @ SubqueryAlias(_, view: View, _) =>
+        val currentDatabase = view.currentDatabase
+        val newChild = view transform {
+          case v: View if !v.resolved =>
+            resolveView(v)
+          case u: UnresolvedRelation =>
+            resolveRelation(u, currentDatabase)
--- End diff --

oh, resolveRelation's second argument is the current db.
[GitHub] spark pull request #16233: [SPARK-18801][SQL] Add `View` operator to help re...
Github user yhuai commented on a diff in the pull request: https://github.com/apache/spark/pull/16233#discussion_r93549589 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicLogicalOperators.scala --- @@ -396,6 +396,20 @@ case class InsertIntoTable( } /** + * A container for holding the current database of a view and a query plan. + * This operator will be removed at the beginning of the optimize stage so we can see what is part + * of a view in an analyzed plan. + * + * @param child The logical plan of this view. + * @param currentDatabase The database name we use to resolve the logical plan. + */ +case class View(child: LogicalPlan, currentDatabase: Option[String]) extends LogicalPlan { --- End diff -- Also, I feel `currentDatabase` may not be a good name. I am wondering if `defaultDatabase` is better. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #16233: [SPARK-18801][SQL] Add `View` operator to help re...
Github user yhuai commented on a diff in the pull request: https://github.com/apache/spark/pull/16233#discussion_r93549309 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala --- @@ -510,32 +510,62 @@ class Analyzer( * Replaces [[UnresolvedRelation]]s with concrete relations from the catalog. */ object ResolveRelations extends Rule[LogicalPlan] { -private def lookupTableFromCatalog(u: UnresolvedRelation): LogicalPlan = { + +def apply(plan: LogicalPlan): LogicalPlan = plan resolveOperators { + case i @ InsertIntoTable(u: UnresolvedRelation, parts, child, _, _) if child.resolved => +i.copy(table = EliminateSubqueryAliases(lookupTableFromCatalog(u))) + case u: UnresolvedRelation => resolveRelation(u) +} + +// If the unresolved relation is running directly on files, we just return the original +// UnresolvedRelation, the plan will get resolved later. Else we lookup the table from catalog +// and change the current database name if it is a view. +def resolveRelation( +plan: LogicalPlan, +currentDatabase: Option[String] = None): LogicalPlan = plan match { + case u @ UnresolvedRelation(table: TableIdentifier, _) if isRunningDirectlyOnFiles(table) => +u + case u: UnresolvedRelation => +resolveView(lookupTableFromCatalog(u, currentDatabase)) +} + +// Lookup the table with the given name from catalog. If `currentDatabase` is set, we lookup +// the table in the database `currentDatabase`, else we follow the default way. +private def lookupTableFromCatalog( --- End diff -- If so, should we just inline it? --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #16233: [SPARK-18801][SQL] Add `View` operator to help re...
Github user yhuai commented on a diff in the pull request: https://github.com/apache/spark/pull/16233#discussion_r93549165 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala --- @@ -510,32 +510,62 @@ class Analyzer( * Replaces [[UnresolvedRelation]]s with concrete relations from the catalog. */ object ResolveRelations extends Rule[LogicalPlan] { -private def lookupTableFromCatalog(u: UnresolvedRelation): LogicalPlan = { + +def apply(plan: LogicalPlan): LogicalPlan = plan resolveOperators { + case i @ InsertIntoTable(u: UnresolvedRelation, parts, child, _, _) if child.resolved => +i.copy(table = EliminateSubqueryAliases(lookupTableFromCatalog(u))) + case u: UnresolvedRelation => resolveRelation(u) +} + +// If the unresolved relation is running directly on files, we just return the original +// UnresolvedRelation, the plan will get resolved later. Else we lookup the table from catalog +// and change the current database name if it is a view. +def resolveRelation( +plan: LogicalPlan, +currentDatabase: Option[String] = None): LogicalPlan = plan match { + case u @ UnresolvedRelation(table: TableIdentifier, _) if isRunningDirectlyOnFiles(table) => +u + case u: UnresolvedRelation => +resolveView(lookupTableFromCatalog(u, currentDatabase)) +} + +// Lookup the table with the given name from catalog. If `currentDatabase` is set, we lookup +// the table in the database `currentDatabase`, else we follow the default way. +private def lookupTableFromCatalog( +u: UnresolvedRelation, +currentDatabase: Option[String] = None): LogicalPlan = { try { -catalog.lookupRelation(u.tableIdentifier, u.alias) +catalog.lookupRelation(u.tableIdentifier, u.alias, currentDatabase) } catch { case _: NoSuchTableException => u.failAnalysis(s"Table or view not found: ${u.tableName}") } } -def apply(plan: LogicalPlan): LogicalPlan = plan resolveOperators { - case i @ InsertIntoTable(u: UnresolvedRelation, parts, child, _, _) if child.resolved => -i.copy(table = EliminateSubqueryAliases(lookupTableFromCatalog(u))) - case u: UnresolvedRelation => -val table = u.tableIdentifier -if (table.database.isDefined && conf.runSQLonFile && !catalog.isTemporaryTable(table) && -(!catalog.databaseExists(table.database.get) || !catalog.tableExists(table))) { - // If the database part is specified, and we support running SQL directly on files, and - // it's not a temporary view, and the table does not exist, then let's just return the - // original UnresolvedRelation. It is possible we are matching a query like "select * - // from parquet.`/path/to/query`". The plan will get resolved later. - // Note that we are testing (!db_exists || !table_exists) because the catalog throws - // an exception from tableExists if the database does not exist. - u -} else { - lookupTableFromCatalog(u) +// If the database part is specified, and we support running SQL directly on files, and +// it's not a temporary view, and the table does not exist, then let's just return the +// original UnresolvedRelation. It is possible we are matching a query like "select * +// from parquet.`/path/to/query`". The plan will get resolved later. +// Note that we are testing (!db_exists || !table_exists) because the catalog throws +// an exception from tableExists if the database does not exist. 
+private def isRunningDirectlyOnFiles(table: TableIdentifier): Boolean = { + table.database.isDefined && conf.runSQLonFile && !catalog.isTemporaryTable(table) && +(!catalog.databaseExists(table.database.get) || !catalog.tableExists(table)) +} + +// Change the current database name if the plan is a view, and transformDown with the new +// database name to resolve all UnresolvedRelation. +def resolveView(plan: LogicalPlan): LogicalPlan = plan match { + case p @ SubqueryAlias(_, view: View, _) => +val currentDatabase = view.currentDatabase +val newChild = view transform { + case v: View if !v.resolved => +resolveView(v) + case u: UnresolvedRelation => +resolveRelation(u, currentDatabase) --- End diff -- Is this right? `u` can also have db name, right? --- If your project is s
[GitHub] spark pull request #16233: [SPARK-18801][SQL] Add `View` operator to help re...
Github user yhuai commented on a diff in the pull request: https://github.com/apache/spark/pull/16233#discussion_r93548847 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala --- @@ -510,32 +510,62 @@ class Analyzer( * Replaces [[UnresolvedRelation]]s with concrete relations from the catalog. */ object ResolveRelations extends Rule[LogicalPlan] { -private def lookupTableFromCatalog(u: UnresolvedRelation): LogicalPlan = { + +def apply(plan: LogicalPlan): LogicalPlan = plan resolveOperators { + case i @ InsertIntoTable(u: UnresolvedRelation, parts, child, _, _) if child.resolved => +i.copy(table = EliminateSubqueryAliases(lookupTableFromCatalog(u))) + case u: UnresolvedRelation => resolveRelation(u) +} + +// If the unresolved relation is running directly on files, we just return the original +// UnresolvedRelation, the plan will get resolved later. Else we lookup the table from catalog +// and change the current database name if it is a view. +def resolveRelation( +plan: LogicalPlan, +currentDatabase: Option[String] = None): LogicalPlan = plan match { + case u @ UnresolvedRelation(table: TableIdentifier, _) if isRunningDirectlyOnFiles(table) => +u + case u: UnresolvedRelation => +resolveView(lookupTableFromCatalog(u, currentDatabase)) +} + +// Lookup the table with the given name from catalog. If `currentDatabase` is set, we lookup +// the table in the database `currentDatabase`, else we follow the default way. +private def lookupTableFromCatalog( --- End diff -- Is this function only used once? --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #16357: [SPARK-18928][branch-2.0]Check TaskContext.isInterrupted...
Github user yhuai commented on the issue: https://github.com/apache/spark/pull/16357 @mridulm ok. I merged this because it is a backport (the original patch has already been merged to 2.1 and master) and I believe Josh has already addressed your concerns. If you want us to hold the merge, it would be good to explicitly mention it next time. Thanks! --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
spark git commit: [SPARK-18951] Upgrade com.thoughtworks.paranamer/paranamer to 2.6
Repository: spark Updated Branches: refs/heads/master b7650f11c -> 1a6438897 [SPARK-18951] Upgrade com.thoughtworks.paranamer/paranamer to 2.6 ## What changes were proposed in this pull request? I recently hit a bug of com.thoughtworks.paranamer/paranamer, which causes jackson fail to handle byte array defined in a case class. Then I find https://github.com/FasterXML/jackson-module-scala/issues/48, which suggests that it is caused by a bug in paranamer. Let's upgrade paranamer. Since we are using jackson 2.6.5 and jackson-module-paranamer 2.6.5 use com.thoughtworks.paranamer/paranamer 2.6, I suggests that we upgrade paranamer to 2.6. Author: Yin Huai <yh...@databricks.com> Closes #16359 from yhuai/SPARK-18951. Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/1a643889 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/1a643889 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/1a643889 Branch: refs/heads/master Commit: 1a64388973711b4e567f25fa33d752066a018b49 Parents: b7650f1 Author: Yin Huai <yh...@databricks.com> Authored: Wed Dec 21 09:26:13 2016 -0800 Committer: Yin Huai <yh...@databricks.com> Committed: Wed Dec 21 09:26:13 2016 -0800 -- dev/deps/spark-deps-hadoop-2.2 | 2 +- dev/deps/spark-deps-hadoop-2.3 | 2 +- dev/deps/spark-deps-hadoop-2.4 | 2 +- dev/deps/spark-deps-hadoop-2.6 | 2 +- dev/deps/spark-deps-hadoop-2.7 | 2 +- pom.xml| 7 ++- 6 files changed, 11 insertions(+), 6 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/1a643889/dev/deps/spark-deps-hadoop-2.2 -- diff --git a/dev/deps/spark-deps-hadoop-2.2 b/dev/deps/spark-deps-hadoop-2.2 index afbdae0..9cbab3d8 100644 --- a/dev/deps/spark-deps-hadoop-2.2 +++ b/dev/deps/spark-deps-hadoop-2.2 @@ -128,7 +128,7 @@ objenesis-2.1.jar opencsv-2.3.jar oro-2.0.8.jar osgi-resource-locator-1.0.1.jar -paranamer-2.3.jar +paranamer-2.6.jar parquet-column-1.8.1.jar parquet-common-1.8.1.jar parquet-encoding-1.8.1.jar http://git-wip-us.apache.org/repos/asf/spark/blob/1a643889/dev/deps/spark-deps-hadoop-2.3 -- diff --git a/dev/deps/spark-deps-hadoop-2.3 b/dev/deps/spark-deps-hadoop-2.3 index adf3863..63ce6c6 100644 --- a/dev/deps/spark-deps-hadoop-2.3 +++ b/dev/deps/spark-deps-hadoop-2.3 @@ -135,7 +135,7 @@ objenesis-2.1.jar opencsv-2.3.jar oro-2.0.8.jar osgi-resource-locator-1.0.1.jar -paranamer-2.3.jar +paranamer-2.6.jar parquet-column-1.8.1.jar parquet-common-1.8.1.jar parquet-encoding-1.8.1.jar http://git-wip-us.apache.org/repos/asf/spark/blob/1a643889/dev/deps/spark-deps-hadoop-2.4 -- diff --git a/dev/deps/spark-deps-hadoop-2.4 b/dev/deps/spark-deps-hadoop-2.4 index 88e6b3f..122d5c2 100644 --- a/dev/deps/spark-deps-hadoop-2.4 +++ b/dev/deps/spark-deps-hadoop-2.4 @@ -135,7 +135,7 @@ objenesis-2.1.jar opencsv-2.3.jar oro-2.0.8.jar osgi-resource-locator-1.0.1.jar -paranamer-2.3.jar +paranamer-2.6.jar parquet-column-1.8.1.jar parquet-common-1.8.1.jar parquet-encoding-1.8.1.jar http://git-wip-us.apache.org/repos/asf/spark/blob/1a643889/dev/deps/spark-deps-hadoop-2.6 -- diff --git a/dev/deps/spark-deps-hadoop-2.6 b/dev/deps/spark-deps-hadoop-2.6 index 15c5d9f..776aabd 100644 --- a/dev/deps/spark-deps-hadoop-2.6 +++ b/dev/deps/spark-deps-hadoop-2.6 @@ -143,7 +143,7 @@ objenesis-2.1.jar opencsv-2.3.jar oro-2.0.8.jar osgi-resource-locator-1.0.1.jar -paranamer-2.3.jar +paranamer-2.6.jar parquet-column-1.8.1.jar parquet-common-1.8.1.jar parquet-encoding-1.8.1.jar http://git-wip-us.apache.org/repos/asf/spark/blob/1a643889/dev/deps/spark-deps-hadoop-2.7 -- diff 
--git a/dev/deps/spark-deps-hadoop-2.7 b/dev/deps/spark-deps-hadoop-2.7 index 77fb537..524e824 100644 --- a/dev/deps/spark-deps-hadoop-2.7 +++ b/dev/deps/spark-deps-hadoop-2.7 @@ -144,7 +144,7 @@ objenesis-2.1.jar opencsv-2.3.jar oro-2.0.8.jar osgi-resource-locator-1.0.1.jar -paranamer-2.3.jar +paranamer-2.6.jar parquet-column-1.8.1.jar parquet-common-1.8.1.jar parquet-encoding-1.8.1.jar http://git-wip-us.apache.org/repos/asf/spark/blob/1a643889/pom.xml -- diff --git a/pom.xml b/pom.xml index 4f12085..72e5442 100644 --- a/pom.xml +++ b/pom.xml @@ -179,7 +179,7 @@ 4.5.3 1.1 2.52.0 -2.8 +2.6 1.8 1.0.0 @@ -1863,6 +1863,11 @@ + +com.thoughtworks.paranamer +paranamer +
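For readers who want to see the kind of code that is affected, a minimal reproduction sketch of the reported failure mode might look like the following. The Record class is hypothetical, and this assumes jackson-databind plus jackson-module-scala 2.6.x on the classpath; it is not a test that ships with Spark.

    import com.fasterxml.jackson.databind.ObjectMapper
    import com.fasterxml.jackson.module.scala.DefaultScalaModule

    // Hypothetical case class: paranamer is used to recover constructor
    // parameter names, and the older 2.3 release reportedly fails on case
    // classes that contain an Array[Byte] field.
    case class Record(id: Int, payload: Array[Byte])

    object ParanamerRepro {
      def main(args: Array[String]): Unit = {
        val mapper = new ObjectMapper()
        mapper.registerModule(DefaultScalaModule)
        // Round-trip a Record through JSON; with paranamer 2.6 this succeeds.
        val json = mapper.writeValueAsString(Record(1, Array[Byte](1, 2, 3)))
        val back = mapper.readValue(json, classOf[Record])
        println(s"${back.id} -> ${back.payload.length} bytes")
      }
    }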
[GitHub] spark issue #16359: [SPARK-18951] Upgrade com.thoughtworks.paranamer/paranam...
Github user yhuai commented on the issue: https://github.com/apache/spark/pull/16359 Thanks! I will get this in master. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #16359: [SPARK-18951] Upgrade com.thoughtworks.paranamer/paranam...
Github user yhuai commented on the issue: https://github.com/apache/spark/pull/16359 test this please --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
spark git commit: [SPARK-18928][BRANCH-2.0] Check TaskContext.isInterrupted() in FileScanRDD, JDBCRDD & UnsafeSorter
Repository: spark Updated Branches: refs/heads/branch-2.0 678d91c1d -> 2aae220b5 [SPARK-18928][BRANCH-2.0] Check TaskContext.isInterrupted() in FileScanRDD, JDBCRDD & UnsafeSorter This is a branch-2.0 backport of #16340; the original description follows: ## What changes were proposed in this pull request? In order to respond to task cancellation, Spark tasks must periodically check `TaskContext.isInterrupted()`, but this check is missing on a few critical read paths used in Spark SQL, including `FileScanRDD`, `JDBCRDD`, and UnsafeSorter-based sorts. This can cause interrupted / cancelled tasks to continue running and become zombies (as also described in #16189). This patch aims to fix this problem by adding `TaskContext.isInterrupted()` checks to these paths. Note that I could have used `InterruptibleIterator` to simply wrap a bunch of iterators but in some cases this would have an adverse performance penalty or might not be effective due to certain special uses of Iterators in Spark SQL. Instead, I inlined `InterruptibleIterator`-style logic into existing iterator subclasses. ## How was this patch tested? Tested manually in `spark-shell` with two different reproductions of non-cancellable tasks, one involving scans of huge files and another involving sort-merge joins that spill to disk. Both causes of zombie tasks are fixed by the changes added here. Author: Josh RosenCloses #16357 from JoshRosen/sql-task-interruption-branch-2.0. Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/2aae220b Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/2aae220b Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/2aae220b Branch: refs/heads/branch-2.0 Commit: 2aae220b536065f55b2cf644a2a223aab0d051d0 Parents: 678d91c Author: Josh Rosen Authored: Tue Dec 20 16:05:04 2016 -0800 Committer: Yin Huai Committed: Tue Dec 20 16:05:04 2016 -0800 -- .../collection/unsafe/sort/UnsafeInMemorySorter.java| 11 +++ .../collection/unsafe/sort/UnsafeSorterSpillReader.java | 11 +++ .../spark/sql/execution/datasources/FileScanRDD.scala | 12 ++-- .../spark/sql/execution/datasources/jdbc/JDBCRDD.scala | 9 - 4 files changed, 40 insertions(+), 3 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/2aae220b/core/src/main/java/org/apache/spark/util/collection/unsafe/sort/UnsafeInMemorySorter.java -- diff --git a/core/src/main/java/org/apache/spark/util/collection/unsafe/sort/UnsafeInMemorySorter.java b/core/src/main/java/org/apache/spark/util/collection/unsafe/sort/UnsafeInMemorySorter.java index b517371..2bd756f 100644 --- a/core/src/main/java/org/apache/spark/util/collection/unsafe/sort/UnsafeInMemorySorter.java +++ b/core/src/main/java/org/apache/spark/util/collection/unsafe/sort/UnsafeInMemorySorter.java @@ -21,6 +21,8 @@ import java.util.Comparator; import org.apache.avro.reflect.Nullable; +import org.apache.spark.TaskContext; +import org.apache.spark.TaskKilledException; import org.apache.spark.memory.MemoryConsumer; import org.apache.spark.memory.TaskMemoryManager; import org.apache.spark.unsafe.Platform; @@ -226,6 +228,7 @@ public final class UnsafeInMemorySorter { private long keyPrefix; private int recordLength; private long currentPageNumber; +private final TaskContext taskContext = TaskContext.get(); private SortedIterator(int numRecords, int offset) { this.numRecords = numRecords; @@ -256,6 +259,14 @@ public final class UnsafeInMemorySorter { @Override public void loadNext() { + // Kill the task in case it has been marked as 
killed. This logic is from + // InterruptibleIterator, but we inline it here instead of wrapping the iterator in order + // to avoid performance overhead. This check is added here in `loadNext()` instead of in + // `hasNext()` because it's technically possible for the caller to be relying on + // `getNumRecords()` instead of `hasNext()` to know when to stop. + if (taskContext != null && taskContext.isInterrupted()) { +throw new TaskKilledException(); + } // This pointer points to a 4-byte record length, followed by the record's bytes final long recordPointer = array.get(offset + position); currentPageNumber = TaskMemoryManager.decodePageNumber(recordPointer); http://git-wip-us.apache.org/repos/asf/spark/blob/2aae220b/core/src/main/java/org/apache/spark/util/collection/unsafe/sort/UnsafeSorterSpillReader.java -- diff --git
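The pattern added on each of these read paths can be illustrated with a short, hedged sketch. The wrapper class below is hypothetical; as the commit message explains, the real patch inlines the check into existing iterator subclasses instead of wrapping them, precisely to avoid extra indirection.

    import org.apache.spark.{TaskContext, TaskKilledException}

    // Illustrative only: check the task's killed flag before producing each
    // element so a cancelled task stops promptly instead of becoming a zombie.
    class CancellableIterator[T](underlying: Iterator[T]) extends Iterator[T] {
      private val taskContext = TaskContext.get()

      override def hasNext: Boolean = underlying.hasNext

      override def next(): T = {
        if (taskContext != null && taskContext.isInterrupted()) {
          throw new TaskKilledException
        }
        underlying.next()
      }
    }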
[GitHub] spark issue #16357: [SPARK-18928][branch-2.0]Check TaskContext.isInterrupted...
Github user yhuai commented on the issue: https://github.com/apache/spark/pull/16357 Merging to branch 2.0. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
spark git commit: [SPARK-18761][BRANCH-2.0] Introduce "task reaper" to oversee task killing in executors
Repository: spark Updated Branches: refs/heads/branch-2.0 1f0c5fa75 -> 678d91c1d [SPARK-18761][BRANCH-2.0] Introduce "task reaper" to oversee task killing in executors Branch-2.0 backport of #16189; original description follows: ## What changes were proposed in this pull request? Spark's current task cancellation / task killing mechanism is "best effort" because some tasks may not be interruptible or may not respond to their "killed" flags being set. If a significant fraction of a cluster's task slots are occupied by tasks that have been marked as killed but remain running then this can lead to a situation where new jobs and tasks are starved of resources that are being used by these zombie tasks. This patch aims to address this problem by adding a "task reaper" mechanism to executors. At a high-level, task killing now launches a new thread which attempts to kill the task and then watches the task and periodically checks whether it has been killed. The TaskReaper will periodically re-attempt to call `TaskRunner.kill()` and will log warnings if the task keeps running. I modified TaskRunner to rename its thread at the start of the task, allowing TaskReaper to take a thread dump and filter it in order to log stacktraces from the exact task thread that we are waiting to finish. If the task has not stopped after a configurable timeout then the TaskReaper will throw an exception to trigger executor JVM death, thereby forcibly freeing any resources consumed by the zombie tasks. This feature is flagged off by default and is controlled by four new configurations under the `spark.task.reaper.*` namespace. See the updated `configuration.md` doc for details. ## How was this patch tested? Tested via a new test case in `JobCancellationSuite`, plus manual testing. Author: Josh RosenCloses #16358 from JoshRosen/cancellation-branch-2.0. Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/678d91c1 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/678d91c1 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/678d91c1 Branch: refs/heads/branch-2.0 Commit: 678d91c1d2283d9965a39656af9d383bad093ba8 Parents: 1f0c5fa Author: Josh Rosen Authored: Tue Dec 20 15:56:56 2016 -0800 Committer: Yin Huai Committed: Tue Dec 20 15:56:56 2016 -0800 -- .../org/apache/spark/executor/Executor.scala| 169 ++- .../scala/org/apache/spark/util/Utils.scala | 26 ++- .../org/apache/spark/JobCancellationSuite.scala | 77 + docs/configuration.md | 42 + 4 files changed, 300 insertions(+), 14 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/678d91c1/core/src/main/scala/org/apache/spark/executor/Executor.scala -- diff --git a/core/src/main/scala/org/apache/spark/executor/Executor.scala b/core/src/main/scala/org/apache/spark/executor/Executor.scala index 9a017f2..93e994b 100644 --- a/core/src/main/scala/org/apache/spark/executor/Executor.scala +++ b/core/src/main/scala/org/apache/spark/executor/Executor.scala @@ -84,6 +84,16 @@ private[spark] class Executor( // Start worker thread pool private val threadPool = ThreadUtils.newDaemonCachedThreadPool("Executor task launch worker") private val executorSource = new ExecutorSource(threadPool, executorId) + // Pool used for threads that supervise task killing / cancellation + private val taskReaperPool = ThreadUtils.newDaemonCachedThreadPool("Task reaper") + // For tasks which are in the process of being killed, this map holds the most recently created + // TaskReaper. 
All accesses to this map should be synchronized on the map itself (this isn't + // a ConcurrentHashMap because we use the synchronization for purposes other than simply guarding + // the integrity of the map's internal state). The purpose of this map is to prevent the creation + // of a separate TaskReaper for every killTask() of a given task. Instead, this map allows us to + // track whether an existing TaskReaper fulfills the role of a TaskReaper that we would otherwise + // create. The map key is a task id. + private val taskReaperForTask: HashMap[Long, TaskReaper] = HashMap[Long, TaskReaper]() if (!isLocal) { env.metricsSystem.registerSource(executorSource) @@ -93,6 +103,9 @@ private[spark] class Executor( // Whether to load classes in user jars before those in Spark jars private val userClassPathFirst = conf.getBoolean("spark.executor.userClassPathFirst", false) + // Whether to monitor killed / interrupted tasks + private val taskReaperEnabled = conf.getBoolean("spark.task.reaper.enabled", false) + // Create our
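A condensed sketch of the supervision loop described above follows. The KillableTask trait is a stand-in for the executor's TaskRunner and the parameters mirror the spark.task.reaper.* settings mentioned in the commit message; treat this as an illustration of the shape, not the actual Executor code.

    // Illustrative stand-in for the executor's TaskRunner.
    trait KillableTask {
      def kill(interruptThread: Boolean): Unit
      def isFinished: Boolean
    }

    object TaskReaperSketch {
      def supervise(task: KillableTask, pollingIntervalMs: Long, killTimeoutMs: Long): Unit = {
        val start = System.currentTimeMillis()
        def timedOut = killTimeoutMs > 0 && System.currentTimeMillis() - start > killTimeoutMs

        task.kill(interruptThread = true) // only attempt the kill once
        while (!task.isFinished && !timedOut) {
          // The real reaper waits on the TaskRunner's monitor and logs a warning
          // (optionally with a thread dump) on each wake-up; we just sleep here.
          Thread.sleep(pollingIntervalMs)
        }
        if (!task.isFinished && timedOut) {
          // Escalate: failing the executor JVM is the last resort for reclaiming
          // the resources held by the zombie task (the feature is off by default).
          throw new IllegalStateException(
            s"Killed task is still running after $killTimeoutMs ms; failing the executor")
        }
      }
    }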
[GitHub] spark issue #16358: [SPARK-18761][branch-2.0] Introduce "task reaper" to ove...
Github user yhuai commented on the issue: https://github.com/apache/spark/pull/16358 LGTM. Merging to branch-2.0 --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #16359: [SPARK-18951] Upgrade com.thoughtworks.paranamer/paranam...
Github user yhuai commented on the issue: https://github.com/apache/spark/pull/16359 @srowen @JoshRosen for review --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #16359: [SPARK-18951] Upgrade com.thoughtworks.paranamer/...
Github user yhuai commented on a diff in the pull request: https://github.com/apache/spark/pull/16359#discussion_r93334655 --- Diff: pom.xml --- @@ -179,7 +179,7 @@ 4.5.3 1.1 2.52.0 -2.8 +2.6 --- End diff -- Although we defined the version as 2.8, that variable was not actually used, so we have been using 2.3. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #16359: [SPARK-18951] Upgrade com.thoughtworks.paranamer/...
GitHub user yhuai opened a pull request: https://github.com/apache/spark/pull/16359 [SPARK-18951] Upgrade com.thoughtworks.paranamer/paranamer to 2.6 ## What changes were proposed in this pull request? I recently hit a bug in com.thoughtworks.paranamer/paranamer, which causes jackson to fail to handle a byte array defined in a case class. Then I found https://github.com/FasterXML/jackson-module-scala/issues/48, which suggests that it is caused by a bug in paranamer. Let's upgrade paranamer. Since we are using jackson 2.6.5, and jackson-module-paranamer 2.6.5 uses com.thoughtworks.paranamer/paranamer 2.6, I suggest that we upgrade paranamer to 2.6. You can merge this pull request into a Git repository by running: $ git pull https://github.com/yhuai/spark SPARK-18951 Alternatively you can review and apply these changes as the patch at: https://github.com/apache/spark/pull/16359.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #16359 commit c502aeb123641f634c107c0ad8c0f1986fea8ee1 Author: Yin Huai <yh...@databricks.com> Date: 2016-12-20T21:58:42Z [SPARK-18951] Upgrade com.thoughtworks.paranamer/paranamer to 2.6 I recently hit a bug in com.thoughtworks.paranamer/paranamer, which causes jackson to fail to handle a byte array defined in a case class. Then I found https://github.com/FasterXML/jackson-module-scala/issues/48, which suggests that it is caused by a bug in paranamer. Let's upgrade paranamer. Since we are using jackson 2.6.5, and jackson-module-paranamer 2.6.5 uses com.thoughtworks.paranamer/paranamer 2.6, I suggest that we upgrade paranamer to 2.6. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #16357: [SPARK-18928][branch-2.0]Check TaskContext.isInterrupted...
Github user yhuai commented on the issue: https://github.com/apache/spark/pull/16357 LGTM --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #16330: [SPARK-18817][SPARKR][SQL] change derby log outpu...
Github user yhuai commented on a diff in the pull request: https://github.com/apache/spark/pull/16330#discussion_r93176308 --- Diff: core/src/main/scala/org/apache/spark/deploy/SparkHadoopUtil.scala --- @@ -104,6 +104,12 @@ class SparkHadoopUtil extends Logging { } val bufferSize = conf.get("spark.buffer.size", "65536") hadoopConf.set("io.file.buffer.size", bufferSize) + + if (conf.contains("spark.sql.default.derby.dir")) { --- End diff -- Why do we need to introduce this flag? --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #16189: [SPARK-18761][CORE] Introduce "task reaper" to oversee t...
Github user yhuai commented on the issue: https://github.com/apache/spark/pull/16189 @mridulm Sure. Also, please feel free to leave more comments :) --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
spark git commit: [SPARK-18761][CORE] Introduce "task reaper" to oversee task killing in executors
Repository: spark Updated Branches: refs/heads/master 5857b9ac2 -> fa829ce21 [SPARK-18761][CORE] Introduce "task reaper" to oversee task killing in executors ## What changes were proposed in this pull request? Spark's current task cancellation / task killing mechanism is "best effort" because some tasks may not be interruptible or may not respond to their "killed" flags being set. If a significant fraction of a cluster's task slots are occupied by tasks that have been marked as killed but remain running then this can lead to a situation where new jobs and tasks are starved of resources that are being used by these zombie tasks. This patch aims to address this problem by adding a "task reaper" mechanism to executors. At a high-level, task killing now launches a new thread which attempts to kill the task and then watches the task and periodically checks whether it has been killed. The TaskReaper will periodically re-attempt to call `TaskRunner.kill()` and will log warnings if the task keeps running. I modified TaskRunner to rename its thread at the start of the task, allowing TaskReaper to take a thread dump and filter it in order to log stacktraces from the exact task thread that we are waiting to finish. If the task has not stopped after a configurable timeout then the TaskReaper will throw an exception to trigger executor JVM death, thereby forcibly freeing any resources consumed by the zombie tasks. This feature is flagged off by default and is controlled by four new configurations under the `spark.task.reaper.*` namespace. See the updated `configuration.md` doc for details. ## How was this patch tested? Tested via a new test case in `JobCancellationSuite`, plus manual testing. Author: Josh RosenCloses #16189 from JoshRosen/cancellation. Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/fa829ce2 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/fa829ce2 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/fa829ce2 Branch: refs/heads/master Commit: fa829ce21fb84028d90b739a49c4ece70a17ccfd Parents: 5857b9a Author: Josh Rosen Authored: Mon Dec 19 18:43:59 2016 -0800 Committer: Yin Huai Committed: Mon Dec 19 18:43:59 2016 -0800 -- .../org/apache/spark/executor/Executor.scala| 169 ++- .../scala/org/apache/spark/util/Utils.scala | 56 +++--- .../org/apache/spark/JobCancellationSuite.scala | 77 + docs/configuration.md | 42 + 4 files changed, 316 insertions(+), 28 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/fa829ce2/core/src/main/scala/org/apache/spark/executor/Executor.scala -- diff --git a/core/src/main/scala/org/apache/spark/executor/Executor.scala b/core/src/main/scala/org/apache/spark/executor/Executor.scala index 9501dd9..3346f6d 100644 --- a/core/src/main/scala/org/apache/spark/executor/Executor.scala +++ b/core/src/main/scala/org/apache/spark/executor/Executor.scala @@ -84,6 +84,16 @@ private[spark] class Executor( // Start worker thread pool private val threadPool = ThreadUtils.newDaemonCachedThreadPool("Executor task launch worker") private val executorSource = new ExecutorSource(threadPool, executorId) + // Pool used for threads that supervise task killing / cancellation + private val taskReaperPool = ThreadUtils.newDaemonCachedThreadPool("Task reaper") + // For tasks which are in the process of being killed, this map holds the most recently created + // TaskReaper. 
All accesses to this map should be synchronized on the map itself (this isn't + // a ConcurrentHashMap because we use the synchronization for purposes other than simply guarding + // the integrity of the map's internal state). The purpose of this map is to prevent the creation + // of a separate TaskReaper for every killTask() of a given task. Instead, this map allows us to + // track whether an existing TaskReaper fulfills the role of a TaskReaper that we would otherwise + // create. The map key is a task id. + private val taskReaperForTask: HashMap[Long, TaskReaper] = HashMap[Long, TaskReaper]() if (!isLocal) { env.metricsSystem.registerSource(executorSource) @@ -93,6 +103,9 @@ private[spark] class Executor( // Whether to load classes in user jars before those in Spark jars private val userClassPathFirst = conf.getBoolean("spark.executor.userClassPathFirst", false) + // Whether to monitor killed / interrupted tasks + private val taskReaperEnabled = conf.getBoolean("spark.task.reaper.enabled", false) + // Create our ClassLoader // do this after SparkEnv creation so can access the SecurityManager
[GitHub] spark issue #16189: [SPARK-18761][CORE] Introduce "task reaper" to oversee t...
Github user yhuai commented on the issue: https://github.com/apache/spark/pull/16189 LGTM! --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #16189: [SPARK-18761][CORE] Introduce "task reaper" to oversee t...
Github user yhuai commented on the issue: https://github.com/apache/spark/pull/16189 Thank you for those comments. I am merging this to master. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #16189: [SPARK-18761][CORE] Introduce "task reaper" to ov...
Github user yhuai commented on a diff in the pull request: https://github.com/apache/spark/pull/16189#discussion_r93162832 --- Diff: core/src/test/scala/org/apache/spark/JobCancellationSuite.scala --- @@ -209,6 +209,83 @@ class JobCancellationSuite extends SparkFunSuite with Matchers with BeforeAndAft assert(jobB.get() === 100) } + test("task reaper kills JVM if killed tasks keep running for too long") { +val conf = new SparkConf() + .set("spark.task.reaper.enabled", "true") + .set("spark.task.reaper.killTimeout", "5s") +sc = new SparkContext("local-cluster[2,1,1024]", "test", conf) + +// Add a listener to release the semaphore once any tasks are launched. +val sem = new Semaphore(0) +sc.addSparkListener(new SparkListener { + override def onTaskStart(taskStart: SparkListenerTaskStart) { +sem.release() + } +}) + +// jobA is the one to be cancelled. +val jobA = Future { + sc.setJobGroup("jobA", "this is a job to be cancelled", interruptOnCancel = true) + sc.parallelize(1 to 1, 2).map { i => +while (true) { } + }.count() +} + +// Block until both tasks of job A have started and cancel job A. +sem.acquire(2) +// Small delay to ensure tasks actually start executing the task body +Thread.sleep(1000) + +sc.clearJobGroup() +val jobB = sc.parallelize(1 to 100, 2).countAsync() +sc.cancelJobGroup("jobA") +val e = intercept[SparkException] { ThreadUtils.awaitResult(jobA, 15.seconds) }.getCause +assert(e.getMessage contains "cancel") + +// Once A is cancelled, job B should finish fairly quickly. +assert(ThreadUtils.awaitResult(jobB, 60.seconds) === 100) + } + + test("task reaper will not kill JVM if spark.task.killTimeout == -1") { +val conf = new SparkConf() + .set("spark.task.reaper.enabled", "true") + .set("spark.task.reaper.killTimeout", "-1") + .set("spark.task.reaper.PollingInterval", "1s") + .set("spark.deploy.maxExecutorRetries", "1") --- End diff -- We set it to 1 to make sure that we will not kill JVM, right (if we kill JVM, we will remove the application because spark.deploy.maxExecutorRetries is 1.)? --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
spark git commit: [SPARK-18921][SQL] check database existence with Hive.databaseExists instead of getDatabase
Repository: spark Updated Branches: refs/heads/branch-2.1 fc1b25660 -> c1a26b458 [SPARK-18921][SQL] check database existence with Hive.databaseExists instead of getDatabase ## What changes were proposed in this pull request? It's weird that we use `Hive.getDatabase` to check the existence of a database, while Hive has a `databaseExists` interface. What's worse, `Hive.getDatabase` will produce an error message if the database doesn't exist, which is annoying when we only want to check the database existence. This PR fixes this and use `Hive.databaseExists` to check database existence. ## How was this patch tested? N/A Author: Wenchen FanCloses #16332 from cloud-fan/minor. (cherry picked from commit 7a75ee1c9224aa5c2e954fe2a71f9ad506f6782b) Signed-off-by: Yin Huai Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/c1a26b45 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/c1a26b45 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/c1a26b45 Branch: refs/heads/branch-2.1 Commit: c1a26b458dd353be3ab1a2b3f9bb80809cf63479 Parents: fc1b256 Author: Wenchen Fan Authored: Mon Dec 19 11:42:59 2016 -0800 Committer: Yin Huai Committed: Mon Dec 19 11:43:55 2016 -0800 -- .../apache/spark/sql/hive/HiveExternalCatalog.scala| 2 +- .../org/apache/spark/sql/hive/client/HiveClient.scala | 8 +++- .../apache/spark/sql/hive/client/HiveClientImpl.scala | 12 .../apache/spark/sql/hive/client/VersionsSuite.scala | 13 +++-- 4 files changed, 19 insertions(+), 16 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/c1a26b45/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveExternalCatalog.scala -- diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveExternalCatalog.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveExternalCatalog.scala index f67ddc9..f321c45 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveExternalCatalog.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveExternalCatalog.scala @@ -167,7 +167,7 @@ private[spark] class HiveExternalCatalog(conf: SparkConf, hadoopConf: Configurat } override def databaseExists(db: String): Boolean = withClient { -client.getDatabaseOption(db).isDefined +client.databaseExists(db) } override def listDatabases(): Seq[String] = withClient { http://git-wip-us.apache.org/repos/asf/spark/blob/c1a26b45/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClient.scala -- diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClient.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClient.scala index 8e7c871..0be5b0b 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClient.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClient.scala @@ -58,12 +58,10 @@ private[hive] trait HiveClient { def setCurrentDatabase(databaseName: String): Unit /** Returns the metadata for specified database, throwing an exception if it doesn't exist */ - final def getDatabase(name: String): CatalogDatabase = { -getDatabaseOption(name).getOrElse(throw new NoSuchDatabaseException(name)) - } + def getDatabase(name: String): CatalogDatabase - /** Returns the metadata for a given database, or None if it doesn't exist. */ - def getDatabaseOption(name: String): Option[CatalogDatabase] + /** Return whether a table/view with the specified name exists. */ + def databaseExists(dbName: String): Boolean /** List the names of all the databases that match the specified pattern. 
*/ def listDatabases(pattern: String): Seq[String] http://git-wip-us.apache.org/repos/asf/spark/blob/c1a26b45/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala -- diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala index db73596..e0f7156 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala @@ -300,7 +300,7 @@ private[hive] class HiveClientImpl( } override def setCurrentDatabase(databaseName: String): Unit = withHiveState { -if (getDatabaseOption(databaseName).isDefined) { +if (databaseExists(databaseName)) { state.setCurrentDatabase(databaseName) } else { throw new
spark git commit: [SPARK-18921][SQL] check database existence with Hive.databaseExists instead of getDatabase
Repository: spark Updated Branches: refs/heads/master 24482858e -> 7a75ee1c9 [SPARK-18921][SQL] check database existence with Hive.databaseExists instead of getDatabase ## What changes were proposed in this pull request? It's weird that we use `Hive.getDatabase` to check the existence of a database, while Hive has a `databaseExists` interface. What's worse, `Hive.getDatabase` will produce an error message if the database doesn't exist, which is annoying when we only want to check the database existence. This PR fixes this and use `Hive.databaseExists` to check database existence. ## How was this patch tested? N/A Author: Wenchen FanCloses #16332 from cloud-fan/minor. Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/7a75ee1c Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/7a75ee1c Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/7a75ee1c Branch: refs/heads/master Commit: 7a75ee1c9224aa5c2e954fe2a71f9ad506f6782b Parents: 2448285 Author: Wenchen Fan Authored: Mon Dec 19 11:42:59 2016 -0800 Committer: Yin Huai Committed: Mon Dec 19 11:42:59 2016 -0800 -- .../apache/spark/sql/hive/HiveExternalCatalog.scala| 2 +- .../org/apache/spark/sql/hive/client/HiveClient.scala | 8 +++- .../apache/spark/sql/hive/client/HiveClientImpl.scala | 12 .../apache/spark/sql/hive/client/VersionsSuite.scala | 13 +++-- 4 files changed, 19 insertions(+), 16 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/7a75ee1c/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveExternalCatalog.scala -- diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveExternalCatalog.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveExternalCatalog.scala index 544f277..9c19a0e 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveExternalCatalog.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveExternalCatalog.scala @@ -167,7 +167,7 @@ private[spark] class HiveExternalCatalog(conf: SparkConf, hadoopConf: Configurat } override def databaseExists(db: String): Boolean = withClient { -client.getDatabaseOption(db).isDefined +client.databaseExists(db) } override def listDatabases(): Seq[String] = withClient { http://git-wip-us.apache.org/repos/asf/spark/blob/7a75ee1c/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClient.scala -- diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClient.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClient.scala index 837b6c5..8bdcf31 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClient.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClient.scala @@ -58,12 +58,10 @@ private[hive] trait HiveClient { def setCurrentDatabase(databaseName: String): Unit /** Returns the metadata for specified database, throwing an exception if it doesn't exist */ - final def getDatabase(name: String): CatalogDatabase = { -getDatabaseOption(name).getOrElse(throw new NoSuchDatabaseException(name)) - } + def getDatabase(name: String): CatalogDatabase - /** Returns the metadata for a given database, or None if it doesn't exist. */ - def getDatabaseOption(name: String): Option[CatalogDatabase] + /** Return whether a table/view with the specified name exists. */ + def databaseExists(dbName: String): Boolean /** List the names of all the databases that match the specified pattern. 
*/ def listDatabases(pattern: String): Seq[String] http://git-wip-us.apache.org/repos/asf/spark/blob/7a75ee1c/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala -- diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala index b75f6e9..bacae8a 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala @@ -300,7 +300,7 @@ private[hive] class HiveClientImpl( } override def setCurrentDatabase(databaseName: String): Unit = withHiveState { -if (getDatabaseOption(databaseName).isDefined) { +if (databaseExists(databaseName)) { state.setCurrentDatabase(databaseName) } else { throw new NoSuchDatabaseException(databaseName) @@ -336,14 +336,18 @@ private[hive] class HiveClientImpl(
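The shape of the fix can be summarized with a short hypothetical sketch (the trait below is illustrative only, not the actual HiveClient interface): existence is asked for directly, so no error message is produced just to probe for a missing database.

    // Illustrative only: ask for existence directly rather than calling a
    // getDatabase-style method and treating its failure (and error log) as "missing".
    trait DatabaseCatalog {
      def databaseExists(db: String): Boolean
    }

    object CurrentDatabaseSketch {
      def setCurrentDatabase(catalog: DatabaseCatalog, db: String): Unit = {
        if (catalog.databaseExists(db)) {
          // state.setCurrentDatabase(db) in the real HiveClientImpl
        } else {
          throw new NoSuchElementException(s"Database '$db' does not exist")
        }
      }
    }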
[GitHub] spark issue #16332: [SPARK-18921][SQL] check database existence with Hive.da...
Github user yhuai commented on the issue: https://github.com/apache/spark/pull/16332 LGTM. Merging to master and branch 2.1. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #16189: [SPARK-18761][CORE] Introduce "task reaper" to ov...
Github user yhuai commented on a diff in the pull request: https://github.com/apache/spark/pull/16189#discussion_r92905791 --- Diff: core/src/main/scala/org/apache/spark/executor/Executor.scala --- @@ -432,6 +465,93 @@ private[spark] class Executor( } /** + * Supervises the killing / cancellation of a task by sending the interrupted flag, optionally + * sending a Thread.interrupt(), and monitoring the task until it finishes. + */ + private class TaskReaper( --- End diff -- It will be good to explain more about how this class works. It is great that we have comments at specific parts, but a section that explains the workflow will be very helpful to future readers. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #16189: [SPARK-18761][CORE] Introduce "task reaper" to ov...
Github user yhuai commented on a diff in the pull request: https://github.com/apache/spark/pull/16189#discussion_r92905622 --- Diff: core/src/main/scala/org/apache/spark/executor/Executor.scala --- @@ -432,6 +465,93 @@ private[spark] class Executor( } /** + * Supervises the killing / cancellation of a task by sending the interrupted flag, optionally + * sending a Thread.interrupt(), and monitoring the task until it finishes. + */ + private class TaskReaper( + taskRunner: TaskRunner, + val interruptThread: Boolean) +extends Runnable { + +private[this] val taskId: Long = taskRunner.taskId + +private[this] val killPollingIntervalMs: Long = + conf.getTimeAsMs("spark.task.reaper.pollingInterval", "10s") + +private[this] val killTimeoutMs: Long = conf.getTimeAsMs("spark.task.reaper.killTimeout", "2m") + +private[this] val takeThreadDump: Boolean = + conf.getBoolean("spark.task.reaper.threadDump", true) + +override def run(): Unit = { + val startTimeMs = System.currentTimeMillis() + def elapsedTimeMs = System.currentTimeMillis() - startTimeMs + def timeoutExceeded(): Boolean = killTimeoutMs > 0 && elapsedTimeMs > killTimeoutMs + try { +// Only attempt to kill the task once. If interruptThread = false then a second kill +// attempt would be a no-op and if interruptThread = true then it may not be safe or +// effective to interrupt multiple times: +taskRunner.kill(interruptThread = interruptThread) +// Monitor the killed task until it exits: +var finished: Boolean = false +while (!finished && !timeoutExceeded()) { + taskRunner.synchronized { +// We need to synchronize on the TaskRunner while checking whether the task has +// finished in order to avoid a race where the task is marked as finished right after +// we check and before we call wait(). +if (taskRunner.isFinished) { + finished = true +} else { + taskRunner.wait(killPollingIntervalMs) --- End diff -- oh, the notifyAll is used for this, right? --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #16189: [SPARK-18761][CORE] Introduce "task reaper" to ov...
Github user yhuai commented on a diff in the pull request: https://github.com/apache/spark/pull/16189#discussion_r92905603 --- Diff: core/src/main/scala/org/apache/spark/executor/Executor.scala --- @@ -432,6 +465,93 @@ private[spark] class Executor( } /** + * Supervises the killing / cancellation of a task by sending the interrupted flag, optionally + * sending a Thread.interrupt(), and monitoring the task until it finishes. + */ + private class TaskReaper( + taskRunner: TaskRunner, + val interruptThread: Boolean) +extends Runnable { + +private[this] val taskId: Long = taskRunner.taskId + +private[this] val killPollingIntervalMs: Long = + conf.getTimeAsMs("spark.task.reaper.pollingInterval", "10s") + +private[this] val killTimeoutMs: Long = conf.getTimeAsMs("spark.task.reaper.killTimeout", "2m") + +private[this] val takeThreadDump: Boolean = + conf.getBoolean("spark.task.reaper.threadDump", true) + +override def run(): Unit = { + val startTimeMs = System.currentTimeMillis() + def elapsedTimeMs = System.currentTimeMillis() - startTimeMs + def timeoutExceeded(): Boolean = killTimeoutMs > 0 && elapsedTimeMs > killTimeoutMs + try { +// Only attempt to kill the task once. If interruptThread = false then a second kill +// attempt would be a no-op and if interruptThread = true then it may not be safe or +// effective to interrupt multiple times: +taskRunner.kill(interruptThread = interruptThread) +// Monitor the killed task until it exits: +var finished: Boolean = false +while (!finished && !timeoutExceeded()) { + taskRunner.synchronized { +// We need to synchronize on the TaskRunner while checking whether the task has +// finished in order to avoid a race where the task is marked as finished right after +// we check and before we call wait(). +if (taskRunner.isFinished) { + finished = true +} else { + taskRunner.wait(killPollingIntervalMs) +} + } + if (taskRunner.isFinished) { +finished = true + } else { +logWarning(s"Killed task $taskId is still running after $elapsedTimeMs ms") +if (takeThreadDump) { + try { + Utils.getThreadDumpForThread(taskRunner.getThreadId).foreach { thread => + if (thread.threadName == taskRunner.threadName) { +logWarning(s"Thread dump from task $taskId:\n${thread.stackTrace}") + } +} + } catch { +case NonFatal(e) => + logWarning("Exception thrown while obtaining thread dump: ", e) + } +} + } +} + +if (!taskRunner.isFinished && timeoutExceeded()) { + if (isLocal) { +logError(s"Killed task $taskId could not be stopped within $killTimeoutMs ms; " + + "not killing JVM because we are running in local mode.") + } else { +throw new SparkException( + s"Killing executor JVM because killed task $taskId could not be stopped within " + +s"$killTimeoutMs ms.") --- End diff -- I guess I am not clear how we kill the JVM. Are we using this exception to kill the JVM? --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #16189: [SPARK-18761][CORE] Introduce "task reaper" to ov...
Github user yhuai commented on a diff in the pull request: https://github.com/apache/spark/pull/16189#discussion_r92904925 --- Diff: core/src/main/scala/org/apache/spark/executor/Executor.scala --- @@ -229,9 +259,12 @@ private[spark] class Executor( // ClosedByInterruptException during execBackend.statusUpdate which causes // Executor to crash Thread.interrupted() + notifyAll() --- End diff -- How about we also comment the purpose of this `notifyAll`? --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #16189: [SPARK-18761][CORE] Introduce "task reaper" to oversee t...
Github user yhuai commented on the issue: https://github.com/apache/spark/pull/16189 I am reviewing it now --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #16189: [SPARK-18761][CORE] Introduce "task reaper" to oversee t...
Github user yhuai commented on the issue: https://github.com/apache/spark/pull/16189 let's trigger more tests and see if the test is flaky. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #16189: [SPARK-18761][CORE] Introduce "task reaper" to oversee t...
Github user yhuai commented on the issue: https://github.com/apache/spark/pull/16189 test this please --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #16288: [SPARK-18869][SQL] Add TreeNode.p that returns BaseType
Github user yhuai commented on the issue: https://github.com/apache/spark/pull/16288 lgtm --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #16277: [SPARK-18854][SQL] numberedTreeString and apply(i) incon...
Github user yhuai commented on the issue: https://github.com/apache/spark/pull/16277 LGTM --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #16268: [SPARK-18843][Core]Fix timeout in awaitResultInForkJoinS...
Github user yhuai commented on the issue: https://github.com/apache/spark/pull/16268 LGTM --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
spark git commit: [SPARK-13747][CORE] Fix potential ThreadLocal leaks in RPC when using ForkJoinPool
Repository: spark Updated Branches: refs/heads/master d53f18cae -> fb3081d3b [SPARK-13747][CORE] Fix potential ThreadLocal leaks in RPC when using ForkJoinPool ## What changes were proposed in this pull request? Some places in SQL may call `RpcEndpointRef.askWithRetry` (e.g., ParquetFileFormat.buildReader -> SparkContext.broadcast -> ... -> BlockManagerMaster.updateBlockInfo -> RpcEndpointRef.askWithRetry), which will finally call `Await.result`. It may cause `java.lang.IllegalArgumentException: spark.sql.execution.id is already set` when running in Scala ForkJoinPool. This PR includes the following changes to fix this issue: - Remove `ThreadUtils.awaitResult` - Rename `ThreadUtils. awaitResultInForkJoinSafely` to `ThreadUtils.awaitResult` - Replace `Await.result` in RpcTimeout with `ThreadUtils.awaitResult`. ## How was this patch tested? Jenkins Author: Shixiong ZhuCloses #16230 from zsxwing/fix-SPARK-13747. Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/fb3081d3 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/fb3081d3 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/fb3081d3 Branch: refs/heads/master Commit: fb3081d3b38a50aa5e023c603e1b191e57f7c876 Parents: d53f18c Author: Shixiong Zhu Authored: Tue Dec 13 09:53:22 2016 -0800 Committer: Yin Huai Committed: Tue Dec 13 09:53:22 2016 -0800 -- .../scala/org/apache/spark/rpc/RpcTimeout.scala | 12 ++ .../org/apache/spark/util/ThreadUtils.scala | 41 .../apache/spark/rdd/AsyncRDDActionsSuite.scala | 3 +- .../OutputCommitCoordinatorSuite.scala | 3 +- scalastyle-config.xml | 1 - .../sql/execution/basicPhysicalOperators.scala | 2 +- .../exchange/BroadcastExchangeExec.scala| 3 +- 7 files changed, 23 insertions(+), 42 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/fb3081d3/core/src/main/scala/org/apache/spark/rpc/RpcTimeout.scala -- diff --git a/core/src/main/scala/org/apache/spark/rpc/RpcTimeout.scala b/core/src/main/scala/org/apache/spark/rpc/RpcTimeout.scala index 2761d39..efd2648 100644 --- a/core/src/main/scala/org/apache/spark/rpc/RpcTimeout.scala +++ b/core/src/main/scala/org/apache/spark/rpc/RpcTimeout.scala @@ -24,7 +24,7 @@ import scala.concurrent.duration._ import scala.util.control.NonFatal import org.apache.spark.{SparkConf, SparkException} -import org.apache.spark.util.Utils +import org.apache.spark.util.{ThreadUtils, Utils} /** * An exception thrown if RpcTimeout modifies a [[TimeoutException]]. 
@@ -72,15 +72,9 @@ private[spark] class RpcTimeout(val duration: FiniteDuration, val timeoutProp: S * is still not ready */ def awaitResult[T](future: Future[T]): T = { -val wrapAndRethrow: PartialFunction[Throwable, T] = { - case NonFatal(t) => -throw new SparkException("Exception thrown in awaitResult", t) -} try { - // scalastyle:off awaitresult - Await.result(future, duration) - // scalastyle:on awaitresult -} catch addMessageIfTimeout.orElse(wrapAndRethrow) + ThreadUtils.awaitResult(future, duration) +} catch addMessageIfTimeout } } http://git-wip-us.apache.org/repos/asf/spark/blob/fb3081d3/core/src/main/scala/org/apache/spark/util/ThreadUtils.scala -- diff --git a/core/src/main/scala/org/apache/spark/util/ThreadUtils.scala b/core/src/main/scala/org/apache/spark/util/ThreadUtils.scala index 60a6e82..1aa4456 100644 --- a/core/src/main/scala/org/apache/spark/util/ThreadUtils.scala +++ b/core/src/main/scala/org/apache/spark/util/ThreadUtils.scala @@ -19,7 +19,7 @@ package org.apache.spark.util import java.util.concurrent._ -import scala.concurrent.{Await, Awaitable, ExecutionContext, ExecutionContextExecutor} +import scala.concurrent.{Awaitable, ExecutionContext, ExecutionContextExecutor} import scala.concurrent.duration.Duration import scala.concurrent.forkjoin.{ForkJoinPool => SForkJoinPool, ForkJoinWorkerThread => SForkJoinWorkerThread} import scala.util.control.NonFatal @@ -180,39 +180,30 @@ private[spark] object ThreadUtils { // scalastyle:off awaitresult /** - * Preferred alternative to `Await.result()`. This method wraps and re-throws any exceptions - * thrown by the underlying `Await` call, ensuring that this thread's stack trace appears in - * logs. - */ - @throws(classOf[SparkException]) - def awaitResult[T](awaitable: Awaitable[T], atMost: Duration): T = { -try { - Await.result(awaitable, atMost) - // scalastyle:on awaitresult -} catch { - case
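The rename above points at a ForkJoinPool-safe await. Assuming the safe variant works by calling Awaitable.result directly, which bypasses the BlockContext machinery and therefore ForkJoinPool's compensating threads (the source of the ThreadLocal leak this commit fixes), a minimal sketch looks like the following; the wrapper exception and object name are illustrative.

```scala
import scala.concurrent.{Awaitable, CanAwait}
import scala.concurrent.duration.Duration
import scala.util.control.NonFatal

object AwaitSketch {
  def awaitResult[T](awaitable: Awaitable[T], atMost: Duration): T = {
    try {
      // Awaitable.result does not go through BlockContext, unlike Await.result,
      // so a ForkJoinPool will not spin up extra threads while we block here.
      val permit = null.asInstanceOf[CanAwait]
      awaitable.result(atMost)(permit)
    } catch {
      case NonFatal(t) =>
        throw new RuntimeException("Exception thrown in awaitResult", t)
    }
  }
}
```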
[GitHub] spark issue #16230: [SPARK-13747][Core]Fix potential ThreadLocal leaks in RP...
Github user yhuai commented on the issue: https://github.com/apache/spark/pull/16230 Merging to master --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #16230: [SPARK-13747][Core]Fix potential ThreadLocal leaks in RP...
Github user yhuai commented on the issue: https://github.com/apache/spark/pull/16230 LGTM --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
spark git commit: [SPARK-18675][SQL] CTAS for hive serde table should work for all hive versions
Repository: spark Updated Branches: refs/heads/master 096f868b7 -> d53f18cae [SPARK-18675][SQL] CTAS for hive serde table should work for all hive versions ## What changes were proposed in this pull request? Before hive 1.1, when inserting into a table, hive will create the staging directory under a common scratch directory. After the writing is finished, hive will simply empty the table directory and move the staging directory to it. After hive 1.1, hive will create the staging directory under the table directory, and when moving staging directory to table directory, hive will still empty the table directory, but will exclude the staging directory there. In `InsertIntoHiveTable`, we simply copy the code from hive 1.2, which means we will always create the staging directory under the table directory, no matter what the hive version is. This causes problems if the hive version is prior to 1.1, because the staging directory will be removed by hive when hive is trying to empty the table directory. This PR copies the code from hive 0.13, so that we have 2 branches to create staging directory. If hive version is prior to 1.1, we'll go to the old style branch(i.e. create the staging directory under a common scratch directory), else, go to the new style branch(i.e. create the staging directory under the table directory) ## How was this patch tested? new test Author: Wenchen FanCloses #16104 from cloud-fan/hive-0.13. Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/d53f18ca Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/d53f18ca Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/d53f18ca Branch: refs/heads/master Commit: d53f18cae41c6c77a0cff3f1fd266e4c1b9ea79a Parents: 096f868 Author: Wenchen Fan Authored: Tue Dec 13 09:46:58 2016 -0800 Committer: Yin Huai Committed: Tue Dec 13 09:46:58 2016 -0800 -- .../hive/execution/InsertIntoHiveTable.scala| 68 +--- .../spark/sql/hive/client/VersionsSuite.scala | 19 +- 2 files changed, 75 insertions(+), 12 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/d53f18ca/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/InsertIntoHiveTable.scala -- diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/InsertIntoHiveTable.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/InsertIntoHiveTable.scala index db2239d..82c7b1a 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/InsertIntoHiveTable.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/InsertIntoHiveTable.scala @@ -22,7 +22,6 @@ import java.net.URI import java.text.SimpleDateFormat import java.util.{Date, Locale, Random} -import org.apache.hadoop.conf.Configuration import org.apache.hadoop.fs.{FileSystem, Path} import org.apache.hadoop.hive.common.FileUtils import org.apache.hadoop.hive.ql.exec.TaskRunner @@ -86,6 +85,7 @@ case class InsertIntoHiveTable( val hadoopConf = sessionState.newHadoopConf() val stagingDir = hadoopConf.get("hive.exec.stagingdir", ".hive-staging") + val scratchDir = hadoopConf.get("hive.exec.scratchdir", "/tmp/hive") private def executionId: String = { val rand: Random = new Random @@ -93,7 +93,7 @@ case class InsertIntoHiveTable( "hive_" + format.format(new Date) + "_" + Math.abs(rand.nextLong) } - private def getStagingDir(inputPath: Path, hadoopConf: Configuration): Path = { + private def getStagingDir(inputPath: Path): Path = { val inputPathUri: URI = inputPath.toUri val inputPathName: String = 
inputPathUri.getPath val fs: FileSystem = inputPath.getFileSystem(hadoopConf) @@ -121,21 +121,69 @@ case class InsertIntoHiveTable( return dir } - private def getExternalScratchDir(extURI: URI, hadoopConf: Configuration): Path = { -getStagingDir(new Path(extURI.getScheme, extURI.getAuthority, extURI.getPath), hadoopConf) + private def getExternalScratchDir(extURI: URI): Path = { +getStagingDir(new Path(extURI.getScheme, extURI.getAuthority, extURI.getPath)) } - def getExternalTmpPath(path: Path, hadoopConf: Configuration): Path = { + def getExternalTmpPath(path: Path): Path = { +import org.apache.spark.sql.hive.client.hive._ + +val hiveVersion = externalCatalog.asInstanceOf[HiveExternalCatalog].client.version +// Before Hive 1.1, when inserting into a table, Hive will create the staging directory under +// a common scratch directory. After the writing is finished, Hive will simply empty the table +// directory and move the staging directory to it. +// After Hive 1.1, Hive will
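A hedged sketch of the two-branch rule described in the commit message, with illustrative parameter names (the real code reads hive.exec.stagingdir / hive.exec.scratchdir from the Hadoop configuration and the version from the Hive client):

```scala
object StagingDirSketch {
  def stagingLocation(
      tableDir: String,
      scratchDir: String,
      stagingDirName: String,
      hiveVersion: (Int, Int)): String = {
    val beforeHive11 =
      hiveVersion._1 < 1 || (hiveVersion._1 == 1 && hiveVersion._2 < 1)
    if (beforeHive11) {
      // Old style: keep the staging directory under the shared scratch directory,
      // so Hive does not delete it while emptying the table directory.
      s"$scratchDir/$stagingDirName"
    } else {
      // New style: put the staging directory under the table directory; Hive 1.1+
      // excludes it when emptying the table directory before the final move.
      s"$tableDir/$stagingDirName"
    }
  }
}
```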
[GitHub] spark issue #16104: [SPARK-18675][SQL] CTAS for hive serde table should work...
Github user yhuai commented on the issue: https://github.com/apache/spark/pull/16104 LGTM. Thanks. Merging to master --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
spark git commit: [SPARK-18631][SQL] Changed ExchangeCoordinator re-partitioning to avoid more data skew
Repository: spark Updated Branches: refs/heads/master d57a594b8 -> f8878a4c6 [SPARK-18631][SQL] Changed ExchangeCoordinator re-partitioning to avoid more data skew ## What changes were proposed in this pull request? Re-partitioning logic in ExchangeCoordinator changed so that adding another pre-shuffle partition to the post-shuffle partition will not be done if doing so would cause the size of the post-shuffle partition to exceed the target partition size. ## How was this patch tested? Existing tests updated to reflect new expectations. Author: Mark HamstraCloses #16065 from markhamstra/SPARK-17064. Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/f8878a4c Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/f8878a4c Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/f8878a4c Branch: refs/heads/master Commit: f8878a4c6f7c4ebb16e4aef26ad0869ba12eb9fc Parents: d57a594 Author: Mark Hamstra Authored: Tue Nov 29 15:01:12 2016 -0800 Committer: Yin Huai Committed: Tue Nov 29 15:01:12 2016 -0800 -- .../exchange/ExchangeCoordinator.scala | 32 .../execution/ExchangeCoordinatorSuite.scala| 40 ++-- 2 files changed, 35 insertions(+), 37 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/f8878a4c/sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/ExchangeCoordinator.scala -- diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/ExchangeCoordinator.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/ExchangeCoordinator.scala index 57da85f..deb2c24 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/ExchangeCoordinator.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/ExchangeCoordinator.scala @@ -69,15 +69,18 @@ import org.apache.spark.sql.execution.{ShuffledRowRDD, SparkPlan} * post-shuffle partition. Once we have size statistics of pre-shuffle partitions from stages * corresponding to the registered [[ShuffleExchange]]s, we will do a pass of those statistics and * pack pre-shuffle partitions with continuous indices to a single post-shuffle partition until - * the size of a post-shuffle partition is equal or greater than the target size. + * adding another pre-shuffle partition would cause the size of a post-shuffle partition to be + * greater than the target size. + * * For example, we have two stages with the following pre-shuffle partition size statistics: * stage 1: [100 MB, 20 MB, 100 MB, 10MB, 30 MB] * stage 2: [10 MB, 10 MB, 70 MB, 5 MB, 5 MB] * assuming the target input size is 128 MB, we will have three post-shuffle partitions, * which are: - * - post-shuffle partition 0: pre-shuffle partition 0 and 1 - * - post-shuffle partition 1: pre-shuffle partition 2 - * - post-shuffle partition 2: pre-shuffle partition 3 and 4 + * - post-shuffle partition 0: pre-shuffle partition 0 (size 110 MB) + * - post-shuffle partition 1: pre-shuffle partition 1 (size 30 MB) + * - post-shuffle partition 2: pre-shuffle partition 2 (size 170 MB) + * - post-shuffle partition 3: pre-shuffle partition 3 and 4 (size 50 MB) */ class ExchangeCoordinator( numExchanges: Int, @@ -164,25 +167,20 @@ class ExchangeCoordinator( while (i < numPreShufflePartitions) { // We calculate the total size of ith pre-shuffle partitions from all pre-shuffle stages. // Then, we add the total size to postShuffleInputSize. 
+ var nextShuffleInputSize = 0L var j = 0 while (j < mapOutputStatistics.length) { -postShuffleInputSize += mapOutputStatistics(j).bytesByPartitionId(i) +nextShuffleInputSize += mapOutputStatistics(j).bytesByPartitionId(i) j += 1 } - // If the current postShuffleInputSize is equal or greater than the - // targetPostShuffleInputSize, We need to add a new element in partitionStartIndices. - if (postShuffleInputSize >= targetPostShuffleInputSize) { -if (i < numPreShufflePartitions - 1) { - // Next start index. - partitionStartIndices += i + 1 -} else { - // This is the last element. So, we do not need to append the next start index to - // partitionStartIndices. -} + // If including the nextShuffleInputSize would exceed the target partition size, then start a + // new partition. + if (i > 0 && postShuffleInputSize + nextShuffleInputSize > targetPostShuffleInputSize) { +partitionStartIndices += i // reset postShuffleInputSize. -postShuffleInputSize = 0L - } +
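To make the new packing rule concrete, here is a small self-contained sketch of it (illustrative names, not the ExchangeCoordinator code). Feeding it the per-partition sums from the example in the commit message, [110, 30, 170, 15, 35] MB with a 128 MB target, yields start indices 0, 1, 2, 3, i.e. the four post-shuffle partitions listed above.

```scala
import scala.collection.mutable.ArrayBuffer

object PartitionPackingSketch {
  // Returns the start index of each post-shuffle partition: a new partition begins
  // whenever adding the next pre-shuffle partition would exceed the target size.
  def packPartitions(preShuffleSizes: Array[Long], targetSize: Long): Seq[Int] = {
    val startIndices = ArrayBuffer(0)
    var currentSize = 0L
    for (i <- preShuffleSizes.indices) {
      val next = preShuffleSizes(i)
      if (i > 0 && currentSize + next > targetSize) {
        startIndices += i // begin a new post-shuffle partition at i
        currentSize = next
      } else {
        currentSize += next
      }
    }
    startIndices
  }
}
```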
[GitHub] spark issue #16065: [SPARK-18631][SQL] Changed ExchangeCoordinator re-partit...
Github user yhuai commented on the issue: https://github.com/apache/spark/pull/16065 Thanks @markhamstra. Merging to master. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #16065: [SPARK-18631][SQL] Changed ExchangeCoordinator re-partit...
Github user yhuai commented on the issue: https://github.com/apache/spark/pull/16065 lgtm --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #14638: [SPARK-11374][SQL] Support `skip.header.line.coun...
Github user yhuai commented on a diff in the pull request: https://github.com/apache/spark/pull/14638#discussion_r90098793 --- Diff: sql/hive/src/main/scala/org/apache/spark/sql/hive/TableReader.scala --- @@ -122,10 +126,20 @@ class HadoopTableReader( val attrsWithIndex = attributes.zipWithIndex val mutableRow = new SpecificInternalRow(attributes.map(_.dataType)) -val deserializedHadoopRDD = hadoopRDD.mapPartitions { iter => +val deserializedHadoopRDD = hadoopRDD.mapPartitionsWithIndex { (index, iter) => val hconf = broadcastedHadoopConf.value.value val deserializer = deserializerClass.newInstance() deserializer.initialize(hconf, tableDesc.getProperties) + if (skipHeaderLineCount > 0 && isTextInputFormatTable) { +val partition = hadoopRDD.partitions(index).asInstanceOf[HadoopPartition] --- End diff -- I am +1 on adding the check. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #14638: [SPARK-11374][SQL] Support `skip.header.line.coun...
Github user yhuai commented on a diff in the pull request: https://github.com/apache/spark/pull/14638#discussion_r90098551 --- Diff: sql/hive/src/main/scala/org/apache/spark/sql/hive/TableReader.scala --- @@ -122,10 +126,20 @@ class HadoopTableReader( val attrsWithIndex = attributes.zipWithIndex val mutableRow = new SpecificInternalRow(attributes.map(_.dataType)) -val deserializedHadoopRDD = hadoopRDD.mapPartitions { iter => +val deserializedHadoopRDD = hadoopRDD.mapPartitionsWithIndex { (index, iter) => val hconf = broadcastedHadoopConf.value.value val deserializer = deserializerClass.newInstance() deserializer.initialize(hconf, tableDesc.getProperties) + if (skipHeaderLineCount > 0 && isTextInputFormatTable) { +val partition = hadoopRDD.partitions(index).asInstanceOf[HadoopPartition] +if (partition.inputSplit.t.asInstanceOf[FileSplit].getStart() == 0) { --- End diff -- is `partition.inputSplit.t.asInstanceOf[FileSplit].getStart() != 0` tested (for a split that does not start from the start of the file, we do not skip)? --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
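A minimal sketch of the check being discussed, with an illustrative signature: only the split that starts at byte offset 0 of its file contains the header, so only that partition should drop the first `headerLines` records; all other splits of the same file are returned untouched.

```scala
object SkipHeaderSketch {
  def maybeSkipHeader[T](
      iter: Iterator[T],
      headerLines: Int,
      splitStartOffset: Long): Iterator[T] = {
    // Drop the header only in the partition whose split begins the file.
    if (headerLines > 0 && splitStartOffset == 0L) iter.drop(headerLines) else iter
  }
}
```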
[GitHub] spark issue #15979: [SPARK-18251][SQL] the type of Dataset can't be Option o...
Github user yhuai commented on the issue: https://github.com/apache/spark/pull/15979 Looks good. @liancheng, want to double-check? --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #15979: [SPARK-18251][SQL] the type of Dataset can't be O...
Github user yhuai commented on a diff in the pull request: https://github.com/apache/spark/pull/15979#discussion_r89936859 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/encoders/ExpressionEncoder.scala --- @@ -47,6 +47,14 @@ object ExpressionEncoder { // We convert the not-serializable TypeTag into StructType and ClassTag. val mirror = typeTag[T].mirror val tpe = typeTag[T].tpe + +if (ScalaReflection.optionOfNonFlatType(tpe)) { + throw new UnsupportedOperationException( +"Cannot create encoder for Option of non-flat type, as non-flat type is represented " + + "as a row, and the entire row can not be null in Spark SQL like normal databases. " + + "You can wrap your type with Tuple1 if you do want top level null objects.") --- End diff -- Let's provide an example in the error message to help users understand how to handle this case. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
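An example along the lines the reviewer asks for, assuming a hypothetical `Person` case class and a local SparkSession (a sketch of the Tuple1 workaround, not the wording that ended up in the error message):

```scala
import org.apache.spark.sql.SparkSession

// A "non-flat" type in the sense used above: a case class encoded as a struct row.
case class Person(name: String, age: Int)

object OptionEncoderExample {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().master("local[1]").appName("demo").getOrCreate()
    import spark.implicits._

    // Not supported: Dataset[Option[Person]] would require the whole top-level row
    // to be nullable.
    // val rejected = Seq(Some(Person("a", 1)), None: Option[Person]).toDS()

    // Supported: wrapping in Tuple1 moves the nullable struct into a column.
    val ok = Seq(Tuple1(Some(Person("a", 1))), Tuple1(None: Option[Person])).toDS()
    ok.show()

    spark.stop()
  }
}
```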
spark git commit: [SPARK-18602] Set the version of org.codehaus.janino:commons-compiler to 3.0.0 to match the version of org.codehaus.janino:janino
Repository: spark Updated Branches: refs/heads/branch-2.1 32b259fae -> 34ad4d520 [SPARK-18602] Set the version of org.codehaus.janino:commons-compiler to 3.0.0 to match the version of org.codehaus.janino:janino ## What changes were proposed in this pull request? org.codehaus.janino:janino depends on org.codehaus.janino:commons-compiler and we have been upgraded to org.codehaus.janino:janino 3.0.0. However, seems we are still pulling in org.codehaus.janino:commons-compiler 2.7.6 because of calcite. It looks like an accident because we exclude janino from calcite (see here https://github.com/apache/spark/blob/branch-2.1/pom.xml#L1759). So, this PR upgrades org.codehaus.janino:commons-compiler to 3.0.0. ## How was this patch tested? jenkins Author: Yin Huai <yh...@databricks.com> Closes #16025 from yhuai/janino-commons-compile. (cherry picked from commit eba727757ed5dc23c635e1926795aea62ec0fc66) Signed-off-by: Yin Huai <yh...@databricks.com> Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/34ad4d52 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/34ad4d52 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/34ad4d52 Branch: refs/heads/branch-2.1 Commit: 34ad4d520ae0e4302972097c5985ab2c5a8d5e04 Parents: 32b259f Author: Yin Huai <yh...@databricks.com> Authored: Mon Nov 28 10:09:30 2016 -0800 Committer: Yin Huai <yh...@databricks.com> Committed: Mon Nov 28 10:09:50 2016 -0800 -- dev/deps/spark-deps-hadoop-2.2 | 2 +- dev/deps/spark-deps-hadoop-2.3 | 2 +- dev/deps/spark-deps-hadoop-2.4 | 2 +- dev/deps/spark-deps-hadoop-2.6 | 2 +- dev/deps/spark-deps-hadoop-2.7 | 2 +- pom.xml| 9 + sql/catalyst/pom.xml | 4 7 files changed, 18 insertions(+), 5 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/34ad4d52/dev/deps/spark-deps-hadoop-2.2 -- diff --git a/dev/deps/spark-deps-hadoop-2.2 b/dev/deps/spark-deps-hadoop-2.2 index bbdea06..89bfcef 100644 --- a/dev/deps/spark-deps-hadoop-2.2 +++ b/dev/deps/spark-deps-hadoop-2.2 @@ -24,7 +24,7 @@ commons-beanutils-core-1.8.0.jar commons-cli-1.2.jar commons-codec-1.10.jar commons-collections-3.2.2.jar -commons-compiler-2.7.6.jar +commons-compiler-3.0.0.jar commons-compress-1.4.1.jar commons-configuration-1.6.jar commons-crypto-1.0.0.jar http://git-wip-us.apache.org/repos/asf/spark/blob/34ad4d52/dev/deps/spark-deps-hadoop-2.3 -- diff --git a/dev/deps/spark-deps-hadoop-2.3 b/dev/deps/spark-deps-hadoop-2.3 index a2dec41..8df3858 100644 --- a/dev/deps/spark-deps-hadoop-2.3 +++ b/dev/deps/spark-deps-hadoop-2.3 @@ -27,7 +27,7 @@ commons-beanutils-core-1.8.0.jar commons-cli-1.2.jar commons-codec-1.10.jar commons-collections-3.2.2.jar -commons-compiler-2.7.6.jar +commons-compiler-3.0.0.jar commons-compress-1.4.1.jar commons-configuration-1.6.jar commons-crypto-1.0.0.jar http://git-wip-us.apache.org/repos/asf/spark/blob/34ad4d52/dev/deps/spark-deps-hadoop-2.4 -- diff --git a/dev/deps/spark-deps-hadoop-2.4 b/dev/deps/spark-deps-hadoop-2.4 index c1f02b9..71e7fb6 100644 --- a/dev/deps/spark-deps-hadoop-2.4 +++ b/dev/deps/spark-deps-hadoop-2.4 @@ -27,7 +27,7 @@ commons-beanutils-core-1.8.0.jar commons-cli-1.2.jar commons-codec-1.10.jar commons-collections-3.2.2.jar -commons-compiler-2.7.6.jar +commons-compiler-3.0.0.jar commons-compress-1.4.1.jar commons-configuration-1.6.jar commons-crypto-1.0.0.jar http://git-wip-us.apache.org/repos/asf/spark/blob/34ad4d52/dev/deps/spark-deps-hadoop-2.6 -- diff --git a/dev/deps/spark-deps-hadoop-2.6 b/dev/deps/spark-deps-hadoop-2.6 index 
4f04636..ba31391 100644 --- a/dev/deps/spark-deps-hadoop-2.6 +++ b/dev/deps/spark-deps-hadoop-2.6 @@ -31,7 +31,7 @@ commons-beanutils-core-1.8.0.jar commons-cli-1.2.jar commons-codec-1.10.jar commons-collections-3.2.2.jar -commons-compiler-2.7.6.jar +commons-compiler-3.0.0.jar commons-compress-1.4.1.jar commons-configuration-1.6.jar commons-crypto-1.0.0.jar http://git-wip-us.apache.org/repos/asf/spark/blob/34ad4d52/dev/deps/spark-deps-hadoop-2.7 -- diff --git a/dev/deps/spark-deps-hadoop-2.7 b/dev/deps/spark-deps-hadoop-2.7 index da3af9f..b129e5a 100644 --- a/dev/deps/spark-deps-hadoop-2.7 +++ b/dev/deps/spark-deps-hadoop-2.7 @@ -31,7 +31,7 @@ commons-beanutils-core-1.8.0.jar commons-cli-1.2.jar commons-codec-1.10.jar commons-collections-3.2.2.jar -commons-compiler-2.7.6.jar +commons-compiler-3.0.0.jar commons-compress-1.4.1.jar co
spark git commit: [SPARK-18602] Set the version of org.codehaus.janino:commons-compiler to 3.0.0 to match the version of org.codehaus.janino:janino
Repository: spark Updated Branches: refs/heads/master 237c3b964 -> eba727757 [SPARK-18602] Set the version of org.codehaus.janino:commons-compiler to 3.0.0 to match the version of org.codehaus.janino:janino ## What changes were proposed in this pull request? org.codehaus.janino:janino depends on org.codehaus.janino:commons-compiler and we have been upgraded to org.codehaus.janino:janino 3.0.0. However, seems we are still pulling in org.codehaus.janino:commons-compiler 2.7.6 because of calcite. It looks like an accident because we exclude janino from calcite (see here https://github.com/apache/spark/blob/branch-2.1/pom.xml#L1759). So, this PR upgrades org.codehaus.janino:commons-compiler to 3.0.0. ## How was this patch tested? jenkins Author: Yin Huai <yh...@databricks.com> Closes #16025 from yhuai/janino-commons-compile. Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/eba72775 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/eba72775 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/eba72775 Branch: refs/heads/master Commit: eba727757ed5dc23c635e1926795aea62ec0fc66 Parents: 237c3b9 Author: Yin Huai <yh...@databricks.com> Authored: Mon Nov 28 10:09:30 2016 -0800 Committer: Yin Huai <yh...@databricks.com> Committed: Mon Nov 28 10:09:30 2016 -0800 -- dev/deps/spark-deps-hadoop-2.2 | 2 +- dev/deps/spark-deps-hadoop-2.3 | 2 +- dev/deps/spark-deps-hadoop-2.4 | 2 +- dev/deps/spark-deps-hadoop-2.6 | 2 +- dev/deps/spark-deps-hadoop-2.7 | 2 +- pom.xml| 9 + sql/catalyst/pom.xml | 4 7 files changed, 18 insertions(+), 5 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/eba72775/dev/deps/spark-deps-hadoop-2.2 -- diff --git a/dev/deps/spark-deps-hadoop-2.2 b/dev/deps/spark-deps-hadoop-2.2 index bbdea06..89bfcef 100644 --- a/dev/deps/spark-deps-hadoop-2.2 +++ b/dev/deps/spark-deps-hadoop-2.2 @@ -24,7 +24,7 @@ commons-beanutils-core-1.8.0.jar commons-cli-1.2.jar commons-codec-1.10.jar commons-collections-3.2.2.jar -commons-compiler-2.7.6.jar +commons-compiler-3.0.0.jar commons-compress-1.4.1.jar commons-configuration-1.6.jar commons-crypto-1.0.0.jar http://git-wip-us.apache.org/repos/asf/spark/blob/eba72775/dev/deps/spark-deps-hadoop-2.3 -- diff --git a/dev/deps/spark-deps-hadoop-2.3 b/dev/deps/spark-deps-hadoop-2.3 index a2dec41..8df3858 100644 --- a/dev/deps/spark-deps-hadoop-2.3 +++ b/dev/deps/spark-deps-hadoop-2.3 @@ -27,7 +27,7 @@ commons-beanutils-core-1.8.0.jar commons-cli-1.2.jar commons-codec-1.10.jar commons-collections-3.2.2.jar -commons-compiler-2.7.6.jar +commons-compiler-3.0.0.jar commons-compress-1.4.1.jar commons-configuration-1.6.jar commons-crypto-1.0.0.jar http://git-wip-us.apache.org/repos/asf/spark/blob/eba72775/dev/deps/spark-deps-hadoop-2.4 -- diff --git a/dev/deps/spark-deps-hadoop-2.4 b/dev/deps/spark-deps-hadoop-2.4 index c1f02b9..71e7fb6 100644 --- a/dev/deps/spark-deps-hadoop-2.4 +++ b/dev/deps/spark-deps-hadoop-2.4 @@ -27,7 +27,7 @@ commons-beanutils-core-1.8.0.jar commons-cli-1.2.jar commons-codec-1.10.jar commons-collections-3.2.2.jar -commons-compiler-2.7.6.jar +commons-compiler-3.0.0.jar commons-compress-1.4.1.jar commons-configuration-1.6.jar commons-crypto-1.0.0.jar http://git-wip-us.apache.org/repos/asf/spark/blob/eba72775/dev/deps/spark-deps-hadoop-2.6 -- diff --git a/dev/deps/spark-deps-hadoop-2.6 b/dev/deps/spark-deps-hadoop-2.6 index 4f04636..ba31391 100644 --- a/dev/deps/spark-deps-hadoop-2.6 +++ b/dev/deps/spark-deps-hadoop-2.6 @@ -31,7 +31,7 @@ 
commons-beanutils-core-1.8.0.jar commons-cli-1.2.jar commons-codec-1.10.jar commons-collections-3.2.2.jar -commons-compiler-2.7.6.jar +commons-compiler-3.0.0.jar commons-compress-1.4.1.jar commons-configuration-1.6.jar commons-crypto-1.0.0.jar http://git-wip-us.apache.org/repos/asf/spark/blob/eba72775/dev/deps/spark-deps-hadoop-2.7 -- diff --git a/dev/deps/spark-deps-hadoop-2.7 b/dev/deps/spark-deps-hadoop-2.7 index da3af9f..b129e5a 100644 --- a/dev/deps/spark-deps-hadoop-2.7 +++ b/dev/deps/spark-deps-hadoop-2.7 @@ -31,7 +31,7 @@ commons-beanutils-core-1.8.0.jar commons-cli-1.2.jar commons-codec-1.10.jar commons-collections-3.2.2.jar -commons-compiler-2.7.6.jar +commons-compiler-3.0.0.jar commons-compress-1.4.1.jar commons-configuration-1.6.jar commons-crypto-1.0.0.jar http://git-wip-us.apache.org/repos/asf/spark/
[GitHub] spark issue #16025: [SPARK-18602] Set the version of org.codehaus.janino:com...
Github user yhuai commented on the issue: https://github.com/apache/spark/pull/16025 Thanks. Since we started using janino 3.0.0 in Spark 2.1, I am merging this PR to both master and branch-2.1. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #16025: [SPARK-18602] Set the version of org.codehaus.jan...
GitHub user yhuai opened a pull request: https://github.com/apache/spark/pull/16025 [SPARK-18602] Set the version of org.codehaus.janino:commons-compiler to 3.0.0 to match the version of org.codehaus.janino:janino ## What changes were proposed in this pull request? org.codehaus.janino:janino depends on org.codehaus.janino:commons-compiler and we have been upgraded to org.codehaus.janino:janino 3.0.0. However, seems we are still pulling in org.codehaus.janino:commons-compiler 2.7.6 because of calcite. It looks like an accident because we exclude janino from calcite (see here https://github.com/apache/spark/blob/branch-2.1/pom.xml#L1759). So, this PR upgrades org.codehaus.janino:commons-compiler to 3.0.0. ## How was this patch tested? jenkins You can merge this pull request into a Git repository by running: $ git pull https://github.com/yhuai/spark janino-commons-compile Alternatively you can review and apply these changes as the patch at: https://github.com/apache/spark/pull/16025.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #16025 commit c5685f0bea849f0b7365c8fb161b8a1f0958383b Author: Yin Huai <yh...@databricks.com> Date: 2016-11-27T23:43:25Z [SPARK-18602] Set the version of org.codehaus.janino:commons-compiler to 3.0.0 to match the version of org.codehaus.janino:janino --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #16025: [SPARK-18602] Set the version of org.codehaus.janino:com...
Github user yhuai commented on the issue: https://github.com/apache/spark/pull/16025 @kiszk want to take a look? --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #15062: SPARK-17424: Fix unsound substitution bug in ScalaReflec...
Github user yhuai commented on the issue: https://github.com/apache/spark/pull/15062 Any chance this is the same issue as https://issues.apache.org/jira/browse/SPARK-17109? @rdblue When you were debugging this issue, which version of Scala did you use, 2.10 or 2.11? If you were using Scala 2.10, is it possible to try Scala 2.11? Thanks! --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
spark git commit: [SPARK-18360][SQL] default table path of tables in default database should depend on the location of default database
Repository: spark Updated Branches: refs/heads/branch-2.1 978798880 -> fc466be4f [SPARK-18360][SQL] default table path of tables in default database should depend on the location of default database ## What changes were proposed in this pull request? The current semantic of the warehouse config: 1. it's a static config, which means you can't change it once your spark application is launched. 2. Once a database is created, its location won't change even the warehouse path config is changed. 3. default database is a special case, although its location is fixed, but the locations of tables created in it are not. If a Spark app starts with warehouse path B(while the location of default database is A), then users create a table `tbl` in default database, its location will be `B/tbl` instead of `A/tbl`. If uses change the warehouse path config to C, and create another table `tbl2`, its location will still be `B/tbl2` instead of `C/tbl2`. rule 3 doesn't make sense and I think we made it by mistake, not intentionally. Data source tables don't follow rule 3 and treat default database like normal ones. This PR fixes hive serde tables to make it consistent with data source tables. ## How was this patch tested? HiveSparkSubmitSuite Author: Wenchen FanCloses #15812 from cloud-fan/default-db. (cherry picked from commit ce13c2672318242748f7520ed4ce6bcfad4fb428) Signed-off-by: Yin Huai Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/fc466be4 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/fc466be4 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/fc466be4 Branch: refs/heads/branch-2.1 Commit: fc466be4fd8def06880f59d50e5567c22cc53d6a Parents: 9787988 Author: Wenchen Fan Authored: Thu Nov 17 17:31:12 2016 -0800 Committer: Yin Huai Committed: Thu Nov 17 17:31:43 2016 -0800 -- .../spark/sql/hive/HiveExternalCatalog.scala| 237 ++- .../spark/sql/hive/HiveSparkSubmitSuite.scala | 76 +- 2 files changed, 190 insertions(+), 123 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/fc466be4/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveExternalCatalog.scala -- diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveExternalCatalog.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveExternalCatalog.scala index 8433058..cacffcf 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveExternalCatalog.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveExternalCatalog.scala @@ -197,136 +197,151 @@ private[spark] class HiveExternalCatalog(conf: SparkConf, hadoopConf: Configurat if (tableDefinition.tableType == VIEW) { client.createTable(tableDefinition, ignoreIfExists) -} else if (tableDefinition.provider.get == DDLUtils.HIVE_PROVIDER) { - // Here we follow data source tables and put table metadata like provider, schema, etc. in - // table properties, so that we can work around the Hive metastore issue about not case - // preserving and make Hive serde table support mixed-case column names. - val tableWithDataSourceProps = tableDefinition.copy( -properties = tableDefinition.properties ++ tableMetaToTableProps(tableDefinition)) - client.createTable(tableWithDataSourceProps, ignoreIfExists) } else { - // To work around some hive metastore issues, e.g. not case-preserving, bad decimal type - // support, no column nullability, etc., we should do some extra works before saving table - // metadata into Hive metastore: - // 1. Put table metadata like provider, schema, etc. 
in table properties. - // 2. Check if this table is hive compatible. - //2.1 If it's not hive compatible, set location URI, schema, partition columns and bucket - // spec to empty and save table metadata to Hive. - //2.2 If it's hive compatible, set serde information in table metadata and try to save - // it to Hive. If it fails, treat it as not hive compatible and go back to 2.1 - val tableProperties = tableMetaToTableProps(tableDefinition) - // Ideally we should not create a managed table with location, but Hive serde table can // specify location for managed table. And in [[CreateDataSourceTableAsSelectCommand]] we have // to create the table directory and write out data before we create this table, to avoid // exposing a partial written table. val needDefaultTableLocation = tableDefinition.tableType == MANAGED && tableDefinition.storage.locationUri.isEmpty + val
spark git commit: [SPARK-18360][SQL] default table path of tables in default database should depend on the location of default database
Repository: spark Updated Branches: refs/heads/master b0aa1aa1a -> ce13c2672 [SPARK-18360][SQL] default table path of tables in default database should depend on the location of default database ## What changes were proposed in this pull request? The current semantic of the warehouse config: 1. it's a static config, which means you can't change it once your spark application is launched. 2. Once a database is created, its location won't change even the warehouse path config is changed. 3. default database is a special case, although its location is fixed, but the locations of tables created in it are not. If a Spark app starts with warehouse path B(while the location of default database is A), then users create a table `tbl` in default database, its location will be `B/tbl` instead of `A/tbl`. If uses change the warehouse path config to C, and create another table `tbl2`, its location will still be `B/tbl2` instead of `C/tbl2`. rule 3 doesn't make sense and I think we made it by mistake, not intentionally. Data source tables don't follow rule 3 and treat default database like normal ones. This PR fixes hive serde tables to make it consistent with data source tables. ## How was this patch tested? HiveSparkSubmitSuite Author: Wenchen FanCloses #15812 from cloud-fan/default-db. Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/ce13c267 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/ce13c267 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/ce13c267 Branch: refs/heads/master Commit: ce13c2672318242748f7520ed4ce6bcfad4fb428 Parents: b0aa1aa Author: Wenchen Fan Authored: Thu Nov 17 17:31:12 2016 -0800 Committer: Yin Huai Committed: Thu Nov 17 17:31:12 2016 -0800 -- .../spark/sql/hive/HiveExternalCatalog.scala| 237 ++- .../spark/sql/hive/HiveSparkSubmitSuite.scala | 76 +- 2 files changed, 190 insertions(+), 123 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/ce13c267/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveExternalCatalog.scala -- diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveExternalCatalog.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveExternalCatalog.scala index 8433058..cacffcf 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveExternalCatalog.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveExternalCatalog.scala @@ -197,136 +197,151 @@ private[spark] class HiveExternalCatalog(conf: SparkConf, hadoopConf: Configurat if (tableDefinition.tableType == VIEW) { client.createTable(tableDefinition, ignoreIfExists) -} else if (tableDefinition.provider.get == DDLUtils.HIVE_PROVIDER) { - // Here we follow data source tables and put table metadata like provider, schema, etc. in - // table properties, so that we can work around the Hive metastore issue about not case - // preserving and make Hive serde table support mixed-case column names. - val tableWithDataSourceProps = tableDefinition.copy( -properties = tableDefinition.properties ++ tableMetaToTableProps(tableDefinition)) - client.createTable(tableWithDataSourceProps, ignoreIfExists) } else { - // To work around some hive metastore issues, e.g. not case-preserving, bad decimal type - // support, no column nullability, etc., we should do some extra works before saving table - // metadata into Hive metastore: - // 1. Put table metadata like provider, schema, etc. in table properties. - // 2. Check if this table is hive compatible. 
- //2.1 If it's not hive compatible, set location URI, schema, partition columns and bucket - // spec to empty and save table metadata to Hive. - //2.2 If it's hive compatible, set serde information in table metadata and try to save - // it to Hive. If it fails, treat it as not hive compatible and go back to 2.1 - val tableProperties = tableMetaToTableProps(tableDefinition) - // Ideally we should not create a managed table with location, but Hive serde table can // specify location for managed table. And in [[CreateDataSourceTableAsSelectCommand]] we have // to create the table directory and write out data before we create this table, to avoid // exposing a partial written table. val needDefaultTableLocation = tableDefinition.tableType == MANAGED && tableDefinition.storage.locationUri.isEmpty + val tableLocation = if (needDefaultTableLocation) { Some(defaultTablePath(tableDefinition.identifier)) } else {
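To make the fixed rule concrete: a hedged sketch of deriving a managed table's default path from the location of the database it lives in, rather than from whatever warehouse path the application happened to start with (illustrative names; the real code goes through the session catalog and formats identifiers):

```scala
import org.apache.hadoop.fs.Path

object DefaultTablePathSketch {
  // The default location of a managed table is <database location>/<table name>.
  def defaultTablePath(dbLocationUri: String, tableName: String): String =
    new Path(new Path(dbLocationUri), tableName.toLowerCase).toString
}
```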
[GitHub] spark issue #15812: [SPARK-18360][SQL] default table path of tables in defau...
Github user yhuai commented on the issue: https://github.com/apache/spark/pull/15812 LGTM. Merging to master and branch 2.1. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #15922: [SPARK-18462] Fix ClassCastException in SparkListenerDri...
Github user yhuai commented on the issue: https://github.com/apache/spark/pull/15922 lgtm pending jenkins --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #15703: [SPARK-18186] Migrate HiveUDAFFunction to TypedImperativ...
Github user yhuai commented on the issue: https://github.com/apache/spark/pull/15703 LGTM. Merging to master. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
spark git commit: [SPARK-18186] Migrate HiveUDAFFunction to TypedImperativeAggregate for partial aggregation support
Repository: spark Updated Branches: refs/heads/master a36a76ac4 -> 2ca8ae9aa [SPARK-18186] Migrate HiveUDAFFunction to TypedImperativeAggregate for partial aggregation support ## What changes were proposed in this pull request? While being evaluated in Spark SQL, Hive UDAFs don't support partial aggregation. This PR migrates `HiveUDAFFunction`s to `TypedImperativeAggregate`, which already provides partial aggregation support for aggregate functions that may use arbitrary Java objects as aggregation states. The following snippet shows the effect of this PR: ```scala import org.apache.hadoop.hive.ql.udf.generic.GenericUDAFMax sql(s"CREATE FUNCTION hive_max AS '${classOf[GenericUDAFMax].getName}'") spark.range(100).createOrReplaceTempView("t") // A query using both Spark SQL native `max` and Hive `max` sql(s"SELECT max(id), hive_max(id) FROM t").explain() ``` Before this PR: ``` == Physical Plan == SortAggregate(key=[], functions=[max(id#1L), default.hive_max(default.hive_max, HiveFunctionWrapper(org.apache.hadoop.hive.ql.udf.generic.GenericUDAFMax,org.apache.hadoop.hive.ql.udf.generic.GenericUDAFMax7475f57e), id#1L, false, 0, 0)]) +- Exchange SinglePartition +- *Range (0, 100, step=1, splits=Some(1)) ``` After this PR: ``` == Physical Plan == SortAggregate(key=[], functions=[max(id#1L), default.hive_max(default.hive_max, HiveFunctionWrapper(org.apache.hadoop.hive.ql.udf.generic.GenericUDAFMax,org.apache.hadoop.hive.ql.udf.generic.GenericUDAFMax5e18a6a7), id#1L, false, 0, 0)]) +- Exchange SinglePartition +- SortAggregate(key=[], functions=[partial_max(id#1L), partial_default.hive_max(default.hive_max, HiveFunctionWrapper(org.apache.hadoop.hive.ql.udf.generic.GenericUDAFMax,org.apache.hadoop.hive.ql.udf.generic.GenericUDAFMax5e18a6a7), id#1L, false, 0, 0)]) +- *Range (0, 100, step=1, splits=Some(1)) ``` The tricky part of the PR is mostly about updating and passing around aggregation states of `HiveUDAFFunction`s since the aggregation state of a Hive UDAF may appear in three different forms. Let's take a look at the testing `MockUDAF` added in this PR as an example. This UDAF computes the count of non-null values together with the count of nulls of a given column. Its aggregation state may appear as the following forms at different time: 1. A `MockUDAFBuffer`, which is a concrete subclass of `GenericUDAFEvaluator.AggregationBuffer` The form used by Hive UDAF API. This form is required by the following scenarios: - Calling `GenericUDAFEvaluator.iterate()` to update an existing aggregation state with new input values. - Calling `GenericUDAFEvaluator.terminate()` to get the final aggregated value from an existing aggregation state. - Calling `GenericUDAFEvaluator.merge()` to merge other aggregation states into an existing aggregation state. The existing aggregation state to be updated must be in this form. Conversions: - To form 2: `GenericUDAFEvaluator.terminatePartial()` - To form 3: Convert to form 2 first, and then to 3. 2. An `Object[]` array containing two `java.lang.Long` values. The form used to interact with Hive's `ObjectInspector`s. This form is required by the following scenarios: - Calling `GenericUDAFEvaluator.terminatePartial()` to convert an existing aggregation state in form 1 to form 2. - Calling `GenericUDAFEvaluator.merge()` to merge other aggregation states into an existing aggregation state. The input aggregation state must be in this form. Conversions: - To form 1: No direct method. Have to create an empty `AggregationBuffer` and merge it into the empty buffer. 
- To form 3: `unwrapperFor()`/`unwrap()` method of `HiveInspectors` 3. The byte array that holds data of an `UnsafeRow` with two `LongType` fields. The form used by Spark SQL to shuffle partial aggregation results. This form is required because `TypedImperativeAggregate` always asks its subclasses to serialize their aggregation states into a byte array. Conversions: - To form 1: Convert to form 2 first, and then to 1. - To form 2: `wrapperFor()`/`wrap()` method of `HiveInspectors` Here're some micro-benchmark results produced by the most recent master and this PR branch. Master: ``` Java HotSpot(TM) 64-Bit Server VM 1.8.0_92-b14 on Mac OS X 10.10.5 Intel(R) Core(TM) i7-4960HQ CPU 2.60GHz hive udaf vs spark af: Best/Avg Time(ms)Rate(M/s) Per Row(ns) Relative w/o groupBy339 / 372 3.1 323.2 1.0X w/ groupBy 503 / 529 2.1 479.7 0.7X ``` This PR: ``` Java HotSpot(TM) 64-Bit Server VM 1.8.0_92-b14 on Mac OS X 10.10.5 Intel(R) Core(TM) i7-4960HQ CPU 2.60GHz hive udaf vs spark af: Best/Avg Time(ms)
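As a concrete illustration of "form 3" above, a hedged sketch of packing a two-long aggregation state (non-null count, null count) into a byte array so it can travel through a shuffle, and unpacking it on the other side; these are hypothetical helpers, not the HiveUDAFFunction implementation, which serializes via an UnsafeRow.

```scala
import java.nio.ByteBuffer

object BufferSerdeSketch {
  final case class MockBuffer(var nonNullCount: Long, var nullCount: Long)

  def serializeBuffer(buf: MockBuffer): Array[Byte] = {
    val bytes = ByteBuffer.allocate(java.lang.Long.BYTES * 2)
    bytes.putLong(buf.nonNullCount)
    bytes.putLong(buf.nullCount)
    bytes.array()
  }

  def deserializeBuffer(bytes: Array[Byte]): MockBuffer = {
    val bb = ByteBuffer.wrap(bytes)
    MockBuffer(bb.getLong(), bb.getLong())
  }
}
```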
[GitHub] spark issue #15703: [SPARK-18186] Migrate HiveUDAFFunction to TypedImperativ...
Github user yhuai commented on the issue: https://github.com/apache/spark/pull/15703 The code changes look good to me. Let's also do a benchmark to sanity-check our implementation. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #15703: [SPARK-18186] Migrate HiveUDAFFunction to TypedIm...
Github user yhuai commented on a diff in the pull request: https://github.com/apache/spark/pull/15703#discussion_r88326725 --- Diff: sql/hive/src/main/scala/org/apache/spark/sql/hive/hiveUDFs.scala --- @@ -289,73 +302,75 @@ private[hive] case class HiveUDAFFunction( funcWrapper.createFunction[AbstractGenericUDAFResolver]() } + // Hive `ObjectInspector`s for all child expressions (input parameters of the function). @transient - private lazy val inspectors = children.map(toInspector).toArray + private lazy val inputInspectors = children.map(toInspector).toArray + // Spark SQL data types of input parameters. @transient - private lazy val functionAndInspector = { -val parameterInfo = new SimpleGenericUDAFParameterInfo(inspectors, false, false) -val f = resolver.getEvaluator(parameterInfo) -f -> f.init(GenericUDAFEvaluator.Mode.COMPLETE, inspectors) + private lazy val inputDataTypes: Array[DataType] = children.map(_.dataType).toArray + + private def newEvaluator(): GenericUDAFEvaluator = { +val parameterInfo = new SimpleGenericUDAFParameterInfo(inputInspectors, false, false) +resolver.getEvaluator(parameterInfo) } + // The UDAF evaluator used to consume raw input rows and produce partial aggregation results. @transient - private lazy val function = functionAndInspector._1 + private lazy val partial1ModeEvaluator = newEvaluator() --- End diff -- ok. I think in general we should avoid of using this pattern. If we have to use it now, let's explain it in the comment. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #15900: [SPARK-18464][SQL] support old table which doesn'...
Github user yhuai commented on a diff in the pull request: https://github.com/apache/spark/pull/15900#discussion_r88289527 --- Diff: sql/hive/src/test/scala/org/apache/spark/sql/hive/MetastoreDataSourcesSuite.scala --- @@ -1371,4 +1371,23 @@ class MetastoreDataSourcesSuite extends QueryTest with SQLTestUtils with TestHiv } } } + + test("SPARK-18464: support old table which doesn't store schema in table properties") { +withTable("old") { + withTempPath { path => +Seq(1 -> "a").toDF("i", "j").write.parquet(path.getAbsolutePath) +val tableDesc = CatalogTable( + identifier = TableIdentifier("old", Some("default")), + tableType = CatalogTableType.EXTERNAL, + storage = CatalogStorageFormat.empty.copy( +properties = Map("path" -> path.getAbsolutePath) + ), + schema = new StructType(), + properties = Map( +HiveExternalCatalog.DATASOURCE_PROVIDER -> "parquet")) +hiveClient.createTable(tableDesc, ignoreIfExists = false) +checkAnswer(spark.table("old"), Row(1, "a")) --- End diff -- Can we also test `describe table` and make sure it can provide correct column info? --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
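A possible shape for that extra check, meant to sit inside the same `withTable`/`withTempPath` block right after the `checkAnswer` above; it relies on the suite's `sql`, `checkAnswer`, and `Row` helpers, and the expected rows are illustrative since the exact `DESC` output format varies between Spark versions.

```scala
// Hypothetical follow-up assertion: DESCRIBE should report the columns recovered
// from the Parquet files rather than an empty schema.
checkAnswer(
  sql("DESC old").select("col_name", "data_type"),
  Row("i", "int") :: Row("j", "string") :: Nil)
```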
[GitHub] spark pull request #15900: [SPARK-18464][SQL] support old table which doesn'...
Github user yhuai commented on a diff in the pull request: https://github.com/apache/spark/pull/15900#discussion_r88289294 --- Diff: sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveExternalCatalog.scala --- @@ -1023,6 +1023,11 @@ object HiveExternalCatalog { // After SPARK-6024, we removed this flag. // Although we are not using `spark.sql.sources.schema` any more, we need to still support. DataType.fromJson(schema.get).asInstanceOf[StructType] +} else if (props.filterKeys(_.startsWith(DATASOURCE_SCHEMA_PREFIX)).isEmpty) { + // If there is no schema information in table properties, it means the schema of this table + // was empty when saving into metastore, which is possible in older version of Spark. We + // should respect it. + new StructType() --- End diff -- btw, this function is only needed for data source tables, right? --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
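Outside the catalog, the fallback itself is a one-line decision: if none of the table properties carries a schema-part key, report an empty schema and let the data source infer the real one from the files. Below is a small self-contained sketch of that decision; the prefix value is an assumption standing in for `HiveExternalCatalog.DATASOURCE_SCHEMA_PREFIX`, and the part-reassembly branch is deliberately left abstract.

```scala
import org.apache.spark.sql.types.StructType

object OldTableSchemaFallback {
  // Assumed stand-in for HiveExternalCatalog.DATASOURCE_SCHEMA_PREFIX.
  val SchemaPrefix = "spark.sql.sources.schema."

  // Mirrors the branch under review: if no schema parts were stored (possible for tables
  // created by older Spark versions), return an empty schema and rely on inference later.
  def restoreSchema(
      props: Map[String, String],
      fromParts: Map[String, String] => StructType): StructType =
    if (props.keys.exists(_.startsWith(SchemaPrefix))) fromParts(props) else new StructType()

  def main(args: Array[String]): Unit = {
    val oldTableProps = Map("spark.sql.sources.provider" -> "parquet") // no schema keys at all
    val schema = restoreSchema(oldTableProps, _ => sys.error("not reached for old tables"))
    println(schema.isEmpty) // true: caller falls back to schema inference at read time
  }
}
```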
[GitHub] spark pull request #15900: [SPARK-18464][SQL] support old table which doesn'...
Github user yhuai commented on a diff in the pull request: https://github.com/apache/spark/pull/15900#discussion_r88289046 --- Diff: sql/hive/src/test/scala/org/apache/spark/sql/hive/MetastoreDataSourcesSuite.scala --- @@ -1371,4 +1371,23 @@ class MetastoreDataSourcesSuite extends QueryTest with SQLTestUtils with TestHiv } } } + + test("SPARK-18464: support old table which doesn't store schema in table properties") { +withTable("old") { + withTempPath { path => +Seq(1 -> "a").toDF("i", "j").write.parquet(path.getAbsolutePath) +val tableDesc = CatalogTable( + identifier = TableIdentifier("old", Some("default")), + tableType = CatalogTableType.EXTERNAL, + storage = CatalogStorageFormat.empty.copy( +properties = Map("path" -> path.getAbsolutePath) + ), + schema = new StructType(), + properties = Map( +HiveExternalCatalog.DATASOURCE_PROVIDER -> "parquet")) +hiveClient.createTable(tableDesc, ignoreIfExists = false) +checkAnswer(spark.table("old"), Row(1, "a")) + } +} + } --- End diff -- It will be good to actually create a set of compatibility tests to make sure a new version of Spark can access table metadata created by an older version (starting from Spark 1.3) without problems. Let's create a follow-up JIRA for this task and do it during the QA period of Spark 2.1. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #15703: [SPARK-18186] Migrate HiveUDAFFunction to TypedIm...
Github user yhuai commented on a diff in the pull request: https://github.com/apache/spark/pull/15703#discussion_r88172060 --- Diff: sql/hive/src/main/scala/org/apache/spark/sql/hive/hiveUDFs.scala --- @@ -365,4 +380,66 @@ private[hive] case class HiveUDAFFunction( val distinct = if (isDistinct) "DISTINCT " else " " s"$name($distinct${children.map(_.sql).mkString(", ")})" } + + override def createAggregationBuffer(): AggregationBuffer = +partial1ModeEvaluator.getNewAggregationBuffer + + @transient + private lazy val inputProjection = new InterpretedProjection(children) + + override def update(buffer: AggregationBuffer, input: InternalRow): Unit = { +partial1ModeEvaluator.iterate( + buffer, wrap(inputProjection(input), inputWrappers, cached, inputDataTypes)) + } + + override def merge(buffer: AggregationBuffer, input: AggregationBuffer): Unit = { +partial2ModeEvaluator.merge(buffer, partial1ModeEvaluator.terminatePartial(input)) + } + + override def eval(buffer: AggregationBuffer): Any = { +resultUnwrapper(finalModeEvaluator.terminate(buffer)) + } + + override def serialize(buffer: AggregationBuffer): Array[Byte] = { +aggBufferSerDe.serialize(buffer) + } + + override def deserialize(bytes: Array[Byte]): AggregationBuffer = { +aggBufferSerDe.deserialize(bytes) + } + + // Helper class used to de/serialize Hive UDAF `AggregationBuffer` objects + private class AggregationBufferSerDe { --- End diff -- Can we take this class out from HiveUDAFFunction? --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #15703: [SPARK-18186] Migrate HiveUDAFFunction to TypedIm...
Github user yhuai commented on a diff in the pull request: https://github.com/apache/spark/pull/15703#discussion_r88171437 --- Diff: sql/hive/src/main/scala/org/apache/spark/sql/hive/hiveUDFs.scala --- @@ -289,73 +302,75 @@ private[hive] case class HiveUDAFFunction( funcWrapper.createFunction[AbstractGenericUDAFResolver]() } + // Hive `ObjectInspector`s for all child expressions (input parameters of the function). @transient - private lazy val inspectors = children.map(toInspector).toArray + private lazy val inputInspectors = children.map(toInspector).toArray + // Spark SQL data types of input parameters. @transient - private lazy val functionAndInspector = { -val parameterInfo = new SimpleGenericUDAFParameterInfo(inspectors, false, false) -val f = resolver.getEvaluator(parameterInfo) -f -> f.init(GenericUDAFEvaluator.Mode.COMPLETE, inspectors) + private lazy val inputDataTypes: Array[DataType] = children.map(_.dataType).toArray + + private def newEvaluator(): GenericUDAFEvaluator = { +val parameterInfo = new SimpleGenericUDAFParameterInfo(inputInspectors, false, false) +resolver.getEvaluator(parameterInfo) } + // The UDAF evaluator used to consume raw input rows and produce partial aggregation results. @transient - private lazy val function = functionAndInspector._1 + private lazy val partial1ModeEvaluator = newEvaluator() + // Hive `ObjectInspector` used to inspect partial aggregation results. @transient - private lazy val wrappers = children.map(x => wrapperFor(toInspector(x), x.dataType)).toArray + private val partialResultInspector = partial1ModeEvaluator.init( +GenericUDAFEvaluator.Mode.PARTIAL1, +inputInspectors + ) + // The UDAF evaluator used to merge partial aggregation results. @transient - private lazy val returnInspector = functionAndInspector._2 + private lazy val partial2ModeEvaluator = { +val evaluator = newEvaluator() +evaluator.init(GenericUDAFEvaluator.Mode.PARTIAL2, Array(partialResultInspector)) +evaluator + } + // Spark SQL data type of partial aggregation results @transient - private lazy val unwrapper = unwrapperFor(returnInspector) + private lazy val partialResultDataType = inspectorToDataType(partialResultInspector) + // The UDAF evaluator used to compute the final result from a partial aggregation result objects. @transient - private[this] var buffer: GenericUDAFEvaluator.AggregationBuffer = _ - - override def eval(input: InternalRow): Any = unwrapper(function.evaluate(buffer)) + private lazy val finalModeEvaluator = newEvaluator() + // Hive `ObjectInspector` used to inspect the final aggregation result object. @transient - private lazy val inputProjection = new InterpretedProjection(children) + private val returnInspector = finalModeEvaluator.init( +GenericUDAFEvaluator.Mode.FINAL, +Array(partialResultInspector) + ) + // Wrapper functions used to wrap Spark SQL input arguments into Hive specific format. @transient - private lazy val cached = new Array[AnyRef](children.length) + private lazy val inputWrappers = children.map(x => wrapperFor(toInspector(x), x.dataType)).toArray + // Unwrapper function used to unwrap final aggregation result objects returned by Hive UDAFs into + // Spark SQL specific format. @transient - private lazy val inputDataTypes: Array[DataType] = children.map(_.dataType).toArray - - // Hive UDAF has its own buffer, so we don't need to occupy a slot in the aggregation - // buffer for it. 
- override def aggBufferSchema: StructType = StructType(Nil) - - override def update(_buffer: InternalRow, input: InternalRow): Unit = { -val inputs = inputProjection(input) -function.iterate(buffer, wrap(inputs, wrappers, cached, inputDataTypes)) - } - - override def merge(buffer1: InternalRow, buffer2: InternalRow): Unit = { -throw new UnsupportedOperationException( - "Hive UDAF doesn't support partial aggregate") - } + private lazy val resultUnwrapper = unwrapperFor(returnInspector) - override def initialize(_buffer: InternalRow): Unit = { -buffer = function.getNewAggregationBuffer - } - - override val aggBufferAttributes: Seq[AttributeReference] = Nil + @transient + private lazy val cached: Array[AnyRef] = new Array[AnyRef](children.length) - // Note: although this simply copies aggBufferAttributes, this common code can not be placed - // in the superclass because that will lead to initialization ordering issues. - override val inputAggBu
[GitHub] spark pull request #15703: [SPARK-18186] Migrate HiveUDAFFunction to TypedIm...
Github user yhuai commented on a diff in the pull request: https://github.com/apache/spark/pull/15703#discussion_r88171373 --- Diff: sql/hive/src/main/scala/org/apache/spark/sql/hive/hiveUDFs.scala --- @@ -289,73 +302,75 @@ private[hive] case class HiveUDAFFunction( funcWrapper.createFunction[AbstractGenericUDAFResolver]() } + // Hive `ObjectInspector`s for all child expressions (input parameters of the function). @transient - private lazy val inspectors = children.map(toInspector).toArray + private lazy val inputInspectors = children.map(toInspector).toArray + // Spark SQL data types of input parameters. @transient - private lazy val functionAndInspector = { -val parameterInfo = new SimpleGenericUDAFParameterInfo(inspectors, false, false) -val f = resolver.getEvaluator(parameterInfo) -f -> f.init(GenericUDAFEvaluator.Mode.COMPLETE, inspectors) + private lazy val inputDataTypes: Array[DataType] = children.map(_.dataType).toArray + + private def newEvaluator(): GenericUDAFEvaluator = { +val parameterInfo = new SimpleGenericUDAFParameterInfo(inputInspectors, false, false) +resolver.getEvaluator(parameterInfo) } + // The UDAF evaluator used to consume raw input rows and produce partial aggregation results. @transient - private lazy val function = functionAndInspector._1 + private lazy val partial1ModeEvaluator = newEvaluator() + // Hive `ObjectInspector` used to inspect partial aggregation results. --- End diff -- Partial aggregation result is aggregation buffer, right? --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #15703: [SPARK-18186] Migrate HiveUDAFFunction to TypedIm...
Github user yhuai commented on a diff in the pull request: https://github.com/apache/spark/pull/15703#discussion_r88171285 --- Diff: sql/hive/src/main/scala/org/apache/spark/sql/hive/hiveUDFs.scala --- @@ -289,73 +302,75 @@ private[hive] case class HiveUDAFFunction( funcWrapper.createFunction[AbstractGenericUDAFResolver]() } + // Hive `ObjectInspector`s for all child expressions (input parameters of the function). @transient - private lazy val inspectors = children.map(toInspector).toArray + private lazy val inputInspectors = children.map(toInspector).toArray --- End diff -- Let's add docs to explain when these internal vals are used (like which vals are needed for a given mode). --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
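One way to write down the mapping being asked for is a comment block that ties each evaluator and inspector to the aggregation phase that needs it. The sketch below is based only on the diff shown above (the resolver and input inspectors are taken as constructor arguments) and is not the final wording or code of the PR.

```scala
import org.apache.hadoop.hive.ql.udf.generic.{AbstractGenericUDAFResolver, GenericUDAFEvaluator, SimpleGenericUDAFParameterInfo}
import org.apache.hadoop.hive.ql.udf.generic.GenericUDAFEvaluator.Mode
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector

// Which evaluator/inspector is needed in which phase of the aggregation:
//   update()    -> PARTIAL1 evaluator + input inspectors (raw rows -> aggregation buffer)
//   serialize() -> PARTIAL1 evaluator's terminatePartial + partial-result inspector
//   merge()     -> PARTIAL2 evaluator (buffer + partial result -> buffer)
//   eval()      -> FINAL evaluator + return inspector (buffer -> final Spark SQL value)
class HiveUdafEvaluators(
    resolver: AbstractGenericUDAFResolver,
    inputInspectors: Array[ObjectInspector]) {

  private def newEvaluator(): GenericUDAFEvaluator = {
    val parameterInfo = new SimpleGenericUDAFParameterInfo(inputInspectors, false, false)
    resolver.getEvaluator(parameterInfo)
  }

  // PARTIAL1: consumes raw input rows and produces partial aggregation results.
  val partial1ModeEvaluator: GenericUDAFEvaluator = newEvaluator()
  val partialResultInspector: ObjectInspector =
    partial1ModeEvaluator.init(Mode.PARTIAL1, inputInspectors)

  // PARTIAL2: merges partial aggregation results into another partial result.
  val partial2ModeEvaluator: GenericUDAFEvaluator = {
    val e = newEvaluator()
    e.init(Mode.PARTIAL2, Array(partialResultInspector))
    e
  }

  // FINAL: turns a (merged) partial result into the final output value.
  val finalModeEvaluator: GenericUDAFEvaluator = newEvaluator()
  val returnInspector: ObjectInspector =
    finalModeEvaluator.init(Mode.FINAL, Array(partialResultInspector))
}
```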
[GitHub] spark pull request #15703: [SPARK-18186] Migrate HiveUDAFFunction to TypedIm...
Github user yhuai commented on a diff in the pull request: https://github.com/apache/spark/pull/15703#discussion_r88171097 --- Diff: sql/hive/src/main/scala/org/apache/spark/sql/hive/hiveUDFs.scala --- @@ -289,73 +302,75 @@ private[hive] case class HiveUDAFFunction( funcWrapper.createFunction[AbstractGenericUDAFResolver]() } + // Hive `ObjectInspector`s for all child expressions (input parameters of the function). @transient - private lazy val inspectors = children.map(toInspector).toArray + private lazy val inputInspectors = children.map(toInspector).toArray + // Spark SQL data types of input parameters. @transient - private lazy val functionAndInspector = { -val parameterInfo = new SimpleGenericUDAFParameterInfo(inspectors, false, false) -val f = resolver.getEvaluator(parameterInfo) -f -> f.init(GenericUDAFEvaluator.Mode.COMPLETE, inspectors) + private lazy val inputDataTypes: Array[DataType] = children.map(_.dataType).toArray + + private def newEvaluator(): GenericUDAFEvaluator = { +val parameterInfo = new SimpleGenericUDAFParameterInfo(inputInspectors, false, false) +resolver.getEvaluator(parameterInfo) } + // The UDAF evaluator used to consume raw input rows and produce partial aggregation results. @transient - private lazy val function = functionAndInspector._1 + private lazy val partial1ModeEvaluator = newEvaluator() --- End diff -- Should we always call init to make the code consistent? --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #15703: [SPARK-18186] Migrate HiveUDAFFunction to TypedIm...
Github user yhuai commented on a diff in the pull request: https://github.com/apache/spark/pull/15703#discussion_r88170694 --- Diff: sql/hive/src/main/scala/org/apache/spark/sql/hive/hiveUDFs.scala --- @@ -263,8 +265,19 @@ private[hive] case class HiveGenericUDTF( } /** - * Currently we don't support partial aggregation for queries using Hive UDAF, which may hurt - * performance a lot. + * While being evaluated by Spark SQL, the aggregation state of a Hive UDAF may be in the following + * three formats: + * + * 1. a Spark SQL value, or + * 2. an instance of some concrete `GenericUDAFEvaluator.AggregationBuffer` class, or + * 3. a Java object that can be inspected using the `ObjectInspector` returned by the + * `GenericUDAFEvaluator.init()` method. --- End diff -- (is the doc below enough?) --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #15703: [SPARK-18186] Migrate HiveUDAFFunction to TypedIm...
Github user yhuai commented on a diff in the pull request: https://github.com/apache/spark/pull/15703#discussion_r88170550 --- Diff: sql/hive/src/main/scala/org/apache/spark/sql/hive/hiveUDFs.scala --- @@ -263,8 +265,19 @@ private[hive] case class HiveGenericUDTF( } /** - * Currently we don't support partial aggregation for queries using Hive UDAF, which may hurt - * performance a lot. + * While being evaluated by Spark SQL, the aggregation state of a Hive UDAF may be in the following + * three formats: + * + * 1. a Spark SQL value, or + * 2. an instance of some concrete `GenericUDAFEvaluator.AggregationBuffer` class, or + * 3. a Java object that can be inspected using the `ObjectInspector` returned by the + * `GenericUDAFEvaluator.init()` method. --- End diff -- (we can just put the pr description to here) --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #15703: [SPARK-18186] Migrate HiveUDAFFunction to TypedIm...
Github user yhuai commented on a diff in the pull request: https://github.com/apache/spark/pull/15703#discussion_r88168751 --- Diff: sql/hive/src/main/scala/org/apache/spark/sql/hive/hiveUDFs.scala --- @@ -263,8 +265,19 @@ private[hive] case class HiveGenericUDTF( } /** - * Currently we don't support partial aggregation for queries using Hive UDAF, which may hurt - * performance a lot. + * While being evaluated by Spark SQL, the aggregation state of a Hive UDAF may be in the following + * three formats: + * + * 1. a Spark SQL value, or + * 2. an instance of some concrete `GenericUDAFEvaluator.AggregationBuffer` class, or + * 3. a Java object that can be inspected using the `ObjectInspector` returned by the + * `GenericUDAFEvaluator.init()` method. --- End diff -- Besides explaining what these three formats are, let's also explain when we will use each of them. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #15703: [SPARK-18186] Migrate HiveUDAFFunction to TypedIm...
Github user yhuai commented on a diff in the pull request: https://github.com/apache/spark/pull/15703#discussion_r88140760 --- Diff: sql/hive/src/main/scala/org/apache/spark/sql/hive/hiveUDFs.scala --- @@ -365,4 +380,66 @@ private[hive] case class HiveUDAFFunction( val distinct = if (isDistinct) "DISTINCT " else " " s"$name($distinct${children.map(_.sql).mkString(", ")})" } + + override def createAggregationBuffer(): AggregationBuffer = +partial1ModeEvaluator.getNewAggregationBuffer + + @transient + private lazy val inputProjection = new InterpretedProjection(children) --- End diff -- Why `InterpretedProjection`? --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #15703: [SPARK-18186] Migrate HiveUDAFFunction to TypedIm...
Github user yhuai commented on a diff in the pull request: https://github.com/apache/spark/pull/15703#discussion_r88140970 --- Diff: sql/hive/src/main/scala/org/apache/spark/sql/hive/hiveUDFs.scala --- @@ -365,4 +380,66 @@ private[hive] case class HiveUDAFFunction( val distinct = if (isDistinct) "DISTINCT " else " " s"$name($distinct${children.map(_.sql).mkString(", ")})" } + + override def createAggregationBuffer(): AggregationBuffer = +partial1ModeEvaluator.getNewAggregationBuffer + + @transient + private lazy val inputProjection = new InterpretedProjection(children) + + override def update(buffer: AggregationBuffer, input: InternalRow): Unit = { +partial1ModeEvaluator.iterate( + buffer, wrap(inputProjection(input), inputWrappers, cached, inputDataTypes)) + } + + override def merge(buffer: AggregationBuffer, input: AggregationBuffer): Unit = { +partial2ModeEvaluator.merge(buffer, partial1ModeEvaluator.terminatePartial(input)) --- End diff -- Let's explain what we are trying to do using `partial1ModeEvaluator.terminatePartial(input)`. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
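A sketch of how that explanation might read in code, using only the Hive `GenericUDAFEvaluator` API; the evaluator and buffer parameters stand in for the corresponding members of `HiveUDAFFunction`, so this is an illustration of the merge path rather than the PR's final code.

```scala
import org.apache.hadoop.hive.ql.udf.generic.GenericUDAFEvaluator
import org.apache.hadoop.hive.ql.udf.generic.GenericUDAFEvaluator.AggregationBuffer

object HiveUdafMergeSketch {
  def mergeBuffers(
      partial1ModeEvaluator: GenericUDAFEvaluator,
      partial2ModeEvaluator: GenericUDAFEvaluator,
      buffer: AggregationBuffer,
      input: AggregationBuffer): Unit = {
    // Hive's merge() expects the *inspectable partial result*, not a raw AggregationBuffer,
    // so the incoming buffer is first flattened with terminatePartial() and the result is
    // then merged into the destination buffer by the PARTIAL2-mode evaluator.
    val partialResult = partial1ModeEvaluator.terminatePartial(input)
    partial2ModeEvaluator.merge(buffer, partialResult)
  }
}
```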